diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index 4621b61d5..a19b85be8 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -456,6 +456,7 @@ pub mod tests { use crate::{ database_sync_engine::DatabaseSyncEngineOpts, + errors::Error, test_context::{FaultInjectionStrategy, TestContext}, test_protocol_io::TestProtocolIo, test_sync_server::convert_rows, @@ -645,6 +646,107 @@ pub mod tests { }); } + #[test] + pub fn test_sync_many_dbs_update_sync_concurrent() { + deterministic_runtime(async || { + let io: Arc = Arc::new(turso_core::MemoryIO::new()); + let dir = tempfile::TempDir::new().unwrap(); + let server_path = dir.path().join("server.db"); + let ctx = Arc::new(TestContext::new(seed_u64())); + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) + .await + .unwrap(); + protocol + .server + .execute("CREATE TABLE t(x TEXT PRIMARY KEY, y)", ()) + .await + .unwrap(); + protocol + .server + .execute( + "INSERT INTO t VALUES ('id-1', 'client1'), ('id-2', 'client2')", + (), + ) + .await + .unwrap(); + let mut runner1 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); + runner1 + .init( + ":memory:-1", + DatabaseSyncEngineOpts { + client_name: "id-1".to_string(), + wal_pull_batch_size: 2, + }, + ) + .await + .unwrap(); + let mut runner2 = TestRunner::new(ctx.clone(), io.clone(), protocol.clone()); + runner2 + .init( + ":memory:-2", + DatabaseSyncEngineOpts { + client_name: "id-2".to_string(), + wal_pull_batch_size: 2, + }, + ) + .await + .unwrap(); + + let conn1 = runner1.connect().await.unwrap(); + let conn2 = runner2.connect().await.unwrap(); + + let syncs1 = async move { + for i in 0..10 { + tracing::info!("sync attempt #{i}"); + match runner1.sync().await { + Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue, + Err(err) => panic!("unexpected error: {err}"), + } + } + }; + + let syncs2 = async move { + for i in 0..10 { + tracing::info!("sync attempt #{i}"); + match runner2.sync().await { + Ok(()) | Err(Error::DatabaseSyncEngineConflict(..)) => continue, + Err(err) => panic!("unexpected error: {err}"), + } + } + }; + + let ctx1 = ctx.clone(); + let updates1 = async move { + for i in 0..100 { + tracing::info!("update attempt #{i}"); + let sql = format!("INSERT INTO t VALUES ('key-1-{i}', 'value')"); + match conn1.execute(&sql, ()).await { + Ok(_) => {} + Err(err) if err.to_string().contains("database is locked") => {} + Err(err) => panic!("update failed: {err}"), + } + ctx1.random_sleep_n(10).await; + } + }; + + let ctx2 = ctx.clone(); + let updates2 = async move { + for i in 0..100 { + tracing::info!("update attempt #{i}"); + let sql = format!("INSERT INTO t VALUES ('key-2-{i}', 'value')"); + match conn2.execute(&sql, ()).await { + Ok(_) => {} + Err(err) if err.to_string().contains("database is locked") => {} + Err(err) => panic!("update failed: {err}"), + } + ctx2.random_sleep_n(10).await; + } + }; + + join!(updates1, updates2, syncs1, syncs2); + }); + } + #[test] pub fn test_sync_single_db_many_pulls_big_payloads() { deterministic_runtime(async || { diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 8f21868af..84d968383 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -179,6 +179,10 @@ pub async fn wal_pull<'a, C: ProtocolIO, U: AsyncFnMut(&'a Coro, u64) -> Result< } coro.yield_(ProtocolCommand::IO).await?; } + if start_frame < end_frame { + // chunk which was sent from the server has ended early - so there is nothing left on server-side for pull + return Ok(WalPullResult::Done); + } if !buffer.is_empty() { return Err(Error::DatabaseSyncEngineError(format!( "wal_pull: response has unexpected trailing data: buffer.len()={}",