implement BE for scheduler feat

This commit is contained in:
d-kimsuon
2025-10-25 01:32:42 +09:00
parent c2409017e5
commit 974d87daf7
10 changed files with 1207 additions and 0 deletions

View File

@@ -16,6 +16,8 @@ import { GitService } from "../../../server/core/git/services/GitService";
import { ProjectRepository } from "../../../server/core/project/infrastructure/ProjectRepository";
import { ProjectController } from "../../../server/core/project/presentation/ProjectController";
import { ProjectMetaService } from "../../../server/core/project/services/ProjectMetaService";
import { SchedulerService } from "../../../server/core/scheduler/domain/Scheduler";
import { SchedulerController } from "../../../server/core/scheduler/presentation/SchedulerController";
import { SessionRepository } from "../../../server/core/session/infrastructure/SessionRepository";
import { VirtualConversationDatabase } from "../../../server/core/session/infrastructure/VirtualConversationDatabase";
import { SessionController } from "../../../server/core/session/presentation/SessionController";
@@ -40,6 +42,7 @@ await Effect.runPromise(
Effect.provide(ClaudeCodePermissionController.Live),
Effect.provide(FileSystemController.Live),
Effect.provide(SSEController.Live),
Effect.provide(SchedulerController.Live),
)
.pipe(
/** Application */
@@ -53,6 +56,7 @@ await Effect.runPromise(
Effect.provide(ClaudeCodeSessionProcessService.Live),
Effect.provide(ClaudeCodeService.Live),
Effect.provide(GitService.Live),
Effect.provide(SchedulerService.Live),
)
.pipe(
/** Infrastructure */

View File

@@ -0,0 +1,124 @@
import { mkdir, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { FileSystem, Path } from "@effect/platform";
import { NodeFileSystem, NodePath } from "@effect/platform-node";
import { Effect, Layer } from "effect";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import {
getConfigPath,
initializeConfig,
readConfig,
writeConfig,
} from "./config";
import type { SchedulerConfig } from "./schema";
describe("scheduler config", () => {
let testDir: string;
const testLayer = Layer.mergeAll(NodeFileSystem.layer, NodePath.layer);
beforeEach(async () => {
testDir = join(tmpdir(), `scheduler-test-${Date.now()}`);
await mkdir(testDir, { recursive: true });
});
afterEach(async () => {
await rm(testDir, { recursive: true, force: true });
});
test("getConfigPath returns correct path", async () => {
const result = await Effect.runPromise(
getConfigPath.pipe(Effect.provide(testLayer)),
);
expect(result).toContain(".claude-code-viewer/scheduler/config.json");
});
test("writeConfig and readConfig work correctly", async () => {
const config: SchedulerConfig = {
jobs: [
{
id: "test-job-1",
name: "Test Job",
schedule: {
type: "cron",
expression: "0 0 * * *",
},
message: {
content: "test message",
projectId: "project-1",
baseSessionId: null,
},
enabled: true,
concurrencyPolicy: "skip",
createdAt: "2025-10-25T00:00:00Z",
lastRunAt: null,
lastRunStatus: null,
},
],
};
const result = await Effect.runPromise(
Effect.gen(function* () {
yield* writeConfig(config);
return yield* readConfig;
}).pipe(Effect.provide(testLayer)),
);
expect(result).toEqual(config);
});
test("initializeConfig creates file if not exists", async () => {
const result = await Effect.runPromise(
Effect.gen(function* () {
const configPath = yield* getConfigPath;
const fs = yield* FileSystem.FileSystem;
const exists = yield* fs.exists(configPath);
if (exists) {
yield* fs.remove(configPath);
}
return yield* initializeConfig;
}).pipe(Effect.provide(testLayer)),
);
expect(result).toEqual({ jobs: [] });
});
test("readConfig fails with ConfigFileNotFoundError when file does not exist", async () => {
const result = await Effect.runPromise(
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem;
const configPath = yield* getConfigPath;
const exists = yield* fs.exists(configPath);
if (exists) {
yield* fs.remove(configPath);
}
return yield* readConfig;
}).pipe(Effect.provide(testLayer), Effect.flip),
);
expect(result._tag).toBe("ConfigFileNotFoundError");
});
test("readConfig fails with ConfigParseError for invalid JSON", async () => {
const result = await Effect.runPromise(
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
const configPath = yield* getConfigPath;
const configDir = path.dirname(configPath);
yield* fs.makeDirectory(configDir, { recursive: true });
yield* fs.writeFileString(configPath, "{ invalid json }");
return yield* readConfig;
}).pipe(Effect.provide(testLayer), Effect.flip),
);
expect(result._tag).toBe("ConfigParseError");
});
});

