add test to the DatabaseTape

This commit is contained in:
Nikita Sivukhin
2025-07-30 00:55:40 +04:00
parent 4269a1fe7a
commit 7481573cf1

View File

@@ -309,8 +309,12 @@ fn parse_bin_record(bin_record: Vec<u8>) -> Result<Vec<turso::Value>> {
#[cfg(test)]
mod tests {
use tempfile::NamedTempFile;
use turso::Value;
use crate::database_tape::DatabaseTape;
use crate::{
database_tape::{DatabaseChangesIteratorOpts, DatabaseTape},
types::DatabaseTapeOperation,
};
async fn fetch_rows(conn: &turso::Connection, query: &str) -> Vec<Vec<turso::Value>> {
let mut rows = vec![];
@@ -326,7 +330,7 @@ mod tests {
}
#[tokio::test]
async fn test_database_cdc() {
async fn test_database_cdc_single_iteration() {
let temp_file1 = NamedTempFile::new().unwrap();
let db_path1 = temp_file1.path().to_str().unwrap();
@@ -475,4 +479,62 @@ mod tests {
]]
);
}
#[tokio::test]
async fn test_database_cdc_multiple_iterations() {
let temp_file1 = NamedTempFile::new().unwrap();
let db_path1 = temp_file1.path().to_str().unwrap();
let temp_file2 = NamedTempFile::new().unwrap();
let db_path2 = temp_file2.path().to_str().unwrap();
let db1 = turso::Builder::new_local(db_path1).build().await.unwrap();
let db1 = DatabaseTape::new(db1);
let conn1 = db1.connect().await.unwrap();
let db2 = turso::Builder::new_local(db_path2).build().await.unwrap();
let db2 = DatabaseTape::new(db2);
let conn2 = db2.connect().await.unwrap();
conn1
.execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ())
.await
.unwrap();
conn2
.execute("CREATE TABLE a(x INTEGER PRIMARY KEY, y);", ())
.await
.unwrap();
let mut next_change_id = None;
let mut expected = Vec::new();
for i in 0..10 {
conn1
.execute("INSERT INTO a VALUES (?, 'hello')", (i,))
.await
.unwrap();
expected.push(vec![
Value::Integer(i as i64),
Value::Text("hello".to_string()),
]);
let mut iterator = db1
.iterate_changes(DatabaseChangesIteratorOpts {
first_change_id: next_change_id,
..Default::default()
})
.await
.unwrap();
{
let mut replay = db2.start_tape_session().await.unwrap();
while let Some(change) = iterator.next().await.unwrap() {
if let DatabaseTapeOperation::RowChange(change) = &change {
next_change_id = Some(change.change_id + 1);
}
replay.replay(change).await.unwrap();
}
}
let conn2 = db2.connect().await.unwrap();
assert_eq!(fetch_rows(&conn2, "SELECT * FROM a").await, expected);
}
}
}