Merge 'Sync improvements' from Nikita Sivukhin

This PR improves sync and database bindings for browser
List of changes:
- For node and browser database now run on main thread and only IO work
offloaded to the worker (web worker in browser)
- Simple locks are implemented for database access externally in order
to guard access to the same connection (when request is executed async -
main thread can try to start another request concurrently)
- parking_lot in the Wal replaced by spin-wait (by invoking
`parking_lot.try_read/try_write`) for WASM target because browser can't
park main thread
- js sync reworked in order to support few engine options
(`longPollTimeoutMs`) and introduce external locking which properly
guards concurrent access of sync methods

Closes #3218
This commit is contained in:
Pekka Enberg
2025-09-22 09:14:30 +03:00
committed by GitHub
49 changed files with 4717 additions and 483 deletions

3
Cargo.lock generated
View File

@@ -4233,6 +4233,7 @@ dependencies = [
name = "turso_node"
version = "0.2.0-pre.3"
dependencies = [
"chrono",
"napi",
"napi-build",
"napi-derive",
@@ -4335,10 +4336,10 @@ name = "turso_sync_js"
version = "0.2.0-pre.3"
dependencies = [
"genawaiter",
"http",
"napi",
"napi-build",
"napi-derive",
"tracing",
"tracing-subscriber",
"turso_core",
"turso_node",

View File

@@ -34,7 +34,6 @@ members = [
"perf/throughput/rusqlite",
"perf/encryption"
]
exclude = [
"perf/latency/limbo",
]
@@ -62,7 +61,7 @@ limbo_percentile = { path = "extensions/percentile", version = "0.2.0-pre.3" }
limbo_regexp = { path = "extensions/regexp", version = "0.2.0-pre.3" }
turso_sqlite3_parser = { path = "vendored/sqlite3-parser", version = "0.2.0-pre.3" }
limbo_uuid = { path = "extensions/uuid", version = "0.2.0-pre.3" }
turso_parser = { path = "parser", version = "0.2.0-pre.3" }
turso_parser = { path = "parser", version = "0.2.0-pre.3" }
sql_generation = { path = "sql_generation" }
strum = { version = "0.26", features = ["derive"] }
strum_macros = "0.26"

View File

@@ -16,6 +16,7 @@ napi = { version = "3.1.3", default-features = false, features = ["napi6"] }
napi-derive = { version = "3.1.1", default-features = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing.workspace = true
chrono = { workspace = true, default-features = false, features = ["clock"] }
[features]
encryption = ["turso_core/encryption"]

View File

@@ -0,0 +1,335 @@
{
"name": "wasm",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "wasm",
"version": "1.0.0",
"license": "ISC",
"dependencies": {
"@tursodatabase/database-browser": "../../packages/browser"
},
"devDependencies": {
"vite": "^7.1.4"
}
},
"../../packages/browser": {
"name": "@tursodatabase/database-browser",
"version": "0.1.5",
"license": "MIT",
"dependencies": {
"@napi-rs/wasm-runtime": "^1.0.3",
"@tursodatabase/database-browser-common": "^0.1.5",
"@tursodatabase/database-common": "^0.1.5"
},
"devDependencies": {
"@napi-rs/cli": "^3.1.5",
"@vitest/browser": "^3.2.4",
"playwright": "^1.55.0",
"typescript": "^5.9.2",
"vitest": "^3.2.4"
}
},
"node_modules/@esbuild/linux-arm64": {
"version": "0.25.9",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">=18"
}
},
"node_modules/@rollup/rollup-linux-arm64-gnu": {
"version": "4.50.0",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
]
},
"node_modules/@rollup/rollup-linux-arm64-musl": {
"version": "4.50.0",
"cpu": [
"arm64"
],
"dev": true,
"license": "MIT",
"optional": true,
"os": [
"linux"
]
},
"node_modules/@tursodatabase/database-browser": {
"resolved": "../../packages/browser",
"link": true
},
"node_modules/@types/estree": {
"version": "1.0.8",
"dev": true,
"license": "MIT"
},
"node_modules/esbuild": {
"version": "0.25.9",
"dev": true,
"hasInstallScript": true,
"license": "MIT",
"bin": {
"esbuild": "bin/esbuild"
},
"engines": {
"node": ">=18"
},
"optionalDependencies": {
"@esbuild/aix-ppc64": "0.25.9",
"@esbuild/android-arm": "0.25.9",
"@esbuild/android-arm64": "0.25.9",
"@esbuild/android-x64": "0.25.9",
"@esbuild/darwin-arm64": "0.25.9",
"@esbuild/darwin-x64": "0.25.9",
"@esbuild/freebsd-arm64": "0.25.9",
"@esbuild/freebsd-x64": "0.25.9",
"@esbuild/linux-arm": "0.25.9",
"@esbuild/linux-arm64": "0.25.9",
"@esbuild/linux-ia32": "0.25.9",
"@esbuild/linux-loong64": "0.25.9",
"@esbuild/linux-mips64el": "0.25.9",
"@esbuild/linux-ppc64": "0.25.9",
"@esbuild/linux-riscv64": "0.25.9",
"@esbuild/linux-s390x": "0.25.9",
"@esbuild/linux-x64": "0.25.9",
"@esbuild/netbsd-arm64": "0.25.9",
"@esbuild/netbsd-x64": "0.25.9",
"@esbuild/openbsd-arm64": "0.25.9",
"@esbuild/openbsd-x64": "0.25.9",
"@esbuild/openharmony-arm64": "0.25.9",
"@esbuild/sunos-x64": "0.25.9",
"@esbuild/win32-arm64": "0.25.9",
"@esbuild/win32-ia32": "0.25.9",
"@esbuild/win32-x64": "0.25.9"
}
},
"node_modules/fdir": {
"version": "6.5.0",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=12.0.0"
},
"peerDependencies": {
"picomatch": "^3 || ^4"
},
"peerDependenciesMeta": {
"picomatch": {
"optional": true
}
}
},
"node_modules/nanoid": {
"version": "3.3.11",
"dev": true,
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/ai"
}
],
"license": "MIT",
"bin": {
"nanoid": "bin/nanoid.cjs"
},
"engines": {
"node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1"
}
},
"node_modules/picocolors": {
"version": "1.1.1",
"dev": true,
"license": "ISC"
},
"node_modules/picomatch": {
"version": "4.0.3",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/jonschlinkert"
}
},
"node_modules/postcss": {
"version": "8.5.6",
"dev": true,
"funding": [
{
"type": "opencollective",
"url": "https://opencollective.com/postcss/"
},
{
"type": "tidelift",
"url": "https://tidelift.com/funding/github/npm/postcss"
},
{
"type": "github",
"url": "https://github.com/sponsors/ai"
}
],
"license": "MIT",
"dependencies": {
"nanoid": "^3.3.11",
"picocolors": "^1.1.1",
"source-map-js": "^1.2.1"
},
"engines": {
"node": "^10 || ^12 || >=14"
}
},
"node_modules/rollup": {
"version": "4.50.0",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/estree": "1.0.8"
},
"bin": {
"rollup": "dist/bin/rollup"
},
"engines": {
"node": ">=18.0.0",
"npm": ">=8.0.0"
},
"optionalDependencies": {
"@rollup/rollup-android-arm-eabi": "4.50.0",
"@rollup/rollup-android-arm64": "4.50.0",
"@rollup/rollup-darwin-arm64": "4.50.0",
"@rollup/rollup-darwin-x64": "4.50.0",
"@rollup/rollup-freebsd-arm64": "4.50.0",
"@rollup/rollup-freebsd-x64": "4.50.0",
"@rollup/rollup-linux-arm-gnueabihf": "4.50.0",
"@rollup/rollup-linux-arm-musleabihf": "4.50.0",
"@rollup/rollup-linux-arm64-gnu": "4.50.0",
"@rollup/rollup-linux-arm64-musl": "4.50.0",
"@rollup/rollup-linux-loongarch64-gnu": "4.50.0",
"@rollup/rollup-linux-ppc64-gnu": "4.50.0",
"@rollup/rollup-linux-riscv64-gnu": "4.50.0",
"@rollup/rollup-linux-riscv64-musl": "4.50.0",
"@rollup/rollup-linux-s390x-gnu": "4.50.0",
"@rollup/rollup-linux-x64-gnu": "4.50.0",
"@rollup/rollup-linux-x64-musl": "4.50.0",
"@rollup/rollup-openharmony-arm64": "4.50.0",
"@rollup/rollup-win32-arm64-msvc": "4.50.0",
"@rollup/rollup-win32-ia32-msvc": "4.50.0",
"@rollup/rollup-win32-x64-msvc": "4.50.0",
"fsevents": "~2.3.2"
}
},
"node_modules/source-map-js": {
"version": "1.2.1",
"dev": true,
"license": "BSD-3-Clause",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/tinyglobby": {
"version": "0.2.14",
"dev": true,
"license": "MIT",
"dependencies": {
"fdir": "^6.4.4",
"picomatch": "^4.0.2"
},
"engines": {
"node": ">=12.0.0"
},
"funding": {
"url": "https://github.com/sponsors/SuperchupuDev"
}
},
"node_modules/vite": {
"version": "7.1.4",
"dev": true,
"license": "MIT",
"dependencies": {
"esbuild": "^0.25.0",
"fdir": "^6.5.0",
"picomatch": "^4.0.3",
"postcss": "^8.5.6",
"rollup": "^4.43.0",
"tinyglobby": "^0.2.14"
},
"bin": {
"vite": "bin/vite.js"
},
"engines": {
"node": "^20.19.0 || >=22.12.0"
},
"funding": {
"url": "https://github.com/vitejs/vite?sponsor=1"
},
"optionalDependencies": {
"fsevents": "~2.3.3"
},
"peerDependencies": {
"@types/node": "^20.19.0 || >=22.12.0",
"jiti": ">=1.21.0",
"less": "^4.0.0",
"lightningcss": "^1.21.0",
"sass": "^1.70.0",
"sass-embedded": "^1.70.0",
"stylus": ">=0.54.8",
"sugarss": "^5.0.0",
"terser": "^5.16.0",
"tsx": "^4.8.1",
"yaml": "^2.4.2"
},
"peerDependenciesMeta": {
"@types/node": {
"optional": true
},
"jiti": {
"optional": true
},
"less": {
"optional": true
},
"lightningcss": {
"optional": true
},
"sass": {
"optional": true
},
"sass-embedded": {
"optional": true
},
"stylus": {
"optional": true
},
"sugarss": {
"optional": true
},
"terser": {
"optional": true
},
"tsx": {
"optional": true
},
"yaml": {
"optional": true
}
}
}
}
}

View File

