Merge 'Serverless JavaScript driver improvements' from Pekka Enberg

Closes #2349
This commit is contained in:
Pekka Enberg
2025-07-31 10:14:22 +03:00
6 changed files with 135 additions and 41 deletions

View File

@@ -307,6 +307,11 @@ class LibSQLClient implements Client {
close(): void {
this._closed = true;
// Note: The libSQL client interface expects synchronous close,
// but our underlying session needs async close. We'll fire and forget.
this.session.close().catch(error => {
console.error('Error closing session:', error);
});
}
}

View File

@@ -90,6 +90,15 @@ export class Connection {
const sql = `PRAGMA ${pragma}`;
return this.session.execute(sql);
}
/**
* Close the connection.
*
* This sends a close request to the server to properly clean up the stream.
*/
async close(): Promise<void> {
await this.session.close();
}
}
/**

View File

@@ -18,12 +18,17 @@ export interface ExecuteResult {
last_insert_rowid?: string;
}
export interface NamedArg {
name: string;
value: Value;
}
export interface ExecuteRequest {
type: 'execute';
stmt: {
sql: string;
args: Value[];
named_args: Value[];
named_args: NamedArg[];
want_rows: boolean;
};
}
@@ -32,6 +37,7 @@ export interface BatchStep {
stmt: {
sql: string;
args: Value[];
named_args?: NamedArg[];
want_rows: boolean;
};
condition?: {
@@ -52,9 +58,13 @@ export interface SequenceRequest {
sql: string;
}
export interface CloseRequest {
type: 'close';
}
export interface PipelineRequest {
baton: string | null;
requests: (ExecuteRequest | BatchRequest | SequenceRequest)[];
requests: (ExecuteRequest | BatchRequest | SequenceRequest | CloseRequest)[];
}
export interface PipelineResponse {
@@ -63,7 +73,7 @@ export interface PipelineResponse {
results: Array<{
type: 'ok' | 'error';
response?: {
type: 'execute' | 'batch' | 'sequence';
type: 'execute' | 'batch' | 'sequence' | 'close';
result?: ExecuteResult;
};
error?: {
@@ -182,52 +192,71 @@ export async function executeCursor(
const decoder = new TextDecoder();
let buffer = '';
let isFirstLine = true;
let cursorResponse: CursorResponse;
let cursorResponse: CursorResponse | undefined;
// First, read until we get the cursor response (first line)
while (!cursorResponse) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const newlineIndex = buffer.indexOf('\n');
if (newlineIndex !== -1) {
const line = buffer.slice(0, newlineIndex).trim();
buffer = buffer.slice(newlineIndex + 1);
if (line) {
cursorResponse = JSON.parse(line);
break;
}
}
}
if (!cursorResponse) {
throw new DatabaseError('No cursor response received');
}
async function* parseEntries(): AsyncGenerator<CursorEntry> {
try {
// Process any remaining data in the buffer
let newlineIndex;
while ((newlineIndex = buffer.indexOf('\n')) !== -1) {
const line = buffer.slice(0, newlineIndex).trim();
buffer = buffer.slice(newlineIndex + 1);
if (line) {
yield JSON.parse(line) as CursorEntry;
}
}
// Continue reading from the stream
while (true) {
const { done, value } = await reader!.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let newlineIndex;
while ((newlineIndex = buffer.indexOf('\n')) !== -1) {
const line = buffer.slice(0, newlineIndex).trim();
buffer = buffer.slice(newlineIndex + 1);
if (line) {
if (isFirstLine) {
cursorResponse = JSON.parse(line);
isFirstLine = false;
} else {
yield JSON.parse(line) as CursorEntry;
}
yield JSON.parse(line) as CursorEntry;
}
}
}
// Process any remaining data in the buffer
if (buffer.trim()) {
yield JSON.parse(buffer.trim()) as CursorEntry;
}
} finally {
reader!.releaseLock();
}
}
const entries = parseEntries();
// Get the first entry to parse the cursor response
const firstEntry = await entries.next();
if (!firstEntry.done) {
// Put the first entry back
const generator = (async function* () {
yield firstEntry.value;
yield* entries;
})();
return { response: cursorResponse!, entries: generator };
}
return { response: cursorResponse!, entries };
return { response: cursorResponse, entries: parseEntries() };
}
export async function executePipeline(

View File

@@ -7,7 +7,10 @@ import {
type CursorResponse,
type CursorEntry,
type PipelineRequest,
type SequenceRequest
type SequenceRequest,
type CloseRequest,
type NamedArg,
type Value
} from './protocol.js';
import { DatabaseError } from './error.js';
@@ -49,10 +52,10 @@ export class Session {
* Execute a SQL statement and return all results.
*
* @param sql - The SQL statement to execute
* @param args - Optional array of parameter values
* @param args - Optional array of parameter values or object with named parameters
* @returns Promise resolving to the complete result set
*/
async execute(sql: string, args: any[] = []): Promise<any> {
async execute(sql: string, args: any[] | Record<string, any> = []): Promise<any> {
const { response, entries } = await this.executeRaw(sql, args);
const result = await this.processCursorEntries(entries);
return result;
@@ -62,17 +65,31 @@ export class Session {
* Execute a SQL statement and return the raw response and entries.
*
* @param sql - The SQL statement to execute
* @param args - Optional array of parameter values
* @param args - Optional array of parameter values or object with named parameters
* @returns Promise resolving to the raw response and cursor entries
*/
async executeRaw(sql: string, args: any[] = []): Promise<{ response: CursorResponse; entries: AsyncGenerator<CursorEntry> }> {
async executeRaw(sql: string, args: any[] | Record<string, any> = []): Promise<{ response: CursorResponse; entries: AsyncGenerator<CursorEntry> }> {
let positionalArgs: Value[] = [];
let namedArgs: NamedArg[] = [];
if (Array.isArray(args)) {
positionalArgs = args.map(encodeValue);
} else {
// Convert object with named parameters to NamedArg array
namedArgs = Object.entries(args).map(([name, value]) => ({
name,
value: encodeValue(value)
}));
}
const request: CursorRequest = {
baton: this.baton,
batch: {
steps: [{
stmt: {
sql,
args: args.map(encodeValue),
args: positionalArgs,
named_args: namedArgs,
want_rows: true
}
}]
@@ -180,6 +197,7 @@ export class Session {
stmt: {
sql,
args: [],
named_args: [],
want_rows: false
}
}))
@@ -248,4 +266,33 @@ export class Session {
}
}
}
/**
* Close the session.
*
* This sends a close request to the server to properly clean up the stream
* before resetting the local state.
*/
async close(): Promise<void> {
// Only send close request if we have an active baton
if (this.baton) {
try {
const request: PipelineRequest = {
baton: this.baton,
requests: [{
type: "close"
} as CloseRequest]
};
await executePipeline(this.baseUrl, this.config.authToken, request);
} catch (error) {
// Ignore errors during close, as the connection might already be closed
console.error('Error closing session:', error);
}
}
// Reset local state
this.baton = null;
this.baseUrl = '';
}
}

View File

@@ -26,7 +26,7 @@ export class Statement {
/**
* Executes the prepared statement.
*
* @param args - Optional array of parameter values for the SQL statement
* @param args - Optional array of parameter values or object with named parameters
* @returns Promise resolving to the result of the statement
*
* @example
@@ -36,7 +36,7 @@ export class Statement {
* console.log(`Inserted user with ID ${result.lastInsertRowid}`);
* ```
*/
async run(args: any[] = []): Promise<any> {
async run(args: any[] | Record<string, any> = []): Promise<any> {
const result = await this.session.execute(this.sql, args);
return { changes: result.rowsAffected, lastInsertRowid: result.lastInsertRowid };
}
@@ -44,7 +44,7 @@ export class Statement {
/**
* Execute the statement and return the first row.
*
* @param args - Optional array of parameter values for the SQL statement
* @param args - Optional array of parameter values or object with named parameters
* @returns Promise resolving to the first row or null if no results
*
* @example
@@ -56,7 +56,7 @@ export class Statement {
* }
* ```
*/
async get(args: any[] = []): Promise<any> {
async get(args: any[] | Record<string, any> = []): Promise<any> {
const result = await this.session.execute(this.sql, args);
return result.rows[0] || null;
}
@@ -64,7 +64,7 @@ export class Statement {
/**
* Execute the statement and return all rows.
*
* @param args - Optional array of parameter values for the SQL statement
* @param args - Optional array of parameter values or object with named parameters
* @returns Promise resolving to an array of all result rows
*
* @example
@@ -74,7 +74,7 @@ export class Statement {
* console.log(`Found ${activeUsers.length} active users`);
* ```
*/
async all(args: any[] = []): Promise<any[]> {
async all(args: any[] | Record<string, any> = []): Promise<any[]> {
const result = await this.session.execute(this.sql, args);
return result.rows;
}
@@ -85,7 +85,7 @@ export class Statement {
* This method provides memory-efficient processing of large result sets
* by streaming rows one at a time instead of loading everything into memory.
*
* @param args - Optional array of parameter values for the SQL statement
* @param args - Optional array of parameter values or object with named parameters
* @returns AsyncGenerator that yields individual rows
*
* @example
@@ -97,7 +97,7 @@ export class Statement {
* }
* ```
*/
async *iterate(args: any[] = []): AsyncGenerator<any> {
async *iterate(args: any[] | Record<string, any> = []): AsyncGenerator<any> {
const { response, entries } = await this.session.executeRaw(this.sql, args);
let columns: string[] = [];

View File

@@ -44,6 +44,10 @@ test.afterEach.always(async (t) => {
});
test.serial("Open in-memory database", async (t) => {
if (process.env.PROVIDER === "serverless") {
t.pass("Skipping in-memory database test for serverless");
return;
}
const [db] = await connect(":memory:");
t.is(db.memory, true);
});