diff --git a/Cargo.lock b/Cargo.lock index 8f4dcfcc3..3c2310e86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4589,6 +4589,7 @@ dependencies = [ "tracing-subscriber", "turso", "turso_core", + "turso_parser", "uuid", ] diff --git a/bindings/javascript/sync/packages/common/run.ts b/bindings/javascript/sync/packages/common/run.ts index 4d1d1c969..554c5ff61 100644 --- a/bindings/javascript/sync/packages/common/run.ts +++ b/bindings/javascript/sync/packages/common/run.ts @@ -22,15 +22,25 @@ async function process(opts: RunOpts, io: ProtocolIo, request: any) { const requestType = request.request(); const completion = request.completion(); if (requestType.type == 'Http') { + let url: string | null = null; + if (typeof opts.url == "function") { + url = opts.url(); + } else { + url = opts.url; + } + if (url == null) { + completion.poison(`url is empty - sync is paused`); + return; + } try { - let headers = typeof opts.headers === "function" ? opts.headers() : opts.headers; + let headers = typeof opts.headers === "function" ? await opts.headers() : opts.headers; if (requestType.headers != null && requestType.headers.length > 0) { headers = { ...opts.headers }; for (let header of requestType.headers) { headers[header[0]] = header[1]; } } - const response = await fetch(`${opts.url}${requestType.path}`, { + const response = await fetch(`${url}${requestType.path}`, { method: requestType.method, headers: headers, body: requestType.body != null ? new Uint8Array(requestType.body) : null, diff --git a/bindings/javascript/sync/packages/common/types.ts b/bindings/javascript/sync/packages/common/types.ts index ee3e77a87..98f0f0c69 100644 --- a/bindings/javascript/sync/packages/common/types.ts +++ b/bindings/javascript/sync/packages/common/types.ts @@ -61,7 +61,7 @@ export interface EncryptionOpts { // base64 encoded encryption key (must be either 16 or 32 bytes depending on the cipher) key: string, // encryption cipher algorithm - cipher: 'aes256gcm' | 'aes128gcm' | 'chacha20poly1305' + cipher: 'aes256gcm' | 'aes128gcm' | 'chacha20poly1305' | 'aegis256' } export interface DatabaseOpts { /** @@ -73,22 +73,26 @@ export interface DatabaseOpts { /** * optional url of the remote database (e.g. libsql://db-org.turso.io) * (if omitted - local-only database will be created) + * + * you can also promide function which will return URL or null + * in this case local database will be created and sync will be "switched-on" whenever the url will return non-empty value + * note, that all other parameters (like encryption) must be set in advance in order for the "deferred" sync to work properly */ - url?: string; + url?: string | (() => string | null); /** * auth token for the remote database * (can be either static string or function which will provide short-lived credentials for every new request) */ - authToken?: string | (() => string); + authToken?: string | (() => Promise); /** * arbitrary client name which can be used to distinguish clients internally * the library will gurantee uniquiness of the clientId by appending unique suffix to the clientName */ clientName?: string; /** - * optional encryption parameters if cloud database were encrypted by default + * optional remote encryption parameters if cloud database were encrypted by default */ - encryption?: EncryptionOpts; + remoteEncryption?: EncryptionOpts; /** * optional callback which will be called for every mutation before sending it to the remote * this callback can transform the update in order to support complex conflict resolution strategy @@ -136,8 +140,8 @@ export interface DatabaseStats { export interface RunOpts { preemptionMs: number, - url: string, - headers: { [K: string]: string } | (() => { [K: string]: string }) + url: string | (() => string | null), + headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>) transform?: Transform, } diff --git a/bindings/javascript/sync/packages/native/index.d.ts b/bindings/javascript/sync/packages/native/index.d.ts index aeb10e8ee..289cddbbb 100644 --- a/bindings/javascript/sync/packages/native/index.d.ts +++ b/bindings/javascript/sync/packages/native/index.d.ts @@ -220,7 +220,7 @@ export type DatabaseRowTransformResultJs = export type GeneratorResponse = | { type: 'IO' } | { type: 'Done' } - | { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime: number, lastPushUnixTime?: number, revision?: string } + | { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime?: number, lastPushUnixTime?: number, revision?: string } | { type: 'SyncEngineChanges', changes: SyncEngineChanges } export type JsProtocolRequest = @@ -238,6 +238,8 @@ export interface SyncEngineOpts { tablesIgnore?: Array useTransform: boolean protocolVersion?: SyncEngineProtocolVersion + bootstrapIfEmpty: boolean + remoteEncryption?: string } export declare const enum SyncEngineProtocolVersion { diff --git a/bindings/javascript/sync/packages/native/index.js b/bindings/javascript/sync/packages/native/index.js index 43c284371..10d3073fa 100644 --- a/bindings/javascript/sync/packages/native/index.js +++ b/bindings/javascript/sync/packages/native/index.js @@ -81,8 +81,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-android-arm64') const bindingPackageVersion = require('@tursodatabase/sync-android-arm64/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -97,8 +97,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-android-arm-eabi') const bindingPackageVersion = require('@tursodatabase/sync-android-arm-eabi/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -117,8 +117,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-win32-x64-msvc') const bindingPackageVersion = require('@tursodatabase/sync-win32-x64-msvc/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -133,8 +133,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-win32-ia32-msvc') const bindingPackageVersion = require('@tursodatabase/sync-win32-ia32-msvc/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -149,8 +149,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-win32-arm64-msvc') const bindingPackageVersion = require('@tursodatabase/sync-win32-arm64-msvc/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -168,8 +168,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-darwin-universal') const bindingPackageVersion = require('@tursodatabase/sync-darwin-universal/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -184,8 +184,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-darwin-x64') const bindingPackageVersion = require('@tursodatabase/sync-darwin-x64/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -200,8 +200,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-darwin-arm64') const bindingPackageVersion = require('@tursodatabase/sync-darwin-arm64/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -220,8 +220,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-freebsd-x64') const bindingPackageVersion = require('@tursodatabase/sync-freebsd-x64/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -236,8 +236,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-freebsd-arm64') const bindingPackageVersion = require('@tursodatabase/sync-freebsd-arm64/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -257,8 +257,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-x64-musl') const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-musl/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -273,8 +273,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-x64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-gnu/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -291,8 +291,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm64-musl') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-musl/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -307,8 +307,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-gnu/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -325,8 +325,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm-musleabihf') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-musleabihf/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -341,8 +341,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-arm-gnueabihf') const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-gnueabihf/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -359,8 +359,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-riscv64-musl') const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-musl/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -375,8 +375,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-riscv64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-gnu/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -392,8 +392,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-ppc64-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-ppc64-gnu/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -408,8 +408,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-linux-s390x-gnu') const bindingPackageVersion = require('@tursodatabase/sync-linux-s390x-gnu/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -428,8 +428,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-openharmony-arm64') const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm64/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -444,8 +444,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-openharmony-x64') const bindingPackageVersion = require('@tursodatabase/sync-openharmony-x64/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { @@ -460,8 +460,8 @@ function requireNative() { try { const binding = require('@tursodatabase/sync-openharmony-arm') const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm/package.json').version - if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { - throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) + if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') { + throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`) } return binding } catch (e) { diff --git a/bindings/javascript/sync/packages/native/promise.test.ts b/bindings/javascript/sync/packages/native/promise.test.ts index cb3268993..1f5801991 100644 --- a/bindings/javascript/sync/packages/native/promise.test.ts +++ b/bindings/javascript/sync/packages/native/promise.test.ts @@ -3,6 +3,7 @@ import { expect, test } from 'vitest' import { connect, Database, DatabaseRowMutation, DatabaseRowTransformResult } from './promise.js' const localeCompare = (a, b) => a.x.localeCompare(b.x); +const intCompare = (a, b) => a.x - b.x; function cleanup(path) { unlinkSync(path); @@ -12,6 +13,15 @@ function cleanup(path) { try { unlinkSync(`${path}-wal-revert`) } catch (e) { } } +test('simple-db', async () => { + const db = new Database({ path: ':memory:' }); + expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }]) + await db.exec("CREATE TABLE t(x)"); + await db.exec("INSERT INTO t VALUES (1), (2), (3)"); + expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]) + await expect(async () => await db.pull()).rejects.toThrowError(/sync is disabled as database was opened without sync support/); +}) + test('implicit connect', async () => { const db = new Database({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL }); const defer = db.prepare("SELECT * FROM not_found"); @@ -20,6 +30,74 @@ test('implicit connect', async () => { expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }]); }) +test('defered sync', async () => { + { + const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL }); + await db.exec("CREATE TABLE IF NOT EXISTS t(x)"); + await db.exec("DELETE FROM t"); + await db.exec("INSERT INTO t VALUES (100)"); + await db.push(); + await db.close(); + } + + let url = null; + const db = new Database({ path: ':memory:', url: () => url }); + await db.prepare("CREATE TABLE t(x)").run(); + await db.prepare("INSERT INTO t VALUES (1), (2), (3), (42)").run(); + expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }, { x: 42 }]); + await expect(async () => await db.pull()).rejects.toThrow(/url is empty - sync is paused/); + url = process.env.VITE_TURSO_DB_URL; + await db.pull(); + expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 100 }, { x: 1 }, { x: 2 }, { x: 3 }, { x: 42 }]); +}) + +test('encryption sync', async () => { + const KEY = 'l/FWopMfZisTLgBX4A42AergrCrYKjiO3BfkJUwv83I='; + const URL = 'http://encrypted--a--a.localhost:10000'; + { + const db = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + await db.exec("CREATE TABLE IF NOT EXISTS t(x)"); + await db.exec("DELETE FROM t"); + await db.push(); + await db.close(); + } + const db1 = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + const db2 = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + await db1.exec("INSERT INTO t VALUES (1), (2), (3)"); + await db2.exec("INSERT INTO t VALUES (4), (5), (6)"); + expect(await db1.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]); + expect(await db2.prepare("SELECT * FROM t").all()).toEqual([{ x: 4 }, { x: 5 }, { x: 6 }]); + await Promise.all([db1.push(), db2.push()]); + await Promise.all([db1.pull(), db2.pull()]); + const expected = [{ x: 1 }, { x: 2 }, { x: 3 }, { x: 4 }, { x: 5 }, { x: 6 }]; + expect((await db1.prepare("SELECT * FROM t").all()).sort(intCompare)).toEqual(expected.sort(intCompare)); + expect((await db2.prepare("SELECT * FROM t").all()).sort(intCompare)).toEqual(expected.sort(intCompare)); +}); + +test('defered encryption sync', async () => { + const URL = 'http://encrypted--a--a.localhost:10000'; + const KEY = 'l/FWopMfZisTLgBX4A42AergrCrYKjiO3BfkJUwv83I='; + let url = null; + { + const db = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + await db.exec("CREATE TABLE IF NOT EXISTS t(x)"); + await db.exec("DELETE FROM t"); + await db.exec("INSERT INTO t VALUES (100)"); + await db.push(); + await db.close(); + } + const db = await connect({ path: ':memory:', url: () => url, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + await db.exec("CREATE TABLE IF NOT EXISTS t(x)"); + await db.exec("INSERT INTO t VALUES (1), (2), (3)"); + expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]); + + url = URL; + await db.pull(); + + const expected = [{ x: 100 }, { x: 1 }, { x: 2 }, { x: 3 }]; + expect((await db.prepare("SELECT * FROM t").all())).toEqual(expected); +}); + test('select-after-push', async () => { { const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL }); diff --git a/bindings/javascript/sync/packages/native/promise.ts b/bindings/javascript/sync/packages/native/promise.ts index abaf263c5..1df3cfb21 100644 --- a/bindings/javascript/sync/packages/native/promise.ts +++ b/bindings/javascript/sync/packages/native/promise.ts @@ -1,6 +1,6 @@ import { DatabasePromise } from "@tursodatabase/database-common" import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; -import { SyncEngine, SyncEngineProtocolVersion } from "#index"; +import { SyncEngine, SyncEngineProtocolVersion, Database as NativeDatabase } from "#index"; import { promises } from "node:fs"; let NodeIO: ProtocolIo = { @@ -45,6 +45,12 @@ class Database extends DatabasePromise { #io: ProtocolIo; #guards: SyncEngineGuards constructor(opts: DatabaseOpts) { + if (opts.url == null) { + super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any); + this.#engine = null; + return; + } + const engine = new SyncEngine({ path: opts.path, clientName: opts.clientName, @@ -52,23 +58,31 @@ class Database extends DatabasePromise { protocolVersion: SyncEngineProtocolVersion.V1, longPollTimeoutMs: opts.longPollTimeoutMs, tracing: opts.tracing, + bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, + remoteEncryption: opts.remoteEncryption?.cipher, }); super(engine.db() as unknown as any); - - let headers = typeof opts.authToken === "function" ? () => ({ - ...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }) : { - ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }; + let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); + if (typeof opts.authToken == "function") { + const authToken = opts.authToken; + headers = async () => ({ + ...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }); + } else { + const authToken = opts.authToken; + headers = { + ...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }; + } this.#runOpts = { url: opts.url, headers: headers, @@ -83,8 +97,13 @@ class Database extends DatabasePromise { * connect database and initialize it in case of clean start */ override async connect() { - if (this.connected) { return; } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + if (this.connected) { + return; + } else if (this.#engine == null) { + await super.connect(); + } else { + await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); + } this.connected = true; } /** @@ -93,6 +112,9 @@ class Database extends DatabasePromise { * @returns true if new changes were pulled from the remote */ async pull() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); if (changes.empty()) { return false; @@ -105,25 +127,37 @@ class Database extends DatabasePromise { * if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote */ async push() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); } /** * checkpoint WAL for local database */ async checkpoint() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); } /** * @returns statistic of current local database */ async stats(): Promise { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); } /** * close the database */ override async close(): Promise { - this.#engine.close(); + await super.close(); + if (this.#engine != null) { + this.#engine.close(); + } } } diff --git a/bindings/javascript/sync/packages/wasm/cp-entrypoint.sh b/bindings/javascript/sync/packages/wasm/cp-entrypoint.sh new file mode 100644 index 000000000..7fcf98ad6 --- /dev/null +++ b/bindings/javascript/sync/packages/wasm/cp-entrypoint.sh @@ -0,0 +1,3 @@ +sed 's/index-default.js/index-bundle.js/g' promise-default.ts > promise-bundle.ts +sed 's/index-default.js/index-turbopack-hack.js/g' promise-default.ts > promise-turbopack-hack.ts +sed 's/index-default.js/index-vite-dev-hack.js/g' promise-default.ts > promise-vite-dev-hack.ts diff --git a/bindings/javascript/sync/packages/wasm/promise-bundle.ts b/bindings/javascript/sync/packages/wasm/promise-bundle.ts index 7a818f323..8080d7c53 100644 --- a/bindings/javascript/sync/packages/wasm/promise-bundle.ts +++ b/bindings/javascript/sync/packages/wasm/promise-bundle.ts @@ -1,7 +1,7 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; -import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-bundle.js"; +import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-bundle.js"; let BrowserIO: ProtocolIo = { async read(path: string): Promise { @@ -45,6 +45,12 @@ class Database extends DatabasePromise { #guards: SyncEngineGuards; #worker: Worker | null; constructor(opts: DatabaseOpts) { + if (opts.url == null) { + super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any); + this.#engine = null; + return; + } + const engine = new SyncEngine({ path: opts.path, clientName: opts.clientName, @@ -52,23 +58,31 @@ class Database extends DatabasePromise { protocolVersion: SyncEngineProtocolVersion.V1, longPollTimeoutMs: opts.longPollTimeoutMs, tracing: opts.tracing, + bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, + remoteEncryption: opts.remoteEncryption?.cipher, }); super(engine.db() as unknown as any); - - let headers = typeof opts.authToken === "function" ? () => ({ - ...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }) : { - ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }; + let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); + if (typeof opts.authToken == "function") { + const authToken = opts.authToken; + headers = async () => ({ + ...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }); + } else { + const authToken = opts.authToken; + headers = { + ...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }; + } this.#runOpts = { url: opts.url, headers: headers, @@ -83,18 +97,23 @@ class Database extends DatabasePromise { * connect database and initialize it in case of clean start */ override async connect() { - if (this.connected) { return; } - if (!this.memory) { - this.#worker = await init(); - await Promise.all([ - registerFileAtWorker(this.#worker, this.name), - registerFileAtWorker(this.#worker, `${this.name}-wal`), - registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), - registerFileAtWorker(this.#worker, `${this.name}-info`), - registerFileAtWorker(this.#worker, `${this.name}-changes`), - ]); + if (this.connected) { + return; + } else if (this.#engine == null) { + await super.connect(); + } else { + if (!this.memory) { + this.#worker = await init(); + await Promise.all([ + registerFileAtWorker(this.#worker, this.name), + registerFileAtWorker(this.#worker, `${this.name}-wal`), + registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), + registerFileAtWorker(this.#worker, `${this.name}-info`), + registerFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); this.connected = true; } /** @@ -103,6 +122,9 @@ class Database extends DatabasePromise { * @returns true if new changes were pulled from the remote */ async pull() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); if (changes.empty()) { return false; @@ -115,35 +137,46 @@ class Database extends DatabasePromise { * if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote */ async push() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); } /** * checkpoint WAL for local database */ async checkpoint() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); } /** * @returns statistic of current local database */ async stats(): Promise { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); } /** * close the database and relevant files */ async close() { - if (this.name != null && this.#worker != null) { - await Promise.all([ - unregisterFileAtWorker(this.#worker, this.name), - unregisterFileAtWorker(this.#worker, `${this.name}-wal`), - unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), - unregisterFileAtWorker(this.#worker, `${this.name}-info`), - unregisterFileAtWorker(this.#worker, `${this.name}-changes`), - ]); + if (this.#engine != null) { + if (this.name != null && this.#worker != null) { + await Promise.all([ + unregisterFileAtWorker(this.#worker, this.name), + unregisterFileAtWorker(this.#worker, `${this.name}-wal`), + unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), + unregisterFileAtWorker(this.#worker, `${this.name}-info`), + unregisterFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + this.#engine.close(); } await super.close(); - this.#engine.close(); } } diff --git a/bindings/javascript/sync/packages/wasm/promise-default.ts b/bindings/javascript/sync/packages/wasm/promise-default.ts index 75802be92..156998cfc 100644 --- a/bindings/javascript/sync/packages/wasm/promise-default.ts +++ b/bindings/javascript/sync/packages/wasm/promise-default.ts @@ -1,7 +1,7 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; -import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-default.js"; +import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-default.js"; let BrowserIO: ProtocolIo = { async read(path: string): Promise { @@ -45,6 +45,12 @@ class Database extends DatabasePromise { #guards: SyncEngineGuards; #worker: Worker | null; constructor(opts: DatabaseOpts) { + if (opts.url == null) { + super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any); + this.#engine = null; + return; + } + const engine = new SyncEngine({ path: opts.path, clientName: opts.clientName, @@ -52,23 +58,31 @@ class Database extends DatabasePromise { protocolVersion: SyncEngineProtocolVersion.V1, longPollTimeoutMs: opts.longPollTimeoutMs, tracing: opts.tracing, + bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, + remoteEncryption: opts.remoteEncryption?.cipher, }); super(engine.db() as unknown as any); - - let headers = typeof opts.authToken === "function" ? () => ({ - ...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }) : { - ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }; + let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); + if (typeof opts.authToken == "function") { + const authToken = opts.authToken; + headers = async () => ({ + ...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }); + } else { + const authToken = opts.authToken; + headers = { + ...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }; + } this.#runOpts = { url: opts.url, headers: headers, @@ -83,18 +97,23 @@ class Database extends DatabasePromise { * connect database and initialize it in case of clean start */ override async connect() { - if (this.connected) { return; } - if (!this.memory) { - this.#worker = await init(); - await Promise.all([ - registerFileAtWorker(this.#worker, this.name), - registerFileAtWorker(this.#worker, `${this.name}-wal`), - registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), - registerFileAtWorker(this.#worker, `${this.name}-info`), - registerFileAtWorker(this.#worker, `${this.name}-changes`), - ]); + if (this.connected) { + return; + } else if (this.#engine == null) { + await super.connect(); + } else { + if (!this.memory) { + this.#worker = await init(); + await Promise.all([ + registerFileAtWorker(this.#worker, this.name), + registerFileAtWorker(this.#worker, `${this.name}-wal`), + registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), + registerFileAtWorker(this.#worker, `${this.name}-info`), + registerFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); this.connected = true; } /** @@ -103,6 +122,9 @@ class Database extends DatabasePromise { * @returns true if new changes were pulled from the remote */ async pull() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); if (changes.empty()) { return false; @@ -115,35 +137,46 @@ class Database extends DatabasePromise { * if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote */ async push() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); } /** * checkpoint WAL for local database */ async checkpoint() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); } /** * @returns statistic of current local database */ async stats(): Promise { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); } /** * close the database and relevant files */ async close() { - if (this.name != null && this.#worker != null) { - await Promise.all([ - unregisterFileAtWorker(this.#worker, this.name), - unregisterFileAtWorker(this.#worker, `${this.name}-wal`), - unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), - unregisterFileAtWorker(this.#worker, `${this.name}-info`), - unregisterFileAtWorker(this.#worker, `${this.name}-changes`), - ]); + if (this.#engine != null) { + if (this.name != null && this.#worker != null) { + await Promise.all([ + unregisterFileAtWorker(this.#worker, this.name), + unregisterFileAtWorker(this.#worker, `${this.name}-wal`), + unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), + unregisterFileAtWorker(this.#worker, `${this.name}-info`), + unregisterFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + this.#engine.close(); } await super.close(); - this.#engine.close(); } } diff --git a/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts b/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts index 273fd21fd..935b8b31d 100644 --- a/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts +++ b/bindings/javascript/sync/packages/wasm/promise-turbopack-hack.ts @@ -1,7 +1,7 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; -import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-turbopack-hack.js"; +import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-turbopack-hack.js"; let BrowserIO: ProtocolIo = { async read(path: string): Promise { @@ -45,6 +45,12 @@ class Database extends DatabasePromise { #guards: SyncEngineGuards; #worker: Worker | null; constructor(opts: DatabaseOpts) { + if (opts.url == null) { + super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any); + this.#engine = null; + return; + } + const engine = new SyncEngine({ path: opts.path, clientName: opts.clientName, @@ -52,23 +58,31 @@ class Database extends DatabasePromise { protocolVersion: SyncEngineProtocolVersion.V1, longPollTimeoutMs: opts.longPollTimeoutMs, tracing: opts.tracing, + bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, + remoteEncryption: opts.remoteEncryption?.cipher, }); super(engine.db() as unknown as any); - - let headers = typeof opts.authToken === "function" ? () => ({ - ...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }) : { - ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }; + let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); + if (typeof opts.authToken == "function") { + const authToken = opts.authToken; + headers = async () => ({ + ...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }); + } else { + const authToken = opts.authToken; + headers = { + ...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }; + } this.#runOpts = { url: opts.url, headers: headers, @@ -83,18 +97,23 @@ class Database extends DatabasePromise { * connect database and initialize it in case of clean start */ override async connect() { - if (this.connected) { return; } - if (!this.memory) { - this.#worker = await init(); - await Promise.all([ - registerFileAtWorker(this.#worker, this.name), - registerFileAtWorker(this.#worker, `${this.name}-wal`), - registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), - registerFileAtWorker(this.#worker, `${this.name}-info`), - registerFileAtWorker(this.#worker, `${this.name}-changes`), - ]); + if (this.connected) { + return; + } else if (this.#engine == null) { + await super.connect(); + } else { + if (!this.memory) { + this.#worker = await init(); + await Promise.all([ + registerFileAtWorker(this.#worker, this.name), + registerFileAtWorker(this.#worker, `${this.name}-wal`), + registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), + registerFileAtWorker(this.#worker, `${this.name}-info`), + registerFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); this.connected = true; } /** @@ -103,6 +122,9 @@ class Database extends DatabasePromise { * @returns true if new changes were pulled from the remote */ async pull() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); if (changes.empty()) { return false; @@ -115,35 +137,46 @@ class Database extends DatabasePromise { * if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote */ async push() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); } /** * checkpoint WAL for local database */ async checkpoint() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); } /** * @returns statistic of current local database */ async stats(): Promise { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); } /** * close the database and relevant files */ async close() { - if (this.name != null && this.#worker != null) { - await Promise.all([ - unregisterFileAtWorker(this.#worker, this.name), - unregisterFileAtWorker(this.#worker, `${this.name}-wal`), - unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), - unregisterFileAtWorker(this.#worker, `${this.name}-info`), - unregisterFileAtWorker(this.#worker, `${this.name}-changes`), - ]); + if (this.#engine != null) { + if (this.name != null && this.#worker != null) { + await Promise.all([ + unregisterFileAtWorker(this.#worker, this.name), + unregisterFileAtWorker(this.#worker, `${this.name}-wal`), + unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), + unregisterFileAtWorker(this.#worker, `${this.name}-info`), + unregisterFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + this.#engine.close(); } await super.close(); - this.#engine.close(); } } diff --git a/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts b/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts index 152859a7a..fbced4d2c 100644 --- a/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts +++ b/bindings/javascript/sync/packages/wasm/promise-vite-dev-hack.ts @@ -1,7 +1,7 @@ import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common" import { DatabasePromise } from "@tursodatabase/database-common" import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common"; -import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-vite-dev-hack.js"; +import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-vite-dev-hack.js"; let BrowserIO: ProtocolIo = { async read(path: string): Promise { @@ -45,6 +45,12 @@ class Database extends DatabasePromise { #guards: SyncEngineGuards; #worker: Worker | null; constructor(opts: DatabaseOpts) { + if (opts.url == null) { + super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any); + this.#engine = null; + return; + } + const engine = new SyncEngine({ path: opts.path, clientName: opts.clientName, @@ -52,23 +58,31 @@ class Database extends DatabasePromise { protocolVersion: SyncEngineProtocolVersion.V1, longPollTimeoutMs: opts.longPollTimeoutMs, tracing: opts.tracing, + bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null, + remoteEncryption: opts.remoteEncryption?.cipher, }); super(engine.db() as unknown as any); - - let headers = typeof opts.authToken === "function" ? () => ({ - ...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }) : { - ...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }), - ...(opts.encryption != null && { - "x-turso-encryption-key": opts.encryption.key, - "x-turso-encryption-cipher": opts.encryption.cipher, - }) - }; + let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>); + if (typeof opts.authToken == "function") { + const authToken = opts.authToken; + headers = async () => ({ + ...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }); + } else { + const authToken = opts.authToken; + headers = { + ...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }), + ...(opts.remoteEncryption != null && { + "x-turso-encryption-key": opts.remoteEncryption.key, + "x-turso-encryption-cipher": opts.remoteEncryption.cipher, + }) + }; + } this.#runOpts = { url: opts.url, headers: headers, @@ -83,18 +97,23 @@ class Database extends DatabasePromise { * connect database and initialize it in case of clean start */ override async connect() { - if (this.connected) { return; } - if (!this.memory) { - this.#worker = await init(); - await Promise.all([ - registerFileAtWorker(this.#worker, this.name), - registerFileAtWorker(this.#worker, `${this.name}-wal`), - registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), - registerFileAtWorker(this.#worker, `${this.name}-info`), - registerFileAtWorker(this.#worker, `${this.name}-changes`), - ]); + if (this.connected) { + return; + } else if (this.#engine == null) { + await super.connect(); + } else { + if (!this.memory) { + this.#worker = await init(); + await Promise.all([ + registerFileAtWorker(this.#worker, this.name), + registerFileAtWorker(this.#worker, `${this.name}-wal`), + registerFileAtWorker(this.#worker, `${this.name}-wal-revert`), + registerFileAtWorker(this.#worker, `${this.name}-info`), + registerFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); } - await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect()); this.connected = true; } /** @@ -103,6 +122,9 @@ class Database extends DatabasePromise { * @returns true if new changes were pulled from the remote */ async pull() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait())); if (changes.empty()) { return false; @@ -115,35 +137,46 @@ class Database extends DatabasePromise { * if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote */ async push() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push())); } /** * checkpoint WAL for local database */ async checkpoint() { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint())); } /** * @returns statistic of current local database */ async stats(): Promise { + if (this.#engine == null) { + throw new Error("sync is disabled as database was opened without sync support") + } return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats())); } /** * close the database and relevant files */ async close() { - if (this.name != null && this.#worker != null) { - await Promise.all([ - unregisterFileAtWorker(this.#worker, this.name), - unregisterFileAtWorker(this.#worker, `${this.name}-wal`), - unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), - unregisterFileAtWorker(this.#worker, `${this.name}-info`), - unregisterFileAtWorker(this.#worker, `${this.name}-changes`), - ]); + if (this.#engine != null) { + if (this.name != null && this.#worker != null) { + await Promise.all([ + unregisterFileAtWorker(this.#worker, this.name), + unregisterFileAtWorker(this.#worker, `${this.name}-wal`), + unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`), + unregisterFileAtWorker(this.#worker, `${this.name}-info`), + unregisterFileAtWorker(this.#worker, `${this.name}-changes`), + ]); + } + this.#engine.close(); } await super.close(); - this.#engine.close(); } } diff --git a/bindings/javascript/sync/packages/wasm/promise.test.ts b/bindings/javascript/sync/packages/wasm/promise.test.ts index c4f7aad4b..e6be4e53a 100644 --- a/bindings/javascript/sync/packages/wasm/promise.test.ts +++ b/bindings/javascript/sync/packages/wasm/promise.test.ts @@ -2,6 +2,7 @@ import { expect, test } from 'vitest' import { Database, connect, DatabaseRowMutation, DatabaseRowTransformResult } from './promise-default.js' const localeCompare = (a, b) => a.x.localeCompare(b.x); +const intCompare = (a, b) => a.x - b.x; test('implicit connect', async () => { const db = new Database({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL }); @@ -11,6 +12,91 @@ test('implicit connect', async () => { expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }]); }) +test('simple-db', async () => { + const db = new Database({ path: ':memory:' }); + expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }]) + await db.exec("CREATE TABLE t(x)"); + await db.exec("INSERT INTO t VALUES (1), (2), (3)"); + expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]) + await expect(async () => await db.pull()).rejects.toThrowError(/sync is disabled as database was opened without sync support/); +}) + +test('implicit connect', async () => { + const db = new Database({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL }); + const defer = db.prepare("SELECT * FROM not_found"); + await expect(async () => await defer.all()).rejects.toThrowError(/no such table: not_found/); + expect(() => db.prepare("SELECT * FROM not_found")).toThrowError(/no such table: not_found/); + expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }]); +}) + +test('defered sync', async () => { + { + const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL }); + await db.exec("CREATE TABLE IF NOT EXISTS t(x)"); + await db.exec("DELETE FROM t"); + await db.exec("INSERT INTO t VALUES (100)"); + await db.push(); + await db.close(); + } + + let url = null; + const db = new Database({ path: ':memory:', url: () => url }); + await db.prepare("CREATE TABLE t(x)").run(); + await db.prepare("INSERT INTO t VALUES (1), (2), (3), (42)").run(); + expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }, { x: 42 }]); + await expect(async () => await db.pull()).rejects.toThrow(/url is empty - sync is paused/); + url = process.env.VITE_TURSO_DB_URL; + await db.pull(); + expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 100 }, { x: 1 }, { x: 2 }, { x: 3 }, { x: 42 }]); +}) + +test('encryption sync', async () => { + const KEY = 'l/FWopMfZisTLgBX4A42AergrCrYKjiO3BfkJUwv83I='; + const URL = 'http://encrypted--a--a.localhost:10000'; + { + const db = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + await db.exec("CREATE TABLE IF NOT EXISTS t(x)"); + await db.exec("DELETE FROM t"); + await db.push(); + await db.close(); + } + const db1 = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + const db2 = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + await db1.exec("INSERT INTO t VALUES (1), (2), (3)"); + await db2.exec("INSERT INTO t VALUES (4), (5), (6)"); + expect(await db1.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]); + expect(await db2.prepare("SELECT * FROM t").all()).toEqual([{ x: 4 }, { x: 5 }, { x: 6 }]); + await Promise.all([db1.push(), db2.push()]); + await Promise.all([db1.pull(), db2.pull()]); + const expected = [{ x: 1 }, { x: 2 }, { x: 3 }, { x: 4 }, { x: 5 }, { x: 6 }]; + expect((await db1.prepare("SELECT * FROM t").all()).sort(intCompare)).toEqual(expected.sort(intCompare)); + expect((await db2.prepare("SELECT * FROM t").all()).sort(intCompare)).toEqual(expected.sort(intCompare)); +}); + +test('defered encryption sync', async () => { + const URL = 'http://encrypted--a--a.localhost:10000'; + const KEY = 'l/FWopMfZisTLgBX4A42AergrCrYKjiO3BfkJUwv83I='; + let url = null; + { + const db = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + await db.exec("CREATE TABLE IF NOT EXISTS t(x)"); + await db.exec("DELETE FROM t"); + await db.exec("INSERT INTO t VALUES (100)"); + await db.push(); + await db.close(); + } + const db = await connect({ path: ':memory:', url: () => url, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } }); + await db.exec("CREATE TABLE IF NOT EXISTS t(x)"); + await db.exec("INSERT INTO t VALUES (1), (2), (3)"); + expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]); + + url = URL; + await db.pull(); + + const expected = [{ x: 100 }, { x: 1 }, { x: 2 }, { x: 3 }]; + expect((await db.prepare("SELECT * FROM t").all())).toEqual(expected); +}); + test('select-after-push', async () => { { const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL }); diff --git a/bindings/javascript/sync/src/generator.rs b/bindings/javascript/sync/src/generator.rs index b5208cd15..446543049 100644 --- a/bindings/javascript/sync/src/generator.rs +++ b/bindings/javascript/sync/src/generator.rs @@ -45,7 +45,7 @@ pub enum GeneratorResponse { operations: i64, main_wal: i64, revert_wal: i64, - last_pull_unix_time: i64, + last_pull_unix_time: Option, last_push_unix_time: Option, revision: Option, }, diff --git a/bindings/javascript/sync/src/lib.rs b/bindings/javascript/sync/src/lib.rs index 5e97c9d9b..f2e5b0e23 100644 --- a/bindings/javascript/sync/src/lib.rs +++ b/bindings/javascript/sync/src/lib.rs @@ -24,13 +24,7 @@ use crate::{ #[napi] pub struct SyncEngine { - path: String, - client_name: String, - wal_pull_batch_size: u32, - long_poll_timeout: Option, - protocol_version: DatabaseSyncEngineProtocolVersion, - tables_ignore: Vec, - use_transform: bool, + opts: SyncEngineOptsFilled, io: Option>, protocol: Option>, sync_engine: Arc>>>, @@ -123,6 +117,49 @@ pub struct SyncEngineOpts { pub tables_ignore: Option>, pub use_transform: bool, pub protocol_version: Option, + pub bootstrap_if_empty: bool, + pub remote_encryption: Option, +} + +struct SyncEngineOptsFilled { + pub path: String, + pub client_name: String, + pub wal_pull_batch_size: u32, + pub long_poll_timeout: Option, + pub tables_ignore: Vec, + pub use_transform: bool, + pub protocol_version: DatabaseSyncEngineProtocolVersion, + pub bootstrap_if_empty: bool, + pub remote_encryption: Option, +} + +#[derive(Debug, Clone, Copy)] +enum CipherMode { + Aes256Gcm = 1, + Aes128Gcm = 2, + ChaCha20Poly1305 = 3, + Aegis256 = 4, +} + +impl CipherMode { + /// Returns the nonce size for this cipher mode. + pub fn required_nonce_size(&self) -> usize { + match self { + CipherMode::Aes256Gcm | CipherMode::Aes128Gcm | CipherMode::ChaCha20Poly1305 => 12, + CipherMode::Aegis256 => 32, + } + } + + /// Returns the tag size for this cipher mode. + pub fn required_tag_size(&self) -> usize { + // All supported ciphers use 16-byte tags + 16 + } + + /// Returns the total metadata size (nonce + tag) for this cipher mode. + fn required_metadata_size(&self) -> usize { + self.required_nonce_size() + self.required_tag_size() + } } #[napi] @@ -158,7 +195,7 @@ impl SyncEngine { tracing: opts.tracing.clone(), }), )?)); - Ok(SyncEngine { + let opts_filled = SyncEngineOptsFilled { path: opts.path, client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()), wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100), @@ -167,37 +204,60 @@ impl SyncEngine { .map(|x| std::time::Duration::from_millis(x as u64)), tables_ignore: opts.tables_ignore.unwrap_or_default(), use_transform: opts.use_transform, - #[allow(clippy::arc_with_non_send_sync)] - sync_engine: Arc::new(RwLock::new(None)), - io: Some(io), - protocol: Some(Arc::new(JsProtocolIo::default())), - #[allow(clippy::arc_with_non_send_sync)] - db, protocol_version: match opts.protocol_version { Some(SyncEngineProtocolVersion::Legacy) | None => { DatabaseSyncEngineProtocolVersion::Legacy } _ => DatabaseSyncEngineProtocolVersion::V1, }, + bootstrap_if_empty: opts.bootstrap_if_empty, + remote_encryption: match opts.remote_encryption.as_deref() { + Some("aes256gcm") => Some(CipherMode::Aes256Gcm), + Some("aes128gcm") => Some(CipherMode::Aes128Gcm), + Some("chacha20poly1305") => Some(CipherMode::ChaCha20Poly1305), + Some("aegis256") => Some(CipherMode::Aegis256), + None => None, + _ => { + return Err(napi::Error::new( + napi::Status::GenericFailure, + "unsupported remote cipher", + )) + } + }, + }; + Ok(SyncEngine { + opts: opts_filled, + #[allow(clippy::arc_with_non_send_sync)] + sync_engine: Arc::new(RwLock::new(None)), + io: Some(io), + protocol: Some(Arc::new(JsProtocolIo::default())), + #[allow(clippy::arc_with_non_send_sync)] + db, }) } #[napi] pub fn connect(&mut self) -> napi::Result { let opts = DatabaseSyncEngineOpts { - client_name: self.client_name.clone(), - wal_pull_batch_size: self.wal_pull_batch_size as u64, - long_poll_timeout: self.long_poll_timeout, - tables_ignore: self.tables_ignore.clone(), - use_transform: self.use_transform, - protocol_version_hint: self.protocol_version, + client_name: self.opts.client_name.clone(), + wal_pull_batch_size: self.opts.wal_pull_batch_size as u64, + long_poll_timeout: self.opts.long_poll_timeout, + tables_ignore: self.opts.tables_ignore.clone(), + use_transform: self.opts.use_transform, + protocol_version_hint: self.opts.protocol_version, + bootstrap_if_empty: self.opts.bootstrap_if_empty, + reserved_bytes: self + .opts + .remote_encryption + .map(|x| x.required_metadata_size()) + .unwrap_or(0), }; let io = self.io()?; let protocol = self.protocol()?; let sync_engine = self.sync_engine.clone(); let db = self.db.clone(); - let path = self.path.clone(); + let path = self.opts.path.clone(); let generator = genawaiter::sync::Gen::new(|coro| async move { let coro = Coro::new((), coro); let initialized = diff --git a/core/lib.rs b/core/lib.rs index da47d161b..f7d8c1cc5 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -2266,6 +2266,12 @@ impl Connection { self.set_encryption_context() } + pub fn set_reserved_bytes(&self, reserved_bytes: u8) -> Result<()> { + let pager = self.pager.read(); + pager.set_reserved_space_bytes(reserved_bytes); + Ok(()) + } + pub fn get_encryption_cipher_mode(&self) -> Option { *self.encryption_cipher_mode.read() } diff --git a/sync/engine/Cargo.toml b/sync/engine/Cargo.toml index 89b20b406..e99564c17 100644 --- a/sync/engine/Cargo.toml +++ b/sync/engine/Cargo.toml @@ -8,6 +8,7 @@ repository.workspace = true [dependencies] turso_core = { workspace = true, features = ["conn_raw_api"] } +turso_parser = { workspace = true } thiserror = "2.0.12" tracing = "0.1.41" serde_json.workspace = true diff --git a/sync/engine/src/database_replay_generator.rs b/sync/engine/src/database_replay_generator.rs index 02532d825..616136305 100644 --- a/sync/engine/src/database_replay_generator.rs +++ b/sync/engine/src/database_replay_generator.rs @@ -1,5 +1,7 @@ use std::{collections::HashMap, sync::Arc}; +use turso_parser::parser::Parser; + use crate::{ database_tape::{run_stmt_once, DatabaseReplaySessionOpts}, errors::Error, @@ -211,9 +213,43 @@ impl DatabaseReplayGenerator { after.last() ))); }; + let mut parser = Parser::new(sql.as_str().as_bytes()); + let mut ast = parser + .next() + .ok_or_else(|| { + Error::DatabaseTapeError(format!( + "unexpected DDL query: {}", + sql.as_str() + )) + })? + .map_err(|e| { + Error::DatabaseTapeError(format!( + "unexpected DDL query {}: {}", + e, + sql.as_str() + )) + })?; + let turso_parser::ast::Cmd::Stmt(stmt) = &mut ast else { + return Err(Error::DatabaseTapeError(format!( + "unexpected DDL query: {}", + sql.as_str() + ))); + }; + match stmt { + turso_parser::ast::Stmt::CreateTable { if_not_exists, .. } + | turso_parser::ast::Stmt::CreateIndex { if_not_exists, .. } + | turso_parser::ast::Stmt::CreateTrigger { if_not_exists, .. } + | turso_parser::ast::Stmt::CreateMaterializedView { + if_not_exists, .. + } + | turso_parser::ast::Stmt::CreateView { if_not_exists, .. } => { + *if_not_exists = true + } + _ => {} + } let insert = ReplayInfo { change_type: DatabaseChangeType::Insert, - query: sql.as_str().to_string(), + query: ast.to_string(), pk_column_indices: None, column_names: Vec::new(), is_ddl_replay: true, diff --git a/sync/engine/src/database_sync_engine.rs b/sync/engine/src/database_sync_engine.rs index 0257ca629..0b37b52c1 100644 --- a/sync/engine/src/database_sync_engine.rs +++ b/sync/engine/src/database_sync_engine.rs @@ -38,6 +38,8 @@ pub struct DatabaseSyncEngineOpts { pub wal_pull_batch_size: u64, pub long_poll_timeout: Option, pub protocol_version_hint: DatabaseSyncEngineProtocolVersion, + pub bootstrap_if_empty: bool, + pub reserved_bytes: usize, } pub struct DatabaseSyncEngine { @@ -73,9 +75,6 @@ impl DatabaseSyncEngine

{ let meta_path = format!("{main_db_path}-info"); let changes_path = format!("{main_db_path}-changes"); - let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?; - let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)); - tracing::info!("init(path={}): opts={:?}", main_db_path, opts); let completion = protocol.full_read(&meta_path)?; @@ -88,7 +87,7 @@ impl DatabaseSyncEngine

{ let meta = match meta { Some(meta) => meta, - None => { + None if opts.bootstrap_if_empty => { let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4()); let revision = bootstrap_db_file( coro, @@ -106,7 +105,31 @@ impl DatabaseSyncEngine

{ revert_since_wal_watermark: 0, last_pushed_change_id_hint: 0, last_pushed_pull_gen_hint: 0, - last_pull_unix_time: io.now().secs, + last_pull_unix_time: Some(io.now().secs), + last_push_unix_time: None, + }; + tracing::info!("write meta after successful bootstrap: meta={meta:?}"); + let completion = protocol.full_write(&meta_path, meta.dump()?)?; + // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated + wait_all_results(coro, &completion).await?; + meta + } + None => { + if opts.protocol_version_hint == DatabaseSyncEngineProtocolVersion::Legacy { + return Err(Error::DatabaseSyncEngineError( + "deferred bootstrap is not supported for legacy protocol".to_string(), + )); + } + let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4()); + let meta = DatabaseMetadata { + version: DATABASE_METADATA_VERSION.to_string(), + client_unique_id, + synced_revision: None, + revert_since_wal_salt: None, + revert_since_wal_watermark: 0, + last_pushed_change_id_hint: 0, + last_pushed_pull_gen_hint: 0, + last_pull_unix_time: None, last_push_unix_time: None, }; tracing::info!("write meta after successful bootstrap: meta={meta:?}"); @@ -127,11 +150,14 @@ impl DatabaseSyncEngine

{ tracing::info!("check if main db file exists"); let main_exists = io.try_open(main_db_path)?.is_some(); - if !main_exists { + if !main_exists && meta.synced_revision.is_some() { let error = "main DB file doesn't exists, but metadata is".to_string(); return Err(Error::DatabaseSyncEngineError(error)); } + let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?; + let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file)); + let main_db = turso_core::Database::open_with_flags( io.clone(), main_db_path, @@ -140,6 +166,17 @@ impl DatabaseSyncEngine

{ turso_core::DatabaseOpts::new().with_indexes(true), None, )?; + + // DB wasn't bootstrapped but remote is encrypted - so we must properly set reserved bytes field in advance + if meta.synced_revision.is_none() && opts.reserved_bytes != 0 { + let conn = main_db.connect()?; + conn.set_reserved_bytes(opts.reserved_bytes as u8)?; + + // write transaction forces allocation of root DB page + conn.execute("BEGIN IMMEDIATE")?; + conn.execute("COMMIT")?; + } + let tape_opts = DatabaseTapeOpts { cdc_table: None, cdc_mode: Some("full".to_string()), @@ -162,11 +199,11 @@ impl DatabaseSyncEngine

{ client_unique_id: meta.client_unique_id.clone(), }; - let synced_revision = meta.synced_revision.as_ref().unwrap(); - if let DatabasePullRevision::Legacy { + let synced_revision = meta.synced_revision.as_ref(); + if let Some(DatabasePullRevision::Legacy { synced_frame_no: None, .. - } = synced_revision + }) = synced_revision { // sync WAL from the remote in case of bootstrap - all subsequent initializations will be fast db.pull_changes_from_remote(coro).await?; @@ -380,7 +417,7 @@ impl DatabaseSyncEngine

{ let file = acquire_slot(&self.changes_file)?; let now = self.io.now(); - let revision = self.meta().synced_revision.clone().unwrap(); + let revision = self.meta().synced_revision.clone(); let next_revision = wal_pull_to_file( coro, self.protocol.as_ref(), @@ -427,7 +464,7 @@ impl DatabaseSyncEngine

{ ) -> Result<()> { if remote_changes.file_slot.is_none() { self.update_meta(coro, |m| { - m.last_pull_unix_time = remote_changes.time.secs; + m.last_pull_unix_time = Some(remote_changes.time.secs); }) .await?; return Ok(()); @@ -450,7 +487,7 @@ impl DatabaseSyncEngine

{ m.revert_since_wal_watermark = revert_since_wal_watermark; m.synced_revision = Some(remote_changes.revision); m.last_pushed_change_id_hint = 0; - m.last_pull_unix_time = remote_changes.time.secs; + m.last_pull_unix_time = Some(remote_changes.time.secs); }) .await?; Ok(()) diff --git a/sync/engine/src/database_sync_operations.rs b/sync/engine/src/database_sync_operations.rs index 00d58360a..60f37f1ff 100644 --- a/sync/engine/src/database_sync_operations.rs +++ b/sync/engine/src/database_sync_operations.rs @@ -153,7 +153,7 @@ pub async fn wal_apply_from_file( coro.yield_(ProtocolCommand::IO).await?; } let info = WalFrameInfo::from_frame_header(buffer.as_slice()); - tracing::debug!("got frame: {:?}", info); + tracing::info!("got frame: {:?}", info); db_size = info.db_size; session.append_page(info.page_no, &buffer.as_slice()[WAL_FRAME_HEADER..])?; } @@ -165,7 +165,7 @@ pub async fn wal_pull_to_file( coro: &Coro, client: &C, frames_file: &Arc, - revision: &DatabasePullRevision, + revision: &Option, wal_pull_batch_size: u64, long_poll_timeout: Option, ) -> Result { @@ -181,10 +181,10 @@ pub async fn wal_pull_to_file( coro.yield_(ProtocolCommand::IO).await?; } match revision { - DatabasePullRevision::Legacy { + Some(DatabasePullRevision::Legacy { generation, synced_frame_no, - } => { + }) => { let start_frame = synced_frame_no.unwrap_or(0) + 1; wal_pull_to_file_legacy( coro, @@ -196,9 +196,10 @@ pub async fn wal_pull_to_file( ) .await } - DatabasePullRevision::V1 { revision } => { + Some(DatabasePullRevision::V1 { revision }) => { wal_pull_to_file_v1(coro, client, frames_file, revision, long_poll_timeout).await } + None => wal_pull_to_file_v1(coro, client, frames_file, "", long_poll_timeout).await, } } diff --git a/sync/engine/src/database_tape.rs b/sync/engine/src/database_tape.rs index bce8acc79..e48939a9c 100644 --- a/sync/engine/src/database_tape.rs +++ b/sync/engine/src/database_tape.rs @@ -1200,7 +1200,7 @@ mod tests { turso_core::Value::Text(turso_core::types::Text::new("t")), turso_core::Value::Integer(6), turso_core::Value::Text(turso_core::types::Text::new( - "CREATE INDEX t_idx ON t (y)" + "CREATE INDEX IF NOT EXISTS t_idx ON t (y)" )), ] ] @@ -1387,4 +1387,91 @@ mod tests { } } } + + #[test] + pub fn test_database_tape_replay_ddl_changes_idempotent() { + let temp_file1 = NamedTempFile::new().unwrap(); + let db_path1 = temp_file1.path().to_str().unwrap(); + let temp_file2 = NamedTempFile::new().unwrap(); + let db_path2 = temp_file2.path().to_str().unwrap(); + let temp_file3 = NamedTempFile::new().unwrap(); + let db_path3 = temp_file3.path().to_str().unwrap(); + + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + + let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap(); + let db1 = Arc::new(DatabaseTape::new(db1)); + + let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap(); + let db2 = Arc::new(DatabaseTape::new(db2)); + + let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap(); + let db3 = Arc::new(DatabaseTape::new(db3)); + + let mut gen = genawaiter::sync::Gen::new({ + |coro| async move { + let coro: Coro<()> = coro.into(); + let conn1 = db1.connect(&coro).await.unwrap(); + conn1 + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)") + .unwrap(); + conn1.execute("CREATE INDEX t_idx ON t(y, z)").unwrap(); + + let conn2 = db2.connect(&coro).await.unwrap(); + conn2 + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)") + .unwrap(); + conn2.execute("CREATE INDEX t_idx ON t(y, z)").unwrap(); + + let conn3 = db3.connect(&coro).await.unwrap(); + { + let opts = DatabaseReplaySessionOpts { + use_implicit_rowid: false, + }; + let mut session = db3.start_replay_session(&coro, opts).await.unwrap(); + + let opts = DatabaseChangesIteratorOpts { + ignore_schema_changes: false, + ..Default::default() + }; + let mut iterator = db1.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + tracing::info!("1. operation: {:?}", operation); + session.replay(&coro, operation).await.unwrap(); + } + + let mut iterator = db2.iterate_changes(opts.clone()).unwrap(); + while let Some(operation) = iterator.next(&coro).await.unwrap() { + tracing::info!("2. operation: {:?}", operation); + session.replay(&coro, operation).await.unwrap(); + } + } + let mut rows = Vec::new(); + let mut stmt = conn3.prepare("SELECT name FROM sqlite_master").unwrap(); + while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() { + rows.push(row.get_value(0).to_text().unwrap().to_string()); + } + assert_eq!( + rows, + vec![ + "sqlite_sequence".to_string(), + "turso_cdc".to_string(), + "t".to_string(), + "sqlite_autoindex_t_1".to_string(), + "t_idx".to_string() + ] + ); + crate::Result::Ok(()) + } + }); + loop { + match gen.resume_with(Ok(())) { + genawaiter::GeneratorState::Yielded(..) => io.step().unwrap(), + genawaiter::GeneratorState::Complete(result) => { + result.unwrap(); + break; + } + } + } + } } diff --git a/sync/engine/src/types.rs b/sync/engine/src/types.rs index 1b78e8cb1..4c08ee394 100644 --- a/sync/engine/src/types.rs +++ b/sync/engine/src/types.rs @@ -65,7 +65,7 @@ pub struct SyncEngineStats { pub cdc_operations: i64, pub main_wal_size: u64, pub revert_wal_size: u64, - pub last_pull_unix_time: i64, + pub last_pull_unix_time: Option, pub last_push_unix_time: Option, pub revision: Option, } @@ -90,7 +90,7 @@ pub struct DatabaseMetadata { pub revert_since_wal_salt: Option>, pub revert_since_wal_watermark: u64, /// Unix time of last successful pull - pub last_pull_unix_time: i64, + pub last_pull_unix_time: Option, /// Unix time of last successful push pub last_push_unix_time: Option, pub last_pushed_pull_gen_hint: i64,