Files
goose/temporal-service/main.go
2025-06-09 10:57:19 -07:00

544 lines
16 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
const (
TaskQueueName = "goose-task-queue"
Namespace = "default"
)
// Global service instance for activities to access
var globalService *TemporalService
// Request/Response types for HTTP API
type JobRequest struct {
Action string `json:"action"` // create, delete, pause, unpause, list, run_now
JobID string `json:"job_id"`
CronExpr string `json:"cron"`
RecipePath string `json:"recipe_path"`
}
type JobResponse struct {
Success bool `json:"success"`
Message string `json:"message"`
Jobs []JobStatus `json:"jobs,omitempty"`
Data interface{} `json:"data,omitempty"`
}
type JobStatus struct {
ID string `json:"id"`
CronExpr string `json:"cron"`
RecipePath string `json:"recipe_path"`
LastRun *string `json:"last_run,omitempty"`
NextRun *string `json:"next_run,omitempty"`
CurrentlyRunning bool `json:"currently_running"`
Paused bool `json:"paused"`
CreatedAt time.Time `json:"created_at"`
}
type RunNowResponse struct {
SessionID string `json:"session_id"`
}
// TemporalService manages the Temporal client and provides HTTP API
type TemporalService struct {
client client.Client
worker worker.Worker
scheduleJobs map[string]*JobStatus // In-memory job tracking
runningJobs map[string]bool // Track which jobs are currently running
}
// NewTemporalService creates a new Temporal service that connects to existing server
func NewTemporalService() (*TemporalService, error) {
// Create client (assumes Temporal server is already running)
c, err := client.Dial(client.Options{
HostPort: "127.0.0.1:7233",
Namespace: Namespace,
})
if err != nil {
return nil, fmt.Errorf("failed to create temporal client: %w", err)
}
// Create worker
w := worker.New(c, TaskQueueName, worker.Options{})
w.RegisterWorkflow(GooseJobWorkflow)
w.RegisterActivity(ExecuteGooseRecipe)
if err := w.Start(); err != nil {
c.Close()
return nil, fmt.Errorf("failed to start worker: %w", err)
}
log.Println("Connected to Temporal server successfully")
service := &TemporalService{
client: c,
worker: w,
scheduleJobs: make(map[string]*JobStatus),
runningJobs: make(map[string]bool),
}
// Set global service for activities
globalService = service
return service, nil
}
// Stop gracefully shuts down the Temporal service
func (ts *TemporalService) Stop() {
log.Println("Shutting down Temporal service...")
if ts.worker != nil {
ts.worker.Stop()
}
if ts.client != nil {
ts.client.Close()
}
log.Println("Temporal service stopped")
}
// Workflow definition for executing Goose recipes
func GooseJobWorkflow(ctx workflow.Context, jobID, recipePath string) (string, error) {
logger := workflow.GetLogger(ctx)
logger.Info("Starting Goose job workflow", "jobID", jobID, "recipePath", recipePath)
ao := workflow.ActivityOptions{
StartToCloseTimeout: 2 * time.Hour, // Allow up to 2 hours for job execution
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
NonRetryableErrorTypes: []string{"InvalidRecipeError"},
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
var sessionID string
err := workflow.ExecuteActivity(ctx, ExecuteGooseRecipe, jobID, recipePath).Get(ctx, &sessionID)
if err != nil {
logger.Error("Goose job workflow failed", "jobID", jobID, "error", err)
return "", err
}
logger.Info("Goose job workflow completed", "jobID", jobID, "sessionID", sessionID)
return sessionID, nil
}
// Activity definition for executing Goose recipes
func ExecuteGooseRecipe(ctx context.Context, jobID, recipePath string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Executing Goose recipe", "jobID", jobID, "recipePath", recipePath)
// Mark job as running at the start
if globalService != nil {
globalService.markJobAsRunning(jobID)
// Ensure we mark it as not running when we're done
defer globalService.markJobAsNotRunning(jobID)
}
// Check if recipe file exists
if _, err := os.Stat(recipePath); os.IsNotExist(err) {
return "", temporal.NewNonRetryableApplicationError(
fmt.Sprintf("recipe file not found: %s", recipePath),
"InvalidRecipeError",
err,
)
}
// Execute the Goose recipe via the executor binary
cmd := exec.CommandContext(ctx, "goose-scheduler-executor", jobID, recipePath)
cmd.Env = append(os.Environ(), fmt.Sprintf("GOOSE_JOB_ID=%s", jobID))
output, err := cmd.Output()
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
logger.Error("Recipe execution failed", "jobID", jobID, "stderr", string(exitError.Stderr))
return "", fmt.Errorf("recipe execution failed: %s", string(exitError.Stderr))
}
return "", fmt.Errorf("failed to execute recipe: %w", err)
}
sessionID := strings.TrimSpace(string(output))
logger.Info("Recipe executed successfully", "jobID", jobID, "sessionID", sessionID)
return sessionID, nil
}
// HTTP API handlers
func (ts *TemporalService) handleJobs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if r.Method != http.MethodPost {
ts.writeErrorResponse(w, http.StatusMethodNotAllowed, "Method not allowed")
return
}
var req JobRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
ts.writeErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("Invalid JSON: %v", err))
return
}
var resp JobResponse
switch req.Action {
case "create":
resp = ts.createSchedule(req)
case "delete":
resp = ts.deleteSchedule(req)
case "pause":
resp = ts.pauseSchedule(req)
case "unpause":
resp = ts.unpauseSchedule(req)
case "list":
resp = ts.listSchedules()
case "run_now":
resp = ts.runNow(req)
default:
resp = JobResponse{Success: false, Message: fmt.Sprintf("Unknown action: %s", req.Action)}
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
}
func (ts *TemporalService) createSchedule(req JobRequest) JobResponse {
if req.JobID == "" || req.CronExpr == "" || req.RecipePath == "" {
return JobResponse{Success: false, Message: "Missing required fields: job_id, cron, recipe_path"}
}
// Check if job already exists
if _, exists := ts.scheduleJobs[req.JobID]; exists {
return JobResponse{Success: false, Message: fmt.Sprintf("Job with ID '%s' already exists", req.JobID)}
}
// Validate recipe file exists
if _, err := os.Stat(req.RecipePath); os.IsNotExist(err) {
return JobResponse{Success: false, Message: fmt.Sprintf("Recipe file not found: %s", req.RecipePath)}
}
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
// Create Temporal schedule
schedule := client.ScheduleOptions{
ID: scheduleID,
Spec: client.ScheduleSpec{
CronExpressions: []string{req.CronExpr},
},
Action: &client.ScheduleWorkflowAction{
ID: fmt.Sprintf("workflow-%s-{{.ScheduledTime.Unix}}", req.JobID),
Workflow: GooseJobWorkflow,
Args: []interface{}{req.JobID, req.RecipePath},
TaskQueue: TaskQueueName,
},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := ts.client.ScheduleClient().Create(ctx, schedule)
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to create schedule: %v", err)}
}
// Track job in memory
jobStatus := &JobStatus{
ID: req.JobID,
CronExpr: req.CronExpr,
RecipePath: req.RecipePath,
CurrentlyRunning: false,
Paused: false,
CreatedAt: time.Now(),
}
ts.scheduleJobs[req.JobID] = jobStatus
log.Printf("Created schedule for job: %s", req.JobID)
return JobResponse{Success: true, Message: "Schedule created successfully"}
}
func (ts *TemporalService) deleteSchedule(req JobRequest) JobResponse {
if req.JobID == "" {
return JobResponse{Success: false, Message: "Missing job_id"}
}
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
err := handle.Delete(ctx)
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to delete schedule: %v", err)}
}
// Remove from memory
delete(ts.scheduleJobs, req.JobID)
log.Printf("Deleted schedule for job: %s", req.JobID)
return JobResponse{Success: true, Message: "Schedule deleted successfully"}
}
func (ts *TemporalService) pauseSchedule(req JobRequest) JobResponse {
if req.JobID == "" {
return JobResponse{Success: false, Message: "Missing job_id"}
}
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
err := handle.Pause(ctx, client.SchedulePauseOptions{
Note: "Paused via API",
})
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to pause schedule: %v", err)}
}
// Update in memory
if job, exists := ts.scheduleJobs[req.JobID]; exists {
job.Paused = true
}
log.Printf("Paused schedule for job: %s", req.JobID)
return JobResponse{Success: true, Message: "Schedule paused successfully"}
}
func (ts *TemporalService) unpauseSchedule(req JobRequest) JobResponse {
if req.JobID == "" {
return JobResponse{Success: false, Message: "Missing job_id"}
}
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
err := handle.Unpause(ctx, client.ScheduleUnpauseOptions{
Note: "Unpaused via API",
})
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to unpause schedule: %v", err)}
}
// Update in memory
if job, exists := ts.scheduleJobs[req.JobID]; exists {
job.Paused = false
}
log.Printf("Unpaused schedule for job: %s", req.JobID)
return JobResponse{Success: true, Message: "Schedule unpaused successfully"}
}
func (ts *TemporalService) listSchedules() JobResponse {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// List all schedules from Temporal
iter, err := ts.client.ScheduleClient().List(ctx, client.ScheduleListOptions{})
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to list schedules: %v", err)}
}
var jobs []JobStatus
for iter.HasNext() {
schedule, err := iter.Next()
if err != nil {
log.Printf("Error listing schedules: %v", err)
continue
}
// Extract job ID from schedule ID
if strings.HasPrefix(schedule.ID, "goose-job-") {
jobID := strings.TrimPrefix(schedule.ID, "goose-job-")
// Get additional details from in-memory tracking
var jobStatus JobStatus
if tracked, exists := ts.scheduleJobs[jobID]; exists {
jobStatus = *tracked
} else {
// Fallback for schedules not in memory
jobStatus = JobStatus{
ID: jobID,
CreatedAt: time.Now(), // We don't have the real creation time
}
}
// Update with Temporal schedule info
if len(schedule.Spec.CronExpressions) > 0 {
jobStatus.CronExpr = schedule.Spec.CronExpressions[0]
}
// Get detailed schedule information including paused state and running status
scheduleHandle := ts.client.ScheduleClient().GetHandle(ctx, schedule.ID)
if desc, err := scheduleHandle.Describe(ctx); err == nil {
jobStatus.Paused = desc.Schedule.State.Paused
// Check if there are any running workflows for this job
jobStatus.CurrentlyRunning = ts.isJobCurrentlyRunning(ctx, jobID)
// Update last run time if available
if len(desc.Info.RecentActions) > 0 {
lastAction := desc.Info.RecentActions[len(desc.Info.RecentActions)-1]
if !lastAction.ActualTime.IsZero() {
lastRunStr := lastAction.ActualTime.Format(time.RFC3339)
jobStatus.LastRun = &lastRunStr
}
}
// Update next run time if available - this field may not exist in older SDK versions
// We'll skip this for now to avoid compilation errors
} else {
log.Printf("Warning: Could not get detailed info for schedule %s: %v", schedule.ID, err)
}
// Update in-memory tracking with latest info
ts.scheduleJobs[jobID] = &jobStatus
jobs = append(jobs, jobStatus)
}
}
return JobResponse{Success: true, Jobs: jobs}
}
// isJobCurrentlyRunning checks if there are any running workflows for the given job ID
func (ts *TemporalService) isJobCurrentlyRunning(ctx context.Context, jobID string) bool {
// Check our in-memory tracking of running jobs
if running, exists := ts.runningJobs[jobID]; exists && running {
return true
}
return false
}
// markJobAsRunning sets a job as currently running
func (ts *TemporalService) markJobAsRunning(jobID string) {
ts.runningJobs[jobID] = true
log.Printf("Marked job %s as running", jobID)
}
// markJobAsNotRunning sets a job as not currently running
func (ts *TemporalService) markJobAsNotRunning(jobID string) {
delete(ts.runningJobs, jobID)
log.Printf("Marked job %s as not running", jobID)
}
func (ts *TemporalService) runNow(req JobRequest) JobResponse {
if req.JobID == "" {
return JobResponse{Success: false, Message: "Missing job_id"}
}
// Get job details
job, exists := ts.scheduleJobs[req.JobID]
if !exists {
return JobResponse{Success: false, Message: fmt.Sprintf("Job '%s' not found", req.JobID)}
}
// Execute workflow immediately
workflowOptions := client.StartWorkflowOptions{
ID: fmt.Sprintf("manual-%s-%d", req.JobID, time.Now().Unix()),
TaskQueue: TaskQueueName,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
we, err := ts.client.ExecuteWorkflow(ctx, workflowOptions, GooseJobWorkflow, req.JobID, job.RecipePath)
if err != nil {
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to start workflow: %v", err)}
}
// Don't wait for completion in run_now, just return the workflow ID
log.Printf("Manual execution started for job: %s, workflow: %s", req.JobID, we.GetID())
return JobResponse{
Success: true,
Message: "Job execution started",
Data: RunNowResponse{SessionID: we.GetID()}, // Return workflow ID as session ID for now
}
}
func (ts *TemporalService) writeErrorResponse(w http.ResponseWriter, statusCode int, message string) {
w.WriteHeader(statusCode)
json.NewEncoder(w).Encode(JobResponse{Success: false, Message: message})
}
func (ts *TemporalService) handleHealth(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{"status": "healthy"})
}
func main() {
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
log.Println("Starting Temporal service...")
log.Println("Note: This service requires a running Temporal server at 127.0.0.1:7233")
log.Println("Start Temporal server with: temporal server start-dev")
// Create Temporal service
service, err := NewTemporalService()
if err != nil {
log.Fatalf("Failed to create Temporal service: %v", err)
}
// Set up HTTP server
mux := http.NewServeMux()
mux.HandleFunc("/jobs", service.handleJobs)
mux.HandleFunc("/health", service.handleHealth)
server := &http.Server{
Addr: ":" + port,
Handler: mux,
}
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("Received shutdown signal")
// Shutdown HTTP server
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
server.Shutdown(ctx)
// Stop Temporal service
service.Stop()
os.Exit(0)
}()
log.Printf("Temporal service starting on port %s", port)
log.Printf("Health endpoint: http://localhost:%s/health", port)
log.Printf("Jobs endpoint: http://localhost:%s/jobs", port)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("HTTP server failed: %v", err)
}
}