Platform Tool for Scheduler: Allow Goose to Manage Its Own Schedule (#2944)

This commit is contained in:
tlongwell-block
2025-06-19 10:04:31 -04:00
committed by GitHub
parent 46a8b9218a
commit f0b627c96a
10 changed files with 2023 additions and 5 deletions

View File

@@ -29,7 +29,10 @@ pub async fn run() -> Result<()> {
.join("schedules.json");
let scheduler_instance = SchedulerFactory::create(schedule_file_path).await?;
app_state.set_scheduler(scheduler_instance).await;
app_state.set_scheduler(scheduler_instance.clone()).await;
// NEW: Provide scheduler access to the agent
agent_ref.set_scheduler(scheduler_instance).await;
let cors = CorsLayer::new()
.allow_origin(Any)

View File

@@ -93,6 +93,8 @@ fn parse_session_name_to_iso(session_name: &str) -> String {
request_body = CreateScheduleRequest,
responses(
(status = 200, description = "Scheduled job created successfully", body = ScheduledJob),
(status = 400, description = "Invalid cron expression or recipe file"),
(status = 409, description = "Job ID already exists"),
(status = 500, description = "Internal server error")
),
tag = "schedule"
@@ -128,7 +130,13 @@ async fn create_schedule(
.await
.map_err(|e| {
eprintln!("Error creating schedule: {:?}", e); // Log error
StatusCode::INTERNAL_SERVER_ERROR
match e {
goose::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
goose::scheduler::SchedulerError::CronParseError(_) => StatusCode::BAD_REQUEST,
goose::scheduler::SchedulerError::RecipeLoadError(_) => StatusCode::BAD_REQUEST,
goose::scheduler::SchedulerError::JobIdExists(_) => StatusCode::CONFLICT,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
})?;
Ok(Json(job))
}

View File

@@ -17,6 +17,7 @@ use crate::permission::PermissionConfirmation;
use crate::providers::base::Provider;
use crate::providers::errors::ProviderError;
use crate::recipe::{Author, Recipe, Settings};
use crate::scheduler_trait::SchedulerTrait;
use crate::tool_monitor::{ToolCall, ToolMonitor};
use regex::Regex;
use serde_json::Value;
@@ -27,7 +28,8 @@ use crate::agents::extension::{ExtensionConfig, ExtensionError, ExtensionResult,
use crate::agents::extension_manager::{get_parameter_names, ExtensionManager};
use crate::agents::platform_tools::{
PLATFORM_LIST_RESOURCES_TOOL_NAME, PLATFORM_MANAGE_EXTENSIONS_TOOL_NAME,
PLATFORM_READ_RESOURCE_TOOL_NAME, PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME,
PLATFORM_MANAGE_SCHEDULE_TOOL_NAME, PLATFORM_READ_RESOURCE_TOOL_NAME,
PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME,
};
use crate::agents::prompt_manager::PromptManager;
use crate::agents::router_tool_selector::{
@@ -59,6 +61,7 @@ pub struct Agent {
pub(super) tool_result_rx: ToolResultReceiver,
pub(super) tool_monitor: Mutex<Option<ToolMonitor>>,
pub(super) router_tool_selector: Mutex<Option<Arc<Box<dyn RouterToolSelector>>>>,
pub(super) scheduler_service: Mutex<Option<Arc<dyn SchedulerTrait>>>,
}
#[derive(Clone, Debug)]
@@ -86,6 +89,7 @@ impl Agent {
tool_result_rx: Arc::new(Mutex::new(tool_rx)),
tool_monitor: Mutex::new(None),
router_tool_selector: Mutex::new(None),
scheduler_service: Mutex::new(None),
}
}
@@ -104,6 +108,12 @@ impl Agent {
monitor.reset();
}
}
/// Set the scheduler service for this agent
pub async fn set_scheduler(&self, scheduler: Arc<dyn SchedulerTrait>) {
let mut scheduler_service = self.scheduler_service.lock().await;
*scheduler_service = Some(scheduler);
}
}
impl Default for Agent {
@@ -185,7 +195,7 @@ impl Agent {
/// Dispatch a single tool call to the appropriate client
#[instrument(skip(self, tool_call, request_id), fields(input, output))]
pub(super) async fn dispatch_tool_call(
pub async fn dispatch_tool_call(
&self,
tool_call: mcp_core::tool::ToolCall,
request_id: String,
@@ -204,6 +214,13 @@ impl Agent {
}
}
if tool_call.name == PLATFORM_MANAGE_SCHEDULE_TOOL_NAME {
let result = self
.handle_schedule_management(tool_call.arguments, request_id.clone())
.await;
return (request_id, Ok(ToolCallResult::from(result)));
}
if tool_call.name == PLATFORM_MANAGE_EXTENSIONS_TOOL_NAME {
let extension_name = tool_call
.arguments
@@ -414,7 +431,7 @@ impl Agent {
let mut extension_manager = self.extension_manager.lock().await;
extension_manager.add_extension(extension.clone()).await?;
}
};
}
// If vector tool selection is enabled, index the tools
let selector = self.router_tool_selector.lock().await.clone();
@@ -453,6 +470,7 @@ impl Agent {
// Add platform tools
prefixed_tools.push(platform_tools::search_available_extensions_tool());
prefixed_tools.push(platform_tools::manage_extensions_tool());
prefixed_tools.push(platform_tools::manage_schedule_tool());
// Add resource tools if supported
if extension_manager.supports_resources() {

View File

@@ -8,6 +8,8 @@ pub mod prompt_manager;
mod reply_parts;
mod router_tool_selector;
mod router_tools;
mod schedule_tool;
mod tool_execution;
mod tool_router_index_manager;
pub(crate) mod tool_vectordb;

View File

@@ -7,6 +7,7 @@ pub const PLATFORM_LIST_RESOURCES_TOOL_NAME: &str = "platform__list_resources";
pub const PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME: &str =
"platform__search_available_extensions";
pub const PLATFORM_MANAGE_EXTENSIONS_TOOL_NAME: &str = "platform__manage_extensions";
pub const PLATFORM_MANAGE_SCHEDULE_TOOL_NAME: &str = "platform__manage_schedule";
pub fn read_resource_tool() -> Tool {
Tool::new(
@@ -112,3 +113,47 @@ pub fn manage_extensions_tool() -> Tool {
}),
)
}
pub fn manage_schedule_tool() -> Tool {
Tool::new(
PLATFORM_MANAGE_SCHEDULE_TOOL_NAME.to_string(),
indoc! {r#"
Manage scheduled recipe execution for this Goose instance.
Actions:
- "list": List all scheduled jobs
- "create": Create a new scheduled job from a recipe file
- "run_now": Execute a scheduled job immediately
- "pause": Pause a scheduled job
- "unpause": Resume a paused job
- "delete": Remove a scheduled job
- "kill": Terminate a currently running job
- "inspect": Get details about a running job
- "sessions": List execution history for a job
- "session_content": Get the full content (messages) of a specific session
"#}
.to_string(),
json!({
"type": "object",
"required": ["action"],
"properties": {
"action": {
"type": "string",
"enum": ["list", "create", "run_now", "pause", "unpause", "delete", "kill", "inspect", "sessions", "session_content"]
},
"job_id": {"type": "string", "description": "Job identifier for operations on existing jobs"},
"recipe_path": {"type": "string", "description": "Path to recipe file for create action"},
"cron_expression": {"type": "string", "description": "A six field cron expression for create action"},
"limit": {"type": "integer", "description": "Limit for sessions list", "default": 50},
"session_id": {"type": "string", "description": "Session identifier for session_content action"}
}
}),
Some(ToolAnnotations {
title: Some("Manage scheduled recipes".to_string()),
read_only_hint: false,
destructive_hint: true, // Can kill jobs
idempotent_hint: false,
open_world_hint: false,
}),
)
}

View File

@@ -0,0 +1,420 @@
//! Schedule tool handlers for the Goose agent
//!
//! This module contains all the handlers for the schedule management platform tool,
//! including job creation, execution, monitoring, and session management.
use std::sync::Arc;
use chrono::Utc;
use mcp_core::{Content, ToolError, ToolResult};
use crate::recipe::Recipe;
use crate::scheduler_trait::SchedulerTrait;
use super::Agent;
impl Agent {
/// Handle schedule management tool calls
pub async fn handle_schedule_management(
&self,
arguments: serde_json::Value,
_request_id: String,
) -> ToolResult<Vec<Content>> {
let scheduler = match self.scheduler_service.lock().await.as_ref() {
Some(s) => s.clone(),
None => {
return Err(ToolError::ExecutionError(
"Scheduler not available. This tool only works in server mode.".to_string(),
))
}
};
let action = arguments
.get("action")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing 'action' parameter".to_string()))?;
match action {
"list" => self.handle_list_jobs(scheduler).await,
"create" => self.handle_create_job(scheduler, arguments).await,
"run_now" => self.handle_run_now(scheduler, arguments).await,
"pause" => self.handle_pause_job(scheduler, arguments).await,
"unpause" => self.handle_unpause_job(scheduler, arguments).await,
"delete" => self.handle_delete_job(scheduler, arguments).await,
"kill" => self.handle_kill_job(scheduler, arguments).await,
"inspect" => self.handle_inspect_job(scheduler, arguments).await,
"sessions" => self.handle_list_sessions(scheduler, arguments).await,
"session_content" => self.handle_session_content(arguments).await,
_ => Err(ToolError::ExecutionError(format!(
"Unknown action: {}",
action
))),
}
}
/// List all scheduled jobs
async fn handle_list_jobs(
&self,
scheduler: Arc<dyn SchedulerTrait>,
) -> ToolResult<Vec<Content>> {
match scheduler.list_scheduled_jobs().await {
Ok(jobs) => {
let jobs_json = serde_json::to_string_pretty(&jobs).map_err(|e| {
ToolError::ExecutionError(format!("Failed to serialize jobs: {}", e))
})?;
Ok(vec![Content::text(format!(
"Scheduled Jobs:\n{}",
jobs_json
))])
}
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to list jobs: {}",
e
))),
}
}
/// Create a new scheduled job from a recipe file
async fn handle_create_job(
&self,
scheduler: Arc<dyn SchedulerTrait>,
arguments: serde_json::Value,
) -> ToolResult<Vec<Content>> {
let recipe_path = arguments
.get("recipe_path")
.and_then(|v| v.as_str())
.ok_or_else(|| {
ToolError::ExecutionError("Missing 'recipe_path' parameter".to_string())
})?;
let cron_expression = arguments
.get("cron_expression")
.and_then(|v| v.as_str())
.ok_or_else(|| {
ToolError::ExecutionError("Missing 'cron_expression' parameter".to_string())
})?;
// Validate recipe file exists and is readable
if !std::path::Path::new(recipe_path).exists() {
return Err(ToolError::ExecutionError(format!(
"Recipe file not found: {}",
recipe_path
)));
}
// Validate it's a valid recipe by trying to parse it
match std::fs::read_to_string(recipe_path) {
Ok(content) => {
if recipe_path.ends_with(".json") {
serde_json::from_str::<Recipe>(&content).map_err(|e| {
ToolError::ExecutionError(format!("Invalid JSON recipe: {}", e))
})?;
} else {
serde_yaml::from_str::<Recipe>(&content).map_err(|e| {
ToolError::ExecutionError(format!("Invalid YAML recipe: {}", e))
})?;
}
}
Err(e) => {
return Err(ToolError::ExecutionError(format!(
"Cannot read recipe file: {}",
e
)))
}
}
// Generate unique job ID
let job_id = format!("agent_created_{}", Utc::now().timestamp());
let job = crate::scheduler::ScheduledJob {
id: job_id.clone(),
source: recipe_path.to_string(),
cron: cron_expression.to_string(),
last_run: None,
currently_running: false,
paused: false,
current_session_id: None,
process_start_time: None,
};
match scheduler.add_scheduled_job(job).await {
Ok(()) => Ok(vec![Content::text(format!(
"Successfully created scheduled job '{}' for recipe '{}' with cron expression '{}'",
job_id, recipe_path, cron_expression
))]),
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to create job: {}",
e
))),
}
}
/// Run a scheduled job immediately
async fn handle_run_now(
&self,
scheduler: Arc<dyn SchedulerTrait>,
arguments: serde_json::Value,
) -> ToolResult<Vec<Content>> {
let job_id = arguments
.get("job_id")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing 'job_id' parameter".to_string()))?;
match scheduler.run_now(job_id).await {
Ok(session_id) => Ok(vec![Content::text(format!(
"Successfully started job '{}'. Session ID: {}",
job_id, session_id
))]),
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to run job: {}",
e
))),
}
}
/// Pause a scheduled job
async fn handle_pause_job(
&self,
scheduler: Arc<dyn SchedulerTrait>,
arguments: serde_json::Value,
) -> ToolResult<Vec<Content>> {
let job_id = arguments
.get("job_id")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing 'job_id' parameter".to_string()))?;
match scheduler.pause_schedule(job_id).await {
Ok(()) => Ok(vec![Content::text(format!(
"Successfully paused job '{}'",
job_id
))]),
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to pause job: {}",
e
))),
}
}
/// Resume a paused scheduled job
async fn handle_unpause_job(
&self,
scheduler: Arc<dyn SchedulerTrait>,
arguments: serde_json::Value,
) -> ToolResult<Vec<Content>> {
let job_id = arguments
.get("job_id")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing 'job_id' parameter".to_string()))?;
match scheduler.unpause_schedule(job_id).await {
Ok(()) => Ok(vec![Content::text(format!(
"Successfully unpaused job '{}'",
job_id
))]),
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to unpause job: {}",
e
))),
}
}
/// Delete a scheduled job
async fn handle_delete_job(
&self,
scheduler: Arc<dyn SchedulerTrait>,
arguments: serde_json::Value,
) -> ToolResult<Vec<Content>> {
let job_id = arguments
.get("job_id")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing 'job_id' parameter".to_string()))?;
match scheduler.remove_scheduled_job(job_id).await {
Ok(()) => Ok(vec![Content::text(format!(
"Successfully deleted job '{}'",
job_id
))]),
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to delete job: {}",
e
))),
}
}
/// Terminate a currently running job
async fn handle_kill_job(
&self,
scheduler: Arc<dyn SchedulerTrait>,
arguments: serde_json::Value,
) -> ToolResult<Vec<Content>> {
let job_id = arguments
.get("job_id")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing 'job_id' parameter".to_string()))?;
match scheduler.kill_running_job(job_id).await {
Ok(()) => Ok(vec![Content::text(format!(
"Successfully killed running job '{}'",
job_id
))]),
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to kill job: {}",
e
))),
}
}
/// Get information about a running job
async fn handle_inspect_job(
&self,
scheduler: Arc<dyn SchedulerTrait>,
arguments: serde_json::Value,
) -> ToolResult<Vec<Content>> {
let job_id = arguments
.get("job_id")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing 'job_id' parameter".to_string()))?;
match scheduler.get_running_job_info(job_id).await {
Ok(Some((session_id, start_time))) => {
let duration = Utc::now().signed_duration_since(start_time);
Ok(vec![Content::text(format!(
"Job '{}' is currently running:\n- Session ID: {}\n- Started: {}\n- Duration: {} seconds",
job_id, session_id, start_time.to_rfc3339(), duration.num_seconds()
))])
}
Ok(None) => Ok(vec![Content::text(format!(
"Job '{}' is not currently running",
job_id
))]),
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to inspect job: {}",
e
))),
}
}
/// List execution sessions for a job
async fn handle_list_sessions(
&self,
scheduler: Arc<dyn SchedulerTrait>,
arguments: serde_json::Value,
) -> ToolResult<Vec<Content>> {
let job_id = arguments
.get("job_id")
.and_then(|v| v.as_str())
.ok_or_else(|| ToolError::ExecutionError("Missing 'job_id' parameter".to_string()))?;
let limit = arguments
.get("limit")
.and_then(|v| v.as_u64())
.unwrap_or(50) as usize;
match scheduler.sessions(job_id, limit).await {
Ok(sessions) => {
if sessions.is_empty() {
Ok(vec![Content::text(format!(
"No sessions found for job '{}'",
job_id
))])
} else {
let sessions_info: Vec<String> = sessions
.into_iter()
.map(|(session_name, metadata)| {
format!(
"- Session: {} (Messages: {}, Working Dir: {})",
session_name,
metadata.message_count,
metadata.working_dir.display()
)
})
.collect();
Ok(vec![Content::text(format!(
"Sessions for job '{}':\n{}",
job_id,
sessions_info.join("\n")
))])
}
}
Err(e) => Err(ToolError::ExecutionError(format!(
"Failed to list sessions: {}",
e
))),
}
}
/// Get the full content (metadata and messages) of a specific session
async fn handle_session_content(
&self,
arguments: serde_json::Value,
) -> ToolResult<Vec<Content>> {
let session_id = arguments
.get("session_id")
.and_then(|v| v.as_str())
.ok_or_else(|| {
ToolError::ExecutionError("Missing 'session_id' parameter".to_string())
})?;
// Get the session file path
let session_path = crate::session::storage::get_path(
crate::session::storage::Identifier::Name(session_id.to_string()),
);
// Check if session file exists
if !session_path.exists() {
return Err(ToolError::ExecutionError(format!(
"Session '{}' not found",
session_id
)));
}
// Read session metadata
let metadata = match crate::session::storage::read_metadata(&session_path) {
Ok(metadata) => metadata,
Err(e) => {
return Err(ToolError::ExecutionError(format!(
"Failed to read session metadata: {}",
e
)));
}
};
// Read session messages
let messages = match crate::session::storage::read_messages(&session_path) {
Ok(messages) => messages,
Err(e) => {
return Err(ToolError::ExecutionError(format!(
"Failed to read session messages: {}",
e
)));
}
};
// Format the response with metadata and messages
let metadata_json = match serde_json::to_string_pretty(&metadata) {
Ok(json) => json,
Err(e) => {
return Err(ToolError::ExecutionError(format!(
"Failed to serialize metadata: {}",
e
)));
}
};
let messages_json = match serde_json::to_string_pretty(&messages) {
Ok(json) => json,
Err(e) => {
return Err(ToolError::ExecutionError(format!(
"Failed to serialize messages: {}",
e
)));
}
};
Ok(vec![Content::text(format!(
"Session '{}' Content:\n\nMetadata:\n{}\n\nMessages:\n{}",
session_id, metadata_json, messages_json
))])
}
}

