perf: refactor sse handleing

This commit is contained in:
d-kimsuon
2025-09-18 20:42:44 +09:00
parent a90ef520dd
commit eb5a8ddeeb
38 changed files with 727 additions and 597 deletions

View File

@@ -8,9 +8,11 @@ import { z } from "zod";
import { configSchema } from "../config/config";
import { ClaudeCodeTaskController } from "../service/claude-code/ClaudeCodeTaskController";
import type { SerializableAliveTask } from "../service/claude-code/types";
import { adaptInternalEventToSSE } from "../service/events/adaptInternalEventToSSE";
import { getEventBus } from "../service/events/EventBus";
import { getFileWatcher } from "../service/events/fileWatcher";
import { sseEventResponse } from "../service/events/sseEventResponse";
import type { InternalEventDeclaration } from "../service/events/InternalEventDeclaration";
import { writeTypeSafeSSE } from "../service/events/typeSafeSSE";
import { getFileCompletion } from "../service/file-completion/getFileCompletion";
import { getBranches } from "../service/git/getBranches";
import { getCommits } from "../service/git/getCommits";
@@ -25,6 +27,14 @@ import { configMiddleware } from "./middleware/config.middleware";
export const routes = (app: HonoAppType) => {
const taskController = new ClaudeCodeTaskController();
const fileWatcher = getFileWatcher();
const eventBus = getEventBus();
fileWatcher.startWatching();
setInterval(() => {
eventBus.emit("heartbeat", {});
}, 10 * 1000);
return (
app
@@ -375,108 +385,52 @@ export const routes = (app: HonoAppType) => {
},
)
.get("/events/state_changes", async (c) => {
.get("/sse", async (c) => {
return streamSSE(
c,
async (stream) => {
const fileWatcher = getFileWatcher();
const eventBus = getEventBus();
async (rawStream) => {
const stream = writeTypeSafeSSE(rawStream);
let isConnected = true;
// ハートビート設定
const heartbeat = setInterval(() => {
if (isConnected) {
eventBus.emit("heartbeat", {
type: "heartbeat",
});
}
}, 30 * 1000);
// connection handling
const abortController = new AbortController();
let connectionResolve: ((value: undefined) => void) | undefined;
const connectionPromise = new Promise<undefined>((resolve) => {
connectionResolve = resolve;
});
const onConnectionClosed = () => {
isConnected = false;
connectionResolve?.(undefined);
abortController.abort();
clearInterval(heartbeat);
const onSessionListChanged = (
event: InternalEventDeclaration["sessionListChanged"],
) => {
stream.writeSSE("sessionListChanged", {
projectId: event.projectId,
});
};
// 接続終了時のクリーンアップ
stream.onAbort(() => {
console.log("SSE connection aborted");
onConnectionClosed();
});
// イベントリスナーを登録
console.log("Registering SSE event listeners");
eventBus.on("connected", async (event) => {
if (!isConnected) {
return;
}
await stream.writeSSE(sseEventResponse(event)).catch(() => {
onConnectionClosed();
const onSessionChanged = (
event: InternalEventDeclaration["sessionChanged"],
) => {
stream.writeSSE("sessionChanged", {
projectId: event.projectId,
sessionId: event.sessionId,
});
});
};
eventBus.on("heartbeat", async (event) => {
if (!isConnected) {
return;
}
await stream.writeSSE(sseEventResponse(event)).catch(() => {
onConnectionClosed();
const onTaskChanged = (
event: InternalEventDeclaration["taskChanged"],
) => {
stream.writeSSE("taskChanged", {
aliveTasks: event.aliveTasks,
});
};
eventBus.on("sessionListChanged", onSessionListChanged);
eventBus.on("sessionChanged", onSessionChanged);
eventBus.on("taskChanged", onTaskChanged);
const { connectionPromise } = adaptInternalEventToSSE(rawStream, {
cleanUp: () => {
eventBus.off("sessionListChanged", onSessionListChanged);
eventBus.off("sessionChanged", onSessionChanged);
eventBus.off("taskChanged", onTaskChanged);
},
});
eventBus.on("project_changed", async (event) => {
if (!isConnected) {
return;
}
await stream.writeSSE(sseEventResponse(event)).catch(() => {
console.warn("Failed to write SSE event");
onConnectionClosed();
});
});
eventBus.on("session_changed", async (event) => {
if (!isConnected) {
return;
}
await stream.writeSSE(sseEventResponse(event)).catch(() => {
onConnectionClosed();
});
});
eventBus.on("task_changed", async (event) => {
if (!isConnected) {
return;
}
await stream.writeSSE(sseEventResponse(event)).catch(() => {
onConnectionClosed();
});
});
// 初期接続確認メッセージ
eventBus.emit("connected", {
type: "connected",
message: "SSE connection established",
});
fileWatcher.startWatching();
await connectionPromise;
},
async (err, stream) => {
async (err) => {
console.error("Streaming error:", err);
await stream.write("エラーが発生しました。");
},
);
})

View File

@@ -2,7 +2,7 @@ import { execSync } from "node:child_process";
import { query } from "@anthropic-ai/claude-code";
import prexit from "prexit";
import { ulid } from "ulid";
import { type EventBus, getEventBus } from "../events/EventBus";
import { getEventBus, type IEventBus } from "../events/EventBus";
import { createMessageGenerator } from "./createMessageGenerator";
import type {
AliveClaudeCodeTask,
@@ -14,7 +14,7 @@ import type {
export class ClaudeCodeTaskController {
private pathToClaudeCodeExecutable: string;
private tasks: ClaudeCodeTask[] = [];
private eventBus: EventBus;
private eventBus: IEventBus;
constructor() {
this.pathToClaudeCodeExecutable = execSync("which claude", {})
@@ -239,9 +239,8 @@ export class ClaudeCodeTaskController {
Object.assign(target, task);
this.eventBus.emit("task_changed", {
type: "task_changed",
data: this.aliveTasks,
this.eventBus.emit("taskChanged", {
aliveTasks: this.aliveTasks,
});
}
}

View File

@@ -1,45 +1,47 @@
import { EventEmitter } from "node:events";
import { EventEmitter } from "node:stream";
import type { InternalEventDeclaration } from "./InternalEventDeclaration";
import type { BaseSSEEvent, SSEEvent } from "./types";
class EventBus {
private emitter: EventEmitter;
export class EventBus {
private previousId = 0;
private eventEmitter = new EventEmitter();
constructor() {
this.emitter = new EventEmitter();
}
public emit<
T extends SSEEvent["type"],
E = SSEEvent extends infer I ? (I extends { type: T } ? I : never) : never,
>(type: T, event: Omit<E, "id" | "timestamp">): void {
const base: BaseSSEEvent = {
id: String(this.previousId++),
timestamp: new Date().toISOString(),
};
this.eventEmitter.emit(type, {
...event,
...base,
public emit<EventName extends keyof InternalEventDeclaration>(
event: EventName,
data: InternalEventDeclaration[EventName],
): void {
this.emitter.emit(event, {
...data,
});
}
public on(
event: SSEEvent["type"],
listener: (event: SSEEvent) => void,
public on<EventName extends keyof InternalEventDeclaration>(
event: EventName,
listener: (
data: InternalEventDeclaration[EventName],
) => void | Promise<void>,
): void {
this.eventEmitter.on(event, listener);
this.emitter.on(event, listener);
}
public off(
event: SSEEvent["type"],
listener: (event: SSEEvent) => void,
public off<EventName extends keyof InternalEventDeclaration>(
event: EventName,
listener: (
data: InternalEventDeclaration[EventName],
) => void | Promise<void>,
): void {
this.eventEmitter.off(event, listener);
this.emitter.off(event, listener);
}
}
// Singleton
let eventBusInstance: EventBus | null = null;
// singleton
let eventBus: EventBus | null = null;
export const getEventBus = (): EventBus => {
eventBusInstance ??= new EventBus();
return eventBusInstance;
export const getEventBus = () => {
eventBus ??= new EventBus();
return eventBus;
};
export type IEventBus = ReturnType<typeof getEventBus>;

View File

@@ -0,0 +1,19 @@
import type { AliveClaudeCodeTask } from "../claude-code/types";
export type InternalEventDeclaration = {
// biome-ignore lint/complexity/noBannedTypes: correct type
heartbeat: {};
sessionListChanged: {
projectId: string;
};
sessionChanged: {
projectId: string;
sessionId: string;
};
taskChanged: {
aliveTasks: AliveClaudeCodeTask[];
};
};

View File

@@ -0,0 +1,61 @@
import type { SSEStreamingApi } from "hono/streaming";
import { getEventBus } from "./EventBus";
import type { InternalEventDeclaration } from "./InternalEventDeclaration";
import { writeTypeSafeSSE } from "./typeSafeSSE";
export const adaptInternalEventToSSE = (
rawStream: SSEStreamingApi,
options?: {
timeout?: number;
cleanUp?: () => void | Promise<void>;
},
) => {
const { timeout = 60 * 1000, cleanUp } = options ?? {};
console.log("SSE connection started");
const eventBus = getEventBus();
const stream = writeTypeSafeSSE(rawStream);
const abortController = new AbortController();
let connectionResolve: (() => void) | undefined;
const connectionPromise = new Promise<void>((resolve) => {
connectionResolve = resolve;
});
const closeConnection = () => {
console.log("SSE connection closed");
connectionResolve?.();
abortController.abort();
eventBus.off("heartbeat", heartbeat);
cleanUp?.();
};
rawStream.onAbort(() => {
console.log("SSE connection aborted");
closeConnection();
});
// Event Listeners
const heartbeat = (event: InternalEventDeclaration["heartbeat"]) => {
stream.writeSSE("heartbeat", {
...event,
});
};
eventBus.on("heartbeat", heartbeat);
stream.writeSSE("connect", {
timestamp: new Date().toISOString(),
});
setTimeout(() => {
closeConnection();
}, timeout);
return {
connectionPromise,
} as const;
};

View File

@@ -1,7 +1,7 @@
import { type FSWatcher, watch } from "node:fs";
import z from "zod";
import { claudeProjectPath } from "../paths";
import { type EventBus, getEventBus } from "./EventBus";
import { getEventBus, type IEventBus } from "./EventBus";
const fileRegExp = /(?<projectId>.*?)\/(?<sessionId>.*?)\.jsonl/;
const fileRegExpGroupSchema = z.object({
@@ -10,15 +10,19 @@ const fileRegExpGroupSchema = z.object({
});
export class FileWatcherService {
private isWatching = false;
private watcher: FSWatcher | null = null;
private projectWatchers: Map<string, FSWatcher> = new Map();
private eventBus: EventBus;
private eventBus: IEventBus;
constructor() {
this.eventBus = getEventBus();
}
public startWatching(): void {
if (this.isWatching) return;
this.isWatching = true;
try {
console.log("Starting file watcher on:", claudeProjectPath);
// メインプロジェクトディレクトリを監視
@@ -36,22 +40,20 @@ export class FileWatcherService {
const { projectId, sessionId } = groups.data;
this.eventBus.emit("project_changed", {
type: "project_changed",
data: {
fileEventType: eventType,
projectId,
},
});
this.eventBus.emit("session_changed", {
type: "session_changed",
data: {
if (eventType === "change") {
// セッションファイルの中身が変更されている
this.eventBus.emit("sessionChanged", {
projectId,
sessionId,
fileEventType: eventType,
},
});
});
} else if (eventType === "rename") {
// セッションファイルの追加/削除
this.eventBus.emit("sessionListChanged", {
projectId,
});
} else {
eventType satisfies never;
}
},
);
console.log("File watcher initialization completed");

View File

@@ -1,9 +0,0 @@
import type { SSEEvent } from "./types";
export const sseEventResponse = (event: SSEEvent) => {
return {
data: JSON.stringify(event),
event: event.type,
id: event.id,
};
};

View File

@@ -0,0 +1,21 @@
import type { SSEStreamingApi } from "hono/streaming";
import { ulid } from "ulid";
import type { SSEEventDeclaration } from "../../../types/sse";
export const writeTypeSafeSSE = (stream: SSEStreamingApi) => ({
writeSSE: async <EventName extends keyof SSEEventDeclaration>(
event: EventName,
data: SSEEventDeclaration[EventName],
): Promise<void> => {
const id = ulid();
await stream.writeSSE({
event: event,
id: id,
data: JSON.stringify({
kind: event,
timestamp: new Date().toISOString(),
...data,
}),
});
},
});

View File

@@ -1,53 +0,0 @@
import type { WatchEventType } from "node:fs";
import type { SerializableAliveTask } from "../claude-code/types";
export type WatcherEvent =
| {
eventType: "project_changed";
data: ProjectChangedData;
}
| {
eventType: "session_changed";
data: SessionChangedData;
};
export type BaseSSEEvent = {
id: string;
timestamp: string;
};
export type SSEEvent = BaseSSEEvent &
(
| {
type: "connected";
message: string;
timestamp: string;
}
| {
type: "heartbeat";
timestamp: string;
}
| {
type: "project_changed";
data: ProjectChangedData;
}
| {
type: "session_changed";
data: SessionChangedData;
}
| {
type: "task_changed";
data: SerializableAliveTask[];
}
);
export interface ProjectChangedData {
projectId: string;
fileEventType: WatchEventType;
}
export interface SessionChangedData {
projectId: string;
sessionId: string;
fileEventType: WatchEventType;
}

View File

@@ -160,8 +160,6 @@ async function getUntrackedFiles(cwd: string): Promise<GitResult<string[]>> {
cwd,
);
console.log("debug statusResult stdout", statusResult);
if (!statusResult.success) {
return statusResult;
}
@@ -334,7 +332,6 @@ export const getDiff = async (
// Include untracked files when comparing to working directory
if (toRef === undefined) {
const untrackedResult = await getUntrackedFiles(cwd);
console.log("debug untrackedResult", untrackedResult);
if (untrackedResult.success) {
for (const untrackedFile of untrackedResult.data) {
const untrackedDiff = await createUntrackedFileDiff(