mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-03 08:24:19 +01:00
remove turso-sync as now we have turso-sync-engine
This commit is contained in:
392
Cargo.lock
generated
392
Cargo.lock
generated
@@ -225,29 +225,6 @@ version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
|
||||
|
||||
[[package]]
|
||||
name = "aws-lc-rs"
|
||||
version = "1.13.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c953fe1ba023e6b7730c0d4b031d06f267f23a46167dcbd40316644b10a17ba"
|
||||
dependencies = [
|
||||
"aws-lc-sys",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-lc-sys"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dbfd150b5dbdb988bcc8fb1fe787eb6b7ee6180ca24da683b61ea5405f3d43ff"
|
||||
dependencies = [
|
||||
"bindgen",
|
||||
"cc",
|
||||
"cmake",
|
||||
"dunce",
|
||||
"fs_extra",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.74"
|
||||
@@ -287,29 +264,6 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.69.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088"
|
||||
dependencies = [
|
||||
"bitflags 2.9.0",
|
||||
"cexpr",
|
||||
"clang-sys",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"lazycell",
|
||||
"log",
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
"rustc-hash 1.1.0",
|
||||
"shlex",
|
||||
"syn 2.0.100",
|
||||
"which",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "1.3.2"
|
||||
@@ -435,15 +389,6 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c"
|
||||
|
||||
[[package]]
|
||||
name = "cexpr"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
|
||||
dependencies = [
|
||||
"nom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
version = "1.0.0"
|
||||
@@ -504,17 +449,6 @@ dependencies = [
|
||||
"half",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clang-sys"
|
||||
version = "1.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
|
||||
dependencies = [
|
||||
"glob",
|
||||
"libc",
|
||||
"libloading",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.5.32"
|
||||
@@ -576,15 +510,6 @@ dependencies = [
|
||||
"error-code",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cmake"
|
||||
version = "0.1.54"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.3"
|
||||
@@ -646,16 +571,6 @@ dependencies = [
|
||||
"unicode-segmentation",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6"
|
||||
dependencies = [
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation-sys"
|
||||
version = "0.8.7"
|
||||
@@ -1046,12 +961,6 @@ version = "0.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7454e41ff9012c00d53cf7f475c5e3afa3b91b7c90568495495e8d9bf47a1055"
|
||||
|
||||
[[package]]
|
||||
name = "dunce"
|
||||
version = "1.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"
|
||||
|
||||
[[package]]
|
||||
name = "dyn-clone"
|
||||
version = "1.0.19"
|
||||
@@ -1274,12 +1183,6 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fs_extra"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsevent-sys"
|
||||
version = "4.1.0"
|
||||
@@ -1537,99 +1440,12 @@ dependencies = [
|
||||
"itoa",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "http-body"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"http",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "http-body-util"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"http",
|
||||
"http-body",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "httparse"
|
||||
version = "1.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"httparse",
|
||||
"itoa",
|
||||
"pin-project-lite",
|
||||
"smallvec",
|
||||
"tokio",
|
||||
"want",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-rustls"
|
||||
version = "0.27.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58"
|
||||
dependencies = [
|
||||
"http",
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"log",
|
||||
"rustls",
|
||||
"rustls-native-certs",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"libc",
|
||||
"pin-project-lite",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iana-time-zone"
|
||||
version = "0.1.62"
|
||||
@@ -2038,12 +1854,6 @@ version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
|
||||
|
||||
[[package]]
|
||||
name = "lazycell"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.172"
|
||||
@@ -2387,12 +2197,6 @@ dependencies = [
|
||||
"libmimalloc-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "minimal-lexical"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.8.5"
|
||||
@@ -2425,7 +2229,7 @@ dependencies = [
|
||||
"napi-build",
|
||||
"napi-sys",
|
||||
"nohash-hasher",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustc-hash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2508,16 +2312,6 @@ version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451"
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "8.0.0"
|
||||
@@ -2641,12 +2435,6 @@ version = "11.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-probe"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
|
||||
|
||||
[[package]]
|
||||
name = "option-ext"
|
||||
version = "0.2.0"
|
||||
@@ -2874,16 +2662,6 @@ dependencies = [
|
||||
"termtree",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prettyplease"
|
||||
version = "0.2.34"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6837b9e10d61f45f987d50808f83d1ee3d206c66acf650c3e4ae2e1f6ddedf55"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error-attr2"
|
||||
version = "2.0.0"
|
||||
@@ -3285,12 +3063,6 @@ version = "0.1.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "2.1.1"
|
||||
@@ -3342,54 +3114,6 @@ dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.23.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0ebcbd2f03de0fc1122ad9bb24b127a5a6cd51d72604a3f3c50ac459762b6cc"
|
||||
dependencies = [
|
||||
"aws-lc-rs",
|
||||
"log",
|
||||
"once_cell",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pki-types",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pki-types"
|
||||
version = "1.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79"
|
||||
dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.103.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc"
|
||||
dependencies = [
|
||||
"aws-lc-rs",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.20"
|
||||
@@ -3445,15 +3169,6 @@ dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.27"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d"
|
||||
dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schemars"
|
||||
version = "0.8.22"
|
||||
@@ -3485,29 +3200,6 @@ version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "security-framework"
|
||||
version = "3.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316"
|
||||
dependencies = [
|
||||
"bitflags 2.9.0",
|
||||
"core-foundation",
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
"security-framework-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "security-framework-sys"
|
||||
version = "2.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32"
|
||||
dependencies = [
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "semver"
|
||||
version = "1.0.26"
|
||||
@@ -3670,12 +3362,6 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "subtle"
|
||||
version = "2.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
|
||||
|
||||
[[package]]
|
||||
name = "supports-color"
|
||||
version = "3.0.2"
|
||||
@@ -3991,16 +3677,6 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-rustls"
|
||||
version = "0.26.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
|
||||
dependencies = [
|
||||
"rustls",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.8.22"
|
||||
@@ -4043,12 +3719,6 @@ version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bfb942dfe1d8e29a7ee7fcbde5bd2b9a25fb89aa70caea2eba3bee836ff41076"
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.41"
|
||||
@@ -4122,12 +3792,6 @@ dependencies = [
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "try-lock"
|
||||
version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
|
||||
|
||||
[[package]]
|
||||
name = "turso"
|
||||
version = "0.1.4-pre.6"
|
||||
@@ -4149,33 +3813,6 @@ dependencies = [
|
||||
"turso_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "turso-sync"
|
||||
version = "0.1.4-pre.6"
|
||||
dependencies = [
|
||||
"ctor",
|
||||
"futures",
|
||||
"http",
|
||||
"http-body-util",
|
||||
"hyper",
|
||||
"hyper-rustls",
|
||||
"hyper-util",
|
||||
"paste",
|
||||
"rand 0.9.2",
|
||||
"rand_chacha 0.9.0",
|
||||
"rustls",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"thiserror 2.0.12",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"turso",
|
||||
"turso_core",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "turso_cli"
|
||||
version = "0.1.4-pre.6"
|
||||
@@ -4566,15 +4203,6 @@ dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "want"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e"
|
||||
dependencies = [
|
||||
"try-lock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.11.0+wasi-snapshot-preview1"
|
||||
@@ -4671,18 +4299,6 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "which"
|
||||
version = "4.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
|
||||
dependencies = [
|
||||
"either",
|
||||
"home",
|
||||
"once_cell",
|
||||
"rustix 0.38.44",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.3.9"
|
||||
@@ -5067,12 +4683,6 @@ dependencies = [
|
||||
"synstructure",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zeroize"
|
||||
version = "1.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
|
||||
|
||||
[[package]]
|
||||
name = "zerovec"
|
||||
version = "0.10.4"
|
||||
|
||||
@@ -26,7 +26,6 @@ members = [
|
||||
"testing/sqlite_test_ext",
|
||||
"tests",
|
||||
"vendored/sqlite3-parser/sqlparser_bench",
|
||||
"packages/turso-sync",
|
||||
"packages/turso-sync-engine",
|
||||
"packages/turso-sync-js",
|
||||
]
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
[package]
|
||||
name = "turso-sync"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
repository.workspace = true
|
||||
|
||||
[dependencies]
|
||||
turso_core = { workspace = true, features = ["conn_raw_api"] }
|
||||
turso = { workspace = true, features = ["conn_raw_api"] }
|
||||
thiserror = "2.0.12"
|
||||
tracing = "0.1.41"
|
||||
hyper = { version = "1.6.0", features = ["client", "http1"] }
|
||||
serde_json.workspace = true
|
||||
http-body-util = "0.1.3"
|
||||
http = "1.3.1"
|
||||
hyper-util = { version = "0.1.16", features = ["tokio", "http1", "client"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
tokio = { version = "1.46.1", features = ["fs", "io-util"] }
|
||||
hyper-rustls = "0.27.7"
|
||||
rustls = "0.23.31"
|
||||
|
||||
[dev-dependencies]
|
||||
ctor = "0.4.2"
|
||||
tempfile = "3.20.0"
|
||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
|
||||
tokio = { version = "1.46.1", features = ["macros", "rt-multi-thread", "test-util"] }
|
||||
uuid = "1.17.0"
|
||||
rand = "0.9.2"
|
||||
rand_chacha = "0.9.0"
|
||||
futures = "0.3.31"
|
||||
paste = "1.0.15"
|
||||
@@ -1,20 +0,0 @@
|
||||
# Turso sync package
|
||||
|
||||
turso-sync package extends turso-db embedded database with sync capabilities
|
||||
|
||||
> [!NOTE]
|
||||
> This package is experimental and, therefore, subject to change at any time.
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
cargo add turso-sync
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
Check out the `examples/` directory for complete usage examples.
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
@@ -1,70 +0,0 @@
|
||||
use std::io::{self, Write};
|
||||
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use turso_sync::database::Builder;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.init();
|
||||
let sync_url = std::env::var("TURSO_SYNC_URL").unwrap();
|
||||
let auth_token = std::env::var("TURSO_AUTH_TOKEN").ok();
|
||||
let local_path = std::env::var("TURSO_LOCAL_PATH").unwrap();
|
||||
let mut db = Builder::new_synced(&local_path, &sync_url, auth_token)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
loop {
|
||||
print!("> ");
|
||||
io::stdout().flush().unwrap();
|
||||
|
||||
let mut input = String::new();
|
||||
let bytes_read = io::stdin().read_line(&mut input).unwrap();
|
||||
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
let trimmed = input.trim();
|
||||
match trimmed {
|
||||
".exit" | ".quit" => break,
|
||||
".pull" => {
|
||||
db.pull().await.unwrap();
|
||||
continue;
|
||||
}
|
||||
".push" => {
|
||||
db.push().await.unwrap();
|
||||
continue;
|
||||
}
|
||||
".sync" => {
|
||||
db.sync().await.unwrap();
|
||||
continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
let mut rows = db.query(&input, ()).await.unwrap();
|
||||
while let Some(row) = rows.next().await.unwrap() {
|
||||
let mut values = vec![];
|
||||
for i in 0..row.column_count() {
|
||||
let value = row.get_value(i).unwrap();
|
||||
match value {
|
||||
turso::Value::Null => values.push("NULL".to_string()),
|
||||
turso::Value::Integer(x) => values.push(format!("{x}")),
|
||||
turso::Value::Real(x) => values.push(format!("{x}")),
|
||||
turso::Value::Text(x) => values.push(format!("'{x}'")),
|
||||
turso::Value::Blob(x) => values.push(format!(
|
||||
"x'{}'",
|
||||
x.iter()
|
||||
.map(|x| format!("{x:02x}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(""),
|
||||
)),
|
||||
}
|
||||
}
|
||||
println!("{}", &values.join(" "));
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,77 +0,0 @@
|
||||
use std::io::{self, Write};
|
||||
|
||||
use turso::Builder;
|
||||
use turso_sync::{
|
||||
database_tape::{DatabaseChangesIteratorMode, DatabaseChangesIteratorOpts, DatabaseTape},
|
||||
types::DatabaseTapeOperation,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let db = Builder::new_local("local.db").build().await.unwrap();
|
||||
let db = DatabaseTape::new(db);
|
||||
|
||||
let conn = db.connect().await.unwrap();
|
||||
|
||||
loop {
|
||||
print!("> ");
|
||||
io::stdout().flush().unwrap();
|
||||
|
||||
let mut input = String::new();
|
||||
let bytes_read = io::stdin().read_line(&mut input).unwrap();
|
||||
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
let trimmed = input.trim();
|
||||
if trimmed == ".exit" || trimmed == ".quit" {
|
||||
break;
|
||||
}
|
||||
if trimmed.starts_with(".undo ") || trimmed.starts_with(".redo ") {
|
||||
let first_change_id = Some(trimmed[".undo ".len()..].parse().unwrap());
|
||||
let mode = match &trimmed[0..(".undo".len())] {
|
||||
".undo" => DatabaseChangesIteratorMode::Revert,
|
||||
".redo" => DatabaseChangesIteratorMode::Apply,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let mut iterator = db
|
||||
.iterate_changes(DatabaseChangesIteratorOpts {
|
||||
first_change_id,
|
||||
mode,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let mut session = db.start_tape_session().await.unwrap();
|
||||
if let Some(change) = iterator.next().await.unwrap() {
|
||||
session.replay(change).await.unwrap();
|
||||
session.replay(DatabaseTapeOperation::Commit).await.unwrap();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
let mut stmt = conn.prepare(&input).await.unwrap();
|
||||
let mut rows = stmt.query(()).await.unwrap();
|
||||
while let Some(row) = rows.next().await.unwrap() {
|
||||
let mut values = vec![];
|
||||
for i in 0..row.column_count() {
|
||||
let value = row.get_value(i).unwrap();
|
||||
match value {
|
||||
turso::Value::Null => values.push("NULL".to_string()),
|
||||
turso::Value::Integer(x) => values.push(format!("{x}",)),
|
||||
turso::Value::Real(x) => values.push(format!("{x}")),
|
||||
turso::Value::Text(x) => values.push(format!("'{x}'")),
|
||||
turso::Value::Blob(x) => values.push(format!(
|
||||
"x'{}'",
|
||||
x.iter()
|
||||
.map(|x| format!("{x:02x}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(""),
|
||||
)),
|
||||
}
|
||||
}
|
||||
println!("{}", &values.join(" "));
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,98 +0,0 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use hyper_rustls::{ConfigBuilderExt, HttpsConnector, HttpsConnectorBuilder};
|
||||
use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor};
|
||||
|
||||
use crate::{
|
||||
database_inner::{DatabaseInner, Rows},
|
||||
errors::Error,
|
||||
filesystem::tokio::TokioFilesystem,
|
||||
sync_server::turso::{TursoSyncServer, TursoSyncServerOpts},
|
||||
Result,
|
||||
};
|
||||
|
||||
/// [Database] expose public interface for synced database from [DatabaseInner] private implementation
|
||||
///
|
||||
/// This layer also serves a purpose of "gluing" together all component for real use,
|
||||
/// because [DatabaseInner] abstracts things away in order to simplify testing
|
||||
pub struct Database(DatabaseInner<TursoSyncServer, TokioFilesystem>);
|
||||
|
||||
pub struct Builder {
|
||||
path: String,
|
||||
sync_url: String,
|
||||
auth_token: Option<String>,
|
||||
encryption_key: Option<String>,
|
||||
connector: Option<HttpsConnector<HttpConnector>>,
|
||||
}
|
||||
|
||||
impl Builder {
|
||||
pub fn new_synced(path: &str, sync_url: &str, auth_token: Option<String>) -> Self {
|
||||
Self {
|
||||
path: path.to_string(),
|
||||
sync_url: sync_url.to_string(),
|
||||
auth_token,
|
||||
encryption_key: None,
|
||||
connector: None,
|
||||
}
|
||||
}
|
||||
pub fn with_encryption_key(self, encryption_key: &str) -> Self {
|
||||
Self {
|
||||
encryption_key: Some(encryption_key.to_string()),
|
||||
..self
|
||||
}
|
||||
}
|
||||
pub fn with_connector(self, connector: HttpsConnector<HttpConnector>) -> Self {
|
||||
Self {
|
||||
connector: Some(connector),
|
||||
..self
|
||||
}
|
||||
}
|
||||
pub async fn build(self) -> Result<Database> {
|
||||
let path = PathBuf::from(self.path);
|
||||
let connector = self.connector.map(Ok).unwrap_or_else(default_connector)?;
|
||||
let executor = TokioExecutor::new();
|
||||
let client = hyper_util::client::legacy::Builder::new(executor).build(connector);
|
||||
let sync_server = TursoSyncServer::new(
|
||||
client,
|
||||
TursoSyncServerOpts {
|
||||
sync_url: self.sync_url,
|
||||
auth_token: self.auth_token,
|
||||
encryption_key: self.encryption_key,
|
||||
pull_batch_size: None,
|
||||
},
|
||||
)?;
|
||||
let filesystem = TokioFilesystem();
|
||||
let inner = DatabaseInner::new(filesystem, sync_server, &path).await?;
|
||||
Ok(Database(inner))
|
||||
}
|
||||
}
|
||||
|
||||
impl Database {
|
||||
pub async fn sync(&mut self) -> Result<()> {
|
||||
self.0.sync().await
|
||||
}
|
||||
pub async fn pull(&mut self) -> Result<()> {
|
||||
self.0.pull().await
|
||||
}
|
||||
pub async fn push(&mut self) -> Result<()> {
|
||||
self.0.push().await
|
||||
}
|
||||
pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<u64> {
|
||||
self.0.execute(sql, params).await
|
||||
}
|
||||
pub async fn query(&self, sql: &str, params: impl turso::IntoParams) -> Result<Rows> {
|
||||
self.0.query(sql, params).await
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default_connector() -> Result<HttpsConnector<HttpConnector>> {
|
||||
let tls_config = rustls::ClientConfig::builder()
|
||||
.with_native_roots()
|
||||
.map_err(|e| Error::DatabaseSyncError(format!("unable to configure CA roots: {e}")))?
|
||||
.with_no_client_auth();
|
||||
Ok(HttpsConnectorBuilder::new()
|
||||
.with_tls_config(tls_config)
|
||||
.https_or_http()
|
||||
.enable_http1()
|
||||
.build())
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,552 +0,0 @@
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
types::{
|
||||
DatabaseChange, DatabaseTapeOperation, DatabaseTapeRowChange, DatabaseTapeRowChangeType,
|
||||
},
|
||||
Result,
|
||||
};
|
||||
|
||||
/// Simple wrapper over [turso::Database] which extends its intereface with few methods
|
||||
/// to collect changes made to the database and apply/revert arbitrary changes to the database
|
||||
pub struct DatabaseTape {
|
||||
inner: turso::Database,
|
||||
cdc_table: Arc<String>,
|
||||
pragma_query: String,
|
||||
}
|
||||
|
||||
const DEFAULT_CDC_TABLE_NAME: &str = "turso_cdc";
|
||||
const DEFAULT_CDC_MODE: &str = "full";
|
||||
const DEFAULT_CHANGES_BATCH_SIZE: usize = 100;
|
||||
const CDC_PRAGMA_NAME: &str = "unstable_capture_data_changes_conn";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseTapeOpts {
|
||||
pub cdc_table: Option<String>,
|
||||
pub cdc_mode: Option<String>,
|
||||
}
|
||||
|
||||
impl DatabaseTape {
|
||||
pub fn new(database: turso::Database) -> Self {
|
||||
let opts = DatabaseTapeOpts {
|
||||
cdc_table: None,
|
||||
cdc_mode: None,
|
||||
};
|
||||
Self::new_with_opts(database, opts)
|
||||
}
|
||||
pub fn new_with_opts(database: turso::Database, opts: DatabaseTapeOpts) -> Self {
|
||||
tracing::debug!("create local sync database with options {:?}", opts);
|
||||
let cdc_table = opts.cdc_table.unwrap_or(DEFAULT_CDC_TABLE_NAME.to_string());
|
||||
let cdc_mode = opts.cdc_mode.unwrap_or(DEFAULT_CDC_MODE.to_string());
|
||||
let pragma_query = format!("PRAGMA {CDC_PRAGMA_NAME}('{cdc_mode},{cdc_table}')");
|
||||
Self {
|
||||
inner: database,
|
||||
cdc_table: Arc::new(cdc_table.to_string()),
|
||||
pragma_query,
|
||||
}
|
||||
}
|
||||
pub async fn connect(&self) -> Result<turso::Connection> {
|
||||
let connection = self.inner.connect()?;
|
||||
tracing::debug!("set '{CDC_PRAGMA_NAME}' for new connection");
|
||||
connection.execute(&self.pragma_query, ()).await?;
|
||||
Ok(connection)
|
||||
}
|
||||
/// Builds an iterator which emits [DatabaseTapeOperation] by extracting data from CDC table
|
||||
pub async fn iterate_changes(
|
||||
&self,
|
||||
opts: DatabaseChangesIteratorOpts,
|
||||
) -> Result<DatabaseChangesIterator> {
|
||||
tracing::debug!("opening changes iterator with options {:?}", opts);
|
||||
let conn = self.connect().await?;
|
||||
let query = opts.mode.query(&self.cdc_table, opts.batch_size);
|
||||
let query_stmt = conn.prepare(&query).await?;
|
||||
Ok(DatabaseChangesIterator {
|
||||
first_change_id: opts.first_change_id,
|
||||
batch: VecDeque::with_capacity(opts.batch_size),
|
||||
query_stmt,
|
||||
txn_boundary_returned: false,
|
||||
mode: opts.mode,
|
||||
ignore_schema_changes: opts.ignore_schema_changes,
|
||||
})
|
||||
}
|
||||
/// Start replay session which can apply [DatabaseTapeOperation] from [Self::iterate_changes]
|
||||
pub async fn start_tape_session(&self) -> Result<DatabaseReplaySession> {
|
||||
tracing::debug!("opening replay session");
|
||||
Ok(DatabaseReplaySession {
|
||||
conn: self.connect().await?,
|
||||
cached_delete_stmt: HashMap::new(),
|
||||
cached_insert_stmt: HashMap::new(),
|
||||
in_txn: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum DatabaseChangesIteratorMode {
|
||||
Apply,
|
||||
Revert,
|
||||
}
|
||||
|
||||
impl DatabaseChangesIteratorMode {
|
||||
pub fn query(&self, table_name: &str, limit: usize) -> String {
|
||||
let (operation, order) = match self {
|
||||
DatabaseChangesIteratorMode::Apply => (">=", "ASC"),
|
||||
DatabaseChangesIteratorMode::Revert => ("<=", "DESC"),
|
||||
};
|
||||
format!(
|
||||
"SELECT * FROM {table_name} WHERE change_id {operation} ? ORDER BY change_id {order} LIMIT {limit}",
|
||||
)
|
||||
}
|
||||
pub fn first_id(&self) -> i64 {
|
||||
match self {
|
||||
DatabaseChangesIteratorMode::Apply => -1,
|
||||
DatabaseChangesIteratorMode::Revert => i64::MAX,
|
||||
}
|
||||
}
|
||||
pub fn next_id(&self, id: i64) -> i64 {
|
||||
match self {
|
||||
DatabaseChangesIteratorMode::Apply => id + 1,
|
||||
DatabaseChangesIteratorMode::Revert => id - 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseChangesIteratorOpts {
|
||||
pub first_change_id: Option<i64>,
|
||||
pub batch_size: usize,
|
||||
pub mode: DatabaseChangesIteratorMode,
|
||||
pub ignore_schema_changes: bool,
|
||||
}
|
||||
|
||||
impl Default for DatabaseChangesIteratorOpts {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
first_change_id: None,
|
||||
batch_size: DEFAULT_CHANGES_BATCH_SIZE,
|
||||
mode: DatabaseChangesIteratorMode::Apply,
|
||||
ignore_schema_changes: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DatabaseChangesIterator {
|
||||
query_stmt: turso::Statement,
|
||||
first_change_id: Option<i64>,
|
||||
batch: VecDeque<DatabaseTapeRowChange>,
|
||||
txn_boundary_returned: bool,
|
||||
mode: DatabaseChangesIteratorMode,
|
||||
ignore_schema_changes: bool,
|
||||
}
|
||||
|
||||
impl DatabaseChangesIterator {
|
||||
pub async fn next(&mut self) -> Result<Option<DatabaseTapeOperation>> {
|
||||
if self.batch.is_empty() {
|
||||
self.refill().await?;
|
||||
}
|
||||
// todo(sivukhin): iterator must be more clever about transaction boundaries - but for that we need to extend CDC table
|
||||
// for now, if iterator reach the end of CDC table - we are sure that this is a transaction boundary
|
||||
loop {
|
||||
let next = if let Some(change) = self.batch.pop_front() {
|
||||
self.txn_boundary_returned = false;
|
||||
Some(DatabaseTapeOperation::RowChange(change))
|
||||
} else if !self.txn_boundary_returned {
|
||||
self.txn_boundary_returned = true;
|
||||
Some(DatabaseTapeOperation::Commit)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if let Some(DatabaseTapeOperation::RowChange(change)) = &next {
|
||||
if self.ignore_schema_changes && change.table_name == "sqlite_schema" {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return Ok(next);
|
||||
}
|
||||
}
|
||||
async fn refill(&mut self) -> Result<()> {
|
||||
let change_id_filter = self.first_change_id.unwrap_or(self.mode.first_id());
|
||||
self.query_stmt.reset();
|
||||
|
||||
let mut rows = self.query_stmt.query((change_id_filter,)).await?;
|
||||
while let Some(row) = rows.next().await? {
|
||||
let database_change: DatabaseChange = row.try_into()?;
|
||||
let tape_change = match self.mode {
|
||||
DatabaseChangesIteratorMode::Apply => database_change.into_apply()?,
|
||||
DatabaseChangesIteratorMode::Revert => database_change.into_revert()?,
|
||||
};
|
||||
self.batch.push_back(tape_change);
|
||||
}
|
||||
let batch_len = self.batch.len();
|
||||
if batch_len > 0 {
|
||||
self.first_change_id = Some(self.mode.next_id(self.batch[batch_len - 1].change_id));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DatabaseReplaySession {
|
||||
conn: turso::Connection,
|
||||
cached_delete_stmt: HashMap<String, turso::Statement>,
|
||||
cached_insert_stmt: HashMap<(String, usize), turso::Statement>,
|
||||
in_txn: bool,
|
||||
}
|
||||
|
||||
impl DatabaseReplaySession {
|
||||
pub async fn replay(&mut self, operation: DatabaseTapeOperation) -> Result<()> {
|
||||
match operation {
|
||||
DatabaseTapeOperation::Commit => {
|
||||
tracing::trace!("replay: commit replayed changes after transaction boundary");
|
||||
if self.in_txn {
|
||||
self.conn.execute("COMMIT", ()).await?;
|
||||
self.in_txn = false;
|
||||
}
|
||||
}
|
||||
DatabaseTapeOperation::RowChange(change) => {
|
||||
if !self.in_txn {
|
||||
tracing::trace!("replay: start txn for replaying changes");
|
||||
self.conn.execute("BEGIN", ()).await?;
|
||||
self.in_txn = true;
|
||||
}
|
||||
tracing::trace!("replay: change={:?}", change);
|
||||
let table_name = &change.table_name;
|
||||
match change.change {
|
||||
DatabaseTapeRowChangeType::Delete => {
|
||||
self.replay_delete(table_name, change.id).await?
|
||||
}
|
||||
DatabaseTapeRowChangeType::Update { bin_record } => {
|
||||
self.replay_delete(table_name, change.id).await?;
|
||||
let values = parse_bin_record(bin_record)?;
|
||||
self.replay_insert(table_name, change.id, values).await?;
|
||||
}
|
||||
DatabaseTapeRowChangeType::Insert { bin_record } => {
|
||||
let values = parse_bin_record(bin_record)?;
|
||||
self.replay_insert(table_name, change.id, values).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn replay_delete(&mut self, table_name: &str, id: i64) -> Result<()> {
|
||||
let stmt = self.cached_delete_stmt(table_name).await?;
|
||||
stmt.execute((id,)).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn replay_insert(
|
||||
&mut self,
|
||||
table_name: &str,
|
||||
id: i64,
|
||||
mut values: Vec<turso::Value>,
|
||||
) -> Result<()> {
|
||||
let columns = values.len();
|
||||
let stmt = self.cached_insert_stmt(table_name, columns).await?;
|
||||
|
||||
values.push(turso::Value::Integer(id));
|
||||
let params = turso::params::Params::Positional(values);
|
||||
|
||||
stmt.execute(params).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn cached_delete_stmt(&mut self, table_name: &str) -> Result<&mut turso::Statement> {
|
||||
if !self.cached_delete_stmt.contains_key(table_name) {
|
||||
tracing::trace!("prepare delete statement for replay: table={}", table_name);
|
||||
let query = format!("DELETE FROM {table_name} WHERE rowid = ?");
|
||||
let stmt = self.conn.prepare(&query).await?;
|
||||
self.cached_delete_stmt.insert(table_name.to_string(), stmt);
|
||||
}
|
||||
tracing::trace!(
|
||||
"ready to use prepared delete statement for replay: table={}",
|
||||
table_name
|
||||
);
|
||||
Ok(self.cached_delete_stmt.get_mut(table_name).unwrap())
|
||||
}
|
||||
async fn cached_insert_stmt(
|
||||
&mut self,
|
||||
table_name: &str,
|
||||
columns: usize,
|
||||
) -> Result<&mut turso::Statement> {
|
||||
let key = (table_name.to_string(), columns);
|
||||
if !self.cached_insert_stmt.contains_key(&key) {
|
||||
tracing::trace!(
|
||||
"prepare insert statement for replay: table={}, columns={}",
|
||||
table_name,
|
||||
columns
|
||||
);
|
||||
let mut table_info = self
|
||||
.conn
|
||||
.query(
|
||||
&format!("SELECT name FROM pragma_table_info('{table_name}')"),
|
||||
(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut column_names = Vec::with_capacity(columns + 1);
|
||||
while let Some(table_info_row) = table_info.next().await? {
|
||||
let value = table_info_row.get_value(0)?;
|
||||
column_names.push(value.as_text().expect("must be text").to_string());
|
||||
}
|
||||
column_names.push("rowid".to_string());
|
||||
|
||||
let placeholders = ["?"].repeat(columns + 1).join(",");
|
||||
let column_names = column_names.join(", ");
|
||||
let query = format!("INSERT INTO {table_name}({column_names}) VALUES ({placeholders})");
|
||||
let stmt = self.conn.prepare(&query).await?;
|
||||
self.cached_insert_stmt.insert(key.clone(), stmt);
|
||||
}
|
||||
tracing::trace!(
|
||||
"ready to use prepared insert statement for replay: table={}, columns={}",
|
||||
table_name,
|
||||
columns
|
||||
);
|
||||
Ok(self.cached_insert_stmt.get_mut(&key).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso::Value>> {
|
||||
let record = turso_core::types::ImmutableRecord::from_bin_record(bin_record);
|
||||
let mut cursor = turso_core::types::RecordCursor::new();
|
||||
let columns = cursor.count(&record);
|
||||
let mut values = Vec::with_capacity(columns);
|
||||
for i in 0..columns {
|
||||
let value = cursor.get_value(&record, i)?;
|
||||
values.push(value.to_owned().into());
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tempfile::NamedTempFile;
|
||||
use turso::Value;
|
||||
|
||||
use crate::{
|
||||
database_tape::{DatabaseChangesIteratorOpts, DatabaseTape},
|
||||
types::DatabaseTapeOperation,
|
||||
};
|
||||
|
||||
async fn fetch_rows(conn: &turso::Connection, query: &str) -> Vec<Vec<turso::Value>> {
|
||||
let mut rows = vec![];
|
||||
let mut iterator = conn.query(query, ()).await.unwrap();
|
||||
while let Some(row) = iterator.next().await.unwrap() {
|
||||
let mut row_values = vec![];
|
||||
for i in 0..row.column_count() {
|
||||
row_values.push(row.get_value(i).unwrap());
|
||||
}
|
||||
rows.push(row_values);
|
||||
}
|
||||
rows
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_database_cdc_single_iteration() {
|
||||
let temp_file1 = NamedTempFile::new().unwrap();
|
||||
let db_path1 = temp_file1.path().to_str().unwrap();
|
||||
|
||||
let temp_file2 = NamedTempFile::new().unwrap();
|
||||
let db_path2 = temp_file2.path().to_str().unwrap();
|
||||
|
||||
let db1 = turso::Builder::new_local(db_path1).build().await.unwrap();
|
||||
let db1 = DatabaseTape::new(db1);
|
||||
let conn1 = db1.connect().await.unwrap();
|
||||
|
||||
let db2 = turso::Builder::new_local(db_path2).build().await.unwrap();
|
||||
let db2 = DatabaseTape::new(db2);
|
||||
let conn2 = db2.connect().await.unwrap();
|
||||
|
||||
conn1
|
||||
.execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ())
|
||||
.await
|
||||
.unwrap();
|
||||
conn1
|
||||
.execute("CREATE TABLE b(x INTEGER PRIMARY KEY, y, z);", ())
|
||||
.await
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ())
|
||||
.await
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("CREATE TABLE b(x INTEGER PRIMARY KEY, y, z);", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
conn1
|
||||
.execute("INSERT INTO a VALUES (1, 'hello'), (2, 'turso')", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
conn1
|
||||
.execute(
|
||||
"INSERT INTO b VALUES (3, 'bye', 0.1), (4, 'limbo', 0.2)",
|
||||
(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut iterator = db1.iterate_changes(Default::default()).await.unwrap();
|
||||
{
|
||||
let mut replay = db2.start_tape_session().await.unwrap();
|
||||
while let Some(change) = iterator.next().await.unwrap() {
|
||||
replay.replay(change).await.unwrap();
|
||||
}
|
||||
}
|
||||
assert_eq!(
|
||||
fetch_rows(&conn2, "SELECT * FROM a").await,
|
||||
vec![
|
||||
vec![
|
||||
turso::Value::Integer(1),
|
||||
turso::Value::Text("hello".to_string())
|
||||
],
|
||||
vec![
|
||||
turso::Value::Integer(2),
|
||||
turso::Value::Text("turso".to_string())
|
||||
],
|
||||
]
|
||||
);
|
||||
assert_eq!(
|
||||
fetch_rows(&conn2, "SELECT * FROM b").await,
|
||||
vec![
|
||||
vec![
|
||||
turso::Value::Integer(3),
|
||||
turso::Value::Text("bye".to_string()),
|
||||
turso::Value::Real(0.1)
|
||||
],
|
||||
vec![
|
||||
turso::Value::Integer(4),
|
||||
turso::Value::Text("limbo".to_string()),
|
||||
turso::Value::Real(0.2)
|
||||
],
|
||||
]
|
||||
);
|
||||
|
||||
conn1
|
||||
.execute("DELETE FROM b WHERE y = 'limbo'", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut replay = db2.start_tape_session().await.unwrap();
|
||||
while let Some(change) = iterator.next().await.unwrap() {
|
||||
replay.replay(change).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
fetch_rows(&conn2, "SELECT * FROM a").await,
|
||||
vec![
|
||||
vec![
|
||||
turso::Value::Integer(1),
|
||||
turso::Value::Text("hello".to_string())
|
||||
],
|
||||
vec![
|
||||
turso::Value::Integer(2),
|
||||
turso::Value::Text("turso".to_string())
|
||||
],
|
||||
]
|
||||
);
|
||||
assert_eq!(
|
||||
fetch_rows(&conn2, "SELECT * FROM b").await,
|
||||
vec![vec![
|
||||
turso::Value::Integer(3),
|
||||
turso::Value::Text("bye".to_string()),
|
||||
turso::Value::Real(0.1)
|
||||
],]
|
||||
);
|
||||
|
||||
conn1
|
||||
.execute("UPDATE b SET y = x'deadbeef' WHERE x = 3", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut replay = db2.start_tape_session().await.unwrap();
|
||||
while let Some(change) = iterator.next().await.unwrap() {
|
||||
replay.replay(change).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
fetch_rows(&conn2, "SELECT * FROM a").await,
|
||||
vec![
|
||||
vec![
|
||||
turso::Value::Integer(1),
|
||||
turso::Value::Text("hello".to_string())
|
||||
],
|
||||
vec![
|
||||
turso::Value::Integer(2),
|
||||
turso::Value::Text("turso".to_string())
|
||||
],
|
||||
]
|
||||
);
|
||||
assert_eq!(
|
||||
fetch_rows(&conn2, "SELECT * FROM b").await,
|
||||
vec![vec![
|
||||
turso::Value::Integer(3),
|
||||
turso::Value::Blob(vec![0xde, 0xad, 0xbe, 0xef]),
|
||||
turso::Value::Real(0.1)
|
||||
]]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_database_cdc_multiple_iterations() {
|
||||
let temp_file1 = NamedTempFile::new().unwrap();
|
||||
let db_path1 = temp_file1.path().to_str().unwrap();
|
||||
|
||||
let temp_file2 = NamedTempFile::new().unwrap();
|
||||
let db_path2 = temp_file2.path().to_str().unwrap();
|
||||
|
||||
let db1 = turso::Builder::new_local(db_path1).build().await.unwrap();
|
||||
let db1 = DatabaseTape::new(db1);
|
||||
let conn1 = db1.connect().await.unwrap();
|
||||
|
||||
let db2 = turso::Builder::new_local(db_path2).build().await.unwrap();
|
||||
let db2 = DatabaseTape::new(db2);
|
||||
let conn2 = db2.connect().await.unwrap();
|
||||
|
||||
conn1
|
||||
.execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ())
|
||||
.await
|
||||
.unwrap();
|
||||
conn2
|
||||
.execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut next_change_id = None;
|
||||
let mut expected = Vec::new();
|
||||
for i in 0..10 {
|
||||
conn1
|
||||
.execute("INSERT INTO a VALUES (?, 'hello')", (i,))
|
||||
.await
|
||||
.unwrap();
|
||||
expected.push(vec![
|
||||
Value::Integer(i as i64),
|
||||
Value::Text("hello".to_string()),
|
||||
]);
|
||||
|
||||
let mut iterator = db1
|
||||
.iterate_changes(DatabaseChangesIteratorOpts {
|
||||
first_change_id: next_change_id,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
let mut replay = db2.start_tape_session().await.unwrap();
|
||||
while let Some(change) = iterator.next().await.unwrap() {
|
||||
if let DatabaseTapeOperation::RowChange(change) = &change {
|
||||
next_change_id = Some(change.change_id + 1);
|
||||
}
|
||||
replay.replay(change).await.unwrap();
|
||||
}
|
||||
}
|
||||
let conn2 = db2.connect().await.unwrap();
|
||||
assert_eq!(fetch_rows(&conn2, "SELECT * FROM a").await, expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,59 +0,0 @@
|
||||
use crate::sync_server::DbSyncStatus;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("database error: {0}")]
|
||||
TursoError(turso::Error),
|
||||
#[error("database tape error: {0}")]
|
||||
DatabaseTapeError(String),
|
||||
#[error("invalid URI: {0}")]
|
||||
Uri(http::uri::InvalidUri),
|
||||
#[error("invalid HTTP request: {0}")]
|
||||
Http(http::Error),
|
||||
#[error("HTTP request error: {0}")]
|
||||
HyperRequest(hyper_util::client::legacy::Error),
|
||||
#[error("HTTP response error: {0}")]
|
||||
HyperResponse(hyper::Error),
|
||||
#[error("deserialization error: {0}")]
|
||||
JsonDecode(serde_json::Error),
|
||||
#[error("unexpected sync server error: code={0}, info={1}")]
|
||||
SyncServerError(http::StatusCode, String),
|
||||
#[error("unexpected sync server status: {0:?}")]
|
||||
SyncServerUnexpectedStatus(DbSyncStatus),
|
||||
#[error("unexpected filesystem error: {0}")]
|
||||
FilesystemError(std::io::Error),
|
||||
#[error("local metadata error: {0}")]
|
||||
MetadataError(String),
|
||||
#[error("database sync error: {0}")]
|
||||
DatabaseSyncError(String),
|
||||
#[error("sync server pull error: checkpoint required: `{0:?}`")]
|
||||
PullNeedCheckpoint(DbSyncStatus),
|
||||
#[error("sync server push error: wal conflict detected")]
|
||||
PushConflict,
|
||||
#[error("sync server push error: inconsitent state on remote: `{0:?}`")]
|
||||
PushInconsistent(DbSyncStatus),
|
||||
}
|
||||
|
||||
impl From<turso::Error> for Error {
|
||||
fn from(value: turso::Error) -> Self {
|
||||
Self::TursoError(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<turso_core::LimboError> for Error {
|
||||
fn from(value: turso_core::LimboError) -> Self {
|
||||
Self::TursoError(value.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(value: std::io::Error) -> Self {
|
||||
Self::FilesystemError(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for Error {
|
||||
fn from(value: serde_json::Error) -> Self {
|
||||
Self::JsonDecode(value)
|
||||
}
|
||||
}
|
||||
@@ -1,42 +0,0 @@
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
||||
pub mod tokio;
|
||||
|
||||
use crate::Result;
|
||||
use std::path::Path;
|
||||
|
||||
pub trait Filesystem {
|
||||
type File;
|
||||
fn exists_file(&self, path: &Path) -> impl std::future::Future<Output = Result<bool>> + Send;
|
||||
fn remove_file(&self, path: &Path) -> impl std::future::Future<Output = Result<()>> + Send;
|
||||
fn create_file(
|
||||
&self,
|
||||
path: &Path,
|
||||
) -> impl std::future::Future<Output = Result<Self::File>> + Send;
|
||||
fn open_file(
|
||||
&self,
|
||||
path: &Path,
|
||||
) -> impl std::future::Future<Output = Result<Self::File>> + Send;
|
||||
fn copy_file(
|
||||
&self,
|
||||
src: &Path,
|
||||
dst: &Path,
|
||||
) -> impl std::future::Future<Output = Result<()>> + Send;
|
||||
fn rename_file(
|
||||
&self,
|
||||
src: &Path,
|
||||
dst: &Path,
|
||||
) -> impl std::future::Future<Output = Result<()>> + Send;
|
||||
fn truncate_file(
|
||||
&self,
|
||||
file: &Self::File,
|
||||
size: usize,
|
||||
) -> impl std::future::Future<Output = Result<()>> + Send;
|
||||
fn write_file(
|
||||
&self,
|
||||
file: &mut Self::File,
|
||||
buf: &[u8],
|
||||
) -> impl std::future::Future<Output = Result<()>> + Send;
|
||||
fn sync_file(&self, file: &Self::File) -> impl std::future::Future<Output = Result<()>> + Send;
|
||||
fn read_file(&self, path: &Path) -> impl std::future::Future<Output = Result<Vec<u8>>> + Send;
|
||||
}
|
||||
@@ -1,97 +0,0 @@
|
||||
use std::{io::Write, sync::Arc};
|
||||
|
||||
use crate::{filesystem::Filesystem, test_context::TestContext, Result};
|
||||
|
||||
pub struct TestFilesystem {
|
||||
ctx: Arc<TestContext>,
|
||||
}
|
||||
|
||||
impl TestFilesystem {
|
||||
pub fn new(ctx: Arc<TestContext>) -> Self {
|
||||
Self { ctx }
|
||||
}
|
||||
}
|
||||
|
||||
impl Filesystem for TestFilesystem {
|
||||
type File = std::fs::File;
|
||||
|
||||
async fn exists_file(&self, path: &std::path::Path) -> Result<bool> {
|
||||
self.ctx.faulty_call("exists_file_start").await?;
|
||||
let result = std::fs::exists(path)?;
|
||||
self.ctx.faulty_call("exists_file_end").await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn remove_file(&self, path: &std::path::Path) -> Result<()> {
|
||||
self.ctx.faulty_call("remove_file_start").await?;
|
||||
match std::fs::remove_file(path) {
|
||||
Ok(()) => Result::Ok(()),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Result::Ok(()),
|
||||
Err(e) => Err(e.into()),
|
||||
}?;
|
||||
self.ctx.faulty_call("remove_file_end").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_file(&self, path: &std::path::Path) -> Result<Self::File> {
|
||||
self.ctx.faulty_call("create_file_start").await?;
|
||||
let result = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(path)?;
|
||||
self.ctx.faulty_call("create_file_end").await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn open_file(&self, path: &std::path::Path) -> Result<Self::File> {
|
||||
self.ctx.faulty_call("open_file_start").await?;
|
||||
let result = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path)?;
|
||||
self.ctx.faulty_call("open_file_end").await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn copy_file(&self, src: &std::path::Path, dst: &std::path::Path) -> Result<()> {
|
||||
self.ctx.faulty_call("copy_file_start").await?;
|
||||
std::fs::copy(src, dst)?;
|
||||
self.ctx.faulty_call("copy_file_end").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn rename_file(&self, src: &std::path::Path, dst: &std::path::Path) -> Result<()> {
|
||||
self.ctx.faulty_call("rename_file_start").await?;
|
||||
std::fs::rename(src, dst)?;
|
||||
self.ctx.faulty_call("rename_file_end").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn truncate_file(&self, file: &Self::File, size: usize) -> Result<()> {
|
||||
self.ctx.faulty_call("truncate_file_start").await?;
|
||||
file.set_len(size as u64)?;
|
||||
self.ctx.faulty_call("truncate_file_end").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_file(&self, file: &mut Self::File, buf: &[u8]) -> Result<()> {
|
||||
self.ctx.faulty_call("write_file_start").await?;
|
||||
file.write_all(buf)?;
|
||||
self.ctx.faulty_call("write_file_end").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sync_file(&self, file: &Self::File) -> Result<()> {
|
||||
self.ctx.faulty_call("sync_file_start").await?;
|
||||
file.sync_all()?;
|
||||
self.ctx.faulty_call("sync_file_end").await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_file(&self, path: &std::path::Path) -> Result<Vec<u8>> {
|
||||
self.ctx.faulty_call("read_file_start").await?;
|
||||
let data = std::fs::read(path)?;
|
||||
self.ctx.faulty_call("read_file_end").await?;
|
||||
Ok(data)
|
||||
}
|
||||
}
|
||||
@@ -1,78 +0,0 @@
|
||||
use std::path::Path;
|
||||
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use crate::{filesystem::Filesystem, Result};
|
||||
|
||||
pub struct TokioFilesystem();
|
||||
|
||||
impl Filesystem for TokioFilesystem {
|
||||
type File = tokio::fs::File;
|
||||
|
||||
async fn exists_file(&self, path: &Path) -> Result<bool> {
|
||||
tracing::debug!("check file exists at {:?}", path);
|
||||
Ok(tokio::fs::try_exists(&path).await?)
|
||||
}
|
||||
|
||||
async fn remove_file(&self, path: &Path) -> Result<()> {
|
||||
tracing::debug!("remove file at {:?}", path);
|
||||
match tokio::fs::remove_file(path).await {
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_file(&self, path: &Path) -> Result<Self::File> {
|
||||
tracing::debug!("create file at {:?}", path);
|
||||
Ok(tokio::fs::File::create_new(path)
|
||||
.await
|
||||
.inspect_err(|e| tracing::error!("failed to create file at {:?}: {}", path, e))?)
|
||||
}
|
||||
|
||||
async fn open_file(&self, path: &Path) -> Result<Self::File> {
|
||||
tracing::debug!("open file at {:?}", path);
|
||||
Ok(tokio::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.read(true)
|
||||
.open(path)
|
||||
.await?)
|
||||
}
|
||||
|
||||
async fn copy_file(&self, src: &Path, dst: &Path) -> Result<()> {
|
||||
tracing::debug!("copy file from {:?} to {:?}", src, dst);
|
||||
tokio::fs::copy(&src, &dst).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn rename_file(&self, src: &Path, dst: &Path) -> Result<()> {
|
||||
tracing::debug!("rename file from {:?} to {:?}", src, dst);
|
||||
tokio::fs::rename(&src, &dst)
|
||||
.await
|
||||
.inspect_err(|e| tracing::error!("failed to rename {:?} to {:?}: {}", src, dst, e))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn truncate_file(&self, file: &Self::File, size: usize) -> Result<()> {
|
||||
tracing::debug!("truncate file to size {}", size);
|
||||
file.set_len(size as u64).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_file(&self, file: &mut Self::File, buf: &[u8]) -> Result<()> {
|
||||
tracing::debug!("write buffer of size {} to file", buf.len());
|
||||
file.write_all(buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sync_file(&self, file: &Self::File) -> Result<()> {
|
||||
tracing::debug!("sync file");
|
||||
file.sync_all().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_file(&self, path: &Path) -> Result<Vec<u8>> {
|
||||
tracing::debug!("read file {:?}", path);
|
||||
Ok(tokio::fs::read(path).await?)
|
||||
}
|
||||
}
|
||||
@@ -1,54 +0,0 @@
|
||||
pub mod database;
|
||||
pub mod database_tape;
|
||||
pub mod errors;
|
||||
pub mod types;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, errors::Error>;
|
||||
|
||||
mod database_inner;
|
||||
mod filesystem;
|
||||
mod metadata;
|
||||
mod sync_server;
|
||||
#[cfg(test)]
|
||||
mod test_context;
|
||||
mod wal_session;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
#[ctor::ctor]
|
||||
fn init() {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
// .with_ansi(false)
|
||||
.init();
|
||||
}
|
||||
|
||||
pub fn seed_u64() -> u64 {
|
||||
seed().parse().unwrap_or(0)
|
||||
}
|
||||
|
||||
pub fn seed() -> String {
|
||||
std::env::var("SEED").unwrap_or("0".to_string())
|
||||
}
|
||||
|
||||
pub fn deterministic_runtime_from_seed<F: std::future::Future<Output = ()>>(
|
||||
seed: &[u8],
|
||||
f: impl Fn() -> F,
|
||||
) {
|
||||
let seed = tokio::runtime::RngSeed::from_bytes(seed);
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_time()
|
||||
.start_paused(true)
|
||||
.rng_seed(seed)
|
||||
.build_local(Default::default())
|
||||
.unwrap();
|
||||
runtime.block_on(f());
|
||||
}
|
||||
|
||||
pub fn deterministic_runtime<F: std::future::Future<Output = ()>>(f: impl Fn() -> F) {
|
||||
let seed = seed();
|
||||
deterministic_runtime_from_seed(seed.as_bytes(), f);
|
||||
}
|
||||
}
|
||||
@@ -1,112 +0,0 @@
|
||||
use std::path::Path;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{errors::Error, filesystem::Filesystem, Result};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
|
||||
pub enum ActiveDatabase {
|
||||
/// Draft database is the only one from the pair which can accept writes
|
||||
/// It holds all local changes
|
||||
Draft,
|
||||
/// Synced database most of the time holds DB state from remote
|
||||
/// We can temporary apply changes from Draft DB to it - but they will be reseted almost immediately
|
||||
Synced,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
pub struct DatabaseMetadata {
|
||||
/// Latest generation from remote which was pulled locally to the Synced DB
|
||||
pub synced_generation: usize,
|
||||
/// Latest frame number from remote which was pulled locally to the Synced DB
|
||||
pub synced_frame_no: usize,
|
||||
/// Latest change_id from CDC table in Draft DB which was successfully pushed to the remote through Synced DB
|
||||
pub synced_change_id: Option<i64>,
|
||||
/// Optional field which will store change_id from CDC table in Draft DB which was successfully transferred to the SyncedDB
|
||||
/// but not durably pushed to the remote yet
|
||||
///
|
||||
/// This can happen if WAL push will abort in the middle due to network partition, application crash, etc
|
||||
pub transferred_change_id: Option<i64>,
|
||||
/// Current active databasel
|
||||
pub active_db: ActiveDatabase,
|
||||
}
|
||||
|
||||
impl DatabaseMetadata {
|
||||
pub async fn read_from(fs: &impl Filesystem, path: &Path) -> Result<Option<Self>> {
|
||||
tracing::debug!("try read metadata from: {:?}", path);
|
||||
if !fs.exists_file(path).await? {
|
||||
tracing::debug!("no metadata found at {:?}", path);
|
||||
return Ok(None);
|
||||
}
|
||||
let contents = fs.read_file(path).await?;
|
||||
let meta = serde_json::from_slice::<DatabaseMetadata>(&contents[..])?;
|
||||
tracing::debug!("read metadata from {:?}: {:?}", path, meta);
|
||||
Ok(Some(meta))
|
||||
}
|
||||
pub async fn write_to(&self, fs: &impl Filesystem, path: &Path) -> Result<()> {
|
||||
tracing::debug!("write metadata to {:?}: {:?}", path, self);
|
||||
let directory = path.parent().ok_or_else(|| {
|
||||
Error::MetadataError(format!(
|
||||
"unable to get parent of the provided path: {path:?}",
|
||||
))
|
||||
})?;
|
||||
let filename = path
|
||||
.file_name()
|
||||
.and_then(|x| x.to_str())
|
||||
.ok_or_else(|| Error::MetadataError(format!("unable to get filename: {path:?}")))?;
|
||||
|
||||
let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
|
||||
let timestamp = timestamp.map_err(|e| {
|
||||
Error::MetadataError(format!("failed to get current time for temp file: {e}"))
|
||||
})?;
|
||||
let temp_name = format!("{}.tmp.{}", filename, timestamp.as_nanos());
|
||||
let temp_path = directory.join(temp_name);
|
||||
|
||||
let data = serde_json::to_string(self)?;
|
||||
|
||||
let mut temp_file = fs.create_file(&temp_path).await?;
|
||||
let mut result = fs.write_file(&mut temp_file, data.as_bytes()).await;
|
||||
if result.is_ok() {
|
||||
result = fs.sync_file(&temp_file).await;
|
||||
}
|
||||
drop(temp_file);
|
||||
if result.is_ok() {
|
||||
result = fs.rename_file(&temp_path, path).await;
|
||||
}
|
||||
if result.is_err() {
|
||||
let _ = fs.remove_file(&temp_path).await.inspect_err(|e| {
|
||||
tracing::warn!("failed to remove temp file at {:?}: {}", temp_path, e)
|
||||
});
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
filesystem::tokio::TokioFilesystem,
|
||||
metadata::{ActiveDatabase, DatabaseMetadata},
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn metadata_simple_test() {
|
||||
let dir = tempfile::TempDir::new().unwrap();
|
||||
let path = dir.path().join("db-info");
|
||||
let meta = DatabaseMetadata {
|
||||
synced_generation: 1,
|
||||
synced_frame_no: 2,
|
||||
synced_change_id: Some(3),
|
||||
transferred_change_id: Some(4),
|
||||
active_db: ActiveDatabase::Draft,
|
||||
};
|
||||
let fs = TokioFilesystem();
|
||||
meta.write_to(&fs, &path).await.unwrap();
|
||||
|
||||
let read = DatabaseMetadata::read_from(&fs, &path)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(meta, read);
|
||||
}
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
!empty_wal_mode.db
|
||||
Binary file not shown.
@@ -1,46 +0,0 @@
|
||||
use crate::Result;
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
||||
pub mod turso;
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
pub struct DbSyncInfo {
|
||||
pub current_generation: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
pub struct DbSyncStatus {
|
||||
pub baton: Option<String>,
|
||||
pub status: String,
|
||||
pub generation: usize,
|
||||
pub max_frame_no: usize,
|
||||
}
|
||||
|
||||
pub trait Stream {
|
||||
fn read_chunk(
|
||||
&mut self,
|
||||
) -> impl std::future::Future<Output = Result<Option<hyper::body::Bytes>>> + Send;
|
||||
}
|
||||
|
||||
pub trait SyncServer {
|
||||
type Stream: Stream;
|
||||
fn db_info(&self) -> impl std::future::Future<Output = Result<DbSyncInfo>> + Send;
|
||||
fn db_export(
|
||||
&self,
|
||||
generation_id: usize,
|
||||
) -> impl std::future::Future<Output = Result<Self::Stream>> + Send;
|
||||
fn wal_pull(
|
||||
&self,
|
||||
generation_id: usize,
|
||||
start_frame: usize,
|
||||
) -> impl std::future::Future<Output = Result<Self::Stream>> + Send;
|
||||
fn wal_push(
|
||||
&self,
|
||||
baton: Option<String>,
|
||||
generation_id: usize,
|
||||
start_frame: usize,
|
||||
end_frame: usize,
|
||||
frames: Vec<u8>,
|
||||
) -> impl std::future::Future<Output = Result<DbSyncStatus>> + Send;
|
||||
}
|
||||
@@ -1,285 +0,0 @@
|
||||
use std::{collections::HashMap, path::Path, sync::Arc};
|
||||
|
||||
use tokio::sync::Mutex;
|
||||
use turso::{IntoParams, Value};
|
||||
|
||||
use crate::{
|
||||
errors::Error,
|
||||
sync_server::{DbSyncInfo, DbSyncStatus, Stream, SyncServer},
|
||||
test_context::TestContext,
|
||||
Result,
|
||||
};
|
||||
|
||||
struct Generation {
|
||||
snapshot: Vec<u8>,
|
||||
frames: Vec<Vec<u8>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SyncSession {
|
||||
baton: String,
|
||||
conn: turso::Connection,
|
||||
in_txn: bool,
|
||||
}
|
||||
|
||||
struct TestSyncServerState {
|
||||
generation: usize,
|
||||
generations: HashMap<usize, Generation>,
|
||||
sessions: HashMap<String, SyncSession>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TestSyncServerOpts {
|
||||
pub pull_batch_size: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TestSyncServer {
|
||||
ctx: Arc<TestContext>,
|
||||
db: turso::Database,
|
||||
opts: Arc<TestSyncServerOpts>,
|
||||
state: Arc<Mutex<TestSyncServerState>>,
|
||||
}
|
||||
|
||||
pub struct TestStream {
|
||||
ctx: Arc<TestContext>,
|
||||
data: Vec<u8>,
|
||||
position: usize,
|
||||
}
|
||||
|
||||
impl TestStream {
|
||||
pub fn new(ctx: Arc<TestContext>, data: Vec<u8>) -> Self {
|
||||
Self {
|
||||
ctx,
|
||||
data,
|
||||
position: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for TestStream {
|
||||
async fn read_chunk(&mut self) -> Result<Option<hyper::body::Bytes>> {
|
||||
self.ctx
|
||||
.faulty_call(if self.position == 0 {
|
||||
"read_chunk_first"
|
||||
} else {
|
||||
"read_chunk_next"
|
||||
})
|
||||
.await?;
|
||||
let size = (self.data.len() - self.position).min(FRAME_SIZE);
|
||||
if size == 0 {
|
||||
Ok(None)
|
||||
} else {
|
||||
let chunk = &self.data[self.position..self.position + size];
|
||||
self.position += size;
|
||||
Ok(Some(hyper::body::Bytes::copy_from_slice(chunk)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const PAGE_SIZE: usize = 4096;
|
||||
const FRAME_SIZE: usize = 24 + PAGE_SIZE;
|
||||
|
||||
impl SyncServer for TestSyncServer {
|
||||
type Stream = TestStream;
|
||||
async fn db_info(&self) -> Result<DbSyncInfo> {
|
||||
self.ctx.faulty_call("db_info").await?;
|
||||
|
||||
let state = self.state.lock().await;
|
||||
Ok(DbSyncInfo {
|
||||
current_generation: state.generation,
|
||||
})
|
||||
}
|
||||
|
||||
async fn db_export(&self, generation_id: usize) -> Result<TestStream> {
|
||||
self.ctx.faulty_call("db_export").await?;
|
||||
|
||||
let state = self.state.lock().await;
|
||||
let Some(generation) = state.generations.get(&generation_id) else {
|
||||
return Err(Error::DatabaseSyncError("generation not found".to_string()));
|
||||
};
|
||||
Ok(TestStream::new(
|
||||
self.ctx.clone(),
|
||||
generation.snapshot.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn wal_pull(&self, generation_id: usize, start_frame: usize) -> Result<TestStream> {
|
||||
tracing::debug!("wal_pull: {}/{}", generation_id, start_frame);
|
||||
self.ctx.faulty_call("wal_pull").await?;
|
||||
|
||||
let state = self.state.lock().await;
|
||||
let Some(generation) = state.generations.get(&generation_id) else {
|
||||
return Err(Error::DatabaseSyncError("generation not found".to_string()));
|
||||
};
|
||||
let mut data = Vec::new();
|
||||
for frame_no in start_frame..start_frame + self.opts.pull_batch_size {
|
||||
let frame_idx = frame_no - 1;
|
||||
let Some(frame) = generation.frames.get(frame_idx) else {
|
||||
break;
|
||||
};
|
||||
data.extend_from_slice(frame);
|
||||
}
|
||||
if data.is_empty() {
|
||||
let last_generation = state.generations.get(&state.generation).unwrap();
|
||||
return Err(Error::PullNeedCheckpoint(DbSyncStatus {
|
||||
baton: None,
|
||||
status: "checkpoint_needed".to_string(),
|
||||
generation: state.generation,
|
||||
max_frame_no: last_generation.frames.len(),
|
||||
}));
|
||||
}
|
||||
Ok(TestStream::new(self.ctx.clone(), data))
|
||||
}
|
||||
|
||||
async fn wal_push(
|
||||
&self,
|
||||
mut baton: Option<String>,
|
||||
generation_id: usize,
|
||||
start_frame: usize,
|
||||
end_frame: usize,
|
||||
frames: Vec<u8>,
|
||||
) -> Result<super::DbSyncStatus> {
|
||||
tracing::debug!(
|
||||
"wal_push: {}/{}/{}/{:?}",
|
||||
generation_id,
|
||||
start_frame,
|
||||
end_frame,
|
||||
baton
|
||||
);
|
||||
self.ctx.faulty_call("wal_push").await?;
|
||||
|
||||
let mut session = {
|
||||
let mut state = self.state.lock().await;
|
||||
if state.generation != generation_id {
|
||||
return Err(Error::DatabaseSyncError(
|
||||
"generation id mismatch".to_string(),
|
||||
));
|
||||
}
|
||||
let baton_str = baton.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
|
||||
let session = match state.sessions.get(&baton_str) {
|
||||
Some(session) => session.clone(),
|
||||
None => {
|
||||
let session = SyncSession {
|
||||
baton: baton_str.clone(),
|
||||
conn: self.db.connect()?,
|
||||
in_txn: false,
|
||||
};
|
||||
state.sessions.insert(baton_str.clone(), session.clone());
|
||||
session
|
||||
}
|
||||
};
|
||||
baton = Some(baton_str.clone());
|
||||
session
|
||||
};
|
||||
|
||||
let mut offset = 0;
|
||||
for frame_no in start_frame..end_frame {
|
||||
if offset + FRAME_SIZE > frames.len() {
|
||||
return Err(Error::DatabaseSyncError(
|
||||
"unexpected length of frames data".to_string(),
|
||||
));
|
||||
}
|
||||
if !session.in_txn {
|
||||
session.conn.wal_insert_begin()?;
|
||||
session.in_txn = true;
|
||||
}
|
||||
let frame = &frames[offset..offset + FRAME_SIZE];
|
||||
match session.conn.wal_insert_frame(frame_no as u64, frame) {
|
||||
Ok(info) => {
|
||||
if info.is_commit_frame() {
|
||||
if session.in_txn {
|
||||
session.conn.wal_insert_end()?;
|
||||
session.in_txn = false;
|
||||
}
|
||||
self.sync_frames_from_conn(&session.conn).await?;
|
||||
}
|
||||
}
|
||||
Err(turso::Error::WalOperationError(err)) if err.contains("Conflict") => {
|
||||
session.conn.wal_insert_end()?;
|
||||
return Err(Error::PushConflict);
|
||||
}
|
||||
Err(err) => {
|
||||
session.conn.wal_insert_end()?;
|
||||
return Err(err.into());
|
||||
}
|
||||
}
|
||||
offset += FRAME_SIZE;
|
||||
}
|
||||
let mut state = self.state.lock().await;
|
||||
state
|
||||
.sessions
|
||||
.insert(baton.clone().unwrap(), session.clone());
|
||||
Ok(DbSyncStatus {
|
||||
baton: Some(session.baton.clone()),
|
||||
status: "ok".into(),
|
||||
generation: state.generation,
|
||||
max_frame_no: session.conn.wal_frame_count()? as usize,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// empty DB with single 4096-byte page and WAL mode (PRAGMA journal_mode=WAL)
|
||||
// see test test_empty_wal_mode_db_content which validates asset content
|
||||
pub const EMPTY_WAL_MODE_DB: &[u8] = include_bytes!("empty_wal_mode.db");
|
||||
|
||||
pub async fn convert_rows(rows: &mut turso::Rows) -> Result<Vec<Vec<Value>>> {
|
||||
let mut rows_values = vec![];
|
||||
while let Some(row) = rows.next().await? {
|
||||
let mut row_values = vec![];
|
||||
for i in 0..row.column_count() {
|
||||
row_values.push(row.get_value(i)?);
|
||||
}
|
||||
rows_values.push(row_values);
|
||||
}
|
||||
Ok(rows_values)
|
||||
}
|
||||
|
||||
impl TestSyncServer {
|
||||
pub async fn new(ctx: Arc<TestContext>, path: &Path, opts: TestSyncServerOpts) -> Result<Self> {
|
||||
let mut generations = HashMap::new();
|
||||
generations.insert(
|
||||
1,
|
||||
Generation {
|
||||
snapshot: EMPTY_WAL_MODE_DB.to_vec(),
|
||||
frames: Vec::new(),
|
||||
},
|
||||
);
|
||||
Ok(Self {
|
||||
ctx,
|
||||
db: turso::Builder::new_local(path.to_str().unwrap())
|
||||
.build()
|
||||
.await?,
|
||||
opts: Arc::new(opts),
|
||||
state: Arc::new(Mutex::new(TestSyncServerState {
|
||||
generation: 1,
|
||||
generations,
|
||||
sessions: HashMap::new(),
|
||||
})),
|
||||
})
|
||||
}
|
||||
pub fn db(&self) -> turso::Database {
|
||||
self.db.clone()
|
||||
}
|
||||
pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<()> {
|
||||
let conn = self.db.connect()?;
|
||||
conn.execute(sql, params).await?;
|
||||
self.sync_frames_from_conn(&conn).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn sync_frames_from_conn(&self, conn: &turso::Connection) -> Result<()> {
|
||||
let mut state = self.state.lock().await;
|
||||
let generation = state.generation;
|
||||
let generation = state.generations.get_mut(&generation).unwrap();
|
||||
let last_frame = generation.frames.len() + 1;
|
||||
let mut frame = [0u8; FRAME_SIZE];
|
||||
let wal_frame_count = conn.wal_frame_count()?;
|
||||
tracing::debug!("conn frames count: {}", wal_frame_count);
|
||||
for frame_no in last_frame..=wal_frame_count as usize {
|
||||
conn.wal_get_frame(frame_no as u64, &mut frame)?;
|
||||
tracing::debug!("push local frame {}", frame_no);
|
||||
generation.frames.push(frame.to_vec());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,202 +0,0 @@
|
||||
use std::io::Read;
|
||||
|
||||
use http::request;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::body::{Buf, Bytes};
|
||||
|
||||
use crate::{
|
||||
errors::Error,
|
||||
sync_server::{DbSyncInfo, DbSyncStatus, Stream, SyncServer},
|
||||
Result,
|
||||
};
|
||||
|
||||
pub type Client = hyper_util::client::legacy::Client<
|
||||
hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
|
||||
http_body_util::Full<Bytes>,
|
||||
>;
|
||||
|
||||
const DEFAULT_PULL_BATCH_SIZE: usize = 100;
|
||||
|
||||
pub struct TursoSyncServerOpts {
|
||||
pub sync_url: String,
|
||||
pub auth_token: Option<String>,
|
||||
pub encryption_key: Option<String>,
|
||||
pub pull_batch_size: Option<usize>,
|
||||
}
|
||||
|
||||
pub struct TursoSyncServer {
|
||||
client: Client,
|
||||
auth_token_header: Option<hyper::header::HeaderValue>,
|
||||
opts: TursoSyncServerOpts,
|
||||
}
|
||||
|
||||
fn sync_server_error(status: http::StatusCode, body: impl Buf) -> Error {
|
||||
let mut body_str = String::new();
|
||||
if let Err(e) = body.reader().read_to_string(&mut body_str) {
|
||||
Error::SyncServerError(status, format!("unable to read response body: {e}"))
|
||||
} else {
|
||||
Error::SyncServerError(status, body_str)
|
||||
}
|
||||
}
|
||||
|
||||
async fn aggregate_body(body: hyper::body::Incoming) -> Result<impl Buf> {
|
||||
let chunks = body.collect().await;
|
||||
let chunks = chunks.map_err(Error::HyperResponse)?;
|
||||
Ok(chunks.aggregate())
|
||||
}
|
||||
|
||||
pub struct HyperStream {
|
||||
body: hyper::body::Incoming,
|
||||
}
|
||||
|
||||
impl Stream for HyperStream {
|
||||
async fn read_chunk(&mut self) -> Result<Option<hyper::body::Bytes>> {
|
||||
let Some(frame) = self.body.frame().await else {
|
||||
return Ok(None);
|
||||
};
|
||||
let frame = frame.map_err(Error::HyperResponse)?;
|
||||
let frame = frame
|
||||
.into_data()
|
||||
.map_err(|_| Error::DatabaseSyncError("failed to read export chunk".to_string()))?;
|
||||
Ok(Some(frame))
|
||||
}
|
||||
}
|
||||
|
||||
impl TursoSyncServer {
|
||||
pub fn new(client: Client, opts: TursoSyncServerOpts) -> Result<Self> {
|
||||
let auth_token_header = opts
|
||||
.auth_token
|
||||
.as_ref()
|
||||
.map(|token| hyper::header::HeaderValue::from_str(&format!("Bearer {token}")))
|
||||
.transpose()
|
||||
.map_err(|e| Error::Http(e.into()))?;
|
||||
Ok(Self {
|
||||
client,
|
||||
opts,
|
||||
auth_token_header,
|
||||
})
|
||||
}
|
||||
async fn send(
|
||||
&self,
|
||||
method: http::Method,
|
||||
url: &str,
|
||||
body: http_body_util::Full<Bytes>,
|
||||
) -> Result<(http::StatusCode, hyper::body::Incoming)> {
|
||||
let url: hyper::Uri = url.parse().map_err(Error::Uri)?;
|
||||
let mut request = request::Builder::new().uri(url).method(method);
|
||||
if let Some(auth_token_header) = &self.auth_token_header {
|
||||
request = request.header("Authorization", auth_token_header);
|
||||
}
|
||||
if let Some(encryption_key) = &self.opts.encryption_key {
|
||||
request = request.header("x-turso-encryption-key", encryption_key);
|
||||
}
|
||||
let request = request.body(body).map_err(Error::Http)?;
|
||||
let response = self.client.request(request).await;
|
||||
let response = response.map_err(Error::HyperRequest)?;
|
||||
let status = response.status();
|
||||
Ok((status, response.into_body()))
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncServer for TursoSyncServer {
|
||||
type Stream = HyperStream;
|
||||
async fn db_info(&self) -> Result<DbSyncInfo> {
|
||||
tracing::debug!("db_info");
|
||||
let url = format!("{}/info", self.opts.sync_url);
|
||||
let empty = http_body_util::Full::new(Bytes::new());
|
||||
let (status, body) = self.send(http::Method::GET, &url, empty).await?;
|
||||
let body = aggregate_body(body).await?;
|
||||
|
||||
if !status.is_success() {
|
||||
return Err(sync_server_error(status, body));
|
||||
}
|
||||
|
||||
let info = serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?;
|
||||
tracing::debug!("db_info response: {:?}", info);
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
async fn wal_push(
|
||||
&self,
|
||||
baton: Option<String>,
|
||||
generation_id: usize,
|
||||
start_frame: usize,
|
||||
end_frame: usize,
|
||||
frames: Vec<u8>,
|
||||
) -> Result<DbSyncStatus> {
|
||||
tracing::debug!(
|
||||
"wal_push: {}/{}/{} (baton: {:?})",
|
||||
generation_id,
|
||||
start_frame,
|
||||
end_frame,
|
||||
baton
|
||||
);
|
||||
let url = if let Some(baton) = baton {
|
||||
format!(
|
||||
"{}/sync/{}/{}/{}/{}",
|
||||
self.opts.sync_url, generation_id, start_frame, end_frame, baton
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
"{}/sync/{}/{}/{}",
|
||||
self.opts.sync_url, generation_id, start_frame, end_frame
|
||||
)
|
||||
};
|
||||
let body = http_body_util::Full::new(Bytes::from(frames));
|
||||
let (status_code, body) = self.send(http::Method::POST, &url, body).await?;
|
||||
let body = aggregate_body(body).await?;
|
||||
|
||||
if !status_code.is_success() {
|
||||
return Err(sync_server_error(status_code, body));
|
||||
}
|
||||
|
||||
let status: DbSyncStatus =
|
||||
serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?;
|
||||
|
||||
match status.status.as_str() {
|
||||
"ok" => Ok(status),
|
||||
"conflict" => Err(Error::PushConflict),
|
||||
"push_needed" => Err(Error::PushInconsistent(status)),
|
||||
_ => Err(Error::SyncServerUnexpectedStatus(status)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn db_export(&self, generation_id: usize) -> Result<HyperStream> {
|
||||
tracing::debug!("db_export: {}", generation_id);
|
||||
let url = format!("{}/export/{}", self.opts.sync_url, generation_id);
|
||||
let empty = http_body_util::Full::new(Bytes::new());
|
||||
let (status, body) = self.send(http::Method::GET, &url, empty).await?;
|
||||
if !status.is_success() {
|
||||
let body = aggregate_body(body).await?;
|
||||
return Err(sync_server_error(status, body));
|
||||
}
|
||||
Ok(HyperStream { body })
|
||||
}
|
||||
|
||||
async fn wal_pull(&self, generation_id: usize, start_frame: usize) -> Result<HyperStream> {
|
||||
let batch = self.opts.pull_batch_size.unwrap_or(DEFAULT_PULL_BATCH_SIZE);
|
||||
let end_frame = start_frame + batch;
|
||||
tracing::debug!("wall_pull: {}/{}/{}", generation_id, start_frame, end_frame);
|
||||
let url = format!(
|
||||
"{}/sync/{}/{}/{}",
|
||||
self.opts.sync_url, generation_id, start_frame, end_frame
|
||||
);
|
||||
let empty = http_body_util::Full::new(Bytes::new());
|
||||
let (status, body) = self.send(http::Method::GET, &url, empty).await?;
|
||||
if status == http::StatusCode::BAD_REQUEST {
|
||||
let body = aggregate_body(body).await?;
|
||||
let status: DbSyncStatus =
|
||||
serde_json::from_reader(body.reader()).map_err(Error::JsonDecode)?;
|
||||
if status.status == "checkpoint_needed" {
|
||||
return Err(Error::PullNeedCheckpoint(status));
|
||||
} else {
|
||||
return Err(Error::SyncServerUnexpectedStatus(status));
|
||||
}
|
||||
}
|
||||
if !status.is_success() {
|
||||
let body = aggregate_body(body).await?;
|
||||
return Err(sync_server_error(status, body));
|
||||
}
|
||||
Ok(HyperStream { body })
|
||||
}
|
||||
}
|
||||
@@ -1,139 +0,0 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use rand::SeedableRng;
|
||||
use rand_chacha::ChaCha8Rng;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::{errors::Error, Result};
|
||||
|
||||
type PinnedFuture = Pin<Box<dyn Future<Output = bool> + Send>>;
|
||||
|
||||
pub struct FaultInjectionPlan {
|
||||
pub is_fault: Box<dyn Fn(String, String) -> PinnedFuture + Send + Sync>,
|
||||
}
|
||||
|
||||
pub enum FaultInjectionStrategy {
|
||||
Disabled,
|
||||
Record,
|
||||
Enabled { plan: FaultInjectionPlan },
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for FaultInjectionStrategy {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Disabled => write!(f, "Disabled"),
|
||||
Self::Record => write!(f, "Record"),
|
||||
Self::Enabled { .. } => write!(f, "Enabled"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestContext {
|
||||
fault_injection: Mutex<FaultInjectionStrategy>,
|
||||
faulty_call: Mutex<HashSet<(String, String)>>,
|
||||
rng: Mutex<ChaCha8Rng>,
|
||||
}
|
||||
|
||||
pub struct FaultSession {
|
||||
ctx: Arc<TestContext>,
|
||||
recording: bool,
|
||||
plans: Option<Vec<FaultInjectionPlan>>,
|
||||
}
|
||||
|
||||
impl FaultSession {
|
||||
pub async fn next(&mut self) -> Option<FaultInjectionStrategy> {
|
||||
if !self.recording {
|
||||
self.recording = true;
|
||||
return Some(FaultInjectionStrategy::Record);
|
||||
}
|
||||
if self.plans.is_none() {
|
||||
self.plans = Some(self.ctx.enumerate_simple_plans().await);
|
||||
}
|
||||
|
||||
let plans = self.plans.as_mut().unwrap();
|
||||
if plans.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let plan = plans.pop().unwrap();
|
||||
Some(FaultInjectionStrategy::Enabled { plan })
|
||||
}
|
||||
}
|
||||
|
||||
impl TestContext {
|
||||
pub fn new(seed: u64) -> Self {
|
||||
Self {
|
||||
rng: Mutex::new(ChaCha8Rng::seed_from_u64(seed)),
|
||||
fault_injection: Mutex::new(FaultInjectionStrategy::Disabled),
|
||||
faulty_call: Mutex::new(HashSet::new()),
|
||||
}
|
||||
}
|
||||
pub async fn rng(&self) -> tokio::sync::MutexGuard<ChaCha8Rng> {
|
||||
self.rng.lock().await
|
||||
}
|
||||
pub fn fault_session(self: &Arc<Self>) -> FaultSession {
|
||||
FaultSession {
|
||||
ctx: self.clone(),
|
||||
recording: false,
|
||||
plans: None,
|
||||
}
|
||||
}
|
||||
pub async fn switch_mode(&self, updated: FaultInjectionStrategy) {
|
||||
let mut mode = self.fault_injection.lock().await;
|
||||
tracing::info!("switch fault injection mode: {:?}", updated);
|
||||
*mode = updated;
|
||||
}
|
||||
pub async fn enumerate_simple_plans(&self) -> Vec<FaultInjectionPlan> {
|
||||
let mut plans = vec![];
|
||||
for call in self.faulty_call.lock().await.iter() {
|
||||
let mut fault_counts = HashMap::new();
|
||||
fault_counts.insert(call.clone(), 1);
|
||||
|
||||
let count = Arc::new(Mutex::new(1));
|
||||
let call = call.clone();
|
||||
plans.push(FaultInjectionPlan {
|
||||
is_fault: Box::new(move |name, bt| {
|
||||
let call = call.clone();
|
||||
let count = count.clone();
|
||||
Box::pin(async move {
|
||||
if (name, bt) != call {
|
||||
return false;
|
||||
}
|
||||
let mut count = count.lock().await;
|
||||
*count -= 1;
|
||||
*count >= 0
|
||||
})
|
||||
}),
|
||||
})
|
||||
}
|
||||
plans
|
||||
}
|
||||
pub async fn faulty_call(&self, name: &str) -> Result<()> {
|
||||
tracing::trace!("faulty_call: {}", name);
|
||||
tokio::task::yield_now().await;
|
||||
if let FaultInjectionStrategy::Disabled = &*self.fault_injection.lock().await {
|
||||
return Ok(());
|
||||
}
|
||||
let bt = std::backtrace::Backtrace::force_capture().to_string();
|
||||
match &mut *self.fault_injection.lock().await {
|
||||
FaultInjectionStrategy::Record => {
|
||||
let mut call_sites = self.faulty_call.lock().await;
|
||||
call_sites.insert((name.to_string(), bt));
|
||||
Ok(())
|
||||
}
|
||||
FaultInjectionStrategy::Enabled { plan } => {
|
||||
if plan.is_fault.as_ref()(name.to_string(), bt.clone()).await {
|
||||
Err(Error::DatabaseSyncError("injected fault".to_string()))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
_ => unreachable!("Disabled case handled above"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,210 +0,0 @@
|
||||
use crate::{errors::Error, Result};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum DatabaseChangeType {
|
||||
Delete,
|
||||
Update,
|
||||
Insert,
|
||||
}
|
||||
|
||||
/// [DatabaseChange] struct represents data from CDC table as-i
|
||||
/// (see `turso_cdc_table_columns` definition in turso-core)
|
||||
#[derive(Clone)]
|
||||
pub struct DatabaseChange {
|
||||
/// Monotonically incrementing change number
|
||||
pub change_id: i64,
|
||||
/// Unix timestamp of the change (not guaranteed to be strictly monotonic as host clocks can drift)
|
||||
pub change_time: u64,
|
||||
/// Type of the change
|
||||
pub change_type: DatabaseChangeType,
|
||||
/// Table of the change
|
||||
pub table_name: String,
|
||||
/// Rowid of changed row
|
||||
pub id: i64,
|
||||
/// Binary record of the row before the change, if CDC pragma set to either 'before' or 'full'
|
||||
pub before: Option<Vec<u8>>,
|
||||
/// Binary record of the row after the change, if CDC pragma set to either 'after' or 'full'
|
||||
pub after: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl DatabaseChange {
|
||||
/// Converts [DatabaseChange] into the operation which effect will be the application of the change
|
||||
pub fn into_apply(self) -> Result<DatabaseTapeRowChange> {
|
||||
let tape_change = match self.change_type {
|
||||
DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Delete,
|
||||
DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update {
|
||||
bin_record: self.after.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(
|
||||
"cdc_mode must be set to either 'full' or 'after'".to_string(),
|
||||
)
|
||||
})?,
|
||||
},
|
||||
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Insert {
|
||||
bin_record: self.after.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(
|
||||
"cdc_mode must be set to either 'full' or 'after'".to_string(),
|
||||
)
|
||||
})?,
|
||||
},
|
||||
};
|
||||
Ok(DatabaseTapeRowChange {
|
||||
change_id: self.change_id,
|
||||
change_time: self.change_time,
|
||||
change: tape_change,
|
||||
table_name: self.table_name,
|
||||
id: self.id,
|
||||
})
|
||||
}
|
||||
/// Converts [DatabaseChange] into the operation which effect will be the revert of the change
|
||||
pub fn into_revert(self) -> Result<DatabaseTapeRowChange> {
|
||||
let tape_change = match self.change_type {
|
||||
DatabaseChangeType::Delete => DatabaseTapeRowChangeType::Insert {
|
||||
bin_record: self.before.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(
|
||||
"cdc_mode must be set to either 'full' or 'before'".to_string(),
|
||||
)
|
||||
})?,
|
||||
},
|
||||
DatabaseChangeType::Update => DatabaseTapeRowChangeType::Update {
|
||||
bin_record: self.before.ok_or_else(|| {
|
||||
Error::DatabaseTapeError(
|
||||
"cdc_mode must be set to either 'full' or 'before'".to_string(),
|
||||
)
|
||||
})?,
|
||||
},
|
||||
DatabaseChangeType::Insert => DatabaseTapeRowChangeType::Delete,
|
||||
};
|
||||
Ok(DatabaseTapeRowChange {
|
||||
change_id: self.change_id,
|
||||
change_time: self.change_time,
|
||||
change: tape_change,
|
||||
table_name: self.table_name,
|
||||
id: self.id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DatabaseChange {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DatabaseChange")
|
||||
.field("change_id", &self.change_id)
|
||||
.field("change_time", &self.change_time)
|
||||
.field("change_type", &self.change_type)
|
||||
.field("table_name", &self.table_name)
|
||||
.field("id", &self.id)
|
||||
.field("before.len()", &self.before.as_ref().map(|x| x.len()))
|
||||
.field("after.len()", &self.after.as_ref().map(|x| x.len()))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<turso::Row> for DatabaseChange {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(row: turso::Row) -> Result<Self> {
|
||||
let change_id = get_value_i64(&row, 0)?;
|
||||
let change_time = get_value_i64(&row, 1)? as u64;
|
||||
let change_type = get_value_i64(&row, 2)?;
|
||||
let table_name = get_value_text(&row, 3)?;
|
||||
let id = get_value_i64(&row, 4)?;
|
||||
let before = get_value_blob_or_null(&row, 5)?;
|
||||
let after = get_value_blob_or_null(&row, 6)?;
|
||||
|
||||
let change_type = match change_type {
|
||||
-1 => DatabaseChangeType::Delete,
|
||||
0 => DatabaseChangeType::Update,
|
||||
1 => DatabaseChangeType::Insert,
|
||||
v => {
|
||||
return Err(Error::DatabaseTapeError(format!(
|
||||
"unexpected change type: expected -1|0|1, got '{v:?}'"
|
||||
)))
|
||||
}
|
||||
};
|
||||
Ok(Self {
|
||||
change_id,
|
||||
change_time,
|
||||
change_type,
|
||||
table_name,
|
||||
id,
|
||||
before,
|
||||
after,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub enum DatabaseTapeRowChangeType {
|
||||
Delete,
|
||||
Update { bin_record: Vec<u8> },
|
||||
Insert { bin_record: Vec<u8> },
|
||||
}
|
||||
|
||||
/// [DatabaseTapeOperation] extends [DatabaseTapeRowChange] by adding information about transaction boundary
|
||||
///
|
||||
/// This helps [crate::database_tape::DatabaseTapeSession] to properly maintain transaction state and COMMIT or ROLLBACK changes in appropriate time
|
||||
/// by consuming events from [crate::database_tape::DatabaseChangesIterator]
|
||||
#[derive(Debug)]
|
||||
pub enum DatabaseTapeOperation {
|
||||
RowChange(DatabaseTapeRowChange),
|
||||
Commit,
|
||||
}
|
||||
|
||||
/// [DatabaseTapeRowChange] is the specific operation over single row which can be performed on database
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseTapeRowChange {
|
||||
pub change_id: i64,
|
||||
pub change_time: u64,
|
||||
pub change: DatabaseTapeRowChangeType,
|
||||
pub table_name: String,
|
||||
pub id: i64,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DatabaseTapeRowChangeType {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Delete => write!(f, "Delete"),
|
||||
Self::Update { bin_record } => f
|
||||
.debug_struct("Update")
|
||||
.field("bin_record.len()", &bin_record.len())
|
||||
.finish(),
|
||||
Self::Insert { bin_record } => f
|
||||
.debug_struct("Insert")
|
||||
.field("bin_record.len()", &bin_record.len())
|
||||
.finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_value(row: &turso::Row, index: usize) -> Result<turso::Value> {
|
||||
row.get_value(index).map_err(Error::TursoError)
|
||||
}
|
||||
|
||||
fn get_value_i64(row: &turso::Row, index: usize) -> Result<i64> {
|
||||
let v = get_value(row, index)?;
|
||||
match v {
|
||||
turso::Value::Integer(v) => Ok(v),
|
||||
v => Err(Error::DatabaseTapeError(format!(
|
||||
"column {index} type mismatch: expected integer, got '{v:?}'"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_value_text(row: &turso::Row, index: usize) -> Result<String> {
|
||||
let v = get_value(row, index)?;
|
||||
match v {
|
||||
turso::Value::Text(x) => Ok(x),
|
||||
v => Err(Error::DatabaseTapeError(format!(
|
||||
"column {index} type mismatch: expected string, got '{v:?}'"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_value_blob_or_null(row: &turso::Row, index: usize) -> Result<Option<Vec<u8>>> {
|
||||
let v = get_value(row, index)?;
|
||||
match v {
|
||||
turso::Value::Null => Ok(None),
|
||||
turso::Value::Blob(x) => Ok(Some(x)),
|
||||
v => Err(Error::DatabaseTapeError(format!(
|
||||
"column {index} type mismatch: expected blob, got '{v:?}'"
|
||||
))),
|
||||
}
|
||||
}
|
||||
@@ -1,40 +0,0 @@
|
||||
use crate::Result;
|
||||
|
||||
pub struct WalSession<'a> {
|
||||
conn: &'a turso::Connection,
|
||||
in_txn: bool,
|
||||
}
|
||||
|
||||
impl<'a> WalSession<'a> {
|
||||
pub fn new(conn: &'a turso::Connection) -> Self {
|
||||
Self {
|
||||
conn,
|
||||
in_txn: false,
|
||||
}
|
||||
}
|
||||
pub fn begin(&mut self) -> Result<()> {
|
||||
assert!(!self.in_txn);
|
||||
self.conn.wal_insert_begin()?;
|
||||
self.in_txn = true;
|
||||
Ok(())
|
||||
}
|
||||
pub fn end(&mut self) -> Result<()> {
|
||||
assert!(self.in_txn);
|
||||
self.conn.wal_insert_end()?;
|
||||
self.in_txn = false;
|
||||
Ok(())
|
||||
}
|
||||
pub fn in_txn(&self) -> bool {
|
||||
self.in_txn
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for WalSession<'a> {
|
||||
fn drop(&mut self) {
|
||||
if self.in_txn {
|
||||
let _ = self
|
||||
.end()
|
||||
.inspect_err(|e| tracing::error!("failed to close WAL session: {}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user