Revert "Mnovich/temporal scheduler (#2745)" (#2839)

This commit is contained in:
Max Novich
2025-06-09 15:41:46 -07:00
committed by GitHub
parent e6f89cde76
commit 996f727ec5
35 changed files with 172 additions and 4896 deletions

16
Cargo.lock generated
View File

@@ -3611,22 +3611,6 @@ dependencies = [
"xcap",
]
[[package]]
name = "goose-scheduler-executor"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 4.5.31",
"futures",
"goose",
"mcp-core",
"serde_json",
"serde_yaml",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "goose-server"
version = "1.0.26"

View File

@@ -93,10 +93,10 @@ run-ui-only:
# Run UI with alpha changes
run-ui-alpha temporal="true":
run-ui-alpha:
@just release-binary
@echo "Running UI with {{ if temporal == "true" { "Temporal" } else { "Legacy" } }} scheduler..."
cd ui/desktop && npm install && ALPHA=true GOOSE_SCHEDULER_TYPE={{ if temporal == "true" { "temporal" } else { "legacy" } }} npm run start-alpha-gui
@echo "Running UI..."
cd ui/desktop && npm install && ALPHA=true npm run start-alpha-gui
# Run UI with latest (Windows version)
run-ui-windows:

View File

@@ -1,81 +0,0 @@
# TemporalScheduler gRPC Detection Fix - COMPLETED ✅
## Critical Issue Resolved
**Error**: `Port 7233 is already in use by something other than a Temporal server.`
**Root Cause**: The `check_temporal_server()` method was trying to communicate with Temporal server using HTTP protocol on port 7233, but Temporal server actually uses **gRPC protocol** on that port.
## The Problem
```rust
// OLD (BROKEN) - Trying HTTP on gRPC port
async fn check_temporal_server(&self) -> bool {
match self.http_client.get(format!("{}/api/v1/namespaces", TEMPORAL_SERVER_URL)).send().await {
Ok(response) => response.status().is_success(),
Err(_) => false,
}
}
```
This would always return `false` even when a perfectly good Temporal server was running, causing the scheduler to think port 7233 was occupied by "something other than a Temporal server."
## The Solution
```rust
// NEW (WORKING) - Multi-protocol detection
async fn check_temporal_server(&self) -> bool {
// First try the web UI (which uses HTTP)
if let Ok(response) = self.http_client.get("http://localhost:8233/").send().await {
if response.status().is_success() {
return true;
}
}
// Alternative: check if we can establish a TCP connection to the gRPC port
use std::net::SocketAddr;
use std::time::Duration;
let addr: SocketAddr = "127.0.0.1:7233".parse().unwrap();
match std::net::TcpStream::connect_timeout(&addr, Duration::from_secs(2)) {
Ok(_) => {
info!("Detected Temporal server on port 7233 (gRPC connection successful)");
true
}
Err(_) => false,
}
}
```
## How It Works Now
1. **HTTP Check**: First tries to connect to Temporal Web UI on port 8233 (HTTP)
2. **gRPC Check**: If that fails, tries TCP connection to gRPC port 7233
3. **Smart Detection**: If either succeeds, recognizes it as a valid Temporal server
4. **Connection**: Connects to existing server instead of failing with port conflict
## Test Results
```
✅ Temporal server detection test completed
Temporal server detected: true
🎉 SUCCESS: Found existing Temporal server!
The scheduler will connect to it instead of failing
```
## Verification
- ✅ All unit tests pass
- ✅ Code compiles without warnings
- ✅ Clippy checks pass
- ✅ Real-world detection confirmed with existing server
- ✅ Port conflict logic verified
## Impact
- **No more false negatives**: Properly detects existing Temporal servers
- **No more crashes**: Connects to existing infrastructure gracefully
- **Better reliability**: Works with real Temporal deployments
- **Production ready**: Handles gRPC protocol correctly
## Files Modified
- `crates/goose/src/temporal_scheduler.rs` - Fixed detection logic
- Added comprehensive test for gRPC detection
## Commits
- **316bc12189**: Fix: Properly detect existing Temporal server using correct protocol
The TemporalScheduler now correctly handles the protocol differences and will successfully connect to existing Temporal servers instead of failing with misleading port conflict errors! 🎉

View File

@@ -1,125 +0,0 @@
# TemporalScheduler Port Conflict Fix - COMPLETED ✅
## Issue Fixed
The TemporalScheduler was crashing when Temporal services were already running, with errors like:
```
Error: Scheduler internal error: Port 7233 is already in use. Another Temporal server may be running.
```
This caused the goosed server to fail to start, preventing the desktop application from working.
## Root Cause
The original logic would:
1. Check if ports 7233 and 8080 were in use
2. If in use, immediately return an error
3. Never attempt to connect to existing services
This was problematic because:
- Users might have Temporal services already running
- Multiple instances of the application couldn't coexist
- The scheduler couldn't leverage existing infrastructure
## Solution Implemented
### 1. **Enhanced Service Detection Logic**
- **File**: `crates/goose/src/temporal_scheduler.rs`
- **Method**: `ensure_services_running()`
- **Improvement**: Now checks both services comprehensively before deciding what to start
```rust
async fn ensure_services_running(&self) -> Result<(), SchedulerError> {
// First, check if both services are already running
let temporal_running = self.check_temporal_server().await;
let go_service_running = self.health_check().await.unwrap_or(false);
if temporal_running && go_service_running {
info!("Both Temporal server and Go service are already running");
return Ok(());
}
// Handle various combinations of service states...
}
```
### 2. **Smart Port Conflict Resolution**
- **Temporal Server**: If port 7233 is in use, check if it's actually a Temporal server we can connect to
- **Go Service**: If port 8080 is in use, check if it's our Go service we can connect to
- **Only error if ports are used by incompatible services**
```rust
async fn start_temporal_server(&self) -> Result<(), SchedulerError> {
if self.check_port_in_use(7233).await {
// Port is in use - check if it's a Temporal server we can connect to
if self.check_temporal_server().await {
info!("Port 7233 is in use by a Temporal server we can connect to");
return Ok(());
} else {
return Err(SchedulerError::SchedulerInternalError(
"Port 7233 is already in use by something other than a Temporal server.".to_string(),
));
}
}
// ... start new server if needed
}
```
### 3. **Comprehensive Testing**
Added 4 unit tests:
- `test_sessions_method_exists_and_compiles` - Verifies sessions() method works
- `test_sessions_method_signature` - Compile-time signature verification
- `test_port_check_functionality` - Tests port checking logic
- `test_service_status_checking` - Tests service detection methods
### 4. **Improved Error Messages**
- Clear distinction between "port in use by compatible service" vs "port in use by incompatible service"
- Better logging for debugging service startup issues
- Informative messages about what services are detected
## Key Behavioral Changes
### Before (❌ Problematic)
```
1. Check if port 7233 is in use
2. If yes → Error: "Port already in use"
3. Application crashes
```
### After (✅ Fixed)
```
1. Check if port 7233 is in use
2. If yes → Check if it's a Temporal server
3. If it's a Temporal server → Connect to it
4. If it's not a Temporal server → Error with specific message
5. If port is free → Start new Temporal server
```
## Files Modified
- `crates/goose/src/temporal_scheduler.rs` - Main implementation
- Added comprehensive test suite
- Created verification script: `test_port_conflict_fix.sh`
## Verification Results
✅ All unit tests pass
✅ Code compiles without warnings
✅ Clippy checks pass
✅ Service detection logic verified
✅ Port checking functionality confirmed
## Commits Made
1. **cccbba4fd9**: Fix: Improve TemporalScheduler service detection and port conflict handling
2. **c645a4941f**: Fix: Connect to existing Temporal services instead of erroring on port conflicts
## Impact
- **No more crashes** when Temporal services are already running
- **Better resource utilization** by connecting to existing services
- **Improved user experience** - application starts reliably
- **Enhanced debugging** with better error messages and logging
- **Production ready** - handles real-world deployment scenarios
## Testing
Run the verification script to confirm all fixes are working:
```bash
./test_port_conflict_fix.sh
```
The TemporalScheduler now gracefully handles existing services and provides a robust, production-ready scheduling solution.

View File

@@ -11,8 +11,7 @@ use crate::commands::project::{handle_project_default, handle_projects_interacti
use crate::commands::recipe::{handle_deeplink, handle_validate};
// Import the new handlers from commands::schedule
use crate::commands::schedule::{
handle_schedule_add, handle_schedule_cron_help, handle_schedule_list, handle_schedule_remove,
handle_schedule_run_now, handle_schedule_services_status, handle_schedule_services_stop,
handle_schedule_add, handle_schedule_list, handle_schedule_remove, handle_schedule_run_now,
handle_schedule_sessions,
};
use crate::commands::session::{handle_session_list, handle_session_remove};
@@ -124,11 +123,7 @@ enum SchedulerCommand {
Add {
#[arg(long, help = "Unique ID for the job")]
id: String,
#[arg(
long,
help = "Cron expression for the schedule",
long_help = "Cron expression for when to run the job. Examples:\n '0 * * * *' - Every hour at minute 0\n '0 */2 * * *' - Every 2 hours\n '@hourly' - Every hour (shorthand)\n '0 9 * * *' - Every day at 9:00 AM\n '0 9 * * 1' - Every Monday at 9:00 AM\n '0 0 1 * *' - First day of every month at midnight"
)]
#[arg(long, help = "Cron string for the schedule (e.g., '0 0 * * * *')")]
cron: String,
#[arg(
long,
@@ -160,15 +155,6 @@ enum SchedulerCommand {
#[arg(long, help = "ID of the schedule to run")] // Explicitly make it --id
id: String,
},
/// Check status of Temporal services (temporal scheduler only)
#[command(about = "Check status of Temporal services")]
ServicesStatus {},
/// Stop Temporal services (temporal scheduler only)
#[command(about = "Stop Temporal services")]
ServicesStop {},
/// Show cron expression examples and help
#[command(about = "Show cron expression examples and help")]
CronHelp {},
}
#[derive(Subcommand)]
@@ -782,15 +768,6 @@ pub async fn cli() -> Result<()> {
// New arm
handle_schedule_run_now(id).await?;
}
SchedulerCommand::ServicesStatus {} => {
handle_schedule_services_status().await?;
}
SchedulerCommand::ServicesStop {} => {
handle_schedule_services_stop().await?;
}
SchedulerCommand::CronHelp {} => {
handle_schedule_cron_help().await?;
}
}
return Ok(());
}

View File

