1
0
mirror of https://github.com/coalaura/whiskr.git synced 2025-12-02 20:22:52 +00:00
Files
whiskr/stream.go

121 lines
2.0 KiB
Go
Raw Normal View History

2025-08-05 03:56:23 +02:00
package main
import (
2025-08-29 19:26:55 +02:00
"bytes"
"context"
2025-09-12 21:39:17 +02:00
"encoding/binary"
2025-08-05 03:56:23 +02:00
"errors"
"net/http"
2025-08-29 19:26:55 +02:00
"sync"
2025-08-11 01:21:05 +02:00
"github.com/revrost/go-openrouter"
2025-09-12 21:39:17 +02:00
"github.com/vmihailenco/msgpack/v5"
2025-08-05 03:56:23 +02:00
)
2025-09-12 21:39:17 +02:00
const (
ChunkStart ChunkType = 0
ChunkID ChunkType = 1
ChunkReasoning ChunkType = 2
ChunkText ChunkType = 3
ChunkImage ChunkType = 4
ChunkTool ChunkType = 5
ChunkError ChunkType = 6
ChunkEnd ChunkType = 7
)
type ChunkType uint8
2025-08-05 03:56:23 +02:00
type Chunk struct {
2025-09-12 21:39:17 +02:00
Type ChunkType
Data any
2025-08-05 03:56:23 +02:00
}
type Stream struct {
2025-08-29 19:26:55 +02:00
wr http.ResponseWriter
ctx context.Context
2025-08-05 03:56:23 +02:00
}
2025-08-29 19:26:55 +02:00
var pool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}
2025-08-05 03:56:23 +02:00
2025-08-31 23:46:22 +02:00
func GetFreeBuffer() *bytes.Buffer {
buf := pool.Get().(*bytes.Buffer)
buf.Reset()
return buf
}
2025-08-29 19:26:55 +02:00
func NewStream(w http.ResponseWriter, ctx context.Context) (*Stream, error) {
2025-08-05 03:56:23 +02:00
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
return &Stream{
2025-08-29 19:26:55 +02:00
wr: w,
ctx: ctx,
2025-08-05 03:56:23 +02:00
}, nil
}
2025-09-12 21:39:17 +02:00
func NewChunk(typ ChunkType, data any) *Chunk {
return &Chunk{
Type: typ,
Data: data,
2025-08-31 23:41:28 +02:00
}
}
2025-08-11 01:21:05 +02:00
func GetErrorMessage(err error) string {
if apiErr, ok := err.(*openrouter.APIError); ok {
return apiErr.Error()
}
return err.Error()
}
2025-08-29 19:26:55 +02:00
2025-09-12 21:39:17 +02:00
func (s *Stream) WriteChunk(chunk *Chunk) error {
debugIf(chunk.Type == ChunkError, "error: %v", chunk.Data)
if err := s.ctx.Err(); err != nil {
2025-08-29 19:26:55 +02:00
return err
}
2025-08-31 23:46:22 +02:00
buf := GetFreeBuffer()
2025-08-29 19:26:55 +02:00
defer pool.Put(buf)
2025-09-12 21:39:17 +02:00
binary.Write(buf, binary.LittleEndian, chunk.Type)
if chunk.Data != nil {
data, err := msgpack.Marshal(chunk.Data)
if err != nil {
return err
}
2025-08-29 19:26:55 +02:00
2025-09-12 21:39:17 +02:00
binary.Write(buf, binary.LittleEndian, uint32(len(data)))
buf.Write(data)
} else {
binary.Write(buf, binary.LittleEndian, uint32(0))
}
2025-08-29 19:26:55 +02:00
2025-09-12 21:39:17 +02:00
if _, err := s.wr.Write(buf.Bytes()); err != nil {
2025-08-29 19:26:55 +02:00
return err
}
2025-09-12 21:39:17 +02:00
flusher, ok := s.wr.(http.Flusher)
2025-08-29 19:26:55 +02:00
if !ok {
return errors.New("failed to create flusher")
}
select {
2025-09-12 21:39:17 +02:00
case <-s.ctx.Done():
return s.ctx.Err()
2025-08-29 19:26:55 +02:00
default:
flusher.Flush()
return nil
}
}