move compute to the main thread for browser and node

- now, most of the work is happening on the main thread
- for database in browser, we still have dedicated WebWorker - but it is used only for OPFS access and only for that
- for syn in browser we still offload sync operations to the WebWorker
This commit is contained in:
Nikita Sivukhin
2025-09-17 21:38:36 +04:00
parent 635ac1c8be
commit 974feac27b
19 changed files with 471 additions and 275 deletions

View File

@@ -24,46 +24,94 @@ interface BrowserImports {
is_web_worker(): boolean;
lookup_file(ptr: number, len: number): number;
read(handle: number, ptr: number, len: number, offset: number): number;
read_async(handle: number, ptr: number, len: number, offset: number, c: number);
write(handle: number, ptr: number, len: number, offset: number): number;
write_async(handle: number, ptr: number, len: number, offset: number, c: number);
sync(handle: number): number;
sync_async(handle: number, c: number);
truncate(handle: number, len: number): number;
truncate_async(handle: number, len: number, c: number);
size(handle: number): number;
}
function panic(name): never {
function panicMain(name): never {
throw new Error(`method ${name} must be invoked only from the worker thread`);
}
function panicWorker(name): never {
throw new Error(`method ${name} must be invoked only from the main thread`);
}
const MainDummyImports: BrowserImports = {
is_web_worker: function (): boolean {
let completeOpfs: any = null;
function mainImports(worker: Worker): BrowserImports {
return {
is_web_worker(): boolean {
return false;
},
lookup_file: function (ptr: number, len: number): number {
panic("lookup_file")
write_async(handle, ptr, len, offset, c) {
writeFileAtWorker(worker, handle, ptr, len, offset)
.then(result => {
completeOpfs(c, result);
}, err => {
console.error('write_async', err);
completeOpfs(c, -1);
});
},
read: function (handle: number, ptr: number, len: number, offset: number): number {
panic("read")
sync_async(handle, c) {
syncFileAtWorker(worker, handle)
.then(result => {
completeOpfs(c, result);
}, err => {
console.error('sync_async', err);
completeOpfs(c, -1);
});
},
write: function (handle: number, ptr: number, len: number, offset: number): number {
panic("write")
read_async(handle, ptr, len, offset, c) {
readFileAtWorker(worker, handle, ptr, len, offset)
.then(result => {
completeOpfs(c, result);
}, err => {
console.error('read_async', err);
completeOpfs(c, -1);
});
},
sync: function (handle: number): number {
panic("sync")
truncate_async(handle, len, c) {
truncateFileAtWorker(worker, handle, len)
.then(result => {
completeOpfs(c, result);
}, err => {
console.error('truncate_async', err);
completeOpfs(c, -1);
});
},
truncate: function (handle: number, len: number): number {
panic("truncate")
lookup_file(ptr, len): number {
panicMain("lookup_file")
},
size: function (handle: number): number {
panic("size")
read(handle, ptr, len, offset): number {
panicMain("read")
},
write(handle, ptr, len, offset): number {
panicMain("write")
},
sync(handle): number {
panicMain("sync")
},
truncate(handle, len): number {
panicMain("truncate")
},
size(handle): number {
panicMain("size")
}
};
};
function workerImports(opfs: OpfsDirectory, memory: WebAssembly.Memory): BrowserImports {
return {
is_web_worker: function (): boolean {
is_web_worker(): boolean {
return true;
},
lookup_file: function (ptr: number, len: number): number {
lookup_file(ptr, len): number {
try {
const handle = opfs.lookupFileHandle(getStringFromMemory(memory, ptr, len));
return handle == null ? -404 : handle;
@@ -71,29 +119,28 @@ function workerImports(opfs: OpfsDirectory, memory: WebAssembly.Memory): Browser
return -1;
}
},
read: function (handle: number, ptr: number, len: number, offset: number): number {
read(handle, ptr, len, offset): number {
try {
return opfs.read(handle, getUint8ArrayFromMemory(memory, ptr, len), offset);
} catch (e) {
return -1;
}
},
write: function (handle: number, ptr: number, len: number, offset: number): number {
write(handle, ptr, len, offset): number {
try {
return opfs.write(handle, getUint8ArrayFromMemory(memory, ptr, len), offset)
} catch (e) {
return -1;
}
},
sync: function (handle: number): number {
sync(handle): number {
try {
opfs.sync(handle);
return 0;
return opfs.sync(handle);
} catch (e) {
return -1;
}
},
truncate: function (handle: number, len: number): number {
truncate(handle, len): number {
try {
opfs.truncate(handle, len);
return 0;
@@ -101,13 +148,25 @@ function workerImports(opfs: OpfsDirectory, memory: WebAssembly.Memory): Browser
return -1;
}
},
size: function (handle: number): number {
size(handle): number {
try {
return opfs.size(handle);
} catch (e) {
return -1;
}
}
},
read_async(handle, ptr, len, offset, completion) {
panicWorker("read_async")
},
write_async(handle, ptr, len, offset, completion) {
panicWorker("write_async")
},
sync_async(handle, completion) {
panicWorker("sync_async")
},
truncate_async(handle, len, c) {
panicWorker("truncate_async")
},
}
}
@@ -175,10 +234,11 @@ class OpfsDirectory {
throw e;
}
}
sync(handle: number) {
sync(handle: number): number {
try {
const file = this.fileByHandle.get(handle);
file.flush();
return 0;
} catch (e) {
console.error('sync', handle, e);
throw e;
@@ -187,8 +247,8 @@ class OpfsDirectory {
truncate(handle: number, size: number) {
try {
const file = this.fileByHandle.get(handle);
const result = file.truncate(size);
return result;
file.truncate(size);
return 0;
} catch (e) {
console.error('truncate', handle, size, e);
throw e;
@@ -214,7 +274,7 @@ function waitForWorkerResponse(worker: Worker, id: number): Promise<any> {
if (msg.data.error != null) {
waitReject(msg.data.error)
} else {
waitResolve()
waitResolve(msg.data.result)
}
cleanup();
}
@@ -229,6 +289,38 @@ function waitForWorkerResponse(worker: Worker, id: number): Promise<any> {
return result;
}
function readFileAtWorker(worker: Worker, handle: number, ptr: number, len: number, offset: number) {
workerRequestId += 1;
const currentId = workerRequestId;
const promise = waitForWorkerResponse(worker, currentId);
worker.postMessage({ __turso__: "read_async", handle: handle, ptr: ptr, len: len, offset: offset, id: currentId });
return promise;
}
function writeFileAtWorker(worker: Worker, handle: number, ptr: number, len: number, offset: number) {
workerRequestId += 1;
const currentId = workerRequestId;
const promise = waitForWorkerResponse(worker, currentId);
worker.postMessage({ __turso__: "write_async", handle: handle, ptr: ptr, len: len, offset: offset, id: currentId });
return promise;
}
function syncFileAtWorker(worker: Worker, handle: number) {
workerRequestId += 1;
const currentId = workerRequestId;
const promise = waitForWorkerResponse(worker, currentId);
worker.postMessage({ __turso__: "sync_async", handle: handle, id: currentId });
return promise;
}
function truncateFileAtWorker(worker: Worker, handle: number, len: number) {
workerRequestId += 1;
const currentId = workerRequestId;
const promise = waitForWorkerResponse(worker, currentId);
worker.postMessage({ __turso__: "truncate_async", handle: handle, len: len, id: currentId });
return promise;
}
function registerFileAtWorker(worker: Worker, path: string): Promise<void> {
workerRequestId += 1;
const currentId = workerRequestId;
@@ -299,12 +391,25 @@ function setupWebWorker() {
self.postMessage({ id: e.data.id, error: error });
}
return;
} else if (e.data.__turso__ == 'read_async') {
let result = opfs.read(e.data.handle, getUint8ArrayFromMemory(memory, e.data.ptr, e.data.len), e.data.offset);
self.postMessage({ id: e.data.id, result: result });
} else if (e.data.__turso__ == 'write_async') {
let result = opfs.write(e.data.handle, getUint8ArrayFromMemory(memory, e.data.ptr, e.data.len), e.data.offset);
self.postMessage({ id: e.data.id, result: result });
} else if (e.data.__turso__ == 'sync_async') {
let result = opfs.sync(e.data.handle);
self.postMessage({ id: e.data.id, result: result });
} else if (e.data.__turso__ == 'truncate_async') {
let result = opfs.truncate(e.data.handle, e.data.len);
self.postMessage({ id: e.data.id, result: result });
}
handler.handle(e)
}
}
async function setupMainThread(wasmFile: ArrayBuffer, factory: () => Worker): Promise<any> {
const worker = factory();
const __emnapiContext = __emnapiGetDefaultContext()
const __wasi = new __WASI({
version: 'preview1',
@@ -322,13 +427,13 @@ async function setupMainThread(wasmFile: ArrayBuffer, factory: () => Worker): Pr
context: __emnapiContext,
asyncWorkPoolSize: 1,
wasi: __wasi,
onCreateWorker() { return factory() },
onCreateWorker() { return worker; },
overwriteImports(importObject) {
importObject.env = {
...importObject.env,
...importObject.napi,
...importObject.emnapi,
...MainDummyImports,
...mainImports(worker),
memory: __sharedMemory,
}
return importObject
@@ -340,8 +445,9 @@ async function setupMainThread(wasmFile: ArrayBuffer, factory: () => Worker): Pr
}
}
},
})
});
completeOpfs = __napiModule.exports.completeOpfs;
return __napiModule;
}
export { OpfsDirectory, workerImports, MainDummyImports, waitForWorkerResponse, registerFileAtWorker, unregisterFileAtWorker, isWebWorker, setupWebWorker, setupMainThread }
export { OpfsDirectory, workerImports, mainImports as MainDummyImports, waitForWorkerResponse, registerFileAtWorker, unregisterFileAtWorker, isWebWorker, setupWebWorker, setupMainThread }

View File

@@ -20,5 +20,5 @@ export const Database = napiModule.exports.Database
export const Opfs = napiModule.exports.Opfs
export const OpfsFile = napiModule.exports.OpfsFile
export const Statement = napiModule.exports.Statement
export const connect = napiModule.exports.connect
export const connectDbAsync = napiModule.exports.connectDbAsync
export const initThreadPool = napiModule.exports.initThreadPool

View File

@@ -18,5 +18,5 @@ export const Database = napiModule.exports.Database
export const Opfs = napiModule.exports.Opfs
export const OpfsFile = napiModule.exports.OpfsFile
export const Statement = napiModule.exports.Statement
export const connect = napiModule.exports.connect
export const connectDbAsync = napiModule.exports.connectDbAsync
export const initThreadPool = napiModule.exports.initThreadPool

View File

@@ -21,5 +21,5 @@ export const Database = napiModule.exports.Database
export const Opfs = napiModule.exports.Opfs
export const OpfsFile = napiModule.exports.OpfsFile
export const Statement = napiModule.exports.Statement
export const connect = napiModule.exports.connect
export const connectDbAsync = napiModule.exports.connectDbAsync
export const initThreadPool = napiModule.exports.initThreadPool

View File

@@ -7,7 +7,7 @@ let napiModule = {
Opfs: {} as any,
OpfsFile: {} as any,
Statement: {} as any,
connect: {} as any,
connectDbAsync: {} as any,
initThreadPool: {} as any,
}
};
@@ -37,5 +37,5 @@ export const Database = napiModule.exports.Database
export const Opfs = napiModule.exports.Opfs
export const OpfsFile = napiModule.exports.OpfsFile
export const Statement = napiModule.exports.Statement
export const connect = napiModule.exports.connect
export const connectDbAsync = napiModule.exports.connectDbAsync
export const initThreadPool = napiModule.exports.initThreadPool

View File

@@ -1,6 +1,6 @@
import { DatabaseOpts, SqliteError, } from "@tursodatabase/database-common"
import { connect as promiseConnect, Database } from "./promise.js";
import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-bundle.js";
import { Database, connect as promiseConnect } from "./promise.js";
import { initThreadPool, MainWorker, connectDbAsync } from "./index-bundle.js";
/**
* Creates a new database connection asynchronously.
@@ -10,13 +10,19 @@ import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-bu
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
*/
async function connect(path: string, opts: DatabaseOpts = {}): Promise<Database> {
return await promiseConnect(path, opts, nativeConnect, async () => {
const init = async () => {
await initThreadPool();
if (MainWorker == null) {
throw new Error("panic: MainWorker is not initialized");
}
return MainWorker;
});
};
return await promiseConnect(
path,
opts,
connectDbAsync,
init
);
}
export { connect, Database, SqliteError }

View File

@@ -1,6 +1,6 @@
import { DatabaseOpts, SqliteError, } from "@tursodatabase/database-common"
import { connect as promiseConnect, Database } from "./promise.js";
import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-default.js";
import { Database, connect as promiseConnect } from "./promise.js";
import { initThreadPool, MainWorker, connectDbAsync } from "./index-default.js";
/**
* Creates a new database connection asynchronously.
@@ -10,13 +10,19 @@ import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-de
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
*/
async function connect(path: string, opts: DatabaseOpts = {}): Promise<Database> {
return await promiseConnect(path, opts, nativeConnect, async () => {
const init = async () => {
await initThreadPool();
if (MainWorker == null) {
throw new Error("panic: MainWorker is not initialized");
}
return MainWorker;
});
};
return await promiseConnect(
path,
opts,
connectDbAsync,
init
);
}
export { connect, Database, SqliteError }

View File

@@ -1,6 +1,6 @@
import { DatabaseOpts, SqliteError, } from "@tursodatabase/database-common"
import { connect as promiseConnect, Database } from "./promise.js";
import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-turbopack-hack.js";
import { Database, connect as promiseConnect } from "./promise.js";
import { initThreadPool, MainWorker, connectDbAsync } from "./index-turbopack-hack.js";
/**
* Creates a new database connection asynchronously.
@@ -10,13 +10,19 @@ import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-tu
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
*/
async function connect(path: string, opts: DatabaseOpts = {}): Promise<Database> {
return await promiseConnect(path, opts, nativeConnect, async () => {
const init = async () => {
await initThreadPool();
if (MainWorker == null) {
throw new Error("panic: MainWorker is not initialized");
}
return MainWorker;
});
};
return await promiseConnect(
path,
opts,
connectDbAsync,
init
);
}
export { connect, Database, SqliteError }

View File

@@ -1,6 +1,6 @@
import { DatabaseOpts, SqliteError, } from "@tursodatabase/database-common"
import { connect as promiseConnect, Database } from "./promise.js";
import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-vite-dev-hack.js";
import { Database, connect as promiseConnect } from "./promise.js";
import { initThreadPool, MainWorker, connectDbAsync } from "./index-vite-dev-hack.js";
/**
* Creates a new database connection asynchronously.
@@ -10,13 +10,19 @@ import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-vi
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
*/
async function connect(path: string, opts: DatabaseOpts = {}): Promise<Database> {
return await promiseConnect(path, opts, nativeConnect, async () => {
const init = async () => {
await initThreadPool();
if (MainWorker == null) {
throw new Error("panic: MainWorker is not initialized");
}
return MainWorker;
});
};
return await promiseConnect(
path,
opts,
connectDbAsync,
init
);
}
export { connect, Database, SqliteError }

View File

@@ -1,4 +1,4 @@
import { expect, test, afterEach } from 'vitest'
import { expect, test } from 'vitest'
import { connect } from './promise-default.js'
test('in-memory db', async () => {
@@ -10,6 +10,28 @@ test('in-memory db', async () => {
expect(rows).toEqual([{ x: 1 }, { x: 3 }]);
})
test('on-disk db large inserts', async () => {
const path = `test-${(Math.random() * 10000) | 0}.db`;
const db1 = await connect(path);
await db1.prepare("CREATE TABLE t(x)").run();
await db1.prepare("INSERT INTO t VALUES (randomblob(10 * 4096 + 0))").run();
await db1.prepare("INSERT INTO t VALUES (randomblob(10 * 4096 + 1))").run();
await db1.prepare("INSERT INTO t VALUES (randomblob(10 * 4096 + 2))").run();
const stmt1 = db1.prepare("SELECT length(x) as l FROM t");
expect(stmt1.columns()).toEqual([{ name: "l", column: null, database: null, table: null, type: null }]);
const rows1 = await stmt1.all();
expect(rows1).toEqual([{ l: 10 * 4096 }, { l: 10 * 4096 + 1 }, { l: 10 * 4096 + 2 }]);
await db1.exec("BEGIN");
await db1.exec("INSERT INTO t VALUES (1)");
await db1.exec("ROLLBACK");
const rows2 = await db1.prepare("SELECT length(x) as l FROM t").all();
expect(rows2).toEqual([{ l: 10 * 4096 }, { l: 10 * 4096 + 1 }, { l: 10 * 4096 + 2 }]);
await db1.prepare("PRAGMA wal_checkpoint(TRUNCATE)").run();
})
test('on-disk db', async () => {
const path = `test-${(Math.random() * 10000) | 0}.db`;
const db1 = await connect(path);
@@ -19,8 +41,8 @@ test('on-disk db', async () => {
expect(stmt1.columns()).toEqual([{ name: "x", column: null, database: null, table: null, type: null }]);
const rows1 = await stmt1.all([1]);
expect(rows1).toEqual([{ x: 1 }, { x: 3 }]);
await db1.close();
stmt1.close();
await db1.close();
const db2 = await connect(path);
const stmt2 = db2.prepare("SELECT * FROM t WHERE x % 2 = ?");
@@ -30,23 +52,23 @@ test('on-disk db', async () => {
db2.close();
})
test('attach', async () => {
const path1 = `test-${(Math.random() * 10000) | 0}.db`;
const path2 = `test-${(Math.random() * 10000) | 0}.db`;
const db1 = await connect(path1);
await db1.exec("CREATE TABLE t(x)");
await db1.exec("INSERT INTO t VALUES (1), (2), (3)");
const db2 = await connect(path2);
await db2.exec("CREATE TABLE q(x)");
await db2.exec("INSERT INTO q VALUES (4), (5), (6)");
// test('attach', async () => {
// const path1 = `test-${(Math.random() * 10000) | 0}.db`;
// const path2 = `test-${(Math.random() * 10000) | 0}.db`;
// const db1 = await connect(path1);
// await db1.exec("CREATE TABLE t(x)");
// await db1.exec("INSERT INTO t VALUES (1), (2), (3)");
// const db2 = await connect(path2);
// await db2.exec("CREATE TABLE q(x)");
// await db2.exec("INSERT INTO q VALUES (4), (5), (6)");
await db1.exec(`ATTACH '${path2}' as secondary`);
// await db1.exec(`ATTACH '${path2}' as secondary`);
const stmt = db1.prepare("SELECT * FROM t UNION ALL SELECT * FROM secondary.q");
expect(stmt.columns()).toEqual([{ name: "x", column: null, database: null, table: null, type: null }]);
const rows = await stmt.all([1]);
expect(rows).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }, { x: 4 }, { x: 5 }, { x: 6 }]);
})
// const stmt = db1.prepare("SELECT * FROM t UNION ALL SELECT * FROM secondary.q");
// expect(stmt.columns()).toEqual([{ name: "x", column: null, database: null, table: null, type: null }]);
// const rows = await stmt.all([1]);
// expect(rows).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }, { x: 4 }, { x: 5 }, { x: 6 }]);
// })
test('blobs', async () => {
const db = await connect(":memory:");

View File

@@ -192,7 +192,12 @@ class Database {
}
try {
this.db.batchSync(sql);
let stmt = this.prepare(sql);
try {
stmt.run();
} finally {
stmt.close();
}
} catch (err) {
throw convertError(err);
}
@@ -408,6 +413,10 @@ class Statement {
throw convertError(err);
}
}
close() {
this.stmt.finalize();
}
}
export { Database, Statement }

View File

@@ -196,7 +196,12 @@ class Database {
}
try {
await this.db.batchAsync(sql);
const stmt = this.prepare(sql);
try {
await stmt.run();
} finally {
stmt.close();
}
} catch (err) {
throw convertError(err);
}
@@ -298,7 +303,7 @@ class Statement {
bindParams(this.stmt, bindParameters);
while (true) {
const stepResult = await this.stmt.stepAsync();
const stepResult = this.stmt.stepSync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
@@ -328,7 +333,7 @@ class Statement {
bindParams(this.stmt, bindParameters);
while (true) {
const stepResult = await this.stmt.stepAsync();
const stepResult = this.stmt.stepSync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
@@ -352,7 +357,7 @@ class Statement {
bindParams(this.stmt, bindParameters);
while (true) {
const stepResult = await this.stmt.stepAsync();
const stepResult = this.stmt.stepSync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
@@ -377,7 +382,7 @@ class Statement {
const rows: any[] = [];
while (true) {
const stepResult = await this.stmt.stepAsync();
const stepResult = this.stmt.stepSync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;

View File

@@ -15,26 +15,6 @@ export declare class Database {
get path(): string
/** Returns whether the database connection is open. */
get open(): boolean
/**
* Executes a batch of SQL statements on main thread
*
* # Arguments
*
* * `sql` - The SQL statements to execute.
*
* # Returns
*/
batchSync(sql: string): void
/**
* Executes a batch of SQL statements outside of main thread
*
* # Arguments
*
* * `sql` - The SQL statements to execute.
*
* # Returns
*/
batchAsync(sql: string): Promise<void>
/**
* Prepares a statement for execution.
*

View File

@@ -20,10 +20,10 @@
},
"../packages/native": {
"name": "@tursodatabase/database",
"version": "0.1.5-pre.3",
"version": "0.2.0-pre.3",
"license": "MIT",
"dependencies": {
"@tursodatabase/database-common": "^0.1.5-pre.3"
"@tursodatabase/database-common": "^0.2.0-pre.3"
},
"devDependencies": {
"@napi-rs/cli": "^3.1.5",

View File

@@ -1,26 +1,26 @@
import { run, bench, group, baseline } from 'mitata';
import { Database } from '@tursodatabase/database/compat';
import { Database } from '@tursodatabase/database';
const db = new Database(':memory:');
db.exec("CREATE TABLE users (id INTEGER, name TEXT, email TEXT)");
db.exec("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.org')");
await db.exec("CREATE TABLE users (id INTEGER, name TEXT, email TEXT)");
await db.exec("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.org')");
const stmtSelect = db.prepare("SELECT * FROM users WHERE id = ?");
const rawStmtSelect = db.prepare("SELECT * FROM users WHERE id = ?").raw();
const stmtInsert = db.prepare("INSERT INTO users (id, name, email) VALUES (?, ?, ?)");
bench('Statement.get() with bind parameters [expanded]', () => {
stmtSelect.get(1);
bench('Statement.get() with bind parameters [expanded]', async () => {
await stmtSelect.get(1);
});
bench('Statement.get() with bind parameters [raw]', () => {
rawStmtSelect.get(1);
bench('Statement.get() with bind parameters [raw]', async () => {
await rawStmtSelect.get(1);
});
bench('Statement.run() with bind parameters', () => {
stmtInsert.run([1, 'foobar', 'foobar@example.com']);
bench('Statement.run() with bind parameters', async () => {
await stmtInsert.run([1, 'foobar', 'foobar@example.com']);
});
await run({

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use std::{cell::RefCell, collections::HashMap, sync::Arc};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use turso_core::{storage::database::DatabaseFile, Clock, File, Instant, IO};
use turso_core::{Clock, Completion, File, Instant, MemoryIO, IO};
use crate::{init_tracing, is_memory, Database, DatabaseOpts};
use crate::{is_memory, Database, DatabaseOpts};
pub struct NoopTask;
@@ -29,11 +29,11 @@ pub fn init_thread_pool() -> napi::Result<AsyncTask<NoopTask>> {
pub struct ConnectTask {
path: String,
io: Arc<dyn turso_core::IO>,
opts: Option<DatabaseOpts>,
}
pub struct ConnectResult {
db: Arc<turso_core::Database>,
conn: Arc<turso_core::Connection>,
db: Database,
}
unsafe impl Send for ConnectResult {}
@@ -43,73 +43,98 @@ impl Task for ConnectTask {
type JsValue = Database;
fn compute(&mut self) -> Result<Self::Output> {
let file = self
.io
.open_file(&self.path, turso_core::OpenFlags::Create, false)
.map_err(|e| Error::new(Status::GenericFailure, format!("Failed to open file: {e}")))?;
let db_file = Arc::new(DatabaseFile::new(file));
let db = turso_core::Database::open(self.io.clone(), &self.path, db_file, false, true)
.map_err(|e| {
Error::new(
Status::GenericFailure,
format!("Failed to open database: {e}"),
)
})?;
let conn = db
.connect()
.map_err(|e| Error::new(Status::GenericFailure, format!("Failed to connect: {e}")))?;
Ok(ConnectResult { db, conn })
let db = Database::new_io(self.path.clone(), self.io.clone(), self.opts.clone())?;
Ok(ConnectResult { db })
}
fn resolve(&mut self, _: Env, result: Self::Output) -> Result<Self::JsValue> {
Ok(Database::create(
Some(result.db),
self.io.clone(),
result.conn,
self.path.clone(),
))
Ok(result.db)
}
}
#[napi]
// we offload connect to the web-worker because:
// 1. browser main-thread do not support Atomic.wait operations
// 2. turso-db use blocking IO [io.wait_for_completion(c)] in few places during initialization path
//
// so, we offload connect to the worker thread
pub fn connect(path: String, opts: Option<DatabaseOpts>) -> Result<AsyncTask<ConnectTask>> {
if let Some(opts) = opts {
init_tracing(opts.tracing);
}
let task = if is_memory(&path) {
ConnectTask {
io: Arc::new(turso_core::MemoryIO::new()),
path,
}
} else {
let io = Arc::new(Opfs::new()?);
ConnectTask { io, path }
};
Ok(AsyncTask::new(task))
}
#[napi]
#[derive(Clone)]
pub struct Opfs;
pub struct Opfs {
inner: Arc<OpfsInner>,
}
pub struct OpfsInner {
completion_no: RefCell<u32>,
completions: RefCell<HashMap<u32, Completion>>,
}
thread_local! {
static OPFS: Arc<Opfs> = Arc::new(Opfs::new());
}
#[napi]
#[derive(Clone)]
struct OpfsFile {
handle: i32,
opfs: Opfs,
}
// unsafe impl Send for OpfsFile {}
// unsafe impl Sync for OpfsFile {}
unsafe impl Send for Opfs {}
unsafe impl Sync for Opfs {}
#[napi]
// we offload connect to the web-worker because
// turso-db use blocking IO [io.wait_for_completion(c)] in few places during initialization path
pub fn connect_db_async(
path: String,
opts: Option<DatabaseOpts>,
) -> Result<AsyncTask<ConnectTask>> {
let io: Arc<dyn turso_core::IO> = if is_memory(&path) {
Arc::new(MemoryIO::new())
} else {
// we must create OPFS IO on the main thread
opfs()
};
let task = ConnectTask { path, io, opts };
Ok(AsyncTask::new(task))
}
#[napi]
pub fn complete_opfs(completion_no: u32, result: i32) {
OPFS.with(|opfs| opfs.complete(completion_no, result))
}
pub fn opfs() -> Arc<Opfs> {
OPFS.with(|opfs| opfs.clone())
}
impl Opfs {
#[napi(constructor)]
pub fn new() -> napi::Result<Self> {
Ok(Self)
pub fn new() -> Self {
Self {
inner: Arc::new(OpfsInner {
completion_no: RefCell::new(0),
completions: RefCell::new(HashMap::new()),
}),
}
}
pub fn complete(&self, completion_no: u32, result: i32) {
let completion = {
let mut completions = self.inner.completions.borrow_mut();
completions.remove(&completion_no).unwrap()
};
completion.complete(result);
}
fn register_completion(&self, c: Completion) -> u32 {
let inner = &self.inner;
*inner.completion_no.borrow_mut() += 1;
let completion_no = *inner.completion_no.borrow();
tracing::debug!(
"register completion: {} {:?}",
completion_no,
Arc::as_ptr(inner)
);
inner.completions.borrow_mut().insert(completion_no, c);
completion_no
}
}
@@ -127,6 +152,13 @@ extern "C" {
fn sync(handle: i32) -> i32;
fn truncate(handle: i32, length: usize) -> i32;
fn size(handle: i32) -> i32;
fn write_async(handle: i32, buffer: *const u8, buffer_len: usize, offset: i32, c: u32);
fn sync_async(handle: i32, c: u32);
fn read_async(handle: i32, buffer: *mut u8, buffer_len: usize, offset: i32, c: u32);
fn truncate_async(handle: i32, length: usize, c: u32);
// fn size_async(handle: i32) -> i32;
fn is_web_worker() -> bool;
}
@@ -144,7 +176,12 @@ impl IO for Opfs {
tracing::info!("open_file: {}", path);
let result = unsafe { lookup_file(path.as_ptr(), path.len()) };
if result >= 0 {
Ok(Arc::new(OpfsFile { handle: result }))
Ok(Arc::new(OpfsFile {
handle: result,
opfs: Opfs {
inner: self.inner.clone(),
},
}))
} else if result == -404 {
Err(turso_core::LimboError::InternalError(format!(
"unexpected path {path}: files must be created in advance for OPFS IO"
@@ -175,17 +212,32 @@ impl File for OpfsFile {
pos: u64,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
assert!(
is_web_worker_safe(),
"opfs must be used only from web worker for now"
let web_worker = is_web_worker_safe();
tracing::debug!(
"pread({}, is_web_worker={}): pos={}",
self.handle,
web_worker,
pos
);
tracing::debug!("pread({}): pos={}", self.handle, pos);
let handle = self.handle;
let read_c = c.as_read();
let buffer = read_c.buf_arc();
let buffer = buffer.as_mut_slice();
if web_worker {
let result = unsafe { read(handle, buffer.as_mut_ptr(), buffer.len(), pos as i32) };
c.complete(result as i32);
} else {
let completion_no = self.opfs.register_completion(c.clone());
unsafe {
read_async(
handle,
buffer.as_mut_ptr(),
buffer.len(),
pos as i32,
completion_no,
)
};
}
Ok(c)
}
@@ -195,27 +247,44 @@ impl File for OpfsFile {
buffer: Arc<turso_core::Buffer>,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
assert!(
is_web_worker_safe(),
"opfs must be used only from web worker for now"
let web_worker = is_web_worker_safe();
tracing::debug!(
"pwrite({}, is_web_worker={}): pos={}",
self.handle,
web_worker,
pos
);
tracing::debug!("pwrite({}): pos={}", self.handle, pos);
let handle = self.handle;
let buffer = buffer.as_slice();
if web_worker {
let result = unsafe { write(handle, buffer.as_ptr(), buffer.len(), pos as i32) };
c.complete(result as i32);
} else {
let completion_no = self.opfs.register_completion(c.clone());
unsafe {
write_async(
handle,
buffer.as_ptr(),
buffer.len(),
pos as i32,
completion_no,
)
};
}
Ok(c)
}
fn sync(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
assert!(
is_web_worker_safe(),
"opfs must be used only from web worker for now"
);
tracing::debug!("sync({})", self.handle);
let web_worker = is_web_worker_safe();
tracing::debug!("sync({}, is_web_worker={})", self.handle, web_worker);
let handle = self.handle;
if web_worker {
let result = unsafe { sync(handle) };
c.complete(result as i32);
} else {
let completion_no = self.opfs.register_completion(c.clone());
unsafe { sync_async(handle, completion_no) };
}
Ok(c)
}
@@ -224,14 +293,21 @@ impl File for OpfsFile {
len: u64,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
assert!(
is_web_worker_safe(),
"opfs must be used only from web worker for now"
let web_worker = is_web_worker_safe();
tracing::debug!(
"truncate({}, is_web_worker={}): len={}",
self.handle,
web_worker,
len
);
tracing::debug!("truncate({}): len={}", self.handle, len);
let handle = self.handle;
if web_worker {
let result = unsafe { truncate(handle, len as usize) };
c.complete(result as i32);
} else {
let completion_no = self.opfs.register_completion(c.clone());
unsafe { truncate_async(handle, len as usize, completion_no) };
}
Ok(c)
}

View File

@@ -10,8 +10,10 @@
//! - Iterating through query results
//! - Managing the I/O event loop
#[cfg(feature = "browser")]
// #[cfg(feature = "browser")]
pub mod browser;
// #[cfg(feature = "browser")]
use crate::browser::opfs;
use napi::bindgen_prelude::*;
use napi::{Env, Task};
@@ -76,10 +78,6 @@ pub(crate) fn init_tracing(level_filter: Option<String>) {
}
pub enum DbTask {
Batch {
conn: Arc<turso_core::Connection>,
sql: String,
},
Step {
stmt: Arc<RefCell<Option<turso_core::Statement>>>,
},
@@ -93,10 +91,6 @@ impl Task for DbTask {
fn compute(&mut self) -> Result<Self::Output> {
match self {
DbTask::Batch { conn, sql } => {
batch_sync(conn, sql)?;
Ok(0)
}
DbTask::Step { stmt } => step_sync(stmt),
}
}
@@ -107,20 +101,11 @@ impl Task for DbTask {
}
#[napi(object)]
#[derive(Clone)]
pub struct DatabaseOpts {
pub tracing: Option<String>,
}
fn batch_sync(conn: &Arc<turso_core::Connection>, sql: &str) -> napi::Result<()> {
conn.prepare_execute_batch(sql).map_err(|e| {
Error::new(
Status::GenericFailure,
format!("Failed to execute batch: {e}"),
)
})?;
Ok(())
}
fn step_sync(stmt: &Arc<RefCell<Option<turso_core::Statement>>>) -> napi::Result<u32> {
let mut stmt_ref = stmt.borrow_mut();
let stmt = stmt_ref
@@ -152,21 +137,38 @@ impl Database {
/// # Arguments
/// * `path` - The path to the database file.
#[napi(constructor)]
pub fn new(path: String, opts: Option<DatabaseOpts>) -> Result<Self> {
if let Some(opts) = opts {
init_tracing(opts.tracing);
pub fn new_napi(path: String, opts: Option<DatabaseOpts>) -> Result<Self> {
Self::new(path, opts)
}
pub fn new(path: String, opts: Option<DatabaseOpts>) -> Result<Self> {
let io: Arc<dyn turso_core::IO> = if is_memory(&path) {
Arc::new(turso_core::MemoryIO::new())
} else {
#[cfg(not(feature = "browser"))]
{
Arc::new(turso_core::PlatformIO::new().map_err(|e| {
Error::new(Status::GenericFailure, format!("Failed to create IO: {e}"))
})?)
};
}
#[cfg(feature = "browser")]
if !is_memory(&path) {
return Err(Error::new(Status::GenericFailure, "sync constructor is not supported for FS-backed databases in the WASM. Use async connect(...) method instead".to_string()));
{
return Err(napi::Error::new(
napi::Status::GenericFailure,
"FS-backed db must be initialized through connectDbAsync function in the browser",
));
}
};
Self::new_io(path, io, opts)
}
pub fn new_io(
path: String,
io: Arc<dyn turso_core::IO>,
opts: Option<DatabaseOpts>,
) -> Result<Self> {
if let Some(opts) = opts {
init_tracing(opts.tracing);
}
let file = io
@@ -233,33 +235,6 @@ impl Database {
self.is_open.get()
}
/// Executes a batch of SQL statements on main thread
///
/// # Arguments
///
/// * `sql` - The SQL statements to execute.
///
/// # Returns
#[napi]
pub fn batch_sync(&self, sql: String) -> Result<()> {
batch_sync(&self.conn()?, &sql)
}
/// Executes a batch of SQL statements outside of main thread
///
/// # Arguments
///
/// * `sql` - The SQL statements to execute.
///
/// # Returns
#[napi(ts_return_type = "Promise<void>")]
pub fn batch_async(&self, sql: String) -> Result<AsyncTask<DbTask>> {
Ok(AsyncTask::new(DbTask::Batch {
conn: self.conn()?.clone(),
sql,
}))
}
/// Prepares a statement for execution.
///
/// # Arguments
@@ -325,8 +300,8 @@ impl Database {
#[napi]
pub fn close(&mut self) -> Result<()> {
self.is_open.set(false);
let _ = self._db.take();
let _ = self.conn.take().unwrap();
let _ = self._db.take();
Ok(())
}

View File

@@ -15,26 +15,6 @@ export declare class Database {
get path(): string
/** Returns whether the database connection is open. */
get open(): boolean
/**
* Executes a batch of SQL statements on main thread
*
* # Arguments
*
* * `sql` - The SQL statements to execute.
*
* # Returns
*/
batchSync(sql: string): void
/**
* Executes a batch of SQL statements outside of main thread
*
* # Arguments
*
* * `sql` - The SQL statements to execute.
*
* # Returns
*/
batchAsync(sql: string): Promise<void>
/**
* Prepares a statement for execution.
*
@@ -93,6 +73,16 @@ export declare class Database {
ioLoopAsync(): Promise<void>
}
export declare class Opfs {
constructor()
connectDb(path: string, opts?: DatabaseOpts | undefined | null): Promise<unknown>
complete(completionNo: number, result: number): void
}
export declare class OpfsFile {
}
/** A prepared statement. */
export declare class Statement {
reset(): void
@@ -149,6 +139,12 @@ export declare class Statement {
export interface DatabaseOpts {
tracing?: string
}
/**
* turso-db in the the browser requires explicit thread pool initialization
* so, we just put no-op task on the thread pool and force emnapi to allocate web worker
*/
export declare function initThreadPool(): Promise<unknown>
export declare class GeneratorHolder {
resumeSync(error?: string | undefined | null): GeneratorResponse
resumeAsync(error?: string | undefined | null): Promise<unknown>
@@ -220,7 +216,7 @@ export type DatabaseRowTransformResultJs =
export type GeneratorResponse =
| { type: 'IO' }
| { type: 'Done' }
| { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime: number, lastPushUnixTime?: number }
| { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime: number, lastPushUnixTime?: number, revision?: string }
export type JsProtocolRequest =
| { type: 'Http', method: string, path: string, body?: Array<number>, headers: Array<[string, string]> }

View File

@@ -508,9 +508,12 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}
const { Database, Statement, GeneratorHolder, JsDataCompletion, JsProtocolIo, JsProtocolRequestBytes, SyncEngine, DatabaseChangeTypeJs, SyncEngineProtocolVersion } = nativeBinding
const { Database, Opfs, OpfsFile, Statement, initThreadPool, GeneratorHolder, JsDataCompletion, JsProtocolIo, JsProtocolRequestBytes, SyncEngine, DatabaseChangeTypeJs, SyncEngineProtocolVersion } = nativeBinding
export { Database }
export { Opfs }
export { OpfsFile }
export { Statement }
export { initThreadPool }
export { GeneratorHolder }
export { JsDataCompletion }
export { JsProtocolIo }