Merge 'turso-cdc: add updates column for cdc table' from Nikita Sivukhin

This PR adds new `updates` column to the CDC table. This column holds
updated fields of the row in the following format:
```
[C boolean values where true set for changed columns]
[C values with updates where NULL is set for not-changed columns]
```
For example:
```
turso> UPDATE t SET y = 'turso', q = 'db' WHERE rowid = 1;
turso> SELECT bin_record_json_object('["x","y","z","q","x","y","z","q"]', updates) as updates FROM turso_cdc;
┌──────────────────────────────────────────────────────────────────┐
│ updates                                                          │
├──────────────────────────────────────────────────────────────────┤
│ {"x":0,"y":1,"z":0,"q":1,"x":null,"y":"turso","z":null,"q":"db"} │
└──────────────────────────────────────────────────────────────────┘
```
Also, this column works differently for `ALTER TABLE` statements where
update value for `sql` will be equal to the original `ALTER TABLE`:
```
turso> ALTER TABLE t ADD COLUMN t;
turso> SELECT bin_record_json_object('["type","name","tbl_name","rootpage","sql","type","name","tbl_name","rootpage","sql"]', updates) as updates FROM turso_cdc WHERE rowid = 2;
┌───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ updates                                                                                                                                           │
├───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"type":0,"name":0,"tbl_name":0,"rootpage":0,"sql":1,"type":null,"name":null,"tbl_name":null,"rootpage":null,"sql":"ALTER TABLE t ADD COLUMN t;"} │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
This will help turso-db to implement logical replication which supports
both column-level updates and schema changes

Closes #2538
This commit is contained in:
Pekka Enberg
2025-08-12 09:50:16 +03:00
committed by GitHub
11 changed files with 191 additions and 9 deletions

View File

@@ -617,6 +617,9 @@ impl CaptureDataChangesMode {
))
}
}
pub fn has_updates(&self) -> bool {
matches!(self, CaptureDataChangesMode::Full { .. })
}
pub fn has_after(&self) -> bool {
matches!(
self,

View File

@@ -13,7 +13,7 @@ use crate::{
LimboError, Result, SymbolTable,
};
use super::{schema::SQLITE_TABLEID, update::translate_update_with_after};
use super::{schema::SQLITE_TABLEID, update::translate_update_for_schema_change};
pub fn translate_alter_table(
alter: (ast::QualifiedName, ast::AlterTableBody),
@@ -21,6 +21,7 @@ pub fn translate_alter_table(
schema: &Schema,
mut program: ProgramBuilder,
connection: &Arc<crate::Connection>,
input: &str,
) -> Result<ProgramBuilder> {
program.begin_write_operation();
let (table_name, alter_table) = alter;
@@ -94,12 +95,13 @@ pub fn translate_alter_table(
unreachable!();
};
translate_update_with_after(
translate_update_for_schema_change(
schema,
&mut update,
syms,
program,
connection,
input,
|program| {
let column_count = btree.columns.len();
let root_page = btree.root_page;
@@ -206,12 +208,13 @@ pub fn translate_alter_table(
unreachable!();
};
translate_update_with_after(
translate_update_for_schema_change(
schema,
&mut update,
syms,
program,
connection,
input,
|program| {
program.emit_insn(Insn::SetCookie {
db: 0,

View File

@@ -601,6 +601,7 @@ fn emit_delete_insns(
rowid_reg,
before_record_reg,
None,
None,
table_reference.table.get_name(),
)?;
}
@@ -868,6 +869,15 @@ fn emit_update_insns(
// we scan a column at a time, loading either the column's values, or the new value
// from the Set expression, into registers so we can emit a MakeRecord and update the row.
// we allocate 2C registers for "updates" as the structure of this column for CDC table is following:
// [C boolean values where true set for changed columns] [C values with updates where NULL is set for not-changed columns]
let cdc_updates_register = if program.capture_data_changes_mode().has_updates() {
Some(program.alloc_registers(2 * table_ref.columns().len()))
} else {
None
};
let start = if is_virtual { beg + 2 } else { beg + 1 };
for (idx, table_column) in table_ref.columns().iter().enumerate() {
let target_reg = start + idx;
@@ -914,6 +924,27 @@ fn emit_update_insns(
});
}
}
if let Some(cdc_updates_register) = cdc_updates_register {
let change_reg = cdc_updates_register + idx;
let value_reg = cdc_updates_register + table_ref.columns().len() + idx;
program.emit_bool(true, change_reg);
program.mark_last_insn_constant();
let mut updated = false;
if let Some(ddl_query_for_cdc_update) = &plan.cdc_update_alter_statement {
if table_column.name.as_deref() == Some("sql") {
program.emit_string8(ddl_query_for_cdc_update.clone(), value_reg);
updated = true;
}
}
if !updated {
program.emit_insn(Insn::Copy {
src_reg: target_reg,
dst_reg: value_reg,
extra_amount: 0,
});
}
}
} else {
let column_idx_in_index = index.as_ref().and_then(|(idx, _)| {
idx.columns
@@ -944,6 +975,15 @@ fn emit_update_insns(
.unwrap_or(&cursor_id);
program.emit_column(cursor_id, column_idx_in_index.unwrap_or(idx), target_reg);
}
if let Some(cdc_updates_register) = cdc_updates_register {
let change_bit_reg = cdc_updates_register + idx;
let value_reg = cdc_updates_register + table_ref.columns().len() + idx;
program.emit_bool(false, change_bit_reg);
program.mark_last_insn_constant();
program.emit_null(value_reg, None);
program.mark_last_insn_constant();
}
}
}
@@ -1222,6 +1262,19 @@ fn emit_update_insns(
None
};
let cdc_updates_record = if let Some(cdc_updates_register) = cdc_updates_register {
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: cdc_updates_register,
count: 2 * table_ref.columns().len(),
dest_reg: record_reg,
index_name: None,
});
Some(record_reg)
} else {
None
};
// emit actual CDC instructions for write to the CDC table
if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id {
let cdc_rowid_before_reg =
@@ -1235,6 +1288,7 @@ fn emit_update_insns(
cdc_rowid_before_reg,
cdc_before_reg,
None,
None,
table_ref.table.get_name(),
)?;
emit_cdc_insns(
@@ -1245,6 +1299,7 @@ fn emit_update_insns(
cdc_rowid_after_reg,
cdc_after_reg,
None,
None,
table_ref.table.get_name(),
)?;
} else {
@@ -1256,6 +1311,7 @@ fn emit_update_insns(
cdc_rowid_before_reg,
cdc_before_reg,
cdc_after_reg,
cdc_updates_record,
table_ref.table.get_name(),
)?;
}
@@ -1377,10 +1433,11 @@ pub fn emit_cdc_insns(
rowid_reg: usize,
before_record_reg: Option<usize>,
after_record_reg: Option<usize>,
updates_record_reg: Option<usize>,
table_name: &str,
) -> Result<()> {
// (change_id INTEGER PRIMARY KEY AUTOINCREMENT, change_time INTEGER, change_type INTEGER, table_name TEXT, id, before BLOB, after BLOB)
let turso_cdc_registers = program.alloc_registers(7);
// (change_id INTEGER PRIMARY KEY AUTOINCREMENT, change_time INTEGER, change_type INTEGER, table_name TEXT, id, before BLOB, after BLOB, updates BLOB)
let turso_cdc_registers = program.alloc_registers(8);
program.emit_insn(Insn::Null {
dest: turso_cdc_registers,
dest_end: None,
@@ -1441,6 +1498,17 @@ pub fn emit_cdc_insns(
program.mark_last_insn_constant();
}
if let Some(updates_record_reg) = updates_record_reg {
program.emit_insn(Insn::Copy {
src_reg: updates_record_reg,
dst_reg: turso_cdc_registers + 7,
extra_amount: 0,
});
} else {
program.emit_null(turso_cdc_registers + 7, None);
program.mark_last_insn_constant();
}
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: cdc_cursor_id,
@@ -1451,7 +1519,7 @@ pub fn emit_cdc_insns(
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: turso_cdc_registers,
count: 7,
count: 8,
dest_reg: record_reg,
index_name: None,
});

View File

@@ -456,6 +456,7 @@ pub fn translate_drop_index(
row_id_reg,
before_record_reg,
None,
None,
SQLITE_TABLEID,
)?;
}

View File

@@ -580,6 +580,7 @@ pub fn translate_insert(
rowid_and_columns_start_register,
None,
after_record_reg,
None,
table_name.as_str(),
)?;
}

View File

@@ -99,7 +99,7 @@ pub fn translate(
connection.clone(),
program,
)?,
stmt => translate_inner(schema, stmt, syms, program, &connection)?,
stmt => translate_inner(schema, stmt, syms, program, &connection, input)?,
};
program.epilogue(schema);
@@ -116,6 +116,7 @@ pub fn translate_inner(
syms: &SymbolTable,
program: ProgramBuilder,
connection: &Arc<Connection>,
input: &str,
) -> Result<ProgramBuilder> {
let is_write = matches!(
stmt,
@@ -142,7 +143,7 @@ pub fn translate_inner(
let mut program = match stmt {
ast::Stmt::AlterTable(alter) => {
translate_alter_table(*alter, syms, schema, program, connection)?
translate_alter_table(*alter, syms, schema, program, connection, input)?
}
ast::Stmt::Analyze(_) => bail_parse_error!("ANALYZE not supported yet"),
ast::Stmt::Attach { expr, db_name, key } => {

View File

@@ -397,6 +397,9 @@ pub struct UpdatePlan {
pub indexes_to_update: Vec<Arc<Index>>,
// If the table's rowid alias is used, gather all the target rowids into an ephemeral table, and then use that table as the single JoinedTable for the actual UPDATE loop.
pub ephemeral_plan: Option<SelectPlan>,
// For ALTER TABLE turso-db emits appropriate DDL statement in the "updates" cell of CDC table
// This field is present only for update plan created for ALTER TABLE when CDC mode has "updates" values
pub cdc_update_alter_statement: Option<String>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]

View File

@@ -731,6 +731,14 @@ fn turso_cdc_table_columns() -> Vec<ColumnDefinition> {
}),
constraints: vec![],
},
ast::ColumnDefinition {
col_name: ast::Name::from_str("updates"),
col_type: Some(ast::Type {
name: "BLOB".to_string(),
size: None,
}),
constraints: vec![],
},
]
}

View File

@@ -270,6 +270,7 @@ pub fn emit_schema_entry(
rowid_reg,
None,
after_record_reg,
None,
SQLITE_TABLEID,
)?;
}
@@ -775,6 +776,7 @@ pub fn translate_drop_table(
row_id_reg,
before_record_reg,
None,
None,
SQLITE_TABLEID,
)?;
program.resolve_label(skip_cdc_label, program.offset());

View File

@@ -72,15 +72,23 @@ pub fn translate_update(
Ok(program)
}
pub fn translate_update_with_after(
pub fn translate_update_for_schema_change(
schema: &Schema,
body: &mut Update,
syms: &SymbolTable,
mut program: ProgramBuilder,
connection: &Arc<crate::Connection>,
ddl_query: &str,
after: impl FnOnce(&mut ProgramBuilder),
) -> crate::Result<ProgramBuilder> {
let mut plan = prepare_update_plan(&mut program, schema, body, connection)?;
if let Plan::Update(plan) = &mut plan {
if program.capture_data_changes_mode().has_updates() {
plan.cdc_update_alter_statement = Some(ddl_query.to_string());
}
}
optimize_plan(&mut plan, schema)?;
// TODO: freestyling these numbers
let opts = ProgramBuilderOpts {
@@ -368,6 +376,7 @@ pub fn prepare_update_plan(
contains_constant_false_condition: false,
indexes_to_update,
ephemeral_plan,
cdc_update_alter_statement: None,
}))
}

View File

@@ -44,6 +44,7 @@ fn test_cdc_simple_id() {
Value::Integer(10),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
@@ -53,6 +54,7 @@ fn test_cdc_simple_id() {
Value::Integer(5),
Value::Null,
Value::Null,
Value::Null,
]
]
);
@@ -99,6 +101,7 @@ fn test_cdc_simple_before() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
@@ -108,6 +111,7 @@ fn test_cdc_simple_before() {
Value::Integer(3),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
@@ -117,6 +121,7 @@ fn test_cdc_simple_before() {
Value::Integer(1),
Value::Blob(record([Value::Integer(1), Value::Integer(2)])),
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
@@ -126,6 +131,7 @@ fn test_cdc_simple_before() {
Value::Integer(3),
Value::Blob(record([Value::Integer(3), Value::Integer(4)])),
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
@@ -135,6 +141,7 @@ fn test_cdc_simple_before() {
Value::Integer(1),
Value::Blob(record([Value::Integer(1), Value::Integer(3)])),
Value::Null,
Value::Null,
]
]
);
@@ -165,6 +172,7 @@ fn test_cdc_simple_after() {
Value::Integer(1),
Value::Null,
Value::Blob(record([Value::Integer(1), Value::Integer(2)])),
Value::Null,
],
vec![
Value::Integer(2),
@@ -174,6 +182,7 @@ fn test_cdc_simple_after() {
Value::Integer(3),
Value::Null,
Value::Blob(record([Value::Integer(3), Value::Integer(4)])),
Value::Null,
],
vec![
Value::Integer(3),
@@ -183,6 +192,7 @@ fn test_cdc_simple_after() {
Value::Integer(1),
Value::Null,
Value::Blob(record([Value::Integer(1), Value::Integer(3)])),
Value::Null,
],
vec![
Value::Integer(4),
@@ -192,6 +202,7 @@ fn test_cdc_simple_after() {
Value::Integer(3),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
@@ -201,6 +212,7 @@ fn test_cdc_simple_after() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
]
]
);
@@ -231,6 +243,7 @@ fn test_cdc_simple_full() {
Value::Integer(1),
Value::Null,
Value::Blob(record([Value::Integer(1), Value::Integer(2)])),
Value::Null,
],
vec![
Value::Integer(2),
@@ -240,6 +253,7 @@ fn test_cdc_simple_full() {
Value::Integer(3),
Value::Null,
Value::Blob(record([Value::Integer(3), Value::Integer(4)])),
Value::Null,
],
vec![
Value::Integer(3),
@@ -249,6 +263,12 @@ fn test_cdc_simple_full() {
Value::Integer(1),
Value::Blob(record([Value::Integer(1), Value::Integer(2)])),
Value::Blob(record([Value::Integer(1), Value::Integer(3)])),
Value::Blob(record([
Value::Integer(0),
Value::Integer(1),
Value::Null,
Value::Integer(3)
])),
],
vec![
Value::Integer(4),
@@ -258,6 +278,7 @@ fn test_cdc_simple_full() {
Value::Integer(3),
Value::Blob(record([Value::Integer(3), Value::Integer(4)])),
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
@@ -267,6 +288,7 @@ fn test_cdc_simple_full() {
Value::Integer(1),
Value::Blob(record([Value::Integer(1), Value::Integer(3)])),
Value::Null,
Value::Null,
]
]
);
@@ -307,6 +329,7 @@ fn test_cdc_crud() {
Value::Integer(20),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
@@ -316,6 +339,7 @@ fn test_cdc_crud() {
Value::Integer(10),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
@@ -325,6 +349,7 @@ fn test_cdc_crud() {
Value::Integer(5),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
@@ -334,6 +359,7 @@ fn test_cdc_crud() {
Value::Integer(5),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
@@ -343,6 +369,7 @@ fn test_cdc_crud() {
Value::Integer(10),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(6),
@@ -352,6 +379,7 @@ fn test_cdc_crud() {
Value::Integer(20),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(7),
@@ -361,6 +389,7 @@ fn test_cdc_crud() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(8),
@@ -370,6 +399,7 @@ fn test_cdc_crud() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(9),
@@ -379,6 +409,7 @@ fn test_cdc_crud() {
Value::Integer(2),
Value::Null,
Value::Null,
Value::Null,
],
]
);
@@ -422,6 +453,7 @@ fn test_cdc_failed_op() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
@@ -431,6 +463,7 @@ fn test_cdc_failed_op() {
Value::Integer(2),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
@@ -440,6 +473,7 @@ fn test_cdc_failed_op() {
Value::Integer(6),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
@@ -449,6 +483,7 @@ fn test_cdc_failed_op() {
Value::Integer(7),
Value::Null,
Value::Null,
Value::Null,
],
]
);
@@ -508,6 +543,7 @@ fn test_cdc_uncaptured_connection() {
Value::Integer(2),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
@@ -517,6 +553,7 @@ fn test_cdc_uncaptured_connection() {
Value::Integer(4),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
@@ -526,6 +563,7 @@ fn test_cdc_uncaptured_connection() {
Value::Integer(6),
Value::Null,
Value::Null,
Value::Null,
],
]
);
@@ -564,6 +602,7 @@ fn test_cdc_custom_table() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
@@ -573,6 +612,7 @@ fn test_cdc_custom_table() {
Value::Integer(2),
Value::Null,
Value::Null,
Value::Null,
],
]
);
@@ -613,6 +653,7 @@ fn test_cdc_ignore_changes_in_cdc_table() {
Value::Integer(2),
Value::Null,
Value::Null,
Value::Null,
],]
);
}
@@ -654,6 +695,7 @@ fn test_cdc_transaction() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(2),
@@ -663,6 +705,7 @@ fn test_cdc_transaction() {
Value::Integer(2),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
@@ -672,6 +715,7 @@ fn test_cdc_transaction() {
Value::Integer(3),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(4),
@@ -681,6 +725,7 @@ fn test_cdc_transaction() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(5),
@@ -690,6 +735,7 @@ fn test_cdc_transaction() {
Value::Integer(2),
Value::Null,
Value::Null,
Value::Null,
],
]
);
@@ -731,6 +777,7 @@ fn test_cdc_independent_connections() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
]]
);
let rows =
@@ -745,6 +792,7 @@ fn test_cdc_independent_connections() {
Value::Integer(2),
Value::Null,
Value::Null,
Value::Null,
]]
);
}
@@ -796,6 +844,7 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
Value::Integer(2),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
@@ -805,6 +854,7 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
]
]
);
@@ -821,6 +871,7 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
Value::Integer(4),
Value::Null,
Value::Null,
Value::Null,
],
vec![
Value::Integer(3),
@@ -830,6 +881,7 @@ fn test_cdc_independent_connections_different_cdc_not_ignore() {
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
]
]
);
@@ -914,6 +966,7 @@ fn test_cdc_schema_changes() {
"CREATE TABLE t (x, y, z UNIQUE, q, PRIMARY KEY (x, y))".to_string()
)
])),
Value::Null,
],
vec![
Value::Integer(2),
@@ -929,6 +982,7 @@ fn test_cdc_schema_changes() {
Value::Integer(6),
Value::Text("CREATE TABLE q (a, b, c)".to_string())
])),
Value::Null,
],
vec![
Value::Integer(3),
@@ -944,6 +998,7 @@ fn test_cdc_schema_changes() {
Value::Integer(7),
Value::Text("CREATE INDEX t_q ON t (q)".to_string())
])),
Value::Null,
],
vec![
Value::Integer(4),
@@ -959,6 +1014,7 @@ fn test_cdc_schema_changes() {
Value::Integer(8),
Value::Text("CREATE INDEX q_abc ON q (a, b, c)".to_string())
])),
Value::Null,
],
vec![
Value::Integer(5),
@@ -976,6 +1032,7 @@ fn test_cdc_schema_changes() {
)
])),
Value::Null,
Value::Null,
],
vec![
Value::Integer(6),
@@ -991,6 +1048,7 @@ fn test_cdc_schema_changes() {
Value::Text("CREATE INDEX q_abc ON q (a, b, c)".to_string())
])),
Value::Null,
Value::Null,
]
]
);
@@ -1027,6 +1085,7 @@ fn test_cdc_schema_changes_alter_table() {
"CREATE TABLE t (x, y, z UNIQUE, q, PRIMARY KEY (x, y))".to_string()
)
])),
Value::Null,
],
vec![
Value::Integer(2),
@@ -1052,6 +1111,18 @@ fn test_cdc_schema_changes_alter_table() {
"CREATE TABLE t (x PRIMARY KEY, y PRIMARY KEY, z UNIQUE)".to_string()
)
])),
Value::Blob(record([
Value::Integer(0),
Value::Integer(0),
Value::Integer(0),
Value::Integer(0),
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
Value::Null,
Value::Text("ALTER TABLE t DROP COLUMN q".to_string())
])),
],
vec![
Value::Integer(3),
@@ -1077,6 +1148,18 @@ fn test_cdc_schema_changes_alter_table() {
"CREATE TABLE t (x PRIMARY KEY, y PRIMARY KEY, z UNIQUE, t)".to_string()
)
])),
Value::Blob(record([
Value::Integer(0),
Value::Integer(0),
Value::Integer(0),
Value::Integer(0),
Value::Integer(1),
Value::Null,
Value::Null,
Value::Null,
Value::Null,
Value::Text("ALTER TABLE t ADD COLUMN t".to_string())
])),
],
]
);