View File

@@ -0,0 +1,94 @@
import { homedir } from "node:os";
import { FileSystem, Path } from "@effect/platform";
import { Data, Effect } from "effect";
import { type SchedulerConfig, schedulerConfigSchema } from "./schema";
class ConfigFileNotFoundError extends Data.TaggedError(
"ConfigFileNotFoundError",
)<{
readonly path: string;
}> {}
class ConfigParseError extends Data.TaggedError("ConfigParseError")<{
readonly path: string;
readonly cause: unknown;
}> {}
const CONFIG_DIR = "scheduler";
const CONFIG_FILE = "config.json";
export const getConfigPath = Effect.gen(function* () {
const path = yield* Path.Path;
const baseDir = path.resolve(homedir(), ".claude-code-viewer");
return path.join(baseDir, CONFIG_DIR, CONFIG_FILE);
});
export const readConfig = Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem;
const configPath = yield* getConfigPath;
const exists = yield* fs.exists(configPath);
if (!exists) {
return yield* Effect.fail(
new ConfigFileNotFoundError({ path: configPath }),
);
}
const content = yield* fs.readFileString(configPath);
const jsonResult = yield* Effect.try({
try: () => JSON.parse(content),
catch: (error) =>
new ConfigParseError({
path: configPath,
cause: error,
}),
});
const parsed = schedulerConfigSchema.safeParse(jsonResult);
if (!parsed.success) {
return yield* Effect.fail(
new ConfigParseError({
path: configPath,
cause: parsed.error,
}),
);
}
return parsed.data;
});
export const writeConfig = (config: SchedulerConfig) =>
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem;
const path = yield* Path.Path;
const configPath = yield* getConfigPath;
const configDir = path.dirname(configPath);
yield* fs.makeDirectory(configDir, { recursive: true });
const content = JSON.stringify(config, null, 2);
yield* fs.writeFileString(configPath, content);
});
export const initializeConfig = Effect.gen(function* () {
const result = yield* readConfig.pipe(
Effect.catchTags({
ConfigFileNotFoundError: () =>
Effect.gen(function* () {
const initialConfig: SchedulerConfig = { jobs: [] };
yield* writeConfig(initialConfig);
return initialConfig;
}),
ConfigParseError: () =>
Effect.gen(function* () {
const initialConfig: SchedulerConfig = { jobs: [] };
yield* writeConfig(initialConfig);
return initialConfig;
}),
}),
);
return result;
});

View File

