feat: compact command with auto-compact

This commit is contained in:
adamdottv
2025-05-02 09:24:24 -05:00
committed by Adam
parent 364cf5b429
commit 49423da081
16 changed files with 507 additions and 73 deletions

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"sync"
"time"
"github.com/opencode-ai/opencode/internal/config"
"github.com/opencode-ai/opencode/internal/llm/models"
@@ -43,6 +44,9 @@ type Service interface {
IsSessionBusy(sessionID string) bool
IsBusy() bool
Update(agentName config.AgentName, modelID models.ModelID) (models.Model, error)
CompactSession(ctx context.Context, sessionID string) error
PauseSession(sessionID string) error
ResumeSession(sessionID string) error
}
type agent struct {
@@ -55,6 +59,7 @@ type agent struct {
titleProvider provider.Provider
activeRequests sync.Map
pauseLock sync.RWMutex // Lock for pausing message processing
}
func NewAgent(
@@ -187,12 +192,30 @@ func (a *agent) Run(ctx context.Context, sessionID string, content string) (<-ch
}
func (a *agent) processGeneration(ctx context.Context, sessionID, content string) AgentEvent {
// List existing messages; if none, start title generation asynchronously.
msgs, err := a.messages.List(ctx, sessionID)
// Get the current session to check for summary
currentSession, err := a.sessions.Get(ctx, sessionID)
if err != nil {
return a.err(fmt.Errorf("failed to list messages: %w", err))
return a.err(fmt.Errorf("failed to get session: %w", err))
}
if len(msgs) == 0 {
// Fetch messages based on whether a summary exists
var sessionMessages []message.Message
if currentSession.Summary != "" && currentSession.SummarizedAt > 0 {
// If summary exists, only fetch messages after the summarization timestamp
sessionMessages, err = a.messages.ListAfter(ctx, sessionID, currentSession.SummarizedAt)
if err != nil {
return a.err(fmt.Errorf("failed to list messages after summary: %w", err))
}
} else {
// If no summary, fetch all messages
sessionMessages, err = a.messages.List(ctx, sessionID)
if err != nil {
return a.err(fmt.Errorf("failed to list messages: %w", err))
}
}
// If this is a new session, start title generation asynchronously
if len(sessionMessages) == 0 && currentSession.Summary == "" {
go func() {
defer logging.RecoverPanic("agent.Run", func() {
logging.ErrorPersist("panic while generating title")
@@ -209,8 +232,25 @@ func (a *agent) processGeneration(ctx context.Context, sessionID, content string
return a.err(fmt.Errorf("failed to create user message: %w", err))
}
// Append the new user message to the conversation history.
msgHistory := append(msgs, userMsg)
// Prepare the message history for the LLM
var messages []message.Message
if currentSession.Summary != "" && currentSession.SummarizedAt > 0 {
// If summary exists, create a temporary message for the summary
summaryMessage := message.Message{
Role: message.Assistant,
Parts: []message.ContentPart{
message.TextContent{Text: currentSession.Summary},
},
}
// Start with the summary, then add messages after the summary timestamp
messages = append([]message.Message{summaryMessage}, sessionMessages...)
} else {
// If no summary, just use all messages
messages = sessionMessages
}
// Append the new user message to the conversation history
messages = append(messages, userMsg)
for {
// Check for cancellation before each iteration
select {
@@ -219,7 +259,7 @@ func (a *agent) processGeneration(ctx context.Context, sessionID, content string
default:
// Continue processing
}
agentMessage, toolResults, err := a.streamAndHandleEvents(ctx, sessionID, msgHistory)
agentMessage, toolResults, err := a.streamAndHandleEvents(ctx, sessionID, messages)
if err != nil {
if errors.Is(err, context.Canceled) {
agentMessage.AddFinish(message.FinishReasonCanceled)
@@ -231,7 +271,7 @@ func (a *agent) processGeneration(ctx context.Context, sessionID, content string
logging.Info("Result", "message", agentMessage.FinishReason(), "toolResults", toolResults)
if (agentMessage.FinishReason() == message.FinishReasonToolUse) && toolResults != nil {
// We are not done, we need to respond with the tool response
msgHistory = append(msgHistory, agentMessage, *toolResults)
messages = append(messages, agentMessage, *toolResults)
continue
}
return AgentEvent{
@@ -249,7 +289,76 @@ func (a *agent) createUserMessage(ctx context.Context, sessionID, content string
})
}
// estimateTokens returns a rough token count for messages.
//
// The estimate is purely character based: every text part contributes its
// byte length, every non-text part a flat allowance, and every message a
// small fixed overhead for its role. The total is then divided by four
// (integer division), following the "about four characters per token" rule
// of thumb. An empty or nil slice yields 0.
func estimateTokens(messages []message.Message) int64 {
	const (
		charsPerToken     = 4   // heuristic ratio of characters to tokens
		nonTextPartChars  = 100 // flat allowance for any part that is not plain text
		perMessageOverhead = 10 // allowance for the role of each message
	)

	chars := 0
	for i := range messages {
		chars += perMessageOverhead
		for _, p := range messages[i].Parts {
			switch text := p.(type) {
			case message.TextContent:
				chars += len(text.Text)
			default:
				chars += nonTextPartChars
			}
		}
	}
	return int64(chars / charsPerToken)
}
func (a *agent) streamAndHandleEvents(ctx context.Context, sessionID string, msgHistory []message.Message) (message.Message, *message.Message, error) {
// Check if we need to auto-compact based on token count
contextWindow := a.provider.Model().ContextWindow
threshold := int64(float64(contextWindow) * 0.80)
estimatedTokens := estimateTokens(msgHistory)
// If we're approaching the context window limit, trigger auto-compaction
if estimatedTokens >= threshold {
logging.InfoPersist(fmt.Sprintf("Auto-compaction triggered for session %s. Estimated tokens: %d, Threshold: %d", sessionID, estimatedTokens, threshold))
// Perform compaction with pause/resume to ensure safety
if err := a.CompactSession(ctx, sessionID); err != nil {
logging.ErrorPersist(fmt.Sprintf("Auto-compaction failed: %v", err))
// Continue with the request even if compaction fails
} else {
// Re-fetch session details after compaction
currentSession, err := a.sessions.Get(ctx, sessionID)
if err != nil {
return message.Message{}, nil, fmt.Errorf("failed to get session after compaction: %w", err)
}
// Re-prepare messages using the new summary
var sessionMessages []message.Message
if currentSession.Summary != "" && currentSession.SummarizedAt > 0 {
// If summary exists, only fetch messages after the summarization timestamp
sessionMessages, err = a.messages.ListAfter(ctx, sessionID, currentSession.SummarizedAt)
if err != nil {
return message.Message{}, nil, fmt.Errorf("failed to list messages after compaction: %w", err)
}
// Create a new message history with the summary and messages after summarization
summaryMessage := message.Message{
Role: message.Assistant,
Parts: []message.ContentPart{
message.TextContent{Text: currentSession.Summary},
},
}
// Replace msgHistory with the new compacted version
msgHistory = append([]message.Message{summaryMessage}, sessionMessages...)
// Log the new token estimate after compaction
newEstimate := estimateTokens(msgHistory)
logging.InfoPersist(fmt.Sprintf("After compaction: Estimated tokens: %d (reduced by %d)",
newEstimate, estimatedTokens-newEstimate))
}
}
}
eventChan := a.provider.StreamResponse(ctx, msgHistory, a.tools)
assistantMsg, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
@@ -374,6 +483,10 @@ func (a *agent) processEvent(ctx context.Context, sessionID string, assistantMsg
// Continue processing.
}
// Check if session is paused - use RLock to allow concurrent reads but block during pause
a.pauseLock.RLock()
defer a.pauseLock.RUnlock()
switch event.Type {
case provider.EventThinkingDelta:
assistantMsg.AppendReasoningContent(event.Content)
@@ -456,6 +569,145 @@ func (a *agent) Update(agentName config.AgentName, modelID models.ModelID) (mode
return a.provider.Model(), nil
}
// PauseSession pauses event processing so that the caller can perform an
// operation that needs exclusive access.
//
// It acquires the write side of the agent's pauseLock and returns with the
// lock held; processEvent takes the read side of the same lock, so it blocks
// until ResumeSession is called. The lock is a single field on the agent, so
// the pause is not limited to sessionID, which is used for logging only.
//
// The lock is acquired unconditionally. Skipping it for an idle session would
// leave the caller unable to tell whether ResumeSession may be called, and
// unlocking an RWMutex that is not write-locked is a fatal runtime error.
// Every call must therefore be followed by exactly one ResumeSession; a
// second PauseSession without an intervening ResumeSession blocks.
//
// The returned error is always nil.
func (a *agent) PauseSession(sessionID string) error {
	logging.InfoPersist(fmt.Sprintf("Pausing session: %s", sessionID))
	a.pauseLock.Lock() // Held until ResumeSession releases it
	return nil
}
// ResumeSession ends a pause by releasing the write side of the agent's
// pauseLock, letting blocked event processing continue.
//
// It must only be called while the pause lock is held for writing: releasing
// an RWMutex that is not write-locked is a fatal runtime error. sessionID is
// used for logging only. The returned error is always nil.
func (a *agent) ResumeSession(sessionID string) error {
	notice := fmt.Sprintf("Resuming session: %s", sessionID)
	logging.InfoPersist(notice)

	a.pauseLock.Unlock()
	return nil
}
// CompactSession replaces the conversation history of sessionID with a
// provider-generated summary stored on the session record.
//
// Steps:
//  1. If the session is busy, event processing is paused for the duration of
//     the call by holding the write side of pauseLock.
//  2. The session and its messages are loaded. When a previous summary exists,
//     only messages created after session.SummarizedAt are kept and the old
//     summary is carried over into the prompt.
//  3. The provider is asked to summarise the prompt; the trimmed response is
//     saved as session.Summary together with the cut-off time in
//     session.SummarizedAt (Unix milliseconds).
//  4. Token usage of the summary request is tracked.
//
// It returns nil without calling the provider when there are no messages and
// no previous summary. It returns an error when loading, summarising, saving
// or usage tracking fails, or when the provider returns an empty summary. Note
// that a usage-tracking failure is reported after the summary has already been
// saved.
func (a *agent) CompactSession(ctx context.Context, sessionID string) error {
	// Lock and unlock in one place, guarded by a single busy check, so the
	// deferred Unlock is always paired with the Lock taken here. Releasing an
	// RWMutex that was never write-locked is a fatal runtime error.
	if a.IsSessionBusy(sessionID) {
		logging.InfoPersist(fmt.Sprintf("Pausing session: %s", sessionID))
		a.pauseLock.Lock()
		defer func() {
			logging.InfoPersist(fmt.Sprintf("Resuming session: %s", sessionID))
			a.pauseLock.Unlock()
		}()
		logging.InfoPersist(fmt.Sprintf("Session %s paused for compaction", sessionID))
	}

	// Create a cancellable context for the summary request.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Register the compaction as an active request. The real cancel function
	// is stored so that whoever looks this entry up can actually abort the
	// summary request.
	compactKey := sessionID + "-compact"
	a.activeRequests.Store(compactKey, cancel)
	defer a.activeRequests.Delete(compactKey)

	// Fetch the session
	session, err := a.sessions.Get(ctx, sessionID)
	if err != nil {
		return fmt.Errorf("failed to get session: %w", err)
	}

	// Take the cut-off before listing messages: anything created while the
	// provider is producing the summary is not part of it and must still be
	// found by later "messages after the summary" queries.
	summarizedAt := time.Now().UnixMilli()

	// Fetch all messages for the session
	sessionMessages, err := a.messages.List(ctx, sessionID)
	if err != nil {
		return fmt.Errorf("failed to list messages: %w", err)
	}

	var existingSummary string
	if session.Summary != "" && session.SummarizedAt > 0 {
		// Keep only the messages created after the last summarization
		var newMessages []message.Message
		for _, msg := range sessionMessages {
			if msg.CreatedAt > session.SummarizedAt {
				newMessages = append(newMessages, msg)
			}
		}
		sessionMessages = newMessages
		existingSummary = session.Summary
	}

	// If there are no messages to summarize and no existing summary, return early
	if len(sessionMessages) == 0 && existingSummary == "" {
		return nil
	}

	// Prompt layout: system instruction, optional previous summary, the
	// messages to fold in, and a final user request for the summary.
	messages := make([]message.Message, 0, len(sessionMessages)+3)
	messages = append(messages, message.Message{
		Role: message.System,
		Parts: []message.ContentPart{
			message.TextContent{
				Text: "You are a helpful AI assistant tasked with summarizing conversations.",
			},
		},
	})

	// If there's an existing summary, include it
	if existingSummary != "" {
		messages = append(messages, message.Message{
			Role: message.Assistant, // TODO: should this be system or user instead?
			Parts: []message.ContentPart{
				message.TextContent{
					Text: existingSummary,
				},
			},
		})
	}

	// Add all messages since the last summarized message
	messages = append(messages, sessionMessages...)

	// Add a final user message requesting the summary
	messages = append(messages, message.Message{
		Role: message.User,
		Parts: []message.ContentPart{
			message.TextContent{
				Text: "Provide a detailed but concise summary of our conversation above. Focus on information that would be helpful for continuing the conversation, including what we did, what we're doing, which files we're working on, and what we're going to do next.",
			},
		},
	})

	// Call provider to get the summary
	response, err := a.provider.SendMessages(ctx, messages, a.tools)
	if err != nil {
		return fmt.Errorf("failed to get summary from the assistant: %w", err)
	}

	// Extract the summary text
	summaryText := strings.TrimSpace(response.Content)
	if summaryText == "" {
		return fmt.Errorf("received empty summary from the assistant")
	}

	// Update the session with the new summary and its cut-off time
	session.Summary = summaryText
	session.SummarizedAt = summarizedAt

	// Save the updated session
	if _, err := a.sessions.Save(ctx, session); err != nil {
		return fmt.Errorf("failed to save session with summary: %w", err)
	}

	// Track token usage
	if err := a.TrackUsage(ctx, sessionID, a.provider.Model(), response.Usage); err != nil {
		return fmt.Errorf("failed to track usage: %w", err)
	}

	return nil
}
func createAgentProvider(agentName config.AgentName) (provider.Provider, error) {
cfg := config.Get()
agentConfig, ok := cfg.Agents[agentName]