refactor: add effect-ts and refactor codes

This commit is contained in:
d-kimsuon
2025-10-15 23:22:27 +09:00
parent 94cc1c0630
commit 21070d09ff
76 changed files with 7598 additions and 1950 deletions

View File

@@ -0,0 +1,362 @@
import { Effect, Layer, Ref } from "effect";
import { describe, expect, it, vi } from "vitest";
import { EventBus } from "../service/events/EventBus";
import { FileWatcherService } from "../service/events/fileWatcher";
import type { InternalEventDeclaration } from "../service/events/InternalEventDeclaration";
import { ProjectMetaService } from "../service/project/ProjectMetaService";
import { ProjectRepository } from "../service/project/ProjectRepository";
import { VirtualConversationDatabase } from "../service/session/PredictSessionsDatabase";
import { SessionMetaService } from "../service/session/SessionMetaService";
import { SessionRepository } from "../service/session/SessionRepository";
import { InitializeService } from "./initialize";
describe("InitializeService", () => {
const createMockProjectRepository = (
projects: Array<{
id: string;
claudeProjectPath: string;
lastModifiedAt: Date;
meta: {
projectName: string | null;
projectPath: string | null;
sessionCount: number;
};
}> = [],
) =>
Layer.succeed(ProjectRepository, {
getProjects: () => Effect.succeed({ projects }),
getProject: () => Effect.fail(new Error("Not implemented in mock")),
});
const createMockSessionRepository = (
sessions: Array<{
id: string;
jsonlFilePath: string;
lastModifiedAt: Date;
meta: {
messageCount: number;
firstCommand: {
kind: "command";
commandName: string;
commandArgs?: string;
commandMessage?: string;
} | null;
};
}> = [],
getSessionsCb?: (projectId: string) => void,
) =>
Layer.succeed(SessionRepository, {
getSessions: (projectId: string) => {
if (getSessionsCb) getSessionsCb(projectId);
return Effect.succeed({ sessions });
},
getSession: () => Effect.fail(new Error("Not implemented in mock")),
});
const createMockProjectMetaService = () =>
Layer.succeed(ProjectMetaService, {
getProjectMeta: () =>
Effect.succeed({
projectName: "Test Project",
projectPath: "/path/to/project",
sessionCount: 0,
}),
invalidateProject: () => Effect.void,
});
const createMockSessionMetaService = () =>
Layer.succeed(SessionMetaService, {
getSessionMeta: () =>
Effect.succeed({
messageCount: 0,
firstCommand: null,
}),
invalidateSession: () => Effect.void,
});
const createTestLayer = (
mockProjectRepositoryLayer: Layer.Layer<
ProjectRepository,
never,
never
> = createMockProjectRepository(),
mockSessionRepositoryLayer: Layer.Layer<
SessionRepository,
never,
never
> = createMockSessionRepository(),
) => {
// Provide EventBus first since FileWatcherService depends on it
const fileWatcherWithEventBus = FileWatcherService.Live.pipe(
Layer.provide(EventBus.Live),
);
// Merge all dependencies
const allDependencies = Layer.mergeAll(
EventBus.Live,
fileWatcherWithEventBus,
mockProjectRepositoryLayer,
mockSessionRepositoryLayer,
createMockProjectMetaService(),
createMockSessionMetaService(),
VirtualConversationDatabase.Live,
);
// Provide dependencies to InitializeService.Live and expose all services
return Layer.provide(InitializeService.Live, allDependencies).pipe(
Layer.merge(allDependencies),
);
};
describe("basic initialization process", () => {
it("service initialization succeeds", async () => {
const mockProjectRepositoryLayer = createMockProjectRepository([
{
id: "project-1",
claudeProjectPath: "/path/to/project-1",
lastModifiedAt: new Date(),
meta: {
projectName: "Project 1",
projectPath: "/path/to/project-1",
sessionCount: 2,
},
},
]);
const mockSessionRepositoryLayer = createMockSessionRepository([
{
id: "session-1",
jsonlFilePath: "/path/to/session-1.jsonl",
lastModifiedAt: new Date(),
meta: {
messageCount: 5,
firstCommand: {
kind: "command",
commandName: "test",
},
},
},
{
id: "session-2",
jsonlFilePath: "/path/to/session-2.jsonl",
lastModifiedAt: new Date(),
meta: {
messageCount: 3,
firstCommand: null,
},
},
]);
const program = Effect.gen(function* () {
const initialize = yield* InitializeService;
return yield* initialize.startInitialization();
});
const testLayer = createTestLayer(
mockProjectRepositoryLayer,
mockSessionRepositoryLayer,
);
const result = await Effect.runPromise(
program.pipe(Effect.provide(testLayer)),
);
expect(result).toBeUndefined();
});
it("file watcher is started", async () => {
const program = Effect.gen(function* () {
const initialize = yield* InitializeService;
yield* initialize.startInitialization();
// Verify file watcher is started
// (In actual implementation, verify that startWatching is called)
return "file watcher started";
});
const testLayer = createTestLayer();
const result = await Effect.runPromise(
program.pipe(Effect.provide(testLayer)),
);
expect(result).toBe("file watcher started");
});
});
describe("event processing", () => {
it("receives sessionChanged event", async () => {
const program = Effect.gen(function* () {
const initialize = yield* InitializeService;
const eventBus = yield* EventBus;
const eventsRef = yield* Ref.make<
Array<InternalEventDeclaration["sessionChanged"]>
>([]);
// Set up listener for sessionChanged event
yield* eventBus.on("sessionChanged", (event) => {
Effect.runSync(Ref.update(eventsRef, (events) => [...events, event]));
});
yield* initialize.startInitialization();
// Emit event
yield* eventBus.emit("sessionChanged", {
projectId: "project-1",
sessionId: "session-1",
});
// Wait a bit for event to be processed
yield* Effect.sleep("50 millis");
const events = yield* Ref.get(eventsRef);
return events;
});
const testLayer = createTestLayer();
const result = await Effect.runPromise(
program.pipe(Effect.provide(testLayer)),
);
expect(result).toHaveLength(1);
expect(result[0]).toEqual({
projectId: "project-1",
sessionId: "session-1",
});
});
it("heartbeat event is emitted periodically", async () => {
const program = Effect.gen(function* () {
const initialize = yield* InitializeService;
const eventBus = yield* EventBus;
const heartbeatCountRef = yield* Ref.make(0);
// Set up listener for heartbeat event
yield* eventBus.on("heartbeat", () =>
Effect.runSync(Ref.update(heartbeatCountRef, (count) => count + 1)),
);
yield* initialize.startInitialization();
// Wait a bit to verify heartbeat is emitted
// (In actual tests, should use mock to shorten time)
yield* Effect.sleep("100 millis");
const count = yield* Ref.get(heartbeatCountRef);
return count;
});
const testLayer = createTestLayer();
const result = await Effect.runPromise(
program.pipe(Effect.provide(testLayer)),
);
// heartbeat is emitted immediately once first, then every 10 seconds
// At 100ms, only the first one is emitted
expect(result).toBeGreaterThanOrEqual(1);
});
});
describe("cache initialization", () => {
it("project and session caches are initialized", async () => {
const getProjectsCalled = vi.fn();
const getSessionsCalled = vi.fn();
const mockProjectRepositoryLayer = Layer.succeed(ProjectRepository, {
getProjects: () => {
getProjectsCalled();
return Effect.succeed({
projects: [
{
id: "project-1",
claudeProjectPath: "/path/to/project-1",
lastModifiedAt: new Date(),
meta: {
projectName: "Project 1",
projectPath: "/path/to/project-1",
sessionCount: 2,
},
},
],
});
},
getProject: () => Effect.fail(new Error("Not implemented in mock")),
});
const mockSessionRepositoryLayer = createMockSessionRepository(
[
{
id: "session-1",
jsonlFilePath: "/path/to/session-1.jsonl",
lastModifiedAt: new Date(),
meta: {
messageCount: 5,
firstCommand: {
kind: "command",
commandName: "test",
},
},
},
],
getSessionsCalled,
);
const program = Effect.gen(function* () {
const initialize = yield* InitializeService;
yield* initialize.startInitialization();
});
const testLayer = createTestLayer(
mockProjectRepositoryLayer,
mockSessionRepositoryLayer,
);
await Effect.runPromise(program.pipe(Effect.provide(testLayer)));
expect(getProjectsCalled).toHaveBeenCalledTimes(1);
expect(getSessionsCalled).toHaveBeenCalledTimes(1);
expect(getSessionsCalled).toHaveBeenCalledWith("project-1");
});
it("doesn't throw error even if cache initialization fails", async () => {
const mockProjectRepositoryLayer = Layer.succeed(ProjectRepository, {
getProjects: () => Effect.fail(new Error("Failed to get projects")),
getProject: () => Effect.fail(new Error("Not implemented in mock")),
});
const program = Effect.gen(function* () {
const initialize = yield* InitializeService;
return yield* initialize.startInitialization();
});
const testLayer = createTestLayer(mockProjectRepositoryLayer);
// Completes without throwing error
await expect(
Effect.runPromise(program.pipe(Effect.provide(testLayer))),
).resolves.toBeUndefined();
});
});
describe("cleanup", () => {
it("resources are cleaned up with stopCleanup", async () => {
const program = Effect.gen(function* () {
const initialize = yield* InitializeService;
yield* initialize.startInitialization();
yield* initialize.stopCleanup();
return "cleaned up";
});
const testLayer = createTestLayer();
const result = await Effect.runPromise(
program.pipe(Effect.provide(testLayer)),
);
expect(result).toBe("cleaned up");
});
});
});