@@ -0,0 +1,167 @@
import { describe, expect, test } from "vitest";
import type { SchedulerJob } from "../schema";
import { calculateFixedDelay, shouldExecuteJob } from "./Job";
describe("shouldExecuteJob", () => {
test("returns false when job is disabled", () => {
const job: SchedulerJob = {
id: "test-job",
name: "Test Job",
schedule: { type: "cron", expression: "* * * * *" },
message: { content: "test", projectId: "proj-1", baseSessionId: null },
enabled: false,
concurrencyPolicy: "skip",
createdAt: "2025-10-25T00:00:00Z",
lastRunAt: null,
lastRunStatus: null,
};
expect(shouldExecuteJob(job, new Date())).toBe(false);
});
test("returns true for cron job when enabled", () => {
const job: SchedulerJob = {
id: "test-job",
name: "Test Job",
schedule: { type: "cron", expression: "* * * * *" },
message: { content: "test", projectId: "proj-1", baseSessionId: null },
enabled: true,
concurrencyPolicy: "skip",
createdAt: "2025-10-25T00:00:00Z",
lastRunAt: null,
lastRunStatus: null,
};
expect(shouldExecuteJob(job, new Date())).toBe(true);
});
test("returns false for oneTime fixed job that has already run", () => {
const job: SchedulerJob = {
id: "test-job",
name: "Test Job",
schedule: { type: "fixed", delayMs: 60000, oneTime: true },
message: { content: "test", projectId: "proj-1", baseSessionId: null },
enabled: true,
concurrencyPolicy: "skip",
createdAt: "2025-10-25T00:00:00Z",
lastRunAt: "2025-10-25T00:01:00Z",
lastRunStatus: "success",
};
expect(shouldExecuteJob(job, new Date())).toBe(false);
});
test("returns false for oneTime fixed job when scheduled time has not arrived", () => {
const createdAt = new Date("2025-10-25T00:00:00Z");
const now = new Date("2025-10-25T00:00:30Z");
const job: SchedulerJob = {
id: "test-job",
name: "Test Job",
schedule: { type: "fixed", delayMs: 60000, oneTime: true },
message: { content: "test", projectId: "proj-1", baseSessionId: null },
enabled: true,
concurrencyPolicy: "skip",
createdAt: createdAt.toISOString(),
lastRunAt: null,
lastRunStatus: null,
};
expect(shouldExecuteJob(job, now)).toBe(false);
});
test("returns true for oneTime fixed job when scheduled time has arrived", () => {
const createdAt = new Date("2025-10-25T00:00:00Z");
const now = new Date("2025-10-25T00:01:01Z");
const job: SchedulerJob = {
id: "test-job",
name: "Test Job",
schedule: { type: "fixed", delayMs: 60000, oneTime: true },
message: { content: "test", projectId: "proj-1", baseSessionId: null },
enabled: true,
concurrencyPolicy: "skip",
createdAt: createdAt.toISOString(),
lastRunAt: null,
lastRunStatus: null,
};
expect(shouldExecuteJob(job, now)).toBe(true);
});
test("returns true for recurring fixed job", () => {
const job: SchedulerJob = {
id: "test-job",
name: "Test Job",
schedule: { type: "fixed", delayMs: 60000, oneTime: false },
message: { content: "test", projectId: "proj-1", baseSessionId: null },
enabled: true,
concurrencyPolicy: "skip",
createdAt: "2025-10-25T00:00:00Z",
lastRunAt: null,
lastRunStatus: null,
};
expect(shouldExecuteJob(job, new Date())).toBe(true);
});
});
describe("calculateFixedDelay", () => {
test("calculates delay correctly for future scheduled time", () => {
const createdAt = new Date("2025-10-25T00:00:00Z");
const now = new Date("2025-10-25T00:00:30Z");
const job: SchedulerJob = {
id: "test-job",
name: "Test Job",
schedule: { type: "fixed", delayMs: 60000, oneTime: true },
message: { content: "test", projectId: "proj-1", baseSessionId: null },
enabled: true,
concurrencyPolicy: "skip",
createdAt: createdAt.toISOString(),
lastRunAt: null,
lastRunStatus: null,
};
const delay = calculateFixedDelay(job, now);
expect(delay).toBe(30000);
});
test("returns 0 for past scheduled time", () => {
const createdAt = new Date("2025-10-25T00:00:00Z");
const now = new Date("2025-10-25T00:02:00Z");
const job: SchedulerJob = {
id: "test-job",
name: "Test Job",
schedule: { type: "fixed", delayMs: 60000, oneTime: true },
message: { content: "test", projectId: "proj-1", baseSessionId: null },
enabled: true,
concurrencyPolicy: "skip",
createdAt: createdAt.toISOString(),
lastRunAt: null,
lastRunStatus: null,
};
const delay = calculateFixedDelay(job, now);
expect(delay).toBe(0);
});
test("throws error for non-fixed schedule type", () => {
const job: SchedulerJob = {
id: "test-job",
name: "Test Job",
schedule: { type: "cron", expression: "* * * * *" },
message: { content: "test", projectId: "proj-1", baseSessionId: null },
enabled: true,
concurrencyPolicy: "skip",
createdAt: "2025-10-25T00:00:00Z",
lastRunAt: null,
lastRunStatus: null,
};
expect(() => calculateFixedDelay(job, new Date())).toThrow(
"Job schedule type must be fixed",
);
});
});

View File

