This commit is contained in:
Nikita Sivukhin
2025-07-10 15:28:23 +04:00
parent b258c10c9a
commit fabb00f385
2 changed files with 110 additions and 46 deletions

View File

@@ -1157,7 +1157,7 @@ fn emit_update_insns(
"rowid_set_clause_reg must be set because has_user_provided_rowid is true",
),
dst_reg: rowid_reg,
amount: 1,
amount: 0,
});
emit_cdc_insns(
program,
@@ -1173,7 +1173,7 @@ fn emit_update_insns(
program.emit_insn(Insn::Copy {
src_reg: rowid_set_clause_reg.unwrap_or(beg),
dst_reg: rowid_reg,
amount: 1,
amount: 0,
});
emit_cdc_insns(
program,

View File

@@ -17,7 +17,7 @@ fn replace_column_with_null(rows: Vec<Vec<Value>>, column: usize) -> Vec<Vec<Val
fn test_cdc_simple() {
let db = TempDatabase::new_empty(false);
let conn = db.connect_limbo();
conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
conn.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
@@ -40,14 +40,18 @@ fn test_cdc_simple() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(10)
Value::Integer(10),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(5)
Value::Integer(5),
Value::Null,
Value::Null,
]
]
);
@@ -57,7 +61,7 @@ fn test_cdc_simple() {
fn test_cdc_crud() {
let db = TempDatabase::new_empty(false);
let conn = db.connect_limbo();
conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
conn.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y)")
.unwrap();
@@ -85,63 +89,81 @@ fn test_cdc_crud() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(20)
Value::Integer(20),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(10)
Value::Integer(10),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(5)
Value::Integer(5),
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
Value::Null,
Value::Integer(0),
Value::Text("t".to_string()),
Value::Integer(5)
Value::Integer(5),
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(10)
Value::Integer(10),
Value::Null,
Value::Null,
],
vec![
Value::Integer(6),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(20)
Value::Integer(20),
Value::Null,
Value::Null,
],
vec![
Value::Integer(7),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(8),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(9),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
]
);
@@ -151,7 +173,7 @@ fn test_cdc_crud() {
fn test_cdc_failed_op() {
let db = TempDatabase::new_empty(true);
let conn = db.connect_limbo();
conn.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
conn.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
.unwrap();
@@ -182,28 +204,36 @@ fn test_cdc_failed_op() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(6)
Value::Integer(6),
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(7)
Value::Integer(7),
Value::Null,
Value::Null,
],
]
);
@@ -218,13 +248,13 @@ fn test_cdc_uncaptured_connection() {
.unwrap();
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap(); // captured
let conn2 = db.connect_limbo();
conn2.execute("INSERT INTO t VALUES (3, 30)").unwrap();
conn2
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only')")
.execute("PRAGMA unstable_capture_data_changes_conn('id')")
.unwrap();
conn2.execute("INSERT INTO t VALUES (4, 40)").unwrap(); // captured
conn2
@@ -260,21 +290,27 @@ fn test_cdc_uncaptured_connection() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(4)
Value::Integer(4),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(6)
Value::Integer(6),
Value::Null,
Value::Null,
],
]
);
@@ -288,7 +324,7 @@ fn test_cdc_custom_table() {
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
.unwrap();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc')")
.unwrap();
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap();
@@ -310,14 +346,18 @@ fn test_cdc_custom_table() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
]
);
@@ -331,7 +371,7 @@ fn test_cdc_ignore_changes_in_cdc_table() {
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
.unwrap();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc')")
.unwrap();
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
conn1.execute("INSERT INTO t VALUES (2, 20)").unwrap();
@@ -355,7 +395,9 @@ fn test_cdc_ignore_changes_in_cdc_table() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],]
);
}
@@ -371,7 +413,7 @@ fn test_cdc_transaction() {
.execute("CREATE TABLE q(x INTEGER PRIMARY KEY, y UNIQUE)")
.unwrap();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc')")
.unwrap();
conn1.execute("BEGIN").unwrap();
conn1.execute("INSERT INTO t VALUES (1, 10)").unwrap();
@@ -394,35 +436,45 @@ fn test_cdc_transaction() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
Value::Null,
Value::Integer(1),
Value::Text("q".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(3)
Value::Integer(3),
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
Value::Null,
Value::Integer(-1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
Value::Null,
Value::Integer(0),
Value::Text("q".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
]
);
@@ -434,10 +486,10 @@ fn test_cdc_independent_connections() {
let conn1 = db.connect_limbo();
let conn2 = db.connect_limbo();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc1')")
.unwrap();
conn2
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc2')")
.unwrap();
conn1
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
@@ -461,7 +513,9 @@ fn test_cdc_independent_connections() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
]]
);
let rows =
@@ -473,7 +527,9 @@ fn test_cdc_independent_connections() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
]]
);
}
@@ -484,10 +540,10 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
let conn1 = db.connect_limbo();
let conn2 = db.connect_limbo();
conn1
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc1')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc1')")
.unwrap();
conn2
.execute("PRAGMA unstable_capture_data_changes_conn('rowid-only,custom_cdc2')")
.execute("PRAGMA unstable_capture_data_changes_conn('id,custom_cdc2')")
.unwrap();
conn1
.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y UNIQUE)")
@@ -522,14 +578,18 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(2)
Value::Integer(2),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(-1),
Value::Text("custom_cdc2".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
]
]
);
@@ -543,14 +603,18 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
Value::Null,
Value::Integer(1),
Value::Text("t".to_string()),
Value::Integer(4)
Value::Integer(4),
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
Value::Null,
Value::Integer(-1),
Value::Text("custom_cdc1".to_string()),
Value::Integer(1)
Value::Integer(1),
Value::Null,
Value::Null,
]
]
);