diff --git a/src/app/api/[[...route]]/route.ts b/src/app/api/[[...route]]/route.ts index f6fbe2c..3d4d922 100644 --- a/src/app/api/[[...route]]/route.ts +++ b/src/app/api/[[...route]]/route.ts @@ -16,6 +16,8 @@ import { GitService } from "../../../server/core/git/services/GitService"; import { ProjectRepository } from "../../../server/core/project/infrastructure/ProjectRepository"; import { ProjectController } from "../../../server/core/project/presentation/ProjectController"; import { ProjectMetaService } from "../../../server/core/project/services/ProjectMetaService"; +import { SchedulerService } from "../../../server/core/scheduler/domain/Scheduler"; +import { SchedulerController } from "../../../server/core/scheduler/presentation/SchedulerController"; import { SessionRepository } from "../../../server/core/session/infrastructure/SessionRepository"; import { VirtualConversationDatabase } from "../../../server/core/session/infrastructure/VirtualConversationDatabase"; import { SessionController } from "../../../server/core/session/presentation/SessionController"; @@ -40,6 +42,7 @@ await Effect.runPromise( Effect.provide(ClaudeCodePermissionController.Live), Effect.provide(FileSystemController.Live), Effect.provide(SSEController.Live), + Effect.provide(SchedulerController.Live), ) .pipe( /** Application */ @@ -53,6 +56,7 @@ await Effect.runPromise( Effect.provide(ClaudeCodeSessionProcessService.Live), Effect.provide(ClaudeCodeService.Live), Effect.provide(GitService.Live), + Effect.provide(SchedulerService.Live), ) .pipe( /** Infrastructure */ diff --git a/src/server/core/scheduler/config.test.ts b/src/server/core/scheduler/config.test.ts new file mode 100644 index 0000000..b483041 --- /dev/null +++ b/src/server/core/scheduler/config.test.ts @@ -0,0 +1,124 @@ +import { mkdir, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { FileSystem, Path } from "@effect/platform"; +import { NodeFileSystem, NodePath } from "@effect/platform-node"; +import { Effect, Layer } from "effect"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { + getConfigPath, + initializeConfig, + readConfig, + writeConfig, +} from "./config"; +import type { SchedulerConfig } from "./schema"; + +describe("scheduler config", () => { + let testDir: string; + const testLayer = Layer.mergeAll(NodeFileSystem.layer, NodePath.layer); + + beforeEach(async () => { + testDir = join(tmpdir(), `scheduler-test-${Date.now()}`); + await mkdir(testDir, { recursive: true }); + }); + + afterEach(async () => { + await rm(testDir, { recursive: true, force: true }); + }); + + test("getConfigPath returns correct path", async () => { + const result = await Effect.runPromise( + getConfigPath.pipe(Effect.provide(testLayer)), + ); + + expect(result).toContain(".claude-code-viewer/scheduler/config.json"); + }); + + test("writeConfig and readConfig work correctly", async () => { + const config: SchedulerConfig = { + jobs: [ + { + id: "test-job-1", + name: "Test Job", + schedule: { + type: "cron", + expression: "0 0 * * *", + }, + message: { + content: "test message", + projectId: "project-1", + baseSessionId: null, + }, + enabled: true, + concurrencyPolicy: "skip", + createdAt: "2025-10-25T00:00:00Z", + lastRunAt: null, + lastRunStatus: null, + }, + ], + }; + + const result = await Effect.runPromise( + Effect.gen(function* () { + yield* writeConfig(config); + return yield* readConfig; + }).pipe(Effect.provide(testLayer)), + ); + + expect(result).toEqual(config); + }); + + test("initializeConfig creates file if not exists", async () => { + const result = await Effect.runPromise( + Effect.gen(function* () { + const configPath = yield* getConfigPath; + const fs = yield* FileSystem.FileSystem; + + const exists = yield* fs.exists(configPath); + if (exists) { + yield* fs.remove(configPath); + } + + return yield* initializeConfig; + }).pipe(Effect.provide(testLayer)), + ); + + expect(result).toEqual({ jobs: [] }); + }); + + test("readConfig fails with ConfigFileNotFoundError when file does not exist", async () => { + const result = await Effect.runPromise( + Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem; + const configPath = yield* getConfigPath; + + const exists = yield* fs.exists(configPath); + if (exists) { + yield* fs.remove(configPath); + } + + return yield* readConfig; + }).pipe(Effect.provide(testLayer), Effect.flip), + ); + + expect(result._tag).toBe("ConfigFileNotFoundError"); + }); + + test("readConfig fails with ConfigParseError for invalid JSON", async () => { + const result = await Effect.runPromise( + Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const configPath = yield* getConfigPath; + const configDir = path.dirname(configPath); + + yield* fs.makeDirectory(configDir, { recursive: true }); + yield* fs.writeFileString(configPath, "{ invalid json }"); + + return yield* readConfig; + }).pipe(Effect.provide(testLayer), Effect.flip), + ); + + expect(result._tag).toBe("ConfigParseError"); + }); +}); diff --git a/src/server/core/scheduler/config.ts b/src/server/core/scheduler/config.ts new file mode 100644 index 0000000..69a3005 --- /dev/null +++ b/src/server/core/scheduler/config.ts @@ -0,0 +1,94 @@ +import { homedir } from "node:os"; +import { FileSystem, Path } from "@effect/platform"; +import { Data, Effect } from "effect"; +import { type SchedulerConfig, schedulerConfigSchema } from "./schema"; + +class ConfigFileNotFoundError extends Data.TaggedError( + "ConfigFileNotFoundError", +)<{ + readonly path: string; +}> {} + +class ConfigParseError extends Data.TaggedError("ConfigParseError")<{ + readonly path: string; + readonly cause: unknown; +}> {} + +const CONFIG_DIR = "scheduler"; +const CONFIG_FILE = "config.json"; + +export const getConfigPath = Effect.gen(function* () { + const path = yield* Path.Path; + const baseDir = path.resolve(homedir(), ".claude-code-viewer"); + return path.join(baseDir, CONFIG_DIR, CONFIG_FILE); +}); + +export const readConfig = Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem; + const configPath = yield* getConfigPath; + + const exists = yield* fs.exists(configPath); + if (!exists) { + return yield* Effect.fail( + new ConfigFileNotFoundError({ path: configPath }), + ); + } + + const content = yield* fs.readFileString(configPath); + + const jsonResult = yield* Effect.try({ + try: () => JSON.parse(content), + catch: (error) => + new ConfigParseError({ + path: configPath, + cause: error, + }), + }); + + const parsed = schedulerConfigSchema.safeParse(jsonResult); + + if (!parsed.success) { + return yield* Effect.fail( + new ConfigParseError({ + path: configPath, + cause: parsed.error, + }), + ); + } + + return parsed.data; +}); + +export const writeConfig = (config: SchedulerConfig) => + Effect.gen(function* () { + const fs = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const configPath = yield* getConfigPath; + const configDir = path.dirname(configPath); + + yield* fs.makeDirectory(configDir, { recursive: true }); + + const content = JSON.stringify(config, null, 2); + yield* fs.writeFileString(configPath, content); + }); + +export const initializeConfig = Effect.gen(function* () { + const result = yield* readConfig.pipe( + Effect.catchTags({ + ConfigFileNotFoundError: () => + Effect.gen(function* () { + const initialConfig: SchedulerConfig = { jobs: [] }; + yield* writeConfig(initialConfig); + return initialConfig; + }), + ConfigParseError: () => + Effect.gen(function* () { + const initialConfig: SchedulerConfig = { jobs: [] }; + yield* writeConfig(initialConfig); + return initialConfig; + }), + }), + ); + + return result; +}); diff --git a/src/server/core/scheduler/domain/Job.test.ts b/src/server/core/scheduler/domain/Job.test.ts new file mode 100644 index 0000000..7bb31ce --- /dev/null +++ b/src/server/core/scheduler/domain/Job.test.ts @@ -0,0 +1,167 @@ +import { describe, expect, test } from "vitest"; +import type { SchedulerJob } from "../schema"; +import { calculateFixedDelay, shouldExecuteJob } from "./Job"; + +describe("shouldExecuteJob", () => { + test("returns false when job is disabled", () => { + const job: SchedulerJob = { + id: "test-job", + name: "Test Job", + schedule: { type: "cron", expression: "* * * * *" }, + message: { content: "test", projectId: "proj-1", baseSessionId: null }, + enabled: false, + concurrencyPolicy: "skip", + createdAt: "2025-10-25T00:00:00Z", + lastRunAt: null, + lastRunStatus: null, + }; + + expect(shouldExecuteJob(job, new Date())).toBe(false); + }); + + test("returns true for cron job when enabled", () => { + const job: SchedulerJob = { + id: "test-job", + name: "Test Job", + schedule: { type: "cron", expression: "* * * * *" }, + message: { content: "test", projectId: "proj-1", baseSessionId: null }, + enabled: true, + concurrencyPolicy: "skip", + createdAt: "2025-10-25T00:00:00Z", + lastRunAt: null, + lastRunStatus: null, + }; + + expect(shouldExecuteJob(job, new Date())).toBe(true); + }); + + test("returns false for oneTime fixed job that has already run", () => { + const job: SchedulerJob = { + id: "test-job", + name: "Test Job", + schedule: { type: "fixed", delayMs: 60000, oneTime: true }, + message: { content: "test", projectId: "proj-1", baseSessionId: null }, + enabled: true, + concurrencyPolicy: "skip", + createdAt: "2025-10-25T00:00:00Z", + lastRunAt: "2025-10-25T00:01:00Z", + lastRunStatus: "success", + }; + + expect(shouldExecuteJob(job, new Date())).toBe(false); + }); + + test("returns false for oneTime fixed job when scheduled time has not arrived", () => { + const createdAt = new Date("2025-10-25T00:00:00Z"); + const now = new Date("2025-10-25T00:00:30Z"); + + const job: SchedulerJob = { + id: "test-job", + name: "Test Job", + schedule: { type: "fixed", delayMs: 60000, oneTime: true }, + message: { content: "test", projectId: "proj-1", baseSessionId: null }, + enabled: true, + concurrencyPolicy: "skip", + createdAt: createdAt.toISOString(), + lastRunAt: null, + lastRunStatus: null, + }; + + expect(shouldExecuteJob(job, now)).toBe(false); + }); + + test("returns true for oneTime fixed job when scheduled time has arrived", () => { + const createdAt = new Date("2025-10-25T00:00:00Z"); + const now = new Date("2025-10-25T00:01:01Z"); + + const job: SchedulerJob = { + id: "test-job", + name: "Test Job", + schedule: { type: "fixed", delayMs: 60000, oneTime: true }, + message: { content: "test", projectId: "proj-1", baseSessionId: null }, + enabled: true, + concurrencyPolicy: "skip", + createdAt: createdAt.toISOString(), + lastRunAt: null, + lastRunStatus: null, + }; + + expect(shouldExecuteJob(job, now)).toBe(true); + }); + + test("returns true for recurring fixed job", () => { + const job: SchedulerJob = { + id: "test-job", + name: "Test Job", + schedule: { type: "fixed", delayMs: 60000, oneTime: false }, + message: { content: "test", projectId: "proj-1", baseSessionId: null }, + enabled: true, + concurrencyPolicy: "skip", + createdAt: "2025-10-25T00:00:00Z", + lastRunAt: null, + lastRunStatus: null, + }; + + expect(shouldExecuteJob(job, new Date())).toBe(true); + }); +}); + +describe("calculateFixedDelay", () => { + test("calculates delay correctly for future scheduled time", () => { + const createdAt = new Date("2025-10-25T00:00:00Z"); + const now = new Date("2025-10-25T00:00:30Z"); + + const job: SchedulerJob = { + id: "test-job", + name: "Test Job", + schedule: { type: "fixed", delayMs: 60000, oneTime: true }, + message: { content: "test", projectId: "proj-1", baseSessionId: null }, + enabled: true, + concurrencyPolicy: "skip", + createdAt: createdAt.toISOString(), + lastRunAt: null, + lastRunStatus: null, + }; + + const delay = calculateFixedDelay(job, now); + expect(delay).toBe(30000); + }); + + test("returns 0 for past scheduled time", () => { + const createdAt = new Date("2025-10-25T00:00:00Z"); + const now = new Date("2025-10-25T00:02:00Z"); + + const job: SchedulerJob = { + id: "test-job", + name: "Test Job", + schedule: { type: "fixed", delayMs: 60000, oneTime: true }, + message: { content: "test", projectId: "proj-1", baseSessionId: null }, + enabled: true, + concurrencyPolicy: "skip", + createdAt: createdAt.toISOString(), + lastRunAt: null, + lastRunStatus: null, + }; + + const delay = calculateFixedDelay(job, now); + expect(delay).toBe(0); + }); + + test("throws error for non-fixed schedule type", () => { + const job: SchedulerJob = { + id: "test-job", + name: "Test Job", + schedule: { type: "cron", expression: "* * * * *" }, + message: { content: "test", projectId: "proj-1", baseSessionId: null }, + enabled: true, + concurrencyPolicy: "skip", + createdAt: "2025-10-25T00:00:00Z", + lastRunAt: null, + lastRunStatus: null, + }; + + expect(() => calculateFixedDelay(job, new Date())).toThrow( + "Job schedule type must be fixed", + ); + }); +}); diff --git a/src/server/core/scheduler/domain/Job.ts b/src/server/core/scheduler/domain/Job.ts new file mode 100644 index 0000000..4537ae7 --- /dev/null +++ b/src/server/core/scheduler/domain/Job.ts @@ -0,0 +1,74 @@ +import { Effect } from "effect"; +import { ClaudeCodeLifeCycleService } from "../../claude-code/services/ClaudeCodeLifeCycleService"; +import { UserConfigService } from "../../platform/services/UserConfigService"; +import { ProjectRepository } from "../../project/infrastructure/ProjectRepository"; +import type { SchedulerJob } from "../schema"; + +export const executeJob = (job: SchedulerJob) => + Effect.gen(function* () { + const lifeCycleService = yield* ClaudeCodeLifeCycleService; + const projectRepository = yield* ProjectRepository; + const userConfigService = yield* UserConfigService; + + const { message } = job; + const { project } = yield* projectRepository.getProject(message.projectId); + const userConfig = yield* userConfigService.getUserConfig(); + + if (project.meta.projectPath === null) { + return yield* Effect.fail( + new Error(`Project path not found for projectId: ${message.projectId}`), + ); + } + + if (message.baseSessionId === null) { + yield* lifeCycleService.startTask({ + baseSession: { + cwd: project.meta.projectPath, + projectId: message.projectId, + sessionId: undefined, + }, + userConfig, + message: message.content, + }); + } else { + yield* lifeCycleService.continueTask({ + sessionProcessId: message.baseSessionId, + message: message.content, + baseSessionId: message.baseSessionId, + }); + } + }); + +export const shouldExecuteJob = (job: SchedulerJob, now: Date): boolean => { + if (!job.enabled) { + return false; + } + + if (job.schedule.type === "cron") { + return true; + } + + if (job.schedule.type === "fixed" && job.schedule.oneTime) { + if (job.lastRunStatus !== null) { + return false; + } + + const createdAt = new Date(job.createdAt); + const scheduledTime = new Date(createdAt.getTime() + job.schedule.delayMs); + return now >= scheduledTime; + } + + return true; +}; + +export const calculateFixedDelay = (job: SchedulerJob, now: Date): number => { + if (job.schedule.type !== "fixed") { + throw new Error("Job schedule type must be fixed"); + } + + const createdAt = new Date(job.createdAt); + const scheduledTime = new Date(createdAt.getTime() + job.schedule.delayMs); + const delay = scheduledTime.getTime() - now.getTime(); + + return Math.max(0, delay); +}; diff --git a/src/server/core/scheduler/domain/Scheduler.test.ts b/src/server/core/scheduler/domain/Scheduler.test.ts new file mode 100644 index 0000000..4059771 --- /dev/null +++ b/src/server/core/scheduler/domain/Scheduler.test.ts @@ -0,0 +1,232 @@ +import { mkdir, rm, unlink } from "node:fs/promises"; +import { homedir, tmpdir } from "node:os"; +import { join } from "node:path"; +import { NodeContext, NodeFileSystem, NodePath } from "@effect/platform-node"; +import { Effect, Layer } from "effect"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { ClaudeCodeLifeCycleService } from "../../claude-code/services/ClaudeCodeLifeCycleService"; +import { ClaudeCodeSessionProcessService } from "../../claude-code/services/ClaudeCodeSessionProcessService"; +import { EnvService } from "../../platform/services/EnvService"; +import { UserConfigService } from "../../platform/services/UserConfigService"; +import { ProjectRepository } from "../../project/infrastructure/ProjectRepository"; +import type { NewSchedulerJob } from "../schema"; +import { SchedulerService } from "./Scheduler"; + +describe("SchedulerService", () => { + let testDir: string; + + const mockSessionProcessService = Layer.succeed( + ClaudeCodeSessionProcessService, + { + startSessionProcess: () => + Effect.succeed({ sessionProcess: {} as never, task: {} as never }), + continueSessionProcess: () => + Effect.succeed({ sessionProcess: {} as never, task: {} as never }), + toNotInitializedState: () => + Effect.succeed({ sessionProcess: {} as never, task: {} as never }), + toInitializedState: () => Effect.succeed({ sessionProcess: {} as never }), + toFileCreatedState: () => Effect.succeed({ sessionProcess: {} as never }), + toPausedState: () => Effect.succeed({ sessionProcess: {} as never }), + toCompletedState: () => + Effect.succeed({ sessionProcess: {} as never, task: undefined }), + dangerouslyChangeProcessState: () => Effect.succeed({} as never), + getSessionProcesses: () => Effect.succeed([]), + getSessionProcess: () => Effect.succeed({} as never), + getTask: () => Effect.succeed({} as never), + changeTaskState: () => Effect.succeed({} as never), + }, + ); + + const mockLifeCycleService = Layer.succeed(ClaudeCodeLifeCycleService, { + startTask: () => Effect.void, + continueTask: () => Effect.void, + } as never); + + const mockProjectRepository = Layer.succeed(ProjectRepository, { + getProject: () => + Effect.succeed({ + project: { + meta: { projectPath: "/tmp/test-project" }, + }, + } as never), + } as never); + + const mockUserConfigService = Layer.succeed(UserConfigService, { + getUserConfig: () => + Effect.succeed({ + hideNoUserMessageSession: true, + unifySameTitleSession: true, + enterKeyBehavior: "shift-enter-send", + permissionMode: "default", + locale: "ja", + }), + } as never); + + const mockEnvService = Layer.succeed(EnvService, { + getEnv: () => Effect.succeed(undefined), + } as never); + + const baseLayers = Layer.mergeAll( + NodeFileSystem.layer, + NodePath.layer, + NodeContext.layer, + mockSessionProcessService, + mockLifeCycleService, + mockProjectRepository, + mockUserConfigService, + mockEnvService, + ); + + const testLayer = Layer.mergeAll(SchedulerService.Live, baseLayers).pipe( + Layer.provide(baseLayers), + ); + + beforeEach(async () => { + testDir = join(tmpdir(), `scheduler-test-${Date.now()}`); + await mkdir(testDir, { recursive: true }); + + // Clean up existing config file + const configPath = join( + homedir(), + ".claude-code-viewer", + "scheduler", + "config.json", + ); + try { + await unlink(configPath); + } catch { + // Ignore if file doesn't exist + } + }); + + afterEach(async () => { + await rm(testDir, { recursive: true, force: true }); + }); + + test("addJob creates a new job with generated id", async () => { + const newJob: NewSchedulerJob = { + name: "Test Job", + schedule: { type: "cron", expression: "0 0 * * *" }, + message: { + content: "test message", + projectId: "project-1", + baseSessionId: null, + }, + enabled: false, + concurrencyPolicy: "skip", + }; + + const result = await Effect.runPromise( + Effect.gen(function* () { + const service = yield* SchedulerService; + const job = yield* service.addJob(newJob); + return job; + }).pipe(Effect.provide(testLayer)), + ); + + expect(result.id).toBeDefined(); + expect(result.name).toBe("Test Job"); + expect(result.createdAt).toBeDefined(); + expect(result.lastRunAt).toBe(null); + expect(result.lastRunStatus).toBe(null); + }); + + test("getJobs returns all jobs", async () => { + const newJob: NewSchedulerJob = { + name: "Test Job", + schedule: { type: "cron", expression: "0 0 * * *" }, + message: { + content: "test message", + projectId: "project-1", + baseSessionId: null, + }, + enabled: false, + concurrencyPolicy: "skip", + }; + + const result = await Effect.runPromise( + Effect.gen(function* () { + const service = yield* SchedulerService; + yield* service.addJob(newJob); + yield* service.addJob(newJob); + return yield* service.getJobs(); + }).pipe(Effect.provide(testLayer)), + ); + + expect(result).toHaveLength(2); + }); + + test("updateJob modifies an existing job", async () => { + const newJob: NewSchedulerJob = { + name: "Test Job", + schedule: { type: "cron", expression: "0 0 * * *" }, + message: { + content: "test message", + projectId: "project-1", + baseSessionId: null, + }, + enabled: false, + concurrencyPolicy: "skip", + }; + + const result = await Effect.runPromise( + Effect.gen(function* () { + const service = yield* SchedulerService; + const job = yield* service.addJob(newJob); + const updated = yield* service.updateJob(job.id, { + name: "Updated Job", + }); + return updated; + }).pipe(Effect.provide(testLayer)), + ); + + expect(result.name).toBe("Updated Job"); + }); + + test("deleteJob removes a job", async () => { + const newJob: NewSchedulerJob = { + name: "Test Job", + schedule: { type: "cron", expression: "0 0 * * *" }, + message: { + content: "test message", + projectId: "project-1", + baseSessionId: null, + }, + enabled: false, + concurrencyPolicy: "skip", + }; + + const result = await Effect.runPromise( + Effect.gen(function* () { + const service = yield* SchedulerService; + const job = yield* service.addJob(newJob); + yield* service.deleteJob(job.id); + return yield* service.getJobs(); + }).pipe(Effect.provide(testLayer)), + ); + + expect(result).toHaveLength(0); + }); + + test("updateJob fails with SchedulerJobNotFoundError for non-existent job", async () => { + const result = await Effect.runPromise( + Effect.gen(function* () { + const service = yield* SchedulerService; + return yield* service.updateJob("non-existent-id", { name: "Updated" }); + }).pipe(Effect.provide(testLayer), Effect.flip), + ); + + expect(result._tag).toBe("SchedulerJobNotFoundError"); + }); + + test("deleteJob fails with SchedulerJobNotFoundError for non-existent job", async () => { + const result = await Effect.runPromise( + Effect.gen(function* () { + const service = yield* SchedulerService; + return yield* service.deleteJob("non-existent-id"); + }).pipe(Effect.provide(testLayer), Effect.flip), + ); + + expect(result._tag).toBe("SchedulerJobNotFoundError"); + }); +}); diff --git a/src/server/core/scheduler/domain/Scheduler.ts b/src/server/core/scheduler/domain/Scheduler.ts new file mode 100644 index 0000000..32b3f5e --- /dev/null +++ b/src/server/core/scheduler/domain/Scheduler.ts @@ -0,0 +1,313 @@ +import { randomUUID } from "node:crypto"; +import { + Context, + Cron, + Data, + Duration, + Effect, + Fiber, + Layer, + Ref, + Schedule, +} from "effect"; +import type { InferEffect } from "../../../lib/effect/types"; +import { initializeConfig, readConfig, writeConfig } from "../config"; +import type { + NewSchedulerJob, + SchedulerConfig, + SchedulerJob, + UpdateSchedulerJob, +} from "../schema"; +import { calculateFixedDelay, executeJob, shouldExecuteJob } from "./Job"; + +class SchedulerJobNotFoundError extends Data.TaggedError( + "SchedulerJobNotFoundError", +)<{ + readonly jobId: string; +}> {} + +class InvalidCronExpressionError extends Data.TaggedError( + "InvalidCronExpressionError", +)<{ + readonly expression: string; + readonly cause: unknown; +}> {} + +const LayerImpl = Effect.gen(function* () { + const fibersRef = yield* Ref.make< + Map> + >(new Map()); + const runningJobsRef = yield* Ref.make>(new Set()); + + const startJob = (job: SchedulerJob) => + Effect.gen(function* () { + const now = new Date(); + + if (job.schedule.type === "cron") { + const cronResult = Cron.parse(job.schedule.expression); + + if (cronResult._tag === "Left") { + return yield* Effect.fail( + new InvalidCronExpressionError({ + expression: job.schedule.expression, + cause: cronResult.left, + }), + ); + } + + const schedule = Schedule.cron(cronResult.right); + + const fiber = yield* Effect.repeat( + runJobWithConcurrencyControl(job), + schedule, + ).pipe(Effect.forkDaemon); + + yield* Ref.update(fibersRef, (fibers) => + new Map(fibers).set(job.id, fiber), + ); + } else if (job.schedule.type === "fixed") { + if (!shouldExecuteJob(job, now)) { + return; + } + + const delay = calculateFixedDelay(job, now); + const delayDuration = Duration.millis(delay); + + if (job.schedule.oneTime) { + const fiber = yield* Effect.delay( + runJobWithConcurrencyControl(job), + delayDuration, + ).pipe(Effect.forkDaemon); + + yield* Ref.update(fibersRef, (fibers) => + new Map(fibers).set(job.id, fiber), + ); + } else { + const schedule = Schedule.spaced(delayDuration); + + const fiber = yield* Effect.repeat( + runJobWithConcurrencyControl(job), + schedule, + ).pipe(Effect.forkDaemon); + + yield* Ref.update(fibersRef, (fibers) => + new Map(fibers).set(job.id, fiber), + ); + } + } + }); + + const runJobWithConcurrencyControl = (job: SchedulerJob) => + Effect.gen(function* () { + if (job.concurrencyPolicy === "skip") { + const runningJobs = yield* Ref.get(runningJobsRef); + if (runningJobs.has(job.id)) { + return; + } + } + + yield* Ref.update(runningJobsRef, (jobs) => new Set(jobs).add(job.id)); + + const result = yield* executeJob(job).pipe( + Effect.matchEffect({ + onSuccess: () => + updateJobStatus(job.id, "success", new Date().toISOString()), + onFailure: () => + updateJobStatus(job.id, "failed", new Date().toISOString()), + }), + ); + + yield* Ref.update(runningJobsRef, (jobs) => { + const newJobs = new Set(jobs); + newJobs.delete(job.id); + return newJobs; + }); + + return result; + }); + + const updateJobStatus = ( + jobId: string, + status: "success" | "failed", + runAt: string, + ) => + Effect.gen(function* () { + const config = yield* readConfig; + const job = config.jobs.find((j) => j.id === jobId); + + if (job === undefined) { + return; + } + + const updatedJob: SchedulerJob = { + ...job, + lastRunAt: runAt, + lastRunStatus: status, + }; + + const updatedConfig: SchedulerConfig = { + jobs: config.jobs.map((j) => (j.id === jobId ? updatedJob : j)), + }; + + yield* writeConfig(updatedConfig); + }); + + const stopJob = (jobId: string) => + Effect.gen(function* () { + const fibers = yield* Ref.get(fibersRef); + const fiber = fibers.get(jobId); + + if (fiber !== undefined) { + yield* Fiber.interrupt(fiber); + yield* Ref.update(fibersRef, (fibers) => { + const newFibers = new Map(fibers); + newFibers.delete(jobId); + return newFibers; + }); + } + }); + + const startScheduler = Effect.gen(function* () { + yield* initializeConfig; + const config = yield* readConfig; + + for (const job of config.jobs) { + if (job.enabled) { + yield* startJob(job); + } + } + }); + + const stopScheduler = Effect.gen(function* () { + const fibers = yield* Ref.get(fibersRef); + + for (const fiber of fibers.values()) { + yield* Fiber.interrupt(fiber); + } + + yield* Ref.set(fibersRef, new Map()); + }); + + const getJobs = () => + Effect.gen(function* () { + const config = yield* readConfig.pipe( + Effect.catchTags({ + ConfigFileNotFoundError: () => + initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))), + ConfigParseError: () => + initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))), + }), + ); + return config.jobs; + }); + + const addJob = (newJob: NewSchedulerJob) => + Effect.gen(function* () { + const config = yield* readConfig.pipe( + Effect.catchTags({ + ConfigFileNotFoundError: () => + initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))), + ConfigParseError: () => + initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))), + }), + ); + const job: SchedulerJob = { + ...newJob, + id: randomUUID(), + createdAt: new Date().toISOString(), + lastRunAt: null, + lastRunStatus: null, + }; + + const updatedConfig: SchedulerConfig = { + jobs: [...config.jobs, job], + }; + + yield* writeConfig(updatedConfig); + + if (job.enabled) { + yield* startJob(job); + } + + return job; + }); + + const updateJob = (jobId: string, updates: UpdateSchedulerJob) => + Effect.gen(function* () { + const config = yield* readConfig.pipe( + Effect.catchTags({ + ConfigFileNotFoundError: () => + initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))), + ConfigParseError: () => + initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))), + }), + ); + const job = config.jobs.find((j) => j.id === jobId); + + if (job === undefined) { + return yield* Effect.fail(new SchedulerJobNotFoundError({ jobId })); + } + + yield* stopJob(jobId); + + const updatedJob: SchedulerJob = { + ...job, + ...updates, + }; + + const updatedConfig: SchedulerConfig = { + jobs: config.jobs.map((j) => (j.id === jobId ? updatedJob : j)), + }; + + yield* writeConfig(updatedConfig); + + if (updatedJob.enabled) { + yield* startJob(updatedJob); + } + + return updatedJob; + }); + + const deleteJob = (jobId: string) => + Effect.gen(function* () { + const config = yield* readConfig.pipe( + Effect.catchTags({ + ConfigFileNotFoundError: () => + initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))), + ConfigParseError: () => + initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))), + }), + ); + const job = config.jobs.find((j) => j.id === jobId); + + if (job === undefined) { + return yield* Effect.fail(new SchedulerJobNotFoundError({ jobId })); + } + + yield* stopJob(jobId); + + const updatedConfig: SchedulerConfig = { + jobs: config.jobs.filter((j) => j.id !== jobId), + }; + + yield* writeConfig(updatedConfig); + }); + + return { + startScheduler, + stopScheduler, + getJobs, + addJob, + updateJob, + deleteJob, + }; +}); + +export type ISchedulerService = InferEffect; + +export class SchedulerService extends Context.Tag("SchedulerService")< + SchedulerService, + ISchedulerService +>() { + static Live = Layer.effect(this, LayerImpl); +} diff --git a/src/server/core/scheduler/presentation/SchedulerController.ts b/src/server/core/scheduler/presentation/SchedulerController.ts new file mode 100644 index 0000000..27d6aae --- /dev/null +++ b/src/server/core/scheduler/presentation/SchedulerController.ts @@ -0,0 +1,106 @@ +import type { FileSystem, Path } from "@effect/platform"; +import type { CommandExecutor } from "@effect/platform/CommandExecutor"; +import { Context, Effect, Layer, Runtime } from "effect"; +import { Hono, type Context as HonoContext } from "hono"; +import type { InferEffect } from "../../../lib/effect/types"; +import type { ClaudeCodeLifeCycleService } from "../../claude-code/services/ClaudeCodeLifeCycleService"; +import type { EnvService } from "../../platform/services/EnvService"; +import type { UserConfigService } from "../../platform/services/UserConfigService"; +import type { ProjectRepository } from "../../project/infrastructure/ProjectRepository"; +import { SchedulerService } from "../domain/Scheduler"; +import { newSchedulerJobSchema, updateSchedulerJobSchema } from "../schema"; + +const LayerImpl = Effect.gen(function* () { + const schedulerService = yield* SchedulerService; + + const runtime = yield* Effect.runtime< + | FileSystem.FileSystem + | Path.Path + | CommandExecutor + | EnvService + | ProjectRepository + | UserConfigService + | ClaudeCodeLifeCycleService + >(); + + const app = new Hono() + .get("/jobs", async (c: HonoContext) => { + const result = await Runtime.runPromise(runtime)( + schedulerService.getJobs(), + ); + return c.json(result); + }) + .post("/jobs", async (c: HonoContext) => { + const body = await c.req.json(); + const parsed = newSchedulerJobSchema.safeParse(body); + + if (!parsed.success) { + return c.json( + { error: "Invalid request body", details: parsed.error }, + 400, + ); + } + + const result = await Runtime.runPromise(runtime)( + schedulerService.addJob(parsed.data), + ); + return c.json(result, 201); + }) + .patch("/jobs/:id", async (c: HonoContext) => { + const id = c.req.param("id"); + const body = await c.req.json(); + const parsed = updateSchedulerJobSchema.safeParse(body); + + if (!parsed.success) { + return c.json( + { error: "Invalid request body", details: parsed.error }, + 400, + ); + } + + const result = await Runtime.runPromise(runtime)( + schedulerService + .updateJob(id, parsed.data) + .pipe( + Effect.catchTag("SchedulerJobNotFoundError", () => + Effect.succeed(null), + ), + ), + ); + + if (result === null) { + return c.json({ error: "Job not found" }, 404); + } + + return c.json(result); + }) + .delete("/jobs/:id", async (c: HonoContext) => { + const id = c.req.param("id"); + + const result = await Runtime.runPromise(runtime)( + schedulerService.deleteJob(id).pipe( + Effect.catchTag("SchedulerJobNotFoundError", () => + Effect.succeed(false), + ), + Effect.map(() => true), + ), + ); + + if (!result) { + return c.json({ error: "Job not found" }, 404); + } + + return c.json({ success: true }, 200); + }); + + return { app }; +}); + +export type ISchedulerController = InferEffect; + +export class SchedulerController extends Context.Tag("SchedulerController")< + SchedulerController, + ISchedulerController +>() { + static Live = Layer.effect(this, LayerImpl); +} diff --git a/src/server/core/scheduler/schema.ts b/src/server/core/scheduler/schema.ts new file mode 100644 index 0000000..7b9b5a5 --- /dev/null +++ b/src/server/core/scheduler/schema.ts @@ -0,0 +1,85 @@ +import { z } from "zod"; + +// Schedule type discriminated union +export const cronScheduleSchema = z.object({ + type: z.literal("cron"), + expression: z.string(), +}); + +export const fixedScheduleSchema = z.object({ + type: z.literal("fixed"), + delayMs: z.number().int().positive(), + oneTime: z.boolean(), +}); + +export const scheduleSchema = z.discriminatedUnion("type", [ + cronScheduleSchema, + fixedScheduleSchema, +]); + +// Message configuration +export const messageConfigSchema = z.object({ + content: z.string(), + projectId: z.string(), + baseSessionId: z.string().nullable(), +}); + +// Job status +export const jobStatusSchema = z.enum(["success", "failed"]); + +// Concurrency policy +export const concurrencyPolicySchema = z.enum(["skip", "run"]); + +// Scheduler job +export const schedulerJobSchema = z.object({ + id: z.string(), + name: z.string(), + schedule: scheduleSchema, + message: messageConfigSchema, + enabled: z.boolean(), + concurrencyPolicy: concurrencyPolicySchema, + createdAt: z.string().datetime(), + lastRunAt: z.string().datetime().nullable(), + lastRunStatus: jobStatusSchema.nullable(), +}); + +// Config file schema +export const schedulerConfigSchema = z.object({ + jobs: z.array(schedulerJobSchema), +}); + +// Type exports +export type CronSchedule = z.infer; +export type FixedSchedule = z.infer; +export type Schedule = z.infer; +export type MessageConfig = z.infer; +export type JobStatus = z.infer; +export type ConcurrencyPolicy = z.infer; +export type SchedulerJob = z.infer; +export type SchedulerConfig = z.infer; + +// New job creation schema (without runtime fields) +export const newSchedulerJobSchema = schedulerJobSchema + .omit({ + id: true, + createdAt: true, + lastRunAt: true, + lastRunStatus: true, + }) + .extend({ + enabled: z.boolean().default(true), + concurrencyPolicy: concurrencyPolicySchema.default("skip"), + }); + +export type NewSchedulerJob = z.infer; + +// Job update schema (partial fields) +export const updateSchedulerJobSchema = schedulerJobSchema.partial().pick({ + name: true, + schedule: true, + message: true, + enabled: true, + concurrencyPolicy: true, +}); + +export type UpdateSchedulerJob = z.infer; diff --git a/src/server/hono/route.ts b/src/server/hono/route.ts index 9b4715f..0136be4 100644 --- a/src/server/hono/route.ts +++ b/src/server/hono/route.ts @@ -18,6 +18,7 @@ import { CommitRequestSchema, PushRequestSchema } from "../core/git/schema"; import { EnvService } from "../core/platform/services/EnvService"; import { UserConfigService } from "../core/platform/services/UserConfigService"; import { ProjectController } from "../core/project/presentation/ProjectController"; +import { SchedulerController } from "../core/scheduler/presentation/SchedulerController"; import type { VirtualConversationDatabase } from "../core/session/infrastructure/VirtualConversationDatabase"; import { SessionController } from "../core/session/presentation/SessionController"; import type { SessionMetaService } from "../core/session/services/SessionMetaService"; @@ -40,6 +41,7 @@ export const routes = (app: HonoAppType) => const sseController = yield* SSEController; const fileSystemController = yield* FileSystemController; const claudeCodeController = yield* ClaudeCodeController; + const schedulerController = yield* SchedulerController; // services const envService = yield* EnvService; @@ -440,6 +442,12 @@ export const routes = (app: HonoAppType) => ); }) + /** + * SchedulerController Routes + */ + + .route("/scheduler", schedulerController.app) + /** * FileSystemController Routes */