From d9d7eb06978e0dc0dc438c5f993352399b2268f0 Mon Sep 17 00:00:00 2001 From: Wendy Tang Date: Wed, 25 Jun 2025 16:18:48 -0700 Subject: [PATCH] subagents (#2797) Co-authored-by: Lily Delalande --- crates/goose-cli/src/commands/web.rs | 1 + crates/goose-cli/src/session/mod.rs | 83 ++- crates/goose-cli/src/session/output.rs | 1 + crates/goose-ffi/src/lib.rs | 1 + crates/goose-server/src/routes/reply.rs | 2 + crates/goose/src/agents/agent.rs | 255 +++++-- crates/goose/src/agents/mod.rs | 8 + crates/goose/src/agents/reply_parts.rs | 2 +- crates/goose/src/agents/subagent.rs | 757 ++++++++++++++++++++ crates/goose/src/agents/subagent_handler.rs | 79 ++ crates/goose/src/agents/subagent_manager.rs | 404 +++++++++++ crates/goose/src/agents/subagent_tools.rs | 68 ++ crates/goose/src/agents/subagent_types.rs | 42 ++ crates/goose/src/prompts/subagent_system.md | 76 ++ crates/goose/src/recipe/mod.rs | 12 +- crates/goose/src/scheduler.rs | 1 + crates/goose/tests/agent.rs | 1 + 17 files changed, 1709 insertions(+), 84 deletions(-) create mode 100644 crates/goose/src/agents/subagent.rs create mode 100644 crates/goose/src/agents/subagent_handler.rs create mode 100644 crates/goose/src/agents/subagent_manager.rs create mode 100644 crates/goose/src/agents/subagent_tools.rs create mode 100644 crates/goose/src/agents/subagent_types.rs create mode 100644 crates/goose/src/prompts/subagent_system.md diff --git a/crates/goose-cli/src/commands/web.rs b/crates/goose-cli/src/commands/web.rs index 9ba39b13..33e0d34d 100644 --- a/crates/goose-cli/src/commands/web.rs +++ b/crates/goose-cli/src/commands/web.rs @@ -618,6 +618,7 @@ async fn process_message_streaming( // Log model change tracing::info!("Model changed to {} in {} mode", model, mode); } + Err(e) => { error!("Error in message stream: {}", e); let mut sender = sender.lock().await; diff --git a/crates/goose-cli/src/session/mod.rs b/crates/goose-cli/src/session/mod.rs index 18916550..6f09c1ee 100644 --- a/crates/goose-cli/src/session/mod.rs +++ b/crates/goose-cli/src/session/mod.rs @@ -906,23 +906,87 @@ impl Session { match method.as_str() { "notifications/message" => { let data = o.get("data").unwrap_or(&Value::Null); - let message = match data { - Value::String(s) => s.clone(), + let (formatted_message, subagent_id, _notification_type) = match data { + Value::String(s) => (s.clone(), None, None), Value::Object(o) => { - if let Some(Value::String(output)) = o.get("output") { - output.to_owned() + // Check for subagent notification structure first + if let Some(Value::String(msg)) = o.get("message") { + // Extract subagent info for better display + let subagent_id = o.get("subagent_id") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let notification_type = o.get("type") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + let formatted = match notification_type { + "subagent_created" | "completed" | "terminated" => { + format!("🤖 {}", msg) + } + "tool_usage" | "tool_completed" | "tool_error" => { + format!("🔧 {}", msg) + } + "message_processing" | "turn_progress" => { + format!("💭 {}", msg) + } + "response_generated" => { + // Check verbosity setting for subagent response content + let config = Config::global(); + let min_priority = config + .get_param::("GOOSE_CLI_MIN_PRIORITY") + .ok() + .unwrap_or(0.5); + + if min_priority > 0.1 && !self.debug { + // High/Medium verbosity: show truncated response + if let Some(response_content) = msg.strip_prefix("Responded: ") { + if response_content.len() > 100 { + format!("🤖 Responded: {}...", &response_content[..100]) + } else { + format!("🤖 {}", msg) + } + } else { + format!("🤖 {}", msg) + } + } else { + // All verbosity or debug: show full response + format!("🤖 {}", msg) + } + } + _ => { + msg.to_string() + } + }; + (formatted, Some(subagent_id.to_string()), Some(notification_type.to_string())) + } else if let Some(Value::String(output)) = o.get("output") { + // Fallback for other MCP notification types + (output.to_owned(), None, None) } else { - data.to_string() + (data.to_string(), None, None) } }, v => { - v.to_string() + (v.to_string(), None, None) }, }; - if interactive { - output::set_thinking_message(&message); + + // Handle subagent notifications - show immediately + if let Some(_id) = subagent_id { + // Show subagent notifications immediately (no buffering) with compact spacing + if interactive { + let _ = progress_bars.hide(); + println!("{}", console::style(&formatted_message).green().dim()); + } else { + progress_bars.log(&formatted_message); + } } else { - progress_bars.log(&message); + // Non-subagent notification, display immediately with compact spacing + if interactive { + let _ = progress_bars.hide(); + println!("{}", console::style(&formatted_message).green().dim()); + } else { + progress_bars.log(&formatted_message); + } } }, "notifications/progress" => { @@ -951,6 +1015,7 @@ impl Session { eprintln!("Model changed to {} in {} mode", model, mode); } } + Some(Err(e)) => { eprintln!("Error: {}", e); drop(stream); diff --git a/crates/goose-cli/src/session/output.rs b/crates/goose-cli/src/session/output.rs index f8792c81..8bc26780 100644 --- a/crates/goose-cli/src/session/output.rs +++ b/crates/goose-cli/src/session/output.rs @@ -117,6 +117,7 @@ pub fn hide_thinking() { THINKING.with(|t| t.borrow_mut().hide()); } +#[allow(dead_code)] pub fn set_thinking_message(s: &String) { THINKING.with(|t| { if let Some(spinner) = t.borrow_mut().spinner.as_mut() { diff --git a/crates/goose-ffi/src/lib.rs b/crates/goose-ffi/src/lib.rs index 46b8c15c..bf4197c2 100644 --- a/crates/goose-ffi/src/lib.rs +++ b/crates/goose-ffi/src/lib.rs @@ -269,6 +269,7 @@ pub unsafe extern "C" fn goose_agent_send_message( Ok(AgentEvent::ModelChange { .. }) => { // Model change events are informational, just continue } + Err(e) => { full_response.push_str(&format!("\nError in message stream: {}", e)); } diff --git a/crates/goose-server/src/routes/reply.rs b/crates/goose-server/src/routes/reply.rs index 6954724b..81e1d7fc 100644 --- a/crates/goose-server/src/routes/reply.rs +++ b/crates/goose-server/src/routes/reply.rs @@ -277,6 +277,7 @@ async fn handler( ).await; } } + Ok(Some(Err(e))) => { tracing::error!("Error processing message: {}", e); let _ = stream_event( @@ -392,6 +393,7 @@ async fn ask_handler( // Handle notifications if needed tracing::info!("Received notification: {:?}", n); } + Err(e) => { tracing::error!("Error processing as_ai message: {}", e); return Err(StatusCode::INTERNAL_SERVER_ERROR); diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index 2c36da75..c2a5dbdb 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -22,7 +22,7 @@ use crate::scheduler_trait::SchedulerTrait; use crate::tool_monitor::{ToolCall, ToolMonitor}; use regex::Regex; use serde_json::Value; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, Mutex, RwLock}; use tracing::{debug, error, instrument}; use crate::agents::extension::{ExtensionConfig, ExtensionError, ExtensionResult, ToolInfo}; @@ -45,14 +45,18 @@ use mcp_core::{ prompt::Prompt, protocol::GetPromptResult, tool::Tool, Content, ToolError, ToolResult, }; +use crate::agents::subagent_tools::SUBAGENT_RUN_TASK_TOOL_NAME; + use super::platform_tools; use super::router_tools; +use super::subagent_manager::SubAgentManager; +use super::subagent_tools; use super::tool_execution::{ToolCallResult, CHAT_MODE_TOOL_SKIPPED_RESPONSE, DECLINED_RESPONSE}; /// The main goose Agent pub struct Agent { pub(super) provider: Mutex>>, - pub(super) extension_manager: Mutex, + pub(super) extension_manager: RwLock, pub(super) sub_recipe_manager: Mutex, pub(super) frontend_tools: Mutex>, pub(super) frontend_instructions: Mutex>, @@ -64,6 +68,8 @@ pub struct Agent { pub(super) tool_monitor: Mutex>, pub(super) router_tool_selector: Mutex>>>, pub(super) scheduler_service: Mutex>>, + pub(super) subagent_manager: Mutex>, + pub(super) mcp_notification_rx: Arc>>, } #[derive(Clone, Debug)] @@ -73,52 +79,6 @@ pub enum AgentEvent { ModelChange { model: String, mode: String }, } -impl Agent { - pub fn new() -> Self { - // Create channels with buffer size 32 (adjust if needed) - let (confirm_tx, confirm_rx) = mpsc::channel(32); - let (tool_tx, tool_rx) = mpsc::channel(32); - - Self { - provider: Mutex::new(None), - extension_manager: Mutex::new(ExtensionManager::new()), - sub_recipe_manager: Mutex::new(SubRecipeManager::new()), - frontend_tools: Mutex::new(HashMap::new()), - frontend_instructions: Mutex::new(None), - prompt_manager: Mutex::new(PromptManager::new()), - confirmation_tx: confirm_tx, - confirmation_rx: Mutex::new(confirm_rx), - tool_result_tx: tool_tx, - tool_result_rx: Arc::new(Mutex::new(tool_rx)), - tool_monitor: Mutex::new(None), - router_tool_selector: Mutex::new(None), - scheduler_service: Mutex::new(None), - } - } - - pub async fn configure_tool_monitor(&self, max_repetitions: Option) { - let mut tool_monitor = self.tool_monitor.lock().await; - *tool_monitor = Some(ToolMonitor::new(max_repetitions)); - } - - pub async fn get_tool_stats(&self) -> Option> { - let tool_monitor = self.tool_monitor.lock().await; - tool_monitor.as_ref().map(|monitor| monitor.get_stats()) - } - - pub async fn reset_tool_monitor(&self) { - if let Some(monitor) = self.tool_monitor.lock().await.as_mut() { - monitor.reset(); - } - } - - /// Set the scheduler service for this agent - pub async fn set_scheduler(&self, scheduler: Arc) { - let mut scheduler_service = self.scheduler_service.lock().await; - *scheduler_service = Some(scheduler); - } -} - impl Default for Agent { fn default() -> Self { Self::new() @@ -160,6 +120,55 @@ where } impl Agent { + pub fn new() -> Self { + // Create channels with buffer size 32 (adjust if needed) + let (confirm_tx, confirm_rx) = mpsc::channel(32); + let (tool_tx, tool_rx) = mpsc::channel(32); + // Add MCP notification channel + let (mcp_tx, mcp_rx) = mpsc::channel(100); + + Self { + provider: Mutex::new(None), + extension_manager: RwLock::new(ExtensionManager::new()), + sub_recipe_manager: Mutex::new(SubRecipeManager::new()), + frontend_tools: Mutex::new(HashMap::new()), + frontend_instructions: Mutex::new(None), + prompt_manager: Mutex::new(PromptManager::new()), + confirmation_tx: confirm_tx, + confirmation_rx: Mutex::new(confirm_rx), + tool_result_tx: tool_tx, + tool_result_rx: Arc::new(Mutex::new(tool_rx)), + tool_monitor: Mutex::new(None), + router_tool_selector: Mutex::new(None), + scheduler_service: Mutex::new(None), + // Initialize with MCP notification support + subagent_manager: Mutex::new(Some(SubAgentManager::new(mcp_tx))), + mcp_notification_rx: Arc::new(Mutex::new(mcp_rx)), + } + } + + pub async fn configure_tool_monitor(&self, max_repetitions: Option) { + let mut tool_monitor = self.tool_monitor.lock().await; + *tool_monitor = Some(ToolMonitor::new(max_repetitions)); + } + + pub async fn get_tool_stats(&self) -> Option> { + let tool_monitor = self.tool_monitor.lock().await; + tool_monitor.as_ref().map(|monitor| monitor.get_stats()) + } + + pub async fn reset_tool_monitor(&self) { + if let Some(monitor) = self.tool_monitor.lock().await.as_mut() { + monitor.reset(); + } + } + + /// Set the scheduler service for this agent + pub async fn set_scheduler(&self, scheduler: Arc) { + let mut scheduler_service = self.scheduler_service.lock().await; + *scheduler_service = Some(scheduler); + } + /// Get a reference count clone to the provider pub async fn provider(&self) -> Result, anyhow::Error> { match &*self.provider.lock().await { @@ -182,7 +191,7 @@ impl Agent { pub async fn get_prefixed_tools(&self) -> ExtensionResult> { let mut tools = self .extension_manager - .lock() + .read() .await .get_prefixed_tools(None) .await?; @@ -249,7 +258,7 @@ impl Agent { return (request_id, Ok(ToolCallResult::from(result))); } - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; let sub_recipe_manager = self.sub_recipe_manager.lock().await; let result: ToolCallResult = if sub_recipe_manager.is_sub_recipe_tool(&tool_call.name) { @@ -271,6 +280,11 @@ impl Agent { ) } else if tool_call.name == PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME { ToolCallResult::from(extension_manager.search_available_extensions().await) + } else if tool_call.name == SUBAGENT_RUN_TASK_TOOL_NAME { + ToolCallResult::from( + self.handle_run_subagent_task(tool_call.arguments.clone()) + .await, + ) } else if self.is_frontend_tool(&tool_call.name).await { // For frontend tools, return an error indicating we need frontend execution ToolCallResult::from(Err(ToolError::ExecutionError( @@ -333,13 +347,13 @@ impl Agent { extension_name: String, request_id: String, ) -> (String, Result, ToolError>) { - let mut extension_manager = self.extension_manager.lock().await; + let mut extension_manager = self.extension_manager.write().await; let selector = self.router_tool_selector.lock().await.clone(); if ToolRouterIndexManager::is_tool_router_enabled(&selector) { if let Some(selector) = selector { let selector_action = if action == "disable" { "remove" } else { "add" }; - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; let selector = Arc::new(selector); if let Err(e) = ToolRouterIndexManager::update_extension_tools( &selector, @@ -407,6 +421,33 @@ impl Agent { }) .map_err(|e| ToolError::ExecutionError(e.to_string())); + // Update vector index if operation was successful and vector routing is enabled + if result.is_ok() { + let selector = self.router_tool_selector.lock().await.clone(); + if ToolRouterIndexManager::is_tool_router_enabled(&selector) { + if let Some(selector) = selector { + let vector_action = if action == "disable" { "remove" } else { "add" }; + let extension_manager = self.extension_manager.read().await; + let selector = Arc::new(selector); + if let Err(e) = ToolRouterIndexManager::update_extension_tools( + &selector, + &extension_manager, + &extension_name, + vector_action, + ) + .await + { + return ( + request_id, + Err(ToolError::ExecutionError(format!( + "Failed to update vector index: {}", + e + ))), + ); + } + } + } + } (request_id, result) } @@ -439,7 +480,7 @@ impl Agent { } } _ => { - let mut extension_manager = self.extension_manager.lock().await; + let mut extension_manager = self.extension_manager.write().await; extension_manager.add_extension(extension.clone()).await?; } } @@ -448,7 +489,7 @@ impl Agent { let selector = self.router_tool_selector.lock().await.clone(); if ToolRouterIndexManager::is_tool_router_enabled(&selector) { if let Some(selector) = selector { - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; let selector = Arc::new(selector); if let Err(e) = ToolRouterIndexManager::update_extension_tools( &selector, @@ -471,7 +512,7 @@ impl Agent { } pub async fn list_tools(&self, extension_name: Option) -> Vec { - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; let mut prefixed_tools = extension_manager .get_prefixed_tools(extension_name.clone()) .await @@ -485,6 +526,12 @@ impl Agent { platform_tools::manage_schedule_tool(), ]); + // Add subagent tool (only if ALPHA_FEATURES is enabled) + let config = Config::global(); + if config.get_param::("ALPHA_FEATURES").unwrap_or(false) { + prefixed_tools.push(subagent_tools::run_task_subagent_tool()); + } + // Add resource tools if supported if extension_manager.supports_resources() { prefixed_tools.extend([ @@ -521,7 +568,7 @@ impl Agent { let selector = self.router_tool_selector.lock().await.clone(); if let Some(selector) = selector { if let Ok(recent_calls) = selector.get_recent_tool_calls(20).await { - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; // Add recent tool calls to the list, avoiding duplicates for tool_name in recent_calls { // Find the tool in the extension manager's tools @@ -541,11 +588,14 @@ impl Agent { } pub async fn remove_extension(&self, name: &str) -> Result<()> { + let mut extension_manager = self.extension_manager.write().await; + extension_manager.remove_extension(name).await?; + // If vector tool selection is enabled, remove tools from the index let selector = self.router_tool_selector.lock().await.clone(); if ToolRouterIndexManager::is_tool_router_enabled(&selector) { if let Some(selector) = selector { - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; ToolRouterIndexManager::update_extension_tools( &selector, &extension_manager, @@ -556,14 +606,11 @@ impl Agent { } } - let mut extension_manager = self.extension_manager.lock().await; - extension_manager.remove_extension(name).await?; - Ok(()) } pub async fn list_extensions(&self) -> Vec { - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; extension_manager .list_extensions() .await @@ -631,6 +678,28 @@ impl Agent { Ok(Box::pin(async_stream::try_stream! { let _ = reply_span.enter(); loop { + // Check for MCP notifications from subagents + let mcp_notifications = self.get_mcp_notifications().await; + for notification in mcp_notifications { + // Extract subagent info from the notification data + if let JsonRpcMessage::Notification(ref notif) = notification { + if let Some(params) = ¬if.params { + if let Some(data) = params.get("data") { + if let (Some(subagent_id), Some(_message)) = ( + data.get("subagent_id").and_then(|v| v.as_str()), + data.get("message").and_then(|v| v.as_str()) + ) { + // Emit as McpNotification event + yield AgentEvent::McpNotification(( + subagent_id.to_string(), + notification.clone() + )); + } + } + } + } + } + match Self::generate_response_from_provider( self.provider().await?, &system_prompt, @@ -832,6 +901,31 @@ impl Agent { messages.push(response); messages.push(final_message_tool_resp); + + // Check for MCP notifications from subagents again before next iteration + // Note: These are already handled as McpNotification events above, + // so we don't need to convert them to assistant messages here. + // This was causing duplicate plain-text notifications. + // let mcp_notifications = self.get_mcp_notifications().await; + // for notification in mcp_notifications { + // // Extract subagent info from the notification data for assistant messages + // if let JsonRpcMessage::Notification(ref notif) = notification { + // if let Some(params) = ¬if.params { + // if let Some(data) = params.get("data") { + // if let (Some(subagent_id), Some(message)) = ( + // data.get("subagent_id").and_then(|v| v.as_str()), + // data.get("message").and_then(|v| v.as_str()) + // ) { + // yield AgentEvent::Message( + // Message::assistant().with_text( + // format!("Subagent {}: {}", subagent_id, message) + // ) + // ); + // } + // } + // } + // } + // } }, Err(ProviderError::ContextLengthExceeded(_)) => { // At this point, the last message should be a user message @@ -862,9 +956,32 @@ impl Agent { prompt_manager.add_system_prompt_extra(instruction); } - /// Update the provider used by this agent + /// Get MCP notifications from subagents + pub async fn get_mcp_notifications(&self) -> Vec { + let mut notifications = Vec::new(); + let mut rx = self.mcp_notification_rx.lock().await; + + while let Ok(notification) = rx.try_recv() { + notifications.push(notification); + } + + notifications + } + + /// Update the provider pub async fn update_provider(&self, provider: Arc) -> Result<()> { - *self.provider.lock().await = Some(provider.clone()); + let mut current_provider = self.provider.lock().await; + *current_provider = Some(provider.clone()); + + // Initialize subagent manager with MCP notification support + // Need to recreate the MCP channel since we're replacing the manager + let (mcp_tx, mcp_rx) = mpsc::channel(100); + { + let mut rx_guard = self.mcp_notification_rx.lock().await; + *rx_guard = mcp_rx; + } + *self.subagent_manager.lock().await = Some(SubAgentManager::new(mcp_tx)); + self.update_router_tool_selector(Some(provider), None) .await?; Ok(()) @@ -876,7 +993,7 @@ impl Agent { reindex_all: Option, ) -> Result<()> { let config = Config::global(); - let extension_manager = self.extension_manager.lock().await; + let _extension_manager = self.extension_manager.read().await; let provider = match provider { Some(p) => p, None => self.provider().await?, @@ -910,6 +1027,7 @@ impl Agent { }; // First index platform tools + let extension_manager = self.extension_manager.read().await; ToolRouterIndexManager::index_platform_tools(&selector, &extension_manager).await?; if reindex_all.unwrap_or(false) { @@ -934,6 +1052,7 @@ impl Agent { // Update the selector *self.router_tool_selector.lock().await = Some(selector.clone()); + Ok(()) } @@ -944,7 +1063,7 @@ impl Agent { } pub async fn list_extension_prompts(&self) -> HashMap> { - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; extension_manager .list_prompts() .await @@ -952,7 +1071,7 @@ impl Agent { } pub async fn get_prompt(&self, name: &str, arguments: Value) -> Result { - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; // First find which extension has this prompt let prompts = extension_manager @@ -975,7 +1094,7 @@ impl Agent { } pub async fn get_plan_prompt(&self) -> anyhow::Result { - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; let tools = extension_manager.get_prefixed_tools(None).await?; let tools_info = tools .into_iter() @@ -1001,7 +1120,7 @@ impl Agent { } pub async fn create_recipe(&self, mut messages: Vec) -> Result { - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; let extensions_info = extension_manager.get_extensions_info().await; // Get model name from provider diff --git a/crates/goose/src/agents/mod.rs b/crates/goose/src/agents/mod.rs index 158a7c2c..6b4a6e9f 100644 --- a/crates/goose/src/agents/mod.rs +++ b/crates/goose/src/agents/mod.rs @@ -11,6 +11,11 @@ mod router_tool_selector; mod router_tools; mod schedule_tool; pub mod sub_recipe_manager; +pub mod subagent; +pub mod subagent_handler; +pub mod subagent_manager; +pub mod subagent_tools; +pub mod subagent_types; mod tool_execution; mod tool_router_index_manager; pub(crate) mod tool_vectordb; @@ -20,4 +25,7 @@ pub use agent::{Agent, AgentEvent}; pub use extension::ExtensionConfig; pub use extension_manager::ExtensionManager; pub use prompt_manager::PromptManager; +pub use subagent::{SubAgent, SubAgentConfig, SubAgentProgress, SubAgentStatus}; +pub use subagent_manager::SubAgentManager; +pub use subagent_types::SpawnSubAgentArgs; pub use types::{FrontendTool, SessionConfig}; diff --git a/crates/goose/src/agents/reply_parts.rs b/crates/goose/src/agents/reply_parts.rs index ca194b21..573e9a93 100644 --- a/crates/goose/src/agents/reply_parts.rs +++ b/crates/goose/src/agents/reply_parts.rs @@ -52,7 +52,7 @@ impl Agent { } // Prepare system prompt - let extension_manager = self.extension_manager.lock().await; + let extension_manager = self.extension_manager.read().await; let extensions_info = extension_manager.get_extensions_info().await; // Get model name from provider diff --git a/crates/goose/src/agents/subagent.rs b/crates/goose/src/agents/subagent.rs new file mode 100644 index 00000000..0a02e2d1 --- /dev/null +++ b/crates/goose/src/agents/subagent.rs @@ -0,0 +1,757 @@ +use crate::{ + agents::{extension_manager::ExtensionManager, Agent}, + message::{Message, MessageContent, ToolRequest}, + prompt_template::render_global_file, + providers::base::Provider, + providers::errors::ProviderError, + recipe::Recipe, +}; +use anyhow::anyhow; +use chrono::{DateTime, Utc}; +use mcp_core::protocol::{JsonRpcMessage, JsonRpcNotification}; +use mcp_core::{handler::ToolError, role::Role, tool::Tool}; +use serde::{Deserialize, Serialize}; +use serde_json::{self, json}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::{mpsc, Mutex, RwLock}; +use tracing::{debug, error, instrument}; +use uuid::Uuid; + +use crate::agents::platform_tools::{ + self, PLATFORM_LIST_RESOURCES_TOOL_NAME, PLATFORM_READ_RESOURCE_TOOL_NAME, + PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME, +}; +use crate::agents::subagent_tools::SUBAGENT_RUN_TASK_TOOL_NAME; + +/// Status of a subagent +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum SubAgentStatus { + Ready, // Ready to process messages + Processing, // Currently working on a task + Completed(String), // Task completed (with optional message for success/error) + Terminated, // Manually terminated +} + +/// Configuration for a subagent +#[derive(Debug)] +pub struct SubAgentConfig { + pub id: String, + pub recipe: Option, + pub instructions: Option, + pub max_turns: Option, + pub timeout_seconds: Option, +} + +impl SubAgentConfig { + pub fn new_with_recipe(recipe: Recipe) -> Self { + Self { + id: Uuid::new_v4().to_string(), + recipe: Some(recipe), + instructions: None, + max_turns: None, + timeout_seconds: None, + } + } + + pub fn new_with_instructions(instructions: String) -> Self { + Self { + id: Uuid::new_v4().to_string(), + recipe: None, + instructions: Some(instructions), + max_turns: None, + timeout_seconds: None, + } + } + + pub fn with_max_turns(mut self, max_turns: usize) -> Self { + self.max_turns = Some(max_turns); + self + } + + pub fn with_timeout(mut self, timeout_seconds: u64) -> Self { + self.timeout_seconds = Some(timeout_seconds); + self + } +} + +/// Progress information for a subagent +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubAgentProgress { + pub subagent_id: String, + pub status: SubAgentStatus, + pub message: String, + pub turn: usize, + pub max_turns: Option, + pub timestamp: DateTime, +} + +/// A specialized agent that can handle specific tasks independently +pub struct SubAgent { + pub id: String, + pub conversation: Arc>>, + pub status: Arc>, + pub config: SubAgentConfig, + pub turn_count: Arc>, + pub created_at: DateTime, + pub recipe_extensions: Arc>>, + pub missing_extensions: Arc>>, // Track extensions that weren't enabled + pub mcp_notification_tx: mpsc::Sender, // For MCP notifications +} + +impl SubAgent { + /// Create a new subagent with the given configuration and provider + #[instrument(skip(config, _provider, extension_manager, mcp_notification_tx))] + pub async fn new( + config: SubAgentConfig, + _provider: Arc, + extension_manager: Arc>, + mcp_notification_tx: mpsc::Sender, + ) -> Result<(Arc, tokio::task::JoinHandle<()>), anyhow::Error> { + debug!("Creating new subagent with id: {}", config.id); + + let mut missing_extensions = Vec::new(); + let mut recipe_extensions = Vec::new(); + + // Check if extensions from recipe exist in the extension manager + if let Some(recipe) = &config.recipe { + if let Some(extensions) = &recipe.extensions { + for extension in extensions { + let extension_name = extension.name(); + let existing_extensions = extension_manager.list_extensions().await?; + + if !existing_extensions.contains(&extension_name) { + missing_extensions.push(extension_name); + } else { + recipe_extensions.push(extension_name); + } + } + } + } else { + // If no recipe, inherit all extensions from the parent agent + let existing_extensions = extension_manager.list_extensions().await?; + recipe_extensions = existing_extensions; + } + + let subagent = Arc::new(SubAgent { + id: config.id.clone(), + conversation: Arc::new(Mutex::new(Vec::new())), + status: Arc::new(RwLock::new(SubAgentStatus::Ready)), + config, + turn_count: Arc::new(Mutex::new(0)), + created_at: Utc::now(), + recipe_extensions: Arc::new(Mutex::new(recipe_extensions)), + missing_extensions: Arc::new(Mutex::new(missing_extensions)), + mcp_notification_tx, + }); + + // Send initial MCP notification + let subagent_clone = Arc::clone(&subagent); + subagent_clone + .send_mcp_notification("subagent_created", "Subagent created and ready") + .await; + + // Create a background task handle (for future use with streaming/monitoring) + let subagent_clone = Arc::clone(&subagent); + let handle = tokio::spawn(async move { + // This could be used for background monitoring, cleanup, etc. + debug!("Subagent {} background task started", subagent_clone.id); + }); + + debug!("Subagent {} created successfully", subagent.id); + Ok((subagent, handle)) + } + + /// Get the current status of the subagent + pub async fn get_status(&self) -> SubAgentStatus { + self.status.read().await.clone() + } + + /// Update the status of the subagent + async fn set_status(&self, status: SubAgentStatus) { + // Update the status first, then release the lock + { + let mut current_status = self.status.write().await; + *current_status = status.clone(); + } // Write lock is released here! + + // Send MCP notifications based on status + match &status { + SubAgentStatus::Processing => { + self.send_mcp_notification("status_changed", "Processing request") + .await; + } + SubAgentStatus::Completed(msg) => { + self.send_mcp_notification("completed", &format!("Completed: {}", msg)) + .await; + } + SubAgentStatus::Terminated => { + self.send_mcp_notification("terminated", "Subagent terminated") + .await; + } + _ => {} + } + } + + /// Send an MCP notification about the subagent's activity + pub async fn send_mcp_notification(&self, notification_type: &str, message: &str) { + let notification = JsonRpcMessage::Notification(JsonRpcNotification { + jsonrpc: "2.0".to_string(), + method: "notifications/message".to_string(), + params: Some(json!({ + "level": "info", + "logger": format!("subagent_{}", self.id), + "data": { + "subagent_id": self.id, + "type": notification_type, + "message": message, + "timestamp": Utc::now().to_rfc3339() + } + })), + }); + + if let Err(e) = self.mcp_notification_tx.send(notification).await { + error!( + "Failed to send MCP notification from subagent {}: {}", + self.id, e + ); + } + } + + /// Get current progress information + pub async fn get_progress(&self) -> SubAgentProgress { + let status = self.get_status().await; + let turn_count = *self.turn_count.lock().await; + + SubAgentProgress { + subagent_id: self.id.clone(), + status: status.clone(), + message: match &status { + SubAgentStatus::Ready => "Ready to process messages".to_string(), + SubAgentStatus::Processing => "Processing request...".to_string(), + SubAgentStatus::Completed(msg) => msg.clone(), + SubAgentStatus::Terminated => "Subagent terminated".to_string(), + }, + turn: turn_count, + max_turns: self.config.max_turns, + timestamp: Utc::now(), + } + } + + /// Process a message and generate a response using the subagent's provider + #[instrument(skip(self, message, provider, extension_manager))] + pub async fn reply_subagent( + &self, + message: String, + provider: Arc, + extension_manager: Arc>, + ) -> Result { + debug!("Processing message for subagent {}", self.id); + self.send_mcp_notification("message_processing", &format!("Processing: {}", message)) + .await; + + // Check if we've exceeded max turns + { + let turn_count = *self.turn_count.lock().await; + if let Some(max_turns) = self.config.max_turns { + if turn_count >= max_turns { + self.set_status(SubAgentStatus::Completed( + "Maximum turns exceeded".to_string(), + )) + .await; + return Err(anyhow!("Maximum turns ({}) exceeded", max_turns)); + } + } + } + + // Set status to processing + self.set_status(SubAgentStatus::Processing).await; + + // Add user message to conversation + let user_message = Message::user().with_text(message.clone()); + { + let mut conversation = self.conversation.lock().await; + conversation.push(user_message.clone()); + } + + // Increment turn count + { + let mut turn_count = self.turn_count.lock().await; + *turn_count += 1; + self.send_mcp_notification( + "turn_progress", + &format!("Turn {}/{}", turn_count, self.config.max_turns.unwrap_or(0)), + ) + .await; + } + + // Get the current conversation for context + let mut messages = self.get_conversation().await; + + // Get tools based on whether we're using a recipe or inheriting from parent + let tools: Vec = if self.config.recipe.is_some() { + // Recipe mode: only get tools from the recipe's extensions + let recipe_extensions = self.recipe_extensions.lock().await; + let mut recipe_tools = Vec::new(); + + debug!( + "Subagent {} operating in recipe mode with {} extensions", + self.id, + recipe_extensions.len() + ); + + for extension_name in recipe_extensions.iter() { + match extension_manager + .get_prefixed_tools(Some(extension_name.clone())) + .await + { + Ok(mut ext_tools) => { + debug!( + "Added {} tools from extension {}", + ext_tools.len(), + extension_name + ); + recipe_tools.append(&mut ext_tools); + } + Err(e) => { + debug!( + "Failed to get tools for extension {}: {}", + extension_name, e + ); + } + } + } + + debug!( + "Subagent {} has {} total recipe tools before filtering", + self.id, + recipe_tools.len() + ); + // Filter out subagent tools from recipe tools + let mut filtered_tools = Self::filter_subagent_tools(recipe_tools); + + // Add platform tools (except subagent tools) + Self::add_platform_tools(&mut filtered_tools, &extension_manager).await; + + debug!( + "Subagent {} has {} tools after filtering and adding platform tools", + self.id, + filtered_tools.len() + ); + filtered_tools + } else { + // No recipe: inherit all tools from parent (but filter out subagent tools) + debug!( + "Subagent {} operating in inheritance mode, using all parent tools", + self.id + ); + let parent_tools = extension_manager.get_prefixed_tools(None).await?; + debug!( + "Subagent {} has {} parent tools before filtering", + self.id, + parent_tools.len() + ); + let mut filtered_tools = Self::filter_subagent_tools(parent_tools); + + // Add platform tools (except subagent tools) + Self::add_platform_tools(&mut filtered_tools, &extension_manager).await; + + debug!( + "Subagent {} has {} tools after filtering and adding platform tools", + self.id, + filtered_tools.len() + ); + filtered_tools + }; + + let toolshim_tools: Vec = vec![]; + + // Build system prompt using the template + let system_prompt = self.build_system_prompt(&tools).await?; + + // Generate response from provider + loop { + match Agent::generate_response_from_provider( + Arc::clone(&provider), + &system_prompt, + &messages, + &tools, + &toolshim_tools, + ) + .await + { + Ok((response, _usage)) => { + // Process any tool calls in the response + let tool_requests: Vec = response + .content + .iter() + .filter_map(|content| { + if let MessageContent::ToolRequest(req) = content { + Some(req.clone()) + } else { + None + } + }) + .collect(); + + // If there are no tool requests, we're done + if tool_requests.is_empty() { + self.add_message(response.clone()).await; + + // Send notification about response + self.send_mcp_notification( + "response_generated", + &format!("Responded: {}", response.as_concat_text()), + ) + .await; + + // Add delay before completion to ensure all processing finishes + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Set status back to ready and return the final response + self.set_status(SubAgentStatus::Completed("Completed!".to_string())) + .await; + break Ok(response); + } + + // Add the assistant message with tool calls to the conversation + messages.push(response.clone()); + + // Process each tool request and create user response messages + for request in &tool_requests { + if let Ok(tool_call) = &request.tool_call { + // Send notification about tool usage + self.send_mcp_notification( + "tool_usage", + &format!("Using tool: {}", tool_call.name), + ) + .await; + + // Handle platform tools or dispatch to extension manager + let tool_result = if self.is_platform_tool(&tool_call.name) { + self.handle_platform_tool_call( + tool_call.clone(), + &extension_manager, + ) + .await + } else { + match extension_manager + .dispatch_tool_call(tool_call.clone()) + .await + { + Ok(result) => result.result.await, + Err(e) => Err(ToolError::ExecutionError(e.to_string())), + } + }; + + match tool_result { + Ok(result) => { + // Create a user message with the tool response + let tool_response_message = Message::user() + .with_tool_response(request.id.clone(), Ok(result.clone())); + messages.push(tool_response_message); + + // Send notification about tool completion + self.send_mcp_notification( + "tool_completed", + &format!("Tool {} completed successfully", tool_call.name), + ) + .await; + } + Err(e) => { + // Create a user message with the tool error + let tool_error_message = Message::user().with_tool_response( + request.id.clone(), + Err(ToolError::ExecutionError(e.to_string())), + ); + messages.push(tool_error_message); + + // Send notification about tool error + self.send_mcp_notification( + "tool_error", + &format!("Tool {} error: {}", tool_call.name, e), + ) + .await; + } + } + } + } + + // Continue the loop to get the next response from the provider + } + Err(ProviderError::ContextLengthExceeded(_)) => { + self.set_status(SubAgentStatus::Completed( + "Context length exceeded".to_string(), + )) + .await; + break Ok(Message::assistant().with_context_length_exceeded( + "The context length of the model has been exceeded. Please start a new session and try again.", + )); + } + Err(ProviderError::RateLimitExceeded(_)) => { + self.set_status(SubAgentStatus::Completed("Rate limit exceeded".to_string())) + .await; + break Ok(Message::assistant() + .with_text("Rate limit exceeded. Please try again later.")); + } + Err(e) => { + self.set_status(SubAgentStatus::Completed(format!("Error: {}", e))) + .await; + error!("Error: {}", e); + break Ok(Message::assistant().with_text(format!("Ran into this error: {e}.\n\nPlease retry if you think this is a transient or recoverable error."))); + } + } + } + } + + /// Add a message to the conversation (for tracking agent responses) + pub async fn add_message(&self, message: Message) { + let mut conversation = self.conversation.lock().await; + conversation.push(message); + } + + /// Get the full conversation history + pub async fn get_conversation(&self) -> Vec { + self.conversation.lock().await.clone() + } + + /// Check if the subagent has completed its task + pub async fn is_completed(&self) -> bool { + matches!( + self.get_status().await, + SubAgentStatus::Completed(_) | SubAgentStatus::Terminated + ) + } + + /// Terminate the subagent + pub async fn terminate(&self) -> Result<(), anyhow::Error> { + debug!("Terminating subagent {}", self.id); + self.set_status(SubAgentStatus::Terminated).await; + Ok(()) + } + + /// Get formatted conversation for display + pub async fn get_formatted_conversation(&self) -> String { + let conversation = self.conversation.lock().await; + + let mut formatted = format!("=== Subagent {} Conversation ===\n", self.id); + + if let Some(recipe) = &self.config.recipe { + formatted.push_str(&format!("Recipe: {}\n", recipe.title)); + } else if let Some(instructions) = &self.config.instructions { + formatted.push_str(&format!("Instructions: {}\n", instructions)); + } else { + formatted.push_str("Mode: Ad-hoc subagent\n"); + } + + formatted.push_str(&format!( + "Created: {}\n", + self.created_at.format("%Y-%m-%d %H:%M:%S UTC") + )); + + let progress = self.get_progress().await; + + formatted.push_str(&format!("Status: {:?}\n", progress.status)); + formatted.push_str(&format!("Turn: {}", progress.turn)); + if let Some(max_turns) = progress.max_turns { + formatted.push_str(&format!("/{}", max_turns)); + } + formatted.push_str("\n\n"); + + for (i, message) in conversation.iter().enumerate() { + formatted.push_str(&format!( + "{}. {}: {}\n", + i + 1, + match message.role { + Role::User => "User", + Role::Assistant => "Assistant", + }, + message.as_concat_text() + )); + } + + formatted.push_str("=== End Conversation ===\n"); + + formatted + } + + /// Get the list of extensions that weren't enabled + pub async fn get_missing_extensions(&self) -> Vec { + self.missing_extensions.lock().await.clone() + } + + /// Filter out subagent spawning tools to prevent infinite recursion + fn filter_subagent_tools(tools: Vec) -> Vec { + let original_count = tools.len(); + let filtered_tools: Vec = tools + .into_iter() + .filter(|tool| { + let should_keep = tool.name != SUBAGENT_RUN_TASK_TOOL_NAME; + if !should_keep { + debug!("Filtering out subagent tool: {}", tool.name); + } + should_keep + }) + .collect(); + + let filtered_count = filtered_tools.len(); + if filtered_count < original_count { + debug!( + "Filtered {} subagent tool(s) from {} total tools", + original_count - filtered_count, + original_count + ); + } + + filtered_tools + } + + /// Add platform tools to the subagent's tool list (excluding dangerous tools) + async fn add_platform_tools(tools: &mut Vec, extension_manager: &ExtensionManager) { + debug!("Adding safe platform tools to subagent"); + + // Add safe platform tools - subagents can search for extensions but can't manage them or schedules + tools.push(platform_tools::search_available_extensions_tool()); + debug!("Added search_available_extensions tool"); + + // Add resource tools if supported - these are generally safe for subagents + if extension_manager.supports_resources() { + tools.extend([ + platform_tools::read_resource_tool(), + platform_tools::list_resources_tool(), + ]); + debug!("Added 2 resource platform tools"); + } + + // Note: We explicitly do NOT add these tools for security reasons: + // - manage_extensions (could interfere with parent agent's extensions) + // - manage_schedule (could interfere with parent agent's scheduling) + // - subagent spawning tools (prevent recursion) + debug!("Platform tools added successfully (dangerous tools excluded)"); + } + + /// Check if a tool name is a platform tool that subagents can use + fn is_platform_tool(&self, tool_name: &str) -> bool { + matches!( + tool_name, + PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME + | PLATFORM_READ_RESOURCE_TOOL_NAME + | PLATFORM_LIST_RESOURCES_TOOL_NAME + ) + } + + /// Handle platform tool calls that are safe for subagents + async fn handle_platform_tool_call( + &self, + tool_call: mcp_core::tool::ToolCall, + extension_manager: &ExtensionManager, + ) -> Result, ToolError> { + debug!("Handling platform tool: {}", tool_call.name); + + match tool_call.name.as_str() { + PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME => extension_manager + .search_available_extensions() + .await + .map_err(|e| ToolError::ExecutionError(e.to_string())), + PLATFORM_READ_RESOURCE_TOOL_NAME => extension_manager + .read_resource(tool_call.arguments) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string())), + PLATFORM_LIST_RESOURCES_TOOL_NAME => extension_manager + .list_resources(tool_call.arguments) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string())), + _ => Err(ToolError::ExecutionError(format!( + "Platform tool '{}' is not available to subagents for security reasons", + tool_call.name + ))), + } + } + + /// Build the system prompt for the subagent using the template + async fn build_system_prompt(&self, available_tools: &[Tool]) -> Result { + let mut context = HashMap::new(); + + // Add basic context + context.insert( + "current_date_time", + serde_json::Value::String(Utc::now().format("%Y-%m-%d %H:%M:%S UTC").to_string()), + ); + context.insert("subagent_id", serde_json::Value::String(self.id.clone())); + + // Add recipe information if available + if let Some(recipe) = &self.config.recipe { + context.insert( + "recipe_title", + serde_json::Value::String(recipe.title.clone()), + ); + } + + // Add max turns if configured + if let Some(max_turns) = self.config.max_turns { + context.insert( + "max_turns", + serde_json::Value::Number(serde_json::Number::from(max_turns)), + ); + } + + // Add task instructions + let instructions = if let Some(recipe) = &self.config.recipe { + recipe.instructions.as_deref().unwrap_or("") + } else { + self.config.instructions.as_deref().unwrap_or("") + }; + context.insert( + "task_instructions", + serde_json::Value::String(instructions.to_string()), + ); + + // Add available extensions (only if we have a recipe and extensions) + if self.config.recipe.is_some() { + let extensions: Vec = self.recipe_extensions.lock().await.clone(); + if !extensions.is_empty() { + context.insert( + "extensions", + serde_json::Value::Array( + extensions + .into_iter() + .map(serde_json::Value::String) + .collect(), + ), + ); + } + } + + // Add available tools with descriptions for better context + let tools_with_descriptions: Vec = available_tools + .iter() + .map(|t| { + if t.description.is_empty() { + t.name.clone() + } else { + format!("{}: {}", t.name, t.description) + } + }) + .collect(); + + context.insert( + "available_tools", + serde_json::Value::String(if tools_with_descriptions.is_empty() { + "None".to_string() + } else { + tools_with_descriptions.join(", ") + }), + ); + + // Add tool count for context + context.insert( + "tool_count", + serde_json::Value::Number(serde_json::Number::from(available_tools.len())), + ); + + // Render the subagent system prompt template + let system_prompt = render_global_file("subagent_system.md", &context) + .map_err(|e| anyhow!("Failed to render subagent system prompt: {}", e))?; + + Ok(system_prompt) + } +} diff --git a/crates/goose/src/agents/subagent_handler.rs b/crates/goose/src/agents/subagent_handler.rs new file mode 100644 index 00000000..f281f748 --- /dev/null +++ b/crates/goose/src/agents/subagent_handler.rs @@ -0,0 +1,79 @@ +use anyhow::Result; +use mcp_core::{Content, ToolError}; +use serde_json::Value; +use std::sync::Arc; + +use crate::agents::subagent_types::SpawnSubAgentArgs; +use crate::agents::Agent; + +impl Agent { + /// Handle running a complete subagent task (replaces the individual spawn/send/check tools) + pub async fn handle_run_subagent_task( + &self, + arguments: Value, + ) -> Result, ToolError> { + let subagent_manager = self.subagent_manager.lock().await; + let manager = subagent_manager.as_ref().ok_or_else(|| { + ToolError::ExecutionError("Subagent manager not initialized".to_string()) + })?; + + // Parse arguments - using "task" as the main message parameter + let message = arguments + .get("task") + .and_then(|v| v.as_str()) + .ok_or_else(|| ToolError::ExecutionError("Missing task parameter".to_string()))? + .to_string(); + + // Either recipe_name or instructions must be provided + let recipe_name = arguments + .get("recipe_name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let instructions = arguments + .get("instructions") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let mut args = if let Some(recipe_name) = recipe_name { + SpawnSubAgentArgs::new_with_recipe(recipe_name, message.clone()) + } else if let Some(instructions) = instructions { + SpawnSubAgentArgs::new_with_instructions(instructions, message.clone()) + } else { + return Err(ToolError::ExecutionError( + "Either recipe_name or instructions parameter must be provided".to_string(), + )); + }; + + // Set max_turns with default of 10 + let max_turns = arguments + .get("max_turns") + .and_then(|v| v.as_u64()) + .unwrap_or(10) as usize; + args = args.with_max_turns(max_turns); + + if let Some(timeout) = arguments.get("timeout_seconds").and_then(|v| v.as_u64()) { + args = args.with_timeout(timeout); + } + + // Get the provider from the parent agent + let provider = self + .provider() + .await + .map_err(|e| ToolError::ExecutionError(format!("Failed to get provider: {}", e)))?; + + // Get the extension manager from the parent agent + let extension_manager = Arc::new(self.extension_manager.read().await); + + // Run the complete subagent task + match manager + .run_complete_subagent_task(args, provider, extension_manager) + .await + { + Ok(result) => Ok(vec![Content::text(result)]), + Err(e) => Err(ToolError::ExecutionError(format!( + "Failed to run subagent task: {}", + e + ))), + } + } +} diff --git a/crates/goose/src/agents/subagent_manager.rs b/crates/goose/src/agents/subagent_manager.rs new file mode 100644 index 00000000..174facee --- /dev/null +++ b/crates/goose/src/agents/subagent_manager.rs @@ -0,0 +1,404 @@ +use std::collections::HashMap; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{anyhow, Result}; +use mcp_core::protocol::JsonRpcMessage; +use tokio::sync::{mpsc, Mutex, RwLock}; +use tracing::{debug, error, instrument, warn}; + +use crate::agents::extension_manager::ExtensionManager; +use crate::agents::subagent::{SubAgent, SubAgentConfig, SubAgentProgress, SubAgentStatus}; +use crate::agents::subagent_types::SpawnSubAgentArgs; +use crate::providers::base::Provider; +use crate::recipe::Recipe; + +/// Manages the lifecycle of subagents +pub struct SubAgentManager { + subagents: Arc>>>, + handles: Arc>>>, + mcp_notification_tx: mpsc::Sender, +} + +impl SubAgentManager { + /// Create a new subagent manager + pub fn new(mcp_notification_tx: mpsc::Sender) -> Self { + Self { + subagents: Arc::new(RwLock::new(HashMap::new())), + handles: Arc::new(Mutex::new(HashMap::new())), + mcp_notification_tx, + } + } + + /// Spawn a new interactive subagent + #[instrument(skip(self, args, provider, extension_manager))] + pub async fn spawn_interactive_subagent( + &self, + args: SpawnSubAgentArgs, + provider: Arc, + extension_manager: Arc>, + ) -> Result { + debug!("Spawning interactive subagent"); + + // Create subagent config based on whether we have a recipe or instructions + let mut config = if let Some(recipe_name) = args.recipe_name { + debug!("Using recipe: {}", recipe_name); + // Load the recipe + let recipe = self.load_recipe(&recipe_name).await?; + SubAgentConfig::new_with_recipe(recipe) + } else if let Some(instructions) = args.instructions { + debug!("Using direct instructions"); + SubAgentConfig::new_with_instructions(instructions) + } else { + return Err(anyhow!( + "Either recipe_name or instructions must be provided" + )); + }; + + if let Some(max_turns) = args.max_turns { + config = config.with_max_turns(max_turns); + } + if let Some(timeout) = args.timeout_seconds { + config = config.with_timeout(timeout); + } + + // Create the subagent with the parent agent's provider + let (subagent, handle) = SubAgent::new( + config, + Arc::clone(&provider), + Arc::clone(&extension_manager), + self.mcp_notification_tx.clone(), + ) + .await?; + let subagent_id = subagent.id.clone(); + + // Store the subagent and its handle + { + let mut subagents = self.subagents.write().await; + subagents.insert(subagent_id.clone(), Arc::clone(&subagent)); + } + { + let mut handles = self.handles.lock().await; + handles.insert(subagent_id.clone(), handle); + } + + // Return immediately - no initial message processing + Ok(subagent_id) + } + + /// Get a subagent by ID + pub async fn get_subagent(&self, id: &str) -> Option> { + let subagents = self.subagents.read().await; + subagents.get(id).cloned() + } + + /// List all active subagent IDs + pub async fn list_subagents(&self) -> Vec { + let subagents = self.subagents.read().await; + subagents.keys().cloned().collect() + } + + /// Get status of all subagents + pub async fn get_subagent_status(&self) -> HashMap { + let subagents = self.subagents.read().await; + let mut status_map = HashMap::new(); + + for (id, subagent) in subagents.iter() { + status_map.insert(id.clone(), subagent.get_status().await); + } + + status_map + } + + /// Get progress of all subagents + pub async fn get_subagent_progress(&self) -> HashMap { + let subagents = self.subagents.read().await; + let mut progress_map = HashMap::new(); + + for (id, subagent) in subagents.iter() { + progress_map.insert(id.clone(), subagent.get_progress().await); + } + + progress_map + } + + /// Send a message to a specific subagent + #[instrument(skip(self, message, provider, extension_manager))] + pub async fn send_message_to_subagent( + &self, + subagent_id: &str, + message: String, + provider: Arc, + extension_manager: Arc>, + ) -> Result { + let subagent = self + .get_subagent(subagent_id) + .await + .ok_or_else(|| anyhow!("Subagent {} not found", subagent_id))?; + + // Process the message and get a reply + match subagent + .reply_subagent(message, provider, extension_manager) + .await + { + Ok(response) => Ok(format!( + "Message sent to subagent {}. Response:\n{}", + subagent_id, + response.as_concat_text() + )), + Err(e) => Err(anyhow!("Failed to process message in subagent: {}", e)), + } + } + + /// Terminate a specific subagent + #[instrument(skip(self))] + pub async fn terminate_subagent(&self, id: &str) -> Result<()> { + debug!("Terminating subagent {}", id); + + // Get and terminate the subagent + let subagent = { + let mut subagents = self.subagents.write().await; + subagents.remove(id) + }; + + if let Some(subagent) = subagent { + subagent.terminate().await?; + } else { + warn!("Attempted to terminate non-existent subagent {}", id); + return Err(anyhow!("Subagent {} not found", id)); + } + + // Clean up the background handle + let handle = { + let mut handles = self.handles.lock().await; + handles.remove(id) + }; + + if let Some(handle) = handle { + handle.abort(); + } + + debug!("Subagent {} terminated successfully", id); + Ok(()) + } + + /// Terminate all subagents + #[instrument(skip(self))] + pub async fn terminate_all_subagents(&self) -> Result<()> { + debug!("Terminating all subagents"); + + let subagent_ids: Vec = { + let subagents = self.subagents.read().await; + subagents.keys().cloned().collect() + }; + + for id in subagent_ids { + if let Err(e) = self.terminate_subagent(&id).await { + error!("Failed to terminate subagent {}: {}", id, e); + } + } + + debug!("All subagents terminated"); + Ok(()) + } + + /// Get formatted conversation from a subagent + pub async fn get_subagent_conversation(&self, id: &str) -> Result { + let subagent = self + .get_subagent(id) + .await + .ok_or_else(|| anyhow!("Subagent {} not found", id))?; + + Ok(subagent.get_formatted_conversation().await) + } + + /// Clean up completed or failed subagents + pub async fn cleanup_completed_subagents(&self) -> Result { + let mut completed_ids = Vec::new(); + + // Find completed subagents + { + let subagents = self.subagents.read().await; + for (id, subagent) in subagents.iter() { + if subagent.is_completed().await { + completed_ids.push(id.clone()); + } + } + } + + // Remove completed subagents + let count = completed_ids.len(); + for id in completed_ids { + if let Err(e) = self.terminate_subagent(&id).await { + error!("Failed to cleanup completed subagent {}: {}", id, e); + } + } + + debug!("Cleaned up {} completed subagents", count); + Ok(count) + } + + /// Load a recipe from file + async fn load_recipe(&self, recipe_name: &str) -> Result { + // Try to load from current directory first + let recipe_path = if recipe_name.ends_with(".yaml") || recipe_name.ends_with(".yml") { + recipe_name.to_string() + } else { + format!("{}.yaml", recipe_name) + }; + + if Path::new(&recipe_path).exists() { + let content = tokio::fs::read_to_string(&recipe_path).await?; + let recipe: Recipe = serde_yaml::from_str(&content)?; + return Ok(recipe); + } + + // Try some common recipe locations + let common_paths = [ + format!("recipes/{}", recipe_path), + format!("./recipes/{}", recipe_path), + format!("../recipes/{}", recipe_path), + ]; + + for path in &common_paths { + if Path::new(path).exists() { + let content = tokio::fs::read_to_string(path).await?; + let recipe: Recipe = serde_yaml::from_str(&content)?; + return Ok(recipe); + } + } + + Err(anyhow!( + "Recipe file '{}' not found in current directory or common recipe locations", + recipe_name + )) + } + + /// Get count of active subagents + pub async fn get_active_count(&self) -> usize { + let subagents = self.subagents.read().await; + subagents.len() + } + + /// Check if a subagent exists + pub async fn has_subagent(&self, id: &str) -> bool { + let subagents = self.subagents.read().await; + subagents.contains_key(id) + } + + /// Run a complete subagent task (spawn, execute, cleanup) + #[instrument(skip(self, args, provider, extension_manager))] + pub async fn run_complete_subagent_task( + &self, + args: SpawnSubAgentArgs, + provider: Arc, + extension_manager: Arc>, + ) -> Result { + debug!("Running complete subagent task"); + + // Create subagent config based on whether we have a recipe or instructions + let mut config = if let Some(recipe_name) = args.recipe_name { + debug!("Using recipe: {}", recipe_name); + // Load the recipe + let recipe = self.load_recipe(&recipe_name).await?; + SubAgentConfig::new_with_recipe(recipe) + } else if let Some(instructions) = args.instructions { + debug!("Using direct instructions"); + SubAgentConfig::new_with_instructions(instructions) + } else { + return Err(anyhow!( + "Either recipe_name or instructions must be provided" + )); + }; + + // Set default max_turns if not provided + let max_turns = args.max_turns.unwrap_or(10); + config = config.with_max_turns(max_turns); + + if let Some(timeout) = args.timeout_seconds { + config = config.with_timeout(timeout); + } + + // Create the subagent with the parent agent's provider + let (subagent, handle) = SubAgent::new( + config, + Arc::clone(&provider), + Arc::clone(&extension_manager), + self.mcp_notification_tx.clone(), + ) + .await?; + let subagent_id = subagent.id.clone(); + + // Store the subagent and its handle temporarily + { + let mut subagents = self.subagents.write().await; + subagents.insert(subagent_id.clone(), Arc::clone(&subagent)); + } + { + let mut handles = self.handles.lock().await; + handles.insert(subagent_id.clone(), handle); + } + + // Run the complete conversation + let mut conversation_result = String::new(); + let turn_count = 0; + let current_message = args.message.clone(); + + // For now, we just complete after one turn since we don't have a mechanism + // for the subagent to continue autonomously without user input + // In a future iteration, we could add logic for the subagent to continue + // working on multi-step tasks with proper turn management + match subagent + .reply_subagent( + current_message, + Arc::clone(&provider), + Arc::clone(&extension_manager), + ) + .await + { + Ok(response) => { + let response_text = response.as_concat_text(); + conversation_result.push_str(&format!( + "\n--- Turn {} ---\n{}", + turn_count + 1, + response_text + )); + conversation_result.push_str(&format!( + "\n[Task completed after {} turns]", + turn_count + 1 + )); + } + Err(e) => { + conversation_result + .push_str(&format!("\n[Error after {} turns: {}]", turn_count, e)); + } + } + + // Clean up the subagent + if let Err(e) = self.terminate_subagent(&subagent_id).await { + debug!("Failed to cleanup subagent {}: {}", subagent_id, e); + } + + // Return the complete conversation result + Ok(format!("Subagent task completed:\n{}", conversation_result)) + } +} + +impl Default for SubAgentManager { + fn default() -> Self { + // Create a dummy channel for default implementation + // In practice, this should not be used - SubAgentManager should be created + // with a proper MCP notification sender + let (tx, _rx) = mpsc::channel(1); + Self::new(tx) + } +} + +impl Drop for SubAgentManager { + fn drop(&mut self) { + // Note: In a real implementation, you might want to spawn a task to clean up + // subagents gracefully, but for now we'll rely on the Drop implementations + // of the individual components + debug!("SubAgentManager dropped"); + } +} diff --git a/crates/goose/src/agents/subagent_tools.rs b/crates/goose/src/agents/subagent_tools.rs new file mode 100644 index 00000000..f8f35f1f --- /dev/null +++ b/crates/goose/src/agents/subagent_tools.rs @@ -0,0 +1,68 @@ +use indoc::indoc; +use mcp_core::tool::{Tool, ToolAnnotations}; +use serde_json::json; + +pub const SUBAGENT_RUN_TASK_TOOL_NAME: &str = "subagent__run_task"; + +pub fn run_task_subagent_tool() -> Tool { + Tool::new( + SUBAGENT_RUN_TASK_TOOL_NAME.to_string(), + indoc! {r#" + Spawn a specialized subagent to handle a specific task completely and automatically. + + This tool creates a subagent, processes your task through a complete conversation, + and returns the final result. The subagent is automatically cleaned up after completion. + + You can configure the subagent in two ways: + 1. Using a recipe file that defines instructions, extensions, and behavior + 2. Providing direct instructions for ad-hoc tasks + + The subagent will work autonomously until the task is complete, it reaches max_turns, + or it encounters an error. You'll get the final result without needing to manage + the subagent lifecycle manually. + + Examples: + - "Convert these unittest files to pytest format: file1.py, file2.py" + - "Research the latest developments in AI and provide a comprehensive summary" + - "Review this code for security vulnerabilities and suggest fixes" + - "Refactor this legacy code to use modern Python patterns" + "#} + .to_string(), + json!({ + "type": "object", + "required": ["task"], + "properties": { + "recipe_name": { + "type": "string", + "description": "Name of the recipe file to configure the subagent (e.g., 'research_assistant_recipe.yaml'). Either this or 'instructions' must be provided." + }, + "instructions": { + "type": "string", + "description": "Direct instructions for the subagent's task. Either this or 'recipe_name' must be provided. Example: 'You are a code refactoring assistant. Help convert unittest tests to pytest format.'" + }, + "task": { + "type": "string", + "description": "The task description or initial message for the subagent to work on" + }, + "max_turns": { + "type": "integer", + "description": "Maximum number of conversation turns before auto-completion (default: 10)", + "minimum": 1, + "default": 10 + }, + "timeout_seconds": { + "type": "integer", + "description": "Optional timeout for the entire task in seconds", + "minimum": 1 + } + } + }), + Some(ToolAnnotations { + title: Some("Run subagent task".to_string()), + read_only_hint: false, + destructive_hint: false, + idempotent_hint: false, + open_world_hint: false, + }), + ) +} diff --git a/crates/goose/src/agents/subagent_types.rs b/crates/goose/src/agents/subagent_types.rs new file mode 100644 index 00000000..1fbc8556 --- /dev/null +++ b/crates/goose/src/agents/subagent_types.rs @@ -0,0 +1,42 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpawnSubAgentArgs { + pub recipe_name: Option, + pub instructions: Option, + pub message: String, + pub max_turns: Option, + pub timeout_seconds: Option, +} + +impl SpawnSubAgentArgs { + pub fn new_with_recipe(recipe_name: String, message: String) -> Self { + Self { + recipe_name: Some(recipe_name), + instructions: None, + message, + max_turns: None, + timeout_seconds: None, + } + } + + pub fn new_with_instructions(instructions: String, message: String) -> Self { + Self { + recipe_name: None, + instructions: Some(instructions), + message, + max_turns: None, + timeout_seconds: None, + } + } + + pub fn with_max_turns(mut self, max_turns: usize) -> Self { + self.max_turns = Some(max_turns); + self + } + + pub fn with_timeout(mut self, timeout_seconds: u64) -> Self { + self.timeout_seconds = Some(timeout_seconds); + self + } +} diff --git a/crates/goose/src/prompts/subagent_system.md b/crates/goose/src/prompts/subagent_system.md new file mode 100644 index 00000000..1cb47944 --- /dev/null +++ b/crates/goose/src/prompts/subagent_system.md @@ -0,0 +1,76 @@ +You are a specialized subagent within the Goose AI framework, created by Block, the parent company of Square, CashApp, and Tidal. Goose is being developed as an open-source software project. You were spawned by the main Goose agent to handle a specific task or set of operations. + +The current date is {{current_date_time}}. + +You use LLM providers with tool calling capability. You can be used with different language models (gpt-4o, claude-3.5-sonnet, o1, llama-3.2, deepseek-r1, etc). These models have varying knowledge cut-off dates depending on when they were trained, but typically it's between 5-10 months prior to the current date. + +# Your Role as a Subagent + +You are an autonomous subagent with the following characteristics: +- **Independence**: You can make decisions and execute tools within your scope +- **Specialization**: You focus on specific tasks assigned by the main Goose agent +- **Collaboration**: You report progress and results back to the main Goose agent +- **Bounded Operation**: You operate within defined limits (turn count, timeout, specific instructions) +- **Security**: You cannot spawn additional subagents to prevent infinite recursion and maintain system stability + +{% if subagent_id is defined %} +**Subagent ID**: {{subagent_id}} +{% endif %} +{% if recipe_title is defined %} +**Recipe**: {{recipe_title}} +{% endif %} +{% if max_turns is defined %} +**Maximum Turns**: {{max_turns}} +{% endif %} + +# Task Instructions + +{{task_instructions}} + +# Extensions and Tools + +Extensions allow other applications to provide context to you. Extensions connect you to different data sources and tools. You are capable of using tools from these extensions to solve higher level problems and can interact with multiple at once. + +{% if recipe_title is defined %} +**Recipe Mode**: You are operating with a specific recipe that defines which extensions and tools you can use. This focused approach helps you stay on task and work efficiently within your defined scope. + +{% if (extensions is defined) and extensions %} +You have access to the following recipe-specific extensions ({{extensions|length}} extension{% if extensions|length > 1 %}s{% endif %}). Each of these extensions provides tools that are in your tool specification: + +{% for extension in extensions %} +- {{extension}} +{% endfor %} + +You have {{tool_count}} tool{% if tool_count > 1 %}s{% endif %} available: {{available_tools}} +{% else %} +Your recipe doesn't specify any extensions, so you have access to the basic tool set. + +You have {{tool_count}} tool{% if tool_count > 1 %}s{% endif %} available: {{available_tools}} +{% endif %} +{% else %} +**Inheritance Mode**: You inherit all available extensions and tools from the parent Goose agent. You can use all the tools that were available to the parent agent when you were created. + +You have {{tool_count}} tool{% if tool_count > 1 %}s{% endif %} available: {{available_tools}} +{% endif %} + +# Communication Guidelines + +- **Progress Updates**: Regularly communicate your progress on the assigned task +- **Completion Reporting**: Clearly indicate when your task is complete and provide results +- **Error Handling**: Report any issues or limitations you encounter +- **Scope Awareness**: Stay focused on your assigned task and don't exceed your defined boundaries + +# Response Guidelines + +- Use Markdown formatting for all responses. +- Follow best practices for Markdown, including: + - Using headers for organization. + - Bullet points for lists. + - Links formatted correctly, either as linked text (e.g., [this is linked text](https://example.com)) or automatic links using angle brackets (e.g., ). +- For code examples, use fenced code blocks by placing triple backticks (` ``` `) before and after the code. Include the language identifier after the opening backticks (e.g., ` ```python `) to enable syntax highlighting. +- Ensure clarity, conciseness, and proper formatting to enhance readability and usability. +- Be task-focused in your communications and provide clear status updates about your progress. +- When completing tasks, summarize what was accomplished. +- If you encounter limitations or need clarification, communicate this clearly. + +Remember: You are part of a larger Goose system working collaboratively to solve complex problems. Your specialized focus helps the main agent handle multiple concerns efficiently. \ No newline at end of file diff --git a/crates/goose/src/recipe/mod.rs b/crates/goose/src/recipe/mod.rs index 9b93f7ec..a8604ff7 100644 --- a/crates/goose/src/recipe/mod.rs +++ b/crates/goose/src/recipe/mod.rs @@ -58,7 +58,7 @@ fn default_version() -> String { /// parameters: None, /// }; /// -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Recipe { // Required fields #[serde(default = "default_version")] @@ -98,7 +98,7 @@ pub struct Recipe { pub sub_recipes: Option>, // sub-recipes for the recipe } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Author { #[serde(skip_serializing_if = "Option::is_none")] pub contact: Option, // creator/contact information of the recipe @@ -107,7 +107,7 @@ pub struct Author { pub metadata: Option, // any additional metadata for the author } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Settings { #[serde(skip_serializing_if = "Option::is_none")] pub goose_provider: Option, @@ -152,7 +152,7 @@ where } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "snake_case")] pub enum RecipeParameterRequirement { Required, @@ -170,7 +170,7 @@ impl fmt::Display for RecipeParameterRequirement { } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "snake_case")] pub enum RecipeParameterInputType { String, @@ -190,7 +190,7 @@ impl fmt::Display for RecipeParameterInputType { } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct RecipeParameter { pub key: String, pub input_type: RecipeParameterInputType, diff --git a/crates/goose/src/scheduler.rs b/crates/goose/src/scheduler.rs index 0b28eeda..0d3952cc 100644 --- a/crates/goose/src/scheduler.rs +++ b/crates/goose/src/scheduler.rs @@ -1195,6 +1195,7 @@ async fn run_scheduled_job_internal( Ok(AgentEvent::ModelChange { .. }) => { // Model change events are informational, just continue } + Err(e) => { tracing::error!( "[Job {}] Error receiving message from agent: {}", diff --git a/crates/goose/tests/agent.rs b/crates/goose/tests/agent.rs index 72917d00..18260129 100644 --- a/crates/goose/tests/agent.rs +++ b/crates/goose/tests/agent.rs @@ -142,6 +142,7 @@ async fn run_truncate_test( Ok(AgentEvent::ModelChange { .. }) => { // Model change events are informational, just continue } + Err(e) => { println!("Error: {:?}", e); return Err(e);