From 887b25dd00b7794904c8cecf2456b49d095cb1f3 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 13 Aug 2025 20:22:10 +0400 Subject: [PATCH 1/4] do not push wal unnecessary when nothing was changed locally --- .../src/database_sync_engine.rs | 6 +- .../src/database_sync_operations.rs | 72 ++++++++++--------- .../turso-sync-engine/src/database_tape.rs | 8 ++- .../turso-sync-engine/src/test_sync_server.rs | 1 + 4 files changed, 49 insertions(+), 38 deletions(-) diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index a19b85be8..bb0dd7264 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -221,6 +221,8 @@ impl DatabaseSyncEngine { transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, false).await?; self.push_synced_to_remote(coro).await?; + + self.reset_synced_if_dirty(coro).await?; Ok(()) } @@ -772,7 +774,7 @@ pub mod tests { .await .unwrap(); - runner.pull().await.unwrap(); + runner.sync().await.unwrap(); // create connection in outer scope in order to prevent Database from being dropped in between of pull operations let conn = runner.connect().await.unwrap(); @@ -791,7 +793,7 @@ pub mod tests { } tracing::info!("pull attempt={}", attempt); - runner.pull().await.unwrap(); + runner.sync().await.unwrap(); let expected = expected .iter() diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 84d968383..e9e257462 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -361,7 +361,6 @@ pub async fn transfer_logical_changes( }; let mut rows_changed = 0; let mut changes = source.iterate_changes(iterate_opts)?; - let mut updated = false; while let Some(operation) = changes.next(coro).await? { match &operation { DatabaseTapeOperation::RowChange(change) => { @@ -381,35 +380,36 @@ pub async fn transfer_logical_changes( ); } last_change_id = Some(change.change_id); - updated = true; } - DatabaseTapeOperation::Commit if updated || bump_pull_gen => { - tracing::debug!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); - // update turso_sync_last_change_id table with new value before commit - let mut set_last_change_id_stmt = - session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?; - let (next_pull_gen, next_change_id) = if bump_pull_gen { - (source_pull_gen + 1, 0) - } else { - (source_pull_gen, last_change_id.unwrap_or(0)) - }; - tracing::debug!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}"); - set_last_change_id_stmt - .bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen)); - set_last_change_id_stmt - .bind_at(2.try_into().unwrap(), Value::Integer(next_change_id)); - set_last_change_id_stmt - .bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id))); - exec_stmt(coro, &mut set_last_change_id_stmt).await?; - - let session_schema_cookie = session.conn().read_schema_version()?; - if session_schema_cookie <= source_schema_cookie { - session - .conn() - .write_schema_version(source_schema_cookie + 1)?; + DatabaseTapeOperation::Commit => { + if rows_changed > 0 || (bump_pull_gen && last_change_id.unwrap_or(0) > 0) { + tracing::debug!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); + // update turso_sync_last_change_id table with new value before commit + let mut set_last_change_id_stmt = + session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?; + let (next_pull_gen, next_change_id) = if bump_pull_gen { + (source_pull_gen + 1, 0) + } else { + (source_pull_gen, last_change_id.unwrap_or(0)) + }; + tracing::debug!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}"); + set_last_change_id_stmt + .bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen)); + set_last_change_id_stmt + .bind_at(2.try_into().unwrap(), Value::Integer(next_change_id)); + set_last_change_id_stmt + .bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id))); + exec_stmt(coro, &mut set_last_change_id_stmt).await?; + } + if bump_pull_gen { + let session_schema_cookie = session.conn().read_schema_version()?; + if session_schema_cookie <= source_schema_cookie { + session + .conn() + .write_schema_version(source_schema_cookie + 1)?; + } } } - _ => {} } session.replay(coro, operation).await?; } @@ -653,12 +653,16 @@ pub mod tests { let mut gen = genawaiter::sync::Gen::new(|coro| async move { let conn1 = db1.connect(&coro).await?; - conn1.execute("CREATE TABLE t(x, y)")?; - conn1.execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)")?; + conn1.execute("CREATE TABLE t(x, y)").unwrap(); + conn1 + .execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)") + .unwrap(); - let conn2 = db2.connect(&coro).await?; + let conn2 = db2.connect(&coro).await.unwrap(); - transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?; + transfer_logical_changes(&coro, &db1, &db2, "id-1", false) + .await + .unwrap(); let mut rows = Vec::new(); let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap(); @@ -674,8 +678,10 @@ pub mod tests { ] ); - conn1.execute("INSERT INTO t VALUES (7, 8)")?; - transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?; + conn1.execute("INSERT INTO t VALUES (7, 8)").unwrap(); + transfer_logical_changes(&coro, &db1, &db2, "id-1", false) + .await + .unwrap(); let mut rows = Vec::new(); let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap(); diff --git a/packages/turso-sync-engine/src/database_tape.rs b/packages/turso-sync-engine/src/database_tape.rs index 491ff65dd..bec747df8 100644 --- a/packages/turso-sync-engine/src/database_tape.rs +++ b/packages/turso-sync-engine/src/database_tape.rs @@ -167,12 +167,14 @@ impl DatabaseTape { opts: DatabaseReplaySessionOpts, ) -> Result { tracing::debug!("opening replay session"); + let conn = self.connect(coro).await?; + conn.execute("BEGIN IMMEDIATE")?; Ok(DatabaseReplaySession { - conn: self.connect(coro).await?, + conn, cached_delete_stmt: HashMap::new(), cached_insert_stmt: HashMap::new(), cached_update_stmt: HashMap::new(), - in_txn: false, + in_txn: true, opts, }) } @@ -407,7 +409,7 @@ impl DatabaseChangesIterator { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DatabaseReplaySessionOpts { pub use_implicit_rowid: bool, } diff --git a/packages/turso-sync-engine/src/test_sync_server.rs b/packages/turso-sync-engine/src/test_sync_server.rs index 9ad902df5..ae5be4a22 100644 --- a/packages/turso-sync-engine/src/test_sync_server.rs +++ b/packages/turso-sync-engine/src/test_sync_server.rs @@ -313,6 +313,7 @@ impl TestSyncServer { pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<()> { let conn = self.db.connect()?; conn.execute(sql, params).await?; + tracing::debug!("sync_frames_from_conn after execute"); self.sync_frames_from_conn(&conn).await?; Ok(()) } From f603a0dfc85f85dfdfd5fac6191850640d553b7f Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 14 Aug 2025 12:37:49 +0400 Subject: [PATCH 2/4] change log level to INFO in order to simplify debugging (DEBUG logs in the db are pretty spammy) --- .../src/database_sync_engine.rs | 25 +++++++++---------- .../src/database_sync_operations.rs | 22 ++++++++-------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index bb0dd7264..f804a6dda 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -45,7 +45,7 @@ async fn update_meta( ) -> Result<()> { let mut meta = orig.as_ref().unwrap().clone(); update(&mut meta); - tracing::debug!("update_meta: {meta:?}"); + tracing::info!("update_meta: {meta:?}"); let completion = io.full_write(meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated wait_full_body(coro, &completion).await?; @@ -60,7 +60,7 @@ async fn set_meta( orig: &mut Option, meta: DatabaseMetadata, ) -> Result<()> { - tracing::debug!("set_meta: {meta:?}"); + tracing::info!("set_meta: {meta:?}"); let completion = io.full_write(meta_path, meta.dump()?)?; // todo: what happen if we will actually update the metadata on disk but fail and so in memory state will not be updated wait_full_body(coro, &completion).await?; @@ -103,7 +103,7 @@ impl DatabaseSyncEngine { /// This method will **not** send local changed to the remote /// This method will block writes for the period of pull pub async fn pull(&mut self, coro: &Coro) -> Result<()> { - tracing::debug!( + tracing::info!( "pull: draft={}, synced={}", self.draft_path, self.synced_path @@ -120,7 +120,6 @@ impl DatabaseSyncEngine { // we will "replay" Synced WAL to the Draft WAL later without pushing it to the remote // so, we pass 'capture: true' as we need to preserve all changes for future push of WAL let synced = self.io.open_tape(&self.synced_path, true)?; - tracing::info!("opened synced"); // we will start wal write session for Draft DB in order to hold write lock during transfer of changes let mut draft_session = WalSession::new(connect(coro, &self.draft_tape).await?); @@ -168,7 +167,7 @@ impl DatabaseSyncEngine { let WalPullResult::NeedCheckpoint = pull_result else { break; }; - tracing::debug!( + tracing::info!( "ready to checkpoint synced db file at {:?}, generation={}", self.synced_path, self.meta().synced_generation @@ -198,7 +197,7 @@ impl DatabaseSyncEngine { /// This method will **not** pull remote changes to the local DB /// This method will **not** block writes for the period of sync pub async fn push(&mut self, coro: &Coro) -> Result<()> { - tracing::debug!( + tracing::info!( "push: draft={}, synced={}", self.draft_path, self.synced_path @@ -237,7 +236,7 @@ impl DatabaseSyncEngine { } async fn init(&mut self, coro: &Coro) -> Result<()> { - tracing::debug!( + tracing::info!( "initialize sync engine: draft={}, synced={}, opts={:?}", self.draft_path, self.synced_path, @@ -258,7 +257,7 @@ impl DatabaseSyncEngine { } None => { let meta = self.bootstrap_db_files(coro).await?; - tracing::debug!("write meta after successful bootstrap: meta={meta:?}"); + tracing::info!("write meta after successful bootstrap: meta={meta:?}"); set_meta( coro, self.protocol.as_ref(), @@ -285,7 +284,7 @@ impl DatabaseSyncEngine { } async fn pull_synced_from_remote(&mut self, coro: &Coro) -> Result { - tracing::debug!( + tracing::info!( "pull_synced_from_remote: draft={:?}, synced={:?}", self.draft_path, self.synced_path, @@ -330,7 +329,7 @@ impl DatabaseSyncEngine { } async fn push_synced_to_remote(&mut self, coro: &Coro) -> Result<()> { - tracing::debug!( + tracing::info!( "push_synced_to_remote: draft={}, synced={}, id={}", self.draft_path, self.synced_path, @@ -381,7 +380,7 @@ impl DatabaseSyncEngine { self.meta.is_none(), "bootstrap_db_files must be called only when meta is not set" ); - tracing::debug!( + tracing::info!( "bootstrap_db_files: draft={}, synced={}", self.draft_path, self.synced_path, @@ -410,7 +409,7 @@ impl DatabaseSyncEngine { let db_info = db_bootstrap(coro, self.protocol.as_ref(), files).await?; let elapsed = std::time::Instant::now().duration_since(start_time); - tracing::debug!( + tracing::info!( "bootstrap_db_files: finished draft={:?}, synced={:?}: elapsed={:?}", self.draft_path, self.synced_path, @@ -428,7 +427,7 @@ impl DatabaseSyncEngine { /// Reset WAL of Synced database which potentially can have some local changes async fn reset_synced_if_dirty(&mut self, coro: &Coro) -> Result<()> { - tracing::debug!( + tracing::info!( "reset_synced: synced_path={:?}, synced_is_dirty={}", self.synced_path, self.synced_is_dirty diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index e9e257462..75e3fba19 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -122,7 +122,7 @@ pub async fn wal_pull<'a, C: ProtocolIO, U: AsyncFnMut(&'a Coro, u64) -> Result< end_frame: u64, mut update: U, ) -> Result { - tracing::debug!( + tracing::info!( "wal_pull: generation={}, start_frame={}, end_frame={}", generation, start_frame, @@ -209,7 +209,7 @@ pub async fn wal_push( end_frame: u64, ) -> Result { assert!(wal_session.in_txn()); - tracing::debug!("wal_push: baton={baton:?}, generation={generation}, start_frame={start_frame}, end_frame={end_frame}"); + tracing::info!("wal_push: baton={baton:?}, generation={generation}, start_frame={start_frame}, end_frame={end_frame}"); if start_frame == end_frame { return Ok(WalPushResult::Ok { baton: None }); @@ -272,7 +272,7 @@ pub async fn transfer_logical_changes( client_id: &str, bump_pull_gen: bool, ) -> Result<()> { - tracing::debug!("transfer_logical_changes: client_id={client_id}"); + tracing::info!("transfer_logical_changes: client_id={client_id}"); let source_conn = connect_untracked(source)?; let target_conn = connect_untracked(target)?; @@ -293,12 +293,12 @@ pub async fn transfer_logical_changes( Error::DatabaseSyncEngineError("unexpected source pull_gen type".to_string()) })?, None => { - tracing::debug!("transfer_logical_changes: client_id={client_id}, turso_sync_last_change_id table is not found"); + tracing::info!("transfer_logical_changes: client_id={client_id}, turso_sync_last_change_id table is not found"); 0 } } }; - tracing::debug!( + tracing::info!( "transfer_logical_changes: client_id={client_id}, source_pull_gen={source_pull_gen}" ); @@ -383,7 +383,7 @@ pub async fn transfer_logical_changes( } DatabaseTapeOperation::Commit => { if rows_changed > 0 || (bump_pull_gen && last_change_id.unwrap_or(0) > 0) { - tracing::debug!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); + tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); // update turso_sync_last_change_id table with new value before commit let mut set_last_change_id_stmt = session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?; @@ -392,7 +392,7 @@ pub async fn transfer_logical_changes( } else { (source_pull_gen, last_change_id.unwrap_or(0)) }; - tracing::debug!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}"); + tracing::info!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}"); set_last_change_id_stmt .bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen)); set_last_change_id_stmt @@ -414,7 +414,7 @@ pub async fn transfer_logical_changes( session.replay(coro, operation).await?; } - tracing::debug!("transfer_logical_changes: rows_changed={:?}", rows_changed); + tracing::info!("transfer_logical_changes: rows_changed={:?}", rows_changed); Ok(()) } @@ -428,7 +428,7 @@ pub async fn transfer_physical_changes( source_sync_watermark: u64, target_wal_match_watermark: u64, ) -> Result { - tracing::debug!("transfer_physical_changes: source_wal_match_watermark={source_wal_match_watermark}, source_sync_watermark={source_sync_watermark}, target_wal_match_watermark={target_wal_match_watermark}"); + tracing::info!("transfer_physical_changes: source_wal_match_watermark={source_wal_match_watermark}, source_sync_watermark={source_sync_watermark}, target_wal_match_watermark={target_wal_match_watermark}"); let source_conn = connect(coro, source).await?; let mut source_session = WalSession::new(source_conn.clone()); @@ -454,7 +454,7 @@ pub async fn transfer_physical_changes( let mut last_frame_info = None; let mut frame = vec![0u8; WAL_FRAME_SIZE]; let mut target_sync_watermark = target_session.frames_count()?; - tracing::debug!( + tracing::info!( "transfer_physical_changes: start={}, end={}", source_wal_match_watermark + 1, source_frames_count @@ -465,7 +465,7 @@ pub async fn transfer_physical_changes( target_session.append_page(frame_info.page_no, &frame[WAL_FRAME_HEADER..])?; if source_frame_no == source_sync_watermark { target_sync_watermark = target_session.frames_count()? + 1; // +1 because page will be actually commited on next iteration - tracing::debug!("set target_sync_watermark to {}", target_sync_watermark); + tracing::info!("set target_sync_watermark to {}", target_sync_watermark); } last_frame_info = Some(frame_info); } From 8c9d648852dcfbb89c979038c5a1ae77034ecc94 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 14 Aug 2025 12:38:15 +0400 Subject: [PATCH 3/4] add test which check that we don't push without the need --- .../src/database_sync_engine.rs | 101 +++++++++++++++++- 1 file changed, 100 insertions(+), 1 deletion(-) diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index f804a6dda..1b8d53161 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -458,7 +458,7 @@ pub mod tests { use crate::{ database_sync_engine::DatabaseSyncEngineOpts, errors::Error, - test_context::{FaultInjectionStrategy, TestContext}, + test_context::{FaultInjectionPlan, FaultInjectionStrategy, TestContext}, test_protocol_io::TestProtocolIo, test_sync_server::convert_rows, tests::{deterministic_runtime, seed_u64, TestRunner}, @@ -593,6 +593,105 @@ pub mod tests { }); } + #[test] + pub fn test_sync_single_db_no_changes_no_push() { + deterministic_runtime(async || { + let io: Arc = Arc::new(turso_core::PlatformIO::new().unwrap()); + let dir = tempfile::TempDir::new().unwrap(); + let server_path = dir.path().join("server.db"); + let ctx = Arc::new(TestContext::new(seed_u64())); + let protocol = TestProtocolIo::new(ctx.clone(), &server_path) + .await + .unwrap(); + let mut runner = TestRunner::new(ctx.clone(), io, protocol.clone()); + let local_path = dir.path().join("local.db").to_str().unwrap().to_string(); + let opts = DatabaseSyncEngineOpts { + client_name: "id-1".to_string(), + wal_pull_batch_size: 1, + }; + runner.init(&local_path, opts).await.unwrap(); + + protocol + .server + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY)", ()) + .await + .unwrap(); + protocol + .server + .execute("INSERT INTO t VALUES (1)", ()) + .await + .unwrap(); + + let conn = runner.connect().await.unwrap(); + + runner.sync().await.unwrap(); + assert_eq!( + query_rows(&conn, "SELECT * FROM t").await.unwrap(), + vec![vec![turso::Value::Integer(1)]] + ); + + conn.execute("INSERT INTO t VALUES (100)", ()) + .await + .unwrap(); + + protocol + .server + .execute("INSERT INTO t VALUES (2)", ()) + .await + .unwrap(); + + runner.sync().await.unwrap(); + assert_eq!( + query_rows(&conn, "SELECT * FROM t").await.unwrap(), + vec![ + vec![turso::Value::Integer(1)], + vec![turso::Value::Integer(2)], + vec![turso::Value::Integer(100)], + ] + ); + + protocol + .server + .execute("INSERT INTO t VALUES (3)", ()) + .await + .unwrap(); + runner.sync().await.unwrap(); + assert_eq!( + query_rows(&conn, "SELECT * FROM t").await.unwrap(), + vec![ + vec![turso::Value::Integer(1)], + vec![turso::Value::Integer(2)], + vec![turso::Value::Integer(3)], + vec![turso::Value::Integer(100)], + ] + ); + + ctx.switch_mode(FaultInjectionStrategy::Enabled { + plan: FaultInjectionPlan { + is_fault: Box::new(|name, _| Box::pin(async move { name == "wal_push_start" })), + }, + }) + .await; + + protocol + .server + .execute("INSERT INTO t VALUES (4)", ()) + .await + .unwrap(); + runner.sync().await.unwrap(); + assert_eq!( + query_rows(&conn, "SELECT * FROM t").await.unwrap(), + vec![ + vec![turso::Value::Integer(1)], + vec![turso::Value::Integer(2)], + vec![turso::Value::Integer(3)], + vec![turso::Value::Integer(4)], + vec![turso::Value::Integer(100)], + ] + ); + }); + } + #[test] pub fn test_sync_single_db_update_sync_concurrent() { deterministic_runtime(async || { From 34a7b2ffd45fc80ec7dca9bc63cccd56b2f28f1b Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 14 Aug 2025 12:39:44 +0400 Subject: [PATCH 4/4] ignore changes in the turso_sync_last_change_id --- .../src/database_sync_operations.rs | 61 ++++++++++--------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 75e3fba19..683a7bbee 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -254,7 +254,8 @@ pub async fn wal_push( } } -const TURSO_SYNC_META_TABLE: &str = +const TURSO_SYNC_TABLE_NAME: &str = "turso_sync_last_change_id"; +const TURSO_SYNC_CREATE_TABLE: &str = "CREATE TABLE IF NOT EXISTS turso_sync_last_change_id (client_id TEXT PRIMARY KEY, pull_gen INTEGER, change_id INTEGER)"; const TURSO_SYNC_SELECT_LAST_CHANGE_ID: &str = "SELECT pull_gen, change_id FROM turso_sync_last_change_id WHERE client_id = ?"; @@ -303,7 +304,7 @@ pub async fn transfer_logical_changes( ); // fetch last_change_id from the target DB in order to guarantee atomic replay of changes and avoid conflicts in case of failure - let mut schema_stmt = target_conn.prepare(TURSO_SYNC_META_TABLE)?; + let mut schema_stmt = target_conn.prepare(TURSO_SYNC_CREATE_TABLE)?; exec_stmt(coro, &mut schema_stmt).await?; let mut select_last_change_id_stmt = target_conn.prepare(TURSO_SYNC_SELECT_LAST_CHANGE_ID)?; @@ -364,13 +365,16 @@ pub async fn transfer_logical_changes( while let Some(operation) = changes.next(coro).await? { match &operation { DatabaseTapeOperation::RowChange(change) => { - rows_changed += 1; assert!( last_change_id.is_none() || last_change_id.unwrap() < change.change_id, "change id must be strictly increasing: last_change_id={:?}, change.change_id={}", last_change_id, change.change_id ); + if change.table_name == TURSO_SYNC_TABLE_NAME { + continue; + } + rows_changed += 1; // we give user full control over CDC table - so let's not emit assert here for now if last_change_id.is_some() && last_change_id.unwrap() + 1 != change.change_id { tracing::warn!( @@ -381,35 +385,32 @@ pub async fn transfer_logical_changes( } last_change_id = Some(change.change_id); } - DatabaseTapeOperation::Commit => { - if rows_changed > 0 || (bump_pull_gen && last_change_id.unwrap_or(0) > 0) { - tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); - // update turso_sync_last_change_id table with new value before commit - let mut set_last_change_id_stmt = - session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?; - let (next_pull_gen, next_change_id) = if bump_pull_gen { - (source_pull_gen + 1, 0) - } else { - (source_pull_gen, last_change_id.unwrap_or(0)) - }; - tracing::info!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}"); - set_last_change_id_stmt - .bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen)); - set_last_change_id_stmt - .bind_at(2.try_into().unwrap(), Value::Integer(next_change_id)); - set_last_change_id_stmt - .bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id))); - exec_stmt(coro, &mut set_last_change_id_stmt).await?; - } - if bump_pull_gen { - let session_schema_cookie = session.conn().read_schema_version()?; - if session_schema_cookie <= source_schema_cookie { - session - .conn() - .write_schema_version(source_schema_cookie + 1)?; - } + DatabaseTapeOperation::Commit if rows_changed > 0 || bump_pull_gen => { + tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); + // update turso_sync_last_change_id table with new value before commit + let mut set_last_change_id_stmt = + session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?; + let (next_pull_gen, next_change_id) = if bump_pull_gen { + (source_pull_gen + 1, 0) + } else { + (source_pull_gen, last_change_id.unwrap_or(0)) + }; + tracing::info!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}"); + set_last_change_id_stmt + .bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen)); + set_last_change_id_stmt + .bind_at(2.try_into().unwrap(), Value::Integer(next_change_id)); + set_last_change_id_stmt + .bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id))); + exec_stmt(coro, &mut set_last_change_id_stmt).await?; + let session_schema_cookie = session.conn().read_schema_version()?; + if session_schema_cookie <= source_schema_cookie { + session + .conn() + .write_schema_version(source_schema_cookie + 1)?; } } + _ => {} } session.replay(coro, operation).await?; }