mirror of
https://github.com/aljazceru/goose.git
synced 2025-12-18 22:54:24 +01:00
665 lines
20 KiB
Go
665 lines
20 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"go.temporal.io/api/workflowservice/v1"
|
|
"go.temporal.io/sdk/activity"
|
|
"go.temporal.io/sdk/client"
|
|
"go.temporal.io/sdk/temporal"
|
|
"go.temporal.io/sdk/worker"
|
|
"go.temporal.io/sdk/workflow"
|
|
)
|
|
|
|
const (
|
|
TaskQueueName = "goose-task-queue"
|
|
Namespace = "default"
|
|
)
|
|
|
|
// Global service instance for activities to access
|
|
var globalService *TemporalService
|
|
|
|
// Request/Response types for HTTP API
|
|
type JobRequest struct {
|
|
Action string `json:"action"` // create, delete, pause, unpause, list, run_now
|
|
JobID string `json:"job_id"`
|
|
CronExpr string `json:"cron"`
|
|
RecipePath string `json:"recipe_path"`
|
|
}
|
|
|
|
type JobResponse struct {
|
|
Success bool `json:"success"`
|
|
Message string `json:"message"`
|
|
Jobs []JobStatus `json:"jobs,omitempty"`
|
|
Data interface{} `json:"data,omitempty"`
|
|
}
|
|
|
|
type JobStatus struct {
|
|
ID string `json:"id"`
|
|
CronExpr string `json:"cron"`
|
|
RecipePath string `json:"recipe_path"`
|
|
LastRun *string `json:"last_run,omitempty"`
|
|
NextRun *string `json:"next_run,omitempty"`
|
|
CurrentlyRunning bool `json:"currently_running"`
|
|
Paused bool `json:"paused"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
}
|
|
|
|
type RunNowResponse struct {
|
|
SessionID string `json:"session_id"`
|
|
}
|
|
|
|
// ensureTemporalServerRunning checks if Temporal server is running and starts it if needed
|
|
func ensureTemporalServerRunning() error {
|
|
log.Println("Checking if Temporal server is running...")
|
|
|
|
// Check if Temporal server is already running by trying to connect
|
|
if isTemporalServerRunning() {
|
|
log.Println("Temporal server is already running")
|
|
return nil
|
|
}
|
|
|
|
log.Println("Temporal server not running, attempting to start it...")
|
|
|
|
// Find the temporal CLI binary
|
|
temporalCmd, err := findTemporalCLI()
|
|
if err != nil {
|
|
return fmt.Errorf("could not find temporal CLI: %w", err)
|
|
}
|
|
|
|
log.Printf("Using Temporal CLI at: %s", temporalCmd)
|
|
|
|
// Start Temporal server in background
|
|
cmd := exec.Command(temporalCmd, "server", "start-dev",
|
|
"--db-filename", "temporal.db",
|
|
"--port", "7233",
|
|
"--ui-port", "8233",
|
|
"--log-level", "warn")
|
|
|
|
// Start the process in background
|
|
if err := cmd.Start(); err != nil {
|
|
return fmt.Errorf("failed to start Temporal server: %w", err)
|
|
}
|
|
|
|
log.Printf("Temporal server started with PID: %d", cmd.Process.Pid)
|
|
|
|
// Wait for server to be ready (with timeout)
|
|
timeout := time.After(30 * time.Second)
|
|
ticker := time.NewTicker(2 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
return fmt.Errorf("timeout waiting for Temporal server to start")
|
|
case <-ticker.C:
|
|
if isTemporalServerRunning() {
|
|
log.Println("Temporal server is now ready")
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// isTemporalServerRunning checks if Temporal server is accessible
|
|
func isTemporalServerRunning() bool {
|
|
// Try to create a client connection to check if server is running
|
|
c, err := client.Dial(client.Options{
|
|
HostPort: "127.0.0.1:7233",
|
|
Namespace: Namespace,
|
|
})
|
|
if err != nil {
|
|
return false
|
|
}
|
|
defer c.Close()
|
|
|
|
// Try a simple operation to verify the connection works
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
_, err = c.WorkflowService().GetSystemInfo(ctx, &workflowservice.GetSystemInfoRequest{})
|
|
return err == nil
|
|
}
|
|
|
|
// findTemporalCLI attempts to find the temporal CLI binary
|
|
func findTemporalCLI() (string, error) {
|
|
// First, try to find temporal in PATH using exec.LookPath
|
|
if path, err := exec.LookPath("temporal"); err == nil {
|
|
// Verify it's the correct temporal CLI by checking version
|
|
cmd := exec.Command(path, "--version")
|
|
if err := cmd.Run(); err == nil {
|
|
return path, nil
|
|
}
|
|
}
|
|
|
|
// If not found in PATH, try different possible locations for the temporal CLI
|
|
possiblePaths := []string{
|
|
"./temporal", // Current directory
|
|
}
|
|
|
|
// Also try relative to the current executable (most important for bundled apps)
|
|
if exePath, err := os.Executable(); err == nil {
|
|
exeDir := filepath.Dir(exePath)
|
|
possiblePaths = append(possiblePaths,
|
|
filepath.Join(exeDir, "temporal"),
|
|
filepath.Join(exeDir, "temporal.exe"), // Windows
|
|
// Also try one level up (for development)
|
|
filepath.Join(exeDir, "..", "temporal"),
|
|
filepath.Join(exeDir, "..", "temporal.exe"),
|
|
)
|
|
}
|
|
|
|
// Test each possible path
|
|
for _, path := range possiblePaths {
|
|
if _, err := os.Stat(path); err == nil {
|
|
// File exists, test if it's executable and the right binary
|
|
cmd := exec.Command(path, "--version")
|
|
if err := cmd.Run(); err == nil {
|
|
return path, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return "", fmt.Errorf("temporal CLI not found in PATH or any of the expected locations: %v", possiblePaths)
|
|
}
|
|
|
|
// TemporalService manages the Temporal client and provides HTTP API
|
|
type TemporalService struct {
|
|
client client.Client
|
|
worker worker.Worker
|
|
scheduleJobs map[string]*JobStatus // In-memory job tracking
|
|
runningJobs map[string]bool // Track which jobs are currently running
|
|
}
|
|
|
|
// NewTemporalService creates a new Temporal service and ensures Temporal server is running
|
|
func NewTemporalService() (*TemporalService, error) {
|
|
// First, ensure Temporal server is running
|
|
if err := ensureTemporalServerRunning(); err != nil {
|
|
return nil, fmt.Errorf("failed to ensure Temporal server is running: %w", err)
|
|
}
|
|
|
|
// Create client (Temporal server should now be running)
|
|
c, err := client.Dial(client.Options{
|
|
HostPort: "127.0.0.1:7233",
|
|
Namespace: Namespace,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create temporal client: %w", err)
|
|
}
|
|
|
|
// Create worker
|
|
w := worker.New(c, TaskQueueName, worker.Options{})
|
|
w.RegisterWorkflow(GooseJobWorkflow)
|
|
w.RegisterActivity(ExecuteGooseRecipe)
|
|
|
|
if err := w.Start(); err != nil {
|
|
c.Close()
|
|
return nil, fmt.Errorf("failed to start worker: %w", err)
|
|
}
|
|
|
|
log.Println("Connected to Temporal server successfully")
|
|
|
|
service := &TemporalService{
|
|
client: c,
|
|
worker: w,
|
|
scheduleJobs: make(map[string]*JobStatus),
|
|
runningJobs: make(map[string]bool),
|
|
}
|
|
|
|
// Set global service for activities
|
|
globalService = service
|
|
|
|
return service, nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the Temporal service
|
|
func (ts *TemporalService) Stop() {
|
|
log.Println("Shutting down Temporal service...")
|
|
if ts.worker != nil {
|
|
ts.worker.Stop()
|
|
}
|
|
if ts.client != nil {
|
|
ts.client.Close()
|
|
}
|
|
log.Println("Temporal service stopped")
|
|
}
|
|
|
|
// Workflow definition for executing Goose recipes
|
|
func GooseJobWorkflow(ctx workflow.Context, jobID, recipePath string) (string, error) {
|
|
logger := workflow.GetLogger(ctx)
|
|
logger.Info("Starting Goose job workflow", "jobID", jobID, "recipePath", recipePath)
|
|
|
|
ao := workflow.ActivityOptions{
|
|
StartToCloseTimeout: 2 * time.Hour, // Allow up to 2 hours for job execution
|
|
RetryPolicy: &temporal.RetryPolicy{
|
|
InitialInterval: time.Second,
|
|
BackoffCoefficient: 2.0,
|
|
MaximumInterval: time.Minute,
|
|
MaximumAttempts: 3,
|
|
NonRetryableErrorTypes: []string{"InvalidRecipeError"},
|
|
},
|
|
}
|
|
ctx = workflow.WithActivityOptions(ctx, ao)
|
|
|
|
var sessionID string
|
|
err := workflow.ExecuteActivity(ctx, ExecuteGooseRecipe, jobID, recipePath).Get(ctx, &sessionID)
|
|
if err != nil {
|
|
logger.Error("Goose job workflow failed", "jobID", jobID, "error", err)
|
|
return "", err
|
|
}
|
|
|
|
logger.Info("Goose job workflow completed", "jobID", jobID, "sessionID", sessionID)
|
|
return sessionID, nil
|
|
}
|
|
|
|
// Activity definition for executing Goose recipes
|
|
func ExecuteGooseRecipe(ctx context.Context, jobID, recipePath string) (string, error) {
|
|
logger := activity.GetLogger(ctx)
|
|
logger.Info("Executing Goose recipe", "jobID", jobID, "recipePath", recipePath)
|
|
|
|
// Mark job as running at the start
|
|
if globalService != nil {
|
|
globalService.markJobAsRunning(jobID)
|
|
// Ensure we mark it as not running when we're done
|
|
defer globalService.markJobAsNotRunning(jobID)
|
|
}
|
|
|
|
// Check if recipe file exists
|
|
if _, err := os.Stat(recipePath); os.IsNotExist(err) {
|
|
return "", temporal.NewNonRetryableApplicationError(
|
|
fmt.Sprintf("recipe file not found: %s", recipePath),
|
|
"InvalidRecipeError",
|
|
err,
|
|
)
|
|
}
|
|
|
|
// Execute the Goose recipe via the executor binary
|
|
cmd := exec.CommandContext(ctx, "goose-scheduler-executor", jobID, recipePath)
|
|
cmd.Env = append(os.Environ(), fmt.Sprintf("GOOSE_JOB_ID=%s", jobID))
|
|
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
if exitError, ok := err.(*exec.ExitError); ok {
|
|
logger.Error("Recipe execution failed", "jobID", jobID, "stderr", string(exitError.Stderr))
|
|
return "", fmt.Errorf("recipe execution failed: %s", string(exitError.Stderr))
|
|
}
|
|
return "", fmt.Errorf("failed to execute recipe: %w", err)
|
|
}
|
|
|
|
sessionID := strings.TrimSpace(string(output))
|
|
logger.Info("Recipe executed successfully", "jobID", jobID, "sessionID", sessionID)
|
|
return sessionID, nil
|
|
}
|
|
|
|
// HTTP API handlers
|
|
|
|
func (ts *TemporalService) handleJobs(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
if r.Method != http.MethodPost {
|
|
ts.writeErrorResponse(w, http.StatusMethodNotAllowed, "Method not allowed")
|
|
return
|
|
}
|
|
|
|
var req JobRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
ts.writeErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("Invalid JSON: %v", err))
|
|
return
|
|
}
|
|
|
|
var resp JobResponse
|
|
|
|
switch req.Action {
|
|
case "create":
|
|
resp = ts.createSchedule(req)
|
|
case "delete":
|
|
resp = ts.deleteSchedule(req)
|
|
case "pause":
|
|
resp = ts.pauseSchedule(req)
|
|
case "unpause":
|
|
resp = ts.unpauseSchedule(req)
|
|
case "list":
|
|
resp = ts.listSchedules()
|
|
case "run_now":
|
|
resp = ts.runNow(req)
|
|
default:
|
|
resp = JobResponse{Success: false, Message: fmt.Sprintf("Unknown action: %s", req.Action)}
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
json.NewEncoder(w).Encode(resp)
|
|
}
|
|
|
|
func (ts *TemporalService) createSchedule(req JobRequest) JobResponse {
|
|
if req.JobID == "" || req.CronExpr == "" || req.RecipePath == "" {
|
|
return JobResponse{Success: false, Message: "Missing required fields: job_id, cron, recipe_path"}
|
|
}
|
|
|
|
// Check if job already exists
|
|
if _, exists := ts.scheduleJobs[req.JobID]; exists {
|
|
return JobResponse{Success: false, Message: fmt.Sprintf("Job with ID '%s' already exists", req.JobID)}
|
|
}
|
|
|
|
// Validate recipe file exists
|
|
if _, err := os.Stat(req.RecipePath); os.IsNotExist(err) {
|
|
return JobResponse{Success: false, Message: fmt.Sprintf("Recipe file not found: %s", req.RecipePath)}
|
|
}
|
|
|
|
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
|
|
|
|
// Create Temporal schedule
|
|
schedule := client.ScheduleOptions{
|
|
ID: scheduleID,
|
|
Spec: client.ScheduleSpec{
|
|
CronExpressions: []string{req.CronExpr},
|
|
},
|
|
Action: &client.ScheduleWorkflowAction{
|
|
ID: fmt.Sprintf("workflow-%s-{{.ScheduledTime.Unix}}", req.JobID),
|
|
Workflow: GooseJobWorkflow,
|
|
Args: []interface{}{req.JobID, req.RecipePath},
|
|
TaskQueue: TaskQueueName,
|
|
},
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := ts.client.ScheduleClient().Create(ctx, schedule)
|
|
if err != nil {
|
|
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to create schedule: %v", err)}
|
|
}
|
|
|
|
// Track job in memory
|
|
jobStatus := &JobStatus{
|
|
ID: req.JobID,
|
|
CronExpr: req.CronExpr,
|
|
RecipePath: req.RecipePath,
|
|
CurrentlyRunning: false,
|
|
Paused: false,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
ts.scheduleJobs[req.JobID] = jobStatus
|
|
|
|
log.Printf("Created schedule for job: %s", req.JobID)
|
|
return JobResponse{Success: true, Message: "Schedule created successfully"}
|
|
}
|
|
|
|
func (ts *TemporalService) deleteSchedule(req JobRequest) JobResponse {
|
|
if req.JobID == "" {
|
|
return JobResponse{Success: false, Message: "Missing job_id"}
|
|
}
|
|
|
|
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
|
|
err := handle.Delete(ctx)
|
|
if err != nil {
|
|
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to delete schedule: %v", err)}
|
|
}
|
|
|
|
// Remove from memory
|
|
delete(ts.scheduleJobs, req.JobID)
|
|
|
|
log.Printf("Deleted schedule for job: %s", req.JobID)
|
|
return JobResponse{Success: true, Message: "Schedule deleted successfully"}
|
|
}
|
|
|
|
func (ts *TemporalService) pauseSchedule(req JobRequest) JobResponse {
|
|
if req.JobID == "" {
|
|
return JobResponse{Success: false, Message: "Missing job_id"}
|
|
}
|
|
|
|
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
|
|
err := handle.Pause(ctx, client.SchedulePauseOptions{
|
|
Note: "Paused via API",
|
|
})
|
|
if err != nil {
|
|
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to pause schedule: %v", err)}
|
|
}
|
|
|
|
// Update in memory
|
|
if job, exists := ts.scheduleJobs[req.JobID]; exists {
|
|
job.Paused = true
|
|
}
|
|
|
|
log.Printf("Paused schedule for job: %s", req.JobID)
|
|
return JobResponse{Success: true, Message: "Schedule paused successfully"}
|
|
}
|
|
|
|
func (ts *TemporalService) unpauseSchedule(req JobRequest) JobResponse {
|
|
if req.JobID == "" {
|
|
return JobResponse{Success: false, Message: "Missing job_id"}
|
|
}
|
|
|
|
scheduleID := fmt.Sprintf("goose-job-%s", req.JobID)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
|
|
err := handle.Unpause(ctx, client.ScheduleUnpauseOptions{
|
|
Note: "Unpaused via API",
|
|
})
|
|
if err != nil {
|
|
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to unpause schedule: %v", err)}
|
|
}
|
|
|
|
// Update in memory
|
|
if job, exists := ts.scheduleJobs[req.JobID]; exists {
|
|
job.Paused = false
|
|
}
|
|
|
|
log.Printf("Unpaused schedule for job: %s", req.JobID)
|
|
return JobResponse{Success: true, Message: "Schedule unpaused successfully"}
|
|
}
|
|
|
|
func (ts *TemporalService) listSchedules() JobResponse {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
// List all schedules from Temporal
|
|
iter, err := ts.client.ScheduleClient().List(ctx, client.ScheduleListOptions{})
|
|
if err != nil {
|
|
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to list schedules: %v", err)}
|
|
}
|
|
|
|
var jobs []JobStatus
|
|
for iter.HasNext() {
|
|
schedule, err := iter.Next()
|
|
if err != nil {
|
|
log.Printf("Error listing schedules: %v", err)
|
|
continue
|
|
}
|
|
|
|
// Extract job ID from schedule ID
|
|
if strings.HasPrefix(schedule.ID, "goose-job-") {
|
|
jobID := strings.TrimPrefix(schedule.ID, "goose-job-")
|
|
|
|
// Get additional details from in-memory tracking
|
|
var jobStatus JobStatus
|
|
if tracked, exists := ts.scheduleJobs[jobID]; exists {
|
|
jobStatus = *tracked
|
|
} else {
|
|
// Fallback for schedules not in memory
|
|
jobStatus = JobStatus{
|
|
ID: jobID,
|
|
CreatedAt: time.Now(), // We don't have the real creation time
|
|
}
|
|
}
|
|
|
|
// Update with Temporal schedule info
|
|
if len(schedule.Spec.CronExpressions) > 0 {
|
|
jobStatus.CronExpr = schedule.Spec.CronExpressions[0]
|
|
}
|
|
|
|
// Get detailed schedule information including paused state and running status
|
|
scheduleHandle := ts.client.ScheduleClient().GetHandle(ctx, schedule.ID)
|
|
if desc, err := scheduleHandle.Describe(ctx); err == nil {
|
|
jobStatus.Paused = desc.Schedule.State.Paused
|
|
|
|
// Check if there are any running workflows for this job
|
|
jobStatus.CurrentlyRunning = ts.isJobCurrentlyRunning(ctx, jobID)
|
|
|
|
// Update last run time if available
|
|
if len(desc.Info.RecentActions) > 0 {
|
|
lastAction := desc.Info.RecentActions[len(desc.Info.RecentActions)-1]
|
|
if !lastAction.ActualTime.IsZero() {
|
|
lastRunStr := lastAction.ActualTime.Format(time.RFC3339)
|
|
jobStatus.LastRun = &lastRunStr
|
|
}
|
|
}
|
|
|
|
// Update next run time if available - this field may not exist in older SDK versions
|
|
// We'll skip this for now to avoid compilation errors
|
|
} else {
|
|
log.Printf("Warning: Could not get detailed info for schedule %s: %v", schedule.ID, err)
|
|
}
|
|
|
|
// Update in-memory tracking with latest info
|
|
ts.scheduleJobs[jobID] = &jobStatus
|
|
|
|
jobs = append(jobs, jobStatus)
|
|
}
|
|
}
|
|
|
|
return JobResponse{Success: true, Jobs: jobs}
|
|
}
|
|
|
|
// isJobCurrentlyRunning checks if there are any running workflows for the given job ID
|
|
func (ts *TemporalService) isJobCurrentlyRunning(ctx context.Context, jobID string) bool {
|
|
// Check our in-memory tracking of running jobs
|
|
if running, exists := ts.runningJobs[jobID]; exists && running {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// markJobAsRunning sets a job as currently running
|
|
func (ts *TemporalService) markJobAsRunning(jobID string) {
|
|
ts.runningJobs[jobID] = true
|
|
log.Printf("Marked job %s as running", jobID)
|
|
}
|
|
|
|
// markJobAsNotRunning sets a job as not currently running
|
|
func (ts *TemporalService) markJobAsNotRunning(jobID string) {
|
|
delete(ts.runningJobs, jobID)
|
|
log.Printf("Marked job %s as not running", jobID)
|
|
}
|
|
|
|
func (ts *TemporalService) runNow(req JobRequest) JobResponse {
|
|
if req.JobID == "" {
|
|
return JobResponse{Success: false, Message: "Missing job_id"}
|
|
}
|
|
|
|
// Get job details
|
|
job, exists := ts.scheduleJobs[req.JobID]
|
|
if !exists {
|
|
return JobResponse{Success: false, Message: fmt.Sprintf("Job '%s' not found", req.JobID)}
|
|
}
|
|
|
|
// Execute workflow immediately
|
|
workflowOptions := client.StartWorkflowOptions{
|
|
ID: fmt.Sprintf("manual-%s-%d", req.JobID, time.Now().Unix()),
|
|
TaskQueue: TaskQueueName,
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
we, err := ts.client.ExecuteWorkflow(ctx, workflowOptions, GooseJobWorkflow, req.JobID, job.RecipePath)
|
|
if err != nil {
|
|
return JobResponse{Success: false, Message: fmt.Sprintf("Failed to start workflow: %v", err)}
|
|
}
|
|
|
|
// Don't wait for completion in run_now, just return the workflow ID
|
|
log.Printf("Manual execution started for job: %s, workflow: %s", req.JobID, we.GetID())
|
|
return JobResponse{
|
|
Success: true,
|
|
Message: "Job execution started",
|
|
Data: RunNowResponse{SessionID: we.GetID()}, // Return workflow ID as session ID for now
|
|
}
|
|
}
|
|
|
|
func (ts *TemporalService) writeErrorResponse(w http.ResponseWriter, statusCode int, message string) {
|
|
w.WriteHeader(statusCode)
|
|
json.NewEncoder(w).Encode(JobResponse{Success: false, Message: message})
|
|
}
|
|
|
|
func (ts *TemporalService) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
json.NewEncoder(w).Encode(map[string]string{"status": "healthy"})
|
|
}
|
|
|
|
func main() {
|
|
port := os.Getenv("PORT")
|
|
if port == "" {
|
|
port = "8080"
|
|
}
|
|
|
|
log.Println("Starting Temporal service...")
|
|
log.Println("Note: This service requires a running Temporal server at 127.0.0.1:7233")
|
|
log.Println("Start Temporal server with: temporal server start-dev")
|
|
|
|
// Create Temporal service
|
|
service, err := NewTemporalService()
|
|
if err != nil {
|
|
log.Fatalf("Failed to create Temporal service: %v", err)
|
|
}
|
|
|
|
// Set up HTTP server
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/jobs", service.handleJobs)
|
|
mux.HandleFunc("/health", service.handleHealth)
|
|
|
|
server := &http.Server{
|
|
Addr: ":" + port,
|
|
Handler: mux,
|
|
}
|
|
|
|
// Handle graceful shutdown
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
go func() {
|
|
<-sigChan
|
|
log.Println("Received shutdown signal")
|
|
|
|
// Shutdown HTTP server
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
server.Shutdown(ctx)
|
|
|
|
// Stop Temporal service
|
|
service.Stop()
|
|
|
|
os.Exit(0)
|
|
}()
|
|
|
|
log.Printf("Temporal service starting on port %s", port)
|
|
log.Printf("Health endpoint: http://localhost:%s/health", port)
|
|
log.Printf("Jobs endpoint: http://localhost:%s/jobs", port)
|
|
|
|
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Fatalf("HTTP server failed: %v", err)
|
|
}
|
|
} |