serverless: Fix Connection.run() implementation

This commit is contained in:
Pekka Enberg
2025-07-30 14:38:21 +03:00
parent 5663dd8c91
commit 9bd053033a

View File

@@ -192,52 +192,71 @@ export async function executeCursor(
const decoder = new TextDecoder();
let buffer = '';
let isFirstLine = true;
let cursorResponse: CursorResponse;
let cursorResponse: CursorResponse | undefined;
// First, read until we get the cursor response (first line)
while (!cursorResponse) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const newlineIndex = buffer.indexOf('\n');
if (newlineIndex !== -1) {
const line = buffer.slice(0, newlineIndex).trim();
buffer = buffer.slice(newlineIndex + 1);
if (line) {
cursorResponse = JSON.parse(line);
break;
}
}
}
if (!cursorResponse) {
throw new DatabaseError('No cursor response received');
}
async function* parseEntries(): AsyncGenerator<CursorEntry> {
try {
// Process any remaining data in the buffer
let newlineIndex;
while ((newlineIndex = buffer.indexOf('\n')) !== -1) {
const line = buffer.slice(0, newlineIndex).trim();
buffer = buffer.slice(newlineIndex + 1);
if (line) {
yield JSON.parse(line) as CursorEntry;
}
}
// Continue reading from the stream
while (true) {
const { done, value } = await reader!.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let newlineIndex;
while ((newlineIndex = buffer.indexOf('\n')) !== -1) {
const line = buffer.slice(0, newlineIndex).trim();
buffer = buffer.slice(newlineIndex + 1);
if (line) {
if (isFirstLine) {
cursorResponse = JSON.parse(line);
isFirstLine = false;
} else {
yield JSON.parse(line) as CursorEntry;
}
yield JSON.parse(line) as CursorEntry;
}
}
}
// Process any remaining data in the buffer
if (buffer.trim()) {
yield JSON.parse(buffer.trim()) as CursorEntry;
}
} finally {
reader!.releaseLock();
}
}
const entries = parseEntries();
// Get the first entry to parse the cursor response
const firstEntry = await entries.next();
if (!firstEntry.done) {
// Put the first entry back
const generator = (async function* () {
yield firstEntry.value;
yield* entries;
})();
return { response: cursorResponse!, entries: generator };
}
return { response: cursorResponse!, entries };
return { response: cursorResponse, entries: parseEntries() };
}
export async function executePipeline(