View File

@@ -1,55 +1,144 @@
import prexit from "prexit";
import { claudeCodeTaskController } from "../service/claude-code/ClaudeCodeTaskController";
import { eventBus } from "../service/events/EventBus";
import { fileWatcher } from "../service/events/fileWatcher";
import { Context, Effect, Layer, Ref, Schedule } from "effect";
import { EventBus } from "../service/events/EventBus";
import { FileWatcherService } from "../service/events/fileWatcher";
import type { InternalEventDeclaration } from "../service/events/InternalEventDeclaration";
import type { ProjectRepository } from "../service/project/ProjectRepository";
import { projectMetaStorage } from "../service/project/projectMetaStorage";
import type { SessionRepository } from "../service/session/SessionRepository";
import { sessionMetaStorage } from "../service/session/sessionMetaStorage";
import { ProjectMetaService } from "../service/project/ProjectMetaService";
import { ProjectRepository } from "../service/project/ProjectRepository";
import { VirtualConversationDatabase } from "../service/session/PredictSessionsDatabase";
import { SessionMetaService } from "../service/session/SessionMetaService";
import { SessionRepository } from "../service/session/SessionRepository";
export const initialize = async (deps: {
sessionRepository: SessionRepository;
projectRepository: ProjectRepository;
}): Promise<void> => {
fileWatcher.startWatching();
interface InitializeServiceInterface {
readonly startInitialization: () => Effect.Effect<void>;
readonly stopCleanup: () => Effect.Effect<void>;
}
const intervalId = setInterval(() => {
eventBus.emit("heartbeat", {});
}, 10 * 1000);
export class InitializeService extends Context.Tag("InitializeService")<
InitializeService,
InitializeServiceInterface
>() {
static Live = Layer.effect(
this,
Effect.gen(function* () {
const eventBus = yield* EventBus;
const fileWatcher = yield* FileWatcherService;
const projectRepository = yield* ProjectRepository;
const sessionRepository = yield* SessionRepository;
const projectMetaService = yield* ProjectMetaService;
const sessionMetaService = yield* SessionMetaService;
const virtualConversationDatabase = yield* VirtualConversationDatabase;
const onSessionChanged = (
event: InternalEventDeclaration["sessionChanged"],
) => {
projectMetaStorage.invalidateProject(event.projectId);
sessionMetaStorage.invalidateSession(event.projectId, event.sessionId);
};
// 状態管理用の Ref
const listenersRef = yield* Ref.make<{
sessionProcessChanged?:
| ((event: InternalEventDeclaration["sessionProcessChanged"]) => void)
| null;
sessionChanged?:
| ((event: InternalEventDeclaration["sessionChanged"]) => void)
| null;
}>({});
eventBus.on("sessionChanged", onSessionChanged);
const startInitialization = (): Effect.Effect<void> => {
return Effect.gen(function* () {
// ファイルウォッチャーを開始
yield* fileWatcher.startWatching();
try {
console.log("Initializing projects cache");
const { projects } = await deps.projectRepository.getProjects();
console.log(`${projects.length} projects cache initialized`);
// ハートビートを定期的に送信
const daemon = Effect.repeat(
eventBus.emit("heartbeat", {}),
Schedule.fixed("10 seconds"),
);
console.log("Initializing sessions cache");
const results = await Promise.all(
projects.map((project) => deps.sessionRepository.getSessions(project.id)),
);
console.log(
`${results.reduce(
(s, { sessions }) => s + sessions.length,
0,
)} sessions cache initialized`,
);
} catch {
// do nothing
}
console.log("start heartbeat");
yield* Effect.forkDaemon(daemon);
console.log("after starting heartbeat fork");
prexit(() => {
clearInterval(intervalId);
eventBus.off("sessionChanged", onSessionChanged);
fileWatcher.stop();
claudeCodeTaskController.abortAllTasks();
});
};
// sessionChanged イベントのリスナーを登録
const onSessionChanged = (
event: InternalEventDeclaration["sessionChanged"],
) => {
Effect.runFork(
projectMetaService.invalidateProject(event.projectId),
);
Effect.runFork(
sessionMetaService.invalidateSession(
event.projectId,
event.sessionId,
),
);
};
const onSessionProcessChanged = (
event: InternalEventDeclaration["sessionProcessChanged"],
) => {
if (
(event.changed.type === "completed" ||
event.changed.type === "paused") &&
event.changed.sessionId !== undefined
) {
Effect.runFork(
virtualConversationDatabase.deleteVirtualConversations(
event.changed.sessionId,
),
);
return;
}
};
yield* Ref.set(listenersRef, {
sessionChanged: onSessionChanged,
sessionProcessChanged: onSessionProcessChanged,
});
yield* eventBus.on("sessionChanged", onSessionChanged);
yield* eventBus.on("sessionProcessChanged", onSessionProcessChanged);
yield* Effect.gen(function* () {
console.log("Initializing projects cache");
const { projects } = yield* projectRepository.getProjects();
console.log(`${projects.length} projects cache initialized`);
console.log("Initializing sessions cache");
const results = yield* Effect.all(
projects.map((project) =>
sessionRepository.getSessions(project.id),
),
{ concurrency: "unbounded" },
);
const totalSessions = results.reduce(
(s, { sessions }) => s + sessions.length,
0,
);
console.log(`${totalSessions} sessions cache initialized`);
}).pipe(
Effect.catchAll(() => Effect.void),
Effect.withSpan("initialize-cache"),
);
}).pipe(Effect.withSpan("start-initialization")) as Effect.Effect<void>;
};
const stopCleanup = (): Effect.Effect<void> =>
Effect.gen(function* () {
const listeners = yield* Ref.get(listenersRef);
if (listeners.sessionChanged) {
yield* eventBus.off("sessionChanged", listeners.sessionChanged);
}
if (listeners.sessionProcessChanged) {
yield* eventBus.off(
"sessionProcessChanged",
listeners.sessionProcessChanged,
);
}
yield* Ref.set(listenersRef, {});
yield* fileWatcher.stop();
});
return {
startInitialization,
stopCleanup,
} satisfies InitializeServiceInterface;
}),
);
}

File diff suppressed because it is too large Load Diff