diff --git a/packages/opencode/src/file/watcher.ts b/packages/opencode/src/file/watcher.ts index 7d190c60..d5985b58 100644 --- a/packages/opencode/src/file/watcher.ts +++ b/packages/opencode/src/file/watcher.ts @@ -63,7 +63,8 @@ export namespace FileWatcher { return { sub } }, async (state) => { - state.sub?.unsubscribe() + if (!state.sub) return + await state.sub?.unsubscribe() }, ) diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index a0cce76a..45ccd3ca 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -22,8 +22,6 @@ import { AttachCommand } from "./cli/cmd/attach" import { AcpCommand } from "./cli/cmd/acp" import { EOL } from "os" -const cancel = new AbortController() - process.on("unhandledRejection", (e) => { Log.Default.error("rejection", { e: e instanceof Error ? e.message : e, @@ -135,6 +133,10 @@ try { console.error(e) } process.exitCode = 1 +} finally { + // Some subprocesses don't react properly to SIGTERM and similar signals. + // Most notably, some docker-container-based MCP servers don't handle such signals unless + // run using `docker run --init`. + // Explicitly exit to avoid any hanging subprocesses. + process.exit(); } - -cancel.abort() diff --git a/packages/opencode/src/lsp/index.ts b/packages/opencode/src/lsp/index.ts index d533815f..72a9cae2 100644 --- a/packages/opencode/src/lsp/index.ts +++ b/packages/opencode/src/lsp/index.ts @@ -101,9 +101,7 @@ export namespace LSP { } }, async (state) => { - for (const client of state.clients) { - await client.shutdown() - } + await Promise.all(state.clients.map((client) => client.shutdown())) }, ) diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index fa3513bb..b0e72b53 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -145,9 +145,7 @@ export namespace MCP { } }, async (state) => { - for (const client of Object.values(state.clients)) { - client.close() - } + await Promise.all(Object.values(state.clients).map((client) => client.close())) }, ) diff --git a/packages/opencode/src/project/bootstrap.ts b/packages/opencode/src/project/bootstrap.ts index c7805aa7..45e85fd2 100644 --- a/packages/opencode/src/project/bootstrap.ts +++ b/packages/opencode/src/project/bootstrap.ts @@ -11,7 +11,7 @@ export async function InstanceBootstrap() { await Plugin.init() Share.init() Format.init() - LSP.init() + await LSP.init() FileWatcher.init() File.init() } diff --git a/packages/opencode/src/project/state.ts b/packages/opencode/src/project/state.ts index 2ffef3b3..6377833e 100644 --- a/packages/opencode/src/project/state.ts +++ b/packages/opencode/src/project/state.ts @@ -1,23 +1,26 @@ +import { Log } from "@/util/log" + export namespace State { interface Entry { state: any dispose?: (state: any) => Promise } - const entries = new Map>() + const log = Log.create({ service: "state" }) + const recordsByKey = new Map>() export function create(root: () => string, init: () => S, dispose?: (state: Awaited) => Promise) { return () => { const key = root() - let collection = entries.get(key) - if (!collection) { - collection = new Map() - entries.set(key, collection) + let entries = recordsByKey.get(key) + if (!entries) { + entries = new Map() + recordsByKey.set(key, entries) } - const exists = collection.get(init) + const exists = entries.get(init) if (exists) return exists.state as S const state = init() - collection.set(init, { + entries.set(init, { state, dispose, }) @@ -26,9 +29,38 @@ export namespace State { } export async function dispose(key: string) { - for (const [_, entry] of entries.get(key)?.entries() ?? []) { + const entries = recordsByKey.get(key) + if (!entries) return + + log.info("waiting for state disposal to complete", { key }) + + let disposalFinished = false + + setTimeout(() => { + if (!disposalFinished) { + log.warn( + "state disposal is taking an unusually long time - if it does not complete in a reasonable time, please report this as a bug", + { key }, + ) + } + }, 10000).unref() + + const tasks: Promise[] = [] + for (const entry of entries.values()) { if (!entry.dispose) continue - await entry.dispose(await entry.state) + + const task = Promise.resolve(entry.state) + .then((state) => entry.dispose!(state)) + .catch((error) => { + log.error("Error while disposing state:", { error, key }) + }) + + tasks.push(task) } + + await Promise.all(tasks) + + disposalFinished = true + log.info("state disposal completed", { key }) } } diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 7018978e..184f4af8 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -74,13 +74,22 @@ export namespace SessionPrompt { callback: (input: MessageV2.WithParts) => void }[] >() + const pending = new Set>() + + const track = (promise: Promise) => { + pending.add(promise) + promise.finally(() => pending.delete(promise)) + } return { queued, + pending, + track, } }, async (current) => { current.queued.clear() + await Promise.allSettled([...current.pending]) }, ) @@ -191,28 +200,6 @@ export namespace SessionPrompt { processor, }) - // const permUnsub = (() => { - // const handled = new Set() - // const options = [ - // { optionId: "allow_once", kind: "allow_once", name: "Allow once" }, - // { optionId: "allow_always", kind: "allow_always", name: "Always allow" }, - // { optionId: "reject_once", kind: "reject_once", name: "Reject" }, - // ] - // return Bus.subscribe(Permission.Event.Updated, async (event) => { - // const info = event.properties - // if (info.sessionID !== input.sessionID) return - // if (handled.has(info.id)) return - // handled.add(info.id) - // const toolCallId = info.callID ?? info.id - // const metadata = info.metadata ?? {} - // // TODO: emit permission event to bus for ACP to handle - // Permission.respond({ sessionID: info.sessionID, permissionID: info.id, response: "reject" }) - // }) - // })() - // await using _permSub = defer(() => { - // permUnsub?.() - // }) - const params = await Plugin.trigger( "chat.params", { @@ -247,13 +234,15 @@ export namespace SessionPrompt { step++ await processor.next(msgs.findLast((m) => m.info.role === "user")?.info.id!) if (step === 1) { - ensureTitle({ - session, - history: msgs, - message: userMsg, - providerID: model.providerID, - modelID: model.info.id, - }) + state().track( + ensureTitle({ + session, + history: msgs, + message: userMsg, + providerID: model.providerID, + modelID: model.info.id, + }), + ) SessionSummary.summarize({ sessionID: input.sessionID, messageID: userMsg.info.id, @@ -1730,7 +1719,7 @@ export namespace SessionPrompt { thinkingBudget: 0, } } - generateText({ + await generateText({ maxOutputTokens: small.info.reasoning ? 1500 : 20, providerOptions: ProviderTransform.providerOptions(small.npm, small.providerID, options), messages: [