From bd430866e8a00ba3ac310c371bc14e6cf749506d Mon Sep 17 00:00:00 2001 From: Max Novich Date: Thu, 29 May 2025 18:33:27 -0700 Subject: [PATCH] feat: implement proper task cancellation for scheduled jobs (#2731) --- crates/goose-cli/src/commands/schedule.rs | 2 + crates/goose-server/src/openapi.rs | 4 + crates/goose-server/src/routes/schedule.rs | 116 + crates/goose-server/ui/desktop/openapi.json | 2141 +++++++++++++++-- crates/goose/src/scheduler.rs | 322 ++- ui/desktop/openapi.json | 98 + ui/desktop/src/api/sdk.gen.ts | 16 +- ui/desktop/src/api/types.gen.ts | 60 + .../schedule/CreateScheduleModal.tsx | 2 +- .../schedule/ScheduleDetailView.tsx | 193 +- .../src/components/schedule/SchedulesView.tsx | 180 +- ui/desktop/src/schedule.ts | 48 + 12 files changed, 2945 insertions(+), 237 deletions(-) diff --git a/crates/goose-cli/src/commands/schedule.rs b/crates/goose-cli/src/commands/schedule.rs index cfe27a47..d25df185 100644 --- a/crates/goose-cli/src/commands/schedule.rs +++ b/crates/goose-cli/src/commands/schedule.rs @@ -34,6 +34,8 @@ pub async fn handle_schedule_add( last_run: None, currently_running: false, paused: false, + current_session_id: None, + process_start_time: None, }; let scheduler_storage_path = diff --git a/crates/goose-server/src/openapi.rs b/crates/goose-server/src/openapi.rs index 24e2d515..317376f4 100644 --- a/crates/goose-server/src/openapi.rs +++ b/crates/goose-server/src/openapi.rs @@ -45,6 +45,8 @@ use utoipa::OpenApi; super::routes::schedule::run_now_handler, super::routes::schedule::pause_schedule, super::routes::schedule::unpause_schedule, + super::routes::schedule::kill_running_job, + super::routes::schedule::inspect_running_job, super::routes::schedule::sessions_handler ), components(schemas( @@ -95,6 +97,8 @@ use utoipa::OpenApi; SessionMetadata, super::routes::schedule::CreateScheduleRequest, super::routes::schedule::UpdateScheduleRequest, + super::routes::schedule::KillJobResponse, + super::routes::schedule::InspectJobResponse, goose::scheduler::ScheduledJob, super::routes::schedule::RunNowResponse, super::routes::schedule::ListSchedulesResponse, diff --git a/crates/goose-server/src/routes/schedule.rs b/crates/goose-server/src/routes/schedule.rs index fb7d852a..2caf7a14 100644 --- a/crates/goose-server/src/routes/schedule.rs +++ b/crates/goose-server/src/routes/schedule.rs @@ -31,6 +31,21 @@ pub struct ListSchedulesResponse { jobs: Vec, } +// Response for the kill endpoint +#[derive(Serialize, utoipa::ToSchema)] +pub struct KillJobResponse { + message: String, +} + +// Response for the inspect endpoint +#[derive(Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct InspectJobResponse { + session_id: Option, + process_start_time: Option, + running_duration_seconds: Option, +} + // Response for the run_now endpoint #[derive(Serialize, utoipa::ToSchema)] pub struct RunNowResponse { @@ -100,6 +115,8 @@ async fn create_schedule( last_run: None, currently_running: false, paused: false, + current_session_id: None, + process_start_time: None, }; scheduler .add_scheduled_job(job.clone()) @@ -199,6 +216,17 @@ async fn run_now_handler( eprintln!("Error running schedule '{}' now: {:?}", id, e); match e { goose::scheduler::SchedulerError::JobNotFound(_) => Err(StatusCode::NOT_FOUND), + goose::scheduler::SchedulerError::AnyhowError(ref err) => { + // Check if this is a cancellation error + if err.to_string().contains("was successfully cancelled") { + // Return a special session_id to indicate cancellation + Ok(Json(RunNowResponse { + session_id: "CANCELLED".to_string(), + })) + } else { + Err(StatusCode::INTERNAL_SERVER_ERROR) + } + } _ => Err(StatusCode::INTERNAL_SERVER_ERROR), } } @@ -389,6 +417,92 @@ async fn update_schedule( Ok(Json(updated_job)) } +#[utoipa::path( + post, + path = "/schedule/{id}/kill", + responses( + (status = 200, description = "Running job killed successfully"), + ), + tag = "schedule" +)] +#[axum::debug_handler] +pub async fn kill_running_job( + State(state): State>, + headers: HeaderMap, + Path(id): Path, +) -> Result, StatusCode> { + verify_secret_key(&headers, &state)?; + let scheduler = state + .scheduler() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + scheduler.kill_running_job(&id).await.map_err(|e| { + eprintln!("Error killing running job '{}': {:?}", id, e); + match e { + goose::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND, + goose::scheduler::SchedulerError::AnyhowError(_) => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + })?; + + Ok(Json(KillJobResponse { + message: format!("Successfully killed running job '{}'", id), + })) +} + +#[utoipa::path( + get, + path = "/schedule/{id}/inspect", + params( + ("id" = String, Path, description = "ID of the schedule to inspect") + ), + responses( + (status = 200, description = "Running job information", body = InspectJobResponse), + (status = 404, description = "Scheduled job not found"), + (status = 500, description = "Internal server error") + ), + tag = "schedule" +)] +#[axum::debug_handler] +pub async fn inspect_running_job( + State(state): State>, + headers: HeaderMap, + Path(id): Path, +) -> Result, StatusCode> { + verify_secret_key(&headers, &state)?; + let scheduler = state + .scheduler() + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + match scheduler.get_running_job_info(&id).await { + Ok(info) => { + if let Some((session_id, start_time)) = info { + let duration = chrono::Utc::now().signed_duration_since(start_time); + Ok(Json(InspectJobResponse { + session_id: Some(session_id), + process_start_time: Some(start_time.to_rfc3339()), + running_duration_seconds: Some(duration.num_seconds()), + })) + } else { + Ok(Json(InspectJobResponse { + session_id: None, + process_start_time: None, + running_duration_seconds: None, + })) + } + } + Err(e) => { + eprintln!("Error inspecting running job '{}': {:?}", id, e); + match e { + goose::scheduler::SchedulerError::JobNotFound(_) => Err(StatusCode::NOT_FOUND), + _ => Err(StatusCode::INTERNAL_SERVER_ERROR), + } + } + } +} + pub fn routes(state: Arc) -> Router { Router::new() .route("/schedule/create", post(create_schedule)) @@ -398,6 +512,8 @@ pub fn routes(state: Arc) -> Router { .route("/schedule/{id}/run_now", post(run_now_handler)) // Corrected .route("/schedule/{id}/pause", post(pause_schedule)) .route("/schedule/{id}/unpause", post(unpause_schedule)) + .route("/schedule/{id}/kill", post(kill_running_job)) + .route("/schedule/{id}/inspect", get(inspect_running_job)) .route("/schedule/{id}/sessions", get(sessions_handler)) // Corrected .with_state(state) } diff --git a/crates/goose-server/ui/desktop/openapi.json b/crates/goose-server/ui/desktop/openapi.json index 5e78c8f6..0533f2f7 100644 --- a/crates/goose-server/ui/desktop/openapi.json +++ b/crates/goose-server/ui/desktop/openapi.json @@ -10,15 +10,58 @@ "license": { "name": "Apache-2.0" }, - "version": "1.0.4" + "version": "1.0.24" }, "paths": { + "/agent/tools": { + "get": { + "tags": [ + "super::routes::agent" + ], + "operationId": "get_tools", + "parameters": [ + { + "name": "extension_name", + "in": "query", + "description": "Optional extension name to filter tools", + "required": false, + "schema": { + "type": "string", + "nullable": true + } + } + ], + "responses": { + "200": { + "description": "Tools retrieved successfully", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ToolInfo" + } + } + } + } + }, + "401": { + "description": "Unauthorized - invalid secret key" + }, + "424": { + "description": "Agent not initialized" + }, + "500": { + "description": "Internal server error" + } + } + } + }, "/config": { "get": { "tags": [ "super::routes::config_management" ], - "summary": "Read all configuration values", "operationId": "read_all_config", "responses": { "200": { @@ -34,13 +77,56 @@ } } }, - "/config/extension": { + "/config/backup": { "post": { "tags": [ - "config" + "super::routes::config_management" ], - "summary": "Add an extension configuration", - "operationId": "add_extension_config", + "operationId": "backup_config", + "responses": { + "200": { + "description": "Config file backed up", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/config/extensions": { + "get": { + "tags": [ + "super::routes::config_management" + ], + "operationId": "get_extensions", + "responses": { + "200": { + "description": "All extensions retrieved successfully", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ExtensionResponse" + } + } + } + }, + "500": { + "description": "Internal server error" + } + } + }, + "post": { + "tags": [ + "super::routes::config_management" + ], + "operationId": "add_extension", "requestBody": { "content": { "application/json": { @@ -53,7 +139,7 @@ }, "responses": { "200": { - "description": "Extension added successfully", + "description": "Extension added or updated successfully", "content": { "text/plain": { "schema": { @@ -65,27 +151,31 @@ "400": { "description": "Invalid request" }, + "422": { + "description": "Could not serialize config.yaml" + }, "500": { "description": "Internal server error" } } - }, + } + }, + "/config/extensions/{name}": { "delete": { "tags": [ "super::routes::config_management" ], - "summary": "Remove an extension configuration", "operationId": "remove_extension", - "requestBody": { - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/ConfigKeyQuery" - } + "parameters": [ + { + "name": "name", + "in": "path", + "required": true, + "schema": { + "type": "string" } - }, - "required": true - }, + } + ], "responses": { "200": { "description": "Extension removed successfully", @@ -106,12 +196,90 @@ } } }, - "/config/read": { + "/config/init": { + "post": { + "tags": [ + "super::routes::config_management" + ], + "operationId": "init_config", + "responses": { + "200": { + "description": "Config initialization check completed", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/config/permissions": { + "post": { + "tags": [ + "super::routes::config_management" + ], + "operationId": "upsert_permissions", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/UpsertPermissionsQuery" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Permission update completed", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + "400": { + "description": "Invalid request" + } + } + } + }, + "/config/providers": { "get": { "tags": [ "super::routes::config_management" ], - "summary": "Read a configuration value", + "operationId": "providers", + "responses": { + "200": { + "description": "All configuration values retrieved successfully", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ProviderDetails" + } + } + } + } + } + } + } + }, + "/config/read": { + "post": { + "tags": [ + "super::routes::config_management" + ], "operationId": "read_config", "requestBody": { "content": { @@ -143,7 +311,6 @@ "tags": [ "super::routes::config_management" ], - "summary": "Remove a configuration value", "operationId": "remove_config", "requestBody": { "content": { @@ -180,7 +347,6 @@ "tags": [ "super::routes::config_management" ], - "summary": "Upsert a configuration value", "operationId": "upsert_config", "requestBody": { "content": { @@ -209,20 +375,99 @@ } } }, + "/confirm": { + "post": { + "tags": [ + "super::routes::reply" + ], + "operationId": "confirm_permission", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/PermissionConfirmationRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Permission action is confirmed", + "content": { + "application/json": { + "schema": {} + } + } + }, + "401": { + "description": "Unauthorized - invalid secret key" + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/context/manage": { + "post": { + "tags": [ + "Context Management" + ], + "operationId": "manage_context", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ContextManageRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Context managed successfully", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ContextManageResponse" + } + } + } + }, + "401": { + "description": "Unauthorized - Invalid or missing API key" + }, + "412": { + "description": "Precondition failed - Agent not available" + }, + "500": { + "description": "Internal server error" + } + }, + "security": [ + { + "api_key": [] + } + ] + } + }, "/schedule/create": { "post": { - "tags": ["schedule"], - "summary": "Create a new scheduled job", + "tags": [ + "schedule" + ], "operationId": "create_schedule", "requestBody": { - "required": true, "content": { "application/json": { "schema": { "$ref": "#/components/schemas/CreateScheduleRequest" } } - } + }, + "required": true }, "responses": { "200": { @@ -241,47 +486,18 @@ } } }, - "/schedule/list": { - "get": { - "tags": ["schedule"], - "summary": "List all scheduled jobs", - "operationId": "list_schedules", - "responses": { - "200": { - "description": "A list of scheduled jobs", - "content": { - "application/json": { - "schema": { - "type": "object", - "properties": { - "jobs": { - "type": "array", - "items": { - "$ref": "#/components/schemas/ScheduledJob" - } - } - } - } - } - } - }, - "500": { - "description": "Internal server error" - } - } - } - }, "/schedule/delete/{id}": { "delete": { - "tags": ["schedule"], - "summary": "Delete a scheduled job by ID", + "tags": [ + "schedule" + ], "operationId": "delete_schedule", "parameters": [ { "name": "id", "in": "path", - "required": true, "description": "ID of the schedule to delete", + "required": true, "schema": { "type": "string" } @@ -300,17 +516,201 @@ } } }, - "/schedule/{id}/run_now": { - "post": { - "tags": ["schedule"], - "summary": "Run a scheduled job immediately", - "operationId": "run_schedule_now", + "/schedule/list": { + "get": { + "tags": [ + "schedule" + ], + "operationId": "list_schedules", + "responses": { + "200": { + "description": "A list of scheduled jobs", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ListSchedulesResponse" + } + } + } + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/schedule/{id}": { + "put": { + "tags": [ + "schedule" + ], + "operationId": "update_schedule", "parameters": [ { "name": "id", "in": "path", + "description": "ID of the schedule to update", "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/UpdateScheduleRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Scheduled job updated successfully", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ScheduledJob" + } + } + } + }, + "400": { + "description": "Cannot update a currently running job or invalid request" + }, + "404": { + "description": "Scheduled job not found" + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/schedule/{id}/inspect": { + "get": { + "tags": [ + "schedule" + ], + "operationId": "inspect_running_job", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "ID of the schedule to inspect", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Running job information", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InspectJobResponse" + } + } + } + }, + "404": { + "description": "Scheduled job not found" + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/schedule/{id}/kill": { + "post": { + "tags": [ + "schedule" + ], + "operationId": "kill_running_job", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "ID of the schedule to kill", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Running job killed successfully", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/KillJobResponse" + } + } + } + }, + "400": { + "description": "Job is not currently running" + }, + "404": { + "description": "Scheduled job not found" + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/schedule/{id}/pause": { + "post": { + "tags": [ + "schedule" + ], + "operationId": "pause_schedule", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "ID of the schedule to pause", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "204": { + "description": "Scheduled job paused successfully" + }, + "400": { + "description": "Cannot pause a currently running job" + }, + "404": { + "description": "Scheduled job not found" + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/schedule/{id}/run_now": { + "post": { + "tags": [ + "schedule" + ], + "operationId": "run_now_handler", + "parameters": [ + { + "name": "id", + "in": "path", "description": "ID of the schedule to run", + "required": true, "schema": { "type": "string" } @@ -338,15 +738,16 @@ }, "/schedule/{id}/sessions": { "get": { - "tags": ["schedule"], - "summary": "List sessions created by a specific schedule", - "operationId": "list_schedule_sessions", + "tags": [ + "schedule" + ], + "operationId": "sessions_handler", "parameters": [ { "name": "id", "in": "path", - "required": true, "description": "ID of the schedule", + "required": true, "schema": { "type": "string" } @@ -354,24 +755,23 @@ { "name": "limit", "in": "query", - "description": "Maximum number of sessions to return", "required": false, "schema": { "type": "integer", "format": "int32", - "default": 50 + "minimum": 0 } } ], "responses": { "200": { - "description": "A list of session metadata", + "description": "A list of session display info", "content": { "application/json": { "schema": { "type": "array", "items": { - "$ref": "#/components/schemas/SessionMetadata" + "$ref": "#/components/schemas/SessionDisplayInfo" } } } @@ -382,19 +782,173 @@ } } } + }, + "/schedule/{id}/unpause": { + "post": { + "tags": [ + "schedule" + ], + "operationId": "unpause_schedule", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "ID of the schedule to unpause", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "204": { + "description": "Scheduled job unpaused successfully" + }, + "404": { + "description": "Scheduled job not found" + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/sessions": { + "get": { + "tags": [ + "Session Management" + ], + "operationId": "list_sessions", + "responses": { + "200": { + "description": "List of available sessions retrieved successfully", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/SessionListResponse" + } + } + } + }, + "401": { + "description": "Unauthorized - Invalid or missing API key" + }, + "500": { + "description": "Internal server error" + } + }, + "security": [ + { + "api_key": [] + } + ] + } + }, + "/sessions/{session_id}": { + "get": { + "tags": [ + "Session Management" + ], + "operationId": "get_session_history", + "parameters": [ + { + "name": "session_id", + "in": "path", + "description": "Unique identifier for the session", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Session history retrieved successfully", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/SessionHistoryResponse" + } + } + } + }, + "401": { + "description": "Unauthorized - Invalid or missing API key" + }, + "404": { + "description": "Session not found" + }, + "500": { + "description": "Internal server error" + } + }, + "security": [ + { + "api_key": [] + } + ] + } } }, "components": { "schemas": { + "Annotations": { + "type": "object", + "properties": { + "audience": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Role" + }, + "nullable": true + }, + "priority": { + "type": "number", + "format": "float", + "nullable": true + }, + "timestamp": { + "type": "string", + "format": "date-time", + "example": "2023-01-01T00:00:00Z" + } + } + }, + "ConfigKey": { + "type": "object", + "required": [ + "name", + "required", + "secret" + ], + "properties": { + "default": { + "type": "string", + "nullable": true + }, + "name": { + "type": "string" + }, + "required": { + "type": "boolean" + }, + "secret": { + "type": "boolean" + } + } + }, "ConfigKeyQuery": { "type": "object", "required": [ - "key" + "key", + "is_secret" ], "properties": { + "is_secret": { + "type": "boolean" + }, "key": { - "type": "string", - "description": "The configuration key to operate on" + "type": "string" } } }, @@ -406,45 +960,134 @@ "properties": { "config": { "type": "object", - "description": "The configuration values", "additionalProperties": {} } } }, - "ExtensionQuery": { + "Content": { + "oneOf": [ + { + "allOf": [ + { + "$ref": "#/components/schemas/TextContent" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "text" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/ImageContent" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "image" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/EmbeddedResource" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "resource" + ] + } + } + } + ] + } + ], + "discriminator": { + "propertyName": "type" + } + }, + "ContextLengthExceeded": { "type": "object", "required": [ - "name", - "config" + "msg" ], "properties": { - "config": { - "description": "The configuration for the extension" - }, - "name": { - "type": "string", - "description": "The name of the extension" + "msg": { + "type": "string" } } }, - "UpsertConfigQuery": { + "ContextManageRequest": { "type": "object", + "description": "Request payload for context management operations", "required": [ - "key", - "value" + "messages", + "manageAction" ], "properties": { - "is_secret": { - "type": "boolean", - "description": "Whether this configuration value should be treated as a secret", - "nullable": true - }, - "key": { + "manageAction": { "type": "string", - "description": "The configuration key to upsert" + "description": "Operation to perform: \"truncation\" or \"summarize\"" }, - "value": { - "description": "The value to set for the configuration" + "messages": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Message" + }, + "description": "Collection of messages to be managed" + } + } + }, + "ContextManageResponse": { + "type": "object", + "description": "Response from context management operations", + "required": [ + "messages", + "tokenCounts" + ], + "properties": { + "messages": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Message" + }, + "description": "Processed messages after the operation" + }, + "tokenCounts": { + "type": "array", + "items": { + "type": "integer", + "minimum": 0 + }, + "description": "Token counts for each processed message" } } }, @@ -456,17 +1099,804 @@ "cron" ], "properties": { + "cron": { + "type": "string" + }, "id": { - "type": "string", - "description": "Unique ID for the new schedule." + "type": "string" }, "recipe_source": { - "type": "string", - "description": "Path to the recipe file to be executed by this schedule." + "type": "string" + } + } + }, + "EmbeddedResource": { + "type": "object", + "required": [ + "resource" + ], + "properties": { + "annotations": { + "allOf": [ + { + "$ref": "#/components/schemas/Annotations" + } + ], + "nullable": true }, - "cron": { + "resource": { + "$ref": "#/components/schemas/ResourceContents" + } + } + }, + "Envs": { + "type": "object", + "additionalProperties": { + "type": "string", + "description": "A map of environment variables to set, e.g. API_KEY -> some_secret, HOST -> host" + } + }, + "ExtensionConfig": { + "oneOf": [ + { + "type": "object", + "description": "Server-sent events client with a URI endpoint", + "required": [ + "name", + "uri", + "type" + ], + "properties": { + "bundled": { + "type": "boolean", + "description": "Whether this extension is bundled with Goose", + "nullable": true + }, + "description": { + "type": "string", + "nullable": true + }, + "env_keys": { + "type": "array", + "items": { + "type": "string" + } + }, + "envs": { + "$ref": "#/components/schemas/Envs" + }, + "name": { + "type": "string", + "description": "The name used to identify this extension" + }, + "timeout": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + }, + "type": { + "type": "string", + "enum": [ + "sse" + ] + }, + "uri": { + "type": "string" + } + } + }, + { + "type": "object", + "description": "Standard I/O client with command and arguments", + "required": [ + "name", + "cmd", + "args", + "type" + ], + "properties": { + "args": { + "type": "array", + "items": { + "type": "string" + } + }, + "bundled": { + "type": "boolean", + "description": "Whether this extension is bundled with Goose", + "nullable": true + }, + "cmd": { + "type": "string" + }, + "description": { + "type": "string", + "nullable": true + }, + "env_keys": { + "type": "array", + "items": { + "type": "string" + } + }, + "envs": { + "$ref": "#/components/schemas/Envs" + }, + "name": { + "type": "string", + "description": "The name used to identify this extension" + }, + "timeout": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + }, + "type": { + "type": "string", + "enum": [ + "stdio" + ] + } + } + }, + { + "type": "object", + "description": "Built-in extension that is part of the goose binary", + "required": [ + "name", + "type" + ], + "properties": { + "bundled": { + "type": "boolean", + "description": "Whether this extension is bundled with Goose", + "nullable": true + }, + "display_name": { + "type": "string", + "nullable": true + }, + "name": { + "type": "string", + "description": "The name used to identify this extension" + }, + "timeout": { + "type": "integer", + "format": "int64", + "nullable": true, + "minimum": 0 + }, + "type": { + "type": "string", + "enum": [ + "builtin" + ] + } + } + }, + { + "type": "object", + "description": "Frontend-provided tools that will be called through the frontend", + "required": [ + "name", + "tools", + "type" + ], + "properties": { + "bundled": { + "type": "boolean", + "description": "Whether this extension is bundled with Goose", + "nullable": true + }, + "instructions": { + "type": "string", + "description": "Instructions for how to use these tools", + "nullable": true + }, + "name": { + "type": "string", + "description": "The name used to identify this extension" + }, + "tools": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Tool" + }, + "description": "The tools provided by the frontend" + }, + "type": { + "type": "string", + "enum": [ + "frontend" + ] + } + } + } + ], + "description": "Represents the different types of MCP extensions that can be added to the manager", + "discriminator": { + "propertyName": "type" + } + }, + "ExtensionEntry": { + "allOf": [ + { + "$ref": "#/components/schemas/ExtensionConfig" + }, + { + "type": "object", + "required": [ + "enabled" + ], + "properties": { + "enabled": { + "type": "boolean" + } + } + } + ] + }, + "ExtensionQuery": { + "type": "object", + "required": [ + "name", + "config", + "enabled" + ], + "properties": { + "config": { + "$ref": "#/components/schemas/ExtensionConfig" + }, + "enabled": { + "type": "boolean" + }, + "name": { + "type": "string" + } + } + }, + "ExtensionResponse": { + "type": "object", + "required": [ + "extensions" + ], + "properties": { + "extensions": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ExtensionEntry" + } + } + } + }, + "FrontendToolRequest": { + "type": "object", + "required": [ + "id", + "toolCall" + ], + "properties": { + "id": { + "type": "string" + }, + "toolCall": { + "type": "object" + } + } + }, + "ImageContent": { + "type": "object", + "required": [ + "data", + "mimeType" + ], + "properties": { + "annotations": { + "allOf": [ + { + "$ref": "#/components/schemas/Annotations" + } + ], + "nullable": true + }, + "data": { + "type": "string" + }, + "mimeType": { + "type": "string" + } + } + }, + "InspectJobResponse": { + "type": "object", + "properties": { + "processStartTime": { "type": "string", - "description": "Cron string defining when the job should run." + "nullable": true + }, + "runningDurationSeconds": { + "type": "integer", + "format": "int64", + "nullable": true + }, + "sessionId": { + "type": "string", + "nullable": true + } + } + }, + "KillJobResponse": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + }, + "ListSchedulesResponse": { + "type": "object", + "required": [ + "jobs" + ], + "properties": { + "jobs": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ScheduledJob" + } + } + } + }, + "Message": { + "type": "object", + "description": "A message to or from an LLM", + "required": [ + "role", + "created", + "content" + ], + "properties": { + "content": { + "type": "array", + "items": { + "$ref": "#/components/schemas/MessageContent" + } + }, + "created": { + "type": "integer", + "format": "int64" + }, + "role": { + "$ref": "#/components/schemas/Role" + } + } + }, + "MessageContent": { + "oneOf": [ + { + "allOf": [ + { + "$ref": "#/components/schemas/TextContent" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "text" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/ImageContent" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "image" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/ToolRequest" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "toolRequest" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/ToolResponse" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "toolResponse" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/ToolConfirmationRequest" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "toolConfirmationRequest" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/FrontendToolRequest" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "frontendToolRequest" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/ThinkingContent" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "thinking" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/RedactedThinkingContent" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "redactedThinking" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/ContextLengthExceeded" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "contextLengthExceeded" + ] + } + } + } + ] + }, + { + "allOf": [ + { + "$ref": "#/components/schemas/SummarizationRequested" + }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "summarizationRequested" + ] + } + } + } + ] + } + ], + "description": "Content passed inside a message, which can be both simple content and tool content", + "discriminator": { + "propertyName": "type" + } + }, + "ModelInfo": { + "type": "object", + "description": "Information about a model's capabilities", + "required": [ + "name", + "context_limit" + ], + "properties": { + "context_limit": { + "type": "integer", + "description": "The maximum context length this model supports", + "minimum": 0 + }, + "name": { + "type": "string", + "description": "The name of the model" + } + } + }, + "PermissionConfirmationRequest": { + "type": "object", + "required": [ + "id", + "action" + ], + "properties": { + "action": { + "type": "string" + }, + "id": { + "type": "string" + }, + "principal_type": { + "$ref": "#/components/schemas/PrincipalType" + } + } + }, + "PermissionLevel": { + "type": "string", + "description": "Enum representing the possible permission levels for a tool.", + "enum": [ + "always_allow", + "ask_before", + "never_allow" + ] + }, + "PrincipalType": { + "type": "string", + "enum": [ + "Extension", + "Tool" + ] + }, + "ProviderDetails": { + "type": "object", + "required": [ + "name", + "metadata", + "is_configured" + ], + "properties": { + "is_configured": { + "type": "boolean" + }, + "metadata": { + "$ref": "#/components/schemas/ProviderMetadata" + }, + "name": { + "type": "string" + } + } + }, + "ProviderMetadata": { + "type": "object", + "description": "Metadata about a provider's configuration requirements and capabilities", + "required": [ + "name", + "display_name", + "description", + "default_model", + "known_models", + "model_doc_link", + "config_keys" + ], + "properties": { + "config_keys": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ConfigKey" + }, + "description": "Required configuration keys" + }, + "default_model": { + "type": "string", + "description": "The default/recommended model for this provider" + }, + "description": { + "type": "string", + "description": "Description of the provider's capabilities" + }, + "display_name": { + "type": "string", + "description": "Display name for the provider in UIs" + }, + "known_models": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ModelInfo" + }, + "description": "A list of currently known models with their capabilities\nTODO: eventually query the apis directly" + }, + "model_doc_link": { + "type": "string", + "description": "Link to the docs where models can be found" + }, + "name": { + "type": "string", + "description": "The unique identifier for this provider" + } + } + }, + "ProvidersResponse": { + "type": "object", + "required": [ + "providers" + ], + "properties": { + "providers": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ProviderDetails" + } + } + } + }, + "RedactedThinkingContent": { + "type": "object", + "required": [ + "data" + ], + "properties": { + "data": { + "type": "string" + } + } + }, + "ResourceContents": { + "oneOf": [ + { + "type": "object", + "required": [ + "uri", + "text" + ], + "properties": { + "mime_type": { + "type": "string", + "nullable": true + }, + "text": { + "type": "string" + }, + "uri": { + "type": "string" + } + } + }, + { + "type": "object", + "required": [ + "uri", + "blob" + ], + "properties": { + "blob": { + "type": "string" + }, + "mime_type": { + "type": "string", + "nullable": true + }, + "uri": { + "type": "string" + } + } + } + ] + }, + "Role": { + "type": "string", + "enum": [ + "user", + "assistant" + ] + }, + "RunNowResponse": { + "type": "object", + "required": [ + "session_id" + ], + "properties": { + "session_id": { + "type": "string" } } }, @@ -478,96 +1908,503 @@ "cron" ], "properties": { - "id": { - "type": "string", - "description": "Unique identifier for the scheduled job." - }, - "source": { - "type": "string", - "description": "Path to the recipe file for this job." - }, "cron": { + "type": "string" + }, + "current_session_id": { "type": "string", - "description": "Cron string defining the schedule." + "nullable": true + }, + "currently_running": { + "type": "boolean" + }, + "id": { + "type": "string" }, "last_run": { "type": "string", "format": "date-time", - "description": "Timestamp of the last time the job was run.", "nullable": true + }, + "paused": { + "type": "boolean" + }, + "process_start_time": { + "type": "string", + "format": "date-time", + "nullable": true + }, + "source": { + "type": "string" + } + } + }, + "SessionDisplayInfo": { + "type": "object", + "required": [ + "id", + "name", + "createdAt", + "workingDir", + "messageCount" + ], + "properties": { + "accumulatedInputTokens": { + "type": "integer", + "format": "int32", + "nullable": true + }, + "accumulatedOutputTokens": { + "type": "integer", + "format": "int32", + "nullable": true + }, + "accumulatedTotalTokens": { + "type": "integer", + "format": "int32", + "nullable": true + }, + "createdAt": { + "type": "string" + }, + "id": { + "type": "string" + }, + "inputTokens": { + "type": "integer", + "format": "int32", + "nullable": true + }, + "messageCount": { + "type": "integer", + "minimum": 0 + }, + "name": { + "type": "string" + }, + "outputTokens": { + "type": "integer", + "format": "int32", + "nullable": true + }, + "scheduleId": { + "type": "string", + "nullable": true + }, + "totalTokens": { + "type": "integer", + "format": "int32", + "nullable": true + }, + "workingDir": { + "type": "string" + } + } + }, + "SessionHistoryResponse": { + "type": "object", + "required": [ + "sessionId", + "metadata", + "messages" + ], + "properties": { + "messages": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Message" + }, + "description": "List of messages in the session conversation" + }, + "metadata": { + "$ref": "#/components/schemas/SessionMetadata" + }, + "sessionId": { + "type": "string", + "description": "Unique identifier for the session" + } + } + }, + "SessionInfo": { + "type": "object", + "required": [ + "id", + "path", + "modified", + "metadata" + ], + "properties": { + "id": { + "type": "string" + }, + "metadata": { + "$ref": "#/components/schemas/SessionMetadata" + }, + "modified": { + "type": "string" + }, + "path": { + "type": "string" + } + } + }, + "SessionListResponse": { + "type": "object", + "required": [ + "sessions" + ], + "properties": { + "sessions": { + "type": "array", + "items": { + "$ref": "#/components/schemas/SessionInfo" + }, + "description": "List of available session information objects" } } }, "SessionMetadata": { "type": "object", + "description": "Metadata for a session, stored as the first line in the session file", "required": [ "working_dir", "description", "message_count" ], "properties": { - "working_dir": { - "type": "string", - "description": "Working directory for the session." + "accumulated_input_tokens": { + "type": "integer", + "format": "int32", + "description": "The number of input tokens used in the session. Accumulated across all messages.", + "nullable": true + }, + "accumulated_output_tokens": { + "type": "integer", + "format": "int32", + "description": "The number of output tokens used in the session. Accumulated across all messages.", + "nullable": true + }, + "accumulated_total_tokens": { + "type": "integer", + "format": "int32", + "description": "The total number of tokens used in the session. Accumulated across all messages (useful for tracking cost over an entire session).", + "nullable": true }, "description": { "type": "string", - "description": "A short description of the session." + "description": "A short description of the session, typically 3 words or less" }, - "schedule_id": { - "type": "string", - "description": "ID of the schedule that triggered this session, if any.", + "input_tokens": { + "type": "integer", + "format": "int32", + "description": "The number of input tokens used in the session. Retrieved from the provider's last usage.", "nullable": true }, "message_count": { "type": "integer", - "format": "int64", - "description": "Number of messages in the session." - }, - "total_tokens": { - "type": "integer", - "format": "int32", - "nullable": true - }, - "input_tokens": { - "type": "integer", - "format": "int32", - "nullable": true + "description": "Number of messages in the session", + "minimum": 0 }, "output_tokens": { - "type": "integer", - "format": "int32", - "nullable": true + "type": "integer", + "format": "int32", + "description": "The number of output tokens used in the session. Retrieved from the provider's last usage.", + "nullable": true }, - "accumulated_total_tokens": { - "type": "integer", - "format": "int32", - "nullable": true + "schedule_id": { + "type": "string", + "description": "ID of the schedule that triggered this session, if any", + "nullable": true }, - "accumulated_input_tokens": { - "type": "integer", - "format": "int32", - "nullable": true + "total_tokens": { + "type": "integer", + "format": "int32", + "description": "The total number of tokens used in the session. Retrieved from the provider's last usage.", + "nullable": true }, - "accumulated_output_tokens": { - "type": "integer", - "format": "int32", - "nullable": true + "working_dir": { + "type": "string", + "description": "Working directory for the session", + "example": "/home/user/sessions/session1" } } }, - "RunNowResponse": { + "SessionsQuery": { + "type": "object", + "properties": { + "limit": { + "type": "integer", + "format": "int32", + "minimum": 0 + } + } + }, + "SummarizationRequested": { "type": "object", "required": [ - "session_id" + "msg" ], "properties": { - "session_id": { + "msg": { + "type": "string" + } + } + }, + "TextContent": { + "type": "object", + "required": [ + "text" + ], + "properties": { + "annotations": { + "allOf": [ + { + "$ref": "#/components/schemas/Annotations" + } + ], + "nullable": true + }, + "text": { + "type": "string" + } + } + }, + "ThinkingContent": { + "type": "object", + "required": [ + "thinking", + "signature" + ], + "properties": { + "signature": { + "type": "string" + }, + "thinking": { + "type": "string" + } + } + }, + "Tool": { + "type": "object", + "description": "A tool that can be used by a model.", + "required": [ + "name", + "description", + "inputSchema" + ], + "properties": { + "annotations": { + "allOf": [ + { + "$ref": "#/components/schemas/ToolAnnotations" + } + ], + "nullable": true + }, + "description": { "type": "string", - "description": "The ID of the newly created session." + "description": "A description of what the tool does" + }, + "inputSchema": { + "description": "A JSON Schema object defining the expected parameters for the tool" + }, + "name": { + "type": "string", + "description": "The name of the tool" + } + } + }, + "ToolAnnotations": { + "type": "object", + "description": "Additional properties describing a tool to clients.\n\nNOTE: all properties in ToolAnnotations are **hints**.\nThey are not guaranteed to provide a faithful description of\ntool behavior (including descriptive properties like `title`).\n\nClients should never make tool use decisions based on ToolAnnotations\nreceived from untrusted servers.", + "properties": { + "destructiveHint": { + "type": "boolean", + "description": "If true, the tool may perform destructive updates to its environment.\nIf false, the tool performs only additive updates.\n\n(This property is meaningful only when `read_only_hint == false`)\n\nDefault: true" + }, + "idempotentHint": { + "type": "boolean", + "description": "If true, calling the tool repeatedly with the same arguments\nwill have no additional effect on its environment.\n\n(This property is meaningful only when `read_only_hint == false`)\n\nDefault: false" + }, + "openWorldHint": { + "type": "boolean", + "description": "If true, this tool may interact with an \"open world\" of external\nentities. If false, the tool's domain of interaction is closed.\nFor example, the world of a web search tool is open, whereas that\nof a memory tool is not.\n\nDefault: true" + }, + "readOnlyHint": { + "type": "boolean", + "description": "If true, the tool does not modify its environment.\n\nDefault: false" + }, + "title": { + "type": "string", + "description": "A human-readable title for the tool.", + "nullable": true + } + } + }, + "ToolConfirmationRequest": { + "type": "object", + "required": [ + "id", + "toolName", + "arguments" + ], + "properties": { + "arguments": {}, + "id": { + "type": "string" + }, + "prompt": { + "type": "string", + "nullable": true + }, + "toolName": { + "type": "string" + } + } + }, + "ToolInfo": { + "type": "object", + "description": "Information about the tool used for building prompts", + "required": [ + "name", + "description", + "parameters" + ], + "properties": { + "description": { + "type": "string" + }, + "name": { + "type": "string" + }, + "parameters": { + "type": "array", + "items": { + "type": "string" + } + }, + "permission": { + "allOf": [ + { + "$ref": "#/components/schemas/PermissionLevel" + } + ], + "nullable": true + } + } + }, + "ToolPermission": { + "type": "object", + "required": [ + "tool_name", + "permission" + ], + "properties": { + "permission": { + "$ref": "#/components/schemas/PermissionLevel" + }, + "tool_name": { + "type": "string" + } + } + }, + "ToolRequest": { + "type": "object", + "required": [ + "id", + "toolCall" + ], + "properties": { + "id": { + "type": "string" + }, + "toolCall": { + "type": "object" + } + } + }, + "ToolResponse": { + "type": "object", + "required": [ + "id", + "toolResult" + ], + "properties": { + "id": { + "type": "string" + }, + "toolResult": { + "type": "object" + } + } + }, + "ToolResultSchema": { + "type": "object", + "required": [ + "success", + "data" + ], + "properties": { + "data": { + "type": "object" + }, + "message": { + "type": "string", + "example": "Operation completed successfully", + "nullable": true + }, + "success": { + "type": "boolean", + "example": true + } + }, + "example": { + "data": {}, + "success": true + } + }, + "UpdateScheduleRequest": { + "type": "object", + "required": [ + "cron" + ], + "properties": { + "cron": { + "type": "string" + } + } + }, + "UpsertConfigQuery": { + "type": "object", + "required": [ + "key", + "value", + "is_secret" + ], + "properties": { + "is_secret": { + "type": "boolean" + }, + "key": { + "type": "string" + }, + "value": {} + } + }, + "UpsertPermissionsQuery": { + "type": "object", + "required": [ + "tool_permissions" + ], + "properties": { + "tool_permissions": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ToolPermission" + } } } } } } -} +} \ No newline at end of file diff --git a/crates/goose/src/scheduler.rs b/crates/goose/src/scheduler.rs index 32a1aea9..6c8bc856 100644 --- a/crates/goose/src/scheduler.rs +++ b/crates/goose/src/scheduler.rs @@ -20,6 +20,10 @@ use crate::recipe::Recipe; use crate::session; use crate::session::storage::SessionMetadata; +// Track running tasks with their abort handles +type RunningTasksMap = HashMap; +type JobsMap = HashMap; + pub fn get_default_scheduler_storage_path() -> Result { let strategy = choose_app_strategy(config::APP_STRATEGY.clone()) .map_err(|e| io::Error::new(io::ErrorKind::NotFound, e.to_string()))?; @@ -111,11 +115,15 @@ pub struct ScheduledJob { pub currently_running: bool, #[serde(default)] pub paused: bool, + #[serde(default)] + pub current_session_id: Option, + #[serde(default)] + pub process_start_time: Option>, } async fn persist_jobs_from_arc( storage_path: &Path, - jobs_arc: &Arc>>, + jobs_arc: &Arc>, ) -> Result<(), SchedulerError> { let jobs_guard = jobs_arc.lock().await; let list: Vec = jobs_guard.values().map(|(_, j)| j.clone()).collect(); @@ -129,8 +137,9 @@ async fn persist_jobs_from_arc( pub struct Scheduler { internal_scheduler: TokioJobScheduler, - jobs: Arc>>, + jobs: Arc>, storage_path: PathBuf, + running_tasks: Arc>, } impl Scheduler { @@ -140,11 +149,13 @@ impl Scheduler { .map_err(|e| SchedulerError::SchedulerInternalError(e.to_string()))?; let jobs = Arc::new(Mutex::new(HashMap::new())); + let running_tasks = Arc::new(Mutex::new(HashMap::new())); let arc_self = Arc::new(Self { internal_scheduler, jobs, storage_path, + running_tasks, }); arc_self.load_jobs_from_storage().await?; @@ -208,17 +219,21 @@ impl Scheduler { let mut stored_job = original_job_spec.clone(); stored_job.source = destination_recipe_path.to_string_lossy().into_owned(); + stored_job.current_session_id = None; + stored_job.process_start_time = None; tracing::info!("Updated job source path to: {}", stored_job.source); let job_for_task = stored_job.clone(); let jobs_arc_for_task = self.jobs.clone(); let storage_path_for_task = self.storage_path.clone(); + let running_tasks_for_task = self.running_tasks.clone(); let cron_task = Job::new_async(&stored_job.cron, move |_uuid, _l| { let task_job_id = job_for_task.id.clone(); let current_jobs_arc = jobs_arc_for_task.clone(); let local_storage_path = storage_path_for_task.clone(); let job_to_execute = job_for_task.clone(); // Clone for run_scheduled_job_internal + let running_tasks_arc = running_tasks_for_task.clone(); Box::pin(async move { // Check if the job is paused before executing @@ -243,6 +258,7 @@ impl Scheduler { if let Some((_, current_job_in_map)) = jobs_map_guard.get_mut(&task_job_id) { current_job_in_map.last_run = Some(current_time); current_job_in_map.currently_running = true; + current_job_in_map.process_start_time = Some(current_time); needs_persist = true; } } @@ -258,14 +274,37 @@ impl Scheduler { ); } } - // Pass None for provider_override in normal execution - let result = run_scheduled_job_internal(job_to_execute, None).await; + + // Spawn the job execution as an abortable task + let job_task = tokio::spawn(run_scheduled_job_internal( + job_to_execute.clone(), + None, + Some(current_jobs_arc.clone()), + Some(task_job_id.clone()), + )); + + // Store the abort handle at the scheduler level + { + let mut running_tasks_guard = running_tasks_arc.lock().await; + running_tasks_guard.insert(task_job_id.clone(), job_task.abort_handle()); + } + + // Wait for the job to complete or be aborted + let result = job_task.await; + + // Remove the abort handle + { + let mut running_tasks_guard = running_tasks_arc.lock().await; + running_tasks_guard.remove(&task_job_id); + } // Update the job status after execution { let mut jobs_map_guard = current_jobs_arc.lock().await; if let Some((_, current_job_in_map)) = jobs_map_guard.get_mut(&task_job_id) { current_job_in_map.currently_running = false; + current_job_in_map.current_session_id = None; + current_job_in_map.process_start_time = None; needs_persist = true; } } @@ -282,12 +321,27 @@ impl Scheduler { } } - if let Err(e) = result { - tracing::error!( - "Scheduled job '{}' execution failed: {}", - &e.job_id, - e.error - ); + match result { + Ok(Ok(_session_id)) => { + tracing::info!("Scheduled job '{}' completed successfully", &task_job_id); + } + Ok(Err(e)) => { + tracing::error!( + "Scheduled job '{}' execution failed: {}", + &e.job_id, + e.error + ); + } + Err(join_error) if join_error.is_cancelled() => { + tracing::info!("Scheduled job '{}' was cancelled/killed", &task_job_id); + } + Err(join_error) => { + tracing::error!( + "Scheduled job '{}' task failed: {}", + &task_job_id, + join_error + ); + } } }) }) @@ -328,12 +382,14 @@ impl Scheduler { let job_for_task = job_to_load.clone(); let jobs_arc_for_task = self.jobs.clone(); let storage_path_for_task = self.storage_path.clone(); + let running_tasks_for_task = self.running_tasks.clone(); let cron_task = Job::new_async(&job_to_load.cron, move |_uuid, _l| { let task_job_id = job_for_task.id.clone(); let current_jobs_arc = jobs_arc_for_task.clone(); let local_storage_path = storage_path_for_task.clone(); let job_to_execute = job_for_task.clone(); // Clone for run_scheduled_job_internal + let running_tasks_arc = running_tasks_for_task.clone(); Box::pin(async move { // Check if the job is paused before executing @@ -358,6 +414,7 @@ impl Scheduler { if let Some((_, stored_job)) = jobs_map_guard.get_mut(&task_job_id) { stored_job.last_run = Some(current_time); stored_job.currently_running = true; + stored_job.process_start_time = Some(current_time); needs_persist = true; } } @@ -373,14 +430,37 @@ impl Scheduler { ); } } - // Pass None for provider_override in normal execution - let result = run_scheduled_job_internal(job_to_execute, None).await; + + // Spawn the job execution as an abortable task + let job_task = tokio::spawn(run_scheduled_job_internal( + job_to_execute, + None, + Some(current_jobs_arc.clone()), + Some(task_job_id.clone()), + )); + + // Store the abort handle at the scheduler level + { + let mut running_tasks_guard = running_tasks_arc.lock().await; + running_tasks_guard.insert(task_job_id.clone(), job_task.abort_handle()); + } + + // Wait for the job to complete or be aborted + let result = job_task.await; + + // Remove the abort handle + { + let mut running_tasks_guard = running_tasks_arc.lock().await; + running_tasks_guard.remove(&task_job_id); + } // Update the job status after execution { let mut jobs_map_guard = current_jobs_arc.lock().await; if let Some((_, stored_job)) = jobs_map_guard.get_mut(&task_job_id) { stored_job.currently_running = false; + stored_job.current_session_id = None; + stored_job.process_start_time = None; needs_persist = true; } } @@ -397,12 +477,30 @@ impl Scheduler { } } - if let Err(e) = result { - tracing::error!( - "Scheduled job '{}' execution failed: {}", - &e.job_id, - e.error - ); + match result { + Ok(Ok(_session_id)) => { + tracing::info!( + "Scheduled job '{}' completed successfully", + &task_job_id + ); + } + Ok(Err(e)) => { + tracing::error!( + "Scheduled job '{}' execution failed: {}", + &e.job_id, + e.error + ); + } + Err(join_error) if join_error.is_cancelled() => { + tracing::info!("Scheduled job '{}' was cancelled/killed", &task_job_id); + } + Err(join_error) => { + tracing::error!( + "Scheduled job '{}' task failed: {}", + &task_job_id, + join_error + ); + } } }) }) @@ -421,7 +519,7 @@ impl Scheduler { // Renamed and kept for direct use when a guard is already held (e.g. add/remove) async fn persist_jobs_to_storage_with_guard( &self, - jobs_guard: &tokio::sync::MutexGuard<'_, HashMap>, + jobs_guard: &tokio::sync::MutexGuard<'_, JobsMap>, ) -> Result<(), SchedulerError> { let list: Vec = jobs_guard.values().map(|(_, j)| j.clone()).collect(); if let Some(parent) = self.storage_path.parent() { @@ -523,14 +621,36 @@ impl Scheduler { } }; - // Pass None for provider_override in normal execution - let run_result = run_scheduled_job_internal(job_to_run.clone(), None).await; + // Spawn the job execution as an abortable task for run_now + let job_task = tokio::spawn(run_scheduled_job_internal( + job_to_run.clone(), + None, + Some(self.jobs.clone()), + Some(sched_id.to_string()), + )); + + // Store the abort handle for run_now jobs + { + let mut running_tasks_guard = self.running_tasks.lock().await; + running_tasks_guard.insert(sched_id.to_string(), job_task.abort_handle()); + } + + // Wait for the job to complete or be aborted + let run_result = job_task.await; + + // Remove the abort handle + { + let mut running_tasks_guard = self.running_tasks.lock().await; + running_tasks_guard.remove(sched_id); + } // Clear the currently_running flag after execution { let mut jobs_guard = self.jobs.lock().await; if let Some((_tokio_job_id, job_in_map)) = jobs_guard.get_mut(sched_id) { job_in_map.currently_running = false; + job_in_map.current_session_id = None; + job_in_map.process_start_time = None; job_in_map.last_run = Some(Utc::now()); } // MutexGuard is dropped here } @@ -539,12 +659,24 @@ impl Scheduler { self.persist_jobs().await?; match run_result { - Ok(session_id) => Ok(session_id), - Err(e) => Err(SchedulerError::AnyhowError(anyhow!( + Ok(Ok(session_id)) => Ok(session_id), + Ok(Err(e)) => Err(SchedulerError::AnyhowError(anyhow!( "Failed to execute job '{}' immediately: {}", sched_id, e.error ))), + Err(join_error) if join_error.is_cancelled() => { + tracing::info!("Run now job '{}' was cancelled/killed", sched_id); + Err(SchedulerError::AnyhowError(anyhow!( + "Job '{}' was successfully cancelled", + sched_id + ))) + } + Err(join_error) => Err(SchedulerError::AnyhowError(anyhow!( + "Failed to execute job '{}' immediately: {}", + sched_id, + join_error + ))), } } @@ -608,12 +740,14 @@ impl Scheduler { let job_for_task = job_def.clone(); let jobs_arc_for_task = self.jobs.clone(); let storage_path_for_task = self.storage_path.clone(); + let running_tasks_for_task = self.running_tasks.clone(); let cron_task = Job::new_async(&new_cron, move |_uuid, _l| { let task_job_id = job_for_task.id.clone(); let current_jobs_arc = jobs_arc_for_task.clone(); let local_storage_path = storage_path_for_task.clone(); let job_to_execute = job_for_task.clone(); + let running_tasks_arc = running_tasks_for_task.clone(); Box::pin(async move { // Check if the job is paused before executing @@ -641,6 +775,7 @@ impl Scheduler { { current_job_in_map.last_run = Some(current_time); current_job_in_map.currently_running = true; + current_job_in_map.process_start_time = Some(current_time); needs_persist = true; } } @@ -657,7 +792,29 @@ impl Scheduler { } } - let result = run_scheduled_job_internal(job_to_execute, None).await; + // Spawn the job execution as an abortable task + let job_task = tokio::spawn(run_scheduled_job_internal( + job_to_execute, + None, + Some(current_jobs_arc.clone()), + Some(task_job_id.clone()), + )); + + // Store the abort handle at the scheduler level + { + let mut running_tasks_guard = running_tasks_arc.lock().await; + running_tasks_guard + .insert(task_job_id.clone(), job_task.abort_handle()); + } + + // Wait for the job to complete or be aborted + let result = job_task.await; + + // Remove the abort handle + { + let mut running_tasks_guard = running_tasks_arc.lock().await; + running_tasks_guard.remove(&task_job_id); + } // Update the job status after execution { @@ -666,6 +823,8 @@ impl Scheduler { jobs_map_guard.get_mut(&task_job_id) { current_job_in_map.currently_running = false; + current_job_in_map.current_session_id = None; + current_job_in_map.process_start_time = None; needs_persist = true; } } @@ -682,12 +841,33 @@ impl Scheduler { } } - if let Err(e) = result { - tracing::error!( - "Scheduled job '{}' execution failed: {}", - &e.job_id, - e.error - ); + match result { + Ok(Ok(_session_id)) => { + tracing::info!( + "Scheduled job '{}' completed successfully", + &task_job_id + ); + } + Ok(Err(e)) => { + tracing::error!( + "Scheduled job '{}' execution failed: {}", + &e.job_id, + e.error + ); + } + Err(join_error) if join_error.is_cancelled() => { + tracing::info!( + "Scheduled job '{}' was cancelled/killed", + &task_job_id + ); + } + Err(join_error) => { + tracing::error!( + "Scheduled job '{}' task failed: {}", + &task_job_id, + join_error + ); + } } }) }) @@ -709,6 +889,70 @@ impl Scheduler { None => Err(SchedulerError::JobNotFound(sched_id.to_string())), } } + + pub async fn kill_running_job(&self, sched_id: &str) -> Result<(), SchedulerError> { + let mut jobs_guard = self.jobs.lock().await; + match jobs_guard.get_mut(sched_id) { + Some((_, job_def)) => { + if !job_def.currently_running { + return Err(SchedulerError::AnyhowError(anyhow!( + "Schedule '{}' is not currently running", + sched_id + ))); + } + + tracing::info!("Killing running job '{}'", sched_id); + + // Abort the running task if it exists + { + let mut running_tasks_guard = self.running_tasks.lock().await; + if let Some(abort_handle) = running_tasks_guard.remove(sched_id) { + abort_handle.abort(); + tracing::info!("Aborted running task for job '{}'", sched_id); + } else { + tracing::warn!( + "No abort handle found for job '{}' in running tasks map", + sched_id + ); + } + } + + // Mark the job as no longer running + job_def.currently_running = false; + job_def.current_session_id = None; + job_def.process_start_time = None; + + self.persist_jobs_to_storage_with_guard(&jobs_guard).await?; + + tracing::info!("Successfully killed job '{}'", sched_id); + Ok(()) + } + None => Err(SchedulerError::JobNotFound(sched_id.to_string())), + } + } + + pub async fn get_running_job_info( + &self, + sched_id: &str, + ) -> Result)>, SchedulerError> { + let jobs_guard = self.jobs.lock().await; + match jobs_guard.get(sched_id) { + Some((_, job_def)) => { + if job_def.currently_running { + if let (Some(session_id), Some(start_time)) = + (&job_def.current_session_id, &job_def.process_start_time) + { + Ok(Some((session_id.clone(), *start_time))) + } else { + Ok(None) + } + } else { + Ok(None) + } + } + None => Err(SchedulerError::JobNotFound(sched_id.to_string())), + } + } } #[derive(Debug)] @@ -720,6 +964,8 @@ struct JobExecutionError { async fn run_scheduled_job_internal( job: ScheduledJob, provider_override: Option>, // New optional parameter + jobs_arc: Option>>, + job_id: Option, ) -> std::result::Result { tracing::info!("Executing job: {} (Source: {})", job.id, job.source); @@ -811,6 +1057,15 @@ async fn run_scheduled_job_internal( tracing::info!("Agent configured with provider for job '{}'", job.id); let session_id_for_return = session::generate_session_id(); + + // Update the job with the session ID if we have access to the jobs arc + if let (Some(jobs_arc), Some(job_id_str)) = (jobs_arc.as_ref(), job_id.as_ref()) { + let mut jobs_guard = jobs_arc.lock().await; + if let Some((_, job_def)) = jobs_guard.get_mut(job_id_str) { + job_def.current_session_id = Some(session_id_for_return.clone()); + } + } + let session_file_path = crate::session::storage::get_path( crate::session::storage::Identifier::Name(session_id_for_return.clone()), ); @@ -843,6 +1098,9 @@ async fn run_scheduled_job_internal( use futures::StreamExt; while let Some(message_result) = stream.next().await { + // Check if the task has been cancelled + tokio::task::yield_now().await; + match message_result { Ok(msg) => { if msg.role == mcp_core::role::Role::Assistant { @@ -1053,6 +1311,8 @@ mod tests { last_run: None, currently_running: false, paused: false, + current_session_id: None, + process_start_time: None, }; // Create the mock provider instance for the test @@ -1061,7 +1321,7 @@ mod tests { // Call run_scheduled_job_internal, passing the mock provider let created_session_id = - run_scheduled_job_internal(dummy_job.clone(), Some(mock_provider_instance)) + run_scheduled_job_internal(dummy_job.clone(), Some(mock_provider_instance), None, None) .await .expect("run_scheduled_job_internal failed"); diff --git a/ui/desktop/openapi.json b/ui/desktop/openapi.json index 1f4c197c..edeba03c 100644 --- a/ui/desktop/openapi.json +++ b/ui/desktop/openapi.json @@ -589,6 +589,66 @@ } } }, + "/schedule/{id}/inspect": { + "get": { + "tags": [ + "schedule" + ], + "operationId": "inspect_running_job", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "ID of the schedule to inspect", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Running job information", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InspectJobResponse" + } + } + } + }, + "404": { + "description": "Scheduled job not found" + }, + "500": { + "description": "Internal server error" + } + } + } + }, + "/schedule/{id}/kill": { + "post": { + "tags": [ + "schedule" + ], + "operationId": "kill_running_job", + "parameters": [ + { + "name": "id", + "in": "path", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Running job killed successfully" + } + } + } + }, "/schedule/{id}/pause": { "post": { "tags": [ @@ -1332,6 +1392,35 @@ } } }, + "InspectJobResponse": { + "type": "object", + "properties": { + "processStartTime": { + "type": "string", + "nullable": true + }, + "runningDurationSeconds": { + "type": "integer", + "format": "int64", + "nullable": true + }, + "sessionId": { + "type": "string", + "nullable": true + } + } + }, + "KillJobResponse": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + }, "ListSchedulesResponse": { "type": "object", "required": [ @@ -1805,6 +1894,10 @@ "cron": { "type": "string" }, + "current_session_id": { + "type": "string", + "nullable": true + }, "currently_running": { "type": "boolean" }, @@ -1819,6 +1912,11 @@ "paused": { "type": "boolean" }, + "process_start_time": { + "type": "string", + "format": "date-time", + "nullable": true + }, "source": { "type": "string" } diff --git a/ui/desktop/src/api/sdk.gen.ts b/ui/desktop/src/api/sdk.gen.ts index 79472945..7ed369e2 100644 --- a/ui/desktop/src/api/sdk.gen.ts +++ b/ui/desktop/src/api/sdk.gen.ts @@ -1,7 +1,7 @@ // This file is auto-generated by @hey-api/openapi-ts import type { Options as ClientOptions, TDataShape, Client } from '@hey-api/client-fetch'; -import type { GetToolsData, GetToolsResponse, ReadAllConfigData, ReadAllConfigResponse, BackupConfigData, BackupConfigResponse, GetExtensionsData, GetExtensionsResponse, AddExtensionData, AddExtensionResponse, RemoveExtensionData, RemoveExtensionResponse, InitConfigData, InitConfigResponse, UpsertPermissionsData, UpsertPermissionsResponse, ProvidersData, ProvidersResponse2, ReadConfigData, RemoveConfigData, RemoveConfigResponse, UpsertConfigData, UpsertConfigResponse, ConfirmPermissionData, ManageContextData, ManageContextResponse, CreateScheduleData, CreateScheduleResponse, DeleteScheduleData, DeleteScheduleResponse, ListSchedulesData, ListSchedulesResponse2, UpdateScheduleData, UpdateScheduleResponse, PauseScheduleData, PauseScheduleResponse, RunNowHandlerData, RunNowHandlerResponse, SessionsHandlerData, SessionsHandlerResponse, UnpauseScheduleData, UnpauseScheduleResponse, ListSessionsData, ListSessionsResponse, GetSessionHistoryData, GetSessionHistoryResponse } from './types.gen'; +import type { GetToolsData, GetToolsResponse, ReadAllConfigData, ReadAllConfigResponse, BackupConfigData, BackupConfigResponse, GetExtensionsData, GetExtensionsResponse, AddExtensionData, AddExtensionResponse, RemoveExtensionData, RemoveExtensionResponse, InitConfigData, InitConfigResponse, UpsertPermissionsData, UpsertPermissionsResponse, ProvidersData, ProvidersResponse2, ReadConfigData, RemoveConfigData, RemoveConfigResponse, UpsertConfigData, UpsertConfigResponse, ConfirmPermissionData, ManageContextData, ManageContextResponse, CreateScheduleData, CreateScheduleResponse, DeleteScheduleData, DeleteScheduleResponse, ListSchedulesData, ListSchedulesResponse2, UpdateScheduleData, UpdateScheduleResponse, InspectRunningJobData, InspectRunningJobResponse, KillRunningJobData, PauseScheduleData, PauseScheduleResponse, RunNowHandlerData, RunNowHandlerResponse, SessionsHandlerData, SessionsHandlerResponse, UnpauseScheduleData, UnpauseScheduleResponse, ListSessionsData, ListSessionsResponse, GetSessionHistoryData, GetSessionHistoryResponse } from './types.gen'; import { client as _heyApiClient } from './client.gen'; export type Options = ClientOptions & { @@ -180,6 +180,20 @@ export const updateSchedule = (options: Op }); }; +export const inspectRunningJob = (options: Options) => { + return (options.client ?? _heyApiClient).get({ + url: '/schedule/{id}/inspect', + ...options + }); +}; + +export const killRunningJob = (options: Options) => { + return (options.client ?? _heyApiClient).post({ + url: '/schedule/{id}/kill', + ...options + }); +}; + export const pauseSchedule = (options: Options) => { return (options.client ?? _heyApiClient).post({ url: '/schedule/{id}/pause', diff --git a/ui/desktop/src/api/types.gen.ts b/ui/desktop/src/api/types.gen.ts index a032086c..424a469b 100644 --- a/ui/desktop/src/api/types.gen.ts +++ b/ui/desktop/src/api/types.gen.ts @@ -172,6 +172,16 @@ export type ImageContent = { mimeType: string; }; +export type InspectJobResponse = { + processStartTime?: string | null; + runningDurationSeconds?: number | null; + sessionId?: string | null; +}; + +export type KillJobResponse = { + message: string; +}; + export type ListSchedulesResponse = { jobs: Array; }; @@ -304,10 +314,12 @@ export type RunNowResponse = { export type ScheduledJob = { cron: string; + current_session_id?: string | null; currently_running?: boolean; id: string; last_run?: string | null; paused?: boolean; + process_start_time?: string | null; source: string; }; @@ -1004,6 +1016,54 @@ export type UpdateScheduleResponses = { export type UpdateScheduleResponse = UpdateScheduleResponses[keyof UpdateScheduleResponses]; +export type InspectRunningJobData = { + body?: never; + path: { + /** + * ID of the schedule to inspect + */ + id: string; + }; + query?: never; + url: '/schedule/{id}/inspect'; +}; + +export type InspectRunningJobErrors = { + /** + * Scheduled job not found + */ + 404: unknown; + /** + * Internal server error + */ + 500: unknown; +}; + +export type InspectRunningJobResponses = { + /** + * Running job information + */ + 200: InspectJobResponse; +}; + +export type InspectRunningJobResponse = InspectRunningJobResponses[keyof InspectRunningJobResponses]; + +export type KillRunningJobData = { + body?: never; + path: { + id: string; + }; + query?: never; + url: '/schedule/{id}/kill'; +}; + +export type KillRunningJobResponses = { + /** + * Running job killed successfully + */ + 200: unknown; +}; + export type PauseScheduleData = { body?: never; path: { diff --git a/ui/desktop/src/components/schedule/CreateScheduleModal.tsx b/ui/desktop/src/components/schedule/CreateScheduleModal.tsx index 8aacc960..1c79d4c5 100644 --- a/ui/desktop/src/components/schedule/CreateScheduleModal.tsx +++ b/ui/desktop/src/components/schedule/CreateScheduleModal.tsx @@ -2,7 +2,7 @@ import React, { useState, useEffect, FormEvent } from 'react'; import { Card } from '../ui/card'; import { Button } from '../ui/button'; import { Input } from '../ui/input'; -import { Select } from '../ui/select'; +import { Select } from '../ui/Select'; import cronstrue from 'cronstrue'; type FrequencyValue = 'once' | 'hourly' | 'daily' | 'weekly' | 'monthly'; diff --git a/ui/desktop/src/components/schedule/ScheduleDetailView.tsx b/ui/desktop/src/components/schedule/ScheduleDetailView.tsx index 3b4bf70c..155fef21 100644 --- a/ui/desktop/src/components/schedule/ScheduleDetailView.tsx +++ b/ui/desktop/src/components/schedule/ScheduleDetailView.tsx @@ -5,11 +5,21 @@ import BackButton from '../ui/BackButton'; import { Card } from '../ui/card'; import MoreMenuLayout from '../more_menu/MoreMenuLayout'; import { fetchSessionDetails, SessionDetails } from '../../sessions'; -import { getScheduleSessions, runScheduleNow, pauseSchedule, unpauseSchedule, updateSchedule, listSchedules, ScheduledJob } from '../../schedule'; +import { + getScheduleSessions, + runScheduleNow, + pauseSchedule, + unpauseSchedule, + updateSchedule, + listSchedules, + killRunningJob, + inspectRunningJob, + ScheduledJob, +} from '../../schedule'; import SessionHistoryView from '../sessions/SessionHistoryView'; import { EditScheduleModal } from './EditScheduleModal'; import { toastError, toastSuccess } from '../../toasts'; -import { Loader2, Pause, Play, Edit } from 'lucide-react'; +import { Loader2, Pause, Play, Edit, Square, Eye } from 'lucide-react'; import cronstrue from 'cronstrue'; interface ScheduleSessionMeta { @@ -40,9 +50,14 @@ const ScheduleDetailView: React.FC = ({ scheduleId, onN const [scheduleDetails, setScheduleDetails] = useState(null); const [isLoadingSchedule, setIsLoadingSchedule] = useState(false); const [scheduleError, setScheduleError] = useState(null); - + // Individual loading states for each action to prevent double-clicks const [pauseUnpauseLoading, setPauseUnpauseLoading] = useState(false); + const [killJobLoading, setKillJobLoading] = useState(false); + const [inspectJobLoading, setInspectJobLoading] = useState(false); + + // Track if we explicitly killed a job to distinguish from natural completion + const [jobWasKilled, setJobWasKilled] = useState(false); const [selectedSessionDetails, setSelectedSessionDetails] = useState(null); const [isLoadingSessionDetails, setIsLoadingSessionDetails] = useState(false); @@ -68,25 +83,34 @@ const ScheduleDetailView: React.FC = ({ scheduleId, onN } }, []); - const fetchScheduleDetails = useCallback(async (sId: string) => { - if (!sId) return; - setIsLoadingSchedule(true); - setScheduleError(null); - try { - const allSchedules = await listSchedules(); - const schedule = allSchedules.find((s) => s.id === sId); - if (schedule) { - setScheduleDetails(schedule); - } else { - setScheduleError('Schedule not found'); + const fetchScheduleDetails = useCallback( + async (sId: string) => { + if (!sId) return; + setIsLoadingSchedule(true); + setScheduleError(null); + try { + const allSchedules = await listSchedules(); + const schedule = allSchedules.find((s) => s.id === sId); + if (schedule) { + // Only reset runNowLoading if we explicitly killed the job + // This prevents interfering with natural job completion + if (!schedule.currently_running && runNowLoading && jobWasKilled) { + setRunNowLoading(false); + setJobWasKilled(false); // Reset the flag + } + setScheduleDetails(schedule); + } else { + setScheduleError('Schedule not found'); + } + } catch (err) { + console.error('Failed to fetch schedule details:', err); + setScheduleError(err instanceof Error ? err.message : 'Failed to fetch schedule details'); + } finally { + setIsLoadingSchedule(false); } - } catch (err) { - console.error('Failed to fetch schedule details:', err); - setScheduleError(err instanceof Error ? err.message : 'Failed to fetch schedule details'); - } finally { - setIsLoadingSchedule(false); - } - }, []); + }, + [runNowLoading, jobWasKilled] + ); const getReadableCron = (cronString: string) => { try { @@ -108,6 +132,7 @@ const ScheduleDetailView: React.FC = ({ scheduleId, onN setSelectedSessionDetails(null); setScheduleDetails(null); setScheduleError(null); + setJobWasKilled(false); // Reset kill flag when changing schedules } }, [scheduleId, fetchScheduleSessions, fetchScheduleDetails, selectedSessionDetails]); @@ -115,11 +140,18 @@ const ScheduleDetailView: React.FC = ({ scheduleId, onN if (!scheduleId) return; setRunNowLoading(true); try { - const newSessionId = await runScheduleNow(scheduleId); // MODIFIED - toastSuccess({ - title: 'Schedule Triggered', - msg: `Successfully triggered schedule. New session ID: ${newSessionId}`, - }); + const newSessionId = await runScheduleNow(scheduleId); + if (newSessionId === 'CANCELLED') { + toastSuccess({ + title: 'Job Cancelled', + msg: 'The job was cancelled while starting up.', + }); + } else { + toastSuccess({ + title: 'Schedule Triggered', + msg: `Successfully triggered schedule. New session ID: ${newSessionId}`, + }); + } setTimeout(() => { if (scheduleId) { fetchScheduleSessions(scheduleId); @@ -183,9 +215,60 @@ const ScheduleDetailView: React.FC = ({ scheduleId, onN setEditApiError(null); }; + const handleKillRunningJob = async () => { + if (!scheduleId) return; + setKillJobLoading(true); + try { + const result = await killRunningJob(scheduleId); + toastSuccess({ + title: 'Job Killed', + msg: result.message, + }); + // Mark that we explicitly killed this job + setJobWasKilled(true); + // Clear the runNowLoading state immediately when job is killed + setRunNowLoading(false); + fetchScheduleDetails(scheduleId); + } catch (err) { + console.error('Failed to kill running job:', err); + const errorMsg = err instanceof Error ? err.message : 'Failed to kill running job'; + toastError({ title: 'Kill Job Error', msg: errorMsg }); + } finally { + setKillJobLoading(false); + } + }; + + const handleInspectRunningJob = async () => { + if (!scheduleId) return; + setInspectJobLoading(true); + try { + const result = await inspectRunningJob(scheduleId); + if (result.sessionId) { + const duration = result.runningDurationSeconds + ? `${Math.floor(result.runningDurationSeconds / 60)}m ${result.runningDurationSeconds % 60}s` + : 'Unknown'; + toastSuccess({ + title: 'Job Inspection', + msg: `Session: ${result.sessionId}\nRunning for: ${duration}`, + }); + } else { + toastSuccess({ + title: 'Job Inspection', + msg: 'No detailed information available for this job', + }); + } + } catch (err) { + console.error('Failed to inspect running job:', err); + const errorMsg = err instanceof Error ? err.message : 'Failed to inspect running job'; + toastError({ title: 'Inspect Job Error', msg: errorMsg }); + } finally { + setInspectJobLoading(false); + } + }; + const handleEditScheduleSubmit = async (cron: string) => { if (!scheduleId) return; - + setIsEditSubmitting(true); setEditApiError(null); try { @@ -226,6 +309,18 @@ const ScheduleDetailView: React.FC = ({ scheduleId, onN }; }, [scheduleId, fetchScheduleDetails]); + // Monitor schedule state changes and reset loading states appropriately + useEffect(() => { + if (scheduleDetails) { + // Only reset runNowLoading if we explicitly killed the job + // This prevents interfering with natural job completion + if (!scheduleDetails.currently_running && runNowLoading && jobWasKilled) { + setRunNowLoading(false); + setJobWasKilled(false); // Reset the flag + } + } + }, [scheduleDetails, runNowLoading, jobWasKilled]); + const loadAndShowSessionDetails = async (sessionId: string) => { setIsLoadingSessionDetails(true); setSessionDetailsError(null); @@ -364,6 +459,18 @@ const ScheduleDetailView: React.FC = ({ scheduleId, onN ? new Date(scheduleDetails.last_run).toLocaleString() : 'Never'}

+ {scheduleDetails.currently_running && scheduleDetails.current_session_id && ( +

+ Current Session:{' '} + {scheduleDetails.current_session_id} +

+ )} + {scheduleDetails.currently_running && scheduleDetails.process_start_time && ( +

+ Process Started:{' '} + {new Date(scheduleDetails.process_start_time).toLocaleString()} +

+ )} )} @@ -379,7 +486,7 @@ const ScheduleDetailView: React.FC = ({ scheduleId, onN > {runNowLoading ? 'Triggering...' : 'Run Schedule Now'} - + {scheduleDetails && !scheduleDetails.currently_running && ( <> )} + + {scheduleDetails && scheduleDetails.currently_running && ( + <> + + + + )} - + {scheduleDetails?.currently_running && (

Cannot trigger or modify a schedule while it's already running.

)} - + {scheduleDetails?.paused && (

- This schedule is paused and will not run automatically. Use "Run Schedule Now" to trigger it manually or unpause to resume automatic execution. + This schedule is paused and will not run automatically. Use "Run Schedule Now" to + trigger it manually or unpause to resume automatic execution.

)} diff --git a/ui/desktop/src/components/schedule/SchedulesView.tsx b/ui/desktop/src/components/schedule/SchedulesView.tsx index 18dd5fe6..6743da33 100644 --- a/ui/desktop/src/components/schedule/SchedulesView.tsx +++ b/ui/desktop/src/components/schedule/SchedulesView.tsx @@ -1,12 +1,22 @@ import React, { useState, useEffect } from 'react'; -import { listSchedules, createSchedule, deleteSchedule, pauseSchedule, unpauseSchedule, updateSchedule, ScheduledJob } from '../../schedule'; +import { + listSchedules, + createSchedule, + deleteSchedule, + pauseSchedule, + unpauseSchedule, + updateSchedule, + killRunningJob, + inspectRunningJob, + ScheduledJob, +} from '../../schedule'; import BackButton from '../ui/BackButton'; import { ScrollArea } from '../ui/scroll-area'; import MoreMenuLayout from '../more_menu/MoreMenuLayout'; import { Card } from '../ui/card'; import { Button } from '../ui/button'; import { TrashIcon } from '../icons/TrashIcon'; -import { Plus, RefreshCw, Pause, Play, Edit } from 'lucide-react'; +import { Plus, RefreshCw, Pause, Play, Edit, Square, Eye } from 'lucide-react'; import { CreateScheduleModal, NewSchedulePayload } from './CreateScheduleModal'; import { EditScheduleModal } from './EditScheduleModal'; import ScheduleDetailView from './ScheduleDetailView'; @@ -27,10 +37,12 @@ const SchedulesView: React.FC = ({ onClose }) => { const [isEditModalOpen, setIsEditModalOpen] = useState(false); const [editingSchedule, setEditingSchedule] = useState(null); const [isRefreshing, setIsRefreshing] = useState(false); - + // Individual loading states for each action to prevent double-clicks const [pausingScheduleIds, setPausingScheduleIds] = useState>(new Set()); const [deletingScheduleIds, setDeletingScheduleIds] = useState>(new Set()); + const [killingScheduleIds, setKillingScheduleIds] = useState>(new Set()); + const [inspectingScheduleIds, setInspectingScheduleIds] = useState>(new Set()); const [viewingScheduleId, setViewingScheduleId] = useState(null); @@ -125,7 +137,7 @@ const SchedulesView: React.FC = ({ onClose }) => { const handleEditScheduleSubmit = async (cron: string) => { if (!editingSchedule) return; - + setIsSubmitting(true); setSubmitApiError(null); try { @@ -153,10 +165,10 @@ const SchedulesView: React.FC = ({ onClose }) => { const handleDeleteSchedule = async (idToDelete: string) => { if (!window.confirm(`Are you sure you want to delete schedule "${idToDelete}"?`)) return; - + // Immediately add to deleting set to disable button - setDeletingScheduleIds(prev => new Set(prev).add(idToDelete)); - + setDeletingScheduleIds((prev) => new Set(prev).add(idToDelete)); + if (viewingScheduleId === idToDelete) { setViewingScheduleId(null); } @@ -171,7 +183,7 @@ const SchedulesView: React.FC = ({ onClose }) => { ); } finally { // Remove from deleting set - setDeletingScheduleIds(prev => { + setDeletingScheduleIds((prev) => { const newSet = new Set(prev); newSet.delete(idToDelete); return newSet; @@ -181,8 +193,8 @@ const SchedulesView: React.FC = ({ onClose }) => { const handlePauseSchedule = async (idToPause: string) => { // Immediately add to pausing set to disable button - setPausingScheduleIds(prev => new Set(prev).add(idToPause)); - + setPausingScheduleIds((prev) => new Set(prev).add(idToPause)); + setApiError(null); try { await pauseSchedule(idToPause); @@ -193,7 +205,8 @@ const SchedulesView: React.FC = ({ onClose }) => { await fetchSchedules(); } catch (error) { console.error(`Failed to pause schedule "${idToPause}":`, error); - const errorMsg = error instanceof Error ? error.message : `Unknown error pausing "${idToPause}".`; + const errorMsg = + error instanceof Error ? error.message : `Unknown error pausing "${idToPause}".`; setApiError(errorMsg); toastError({ title: 'Pause Schedule Error', @@ -201,7 +214,7 @@ const SchedulesView: React.FC = ({ onClose }) => { }); } finally { // Remove from pausing set - setPausingScheduleIds(prev => { + setPausingScheduleIds((prev) => { const newSet = new Set(prev); newSet.delete(idToPause); return newSet; @@ -211,8 +224,8 @@ const SchedulesView: React.FC = ({ onClose }) => { const handleUnpauseSchedule = async (idToUnpause: string) => { // Immediately add to pausing set to disable button - setPausingScheduleIds(prev => new Set(prev).add(idToUnpause)); - + setPausingScheduleIds((prev) => new Set(prev).add(idToUnpause)); + setApiError(null); try { await unpauseSchedule(idToUnpause); @@ -223,7 +236,8 @@ const SchedulesView: React.FC = ({ onClose }) => { await fetchSchedules(); } catch (error) { console.error(`Failed to unpause schedule "${idToUnpause}":`, error); - const errorMsg = error instanceof Error ? error.message : `Unknown error unpausing "${idToUnpause}".`; + const errorMsg = + error instanceof Error ? error.message : `Unknown error unpausing "${idToUnpause}".`; setApiError(errorMsg); toastError({ title: 'Unpause Schedule Error', @@ -231,7 +245,7 @@ const SchedulesView: React.FC = ({ onClose }) => { }); } finally { // Remove from pausing set - setPausingScheduleIds(prev => { + setPausingScheduleIds((prev) => { const newSet = new Set(prev); newSet.delete(idToUnpause); return newSet; @@ -239,6 +253,77 @@ const SchedulesView: React.FC = ({ onClose }) => { } }; + const handleKillRunningJob = async (scheduleId: string) => { + // Immediately add to killing set to disable button + setKillingScheduleIds((prev) => new Set(prev).add(scheduleId)); + + setApiError(null); + try { + const result = await killRunningJob(scheduleId); + toastSuccess({ + title: 'Job Killed', + msg: result.message, + }); + await fetchSchedules(); + } catch (error) { + console.error(`Failed to kill running job "${scheduleId}":`, error); + const errorMsg = + error instanceof Error ? error.message : `Unknown error killing job "${scheduleId}".`; + setApiError(errorMsg); + toastError({ + title: 'Kill Job Error', + msg: errorMsg, + }); + } finally { + // Remove from killing set + setKillingScheduleIds((prev) => { + const newSet = new Set(prev); + newSet.delete(scheduleId); + return newSet; + }); + } + }; + + const handleInspectRunningJob = async (scheduleId: string) => { + // Immediately add to inspecting set to disable button + setInspectingScheduleIds((prev) => new Set(prev).add(scheduleId)); + + setApiError(null); + try { + const result = await inspectRunningJob(scheduleId); + if (result.sessionId) { + const duration = result.runningDurationSeconds + ? `${Math.floor(result.runningDurationSeconds / 60)}m ${result.runningDurationSeconds % 60}s` + : 'Unknown'; + toastSuccess({ + title: 'Job Inspection', + msg: `Session: ${result.sessionId}\nRunning for: ${duration}`, + }); + } else { + toastSuccess({ + title: 'Job Inspection', + msg: 'No detailed information available for this job', + }); + } + } catch (error) { + console.error(`Failed to inspect running job "${scheduleId}":`, error); + const errorMsg = + error instanceof Error ? error.message : `Unknown error inspecting job "${scheduleId}".`; + setApiError(errorMsg); + toastError({ + title: 'Inspect Job Error', + msg: errorMsg, + }); + } finally { + // Remove from inspecting set + setInspectingScheduleIds((prev) => { + const newSet = new Set(prev); + newSet.delete(scheduleId); + return newSet; + }); + } + }; + const handleNavigateToScheduleDetail = (scheduleId: string) => { setViewingScheduleId(scheduleId); }; @@ -372,7 +457,11 @@ const SchedulesView: React.FC = ({ onClose }) => { }} className="text-gray-500 dark:text-gray-400 hover:text-blue-500 dark:hover:text-blue-400 hover:bg-blue-100/50 dark:hover:bg-blue-900/30" title={`Edit schedule ${job.id}`} - disabled={pausingScheduleIds.has(job.id) || deletingScheduleIds.has(job.id) || isSubmitting} + disabled={ + pausingScheduleIds.has(job.id) || + deletingScheduleIds.has(job.id) || + isSubmitting + } > @@ -392,10 +481,54 @@ const SchedulesView: React.FC = ({ onClose }) => { ? 'text-green-500 dark:text-green-400 hover:text-green-600 dark:hover:text-green-300 hover:bg-green-100/50 dark:hover:bg-green-900/30' : 'text-orange-500 dark:text-orange-400 hover:text-orange-600 dark:hover:text-orange-300 hover:bg-orange-100/50 dark:hover:bg-orange-900/30' }`} - title={job.paused ? `Unpause schedule ${job.id}` : `Pause schedule ${job.id}`} - disabled={pausingScheduleIds.has(job.id) || deletingScheduleIds.has(job.id)} + title={ + job.paused + ? `Unpause schedule ${job.id}` + : `Pause schedule ${job.id}` + } + disabled={ + pausingScheduleIds.has(job.id) || deletingScheduleIds.has(job.id) + } > - {job.paused ? : } + {job.paused ? ( + + ) : ( + + )} + + + )} + {job.currently_running && ( + <> + + )} @@ -408,7 +541,12 @@ const SchedulesView: React.FC = ({ onClose }) => { }} className="text-gray-500 dark:text-gray-400 hover:text-red-500 dark:hover:text-red-400 hover:bg-red-100/50 dark:hover:bg-red-900/30" title={`Delete schedule ${job.id}`} - disabled={pausingScheduleIds.has(job.id) || deletingScheduleIds.has(job.id)} + disabled={ + pausingScheduleIds.has(job.id) || + deletingScheduleIds.has(job.id) || + killingScheduleIds.has(job.id) || + inspectingScheduleIds.has(job.id) + } > diff --git a/ui/desktop/src/schedule.ts b/ui/desktop/src/schedule.ts index 4c300166..c60c68a5 100644 --- a/ui/desktop/src/schedule.ts +++ b/ui/desktop/src/schedule.ts @@ -7,6 +7,8 @@ import { updateSchedule as apiUpdateSchedule, sessionsHandler as apiGetScheduleSessions, runNowHandler as apiRunScheduleNow, + killRunningJob as apiKillRunningJob, + inspectRunningJob as apiInspectRunningJob, } from './api'; export interface ScheduledJob { @@ -16,6 +18,8 @@ export interface ScheduledJob { last_run?: string | null; currently_running?: boolean; paused?: boolean; + current_session_id?: string | null; + process_start_time?: string | null; } export interface ScheduleSession { @@ -151,3 +155,47 @@ export async function updateSchedule(scheduleId: string, cron: string): Promise< throw error; } } + +export interface KillJobResponse { + message: string; +} + +export interface InspectJobResponse { + sessionId?: string | null; + processStartTime?: string | null; + runningDurationSeconds?: number | null; +} + +export async function killRunningJob(scheduleId: string): Promise { + try { + const response = await apiKillRunningJob({ + path: { id: scheduleId }, + }); + + if (response && response.data) { + return response.data as KillJobResponse; + } + console.error('Unexpected response format from apiKillRunningJob', response); + throw new Error('Failed to kill running job: Unexpected response format'); + } catch (error) { + console.error(`Error killing running job ${scheduleId}:`, error); + throw error; + } +} + +export async function inspectRunningJob(scheduleId: string): Promise { + try { + const response = await apiInspectRunningJob({ + path: { id: scheduleId }, + }); + + if (response && response.data) { + return response.data as InspectJobResponse; + } + console.error('Unexpected response format from apiInspectRunningJob', response); + throw new Error('Failed to inspect running job: Unexpected response format'); + } catch (error) { + console.error(`Error inspecting running job ${scheduleId}:`, error); + throw error; + } +}