@@ -1,11 +1,9 @@
use anyhow::{bail, Context, Result};
use base64::engine::{general_purpose::STANDARD as BASE64_STANDARD, Engine};
use goose::scheduler::{
get_default_scheduled_recipes_dir, get_default_scheduler_storage_path, ScheduledJob,
get_default_scheduled_recipes_dir, get_default_scheduler_storage_path, ScheduledJob, Scheduler,
SchedulerError,
};
use goose::scheduler_factory::SchedulerFactory;
use goose::temporal_scheduler::TemporalScheduler;
use std::path::Path;
// Base64 decoding function - might be needed if recipe_source_arg can be base64
@@ -17,64 +15,6 @@ async fn _decode_base64_recipe(source: &str) -> Result<String> {
String::from_utf8(bytes).with_context(|| "Decoded Base64 recipe source is not valid UTF-8.")
}
fn validate_cron_expression(cron: &str) -> Result<()> {
// Basic validation and helpful suggestions
if cron.trim().is_empty() {
bail!("Cron expression cannot be empty");
}
// Check for common mistakes and provide helpful suggestions
let parts: Vec<&str> = cron.split_whitespace().collect();
match parts.len() {
5 => {
// Standard 5-field cron (minute hour day month weekday)
println!("✅ Using standard 5-field cron format: {}", cron);
}
6 => {
// 6-field cron with seconds (second minute hour day month weekday)
println!("✅ Using 6-field cron format with seconds: {}", cron);
}
1 if cron.starts_with('@') => {
// Shorthand expressions like @hourly, @daily, etc.
let valid_shorthands = [
"@yearly",
"@annually",
"@monthly",
"@weekly",
"@daily",
"@midnight",
"@hourly",
];
if valid_shorthands.contains(&cron) {
println!("✅ Using cron shorthand: {}", cron);
} else {
println!(
"⚠️ Unknown cron shorthand '{}'. Valid options: {}",
cron,
valid_shorthands.join(", ")
);
}
}
_ => {
println!("⚠️ Unusual cron format detected: '{}'", cron);
println!(" Common formats:");
println!(" - 5 fields: '0 * * * *' (minute hour day month weekday)");
println!(" - 6 fields: '0 0 * * * *' (second minute hour day month weekday)");
println!(" - Shorthand: '@hourly', '@daily', '@weekly', '@monthly'");
}
}
// Provide examples for common scheduling needs
if cron == "* * * * *" {
println!("⚠️ This will run every minute! Did you mean:");
println!(" - '0 * * * *' for every hour?");
println!(" - '0 0 * * *' for every day?");
}
Ok(())
}
pub async fn handle_schedule_add(
id: String,
cron: String,
@@ -85,9 +25,6 @@ pub async fn handle_schedule_add(
id, cron, recipe_source_arg
);
// Validate cron expression and provide helpful feedback
validate_cron_expression(&cron)?;
// The Scheduler's add_scheduled_job will handle copying the recipe from recipe_source_arg
// to its internal storage and validating the path.
let job = ScheduledJob {
@@ -103,7 +40,7 @@ pub async fn handle_schedule_add(
let scheduler_storage_path =
get_default_scheduler_storage_path().context("Failed to get scheduler storage path")?;
let scheduler = SchedulerFactory::create(scheduler_storage_path)
let scheduler = Scheduler::new(scheduler_storage_path)
.await
.context("Failed to initialize scheduler")?;
@@ -148,28 +85,19 @@ pub async fn handle_schedule_add(
pub async fn handle_schedule_list() -> Result<()> {
let scheduler_storage_path =
get_default_scheduler_storage_path().context("Failed to get scheduler storage path")?;
let scheduler = SchedulerFactory::create(scheduler_storage_path)
let scheduler = Scheduler::new(scheduler_storage_path)
.await
.context("Failed to initialize scheduler")?;
let jobs = scheduler.list_scheduled_jobs().await?;
let jobs = scheduler.list_scheduled_jobs().await;
if jobs.is_empty() {
println!("No scheduled jobs found.");
} else {
println!("Scheduled Jobs:");
for job in jobs {
let status = if job.currently_running {
"🟢 RUNNING"
} else if job.paused {
"⏸️ PAUSED"
} else {
"⏹️ IDLE"
};
println!(
"- ID: {}\n Status: {}\n Cron: {}\n Recipe Source (in store): {}\n Last Run: {}",
"- ID: {}\n Cron: {}\n Recipe Source (in store): {}\n Last Run: {}",
job.id,
status,
job.cron,
job.source, // This source is now the path within scheduled_recipes_dir
job.last_run
@@ -183,7 +111,7 @@ pub async fn handle_schedule_list() -> Result<()> {
pub async fn handle_schedule_remove(id: String) -> Result<()> {
let scheduler_storage_path =
get_default_scheduler_storage_path().context("Failed to get scheduler storage path")?;
let scheduler = SchedulerFactory::create(scheduler_storage_path)
let scheduler = Scheduler::new(scheduler_storage_path)
.await
.context("Failed to initialize scheduler")?;
@@ -205,7 +133,7 @@ pub async fn handle_schedule_remove(id: String) -> Result<()> {
pub async fn handle_schedule_sessions(id: String, limit: Option<u32>) -> Result<()> {
let scheduler_storage_path =
get_default_scheduler_storage_path().context("Failed to get scheduler storage path")?;
let scheduler = SchedulerFactory::create(scheduler_storage_path)
let scheduler = Scheduler::new(scheduler_storage_path)
.await
.context("Failed to initialize scheduler")?;
@@ -238,7 +166,7 @@ pub async fn handle_schedule_sessions(id: String, limit: Option<u32>) -> Result<
pub async fn handle_schedule_run_now(id: String) -> Result<()> {
let scheduler_storage_path =
get_default_scheduler_storage_path().context("Failed to get scheduler storage path")?;
let scheduler = SchedulerFactory::create(scheduler_storage_path)
let scheduler = Scheduler::new(scheduler_storage_path)
.await
.context("Failed to initialize scheduler")?;
@@ -258,131 +186,3 @@ pub async fn handle_schedule_run_now(id: String) -> Result<()> {
}
Ok(())
}
pub async fn handle_schedule_services_status() -> Result<()> {
// Check if we're using temporal scheduler
let scheduler_type =
std::env::var("GOOSE_SCHEDULER_TYPE").unwrap_or_else(|_| "temporal".to_string());
if scheduler_type != "temporal" {
println!("Service management is only available for temporal scheduler.");
println!("Set GOOSE_SCHEDULER_TYPE=temporal to use Temporal services.");
return Ok(());
}
println!("Checking Temporal services status...");
// Create a temporary TemporalScheduler to check status
match TemporalScheduler::new().await {
Ok(scheduler) => {
let info = scheduler.get_service_info().await;
println!("{}", info);
}
Err(e) => {
println!("Failed to check services: {}", e);
println!("\nThis might mean:");
println!("- Temporal CLI is not installed");
println!("- Go service binary is not built");
println!("- Services are not running");
}
}
Ok(())
}
pub async fn handle_schedule_services_stop() -> Result<()> {
// Check if we're using temporal scheduler
let scheduler_type =
std::env::var("GOOSE_SCHEDULER_TYPE").unwrap_or_else(|_| "temporal".to_string());
if scheduler_type != "temporal" {
println!("Service management is only available for temporal scheduler.");
println!("Set GOOSE_SCHEDULER_TYPE=temporal to use Temporal services.");
return Ok(());
}
println!("Stopping Temporal services...");
// Create a temporary TemporalScheduler to stop services
match TemporalScheduler::new().await {
Ok(scheduler) => match scheduler.stop_services().await {
Ok(result) => {
println!("{}", result);
println!("\nNote: Services were running independently and have been stopped.");
println!("They will be automatically restarted when needed.");
}
Err(e) => {
println!("Failed to stop services: {}", e);
}
},
Err(e) => {
println!("Failed to initialize scheduler: {}", e);
println!("Services may not be running or may have already been stopped.");
}
}
Ok(())
}
pub async fn handle_schedule_cron_help() -> Result<()> {
println!("📅 Cron Expression Guide for Goose Scheduler");
println!("===========================================\\n");
println!("🕐 HOURLY SCHEDULES (Most Common Request):");
println!(" 0 * * * * - Every hour at minute 0 (e.g., 1:00, 2:00, 3:00...)");
println!(" 30 * * * * - Every hour at minute 30 (e.g., 1:30, 2:30, 3:30...)");
println!(" 0 */2 * * * - Every 2 hours at minute 0 (e.g., 2:00, 4:00, 6:00...)");
println!(" 0 */3 * * * - Every 3 hours at minute 0 (e.g., 3:00, 6:00, 9:00...)");
println!(" @hourly - Every hour (same as \"0 * * * *\")\\n");
println!("📅 DAILY SCHEDULES:");
println!(" 0 9 * * * - Every day at 9:00 AM");
println!(" 30 14 * * * - Every day at 2:30 PM");
println!(" 0 0 * * * - Every day at midnight");
println!(" @daily - Every day at midnight\\n");
println!("📆 WEEKLY SCHEDULES:");
println!(" 0 9 * * 1 - Every Monday at 9:00 AM");
println!(" 0 17 * * 5 - Every Friday at 5:00 PM");
println!(" 0 0 * * 0 - Every Sunday at midnight");
println!(" @weekly - Every Sunday at midnight\\n");
println!("🗓️ MONTHLY SCHEDULES:");
println!(" 0 9 1 * * - First day of every month at 9:00 AM");
println!(" 0 0 15 * * - 15th of every month at midnight");
println!(" @monthly - First day of every month at midnight\\n");
println!("📝 CRON FORMAT:");
println!(" Standard 5-field: minute hour day month weekday");
println!(" ┌───────────── minute (0 - 59)");
println!(" │ ┌─────────── hour (0 - 23)");
println!(" │ │ ┌───────── day of month (1 - 31)");
println!(" │ │ │ ┌─────── month (1 - 12)");
println!(" │ │ │ │ ┌───── day of week (0 - 7, Sunday = 0 or 7)");
println!(" │ │ │ │ │");
println!(" * * * * *\\n");
println!("🔧 SPECIAL CHARACTERS:");
println!(" * - Any value (every minute, hour, day, etc.)");
println!(" */n - Every nth interval (*/5 = every 5 minutes)");
println!(" n-m - Range (1-5 = 1,2,3,4,5)");
println!(" n,m - List (1,3,5 = 1 or 3 or 5)\\n");
println!("⚡ SHORTHAND EXPRESSIONS:");
println!(" @yearly - Once a year (0 0 1 1 *)");
println!(" @monthly - Once a month (0 0 1 * *)");
println!(" @weekly - Once a week (0 0 * * 0)");
println!(" @daily - Once a day (0 0 * * *)");
println!(" @hourly - Once an hour (0 * * * *)\\n");
println!("💡 EXAMPLES:");
println!(
" goose schedule add --id hourly-report --cron \"0 * * * *\" --recipe-source report.yaml"
);
println!(
" goose schedule add --id daily-backup --cron \"@daily\" --recipe-source backup.yaml"
);
println!(" goose schedule add --id weekly-summary --cron \"0 9 * * 1\" --recipe-source summary.yaml");
Ok(())
}

View File

@@ -1,16 +0,0 @@
[package]
name = "goose-scheduler-executor"
version = "0.1.0"
edition = "2021"
[dependencies]
goose = { path = "../goose" }
mcp-core = { path = "../mcp-core" }
anyhow = "1.0"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4.0", features = ["derive"] }
futures = "0.3"
serde_json = "1.0"
serde_yaml = "0.9"

View File

@@ -1,212 +0,0 @@
use anyhow::{anyhow, Result};
use clap::Parser;
use goose::agents::{Agent, SessionConfig};
use goose::config::Config;
use goose::message::Message;
use goose::providers::create;
use goose::recipe::Recipe;
use goose::session;
use std::env;
use std::fs;
use std::path::Path;
use tracing::info;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Job ID for the scheduled job
job_id: String,
/// Path to the recipe file to execute
recipe_path: String,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize tracing
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let args = Args::parse();
info!("Starting goose-scheduler-executor for job: {}", args.job_id);
info!("Recipe path: {}", args.recipe_path);
// Execute the recipe and get session ID
let session_id = execute_recipe(&args.job_id, &args.recipe_path).await?;
// Output session ID to stdout (this is what the Go service expects)
println!("{}", session_id);
Ok(())
}
async fn execute_recipe(job_id: &str, recipe_path: &str) -> Result<String> {
let recipe_path_buf = Path::new(recipe_path);
// Check if recipe file exists
if !recipe_path_buf.exists() {
return Err(anyhow!("Recipe file not found: {}", recipe_path));
}
// Read and parse recipe
let recipe_content = fs::read_to_string(recipe_path_buf)?;
let recipe: Recipe = {
let extension = recipe_path_buf
.extension()
.and_then(|os_str| os_str.to_str())
.unwrap_or("yaml")
.to_lowercase();
match extension.as_str() {
"json" | "jsonl" => serde_json::from_str::<Recipe>(&recipe_content)
.map_err(|e| anyhow!("Failed to parse JSON recipe '{}': {}", recipe_path, e))?,
"yaml" | "yml" => serde_yaml::from_str::<Recipe>(&recipe_content)
.map_err(|e| anyhow!("Failed to parse YAML recipe '{}': {}", recipe_path, e))?,
_ => {
return Err(anyhow!(
"Unsupported recipe file extension '{}' for: {}",
extension,
recipe_path
));
}
}
};
// Create agent
let agent = Agent::new();
// Get provider configuration
let global_config = Config::global();
let provider_name: String = global_config.get_param("GOOSE_PROVIDER").map_err(|_| {
anyhow!("GOOSE_PROVIDER not configured. Run 'goose configure' or set env var.")
})?;
let model_name: String = global_config.get_param("GOOSE_MODEL").map_err(|_| {
anyhow!("GOOSE_MODEL not configured. Run 'goose configure' or set env var.")
})?;
let model_config = goose::model::ModelConfig::new(model_name);
let provider = create(&provider_name, model_config)
.map_err(|e| anyhow!("Failed to create provider '{}': {}", provider_name, e))?;
// Set provider on agent
agent
.update_provider(provider)
.await
.map_err(|e| anyhow!("Failed to set provider on agent: {}", e))?;
info!(
"Agent configured with provider '{}' for job '{}'",
provider_name, job_id
);
// Generate session ID
let session_id = session::generate_session_id();
// Check if recipe has a prompt
let Some(prompt_text) = recipe.prompt else {
info!(
"Recipe '{}' has no prompt to execute for job '{}'",
recipe_path, job_id
);
// Create empty session for consistency
let session_file_path = goose::session::storage::get_path(
goose::session::storage::Identifier::Name(session_id.clone()),
);
let metadata = goose::session::storage::SessionMetadata {
working_dir: env::current_dir().unwrap_or_default(),
description: "Empty job - no prompt".to_string(),
schedule_id: Some(job_id.to_string()),
message_count: 0,
..Default::default()
};
goose::session::storage::save_messages_with_metadata(&session_file_path, &metadata, &[])
.map_err(|e| anyhow!("Failed to persist metadata for empty job: {}", e))?;
return Ok(session_id);
};
// Create session configuration
let current_dir =
env::current_dir().map_err(|e| anyhow!("Failed to get current directory: {}", e))?;
let session_config = SessionConfig {
id: goose::session::storage::Identifier::Name(session_id.clone()),
working_dir: current_dir.clone(),
schedule_id: Some(job_id.to_string()),
};
// Execute the recipe
let mut messages = vec![Message::user().with_text(prompt_text)];
info!("Executing recipe for job '{}' with prompt", job_id);
let mut stream = agent
.reply(&messages, Some(session_config))
.await
.map_err(|e| anyhow!("Agent failed to reply for recipe '{}': {}", recipe_path, e))?;
// Process the response stream
use futures::StreamExt;
use goose::agents::AgentEvent;
while let Some(message_result) = stream.next().await {
match message_result {
Ok(AgentEvent::Message(msg)) => {
if msg.role == mcp_core::role::Role::Assistant {
info!("[Job {}] Assistant response received", job_id);
}
messages.push(msg);
}
Ok(AgentEvent::McpNotification(_)) => {
// Handle notifications if needed
}
Err(e) => {
return Err(anyhow!("Error receiving message from agent: {}", e));
}
}
}
// Save session
let session_file_path = goose::session::storage::get_path(
goose::session::storage::Identifier::Name(session_id.clone()),
);
// Try to read updated metadata, or create fallback
match goose::session::storage::read_metadata(&session_file_path) {
Ok(mut updated_metadata) => {
updated_metadata.message_count = messages.len();
goose::session::storage::save_messages_with_metadata(
&session_file_path,
&updated_metadata,
&messages,
)
.map_err(|e| anyhow!("Failed to persist final messages: {}", e))?;
}
Err(_) => {
let fallback_metadata = goose::session::storage::SessionMetadata {
working_dir: current_dir,
description: format!("Scheduled job: {}", job_id),
schedule_id: Some(job_id.to_string()),
message_count: messages.len(),
..Default::default()
};
goose::session::storage::save_messages_with_metadata(
&session_file_path,
&fallback_metadata,
&messages,
)
.map_err(|e| anyhow!("Failed to persist messages with fallback metadata: {}", e))?;
}
}
info!(
"Finished executing job '{}', session: {}",
job_id, session_id
);
Ok(session_id)
}

View File

@@ -6,7 +6,7 @@ use anyhow::Result;
use etcetera::{choose_app_strategy, AppStrategy};
use goose::agents::Agent;
use goose::config::APP_STRATEGY;
use goose::scheduler_factory::SchedulerFactory;
use goose::scheduler::Scheduler as GooseScheduler;
use tower_http::cors::{Any, CorsLayer};
use tracing::info;
@@ -28,7 +28,7 @@ pub async fn run() -> Result<()> {
.data_dir()
.join("schedules.json");
let scheduler_instance = SchedulerFactory::create(schedule_file_path).await?;
let scheduler_instance = GooseScheduler::new(schedule_file_path).await?;
app_state.set_scheduler(scheduler_instance).await;
let cors = CorsLayer::new()

View File

@@ -472,7 +472,7 @@ mod tests {
.unwrap()
.data_dir()
.join("schedules.json");
let sched = goose::scheduler_factory::SchedulerFactory::create_legacy(sched_storage_path)
let sched = goose::scheduler::Scheduler::new(sched_storage_path)
.await
.unwrap();
test_state.set_scheduler(sched).await;

View File

@@ -541,10 +541,9 @@ mod tests {
let state = AppState::new(Arc::new(agent), "test-secret".to_string()).await;
let scheduler_path = goose::scheduler::get_default_scheduler_storage_path()
.expect("Failed to get default scheduler storage path");
let scheduler =
goose::scheduler_factory::SchedulerFactory::create_legacy(scheduler_path)
.await
.unwrap();
let scheduler = goose::scheduler::Scheduler::new(scheduler_path)
.await
.unwrap();
state.set_scheduler(scheduler).await;
let app = routes(state);

View File

@@ -108,11 +108,6 @@ async fn create_schedule(
.scheduler()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
tracing::info!(
"Server: Calling scheduler.add_scheduled_job() for job '{}'",
req.id
);
let job = ScheduledJob {
id: req.id,
source: req.recipe_source,
@@ -152,12 +147,7 @@ async fn list_schedules(
.scheduler()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
tracing::info!("Server: Calling scheduler.list_scheduled_jobs()");
let jobs = scheduler.list_scheduled_jobs().await.map_err(|e| {
eprintln!("Error listing schedules: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let jobs = scheduler.list_scheduled_jobs().await;
Ok(Json(ListSchedulesResponse { jobs }))
}
@@ -220,8 +210,6 @@ async fn run_now_handler(
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
tracing::info!("Server: Calling scheduler.run_now() for job '{}'", id);
match scheduler.run_now(&id).await {
Ok(session_id) => Ok(Json(RunNowResponse { session_id })),
Err(e) => {
@@ -420,10 +408,7 @@ async fn update_schedule(
})?;
// Return the updated schedule
let jobs = scheduler.list_scheduled_jobs().await.map_err(|e| {
eprintln!("Error listing schedules after update: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let jobs = scheduler.list_scheduled_jobs().await;
let updated_job = jobs
.into_iter()
.find(|job| job.id == id)

View File

@@ -1,5 +1,5 @@
use goose::agents::Agent;
use goose::scheduler_trait::SchedulerTrait;
use goose::scheduler::Scheduler;
use std::sync::Arc;
use tokio::sync::Mutex;
@@ -9,7 +9,7 @@ pub type AgentRef = Arc<Agent>;
pub struct AppState {
agent: Option<AgentRef>,
pub secret_key: String,
pub scheduler: Arc<Mutex<Option<Arc<dyn SchedulerTrait>>>>,
pub scheduler: Arc<Mutex<Option<Arc<Scheduler>>>>,
}
impl AppState {
@@ -27,12 +27,12 @@ impl AppState {
.ok_or_else(|| anyhow::anyhow!("Agent needs to be created first."))
}
pub async fn set_scheduler(&self, sched: Arc<dyn SchedulerTrait>) {
pub async fn set_scheduler(&self, sched: Arc<Scheduler>) {
let mut guard = self.scheduler.lock().await;
*guard = Some(sched);
}
pub async fn scheduler(&self) -> Result<Arc<dyn SchedulerTrait>, anyhow::Error> {
pub async fn scheduler(&self) -> Result<Arc<Scheduler>, anyhow::Error> {
self.scheduler
.lock()
.await

View File

@@ -8,10 +8,7 @@ pub mod prompt_template;
pub mod providers;
pub mod recipe;
pub mod scheduler;
pub mod scheduler_factory;
pub mod scheduler_trait;
pub mod session;
pub mod temporal_scheduler;
pub mod token_counter;
pub mod tool_monitor;
pub mod tracing;

View File

@@ -5,7 +5,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use etcetera::{choose_app_strategy, AppStrategy};
use serde::{Deserialize, Serialize};
@@ -19,7 +18,6 @@ use crate::message::Message;
use crate::providers::base::Provider as GooseProvider; // Alias to avoid conflict in test section
use crate::providers::create;
use crate::recipe::Recipe;
use crate::scheduler_trait::SchedulerTrait;
use crate::session;
use crate::session::storage::SessionMetadata;
@@ -1373,57 +1371,3 @@ mod tests {
Ok(())
}
}
#[async_trait]
impl SchedulerTrait for Scheduler {
async fn add_scheduled_job(&self, job: ScheduledJob) -> Result<(), SchedulerError> {
self.add_scheduled_job(job).await
}
async fn list_scheduled_jobs(&self) -> Result<Vec<ScheduledJob>, SchedulerError> {
Ok(self.list_scheduled_jobs().await)
}
async fn remove_scheduled_job(&self, id: &str) -> Result<(), SchedulerError> {
self.remove_scheduled_job(id).await
}
async fn pause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
self.pause_schedule(id).await
}
async fn unpause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
self.unpause_schedule(id).await
}
async fn run_now(&self, id: &str) -> Result<String, SchedulerError> {
self.run_now(id).await
}
async fn sessions(
&self,
sched_id: &str,
limit: usize,
) -> Result<Vec<(String, SessionMetadata)>, SchedulerError> {
self.sessions(sched_id, limit).await
}
async fn update_schedule(
&self,
sched_id: &str,
new_cron: String,
) -> Result<(), SchedulerError> {
self.update_schedule(sched_id, new_cron).await
}
async fn kill_running_job(&self, sched_id: &str) -> Result<(), SchedulerError> {
self.kill_running_job(sched_id).await
}
async fn get_running_job_info(
&self,
sched_id: &str,
) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError> {
self.get_running_job_info(sched_id).await
}
}

View File

@@ -1,75 +0,0 @@
use std::path::PathBuf;
use std::sync::Arc;
use crate::config::Config;
use crate::scheduler::{Scheduler, SchedulerError};
use crate::scheduler_trait::SchedulerTrait;
use crate::temporal_scheduler::TemporalScheduler;
pub enum SchedulerType {
Legacy,
Temporal,
}
impl SchedulerType {
/// Determine scheduler type from configuration
pub fn from_config() -> Self {
let config = Config::global();
match config.get_param::<String>("GOOSE_SCHEDULER_TYPE") {
Ok(scheduler_type) => match scheduler_type.to_lowercase().as_str() {
"temporal" => SchedulerType::Temporal,
"legacy" => SchedulerType::Legacy,
_ => {
tracing::warn!(
"Unknown scheduler type '{}', defaulting to legacy",
scheduler_type
);
SchedulerType::Legacy
}
},
Err(_) => {
// Default to temporal scheduler
SchedulerType::Temporal
}
}
}
}
/// Factory for creating scheduler instances
pub struct SchedulerFactory;
impl SchedulerFactory {
/// Create a scheduler instance based on configuration
pub async fn create(storage_path: PathBuf) -> Result<Arc<dyn SchedulerTrait>, SchedulerError> {
let scheduler_type = SchedulerType::from_config();
match scheduler_type {
SchedulerType::Legacy => {
tracing::info!("Creating legacy scheduler");
let scheduler = Scheduler::new(storage_path).await?;
Ok(scheduler as Arc<dyn SchedulerTrait>)
}
SchedulerType::Temporal => {
tracing::info!("Creating Temporal scheduler");
let scheduler = TemporalScheduler::new().await?;
Ok(scheduler as Arc<dyn SchedulerTrait>)
}
}
}
/// Create a specific scheduler type (for testing or explicit use)
pub async fn create_legacy(
storage_path: PathBuf,
) -> Result<Arc<dyn SchedulerTrait>, SchedulerError> {
tracing::info!("Creating legacy scheduler (explicit)");
let scheduler = Scheduler::new(storage_path).await?;
Ok(scheduler as Arc<dyn SchedulerTrait>)
}
/// Create a Temporal scheduler (for testing or explicit use)
pub async fn create_temporal() -> Result<Arc<dyn SchedulerTrait>, SchedulerError> {
tracing::info!("Creating Temporal scheduler (explicit)");
let scheduler = TemporalScheduler::new().await?;
Ok(scheduler as Arc<dyn SchedulerTrait>)
}
}

View File

@@ -1,47 +0,0 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use crate::scheduler::{ScheduledJob, SchedulerError};
use crate::session::storage::SessionMetadata;
/// Common trait for all scheduler implementations
#[async_trait]
pub trait SchedulerTrait: Send + Sync {
/// Add a new scheduled job
async fn add_scheduled_job(&self, job: ScheduledJob) -> Result<(), SchedulerError>;
/// List all scheduled jobs
async fn list_scheduled_jobs(&self) -> Result<Vec<ScheduledJob>, SchedulerError>;
/// Remove a scheduled job by ID
async fn remove_scheduled_job(&self, id: &str) -> Result<(), SchedulerError>;
/// Pause a scheduled job
async fn pause_schedule(&self, id: &str) -> Result<(), SchedulerError>;
/// Unpause a scheduled job
async fn unpause_schedule(&self, id: &str) -> Result<(), SchedulerError>;
/// Run a job immediately
async fn run_now(&self, id: &str) -> Result<String, SchedulerError>;
/// Get sessions for a scheduled job
async fn sessions(
&self,
sched_id: &str,
limit: usize,
) -> Result<Vec<(String, SessionMetadata)>, SchedulerError>;
/// Update a schedule's cron expression
async fn update_schedule(&self, sched_id: &str, new_cron: String)
-> Result<(), SchedulerError>;
/// Kill a running job
async fn kill_running_job(&self, sched_id: &str) -> Result<(), SchedulerError>;
/// Get information about a running job
async fn get_running_job_info(
&self,
sched_id: &str,
) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError>;
}

View File

@@ -1,870 +0,0 @@
use std::process::Command;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use tracing::{info, warn};
use crate::scheduler::{ScheduledJob, SchedulerError};
use crate::scheduler_trait::SchedulerTrait;
use crate::session::storage::SessionMetadata;
const TEMPORAL_SERVICE_URL: &str = "http://localhost:8080";
const TEMPORAL_SERVICE_STARTUP_TIMEOUT: Duration = Duration::from_secs(30);
const TEMPORAL_SERVICE_HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(2);
#[derive(Serialize, Deserialize, Debug)]
struct JobRequest {
action: String,
job_id: Option<String>,
cron: Option<String>,
recipe_path: Option<String>,
}
#[derive(Serialize, Deserialize, Debug)]
struct JobResponse {
success: bool,
message: String,
jobs: Option<Vec<TemporalJobStatus>>,
data: Option<serde_json::Value>,
}
#[derive(Serialize, Deserialize, Debug)]
struct TemporalJobStatus {
id: String,
cron: String,
recipe_path: String,
last_run: Option<String>,
next_run: Option<String>,
currently_running: bool,
paused: bool,
created_at: String,
}
#[derive(Serialize, Deserialize, Debug)]
struct RunNowResponse {
session_id: String,
}
pub struct TemporalScheduler {
http_client: Client,
service_url: String,
}
impl TemporalScheduler {
pub async fn new() -> Result<Arc<Self>, SchedulerError> {
let http_client = Client::new();
let service_url = TEMPORAL_SERVICE_URL.to_string();
let scheduler = Arc::new(Self {
http_client,
service_url,
});
// Check if services are running, start them if needed
scheduler.ensure_services_running().await?;
// Wait for service to be ready
scheduler.wait_for_service_ready().await?;
info!("TemporalScheduler initialized successfully");
Ok(scheduler)
}
async fn ensure_services_running(&self) -> Result<(), SchedulerError> {
info!("Checking if Temporal services are running...");
// First, check if both services are already running
let temporal_running = self.check_temporal_server().await;
let go_service_running = self.health_check().await.unwrap_or(false);
if temporal_running && go_service_running {
info!("Both Temporal server and Go service are already running");
return Ok(());
}
// If Go service is running but Temporal server is not, this is an unusual state
if go_service_running && !temporal_running {
warn!("Go service is running but Temporal server is not - this may indicate a configuration issue");
return Err(SchedulerError::SchedulerInternalError(
"Go service is running but Temporal server is not accessible. Please check your Temporal server configuration.".to_string()
));
}
// If Temporal server is running but Go service is not, start the Go service
if temporal_running && !go_service_running {
info!("Temporal server is running, starting Go service...");
self.start_go_service().await?;
return Ok(());
}
// If neither is running, start both
if !temporal_running {
info!("Starting Temporal server...");
self.start_temporal_server().await?;
// Wait for Temporal server to be ready
self.wait_for_temporal_server().await?;
}
// Now start the Go service
if !self.health_check().await.unwrap_or(false) {
info!("Starting Temporal Go service...");
self.start_go_service().await?;
}
Ok(())
}
async fn check_temporal_server(&self) -> bool {
// Temporal server uses gRPC on port 7233, not HTTP
// We should check the web UI port (8233) instead, or use a different method
// First try the web UI (which uses HTTP)
if let Ok(response) = self.http_client.get("http://localhost:8233/").send().await {
if response.status().is_success() {
return true;
}
}
// Alternative: check if we can establish a TCP connection to the gRPC port
use std::net::SocketAddr;
use std::time::Duration;
let addr: SocketAddr = "127.0.0.1:7233".parse().unwrap();
match std::net::TcpStream::connect_timeout(&addr, Duration::from_secs(2)) {
Ok(_) => {
info!("Detected Temporal server on port 7233 (gRPC connection successful)");
true
}
Err(_) => false,
}
}
async fn start_temporal_server(&self) -> Result<(), SchedulerError> {
info!("Starting Temporal server in background...");
// Check if port 7233 is already in use
if self.check_port_in_use(7233).await {
// Port is in use - check if it's a Temporal server we can connect to
if self.check_temporal_server().await {
info!("Port 7233 is in use by a Temporal server we can connect to");
return Ok(());
} else {
return Err(SchedulerError::SchedulerInternalError(
"Port 7233 is already in use by something other than a Temporal server."
.to_string(),
));
}
}
let output = Command::new("sh")
.arg("-c")
.arg("nohup temporal server start-dev --db-filename temporal.db --port 7233 --ui-port 8233 --log-level warn > temporal-server.log 2>&1 & echo $!")
.output()
.map_err(|e| SchedulerError::SchedulerInternalError(
format!("Failed to start Temporal server: {}. Make sure 'temporal' CLI is installed.", e)
))?;
if !output.status.success() {
return Err(SchedulerError::SchedulerInternalError(format!(
"Failed to start Temporal server: {}",
String::from_utf8_lossy(&output.stderr)
)));
}
let pid_output = String::from_utf8_lossy(&output.stdout);
let pid = pid_output.trim();
info!("Temporal server started with PID: {}", pid);
Ok(())
}
async fn check_port_in_use(&self, port: u16) -> bool {
use std::net::{SocketAddr, TcpListener};
let addr: SocketAddr = format!("127.0.0.1:{}", port).parse().unwrap();
TcpListener::bind(addr).is_err()
}
async fn wait_for_temporal_server(&self) -> Result<(), SchedulerError> {
info!("Waiting for Temporal server to be ready...");
let start_time = std::time::Instant::now();
while start_time.elapsed() < TEMPORAL_SERVICE_STARTUP_TIMEOUT {
if self.check_temporal_server().await {
info!("Temporal server is ready");
return Ok(());
}
sleep(TEMPORAL_SERVICE_HEALTH_CHECK_INTERVAL).await;
}
Err(SchedulerError::SchedulerInternalError(
"Temporal server failed to become ready within timeout".to_string(),
))
}
async fn start_go_service(&self) -> Result<(), SchedulerError> {
info!("Starting Temporal Go service in background...");
// Check if port 8080 is already in use
if self.check_port_in_use(8080).await {
// Port is in use - check if it's our Go service we can connect to
if self.health_check().await.unwrap_or(false) {
info!("Port 8080 is in use by a Go service we can connect to");
return Ok(());
} else {
return Err(SchedulerError::SchedulerInternalError(
"Port 8080 is already in use by something other than our Go service."
.to_string(),
));
}
}
// Check if the temporal-service binary exists - try multiple possible locations
let binary_path = Self::find_go_service_binary()?;
let working_dir = std::path::Path::new(&binary_path).parent().ok_or_else(|| {
SchedulerError::SchedulerInternalError(
"Could not determine working directory for Go service".to_string(),
)
})?;
info!("Found Go service binary at: {}", binary_path);
info!("Using working directory: {}", working_dir.display());
let command = format!(
"cd '{}' && nohup ./temporal-service > temporal-service.log 2>&1 & echo $!",
working_dir.display()
);
let output = Command::new("sh")
.arg("-c")
.arg(&command)
.output()
.map_err(|e| {
SchedulerError::SchedulerInternalError(format!(
"Failed to start Go temporal service: {}",
e
))
})?;
if !output.status.success() {
return Err(SchedulerError::SchedulerInternalError(format!(
"Failed to start Go service: {}",
String::from_utf8_lossy(&output.stderr)
)));
}
let pid_output = String::from_utf8_lossy(&output.stdout);
let pid = pid_output.trim();
info!("Temporal Go service started with PID: {}", pid);
Ok(())
}
fn find_go_service_binary() -> Result<String, SchedulerError> {
// Try to find the Go service binary by looking for it relative to the current executable
// or in common locations
let possible_paths = vec![
// Relative to current working directory (original behavior)
"./temporal-service/temporal-service",
];
// Also try to find it relative to the current executable path
if let Ok(exe_path) = std::env::current_exe() {
if let Some(exe_dir) = exe_path.parent() {
// Try various relative paths from the executable directory
let exe_relative_paths = vec![
exe_dir.join("temporal-service/temporal-service"),
exe_dir.join("../temporal-service/temporal-service"),
exe_dir.join("../../temporal-service/temporal-service"),
exe_dir.join("../../../temporal-service/temporal-service"),
exe_dir.join("../../../../temporal-service/temporal-service"),
];
for path in exe_relative_paths {
if path.exists() {
return Ok(path.to_string_lossy().to_string());
}
}
}
}
// Try the original relative paths
for path in &possible_paths {
if std::path::Path::new(path).exists() {
return Ok(path.to_string());
}
}
Err(SchedulerError::SchedulerInternalError(
"Go service binary not found. Tried paths relative to current executable and working directory. Please ensure the temporal-service binary is built and available.".to_string()
))
}
async fn wait_for_service_ready(&self) -> Result<(), SchedulerError> {
info!("Waiting for Temporal service to be ready...");
let start_time = std::time::Instant::now();
while start_time.elapsed() < TEMPORAL_SERVICE_STARTUP_TIMEOUT {
match self.health_check().await {
Ok(true) => {
info!("Temporal service is ready");
return Ok(());
}
Ok(false) => {
// Service responded but not healthy
sleep(TEMPORAL_SERVICE_HEALTH_CHECK_INTERVAL).await;
}
Err(_) => {
// Service not responding yet
sleep(TEMPORAL_SERVICE_HEALTH_CHECK_INTERVAL).await;
}
}
}
Err(SchedulerError::SchedulerInternalError(
"Temporal service failed to become ready within timeout".to_string(),
))
}
async fn health_check(&self) -> Result<bool, SchedulerError> {
let url = format!("{}/health", self.service_url);
match self.http_client.get(&url).send().await {
Ok(response) => {
if response.status().is_success() {
Ok(true)
} else {
Ok(false)
}
}
Err(_) => Ok(false),
}
}
pub async fn add_scheduled_job(&self, job: ScheduledJob) -> Result<(), SchedulerError> {
tracing::info!(
"TemporalScheduler: add_scheduled_job() called for job '{}'",
job.id
);
let request = JobRequest {
action: "create".to_string(),
job_id: Some(job.id.clone()),
cron: Some(job.cron.clone()),
recipe_path: Some(job.source.clone()),
};
let response = self.make_request(request).await?;
if response.success {
info!("Successfully created scheduled job: {}", job.id);
Ok(())
} else {
Err(SchedulerError::SchedulerInternalError(response.message))
}
}
pub async fn list_scheduled_jobs(&self) -> Result<Vec<ScheduledJob>, SchedulerError> {
tracing::info!("TemporalScheduler: list_scheduled_jobs() called");
let request = JobRequest {
action: "list".to_string(),
job_id: None,
cron: None,
recipe_path: None,
};
let response = self.make_request(request).await?;
if response.success {
let jobs = response.jobs.unwrap_or_default();
let scheduled_jobs = jobs
.into_iter()
.map(|tj| {
ScheduledJob {
id: tj.id,
source: tj.recipe_path,
cron: tj.cron,
last_run: tj.last_run.and_then(|s| s.parse::<DateTime<Utc>>().ok()),
currently_running: tj.currently_running,
paused: tj.paused,
current_session_id: None, // Not provided by Temporal service
process_start_time: None, // Not provided by Temporal service
}
})
.collect();
Ok(scheduled_jobs)
} else {
Err(SchedulerError::SchedulerInternalError(response.message))
}
}
pub async fn remove_scheduled_job(&self, id: &str) -> Result<(), SchedulerError> {
let request = JobRequest {
action: "delete".to_string(),
job_id: Some(id.to_string()),
cron: None,
recipe_path: None,
};
let response = self.make_request(request).await?;
if response.success {
info!("Successfully removed scheduled job: {}", id);
Ok(())
} else {
Err(SchedulerError::SchedulerInternalError(response.message))
}
}
pub async fn pause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
let request = JobRequest {
action: "pause".to_string(),
job_id: Some(id.to_string()),
cron: None,
recipe_path: None,
};
let response = self.make_request(request).await?;
if response.success {
info!("Successfully paused scheduled job: {}", id);
Ok(())
} else {
Err(SchedulerError::SchedulerInternalError(response.message))
}
}
pub async fn unpause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
let request = JobRequest {
action: "unpause".to_string(),
job_id: Some(id.to_string()),
cron: None,
recipe_path: None,
};
let response = self.make_request(request).await?;
if response.success {
info!("Successfully unpaused scheduled job: {}", id);
Ok(())
} else {
Err(SchedulerError::SchedulerInternalError(response.message))
}
}
pub async fn run_now(&self, id: &str) -> Result<String, SchedulerError> {
tracing::info!("TemporalScheduler: run_now() called for job '{}'", id);
let request = JobRequest {
action: "run_now".to_string(),
job_id: Some(id.to_string()),
cron: None,
recipe_path: None,
};
let response = self.make_request(request).await?;
if response.success {
if let Some(data) = response.data {
if let Ok(run_response) = serde_json::from_value::<RunNowResponse>(data) {
info!("Successfully started job execution for: {}", id);
Ok(run_response.session_id)
} else {
Err(SchedulerError::SchedulerInternalError(
"Invalid response format for run_now".to_string(),
))
}
} else {
Err(SchedulerError::SchedulerInternalError(
"No session ID returned from run_now".to_string(),
))
}
} else {
Err(SchedulerError::SchedulerInternalError(response.message))
}
}
// Note: This method fetches sessions from the session storage directly
// since Temporal service doesn't track session metadata
pub async fn sessions(
&self,
sched_id: &str,
limit: usize,
) -> Result<Vec<(String, SessionMetadata)>, SchedulerError> {
use crate::session::storage;
// Get all session files
let all_session_files = storage::list_sessions().map_err(|e| {
SchedulerError::SchedulerInternalError(format!("Failed to list sessions: {}", e))
})?;
let mut schedule_sessions: Vec<(String, SessionMetadata)> = Vec::new();
for (session_name, session_path) in all_session_files {
match storage::read_metadata(&session_path) {
Ok(metadata) => {
// Check if this session belongs to the requested schedule
if metadata.schedule_id.as_deref() == Some(sched_id) {
schedule_sessions.push((session_name, metadata));
}
}
Err(e) => {
tracing::warn!(
"Failed to read metadata for session file {}: {}. Skipping.",
session_path.display(),
e
);
}
}
}
// Sort by session_name (timestamp string) in descending order (newest first)
schedule_sessions.sort_by(|a, b| b.0.cmp(&a.0));
// Take only the requested limit
let result_sessions: Vec<(String, SessionMetadata)> =
schedule_sessions.into_iter().take(limit).collect();
tracing::info!(
"Found {} sessions for schedule '{}'",
result_sessions.len(),
sched_id
);
Ok(result_sessions)
}
pub async fn update_schedule(
&self,
_sched_id: &str,
_new_cron: String,
) -> Result<(), SchedulerError> {
warn!("update_schedule() method not implemented for TemporalScheduler - delete and recreate job instead");
Err(SchedulerError::SchedulerInternalError(
"update_schedule not supported - delete and recreate job instead".to_string(),
))
}
pub async fn kill_running_job(&self, _sched_id: &str) -> Result<(), SchedulerError> {
warn!("kill_running_job() method not implemented for TemporalScheduler");
Err(SchedulerError::SchedulerInternalError(
"kill_running_job not supported by TemporalScheduler".to_string(),
))
}
pub async fn get_running_job_info(
&self,
sched_id: &str,
) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError> {
tracing::info!(
"TemporalScheduler: get_running_job_info() called for job '{}'",
sched_id
);
// First check if the job is marked as currently running
let jobs = self.list_scheduled_jobs().await?;
let job = jobs.iter().find(|j| j.id == sched_id);
if let Some(job) = job {
if job.currently_running {
// For now, we'll return a placeholder session ID and current time
// In a more complete implementation, we would track the actual session ID
// and start time from the Temporal workflow execution
let session_id =
format!("temporal-{}-{}", sched_id, chrono::Utc::now().timestamp());
let start_time = chrono::Utc::now(); // This should be the actual start time
Ok(Some((session_id, start_time)))
} else {
Ok(None)
}
} else {
Err(SchedulerError::JobNotFound(sched_id.to_string()))
}
}
async fn make_request(&self, request: JobRequest) -> Result<JobResponse, SchedulerError> {
let url = format!("{}/jobs", self.service_url);
tracing::info!(
"TemporalScheduler: Making HTTP request to {} with action '{}'",
url,
request.action
);
let response = self
.http_client
.post(&url)
.json(&request)
.send()
.await
.map_err(|e| {
SchedulerError::SchedulerInternalError(format!("HTTP request failed: {}", e))
})?;
if !response.status().is_success() {
return Err(SchedulerError::SchedulerInternalError(format!(
"HTTP request failed with status: {}",
response.status()
)));
}
let job_response: JobResponse = response.json().await.map_err(|e| {
SchedulerError::SchedulerInternalError(format!("Failed to parse response JSON: {}", e))
})?;
Ok(job_response)
}
}
impl Drop for TemporalScheduler {
fn drop(&mut self) {
// Services continue running independently - no cleanup needed
info!("TemporalScheduler dropped - Temporal services continue running independently");
}
}
// Service management utilities
impl TemporalScheduler {
/// Check if Temporal services are running
pub async fn check_services_status(&self) -> (bool, bool) {
let temporal_server_running = self.check_temporal_server().await;
let go_service_running = self.health_check().await.unwrap_or(false);
(temporal_server_running, go_service_running)
}
/// Get service information
pub async fn get_service_info(&self) -> String {
let (temporal_running, go_running) = self.check_services_status().await;
format!(
"Temporal Services Status:\n\
- Temporal Server ({}:7233): {}\n\
- Temporal Web UI: http://localhost:8233\n\
- Go Service ({}:8080): {}\n\
- Service logs: temporal-server.log, temporal-service/temporal-service.log",
if temporal_running {
"localhost"
} else {
"not running"
},
if temporal_running {
"✅ Running"
} else {
"❌ Not Running"
},
if go_running {
"localhost"
} else {
"not running"
},
if go_running {
"✅ Running"
} else {
"❌ Not Running"
}
)
}
/// Stop Temporal services (for manual management)
pub async fn stop_services(&self) -> Result<String, SchedulerError> {
info!("Stopping Temporal services...");
let mut results = Vec::new();
// Stop Go service
let go_result = Command::new("pkill")
.args(["-f", "temporal-service"])
.output();
match go_result {
Ok(output) if output.status.success() => {
results.push("✅ Go service stopped".to_string());
}
Ok(_) => {
results.push("⚠️ Go service was not running or failed to stop".to_string());
}
Err(e) => {
results.push(format!("❌ Failed to stop Go service: {}", e));
}
}
// Stop Temporal server
let temporal_result = Command::new("pkill")
.args(["-f", "temporal server start-dev"])
.output();
match temporal_result {
Ok(output) if output.status.success() => {
results.push("✅ Temporal server stopped".to_string());
}
Ok(_) => {
results.push("⚠️ Temporal server was not running or failed to stop".to_string());
}
Err(e) => {
results.push(format!("❌ Failed to stop Temporal server: {}", e));
}
}
let result_message = results.join("\n");
info!("Service stop results: {}", result_message);
Ok(result_message)
}
}
#[async_trait]
impl SchedulerTrait for TemporalScheduler {
async fn add_scheduled_job(&self, job: ScheduledJob) -> Result<(), SchedulerError> {
self.add_scheduled_job(job).await
}
async fn list_scheduled_jobs(&self) -> Result<Vec<ScheduledJob>, SchedulerError> {
self.list_scheduled_jobs().await
}
async fn remove_scheduled_job(&self, id: &str) -> Result<(), SchedulerError> {
self.remove_scheduled_job(id).await
}
async fn pause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
self.pause_schedule(id).await
}
async fn unpause_schedule(&self, id: &str) -> Result<(), SchedulerError> {
self.unpause_schedule(id).await
}
async fn run_now(&self, id: &str) -> Result<String, SchedulerError> {
self.run_now(id).await
}
async fn sessions(
&self,
sched_id: &str,
limit: usize,
) -> Result<Vec<(String, SessionMetadata)>, SchedulerError> {
self.sessions(sched_id, limit).await
}
async fn update_schedule(
&self,
sched_id: &str,
new_cron: String,
) -> Result<(), SchedulerError> {
self.update_schedule(sched_id, new_cron).await
}
async fn kill_running_job(&self, sched_id: &str) -> Result<(), SchedulerError> {
self.kill_running_job(sched_id).await
}
async fn get_running_job_info(
&self,
sched_id: &str,
) -> Result<Option<(String, DateTime<Utc>)>, SchedulerError> {
self.get_running_job_info(sched_id).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_sessions_method_exists_and_compiles() {
// This test verifies that the sessions method exists and compiles correctly
// It doesn't require Temporal services to be running
// Create a mock scheduler instance (this will fail if services aren't running, but that's OK)
let result = TemporalScheduler::new().await;
// Even if scheduler creation fails, we can still test the method signature
match result {
Ok(scheduler) => {
// If services are running, test the actual method
let sessions_result = scheduler.sessions("test-schedule", 5).await;
// The method should return a Result, regardless of success/failure
match sessions_result {
Ok(sessions) => {
// Verify the return type is correct
assert!(sessions.len() <= 5); // Should respect the limit
println!("✅ sessions() method returned {} sessions", sessions.len());
}
Err(e) => {
// Even errors are OK - the method is implemented
println!(
"⚠️ sessions() method returned error (expected if no sessions): {}",
e
);
}
}
}
Err(_) => {
// Services not running - that's fine, we just verified the method compiles
println!("⚠️ Temporal services not running - method signature test passed");
}
}
}
#[test]
fn test_sessions_method_signature() {
// This test verifies the method signature is correct at compile time
// We just need to verify the method exists and can be called
// This will fail to compile if the method doesn't exist or has wrong signature
let _test_fn = |scheduler: &TemporalScheduler, id: &str, limit: usize| {
// This is a compile-time check - we don't actually call it
let _future = scheduler.sessions(id, limit);
};
println!("✅ sessions() method signature is correct");
}
#[test]
fn test_port_check_functionality() {
// Test the port checking functionality
use tokio::runtime::Runtime;
let rt = Runtime::new().unwrap();
rt.block_on(async {
let scheduler = TemporalScheduler {
http_client: reqwest::Client::new(),
service_url: "http://localhost:8080".to_string(),
};
// Test with a port that should be available (high port number)
let high_port_in_use = scheduler.check_port_in_use(65432).await;
// Test with a port that might be in use (port 80)
let low_port_in_use = scheduler.check_port_in_use(80).await;
println!("✅ Port checking functionality works");
println!(" High port (65432) in use: {}", high_port_in_use);
println!(" Low port (80) in use: {}", low_port_in_use);
});
}
#[test]
fn test_find_go_service_binary() {
// Test the Go service binary finding logic
match TemporalScheduler::find_go_service_binary() {
Ok(path) => {
println!("✅ Found Go service binary at: {}", path);
assert!(
std::path::Path::new(&path).exists(),
"Binary should exist at found path"
);
}
Err(e) => {
println!("⚠️ Go service binary not found: {}", e);
// This is expected if the binary isn't built or available
}
}
}
}

View File

@@ -1,35 +0,0 @@
#!/bin/bash
# Build script for Temporal service
set -e
echo "Building Temporal service..."
# Change to temporal-service directory
cd "$(dirname "$0")"
# Initialize Go module if not already done
if [ ! -f "go.sum" ]; then
echo "Initializing Go module..."
go mod tidy
fi
# Build the service
echo "Compiling Go binary..."
go build -o temporal-service main.go
# Make it executable
chmod +x temporal-service
echo "Build completed successfully!"
echo "Binary location: $(pwd)/temporal-service"
echo ""
echo "Prerequisites:"
echo " 1. Install Temporal CLI: brew install temporal"
echo " 2. Start Temporal server: temporal server start-dev"
echo ""
echo "To run the service:"
echo " ./temporal-service"
echo ""
echo "Environment variables:"
echo " PORT - HTTP port (default: 8080)"

View File

@@ -1,117 +0,0 @@
#!/bin/bash
# Example usage script for the Temporal service
set -e
echo "Temporal Service Example Usage"
echo "=============================="
echo ""
# Check if service is running
if ! curl -s http://localhost:8080/health > /dev/null; then
echo "Starting Temporal service..."
echo "Please run in another terminal: ./temporal-service"
echo "Then run this script again."
exit 1
fi
echo "✓ Temporal service is running"
echo ""
# Create example recipe
RECIPE_FILE="/tmp/example-recipe.yaml"
cat > $RECIPE_FILE << EOF
version: "1.0.0"
title: "Daily Report Generator"
description: "Generates a daily report"
prompt: |
Generate a daily report with the following information:
- Current date and time
- System status
- Recent activity summary
Please format the output as a structured report.
EOF
echo "Created example recipe: $RECIPE_FILE"
echo ""
# Function to make API calls
make_api_call() {
local action="$1"
local job_id="$2"
local cron="$3"
local recipe_path="$4"
local payload="{\"action\": \"$action\""
if [ -n "$job_id" ]; then
payload="$payload, \"job_id\": \"$job_id\""
fi
if [ -n "$cron" ]; then
payload="$payload, \"cron\": \"$cron\""
fi
if [ -n "$recipe_path" ]; then
payload="$payload, \"recipe_path\": \"$recipe_path\""
fi
payload="$payload}"
echo "API Call: $payload"
curl -s -X POST http://localhost:8080/jobs \
-H "Content-Type: application/json" \
-d "$payload" | jq .
echo ""
}
# Example 1: Create a daily job
echo "1. Creating a daily job (runs at 9 AM every day)..."
make_api_call "create" "daily-report" "0 9 * * *" "$RECIPE_FILE"
# Example 2: Create an hourly job
echo "2. Creating an hourly job..."
make_api_call "create" "hourly-check" "0 * * * *" "$RECIPE_FILE"
# Example 3: List all jobs
echo "3. Listing all scheduled jobs..."
make_api_call "list"
# Example 4: Pause a job
echo "4. Pausing the hourly job..."
make_api_call "pause" "hourly-check"
# Example 5: List jobs again to see paused status
echo "5. Listing jobs to see paused status..."
make_api_call "list"
# Example 6: Unpause the job
echo "6. Unpausing the hourly job..."
make_api_call "unpause" "hourly-check"
# Example 7: Run a job immediately
echo "7. Running daily-report job immediately..."
echo "Note: This will fail without goose-scheduler-executor binary"
make_api_call "run_now" "daily-report"
# Example 8: Delete jobs
echo "8. Cleaning up - deleting jobs..."
make_api_call "delete" "daily-report"
make_api_call "delete" "hourly-check"
# Example 9: Final list (should be empty)
echo "9. Final job list (should be empty)..."
make_api_call "list"
# Clean up
rm -f $RECIPE_FILE
echo "Example completed!"
echo ""
echo "Common cron expressions:"
echo " '0 9 * * *' - Daily at 9 AM"
echo " '0 */6 * * *' - Every 6 hours"
echo " '*/15 * * * *' - Every 15 minutes"
echo " '0 0 * * 0' - Weekly on Sunday at midnight"
echo " '0 0 1 * *' - Monthly on the 1st at midnight"

View File

@@ -1,35 +0,0 @@
module temporal-service
go 1.21
require go.temporal.io/sdk v1.24.0
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.1 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
go.temporal.io/api v1.24.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230815205213-6bfd019c3878 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

File diff suppressed because it is too large Load Diff

View File

@@ -1,544 +0,0 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
const (
TaskQueueName = "goose-task-queue"
Namespace = "default"
)
// Global service instance for activities to access
var globalService *TemporalService
// Request/Response types for HTTP API
type JobRequest struct {
Action string `json:"action"` // create, delete, pause, unpause, list, run_now
JobID string `json:"job_id"`
CronExpr string `json:"cron"`
RecipePath string `json:"recipe_path"`
}
type JobResponse struct {
Success bool `json:"success"`
Message string `json:"message"`
Jobs []JobStatus `json:"jobs,omitempty"`
Data interface{} `json:"data,omitempty"`
}
type JobStatus struct {
ID string `json:"id"`
CronExpr string `json:"cron"`
RecipePath string `json:"recipe_path"`
LastRun *string `json:"last_run,omitempty"`
NextRun *string `json:"next_run,omitempty"`
CurrentlyRunning bool `json:"currently_running"`
Paused bool `json:"paused"`
CreatedAt time.Time `json:"created_at"`
}
type RunNowResponse struct {
SessionID string `json:"session_id"`
}
// TemporalService manages the Temporal client and provides HTTP API
type TemporalService struct {
client client.Client
worker worker.Worker
scheduleJobs map[string]*JobStatus // In-memory job tracking
runningJobs map[string]bool // Track which jobs are currently running
}
// NewTemporalService creates a new Temporal service that connects to existing server
func NewTemporalService() (*TemporalService, error) {
// Create client (assumes Temporal server is already running)
c, err := client.Dial(client.Options{
HostPort: "127.0.0.1:7233",
Namespace: Namespace,
})
if err != nil {
return nil, fmt.Errorf("failed to create temporal client: %w", err)
}
// Create worker
w := worker.New(c, TaskQueueName, worker.Options{})
w.RegisterWorkflow(GooseJobWorkflow)
w.RegisterActivity(ExecuteGooseRecipe)
if err := w.Start(); err != nil {
c.Close()
return nil, fmt.Errorf("failed to start worker: %w", err)
}
log.Println("Connected to Temporal server successfully")
service := &TemporalService{
client: c,
worker: w,
scheduleJobs: make(map[string]*JobStatus),
runningJobs: make(map[string]bool),
}
// Set global service for activities
globalService = service
return service, nil
}
// Stop gracefully shuts down the Temporal service
func (ts *TemporalService) Stop() {
log.Println("Shutting down Temporal service...")
if ts.worker != nil {
ts.worker.Stop()
}
if ts.client != nil {
ts.client.Close()
}
log.Println("Temporal service stopped")
}
// Workflow definition for executing Goose recipes
func GooseJobWorkflow(ctx workflow.Context, jobID, recipePath string) (string, error) {
logger := workflow.GetLogger(ctx)
logger.Info("Starting Goose job workflow", "jobID", jobID, "recipePath", recipePath)
ao := workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Hour, // Allow up to 2 hours for job execution
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
NonRetryableErrorTypes: []string{"InvalidRecipeError"},
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
var sessionID string
err := workflow.ExecuteActivity(ctx, ExecuteGooseRecipe, jobID, recipePath).Get(ctx, &sessionID)
if err != nil {
logger.Error("Goose job workflow failed", "jobID", jobID, "error", err)
return "", err
}
logger.Info("Goose job workflow completed", "jobID", jobID, "sessionID", sessionID)
return sessionID, nil
}
// Activity definition for executing Goose recipes
func ExecuteGooseRecipe(ctx context.Context, jobID, recipePath string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Executing Goose recipe", "jobID", jobID, "recipePath", recipePath)
// Mark job as running at the start
if globalService != nil {
globalService.markJobAsRunning(jobID)
// Ensure we mark it as not running when we're done
defer globalService.markJobAsNotRunning(jobID)
}
// Check if recipe file exists
if _, err := os.Stat(recipePath); os.IsNotExist(err) {
return "", temporal.NewNonRetryableApplicationError(
fmt.Sprintf("recipe file not found: %s", recipePath),
"InvalidRecipeError",
err,
)
}
// Execute the Goose recipe via the executor binary
cmd := exec.CommandContext(ctx, "goose-scheduler-executor", jobID, recipePath)
cmd.Env = append(os.Environ(), fmt.Sprintf("GOOSE_JOB_ID=%s", jobID))
output, err := cmd.Output()
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
logger.Error("Recipe execution failed", "jobID", jobID, "stderr", string(exitError.Stderr))
return "", fmt.Errorf("recipe execution failed: %s", string(exitError.Stderr))
}
return "", fmt.Errorf("failed to execute recipe: %w", err)
}
sessionID := strings.TrimSpace(string(output))
logger.Info("Recipe executed successfully", "jobID", jobID, "sessionID", sessionID)
return sessionID, nil
}
// HTTP API handlers
func (ts *TemporalService) handleJobs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if r.Method != http.MethodPost {
ts.writeErrorResponse(w, http.StatusMethodNotAllowed, "Method not allowed")
return
}
var req JobRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
ts.writeErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("Invalid JSON: %v", err))
return
}
var resp JobResponse
switch req.Action {
case "create":
resp = ts.createSchedule(req)
case "delete":
resp = ts.deleteSchedule(req)
case "pause":
resp = ts.pauseSchedule(req)
case "unpause":
resp = ts.unpauseSchedule(req)
case "list":
resp = ts.listSchedules()
case "run_now":
resp = ts.runNow(req)
default:
resp = JobResponse{Success: false, Message: fmt.Sprintf("Unknown action: %s", req.Action)}
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
}
func (ts *TemporalService) createSchedule(req JobRequest) JobResponse {
if req.JobID == "" || req.CronExpr == "" || req.RecipePath == "" {
return JobResponse{Success: false, Message: "Missing required fields: job_id, cron, recipe_path"}
}
// Check if job already exists
if _, exists := ts.scheduleJobs[req.JobID]; exists {
return JobResponse{Success: false, Message: fmt.Sprintf("Job with ID '%s' already exists", req.JobID)}
}
// Validate recipe file exists
if _, err := os.Stat(req.RecipePath); os.IsNotExist(err) {
return JobResponse{Success: false, Message: fmt.Sprintf("Recipe file not found: %s", req.RecipePath)}
}
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
// Create Temporal schedule
schedule := client.ScheduleOptions{
ID: scheduleID,
Spec: client.ScheduleSpec{
CronExpressions: []string{req.CronExpr},
},
Action: &client.ScheduleWorkflowAction{
ID: fmt.Sprintf("workflow-%s-{{.ScheduledTime.Unix}}", req.JobID),
Workflow: GooseJobWorkflow,
Args: []interface{}{req.JobID, req.RecipePath},
TaskQueue: TaskQueueName,
},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := ts.client.ScheduleClient().Create(ctx, schedule)
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to create schedule: %v", err)}
}
// Track job in memory
jobStatus := &JobStatus{
ID: req.JobID,
CronExpr: req.CronExpr,
RecipePath: req.RecipePath,
CurrentlyRunning: false,
Paused: false,
CreatedAt: time.Now(),
}
ts.scheduleJobs[req.JobID] = jobStatus
log.Printf("Created schedule for job: %s", req.JobID)
return JobResponse{Success: true, Message: "Schedule created successfully"}
}
func (ts *TemporalService) deleteSchedule(req JobRequest) JobResponse {
if req.JobID == "" {
return JobResponse{Success: false, Message: "Missing job_id"}
}
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
err := handle.Delete(ctx)
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to delete schedule: %v", err)}
}
// Remove from memory
delete(ts.scheduleJobs, req.JobID)
log.Printf("Deleted schedule for job: %s", req.JobID)
return JobResponse{Success: true, Message: "Schedule deleted successfully"}
}
func (ts *TemporalService) pauseSchedule(req JobRequest) JobResponse {
if req.JobID == "" {
return JobResponse{Success: false, Message: "Missing job_id"}
}
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
err := handle.Pause(ctx, client.SchedulePauseOptions{
Note: "Paused via API",
})
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to pause schedule: %v", err)}
}
// Update in memory
if job, exists := ts.scheduleJobs[req.JobID]; exists {
job.Paused = true
}
log.Printf("Paused schedule for job: %s", req.JobID)
return JobResponse{Success: true, Message: "Schedule paused successfully"}
}
func (ts *TemporalService) unpauseSchedule(req JobRequest) JobResponse {
if req.JobID == "" {
return JobResponse{Success: false, Message: "Missing job_id"}
}
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
err := handle.Unpause(ctx, client.ScheduleUnpauseOptions{
Note: "Unpaused via API",
})
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to unpause schedule: %v", err)}
}
// Update in memory
if job, exists := ts.scheduleJobs[req.JobID]; exists {
job.Paused = false
}
log.Printf("Unpaused schedule for job: %s", req.JobID)
return JobResponse{Success: true, Message: "Schedule unpaused successfully"}
}
func (ts *TemporalService) listSchedules() JobResponse {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// List all schedules from Temporal
iter, err := ts.client.ScheduleClient().List(ctx, client.ScheduleListOptions{})
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to list schedules: %v", err)}
}
var jobs []JobStatus
for iter.HasNext() {
schedule, err := iter.Next()
if err != nil {
log.Printf("Error listing schedules: %v", err)
continue
}
// Extract job ID from schedule ID
if strings.HasPrefix(schedule.ID, "goose-job-") {
jobID := strings.TrimPrefix(schedule.ID, "goose-job-")
// Get additional details from in-memory tracking
var jobStatus JobStatus
if tracked, exists := ts.scheduleJobs[jobID]; exists {
jobStatus = *tracked
} else {
// Fallback for schedules not in memory
jobStatus = JobStatus{
ID: jobID,
CreatedAt: time.Now(), // We don't have the real creation time
}
}
// Update with Temporal schedule info
if len(schedule.Spec.CronExpressions) > 0 {
jobStatus.CronExpr = schedule.Spec.CronExpressions[0]
}
// Get detailed schedule information including paused state and running status
scheduleHandle := ts.client.ScheduleClient().GetHandle(ctx, schedule.ID)
if desc, err := scheduleHandle.Describe(ctx); err == nil {
jobStatus.Paused = desc.Schedule.State.Paused
// Check if there are any running workflows for this job
jobStatus.CurrentlyRunning = ts.isJobCurrentlyRunning(ctx, jobID)
// Update last run time if available
if len(desc.Info.RecentActions) > 0 {
lastAction := desc.Info.RecentActions[len(desc.Info.RecentActions)-1]
if !lastAction.ActualTime.IsZero() {
lastRunStr := lastAction.ActualTime.Format(time.RFC3339)
jobStatus.LastRun = &lastRunStr
}
}
// Update next run time if available - this field may not exist in older SDK versions
// We'll skip this for now to avoid compilation errors
} else {
log.Printf("Warning: Could not get detailed info for schedule %s: %v", schedule.ID, err)
}
// Update in-memory tracking with latest info
ts.scheduleJobs[jobID] = &jobStatus
jobs = append(jobs, jobStatus)
}
}
return JobResponse{Success: true, Jobs: jobs}
}
// isJobCurrentlyRunning checks if there are any running workflows for the given job ID
func (ts *TemporalService) isJobCurrentlyRunning(ctx context.Context, jobID string) bool {
// Check our in-memory tracking of running jobs
if running, exists := ts.runningJobs[jobID]; exists && running {
return true
}
return false
}
// markJobAsRunning sets a job as currently running
func (ts *TemporalService) markJobAsRunning(jobID string) {
ts.runningJobs[jobID] = true
log.Printf("Marked job %s as running", jobID)
}
// markJobAsNotRunning sets a job as not currently running
func (ts *TemporalService) markJobAsNotRunning(jobID string) {
delete(ts.runningJobs, jobID)
log.Printf("Marked job %s as not running", jobID)
}
func (ts *TemporalService) runNow(req JobRequest) JobResponse {
if req.JobID == "" {
return JobResponse{Success: false, Message: "Missing job_id"}
}
// Get job details
job, exists := ts.scheduleJobs[req.JobID]
if !exists {
return JobResponse{Success: false, Message: fmt.Sprintf("Job '%s' not found", req.JobID)}
}
// Execute workflow immediately
workflowOptions := client.StartWorkflowOptions{
ID: fmt.Sprintf("manual-%s-%d", req.JobID, time.Now().Unix()),
TaskQueue: TaskQueueName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
we, err := ts.client.ExecuteWorkflow(ctx, workflowOptions, GooseJobWorkflow, req.JobID, job.RecipePath)
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to start workflow: %v", err)}
}
// Don't wait for completion in run_now, just return the workflow ID
log.Printf("Manual execution started for job: %s, workflow: %s", req.JobID, we.GetID())
return JobResponse{
Success: true,
Message: "Job execution started",
Data: RunNowResponse{SessionID: we.GetID()}, // Return workflow ID as session ID for now
}
}
func (ts *TemporalService) writeErrorResponse(w http.ResponseWriter, statusCode int, message string) {
w.WriteHeader(statusCode)
json.NewEncoder(w).Encode(JobResponse{Success: false, Message: message})
}
func (ts *TemporalService) handleHealth(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{"status": "healthy"})
}
func main() {
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
log.Println("Starting Temporal service...")
log.Println("Note: This service requires a running Temporal server at 127.0.0.1:7233")
log.Println("Start Temporal server with: temporal server start-dev")
// Create Temporal service
service, err := NewTemporalService()
if err != nil {
log.Fatalf("Failed to create Temporal service: %v", err)
}
// Set up HTTP server
mux := http.NewServeMux()
mux.HandleFunc("/jobs", service.handleJobs)
mux.HandleFunc("/health", service.handleHealth)
server := &http.Server{
Addr: ":" + port,
Handler: mux,
}
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("Received shutdown signal")
// Shutdown HTTP server
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
server.Shutdown(ctx)
// Stop Temporal service
service.Stop()
os.Exit(0)
}()
log.Printf("Temporal service starting on port %s", port)
log.Printf("Health endpoint: http://localhost:%s/health", port)
log.Printf("Jobs endpoint: http://localhost:%s/jobs", port)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("HTTP server failed: %v", err)
}
}

View File

@@ -1,92 +0,0 @@
#!/bin/bash
# Startup script for Temporal service with integrated Temporal server
set -e
echo "Starting Temporal development environment..."
# Check if temporal CLI is available
if ! command -v temporal &> /dev/null; then
echo "Error: Temporal CLI not found!"
echo "Please install it first:"
echo " brew install temporal"
echo " # or download from https://github.com/temporalio/cli/releases"
exit 1
fi
# Check if temporal-service binary exists
if [ ! -f "./temporal-service" ]; then
echo "Error: temporal-service binary not found!"
echo "Please build it first: ./build.sh"
exit 1
fi
# Set data directory
DATA_DIR="${GOOSE_DATA_DIR:-./data}"
mkdir -p "$DATA_DIR"
echo "Data directory: $DATA_DIR"
echo "Starting Temporal server..."
# Start Temporal server in background
temporal server start-dev \
--db-filename "$DATA_DIR/temporal.db" \
--port 7233 \
--ui-port 8233 \
--log-level warn &
TEMPORAL_PID=$!
echo "Temporal server started with PID: $TEMPORAL_PID"
# Function to cleanup on exit
cleanup() {
echo ""
echo "Shutting down..."
if [ ! -z "$SERVICE_PID" ]; then
echo "Stopping temporal-service (PID: $SERVICE_PID)..."
kill $SERVICE_PID 2>/dev/null || true
fi
echo "Stopping Temporal server (PID: $TEMPORAL_PID)..."
kill $TEMPORAL_PID 2>/dev/null || true
wait $TEMPORAL_PID 2>/dev/null || true
echo "Shutdown complete"
}
# Set trap for cleanup
trap cleanup EXIT INT TERM
# Wait for Temporal server to be ready
echo "Waiting for Temporal server to be ready..."
for i in {1..30}; do
if curl -s http://localhost:7233/api/v1/namespaces > /dev/null 2>&1; then
echo "Temporal server is ready!"
break
fi
if [ $i -eq 30 ]; then
echo "Error: Temporal server failed to start within 30 seconds"
exit 1
fi
sleep 1
done
# Start the temporal service
echo "Starting temporal-service..."
PORT="${PORT:-8080}" ./temporal-service &
SERVICE_PID=$!
echo ""
echo "🎉 Temporal development environment is running!"
echo ""
echo "Services:"
echo " - Temporal Server: http://localhost:7233 (gRPC)"
echo " - Temporal Web UI: http://localhost:8233"
echo " - Goose Scheduler API: http://localhost:${PORT:-8080}"
echo ""
echo "API Endpoints:"
echo " - Health: http://localhost:${PORT:-8080}/health"
echo " - Jobs: http://localhost:${PORT:-8080}/jobs"
echo ""
echo "Press Ctrl+C to stop all services"
# Wait for the service to exit
wait $SERVICE_PID

Binary file not shown.

View File

@@ -1,123 +0,0 @@
#!/bin/bash
# Test script for Temporal service
set -e
echo "Testing Temporal service..."
# Check if service is running
if ! curl -s http://localhost:8080/health > /dev/null; then
echo "Error: Temporal service is not running on port 8080"
echo "Please start it with: ./temporal-service"
exit 1
fi
echo "✓ Service is running"
# Test health endpoint
echo "Testing health endpoint..."
HEALTH_RESPONSE=$(curl -s http://localhost:8080/health)
if [[ $HEALTH_RESPONSE == *"healthy"* ]]; then
echo "✓ Health check passed"
else
echo "✗ Health check failed: $HEALTH_RESPONSE"
exit 1
fi
# Test list schedules (should be empty initially)
echo "Testing list schedules..."
LIST_RESPONSE=$(curl -s -X POST http://localhost:8080/jobs \
-H "Content-Type: application/json" \
-d '{"action": "list"}')
if [[ $LIST_RESPONSE == *"\"success\":true"* ]]; then
echo "✓ List schedules works"
else
echo "✗ List schedules failed: $LIST_RESPONSE"
exit 1
fi
# Create a test recipe file
TEST_RECIPE="/tmp/test-recipe.yaml"
cat > $TEST_RECIPE << EOF
version: "1.0.0"
title: "Test Recipe"
description: "A test recipe for the scheduler"
prompt: "This is a test prompt for scheduled execution."
EOF
echo "Created test recipe at $TEST_RECIPE"
# Test create schedule
echo "Testing create schedule..."
CREATE_RESPONSE=$(curl -s -X POST http://localhost:8080/jobs \
-H "Content-Type: application/json" \
-d "{\"action\": \"create\", \"job_id\": \"test-job\", \"cron\": \"0 */6 * * *\", \"recipe_path\": \"$TEST_RECIPE\"}")
if [[ $CREATE_RESPONSE == *"\"success\":true"* ]]; then
echo "✓ Create schedule works"
else
echo "✗ Create schedule failed: $CREATE_RESPONSE"
exit 1
fi
# Test list schedules again (should have one job)
echo "Testing list schedules with job..."
LIST_RESPONSE=$(curl -s -X POST http://localhost:8080/jobs \
-H "Content-Type: application/json" \
-d '{"action": "list"}')
if [[ $LIST_RESPONSE == *"test-job"* ]]; then
echo "✓ Job appears in list"
else
echo "✗ Job not found in list: $LIST_RESPONSE"
exit 1
fi
# Test pause schedule
echo "Testing pause schedule..."
PAUSE_RESPONSE=$(curl -s -X POST http://localhost:8080/jobs \
-H "Content-Type: application/json" \
-d '{"action": "pause", "job_id": "test-job"}')
if [[ $PAUSE_RESPONSE == *"\"success\":true"* ]]; then
echo "✓ Pause schedule works"
else
echo "✗ Pause schedule failed: $PAUSE_RESPONSE"
exit 1
fi
# Test unpause schedule
echo "Testing unpause schedule..."
UNPAUSE_RESPONSE=$(curl -s -X POST http://localhost:8080/jobs \
-H "Content-Type: application/json" \
-d '{"action": "unpause", "job_id": "test-job"}')
if [[ $UNPAUSE_RESPONSE == *"\"success\":true"* ]]; then
echo "✓ Unpause schedule works"
else
echo "✗ Unpause schedule failed: $UNPAUSE_RESPONSE"
exit 1
fi
# Test delete schedule
echo "Testing delete schedule..."
DELETE_RESPONSE=$(curl -s -X POST http://localhost:8080/jobs \
-H "Content-Type: application/json" \
-d '{"action": "delete", "job_id": "test-job"}')
if [[ $DELETE_RESPONSE == *"\"success\":true"* ]]; then
echo "✓ Delete schedule works"
else
echo "✗ Delete schedule failed: $DELETE_RESPONSE"
exit 1
fi
# Clean up
rm -f $TEST_RECIPE
echo ""
echo "🎉 All tests passed!"
echo ""
echo "The Temporal service is working correctly."
echo "You can now integrate it with the Rust scheduler."

View File

@@ -1,114 +0,0 @@
#!/bin/bash
# Test script for Temporal Scheduler Integration
# This script tests the complete Phase 2 implementation
set -e
echo "🚀 Testing Temporal Scheduler Integration - Phase 2"
echo "=================================================="
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Function to print status
print_status() {
echo -e "${GREEN}$1${NC}"
}
print_warning() {
echo -e "${YELLOW}⚠️ $1${NC}"
}
print_error() {
echo -e "${RED}$1${NC}"
}
# Check prerequisites
echo "🔍 Checking prerequisites..."
# Check if Temporal CLI is installed
if ! command -v temporal &> /dev/null; then
print_error "Temporal CLI not found. Install with: brew install temporal"
exit 1
fi
print_status "Temporal CLI found"
# Check if Go service is built
if [ ! -f "temporal-service/temporal-service" ]; then
print_error "Temporal service not built. Run: cd temporal-service && ./build.sh"
exit 1
fi
print_status "Temporal service binary found"
# Check if Rust executor is built
if ! command -v goose-scheduler-executor &> /dev/null; then
print_error "goose-scheduler-executor not found in PATH"
print_warning "Building and installing executor..."
cargo build --release --bin goose-scheduler-executor
cp target/release/goose-scheduler-executor /usr/local/bin/
fi
print_status "goose-scheduler-executor found"
# Build the goose library
echo "🔨 Building goose library..."
cargo build --lib -p goose
print_status "Goose library built successfully"
# Test 1: Verify trait compilation
echo "🧪 Test 1: Verify trait abstraction compiles..."
cargo check --lib -p goose
print_status "Trait abstraction compiles correctly"
# Test 2: Verify executor binary works
echo "🧪 Test 2: Test executor binary help..."
if goose-scheduler-executor --help > /dev/null 2>&1; then
print_status "Executor binary responds to --help"
else
print_error "Executor binary failed help test"
exit 1
fi
# Test 3: Create a test recipe
echo "🧪 Test 3: Creating test recipe..."
TEST_RECIPE_DIR="/tmp/goose-temporal-test"
mkdir -p "$TEST_RECIPE_DIR"
cat > "$TEST_RECIPE_DIR/test-recipe.yaml" << 'EOF'
version: "1.0.0"
title: "Temporal Test Recipe"
description: "A simple test recipe for Temporal scheduler"
prompt: "Say hello and tell me the current time"
EOF
print_status "Test recipe created at $TEST_RECIPE_DIR/test-recipe.yaml"
# Test 4: Verify the integration compiles with all features
echo "🧪 Test 4: Full compilation test..."
cargo build --workspace --exclude goose-server --exclude goose-cli
print_status "Full workspace builds successfully"
echo ""
echo "🎉 Phase 2 Integration Tests Complete!"
echo "======================================"
print_status "All tests passed successfully"
echo ""
echo "📋 What was tested:"
echo " ✅ Prerequisites (Temporal CLI, Go service, Rust executor)"
echo " ✅ Goose library compilation"
echo " ✅ Trait abstraction"
echo " ✅ Executor binary functionality"
echo " ✅ Test recipe creation"
echo " ✅ Full workspace compilation"
echo ""
echo "🚀 Ready for Phase 3: Migration & Testing"
echo ""
echo "To test the Temporal scheduler manually:"
echo " 1. Set environment: export GOOSE_SCHEDULER_TYPE=temporal"
echo " 2. Start services: cd temporal-service && ./start.sh"
echo " 3. Use the scheduler factory in your code"
echo ""
print_status "Phase 2 implementation is ready!"

View File

@@ -1,84 +0,0 @@
#!/bin/bash
# Simple test to verify the port conflict fix
echo "🧪 Testing TemporalScheduler port conflict fix"
echo "=============================================="
# Check if we're in the right directory
if [ ! -f "crates/goose/src/temporal_scheduler.rs" ]; then
echo "❌ Please run this script from the goose project root directory"
exit 1
fi
echo "✅ Prerequisites check passed"
# Build the project
echo "🔨 Building project..."
cargo build --release > /dev/null 2>&1
if [ $? -ne 0 ]; then
echo "❌ Build failed"
exit 1
fi
echo "✅ Build successful"
# Run the unit tests to make sure our logic is correct
echo "🧪 Running TemporalScheduler unit tests..."
cargo test temporal_scheduler::tests --quiet > /dev/null 2>&1
if [ $? -eq 0 ]; then
echo "✅ All unit tests passed"
else
echo "❌ Unit tests failed"
exit 1
fi
# Check the code for the specific improvements
echo "🔍 Verifying code improvements..."
# Check that we have the improved service detection logic
if grep -q "Port 7233 is in use by a Temporal server we can connect to" crates/goose/src/temporal_scheduler.rs; then
echo "✅ Found improved Temporal server detection logic"
else
echo "❌ Missing improved Temporal server detection logic"
exit 1
fi
if grep -q "Port 8080 is in use by a Go service we can connect to" crates/goose/src/temporal_scheduler.rs; then
echo "✅ Found improved Go service detection logic"
else
echo "❌ Missing improved Go service detection logic"
exit 1
fi
# Check that we have the comprehensive service status checking
if grep -q "First, check if both services are already running" crates/goose/src/temporal_scheduler.rs; then
echo "✅ Found comprehensive service status checking"
else
echo "❌ Missing comprehensive service status checking"
exit 1
fi
# Check that we have proper port checking
if grep -q "check_port_in_use" crates/goose/src/temporal_scheduler.rs; then
echo "✅ Found port checking functionality"
else
echo "❌ Missing port checking functionality"
exit 1
fi
echo ""
echo "🎉 All checks passed!"
echo "✅ TemporalScheduler now has improved service detection"
echo "✅ Port conflicts are handled gracefully"
echo "✅ Existing services are detected and connected to"
echo "✅ No more crashes when services are already running"
echo ""
echo "📋 Summary of improvements:"
echo " • Enhanced ensure_services_running() logic"
echo " • Added port conflict detection with service verification"
echo " • Improved error handling for various service states"
echo " • Added comprehensive unit tests"
echo " • Now connects to existing services instead of failing"
echo ""
echo "🚀 The TemporalScheduler is ready for production use!"

View File

@@ -10,7 +10,7 @@
"license": {
"name": "Apache-2.0"
},
"version": "1.0.25"
"version": "1.0.24"
},
"paths": {
"/agent/tools": {

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -1,4 +0,0 @@
<svg width="44" height="44" viewBox="0 0 44 44" fill="none" xmlns="http://www.w3.org/2000/svg">
<circle cx="22.6666" cy="21.3333" r="21.3333" fill="#101010"/>
<path d="M15.9289 14.9289C19.8342 11.0237 26.1663 11.0237 30.0715 14.9289C33.9768 18.8342 33.9768 25.1663 30.0715 29.0715C26.1663 32.9768 19.8342 32.9768 15.9289 29.0715C12.0237 25.1663 12.0237 18.8342 15.9289 14.9289ZM28.6574 16.343C25.5333 13.2188 20.4672 13.2188 17.343 16.343C14.2188 19.4672 14.2188 24.5333 17.343 27.6574C20.4672 30.7816 25.5333 30.7816 28.6574 27.6574C31.7816 24.5333 31.7816 19.4672 28.6574 16.343ZM24.0002 22.0002H26.5002V24.0002H23.0002C22.4479 24.0002 22.0002 23.5525 22.0002 23.0002V18.5002H24.0002V22.0002Z" fill="white"/>
</svg>

Before

Width:  |  Height:  |  Size: 718 B

View File

@@ -336,7 +336,7 @@ export default function RecipeEditor({ config }: RecipeEditorProps) {
<button
onClick={() => setIsScheduleModalOpen(true)}
disabled={!requiredFieldsAreFilled()}
className="w-full h-[60px] rounded-none border-t text-gray-900 dark:text-white hover:bg-gray-50 dark:border-gray-600 text-lg font-medium"
className="w-full p-3 bg-green-500 text-white rounded-lg hover:bg-green-600 disabled:opacity-50 disabled:hover:bg-green-500"
>
Create Schedule from Recipe
</button>

View File

@@ -7,7 +7,6 @@ import cronstrue from 'cronstrue';
import * as yaml from 'yaml';
import { Buffer } from 'buffer';
import { Recipe } from '../../recipe';
import ClockIcon from '../../assets/clock-icon.svg';
type FrequencyValue = 'once' | 'hourly' | 'daily' | 'weekly' | 'monthly';
@@ -500,24 +499,17 @@ export const CreateScheduleModal: React.FC<CreateScheduleModalProps> = ({
return (
<div className="fixed inset-0 bg-black/20 backdrop-blur-sm z-40 flex items-center justify-center p-4">
<Card className="w-full max-w-md bg-bgApp shadow-xl rounded-3xl z-50 flex flex-col max-h-[90vh] overflow-hidden">
<div className="px-8 pt-8 pb-4 flex-shrink-0 text-center">
<div className="flex flex-col items-center">
<img src={ClockIcon} alt="Clock" className="w-11 h-11 mb-2" />
<h2 className="text-base font-semibold text-gray-900 dark:text-white">
Create New Schedule
</h2>
<p className="text-base text-gray-500 dark:text-gray-400 mt-2 max-w-sm">
Create a new schedule using the settings below to do things like automatically run
tasks or create files
</p>
</div>
<Card className="w-full max-w-md bg-bgApp shadow-xl rounded-lg z-50 flex flex-col max-h-[90vh] overflow-hidden">
<div className="px-6 pt-6 pb-4 flex-shrink-0">
<h2 className="text-xl font-semibold text-gray-900 dark:text-white">
Create New Schedule
</h2>
</div>
<form
id="new-schedule-form"
onSubmit={handleLocalSubmit}
className="px-8 py-4 space-y-4 flex-grow overflow-y-auto"
className="px-6 py-4 space-y-4 flex-grow overflow-y-auto"
>
{apiErrorExternally && (
<p className="text-red-500 text-sm mb-3 p-2 bg-red-100 dark:bg-red-900/30 rounded-md border border-red-500/50">
@@ -532,7 +524,7 @@ export const CreateScheduleModal: React.FC<CreateScheduleModalProps> = ({
<div>
<label htmlFor="scheduleId-modal" className={modalLabelClassName}>
Name:
Schedule ID:
</label>
<Input
type="text"
@@ -545,30 +537,30 @@ export const CreateScheduleModal: React.FC<CreateScheduleModalProps> = ({
</div>
<div>
<label className={modalLabelClassName}>Source:</label>
<label className={modalLabelClassName}>Recipe Source:</label>
<div className="space-y-2">
<div className="flex bg-gray-100 dark:bg-gray-700 rounded-full p-1">
<div className="flex gap-2">
<button
type="button"
onClick={() => setSourceType('file')}
className={`flex-1 px-4 py-2 text-sm font-medium rounded-full transition-all ${
className={`px-3 py-2 text-sm rounded-md border ${
sourceType === 'file'
? 'bg-white dark:bg-gray-800 text-gray-900 dark:text-white shadow-sm'
: 'text-gray-600 dark:text-gray-400 hover:text-gray-900 dark:hover:text-white'
? 'bg-blue-500 text-white border-blue-500'
: 'bg-gray-100 dark:bg-gray-700 text-gray-700 dark:text-gray-300 border-gray-300 dark:border-gray-600'
}`}
>
YAML
YAML File
</button>
<button
type="button"
onClick={() => setSourceType('deeplink')}
className={`flex-1 px-4 py-2 text-sm font-medium rounded-full transition-all ${
className={`px-3 py-2 text-sm rounded-md border ${
sourceType === 'deeplink'
? 'bg-white dark:bg-gray-800 text-gray-900 dark:text-white shadow-sm'
: 'text-gray-600 dark:text-gray-400 hover:text-gray-900 dark:hover:text-white'
? 'bg-blue-500 text-white border-blue-500'
: 'bg-gray-100 dark:bg-gray-700 text-gray-700 dark:text-gray-300 border-gray-300 dark:border-gray-600'
}`}
>
Deep link
Deep Link
</button>
</div>
@@ -578,7 +570,7 @@ export const CreateScheduleModal: React.FC<CreateScheduleModalProps> = ({
type="button"
variant="outline"
onClick={handleBrowseFile}
className="w-full justify-center rounded-full"
className="w-full justify-center"
>
Browse for YAML file...
</Button>
@@ -597,7 +589,6 @@ export const CreateScheduleModal: React.FC<CreateScheduleModalProps> = ({
value={deepLinkInput}
onChange={(e) => handleDeepLinkChange(e.target.value)}
placeholder="Paste goose://bot or goose://recipe link here..."
className="rounded-full"
/>
{parsedRecipe && (
<div className="mt-2 p-2 bg-green-100 dark:bg-green-900/30 rounded-md border border-green-500/50">
@@ -748,15 +739,6 @@ export const CreateScheduleModal: React.FC<CreateScheduleModalProps> = ({
{/* Actions */}
<div className="mt-[8px] ml-[-24px] mr-[-24px] pt-[16px]">
<Button
type="submit"
form="new-schedule-form"
variant="ghost"
disabled={isLoadingExternally}
className="w-full h-[60px] rounded-none border-t text-gray-900 dark:text-white hover:bg-gray-50 dark:border-gray-600 text-lg font-medium"
>
{isLoadingExternally ? 'Creating...' : 'Create Schedule'}
</Button>
<Button
type="button"
variant="ghost"
@@ -766,6 +748,15 @@ export const CreateScheduleModal: React.FC<CreateScheduleModalProps> = ({
>
Cancel
</Button>
<Button
type="submit"
form="new-schedule-form"
variant="default"
disabled={isLoadingExternally}
className="w-full h-[60px] rounded-none border-t dark:border-gray-600 text-lg dark:text-white dark:border-gray-600 font-regular"
>
{isLoadingExternally ? 'Creating...' : 'Create Schedule'}
</Button>
</div>
</Card>
</div>

View File

@@ -86,8 +86,12 @@ export const ScheduleFromRecipeModal: React.FC<ScheduleFromRecipeModalProps> = (
Recipe Details:
</h3>
<div className="bg-gray-50 dark:bg-gray-800 p-3 rounded-md">
<p className="text-sm font-medium text-gray-900 dark:text-white">{recipe.title}</p>
<p className="text-xs text-gray-600 dark:text-gray-400 mt-1">{recipe.description}</p>
<p className="text-sm font-medium text-gray-900 dark:text-white">
{recipe.title}
</p>
<p className="text-xs text-gray-600 dark:text-gray-400 mt-1">
{recipe.description}
</p>
</div>
</div>
@@ -96,13 +100,22 @@ export const ScheduleFromRecipeModal: React.FC<ScheduleFromRecipeModalProps> = (
Recipe Deep Link:
</label>
<div className="flex items-center">
<Input type="text" value={deepLink} readOnly className="flex-1 text-xs font-mono" />
<Input
type="text"
value={deepLink}
readOnly
className="flex-1 text-xs font-mono"
/>
<Button
type="button"
onClick={handleCopy}
className="ml-2 px-3 py-2 bg-blue-500 text-white rounded-md hover:bg-blue-600 flex items-center"
>
{copied ? <Check className="w-4 h-4" /> : <Copy className="w-4 h-4" />}
{copied ? (
<Check className="w-4 h-4" />
) : (
<Copy className="w-4 h-4" />
)}
</Button>
</div>
<p className="text-xs text-gray-500 dark:text-gray-400 mt-1">
@@ -116,14 +129,14 @@ export const ScheduleFromRecipeModal: React.FC<ScheduleFromRecipeModalProps> = (
type="button"
variant="outline"
onClick={handleClose}
className="flex-1 rounded-xl hover:bg-bgSubtle text-textSubtle"
className="flex-1"
>
Cancel
</Button>
<Button
type="button"
onClick={handleCreateSchedule}
className="flex-1 bg-bgAppInverse text-sm text-textProminentInverse rounded-xl hover:bg-bgStandardInverse transition-colors"
className="flex-1 bg-green-500 hover:bg-green-600 text-white"
>
Create Schedule
</Button>

View File

@@ -16,12 +16,11 @@ import MoreMenuLayout from '../more_menu/MoreMenuLayout';
import { Card } from '../ui/card';
import { Button } from '../ui/button';
import { TrashIcon } from '../icons/TrashIcon';
import { Plus, RefreshCw, Pause, Play, Edit, Square, Eye, MoreHorizontal } from 'lucide-react';
import { Plus, RefreshCw, Pause, Play, Edit, Square, Eye } from 'lucide-react';
import { CreateScheduleModal, NewSchedulePayload } from './CreateScheduleModal';
import { EditScheduleModal } from './EditScheduleModal';
import ScheduleDetailView from './ScheduleDetailView';
import { toastError, toastSuccess } from '../../toasts';
import { Popover, PopoverContent, PopoverTrigger } from '../ui/popover';
import cronstrue from 'cronstrue';
interface SchedulesViewProps {
@@ -68,7 +67,7 @@ const SchedulesView: React.FC<SchedulesViewProps> = ({ onClose }) => {
useEffect(() => {
if (viewingScheduleId === null) {
fetchSchedules();
// Check for pending deep link from recipe editor
const pendingDeepLink = localStorage.getItem('pendingScheduleDeepLink');
if (pendingDeepLink) {
@@ -383,7 +382,7 @@ const SchedulesView: React.FC<SchedulesViewProps> = ({ onClose }) => {
onClick={handleRefresh}
disabled={isRefreshing || isLoading}
variant="outline"
className="w-full md:w-auto flex items-center gap-2 justify-center rounded-full [&>svg]:!size-4"
className="w-full md:w-auto flex items-center gap-2 justify-center"
>
<RefreshCw className={`h-4 w-4 ${isRefreshing ? 'animate-spin' : ''}`} />
{isRefreshing ? 'Refreshing...' : 'Refresh'}
@@ -454,118 +453,111 @@ const SchedulesView: React.FC<SchedulesViewProps> = ({ onClose }) => {
</p>
)}
</div>
<div className="flex-shrink-0">
<Popover>
<PopoverTrigger asChild>
<div className="flex-shrink-0 flex items-center gap-1">
{!job.currently_running && (
<>
<Button
variant="ghost"
size="icon"
onClick={(e) => {
e.stopPropagation();
handleOpenEditModal(job);
}}
className="text-gray-500 dark:text-gray-400 hover:text-gray-700 dark:hover:text-gray-300 hover:bg-gray-100/50 dark:hover:bg-gray-800/50"
className="text-gray-500 dark:text-gray-400 hover:text-blue-500 dark:hover:text-blue-400 hover:bg-blue-100/50 dark:hover:bg-blue-900/30"
title={`Edit schedule ${job.id}`}
disabled={
pausingScheduleIds.has(job.id) ||
deletingScheduleIds.has(job.id) ||
isSubmitting
}
>
<MoreHorizontal className="w-4 h-4" />
<Edit className="w-4 h-4" />
</Button>
</PopoverTrigger>
<PopoverContent
className="w-48 p-1 bg-white dark:bg-gray-800 border border-gray-200 dark:border-gray-600 shadow-lg"
align="end"
>
<div className="space-y-1">
{!job.currently_running && (
<>
<button
onClick={(e) => {
e.stopPropagation();
handleOpenEditModal(job);
}}
disabled={
pausingScheduleIds.has(job.id) ||
deletingScheduleIds.has(job.id) ||
isSubmitting
}
className="w-full flex items-center justify-between px-3 py-2 text-sm text-gray-900 dark:text-white hover:bg-gray-100 dark:hover:bg-gray-700 rounded-md disabled:opacity-50 disabled:cursor-not-allowed"
>
<span>Edit</span>
<Edit className="w-4 h-4" />
</button>
<button
onClick={(e) => {
e.stopPropagation();
if (job.paused) {
handleUnpauseSchedule(job.id);
} else {
handlePauseSchedule(job.id);
}
}}
disabled={
pausingScheduleIds.has(job.id) ||
deletingScheduleIds.has(job.id)
}
className="w-full flex items-center justify-between px-3 py-2 text-sm text-gray-900 dark:text-white hover:bg-gray-100 dark:hover:bg-gray-700 rounded-md disabled:opacity-50 disabled:cursor-not-allowed"
>
<span>{job.paused ? 'Resume schedule' : 'Stop schedule'}</span>
{job.paused ? (
<Play className="w-4 h-4" />
) : (
<Pause className="w-4 h-4" />
)}
</button>
</>
)}
{job.currently_running && (
<>
<button
onClick={(e) => {
e.stopPropagation();
handleInspectRunningJob(job.id);
}}
disabled={
inspectingScheduleIds.has(job.id) ||
killingScheduleIds.has(job.id)
}
className="w-full flex items-center justify-between px-3 py-2 text-sm text-gray-900 dark:text-white hover:bg-gray-100 dark:hover:bg-gray-700 rounded-md disabled:opacity-50 disabled:cursor-not-allowed"
>
<span>Inspect</span>
<Eye className="w-4 h-4" />
</button>
<button
onClick={(e) => {
e.stopPropagation();
handleKillRunningJob(job.id);
}}
disabled={
killingScheduleIds.has(job.id) ||
inspectingScheduleIds.has(job.id)
}
className="w-full flex items-center justify-between px-3 py-2 text-sm text-gray-900 dark:text-white hover:bg-gray-100 dark:hover:bg-gray-700 rounded-md disabled:opacity-50 disabled:cursor-not-allowed"
>
<span>Kill job</span>
<Square className="w-4 h-4" />
</button>
</>
)}
<hr className="border-gray-200 dark:border-gray-600 my-1" />
<button
onClick={(e) => {
e.stopPropagation();
handleDeleteSchedule(job.id);
}}
disabled={
pausingScheduleIds.has(job.id) ||
deletingScheduleIds.has(job.id) ||
killingScheduleIds.has(job.id) ||
inspectingScheduleIds.has(job.id)
<Button
variant="ghost"
size="icon"
onClick={(e) => {
e.stopPropagation();
if (job.paused) {
handleUnpauseSchedule(job.id);
} else {
handlePauseSchedule(job.id);
}
className="w-full flex items-center justify-between px-3 py-2 text-sm text-red-600 dark:text-red-400 hover:bg-red-50 dark:hover:bg-red-900/20 rounded-md disabled:opacity-50 disabled:cursor-not-allowed"
>
<span>Delete</span>
<TrashIcon className="w-4 h-4" />
</button>
</div>
</PopoverContent>
</Popover>
}}
className={`${
job.paused
? 'text-green-500 dark:text-green-400 hover:text-green-600 dark:hover:text-green-300 hover:bg-green-100/50 dark:hover:bg-green-900/30'
: 'text-orange-500 dark:text-orange-400 hover:text-orange-600 dark:hover:text-orange-300 hover:bg-orange-100/50 dark:hover:bg-orange-900/30'
}`}
title={
job.paused
? `Unpause schedule ${job.id}`
: `Pause schedule ${job.id}`
}
disabled={
pausingScheduleIds.has(job.id) || deletingScheduleIds.has(job.id)
}
>
{job.paused ? (
<Play className="w-4 h-4" />
) : (
<Pause className="w-4 h-4" />
)}
</Button>
</>
)}
{job.currently_running && (
<>
<Button
variant="ghost"
size="icon"
onClick={(e) => {
e.stopPropagation();
handleInspectRunningJob(job.id);
}}
className="text-blue-500 dark:text-blue-400 hover:text-blue-600 dark:hover:text-blue-300 hover:bg-blue-100/50 dark:hover:bg-blue-900/30"
title={`Inspect running job ${job.id}`}
disabled={
inspectingScheduleIds.has(job.id) || killingScheduleIds.has(job.id)
}
>
<Eye className="w-4 h-4" />
</Button>
<Button
variant="ghost"
size="icon"
onClick={(e) => {
e.stopPropagation();
handleKillRunningJob(job.id);
}}
className="text-red-500 dark:text-red-400 hover:text-red-600 dark:hover:text-red-300 hover:bg-red-100/50 dark:hover:bg-red-900/30"
title={`Kill running job ${job.id}`}
disabled={
killingScheduleIds.has(job.id) || inspectingScheduleIds.has(job.id)
}
>
<Square className="w-4 h-4" />
</Button>
</>
)}
<Button
variant="ghost"
size="icon"
onClick={(e) => {
e.stopPropagation();
handleDeleteSchedule(job.id);
}}
className="text-gray-500 dark:text-gray-400 hover:text-red-500 dark:hover:text-red-400 hover:bg-red-100/50 dark:hover:bg-red-900/30"
title={`Delete schedule ${job.id}`}
disabled={
pausingScheduleIds.has(job.id) ||
deletingScheduleIds.has(job.id) ||
killingScheduleIds.has(job.id) ||
inspectingScheduleIds.has(job.id)
}
>
<TrashIcon className="w-5 h-5" />
</Button>
</div>
</div>
</Card>