fix: clean up subagent (#3565)

This commit is contained in:
Wendy Tang
2025-07-24 10:36:08 -07:00
committed by GitHub
parent a65c547699
commit 7b2ca43c77
9 changed files with 131 additions and 308 deletions

View File

@@ -219,6 +219,7 @@ fn render_tool_request(req: &ToolRequest, theme: Theme, debug: bool) {
Ok(call) => match call.name.as_str() {
"developer__text_editor" => render_text_editor_request(call, debug),
"developer__shell" => render_shell_request(call, debug),
"dynamic_task__create_task" => render_dynamic_task_request(call, debug),
_ => render_default_request(call, debug),
},
Err(e) => print_markdown(&e.to_string(), theme),
@@ -392,6 +393,37 @@ fn render_shell_request(call: &ToolCall, debug: bool) {
}
}
fn render_dynamic_task_request(call: &ToolCall, debug: bool) {
print_tool_header(call);
// Print task_parameters array
if let Some(Value::Array(task_parameters)) = call.arguments.get("task_parameters") {
println!("{}:", style("task_parameters").dim());
for task_param in task_parameters.iter() {
println!(" -");
if let Some(param_obj) = task_param.as_object() {
for (key, value) in param_obj {
match value {
Value::String(s) => {
// For strings, print the full content without truncation
println!(" {}: {}", style(key).dim(), style(s).green());
}
_ => {
// For everything else, use print_params
print!(" ");
print_params(value, 0, debug);
}
}
}
}
}
}
println!();
}
fn render_default_request(call: &ToolCall, debug: bool) {
print_tool_header(call);
print_params(&call.arguments, 0, debug);
@@ -463,26 +495,8 @@ fn print_params(value: &Value, depth: usize, debug: bool) {
}
}
Value::String(s) => {
// Special handling for text_instruction to show more content
let max_length = if key == "text_instruction" {
200 // Allow longer display for text instructions
} else {
get_tool_params_max_length()
};
if !debug && s.len() > max_length {
// For text instructions, show a preview instead of just "..."
if key == "text_instruction" {
let preview = &s[..max_length.saturating_sub(3)];
println!(
"{}{}: {}",
indent,
style(key).dim(),
style(format!("{}...", preview)).green()
);
} else {
println!("{}{}: {}", indent, style(key).dim(), style("...").dim());
}
if !debug && s.len() > get_tool_params_max_length() {
println!("{}{}: {}", indent, style(key).dim(), style("...").dim());
} else {
println!("{}{}: {}", indent, style(key).dim(), style(s).green());
}

View File

@@ -75,8 +75,6 @@ pub struct Agent {
pub(super) tool_monitor: Arc<Mutex<Option<ToolMonitor>>>,
pub(super) router_tool_selector: Mutex<Option<Arc<Box<dyn RouterToolSelector>>>>,
pub(super) scheduler_service: Mutex<Option<Arc<dyn SchedulerTrait>>>,
pub(super) mcp_tx: Mutex<mpsc::Sender<JsonRpcMessage>>,
pub(super) mcp_notification_rx: Arc<Mutex<mpsc::Receiver<JsonRpcMessage>>>,
pub(super) retry_manager: RetryManager,
}
@@ -132,8 +130,6 @@ impl Agent {
// Create channels with buffer size 32 (adjust if needed)
let (confirm_tx, confirm_rx) = mpsc::channel(32);
let (tool_tx, tool_rx) = mpsc::channel(32);
// Add MCP notification channel
let (mcp_tx, mcp_rx) = mpsc::channel(100);
let tool_monitor = Arc::new(Mutex::new(None));
let retry_manager = RetryManager::with_tool_monitor(tool_monitor.clone());
@@ -154,9 +150,6 @@ impl Agent {
tool_monitor,
router_tool_selector: Mutex::new(None),
scheduler_service: Mutex::new(None),
// Initialize with MCP notification support
mcp_tx: Mutex::new(mcp_tx),
mcp_notification_rx: Arc::new(Mutex::new(mcp_rx)),
retry_manager,
}
}
@@ -342,9 +335,8 @@ impl Agent {
.await
} else if tool_call.name == SUBAGENT_EXECUTE_TASK_TOOL_NAME {
let provider = self.provider().await.ok();
let mcp_tx = self.mcp_tx.lock().await.clone();
let task_config = TaskConfig::new(provider, mcp_tx);
let task_config = TaskConfig::new(provider);
subagent_execute_task_tool::run_tasks(
tool_call.arguments.clone(),
task_config,
@@ -771,24 +763,6 @@ impl Agent {
break;
}
// Handle MCP notifications from subagents
let mcp_notifications = self.get_mcp_notifications().await;
for notification in mcp_notifications {
if let JsonRpcMessage::Notification(notif) = &notification {
if let Some(data) = notif.notification.params.get("data") {
if let (Some(subagent_id), Some(_message)) = (
data.get("subagent_id").and_then(|v| v.as_str()),
data.get("message").and_then(|v| v.as_str()),
) {
yield AgentEvent::McpNotification((
subagent_id.to_string(),
notification.clone(),
));
}
}
}
}
let mut stream = Self::stream_response_from_provider(
self.provider().await?,
&system_prompt,
@@ -1085,18 +1059,6 @@ impl Agent {
prompt_manager.add_system_prompt_extra(instruction);
}
/// Get MCP notifications from subagents
pub async fn get_mcp_notifications(&self) -> Vec<JsonRpcMessage> {
let mut notifications = Vec::new();
let mut rx = self.mcp_notification_rx.lock().await;
while let Ok(notification) = rx.try_recv() {
notifications.push(notification);
}
notifications
}
pub async fn update_provider(&self, provider: Arc<dyn Provider>) -> Result<()> {
let mut current_provider = self.provider.lock().await;
*current_provider = Some(provider.clone());

View File

@@ -32,11 +32,8 @@ pub fn create_dynamic_task_tool() -> Tool {
text_instruction: Search for the config file in the root directory.
Examples of 'task_parameters' for multiple tasks:
text_instruction: Get weather for Melbourne.
timeout_seconds: 300
text_instruction: Get weather for Los Angeles.
timeout_seconds: 300
text_instruction: Get weather for San Francisco.
timeout_seconds: 300
".to_string(),
json!({
"type": "object",
@@ -54,11 +51,6 @@ pub fn create_dynamic_task_tool() -> Tool {
"type": "string",
"description": "The text instruction to execute"
},
"timeout_seconds": {
"type": "integer",
"description": "Optional timeout for the task in seconds (default: 300)",
"minimum": 1
}
},
"required": ["text_instruction"]
}

View File

@@ -10,8 +10,6 @@ use crate::{
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use mcp_core::{handler::ToolError, tool::Tool};
use rmcp::model::{JsonRpcMessage, JsonRpcNotification, JsonRpcVersion2_0, Notification};
use rmcp::object;
use serde::{Deserialize, Serialize};
// use serde_json::{self};
use std::{collections::HashMap, sync::Arc};
@@ -52,9 +50,7 @@ pub struct SubAgent {
impl SubAgent {
/// Create a new subagent with the given configuration and provider
#[instrument(skip(task_config))]
pub async fn new(
task_config: TaskConfig,
) -> Result<(Arc<Self>, tokio::task::JoinHandle<()>), anyhow::Error> {
pub async fn new(task_config: TaskConfig) -> Result<Arc<Self>, anyhow::Error> {
debug!("Creating new subagent with id: {}", task_config.id);
// Create a new extension manager for this subagent
@@ -90,21 +86,8 @@ impl SubAgent {
extension_manager: Arc::new(RwLock::new(extension_manager)),
});
// Send initial MCP notification
let subagent_clone = Arc::clone(&subagent);
subagent_clone
.send_mcp_notification("subagent_created", "Subagent created and ready")
.await;
// Create a background task handle (for future use with streaming/monitoring)
let subagent_clone = Arc::clone(&subagent);
let handle = tokio::spawn(async move {
// This could be used for background monitoring, cleanup, etc.
debug!("Subagent {} background task started", subagent_clone.id);
});
debug!("Subagent {} created successfully", subagent.id);
Ok((subagent, handle))
Ok(subagent)
}
/// Get the current status of the subagent
@@ -119,51 +102,6 @@ impl SubAgent {
let mut current_status = self.status.write().await;
*current_status = status.clone();
} // Write lock is released here!
// Send MCP notifications based on status
match &status {
SubAgentStatus::Processing => {
self.send_mcp_notification("status_changed", "Processing request")
.await;
}
SubAgentStatus::Completed(msg) => {
self.send_mcp_notification("completed", &format!("Completed: {}", msg))
.await;
}
SubAgentStatus::Terminated => {
self.send_mcp_notification("terminated", "Subagent terminated")
.await;
}
_ => {}
}
}
/// Send an MCP notification about the subagent's activity
pub async fn send_mcp_notification(&self, notification_type: &str, message: &str) {
let notification = JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: JsonRpcVersion2_0,
notification: Notification {
method: "notifications/message".to_string(),
params: object!({
"level": "info",
"logger": format!("subagent_{}", self.id),
"data": {
"subagent_id": self.id,
"type": notification_type,
"message": message,
"timestamp": Utc::now().to_rfc3339()
}
}),
extensions: Default::default(),
},
});
if let Err(e) = self.config.mcp_tx.send(notification).await {
error!(
"Failed to send MCP notification from subagent {}: {}",
self.id, e
);
}
}
/// Get current progress information
@@ -192,10 +130,8 @@ impl SubAgent {
&self,
message: String,
task_config: TaskConfig,
) -> Result<Message, anyhow::Error> {
) -> Result<Vec<Message>, anyhow::Error> {
debug!("Processing message for subagent {}", self.id);
self.send_mcp_notification("message_processing", &format!("Processing: {}", message))
.await;
// Get provider from task config
let provider = self
@@ -234,6 +170,7 @@ impl SubAgent {
// Generate response from provider with loop for tool processing (max_turns iterations)
let mut loop_count = 0;
let max_turns = self.config.max_turns.unwrap_or(DEFAULT_SUBAGENT_MAX_TURNS);
let mut last_error: Option<anyhow::Error> = None;
// Generate response from provider
loop {
@@ -265,18 +202,12 @@ impl SubAgent {
// If there are no tool requests, we're done
if tool_requests.is_empty() || loop_count >= max_turns {
self.add_message(response.clone()).await;
messages.push(response.clone());
// Send notification about response
self.send_mcp_notification(
"response_generated",
&format!("Responded: {}", response.as_concat_text()),
)
.await;
// Set status back to ready and return the final response
// Set status back to ready
self.set_status(SubAgentStatus::Completed("Completed!".to_string()))
.await;
break Ok(response);
break;
}
// Add the assistant message with tool calls to the conversation
@@ -285,13 +216,6 @@ impl SubAgent {
// Process each tool request and create user response messages
for request in &tool_requests {
if let Ok(tool_call) = &request.tool_call {
// Send notification about tool usage
self.send_mcp_notification(
"tool_usage",
&format!("Using tool: {}", tool_call.name),
)
.await;
// Handle platform tools or dispatch to extension manager
let tool_result = match self
.extension_manager
@@ -310,13 +234,6 @@ impl SubAgent {
let tool_response_message = Message::user()
.with_tool_response(request.id.clone(), Ok(result.clone()));
messages.push(tool_response_message);
// Send notification about tool completion
self.send_mcp_notification(
"tool_completed",
&format!("Tool {} completed successfully", tool_call.name),
)
.await;
}
Err(e) => {
// Create a user message with the tool error
@@ -325,13 +242,6 @@ impl SubAgent {
Err(ToolError::ExecutionError(e.to_string())),
);
messages.push(tool_error_message);
// Send notification about tool error
self.send_mcp_notification(
"tool_error",
&format!("Tool {} error: {}", tool_call.name, e),
)
.await;
}
}
}
@@ -344,24 +254,31 @@ impl SubAgent {
"Context length exceeded".to_string(),
))
.await;
break Ok(Message::assistant().with_context_length_exceeded(
"The context length of the model has been exceeded. Please start a new session and try again.",
));
last_error = Some(anyhow::anyhow!("Context length exceeded"));
break;
}
Err(ProviderError::RateLimitExceeded(_)) => {
self.set_status(SubAgentStatus::Completed("Rate limit exceeded".to_string()))
.await;
break Ok(Message::assistant()
.with_text("Rate limit exceeded. Please try again later."));
last_error = Some(anyhow::anyhow!("Rate limit exceeded"));
break;
}
Err(e) => {
self.set_status(SubAgentStatus::Completed(format!("Error: {}", e)))
.await;
error!("Error: {}", e);
break Ok(Message::assistant().with_text(format!("Ran into this error: {e}.\n\nPlease retry if you think this is a transient or recoverable error.")));
last_error = Some(anyhow::anyhow!("Provider error: {}", e));
break;
}
}
}
// Handle error cases or return the last message
if let Some(error) = last_error {
Err(error)
} else {
Ok(messages)
}
}
/// Add a message to the conversation (for tracking agent responses)

View File

@@ -40,18 +40,8 @@ fn format_task_metadata(task_info: &TaskInfo) -> String {
})
.collect::<Vec<_>>()
.join(",")
} else if task_info.task.task_type == "text_instruction" {
// For text_instruction tasks, extract and display the instruction
if let Some(text_instruction) = task_info.task.get_text_instruction() {
// Truncate long instructions to keep the display clean
if text_instruction.len() > 80 {
format!("instruction={}...", &text_instruction[..77])
} else {
format!("instruction={}", text_instruction)
}
} else {
String::new()
}
} else if let Some(text_instruction) = task_info.task.get_text_instruction() {
format!("instruction={}", text_instruction)
} else {
String::new()
}

View File

@@ -1,5 +1,4 @@
use serde_json::Value;
use std::ops::Deref;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
@@ -89,37 +88,16 @@ async fn handle_text_instruction_task(
// Start tracking the task
task_execution_tracker.start_task(&task.id).await;
// Create arguments for the subagent task
let task_arguments = serde_json::json!({
"text_instruction": text_instruction,
// "instructions": "You are a helpful assistant. Execute the given task and provide a clear, concise response.",
});
let result = tokio::select! {
result = run_complete_subagent_task(task_arguments, task_config) => result,
result = run_complete_subagent_task(text_instruction.to_string(), task_config) => result,
_ = cancellation_token.cancelled() => {
return Err("Task cancelled".to_string());
}
};
match result {
Ok(contents) => {
// Extract the text content from the result
let result_text = contents
.into_iter()
.filter_map(|content| match content.deref() {
rmcp::model::RawContent::Text(raw_text_content) => {
Some(raw_text_content.text.clone())
}
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
Ok(serde_json::json!({
"result": result_text
}))
}
Ok(result_text) => Ok(serde_json::json!({
"result": result_text
})),
Err(e) => {
let error_msg = format!("Subagent execution failed: {}", e);
Err(error_msg)

View File

@@ -2,43 +2,63 @@ use crate::agents::subagent::SubAgent;
use crate::agents::subagent_task_config::TaskConfig;
use anyhow::Result;
use mcp_core::ToolError;
use rmcp::model::Content;
use serde_json::Value;
/// Standalone function to run a complete subagent task
pub async fn run_complete_subagent_task(
task_arguments: Value,
text_instruction: String,
task_config: TaskConfig,
) -> Result<Vec<Content>, ToolError> {
// Parse arguments - using "task" as the main message parameter
let text_instruction = task_arguments
.get("text_instruction")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing text_instruction parameter".to_string()))?
.to_string();
) -> Result<String, anyhow::Error> {
// Create the subagent with the parent agent's provider
let (subagent, handle) = SubAgent::new(task_config.clone())
let subagent = SubAgent::new(task_config.clone())
.await
.map_err(|e| ToolError::ExecutionError(format!("Failed to create subagent: {}", e)))?;
// Execute the subagent task
let result = match subagent.reply_subagent(text_instruction, task_config).await {
Ok(response) => {
let response_text = response.as_concat_text();
Ok(vec![Content::text(response_text)])
}
Err(e) => Err(ToolError::ExecutionError(format!(
"Subagent execution failed: {}",
e
))),
};
let messages = subagent
.reply_subagent(text_instruction, task_config)
.await?;
// Clean up the subagent handle
if let Err(e) = handle.await {
tracing::debug!("Subagent handle cleanup error: {}", e);
}
// Extract all text content from all messages
let all_text_content: Vec<String> = messages
.iter()
.flat_map(|message| {
message.content.iter().filter_map(|content| {
match content {
crate::message::MessageContent::Text(text_content) => {
Some(text_content.text.clone())
}
crate::message::MessageContent::ToolResponse(tool_response) => {
// Extract text from tool response
if let Ok(contents) = &tool_response.tool_result {
let texts: Vec<String> = contents
.iter()
.filter_map(|content| {
if let rmcp::model::RawContent::Text(raw_text_content) =
&content.raw
{
Some(raw_text_content.text.clone())
} else {
None
}
})
.collect();
if !texts.is_empty() {
Some(format!("Tool result: {}", texts.join("\n")))
} else {
None
}
} else {
None
}
}
_ => None,
}
})
})
.collect();
let response_text = all_text_content.join("\n");
// Return the result
result
Ok(response_text)
}

View File

@@ -1,9 +1,7 @@
use crate::providers::base::Provider;
use rmcp::model::JsonRpcMessage;
use std::env;
use std::fmt;
use std::sync::Arc;
use tokio::sync::mpsc;
use uuid::Uuid;
/// Default maximum number of turns for task execution
@@ -17,7 +15,6 @@ pub const GOOSE_SUBAGENT_MAX_TURNS_ENV_VAR: &str = "GOOSE_SUBAGENT_MAX_TURNS";
pub struct TaskConfig {
pub id: String,
pub provider: Option<Arc<dyn Provider>>,
pub mcp_tx: mpsc::Sender<JsonRpcMessage>,
pub max_turns: Option<usize>,
}
@@ -33,11 +30,10 @@ impl fmt::Debug for TaskConfig {
impl TaskConfig {
/// Create a new TaskConfig with all required dependencies
pub fn new(provider: Option<Arc<dyn Provider>>, mcp_tx: mpsc::Sender<JsonRpcMessage>) -> Self {
pub fn new(provider: Option<Arc<dyn Provider>>) -> Self {
Self {
id: Uuid::new_v4().to_string(),
provider,
mcp_tx,
max_turns: Some(
env::var(GOOSE_SUBAGENT_MAX_TURNS_ENV_VAR)
.ok()
@@ -51,9 +47,4 @@ impl TaskConfig {
pub fn provider(&self) -> Option<&Arc<dyn Provider>> {
self.provider.as_ref()
}
/// Get a clone of the MCP sender
pub fn mcp_tx(&self) -> mpsc::Sender<JsonRpcMessage> {
self.mcp_tx.clone()
}
}

View File

@@ -1,76 +1,35 @@
You are a specialized subagent within the Goose AI framework, created by Block, the parent company of Square, CashApp, and Tidal. Goose is being developed as an open-source software project. You were spawned by the main Goose agent to handle a specific task or set of operations.
You are a specialized subagent within the Goose AI framework, created by Block. You were spawned by the main Goose agent to handle a specific task efficiently. The current date is {{current_date_time}}.
The current date is {{current_date_time}}.
You use LLM providers with tool calling capability. You can be used with different language models (gpt-4o, claude-3.5-sonnet, o1, llama-3.2, deepseek-r1, etc). These models have varying knowledge cut-off dates depending on when they were trained, but typically it's between 5-10 months prior to the current date.
# Your Role as a Subagent
You are an autonomous subagent with the following characteristics:
- **Independence**: You can make decisions and execute tools within your scope
- **Specialization**: You focus on specific tasks assigned by the main Goose agent
- **Collaboration**: You report progress and results back to the main Goose agent
- **Bounded Operation**: You operate within defined limits (turn count, timeout, specific instructions)
- **Security**: You cannot spawn additional subagents to prevent infinite recursion and maintain system stability
# Your Role
You are an autonomous subagent with these characteristics:
- **Independence**: Make decisions and execute tools within your scope
- **Specialization**: Focus on specific tasks assigned by the main agent
- **Efficiency**: Use tools sparingly and only when necessary
- **Bounded Operation**: Operate within defined limits (turn count, timeout)
- **Security**: Cannot spawn additional subagents
The maximum number of turns to respond is {{max_turns}}.
{% if subagent_id is defined %}
**Subagent ID**: {{subagent_id}}
{% endif %}
{% if recipe_title is defined %}
**Recipe**: {{recipe_title}}
{% endif %}
{% if max_turns is defined %}
**Maximum Turns**: {{max_turns}}
{% endif %}
# Task Instructions
{{task_instructions}}
# Extensions and Tools
# Tool Usage Guidelines
**CRITICAL**: Be efficient with tool usage. Use tools only when absolutely necessary to complete your task. Here are the available tools you have access to:
You have access to {{tool_count}} tools: {{available_tools}}
Extensions allow other applications to provide context to you. Extensions connect you to different data sources and tools. You are capable of using tools from these extensions to solve higher level problems and can interact with multiple at once.
{% if recipe_title is defined %}
**Recipe Mode**: You are operating with a specific recipe that defines which extensions and tools you can use. This focused approach helps you stay on task and work efficiently within your defined scope.
{% if (extensions is defined) and extensions %}
You have access to the following recipe-specific extensions ({{extensions|length}} extension{% if extensions|length > 1 %}s{% endif %}). Each of these extensions provides tools that are in your tool specification:
{% for extension in extensions %}
- {{extension}}
{% endfor %}
You have {{tool_count}} tool{% if tool_count > 1 %}s{% endif %} available: {{available_tools}}
{% else %}
Your recipe doesn't specify any extensions, so you have access to the basic tool set.
You have {{tool_count}} tool{% if tool_count > 1 %}s{% endif %} available: {{available_tools}}
{% endif %}
{% else %}
**Inheritance Mode**: You inherit all available extensions and tools from the parent Goose agent. You can use all the tools that were available to the parent agent when you were created.
You have {{tool_count}} tool{% if tool_count > 1 %}s{% endif %} available: {{available_tools}}
{% endif %}
**Tool Efficiency Rules**:
- Use the minimum number of tools needed to complete your task
- Avoid exploratory tool usage unless explicitly required
- Stop using tools once you have sufficient information
- Provide clear, concise responses without excessive tool calls
# Communication Guidelines
- **Progress Updates**: Report progress clearly and concisely
- **Completion**: Clearly indicate when your task is complete
- **Scope**: Stay focused on your assigned task
- **Format**: Use Markdown formatting for responses
- **Progress Updates**: Regularly communicate your progress on the assigned task
- **Completion Reporting**: Clearly indicate when your task is complete and provide results
- **Error Handling**: Report any issues or limitations you encounter
- **Scope Awareness**: Stay focused on your assigned task and don't exceed your defined boundaries
# Response Guidelines
- Use Markdown formatting for all responses.
- Follow best practices for Markdown, including:
- Using headers for organization.
- Bullet points for lists.
- Links formatted correctly, either as linked text (e.g., [this is linked text](https://example.com)) or automatic links using angle brackets (e.g., <http://example.com/>).
- For code examples, use fenced code blocks by placing triple backticks (` ``` `) before and after the code. Include the language identifier after the opening backticks (e.g., ` ```python `) to enable syntax highlighting.
- Ensure clarity, conciseness, and proper formatting to enhance readability and usability.
- Be task-focused in your communications and provide clear status updates about your progress.
- When completing tasks, summarize what was accomplished.
- If you encounter limitations or need clarification, communicate this clearly.
Remember: You are part of a larger Goose system working collaboratively to solve complex problems. Your specialized focus helps the main agent handle multiple concerns efficiently.
Remember: You are part of a larger system. Your specialized focus helps the main agent handle multiple concerns efficiently. Complete your task efficiently with less tool usage.