@@ -0,0 +1,74 @@
import { Effect } from "effect";
import { ClaudeCodeLifeCycleService } from "../../claude-code/services/ClaudeCodeLifeCycleService";
import { UserConfigService } from "../../platform/services/UserConfigService";
import { ProjectRepository } from "../../project/infrastructure/ProjectRepository";
import type { SchedulerJob } from "../schema";
export const executeJob = (job: SchedulerJob) =>
Effect.gen(function* () {
const lifeCycleService = yield* ClaudeCodeLifeCycleService;
const projectRepository = yield* ProjectRepository;
const userConfigService = yield* UserConfigService;
const { message } = job;
const { project } = yield* projectRepository.getProject(message.projectId);
const userConfig = yield* userConfigService.getUserConfig();
if (project.meta.projectPath === null) {
return yield* Effect.fail(
new Error(`Project path not found for projectId: ${message.projectId}`),
);
}
if (message.baseSessionId === null) {
yield* lifeCycleService.startTask({
baseSession: {
cwd: project.meta.projectPath,
projectId: message.projectId,
sessionId: undefined,
},
userConfig,
message: message.content,
});
} else {
yield* lifeCycleService.continueTask({
sessionProcessId: message.baseSessionId,
message: message.content,
baseSessionId: message.baseSessionId,
});
}
});
export const shouldExecuteJob = (job: SchedulerJob, now: Date): boolean => {
if (!job.enabled) {
return false;
}
if (job.schedule.type === "cron") {
return true;
}
if (job.schedule.type === "fixed" && job.schedule.oneTime) {
if (job.lastRunStatus !== null) {
return false;
}
const createdAt = new Date(job.createdAt);
const scheduledTime = new Date(createdAt.getTime() + job.schedule.delayMs);
return now >= scheduledTime;
}
return true;
};
export const calculateFixedDelay = (job: SchedulerJob, now: Date): number => {
if (job.schedule.type !== "fixed") {
throw new Error("Job schedule type must be fixed");
}
const createdAt = new Date(job.createdAt);
const scheduledTime = new Date(createdAt.getTime() + job.schedule.delayMs);
const delay = scheduledTime.getTime() - now.getTime();
return Math.max(0, delay);
};

View File

