From d504f9b1acf1e0537916af22299feeca1ca2642e Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 4 Aug 2025 15:40:50 +0400 Subject: [PATCH] fix turso-sync tests --- packages/turso-sync/src/database_tape.rs | 28 +++++++++++++++++------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/packages/turso-sync/src/database_tape.rs b/packages/turso-sync/src/database_tape.rs index 4b108301c..0aa513ae9 100644 --- a/packages/turso-sync/src/database_tape.rs +++ b/packages/turso-sync/src/database_tape.rs @@ -69,6 +69,7 @@ impl DatabaseTape { query_stmt, txn_boundary_returned: false, mode: opts.mode, + ignore_schema_changes: opts.ignore_schema_changes, }) } /// Start replay session which can apply [DatabaseTapeOperation] from [Self::iterate_changes] @@ -118,6 +119,7 @@ pub struct DatabaseChangesIteratorOpts { pub first_change_id: Option, pub batch_size: usize, pub mode: DatabaseChangesIteratorMode, + pub ignore_schema_changes: bool, } impl Default for DatabaseChangesIteratorOpts { @@ -126,6 +128,7 @@ impl Default for DatabaseChangesIteratorOpts { first_change_id: None, batch_size: DEFAULT_CHANGES_BATCH_SIZE, mode: DatabaseChangesIteratorMode::Apply, + ignore_schema_changes: true, } } } @@ -136,6 +139,7 @@ pub struct DatabaseChangesIterator { batch: VecDeque, txn_boundary_returned: bool, mode: DatabaseChangesIteratorMode, + ignore_schema_changes: bool, } impl DatabaseChangesIterator { @@ -145,14 +149,22 @@ impl DatabaseChangesIterator { } // todo(sivukhin): iterator must be more clever about transaction boundaries - but for that we need to extend CDC table // for now, if iterator reach the end of CDC table - we are sure that this is a transaction boundary - if let Some(change) = self.batch.pop_front() { - self.txn_boundary_returned = false; - Ok(Some(DatabaseTapeOperation::RowChange(change))) - } else if !self.txn_boundary_returned { - self.txn_boundary_returned = true; - Ok(Some(DatabaseTapeOperation::Commit)) - } else { - Ok(None) + loop { + let next = if let Some(change) = self.batch.pop_front() { + self.txn_boundary_returned = false; + Some(DatabaseTapeOperation::RowChange(change)) + } else if !self.txn_boundary_returned { + self.txn_boundary_returned = true; + Some(DatabaseTapeOperation::Commit) + } else { + None + }; + if let Some(DatabaseTapeOperation::RowChange(change)) = &next { + if self.ignore_schema_changes && change.table_name == "sqlite_schema" { + continue; + } + } + return Ok(next); } } async fn refill(&mut self) -> Result<()> {