@@ -0,0 +1,626 @@
{
"name": "drizzle",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "drizzle",
"version": "1.0.0",
"license": "ISC",
"dependencies": {
"@tursodatabase/database": "../../packages/native",
"better-sqlite3": "^12.2.0",
"drizzle-orm": "^0.44.3"
}
},
"../..": {
"version": "0.1.5",
"workspaces": [
"packages/common",
"packages/native",
"packages/browser",
"packages/browser-common",
"sync/packages/common",
"sync/packages/native",
"sync/packages/browser"
]
},
"../../packages/browser": {
"name": "@tursodatabase/database-browser",
"version": "0.1.5",
"extraneous": true,
"license": "MIT",
"dependencies": {
"@napi-rs/wasm-runtime": "^1.0.3",
"@tursodatabase/database-browser-common": "^0.1.5",
"@tursodatabase/database-common": "^0.1.5"
},
"devDependencies": {
"@napi-rs/cli": "^3.1.5",
"@vitest/browser": "^3.2.4",
"playwright": "^1.55.0",
"typescript": "^5.9.2",
"vitest": "^3.2.4"
}
},
"../../packages/browser-common": {
"name": "@tursodatabase/database-browser-common",
"version": "0.1.5",
"extraneous": true,
"license": "MIT",
"devDependencies": {
"typescript": "^5.9.2"
}
},
"../../packages/common": {
"name": "@tursodatabase/database-common",
"version": "0.1.5",
"extraneous": true,
"license": "MIT",
"devDependencies": {
"typescript": "^5.9.2"
}
},
"../../packages/native": {
"name": "@tursodatabase/database",
"version": "0.1.5",
"license": "MIT",
"dependencies": {
"@tursodatabase/database-common": "^0.1.5"
},
"devDependencies": {
"@napi-rs/cli": "^3.1.5",
"@types/node": "^24.3.1",
"typescript": "^5.9.2",
"vitest": "^3.2.4"
}
},
"../../sync/packages/browser": {
"name": "@tursodatabase/sync-browser",
"version": "0.1.5",
"extraneous": true,
"license": "MIT",
"dependencies": {
"@napi-rs/wasm-runtime": "^1.0.3",
"@tursodatabase/database-browser-common": "^0.1.5",
"@tursodatabase/database-common": "^0.1.5",
"@tursodatabase/sync-common": "^0.1.5"
},
"devDependencies": {
"@napi-rs/cli": "^3.1.5",
"@vitest/browser": "^3.2.4",
"playwright": "^1.55.0",
"typescript": "^5.9.2",
"vitest": "^3.2.4"
}
},
"../../sync/packages/common": {
"name": "@tursodatabase/sync-common",
"version": "0.1.5",
"extraneous": true,
"license": "MIT",
"devDependencies": {
"typescript": "^5.9.2"
}
},
"../../sync/packages/native": {
"name": "@tursodatabase/sync",
"version": "0.1.5",
"extraneous": true,
"license": "MIT",
"dependencies": {
"@tursodatabase/database-common": "^0.1.5",
"@tursodatabase/sync-common": "^0.1.5"
},
"devDependencies": {
"@napi-rs/cli": "^3.1.5",
"@types/node": "^24.3.1",
"typescript": "^5.9.2",
"vitest": "^3.2.4"
}
},
"node_modules/@tursodatabase/database": {
"resolved": "../../packages/native",
"link": true
},
"node_modules/base64-js": {
"version": "1.5.1",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT"
},
"node_modules/better-sqlite3": {
"version": "12.2.0",
"hasInstallScript": true,
"license": "MIT",
"dependencies": {
"bindings": "^1.5.0",
"prebuild-install": "^7.1.1"
},
"engines": {
"node": "20.x || 22.x || 23.x || 24.x"
}
},
"node_modules/bindings": {
"version": "1.5.0",
"license": "MIT",
"dependencies": {
"file-uri-to-path": "1.0.0"
}
},
"node_modules/bl": {
"version": "4.1.0",
"license": "MIT",
"dependencies": {
"buffer": "^5.5.0",
"inherits": "^2.0.4",
"readable-stream": "^3.4.0"
}
},
"node_modules/buffer": {
"version": "5.7.1",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT",
"dependencies": {
"base64-js": "^1.3.1",
"ieee754": "^1.1.13"
}
},
"node_modules/chownr": {
"version": "1.1.4",
"license": "ISC"
},
"node_modules/decompress-response": {
"version": "6.0.0",
"license": "MIT",
"dependencies": {
"mimic-response": "^3.1.0"
},
"engines": {
"node": ">=10"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/deep-extend": {
"version": "0.6.0",
"license": "MIT",
"engines": {
"node": ">=4.0.0"
}
},
"node_modules/detect-libc": {
"version": "2.0.4",
"license": "Apache-2.0",
"engines": {
"node": ">=8"
}
},
"node_modules/drizzle-orm": {
"version": "0.44.4",
"license": "Apache-2.0",
"peerDependencies": {
"@aws-sdk/client-rds-data": ">=3",
"@cloudflare/workers-types": ">=4",
"@electric-sql/pglite": ">=0.2.0",
"@libsql/client": ">=0.10.0",
"@libsql/client-wasm": ">=0.10.0",
"@neondatabase/serverless": ">=0.10.0",
"@op-engineering/op-sqlite": ">=2",
"@opentelemetry/api": "^1.4.1",
"@planetscale/database": ">=1.13",
"@prisma/client": "*",
"@tidbcloud/serverless": "*",
"@types/better-sqlite3": "*",
"@types/pg": "*",
"@types/sql.js": "*",
"@upstash/redis": ">=1.34.7",
"@vercel/postgres": ">=0.8.0",
"@xata.io/client": "*",
"better-sqlite3": ">=7",
"bun-types": "*",
"expo-sqlite": ">=14.0.0",
"gel": ">=2",
"knex": "*",
"kysely": "*",
"mysql2": ">=2",
"pg": ">=8",
"postgres": ">=3",
"sql.js": ">=1",
"sqlite3": ">=5"
},
"peerDependenciesMeta": {
"@aws-sdk/client-rds-data": {
"optional": true
},
"@cloudflare/workers-types": {
"optional": true
},
"@electric-sql/pglite": {
"optional": true
},
"@libsql/client": {
"optional": true
},
"@libsql/client-wasm": {
"optional": true
},
"@neondatabase/serverless": {
"optional": true
},
"@op-engineering/op-sqlite": {
"optional": true
},
"@opentelemetry/api": {
"optional": true
},
"@planetscale/database": {
"optional": true
},
"@prisma/client": {
"optional": true
},
"@tidbcloud/serverless": {
"optional": true
},
"@types/better-sqlite3": {
"optional": true
},
"@types/pg": {
"optional": true
},
"@types/sql.js": {
"optional": true
},
"@upstash/redis": {
"optional": true
},
"@vercel/postgres": {
"optional": true
},
"@xata.io/client": {
"optional": true
},
"better-sqlite3": {
"optional": true
},
"bun-types": {
"optional": true
},
"expo-sqlite": {
"optional": true
},
"gel": {
"optional": true
},
"knex": {
"optional": true
},
"kysely": {
"optional": true
},
"mysql2": {
"optional": true
},
"pg": {
"optional": true
},
"postgres": {
"optional": true
},
"prisma": {
"optional": true
},
"sql.js": {
"optional": true
},
"sqlite3": {
"optional": true
}
}
},
"node_modules/end-of-stream": {
"version": "1.4.5",
"license": "MIT",
"dependencies": {
"once": "^1.4.0"
}
},
"node_modules/expand-template": {
"version": "2.0.3",
"license": "(MIT OR WTFPL)",
"engines": {
"node": ">=6"
}
},
"node_modules/file-uri-to-path": {
"version": "1.0.0",
"license": "MIT"
},
"node_modules/fs-constants": {
"version": "1.0.0",
"license": "MIT"
},
"node_modules/github-from-package": {
"version": "0.0.0",
"license": "MIT"
},
"node_modules/ieee754": {
"version": "1.2.1",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "BSD-3-Clause"
},
"node_modules/inherits": {
"version": "2.0.4",
"license": "ISC"
},
"node_modules/ini": {
"version": "1.3.8",
"license": "ISC"
},
"node_modules/mimic-response": {
"version": "3.1.0",
"license": "MIT",
"engines": {
"node": ">=10"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/minimist": {
"version": "1.2.8",
"license": "MIT",
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/mkdirp-classic": {
"version": "0.5.3",
"license": "MIT"
},
"node_modules/napi-build-utils": {
"version": "2.0.0",
"license": "MIT"
},
"node_modules/node-abi": {
"version": "3.75.0",
"license": "MIT",
"dependencies": {
"semver": "^7.3.5"
},
"engines": {
"node": ">=10"
}
},
"node_modules/once": {
"version": "1.4.0",
"license": "ISC",
"dependencies": {
"wrappy": "1"
}
},
"node_modules/prebuild-install": {
"version": "7.1.3",
"license": "MIT",
"dependencies": {
"detect-libc": "^2.0.0",
"expand-template": "^2.0.3",
"github-from-package": "0.0.0",
"minimist": "^1.2.3",
"mkdirp-classic": "^0.5.3",
"napi-build-utils": "^2.0.0",
"node-abi": "^3.3.0",
"pump": "^3.0.0",
"rc": "^1.2.7",
"simple-get": "^4.0.0",
"tar-fs": "^2.0.0",
"tunnel-agent": "^0.6.0"
},
"bin": {
"prebuild-install": "bin.js"
},
"engines": {
"node": ">=10"
}
},
"node_modules/pump": {
"version": "3.0.3",
"license": "MIT",
"dependencies": {
"end-of-stream": "^1.1.0",
"once": "^1.3.1"
}
},
"node_modules/rc": {
"version": "1.2.8",
"license": "(BSD-2-Clause OR MIT OR Apache-2.0)",
"dependencies": {
"deep-extend": "^0.6.0",
"ini": "~1.3.0",
"minimist": "^1.2.0",
"strip-json-comments": "~2.0.1"
},
"bin": {
"rc": "cli.js"
}
},
"node_modules/readable-stream": {
"version": "3.6.2",
"license": "MIT",
"dependencies": {
"inherits": "^2.0.3",
"string_decoder": "^1.1.1",
"util-deprecate": "^1.0.1"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/safe-buffer": {
"version": "5.2.1",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT"
},
"node_modules/semver": {
"version": "7.7.2",
"license": "ISC",
"bin": {
"semver": "bin/semver.js"
},
"engines": {
"node": ">=10"
}
},
"node_modules/simple-concat": {
"version": "1.0.1",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT"
},
"node_modules/simple-get": {
"version": "4.0.1",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT",
"dependencies": {
"decompress-response": "^6.0.0",
"once": "^1.3.1",
"simple-concat": "^1.0.0"
}
},
"node_modules/string_decoder": {
"version": "1.3.0",
"license": "MIT",
"dependencies": {
"safe-buffer": "~5.2.0"
}
},
"node_modules/strip-json-comments": {
"version": "2.0.1",
"license": "MIT",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/tar-fs": {
"version": "2.1.3",
"license": "MIT",
"dependencies": {
"chownr": "^1.1.1",
"mkdirp-classic": "^0.5.2",
"pump": "^3.0.0",
"tar-stream": "^2.1.4"
}
},
"node_modules/tar-stream": {
"version": "2.2.0",
"license": "MIT",
"dependencies": {
"bl": "^4.0.3",
"end-of-stream": "^1.4.1",
"fs-constants": "^1.0.0",
"inherits": "^2.0.3",
"readable-stream": "^3.1.1"
},
"engines": {
"node": ">=6"
}
},
"node_modules/tunnel-agent": {
"version": "0.6.0",
"license": "Apache-2.0",
"dependencies": {
"safe-buffer": "^5.0.1"
},
"engines": {
"node": "*"
}
},
"node_modules/util-deprecate": {
"version": "1.0.2",
"license": "MIT"
},
"node_modules/wrappy": {
"version": "1.0.2",
"license": "ISC"
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -24,46 +24,94 @@ interface BrowserImports {
is_web_worker(): boolean;
lookup_file(ptr: number, len: number): number;
read(handle: number, ptr: number, len: number, offset: number): number;
read_async(handle: number, ptr: number, len: number, offset: number, c: number);
write(handle: number, ptr: number, len: number, offset: number): number;
write_async(handle: number, ptr: number, len: number, offset: number, c: number);
sync(handle: number): number;
sync_async(handle: number, c: number);
truncate(handle: number, len: number): number;
truncate_async(handle: number, len: number, c: number);
size(handle: number): number;
}
function panic(name): never {
function panicMain(name): never {
throw new Error(`method ${name} must be invoked only from the worker thread`);
}
function panicWorker(name): never {
throw new Error(`method ${name} must be invoked only from the main thread`);
}
const MainDummyImports: BrowserImports = {
is_web_worker: function (): boolean {
return false;
},
lookup_file: function (ptr: number, len: number): number {
panic("lookup_file")
},
read: function (handle: number, ptr: number, len: number, offset: number): number {
panic("read")
},
write: function (handle: number, ptr: number, len: number, offset: number): number {
panic("write")
},
sync: function (handle: number): number {
panic("sync")
},
truncate: function (handle: number, len: number): number {
panic("truncate")
},
size: function (handle: number): number {
panic("size")
}
let completeOpfs: any = null;
function mainImports(worker: Worker): BrowserImports {
return {
is_web_worker(): boolean {
return false;
},
write_async(handle, ptr, len, offset, c) {
writeFileAtWorker(worker, handle, ptr, len, offset)
.then(result => {
completeOpfs(c, result);
}, err => {
console.error('write_async', err);
completeOpfs(c, -1);
});
},
sync_async(handle, c) {
syncFileAtWorker(worker, handle)
.then(result => {
completeOpfs(c, result);
}, err => {
console.error('sync_async', err);
completeOpfs(c, -1);
});
},
read_async(handle, ptr, len, offset, c) {
readFileAtWorker(worker, handle, ptr, len, offset)
.then(result => {
completeOpfs(c, result);
}, err => {
console.error('read_async', err);
completeOpfs(c, -1);
});
},
truncate_async(handle, len, c) {
truncateFileAtWorker(worker, handle, len)
.then(result => {
completeOpfs(c, result);
}, err => {
console.error('truncate_async', err);
completeOpfs(c, -1);
});
},
lookup_file(ptr, len): number {
panicMain("lookup_file")
},
read(handle, ptr, len, offset): number {
panicMain("read")
},
write(handle, ptr, len, offset): number {
panicMain("write")
},
sync(handle): number {
panicMain("sync")
},
truncate(handle, len): number {
panicMain("truncate")
},
size(handle): number {
panicMain("size")
}
};
};
function workerImports(opfs: OpfsDirectory, memory: WebAssembly.Memory): BrowserImports {
return {
is_web_worker: function (): boolean {
is_web_worker(): boolean {
return true;
},
lookup_file: function (ptr: number, len: number): number {
lookup_file(ptr, len): number {
try {
const handle = opfs.lookupFileHandle(getStringFromMemory(memory, ptr, len));
return handle == null ? -404 : handle;
@@ -71,29 +119,28 @@ function workerImports(opfs: OpfsDirectory, memory: WebAssembly.Memory): Browser
return -1;
}
},
read: function (handle: number, ptr: number, len: number, offset: number): number {
read(handle, ptr, len, offset): number {
try {
return opfs.read(handle, getUint8ArrayFromMemory(memory, ptr, len), offset);
} catch (e) {
return -1;
}
},
write: function (handle: number, ptr: number, len: number, offset: number): number {
write(handle, ptr, len, offset): number {
try {
return opfs.write(handle, getUint8ArrayFromMemory(memory, ptr, len), offset)
} catch (e) {
return -1;
}
},
sync: function (handle: number): number {
sync(handle): number {
try {
opfs.sync(handle);
return 0;
return opfs.sync(handle);
} catch (e) {
return -1;
}
},
truncate: function (handle: number, len: number): number {
truncate(handle, len): number {
try {
opfs.truncate(handle, len);
return 0;
@@ -101,13 +148,25 @@ function workerImports(opfs: OpfsDirectory, memory: WebAssembly.Memory): Browser
return -1;
}
},
size: function (handle: number): number {
size(handle): number {
try {
return opfs.size(handle);
} catch (e) {
return -1;
}
}
},
read_async(handle, ptr, len, offset, completion) {
panicWorker("read_async")
},
write_async(handle, ptr, len, offset, completion) {
panicWorker("write_async")
},
sync_async(handle, completion) {
panicWorker("sync_async")
},
truncate_async(handle, len, c) {
panicWorker("truncate_async")
},
}
}
@@ -175,10 +234,11 @@ class OpfsDirectory {
throw e;
}
}
sync(handle: number) {
sync(handle: number): number {
try {
const file = this.fileByHandle.get(handle);
file.flush();
return 0;
} catch (e) {
console.error('sync', handle, e);
throw e;
@@ -187,8 +247,8 @@ class OpfsDirectory {
truncate(handle: number, size: number) {
try {
const file = this.fileByHandle.get(handle);
const result = file.truncate(size);
return result;
file.truncate(size);
return 0;
} catch (e) {
console.error('truncate', handle, size, e);
throw e;
@@ -214,7 +274,7 @@ function waitForWorkerResponse(worker: Worker, id: number): Promise<any> {
if (msg.data.error != null) {
waitReject(msg.data.error)
} else {
waitResolve()
waitResolve(msg.data.result)
}
cleanup();
}
@@ -229,6 +289,38 @@ function waitForWorkerResponse(worker: Worker, id: number): Promise<any> {
return result;
}
function readFileAtWorker(worker: Worker, handle: number, ptr: number, len: number, offset: number) {
workerRequestId += 1;
const currentId = workerRequestId;
const promise = waitForWorkerResponse(worker, currentId);
worker.postMessage({ __turso__: "read_async", handle: handle, ptr: ptr, len: len, offset: offset, id: currentId });
return promise;
}
function writeFileAtWorker(worker: Worker, handle: number, ptr: number, len: number, offset: number) {
workerRequestId += 1;
const currentId = workerRequestId;
const promise = waitForWorkerResponse(worker, currentId);
worker.postMessage({ __turso__: "write_async", handle: handle, ptr: ptr, len: len, offset: offset, id: currentId });
return promise;
}
function syncFileAtWorker(worker: Worker, handle: number) {
workerRequestId += 1;
const currentId = workerRequestId;
const promise = waitForWorkerResponse(worker, currentId);
worker.postMessage({ __turso__: "sync_async", handle: handle, id: currentId });
return promise;
}
function truncateFileAtWorker(worker: Worker, handle: number, len: number) {
workerRequestId += 1;
const currentId = workerRequestId;
const promise = waitForWorkerResponse(worker, currentId);
worker.postMessage({ __turso__: "truncate_async", handle: handle, len: len, id: currentId });
return promise;
}
function registerFileAtWorker(worker: Worker, path: string): Promise<void> {
workerRequestId += 1;
const currentId = workerRequestId;
@@ -299,12 +391,25 @@ function setupWebWorker() {
self.postMessage({ id: e.data.id, error: error });
}
return;
} else if (e.data.__turso__ == 'read_async') {
let result = opfs.read(e.data.handle, getUint8ArrayFromMemory(memory, e.data.ptr, e.data.len), e.data.offset);
self.postMessage({ id: e.data.id, result: result });
} else if (e.data.__turso__ == 'write_async') {
let result = opfs.write(e.data.handle, getUint8ArrayFromMemory(memory, e.data.ptr, e.data.len), e.data.offset);
self.postMessage({ id: e.data.id, result: result });
} else if (e.data.__turso__ == 'sync_async') {
let result = opfs.sync(e.data.handle);
self.postMessage({ id: e.data.id, result: result });
} else if (e.data.__turso__ == 'truncate_async') {
let result = opfs.truncate(e.data.handle, e.data.len);
self.postMessage({ id: e.data.id, result: result });
}
handler.handle(e)
}
}
async function setupMainThread(wasmFile: ArrayBuffer, factory: () => Worker): Promise<any> {
const worker = factory();
const __emnapiContext = __emnapiGetDefaultContext()
const __wasi = new __WASI({
version: 'preview1',
@@ -322,13 +427,13 @@ async function setupMainThread(wasmFile: ArrayBuffer, factory: () => Worker): Pr
context: __emnapiContext,
asyncWorkPoolSize: 1,
wasi: __wasi,
onCreateWorker() { return factory() },
onCreateWorker() { return worker; },
overwriteImports(importObject) {
importObject.env = {
...importObject.env,
...importObject.napi,
...importObject.emnapi,
...MainDummyImports,
...mainImports(worker),
memory: __sharedMemory,
}
return importObject
@@ -340,8 +445,9 @@ async function setupMainThread(wasmFile: ArrayBuffer, factory: () => Worker): Pr
}
}
},
})
});
completeOpfs = __napiModule.exports.completeOpfs;
return __napiModule;
}
export { OpfsDirectory, workerImports, MainDummyImports, waitForWorkerResponse, registerFileAtWorker, unregisterFileAtWorker, isWebWorker, setupWebWorker, setupMainThread }
export { OpfsDirectory, workerImports, mainImports as MainDummyImports, waitForWorkerResponse, registerFileAtWorker, unregisterFileAtWorker, isWebWorker, setupWebWorker, setupMainThread }

View File

@@ -20,5 +20,5 @@ export const Database = napiModule.exports.Database
export const Opfs = napiModule.exports.Opfs
export const OpfsFile = napiModule.exports.OpfsFile
export const Statement = napiModule.exports.Statement
export const connect = napiModule.exports.connect
export const connectDbAsync = napiModule.exports.connectDbAsync
export const initThreadPool = napiModule.exports.initThreadPool

View File

@@ -18,5 +18,5 @@ export const Database = napiModule.exports.Database
export const Opfs = napiModule.exports.Opfs
export const OpfsFile = napiModule.exports.OpfsFile
export const Statement = napiModule.exports.Statement
export const connect = napiModule.exports.connect
export const connectDbAsync = napiModule.exports.connectDbAsync
export const initThreadPool = napiModule.exports.initThreadPool

View File

@@ -21,5 +21,5 @@ export const Database = napiModule.exports.Database
export const Opfs = napiModule.exports.Opfs
export const OpfsFile = napiModule.exports.OpfsFile
export const Statement = napiModule.exports.Statement
export const connect = napiModule.exports.connect
export const connectDbAsync = napiModule.exports.connectDbAsync
export const initThreadPool = napiModule.exports.initThreadPool

View File

@@ -7,7 +7,7 @@ let napiModule = {
Opfs: {} as any,
OpfsFile: {} as any,
Statement: {} as any,
connect: {} as any,
connectDbAsync: {} as any,
initThreadPool: {} as any,
}
};
@@ -37,5 +37,5 @@ export const Database = napiModule.exports.Database
export const Opfs = napiModule.exports.Opfs
export const OpfsFile = napiModule.exports.OpfsFile
export const Statement = napiModule.exports.Statement
export const connect = napiModule.exports.connect
export const connectDbAsync = napiModule.exports.connectDbAsync
export const initThreadPool = napiModule.exports.initThreadPool

View File

@@ -1,6 +1,6 @@
import { DatabaseOpts, SqliteError, } from "@tursodatabase/database-common"
import { connect as promiseConnect, Database } from "./promise.js";
import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-bundle.js";
import { Database, connect as promiseConnect } from "./promise.js";
import { initThreadPool, MainWorker, connectDbAsync } from "./index-bundle.js";
/**
* Creates a new database connection asynchronously.
@@ -10,13 +10,19 @@ import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-bu
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
*/
async function connect(path: string, opts: DatabaseOpts = {}): Promise<Database> {
return await promiseConnect(path, opts, nativeConnect, async () => {
const init = async () => {
await initThreadPool();
if (MainWorker == null) {
throw new Error("panic: MainWorker is not initialized");
}
return MainWorker;
});
};
return await promiseConnect(
path,
opts,
connectDbAsync,
init
);
}
export { connect, Database, SqliteError }

View File

@@ -1,6 +1,6 @@
import { DatabaseOpts, SqliteError, } from "@tursodatabase/database-common"
import { connect as promiseConnect, Database } from "./promise.js";
import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-default.js";
import { Database, connect as promiseConnect } from "./promise.js";
import { initThreadPool, MainWorker, connectDbAsync } from "./index-default.js";
/**
* Creates a new database connection asynchronously.
@@ -10,13 +10,19 @@ import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-de
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
*/
async function connect(path: string, opts: DatabaseOpts = {}): Promise<Database> {
return await promiseConnect(path, opts, nativeConnect, async () => {
const init = async () => {
await initThreadPool();
if (MainWorker == null) {
throw new Error("panic: MainWorker is not initialized");
}
return MainWorker;
});
};
return await promiseConnect(
path,
opts,
connectDbAsync,
init
);
}
export { connect, Database, SqliteError }

View File

@@ -1,6 +1,6 @@
import { DatabaseOpts, SqliteError, } from "@tursodatabase/database-common"
import { connect as promiseConnect, Database } from "./promise.js";
import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-turbopack-hack.js";
import { Database, connect as promiseConnect } from "./promise.js";
import { initThreadPool, MainWorker, connectDbAsync } from "./index-turbopack-hack.js";
/**
* Creates a new database connection asynchronously.
@@ -10,13 +10,19 @@ import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-tu
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
*/
async function connect(path: string, opts: DatabaseOpts = {}): Promise<Database> {
return await promiseConnect(path, opts, nativeConnect, async () => {
const init = async () => {
await initThreadPool();
if (MainWorker == null) {
throw new Error("panic: MainWorker is not initialized");
}
return MainWorker;
});
};
return await promiseConnect(
path,
opts,
connectDbAsync,
init
);
}
export { connect, Database, SqliteError }

View File

@@ -1,6 +1,6 @@
import { DatabaseOpts, SqliteError, } from "@tursodatabase/database-common"
import { connect as promiseConnect, Database } from "./promise.js";
import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-vite-dev-hack.js";
import { Database, connect as promiseConnect } from "./promise.js";
import { initThreadPool, MainWorker, connectDbAsync } from "./index-vite-dev-hack.js";
/**
* Creates a new database connection asynchronously.
@@ -10,13 +10,19 @@ import { connect as nativeConnect, initThreadPool, MainWorker } from "./index-vi
* @returns {Promise<Database>} - A promise that resolves to a Database instance.
*/
async function connect(path: string, opts: DatabaseOpts = {}): Promise<Database> {
return await promiseConnect(path, opts, nativeConnect, async () => {
const init = async () => {
await initThreadPool();
if (MainWorker == null) {
throw new Error("panic: MainWorker is not initialized");
}
return MainWorker;
});
};
return await promiseConnect(
path,
opts,
connectDbAsync,
init
);
}
export { connect, Database, SqliteError }

View File

@@ -1,4 +1,4 @@
import { expect, test, afterEach } from 'vitest'
import { expect, test } from 'vitest'
import { connect } from './promise-default.js'
test('in-memory db', async () => {
@@ -10,6 +10,28 @@ test('in-memory db', async () => {
expect(rows).toEqual([{ x: 1 }, { x: 3 }]);
})
test('on-disk db large inserts', async () => {
const path = `test-${(Math.random() * 10000) | 0}.db`;
const db1 = await connect(path);
await db1.prepare("CREATE TABLE t(x)").run();
await db1.prepare("INSERT INTO t VALUES (randomblob(10 * 4096 + 0))").run();
await db1.prepare("INSERT INTO t VALUES (randomblob(10 * 4096 + 1))").run();
await db1.prepare("INSERT INTO t VALUES (randomblob(10 * 4096 + 2))").run();
const stmt1 = db1.prepare("SELECT length(x) as l FROM t");
expect(stmt1.columns()).toEqual([{ name: "l", column: null, database: null, table: null, type: null }]);
const rows1 = await stmt1.all();
expect(rows1).toEqual([{ l: 10 * 4096 }, { l: 10 * 4096 + 1 }, { l: 10 * 4096 + 2 }]);
await db1.exec("BEGIN");
await db1.exec("INSERT INTO t VALUES (1)");
await db1.exec("ROLLBACK");
const rows2 = await db1.prepare("SELECT length(x) as l FROM t").all();
expect(rows2).toEqual([{ l: 10 * 4096 }, { l: 10 * 4096 + 1 }, { l: 10 * 4096 + 2 }]);
await db1.prepare("PRAGMA wal_checkpoint(TRUNCATE)").run();
})
test('on-disk db', async () => {
const path = `test-${(Math.random() * 10000) | 0}.db`;
const db1 = await connect(path);
@@ -19,8 +41,8 @@ test('on-disk db', async () => {
expect(stmt1.columns()).toEqual([{ name: "x", column: null, database: null, table: null, type: null }]);
const rows1 = await stmt1.all([1]);
expect(rows1).toEqual([{ x: 1 }, { x: 3 }]);
await db1.close();
stmt1.close();
await db1.close();
const db2 = await connect(path);
const stmt2 = db2.prepare("SELECT * FROM t WHERE x % 2 = ?");
@@ -30,23 +52,24 @@ test('on-disk db', async () => {
db2.close();
})
test('attach', async () => {
const path1 = `test-${(Math.random() * 10000) | 0}.db`;
const path2 = `test-${(Math.random() * 10000) | 0}.db`;
const db1 = await connect(path1);
await db1.exec("CREATE TABLE t(x)");
await db1.exec("INSERT INTO t VALUES (1), (2), (3)");
const db2 = await connect(path2);
await db2.exec("CREATE TABLE q(x)");
await db2.exec("INSERT INTO q VALUES (4), (5), (6)");
// attach is not supported in browser for now
// test('attach', async () => {
// const path1 = `test-${(Math.random() * 10000) | 0}.db`;
// const path2 = `test-${(Math.random() * 10000) | 0}.db`;
// const db1 = await connect(path1);
// await db1.exec("CREATE TABLE t(x)");
// await db1.exec("INSERT INTO t VALUES (1), (2), (3)");
// const db2 = await connect(path2);
// await db2.exec("CREATE TABLE q(x)");
// await db2.exec("INSERT INTO q VALUES (4), (5), (6)");
await db1.exec(`ATTACH '${path2}' as secondary`);
// await db1.exec(`ATTACH '${path2}' as secondary`);
const stmt = db1.prepare("SELECT * FROM t UNION ALL SELECT * FROM secondary.q");
expect(stmt.columns()).toEqual([{ name: "x", column: null, database: null, table: null, type: null }]);
const rows = await stmt.all([1]);
expect(rows).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }, { x: 4 }, { x: 5 }, { x: 6 }]);
})
// const stmt = db1.prepare("SELECT * FROM t UNION ALL SELECT * FROM secondary.q");
// expect(stmt.columns()).toEqual([{ name: "x", column: null, database: null, table: null, type: null }]);
// const rows = await stmt.all([1]);
// expect(rows).toEqual([{ x: 1 }, { x: 2 }, { x: 3 }, { x: 4 }, { x: 5 }, { x: 6 }]);
// })
test('blobs', async () => {
const db = await connect(":memory:");

View File

@@ -0,0 +1,29 @@
export class AsyncLock {
locked: boolean;
queue: any[];
constructor() {
this.locked = false;
this.queue = []
}
async acquire() {
if (!this.locked) {
this.locked = true;
return Promise.resolve();
} else {
const block = new Promise(resolve => { this.queue.push(resolve) });
return block;
}
}
release() {
if (this.locked == false) {
throw new Error("invalid state: lock was already unlocked");
}
const item = this.queue.shift();
if (item != null) {
this.locked = true;
item();
} else {
this.locked = false;
}
}
}

View File

@@ -192,7 +192,12 @@ class Database {
}
try {
this.db.batchSync(sql);
let stmt = this.prepare(sql);
try {
stmt.run();
} finally {
stmt.close();
}
} catch (err) {
throw convertError(err);
}
@@ -408,6 +413,10 @@ class Statement {
throw convertError(err);
}
}
close() {
this.stmt.finalize();
}
}
export { Database, Statement }

View File

@@ -2,5 +2,6 @@ import { NativeDatabase, NativeStatement, DatabaseOpts } from "./types.js";
import { Database as DatabaseCompat, Statement as StatementCompat } from "./compat.js";
import { Database as DatabasePromise, Statement as StatementPromise } from "./promise.js";
import { SqliteError } from "./sqlite-error.js";
import { AsyncLock } from "./async-lock.js";
export { DatabaseCompat, StatementCompat, DatabasePromise, StatementPromise, NativeDatabase, NativeStatement, SqliteError, DatabaseOpts }
export { DatabaseCompat, StatementCompat, DatabasePromise, StatementPromise, NativeDatabase, NativeStatement, SqliteError, DatabaseOpts, AsyncLock }

View File

@@ -1,3 +1,4 @@
import { AsyncLock } from "./async-lock.js";
import { bindParams } from "./bind.js";
import { SqliteError } from "./sqlite-error.js";
import { NativeDatabase, NativeStatement, STEP_IO, STEP_ROW, STEP_DONE, DatabaseOpts } from "./types.js";
@@ -32,6 +33,7 @@ class Database {
db: NativeDatabase;
memory: boolean;
open: boolean;
execLock: AsyncLock;
private _inTransaction: boolean = false;
/**
* Creates a new database connection. If the database file pointed to by `path` does not exists, it will be created.
@@ -57,6 +59,7 @@ class Database {
initialize(db: NativeDatabase, name, readonly) {
this.db = db;
this.memory = db.memory;
this.execLock = new AsyncLock();
Object.defineProperties(this, {
inTransaction: {
get: () => this._inTransaction,
@@ -195,10 +198,11 @@ class Database {
throw new TypeError("The database connection is not open");
}
const stmt = this.prepare(sql);
try {
await this.db.batchAsync(sql);
} catch (err) {
throw convertError(err);
await stmt.run();
} finally {
stmt.close();
}
}
@@ -297,25 +301,30 @@ class Statement {
this.stmt.reset();
bindParams(this.stmt, bindParameters);
while (true) {
const stepResult = await this.stmt.stepAsync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
}
if (stepResult === STEP_DONE) {
break;
}
if (stepResult === STEP_ROW) {
// For run(), we don't need the row data, just continue
continue;
await this.db.execLock.acquire();
try {
while (true) {
const stepResult = await this.stmt.stepSync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
}
if (stepResult === STEP_DONE) {
break;
}
if (stepResult === STEP_ROW) {
// For run(), we don't need the row data, just continue
continue;
}
}
const lastInsertRowid = this.db.db.lastInsertRowid();
const changes = this.db.db.totalChanges() === totalChangesBefore ? 0 : this.db.db.changes();
return { changes, lastInsertRowid };
} finally {
this.db.execLock.release();
}
const lastInsertRowid = this.db.db.lastInsertRowid();
const changes = this.db.db.totalChanges() === totalChangesBefore ? 0 : this.db.db.changes();
return { changes, lastInsertRowid };
}
/**
@@ -327,18 +336,23 @@ class Statement {
this.stmt.reset();
bindParams(this.stmt, bindParameters);
while (true) {
const stepResult = await this.stmt.stepAsync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
}
if (stepResult === STEP_DONE) {
return undefined;
}
if (stepResult === STEP_ROW) {
return this.stmt.row();
await this.db.execLock.acquire();
try {
while (true) {
const stepResult = await this.stmt.stepSync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
}
if (stepResult === STEP_DONE) {
return undefined;
}
if (stepResult === STEP_ROW) {
return this.stmt.row();
}
}
} finally {
this.db.execLock.release();
}
}
@@ -351,18 +365,23 @@ class Statement {
this.stmt.reset();
bindParams(this.stmt, bindParameters);
while (true) {
const stepResult = await this.stmt.stepAsync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
}
if (stepResult === STEP_DONE) {
break;
}
if (stepResult === STEP_ROW) {
yield this.stmt.row();
await this.db.execLock.acquire();
try {
while (true) {
const stepResult = await this.stmt.stepSync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
}
if (stepResult === STEP_DONE) {
break;
}
if (stepResult === STEP_ROW) {
yield this.stmt.row();
}
}
} finally {
this.db.execLock.release();
}
}
@@ -376,20 +395,26 @@ class Statement {
bindParams(this.stmt, bindParameters);
const rows: any[] = [];
while (true) {
const stepResult = await this.stmt.stepAsync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
}
if (stepResult === STEP_DONE) {
break;
}
if (stepResult === STEP_ROW) {
rows.push(this.stmt.row());
await this.db.execLock.acquire();
try {
while (true) {
const stepResult = await this.stmt.stepSync();
if (stepResult === STEP_IO) {
await this.db.db.ioLoopAsync();
continue;
}
if (stepResult === STEP_DONE) {
break;
}
if (stepResult === STEP_ROW) {
rows.push(this.stmt.row());
}
}
return rows;
}
finally {
this.db.execLock.release();
}
return rows;
}
/**

View File

@@ -15,26 +15,6 @@ export declare class Database {
get path(): string
/** Returns whether the database connection is open. */
get open(): boolean
/**
* Executes a batch of SQL statements on main thread
*
* # Arguments
*
* * `sql` - The SQL statements to execute.
*
* # Returns
*/
batchSync(sql: string): void
/**
* Executes a batch of SQL statements outside of main thread
*
* # Arguments
*
* * `sql` - The SQL statements to execute.
*
* # Returns
*/
batchAsync(sql: string): Promise<void>
/**
* Prepares a statement for execution.
*

View File

@@ -22,6 +22,9 @@
"devDependencies": {
"@napi-rs/cli": "^3.1.5",
"@types/node": "^24.3.1",
"better-sqlite3": "^12.2.0",
"drizzle-kit": "^0.31.4",
"drizzle-orm": "^0.44.5",
"typescript": "^5.9.2",
"vitest": "^3.2.4"
},

View File

@@ -1,8 +1,28 @@
import { unlinkSync } from "node:fs";
import { expect, test } from 'vitest'
import { connect } from './promise.js'
import { sql } from 'drizzle-orm';
import { drizzle } from 'drizzle-orm/better-sqlite3';
test('in-memory db', async () => {
test('drizzle-orm', async () => {
const path = `test-${(Math.random() * 10000) | 0}.db`;
try {
const conn = await connect(path);
const db = drizzle(conn);
await db.run('CREATE TABLE t(x, y)');
let tasks = [];
for (let i = 0; i < 1234; i++) {
tasks.push(db.run(sql`INSERT INTO t VALUES (${i}, randomblob(${i} * 5))`))
}
await Promise.all(tasks);
expect(await db.all("SELECT COUNT(*) as cnt FROM t")).toEqual([{ cnt: 1234 }])
} finally {
unlinkSync(path);
unlinkSync(`${path}-wal`);
}
})
test('in-memory-db-async', async () => {
const db = await connect(":memory:");
await db.exec("CREATE TABLE t(x)");
await db.exec("INSERT INTO t VALUES (1), (2), (3)");

View File

@@ -20,10 +20,10 @@
},
"../packages/native": {
"name": "@tursodatabase/database",
"version": "0.1.5-pre.3",
"version": "0.2.0-pre.3",
"license": "MIT",
"dependencies": {
"@tursodatabase/database-common": "^0.1.5-pre.3"
"@tursodatabase/database-common": "^0.2.0-pre.3"
},
"devDependencies": {
"@napi-rs/cli": "^3.1.5",

View File

@@ -1,26 +1,26 @@
import { run, bench, group, baseline } from 'mitata';
import { Database } from '@tursodatabase/database/compat';
import { Database } from '@tursodatabase/database';
const db = new Database(':memory:');
db.exec("CREATE TABLE users (id INTEGER, name TEXT, email TEXT)");
db.exec("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.org')");
await db.exec("CREATE TABLE users (id INTEGER, name TEXT, email TEXT)");
await db.exec("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.org')");
const stmtSelect = db.prepare("SELECT * FROM users WHERE id = ?");
const rawStmtSelect = db.prepare("SELECT * FROM users WHERE id = ?").raw();
const stmtInsert = db.prepare("INSERT INTO users (id, name, email) VALUES (?, ?, ?)");
bench('Statement.get() with bind parameters [expanded]', () => {
stmtSelect.get(1);
bench('Statement.get() with bind parameters [expanded]', async () => {
await stmtSelect.get(1);
});
bench('Statement.get() with bind parameters [raw]', () => {
rawStmtSelect.get(1);
bench('Statement.get() with bind parameters [raw]', async () => {
await rawStmtSelect.get(1);
});
bench('Statement.run() with bind parameters', () => {
stmtInsert.run([1, 'foobar', 'foobar@example.com']);
bench('Statement.run() with bind parameters', async () => {
await stmtInsert.run([1, 'foobar', 'foobar@example.com']);
});
await run({

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use std::{cell::RefCell, collections::HashMap, sync::Arc};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use turso_core::{storage::database::DatabaseFile, Clock, File, Instant, IO};
use turso_core::{Clock, Completion, File, Instant, MemoryIO, IO};
use crate::{init_tracing, is_memory, Database, DatabaseOpts};
use crate::{is_memory, Database, DatabaseOpts};
pub struct NoopTask;
@@ -29,11 +29,11 @@ pub fn init_thread_pool() -> napi::Result<AsyncTask<NoopTask>> {
pub struct ConnectTask {
path: String,
io: Arc<dyn turso_core::IO>,
opts: Option<DatabaseOpts>,
}
pub struct ConnectResult {
db: Arc<turso_core::Database>,
conn: Arc<turso_core::Connection>,
db: Database,
}
unsafe impl Send for ConnectResult {}
@@ -43,79 +43,108 @@ impl Task for ConnectTask {
type JsValue = Database;
fn compute(&mut self) -> Result<Self::Output> {
let file = self
.io
.open_file(&self.path, turso_core::OpenFlags::Create, false)
.map_err(|e| Error::new(Status::GenericFailure, format!("Failed to open file: {e}")))?;
let db_file = Arc::new(DatabaseFile::new(file));
let db = turso_core::Database::open(self.io.clone(), &self.path, db_file, false, true)
.map_err(|e| {
Error::new(
Status::GenericFailure,
format!("Failed to open database: {e}"),
)
})?;
let conn = db
.connect()
.map_err(|e| Error::new(Status::GenericFailure, format!("Failed to connect: {e}")))?;
Ok(ConnectResult { db, conn })
let db = Database::new_io(self.path.clone(), self.io.clone(), self.opts.clone())?;
Ok(ConnectResult { db })
}
fn resolve(&mut self, _: Env, result: Self::Output) -> Result<Self::JsValue> {
Ok(Database::create(
Some(result.db),
self.io.clone(),
result.conn,
self.path.clone(),
))
Ok(result.db)
}
}
#[napi]
// we offload connect to the web-worker because:
// 1. browser main-thread do not support Atomic.wait operations
// 2. turso-db use blocking IO [io.wait_for_completion(c)] in few places during initialization path
//
// so, we offload connect to the worker thread
pub fn connect(path: String, opts: Option<DatabaseOpts>) -> Result<AsyncTask<ConnectTask>> {
if let Some(opts) = opts {
init_tracing(opts.tracing);
}
let task = if is_memory(&path) {
ConnectTask {
io: Arc::new(turso_core::MemoryIO::new()),
path,
}
} else {
let io = Arc::new(Opfs::new()?);
ConnectTask { io, path }
};
Ok(AsyncTask::new(task))
}
#[napi]
#[derive(Clone)]
pub struct Opfs;
pub struct Opfs {
inner: Arc<OpfsInner>,
}
pub struct OpfsInner {
completion_no: RefCell<u32>,
completions: RefCell<HashMap<u32, Completion>>,
}
thread_local! {
static OPFS: Arc<Opfs> = Arc::new(Opfs::default());
}
#[napi]
#[derive(Clone)]
struct OpfsFile {
handle: i32,
opfs: Opfs,
}
unsafe impl Send for Opfs {}
unsafe impl Sync for Opfs {}
#[napi]
// we offload connect to the web-worker because
// turso-db use blocking IO [io.wait_for_completion(c)] in few places during initialization path
pub fn connect_db_async(
path: String,
opts: Option<DatabaseOpts>,
) -> Result<AsyncTask<ConnectTask>> {
let io: Arc<dyn turso_core::IO> = if is_memory(&path) {
Arc::new(MemoryIO::new())
} else {
// we must create OPFS IO on the main thread
opfs()
};
let task = ConnectTask { path, io, opts };
Ok(AsyncTask::new(task))
}
#[napi]
pub fn complete_opfs(completion_no: u32, result: i32) {
OPFS.with(|opfs| opfs.complete(completion_no, result))
}
pub fn opfs() -> Arc<Opfs> {
OPFS.with(|opfs| opfs.clone())
}
impl Opfs {
#[napi(constructor)]
pub fn new() -> napi::Result<Self> {
Ok(Self)
pub fn complete(&self, completion_no: u32, result: i32) {
let completion = {
let mut completions = self.inner.completions.borrow_mut();
completions.remove(&completion_no).unwrap()
};
completion.complete(result);
}
fn register_completion(&self, c: Completion) -> u32 {
let inner = &self.inner;
*inner.completion_no.borrow_mut() += 1;
let completion_no = *inner.completion_no.borrow();
tracing::debug!(
"register completion: {} {:?}",
completion_no,
Arc::as_ptr(inner)
);
inner.completions.borrow_mut().insert(completion_no, c);
completion_no
}
}
impl Clock for Opfs {
fn now(&self) -> Instant {
Instant { secs: 0, micros: 0 } // TODO
let now = chrono::Local::now();
Instant {
secs: now.timestamp(),
micros: now.timestamp_subsec_micros(),
}
}
}
impl Default for Opfs {
fn default() -> Self {
Self {
#[allow(clippy::arc_with_non_send_sync)]
inner: Arc::new(OpfsInner {
completion_no: RefCell::new(0),
completions: RefCell::new(HashMap::new()),
}),
}
}
}
@@ -127,6 +156,13 @@ extern "C" {
fn sync(handle: i32) -> i32;
fn truncate(handle: i32, length: usize) -> i32;
fn size(handle: i32) -> i32;
fn write_async(handle: i32, buffer: *const u8, buffer_len: usize, offset: i32, c: u32);
fn sync_async(handle: i32, c: u32);
fn read_async(handle: i32, buffer: *mut u8, buffer_len: usize, offset: i32, c: u32);
fn truncate_async(handle: i32, length: usize, c: u32);
// fn size_async(handle: i32) -> i32;
fn is_web_worker() -> bool;
}
@@ -144,7 +180,12 @@ impl IO for Opfs {
tracing::info!("open_file: {}", path);
let result = unsafe { lookup_file(path.as_ptr(), path.len()) };
if result >= 0 {
Ok(Arc::new(OpfsFile { handle: result }))
Ok(Arc::new(OpfsFile {
handle: result,
opfs: Opfs {
inner: self.inner.clone(),
},
}))
} else if result == -404 {
Err(turso_core::LimboError::InternalError(format!(
"unexpected path {path}: files must be created in advance for OPFS IO"
@@ -175,17 +216,32 @@ impl File for OpfsFile {
pos: u64,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
assert!(
is_web_worker_safe(),
"opfs must be used only from web worker for now"
let web_worker = is_web_worker_safe();
tracing::debug!(
"pread({}, is_web_worker={}): pos={}",
self.handle,
web_worker,
pos
);
tracing::debug!("pread({}): pos={}", self.handle, pos);
let handle = self.handle;
let read_c = c.as_read();
let buffer = read_c.buf_arc();
let buffer = buffer.as_mut_slice();
let result = unsafe { read(handle, buffer.as_mut_ptr(), buffer.len(), pos as i32) };
c.complete(result as i32);
if web_worker {
let result = unsafe { read(handle, buffer.as_mut_ptr(), buffer.len(), pos as i32) };
c.complete(result as i32);
} else {
let completion_no = self.opfs.register_completion(c.clone());
unsafe {
read_async(
handle,
buffer.as_mut_ptr(),
buffer.len(),
pos as i32,
completion_no,
)
};
}
Ok(c)
}
@@ -195,27 +251,44 @@ impl File for OpfsFile {
buffer: Arc<turso_core::Buffer>,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
assert!(
is_web_worker_safe(),
"opfs must be used only from web worker for now"
let web_worker = is_web_worker_safe();
tracing::debug!(
"pwrite({}, is_web_worker={}): pos={}",
self.handle,
web_worker,
pos
);
tracing::debug!("pwrite({}): pos={}", self.handle, pos);
let handle = self.handle;
let buffer = buffer.as_slice();
let result = unsafe { write(handle, buffer.as_ptr(), buffer.len(), pos as i32) };
c.complete(result as i32);
if web_worker {
let result = unsafe { write(handle, buffer.as_ptr(), buffer.len(), pos as i32) };
c.complete(result as i32);
} else {
let completion_no = self.opfs.register_completion(c.clone());
unsafe {
write_async(
handle,
buffer.as_ptr(),
buffer.len(),
pos as i32,
completion_no,
)
};
}
Ok(c)
}
fn sync(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
assert!(
is_web_worker_safe(),
"opfs must be used only from web worker for now"
);
tracing::debug!("sync({})", self.handle);
let web_worker = is_web_worker_safe();
tracing::debug!("sync({}, is_web_worker={})", self.handle, web_worker);
let handle = self.handle;
let result = unsafe { sync(handle) };
c.complete(result as i32);
if web_worker {
let result = unsafe { sync(handle) };
c.complete(result as i32);
} else {
let completion_no = self.opfs.register_completion(c.clone());
unsafe { sync_async(handle, completion_no) };
}
Ok(c)
}
@@ -224,14 +297,21 @@ impl File for OpfsFile {
len: u64,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
assert!(
is_web_worker_safe(),
"opfs must be used only from web worker for now"
let web_worker = is_web_worker_safe();
tracing::debug!(
"truncate({}, is_web_worker={}): len={}",
self.handle,
web_worker,
len
);
tracing::debug!("truncate({}): len={}", self.handle, len);
let handle = self.handle;
let result = unsafe { truncate(handle, len as usize) };
c.complete(result as i32);
if web_worker {
let result = unsafe { truncate(handle, len as usize) };
c.complete(result as i32);
} else {
let completion_no = self.opfs.register_completion(c.clone());
unsafe { truncate_async(handle, len as usize, completion_no) };
}
Ok(c)
}

View File

@@ -60,6 +60,8 @@ pub(crate) fn init_tracing(level_filter: Option<String>) {
return;
};
let level_filter = match level_filter.as_ref() {
"error" => LevelFilter::ERROR,
"warn" => LevelFilter::WARN,
"info" => LevelFilter::INFO,
"debug" => LevelFilter::DEBUG,
"trace" => LevelFilter::TRACE,
@@ -76,10 +78,6 @@ pub(crate) fn init_tracing(level_filter: Option<String>) {
}
pub enum DbTask {
Batch {
conn: Arc<turso_core::Connection>,
sql: String,
},
Step {
stmt: Arc<RefCell<Option<turso_core::Statement>>>,
},
@@ -93,10 +91,6 @@ impl Task for DbTask {
fn compute(&mut self) -> Result<Self::Output> {
match self {
DbTask::Batch { conn, sql } => {
batch_sync(conn, sql)?;
Ok(0)
}
DbTask::Step { stmt } => step_sync(stmt),
}
}
@@ -107,20 +101,11 @@ impl Task for DbTask {
}
#[napi(object)]
#[derive(Clone)]
pub struct DatabaseOpts {
pub tracing: Option<String>,
}
fn batch_sync(conn: &Arc<turso_core::Connection>, sql: &str) -> napi::Result<()> {
conn.prepare_execute_batch(sql).map_err(|e| {
Error::new(
Status::GenericFailure,
format!("Failed to execute batch: {e}"),
)
})?;
Ok(())
}
fn step_sync(stmt: &Arc<RefCell<Option<turso_core::Statement>>>) -> napi::Result<u32> {
let mut stmt_ref = stmt.borrow_mut();
let stmt = stmt_ref
@@ -152,21 +137,38 @@ impl Database {
/// # Arguments
/// * `path` - The path to the database file.
#[napi(constructor)]
pub fn new_napi(path: String, opts: Option<DatabaseOpts>) -> Result<Self> {
Self::new(path, opts)
}
pub fn new(path: String, opts: Option<DatabaseOpts>) -> Result<Self> {
if let Some(opts) = opts {
init_tracing(opts.tracing);
}
let io: Arc<dyn turso_core::IO> = if is_memory(&path) {
Arc::new(turso_core::MemoryIO::new())
} else {
Arc::new(turso_core::PlatformIO::new().map_err(|e| {
Error::new(Status::GenericFailure, format!("Failed to create IO: {e}"))
})?)
#[cfg(not(feature = "browser"))]
{
Arc::new(turso_core::PlatformIO::new().map_err(|e| {
Error::new(Status::GenericFailure, format!("Failed to create IO: {e}"))
})?)
}
#[cfg(feature = "browser")]
{
return Err(napi::Error::new(
napi::Status::GenericFailure,
"FS-backed db must be initialized through connectDbAsync function in the browser",
));
}
};
Self::new_io(path, io, opts)
}
#[cfg(feature = "browser")]
if !is_memory(&path) {
return Err(Error::new(Status::GenericFailure, "sync constructor is not supported for FS-backed databases in the WASM. Use async connect(...) method instead".to_string()));
pub fn new_io(
path: String,
io: Arc<dyn turso_core::IO>,
opts: Option<DatabaseOpts>,
) -> Result<Self> {
if let Some(opts) = opts {
init_tracing(opts.tracing);
}
let file = io
@@ -233,33 +235,6 @@ impl Database {
self.is_open.get()
}
/// Executes a batch of SQL statements on main thread
///
/// # Arguments
///
/// * `sql` - The SQL statements to execute.
///
/// # Returns
#[napi]
pub fn batch_sync(&self, sql: String) -> Result<()> {
batch_sync(&self.conn()?, &sql)
}
/// Executes a batch of SQL statements outside of main thread
///
/// # Arguments
///
/// * `sql` - The SQL statements to execute.
///
/// # Returns
#[napi(ts_return_type = "Promise<void>")]
pub fn batch_async(&self, sql: String) -> Result<AsyncTask<DbTask>> {
Ok(AsyncTask::new(DbTask::Batch {
conn: self.conn()?.clone(),
sql,
}))
}
/// Prepares a statement for execution.
///
/// # Arguments
@@ -325,8 +300,8 @@ impl Database {
#[napi]
pub fn close(&mut self) -> Result<()> {
self.is_open.set(false);
let _ = self._db.take();
let _ = self.conn.take().unwrap();
let _ = self._db.take();
Ok(())
}
@@ -621,7 +596,12 @@ impl Statement {
/// Finalizes the statement.
#[napi]
pub fn finalize(&self) -> Result<()> {
self.stmt.borrow_mut().take();
match self.stmt.try_borrow_mut() {
Ok(mut stmt) => {
stmt.take();
}
Err(err) => tracing::error!("borrow error: {:?}", err),
}
Ok(())
}
}

View File

@@ -10,7 +10,6 @@ repository.workspace = true
crate-type = ["cdylib"]
[dependencies]
http = "1.3.1"
napi = { version = "3.1.3", default-features = false, features = ["napi6"] }
napi-derive = { version = "3.1.1", default-features = true }
turso_sync_engine = { workspace = true }
@@ -18,9 +17,10 @@ turso_core = { workspace = true }
turso_node = { workspace = true }
genawaiter = { version = "0.99.1", default-features = false }
tracing-subscriber = { workspace = true }
tracing.workspace = true
[build-dependencies]
napi-build = "2.2.3"
[features]
browser = ["turso_node/browser"]
browser = ["turso_node/browser"]

View File

@@ -42,7 +42,7 @@
"tsc-build": "npm exec tsc && cp sync.wasm32-wasi.wasm ./dist/sync.wasm32-wasi.wasm && WASM_FILE=sync.wasm32-wasi.wasm JS_FILE=./dist/wasm-inline.js node ../../../scripts/inline-wasm-base64.js && npm run bundle",
"bundle": "vite build",
"build": "npm run napi-build && npm run tsc-build",
"test": "VITE_TURSO_DB_URL=http://b--a--a.localhost:10000 CI=1 vitest --browser=chromium --run && VITE_TURSO_DB_URL=http://b--a--a.localhost:10000 CI=1 vitest --browser=firefox --run"
"test": "VITE_TURSO_DB_URL=http://f--a--a.localhost:10000 CI=1 vitest --testTimeout 30000 --browser=chromium --run && VITE_TURSO_DB_URL=http://f--a--a.localhost:10000 CI=1 vitest --testTimeout 30000 --browser=firefox --run"
},
"napi": {
"binaryName": "sync",

View File

@@ -260,6 +260,130 @@ test('persistence-pull-push', async () => {
expect(rows2.sort(localeCompare)).toEqual(expected.sort(localeCompare))
})
test('pull-push-concurrent', async () => {
{
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL, longPollTimeoutMs: 5000 });
await db.exec("CREATE TABLE IF NOT EXISTS q(x TEXT PRIMARY KEY, y)");
await db.exec("DELETE FROM q");
await db.push();
await db.close();
}
let pullResolve = null;
const pullFinish = new Promise(resolve => pullResolve = resolve);
let pushResolve = null;
const pushFinish = new Promise(resolve => pushResolve = resolve);
let stopPull = false;
let stopPush = false;
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
let pull = async () => {
try {
await db.pull();
} catch (e) {
console.error('pull', e);
} finally {
if (!stopPull) {
setTimeout(pull, 0);
} else {
pullResolve()
}
}
}
let push = async () => {
try {
if ((await db.stats()).operations > 0) {
await db.push();
}
} catch (e) {
console.error('push', e);
} finally {
if (!stopPush) {
setTimeout(push, 0);
} else {
pushResolve();
}
}
}
setTimeout(pull, 0);
setTimeout(push, 0);
for (let i = 0; i < 1000; i++) {
await db.exec(`INSERT INTO q VALUES ('k${i}', 'v${i}')`);
}
await new Promise(resolve => setTimeout(resolve, 1000));
stopPush = true;
await pushFinish;
stopPull = true;
await pullFinish;
console.info(await db.stats());
})
test('concurrent-updates', { timeout: 60000 }, async () => {
{
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL, longPollTimeoutMs: 10 });
await db.exec("CREATE TABLE IF NOT EXISTS three(x TEXT PRIMARY KEY, y, z)");
await db.exec("DELETE FROM three");
await db.push();
await db.close();
}
let stop = false;
const dbs = [];
for (let i = 0; i < 8; i++) {
dbs.push(await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL }));
}
async function pull(db, i) {
try {
console.info('pull', i);
await db.pull();
} catch (e) {
console.error('pull', i, e);
} finally {
if (!stop) {
setTimeout(async () => await pull(db, i), 0);
}
}
}
async function push(db, i) {
try {
console.info('push', i);
await db.push();
} catch (e) {
console.error('push', i, e);
} finally {
if (!stop) {
setTimeout(async () => await push(db, i), 0);
}
}
}
for (let i = 0; i < dbs.length; i++) {
setTimeout(async () => await pull(dbs[i], i), 0)
setTimeout(async () => await push(dbs[i], i), 0)
}
for (let i = 0; i < 1000; i++) {
try {
const tasks = [];
for (let s = 0; s < dbs.length; s++) {
tasks.push(dbs[s].exec(`INSERT INTO three VALUES ('${s}', 0, randomblob(128)) ON CONFLICT DO UPDATE SET y = y + 1, z = randomblob(128)`));
}
await Promise.all(tasks);
} catch (e) {
// ignore
}
await new Promise(resolve => setTimeout(resolve, 1));
}
stop = true;
await Promise.all(dbs.map(db => db.push()));
await Promise.all(dbs.map(db => db.pull()));
let results = [];
for (let i = 0; i < dbs.length; i++) {
results.push(await dbs[i].prepare('SELECT x, y FROM three').all());
}
for (let i = 0; i < dbs.length; i++) {
expect(results[i]).toEqual(results[0]);
for (let s = 0; s < dbs.length; s++) {
expect(results[i][s].y).toBeGreaterThan(500);
}
}
})
test('transform', async () => {
{
const db = await connect({

View File

@@ -1,6 +1,6 @@
import { registerFileAtWorker, unregisterFileAtWorker } from "@tursodatabase/database-browser-common"
import { DatabasePromise, DatabaseOpts, NativeDatabase } from "@tursodatabase/database-common"
import { ProtocolIo, run, SyncOpts, RunOpts, memoryIO, SyncEngineStats } from "@tursodatabase/sync-common";
import { ProtocolIo, run, SyncOpts, RunOpts, memoryIO, SyncEngineStats, SyncEngineGuards } from "@tursodatabase/sync-common";
let BrowserIo: ProtocolIo = {
async read(path: string): Promise<Buffer | Uint8Array | null> {
@@ -24,6 +24,7 @@ class Database extends DatabasePromise {
io: ProtocolIo;
worker: Worker | null;
fsPath: string | null;
guards: SyncEngineGuards;
constructor(db: NativeDatabase, io: ProtocolIo, worker: Worker | null, runOpts: RunOpts, engine: any, fsPath: string | null, opts: DatabaseOpts = {}) {
super(db, opts)
this.io = io;
@@ -31,18 +32,21 @@ class Database extends DatabasePromise {
this.runOpts = runOpts;
this.engine = engine;
this.fsPath = fsPath;
this.guards = new SyncEngineGuards();
}
async sync() {
await run(this.runOpts, this.io, this.engine, this.engine.sync());
await this.push();
await this.pull();
}
async pull() {
await run(this.runOpts, this.io, this.engine, this.engine.pull());
const changes = await this.guards.wait(async () => await run(this.runOpts, this.io, this.engine, this.engine.wait()));
await this.guards.apply(async () => await run(this.runOpts, this.io, this.engine, this.engine.apply(changes)));
}
async push() {
await run(this.runOpts, this.io, this.engine, this.engine.push());
await this.guards.push(async () => await run(this.runOpts, this.io, this.engine, this.engine.push()));
}
async checkpoint() {
await run(this.runOpts, this.io, this.engine, this.engine.checkpoint());
await this.guards.checkpoint(async () => await run(this.runOpts, this.io, this.engine, this.engine.checkpoint()));
}
async stats(): Promise<SyncEngineStats> {
return (await run(this.runOpts, this.io, this.engine, this.engine.stats()));
@@ -76,7 +80,8 @@ async function connect(opts: SyncOpts, connect: (any) => any, init: () => Promis
tablesIgnore: opts.tablesIgnore,
useTransform: opts.transform != null,
tracing: opts.tracing,
protocolVersion: 1
protocolVersion: 1,
longPollTimeoutMs: opts.longPollTimeoutMs
});
const runOpts: RunOpts = {
url: opts.url,

View File

@@ -1,5 +1,5 @@
import { run, memoryIO } from "./run.js"
import { run, memoryIO, SyncEngineGuards } from "./run.js"
import { SyncOpts, ProtocolIo, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, SyncEngineStats } from "./types.js"
export { run, memoryIO, }
export { run, memoryIO, SyncEngineGuards }
export type { SyncOpts, ProtocolIo, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, SyncEngineStats }

View File

@@ -21,5 +21,8 @@
"tsc-build": "npm exec tsc",
"build": "npm run tsc-build",
"test": "echo 'no tests'"
},
"dependencies": {
"@tursodatabase/database-common": "^0.2.0-pre.3"
}
}

View File

@@ -1,6 +1,7 @@
"use strict";
import { GeneratorResponse, ProtocolIo, RunOpts } from "./types.js";
import { AsyncLock } from "@tursodatabase/database-common";
const GENERATOR_RESUME_IO = 0;
const GENERATOR_RESUME_DONE = 1;
@@ -114,6 +115,10 @@ export async function run(opts: RunOpts, io: ProtocolIo, engine: any, generator:
if (type == 'SyncEngineStats') {
return rest;
}
if (type == 'SyncEngineChanges') {
//@ts-ignore
return rest.changes;
}
for (let request = engine.protocolIo(); request != null; request = engine.protocolIo()) {
tasks.push(trackPromise(process(opts, io, request)));
}
@@ -124,4 +129,67 @@ export async function run(opts: RunOpts, io: ProtocolIo, engine: any, generator:
tasks = tasks.filter(t => !t.finished);
}
return generator.take();
}
export class SyncEngineGuards {
waitLock: AsyncLock;
pushLock: AsyncLock;
pullLock: AsyncLock;
checkpointLock: AsyncLock;
constructor() {
this.waitLock = new AsyncLock();
this.pushLock = new AsyncLock();
this.pullLock = new AsyncLock();
this.checkpointLock = new AsyncLock();
}
async wait(f: () => Promise<any>): Promise<any> {
try {
await this.waitLock.acquire();
return await f();
} finally {
this.waitLock.release();
}
}
async push(f: () => Promise<void>) {
try {
await this.pushLock.acquire();
await this.pullLock.acquire();
await this.checkpointLock.acquire();
return await f();
} finally {
this.pushLock.release();
this.pullLock.release();
this.checkpointLock.release();
}
}
async apply(f: () => Promise<void>) {
try {
await this.waitLock.acquire();
await this.pushLock.acquire();
await this.pullLock.acquire();
await this.checkpointLock.acquire();
return await f();
} finally {
this.waitLock.release();
this.pushLock.release();
this.pullLock.release();
this.checkpointLock.release();
}
}
async checkpoint(f: () => Promise<void>) {
try {
await this.waitLock.acquire();
await this.pushLock.acquire();
await this.pullLock.acquire();
await this.checkpointLock.acquire();
return await f();
} finally {
this.waitLock.release();
this.pushLock.release();
this.pullLock.release();
this.checkpointLock.release();
}
}
}

View File

@@ -3,8 +3,9 @@
"skipLibCheck": true,
"declaration": true,
"declarationMap": true,
"module": "esnext",
"module": "nodenext",
"target": "esnext",
"moduleResolution": "nodenext",
"outDir": "dist/",
"lib": [
"es2020",

View File

@@ -36,6 +36,7 @@ export interface SyncOpts {
encryptionKey?: string;
tablesIgnore?: string[],
transform?: Transform,
longPollTimeoutMs?: number,
tracing?: string,
}
@@ -53,4 +54,4 @@ export interface SyncEngineStats {
revision: string | null;
}
export type GeneratorResponse = { type: 'IO' } | { type: 'Done' } | ({ type: 'SyncEngineStats' } & SyncEngineStats)
export type GeneratorResponse = { type: 'IO' } | { type: 'Done' } | ({ type: 'SyncEngineStats' } & SyncEngineStats) | { type: 'SyncEngineChanges', changes: any }

View File

@@ -15,26 +15,6 @@ export declare class Database {
get path(): string
/** Returns whether the database connection is open. */
get open(): boolean
/**
* Executes a batch of SQL statements on main thread
*
* # Arguments
*
* * `sql` - The SQL statements to execute.
*
* # Returns
*/
batchSync(sql: string): void
/**
* Executes a batch of SQL statements outside of main thread
*
* # Arguments
*
* * `sql` - The SQL statements to execute.
*
* # Returns
*/
batchAsync(sql: string): Promise<void>
/**
* Prepares a statement for execution.
*
@@ -178,15 +158,19 @@ export declare class SyncEngine {
/** Runs the I/O loop asynchronously, returning a Promise. */
ioLoopAsync(): Promise<void>
protocolIo(): JsProtocolRequestBytes | null
sync(): GeneratorHolder
push(): GeneratorHolder
stats(): GeneratorHolder
pull(): GeneratorHolder
wait(): GeneratorHolder
apply(changes: SyncEngineChanges): GeneratorHolder
checkpoint(): GeneratorHolder
open(): Database
close(): void
}
export declare class SyncEngineChanges {
}
export declare const enum DatabaseChangeTypeJs {
Insert = 0,
Update = 1,
@@ -220,7 +204,8 @@ export type DatabaseRowTransformResultJs =
export type GeneratorResponse =
| { type: 'IO' }
| { type: 'Done' }
| { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime: number, lastPushUnixTime?: number }
| { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime: number, lastPushUnixTime?: number, revision?: string }
| { type: 'SyncEngineChanges', changes: SyncEngineChanges }
export type JsProtocolRequest =
| { type: 'Http', method: string, path: string, body?: Array<number>, headers: Array<[string, string]> }
@@ -232,6 +217,7 @@ export interface SyncEngineOpts {
path: string
clientName?: string
walPullBatchSize?: number
longPollTimeoutMs?: number
tracing?: string
tablesIgnore?: Array<string>
useTransform: boolean

View File

@@ -508,7 +508,7 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}
const { Database, Statement, GeneratorHolder, JsDataCompletion, JsProtocolIo, JsProtocolRequestBytes, SyncEngine, DatabaseChangeTypeJs, SyncEngineProtocolVersion } = nativeBinding
const { Database, Statement, GeneratorHolder, JsDataCompletion, JsProtocolIo, JsProtocolRequestBytes, SyncEngine, SyncEngineChanges, DatabaseChangeTypeJs, SyncEngineProtocolVersion } = nativeBinding
export { Database }
export { Statement }
export { GeneratorHolder }
@@ -516,5 +516,6 @@ export { JsDataCompletion }
export { JsProtocolIo }
export { JsProtocolRequestBytes }
export { SyncEngine }
export { SyncEngineChanges }
export { DatabaseChangeTypeJs }
export { SyncEngineProtocolVersion }

View File

@@ -31,7 +31,7 @@
"napi-artifacts": "napi artifacts --output-dir .",
"tsc-build": "npm exec tsc",
"build": "npm run napi-build && npm run tsc-build",
"test": "VITE_TURSO_DB_URL=http://b--a--a.localhost:10000 vitest --run",
"test": "VITE_TURSO_DB_URL=http://d--a--a.localhost:10000 vitest --run",
"prepublishOnly": "npm run napi-dirs && npm run napi-artifacts && napi prepublish -t npm"
},
"napi": {

View File

@@ -160,7 +160,7 @@ test('checkpoint', async () => {
await db1.checkpoint();
expect((await db1.stats()).mainWal).toBe(0);
let revertWal = (await db1.stats()).revertWal;
expect(revertWal).toBeLessThan(4096 * 1000 / 100);
expect(revertWal).toBeLessThan(4096 * 1000 / 50);
for (let i = 0; i < 1000; i++) {
await db1.exec(`UPDATE q SET y = 'u${i}' WHERE x = 'k${i}'`);
@@ -284,6 +284,119 @@ test('persistence-pull-push', async () => {
}
})
test('update', async () => {
{
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL, longPollTimeoutMs: 5000 });
await db.exec("CREATE TABLE IF NOT EXISTS q(x TEXT PRIMARY KEY, y)");
await db.exec("DELETE FROM q");
await db.push();
await db.close();
}
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
await db.exec("INSERT INTO q VALUES ('1', '2')")
await db.push();
await db.exec("INSERT INTO q VALUES ('1', '2') ON CONFLICT DO UPDATE SET y = '3'")
await db.push();
})
test('concurrent-updates', async () => {
{
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL, longPollTimeoutMs: 5000 });
await db.exec("CREATE TABLE IF NOT EXISTS q(x TEXT PRIMARY KEY, y)");
await db.exec("DELETE FROM q");
await db.push();
await db.close();
}
const db1 = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
async function pull(db) {
try {
await db.pull();
} catch (e) {
// ignore
} finally {
setTimeout(async () => await pull(db), 0);
}
}
async function push(db) {
try {
await db.push();
} catch (e) {
// ignore
} finally {
setTimeout(async () => await push(db), 0);
}
}
setTimeout(async () => await pull(db1), 0)
setTimeout(async () => await push(db1), 0)
for (let i = 0; i < 1000; i++) {
try {
await Promise.all([
db1.exec(`INSERT INTO q VALUES ('1', 0) ON CONFLICT DO UPDATE SET y = ${i + 1}`),
db1.exec(`INSERT INTO q VALUES ('2', 0) ON CONFLICT DO UPDATE SET y = ${i + 1}`)
]);
} catch (e) {
// ignore
}
await new Promise(resolve => setTimeout(resolve, 1));
}
})
test('pull-push-concurrent', async () => {
{
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL, longPollTimeoutMs: 5000 });
await db.exec("CREATE TABLE IF NOT EXISTS q(x TEXT PRIMARY KEY, y)");
await db.exec("DELETE FROM q");
await db.push();
await db.close();
}
let pullResolve = null;
const pullFinish = new Promise(resolve => pullResolve = resolve);
let pushResolve = null;
const pushFinish = new Promise(resolve => pushResolve = resolve);
let stopPull = false;
let stopPush = false;
const db = await connect({ path: ':memory:', url: process.env.VITE_TURSO_DB_URL });
let pull = async () => {
try {
await db.pull();
} catch (e) {
console.error('pull', e);
} finally {
if (!stopPull) {
setTimeout(pull, 0);
} else {
pullResolve()
}
}
}
let push = async () => {
try {
if ((await db.stats()).operations > 0) {
await db.push();
}
} catch (e) {
console.error('push', e);
} finally {
if (!stopPush) {
setTimeout(push, 0);
} else {
pushResolve();
}
}
}
setTimeout(pull, 0);
setTimeout(push, 0);
for (let i = 0; i < 1000; i++) {
await db.exec(`INSERT INTO q VALUES ('k${i}', 'v${i}')`);
}
await new Promise(resolve => setTimeout(resolve, 1000));
stopPush = true;
await pushFinish;
stopPull = true;
await pullFinish;
console.info(await db.stats());
})
test('transform', async () => {
{
const db = await connect({

View File

@@ -1,5 +1,5 @@
import { DatabasePromise, DatabaseOpts, NativeDatabase } from "@tursodatabase/database-common"
import { ProtocolIo, run, SyncOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, SyncEngineStats } from "@tursodatabase/sync-common";
import { ProtocolIo, run, SyncOpts, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, SyncEngineStats, SyncEngineGuards } from "@tursodatabase/sync-common";
import { Database as NativeDB, SyncEngine } from "#index";
import { promises } from "node:fs";
@@ -43,23 +43,27 @@ class Database extends DatabasePromise {
runOpts: RunOpts;
engine: any;
io: ProtocolIo;
guards: SyncEngineGuards
constructor(db: NativeDatabase, io: ProtocolIo, runOpts: RunOpts, engine: any, opts: DatabaseOpts = {}) {
super(db, opts)
this.runOpts = runOpts;
this.engine = engine;
this.io = io;
this.guards = new SyncEngineGuards();
}
async sync() {
await run(this.runOpts, this.io, this.engine, this.engine.sync());
await this.push();
await this.pull();
}
async pull() {
await run(this.runOpts, this.io, this.engine, this.engine.pull());
const changes = await this.guards.wait(async () => await run(this.runOpts, this.io, this.engine, this.engine.wait()));
await this.guards.apply(async () => await run(this.runOpts, this.io, this.engine, this.engine.apply(changes)));
}
async push() {
await run(this.runOpts, this.io, this.engine, this.engine.push());
await this.guards.push(async () => await run(this.runOpts, this.io, this.engine, this.engine.push()));
}
async checkpoint() {
await run(this.runOpts, this.io, this.engine, this.engine.checkpoint());
await this.guards.checkpoint(async () => await run(this.runOpts, this.io, this.engine, this.engine.checkpoint()));
}
async stats(): Promise<SyncEngineStats> {
return (await run(this.runOpts, this.io, this.engine, this.engine.stats()));
@@ -83,7 +87,8 @@ async function connect(opts: SyncOpts): Promise<Database> {
tablesIgnore: opts.tablesIgnore,
useTransform: opts.transform != null,
tracing: opts.tracing,
protocolVersion: 1
protocolVersion: 1,
longPollTimeoutMs: opts.longPollTimeoutMs,
});
const runOpts: RunOpts = {
url: opts.url,

View File

@@ -5,7 +5,7 @@ use std::{
sync::{Arc, Mutex},
};
use turso_sync_engine::types::ProtocolCommand;
use turso_sync_engine::types::{DbChangesStatus, ProtocolCommand};
pub const GENERATOR_RESUME_IO: u32 = 0;
pub const GENERATOR_RESUME_DONE: u32 = 1;
@@ -35,7 +35,12 @@ impl<F: Future<Output = turso_sync_engine::Result<()>>> Generator
}
}
#[napi(discriminant = "type")]
#[napi]
pub struct SyncEngineChanges {
pub(crate) status: Box<Option<DbChangesStatus>>,
}
#[napi(discriminant = "type", object_from_js = false)]
pub enum GeneratorResponse {
IO,
Done,
@@ -47,6 +52,9 @@ pub enum GeneratorResponse {
last_push_unix_time: Option<i64>,
revision: Option<String>,
},
SyncEngineChanges {
changes: SyncEngineChanges,
},
}
#[napi]

View File

@@ -6,7 +6,7 @@ pub mod js_protocol_io;
use std::{
collections::HashMap,
sync::{Arc, Mutex, OnceLock, RwLock, RwLockReadGuard, RwLockWriteGuard},
sync::{Arc, Mutex, OnceLock, RwLock, RwLockReadGuard},
};
use napi::bindgen_prelude::{AsyncTask, Either5, Null};
@@ -19,7 +19,7 @@ use turso_sync_engine::{
};
use crate::{
generator::{GeneratorHolder, GeneratorResponse},
generator::{GeneratorHolder, GeneratorResponse, SyncEngineChanges},
js_protocol_io::{JsProtocolIo, JsProtocolRequestBytes},
};
@@ -33,6 +33,7 @@ pub struct SyncEngine {
path: String,
client_name: String,
wal_pull_batch_size: u32,
long_poll_timeout: Option<std::time::Duration>,
protocol_version: DatabaseSyncEngineProtocolVersion,
tables_ignore: Vec<String>,
use_transform: bool,
@@ -123,6 +124,7 @@ pub struct SyncEngineOpts {
pub path: String,
pub client_name: Option<String>,
pub wal_pull_batch_size: Option<u32>,
pub long_poll_timeout_ms: Option<u32>,
pub tracing: Option<String>,
pub tables_ignore: Option<Vec<String>>,
pub use_transform: bool,
@@ -147,6 +149,8 @@ impl SyncEngine {
pub fn new(opts: SyncEngineOpts) -> napi::Result<Self> {
// helpful for local debugging
match opts.tracing.as_deref() {
Some("error") => init_tracing(LevelFilter::ERROR),
Some("warn") => init_tracing(LevelFilter::WARN),
Some("info") => init_tracing(LevelFilter::INFO),
Some("debug") => init_tracing(LevelFilter::DEBUG),
Some("trace") => init_tracing(LevelFilter::TRACE),
@@ -167,13 +171,16 @@ impl SyncEngine {
}
#[cfg(feature = "browser")]
{
Arc::new(turso_node::browser::Opfs::new()?)
turso_node::browser::opfs()
}
};
Ok(SyncEngine {
path: opts.path,
client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100),
long_poll_timeout: opts
.long_poll_timeout_ms
.map(|x| std::time::Duration::from_millis(x as u64)),
tables_ignore: opts.tables_ignore.unwrap_or_default(),
use_transform: opts.use_transform,
#[allow(clippy::arc_with_non_send_sync)]
@@ -196,6 +203,7 @@ impl SyncEngine {
let opts = DatabaseSyncEngineOpts {
client_name: self.client_name.clone(),
wal_pull_batch_size: self.wal_pull_batch_size as u64,
long_poll_timeout: self.long_poll_timeout,
tables_ignore: self.tables_ignore.clone(),
use_transform: self.use_transform,
protocol_version_hint: self.protocol_version,
@@ -244,20 +252,10 @@ impl SyncEngine {
Ok(self.protocol()?.take_request())
}
#[napi]
pub fn sync(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| {
let mut sync_engine = try_write(sync_engine)?;
let sync_engine = try_unwrap_mut(&mut sync_engine)?;
sync_engine.sync(coro).await?;
Ok(None)
})
}
#[napi]
pub fn push(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| {
let sync_engine = try_read(sync_engine)?;
self.run(async move |coro, guard| {
let sync_engine = try_read(guard)?;
let sync_engine = try_unwrap(&sync_engine)?;
sync_engine.push_changes_to_remote(coro).await?;
Ok(None)
@@ -266,8 +264,8 @@ impl SyncEngine {
#[napi]
pub fn stats(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| {
let sync_engine = try_read(sync_engine)?;
self.run(async move |coro, guard| {
let sync_engine = try_read(guard)?;
let sync_engine = try_unwrap(&sync_engine)?;
let stats = sync_engine.stats(coro).await?;
Ok(Some(GeneratorResponse::SyncEngineStats {
@@ -282,20 +280,34 @@ impl SyncEngine {
}
#[napi]
pub fn pull(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| {
let mut sync_engine = try_write(sync_engine)?;
let sync_engine = try_unwrap_mut(&mut sync_engine)?;
sync_engine.pull_changes_from_remote(coro).await?;
pub fn wait(&self) -> GeneratorHolder {
self.run(async move |coro, guard| {
let sync_engine = try_read(guard)?;
let sync_engine = try_unwrap(&sync_engine)?;
Ok(Some(GeneratorResponse::SyncEngineChanges {
changes: SyncEngineChanges {
status: Box::new(Some(sync_engine.wait_changes_from_remote(coro).await?)),
},
}))
})
}
#[napi]
pub fn apply(&self, changes: &mut SyncEngineChanges) -> GeneratorHolder {
let status = changes.status.take().unwrap();
self.run(async move |coro, guard| {
let sync_engine = try_read(guard)?;
let sync_engine = try_unwrap(&sync_engine)?;
sync_engine.apply_changes_from_remote(coro, status).await?;
Ok(None)
})
}
#[napi]
pub fn checkpoint(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| {
let mut sync_engine = try_write(sync_engine)?;
let sync_engine = try_unwrap_mut(&mut sync_engine)?;
self.run(async move |coro, guard| {
let sync_engine = try_read(guard)?;
let sync_engine = try_unwrap(&sync_engine)?;
sync_engine.checkpoint(coro).await?;
Ok(None)
})
@@ -378,18 +390,6 @@ fn try_read(
Ok(sync_engine)
}
fn try_write(
sync_engine: &RwLock<Option<DatabaseSyncEngine<JsProtocolIo>>>,
) -> turso_sync_engine::Result<RwLockWriteGuard<'_, Option<DatabaseSyncEngine<JsProtocolIo>>>> {
let Ok(sync_engine) = sync_engine.try_write() else {
let nasty_error = "sync_engine is busy".to_string();
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
nasty_error,
));
};
Ok(sync_engine)
}
fn try_unwrap<'a>(
sync_engine: &'a RwLockReadGuard<'_, Option<DatabaseSyncEngine<JsProtocolIo>>>,
) -> turso_sync_engine::Result<&'a DatabaseSyncEngine<JsProtocolIo>> {
@@ -401,15 +401,3 @@ fn try_unwrap<'a>(
};
Ok(sync_engine)
}
fn try_unwrap_mut<'a>(
sync_engine: &'a mut RwLockWriteGuard<'_, Option<DatabaseSyncEngine<JsProtocolIo>>>,
) -> turso_sync_engine::Result<&'a mut DatabaseSyncEngine<JsProtocolIo>> {
let Some(sync_engine) = sync_engine.as_mut() else {
let error = "sync_engine must be initialized".to_string();
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
error,
));
};
Ok(sync_engine)
}

File diff suppressed because it is too large Load Diff

View File

@@ -1639,11 +1639,41 @@ impl WalFile {
}
fn get_shared_mut(&self) -> parking_lot::RwLockWriteGuard<'_, WalFileShared> {
self.shared.write()
// WASM in browser main thread doesn't have a way to "park" a thread
// so, we spin way here instead of calling blocking lock
#[cfg(target_family = "wasm")]
{
loop {
let Some(lock) = self.shared.try_write() else {
std::hint::spin_loop();
continue;
};
return lock;
}
}
#[cfg(not(target_family = "wasm"))]
{
self.shared.write()
}
}
fn get_shared(&self) -> parking_lot::RwLockReadGuard<'_, WalFileShared> {
self.shared.read()
// WASM in browser main thread doesn't have a way to "park" a thread
// so, we spin way here instead of calling blocking lock
#[cfg(target_family = "wasm")]
{
loop {
let Some(lock) = self.shared.try_read() else {
std::hint::spin_loop();
continue;
};
return lock;
}
}
#[cfg(not(target_family = "wasm"))]
{
self.shared.read()
}
}
fn complete_append_frame(&mut self, page_id: u64, frame_id: u64, checksums: (u32, u32)) {

View File

@@ -264,13 +264,12 @@ impl DatabaseReplayGenerator {
let update = self.update_query(coro, table_name, &columns).await?;
Ok(update)
} else {
let columns = [true].repeat(after.len());
let update = self.update_query(coro, table_name, &columns).await?;
Ok(update)
let upsert = self.upsert_query(coro, table_name, after.len()).await?;
Ok(upsert)
}
}
DatabaseTapeRowChangeType::Insert { after } => {
let insert = self.insert_query(coro, table_name, after.len()).await?;
let insert = self.upsert_query(coro, table_name, after.len()).await?;
Ok(insert)
}
}
@@ -320,7 +319,7 @@ impl DatabaseReplayGenerator {
is_ddl_replay: false,
})
}
pub(crate) async fn insert_query<Ctx>(
pub(crate) async fn upsert_query<Ctx>(
&self,
coro: &Coro<Ctx>,
table_name: &str,

View File

@@ -1,5 +1,4 @@
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
};
@@ -37,6 +36,7 @@ pub struct DatabaseSyncEngineOpts {
pub tables_ignore: Vec<String>,
pub use_transform: bool,
pub wal_pull_batch_size: u64,
pub long_poll_timeout: Option<std::time::Duration>,
pub protocol_version_hint: DatabaseSyncEngineProtocolVersion,
}
@@ -51,7 +51,7 @@ pub struct DatabaseSyncEngine<P: ProtocolIO> {
meta_path: String,
changes_file: Arc<Mutex<Option<Arc<dyn turso_core::File>>>>,
opts: DatabaseSyncEngineOpts,
meta: RefCell<DatabaseMetadata>,
meta: Mutex<DatabaseMetadata>,
client_unique_id: String,
}
@@ -147,7 +147,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
tracing::info!("initialize database tape connection: path={}", main_db_path);
let main_tape = DatabaseTape::new_with_opts(main_db, tape_opts);
let changes_file = io.open_file(&changes_path, OpenFlags::Create, false)?;
let mut db = Self {
let db = Self {
io,
protocol,
db_file,
@@ -158,7 +158,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
meta_path: format!("{main_db_path}-info"),
changes_file: Arc::new(Mutex::new(Some(changes_file))),
opts,
meta: RefCell::new(meta.clone()),
meta: Mutex::new(meta.clone()),
client_unique_id: meta.client_unique_id.clone(),
};
@@ -176,7 +176,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
Ok(db)
}
fn open_revert_db_conn(&mut self) -> Result<Arc<turso_core::Connection>> {
fn open_revert_db_conn(&self) -> Result<Arc<turso_core::Connection>> {
let db = turso_core::Database::open_with_flags_bypass_registry(
self.io.clone(),
&self.main_db_path,
@@ -191,10 +191,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
Ok(conn)
}
async fn checkpoint_passive<Ctx>(
&mut self,
coro: &Coro<Ctx>,
) -> Result<(Option<Vec<u32>>, u64)> {
async fn checkpoint_passive<Ctx>(&self, coro: &Coro<Ctx>) -> Result<(Option<Vec<u32>>, u64)> {
let watermark = self.meta().revert_since_wal_watermark;
tracing::info!(
"checkpoint(path={:?}): revert_since_wal_watermark={}",
@@ -273,9 +270,13 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
})
}
pub async fn checkpoint<Ctx>(&mut self, coro: &Coro<Ctx>) -> Result<()> {
pub async fn checkpoint<Ctx>(&self, coro: &Coro<Ctx>) -> Result<()> {
let (main_wal_salt, watermark) = self.checkpoint_passive(coro).await?;
tracing::info!(
"checkpoint(path={:?}): passive checkpoint is done",
self.main_db_path
);
let main_conn = connect_untracked(&self.main_tape)?;
let revert_conn = self.open_revert_db_conn()?;
@@ -386,6 +387,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
&file.value,
&revision,
self.opts.wal_pull_batch_size,
self.opts.long_poll_timeout,
)
.await?;
@@ -419,10 +421,17 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
/// This method will **not** send local changed to the remote
/// This method will block writes for the period of pull
pub async fn apply_changes_from_remote<Ctx>(
&mut self,
&self,
coro: &Coro<Ctx>,
remote_changes: DbChangesStatus,
) -> Result<()> {
if remote_changes.file_slot.is_none() {
self.update_meta(coro, |m| {
m.last_pull_unix_time = remote_changes.time.secs;
})
.await?;
return Ok(());
}
assert!(remote_changes.file_slot.is_some(), "file_slot must be set");
let changes_file = remote_changes.file_slot.as_ref().unwrap().value.clone();
let pull_result = self.apply_changes_internal(coro, &changes_file).await;
@@ -447,7 +456,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
Ok(())
}
async fn apply_changes_internal<Ctx>(
&mut self,
&self,
coro: &Coro<Ctx>,
changes_file: &Arc<dyn turso_core::File>,
) -> Result<u64> {
@@ -652,7 +661,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
/// Sync local changes to remote DB and bring new changes from remote to local
/// This method will block writes for the period of sync
pub async fn sync<Ctx>(&mut self, coro: &Coro<Ctx>) -> Result<()> {
pub async fn sync<Ctx>(&self, coro: &Coro<Ctx>) -> Result<()> {
// todo(sivukhin): this is bit suboptimal as both 'push' and 'pull' will call pull_synced_from_remote
// but for now - keep it simple
self.push_changes_to_remote(coro).await?;
@@ -660,21 +669,14 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
Ok(())
}
pub async fn pull_changes_from_remote<Ctx>(&mut self, coro: &Coro<Ctx>) -> Result<()> {
pub async fn pull_changes_from_remote<Ctx>(&self, coro: &Coro<Ctx>) -> Result<()> {
let changes = self.wait_changes_from_remote(coro).await?;
if changes.file_slot.is_some() {
self.apply_changes_from_remote(coro, changes).await?;
} else {
self.update_meta(coro, |m| {
m.last_pull_unix_time = changes.time.secs;
})
.await?;
}
self.apply_changes_from_remote(coro, changes).await?;
Ok(())
}
fn meta(&self) -> std::cell::Ref<'_, DatabaseMetadata> {
self.meta.borrow()
fn meta(&self) -> std::sync::MutexGuard<'_, DatabaseMetadata> {
self.meta.lock().unwrap()
}
async fn update_meta<Ctx>(
@@ -688,7 +690,7 @@ impl<P: ProtocolIO> DatabaseSyncEngine<P> {
let completion = self.protocol.full_write(&self.meta_path, meta.dump()?)?;
// todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated
wait_all_results(coro, &completion).await?;
self.meta.replace(meta);
*self.meta.lock().unwrap() = meta;
Ok(())
}
}

View File

@@ -166,6 +166,7 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
frames_file: &Arc<dyn turso_core::File>,
revision: &DatabasePullRevision,
wal_pull_batch_size: u64,
long_poll_timeout: Option<std::time::Duration>,
) -> Result<DatabasePullRevision> {
// truncate file before pulling new data
let c = Completion::new_trunc(move |result| {
@@ -195,7 +196,7 @@ pub async fn wal_pull_to_file<C: ProtocolIO, Ctx>(
.await
}
DatabasePullRevision::V1 { revision } => {
wal_pull_to_file_v1(coro, client, frames_file, revision).await
wal_pull_to_file_v1(coro, client, frames_file, revision, long_poll_timeout).await
}
}
}
@@ -206,6 +207,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
client: &C,
frames_file: &Arc<dyn turso_core::File>,
revision: &str,
long_poll_timeout: Option<std::time::Duration>,
) -> Result<DatabasePullRevision> {
tracing::info!("wal_pull: revision={revision}");
let mut bytes = BytesMut::new();
@@ -214,7 +216,7 @@ pub async fn wal_pull_to_file_v1<C: ProtocolIO, Ctx>(
encoding: PageUpdatesEncodingReq::Raw as i32,
server_revision: String::new(),
client_revision: revision.to_string(),
long_poll_timeout_ms: 0,
long_poll_timeout_ms: long_poll_timeout.map(|x| x.as_millis() as u32).unwrap_or(0),
server_pages: BytesMut::new().into(),
client_pages: BytesMut::new().into(),
};
@@ -805,12 +807,11 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
}),
DatabaseTapeOperation::RowChange(change) => {
let replay_info = generator.replay_info(coro, &change).await?;
let change_type = (&change.change).into();
match change.change {
DatabaseTapeRowChangeType::Delete { before } => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
before,
None,
@@ -827,7 +828,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
DatabaseTapeRowChangeType::Insert { after } => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
after,
None,
@@ -848,7 +849,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
} => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
after,
Some(updates),
@@ -869,7 +870,7 @@ pub async fn push_logical_changes<C: ProtocolIO, Ctx>(
} => {
let values = generator.replay_values(
&replay_info,
change_type,
replay_info.change_type,
change.id,
after,
None,
@@ -1359,7 +1360,7 @@ pub async fn wait_proto_message<Ctx, T: prost::Message + Default>(
Error::DatabaseSyncEngineError(format!("unable to deserialize protobuf message: {e}"))
})?;
let _ = bytes.split_to(message_length + prefix_length);
tracing::debug!(
tracing::trace!(
"wait_proto_message: elapsed={:?}",
std::time::Instant::now().duration_since(start_time)
);

View File

@@ -10,7 +10,7 @@ use crate::{
database_sync_operations::WAL_FRAME_HEADER,
errors::Error,
types::{
Coro, DatabaseChange, DatabaseTapeOperation, DatabaseTapeRowChange,
Coro, DatabaseChange, DatabaseChangeType, DatabaseTapeOperation, DatabaseTapeRowChange,
DatabaseTapeRowChangeType, ProtocolCommand,
},
wal_session::WalSession,
@@ -584,7 +584,7 @@ impl DatabaseReplaySession {
cached.stmt.reset();
let values = self.generator.replay_values(
&cached.info,
change_type,
DatabaseChangeType::Delete,
change.id,
before,
None,
@@ -600,7 +600,7 @@ impl DatabaseReplaySession {
cached.stmt.reset();
let values = self.generator.replay_values(
&cached.info,
change_type,
DatabaseChangeType::Insert,
change.id,
after,
None,
@@ -643,7 +643,7 @@ impl DatabaseReplaySession {
table,
columns
);
let info = self.generator.insert_query(coro, table, columns).await?;
let info = self.generator.upsert_query(coro, table, columns).await?;
let stmt = self.conn.prepare(&info.query)?;
self.cached_insert_stmt
.insert(key.clone(), CachedStmt { stmt, info });