initial pass at upsert, integrate upsert into insert translation

This commit is contained in:
PThorpe92
2025-08-29 17:38:16 -04:00
parent efd15721b1
commit ae6f60b603

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use turso_parser::ast::{
self, Expr, InsertBody, OneSelect, QualifiedName, ResolveType, ResultColumn, With,
self, Expr, InsertBody, OneSelect, QualifiedName, ResolveType, ResultColumn, Upsert, UpsertDo,
With,
};
use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY};
@@ -13,6 +13,10 @@ use crate::translate::expr::{
emit_returning_results, process_returning_clause, ReturningValueRegisters,
};
use crate::translate::planner::ROWID;
use crate::translate::upsert::{
collect_set_clauses_for_upsert, emit_upsert, rewrite_excluded_in_expr, upsert_matches_index,
upsert_matches_pk,
};
use crate::util::normalize_ident;
use crate::vdbe::builder::ProgramBuilderOpts;
use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral};
@@ -101,48 +105,56 @@ pub fn translate_insert(
let root_page = btree_table.root_page;
let mut values: Option<Vec<Box<Expr>>> = None;
let mut upsert_opt: Option<Upsert> = None;
let inserting_multiple_rows = match &mut body {
InsertBody::Select(select, _) => match &mut select.body.select {
// TODO see how to avoid clone
OneSelect::Values(values_expr) if values_expr.len() <= 1 => {
if values_expr.is_empty() {
crate::bail_parse_error!("no values to insert");
}
let mut param_idx = 1;
for expr in values_expr.iter_mut().flat_map(|v| v.iter_mut()) {
match expr.as_mut() {
Expr::Id(name) => {
if name.is_double_quoted() {
*expr =
Expr::Literal(ast::Literal::String(name.to_string())).into();
} else {
// an INSERT INTO ... VALUES (...) cannot reference columns
crate::bail_parse_error!("no such column: {name}");
}
}
Expr::Qualified(first_name, second_name) => {
// an INSERT INTO ... VALUES (...) cannot reference columns
crate::bail_parse_error!("no such column: {first_name}.{second_name}");
}
_ => {}
InsertBody::Select(select, upsert) => {
upsert_opt = upsert.as_deref().cloned();
match &mut select.body.select {
// TODO see how to avoid clone
OneSelect::Values(values_expr) if values_expr.len() <= 1 => {
if values_expr.is_empty() {
crate::bail_parse_error!("no values to insert");
}
rewrite_expr(expr, &mut param_idx)?;
let mut param_idx = 1;
for expr in values_expr.iter_mut().flat_map(|v| v.iter_mut()) {
match expr.as_mut() {
Expr::Id(name) => {
if name.is_double_quoted() {
*expr = Expr::Literal(ast::Literal::String(name.to_string()))
.into();
} else {
// an INSERT INTO ... VALUES (...) cannot reference columns
crate::bail_parse_error!("no such column: {name}");
}
}
Expr::Qualified(first_name, second_name) => {
// an INSERT INTO ... VALUES (...) cannot reference columns
crate::bail_parse_error!(
"no such column: {first_name}.{second_name}"
);
}
_ => {}
}
rewrite_expr(expr, &mut param_idx)?;
}
values = values_expr.pop();
false
}
values = values_expr.pop();
false
_ => true,
}
_ => true,
},
}
InsertBody::DefaultValues => false,
};
let halt_label = program.allocate_label();
let loop_start_label = program.allocate_label();
let row_done_label = program.allocate_label();
let cdc_table = prepare_cdc_if_necessary(&mut program, schema, table.get_name())?;
// Process RETURNING clause using shared module
let (result_columns, _) = process_returning_clause(
let (mut result_columns, _) = process_returning_clause(
&mut returning,
&table,
table_name.as_str(),
@@ -158,7 +170,6 @@ pub fn translate_insert(
let mut yield_reg_opt = None;
let mut temp_table_ctx = None;
let (num_values, cursor_id) = match body {
// TODO: upsert
InsertBody::Select(select, _) => {
// Simple Common case of INSERT INTO <table> VALUES (...)
if matches!(&select.body.select, OneSelect::Values(values) if values.len() <= 1) {
@@ -336,6 +347,7 @@ pub fn translate_insert(
db: 0,
});
}
// Common record insertion logic for both single and multiple rows
let has_user_provided_rowid = insertion.key.is_provided_by_user();
let check_rowid_is_integer_label = if has_user_provided_rowid {
@@ -365,6 +377,17 @@ pub fn translate_insert(
});
}
let emit_halt_with_constraint = |program: &mut ProgramBuilder, col_name: &str| {
let mut description = String::with_capacity(table_name.as_str().len() + col_name.len() + 2);
description.push_str(table_name.as_str());
description.push('.');
description.push_str(col_name);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
});
};
// Check uniqueness constraint for rowid if it was provided by user.
// When the DB allocates it there are no need for separate uniqueness checks.
if has_user_provided_rowid {
@@ -375,15 +398,47 @@ pub fn translate_insert(
target_pc: make_record_label,
});
let rowid_column_name = insertion.key.column_name();
let mut description =
String::with_capacity(table_name.as_str().len() + rowid_column_name.len() + 2);
description.push_str(table_name.as_str());
description.push('.');
description.push_str(rowid_column_name);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
});
// emit halt for every case *except* when upsert handles the conflict
'emit_halt: {
if let Some(ref mut upsert) = upsert_opt.as_mut() {
if upsert_matches_pk(upsert, &table) {
match upsert.do_clause {
UpsertDo::Nothing => {
program.emit_insn(Insn::Goto {
target_pc: row_done_label,
});
}
UpsertDo::Set {
ref mut sets,
ref mut where_clause,
} => {
let mut rewritten_sets =
collect_set_clauses_for_upsert(&table, sets, &insertion)?;
if let Some(expr) = where_clause.as_mut() {
rewrite_excluded_in_expr(expr, &insertion);
}
emit_upsert(
&mut program,
schema,
&table,
cursor_id,
insertion.key_register(),
&mut rewritten_sets,
where_clause,
&resolver,
&idx_cursors,
&mut result_columns,
cdc_table.as_ref().map(|c| c.0),
row_done_label,
)?;
}
}
break 'emit_halt;
}
}
emit_halt_with_constraint(&mut program, rowid_column_name);
}
program.preassign_label_to_next_insn(make_record_label);
}
@@ -460,14 +515,56 @@ pub fn translate_insert(
},
);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: column_names,
});
'emit_halt: {
if let Some(ref mut upsert) = upsert_opt.as_mut() {
if upsert_matches_index(upsert, index, &table) {
match upsert.do_clause {
UpsertDo::Nothing => {
program.emit_insn(Insn::Goto {
target_pc: row_done_label,
});
}
UpsertDo::Set {
ref mut sets,
ref mut where_clause,
} => {
let mut rewritten_sets =
collect_set_clauses_for_upsert(&table, sets, &insertion)?;
if let Some(expr) = where_clause.as_mut() {
rewrite_excluded_in_expr(expr, &insertion);
}
let conflict_rowid_reg = program.alloc_register();
program.emit_insn(Insn::IdxRowId {
cursor_id: idx_cursor_id,
dest: conflict_rowid_reg,
});
emit_upsert(
&mut program,
schema,
&table,
cursor_id,
conflict_rowid_reg,
&mut rewritten_sets,
where_clause,
&resolver,
&idx_cursors,
&mut result_columns,
cdc_table.as_ref().map(|c| c.0),
row_done_label,
)?;
}
}
break 'emit_halt;
}
}
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: column_names,
});
}
program.resolve_label(label_idx_insert, program.offset());
}
// now do the actual index insertion using the unpacked registers
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
@@ -570,6 +667,8 @@ pub fn translate_insert(
if inserting_multiple_rows {
if let Some(temp_table_ctx) = temp_table_ctx {
program.resolve_label(row_done_label, program.offset());
program.emit_insn(Insn::Next {
cursor_id: temp_table_ctx.cursor_id,
pc_if_next: temp_table_ctx.loop_start_label,
@@ -581,10 +680,13 @@ pub fn translate_insert(
});
} else {
// For multiple rows which not require a temp table, loop back
program.resolve_label(row_done_label, program.offset());
program.emit_insn(Insn::Goto {
target_pc: loop_start_label,
});
}
} else {
program.resolve_label(row_done_label, program.offset());
}
program.resolve_label(halt_label, program.offset());
@@ -607,7 +709,7 @@ pub const ROWID_COLUMN: &Column = &Column {
/// Represents how a table should be populated during an INSERT.
#[derive(Debug)]
struct Insertion<'a> {
pub struct Insertion<'a> {
/// The integer key ("rowid") provided to the VDBE.
key: InsertionKey<'a>,
/// The column values that will be fed to the MakeRecord instruction to insert the row.
@@ -708,18 +810,18 @@ impl InsertionKey<'_> {
/// In a vector of [ColMapping], the index of a given [ColMapping] is
/// the position of the column in the table.
#[derive(Debug)]
struct ColMapping<'a> {
pub struct ColMapping<'a> {
/// Column definition
column: &'a Column,
pub column: &'a Column,
/// Index of the value to use from a tuple in the insert statement.
/// This is needed because the values in the insert statement are not necessarily
/// in the same order as the columns in the table, nor do they necessarily contain
/// all of the columns in the table.
/// If None, a NULL will be emitted for the column, unless it has a default value.
/// A NULL rowid alias column's value will be autogenerated.
value_index: Option<usize>,
pub value_index: Option<usize>,
/// Register where the value will be stored for insertion into the table.
register: usize,
pub register: usize,
}
/// Resolves how each column in a table should be populated during an INSERT.