Merge 'Explicit rowid insert' from Nikita Sivukhin

This PR adds support for `INSERT` queries with explicit value for
`rowid` column (not thought rowid alias):
```
turso> create table t(x, y, z);
turso> insert into t(rowid, x, y, z) values (10, 1, 2, 3);
turso> select rowid, * from t;
┌───────┬───┬───┬───┐
│ rowid │ x │ y │ z │
├───────┼───┼───┼───┤
│    10 │ 1 │ 2 │ 3 │
└───────┴───┴───┴───┘
```

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2239
This commit is contained in:
Jussi Saurio
2025-07-24 10:08:42 +03:00
2 changed files with 177 additions and 137 deletions

View File

@@ -5,8 +5,9 @@ use turso_sqlite3_parser::ast::{
};
use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY};
use crate::schema::{IndexColumn, Table};
use crate::schema::{self, IndexColumn, Table};
use crate::translate::emitter::{emit_cdc_insns, emit_cdc_patch_record, OperationMode};
use crate::translate::planner::ROWID;
use crate::util::normalize_ident;
use crate::vdbe::builder::ProgramBuilderOpts;
use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral};
@@ -290,29 +291,15 @@ pub fn translate_insert(
.collect::<Vec<(&String, usize, usize)>>();
let column_mappings = resolve_columns_for_insert(&table, &columns, num_values)?;
// Check if rowid was provided (through INTEGER PRIMARY KEY as a rowid alias)
let rowid_alias_index = btree_table.columns.iter().position(|c| c.is_rowid_alias);
let has_user_provided_rowid = {
assert_eq!(column_mappings.len(), btree_table.columns.len());
if let Some(index) = rowid_alias_index {
column_mappings[index].value_index.is_some()
} else {
false
}
};
let has_user_provided_rowid = column_mappings
.iter()
.any(|x| x.value_index.is_some() && x.column.is_rowid_alias);
// allocate a register for each column in the table. if not provided by user, they will simply be set as null.
// allocate an extra register for rowid regardless of whether user provided a rowid alias column.
let num_cols = btree_table.columns.len();
let rowid_reg = program.alloc_registers(num_cols + 1);
let column_registers_start = rowid_reg + 1;
let rowid_alias_reg = {
if has_user_provided_rowid {
Some(column_registers_start + rowid_alias_index.unwrap())
} else {
None
}
};
let rowid_and_columns_start_register = program.alloc_registers(num_cols + 1);
let columns_start_register = rowid_and_columns_start_register + 1;
let record_register = program.alloc_register();
@@ -328,7 +315,7 @@ pub fn translate_insert(
populate_columns_multiple_rows(
&mut program,
&column_mappings,
column_registers_start,
rowid_and_columns_start_register,
yield_reg_opt.unwrap() + 1,
&resolver,
&temp_table_ctx,
@@ -345,8 +332,7 @@ pub fn translate_insert(
&mut program,
&values.unwrap(),
&column_mappings,
column_registers_start,
rowid_reg,
rowid_and_columns_start_register,
&resolver,
)?;
}
@@ -368,26 +354,14 @@ pub fn translate_insert(
});
}
// Common record insertion logic for both single and multiple rows
let check_rowid_is_integer_label = rowid_alias_reg.and(Some(program.allocate_label()));
if let Some(reg) = rowid_alias_reg {
// for the row record, the rowid alias column (INTEGER PRIMARY KEY) is always set to NULL
// and its value is copied to the rowid register. in the case where a single row is inserted,
// the value is written directly to the rowid register (see populate_column_registers()).
// again, not sure why this only happens in the single row case, but let's mimic sqlite.
// in the single row case we save a Copy instruction, but in the multiple rows case we do
// it here in the loop.
if inserting_multiple_rows {
program.emit_insn(Insn::Copy {
src_reg: reg,
dst_reg: rowid_reg,
extra_amount: 0, // TODO: rename 'amount' to something else; amount==0 means 1
});
// for the row record, the rowid alias column is always set to NULL
program.emit_insn(Insn::SoftNull { reg });
}
// the user provided rowid value might itself be NULL. If it is, we create a new rowid on the next instruction.
let check_rowid_is_integer_label = if has_user_provided_rowid {
Some(program.allocate_label())
} else {
None
};
if has_user_provided_rowid {
program.emit_insn(Insn::NotNull {
reg: rowid_reg,
reg: rowid_and_columns_start_register,
target_pc: check_rowid_is_integer_label.unwrap(),
});
}
@@ -395,14 +369,16 @@ pub fn translate_insert(
// Create new rowid if a) not provided by user or b) provided by user but is NULL
program.emit_insn(Insn::NewRowid {
cursor: cursor_id,
rowid_reg,
rowid_reg: rowid_and_columns_start_register,
prev_largest_reg: 0,
});
if let Some(must_be_int_label) = check_rowid_is_integer_label {
program.resolve_label(must_be_int_label, program.offset());
// If the user provided a rowid, it must be an integer.
program.emit_insn(Insn::MustBeInt { reg: rowid_reg });
program.emit_insn(Insn::MustBeInt {
reg: rowid_and_columns_start_register,
});
}
// Check uniqueness constraint for rowid if it was provided by user.
@@ -411,21 +387,14 @@ pub fn translate_insert(
let make_record_label = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: cursor_id,
rowid_reg,
rowid_reg: rowid_and_columns_start_register,
target_pc: make_record_label,
});
let rowid_column_name = if let Some(index) = rowid_alias_index {
btree_table
.columns
.get(index)
.unwrap()
.name
.as_ref()
.expect("column name is None")
} else {
"rowid"
};
let rowid_column = column_mappings
.iter()
.find(|x| x.value_index.is_some() && x.column.is_rowid_alias)
.expect("rowid alias column must be provided");
let rowid_column_name = rowid_column.column.name.as_deref().unwrap_or(ROWID);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: format!("{}.{}", table_name.0, rowid_column_name),
@@ -436,7 +405,7 @@ pub fn translate_insert(
match table.btree() {
Some(t) if t.is_strict => {
program.emit_insn(Insn::TypeCheck {
start_reg: column_registers_start,
start_reg: columns_start_register,
count: num_cols,
check_generated: true,
table_reference: Rc::clone(&t),
@@ -463,14 +432,14 @@ pub fn translate_insert(
// copy from the table's column register over to the index's scratch register
program.emit_insn(Insn::Copy {
src_reg: column_registers_start + col.0,
src_reg: rowid_and_columns_start_register + col.0,
dst_reg: idx_start_reg + i,
extra_amount: 0,
});
}
// last register is the rowid
program.emit_insn(Insn::Copy {
src_reg: rowid_reg,
src_reg: rowid_and_columns_start_register,
dst_reg: idx_start_reg + num_cols,
extra_amount: 0,
});
@@ -505,10 +474,10 @@ pub fn translate_insert(
accum.push_str(&btree_table.name);
accum.push('.');
let name = btree_table
.columns
let name = column_mappings
.get(*index)
.unwrap()
.column
.name
.as_ref()
.expect("column name is None");
@@ -546,7 +515,7 @@ pub fn translate_insert(
if col.column.is_rowid_alias {
continue;
}
let target_reg = i + column_registers_start;
let target_reg = i + rowid_and_columns_start_register;
program.emit_insn(Insn::HaltIfNull {
target_reg,
err_code: SQLITE_CONSTRAINT_NOTNULL,
@@ -562,14 +531,14 @@ pub fn translate_insert(
}
// Create and insert the record
program.emit_insn(Insn::MakeRecord {
start_reg: column_registers_start,
start_reg: columns_start_register,
count: num_cols,
dest_reg: record_register,
index_name: None,
});
program.emit_insn(Insn::Insert {
cursor: cursor_id,
key_reg: rowid_reg,
key_reg: rowid_and_columns_start_register,
record_reg: record_register,
flag: InsertFlags::new(),
table_name: table_name.to_string(),
@@ -582,9 +551,9 @@ pub fn translate_insert(
Some(emit_cdc_patch_record(
&mut program,
&table,
column_registers_start,
columns_start_register,
record_register,
rowid_reg,
rowid_and_columns_start_register,
))
} else {
None
@@ -594,7 +563,7 @@ pub fn translate_insert(
&resolver,
OperationMode::INSERT,
*cdc_cursor_id,
rowid_reg,
rowid_and_columns_start_register,
None,
after_record_reg,
&table_name.0,
@@ -639,6 +608,19 @@ struct ColumnMapping<'a> {
default_value: Option<&'a Expr>,
}
pub const ROWID_COLUMN: &Column = &Column {
name: None,
ty: schema::Type::Integer,
ty_str: String::new(),
primary_key: true,
is_rowid_alias: true,
notnull: true,
default: None,
unique: false,
collation: None,
hidden: false,
};
/// Resolves how each column in a table should be populated during an INSERT.
/// Returns a Vec of ColumnMapping, one for each column in the table's schema.
///
@@ -661,20 +643,30 @@ fn resolve_columns_for_insert<'a>(
num_values: usize,
) -> Result<Vec<ColumnMapping<'a>>> {
let table_columns = table.columns();
// Case 1: No columns specified - map values to columns in order
// +1 for rowid implicit ROWID column
let mut column_mappings = Vec::with_capacity(table_columns.len() + 1);
column_mappings.push(ColumnMapping {
column: ROWID_COLUMN,
value_index: None,
default_value: None,
});
for column in table_columns {
column_mappings.push(ColumnMapping {
column,
value_index: None,
default_value: column.default.as_ref(),
});
}
if columns.is_none() {
// Case 1: No columns specified - map values to columns in order
let mut value_idx = 0;
let mut column_mappings = Vec::with_capacity(table_columns.len());
for col in table_columns {
let mapping = ColumnMapping {
column: col,
value_index: if col.hidden { None } else { Some(value_idx) },
default_value: col.default.as_ref(),
};
for (i, col) in table_columns.iter().enumerate() {
column_mappings[i + 1].value_index = if col.hidden { None } else { Some(value_idx) };
if !col.hidden {
value_idx += 1;
}
column_mappings.push(mapping);
}
if num_values != value_idx {
@@ -685,41 +677,35 @@ fn resolve_columns_for_insert<'a>(
num_values
);
}
} else {
// Case 2: Columns specified - map named columns to their values
// Map each named column to its value index
for (value_index, column_name) in columns.as_ref().unwrap().iter().enumerate() {
let column_name = normalize_ident(column_name.0.as_str());
let table_index = if column_name == ROWID {
Some(0)
} else {
column_mappings.iter().position(|c| {
c.column
.name
.as_ref()
.is_some_and(|name| name.eq_ignore_ascii_case(&column_name))
})
};
return Ok(column_mappings);
let Some(table_index) = table_index else {
crate::bail_parse_error!(
"table {} has no column named {}",
&table.get_name(),
column_name
);
};
column_mappings[table_index].value_index = Some(value_index);
}
}
// Case 2: Columns specified - map named columns to their values
let mut mappings: Vec<_> = table_columns
.iter()
.map(|col| ColumnMapping {
column: col,
value_index: None,
default_value: col.default.as_ref(),
})
.collect();
// Map each named column to its value index
for (value_index, column_name) in columns.as_ref().unwrap().iter().enumerate() {
let column_name = normalize_ident(column_name.0.as_str());
let table_index = table_columns.iter().position(|c| {
c.name
.as_ref()
.is_some_and(|name| name.eq_ignore_ascii_case(&column_name))
});
let Some(table_index) = table_index else {
crate::bail_parse_error!(
"table {} has no column named {}",
&table.get_name(),
column_name
);
};
mappings[table_index].value_index = Some(value_index);
}
Ok(mappings)
Ok(column_mappings)
}
/// Represents how a column in an index should be populated during an INSERT.
@@ -793,34 +779,51 @@ fn populate_columns_multiple_rows(
resolver: &Resolver,
temp_table_ctx: &Option<TempTableCtx>,
) -> Result<()> {
// In case when both rowid and rowid-alias column provided in the query - turso-db overwrite rowid with **latest** value from the list
// As we iterate by column in natural order of their definition in scheme,
// we need to track last value_index we wrote to the rowid and overwrite rowid register only if new value_index is greater
let mut last_rowid_explicit_value = None;
for (i, mapping) in column_mappings.iter().enumerate() {
let target_reg = column_registers_start + i;
let column_register = column_registers_start + i;
if let Some(value_index) = mapping.value_index {
let write_directly_to_rowid_reg = mapping.column.is_rowid_alias;
let write_reg = if write_directly_to_rowid_reg {
if last_rowid_explicit_value.is_some_and(|x| x > value_index) {
continue;
}
last_rowid_explicit_value = Some(value_index);
column_registers_start // rowid always the first register in the array for insertion record
} else {
column_register
};
if let Some(temp_table_ctx) = temp_table_ctx {
program.emit_column(
temp_table_ctx.cursor_id,
value_index,
column_registers_start + i,
);
program.emit_column(temp_table_ctx.cursor_id, value_index, write_reg);
} else {
program.emit_insn(Insn::Copy {
src_reg: yield_reg + value_index,
dst_reg: column_registers_start + i,
dst_reg: write_reg,
extra_amount: 0,
});
}
// write_reg can be euqal to column_register if column list explicitly mention "rowid"
if write_reg != column_register {
program.emit_null(column_register, None);
program.mark_last_insn_constant();
}
} else if mapping.column.is_rowid_alias {
program.emit_insn(Insn::SoftNull { reg: target_reg });
program.emit_insn(Insn::SoftNull {
reg: column_register,
});
} else if let Some(default_expr) = mapping.default_value {
translate_expr(program, None, default_expr, target_reg, resolver)?;
translate_expr(program, None, default_expr, column_register, resolver)?;
} else {
// Column was not specified as has no DEFAULT - use NULL if it is nullable, otherwise error
// Rowid alias columns can be NULL because we will autogenerate a rowid in that case.
let is_nullable = !mapping.column.primary_key || mapping.column.is_rowid_alias;
if is_nullable {
program.emit_insn(Insn::Null {
dest: target_reg,
dest: column_register,
dest_end: None,
});
} else {
@@ -841,36 +844,46 @@ fn populate_column_registers(
value: &[Expr],
column_mappings: &[ColumnMapping],
column_registers_start: usize,
rowid_reg: usize,
resolver: &Resolver,
) -> Result<()> {
// In case when both rowid and rowid-alias column provided in the query - turso-db overwrite rowid with **latest** value from the list
// As we iterate by column in natural order of their definition in scheme,
// we need to track last value_index we wrote to the rowid and overwrite rowid register only if new value_index is greater
let mut last_rowid_explicit_value = None;
for (i, mapping) in column_mappings.iter().enumerate() {
let target_reg = column_registers_start + i;
let column_register = column_registers_start + i;
// Column has a value in the VALUES tuple
if let Some(value_index) = mapping.value_index {
// When inserting a single row, SQLite writes the value provided for the rowid alias column (INTEGER PRIMARY KEY)
// directly into the rowid register and writes a NULL into the rowid alias column.
let write_directly_to_rowid_reg = mapping.column.is_rowid_alias;
let reg = if write_directly_to_rowid_reg {
rowid_reg
let write_reg = if write_directly_to_rowid_reg {
if last_rowid_explicit_value.is_some_and(|x| x > value_index) {
continue;
}
last_rowid_explicit_value = Some(value_index);
column_registers_start // rowid always the first register in the array for insertion record
} else {
target_reg
column_register
};
translate_expr_no_constant_opt(
program,
None,
value.get(value_index).expect("value index out of bounds"),
reg,
write_reg,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
if write_directly_to_rowid_reg {
program.emit_insn(Insn::SoftNull { reg: target_reg });
// write_reg can be euqal to column_register if column list explicitly mention "rowid"
if write_reg != column_register {
program.emit_null(column_register, None);
program.mark_last_insn_constant();
}
} else if mapping.column.hidden {
program.emit_insn(Insn::Null {
dest: target_reg,
dest: column_register,
dest_end: None,
});
program.mark_last_insn_constant();
@@ -879,7 +892,7 @@ fn populate_column_registers(
program,
None,
default_expr,
target_reg,
column_register,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
@@ -889,7 +902,7 @@ fn populate_column_registers(
let is_nullable = !mapping.column.primary_key || mapping.column.is_rowid_alias;
if is_nullable {
program.emit_insn(Insn::Null {
dest: target_reg,
dest: column_register,
dest_end: None,
});
program.mark_last_insn_constant();
@@ -923,27 +936,25 @@ fn translate_virtual_table_insert(
};
let table = Table::Virtual(virtual_table.clone());
let column_mappings = resolve_columns_for_insert(&table, &columns, num_values)?;
let registers_start = program.alloc_registers(2);
let registers_start = program.alloc_registers(column_mappings.len() + 1);
/* *
* Inserts for virtual tables are done in a single step.
* argv[0] = (NULL for insert)
* argv[1] = (NULL for insert)
* argv[1] = (rowid for insert - NULL in most cases)
* argv[2..] = column values
* */
program.emit_insn(Insn::Null {
dest: registers_start,
dest_end: Some(registers_start + 1),
dest_end: None,
});
let values_reg = program.alloc_registers(column_mappings.len());
populate_column_registers(
&mut program,
&value,
&column_mappings,
values_reg,
registers_start,
registers_start + 1,
resolver,
)?;
let conflict_action = on_conflict.as_ref().map(|c| c.bit_value()).unwrap_or(0) as u16;
@@ -952,7 +963,7 @@ fn translate_virtual_table_insert(
program.emit_insn(Insn::VUpdate {
cursor_id,
arg_count: column_mappings.len() + 2,
arg_count: column_mappings.len() + 1,
start_reg: registers_start,
conflict_action,
});

View File

@@ -488,3 +488,32 @@ do_execsql_test_on_specific_db {:memory:} insert-tricky-column-order-table {
4||5
|3|2
|6|5}
do_execsql_test_on_specific_db {:memory:} insert-explicit-rowid {
create table t (x, y, z);
insert into t(z, x, y, rowid) values (1, 2, 3, 4), (5, 6, 7, 8);
insert into t(z, x, y, rowid) values (9, 10, 11, 12);
select rowid, x, y, z from t;
} {4|2|3|1
8|6|7|5
12|10|11|9}
do_execsql_test_on_specific_db {:memory:} insert-explicit-rowid-with-rowidalias {
create table t (x, y INTEGER PRIMARY KEY, z);
insert into t(z, x, y, rowid) values (1, 2, 3, 4), (5, 6, 7, 8);
insert into t(z, x, y, rowid) values (9, 10, 11, 12);
insert into t(z, x, rowid, y) values (-1, -2, -3, -4), (-5, -6, -7, -8);
insert into t(z, x, rowid, y) values (-9, -10, -11, -12);
select rowid, x, y, z from t;
} {-12|-10|-12|-9
-8|-6|-8|-5
-4|-2|-4|-1
4|2|4|1
8|6|8|5
12|10|12|9}
do_execsql_test_in_memory_error_content insert-explicit-rowid-conflict {
create table t (x);
insert into t(rowid, x) values (1, 1);
insert into t(rowid, x) values (1, 2);
} {UNIQUE constraint failed: t.rowid (19)}