View File

@@ -330,3 +330,190 @@ mod tests {
.await
}
}
#[cfg(test)]
mod schedule_tool_tests {
use super::*;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use goose::agents::platform_tools::PLATFORM_MANAGE_SCHEDULE_TOOL_NAME;
use goose::scheduler::{ScheduledJob, SchedulerError};
use goose::scheduler_trait::SchedulerTrait;
use goose::session::storage::SessionMetadata;
use std::sync::Arc;
// Mock scheduler for testing
struct MockScheduler {
jobs: tokio::sync::Mutex<Vec<ScheduledJob>>,
}
impl MockScheduler {
fn new() -> Self {
Self {
jobs: tokio::sync::Mutex::new(Vec::new()),
}
}
}
#[async_trait]
impl SchedulerTrait for MockScheduler {
async fn add_scheduled_job(&self, job: ScheduledJob) -> Result<(), SchedulerError> {
let mut jobs = self.jobs.lock().await;
jobs.push(job);
Ok(())
}
async fn list_scheduled_jobs(&self) -> Result<Vec<ScheduledJob>, SchedulerError> {
let jobs = self.jobs.lock().await;
Ok(jobs.clone())
}
async fn remove_scheduled_job(&self, id: &str) -> Result<(), SchedulerError> {
let mut jobs = self.jobs.lock().await;
if let Some(pos) = jobs.iter().position(|job| job.id == id) {
jobs.remove(pos);
Ok(())
} else {
Err(SchedulerError::JobNotFound(id.to_string()))
}
}
async fn pause_schedule(&self, _id: &str) -> Result<(), SchedulerError> {
Ok(())
}
async fn unpause_schedule(&self, _id: &str) -> Result<(), SchedulerError> {
Ok(())
}
async fn run_now(&self, _id: &str) -> Result<String, SchedulerError> {
Ok("test_session_123".to_string())
}
async fn sessions(
&self,
_sched_id: &str,
_limit: usize,
) -> Result<Vec<(String, SessionMetadata)>, SchedulerError> {
Ok(vec![])
}
async fn update_schedule(
&self,
_sched_id: &str,
_new_cron: String,
) -> Result<(), SchedulerError> {
Ok(())
}
async fn kill_running_job(&self, _sched_id: &str) -> Result<(), SchedulerError> {
Ok(())
}
async fn get_running_job_info(
&self,
_sched_id: &str,
) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError> {
Ok(None)
}
}
#[tokio::test]
async fn test_schedule_management_tool_list() {
let agent = Agent::new();
let mock_scheduler = Arc::new(MockScheduler::new());
agent.set_scheduler(mock_scheduler.clone()).await;
// Test that the schedule management tool is available in the tools list
let tools = agent.list_tools(None).await;
let schedule_tool = tools
.iter()
.find(|tool| tool.name == PLATFORM_MANAGE_SCHEDULE_TOOL_NAME);
assert!(schedule_tool.is_some());
let tool = schedule_tool.unwrap();
assert!(tool
.description
.contains("Manage scheduled recipe execution"));
}
#[tokio::test]
async fn test_schedule_management_tool_no_scheduler() {
let agent = Agent::new();
// Don't set scheduler - test that the tool still appears in the list
// but would fail if actually called (which we can't test directly through public API)
let tools = agent.list_tools(None).await;
let schedule_tool = tools
.iter()
.find(|tool| tool.name == PLATFORM_MANAGE_SCHEDULE_TOOL_NAME);
assert!(schedule_tool.is_some());
}
#[tokio::test]
async fn test_schedule_management_tool_in_platform_tools() {
let agent = Agent::new();
let tools = agent.list_tools(Some("platform".to_string())).await;
// Check that the schedule management tool is included in platform tools
let schedule_tool = tools
.iter()
.find(|tool| tool.name == PLATFORM_MANAGE_SCHEDULE_TOOL_NAME);
assert!(schedule_tool.is_some());
let tool = schedule_tool.unwrap();
assert!(tool
.description
.contains("Manage scheduled recipe execution"));
// Verify the tool has the expected actions in its schema
if let Some(properties) = tool.input_schema.get("properties") {
if let Some(action_prop) = properties.get("action") {
if let Some(enum_values) = action_prop.get("enum") {
let actions: Vec<String> = enum_values
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect();
// Check that our session_content action is included
assert!(actions.contains(&"session_content".to_string()));
assert!(actions.contains(&"list".to_string()));
assert!(actions.contains(&"create".to_string()));
assert!(actions.contains(&"sessions".to_string()));
}
}
}
}
#[tokio::test]
async fn test_schedule_management_tool_schema_validation() {
let agent = Agent::new();
let tools = agent.list_tools(None).await;
let schedule_tool = tools
.iter()
.find(|tool| tool.name == PLATFORM_MANAGE_SCHEDULE_TOOL_NAME);
assert!(schedule_tool.is_some());
let tool = schedule_tool.unwrap();
// Verify the tool schema has the session_id parameter for session_content action
if let Some(properties) = tool.input_schema.get("properties") {
assert!(properties.get("session_id").is_some());
if let Some(session_id_prop) = properties.get("session_id") {
assert_eq!(
session_id_prop.get("type").unwrap().as_str().unwrap(),
"string"
);
assert!(session_id_prop
.get("description")
.unwrap()
.as_str()
.unwrap()
.contains("Session identifier for session_content action"));
}
}
}
}

