mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-11 19:24:21 +01:00
Merge 'Sync engine defered sync' from Nikita Sivukhin
This PR makes sync client completely autonomous as now it can defer initial sync. This can open possibility to asynchronously create DB in the Turso Cloud while giving user ability to interact with local DB straight away. Closes #3531
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4589,6 +4589,7 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
"turso",
|
||||
"turso_core",
|
||||
"turso_parser",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
|
||||
@@ -22,15 +22,25 @@ async function process(opts: RunOpts, io: ProtocolIo, request: any) {
|
||||
const requestType = request.request();
|
||||
const completion = request.completion();
|
||||
if (requestType.type == 'Http') {
|
||||
let url: string | null = null;
|
||||
if (typeof opts.url == "function") {
|
||||
url = opts.url();
|
||||
} else {
|
||||
url = opts.url;
|
||||
}
|
||||
if (url == null) {
|
||||
completion.poison(`url is empty - sync is paused`);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
let headers = typeof opts.headers === "function" ? opts.headers() : opts.headers;
|
||||
let headers = typeof opts.headers === "function" ? await opts.headers() : opts.headers;
|
||||
if (requestType.headers != null && requestType.headers.length > 0) {
|
||||
headers = { ...opts.headers };
|
||||
for (let header of requestType.headers) {
|
||||
headers[header[0]] = header[1];
|
||||
}
|
||||
}
|
||||
const response = await fetch(`${opts.url}${requestType.path}`, {
|
||||
const response = await fetch(`${url}${requestType.path}`, {
|
||||
method: requestType.method,
|
||||
headers: headers,
|
||||
body: requestType.body != null ? new Uint8Array(requestType.body) : null,
|
||||
|
||||
@@ -61,7 +61,7 @@ export interface EncryptionOpts {
|
||||
// base64 encoded encryption key (must be either 16 or 32 bytes depending on the cipher)
|
||||
key: string,
|
||||
// encryption cipher algorithm
|
||||
cipher: 'aes256gcm' | 'aes128gcm' | 'chacha20poly1305'
|
||||
cipher: 'aes256gcm' | 'aes128gcm' | 'chacha20poly1305' | 'aegis256'
|
||||
}
|
||||
export interface DatabaseOpts {
|
||||
/**
|
||||
@@ -73,22 +73,26 @@ export interface DatabaseOpts {
|
||||
/**
|
||||
* optional url of the remote database (e.g. libsql://db-org.turso.io)
|
||||
* (if omitted - local-only database will be created)
|
||||
*
|
||||
* you can also promide function which will return URL or null
|
||||
* in this case local database will be created and sync will be "switched-on" whenever the url will return non-empty value
|
||||
* note, that all other parameters (like encryption) must be set in advance in order for the "deferred" sync to work properly
|
||||
*/
|
||||
url?: string;
|
||||
url?: string | (() => string | null);
|
||||
/**
|
||||
* auth token for the remote database
|
||||
* (can be either static string or function which will provide short-lived credentials for every new request)
|
||||
*/
|
||||
authToken?: string | (() => string);
|
||||
authToken?: string | (() => Promise<string>);
|
||||
/**
|
||||
* arbitrary client name which can be used to distinguish clients internally
|
||||
* the library will gurantee uniquiness of the clientId by appending unique suffix to the clientName
|
||||
*/
|
||||
clientName?: string;
|
||||
/**
|
||||
* optional encryption parameters if cloud database were encrypted by default
|
||||
* optional remote encryption parameters if cloud database were encrypted by default
|
||||
*/
|
||||
encryption?: EncryptionOpts;
|
||||
remoteEncryption?: EncryptionOpts;
|
||||
/**
|
||||
* optional callback which will be called for every mutation before sending it to the remote
|
||||
* this callback can transform the update in order to support complex conflict resolution strategy
|
||||
@@ -136,8 +140,8 @@ export interface DatabaseStats {
|
||||
|
||||
export interface RunOpts {
|
||||
preemptionMs: number,
|
||||
url: string,
|
||||
headers: { [K: string]: string } | (() => { [K: string]: string })
|
||||
url: string | (() => string | null),
|
||||
headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>)
|
||||
transform?: Transform,
|
||||
}
|
||||
|
||||
|
||||
@@ -220,7 +220,7 @@ export type DatabaseRowTransformResultJs =
|
||||
export type GeneratorResponse =
|
||||
| { type: 'IO' }
|
||||
| { type: 'Done' }
|
||||
| { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime: number, lastPushUnixTime?: number, revision?: string }
|
||||
| { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime?: number, lastPushUnixTime?: number, revision?: string }
|
||||
| { type: 'SyncEngineChanges', changes: SyncEngineChanges }
|
||||
|
||||
export type JsProtocolRequest =
|
||||
@@ -238,6 +238,8 @@ export interface SyncEngineOpts {
|
||||
tablesIgnore?: Array<string>
|
||||
useTransform: boolean
|
||||
protocolVersion?: SyncEngineProtocolVersion
|
||||
bootstrapIfEmpty: boolean
|
||||
remoteEncryption?: string
|
||||
}
|
||||
|
||||
export declare const enum SyncEngineProtocolVersion {
|
||||
|
||||
@@ -81,8 +81,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-android-arm64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-android-arm64/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -97,8 +97,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-android-arm-eabi')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-android-arm-eabi/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -117,8 +117,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-win32-x64-msvc')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-win32-x64-msvc/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -133,8 +133,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-win32-ia32-msvc')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-win32-ia32-msvc/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -149,8 +149,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-win32-arm64-msvc')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-win32-arm64-msvc/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -168,8 +168,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-darwin-universal')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-darwin-universal/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -184,8 +184,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-darwin-x64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-darwin-x64/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -200,8 +200,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-darwin-arm64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-darwin-arm64/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -220,8 +220,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-freebsd-x64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-freebsd-x64/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -236,8 +236,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-freebsd-arm64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-freebsd-arm64/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -257,8 +257,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-x64-musl')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-musl/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -273,8 +273,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-x64-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-x64-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -291,8 +291,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-arm64-musl')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-musl/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -307,8 +307,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-arm64-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-arm64-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -325,8 +325,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-arm-musleabihf')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-musleabihf/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -341,8 +341,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-arm-gnueabihf')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-arm-gnueabihf/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -359,8 +359,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-riscv64-musl')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-musl/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -375,8 +375,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-riscv64-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-riscv64-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -392,8 +392,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-ppc64-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-ppc64-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -408,8 +408,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-linux-s390x-gnu')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-linux-s390x-gnu/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -428,8 +428,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-openharmony-arm64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm64/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -444,8 +444,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-openharmony-x64')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-openharmony-x64/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
@@ -460,8 +460,8 @@ function requireNative() {
|
||||
try {
|
||||
const binding = require('@tursodatabase/sync-openharmony-arm')
|
||||
const bindingPackageVersion = require('@tursodatabase/sync-openharmony-arm/package.json').version
|
||||
if (bindingPackageVersion !== '0.2.0-pre.11' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.11 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
if (bindingPackageVersion !== '0.2.0-pre.13' && process.env.NAPI_RS_ENFORCE_VERSION_CHECK && process.env.NAPI_RS_ENFORCE_VERSION_CHECK !== '0') {
|
||||
throw new Error(`Native binding package version mismatch, expected 0.2.0-pre.13 but got ${bindingPackageVersion}. You can reinstall dependencies to fix this issue.`)
|
||||
}
|
||||
return binding
|
||||
} catch (e) {
|
||||
|
||||
@@ -3,6 +3,7 @@ import { expect, test } from 'vitest'
|
||||
import { connect, Database, DatabaseRowMutation, DatabaseRowTransformResult } from './promise.js'
|
||||
|
||||
const localeCompare = (a, b) => a.x.localeCompare(b.x);
|
||||
const intCompare = (a, b) => a.x - b.x;
|
||||
|
||||
function cleanup(path) {
|
||||
unlinkSync(path);
|
||||
@@ -12,6 +13,15 @@ function cleanup(path) {
|
||||
try { unlinkSync(`${path}-wal-revert`) } catch (e) { }
|
||||
}
|
||||
|
||||
test('simple-db', async () => {
|
||||
const db = new Database({ path: ':memory:' });
|
||||
expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }])
|
||||
await db.exec("CREATE TABLE t(x)");
|
||||
await db.exec("INSERT INTO t VALUES (1), (2), (3)");
|
||||
expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }])
|
||||
await expect(async () => await db.pull()).rejects.toThrowError(/sync is disabled as database was opened without sync support/);
|
||||
})
|
||||
|
||||
test('implicit connect', async () => {
|
||||
const db = new Database({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
|
||||
const defer = db.prepare("SELECT * FROM not_found");
|
||||
@@ -20,6 +30,74 @@ test('implicit connect', async () => {
|
||||
expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }]);
|
||||
})
|
||||
|
||||
test('defered sync', async () => {
|
||||
{
|
||||
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
|
||||
await db.exec("CREATE TABLE IF NOT EXISTS t(x)");
|
||||
await db.exec("DELETE FROM t");
|
||||
await db.exec("INSERT INTO t VALUES (100)");
|
||||
await db.push();
|
||||
await db.close();
|
||||
}
|
||||
|
||||
let url = null;
|
||||
const db = new Database({ path: ':memory:', url: () => url });
|
||||
await db.prepare("CREATE TABLE t(x)").run();
|
||||
await db.prepare("INSERT INTO t VALUES (1), (2), (3), (42)").run();
|
||||
expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }, { x: 42 }]);
|
||||
await expect(async () => await db.pull()).rejects.toThrow(/url is empty - sync is paused/);
|
||||
url = process.env.VITE_TURSO_DB_URL;
|
||||
await db.pull();
|
||||
expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 100 }, { x: 1 }, { x: 2 }, { x: 3 }, { x: 42 }]);
|
||||
})
|
||||
|
||||
test('encryption sync', async () => {
|
||||
const KEY = 'l/FWopMfZisTLgBX4A42AergrCrYKjiO3BfkJUwv83I=';
|
||||
const URL = 'http://encrypted--a--a.localhost:10000';
|
||||
{
|
||||
const db = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
await db.exec("CREATE TABLE IF NOT EXISTS t(x)");
|
||||
await db.exec("DELETE FROM t");
|
||||
await db.push();
|
||||
await db.close();
|
||||
}
|
||||
const db1 = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
const db2 = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
await db1.exec("INSERT INTO t VALUES (1), (2), (3)");
|
||||
await db2.exec("INSERT INTO t VALUES (4), (5), (6)");
|
||||
expect(await db1.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]);
|
||||
expect(await db2.prepare("SELECT * FROM t").all()).toEqual([{ x: 4 }, { x: 5 }, { x: 6 }]);
|
||||
await Promise.all([db1.push(), db2.push()]);
|
||||
await Promise.all([db1.pull(), db2.pull()]);
|
||||
const expected = [{ x: 1 }, { x: 2 }, { x: 3 }, { x: 4 }, { x: 5 }, { x: 6 }];
|
||||
expect((await db1.prepare("SELECT * FROM t").all()).sort(intCompare)).toEqual(expected.sort(intCompare));
|
||||
expect((await db2.prepare("SELECT * FROM t").all()).sort(intCompare)).toEqual(expected.sort(intCompare));
|
||||
});
|
||||
|
||||
test('defered encryption sync', async () => {
|
||||
const URL = 'http://encrypted--a--a.localhost:10000';
|
||||
const KEY = 'l/FWopMfZisTLgBX4A42AergrCrYKjiO3BfkJUwv83I=';
|
||||
let url = null;
|
||||
{
|
||||
const db = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
await db.exec("CREATE TABLE IF NOT EXISTS t(x)");
|
||||
await db.exec("DELETE FROM t");
|
||||
await db.exec("INSERT INTO t VALUES (100)");
|
||||
await db.push();
|
||||
await db.close();
|
||||
}
|
||||
const db = await connect({ path: ':memory:', url: () => url, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
await db.exec("CREATE TABLE IF NOT EXISTS t(x)");
|
||||
await db.exec("INSERT INTO t VALUES (1), (2), (3)");
|
||||
expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]);
|
||||
|
||||
url = URL;
|
||||
await db.pull();
|
||||
|
||||
const expected = [{ x: 100 }, { x: 1 }, { x: 2 }, { x: 3 }];
|
||||
expect((await db.prepare("SELECT * FROM t").all())).toEqual(expected);
|
||||
});
|
||||
|
||||
test('select-after-push', async () => {
|
||||
{
|
||||
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { DatabasePromise } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
import { SyncEngine, SyncEngineProtocolVersion } from "#index";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, Database as NativeDatabase } from "#index";
|
||||
import { promises } from "node:fs";
|
||||
|
||||
let NodeIO: ProtocolIo = {
|
||||
@@ -45,6 +45,12 @@ class Database extends DatabasePromise {
|
||||
#io: ProtocolIo;
|
||||
#guards: SyncEngineGuards
|
||||
constructor(opts: DatabaseOpts) {
|
||||
if (opts.url == null) {
|
||||
super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any);
|
||||
this.#engine = null;
|
||||
return;
|
||||
}
|
||||
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
@@ -52,23 +58,31 @@ class Database extends DatabasePromise {
|
||||
protocolVersion: SyncEngineProtocolVersion.V1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs,
|
||||
tracing: opts.tracing,
|
||||
bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null,
|
||||
remoteEncryption: opts.remoteEncryption?.cipher,
|
||||
});
|
||||
super(engine.db() as unknown as any);
|
||||
|
||||
|
||||
let headers = typeof opts.authToken === "function" ? () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
}) : {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
};
|
||||
let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>);
|
||||
if (typeof opts.authToken == "function") {
|
||||
const authToken = opts.authToken;
|
||||
headers = async () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
});
|
||||
} else {
|
||||
const authToken = opts.authToken;
|
||||
headers = {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
};
|
||||
}
|
||||
this.#runOpts = {
|
||||
url: opts.url,
|
||||
headers: headers,
|
||||
@@ -83,8 +97,13 @@ class Database extends DatabasePromise {
|
||||
* connect database and initialize it in case of clean start
|
||||
*/
|
||||
override async connect() {
|
||||
if (this.connected) { return; }
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
if (this.connected) {
|
||||
return;
|
||||
} else if (this.#engine == null) {
|
||||
await super.connect();
|
||||
} else {
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
}
|
||||
this.connected = true;
|
||||
}
|
||||
/**
|
||||
@@ -93,6 +112,9 @@ class Database extends DatabasePromise {
|
||||
* @returns true if new changes were pulled from the remote
|
||||
*/
|
||||
async pull() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
|
||||
if (changes.empty()) {
|
||||
return false;
|
||||
@@ -105,25 +127,37 @@ class Database extends DatabasePromise {
|
||||
* if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
|
||||
*/
|
||||
async push() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
|
||||
}
|
||||
/**
|
||||
* checkpoint WAL for local database
|
||||
*/
|
||||
async checkpoint() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
|
||||
}
|
||||
/**
|
||||
* @returns statistic of current local database
|
||||
*/
|
||||
async stats(): Promise<DatabaseStats> {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
|
||||
}
|
||||
/**
|
||||
* close the database
|
||||
*/
|
||||
override async close(): Promise<void> {
|
||||
this.#engine.close();
|
||||
await super.close();
|
||||
if (this.#engine != null) {
|
||||
this.#engine.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
3
bindings/javascript/sync/packages/wasm/cp-entrypoint.sh
Normal file
3
bindings/javascript/sync/packages/wasm/cp-entrypoint.sh
Normal file
@@ -0,0 +1,3 @@
|
||||
sed 's/index-default.js/index-bundle.js/g' promise-default.ts > promise-bundle.ts
|
||||
sed 's/index-default.js/index-turbopack-hack.js/g' promise-default.ts > promise-turbopack-hack.ts
|
||||
sed 's/index-default.js/index-vite-dev-hack.js/g' promise-default.ts > promise-vite-dev-hack.ts
|
||||
@@ -1,7 +1,7 @@
|
||||
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common"
|
||||
import { DatabasePromise } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-bundle.js";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-bundle.js";
|
||||
|
||||
let BrowserIO: ProtocolIo = {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
@@ -45,6 +45,12 @@ class Database extends DatabasePromise {
|
||||
#guards: SyncEngineGuards;
|
||||
#worker: Worker | null;
|
||||
constructor(opts: DatabaseOpts) {
|
||||
if (opts.url == null) {
|
||||
super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any);
|
||||
this.#engine = null;
|
||||
return;
|
||||
}
|
||||
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
@@ -52,23 +58,31 @@ class Database extends DatabasePromise {
|
||||
protocolVersion: SyncEngineProtocolVersion.V1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs,
|
||||
tracing: opts.tracing,
|
||||
bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null,
|
||||
remoteEncryption: opts.remoteEncryption?.cipher,
|
||||
});
|
||||
super(engine.db() as unknown as any);
|
||||
|
||||
|
||||
let headers = typeof opts.authToken === "function" ? () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
}) : {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
};
|
||||
let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>);
|
||||
if (typeof opts.authToken == "function") {
|
||||
const authToken = opts.authToken;
|
||||
headers = async () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
});
|
||||
} else {
|
||||
const authToken = opts.authToken;
|
||||
headers = {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
};
|
||||
}
|
||||
this.#runOpts = {
|
||||
url: opts.url,
|
||||
headers: headers,
|
||||
@@ -83,18 +97,23 @@ class Database extends DatabasePromise {
|
||||
* connect database and initialize it in case of clean start
|
||||
*/
|
||||
override async connect() {
|
||||
if (this.connected) { return; }
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
if (this.connected) {
|
||||
return;
|
||||
} else if (this.#engine == null) {
|
||||
await super.connect();
|
||||
} else {
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
this.connected = true;
|
||||
}
|
||||
/**
|
||||
@@ -103,6 +122,9 @@ class Database extends DatabasePromise {
|
||||
* @returns true if new changes were pulled from the remote
|
||||
*/
|
||||
async pull() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
|
||||
if (changes.empty()) {
|
||||
return false;
|
||||
@@ -115,35 +137,46 @@ class Database extends DatabasePromise {
|
||||
* if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
|
||||
*/
|
||||
async push() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
|
||||
}
|
||||
/**
|
||||
* checkpoint WAL for local database
|
||||
*/
|
||||
async checkpoint() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
|
||||
}
|
||||
/**
|
||||
* @returns statistic of current local database
|
||||
*/
|
||||
async stats(): Promise<DatabaseStats> {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
|
||||
}
|
||||
/**
|
||||
* close the database and relevant files
|
||||
*/
|
||||
async close() {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
if (this.#engine != null) {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
this.#engine.close();
|
||||
}
|
||||
await super.close();
|
||||
this.#engine.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common"
|
||||
import { DatabasePromise } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-default.js";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-default.js";
|
||||
|
||||
let BrowserIO: ProtocolIo = {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
@@ -45,6 +45,12 @@ class Database extends DatabasePromise {
|
||||
#guards: SyncEngineGuards;
|
||||
#worker: Worker | null;
|
||||
constructor(opts: DatabaseOpts) {
|
||||
if (opts.url == null) {
|
||||
super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any);
|
||||
this.#engine = null;
|
||||
return;
|
||||
}
|
||||
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
@@ -52,23 +58,31 @@ class Database extends DatabasePromise {
|
||||
protocolVersion: SyncEngineProtocolVersion.V1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs,
|
||||
tracing: opts.tracing,
|
||||
bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null,
|
||||
remoteEncryption: opts.remoteEncryption?.cipher,
|
||||
});
|
||||
super(engine.db() as unknown as any);
|
||||
|
||||
|
||||
let headers = typeof opts.authToken === "function" ? () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
}) : {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
};
|
||||
let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>);
|
||||
if (typeof opts.authToken == "function") {
|
||||
const authToken = opts.authToken;
|
||||
headers = async () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
});
|
||||
} else {
|
||||
const authToken = opts.authToken;
|
||||
headers = {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
};
|
||||
}
|
||||
this.#runOpts = {
|
||||
url: opts.url,
|
||||
headers: headers,
|
||||
@@ -83,18 +97,23 @@ class Database extends DatabasePromise {
|
||||
* connect database and initialize it in case of clean start
|
||||
*/
|
||||
override async connect() {
|
||||
if (this.connected) { return; }
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
if (this.connected) {
|
||||
return;
|
||||
} else if (this.#engine == null) {
|
||||
await super.connect();
|
||||
} else {
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
this.connected = true;
|
||||
}
|
||||
/**
|
||||
@@ -103,6 +122,9 @@ class Database extends DatabasePromise {
|
||||
* @returns true if new changes were pulled from the remote
|
||||
*/
|
||||
async pull() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
|
||||
if (changes.empty()) {
|
||||
return false;
|
||||
@@ -115,35 +137,46 @@ class Database extends DatabasePromise {
|
||||
* if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
|
||||
*/
|
||||
async push() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
|
||||
}
|
||||
/**
|
||||
* checkpoint WAL for local database
|
||||
*/
|
||||
async checkpoint() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
|
||||
}
|
||||
/**
|
||||
* @returns statistic of current local database
|
||||
*/
|
||||
async stats(): Promise<DatabaseStats> {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
|
||||
}
|
||||
/**
|
||||
* close the database and relevant files
|
||||
*/
|
||||
async close() {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
if (this.#engine != null) {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
this.#engine.close();
|
||||
}
|
||||
await super.close();
|
||||
this.#engine.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common"
|
||||
import { DatabasePromise } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-turbopack-hack.js";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-turbopack-hack.js";
|
||||
|
||||
let BrowserIO: ProtocolIo = {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
@@ -45,6 +45,12 @@ class Database extends DatabasePromise {
|
||||
#guards: SyncEngineGuards;
|
||||
#worker: Worker | null;
|
||||
constructor(opts: DatabaseOpts) {
|
||||
if (opts.url == null) {
|
||||
super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any);
|
||||
this.#engine = null;
|
||||
return;
|
||||
}
|
||||
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
@@ -52,23 +58,31 @@ class Database extends DatabasePromise {
|
||||
protocolVersion: SyncEngineProtocolVersion.V1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs,
|
||||
tracing: opts.tracing,
|
||||
bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null,
|
||||
remoteEncryption: opts.remoteEncryption?.cipher,
|
||||
});
|
||||
super(engine.db() as unknown as any);
|
||||
|
||||
|
||||
let headers = typeof opts.authToken === "function" ? () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
}) : {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
};
|
||||
let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>);
|
||||
if (typeof opts.authToken == "function") {
|
||||
const authToken = opts.authToken;
|
||||
headers = async () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
});
|
||||
} else {
|
||||
const authToken = opts.authToken;
|
||||
headers = {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
};
|
||||
}
|
||||
this.#runOpts = {
|
||||
url: opts.url,
|
||||
headers: headers,
|
||||
@@ -83,18 +97,23 @@ class Database extends DatabasePromise {
|
||||
* connect database and initialize it in case of clean start
|
||||
*/
|
||||
override async connect() {
|
||||
if (this.connected) { return; }
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
if (this.connected) {
|
||||
return;
|
||||
} else if (this.#engine == null) {
|
||||
await super.connect();
|
||||
} else {
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
this.connected = true;
|
||||
}
|
||||
/**
|
||||
@@ -103,6 +122,9 @@ class Database extends DatabasePromise {
|
||||
* @returns true if new changes were pulled from the remote
|
||||
*/
|
||||
async pull() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
|
||||
if (changes.empty()) {
|
||||
return false;
|
||||
@@ -115,35 +137,46 @@ class Database extends DatabasePromise {
|
||||
* if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
|
||||
*/
|
||||
async push() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
|
||||
}
|
||||
/**
|
||||
* checkpoint WAL for local database
|
||||
*/
|
||||
async checkpoint() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
|
||||
}
|
||||
/**
|
||||
* @returns statistic of current local database
|
||||
*/
|
||||
async stats(): Promise<DatabaseStats> {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
|
||||
}
|
||||
/**
|
||||
* close the database and relevant files
|
||||
*/
|
||||
async close() {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
if (this.#engine != null) {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
this.#engine.close();
|
||||
}
|
||||
await super.close();
|
||||
this.#engine.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-wasm-common"
|
||||
import { DatabasePromise } from "@tursodatabase/database-common"
|
||||
import { ProtocolIo, run, DatabaseOpts, EncryptionOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, SyncEngineGuards } from "@tursodatabase/sync-common";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker } from "./index-vite-dev-hack.js";
|
||||
import { SyncEngine, SyncEngineProtocolVersion, initThreadPool, MainWorker, Database as NativeDatabase } from "./index-vite-dev-hack.js";
|
||||
|
||||
let BrowserIO: ProtocolIo = {
|
||||
async read(path: string): Promise<Buffer | Uint8Array | null> {
|
||||
@@ -45,6 +45,12 @@ class Database extends DatabasePromise {
|
||||
#guards: SyncEngineGuards;
|
||||
#worker: Worker | null;
|
||||
constructor(opts: DatabaseOpts) {
|
||||
if (opts.url == null) {
|
||||
super(new NativeDatabase(opts.path, { tracing: opts.tracing }) as any);
|
||||
this.#engine = null;
|
||||
return;
|
||||
}
|
||||
|
||||
const engine = new SyncEngine({
|
||||
path: opts.path,
|
||||
clientName: opts.clientName,
|
||||
@@ -52,23 +58,31 @@ class Database extends DatabasePromise {
|
||||
protocolVersion: SyncEngineProtocolVersion.V1,
|
||||
longPollTimeoutMs: opts.longPollTimeoutMs,
|
||||
tracing: opts.tracing,
|
||||
bootstrapIfEmpty: typeof opts.url != "function" || opts.url() != null,
|
||||
remoteEncryption: opts.remoteEncryption?.cipher,
|
||||
});
|
||||
super(engine.db() as unknown as any);
|
||||
|
||||
|
||||
let headers = typeof opts.authToken === "function" ? () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${(opts.authToken as any)()}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
}) : {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${opts.authToken}` }),
|
||||
...(opts.encryption != null && {
|
||||
"x-turso-encryption-key": opts.encryption.key,
|
||||
"x-turso-encryption-cipher": opts.encryption.cipher,
|
||||
})
|
||||
};
|
||||
let headers: { [K: string]: string } | (() => Promise<{ [K: string]: string }>);
|
||||
if (typeof opts.authToken == "function") {
|
||||
const authToken = opts.authToken;
|
||||
headers = async () => ({
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${await authToken()}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
});
|
||||
} else {
|
||||
const authToken = opts.authToken;
|
||||
headers = {
|
||||
...(opts.authToken != null && { "Authorization": `Bearer ${authToken}` }),
|
||||
...(opts.remoteEncryption != null && {
|
||||
"x-turso-encryption-key": opts.remoteEncryption.key,
|
||||
"x-turso-encryption-cipher": opts.remoteEncryption.cipher,
|
||||
})
|
||||
};
|
||||
}
|
||||
this.#runOpts = {
|
||||
url: opts.url,
|
||||
headers: headers,
|
||||
@@ -83,18 +97,23 @@ class Database extends DatabasePromise {
|
||||
* connect database and initialize it in case of clean start
|
||||
*/
|
||||
override async connect() {
|
||||
if (this.connected) { return; }
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
if (this.connected) {
|
||||
return;
|
||||
} else if (this.#engine == null) {
|
||||
await super.connect();
|
||||
} else {
|
||||
if (!this.memory) {
|
||||
this.#worker = await init();
|
||||
await Promise.all([
|
||||
registerFileAtWorker(this.#worker, this.name),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
registerFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
}
|
||||
await run(this.#runOpts, this.#io, this.#engine, this.#engine.connect());
|
||||
this.connected = true;
|
||||
}
|
||||
/**
|
||||
@@ -103,6 +122,9 @@ class Database extends DatabasePromise {
|
||||
* @returns true if new changes were pulled from the remote
|
||||
*/
|
||||
async pull() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
const changes = await this.#guards.wait(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.wait()));
|
||||
if (changes.empty()) {
|
||||
return false;
|
||||
@@ -115,35 +137,46 @@ class Database extends DatabasePromise {
|
||||
* if {@link DatabaseOpts.transform} is set - then provided callback will be called for every mutation before sending it to the remote
|
||||
*/
|
||||
async push() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.push(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.push()));
|
||||
}
|
||||
/**
|
||||
* checkpoint WAL for local database
|
||||
*/
|
||||
async checkpoint() {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
await this.#guards.checkpoint(async () => await run(this.#runOpts, this.#io, this.#engine, this.#engine.checkpoint()));
|
||||
}
|
||||
/**
|
||||
* @returns statistic of current local database
|
||||
*/
|
||||
async stats(): Promise<DatabaseStats> {
|
||||
if (this.#engine == null) {
|
||||
throw new Error("sync is disabled as database was opened without sync support")
|
||||
}
|
||||
return (await run(this.#runOpts, this.#io, this.#engine, this.#engine.stats()));
|
||||
}
|
||||
/**
|
||||
* close the database and relevant files
|
||||
*/
|
||||
async close() {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
if (this.#engine != null) {
|
||||
if (this.name != null && this.#worker != null) {
|
||||
await Promise.all([
|
||||
unregisterFileAtWorker(this.#worker, this.name),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-wal-revert`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-info`),
|
||||
unregisterFileAtWorker(this.#worker, `${this.name}-changes`),
|
||||
]);
|
||||
}
|
||||
this.#engine.close();
|
||||
}
|
||||
await super.close();
|
||||
this.#engine.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ import { expect, test } from 'vitest'
|
||||
import { Database, connect, DatabaseRowMutation, DatabaseRowTransformResult } from './promise-default.js'
|
||||
|
||||
const localeCompare = (a, b) => a.x.localeCompare(b.x);
|
||||
const intCompare = (a, b) => a.x - b.x;
|
||||
|
||||
test('implicit connect', async () => {
|
||||
const db = new Database({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
|
||||
@@ -11,6 +12,91 @@ test('implicit connect', async () => {
|
||||
expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }]);
|
||||
})
|
||||
|
||||
test('simple-db', async () => {
|
||||
const db = new Database({ path: ':memory:' });
|
||||
expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }])
|
||||
await db.exec("CREATE TABLE t(x)");
|
||||
await db.exec("INSERT INTO t VALUES (1), (2), (3)");
|
||||
expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }])
|
||||
await expect(async () => await db.pull()).rejects.toThrowError(/sync is disabled as database was opened without sync support/);
|
||||
})
|
||||
|
||||
test('implicit connect', async () => {
|
||||
const db = new Database({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
|
||||
const defer = db.prepare("SELECT * FROM not_found");
|
||||
await expect(async () => await defer.all()).rejects.toThrowError(/no such table: not_found/);
|
||||
expect(() => db.prepare("SELECT * FROM not_found")).toThrowError(/no such table: not_found/);
|
||||
expect(await db.prepare("SELECT 1 as x").all()).toEqual([{ x: 1 }]);
|
||||
})
|
||||
|
||||
test('defered sync', async () => {
|
||||
{
|
||||
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
|
||||
await db.exec("CREATE TABLE IF NOT EXISTS t(x)");
|
||||
await db.exec("DELETE FROM t");
|
||||
await db.exec("INSERT INTO t VALUES (100)");
|
||||
await db.push();
|
||||
await db.close();
|
||||
}
|
||||
|
||||
let url = null;
|
||||
const db = new Database({ path: ':memory:', url: () => url });
|
||||
await db.prepare("CREATE TABLE t(x)").run();
|
||||
await db.prepare("INSERT INTO t VALUES (1), (2), (3), (42)").run();
|
||||
expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }, { x: 42 }]);
|
||||
await expect(async () => await db.pull()).rejects.toThrow(/url is empty - sync is paused/);
|
||||
url = process.env.VITE_TURSO_DB_URL;
|
||||
await db.pull();
|
||||
expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 100 }, { x: 1 }, { x: 2 }, { x: 3 }, { x: 42 }]);
|
||||
})
|
||||
|
||||
test('encryption sync', async () => {
|
||||
const KEY = 'l/FWopMfZisTLgBX4A42AergrCrYKjiO3BfkJUwv83I=';
|
||||
const URL = 'http://encrypted--a--a.localhost:10000';
|
||||
{
|
||||
const db = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
await db.exec("CREATE TABLE IF NOT EXISTS t(x)");
|
||||
await db.exec("DELETE FROM t");
|
||||
await db.push();
|
||||
await db.close();
|
||||
}
|
||||
const db1 = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
const db2 = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
await db1.exec("INSERT INTO t VALUES (1), (2), (3)");
|
||||
await db2.exec("INSERT INTO t VALUES (4), (5), (6)");
|
||||
expect(await db1.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]);
|
||||
expect(await db2.prepare("SELECT * FROM t").all()).toEqual([{ x: 4 }, { x: 5 }, { x: 6 }]);
|
||||
await Promise.all([db1.push(), db2.push()]);
|
||||
await Promise.all([db1.pull(), db2.pull()]);
|
||||
const expected = [{ x: 1 }, { x: 2 }, { x: 3 }, { x: 4 }, { x: 5 }, { x: 6 }];
|
||||
expect((await db1.prepare("SELECT * FROM t").all()).sort(intCompare)).toEqual(expected.sort(intCompare));
|
||||
expect((await db2.prepare("SELECT * FROM t").all()).sort(intCompare)).toEqual(expected.sort(intCompare));
|
||||
});
|
||||
|
||||
test('defered encryption sync', async () => {
|
||||
const URL = 'http://encrypted--a--a.localhost:10000';
|
||||
const KEY = 'l/FWopMfZisTLgBX4A42AergrCrYKjiO3BfkJUwv83I=';
|
||||
let url = null;
|
||||
{
|
||||
const db = await connect({ path: ':memory:', url: URL, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
await db.exec("CREATE TABLE IF NOT EXISTS t(x)");
|
||||
await db.exec("DELETE FROM t");
|
||||
await db.exec("INSERT INTO t VALUES (100)");
|
||||
await db.push();
|
||||
await db.close();
|
||||
}
|
||||
const db = await connect({ path: ':memory:', url: () => url, remoteEncryption: { key: KEY, cipher: 'aes256gcm' } });
|
||||
await db.exec("CREATE TABLE IF NOT EXISTS t(x)");
|
||||
await db.exec("INSERT INTO t VALUES (1), (2), (3)");
|
||||
expect(await db.prepare("SELECT * FROM t").all()).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }]);
|
||||
|
||||
url = URL;
|
||||
await db.pull();
|
||||
|
||||
const expected = [{ x: 100 }, { x: 1 }, { x: 2 }, { x: 3 }];
|
||||
expect((await db.prepare("SELECT * FROM t").all())).toEqual(expected);
|
||||
});
|
||||
|
||||
test('select-after-push', async () => {
|
||||
{
|
||||
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
|
||||
|
||||
@@ -45,7 +45,7 @@ pub enum GeneratorResponse {
|
||||
operations: i64,
|
||||
main_wal: i64,
|
||||
revert_wal: i64,
|
||||
last_pull_unix_time: i64,
|
||||
last_pull_unix_time: Option<i64>,
|
||||
last_push_unix_time: Option<i64>,
|
||||
revision: Option<String>,
|
||||
},
|
||||
|
||||
@@ -24,13 +24,7 @@ use crate::{
|
||||
|
||||
#[napi]
|
||||
pub struct SyncEngine {
|
||||
path: String,
|
||||
client_name: String,
|
||||
wal_pull_batch_size: u32,
|
||||
long_poll_timeout: Option<std::time::Duration>,
|
||||
protocol_version: DatabaseSyncEngineProtocolVersion,
|
||||
tables_ignore: Vec<String>,
|
||||
use_transform: bool,
|
||||
opts: SyncEngineOptsFilled,
|
||||
io: Option<Arc<dyn turso_core::IO>>,
|
||||
protocol: Option<Arc<JsProtocolIo>>,
|
||||
sync_engine: Arc<RwLock<Option<DatabaseSyncEngine<JsProtocolIo>>>>,
|
||||
@@ -123,6 +117,49 @@ pub struct SyncEngineOpts {
|
||||
pub tables_ignore: Option<Vec<String>>,
|
||||
pub use_transform: bool,
|
||||
pub protocol_version: Option<SyncEngineProtocolVersion>,
|
||||
pub bootstrap_if_empty: bool,
|
||||
pub remote_encryption: Option<String>,
|
||||
}
|
||||
|
||||
struct SyncEngineOptsFilled {
|
||||
pub path: String,
|
||||
pub client_name: String,
|
||||
pub wal_pull_batch_size: u32,
|
||||
pub long_poll_timeout: Option<std::time::Duration>,
|
||||
pub tables_ignore: Vec<String>,
|
||||
pub use_transform: bool,
|
||||
pub protocol_version: DatabaseSyncEngineProtocolVersion,
|
||||
pub bootstrap_if_empty: bool,
|
||||
pub remote_encryption: Option<CipherMode>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum CipherMode {
|
||||
Aes256Gcm = 1,
|
||||
Aes128Gcm = 2,
|
||||
ChaCha20Poly1305 = 3,
|
||||
Aegis256 = 4,
|
||||
}
|
||||
|
||||
impl CipherMode {
|
||||
/// Returns the nonce size for this cipher mode.
|
||||
pub fn required_nonce_size(&self) -> usize {
|
||||
match self {
|
||||
CipherMode::Aes256Gcm | CipherMode::Aes128Gcm | CipherMode::ChaCha20Poly1305 => 12,
|
||||
CipherMode::Aegis256 => 32,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the tag size for this cipher mode.
|
||||
pub fn required_tag_size(&self) -> usize {
|
||||
// All supported ciphers use 16-byte tags
|
||||
16
|
||||
}
|
||||
|
||||
/// Returns the total metadata size (nonce + tag) for this cipher mode.
|
||||
fn required_metadata_size(&self) -> usize {
|
||||
self.required_nonce_size() + self.required_tag_size()
|
||||
}
|
||||
}
|
||||
|
||||
#[napi]
|
||||
@@ -158,7 +195,7 @@ impl SyncEngine {
|
||||
tracing: opts.tracing.clone(),
|
||||
}),
|
||||
)?));
|
||||
Ok(SyncEngine {
|
||||
let opts_filled = SyncEngineOptsFilled {
|
||||
path: opts.path,
|
||||
client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
|
||||
wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100),
|
||||
@@ -167,37 +204,60 @@ impl SyncEngine {
|
||||
.map(|x| std::time::Duration::from_millis(x as u64)),
|
||||
tables_ignore: opts.tables_ignore.unwrap_or_default(),
|
||||
use_transform: opts.use_transform,
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
sync_engine: Arc::new(RwLock::new(None)),
|
||||
io: Some(io),
|
||||
protocol: Some(Arc::new(JsProtocolIo::default())),
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
db,
|
||||
protocol_version: match opts.protocol_version {
|
||||
Some(SyncEngineProtocolVersion::Legacy) | None => {
|
||||
DatabaseSyncEngineProtocolVersion::Legacy
|
||||
}
|
||||
_ => DatabaseSyncEngineProtocolVersion::V1,
|
||||
},
|
||||
bootstrap_if_empty: opts.bootstrap_if_empty,
|
||||
remote_encryption: match opts.remote_encryption.as_deref() {
|
||||
Some("aes256gcm") => Some(CipherMode::Aes256Gcm),
|
||||
Some("aes128gcm") => Some(CipherMode::Aes128Gcm),
|
||||
Some("chacha20poly1305") => Some(CipherMode::ChaCha20Poly1305),
|
||||
Some("aegis256") => Some(CipherMode::Aegis256),
|
||||
None => None,
|
||||
_ => {
|
||||
return Err(napi::Error::new(
|
||||
napi::Status::GenericFailure,
|
||||
"unsupported remote cipher",
|
||||
))
|
||||
}
|
||||
},
|
||||
};
|
||||
Ok(SyncEngine {
|
||||
opts: opts_filled,
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
sync_engine: Arc::new(RwLock::new(None)),
|
||||
io: Some(io),
|
||||
protocol: Some(Arc::new(JsProtocolIo::default())),
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
db,
|
||||
})
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn connect(&mut self) -> napi::Result<GeneratorHolder> {
|
||||
let opts = DatabaseSyncEngineOpts {
|
||||
client_name: self.client_name.clone(),
|
||||
wal_pull_batch_size: self.wal_pull_batch_size as u64,
|
||||
long_poll_timeout: self.long_poll_timeout,
|
||||
tables_ignore: self.tables_ignore.clone(),
|
||||
use_transform: self.use_transform,
|
||||
protocol_version_hint: self.protocol_version,
|
||||
client_name: self.opts.client_name.clone(),
|
||||
wal_pull_batch_size: self.opts.wal_pull_batch_size as u64,
|
||||
long_poll_timeout: self.opts.long_poll_timeout,
|
||||
tables_ignore: self.opts.tables_ignore.clone(),
|
||||
use_transform: self.opts.use_transform,
|
||||
protocol_version_hint: self.opts.protocol_version,
|
||||
bootstrap_if_empty: self.opts.bootstrap_if_empty,
|
||||
reserved_bytes: self
|
||||
.opts
|
||||
.remote_encryption
|
||||
.map(|x| x.required_metadata_size())
|
||||
.unwrap_or(0),
|
||||
};
|
||||
|
||||
let io = self.io()?;
|
||||
let protocol = self.protocol()?;
|
||||
let sync_engine = self.sync_engine.clone();
|
||||
let db = self.db.clone();
|
||||
let path = self.path.clone();
|
||||
let path = self.opts.path.clone();
|
||||
let generator = genawaiter::sync::Gen::new(|coro| async move {
|
||||
let coro = Coro::new((), coro);
|
||||
let initialized =
|
||||
|
||||
@@ -2266,6 +2266,12 @@ impl Connection {
|
||||
self.set_encryption_context()
|
||||
}
|
||||
|
||||
pub fn set_reserved_bytes(&self, reserved_bytes: u8) -> Result<()> {
|
||||
let pager = self.pager.read();
|
||||
pager.set_reserved_space_bytes(reserved_bytes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_encryption_cipher_mode(&self) -> Option<CipherMode> {
|
||||
*self.encryption_cipher_mode.read()
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ repository.workspace = true
|
||||
|
||||
[dependencies]
|
||||
turso_core = { workspace = true, features = ["conn_raw_api"] }
|
||||
turso_parser = { workspace = true }
|
||||
thiserror = "2.0.12"
|
||||
tracing = "0.1.41"
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use turso_parser::parser::Parser;
|
||||
|
||||
use crate::{
|
||||
database_tape::{run_stmt_once, DatabaseReplaySessionOpts},
|
||||
errors::Error,
|
||||
@@ -211,9 +213,43 @@ impl DatabaseReplayGenerator {
|
||||
after.last()
|
||||
)));
|
||||
};
|
||||
let mut parser = Parser::new(sql.as_str().as_bytes());
|
||||
let mut ast = parser
|
||||
.next()
|
||||
.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(format!(
|
||||
"unexpected DDL query: {}",
|
||||
sql.as_str()
|
||||
))
|
||||
})?
|
||||
.map_err(|e| {
|
||||
Error::DatabaseTapeError(format!(
|
||||
"unexpected DDL query {}: {}",
|
||||
e,
|
||||
sql.as_str()
|
||||
))
|
||||
})?;
|
||||
let turso_parser::ast::Cmd::Stmt(stmt) = &mut ast else {
|
||||
return Err(Error::DatabaseTapeError(format!(
|
||||
"unexpected DDL query: {}",
|
||||
sql.as_str()
|
||||
)));
|
||||
};
|
||||
match stmt {
|
||||
turso_parser::ast::Stmt::CreateTable { if_not_exists, .. }
|
||||
| turso_parser::ast::Stmt::CreateIndex { if_not_exists, .. }
|
||||
| turso_parser::ast::Stmt::CreateTrigger { if_not_exists, .. }
|
||||
| turso_parser::ast::Stmt::CreateMaterializedView {
|
||||
if_not_exists, ..
|
||||
}
|
||||
| turso_parser::ast::Stmt::CreateView { if_not_exists, .. } => {
|
||||
*if_not_exists = true
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
let insert = ReplayInfo {
|
||||
change_type: DatabaseChangeType::Insert,
|
||||
query: sql.as_str().to_string(),
|
||||
query: ast.to_string(),
|
||||
pk_column_indices: None,
|
||||
column_names: Vec::new(),
|
||||
is_ddl_replay: true,
|
||||
|
||||
@@ -38,6 +38,8 @@ pub struct DatabaseSyncEngineOpts {
|
||||
pub wal_pull_batch_size: u64,
|
||||
pub long_poll_timeout: Option<std::time::Duration>,
|
||||
pub protocol_version_hint: DatabaseSyncEngineProtocolVersion,
|
||||
pub bootstrap_if_empty: bool,
|
||||
pub reserved_bytes: usize,
|
||||
}
|
||||
|
||||
pub struct DatabaseSyncEngine<P: ProtocolIO> {
|
||||
@@ -73,9 +75,6 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
let meta_path = format!("{main_db_path}-info");
|
||||
let changes_path = format!("{main_db_path}-changes");
|
||||
|
||||
let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?;
|
||||
let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file));
|
||||
|
||||
tracing::info!("init(path={}): opts={:?}", main_db_path, opts);
|
||||
|
||||
let completion = protocol.full_read(&meta_path)?;
|
||||
@@ -88,7 +87,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
|
||||
let meta = match meta {
|
||||
Some(meta) => meta,
|
||||
None => {
|
||||
None if opts.bootstrap_if_empty => {
|
||||
let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4());
|
||||
let revision = bootstrap_db_file(
|
||||
coro,
|
||||
@@ -106,7 +105,31 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
revert_since_wal_watermark: 0,
|
||||
last_pushed_change_id_hint: 0,
|
||||
last_pushed_pull_gen_hint: 0,
|
||||
last_pull_unix_time: io.now().secs,
|
||||
last_pull_unix_time: Some(io.now().secs),
|
||||
last_push_unix_time: None,
|
||||
};
|
||||
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
|
||||
let completion = protocol.full_write(&meta_path, meta.dump()?)?;
|
||||
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
|
||||
wait_all_results(coro, &completion).await?;
|
||||
meta
|
||||
}
|
||||
None => {
|
||||
if opts.protocol_version_hint == DatabaseSyncEngineProtocolVersion::Legacy {
|
||||
return Err(Error::DatabaseSyncEngineError(
|
||||
"deferred bootstrap is not supported for legacy protocol".to_string(),
|
||||
));
|
||||
}
|
||||
let client_unique_id = format!("{}-{}", opts.client_name, uuid::Uuid::new_v4());
|
||||
let meta = DatabaseMetadata {
|
||||
version: DATABASE_METADATA_VERSION.to_string(),
|
||||
client_unique_id,
|
||||
synced_revision: None,
|
||||
revert_since_wal_salt: None,
|
||||
revert_since_wal_watermark: 0,
|
||||
last_pushed_change_id_hint: 0,
|
||||
last_pushed_pull_gen_hint: 0,
|
||||
last_pull_unix_time: None,
|
||||
last_push_unix_time: None,
|
||||
};
|
||||
tracing::info!("write meta after successful bootstrap: meta={meta:?}");
|
||||
@@ -127,11 +150,14 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
tracing::info!("check if main db file exists");
|
||||
|
||||
let main_exists = io.try_open(main_db_path)?.is_some();
|
||||
if !main_exists {
|
||||
if !main_exists && meta.synced_revision.is_some() {
|
||||
let error = "main DB file doesn't exists, but metadata is".to_string();
|
||||
return Err(Error::DatabaseSyncEngineError(error));
|
||||
}
|
||||
|
||||
let db_file = io.open_file(main_db_path, turso_core::OpenFlags::Create, false)?;
|
||||
let db_file = Arc::new(turso_core::storage::database::DatabaseFile::new(db_file));
|
||||
|
||||
let main_db = turso_core::Database::open_with_flags(
|
||||
io.clone(),
|
||||
main_db_path,
|
||||
@@ -140,6 +166,17 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
turso_core::DatabaseOpts::new().with_indexes(true),
|
||||
None,
|
||||
)?;
|
||||
|
||||
// DB wasn't bootstrapped but remote is encrypted - so we must properly set reserved bytes field in advance
|
||||
if meta.synced_revision.is_none() && opts.reserved_bytes != 0 {
|
||||
let conn = main_db.connect()?;
|
||||
conn.set_reserved_bytes(opts.reserved_bytes as u8)?;
|
||||
|
||||
// write transaction forces allocation of root DB page
|
||||
conn.execute("BEGIN IMMEDIATE")?;
|
||||
conn.execute("COMMIT")?;
|
||||
}
|
||||
|
||||
let tape_opts = DatabaseTapeOpts {
|
||||
cdc_table: None,
|
||||
cdc_mode: Some("full".to_string()),
|
||||
@@ -162,11 +199,11 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
client_unique_id: meta.client_unique_id.clone(),
|
||||
};
|
||||
|
||||
let synced_revision = meta.synced_revision.as_ref().unwrap();
|
||||
if let DatabasePullRevision::Legacy {
|
||||
let synced_revision = meta.synced_revision.as_ref();
|
||||
if let Some(DatabasePullRevision::Legacy {
|
||||
synced_frame_no: None,
|
||||
..
|
||||
} = synced_revision
|
||||
}) = synced_revision
|
||||
{
|
||||
// sync WAL from the remote in case of bootstrap - all subsequent initializations will be fast
|
||||
db.pull_changes_from_remote(coro).await?;
|
||||
@@ -380,7 +417,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
let file = acquire_slot(&self.changes_file)?;
|
||||
|
||||
let now = self.io.now();
|
||||
let revision = self.meta().synced_revision.clone().unwrap();
|
||||
let revision = self.meta().synced_revision.clone();
|
||||
let next_revision = wal_pull_to_file(
|
||||
coro,
|
||||
self.protocol.as_ref(),
|
||||
@@ -427,7 +464,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
) -> Result<()> {
|
||||
if remote_changes.file_slot.is_none() {
|
||||
self.update_meta(coro, |m| {
|
||||
m.last_pull_unix_time = remote_changes.time.secs;
|
||||
m.last_pull_unix_time = Some(remote_changes.time.secs);
|
||||
})
|
||||
.await?;
|
||||
return Ok(());
|
||||
@@ -450,7 +487,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
|
||||
m.revert_since_wal_watermark = revert_since_wal_watermark;
|
||||
m.synced_revision = Some(remote_changes.revision);
|
||||
m.last_pushed_change_id_hint = 0;
|
||||
m.last_pull_unix_time = remote_changes.time.secs;
|
||||
m.last_pull_unix_time = Some(remote_changes.time.secs);
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
|
||||
@@ -153,7 +153,7 @@ pub async fn wal_apply_from_file<Ctx>(
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
}
|
||||
let info = WalFrameInfo::from_frame_header(buffer.as_slice());
|
||||
tracing::debug!("got frame: {:?}", info);
|
||||
tracing::info!("got frame: {:?}", info);
|
||||
db_size = info.db_size;
|
||||
session.append_page(info.page_no, &buffer.as_slice()[WAL_FRAME_HEADER..])?;
|
||||
}
|
||||
@@ -165,7 +165,7 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
|
||||
coro: &Coro<Ctx>,
|
||||
client: &C,
|
||||
frames_file: &Arc<dyn turso_core::File>,
|
||||
revision: &DatabasePullRevision,
|
||||
revision: &Option<DatabasePullRevision>,
|
||||
wal_pull_batch_size: u64,
|
||||
long_poll_timeout: Option<std::time::Duration>,
|
||||
) -> Result<DatabasePullRevision> {
|
||||
@@ -181,10 +181,10 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
|
||||
coro.yield_(ProtocolCommand::IO).await?;
|
||||
}
|
||||
match revision {
|
||||
DatabasePullRevision::Legacy {
|
||||
Some(DatabasePullRevision::Legacy {
|
||||
generation,
|
||||
synced_frame_no,
|
||||
} => {
|
||||
}) => {
|
||||
let start_frame = synced_frame_no.unwrap_or(0) + 1;
|
||||
wal_pull_to_file_legacy(
|
||||
coro,
|
||||
@@ -196,9 +196,10 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
|
||||
)
|
||||
.await
|
||||
}
|
||||
DatabasePullRevision::V1 { revision } => {
|
||||
Some(DatabasePullRevision::V1 { revision }) => {
|
||||
wal_pull_to_file_v1(coro, client, frames_file, revision, long_poll_timeout).await
|
||||
}
|
||||
None => wal_pull_to_file_v1(coro, client, frames_file, "", long_poll_timeout).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1200,7 +1200,7 @@ mod tests {
|
||||
turso_core::Value::Text(turso_core::types::Text::new("t")),
|
||||
turso_core::Value::Integer(6),
|
||||
turso_core::Value::Text(turso_core::types::Text::new(
|
||||
"CREATE INDEX t_idx ON t (y)"
|
||||
"CREATE INDEX IF NOT EXISTS t_idx ON t (y)"
|
||||
)),
|
||||
]
|
||||
]
|
||||
@@ -1387,4 +1387,91 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_database_tape_replay_ddl_changes_idempotent() {
|
||||
let temp_file1 = NamedTempFile::new().unwrap();
|
||||
let db_path1 = temp_file1.path().to_str().unwrap();
|
||||
let temp_file2 = NamedTempFile::new().unwrap();
|
||||
let db_path2 = temp_file2.path().to_str().unwrap();
|
||||
let temp_file3 = NamedTempFile::new().unwrap();
|
||||
let db_path3 = temp_file3.path().to_str().unwrap();
|
||||
|
||||
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new().unwrap());
|
||||
|
||||
let db1 = turso_core::Database::open_file(io.clone(), db_path1, false, true).unwrap();
|
||||
let db1 = Arc::new(DatabaseTape::new(db1));
|
||||
|
||||
let db2 = turso_core::Database::open_file(io.clone(), db_path2, false, true).unwrap();
|
||||
let db2 = Arc::new(DatabaseTape::new(db2));
|
||||
|
||||
let db3 = turso_core::Database::open_file(io.clone(), db_path3, false, true).unwrap();
|
||||
let db3 = Arc::new(DatabaseTape::new(db3));
|
||||
|
||||
let mut gen = genawaiter::sync::Gen::new({
|
||||
|coro| async move {
|
||||
let coro: Coro<()> = coro.into();
|
||||
let conn1 = db1.connect(&coro).await.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)")
|
||||
.unwrap();
|
||||
conn1.execute("CREATE INDEX t_idx ON t(y, z)").unwrap();
|
||||
|
||||
let conn2 = db2.connect(&coro).await.unwrap();
|
||||
conn2
|
||||
.execute("CREATE TABLE t(x TEXT PRIMARY KEY, y, z)")
|
||||
.unwrap();
|
||||
conn2.execute("CREATE INDEX t_idx ON t(y, z)").unwrap();
|
||||
|
||||
let conn3 = db3.connect(&coro).await.unwrap();
|
||||
{
|
||||
let opts = DatabaseReplaySessionOpts {
|
||||
use_implicit_rowid: false,
|
||||
};
|
||||
let mut session = db3.start_replay_session(&coro, opts).await.unwrap();
|
||||
|
||||
let opts = DatabaseChangesIteratorOpts {
|
||||
ignore_schema_changes: false,
|
||||
..Default::default()
|
||||
};
|
||||
let mut iterator = db1.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
tracing::info!("1. operation: {:?}", operation);
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
|
||||
let mut iterator = db2.iterate_changes(opts.clone()).unwrap();
|
||||
while let Some(operation) = iterator.next(&coro).await.unwrap() {
|
||||
tracing::info!("2. operation: {:?}", operation);
|
||||
session.replay(&coro, operation).await.unwrap();
|
||||
}
|
||||
}
|
||||
let mut rows = Vec::new();
|
||||
let mut stmt = conn3.prepare("SELECT name FROM sqlite_master").unwrap();
|
||||
while let Some(row) = run_stmt_once(&coro, &mut stmt).await.unwrap() {
|
||||
rows.push(row.get_value(0).to_text().unwrap().to_string());
|
||||
}
|
||||
assert_eq!(
|
||||
rows,
|
||||
vec![
|
||||
"sqlite_sequence".to_string(),
|
||||
"turso_cdc".to_string(),
|
||||
"t".to_string(),
|
||||
"sqlite_autoindex_t_1".to_string(),
|
||||
"t_idx".to_string()
|
||||
]
|
||||
);
|
||||
crate::Result::Ok(())
|
||||
}
|
||||
});
|
||||
loop {
|
||||
match gen.resume_with(Ok(())) {
|
||||
genawaiter::GeneratorState::Yielded(..) => io.step().unwrap(),
|
||||
genawaiter::GeneratorState::Complete(result) => {
|
||||
result.unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ pub struct SyncEngineStats {
|
||||
pub cdc_operations: i64,
|
||||
pub main_wal_size: u64,
|
||||
pub revert_wal_size: u64,
|
||||
pub last_pull_unix_time: i64,
|
||||
pub last_pull_unix_time: Option<i64>,
|
||||
pub last_push_unix_time: Option<i64>,
|
||||
pub revision: Option<String>,
|
||||
}
|
||||
@@ -90,7 +90,7 @@ pub struct DatabaseMetadata {
|
||||
pub revert_since_wal_salt: Option<Vec<u32>>,
|
||||
pub revert_since_wal_watermark: u64,
|
||||
/// Unix time of last successful pull
|
||||
pub last_pull_unix_time: i64,
|
||||
pub last_pull_unix_time: Option<i64>,
|
||||
/// Unix time of last successful push
|
||||
pub last_push_unix_time: Option<i64>,
|
||||
pub last_pushed_pull_gen_hint: i64,
|
||||
|
||||
Reference in New Issue
Block a user