From 33ef1aa0da7dfe9416025e26e98f9cf9d48c9119 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Tue, 12 Aug 2025 23:33:28 +0400 Subject: [PATCH 1/6] add tracing option to the SyncEngine --- packages/turso-sync-js/Cargo.toml | 1 + packages/turso-sync-js/index.d.ts | 1 + packages/turso-sync-js/src/lib.rs | 23 ++++++++++++++++++++++- 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/packages/turso-sync-js/Cargo.toml b/packages/turso-sync-js/Cargo.toml index 78b584099..e16f64891 100644 --- a/packages/turso-sync-js/Cargo.toml +++ b/packages/turso-sync-js/Cargo.toml @@ -17,6 +17,7 @@ turso_sync_engine = { workspace = true } turso_core = { workspace = true } turso_node = { workspace = true } genawaiter = { version = "0.99.1", default-features = false } +tracing-subscriber = "0.3.19" [build-dependencies] napi-build = "2.2.3" diff --git a/packages/turso-sync-js/index.d.ts b/packages/turso-sync-js/index.d.ts index 737681857..fe82c3575 100644 --- a/packages/turso-sync-js/index.d.ts +++ b/packages/turso-sync-js/index.d.ts @@ -160,4 +160,5 @@ export interface SyncEngineOpts { path: string clientName?: string walPullBatchSize?: number + enableTracing?: boolean } diff --git a/packages/turso-sync-js/src/lib.rs b/packages/turso-sync-js/src/lib.rs index 47711603f..e83b8d9ec 100644 --- a/packages/turso-sync-js/src/lib.rs +++ b/packages/turso-sync-js/src/lib.rs @@ -3,10 +3,11 @@ pub mod generator; pub mod js_protocol_io; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, OnceLock}; use napi::bindgen_prelude::AsyncTask; use napi_derive::napi; +use tracing_subscriber::{filter::LevelFilter, fmt::format::FmtSpan}; use turso_node::IoLoopTask; use turso_sync_engine::{ database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts}, @@ -39,12 +40,32 @@ pub struct SyncEngineOpts { pub path: String, pub client_name: Option, pub wal_pull_batch_size: Option, + pub enable_tracing: Option, +} + +static TRACING_INIT: OnceLock<()> = OnceLock::new(); +fn init_tracing(level_filter: LevelFilter) { + TRACING_INIT.get_or_init(|| { + tracing_subscriber::fmt() + .with_ansi(false) + .with_thread_ids(true) + .with_span_events(FmtSpan::ACTIVE) + .with_max_level(level_filter) + .init(); + }); } #[napi] impl SyncEngine { #[napi(constructor)] pub fn new(opts: SyncEngineOpts) -> napi::Result { + // helpful for local debugging + match opts.enable_tracing.as_deref() { + Some("info") => init_tracing(LevelFilter::INFO), + Some("debug") => init_tracing(LevelFilter::DEBUG), + Some("trace") => init_tracing(LevelFilter::TRACE), + _ => {} + } let is_memory = opts.path == ":memory:"; let io: Arc = if is_memory { Arc::new(turso_core::MemoryIO::new()) From 2ca8a15dcc8ec79eddfaf7068fab4e1aa3241506 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 13 Aug 2025 12:29:03 +0400 Subject: [PATCH 2/6] switch from Buffer to Vec for now as buffers are not supported by default in browser --- packages/turso-sync-js/src/js_protocol_io.rs | 4 ++-- packages/turso-sync-js/sync_engine.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/turso-sync-js/src/js_protocol_io.rs b/packages/turso-sync-js/src/js_protocol_io.rs index 40044f6d2..fffd7d026 100644 --- a/packages/turso-sync-js/src/js_protocol_io.rs +++ b/packages/turso-sync-js/src/js_protocol_io.rs @@ -14,7 +14,7 @@ pub enum JsProtocolRequest { Http { method: String, path: String, - body: Option, + body: Option>, }, FullRead { path: String, @@ -134,7 +134,7 @@ impl ProtocolIO for JsProtocolIo { Ok(self.add_request(JsProtocolRequest::Http { method: method.to_string(), path: path.to_string(), - body: body.map(Buffer::from), + body, })) } diff --git a/packages/turso-sync-js/sync_engine.ts b/packages/turso-sync-js/sync_engine.ts index 4a001dfae..dac182a76 100644 --- a/packages/turso-sync-js/sync_engine.ts +++ b/packages/turso-sync-js/sync_engine.ts @@ -66,7 +66,7 @@ async function process(opts, request) { const response = await fetch(`${opts.url}${requestType.path}`, { method: requestType.method, headers: opts.headers, - body: requestType.body + body: requestType.body != null ? new Uint8Array(requestType.body) : null, }); completion.status(response.status); const reader = response.body.getReader(); From 615207fb9c2896a5cbf0ee4747d811f20029bde8 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 13 Aug 2025 12:29:39 +0400 Subject: [PATCH 3/6] use proper event loop in core connection in order to handle all cases properly - otherwise, in case of schema change, connection will constantly get Database schema is changed error as reprepare logic is implemented in the statement event loop --- Cargo.lock | 1 + core/lib.rs | 47 +++++++++++++++++------------------------------ 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8081dcc3f..545aa6095 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4028,6 +4028,7 @@ dependencies = [ "napi", "napi-build", "napi-derive", + "tracing-subscriber", "turso_core", "turso_node", "turso_sync_engine", diff --git a/core/lib.rs b/core/lib.rs index 85bf799c1..0f0e9f618 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -983,21 +983,8 @@ impl Connection { input, )?; - let mut stmt = - Statement::new(program, self._db.mv_store.clone(), pager.clone()); - - loop { - match stmt.step()? { - vdbe::StepResult::Done => { - break; - } - vdbe::StepResult::IO => stmt.run_once()?, - vdbe::StepResult::Row => {} - vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => { - return Err(LimboError::Busy) - } - } - } + Statement::new(program, self._db.mv_store.clone(), pager.clone()) + .run_ignore_rows()?; } _ => unreachable!(), } @@ -1118,21 +1105,8 @@ impl Connection { input, )?; - let mut stmt = - Statement::new(program, self._db.mv_store.clone(), pager.clone()); - - loop { - match stmt.step()? { - vdbe::StepResult::Done => { - break; - } - vdbe::StepResult::IO => stmt.run_once()?, - vdbe::StepResult::Row => {} - vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => { - return Err(LimboError::Busy) - } - } - } + Statement::new(program, self._db.mv_store.clone(), pager.clone()) + .run_ignore_rows()?; } } } @@ -1970,6 +1944,19 @@ impl Statement { res } + pub(crate) fn run_ignore_rows(&mut self) -> Result<()> { + loop { + match self.step()? { + vdbe::StepResult::Done => return Ok(()), + vdbe::StepResult::IO => self.run_once()?, + vdbe::StepResult::Row => continue, + vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => { + return Err(LimboError::Busy) + } + } + } + } + #[instrument(skip_all, level = Level::DEBUG)] fn reprepare(&mut self) -> Result<()> { tracing::trace!("repreparing statement"); From 80476b3069f2cd460f368e10b4b2ef51f7608077 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Tue, 12 Aug 2025 23:34:01 +0400 Subject: [PATCH 4/6] bypass database registry for all dbs which path starts with :memory: prefix - sync engine create pair of databases and they must be isolated but live in the same MemoryIO - the problem can happen if there will be 2 sync engines with MemoryIO storage layer - as they all will create :memory:-draft and :memory:-synced DBs --- core/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 0f0e9f618..c260694cf 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -255,8 +255,8 @@ impl Database { enable_indexes: bool, enable_views: bool, ) -> Result> { - if path == ":memory:" { - return Self::do_open_with_flags( + if path.starts_with(":memory:") { + return Self::open_with_flags_bypass_registry( io, path, db_file, @@ -277,7 +277,7 @@ impl Database { if let Some(db) = registry.get(&canonical_path).and_then(Weak::upgrade) { return Ok(db); } - let db = Self::do_open_with_flags( + let db = Self::open_with_flags_bypass_registry( io, path, db_file, @@ -291,7 +291,7 @@ impl Database { } #[allow(clippy::arc_with_non_send_sync)] - fn do_open_with_flags( + fn open_with_flags_bypass_registry( io: Arc, path: &str, db_file: Arc, From eff8d8540d1e83214459822ac6eeb0d3409ecc24 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 13 Aug 2025 17:08:07 +0400 Subject: [PATCH 5/6] fix bug and add test with concurrent dbs in sync --- .../src/database_sync_engine.rs | 102 ++++++++++++++++++ .../src/database_sync_operations.rs | 4 + 2 files changed, 106 insertions(+) diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index 4621b61d5..a19b85be8 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -456,6 +456,7 @@ pub mod tests { use crate::{ database_sync_engine::DatabaseSyncEngineOpts, + errors::Error, test_context::{FaultInjectionStrategy, TestContext}, test_protocol_io::TestProtocolIo, test_sync_server::convert_rows, @@ -645,6 +646,107 @@ pub mod tests { }); } + #[test] + pub fn test_sync_many_dbs_update_sync_concurrent() { + deterministic_runtime(async || { + let io: Arc = Arc::new(turso_core::MemoryIO::new()); + let dir = tempfile::TempDir::new().unwrap(); + let server_path = dir.path().join("server.db"); + let ctx = Arc::new(TestContext::new(seed_u64())); + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) + .await + .unwrap(); + protocol + .server + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ()) + .await + .unwrap(); + protocol + .server + .execute( + "INSERT INTO t VALUES ('id-1', 'client1'), ('id-2', 'client2')", + (), + ) + .await + .unwrap(); + let mut runner1 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); + runner1 + .init( + ":memory:-1", + DatabaseSyncEngineOpts { + client_name: "id-1".to_string(), + wal_pull_batch_size: 2, + }, + ) + .await + .unwrap(); + let mut runner2 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); + runner2 + .init( + ":memory:-2", + DatabaseSyncEngineOpts { + client_name: "id-2".to_string(), + wal_pull_batch_size: 2, + }, + ) + .await + .unwrap(); + + let conn1 = runner1.connect().await.unwrap(); + let conn2 = runner2.connect().await.unwrap(); + + let syncs1 = async move { + for i in 0..10 { + tracing::info!("sync attempt #{i}"); + match runner1.sync().await { + Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue, + Err(err) => panic!("unexpected error: {err}"), + } + } + }; + + let syncs2 = async move { + for i in 0..10 { + tracing::info!("sync attempt #{i}"); + match runner2.sync().await { + Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue, + Err(err) => panic!("unexpected error: {err}"), + } + } + }; + + let ctx1 = ctx.clone(); + let updates1 = async move { + for i in 0..100 { + tracing::info!("update attempt #{i}"); + let sql = format!("INSERT INTO t VALUES ('key-1-{i}', 'value')"); + match conn1.execute(&sql, ()).await { + Ok(_) => {} + Err(err) if err.to_string().contains("database is locked") => {} + Err(err) => panic!("update failed: {err}"), + } + ctx1.random_sleep_n(10).await; + } + }; + + let ctx2 = ctx.clone(); + let updates2 = async move { + for i in 0..100 { + tracing::info!("update attempt #{i}"); + let sql = format!("INSERT INTO t VALUES ('key-2-{i}', 'value')"); + match conn2.execute(&sql, ()).await { + Ok(_) => {} + Err(err) if err.to_string().contains("database is locked") => {} + Err(err) => panic!("update failed: {err}"), + } + ctx2.random_sleep_n(10).await; + } + }; + + join!(updates1, updates2, syncs1, syncs2); + }); + } + #[test] pub fn test_sync_single_db_many_pulls_big_payloads() { deterministic_runtime(async || { diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 8f21868af..84d968383 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -179,6 +179,10 @@ pub async fn wal_pull<'a, C: ProtocolIO, U: AsyncFnMut(&'a Coro, u64) -> Result< } coro.yield_(ProtocolCommand::IO).await?; } + if start_frame < end_frame { + // chunk which was sent from the server has ended early - so there is nothing left on server-side for pull + return Ok(WalPullResult::Done); + } if !buffer.is_empty() { return Err(Error::DatabaseSyncEngineError(format!( "wal_pull: response has unexpected trailing data: buffer.len()={}", From 56b86cd5f53fbcae60cefc5c505252229569c6bd Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 13 Aug 2025 17:16:46 +0400 Subject: [PATCH 6/6] add comment about :memory: in sync-engine --- core/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/lib.rs b/core/lib.rs index c260694cf..e769310f7 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -255,6 +255,8 @@ impl Database { enable_indexes: bool, enable_views: bool, ) -> Result> { + // turso-sync-engine create 2 databases with different names in the same IO if MemoryIO is used + // in this case we need to bypass registry (as this is MemoryIO DB) but also preserve original distinction in names (e.g. :memory:-draft and :memory:-synced) if path.starts_with(":memory:") { return Self::open_with_flags_bypass_registry( io,