diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index f804a6dda..1b8d53161 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -458,7 +458,7 @@ pub mod tests { use crate::{ database_sync_engine::DatabaseSyncEngineOpts, errors::Error, - test_context::{FaultInjectionStrategy, TestContext}, + test_context::{FaultInjectionPlan, FaultInjectionStrategy, TestContext}, test_protocol_io::TestProtocolIo, test_sync_server::convert_rows, tests::{deterministic_runtime, seed_u64, TestRunner}, @@ -593,6 +593,105 @@ pub mod tests { }); } + #[test] + pub fn test_sync_single_db_no_changes_no_push() { + deterministic_runtime(async || { + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let dir = tempfile::TempDir::new().unwrap(); + let server_path = dir.path().join("server.db"); + let ctx = Arc::new(TestContext::new(seed_u64())); + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) + .await + .unwrap(); + let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); + let opts = DatabaseSyncEngineOpts { + client_name: "id-1".to_string(), + wal_pull_batch_size: 1, + }; + runner.init(&local_path, opts).await.unwrap(); + + protocol + .server + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ()) + .await + .unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES (1)", ()) + .await + .unwrap(); + + let conn = runner.connect().await.unwrap(); + + runner.sync().await.unwrap(); + assert_eq!( + query_rows(&conn, "SELECT * FROM t").await.unwrap(), + vec![vec![turso::Value::Integer(1)]] + ); + + conn.execute("INSERT INTO t VALUES (100)", ()) + .await + .unwrap(); + + protocol + .server + .execute("INSERT INTO t VALUES (2)", ()) + .await + .unwrap(); + + runner.sync().await.unwrap(); + assert_eq!( + query_rows(&conn, "SELECT * FROM t").await.unwrap(), + vec![ + vec![turso::Value::Integer(1)], + vec![turso::Value::Integer(2)], + vec![turso::Value::Integer(100)], + ] + ); + + protocol + .server + .execute("INSERT INTO t VALUES (3)", ()) + .await + .unwrap(); + runner.sync().await.unwrap(); + assert_eq!( + query_rows(&conn, "SELECT * FROM t").await.unwrap(), + vec![ + vec![turso::Value::Integer(1)], + vec![turso::Value::Integer(2)], + vec![turso::Value::Integer(3)], + vec![turso::Value::Integer(100)], + ] + ); + + ctx.switch_mode(FaultInjectionStrategy::Enabled { + plan: FaultInjectionPlan { + is_fault: Box::new(|name, _| Box::pin(async move { name == "wal_push_start" })), + }, + }) + .await; + + protocol + .server + .execute("INSERT INTO t VALUES (4)", ()) + .await + .unwrap(); + runner.sync().await.unwrap(); + assert_eq!( + query_rows(&conn, "SELECT * FROM t").await.unwrap(), + vec![ + vec![turso::Value::Integer(1)], + vec![turso::Value::Integer(2)], + vec![turso::Value::Integer(3)], + vec![turso::Value::Integer(4)], + vec![turso::Value::Integer(100)], + ] + ); + }); + } + #[test] pub fn test_sync_single_db_update_sync_concurrent() { deterministic_runtime(async || {