From cd42503e2cea32f6bceee46fbd9f257ba9a0b923 Mon Sep 17 00:00:00 2001 From: Frank Date: Mon, 8 Sep 2025 15:46:57 -0400 Subject: [PATCH] Zen: telemetry --- .../app/src/routes/zen/v1/chat/completions.ts | 123 +++++++++--------- cloud/function/src/log-processor.ts | 49 +++++++ cloud/function/sst-env.d.ts | 5 + cloud/resource/sst-env.d.ts | 5 + infra/cloud.ts | 17 ++- packages/function/sst-env.d.ts | 5 + sst-env.d.ts | 7 + 7 files changed, 148 insertions(+), 63 deletions(-) create mode 100644 cloud/function/src/log-processor.ts diff --git a/cloud/app/src/routes/zen/v1/chat/completions.ts b/cloud/app/src/routes/zen/v1/chat/completions.ts index 9805176c..9765ea08 100644 --- a/cloud/app/src/routes/zen/v1/chat/completions.ts +++ b/cloud/app/src/routes/zen/v1/chat/completions.ts @@ -22,7 +22,7 @@ const MODELS = { // headerMappings: {}, // }, "qwen/qwen3-coder": { - id: "qwen/qwen3-coder", + id: "qwen/qwen3-coder" as const, auth: true, api: "https://inference.baseten.co", apiKey: Resource.BASETEN_API_KEY.value, @@ -37,7 +37,7 @@ const MODELS = { headerMappings: {}, }, "grok-code": { - id: "x-ai/grok-code-fast-1", + id: "x-ai/grok-code-fast-1" as const, auth: false, api: "https://api.x.ai", apiKey: Resource.XAI_API_KEY.value, @@ -68,6 +68,11 @@ export async function POST(input: APIEvent) { try { const url = new URL(input.request.url) const body = await input.request.json() + logMetric({ + is_stream: !!body.stream, + session: input.request.headers.get("x-opencode-session"), + request: input.request.headers.get("x-opencode-request"), + }) const MODEL = validateModel() const apiKey = await authenticate() const isFree = FREE_WORKSPACES.includes(apiKey?.workspaceID ?? 
"") @@ -106,9 +111,11 @@ export async function POST(input: APIEvent) { // Handle non-streaming response if (!body.stream) { - const body = await res.json() - await trackUsage(body) - return new Response(JSON.stringify(body), { + const json = await res.json() + const body = JSON.stringify(json) + logMetric({ response_length: body.length }) + await trackUsage(json) + return new Response(body, { status: res.status, statusText: res.statusText, headers: resHeaders, @@ -121,16 +128,26 @@ export async function POST(input: APIEvent) { const reader = res.body?.getReader() const decoder = new TextDecoder() let buffer = "" + let responseLength = 0 + let startTimestamp = Date.now() + let receivedFirstByte = false function pump(): Promise { return ( reader?.read().then(async ({ done, value }) => { if (done) { + logMetric({ response_length: responseLength }) c.close() return } + if (!receivedFirstByte) { + receivedFirstByte = true + logMetric({ time_to_first_byte: Date.now() - startTimestamp }) + } + buffer += decoder.decode(value, { stream: true }) + responseLength += value.length const parts = buffer.split("\n\n") buffer = parts.pop() ?? 
"" @@ -169,7 +186,9 @@ export async function POST(input: APIEvent) { if (!(body.model in MODELS)) { throw new ModelError(`Model ${body.model} not supported`) } - return MODELS[body.model as keyof typeof MODELS] + const model = MODELS[body.model as keyof typeof MODELS] + logMetric({ model: model.id }) + return model } async function authenticate() { @@ -190,9 +209,12 @@ export async function POST(input: APIEvent) { ) if (!key) throw new AuthError("Invalid API key.") + logMetric({ + api_key: key.id, + workspace: key.workspaceID, + }) return key } catch (e) { - console.log(e) // ignore error if model does not require authentication if (!MODEL.auth) return throw e @@ -216,10 +238,6 @@ export async function POST(input: APIEvent) { } async function trackUsage(chunk: any) { - console.log(`trackUsage ${apiKey}`) - - if (!apiKey) return - const usage = chunk.usage const inputTokens = usage.prompt_tokens ?? 0 const outputTokens = usage.completion_tokens ?? 0 @@ -228,14 +246,30 @@ export async function POST(input: APIEvent) { //const cacheWriteTokens = providerMetadata?.["anthropic"]?.["cacheCreationInputTokens"] ?? 0 const cacheWriteTokens = 0 - const inputCost = MODEL.cost.input * inputTokens - const outputCost = MODEL.cost.output * outputTokens - const reasoningCost = MODEL.cost.reasoning * reasoningTokens - const cacheReadCost = MODEL.cost.cacheRead * cacheReadTokens - const cacheWriteCost = MODEL.cost.cacheWrite * cacheWriteTokens - const costInCents = (inputCost + outputCost + reasoningCost + cacheReadCost + cacheWriteCost) * 100 - const cost = isFree ? 
0 : centsToMicroCents(costInCents) + const inputCost = MODEL.cost.input * inputTokens * 100 + const outputCost = MODEL.cost.output * outputTokens * 100 + const reasoningCost = MODEL.cost.reasoning * reasoningTokens * 100 + const cacheReadCost = MODEL.cost.cacheRead * cacheReadTokens * 100 + const cacheWriteCost = MODEL.cost.cacheWrite * cacheWriteTokens * 100 + const totalCostInCent = inputCost + outputCost + reasoningCost + cacheReadCost + cacheWriteCost + logMetric({ + "tokens.input": inputTokens, + "tokens.output": outputTokens, + "tokens.reasoning": reasoningTokens, + "tokens.cache_read": cacheReadTokens, + "tokens.cache_write": cacheWriteTokens, + "cost.input": Math.round(inputCost), + "cost.output": Math.round(outputCost), + "cost.reasoning": Math.round(reasoningCost), + "cost.cache_read": Math.round(cacheReadCost), + "cost.cache_write": Math.round(cacheWriteCost), + "cost.total": Math.round(totalCostInCent), + }) + + if (!apiKey) return + + const cost = isFree ? 0 : centsToMicroCents(totalCostInCent) await Database.transaction(async (tx) => { await tx.insert(UsageTable).values({ workspaceID: apiKey.workspaceID, @@ -264,47 +298,18 @@ export async function POST(input: APIEvent) { ) } } catch (error: any) { - if (error instanceof AuthError) { - return new Response( - JSON.stringify({ - error: { - message: error.message, - type: "invalid_request_error", - param: null, - code: "unauthorized", - }, - }), - { - status: 401, - }, - ) - } - - if (error instanceof CreditsError) { - return new Response( - JSON.stringify({ - error: { - message: error.message, - type: "insufficient_quota", - param: null, - code: "insufficient_quota", - }, - }), - { - status: 401, - }, - ) - } - - if (error instanceof ModelError) { - return new Response(JSON.stringify({ error: { message: error.message } }), { - status: 401, - }) - } - - console.log(error) - return new Response(JSON.stringify({ error: { message: error.message } }), { - status: 500, + logMetric({ + "error.type": 
error.constructor.name, + "error.message": error.message, }) + + if (error instanceof AuthError || error instanceof CreditsError || error instanceof ModelError) + return new Response(JSON.stringify({ error: { message: error.message } }), { status: 401 }) + + return new Response(JSON.stringify({ error: { message: error.message } }), { status: 500 }) + } + + function logMetric(values: Record) { + console.log(`_metric:${JSON.stringify(values)}`) } } diff --git a/cloud/function/src/log-processor.ts b/cloud/function/src/log-processor.ts new file mode 100644 index 00000000..70fcf5a4 --- /dev/null +++ b/cloud/function/src/log-processor.ts @@ -0,0 +1,49 @@ +import { Resource } from "@opencode/cloud-resource" +import type { TraceItem } from "@cloudflare/workers-types" + +export default { + async tail(events: TraceItem[]) { + for (const event of events) { + if (!event.event) continue + if (!("request" in event.event)) continue + if (event.event.request.method !== "POST") continue + + const url = new URL(event.event.request.url) + if (url.pathname !== "/zen/v1/chat/completions") continue + + let metrics = { + event_type: "completions", + "cf.continent": event.event.request.cf?.continent, + "cf.country": event.event.request.cf?.country, + "cf.city": event.event.request.cf?.city, + "cf.region": event.event.request.cf?.region, + "cf.latitude": event.event.request.cf?.latitude, + "cf.longitude": event.event.request.cf?.longitude, + "cf.timezone": event.event.request.cf?.timezone, + duration: event.wallTime, + request_length: parseInt(event.event.request.headers["content-length"] ?? "0"), + status: event.event.response?.status ?? 
0, + ip: event.event.request.headers["x-real-ip"], + } + for (const log of event.logs) { + for (const message of log.message) { + if (typeof message !== "string" || !message.startsWith("_metric:")) continue + metrics = { ...metrics, ...JSON.parse(message.slice(8)) } + } + } + console.log(JSON.stringify(metrics, null, 2)) + + const ret = await fetch("https://api.honeycomb.io/1/events/zen", { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Honeycomb-Event-Time": (event.eventTimestamp ?? Date.now()).toString(), + "X-Honeycomb-Team": Resource.HONEYCOMB_API_KEY.value, + }, + body: JSON.stringify(metrics), + }) + console.log(ret.status) + console.log(await ret.text()) + } + }, +} diff --git a/cloud/function/sst-env.d.ts b/cloud/function/sst-env.d.ts index 5478b433..efb047ff 100644 --- a/cloud/function/sst-env.d.ts +++ b/cloud/function/sst-env.d.ts @@ -50,6 +50,10 @@ declare module "sst" { "type": "sst.sst.Secret" "value": string } + "HONEYCOMB_API_KEY": { + "type": "sst.sst.Secret" + "value": string + } "STRIPE_SECRET_KEY": { "type": "sst.sst.Secret" "value": string @@ -76,6 +80,7 @@ declare module "sst" { "AuthApi": cloudflare.Service "AuthStorage": cloudflare.KVNamespace "Bucket": cloudflare.R2Bucket + "LogProcessor": cloudflare.Service } } diff --git a/cloud/resource/sst-env.d.ts b/cloud/resource/sst-env.d.ts index 5478b433..efb047ff 100644 --- a/cloud/resource/sst-env.d.ts +++ b/cloud/resource/sst-env.d.ts @@ -50,6 +50,10 @@ declare module "sst" { "type": "sst.sst.Secret" "value": string } + "HONEYCOMB_API_KEY": { + "type": "sst.sst.Secret" + "value": string + } "STRIPE_SECRET_KEY": { "type": "sst.sst.Secret" "value": string @@ -76,6 +80,7 @@ declare module "sst" { "AuthApi": cloudflare.Service "AuthStorage": cloudflare.KVNamespace "Bucket": cloudflare.R2Bucket + "LogProcessor": cloudflare.Service } } diff --git a/infra/cloud.ts b/infra/cloud.ts index 74690669..04997dad 100644 --- a/infra/cloud.ts +++ b/infra/cloud.ts @@ -1,5 +1,6 @@ import { 
WebhookEndpoint } from 
"pulumi-stripe" import { domain } from "./stage" +import log from "../packages/web/dist/_worker.js/chunks/log_GHQSQ8rj.mjs" //////////////// // DATABASE @@ -7,7 +8,7 @@ import { domain } from "./stage" const cluster = planetscale.getDatabaseOutput({ name: "opencode", - organization: "sst", + organization: "anomalyco", }) const branch = @@ -103,6 +104,7 @@ const ANTHROPIC_API_KEY = new sst.Secret("ANTHROPIC_API_KEY") const XAI_API_KEY = new sst.Secret("XAI_API_KEY") const BASETEN_API_KEY = new sst.Secret("BASETEN_API_KEY") const STRIPE_SECRET_KEY = new sst.Secret("STRIPE_SECRET_KEY") +const HONEYCOMB_API_KEY = new sst.Secret("HONEYCOMB_API_KEY") const AUTH_API_URL = new sst.Linkable("AUTH_API_URL", { properties: { value: auth.url.apply((url) => url!) }, }) @@ -114,6 +116,14 @@ const STRIPE_WEBHOOK_SECRET = new sst.Linkable("STRIPE_WEBHOOK_SECRET", { // CONSOLE //////////////// +let logProcessor +if ($app.stage === "production" || $app.stage === "frank") { + logProcessor = new sst.cloudflare.Worker("LogProcessor", { + handler: "cloud/function/src/log-processor.ts", + link: [HONEYCOMB_API_KEY], + }) +} + new sst.cloudflare.x.SolidStart("Console", { domain, path: "cloud/app", @@ -135,9 +145,8 @@ new sst.cloudflare.x.SolidStart("Console", { server: { transform: { worker: { - placement: { - mode: "smart", - }, + placement: { mode: "smart" }, + tailConsumers: logProcessor ? 
[{ service: logProcessor.nodes.worker.scriptName }] : [], }, }, }, diff --git a/packages/function/sst-env.d.ts b/packages/function/sst-env.d.ts index 5478b433..efb047ff 100644 --- a/packages/function/sst-env.d.ts +++ b/packages/function/sst-env.d.ts @@ -50,6 +50,10 @@ declare module "sst" { "type": "sst.sst.Secret" "value": string } + "HONEYCOMB_API_KEY": { + "type": "sst.sst.Secret" + "value": string + } "STRIPE_SECRET_KEY": { "type": "sst.sst.Secret" "value": string @@ -76,6 +80,7 @@ declare module "sst" { "AuthApi": cloudflare.Service "AuthStorage": cloudflare.KVNamespace "Bucket": cloudflare.R2Bucket + "LogProcessor": cloudflare.Service } } diff --git a/sst-env.d.ts b/sst-env.d.ts index 534b20a6..c1999089 100644 --- a/sst-env.d.ts +++ b/sst-env.d.ts @@ -64,6 +64,13 @@ declare module "sst" { "type": "sst.sst.Secret" "value": string } + "HONEYCOMB_API_KEY": { + "type": "sst.sst.Secret" + "value": string + } + "LogProcessor": { + "type": "sst.cloudflare.Worker" + } "STRIPE_SECRET_KEY": { "type": "sst.sst.Secret" "value": string