From ae6f60b60382a38ebf3a8f4f0a904e162cbcf304 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 29 Aug 2025 17:38:16 -0400 Subject: [PATCH] initial pass at upsert, integrate upsert into insert translation --- core/translate/insert.rs | 204 +++++++++++++++++++++++++++++---------- 1 file changed, 153 insertions(+), 51 deletions(-) diff --git a/core/translate/insert.rs b/core/translate/insert.rs index a4b9e7bf0..b1338f6dc 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -1,7 +1,7 @@ use std::sync::Arc; - use turso_parser::ast::{ - self, Expr, InsertBody, OneSelect, QualifiedName, ResolveType, ResultColumn, With, + self, Expr, InsertBody, OneSelect, QualifiedName, ResolveType, ResultColumn, Upsert, UpsertDo, + With, }; use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; @@ -13,6 +13,10 @@ use crate::translate::expr::{ emit_returning_results, process_returning_clause, ReturningValueRegisters, }; use crate::translate::planner::ROWID; +use crate::translate::upsert::{ + collect_set_clauses_for_upsert, emit_upsert, rewrite_excluded_in_expr, upsert_matches_index, + upsert_matches_pk, +}; use crate::util::normalize_ident; use crate::vdbe::builder::ProgramBuilderOpts; use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral}; @@ -101,48 +105,56 @@ pub fn translate_insert( let root_page = btree_table.root_page; let mut values: Option>> = None; + let mut upsert_opt: Option = None; + let inserting_multiple_rows = match &mut body { - InsertBody::Select(select, _) => match &mut select.body.select { - // TODO see how to avoid clone - OneSelect::Values(values_expr) if values_expr.len() <= 1 => { - if values_expr.is_empty() { - crate::bail_parse_error!("no values to insert"); - } - let mut param_idx = 1; - for expr in values_expr.iter_mut().flat_map(|v| v.iter_mut()) { - match expr.as_mut() { - Expr::Id(name) => { - if name.is_double_quoted() { - *expr = - Expr::Literal(ast::Literal::String(name.to_string())).into(); - } else { - // an INSERT INTO ... VALUES (...) cannot reference columns - crate::bail_parse_error!("no such column: {name}"); - } - } - Expr::Qualified(first_name, second_name) => { - // an INSERT INTO ... VALUES (...) cannot reference columns - crate::bail_parse_error!("no such column: {first_name}.{second_name}"); - } - _ => {} + InsertBody::Select(select, upsert) => { + upsert_opt = upsert.as_deref().cloned(); + match &mut select.body.select { + // TODO see how to avoid clone + OneSelect::Values(values_expr) if values_expr.len() <= 1 => { + if values_expr.is_empty() { + crate::bail_parse_error!("no values to insert"); } - rewrite_expr(expr, &mut param_idx)?; + let mut param_idx = 1; + for expr in values_expr.iter_mut().flat_map(|v| v.iter_mut()) { + match expr.as_mut() { + Expr::Id(name) => { + if name.is_double_quoted() { + *expr = Expr::Literal(ast::Literal::String(name.to_string())) + .into(); + } else { + // an INSERT INTO ... VALUES (...) cannot reference columns + crate::bail_parse_error!("no such column: {name}"); + } + } + Expr::Qualified(first_name, second_name) => { + // an INSERT INTO ... VALUES (...) cannot reference columns + crate::bail_parse_error!( + "no such column: {first_name}.{second_name}" + ); + } + _ => {} + } + rewrite_expr(expr, &mut param_idx)?; + } + values = values_expr.pop(); + false } - values = values_expr.pop(); - false + _ => true, } - _ => true, - }, + } InsertBody::DefaultValues => false, }; let halt_label = program.allocate_label(); let loop_start_label = program.allocate_label(); + let row_done_label = program.allocate_label(); let cdc_table = prepare_cdc_if_necessary(&mut program, schema, table.get_name())?; // Process RETURNING clause using shared module - let (result_columns, _) = process_returning_clause( + let (mut result_columns, _) = process_returning_clause( &mut returning, &table, table_name.as_str(), @@ -158,7 +170,6 @@ pub fn translate_insert( let mut yield_reg_opt = None; let mut temp_table_ctx = None; let (num_values, cursor_id) = match body { - // TODO: upsert InsertBody::Select(select, _) => { // Simple Common case of INSERT INTO VALUES (...) if matches!(&select.body.select, OneSelect::Values(values) if values.len() <= 1) { @@ -336,6 +347,7 @@ pub fn translate_insert( db: 0, }); } + // Common record insertion logic for both single and multiple rows let has_user_provided_rowid = insertion.key.is_provided_by_user(); let check_rowid_is_integer_label = if has_user_provided_rowid { @@ -365,6 +377,17 @@ pub fn translate_insert( }); } + let emit_halt_with_constraint = |program: &mut ProgramBuilder, col_name: &str| { + let mut description = String::with_capacity(table_name.as_str().len() + col_name.len() + 2); + description.push_str(table_name.as_str()); + description.push('.'); + description.push_str(col_name); + program.emit_insn(Insn::Halt { + err_code: SQLITE_CONSTRAINT_PRIMARYKEY, + description, + }); + }; + // Check uniqueness constraint for rowid if it was provided by user. // When the DB allocates it there are no need for separate uniqueness checks. if has_user_provided_rowid { @@ -375,15 +398,47 @@ pub fn translate_insert( target_pc: make_record_label, }); let rowid_column_name = insertion.key.column_name(); - let mut description = - String::with_capacity(table_name.as_str().len() + rowid_column_name.len() + 2); - description.push_str(table_name.as_str()); - description.push('.'); - description.push_str(rowid_column_name); - program.emit_insn(Insn::Halt { - err_code: SQLITE_CONSTRAINT_PRIMARYKEY, - description, - }); + + // emit halt for every case *except* when upsert handles the conflict + 'emit_halt: { + if let Some(ref mut upsert) = upsert_opt.as_mut() { + if upsert_matches_pk(upsert, &table) { + match upsert.do_clause { + UpsertDo::Nothing => { + program.emit_insn(Insn::Goto { + target_pc: row_done_label, + }); + } + UpsertDo::Set { + ref mut sets, + ref mut where_clause, + } => { + let mut rewritten_sets = + collect_set_clauses_for_upsert(&table, sets, &insertion)?; + if let Some(expr) = where_clause.as_mut() { + rewrite_excluded_in_expr(expr, &insertion); + } + emit_upsert( + &mut program, + schema, + &table, + cursor_id, + insertion.key_register(), + &mut rewritten_sets, + where_clause, + &resolver, + &idx_cursors, + &mut result_columns, + cdc_table.as_ref().map(|c| c.0), + row_done_label, + )?; + } + } + break 'emit_halt; + } + } + emit_halt_with_constraint(&mut program, rowid_column_name); + } program.preassign_label_to_next_insn(make_record_label); } @@ -460,14 +515,56 @@ pub fn translate_insert( }, ); - program.emit_insn(Insn::Halt { - err_code: SQLITE_CONSTRAINT_PRIMARYKEY, - description: column_names, - }); + 'emit_halt: { + if let Some(ref mut upsert) = upsert_opt.as_mut() { + if upsert_matches_index(upsert, index, &table) { + match upsert.do_clause { + UpsertDo::Nothing => { + program.emit_insn(Insn::Goto { + target_pc: row_done_label, + }); + } + UpsertDo::Set { + ref mut sets, + ref mut where_clause, + } => { + let mut rewritten_sets = + collect_set_clauses_for_upsert(&table, sets, &insertion)?; + if let Some(expr) = where_clause.as_mut() { + rewrite_excluded_in_expr(expr, &insertion); + } + let conflict_rowid_reg = program.alloc_register(); + program.emit_insn(Insn::IdxRowId { + cursor_id: idx_cursor_id, + dest: conflict_rowid_reg, + }); + emit_upsert( + &mut program, + schema, + &table, + cursor_id, + conflict_rowid_reg, + &mut rewritten_sets, + where_clause, + &resolver, + &idx_cursors, + &mut result_columns, + cdc_table.as_ref().map(|c| c.0), + row_done_label, + )?; + } + } + break 'emit_halt; + } + } + program.emit_insn(Insn::Halt { + err_code: SQLITE_CONSTRAINT_PRIMARYKEY, + description: column_names, + }); + } program.resolve_label(label_idx_insert, program.offset()); } - // now do the actual index insertion using the unpacked registers program.emit_insn(Insn::IdxInsert { cursor_id: idx_cursor_id, @@ -570,6 +667,8 @@ pub fn translate_insert( if inserting_multiple_rows { if let Some(temp_table_ctx) = temp_table_ctx { + program.resolve_label(row_done_label, program.offset()); + program.emit_insn(Insn::Next { cursor_id: temp_table_ctx.cursor_id, pc_if_next: temp_table_ctx.loop_start_label, @@ -581,10 +680,13 @@ pub fn translate_insert( }); } else { // For multiple rows which not require a temp table, loop back + program.resolve_label(row_done_label, program.offset()); program.emit_insn(Insn::Goto { target_pc: loop_start_label, }); } + } else { + program.resolve_label(row_done_label, program.offset()); } program.resolve_label(halt_label, program.offset()); @@ -607,7 +709,7 @@ pub const ROWID_COLUMN: &Column = &Column { /// Represents how a table should be populated during an INSERT. #[derive(Debug)] -struct Insertion<'a> { +pub struct Insertion<'a> { /// The integer key ("rowid") provided to the VDBE. key: InsertionKey<'a>, /// The column values that will be fed to the MakeRecord instruction to insert the row. @@ -708,18 +810,18 @@ impl InsertionKey<'_> { /// In a vector of [ColMapping], the index of a given [ColMapping] is /// the position of the column in the table. #[derive(Debug)] -struct ColMapping<'a> { +pub struct ColMapping<'a> { /// Column definition - column: &'a Column, + pub column: &'a Column, /// Index of the value to use from a tuple in the insert statement. /// This is needed because the values in the insert statement are not necessarily /// in the same order as the columns in the table, nor do they necessarily contain /// all of the columns in the table. /// If None, a NULL will be emitted for the column, unless it has a default value. /// A NULL rowid alias column's value will be autogenerated. - value_index: Option, + pub value_index: Option, /// Register where the value will be stored for insertion into the table. - register: usize, + pub register: usize, } /// Resolves how each column in a table should be populated during an INSERT.