import type { SDKMessage, SDKUserMessage } from "@anthropic-ai/claude-code"; import type { FileSystem, Path } from "@effect/platform"; import type { CommandExecutor } from "@effect/platform/CommandExecutor"; import { Context, Effect, Layer, Runtime } from "effect"; import { ulid } from "ulid"; import { controllablePromise } from "../../../lib/controllablePromise"; import type { Config } from "../../config/config"; import type { InferEffect } from "../../lib/effect/types"; import { EventBus } from "../events/EventBus"; import type { SessionMetaService } from "../session/SessionMetaService"; import { SessionRepository } from "../session/SessionRepository"; import { VirtualConversationDatabase } from "../session/VirtualConversationDatabase"; import * as ClaudeCode from "./ClaudeCode"; import { ClaudeCodePermissionService } from "./ClaudeCodePermissionService"; import { ClaudeCodeSessionProcessService } from "./ClaudeCodeSessionProcessService"; import { createMessageGenerator } from "./MessageGenerator"; import * as CCSessionProcess from "./models/CCSessionProcess"; export type MessageGenerator = () => AsyncGenerator< SDKUserMessage, void, unknown >; const LayerImpl = Effect.gen(function* () { const eventBusService = yield* EventBus; const sessionRepository = yield* SessionRepository; const sessionProcessService = yield* ClaudeCodeSessionProcessService; const virtualConversationDatabase = yield* VirtualConversationDatabase; const permissionService = yield* ClaudeCodePermissionService; const runtime = yield* Effect.runtime< | FileSystem.FileSystem | Path.Path | CommandExecutor | VirtualConversationDatabase | SessionMetaService | ClaudeCodePermissionService >(); const continueTask = (options: { sessionProcessId: string; baseSessionId: string; message: string; }) => { const { sessionProcessId, baseSessionId, message } = options; return Effect.gen(function* () { const { sessionProcess, task } = yield* sessionProcessService.continueSessionProcess({ sessionProcessId, taskDef: { type: "continue", sessionId: baseSessionId, baseSessionId: baseSessionId, taskId: ulid(), }, }); const virtualConversation = yield* CCSessionProcess.createVirtualConversation(sessionProcess, { sessionId: baseSessionId, userMessage: message, }); yield* virtualConversationDatabase.createVirtualConversation( sessionProcess.def.projectId, baseSessionId, [virtualConversation], ); sessionProcess.def.setNextMessage(message); return { sessionProcess, task, }; }); }; const startTask = (options: { config: Config; baseSession: { cwd: string; projectId: string; sessionId?: string; }; message: string; }) => { const { baseSession, message, config } = options; return Effect.gen(function* () { const { generateMessages, setNextMessage, setHooks: setMessageGeneratorHooks, } = createMessageGenerator(); const { sessionProcess, task } = yield* sessionProcessService.startSessionProcess({ sessionDef: { projectId: baseSession.projectId, cwd: baseSession.cwd, abortController: new AbortController(), setNextMessage, sessionProcessId: ulid(), }, taskDef: baseSession.sessionId === undefined ? { type: "new", taskId: ulid(), } : { type: "resume", taskId: ulid(), sessionId: undefined, baseSessionId: baseSession.sessionId, }, }); const sessionInitializedPromise = controllablePromise<{ sessionId: string; }>(); const sessionFileCreatedPromise = controllablePromise<{ sessionId: string; }>(); setMessageGeneratorHooks({ onNewUserMessageResolved: async (message) => { Effect.runFork( sessionProcessService.toNotInitializedState({ sessionProcessId: sessionProcess.def.sessionProcessId, rawUserMessage: message, }), ); }, }); const handleMessage = (message: SDKMessage) => Effect.gen(function* () { const processState = yield* sessionProcessService.getSessionProcess( sessionProcess.def.sessionProcessId, ); if (processState.type === "completed") { return "break" as const; } if (processState.type === "paused") { // rule: paused は not_initialized に更新されてからくる想定 yield* Effect.die( new Error("Illegal state: paused is not expected"), ); } if ( message.type === "system" && message.subtype === "init" && processState.type === "not_initialized" ) { yield* sessionProcessService.toInitializedState({ sessionProcessId: processState.def.sessionProcessId, initContext: { initMessage: message, }, }); // Virtual Conversation Creation const virtualConversation = yield* CCSessionProcess.createVirtualConversation(processState, { sessionId: message.session_id, userMessage: processState.rawUserMessage, }); if (processState.currentTask.def.type === "new") { // 末尾に追加するだけで OK yield* virtualConversationDatabase.createVirtualConversation( baseSession.projectId, message.session_id, [virtualConversation], ); } else if (processState.currentTask.def.type === "resume") { const existingSession = yield* sessionRepository.getSession( processState.def.projectId, processState.currentTask.def.baseSessionId, ); const copiedConversations = existingSession.session === null ? [] : existingSession.session.conversations; yield* virtualConversationDatabase.createVirtualConversation( processState.def.projectId, message.session_id, [...copiedConversations, virtualConversation], ); } else { // do nothing } sessionInitializedPromise.resolve({ sessionId: message.session_id, }); yield* eventBusService.emit("sessionListChanged", { projectId: processState.def.projectId, }); yield* eventBusService.emit("sessionChanged", { projectId: processState.def.projectId, sessionId: message.session_id, }); return "continue" as const; } if ( message.type === "assistant" && processState.type === "initialized" ) { yield* sessionProcessService.toFileCreatedState({ sessionProcessId: processState.def.sessionProcessId, }); sessionFileCreatedPromise.resolve({ sessionId: message.session_id, }); yield* virtualConversationDatabase.deleteVirtualConversations( message.session_id, ); } if ( message.type === "result" && processState.type === "file_created" ) { yield* sessionProcessService.toPausedState({ sessionProcessId: processState.def.sessionProcessId, resultMessage: message, }); yield* eventBusService.emit("sessionChanged", { projectId: processState.def.projectId, sessionId: message.session_id, }); return "continue" as const; } return "continue" as const; }); const handleSessionProcessDaemon = async () => { const messageIter = await Runtime.runPromise(runtime)( Effect.gen(function* () { const permissionOptions = yield* permissionService.createCanUseToolRelatedOptions({ taskId: task.def.taskId, config, sessionId: task.def.baseSessionId, }); return yield* ClaudeCode.query(generateMessages(), { resume: task.def.baseSessionId, cwd: sessionProcess.def.cwd, abortController: sessionProcess.def.abortController, ...permissionOptions, }); }), ); setNextMessage(message); try { for await (const message of messageIter) { const result = await Runtime.runPromise(runtime)( handleMessage(message), ).catch((error) => { // iter 自体が落ちてなければ継続したいので握りつぶす Effect.runFork( sessionProcessService.changeTaskState({ sessionProcessId: sessionProcess.def.sessionProcessId, taskId: task.def.taskId, nextTask: { status: "failed", def: task.def, error: error, }, }), ); return "continue" as const; }); if (result === "break") { break; } else { } } } catch (error) { await Effect.runPromise( sessionProcessService.changeTaskState({ sessionProcessId: sessionProcess.def.sessionProcessId, taskId: task.def.taskId, nextTask: { status: "failed", def: task.def, error: error, }, }), ); } }; const daemonPromise = handleSessionProcessDaemon() .catch((error) => { console.error("Error occur in task daemon process", error); throw error; }) .finally(() => { Effect.runFork( Effect.gen(function* () { const currentProcess = yield* sessionProcessService.getSessionProcess( sessionProcess.def.sessionProcessId, ); yield* sessionProcessService.toCompletedState({ sessionProcessId: currentProcess.def.sessionProcessId, }); }), ); }); return { sessionProcess, task, daemonPromise, awaitSessionInitialized: async () => await sessionInitializedPromise.promise, awaitSessionFileCreated: async () => await sessionFileCreatedPromise.promise, }; }); }; const getPublicSessionProcesses = () => Effect.gen(function* () { const processes = yield* sessionProcessService.getSessionProcesses(); return processes.filter((process) => CCSessionProcess.isPublic(process)); }); const abortTask = (sessionProcessId: string): Effect.Effect => Effect.gen(function* () { const currentProcess = yield* sessionProcessService.getSessionProcess(sessionProcessId); yield* sessionProcessService.toCompletedState({ sessionProcessId: currentProcess.def.sessionProcessId, error: new Error("Task aborted"), }); }); const abortAllTasks = () => Effect.gen(function* () { const processes = yield* sessionProcessService.getSessionProcesses(); for (const process of processes) { yield* sessionProcessService.toCompletedState({ sessionProcessId: process.def.sessionProcessId, error: new Error("Task aborted"), }); } }); return { continueTask, startTask, abortTask, abortAllTasks, getPublicSessionProcesses, }; }); export type IClaudeCodeLifeCycleService = InferEffect; export class ClaudeCodeLifeCycleService extends Context.Tag( "ClaudeCodeLifeCycleService", )() { static Live = Layer.effect(this, LayerImpl); }