fix: multi-tool calls in streamed openai-compatible responses (#3609)

This commit is contained in:
Jack Amadeo
2025-07-23 18:48:28 -04:00
committed by GitHub
parent 64d311c751
commit ed282ba63f

View File

@@ -437,56 +437,83 @@ where
if chunk.choices.is_empty() {
yield (None, usage)
} else if let Some(tool_calls) = &chunk.choices[0].delta.tool_calls {
let tool_call = &tool_calls[0];
let id = tool_call.id.clone().ok_or(anyhow!("No tool call ID"))?;
let function_name = tool_call.function.name.clone().ok_or(anyhow!("No function name"))?;
let mut arguments = tool_call.function.arguments.clone();
let mut tool_call_data: std::collections::HashMap<i32, (String, String, String)> = std::collections::HashMap::new();
while let Some(response_chunk) = stream.next().await {
if response_chunk.as_ref().is_ok_and(|s| s == "data: [DONE]") {
break 'outer;
}
let response_str = response_chunk?;
if let Some(line) = strip_data_prefix(&response_str) {
let tool_chunk: StreamingChunk = serde_json::from_str(line)
.map_err(|e| anyhow!("Failed to parse streaming chunk: {}: {:?}", e, &line))?;
let more_args = tool_chunk.choices[0].delta.tool_calls.as_ref()
.and_then(|calls| calls.first())
.map(|call| call.function.arguments.as_str());
if let Some(more_args) = more_args {
arguments.push_str(more_args);
} else {
break;
}
for tool_call in tool_calls {
if let (Some(index), Some(id), Some(name)) = (tool_call.index, &tool_call.id, &tool_call.function.name) {
tool_call_data.insert(index, (id.clone(), name.clone(), tool_call.function.arguments.clone()));
}
}
let parsed = if arguments.is_empty() {
Ok(json!({}))
} else {
serde_json::from_str::<Value>(&arguments)
};
let mut done = false;
while !done {
if let Some(response_chunk) = stream.next().await {
if response_chunk.as_ref().is_ok_and(|s| s == "data: [DONE]") {
break 'outer;
}
let response_str = response_chunk?;
if let Some(line) = strip_data_prefix(&response_str) {
let tool_chunk: StreamingChunk = serde_json::from_str(line)
.map_err(|e| anyhow!("Failed to parse streaming chunk: {}: {:?}", e, &line))?;
let content = match parsed {
Ok(params) => MessageContent::tool_request(
id,
Ok(ToolCall::new(function_name, params)),
),
Err(e) => {
let error = ToolError::InvalidParameters(format!(
"Could not interpret tool use parameters for id {}: {}",
id, e
));
MessageContent::tool_request(id, Err(error))
if let Some(delta_tool_calls) = &tool_chunk.choices[0].delta.tool_calls {
for delta_call in delta_tool_calls {
if let Some(index) = delta_call.index {
if let Some((_, _, ref mut args)) = tool_call_data.get_mut(&index) {
args.push_str(&delta_call.function.arguments);
} else if let (Some(id), Some(name)) = (&delta_call.id, &delta_call.function.name) {
tool_call_data.insert(index, (id.clone(), name.clone(), delta_call.function.arguments.clone()));
}
}
}
} else {
done = true;
}
if tool_chunk.choices[0].finish_reason == Some("tool_calls".to_string()) {
done = true;
}
}
} else {
break;
}
};
}
let mut contents = Vec::new();
let mut sorted_indices: Vec<_> = tool_call_data.keys().cloned().collect();
sorted_indices.sort();
for index in sorted_indices {
if let Some((id, function_name, arguments)) = tool_call_data.get(&index) {
let parsed = if arguments.is_empty() {
Ok(json!({}))
} else {
serde_json::from_str::<Value>(arguments)
};
let content = match parsed {
Ok(params) => MessageContent::tool_request(
id.clone(),
Ok(ToolCall::new(function_name.clone(), params)),
),
Err(e) => {
let error = ToolError::InvalidParameters(format!(
"Could not interpret tool use parameters for id {}: {}",
id, e
));
MessageContent::tool_request(id.clone(), Err(error))
}
};
contents.push(content);
}
}
yield (
Some(Message {
id: chunk.id,
role: Role::Assistant,
created: chrono::Utc::now().timestamp(),
content: vec![content],
content: contents,
}),
usage,
)
@@ -609,6 +636,8 @@ pub fn create_request(
mod tests {
use super::*;
use serde_json::json;
use tokio::pin;
use tokio_stream::{self, StreamExt};
#[test]
fn test_validate_tool_schemas() {
@@ -1096,4 +1125,54 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_streamed_multi_tool_response_to_messages() -> anyhow::Result<()> {
let response_lines = r#"
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":"I'll run both"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":" `ls` commands in a"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":" single turn for you -"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":" one on the current directory an"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":"d one on the `working_dir`."},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"id":"toolu_bdrk_01RMTd7R9DzQjEEWgDwzcBsU","type":"function","function":{"name":"developer__shell","arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":"{\""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":"command\": \"l"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":"s\"}"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"id":"toolu_bdrk_016bgVTGZdpjP8ehjMWp9cWW","type":"function","function":{"name":"developer__shell","arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"{\""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"command\""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":": \"ls wor"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"king_dir"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"\"}"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":""},"index":0,"finish_reason":"tool_calls"}],"usage":{"prompt_tokens":4982,"completion_tokens":122,"total_tokens":5104},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342}
data: [DONE]
"#;
let response_stream =
tokio_stream::iter(response_lines.lines().map(|line| Ok(line.to_string())));
let messages = response_to_streaming_message(response_stream);
pin!(messages);
while let Some(Ok((message, _usage))) = messages.next().await {
if let Some(msg) = message {
println!("{:?}", msg);
if msg.content.len() == 2 {
if let (MessageContent::ToolRequest(req1), MessageContent::ToolRequest(req2)) =
(&msg.content[0], &msg.content[1])
{
if req1.tool_call.is_ok() && req2.tool_call.is_ok() {
// We expect two tool calls in the response
assert_eq!(req1.tool_call.as_ref().unwrap().name, "developer__shell");
assert_eq!(req2.tool_call.as_ref().unwrap().name, "developer__shell");
return Ok(());
}
}
}
}
}
panic!("Expected tool call message with two calls, but did not see it");
}
}