diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index 48a021ae8..e26ffdbc7 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -565,7 +565,7 @@ pub mod tests { #[test] pub fn test_sync_single_db_many_pulls_big_payloads() { - deterministic_runtime(async || { + deterministic_runtime(async || { let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); let dir = tempfile::TempDir::new().unwrap(); let server_path = dir.path().join("server.db"); @@ -579,6 +579,7 @@ pub mod tests { client_name: "id-1".to_string(), wal_pull_batch_size: 1, }; + runner.init(local_path, opts).await.unwrap(); protocol @@ -621,6 +622,55 @@ pub mod tests { } }); } + + #[test] + pub fn test_sync_single_db_checkpoint() { + deterministic_runtime(async || { + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let dir = tempfile::TempDir::new().unwrap(); + let server_path = dir.path().join("server.db"); + let ctx = Arc::new(TestContext::new(seed_u64())); + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) + .await + .unwrap(); + let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); + let local_path = dir.path().join("local.db"); + let opts = DatabaseSyncEngineOpts { + client_name: "id-1".to_string(), + wal_pull_batch_size: 1, + }; + runner.init(local_path, opts).await.unwrap(); + + protocol + .server + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ()) + .await + .unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES (1)", ()) + .await + .unwrap(); + protocol.server.checkpoint().await.unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES (2)", ()) + .await + .unwrap(); + + let conn = runner.connect().await.unwrap(); + + runner.pull().await.unwrap(); + + assert_eq!( + query_rows(&conn, "SELECT * FROM t").await.unwrap(), + vec![ + vec![turso::Value::Integer(1)], + vec![turso::Value::Integer(2)] + ] + ); + }); + } #[test] pub fn test_sync_single_db_full_syncs() { diff --git a/packages/turso-sync-engine/src/test_sync_server.rs b/packages/turso-sync-engine/src/test_sync_server.rs index ccfbc1eb6..16f963582 100644 --- a/packages/turso-sync-engine/src/test_sync_server.rs +++ b/packages/turso-sync-engine/src/test_sync_server.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, path::Path, sync::Arc}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, +}; use tokio::sync::Mutex; @@ -33,6 +37,7 @@ struct TestSyncServerState { #[derive(Clone)] pub struct TestSyncServer { + path: PathBuf, ctx: Arc, db: turso::Database, state: Arc>, @@ -49,6 +54,7 @@ impl TestSyncServer { }, ); Ok(Self { + path: path.to_path_buf(), ctx, db: turso::Builder::new_local(path.to_str().unwrap()) .build() @@ -263,6 +269,25 @@ impl TestSyncServer { pub fn db(&self) -> turso::Database { self.db.clone() } + pub async fn checkpoint(&self) -> Result<()> { + let conn = self.db.connect()?; + let _ = conn.query("PRAGMA wal_checkpoint(TRUNCATE)", ()).await?; + let mut state = self.state.lock().await; + let generation = state.generation + 1; + state.generation = generation; + state.generations.insert( + generation, + Generation { + snapshot: std::fs::read(&self.path).map_err(|e| { + Error::DatabaseSyncEngineError(format!( + "failed to create generation snapshot: {e}" + )) + })?, + frames: Vec::new(), + }, + ); + Ok(()) + } pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<()> { let conn = self.db.connect()?; conn.execute(sql, params).await?;