From 34a7b2ffd45fc80ec7dca9bc63cccd56b2f28f1b Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Thu, 14 Aug 2025 12:39:44 +0400 Subject: [PATCH] ignore changes in the turso_sync_last_change_id --- .../src/database_sync_operations.rs | 61 ++++++++++--------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 75e3fba19..683a7bbee 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -254,7 +254,8 @@ pub async fn wal_push( } } -const TURSO_SYNC_META_TABLE: &str = +const TURSO_SYNC_TABLE_NAME: &str = "turso_sync_last_change_id"; +const TURSO_SYNC_CREATE_TABLE: &str = "CREATE TABLE IF NOT EXISTS turso_sync_last_change_id (client_id TEXT PRIMARY KEY, pull_gen INTEGER, change_id INTEGER)"; const TURSO_SYNC_SELECT_LAST_CHANGE_ID: &str = "SELECT pull_gen, change_id FROM turso_sync_last_change_id WHERE client_id = ?"; @@ -303,7 +304,7 @@ pub async fn transfer_logical_changes( ); // fetch last_change_id from the target DB in order to guarantee atomic replay of changes and avoid conflicts in case of failure - let mut schema_stmt = target_conn.prepare(TURSO_SYNC_META_TABLE)?; + let mut schema_stmt = target_conn.prepare(TURSO_SYNC_CREATE_TABLE)?; exec_stmt(coro, &mut schema_stmt).await?; let mut select_last_change_id_stmt = target_conn.prepare(TURSO_SYNC_SELECT_LAST_CHANGE_ID)?; @@ -364,13 +365,16 @@ pub async fn transfer_logical_changes( while let Some(operation) = changes.next(coro).await? { match &operation { DatabaseTapeOperation::RowChange(change) => { - rows_changed += 1; assert!( last_change_id.is_none() || last_change_id.unwrap() < change.change_id, "change id must be strictly increasing: last_change_id={:?}, change.change_id={}", last_change_id, change.change_id ); + if change.table_name == TURSO_SYNC_TABLE_NAME { + continue; + } + rows_changed += 1; // we give user full control over CDC table - so let's not emit assert here for now if last_change_id.is_some() && last_change_id.unwrap() + 1 != change.change_id { tracing::warn!( @@ -381,35 +385,32 @@ pub async fn transfer_logical_changes( } last_change_id = Some(change.change_id); } - DatabaseTapeOperation::Commit => { - if rows_changed > 0 || (bump_pull_gen && last_change_id.unwrap_or(0) > 0) { - tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); - // update turso_sync_last_change_id table with new value before commit - let mut set_last_change_id_stmt = - session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?; - let (next_pull_gen, next_change_id) = if bump_pull_gen { - (source_pull_gen + 1, 0) - } else { - (source_pull_gen, last_change_id.unwrap_or(0)) - }; - tracing::info!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}"); - set_last_change_id_stmt - .bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen)); - set_last_change_id_stmt - .bind_at(2.try_into().unwrap(), Value::Integer(next_change_id)); - set_last_change_id_stmt - .bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id))); - exec_stmt(coro, &mut set_last_change_id_stmt).await?; - } - if bump_pull_gen { - let session_schema_cookie = session.conn().read_schema_version()?; - if session_schema_cookie <= source_schema_cookie { - session - .conn() - .write_schema_version(source_schema_cookie + 1)?; - } + DatabaseTapeOperation::Commit if rows_changed > 0 || bump_pull_gen => { + tracing::info!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); + // update turso_sync_last_change_id table with new value before commit + let mut set_last_change_id_stmt = + session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?; + let (next_pull_gen, next_change_id) = if bump_pull_gen { + (source_pull_gen + 1, 0) + } else { + (source_pull_gen, last_change_id.unwrap_or(0)) + }; + tracing::info!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}"); + set_last_change_id_stmt + .bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen)); + set_last_change_id_stmt + .bind_at(2.try_into().unwrap(), Value::Integer(next_change_id)); + set_last_change_id_stmt + .bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id))); + exec_stmt(coro, &mut set_last_change_id_stmt).await?; + let session_schema_cookie = session.conn().read_schema_version()?; + if session_schema_cookie <= source_schema_cookie { + session + .conn() + .write_schema_version(source_schema_cookie + 1)?; } } + _ => {} } session.replay(coro, operation).await?; }