@@ -0,0 +1,232 @@
import { mkdir, rm, unlink } from "node:fs/promises";
import { homedir, tmpdir } from "node:os";
import { join } from "node:path";
import { NodeContext, NodeFileSystem, NodePath } from "@effect/platform-node";
import { Effect, Layer } from "effect";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { ClaudeCodeLifeCycleService } from "../../claude-code/services/ClaudeCodeLifeCycleService";
import { ClaudeCodeSessionProcessService } from "../../claude-code/services/ClaudeCodeSessionProcessService";
import { EnvService } from "../../platform/services/EnvService";
import { UserConfigService } from "../../platform/services/UserConfigService";
import { ProjectRepository } from "../../project/infrastructure/ProjectRepository";
import type { NewSchedulerJob } from "../schema";
import { SchedulerService } from "./Scheduler";
describe("SchedulerService", () => {
let testDir: string;
const mockSessionProcessService = Layer.succeed(
ClaudeCodeSessionProcessService,
{
startSessionProcess: () =>
Effect.succeed({ sessionProcess: {} as never, task: {} as never }),
continueSessionProcess: () =>
Effect.succeed({ sessionProcess: {} as never, task: {} as never }),
toNotInitializedState: () =>
Effect.succeed({ sessionProcess: {} as never, task: {} as never }),
toInitializedState: () => Effect.succeed({ sessionProcess: {} as never }),
toFileCreatedState: () => Effect.succeed({ sessionProcess: {} as never }),
toPausedState: () => Effect.succeed({ sessionProcess: {} as never }),
toCompletedState: () =>
Effect.succeed({ sessionProcess: {} as never, task: undefined }),
dangerouslyChangeProcessState: () => Effect.succeed({} as never),
getSessionProcesses: () => Effect.succeed([]),
getSessionProcess: () => Effect.succeed({} as never),
getTask: () => Effect.succeed({} as never),
changeTaskState: () => Effect.succeed({} as never),
},
);
const mockLifeCycleService = Layer.succeed(ClaudeCodeLifeCycleService, {
startTask: () => Effect.void,
continueTask: () => Effect.void,
} as never);
const mockProjectRepository = Layer.succeed(ProjectRepository, {
getProject: () =>
Effect.succeed({
project: {
meta: { projectPath: "/tmp/test-project" },
},
} as never),
} as never);
const mockUserConfigService = Layer.succeed(UserConfigService, {
getUserConfig: () =>
Effect.succeed({
hideNoUserMessageSession: true,
unifySameTitleSession: true,
enterKeyBehavior: "shift-enter-send",
permissionMode: "default",
locale: "ja",
}),
} as never);
const mockEnvService = Layer.succeed(EnvService, {
getEnv: () => Effect.succeed(undefined),
} as never);
const baseLayers = Layer.mergeAll(
NodeFileSystem.layer,
NodePath.layer,
NodeContext.layer,
mockSessionProcessService,
mockLifeCycleService,
mockProjectRepository,
mockUserConfigService,
mockEnvService,
);
const testLayer = Layer.mergeAll(SchedulerService.Live, baseLayers).pipe(
Layer.provide(baseLayers),
);
beforeEach(async () => {
testDir = join(tmpdir(), `scheduler-test-${Date.now()}`);
await mkdir(testDir, { recursive: true });
// Clean up existing config file
const configPath = join(
homedir(),
".claude-code-viewer",
"scheduler",
"config.json",
);
try {
await unlink(configPath);
} catch {
// Ignore if file doesn't exist
}
});
afterEach(async () => {
await rm(testDir, { recursive: true, force: true });
});
test("addJob creates a new job with generated id", async () => {
const newJob: NewSchedulerJob = {
name: "Test Job",
schedule: { type: "cron", expression: "0 0 * * *" },
message: {
content: "test message",
projectId: "project-1",
baseSessionId: null,
},
enabled: false,
concurrencyPolicy: "skip",
};
const result = await Effect.runPromise(
Effect.gen(function* () {
const service = yield* SchedulerService;
const job = yield* service.addJob(newJob);
return job;
}).pipe(Effect.provide(testLayer)),
);
expect(result.id).toBeDefined();
expect(result.name).toBe("Test Job");
expect(result.createdAt).toBeDefined();
expect(result.lastRunAt).toBe(null);
expect(result.lastRunStatus).toBe(null);
});
test("getJobs returns all jobs", async () => {
const newJob: NewSchedulerJob = {
name: "Test Job",
schedule: { type: "cron", expression: "0 0 * * *" },
message: {
content: "test message",
projectId: "project-1",
baseSessionId: null,
},
enabled: false,
concurrencyPolicy: "skip",
};
const result = await Effect.runPromise(
Effect.gen(function* () {
const service = yield* SchedulerService;
yield* service.addJob(newJob);
yield* service.addJob(newJob);
return yield* service.getJobs();
}).pipe(Effect.provide(testLayer)),
);
expect(result).toHaveLength(2);
});
test("updateJob modifies an existing job", async () => {
const newJob: NewSchedulerJob = {
name: "Test Job",
schedule: { type: "cron", expression: "0 0 * * *" },
message: {
content: "test message",
projectId: "project-1",
baseSessionId: null,
},
enabled: false,
concurrencyPolicy: "skip",
};
const result = await Effect.runPromise(
Effect.gen(function* () {
const service = yield* SchedulerService;
const job = yield* service.addJob(newJob);
const updated = yield* service.updateJob(job.id, {
name: "Updated Job",
});
return updated;
}).pipe(Effect.provide(testLayer)),
);
expect(result.name).toBe("Updated Job");
});
test("deleteJob removes a job", async () => {
const newJob: NewSchedulerJob = {
name: "Test Job",
schedule: { type: "cron", expression: "0 0 * * *" },
message: {
content: "test message",
projectId: "project-1",
baseSessionId: null,
},
enabled: false,
concurrencyPolicy: "skip",
};
const result = await Effect.runPromise(
Effect.gen(function* () {
const service = yield* SchedulerService;
const job = yield* service.addJob(newJob);
yield* service.deleteJob(job.id);
return yield* service.getJobs();
}).pipe(Effect.provide(testLayer)),
);
expect(result).toHaveLength(0);
});
test("updateJob fails with SchedulerJobNotFoundError for non-existent job", async () => {
const result = await Effect.runPromise(
Effect.gen(function* () {
const service = yield* SchedulerService;
return yield* service.updateJob("non-existent-id", { name: "Updated" });
}).pipe(Effect.provide(testLayer), Effect.flip),
);
expect(result._tag).toBe("SchedulerJobNotFoundError");
});
test("deleteJob fails with SchedulerJobNotFoundError for non-existent job", async () => {
const result = await Effect.runPromise(
Effect.gen(function* () {
const service = yield* SchedulerService;
return yield* service.deleteJob("non-existent-id");
}).pipe(Effect.provide(testLayer), Effect.flip),
);
expect(result._tag).toBe("SchedulerJobNotFoundError");
});
});

View File

