diff --git a/package.json b/package.json index 4f6f5f0..901e8a7 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ "class-variance-authority": "^0.7.1", "clsx": "^2.1.1", "hono": "^4.9.5", + "jotai": "^2.13.1", "lucide-react": "^0.542.0", "next": "15.5.2", "react": "^19.1.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4b64fd5..e9df31a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -44,6 +44,9 @@ importers: hono: specifier: ^4.9.5 version: 4.9.5 + jotai: + specifier: ^2.13.1 + version: 2.13.1(@types/react@19.1.12)(react@19.1.1) lucide-react: specifier: ^0.542.0 version: 0.542.0(react@19.1.1) @@ -2099,6 +2102,24 @@ packages: resolution: {integrity: sha512-twQoecYPiVA5K/h6SxtORw/Bs3ar+mLUtoPSc7iMXzQzK8d7eJ/R09wmTwAjiamETn1cXYPGfNnu7DMoHgu12w==} hasBin: true + jotai@2.13.1: + resolution: {integrity: sha512-cRsw6kFeGC9Z/D3egVKrTXRweycZ4z/k7i2MrfCzPYsL9SIWcPXTyqv258/+Ay8VUEcihNiE/coBLE6Kic6b8A==} + engines: {node: '>=12.20.0'} + peerDependencies: + '@babel/core': '>=7.0.0' + '@babel/template': '>=7.0.0' + '@types/react': '>=17.0.0' + react: '>=17.0.0' + peerDependenciesMeta: + '@babel/core': + optional: true + '@babel/template': + optional: true + '@types/react': + optional: true + react: + optional: true + js-tokens@9.0.1: resolution: {integrity: sha512-mxa9E9ITFOt0ban3j6L5MpjwegGz6lBQmM1IJkWeBZGcMxto50+eWdjC/52xDbS2vy0k7vIMK0Fe2wfL9OQSpQ==} @@ -4935,6 +4956,11 @@ snapshots: jiti@2.5.1: {} + jotai@2.13.1(@types/react@19.1.12)(react@19.1.1): + optionalDependencies: + '@types/react': 19.1.12 + react: 19.1.1 + js-tokens@9.0.1: {} json-parse-even-better-errors@4.0.0: {} diff --git a/src/app/projects/[projectId]/components/newChat/NewChat.tsx b/src/app/projects/[projectId]/components/newChat/NewChat.tsx index e39851f..a81f944 100644 --- a/src/app/projects/[projectId]/components/newChat/NewChat.tsx +++ b/src/app/projects/[projectId]/components/newChat/NewChat.tsx @@ -1,4 +1,4 @@ -import { useMutation, useQueryClient } from "@tanstack/react-query"; +import { useMutation } from "@tanstack/react-query"; import { AlertCircleIcon, LoaderIcon, SendIcon } from "lucide-react"; import { useRouter } from "next/navigation"; import { type FC, useId, useRef, useState } from "react"; @@ -16,7 +16,6 @@ export const NewChat: FC<{ }> = ({ projectId, onSuccess }) => { const router = useRouter(); const textareaRef = useRef(null); - const queryClient = useQueryClient(); const startNewChat = useMutation({ mutationFn: async (options: { message: string }) => { @@ -31,14 +30,11 @@ export const NewChat: FC<{ throw new Error(response.statusText); } - await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] }); - return response.json(); }, onSuccess: async (response) => { setMessage(""); onSuccess?.(); - await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] }); router.push( `/projects/${projectId}/sessions/${response.sessionId}#message-${response.userMessageId}`, diff --git a/src/app/projects/[projectId]/sessions/[sessionId]/components/resumeChat/ResumeChat.tsx b/src/app/projects/[projectId]/sessions/[sessionId]/components/resumeChat/ResumeChat.tsx index ab825c0..67ec543 100644 --- a/src/app/projects/[projectId]/sessions/[sessionId]/components/resumeChat/ResumeChat.tsx +++ b/src/app/projects/[projectId]/sessions/[sessionId]/components/resumeChat/ResumeChat.tsx @@ -1,4 +1,4 @@ -import { useMutation, useQueryClient } from "@tanstack/react-query"; +import { useMutation } from "@tanstack/react-query"; import { AlertCircleIcon, LoaderIcon, @@ -30,7 +30,6 @@ export const ResumeChat: FC<{ }> = ({ projectId, sessionId, isPausedTask }) => { const router = useRouter(); const textareaRef = useRef(null); - const queryClient = useQueryClient(); const resumeChat = useMutation({ mutationFn: async (options: { message: string }) => { @@ -45,13 +44,10 @@ export const ResumeChat: FC<{ throw new Error(response.statusText); } - await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] }); - return response.json(); }, onSuccess: async (response) => { setMessage(""); - await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] }); if (sessionId !== response.sessionId) { router.push( `/projects/${projectId}/sessions/${response.sessionId}#message-${response.userMessageId}`, diff --git a/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useAliveTask.ts b/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useAliveTask.ts index 5370664..f13709e 100644 --- a/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useAliveTask.ts +++ b/src/app/projects/[projectId]/sessions/[sessionId]/hooks/useAliveTask.ts @@ -1,9 +1,13 @@ import { useQuery } from "@tanstack/react-query"; +import { useAtom } from "jotai"; import { useMemo } from "react"; import { honoClient } from "../../../../../../lib/api/client"; +import { aliveTasksAtom } from "../store/aliveTasksAtom"; export const useAliveTask = (sessionId: string) => { - const { data } = useQuery({ + const [aliveTasks, setAliveTasks] = useAtom(aliveTasksAtom); + + useQuery({ queryKey: ["aliveTasks"], queryFn: async () => { const response = await honoClient.api.tasks.alive.$get({}); @@ -12,21 +16,22 @@ export const useAliveTask = (sessionId: string) => { throw new Error(response.statusText); } + const data = await response.json(); + setAliveTasks(data.aliveTasks); return response.json(); }, refetchOnReconnect: true, }); const taskInfo = useMemo(() => { - const aliveTask = data?.aliveTasks.find( - (task) => task.sessionId === sessionId, - ); + const aliveTask = aliveTasks.find((task) => task.sessionId === sessionId); + return { - aliveTask, + aliveTask: aliveTasks.find((task) => task.sessionId === sessionId), isRunningTask: aliveTask?.status === "running", isPausedTask: aliveTask?.status === "paused", } as const; - }, [data, sessionId]); + }, [aliveTasks, sessionId]); return taskInfo; }; diff --git a/src/app/projects/[projectId]/sessions/[sessionId]/store/aliveTasksAtom.ts b/src/app/projects/[projectId]/sessions/[sessionId]/store/aliveTasksAtom.ts new file mode 100644 index 0000000..90dd9de --- /dev/null +++ b/src/app/projects/[projectId]/sessions/[sessionId]/store/aliveTasksAtom.ts @@ -0,0 +1,4 @@ +import { atom } from "jotai"; +import type { SerializableAliveTask } from "../../../../../../server/service/claude-code/types"; + +export const aliveTasksAtom = atom([]); diff --git a/src/hooks/useServerEvents.ts b/src/hooks/useServerEvents.ts index ea62627..1416746 100644 --- a/src/hooks/useServerEvents.ts +++ b/src/hooks/useServerEvents.ts @@ -1,5 +1,7 @@ import { useQueryClient } from "@tanstack/react-query"; +import { useSetAtom } from "jotai"; import { useCallback, useEffect } from "react"; +import { aliveTasksAtom } from "../app/projects/[projectId]/sessions/[sessionId]/store/aliveTasksAtom"; import { projetsQueryConfig } from "../app/projects/hooks/useProjects"; import { honoClient } from "../lib/api/client"; import type { SSEEvent } from "../server/service/events/types"; @@ -36,7 +38,7 @@ const parseSSEEvent = (text: string): ParsedEvent => { id: id.slice("id:".length).trim(), event: event.slice("event:".length).trim(), data: JSON.parse( - data.slice(data.indexOf("{"), data.indexOf("}") + 1), + data.slice(data.indexOf("{"), data.lastIndexOf("}") + 1), ) as SSEEvent, }; }; @@ -53,6 +55,7 @@ let isInitialized = false; export const useServerEvents = () => { const queryClient = useQueryClient(); + const setAliveTasks = useSetAtom(aliveTasksAtom); const listener = useCallback(async () => { console.log("listening to events"); @@ -86,9 +89,13 @@ export const useServerEvents = () => { if (event.data.type === "session_changed") { await queryClient.invalidateQueries({ queryKey: ["sessions"] }); } + + if (event.data.type === "task_changed") { + setAliveTasks(event.data.data); + } } } - }, [queryClient]); + }, [queryClient, setAliveTasks]); useEffect(() => { if (isInitialized === false) { diff --git a/src/server/hono/route.ts b/src/server/hono/route.ts index 2e27e15..47e03ef 100644 --- a/src/server/hono/route.ts +++ b/src/server/hono/route.ts @@ -7,9 +7,10 @@ import { streamSSE } from "hono/streaming"; import { z } from "zod"; import { configSchema } from "../config/config"; import { ClaudeCodeTaskController } from "../service/claude-code/ClaudeCodeTaskController"; +import type { SerializableAliveTask } from "../service/claude-code/types"; +import { getEventBus } from "../service/events/EventBus"; import { getFileWatcher } from "../service/events/fileWatcher"; -import { sseEvent } from "../service/events/sseEvent"; -import type { WatcherEvent } from "../service/events/types"; +import { sseEventResponse } from "../service/events/sseEventResponse"; import { getProject } from "../service/project/getProject"; import { getProjects } from "../service/project/getProjects"; import { getSession } from "../service/session/getSession"; @@ -233,7 +234,16 @@ export const routes = (app: HonoAppType) => { ) .get("/tasks/alive", async (c) => { - return c.json({ aliveTasks: taskController.aliveTasks }); + return c.json({ + aliveTasks: taskController.aliveTasks.map( + (task): SerializableAliveTask => ({ + id: task.id, + status: task.status, + sessionId: task.sessionId, + userMessageId: task.userMessageId, + }), + ), + }); }) .post( @@ -251,26 +261,16 @@ export const routes = (app: HonoAppType) => { c, async (stream) => { const fileWatcher = getFileWatcher(); + const eventBus = getEventBus(); + let isConnected = true; - let eventId = 0; // ハートビート設定 const heartbeat = setInterval(() => { if (isConnected) { - stream - .writeSSE({ - data: sseEvent({ - type: "heartbeat", - timestamp: new Date().toISOString(), - }), - event: "heartbeat", - id: String(eventId++), - }) - .catch(() => { - console.warn("Failed to write SSE event"); - isConnected = false; - onConnectionClosed(); - }); + eventBus.emit("heartbeat", { + type: "heartbeat", + }); } }, 30 * 1000); @@ -296,59 +296,63 @@ export const routes = (app: HonoAppType) => { // イベントリスナーを登録 console.log("Registering SSE event listeners"); - fileWatcher.on("project_changed", async (event: WatcherEvent) => { + eventBus.on("connected", async (event) => { if (!isConnected) { return; } - - if (event.eventType !== "project_changed") { - return; - } - - await stream - .writeSSE({ - data: sseEvent({ - type: event.eventType, - ...event.data, - }), - event: event.eventType, - id: String(eventId++), - }) - .catch(() => { - console.warn("Failed to write SSE event"); - onConnectionClosed(); - }); + await stream.writeSSE(sseEventResponse(event)).catch(() => { + onConnectionClosed(); + }); }); - fileWatcher.on("session_changed", async (event: WatcherEvent) => { + + eventBus.on("heartbeat", async (event) => { + if (!isConnected) { + return; + } + await stream.writeSSE(sseEventResponse(event)).catch(() => { + onConnectionClosed(); + }); + }); + + eventBus.on("project_changed", async (event) => { if (!isConnected) { return; } - await stream - .writeSSE({ - data: sseEvent({ - ...event.data, - type: event.eventType, - }), - event: event.eventType, - id: String(eventId++), - }) - .catch(() => { - onConnectionClosed(); - }); + await stream.writeSSE(sseEventResponse(event)).catch(() => { + console.warn("Failed to write SSE event"); + onConnectionClosed(); + }); + }); + + eventBus.on("session_changed", async (event) => { + if (!isConnected) { + return; + } + + await stream.writeSSE(sseEventResponse(event)).catch(() => { + onConnectionClosed(); + }); + }); + + eventBus.on("task_changed", async (event) => { + if (!isConnected) { + return; + } + + await stream.writeSSE(sseEventResponse(event)).catch(() => { + onConnectionClosed(); + }); }); // 初期接続確認メッセージ - await stream.writeSSE({ - data: sseEvent({ - type: "connected", - message: "SSE connection established", - timestamp: new Date().toISOString(), - }), - event: "connected", - id: String(eventId++), + eventBus.emit("connected", { + type: "connected", + message: "SSE connection established", }); + fileWatcher.startWatching(); + await connectionPromise; }, async (err, stream) => { diff --git a/src/server/service/claude-code/ClaudeCodeTaskController.ts b/src/server/service/claude-code/ClaudeCodeTaskController.ts index 5d386a0..7bb066e 100644 --- a/src/server/service/claude-code/ClaudeCodeTaskController.ts +++ b/src/server/service/claude-code/ClaudeCodeTaskController.ts @@ -1,70 +1,25 @@ import { execSync } from "node:child_process"; import { query } from "@anthropic-ai/claude-code"; import { ulid } from "ulid"; -import { - createMessageGenerator, - type MessageGenerator, - type OnMessage, -} from "./createMessageGenerator"; - -type BaseClaudeCodeTask = { - id: string; - projectId: string; - baseSessionId?: string | undefined; // undefined = new session - cwd: string; - generateMessages: MessageGenerator; - setNextMessage: (message: string) => void; - onMessageHandlers: OnMessage[]; -}; - -type PendingClaudeCodeTask = BaseClaudeCodeTask & { - status: "pending"; -}; - -type RunningClaudeCodeTask = BaseClaudeCodeTask & { - status: "running"; - sessionId: string; - userMessageId: string; - abortController: AbortController; -}; - -type PausedClaudeCodeTask = BaseClaudeCodeTask & { - status: "paused"; - sessionId: string; - userMessageId: string; - abortController: AbortController; -}; - -type CompletedClaudeCodeTask = BaseClaudeCodeTask & { - status: "completed"; - sessionId: string; - userMessageId: string; - abortController: AbortController; -}; - -type FailedClaudeCodeTask = BaseClaudeCodeTask & { - status: "failed"; - sessionId?: string; - userMessageId?: string; - abortController?: AbortController; -}; - -type ClaudeCodeTask = - | RunningClaudeCodeTask - | PausedClaudeCodeTask - | CompletedClaudeCodeTask - | FailedClaudeCodeTask; - -type AliveClaudeCodeTask = RunningClaudeCodeTask | PausedClaudeCodeTask; +import { type EventBus, getEventBus } from "../events/EventBus"; +import { createMessageGenerator } from "./createMessageGenerator"; +import type { + AliveClaudeCodeTask, + ClaudeCodeTask, + PendingClaudeCodeTask, + RunningClaudeCodeTask, +} from "./types"; export class ClaudeCodeTaskController { private pathToClaudeCodeExecutable: string; private tasks: ClaudeCodeTask[] = []; + private eventBus: EventBus; constructor() { this.pathToClaudeCodeExecutable = execSync("which claude", {}) .toString() .trim(); + this.eventBus = getEventBus(); } public get aliveTasks() { @@ -256,5 +211,10 @@ export class ClaudeCodeTaskController { } Object.assign(target, task); + + this.eventBus.emit("task_changed", { + type: "task_changed", + data: this.aliveTasks, + }); } } diff --git a/src/server/service/claude-code/types.ts b/src/server/service/claude-code/types.ts new file mode 100644 index 0000000..12e2908 --- /dev/null +++ b/src/server/service/claude-code/types.ts @@ -0,0 +1,56 @@ +import type { MessageGenerator, OnMessage } from "./createMessageGenerator"; + +type BaseClaudeCodeTask = { + id: string; + projectId: string; + baseSessionId?: string | undefined; // undefined = new session + cwd: string; + generateMessages: MessageGenerator; + setNextMessage: (message: string) => void; + onMessageHandlers: OnMessage[]; +}; + +export type PendingClaudeCodeTask = BaseClaudeCodeTask & { + status: "pending"; +}; + +export type RunningClaudeCodeTask = BaseClaudeCodeTask & { + status: "running"; + sessionId: string; + userMessageId: string; + abortController: AbortController; +}; + +export type PausedClaudeCodeTask = BaseClaudeCodeTask & { + status: "paused"; + sessionId: string; + userMessageId: string; + abortController: AbortController; +}; + +type CompletedClaudeCodeTask = BaseClaudeCodeTask & { + status: "completed"; + sessionId: string; + userMessageId: string; + abortController: AbortController; +}; + +type FailedClaudeCodeTask = BaseClaudeCodeTask & { + status: "failed"; + sessionId?: string; + userMessageId?: string; + abortController?: AbortController; +}; + +export type ClaudeCodeTask = + | RunningClaudeCodeTask + | PausedClaudeCodeTask + | CompletedClaudeCodeTask + | FailedClaudeCodeTask; + +export type AliveClaudeCodeTask = RunningClaudeCodeTask | PausedClaudeCodeTask; + +export type SerializableAliveTask = Pick< + AliveClaudeCodeTask, + "id" | "status" | "sessionId" | "userMessageId" +>; diff --git a/src/server/service/events/EventBus.ts b/src/server/service/events/EventBus.ts new file mode 100644 index 0000000..3d5a4c7 --- /dev/null +++ b/src/server/service/events/EventBus.ts @@ -0,0 +1,45 @@ +import { EventEmitter } from "node:events"; + +import type { BaseSSEEvent, SSEEvent } from "./types"; + +export class EventBus { + private previousId = 0; + private eventEmitter = new EventEmitter(); + + public emit< + T extends SSEEvent["type"], + E = SSEEvent extends infer I ? (I extends { type: T } ? I : never) : never, + >(type: T, event: Omit): void { + const base: BaseSSEEvent = { + id: String(this.previousId++), + timestamp: new Date().toISOString(), + }; + + this.eventEmitter.emit(type, { + ...event, + ...base, + }); + } + + public on( + event: SSEEvent["type"], + listener: (event: SSEEvent) => void, + ): void { + this.eventEmitter.on(event, listener); + } + + public off( + event: SSEEvent["type"], + listener: (event: SSEEvent) => void, + ): void { + this.eventEmitter.off(event, listener); + } +} + +// Singleton +let eventBusInstance: EventBus | null = null; + +export const getEventBus = (): EventBus => { + eventBusInstance ??= new EventBus(); + return eventBusInstance; +}; diff --git a/src/server/service/events/fileWatcher.ts b/src/server/service/events/fileWatcher.ts index 7d1c046..d1d44ae 100644 --- a/src/server/service/events/fileWatcher.ts +++ b/src/server/service/events/fileWatcher.ts @@ -1,8 +1,7 @@ -import { EventEmitter } from "node:events"; import { type FSWatcher, watch } from "node:fs"; import z from "zod"; import { claudeProjectPath } from "../paths"; -import type { WatcherEvent } from "./types"; +import { type EventBus, getEventBus } from "./EventBus"; const fileRegExp = /(?.*?)\/(?.*?)\.jsonl/; const fileRegExpGroupSchema = z.object({ @@ -10,16 +9,16 @@ const fileRegExpGroupSchema = z.object({ sessionId: z.string(), }); -export class FileWatcherService extends EventEmitter { +export class FileWatcherService { private watcher: FSWatcher | null = null; private projectWatchers: Map = new Map(); + private eventBus: EventBus; constructor() { - super(); - this.startWatching(); + this.eventBus = getEventBus(); } - private startWatching(): void { + public startWatching(): void { try { console.log("Starting file watcher on:", claudeProjectPath); // メインプロジェクトディレクトリを監視 @@ -37,19 +36,22 @@ export class FileWatcherService extends EventEmitter { const { projectId, sessionId } = groups.data; - this.emit("project_changed", { - eventType: "project_changed", - data: { projectId, fileEventType: eventType }, - } satisfies WatcherEvent); + this.eventBus.emit("project_changed", { + type: "project_changed", + data: { + fileEventType: eventType, + projectId, + }, + }); - this.emit("session_changed", { - eventType: "session_changed", + this.eventBus.emit("session_changed", { + type: "session_changed", data: { projectId, sessionId, fileEventType: eventType, }, - } satisfies WatcherEvent); + }); }, ); console.log("File watcher initialization completed"); diff --git a/src/server/service/events/sseEvent.ts b/src/server/service/events/sseEvent.ts deleted file mode 100644 index f915c21..0000000 --- a/src/server/service/events/sseEvent.ts +++ /dev/null @@ -1,13 +0,0 @@ -import type { BaseSSEEvent, SSEEvent } from "./types"; - -let eventId = 0; - -export const sseEvent = >( - data: D, -): string => { - return JSON.stringify({ - ...data, - id: String(eventId++), - timestamp: new Date().toISOString(), - } satisfies D & BaseSSEEvent); -}; diff --git a/src/server/service/events/sseEventResponse.ts b/src/server/service/events/sseEventResponse.ts new file mode 100644 index 0000000..541b584 --- /dev/null +++ b/src/server/service/events/sseEventResponse.ts @@ -0,0 +1,9 @@ +import type { SSEEvent } from "./types"; + +export const sseEventResponse = (event: SSEEvent) => { + return { + data: JSON.stringify(event), + event: event.type, + id: event.id, + }; +}; diff --git a/src/server/service/events/types.ts b/src/server/service/events/types.ts index b5583fc..83e8e62 100644 --- a/src/server/service/events/types.ts +++ b/src/server/service/events/types.ts @@ -1,4 +1,5 @@ import type { WatchEventType } from "node:fs"; +import type { SerializableAliveTask } from "../claude-code/types"; export type WatcherEvent = | { @@ -27,15 +28,17 @@ export type SSEEvent = BaseSSEEvent & timestamp: string; } | { - id: string; type: "project_changed"; data: ProjectChangedData; } | { - id: string; type: "session_changed"; data: SessionChangedData; } + | { + type: "task_changed"; + data: SerializableAliveTask[]; + } ); export interface ProjectChangedData {