This commit is contained in:
Dax Raad
2025-05-18 14:13:04 -04:00
parent bcd2fd68b7
commit 0e303e6508
12 changed files with 493 additions and 144 deletions

79
js/src/bus/index.ts Normal file
View File

@@ -0,0 +1,79 @@
import type { z, ZodSchema } from "zod/v4";
import { App } from "../app";
import { Log } from "../util/log";
export namespace Bus {
const log = Log.create({ service: "bus" });
type Subscription = (event: any) => void;
const state = App.state("bus", () => {
const subscriptions = new Map<any, Subscription[]>();
return {
subscriptions,
};
});
export type EventDefinition = ReturnType<typeof event>;
export function event<Type extends string, Properties extends ZodSchema>(
type: Type,
properties: Properties,
) {
return {
type,
properties,
};
}
export function publish<Definition extends EventDefinition>(
def: Definition,
properties: z.output<Definition["properties"]>,
) {
const payload = {
type: def.type,
properties,
};
log.info("publishing", {
type: def.type,
...properties,
});
for (const key of [def.type, "*"]) {
const match = state().subscriptions.get(key);
for (const sub of match ?? []) {
sub(payload);
}
}
}
export function subscribe<Definition extends EventDefinition>(
def: Definition,
callback: (event: {
type: Definition["type"];
properties: z.infer<Definition["properties"]>;
}) => void,
) {
return raw(def.type, callback);
}
export function subscribeAll(callback: (event: any) => void) {
return raw("*", callback);
}
function raw(type: string, callback: (event: any) => void) {
log.info("subscribing", { type });
const subscriptions = state().subscriptions;
let match = subscriptions.get(type) ?? [];
match.push(callback);
subscriptions.set(type, match);
return () => {
log.info("unsubscribing", { type });
const match = subscriptions.get(type);
if (!match) return;
const index = match.indexOf(callback);
if (index === -1) return;
match.splice(index, 1);
};
}
}

View File

