diff --git a/packages/tui/cmd/opencode/main.go b/packages/tui/cmd/opencode/main.go index 22841fc8..3a7d1848 100644 --- a/packages/tui/cmd/opencode/main.go +++ b/packages/tui/cmd/opencode/main.go @@ -13,9 +13,11 @@ import ( flag "github.com/spf13/pflag" "github.com/sst/opencode-sdk-go" "github.com/sst/opencode-sdk-go/option" + "github.com/sst/opencode-sdk-go/packages/ssestream" "github.com/sst/opencode/internal/api" "github.com/sst/opencode/internal/app" "github.com/sst/opencode/internal/clipboard" + "github.com/sst/opencode/internal/decoders" "github.com/sst/opencode/internal/tui" "github.com/sst/opencode/internal/util" "golang.org/x/sync/errgroup" @@ -61,6 +63,11 @@ func main() { } } + // Register custom SSE decoder to handle large events (>32MB) + // This is a workaround for the bufio.Scanner token size limit in the auto-generated SDK + // See: packages/tui/internal/decoders/decoder.go + ssestream.RegisterDecoder("text/event-stream", decoders.NewUnboundedDecoder) + httpClient := opencode.NewClient( option.WithBaseURL(url), ) diff --git a/packages/tui/internal/decoders/decoder.go b/packages/tui/internal/decoders/decoder.go new file mode 100644 index 00000000..efb69920 --- /dev/null +++ b/packages/tui/internal/decoders/decoder.go @@ -0,0 +1,118 @@ +package decoders + +import ( + "bufio" + "bytes" + "io" + + "github.com/sst/opencode-sdk-go/packages/ssestream" +) + +// UnboundedDecoder is an SSE decoder that uses bufio.Reader instead of bufio.Scanner +// to avoid the 32MB token size limit. This is a workaround for large SSE events until +// the upstream Stainless SDK is fixed. +// +// This decoder handles SSE events of unlimited size by reading line-by-line with +// bufio.Reader.ReadBytes('\n'), which dynamically grows the buffer as needed. +type UnboundedDecoder struct { + reader *bufio.Reader + closer io.ReadCloser + evt ssestream.Event + err error +} + +// NewUnboundedDecoder creates a new unbounded SSE decoder with a 1MB initial buffer size +func NewUnboundedDecoder(rc io.ReadCloser) ssestream.Decoder { + reader := bufio.NewReaderSize(rc, 1024*1024) // 1MB initial buffer + return &UnboundedDecoder{ + reader: reader, + closer: rc, + } +} + +// Next reads and decodes the next SSE event from the stream +func (d *UnboundedDecoder) Next() bool { + if d.err != nil { + return false + } + + event := "" + data := bytes.NewBuffer(nil) + + for { + line, err := d.reader.ReadBytes('\n') + if err != nil { + if err == io.EOF && len(line) == 0 { + return false + } + if err != io.EOF { + d.err = err + return false + } + } + + // Remove trailing newline characters + line = bytes.TrimRight(line, "\r\n") + + // Empty line indicates end of event + if len(line) == 0 { + if data.Len() > 0 || event != "" { + d.evt = ssestream.Event{ + Type: event, + Data: data.Bytes(), + } + return true + } + continue + } + + // Skip comments (lines starting with ':') + if line[0] == ':' { + continue + } + + // Parse field + name, value, found := bytes.Cut(line, []byte(":")) + if !found { + // Field with no value + continue + } + + // Remove leading space from value + if len(value) > 0 && value[0] == ' ' { + value = value[1:] + } + + switch string(name) { + case "": + // An empty line in the form ": something" is a comment and should be ignored + continue + case "event": + event = string(value) + case "data": + _, d.err = data.Write(value) + if d.err != nil { + return false + } + _, d.err = data.WriteRune('\n') + if d.err != nil { + return false + } + } + } +} + +// Event returns the current event +func (d *UnboundedDecoder) Event() ssestream.Event { + return d.evt +} + +// Close closes the underlying reader +func (d *UnboundedDecoder) Close() error { + return d.closer.Close() +} + +// Err returns any error that occurred during decoding +func (d *UnboundedDecoder) Err() error { + return d.err +} diff --git a/packages/tui/internal/decoders/decoder_test.go b/packages/tui/internal/decoders/decoder_test.go new file mode 100644 index 00000000..e5ad1d55 --- /dev/null +++ b/packages/tui/internal/decoders/decoder_test.go @@ -0,0 +1,194 @@ +package decoders + +import ( + "bytes" + "io" + "strings" + "testing" +) + +func TestUnboundedDecoder_SmallEvent(t *testing.T) { + data := "event: test\ndata: hello world\n\n" + rc := io.NopCloser(strings.NewReader(data)) + decoder := NewUnboundedDecoder(rc) + + if !decoder.Next() { + t.Fatal("Expected Next() to return true") + } + + evt := decoder.Event() + if evt.Type != "test" { + t.Errorf("Expected event type 'test', got '%s'", evt.Type) + } + if string(evt.Data) != "hello world\n" { + t.Errorf("Expected data 'hello world\\n', got '%s'", string(evt.Data)) + } + + if decoder.Next() { + t.Error("Expected Next() to return false at end of stream") + } + + if err := decoder.Err(); err != nil { + t.Errorf("Expected no error, got %v", err) + } +} + +func TestUnboundedDecoder_LargeEvent(t *testing.T) { + // Create a large event (50MB) + size := 50 * 1024 * 1024 + largeData := strings.Repeat("x", size) + + var buf bytes.Buffer + buf.WriteString("event: large\n") + buf.WriteString("data: ") + buf.WriteString(largeData) + buf.WriteString("\n\n") + + rc := io.NopCloser(&buf) + decoder := NewUnboundedDecoder(rc) + + if !decoder.Next() { + t.Fatal("Expected Next() to return true") + } + + evt := decoder.Event() + if evt.Type != "large" { + t.Errorf("Expected event type 'large', got '%s'", evt.Type) + } + + expectedData := largeData + "\n" + if string(evt.Data) != expectedData { + t.Errorf("Data size mismatch: expected %d, got %d", len(expectedData), len(evt.Data)) + } + + if decoder.Next() { + t.Error("Expected Next() to return false at end of stream") + } + + if err := decoder.Err(); err != nil { + t.Errorf("Expected no error, got %v", err) + } +} + +func TestUnboundedDecoder_MultipleEvents(t *testing.T) { + data := "event: first\ndata: data1\n\nevent: second\ndata: data2\n\n" + rc := io.NopCloser(strings.NewReader(data)) + decoder := NewUnboundedDecoder(rc) + + // First event + if !decoder.Next() { + t.Fatal("Expected Next() to return true for first event") + } + evt := decoder.Event() + if evt.Type != "first" { + t.Errorf("Expected event type 'first', got '%s'", evt.Type) + } + if string(evt.Data) != "data1\n" { + t.Errorf("Expected data 'data1\\n', got '%s'", string(evt.Data)) + } + + // Second event + if !decoder.Next() { + t.Fatal("Expected Next() to return true for second event") + } + evt = decoder.Event() + if evt.Type != "second" { + t.Errorf("Expected event type 'second', got '%s'", evt.Type) + } + if string(evt.Data) != "data2\n" { + t.Errorf("Expected data 'data2\\n', got '%s'", string(evt.Data)) + } + + // No more events + if decoder.Next() { + t.Error("Expected Next() to return false at end of stream") + } + + if err := decoder.Err(); err != nil { + t.Errorf("Expected no error, got %v", err) + } +} + +func TestUnboundedDecoder_MultilineData(t *testing.T) { + data := "event: multiline\ndata: line1\ndata: line2\ndata: line3\n\n" + rc := io.NopCloser(strings.NewReader(data)) + decoder := NewUnboundedDecoder(rc) + + if !decoder.Next() { + t.Fatal("Expected Next() to return true") + } + + evt := decoder.Event() + if evt.Type != "multiline" { + t.Errorf("Expected event type 'multiline', got '%s'", evt.Type) + } + + expectedData := "line1\nline2\nline3\n" + if string(evt.Data) != expectedData { + t.Errorf("Expected data '%s', got '%s'", expectedData, string(evt.Data)) + } +} + +func TestUnboundedDecoder_Comments(t *testing.T) { + data := ": this is a comment\nevent: test\n: another comment\ndata: hello\n\n" + rc := io.NopCloser(strings.NewReader(data)) + decoder := NewUnboundedDecoder(rc) + + if !decoder.Next() { + t.Fatal("Expected Next() to return true") + } + + evt := decoder.Event() + if evt.Type != "test" { + t.Errorf("Expected event type 'test', got '%s'", evt.Type) + } + if string(evt.Data) != "hello\n" { + t.Errorf("Expected data 'hello\\n', got '%s'", string(evt.Data)) + } +} + +func TestUnboundedDecoder_NoEventType(t *testing.T) { + data := "data: hello\n\n" + rc := io.NopCloser(strings.NewReader(data)) + decoder := NewUnboundedDecoder(rc) + + if !decoder.Next() { + t.Fatal("Expected Next() to return true") + } + + evt := decoder.Event() + if evt.Type != "" { + t.Errorf("Expected empty event type, got '%s'", evt.Type) + } + if string(evt.Data) != "hello\n" { + t.Errorf("Expected data 'hello\\n', got '%s'", string(evt.Data)) + } +} + +func BenchmarkUnboundedDecoder_LargeEvent(b *testing.B) { + // Create a 10MB event for benchmarking + size := 10 * 1024 * 1024 + largeData := strings.Repeat("x", size) + + var buf bytes.Buffer + buf.WriteString("event: bench\n") + buf.WriteString("data: ") + buf.WriteString(largeData) + buf.WriteString("\n\n") + + data := buf.Bytes() + + b.ResetTimer() + b.SetBytes(int64(len(data))) + + for i := 0; i < b.N; i++ { + rc := io.NopCloser(bytes.NewReader(data)) + decoder := NewUnboundedDecoder(rc) + + if !decoder.Next() { + b.Fatal("Expected Next() to return true") + } + + _ = decoder.Event() + } +}