diff --git a/opencode.json b/opencode.json index 720ece5c..a8819ebd 100644 --- a/opencode.json +++ b/opencode.json @@ -1,3 +1,8 @@ { - "$schema": "https://opencode.ai/config.json" + "$schema": "https://opencode.ai/config.json", + "permission": { + "bash": { + "cat*": "ask" + } + } } diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 4f2f6dc4..03bc4d5d 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -1,204 +1,587 @@ import type { - Agent, + Agent as ACPAgent, AgentSideConnection, AuthenticateRequest, - AuthenticateResponse, CancelNotification, InitializeRequest, - InitializeResponse, LoadSessionRequest, - LoadSessionResponse, NewSessionRequest, - NewSessionResponse, + PermissionOption, PromptRequest, - PromptResponse, SetSessionModelRequest, - SetSessionModelResponse, + SetSessionModeRequest, + SetSessionModeResponse, } from "@agentclientprotocol/sdk" import { Log } from "../util/log" import { ACPSessionManager } from "./session" import type { ACPConfig } from "./types" import { Provider } from "../provider/provider" import { SessionPrompt } from "../session/prompt" -import { Identifier } from "../id/id" +import { Installation } from "@/installation" +import { SessionLock } from "@/session/lock" +import { Bus } from "@/bus" +import { MessageV2 } from "@/session/message-v2" +import { Storage } from "@/storage/storage" +import { Command } from "@/command" +import { Agent as Agents } from "@/agent/agent" +import { Permission } from "@/permission" -export class OpenCodeAgent implements Agent { - private log = Log.create({ service: "acp-agent" }) - private sessionManager = new ACPSessionManager() - private connection: AgentSideConnection - private config: ACPConfig +export namespace ACP { + const log = Log.create({ service: "acp-agent" }) - constructor(connection: AgentSideConnection, config: ACPConfig = {}) { - this.connection = connection - this.config = config - } + // TODO: mcp servers? - async initialize(params: InitializeRequest): Promise { - this.log.info("initialize", { protocolVersion: params.protocolVersion }) + type ToolKind = + | "read" + | "edit" + | "delete" + | "move" + | "search" + | "execute" + | "think" + | "fetch" + | "switch_mode" + | "other" - return { - protocolVersion: 1, - agentCapabilities: { - loadSession: false, - }, - _meta: { - opencode: { - version: await import("../installation").then((m) => m.Installation.VERSION), + export class Agent implements ACPAgent { + private sessionManager = new ACPSessionManager() + private connection: AgentSideConnection + private config: ACPConfig + + constructor(connection: AgentSideConnection, config: ACPConfig = {}) { + this.connection = connection + this.config = config + this.setupEventSubscriptions() + } + + private setupEventSubscriptions() { + const options: PermissionOption[] = [ + { optionId: "once", kind: "allow_once", name: "Allow once" }, + { optionId: "always", kind: "allow_always", name: "Always allow" }, + { optionId: "reject", kind: "reject_once", name: "Reject" }, + ] + Bus.subscribe(Permission.Event.Updated, async (event) => { + const acpSession = this.sessionManager.get(event.properties.sessionID) + if (!acpSession) return + try { + const permission = event.properties + const res = await this.connection + .requestPermission({ + sessionId: acpSession.id, + toolCall: { + toolCallId: permission.callID ?? permission.id, + status: "pending", + title: permission.title, + rawInput: permission.metadata, + kind: toToolKind(permission.type), + locations: toLocations(permission.type, permission.metadata), + }, + options, + }) + .catch((error) => { + log.error("failed to request permission from ACP", { + error, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + Permission.respond({ + sessionID: permission.sessionID, + permissionID: permission.id, + response: "reject", + }) + return + }) + if (!res) return + if (res.outcome.outcome !== "selected") { + Permission.respond({ sessionID: permission.sessionID, permissionID: permission.id, response: "reject" }) + return + } + Permission.respond({ + sessionID: permission.sessionID, + permissionID: permission.id, + response: res.outcome.optionId as "once" | "always" | "reject", + }) + } catch (err) { + if (!(err instanceof Permission.RejectedError)) { + log.error("unexpected error when handling permission", { error: err }) + throw err + } + } + }) + + Bus.subscribe(MessageV2.Event.PartUpdated, async (event) => { + const props = event.properties + const { part } = props + const acpSession = this.sessionManager.get(part.sessionID) + if (!acpSession) return + + const message = await Storage.read(["message", part.sessionID, part.messageID]).catch( + () => undefined, + ) + if (!message || message.role !== "assistant") return + + if (part.type === "tool") { + switch (part.state.status) { + case "pending": + await this.connection + .sessionUpdate({ + sessionId: acpSession.id, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), + status: "pending", + locations: [], + rawInput: {}, + }, + }) + .catch((err) => { + log.error("failed to send tool pending to ACP", { error: err }) + }) + break + case "running": + await this.connection + .sessionUpdate({ + sessionId: acpSession.id, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }) + .catch((err) => { + log.error("failed to send tool in_progress to ACP", { error: err }) + }) + break + case "completed": + await this.connection + .sessionUpdate({ + sessionId: acpSession.id, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + content: [ + { + type: "content", + content: { + type: "text", + text: part.state.output, + }, + }, + ], + title: part.state.title, + rawOutput: { + output: part.state.output, + metadata: part.state.metadata, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool completed to ACP", { error: err }) + }) + break + case "error": + await this.connection + .sessionUpdate({ + sessionId: acpSession.id, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + content: [ + { + type: "content", + content: { + type: "text", + text: part.state.error, + }, + }, + ], + rawOutput: { + error: part.state.error, + }, + }, + }) + .catch((err) => { + log.error("failed to send tool error to ACP", { error: err }) + }) + break + } + } else if (part.type === "text") { + const delta = props.delta + if (delta && part.synthetic !== true) { + await this.connection + .sessionUpdate({ + sessionId: acpSession.id, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((err) => { + log.error("failed to send text to ACP", { error: err }) + }) + } + } else if (part.type === "reasoning") { + const delta = props.delta + if (delta) { + await this.connection + .sessionUpdate({ + sessionId: acpSession.id, + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((err) => { + log.error("failed to send reasoning to ACP", { error: err }) + }) + } + } + }) + } + + async initialize(params: InitializeRequest) { + log.info("initialize", { protocolVersion: params.protocolVersion }) + + return { + protocolVersion: 1, + agentCapabilities: { + loadSession: true, + // TODO: map acp mcp + // mcpCapabilities: { + // http: true, + // sse: true, + // }, }, - }, + authMethods: [ + { + description: "Run `opencode auth login` in the terminal", + name: "Login with opencode", + id: "opencode-login", + }, + ], + _meta: { + opencode: { + version: Installation.VERSION, + }, + }, + } + } + + async authenticate(_params: AuthenticateRequest) { + throw new Error("Authentication not implemented") + } + + async newSession(params: NewSessionRequest) { + const model = await defaultModel(this.config) + const session = await this.sessionManager.create(params.cwd, params.mcpServers, model) + + const load = await this.loadSession({ + cwd: params.cwd, + mcpServers: params.mcpServers, + sessionId: session.id, + }) + + return { + sessionId: session.id, + models: load.models, + modes: load.modes, + _meta: {}, + } + } + + async loadSession(params: LoadSessionRequest) { + const model = await defaultModel(this.config) + const sessionId = params.sessionId + + const providers = await Provider.list() + const entries = Object.entries(providers).sort((a, b) => { + const nameA = a[1].info.name.toLowerCase() + const nameB = b[1].info.name.toLowerCase() + if (nameA < nameB) return -1 + if (nameA > nameB) return 1 + return 0 + }) + const availableModels = entries.flatMap(([providerID, provider]) => { + const models = Provider.sort(Object.values(provider.info.models)) + return models.map((model) => ({ + modelId: `${providerID}/${model.id}`, + name: `${provider.info.name}/${model.name}`, + })) + }) + + const availableCommands = (await Command.list()).map((command) => ({ + name: command.name, + description: command.description ?? "", + })) + + setTimeout(() => { + this.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "available_commands_update", + availableCommands, + }, + }) + }, 0) + + const availableModes = (await Agents.list()) + .filter((agent) => agent.mode !== "subagent") + .map((agent) => ({ + id: agent.name, + name: agent.name, + description: agent.description, + })) + + const currentModeId = availableModes.find((m) => m.name === "build")?.id ?? availableModes[0].id + + return { + sessionId, + models: { + currentModelId: `${model.providerID}/${model.modelID}`, + availableModels, + }, + modes: { + availableModes, + currentModeId, + }, + _meta: {}, + } + } + + async setSessionModel(params: SetSessionModelRequest) { + const session = this.sessionManager.get(params.sessionId) + if (!session) { + throw new Error(`Session not found: ${params.sessionId}`) + } + + const parsed = Provider.parseModel(params.modelId) + const model = await Provider.getModel(parsed.providerID, parsed.modelID) + + this.sessionManager.setModel(session.id, { + providerID: model.providerID, + modelID: model.modelID, + }) + + return { + _meta: {}, + } + } + + async setSessionMode(params: SetSessionModeRequest): Promise { + const session = this.sessionManager.get(params.sessionId) + if (!session) { + throw new Error(`Session not found: ${params.sessionId}`) + } + await Agents.get(params.modeId).then((agent) => { + if (!agent) throw new Error(`Agent not found: ${params.modeId}`) + }) + this.sessionManager.setMode(params.sessionId, params.modeId) + } + + async prompt(params: PromptRequest) { + const sessionID = params.sessionId + const acpSession = this.sessionManager.get(sessionID) + if (!acpSession) { + throw new Error(`Session not found: ${sessionID}`) + } + + const current = acpSession.model + const model = current ?? (await defaultModel(this.config)) + if (!current) { + this.sessionManager.setModel(acpSession.id, model) + } + const agent = acpSession.modeId ?? "build" + + const parts: SessionPrompt.PromptInput["parts"] = [] + for (const part of params.prompt) { + switch (part.type) { + case "text": + parts.push({ + type: "text" as const, + text: part.text, + }) + break + case "image": + if (part.data) { + parts.push({ + type: "file", + url: `data:${part.mimeType};base64,${part.data}`, + mime: part.mimeType, + }) + } else if (part.uri && part.uri.startsWith("http:")) { + parts.push({ + type: "file", + url: part.uri, + mime: part.mimeType, + }) + } + break + + case "resource_link": + const parsed = parseUri(part.uri) + parts.push(parsed) + + break + + case "resource": + const resource = part.resource + if ("text" in resource) { + parts.push({ + type: "text", + text: resource.text, + }) + } + break + + default: + break + } + } + + log.info("parts", { parts }) + + const cmd = await (async () => { + const text = parts.filter((part) => part.type === "text").join("") + const match = text.match(/^\/(\w+)\s*(.*)$/) + if (!match) return + + const [c, args] = match.slice(1) + const command = await Command.get(c) + if (!command) return + return { command, args } + })() + + if (cmd) { + await SessionPrompt.command({ + sessionID, + command: cmd.command.name, + arguments: cmd.args, + agent, + }) + } else { + await SessionPrompt.prompt({ + sessionID, + model: { + providerID: model.providerID, + modelID: model.modelID, + }, + parts, + agent, + }) + } + + return { + stopReason: "end_turn" as const, + _meta: {}, + } + } + + async cancel(params: CancelNotification) { + SessionLock.abort(params.sessionId) } } - async authenticate(params: AuthenticateRequest): Promise { - this.log.info("authenticate", { methodId: params.methodId }) - throw new Error("Authentication not yet implemented") - } + function toToolKind(toolName: string): ToolKind { + const tool = toolName.toLocaleLowerCase() + switch (tool) { + case "bash": + return "execute" + case "webfetch": + return "fetch" - async newSession(params: NewSessionRequest): Promise { - this.log.info("newSession", { cwd: params.cwd, mcpServers: params.mcpServers.length }) + case "edit": + case "patch": + case "write": + return "edit" - const model = await this.defaultModel() - const session = await this.sessionManager.create(params.cwd, params.mcpServers, model) - const availableModels = await this.availableModels() + case "grep": + case "glob": + case "context7_resolve_library_id": + case "context7_get_library_docs": + return "search" - return { - sessionId: session.id, - models: { - currentModelId: `${model.providerID}/${model.modelID}`, - availableModels, - }, - _meta: {}, + case "list": + case "read": + return "read" + + default: + return "other" } } - async loadSession(params: LoadSessionRequest): Promise { - this.log.info("loadSession", { sessionId: params.sessionId, cwd: params.cwd }) - - const defaultModel = await this.defaultModel() - const session = await this.sessionManager.load(params.sessionId, params.cwd, params.mcpServers, defaultModel) - const availableModels = await this.availableModels() - - return { - models: { - currentModelId: `${session.model.providerID}/${session.model.modelID}`, - availableModels, - }, - _meta: {}, + function toLocations(toolName: string, input: Record): { path: string }[] { + const tool = toolName.toLocaleLowerCase() + switch (tool) { + case "read": + case "edit": + case "write": + return input["filePath"] ? [{ path: input["filePath"] }] : [] + case "glob": + case "grep": + return input["path"] ? [{ path: input["path"] }] : [] + case "bash": + return [] + case "list": + return input["path"] ? [{ path: input["path"] }] : [] + default: + return [] } } - async setSessionModel(params: SetSessionModelRequest): Promise { - this.log.info("setSessionModel", { sessionId: params.sessionId, modelId: params.modelId }) - - const session = this.sessionManager.get(params.sessionId) - if (!session) { - throw new Error(`Session not found: ${params.sessionId}`) - } - - const parsed = Provider.parseModel(params.modelId) - const model = await Provider.getModel(parsed.providerID, parsed.modelID) - - this.sessionManager.setModel(session.id, { - providerID: model.providerID, - modelID: model.modelID, - }) - - return { - _meta: {}, - } - } - - private async defaultModel() { - const configured = this.config.defaultModel + async function defaultModel(config: ACPConfig) { + const configured = config.defaultModel if (configured) return configured return Provider.defaultModel() } - private async availableModels() { - const providers = await Provider.list() - const entries = Object.entries(providers).sort((a, b) => { - const nameA = a[1].info.name.toLowerCase() - const nameB = b[1].info.name.toLowerCase() - if (nameA < nameB) return -1 - if (nameA > nameB) return 1 - return 0 - }) - return entries.flatMap(([providerID, provider]) => { - const models = Provider.sort(Object.values(provider.info.models)) - return models.map((model) => ({ - modelId: `${providerID}/${model.id}`, - name: `${provider.info.name}/${model.name}`, - })) - }) - } - - async prompt(params: PromptRequest): Promise { - this.log.info("prompt", { - sessionId: params.sessionId, - promptLength: params.prompt.length, - }) - - const acpSession = this.sessionManager.get(params.sessionId) - if (!acpSession) { - throw new Error(`Session not found: ${params.sessionId}`) - } - - const current = acpSession.model - const model = current ?? (await this.defaultModel()) - if (!current) { - this.sessionManager.setModel(acpSession.id, model) - } - - const parts = params.prompt.map((content) => { - if (content.type === "text") { + function parseUri( + uri: string, + ): { type: "file"; url: string; filename: string; mime: string } | { type: "text"; text: string } { + try { + if (uri.startsWith("file://")) { + const path = uri.slice(7) + const name = path.split("/").pop() || path return { - type: "text" as const, - text: content.text, + type: "file", + url: uri, + filename: name, + mime: "text/plain", } } - if (content.type === "resource") { - const resource = content.resource - let text = "" - if ("text" in resource && typeof resource.text === "string") { - text = resource.text - } - return { - type: "text" as const, - text, + if (uri.startsWith("zed://")) { + const url = new URL(uri) + const path = url.searchParams.get("path") + if (path) { + const name = path.split("/").pop() || path + return { + type: "file", + url: `file://${path}`, + filename: name, + mime: "text/plain", + } } } return { - type: "text" as const, - text: JSON.stringify(content), + type: "text", + text: uri, + } + } catch { + return { + type: "text", + text: uri, } - }) - - await SessionPrompt.prompt({ - sessionID: acpSession.openCodeSessionId, - messageID: Identifier.ascending("message"), - model: { - providerID: model.providerID, - modelID: model.modelID, - }, - parts, - acpConnection: { - connection: this.connection, - sessionId: params.sessionId, - }, - }) - - this.log.debug("prompt response completed") - - // Streaming notifications are now handled during prompt execution - // No need to send final text chunk here - - return { - stopReason: "end_turn", - _meta: {}, } } - - async cancel(params: CancelNotification): Promise { - this.log.info("cancel", { sessionId: params.sessionId }) - } } diff --git a/packages/opencode/src/acp/client.ts b/packages/opencode/src/acp/client.ts deleted file mode 100644 index 24119eab..00000000 --- a/packages/opencode/src/acp/client.ts +++ /dev/null @@ -1,85 +0,0 @@ -import type { - Client, - CreateTerminalRequest, - CreateTerminalResponse, - KillTerminalCommandRequest, - KillTerminalResponse, - ReadTextFileRequest, - ReadTextFileResponse, - ReleaseTerminalRequest, - ReleaseTerminalResponse, - RequestPermissionRequest, - RequestPermissionResponse, - SessionNotification, - TerminalOutputRequest, - TerminalOutputResponse, - WaitForTerminalExitRequest, - WaitForTerminalExitResponse, - WriteTextFileRequest, - WriteTextFileResponse, -} from "@agentclientprotocol/sdk" -import { Log } from "../util/log" - -export class ACPClient implements Client { - private log = Log.create({ service: "acp-client" }) - - async requestPermission(params: RequestPermissionRequest): Promise { - this.log.debug("requestPermission", params) - const firstOption = params.options[0] - if (!firstOption) { - return { outcome: { outcome: "cancelled" } } - } - return { - outcome: { - outcome: "selected", - optionId: firstOption.optionId, - }, - } - } - - async sessionUpdate(params: SessionNotification): Promise { - this.log.debug("sessionUpdate", { sessionId: params.sessionId }) - } - - async writeTextFile(params: WriteTextFileRequest): Promise { - this.log.debug("writeTextFile", { path: params.path }) - await Bun.write(params.path, params.content) - return { _meta: {} } - } - - async readTextFile(params: ReadTextFileRequest): Promise { - this.log.debug("readTextFile", { path: params.path }) - const file = Bun.file(params.path) - const exists = await file.exists() - if (!exists) { - throw new Error(`File not found: ${params.path}`) - } - const content = await file.text() - return { content, _meta: {} } - } - - async createTerminal(params: CreateTerminalRequest): Promise { - this.log.debug("createTerminal", params) - throw new Error("Terminal support not yet implemented") - } - - async terminalOutput(params: TerminalOutputRequest): Promise { - this.log.debug("terminalOutput", params) - throw new Error("Terminal support not yet implemented") - } - - async releaseTerminal(params: ReleaseTerminalRequest): Promise { - this.log.debug("releaseTerminal", params) - throw new Error("Terminal support not yet implemented") - } - - async waitForTerminalExit(params: WaitForTerminalExitRequest): Promise { - this.log.debug("waitForTerminalExit", params) - throw new Error("Terminal support not yet implemented") - } - - async killTerminal(params: KillTerminalCommandRequest): Promise { - this.log.debug("killTerminal", params) - throw new Error("Terminal support not yet implemented") - } -} diff --git a/packages/opencode/src/acp/server.ts b/packages/opencode/src/acp/server.ts deleted file mode 100644 index 0e5306dc..00000000 --- a/packages/opencode/src/acp/server.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { AgentSideConnection, ndJsonStream } from "@agentclientprotocol/sdk" -import { Log } from "../util/log" -import { Instance } from "../project/instance" -import { OpenCodeAgent } from "./agent" - -export namespace ACPServer { - const log = Log.create({ service: "acp-server" }) - - export async function start() { - await Instance.provide({ - directory: process.cwd(), - fn: async () => { - log.info("starting ACP server", { cwd: process.cwd() }) - - const stdout = new WritableStream({ - write(chunk) { - process.stdout.write(chunk) - }, - }) - - const stdin = new ReadableStream({ - start(controller) { - process.stdin.on("data", (chunk) => { - controller.enqueue(new Uint8Array(chunk)) - }) - process.stdin.on("end", () => { - controller.close() - }) - }, - }) - - const stream = ndJsonStream(stdout, stdin) - - new AgentSideConnection((conn) => { - return new OpenCodeAgent(conn) - }, stream) - - await new Promise((resolve) => { - process.on("SIGTERM", () => { - log.info("received SIGTERM") - resolve() - }) - process.on("SIGINT", () => { - log.info("received SIGINT") - resolve() - }) - }) - - log.info("ACP server stopped") - }, - }) - } -} diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 3a797259..652e8cfd 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -7,20 +7,15 @@ import type { ACPSessionState } from "./types" export class ACPSessionManager { private sessions = new Map() - async create( - cwd: string, - mcpServers: McpServer[], - model?: ACPSessionState["model"], - ): Promise { - const sessionId = `acp_${Identifier.ascending("session")}` - const openCodeSession = await Session.create({ title: `ACP Session ${sessionId}` }) + async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise { + const session = await Session.create({ title: `ACP Session ${crypto.randomUUID()}` }) + const sessionId = session.id const resolvedModel = model ?? (await Provider.defaultModel()) const state: ACPSessionState = { id: sessionId, cwd, mcpServers, - openCodeSessionId: openCodeSession.id, createdAt: new Date(), model: resolvedModel, } @@ -29,54 +24,22 @@ export class ACPSessionManager { return state } - get(sessionId: string): ACPSessionState | undefined { + get(sessionId: string) { return this.sessions.get(sessionId) } - async remove(sessionId: string): Promise { + async remove(sessionId: string) { const state = this.sessions.get(sessionId) if (!state) return - await Session.remove(state.openCodeSessionId).catch(() => {}) + await Session.remove(sessionId).catch(() => {}) this.sessions.delete(sessionId) } - has(sessionId: string): boolean { + has(sessionId: string) { return this.sessions.has(sessionId) } - async load( - sessionId: string, - cwd: string, - mcpServers: McpServer[], - model?: ACPSessionState["model"], - ): Promise { - const existing = this.sessions.get(sessionId) - if (existing) { - if (!existing.model) { - const resolved = model ?? (await Provider.defaultModel()) - existing.model = resolved - this.sessions.set(sessionId, existing) - } - return existing - } - - const openCodeSession = await Session.create({ title: `ACP Session ${sessionId} (loaded)` }) - const resolvedModel = model ?? (await Provider.defaultModel()) - - const state: ACPSessionState = { - id: sessionId, - cwd, - mcpServers, - openCodeSessionId: openCodeSession.id, - createdAt: new Date(), - model: resolvedModel, - } - - this.sessions.set(sessionId, state) - return state - } - getModel(sessionId: string) { const session = this.sessions.get(sessionId) if (!session) return @@ -90,4 +53,12 @@ export class ACPSessionManager { this.sessions.set(sessionId, session) return session } + + setMode(sessionId: string, modeId: string) { + const session = this.sessions.get(sessionId) + if (!session) return + session.modeId = modeId + this.sessions.set(sessionId, session) + return session + } } diff --git a/packages/opencode/src/acp/types.ts b/packages/opencode/src/acp/types.ts index 1bffa019..56308cb7 100644 --- a/packages/opencode/src/acp/types.ts +++ b/packages/opencode/src/acp/types.ts @@ -4,12 +4,12 @@ export interface ACPSessionState { id: string cwd: string mcpServers: McpServer[] - openCodeSessionId: string createdAt: Date model: { providerID: string modelID: string } + modeId?: string } export interface ACPConfig { diff --git a/packages/opencode/src/cli/cmd/acp.ts b/packages/opencode/src/cli/cmd/acp.ts index f415cd6a..6628137f 100644 --- a/packages/opencode/src/cli/cmd/acp.ts +++ b/packages/opencode/src/cli/cmd/acp.ts @@ -1,5 +1,17 @@ -import { ACPServer } from "../../acp/server" +import { Log } from "@/util/log" +import { bootstrap } from "../bootstrap" import { cmd } from "./cmd" +import { AgentSideConnection, ndJsonStream } from "@agentclientprotocol/sdk" +import { ACP } from "@/acp/agent" + +const log = Log.create({ service: "acp-command" }) + +process.on("unhandledRejection", (reason, promise) => { + log.error("Unhandled rejection", { + promise, + reason, + }) +}) export const AcpCommand = cmd({ command: "acp", @@ -13,6 +25,38 @@ export const AcpCommand = cmd({ }, handler: async (opts) => { if (opts.cwd) process.chdir(opts["cwd"]) - await ACPServer.start() + await bootstrap(process.cwd(), async () => { + const input = new WritableStream({ + write(chunk) { + return new Promise((resolve, reject) => { + process.stdout.write(Buffer.from(chunk), (err) => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + }, + }) + const output = new ReadableStream({ + start(controller) { + process.stdin.on("data", (chunk: Buffer) => { + controller.enqueue(new Uint8Array(chunk)) + }) + process.stdin.on("end", () => controller.close()) + process.stdin.on("error", (err) => controller.error(err)) + }, + }) + + const stream = ndJsonStream(input, output) + + new AgentSideConnection((conn) => { + return new ACP.Agent(conn) + }, stream) + + log.info("setup connection") + }) + process.stdin.resume() }, }) diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 64f64082..640dd55c 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -340,10 +340,25 @@ export namespace Session { }, ) - export const updatePart = fn(MessageV2.Part, async (part) => { + const UpdatePartInput = z.union([ + MessageV2.Part, + z.object({ + part: MessageV2.TextPart, + delta: z.string(), + }), + z.object({ + part: MessageV2.ReasoningPart, + delta: z.string(), + }), + ]) + + export const updatePart = fn(UpdatePartInput, async (input) => { + const part = "delta" in input ? input.part : input + const delta = "delta" in input ? input.delta : undefined await Storage.write(["part", part.messageID, part.id], part) Bus.publish(MessageV2.Event.PartUpdated, { part, + delta, }) return part }) diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index b3efefcc..c37773b7 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -359,6 +359,7 @@ export namespace MessageV2 { "message.part.updated", z.object({ part: Part, + delta: z.string().optional(), }), ), PartRemoved: Bus.event( diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index b6ec891b..6adeb6f7 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -96,16 +96,6 @@ export namespace SessionPrompt { agent: z.string().optional(), system: z.string().optional(), tools: z.record(z.string(), z.boolean()).optional(), - /** - * ACP (Agent Client Protocol) connection details for streaming responses. - * When provided, enables real-time streaming and tool execution visibility. - */ - acpConnection: z - .object({ - connection: z.any(), // AgentSideConnection - using any to avoid circular deps - sessionId: z.string(), // ACP session ID (different from opencode sessionID) - }) - .optional(), parts: z.array( z.discriminatedUnion("type", [ MessageV2.TextPart.omit({ @@ -184,7 +174,6 @@ export namespace SessionPrompt { agent: agent.name, system, abort: abort.signal, - acpConnection: input.acpConnection, }) const tools = await resolveTools({ @@ -196,6 +185,28 @@ export namespace SessionPrompt { processor, }) + // const permUnsub = (() => { + // const handled = new Set() + // const options = [ + // { optionId: "allow_once", kind: "allow_once", name: "Allow once" }, + // { optionId: "allow_always", kind: "allow_always", name: "Always allow" }, + // { optionId: "reject_once", kind: "reject_once", name: "Reject" }, + // ] + // return Bus.subscribe(Permission.Event.Updated, async (event) => { + // const info = event.properties + // if (info.sessionID !== input.sessionID) return + // if (handled.has(info.id)) return + // handled.add(info.id) + // const toolCallId = info.callID ?? info.id + // const metadata = info.metadata ?? {} + // // TODO: emit permission event to bus for ACP to handle + // Permission.respond({ sessionID: info.sessionID, permissionID: info.id, response: "reject" }) + // }) + // })() + // await using _permSub = defer(() => { + // permUnsub?.() + // }) + const params = await Plugin.trigger( "chat.params", { @@ -889,60 +900,6 @@ export namespace SessionPrompt { return input.messages } - /** - * Maps tool names to ACP tool kinds for consistent categorization. - * - read: Tools that read data (read, glob, grep, list, webfetch, docs) - * - edit: Tools that modify state (edit, write, bash) - * - other: All other tools (MCP tools, task, todowrite, etc.) - */ - function determineToolKind(toolName: string): "read" | "edit" | "other" { - const readTools = [ - "read", - "glob", - "grep", - "list", - "webfetch", - "context7_resolve_library_id", - "context7_get_library_docs", - ] - const editTools = ["edit", "write", "bash"] - - if (readTools.includes(toolName.toLowerCase())) return "read" - if (editTools.includes(toolName.toLowerCase())) return "edit" - return "other" - } - - /** - * Extracts file/directory locations from tool inputs for ACP notifications. - * Returns array of {path} objects that ACP clients can use for navigation. - * - * Examples: - * - read({filePath: "/foo/bar.ts"}) -> [{path: "/foo/bar.ts"}] - * - glob({pattern: "*.ts", path: "/src"}) -> [{path: "/src"}] - * - bash({command: "ls"}) -> [] (no file references) - */ - function extractLocations(toolName: string, input: Record): { path: string }[] { - try { - switch (toolName.toLowerCase()) { - case "read": - case "edit": - case "write": - return input["filePath"] ? [{ path: input["filePath"] }] : [] - case "glob": - case "grep": - return input["path"] ? [{ path: input["path"] }] : [] - case "bash": - return [] - case "list": - return input["path"] ? [{ path: input["path"] }] : [] - default: - return [] - } - } catch { - return [] - } - } - export type Processor = Awaited> async function createProcessor(input: { sessionID: string @@ -951,10 +908,6 @@ export namespace SessionPrompt { system: string[] agent: string abort: AbortSignal - acpConnection?: { - connection: any - sessionId: string - } }) { const toolcalls: Record = {} let snapshot: string | undefined @@ -1052,7 +1005,7 @@ export namespace SessionPrompt { const part = reasoningMap[value.id] part.text += value.text if (value.providerMetadata) part.metadata = value.providerMetadata - if (part.text) await Session.updatePart(part) + if (part.text) await Session.updatePart({ part, delta: value.text }) } break @@ -1084,26 +1037,6 @@ export namespace SessionPrompt { }, }) toolcalls[value.id] = part as MessageV2.ToolPart - - // Notify ACP client of pending tool call - if (input.acpConnection) { - await input.acpConnection.connection - .sessionUpdate({ - sessionId: input.acpConnection.sessionId, - update: { - sessionUpdate: "tool_call", - toolCallId: value.id, - title: value.toolName, - kind: determineToolKind(value.toolName), - status: "pending", - locations: [], // Will be populated when we have input - rawInput: {}, - }, - }) - .catch((err: Error) => { - log.error("failed to send tool pending to ACP", { error: err }) - }) - } break case "tool-input-delta": @@ -1128,24 +1061,6 @@ export namespace SessionPrompt { metadata: value.providerMetadata, }) toolcalls[value.toolCallId] = part as MessageV2.ToolPart - - // Notify ACP client that tool is running - if (input.acpConnection) { - await input.acpConnection.connection - .sessionUpdate({ - sessionId: input.acpConnection.sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: value.toolCallId, - status: "in_progress", - locations: extractLocations(value.toolName, value.input), - rawInput: value.input, - }, - }) - .catch((err: Error) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) - } } break } @@ -1168,32 +1083,6 @@ export namespace SessionPrompt { }, }) - // Notify ACP client that tool completed - if (input.acpConnection) { - await input.acpConnection.connection - .sessionUpdate({ - sessionId: input.acpConnection.sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: value.toolCallId, - status: "completed", - content: [ - { - type: "content", - content: { - type: "text", - text: value.output.output, - }, - }, - ], - rawOutput: value.output, - }, - }) - .catch((err: Error) => { - log.error("failed to send tool completed to ACP", { error: err }) - }) - } - delete toolcalls[value.toolCallId] } break @@ -1216,34 +1105,6 @@ export namespace SessionPrompt { }, }) - // Notify ACP client of tool error - if (input.acpConnection) { - await input.acpConnection.connection - .sessionUpdate({ - sessionId: input.acpConnection.sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: value.toolCallId, - status: "failed", - content: [ - { - type: "content", - content: { - type: "text", - text: `Error: ${(value.error as any).toString()}`, - }, - }, - ], - rawOutput: { - error: (value.error as any).toString(), - }, - }, - }) - .catch((err: Error) => { - log.error("failed to send tool error to ACP", { error: err }) - }) - } - if (value.error instanceof Permission.RejectedError) { blocked = true } @@ -1322,26 +1183,11 @@ export namespace SessionPrompt { if (currentText) { currentText.text += value.text if (value.providerMetadata) currentText.metadata = value.providerMetadata - if (currentText.text) await Session.updatePart(currentText) - - // Send streaming chunk to ACP client - if (input.acpConnection && value.text) { - await input.acpConnection.connection - .sessionUpdate({ - sessionId: input.acpConnection.sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: value.text, - }, - }, - }) - .catch((err: Error) => { - log.error("failed to send text delta to ACP", { error: err }) - // Don't fail the whole request if ACP notification fails - }) - } + if (currentText.text) + await Session.updatePart({ + part: currentText, + delta: value.text, + }) } break