@@ -1,4 +1,4 @@
import { z } from "zod";
import { z } from "zod/v4";
import { randomBytes } from "crypto";
export namespace Identifier {
@@ -11,7 +11,7 @@ export namespace Identifier {
return z.string().startsWith(prefixes[prefix]);
}
const LENGTH = 24;
const LENGTH = 26;
export function ascending(prefix: keyof typeof prefixes, given?: string) {
return generateID(prefix, false, given);
@@ -45,6 +45,11 @@ export namespace Identifier {
const randLength = (LENGTH - 12) / 2;
const random = randomBytes(randLength);
return prefix + "_" + timeBytes.toString("hex") + random.toString("hex");
return (
prefixes[prefix] +
"_" +
timeBytes.toString("hex") +
random.toString("hex")
);
}
}

View File

@@ -1,29 +1,11 @@
import { App } from "./app";
import process from "node:process";
import { RPC } from "./server/server";
import { Session } from "./session/session";
import { Identifier } from "./id/id";
import { Server } from "./server/server";
const app = await App.create({
directory: process.cwd(),
});
App.provide(app, async () => {
const sessionID = await Session.list()
[Symbol.asyncIterator]()
.next()
.then((v) => v.value ?? Session.create().then((s) => s.id));
await Session.chat(sessionID, {
role: "user",
id: Identifier.ascending("message"),
parts: [
{
type: "text",
text: "Hey how are you? try to use tools",
},
],
});
const rpc = RPC.listen();
const server = Server.listen();
});

View File

@@ -1,34 +1,64 @@
import { Log } from "../util/log";
import { Bus } from "../bus";
export namespace RPC {
const log = Log.create({ service: "rpc" });
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { Session } from "../session/session";
import { zValidator } from "@hono/zod-validator";
import { z } from "zod";
export namespace Server {
const log = Log.create({ service: "server" });
const PORT = 16713;
export function listen(input?: { port?: number }) {
const port = input?.port ?? PORT;
log.info("trying", { port });
try {
const server = Bun.serve({
port,
websocket: {
open() {},
message() {},
export type App = ReturnType<typeof listen>;
export function listen() {
const app = new Hono()
.get("/event", async (c) => {
log.info("event connected");
return streamSSE(c, async (stream) => {
const unsub = Bus.subscribeAll(async (event) => {
await stream.writeSSE({
data: JSON.stringify(event),
});
});
await new Promise<void>((resolve) => {
stream.onAbort(() => {
unsub();
resolve();
log.info("event disconnected");
});
});
});
})
.post("/session_create", async (c) => {
const session = await Session.create();
return c.json(session);
})
.post(
"/session_chat",
zValidator(
"json",
z.object({
sessionID: z.string(),
parts: z.custom<Session.Message["parts"]>(),
}),
),
async (c) => {
const body = c.req.valid("json");
const msg = await Session.chat(body.sessionID, ...body.parts);
return c.json(msg);
},
routes: {
"/ws": (req, server) => {
if (server.upgrade(req)) return;
return new Response("Not a websocket request", { status: 400 });
},
},
});
log.info("listening", { port });
return {
server,
};
} catch (e: any) {
if (e?.code === "EADDRINUSE") {
return listen({ port: port + 1 });
}
throw e;
}
);
Bun.serve({
port: PORT,
hostname: "0.0.0.0",
idleTimeout: 0,
fetch: app.fetch,
});
return app;
}
}

View File

@@ -1,5 +1,5 @@
import path from "path";
import { z } from "zod";
import { z } from "zod/v3";
import { App } from "../app/";
import { Identifier } from "../id/id";
import { LLM } from "../llm/llm";
@@ -11,7 +11,9 @@ import {
tool,
type TextUIPart,
type ToolInvocationUIPart,
type UIDataTypes,
type UIMessage,
type UIMessagePart,
} from "ai";
export namespace Session {
@@ -20,11 +22,18 @@ export namespace Session {
export interface Info {
id: string;
title: string;
tokens: {
input: number;
output: number;
reasoning: number;
};
}
export type Message = UIMessage<{ sessionID: string }>;
const state = App.state("session", () => {
const sessions = new Map<string, Info>();
const messages = new Map<string, UIMessage[]>();
const messages = new Map<string, Message[]>();
return {
sessions,
@@ -36,12 +45,14 @@ export namespace Session {
const result: Info = {
id: Identifier.descending("session"),
title: "New Session - " + new Date().toISOString(),
tokens: {
input: 0,
output: 0,
reasoning: 0,
},
};
log.info("created", result);
await Storage.write(
"session/info/" + result.id + ".json",
JSON.stringify(result),
);
await Storage.writeJSON("session/info/" + result.id, result);
state().sessions.set(result.id, result);
return result;
}
@@ -51,23 +62,35 @@ export namespace Session {
if (result) {
return result;
}
const read = JSON.parse(await Storage.readToString("session/info/" + id));
const read = await Storage.readJSON<Info>("session/info/" + id);
state().sessions.set(id, read);
return read;
return read as Info;
}
export async function update(session: Info) {
state().sessions.set(session.id, session);
await Storage.writeJSON("session/info/" + session.id, session);
}
export async function messages(sessionID: string) {
const result = state().messages.get(sessionID);
if (result) {
return result;
const match = state().messages.get(sessionID);
if (match) {
return match;
}
const read = JSON.parse(
await Storage.readToString(
"session/message/" + sessionID + ".json",
).catch(() => "[]"),
);
state().messages.set(sessionID, read);
return read;
const result = [] as Message[];
const list = await Storage.list("session/message/" + sessionID)
.then((x) => x.toArray())
.catch(() => {});
if (!list) return result;
for (const item of list) {
const messageID = path.basename(item.path, ".json");
const read = await Storage.readJSON<Message>(
"session/message/" + sessionID + "/" + messageID,
);
result.push(read);
}
state().messages.set(sessionID, result);
return result;
}
export async function* list() {
@@ -81,11 +104,23 @@ export namespace Session {
}
}
export async function chat(sessionID: string, msg: UIMessage) {
export async function chat(
sessionID: string,
...parts: UIMessagePart<UIDataTypes>[]
) {
const session = await get(sessionID);
const l = log.clone().tag("session", sessionID);
l.info("chatting");
const msgs = (await messages(sessionID)) ?? [
{
const msgs = await messages(sessionID);
async function write(msg: Message) {
return Storage.writeJSON(
"session/message/" + sessionID + "/" + msg.id,
msg,
);
}
if (msgs.length === 0) {
const system: UIMessage<{ sessionID: string }> = {
id: Identifier.ascending("message"),
role: "system",
parts: [
@@ -94,40 +129,38 @@ export namespace Session {
text: "You are a helpful assistant called opencode",
},
],
} as UIMessage,
];
msgs.push(msg);
state().messages.set(sessionID, msgs);
async function write() {
return Storage.write(
"session/message/" + sessionID + ".json",
JSON.stringify(msgs),
);
metadata: {
sessionID,
},
};
msgs.push(system);
state().messages.set(sessionID, msgs);
await write(system);
}
await write();
const msg: Message = {
role: "user",
id: Identifier.ascending("message"),
parts,
metadata: {
sessionID,
},
};
msgs.push(msg);
await write(msg);
const model = await LLM.findModel("claude-3-7-sonnet-20250219");
const result = streamText({
messages: convertToModelMessages(msgs),
temperature: 0,
tools: {
test: tool({
id: "opencode.test" as const,
parameters: z.object({
feeling: z.string(),
}),
execute: async () => {
return `Hello`;
},
description: "call this tool to get a greeting",
}),
},
model,
});
const next: UIMessage = {
const next: Message = {
id: Identifier.ascending("message"),
role: "assistant",
parts: [],
metadata: {
sessionID,
},
};
msgs.push(next);
let text: TextUIPart | undefined;
@@ -135,7 +168,9 @@ export namespace Session {
while (true) {
const { done, value } = await reader.read();
if (done) break;
l.info("part", value);
l.info("part", {
type: value.type,
});
switch (value.type) {
case "start":
break;
@@ -175,15 +210,15 @@ export namespace Session {
state: "result",
result: value.result,
};
await write();
}
break;
case "finish":
await write();
break;
case "finish-step":
await write();
break;
case "error":
log.error("error", value);
break;
default:
@@ -191,6 +226,13 @@ export namespace Session {
type: value.type,
});
}
await write(next);
}
const usage = await result.totalUsage;
session.tokens.input += usage.inputTokens || 0;
session.tokens.output += usage.outputTokens || 0;
session.tokens.reasoning += usage.reasoningTokens || 0;
await update(session);
return next;
}
}

View File

@@ -4,10 +4,19 @@ import fs from "fs/promises";
import { Log } from "../util/log";
import { App } from "../app";
import { AppPath } from "../app/path";
import { Bus } from "../bus";
import z from "zod/v4";
export namespace Storage {
const log = Log.create({ service: "storage" });
export const Event = {
Write: Bus.event(
"storage.write",
z.object({ key: z.string(), body: z.any() }),
),
};
const state = App.state("storage", async () => {
const app = await App.use();
const storageDir = AppPath.storage(app.root);
@@ -36,4 +45,15 @@ export namespace Storage {
export const read = expose("read");
export const list = expose("list");
export const readToString = expose("readToString");
export async function readJSON<T>(key: string) {
const data = await readToString(key + ".json");
return JSON.parse(data) as T;
}
export async function writeJSON<T>(key: string, data: T) {
Bus.publish(Event.Write, { key, body: data });
const json = JSON.stringify(data);
await write(key + ".json", json);
}
}

0
js/src/util/event.ts Normal file
View File

View File

@@ -2,16 +2,21 @@ export namespace Log {
export function create(tags?: Record<string, any>) {
tags = tags || {};
function build(message: any, extra?: Record<string, any>) {
const prefix = Object.entries({
...tags,
...extra,
})
.map(([key, value]) => `${key}=${value}`)
.join(" ");
return [prefix, message];
}
const result = {
info(message?: any, extra?: Record<string, any>) {
const prefix = Object.entries({
...tags,
...extra,
})
.map(([key, value]) => `${key}=${value}`)
.join(" ");
console.log(prefix, message);
return result;
console.log(...build(message, extra));
},
error(message?: any, extra?: Record<string, any>) {
console.error(...build(message, extra));
},
tag(key: string, value: string) {
if (tags) tags[key] = value;