@@ -0,0 +1,313 @@
import { randomUUID } from "node:crypto";
import {
Context,
Cron,
Data,
Duration,
Effect,
Fiber,
Layer,
Ref,
Schedule,
} from "effect";
import type { InferEffect } from "../../../lib/effect/types";
import { initializeConfig, readConfig, writeConfig } from "../config";
import type {
NewSchedulerJob,
SchedulerConfig,
SchedulerJob,
UpdateSchedulerJob,
} from "../schema";
import { calculateFixedDelay, executeJob, shouldExecuteJob } from "./Job";
class SchedulerJobNotFoundError extends Data.TaggedError(
"SchedulerJobNotFoundError",
)<{
readonly jobId: string;
}> {}
class InvalidCronExpressionError extends Data.TaggedError(
"InvalidCronExpressionError",
)<{
readonly expression: string;
readonly cause: unknown;
}> {}
const LayerImpl = Effect.gen(function* () {
const fibersRef = yield* Ref.make<
Map<string, Fiber.RuntimeFiber<unknown, unknown>>
>(new Map());
const runningJobsRef = yield* Ref.make<Set<string>>(new Set());
const startJob = (job: SchedulerJob) =>
Effect.gen(function* () {
const now = new Date();
if (job.schedule.type === "cron") {
const cronResult = Cron.parse(job.schedule.expression);
if (cronResult._tag === "Left") {
return yield* Effect.fail(
new InvalidCronExpressionError({
expression: job.schedule.expression,
cause: cronResult.left,
}),
);
}
const schedule = Schedule.cron(cronResult.right);
const fiber = yield* Effect.repeat(
runJobWithConcurrencyControl(job),
schedule,
).pipe(Effect.forkDaemon);
yield* Ref.update(fibersRef, (fibers) =>
new Map(fibers).set(job.id, fiber),
);
} else if (job.schedule.type === "fixed") {
if (!shouldExecuteJob(job, now)) {
return;
}
const delay = calculateFixedDelay(job, now);
const delayDuration = Duration.millis(delay);
if (job.schedule.oneTime) {
const fiber = yield* Effect.delay(
runJobWithConcurrencyControl(job),
delayDuration,
).pipe(Effect.forkDaemon);
yield* Ref.update(fibersRef, (fibers) =>
new Map(fibers).set(job.id, fiber),
);
} else {
const schedule = Schedule.spaced(delayDuration);
const fiber = yield* Effect.repeat(
runJobWithConcurrencyControl(job),
schedule,
).pipe(Effect.forkDaemon);
yield* Ref.update(fibersRef, (fibers) =>
new Map(fibers).set(job.id, fiber),
);
}
}
});
const runJobWithConcurrencyControl = (job: SchedulerJob) =>
Effect.gen(function* () {
if (job.concurrencyPolicy === "skip") {
const runningJobs = yield* Ref.get(runningJobsRef);
if (runningJobs.has(job.id)) {
return;
}
}
yield* Ref.update(runningJobsRef, (jobs) => new Set(jobs).add(job.id));
const result = yield* executeJob(job).pipe(
Effect.matchEffect({
onSuccess: () =>
updateJobStatus(job.id, "success", new Date().toISOString()),
onFailure: () =>
updateJobStatus(job.id, "failed", new Date().toISOString()),
}),
);
yield* Ref.update(runningJobsRef, (jobs) => {
const newJobs = new Set(jobs);
newJobs.delete(job.id);
return newJobs;
});
return result;
});
const updateJobStatus = (
jobId: string,
status: "success" | "failed",
runAt: string,
) =>
Effect.gen(function* () {
const config = yield* readConfig;
const job = config.jobs.find((j) => j.id === jobId);
if (job === undefined) {
return;
}
const updatedJob: SchedulerJob = {
...job,
lastRunAt: runAt,
lastRunStatus: status,
};
const updatedConfig: SchedulerConfig = {
jobs: config.jobs.map((j) => (j.id === jobId ? updatedJob : j)),
};
yield* writeConfig(updatedConfig);
});
const stopJob = (jobId: string) =>
Effect.gen(function* () {
const fibers = yield* Ref.get(fibersRef);
const fiber = fibers.get(jobId);
if (fiber !== undefined) {
yield* Fiber.interrupt(fiber);
yield* Ref.update(fibersRef, (fibers) => {
const newFibers = new Map(fibers);
newFibers.delete(jobId);
return newFibers;
});
}
});
const startScheduler = Effect.gen(function* () {
yield* initializeConfig;
const config = yield* readConfig;
for (const job of config.jobs) {
if (job.enabled) {
yield* startJob(job);
}
}
});
const stopScheduler = Effect.gen(function* () {
const fibers = yield* Ref.get(fibersRef);
for (const fiber of fibers.values()) {
yield* Fiber.interrupt(fiber);
}
yield* Ref.set(fibersRef, new Map());
});
const getJobs = () =>
Effect.gen(function* () {
const config = yield* readConfig.pipe(
Effect.catchTags({
ConfigFileNotFoundError: () =>
initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))),
ConfigParseError: () =>
initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))),
}),
);
return config.jobs;
});
const addJob = (newJob: NewSchedulerJob) =>
Effect.gen(function* () {
const config = yield* readConfig.pipe(
Effect.catchTags({
ConfigFileNotFoundError: () =>
initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))),
ConfigParseError: () =>
initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))),
}),
);
const job: SchedulerJob = {
...newJob,
id: randomUUID(),
createdAt: new Date().toISOString(),
lastRunAt: null,
lastRunStatus: null,
};
const updatedConfig: SchedulerConfig = {
jobs: [...config.jobs, job],
};
yield* writeConfig(updatedConfig);
if (job.enabled) {
yield* startJob(job);
}
return job;
});
const updateJob = (jobId: string, updates: UpdateSchedulerJob) =>
Effect.gen(function* () {
const config = yield* readConfig.pipe(
Effect.catchTags({
ConfigFileNotFoundError: () =>
initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))),
ConfigParseError: () =>
initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))),
}),
);
const job = config.jobs.find((j) => j.id === jobId);
if (job === undefined) {
return yield* Effect.fail(new SchedulerJobNotFoundError({ jobId }));
}
yield* stopJob(jobId);
const updatedJob: SchedulerJob = {
...job,
...updates,
};
const updatedConfig: SchedulerConfig = {
jobs: config.jobs.map((j) => (j.id === jobId ? updatedJob : j)),
};
yield* writeConfig(updatedConfig);
if (updatedJob.enabled) {
yield* startJob(updatedJob);
}
return updatedJob;
});
const deleteJob = (jobId: string) =>
Effect.gen(function* () {
const config = yield* readConfig.pipe(
Effect.catchTags({
ConfigFileNotFoundError: () =>
initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))),
ConfigParseError: () =>
initializeConfig.pipe(Effect.map(() => ({ jobs: [] }))),
}),
);
const job = config.jobs.find((j) => j.id === jobId);
if (job === undefined) {
return yield* Effect.fail(new SchedulerJobNotFoundError({ jobId }));
}
yield* stopJob(jobId);
const updatedConfig: SchedulerConfig = {
jobs: config.jobs.filter((j) => j.id !== jobId),
};
yield* writeConfig(updatedConfig);
});
return {
startScheduler,
stopScheduler,
getJobs,
addJob,
updateJob,
deleteJob,
};
});
export type ISchedulerService = InferEffect<typeof LayerImpl>;
export class SchedulerService extends Context.Tag("SchedulerService")<
SchedulerService,
ISchedulerService
>() {
static Live = Layer.effect(this, LayerImpl);
}

