From 7481573cf1e7f1bc3cb3dbb94f4c298349c936a4 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 30 Jul 2025 00:55:40 +0400 Subject: [PATCH] add test to the DatabaseTape --- packages/turso-sync/src/database_tape.rs | 66 +++++++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/packages/turso-sync/src/database_tape.rs b/packages/turso-sync/src/database_tape.rs index 4610930dd..4b108301c 100644 --- a/packages/turso-sync/src/database_tape.rs +++ b/packages/turso-sync/src/database_tape.rs @@ -309,8 +309,12 @@ fn parse_bin_record(bin_record: Vec) -> Result> { #[cfg(test)] mod tests { use tempfile::NamedTempFile; + use turso::Value; - use crate::database_tape::DatabaseTape; + use crate::{ + database_tape::{DatabaseChangesIteratorOpts, DatabaseTape}, + types::DatabaseTapeOperation, + }; async fn fetch_rows(conn: &turso::Connection, query: &str) -> Vec> { let mut rows = vec![]; @@ -326,7 +330,7 @@ mod tests { } #[tokio::test] - async fn test_database_cdc() { + async fn test_database_cdc_single_iteration() { let temp_file1 = NamedTempFile::new().unwrap(); let db_path1 = temp_file1.path().to_str().unwrap(); @@ -475,4 +479,62 @@ mod tests { ]] ); } + + #[tokio::test] + async fn test_database_cdc_multiple_iterations() { + let temp_file1 = NamedTempFile::new().unwrap(); + let db_path1 = temp_file1.path().to_str().unwrap(); + + let temp_file2 = NamedTempFile::new().unwrap(); + let db_path2 = temp_file2.path().to_str().unwrap(); + + let db1 = turso::Builder::new_local(db_path1).build().await.unwrap(); + let db1 = DatabaseTape::new(db1); + let conn1 = db1.connect().await.unwrap(); + + let db2 = turso::Builder::new_local(db_path2).build().await.unwrap(); + let db2 = DatabaseTape::new(db2); + let conn2 = db2.connect().await.unwrap(); + + conn1 + .execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ()) + .await + .unwrap(); + conn2 + .execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ()) + .await + .unwrap(); + + let mut next_change_id = None; + let mut expected = Vec::new(); + for i in 0..10 { + conn1 + .execute("INSERT INTO a VALUES (?, 'hello')", (i,)) + .await + .unwrap(); + expected.push(vec![ + Value::Integer(i as i64), + Value::Text("hello".to_string()), + ]); + + let mut iterator = db1 + .iterate_changes(DatabaseChangesIteratorOpts { + first_change_id: next_change_id, + ..Default::default() + }) + .await + .unwrap(); + { + let mut replay = db2.start_tape_session().await.unwrap(); + while let Some(change) = iterator.next().await.unwrap() { + if let DatabaseTapeOperation::RowChange(change) = &change { + next_change_id = Some(change.change_id + 1); + } + replay.replay(change).await.unwrap(); + } + } + let conn2 = db2.connect().await.unwrap(); + assert_eq!(fetch_rows(&conn2, "SELECT * FROM a").await, expected); + } + } }