This commit is contained in:
Dax Raad
2025-05-30 13:58:46 -04:00
parent 017a440a70
commit 42c7880858
4 changed files with 99 additions and 96 deletions

View File

@@ -7,7 +7,10 @@ type Bindings = {
}
export class SyncServer extends DurableObject {
async fetch(req: Request) {
constructor(ctx: DurableObjectState, env: Bindings) {
super(ctx, env)
}
async fetch() {
console.log("SyncServer subscribe")
const webSocketPair = new WebSocketPair()
@@ -16,11 +19,12 @@ export class SyncServer extends DurableObject {
this.ctx.acceptWebSocket(server)
setTimeout(async () => {
const data = await this.ctx.storage.list()
data.forEach((content: any, key) => {
if (key === "shareID") return
server.send(JSON.stringify({ key, content: content }))
const data = await this.ctx.storage.list({
prefix: "data/",
})
for (const [key, content] of Object.entries(data)) {
server.send(JSON.stringify({ key, content }))
}
}, 0)
return new Response(null, {
@@ -35,25 +39,54 @@ export class SyncServer extends DurableObject {
ws.close(code, "Durable Object is closing WebSocket")
}
async publish(key: string, content: any) {
await this.ctx.storage.put(key, content)
async publish(secret: string, key: string, content: any) {
if (secret !== (await this.getSecret())) throw new Error("Invalid secret")
const sessionID = await this.getSessionID()
if (
!key.startsWith(`session/info/${sessionID}`) &&
!key.startsWith(`session/message/${sessionID}/`)
)
return new Response("Error: Invalid key", { status: 400 })
// store message
await Resource.Bucket.put(`${key}.json`, JSON.stringify(content))
await this.ctx.storage.put("data/" + key, content)
const clients = this.ctx.getWebSockets()
console.log("SyncServer publish", key, "to", clients.length, "subscribers")
clients.forEach((client) => client.send(JSON.stringify({ key, content })))
}
async setShareID(shareID: string) {
await this.ctx.storage.put("shareID", shareID)
public async share(sessionID: string) {
let secret = await this.getSecret()
if (secret) return secret
secret = randomUUID()
await this.ctx.storage.put("secret", secret)
await this.ctx.storage.put("sessionID", sessionID)
return secret
}
async getShareID() {
return this.ctx.storage.get<string>("shareID")
private async getSecret() {
return this.ctx.storage.get<string>("secret")
}
async clear() {
private async getSessionID() {
return this.ctx.storage.get<string>("sessionID")
}
async clear(secret: string) {
await this.assertSecret(secret)
await this.ctx.storage.deleteAll()
}
private async assertSecret(secret: string) {
if (secret !== (await this.getSecret())) throw new Error("Invalid secret")
}
static shortName(id: string) {
return id.substring(id.length - 8)
}
}
export default {
@@ -71,103 +104,60 @@ export default {
if (request.method === "POST" && method === "share_create") {
const body = await request.json<any>()
const sessionID = body.sessionID
// Get existing shareID
const id = env.SYNC_SERVER.idFromName(sessionID)
const short = SyncServer.shortName(sessionID)
const id = env.SYNC_SERVER.idFromName(short)
const stub = env.SYNC_SERVER.get(id)
if (await stub.getShareID())
return new Response("Error: Session already shared", { status: 400 })
const shareID = randomUUID()
await stub.setShareID(shareID)
return new Response(JSON.stringify({ shareID }), {
headers: { "Content-Type": "application/json" },
})
const secret = await stub.share(sessionID)
return new Response(
JSON.stringify({
secret,
url: "https://dev.opencode.ai/s?id=" + short,
}),
{
headers: { "Content-Type": "application/json" },
},
)
}
if (request.method === "POST" && method === "share_delete") {
const body = await request.json<any>()
const sessionID = body.sessionID
const shareID = body.shareID
// validate shareID
if (!shareID)
return new Response("Error: Share ID is required", { status: 400 })
// Delete from durable object
const id = env.SYNC_SERVER.idFromName(sessionID)
const secret = body.secret
const id = env.SYNC_SERVER.idFromName(SyncServer.shortName(sessionID))
const stub = env.SYNC_SERVER.get(id)
if ((await stub.getShareID()) !== shareID)
return new Response("Error: Share ID does not match", { status: 400 })
await stub.clear()
await stub.clear(secret)
return new Response(JSON.stringify({}), {
headers: { "Content-Type": "application/json" },
})
}
if (request.method === "POST" && method === "share_sync") {
const body = await request.json<any>()
const sessionID = body.sessionID
const shareID = body.shareID
const key = body.key
const content = body.content
console.log("share_sync", sessionID, shareID, key, content)
// validate key
if (
!key.startsWith(`session/info/${sessionID}`) &&
!key.startsWith(`session/message/${sessionID}/`)
)
return new Response("Error: Invalid key", { status: 400 })
// validate shareID
if (!shareID)
return new Response("Error: Share ID is required", { status: 400 })
// send message to server
const id = env.SYNC_SERVER.idFromName(sessionID)
const body = await request.json<{
sessionID: string
secret: string
key: string
content: any
}>()
const name = SyncServer.shortName(body.sessionID)
const id = env.SYNC_SERVER.idFromName(name)
const stub = env.SYNC_SERVER.get(id)
if ((await stub.getShareID()) !== shareID)
return new Response("Error: Share ID does not match", { status: 400 })
await stub.publish(key, content)
// store message
await Resource.Bucket.put(
`${shareID}/${key}.json`,
JSON.stringify(content),
)
await stub.publish(body.secret, body.key, body.content)
return new Response(JSON.stringify({}), {
headers: { "Content-Type": "application/json" },
})
}
if (request.method === "GET" && method === "share_poll") {
// Expect to receive a WebSocket Upgrade request.
// If there is one, accept the request and return a WebSocket Response.
const upgradeHeader = request.headers.get("Upgrade")
if (!upgradeHeader || upgradeHeader !== "websocket") {
return new Response("Error: Upgrade header is required", {
status: 426,
})
}
// get query parameters
const sessionID = url.searchParams.get("id")
if (!sessionID)
const id = url.searchParams.get("id")
if (!id)
return new Response("Error: Share ID is required", { status: 400 })
// subscribe to server
const id = env.SYNC_SERVER.idFromName(sessionID)
const stub = env.SYNC_SERVER.get(id)
if (!(await stub.getShareID()))
return new Response("Error: Session not shared", { status: 400 })
const stub = env.SYNC_SERVER.get(env.SYNC_SERVER.idFromName(id))
return stub.fetch(request)
}
},