View File

@@ -0,0 +1,106 @@
import type { FileSystem, Path } from "@effect/platform";
import type { CommandExecutor } from "@effect/platform/CommandExecutor";
import { Context, Effect, Layer, Runtime } from "effect";
import { Hono, type Context as HonoContext } from "hono";
import type { InferEffect } from "../../../lib/effect/types";
import type { ClaudeCodeLifeCycleService } from "../../claude-code/services/ClaudeCodeLifeCycleService";
import type { EnvService } from "../../platform/services/EnvService";
import type { UserConfigService } from "../../platform/services/UserConfigService";
import type { ProjectRepository } from "../../project/infrastructure/ProjectRepository";
import { SchedulerService } from "../domain/Scheduler";
import { newSchedulerJobSchema, updateSchedulerJobSchema } from "../schema";
const LayerImpl = Effect.gen(function* () {
const schedulerService = yield* SchedulerService;
const runtime = yield* Effect.runtime<
| FileSystem.FileSystem
| Path.Path
| CommandExecutor
| EnvService
| ProjectRepository
| UserConfigService
| ClaudeCodeLifeCycleService
>();
const app = new Hono()
.get("/jobs", async (c: HonoContext) => {
const result = await Runtime.runPromise(runtime)(
schedulerService.getJobs(),
);
return c.json(result);
})
.post("/jobs", async (c: HonoContext) => {
const body = await c.req.json();
const parsed = newSchedulerJobSchema.safeParse(body);
if (!parsed.success) {
return c.json(
{ error: "Invalid request body", details: parsed.error },
400,
);
}
const result = await Runtime.runPromise(runtime)(
schedulerService.addJob(parsed.data),
);
return c.json(result, 201);
})
.patch("/jobs/:id", async (c: HonoContext) => {
const id = c.req.param("id");
const body = await c.req.json();
const parsed = updateSchedulerJobSchema.safeParse(body);
if (!parsed.success) {
return c.json(
{ error: "Invalid request body", details: parsed.error },
400,
);
}
const result = await Runtime.runPromise(runtime)(
schedulerService
.updateJob(id, parsed.data)
.pipe(
Effect.catchTag("SchedulerJobNotFoundError", () =>
Effect.succeed(null),
),
),
);
if (result === null) {
return c.json({ error: "Job not found" }, 404);
}
return c.json(result);
})
.delete("/jobs/:id", async (c: HonoContext) => {
const id = c.req.param("id");
const result = await Runtime.runPromise(runtime)(
schedulerService.deleteJob(id).pipe(
Effect.catchTag("SchedulerJobNotFoundError", () =>
Effect.succeed(false),
),
Effect.map(() => true),
),
);
if (!result) {
return c.json({ error: "Job not found" }, 404);
}
return c.json({ success: true }, 200);
});
return { app };
});
export type ISchedulerController = InferEffect<typeof LayerImpl>;
export class SchedulerController extends Context.Tag("SchedulerController")<
SchedulerController,
ISchedulerController
>() {
static Live = Layer.effect(this, LayerImpl);
}

