avoid capturing changes in cdc table

This commit is contained in:
Nikita Sivukhin
2025-07-06 22:24:35 +04:00
parent a988bbaffe
commit 32fa2ac3ee
2 changed files with 30 additions and 23 deletions

View File

@@ -119,16 +119,20 @@ pub fn translate_insert(
let cdc_table = program.capture_data_changes_mode().table();
let cdc_table = if let Some(cdc_table) = cdc_table {
let Some(turso_cdc_table) = schema.get_table(&cdc_table) else {
crate::bail_parse_error!("no such table: {}", cdc_table);
};
let Some(cdc_btree) = turso_cdc_table.btree().clone() else {
crate::bail_parse_error!("no such table: {}", cdc_table);
};
Some((
program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())),
cdc_btree,
))
if table.get_name() != cdc_table {
let Some(turso_cdc_table) = schema.get_table(&cdc_table) else {
crate::bail_parse_error!("no such table: {}", cdc_table);
};
let Some(cdc_btree) = turso_cdc_table.btree().clone() else {
crate::bail_parse_error!("no such table: {}", cdc_table);
};
Some((
program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone())),
cdc_btree,
))
} else {
None
}
} else {
None
};

View File

@@ -125,20 +125,23 @@ pub fn init_loop(
OperationMode::INSERT | OperationMode::UPDATE | OperationMode::DELETE
)
{
assert!(tables.joined_tables().len() == 1);
let cdc_table_name = cdc_table.unwrap();
let Some(cdc_table) = t_ctx.resolver.schema.get_table(cdc_table_name) else {
crate::bail_parse_error!("no such table: {}", cdc_table_name);
};
let Some(cdc_btree) = cdc_table.btree().clone() else {
crate::bail_parse_error!("no such table: {}", cdc_table_name);
};
let cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: cdc_cursor_id,
root_page: cdc_btree.root_page.into(),
name: cdc_btree.name.clone(),
});
t_ctx.cdc_cursor_id = Some(cdc_cursor_id);
if tables.joined_tables()[0].table.get_name() != cdc_table_name {
let Some(cdc_table) = t_ctx.resolver.schema.get_table(cdc_table_name) else {
crate::bail_parse_error!("no such table: {}", cdc_table_name);
};
let Some(cdc_btree) = cdc_table.btree().clone() else {
crate::bail_parse_error!("no such table: {}", cdc_table_name);
};
let cdc_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(cdc_btree.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: cdc_cursor_id,
root_page: cdc_btree.root_page.into(),
name: cdc_btree.name.clone(),
});
t_ctx.cdc_cursor_id = Some(cdc_cursor_id);
}
}
// Initialize ephemeral indexes for distinct aggregates