diff --git a/crates/goose-cli/src/session/builder.rs b/crates/goose-cli/src/session/builder.rs index 9782dea3..588325c3 100644 --- a/crates/goose-cli/src/session/builder.rs +++ b/crates/goose-cli/src/session/builder.rs @@ -203,6 +203,7 @@ pub async fn build_session(session_config: SessionBuilderConfig) -> Session { // Create the agent let agent: Agent = Agent::new(); + if let Some(sub_recipes) = session_config.sub_recipes { agent.add_sub_recipes(sub_recipes).await; } diff --git a/crates/goose-cli/src/session/mod.rs b/crates/goose-cli/src/session/mod.rs index 5669c142..a0616535 100644 --- a/crates/goose-cli/src/session/mod.rs +++ b/crates/goose-cli/src/session/mod.rs @@ -7,7 +7,10 @@ mod prompt; mod task_execution_display; mod thinking; -use crate::session::task_execution_display::TASK_EXECUTION_NOTIFICATION_TYPE; +use crate::session::task_execution_display::{ + format_task_execution_notification, TASK_EXECUTION_NOTIFICATION_TYPE, +}; +use std::io::Write; pub use self::export::message_to_markdown; pub use builder::{build_session, SessionBuilderConfig, SessionSettings}; @@ -20,8 +23,6 @@ use goose::permission::PermissionConfirmation; use goose::providers::base::Provider; pub use goose::session::Identifier; use goose::utils::safe_truncate; -use std::io::Write; -use task_execution_display::format_task_execution_notification; use anyhow::{Context, Result}; use completion::GooseCompleter; @@ -1077,7 +1078,7 @@ impl Session { // Handle subagent notifications - show immediately if let Some(_id) = subagent_id { - // Show subagent notifications immediately (no buffering) with compact spacing + // TODO: proper display for subagent notifications if interactive { let _ = progress_bars.hide(); println!("{}", console::style(&formatted_message).green().dim()); diff --git a/crates/goose-cli/src/session/output.rs b/crates/goose-cli/src/session/output.rs index 06435bd3..b3305821 100644 --- a/crates/goose-cli/src/session/output.rs +++ b/crates/goose-cli/src/session/output.rs @@ -463,8 +463,26 @@ fn print_params(value: &Value, depth: usize, debug: bool) { } } Value::String(s) => { - if !debug && s.len() > get_tool_params_max_length() { - println!("{}{}: {}", indent, style(key).dim(), style("...").dim()); + // Special handling for text_instruction to show more content + let max_length = if key == "text_instruction" { + 200 // Allow longer display for text instructions + } else { + get_tool_params_max_length() + }; + + if !debug && s.len() > max_length { + // For text instructions, show a preview instead of just "..." + if key == "text_instruction" { + let preview = &s[..max_length.saturating_sub(3)]; + println!( + "{}{}: {}", + indent, + style(key).dim(), + style(format!("{}...", preview)).green() + ); + } else { + println!("{}{}: {}", indent, style(key).dim(), style("...").dim()); + } } else { println!("{}{}: {}", indent, style(key).dim(), style(s).green()); } diff --git a/crates/goose-cli/src/session/task_execution_display/mod.rs b/crates/goose-cli/src/session/task_execution_display/mod.rs index ec6c41ff..b0b208ed 100644 --- a/crates/goose-cli/src/session/task_execution_display/mod.rs +++ b/crates/goose-cli/src/session/task_execution_display/mod.rs @@ -1,5 +1,5 @@ -use goose::agents::sub_recipe_execution_tool::lib::TaskStatus; -use goose::agents::sub_recipe_execution_tool::notification_events::{ +use goose::agents::subagent_execution_tool::lib::TaskStatus; +use goose::agents::subagent_execution_tool::notification_events::{ TaskExecutionNotificationEvent, TaskInfo, }; use serde_json::Value; diff --git a/crates/goose-cli/src/session/task_execution_display/tests.rs b/crates/goose-cli/src/session/task_execution_display/tests.rs index fb532850..725d161d 100644 --- a/crates/goose-cli/src/session/task_execution_display/tests.rs +++ b/crates/goose-cli/src/session/task_execution_display/tests.rs @@ -1,5 +1,5 @@ use super::*; -use goose::agents::sub_recipe_execution_tool::notification_events::{ +use goose::agents::subagent_execution_tool::notification_events::{ FailedTaskInfo, TaskCompletionStats, TaskExecutionStats, }; use serde_json::json; diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index 2ef7a183..2cf03708 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -9,11 +9,14 @@ use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt}; use mcp_core::protocol::JsonRpcMessage; use crate::agents::final_output_tool::{FINAL_OUTPUT_CONTINUATION_MESSAGE, FINAL_OUTPUT_TOOL_NAME}; -use crate::agents::sub_recipe_execution_tool::sub_recipe_execute_task_tool::{ - self, SUB_RECIPE_EXECUTE_TASK_TOOL_NAME, +use crate::agents::recipe_tools::dynamic_task_tools::{ + create_dynamic_task, create_dynamic_task_tool, DYNAMIC_TASK_TOOL_NAME_PREFIX, }; -use crate::agents::sub_recipe_execution_tool::tasks_manager::TasksManager; use crate::agents::sub_recipe_manager::SubRecipeManager; +use crate::agents::subagent_execution_tool::subagent_execute_task_tool::{ + self, SUBAGENT_EXECUTE_TASK_TOOL_NAME, +}; +use crate::agents::subagent_execution_tool::tasks_manager::TasksManager; use crate::config::{Config, ExtensionConfigManager, PermissionManager}; use crate::message::{push_message, Message}; use crate::permission::permission_judge::check_tool_permissions; @@ -48,21 +51,18 @@ use mcp_core::{ prompt::Prompt, protocol::GetPromptResult, tool::Tool, Content, ToolError, ToolResult, }; -use crate::agents::subagent_tools::SUBAGENT_RUN_TASK_TOOL_NAME; - use super::final_output_tool::FinalOutputTool; use super::platform_tools; use super::router_tools; -use super::subagent_manager::SubAgentManager; -use super::subagent_tools; use super::tool_execution::{ToolCallResult, CHAT_MODE_TOOL_SKIPPED_RESPONSE, DECLINED_RESPONSE}; +use crate::agents::subagent_task_config::TaskConfig; const DEFAULT_MAX_TURNS: u32 = 1000; /// The main goose Agent pub struct Agent { pub(super) provider: Mutex>>, - pub(super) extension_manager: RwLock, + pub(super) extension_manager: Arc>, pub(super) sub_recipe_manager: Mutex, pub(super) tasks_manager: TasksManager, pub(super) final_output_tool: Mutex>, @@ -76,7 +76,7 @@ pub struct Agent { pub(super) tool_monitor: Mutex>, pub(super) router_tool_selector: Mutex>>>, pub(super) scheduler_service: Mutex>>, - pub(super) subagent_manager: Mutex>, + pub(super) mcp_tx: Mutex>, pub(super) mcp_notification_rx: Arc>>, } @@ -137,7 +137,7 @@ impl Agent { Self { provider: Mutex::new(None), - extension_manager: RwLock::new(ExtensionManager::new()), + extension_manager: Arc::new(RwLock::new(ExtensionManager::new())), sub_recipe_manager: Mutex::new(SubRecipeManager::new()), tasks_manager: TasksManager::new(), final_output_tool: Mutex::new(None), @@ -152,7 +152,7 @@ impl Agent { router_tool_selector: Mutex::new(None), scheduler_service: Mutex::new(None), // Initialize with MCP notification support - subagent_manager: Mutex::new(Some(SubAgentManager::new(mcp_tx))), + mcp_tx: Mutex::new(mcp_tx), mcp_notification_rx: Arc::new(Mutex::new(mcp_rx)), } } @@ -300,12 +300,20 @@ impl Agent { &self.tasks_manager, ) .await - } else if tool_call.name == SUB_RECIPE_EXECUTE_TASK_TOOL_NAME { - sub_recipe_execute_task_tool::run_tasks( + } else if tool_call.name == SUBAGENT_EXECUTE_TASK_TOOL_NAME { + let provider = self.provider().await.ok(); + let mcp_tx = self.mcp_tx.lock().await.clone(); + + let task_config = + TaskConfig::new(provider, Some(Arc::clone(&self.extension_manager)), mcp_tx); + subagent_execute_task_tool::run_tasks( tool_call.arguments.clone(), + task_config, &self.tasks_manager, ) .await + } else if tool_call.name == DYNAMIC_TASK_TOOL_NAME_PREFIX { + create_dynamic_task(tool_call.arguments.clone(), &self.tasks_manager).await } else if tool_call.name == PLATFORM_READ_RESOURCE_TOOL_NAME { // Check if the tool is read_resource and handle it separately ToolCallResult::from( @@ -321,11 +329,6 @@ impl Agent { ) } else if tool_call.name == PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME { ToolCallResult::from(extension_manager.search_available_extensions().await) - } else if tool_call.name == SUBAGENT_RUN_TASK_TOOL_NAME { - ToolCallResult::from( - self.handle_run_subagent_task(tool_call.arguments.clone()) - .await, - ) } else if self.is_frontend_tool(&tool_call.name).await { // For frontend tools, return an error indicating we need frontend execution ToolCallResult::from(Err(ToolError::ExecutionError( @@ -567,11 +570,8 @@ impl Agent { platform_tools::manage_schedule_tool(), ]); - // Add subagent tool (only if ALPHA_FEATURES is enabled) - let config = Config::global(); - if config.get_param::("ALPHA_FEATURES").unwrap_or(false) { - prefixed_tools.push(subagent_tools::run_task_subagent_tool()); - } + // Dynamic task tool + prefixed_tools.push(create_dynamic_task_tool()); // Add resource tools if supported if extension_manager.supports_resources() { @@ -589,8 +589,7 @@ impl Agent { if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() { prefixed_tools.push(final_output_tool.tool()); } - prefixed_tools - .push(sub_recipe_execute_task_tool::create_sub_recipe_execute_task_tool()); + prefixed_tools.push(subagent_execute_task_tool::create_subagent_execute_task_tool()); } prefixed_tools @@ -1074,15 +1073,6 @@ impl Agent { let mut current_provider = self.provider.lock().await; *current_provider = Some(provider.clone()); - // Initialize subagent manager with MCP notification support - // Need to recreate the MCP channel since we're replacing the manager - let (mcp_tx, mcp_rx) = mpsc::channel(100); - { - let mut rx_guard = self.mcp_notification_rx.lock().await; - *rx_guard = mcp_rx; - } - *self.subagent_manager.lock().await = Some(SubAgentManager::new(mcp_tx)); - self.update_router_tool_selector(Some(provider), None) .await?; Ok(()) diff --git a/crates/goose/src/agents/mod.rs b/crates/goose/src/agents/mod.rs index 353e57ac..ffcc2b9c 100644 --- a/crates/goose/src/agents/mod.rs +++ b/crates/goose/src/agents/mod.rs @@ -11,13 +11,11 @@ mod reply_parts; mod router_tool_selector; mod router_tools; mod schedule_tool; -pub mod sub_recipe_execution_tool; pub mod sub_recipe_manager; pub mod subagent; +pub mod subagent_execution_tool; pub mod subagent_handler; -pub mod subagent_manager; -pub mod subagent_tools; -pub mod subagent_types; +mod subagent_task_config; mod tool_execution; mod tool_router_index_manager; pub(crate) mod tool_vectordb; @@ -27,7 +25,6 @@ pub use agent::{Agent, AgentEvent}; pub use extension::ExtensionConfig; pub use extension_manager::ExtensionManager; pub use prompt_manager::PromptManager; -pub use subagent::{SubAgent, SubAgentConfig, SubAgentProgress, SubAgentStatus}; -pub use subagent_manager::SubAgentManager; -pub use subagent_types::SpawnSubAgentArgs; +pub use subagent::{SubAgent, SubAgentProgress, SubAgentStatus}; +pub use subagent_task_config::TaskConfig; pub use types::{FrontendTool, SessionConfig}; diff --git a/crates/goose/src/agents/recipe_tools/dynamic_task_tools.rs b/crates/goose/src/agents/recipe_tools/dynamic_task_tools.rs new file mode 100644 index 00000000..449ea04d --- /dev/null +++ b/crates/goose/src/agents/recipe_tools/dynamic_task_tools.rs @@ -0,0 +1,147 @@ +// ======================================= +// Module: Dynamic Task Tools +// Handles creation of tasks dynamically without sub-recipes +// ======================================= +use crate::agents::subagent_execution_tool::tasks_manager::TasksManager; +use crate::agents::subagent_execution_tool::{lib::ExecutionMode, task_types::Task}; +use crate::agents::tool_execution::ToolCallResult; +use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError}; +use serde_json::{json, Value}; + +pub const DYNAMIC_TASK_TOOL_NAME_PREFIX: &str = "dynamic_task__create_task"; + +pub fn create_dynamic_task_tool() -> Tool { + Tool::new( + DYNAMIC_TASK_TOOL_NAME_PREFIX.to_string(), + "Use this tool to create one or more dynamic tasks from a shared text instruction and varying parameters.\ + How it works: + - Provide a single text instruction + - Use the 'task_parameters' field to pass an array of parameter sets + - Each resulting task will use the same instruction with different parameter values + This is useful when performing the same operation across many inputs (e.g., getting weather for multiple cities, searching multiple slack channels, iterating through various linear tickets, etc). + Once created, these tasks should be passed to the 'subagent__execute_task' tool for execution. Tasks can run sequentially or in parallel. + --- + What is a 'subagent'? + A 'subagent' is a stateless sub-process that executes a single task independently. Use subagents when: + - You want to parallelize similar work across different inputs + - You are not sure your search or operation will succeed on the first try + Each subagent receives a task with a defined payload and returns a result, which is not visible to the user unless explicitly summarized by the system. + --- + Examples of 'task_parameters' for a single task: + text_instruction: Search for the config file in the root directory. + Examples of 'task_parameters' for multiple tasks: + text_instruction: Get weather for Melbourne. + timeout_seconds: 300 + text_instruction: Get weather for Los Angeles. + timeout_seconds: 300 + text_instruction: Get weather for San Francisco. + timeout_seconds: 300 + ".to_string(), + json!({ + "type": "object", + "properties": { + "task_parameters": { + "type": "array", + "description": "Array of parameter sets for creating tasks. \ + For a single task, provide an array with one element. \ + For multiple tasks, provide an array with multiple elements, each with different parameter values. \ + If there is no parameter set, provide an empty array.", + "items": { + "type": "object", + "properties": { + "text_instruction": { + "type": "string", + "description": "The text instruction to execute" + }, + "timeout_seconds": { + "type": "integer", + "description": "Optional timeout for the task in seconds (default: 300)", + "minimum": 1 + } + }, + "required": ["text_instruction"] + } + } + } + }), + Some(ToolAnnotations { + title: Some("Dynamic Task Creation".to_string()), + read_only_hint: false, + destructive_hint: true, + idempotent_hint: false, + open_world_hint: true, + }), + ) +} + +fn extract_task_parameters(params: &Value) -> Vec { + params + .get("task_parameters") + .and_then(|v| v.as_array()) + .cloned() + .unwrap_or_default() +} + +fn create_text_instruction_tasks_from_params(task_params: &[Value]) -> Vec { + task_params + .iter() + .map(|task_param| { + let text_instruction = task_param + .get("text_instruction") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + let payload = json!({ + "text_instruction": text_instruction + }); + + Task { + id: uuid::Uuid::new_v4().to_string(), + task_type: "text_instruction".to_string(), + payload, + } + }) + .collect() +} + +fn create_task_execution_payload(tasks: Vec, execution_mode: ExecutionMode) -> Value { + let task_ids: Vec = tasks.iter().map(|task| task.id.clone()).collect(); + json!({ + "task_ids": task_ids, + "execution_mode": execution_mode + }) +} + +pub async fn create_dynamic_task(params: Value, tasks_manager: &TasksManager) -> ToolCallResult { + let task_params_array = extract_task_parameters(¶ms); + + if task_params_array.is_empty() { + return ToolCallResult::from(Err(ToolError::ExecutionError( + "No task parameters provided".to_string(), + ))); + } + + let tasks = create_text_instruction_tasks_from_params(&task_params_array); + + // Use parallel execution if there are multiple tasks, sequential for single task + let execution_mode = if tasks.len() > 1 { + ExecutionMode::Parallel + } else { + ExecutionMode::Sequential + }; + + let task_execution_payload = create_task_execution_payload(tasks.clone(), execution_mode); + + let tasks_json = match serde_json::to_string(&task_execution_payload) { + Ok(json) => json, + Err(e) => { + return ToolCallResult::from(Err(ToolError::ExecutionError(format!( + "Failed to serialize task list: {}", + e + )))) + } + }; + tasks_manager.save_tasks(tasks.clone()).await; + ToolCallResult::from(Ok(vec![Content::text(tasks_json)])) +} diff --git a/crates/goose/src/agents/recipe_tools/mod.rs b/crates/goose/src/agents/recipe_tools/mod.rs index 90603c88..6e6f28a8 100644 --- a/crates/goose/src/agents/recipe_tools/mod.rs +++ b/crates/goose/src/agents/recipe_tools/mod.rs @@ -1,2 +1,3 @@ +pub mod dynamic_task_tools; pub mod param_utils; pub mod sub_recipe_tools; diff --git a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs index 810c4a60..66b89ea3 100644 --- a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs +++ b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs @@ -5,8 +5,8 @@ use anyhow::Result; use mcp_core::tool::{Tool, ToolAnnotations}; use serde_json::{json, Map, Value}; -use crate::agents::sub_recipe_execution_tool::lib::{ExecutionMode, Task}; -use crate::agents::sub_recipe_execution_tool::tasks_manager::TasksManager; +use crate::agents::subagent_execution_tool::lib::{ExecutionMode, Task}; +use crate::agents::subagent_execution_tool::tasks_manager::TasksManager; use crate::recipe::{Recipe, RecipeParameter, RecipeParameterRequirement, SubRecipe}; use super::param_utils::prepare_command_params; diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/mod.rs b/crates/goose/src/agents/sub_recipe_execution_tool/mod.rs index 0b7af3b5..49fcc194 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/mod.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/mod.rs @@ -8,3 +8,4 @@ mod tasks; pub mod tasks_manager; pub mod utils; mod workers; + diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs b/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs index a3ab4140..66f67729 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs @@ -183,3 +183,4 @@ fn process_output(stdout_output: String) -> Result { Ok(Value::String(stdout_output)) } } + diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs b/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs index fefbf0eb..89473f7c 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs @@ -28,3 +28,4 @@ async fn worker_loop(state: Arc, _worker_id: usize) { state.decrement_active_workers(); } + diff --git a/crates/goose/src/agents/sub_recipe_manager.rs b/crates/goose/src/agents/sub_recipe_manager.rs index 33229b97..891c3c9b 100644 --- a/crates/goose/src/agents/sub_recipe_manager.rs +++ b/crates/goose/src/agents/sub_recipe_manager.rs @@ -7,7 +7,7 @@ use crate::{ recipe_tools::sub_recipe_tools::{ create_sub_recipe_task, create_sub_recipe_task_tool, SUB_RECIPE_TASK_TOOL_NAME_PREFIX, }, - sub_recipe_execution_tool::tasks_manager::TasksManager, + subagent_execution_tool::tasks_manager::TasksManager, tool_execution::ToolCallResult, }, recipe::SubRecipe, diff --git a/crates/goose/src/agents/subagent.rs b/crates/goose/src/agents/subagent.rs index 0a02e2d1..030c7877 100644 --- a/crates/goose/src/agents/subagent.rs +++ b/crates/goose/src/agents/subagent.rs @@ -1,27 +1,18 @@ use crate::{ - agents::{extension_manager::ExtensionManager, Agent}, + agents::{Agent, TaskConfig}, message::{Message, MessageContent, ToolRequest}, prompt_template::render_global_file, - providers::base::Provider, providers::errors::ProviderError, - recipe::Recipe, }; use anyhow::anyhow; use chrono::{DateTime, Utc}; use mcp_core::protocol::{JsonRpcMessage, JsonRpcNotification}; -use mcp_core::{handler::ToolError, role::Role, tool::Tool}; +use mcp_core::{handler::ToolError, tool::Tool}; use serde::{Deserialize, Serialize}; use serde_json::{self, json}; use std::{collections::HashMap, sync::Arc}; -use tokio::sync::{mpsc, Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock}; use tracing::{debug, error, instrument}; -use uuid::Uuid; - -use crate::agents::platform_tools::{ - self, PLATFORM_LIST_RESOURCES_TOOL_NAME, PLATFORM_READ_RESOURCE_TOOL_NAME, - PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME, -}; -use crate::agents::subagent_tools::SUBAGENT_RUN_TASK_TOOL_NAME; /// Status of a subagent #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -32,48 +23,6 @@ pub enum SubAgentStatus { Terminated, // Manually terminated } -/// Configuration for a subagent -#[derive(Debug)] -pub struct SubAgentConfig { - pub id: String, - pub recipe: Option, - pub instructions: Option, - pub max_turns: Option, - pub timeout_seconds: Option, -} - -impl SubAgentConfig { - pub fn new_with_recipe(recipe: Recipe) -> Self { - Self { - id: Uuid::new_v4().to_string(), - recipe: Some(recipe), - instructions: None, - max_turns: None, - timeout_seconds: None, - } - } - - pub fn new_with_instructions(instructions: String) -> Self { - Self { - id: Uuid::new_v4().to_string(), - recipe: None, - instructions: Some(instructions), - max_turns: None, - timeout_seconds: None, - } - } - - pub fn with_max_turns(mut self, max_turns: usize) -> Self { - self.max_turns = Some(max_turns); - self - } - - pub fn with_timeout(mut self, timeout_seconds: u64) -> Self { - self.timeout_seconds = Some(timeout_seconds); - self - } -} - /// Progress information for a subagent #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SubAgentProgress { @@ -90,58 +39,26 @@ pub struct SubAgent { pub id: String, pub conversation: Arc>>, pub status: Arc>, - pub config: SubAgentConfig, + pub config: TaskConfig, pub turn_count: Arc>, pub created_at: DateTime, - pub recipe_extensions: Arc>>, - pub missing_extensions: Arc>>, // Track extensions that weren't enabled - pub mcp_notification_tx: mpsc::Sender, // For MCP notifications } impl SubAgent { /// Create a new subagent with the given configuration and provider - #[instrument(skip(config, _provider, extension_manager, mcp_notification_tx))] + #[instrument(skip(task_config))] pub async fn new( - config: SubAgentConfig, - _provider: Arc, - extension_manager: Arc>, - mcp_notification_tx: mpsc::Sender, + task_config: TaskConfig, ) -> Result<(Arc, tokio::task::JoinHandle<()>), anyhow::Error> { - debug!("Creating new subagent with id: {}", config.id); - - let mut missing_extensions = Vec::new(); - let mut recipe_extensions = Vec::new(); - - // Check if extensions from recipe exist in the extension manager - if let Some(recipe) = &config.recipe { - if let Some(extensions) = &recipe.extensions { - for extension in extensions { - let extension_name = extension.name(); - let existing_extensions = extension_manager.list_extensions().await?; - - if !existing_extensions.contains(&extension_name) { - missing_extensions.push(extension_name); - } else { - recipe_extensions.push(extension_name); - } - } - } - } else { - // If no recipe, inherit all extensions from the parent agent - let existing_extensions = extension_manager.list_extensions().await?; - recipe_extensions = existing_extensions; - } + debug!("Creating new subagent with id: {}", task_config.id); let subagent = Arc::new(SubAgent { - id: config.id.clone(), + id: task_config.id.clone(), conversation: Arc::new(Mutex::new(Vec::new())), status: Arc::new(RwLock::new(SubAgentStatus::Ready)), - config, + config: task_config, turn_count: Arc::new(Mutex::new(0)), created_at: Utc::now(), - recipe_extensions: Arc::new(Mutex::new(recipe_extensions)), - missing_extensions: Arc::new(Mutex::new(missing_extensions)), - mcp_notification_tx, }); // Send initial MCP notification @@ -209,7 +126,7 @@ impl SubAgent { })), }); - if let Err(e) = self.mcp_notification_tx.send(notification).await { + if let Err(e) = self.config.mcp_tx.send(notification).await { error!( "Failed to send MCP notification from subagent {}: {}", self.id, e @@ -238,17 +155,29 @@ impl SubAgent { } /// Process a message and generate a response using the subagent's provider - #[instrument(skip(self, message, provider, extension_manager))] + #[instrument(skip(self, message))] pub async fn reply_subagent( &self, message: String, - provider: Arc, - extension_manager: Arc>, + task_config: TaskConfig, ) -> Result { debug!("Processing message for subagent {}", self.id); self.send_mcp_notification("message_processing", &format!("Processing: {}", message)) .await; + // Get provider and extension manager from task config + let provider = self + .config + .provider + .as_ref() + .ok_or_else(|| anyhow!("No provider configured for subagent"))?; + + let extension_manager = self + .config + .extension_manager + .as_ref() + .ok_or_else(|| anyhow!("No extension manager configured for subagent"))?; + // Check if we've exceeded max turns { let turn_count = *self.turn_count.lock().await; @@ -288,80 +217,12 @@ impl SubAgent { let mut messages = self.get_conversation().await; // Get tools based on whether we're using a recipe or inheriting from parent - let tools: Vec = if self.config.recipe.is_some() { - // Recipe mode: only get tools from the recipe's extensions - let recipe_extensions = self.recipe_extensions.lock().await; - let mut recipe_tools = Vec::new(); - - debug!( - "Subagent {} operating in recipe mode with {} extensions", - self.id, - recipe_extensions.len() - ); - - for extension_name in recipe_extensions.iter() { - match extension_manager - .get_prefixed_tools(Some(extension_name.clone())) - .await - { - Ok(mut ext_tools) => { - debug!( - "Added {} tools from extension {}", - ext_tools.len(), - extension_name - ); - recipe_tools.append(&mut ext_tools); - } - Err(e) => { - debug!( - "Failed to get tools for extension {}: {}", - extension_name, e - ); - } - } - } - - debug!( - "Subagent {} has {} total recipe tools before filtering", - self.id, - recipe_tools.len() - ); - // Filter out subagent tools from recipe tools - let mut filtered_tools = Self::filter_subagent_tools(recipe_tools); - - // Add platform tools (except subagent tools) - Self::add_platform_tools(&mut filtered_tools, &extension_manager).await; - - debug!( - "Subagent {} has {} tools after filtering and adding platform tools", - self.id, - filtered_tools.len() - ); - filtered_tools - } else { - // No recipe: inherit all tools from parent (but filter out subagent tools) - debug!( - "Subagent {} operating in inheritance mode, using all parent tools", - self.id - ); - let parent_tools = extension_manager.get_prefixed_tools(None).await?; - debug!( - "Subagent {} has {} parent tools before filtering", - self.id, - parent_tools.len() - ); - let mut filtered_tools = Self::filter_subagent_tools(parent_tools); - - // Add platform tools (except subagent tools) - Self::add_platform_tools(&mut filtered_tools, &extension_manager).await; - - debug!( - "Subagent {} has {} tools after filtering and adding platform tools", - self.id, - filtered_tools.len() - ); - filtered_tools - }; + let tools: Vec = extension_manager + .read() + .await + .get_prefixed_tools(None) + .await + .unwrap_or_default(); let toolshim_tools: Vec = vec![]; @@ -371,7 +232,7 @@ impl SubAgent { // Generate response from provider loop { match Agent::generate_response_from_provider( - Arc::clone(&provider), + Arc::clone(provider), &system_prompt, &messages, &tools, @@ -427,20 +288,14 @@ impl SubAgent { .await; // Handle platform tools or dispatch to extension manager - let tool_result = if self.is_platform_tool(&tool_call.name) { - self.handle_platform_tool_call( - tool_call.clone(), - &extension_manager, - ) + let tool_result = match extension_manager + .read() .await - } else { - match extension_manager - .dispatch_tool_call(tool_call.clone()) - .await - { - Ok(result) => result.result.await, - Err(e) => Err(ToolError::ExecutionError(e.to_string())), - } + .dispatch_tool_call(tool_call.clone()) + .await + { + Ok(result) => result.result.await, + Err(e) => Err(ToolError::ExecutionError(e.to_string())), }; match tool_result { @@ -529,142 +384,10 @@ impl SubAgent { Ok(()) } - /// Get formatted conversation for display - pub async fn get_formatted_conversation(&self) -> String { - let conversation = self.conversation.lock().await; - - let mut formatted = format!("=== Subagent {} Conversation ===\n", self.id); - - if let Some(recipe) = &self.config.recipe { - formatted.push_str(&format!("Recipe: {}\n", recipe.title)); - } else if let Some(instructions) = &self.config.instructions { - formatted.push_str(&format!("Instructions: {}\n", instructions)); - } else { - formatted.push_str("Mode: Ad-hoc subagent\n"); - } - - formatted.push_str(&format!( - "Created: {}\n", - self.created_at.format("%Y-%m-%d %H:%M:%S UTC") - )); - - let progress = self.get_progress().await; - - formatted.push_str(&format!("Status: {:?}\n", progress.status)); - formatted.push_str(&format!("Turn: {}", progress.turn)); - if let Some(max_turns) = progress.max_turns { - formatted.push_str(&format!("/{}", max_turns)); - } - formatted.push_str("\n\n"); - - for (i, message) in conversation.iter().enumerate() { - formatted.push_str(&format!( - "{}. {}: {}\n", - i + 1, - match message.role { - Role::User => "User", - Role::Assistant => "Assistant", - }, - message.as_concat_text() - )); - } - - formatted.push_str("=== End Conversation ===\n"); - - formatted - } - - /// Get the list of extensions that weren't enabled - pub async fn get_missing_extensions(&self) -> Vec { - self.missing_extensions.lock().await.clone() - } - /// Filter out subagent spawning tools to prevent infinite recursion - fn filter_subagent_tools(tools: Vec) -> Vec { - let original_count = tools.len(); - let filtered_tools: Vec = tools - .into_iter() - .filter(|tool| { - let should_keep = tool.name != SUBAGENT_RUN_TASK_TOOL_NAME; - if !should_keep { - debug!("Filtering out subagent tool: {}", tool.name); - } - should_keep - }) - .collect(); - - let filtered_count = filtered_tools.len(); - if filtered_count < original_count { - debug!( - "Filtered {} subagent tool(s) from {} total tools", - original_count - filtered_count, - original_count - ); - } - - filtered_tools - } - - /// Add platform tools to the subagent's tool list (excluding dangerous tools) - async fn add_platform_tools(tools: &mut Vec, extension_manager: &ExtensionManager) { - debug!("Adding safe platform tools to subagent"); - - // Add safe platform tools - subagents can search for extensions but can't manage them or schedules - tools.push(platform_tools::search_available_extensions_tool()); - debug!("Added search_available_extensions tool"); - - // Add resource tools if supported - these are generally safe for subagents - if extension_manager.supports_resources() { - tools.extend([ - platform_tools::read_resource_tool(), - platform_tools::list_resources_tool(), - ]); - debug!("Added 2 resource platform tools"); - } - - // Note: We explicitly do NOT add these tools for security reasons: - // - manage_extensions (could interfere with parent agent's extensions) - // - manage_schedule (could interfere with parent agent's scheduling) - // - subagent spawning tools (prevent recursion) - debug!("Platform tools added successfully (dangerous tools excluded)"); - } - - /// Check if a tool name is a platform tool that subagents can use - fn is_platform_tool(&self, tool_name: &str) -> bool { - matches!( - tool_name, - PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME - | PLATFORM_READ_RESOURCE_TOOL_NAME - | PLATFORM_LIST_RESOURCES_TOOL_NAME - ) - } - - /// Handle platform tool calls that are safe for subagents - async fn handle_platform_tool_call( - &self, - tool_call: mcp_core::tool::ToolCall, - extension_manager: &ExtensionManager, - ) -> Result, ToolError> { - debug!("Handling platform tool: {}", tool_call.name); - - match tool_call.name.as_str() { - PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME => extension_manager - .search_available_extensions() - .await - .map_err(|e| ToolError::ExecutionError(e.to_string())), - PLATFORM_READ_RESOURCE_TOOL_NAME => extension_manager - .read_resource(tool_call.arguments) - .await - .map_err(|e| ToolError::ExecutionError(e.to_string())), - PLATFORM_LIST_RESOURCES_TOOL_NAME => extension_manager - .list_resources(tool_call.arguments) - .await - .map_err(|e| ToolError::ExecutionError(e.to_string())), - _ => Err(ToolError::ExecutionError(format!( - "Platform tool '{}' is not available to subagents for security reasons", - tool_call.name - ))), - } + fn _filter_subagent_tools(tools: Vec) -> Vec { + // TODO: add this in subagent loop + tools } /// Build the system prompt for the subagent using the template @@ -678,14 +401,6 @@ impl SubAgent { ); context.insert("subagent_id", serde_json::Value::String(self.id.clone())); - // Add recipe information if available - if let Some(recipe) = &self.config.recipe { - context.insert( - "recipe_title", - serde_json::Value::String(recipe.title.clone()), - ); - } - // Add max turns if configured if let Some(max_turns) = self.config.max_turns { context.insert( @@ -694,33 +409,6 @@ impl SubAgent { ); } - // Add task instructions - let instructions = if let Some(recipe) = &self.config.recipe { - recipe.instructions.as_deref().unwrap_or("") - } else { - self.config.instructions.as_deref().unwrap_or("") - }; - context.insert( - "task_instructions", - serde_json::Value::String(instructions.to_string()), - ); - - // Add available extensions (only if we have a recipe and extensions) - if self.config.recipe.is_some() { - let extensions: Vec = self.recipe_extensions.lock().await.clone(); - if !extensions.is_empty() { - context.insert( - "extensions", - serde_json::Value::Array( - extensions - .into_iter() - .map(serde_json::Value::String) - .collect(), - ), - ); - } - } - // Add available tools with descriptions for better context let tools_with_descriptions: Vec = available_tools .iter() diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/executor/mod.rs b/crates/goose/src/agents/subagent_execution_tool/executor/mod.rs similarity index 83% rename from crates/goose/src/agents/sub_recipe_execution_tool/executor/mod.rs rename to crates/goose/src/agents/subagent_execution_tool/executor/mod.rs index bb73b73e..9a71ad4a 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/executor/mod.rs +++ b/crates/goose/src/agents/subagent_execution_tool/executor/mod.rs @@ -1,27 +1,25 @@ +use crate::agents::subagent_execution_tool::lib::{ + ExecutionResponse, ExecutionStats, SharedState, Task, TaskResult, TaskStatus, +}; +use crate::agents::subagent_execution_tool::task_execution_tracker::{ + DisplayMode, TaskExecutionTracker, +}; +use crate::agents::subagent_execution_tool::tasks::process_task; +use crate::agents::subagent_execution_tool::workers::spawn_worker; +use crate::agents::subagent_task_config::TaskConfig; use mcp_core::protocol::JsonRpcMessage; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::Instant; -use crate::agents::sub_recipe_execution_tool::lib::{ - ExecutionResponse, ExecutionStats, SharedState, Task, TaskResult, TaskStatus, -}; -use crate::agents::sub_recipe_execution_tool::task_execution_tracker::{ - DisplayMode, TaskExecutionTracker, -}; -use crate::agents::sub_recipe_execution_tool::tasks::process_task; -use crate::agents::sub_recipe_execution_tool::workers::spawn_worker; - -#[cfg(test)] -mod tests; - const EXECUTION_STATUS_COMPLETED: &str = "completed"; const DEFAULT_MAX_WORKERS: usize = 10; pub async fn execute_single_task( task: &Task, notifier: mpsc::Sender, + task_config: TaskConfig, ) -> ExecutionResponse { let start_time = Instant::now(); let task_execution_tracker = Arc::new(TaskExecutionTracker::new( @@ -29,7 +27,13 @@ pub async fn execute_single_task( DisplayMode::SingleTaskOutput, notifier, )); - let result = process_task(task, task_execution_tracker).await; + let result = process_task(task, task_execution_tracker.clone(), task_config).await; + + // Complete the task in the tracker + task_execution_tracker + .complete_task(&result.task_id, result.clone()) + .await; + let execution_time = start_time.elapsed().as_millis(); let stats = calculate_stats(&[result.clone()], execution_time); @@ -43,6 +47,7 @@ pub async fn execute_single_task( pub async fn execute_tasks_in_parallel( tasks: Vec, notifier: mpsc::Sender, + task_config: TaskConfig, ) -> ExecutionResponse { let task_execution_tracker = Arc::new(TaskExecutionTracker::new( tasks.clone(), @@ -70,7 +75,7 @@ pub async fn execute_tasks_in_parallel( let worker_count = std::cmp::min(task_count, DEFAULT_MAX_WORKERS); let mut worker_handles = Vec::new(); for i in 0..worker_count { - let handle = spawn_worker(shared_state.clone(), i); + let handle = spawn_worker(shared_state.clone(), i, task_config.clone()); worker_handles.push(handle); } @@ -163,17 +168,25 @@ fn create_empty_response() -> ExecutionResponse { }, } } - async fn collect_results( result_rx: &mut mpsc::Receiver, task_execution_tracker: Arc, expected_count: usize, ) -> Vec { let mut results = Vec::new(); - while let Some(result) = result_rx.recv().await { + while let Some(mut result) = result_rx.recv().await { + // Truncate data to 650 chars if needed + if let Some(data) = result.data.as_mut() { + if let Some(data_str) = data.as_str() { + if data_str.len() > 650 { + *data = serde_json::Value::String(format!("{}...", &data_str[..650])); + } + } + } task_execution_tracker .complete_task(&result.task_id, result.clone()) .await; + results.push(result); if results.len() >= expected_count { break; diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/executor/tests.rs b/crates/goose/src/agents/subagent_execution_tool/executor/tests.rs similarity index 100% rename from crates/goose/src/agents/sub_recipe_execution_tool/executor/tests.rs rename to crates/goose/src/agents/subagent_execution_tool/executor/tests.rs diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/lib/mod.rs b/crates/goose/src/agents/subagent_execution_tool/lib/mod.rs similarity index 91% rename from crates/goose/src/agents/sub_recipe_execution_tool/lib/mod.rs rename to crates/goose/src/agents/subagent_execution_tool/lib/mod.rs index 446b6011..81d72888 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/lib/mod.rs +++ b/crates/goose/src/agents/subagent_execution_tool/lib/mod.rs @@ -1,14 +1,11 @@ -use crate::agents::sub_recipe_execution_tool::executor::{ - execute_single_task, execute_tasks_in_parallel, -}; -pub use crate::agents::sub_recipe_execution_tool::task_types::{ +pub use crate::agents::subagent_execution_tool::task_types::{ ExecutionMode, ExecutionResponse, ExecutionStats, SharedState, Task, TaskResult, TaskStatus, }; -use crate::agents::sub_recipe_execution_tool::tasks_manager::TasksManager; - -#[cfg(test)] -mod tests; - +use crate::agents::subagent_execution_tool::{ + executor::{execute_single_task, execute_tasks_in_parallel}, + tasks_manager::TasksManager, +}; +use crate::agents::subagent_task_config::TaskConfig; use mcp_core::protocol::JsonRpcMessage; use serde_json::{json, Value}; use tokio::sync::mpsc; @@ -17,6 +14,7 @@ pub async fn execute_tasks( input: Value, execution_mode: ExecutionMode, notifier: mpsc::Sender, + task_config: TaskConfig, tasks_manager: &TasksManager, ) -> Result { let task_ids: Vec = serde_json::from_value( @@ -44,13 +42,12 @@ pub async fn execute_tasks( match execution_mode { ExecutionMode::Sequential => { if task_count == 1 { - let response = execute_single_task(&tasks[0], notifier).await; + let response = execute_single_task(&tasks[0], notifier, task_config).await; handle_response(response) } else { Err("Sequential execution mode requires exactly one task".to_string()) } } - ExecutionMode::Parallel => { if tasks.iter().any(|task| task.get_sequential_when_repeated()) { Ok(json!( @@ -62,7 +59,7 @@ pub async fn execute_tasks( )) } else { let response: ExecutionResponse = - execute_tasks_in_parallel(tasks, notifier.clone()).await; + execute_tasks_in_parallel(tasks, notifier.clone(), task_config).await; handle_response(response) } } diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/lib/tests.rs b/crates/goose/src/agents/subagent_execution_tool/lib/tests.rs similarity index 100% rename from crates/goose/src/agents/sub_recipe_execution_tool/lib/tests.rs rename to crates/goose/src/agents/subagent_execution_tool/lib/tests.rs diff --git a/crates/goose/src/agents/subagent_execution_tool/mod.rs b/crates/goose/src/agents/subagent_execution_tool/mod.rs new file mode 100644 index 00000000..2226e2d7 --- /dev/null +++ b/crates/goose/src/agents/subagent_execution_tool/mod.rs @@ -0,0 +1,10 @@ +mod executor; +pub mod lib; +pub mod notification_events; +pub mod subagent_execute_task_tool; +pub mod task_execution_tracker; +pub mod task_types; +pub mod tasks; +pub mod tasks_manager; +pub mod utils; +pub mod workers; diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/notification_events.rs b/crates/goose/src/agents/subagent_execution_tool/notification_events.rs similarity index 98% rename from crates/goose/src/agents/sub_recipe_execution_tool/notification_events.rs rename to crates/goose/src/agents/subagent_execution_tool/notification_events.rs index 2a6134ea..632cb976 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/notification_events.rs +++ b/crates/goose/src/agents/subagent_execution_tool/notification_events.rs @@ -1,4 +1,4 @@ -use crate::agents::sub_recipe_execution_tool::task_types::TaskStatus; +use crate::agents::subagent_execution_tool::task_types::TaskStatus; use serde::{Deserialize, Serialize}; use serde_json::Value; diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs b/crates/goose/src/agents/subagent_execution_tool/subagent_execute_task_tool.rs similarity index 82% rename from crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs rename to crates/goose/src/agents/subagent_execution_tool/subagent_execute_task_tool.rs index e5f9062f..73e3fa12 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs +++ b/crates/goose/src/agents/subagent_execution_tool/subagent_execute_task_tool.rs @@ -1,20 +1,21 @@ use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError}; use serde_json::Value; +use crate::agents::subagent_task_config::TaskConfig; use crate::agents::{ - sub_recipe_execution_tool::lib::execute_tasks, - sub_recipe_execution_tool::task_types::ExecutionMode, - sub_recipe_execution_tool::tasks_manager::TasksManager, tool_execution::ToolCallResult, + subagent_execution_tool::lib::execute_tasks, + subagent_execution_tool::task_types::ExecutionMode, + subagent_execution_tool::tasks_manager::TasksManager, tool_execution::ToolCallResult, }; use mcp_core::protocol::JsonRpcMessage; use tokio::sync::mpsc; use tokio_stream; -pub const SUB_RECIPE_EXECUTE_TASK_TOOL_NAME: &str = "sub_recipe__execute_task"; -pub fn create_sub_recipe_execute_task_tool() -> Tool { +pub const SUBAGENT_EXECUTE_TASK_TOOL_NAME: &str = "subagent__execute_task"; +pub fn create_subagent_execute_task_tool() -> Tool { Tool::new( - SUB_RECIPE_EXECUTE_TASK_TOOL_NAME, - "Only use this tool when you execute sub recipe task. + SUBAGENT_EXECUTE_TASK_TOOL_NAME, + "Only use the subagent__execute_task tool when you execute sub recipe task or dynamic task. EXECUTION STRATEGY DECISION: 1. If the tasks are created with execution_mode, use the execution_mode. 2. Execute tasks sequentially unless user explicitly requests parallel execution. PARALLEL: User uses keywords like 'parallel', 'simultaneously', 'at the same time', 'concurrently' @@ -24,6 +25,7 @@ IMPLEMENTATION: - Parallel execution: Call this tool once, passing an ARRAY of all tasks EXAMPLES: +User Intent Based: - User: 'get weather and tell me a joke' → Sequential (2 separate tool calls, 1 task each) - User: 'get weather and joke in parallel' → Parallel (1 tool call with array of 2 tasks) - User: 'run these simultaneously' → Parallel (1 tool call with task array) @@ -57,7 +59,11 @@ EXAMPLES: ) } -pub async fn run_tasks(execute_data: Value, tasks_manager: &TasksManager) -> ToolCallResult { +pub async fn run_tasks( + execute_data: Value, + task_config: TaskConfig, + tasks_manager: &TasksManager, +) -> ToolCallResult { let (notification_tx, notification_rx) = mpsc::channel::(100); let tasks_manager_clone = tasks_manager.clone(); @@ -72,6 +78,7 @@ pub async fn run_tasks(execute_data: Value, tasks_manager: &TasksManager) -> Too execute_data, execution_mode, notification_tx, + task_config, &tasks_manager_clone, ) .await diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/task_execution_tracker.rs b/crates/goose/src/agents/subagent_execution_tool/task_execution_tracker.rs similarity index 86% rename from crates/goose/src/agents/sub_recipe_execution_tool/task_execution_tracker.rs rename to crates/goose/src/agents/subagent_execution_tool/task_execution_tracker.rs index a906a59a..c720459e 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/task_execution_tracker.rs +++ b/crates/goose/src/agents/subagent_execution_tool/task_execution_tracker.rs @@ -5,14 +5,12 @@ use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use tokio::time::{sleep, Duration, Instant}; -use crate::agents::sub_recipe_execution_tool::notification_events::{ +use crate::agents::subagent_execution_tool::notification_events::{ FailedTaskInfo, TaskCompletionStats, TaskExecutionNotificationEvent, TaskExecutionStats, TaskInfo as EventTaskInfo, }; -use crate::agents::sub_recipe_execution_tool::task_types::{ - Task, TaskInfo, TaskResult, TaskStatus, -}; -use crate::agents::sub_recipe_execution_tool::utils::{count_by_status, get_task_name}; +use crate::agents::subagent_execution_tool::task_types::{Task, TaskInfo, TaskResult, TaskStatus}; +use crate::agents::subagent_execution_tool::utils::{count_by_status, get_task_name}; use serde_json::Value; #[derive(Debug, Clone, PartialEq)] @@ -41,6 +39,18 @@ fn format_task_metadata(task_info: &TaskInfo) -> String { }) .collect::>() .join(",") + } else if task_info.task.task_type == "text_instruction" { + // For text_instruction tasks, extract and display the instruction + if let Some(text_instruction) = task_info.task.get_text_instruction() { + // Truncate long instructions to keep the display clean + if text_instruction.len() > 80 { + format!("instruction={}...", &text_instruction[..77]) + } else { + format!("instruction={}", text_instruction) + } + } else { + String::new() + } } else { String::new() } @@ -113,27 +123,30 @@ impl TaskExecutionTracker { .map(|task_info| task_info.current_output.clone()) } + async fn format_line(&self, task_info: Option<&TaskInfo>, line: &str) -> String { + if let Some(task_info) = task_info { + let task_name = get_task_name(task_info); + let task_type = task_info.task.task_type.clone(); + let metadata = format_task_metadata(task_info); + + if metadata.is_empty() { + format!("[{} ({})] {}", task_name, task_type, line) + } else { + format!("[{} ({}) {}] {}", task_name, task_type, metadata, line) + } + } else { + line.to_string() + } + } + pub async fn send_live_output(&self, task_id: &str, line: &str) { match self.display_mode { DisplayMode::SingleTaskOutput => { let tasks = self.tasks.read().await; let task_info = tasks.get(task_id); - let formatted_line = if let Some(task_info) = task_info { - let task_name = get_task_name(task_info); - let task_type = task_info.task.task_type.clone(); - let metadata = format_task_metadata(task_info); - - if metadata.is_empty() { - format!("[{} ({})] {}", task_name, task_type, line) - } else { - format!("[{} ({}) {}] {}", task_name, task_type, metadata, line) - } - } else { - line.to_string() - }; + let formatted_line = self.format_line(task_info, line).await; drop(tasks); - let event = TaskExecutionNotificationEvent::line_output( task_id.to_string(), formatted_line, diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/task_types.rs b/crates/goose/src/agents/subagent_execution_tool/task_types.rs similarity index 97% rename from crates/goose/src/agents/sub_recipe_execution_tool/task_types.rs rename to crates/goose/src/agents/subagent_execution_tool/task_types.rs index 4515bb84..796491f6 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/task_types.rs +++ b/crates/goose/src/agents/subagent_execution_tool/task_types.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::mpsc; -use crate::agents::sub_recipe_execution_tool::task_execution_tracker::TaskExecutionTracker; +use crate::agents::subagent_execution_tool::task_execution_tracker::TaskExecutionTracker; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)] #[serde(rename_all = "lowercase")] @@ -28,18 +28,18 @@ impl Task { .flatten() } - pub fn get_sequential_when_repeated(&self) -> bool { - self.get_sub_recipe() - .and_then(|sr| sr.get("sequential_when_repeated").and_then(|v| v.as_bool())) - .unwrap_or_default() - } - pub fn get_command_parameters(&self) -> Option<&Map> { self.get_sub_recipe() .and_then(|sr| sr.get("command_parameters")) .and_then(|cp| cp.as_object()) } + pub fn get_sequential_when_repeated(&self) -> bool { + self.get_sub_recipe() + .and_then(|sr| sr.get("sequential_when_repeated").and_then(|v| v.as_bool())) + .unwrap_or_default() + } + pub fn get_sub_recipe_name(&self) -> Option<&str> { self.get_sub_recipe() .and_then(|sr| sr.get("name")) diff --git a/crates/goose/src/agents/subagent_execution_tool/tasks.rs b/crates/goose/src/agents/subagent_execution_tool/tasks.rs new file mode 100644 index 00000000..7ed93245 --- /dev/null +++ b/crates/goose/src/agents/subagent_execution_tool/tasks.rs @@ -0,0 +1,231 @@ +use serde_json::Value; +use std::process::Stdio; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; + +use crate::agents::subagent_execution_tool::task_execution_tracker::TaskExecutionTracker; +use crate::agents::subagent_execution_tool::task_types::{Task, TaskResult, TaskStatus}; +use crate::agents::subagent_handler::run_complete_subagent_task; +use crate::agents::subagent_task_config::TaskConfig; + +pub async fn process_task( + task: &Task, + task_execution_tracker: Arc, + task_config: TaskConfig, +) -> TaskResult { + match get_task_result(task.clone(), task_execution_tracker, task_config).await { + Ok(data) => TaskResult { + task_id: task.id.clone(), + status: TaskStatus::Completed, + data: Some(data), + error: None, + }, + Err(error) => TaskResult { + task_id: task.id.clone(), + status: TaskStatus::Failed, + data: None, + error: Some(error), + }, + } +} + +async fn get_task_result( + task: Task, + task_execution_tracker: Arc, + task_config: TaskConfig, +) -> Result { + if task.task_type == "text_instruction" { + // Handle text_instruction tasks using subagent system + handle_text_instruction_task(task, task_execution_tracker, task_config).await + } else { + // Handle sub_recipe tasks using command execution + let (command, output_identifier) = build_command(&task)?; + let (stdout_output, stderr_output, success) = run_command( + command, + &output_identifier, + &task.id, + task_execution_tracker, + ) + .await?; + + if success { + process_output(stdout_output) + } else { + Err(format!("Command failed:\n{}", stderr_output)) + } + } +} + +async fn handle_text_instruction_task( + task: Task, + task_execution_tracker: Arc, + task_config: TaskConfig, +) -> Result { + let text_instruction = task + .get_text_instruction() + .ok_or_else(|| format!("Task {}: Missing text_instruction", task.id))?; + + // Start tracking the task + task_execution_tracker.start_task(&task.id).await; + + // Create arguments for the subagent task + let task_arguments = serde_json::json!({ + "text_instruction": text_instruction, + // "instructions": "You are a helpful assistant. Execute the given task and provide a clear, concise response.", + }); + + match run_complete_subagent_task(task_arguments, task_config).await { + Ok(contents) => { + // Extract the text content from the result + let result_text = contents + .into_iter() + .filter_map(|content| match content { + mcp_core::Content::Text(text) => Some(text.text), + _ => None, + }) + .collect::>() + .join("\n"); + + Ok(serde_json::json!({ + "result": result_text + })) + } + Err(e) => { + let error_msg = format!("Subagent execution failed: {}", e); + Err(error_msg) + } + } +} + +fn build_command(task: &Task) -> Result<(Command, String), String> { + let task_error = |field: &str| format!("Task {}: Missing {}", task.id, field); + + let (mut command, output_identifier) = if task.task_type == "sub_recipe" { + let sub_recipe_name = task + .get_sub_recipe_name() + .ok_or_else(|| task_error("sub_recipe name"))?; + let path = task + .get_sub_recipe_path() + .ok_or_else(|| task_error("sub_recipe path"))?; + let command_parameters = task + .get_command_parameters() + .ok_or_else(|| task_error("command_parameters"))?; + + let mut cmd = Command::new("goose"); + cmd.arg("run").arg("--recipe").arg(path).arg("--no-session"); + + for (key, value) in command_parameters { + let key_str = key.to_string(); + let value_str = value.as_str().unwrap_or(&value.to_string()).to_string(); + cmd.arg("--params") + .arg(format!("{}={}", key_str, value_str)); + } + (cmd, format!("sub-recipe {}", sub_recipe_name)) + } else { + // This branch should not be reached for text_instruction tasks anymore + // as they are handled in handle_text_instruction_task + return Err("Text instruction tasks are handled separately".to_string()); + }; + + command.stdout(Stdio::piped()); + command.stderr(Stdio::piped()); + Ok((command, output_identifier)) +} + +async fn run_command( + mut command: Command, + output_identifier: &str, + task_id: &str, + task_execution_tracker: Arc, +) -> Result<(String, String, bool), String> { + let mut child = command + .spawn() + .map_err(|e| format!("Failed to spawn goose: {}", e))?; + + let stdout = child.stdout.take().expect("Failed to capture stdout"); + let stderr = child.stderr.take().expect("Failed to capture stderr"); + + let stdout_task = spawn_output_reader( + stdout, + output_identifier, + false, + task_id, + task_execution_tracker.clone(), + ); + let stderr_task = spawn_output_reader( + stderr, + output_identifier, + true, + task_id, + task_execution_tracker.clone(), + ); + + let status = child + .wait() + .await + .map_err(|e| format!("Failed to wait for process: {}", e))?; + + let stdout_output = stdout_task.await.unwrap(); + let stderr_output = stderr_task.await.unwrap(); + + Ok((stdout_output, stderr_output, status.success())) +} + +fn spawn_output_reader( + reader: impl tokio::io::AsyncRead + Unpin + Send + 'static, + output_identifier: &str, + is_stderr: bool, + task_id: &str, + task_execution_tracker: Arc, +) -> tokio::task::JoinHandle { + let output_identifier = output_identifier.to_string(); + let task_id = task_id.to_string(); + tokio::spawn(async move { + let mut buffer = String::new(); + let mut lines = BufReader::new(reader).lines(); + while let Ok(Some(line)) = lines.next_line().await { + buffer.push_str(&line); + buffer.push('\n'); + + if !is_stderr { + task_execution_tracker + .send_live_output(&task_id, &line) + .await; + } else { + tracing::warn!("Task stderr [{}]: {}", output_identifier, line); + } + } + buffer + }) +} + +fn extract_json_from_line(line: &str) -> Option { + let start = line.find('{')?; + let end = line.rfind('}')?; + + if start >= end { + return None; + } + + let potential_json = &line[start..=end]; + if serde_json::from_str::(potential_json).is_ok() { + Some(potential_json.to_string()) + } else { + None + } +} + +fn process_output(stdout_output: String) -> Result { + let last_line = stdout_output + .lines() + .filter(|line| !line.trim().is_empty()) + .next_back() + .unwrap_or(""); + + if let Some(json_string) = extract_json_from_line(last_line) { + Ok(Value::String(json_string)) + } else { + Ok(Value::String(stdout_output)) + } +} diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/tasks_manager.rs b/crates/goose/src/agents/subagent_execution_tool/tasks_manager.rs similarity index 97% rename from crates/goose/src/agents/sub_recipe_execution_tool/tasks_manager.rs rename to crates/goose/src/agents/subagent_execution_tool/tasks_manager.rs index 433478be..334379fa 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/tasks_manager.rs +++ b/crates/goose/src/agents/subagent_execution_tool/tasks_manager.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use crate::agents::sub_recipe_execution_tool::task_types::Task; +use crate::agents::subagent_execution_tool::task_types::Task; #[derive(Debug, Clone)] pub struct TasksManager { diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/utils/mod.rs b/crates/goose/src/agents/subagent_execution_tool/utils/mod.rs similarity index 91% rename from crates/goose/src/agents/sub_recipe_execution_tool/utils/mod.rs rename to crates/goose/src/agents/subagent_execution_tool/utils/mod.rs index 1ead865e..5d756752 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/utils/mod.rs +++ b/crates/goose/src/agents/subagent_execution_tool/utils/mod.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::agents::sub_recipe_execution_tool::task_types::{TaskInfo, TaskStatus}; +use crate::agents::subagent_execution_tool::task_types::{TaskInfo, TaskStatus}; pub fn get_task_name(task_info: &TaskInfo) -> &str { task_info diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/utils/tests.rs b/crates/goose/src/agents/subagent_execution_tool/utils/tests.rs similarity index 96% rename from crates/goose/src/agents/sub_recipe_execution_tool/utils/tests.rs rename to crates/goose/src/agents/subagent_execution_tool/utils/tests.rs index de5bac92..b4e7f757 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/utils/tests.rs +++ b/crates/goose/src/agents/subagent_execution_tool/utils/tests.rs @@ -1,5 +1,5 @@ -use crate::agents::sub_recipe_execution_tool::task_types::{Task, TaskInfo, TaskStatus}; -use crate::agents::sub_recipe_execution_tool::utils::{count_by_status, get_task_name}; +use crate::agents::subagent_execution_tool::task_types::{Task, TaskInfo, TaskStatus}; +use crate::agents::subagent_execution_tool::utils::{count_by_status, get_task_name}; use serde_json::json; use std::collections::HashMap; diff --git a/crates/goose/src/agents/subagent_execution_tool/workers.rs b/crates/goose/src/agents/subagent_execution_tool/workers.rs new file mode 100644 index 00000000..4ae0ab25 --- /dev/null +++ b/crates/goose/src/agents/subagent_execution_tool/workers.rs @@ -0,0 +1,40 @@ +use crate::agents::subagent_execution_tool::task_types::{SharedState, Task}; +use crate::agents::subagent_execution_tool::tasks::process_task; +use crate::agents::subagent_task_config::TaskConfig; +use std::sync::Arc; + +async fn receive_task(state: &SharedState) -> Option { + let mut receiver = state.task_receiver.lock().await; + receiver.recv().await +} + +pub fn spawn_worker( + state: Arc, + worker_id: usize, + task_config: TaskConfig, +) -> tokio::task::JoinHandle<()> { + state.increment_active_workers(); + + tokio::spawn(async move { + worker_loop(state, worker_id, task_config).await; + }) +} + +async fn worker_loop(state: Arc, _worker_id: usize, task_config: TaskConfig) { + while let Some(task) = receive_task(&state).await { + state.task_execution_tracker.start_task(&task.id).await; + let result = process_task( + &task, + state.task_execution_tracker.clone(), + task_config.clone(), + ) + .await; + + if let Err(e) = state.result_sender.send(result).await { + tracing::error!("Worker failed to send result: {}", e); + break; + } + } + + state.decrement_active_workers(); +} diff --git a/crates/goose/src/agents/subagent_handler.rs b/crates/goose/src/agents/subagent_handler.rs index f281f748..f40a34d4 100644 --- a/crates/goose/src/agents/subagent_handler.rs +++ b/crates/goose/src/agents/subagent_handler.rs @@ -1,79 +1,43 @@ +use crate::agents::subagent::SubAgent; +use crate::agents::subagent_task_config::TaskConfig; use anyhow::Result; use mcp_core::{Content, ToolError}; use serde_json::Value; -use std::sync::Arc; -use crate::agents::subagent_types::SpawnSubAgentArgs; -use crate::agents::Agent; +/// Standalone function to run a complete subagent task +pub async fn run_complete_subagent_task( + task_arguments: Value, + task_config: TaskConfig, +) -> Result, ToolError> { + // Parse arguments - using "task" as the main message parameter + let text_instruction = task_arguments + .get("text_instruction") + .and_then(|v| v.as_str()) + .ok_or_else(|| ToolError::ExecutionError("Missing text_instruction parameter".to_string()))? + .to_string(); -impl Agent { - /// Handle running a complete subagent task (replaces the individual spawn/send/check tools) - pub async fn handle_run_subagent_task( - &self, - arguments: Value, - ) -> Result, ToolError> { - let subagent_manager = self.subagent_manager.lock().await; - let manager = subagent_manager.as_ref().ok_or_else(|| { - ToolError::ExecutionError("Subagent manager not initialized".to_string()) - })?; + // Create the subagent with the parent agent's provider + let (subagent, handle) = SubAgent::new(task_config.clone()) + .await + .map_err(|e| ToolError::ExecutionError(format!("Failed to create subagent: {}", e)))?; - // Parse arguments - using "task" as the main message parameter - let message = arguments - .get("task") - .and_then(|v| v.as_str()) - .ok_or_else(|| ToolError::ExecutionError("Missing task parameter".to_string()))? - .to_string(); - - // Either recipe_name or instructions must be provided - let recipe_name = arguments - .get("recipe_name") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - let instructions = arguments - .get("instructions") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - - let mut args = if let Some(recipe_name) = recipe_name { - SpawnSubAgentArgs::new_with_recipe(recipe_name, message.clone()) - } else if let Some(instructions) = instructions { - SpawnSubAgentArgs::new_with_instructions(instructions, message.clone()) - } else { - return Err(ToolError::ExecutionError( - "Either recipe_name or instructions parameter must be provided".to_string(), - )); - }; - - // Set max_turns with default of 10 - let max_turns = arguments - .get("max_turns") - .and_then(|v| v.as_u64()) - .unwrap_or(10) as usize; - args = args.with_max_turns(max_turns); - - if let Some(timeout) = arguments.get("timeout_seconds").and_then(|v| v.as_u64()) { - args = args.with_timeout(timeout); + // Execute the subagent task + let result = match subagent.reply_subagent(text_instruction, task_config).await { + Ok(response) => { + let response_text = response.as_concat_text(); + Ok(vec![Content::text(response_text)]) } + Err(e) => Err(ToolError::ExecutionError(format!( + "Subagent execution failed: {}", + e + ))), + }; - // Get the provider from the parent agent - let provider = self - .provider() - .await - .map_err(|e| ToolError::ExecutionError(format!("Failed to get provider: {}", e)))?; - - // Get the extension manager from the parent agent - let extension_manager = Arc::new(self.extension_manager.read().await); - - // Run the complete subagent task - match manager - .run_complete_subagent_task(args, provider, extension_manager) - .await - { - Ok(result) => Ok(vec![Content::text(result)]), - Err(e) => Err(ToolError::ExecutionError(format!( - "Failed to run subagent task: {}", - e - ))), - } + // Clean up the subagent handle + if let Err(e) = handle.await { + tracing::debug!("Subagent handle cleanup error: {}", e); } + + // Return the result + result } diff --git a/crates/goose/src/agents/subagent_manager.rs b/crates/goose/src/agents/subagent_manager.rs deleted file mode 100644 index 174facee..00000000 --- a/crates/goose/src/agents/subagent_manager.rs +++ /dev/null @@ -1,404 +0,0 @@ -use std::collections::HashMap; -use std::path::Path; -use std::sync::Arc; - -use anyhow::{anyhow, Result}; -use mcp_core::protocol::JsonRpcMessage; -use tokio::sync::{mpsc, Mutex, RwLock}; -use tracing::{debug, error, instrument, warn}; - -use crate::agents::extension_manager::ExtensionManager; -use crate::agents::subagent::{SubAgent, SubAgentConfig, SubAgentProgress, SubAgentStatus}; -use crate::agents::subagent_types::SpawnSubAgentArgs; -use crate::providers::base::Provider; -use crate::recipe::Recipe; - -/// Manages the lifecycle of subagents -pub struct SubAgentManager { - subagents: Arc>>>, - handles: Arc>>>, - mcp_notification_tx: mpsc::Sender, -} - -impl SubAgentManager { - /// Create a new subagent manager - pub fn new(mcp_notification_tx: mpsc::Sender) -> Self { - Self { - subagents: Arc::new(RwLock::new(HashMap::new())), - handles: Arc::new(Mutex::new(HashMap::new())), - mcp_notification_tx, - } - } - - /// Spawn a new interactive subagent - #[instrument(skip(self, args, provider, extension_manager))] - pub async fn spawn_interactive_subagent( - &self, - args: SpawnSubAgentArgs, - provider: Arc, - extension_manager: Arc>, - ) -> Result { - debug!("Spawning interactive subagent"); - - // Create subagent config based on whether we have a recipe or instructions - let mut config = if let Some(recipe_name) = args.recipe_name { - debug!("Using recipe: {}", recipe_name); - // Load the recipe - let recipe = self.load_recipe(&recipe_name).await?; - SubAgentConfig::new_with_recipe(recipe) - } else if let Some(instructions) = args.instructions { - debug!("Using direct instructions"); - SubAgentConfig::new_with_instructions(instructions) - } else { - return Err(anyhow!( - "Either recipe_name or instructions must be provided" - )); - }; - - if let Some(max_turns) = args.max_turns { - config = config.with_max_turns(max_turns); - } - if let Some(timeout) = args.timeout_seconds { - config = config.with_timeout(timeout); - } - - // Create the subagent with the parent agent's provider - let (subagent, handle) = SubAgent::new( - config, - Arc::clone(&provider), - Arc::clone(&extension_manager), - self.mcp_notification_tx.clone(), - ) - .await?; - let subagent_id = subagent.id.clone(); - - // Store the subagent and its handle - { - let mut subagents = self.subagents.write().await; - subagents.insert(subagent_id.clone(), Arc::clone(&subagent)); - } - { - let mut handles = self.handles.lock().await; - handles.insert(subagent_id.clone(), handle); - } - - // Return immediately - no initial message processing - Ok(subagent_id) - } - - /// Get a subagent by ID - pub async fn get_subagent(&self, id: &str) -> Option> { - let subagents = self.subagents.read().await; - subagents.get(id).cloned() - } - - /// List all active subagent IDs - pub async fn list_subagents(&self) -> Vec { - let subagents = self.subagents.read().await; - subagents.keys().cloned().collect() - } - - /// Get status of all subagents - pub async fn get_subagent_status(&self) -> HashMap { - let subagents = self.subagents.read().await; - let mut status_map = HashMap::new(); - - for (id, subagent) in subagents.iter() { - status_map.insert(id.clone(), subagent.get_status().await); - } - - status_map - } - - /// Get progress of all subagents - pub async fn get_subagent_progress(&self) -> HashMap { - let subagents = self.subagents.read().await; - let mut progress_map = HashMap::new(); - - for (id, subagent) in subagents.iter() { - progress_map.insert(id.clone(), subagent.get_progress().await); - } - - progress_map - } - - /// Send a message to a specific subagent - #[instrument(skip(self, message, provider, extension_manager))] - pub async fn send_message_to_subagent( - &self, - subagent_id: &str, - message: String, - provider: Arc, - extension_manager: Arc>, - ) -> Result { - let subagent = self - .get_subagent(subagent_id) - .await - .ok_or_else(|| anyhow!("Subagent {} not found", subagent_id))?; - - // Process the message and get a reply - match subagent - .reply_subagent(message, provider, extension_manager) - .await - { - Ok(response) => Ok(format!( - "Message sent to subagent {}. Response:\n{}", - subagent_id, - response.as_concat_text() - )), - Err(e) => Err(anyhow!("Failed to process message in subagent: {}", e)), - } - } - - /// Terminate a specific subagent - #[instrument(skip(self))] - pub async fn terminate_subagent(&self, id: &str) -> Result<()> { - debug!("Terminating subagent {}", id); - - // Get and terminate the subagent - let subagent = { - let mut subagents = self.subagents.write().await; - subagents.remove(id) - }; - - if let Some(subagent) = subagent { - subagent.terminate().await?; - } else { - warn!("Attempted to terminate non-existent subagent {}", id); - return Err(anyhow!("Subagent {} not found", id)); - } - - // Clean up the background handle - let handle = { - let mut handles = self.handles.lock().await; - handles.remove(id) - }; - - if let Some(handle) = handle { - handle.abort(); - } - - debug!("Subagent {} terminated successfully", id); - Ok(()) - } - - /// Terminate all subagents - #[instrument(skip(self))] - pub async fn terminate_all_subagents(&self) -> Result<()> { - debug!("Terminating all subagents"); - - let subagent_ids: Vec = { - let subagents = self.subagents.read().await; - subagents.keys().cloned().collect() - }; - - for id in subagent_ids { - if let Err(e) = self.terminate_subagent(&id).await { - error!("Failed to terminate subagent {}: {}", id, e); - } - } - - debug!("All subagents terminated"); - Ok(()) - } - - /// Get formatted conversation from a subagent - pub async fn get_subagent_conversation(&self, id: &str) -> Result { - let subagent = self - .get_subagent(id) - .await - .ok_or_else(|| anyhow!("Subagent {} not found", id))?; - - Ok(subagent.get_formatted_conversation().await) - } - - /// Clean up completed or failed subagents - pub async fn cleanup_completed_subagents(&self) -> Result { - let mut completed_ids = Vec::new(); - - // Find completed subagents - { - let subagents = self.subagents.read().await; - for (id, subagent) in subagents.iter() { - if subagent.is_completed().await { - completed_ids.push(id.clone()); - } - } - } - - // Remove completed subagents - let count = completed_ids.len(); - for id in completed_ids { - if let Err(e) = self.terminate_subagent(&id).await { - error!("Failed to cleanup completed subagent {}: {}", id, e); - } - } - - debug!("Cleaned up {} completed subagents", count); - Ok(count) - } - - /// Load a recipe from file - async fn load_recipe(&self, recipe_name: &str) -> Result { - // Try to load from current directory first - let recipe_path = if recipe_name.ends_with(".yaml") || recipe_name.ends_with(".yml") { - recipe_name.to_string() - } else { - format!("{}.yaml", recipe_name) - }; - - if Path::new(&recipe_path).exists() { - let content = tokio::fs::read_to_string(&recipe_path).await?; - let recipe: Recipe = serde_yaml::from_str(&content)?; - return Ok(recipe); - } - - // Try some common recipe locations - let common_paths = [ - format!("recipes/{}", recipe_path), - format!("./recipes/{}", recipe_path), - format!("../recipes/{}", recipe_path), - ]; - - for path in &common_paths { - if Path::new(path).exists() { - let content = tokio::fs::read_to_string(path).await?; - let recipe: Recipe = serde_yaml::from_str(&content)?; - return Ok(recipe); - } - } - - Err(anyhow!( - "Recipe file '{}' not found in current directory or common recipe locations", - recipe_name - )) - } - - /// Get count of active subagents - pub async fn get_active_count(&self) -> usize { - let subagents = self.subagents.read().await; - subagents.len() - } - - /// Check if a subagent exists - pub async fn has_subagent(&self, id: &str) -> bool { - let subagents = self.subagents.read().await; - subagents.contains_key(id) - } - - /// Run a complete subagent task (spawn, execute, cleanup) - #[instrument(skip(self, args, provider, extension_manager))] - pub async fn run_complete_subagent_task( - &self, - args: SpawnSubAgentArgs, - provider: Arc, - extension_manager: Arc>, - ) -> Result { - debug!("Running complete subagent task"); - - // Create subagent config based on whether we have a recipe or instructions - let mut config = if let Some(recipe_name) = args.recipe_name { - debug!("Using recipe: {}", recipe_name); - // Load the recipe - let recipe = self.load_recipe(&recipe_name).await?; - SubAgentConfig::new_with_recipe(recipe) - } else if let Some(instructions) = args.instructions { - debug!("Using direct instructions"); - SubAgentConfig::new_with_instructions(instructions) - } else { - return Err(anyhow!( - "Either recipe_name or instructions must be provided" - )); - }; - - // Set default max_turns if not provided - let max_turns = args.max_turns.unwrap_or(10); - config = config.with_max_turns(max_turns); - - if let Some(timeout) = args.timeout_seconds { - config = config.with_timeout(timeout); - } - - // Create the subagent with the parent agent's provider - let (subagent, handle) = SubAgent::new( - config, - Arc::clone(&provider), - Arc::clone(&extension_manager), - self.mcp_notification_tx.clone(), - ) - .await?; - let subagent_id = subagent.id.clone(); - - // Store the subagent and its handle temporarily - { - let mut subagents = self.subagents.write().await; - subagents.insert(subagent_id.clone(), Arc::clone(&subagent)); - } - { - let mut handles = self.handles.lock().await; - handles.insert(subagent_id.clone(), handle); - } - - // Run the complete conversation - let mut conversation_result = String::new(); - let turn_count = 0; - let current_message = args.message.clone(); - - // For now, we just complete after one turn since we don't have a mechanism - // for the subagent to continue autonomously without user input - // In a future iteration, we could add logic for the subagent to continue - // working on multi-step tasks with proper turn management - match subagent - .reply_subagent( - current_message, - Arc::clone(&provider), - Arc::clone(&extension_manager), - ) - .await - { - Ok(response) => { - let response_text = response.as_concat_text(); - conversation_result.push_str(&format!( - "\n--- Turn {} ---\n{}", - turn_count + 1, - response_text - )); - conversation_result.push_str(&format!( - "\n[Task completed after {} turns]", - turn_count + 1 - )); - } - Err(e) => { - conversation_result - .push_str(&format!("\n[Error after {} turns: {}]", turn_count, e)); - } - } - - // Clean up the subagent - if let Err(e) = self.terminate_subagent(&subagent_id).await { - debug!("Failed to cleanup subagent {}: {}", subagent_id, e); - } - - // Return the complete conversation result - Ok(format!("Subagent task completed:\n{}", conversation_result)) - } -} - -impl Default for SubAgentManager { - fn default() -> Self { - // Create a dummy channel for default implementation - // In practice, this should not be used - SubAgentManager should be created - // with a proper MCP notification sender - let (tx, _rx) = mpsc::channel(1); - Self::new(tx) - } -} - -impl Drop for SubAgentManager { - fn drop(&mut self) { - // Note: In a real implementation, you might want to spawn a task to clean up - // subagents gracefully, but for now we'll rely on the Drop implementations - // of the individual components - debug!("SubAgentManager dropped"); - } -} diff --git a/crates/goose/src/agents/subagent_task_config.rs b/crates/goose/src/agents/subagent_task_config.rs new file mode 100644 index 00000000..261fb82b --- /dev/null +++ b/crates/goose/src/agents/subagent_task_config.rs @@ -0,0 +1,55 @@ +use crate::agents::extension_manager::ExtensionManager; +use crate::providers::base::Provider; +use mcp_core::protocol::JsonRpcMessage; +use std::fmt; +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; +use uuid::Uuid; + +/// Configuration for task execution with all necessary dependencies +#[derive(Clone)] +pub struct TaskConfig { + pub id: String, + pub provider: Option>, + pub extension_manager: Option>>, + pub mcp_tx: mpsc::Sender, + pub max_turns: Option, +} + +impl fmt::Debug for TaskConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TaskConfig") + .field("id", &self.id) + .field("provider", &"") + .field("extension_manager", &"") + .field("max_turns", &self.max_turns) + .finish() + } +} + +impl TaskConfig { + /// Create a new TaskConfig with all required dependencies + pub fn new( + provider: Option>, + extension_manager: Option>>, + mcp_tx: mpsc::Sender, + ) -> Self { + Self { + id: Uuid::new_v4().to_string(), + provider, + extension_manager, + mcp_tx, + max_turns: Some(10), + } + } + + /// Get a reference to the provider + pub fn provider(&self) -> Option<&Arc> { + self.provider.as_ref() + } + + /// Get a clone of the MCP sender + pub fn mcp_tx(&self) -> mpsc::Sender { + self.mcp_tx.clone() + } +} diff --git a/crates/goose/src/agents/subagent_tools.rs b/crates/goose/src/agents/subagent_tools.rs deleted file mode 100644 index f8f35f1f..00000000 --- a/crates/goose/src/agents/subagent_tools.rs +++ /dev/null @@ -1,68 +0,0 @@ -use indoc::indoc; -use mcp_core::tool::{Tool, ToolAnnotations}; -use serde_json::json; - -pub const SUBAGENT_RUN_TASK_TOOL_NAME: &str = "subagent__run_task"; - -pub fn run_task_subagent_tool() -> Tool { - Tool::new( - SUBAGENT_RUN_TASK_TOOL_NAME.to_string(), - indoc! {r#" - Spawn a specialized subagent to handle a specific task completely and automatically. - - This tool creates a subagent, processes your task through a complete conversation, - and returns the final result. The subagent is automatically cleaned up after completion. - - You can configure the subagent in two ways: - 1. Using a recipe file that defines instructions, extensions, and behavior - 2. Providing direct instructions for ad-hoc tasks - - The subagent will work autonomously until the task is complete, it reaches max_turns, - or it encounters an error. You'll get the final result without needing to manage - the subagent lifecycle manually. - - Examples: - - "Convert these unittest files to pytest format: file1.py, file2.py" - - "Research the latest developments in AI and provide a comprehensive summary" - - "Review this code for security vulnerabilities and suggest fixes" - - "Refactor this legacy code to use modern Python patterns" - "#} - .to_string(), - json!({ - "type": "object", - "required": ["task"], - "properties": { - "recipe_name": { - "type": "string", - "description": "Name of the recipe file to configure the subagent (e.g., 'research_assistant_recipe.yaml'). Either this or 'instructions' must be provided." - }, - "instructions": { - "type": "string", - "description": "Direct instructions for the subagent's task. Either this or 'recipe_name' must be provided. Example: 'You are a code refactoring assistant. Help convert unittest tests to pytest format.'" - }, - "task": { - "type": "string", - "description": "The task description or initial message for the subagent to work on" - }, - "max_turns": { - "type": "integer", - "description": "Maximum number of conversation turns before auto-completion (default: 10)", - "minimum": 1, - "default": 10 - }, - "timeout_seconds": { - "type": "integer", - "description": "Optional timeout for the entire task in seconds", - "minimum": 1 - } - } - }), - Some(ToolAnnotations { - title: Some("Run subagent task".to_string()), - read_only_hint: false, - destructive_hint: false, - idempotent_hint: false, - open_world_hint: false, - }), - ) -} diff --git a/crates/goose/src/agents/subagent_types.rs b/crates/goose/src/agents/subagent_types.rs deleted file mode 100644 index 1fbc8556..00000000 --- a/crates/goose/src/agents/subagent_types.rs +++ /dev/null @@ -1,42 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SpawnSubAgentArgs { - pub recipe_name: Option, - pub instructions: Option, - pub message: String, - pub max_turns: Option, - pub timeout_seconds: Option, -} - -impl SpawnSubAgentArgs { - pub fn new_with_recipe(recipe_name: String, message: String) -> Self { - Self { - recipe_name: Some(recipe_name), - instructions: None, - message, - max_turns: None, - timeout_seconds: None, - } - } - - pub fn new_with_instructions(instructions: String, message: String) -> Self { - Self { - recipe_name: None, - instructions: Some(instructions), - message, - max_turns: None, - timeout_seconds: None, - } - } - - pub fn with_max_turns(mut self, max_turns: usize) -> Self { - self.max_turns = Some(max_turns); - self - } - - pub fn with_timeout(mut self, timeout_seconds: u64) -> Self { - self.timeout_seconds = Some(timeout_seconds); - self - } -} diff --git a/crates/goose/src/recipe/mod.rs b/crates/goose/src/recipe/mod.rs index 8b0144a9..2d753b20 100644 --- a/crates/goose/src/recipe/mod.rs +++ b/crates/goose/src/recipe/mod.rs @@ -146,11 +146,6 @@ pub struct SubRecipe { #[serde(default)] pub sequential_when_repeated: bool, } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Execution { - #[serde(default)] - pub parallel: bool, -} fn deserialize_value_map_as_string<'de, D>( deserializer: D, diff --git a/crates/goose/tests/pricing_integration_test.rs b/crates/goose/tests/pricing_integration_test.rs index 9e447290..083f96da 100644 --- a/crates/goose/tests/pricing_integration_test.rs +++ b/crates/goose/tests/pricing_integration_test.rs @@ -3,6 +3,10 @@ use std::time::Instant; #[tokio::test] async fn test_pricing_cache_performance() { + // Use a unique cache directory for this test to avoid conflicts + let test_cache_dir = format!("/tmp/goose_test_cache_perf_{}", std::process::id()); + std::env::set_var("GOOSE_CACHE_DIR", &test_cache_dir); + // Initialize the cache let start = Instant::now(); initialize_pricing_cache() @@ -65,10 +69,18 @@ async fn test_pricing_cache_performance() { first_fetch_duration, second_fetch_duration ); + + // Clean up + std::env::remove_var("GOOSE_CACHE_DIR"); + let _ = std::fs::remove_dir_all(&test_cache_dir); } #[tokio::test] async fn test_pricing_refresh() { + // Use a unique cache directory for this test to avoid conflicts + let test_cache_dir = format!("/tmp/goose_test_cache_refresh_{}", std::process::id()); + std::env::set_var("GOOSE_CACHE_DIR", &test_cache_dir); + // Initialize first initialize_pricing_cache() .await @@ -90,10 +102,18 @@ async fn test_pricing_refresh() { refreshed_pricing.is_some(), "Expected pricing after refresh" ); + + // Clean up + std::env::remove_var("GOOSE_CACHE_DIR"); + let _ = std::fs::remove_dir_all(&test_cache_dir); } #[tokio::test] async fn test_model_not_in_openrouter() { + // Use a unique cache directory for this test to avoid conflicts + let test_cache_dir = format!("/tmp/goose_test_cache_model_{}", std::process::id()); + std::env::set_var("GOOSE_CACHE_DIR", &test_cache_dir); + initialize_pricing_cache() .await .expect("Failed to initialize pricing cache"); @@ -104,12 +124,20 @@ async fn test_model_not_in_openrouter() { pricing.is_none(), "Should return None for non-existent model" ); + + // Clean up + std::env::remove_var("GOOSE_CACHE_DIR"); + let _ = std::fs::remove_dir_all(&test_cache_dir); } #[tokio::test] async fn test_concurrent_access() { use tokio::task; + // Use a unique cache directory for this test to avoid conflicts + let test_cache_dir = format!("/tmp/goose_test_cache_concurrent_{}", std::process::id()); + std::env::set_var("GOOSE_CACHE_DIR", &test_cache_dir); + initialize_pricing_cache() .await .expect("Failed to initialize pricing cache"); @@ -133,4 +161,8 @@ async fn test_concurrent_access() { assert!(has_pricing, "Task {} should have gotten pricing", task_id); println!("Task {} took: {:?}", task_id, duration); } + + // Clean up + std::env::remove_var("GOOSE_CACHE_DIR"); + let _ = std::fs::remove_dir_all(&test_cache_dir); }