From 98db727a999f586ab20b7a3924050a688ded12af Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 10 Nov 2025 15:35:29 +0400 Subject: [PATCH] integrate extra io stepping logic to the JS bindings --- .../javascript/packages/common/promise.ts | 24 +++-- .../javascript/sync/packages/common/index.ts | 16 +++- .../javascript/sync/packages/common/run.ts | 30 ++++-- .../javascript/sync/packages/common/types.ts | 4 + .../sync/packages/native/index.d.ts | 4 +- .../javascript/sync/packages/native/index.js | 92 +++++++++---------- .../sync/packages/native/promise.ts | 32 ++++--- .../javascript/sync/src/js_protocol_io.rs | 28 +++++- bindings/javascript/sync/src/lib.rs | 6 ++ sync/engine/src/database_sync_operations.rs | 28 +++--- sync/engine/src/protocol_io.rs | 7 +- sync/engine/src/types.rs | 13 ++- 12 files changed, 181 insertions(+), 103 deletions(-) diff --git a/bindings/javascript/packages/common/promise.ts b/bindings/javascript/packages/common/promise.ts index 0ea8030b1..a1ed8a000 100644 --- a/bindings/javascript/packages/common/promise.ts +++ b/bindings/javascript/packages/common/promise.ts @@ -37,13 +37,15 @@ class Database { inTransaction: boolean; private db: NativeDatabase; + private ioStep: () => Promise; private execLock: AsyncLock; private _inTransaction: boolean = false; protected connected: boolean = false; - constructor(db: NativeDatabase) { + constructor(db: NativeDatabase, ioStep?: () => Promise) { this.db = db; this.execLock = new AsyncLock(); + this.ioStep = ioStep ?? (async () => { }); Object.defineProperties(this, { name: { get: () => this.db.path }, readonly: { get: () => this.db.readonly }, @@ -74,9 +76,9 @@ class Database { try { if (this.connected) { - return new Statement(maybeValue(this.db.prepare(sql)), this.db, this.execLock); + return new Statement(maybeValue(this.db.prepare(sql)), this.db, this.execLock, this.ioStep); } else { - return new Statement(maybePromise(() => this.connect().then(() => this.db.prepare(sql))), this.db, this.execLock) + return new Statement(maybePromise(() => this.connect().then(() => this.db.prepare(sql))), this.db, this.execLock, this.ioStep) } } catch (err) { throw convertError(err); @@ -185,6 +187,7 @@ class Database { const stepResult = exec.stepSync(); if (stepResult === STEP_IO) { await this.db.ioLoopAsync(); + await this.ioStep(); continue; } if (stepResult === STEP_DONE) { @@ -280,11 +283,13 @@ class Statement { private stmt: MaybeLazy; private db: NativeDatabase; private execLock: AsyncLock; + private ioStep: () => Promise; - constructor(stmt: MaybeLazy, db: NativeDatabase, execLock: AsyncLock) { + constructor(stmt: MaybeLazy, db: NativeDatabase, execLock: AsyncLock, ioStep: () => Promise) { this.stmt = stmt; this.db = db; this.execLock = execLock; + this.ioStep = ioStep; } /** @@ -352,7 +357,7 @@ class Statement { while (true) { const stepResult = await stmt.stepSync(); if (stepResult === STEP_IO) { - await this.db.ioLoopAsync(); + await this.io(); continue; } if (stepResult === STEP_DONE) { @@ -389,7 +394,7 @@ class Statement { while (true) { const stepResult = await stmt.stepSync(); if (stepResult === STEP_IO) { - await this.db.ioLoopAsync(); + await this.io(); continue; } if (stepResult === STEP_DONE) { @@ -453,7 +458,7 @@ class Statement { while (true) { const stepResult = await stmt.stepSync(); if (stepResult === STEP_IO) { - await this.db.ioLoopAsync(); + await this.io(); continue; } if (stepResult === STEP_DONE) { @@ -471,6 +476,11 @@ class Statement { } } + async io() { + await this.db.ioLoopAsync(); + await this.ioStep(); + } + /** * Interrupts the statement. */ diff --git a/bindings/javascript/sync/packages/common/index.ts b/bindings/javascript/sync/packages/common/index.ts index 5facb2dc9..0b5ce37fa 100644 --- a/bindings/javascript/sync/packages/common/index.ts +++ b/bindings/javascript/sync/packages/common/index.ts @@ -1,7 +1,17 @@ -import { run, memoryIO, SyncEngineGuards } from "./run.js" -import { DatabaseOpts, ProtocolIo, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, DatabaseChangeType, EncryptionOpts } from "./types.js" +import { run, memoryIO, runner, SyncEngineGuards, Runner } from "./run.js" +import { + DatabaseOpts, + ProtocolIo, + RunOpts, + DatabaseRowMutation, + DatabaseRowStatement, + DatabaseRowTransformResult, + DatabaseStats, + DatabaseChangeType, + EncryptionOpts, +} from "./types.js" -export { run, memoryIO, SyncEngineGuards } +export { run, memoryIO, runner, SyncEngineGuards, Runner } export type { DatabaseStats, DatabaseOpts, diff --git a/bindings/javascript/sync/packages/common/run.ts b/bindings/javascript/sync/packages/common/run.ts index 95d47612f..a13c6b98d 100644 --- a/bindings/javascript/sync/packages/common/run.ts +++ b/bindings/javascript/sync/packages/common/run.ts @@ -116,9 +116,28 @@ export function memoryIO(): ProtocolIo { } }; +export interface Runner { + wait(): Promise; +} -export async function run(opts: RunOpts, io: ProtocolIo, engine: any, generator: any): Promise { +export function runner(opts: RunOpts, io: ProtocolIo, engine: any): Runner { let tasks = []; + return { + async wait() { + for (let request = engine.protocolIo(); request != null; request = engine.protocolIo()) { + tasks.push(trackPromise(process(opts, io, request))); + } + const tasksRace = tasks.length == 0 ? Promise.resolve() : Promise.race([timeoutMs(opts.preemptionMs), ...tasks.map(t => t.promise)]); + await Promise.all([engine.ioLoopAsync(), tasksRace]); + + tasks = tasks.filter(t => !t.finished); + + engine.protocolIoStep(); + }, + } +} + +export async function run(runner: Runner, generator: any): Promise { while (true) { const { type, ...rest }: GeneratorResponse = await generator.resumeAsync(null); if (type == 'Done') { @@ -131,14 +150,7 @@ export async function run(opts: RunOpts, io: ProtocolIo, engine: any, generator: //@ts-ignore return rest.changes; } - for (let request = engine.protocolIo(); request != null; request = engine.protocolIo()) { - tasks.push(trackPromise(process(opts, io, request))); - } - - const tasksRace = tasks.length == 0 ? Promise.resolve() : Promise.race([timeoutMs(opts.preemptionMs), ...tasks.map(t => t.promise)]); - await Promise.all([engine.ioLoopAsync(), tasksRace]); - - tasks = tasks.filter(t => !t.finished); + await runner.wait(); } } diff --git a/bindings/javascript/sync/packages/common/types.ts b/bindings/javascript/sync/packages/common/types.ts index 98f0f0c69..fc57b534f 100644 --- a/bindings/javascript/sync/packages/common/types.ts +++ b/bindings/javascript/sync/packages/common/types.ts @@ -107,6 +107,10 @@ export interface DatabaseOpts { * optional parameter to enable internal logging for the database */ tracing?: 'error' | 'warn' | 'info' | 'debug' | 'trace', + /** + * optional parameter to enable partial sync for the database + */ + partial?: boolean; } export interface DatabaseStats { /** diff --git a/bindings/javascript/sync/packages/native/index.d.ts b/bindings/javascript/sync/packages/native/index.d.ts index 289cddbbb..6550f730d 100644 --- a/bindings/javascript/sync/packages/native/index.d.ts +++ b/bindings/javascript/sync/packages/native/index.d.ts @@ -163,7 +163,7 @@ export declare class JsDataCompletion { } export declare class JsProtocolIo { - takeRequest(): JsProtocolRequestBytes | null + } export declare class JsProtocolRequestBytes { @@ -178,6 +178,7 @@ export declare class SyncEngine { /** Runs the I/O loop asynchronously, returning a Promise. */ ioLoopAsync(): Promise protocolIo(): JsProtocolRequestBytes | null + protocolIoStep(): void push(): GeneratorHolder stats(): GeneratorHolder wait(): GeneratorHolder @@ -240,6 +241,7 @@ export interface SyncEngineOpts { protocolVersion?: SyncEngineProtocolVersion bootstrapIfEmpty: boolean remoteEncryption?: string + partial?: boolean } export declare const enum SyncEngineProtocolVersion { diff --git a/bindings/javascript/sync/packages/native/index.js b/bindings/javascript/sync/packages/native/index.js index ddc3d3f36..a5b9f5b18 100644 --- a/bindings/javascript/sync/packages/native/index.js +++ b/bindings/javascript/sync/packages/native/index.js @@ -81,8 +81,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-android-arm64') const bindingPackageVersion = require('@tursodatabase/sync-android-arm64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -97,8 +97,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-android-arm-eabi') const bindingPackageVersion = require('@tursodatabase/sync-android-arm-eabi/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -117,8 +117,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-win32-x64-msvc') const bindingPackageVersion = require('@tursodatabase/sync-win32-x64-msvc/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -133,8 +133,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-win32-ia32-msvc') const bindingPackageVersion = require('@tursodatabase/sync-win32-ia32-msvc/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -149,8 +149,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-win32-arm64-msvc') const bindingPackageVersion = require('@tursodatabase/sync-win32-arm64-msvc/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -168,8 +168,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-darwin-universal') const bindingPackageVersion = require('@tursodatabase/sync-darwin-universal/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -184,8 +184,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-darwin-x64') const bindingPackageVersion = require('@tursodatabase/sync-darwin-x64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -200,8 +200,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-darwin-arm64') const bindingPackageVersion = require('@tursodatabase/sync-darwin-arm64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -220,8 +220,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-freebsd-x64') const bindingPackageVersion = require('@tursodatabase/sync-freebsd-x64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -236,8 +236,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-freebsd-arm64') const bindingPackageVersion = require('@tursodatabase/sync-freebsd-arm64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -257,8 +257,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-x64-musl') const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-musl/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -273,8 +273,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-x64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -291,8 +291,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm64-musl') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-musl/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -307,8 +307,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -325,8 +325,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm-musleabihf') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-musleabihf/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -341,8 +341,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm-gnueabihf') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-gnueabihf/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -359,8 +359,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-riscv64-musl') const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-musl/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -375,8 +375,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-riscv64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -392,8 +392,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-ppc64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-ppc64-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -408,8 +408,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-s390x-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-s390x-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -428,8 +428,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-openharmony-arm64') const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -444,8 +444,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-openharmony-x64') const bindingPackageVersion = require('@tursodatabase/sync-openharmony-x64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -460,8 +460,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-openharmony-arm') const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { diff --git a/bindings/javascript/sync/packages/native/promise.ts b/bindings/javascript/sync/packages/native/promise.ts index 1df3cfb21..a75e13edf 100644 --- a/bindings/javascript/sync/packages/native/promise.ts +++ b/bindings/javascript/sync/packages/native/promise.ts @@ -1,5 +1,5 @@ import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, Database as NativeDatabase } from "#index"; import { promises } from "node:fs"; @@ -40,10 +40,9 @@ function memoryIO(): ProtocolIo { } }; class Database extends DatabasePromise { - #runOpts: RunOpts; #engine: any; - #io: ProtocolIo; - #guards: SyncEngineGuards + #guards: SyncEngineGuards; + #runner: Runner; constructor(opts: DatabaseOpts) { if (opts.url == null) { super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any); @@ -60,8 +59,8 @@ class Database extends DatabasePromise { tracing: opts.tracing, bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, + partial: opts.partial, }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +82,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : NodeIO; + const run = runner(runOpts, io, engine); + + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? memoryIO() : NodeIO; this.#guards = new SyncEngineGuards(); } /** @@ -102,7 +108,7 @@ class Database extends DatabasePromise { } else if (this.#engine == null) { await super.connect(); } else { - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -115,11 +121,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -130,7 +136,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -139,7 +145,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -148,7 +154,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database diff --git a/bindings/javascript/sync/src/js_protocol_io.rs b/bindings/javascript/sync/src/js_protocol_io.rs index bbe68b66d..384635856 100644 --- a/bindings/javascript/sync/src/js_protocol_io.rs +++ b/bindings/javascript/sync/src/js_protocol_io.rs @@ -242,27 +242,47 @@ impl ProtocolIO for JsProtocolIo { })) } - fn register(&self, callback: Box bool>) { - tracing::info!("register callback in the ProtocolIo"); + fn add_work(&self, callback: Box bool + Send + Sync>) { + let mut work = self.work.lock().unwrap(); + work.push_back(callback); + } + + fn step_work(&self) { + let mut items = { + let mut work = self.work.lock().unwrap(); + work.drain(..).collect::>() + }; + let length = items.len(); + for _ in 0..length { + let mut item = items.pop_front().unwrap(); + if item() { + continue; + } + items.push_back(item); + } + { + let mut work = self.work.lock().unwrap(); + work.extend(items); + } } } #[napi] pub struct JsProtocolIo { requests: Mutex>, + work: Mutex bool + Send + Sync>>>, } impl Default for JsProtocolIo { fn default() -> Self { Self { requests: Mutex::new(Vec::new()), + work: Mutex::new(VecDeque::new()), } } } -#[napi] impl JsProtocolIo { - #[napi] pub fn take_request(&self) -> Option { self.requests.lock().unwrap().pop() } diff --git a/bindings/javascript/sync/src/lib.rs b/bindings/javascript/sync/src/lib.rs index 64be75f1a..7730312c0 100644 --- a/bindings/javascript/sync/src/lib.rs +++ b/bindings/javascript/sync/src/lib.rs @@ -14,6 +14,7 @@ use napi_derive::napi; use turso_node::{DatabaseOpts, IoLoopTask}; use turso_sync_engine::{ database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts}, + protocol_io::ProtocolIO, types::{Coro, DatabaseChangeType, DatabaseSyncEngineProtocolVersion}, }; @@ -304,6 +305,11 @@ impl SyncEngine { Ok(self.protocol()?.take_request()) } + #[napi] + pub fn protocol_io_step(&self) -> napi::Result<()> { + Ok(self.protocol()?.step_work()) + } + #[napi] pub fn push(&self) -> GeneratorHolder { self.run(async move |coro, guard| { diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index e9c3831ef..0c79bde36 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -306,7 +306,7 @@ pub async fn pull_pages_v1( server_revision: &str, pages: &[u32], ) -> Result> { - tracing::info!("pull_pages_v1: revision={server_revision}"); + tracing::info!("pull_pages_v1: revision={server_revision}, pages={pages:?}"); let mut bytes = BytesMut::new(); let mut bitmap = RoaringBitmap::new(); @@ -1092,15 +1092,21 @@ pub async fn bootstrap_db_file_v1( main_db_path: &str, prefix: Option, ) -> Result { - let mut bitmap = RoaringBitmap::new(); - if let Some(prefix) = prefix { - bitmap.insert_range(0..(prefix / PAGE_SIZE) as u32); - } - - let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size()); - bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| { - Error::DatabaseSyncEngineError(format!("unable to serialize bootstrap request: {e}")) - })?; + let bitmap_bytes = { + if let Some(prefix) = prefix { + let mut bitmap = RoaringBitmap::new(); + bitmap.insert_range(0..(prefix / PAGE_SIZE) as u32); + let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size()); + bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| { + Error::DatabaseSyncEngineError(format!( + "unable to serialize bootstrap request: {e}" + )) + })?; + bitmap_bytes + } else { + Vec::new() + } + }; let request = PullUpdatesReqProtoBody { encoding: PageUpdatesEncodingReq::Raw as i32, @@ -1525,8 +1531,6 @@ mod tests { fn is_done(&self) -> crate::Result { Ok(self.data.borrow().is_empty()) } - - fn set_callback(&self, callback: Box ()>) {} } #[test] diff --git a/sync/engine/src/protocol_io.rs b/sync/engine/src/protocol_io.rs index 369b82112..35a580efd 100644 --- a/sync/engine/src/protocol_io.rs +++ b/sync/engine/src/protocol_io.rs @@ -3,11 +3,11 @@ use crate::{ Result, }; -pub trait DataPollResult { +pub trait DataPollResult: Send + Sync + 'static { fn data(&self) -> &[T]; } -pub trait DataCompletion { +pub trait DataCompletion: Send + Sync + 'static { type DataPollResult: DataPollResult; fn status(&self) -> Result>; fn poll_data(&self) -> Result>; @@ -30,5 +30,6 @@ pub trait ProtocolIO: Send + Sync + 'static { body: Option>, headers: &[(&str, &str)], ) -> Result; - fn register(&self, callback: Box bool>); + fn add_work(&self, callback: Box bool + Send + Sync>); + fn step_work(&self); } diff --git a/sync/engine/src/types.rs b/sync/engine/src/types.rs index 0ffc7a7b6..c74431f2d 100644 --- a/sync/engine/src/types.rs +++ b/sync/engine/src/types.rs @@ -1,24 +1,27 @@ -use std::{cell::RefCell, collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; use serde::{Deserialize, Serialize}; use crate::{database_sync_operations::MutexSlot, errors::Error, Result}; pub struct Coro { - pub ctx: RefCell, + pub ctx: Mutex, gen: genawaiter::sync::Co>, } impl Coro { pub fn new(ctx: Ctx, gen: genawaiter::sync::Co>) -> Self { Self { - ctx: RefCell::new(ctx), + ctx: Mutex::new(ctx), gen, } } pub async fn yield_(&self, value: ProtocolCommand) -> Result<()> { let ctx = self.gen.yield_(value).await?; - self.ctx.replace(ctx); + *self.ctx.lock().unwrap() = ctx; Ok(()) } } @@ -27,7 +30,7 @@ impl From>> for Coro<()> { fn from(value: genawaiter::sync::Co>) -> Self { Self { gen: value, - ctx: RefCell::new(()), + ctx: Mutex::new(()), } } }