diff --git a/packages/turso-serverless/src/compat.ts b/packages/turso-serverless/src/compat.ts index c5f954389..44523d942 100644 --- a/packages/turso-serverless/src/compat.ts +++ b/packages/turso-serverless/src/compat.ts @@ -307,6 +307,11 @@ class LibSQLClient implements Client { close(): void { this._closed = true; + // Note: The libSQL client interface expects synchronous close, + // but our underlying session needs async close. We'll fire and forget. + this.session.close().catch(error => { + console.error('Error closing session:', error); + }); } } diff --git a/packages/turso-serverless/src/connection.ts b/packages/turso-serverless/src/connection.ts index a7846e8dc..3f73a08bc 100644 --- a/packages/turso-serverless/src/connection.ts +++ b/packages/turso-serverless/src/connection.ts @@ -90,6 +90,15 @@ export class Connection { const sql = `PRAGMA ${pragma}`; return this.session.execute(sql); } + + /** + * Close the connection. + * + * This sends a close request to the server to properly clean up the stream. + */ + async close(): Promise { + await this.session.close(); + } } /** diff --git a/packages/turso-serverless/src/protocol.ts b/packages/turso-serverless/src/protocol.ts index 07a94e96c..1c7654b39 100644 --- a/packages/turso-serverless/src/protocol.ts +++ b/packages/turso-serverless/src/protocol.ts @@ -18,12 +18,17 @@ export interface ExecuteResult { last_insert_rowid?: string; } +export interface NamedArg { + name: string; + value: Value; +} + export interface ExecuteRequest { type: 'execute'; stmt: { sql: string; args: Value[]; - named_args: Value[]; + named_args: NamedArg[]; want_rows: boolean; }; } @@ -32,6 +37,7 @@ export interface BatchStep { stmt: { sql: string; args: Value[]; + named_args?: NamedArg[]; want_rows: boolean; }; condition?: { @@ -52,9 +58,13 @@ export interface SequenceRequest { sql: string; } +export interface CloseRequest { + type: 'close'; +} + export interface PipelineRequest { baton: string | null; - requests: (ExecuteRequest | BatchRequest | SequenceRequest)[]; + requests: (ExecuteRequest | BatchRequest | SequenceRequest | CloseRequest)[]; } export interface PipelineResponse { @@ -63,7 +73,7 @@ export interface PipelineResponse { results: Array<{ type: 'ok' | 'error'; response?: { - type: 'execute' | 'batch' | 'sequence'; + type: 'execute' | 'batch' | 'sequence' | 'close'; result?: ExecuteResult; }; error?: { @@ -182,52 +192,71 @@ export async function executeCursor( const decoder = new TextDecoder(); let buffer = ''; - let isFirstLine = true; - let cursorResponse: CursorResponse; + let cursorResponse: CursorResponse | undefined; + + // First, read until we get the cursor response (first line) + while (!cursorResponse) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + const newlineIndex = buffer.indexOf('\n'); + if (newlineIndex !== -1) { + const line = buffer.slice(0, newlineIndex).trim(); + buffer = buffer.slice(newlineIndex + 1); + + if (line) { + cursorResponse = JSON.parse(line); + break; + } + } + } + + if (!cursorResponse) { + throw new DatabaseError('No cursor response received'); + } async function* parseEntries(): AsyncGenerator { try { + // Process any remaining data in the buffer + let newlineIndex; + while ((newlineIndex = buffer.indexOf('\n')) !== -1) { + const line = buffer.slice(0, newlineIndex).trim(); + buffer = buffer.slice(newlineIndex + 1); + + if (line) { + yield JSON.parse(line) as CursorEntry; + } + } + + // Continue reading from the stream while (true) { const { done, value } = await reader!.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); - let newlineIndex; while ((newlineIndex = buffer.indexOf('\n')) !== -1) { const line = buffer.slice(0, newlineIndex).trim(); buffer = buffer.slice(newlineIndex + 1); if (line) { - if (isFirstLine) { - cursorResponse = JSON.parse(line); - isFirstLine = false; - } else { - yield JSON.parse(line) as CursorEntry; - } + yield JSON.parse(line) as CursorEntry; } } } + + // Process any remaining data in the buffer + if (buffer.trim()) { + yield JSON.parse(buffer.trim()) as CursorEntry; + } } finally { reader!.releaseLock(); } } - const entries = parseEntries(); - - // Get the first entry to parse the cursor response - const firstEntry = await entries.next(); - if (!firstEntry.done) { - // Put the first entry back - const generator = (async function* () { - yield firstEntry.value; - yield* entries; - })(); - - return { response: cursorResponse!, entries: generator }; - } - - return { response: cursorResponse!, entries }; + return { response: cursorResponse, entries: parseEntries() }; } export async function executePipeline( diff --git a/packages/turso-serverless/src/session.ts b/packages/turso-serverless/src/session.ts index eb401fd03..74d9e06f4 100644 --- a/packages/turso-serverless/src/session.ts +++ b/packages/turso-serverless/src/session.ts @@ -7,7 +7,10 @@ import { type CursorResponse, type CursorEntry, type PipelineRequest, - type SequenceRequest + type SequenceRequest, + type CloseRequest, + type NamedArg, + type Value } from './protocol.js'; import { DatabaseError } from './error.js'; @@ -49,10 +52,10 @@ export class Session { * Execute a SQL statement and return all results. * * @param sql - The SQL statement to execute - * @param args - Optional array of parameter values + * @param args - Optional array of parameter values or object with named parameters * @returns Promise resolving to the complete result set */ - async execute(sql: string, args: any[] = []): Promise { + async execute(sql: string, args: any[] | Record = []): Promise { const { response, entries } = await this.executeRaw(sql, args); const result = await this.processCursorEntries(entries); return result; @@ -62,17 +65,31 @@ export class Session { * Execute a SQL statement and return the raw response and entries. * * @param sql - The SQL statement to execute - * @param args - Optional array of parameter values + * @param args - Optional array of parameter values or object with named parameters * @returns Promise resolving to the raw response and cursor entries */ - async executeRaw(sql: string, args: any[] = []): Promise<{ response: CursorResponse; entries: AsyncGenerator }> { + async executeRaw(sql: string, args: any[] | Record = []): Promise<{ response: CursorResponse; entries: AsyncGenerator }> { + let positionalArgs: Value[] = []; + let namedArgs: NamedArg[] = []; + + if (Array.isArray(args)) { + positionalArgs = args.map(encodeValue); + } else { + // Convert object with named parameters to NamedArg array + namedArgs = Object.entries(args).map(([name, value]) => ({ + name, + value: encodeValue(value) + })); + } + const request: CursorRequest = { baton: this.baton, batch: { steps: [{ stmt: { sql, - args: args.map(encodeValue), + args: positionalArgs, + named_args: namedArgs, want_rows: true } }] @@ -180,6 +197,7 @@ export class Session { stmt: { sql, args: [], + named_args: [], want_rows: false } })) @@ -248,4 +266,33 @@ export class Session { } } } + + /** + * Close the session. + * + * This sends a close request to the server to properly clean up the stream + * before resetting the local state. + */ + async close(): Promise { + // Only send close request if we have an active baton + if (this.baton) { + try { + const request: PipelineRequest = { + baton: this.baton, + requests: [{ + type: "close" + } as CloseRequest] + }; + + await executePipeline(this.baseUrl, this.config.authToken, request); + } catch (error) { + // Ignore errors during close, as the connection might already be closed + console.error('Error closing session:', error); + } + } + + // Reset local state + this.baton = null; + this.baseUrl = ''; + } } \ No newline at end of file diff --git a/packages/turso-serverless/src/statement.ts b/packages/turso-serverless/src/statement.ts index 72907fb2d..77e7a39e9 100644 --- a/packages/turso-serverless/src/statement.ts +++ b/packages/turso-serverless/src/statement.ts @@ -26,7 +26,7 @@ export class Statement { /** * Executes the prepared statement. * - * @param args - Optional array of parameter values for the SQL statement + * @param args - Optional array of parameter values or object with named parameters * @returns Promise resolving to the result of the statement * * @example @@ -36,7 +36,7 @@ export class Statement { * console.log(`Inserted user with ID ${result.lastInsertRowid}`); * ``` */ - async run(args: any[] = []): Promise { + async run(args: any[] | Record = []): Promise { const result = await this.session.execute(this.sql, args); return { changes: result.rowsAffected, lastInsertRowid: result.lastInsertRowid }; } @@ -44,7 +44,7 @@ export class Statement { /** * Execute the statement and return the first row. * - * @param args - Optional array of parameter values for the SQL statement + * @param args - Optional array of parameter values or object with named parameters * @returns Promise resolving to the first row or null if no results * * @example @@ -56,7 +56,7 @@ export class Statement { * } * ``` */ - async get(args: any[] = []): Promise { + async get(args: any[] | Record = []): Promise { const result = await this.session.execute(this.sql, args); return result.rows[0] || null; } @@ -64,7 +64,7 @@ export class Statement { /** * Execute the statement and return all rows. * - * @param args - Optional array of parameter values for the SQL statement + * @param args - Optional array of parameter values or object with named parameters * @returns Promise resolving to an array of all result rows * * @example @@ -74,7 +74,7 @@ export class Statement { * console.log(`Found ${activeUsers.length} active users`); * ``` */ - async all(args: any[] = []): Promise { + async all(args: any[] | Record = []): Promise { const result = await this.session.execute(this.sql, args); return result.rows; } @@ -85,7 +85,7 @@ export class Statement { * This method provides memory-efficient processing of large result sets * by streaming rows one at a time instead of loading everything into memory. * - * @param args - Optional array of parameter values for the SQL statement + * @param args - Optional array of parameter values or object with named parameters * @returns AsyncGenerator that yields individual rows * * @example @@ -97,7 +97,7 @@ export class Statement { * } * ``` */ - async *iterate(args: any[] = []): AsyncGenerator { + async *iterate(args: any[] | Record = []): AsyncGenerator { const { response, entries } = await this.session.executeRaw(this.sql, args); let columns: string[] = []; diff --git a/testing/javascript/__test__/async.test.js b/testing/javascript/__test__/async.test.js index 2b6264d84..665a60d56 100644 --- a/testing/javascript/__test__/async.test.js +++ b/testing/javascript/__test__/async.test.js @@ -44,6 +44,10 @@ test.afterEach.always(async (t) => { }); test.serial("Open in-memory database", async (t) => { + if (process.env.PROVIDER === "serverless") { + t.pass("Skipping in-memory database test for serverless"); + return; + } const [db] = await connect(":memory:"); t.is(db.memory, true); });