feat: improve sync tasks status by using SSE

This commit is contained in:
d-kimsuon
2025-09-03 02:29:07 +09:00
parent 79794be526
commit 521a36812b
15 changed files with 260 additions and 159 deletions

View File

@@ -46,6 +46,7 @@
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"hono": "^4.9.5",
"jotai": "^2.13.1",
"lucide-react": "^0.542.0",
"next": "15.5.2",
"react": "^19.1.1",

26
pnpm-lock.yaml generated
View File

@@ -44,6 +44,9 @@ importers:
hono:
specifier: ^4.9.5
version: 4.9.5
jotai:
specifier: ^2.13.1
version: 2.13.1(@types/react@19.1.12)(react@19.1.1)
lucide-react:
specifier: ^0.542.0
version: 0.542.0(react@19.1.1)
@@ -2099,6 +2102,24 @@ packages:
resolution: {integrity: sha512-twQoecYPiVA5K/h6SxtORw/Bs3ar+mLUtoPSc7iMXzQzK8d7eJ/R09wmTwAjiamETn1cXYPGfNnu7DMoHgu12w==}
hasBin: true
jotai@2.13.1:
resolution: {integrity: sha512-cRsw6kFeGC9Z/D3egVKrTXRweycZ4z/k7i2MrfCzPYsL9SIWcPXTyqv258/+Ay8VUEcihNiE/coBLE6Kic6b8A==}
engines: {node: '>=12.20.0'}
peerDependencies:
'@babel/core': '>=7.0.0'
'@babel/template': '>=7.0.0'
'@types/react': '>=17.0.0'
react: '>=17.0.0'
peerDependenciesMeta:
'@babel/core':
optional: true
'@babel/template':
optional: true
'@types/react':
optional: true
react:
optional: true
js-tokens@9.0.1:
resolution: {integrity: sha512-mxa9E9ITFOt0ban3j6L5MpjwegGz6lBQmM1IJkWeBZGcMxto50+eWdjC/52xDbS2vy0k7vIMK0Fe2wfL9OQSpQ==}
@@ -4935,6 +4956,11 @@ snapshots:
jiti@2.5.1: {}
jotai@2.13.1(@types/react@19.1.12)(react@19.1.1):
optionalDependencies:
'@types/react': 19.1.12
react: 19.1.1
js-tokens@9.0.1: {}
json-parse-even-better-errors@4.0.0: {}

View File

