From 3e7e66c0e7908bafba877add312da81f16a18e15 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 7 Jul 2025 12:44:50 +0400 Subject: [PATCH] add basic cdc tests --- tests/integration/functions/mod.rs | 1 + tests/integration/functions/test_cdc.rs | 557 ++++++++++++++++++++++++ 2 files changed, 558 insertions(+) create mode 100644 tests/integration/functions/test_cdc.rs diff --git a/tests/integration/functions/mod.rs b/tests/integration/functions/mod.rs index 66fcb1cb5..5a2372f24 100644 --- a/tests/integration/functions/mod.rs +++ b/tests/integration/functions/mod.rs @@ -1 +1,2 @@ mod test_function_rowid; +mod test_cdc; diff --git a/tests/integration/functions/test_cdc.rs b/tests/integration/functions/test_cdc.rs new file mode 100644 index 000000000..e69751b68 --- /dev/null +++ b/tests/integration/functions/test_cdc.rs @@ -0,0 +1,557 @@ +use rusqlite::types::Value; + +use crate::common::{limbo_exec_rows, TempDatabase}; + +fn replace_column_with_null(rows: Vec>, column: usize) -> Vec> { + rows.into_iter() + .map(|row| { + row.into_iter() + .enumerate() + .map(|(i, value)| if i == column { Value::Null } else { value }) + .collect() + }) + .collect() +} + +#[test] +fn test_cdc_simple() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (10, 10), (5, 1)") + .unwrap(); + let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(5), Value::Integer(1)], + vec![Value::Integer(10), Value::Integer(10)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(10) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(5) + ] + ] + ); +} + +#[test] +fn test_cdc_crud() { + let db = TempDatabase::new_empty(false); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (20, 20), (10, 10), (5, 1)") + .unwrap(); + conn.execute("UPDATE t SET y = 100 WHERE x = 5").unwrap(); + conn.execute("DELETE FROM t WHERE x > 5").unwrap(); + conn.execute("INSERT INTO t VALUES (1, 1)").unwrap(); + conn.execute("UPDATE t SET x = 2 WHERE x = 1").unwrap(); + + let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(2), Value::Integer(1)], + vec![Value::Integer(5), Value::Integer(100)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(20) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(10) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(5) + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(0), + Value::Text("t".to_string()), + Value::Integer(5) + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(10) + ], + vec![ + Value::Integer(6), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(20) + ], + vec![ + Value::Integer(7), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(8), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(9), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + ] + ); +} + +#[test] +fn test_cdc_failed_op() { + let db = TempDatabase::new_empty(true); + let conn = db.connect_limbo(); + conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn.execute("INSERT INTO t VALUES (1, 10), (2, 20)") + .unwrap(); + assert!(conn + .execute("INSERT INTO t VALUES (3, 30), (4, 40), (5, 10)") + .is_err()); + conn.execute("INSERT INTO t VALUES (6, 60), (7, 70)") + .unwrap(); + + let rows = limbo_exec_rows(&db, &conn, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + vec![Value::Integer(6), Value::Integer(60)], + vec![Value::Integer(7), Value::Integer(70)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(6) + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(7) + ], + ] + ); +} + +#[test] +fn test_cdc_uncaptured_connection() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); // captured + let conn2 = db.connect_limbo(); + conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap(); + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')") + .unwrap(); + conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); // captured + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('off')") + .unwrap(); + conn2.execute("INSERT INTO t VALUES (5, 50)").unwrap(); + + conn1.execute("INSERT INTO t VALUES (6, 60)").unwrap(); // captured + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('off')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (7, 70)").unwrap(); + + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + vec![Value::Integer(3), Value::Integer(30)], + vec![Value::Integer(4), Value::Integer(40)], + vec![Value::Integer(5), Value::Integer(50)], + vec![Value::Integer(6), Value::Integer(60)], + vec![Value::Integer(7), Value::Integer(70)], + ] + ); + let rows = replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM turso_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(4) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(6) + ], + ] + ); +} + +#[test] +fn test_cdc_custom_table() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + ] + ); +} + +#[test] +fn test_cdc_ignore_changes_in_cdc_table() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + ] + ); + conn1 + .execute("DELETE FROM custom_cdc WHERE operation_id < 2") + .unwrap(); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1); + assert_eq!( + rows, + vec![vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ],] + ); +} + +#[test] +fn test_cdc_transaction() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')") + .unwrap(); + conn1.execute("BEGIN").unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO q VALUES (2, 20)").unwrap(); + conn1.execute("INSERT INTO t VALUES (3, 30)").unwrap(); + conn1.execute("DELETE FROM t WHERE x = 1").unwrap(); + conn1.execute("UPDATE q SET y = 200 WHERE x = 2").unwrap(); + conn1.execute("COMMIT").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!(rows, vec![vec![Value::Integer(3), Value::Integer(30)],]); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM q"); + assert_eq!(rows, vec![vec![Value::Integer(2), Value::Integer(200)],]); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("q".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(3) + ], + vec![ + Value::Integer(4), + Value::Null, + Value::Integer(-1), + Value::Text("t".to_string()), + Value::Integer(1) + ], + vec![ + Value::Integer(5), + Value::Null, + Value::Integer(0), + Value::Text("q".to_string()), + Value::Integer(2) + ], + ] + ); +} + +#[test] +fn test_cdc_independent_connections() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + let conn2 = db.connect_limbo(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')") + .unwrap(); + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')") + .unwrap(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn2.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)] + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc1"), 1); + assert_eq!( + rows, + vec![vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(1) + ]] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc2"), 1); + assert_eq!( + rows, + vec![vec![ + Value::Integer(1), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ]] + ); +} + +#[test] +fn test_cdc_independent_connections_different_cdc_not_ignore() { + let db = TempDatabase::new_empty(true); + let conn1 = db.connect_limbo(); + let conn2 = db.connect_limbo(); + conn1 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')") + .unwrap(); + conn2 + .execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')") + .unwrap(); + conn1 + .execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)") + .unwrap(); + conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap(); + conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); + conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap(); + conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); + conn1 + .execute("DELETE FROM custom_cdc2 WHERE operation_id < 2") + .unwrap(); + conn2 + .execute("DELETE FROM custom_cdc1 WHERE operation_id < 2") + .unwrap(); + let rows = limbo_exec_rows(&db, &conn1, "SELECT * FROM t"); + assert_eq!( + rows, + vec![ + vec![Value::Integer(1), Value::Integer(10)], + vec![Value::Integer(2), Value::Integer(20)], + vec![Value::Integer(3), Value::Integer(30)], + vec![Value::Integer(4), Value::Integer(40)], + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn1, "SELECT * FROM custom_cdc1"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(2) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(-1), + Value::Text("custom_cdc2".to_string()), + Value::Integer(1) + ] + ] + ); + let rows = + replace_column_with_null(limbo_exec_rows(&db, &conn2, "SELECT * FROM custom_cdc2"), 1); + assert_eq!( + rows, + vec![ + vec![ + Value::Integer(2), + Value::Null, + Value::Integer(1), + Value::Text("t".to_string()), + Value::Integer(4) + ], + vec![ + Value::Integer(3), + Value::Null, + Value::Integer(-1), + Value::Text("custom_cdc1".to_string()), + Value::Integer(1) + ] + ] + ); +}