fix turso-sync tests

This commit is contained in:
Nikita Sivukhin
2025-08-04 15:40:50 +04:00
parent c0d5c55d5c
commit d504f9b1ac

View File

@@ -69,6 +69,7 @@ impl DatabaseTape {
query_stmt,
txn_boundary_returned: false,
mode: opts.mode,
ignore_schema_changes: opts.ignore_schema_changes,
})
}
/// Start replay session which can apply [DatabaseTapeOperation] from [Self::iterate_changes]
@@ -118,6 +119,7 @@ pub struct DatabaseChangesIteratorOpts {
pub first_change_id: Option<i64>,
pub batch_size: usize,
pub mode: DatabaseChangesIteratorMode,
pub ignore_schema_changes: bool,
}
impl Default for DatabaseChangesIteratorOpts {
@@ -126,6 +128,7 @@ impl Default for DatabaseChangesIteratorOpts {
first_change_id: None,
batch_size: DEFAULT_CHANGES_BATCH_SIZE,
mode: DatabaseChangesIteratorMode::Apply,
ignore_schema_changes: true,
}
}
}
@@ -136,6 +139,7 @@ pub struct DatabaseChangesIterator {
batch: VecDeque<DatabaseTapeRowChange>,
txn_boundary_returned: bool,
mode: DatabaseChangesIteratorMode,
ignore_schema_changes: bool,
}
impl DatabaseChangesIterator {
@@ -145,14 +149,22 @@ impl DatabaseChangesIterator {
}
// todo(sivukhin): iterator must be more clever about transaction boundaries - but for that we need to extend CDC table
// for now, if iterator reach the end of CDC table - we are sure that this is a transaction boundary
if let Some(change) = self.batch.pop_front() {
self.txn_boundary_returned = false;
Ok(Some(DatabaseTapeOperation::RowChange(change)))
} else if !self.txn_boundary_returned {
self.txn_boundary_returned = true;
Ok(Some(DatabaseTapeOperation::Commit))
} else {
Ok(None)
loop {
let next = if let Some(change) = self.batch.pop_front() {
self.txn_boundary_returned = false;
Some(DatabaseTapeOperation::RowChange(change))
} else if !self.txn_boundary_returned {
self.txn_boundary_returned = true;
Some(DatabaseTapeOperation::Commit)
} else {
None
};
if let Some(DatabaseTapeOperation::RowChange(change)) = &next {
if self.ignore_schema_changes && change.table_name == "sqlite_schema" {
continue;
}
}
return Ok(next);
}
}
async fn refill(&mut self) -> Result<()> {