feat: consolidate subagent execution for dynamic tasks (#3444)

Co-authored-by: Lifei Zhou <lifei@squareup.com>
This commit is contained in:
Wendy Tang
2025-07-17 18:48:00 -07:00
committed by GitHub
parent 9f65455cc3
commit 4771ffbb5b
36 changed files with 751 additions and 1062 deletions

View File

@@ -203,6 +203,7 @@ pub async fn build_session(session_config: SessionBuilderConfig) -> Session {
// Create the agent
let agent: Agent = Agent::new();
if let Some(sub_recipes) = session_config.sub_recipes {
agent.add_sub_recipes(sub_recipes).await;
}

View File

@@ -7,7 +7,10 @@ mod prompt;
mod task_execution_display;
mod thinking;
use crate::session::task_execution_display::TASK_EXECUTION_NOTIFICATION_TYPE;
use crate::session::task_execution_display::{
format_task_execution_notification, TASK_EXECUTION_NOTIFICATION_TYPE,
};
use std::io::Write;
pub use self::export::message_to_markdown;
pub use builder::{build_session, SessionBuilderConfig, SessionSettings};
@@ -20,8 +23,6 @@ use goose::permission::PermissionConfirmation;
use goose::providers::base::Provider;
pub use goose::session::Identifier;
use goose::utils::safe_truncate;
use std::io::Write;
use task_execution_display::format_task_execution_notification;
use anyhow::{Context, Result};
use completion::GooseCompleter;
@@ -1077,7 +1078,7 @@ impl Session {
// Handle subagent notifications - show immediately
if let Some(_id) = subagent_id {
// Show subagent notifications immediately (no buffering) with compact spacing
// TODO: proper display for subagent notifications
if interactive {
let _ = progress_bars.hide();
println!("{}", console::style(&formatted_message).green().dim());

View File

@@ -463,8 +463,26 @@ fn print_params(value: &Value, depth: usize, debug: bool) {
}
}
Value::String(s) => {
if !debug && s.len() > get_tool_params_max_length() {
println!("{}{}: {}", indent, style(key).dim(), style("...").dim());
// Special handling for text_instruction to show more content
let max_length = if key == "text_instruction" {
200 // Allow longer display for text instructions
} else {
get_tool_params_max_length()
};
if !debug && s.len() > max_length {
// For text instructions, show a preview instead of just "..."
if key == "text_instruction" {
let preview = &s[..max_length.saturating_sub(3)];
println!(
"{}{}: {}",
indent,
style(key).dim(),
style(format!("{}...", preview)).green()
);
} else {
println!("{}{}: {}", indent, style(key).dim(), style("...").dim());
}
} else {
println!("{}{}: {}", indent, style(key).dim(), style(s).green());
}

View File

@@ -1,5 +1,5 @@
use goose::agents::sub_recipe_execution_tool::lib::TaskStatus;
use goose::agents::sub_recipe_execution_tool::notification_events::{
use goose::agents::subagent_execution_tool::lib::TaskStatus;
use goose::agents::subagent_execution_tool::notification_events::{
TaskExecutionNotificationEvent, TaskInfo,
};
use serde_json::Value;

View File

@@ -1,5 +1,5 @@
use super::*;
use goose::agents::sub_recipe_execution_tool::notification_events::{
use goose::agents::subagent_execution_tool::notification_events::{
FailedTaskInfo, TaskCompletionStats, TaskExecutionStats,
};
use serde_json::json;

View File

@@ -9,11 +9,14 @@ use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
use mcp_core::protocol::JsonRpcMessage;
use crate::agents::final_output_tool::{FINAL_OUTPUT_CONTINUATION_MESSAGE, FINAL_OUTPUT_TOOL_NAME};
use crate::agents::sub_recipe_execution_tool::sub_recipe_execute_task_tool::{
self, SUB_RECIPE_EXECUTE_TASK_TOOL_NAME,
use crate::agents::recipe_tools::dynamic_task_tools::{
create_dynamic_task, create_dynamic_task_tool, DYNAMIC_TASK_TOOL_NAME_PREFIX,
};
use crate::agents::sub_recipe_execution_tool::tasks_manager::TasksManager;
use crate::agents::sub_recipe_manager::SubRecipeManager;
use crate::agents::subagent_execution_tool::subagent_execute_task_tool::{
self, SUBAGENT_EXECUTE_TASK_TOOL_NAME,
};
use crate::agents::subagent_execution_tool::tasks_manager::TasksManager;
use crate::config::{Config, ExtensionConfigManager, PermissionManager};
use crate::message::{push_message, Message};
use crate::permission::permission_judge::check_tool_permissions;
@@ -48,21 +51,18 @@ use mcp_core::{
prompt::Prompt, protocol::GetPromptResult, tool::Tool, Content, ToolError, ToolResult,
};
use crate::agents::subagent_tools::SUBAGENT_RUN_TASK_TOOL_NAME;
use super::final_output_tool::FinalOutputTool;
use super::platform_tools;
use super::router_tools;
use super::subagent_manager::SubAgentManager;
use super::subagent_tools;
use super::tool_execution::{ToolCallResult, CHAT_MODE_TOOL_SKIPPED_RESPONSE, DECLINED_RESPONSE};
use crate::agents::subagent_task_config::TaskConfig;
const DEFAULT_MAX_TURNS: u32 = 1000;
/// The main goose Agent
pub struct Agent {
pub(super) provider: Mutex<Option<Arc<dyn Provider>>>,
pub(super) extension_manager: RwLock<ExtensionManager>,
pub(super) extension_manager: Arc<RwLock<ExtensionManager>>,
pub(super) sub_recipe_manager: Mutex<SubRecipeManager>,
pub(super) tasks_manager: TasksManager,
pub(super) final_output_tool: Mutex<Option<FinalOutputTool>>,
@@ -76,7 +76,7 @@ pub struct Agent {
pub(super) tool_monitor: Mutex<Option<ToolMonitor>>,
pub(super) router_tool_selector: Mutex<Option<Arc<Box<dyn RouterToolSelector>>>>,
pub(super) scheduler_service: Mutex<Option<Arc<dyn SchedulerTrait>>>,
pub(super) subagent_manager: Mutex<Option<SubAgentManager>>,
pub(super) mcp_tx: Mutex<mpsc::Sender<JsonRpcMessage>>,
pub(super) mcp_notification_rx: Arc<Mutex<mpsc::Receiver<JsonRpcMessage>>>,
}
@@ -137,7 +137,7 @@ impl Agent {
Self {
provider: Mutex::new(None),
extension_manager: RwLock::new(ExtensionManager::new()),
extension_manager: Arc::new(RwLock::new(ExtensionManager::new())),
sub_recipe_manager: Mutex::new(SubRecipeManager::new()),
tasks_manager: TasksManager::new(),
final_output_tool: Mutex::new(None),
@@ -152,7 +152,7 @@ impl Agent {
router_tool_selector: Mutex::new(None),
scheduler_service: Mutex::new(None),
// Initialize with MCP notification support
subagent_manager: Mutex::new(Some(SubAgentManager::new(mcp_tx))),
mcp_tx: Mutex::new(mcp_tx),
mcp_notification_rx: Arc::new(Mutex::new(mcp_rx)),
}
}
@@ -300,12 +300,20 @@ impl Agent {
&self.tasks_manager,
)
.await
} else if tool_call.name == SUB_RECIPE_EXECUTE_TASK_TOOL_NAME {
sub_recipe_execute_task_tool::run_tasks(
} else if tool_call.name == SUBAGENT_EXECUTE_TASK_TOOL_NAME {
let provider = self.provider().await.ok();
let mcp_tx = self.mcp_tx.lock().await.clone();
let task_config =
TaskConfig::new(provider, Some(Arc::clone(&self.extension_manager)), mcp_tx);
subagent_execute_task_tool::run_tasks(
tool_call.arguments.clone(),
task_config,
&self.tasks_manager,
)
.await
} else if tool_call.name == DYNAMIC_TASK_TOOL_NAME_PREFIX {
create_dynamic_task(tool_call.arguments.clone(), &self.tasks_manager).await
} else if tool_call.name == PLATFORM_READ_RESOURCE_TOOL_NAME {
// Check if the tool is read_resource and handle it separately
ToolCallResult::from(
@@ -321,11 +329,6 @@ impl Agent {
)
} else if tool_call.name == PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME {
ToolCallResult::from(extension_manager.search_available_extensions().await)
} else if tool_call.name == SUBAGENT_RUN_TASK_TOOL_NAME {
ToolCallResult::from(
self.handle_run_subagent_task(tool_call.arguments.clone())
.await,
)
} else if self.is_frontend_tool(&tool_call.name).await {
// For frontend tools, return an error indicating we need frontend execution
ToolCallResult::from(Err(ToolError::ExecutionError(
@@ -567,11 +570,8 @@ impl Agent {
platform_tools::manage_schedule_tool(),
]);
// Add subagent tool (only if ALPHA_FEATURES is enabled)
let config = Config::global();
if config.get_param::<bool>("ALPHA_FEATURES").unwrap_or(false) {
prefixed_tools.push(subagent_tools::run_task_subagent_tool());
}
// Dynamic task tool
prefixed_tools.push(create_dynamic_task_tool());
// Add resource tools if supported
if extension_manager.supports_resources() {
@@ -589,8 +589,7 @@ impl Agent {
if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() {
prefixed_tools.push(final_output_tool.tool());
}
prefixed_tools
.push(sub_recipe_execute_task_tool::create_sub_recipe_execute_task_tool());
prefixed_tools.push(subagent_execute_task_tool::create_subagent_execute_task_tool());
}
prefixed_tools
@@ -1074,15 +1073,6 @@ impl Agent {
let mut current_provider = self.provider.lock().await;
*current_provider = Some(provider.clone());
// Initialize subagent manager with MCP notification support
// Need to recreate the MCP channel since we're replacing the manager
let (mcp_tx, mcp_rx) = mpsc::channel(100);
{
let mut rx_guard = self.mcp_notification_rx.lock().await;
*rx_guard = mcp_rx;
}
*self.subagent_manager.lock().await = Some(SubAgentManager::new(mcp_tx));
self.update_router_tool_selector(Some(provider), None)
.await?;
Ok(())

View File

@@ -11,13 +11,11 @@ mod reply_parts;
mod router_tool_selector;
mod router_tools;
mod schedule_tool;
pub mod sub_recipe_execution_tool;
pub mod sub_recipe_manager;
pub mod subagent;
pub mod subagent_execution_tool;
pub mod subagent_handler;
pub mod subagent_manager;
pub mod subagent_tools;
pub mod subagent_types;
mod subagent_task_config;
mod tool_execution;
mod tool_router_index_manager;
pub(crate) mod tool_vectordb;
@@ -27,7 +25,6 @@ pub use agent::{Agent, AgentEvent};
pub use extension::ExtensionConfig;
pub use extension_manager::ExtensionManager;
pub use prompt_manager::PromptManager;
pub use subagent::{SubAgent, SubAgentConfig, SubAgentProgress, SubAgentStatus};
pub use subagent_manager::SubAgentManager;
pub use subagent_types::SpawnSubAgentArgs;
pub use subagent::{SubAgent, SubAgentProgress, SubAgentStatus};
pub use subagent_task_config::TaskConfig;
pub use types::{FrontendTool, SessionConfig};

View File

@@ -0,0 +1,147 @@
// =======================================
// Module: Dynamic Task Tools
// Handles creation of tasks dynamically without sub-recipes
// =======================================
use crate::agents::subagent_execution_tool::tasks_manager::TasksManager;
use crate::agents::subagent_execution_tool::{lib::ExecutionMode, task_types::Task};
use crate::agents::tool_execution::ToolCallResult;
use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError};
use serde_json::{json, Value};
pub const DYNAMIC_TASK_TOOL_NAME_PREFIX: &str = "dynamic_task__create_task";
pub fn create_dynamic_task_tool() -> Tool {
Tool::new(
DYNAMIC_TASK_TOOL_NAME_PREFIX.to_string(),
"Use this tool to create one or more dynamic tasks from a shared text instruction and varying parameters.\
How it works:
- Provide a single text instruction
- Use the 'task_parameters' field to pass an array of parameter sets
- Each resulting task will use the same instruction with different parameter values
This is useful when performing the same operation across many inputs (e.g., getting weather for multiple cities, searching multiple slack channels, iterating through various linear tickets, etc).
Once created, these tasks should be passed to the 'subagent__execute_task' tool for execution. Tasks can run sequentially or in parallel.
---
What is a 'subagent'?
A 'subagent' is a stateless sub-process that executes a single task independently. Use subagents when:
- You want to parallelize similar work across different inputs
- You are not sure your search or operation will succeed on the first try
Each subagent receives a task with a defined payload and returns a result, which is not visible to the user unless explicitly summarized by the system.
---
Examples of 'task_parameters' for a single task:
text_instruction: Search for the config file in the root directory.
Examples of 'task_parameters' for multiple tasks:
text_instruction: Get weather for Melbourne.
timeout_seconds: 300
text_instruction: Get weather for Los Angeles.
timeout_seconds: 300
text_instruction: Get weather for San Francisco.
timeout_seconds: 300
".to_string(),
json!({
"type": "object",
"properties": {
"task_parameters": {
"type": "array",
"description": "Array of parameter sets for creating tasks. \
For a single task, provide an array with one element. \
For multiple tasks, provide an array with multiple elements, each with different parameter values. \
If there is no parameter set, provide an empty array.",
"items": {
"type": "object",
"properties": {
"text_instruction": {
"type": "string",
"description": "The text instruction to execute"
},
"timeout_seconds": {
"type": "integer",
"description": "Optional timeout for the task in seconds (default: 300)",
"minimum": 1
}
},
"required": ["text_instruction"]
}
}
}
}),
Some(ToolAnnotations {
title: Some("Dynamic Task Creation".to_string()),
read_only_hint: false,
destructive_hint: true,
idempotent_hint: false,
open_world_hint: true,
}),
)
}
fn extract_task_parameters(params: &Value) -> Vec<Value> {
params
.get("task_parameters")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default()
}
fn create_text_instruction_tasks_from_params(task_params: &[Value]) -> Vec<Task> {
task_params
.iter()
.map(|task_param| {
let text_instruction = task_param
.get("text_instruction")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let payload = json!({
"text_instruction": text_instruction
});
Task {
id: uuid::Uuid::new_v4().to_string(),
task_type: "text_instruction".to_string(),
payload,
}
})
.collect()
}
fn create_task_execution_payload(tasks: Vec<Task>, execution_mode: ExecutionMode) -> Value {
let task_ids: Vec<String> = tasks.iter().map(|task| task.id.clone()).collect();
json!({
"task_ids": task_ids,
"execution_mode": execution_mode
})
}
pub async fn create_dynamic_task(params: Value, tasks_manager: &TasksManager) -> ToolCallResult {
let task_params_array = extract_task_parameters(&params);
if task_params_array.is_empty() {
return ToolCallResult::from(Err(ToolError::ExecutionError(
"No task parameters provided".to_string(),
)));
}
let tasks = create_text_instruction_tasks_from_params(&task_params_array);
// Use parallel execution if there are multiple tasks, sequential for single task
let execution_mode = if tasks.len() > 1 {
ExecutionMode::Parallel
} else {
ExecutionMode::Sequential
};
let task_execution_payload = create_task_execution_payload(tasks.clone(), execution_mode);
let tasks_json = match serde_json::to_string(&task_execution_payload) {
Ok(json) => json,
Err(e) => {
return ToolCallResult::from(Err(ToolError::ExecutionError(format!(
"Failed to serialize task list: {}",
e
))))
}
};
tasks_manager.save_tasks(tasks.clone()).await;
ToolCallResult::from(Ok(vec![Content::text(tasks_json)]))
}

View File

@@ -1,2 +1,3 @@
pub mod dynamic_task_tools;
pub mod param_utils;
pub mod sub_recipe_tools;

View File

@@ -5,8 +5,8 @@ use anyhow::Result;
use mcp_core::tool::{Tool, ToolAnnotations};
use serde_json::{json, Map, Value};
use crate::agents::sub_recipe_execution_tool::lib::{ExecutionMode, Task};
use crate::agents::sub_recipe_execution_tool::tasks_manager::TasksManager;
use crate::agents::subagent_execution_tool::lib::{ExecutionMode, Task};
use crate::agents::subagent_execution_tool::tasks_manager::TasksManager;
use crate::recipe::{Recipe, RecipeParameter, RecipeParameterRequirement, SubRecipe};
use super::param_utils::prepare_command_params;

View File

@@ -8,3 +8,4 @@ mod tasks;
pub mod tasks_manager;
pub mod utils;
mod workers;

View File

@@ -183,3 +183,4 @@ fn process_output(stdout_output: String) -> Result<Value, String> {
Ok(Value::String(stdout_output))
}
}

View File

@@ -28,3 +28,4 @@ async fn worker_loop(state: Arc<SharedState>, _worker_id: usize) {
state.decrement_active_workers();
}

View File

@@ -7,7 +7,7 @@ use crate::{
recipe_tools::sub_recipe_tools::{
create_sub_recipe_task, create_sub_recipe_task_tool, SUB_RECIPE_TASK_TOOL_NAME_PREFIX,
},
sub_recipe_execution_tool::tasks_manager::TasksManager,
subagent_execution_tool::tasks_manager::TasksManager,
tool_execution::ToolCallResult,
},
recipe::SubRecipe,

View File

@@ -1,27 +1,18 @@
use crate::{
agents::{extension_manager::ExtensionManager, Agent},
agents::{Agent, TaskConfig},
message::{Message, MessageContent, ToolRequest},
prompt_template::render_global_file,
providers::base::Provider,
providers::errors::ProviderError,
recipe::Recipe,
};
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use mcp_core::protocol::{JsonRpcMessage, JsonRpcNotification};
use mcp_core::{handler::ToolError, role::Role, tool::Tool};
use mcp_core::{handler::ToolError, tool::Tool};
use serde::{Deserialize, Serialize};
use serde_json::{self, json};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, error, instrument};
use uuid::Uuid;
use crate::agents::platform_tools::{
self, PLATFORM_LIST_RESOURCES_TOOL_NAME, PLATFORM_READ_RESOURCE_TOOL_NAME,
PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME,
};
use crate::agents::subagent_tools::SUBAGENT_RUN_TASK_TOOL_NAME;
/// Status of a subagent
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@@ -32,48 +23,6 @@ pub enum SubAgentStatus {
Terminated, // Manually terminated
}
/// Configuration for a subagent
#[derive(Debug)]
pub struct SubAgentConfig {
pub id: String,
pub recipe: Option<Recipe>,
pub instructions: Option<String>,
pub max_turns: Option<usize>,
pub timeout_seconds: Option<u64>,
}
impl SubAgentConfig {
pub fn new_with_recipe(recipe: Recipe) -> Self {
Self {
id: Uuid::new_v4().to_string(),
recipe: Some(recipe),
instructions: None,
max_turns: None,
timeout_seconds: None,
}
}
pub fn new_with_instructions(instructions: String) -> Self {
Self {
id: Uuid::new_v4().to_string(),
recipe: None,
instructions: Some(instructions),
max_turns: None,
timeout_seconds: None,
}
}
pub fn with_max_turns(mut self, max_turns: usize) -> Self {
self.max_turns = Some(max_turns);
self
}
pub fn with_timeout(mut self, timeout_seconds: u64) -> Self {
self.timeout_seconds = Some(timeout_seconds);
self
}
}
/// Progress information for a subagent
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubAgentProgress {
@@ -90,58 +39,26 @@ pub struct SubAgent {
pub id: String,
pub conversation: Arc<Mutex<Vec<Message>>>,
pub status: Arc<RwLock<SubAgentStatus>>,
pub config: SubAgentConfig,
pub config: TaskConfig,
pub turn_count: Arc<Mutex<usize>>,
pub created_at: DateTime<Utc>,
pub recipe_extensions: Arc<Mutex<Vec<String>>>,
pub missing_extensions: Arc<Mutex<Vec<String>>>, // Track extensions that weren't enabled
pub mcp_notification_tx: mpsc::Sender<JsonRpcMessage>, // For MCP notifications
}
impl SubAgent {
/// Create a new subagent with the given configuration and provider
#[instrument(skip(config, _provider, extension_manager, mcp_notification_tx))]
#[instrument(skip(task_config))]
pub async fn new(
config: SubAgentConfig,
_provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
mcp_notification_tx: mpsc::Sender<JsonRpcMessage>,
task_config: TaskConfig,
) -> Result<(Arc<Self>, tokio::task::JoinHandle<()>), anyhow::Error> {
debug!("Creating new subagent with id: {}", config.id);
let mut missing_extensions = Vec::new();
let mut recipe_extensions = Vec::new();
// Check if extensions from recipe exist in the extension manager
if let Some(recipe) = &config.recipe {
if let Some(extensions) = &recipe.extensions {
for extension in extensions {
let extension_name = extension.name();
let existing_extensions = extension_manager.list_extensions().await?;
if !existing_extensions.contains(&extension_name) {
missing_extensions.push(extension_name);
} else {
recipe_extensions.push(extension_name);
}
}
}
} else {
// If no recipe, inherit all extensions from the parent agent
let existing_extensions = extension_manager.list_extensions().await?;
recipe_extensions = existing_extensions;
}
debug!("Creating new subagent with id: {}", task_config.id);
let subagent = Arc::new(SubAgent {
id: config.id.clone(),
id: task_config.id.clone(),
conversation: Arc::new(Mutex::new(Vec::new())),
status: Arc::new(RwLock::new(SubAgentStatus::Ready)),
config,
config: task_config,
turn_count: Arc::new(Mutex::new(0)),
created_at: Utc::now(),
recipe_extensions: Arc::new(Mutex::new(recipe_extensions)),
missing_extensions: Arc::new(Mutex::new(missing_extensions)),
mcp_notification_tx,
});
// Send initial MCP notification
@@ -209,7 +126,7 @@ impl SubAgent {
})),
});
if let Err(e) = self.mcp_notification_tx.send(notification).await {
if let Err(e) = self.config.mcp_tx.send(notification).await {
error!(
"Failed to send MCP notification from subagent {}: {}",
self.id, e
@@ -238,17 +155,29 @@ impl SubAgent {
}
/// Process a message and generate a response using the subagent's provider
#[instrument(skip(self, message, provider, extension_manager))]
#[instrument(skip(self, message))]
pub async fn reply_subagent(
&self,
message: String,
provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
task_config: TaskConfig,
) -> Result<Message, anyhow::Error> {
debug!("Processing message for subagent {}", self.id);
self.send_mcp_notification("message_processing", &format!("Processing: {}", message))
.await;
// Get provider and extension manager from task config
let provider = self
.config
.provider
.as_ref()
.ok_or_else(|| anyhow!("No provider configured for subagent"))?;
let extension_manager = self
.config
.extension_manager
.as_ref()
.ok_or_else(|| anyhow!("No extension manager configured for subagent"))?;
// Check if we've exceeded max turns
{
let turn_count = *self.turn_count.lock().await;
@@ -288,80 +217,12 @@ impl SubAgent {
let mut messages = self.get_conversation().await;
// Get tools based on whether we're using a recipe or inheriting from parent
let tools: Vec<Tool> = if self.config.recipe.is_some() {
// Recipe mode: only get tools from the recipe's extensions
let recipe_extensions = self.recipe_extensions.lock().await;
let mut recipe_tools = Vec::new();
debug!(
"Subagent {} operating in recipe mode with {} extensions",
self.id,
recipe_extensions.len()
);
for extension_name in recipe_extensions.iter() {
match extension_manager
.get_prefixed_tools(Some(extension_name.clone()))
.await
{
Ok(mut ext_tools) => {
debug!(
"Added {} tools from extension {}",
ext_tools.len(),
extension_name
);
recipe_tools.append(&mut ext_tools);
}
Err(e) => {
debug!(
"Failed to get tools for extension {}: {}",
extension_name, e
);
}
}
}
debug!(
"Subagent {} has {} total recipe tools before filtering",
self.id,
recipe_tools.len()
);
// Filter out subagent tools from recipe tools
let mut filtered_tools = Self::filter_subagent_tools(recipe_tools);
// Add platform tools (except subagent tools)
Self::add_platform_tools(&mut filtered_tools, &extension_manager).await;
debug!(
"Subagent {} has {} tools after filtering and adding platform tools",
self.id,
filtered_tools.len()
);
filtered_tools
} else {
// No recipe: inherit all tools from parent (but filter out subagent tools)
debug!(
"Subagent {} operating in inheritance mode, using all parent tools",
self.id
);
let parent_tools = extension_manager.get_prefixed_tools(None).await?;
debug!(
"Subagent {} has {} parent tools before filtering",
self.id,
parent_tools.len()
);
let mut filtered_tools = Self::filter_subagent_tools(parent_tools);
// Add platform tools (except subagent tools)
Self::add_platform_tools(&mut filtered_tools, &extension_manager).await;
debug!(
"Subagent {} has {} tools after filtering and adding platform tools",
self.id,
filtered_tools.len()
);
filtered_tools
};
let tools: Vec<Tool> = extension_manager
.read()
.await
.get_prefixed_tools(None)
.await
.unwrap_or_default();
let toolshim_tools: Vec<Tool> = vec![];
@@ -371,7 +232,7 @@ impl SubAgent {
// Generate response from provider
loop {
match Agent::generate_response_from_provider(
Arc::clone(&provider),
Arc::clone(provider),
&system_prompt,
&messages,
&tools,
@@ -427,20 +288,14 @@ impl SubAgent {
.await;
// Handle platform tools or dispatch to extension manager
let tool_result = if self.is_platform_tool(&tool_call.name) {
self.handle_platform_tool_call(
tool_call.clone(),
&extension_manager,
)
let tool_result = match extension_manager
.read()
.await
} else {
match extension_manager
.dispatch_tool_call(tool_call.clone())
.await
{
Ok(result) => result.result.await,
Err(e) => Err(ToolError::ExecutionError(e.to_string())),
}
.dispatch_tool_call(tool_call.clone())
.await
{
Ok(result) => result.result.await,
Err(e) => Err(ToolError::ExecutionError(e.to_string())),
};
match tool_result {
@@ -529,142 +384,10 @@ impl SubAgent {
Ok(())
}
/// Get formatted conversation for display
pub async fn get_formatted_conversation(&self) -> String {
let conversation = self.conversation.lock().await;
let mut formatted = format!("=== Subagent {} Conversation ===\n", self.id);
if let Some(recipe) = &self.config.recipe {
formatted.push_str(&format!("Recipe: {}\n", recipe.title));
} else if let Some(instructions) = &self.config.instructions {
formatted.push_str(&format!("Instructions: {}\n", instructions));
} else {
formatted.push_str("Mode: Ad-hoc subagent\n");
}
formatted.push_str(&format!(
"Created: {}\n",
self.created_at.format("%Y-%m-%d %H:%M:%S UTC")
));
let progress = self.get_progress().await;
formatted.push_str(&format!("Status: {:?}\n", progress.status));
formatted.push_str(&format!("Turn: {}", progress.turn));
if let Some(max_turns) = progress.max_turns {
formatted.push_str(&format!("/{}", max_turns));
}
formatted.push_str("\n\n");
for (i, message) in conversation.iter().enumerate() {
formatted.push_str(&format!(
"{}. {}: {}\n",
i + 1,
match message.role {
Role::User => "User",
Role::Assistant => "Assistant",
},
message.as_concat_text()
));
}
formatted.push_str("=== End Conversation ===\n");
formatted
}
/// Get the list of extensions that weren't enabled
pub async fn get_missing_extensions(&self) -> Vec<String> {
self.missing_extensions.lock().await.clone()
}
/// Filter out subagent spawning tools to prevent infinite recursion
fn filter_subagent_tools(tools: Vec<Tool>) -> Vec<Tool> {
let original_count = tools.len();
let filtered_tools: Vec<Tool> = tools
.into_iter()
.filter(|tool| {
let should_keep = tool.name != SUBAGENT_RUN_TASK_TOOL_NAME;
if !should_keep {
debug!("Filtering out subagent tool: {}", tool.name);
}
should_keep
})
.collect();
let filtered_count = filtered_tools.len();
if filtered_count < original_count {
debug!(
"Filtered {} subagent tool(s) from {} total tools",
original_count - filtered_count,
original_count
);
}
filtered_tools
}
/// Add platform tools to the subagent's tool list (excluding dangerous tools)
async fn add_platform_tools(tools: &mut Vec<Tool>, extension_manager: &ExtensionManager) {
debug!("Adding safe platform tools to subagent");
// Add safe platform tools - subagents can search for extensions but can't manage them or schedules
tools.push(platform_tools::search_available_extensions_tool());
debug!("Added search_available_extensions tool");
// Add resource tools if supported - these are generally safe for subagents
if extension_manager.supports_resources() {
tools.extend([
platform_tools::read_resource_tool(),
platform_tools::list_resources_tool(),
]);
debug!("Added 2 resource platform tools");
}
// Note: We explicitly do NOT add these tools for security reasons:
// - manage_extensions (could interfere with parent agent's extensions)
// - manage_schedule (could interfere with parent agent's scheduling)
// - subagent spawning tools (prevent recursion)
debug!("Platform tools added successfully (dangerous tools excluded)");
}
/// Check if a tool name is a platform tool that subagents can use
fn is_platform_tool(&self, tool_name: &str) -> bool {
matches!(
tool_name,
PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME
| PLATFORM_READ_RESOURCE_TOOL_NAME
| PLATFORM_LIST_RESOURCES_TOOL_NAME
)
}
/// Handle platform tool calls that are safe for subagents
async fn handle_platform_tool_call(
&self,
tool_call: mcp_core::tool::ToolCall,
extension_manager: &ExtensionManager,
) -> Result<Vec<mcp_core::Content>, ToolError> {
debug!("Handling platform tool: {}", tool_call.name);
match tool_call.name.as_str() {
PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME => extension_manager
.search_available_extensions()
.await
.map_err(|e| ToolError::ExecutionError(e.to_string())),
PLATFORM_READ_RESOURCE_TOOL_NAME => extension_manager
.read_resource(tool_call.arguments)
.await
.map_err(|e| ToolError::ExecutionError(e.to_string())),
PLATFORM_LIST_RESOURCES_TOOL_NAME => extension_manager
.list_resources(tool_call.arguments)
.await
.map_err(|e| ToolError::ExecutionError(e.to_string())),
_ => Err(ToolError::ExecutionError(format!(
"Platform tool '{}' is not available to subagents for security reasons",
tool_call.name
))),
}
fn _filter_subagent_tools(tools: Vec<Tool>) -> Vec<Tool> {
// TODO: add this in subagent loop
tools
}
/// Build the system prompt for the subagent using the template
@@ -678,14 +401,6 @@ impl SubAgent {
);
context.insert("subagent_id", serde_json::Value::String(self.id.clone()));
// Add recipe information if available
if let Some(recipe) = &self.config.recipe {
context.insert(
"recipe_title",
serde_json::Value::String(recipe.title.clone()),
);
}
// Add max turns if configured
if let Some(max_turns) = self.config.max_turns {
context.insert(
@@ -694,33 +409,6 @@ impl SubAgent {
);
}
// Add task instructions
let instructions = if let Some(recipe) = &self.config.recipe {
recipe.instructions.as_deref().unwrap_or("")
} else {
self.config.instructions.as_deref().unwrap_or("")
};
context.insert(
"task_instructions",
serde_json::Value::String(instructions.to_string()),
);
// Add available extensions (only if we have a recipe and extensions)
if self.config.recipe.is_some() {
let extensions: Vec<String> = self.recipe_extensions.lock().await.clone();
if !extensions.is_empty() {
context.insert(
"extensions",
serde_json::Value::Array(
extensions
.into_iter()
.map(serde_json::Value::String)
.collect(),
),
);
}
}
// Add available tools with descriptions for better context
let tools_with_descriptions: Vec<String> = available_tools
.iter()

View File

@@ -1,27 +1,25 @@
use crate::agents::subagent_execution_tool::lib::{
ExecutionResponse, ExecutionStats, SharedState, Task, TaskResult, TaskStatus,
};
use crate::agents::subagent_execution_tool::task_execution_tracker::{
DisplayMode, TaskExecutionTracker,
};
use crate::agents::subagent_execution_tool::tasks::process_task;
use crate::agents::subagent_execution_tool::workers::spawn_worker;
use crate::agents::subagent_task_config::TaskConfig;
use mcp_core::protocol::JsonRpcMessage;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::Instant;
use crate::agents::sub_recipe_execution_tool::lib::{
ExecutionResponse, ExecutionStats, SharedState, Task, TaskResult, TaskStatus,
};
use crate::agents::sub_recipe_execution_tool::task_execution_tracker::{
DisplayMode, TaskExecutionTracker,
};
use crate::agents::sub_recipe_execution_tool::tasks::process_task;
use crate::agents::sub_recipe_execution_tool::workers::spawn_worker;
#[cfg(test)]
mod tests;
const EXECUTION_STATUS_COMPLETED: &str = "completed";
const DEFAULT_MAX_WORKERS: usize = 10;
pub async fn execute_single_task(
task: &Task,
notifier: mpsc::Sender<JsonRpcMessage>,
task_config: TaskConfig,
) -> ExecutionResponse {
let start_time = Instant::now();
let task_execution_tracker = Arc::new(TaskExecutionTracker::new(
@@ -29,7 +27,13 @@ pub async fn execute_single_task(
DisplayMode::SingleTaskOutput,
notifier,
));
let result = process_task(task, task_execution_tracker).await;
let result = process_task(task, task_execution_tracker.clone(), task_config).await;
// Complete the task in the tracker
task_execution_tracker
.complete_task(&result.task_id, result.clone())
.await;
let execution_time = start_time.elapsed().as_millis();
let stats = calculate_stats(&[result.clone()], execution_time);
@@ -43,6 +47,7 @@ pub async fn execute_single_task(
pub async fn execute_tasks_in_parallel(
tasks: Vec<Task>,
notifier: mpsc::Sender<JsonRpcMessage>,
task_config: TaskConfig,
) -> ExecutionResponse {
let task_execution_tracker = Arc::new(TaskExecutionTracker::new(
tasks.clone(),
@@ -70,7 +75,7 @@ pub async fn execute_tasks_in_parallel(
let worker_count = std::cmp::min(task_count, DEFAULT_MAX_WORKERS);
let mut worker_handles = Vec::new();
for i in 0..worker_count {
let handle = spawn_worker(shared_state.clone(), i);
let handle = spawn_worker(shared_state.clone(), i, task_config.clone());
worker_handles.push(handle);
}
@@ -163,17 +168,25 @@ fn create_empty_response() -> ExecutionResponse {
},
}
}
async fn collect_results(
result_rx: &mut mpsc::Receiver<TaskResult>,
task_execution_tracker: Arc<TaskExecutionTracker>,
expected_count: usize,
) -> Vec<TaskResult> {
let mut results = Vec::new();
while let Some(result) = result_rx.recv().await {
while let Some(mut result) = result_rx.recv().await {
// Truncate data to 650 chars if needed
if let Some(data) = result.data.as_mut() {
if let Some(data_str) = data.as_str() {
if data_str.len() > 650 {
*data = serde_json::Value::String(format!("{}...", &data_str[..650]));
}
}
}
task_execution_tracker
.complete_task(&result.task_id, result.clone())
.await;
results.push(result);
if results.len() >= expected_count {
break;

View File

@@ -1,14 +1,11 @@
use crate::agents::sub_recipe_execution_tool::executor::{
execute_single_task, execute_tasks_in_parallel,
};
pub use crate::agents::sub_recipe_execution_tool::task_types::{
pub use crate::agents::subagent_execution_tool::task_types::{
ExecutionMode, ExecutionResponse, ExecutionStats, SharedState, Task, TaskResult, TaskStatus,
};
use crate::agents::sub_recipe_execution_tool::tasks_manager::TasksManager;
#[cfg(test)]
mod tests;
use crate::agents::subagent_execution_tool::{
executor::{execute_single_task, execute_tasks_in_parallel},
tasks_manager::TasksManager,
};
use crate::agents::subagent_task_config::TaskConfig;
use mcp_core::protocol::JsonRpcMessage;
use serde_json::{json, Value};
use tokio::sync::mpsc;
@@ -17,6 +14,7 @@ pub async fn execute_tasks(
input: Value,
execution_mode: ExecutionMode,
notifier: mpsc::Sender<JsonRpcMessage>,
task_config: TaskConfig,
tasks_manager: &TasksManager,
) -> Result<Value, String> {
let task_ids: Vec<String> = serde_json::from_value(
@@ -44,13 +42,12 @@ pub async fn execute_tasks(
match execution_mode {
ExecutionMode::Sequential => {
if task_count == 1 {
let response = execute_single_task(&tasks[0], notifier).await;
let response = execute_single_task(&tasks[0], notifier, task_config).await;
handle_response(response)
} else {
Err("Sequential execution mode requires exactly one task".to_string())
}
}
ExecutionMode::Parallel => {
if tasks.iter().any(|task| task.get_sequential_when_repeated()) {
Ok(json!(
@@ -62,7 +59,7 @@ pub async fn execute_tasks(
))
} else {
let response: ExecutionResponse =
execute_tasks_in_parallel(tasks, notifier.clone()).await;
execute_tasks_in_parallel(tasks, notifier.clone(), task_config).await;
handle_response(response)
}
}

View File

@@ -0,0 +1,10 @@
mod executor;
pub mod lib;
pub mod notification_events;
pub mod subagent_execute_task_tool;
pub mod task_execution_tracker;
pub mod task_types;
pub mod tasks;
pub mod tasks_manager;
pub mod utils;
pub mod workers;

View File

@@ -1,4 +1,4 @@
use crate::agents::sub_recipe_execution_tool::task_types::TaskStatus;
use crate::agents::subagent_execution_tool::task_types::TaskStatus;
use serde::{Deserialize, Serialize};
use serde_json::Value;

View File

@@ -1,20 +1,21 @@
use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError};
use serde_json::Value;
use crate::agents::subagent_task_config::TaskConfig;
use crate::agents::{
sub_recipe_execution_tool::lib::execute_tasks,
sub_recipe_execution_tool::task_types::ExecutionMode,
sub_recipe_execution_tool::tasks_manager::TasksManager, tool_execution::ToolCallResult,
subagent_execution_tool::lib::execute_tasks,
subagent_execution_tool::task_types::ExecutionMode,
subagent_execution_tool::tasks_manager::TasksManager, tool_execution::ToolCallResult,
};
use mcp_core::protocol::JsonRpcMessage;
use tokio::sync::mpsc;
use tokio_stream;
pub const SUB_RECIPE_EXECUTE_TASK_TOOL_NAME: &str = "sub_recipe__execute_task";
pub fn create_sub_recipe_execute_task_tool() -> Tool {
pub const SUBAGENT_EXECUTE_TASK_TOOL_NAME: &str = "subagent__execute_task";
pub fn create_subagent_execute_task_tool() -> Tool {
Tool::new(
SUB_RECIPE_EXECUTE_TASK_TOOL_NAME,
"Only use this tool when you execute sub recipe task.
SUBAGENT_EXECUTE_TASK_TOOL_NAME,
"Only use the subagent__execute_task tool when you execute sub recipe task or dynamic task.
EXECUTION STRATEGY DECISION:
1. If the tasks are created with execution_mode, use the execution_mode.
2. Execute tasks sequentially unless user explicitly requests parallel execution. PARALLEL: User uses keywords like 'parallel', 'simultaneously', 'at the same time', 'concurrently'
@@ -24,6 +25,7 @@ IMPLEMENTATION:
- Parallel execution: Call this tool once, passing an ARRAY of all tasks
EXAMPLES:
User Intent Based:
- User: 'get weather and tell me a joke' Sequential (2 separate tool calls, 1 task each)
- User: 'get weather and joke in parallel' Parallel (1 tool call with array of 2 tasks)
- User: 'run these simultaneously' Parallel (1 tool call with task array)
@@ -57,7 +59,11 @@ EXAMPLES:
)
}
pub async fn run_tasks(execute_data: Value, tasks_manager: &TasksManager) -> ToolCallResult {
pub async fn run_tasks(
execute_data: Value,
task_config: TaskConfig,
tasks_manager: &TasksManager,
) -> ToolCallResult {
let (notification_tx, notification_rx) = mpsc::channel::<JsonRpcMessage>(100);
let tasks_manager_clone = tasks_manager.clone();
@@ -72,6 +78,7 @@ pub async fn run_tasks(execute_data: Value, tasks_manager: &TasksManager) -> Too
execute_data,
execution_mode,
notification_tx,
task_config,
&tasks_manager_clone,
)
.await

View File

@@ -5,14 +5,12 @@ use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tokio::time::{sleep, Duration, Instant};
use crate::agents::sub_recipe_execution_tool::notification_events::{
use crate::agents::subagent_execution_tool::notification_events::{
FailedTaskInfo, TaskCompletionStats, TaskExecutionNotificationEvent, TaskExecutionStats,
TaskInfo as EventTaskInfo,
};
use crate::agents::sub_recipe_execution_tool::task_types::{
Task, TaskInfo, TaskResult, TaskStatus,
};
use crate::agents::sub_recipe_execution_tool::utils::{count_by_status, get_task_name};
use crate::agents::subagent_execution_tool::task_types::{Task, TaskInfo, TaskResult, TaskStatus};
use crate::agents::subagent_execution_tool::utils::{count_by_status, get_task_name};
use serde_json::Value;
#[derive(Debug, Clone, PartialEq)]
@@ -41,6 +39,18 @@ fn format_task_metadata(task_info: &TaskInfo) -> String {
})
.collect::<Vec<_>>()
.join(",")
} else if task_info.task.task_type == "text_instruction" {
// For text_instruction tasks, extract and display the instruction
if let Some(text_instruction) = task_info.task.get_text_instruction() {
// Truncate long instructions to keep the display clean
if text_instruction.len() > 80 {
format!("instruction={}...", &text_instruction[..77])
} else {
format!("instruction={}", text_instruction)
}
} else {
String::new()
}
} else {
String::new()
}
@@ -113,27 +123,30 @@ impl TaskExecutionTracker {
.map(|task_info| task_info.current_output.clone())
}
async fn format_line(&self, task_info: Option<&TaskInfo>, line: &str) -> String {
if let Some(task_info) = task_info {
let task_name = get_task_name(task_info);
let task_type = task_info.task.task_type.clone();
let metadata = format_task_metadata(task_info);
if metadata.is_empty() {
format!("[{} ({})] {}", task_name, task_type, line)
} else {
format!("[{} ({}) {}] {}", task_name, task_type, metadata, line)
}
} else {
line.to_string()
}
}
pub async fn send_live_output(&self, task_id: &str, line: &str) {
match self.display_mode {
DisplayMode::SingleTaskOutput => {
let tasks = self.tasks.read().await;
let task_info = tasks.get(task_id);
let formatted_line = if let Some(task_info) = task_info {
let task_name = get_task_name(task_info);
let task_type = task_info.task.task_type.clone();
let metadata = format_task_metadata(task_info);
if metadata.is_empty() {
format!("[{} ({})] {}", task_name, task_type, line)
} else {
format!("[{} ({}) {}] {}", task_name, task_type, metadata, line)
}
} else {
line.to_string()
};
let formatted_line = self.format_line(task_info, line).await;
drop(tasks);
let event = TaskExecutionNotificationEvent::line_output(
task_id.to_string(),
formatted_line,

View File

@@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::agents::sub_recipe_execution_tool::task_execution_tracker::TaskExecutionTracker;
use crate::agents::subagent_execution_tool::task_execution_tracker::TaskExecutionTracker;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "lowercase")]
@@ -28,18 +28,18 @@ impl Task {
.flatten()
}
pub fn get_sequential_when_repeated(&self) -> bool {
self.get_sub_recipe()
.and_then(|sr| sr.get("sequential_when_repeated").and_then(|v| v.as_bool()))
.unwrap_or_default()
}
pub fn get_command_parameters(&self) -> Option<&Map<String, Value>> {
self.get_sub_recipe()
.and_then(|sr| sr.get("command_parameters"))
.and_then(|cp| cp.as_object())
}
pub fn get_sequential_when_repeated(&self) -> bool {
self.get_sub_recipe()
.and_then(|sr| sr.get("sequential_when_repeated").and_then(|v| v.as_bool()))
.unwrap_or_default()
}
pub fn get_sub_recipe_name(&self) -> Option<&str> {
self.get_sub_recipe()
.and_then(|sr| sr.get("name"))

View File

@@ -0,0 +1,231 @@
use serde_json::Value;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use crate::agents::subagent_execution_tool::task_execution_tracker::TaskExecutionTracker;
use crate::agents::subagent_execution_tool::task_types::{Task, TaskResult, TaskStatus};
use crate::agents::subagent_handler::run_complete_subagent_task;
use crate::agents::subagent_task_config::TaskConfig;
pub async fn process_task(
task: &Task,
task_execution_tracker: Arc<TaskExecutionTracker>,
task_config: TaskConfig,
) -> TaskResult {
match get_task_result(task.clone(), task_execution_tracker, task_config).await {
Ok(data) => TaskResult {
task_id: task.id.clone(),
status: TaskStatus::Completed,
data: Some(data),
error: None,
},
Err(error) => TaskResult {
task_id: task.id.clone(),
status: TaskStatus::Failed,
data: None,
error: Some(error),
},
}
}
async fn get_task_result(
task: Task,
task_execution_tracker: Arc<TaskExecutionTracker>,
task_config: TaskConfig,
) -> Result<Value, String> {
if task.task_type == "text_instruction" {
// Handle text_instruction tasks using subagent system
handle_text_instruction_task(task, task_execution_tracker, task_config).await
} else {
// Handle sub_recipe tasks using command execution
let (command, output_identifier) = build_command(&task)?;
let (stdout_output, stderr_output, success) = run_command(
command,
&output_identifier,
&task.id,
task_execution_tracker,
)
.await?;
if success {
process_output(stdout_output)
} else {
Err(format!("Command failed:\n{}", stderr_output))
}
}
}
async fn handle_text_instruction_task(
task: Task,
task_execution_tracker: Arc<TaskExecutionTracker>,
task_config: TaskConfig,
) -> Result<Value, String> {
let text_instruction = task
.get_text_instruction()
.ok_or_else(|| format!("Task {}: Missing text_instruction", task.id))?;
// Start tracking the task
task_execution_tracker.start_task(&task.id).await;
// Create arguments for the subagent task
let task_arguments = serde_json::json!({
"text_instruction": text_instruction,
// "instructions": "You are a helpful assistant. Execute the given task and provide a clear, concise response.",
});
match run_complete_subagent_task(task_arguments, task_config).await {
Ok(contents) => {
// Extract the text content from the result
let result_text = contents
.into_iter()
.filter_map(|content| match content {
mcp_core::Content::Text(text) => Some(text.text),
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
Ok(serde_json::json!({
"result": result_text
}))
}
Err(e) => {
let error_msg = format!("Subagent execution failed: {}", e);
Err(error_msg)
}
}
}
fn build_command(task: &Task) -> Result<(Command, String), String> {
let task_error = |field: &str| format!("Task {}: Missing {}", task.id, field);
let (mut command, output_identifier) = if task.task_type == "sub_recipe" {
let sub_recipe_name = task
.get_sub_recipe_name()
.ok_or_else(|| task_error("sub_recipe name"))?;
let path = task
.get_sub_recipe_path()
.ok_or_else(|| task_error("sub_recipe path"))?;
let command_parameters = task
.get_command_parameters()
.ok_or_else(|| task_error("command_parameters"))?;
let mut cmd = Command::new("goose");
cmd.arg("run").arg("--recipe").arg(path).arg("--no-session");
for (key, value) in command_parameters {
let key_str = key.to_string();
let value_str = value.as_str().unwrap_or(&value.to_string()).to_string();
cmd.arg("--params")
.arg(format!("{}={}", key_str, value_str));
}
(cmd, format!("sub-recipe {}", sub_recipe_name))
} else {
// This branch should not be reached for text_instruction tasks anymore
// as they are handled in handle_text_instruction_task
return Err("Text instruction tasks are handled separately".to_string());
};
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
Ok((command, output_identifier))
}
async fn run_command(
mut command: Command,
output_identifier: &str,
task_id: &str,
task_execution_tracker: Arc<TaskExecutionTracker>,
) -> Result<(String, String, bool), String> {
let mut child = command
.spawn()
.map_err(|e| format!("Failed to spawn goose: {}", e))?;
let stdout = child.stdout.take().expect("Failed to capture stdout");
let stderr = child.stderr.take().expect("Failed to capture stderr");
let stdout_task = spawn_output_reader(
stdout,
output_identifier,
false,
task_id,
task_execution_tracker.clone(),
);
let stderr_task = spawn_output_reader(
stderr,
output_identifier,
true,
task_id,
task_execution_tracker.clone(),
);
let status = child
.wait()
.await
.map_err(|e| format!("Failed to wait for process: {}", e))?;
let stdout_output = stdout_task.await.unwrap();
let stderr_output = stderr_task.await.unwrap();
Ok((stdout_output, stderr_output, status.success()))
}
fn spawn_output_reader(
reader: impl tokio::io::AsyncRead + Unpin + Send + 'static,
output_identifier: &str,
is_stderr: bool,
task_id: &str,
task_execution_tracker: Arc<TaskExecutionTracker>,
) -> tokio::task::JoinHandle<String> {
let output_identifier = output_identifier.to_string();
let task_id = task_id.to_string();
tokio::spawn(async move {
let mut buffer = String::new();
let mut lines = BufReader::new(reader).lines();
while let Ok(Some(line)) = lines.next_line().await {
buffer.push_str(&line);
buffer.push('\n');
if !is_stderr {
task_execution_tracker
.send_live_output(&task_id, &line)
.await;
} else {
tracing::warn!("Task stderr [{}]: {}", output_identifier, line);
}
}
buffer
})
}
fn extract_json_from_line(line: &str) -> Option<String> {
let start = line.find('{')?;
let end = line.rfind('}')?;
if start >= end {
return None;
}
let potential_json = &line[start..=end];
if serde_json::from_str::<Value>(potential_json).is_ok() {
Some(potential_json.to_string())
} else {
None
}
}
fn process_output(stdout_output: String) -> Result<Value, String> {
let last_line = stdout_output
.lines()
.filter(|line| !line.trim().is_empty())
.next_back()
.unwrap_or("");
if let Some(json_string) = extract_json_from_line(last_line) {
Ok(Value::String(json_string))
} else {
Ok(Value::String(stdout_output))
}
}

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::agents::sub_recipe_execution_tool::task_types::Task;
use crate::agents::subagent_execution_tool::task_types::Task;
#[derive(Debug, Clone)]
pub struct TasksManager {

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap;
use crate::agents::sub_recipe_execution_tool::task_types::{TaskInfo, TaskStatus};
use crate::agents::subagent_execution_tool::task_types::{TaskInfo, TaskStatus};
pub fn get_task_name(task_info: &TaskInfo) -> &str {
task_info

View File

@@ -1,5 +1,5 @@
use crate::agents::sub_recipe_execution_tool::task_types::{Task, TaskInfo, TaskStatus};
use crate::agents::sub_recipe_execution_tool::utils::{count_by_status, get_task_name};
use crate::agents::subagent_execution_tool::task_types::{Task, TaskInfo, TaskStatus};
use crate::agents::subagent_execution_tool::utils::{count_by_status, get_task_name};
use serde_json::json;
use std::collections::HashMap;

View File

@@ -0,0 +1,40 @@
use crate::agents::subagent_execution_tool::task_types::{SharedState, Task};
use crate::agents::subagent_execution_tool::tasks::process_task;
use crate::agents::subagent_task_config::TaskConfig;
use std::sync::Arc;
async fn receive_task(state: &SharedState) -> Option<Task> {
let mut receiver = state.task_receiver.lock().await;
receiver.recv().await
}
pub fn spawn_worker(
state: Arc<SharedState>,
worker_id: usize,
task_config: TaskConfig,
) -> tokio::task::JoinHandle<()> {
state.increment_active_workers();
tokio::spawn(async move {
worker_loop(state, worker_id, task_config).await;
})
}
async fn worker_loop(state: Arc<SharedState>, _worker_id: usize, task_config: TaskConfig) {
while let Some(task) = receive_task(&state).await {
state.task_execution_tracker.start_task(&task.id).await;
let result = process_task(
&task,
state.task_execution_tracker.clone(),
task_config.clone(),
)
.await;
if let Err(e) = state.result_sender.send(result).await {
tracing::error!("Worker failed to send result: {}", e);
break;
}
}
state.decrement_active_workers();
}

View File

@@ -1,79 +1,43 @@
use crate::agents::subagent::SubAgent;
use crate::agents::subagent_task_config::TaskConfig;
use anyhow::Result;
use mcp_core::{Content, ToolError};
use serde_json::Value;
use std::sync::Arc;
use crate::agents::subagent_types::SpawnSubAgentArgs;
use crate::agents::Agent;
/// Standalone function to run a complete subagent task
pub async fn run_complete_subagent_task(
task_arguments: Value,
task_config: TaskConfig,
) -> Result<Vec<Content>, ToolError> {
// Parse arguments - using "task" as the main message parameter
let text_instruction = task_arguments
.get("text_instruction")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing text_instruction parameter".to_string()))?
.to_string();
impl Agent {
/// Handle running a complete subagent task (replaces the individual spawn/send/check tools)
pub async fn handle_run_subagent_task(
&self,
arguments: Value,
) -> Result<Vec<Content>, ToolError> {
let subagent_manager = self.subagent_manager.lock().await;
let manager = subagent_manager.as_ref().ok_or_else(|| {
ToolError::ExecutionError("Subagent manager not initialized".to_string())
})?;
// Create the subagent with the parent agent's provider
let (subagent, handle) = SubAgent::new(task_config.clone())
.await
.map_err(|e| ToolError::ExecutionError(format!("Failed to create subagent: {}", e)))?;
// Parse arguments - using "task" as the main message parameter
let message = arguments
.get("task")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing task parameter".to_string()))?
.to_string();
// Either recipe_name or instructions must be provided
let recipe_name = arguments
.get("recipe_name")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let instructions = arguments
.get("instructions")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let mut args = if let Some(recipe_name) = recipe_name {
SpawnSubAgentArgs::new_with_recipe(recipe_name, message.clone())
} else if let Some(instructions) = instructions {
SpawnSubAgentArgs::new_with_instructions(instructions, message.clone())
} else {
return Err(ToolError::ExecutionError(
"Either recipe_name or instructions parameter must be provided".to_string(),
));
};
// Set max_turns with default of 10
let max_turns = arguments
.get("max_turns")
.and_then(|v| v.as_u64())
.unwrap_or(10) as usize;
args = args.with_max_turns(max_turns);
if let Some(timeout) = arguments.get("timeout_seconds").and_then(|v| v.as_u64()) {
args = args.with_timeout(timeout);
// Execute the subagent task
let result = match subagent.reply_subagent(text_instruction, task_config).await {
Ok(response) => {
let response_text = response.as_concat_text();
Ok(vec![Content::text(response_text)])
}
Err(e) => Err(ToolError::ExecutionError(format!(
"Subagent execution failed: {}",
e
))),
};
// Get the provider from the parent agent
let provider = self
.provider()
.await
.map_err(|e| ToolError::ExecutionError(format!("Failed to get provider: {}", e)))?;
// Get the extension manager from the parent agent
let extension_manager = Arc::new(self.extension_manager.read().await);
// Run the complete subagent task
match manager
.run_complete_subagent_task(args, provider, extension_manager)
.await
{
Ok(result) => Ok(vec![Content::text(result)]),
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to run subagent task: {}",
e
))),
}
// Clean up the subagent handle
if let Err(e) = handle.await {
tracing::debug!("Subagent handle cleanup error: {}", e);
}
// Return the result
result
}

View File

@@ -1,404 +0,0 @@
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use mcp_core::protocol::JsonRpcMessage;
use tokio::sync::{mpsc, Mutex, RwLock};
use tracing::{debug, error, instrument, warn};
use crate::agents::extension_manager::ExtensionManager;
use crate::agents::subagent::{SubAgent, SubAgentConfig, SubAgentProgress, SubAgentStatus};
use crate::agents::subagent_types::SpawnSubAgentArgs;
use crate::providers::base::Provider;
use crate::recipe::Recipe;
/// Manages the lifecycle of subagents
pub struct SubAgentManager {
subagents: Arc<RwLock<HashMap<String, Arc<SubAgent>>>>,
handles: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
mcp_notification_tx: mpsc::Sender<JsonRpcMessage>,
}
impl SubAgentManager {
/// Create a new subagent manager
pub fn new(mcp_notification_tx: mpsc::Sender<JsonRpcMessage>) -> Self {
Self {
subagents: Arc::new(RwLock::new(HashMap::new())),
handles: Arc::new(Mutex::new(HashMap::new())),
mcp_notification_tx,
}
}
/// Spawn a new interactive subagent
#[instrument(skip(self, args, provider, extension_manager))]
pub async fn spawn_interactive_subagent(
&self,
args: SpawnSubAgentArgs,
provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
) -> Result<String> {
debug!("Spawning interactive subagent");
// Create subagent config based on whether we have a recipe or instructions
let mut config = if let Some(recipe_name) = args.recipe_name {
debug!("Using recipe: {}", recipe_name);
// Load the recipe
let recipe = self.load_recipe(&recipe_name).await?;
SubAgentConfig::new_with_recipe(recipe)
} else if let Some(instructions) = args.instructions {
debug!("Using direct instructions");
SubAgentConfig::new_with_instructions(instructions)
} else {
return Err(anyhow!(
"Either recipe_name or instructions must be provided"
));
};
if let Some(max_turns) = args.max_turns {
config = config.with_max_turns(max_turns);
}
if let Some(timeout) = args.timeout_seconds {
config = config.with_timeout(timeout);
}
// Create the subagent with the parent agent's provider
let (subagent, handle) = SubAgent::new(
config,
Arc::clone(&provider),
Arc::clone(&extension_manager),
self.mcp_notification_tx.clone(),
)
.await?;
let subagent_id = subagent.id.clone();
// Store the subagent and its handle
{
let mut subagents = self.subagents.write().await;
subagents.insert(subagent_id.clone(), Arc::clone(&subagent));
}
{
let mut handles = self.handles.lock().await;
handles.insert(subagent_id.clone(), handle);
}
// Return immediately - no initial message processing
Ok(subagent_id)
}
/// Get a subagent by ID
pub async fn get_subagent(&self, id: &str) -> Option<Arc<SubAgent>> {
let subagents = self.subagents.read().await;
subagents.get(id).cloned()
}
/// List all active subagent IDs
pub async fn list_subagents(&self) -> Vec<String> {
let subagents = self.subagents.read().await;
subagents.keys().cloned().collect()
}
/// Get status of all subagents
pub async fn get_subagent_status(&self) -> HashMap<String, SubAgentStatus> {
let subagents = self.subagents.read().await;
let mut status_map = HashMap::new();
for (id, subagent) in subagents.iter() {
status_map.insert(id.clone(), subagent.get_status().await);
}
status_map
}
/// Get progress of all subagents
pub async fn get_subagent_progress(&self) -> HashMap<String, SubAgentProgress> {
let subagents = self.subagents.read().await;
let mut progress_map = HashMap::new();
for (id, subagent) in subagents.iter() {
progress_map.insert(id.clone(), subagent.get_progress().await);
}
progress_map
}
/// Send a message to a specific subagent
#[instrument(skip(self, message, provider, extension_manager))]
pub async fn send_message_to_subagent(
&self,
subagent_id: &str,
message: String,
provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
) -> Result<String> {
let subagent = self
.get_subagent(subagent_id)
.await
.ok_or_else(|| anyhow!("Subagent {} not found", subagent_id))?;
// Process the message and get a reply
match subagent
.reply_subagent(message, provider, extension_manager)
.await
{
Ok(response) => Ok(format!(
"Message sent to subagent {}. Response:\n{}",
subagent_id,
response.as_concat_text()
)),
Err(e) => Err(anyhow!("Failed to process message in subagent: {}", e)),
}
}
/// Terminate a specific subagent
#[instrument(skip(self))]
pub async fn terminate_subagent(&self, id: &str) -> Result<()> {
debug!("Terminating subagent {}", id);
// Get and terminate the subagent
let subagent = {
let mut subagents = self.subagents.write().await;
subagents.remove(id)
};
if let Some(subagent) = subagent {
subagent.terminate().await?;
} else {
warn!("Attempted to terminate non-existent subagent {}", id);
return Err(anyhow!("Subagent {} not found", id));
}
// Clean up the background handle
let handle = {
let mut handles = self.handles.lock().await;
handles.remove(id)
};
if let Some(handle) = handle {
handle.abort();
}
debug!("Subagent {} terminated successfully", id);
Ok(())
}
/// Terminate all subagents
#[instrument(skip(self))]
pub async fn terminate_all_subagents(&self) -> Result<()> {
debug!("Terminating all subagents");
let subagent_ids: Vec<String> = {
let subagents = self.subagents.read().await;
subagents.keys().cloned().collect()
};
for id in subagent_ids {
if let Err(e) = self.terminate_subagent(&id).await {
error!("Failed to terminate subagent {}: {}", id, e);
}
}
debug!("All subagents terminated");
Ok(())
}
/// Get formatted conversation from a subagent
pub async fn get_subagent_conversation(&self, id: &str) -> Result<String> {
let subagent = self
.get_subagent(id)
.await
.ok_or_else(|| anyhow!("Subagent {} not found", id))?;
Ok(subagent.get_formatted_conversation().await)
}
/// Clean up completed or failed subagents
pub async fn cleanup_completed_subagents(&self) -> Result<usize> {
let mut completed_ids = Vec::new();
// Find completed subagents
{
let subagents = self.subagents.read().await;
for (id, subagent) in subagents.iter() {
if subagent.is_completed().await {
completed_ids.push(id.clone());
}
}
}
// Remove completed subagents
let count = completed_ids.len();
for id in completed_ids {
if let Err(e) = self.terminate_subagent(&id).await {
error!("Failed to cleanup completed subagent {}: {}", id, e);
}
}
debug!("Cleaned up {} completed subagents", count);
Ok(count)
}
/// Load a recipe from file
async fn load_recipe(&self, recipe_name: &str) -> Result<Recipe> {
// Try to load from current directory first
let recipe_path = if recipe_name.ends_with(".yaml") || recipe_name.ends_with(".yml") {
recipe_name.to_string()
} else {
format!("{}.yaml", recipe_name)
};
if Path::new(&recipe_path).exists() {
let content = tokio::fs::read_to_string(&recipe_path).await?;
let recipe: Recipe = serde_yaml::from_str(&content)?;
return Ok(recipe);
}
// Try some common recipe locations
let common_paths = [
format!("recipes/{}", recipe_path),
format!("./recipes/{}", recipe_path),
format!("../recipes/{}", recipe_path),
];
for path in &common_paths {
if Path::new(path).exists() {
let content = tokio::fs::read_to_string(path).await?;
let recipe: Recipe = serde_yaml::from_str(&content)?;
return Ok(recipe);
}
}
Err(anyhow!(
"Recipe file '{}' not found in current directory or common recipe locations",
recipe_name
))
}
/// Get count of active subagents
pub async fn get_active_count(&self) -> usize {
let subagents = self.subagents.read().await;
subagents.len()
}
/// Check if a subagent exists
pub async fn has_subagent(&self, id: &str) -> bool {
let subagents = self.subagents.read().await;
subagents.contains_key(id)
}
/// Run a complete subagent task (spawn, execute, cleanup)
#[instrument(skip(self, args, provider, extension_manager))]
pub async fn run_complete_subagent_task(
&self,
args: SpawnSubAgentArgs,
provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
) -> Result<String> {
debug!("Running complete subagent task");
// Create subagent config based on whether we have a recipe or instructions
let mut config = if let Some(recipe_name) = args.recipe_name {
debug!("Using recipe: {}", recipe_name);
// Load the recipe
let recipe = self.load_recipe(&recipe_name).await?;
SubAgentConfig::new_with_recipe(recipe)
} else if let Some(instructions) = args.instructions {
debug!("Using direct instructions");
SubAgentConfig::new_with_instructions(instructions)
} else {
return Err(anyhow!(
"Either recipe_name or instructions must be provided"
));
};
// Set default max_turns if not provided
let max_turns = args.max_turns.unwrap_or(10);
config = config.with_max_turns(max_turns);
if let Some(timeout) = args.timeout_seconds {
config = config.with_timeout(timeout);
}
// Create the subagent with the parent agent's provider
let (subagent, handle) = SubAgent::new(
config,
Arc::clone(&provider),
Arc::clone(&extension_manager),
self.mcp_notification_tx.clone(),
)
.await?;
let subagent_id = subagent.id.clone();
// Store the subagent and its handle temporarily
{
let mut subagents = self.subagents.write().await;
subagents.insert(subagent_id.clone(), Arc::clone(&subagent));
}
{
let mut handles = self.handles.lock().await;
handles.insert(subagent_id.clone(), handle);
}
// Run the complete conversation
let mut conversation_result = String::new();
let turn_count = 0;
let current_message = args.message.clone();
// For now, we just complete after one turn since we don't have a mechanism
// for the subagent to continue autonomously without user input
// In a future iteration, we could add logic for the subagent to continue
// working on multi-step tasks with proper turn management
match subagent
.reply_subagent(
current_message,
Arc::clone(&provider),
Arc::clone(&extension_manager),
)
.await
{
Ok(response) => {
let response_text = response.as_concat_text();
conversation_result.push_str(&format!(
"\n--- Turn {} ---\n{}",
turn_count + 1,
response_text
));
conversation_result.push_str(&format!(
"\n[Task completed after {} turns]",
turn_count + 1
));
}
Err(e) => {
conversation_result
.push_str(&format!("\n[Error after {} turns: {}]", turn_count, e));
}
}
// Clean up the subagent
if let Err(e) = self.terminate_subagent(&subagent_id).await {
debug!("Failed to cleanup subagent {}: {}", subagent_id, e);
}
// Return the complete conversation result
Ok(format!("Subagent task completed:\n{}", conversation_result))
}
}
impl Default for SubAgentManager {
fn default() -> Self {
// Create a dummy channel for default implementation
// In practice, this should not be used - SubAgentManager should be created
// with a proper MCP notification sender
let (tx, _rx) = mpsc::channel(1);
Self::new(tx)
}
}
impl Drop for SubAgentManager {
fn drop(&mut self) {
// Note: In a real implementation, you might want to spawn a task to clean up
// subagents gracefully, but for now we'll rely on the Drop implementations
// of the individual components
debug!("SubAgentManager dropped");
}
}

View File

@@ -0,0 +1,55 @@
use crate::agents::extension_manager::ExtensionManager;
use crate::providers::base::Provider;
use mcp_core::protocol::JsonRpcMessage;
use std::fmt;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
/// Configuration for task execution with all necessary dependencies
#[derive(Clone)]
pub struct TaskConfig {
pub id: String,
pub provider: Option<Arc<dyn Provider>>,
pub extension_manager: Option<Arc<RwLock<ExtensionManager>>>,
pub mcp_tx: mpsc::Sender<JsonRpcMessage>,
pub max_turns: Option<usize>,
}
impl fmt::Debug for TaskConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TaskConfig")
.field("id", &self.id)
.field("provider", &"<dyn Provider>")
.field("extension_manager", &"<ExtensionManager>")
.field("max_turns", &self.max_turns)
.finish()
}
}
impl TaskConfig {
/// Create a new TaskConfig with all required dependencies
pub fn new(
provider: Option<Arc<dyn Provider>>,
extension_manager: Option<Arc<RwLock<ExtensionManager>>>,
mcp_tx: mpsc::Sender<JsonRpcMessage>,
) -> Self {
Self {
id: Uuid::new_v4().to_string(),
provider,
extension_manager,
mcp_tx,
max_turns: Some(10),
}
}
/// Get a reference to the provider
pub fn provider(&self) -> Option<&Arc<dyn Provider>> {
self.provider.as_ref()
}
/// Get a clone of the MCP sender
pub fn mcp_tx(&self) -> mpsc::Sender<JsonRpcMessage> {
self.mcp_tx.clone()
}
}

View File

@@ -1,68 +0,0 @@
use indoc::indoc;
use mcp_core::tool::{Tool, ToolAnnotations};
use serde_json::json;
pub const SUBAGENT_RUN_TASK_TOOL_NAME: &str = "subagent__run_task";
pub fn run_task_subagent_tool() -> Tool {
Tool::new(
SUBAGENT_RUN_TASK_TOOL_NAME.to_string(),
indoc! {r#"
Spawn a specialized subagent to handle a specific task completely and automatically.
This tool creates a subagent, processes your task through a complete conversation,
and returns the final result. The subagent is automatically cleaned up after completion.
You can configure the subagent in two ways:
1. Using a recipe file that defines instructions, extensions, and behavior
2. Providing direct instructions for ad-hoc tasks
The subagent will work autonomously until the task is complete, it reaches max_turns,
or it encounters an error. You'll get the final result without needing to manage
the subagent lifecycle manually.
Examples:
- "Convert these unittest files to pytest format: file1.py, file2.py"
- "Research the latest developments in AI and provide a comprehensive summary"
- "Review this code for security vulnerabilities and suggest fixes"
- "Refactor this legacy code to use modern Python patterns"
"#}
.to_string(),
json!({
"type": "object",
"required": ["task"],
"properties": {
"recipe_name": {
"type": "string",
"description": "Name of the recipe file to configure the subagent (e.g., 'research_assistant_recipe.yaml'). Either this or 'instructions' must be provided."
},
"instructions": {
"type": "string",
"description": "Direct instructions for the subagent's task. Either this or 'recipe_name' must be provided. Example: 'You are a code refactoring assistant. Help convert unittest tests to pytest format.'"
},
"task": {
"type": "string",
"description": "The task description or initial message for the subagent to work on"
},
"max_turns": {
"type": "integer",
"description": "Maximum number of conversation turns before auto-completion (default: 10)",
"minimum": 1,
"default": 10
},
"timeout_seconds": {
"type": "integer",
"description": "Optional timeout for the entire task in seconds",
"minimum": 1
}
}
}),
Some(ToolAnnotations {
title: Some("Run subagent task".to_string()),
read_only_hint: false,
destructive_hint: false,
idempotent_hint: false,
open_world_hint: false,
}),
)
}

View File

@@ -1,42 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnSubAgentArgs {
pub recipe_name: Option<String>,
pub instructions: Option<String>,
pub message: String,
pub max_turns: Option<usize>,
pub timeout_seconds: Option<u64>,
}
impl SpawnSubAgentArgs {
pub fn new_with_recipe(recipe_name: String, message: String) -> Self {
Self {
recipe_name: Some(recipe_name),
instructions: None,
message,
max_turns: None,
timeout_seconds: None,
}
}
pub fn new_with_instructions(instructions: String, message: String) -> Self {
Self {
recipe_name: None,
instructions: Some(instructions),
message,
max_turns: None,
timeout_seconds: None,
}
}
pub fn with_max_turns(mut self, max_turns: usize) -> Self {
self.max_turns = Some(max_turns);
self
}
pub fn with_timeout(mut self, timeout_seconds: u64) -> Self {
self.timeout_seconds = Some(timeout_seconds);
self
}
}

View File

@@ -146,11 +146,6 @@ pub struct SubRecipe {
#[serde(default)]
pub sequential_when_repeated: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Execution {
#[serde(default)]
pub parallel: bool,
}
fn deserialize_value_map_as_string<'de, D>(
deserializer: D,

View File

@@ -3,6 +3,10 @@ use std::time::Instant;
#[tokio::test]
async fn test_pricing_cache_performance() {
// Use a unique cache directory for this test to avoid conflicts
let test_cache_dir = format!("/tmp/goose_test_cache_perf_{}", std::process::id());
std::env::set_var("GOOSE_CACHE_DIR", &test_cache_dir);
// Initialize the cache
let start = Instant::now();
initialize_pricing_cache()
@@ -65,10 +69,18 @@ async fn test_pricing_cache_performance() {
first_fetch_duration,
second_fetch_duration
);
// Clean up
std::env::remove_var("GOOSE_CACHE_DIR");
let _ = std::fs::remove_dir_all(&test_cache_dir);
}
#[tokio::test]
async fn test_pricing_refresh() {
// Use a unique cache directory for this test to avoid conflicts
let test_cache_dir = format!("/tmp/goose_test_cache_refresh_{}", std::process::id());
std::env::set_var("GOOSE_CACHE_DIR", &test_cache_dir);
// Initialize first
initialize_pricing_cache()
.await
@@ -90,10 +102,18 @@ async fn test_pricing_refresh() {
refreshed_pricing.is_some(),
"Expected pricing after refresh"
);
// Clean up
std::env::remove_var("GOOSE_CACHE_DIR");
let _ = std::fs::remove_dir_all(&test_cache_dir);
}
#[tokio::test]
async fn test_model_not_in_openrouter() {
// Use a unique cache directory for this test to avoid conflicts
let test_cache_dir = format!("/tmp/goose_test_cache_model_{}", std::process::id());
std::env::set_var("GOOSE_CACHE_DIR", &test_cache_dir);
initialize_pricing_cache()
.await
.expect("Failed to initialize pricing cache");
@@ -104,12 +124,20 @@ async fn test_model_not_in_openrouter() {
pricing.is_none(),
"Should return None for non-existent model"
);
// Clean up
std::env::remove_var("GOOSE_CACHE_DIR");
let _ = std::fs::remove_dir_all(&test_cache_dir);
}
#[tokio::test]
async fn test_concurrent_access() {
use tokio::task;
// Use a unique cache directory for this test to avoid conflicts
let test_cache_dir = format!("/tmp/goose_test_cache_concurrent_{}", std::process::id());
std::env::set_var("GOOSE_CACHE_DIR", &test_cache_dir);
initialize_pricing_cache()
.await
.expect("Failed to initialize pricing cache");
@@ -133,4 +161,8 @@ async fn test_concurrent_access() {
assert!(has_pricing, "Task {} should have gotten pricing", task_id);
println!("Task {} took: {:?}", task_id, duration);
}
// Clean up
std::env::remove_var("GOOSE_CACHE_DIR");
let _ = std::fs::remove_dir_all(&test_cache_dir);
}