Co-authored-by: Lily Delalande <ldelalande@squareup.com>
This commit is contained in:
Wendy Tang
2025-06-25 16:18:48 -07:00
committed by GitHub
parent c3acddc360
commit d9d7eb0697
17 changed files with 1709 additions and 84 deletions

View File

@@ -618,6 +618,7 @@ async fn process_message_streaming(
// Log model change
tracing::info!("Model changed to {} in {} mode", model, mode);
}
Err(e) => {
error!("Error in message stream: {}", e);
let mut sender = sender.lock().await;

View File

@@ -906,23 +906,87 @@ impl Session {
match method.as_str() {
"notifications/message" => {
let data = o.get("data").unwrap_or(&Value::Null);
let message = match data {
Value::String(s) => s.clone(),
let (formatted_message, subagent_id, _notification_type) = match data {
Value::String(s) => (s.clone(), None, None),
Value::Object(o) => {
if let Some(Value::String(output)) = o.get("output") {
output.to_owned()
// Check for subagent notification structure first
if let Some(Value::String(msg)) = o.get("message") {
// Extract subagent info for better display
let subagent_id = o.get("subagent_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let notification_type = o.get("type")
.and_then(|v| v.as_str())
.unwrap_or("");
let formatted = match notification_type {
"subagent_created" | "completed" | "terminated" => {
format!("🤖 {}", msg)
}
"tool_usage" | "tool_completed" | "tool_error" => {
format!("🔧 {}", msg)
}
"message_processing" | "turn_progress" => {
format!("💭 {}", msg)
}
"response_generated" => {
// Check verbosity setting for subagent response content
let config = Config::global();
let min_priority = config
.get_param::<f32>("GOOSE_CLI_MIN_PRIORITY")
.ok()
.unwrap_or(0.5);
if min_priority > 0.1 && !self.debug {
// High/Medium verbosity: show truncated response
if let Some(response_content) = msg.strip_prefix("Responded: ") {
if response_content.len() > 100 {
format!("🤖 Responded: {}...", &response_content[..100])
} else {
format!("🤖 {}", msg)
}
} else {
format!("🤖 {}", msg)
}
} else {
// All verbosity or debug: show full response
format!("🤖 {}", msg)
}
}
_ => {
msg.to_string()
}
};
(formatted, Some(subagent_id.to_string()), Some(notification_type.to_string()))
} else if let Some(Value::String(output)) = o.get("output") {
// Fallback for other MCP notification types
(output.to_owned(), None, None)
} else {
data.to_string()
(data.to_string(), None, None)
}
},
v => {
v.to_string()
(v.to_string(), None, None)
},
};
if interactive {
output::set_thinking_message(&message);
// Handle subagent notifications - show immediately
if let Some(_id) = subagent_id {
// Show subagent notifications immediately (no buffering) with compact spacing
if interactive {
let _ = progress_bars.hide();
println!("{}", console::style(&formatted_message).green().dim());
} else {
progress_bars.log(&formatted_message);
}
} else {
progress_bars.log(&message);
// Non-subagent notification, display immediately with compact spacing
if interactive {
let _ = progress_bars.hide();
println!("{}", console::style(&formatted_message).green().dim());
} else {
progress_bars.log(&formatted_message);
}
}
},
"notifications/progress" => {
@@ -951,6 +1015,7 @@ impl Session {
eprintln!("Model changed to {} in {} mode", model, mode);
}
}
Some(Err(e)) => {
eprintln!("Error: {}", e);
drop(stream);

View File

@@ -117,6 +117,7 @@ pub fn hide_thinking() {
THINKING.with(|t| t.borrow_mut().hide());
}
#[allow(dead_code)]
pub fn set_thinking_message(s: &String) {
THINKING.with(|t| {
if let Some(spinner) = t.borrow_mut().spinner.as_mut() {

View File

@@ -269,6 +269,7 @@ pub unsafe extern "C" fn goose_agent_send_message(
Ok(AgentEvent::ModelChange { .. }) => {
// Model change events are informational, just continue
}
Err(e) => {
full_response.push_str(&format!("\nError in message stream: {}", e));
}

View File

@@ -277,6 +277,7 @@ async fn handler(
).await;
}
}
Ok(Some(Err(e))) => {
tracing::error!("Error processing message: {}", e);
let _ = stream_event(
@@ -392,6 +393,7 @@ async fn ask_handler(
// Handle notifications if needed
tracing::info!("Received notification: {:?}", n);
}
Err(e) => {
tracing::error!("Error processing as_ai message: {}", e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);

View File

@@ -22,7 +22,7 @@ use crate::scheduler_trait::SchedulerTrait;
use crate::tool_monitor::{ToolCall, ToolMonitor};
use regex::Regex;
use serde_json::Value;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, Mutex, RwLock};
use tracing::{debug, error, instrument};
use crate::agents::extension::{ExtensionConfig, ExtensionError, ExtensionResult, ToolInfo};
@@ -45,14 +45,18 @@ use mcp_core::{
prompt::Prompt, protocol::GetPromptResult, tool::Tool, Content, ToolError, ToolResult,
};
use crate::agents::subagent_tools::SUBAGENT_RUN_TASK_TOOL_NAME;
use super::platform_tools;
use super::router_tools;
use super::subagent_manager::SubAgentManager;
use super::subagent_tools;
use super::tool_execution::{ToolCallResult, CHAT_MODE_TOOL_SKIPPED_RESPONSE, DECLINED_RESPONSE};
/// The main goose Agent
pub struct Agent {
pub(super) provider: Mutex<Option<Arc<dyn Provider>>>,
pub(super) extension_manager: Mutex<ExtensionManager>,
pub(super) extension_manager: RwLock<ExtensionManager>,
pub(super) sub_recipe_manager: Mutex<SubRecipeManager>,
pub(super) frontend_tools: Mutex<HashMap<String, FrontendTool>>,
pub(super) frontend_instructions: Mutex<Option<String>>,
@@ -64,6 +68,8 @@ pub struct Agent {
pub(super) tool_monitor: Mutex<Option<ToolMonitor>>,
pub(super) router_tool_selector: Mutex<Option<Arc<Box<dyn RouterToolSelector>>>>,
pub(super) scheduler_service: Mutex<Option<Arc<dyn SchedulerTrait>>>,
pub(super) subagent_manager: Mutex<Option<SubAgentManager>>,
pub(super) mcp_notification_rx: Arc<Mutex<mpsc::Receiver<JsonRpcMessage>>>,
}
#[derive(Clone, Debug)]
@@ -73,52 +79,6 @@ pub enum AgentEvent {
ModelChange { model: String, mode: String },
}
impl Agent {
pub fn new() -> Self {
// Create channels with buffer size 32 (adjust if needed)
let (confirm_tx, confirm_rx) = mpsc::channel(32);
let (tool_tx, tool_rx) = mpsc::channel(32);
Self {
provider: Mutex::new(None),
extension_manager: Mutex::new(ExtensionManager::new()),
sub_recipe_manager: Mutex::new(SubRecipeManager::new()),
frontend_tools: Mutex::new(HashMap::new()),
frontend_instructions: Mutex::new(None),
prompt_manager: Mutex::new(PromptManager::new()),
confirmation_tx: confirm_tx,
confirmation_rx: Mutex::new(confirm_rx),
tool_result_tx: tool_tx,
tool_result_rx: Arc::new(Mutex::new(tool_rx)),
tool_monitor: Mutex::new(None),
router_tool_selector: Mutex::new(None),
scheduler_service: Mutex::new(None),
}
}
pub async fn configure_tool_monitor(&self, max_repetitions: Option<u32>) {
let mut tool_monitor = self.tool_monitor.lock().await;
*tool_monitor = Some(ToolMonitor::new(max_repetitions));
}
pub async fn get_tool_stats(&self) -> Option<HashMap<String, u32>> {
let tool_monitor = self.tool_monitor.lock().await;
tool_monitor.as_ref().map(|monitor| monitor.get_stats())
}
pub async fn reset_tool_monitor(&self) {
if let Some(monitor) = self.tool_monitor.lock().await.as_mut() {
monitor.reset();
}
}
/// Set the scheduler service for this agent
pub async fn set_scheduler(&self, scheduler: Arc<dyn SchedulerTrait>) {
let mut scheduler_service = self.scheduler_service.lock().await;
*scheduler_service = Some(scheduler);
}
}
impl Default for Agent {
fn default() -> Self {
Self::new()
@@ -160,6 +120,55 @@ where
}
impl Agent {
pub fn new() -> Self {
// Create channels with buffer size 32 (adjust if needed)
let (confirm_tx, confirm_rx) = mpsc::channel(32);
let (tool_tx, tool_rx) = mpsc::channel(32);
// Add MCP notification channel
let (mcp_tx, mcp_rx) = mpsc::channel(100);
Self {
provider: Mutex::new(None),
extension_manager: RwLock::new(ExtensionManager::new()),
sub_recipe_manager: Mutex::new(SubRecipeManager::new()),
frontend_tools: Mutex::new(HashMap::new()),
frontend_instructions: Mutex::new(None),
prompt_manager: Mutex::new(PromptManager::new()),
confirmation_tx: confirm_tx,
confirmation_rx: Mutex::new(confirm_rx),
tool_result_tx: tool_tx,
tool_result_rx: Arc::new(Mutex::new(tool_rx)),
tool_monitor: Mutex::new(None),
router_tool_selector: Mutex::new(None),
scheduler_service: Mutex::new(None),
// Initialize with MCP notification support
subagent_manager: Mutex::new(Some(SubAgentManager::new(mcp_tx))),
mcp_notification_rx: Arc::new(Mutex::new(mcp_rx)),
}
}
pub async fn configure_tool_monitor(&self, max_repetitions: Option<u32>) {
let mut tool_monitor = self.tool_monitor.lock().await;
*tool_monitor = Some(ToolMonitor::new(max_repetitions));
}
pub async fn get_tool_stats(&self) -> Option<HashMap<String, u32>> {
let tool_monitor = self.tool_monitor.lock().await;
tool_monitor.as_ref().map(|monitor| monitor.get_stats())
}
pub async fn reset_tool_monitor(&self) {
if let Some(monitor) = self.tool_monitor.lock().await.as_mut() {
monitor.reset();
}
}
/// Set the scheduler service for this agent
pub async fn set_scheduler(&self, scheduler: Arc<dyn SchedulerTrait>) {
let mut scheduler_service = self.scheduler_service.lock().await;
*scheduler_service = Some(scheduler);
}
/// Get a reference count clone to the provider
pub async fn provider(&self) -> Result<Arc<dyn Provider>, anyhow::Error> {
match &*self.provider.lock().await {
@@ -182,7 +191,7 @@ impl Agent {
pub async fn get_prefixed_tools(&self) -> ExtensionResult<Vec<Tool>> {
let mut tools = self
.extension_manager
.lock()
.read()
.await
.get_prefixed_tools(None)
.await?;
@@ -249,7 +258,7 @@ impl Agent {
return (request_id, Ok(ToolCallResult::from(result)));
}
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
let sub_recipe_manager = self.sub_recipe_manager.lock().await;
let result: ToolCallResult = if sub_recipe_manager.is_sub_recipe_tool(&tool_call.name) {
@@ -271,6 +280,11 @@ impl Agent {
)
} else if tool_call.name == PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME {
ToolCallResult::from(extension_manager.search_available_extensions().await)
} else if tool_call.name == SUBAGENT_RUN_TASK_TOOL_NAME {
ToolCallResult::from(
self.handle_run_subagent_task(tool_call.arguments.clone())
.await,
)
} else if self.is_frontend_tool(&tool_call.name).await {
// For frontend tools, return an error indicating we need frontend execution
ToolCallResult::from(Err(ToolError::ExecutionError(
@@ -333,13 +347,13 @@ impl Agent {
extension_name: String,
request_id: String,
) -> (String, Result<Vec<Content>, ToolError>) {
let mut extension_manager = self.extension_manager.lock().await;
let mut extension_manager = self.extension_manager.write().await;
let selector = self.router_tool_selector.lock().await.clone();
if ToolRouterIndexManager::is_tool_router_enabled(&selector) {
if let Some(selector) = selector {
let selector_action = if action == "disable" { "remove" } else { "add" };
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
let selector = Arc::new(selector);
if let Err(e) = ToolRouterIndexManager::update_extension_tools(
&selector,
@@ -407,6 +421,33 @@ impl Agent {
})
.map_err(|e| ToolError::ExecutionError(e.to_string()));
// Update vector index if operation was successful and vector routing is enabled
if result.is_ok() {
let selector = self.router_tool_selector.lock().await.clone();
if ToolRouterIndexManager::is_tool_router_enabled(&selector) {
if let Some(selector) = selector {
let vector_action = if action == "disable" { "remove" } else { "add" };
let extension_manager = self.extension_manager.read().await;
let selector = Arc::new(selector);
if let Err(e) = ToolRouterIndexManager::update_extension_tools(
&selector,
&extension_manager,
&extension_name,
vector_action,
)
.await
{
return (
request_id,
Err(ToolError::ExecutionError(format!(
"Failed to update vector index: {}",
e
))),
);
}
}
}
}
(request_id, result)
}
@@ -439,7 +480,7 @@ impl Agent {
}
}
_ => {
let mut extension_manager = self.extension_manager.lock().await;
let mut extension_manager = self.extension_manager.write().await;
extension_manager.add_extension(extension.clone()).await?;
}
}
@@ -448,7 +489,7 @@ impl Agent {
let selector = self.router_tool_selector.lock().await.clone();
if ToolRouterIndexManager::is_tool_router_enabled(&selector) {
if let Some(selector) = selector {
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
let selector = Arc::new(selector);
if let Err(e) = ToolRouterIndexManager::update_extension_tools(
&selector,
@@ -471,7 +512,7 @@ impl Agent {
}
pub async fn list_tools(&self, extension_name: Option<String>) -> Vec<Tool> {
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
let mut prefixed_tools = extension_manager
.get_prefixed_tools(extension_name.clone())
.await
@@ -485,6 +526,12 @@ impl Agent {
platform_tools::manage_schedule_tool(),
]);
// Add subagent tool (only if ALPHA_FEATURES is enabled)
let config = Config::global();
if config.get_param::<bool>("ALPHA_FEATURES").unwrap_or(false) {
prefixed_tools.push(subagent_tools::run_task_subagent_tool());
}
// Add resource tools if supported
if extension_manager.supports_resources() {
prefixed_tools.extend([
@@ -521,7 +568,7 @@ impl Agent {
let selector = self.router_tool_selector.lock().await.clone();
if let Some(selector) = selector {
if let Ok(recent_calls) = selector.get_recent_tool_calls(20).await {
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
// Add recent tool calls to the list, avoiding duplicates
for tool_name in recent_calls {
// Find the tool in the extension manager's tools
@@ -541,11 +588,14 @@ impl Agent {
}
pub async fn remove_extension(&self, name: &str) -> Result<()> {
let mut extension_manager = self.extension_manager.write().await;
extension_manager.remove_extension(name).await?;
// If vector tool selection is enabled, remove tools from the index
let selector = self.router_tool_selector.lock().await.clone();
if ToolRouterIndexManager::is_tool_router_enabled(&selector) {
if let Some(selector) = selector {
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
ToolRouterIndexManager::update_extension_tools(
&selector,
&extension_manager,
@@ -556,14 +606,11 @@ impl Agent {
}
}
let mut extension_manager = self.extension_manager.lock().await;
extension_manager.remove_extension(name).await?;
Ok(())
}
pub async fn list_extensions(&self) -> Vec<String> {
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
extension_manager
.list_extensions()
.await
@@ -631,6 +678,28 @@ impl Agent {
Ok(Box::pin(async_stream::try_stream! {
let _ = reply_span.enter();
loop {
// Check for MCP notifications from subagents
let mcp_notifications = self.get_mcp_notifications().await;
for notification in mcp_notifications {
// Extract subagent info from the notification data
if let JsonRpcMessage::Notification(ref notif) = notification {
if let Some(params) = &notif.params {
if let Some(data) = params.get("data") {
if let (Some(subagent_id), Some(_message)) = (
data.get("subagent_id").and_then(|v| v.as_str()),
data.get("message").and_then(|v| v.as_str())
) {
// Emit as McpNotification event
yield AgentEvent::McpNotification((
subagent_id.to_string(),
notification.clone()
));
}
}
}
}
}
match Self::generate_response_from_provider(
self.provider().await?,
&system_prompt,
@@ -832,6 +901,31 @@ impl Agent {
messages.push(response);
messages.push(final_message_tool_resp);
// Check for MCP notifications from subagents again before next iteration
// Note: These are already handled as McpNotification events above,
// so we don't need to convert them to assistant messages here.
// This was causing duplicate plain-text notifications.
// let mcp_notifications = self.get_mcp_notifications().await;
// for notification in mcp_notifications {
// // Extract subagent info from the notification data for assistant messages
// if let JsonRpcMessage::Notification(ref notif) = notification {
// if let Some(params) = &notif.params {
// if let Some(data) = params.get("data") {
// if let (Some(subagent_id), Some(message)) = (
// data.get("subagent_id").and_then(|v| v.as_str()),
// data.get("message").and_then(|v| v.as_str())
// ) {
// yield AgentEvent::Message(
// Message::assistant().with_text(
// format!("Subagent {}: {}", subagent_id, message)
// )
// );
// }
// }
// }
// }
// }
},
Err(ProviderError::ContextLengthExceeded(_)) => {
// At this point, the last message should be a user message
@@ -862,9 +956,32 @@ impl Agent {
prompt_manager.add_system_prompt_extra(instruction);
}
/// Update the provider used by this agent
/// Get MCP notifications from subagents
pub async fn get_mcp_notifications(&self) -> Vec<JsonRpcMessage> {
let mut notifications = Vec::new();
let mut rx = self.mcp_notification_rx.lock().await;
while let Ok(notification) = rx.try_recv() {
notifications.push(notification);
}
notifications
}
/// Update the provider
pub async fn update_provider(&self, provider: Arc<dyn Provider>) -> Result<()> {
*self.provider.lock().await = Some(provider.clone());
let mut current_provider = self.provider.lock().await;
*current_provider = Some(provider.clone());
// Initialize subagent manager with MCP notification support
// Need to recreate the MCP channel since we're replacing the manager
let (mcp_tx, mcp_rx) = mpsc::channel(100);
{
let mut rx_guard = self.mcp_notification_rx.lock().await;
*rx_guard = mcp_rx;
}
*self.subagent_manager.lock().await = Some(SubAgentManager::new(mcp_tx));
self.update_router_tool_selector(Some(provider), None)
.await?;
Ok(())
@@ -876,7 +993,7 @@ impl Agent {
reindex_all: Option<bool>,
) -> Result<()> {
let config = Config::global();
let extension_manager = self.extension_manager.lock().await;
let _extension_manager = self.extension_manager.read().await;
let provider = match provider {
Some(p) => p,
None => self.provider().await?,
@@ -910,6 +1027,7 @@ impl Agent {
};
// First index platform tools
let extension_manager = self.extension_manager.read().await;
ToolRouterIndexManager::index_platform_tools(&selector, &extension_manager).await?;
if reindex_all.unwrap_or(false) {
@@ -934,6 +1052,7 @@ impl Agent {
// Update the selector
*self.router_tool_selector.lock().await = Some(selector.clone());
Ok(())
}
@@ -944,7 +1063,7 @@ impl Agent {
}
pub async fn list_extension_prompts(&self) -> HashMap<String, Vec<Prompt>> {
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
extension_manager
.list_prompts()
.await
@@ -952,7 +1071,7 @@ impl Agent {
}
pub async fn get_prompt(&self, name: &str, arguments: Value) -> Result<GetPromptResult> {
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
// First find which extension has this prompt
let prompts = extension_manager
@@ -975,7 +1094,7 @@ impl Agent {
}
pub async fn get_plan_prompt(&self) -> anyhow::Result<String> {
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
let tools = extension_manager.get_prefixed_tools(None).await?;
let tools_info = tools
.into_iter()
@@ -1001,7 +1120,7 @@ impl Agent {
}
pub async fn create_recipe(&self, mut messages: Vec<Message>) -> Result<Recipe> {
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
let extensions_info = extension_manager.get_extensions_info().await;
// Get model name from provider

View File

@@ -11,6 +11,11 @@ mod router_tool_selector;
mod router_tools;
mod schedule_tool;
pub mod sub_recipe_manager;
pub mod subagent;
pub mod subagent_handler;
pub mod subagent_manager;
pub mod subagent_tools;
pub mod subagent_types;
mod tool_execution;
mod tool_router_index_manager;
pub(crate) mod tool_vectordb;
@@ -20,4 +25,7 @@ pub use agent::{Agent, AgentEvent};
pub use extension::ExtensionConfig;
pub use extension_manager::ExtensionManager;
pub use prompt_manager::PromptManager;
pub use subagent::{SubAgent, SubAgentConfig, SubAgentProgress, SubAgentStatus};
pub use subagent_manager::SubAgentManager;
pub use subagent_types::SpawnSubAgentArgs;
pub use types::{FrontendTool, SessionConfig};

View File

@@ -52,7 +52,7 @@ impl Agent {
}
// Prepare system prompt
let extension_manager = self.extension_manager.lock().await;
let extension_manager = self.extension_manager.read().await;
let extensions_info = extension_manager.get_extensions_info().await;
// Get model name from provider

View File

@@ -0,0 +1,757 @@
use crate::{
agents::{extension_manager::ExtensionManager, Agent},
message::{Message, MessageContent, ToolRequest},
prompt_template::render_global_file,
providers::base::Provider,
providers::errors::ProviderError,
recipe::Recipe,
};
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use mcp_core::protocol::{JsonRpcMessage, JsonRpcNotification};
use mcp_core::{handler::ToolError, role::Role, tool::Tool};
use serde::{Deserialize, Serialize};
use serde_json::{self, json};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, Mutex, RwLock};
use tracing::{debug, error, instrument};
use uuid::Uuid;
use crate::agents::platform_tools::{
self, PLATFORM_LIST_RESOURCES_TOOL_NAME, PLATFORM_READ_RESOURCE_TOOL_NAME,
PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME,
};
use crate::agents::subagent_tools::SUBAGENT_RUN_TASK_TOOL_NAME;
/// Status of a subagent
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SubAgentStatus {
Ready, // Ready to process messages
Processing, // Currently working on a task
Completed(String), // Task completed (with optional message for success/error)
Terminated, // Manually terminated
}
/// Configuration for a subagent
#[derive(Debug)]
pub struct SubAgentConfig {
pub id: String,
pub recipe: Option<Recipe>,
pub instructions: Option<String>,
pub max_turns: Option<usize>,
pub timeout_seconds: Option<u64>,
}
impl SubAgentConfig {
pub fn new_with_recipe(recipe: Recipe) -> Self {
Self {
id: Uuid::new_v4().to_string(),
recipe: Some(recipe),
instructions: None,
max_turns: None,
timeout_seconds: None,
}
}
pub fn new_with_instructions(instructions: String) -> Self {
Self {
id: Uuid::new_v4().to_string(),
recipe: None,
instructions: Some(instructions),
max_turns: None,
timeout_seconds: None,
}
}
pub fn with_max_turns(mut self, max_turns: usize) -> Self {
self.max_turns = Some(max_turns);
self
}
pub fn with_timeout(mut self, timeout_seconds: u64) -> Self {
self.timeout_seconds = Some(timeout_seconds);
self
}
}
/// Progress information for a subagent
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubAgentProgress {
pub subagent_id: String,
pub status: SubAgentStatus,
pub message: String,
pub turn: usize,
pub max_turns: Option<usize>,
pub timestamp: DateTime<Utc>,
}
/// A specialized agent that can handle specific tasks independently
pub struct SubAgent {
pub id: String,
pub conversation: Arc<Mutex<Vec<Message>>>,
pub status: Arc<RwLock<SubAgentStatus>>,
pub config: SubAgentConfig,
pub turn_count: Arc<Mutex<usize>>,
pub created_at: DateTime<Utc>,
pub recipe_extensions: Arc<Mutex<Vec<String>>>,
pub missing_extensions: Arc<Mutex<Vec<String>>>, // Track extensions that weren't enabled
pub mcp_notification_tx: mpsc::Sender<JsonRpcMessage>, // For MCP notifications
}
impl SubAgent {
/// Create a new subagent with the given configuration and provider
#[instrument(skip(config, _provider, extension_manager, mcp_notification_tx))]
pub async fn new(
config: SubAgentConfig,
_provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
mcp_notification_tx: mpsc::Sender<JsonRpcMessage>,
) -> Result<(Arc<Self>, tokio::task::JoinHandle<()>), anyhow::Error> {
debug!("Creating new subagent with id: {}", config.id);
let mut missing_extensions = Vec::new();
let mut recipe_extensions = Vec::new();
// Check if extensions from recipe exist in the extension manager
if let Some(recipe) = &config.recipe {
if let Some(extensions) = &recipe.extensions {
for extension in extensions {
let extension_name = extension.name();
let existing_extensions = extension_manager.list_extensions().await?;
if !existing_extensions.contains(&extension_name) {
missing_extensions.push(extension_name);
} else {
recipe_extensions.push(extension_name);
}
}
}
} else {
// If no recipe, inherit all extensions from the parent agent
let existing_extensions = extension_manager.list_extensions().await?;
recipe_extensions = existing_extensions;
}
let subagent = Arc::new(SubAgent {
id: config.id.clone(),
conversation: Arc::new(Mutex::new(Vec::new())),
status: Arc::new(RwLock::new(SubAgentStatus::Ready)),
config,
turn_count: Arc::new(Mutex::new(0)),
created_at: Utc::now(),
recipe_extensions: Arc::new(Mutex::new(recipe_extensions)),
missing_extensions: Arc::new(Mutex::new(missing_extensions)),
mcp_notification_tx,
});
// Send initial MCP notification
let subagent_clone = Arc::clone(&subagent);
subagent_clone
.send_mcp_notification("subagent_created", "Subagent created and ready")
.await;
// Create a background task handle (for future use with streaming/monitoring)
let subagent_clone = Arc::clone(&subagent);
let handle = tokio::spawn(async move {
// This could be used for background monitoring, cleanup, etc.
debug!("Subagent {} background task started", subagent_clone.id);
});
debug!("Subagent {} created successfully", subagent.id);
Ok((subagent, handle))
}
/// Get the current status of the subagent
pub async fn get_status(&self) -> SubAgentStatus {
self.status.read().await.clone()
}
/// Update the status of the subagent
async fn set_status(&self, status: SubAgentStatus) {
// Update the status first, then release the lock
{
let mut current_status = self.status.write().await;
*current_status = status.clone();
} // Write lock is released here!
// Send MCP notifications based on status
match &status {
SubAgentStatus::Processing => {
self.send_mcp_notification("status_changed", "Processing request")
.await;
}
SubAgentStatus::Completed(msg) => {
self.send_mcp_notification("completed", &format!("Completed: {}", msg))
.await;
}
SubAgentStatus::Terminated => {
self.send_mcp_notification("terminated", "Subagent terminated")
.await;
}
_ => {}
}
}
/// Send an MCP notification about the subagent's activity
pub async fn send_mcp_notification(&self, notification_type: &str, message: &str) {
let notification = JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "notifications/message".to_string(),
params: Some(json!({
"level": "info",
"logger": format!("subagent_{}", self.id),
"data": {
"subagent_id": self.id,
"type": notification_type,
"message": message,
"timestamp": Utc::now().to_rfc3339()
}
})),
});
if let Err(e) = self.mcp_notification_tx.send(notification).await {
error!(
"Failed to send MCP notification from subagent {}: {}",
self.id, e
);
}
}
/// Get current progress information
pub async fn get_progress(&self) -> SubAgentProgress {
let status = self.get_status().await;
let turn_count = *self.turn_count.lock().await;
SubAgentProgress {
subagent_id: self.id.clone(),
status: status.clone(),
message: match &status {
SubAgentStatus::Ready => "Ready to process messages".to_string(),
SubAgentStatus::Processing => "Processing request...".to_string(),
SubAgentStatus::Completed(msg) => msg.clone(),
SubAgentStatus::Terminated => "Subagent terminated".to_string(),
},
turn: turn_count,
max_turns: self.config.max_turns,
timestamp: Utc::now(),
}
}
/// Process a message and generate a response using the subagent's provider
#[instrument(skip(self, message, provider, extension_manager))]
pub async fn reply_subagent(
&self,
message: String,
provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
) -> Result<Message, anyhow::Error> {
debug!("Processing message for subagent {}", self.id);
self.send_mcp_notification("message_processing", &format!("Processing: {}", message))
.await;
// Check if we've exceeded max turns
{
let turn_count = *self.turn_count.lock().await;
if let Some(max_turns) = self.config.max_turns {
if turn_count >= max_turns {
self.set_status(SubAgentStatus::Completed(
"Maximum turns exceeded".to_string(),
))
.await;
return Err(anyhow!("Maximum turns ({}) exceeded", max_turns));
}
}
}
// Set status to processing
self.set_status(SubAgentStatus::Processing).await;
// Add user message to conversation
let user_message = Message::user().with_text(message.clone());
{
let mut conversation = self.conversation.lock().await;
conversation.push(user_message.clone());
}
// Increment turn count
{
let mut turn_count = self.turn_count.lock().await;
*turn_count += 1;
self.send_mcp_notification(
"turn_progress",
&format!("Turn {}/{}", turn_count, self.config.max_turns.unwrap_or(0)),
)
.await;
}
// Get the current conversation for context
let mut messages = self.get_conversation().await;
// Get tools based on whether we're using a recipe or inheriting from parent
let tools: Vec<Tool> = if self.config.recipe.is_some() {
// Recipe mode: only get tools from the recipe's extensions
let recipe_extensions = self.recipe_extensions.lock().await;
let mut recipe_tools = Vec::new();
debug!(
"Subagent {} operating in recipe mode with {} extensions",
self.id,
recipe_extensions.len()
);
for extension_name in recipe_extensions.iter() {
match extension_manager
.get_prefixed_tools(Some(extension_name.clone()))
.await
{
Ok(mut ext_tools) => {
debug!(
"Added {} tools from extension {}",
ext_tools.len(),
extension_name
);
recipe_tools.append(&mut ext_tools);
}
Err(e) => {
debug!(
"Failed to get tools for extension {}: {}",
extension_name, e
);
}
}
}
debug!(
"Subagent {} has {} total recipe tools before filtering",
self.id,
recipe_tools.len()
);
// Filter out subagent tools from recipe tools
let mut filtered_tools = Self::filter_subagent_tools(recipe_tools);
// Add platform tools (except subagent tools)
Self::add_platform_tools(&mut filtered_tools, &extension_manager).await;
debug!(
"Subagent {} has {} tools after filtering and adding platform tools",
self.id,
filtered_tools.len()
);
filtered_tools
} else {
// No recipe: inherit all tools from parent (but filter out subagent tools)
debug!(
"Subagent {} operating in inheritance mode, using all parent tools",
self.id
);
let parent_tools = extension_manager.get_prefixed_tools(None).await?;
debug!(
"Subagent {} has {} parent tools before filtering",
self.id,
parent_tools.len()
);
let mut filtered_tools = Self::filter_subagent_tools(parent_tools);
// Add platform tools (except subagent tools)
Self::add_platform_tools(&mut filtered_tools, &extension_manager).await;
debug!(
"Subagent {} has {} tools after filtering and adding platform tools",
self.id,
filtered_tools.len()
);
filtered_tools
};
let toolshim_tools: Vec<Tool> = vec![];
// Build system prompt using the template
let system_prompt = self.build_system_prompt(&tools).await?;
// Generate response from provider
loop {
match Agent::generate_response_from_provider(
Arc::clone(&provider),
&system_prompt,
&messages,
&tools,
&toolshim_tools,
)
.await
{
Ok((response, _usage)) => {
// Process any tool calls in the response
let tool_requests: Vec<ToolRequest> = response
.content
.iter()
.filter_map(|content| {
if let MessageContent::ToolRequest(req) = content {
Some(req.clone())
} else {
None
}
})
.collect();
// If there are no tool requests, we're done
if tool_requests.is_empty() {
self.add_message(response.clone()).await;
// Send notification about response
self.send_mcp_notification(
"response_generated",
&format!("Responded: {}", response.as_concat_text()),
)
.await;
// Add delay before completion to ensure all processing finishes
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Set status back to ready and return the final response
self.set_status(SubAgentStatus::Completed("Completed!".to_string()))
.await;
break Ok(response);
}
// Add the assistant message with tool calls to the conversation
messages.push(response.clone());
// Process each tool request and create user response messages
for request in &tool_requests {
if let Ok(tool_call) = &request.tool_call {
// Send notification about tool usage
self.send_mcp_notification(
"tool_usage",
&format!("Using tool: {}", tool_call.name),
)
.await;
// Handle platform tools or dispatch to extension manager
let tool_result = if self.is_platform_tool(&tool_call.name) {
self.handle_platform_tool_call(
tool_call.clone(),
&extension_manager,
)
.await
} else {
match extension_manager
.dispatch_tool_call(tool_call.clone())
.await
{
Ok(result) => result.result.await,
Err(e) => Err(ToolError::ExecutionError(e.to_string())),
}
};
match tool_result {
Ok(result) => {
// Create a user message with the tool response
let tool_response_message = Message::user()
.with_tool_response(request.id.clone(), Ok(result.clone()));
messages.push(tool_response_message);
// Send notification about tool completion
self.send_mcp_notification(
"tool_completed",
&format!("Tool {} completed successfully", tool_call.name),
)
.await;
}
Err(e) => {
// Create a user message with the tool error
let tool_error_message = Message::user().with_tool_response(
request.id.clone(),
Err(ToolError::ExecutionError(e.to_string())),
);
messages.push(tool_error_message);
// Send notification about tool error
self.send_mcp_notification(
"tool_error",
&format!("Tool {} error: {}", tool_call.name, e),
)
.await;
}
}
}
}
// Continue the loop to get the next response from the provider
}
Err(ProviderError::ContextLengthExceeded(_)) => {
self.set_status(SubAgentStatus::Completed(
"Context length exceeded".to_string(),
))
.await;
break Ok(Message::assistant().with_context_length_exceeded(
"The context length of the model has been exceeded. Please start a new session and try again.",
));
}
Err(ProviderError::RateLimitExceeded(_)) => {
self.set_status(SubAgentStatus::Completed("Rate limit exceeded".to_string()))
.await;
break Ok(Message::assistant()
.with_text("Rate limit exceeded. Please try again later."));
}
Err(e) => {
self.set_status(SubAgentStatus::Completed(format!("Error: {}", e)))
.await;
error!("Error: {}", e);
break Ok(Message::assistant().with_text(format!("Ran into this error: {e}.\n\nPlease retry if you think this is a transient or recoverable error.")));
}
}
}
}
/// Add a message to the conversation (for tracking agent responses)
pub async fn add_message(&self, message: Message) {
let mut conversation = self.conversation.lock().await;
conversation.push(message);
}
/// Get the full conversation history
pub async fn get_conversation(&self) -> Vec<Message> {
self.conversation.lock().await.clone()
}
/// Check if the subagent has completed its task
pub async fn is_completed(&self) -> bool {
matches!(
self.get_status().await,
SubAgentStatus::Completed(_) | SubAgentStatus::Terminated
)
}
/// Terminate the subagent
pub async fn terminate(&self) -> Result<(), anyhow::Error> {
debug!("Terminating subagent {}", self.id);
self.set_status(SubAgentStatus::Terminated).await;
Ok(())
}
/// Get formatted conversation for display
pub async fn get_formatted_conversation(&self) -> String {
let conversation = self.conversation.lock().await;
let mut formatted = format!("=== Subagent {} Conversation ===\n", self.id);
if let Some(recipe) = &self.config.recipe {
formatted.push_str(&format!("Recipe: {}\n", recipe.title));
} else if let Some(instructions) = &self.config.instructions {
formatted.push_str(&format!("Instructions: {}\n", instructions));
} else {
formatted.push_str("Mode: Ad-hoc subagent\n");
}
formatted.push_str(&format!(
"Created: {}\n",
self.created_at.format("%Y-%m-%d %H:%M:%S UTC")
));
let progress = self.get_progress().await;
formatted.push_str(&format!("Status: {:?}\n", progress.status));
formatted.push_str(&format!("Turn: {}", progress.turn));
if let Some(max_turns) = progress.max_turns {
formatted.push_str(&format!("/{}", max_turns));
}
formatted.push_str("\n\n");
for (i, message) in conversation.iter().enumerate() {
formatted.push_str(&format!(
"{}. {}: {}\n",
i + 1,
match message.role {
Role::User => "User",
Role::Assistant => "Assistant",
},
message.as_concat_text()
));
}
formatted.push_str("=== End Conversation ===\n");
formatted
}
/// Get the list of extensions that weren't enabled
pub async fn get_missing_extensions(&self) -> Vec<String> {
self.missing_extensions.lock().await.clone()
}
/// Filter out subagent spawning tools to prevent infinite recursion
fn filter_subagent_tools(tools: Vec<Tool>) -> Vec<Tool> {
let original_count = tools.len();
let filtered_tools: Vec<Tool> = tools
.into_iter()
.filter(|tool| {
let should_keep = tool.name != SUBAGENT_RUN_TASK_TOOL_NAME;
if !should_keep {
debug!("Filtering out subagent tool: {}", tool.name);
}
should_keep
})
.collect();
let filtered_count = filtered_tools.len();
if filtered_count < original_count {
debug!(
"Filtered {} subagent tool(s) from {} total tools",
original_count - filtered_count,
original_count
);
}
filtered_tools
}
/// Add platform tools to the subagent's tool list (excluding dangerous tools)
async fn add_platform_tools(tools: &mut Vec<Tool>, extension_manager: &ExtensionManager) {
debug!("Adding safe platform tools to subagent");
// Add safe platform tools - subagents can search for extensions but can't manage them or schedules
tools.push(platform_tools::search_available_extensions_tool());
debug!("Added search_available_extensions tool");
// Add resource tools if supported - these are generally safe for subagents
if extension_manager.supports_resources() {
tools.extend([
platform_tools::read_resource_tool(),
platform_tools::list_resources_tool(),
]);
debug!("Added 2 resource platform tools");
}
// Note: We explicitly do NOT add these tools for security reasons:
// - manage_extensions (could interfere with parent agent's extensions)
// - manage_schedule (could interfere with parent agent's scheduling)
// - subagent spawning tools (prevent recursion)
debug!("Platform tools added successfully (dangerous tools excluded)");
}
/// Check if a tool name is a platform tool that subagents can use
fn is_platform_tool(&self, tool_name: &str) -> bool {
matches!(
tool_name,
PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME
| PLATFORM_READ_RESOURCE_TOOL_NAME
| PLATFORM_LIST_RESOURCES_TOOL_NAME
)
}
/// Handle platform tool calls that are safe for subagents
async fn handle_platform_tool_call(
&self,
tool_call: mcp_core::tool::ToolCall,
extension_manager: &ExtensionManager,
) -> Result<Vec<mcp_core::Content>, ToolError> {
debug!("Handling platform tool: {}", tool_call.name);
match tool_call.name.as_str() {
PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME => extension_manager
.search_available_extensions()
.await
.map_err(|e| ToolError::ExecutionError(e.to_string())),
PLATFORM_READ_RESOURCE_TOOL_NAME => extension_manager
.read_resource(tool_call.arguments)
.await
.map_err(|e| ToolError::ExecutionError(e.to_string())),
PLATFORM_LIST_RESOURCES_TOOL_NAME => extension_manager
.list_resources(tool_call.arguments)
.await
.map_err(|e| ToolError::ExecutionError(e.to_string())),
_ => Err(ToolError::ExecutionError(format!(
"Platform tool '{}' is not available to subagents for security reasons",
tool_call.name
))),
}
}
/// Build the system prompt for the subagent using the template
async fn build_system_prompt(&self, available_tools: &[Tool]) -> Result<String, anyhow::Error> {
let mut context = HashMap::new();
// Add basic context
context.insert(
"current_date_time",
serde_json::Value::String(Utc::now().format("%Y-%m-%d %H:%M:%S UTC").to_string()),
);
context.insert("subagent_id", serde_json::Value::String(self.id.clone()));
// Add recipe information if available
if let Some(recipe) = &self.config.recipe {
context.insert(
"recipe_title",
serde_json::Value::String(recipe.title.clone()),
);
}
// Add max turns if configured
if let Some(max_turns) = self.config.max_turns {
context.insert(
"max_turns",
serde_json::Value::Number(serde_json::Number::from(max_turns)),
);
}
// Add task instructions
let instructions = if let Some(recipe) = &self.config.recipe {
recipe.instructions.as_deref().unwrap_or("")
} else {
self.config.instructions.as_deref().unwrap_or("")
};
context.insert(
"task_instructions",
serde_json::Value::String(instructions.to_string()),
);
// Add available extensions (only if we have a recipe and extensions)
if self.config.recipe.is_some() {
let extensions: Vec<String> = self.recipe_extensions.lock().await.clone();
if !extensions.is_empty() {
context.insert(
"extensions",
serde_json::Value::Array(
extensions
.into_iter()
.map(serde_json::Value::String)
.collect(),
),
);
}
}
// Add available tools with descriptions for better context
let tools_with_descriptions: Vec<String> = available_tools
.iter()
.map(|t| {
if t.description.is_empty() {
t.name.clone()
} else {
format!("{}: {}", t.name, t.description)
}
})
.collect();
context.insert(
"available_tools",
serde_json::Value::String(if tools_with_descriptions.is_empty() {
"None".to_string()
} else {
tools_with_descriptions.join(", ")
}),
);
// Add tool count for context
context.insert(
"tool_count",
serde_json::Value::Number(serde_json::Number::from(available_tools.len())),
);
// Render the subagent system prompt template
let system_prompt = render_global_file("subagent_system.md", &context)
.map_err(|e| anyhow!("Failed to render subagent system prompt: {}", e))?;
Ok(system_prompt)
}
}

View File

@@ -0,0 +1,79 @@
use anyhow::Result;
use mcp_core::{Content, ToolError};
use serde_json::Value;
use std::sync::Arc;
use crate::agents::subagent_types::SpawnSubAgentArgs;
use crate::agents::Agent;
impl Agent {
/// Handle running a complete subagent task (replaces the individual spawn/send/check tools)
pub async fn handle_run_subagent_task(
&self,
arguments: Value,
) -> Result<Vec<Content>, ToolError> {
let subagent_manager = self.subagent_manager.lock().await;
let manager = subagent_manager.as_ref().ok_or_else(|| {
ToolError::ExecutionError("Subagent manager not initialized".to_string())
})?;
// Parse arguments - using "task" as the main message parameter
let message = arguments
.get("task")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing task parameter".to_string()))?
.to_string();
// Either recipe_name or instructions must be provided
let recipe_name = arguments
.get("recipe_name")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let instructions = arguments
.get("instructions")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let mut args = if let Some(recipe_name) = recipe_name {
SpawnSubAgentArgs::new_with_recipe(recipe_name, message.clone())
} else if let Some(instructions) = instructions {
SpawnSubAgentArgs::new_with_instructions(instructions, message.clone())
} else {
return Err(ToolError::ExecutionError(
"Either recipe_name or instructions parameter must be provided".to_string(),
));
};
// Set max_turns with default of 10
let max_turns = arguments
.get("max_turns")
.and_then(|v| v.as_u64())
.unwrap_or(10) as usize;
args = args.with_max_turns(max_turns);
if let Some(timeout) = arguments.get("timeout_seconds").and_then(|v| v.as_u64()) {
args = args.with_timeout(timeout);
}
// Get the provider from the parent agent
let provider = self
.provider()
.await
.map_err(|e| ToolError::ExecutionError(format!("Failed to get provider: {}", e)))?;
// Get the extension manager from the parent agent
let extension_manager = Arc::new(self.extension_manager.read().await);
// Run the complete subagent task
match manager
.run_complete_subagent_task(args, provider, extension_manager)
.await
{
Ok(result) => Ok(vec![Content::text(result)]),
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to run subagent task: {}",
e
))),
}
}
}

View File

@@ -0,0 +1,404 @@
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use mcp_core::protocol::JsonRpcMessage;
use tokio::sync::{mpsc, Mutex, RwLock};
use tracing::{debug, error, instrument, warn};
use crate::agents::extension_manager::ExtensionManager;
use crate::agents::subagent::{SubAgent, SubAgentConfig, SubAgentProgress, SubAgentStatus};
use crate::agents::subagent_types::SpawnSubAgentArgs;
use crate::providers::base::Provider;
use crate::recipe::Recipe;
/// Manages the lifecycle of subagents
pub struct SubAgentManager {
subagents: Arc<RwLock<HashMap<String, Arc<SubAgent>>>>,
handles: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
mcp_notification_tx: mpsc::Sender<JsonRpcMessage>,
}
impl SubAgentManager {
/// Create a new subagent manager
pub fn new(mcp_notification_tx: mpsc::Sender<JsonRpcMessage>) -> Self {
Self {
subagents: Arc::new(RwLock::new(HashMap::new())),
handles: Arc::new(Mutex::new(HashMap::new())),
mcp_notification_tx,
}
}
/// Spawn a new interactive subagent
#[instrument(skip(self, args, provider, extension_manager))]
pub async fn spawn_interactive_subagent(
&self,
args: SpawnSubAgentArgs,
provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
) -> Result<String> {
debug!("Spawning interactive subagent");
// Create subagent config based on whether we have a recipe or instructions
let mut config = if let Some(recipe_name) = args.recipe_name {
debug!("Using recipe: {}", recipe_name);
// Load the recipe
let recipe = self.load_recipe(&recipe_name).await?;
SubAgentConfig::new_with_recipe(recipe)
} else if let Some(instructions) = args.instructions {
debug!("Using direct instructions");
SubAgentConfig::new_with_instructions(instructions)
} else {
return Err(anyhow!(
"Either recipe_name or instructions must be provided"
));
};
if let Some(max_turns) = args.max_turns {
config = config.with_max_turns(max_turns);
}
if let Some(timeout) = args.timeout_seconds {
config = config.with_timeout(timeout);
}
// Create the subagent with the parent agent's provider
let (subagent, handle) = SubAgent::new(
config,
Arc::clone(&provider),
Arc::clone(&extension_manager),
self.mcp_notification_tx.clone(),
)
.await?;
let subagent_id = subagent.id.clone();
// Store the subagent and its handle
{
let mut subagents = self.subagents.write().await;
subagents.insert(subagent_id.clone(), Arc::clone(&subagent));
}
{
let mut handles = self.handles.lock().await;
handles.insert(subagent_id.clone(), handle);
}
// Return immediately - no initial message processing
Ok(subagent_id)
}
/// Get a subagent by ID
pub async fn get_subagent(&self, id: &str) -> Option<Arc<SubAgent>> {
let subagents = self.subagents.read().await;
subagents.get(id).cloned()
}
/// List all active subagent IDs
pub async fn list_subagents(&self) -> Vec<String> {
let subagents = self.subagents.read().await;
subagents.keys().cloned().collect()
}
/// Get status of all subagents
pub async fn get_subagent_status(&self) -> HashMap<String, SubAgentStatus> {
let subagents = self.subagents.read().await;
let mut status_map = HashMap::new();
for (id, subagent) in subagents.iter() {
status_map.insert(id.clone(), subagent.get_status().await);
}
status_map
}
/// Get progress of all subagents
pub async fn get_subagent_progress(&self) -> HashMap<String, SubAgentProgress> {
let subagents = self.subagents.read().await;
let mut progress_map = HashMap::new();
for (id, subagent) in subagents.iter() {
progress_map.insert(id.clone(), subagent.get_progress().await);
}
progress_map
}
/// Send a message to a specific subagent
#[instrument(skip(self, message, provider, extension_manager))]
pub async fn send_message_to_subagent(
&self,
subagent_id: &str,
message: String,
provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
) -> Result<String> {
let subagent = self
.get_subagent(subagent_id)
.await
.ok_or_else(|| anyhow!("Subagent {} not found", subagent_id))?;
// Process the message and get a reply
match subagent
.reply_subagent(message, provider, extension_manager)
.await
{
Ok(response) => Ok(format!(
"Message sent to subagent {}. Response:\n{}",
subagent_id,
response.as_concat_text()
)),
Err(e) => Err(anyhow!("Failed to process message in subagent: {}", e)),
}
}
/// Terminate a specific subagent
#[instrument(skip(self))]
pub async fn terminate_subagent(&self, id: &str) -> Result<()> {
debug!("Terminating subagent {}", id);
// Get and terminate the subagent
let subagent = {
let mut subagents = self.subagents.write().await;
subagents.remove(id)
};
if let Some(subagent) = subagent {
subagent.terminate().await?;
} else {
warn!("Attempted to terminate non-existent subagent {}", id);
return Err(anyhow!("Subagent {} not found", id));
}
// Clean up the background handle
let handle = {
let mut handles = self.handles.lock().await;
handles.remove(id)
};
if let Some(handle) = handle {
handle.abort();
}
debug!("Subagent {} terminated successfully", id);
Ok(())
}
/// Terminate all subagents
#[instrument(skip(self))]
pub async fn terminate_all_subagents(&self) -> Result<()> {
debug!("Terminating all subagents");
let subagent_ids: Vec<String> = {
let subagents = self.subagents.read().await;
subagents.keys().cloned().collect()
};
for id in subagent_ids {
if let Err(e) = self.terminate_subagent(&id).await {
error!("Failed to terminate subagent {}: {}", id, e);
}
}
debug!("All subagents terminated");
Ok(())
}
/// Get formatted conversation from a subagent
pub async fn get_subagent_conversation(&self, id: &str) -> Result<String> {
let subagent = self
.get_subagent(id)
.await
.ok_or_else(|| anyhow!("Subagent {} not found", id))?;
Ok(subagent.get_formatted_conversation().await)
}
/// Clean up completed or failed subagents
pub async fn cleanup_completed_subagents(&self) -> Result<usize> {
let mut completed_ids = Vec::new();
// Find completed subagents
{
let subagents = self.subagents.read().await;
for (id, subagent) in subagents.iter() {
if subagent.is_completed().await {
completed_ids.push(id.clone());
}
}
}
// Remove completed subagents
let count = completed_ids.len();
for id in completed_ids {
if let Err(e) = self.terminate_subagent(&id).await {
error!("Failed to cleanup completed subagent {}: {}", id, e);
}
}
debug!("Cleaned up {} completed subagents", count);
Ok(count)
}
/// Load a recipe from file
async fn load_recipe(&self, recipe_name: &str) -> Result<Recipe> {
// Try to load from current directory first
let recipe_path = if recipe_name.ends_with(".yaml") || recipe_name.ends_with(".yml") {
recipe_name.to_string()
} else {
format!("{}.yaml", recipe_name)
};
if Path::new(&recipe_path).exists() {
let content = tokio::fs::read_to_string(&recipe_path).await?;
let recipe: Recipe = serde_yaml::from_str(&content)?;
return Ok(recipe);
}
// Try some common recipe locations
let common_paths = [
format!("recipes/{}", recipe_path),
format!("./recipes/{}", recipe_path),
format!("../recipes/{}", recipe_path),
];
for path in &common_paths {
if Path::new(path).exists() {
let content = tokio::fs::read_to_string(path).await?;
let recipe: Recipe = serde_yaml::from_str(&content)?;
return Ok(recipe);
}
}
Err(anyhow!(
"Recipe file '{}' not found in current directory or common recipe locations",
recipe_name
))
}
/// Get count of active subagents
pub async fn get_active_count(&self) -> usize {
let subagents = self.subagents.read().await;
subagents.len()
}
/// Check if a subagent exists
pub async fn has_subagent(&self, id: &str) -> bool {
let subagents = self.subagents.read().await;
subagents.contains_key(id)
}
/// Run a complete subagent task (spawn, execute, cleanup)
#[instrument(skip(self, args, provider, extension_manager))]
pub async fn run_complete_subagent_task(
&self,
args: SpawnSubAgentArgs,
provider: Arc<dyn Provider>,
extension_manager: Arc<tokio::sync::RwLockReadGuard<'_, ExtensionManager>>,
) -> Result<String> {
debug!("Running complete subagent task");
// Create subagent config based on whether we have a recipe or instructions
let mut config = if let Some(recipe_name) = args.recipe_name {
debug!("Using recipe: {}", recipe_name);
// Load the recipe
let recipe = self.load_recipe(&recipe_name).await?;
SubAgentConfig::new_with_recipe(recipe)
} else if let Some(instructions) = args.instructions {
debug!("Using direct instructions");
SubAgentConfig::new_with_instructions(instructions)
} else {
return Err(anyhow!(
"Either recipe_name or instructions must be provided"
));
};
// Set default max_turns if not provided
let max_turns = args.max_turns.unwrap_or(10);
config = config.with_max_turns(max_turns);
if let Some(timeout) = args.timeout_seconds {
config = config.with_timeout(timeout);
}
// Create the subagent with the parent agent's provider
let (subagent, handle) = SubAgent::new(
config,
Arc::clone(&provider),
Arc::clone(&extension_manager),
self.mcp_notification_tx.clone(),
)
.await?;
let subagent_id = subagent.id.clone();
// Store the subagent and its handle temporarily
{
let mut subagents = self.subagents.write().await;
subagents.insert(subagent_id.clone(), Arc::clone(&subagent));
}
{
let mut handles = self.handles.lock().await;
handles.insert(subagent_id.clone(), handle);
}
// Run the complete conversation
let mut conversation_result = String::new();
let turn_count = 0;
let current_message = args.message.clone();
// For now, we just complete after one turn since we don't have a mechanism
// for the subagent to continue autonomously without user input
// In a future iteration, we could add logic for the subagent to continue
// working on multi-step tasks with proper turn management
match subagent
.reply_subagent(
current_message,
Arc::clone(&provider),
Arc::clone(&extension_manager),
)
.await
{
Ok(response) => {
let response_text = response.as_concat_text();
conversation_result.push_str(&format!(
"\n--- Turn {} ---\n{}",
turn_count + 1,
response_text
));
conversation_result.push_str(&format!(
"\n[Task completed after {} turns]",
turn_count + 1
));
}
Err(e) => {
conversation_result
.push_str(&format!("\n[Error after {} turns: {}]", turn_count, e));
}
}
// Clean up the subagent
if let Err(e) = self.terminate_subagent(&subagent_id).await {
debug!("Failed to cleanup subagent {}: {}", subagent_id, e);
}
// Return the complete conversation result
Ok(format!("Subagent task completed:\n{}", conversation_result))
}
}
impl Default for SubAgentManager {
fn default() -> Self {
// Create a dummy channel for default implementation
// In practice, this should not be used - SubAgentManager should be created
// with a proper MCP notification sender
let (tx, _rx) = mpsc::channel(1);
Self::new(tx)
}
}
impl Drop for SubAgentManager {
fn drop(&mut self) {
// Note: In a real implementation, you might want to spawn a task to clean up
// subagents gracefully, but for now we'll rely on the Drop implementations
// of the individual components
debug!("SubAgentManager dropped");
}
}

View File

@@ -0,0 +1,68 @@
use indoc::indoc;
use mcp_core::tool::{Tool, ToolAnnotations};
use serde_json::json;
pub const SUBAGENT_RUN_TASK_TOOL_NAME: &str = "subagent__run_task";
pub fn run_task_subagent_tool() -> Tool {
Tool::new(
SUBAGENT_RUN_TASK_TOOL_NAME.to_string(),
indoc! {r#"
Spawn a specialized subagent to handle a specific task completely and automatically.
This tool creates a subagent, processes your task through a complete conversation,
and returns the final result. The subagent is automatically cleaned up after completion.
You can configure the subagent in two ways:
1. Using a recipe file that defines instructions, extensions, and behavior
2. Providing direct instructions for ad-hoc tasks
The subagent will work autonomously until the task is complete, it reaches max_turns,
or it encounters an error. You'll get the final result without needing to manage
the subagent lifecycle manually.
Examples:
- "Convert these unittest files to pytest format: file1.py, file2.py"
- "Research the latest developments in AI and provide a comprehensive summary"
- "Review this code for security vulnerabilities and suggest fixes"
- "Refactor this legacy code to use modern Python patterns"
"#}
.to_string(),
json!({
"type": "object",
"required": ["task"],
"properties": {
"recipe_name": {
"type": "string",
"description": "Name of the recipe file to configure the subagent (e.g., 'research_assistant_recipe.yaml'). Either this or 'instructions' must be provided."
},
"instructions": {
"type": "string",
"description": "Direct instructions for the subagent's task. Either this or 'recipe_name' must be provided. Example: 'You are a code refactoring assistant. Help convert unittest tests to pytest format.'"
},
"task": {
"type": "string",
"description": "The task description or initial message for the subagent to work on"
},
"max_turns": {
"type": "integer",
"description": "Maximum number of conversation turns before auto-completion (default: 10)",
"minimum": 1,
"default": 10
},
"timeout_seconds": {
"type": "integer",
"description": "Optional timeout for the entire task in seconds",
"minimum": 1
}
}
}),
Some(ToolAnnotations {
title: Some("Run subagent task".to_string()),
read_only_hint: false,
destructive_hint: false,
idempotent_hint: false,
open_world_hint: false,
}),
)
}

View File

@@ -0,0 +1,42 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnSubAgentArgs {
pub recipe_name: Option<String>,
pub instructions: Option<String>,
pub message: String,
pub max_turns: Option<usize>,
pub timeout_seconds: Option<u64>,
}
impl SpawnSubAgentArgs {
pub fn new_with_recipe(recipe_name: String, message: String) -> Self {
Self {
recipe_name: Some(recipe_name),
instructions: None,
message,
max_turns: None,
timeout_seconds: None,
}
}
pub fn new_with_instructions(instructions: String, message: String) -> Self {
Self {
recipe_name: None,
instructions: Some(instructions),
message,
max_turns: None,
timeout_seconds: None,
}
}
pub fn with_max_turns(mut self, max_turns: usize) -> Self {
self.max_turns = Some(max_turns);
self
}
pub fn with_timeout(mut self, timeout_seconds: u64) -> Self {
self.timeout_seconds = Some(timeout_seconds);
self
}
}

View File

@@ -0,0 +1,76 @@
You are a specialized subagent within the Goose AI framework, created by Block, the parent company of Square, CashApp, and Tidal. Goose is being developed as an open-source software project. You were spawned by the main Goose agent to handle a specific task or set of operations.
The current date is {{current_date_time}}.
You use LLM providers with tool calling capability. You can be used with different language models (gpt-4o, claude-3.5-sonnet, o1, llama-3.2, deepseek-r1, etc). These models have varying knowledge cut-off dates depending on when they were trained, but typically it's between 5-10 months prior to the current date.
# Your Role as a Subagent
You are an autonomous subagent with the following characteristics:
- **Independence**: You can make decisions and execute tools within your scope
- **Specialization**: You focus on specific tasks assigned by the main Goose agent
- **Collaboration**: You report progress and results back to the main Goose agent
- **Bounded Operation**: You operate within defined limits (turn count, timeout, specific instructions)
- **Security**: You cannot spawn additional subagents to prevent infinite recursion and maintain system stability
{% if subagent_id is defined %}
**Subagent ID**: {{subagent_id}}
{% endif %}
{% if recipe_title is defined %}
**Recipe**: {{recipe_title}}
{% endif %}
{% if max_turns is defined %}
**Maximum Turns**: {{max_turns}}
{% endif %}
# Task Instructions
{{task_instructions}}
# Extensions and Tools
Extensions allow other applications to provide context to you. Extensions connect you to different data sources and tools. You are capable of using tools from these extensions to solve higher level problems and can interact with multiple at once.
{% if recipe_title is defined %}
**Recipe Mode**: You are operating with a specific recipe that defines which extensions and tools you can use. This focused approach helps you stay on task and work efficiently within your defined scope.
{% if (extensions is defined) and extensions %}
You have access to the following recipe-specific extensions ({{extensions|length}} extension{% if extensions|length > 1 %}s{% endif %}). Each of these extensions provides tools that are in your tool specification:
{% for extension in extensions %}
- {{extension}}
{% endfor %}
You have {{tool_count}} tool{% if tool_count > 1 %}s{% endif %} available: {{available_tools}}
{% else %}
Your recipe doesn't specify any extensions, so you have access to the basic tool set.
You have {{tool_count}} tool{% if tool_count > 1 %}s{% endif %} available: {{available_tools}}
{% endif %}
{% else %}
**Inheritance Mode**: You inherit all available extensions and tools from the parent Goose agent. You can use all the tools that were available to the parent agent when you were created.
You have {{tool_count}} tool{% if tool_count > 1 %}s{% endif %} available: {{available_tools}}
{% endif %}
# Communication Guidelines
- **Progress Updates**: Regularly communicate your progress on the assigned task
- **Completion Reporting**: Clearly indicate when your task is complete and provide results
- **Error Handling**: Report any issues or limitations you encounter
- **Scope Awareness**: Stay focused on your assigned task and don't exceed your defined boundaries
# Response Guidelines
- Use Markdown formatting for all responses.
- Follow best practices for Markdown, including:
- Using headers for organization.
- Bullet points for lists.
- Links formatted correctly, either as linked text (e.g., [this is linked text](https://example.com)) or automatic links using angle brackets (e.g., <http://example.com/>).
- For code examples, use fenced code blocks by placing triple backticks (` ``` `) before and after the code. Include the language identifier after the opening backticks (e.g., ` ```python `) to enable syntax highlighting.
- Ensure clarity, conciseness, and proper formatting to enhance readability and usability.
- Be task-focused in your communications and provide clear status updates about your progress.
- When completing tasks, summarize what was accomplished.
- If you encounter limitations or need clarification, communicate this clearly.
Remember: You are part of a larger Goose system working collaboratively to solve complex problems. Your specialized focus helps the main agent handle multiple concerns efficiently.

View File

@@ -58,7 +58,7 @@ fn default_version() -> String {
/// parameters: None,
/// };
///
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Recipe {
// Required fields
#[serde(default = "default_version")]
@@ -98,7 +98,7 @@ pub struct Recipe {
pub sub_recipes: Option<Vec<SubRecipe>>, // sub-recipes for the recipe
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Author {
#[serde(skip_serializing_if = "Option::is_none")]
pub contact: Option<String>, // creator/contact information of the recipe
@@ -107,7 +107,7 @@ pub struct Author {
pub metadata: Option<String>, // any additional metadata for the author
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Settings {
#[serde(skip_serializing_if = "Option::is_none")]
pub goose_provider: Option<String>,
@@ -152,7 +152,7 @@ where
}
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum RecipeParameterRequirement {
Required,
@@ -170,7 +170,7 @@ impl fmt::Display for RecipeParameterRequirement {
}
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum RecipeParameterInputType {
String,
@@ -190,7 +190,7 @@ impl fmt::Display for RecipeParameterInputType {
}
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RecipeParameter {
pub key: String,
pub input_type: RecipeParameterInputType,

View File

@@ -1195,6 +1195,7 @@ async fn run_scheduled_job_internal(
Ok(AgentEvent::ModelChange { .. }) => {
// Model change events are informational, just continue
}
Err(e) => {
tracing::error!(
"[Job {}] Error receiving message from agent: {}",

View File

@@ -142,6 +142,7 @@ async fn run_truncate_test(
Ok(AgentEvent::ModelChange { .. }) => {
// Model change events are informational, just continue
}
Err(e) => {
println!("Error: {:?}", e);
return Err(e);