generate before/after row values in modification statements

This commit is contained in:
Nikita Sivukhin
2025-07-10 15:24:48 +04:00
parent 9129991b62
commit b258c10c9a
2 changed files with 45 additions and 9 deletions

View File

@@ -149,7 +149,7 @@ pub struct TranslateCtx<'a> {
/// - First: all `GROUP BY` expressions, in the order they appear in the `GROUP BY` clause.
/// - Then: remaining non-aggregate expressions that are not part of `GROUP BY`.
pub non_aggregate_expressions: Vec<(&'a Expr, bool)>,
/// Cursor id for turso_cdc table (if capture_data_changes=on is set and query can modify the data)
/// Cursor id for cdc table (if capture_data_changes PRAGMA is set and query can modify the data)
pub cdc_cursor_id: Option<usize>,
}
@@ -570,19 +570,36 @@ fn emit_delete_insns(
}
}
if let Some(turso_cdc_cursor_id) = t_ctx.cdc_cursor_id {
if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id {
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::RowId {
cursor_id: main_table_cursor_id,
dest: rowid_reg,
});
let cdc_has_before = program.capture_data_changes_mode().has_before();
let before_record_reg = if cdc_has_before {
let columns = table_reference.columns();
let columns_reg = program.alloc_registers(columns.len() + 1);
for i in 0..columns.len() {
program.emit_column(main_table_cursor_id, i, columns_reg + 1 + i);
}
program.emit_insn(Insn::MakeRecord {
start_reg: columns_reg + 1,
count: columns.len(),
dest_reg: columns_reg,
index_name: None,
});
Some(columns_reg)
} else {
None
};
emit_cdc_insns(
program,
&t_ctx.resolver,
OperationMode::DELETE,
turso_cdc_cursor_id,
cdc_cursor_id,
rowid_reg,
None,
before_record_reg,
None,
table_reference.table.get_name(),
)?;
@@ -1101,6 +1118,25 @@ fn emit_update_insns(
if let Some(cdc_cursor_id) = t_ctx.cdc_cursor_id {
let rowid_reg = program.alloc_register();
let has_after = program.capture_data_changes_mode().has_after();
let has_before = program.capture_data_changes_mode().has_before();
let before_record_reg = if has_before {
let columns = table_ref.columns();
let columns_reg = program.alloc_registers(columns.len() + 1);
for i in 0..columns.len() {
program.emit_column(cursor_id, i, columns_reg + 1 + i);
}
program.emit_insn(Insn::MakeRecord {
start_reg: columns_reg + 1,
count: columns.len(),
dest_reg: columns_reg,
index_name: None,
});
Some(columns_reg)
} else {
None
};
let after_record_reg = if has_after { Some(record_reg) } else { None };
if has_user_provided_rowid {
program.emit_insn(Insn::RowId {
cursor_id,
@@ -1112,7 +1148,7 @@ fn emit_update_insns(
OperationMode::DELETE,
cdc_cursor_id,
rowid_reg,
None,
before_record_reg,
None,
table_ref.table.get_name(),
)?;
@@ -1130,7 +1166,7 @@ fn emit_update_insns(
cdc_cursor_id,
rowid_reg,
None,
None,
after_record_reg,
table_ref.table.get_name(),
)?;
} else {
@@ -1145,8 +1181,8 @@ fn emit_update_insns(
OperationMode::UPDATE,
cdc_cursor_id,
rowid_reg,
None,
None,
before_record_reg,
after_record_reg,
table_ref.table.get_name(),
)?;
}

View File

@@ -118,7 +118,6 @@ pub fn translate_insert(
let loop_start_label = program.allocate_label();
let cdc_table = program.capture_data_changes_mode().table();
let cdc_has_after = program.capture_data_changes_mode().has_after();
let cdc_table = if let Some(cdc_table) = cdc_table {
if table.get_name() != cdc_table {
let Some(turso_cdc_table) = schema.get_table(cdc_table) else {
@@ -566,6 +565,7 @@ pub fn translate_insert(
// Write record to the turso_cdc table if necessary
if let Some((cdc_cursor_id, _)) = &cdc_table {
let cdc_has_after = program.capture_data_changes_mode().has_after();
let after_record_reg = if cdc_has_after {
Some(record_register)
} else {