message storage performance improvements

This commit is contained in:
Dax Raad
2025-11-07 01:11:47 -05:00
parent d0f5c825bd
commit 9554abb56e
8 changed files with 28 additions and 32 deletions

View File

@@ -3,7 +3,6 @@ import {
BoxRenderable,
TextareaRenderable,
MouseEvent,
KeyEvent,
PasteEvent,
t,
dim,

View File

@@ -270,6 +270,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({
},
async sync(sessionID: string) {
const now = Date.now()
if (store.message[sessionID]) return
console.log("syncing", sessionID)
const [session, messages, todo, diff] = await Promise.all([
sdk.client.session.get({ path: { id: sessionID }, throwOnError: true }),

View File

@@ -756,7 +756,7 @@ export namespace Server {
validator(
"query",
z.object({
limit: z.coerce.number().optional(),
limit: z.coerce.number(),
}),
),
async (c) => {

View File

@@ -111,9 +111,7 @@ export namespace SessionCompaction {
draft.time.compacting = undefined
})
})
const toSummarize = await Session.messages({ sessionID: input.sessionID }).then(
MessageV2.filterCompacted,
)
const toSummarize = await MessageV2.filterCompacted(Session.messageStream(input.sessionID))
const model = await Provider.getModel(input.providerID, input.modelID)
const system = [
...SystemPrompt.summarize(model.providerID),

View File

@@ -273,6 +273,17 @@ export namespace Session {
return diffs ?? []
})
export const messageStream = fn(Identifier.schema("session"), async function* (sessionID) {
const list = await Array.fromAsync(await Storage.list(["message", sessionID]))
for (let i = list.length - 1; i >= 0; i--) {
const read = await Storage.read<MessageV2.Info>(list[i])
yield {
info: read,
parts: await getParts(read.id),
}
}
})
export const messages = fn(
z.object({
sessionID: Identifier.schema("session"),
@@ -280,16 +291,11 @@ export namespace Session {
}),
async (input) => {
const result = [] as MessageV2.WithParts[]
const list = (await Array.fromAsync(await Storage.list(["message", input.sessionID])))
.toSorted((a, b) => a.at(-1)!.localeCompare(b.at(-1)!))
.slice(-1 * (input.limit ?? 1_000_000))
for (const p of list) {
const read = await Storage.read<MessageV2.Info>(p)
result.push({
info: read,
parts: await getParts(read.id),
})
for await (const msg of messageStream(input.sessionID)) {
if (input.limit && result.length >= input.limit) break
result.push(msg)
}
result.reverse()
return result
},
)

View File

@@ -655,10 +655,14 @@ export namespace MessageV2 {
return convertToModelMessages(result)
}
export function filterCompacted(msgs: { info: MessageV2.Info; parts: MessageV2.Part[] }[]) {
const i = msgs.findLastIndex((m) => m.info.role === "assistant" && !!m.info.summary)
if (i === -1) return msgs.slice()
return msgs.slice(i)
export async function filterCompacted(stream: AsyncIterable<MessageV2.WithParts>) {
const result = [] as MessageV2.WithParts[]
for await (const msg of stream) {
result.push(msg)
if (msg.info.role === "assistant" && msg.info.summary === true) break
}
result.reverse()
return result
}
export function fromError(e: unknown, ctx: { providerID: string }) {

View File

@@ -434,9 +434,7 @@ export namespace SessionPrompt {
providerID: string
signal: AbortSignal
}) {
let msgs = await Session.messages({ sessionID: input.sessionID }).then(
MessageV2.filterCompacted,
)
let msgs = await MessageV2.filterCompacted(Session.messageStream(input.sessionID))
const lastAssistant = msgs.findLast((msg) => msg.info.role === "assistant")
if (
lastAssistant?.info.role === "assistant" &&

View File

@@ -151,17 +151,7 @@ export namespace SessionSummary {
messageID: Identifier.schema("message").optional(),
}),
async (input) => {
let all = await Session.messages({ sessionID: input.sessionID })
if (input.messageID)
all = all.filter(
(x) =>
x.info.id === input.messageID ||
(x.info.role === "assistant" && x.info.parentID === input.messageID),
)
return computeDiff({
messages: all,
})
return Storage.read<Snapshot.FileDiff[]>(["session_diff", input.sessionID]) ?? []
},
)