From 79794be5265707c4c681ea8ecdd73a8699f8752f Mon Sep 17 00:00:00 2001 From: d-kimsuon Date: Wed, 3 Sep 2025 01:43:03 +0900 Subject: [PATCH] feat: implement continue chat (not resume if connected) --- src/app/hooks/useConfig.ts | 6 +- .../components/newChat/NewChat.tsx | 11 +- .../components/SessionPageContent.tsx | 46 ++- .../components/resumeChat/ResumeChat.tsx | 26 +- .../[sessionId]/hooks/useAliveTask.ts | 32 ++ .../[sessionId]/hooks/useIsResummingTask.ts | 45 --- src/server/hono/route.ts | 44 ++- .../claude-code/ClaudeCodeTaskController.ts | 329 +++++++++++------- .../claude-code/createMessageGenerator.ts | 68 ++++ 9 files changed, 388 insertions(+), 219 deletions(-) create mode 100644 src/app/projects/[projectId]/sessions/[sessionId]/hooks/useAliveTask.ts delete mode 100644 src/app/projects/[projectId]/sessions/[sessionId]/hooks/useIsResummingTask.ts create mode 100644 src/server/service/claude-code/createMessageGenerator.ts diff --git a/src/app/hooks/useConfig.ts b/src/app/hooks/useConfig.ts index b7bff0c..aa308ce 100644 --- a/src/app/hooks/useConfig.ts +++ b/src/app/hooks/useConfig.ts @@ -28,8 +28,10 @@ export const useConfig = () => { }); return await response.json(); }, - onSuccess: () => { - queryClient.invalidateQueries({ queryKey: configQueryConfig.queryKey }); + onSuccess: async () => { + await queryClient.invalidateQueries({ + queryKey: configQueryConfig.queryKey, + }); }, }); diff --git a/src/app/projects/[projectId]/components/newChat/NewChat.tsx b/src/app/projects/[projectId]/components/newChat/NewChat.tsx index a12a407..e39851f 100644 --- a/src/app/projects/[projectId]/components/newChat/NewChat.tsx +++ b/src/app/projects/[projectId]/components/newChat/NewChat.tsx @@ -1,4 +1,4 @@ -import { useMutation } from "@tanstack/react-query"; +import { useMutation, useQueryClient } from "@tanstack/react-query"; import { AlertCircleIcon, LoaderIcon, SendIcon } from "lucide-react"; import { useRouter } from "next/navigation"; import { type FC, useId, useRef, useState } from "react"; @@ -16,6 +16,7 @@ export const NewChat: FC<{ }> = ({ projectId, onSuccess }) => { const router = useRouter(); const textareaRef = useRef(null); + const queryClient = useQueryClient(); const startNewChat = useMutation({ mutationFn: async (options: { message: string }) => { @@ -30,13 +31,17 @@ export const NewChat: FC<{ throw new Error(response.statusText); } + await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] }); + return response.json(); }, - onSuccess: (response) => { + onSuccess: async (response) => { setMessage(""); onSuccess?.(); + await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] }); + router.push( - `/projects/${projectId}/sessions/${response.nextSessionId}#message-${response.userMessageId}`, + `/projects/${projectId}/sessions/${response.sessionId}#message-${response.userMessageId}`, ); }, }); diff --git a/src/app/projects/[projectId]/sessions/[sessionId]/components/SessionPageContent.tsx b/src/app/projects/[projectId]/sessions/[sessionId]/components/SessionPageContent.tsx index 1fd7805..bf503eb 100644 --- a/src/app/projects/[projectId]/sessions/[sessionId]/components/SessionPageContent.tsx +++ b/src/app/projects/[projectId]/sessions/[sessionId]/components/SessionPageContent.tsx @@ -1,14 +1,14 @@ "use client"; import { useMutation } from "@tanstack/react-query"; -import { ArrowLeftIcon, LoaderIcon, XIcon } from "lucide-react"; +import { ArrowLeftIcon, LoaderIcon, PauseIcon, XIcon } from "lucide-react"; import Link from "next/link"; import type { FC } from "react"; import { useEffect, useRef, useState } from "react"; import { Button } from "@/components/ui/button"; import { honoClient } from "../../../../../../lib/api/client"; import { firstCommandToTitle } from "../../../services/firstCommandToTitle"; -import { useIsResummingTask } from "../hooks/useIsResummingTask"; +import { useAliveTask } from "../hooks/useAliveTask"; import { useSession } from "../hooks/useSession"; import { ConversationList } from "./conversationList/ConversationList"; import { ResumeChat } from "./resumeChat/ResumeChat"; @@ -37,15 +37,16 @@ export const SessionPageContent: FC<{ }, }); - const { isResummingTask } = useIsResummingTask(sessionId); + const { isRunningTask, isPausedTask } = useAliveTask(sessionId); - const [previouConversationLength, setPreviouConversationLength] = useState(0); + const [previousConversationLength, setPreviousConversationLength] = + useState(0); const scrollContainerRef = useRef(null); // 自動スクロール処理 useEffect(() => { - if (isResummingTask && conversations.length !== previouConversationLength) { - setPreviouConversationLength(conversations.length); + if (isRunningTask && conversations.length !== previousConversationLength) { + setPreviousConversationLength(conversations.length); const scrollContainer = scrollContainerRef.current; if (scrollContainer) { scrollContainer.scrollTo({ @@ -54,7 +55,7 @@ export const SessionPageContent: FC<{ }); } } - }, [conversations, isResummingTask, previouConversationLength]); + }, [conversations, isRunningTask, previousConversationLength]); return (
@@ -85,12 +86,33 @@ export const SessionPageContent: FC<{
- {isResummingTask && ( + {isRunningTask && (

- Conversation is being resumed... + Conversation is in progress... +

+
+ +
+ )} + + {isPausedTask && ( +
+ +
+

+ Conversation is paused...

diff --git a/src/app/projects/[projectId]/sessions/[sessionId]/components/resumeChat/ResumeChat.tsx b/src/app/projects/[projectId]/sessions/[sessionId]/components/resumeChat/ResumeChat.tsx index f118ff4..ab825c0 100644 --- a/src/app/projects/[projectId]/sessions/[sessionId]/components/resumeChat/ResumeChat.tsx +++ b/src/app/projects/[projectId]/sessions/[sessionId]/components/resumeChat/ResumeChat.tsx @@ -26,7 +26,8 @@ import { export const ResumeChat: FC<{ projectId: string; sessionId: string; -}> = ({ projectId, sessionId }) => { + isPausedTask: boolean; +}> = ({ projectId, sessionId, isPausedTask }) => { const router = useRouter(); const textareaRef = useRef(null); const queryClient = useQueryClient(); @@ -44,14 +45,18 @@ export const ResumeChat: FC<{ throw new Error(response.statusText); } + await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] }); + return response.json(); }, - onSuccess: (response) => { + onSuccess: async (response) => { setMessage(""); - queryClient.invalidateQueries({ queryKey: ["runningTasks"] }); - router.push( - `/projects/${projectId}/sessions/${response.nextSessionId}#message-${response.userMessageId}`, - ); + await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] }); + if (sessionId !== response.sessionId) { + router.push( + `/projects/${projectId}/sessions/${response.sessionId}#message-${response.userMessageId}`, + ); + } }, }); @@ -143,12 +148,17 @@ export const ResumeChat: FC<{ {resumeChat.isPending ? ( <> - Starting... + Starting... This may take a while. + + ) : isPausedTask ? ( + <> + + Send ) : ( <> - Resume Chat + Resume )} diff --git a/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useAliveTask.ts b/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useAliveTask.ts new file mode 100644 index 0000000..5370664 --- /dev/null +++ b/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useAliveTask.ts @@ -0,0 +1,32 @@ +import { useQuery } from "@tanstack/react-query"; +import { useMemo } from "react"; +import { honoClient } from "../../../../../../lib/api/client"; + +export const useAliveTask = (sessionId: string) => { + const { data } = useQuery({ + queryKey: ["aliveTasks"], + queryFn: async () => { + const response = await honoClient.api.tasks.alive.$get({}); + + if (!response.ok) { + throw new Error(response.statusText); + } + + return response.json(); + }, + refetchOnReconnect: true, + }); + + const taskInfo = useMemo(() => { + const aliveTask = data?.aliveTasks.find( + (task) => task.sessionId === sessionId, + ); + return { + aliveTask, + isRunningTask: aliveTask?.status === "running", + isPausedTask: aliveTask?.status === "paused", + } as const; + }, [data, sessionId]); + + return taskInfo; +}; diff --git a/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useIsResummingTask.ts b/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useIsResummingTask.ts deleted file mode 100644 index 4d41ad4..0000000 --- a/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useIsResummingTask.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { useQuery } from "@tanstack/react-query"; -import { useMemo } from "react"; -import { honoClient } from "../../../../../../lib/api/client"; - -export const useIsResummingTask = (sessionId: string) => { - const { data, isLoading } = useQuery({ - queryKey: ["runningTasks"], - queryFn: async () => { - const response = await honoClient.api.tasks.running.$get({}); - - if (!response.ok) { - throw new Error(response.statusText); - } - - return response.json(); - }, - // Only poll when there might be running tasks - refetchInterval: (query) => { - const hasRunningTasks = (query.state.data?.runningTasks?.length ?? 0) > 0; - return hasRunningTasks ? 2000 : false; // Poll every 2s when there are tasks, stop when none - }, - // Keep data fresh for 30 seconds - staleTime: 30 * 1000, - // Keep in cache for 5 minutes - gcTime: 5 * 60 * 1000, - // Refetch when window regains focus - refetchOnWindowFocus: true, - }); - - const taskInfo = useMemo(() => { - const runningTask = data?.runningTasks.find( - (task) => task.nextSessionId === sessionId, - ); - return { - isResummingTask: Boolean(runningTask), - task: runningTask, - hasRunningTasks: (data?.runningTasks.length ?? 0) > 0, - }; - }, [data, sessionId]); - - return { - ...taskInfo, - isLoading, - }; -}; diff --git a/src/server/hono/route.ts b/src/server/hono/route.ts index 9d1da73..2e27e15 100644 --- a/src/server/hono/route.ts +++ b/src/server/hono/route.ts @@ -182,15 +182,19 @@ export const routes = (app: HonoAppType) => { return c.json({ error: "Project path not found" }, 400); } - const task = await taskController.createTask({ - projectId, - cwd: project.meta.projectPath, + const task = await taskController.startOrContinueTask( + { + projectId, + cwd: project.meta.projectPath, + }, message, - }); + ); - const { nextSessionId, userMessageId } = - await taskController.startTask(task.id); - return c.json({ taskId: task.id, nextSessionId, userMessageId }); + return c.json({ + taskId: task.id, + sessionId: task.sessionId, + userMessageId: task.userMessageId, + }); }, ) @@ -211,21 +215,25 @@ export const routes = (app: HonoAppType) => { return c.json({ error: "Project path not found" }, 400); } - const task = await taskController.createTask({ - projectId, - sessionId, - cwd: project.meta.projectPath, - message: resumeMessage, - }); + const task = await taskController.startOrContinueTask( + { + projectId, + sessionId, + cwd: project.meta.projectPath, + }, + resumeMessage, + ); - const { nextSessionId, userMessageId } = - await taskController.startTask(task.id); - return c.json({ taskId: task.id, nextSessionId, userMessageId }); + return c.json({ + taskId: task.id, + sessionId: task.sessionId, + userMessageId: task.userMessageId, + }); }, ) - .get("/tasks/running", async (c) => { - return c.json({ runningTasks: taskController.runningTasks }); + .get("/tasks/alive", async (c) => { + return c.json({ aliveTasks: taskController.aliveTasks }); }) .post( diff --git a/src/server/service/claude-code/ClaudeCodeTaskController.ts b/src/server/service/claude-code/ClaudeCodeTaskController.ts index 6ff33be..5d386a0 100644 --- a/src/server/service/claude-code/ClaudeCodeTaskController.ts +++ b/src/server/service/claude-code/ClaudeCodeTaskController.ts @@ -1,15 +1,19 @@ import { execSync } from "node:child_process"; -import { query, type SDKMessage } from "@anthropic-ai/claude-code"; +import { query } from "@anthropic-ai/claude-code"; import { ulid } from "ulid"; - -type OnMessage = (message: SDKMessage) => void | Promise; +import { + createMessageGenerator, + type MessageGenerator, + type OnMessage, +} from "./createMessageGenerator"; type BaseClaudeCodeTask = { id: string; projectId: string; - sessionId?: string | undefined; // undefined = new session + baseSessionId?: string | undefined; // undefined = new session cwd: string; - message: string; + generateMessages: MessageGenerator; + setNextMessage: (message: string) => void; onMessageHandlers: OnMessage[]; }; @@ -19,29 +23,40 @@ type PendingClaudeCodeTask = BaseClaudeCodeTask & { type RunningClaudeCodeTask = BaseClaudeCodeTask & { status: "running"; - nextSessionId: string; + sessionId: string; + userMessageId: string; + abortController: AbortController; +}; + +type PausedClaudeCodeTask = BaseClaudeCodeTask & { + status: "paused"; + sessionId: string; userMessageId: string; abortController: AbortController; }; type CompletedClaudeCodeTask = BaseClaudeCodeTask & { status: "completed"; - nextSessionId: string; + sessionId: string; userMessageId: string; + abortController: AbortController; }; type FailedClaudeCodeTask = BaseClaudeCodeTask & { status: "failed"; - nextSessionId?: string; + sessionId?: string; userMessageId?: string; + abortController?: AbortController; }; type ClaudeCodeTask = - | PendingClaudeCodeTask | RunningClaudeCodeTask + | PausedClaudeCodeTask | CompletedClaudeCodeTask | FailedClaudeCodeTask; +type AliveClaudeCodeTask = RunningClaudeCodeTask | PausedClaudeCodeTask; + export class ClaudeCodeTaskController { private pathToClaudeCodeExecutable: string; private tasks: ClaudeCodeTask[] = []; @@ -52,28 +67,185 @@ export class ClaudeCodeTaskController { .trim(); } - public async createTask( - taskDef: Omit, - onMessage?: OnMessage, - ) { - const task: ClaudeCodeTask = { - ...taskDef, - id: ulid(), - status: "pending", - onMessageHandlers: typeof onMessage === "function" ? [onMessage] : [], - }; + public get aliveTasks() { + return this.tasks.filter( + (task) => task.status === "running" || task.status === "paused", + ); + } - this.tasks.push(task); + public async startOrContinueTask( + currentSession: { + cwd: string; + projectId: string; + sessionId?: string; + }, + message: string, + ): Promise { + const existingTask = this.aliveTasks.find( + (task) => task.sessionId === currentSession.sessionId, + ); + if (existingTask) { + return this.continueTask(existingTask, message); + } else { + return await this.startTask(currentSession, message); + } + } + + private continueTask(task: AliveClaudeCodeTask, message: string) { + task.setNextMessage(message); return task; } - public get pendingTasks() { - return this.tasks.filter((task) => task.status === "pending"); + private startTask( + currentSession: { + cwd: string; + projectId: string; + sessionId?: string; + }, + message: string, + ) { + const { generateMessages, setNextMessage } = + createMessageGenerator(message); + + const task: PendingClaudeCodeTask = { + status: "pending", + id: ulid(), + projectId: currentSession.projectId, + baseSessionId: currentSession.sessionId, + cwd: currentSession.cwd, + generateMessages, + setNextMessage, + onMessageHandlers: [], + }; + + let aliveTaskResolve: (task: AliveClaudeCodeTask) => void; + let aliveTaskReject: (error: unknown) => void; + + const aliveTaskPromise = new Promise( + (resolve, reject) => { + aliveTaskResolve = resolve; + aliveTaskReject = reject; + }, + ); + + let resolved = false; + + const handleTask = async () => { + try { + const abortController = new AbortController(); + + let currentTask: AliveClaudeCodeTask | undefined; + + for await (const message of query({ + prompt: task.generateMessages(), + options: { + resume: task.baseSessionId, + cwd: task.cwd, + pathToClaudeCodeExecutable: this.pathToClaudeCodeExecutable, + permissionMode: "bypassPermissions", + abortController: abortController, + }, + })) { + currentTask ??= this.aliveTasks.find((t) => t.id === task.id); + + if (currentTask !== undefined && currentTask.status === "paused") { + this.updateExistingTask({ + ...currentTask, + status: "running", + }); + } + + // 初回の system message だとまだ history ファイルが作成されていないので + if ( + !resolved && + (message.type === "user" || message.type === "assistant") && + message.uuid !== undefined + ) { + const runningTask: RunningClaudeCodeTask = { + status: "running", + id: task.id, + projectId: task.projectId, + cwd: task.cwd, + generateMessages: task.generateMessages, + setNextMessage: task.setNextMessage, + onMessageHandlers: task.onMessageHandlers, + userMessageId: message.uuid, + sessionId: message.session_id, + abortController: abortController, + }; + this.tasks.push(runningTask); + aliveTaskResolve(runningTask); + resolved = true; + } + + await Promise.all( + task.onMessageHandlers.map(async (onMessageHandler) => { + await onMessageHandler(message); + }), + ); + + if (currentTask !== undefined && message.type === "result") { + this.updateExistingTask({ + ...currentTask, + status: "paused", + }); + } + } + + const updatedTask = this.aliveTasks.find((t) => t.id === task.id); + + if (updatedTask === undefined) { + const error = new Error( + `illegal state: task is not running, task: ${JSON.stringify(updatedTask)}`, + ); + aliveTaskReject(error); + throw error; + } + + this.updateExistingTask({ + ...updatedTask, + status: "completed", + }); + } catch (error) { + if (!resolved) { + aliveTaskReject(error); + resolved = true; + } + + console.error("Error resuming task", error); + this.updateExistingTask({ + ...task, + status: "failed", + }); + } + }; + + // continue background + void handleTask(); + + return aliveTaskPromise; } - public get runningTasks() { - return this.tasks.filter((task) => task.status === "running"); + public abortTask(sessionId: string) { + const task = this.aliveTasks.find((task) => task.sessionId === sessionId); + if (!task) { + throw new Error("Alive Task not found"); + } + + task.abortController.abort(); + this.updateExistingTask({ + id: task.id, + projectId: task.projectId, + sessionId: task.sessionId, + status: "failed", + cwd: task.cwd, + generateMessages: task.generateMessages, + setNextMessage: task.setNextMessage, + onMessageHandlers: task.onMessageHandlers, + baseSessionId: task.baseSessionId, + userMessageId: task.userMessageId, + }); } private updateExistingTask(task: ClaudeCodeTask) { @@ -85,113 +257,4 @@ export class ClaudeCodeTaskController { Object.assign(target, task); } - - public startTask(id: string) { - const task = this.tasks.find((task) => task.id === id); - if (!task) { - throw new Error("Task not found"); - } - - let runningTaskResolve: (task: RunningClaudeCodeTask) => void; - let runningTaskReject: (error: unknown) => void; - const runningTaskPromise = new Promise( - (resolve, reject) => { - runningTaskResolve = resolve; - runningTaskReject = reject; - }, - ); - - let resolved = false; - - const handleTask = async () => { - try { - const abortController = new AbortController(); - - for await (const message of query({ - prompt: task.message, - options: { - resume: task.sessionId, - cwd: task.cwd, - pathToClaudeCodeExecutable: this.pathToClaudeCodeExecutable, - permissionMode: "bypassPermissions", - abortController, - }, - })) { - // 初回の sysmte message だとまだ history ファイルが作成されていないので - if ( - !resolved && - (message.type === "user" || message.type === "assistant") && - message.uuid !== undefined - ) { - const runningTask: RunningClaudeCodeTask = { - ...task, - status: "running", - nextSessionId: message.session_id, - userMessageId: message.uuid, - abortController, - }; - this.updateExistingTask(runningTask); - runningTaskResolve(runningTask); - resolved = true; - } - - await Promise.all( - task.onMessageHandlers.map(async (onMessageHandler) => { - await onMessageHandler(message); - }), - ); - } - - if (task.status !== "running") { - const error = new Error( - `illegal state: task is not running, task: ${JSON.stringify(task)}`, - ); - runningTaskReject(error); - throw error; - } - - this.updateExistingTask({ - ...task, - status: "completed", - nextSessionId: task.nextSessionId, - userMessageId: task.userMessageId, - }); - } catch (error) { - if (!resolved) { - runningTaskReject(error); - resolved = true; - } - - console.error("Error resuming task", error); - task.status = "failed"; - } - }; - - // continue background - void handleTask(); - - return runningTaskPromise; - } - - public abortTask(sessionId: string) { - const task = this.tasks - .filter((task) => task.status === "running") - .find((task) => task.nextSessionId === sessionId); - if (!task) { - throw new Error("Running Task not found"); - } - - task.abortController.abort(); - this.updateExistingTask({ - id: task.id, - status: "failed", - cwd: task.cwd, - message: task.message, - onMessageHandlers: task.onMessageHandlers, - projectId: task.projectId, - nextSessionId: task.nextSessionId, - sessionId: task.sessionId, - userMessageId: task.userMessageId, - }); - } } diff --git a/src/server/service/claude-code/createMessageGenerator.ts b/src/server/service/claude-code/createMessageGenerator.ts new file mode 100644 index 0000000..1e86f96 --- /dev/null +++ b/src/server/service/claude-code/createMessageGenerator.ts @@ -0,0 +1,68 @@ +import type { SDKMessage, SDKUserMessage } from "@anthropic-ai/claude-code"; + +export type OnMessage = (message: SDKMessage) => void | Promise; + +export type MessageGenerator = () => AsyncGenerator< + SDKUserMessage, + void, + unknown +>; + +const createPromise = () => { + let promiseResolve: ((value: string) => void) | undefined; + let promiseReject: ((reason?: unknown) => void) | undefined; + + const promise = new Promise((resolve, reject) => { + promiseResolve = resolve; + promiseReject = reject; + }); + + if (!promiseResolve || !promiseReject) { + throw new Error("Illegal state: Promise not created"); + } + + return { + promise, + resolve: promiseResolve, + reject: promiseReject, + } as const; +}; + +export const createMessageGenerator = ( + firstMessage: string, +): { + generateMessages: MessageGenerator; + setNextMessage: (message: string) => void; +} => { + let currentPromise = createPromise(); + + const createMessage = (message: string): SDKUserMessage => { + return { + type: "user", + message: { + role: "user", + content: message, + }, + } as SDKUserMessage; + }; + + async function* generateMessages(): ReturnType { + yield createMessage(firstMessage); + + while (true) { + const message = await currentPromise.promise; + currentPromise = createPromise(); + + yield createMessage(message); + } + } + + const setNextMessage = (message: string) => { + currentPromise.resolve(message); + }; + + return { + generateMessages, + setNextMessage, + }; +};