feat: implement continue chat (not resume if connected)

This commit is contained in:
d-kimsuon
2025-09-03 01:43:03 +09:00
parent 60b9c658f5
commit 79794be526
9 changed files with 388 additions and 219 deletions

View File

@@ -1,15 +1,19 @@
import { execSync } from "node:child_process";
import { query, type SDKMessage } from "@anthropic-ai/claude-code";
import { query } from "@anthropic-ai/claude-code";
import { ulid } from "ulid";
type OnMessage = (message: SDKMessage) => void | Promise<void>;
import {
createMessageGenerator,
type MessageGenerator,
type OnMessage,
} from "./createMessageGenerator";
type BaseClaudeCodeTask = {
id: string;
projectId: string;
sessionId?: string | undefined; // undefined = new session
baseSessionId?: string | undefined; // undefined = new session
cwd: string;
message: string;
generateMessages: MessageGenerator;
setNextMessage: (message: string) => void;
onMessageHandlers: OnMessage[];
};
@@ -19,29 +23,40 @@ type PendingClaudeCodeTask = BaseClaudeCodeTask & {
type RunningClaudeCodeTask = BaseClaudeCodeTask & {
status: "running";
nextSessionId: string;
sessionId: string;
userMessageId: string;
abortController: AbortController;
};
type PausedClaudeCodeTask = BaseClaudeCodeTask & {
status: "paused";
sessionId: string;
userMessageId: string;
abortController: AbortController;
};
type CompletedClaudeCodeTask = BaseClaudeCodeTask & {
status: "completed";
nextSessionId: string;
sessionId: string;
userMessageId: string;
abortController: AbortController;
};
type FailedClaudeCodeTask = BaseClaudeCodeTask & {
status: "failed";
nextSessionId?: string;
sessionId?: string;
userMessageId?: string;
abortController?: AbortController;
};
type ClaudeCodeTask =
| PendingClaudeCodeTask
| RunningClaudeCodeTask
| PausedClaudeCodeTask
| CompletedClaudeCodeTask
| FailedClaudeCodeTask;
type AliveClaudeCodeTask = RunningClaudeCodeTask | PausedClaudeCodeTask;
export class ClaudeCodeTaskController {
private pathToClaudeCodeExecutable: string;
private tasks: ClaudeCodeTask[] = [];
@@ -52,28 +67,185 @@ export class ClaudeCodeTaskController {
.trim();
}
public async createTask(
taskDef: Omit<ClaudeCodeTask, "id" | "status" | "onMessageHandlers">,
onMessage?: OnMessage,
) {
const task: ClaudeCodeTask = {
...taskDef,
id: ulid(),
status: "pending",
onMessageHandlers: typeof onMessage === "function" ? [onMessage] : [],
};
public get aliveTasks() {
return this.tasks.filter(
(task) => task.status === "running" || task.status === "paused",
);
}
this.tasks.push(task);
public async startOrContinueTask(
currentSession: {
cwd: string;
projectId: string;
sessionId?: string;
},
message: string,
): Promise<AliveClaudeCodeTask> {
const existingTask = this.aliveTasks.find(
(task) => task.sessionId === currentSession.sessionId,
);
if (existingTask) {
return this.continueTask(existingTask, message);
} else {
return await this.startTask(currentSession, message);
}
}
private continueTask(task: AliveClaudeCodeTask, message: string) {
task.setNextMessage(message);
return task;
}
public get pendingTasks() {
return this.tasks.filter((task) => task.status === "pending");
private startTask(
currentSession: {
cwd: string;
projectId: string;
sessionId?: string;
},
message: string,
) {
const { generateMessages, setNextMessage } =
createMessageGenerator(message);
const task: PendingClaudeCodeTask = {
status: "pending",
id: ulid(),
projectId: currentSession.projectId,
baseSessionId: currentSession.sessionId,
cwd: currentSession.cwd,
generateMessages,
setNextMessage,
onMessageHandlers: [],
};
let aliveTaskResolve: (task: AliveClaudeCodeTask) => void;
let aliveTaskReject: (error: unknown) => void;
const aliveTaskPromise = new Promise<AliveClaudeCodeTask>(
(resolve, reject) => {
aliveTaskResolve = resolve;
aliveTaskReject = reject;
},
);
let resolved = false;
const handleTask = async () => {
try {
const abortController = new AbortController();
let currentTask: AliveClaudeCodeTask | undefined;
for await (const message of query({
prompt: task.generateMessages(),
options: {
resume: task.baseSessionId,
cwd: task.cwd,
pathToClaudeCodeExecutable: this.pathToClaudeCodeExecutable,
permissionMode: "bypassPermissions",
abortController: abortController,
},
})) {
currentTask ??= this.aliveTasks.find((t) => t.id === task.id);
if (currentTask !== undefined && currentTask.status === "paused") {
this.updateExistingTask({
...currentTask,
status: "running",
});
}
// 初回の system message だとまだ history ファイルが作成されていないので
if (
!resolved &&
(message.type === "user" || message.type === "assistant") &&
message.uuid !== undefined
) {
const runningTask: RunningClaudeCodeTask = {
status: "running",
id: task.id,
projectId: task.projectId,
cwd: task.cwd,
generateMessages: task.generateMessages,
setNextMessage: task.setNextMessage,
onMessageHandlers: task.onMessageHandlers,
userMessageId: message.uuid,
sessionId: message.session_id,
abortController: abortController,
};
this.tasks.push(runningTask);
aliveTaskResolve(runningTask);
resolved = true;
}
await Promise.all(
task.onMessageHandlers.map(async (onMessageHandler) => {
await onMessageHandler(message);
}),
);
if (currentTask !== undefined && message.type === "result") {
this.updateExistingTask({
...currentTask,
status: "paused",
});
}
}
const updatedTask = this.aliveTasks.find((t) => t.id === task.id);
if (updatedTask === undefined) {
const error = new Error(
`illegal state: task is not running, task: ${JSON.stringify(updatedTask)}`,
);
aliveTaskReject(error);
throw error;
}
this.updateExistingTask({
...updatedTask,
status: "completed",
});
} catch (error) {
if (!resolved) {
aliveTaskReject(error);
resolved = true;
}
console.error("Error resuming task", error);
this.updateExistingTask({
...task,
status: "failed",
});
}
};
// continue background
void handleTask();
return aliveTaskPromise;
}
public get runningTasks() {
return this.tasks.filter((task) => task.status === "running");
public abortTask(sessionId: string) {
const task = this.aliveTasks.find((task) => task.sessionId === sessionId);
if (!task) {
throw new Error("Alive Task not found");
}
task.abortController.abort();
this.updateExistingTask({
id: task.id,
projectId: task.projectId,
sessionId: task.sessionId,
status: "failed",
cwd: task.cwd,
generateMessages: task.generateMessages,
setNextMessage: task.setNextMessage,
onMessageHandlers: task.onMessageHandlers,
baseSessionId: task.baseSessionId,
userMessageId: task.userMessageId,
});
}
private updateExistingTask(task: ClaudeCodeTask) {
@@ -85,113 +257,4 @@ export class ClaudeCodeTaskController {
Object.assign(target, task);
}
public startTask(id: string) {
const task = this.tasks.find((task) => task.id === id);
if (!task) {
throw new Error("Task not found");
}
let runningTaskResolve: (task: RunningClaudeCodeTask) => void;
let runningTaskReject: (error: unknown) => void;
const runningTaskPromise = new Promise<RunningClaudeCodeTask>(
(resolve, reject) => {
runningTaskResolve = resolve;
runningTaskReject = reject;
},
);
let resolved = false;
const handleTask = async () => {
try {
const abortController = new AbortController();
for await (const message of query({
prompt: task.message,
options: {
resume: task.sessionId,
cwd: task.cwd,
pathToClaudeCodeExecutable: this.pathToClaudeCodeExecutable,
permissionMode: "bypassPermissions",
abortController,
},
})) {
// 初回の sysmte message だとまだ history ファイルが作成されていないので
if (
!resolved &&
(message.type === "user" || message.type === "assistant") &&
message.uuid !== undefined
) {
const runningTask: RunningClaudeCodeTask = {
...task,
status: "running",
nextSessionId: message.session_id,
userMessageId: message.uuid,
abortController,
};
this.updateExistingTask(runningTask);
runningTaskResolve(runningTask);
resolved = true;
}
await Promise.all(
task.onMessageHandlers.map(async (onMessageHandler) => {
await onMessageHandler(message);
}),
);
}
if (task.status !== "running") {
const error = new Error(
`illegal state: task is not running, task: ${JSON.stringify(task)}`,
);
runningTaskReject(error);
throw error;
}
this.updateExistingTask({
...task,
status: "completed",
nextSessionId: task.nextSessionId,
userMessageId: task.userMessageId,
});
} catch (error) {
if (!resolved) {
runningTaskReject(error);
resolved = true;
}
console.error("Error resuming task", error);
task.status = "failed";
}
};
// continue background
void handleTask();
return runningTaskPromise;
}
public abortTask(sessionId: string) {
const task = this.tasks
.filter((task) => task.status === "running")
.find((task) => task.nextSessionId === sessionId);
if (!task) {
throw new Error("Running Task not found");
}
task.abortController.abort();
this.updateExistingTask({
id: task.id,
status: "failed",
cwd: task.cwd,
message: task.message,
onMessageHandlers: task.onMessageHandlers,
projectId: task.projectId,
nextSessionId: task.nextSessionId,
sessionId: task.sessionId,
userMessageId: task.userMessageId,
});
}
}