diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index dafdfbc9b..e9ddd038b 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -5,6 +5,7 @@ use crate::{ checkpoint_wal_file, db_bootstrap, reset_wal_file, transfer_logical_changes, transfer_physical_changes, wait_full_body, wal_pull, wal_push, WalPullResult, }, + database_tape::DatabaseTape, errors::Error, io_operations::IoOperations, protocol_io::ProtocolIO, @@ -22,6 +23,7 @@ pub struct DatabaseSyncEngineOpts { pub struct DatabaseSyncEngine { io: Arc, protocol: Arc

, + draft_tape: DatabaseTape, draft_path: String, synced_path: String, meta_path: String, @@ -74,10 +76,13 @@ impl DatabaseSyncEngine { path: &str, opts: DatabaseSyncEngineOpts, ) -> Result { + let draft_path = format!("{path}-draft"); + let draft_tape = io.open_tape(&draft_path, true)?; let mut db = Self { io, protocol, - draft_path: format!("{path}-draft"), + draft_tape, + draft_path, synced_path: format!("{path}-synced"), meta_path: format!("{path}-info"), opts, @@ -90,8 +95,7 @@ impl DatabaseSyncEngine { /// Create database connection and appropriately configure it before use pub async fn connect(&self, coro: &Coro) -> Result> { - let db = self.io.open_tape(&self.draft_path, true)?; - db.connect(coro).await + self.draft_tape.connect(coro).await } /// Sync all new changes from remote DB and apply them locally @@ -114,13 +118,11 @@ impl DatabaseSyncEngine { { // we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote // so, we pass 'capture: true' as we need to preserve all changes for future push of WAL - let draft = self.io.open_tape(&self.draft_path, true)?; - tracing::info!("opened draft"); let synced = self.io.open_tape(&self.synced_path, true)?; tracing::info!("opened synced"); // we will start wal write session for Draft DB in order to hold write lock during transfer of changes - let mut draft_session = WalSession::new(draft.connect(coro).await?); + let mut draft_session = WalSession::new(self.draft_tape.connect(coro).await?); draft_session.begin()?; // mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db @@ -128,7 +130,7 @@ impl DatabaseSyncEngine { // transfer logical changes to the Synced DB in order to later execute physical "rebase" operation let client_id = &self.meta().client_unique_id; - transfer_logical_changes(coro, &draft, &synced, client_id, true).await?; + transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, true).await?; // now we are ready to do the rebase: let's transfer physical changes from Synced to Draft let synced_wal_watermark = self.meta().synced_wal_match_watermark; @@ -209,14 +211,13 @@ impl DatabaseSyncEngine { // we will push Synced WAL to the remote // so, we pass 'capture: false' as we don't need to preserve changes made to Synced WAL in turso_cdc - let draft = self.io.open_tape(&self.draft_path, true)?; let synced = self.io.open_tape(&self.synced_path, false)?; // mark Synced as dirty as we will start transfer of logical changes there and if we will fail in the middle - we will need to cleanup Synced db self.synced_is_dirty = true; let client_id = &self.meta().client_unique_id; - transfer_logical_changes(coro, &draft, &synced, client_id, false).await?; + transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, false).await?; self.push_synced_to_remote(coro).await?; Ok(()) @@ -450,6 +451,7 @@ pub mod tests { use std::{collections::BTreeMap, sync::Arc}; use rand::RngCore; + use tokio::join; use crate::{ database_sync_engine::DatabaseSyncEngineOpts, @@ -476,12 +478,12 @@ pub mod tests { .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db"); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); let opts = DatabaseSyncEngineOpts { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); protocol .server @@ -588,6 +590,60 @@ pub mod tests { }); } + #[test] + pub fn test_sync_single_db_update_sync_concurrent() { + deterministic_runtime(async || { + let io: Arc = Arc::new(turso_core::MemoryIO::new()); + let dir = tempfile::TempDir::new().unwrap(); + let server_path = dir.path().join("server.db"); + let ctx = Arc::new(TestContext::new(seed_u64())); + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) + .await + .unwrap(); + let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); + let opts = DatabaseSyncEngineOpts { + client_name: "id-1".to_string(), + wal_pull_batch_size: 1, + }; + + protocol + .server + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ()) + .await + .unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES ('hello', 'world')", ()) + .await + .unwrap(); + + runner.init(":memory:", opts).await.unwrap(); + let conn = runner.connect().await.unwrap(); + + let syncs = async move { + for i in 0..10 { + tracing::info!("sync attempt #{i}"); + runner.sync().await.unwrap(); + } + }; + + let updates = async move { + for i in 0..10 { + tracing::info!("update attempt #{i}"); + let sql = format!("INSERT INTO t VALUES ('key-{i}', 'value-{i}')"); + match conn.execute(&sql, ()).await { + Ok(_) => {} + Err(err) if err.to_string().contains("database is locked") => {} + Err(err) => panic!("update failed: {err}"), + } + ctx.random_sleep_n(50).await; + } + }; + + join!(updates, syncs); + }); + } + #[test] pub fn test_sync_single_db_many_pulls_big_payloads() { deterministic_runtime(async || { @@ -599,13 +655,13 @@ pub mod tests { .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db"); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); let opts = DatabaseSyncEngineOpts { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); protocol .server @@ -659,12 +715,12 @@ pub mod tests { .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); - let local_path = dir.path().join("local.db"); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); let opts = DatabaseSyncEngineOpts { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); protocol .server @@ -723,12 +779,12 @@ pub mod tests { .await .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone()); - let local_path = dir.path().join("local.db"); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); let opts = DatabaseSyncEngineOpts { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); server .server @@ -794,12 +850,17 @@ pub mod tests { const CLIENTS: usize = 8; for i in 0..CLIENTS { let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir.path().join(format!("local-{i}.db")); + let local_path = dir + .path() + .join(format!("local-{i}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{i}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); dbs.push(runner); } @@ -877,12 +938,16 @@ pub mod tests { let sync_lock = sync_lock.clone(); async move { let mut runner = TestRunner::new(ctx.clone(), io.clone(), server.clone()); - let local_path = dir.join(format!("local-{i}.db")); + let local_path = dir + .join(format!("local-{i}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{i}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); runner.pull().await.unwrap(); let conn = runner.connect().await.unwrap(); for query in queries { @@ -933,12 +998,17 @@ pub mod tests { it += 1; let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir.path().join(format!("local-{it}.db")); + let local_path = dir + .path() + .join(format!("local-{it}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{it}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); let has_fault = matches!(strategy, FaultInjectionStrategy::Enabled { .. }); @@ -1003,12 +1073,17 @@ pub mod tests { .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir.path().join(format!("local-{it}.db")); + let local_path = dir + .path() + .join(format!("local-{it}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{it}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); protocol .server @@ -1078,12 +1153,17 @@ pub mod tests { .unwrap(); let mut runner = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); - let local_path = dir.path().join(format!("local-{it}.db")); + let local_path = dir + .path() + .join(format!("local-{it}.db")) + .to_str() + .unwrap() + .to_string(); let opts = DatabaseSyncEngineOpts { client_name: format!("id-{it}"), wal_pull_batch_size: 1, }; - runner.init(local_path, opts).await.unwrap(); + runner.init(&local_path, opts).await.unwrap(); let conn = runner.connect().await.unwrap(); diff --git a/packages/turso-sync-engine/src/lib.rs b/packages/turso-sync-engine/src/lib.rs index d0f61dc21..2dda6cf67 100644 --- a/packages/turso-sync-engine/src/lib.rs +++ b/packages/turso-sync-engine/src/lib.rs @@ -84,23 +84,12 @@ mod tests { db: None, } } - pub async fn init( - &mut self, - local_path: PathBuf, - opts: DatabaseSyncEngineOpts, - ) -> Result<()> { + pub async fn init(&mut self, local_path: &str, opts: DatabaseSyncEngineOpts) -> Result<()> { let io = self.io.clone(); let server = self.sync_server.clone(); let db = self .run(genawaiter::sync::Gen::new(|coro| async move { - DatabaseSyncEngine::new( - &coro, - io, - Arc::new(server), - local_path.to_str().unwrap(), - opts, - ) - .await + DatabaseSyncEngine::new(&coro, io, Arc::new(server), local_path, opts).await })) .await .unwrap(); diff --git a/packages/turso-sync-engine/src/test_context.rs b/packages/turso-sync-engine/src/test_context.rs index 3691550da..3d67d135d 100644 --- a/packages/turso-sync-engine/src/test_context.rs +++ b/packages/turso-sync-engine/src/test_context.rs @@ -77,6 +77,13 @@ impl TestContext { let delay = self.rng.lock().await.next_u64() % 1000; tokio::time::sleep(std::time::Duration::from_millis(delay)).await } + pub async fn random_sleep_n(&self, n: u64) { + let delay = { + let mut rng = self.rng.lock().await; + rng.next_u64() % 1000 * (rng.next_u64() % n + 1) + }; + tokio::time::sleep(std::time::Duration::from_millis(delay)).await + } pub async fn rng(&self) -> tokio::sync::MutexGuard { self.rng.lock().await