diff --git a/packages/turso-serverless/src/protocol.ts b/packages/turso-serverless/src/protocol.ts index 152132fb5..1c7654b39 100644 --- a/packages/turso-serverless/src/protocol.ts +++ b/packages/turso-serverless/src/protocol.ts @@ -192,52 +192,71 @@ export async function executeCursor( const decoder = new TextDecoder(); let buffer = ''; - let isFirstLine = true; - let cursorResponse: CursorResponse; + let cursorResponse: CursorResponse | undefined; + + // First, read until we get the cursor response (first line) + while (!cursorResponse) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + const newlineIndex = buffer.indexOf('\n'); + if (newlineIndex !== -1) { + const line = buffer.slice(0, newlineIndex).trim(); + buffer = buffer.slice(newlineIndex + 1); + + if (line) { + cursorResponse = JSON.parse(line); + break; + } + } + } + + if (!cursorResponse) { + throw new DatabaseError('No cursor response received'); + } async function* parseEntries(): AsyncGenerator { try { + // Process any remaining data in the buffer + let newlineIndex; + while ((newlineIndex = buffer.indexOf('\n')) !== -1) { + const line = buffer.slice(0, newlineIndex).trim(); + buffer = buffer.slice(newlineIndex + 1); + + if (line) { + yield JSON.parse(line) as CursorEntry; + } + } + + // Continue reading from the stream while (true) { const { done, value } = await reader!.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); - let newlineIndex; while ((newlineIndex = buffer.indexOf('\n')) !== -1) { const line = buffer.slice(0, newlineIndex).trim(); buffer = buffer.slice(newlineIndex + 1); if (line) { - if (isFirstLine) { - cursorResponse = JSON.parse(line); - isFirstLine = false; - } else { - yield JSON.parse(line) as CursorEntry; - } + yield JSON.parse(line) as CursorEntry; } } } + + // Process any remaining data in the buffer + if (buffer.trim()) { + yield JSON.parse(buffer.trim()) as CursorEntry; + } } finally { reader!.releaseLock(); } } - const entries = parseEntries(); - - // Get the first entry to parse the cursor response - const firstEntry = await entries.next(); - if (!firstEntry.done) { - // Put the first entry back - const generator = (async function* () { - yield firstEntry.value; - yield* entries; - })(); - - return { response: cursorResponse!, entries: generator }; - } - - return { response: cursorResponse!, entries }; + return { response: cursorResponse, entries: parseEntries() }; } export async function executePipeline(