cleanup retry

This commit is contained in:
Kujtim Hoxha
2025-04-08 20:46:40 +02:00
parent fde04bbf85
commit 0d8d324ac6
2 changed files with 48 additions and 81 deletions

View File

@@ -135,40 +135,9 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
attempts := 0 attempts := 0
for { for {
// If this isn't the first attempt, we're retrying
if attempts > 0 {
if attempts > maxRetries {
eventChan <- ProviderEvent{
Type: EventError,
Error: errors.New("maximum retry attempts reached for rate limit (429)"),
}
return
}
// Inform user we're retrying with attempt number
eventChan <- ProviderEvent{
Type: EventWarning,
Info: fmt.Sprintf("[Retrying due to rate limit... attempt %d of %d]", attempts, maxRetries),
}
// Calculate backoff with exponential backoff and jitter
backoffMs := 2000 * (1 << (attempts - 1)) // 2s, 4s, 8s, 16s, 32s
jitterMs := int(float64(backoffMs) * 0.2)
totalBackoffMs := backoffMs + jitterMs
// Sleep with backoff, respecting context cancellation
select {
case <-ctx.Done():
eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()}
return
case <-time.After(time.Duration(totalBackoffMs) * time.Millisecond):
// Continue with retry
}
}
attempts++ attempts++
// Create new streaming request
stream := a.client.Messages.NewStreaming( stream := a.client.Messages.NewStreaming(
ctx, ctx,
anthropic.MessageNewParams{ anthropic.MessageNewParams{
@@ -189,11 +158,8 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
}, },
) )
// Process stream events
accumulatedMessage := anthropic.Message{} accumulatedMessage := anthropic.Message{}
streamSuccess := false
// Process the stream until completion or error
for stream.Next() { for stream.Next() {
event := stream.Current() event := stream.Current()
err := accumulatedMessage.Accumulate(event) err := accumulatedMessage.Accumulate(event)
@@ -223,7 +189,6 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
eventChan <- ProviderEvent{Type: EventContentStop} eventChan <- ProviderEvent{Type: EventContentStop}
case anthropic.MessageStopEvent: case anthropic.MessageStopEvent:
streamSuccess = true
content := "" content := ""
for _, block := range accumulatedMessage.Content { for _, block := range accumulatedMessage.Content {
if text, ok := block.AsAny().(anthropic.TextBlock); ok { if text, ok := block.AsAny().(anthropic.TextBlock); ok {
@@ -246,51 +211,59 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
} }
} }
// If the stream completed successfully, we're done err := stream.Err()
if streamSuccess { if err == nil {
return return
} }
// Check for stream errors var apierr *anthropic.Error
err := stream.Err() if !errors.As(err, &apierr) {
if err != nil {
var apierr *anthropic.Error
if errors.As(err, &apierr) {
if apierr.StatusCode == 429 || apierr.StatusCode == 529 {
// Check for Retry-After header
if retryAfterValues := apierr.Response.Header.Values("Retry-After"); len(retryAfterValues) > 0 {
// Parse the retry after value (seconds)
var retryAfterSec int
if _, err := fmt.Sscanf(retryAfterValues[0], "%d", &retryAfterSec); err == nil {
retryMs := retryAfterSec * 1000
// Inform user of retry with specific wait time
eventChan <- ProviderEvent{
Type: EventWarning,
Info: fmt.Sprintf("[Rate limited: waiting %d seconds as specified by API]", retryAfterSec),
}
// Sleep respecting context cancellation
select {
case <-ctx.Done():
eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()}
return
case <-time.After(time.Duration(retryMs) * time.Millisecond):
// Continue with retry after specified delay
continue
}
}
}
// Fall back to exponential backoff if Retry-After parsing failed
continue
}
}
// For non-rate limit errors, report and exit
eventChan <- ProviderEvent{Type: EventError, Error: err} eventChan <- ProviderEvent{Type: EventError, Error: err}
return return
} }
if apierr.StatusCode != 429 && apierr.StatusCode != 529 {
eventChan <- ProviderEvent{Type: EventError, Error: err}
return
}
if attempts > maxRetries {
eventChan <- ProviderEvent{
Type: EventError,
Error: errors.New("maximum retry attempts reached for rate limit (429)"),
}
return
}
retryMs := 0
retryAfterValues := apierr.Response.Header.Values("Retry-After")
if len(retryAfterValues) > 0 {
var retryAfterSec int
if _, err := fmt.Sscanf(retryAfterValues[0], "%d", &retryAfterSec); err == nil {
retryMs = retryAfterSec * 1000
eventChan <- ProviderEvent{
Type: EventWarning,
Info: fmt.Sprintf("[Rate limited: waiting %d seconds as specified by API]", retryAfterSec),
}
}
} else {
eventChan <- ProviderEvent{
Type: EventWarning,
Info: fmt.Sprintf("[Retrying due to rate limit... attempt %d of %d]", attempts, maxRetries),
}
backoffMs := 2000 * (1 << (attempts - 1))
jitterMs := int(float64(backoffMs) * 0.2)
retryMs = backoffMs + jitterMs
}
select {
case <-ctx.Done():
eventChan <- ProviderEvent{Type: EventError, Error: ctx.Err()}
return
case <-time.After(time.Duration(retryMs) * time.Millisecond):
continue
}
} }
}() }()
@@ -388,7 +361,6 @@ func (a *anthropicProvider) convertToAnthropicMessages(messages []message.Messag
blocks = append(blocks, anthropic.ContentBlockParamOfRequestToolUseBlock(toolCall.ID, inputMap, toolCall.Name)) blocks = append(blocks, anthropic.ContentBlockParamOfRequestToolUseBlock(toolCall.ID, inputMap, toolCall.Name))
} }
// Skip empty assistant messages completely
if len(blocks) > 0 { if len(blocks) > 0 {
anthropicMessages = append(anthropicMessages, anthropic.NewAssistantMessage(blocks...)) anthropicMessages = append(anthropicMessages, anthropic.NewAssistantMessage(blocks...))
} }
@@ -404,4 +376,3 @@ func (a *anthropicProvider) convertToAnthropicMessages(messages []message.Messag
return anthropicMessages return anthropicMessages
} }

View File

@@ -121,11 +121,7 @@ func (t *fetchTool) Run(ctx context.Context, call ToolCall) (ToolResponse, error
ToolName: FetchToolName, ToolName: FetchToolName,
Action: "fetch", Action: "fetch",
Description: fmt.Sprintf("Fetch content from URL: %s", params.URL), Description: fmt.Sprintf("Fetch content from URL: %s", params.URL),
Params: FetchPermissionsParams{ Params: FetchPermissionsParams(params),
URL: params.URL,
Format: params.Format,
Timeout: params.Timeout,
},
}, },
) )