Use new datastructures and functions in translate_insert

This commit is contained in:
Jussi Saurio
2025-08-21 12:31:19 +03:00
parent 88c4eae63e
commit ac56d5bb67

View File

@@ -319,32 +319,12 @@ pub fn translate_insert(
})
.collect::<Vec<(&String, usize, usize)>>();
let column_mappings = resolve_columns_for_insert(&table, &columns, num_values)?;
let has_user_provided_rowid = column_mappings
.iter()
.any(|x| x.value_index.is_some() && x.column.is_rowid_alias);
// allocate a register for each column in the table. if not provided by user, they will simply be set as null.
// allocate an extra register for rowid regardless of whether user provided a rowid alias column.
let num_cols = btree_table.columns.len();
let rowid_and_columns_start_register = program.alloc_registers(num_cols + 1);
let columns_start_register = rowid_and_columns_start_register + 1;
let record_register = program.alloc_register();
let insertion = build_insertion(&mut program, &table, &columns, num_values)?;
if inserting_multiple_rows {
if let Some(ref temp_table_ctx) = temp_table_ctx {
// Rewind loop to read from ephemeral table
program.emit_insn(Insn::Rewind {
cursor_id: temp_table_ctx.cursor_id,
pc_if_empty: temp_table_ctx.loop_end_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_start_label);
}
populate_columns_multiple_rows(
translate_rows_multiple(
&mut program,
&column_mappings,
rowid_and_columns_start_register,
&insertion,
yield_reg_opt.unwrap() + 1,
&resolver,
&temp_table_ctx,
@@ -357,13 +337,7 @@ pub fn translate_insert(
db: 0,
});
populate_column_registers(
&mut program,
&values.unwrap(),
&column_mappings,
rowid_and_columns_start_register,
&resolver,
)?;
translate_rows_single(&mut program, &values.unwrap(), &insertion, &resolver)?;
}
// Open all the index btrees for writing
@@ -375,6 +349,7 @@ pub fn translate_insert(
});
}
// Common record insertion logic for both single and multiple rows
let has_user_provided_rowid = insertion.key.is_provided_by_user();
let check_rowid_is_integer_label = if has_user_provided_rowid {
Some(program.allocate_label())
} else {
@@ -382,7 +357,7 @@ pub fn translate_insert(
};
if has_user_provided_rowid {
program.emit_insn(Insn::NotNull {
reg: rowid_and_columns_start_register,
reg: insertion.key_register(),
target_pc: check_rowid_is_integer_label.unwrap(),
});
}
@@ -390,7 +365,7 @@ pub fn translate_insert(
// Create new rowid if a) not provided by user or b) provided by user but is NULL
program.emit_insn(Insn::NewRowid {
cursor: cursor_id,
rowid_reg: rowid_and_columns_start_register,
rowid_reg: insertion.key_register(),
prev_largest_reg: 0,
});
@@ -398,7 +373,7 @@ pub fn translate_insert(
program.resolve_label(must_be_int_label, program.offset());
// If the user provided a rowid, it must be an integer.
program.emit_insn(Insn::MustBeInt {
reg: rowid_and_columns_start_register,
reg: insertion.key_register(),
});
}
@@ -408,14 +383,10 @@ pub fn translate_insert(
let make_record_label = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: cursor_id,
rowid_reg: rowid_and_columns_start_register,
rowid_reg: insertion.key_register(),
target_pc: make_record_label,
});
let rowid_column = column_mappings
.iter()
.find(|x| x.value_index.is_some() && x.column.is_rowid_alias)
.expect("rowid alias column must be provided");
let rowid_column_name = rowid_column.column.name.as_deref().unwrap_or(ROWID);
let rowid_column_name = insertion.key.column_name();
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: format!("{}.{}", table_name.as_str(), rowid_column_name),
@@ -426,8 +397,8 @@ pub fn translate_insert(
match table.btree() {
Some(t) if t.is_strict => {
program.emit_insn(Insn::TypeCheck {
start_reg: columns_start_register,
count: num_cols,
start_reg: insertion.first_col_register(),
count: insertion.col_mappings.len(),
check_generated: true,
table_reference: Arc::clone(&t),
});
@@ -435,46 +406,44 @@ pub fn translate_insert(
_ => (),
}
let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?;
for index_col_mapping in index_col_mappings {
for index in schema.get_indices(table_name.as_str()) {
let column_mappings = index
.columns
.iter()
.map(|idx_col| insertion.get_col_mapping_by_name(&idx_col.name));
// find which cursor we opened earlier for this index
let idx_cursor_id = idx_cursors
.iter()
.find(|(name, _, _)| *name == &index_col_mapping.idx_name)
.find(|(name, _, _)| *name == &index.name)
.map(|(_, _, c_id)| *c_id)
.expect("no cursor found for index");
let num_cols = index_col_mapping.columns.len();
let num_cols = index.columns.len();
// allocate scratch registers for the index columns plus rowid
let idx_start_reg = program.alloc_registers(num_cols + 1);
// copy each index column from the table's column registers into these scratch regs
for (i, col) in index_col_mapping.columns.iter().enumerate() {
for (i, column_mapping) in column_mappings.clone().enumerate() {
// copy from the table's column register over to the index's scratch register
program.emit_insn(Insn::Copy {
src_reg: rowid_and_columns_start_register + col.0,
src_reg: column_mapping.register,
dst_reg: idx_start_reg + i,
extra_amount: 0,
});
}
// last register is the rowid
program.emit_insn(Insn::Copy {
src_reg: rowid_and_columns_start_register,
src_reg: insertion.key_register(),
dst_reg: idx_start_reg + num_cols,
extra_amount: 0,
});
let index = schema
.get_index(table_name.as_str(), &index_col_mapping.idx_name)
.expect("index should be present");
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: idx_start_reg,
count: num_cols + 1,
dest_reg: record_reg,
index_name: Some(index_col_mapping.idx_name),
index_name: Some(index.name.clone()),
});
if index.unique {
@@ -485,25 +454,15 @@ pub fn translate_insert(
record_reg: idx_start_reg,
num_regs: num_cols,
});
let column_names = index_col_mapping.columns.iter().enumerate().fold(
let column_names = index.columns.iter().enumerate().fold(
String::with_capacity(50),
|mut accum, (idx, (index, _))| {
|mut accum, (idx, column)| {
if idx > 0 {
accum.push_str(", ");
}
accum.push_str(&btree_table.name);
accum.push_str(&index.name);
accum.push('.');
let name = column_mappings
.get(*index)
.unwrap()
.column
.name
.as_ref()
.expect("column name is None");
accum.push_str(name);
accum.push_str(&column.name);
accum
},
);
@@ -527,23 +486,23 @@ pub fn translate_insert(
});
}
for (i, col) in column_mappings
for column_mapping in insertion
.col_mappings
.iter()
.enumerate()
.filter(|(_, col)| col.column.notnull)
.filter(|column_mapping| column_mapping.column.notnull)
{
// if this is rowid alias - turso-db will emit NULL as a column value and always use rowid for the row as a column value
if col.column.is_rowid_alias {
if column_mapping.column.is_rowid_alias {
continue;
}
let target_reg = i + rowid_and_columns_start_register;
program.emit_insn(Insn::HaltIfNull {
target_reg,
target_reg: column_mapping.register,
err_code: SQLITE_CONSTRAINT_NOTNULL,
description: format!(
"{}.{}",
table_name,
col.column
column_mapping
.column
.name
.as_ref()
.expect("Column name must be present")
@@ -552,15 +511,15 @@ pub fn translate_insert(
}
// Create and insert the record
program.emit_insn(Insn::MakeRecord {
start_reg: columns_start_register,
count: num_cols,
dest_reg: record_register,
start_reg: insertion.first_col_register(),
count: insertion.col_mappings.len(),
dest_reg: insertion.record_register(),
index_name: None,
});
program.emit_insn(Insn::Insert {
cursor: cursor_id,
key_reg: rowid_and_columns_start_register,
record_reg: record_register,
key_reg: insertion.key_register(),
record_reg: insertion.record_register(),
flag: InsertFlags::new(),
table_name: table_name.to_string(),
});
@@ -572,9 +531,9 @@ pub fn translate_insert(
Some(emit_cdc_patch_record(
&mut program,
&table,
columns_start_register,
record_register,
rowid_and_columns_start_register,
insertion.first_col_register(),
insertion.record_register(),
insertion.key_register(),
))
} else {
None
@@ -584,7 +543,7 @@ pub fn translate_insert(
&resolver,
OperationMode::INSERT,
*cdc_cursor_id,
rowid_and_columns_start_register,
insertion.key_register(),
None,
after_record_reg,
None,
@@ -595,8 +554,8 @@ pub fn translate_insert(
// Emit RETURNING results if specified
if !result_columns.is_empty() {
let value_registers = ReturningValueRegisters {
rowid_register: rowid_and_columns_start_register,
columns_start_register,
rowid_register: insertion.key_register(),
columns_start_register: insertion.first_col_register(),
num_columns: table.columns().len(),
};
@@ -1349,35 +1308,29 @@ fn translate_virtual_table_insert(
_ => crate::bail_parse_error!("Unsupported INSERT body for virtual tables"),
};
let table = Table::Virtual(virtual_table.clone());
let column_mappings = resolve_columns_for_insert(&table, &columns, num_values)?;
let registers_start = program.alloc_registers(column_mappings.len() + 1);
/* *
* Inserts for virtual tables are done in a single step.
* argv[0] = (NULL for insert)
* argv[1] = (rowid for insert - NULL in most cases)
* argv[2..] = column values
* */
*/
let registers_start = program.alloc_register();
program.emit_insn(Insn::Null {
dest: registers_start,
dest_end: None,
});
/* *
* argv[1] = (rowid for insert - NULL in most cases)
* argv[2..] = column values
* */
let insertion = build_insertion(&mut program, &table, &columns, num_values)?;
populate_column_registers(
&mut program,
&value,
&column_mappings,
registers_start + 1,
resolver,
)?;
translate_rows_single(&mut program, &value, &insertion, resolver)?;
let conflict_action = on_conflict.as_ref().map(|c| c.bit_value()).unwrap_or(0) as u16;
let cursor_id = program.alloc_cursor_id(CursorType::VirtualTable(virtual_table.clone()));
program.emit_insn(Insn::VUpdate {
cursor_id,
arg_count: column_mappings.len() + 1,
arg_count: insertion.col_mappings.len() + 2, // +1 for NULL, +1 for rowid
start_reg: registers_start,
conflict_action,
});