diff --git a/packages/turso-sync-engine/src/database_sync_engine.rs b/packages/turso-sync-engine/src/database_sync_engine.rs index a19b85be8..bb0dd7264 100644 --- a/packages/turso-sync-engine/src/database_sync_engine.rs +++ b/packages/turso-sync-engine/src/database_sync_engine.rs @@ -221,6 +221,8 @@ impl DatabaseSyncEngine { transfer_logical_changes(coro, &self.draft_tape, &synced, client_id, false).await?; self.push_synced_to_remote(coro).await?; + + self.reset_synced_if_dirty(coro).await?; Ok(()) } @@ -772,7 +774,7 @@ pub mod tests { .await .unwrap(); - runner.pull().await.unwrap(); + runner.sync().await.unwrap(); // create connection in outer scope in order to prevent Database from being dropped in between of pull operations let conn = runner.connect().await.unwrap(); @@ -791,7 +793,7 @@ pub mod tests { } tracing::info!("pull attempt={}", attempt); - runner.pull().await.unwrap(); + runner.sync().await.unwrap(); let expected = expected .iter() diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index 84d968383..e9e257462 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -361,7 +361,6 @@ pub async fn transfer_logical_changes( }; let mut rows_changed = 0; let mut changes = source.iterate_changes(iterate_opts)?; - let mut updated = false; while let Some(operation) = changes.next(coro).await? { match &operation { DatabaseTapeOperation::RowChange(change) => { @@ -381,35 +380,36 @@ pub async fn transfer_logical_changes( ); } last_change_id = Some(change.change_id); - updated = true; } - DatabaseTapeOperation::Commit if updated || bump_pull_gen => { - tracing::debug!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); - // update turso_sync_last_change_id table with new value before commit - let mut set_last_change_id_stmt = - session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?; - let (next_pull_gen, next_change_id) = if bump_pull_gen { - (source_pull_gen + 1, 0) - } else { - (source_pull_gen, last_change_id.unwrap_or(0)) - }; - tracing::debug!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}"); - set_last_change_id_stmt - .bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen)); - set_last_change_id_stmt - .bind_at(2.try_into().unwrap(), Value::Integer(next_change_id)); - set_last_change_id_stmt - .bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id))); - exec_stmt(coro, &mut set_last_change_id_stmt).await?; - - let session_schema_cookie = session.conn().read_schema_version()?; - if session_schema_cookie <= source_schema_cookie { - session - .conn() - .write_schema_version(source_schema_cookie + 1)?; + DatabaseTapeOperation::Commit => { + if rows_changed > 0 || (bump_pull_gen && last_change_id.unwrap_or(0) > 0) { + tracing::debug!("prepare update stmt for turso_sync_last_change_id table with client_id={} and last_change_id={:?}", client_id, last_change_id); + // update turso_sync_last_change_id table with new value before commit + let mut set_last_change_id_stmt = + session.conn().prepare(TURSO_SYNC_UPDATE_LAST_CHANGE_ID)?; + let (next_pull_gen, next_change_id) = if bump_pull_gen { + (source_pull_gen + 1, 0) + } else { + (source_pull_gen, last_change_id.unwrap_or(0)) + }; + tracing::debug!("transfer_logical_changes: client_id={client_id}, set pull_gen={next_pull_gen}, change_id={next_change_id}, rows_changed={rows_changed}"); + set_last_change_id_stmt + .bind_at(1.try_into().unwrap(), Value::Integer(next_pull_gen)); + set_last_change_id_stmt + .bind_at(2.try_into().unwrap(), Value::Integer(next_change_id)); + set_last_change_id_stmt + .bind_at(3.try_into().unwrap(), Value::Text(Text::new(client_id))); + exec_stmt(coro, &mut set_last_change_id_stmt).await?; + } + if bump_pull_gen { + let session_schema_cookie = session.conn().read_schema_version()?; + if session_schema_cookie <= source_schema_cookie { + session + .conn() + .write_schema_version(source_schema_cookie + 1)?; + } } } - _ => {} } session.replay(coro, operation).await?; } @@ -653,12 +653,16 @@ pub mod tests { let mut gen = genawaiter::sync::Gen::new(|coro| async move { let conn1 = db1.connect(&coro).await?; - conn1.execute("CREATE TABLE t(x, y)")?; - conn1.execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)")?; + conn1.execute("CREATE TABLE t(x, y)").unwrap(); + conn1 + .execute("INSERT INTO t VALUES (1, 2), (3, 4), (5, 6)") + .unwrap(); - let conn2 = db2.connect(&coro).await?; + let conn2 = db2.connect(&coro).await.unwrap(); - transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?; + transfer_logical_changes(&coro, &db1, &db2, "id-1", false) + .await + .unwrap(); let mut rows = Vec::new(); let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap(); @@ -674,8 +678,10 @@ pub mod tests { ] ); - conn1.execute("INSERT INTO t VALUES (7, 8)")?; - transfer_logical_changes(&coro, &db1, &db2, "id-1", false).await?; + conn1.execute("INSERT INTO t VALUES (7, 8)").unwrap(); + transfer_logical_changes(&coro, &db1, &db2, "id-1", false) + .await + .unwrap(); let mut rows = Vec::new(); let mut stmt = conn2.prepare("SELECT x, y FROM t").unwrap(); diff --git a/packages/turso-sync-engine/src/database_tape.rs b/packages/turso-sync-engine/src/database_tape.rs index 491ff65dd..bec747df8 100644 --- a/packages/turso-sync-engine/src/database_tape.rs +++ b/packages/turso-sync-engine/src/database_tape.rs @@ -167,12 +167,14 @@ impl DatabaseTape { opts: DatabaseReplaySessionOpts, ) -> Result { tracing::debug!("opening replay session"); + let conn = self.connect(coro).await?; + conn.execute("BEGIN IMMEDIATE")?; Ok(DatabaseReplaySession { - conn: self.connect(coro).await?, + conn, cached_delete_stmt: HashMap::new(), cached_insert_stmt: HashMap::new(), cached_update_stmt: HashMap::new(), - in_txn: false, + in_txn: true, opts, }) } @@ -407,7 +409,7 @@ impl DatabaseChangesIterator { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DatabaseReplaySessionOpts { pub use_implicit_rowid: bool, } diff --git a/packages/turso-sync-engine/src/test_sync_server.rs b/packages/turso-sync-engine/src/test_sync_server.rs index 9ad902df5..ae5be4a22 100644 --- a/packages/turso-sync-engine/src/test_sync_server.rs +++ b/packages/turso-sync-engine/src/test_sync_server.rs @@ -313,6 +313,7 @@ impl TestSyncServer { pub async fn execute(&self, sql: &str, params: impl turso::IntoParams) -> Result<()> { let conn = self.db.connect()?; conn.execute(sql, params).await?; + tracing::debug!("sync_frames_from_conn after execute"); self.sync_frames_from_conn(&conn).await?; Ok(()) }