diff --git a/Cargo.lock b/Cargo.lock index 89ece1c11..50f3f8047 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5062,6 +5062,7 @@ dependencies = [ "futures", "genawaiter", "http", + "libc", "prost 0.14.1", "rand 0.9.2", "rand_chacha 0.9.0", diff --git a/bindings/javascript/packages/common/promise.ts b/bindings/javascript/packages/common/promise.ts index 0ea8030b1..a1ed8a000 100644 --- a/bindings/javascript/packages/common/promise.ts +++ b/bindings/javascript/packages/common/promise.ts @@ -37,13 +37,15 @@ class Database { inTransaction: boolean; private db: NativeDatabase; + private ioStep: () => Promise<void>; private execLock: AsyncLock; private _inTransaction: boolean = false; protected connected: boolean = false; - constructor(db: NativeDatabase) { + constructor(db: NativeDatabase, ioStep?: () => Promise<void>) { this.db = db; this.execLock = new AsyncLock(); + this.ioStep = ioStep ?? (async () => { }); Object.defineProperties(this, { name: { get: () => this.db.path }, readonly: { get: () => this.db.readonly }, @@ -74,9 +76,9 @@ class Database { try { if (this.connected) { - return new Statement(maybeValue(this.db.prepare(sql)), this.db, this.execLock); + return new Statement(maybeValue(this.db.prepare(sql)), this.db, this.execLock, this.ioStep); } else { - return new Statement(maybePromise(() => this.connect().then(() => this.db.prepare(sql))), this.db, this.execLock) + return new Statement(maybePromise(() => this.connect().then(() => this.db.prepare(sql))), this.db, this.execLock, this.ioStep) } } catch (err) { throw convertError(err); @@ -185,6 +187,7 @@ class Database { const stepResult = exec.stepSync(); if (stepResult === STEP_IO) { await this.db.ioLoopAsync(); + await this.ioStep(); continue; } if (stepResult === STEP_DONE) { @@ -280,11 +283,13 @@ class Statement { private stmt: MaybeLazy; private db: NativeDatabase; private execLock: AsyncLock; + private ioStep: () => Promise<void>; - constructor(stmt: MaybeLazy, db: NativeDatabase, execLock: AsyncLock) { + constructor(stmt: MaybeLazy, db: NativeDatabase, execLock: AsyncLock, ioStep: () => Promise<void>) { this.stmt = stmt; this.db = db; this.execLock = execLock; + this.ioStep = ioStep; } /** @@ -352,7 +357,7 @@ class Statement { while (true) { const stepResult = await stmt.stepSync(); if (stepResult === STEP_IO) { - await this.db.ioLoopAsync(); + await this.io(); continue; } if (stepResult === STEP_DONE) { @@ -389,7 +394,7 @@ class Statement { while (true) { const stepResult = await stmt.stepSync(); if (stepResult === STEP_IO) { - await this.db.ioLoopAsync(); + await this.io(); continue; } if (stepResult === STEP_DONE) { @@ -453,7 +458,7 @@ class Statement { while (true) { const stepResult = await stmt.stepSync(); if (stepResult === STEP_IO) { - await this.db.ioLoopAsync(); + await this.io(); continue; } if (stepResult === STEP_DONE) { @@ -471,6 +476,11 @@ } } + async io() { + await this.db.ioLoopAsync(); + await this.ioStep(); + } + /** * Interrupts the statement. */
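With this change every STEP_IO tick pumps the optional ioStep hook right after the engine's own I/O loop. A minimal sketch of the resulting contract, assuming the STEP_IO/STEP_DONE constants from the surrounding module (driveToCompletion is a hypothetical helper name):

// Hedged sketch of the stepping contract introduced above; not part of the diff.
async function driveToCompletion(
  stmt: { stepSync(): number },
  db: { ioLoopAsync(): Promise<void> },
  ioStep: () => Promise<void>, // defaults to a no-op when no sync engine is attached
): Promise<void> {
  while (true) {
    const stepResult = stmt.stepSync();
    if (stepResult === STEP_IO) {
      await db.ioLoopAsync(); // drain engine-level I/O first
      await ioStep();         // then let the sync runner pump protocol work
      continue;
    }
    if (stepResult === STEP_DONE) return;
  }
}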
diff --git a/bindings/javascript/sync/packages/common/index.ts index 5facb2dc9..0b5ce37fa 100644 --- a/bindings/javascript/sync/packages/common/index.ts +++ b/bindings/javascript/sync/packages/common/index.ts @@ -1,7 +1,17 @@ -import { run, memoryIO, SyncEngineGuards } from "./run.js" -import { DatabaseOpts, ProtocolIo, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, DatabaseChangeType, EncryptionOpts } from "./types.js" +import { run, memoryIO, runner, SyncEngineGuards, Runner } from "./run.js" +import { + DatabaseOpts, + ProtocolIo, + RunOpts, + DatabaseRowMutation, + DatabaseRowStatement, + DatabaseRowTransformResult, + DatabaseStats, + DatabaseChangeType, + EncryptionOpts, +} from "./types.js" -export { run, memoryIO, SyncEngineGuards } +export { run, memoryIO, runner, SyncEngineGuards, Runner } export type { DatabaseStats, DatabaseOpts, diff --git a/bindings/javascript/sync/packages/common/run.ts index 95d47612f..a13c6b98d 100644 --- a/bindings/javascript/sync/packages/common/run.ts +++ b/bindings/javascript/sync/packages/common/run.ts @@ -116,9 +116,28 @@ export function memoryIO(): ProtocolIo { } }; +export interface Runner { + wait(): Promise<void>; +} -export async function run(opts: RunOpts, io: ProtocolIo, engine: any, generator: any): Promise<any> { +export function runner(opts: RunOpts, io: ProtocolIo, engine: any): Runner { let tasks = []; + return { + async wait() { + for (let request = engine.protocolIo(); request != null; request = engine.protocolIo()) { + tasks.push(trackPromise(process(opts, io, request))); + } + const tasksRace = tasks.length == 0 ? Promise.resolve() : Promise.race([timeoutMs(opts.preemptionMs), ...tasks.map(t => t.promise)]); + await Promise.all([engine.ioLoopAsync(), tasksRace]); + + tasks = tasks.filter(t => !t.finished); + + engine.protocolIoStep(); + }, + } +} + +export async function run(runner: Runner, generator: any): Promise<any> { while (true) { const { type, ...rest }: GeneratorResponse = await generator.resumeAsync(null); if (type == 'Done') { @@ -131,14 +150,7 @@ export async function run(opts: RunOpts, io: ProtocolIo, engine: any, generator: //@ts-ignore return rest.changes; } - for (let request = engine.protocolIo(); request != null; request = engine.protocolIo()) { - tasks.push(trackPromise(process(opts, io, request))); - } - - const tasksRace = tasks.length == 0 ? Promise.resolve() : Promise.race([timeoutMs(opts.preemptionMs), ...tasks.map(t => t.promise)]); - await Promise.all([engine.ioLoopAsync(), tasksRace]); - - tasks = tasks.filter(t => !t.finished); + await runner.wait(); } }
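The refactor splits the old run() into a reusable protocol pump (runner) and a thin generator driver (run). A hedged usage sketch, assuming opts/io/engine values like those threaded through the code above:

// Sketch only: one Runner per engine, shared by all generators and by the ioStep hook.
const r: Runner = runner(opts, io, engine); // opts: RunOpts, io: ProtocolIo, engine: the native SyncEngine
const changes = await run(r, engine.wait()); // run() calls r.wait() whenever the generator reports IO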
diff --git a/bindings/javascript/sync/packages/common/types.ts index 98f0f0c69..06a48142e 100644 --- a/bindings/javascript/sync/packages/common/types.ts +++ b/bindings/javascript/sync/packages/common/types.ts @@ -107,6 +107,10 @@ export interface DatabaseOpts { * optional parameter to enable internal logging for the database */ tracing?: 'error' | 'warn' | 'info' | 'debug' | 'trace', + /** + * optional parameter to enable partial sync for the database + */ + partialBootstrapStrategy?: { kind: 'prefix', length: number } | { kind: 'query', query: string }; } export interface DatabaseStats { /** * @@ -134,6 +138,14 @@ * (can be used as e-tag, but string must not be interpreted in any way and must be used as opaque value) */ revision: string | null; + /** + * total number of bytes sent over the network + */ + networkSentBytes: number; + /** + * total number of bytes received over the network + */ + networkReceivedBytes: number; } /* internal types used in the native/browser packages */
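The two new counters make network usage observable from application code; a small hedged sketch (db is a connected sync database, and pull() stands in for whichever sync call is used):

// Sketch: measure traffic attributable to one sync round-trip.
const before: DatabaseStats = await db.stats();
await db.pull(); // hypothetical call; push() or wait() would work the same way
const after: DatabaseStats = await db.stats();
console.log(`sent=${after.networkSentBytes - before.networkSentBytes} received=${after.networkReceivedBytes - before.networkReceivedBytes}`);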
diff --git a/bindings/javascript/sync/packages/native/index.d.ts index 289cddbbb..4073cdd1e 100644 --- a/bindings/javascript/sync/packages/native/index.d.ts +++ b/bindings/javascript/sync/packages/native/index.d.ts @@ -163,7 +163,7 @@ export declare class JsDataCompletion { } export declare class JsProtocolIo { - takeRequest(): JsProtocolRequestBytes | null + } export declare class JsProtocolRequestBytes { @@ -178,6 +178,7 @@ export declare class SyncEngine { /** Runs the I/O loop asynchronously, returning a Promise. */ ioLoopAsync(): Promise<void> protocolIo(): JsProtocolRequestBytes | null + protocolIoStep(): void push(): GeneratorHolder stats(): GeneratorHolder wait(): GeneratorHolder @@ -220,9 +221,13 @@ export type DatabaseRowTransformResultJs = export type GeneratorResponse = | { type: 'IO' } | { type: 'Done' } - | { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime?: number, lastPushUnixTime?: number, revision?: string } + | { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime?: number, lastPushUnixTime?: number, revision?: string, networkSentBytes: number, networkReceivedBytes: number } | { type: 'SyncEngineChanges', changes: SyncEngineChanges } +export type JsPartialBootstrapStrategy = + | { type: 'Prefix', length: number } + | { type: 'Query', query: string } + export type JsProtocolRequest = | { type: 'Http', method: string, path: string, body?: Array<number>, headers: Array<[string, string]> } | { type: 'FullRead', path: string } @@ -240,6 +245,7 @@ export interface SyncEngineOpts { protocolVersion?: SyncEngineProtocolVersion bootstrapIfEmpty: boolean remoteEncryption?: string + partialBootstrapStrategy?: JsPartialBootstrapStrategy } export declare const enum SyncEngineProtocolVersion { diff --git a/bindings/javascript/sync/packages/native/index.js index ddc3d3f36..a5b9f5b18 100644 --- a/bindings/javascript/sync/packages/native/index.js +++ b/bindings/javascript/sync/packages/native/index.js @@ -81,8 +81,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-android-arm64') const bindingPackageVersion = require('@tursodatabase/sync-android-arm64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -97,8 +97,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-android-arm-eabi') const bindingPackageVersion = require('@tursodatabase/sync-android-arm-eabi/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. 
You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -117,8 +117,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-win32-x64-msvc') const bindingPackageVersion = require('@tursodatabase/sync-win32-x64-msvc/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -133,8 +133,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-win32-ia32-msvc') const bindingPackageVersion = require('@tursodatabase/sync-win32-ia32-msvc/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -149,8 +149,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-win32-arm64-msvc') const bindingPackageVersion = require('@tursodatabase/sync-win32-arm64-msvc/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -168,8 +168,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-darwin-universal') const bindingPackageVersion = require('@tursodatabase/sync-darwin-universal/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. 
You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -184,8 +184,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-darwin-x64') const bindingPackageVersion = require('@tursodatabase/sync-darwin-x64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -200,8 +200,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-darwin-arm64') const bindingPackageVersion = require('@tursodatabase/sync-darwin-arm64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -220,8 +220,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-freebsd-x64') const bindingPackageVersion = require('@tursodatabase/sync-freebsd-x64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -236,8 +236,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-freebsd-arm64') const bindingPackageVersion = require('@tursodatabase/sync-freebsd-arm64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. 
You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -257,8 +257,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-x64-musl') const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-musl/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -273,8 +273,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-x64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -291,8 +291,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm64-musl') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-musl/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -307,8 +307,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. 
You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -325,8 +325,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm-musleabihf') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-musleabihf/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -341,8 +341,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm-gnueabihf') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-gnueabihf/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -359,8 +359,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-riscv64-musl') const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-musl/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -375,8 +375,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-riscv64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. 
You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -392,8 +392,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-ppc64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-ppc64-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -408,8 +408,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-s390x-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-s390x-gnu/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -428,8 +428,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-openharmony-arm64') const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -444,8 +444,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-openharmony-x64') const bindingPackageVersion = require('@tursodatabase/sync-openharmony-x64/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. 
You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -460,8 +460,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-openharmony-arm') const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm/package.json').version - if (bindingPackageVersion !== '0.3.0-pre.4' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.3.0-pre.4 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.4.0-pre.1' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.4.0-pre.1 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { diff --git a/bindings/javascript/sync/packages/native/promise.test.ts index ea95e0f13..2874152b2 100644 --- a/bindings/javascript/sync/packages/native/promise.test.ts +++ b/bindings/javascript/sync/packages/native/promise.test.ts @@ -13,6 +13,77 @@ function cleanup(path) { try { unlinkSync(`${path}-wal-revert`) } catch (e) { } } +test('partial sync (prefix bootstrap strategy)', async () => { + { + const db = await connect({ + path: ':memory:', + url: process.env.VITE_TURSO_DB_URL, + longPollTimeoutMs: 100, + }); + await db.exec("CREATE TABLE IF NOT EXISTS partial(value BLOB)"); + await db.exec("DELETE FROM partial"); + await db.exec("INSERT INTO partial SELECT randomblob(1024) FROM generate_series(1, 2000)"); + await db.push(); + await db.close(); + } + + const db = await connect({ + path: ':memory:', + url: process.env.VITE_TURSO_DB_URL, + longPollTimeoutMs: 100, + partialBootstrapStrategy: { kind: 'prefix', length: 128 * 1024 }, + }); + + // 128 pages plus some overhead (very rough estimate) + expect((await db.stats()).networkReceivedBytes).toBeLessThanOrEqual(128 * (4096 + 128)); + + // selecting one record shouldn't increase the amount of received data + expect(await db.prepare("SELECT length(value) as length FROM partial LIMIT 1").all()).toEqual([{ length: 1024 }]); + expect((await db.stats()).networkReceivedBytes).toBeLessThanOrEqual(128 * (4096 + 128)); + + await db.prepare("INSERT INTO partial VALUES (-1)").run(); + + expect(await db.prepare("SELECT COUNT(*) as cnt FROM partial").all()).toEqual([{ cnt: 2001 }]); + expect((await db.stats()).networkReceivedBytes).toBeGreaterThanOrEqual(2000 * 1024); +}) + +test('partial sync (query bootstrap strategy)', async () => { + { + const db = await connect({ + path: ':memory:', + url: process.env.VITE_TURSO_DB_URL, + longPollTimeoutMs: 100, + }); + await db.exec("CREATE TABLE IF NOT EXISTS partial_keyed(key INTEGER PRIMARY KEY, value BLOB)"); + await db.exec("DELETE FROM partial_keyed"); + await db.exec("INSERT INTO partial_keyed SELECT value, randomblob(1024) FROM generate_series(1, 2000)"); + await db.push(); + await db.close(); + } + + const db = await connect({ + path: ':memory:', + url: process.env.VITE_TURSO_DB_URL, + longPollTimeoutMs: 100, + partialBootstrapStrategy: { kind: 'query', query: 'SELECT * FROM partial_keyed WHERE key = 1000' }, + }); + + // we must sync only a few pages + expect((await db.stats()).networkReceivedBytes).toBeLessThanOrEqual(10 * (4096 + 128)); + + // selecting one record shouldn't increase the amount of received data by much + expect(await db.prepare("SELECT length(value) as length FROM partial_keyed LIMIT 1").all()).toEqual([{ length: 1024 }]); + expect((await db.stats()).networkReceivedBytes).toBeLessThanOrEqual(10 * (4096 + 128)); + + await db.prepare("INSERT INTO partial_keyed VALUES (-1, -1)").run(); + const n1 = await db.stats(); + + // same query as the bootstrap query, so we shouldn't fetch any more pages + expect(await db.prepare("SELECT length(value) as length FROM partial_keyed WHERE key = 1000").all()).toEqual([{ length: 1024 }]); + const n2 = await db.stats(); + expect(n1.networkReceivedBytes).toEqual(n2.networkReceivedBytes); +}) test('concurrent-actions-consistency', async () => { { const db = await connect({
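The byte budgets in these assertions are deliberately loose. The prefix strategy bootstraps 128 * 1024 = 131,072 bytes (32 pages of 4,096 bytes), while the bound allows 128 * (4096 + 128) = 540,672 bytes, assuming a 4 KiB page plus roughly 128 bytes of framing overhead per page:

// Sketch of the arithmetic behind the assertions above; constants are assumptions, not API.
const PAGE_SIZE = 4096;        // page size the tests assume
const PER_PAGE_OVERHEAD = 128; // rough allowance for protocol framing
const budget = 128 * (PAGE_SIZE + PER_PAGE_OVERHEAD); // 540_672 bytes
const prefixBytes = 128 * 1024; // bootstrap prefix: 32 pages, well under the budget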
diff --git a/bindings/javascript/sync/packages/native/promise.ts index 1df3cfb21..c494e6e7c 100644 --- a/bindings/javascript/sync/packages/native/promise.ts +++ b/bindings/javascript/sync/packages/native/promise.ts @@ -1,5 +1,5 @@ import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, Database as NativeDatabase } from "#index"; import { promises } from "node:fs"; @@ -40,10 +40,9 @@ function memoryIO(): ProtocolIo { } }; class Database extends DatabasePromise { - #runOpts: RunOpts; #engine: any; - #io: ProtocolIo; - #guards: SyncEngineGuards + #guards: SyncEngineGuards; + #runner: Runner; constructor(opts: DatabaseOpts) { if (opts.url == null) { super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any); return; } + let partialBootstrapStrategy = undefined; + if (opts.partialBootstrapStrategy != null) { + switch (opts.partialBootstrapStrategy.kind) { + case "prefix": + partialBootstrapStrategy = { type: "Prefix", length: opts.partialBootstrapStrategy.length }; + break; + case "query": + partialBootstrapStrategy = { type: "Query", query: opts.partialBootstrapStrategy.query }; + break; + } + } const engine = new SyncEngine({ path: opts.path, clientName: opts.clientName, tracing: opts.tracing, bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, + partialBootstrapStrategy: partialBootstrapStrategy }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +93,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : NodeIO; + const run = runner(runOpts, io, engine); + + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? 
memoryIO() : NodeIO; this.#guards = new SyncEngineGuards(); } /** @@ -102,7 +119,7 @@ class Database extends DatabasePromise { } else if (this.#engine == null) { await super.connect(); } else { - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -115,11 +132,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -130,7 +147,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -139,7 +156,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -148,7 +165,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database diff --git a/bindings/javascript/sync/packages/wasm/promise-bundle.ts b/bindings/javascript/sync/packages/wasm/promise-bundle.ts index 8080d7c53..88ec078d3 100644 --- a/bindings/javascript/sync/packages/wasm/promise-bundle.ts +++ b/bindings/javascript/sync/packages/wasm/promise-bundle.ts @@ -1,6 +1,6 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-bundle.js"; let BrowserIO: ProtocolIo = { @@ -39,7 +39,7 @@ async function init(): Promise { } class Database extends DatabasePromise { - #runOpts: RunOpts; + #runner: Runner; #engine: any; #io: ProtocolIo; #guards: SyncEngineGuards; @@ -61,7 +61,6 @@ class Database extends DatabasePromise { bootstrapIfEmpty: typeof opts.url != 
"function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +82,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : BrowserIO; + const run = runner(runOpts, io, engine); + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? memoryIO() : BrowserIO; + this.#io = io; this.#guards = new SyncEngineGuards(); } /** @@ -112,7 +118,7 @@ class Database extends DatabasePromise { registerFileAtWorker(this.#worker, `${this.name}-changes`), ]); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -125,11 +131,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -140,7 +146,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -149,7 +155,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -158,7 +164,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database and relevant files diff --git a/bindings/javascript/sync/packages/wasm/promise-default.ts b/bindings/javascript/sync/packages/wasm/promise-default.ts index 156998cfc..698add803 100644 --- a/bindings/javascript/sync/packages/wasm/promise-default.ts +++ b/bindings/javascript/sync/packages/wasm/promise-default.ts @@ -1,6 +1,6 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, 
DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-default.js"; let BrowserIO: ProtocolIo = { @@ -39,7 +39,7 @@ async function init(): Promise { } class Database extends DatabasePromise { - #runOpts: RunOpts; + #runner: Runner; #engine: any; #io: ProtocolIo; #guards: SyncEngineGuards; @@ -61,7 +61,6 @@ class Database extends DatabasePromise { bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +82,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : BrowserIO; + const run = runner(runOpts, io, engine); + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? memoryIO() : BrowserIO; + this.#io = io; this.#guards = new SyncEngineGuards(); } /** @@ -112,7 +118,7 @@ class Database extends DatabasePromise { registerFileAtWorker(this.#worker, `${this.name}-changes`), ]); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -125,11 +131,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -140,7 +146,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -149,7 +155,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -158,7 +164,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was 
opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database and relevant files diff --git a/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts b/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts index 935b8b31d..eeb55f475 100644 --- a/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts +++ b/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts @@ -1,6 +1,6 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-turbopack-hack.js"; let BrowserIO: ProtocolIo = { @@ -39,7 +39,7 @@ async function init(): Promise { } class Database extends DatabasePromise { - #runOpts: RunOpts; + #runner: Runner; #engine: any; #io: ProtocolIo; #guards: SyncEngineGuards; @@ -61,7 +61,6 @@ class Database extends DatabasePromise { bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +82,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : BrowserIO; + const run = runner(runOpts, io, engine); + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? 
memoryIO() : BrowserIO; + this.#io = io; this.#guards = new SyncEngineGuards(); } /** @@ -112,7 +118,7 @@ class Database extends DatabasePromise { registerFileAtWorker(this.#worker, `${this.name}-changes`), ]); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -125,11 +131,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -140,7 +146,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -149,7 +155,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -158,7 +164,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database and relevant files diff --git a/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts b/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts index fbced4d2c..700ac52c0 100644 --- a/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts +++ b/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts @@ -1,6 +1,6 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-vite-dev-hack.js"; let BrowserIO: ProtocolIo = { @@ -39,7 +39,7 @@ async function init(): Promise { } class Database extends DatabasePromise { - #runOpts: RunOpts; + #runner: Runner; #engine: any; #io: ProtocolIo; #guards: SyncEngineGuards; @@ -61,7 +61,6 @@ 
class Database extends DatabasePromise { bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +82,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : BrowserIO; + const run = runner(runOpts, io, engine); + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? memoryIO() : BrowserIO; + this.#io = io; this.#guards = new SyncEngineGuards(); } /** @@ -112,7 +118,7 @@ class Database extends DatabasePromise { registerFileAtWorker(this.#worker, `${this.name}-changes`), ]); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -125,11 +131,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -140,7 +146,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -149,7 +155,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -158,7 +164,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database and relevant files diff --git a/bindings/javascript/sync/src/generator.rs index 446543049..5481eef16 100644 --- a/bindings/javascript/sync/src/generator.rs +++ b/bindings/javascript/sync/src/generator.rs @@ -48,6 +48,8 @@ pub enum GeneratorResponse { last_pull_unix_time: Option<i64>, last_push_unix_time: Option<i64>, revision: Option<String>, + network_sent_bytes: i64, + network_received_bytes: i64, }, SyncEngineChanges { changes: SyncEngineChanges,
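On the JavaScript side these fields surface through the widened 'SyncEngineStats' variant of GeneratorResponse (see index.d.ts above). A hedged sketch of consuming it, assuming a generator obtained from engine.stats():

// Sketch: narrowing the discriminated union a generator yields.
const resp: GeneratorResponse = await generator.resumeAsync(null);
if (resp.type === 'SyncEngineStats') {
  console.log(resp.networkSentBytes, resp.networkReceivedBytes); // the counters added in this change
}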
diff --git a/bindings/javascript/sync/src/js_protocol_io.rs index 9208e4636..754bbe089 100644 --- a/bindings/javascript/sync/src/js_protocol_io.rs +++ b/bindings/javascript/sync/src/js_protocol_io.rs @@ -241,24 +241,48 @@ impl ProtocolIO for JsProtocolIo { .collect(), })) } + + fn add_work(&self, callback: Box<dyn FnMut() -> bool + Send>) { + let mut work = self.work.lock().unwrap(); + work.push_back(callback); + } + + fn step_work(&self) { + let mut items = { + let mut work = self.work.lock().unwrap(); + work.drain(..).collect::<VecDeque<_>>() + }; + let length = items.len(); + for _ in 0..length { + let mut item = items.pop_front().unwrap(); + if item() { + continue; + } + items.push_back(item); + } + { + let mut work = self.work.lock().unwrap(); + work.extend(items); + } + } } #[napi] pub struct JsProtocolIo { requests: Mutex<Vec<JsProtocolRequestBytes>>, + work: Mutex<VecDeque<Box<dyn FnMut() -> bool + Send>>>, } impl Default for JsProtocolIo { fn default() -> Self { Self { requests: Mutex::new(Vec::new()), + work: Mutex::new(VecDeque::new()), } } } -#[napi] impl JsProtocolIo { - #[napi] pub fn take_request(&self) -> Option<JsProtocolRequestBytes> { self.requests.lock().unwrap().pop() } diff --git a/bindings/javascript/sync/src/lib.rs index 3073f8309..b355e8423 100644 --- a/bindings/javascript/sync/src/lib.rs +++ b/bindings/javascript/sync/src/lib.rs @@ -13,7 +13,8 @@ use napi::bindgen_prelude::{AsyncTask, Either5, Null}; use napi_derive::napi; use turso_node::{DatabaseOpts, IoLoopTask}; use turso_sync_engine::{ - database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts}, + database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts, PartialBootstrapStrategy}, + protocol_io::ProtocolIO, types::{Coro, DatabaseChangeType, DatabaseSyncEngineProtocolVersion}, }; @@ -107,6 +108,13 @@ pub enum DatabaseRowTransformResultJs { Rewrite { stmt: DatabaseRowStatementJs }, } +#[napi(discriminant = "type")] +#[derive(Debug)] +pub enum JsPartialBootstrapStrategy { + Prefix { length: i64 }, + Query { query: String }, +} + #[napi(object, object_to_js = false)] pub struct SyncEngineOpts { pub path: String, @@ -119,6 +127,7 @@ pub struct SyncEngineOpts { pub protocol_version: Option<SyncEngineProtocolVersion>, pub bootstrap_if_empty: bool, pub remote_encryption: Option<String>, + pub partial_bootstrap_strategy: Option<JsPartialBootstrapStrategy>, } struct SyncEngineOptsFilled { @@ -131,6 +140,7 @@ struct SyncEngineOptsFilled { pub protocol_version: DatabaseSyncEngineProtocolVersion, pub bootstrap_if_empty: bool, pub remote_encryption: Option, + pub partial_bootstrap_strategy: Option<PartialBootstrapStrategy>, } #[derive(Debug, Clone, Copy)] @@ -170,12 +180,32 @@ impl SyncEngine { let io: Arc<dyn turso_core::IO> = if is_memory { Arc::new(turso_core::MemoryIO::new()) } else { - #[cfg(not(feature = "browser"))] + #[cfg(all(target_os = "linux", not(feature = "browser")))] + { + if opts.partial_bootstrap_strategy.is_none() { + Arc::new(turso_core::PlatformIO::new().map_err(|e| { + napi::Error::new( + napi::Status::GenericFailure, + format!("Failed to create platform IO: {e}"), + ) + })?) + } else { + use turso_sync_engine::sparse_io::SparseLinuxIo; + + Arc::new(SparseLinuxIo::new().map_err(|e| { + napi::Error::new( + napi::Status::GenericFailure, + format!("Failed to create sparse IO: {e}"), + ) + })?) + } + } + #[cfg(all(not(target_os = "linux"), not(feature = "browser")))] { Arc::new(turso_core::PlatformIO::new().map_err(|e| { napi::Error::new( napi::Status::GenericFailure, - format!("Failed to create IO: {e}"), + format!("Failed to create platform IO: {e}"), ) })?) 
} @@ -224,6 +254,14 @@ impl SyncEngine { )) } }, + partial_boostrap_strategy: opts.partial_boostrap_strategy.map(|s| match s { + JsPartialBootstrapStrategy::Prefix { length } => PartialBootstrapStrategy::Prefix { + length: length as usize, + }, + JsPartialBootstrapStrategy::Query { query } => { + PartialBootstrapStrategy::Query { query } + } + }), }; Ok(SyncEngine { opts: opts_filled, @@ -251,6 +289,7 @@ impl SyncEngine { .remote_encryption .map(|x| x.required_metadata_size()) .unwrap_or(0), + partial_bootstrap_strategy: self.opts.partial_boostrap_strategy.clone(), }; let io = self.io()?; @@ -300,6 +339,12 @@ impl SyncEngine { Ok(self.protocol()?.take_request()) } + #[napi] + pub fn protocol_io_step(&self) -> napi::Result<()> { + self.protocol()?.step_work(); + Ok(()) + } + #[napi] pub fn push(&self) -> GeneratorHolder { self.run(async move |coro, guard| { @@ -323,6 +368,8 @@ impl SyncEngine { last_pull_unix_time: stats.last_pull_unix_time, last_push_unix_time: stats.last_push_unix_time, revision: stats.revision, + network_sent_bytes: stats.network_sent_bytes as i64, + network_received_bytes: stats.network_received_bytes as i64, })) }) } diff --git a/core/io/completions.rs b/core/io/completions.rs index 9e5ed605c..a1f038a54 100644 --- a/core/io/completions.rs +++ b/core/io/completions.rs @@ -58,6 +58,12 @@ impl ContextInner { } } +impl Default for Context { + fn default() -> Self { + Self::new() + } +} + impl Context { pub fn new() -> Self { Self { diff --git a/core/io/memory.rs b/core/io/memory.rs index 31c78a4b1..2e047b3b0 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -1,4 +1,5 @@ use super::{Buffer, Clock, Completion, File, OpenFlags, IO}; +use crate::turso_assert; use crate::{io::clock::DefaultClock, Result}; use crate::io::clock::Instant; @@ -233,6 +234,30 @@ impl File for MemoryFile { tracing::debug!("size(path={}): {}", self.path, self.size.get()); Ok(self.size.get()) } + + fn has_hole(&self, pos: usize, len: usize) -> Result { + let start_page = pos / PAGE_SIZE; + let end_page = ((pos + len.max(1)) - 1) / PAGE_SIZE; + for page_no in start_page..=end_page { + if self.get_page(page_no).is_some() { + return Ok(false); + } + } + Ok(true) + } + + fn punch_hole(&self, pos: usize, len: usize) -> Result<()> { + turso_assert!( + pos % PAGE_SIZE == 0 && len % PAGE_SIZE == 0, + "hole must be page aligned" + ); + let start_page = pos / PAGE_SIZE; + let end_page = ((pos + len.max(1)) - 1) / PAGE_SIZE; + for page_no in start_page..=end_page { + unsafe { (*self.pages.get()).remove(&page_no) }; + } + Ok(()) + } } impl MemoryFile { diff --git a/core/io/mod.rs b/core/io/mod.rs index 1c9d107a4..3f917b264 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -92,6 +92,22 @@ pub trait File: Send + Sync { } fn size(&self) -> Result; fn truncate(&self, len: u64, c: Completion) -> Result; + + /// Optional method implemented by the IO which supports "partial" files (e.g. file with "holes") + /// This method is used in sync engine only for now (in partial sync mode) and never used in the core database code + /// + /// The hole is the contiguous file region which is not allocated by the file-system + /// If there is a single byte which is allocated within a given range - method must return false in this case + // todo: need to add custom completion type? + fn has_hole(&self, _pos: usize, _len: usize) -> Result { + panic!("has_hole is not supported for the given IO implementation") + } + /// Optional method implemented by the IO which supports "partial" files (e.g. 
file with "holes") + /// This method is used in sync engine only for now (in partial sync mode) and never used in the core database code + // todo: need to add custom completion type? + fn punch_hole(&self, _pos: usize, _len: usize) -> Result<()> { + panic!("punch_hole is not supported for the given IO implementation") + } } #[derive(Debug, Copy, Clone, PartialEq)] diff --git a/core/lib.rs b/core/lib.rs index af316135a..a32e5497b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -10,7 +10,7 @@ mod functions; mod incremental; pub mod index_method; mod info; -mod io; +pub mod io; #[cfg(feature = "json")] mod json; pub mod mvcc; diff --git a/sync/engine/Cargo.toml b/sync/engine/Cargo.toml index e99564c17..c0d01f8f3 100644 --- a/sync/engine/Cargo.toml +++ b/sync/engine/Cargo.toml @@ -21,11 +21,18 @@ base64 = "0.22.1" prost = "0.14.1" roaring = "0.11.2" +[target.'cfg(target_os = "linux")'.dependencies] +libc = { version = "0.2.172" } + [dev-dependencies] ctor = "0.4.2" tempfile = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "test-util"] } +tokio = { workspace = true, features = [ + "macros", + "rt-multi-thread", + "test-util", +] } uuid = "1.17.0" rand = { workspace = true } rand_chacha = { workspace = true } diff --git a/sync/engine/src/database_sync_engine.rs b/sync/engine/src/database_sync_engine.rs index dcdc843cf..6a09d153d 100644 --- a/sync/engine/src/database_sync_engine.rs +++ b/sync/engine/src/database_sync_engine.rs @@ -1,17 +1,21 @@ use std::{ collections::{HashMap, HashSet}, - sync::{Arc, Mutex}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, }; -use turso_core::OpenFlags; +use turso_core::{DatabaseStorage, OpenFlags}; use crate::{ database_replay_generator::DatabaseReplayGenerator, + database_sync_lazy_storage::LazyDatabaseStorage, database_sync_operations::{ acquire_slot, apply_transformation, bootstrap_db_file, connect_untracked, count_local_changes, has_table, push_logical_changes, read_last_change_id, read_wal_salt, reset_wal_file, update_last_change_id, wait_all_results, wal_apply_from_file, - wal_pull_to_file, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE, + wal_pull_to_file, ProtocolIoStats, PAGE_SIZE, WAL_FRAME_HEADER, WAL_FRAME_SIZE, }, database_tape::{ DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseReplaySession, @@ -30,6 +34,12 @@ use crate::{ Result, }; +#[derive(Clone, Debug)] +pub enum PartialBootstrapStrategy { + Prefix { length: usize }, + Query { query: String }, +} + #[derive(Clone, Debug)] pub struct DatabaseSyncEngineOpts { pub client_name: String, @@ -40,11 +50,38 @@ pub struct DatabaseSyncEngineOpts { pub protocol_version_hint: DatabaseSyncEngineProtocolVersion, pub bootstrap_if_empty: bool, pub reserved_bytes: usize, + pub partial_bootstrap_strategy: Option, +} + +pub struct DataStats { + pub written_bytes: AtomicUsize, + pub read_bytes: AtomicUsize, +} + +impl Default for DataStats { + fn default() -> Self { + Self::new() + } +} + +impl DataStats { + pub fn new() -> Self { + Self { + written_bytes: AtomicUsize::new(0), + read_bytes: AtomicUsize::new(0), + } + } + pub fn write(&self, size: usize) { + self.written_bytes.fetch_add(size, Ordering::SeqCst); + } + pub fn read(&self, size: usize) { + self.read_bytes.fetch_add(size, Ordering::SeqCst); + } } pub struct DatabaseSyncEngine { io: Arc, - protocol: Arc
<P>, + protocol: ProtocolIoStats<P>, db_file: Arc<dyn DatabaseStorage>, main_tape: DatabaseTape, main_db_wal_path: String, @@ -68,7 +105,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P>
{ io: Arc<dyn turso_core::IO>, protocol: Arc<P>, main_db_path: &str, - opts: DatabaseSyncEngineOpts, + mut opts: DatabaseSyncEngineOpts, ) -> Result<Self> { let main_db_wal_path = format!("{main_db_path}-wal"); let revert_db_wal_path = format!("{main_db_path}-wal-revert"); @@ -78,12 +115,15 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P>
{ tracing::info!("init(path={}): opts={:?}", main_db_path, opts); let completion = protocol.full_read(&meta_path)?; - let data = wait_all_results(coro, &completion).await?; + let data = wait_all_results(coro, &completion, None).await?; let meta = if data.is_empty() { None } else { Some(DatabaseMetadata::load(&data)?) }; + let protocol = ProtocolIoStats::new(protocol); + let partial_bootstrap_strategy = opts.partial_bootstrap_strategy.take(); + let partial = partial_bootstrap_strategy.is_some(); let meta = match meta { Some(meta) => meta, @@ -91,27 +131,33 @@ impl DatabaseSyncEngine
<P>
{ let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4()); let revision = bootstrap_db_file( coro, - protocol.as_ref(), + &protocol, &io, main_db_path, opts.protocol_version_hint, + partial_bootstrap_strategy, ) .await?; let meta = DatabaseMetadata { version: DATABASE_METADATA_VERSION.to_string(), client_unique_id, - synced_revision: Some(revision), + synced_revision: Some(revision.clone()), revert_since_wal_salt: None, revert_since_wal_watermark: 0, last_pushed_change_id_hint: 0, last_pushed_pull_gen_hint: 0, last_pull_unix_time: Some(io.now().secs), last_push_unix_time: None, + partial_bootstrap_server_revision: if partial { + Some(revision.clone()) + } else { + None + }, }; tracing::info!("write meta after successful bootstrap: meta={meta:?}"); let completion = protocol.full_write(&meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated - wait_all_results(coro, &completion).await?; + wait_all_results(coro, &completion, None).await?; meta } None => { @@ -120,6 +166,11 @@ impl DatabaseSyncEngine
<P>
{ "deferred bootstrap is not supported for legacy protocol".to_string(), )); } + if partial { + return Err(Error::DatabaseSyncEngineError( + "deferred bootstrap is not supported for partial sync".to_string(), + )); + } let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4()); let meta = DatabaseMetadata { version: DATABASE_METADATA_VERSION.to_string(), @@ -131,11 +182,12 @@ impl DatabaseSyncEngine
<P>
{ last_pushed_pull_gen_hint: 0, last_pull_unix_time: None, last_push_unix_time: None, + partial_bootstrap_server_revision: None, }; tracing::info!("write meta after successful bootstrap: meta={meta:?}"); let completion = protocol.full_write(&meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated - wait_all_results(coro, &completion).await?; + wait_all_results(coro, &completion, None).await?; meta } }; @@ -156,7 +208,27 @@ impl DatabaseSyncEngine
<P>
{ } let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?; - let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)); + let db_file: Arc = if partial { + let Some(partial_bootstrap_server_revision) = &meta.partial_bootstrap_server_revision + else { + return Err(Error::DatabaseSyncEngineError( + "partial_bootstrap_server_revision must be set in the metadata".to_string(), + )); + }; + let DatabasePullRevision::V1 { revision } = &partial_bootstrap_server_revision else { + return Err(Error::DatabaseSyncEngineError( + "partial sync is supported only for V1 protocol".to_string(), + )); + }; + Arc::new(LazyDatabaseStorage::new( + db_file, + None, // todo(sivukhin): allocate dirty file for FS IO + protocol.clone(), + revision.to_string(), + )) + } else { + Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)) + }; let main_db = turso_core::Database::open_with_flags( io.clone(), @@ -304,6 +376,16 @@ impl DatabaseSyncEngine
<P>
{ last_pull_unix_time, last_push_unix_time, revision, + network_sent_bytes: self + .protocol + .network_stats + .written_bytes + .load(Ordering::SeqCst), + network_received_bytes: self + .protocol + .network_stats + .read_bytes + .load(Ordering::SeqCst), }) } @@ -420,7 +502,7 @@ impl DatabaseSyncEngine
<P>
{ let revision = self.meta().synced_revision.clone(); let next_revision = wal_pull_to_file( coro, - self.protocol.as_ref(), + &self.protocol, &file.value, &revision, self.opts.wal_pull_batch_size, @@ -669,13 +751,8 @@ impl DatabaseSyncEngine
<P>
{ let mut transformed = if self.opts.use_transform { Some( - apply_transformation( - coro, - self.protocol.as_ref(), - &local_changes, - &replay.generator, - ) - .await?, + apply_transformation(coro, &self.protocol, &local_changes, &replay.generator) + .await?, ) } else { None @@ -717,7 +794,7 @@ impl DatabaseSyncEngine
<P>
{ let (_, change_id) = push_logical_changes( coro, - self.protocol.as_ref(), + &self.protocol, &self.main_tape, &self.client_unique_id, &self.opts, @@ -770,7 +847,7 @@ impl DatabaseSyncEngine
<P>
{ tracing::info!("update_meta: {meta:?}"); let completion = self.protocol.full_write(&self.meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated - wait_all_results(coro, &completion).await?; + wait_all_results(coro, &completion, None).await?; *self.meta.lock().unwrap() = meta; Ok(()) } diff --git a/sync/engine/src/database_sync_lazy_storage.rs b/sync/engine/src/database_sync_lazy_storage.rs new file mode 100644 index 000000000..ad83b628c --- /dev/null +++ b/sync/engine/src/database_sync_lazy_storage.rs @@ -0,0 +1,247 @@ +use std::sync::Arc; + +use turso_core::{Buffer, Completion, CompletionError, DatabaseStorage, File, LimboError}; + +use crate::{ + database_sync_operations::{pull_pages_v1, ProtocolIoStats, PAGE_SIZE}, + errors, + protocol_io::ProtocolIO, + types::Coro, +}; + +pub struct LazyDatabaseStorage { + clean_file: Arc, + dirty_file: Option>, + protocol: ProtocolIoStats
<P>, + server_revision: String, +} + +impl<P: ProtocolIO> LazyDatabaseStorage<P>
{ + pub fn new( + clean_file: Arc<dyn File>, + dirty_file: Option<Arc<dyn File>>, + protocol: ProtocolIoStats<P>, + server_revision: String, + ) -> Self { + Self { + clean_file, + dirty_file, + protocol, + server_revision, + } + } +} + +impl<P: ProtocolIO> DatabaseStorage for LazyDatabaseStorage<P>
{ + fn read_header(&self, c: turso_core::Completion) -> turso_core::Result { + assert!( + !self.clean_file.has_hole(0, PAGE_SIZE)?, + "first page must be filled" + ); + self.clean_file.pread(0, c) + } + + fn read_page( + &self, + page_idx: usize, + io_ctx: &turso_core::IOContext, + c: turso_core::Completion, + ) -> turso_core::Result { + assert!( + io_ctx.encryption_context().is_none(), + "encryption or checksum are not supported with partial sync" + ); + assert!(page_idx as i64 >= 0, "page should be positive"); + let r = c.as_read(); + let size = r.buf().len(); + assert!(page_idx > 0); + if !(512..=65536).contains(&size) || size & (size - 1) != 0 { + return Err(LimboError::NotADB); + } + let Some(pos) = (page_idx as u64 - 1).checked_mul(size as u64) else { + return Err(LimboError::IntegerOverflow); + }; + + if !self.clean_file.has_hole(pos as usize, size)? { + let Some(dirty_file) = &self.dirty_file else { + // no dirty file was set - this means that FS is atomic (e.g. MemoryIO) + return self.clean_file.pread(pos, c); + }; + if dirty_file.has_hole(pos as usize, size)? { + // dirty file has no hole - this means that we cleanly removed the hole when we wrote to the clean file + return self.clean_file.pread(pos, c); + } + let check_buffer = Arc::new(Buffer::new_temporary(size)); + let check_c = + dirty_file.pread(pos, Completion::new_read(check_buffer.clone(), |_| {}))?; + assert!( + check_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + + let clean_buffer = r.buf_arc(); + let clean_c = self + .clean_file + .pread(pos, Completion::new_read(clean_buffer.clone(), |_| {}))?; + assert!( + clean_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + + if check_buffer.as_slice().eq(clean_buffer.as_slice()) { + // dirty buffer matches clean buffer - this means that clean data is valid + return self.clean_file.pread(pos, c); + } + } + tracing::info!( + "PartialDatabaseStorage::read_page(page_idx={}): read page from the remote server", + page_idx + ); + let mut generator = genawaiter::sync::Gen::new({ + let protocol = self.protocol.clone(); + let server_revision = self.server_revision.clone(); + let clean_file = self.clean_file.clone(); + let dirty_file = self.dirty_file.clone(); + let c = c.clone(); + |coro| async move { + let coro = Coro::new((), coro); + let pages = [(page_idx - 1) as u32]; + let result = pull_pages_v1(&coro, &protocol, &server_revision, &pages).await; + match result { + Ok(page) => { + let read = c.as_read(); + let buf = read.buf_arc(); + buf.as_mut_slice().copy_from_slice(&page); + + if let Some(dirty_file) = &dirty_file { + let dirty_c = dirty_file.pwrite( + pos, + buf.clone(), + Completion::new_write(|_| {}), + )?; + assert!( + dirty_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + } + + let clean_c = + clean_file.pwrite(pos, buf.clone(), Completion::new_write(|_| {}))?; + assert!( + clean_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + + if let Some(dirty_file) = &dirty_file { + dirty_file.punch_hole(pos as usize, buf.len())?; + } + + c.complete(buf.len() as i32); + Ok::<(), errors::Error>(()) + } + Err(err) => { + tracing::error!("failed to fetch path from remote server: {err}"); + c.error(CompletionError::IOError(std::io::ErrorKind::Other)); + Err(err) + } + } + } + }); + self.protocol + .add_work(Box::new(move || match generator.resume_with(Ok(())) { + genawaiter::GeneratorState::Yielded(_) => false, + genawaiter::GeneratorState::Complete(_) => true, + })); + Ok(c) + } + + fn write_page( + &self, + 
page_idx: usize, + buffer: std::sync::Arc, + io_ctx: &turso_core::IOContext, + c: turso_core::Completion, + ) -> turso_core::Result { + assert!( + io_ctx.encryption_context().is_none(), + "encryption or checksum are not supported with partial sync" + ); + + let buffer_size = buffer.len(); + assert!(page_idx > 0); + assert!(buffer_size >= 512); + assert!(buffer_size <= 65536); + assert_eq!(buffer_size & (buffer_size - 1), 0); + let Some(pos) = (page_idx as u64 - 1).checked_mul(buffer_size as u64) else { + return Err(LimboError::IntegerOverflow); + }; + + // we write to the database only during checkpoint - so we need to punch hole in the dirty file in order to mark this region as valid + if let Some(dirty_file) = &self.dirty_file { + dirty_file.punch_hole(pos as usize, buffer_size)?; + } + self.clean_file.pwrite(pos, buffer, c) + } + + fn write_pages( + &self, + first_page_idx: usize, + page_size: usize, + buffers: Vec>, + io_ctx: &turso_core::IOContext, + c: turso_core::Completion, + ) -> turso_core::Result { + assert!( + io_ctx.encryption_context().is_none(), + "encryption or checksum are not supported with partial sync" + ); + + assert!(first_page_idx > 0); + assert!(page_size >= 512); + assert!(page_size <= 65536); + assert_eq!(page_size & (page_size - 1), 0); + + let Some(pos) = (first_page_idx as u64 - 1).checked_mul(page_size as u64) else { + return Err(LimboError::IntegerOverflow); + }; + // we write to the database only during checkpoint - so we need to punch hole in the dirty file in order to mark this region as valid + if let Some(dirty_file) = &self.dirty_file { + let buffers_size = buffers.iter().map(|b| b.len()).sum(); + dirty_file.punch_hole(pos as usize, buffers_size)?; + } + let c = self.clean_file.pwritev(pos, buffers, c)?; + Ok(c) + } + + fn sync(&self, c: turso_core::Completion) -> turso_core::Result { + if let Some(dirty_file) = &self.dirty_file { + let dirty_c = dirty_file.sync(Completion::new_sync(|_| {}))?; + assert!( + dirty_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + } + + self.clean_file.sync(c) + } + + fn size(&self) -> turso_core::Result { + self.clean_file.size() + } + + fn truncate( + &self, + len: usize, + c: turso_core::Completion, + ) -> turso_core::Result { + if let Some(dirty_file) = &self.dirty_file { + let dirty_c = dirty_file.truncate(len as u64, Completion::new_trunc(|_| {}))?; + assert!( + dirty_c.finished(), + "LazyDatabaseStorage works only with sync IO" + ); + } + + self.clean_file.truncate(len as u64, c) + } +} diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index 7611ea2d5..45ba66123 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -1,7 +1,11 @@ -use std::sync::{Arc, Mutex}; +use std::{ + ops::Deref, + sync::{Arc, Mutex}, +}; use bytes::BytesMut; use prost::Message; +use roaring::RoaringBitmap; use turso_core::{ types::{Text, WalFrameInfo}, Buffer, Completion, LimboError, OpenFlags, Value, @@ -9,7 +13,7 @@ use turso_core::{ use crate::{ database_replay_generator::DatabaseReplayGenerator, - database_sync_engine::DatabaseSyncEngineOpts, + database_sync_engine::{DataStats, DatabaseSyncEngineOpts, PartialBootstrapStrategy}, database_tape::{ run_stmt_expect_one_row, run_stmt_ignore_rows, DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseReplaySessionOpts, DatabaseTape, DatabaseWalSession, @@ -59,6 +63,37 @@ pub(crate) fn acquire_slot(slot: &Arc>>) -> Result { + pub protocol: Arc
<P>, + pub network_stats: Arc<DataStats>, +} + +impl<P: ProtocolIO> ProtocolIoStats<P>
{ + pub fn new(protocol: Arc<P>) -> Self { + Self { + protocol, + network_stats: Arc::new(DataStats::new()), + } + } +} + +impl<P: ProtocolIO> Clone for ProtocolIoStats<P>
{ + fn clone(&self) -> Self { + Self { + protocol: self.protocol.clone(), + network_stats: self.network_stats.clone(), + } + } +} + +impl<P: ProtocolIO> Deref for ProtocolIoStats<P>
{ + type Target = P; + + fn deref(&self) -> &Self::Target { + &self.protocol + } +} + enum WalHttpPullResult> { Frames(C), NeedCheckpoint(DbSyncStatus), @@ -78,7 +113,7 @@ pub fn connect_untracked(tape: &DatabaseTape) -> Result( coro: &Coro, - client: &C, + client: &ProtocolIoStats, db: Arc, ) -> Result { tracing::info!("db_bootstrap"); @@ -89,8 +124,10 @@ pub async fn db_bootstrap( let mut pos = 0; loop { while let Some(chunk) = content.poll_data()? { + client.network_stats.read(chunk.data().len()); let chunk = chunk.data(); let content_len = chunk.len(); + // todo(sivukhin): optimize allocations here #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(Buffer::new_temporary(chunk.len())); @@ -163,7 +200,7 @@ pub async fn wal_apply_from_file( pub async fn wal_pull_to_file( coro: &Coro, - client: &C, + client: &ProtocolIoStats, frames_file: &Arc, revision: &Option, wal_pull_batch_size: u64, @@ -206,7 +243,7 @@ pub async fn wal_pull_to_file( /// Pull updates from remote to the separate file pub async fn wal_pull_to_file_v1( coro: &Coro, - client: &C, + client: &ProtocolIoStats, frames_file: &Arc, revision: &str, long_poll_timeout: Option, @@ -219,10 +256,12 @@ pub async fn wal_pull_to_file_v1( server_revision: String::new(), client_revision: revision.to_string(), long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_millis() as u32).unwrap_or(0), - server_pages: BytesMut::new().into(), + server_pages_selector: BytesMut::new().into(), + server_query_selector: String::new(), client_pages: BytesMut::new().into(), }; let request = request.encode_to_vec(); + client.network_stats.write(request.len()); let completion = client.http( "POST", "/pull-updates", @@ -232,8 +271,13 @@ pub async fn wal_pull_to_file_v1( ("accept-encoding", "application/protobuf"), ], )?; - let Some(header) = - wait_proto_message::(coro, &completion, &mut bytes).await? + let Some(header) = wait_proto_message::( + coro, + &completion, + &client.network_stats, + &mut bytes, + ) + .await? 
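Aside on the framing used by wait_proto_message above: the pull-updates response body is a stream of varint-length-prefixed protobuf messages accumulated into a BytesMut. A self-contained sketch of that round-trip using prost's built-in length-delimited helpers follows; this PageData is a simplified stand-in for the real server_proto type (field layout assumed), not the hand-rolled read_varint path used in the patch.

```rust
use bytes::{Buf, BytesMut};
use prost::Message;

// Simplified stand-in for server_proto::PageData.
#[derive(Clone, PartialEq, prost::Message)]
struct PageData {
    #[prost(uint64, tag = "1")]
    page_id: u64,
    #[prost(bytes = "vec", tag = "2")]
    data: Vec<u8>,
}

fn main() {
    // Encode two messages back to back, each prefixed with its varint length,
    // the way they appear in the HTTP response body.
    let mut wire = BytesMut::new();
    for id in [1u64, 2] {
        let msg = PageData { page_id: id, data: vec![0u8; 8] };
        msg.encode_length_delimited(&mut wire).unwrap();
    }
    // Drain the buffer one message at a time, as wait_proto_message does.
    let mut buf = wire.freeze();
    while buf.has_remaining() {
        let msg = PageData::decode_length_delimited(&mut buf).unwrap();
        println!("page_id={} ({} bytes)", msg.page_id, msg.data.len());
    }
}
```

The real implementation cannot assume the whole body is already buffered, which is why wait_proto_message first checks the varint against the bytes received so far and yields for more IO while a message is still incomplete.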
else { return Err(Error::DatabaseSyncEngineError( "no header returned in the pull-updates protobuf call".to_string(), @@ -246,7 +290,8 @@ pub async fn wal_pull_to_file_v1( let buffer = Arc::new(Buffer::new_temporary(WAL_FRAME_SIZE)); let mut page_data_opt = - wait_proto_message::(coro, &completion, &mut bytes).await?; + wait_proto_message::(coro, &completion, &client.network_stats, &mut bytes) + .await?; while let Some(page_data) = page_data_opt.take() { let page_id = page_data.page_id; tracing::info!("received page {}", page_id); @@ -259,7 +304,8 @@ pub async fn wal_pull_to_file_v1( ))); } buffer.as_mut_slice()[WAL_FRAME_HEADER..].copy_from_slice(&page); - page_data_opt = wait_proto_message(coro, &completion, &mut bytes).await?; + page_data_opt = + wait_proto_message(coro, &completion, &client.network_stats, &mut bytes).await?; let mut frame_info = WalFrameInfo { db_size: 0, page_no: page_id as u32 + 1, @@ -298,10 +344,87 @@ pub async fn wal_pull_to_file_v1( }) } +/// Pull pages from remote +pub async fn pull_pages_v1( + coro: &Coro, + client: &ProtocolIoStats, + server_revision: &str, + pages: &[u32], +) -> Result> { + tracing::info!("pull_pages_v1: revision={server_revision}, pages={pages:?}"); + let mut bytes = BytesMut::new(); + + let mut bitmap = RoaringBitmap::new(); + bitmap.extend(pages); + + let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size()); + bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| { + Error::DatabaseSyncEngineError(format!("unable to serialize pull page request: {e}")) + })?; + + let request = PullUpdatesReqProtoBody { + encoding: PageUpdatesEncodingReq::Raw as i32, + server_revision: server_revision.to_string(), + client_revision: String::new(), + long_poll_timeout_ms: 0, + server_pages_selector: bitmap_bytes.into(), + server_query_selector: String::new(), + client_pages: BytesMut::new().into(), + }; + let request = request.encode_to_vec(); + client.network_stats.write(request.len()); + let completion = client.http( + "POST", + "/pull-updates", + Some(request), + &[ + ("content-type", "application/protobuf"), + ("accept-encoding", "application/protobuf"), + ], + )?; + let Some(header) = wait_proto_message::( + coro, + &completion, + &client.network_stats, + &mut bytes, + ) + .await? + else { + return Err(Error::DatabaseSyncEngineError( + "no header returned in the pull-updates protobuf call".to_string(), + )); + }; + tracing::info!("pull_pages_v1: got header={:?}", header); + + let mut pages = Vec::with_capacity(PAGE_SIZE * pages.len()); + + let mut page_data_opt = + wait_proto_message::(coro, &completion, &client.network_stats, &mut bytes) + .await?; + while let Some(page_data) = page_data_opt.take() { + let page_id = page_data.page_id; + tracing::info!("received page {}", page_id); + let page = decode_page(&header, page_data)?; + if page.len() != PAGE_SIZE { + return Err(Error::DatabaseSyncEngineError(format!( + "page has unexpected size: {} != {}", + page.len(), + PAGE_SIZE + ))); + } + pages.extend_from_slice(&page); + page_data_opt = + wait_proto_message(coro, &completion, &client.network_stats, &mut bytes).await?; + tracing::info!("page_data_opt: {}", page_data_opt.is_some()); + } + + Ok(pages) +} + /// Pull updates from remote to the separate file pub async fn wal_pull_to_file_legacy( coro: &Coro, - client: &C, + client: &ProtocolIoStats, frames_file: &Arc, mut generation: u64, mut start_frame: u64, @@ -339,7 +462,9 @@ pub async fn wal_pull_to_file_legacy( }; loop { while let Some(chunk) = data.poll_data()? 
{ + client.network_stats.read(chunk.data().len()); let mut chunk = chunk.data(); + while !chunk.is_empty() { let to_fill = (WAL_FRAME_SIZE - buffer_len).min(chunk.len()); buffer.as_mut_slice()[buffer_len..buffer_len + to_fill] @@ -425,7 +550,7 @@ pub async fn wal_pull_to_file_legacy( /// and can be called multiple times with same frame range pub async fn wal_push( coro: &Coro, - client: &C, + client: &ProtocolIoStats, wal_session: &mut WalSession, baton: Option, generation: u64, @@ -630,7 +755,7 @@ pub async fn read_last_change_id( pub async fn fetch_last_change_id( coro: &Coro, - client: &C, + client: &ProtocolIoStats, source_conn: &Arc, client_id: &str, ) -> Result<(i64, Option)> { @@ -710,7 +835,7 @@ pub async fn fetch_last_change_id( pub async fn push_logical_changes( coro: &Coro, - client: &C, + client: &ProtocolIoStats, source: &DatabaseTape, client_id: &str, opts: &DatabaseSyncEngineOpts, @@ -929,9 +1054,9 @@ pub async fn push_logical_changes( Ok((source_pull_gen, last_change_id.unwrap_or(0))) } -pub async fn apply_transformation( +pub async fn apply_transformation( coro: &Coro, - client: &P, + client: &ProtocolIoStats, changes: &Vec, generator: &DatabaseReplayGenerator, ) -> Result> { @@ -941,7 +1066,7 @@ pub async fn apply_transformation( mutations.push(generator.create_mutation(&replay_info, change)?); } let completion = client.transform(mutations)?; - let transformed = wait_all_results(coro, &completion).await?; + let transformed = wait_all_results(coro, &completion, None).await?; if transformed.len() != changes.len() { return Err(Error::DatabaseSyncEngineError(format!( "unexpected result from custom transformation: mismatch in shapes: {} != {}", @@ -1001,39 +1126,81 @@ pub async fn checkpoint_wal_file( pub async fn bootstrap_db_file( coro: &Coro, - client: &C, + client: &ProtocolIoStats, io: &Arc, main_db_path: &str, protocol: DatabaseSyncEngineProtocolVersion, + partial_bootstrap_strategy: Option, ) -> Result { match protocol { DatabaseSyncEngineProtocolVersion::Legacy => { + if partial_bootstrap_strategy.is_some() { + return Err(Error::DatabaseSyncEngineError( + "can't bootstrap prefix of database with legacy protocol".to_string(), + )); + } bootstrap_db_file_legacy(coro, client, io, main_db_path).await } DatabaseSyncEngineProtocolVersion::V1 => { - bootstrap_db_file_v1(coro, client, io, main_db_path).await + bootstrap_db_file_v1(coro, client, io, main_db_path, partial_bootstrap_strategy).await } } } pub async fn bootstrap_db_file_v1( coro: &Coro, - client: &C, + client: &ProtocolIoStats, io: &Arc, main_db_path: &str, + bootstrap: Option, ) -> Result { - let mut bytes = BytesMut::new(); + let server_pages_selector = if let Some(PartialBootstrapStrategy::Prefix { length }) = + &bootstrap + { + let mut bitmap = RoaringBitmap::new(); + bitmap.insert_range(0..(*length / PAGE_SIZE) as u32); + let mut bitmap_bytes = Vec::with_capacity(bitmap.serialized_size()); + bitmap.serialize_into(&mut bitmap_bytes).map_err(|e| { + Error::DatabaseSyncEngineError(format!("unable to serialize bootstrap request: {e}")) + })?; + bitmap_bytes + } else { + Vec::new() + }; + let server_query_selector = if let Some(PartialBootstrapStrategy::Query { query }) = bootstrap { + query + } else { + String::new() + }; + + let request = PullUpdatesReqProtoBody { + encoding: PageUpdatesEncodingReq::Raw as i32, + server_revision: String::new(), + client_revision: String::new(), + long_poll_timeout_ms: 0, + server_pages_selector: server_pages_selector.into(), + server_query_selector, + client_pages: 
BytesMut::new().into(), + }; + let request = request.encode_to_vec(); + client.network_stats.write(request.len()); let completion = client.http( - "GET", + "POST", "/pull-updates", - None, + Some(request), &[ ("content-type", "application/protobuf"), ("accept-encoding", "application/protobuf"), ], )?; - let Some(header) = - wait_proto_message::(coro, &completion, &mut bytes).await? + let mut bytes = BytesMut::new(); + let Some(header) = wait_proto_message::( + coro, + &completion, + &client.network_stats, + &mut bytes, + ) + .await? else { return Err(Error::DatabaseSyncEngineError( "no header returned in the pull-updates protobuf call".to_string(), @@ -1059,8 +1226,13 @@ pub async fn bootstrap_db_file_v1( #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(Buffer::new_temporary(PAGE_SIZE)); while let Some(page_data) = - wait_proto_message::(coro, &completion, &mut bytes).await? + wait_proto_message::(coro, &completion, &client.network_stats, &mut bytes) + .await? { + tracing::info!( + "bootstrap_db_file: received page page_id={}", + page_data.page_id + ); let offset = page_data.page_id * PAGE_SIZE as u64; let page = decode_page(&header, page_data)?; if page.len() != PAGE_SIZE { @@ -1110,7 +1282,7 @@ fn decode_page(header: &PullUpdatesRespProtoBody, page_data: PageData) -> Result pub async fn bootstrap_db_file_legacy( coro: &Coro, - client: &C, + client: &ProtocolIoStats, io: &Arc, main_db_path: &str, ) -> Result { @@ -1169,17 +1341,20 @@ pub async fn reset_wal_file( async fn sql_execute_http( coro: &Coro, - client: &C, + client: &ProtocolIoStats, request: server_proto::PipelineReqBody, ) -> Result> { let body = serde_json::to_vec(&request)?; + + client.network_stats.write(body.len()); let completion = client.http("POST", "/v2/pipeline", Some(body), &[])?; + let status = wait_status(coro, &completion).await?; if status != http::StatusCode::OK { let error = format!("sql_execute_http: unexpected status code: {status}"); return Err(Error::DatabaseSyncEngineError(error)); } - let response = wait_all_results(coro, &completion).await?; + let response = wait_all_results(coro, &completion, Some(&client.network_stats)).await?; let response: server_proto::PipelineRespBody = serde_json::from_slice(&response)?; tracing::debug!("hrana response: {:?}", response); let mut results = Vec::new(); @@ -1217,7 +1392,7 @@ async fn sql_execute_http( async fn wal_pull_http( coro: &Coro, - client: &C, + client: &ProtocolIoStats, generation: u64, start_frame: u64, end_frame: u64, @@ -1230,7 +1405,7 @@ async fn wal_pull_http( )?; let status = wait_status(coro, &completion).await?; if status == http::StatusCode::BAD_REQUEST { - let status_body = wait_all_results(coro, &completion).await?; + let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?; let status: DbSyncStatus = serde_json::from_slice(&status_body)?; if status.status == "checkpoint_needed" { return Ok(WalHttpPullResult::NeedCheckpoint(status)); @@ -1248,7 +1423,7 @@ async fn wal_pull_http( async fn wal_push_http( coro: &Coro, - client: &C, + client: &ProtocolIoStats, baton: Option, generation: u64, start_frame: u64, @@ -1258,6 +1433,8 @@ async fn wal_push_http( let baton = baton .map(|baton| format!("/{baton}")) .unwrap_or("".to_string()); + + client.network_stats.write(frames.len()); let completion = client.http( "POST", &format!("/sync/{generation}/{start_frame}/{end_frame}{baton}"), @@ -1265,7 +1442,7 @@ async fn wal_push_http( &[], )?; let status = wait_status(coro, &completion).await?; - let status_body = 
wait_all_results(coro, &completion).await?; + let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?; if status != http::StatusCode::OK { let error = std::str::from_utf8(&status_body).ok().unwrap_or(""); return Err(Error::DatabaseSyncEngineError(format!( @@ -1275,10 +1452,13 @@ async fn wal_push_http( Ok(serde_json::from_slice(&status_body)?) } -async fn db_info_http(coro: &Coro, client: &C) -> Result { +async fn db_info_http( + coro: &Coro, + client: &ProtocolIoStats, +) -> Result { let completion = client.http("GET", "/info", None, &[])?; let status = wait_status(coro, &completion).await?; - let status_body = wait_all_results(coro, &completion).await?; + let status_body = wait_all_results(coro, &completion, Some(&client.network_stats)).await?; if status != http::StatusCode::OK { return Err(Error::DatabaseSyncEngineError(format!( "db_info go unexpected status: {status}" @@ -1289,7 +1469,7 @@ async fn db_info_http(coro: &Coro, client: &C) -> Resul async fn db_bootstrap_http( coro: &Coro, - client: &C, + client: &ProtocolIoStats, generation: u64, ) -> Result { let completion = client.http("GET", &format!("/export/{generation}"), None, &[])?; @@ -1335,6 +1515,7 @@ pub fn read_varint(buf: &[u8]) -> Result> { pub async fn wait_proto_message( coro: &Coro, completion: &impl DataCompletion, + network_stats: &DataStats, bytes: &mut BytesMut, ) -> Result> { let start_time = std::time::Instant::now(); @@ -1346,6 +1527,7 @@ pub async fn wait_proto_message( }; if not_enough_bytes { if let Some(poll) = completion.poll_data()? { + network_stats.read(poll.data().len()); bytes.extend_from_slice(poll.data()); } else if !completion.is_done()? { coro.yield_(ProtocolCommand::IO).await?; @@ -1374,10 +1556,12 @@ pub async fn wait_proto_message( pub async fn wait_all_results( coro: &Coro, completion: &impl DataCompletion, + stats: Option<&DataStats>, ) -> Result> { let mut results = Vec::new(); loop { while let Some(poll) = completion.poll_data()? { + stats.inspect(|s| s.read(poll.data().len())); results.extend_from_slice(poll.data()); } if completion.is_done()? { @@ -1396,6 +1580,7 @@ mod tests { use prost::Message; use crate::{ + database_sync_engine::DataStats, database_sync_operations::wait_proto_message, protocol_io::{DataCompletion, DataPollResult}, server_proto::PageData, @@ -1416,6 +1601,8 @@ mod tests { chunk: usize, } + unsafe impl Sync for TestCompletion {} + impl DataCompletion for TestCompletion { type DataPollResult = TestPollResult; fn status(&self) -> crate::Result> { @@ -1457,9 +1644,15 @@ mod tests { let coro: Coro<()> = coro.into(); let mut bytes = BytesMut::new(); let mut count = 0; - while wait_proto_message::<(), PageData>(&coro, &completion, &mut bytes) - .await? - .is_some() + let network_stats = DataStats::new(); + while wait_proto_message::<(), PageData>( + &coro, + &completion, + &network_stats, + &mut bytes, + ) + .await? 
+ .is_some() { assert!(bytes.capacity() <= 16 * 1024 + 1024); count += 1; diff --git a/sync/engine/src/lib.rs b/sync/engine/src/lib.rs index 0546a15e7..4c6551c38 100644 --- a/sync/engine/src/lib.rs +++ b/sync/engine/src/lib.rs @@ -1,5 +1,6 @@ pub mod database_replay_generator; pub mod database_sync_engine; +pub mod database_sync_lazy_storage; pub mod database_sync_operations; pub mod database_tape; pub mod errors; @@ -9,6 +10,9 @@ pub mod server_proto; pub mod types; pub mod wal_session; +#[cfg(target_os = "linux")] +pub mod sparse_io; + pub type Result = std::result::Result; #[cfg(test)] diff --git a/sync/engine/src/protocol_io.rs b/sync/engine/src/protocol_io.rs index 797a2aef7..b74cd772b 100644 --- a/sync/engine/src/protocol_io.rs +++ b/sync/engine/src/protocol_io.rs @@ -3,18 +3,18 @@ use crate::{ Result, }; -pub trait DataPollResult { +pub trait DataPollResult: Send + Sync + 'static { fn data(&self) -> &[T]; } -pub trait DataCompletion { +pub trait DataCompletion: Send + Sync + 'static { type DataPollResult: DataPollResult; fn status(&self) -> Result>; fn poll_data(&self) -> Result>; fn is_done(&self) -> Result; } -pub trait ProtocolIO { +pub trait ProtocolIO: Send + Sync + 'static { type DataCompletionBytes: DataCompletion; type DataCompletionTransform: DataCompletion; fn full_read(&self, path: &str) -> Result; @@ -30,4 +30,6 @@ pub trait ProtocolIO { body: Option>, headers: &[(&str, &str)], ) -> Result; + fn add_work(&self, callback: Box bool + Send>); + fn step_work(&self); } diff --git a/sync/engine/src/server_proto.rs b/sync/engine/src/server_proto.rs index bca505d68..e74f9171d 100644 --- a/sync/engine/src/server_proto.rs +++ b/sync/engine/src/server_proto.rs @@ -23,7 +23,9 @@ pub struct PullUpdatesReqProtoBody { #[prost(uint32, tag = "4")] pub long_poll_timeout_ms: u32, #[prost(bytes, tag = "5")] - pub server_pages: Bytes, + pub server_pages_selector: Bytes, + #[prost(string, tag = "7")] + pub server_query_selector: String, #[prost(bytes, tag = "6")] pub client_pages: Bytes, } diff --git a/sync/engine/src/sparse_io.rs b/sync/engine/src/sparse_io.rs new file mode 100644 index 000000000..f89098c70 --- /dev/null +++ b/sync/engine/src/sparse_io.rs @@ -0,0 +1,205 @@ +use std::{ + os::{fd::AsRawFd, unix::fs::FileExt}, + sync::{Arc, RwLock}, +}; + +use tracing::{instrument, Level}; +use turso_core::{ + io::clock::DefaultClock, Buffer, Clock, Completion, File, Instant, OpenFlags, Result, IO, +}; + +pub struct SparseLinuxIo {} + +impl SparseLinuxIo { + pub fn new() -> Result { + Ok(Self {}) + } +} + +impl IO for SparseLinuxIo { + #[instrument(err, skip_all, level = Level::TRACE)] + fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result> { + let mut file = std::fs::File::options(); + file.read(true); + + if !flags.contains(OpenFlags::ReadOnly) { + file.write(true); + file.create(flags.contains(OpenFlags::Create)); + } + + let file = file.open(path)?; + Ok(Arc::new(SparseLinuxFile { + file: RwLock::new(file), + })) + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn remove_file(&self, path: &str) -> Result<()> { + Ok(std::fs::remove_file(path)?) 
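For readers unfamiliar with sparse files, this is the probe that has_hole builds on: lseek(fd, pos, SEEK_DATA) returns the next offset at or after pos that contains data, and fails with ENXIO when only a hole remains. Below is a Linux-only sketch of the same predicate against a plain std::fs::File; paths and sizes are illustrative, the libc crate is assumed as in this patch, and the filesystem must support sparse files (as the patch's own test does via tempfile).

```rust
use std::os::fd::AsRawFd;
use std::os::unix::fs::FileExt;

// A region is a hole iff the next data offset lands at or beyond its end.
fn region_is_hole(file: &std::fs::File, pos: usize, len: usize) -> std::io::Result<bool> {
    let res = unsafe { libc::lseek(file.as_raw_fd(), pos as i64, libc::SEEK_DATA) };
    if res == -1 {
        let err = std::io::Error::last_os_error();
        // ENXIO: pos is inside a hole that extends to the end of the file.
        return if err.raw_os_error() == Some(libc::ENXIO) { Ok(true) } else { Err(err) };
    }
    Ok(res as usize >= pos + len)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("sparse-demo.db");
    let file = std::fs::File::options()
        .read(true).write(true).create(true).truncate(true)
        .open(&path)?;
    file.set_len(16 * 4096)?; // a freshly extended file is one big hole
    assert!(region_is_hole(&file, 0, 4096)?);

    file.write_all_at(&[1u8; 4096], 0)?; // allocate the first page
    assert!(!region_is_hole(&file, 0, 4096)?);
    assert!(region_is_hole(&file, 4096, 4096)?); // tail is still a hole
    std::fs::remove_file(path)
}
```

punch_hole is the inverse operation: fallocate with FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE deallocates the range without changing the file length, which is what lets the dirty file mark a region as valid again.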
+ } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn step(&self) -> Result<()> { + Ok(()) + } +} + +impl Clock for SparseLinuxIo { + fn now(&self) -> Instant { + DefaultClock.now() + } +} + +pub struct SparseLinuxFile { + file: RwLock, +} + +#[allow(clippy::readonly_write_lock)] +impl File for SparseLinuxFile { + #[instrument(err, skip_all, level = Level::TRACE)] + fn lock_file(&self, _exclusive: bool) -> Result<()> { + Ok(()) + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn unlock_file(&self) -> Result<()> { + Ok(()) + } + + #[instrument(skip(self, c), level = Level::TRACE)] + fn pread(&self, pos: u64, c: Completion) -> Result { + let file = self.file.read().unwrap(); + let nr = { + let r = c.as_read(); + let buf = r.buf(); + let buf = buf.as_mut_slice(); + file.read_exact_at(buf, pos)?; + buf.len() as i32 + }; + c.complete(nr); + Ok(c) + } + + #[instrument(skip(self, c, buffer), level = Level::TRACE)] + fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result { + let file = self.file.write().unwrap(); + let buf = buffer.as_slice(); + file.write_all_at(buf, pos)?; + c.complete(buffer.len() as i32); + Ok(c) + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn sync(&self, c: Completion) -> Result { + let file = self.file.write().unwrap(); + file.sync_all()?; + c.complete(0); + Ok(c) + } + + #[instrument(err, skip_all, level = Level::TRACE)] + fn truncate(&self, len: u64, c: Completion) -> Result { + let file = self.file.write().unwrap(); + file.set_len(len)?; + c.complete(0); + Ok(c) + } + + fn size(&self) -> Result { + let file = self.file.read().unwrap(); + Ok(file.metadata()?.len()) + } + + fn has_hole(&self, pos: usize, len: usize) -> turso_core::Result { + let file = self.file.read().unwrap(); + // SEEK_DATA: Adjust the file offset to the next location in the file + // greater than or equal to offset containing data. If offset + // points to data, then the file offset is set to offset + // (see https://man7.org/linux/man-pages/man2/lseek.2.html#DESCRIPTION) + let res = unsafe { libc::lseek(file.as_raw_fd(), pos as i64, libc::SEEK_DATA) }; + if res == -1 { + let errno = unsafe { *libc::__errno_location() }; + if errno == libc::ENXIO { + // ENXIO: whence is SEEK_DATA or SEEK_HOLE, and offset is beyond the + // end of the file, or whence is SEEK_DATA and offset is + // within a hole at the end of the file. 
+ // (see https://man7.org/linux/man-pages/man2/lseek.2.html#ERRORS) + return Ok(true); + } else { + return Err(turso_core::LimboError::CompletionError( + turso_core::CompletionError::IOError( + std::io::Error::from_raw_os_error(errno).kind(), + ), + )); + } + } + // lseek succeeded - the hole is here if next data is strictly before pos + len - 1 (the last byte of the checked region + Ok(res as usize >= pos + len) + } + + fn punch_hole(&self, pos: usize, len: usize) -> turso_core::Result<()> { + let file = self.file.write().unwrap(); + let res = unsafe { + libc::fallocate( + file.as_raw_fd(), + libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE, + pos as i64, + len as i64, + ) + }; + if res == -1 { + let errno = unsafe { *libc::__errno_location() }; + Err(turso_core::LimboError::CompletionError( + turso_core::CompletionError::IOError( + std::io::Error::from_raw_os_error(errno).kind(), + ), + )) + } else { + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use turso_core::{Buffer, Completion, OpenFlags, IO}; + + use crate::sparse_io::SparseLinuxIo; + + #[test] + pub fn sparse_io_test() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let tmp_path = tmp.into_temp_path(); + let tmp_path = tmp_path.as_os_str().to_str().unwrap(); + let io = SparseLinuxIo::new().unwrap(); + let file = io.open_file(tmp_path, OpenFlags::default(), false).unwrap(); + let _ = file + .truncate(1024 * 1024, Completion::new_trunc(|_| {})) + .unwrap(); + assert!(file.has_hole(0, 4096).unwrap()); + + let buffer = Arc::new(Buffer::new_temporary(4096)); + buffer.as_mut_slice().fill(1); + let _ = file + .pwrite(0, buffer.clone(), Completion::new_write(|_| {})) + .unwrap(); + assert!(!file.has_hole(0, 4096).unwrap()); + + assert!(file.has_hole(4096, 4096).unwrap()); + assert!(file.has_hole(4096 * 2, 4096).unwrap()); + + let _ = file + .pwrite(4096 * 2, buffer.clone(), Completion::new_write(|_| {})) + .unwrap(); + assert!(file.has_hole(4096, 4096).unwrap()); + assert!(!file.has_hole(4096 * 2, 4096).unwrap()); + + assert!(!file.has_hole(4096, 4097).unwrap()); + + file.punch_hole(2 * 4096, 4096).unwrap(); + assert!(file.has_hole(4096 * 2, 4096).unwrap()); + assert!(file.has_hole(4096, 4097).unwrap()); + } +} diff --git a/sync/engine/src/types.rs b/sync/engine/src/types.rs index 4c08ee394..81951baab 100644 --- a/sync/engine/src/types.rs +++ b/sync/engine/src/types.rs @@ -1,24 +1,27 @@ -use std::{cell::RefCell, collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; use serde::{Deserialize, Serialize}; use crate::{database_sync_operations::MutexSlot, errors::Error, Result}; pub struct Coro { - pub ctx: RefCell, + pub ctx: Mutex, gen: genawaiter::sync::Co>, } impl Coro { pub fn new(ctx: Ctx, gen: genawaiter::sync::Co>) -> Self { Self { - ctx: RefCell::new(ctx), + ctx: Mutex::new(ctx), gen, } } pub async fn yield_(&self, value: ProtocolCommand) -> Result<()> { let ctx = self.gen.yield_(value).await?; - self.ctx.replace(ctx); + *self.ctx.lock().unwrap() = ctx; Ok(()) } } @@ -27,7 +30,7 @@ impl From>> for Coro<()> { fn from(value: genawaiter::sync::Co>) -> Self { Self { gen: value, - ctx: RefCell::new(()), + ctx: Mutex::new(()), } } } @@ -68,6 +71,8 @@ pub struct SyncEngineStats { pub last_pull_unix_time: Option, pub last_push_unix_time: Option, pub revision: Option, + pub network_sent_bytes: usize, + pub network_received_bytes: usize, } #[derive(Debug, Clone, Copy, PartialEq)] @@ -95,6 +100,7 @@ pub struct DatabaseMetadata { pub last_push_unix_time: Option, 
pub last_pushed_pull_gen_hint: i64, pub last_pushed_change_id_hint: i64, + pub partial_bootstrap_server_revision: Option, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
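One end-to-end detail worth spelling out: both the Prefix bootstrap strategy and pull_pages_v1 above communicate their page selection as a serialized roaring bitmap carried in server_pages_selector. A small round-trip sketch of that encoding follows; the 4096-byte PAGE_SIZE is an assumption for illustration.

```rust
use roaring::RoaringBitmap;

const PAGE_SIZE: usize = 4096; // assumed page size for this sketch

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Select the pages that cover the first `length` bytes of the database,
    // the way the Prefix partial-bootstrap strategy does.
    let length = 10 * PAGE_SIZE;
    let mut bitmap = RoaringBitmap::new();
    bitmap.insert_range(0..(length / PAGE_SIZE) as u32);

    // Serialize into the bytes carried by server_pages_selector.
    let mut selector = Vec::with_capacity(bitmap.serialized_size());
    bitmap.serialize_into(&mut selector)?;

    // The receiving side can rebuild the same page set from the raw bytes.
    let decoded = RoaringBitmap::deserialize_from(&selector[..])?;
    assert_eq!(decoded, bitmap);
    assert_eq!(decoded.len(), 10); // ten page ids: 0..=9
    Ok(())
}
```

A roaring bitmap keeps both the dense prefix case (insert_range) and the sparse on-demand case (a handful of page ids from read_page misses) compact in the same wire format.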