Merge 'Use more structured approach in translate_insert' from Jussi Saurio

Closes #2686
**Note**: this PR also incorporates #2700, which I cannot merge
separately because it reveals the bug described in #2686, which nothing
on `main` currently detects.
_(Also as background, the way this all started was: I was trying to
enable `UNIQUE` and `PRIMARY KEY` usage in `simulator` in #2641 , but
couldn't because it would constantly fail due to #2686.)_
---
I'll admit this PR went in a different direction than I had envisioned.
I was trying to debug and minimally fix #2686, but was finding it
increasingly hard to understand the flow of `translate_insert` and fix
this specific issue without breaking something else. So, I thought it
might be benefit from restructuring.
---
## Functional changes
- Fixes #2686.
    * If an index contained a `rowid` alias column, we were inserting
`NULL` into the index instead of the actual integer value.
    * The root cause of this is that SQLite does insert `NULL` in place
of the rowid alias column into the table, presumably to save space.
    * This is not a problem for tables as `SELECT`ing the rowid alias
column will always be mapped into `Insn::Rowid`, but it is a major
problem for indexes as index lookups will never find anything.
## Code structure changes
The responsibility of holding the information about what to insert is
now contained in these new data structures:
```rust
/// Represents how a table should be populated during an INSERT.
#[derive(Debug)]
struct Insertion<'a> {
    /// The integer key ("rowid") provided to the VDBE.
    key: InsertionKey<'a>,
    /// The column values that will be fed to the MakeRecord instruction to insert the row.
    /// If the table has a rowid alias column, it will also be included in this record,
    /// but a NULL will be stored for it.
    col_mappings: Vec<ColumnMapping<'a>>,
    /// The register that will contain the record built using the MakeRecord instruction.
    record_reg: usize,
}

#[derive(Debug)]
enum InsertionKey<'a> {
    /// Rowid is not provided by user and will be autogenerated.
    Autogenerated { register: usize },
    /// Rowid is provided via the 'rowid' keyword.
    LiteralRowid {
        value_index: Option<usize>,
        register: usize,
    },
    /// Rowid is provided via a rowid alias column.
    RowidAlias(ColumnMapping<'a>),
}

/// Represents how a column in a table should be populated during an INSERT.
/// In a vector of InsertionMapping, the index of a given InsertionMapping is
/// the position of the column in the table.
#[derive(Debug)]
struct ColumnMapping<'a> {
    /// Column definition
    column: &'a Column,
    /// Index of the value to use from a tuple in the insert statement.
    /// This is needed because the values in the insert statement are not necessarily
    /// in the same order as the columns in the table, nor do they necessarily contain
    /// all of the columns in the table.
    /// If None, a NULL will be emitted for the column, unless it has a default value.
    /// A NULL rowid alias column's value will be autogenerated.
    value_index: Option<usize>,
    /// Register where the value will be stored for insertion into the table.
    register: usize,
}
```
---
This gets rid of a few things that are a bit hard to follow in the
current implementation:
1. Needing to keep track of "the last rowid explicit value" and other
weird edge case code related to rowids
```rust
// <old code>

// In case when both rowid and rowid-alias column provided in the query - turso-db overwrite rowid with **latest** value from the list
// As we iterate by column in natural order of their definition in scheme,
// we need to track last value_index we wrote to the rowid and overwrite rowid register only if new value_index is greater
let mut last_rowid_explicit_value = None;

...
// <more old code>

// When inserting a single row, SQLite writes the value provided for the rowid alias column (INTEGER PRIMARY KEY)
// directly into the rowid register and writes a NULL into the rowid alias column.
let write_directly_to_rowid_reg = mapping.column.is_rowid_alias;
let write_reg = if write_directly_to_rowid_reg {
    if last_rowid_explicit_value.is_some_and(|x| x > value_index) {
        continue;
    }
    last_rowid_explicit_value = Some(value_index);
    column_registers_start // rowid always the first register in the array for insertion record
} else {
    column_register
};
```
Instead when the `Insertion` struct is constructed, it will simply
overwrite `InsertionKey` if a rowid reference is encountered multiple
times, so we naturally use the "last seen" rowid value. Moreover,
`InsertionKey` is always translated first, so we need no special logic
for it:
```rust
// <new code>
translate_key(program, insertion, &mut translate_value_fn, resolver)?;
for col in insertion.col_mappings.iter() {
    translate_column(
        program,
        col.column,
        col.register,
        col.value_index,
        &mut translate_value_fn,
        resolver,
    )?;
}
```
---
2. Needing to keep track of registers in the main execution flow:
```rust
// <old code>

// allocate a register for each column in the table. if not provided by user, they will simply be set as null.
// allocate an extra register for rowid regardless of whether user provided a rowid alias column.
let num_cols = btree_table.columns.len();
let rowid_and_columns_start_register = program.alloc_registers(num_cols + 1);
let columns_start_register = rowid_and_columns_start_register + 1;
```
Now the main execution flow just uses these methods on `Insertion`:
```rust
// Create and insert the record
program.emit_insn(Insn::MakeRecord {
    start_reg: insertion.first_col_register(),
    count: insertion.col_mappings.len(),
    dest_reg: insertion.record_register(),
    index_name: None,
});
program.emit_insn(Insn::Insert {
    cursor: cursor_id,
    key_reg: insertion.key_register(),
    record_reg: insertion.record_register(),
    flag: InsertFlags::new(),
    table_name: table_name.to_string(),
});
```
---
The translation of a row now uses the information in `Insertion` and the
implementation is shared between the "insert single row" and "insert
multiple rows" cases:
```rust
/// Translate the key and the columns of the insertion.
/// This function is called by both [translate_rows_single] and [translate_rows_multiple],
/// each providing a different [translate_value_fn] implementation, because for multiple rows
/// we need to emit the values in a loop, from either an ephemeral table or a coroutine,
/// whereas for the single row the translation happens in a single pass without looping.
fn translate_rows_base<'short, 'long: 'short>(
    program: &mut ProgramBuilder,
    insertion: &'short Insertion<'long>,
    mut translate_value_fn: impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
    resolver: &Resolver,
) -> Result<()> {
    translate_key(program, insertion, &mut translate_value_fn, resolver)?;
    for col in insertion.col_mappings.iter() {
        translate_column(
            program,
            col.column,
            col.register,
            col.value_index,
            &mut translate_value_fn,
            resolver,
        )?;
    }

    Ok(())
}
```
Which gets rid of the duplication in `populate_columns_single_row` and
`populate_columns_multiple_rows` in the old implementation.

Reviewed-by: Nikita Sivukhin (@sivukhin)

Closes #2687
This commit is contained in:
Jussi Saurio
2025-08-21 16:50:03 +03:00
committed by GitHub
13 changed files with 446 additions and 380 deletions

View File

@@ -603,6 +603,24 @@ impl Table {
}
}
/// Returns the column position and column for a given column name.
pub fn get_column_by_name(&self, name: &str) -> Option<(usize, &Column)> {
let name = normalize_ident(name);
match self {
Self::BTree(table) => table.get_column(name.as_str()),
Self::Virtual(table) => table
.columns
.iter()
.enumerate()
.find(|(_, col)| col.name.as_ref() == Some(&name)),
Self::FromClauseSubquery(from_clause_subquery) => from_clause_subquery
.columns
.iter()
.enumerate()
.find(|(_, col)| col.name.as_ref() == Some(&name)),
}
}
pub fn columns(&self) -> &Vec<Column> {
match self {
Self::BTree(table) => &table.columns,

View File

@@ -125,7 +125,7 @@ pub fn translate_alter_table(
continue;
}
program.emit_column(cursor_id, i, iter);
program.emit_column_or_rowid(cursor_id, i, iter);
iter += 1;
}
@@ -264,7 +264,7 @@ pub fn translate_alter_table(
let first_column = program.alloc_registers(sqlite_schema_column_len);
for i in 0..sqlite_schema_column_len {
program.emit_column(cursor_id, i, first_column + i);
program.emit_column_or_rowid(cursor_id, i, first_column + i);
}
program.emit_string8_new_reg(table_name.to_string());
@@ -354,7 +354,7 @@ pub fn translate_alter_table(
let first_column = program.alloc_registers(sqlite_schema_column_len);
for i in 0..sqlite_schema_column_len {
program.emit_column(cursor_id, i, first_column + i);
program.emit_column_or_rowid(cursor_id, i, first_column + i);
}
program.emit_string8_new_reg(table_name.to_string());

View File

@@ -551,7 +551,7 @@ fn emit_delete_insns(
.iter()
.enumerate()
.for_each(|(reg_offset, column_index)| {
program.emit_column(
program.emit_column_or_rowid(
main_table_cursor_id,
column_index.pos_in_table,
start_reg + reg_offset,
@@ -615,7 +615,7 @@ fn emit_delete_insns(
// Read all column values from the row to be deleted
for (i, _column) in table_reference.columns().iter().enumerate() {
program.emit_column(main_table_cursor_id, i, columns_start_reg + i);
program.emit_column_or_rowid(main_table_cursor_id, i, columns_start_reg + i);
}
// Emit RETURNING results using the values we just read
@@ -967,7 +967,11 @@ fn emit_update_insns(
}
})
.unwrap_or(&cursor_id);
program.emit_column(cursor_id, column_idx_in_index.unwrap_or(idx), target_reg);
program.emit_column_or_rowid(
cursor_id,
column_idx_in_index.unwrap_or(idx),
target_reg,
);
}
if let Some(cdc_updates_register) = cdc_updates_register {
@@ -1155,7 +1159,7 @@ fn emit_update_insns(
.iter()
.enumerate()
.for_each(|(reg_offset, column_index)| {
program.emit_column(
program.emit_column_or_rowid(
cursor_id,
column_index.pos_in_table,
start_reg + reg_offset,
@@ -1415,7 +1419,7 @@ pub fn emit_cdc_full_record(
extra_amount: 0,
});
} else {
program.emit_column(table_cursor_id, i, columns_reg + 1 + i);
program.emit_column_or_rowid(table_cursor_id, i, columns_reg + 1 + i);
}
}
program.emit_insn(Insn::MakeRecord {

View File

@@ -2012,7 +2012,7 @@ pub fn translate_expr(
*column
};
program.emit_column(read_cursor, column, target_register);
program.emit_column_or_rowid(read_cursor, column, target_register);
}
let Some(column) = table.get_column_at(*column) else {
crate::bail_parse_error!("column index out of bounds");

View File

@@ -479,7 +479,7 @@ impl<'a> GroupByAggArgumentSource<'a> {
dest_reg_start,
..
} => {
program.emit_column(*cursor_id, *col_start, dest_reg_start + arg_idx);
program.emit_column_or_rowid(*cursor_id, *col_start, dest_reg_start + arg_idx);
Ok(dest_reg_start + arg_idx)
}
GroupByAggArgumentSource::Register {
@@ -525,7 +525,7 @@ pub fn group_by_process_single_group(
for i in 0..group_by.exprs.len() {
let sorter_column_index = i;
let group_reg = groups_start_reg + i;
program.emit_column(*pseudo_cursor, sorter_column_index, group_reg);
program.emit_column_or_rowid(*pseudo_cursor, sorter_column_index, group_reg);
}
groups_start_reg
}
@@ -648,7 +648,7 @@ pub fn group_by_process_single_group(
t_ctx.non_aggregate_expressions.iter().enumerate()
{
if *in_result {
program.emit_column(*pseudo_cursor, sorter_column_index, next_reg);
program.emit_column_or_rowid(*pseudo_cursor, sorter_column_index, next_reg);
t_ctx.resolver.expr_to_reg_cache.push((expr, next_reg));
next_reg += 1;
}

View File

@@ -157,7 +157,7 @@ pub fn translate_create_index(
// Then insert the record into the sorter
let start_reg = program.alloc_registers(columns.len() + 1);
for (i, (col, _)) in columns.iter().enumerate() {
program.emit_column(table_cursor_id, col.0, start_reg + i);
program.emit_column_or_rowid(table_cursor_id, col.0, start_reg + i);
}
let rowid_reg = start_reg + columns.len();
program.emit_insn(Insn::RowId {
@@ -400,7 +400,7 @@ pub fn translate_drop_index(
// Read sqlite_schema.name into dest_reg
let dest_reg = program.alloc_register();
program.emit_column(sqlite_schema_cursor_id, 1, dest_reg);
program.emit_column_or_rowid(sqlite_schema_cursor_id, 1, dest_reg);
// if current column is not index_name then jump to Next
// skip if sqlite_schema.name != index_name_reg
@@ -415,7 +415,7 @@ pub fn translate_drop_index(
// read type of table
// skip if sqlite_schema.type != 'index' (index_str_reg)
program.emit_column(sqlite_schema_cursor_id, 0, dest_reg);
program.emit_column_or_rowid(sqlite_schema_cursor_id, 0, dest_reg);
// if current column is not index then jump to Next
program.emit_insn(Insn::Ne {
lhs: index_str_reg,

View File

@@ -6,7 +6,7 @@ use turso_sqlite3_parser::ast::{
};
use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY};
use crate::schema::{self, IndexColumn, Table};
use crate::schema::{self, Table};
use crate::translate::emitter::{
emit_cdc_insns, emit_cdc_patch_record, prepare_cdc_if_necessary, OperationMode,
};
@@ -319,32 +319,12 @@ pub fn translate_insert(
})
.collect::<Vec<(&String, usize, usize)>>();
let column_mappings = resolve_columns_for_insert(&table, &columns, num_values)?;
let has_user_provided_rowid = column_mappings
.iter()
.any(|x| x.value_index.is_some() && x.column.is_rowid_alias);
// allocate a register for each column in the table. if not provided by user, they will simply be set as null.
// allocate an extra register for rowid regardless of whether user provided a rowid alias column.
let num_cols = btree_table.columns.len();
let rowid_and_columns_start_register = program.alloc_registers(num_cols + 1);
let columns_start_register = rowid_and_columns_start_register + 1;
let record_register = program.alloc_register();
let insertion = build_insertion(&mut program, &table, &columns, num_values)?;
if inserting_multiple_rows {
if let Some(ref temp_table_ctx) = temp_table_ctx {
// Rewind loop to read from ephemeral table
program.emit_insn(Insn::Rewind {
cursor_id: temp_table_ctx.cursor_id,
pc_if_empty: temp_table_ctx.loop_end_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_start_label);
}
populate_columns_multiple_rows(
translate_rows_multiple(
&mut program,
&column_mappings,
rowid_and_columns_start_register,
&insertion,
yield_reg_opt.unwrap() + 1,
&resolver,
&temp_table_ctx,
@@ -357,13 +337,7 @@ pub fn translate_insert(
db: 0,
});
populate_column_registers(
&mut program,
&values.unwrap(),
&column_mappings,
rowid_and_columns_start_register,
&resolver,
)?;
translate_rows_single(&mut program, &values.unwrap(), &insertion, &resolver)?;
}
// Open all the index btrees for writing
@@ -375,6 +349,7 @@ pub fn translate_insert(
});
}
// Common record insertion logic for both single and multiple rows
let has_user_provided_rowid = insertion.key.is_provided_by_user();
let check_rowid_is_integer_label = if has_user_provided_rowid {
Some(program.allocate_label())
} else {
@@ -382,7 +357,7 @@ pub fn translate_insert(
};
if has_user_provided_rowid {
program.emit_insn(Insn::NotNull {
reg: rowid_and_columns_start_register,
reg: insertion.key_register(),
target_pc: check_rowid_is_integer_label.unwrap(),
});
}
@@ -390,7 +365,7 @@ pub fn translate_insert(
// Create new rowid if a) not provided by user or b) provided by user but is NULL
program.emit_insn(Insn::NewRowid {
cursor: cursor_id,
rowid_reg: rowid_and_columns_start_register,
rowid_reg: insertion.key_register(),
prev_largest_reg: 0,
});
@@ -398,7 +373,7 @@ pub fn translate_insert(
program.resolve_label(must_be_int_label, program.offset());
// If the user provided a rowid, it must be an integer.
program.emit_insn(Insn::MustBeInt {
reg: rowid_and_columns_start_register,
reg: insertion.key_register(),
});
}
@@ -408,14 +383,10 @@ pub fn translate_insert(
let make_record_label = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: cursor_id,
rowid_reg: rowid_and_columns_start_register,
rowid_reg: insertion.key_register(),
target_pc: make_record_label,
});
let rowid_column = column_mappings
.iter()
.find(|x| x.value_index.is_some() && x.column.is_rowid_alias)
.expect("rowid alias column must be provided");
let rowid_column_name = rowid_column.column.name.as_deref().unwrap_or(ROWID);
let rowid_column_name = insertion.key.column_name();
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: format!("{}.{}", table_name.as_str(), rowid_column_name),
@@ -426,8 +397,8 @@ pub fn translate_insert(
match table.btree() {
Some(t) if t.is_strict => {
program.emit_insn(Insn::TypeCheck {
start_reg: columns_start_register,
count: num_cols,
start_reg: insertion.first_col_register(),
count: insertion.col_mappings.len(),
check_generated: true,
table_reference: Arc::clone(&t),
});
@@ -435,46 +406,44 @@ pub fn translate_insert(
_ => (),
}
let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?;
for index_col_mapping in index_col_mappings {
for index in schema.get_indices(table_name.as_str()) {
let column_mappings = index
.columns
.iter()
.map(|idx_col| insertion.get_col_mapping_by_name(&idx_col.name));
// find which cursor we opened earlier for this index
let idx_cursor_id = idx_cursors
.iter()
.find(|(name, _, _)| *name == &index_col_mapping.idx_name)
.find(|(name, _, _)| *name == &index.name)
.map(|(_, _, c_id)| *c_id)
.expect("no cursor found for index");
let num_cols = index_col_mapping.columns.len();
let num_cols = index.columns.len();
// allocate scratch registers for the index columns plus rowid
let idx_start_reg = program.alloc_registers(num_cols + 1);
// copy each index column from the table's column registers into these scratch regs
for (i, col) in index_col_mapping.columns.iter().enumerate() {
for (i, column_mapping) in column_mappings.clone().enumerate() {
// copy from the table's column register over to the index's scratch register
program.emit_insn(Insn::Copy {
src_reg: rowid_and_columns_start_register + col.0,
src_reg: column_mapping.register,
dst_reg: idx_start_reg + i,
extra_amount: 0,
});
}
// last register is the rowid
program.emit_insn(Insn::Copy {
src_reg: rowid_and_columns_start_register,
src_reg: insertion.key_register(),
dst_reg: idx_start_reg + num_cols,
extra_amount: 0,
});
let index = schema
.get_index(table_name.as_str(), &index_col_mapping.idx_name)
.expect("index should be present");
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: idx_start_reg,
count: num_cols + 1,
dest_reg: record_reg,
index_name: Some(index_col_mapping.idx_name),
index_name: Some(index.name.clone()),
});
if index.unique {
@@ -485,25 +454,15 @@ pub fn translate_insert(
record_reg: idx_start_reg,
num_regs: num_cols,
});
let column_names = index_col_mapping.columns.iter().enumerate().fold(
let column_names = index.columns.iter().enumerate().fold(
String::with_capacity(50),
|mut accum, (idx, (index, _))| {
|mut accum, (idx, column)| {
if idx > 0 {
accum.push_str(", ");
}
accum.push_str(&btree_table.name);
accum.push_str(&index.name);
accum.push('.');
let name = column_mappings
.get(*index)
.unwrap()
.column
.name
.as_ref()
.expect("column name is None");
accum.push_str(name);
accum.push_str(&column.name);
accum
},
);
@@ -527,23 +486,23 @@ pub fn translate_insert(
});
}
for (i, col) in column_mappings
for column_mapping in insertion
.col_mappings
.iter()
.enumerate()
.filter(|(_, col)| col.column.notnull)
.filter(|column_mapping| column_mapping.column.notnull)
{
// if this is rowid alias - turso-db will emit NULL as a column value and always use rowid for the row as a column value
if col.column.is_rowid_alias {
if column_mapping.column.is_rowid_alias {
continue;
}
let target_reg = i + rowid_and_columns_start_register;
program.emit_insn(Insn::HaltIfNull {
target_reg,
target_reg: column_mapping.register,
err_code: SQLITE_CONSTRAINT_NOTNULL,
description: format!(
"{}.{}",
table_name,
col.column
column_mapping
.column
.name
.as_ref()
.expect("Column name must be present")
@@ -552,15 +511,15 @@ pub fn translate_insert(
}
// Create and insert the record
program.emit_insn(Insn::MakeRecord {
start_reg: columns_start_register,
count: num_cols,
dest_reg: record_register,
start_reg: insertion.first_col_register(),
count: insertion.col_mappings.len(),
dest_reg: insertion.record_register(),
index_name: None,
});
program.emit_insn(Insn::Insert {
cursor: cursor_id,
key_reg: rowid_and_columns_start_register,
record_reg: record_register,
key_reg: insertion.key_register(),
record_reg: insertion.record_register(),
flag: InsertFlags::new(),
table_name: table_name.to_string(),
});
@@ -572,9 +531,9 @@ pub fn translate_insert(
Some(emit_cdc_patch_record(
&mut program,
&table,
columns_start_register,
record_register,
rowid_and_columns_start_register,
insertion.first_col_register(),
insertion.record_register(),
insertion.key_register(),
))
} else {
None
@@ -584,7 +543,7 @@ pub fn translate_insert(
&resolver,
OperationMode::INSERT,
*cdc_cursor_id,
rowid_and_columns_start_register,
insertion.key_register(),
None,
after_record_reg,
None,
@@ -595,8 +554,8 @@ pub fn translate_insert(
// Emit RETURNING results if specified
if !result_columns.is_empty() {
let value_registers = ReturningValueRegisters {
rowid_register: rowid_and_columns_start_register,
columns_start_register,
rowid_register: insertion.key_register(),
columns_start_register: insertion.first_col_register(),
num_columns: table.columns().len(),
};
@@ -627,19 +586,6 @@ pub fn translate_insert(
Ok(program)
}
#[derive(Debug)]
/// Represents how a column should be populated during an INSERT.
/// Contains both the column definition and optionally the index into the VALUES tuple.
struct ColumnMapping<'a> {
/// Reference to the column definition from the table schema
column: &'a Column,
/// If Some(i), use the i-th value from the VALUES tuple
/// If None, use NULL (column was not specified in INSERT statement)
value_index: Option<usize>,
/// The default value for the column, if defined
default_value: Option<&'a Expr>,
}
pub const ROWID_COLUMN: &Column = &Column {
name: None,
ty: schema::Type::Integer,
@@ -653,298 +599,375 @@ pub const ROWID_COLUMN: &Column = &Column {
hidden: false,
};
/// Represents how a table should be populated during an INSERT.
#[derive(Debug)]
struct Insertion<'a> {
/// The integer key ("rowid") provided to the VDBE.
key: InsertionKey<'a>,
/// The column values that will be fed to the MakeRecord instruction to insert the row.
/// If the table has a rowid alias column, it will also be included in this record,
/// but a NULL will be stored for it.
col_mappings: Vec<ColMapping<'a>>,
/// The register that will contain the record built using the MakeRecord instruction.
record_reg: usize,
}
impl<'a> Insertion<'a> {
/// Return the register that contains the rowid.
pub fn key_register(&self) -> usize {
self.key.register()
}
/// Return the first register of the values that used to build the record
/// for the main table insert.
pub fn first_col_register(&self) -> usize {
self.col_mappings
.first()
.expect("columns must be present")
.register
}
/// Return the register that contains the record built using the MakeRecord instruction.
pub fn record_register(&self) -> usize {
self.record_reg
}
/// Returns the column mapping for a given column name.
pub fn get_col_mapping_by_name(&self, name: &str) -> &ColMapping<'a> {
if let InsertionKey::RowidAlias(mapping) = &self.key {
// If the key is a rowid alias, a NULL is emitted as the column value,
// so we need to return the key mapping instead so that the non-NULL rowid is used
// for the index insert.
if mapping
.column
.name
.as_ref()
.is_some_and(|n| n.eq_ignore_ascii_case(name))
{
return mapping;
}
}
self.col_mappings
.iter()
.find(|col| {
col.column
.name
.as_ref()
.is_some_and(|n| n.eq_ignore_ascii_case(name))
})
.unwrap_or_else(|| panic!("column {name} not found in insertion"))
}
}
#[derive(Debug)]
enum InsertionKey<'a> {
/// Rowid is not provided by user and will be autogenerated.
Autogenerated { register: usize },
/// Rowid is provided via the 'rowid' keyword.
LiteralRowid {
value_index: Option<usize>,
register: usize,
},
/// Rowid is provided via a rowid alias column.
RowidAlias(ColMapping<'a>),
}
impl InsertionKey<'_> {
fn register(&self) -> usize {
match self {
InsertionKey::Autogenerated { register } => *register,
InsertionKey::LiteralRowid { register, .. } => *register,
InsertionKey::RowidAlias(x) => x.register,
}
}
fn is_provided_by_user(&self) -> bool {
!matches!(self, InsertionKey::Autogenerated { .. })
}
fn column_name(&self) -> &str {
match self {
InsertionKey::RowidAlias(x) => x
.column
.name
.as_ref()
.expect("rowid alias column must be present")
.as_str(),
InsertionKey::LiteralRowid { .. } => ROWID,
InsertionKey::Autogenerated { .. } => ROWID,
}
}
}
/// Represents how a column in a table should be populated during an INSERT.
/// In a vector of [ColMapping], the index of a given [ColMapping] is
/// the position of the column in the table.
#[derive(Debug)]
struct ColMapping<'a> {
/// Column definition
column: &'a Column,
/// Index of the value to use from a tuple in the insert statement.
/// This is needed because the values in the insert statement are not necessarily
/// in the same order as the columns in the table, nor do they necessarily contain
/// all of the columns in the table.
/// If None, a NULL will be emitted for the column, unless it has a default value.
/// A NULL rowid alias column's value will be autogenerated.
value_index: Option<usize>,
/// Register where the value will be stored for insertion into the table.
register: usize,
}
/// Resolves how each column in a table should be populated during an INSERT.
/// Returns a Vec of ColumnMapping, one for each column in the table's schema.
///
/// For each column, specifies:
/// 1. The column definition (type, constraints, etc)
/// 2. Where to get the value from:
/// - Some(i) -> use i-th value from the VALUES tuple
/// - None -> use NULL (column wasn't specified in INSERT)
///
/// Two cases are handled:
/// 1. No column list specified (INSERT INTO t VALUES ...):
/// - Values are assigned to columns in table definition order
/// - If fewer values than columns, remaining columns map to None
/// 2. Column list specified (INSERT INTO t (col1, col3) VALUES ...):
/// - Named columns map to their corresponding value index
/// - Unspecified columns map to None
fn resolve_columns_for_insert<'a>(
/// Returns an [Insertion] struct that contains the key and record for the insertion.
fn build_insertion<'a>(
program: &mut ProgramBuilder,
table: &'a Table,
columns: &Option<DistinctNames>,
num_values: usize,
) -> Result<Vec<ColumnMapping<'a>>> {
) -> Result<Insertion<'a>> {
let table_columns = table.columns();
// +1 for rowid implicit ROWID column
let mut column_mappings = Vec::with_capacity(table_columns.len() + 1);
column_mappings.push(ColumnMapping {
column: ROWID_COLUMN,
value_index: None,
default_value: None,
});
for column in table_columns {
column_mappings.push(ColumnMapping {
column,
let rowid_register = program.alloc_register();
let mut insertion_key = InsertionKey::Autogenerated {
register: rowid_register,
};
let mut column_mappings = table
.columns()
.iter()
.map(|c| ColMapping {
column: c,
value_index: None,
default_value: column.default.as_ref(),
});
}
register: program.alloc_register(),
})
.collect::<Vec<_>>();
if columns.is_none() {
// Case 1: No columns specified - map values to columns in order
let mut value_idx = 0;
for (i, col) in table_columns.iter().enumerate() {
column_mappings[i + 1].value_index = if col.hidden { None } else { Some(value_idx) };
if !col.hidden {
value_idx += 1;
}
}
if num_values != value_idx {
if num_values != table_columns.iter().filter(|c| !c.hidden).count() {
crate::bail_parse_error!(
"table {} has {} columns but {} values were supplied",
&table.get_name(),
value_idx,
table_columns.len(),
num_values
);
}
let mut value_idx = 0;
for (i, col) in table_columns.iter().enumerate() {
if col.hidden {
// Hidden columns are not taken into account.
continue;
}
if col.is_rowid_alias {
insertion_key = InsertionKey::RowidAlias(ColMapping {
column: col,
value_index: Some(value_idx),
register: rowid_register,
});
} else {
column_mappings[i].value_index = Some(value_idx);
}
value_idx += 1;
}
} else {
// Case 2: Columns specified - map named columns to their values
// Map each named column to its value index
for (value_index, column_name) in columns.as_ref().unwrap().iter().enumerate() {
let column_name = normalize_ident(column_name.as_str());
let table_index = if column_name == ROWID {
Some(0)
if let Some((idx_in_table, col_in_table)) = table.get_column_by_name(&column_name) {
// Named column
if col_in_table.is_rowid_alias {
insertion_key = InsertionKey::RowidAlias(ColMapping {
column: col_in_table,
value_index: Some(value_index),
register: rowid_register,
});
} else {
column_mappings[idx_in_table].value_index = Some(value_index);
}
} else if column_name == ROWID {
// Explicit use of the 'rowid' keyword
if let Some(col_in_table) = table.columns().iter().find(|c| c.is_rowid_alias) {
insertion_key = InsertionKey::RowidAlias(ColMapping {
column: col_in_table,
value_index: Some(value_index),
register: rowid_register,
});
} else {
insertion_key = InsertionKey::LiteralRowid {
value_index: Some(value_index),
register: rowid_register,
};
}
} else {
column_mappings.iter().position(|c| {
c.column
.name
.as_ref()
.is_some_and(|name| name.eq_ignore_ascii_case(&column_name))
})
};
let Some(table_index) = table_index else {
crate::bail_parse_error!(
"table {} has no column named {}",
&table.get_name(),
column_name
);
};
column_mappings[table_index].value_index = Some(value_index);
}
}
Ok(column_mappings)
}
/// Represents how a column in an index should be populated during an INSERT.
/// Similar to ColumnMapping above but includes the index name, as well as multiple
/// possible value indices for each.
#[derive(Debug, Default)]
struct IndexColMapping {
idx_name: String,
columns: Vec<(usize, IndexColumn)>,
value_indicies: Vec<Option<usize>>,
}
impl IndexColMapping {
fn new(name: String) -> Self {
IndexColMapping {
idx_name: name,
..Default::default()
}
}
}
/// Example:
/// Table 'test': (a, b, c);
/// Index 'idx': test(a, b);
///________________________________
/// Insert (a, c): (2, 3)
/// Record: (2, NULL, 3)
/// IndexColMapping: (a, b) = (2, NULL)
fn resolve_indicies_for_insert(
schema: &Schema,
table: &Table,
columns: &[ColumnMapping<'_>],
) -> Result<Vec<IndexColMapping>> {
let mut index_col_mappings = Vec::new();
// Iterate over all indices for this table
for index in schema.get_indices(table.get_name()) {
let mut idx_map = IndexColMapping::new(index.name.clone());
// For each column in the index (in the order defined by the index),
// try to find the corresponding column in the inserts column mapping.
for idx_col in &index.columns {
let target_name = normalize_ident(idx_col.name.as_str());
if let Some((i, col_mapping)) = columns.iter().enumerate().find(|(_, mapping)| {
mapping
.column
.name
.as_ref()
.is_some_and(|name| name.eq_ignore_ascii_case(&target_name))
}) {
idx_map.columns.push((i, idx_col.clone()));
idx_map.value_indicies.push(col_mapping.value_index);
} else {
return Err(crate::LimboError::ParseError(format!(
"Column {} not found in index {}",
target_name, index.name
)));
}
}
// Add the mapping if at least one column was found.
if !idx_map.columns.is_empty() {
index_col_mappings.push(idx_map);
}
}
Ok(index_col_mappings)
Ok(Insertion {
key: insertion_key,
col_mappings: column_mappings,
record_reg: program.alloc_register(),
})
}
fn populate_columns_multiple_rows(
/// Populates the column registers with values for multiple rows.
/// This is used for INSERT INTO <table> VALUES (...), (...), ... or INSERT INTO <table> SELECT ...
/// which use either a coroutine or an ephemeral table as the value source.
fn translate_rows_multiple<'short, 'long: 'short>(
program: &mut ProgramBuilder,
column_mappings: &[ColumnMapping], // columns in order of table definition
column_registers_start: usize,
insertion: &'short Insertion<'long>,
yield_reg: usize,
resolver: &Resolver,
temp_table_ctx: &Option<TempTableCtx>,
) -> Result<()> {
// In case when both rowid and rowid-alias column provided in the query - turso-db overwrite rowid with **latest** value from the list
// As we iterate by column in natural order of their definition in scheme,
// we need to track last value_index we wrote to the rowid and overwrite rowid register only if new value_index is greater
let mut last_rowid_explicit_value = None;
for (i, mapping) in column_mappings.iter().enumerate() {
let column_register = column_registers_start + i;
if let Some(value_index) = mapping.value_index {
let write_directly_to_rowid_reg = mapping.column.is_rowid_alias;
let write_reg = if write_directly_to_rowid_reg {
if last_rowid_explicit_value.is_some_and(|x| x > value_index) {
continue;
}
last_rowid_explicit_value = Some(value_index);
column_registers_start // rowid always the first register in the array for insertion record
} else {
column_register
};
if let Some(ref temp_table_ctx) = temp_table_ctx {
// Rewind loop to read from ephemeral table
program.emit_insn(Insn::Rewind {
cursor_id: temp_table_ctx.cursor_id,
pc_if_empty: temp_table_ctx.loop_end_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_start_label);
}
let translate_value_fn =
|prg: &mut ProgramBuilder, value_index: usize, column_register: usize| {
if let Some(temp_table_ctx) = temp_table_ctx {
program.emit_column(temp_table_ctx.cursor_id, value_index, write_reg);
prg.emit_column_or_rowid(temp_table_ctx.cursor_id, value_index, column_register);
} else {
program.emit_insn(Insn::Copy {
prg.emit_insn(Insn::Copy {
src_reg: yield_reg + value_index,
dst_reg: write_reg,
dst_reg: column_register,
extra_amount: 0,
});
}
// write_reg can be euqal to column_register if column list explicitly mention "rowid"
if write_reg != column_register {
program.emit_null(column_register, None);
program.mark_last_insn_constant();
}
} else if mapping.column.is_rowid_alias {
program.emit_insn(Insn::SoftNull {
reg: column_register,
});
} else if let Some(default_expr) = mapping.default_value {
translate_expr(program, None, default_expr, column_register, resolver)?;
} else {
// Column was not specified as has no DEFAULT - use NULL if it is nullable, otherwise error
// Rowid alias columns can be NULL because we will autogenerate a rowid in that case.
let is_nullable = !mapping.column.primary_key || mapping.column.is_rowid_alias;
if is_nullable {
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
} else {
crate::bail_parse_error!(
"column {} is not nullable",
mapping.column.name.as_ref().expect("column name is None")
);
}
}
}
Ok(())
Ok(())
};
translate_rows_base(program, insertion, translate_value_fn, resolver)
}
/// Populates the column registers with values for a single row
#[allow(clippy::too_many_arguments)]
fn populate_column_registers(
fn translate_rows_single(
program: &mut ProgramBuilder,
value: &[Expr],
column_mappings: &[ColumnMapping],
column_registers_start: usize,
insertion: &Insertion,
resolver: &Resolver,
) -> Result<()> {
// In case when both rowid and rowid-alias column provided in the query - turso-db overwrite rowid with **latest** value from the list
// As we iterate by column in natural order of their definition in scheme,
// we need to track last value_index we wrote to the rowid and overwrite rowid register only if new value_index is greater
let mut last_rowid_explicit_value = None;
for (i, mapping) in column_mappings.iter().enumerate() {
let column_register = column_registers_start + i;
// Column has a value in the VALUES tuple
if let Some(value_index) = mapping.value_index {
// When inserting a single row, SQLite writes the value provided for the rowid alias column (INTEGER PRIMARY KEY)
// directly into the rowid register and writes a NULL into the rowid alias column.
let write_directly_to_rowid_reg = mapping.column.is_rowid_alias;
let write_reg = if write_directly_to_rowid_reg {
if last_rowid_explicit_value.is_some_and(|x| x > value_index) {
continue;
}
last_rowid_explicit_value = Some(value_index);
column_registers_start // rowid always the first register in the array for insertion record
} else {
column_register
};
let translate_value_fn =
|prg: &mut ProgramBuilder, value_index: usize, column_register: usize| -> Result<()> {
translate_expr_no_constant_opt(
program,
prg,
None,
value.get(value_index).expect("value index out of bounds"),
write_reg,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
// write_reg can be euqal to column_register if column list explicitly mention "rowid"
if write_reg != column_register {
program.emit_null(column_register, None);
program.mark_last_insn_constant();
}
} else if mapping.column.hidden {
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
program.mark_last_insn_constant();
} else if let Some(default_expr) = mapping.default_value {
translate_expr_no_constant_opt(
program,
None,
default_expr,
value.get(value_index).unwrap_or_else(|| {
panic!("value index out of bounds: {value_index} for value: {value:?}")
}),
column_register,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
} else {
// Column was not specified as has no DEFAULT - use NULL if it is nullable, otherwise error
// Rowid alias columns can be NULL because we will autogenerate a rowid in that case.
let is_nullable = !mapping.column.primary_key || mapping.column.is_rowid_alias;
if is_nullable {
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
program.mark_last_insn_constant();
} else {
crate::bail_parse_error!(
"column {} is not nullable",
mapping.column.name.as_ref().expect("column name is None")
);
}
Ok(())
};
translate_rows_base(program, insertion, translate_value_fn, resolver)
}
/// Translate the key and the columns of the insertion.
/// This function is called by both [translate_rows_single] and [translate_rows_multiple],
/// each providing a different [translate_value_fn] implementation, because for multiple rows
/// we need to emit the values in a loop, from either an ephemeral table or a coroutine,
/// whereas for the single row the translation happens in a single pass without looping.
fn translate_rows_base<'short, 'long: 'short>(
program: &mut ProgramBuilder,
insertion: &'short Insertion<'long>,
mut translate_value_fn: impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
resolver: &Resolver,
) -> Result<()> {
translate_key(program, insertion, &mut translate_value_fn, resolver)?;
for col in insertion.col_mappings.iter() {
translate_column(
program,
col.column,
col.register,
col.value_index,
&mut translate_value_fn,
resolver,
)?;
}
Ok(())
}
/// Translate the [InsertionKey].
fn translate_key(
program: &mut ProgramBuilder,
insertion: &Insertion,
mut translate_value_fn: impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
resolver: &Resolver,
) -> Result<()> {
match &insertion.key {
InsertionKey::RowidAlias(rowid_alias_column) => translate_column(
program,
rowid_alias_column.column,
rowid_alias_column.register,
rowid_alias_column.value_index,
&mut translate_value_fn,
resolver,
),
InsertionKey::LiteralRowid {
value_index,
register,
} => translate_column(
program,
ROWID_COLUMN,
*register,
*value_index,
&mut translate_value_fn,
resolver,
),
InsertionKey::Autogenerated { .. } => Ok(()), // will be populated later
}
}
fn translate_column(
program: &mut ProgramBuilder,
column: &Column,
column_register: usize,
value_index: Option<usize>,
translate_value_fn: &mut impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
resolver: &Resolver,
) -> Result<()> {
if let Some(value_index) = value_index {
translate_value_fn(program, value_index, column_register)?;
} else if column.is_rowid_alias {
// Although a non-NULL integer key is used for the insertion key,
// the rowid alias column is emitted as NULL.
program.emit_insn(Insn::SoftNull {
reg: column_register,
});
} else if column.hidden {
// Emit NULL for not-explicitly-mentioned hidden columns, even ignoring DEFAULT.
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
} else if let Some(default_expr) = column.default.as_ref() {
translate_expr(program, None, default_expr, column_register, resolver)?;
} else {
let nullable = !column.notnull && !column.primary_key && !column.unique;
if !nullable {
crate::bail_parse_error!(
"column {} is not nullable",
column
.name
.as_ref()
.expect("column name must be present")
.as_str()
);
}
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
}
Ok(())
}
@@ -970,35 +993,29 @@ fn translate_virtual_table_insert(
_ => crate::bail_parse_error!("Unsupported INSERT body for virtual tables"),
};
let table = Table::Virtual(virtual_table.clone());
let column_mappings = resolve_columns_for_insert(&table, &columns, num_values)?;
let registers_start = program.alloc_registers(column_mappings.len() + 1);
/* *
* Inserts for virtual tables are done in a single step.
* argv[0] = (NULL for insert)
* argv[1] = (rowid for insert - NULL in most cases)
* argv[2..] = column values
* */
*/
let registers_start = program.alloc_register();
program.emit_insn(Insn::Null {
dest: registers_start,
dest_end: None,
});
/* *
* argv[1] = (rowid for insert - NULL in most cases)
* argv[2..] = column values
* */
let insertion = build_insertion(&mut program, &table, &columns, num_values)?;
populate_column_registers(
&mut program,
&value,
&column_mappings,
registers_start + 1,
resolver,
)?;
translate_rows_single(&mut program, &value, &insertion, resolver)?;
let conflict_action = on_conflict.as_ref().map(|c| c.bit_value()).unwrap_or(0) as u16;
let cursor_id = program.alloc_cursor_id(CursorType::VirtualTable(virtual_table.clone()));
program.emit_insn(Insn::VUpdate {
cursor_id,
arg_count: column_mappings.len() + 1,
arg_count: insertion.col_mappings.len() + 2, // +1 for NULL, +1 for rowid
start_reg: registers_start,
conflict_action,
});

View File

@@ -1404,7 +1404,7 @@ fn emit_autoindex(
let ephemeral_cols_start_reg = program.alloc_registers(num_regs_to_reserve);
for (i, col) in index.columns.iter().enumerate() {
let reg = ephemeral_cols_start_reg + i;
program.emit_column(table_cursor_id, col.pos_in_table, reg);
program.emit_column_or_rowid(table_cursor_id, col.pos_in_table, reg);
}
if table_has_rowid {
program.emit_insn(Insn::RowId {

View File

@@ -133,7 +133,7 @@ pub fn emit_order_by(
.get(i)
.expect("remapping must exist for all result columns")
.orderby_sorter_idx;
program.emit_column(cursor_id, column_idx, reg);
program.emit_column_or_rowid(cursor_id, column_idx, reg);
}
emit_result_row_and_limit(

View File

@@ -713,7 +713,7 @@ pub fn translate_drop_table(
program.preassign_label_to_next_insn(metadata_loop);
// start loop on schema table
program.emit_column(
program.emit_column_or_rowid(
sqlite_schema_cursor_id_0,
2,
table_name_and_root_page_register,
@@ -726,7 +726,7 @@ pub fn translate_drop_table(
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
program.emit_column(
program.emit_column_or_rowid(
sqlite_schema_cursor_id_0,
0,
table_name_and_root_page_register,
@@ -749,7 +749,7 @@ pub fn translate_drop_table(
let skip_cdc_label = program.allocate_label();
let entry_type_reg = program.alloc_register();
program.emit_column(sqlite_schema_cursor_id_0, 0, entry_type_reg);
program.emit_column_or_rowid(sqlite_schema_cursor_id_0, 0, entry_type_reg);
program.emit_insn(Insn::Ne {
lhs: entry_type_reg,
rhs: table_type,
@@ -904,7 +904,7 @@ pub fn translate_drop_table(
});
program.preassign_label_to_next_insn(copy_schema_to_temp_table_loop);
// start loop on schema table
program.emit_column(sqlite_schema_cursor_id_1, 3, prev_root_page_register);
program.emit_column_or_rowid(sqlite_schema_cursor_id_1, 3, prev_root_page_register);
// The label and Insn::Ne are used to skip over any rows in the schema table that don't have the root page that was moved
let next_label = program.allocate_label();
program.emit_insn(Insn::Ne {
@@ -963,9 +963,9 @@ pub fn translate_drop_table(
rowid_reg: schema_row_id_register,
target_pc: next_label,
});
program.emit_column(sqlite_schema_cursor_id_1, 0, schema_column_0_register);
program.emit_column(sqlite_schema_cursor_id_1, 1, schema_column_1_register);
program.emit_column(sqlite_schema_cursor_id_1, 2, schema_column_2_register);
program.emit_column_or_rowid(sqlite_schema_cursor_id_1, 0, schema_column_0_register);
program.emit_column_or_rowid(sqlite_schema_cursor_id_1, 1, schema_column_1_register);
program.emit_column_or_rowid(sqlite_schema_cursor_id_1, 2, schema_column_2_register);
let root_page = table
.get_root_page()
.try_into()
@@ -974,7 +974,7 @@ pub fn translate_drop_table(
value: root_page,
dest: moved_to_root_page_register,
});
program.emit_column(sqlite_schema_cursor_id_1, 4, schema_column_4_register);
program.emit_column_or_rowid(sqlite_schema_cursor_id_1, 4, schema_column_4_register);
program.emit_insn(Insn::MakeRecord {
start_reg: schema_column_0_register,
count: 5,

View File

@@ -220,8 +220,8 @@ pub fn translate_drop_view(
let col0_reg = program.alloc_register();
let col1_reg = program.alloc_register();
program.emit_column(sqlite_schema_cursor_id, 0, col0_reg);
program.emit_column(sqlite_schema_cursor_id, 1, col1_reg);
program.emit_column_or_rowid(sqlite_schema_cursor_id, 0, col0_reg);
program.emit_column_or_rowid(sqlite_schema_cursor_id, 1, col1_reg);
// Check if type == 'view' and name == view_name
let skip_delete_label = program.allocate_label();

View File

@@ -836,7 +836,27 @@ impl ProgramBuilder {
self.preassign_label_to_next_insn(loop_end);
}
pub fn emit_column(&mut self, cursor_id: CursorID, column: usize, out: usize) {
pub fn emit_column_or_rowid(&mut self, cursor_id: CursorID, column: usize, out: usize) {
let (_, cursor_type) = self.cursor_ref.get(cursor_id).unwrap();
if let CursorType::BTreeTable(btree) = cursor_type {
let column_def = btree
.columns
.get(column)
.expect("column index out of bounds");
if column_def.is_rowid_alias {
self.emit_insn(Insn::RowId {
cursor_id,
dest: out,
});
} else {
self.emit_column(cursor_id, column, out);
}
} else {
self.emit_column(cursor_id, column, out);
}
}
fn emit_column(&mut self, cursor_id: CursorID, column: usize, out: usize) {
let (_, cursor_type) = self.cursor_ref.get(cursor_id).unwrap();
use crate::translate::expr::sanitize_string;

View File

@@ -13,4 +13,11 @@ do_execsql_test_on_specific_db {:memory:} create_table_same_uniques_and_primary_
do_execsql_test_on_specific_db {:memory:} create_table_unique_contained_in_primary_keys {
CREATE TABLE t4 (a,b, primary key(a,b), unique(a));
} {}
} {}
# https://github.com/tursodatabase/turso/issues/2686
do_execsql_test_on_specific_db {:memory:} create_table_rowid_unique_regression_test {
create table u(x integer unique primary key);
insert into u values (1),(2),(3);
select * from u where x > 2;
} {3}