@@ -1,4 +1,4 @@
import { useMutation, useQueryClient } from "@tanstack/react-query";
import { useMutation } from "@tanstack/react-query";
import { AlertCircleIcon, LoaderIcon, SendIcon } from "lucide-react";
import { useRouter } from "next/navigation";
import { type FC, useId, useRef, useState } from "react";
@@ -16,7 +16,6 @@ export const NewChat: FC<{
}> = ({ projectId, onSuccess }) => {
const router = useRouter();
const textareaRef = useRef<HTMLTextAreaElement>(null);
const queryClient = useQueryClient();
const startNewChat = useMutation({
mutationFn: async (options: { message: string }) => {
@@ -31,14 +30,11 @@ export const NewChat: FC<{
throw new Error(response.statusText);
}
await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] });
return response.json();
},
onSuccess: async (response) => {
setMessage("");
onSuccess?.();
await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] });
router.push(
`/projects/${projectId}/sessions/${response.sessionId}#message-${response.userMessageId}`,

View File

@@ -1,4 +1,4 @@
import { useMutation, useQueryClient } from "@tanstack/react-query";
import { useMutation } from "@tanstack/react-query";
import {
AlertCircleIcon,
LoaderIcon,
@@ -30,7 +30,6 @@ export const ResumeChat: FC<{
}> = ({ projectId, sessionId, isPausedTask }) => {
const router = useRouter();
const textareaRef = useRef<HTMLTextAreaElement>(null);
const queryClient = useQueryClient();
const resumeChat = useMutation({
mutationFn: async (options: { message: string }) => {
@@ -45,13 +44,10 @@ export const ResumeChat: FC<{
throw new Error(response.statusText);
}
await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] });
return response.json();
},
onSuccess: async (response) => {
setMessage("");
await queryClient.invalidateQueries({ queryKey: ["aliveTasks"] });
if (sessionId !== response.sessionId) {
router.push(
`/projects/${projectId}/sessions/${response.sessionId}#message-${response.userMessageId}`,

View File

@@ -1,9 +1,13 @@
import { useQuery } from "@tanstack/react-query";
import { useAtom } from "jotai";
import { useMemo } from "react";
import { honoClient } from "../../../../../../lib/api/client";
import { aliveTasksAtom } from "../store/aliveTasksAtom";
export const useAliveTask = (sessionId: string) => {
const { data } = useQuery({
const [aliveTasks, setAliveTasks] = useAtom(aliveTasksAtom);
useQuery({
queryKey: ["aliveTasks"],
queryFn: async () => {
const response = await honoClient.api.tasks.alive.$get({});
@@ -12,21 +16,22 @@ export const useAliveTask = (sessionId: string) => {
throw new Error(response.statusText);
}
const data = await response.json();
setAliveTasks(data.aliveTasks);
return response.json();
},
refetchOnReconnect: true,
});
const taskInfo = useMemo(() => {
const aliveTask = data?.aliveTasks.find(
(task) => task.sessionId === sessionId,
);
const aliveTask = aliveTasks.find((task) => task.sessionId === sessionId);
return {
aliveTask,
aliveTask: aliveTasks.find((task) => task.sessionId === sessionId),
isRunningTask: aliveTask?.status === "running",
isPausedTask: aliveTask?.status === "paused",
} as const;
}, [data, sessionId]);
}, [aliveTasks, sessionId]);
return taskInfo;
};

View File

@@ -0,0 +1,4 @@
import { atom } from "jotai";
import type { SerializableAliveTask } from "../../../../../../server/service/claude-code/types";
export const aliveTasksAtom = atom<SerializableAliveTask[]>([]);

View File

@@ -1,5 +1,7 @@
import { useQueryClient } from "@tanstack/react-query";
import { useSetAtom } from "jotai";
import { useCallback, useEffect } from "react";
import { aliveTasksAtom } from "../app/projects/[projectId]/sessions/[sessionId]/store/aliveTasksAtom";
import { projetsQueryConfig } from "../app/projects/hooks/useProjects";
import { honoClient } from "../lib/api/client";
import type { SSEEvent } from "../server/service/events/types";
@@ -36,7 +38,7 @@ const parseSSEEvent = (text: string): ParsedEvent => {
id: id.slice("id:".length).trim(),
event: event.slice("event:".length).trim(),
data: JSON.parse(
data.slice(data.indexOf("{"), data.indexOf("}") + 1),
data.slice(data.indexOf("{"), data.lastIndexOf("}") + 1),
) as SSEEvent,
};
};
@@ -53,6 +55,7 @@ let isInitialized = false;
export const useServerEvents = () => {
const queryClient = useQueryClient();
const setAliveTasks = useSetAtom(aliveTasksAtom);
const listener = useCallback(async () => {
console.log("listening to events");
@@ -86,9 +89,13 @@ export const useServerEvents = () => {
if (event.data.type === "session_changed") {
await queryClient.invalidateQueries({ queryKey: ["sessions"] });
}
if (event.data.type === "task_changed") {
setAliveTasks(event.data.data);
}
}
}
}, [queryClient]);
}, [queryClient, setAliveTasks]);
useEffect(() => {
if (isInitialized === false) {

View File

@@ -7,9 +7,10 @@ import { streamSSE } from "hono/streaming";
import { z } from "zod";
import { configSchema } from "../config/config";
import { ClaudeCodeTaskController } from "../service/claude-code/ClaudeCodeTaskController";
import type { SerializableAliveTask } from "../service/claude-code/types";
import { getEventBus } from "../service/events/EventBus";
import { getFileWatcher } from "../service/events/fileWatcher";
import { sseEvent } from "../service/events/sseEvent";
import type { WatcherEvent } from "../service/events/types";
import { sseEventResponse } from "../service/events/sseEventResponse";
import { getProject } from "../service/project/getProject";
import { getProjects } from "../service/project/getProjects";
import { getSession } from "../service/session/getSession";
@@ -233,7 +234,16 @@ export const routes = (app: HonoAppType) => {
)
.get("/tasks/alive", async (c) => {
return c.json({ aliveTasks: taskController.aliveTasks });
return c.json({
aliveTasks: taskController.aliveTasks.map(
(task): SerializableAliveTask => ({
id: task.id,
status: task.status,
sessionId: task.sessionId,
userMessageId: task.userMessageId,
}),
),
});
})
.post(
@@ -251,26 +261,16 @@ export const routes = (app: HonoAppType) => {
c,
async (stream) => {
const fileWatcher = getFileWatcher();
const eventBus = getEventBus();
let isConnected = true;
let eventId = 0;
// ハートビート設定
const heartbeat = setInterval(() => {
if (isConnected) {
stream
.writeSSE({
data: sseEvent({
type: "heartbeat",
timestamp: new Date().toISOString(),
}),
event: "heartbeat",
id: String(eventId++),
})
.catch(() => {
console.warn("Failed to write SSE event");
isConnected = false;
onConnectionClosed();
});
eventBus.emit("heartbeat", {
type: "heartbeat",
});
}
}, 30 * 1000);
@@ -296,59 +296,63 @@ export const routes = (app: HonoAppType) => {
// イベントリスナーを登録
console.log("Registering SSE event listeners");
fileWatcher.on("project_changed", async (event: WatcherEvent) => {
eventBus.on("connected", async (event) => {
if (!isConnected) {
return;
}
if (event.eventType !== "project_changed") {
return;
}
await stream
.writeSSE({
data: sseEvent({
type: event.eventType,
...event.data,
}),
event: event.eventType,
id: String(eventId++),
})
.catch(() => {
console.warn("Failed to write SSE event");
onConnectionClosed();
});
await stream.writeSSE(sseEventResponse(event)).catch(() => {
onConnectionClosed();
});
});
fileWatcher.on("session_changed", async (event: WatcherEvent) => {
eventBus.on("heartbeat", async (event) => {
if (!isConnected) {
return;
}
await stream.writeSSE(sseEventResponse(event)).catch(() => {
onConnectionClosed();
});
});
eventBus.on("project_changed", async (event) => {
if (!isConnected) {
return;
}
await stream
.writeSSE({
data: sseEvent({
...event.data,
type: event.eventType,
}),
event: event.eventType,
id: String(eventId++),
})
.catch(() => {
onConnectionClosed();
});
await stream.writeSSE(sseEventResponse(event)).catch(() => {
console.warn("Failed to write SSE event");
onConnectionClosed();
});
});
eventBus.on("session_changed", async (event) => {
if (!isConnected) {
return;
}
await stream.writeSSE(sseEventResponse(event)).catch(() => {
onConnectionClosed();
});
});
eventBus.on("task_changed", async (event) => {
if (!isConnected) {
return;
}
await stream.writeSSE(sseEventResponse(event)).catch(() => {
onConnectionClosed();
});
});
// 初期接続確認メッセージ
await stream.writeSSE({
data: sseEvent({
type: "connected",
message: "SSE connection established",
timestamp: new Date().toISOString(),
}),
event: "connected",
id: String(eventId++),
eventBus.emit("connected", {
type: "connected",
message: "SSE connection established",
});
fileWatcher.startWatching();
await connectionPromise;
},
async (err, stream) => {

View File

@@ -1,70 +1,25 @@
import { execSync } from "node:child_process";
import { query } from "@anthropic-ai/claude-code";
import { ulid } from "ulid";
import {
createMessageGenerator,
type MessageGenerator,
type OnMessage,
} from "./createMessageGenerator";
type BaseClaudeCodeTask = {
id: string;
projectId: string;
baseSessionId?: string | undefined; // undefined = new session
cwd: string;
generateMessages: MessageGenerator;
setNextMessage: (message: string) => void;
onMessageHandlers: OnMessage[];
};
type PendingClaudeCodeTask = BaseClaudeCodeTask & {
status: "pending";
};
type RunningClaudeCodeTask = BaseClaudeCodeTask & {
status: "running";
sessionId: string;
userMessageId: string;
abortController: AbortController;
};
type PausedClaudeCodeTask = BaseClaudeCodeTask & {
status: "paused";
sessionId: string;
userMessageId: string;
abortController: AbortController;
};
type CompletedClaudeCodeTask = BaseClaudeCodeTask & {
status: "completed";
sessionId: string;
userMessageId: string;
abortController: AbortController;
};
type FailedClaudeCodeTask = BaseClaudeCodeTask & {
status: "failed";
sessionId?: string;
userMessageId?: string;
abortController?: AbortController;
};
type ClaudeCodeTask =
| RunningClaudeCodeTask
| PausedClaudeCodeTask
| CompletedClaudeCodeTask
| FailedClaudeCodeTask;
type AliveClaudeCodeTask = RunningClaudeCodeTask | PausedClaudeCodeTask;
import { type EventBus, getEventBus } from "../events/EventBus";
import { createMessageGenerator } from "./createMessageGenerator";
import type {
AliveClaudeCodeTask,
ClaudeCodeTask,
PendingClaudeCodeTask,
RunningClaudeCodeTask,
} from "./types";
export class ClaudeCodeTaskController {
private pathToClaudeCodeExecutable: string;
private tasks: ClaudeCodeTask[] = [];
private eventBus: EventBus;
constructor() {
this.pathToClaudeCodeExecutable = execSync("which claude", {})
.toString()
.trim();
this.eventBus = getEventBus();
}
public get aliveTasks() {
@@ -256,5 +211,10 @@ export class ClaudeCodeTaskController {
}
Object.assign(target, task);
this.eventBus.emit("task_changed", {
type: "task_changed",
data: this.aliveTasks,
});
}
}

View File

@@ -0,0 +1,56 @@
import type { MessageGenerator, OnMessage } from "./createMessageGenerator";
type BaseClaudeCodeTask = {
id: string;
projectId: string;
baseSessionId?: string | undefined; // undefined = new session
cwd: string;
generateMessages: MessageGenerator;
setNextMessage: (message: string) => void;
onMessageHandlers: OnMessage[];
};
export type PendingClaudeCodeTask = BaseClaudeCodeTask & {
status: "pending";
};
export type RunningClaudeCodeTask = BaseClaudeCodeTask & {
status: "running";
sessionId: string;
userMessageId: string;
abortController: AbortController;
};
export type PausedClaudeCodeTask = BaseClaudeCodeTask & {
status: "paused";
sessionId: string;
userMessageId: string;
abortController: AbortController;
};
type CompletedClaudeCodeTask = BaseClaudeCodeTask & {
status: "completed";
sessionId: string;
userMessageId: string;
abortController: AbortController;
};
type FailedClaudeCodeTask = BaseClaudeCodeTask & {
status: "failed";
sessionId?: string;
userMessageId?: string;
abortController?: AbortController;
};
export type ClaudeCodeTask =
| RunningClaudeCodeTask
| PausedClaudeCodeTask
| CompletedClaudeCodeTask
| FailedClaudeCodeTask;
export type AliveClaudeCodeTask = RunningClaudeCodeTask | PausedClaudeCodeTask;
export type SerializableAliveTask = Pick<
AliveClaudeCodeTask,
"id" | "status" | "sessionId" | "userMessageId"
>;

View File

@@ -0,0 +1,45 @@
import { EventEmitter } from "node:events";
import type { BaseSSEEvent, SSEEvent } from "./types";
export class EventBus {
private previousId = 0;
private eventEmitter = new EventEmitter();
public emit<
T extends SSEEvent["type"],
E = SSEEvent extends infer I ? (I extends { type: T } ? I : never) : never,
>(type: T, event: Omit<E, "id" | "timestamp">): void {
const base: BaseSSEEvent = {
id: String(this.previousId++),
timestamp: new Date().toISOString(),
};
this.eventEmitter.emit(type, {
...event,
...base,
});
}
public on(
event: SSEEvent["type"],
listener: (event: SSEEvent) => void,
): void {
this.eventEmitter.on(event, listener);
}
public off(
event: SSEEvent["type"],
listener: (event: SSEEvent) => void,
): void {
this.eventEmitter.off(event, listener);
}
}
// Singleton
let eventBusInstance: EventBus | null = null;
export const getEventBus = (): EventBus => {
eventBusInstance ??= new EventBus();
return eventBusInstance;
};

View File

@@ -1,8 +1,7 @@
import { EventEmitter } from "node:events";
import { type FSWatcher, watch } from "node:fs";
import z from "zod";
import { claudeProjectPath } from "../paths";
import type { WatcherEvent } from "./types";
import { type EventBus, getEventBus } from "./EventBus";
const fileRegExp = /(?<projectId>.*?)\/(?<sessionId>.*?)\.jsonl/;
const fileRegExpGroupSchema = z.object({
@@ -10,16 +9,16 @@ const fileRegExpGroupSchema = z.object({
sessionId: z.string(),
});
export class FileWatcherService extends EventEmitter {
export class FileWatcherService {
private watcher: FSWatcher | null = null;
private projectWatchers: Map<string, FSWatcher> = new Map();
private eventBus: EventBus;
constructor() {
super();
this.startWatching();
this.eventBus = getEventBus();
}
private startWatching(): void {
public startWatching(): void {
try {
console.log("Starting file watcher on:", claudeProjectPath);
// メインプロジェクトディレクトリを監視
@@ -37,19 +36,22 @@ export class FileWatcherService extends EventEmitter {
const { projectId, sessionId } = groups.data;
this.emit("project_changed", {
eventType: "project_changed",
data: { projectId, fileEventType: eventType },
} satisfies WatcherEvent);
this.eventBus.emit("project_changed", {
type: "project_changed",
data: {
fileEventType: eventType,
projectId,
},
});
this.emit("session_changed", {
eventType: "session_changed",
this.eventBus.emit("session_changed", {
type: "session_changed",
data: {
projectId,
sessionId,
fileEventType: eventType,
},
} satisfies WatcherEvent);
});
},
);
console.log("File watcher initialization completed");

View File

@@ -1,13 +0,0 @@
import type { BaseSSEEvent, SSEEvent } from "./types";
let eventId = 0;
export const sseEvent = <D extends Omit<SSEEvent, "id" | "timestamp">>(
data: D,
): string => {
return JSON.stringify({
...data,
id: String(eventId++),
timestamp: new Date().toISOString(),
} satisfies D & BaseSSEEvent);
};

View File

@@ -0,0 +1,9 @@
import type { SSEEvent } from "./types";
export const sseEventResponse = (event: SSEEvent) => {
return {
data: JSON.stringify(event),
event: event.type,
id: event.id,
};
};

View File

@@ -1,4 +1,5 @@
import type { WatchEventType } from "node:fs";
import type { SerializableAliveTask } from "../claude-code/types";
export type WatcherEvent =
| {
@@ -27,15 +28,17 @@ export type SSEEvent = BaseSSEEvent &
timestamp: string;
}
| {
id: string;
type: "project_changed";
data: ProjectChangedData;
}
| {
id: string;
type: "session_changed";
data: SessionChangedData;
}
| {
type: "task_changed";
data: SerializableAliveTask[];
}
);
export interface ProjectChangedData {