Merge 'Fix: core/translate/insert: fix four issues with inserts' from Jussi Saurio

Closes #436
This PR fixes four issues:
1. Not respecting user-provided column names (e.g. `INSERT INTO foo
(b,c) values (1,2);` would just insert into the first two columns
regardless of what index `b` and `c` have)
2. Limbo would get in an infinite loop when inserting too many values
(too many i.e. more columns than the table has)
3. False positive unique constraint error on non-primary key columns
when inserting multiple values, e.g.
```
limbo> create table t1(v1 int);
limbo> insert into t1 values (1),(2);
Runtime error: UNIQUE constraint failed: t1.v1 (19)
```
as seen [here](https://github.com/tursodatabase/limbo/pull/490#issuecomm
ent-2545545562)
4. Limbo no longer uses a coroutine for INSERT when only inserting one
row. See [this comment](https://github.com/tursodatabase/limbo/issues/43
6#issuecomment-2533937845). For the equivalent query, Limbo now
generates:
```
limbo> EXPLAIN INSERT INTO users (name, email) VALUES ('John Doe', 'john@example.com');
addr  opcode             p1    p2    p3    p4             p5  comment
----  -----------------  ----  ----  ----  -------------  --  -------
0     Init               0     10    0                    0   Start at 10
1     OpenWriteAsync     0     2     0                    0
2     OpenWriteAwait     0     0     0                    0
3     String8            0     3     0     John Doe       0   r[3]='John Doe'
4     String8            0     4     0     john@example.com  0   r[4]='john@example.com'
5     NewRowId           0     1     0                    0
6     MakeRecord         2     3     5                    0   r[5]=mkrec(r[2..4])
7     InsertAsync        0     5     1                    0
8     InsertAwait        0     0     0                    0
9     Halt               0     0     0                    0
10    Transaction        0     1     0                    0
11    Null               0     2     0                    0   r[2]=NULL
12    Goto               0     1     0                    0
```
---
Note that this PR doesn't fix e.g. #472 which requires creating an index
on the non-rowid primary key column(s), nor does it implement rollback
(e.g. inserting two rows where one fails to unique constraint still
inserts the other row)
---
**EXAMPLES OF ERRONEOUS BEHAVIOR -- current head of main:**
wrong column inserted
```
limbo> create table rowidalias_b (a, b INTEGER PRIMARY KEY, c, d);

limbo> insert into rowidalias_b (d) values ('d only');
limbo> select * from rowidalias_b;
d only|1||   <-- gets inserted into column a
```
wrong column inserted
```
limbo> create table textpk (a, b text primary key, c);
limbo> insert into textpk (a,b,c) values ('a','b','c');
limbo> select * from textpk;
a|b|c
limbo> insert into textpk (b,c) values ('b','c');
limbo> select * from textpk;
a|b|c
b|c|  <--- b gets inserted into column a
```
false positive from integer check due to attempting to insert wrong
column
```
limbo> create table rowidalias_b (a, b INTEGER PRIMARY KEY, c, d);
limbo> insert into rowidalias_b (a,c) values ('lol', 'bal');
Parse error: MustBeInt: the value in the register is not an integer  <-- tries to insert c into b column
```
false positive from integer check due to attempting to insert wrong
column
```
limbo> CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    name TEXT,
    email TEXT
);
limbo> INSERT INTO users (name, email) VALUES ('John Doe', 'john@example.com');
Parse error: MustBeInt: the value in the register is not an integer.   <-- tries to insert name into id column
```
allows write of nonexistent column
```
limbo> create table a(b);
limbo> insert into a (nonexistent_col) values (1);
limbo> select * from a;
1
```
hangs forever when inserting too many values
```
limbo> create table a (b integer primary key);
limbo> insert into a values (1,2);  <-- spinloops forever at 100% cpu
```
unique constraint error on non-unique column
```
limbo> create table t1(v1 int);
limbo> insert into t1 values (1),(2);
Runtime error: UNIQUE constraint failed: t1.v1 (19)
```
**EXAMPLES OF CORRECT BEHAVIOR -- this branch:**
correct column inserted
```
limbo> create table rowidalias_b (a, b INTEGER PRIMARY KEY, c, d);
limbo> insert into rowidalias_b (d) values ('d only');
limbo> select * from rowidalias_b;
|1||d only
```
correct column inserted
```
limbo> create table textpk (a, b text primary key, c);
limbo> insert into textpk (a,b,c) values ('a','b','c');
limbo> select * from textpk;
a|b|c
limbo> insert into textpk (b,c) values ('b','c');
limbo> select * from textpk;
a|b|c
|b|c
```
correct columns inserted, PK autoincremented
```
limbo> create table rowidalias_b (a, b INTEGER PRIMARY KEY, c, d);
limbo> insert into rowidalias_b (a,c) values ('lol', 'bal');
limbo> select * from rowidalias_b;
lol|1|bal|
```
correct column inserted, PK autoincremented
```
limbo> CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    name TEXT,
    email TEXT
);
limbo> INSERT INTO users (name, email) VALUES ('John Doe', 'john@example.com');
limbo> select * from users;
1|John Doe|john@example.com
```
reports parse error correctly about wrong number of values
```
limbo> create table a (b integer primary key);
limbo> insert into a values (1,2);
Parse error: table a has 1 columns but 2 values were supplied
```
reports parse error correctly about nonexistent column
```
limbo> create table a(b);
limbo> insert into a (nonexistent_col) values (1);
Parse error: table a has no column named nonexistent_col
```
no unique constraint error on non-unique column
```
limbo> create table t1(v1 int);
limbo> insert into t1 values (1),(2);
limbo> select * from t1;
1
2
```
**Also, added multi-row inserts to simulator and ran into at least
this:**
```
Seed: 9444323279823516485
path to db '"/var/folders/qj/r6wpj6657x9cj_1jx_62cpgr0000gn/T/.tmpcYczRv/simulator.db"'
Initial opts SimulatorOpts { ticks: 3474, max_connections: 1, max_tables: 79, read_percent: 61, write_percent: 12, delete_percent: 27, max_interactions: 2940, page_size: 4096 }
thread 'main' panicked at core/storage/sqlite3_ondisk.rs:332:36:
called `Result::unwrap()` on an `Err` value: Corrupt("Invalid page type: 83")
```

Closes #533
This commit is contained in:
Pekka Enberg
2024-12-27 10:00:37 +02:00
4 changed files with 344 additions and 122 deletions

View File

@@ -2,32 +2,187 @@ use std::rc::Weak;
use std::{cell::RefCell, ops::Deref, rc::Rc};
use sqlite3_parser::ast::{
DistinctNames, InsertBody, QualifiedName, ResolveType, ResultColumn, With,
DistinctNames, Expr, InsertBody, QualifiedName, ResolveType, ResultColumn, With,
};
use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::util::normalize_ident;
use crate::{
schema::{Schema, Table},
schema::{Column, Schema, Table},
storage::sqlite3_ondisk::DatabaseHeader,
translate::expr::translate_expr,
vdbe::{builder::ProgramBuilder, Insn, Program},
};
use crate::{Connection, Result};
#[derive(Debug)]
/// Represents how a column should be populated during an INSERT.
/// Contains both the column definition and optionally the index into the VALUES tuple.
struct ColumnMapping<'a> {
/// Reference to the column definition from the table schema
column: &'a Column,
/// If Some(i), use the i-th value from the VALUES tuple
/// If None, use NULL (column was not specified in INSERT statement)
value_index: Option<usize>,
}
/// Resolves how each column in a table should be populated during an INSERT.
/// Returns a Vec of ColumnMapping, one for each column in the table's schema.
///
/// For each column, specifies:
/// 1. The column definition (type, constraints, etc)
/// 2. Where to get the value from:
/// - Some(i) -> use i-th value from the VALUES tuple
/// - None -> use NULL (column wasn't specified in INSERT)
///
/// Two cases are handled:
/// 1. No column list specified (INSERT INTO t VALUES ...):
/// - Values are assigned to columns in table definition order
/// - If fewer values than columns, remaining columns map to None
/// 2. Column list specified (INSERT INTO t (col1, col3) VALUES ...):
/// - Named columns map to their corresponding value index
/// - Unspecified columns map to None
fn resolve_columns_for_insert<'a>(
table: &'a Table,
columns: &Option<DistinctNames>,
values: &[Vec<Expr>],
) -> Result<Vec<ColumnMapping<'a>>> {
if values.is_empty() {
crate::bail_parse_error!("no values to insert");
}
let table_columns = table.columns();
// Case 1: No columns specified - map values to columns in order
if columns.is_none() {
let num_values = values[0].len();
if num_values > table_columns.len() {
crate::bail_parse_error!(
"table {} has {} columns but {} values were supplied",
table.get_name(),
table_columns.len(),
num_values
);
}
// Verify all value tuples have same length
for value in values.iter().skip(1) {
if value.len() != num_values {
crate::bail_parse_error!("all VALUES must have the same number of terms");
}
}
// Map each column to either its corresponding value index or None
return Ok(table_columns
.iter()
.enumerate()
.map(|(i, col)| ColumnMapping {
column: col,
value_index: if i < num_values { Some(i) } else { None },
})
.collect());
}
// Case 2: Columns specified - map named columns to their values
let mut mappings: Vec<_> = table_columns
.iter()
.map(|col| ColumnMapping {
column: col,
value_index: None,
})
.collect();
// Map each named column to its value index
for (value_index, column_name) in columns.as_ref().unwrap().iter().enumerate() {
let column_name = normalize_ident(column_name.0.as_str());
let table_index = table_columns
.iter()
.position(|c| c.name.eq_ignore_ascii_case(&column_name));
if table_index.is_none() {
crate::bail_parse_error!(
"table {} has no column named {}",
table.get_name(),
column_name
);
}
mappings[table_index.unwrap()].value_index = Some(value_index);
}
Ok(mappings)
}
/// Populates the column registers with values for a single row
fn populate_column_registers(
program: &mut ProgramBuilder,
value: &[Expr],
column_mappings: &[ColumnMapping],
column_registers_start: usize,
inserting_multiple_rows: bool,
rowid_reg: usize,
) -> Result<()> {
for (i, mapping) in column_mappings.iter().enumerate() {
let target_reg = column_registers_start + i;
// Column has a value in the VALUES tuple
if let Some(value_index) = mapping.value_index {
// When inserting a single row, SQLite writes the value provided for the rowid alias column (INTEGER PRIMARY KEY)
// directly into the rowid register and writes a NULL into the rowid alias column. Not sure why this only happens
// in the single row case, but let's copy it.
let write_directly_to_rowid_reg =
mapping.column.is_rowid_alias && !inserting_multiple_rows;
let reg = if write_directly_to_rowid_reg {
rowid_reg
} else {
target_reg
};
translate_expr(
program,
None,
value.get(value_index).expect("value index out of bounds"),
reg,
None,
)?;
if write_directly_to_rowid_reg {
program.emit_insn(Insn::SoftNull { reg: target_reg });
}
} else {
// Column was not specified - use NULL if it is nullable, otherwise error
// Rowid alias columns can be NULL because we will autogenerate a rowid in that case.
let is_nullable = !mapping.column.primary_key || mapping.column.is_rowid_alias;
if is_nullable {
program.emit_insn(Insn::Null {
dest: target_reg,
dest_end: None,
});
program.mark_last_insn_constant();
} else {
crate::bail_parse_error!("column {} is not nullable", mapping.column.name);
}
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub fn translate_insert(
schema: &Schema,
with: &Option<With>,
or_conflict: &Option<ResolveType>,
on_conflict: &Option<ResolveType>,
tbl_name: &QualifiedName,
_columns: &Option<DistinctNames>,
columns: &Option<DistinctNames>,
body: &InsertBody,
_returning: &Option<Vec<ResultColumn>>,
database_header: Rc<RefCell<DatabaseHeader>>,
connection: Weak<Connection>,
) -> Result<Program> {
assert!(with.is_none());
assert!(or_conflict.is_none());
if with.is_some() {
crate::bail_parse_error!("WITH clause is not supported");
}
if on_conflict.is_some() {
crate::bail_parse_error!("ON CONFLICT clause is not supported");
}
let mut program = ProgramBuilder::new();
let init_label = program.allocate_label();
program.emit_insn_with_label_dependency(
@@ -46,6 +201,10 @@ pub fn translate_insert(
None => crate::bail_corrupt_error!("Parse error: no such table: {}", table_name),
};
let table = Rc::new(Table::BTree(table));
if !table.has_rowid() {
crate::bail_parse_error!("INSERT into WITHOUT ROWID table is not supported");
}
let cursor_id = program.alloc_cursor_id(
Some(table_name.0.clone()),
Some(table.clone().deref().clone()),
@@ -55,18 +214,49 @@ pub fn translate_insert(
Table::Index(index) => index.root_page,
Table::Pseudo(_) => todo!(),
};
let values = match body {
InsertBody::Select(select, None) => match &select.body.select {
sqlite3_parser::ast::OneSelect::Values(values) => values,
_ => todo!(),
},
_ => todo!(),
};
let mut num_cols = table.columns().len();
if table.has_rowid() {
num_cols += 1;
}
// column_registers_start[0] == rowid if has rowid
let column_registers_start = program.alloc_registers(num_cols);
let column_mappings = resolve_columns_for_insert(&table, columns, values)?;
// Check if rowid was provided (through INTEGER PRIMARY KEY as a rowid alias)
let rowid_alias_index = table.columns().iter().position(|c| c.is_rowid_alias);
let has_user_provided_rowid = {
assert!(column_mappings.len() == table.columns().len());
if let Some(index) = rowid_alias_index {
column_mappings[index].value_index.is_some()
} else {
false
}
};
// Coroutine for values
let yield_reg = program.alloc_register();
let jump_on_definition_label = program.allocate_label();
{
// allocate a register for each column in the table. if not provided by user, they will simply be set as null.
// allocate an extra register for rowid regardless of whether user provided a rowid alias column.
let num_cols = table.columns().len();
let rowid_reg = program.alloc_registers(num_cols + 1);
let column_registers_start = rowid_reg + 1;
let rowid_alias_reg = {
if has_user_provided_rowid {
Some(column_registers_start + rowid_alias_index.unwrap())
} else {
None
}
};
let record_register = program.alloc_register();
let halt_label = program.allocate_label();
let mut loop_start_offset = 0;
let inserting_multiple_rows = values.len() > 1;
// Multiple rows - use coroutine for value population
if inserting_multiple_rows {
let yield_reg = program.alloc_register();
let jump_on_definition_label = program.allocate_label();
program.emit_insn_with_label_dependency(
Insn::InitCoroutine {
yield_reg,
@@ -75,134 +265,154 @@ pub fn translate_insert(
},
jump_on_definition_label,
);
match body {
InsertBody::Select(select, None) => match &select.body.select {
sqlite3_parser::ast::OneSelect::Select {
distinctness: _,
columns: _,
from: _,
where_clause: _,
group_by: _,
window_clause: _,
} => todo!(),
sqlite3_parser::ast::OneSelect::Values(values) => {
for value in values {
for (col, expr) in value.iter().enumerate() {
let mut col = col;
if table.has_rowid() {
col += 1;
}
translate_expr(
&mut program,
None,
expr,
column_registers_start + col,
None,
)?;
}
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: 0,
});
}
}
},
InsertBody::DefaultValues => todo!("default values not yet supported"),
_ => todo!(),
for value in values {
populate_column_registers(
&mut program,
value,
&column_mappings,
column_registers_start,
true,
rowid_reg,
)?;
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: 0,
});
}
program.emit_insn(Insn::EndCoroutine { yield_reg });
program.resolve_label(jump_on_definition_label, program.offset());
program.emit_insn(Insn::OpenWriteAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
// Main loop
// FIXME: rollback is not implemented. E.g. if you insert 2 rows and one fails to unique constraint violation,
// the other row will still be inserted.
loop_start_offset = program.offset();
program.emit_insn_with_label_dependency(
Insn::Yield {
yield_reg,
end_offset: halt_label,
},
halt_label,
);
} else {
// Single row - populate registers directly
program.emit_insn(Insn::OpenWriteAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
populate_column_registers(
&mut program,
&values[0],
&column_mappings,
column_registers_start,
false,
rowid_reg,
)?;
}
program.resolve_label(jump_on_definition_label, program.offset());
program.emit_insn(Insn::OpenWriteAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
// Main loop
let record_register = program.alloc_register();
let halt_label = program.allocate_label();
let loop_start_offset = program.offset();
program.emit_insn_with_label_dependency(
Insn::Yield {
yield_reg,
end_offset: halt_label,
},
halt_label,
);
if table.has_rowid() {
let row_id_reg = column_registers_start;
if let Some(rowid_alias_column) = table.get_rowid_alias_column() {
let key_reg = column_registers_start + 1 + rowid_alias_column.0;
// copy key to rowid
// Common record insertion logic for both single and multiple rows
let check_rowid_is_integer_label = rowid_alias_reg.and(Some(program.allocate_label()));
if let Some(reg) = rowid_alias_reg {
// for the row record, the rowid alias column (INTEGER PRIMARY KEY) is always set to NULL
// and its value is copied to the rowid register. in the case where a single row is inserted,
// the value is written directly to the rowid register (see populate_column_registers()).
// again, not sure why this only happens in the single row case, but let's mimic sqlite.
// in the single row case we save a Copy instruction, but in the multiple rows case we do
// it here in the loop.
if inserting_multiple_rows {
program.emit_insn(Insn::Copy {
src_reg: key_reg,
dst_reg: row_id_reg,
amount: 0,
src_reg: reg,
dst_reg: rowid_reg,
amount: 0, // TODO: rename 'amount' to something else; amount==0 means 1
});
program.emit_insn(Insn::SoftNull { reg: key_reg });
// for the row record, the rowid alias column is always set to NULL
program.emit_insn(Insn::SoftNull { reg });
}
let notnull_label = program.allocate_label();
// the user provided rowid value might itself be NULL. If it is, we create a new rowid on the next instruction.
program.emit_insn_with_label_dependency(
Insn::NotNull {
reg: row_id_reg,
target_pc: notnull_label,
reg: rowid_reg,
target_pc: check_rowid_is_integer_label.unwrap(),
},
notnull_label,
check_rowid_is_integer_label.unwrap(),
);
program.emit_insn(Insn::NewRowid {
cursor: cursor_id,
rowid_reg: row_id_reg,
prev_largest_reg: 0,
});
}
program.resolve_label(notnull_label, program.offset());
program.emit_insn(Insn::MustBeInt { reg: row_id_reg });
// Create new rowid if a) not provided by user or b) provided by user but is NULL
program.emit_insn(Insn::NewRowid {
cursor: cursor_id,
rowid_reg: rowid_reg,
prev_largest_reg: 0,
});
if let Some(must_be_int_label) = check_rowid_is_integer_label {
program.resolve_label(must_be_int_label, program.offset());
// If the user provided a rowid, it must be an integer.
program.emit_insn(Insn::MustBeInt { reg: rowid_reg });
}
// Check uniqueness constraint for rowid if it was provided by user.
// When the DB allocates it there are no need for separate uniqueness checks.
if has_user_provided_rowid {
let make_record_label = program.allocate_label();
program.emit_insn_with_label_dependency(
Insn::NotExists {
cursor: cursor_id,
rowid_reg: row_id_reg,
rowid_reg: rowid_reg,
target_pc: make_record_label,
},
make_record_label,
);
// TODO: rollback
let rowid_column_name = if let Some(index) = rowid_alias_index {
table.column_index_to_name(index).unwrap()
} else {
"rowid"
};
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: format!(
"{}.{}",
table.get_name(),
table.column_index_to_name(0).unwrap()
),
description: format!("{}.{}", table.get_name(), rowid_column_name),
});
program.resolve_label(make_record_label, program.offset());
program.emit_insn(Insn::MakeRecord {
start_reg: column_registers_start + 1,
count: num_cols - 1,
dest_reg: record_register,
});
program.emit_insn(Insn::InsertAsync {
cursor: cursor_id,
key_reg: column_registers_start,
record_reg: record_register,
flag: 0,
});
program.emit_insn(Insn::InsertAwait { cursor_id });
}
program.emit_insn(Insn::Goto {
target_pc: loop_start_offset,
// Create and insert the record
program.emit_insn(Insn::MakeRecord {
start_reg: column_registers_start,
count: num_cols,
dest_reg: record_register,
});
program.emit_insn(Insn::InsertAsync {
cursor: cursor_id,
key_reg: rowid_reg,
record_reg: record_register,
flag: 0,
});
program.emit_insn(Insn::InsertAwait { cursor_id });
if inserting_multiple_rows {
// For multiple rows, loop back
program.emit_insn(Insn::Goto {
target_pc: loop_start_offset,
});
}
program.resolve_label(halt_label, program.offset());
program.emit_insn(Insn::Halt {
err_code: 0,
description: String::new(),
});
program.resolve_label(init_label, program.offset());
program.emit_insn(Insn::Transaction { write: true });
program.emit_constant_insns();

View File

@@ -106,7 +106,7 @@ impl Interactions {
.iter_mut()
.find(|t| t.name == insert.table)
.unwrap();
table.rows.push(insert.values.clone());
table.rows.extend(insert.values.clone());
}
Query::Delete(_) => todo!(),
Query::Select(_) => {}
@@ -320,7 +320,7 @@ fn property_insert_select<R: rand::Rng>(rng: &mut R, env: &SimulatorEnv) -> Inte
// Insert the row
let insert_query = Interaction::Query(Query::Insert(Insert {
table: table.name.clone(),
values: row.clone(),
values: vec![row.clone()],
}));
// Select the row

View File

@@ -37,10 +37,15 @@ impl ArbitraryFrom<Vec<&Table>> for Select {
impl ArbitraryFrom<Table> for Insert {
fn arbitrary_from<R: Rng>(rng: &mut R, table: &Table) -> Self {
let values = table
.columns
.iter()
.map(|c| Value::arbitrary_from(rng, &c.column_type))
let num_rows = rng.gen_range(1..10);
let values: Vec<Vec<Value>> = (0..num_rows)
.map(|_| {
table
.columns
.iter()
.map(|c| Value::arbitrary_from(rng, &c.column_type))
.collect()
})
.collect();
Insert {
table: table.name.clone(),

View File

@@ -75,7 +75,7 @@ pub(crate) struct Select {
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct Insert {
pub(crate) table: String,
pub(crate) values: Vec<Value>,
pub(crate) values: Vec<Vec<Value>>,
}
#[derive(Clone, Debug, PartialEq)]
@@ -104,14 +104,21 @@ impl Display for Query {
predicate: guard,
}) => write!(f, "SELECT * FROM {} WHERE {}", table, guard),
Query::Insert(Insert { table, values }) => {
write!(f, "INSERT INTO {} VALUES (", table)?;
for (i, v) in values.iter().enumerate() {
write!(f, "INSERT INTO {} VALUES ", table)?;
for (i, row) in values.iter().enumerate() {
if i != 0 {
write!(f, ", ")?;
}
write!(f, "{}", v)?;
write!(f, "(")?;
for (j, value) in row.iter().enumerate() {
if j != 0 {
write!(f, ", ")?;
}
write!(f, "{}", value)?;
}
write!(f, ")")?;
}
write!(f, ")")
Ok(())
}
Query::Delete(Delete {
table,