diff --git a/bindings/javascript/sync/packages/common/types.ts b/bindings/javascript/sync/packages/common/types.ts index fc57b534f..61cbd7f0b 100644 --- a/bindings/javascript/sync/packages/common/types.ts +++ b/bindings/javascript/sync/packages/common/types.ts @@ -138,6 +138,14 @@ export interface DatabaseStats { * (can be used as e-tag, but string must not be interpreted in any way and must be used as opaque value) */ revision: string | null; + /** + * total amount of sent bytes over the network + */ + networkSentBytes: number; + /** + * total amount of received bytes over the network + */ + networkReceivedBytes: number; } /* internal types used in the native/browser packages */ diff --git a/bindings/javascript/sync/packages/native/index.d.ts b/bindings/javascript/sync/packages/native/index.d.ts index 6550f730d..33187509d 100644 --- a/bindings/javascript/sync/packages/native/index.d.ts +++ b/bindings/javascript/sync/packages/native/index.d.ts @@ -221,7 +221,7 @@ export type DatabaseRowTransformResultJs = export type GeneratorResponse = | { type: 'IO' } | { type: 'Done' } - | { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime?: number, lastPushUnixTime?: number, revision?: string } + | { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime?: number, lastPushUnixTime?: number, revision?: string, networkSentBytes: number, networkReceivedBytes: number } | { type: 'SyncEngineChanges', changes: SyncEngineChanges } export type JsProtocolRequest = diff --git a/bindings/javascript/sync/packages/native/promise.test.ts b/bindings/javascript/sync/packages/native/promise.test.ts index ea95e0f13..7ab40ca96 100644 --- a/bindings/javascript/sync/packages/native/promise.test.ts +++ b/bindings/javascript/sync/packages/native/promise.test.ts @@ -13,6 +13,40 @@ function cleanup(path) { try { unlinkSync(`${path}-wal-revert`) } catch (e) { } } +test('partial sync', async () => { + { + const db = await connect({ + path: ':memory:', + url: process.env.VITE_TURSO_DB_URL, + longPollTimeoutMs: 100, + }); + await db.exec("CREATE TABLE IF NOT EXISTS partial(value BLOB)"); + await db.exec("DELETE FROM partial"); + await db.exec("INSERT INTO partial SELECT randomblob(1024) FROM generate_series(1, 2000)"); + await db.push(); + await db.close(); + } + + const db = await connect({ + path: ':memory:', + url: process.env.VITE_TURSO_DB_URL, + longPollTimeoutMs: 100, + partial: true, + }); + + // 128 pages plus some overhead (very rough estimation) + expect((await db.stats()).networkReceivedBytes).toBeLessThanOrEqual(128 * (4096 + 128)); + + // select of one record shouldn't increase amount of received data + expect(await db.prepare("SELECT length(value) as length FROM partial LIMIT 1").all()).toEqual([{ length: 1024 }]); + expect((await db.stats()).networkReceivedBytes).toBeLessThanOrEqual(128 * (4096 + 128)); + + await db.prepare("INSERT INTO partial VALUES (-1)").run(); + + expect(await db.prepare("SELECT COUNT(*) as cnt FROM partial").all()).toEqual([{ cnt: 2001 }]); + expect((await db.stats()).networkReceivedBytes).toBeGreaterThanOrEqual(2000 * 1024); +}) + test('concurrent-actions-consistency', async () => { { const db = await connect({ diff --git a/bindings/javascript/sync/packages/wasm/promise-bundle.ts b/bindings/javascript/sync/packages/wasm/promise-bundle.ts index 8080d7c53..88ec078d3 100644 --- a/bindings/javascript/sync/packages/wasm/promise-bundle.ts +++ b/bindings/javascript/sync/packages/wasm/promise-bundle.ts @@ -1,6 +1,6 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-bundle.js"; let BrowserIO: ProtocolIo = { @@ -39,7 +39,7 @@ async function init(): Promise { } class Database extends DatabasePromise { - #runOpts: RunOpts; + #runner: Runner; #engine: any; #io: ProtocolIo; #guards: SyncEngineGuards; @@ -61,7 +61,6 @@ class Database extends DatabasePromise { bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +82,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : BrowserIO; + const run = runner(runOpts, io, engine); + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? memoryIO() : BrowserIO; + this.#io = io; this.#guards = new SyncEngineGuards(); } /** @@ -112,7 +118,7 @@ class Database extends DatabasePromise { registerFileAtWorker(this.#worker, `${this.name}-changes`), ]); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -125,11 +131,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -140,7 +146,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -149,7 +155,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -158,7 +164,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database and relevant files diff --git a/bindings/javascript/sync/packages/wasm/promise-default.ts b/bindings/javascript/sync/packages/wasm/promise-default.ts index 156998cfc..698add803 100644 --- a/bindings/javascript/sync/packages/wasm/promise-default.ts +++ b/bindings/javascript/sync/packages/wasm/promise-default.ts @@ -1,6 +1,6 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-default.js"; let BrowserIO: ProtocolIo = { @@ -39,7 +39,7 @@ async function init(): Promise { } class Database extends DatabasePromise { - #runOpts: RunOpts; + #runner: Runner; #engine: any; #io: ProtocolIo; #guards: SyncEngineGuards; @@ -61,7 +61,6 @@ class Database extends DatabasePromise { bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +82,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : BrowserIO; + const run = runner(runOpts, io, engine); + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? memoryIO() : BrowserIO; + this.#io = io; this.#guards = new SyncEngineGuards(); } /** @@ -112,7 +118,7 @@ class Database extends DatabasePromise { registerFileAtWorker(this.#worker, `${this.name}-changes`), ]); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -125,11 +131,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -140,7 +146,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -149,7 +155,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -158,7 +164,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database and relevant files diff --git a/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts b/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts index 935b8b31d..eeb55f475 100644 --- a/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts +++ b/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts @@ -1,6 +1,6 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-turbopack-hack.js"; let BrowserIO: ProtocolIo = { @@ -39,7 +39,7 @@ async function init(): Promise { } class Database extends DatabasePromise { - #runOpts: RunOpts; + #runner: Runner; #engine: any; #io: ProtocolIo; #guards: SyncEngineGuards; @@ -61,7 +61,6 @@ class Database extends DatabasePromise { bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +82,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : BrowserIO; + const run = runner(runOpts, io, engine); + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? memoryIO() : BrowserIO; + this.#io = io; this.#guards = new SyncEngineGuards(); } /** @@ -112,7 +118,7 @@ class Database extends DatabasePromise { registerFileAtWorker(this.#worker, `${this.name}-changes`), ]); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -125,11 +131,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -140,7 +146,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -149,7 +155,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -158,7 +164,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database and relevant files diff --git a/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts b/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts index fbced4d2c..700ac52c0 100644 --- a/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts +++ b/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts @@ -1,6 +1,6 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" -import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards, Runner, runner } from "@tursodatabase/sync-common"; import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-vite-dev-hack.js"; let BrowserIO: ProtocolIo = { @@ -39,7 +39,7 @@ async function init(): Promise { } class Database extends DatabasePromise { - #runOpts: RunOpts; + #runner: Runner; #engine: any; #io: ProtocolIo; #guards: SyncEngineGuards; @@ -61,7 +61,6 @@ class Database extends DatabasePromise { bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, remoteEncryption: opts.remoteEncryption?.cipher, }); - super(engine.db() as unknown as any); let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); if (typeof opts.authToken == "function") { @@ -83,14 +82,21 @@ class Database extends DatabasePromise { }) }; } - this.#runOpts = { + const runOpts = { url: opts.url, headers: headers, preemptionMs: 1, transform: opts.transform, }; + const db = engine.db() as unknown as any; + const memory = db.memory; + const io = memory ? memoryIO() : BrowserIO; + const run = runner(runOpts, io, engine); + super(engine.db() as unknown as any, () => run.wait()); + + this.#runner = run; this.#engine = engine; - this.#io = this.memory ? memoryIO() : BrowserIO; + this.#io = io; this.#guards = new SyncEngineGuards(); } /** @@ -112,7 +118,7 @@ class Database extends DatabasePromise { registerFileAtWorker(this.#worker, `${this.name}-changes`), ]); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + await run(this.#runner, this.#engine.connect()); } this.connected = true; } @@ -125,11 +131,11 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + const changes = await this.#guards.wait(async () => await run(this.#runner, this.#engine.wait())); if (changes.empty()) { return false; } - await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + await this.#guards.apply(async () => await run(this.#runner, this.#engine.apply(changes))); return true; } /** @@ -140,7 +146,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + await this.#guards.push(async () => await run(this.#runner, this.#engine.push())); } /** * checkpoint WAL for local database @@ -149,7 +155,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + await this.#guards.checkpoint(async () => await run(this.#runner, this.#engine.checkpoint())); } /** * @returns statistic of current local database @@ -158,7 +164,7 @@ class Database extends DatabasePromise { if (this.#engine == null) { throw new Error("sync is disabled as database was opened without sync support") } - return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + return (await run(this.#runner, this.#engine.stats())); } /** * close the database and relevant files diff --git a/bindings/javascript/sync/src/generator.rs b/bindings/javascript/sync/src/generator.rs index 446543049..5481eef16 100644 --- a/bindings/javascript/sync/src/generator.rs +++ b/bindings/javascript/sync/src/generator.rs @@ -48,6 +48,8 @@ pub enum GeneratorResponse { last_pull_unix_time: Option, last_push_unix_time: Option, revision: Option, + network_sent_bytes: i64, + network_received_bytes: i64, }, SyncEngineChanges { changes: SyncEngineChanges, diff --git a/bindings/javascript/sync/src/lib.rs b/bindings/javascript/sync/src/lib.rs index 7730312c0..6951f2821 100644 --- a/bindings/javascript/sync/src/lib.rs +++ b/bindings/javascript/sync/src/lib.rs @@ -333,6 +333,8 @@ impl SyncEngine { last_pull_unix_time: stats.last_pull_unix_time, last_push_unix_time: stats.last_push_unix_time, revision: stats.revision, + network_sent_bytes: stats.network_sent_bytes as i64, + network_received_bytes: stats.network_received_bytes as i64, })) }) }