View File

@@ -0,0 +1,901 @@
#![cfg(test)]
use mcp_core::{Content, ToolError};
use serde_json::json;
use goose::agents::platform_tools::PLATFORM_MANAGE_SCHEDULE_TOOL_NAME;
mod test_support;
use test_support::{
create_temp_recipe, create_test_session_metadata, MockBehavior, ScheduleToolTestBuilder,
};
// Test all actions of the scheduler platform tool
#[tokio::test]
async fn test_schedule_tool_list_action() {
// Create a test builder with existing jobs
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.with_existing_job("job2", "0 0 * * * *")
.await
.build()
.await;
// Test list action
let arguments = json!({
"action": "list"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content.text.contains("Scheduled Jobs:"));
assert!(text_content.text.contains("job1"));
assert!(text_content.text.contains("job2"));
} else {
panic!("Expected text content");
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"list_scheduled_jobs".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_list_action_empty() {
// Create a test builder with no jobs
let (agent, scheduler) = ScheduleToolTestBuilder::new().build().await;
// Test list action
let arguments = json!({
"action": "list"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content.text.contains("Scheduled Jobs:"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"list_scheduled_jobs".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_list_action_error() {
// Create a test builder with a list error
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_scheduler_behavior(
"list_scheduled_jobs",
MockBehavior::InternalError("Database error".to_string()),
)
.await
.build()
.await;
// Test list action
let arguments = json!({
"action": "list"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Failed to list jobs"));
assert!(msg.contains("Database error"));
} else {
panic!("Expected ExecutionError");
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"list_scheduled_jobs".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_create_action() {
let (agent, scheduler) = ScheduleToolTestBuilder::new().build().await;
// Create a temporary recipe file
let temp_recipe = create_temp_recipe(true, "json");
// Test create action
let arguments = json!({
"action": "create",
"recipe_path": temp_recipe.path.to_str().unwrap(),
"cron_expression": "*/5 * * * * *"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content
.text
.contains("Successfully created scheduled job"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"add_scheduled_job".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_create_action_missing_params() {
let (agent, _) = ScheduleToolTestBuilder::new().build().await;
// Test create action with missing recipe_path
let arguments = json!({
"action": "create",
"cron_expression": "*/5 * * * * *"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Missing 'recipe_path' parameter"));
} else {
panic!("Expected ExecutionError");
}
// Test create action with missing cron_expression
let temp_recipe = create_temp_recipe(true, "json");
let arguments = json!({
"action": "create",
"recipe_path": temp_recipe.path.to_str().unwrap()
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Missing 'cron_expression' parameter"));
} else {
panic!("Expected ExecutionError");
}
}
#[tokio::test]
async fn test_schedule_tool_create_action_nonexistent_recipe() {
let (agent, _) = ScheduleToolTestBuilder::new().build().await;
// Test create action with nonexistent recipe
let arguments = json!({
"action": "create",
"recipe_path": "/nonexistent/recipe.json",
"cron_expression": "*/5 * * * * *"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Recipe file not found"));
} else {
panic!("Expected ExecutionError");
}
}
#[tokio::test]
async fn test_schedule_tool_create_action_invalid_recipe() {
let (agent, _) = ScheduleToolTestBuilder::new().build().await;
// Create an invalid recipe file
let temp_recipe = create_temp_recipe(false, "json");
// Test create action with invalid recipe
let arguments = json!({
"action": "create",
"recipe_path": temp_recipe.path.to_str().unwrap(),
"cron_expression": "*/5 * * * * *"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Invalid JSON recipe"));
} else {
panic!("Expected ExecutionError");
}
}
#[tokio::test]
async fn test_schedule_tool_create_action_scheduler_error() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_scheduler_behavior(
"add_scheduled_job",
MockBehavior::AlreadyExists("job1".to_string()),
)
.await
.build()
.await;
// Create a temporary recipe file
let temp_recipe = create_temp_recipe(true, "json");
// Test create action
let arguments = json!({
"action": "create",
"recipe_path": temp_recipe.path.to_str().unwrap(),
"cron_expression": "*/5 * * * * *"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Failed to create job"));
assert!(msg.contains("job1"));
} else {
panic!("Expected ExecutionError");
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"add_scheduled_job".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_run_now_action() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.build()
.await;
// Test run_now action
let arguments = json!({
"action": "run_now",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content
.text
.contains("Successfully started job 'job1'"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"run_now".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_run_now_action_missing_job_id() {
let (agent, _) = ScheduleToolTestBuilder::new().build().await;
// Test run_now action with missing job_id
let arguments = json!({
"action": "run_now"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Missing 'job_id' parameter"));
} else {
panic!("Expected ExecutionError");
}
}
#[tokio::test]
async fn test_schedule_tool_run_now_action_nonexistent_job() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_scheduler_behavior("run_now", MockBehavior::NotFound("nonexistent".to_string()))
.await
.build()
.await;
// Test run_now action with nonexistent job
let arguments = json!({
"action": "run_now",
"job_id": "nonexistent"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Failed to run job"));
assert!(msg.contains("nonexistent"));
} else {
panic!("Expected ExecutionError");
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"run_now".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_pause_action() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.build()
.await;
// Test pause action
let arguments = json!({
"action": "pause",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content.text.contains("Successfully paused job 'job1'"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"pause_schedule".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_pause_action_missing_job_id() {
let (agent, _) = ScheduleToolTestBuilder::new().build().await;
// Test pause action with missing job_id
let arguments = json!({
"action": "pause"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Missing 'job_id' parameter"));
} else {
panic!("Expected ExecutionError");
}
}
#[tokio::test]
async fn test_schedule_tool_pause_action_running_job() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_scheduler_behavior(
"pause_schedule",
MockBehavior::JobCurrentlyRunning("job1".to_string()),
)
.await
.build()
.await;
// Test pause action with a running job
let arguments = json!({
"action": "pause",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Failed to pause job"));
assert!(msg.contains("job1"));
} else {
panic!("Expected ExecutionError");
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"pause_schedule".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_unpause_action() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.build()
.await;
// Test unpause action
let arguments = json!({
"action": "unpause",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content
.text
.contains("Successfully unpaused job 'job1'"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"unpause_schedule".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_delete_action() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.build()
.await;
// Test delete action
let arguments = json!({
"action": "delete",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content
.text
.contains("Successfully deleted job 'job1'"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"remove_scheduled_job".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_kill_action() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.with_running_job("job1")
.await
.build()
.await;
// Test kill action
let arguments = json!({
"action": "kill",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content
.text
.contains("Successfully killed running job 'job1'"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"kill_running_job".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_kill_action_not_running() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.build()
.await;
// Test kill action with a job that's not running
let arguments = json!({
"action": "kill",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Failed to kill job"));
} else {
panic!("Expected ExecutionError");
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"kill_running_job".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_inspect_action_running() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.with_running_job("job1")
.await
.build()
.await;
// Test inspect action
let arguments = json!({
"action": "inspect",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content
.text
.contains("Job 'job1' is currently running"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"get_running_job_info".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_inspect_action_not_running() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.build()
.await;
// Test inspect action with a job that's not running
let arguments = json!({
"action": "inspect",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content
.text
.contains("Job 'job1' is not currently running"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"get_running_job_info".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_sessions_action() {
// Create test session metadata
let sessions = vec![
(
"1234567890_session1".to_string(),
create_test_session_metadata(5, "/tmp"),
),
(
"0987654321_session2".to_string(),
create_test_session_metadata(10, "/home"),
),
];
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.with_sessions_data("job1", sessions)
.await
.build()
.await;
// Test sessions action
let arguments = json!({
"action": "sessions",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content.text.contains("Sessions for job 'job1'"));
assert!(text_content.text.contains("session1"));
assert!(text_content.text.contains("session2"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"sessions".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_sessions_action_with_limit() {
// Create test session metadata
let sessions = vec![
(
"1234567890_session1".to_string(),
create_test_session_metadata(5, "/tmp"),
),
(
"0987654321_session2".to_string(),
create_test_session_metadata(10, "/home"),
),
(
"5555555555_session3".to_string(),
create_test_session_metadata(15, "/usr"),
),
];
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.with_sessions_data("job1", sessions)
.await
.build()
.await;
// Test sessions action with limit
let arguments = json!({
"action": "sessions",
"job_id": "job1",
"limit": 2
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"sessions".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_sessions_action_empty() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.build()
.await;
// Test sessions action with no sessions
let arguments = json!({
"action": "sessions",
"job_id": "job1"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_ok());
let content = result.unwrap();
assert_eq!(content.len(), 1);
if let Content::Text(text_content) = &content[0] {
assert!(text_content
.text
.contains("No sessions found for job 'job1'"));
}
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"sessions".to_string()));
}
#[tokio::test]
async fn test_schedule_tool_session_content_action() {
let (agent, _) = ScheduleToolTestBuilder::new().build().await;
// Test with a non-existent session
let arguments = json!({
"action": "session_content",
"session_id": "non_existent_session"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Session 'non_existent_session' not found"));
} else {
panic!("Expected ExecutionError");
}
}
#[tokio::test]
async fn test_schedule_tool_session_content_action_with_real_session() {
let (agent, _) = ScheduleToolTestBuilder::new().build().await;
// Create a temporary session file in the proper session directory
let session_dir = goose::session::storage::ensure_session_dir().unwrap();
let session_id = "test_session_real";
let session_path = session_dir.join(format!("{}.jsonl", session_id));
// Create test metadata and messages
let metadata = create_test_session_metadata(2, "/tmp");
let messages = vec![
goose::message::Message::user().with_text("Hello"),
goose::message::Message::assistant().with_text("Hi there!"),
];
// Save the session file
goose::session::storage::save_messages_with_metadata(&session_path, &metadata, &messages)
.unwrap();
// Test the session_content action
let arguments = json!({
"action": "session_content",
"session_id": session_id
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
// Clean up the test session file
let _ = std::fs::remove_file(&session_path);
// Verify the result
assert!(result.is_ok());
if let Ok(content) = result {
assert_eq!(content.len(), 1);
if let mcp_core::Content::Text(text_content) = &content[0] {
assert!(text_content
.text
.contains("Session 'test_session_real' Content:"));
assert!(text_content.text.contains("Metadata:"));
assert!(text_content.text.contains("Messages:"));
assert!(text_content.text.contains("Hello"));
assert!(text_content.text.contains("Hi there!"));
assert!(text_content.text.contains("Test session"));
} else {
panic!("Expected text content");
}
} else {
panic!("Expected successful result");
}
}
#[tokio::test]
async fn test_schedule_tool_session_content_action_missing_session_id() {
let (agent, _) = ScheduleToolTestBuilder::new().build().await;
// Test session_content action with missing session_id
let arguments = json!({
"action": "session_content"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Missing 'session_id' parameter"));
} else {
panic!("Expected ExecutionError");
}
}
#[tokio::test]
async fn test_schedule_tool_unknown_action() {
let (agent, _) = ScheduleToolTestBuilder::new().build().await;
// Test unknown action
let arguments = json!({
"action": "unknown_action"
});
let result = agent
.handle_schedule_management(arguments, "test_req".to_string())
.await;
assert!(result.is_err());
if let Err(ToolError::ExecutionError(msg)) = result {
assert!(msg.contains("Unknown action"));
} else {
panic!("Expected ExecutionError");
}
}
#[tokio::test]
async fn test_schedule_tool_dispatch() {
let (agent, scheduler) = ScheduleToolTestBuilder::new()
.with_existing_job("job1", "*/5 * * * * *")
.await
.build()
.await;
// Test that the tool is properly dispatched through dispatch_tool_call
let tool_call = mcp_core::tool::ToolCall {
name: PLATFORM_MANAGE_SCHEDULE_TOOL_NAME.to_string(),
arguments: json!({
"action": "list"
}),
};
let (request_id, result) = agent
.dispatch_tool_call(tool_call, "test_dispatch".to_string())
.await;
assert_eq!(request_id, "test_dispatch");
assert!(result.is_ok());
let tool_result = result.unwrap();
// The result should be a future that resolves to the tool output
let output = tool_result.result.await;
assert!(output.is_ok());
// Verify the scheduler was called
let calls = scheduler.get_calls().await;
assert!(calls.contains(&"list_scheduled_jobs".to_string()));
}

View File

@@ -0,0 +1,21 @@
//! Test-only utilities for the scheduler
#![cfg(test)]
use once_cell::sync::Lazy;
use std::sync::Arc;
use tokio::sync::Mutex;
use goose::providers::base::Provider as GooseProvider;
static TEST_PROVIDER: Lazy<Mutex<Option<Arc<dyn GooseProvider>>>> = Lazy::new(|| Mutex::new(None));
/// Register a default provider for scheduler job executions when running under tests.
/// The provider will be used by [`Scheduler`] when no provider_override is supplied.
pub async fn set_test_provider(p: Arc<dyn GooseProvider>) {
let mut guard = TEST_PROVIDER.lock().await;
*guard = Some(p);
}
pub async fn get_test_provider() -> Option<Arc<dyn GooseProvider>> {
TEST_PROVIDER.lock().await.clone()
}

View File

@@ -0,0 +1,413 @@
#![cfg(test)]
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use tempfile::TempDir;
use tokio::sync::Mutex;
use goose::agents::Agent;
use goose::scheduler::{ScheduledJob, SchedulerError};
use goose::scheduler_trait::SchedulerTrait;
use goose::session::storage::SessionMetadata;
#[derive(Debug, Clone)]
pub enum MockBehavior {
Success,
NotFound(String),
AlreadyExists(String),
InternalError(String),
JobCurrentlyRunning(String),
}
#[derive(Clone)]
pub struct ConfigurableMockScheduler {
jobs: Arc<Mutex<HashMap<String, ScheduledJob>>>,
running_jobs: Arc<Mutex<HashSet<String>>>,
call_log: Arc<Mutex<Vec<String>>>,
behaviors: Arc<Mutex<HashMap<String, MockBehavior>>>,
sessions_data: Arc<Mutex<HashMap<String, Vec<(String, SessionMetadata)>>>>,
}
impl ConfigurableMockScheduler {
pub fn new() -> Self {
Self {
jobs: Arc::new(Mutex::new(HashMap::new())),
running_jobs: Arc::new(Mutex::new(HashSet::new())),
call_log: Arc::new(Mutex::new(Vec::new())),
behaviors: Arc::new(Mutex::new(HashMap::new())),
sessions_data: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn with_behavior(self, method: &str, behavior: MockBehavior) -> Self {
self.behaviors
.lock()
.await
.insert(method.to_string(), behavior);
self
}
pub async fn with_existing_job(self, job: ScheduledJob) -> Self {
self.jobs.lock().await.insert(job.id.clone(), job);
self
}
pub async fn with_running_job(self, job_id: &str) -> Self {
self.running_jobs.lock().await.insert(job_id.to_string());
self
}
pub async fn with_sessions_data(
self,
job_id: &str,
sessions: Vec<(String, SessionMetadata)>,
) -> Self {
self.sessions_data
.lock()
.await
.insert(job_id.to_string(), sessions);
self
}
pub async fn get_calls(&self) -> Vec<String> {
self.call_log.lock().await.clone()
}
async fn log_call(&self, method: &str) {
self.call_log.lock().await.push(method.to_string());
}
async fn get_behavior(&self, method: &str) -> MockBehavior {
self.behaviors
.lock()
.await
.get(method)
.cloned()
.unwrap_or(MockBehavior::Success)
}
}
#[async_trait]
impl SchedulerTrait for ConfigurableMockScheduler {
async fn add_scheduled_job(&self, job: ScheduledJob) -> Result<(), SchedulerError> {
self.log_call("add_scheduled_job").await;
match self.get_behavior("add_scheduled_job").await {
MockBehavior::Success => {
let mut jobs = self.jobs.lock().await;
if jobs.contains_key(&job.id) {
return Err(SchedulerError::JobIdExists(job.id));
}
jobs.insert(job.id.clone(), job);
Ok(())
}
MockBehavior::AlreadyExists(id) => Err(SchedulerError::JobIdExists(id)),
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok(()),
}
}
async fn list_scheduled_jobs(&self) -> Result<Vec<ScheduledJob>, SchedulerError> {
self.log_call("list_scheduled_jobs").await;
match self.get_behavior("list_scheduled_jobs").await {
MockBehavior::Success => {
let jobs = self.jobs.lock().await;
Ok(jobs.values().cloned().collect())
}
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok(vec![]),
}
}
async fn remove_scheduled_job(&self, id: &str) -> Result<(), SchedulerError> {
self.log_call("remove_scheduled_job").await;
match self.get_behavior("remove_scheduled_job").await {
MockBehavior::Success => {
let mut jobs = self.jobs.lock().await;
if jobs.remove(id).is_some() {
Ok(())
} else {
Err(SchedulerError::JobNotFound(id.to_string()))
}
}
MockBehavior::NotFound(job_id) => Err(SchedulerError::JobNotFound(job_id)),
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok(()),
}
}
async fn pause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
self.log_call("pause_schedule").await;
match self.get_behavior("pause_schedule").await {
MockBehavior::Success => {
let jobs = self.jobs.lock().await;
if jobs.contains_key(id) {
Ok(())
} else {
Err(SchedulerError::JobNotFound(id.to_string()))
}
}
MockBehavior::NotFound(job_id) => Err(SchedulerError::JobNotFound(job_id)),
MockBehavior::JobCurrentlyRunning(job_id) => {
Err(SchedulerError::AnyhowError(anyhow::anyhow!(
"Cannot pause schedule '{}' while it's currently running",
job_id
)))
}
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok(()),
}
}
async fn unpause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
self.log_call("unpause_schedule").await;
match self.get_behavior("unpause_schedule").await {
MockBehavior::Success => {
let jobs = self.jobs.lock().await;
if jobs.contains_key(id) {
Ok(())
} else {
Err(SchedulerError::JobNotFound(id.to_string()))
}
}
MockBehavior::NotFound(job_id) => Err(SchedulerError::JobNotFound(job_id)),
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok(()),
}
}
async fn run_now(&self, id: &str) -> Result<String, SchedulerError> {
self.log_call("run_now").await;
match self.get_behavior("run_now").await {
MockBehavior::Success => {
let jobs = self.jobs.lock().await;
if jobs.contains_key(id) {
Ok(format!("{}_session_{}", id, chrono::Utc::now().timestamp()))
} else {
Err(SchedulerError::JobNotFound(id.to_string()))
}
}
MockBehavior::NotFound(job_id) => Err(SchedulerError::JobNotFound(job_id)),
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok("mock_session_123".to_string()),
}
}
async fn sessions(
&self,
sched_id: &str,
limit: usize,
) -> Result<Vec<(String, SessionMetadata)>, SchedulerError> {
self.log_call("sessions").await;
match self.get_behavior("sessions").await {
MockBehavior::Success => {
let sessions_data = self.sessions_data.lock().await;
let sessions = sessions_data.get(sched_id).cloned().unwrap_or_default();
Ok(sessions.into_iter().take(limit).collect())
}
MockBehavior::NotFound(job_id) => Err(SchedulerError::JobNotFound(job_id)),
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok(vec![]),
}
}
async fn update_schedule(
&self,
sched_id: &str,
_new_cron: String,
) -> Result<(), SchedulerError> {
self.log_call("update_schedule").await;
match self.get_behavior("update_schedule").await {
MockBehavior::Success => {
let jobs = self.jobs.lock().await;
if jobs.contains_key(sched_id) {
Ok(())
} else {
Err(SchedulerError::JobNotFound(sched_id.to_string()))
}
}
MockBehavior::NotFound(job_id) => Err(SchedulerError::JobNotFound(job_id)),
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok(()),
}
}
async fn kill_running_job(&self, sched_id: &str) -> Result<(), SchedulerError> {
self.log_call("kill_running_job").await;
match self.get_behavior("kill_running_job").await {
MockBehavior::Success => {
let running_jobs = self.running_jobs.lock().await;
if running_jobs.contains(sched_id) {
Ok(())
} else {
Err(SchedulerError::AnyhowError(anyhow::anyhow!(
"Schedule '{}' is not currently running",
sched_id
)))
}
}
MockBehavior::NotFound(job_id) => Err(SchedulerError::JobNotFound(job_id)),
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok(()),
}
}
async fn get_running_job_info(
&self,
sched_id: &str,
) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError> {
self.log_call("get_running_job_info").await;
match self.get_behavior("get_running_job_info").await {
MockBehavior::Success => {
let running_jobs = self.running_jobs.lock().await;
if running_jobs.contains(sched_id) {
Ok(Some((format!("{}_session", sched_id), Utc::now())))
} else {
Ok(None)
}
}
MockBehavior::NotFound(job_id) => Err(SchedulerError::JobNotFound(job_id)),
MockBehavior::InternalError(msg) => Err(SchedulerError::SchedulerInternalError(msg)),
_ => Ok(None),
}
}
}
// Helper for creating temp recipe files
pub struct TempRecipe {
pub path: PathBuf,
_temp_dir: TempDir, // Keep alive
}
pub fn create_temp_recipe(valid: bool, format: &str) -> TempRecipe {
let temp_dir = tempfile::tempdir().unwrap();
let filename = format!("test_recipe.{}", format);
let path = temp_dir.path().join(filename);
let content = if valid {
match format {
"json" => {
r#"{
"version": "1.0.0",
"title": "Test Recipe",
"description": "A test recipe",
"prompt": "Hello world"
}"#
}
"yaml" | "yml" => {
r#"version: "1.0.0"
title: "Test Recipe"
description: "A test recipe"
prompt: "Hello world"
"#
}
_ => panic!("Unsupported format: {}", format),
}
} else {
match format {
"json" => r#"{"invalid": json syntax"#,
"yaml" | "yml" => "invalid:\n - yaml: syntax: error",
_ => "invalid content",
}
};
std::fs::write(&path, content).unwrap();
TempRecipe {
path,
_temp_dir: temp_dir,
}
}
// Test builder for easy setup
pub struct ScheduleToolTestBuilder {
scheduler: Arc<ConfigurableMockScheduler>,
}
impl ScheduleToolTestBuilder {
pub fn new() -> Self {
Self {
scheduler: Arc::new(ConfigurableMockScheduler::new()),
}
}
pub async fn with_scheduler_behavior(self, method: &str, behavior: MockBehavior) -> Self {
{
let mut behaviors = self.scheduler.behaviors.lock().await;
behaviors.insert(method.to_string(), behavior);
}
self
}
pub async fn with_existing_job(self, job_id: &str, cron: &str) -> Self {
let job = ScheduledJob {
id: job_id.to_string(),
source: "/tmp/test.json".to_string(),
cron: cron.to_string(),
last_run: None,
currently_running: false,
paused: false,
current_session_id: None,
process_start_time: None,
};
{
let mut jobs = self.scheduler.jobs.lock().await;
jobs.insert(job.id.clone(), job);
}
self
}
pub async fn with_running_job(self, job_id: &str) -> Self {
{
let mut running_jobs = self.scheduler.running_jobs.lock().await;
running_jobs.insert(job_id.to_string());
}
self
}
pub async fn with_sessions_data(
self,
job_id: &str,
sessions: Vec<(String, SessionMetadata)>,
) -> Self {
{
let mut sessions_data = self.scheduler.sessions_data.lock().await;
sessions_data.insert(job_id.to_string(), sessions);
}
self
}
pub async fn build(self) -> (Agent, Arc<ConfigurableMockScheduler>) {
let agent = Agent::new();
agent.set_scheduler(self.scheduler.clone()).await;
(agent, self.scheduler)
}
}
// Helper function to create test session metadata
pub fn create_test_session_metadata(message_count: usize, working_dir: &str) -> SessionMetadata {
SessionMetadata {
message_count,
working_dir: PathBuf::from(working_dir),
description: "Test session".to_string(),
schedule_id: Some("test_job".to_string()),
total_tokens: Some(100),
input_tokens: Some(50),
output_tokens: Some(50),
accumulated_total_tokens: Some(100),
accumulated_input_tokens: Some(50),
accumulated_output_tokens: Some(50),
}
}