From d08bcb6a1775e7199663a4dd56448d7c062b88b8 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 25 Sep 2025 10:52:05 +0400 Subject: [PATCH] adjust sync package --- .../sync/packages/browser/promise-bundle.ts | 158 ++++++++++++++++-- .../sync/packages/browser/promise-default.ts | 158 ++++++++++++++++-- .../browser/promise-turbopack-hack.ts | 158 ++++++++++++++++-- .../packages/browser/promise-vite-dev-hack.ts | 158 ++++++++++++++++-- .../sync/packages/browser/promise.test.ts | 2 - .../sync/packages/browser/promise.ts | 113 ------------- 6 files changed, 580 insertions(+), 167 deletions(-) delete mode 100644 bindings/javascript/sync/packages/browser/promise.ts diff --git a/bindings/javascript/sync/packages/browser/promise-bundle.ts b/bindings/javascript/sync/packages/browser/promise-bundle.ts index f5c705f80..1a1051fb6 100644 --- a/bindings/javascript/sync/packages/browser/promise-bundle.ts +++ b/bindings/javascript/sync/packages/browser/promise-bundle.ts @@ -1,23 +1,155 @@ -import { SyncOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } from "@tursodatabase/sync-common"; -import { connect as promiseConnect, Database } from "./promise.js"; -import { SyncEngine, initThreadPool, MainWorker } from "./index-bundle.js"; +import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common" +import { DatabasePromise } from "@tursodatabase/database-common" +import { ProtocolIo, run, DatabaseOpts as SyncDatabaseOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-bundle.js"; + +let BrowserIO: ProtocolIo = { + async read(path: string): Promise { + const result = localStorage.getItem(path); + if (result == null) { + return null; + } + return new TextEncoder().encode(result); + }, + async write(path: string, data: Buffer | Uint8Array): Promise { + const array = new Uint8Array(data); + const value = new TextDecoder('utf-8').decode(array); + localStorage.setItem(path, value); + } +}; + +function memoryIO(): ProtocolIo { + let values = new Map(); + return { + async read(path: string): Promise { + return values.get(path); + }, + async write(path: string, data: Buffer | Uint8Array): Promise { + values.set(path, data); + } + } +}; + +async function init(): Promise { + await initThreadPool(); + if (MainWorker == null) { + throw new Error("panic: MainWorker is not initialized"); + } + return MainWorker; +} + +class Database extends DatabasePromise { + #runOpts: RunOpts; + #engine: any; + #io: ProtocolIo; + #guards: SyncEngineGuards; + #worker: Worker | null; + constructor(opts: SyncDatabaseOpts) { + const engine = new SyncEngine({ + path: opts.path, + clientName: opts.clientName, + useTransform: opts.transform != null, + protocolVersion: SyncEngineProtocolVersion.V1, + longPollTimeoutMs: opts.longPollTimeoutMs, + tracing: opts.tracing, + }); + super(engine.db() as unknown as any); + + + let headers = typeof opts.authToken === "function" ? () => ({ + ...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }), + ...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey }) + }) : { + ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), + ...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey }) + }; + this.#runOpts = { + url: opts.url, + headers: headers, + preemptionMs: 1, + transform: opts.transform, + }; + this.#engine = engine; + this.#io = this.memory ? memoryIO() : BrowserIO; + this.#guards = new SyncEngineGuards(); + } + /** + * connect database and initialize it in case of clean start + */ + override async connect() { + if (!this.memory) { + this.#worker = await init(); + await Promise.all([ + registerFileAtWorker(this.#worker, this.name), + registerFileAtWorker(this.#worker, `${this.name}-wal`), + registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), + registerFileAtWorker(this.#worker, `${this.name}-info`), + registerFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + } + /** + * pull new changes from the remote database + * if {@link SyncDatabaseOpts.longPollTimeoutMs} is set - then server will hold the connection open until either new changes will appear in the database or timeout occurs. + * @returns true if new changes were pulled from the remote + */ + async pull() { + const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + if (changes.empty()) { + return false; + } + await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + return true; + } + /** + * push new local changes to the remote database + * if {@link SyncDatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote + */ + async push() { + await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + } + /** + * checkpoint WAL for local database + */ + async checkpoint() { + await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + } + /** + * @returns statistic of current local database + */ + async stats(): Promise { + return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + } + /** + * close the database and relevant files + */ + async close() { + if (this.name != null && this.#worker != null) { + await Promise.all([ + unregisterFileAtWorker(this.#worker, this.name), + unregisterFileAtWorker(this.#worker, `${this.name}-wal`), + unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), + unregisterFileAtWorker(this.#worker, `${this.name}-info`), + unregisterFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await super.close(); + this.#engine.close(); + } +} /** * Creates a new database connection asynchronously. * - * @param {string} path - Path to the database file. * @param {Object} opts - Options for database behavior. * @returns {Promise} - A promise that resolves to a Database instance. */ -async function connect(opts: SyncOpts): Promise { - return await promiseConnect(opts, x => new SyncEngine(x), async () => { - await initThreadPool(); - if (MainWorker == null) { - throw new Error("panic: MainWorker is not initialized"); - } - return MainWorker; - }); +async function connect(opts: SyncDatabaseOpts): Promise { + const db = new Database(opts); + await db.connect(); + return db; } -export { connect, Database, } +export { connect, Database } export type { DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } diff --git a/bindings/javascript/sync/packages/browser/promise-default.ts b/bindings/javascript/sync/packages/browser/promise-default.ts index 9b0adbed2..6f2cb4d60 100644 --- a/bindings/javascript/sync/packages/browser/promise-default.ts +++ b/bindings/javascript/sync/packages/browser/promise-default.ts @@ -1,23 +1,155 @@ -import { SyncOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } from "@tursodatabase/sync-common"; -import { connect as promiseConnect, Database } from "./promise.js"; -import { SyncEngine, initThreadPool, MainWorker } from "./index-default.js"; +import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common" +import { DatabasePromise } from "@tursodatabase/database-common" +import { ProtocolIo, run, DatabaseOpts as SyncDatabaseOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-default.js"; + +let BrowserIO: ProtocolIo = { + async read(path: string): Promise { + const result = localStorage.getItem(path); + if (result == null) { + return null; + } + return new TextEncoder().encode(result); + }, + async write(path: string, data: Buffer | Uint8Array): Promise { + const array = new Uint8Array(data); + const value = new TextDecoder('utf-8').decode(array); + localStorage.setItem(path, value); + } +}; + +function memoryIO(): ProtocolIo { + let values = new Map(); + return { + async read(path: string): Promise { + return values.get(path); + }, + async write(path: string, data: Buffer | Uint8Array): Promise { + values.set(path, data); + } + } +}; + +async function init(): Promise { + await initThreadPool(); + if (MainWorker == null) { + throw new Error("panic: MainWorker is not initialized"); + } + return MainWorker; +} + +class Database extends DatabasePromise { + #runOpts: RunOpts; + #engine: any; + #io: ProtocolIo; + #guards: SyncEngineGuards; + #worker: Worker | null; + constructor(opts: SyncDatabaseOpts) { + const engine = new SyncEngine({ + path: opts.path, + clientName: opts.clientName, + useTransform: opts.transform != null, + protocolVersion: SyncEngineProtocolVersion.V1, + longPollTimeoutMs: opts.longPollTimeoutMs, + tracing: opts.tracing, + }); + super(engine.db() as unknown as any); + + + let headers = typeof opts.authToken === "function" ? () => ({ + ...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }), + ...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey }) + }) : { + ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), + ...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey }) + }; + this.#runOpts = { + url: opts.url, + headers: headers, + preemptionMs: 1, + transform: opts.transform, + }; + this.#engine = engine; + this.#io = this.memory ? memoryIO() : BrowserIO; + this.#guards = new SyncEngineGuards(); + } + /** + * connect database and initialize it in case of clean start + */ + override async connect() { + if (!this.memory) { + this.#worker = await init(); + await Promise.all([ + registerFileAtWorker(this.#worker, this.name), + registerFileAtWorker(this.#worker, `${this.name}-wal`), + registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), + registerFileAtWorker(this.#worker, `${this.name}-info`), + registerFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + } + /** + * pull new changes from the remote database + * if {@link SyncDatabaseOpts.longPollTimeoutMs} is set - then server will hold the connection open until either new changes will appear in the database or timeout occurs. + * @returns true if new changes were pulled from the remote + */ + async pull() { + const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + if (changes.empty()) { + return false; + } + await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + return true; + } + /** + * push new local changes to the remote database + * if {@link SyncDatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote + */ + async push() { + await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + } + /** + * checkpoint WAL for local database + */ + async checkpoint() { + await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + } + /** + * @returns statistic of current local database + */ + async stats(): Promise { + return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + } + /** + * close the database and relevant files + */ + async close() { + if (this.name != null && this.#worker != null) { + await Promise.all([ + unregisterFileAtWorker(this.#worker, this.name), + unregisterFileAtWorker(this.#worker, `${this.name}-wal`), + unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), + unregisterFileAtWorker(this.#worker, `${this.name}-info`), + unregisterFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await super.close(); + this.#engine.close(); + } +} /** * Creates a new database connection asynchronously. * - * @param {string} path - Path to the database file. * @param {Object} opts - Options for database behavior. * @returns {Promise} - A promise that resolves to a Database instance. */ -async function connect(opts: SyncOpts): Promise { - return await promiseConnect(opts, x => new SyncEngine(x), async () => { - await initThreadPool(); - if (MainWorker == null) { - throw new Error("panic: MainWorker is not initialized"); - } - return MainWorker; - }); +async function connect(opts: SyncDatabaseOpts): Promise { + const db = new Database(opts); + await db.connect(); + return db; } -export { connect, Database, } +export { connect, Database } export type { DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } diff --git a/bindings/javascript/sync/packages/browser/promise-turbopack-hack.ts b/bindings/javascript/sync/packages/browser/promise-turbopack-hack.ts index 4c1d7e2db..bc886b755 100644 --- a/bindings/javascript/sync/packages/browser/promise-turbopack-hack.ts +++ b/bindings/javascript/sync/packages/browser/promise-turbopack-hack.ts @@ -1,23 +1,155 @@ -import { SyncOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } from "@tursodatabase/sync-common"; -import { connect as promiseConnect, Database } from "./promise.js"; -import { SyncEngine, initThreadPool, MainWorker } from "./index-turbopack-hack.js"; +import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common" +import { DatabasePromise } from "@tursodatabase/database-common" +import { ProtocolIo, run, DatabaseOpts as SyncDatabaseOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-turbopack-hack.js"; + +let BrowserIO: ProtocolIo = { + async read(path: string): Promise { + const result = localStorage.getItem(path); + if (result == null) { + return null; + } + return new TextEncoder().encode(result); + }, + async write(path: string, data: Buffer | Uint8Array): Promise { + const array = new Uint8Array(data); + const value = new TextDecoder('utf-8').decode(array); + localStorage.setItem(path, value); + } +}; + +function memoryIO(): ProtocolIo { + let values = new Map(); + return { + async read(path: string): Promise { + return values.get(path); + }, + async write(path: string, data: Buffer | Uint8Array): Promise { + values.set(path, data); + } + } +}; + +async function init(): Promise { + await initThreadPool(); + if (MainWorker == null) { + throw new Error("panic: MainWorker is not initialized"); + } + return MainWorker; +} + +class Database extends DatabasePromise { + #runOpts: RunOpts; + #engine: any; + #io: ProtocolIo; + #guards: SyncEngineGuards; + #worker: Worker | null; + constructor(opts: SyncDatabaseOpts) { + const engine = new SyncEngine({ + path: opts.path, + clientName: opts.clientName, + useTransform: opts.transform != null, + protocolVersion: SyncEngineProtocolVersion.V1, + longPollTimeoutMs: opts.longPollTimeoutMs, + tracing: opts.tracing, + }); + super(engine.db() as unknown as any); + + + let headers = typeof opts.authToken === "function" ? () => ({ + ...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }), + ...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey }) + }) : { + ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), + ...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey }) + }; + this.#runOpts = { + url: opts.url, + headers: headers, + preemptionMs: 1, + transform: opts.transform, + }; + this.#engine = engine; + this.#io = this.memory ? memoryIO() : BrowserIO; + this.#guards = new SyncEngineGuards(); + } + /** + * connect database and initialize it in case of clean start + */ + override async connect() { + if (!this.memory) { + this.#worker = await init(); + await Promise.all([ + registerFileAtWorker(this.#worker, this.name), + registerFileAtWorker(this.#worker, `${this.name}-wal`), + registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), + registerFileAtWorker(this.#worker, `${this.name}-info`), + registerFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + } + /** + * pull new changes from the remote database + * if {@link SyncDatabaseOpts.longPollTimeoutMs} is set - then server will hold the connection open until either new changes will appear in the database or timeout occurs. + * @returns true if new changes were pulled from the remote + */ + async pull() { + const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + if (changes.empty()) { + return false; + } + await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + return true; + } + /** + * push new local changes to the remote database + * if {@link SyncDatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote + */ + async push() { + await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + } + /** + * checkpoint WAL for local database + */ + async checkpoint() { + await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + } + /** + * @returns statistic of current local database + */ + async stats(): Promise { + return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + } + /** + * close the database and relevant files + */ + async close() { + if (this.name != null && this.#worker != null) { + await Promise.all([ + unregisterFileAtWorker(this.#worker, this.name), + unregisterFileAtWorker(this.#worker, `${this.name}-wal`), + unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), + unregisterFileAtWorker(this.#worker, `${this.name}-info`), + unregisterFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await super.close(); + this.#engine.close(); + } +} /** * Creates a new database connection asynchronously. * - * @param {string} path - Path to the database file. * @param {Object} opts - Options for database behavior. * @returns {Promise} - A promise that resolves to a Database instance. */ -async function connect(opts: SyncOpts): Promise { - return await promiseConnect(opts, x => new SyncEngine(x), async () => { - await initThreadPool(); - if (MainWorker == null) { - throw new Error("panic: MainWorker is not initialized"); - } - return MainWorker; - }); +async function connect(opts: SyncDatabaseOpts): Promise { + const db = new Database(opts); + await db.connect(); + return db; } -export { connect, Database, } +export { connect, Database } export type { DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } diff --git a/bindings/javascript/sync/packages/browser/promise-vite-dev-hack.ts b/bindings/javascript/sync/packages/browser/promise-vite-dev-hack.ts index 88ad78454..c084f4410 100644 --- a/bindings/javascript/sync/packages/browser/promise-vite-dev-hack.ts +++ b/bindings/javascript/sync/packages/browser/promise-vite-dev-hack.ts @@ -1,23 +1,155 @@ -import { SyncOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } from "@tursodatabase/sync-common"; -import { connect as promiseConnect, Database } from "./promise.js"; -import { SyncEngine, initThreadPool, MainWorker } from "./index-vite-dev-hack.js"; +import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common" +import { DatabasePromise } from "@tursodatabase/database-common" +import { ProtocolIo, run, DatabaseOpts as SyncDatabaseOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; +import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-vite-dev-hack.js"; + +let BrowserIO: ProtocolIo = { + async read(path: string): Promise { + const result = localStorage.getItem(path); + if (result == null) { + return null; + } + return new TextEncoder().encode(result); + }, + async write(path: string, data: Buffer | Uint8Array): Promise { + const array = new Uint8Array(data); + const value = new TextDecoder('utf-8').decode(array); + localStorage.setItem(path, value); + } +}; + +function memoryIO(): ProtocolIo { + let values = new Map(); + return { + async read(path: string): Promise { + return values.get(path); + }, + async write(path: string, data: Buffer | Uint8Array): Promise { + values.set(path, data); + } + } +}; + +async function init(): Promise { + await initThreadPool(); + if (MainWorker == null) { + throw new Error("panic: MainWorker is not initialized"); + } + return MainWorker; +} + +class Database extends DatabasePromise { + #runOpts: RunOpts; + #engine: any; + #io: ProtocolIo; + #guards: SyncEngineGuards; + #worker: Worker | null; + constructor(opts: SyncDatabaseOpts) { + const engine = new SyncEngine({ + path: opts.path, + clientName: opts.clientName, + useTransform: opts.transform != null, + protocolVersion: SyncEngineProtocolVersion.V1, + longPollTimeoutMs: opts.longPollTimeoutMs, + tracing: opts.tracing, + }); + super(engine.db() as unknown as any); + + + let headers = typeof opts.authToken === "function" ? () => ({ + ...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }), + ...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey }) + }) : { + ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), + ...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey }) + }; + this.#runOpts = { + url: opts.url, + headers: headers, + preemptionMs: 1, + transform: opts.transform, + }; + this.#engine = engine; + this.#io = this.memory ? memoryIO() : BrowserIO; + this.#guards = new SyncEngineGuards(); + } + /** + * connect database and initialize it in case of clean start + */ + override async connect() { + if (!this.memory) { + this.#worker = await init(); + await Promise.all([ + registerFileAtWorker(this.#worker, this.name), + registerFileAtWorker(this.#worker, `${this.name}-wal`), + registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), + registerFileAtWorker(this.#worker, `${this.name}-info`), + registerFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + } + /** + * pull new changes from the remote database + * if {@link SyncDatabaseOpts.longPollTimeoutMs} is set - then server will hold the connection open until either new changes will appear in the database or timeout occurs. + * @returns true if new changes were pulled from the remote + */ + async pull() { + const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); + if (changes.empty()) { + return false; + } + await this.#guards.apply(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.apply(changes))); + return true; + } + /** + * push new local changes to the remote database + * if {@link SyncDatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote + */ + async push() { + await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); + } + /** + * checkpoint WAL for local database + */ + async checkpoint() { + await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); + } + /** + * @returns statistic of current local database + */ + async stats(): Promise { + return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); + } + /** + * close the database and relevant files + */ + async close() { + if (this.name != null && this.#worker != null) { + await Promise.all([ + unregisterFileAtWorker(this.#worker, this.name), + unregisterFileAtWorker(this.#worker, `${this.name}-wal`), + unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), + unregisterFileAtWorker(this.#worker, `${this.name}-info`), + unregisterFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await super.close(); + this.#engine.close(); + } +} /** * Creates a new database connection asynchronously. * - * @param {string} path - Path to the database file. * @param {Object} opts - Options for database behavior. * @returns {Promise} - A promise that resolves to a Database instance. */ -async function connect(opts: SyncOpts): Promise { - return await promiseConnect(opts, x => new SyncEngine(x), async () => { - await initThreadPool(); - if (MainWorker == null) { - throw new Error("panic: MainWorker is not initialized"); - } - return MainWorker; - }); +async function connect(opts: SyncDatabaseOpts): Promise { + const db = new Database(opts); + await db.connect(); + return db; } -export { connect, Database, } +export { connect, Database } export type { DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult } diff --git a/bindings/javascript/sync/packages/browser/promise.test.ts b/bindings/javascript/sync/packages/browser/promise.test.ts index ff271d0e6..895359533 100644 --- a/bindings/javascript/sync/packages/browser/promise.test.ts +++ b/bindings/javascript/sync/packages/browser/promise.test.ts @@ -331,7 +331,6 @@ test('concurrent-updates', { timeout: 60000 }, async () => { } async function pull(db, i) { try { - console.info('pull', i); await db.pull(); } catch (e) { console.error('pull', i, e); @@ -343,7 +342,6 @@ test('concurrent-updates', { timeout: 60000 }, async () => { } async function push(db, i) { try { - console.info('push', i); await db.push(); } catch (e) { console.error('push', i, e); diff --git a/bindings/javascript/sync/packages/browser/promise.ts b/bindings/javascript/sync/packages/browser/promise.ts deleted file mode 100644 index b12680598..000000000 --- a/bindings/javascript/sync/packages/browser/promise.ts +++ /dev/null @@ -1,113 +0,0 @@ -import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common" -import { DatabasePromise, DatabaseOpts, NativeDatabase } from "@tursodatabase/database-common" -import { ProtocolIo, run, SyncOpts, RunOpts, memoryIO, SyncEngineStats, SyncEngineGuards } from "@tursodatabase/sync-common"; - -let BrowserIo: ProtocolIo = { - async read(path: string): Promise { - const result = localStorage.getItem(path); - if (result == null) { - return null; - } - return new TextEncoder().encode(result); - }, - async write(path: string, data: Buffer | Uint8Array): Promise { - const array = new Uint8Array(data); - const value = new TextDecoder('utf-8').decode(array); - localStorage.setItem(path, value); - } -}; - - -class Database extends DatabasePromise { - runOpts: RunOpts; - engine: any; - io: ProtocolIo; - worker: Worker | null; - fsPath: string | null; - guards: SyncEngineGuards; - constructor(db: NativeDatabase, io: ProtocolIo, worker: Worker | null, runOpts: RunOpts, engine: any, fsPath: string | null, opts: DatabaseOpts = {}) { - super(db, opts) - this.io = io; - this.worker = worker; - this.runOpts = runOpts; - this.engine = engine; - this.fsPath = fsPath; - this.guards = new SyncEngineGuards(); - } - async sync() { - await this.push(); - await this.pull(); - } - async pull() { - const changes = await this.guards.wait(async () => await run(this.runOpts, this.io, this.engine, this.engine.wait())); - await this.guards.apply(async () => await run(this.runOpts, this.io, this.engine, this.engine.apply(changes))); - } - async push() { - await this.guards.push(async () => await run(this.runOpts, this.io, this.engine, this.engine.push())); - } - async checkpoint() { - await this.guards.checkpoint(async () => await run(this.runOpts, this.io, this.engine, this.engine.checkpoint())); - } - async stats(): Promise { - return (await run(this.runOpts, this.io, this.engine, this.engine.stats())); - } - override async close(): Promise { - this.db.close(); - this.engine.close(); - if (this.fsPath != null && this.worker != null) { - await Promise.all([ - unregisterFileAtWorker(this.worker, this.fsPath), - unregisterFileAtWorker(this.worker, `${this.fsPath}-wal`), - unregisterFileAtWorker(this.worker, `${this.fsPath}-wal-revert`), - unregisterFileAtWorker(this.worker, `${this.fsPath}-info`), - unregisterFileAtWorker(this.worker, `${this.fsPath}-changes`), - ]); - } - } -} - -/** - * Creates a new database connection asynchronously. - * - * @param {string} path - Path to the database file. - * @param {Object} opts - Options for database behavior. - * @returns {Promise} - A promise that resolves to a Database instance. - */ -async function connect(opts: SyncOpts, connect: (any) => any, init: () => Promise): Promise { - const engine = connect({ - path: opts.path, - clientName: opts.clientName, - tablesIgnore: opts.tablesIgnore, - useTransform: opts.transform != null, - tracing: opts.tracing, - protocolVersion: 1, - longPollTimeoutMs: opts.longPollTimeoutMs - }); - const runOpts: RunOpts = { - url: opts.url, - headers: { - ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), - ...(opts.encryptionKey != null && { "x-turso-encryption-key": opts.encryptionKey }) - }, - preemptionMs: 1, - transform: opts.transform, - }; - const isMemory = opts.path == ':memory:'; - let io = isMemory ? memoryIO() : BrowserIo; - - const worker = await init(); - if (!isMemory) { - await Promise.all([ - registerFileAtWorker(worker, opts.path), - registerFileAtWorker(worker, `${opts.path}-wal`), - registerFileAtWorker(worker, `${opts.path}-wal-revert`), - registerFileAtWorker(worker, `${opts.path}-info`), - registerFileAtWorker(worker, `${opts.path}-changes`), - ]); - } - await run(runOpts, io, engine, engine.init()); - const nativeDb = engine.open(); - return new Database(nativeDb as any, io, worker, runOpts, engine, isMemory ? null : opts.path, {}); -} - -export { connect, Database }