feat: add pause/unpause functionality for scheduled jobs (#2698)

This commit is contained in:
Max Novich
2025-05-28 12:20:03 -07:00
committed by GitHub
parent feb7b15c76
commit ea6a7a7847
10 changed files with 484 additions and 20 deletions

View File

@@ -33,6 +33,7 @@ pub async fn handle_schedule_add(
cron,
last_run: None,
currently_running: false,
paused: false,
};
let scheduler_storage_path =

View File

@@ -42,6 +42,8 @@ use utoipa::OpenApi;
super::routes::schedule::list_schedules,
super::routes::schedule::delete_schedule,
super::routes::schedule::run_now_handler,
super::routes::schedule::pause_schedule,
super::routes::schedule::unpause_schedule,
super::routes::schedule::sessions_handler
),
components(schemas(

View File

@@ -94,6 +94,7 @@ async fn create_schedule(
cron: req.cron,
last_run: None,
currently_running: false,
paused: false,
};
scheduler
.add_scheduled_job(job.clone())
@@ -260,12 +261,86 @@ async fn sessions_handler(
}
}
#[utoipa::path(
post,
path = "/schedule/{id}/pause",
params(
("id" = String, Path, description = "ID of the schedule to pause")
),
responses(
(status = 204, description = "Scheduled job paused successfully"),
(status = 404, description = "Scheduled job not found"),
(status = 400, description = "Cannot pause a currently running job"),
(status = 500, description = "Internal server error")
),
tag = "schedule"
)]
#[axum::debug_handler]
async fn pause_schedule(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(id): Path<String>,
) -> Result<StatusCode, StatusCode> {
verify_secret_key(&headers, &state)?;
let scheduler = state
.scheduler()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
scheduler.pause_schedule(&id).await.map_err(|e| {
eprintln!("Error pausing schedule '{}': {:?}", id, e);
match e {
goose::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
goose::scheduler::SchedulerError::AnyhowError(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
})?;
Ok(StatusCode::NO_CONTENT)
}
#[utoipa::path(
post,
path = "/schedule/{id}/unpause",
params(
("id" = String, Path, description = "ID of the schedule to unpause")
),
responses(
(status = 204, description = "Scheduled job unpaused successfully"),
(status = 404, description = "Scheduled job not found"),
(status = 500, description = "Internal server error")
),
tag = "schedule"
)]
#[axum::debug_handler]
async fn unpause_schedule(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(id): Path<String>,
) -> Result<StatusCode, StatusCode> {
verify_secret_key(&headers, &state)?;
let scheduler = state
.scheduler()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
scheduler.unpause_schedule(&id).await.map_err(|e| {
eprintln!("Error unpausing schedule '{}': {:?}", id, e);
match e {
goose::scheduler::SchedulerError::JobNotFound(_) => StatusCode::NOT_FOUND,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
})?;
Ok(StatusCode::NO_CONTENT)
}
pub fn routes(state: Arc<AppState>) -> Router {
Router::new()
.route("/schedule/create", post(create_schedule))
.route("/schedule/list", get(list_schedules))
.route("/schedule/delete/{id}", delete(delete_schedule)) // Corrected
.route("/schedule/{id}/run_now", post(run_now_handler)) // Corrected
.route("/schedule/{id}/pause", post(pause_schedule))
.route("/schedule/{id}/unpause", post(unpause_schedule))
.route("/schedule/{id}/sessions", get(sessions_handler)) // Corrected
.with_state(state)
}

View File

@@ -109,6 +109,8 @@ pub struct ScheduledJob {
pub last_run: Option<DateTime<Utc>>,
#[serde(default)]
pub currently_running: bool,
#[serde(default)]
pub paused: bool,
}
async fn persist_jobs_from_arc(
@@ -219,6 +221,21 @@ impl Scheduler {
let job_to_execute = job_for_task.clone(); // Clone for run_scheduled_job_internal
Box::pin(async move {
// Check if the job is paused before executing
let should_execute = {
let jobs_map_guard = current_jobs_arc.lock().await;
if let Some((_, current_job_in_map)) = jobs_map_guard.get(&task_job_id) {
!current_job_in_map.paused
} else {
false
}
};
if !should_execute {
tracing::info!("Skipping execution of paused job '{}'", &task_job_id);
return;
}
let current_time = Utc::now();
let mut needs_persist = false;
{
@@ -319,6 +336,21 @@ impl Scheduler {
let job_to_execute = job_for_task.clone(); // Clone for run_scheduled_job_internal
Box::pin(async move {
// Check if the job is paused before executing
let should_execute = {
let jobs_map_guard = current_jobs_arc.lock().await;
if let Some((_, stored_job)) = jobs_map_guard.get(&task_job_id) {
!stored_job.paused
} else {
false
}
};
if !should_execute {
tracing::info!("Skipping execution of paused job '{}'", &task_job_id);
return;
}
let current_time = Utc::now();
let mut needs_persist = false;
{
@@ -515,6 +547,36 @@ impl Scheduler {
))),
}
}
pub async fn pause_schedule(&self, sched_id: &str) -> Result<(), SchedulerError> {
let mut jobs_guard = self.jobs.lock().await;
match jobs_guard.get_mut(sched_id) {
Some((_, job_def)) => {
if job_def.currently_running {
return Err(SchedulerError::AnyhowError(anyhow!(
"Cannot pause schedule '{}' while it's currently running",
sched_id
)));
}
job_def.paused = true;
self.persist_jobs_to_storage_with_guard(&jobs_guard).await?;
Ok(())
}
None => Err(SchedulerError::JobNotFound(sched_id.to_string())),
}
}
pub async fn unpause_schedule(&self, sched_id: &str) -> Result<(), SchedulerError> {
let mut jobs_guard = self.jobs.lock().await;
match jobs_guard.get_mut(sched_id) {
Some((_, job_def)) => {
job_def.paused = false;
self.persist_jobs_to_storage_with_guard(&jobs_guard).await?;
Ok(())
}
None => Err(SchedulerError::JobNotFound(sched_id.to_string())),
}
}
}
#[derive(Debug)]
@@ -858,6 +920,7 @@ mod tests {
cron: "* * * * * * ".to_string(), // Runs every second for quick testing
last_run: None,
currently_running: false,
paused: false,
};
// Create the mock provider instance for the test

View File

@@ -539,6 +539,39 @@
}
}
},
"/schedule/{id}/pause": {
"post": {
"tags": [
"schedule"
],
"operationId": "pause_schedule",
"parameters": [
{
"name": "id",
"in": "path",
"description": "ID of the schedule to pause",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"204": {
"description": "Scheduled job paused successfully"
},
"400": {
"description": "Cannot pause a currently running job"
},
"404": {
"description": "Scheduled job not found"
},
"500": {
"description": "Internal server error"
}
}
}
},
"/schedule/{id}/run_now": {
"post": {
"tags": [
@@ -623,6 +656,36 @@
}
}
},
"/schedule/{id}/unpause": {
"post": {
"tags": [
"schedule"
],
"operationId": "unpause_schedule",
"parameters": [
{
"name": "id",
"in": "path",
"description": "ID of the schedule to unpause",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"204": {
"description": "Scheduled job unpaused successfully"
},
"404": {
"description": "Scheduled job not found"
},
"500": {
"description": "Internal server error"
}
}
}
},
"/sessions": {
"get": {
"tags": [
@@ -1703,6 +1766,9 @@
"format": "date-time",
"nullable": true
},
"paused": {
"type": "boolean"
},
"source": {
"type": "string"
}

View File

@@ -1,7 +1,7 @@
// This file is auto-generated by @hey-api/openapi-ts
import type { Options as ClientOptions, TDataShape, Client } from '@hey-api/client-fetch';
import type { GetToolsData, GetToolsResponse, ReadAllConfigData, ReadAllConfigResponse, BackupConfigData, BackupConfigResponse, GetExtensionsData, GetExtensionsResponse, AddExtensionData, AddExtensionResponse, RemoveExtensionData, RemoveExtensionResponse, InitConfigData, InitConfigResponse, UpsertPermissionsData, UpsertPermissionsResponse, ProvidersData, ProvidersResponse2, ReadConfigData, RemoveConfigData, RemoveConfigResponse, UpsertConfigData, UpsertConfigResponse, ConfirmPermissionData, ManageContextData, ManageContextResponse, CreateScheduleData, CreateScheduleResponse, DeleteScheduleData, DeleteScheduleResponse, ListSchedulesData, ListSchedulesResponse2, RunNowHandlerData, RunNowHandlerResponse, SessionsHandlerData, SessionsHandlerResponse, ListSessionsData, ListSessionsResponse, GetSessionHistoryData, GetSessionHistoryResponse } from './types.gen';
import type { GetToolsData, GetToolsResponse, ReadAllConfigData, ReadAllConfigResponse, BackupConfigData, BackupConfigResponse, GetExtensionsData, GetExtensionsResponse, AddExtensionData, AddExtensionResponse, RemoveExtensionData, RemoveExtensionResponse, InitConfigData, InitConfigResponse, UpsertPermissionsData, UpsertPermissionsResponse, ProvidersData, ProvidersResponse2, ReadConfigData, RemoveConfigData, RemoveConfigResponse, UpsertConfigData, UpsertConfigResponse, ConfirmPermissionData, ManageContextData, ManageContextResponse, CreateScheduleData, CreateScheduleResponse, DeleteScheduleData, DeleteScheduleResponse, ListSchedulesData, ListSchedulesResponse2, PauseScheduleData, PauseScheduleResponse, RunNowHandlerData, RunNowHandlerResponse, SessionsHandlerData, SessionsHandlerResponse, UnpauseScheduleData, UnpauseScheduleResponse, ListSessionsData, ListSessionsResponse, GetSessionHistoryData, GetSessionHistoryResponse } from './types.gen';
import { client as _heyApiClient } from './client.gen';
export type Options<TData extends TDataShape = TDataShape, ThrowOnError extends boolean = boolean> = ClientOptions<TData, ThrowOnError> & {
@@ -169,6 +169,13 @@ export const listSchedules = <ThrowOnError extends boolean = false>(options?: Op
});
};
export const pauseSchedule = <ThrowOnError extends boolean = false>(options: Options<PauseScheduleData, ThrowOnError>) => {
return (options.client ?? _heyApiClient).post<PauseScheduleResponse, unknown, ThrowOnError>({
url: '/schedule/{id}/pause',
...options
});
};
export const runNowHandler = <ThrowOnError extends boolean = false>(options: Options<RunNowHandlerData, ThrowOnError>) => {
return (options.client ?? _heyApiClient).post<RunNowHandlerResponse, unknown, ThrowOnError>({
url: '/schedule/{id}/run_now',
@@ -183,6 +190,13 @@ export const sessionsHandler = <ThrowOnError extends boolean = false>(options: O
});
};
export const unpauseSchedule = <ThrowOnError extends boolean = false>(options: Options<UnpauseScheduleData, ThrowOnError>) => {
return (options.client ?? _heyApiClient).post<UnpauseScheduleResponse, unknown, ThrowOnError>({
url: '/schedule/{id}/unpause',
...options
});
};
export const listSessions = <ThrowOnError extends boolean = false>(options?: Options<ListSessionsData, ThrowOnError>) => {
return (options?.client ?? _heyApiClient).get<ListSessionsResponse, unknown, ThrowOnError>({
url: '/sessions',

View File

@@ -307,6 +307,7 @@ export type ScheduledJob = {
currently_running?: boolean;
id: string;
last_run?: string | null;
paused?: boolean;
source: string;
};
@@ -963,6 +964,42 @@ export type ListSchedulesResponses = {
export type ListSchedulesResponse2 = ListSchedulesResponses[keyof ListSchedulesResponses];
export type PauseScheduleData = {
body?: never;
path: {
/**
* ID of the schedule to pause
*/
id: string;
};
query?: never;
url: '/schedule/{id}/pause';
};
export type PauseScheduleErrors = {
/**
* Cannot pause a currently running job
*/
400: unknown;
/**
* Scheduled job not found
*/
404: unknown;
/**
* Internal server error
*/
500: unknown;
};
export type PauseScheduleResponses = {
/**
* Scheduled job paused successfully
*/
204: void;
};
export type PauseScheduleResponse = PauseScheduleResponses[keyof PauseScheduleResponses];
export type RunNowHandlerData = {
body?: never;
path: {
@@ -1025,6 +1062,38 @@ export type SessionsHandlerResponses = {
export type SessionsHandlerResponse = SessionsHandlerResponses[keyof SessionsHandlerResponses];
export type UnpauseScheduleData = {
body?: never;
path: {
/**
* ID of the schedule to unpause
*/
id: string;
};
query?: never;
url: '/schedule/{id}/unpause';
};
export type UnpauseScheduleErrors = {
/**
* Scheduled job not found
*/
404: unknown;
/**
* Internal server error
*/
500: unknown;
};
export type UnpauseScheduleResponses = {
/**
* Scheduled job unpaused successfully
*/
204: void;
};
export type UnpauseScheduleResponse = UnpauseScheduleResponses[keyof UnpauseScheduleResponses];
export type ListSessionsData = {
body?: never;
path?: never;

View File

@@ -5,10 +5,10 @@ import BackButton from '../ui/BackButton';
import { Card } from '../ui/card';
import MoreMenuLayout from '../more_menu/MoreMenuLayout';
import { fetchSessionDetails, SessionDetails } from '../../sessions';
import { getScheduleSessions, runScheduleNow, listSchedules, ScheduledJob } from '../../schedule';
import { getScheduleSessions, runScheduleNow, pauseSchedule, unpauseSchedule, listSchedules, ScheduledJob } from '../../schedule';
import SessionHistoryView from '../sessions/SessionHistoryView';
import { toastError, toastSuccess } from '../../toasts';
import { Loader2 } from 'lucide-react';
import { Loader2, Pause, Play } from 'lucide-react';
import cronstrue from 'cronstrue';
interface ScheduleSessionMeta {
@@ -128,6 +128,38 @@ const ScheduleDetailView: React.FC<ScheduleDetailViewProps> = ({ scheduleId, onN
}
};
const handlePauseSchedule = async () => {
if (!scheduleId) return;
try {
await pauseSchedule(scheduleId);
toastSuccess({
title: 'Schedule Paused',
msg: `Successfully paused schedule "${scheduleId}"`,
});
fetchScheduleDetails(scheduleId);
} catch (err) {
console.error('Failed to pause schedule:', err);
const errorMsg = err instanceof Error ? err.message : 'Failed to pause schedule';
toastError({ title: 'Pause Schedule Error', msg: errorMsg });
}
};
const handleUnpauseSchedule = async () => {
if (!scheduleId) return;
try {
await unpauseSchedule(scheduleId);
toastSuccess({
title: 'Schedule Unpaused',
msg: `Successfully unpaused schedule "${scheduleId}"`,
});
fetchScheduleDetails(scheduleId);
} catch (err) {
console.error('Failed to unpause schedule:', err);
const errorMsg = err instanceof Error ? err.message : 'Failed to unpause schedule';
toastError({ title: 'Unpause Schedule Error', msg: errorMsg });
}
};
// Add a periodic refresh for schedule details to keep the running status up to date
useEffect(() => {
if (!scheduleId) return;
@@ -255,12 +287,20 @@ const ScheduleDetailView: React.FC<ScheduleDetailViewProps> = ({ scheduleId, onN
<h3 className="text-base font-semibold text-gray-900 dark:text-white">
{scheduleDetails.id}
</h3>
{scheduleDetails.currently_running && (
<div className="mt-2 md:mt-0 text-sm text-green-500 dark:text-green-400 font-semibold flex items-center">
<span className="inline-block w-2 h-2 bg-green-500 dark:bg-green-400 rounded-full mr-1 animate-pulse"></span>
Currently Running
</div>
)}
<div className="mt-2 md:mt-0 flex items-center gap-2">
{scheduleDetails.currently_running && (
<div className="text-sm text-green-500 dark:text-green-400 font-semibold flex items-center">
<span className="inline-block w-2 h-2 bg-green-500 dark:bg-green-400 rounded-full mr-1 animate-pulse"></span>
Currently Running
</div>
)}
{scheduleDetails.paused && (
<div className="text-sm text-orange-500 dark:text-orange-400 font-semibold flex items-center">
<Pause className="w-3 h-3 mr-1" />
Paused
</div>
)}
</div>
</div>
<p className="text-sm text-gray-600 dark:text-gray-300">
<span className="font-semibold">Schedule:</span>{' '}
@@ -285,16 +325,49 @@ const ScheduleDetailView: React.FC<ScheduleDetailViewProps> = ({ scheduleId, onN
<section>
<h2 className="text-xl font-semibold text-gray-900 dark:text-white mb-3">Actions</h2>
<Button
onClick={handleRunNow}
disabled={runNowLoading || scheduleDetails?.currently_running === true}
className="w-full md:w-auto"
>
{runNowLoading ? 'Triggering...' : 'Run Schedule Now'}
</Button>
<div className="flex flex-col md:flex-row gap-2">
<Button
onClick={handleRunNow}
disabled={runNowLoading || scheduleDetails?.currently_running === true}
className="w-full md:w-auto"
>
{runNowLoading ? 'Triggering...' : 'Run Schedule Now'}
</Button>
{scheduleDetails && !scheduleDetails.currently_running && (
<Button
onClick={scheduleDetails.paused ? handleUnpauseSchedule : handlePauseSchedule}
variant="outline"
className={`w-full md:w-auto flex items-center gap-2 ${
scheduleDetails.paused
? 'text-green-600 dark:text-green-400 border-green-300 dark:border-green-600 hover:bg-green-50 dark:hover:bg-green-900/20'
: 'text-orange-600 dark:text-orange-400 border-orange-300 dark:border-orange-600 hover:bg-orange-50 dark:hover:bg-orange-900/20'
}`}
>
{scheduleDetails.paused ? (
<>
<Play className="w-4 h-4" />
Unpause Schedule
</>
) : (
<>
<Pause className="w-4 h-4" />
Pause Schedule
</>
)}
</Button>
)}
</div>
{scheduleDetails?.currently_running && (
<p className="text-sm text-amber-600 dark:text-amber-400 mt-2">
Cannot trigger a schedule while it's already running.
Cannot trigger or modify a schedule while it's already running.
</p>
)}
{scheduleDetails?.paused && (
<p className="text-sm text-orange-600 dark:text-orange-400 mt-2">
This schedule is paused and will not run automatically. Use "Run Schedule Now" to trigger it manually or unpause to resume automatic execution.
</p>
)}
</section>

View File

@@ -1,14 +1,15 @@
import React, { useState, useEffect } from 'react';
import { listSchedules, createSchedule, deleteSchedule, ScheduledJob } from '../../schedule';
import { listSchedules, createSchedule, deleteSchedule, pauseSchedule, unpauseSchedule, ScheduledJob } from '../../schedule';
import BackButton from '../ui/BackButton';
import { ScrollArea } from '../ui/scroll-area';
import MoreMenuLayout from '../more_menu/MoreMenuLayout';
import { Card } from '../ui/card';
import { Button } from '../ui/button';
import { TrashIcon } from '../icons/TrashIcon';
import { Plus, RefreshCw } from 'lucide-react';
import { Plus, RefreshCw, Pause, Play } from 'lucide-react';
import { CreateScheduleModal, NewSchedulePayload } from './CreateScheduleModal';
import ScheduleDetailView from './ScheduleDetailView';
import { toastError, toastSuccess } from '../../toasts';
import cronstrue from 'cronstrue';
interface SchedulesViewProps {
@@ -123,6 +124,52 @@ const SchedulesView: React.FC<SchedulesViewProps> = ({ onClose }) => {
}
};
const handlePauseSchedule = async (idToPause: string) => {
setIsLoading(true);
setApiError(null);
try {
await pauseSchedule(idToPause);
toastSuccess({
title: 'Schedule Paused',
msg: `Successfully paused schedule "${idToPause}"`,
});
await fetchSchedules();
} catch (error) {
console.error(`Failed to pause schedule "${idToPause}":`, error);
const errorMsg = error instanceof Error ? error.message : `Unknown error pausing "${idToPause}".`;
setApiError(errorMsg);
toastError({
title: 'Pause Schedule Error',
msg: errorMsg,
});
} finally {
setIsLoading(false);
}
};
const handleUnpauseSchedule = async (idToUnpause: string) => {
setIsLoading(true);
setApiError(null);
try {
await unpauseSchedule(idToUnpause);
toastSuccess({
title: 'Schedule Unpaused',
msg: `Successfully unpaused schedule "${idToUnpause}"`,
});
await fetchSchedules();
} catch (error) {
console.error(`Failed to unpause schedule "${idToUnpause}":`, error);
const errorMsg = error instanceof Error ? error.message : `Unknown error unpausing "${idToUnpause}".`;
setApiError(errorMsg);
toastError({
title: 'Unpause Schedule Error',
msg: errorMsg,
});
} finally {
setIsLoading(false);
}
};
const handleNavigateToScheduleDetail = (scheduleId: string) => {
setViewingScheduleId(scheduleId);
};
@@ -237,8 +284,37 @@ const SchedulesView: React.FC<SchedulesViewProps> = ({ onClose }) => {
Currently Running
</p>
)}
{job.paused && (
<p className="text-xs text-orange-500 dark:text-orange-400 mt-1 font-semibold flex items-center">
<Pause className="w-3 h-3 mr-1" />
Paused
</p>
)}
</div>
<div className="flex-shrink-0">
<div className="flex-shrink-0 flex items-center gap-1">
{!job.currently_running && (
<Button
variant="ghost"
size="icon"
onClick={(e) => {
e.stopPropagation();
if (job.paused) {
handleUnpauseSchedule(job.id);
} else {
handlePauseSchedule(job.id);
}
}}
className={`${
job.paused
? 'text-green-500 dark:text-green-400 hover:text-green-600 dark:hover:text-green-300 hover:bg-green-100/50 dark:hover:bg-green-900/30'
: 'text-orange-500 dark:text-orange-400 hover:text-orange-600 dark:hover:text-orange-300 hover:bg-orange-100/50 dark:hover:bg-orange-900/30'
}`}
title={job.paused ? `Unpause schedule ${job.id}` : `Pause schedule ${job.id}`}
disabled={isLoading}
>
{job.paused ? <Play className="w-4 h-4" /> : <Pause className="w-4 h-4" />}
</Button>
)}
<Button
variant="ghost"
size="icon"

View File

@@ -2,6 +2,8 @@ import {
listSchedules as apiListSchedules,
createSchedule as apiCreateSchedule,
deleteSchedule as apiDeleteSchedule,
pauseSchedule as apiPauseSchedule,
unpauseSchedule as apiUnpauseSchedule,
sessionsHandler as apiGetScheduleSessions,
runNowHandler as apiRunScheduleNow,
} from './api';
@@ -12,6 +14,7 @@ export interface ScheduledJob {
cron: string;
last_run?: string | null;
currently_running?: boolean;
paused?: boolean;
}
export interface ScheduleSession {
@@ -107,3 +110,25 @@ export async function runScheduleNow(scheduleId: string): Promise<string> {
throw error;
}
}
export async function pauseSchedule(scheduleId: string): Promise<void> {
try {
await apiPauseSchedule<true>({
path: { id: scheduleId },
});
} catch (error) {
console.error(`Error pausing schedule ${scheduleId}:`, error);
throw error;
}
}
export async function unpauseSchedule(scheduleId: string): Promise<void> {
try {
await apiUnpauseSchedule<true>({
path: { id: scheduleId },
});
} catch (error) {
console.error(`Error unpausing schedule ${scheduleId}:`, error);
throw error;
}
}