View File

@@ -0,0 +1,85 @@
import { z } from "zod";
// Schedule type discriminated union
export const cronScheduleSchema = z.object({
type: z.literal("cron"),
expression: z.string(),
});
export const fixedScheduleSchema = z.object({
type: z.literal("fixed"),
delayMs: z.number().int().positive(),
oneTime: z.boolean(),
});
export const scheduleSchema = z.discriminatedUnion("type", [
cronScheduleSchema,
fixedScheduleSchema,
]);
// Message configuration
export const messageConfigSchema = z.object({
content: z.string(),
projectId: z.string(),
baseSessionId: z.string().nullable(),
});
// Job status
export const jobStatusSchema = z.enum(["success", "failed"]);
// Concurrency policy
export const concurrencyPolicySchema = z.enum(["skip", "run"]);
// Scheduler job
export const schedulerJobSchema = z.object({
id: z.string(),
name: z.string(),
schedule: scheduleSchema,
message: messageConfigSchema,
enabled: z.boolean(),
concurrencyPolicy: concurrencyPolicySchema,
createdAt: z.string().datetime(),
lastRunAt: z.string().datetime().nullable(),
lastRunStatus: jobStatusSchema.nullable(),
});
// Config file schema
export const schedulerConfigSchema = z.object({
jobs: z.array(schedulerJobSchema),
});
// Type exports
export type CronSchedule = z.infer<typeof cronScheduleSchema>;
export type FixedSchedule = z.infer<typeof fixedScheduleSchema>;
export type Schedule = z.infer<typeof scheduleSchema>;
export type MessageConfig = z.infer<typeof messageConfigSchema>;
export type JobStatus = z.infer<typeof jobStatusSchema>;
export type ConcurrencyPolicy = z.infer<typeof concurrencyPolicySchema>;
export type SchedulerJob = z.infer<typeof schedulerJobSchema>;
export type SchedulerConfig = z.infer<typeof schedulerConfigSchema>;
// New job creation schema (without runtime fields)
export const newSchedulerJobSchema = schedulerJobSchema
.omit({
id: true,
createdAt: true,
lastRunAt: true,
lastRunStatus: true,
})
.extend({
enabled: z.boolean().default(true),
concurrencyPolicy: concurrencyPolicySchema.default("skip"),
});
export type NewSchedulerJob = z.infer<typeof newSchedulerJobSchema>;
// Job update schema (partial fields)
export const updateSchedulerJobSchema = schedulerJobSchema.partial().pick({
name: true,
schedule: true,
message: true,
enabled: true,
concurrencyPolicy: true,
});
export type UpdateSchedulerJob = z.infer<typeof updateSchedulerJobSchema>;

View File

@@ -18,6 +18,7 @@ import { CommitRequestSchema, PushRequestSchema } from "../core/git/schema";
import { EnvService } from "../core/platform/services/EnvService";
import { UserConfigService } from "../core/platform/services/UserConfigService";
import { ProjectController } from "../core/project/presentation/ProjectController";
import { SchedulerController } from "../core/scheduler/presentation/SchedulerController";
import type { VirtualConversationDatabase } from "../core/session/infrastructure/VirtualConversationDatabase";
import { SessionController } from "../core/session/presentation/SessionController";
import type { SessionMetaService } from "../core/session/services/SessionMetaService";
@@ -40,6 +41,7 @@ export const routes = (app: HonoAppType) =>
const sseController = yield* SSEController;
const fileSystemController = yield* FileSystemController;
const claudeCodeController = yield* ClaudeCodeController;
const schedulerController = yield* SchedulerController;
// services
const envService = yield* EnvService;
@@ -440,6 +442,12 @@ export const routes = (app: HonoAppType) =>
);
})
/**
* SchedulerController Routes
*/
.route("/scheduler", schedulerController.app)
/**
* FileSystemController Routes
*/