Files
goose/temporal-service/service.go
Max Novich 180b1df25d Mnovich/temporal foreground tasks (#2895)
Co-authored-by: Carlos M. Lopez <carlopez@squareup.com>
2025-06-20 16:19:58 -07:00

283 lines
8.4 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"gopkg.in/yaml.v2"
)
// Global service instance for activities to access
var globalService *TemporalService
// TemporalService manages the Temporal client and provides HTTP API
type TemporalService struct {
client client.Client
worker worker.Worker
scheduleJobs map[string]*JobStatus // In-memory job tracking
runningJobs map[string]bool // Track which jobs are currently running
runningWorkflows map[string][]string // Track workflow IDs for each job
recipesDir string // Directory for managed recipe storage
ports *PortConfig // Port configuration
}
// NewTemporalService creates a new Temporal service and ensures Temporal server is running
func NewTemporalService() (*TemporalService, error) {
// First, find available ports
ports, err := findAvailablePorts()
if err != nil {
return nil, fmt.Errorf("failed to find available ports: %w", err)
}
log.Printf("Using ports - Temporal: %d, UI: %d, HTTP: %d",
ports.TemporalPort, ports.UIPort, ports.HTTPPort)
// Ensure Temporal server is running
if err := ensureTemporalServerRunning(ports); err != nil {
return nil, fmt.Errorf("failed to ensure Temporal server is running: %w", err)
}
// Set up managed recipes directory in user data directory
recipesDir, err := getManagedRecipesDir()
if err != nil {
return nil, fmt.Errorf("failed to determine managed recipes directory: %w", err)
}
if err := os.MkdirAll(recipesDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create managed recipes directory: %w", err)
}
log.Printf("Using managed recipes directory: %s", recipesDir)
// Create client (Temporal server should now be running)
c, err := client.Dial(client.Options{
HostPort: fmt.Sprintf("127.0.0.1:%d", ports.TemporalPort),
Namespace: Namespace,
})
if err != nil {
return nil, fmt.Errorf("failed to create temporal client: %w", err)
}
// Create worker
w := worker.New(c, TaskQueueName, worker.Options{})
w.RegisterWorkflow(GooseJobWorkflow)
w.RegisterActivity(ExecuteGooseRecipe)
if err := w.Start(); err != nil {
c.Close()
return nil, fmt.Errorf("failed to start worker: %w", err)
}
log.Printf("Connected to Temporal server successfully on port %d", ports.TemporalPort)
service := &TemporalService{
client: c,
worker: w,
scheduleJobs: make(map[string]*JobStatus),
runningJobs: make(map[string]bool),
runningWorkflows: make(map[string][]string),
recipesDir: recipesDir,
ports: ports,
}
// Set global service for activities
globalService = service
return service, nil
}
// Stop gracefully shuts down the Temporal service
func (ts *TemporalService) Stop() {
log.Println("Shutting down Temporal service...")
if ts.worker != nil {
ts.worker.Stop()
}
if ts.client != nil {
ts.client.Close()
}
log.Println("Temporal service stopped")
}
// GetHTTPPort returns the HTTP port for this service
func (ts *TemporalService) GetHTTPPort() int {
return ts.ports.HTTPPort
}
// GetTemporalPort returns the Temporal server port for this service
func (ts *TemporalService) GetTemporalPort() int {
return ts.ports.TemporalPort
}
// GetUIPort returns the Temporal UI port for this service
func (ts *TemporalService) GetUIPort() int {
return ts.ports.UIPort
}
// HTTP API handlers
func (ts *TemporalService) handleJobs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if r.Method != http.MethodPost {
ts.writeErrorResponse(w, http.StatusMethodNotAllowed, "Method not allowed")
return
}
var req JobRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
ts.writeErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("Invalid JSON: %v", err))
return
}
var resp JobResponse
switch req.Action {
case "create":
resp = ts.createSchedule(req)
case "delete":
resp = ts.deleteSchedule(req)
case "pause":
resp = ts.pauseSchedule(req)
case "unpause":
resp = ts.unpauseSchedule(req)
case "update":
resp = ts.updateSchedule(req)
case "list":
resp = ts.listSchedules()
case "run_now":
resp = ts.runNow(req)
case "kill_job":
resp = ts.killJob(req)
case "inspect_job":
resp = ts.inspectJob(req)
case "mark_completed":
resp = ts.markCompleted(req)
case "status":
resp = ts.getJobStatus(req)
default:
resp = JobResponse{Success: false, Message: fmt.Sprintf("Unknown action: %s", req.Action)}
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
}
func (ts *TemporalService) handleHealth(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{"status": "healthy"})
}
func (ts *TemporalService) handlePorts(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
portInfo := map[string]int{
"http_port": ts.ports.HTTPPort,
"temporal_port": ts.ports.TemporalPort,
"ui_port": ts.ports.UIPort,
}
json.NewEncoder(w).Encode(portInfo)
}
// markJobAsRunning sets a job as currently running and tracks the workflow ID
func (ts *TemporalService) markJobAsRunning(jobID string) {
ts.runningJobs[jobID] = true
log.Printf("Marked job %s as running", jobID)
}
// markJobAsNotRunning sets a job as not currently running and clears workflow tracking
func (ts *TemporalService) markJobAsNotRunning(jobID string) {
delete(ts.runningJobs, jobID)
delete(ts.runningWorkflows, jobID)
log.Printf("Marked job %s as not running", jobID)
}
// addRunningWorkflow tracks a workflow ID for a job
func (ts *TemporalService) addRunningWorkflow(jobID, workflowID string) {
if ts.runningWorkflows[jobID] == nil {
ts.runningWorkflows[jobID] = make([]string, 0)
}
ts.runningWorkflows[jobID] = append(ts.runningWorkflows[jobID], workflowID)
log.Printf("Added workflow %s for job %s", workflowID, jobID)
}
// removeRunningWorkflow removes a workflow ID from job tracking
func (ts *TemporalService) removeRunningWorkflow(jobID, workflowID string) {
if workflows, exists := ts.runningWorkflows[jobID]; exists {
for i, id := range workflows {
if id == workflowID {
ts.runningWorkflows[jobID] = append(workflows[:i], workflows[i+1:]...)
break
}
}
if len(ts.runningWorkflows[jobID]) == 0 {
delete(ts.runningWorkflows, jobID)
ts.runningJobs[jobID] = false
}
}
}
// getEmbeddedRecipeContent retrieves embedded recipe content from schedule metadata
func (ts *TemporalService) getEmbeddedRecipeContent(jobID string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
scheduleID := fmt.Sprintf("goose-job-%s", jobID)
handle := ts.client.ScheduleClient().GetHandle(ctx, scheduleID)
desc, err := handle.Describe(ctx)
if err != nil {
return "", fmt.Errorf("failed to get schedule description: %w", err)
}
if desc.Schedule.State.Note == "" {
return "", fmt.Errorf("no metadata found in schedule")
}
var metadata map[string]interface{}
if err := json.Unmarshal([]byte(desc.Schedule.State.Note), &metadata); err != nil {
return "", fmt.Errorf("failed to parse schedule metadata: %w", err)
}
if recipeContent, ok := metadata["recipe_content"].(string); ok {
return recipeContent, nil
}
return "", fmt.Errorf("no embedded recipe content found")
}
// writeErrorResponse writes a standardized error response
func (ts *TemporalService) writeErrorResponse(w http.ResponseWriter, statusCode int, message string) {
w.WriteHeader(statusCode)
json.NewEncoder(w).Encode(JobResponse{Success: false, Message: message})
}
// isJobCurrentlyRunning checks if there are any running workflows for the given job ID
func (ts *TemporalService) isJobCurrentlyRunning(ctx context.Context, jobID string) bool {
// Check our in-memory tracking of running jobs
if running, exists := ts.runningJobs[jobID]; exists && running {
return true
}
return false
}
// parseRecipeContent parses recipe content from bytes (YAML or JSON)
func (ts *TemporalService) parseRecipeContent(content []byte) (*Recipe, error) {
var recipe Recipe
// Try YAML first, then JSON
if err := yaml.Unmarshal(content, &recipe); err != nil {
if err := json.Unmarshal(content, &recipe); err != nil {
return nil, fmt.Errorf("failed to parse as YAML or JSON: %w", err)
}
}
return &recipe, nil
}