From 22e98964cc07f85feae166184acd0081c68d6483 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 14 Oct 2025 12:48:34 -0400 Subject: [PATCH 1/3] Refactor INSERT translation to a modular setup with emitter context --- core/translate/insert.rs | 1595 +++++++++++++++++++++----------------- core/translate/mod.rs | 26 +- core/translate/upsert.rs | 85 +- 3 files changed, 956 insertions(+), 750 deletions(-) diff --git a/core/translate/insert.rs b/core/translate/insert.rs index a8339c6e3..c1fab97f1 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -2,7 +2,6 @@ use std::num::NonZeroUsize; use std::sync::Arc; use turso_parser::ast::{ self, Expr, InsertBody, OneSelect, QualifiedName, ResolveType, ResultColumn, Upsert, UpsertDo, - With, }; use crate::error::{ @@ -20,7 +19,7 @@ use crate::translate::fkeys::{ build_index_affinity_string, emit_fk_violation, emit_guarded_fk_decrement, index_probe, open_read_index, open_read_table, }; -use crate::translate::plan::TableReferences; +use crate::translate::plan::{ResultSetColumn, TableReferences}; use crate::translate::planner::ROWID_STRS; use crate::translate::upsert::{ collect_set_clauses_for_upsert, emit_upsert, resolve_upsert_target, ResolvedUpsertTarget, @@ -36,22 +35,136 @@ use crate::{ insn::Insn, }, }; -use crate::{Result, VirtualTable}; +use crate::{Connection, Result, VirtualTable}; use super::emitter::Resolver; use super::expr::{translate_expr, translate_expr_no_constant_opt, NoConstantOptReason}; use super::plan::QueryDestination; use super::select::translate_select; -struct TempTableCtx { +pub struct TempTableCtx { cursor_id: usize, loop_start_label: BranchOffset, loop_end_label: BranchOffset, } +/// Validate anything with this insert statement that should throw an early parse error +fn validate(table_name: &str, resolver: &Resolver, table: &Table) -> Result<()> { + // Check if this is a system table that should be protected from direct writes + if crate::schema::is_system_table(table_name) { + crate::bail_parse_error!("table {} may not be modified", table_name); + } + // Check if this table has any incompatible dependent views + let incompatible_views = resolver.schema.has_incompatible_dependent_views(table_name); + if !incompatible_views.is_empty() { + use crate::incremental::compiler::DBSP_CIRCUIT_VERSION; + crate::bail_parse_error!( + "Cannot INSERT into table '{}' because it has incompatible dependent materialized view(s): {}. \n\ + These views were created with a different DBSP version than the current version ({}). \n\ + Please DROP and recreate the view(s) before modifying this table.", + table_name, + incompatible_views.join(", "), + DBSP_CIRCUIT_VERSION + ); + } + + // Check if this is a materialized view + if resolver.schema.is_materialized_view(table_name) { + crate::bail_parse_error!("cannot modify materialized view {}", table_name); + } + if resolver.schema.table_has_indexes(table_name) && !resolver.schema.indexes_enabled() { + // Let's disable altering a table with indices altogether instead of checking column by + // column to be extra safe. + crate::bail_parse_error!( + "INSERT to table with indexes is disabled. Omit the `--experimental-indexes=false` flag to enable this feature." 
+ ); + } + if table.btree().is_some_and(|t| !t.has_rowid) { + crate::bail_parse_error!("INSERT into WITHOUT ROWID table is not supported"); + } + + Ok(()) +} + +#[allow(dead_code)] +pub struct InsertEmitCtx<'a> { + pub table: &'a Arc, + pub idx_cursors: Vec<(&'a String, i64, usize)>, + pub temp_table_ctx: Option, + pub on_conflict: ResolveType, + pub num_values: usize, + pub yield_reg_opt: Option, + pub conflict_rowid_reg: usize, + pub cursor_id: usize, + + /// Labels + pub halt_label: BranchOffset, + pub row_done_label: BranchOffset, + pub stmt_epilogue: BranchOffset, + pub loop_start_label: BranchOffset, + pub key_ready_for_uniqueness_check_label: BranchOffset, + pub key_generation_label: BranchOffset, + pub select_exhausted_label: Option, + + /// CDC table info + pub cdc_table: Option<(usize, Arc)>, + /// Autoincrement sequence table info + pub autoincrement_meta: Option, +} + +impl<'a> InsertEmitCtx<'a> { + fn new( + program: &mut ProgramBuilder, + resolver: &'a Resolver, + table: &'a Arc, + on_conflict: Option, + cdc_table: Option<(usize, Arc)>, + num_values: usize, + temp_table_ctx: Option, + ) -> Self { + // allocate cursor id's for each btree index cursor we'll need to populate the indexes + // (idx name, root_page, idx cursor id) + let idx_cursors = resolver + .schema + .get_indices(table.name.as_str()) + .map(|idx| { + ( + &idx.name, + idx.root_page, + program.alloc_cursor_id(CursorType::BTreeIndex(idx.clone())), + ) + }) + .collect::>(); + let halt_label = program.allocate_label(); + let loop_start_label = program.allocate_label(); + let row_done_label = program.allocate_label(); + let stmt_epilogue = program.allocate_label(); + let key_ready_for_uniqueness_check_label = program.allocate_label(); + let key_generation_label = program.allocate_label(); + Self { + table, + idx_cursors, + temp_table_ctx, + on_conflict: on_conflict.unwrap_or(ResolveType::Abort), + yield_reg_opt: None, + conflict_rowid_reg: program.alloc_register(), + select_exhausted_label: None, + cursor_id: 0, // set later in emit_source_emission + halt_label, + row_done_label, + stmt_epilogue, + loop_start_label, + cdc_table, + num_values, + key_ready_for_uniqueness_check_label, + key_generation_label, + autoincrement_meta: None, + } + } +} + #[allow(clippy::too_many_arguments)] pub fn translate_insert( - with: Option, resolver: &Resolver, on_conflict: Option, tbl_name: QualifiedName, @@ -67,59 +180,15 @@ pub fn translate_insert( approx_num_labels: 5, }; program.extend(&opts); - if with.is_some() { - crate::bail_parse_error!("WITH clause is not supported"); - } - if on_conflict.is_some() { - crate::bail_parse_error!("ON CONFLICT clause is not supported"); - } - - if resolver - .schema - .table_has_indexes(&tbl_name.name.to_string()) - && !resolver.schema.indexes_enabled() - { - // Let's disable altering a table with indices altogether instead of checking column by - // column to be extra safe. - crate::bail_parse_error!( - "INSERT to table with indexes is disabled. Omit the `--experimental-indexes=false` flag to enable this feature." 
- ); - } let table_name = &tbl_name.name; - - // Check if this is a system table that should be protected from direct writes - if crate::schema::is_system_table(table_name.as_str()) { - crate::bail_parse_error!("table {} may not be modified", table_name); - } - let table = match resolver.schema.get_table(table_name.as_str()) { Some(table) => table, None => crate::bail_parse_error!("no such table: {}", table_name), }; + validate(table_name.as_str(), resolver, &table)?; + let fk_enabled = connection.foreign_keys_enabled(); - - // Check if this is a materialized view - if resolver.schema.is_materialized_view(table_name.as_str()) { - crate::bail_parse_error!("cannot modify materialized view {}", table_name); - } - - // Check if this table has any incompatible dependent views - let incompatible_views = resolver - .schema - .has_incompatible_dependent_views(table_name.as_str()); - if !incompatible_views.is_empty() { - use crate::incremental::compiler::DBSP_CIRCUIT_VERSION; - crate::bail_parse_error!( - "Cannot INSERT into table '{}' because it has incompatible dependent materialized view(s): {}. \n\ - These views were created with a different DBSP version than the current version ({}). \n\ - Please DROP and recreate the view(s) before modifying this table.", - table_name, - incompatible_views.join(", "), - DBSP_CIRCUIT_VERSION - ); - } - if let Some(virtual_table) = &table.virtual_table() { program = translate_virtual_table_insert( program, @@ -135,101 +204,19 @@ pub fn translate_insert( let Some(btree_table) = table.btree() else { crate::bail_parse_error!("no such table: {}", table_name); }; - if !btree_table.has_rowid { - crate::bail_parse_error!("INSERT into WITHOUT ROWID table is not supported"); - } let root_page = btree_table.root_page; - let mut values: Option>> = None; - let mut upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box)> = Vec::new(); - - let mut inserting_multiple_rows = false; - if let InsertBody::Select(select, upsert_opt) = &mut body { - match &mut select.body.select { - // TODO see how to avoid clone - OneSelect::Values(values_expr) if values_expr.len() <= 1 => { - if values_expr.is_empty() { - crate::bail_parse_error!("no values to insert"); - } - for expr in values_expr.iter_mut().flat_map(|v| v.iter_mut()) { - match expr.as_mut() { - Expr::Id(name) => { - if name.quoted_with('"') { - *expr = - Expr::Literal(ast::Literal::String(name.as_literal())).into(); - } else { - // an INSERT INTO ... VALUES (...) cannot reference columns - crate::bail_parse_error!("no such column: {name}"); - } - } - Expr::Qualified(first_name, second_name) => { - // an INSERT INTO ... VALUES (...) 
cannot reference columns - crate::bail_parse_error!("no such column: {first_name}.{second_name}"); - } - _ => {} - } - bind_and_rewrite_expr( - expr, - None, - None, - connection, - &mut program.param_ctx, - BindingBehavior::ResultColumnsNotAllowed, - )?; - } - values = values_expr.pop(); - } - _ => inserting_multiple_rows = true, - } - while let Some(mut upsert) = upsert_opt.take() { - if let UpsertDo::Set { - ref mut sets, - ref mut where_clause, - } = &mut upsert.do_clause - { - for set in sets.iter_mut() { - bind_and_rewrite_expr( - &mut set.expr, - None, - None, - connection, - &mut program.param_ctx, - BindingBehavior::AllowUnboundIdentifiers, - )?; - } - if let Some(ref mut where_expr) = where_clause { - bind_and_rewrite_expr( - where_expr, - None, - None, - connection, - &mut program.param_ctx, - BindingBehavior::AllowUnboundIdentifiers, - )?; - } - } - let next = upsert.next.take(); - upsert_actions.push(( - // resolve the constrained target for UPSERT in the chain - resolve_upsert_target(resolver.schema, &table, &upsert)?, - program.allocate_label(), - upsert, - )); - *upsert_opt = next; - } - } + let BoundInsertResult { + mut values, + mut upsert_actions, + inserting_multiple_rows, + } = bind_insert(&mut program, resolver, &table, &mut body, connection)?; if inserting_multiple_rows && btree_table.has_autoincrement { ensure_sequence_initialized(&mut program, resolver.schema, &btree_table)?; } - let halt_label = program.allocate_label(); - let loop_start_label = program.allocate_label(); - let row_done_label = program.allocate_label(); - let stmt_epilogue = program.allocate_label(); - let mut select_exhausted_label: Option = None; - let cdc_table = prepare_cdc_if_necessary(&mut program, resolver.schema, table.get_name())?; // Process RETURNING clause using shared module @@ -240,234 +227,64 @@ pub fn translate_insert( &mut program, connection, )?; - let has_fks = fk_enabled && (resolver.schema.has_child_fks(table_name.as_str()) || resolver .schema .any_resolved_fks_referencing(table_name.as_str())); - let mut yield_reg_opt = None; - let mut temp_table_ctx = None; - let (num_values, cursor_id) = match body { - InsertBody::Select(select, _) => { - // Simple Common case of INSERT INTO VALUES (...) 
- if matches!(&select.body.select, OneSelect::Values(values) if values.len() <= 1) { - ( - values.as_ref().unwrap().len(), - program.alloc_cursor_id(CursorType::BTreeTable(btree_table.clone())), - ) - } else { - // Multiple rows - use coroutine for value population - let yield_reg = program.alloc_register(); - let jump_on_definition_label = program.allocate_label(); - let start_offset_label = program.allocate_label(); - program.emit_insn(Insn::InitCoroutine { - yield_reg, - jump_on_definition: jump_on_definition_label, - start_offset: start_offset_label, - }); - program.preassign_label_to_next_insn(start_offset_label); - let query_destination = QueryDestination::CoroutineYield { - yield_reg, - coroutine_implementation_start: halt_label, - }; - program.incr_nesting(); - let result = - translate_select(select, resolver, program, query_destination, connection)?; - program = result.program; - program.decr_nesting(); + let mut ctx = InsertEmitCtx::new( + &mut program, + resolver, + &btree_table, + on_conflict, + cdc_table, + values.len(), + None, + ); - program.emit_insn(Insn::EndCoroutine { yield_reg }); - program.preassign_label_to_next_insn(jump_on_definition_label); - - let cursor_id = - program.alloc_cursor_id(CursorType::BTreeTable(btree_table.clone())); - - // From SQLite - /* Set useTempTable to TRUE if the result of the SELECT statement - ** should be written into a temporary table (template 4). Set to - ** FALSE if each output row of the SELECT can be written directly into - ** the destination table (template 3). - ** - ** A temp table must be used if the table being updated is also one - ** of the tables being read by the SELECT statement. Also use a - ** temp table in the case of row triggers. - */ - if program.is_table_open(&table) { - let temp_cursor_id = - program.alloc_cursor_id(CursorType::BTreeTable(btree_table.clone())); - temp_table_ctx = Some(TempTableCtx { - cursor_id: temp_cursor_id, - loop_start_label: program.allocate_label(), - loop_end_label: program.allocate_label(), - }); - - program.emit_insn(Insn::OpenEphemeral { - cursor_id: temp_cursor_id, - is_table: true, - }); - - // Main loop - program.preassign_label_to_next_insn(loop_start_label); - let yield_label = program.allocate_label(); - program.emit_insn(Insn::Yield { - yield_reg, - end_offset: yield_label, // stays local, we’ll route at loop end - }); - - let record_reg = program.alloc_register(); - let affinity_str = if columns.is_empty() { - btree_table - .columns - .iter() - .filter(|col| !col.hidden) - .map(|col| col.affinity().aff_mask()) - .collect::() - } else { - columns - .iter() - .map(|col_name| { - let column_name = normalize_ident(col_name.as_str()); - if ROWID_STRS - .iter() - .any(|s| s.eq_ignore_ascii_case(&column_name)) - { - return Affinity::Integer.aff_mask(); - } - table - .get_column_by_name(&column_name) - .unwrap() - .1 - .affinity() - .aff_mask() - }) - .collect::() - }; - - program.emit_insn(Insn::MakeRecord { - start_reg: program.reg_result_cols_start.unwrap_or(yield_reg + 1), - count: result.num_result_cols, - dest_reg: record_reg, - index_name: None, - affinity_str: Some(affinity_str), - }); - - let rowid_reg = program.alloc_register(); - program.emit_insn(Insn::NewRowid { - cursor: temp_cursor_id, - rowid_reg, - prev_largest_reg: 0, - }); - program.emit_insn(Insn::Insert { - cursor: temp_cursor_id, - key_reg: rowid_reg, - record_reg, - // since we are not doing an Insn::NewRowid or an Insn::NotExists here, we need to seek to ensure the insertion happens in the correct place. 
- flag: InsertFlags::new().require_seek(), - table_name: "".to_string(), - }); - // loop back - program.emit_insn(Insn::Goto { - target_pc: loop_start_label, - }); - program.preassign_label_to_next_insn(yield_label); - - program.emit_insn(Insn::OpenWrite { - cursor_id, - root_page: RegisterOrLiteral::Literal(root_page), - db: 0, - }); - } else { - program.emit_insn(Insn::OpenWrite { - cursor_id, - root_page: RegisterOrLiteral::Literal(root_page), - db: 0, - }); - - program.preassign_label_to_next_insn(loop_start_label); - - // on EOF, jump to select_exhausted to check FK constraints - let select_exhausted = program.allocate_label(); - select_exhausted_label = Some(select_exhausted); - program.emit_insn(Insn::Yield { - yield_reg, - end_offset: select_exhausted, - }); - } - - yield_reg_opt = Some(yield_reg); - (result.num_result_cols, cursor_id) - } - } - InsertBody::DefaultValues => { - let num_values = table.columns().len(); - values = Some( - table - .columns() - .iter() - .map(|c| { - c.default - .clone() - .unwrap_or(Box::new(ast::Expr::Literal(ast::Literal::Null))) - }) - .collect(), - ); - ( - num_values, - program.alloc_cursor_id(CursorType::BTreeTable(btree_table.clone())), - ) - } - }; + program = init_source_emission( + program, + &table, + connection, + &mut ctx, + resolver, + &mut values, + body, + &columns, + )?; let has_upsert = !upsert_actions.is_empty(); // Set up the program to return result columns if RETURNING is specified if !result_columns.is_empty() { program.result_columns = result_columns.clone(); } - - // allocate cursor id's for each btree index cursor we'll need to populate the indexes - // (idx name, root_page, idx cursor id) - let idx_cursors = resolver - .schema - .get_indices(table_name.as_str()) - .map(|idx| { - ( - &idx.name, - idx.root_page, - program.alloc_cursor_id(CursorType::BTreeIndex(idx.clone())), - ) - }) - .collect::>(); - - let insertion = build_insertion(&mut program, &table, &columns, num_values)?; - - let conflict_rowid_reg = program.alloc_register(); + let insertion = build_insertion(&mut program, &table, &columns, ctx.num_values)?; if inserting_multiple_rows { let select_result_start_reg = program .reg_result_cols_start - .unwrap_or(yield_reg_opt.unwrap() + 1); + .unwrap_or(ctx.yield_reg_opt.unwrap() + 1); translate_rows_multiple( &mut program, &insertion, select_result_start_reg, resolver, - &temp_table_ctx, + &ctx.temp_table_ctx, )?; } else { // Single row - populate registers directly program.emit_insn(Insn::OpenWrite { - cursor_id, + cursor_id: ctx.cursor_id, root_page: RegisterOrLiteral::Literal(root_page), db: 0, }); - translate_rows_single(&mut program, &values.unwrap(), &insertion, resolver)?; + translate_rows_single(&mut program, &values, &insertion, resolver)?; } // Open all the index btrees for writing - for idx_cursor in idx_cursors.iter() { + for idx_cursor in ctx.idx_cursors.iter() { program.emit_insn(Insn::OpenWrite { cursor_id: idx_cursor.2, root_page: idx_cursor.1.into(), @@ -476,11 +293,6 @@ pub fn translate_insert( } let has_user_provided_rowid = insertion.key.is_provided_by_user(); - let key_ready_for_uniqueness_check_label = program.allocate_label(); - let key_generation_label = program.allocate_label(); - - let mut autoincrement_meta = None; - if btree_table.has_autoincrement { let seq_table = resolver .schema @@ -500,7 +312,13 @@ pub fn translate_insert( let table_name_reg = program.emit_string8_new_reg(btree_table.name.clone()); let r_seq = program.alloc_register(); let r_seq_rowid = program.alloc_register(); - 
autoincrement_meta = Some((seq_cursor_id, r_seq, r_seq_rowid, table_name_reg)); + + ctx.autoincrement_meta = Some(AutoincMeta { + seq_cursor_id, + r_seq, + r_seq_rowid, + table_name_reg, + }); program.emit_insn(Insn::Integer { dest: r_seq, @@ -557,7 +375,7 @@ pub fn translate_insert( }); program.emit_insn(Insn::Goto { - target_pc: key_generation_label, + target_pc: ctx.key_generation_label, }); program.preassign_label_to_next_insn(must_be_int_label); @@ -566,18 +384,18 @@ pub fn translate_insert( }); program.emit_insn(Insn::Goto { - target_pc: key_ready_for_uniqueness_check_label, + target_pc: ctx.key_ready_for_uniqueness_check_label, }); } - program.preassign_label_to_next_insn(key_generation_label); - if let Some((_, r_seq, _, _)) = autoincrement_meta { + program.preassign_label_to_next_insn(ctx.key_generation_label); + if let Some(AutoincMeta { r_seq, .. }) = ctx.autoincrement_meta { let r_max = program.alloc_register(); let dummy_reg = program.alloc_register(); program.emit_insn(Insn::NewRowid { - cursor: cursor_id, + cursor: ctx.cursor_id, rowid_reg: dummy_reg, prev_largest_reg: r_max, }); @@ -618,7 +436,13 @@ pub fn translate_insert( value: 1, }); - if let Some((seq_cursor_id, _, r_seq_rowid, table_name_reg)) = autoincrement_meta { + if let Some(AutoincMeta { + seq_cursor_id, + r_seq_rowid, + table_name_reg, + .. + }) = ctx.autoincrement_meta + { emit_update_sqlite_sequence( &mut program, resolver.schema, @@ -630,319 +454,45 @@ pub fn translate_insert( } } else { program.emit_insn(Insn::NewRowid { - cursor: cursor_id, + cursor: ctx.cursor_id, rowid_reg: insertion.key_register(), prev_largest_reg: 0, }); } - program.preassign_label_to_next_insn(key_ready_for_uniqueness_check_label); + program.preassign_label_to_next_insn(ctx.key_ready_for_uniqueness_check_label); - match table.btree() { - Some(t) if t.is_strict => { - program.emit_insn(Insn::TypeCheck { - start_reg: insertion.first_col_register(), - count: insertion.col_mappings.len(), - check_generated: true, - table_reference: Arc::clone(&t), - }); - } - _ => (), + if ctx.table.is_strict { + program.emit_insn(Insn::TypeCheck { + start_reg: insertion.first_col_register(), + count: insertion.col_mappings.len(), + check_generated: true, + table_reference: Arc::clone(ctx.table), + }); } - let mut constraints_to_check = Vec::new(); - if has_user_provided_rowid { - // Check uniqueness constraint for rowid if it was provided by user. - // When the DB allocates it there are no need for separate uniqueness checks. 
- let position = upsert_actions - .iter() - .position(|(target, ..)| matches!(target, ResolvedUpsertTarget::PrimaryKey)); - constraints_to_check.push((ResolvedUpsertTarget::PrimaryKey, position)); - } - for index in resolver.schema.get_indices(table_name.as_str()) { - let position = upsert_actions - .iter() - .position(|(target, ..)| matches!(target, ResolvedUpsertTarget::Index(x) if Arc::ptr_eq(x, index))); - constraints_to_check.push((ResolvedUpsertTarget::Index(index.clone()), position)); - } - - constraints_to_check.sort_by(|(_, p1), (_, p2)| match (p1, p2) { - (Some(p1), Some(p2)) => p1.cmp(p2), - (Some(_), None) => std::cmp::Ordering::Less, - (None, Some(_)) => std::cmp::Ordering::Greater, - (None, None) => std::cmp::Ordering::Equal, - }); - - let upsert_catch_all_position = - if let Some((ResolvedUpsertTarget::CatchAll, ..)) = upsert_actions.last() { - Some(upsert_actions.len() - 1) - } else { - None - }; + let (constraints_to_check, upsert_catch_all_position) = build_constraints_to_check( + resolver, + table_name.as_str(), + &upsert_actions, + has_user_provided_rowid, + ); // We need to separate index handling and insertion into a `preflight` and a // `commit` phase, because in UPSERT mode we might need to skip the actual insertion, as we can // have a naked ON CONFLICT DO NOTHING, so if we eagerly insert any indexes, we could insert // invalid index entries before we hit a conflict down the line. - // - // Preflight phase: evaluate each applicable UNIQUE constraint and probe with NoConflict. - // If any probe hits: - // DO NOTHING -> jump to row_done_label. - // - // DO UPDATE (matching target) -> fetch conflicting rowid and jump to `upsert_entry`. - // - // otherwise, raise SQLITE_CONSTRAINT_UNIQUE - for (constraint, position) in constraints_to_check { - match constraint { - ResolvedUpsertTarget::PrimaryKey => { - let make_record_label = program.allocate_label(); - program.emit_insn(Insn::NotExists { - cursor: cursor_id, - rowid_reg: insertion.key_register(), - target_pc: make_record_label, - }); - let rowid_column_name = insertion.key.column_name(); + emit_preflight_constraint_checks( + &mut program, + &ctx, + resolver, + &insertion, + &upsert_actions, + &constraints_to_check, + upsert_catch_all_position, + )?; - // Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint. 
- // emit Halt for every case *except* when upsert handles the conflict - 'emit_halt: { - if let Some(position) = position.or(upsert_catch_all_position) { - // PK conflict: the conflicting rowid is exactly the attempted key - program.emit_insn(Insn::Copy { - src_reg: insertion.key_register(), - dst_reg: conflict_rowid_reg, - extra_amount: 0, - }); - program.emit_insn(Insn::Goto { - target_pc: upsert_actions[position].1, - }); - break 'emit_halt; - } - let mut description = String::with_capacity( - table_name.as_str().len() + rowid_column_name.len() + 2, - ); - description.push_str(table_name.as_str()); - description.push('.'); - description.push_str(rowid_column_name); - program.emit_insn(Insn::Halt { - err_code: SQLITE_CONSTRAINT_PRIMARYKEY, - description, - }); - } - program.preassign_label_to_next_insn(make_record_label); - } - ResolvedUpsertTarget::Index(index) => { - let column_mappings = index - .columns - .iter() - .map(|idx_col| insertion.get_col_mapping_by_name(&idx_col.name)); - // find which cursor we opened earlier for this index - let idx_cursor_id = idx_cursors - .iter() - .find(|(name, _, _)| *name == &index.name) - .map(|(_, _, c_id)| *c_id) - .expect("no cursor found for index"); - - let maybe_skip_probe_label = if let Some(where_clause) = &index.where_clause { - let mut where_for_eval = where_clause.as_ref().clone(); - rewrite_partial_index_where(&mut where_for_eval, &insertion)?; - let reg = program.alloc_register(); - translate_expr_no_constant_opt( - &mut program, - Some(&TableReferences::new_empty()), - &where_for_eval, - reg, - resolver, - NoConstantOptReason::RegisterReuse, - )?; - let lbl = program.allocate_label(); - program.emit_insn(Insn::IfNot { - reg, - target_pc: lbl, - jump_if_null: true, - }); - Some(lbl) - } else { - None - }; - - let num_cols = index.columns.len(); - // allocate scratch registers for the index columns plus rowid - let idx_start_reg = program.alloc_registers(num_cols + 1); - - // build unpacked key [idx_start_reg .. 
idx_start_reg+num_cols-1], and rowid in last reg, - // copy each index column from the table's column registers into these scratch regs - for (i, column_mapping) in column_mappings.clone().enumerate() { - // copy from the table's column register over to the index's scratch register - let Some(col_mapping) = column_mapping else { - return Err(crate::LimboError::PlanningError( - "Column not found in INSERT".to_string(), - )); - }; - program.emit_insn(Insn::Copy { - src_reg: col_mapping.register, - dst_reg: idx_start_reg + i, - extra_amount: 0, - }); - } - // last register is the rowid - program.emit_insn(Insn::Copy { - src_reg: insertion.key_register(), - dst_reg: idx_start_reg + num_cols, - extra_amount: 0, - }); - - if index.unique { - let aff = index - .columns - .iter() - .map(|ic| table.columns()[ic.pos_in_table].affinity().aff_mask()) - .collect::(); - program.emit_insn(Insn::Affinity { - start_reg: idx_start_reg, - count: NonZeroUsize::new(num_cols).expect("nonzero col count"), - affinities: aff, - }); - - if has_upsert { - let next_check = program.allocate_label(); - program.emit_insn(Insn::NoConflict { - cursor_id: idx_cursor_id, - target_pc: next_check, - record_reg: idx_start_reg, - num_regs: num_cols, - }); - - // Conflict detected, figure out if this UPSERT handles the conflict - if let Some(position) = position.or(upsert_catch_all_position) { - match &upsert_actions[position].2.do_clause { - UpsertDo::Nothing => { - // Bail out without writing anything - program.emit_insn(Insn::Goto { - target_pc: row_done_label, - }); - } - UpsertDo::Set { .. } => { - // Route to DO UPDATE: capture conflicting rowid then jump - program.emit_insn(Insn::IdxRowId { - cursor_id: idx_cursor_id, - dest: conflict_rowid_reg, - }); - program.emit_insn(Insn::Goto { - target_pc: upsert_actions[position].1, - }); - } - } - } - // No matching UPSERT handler so we emit constraint error - // (if conflict clause matched - VM will jump to later instructions and skip halt) - program.emit_insn(Insn::Halt { - err_code: SQLITE_CONSTRAINT_UNIQUE, - description: format_unique_violation_desc(table_name.as_str(), &index), - }); - - // continue preflight with next constraint - program.preassign_label_to_next_insn(next_check); - } else { - // No UPSERT fast-path: probe and immediately insert - let ok = program.allocate_label(); - program.emit_insn(Insn::NoConflict { - cursor_id: idx_cursor_id, - target_pc: ok, - record_reg: idx_start_reg, - num_regs: num_cols, - }); - // Unique violation without ON CONFLICT clause -> error - program.emit_insn(Insn::Halt { - err_code: SQLITE_CONSTRAINT_UNIQUE, - description: format_unique_violation_desc(table_name.as_str(), &index), - }); - program.preassign_label_to_next_insn(ok); - - // In the non-UPSERT case, we insert the index - let record_reg = program.alloc_register(); - program.emit_insn(Insn::MakeRecord { - start_reg: idx_start_reg, - count: num_cols + 1, - dest_reg: record_reg, - index_name: Some(index.name.clone()), - affinity_str: None, - }); - program.emit_insn(Insn::IdxInsert { - cursor_id: idx_cursor_id, - record_reg, - unpacked_start: Some(idx_start_reg), - unpacked_count: Some((num_cols + 1) as u16), - flags: IdxInsertFlags::new().nchange(true), - }); - } - } else { - // Non-unique index: in UPSERT mode we postpone writes to commit phase. 
- if !has_upsert { - // eager insert for non-unique, no UPSERT - let record_reg = program.alloc_register(); - program.emit_insn(Insn::MakeRecord { - start_reg: idx_start_reg, - count: num_cols + 1, - dest_reg: record_reg, - index_name: Some(index.name.clone()), - affinity_str: None, - }); - program.emit_insn(Insn::IdxInsert { - cursor_id: idx_cursor_id, - record_reg, - unpacked_start: Some(idx_start_reg), - unpacked_count: Some((num_cols + 1) as u16), - flags: IdxInsertFlags::new().nchange(true), - }); - } - } - - // Close the partial-index skip (preflight) - if let Some(lbl) = maybe_skip_probe_label { - program.resolve_label(lbl, program.offset()); - } - } - ResolvedUpsertTarget::CatchAll => unreachable!(), - } - } - - for column_mapping in insertion - .col_mappings - .iter() - .filter(|column_mapping| column_mapping.column.notnull) - { - // if this is rowid alias - turso-db will emit NULL as a column value and always use rowid for the row as a column value - if column_mapping.column.is_rowid_alias { - continue; - } - program.emit_insn(Insn::HaltIfNull { - target_reg: column_mapping.register, - err_code: SQLITE_CONSTRAINT_NOTNULL, - description: { - let mut description = String::with_capacity( - table_name.as_str().len() - + column_mapping - .column - .name - .as_ref() - .expect("Column name must be present") - .len() - + 2, - ); - description.push_str(table_name.as_str()); - description.push('.'); - description.push_str( - column_mapping - .column - .name - .as_ref() - .expect("Column name must be present"), - ); - description - }, - }); - } + emit_notnulls(&mut program, &ctx, &insertion); // Create and insert the record let affinity_str = insertion @@ -950,7 +500,6 @@ pub fn translate_insert( .iter() .map(|col_mapping| col_mapping.column.affinity().aff_mask()) .collect::(); - program.emit_insn(Insn::MakeRecord { start_reg: insertion.first_col_register(), count: insertion.col_mappings.len(), @@ -965,7 +514,8 @@ pub fn translate_insert( // and insert into all applicable indexes, we do not re-probe uniqueness here, as preflight // already guaranteed non-conflict. 
for index in resolver.schema.get_indices(table_name.as_str()) { - let idx_cursor_id = idx_cursors + let idx_cursor_id = ctx + .idx_cursors .iter() .find(|(name, _, _)| *name == &index.name) .map(|(_, _, c_id)| *c_id) @@ -1050,7 +600,7 @@ pub fn translate_insert( } program.emit_insn(Insn::Insert { - cursor: cursor_id, + cursor: ctx.cursor_id, key_reg: insertion.key_register(), record_reg: insertion.record_register(), flag: InsertFlags::new(), @@ -1062,7 +612,13 @@ pub fn translate_insert( emit_parent_side_fk_decrement_on_insert(&mut program, resolver, &btree_table, &insertion)?; } - if let Some((seq_cursor_id, r_seq, r_seq_rowid, table_name_reg)) = autoincrement_meta { + if let Some(AutoincMeta { + seq_cursor_id, + r_seq, + r_seq_rowid, + table_name_reg, + }) = ctx.autoincrement_meta + { let no_update_needed_label = program.allocate_label(); program.emit_insn(Insn::Le { lhs: insertion.key_register(), @@ -1088,7 +644,7 @@ pub fn translate_insert( } // Emit update in the CDC table if necessary (after the INSERT updated the table) - if let Some((cdc_cursor_id, _)) = &cdc_table { + if let Some((cdc_cursor_id, _)) = &ctx.cdc_table { let cdc_has_after = program.capture_data_changes_mode().has_after(); let after_record_reg = if cdc_has_after { Some(emit_cdc_patch_record( @@ -1125,46 +681,23 @@ pub fn translate_insert( emit_returning_results(&mut program, &result_columns, &value_registers)?; } program.emit_insn(Insn::Goto { - target_pc: row_done_label, + target_pc: ctx.row_done_label, }); - for (_, label, mut upsert) in upsert_actions { - program.preassign_label_to_next_insn(label); - - if let UpsertDo::Set { - ref mut sets, - ref mut where_clause, - } = upsert.do_clause - { - // Normalize SET pairs once - let mut rewritten_sets = collect_set_clauses_for_upsert(&table, sets)?; - - emit_upsert( - &mut program, - &table, - &insertion, - cursor_id, - conflict_rowid_reg, - &mut rewritten_sets, - where_clause, - resolver, - &idx_cursors, - &mut result_columns, - cdc_table.as_ref().map(|c| c.0), - row_done_label, - connection, - )?; - } else { - // UpsertDo::Nothing case - program.emit_insn(Insn::Goto { - target_pc: row_done_label, - }); - } - } + resolve_upserts( + &mut program, + resolver, + &mut upsert_actions, + &ctx, + &insertion, + &table, + &mut result_columns, + connection, + )?; if inserting_multiple_rows { - if let Some(temp_table_ctx) = temp_table_ctx { - program.resolve_label(row_done_label, program.offset()); + if let Some(temp_table_ctx) = ctx.temp_table_ctx { + program.resolve_label(ctx.row_done_label, program.offset()); program.emit_insn(Insn::Next { cursor_id: temp_table_ctx.cursor_id, @@ -1176,35 +709,432 @@ pub fn translate_insert( cursor_id: temp_table_ctx.cursor_id, }); program.emit_insn(Insn::Goto { - target_pc: stmt_epilogue, + target_pc: ctx.stmt_epilogue, }); } else { // For multiple rows which not require a temp table, loop back - program.resolve_label(row_done_label, program.offset()); + program.resolve_label(ctx.row_done_label, program.offset()); program.emit_insn(Insn::Goto { - target_pc: loop_start_label, + target_pc: ctx.loop_start_label, }); - if let Some(sel_eof) = select_exhausted_label { + if let Some(sel_eof) = ctx.select_exhausted_label { program.preassign_label_to_next_insn(sel_eof); program.emit_insn(Insn::Goto { - target_pc: stmt_epilogue, + target_pc: ctx.stmt_epilogue, }); } } } else { - program.resolve_label(row_done_label, program.offset()); + program.resolve_label(ctx.row_done_label, program.offset()); // single-row falls through to epilogue 
program.emit_insn(Insn::Goto { - target_pc: stmt_epilogue, + target_pc: ctx.stmt_epilogue, }); } - program.preassign_label_to_next_insn(stmt_epilogue); - program.resolve_label(halt_label, program.offset()); + program.preassign_label_to_next_insn(ctx.stmt_epilogue); + program.resolve_label(ctx.halt_label, program.offset()); Ok(program) } +#[allow(clippy::too_many_arguments)] +fn resolve_upserts( + program: &mut ProgramBuilder, + resolver: &Resolver, + upsert_actions: &mut [(ResolvedUpsertTarget, BranchOffset, Box)], + ctx: &InsertEmitCtx, + insertion: &Insertion, + table: &Table, + result_columns: &mut [ResultSetColumn], + connection: &Arc, +) -> Result<()> { + for (_, label, upsert) in upsert_actions { + program.preassign_label_to_next_insn(*label); + + if let UpsertDo::Set { + ref mut sets, + ref mut where_clause, + } = upsert.do_clause + { + // Normalize SET pairs once + let mut rewritten_sets = collect_set_clauses_for_upsert(table, sets)?; + + emit_upsert( + program, + table, + ctx, + insertion, + &mut rewritten_sets, + where_clause, + resolver, + result_columns, + connection, + )?; + } else { + // UpsertDo::Nothing case + program.emit_insn(Insn::Goto { + target_pc: ctx.row_done_label, + }); + } + } + Ok(()) +} + +fn emit_notnulls(program: &mut ProgramBuilder, ctx: &InsertEmitCtx, insertion: &Insertion) { + for column_mapping in insertion + .col_mappings + .iter() + .filter(|column_mapping| column_mapping.column.notnull) + { + // if this is rowid alias - turso-db will emit NULL as a column value and always use rowid for the row as a column value + if column_mapping.column.is_rowid_alias { + continue; + } + program.emit_insn(Insn::HaltIfNull { + target_reg: column_mapping.register, + err_code: SQLITE_CONSTRAINT_NOTNULL, + description: { + let mut description = String::with_capacity( + ctx.table.name.as_str().len() + + column_mapping + .column + .name + .as_ref() + .expect("Column name must be present") + .len() + + 2, + ); + description.push_str(ctx.table.name.as_str()); + description.push('.'); + description.push_str( + column_mapping + .column + .name + .as_ref() + .expect("Column name must be present"), + ); + description + }, + }); + } +} + +struct BoundInsertResult { + values: Vec>, + upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box)>, + inserting_multiple_rows: bool, +} + +fn bind_insert( + program: &mut ProgramBuilder, + resolver: &Resolver, + table: &Table, + body: &mut InsertBody, + connection: &Arc, +) -> Result { + let mut values: Vec> = vec![]; + let mut upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box)> = Vec::new(); + let mut inserting_multiple_rows = false; + match body { + InsertBody::DefaultValues => { + // Generate default values for the table + values = table + .columns() + .iter() + .filter(|c| !c.hidden) + .map(|c| { + c.default + .clone() + .unwrap_or(Box::new(ast::Expr::Literal(ast::Literal::Null))) + }) + .collect(); + } + InsertBody::Select(select, upsert_opt) => { + match &mut select.body.select { + // TODO see how to avoid clone + OneSelect::Values(values_expr) if values_expr.len() <= 1 => { + if values_expr.is_empty() { + crate::bail_parse_error!("no values to insert"); + } + for expr in values_expr.iter_mut().flat_map(|v| v.iter_mut()) { + match expr.as_mut() { + Expr::Id(name) => { + if name.quoted_with('"') { + *expr = Expr::Literal(ast::Literal::String(name.as_literal())) + .into(); + } else { + // an INSERT INTO ... VALUES (...) 
cannot reference columns + crate::bail_parse_error!("no such column: {name}"); + } + } + Expr::Qualified(first_name, second_name) => { + // an INSERT INTO ... VALUES (...) cannot reference columns + crate::bail_parse_error!( + "no such column: {first_name}.{second_name}" + ); + } + _ => {} + } + bind_and_rewrite_expr( + expr, + None, + None, + connection, + &mut program.param_ctx, + BindingBehavior::ResultColumnsNotAllowed, + )?; + } + values = values_expr.pop().unwrap_or_else(Vec::new); + } + _ => inserting_multiple_rows = true, + } + while let Some(mut upsert) = upsert_opt.take() { + if let UpsertDo::Set { + ref mut sets, + ref mut where_clause, + } = &mut upsert.do_clause + { + for set in sets.iter_mut() { + bind_and_rewrite_expr( + &mut set.expr, + None, + None, + connection, + &mut program.param_ctx, + BindingBehavior::AllowUnboundIdentifiers, + )?; + } + if let Some(ref mut where_expr) = where_clause { + bind_and_rewrite_expr( + where_expr, + None, + None, + connection, + &mut program.param_ctx, + BindingBehavior::AllowUnboundIdentifiers, + )?; + } + } + let next = upsert.next.take(); + upsert_actions.push(( + // resolve the constrained target for UPSERT in the chain + resolve_upsert_target(resolver.schema, table, &upsert)?, + program.allocate_label(), + upsert, + )); + *upsert_opt = next; + } + } + } + Ok(BoundInsertResult { + values, + upsert_actions, + inserting_multiple_rows, + }) +} + +/// Depending on the InsertBody, we begin to initialize the source of the insert values +/// into registers using the following methods: +/// +/// Values with a single row, expressions are directly evaluated into registers, so nothing +/// is emitted here, we simply allocate the cursor ID and store the arity. +/// +/// Values with multiple rows, we use a coroutine to yield each row into registers directly. +/// +/// Select, we use a coroutine to yield each row from the SELECT into registers, +/// materializing into a temporary table if the target table is also read by the SELECT. +/// +/// For DefaultValues, we allocate the cursor and extend the empty values vector with either the +/// default expressions registered for the columns, or NULLs, so they can be translated into +/// registers later. +#[allow(clippy::too_many_arguments)] +fn init_source_emission<'a>( + mut program: ProgramBuilder, + table: &Table, + connection: &Arc, + ctx: &mut InsertEmitCtx<'a>, + resolver: &Resolver, + values: &mut Vec>, + body: InsertBody, + columns: &'a [ast::Name], +) -> Result { + let (num_values, cursor_id) = match body { + InsertBody::Select(select, _) => { + // Simple Common case of INSERT INTO
VALUES (...) + if matches!(&select.body.select, OneSelect::Values(values) if values.len() <= 1) { + ( + values.len(), + program.alloc_cursor_id(CursorType::BTreeTable(ctx.table.clone())), + ) + } else { + // Multiple rows - use coroutine for value population + let yield_reg = program.alloc_register(); + let jump_on_definition_label = program.allocate_label(); + let start_offset_label = program.allocate_label(); + program.emit_insn(Insn::InitCoroutine { + yield_reg, + jump_on_definition: jump_on_definition_label, + start_offset: start_offset_label, + }); + program.preassign_label_to_next_insn(start_offset_label); + + let query_destination = QueryDestination::CoroutineYield { + yield_reg, + coroutine_implementation_start: ctx.halt_label, + }; + program.incr_nesting(); + let result = + translate_select(select, resolver, program, query_destination, connection)?; + program = result.program; + program.decr_nesting(); + + program.emit_insn(Insn::EndCoroutine { yield_reg }); + program.preassign_label_to_next_insn(jump_on_definition_label); + + let cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(ctx.table.clone())); + + // From SQLite + /* Set useTempTable to TRUE if the result of the SELECT statement + ** should be written into a temporary table (template 4). Set to + ** FALSE if each output row of the SELECT can be written directly into + ** the destination table (template 3). + ** + ** A temp table must be used if the table being updated is also one + ** of the tables being read by the SELECT statement. Also use a + ** temp table in the case of row triggers. + */ + if program.is_table_open(table) { + let temp_cursor_id = + program.alloc_cursor_id(CursorType::BTreeTable(ctx.table.clone())); + ctx.temp_table_ctx = Some(TempTableCtx { + cursor_id: temp_cursor_id, + loop_start_label: program.allocate_label(), + loop_end_label: program.allocate_label(), + }); + + program.emit_insn(Insn::OpenEphemeral { + cursor_id: temp_cursor_id, + is_table: true, + }); + + // Main loop + program.preassign_label_to_next_insn(ctx.loop_start_label); + let yield_label = program.allocate_label(); + program.emit_insn(Insn::Yield { + yield_reg, + end_offset: yield_label, // stays local, we’ll route at loop end + }); + + let record_reg = program.alloc_register(); + let affinity_str = if columns.is_empty() { + ctx.table + .columns + .iter() + .filter(|col| !col.hidden) + .map(|col| col.affinity().aff_mask()) + .collect::() + } else { + columns + .iter() + .map(|col_name| { + let column_name = normalize_ident(col_name.as_str()); + if ROWID_STRS + .iter() + .any(|s| s.eq_ignore_ascii_case(&column_name)) + { + return Affinity::Integer.aff_mask(); + } + table + .get_column_by_name(&column_name) + .unwrap() + .1 + .affinity() + .aff_mask() + }) + .collect::() + }; + + program.emit_insn(Insn::MakeRecord { + start_reg: program.reg_result_cols_start.unwrap_or(yield_reg + 1), + count: result.num_result_cols, + dest_reg: record_reg, + index_name: None, + affinity_str: Some(affinity_str), + }); + + let rowid_reg = program.alloc_register(); + program.emit_insn(Insn::NewRowid { + cursor: temp_cursor_id, + rowid_reg, + prev_largest_reg: 0, + }); + program.emit_insn(Insn::Insert { + cursor: temp_cursor_id, + key_reg: rowid_reg, + record_reg, + // since we are not doing an Insn::NewRowid or an Insn::NotExists here, we need to seek to ensure the insertion happens in the correct place. 
+ flag: InsertFlags::new().require_seek(), + table_name: "".to_string(), + }); + // loop back + program.emit_insn(Insn::Goto { + target_pc: ctx.loop_start_label, + }); + program.preassign_label_to_next_insn(yield_label); + + program.emit_insn(Insn::OpenWrite { + cursor_id, + root_page: RegisterOrLiteral::Literal(ctx.table.root_page), + db: 0, + }); + } else { + program.emit_insn(Insn::OpenWrite { + cursor_id, + root_page: RegisterOrLiteral::Literal(ctx.table.root_page), + db: 0, + }); + + program.preassign_label_to_next_insn(ctx.loop_start_label); + + // on EOF, jump to select_exhausted to check FK constraints + let select_exhausted = program.allocate_label(); + ctx.select_exhausted_label = Some(select_exhausted); + program.emit_insn(Insn::Yield { + yield_reg, + end_offset: select_exhausted, + }); + } + + ctx.yield_reg_opt = Some(yield_reg); + (result.num_result_cols, cursor_id) + } + } + InsertBody::DefaultValues => { + let num_values = table.columns().len(); + values.extend(table.columns().into_iter().map(|c| { + c.default + .clone() + .unwrap_or(Box::new(ast::Expr::Literal(ast::Literal::Null))) + })); + ( + num_values, + program.alloc_cursor_id(CursorType::BTreeTable(ctx.table.clone())), + ) + } + }; + ctx.num_values = num_values; + ctx.cursor_id = cursor_id; + Ok(program) +} + +pub struct AutoincMeta { + seq_cursor_id: usize, + r_seq: usize, + r_seq_rowid: usize, + table_name_reg: usize, +} + pub const ROWID_COLUMN: &Column = &Column { name: None, ty: schema::Type::Integer, @@ -1591,6 +1521,245 @@ fn translate_column( Ok(()) } +// Preflight phase: evaluate each applicable UNIQUE constraint and probe with NoConflict. +// If any probe hits: +// DO NOTHING -> jump to row_done_label. +// +// DO UPDATE (matching target) -> fetch conflicting rowid and jump to `upsert_entry`. +// +// otherwise, raise SQLITE_CONSTRAINT_UNIQUE +fn emit_preflight_constraint_checks( + program: &mut ProgramBuilder, + ctx: &InsertEmitCtx, + resolver: &Resolver, + insertion: &Insertion, + upsert_actions: &[(ResolvedUpsertTarget, BranchOffset, Box)], + constraints_to_check: &[(ResolvedUpsertTarget, Option)], + upsert_catch_all_position: Option, +) -> Result<()> { + for (constraint, position) in constraints_to_check { + match constraint { + ResolvedUpsertTarget::PrimaryKey => { + let make_record_label = program.allocate_label(); + program.emit_insn(Insn::NotExists { + cursor: ctx.cursor_id, + rowid_reg: insertion.key_register(), + target_pc: make_record_label, + }); + let rowid_column_name = insertion.key.column_name(); + + // Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint. 
+ // emit Halt for every case *except* when upsert handles the conflict + 'emit_halt: { + if let Some(position) = position.or(upsert_catch_all_position) { + // PK conflict: the conflicting rowid is exactly the attempted key + program.emit_insn(Insn::Copy { + src_reg: insertion.key_register(), + dst_reg: ctx.conflict_rowid_reg, + extra_amount: 0, + }); + program.emit_insn(Insn::Goto { + target_pc: upsert_actions[position].1, + }); + break 'emit_halt; + } + let mut description = + String::with_capacity(ctx.table.name.len() + rowid_column_name.len() + 2); + description.push_str(ctx.table.name.as_str()); + description.push('.'); + description.push_str(rowid_column_name); + program.emit_insn(Insn::Halt { + err_code: SQLITE_CONSTRAINT_PRIMARYKEY, + description, + }); + } + program.preassign_label_to_next_insn(make_record_label); + } + ResolvedUpsertTarget::Index(index) => { + let column_mappings = index + .columns + .iter() + .map(|idx_col| insertion.get_col_mapping_by_name(&idx_col.name)); + // find which cursor we opened earlier for this index + let idx_cursor_id = ctx + .idx_cursors + .iter() + .find(|(name, _, _)| *name == &index.name) + .map(|(_, _, c_id)| *c_id) + .expect("no cursor found for index"); + + let maybe_skip_probe_label = if let Some(where_clause) = &index.where_clause { + let mut where_for_eval = where_clause.as_ref().clone(); + rewrite_partial_index_where(&mut where_for_eval, insertion)?; + let reg = program.alloc_register(); + translate_expr_no_constant_opt( + program, + Some(&TableReferences::new_empty()), + &where_for_eval, + reg, + resolver, + NoConstantOptReason::RegisterReuse, + )?; + let lbl = program.allocate_label(); + program.emit_insn(Insn::IfNot { + reg, + target_pc: lbl, + jump_if_null: true, + }); + Some(lbl) + } else { + None + }; + + let num_cols = index.columns.len(); + // allocate scratch registers for the index columns plus rowid + let idx_start_reg = program.alloc_registers(num_cols + 1); + + // build unpacked key [idx_start_reg .. 
idx_start_reg+num_cols-1], and rowid in last reg, + // copy each index column from the table's column registers into these scratch regs + for (i, column_mapping) in column_mappings.clone().enumerate() { + // copy from the table's column register over to the index's scratch register + let Some(col_mapping) = column_mapping else { + return Err(crate::LimboError::PlanningError( + "Column not found in INSERT".to_string(), + )); + }; + program.emit_insn(Insn::Copy { + src_reg: col_mapping.register, + dst_reg: idx_start_reg + i, + extra_amount: 0, + }); + } + // last register is the rowid + program.emit_insn(Insn::Copy { + src_reg: insertion.key_register(), + dst_reg: idx_start_reg + num_cols, + extra_amount: 0, + }); + + if index.unique { + let aff = index + .columns + .iter() + .map(|ic| ctx.table.columns[ic.pos_in_table].affinity().aff_mask()) + .collect::(); + program.emit_insn(Insn::Affinity { + start_reg: idx_start_reg, + count: NonZeroUsize::new(num_cols).expect("nonzero col count"), + affinities: aff, + }); + + if !upsert_actions.is_empty() { + let next_check = program.allocate_label(); + program.emit_insn(Insn::NoConflict { + cursor_id: idx_cursor_id, + target_pc: next_check, + record_reg: idx_start_reg, + num_regs: num_cols, + }); + + // Conflict detected, figure out if this UPSERT handles the conflict + if let Some(position) = position.or(upsert_catch_all_position) { + match &upsert_actions[position].2.do_clause { + UpsertDo::Nothing => { + // Bail out without writing anything + program.emit_insn(Insn::Goto { + target_pc: ctx.row_done_label, + }); + } + UpsertDo::Set { .. } => { + // Route to DO UPDATE: capture conflicting rowid then jump + program.emit_insn(Insn::IdxRowId { + cursor_id: idx_cursor_id, + dest: ctx.conflict_rowid_reg, + }); + program.emit_insn(Insn::Goto { + target_pc: upsert_actions[position].1, + }); + } + } + } + // No matching UPSERT handler so we emit constraint error + // (if conflict clause matched - VM will jump to later instructions and skip halt) + program.emit_insn(Insn::Halt { + err_code: SQLITE_CONSTRAINT_UNIQUE, + description: format_unique_violation_desc( + ctx.table.name.as_str(), + &index, + ), + }); + + // continue preflight with next constraint + program.preassign_label_to_next_insn(next_check); + } else { + // No UPSERT fast-path: probe and immediately insert + let ok = program.allocate_label(); + program.emit_insn(Insn::NoConflict { + cursor_id: idx_cursor_id, + target_pc: ok, + record_reg: idx_start_reg, + num_regs: num_cols, + }); + // Unique violation without ON CONFLICT clause -> error + program.emit_insn(Insn::Halt { + err_code: SQLITE_CONSTRAINT_UNIQUE, + description: format_unique_violation_desc( + ctx.table.name.as_str(), + &index, + ), + }); + program.preassign_label_to_next_insn(ok); + + // In the non-UPSERT case, we insert the index + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: idx_start_reg, + count: num_cols + 1, + dest_reg: record_reg, + index_name: Some(index.name.clone()), + affinity_str: None, + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: idx_cursor_id, + record_reg, + unpacked_start: Some(idx_start_reg), + unpacked_count: Some((num_cols + 1) as u16), + flags: IdxInsertFlags::new().nchange(true), + }); + } + } else { + // Non-unique index: in UPSERT mode we postpone writes to commit phase. 
+ if upsert_actions.is_empty() { + // eager insert for non-unique, no UPSERT + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: idx_start_reg, + count: num_cols + 1, + dest_reg: record_reg, + index_name: Some(index.name.clone()), + affinity_str: None, + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: idx_cursor_id, + record_reg, + unpacked_start: Some(idx_start_reg), + unpacked_count: Some((num_cols + 1) as u16), + flags: IdxInsertFlags::new().nchange(true), + }); + } + } + + // Close the partial-index skip (preflight) + if let Some(lbl) = maybe_skip_probe_label { + program.resolve_label(lbl, program.offset()); + } + } + ResolvedUpsertTarget::CatchAll => unreachable!(), + } + } + Ok(()) +} + // TODO: comeback here later to apply the same improvements on select fn translate_virtual_table_insert( mut program: ProgramBuilder, @@ -1821,6 +1990,44 @@ pub fn rewrite_partial_index_where( ) } +fn build_constraints_to_check( + resolver: &Resolver, + table_name: &str, + upsert_actions: &[(ResolvedUpsertTarget, BranchOffset, Box)], + has_user_provided_rowid: bool, +) -> (Vec<(ResolvedUpsertTarget, Option)>, Option) { + let mut constraints_to_check = Vec::new(); + if has_user_provided_rowid { + // Check uniqueness constraint for rowid if it was provided by user. + // When the DB allocates it there are no need for separate uniqueness checks. + let position = upsert_actions + .iter() + .position(|(target, ..)| matches!(target, ResolvedUpsertTarget::PrimaryKey)); + constraints_to_check.push((ResolvedUpsertTarget::PrimaryKey, position)); + } + for index in resolver.schema.get_indices(table_name) { + let position = upsert_actions + .iter() + .position(|(target, ..)| matches!(target, ResolvedUpsertTarget::Index(x) if Arc::ptr_eq(x, index))); + constraints_to_check.push((ResolvedUpsertTarget::Index(index.clone()), position)); + } + + constraints_to_check.sort_by(|(_, p1), (_, p2)| match (p1, p2) { + (Some(p1), Some(p2)) => p1.cmp(p2), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + (None, None) => std::cmp::Ordering::Equal, + }); + + let upsert_catch_all_position = + if let Some((ResolvedUpsertTarget::CatchAll, ..)) = upsert_actions.last() { + Some(upsert_actions.len() - 1) + } else { + None + }; + (constraints_to_check, upsert_catch_all_position) +} + fn emit_update_sqlite_sequence( program: &mut ProgramBuilder, schema: &Schema, diff --git a/core/translate/mod.rs b/core/translate/mod.rs index d51d89dea..758031544 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -284,17 +284,21 @@ pub fn translate_inner( columns, body, returning, - } => translate_insert( - with, - resolver, - or_conflict, - tbl_name, - columns, - body, - returning, - program, - connection, - )?, + } => { + if with.is_some() { + crate::bail_parse_error!("WITH clause is not supported"); + } + translate_insert( + resolver, + or_conflict, + tbl_name, + columns, + body, + returning, + program, + connection, + )? 
+        }
     };
 
     // Indicate write operations so that in the epilogue we can emit the correct type of transaction
diff --git a/core/translate/upsert.rs b/core/translate/upsert.rs
index 868f3a933..3fb3db43a 100644
--- a/core/translate/upsert.rs
+++ b/core/translate/upsert.rs
@@ -8,7 +8,7 @@ use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
 use crate::schema::ROWID_SENTINEL;
 use crate::translate::expr::{walk_expr, WalkControl};
 use crate::translate::fkeys::{emit_fk_child_update_counters, emit_parent_pk_change_checks};
-use crate::translate::insert::format_unique_violation_desc;
+use crate::translate::insert::{format_unique_violation_desc, InsertEmitCtx};
 use crate::translate::planner::ROWID_STRS;
 use crate::vdbe::insn::CmpInsFlags;
 use crate::Connection;
@@ -31,7 +31,6 @@ use crate::{
     vdbe::{
         builder::ProgramBuilder,
         insn::{IdxInsertFlags, InsertFlags, Insn},
-        BranchOffset,
     },
 };
 
@@ -339,35 +338,31 @@ pub fn resolve_upsert_target(
 pub fn emit_upsert(
     program: &mut ProgramBuilder,
     table: &Table,
+    ctx: &InsertEmitCtx,
     insertion: &Insertion,
-    tbl_cursor_id: usize,
-    conflict_rowid_reg: usize,
     set_pairs: &mut [(usize, Box)],
     where_clause: &mut Option>,
     resolver: &Resolver,
-    idx_cursors: &[(&String, i64, usize)],
     returning: &mut [ResultSetColumn],
-    cdc_cursor_id: Option,
-    row_done_label: BranchOffset,
     connection: &Arc,
 ) -> crate::Result<()> {
     // Seek & snapshot CURRENT
     program.emit_insn(Insn::SeekRowid {
-        cursor_id: tbl_cursor_id,
-        src_reg: conflict_rowid_reg,
-        target_pc: row_done_label,
+        cursor_id: ctx.cursor_id,
+        src_reg: ctx.conflict_rowid_reg,
+        target_pc: ctx.row_done_label,
     });
-    let num_cols = table.columns().len();
+    let num_cols = ctx.table.columns.len();
     let current_start = program.alloc_registers(num_cols);
-    for (i, col) in table.columns().iter().enumerate() {
+    for (i, col) in ctx.table.columns.iter().enumerate() {
         if col.is_rowid_alias {
             program.emit_insn(Insn::RowId {
-                cursor_id: tbl_cursor_id,
+                cursor_id: ctx.cursor_id,
                 dest: current_start + i,
             });
         } else {
             program.emit_insn(Insn::Column {
-                cursor_id: tbl_cursor_id,
+                cursor_id: ctx.cursor_id,
                 column: i,
                 dest: current_start + i,
                 default: None,
@@ -376,7 +371,7 @@ pub fn emit_upsert(
     }
 
     // BEFORE for index maintenance / CDC
-    let before_start = if cdc_cursor_id.is_some() || !idx_cursors.is_empty() {
+    let before_start = if ctx.cdc_table.is_some() || !ctx.idx_cursors.is_empty() {
         let s = program.alloc_registers(num_cols);
         program.emit_insn(Insn::Copy {
             src_reg: current_start,
@@ -402,7 +397,7 @@ pub fn emit_upsert(
             pred,
             table,
             current_start,
-            conflict_rowid_reg,
+            ctx.conflict_rowid_reg,
             Some(table.get_name()),
             Some(insertion),
             true,
@@ -411,7 +406,7 @@ pub fn emit_upsert(
         translate_expr(program, None, pred, pr, resolver)?;
         program.emit_insn(Insn::IfNot {
             reg: pr,
-            target_pc: row_done_label,
+            target_pc: ctx.row_done_label,
             jump_if_null: true,
         });
     }
@@ -423,7 +418,7 @@ pub fn emit_upsert(
             expr,
             table,
             current_start,
-            conflict_rowid_reg,
+            ctx.conflict_rowid_reg,
             Some(table.get_name()),
             Some(insertion),
             true,
@@ -480,13 +475,13 @@ pub fn emit_upsert(
     };
     let rowid_set_clause_reg = if has_user_provided_rowid {
-        Some(new_rowid_reg.unwrap_or(conflict_rowid_reg))
+        Some(new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg))
     } else {
         None
     };
 
     if let Some(bt) = table.btree() {
         if connection.foreign_keys_enabled() {
-            let rowid_new_reg = new_rowid_reg.unwrap_or(conflict_rowid_reg);
+            let rowid_new_reg = new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg);
 
             // Child-side checks
             if resolver.schema.has_child_fks(bt.name.as_str()) {
@@ -495,7 +490,7 @@ pub fn emit_upsert(
                     resolver,
                     &bt,
                     table.get_name(),
-                    tbl_cursor_id,
+                    ctx.cursor_id,
                     new_start,
                     rowid_new_reg,
                     &changed_cols,
@@ -505,10 +500,10 @@ pub fn emit_upsert(
                 program,
                 resolver,
                 &bt,
-                tbl_cursor_id,
-                conflict_rowid_reg,
+                ctx.cursor_id,
+                ctx.conflict_rowid_reg,
                 new_start,
-                new_rowid_reg.unwrap_or(conflict_rowid_reg),
+                new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg),
                 rowid_set_clause_reg,
                 set_pairs,
             )?;
@@ -517,7 +512,7 @@ pub fn emit_upsert(
 
     // Index rebuild (DELETE old, INSERT new), honoring partial-index WHEREs
     if let Some(before) = before_start {
-        for (idx_name, _root, idx_cid) in idx_cursors {
+        for (idx_name, _root, idx_cid) in &ctx.idx_cursors {
             let idx_meta = resolver
                 .schema
                 .get_index(table.get_name(), idx_name)
@@ -533,10 +528,10 @@ pub fn emit_upsert(
                 table,
                 idx_meta,
                 before,
-                conflict_rowid_reg,
+                ctx.conflict_rowid_reg,
                 resolver,
             );
-            let new_rowid = new_rowid_reg.unwrap_or(conflict_rowid_reg);
+            let new_rowid = new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg);
             let new_pred_reg = eval_partial_pred_for_row_image(
                 program, table, idx_meta, new_start, new_rowid, resolver,
             );
@@ -563,7 +558,7 @@ pub fn emit_upsert(
                 });
             }
             program.emit_insn(Insn::Copy {
-                src_reg: conflict_rowid_reg,
+                src_reg: ctx.conflict_rowid_reg,
                 dst_reg: del + k,
                 extra_amount: 0,
             });
@@ -694,7 +689,7 @@ pub fn emit_upsert(
         // If equal to old rowid, skip uniqueness probe
         program.emit_insn(Insn::Eq {
             lhs: rnew,
-            rhs: conflict_rowid_reg,
+            rhs: ctx.conflict_rowid_reg,
             target_pc: ok,
             flags: CmpInsFlags::default(),
             collation: program.curr_collation(),
@@ -702,7 +697,7 @@ pub fn emit_upsert(
 
         // If another row already has rnew -> constraint
         program.emit_insn(Insn::NotExists {
-            cursor: tbl_cursor_id,
+            cursor: ctx.cursor_id,
             rowid_reg: rnew,
             target_pc: ok,
         });
@@ -723,11 +718,11 @@ pub fn emit_upsert(
 
         // Now replace the row
        program.emit_insn(Insn::Delete {
-            cursor_id: tbl_cursor_id,
+            cursor_id: ctx.cursor_id,
             table_name: table.get_name().to_string(),
         });
         program.emit_insn(Insn::Insert {
-            cursor: tbl_cursor_id,
+            cursor: ctx.cursor_id,
             key_reg: rnew,
             record_reg: rec,
             flag: InsertFlags::new().require_seek().update_rowid_change(),
@@ -735,8 +730,8 @@ pub fn emit_upsert(
         });
     } else {
         program.emit_insn(Insn::Insert {
-            cursor: tbl_cursor_id,
-            key_reg: conflict_rowid_reg,
+            cursor: ctx.cursor_id,
+            key_reg: ctx.conflict_rowid_reg,
             record_reg: rec,
             flag: InsertFlags::new(),
             table_name: table.get_name().to_string(),
@@ -744,16 +739,16 @@ pub fn emit_upsert(
     }
 
     // emit CDC instructions
-    if let Some(cdc_id) = cdc_cursor_id {
-        let new_rowid = new_rowid_reg.unwrap_or(conflict_rowid_reg);
+    if let Some((cdc_id, _)) = ctx.cdc_table {
+        let new_rowid = new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg);
         if new_rowid_reg.is_some() {
             // DELETE (before)
             let before_rec = if program.capture_data_changes_mode().has_before() {
                 Some(emit_cdc_full_record(
                     program,
                     table.columns(),
-                    tbl_cursor_id,
-                    conflict_rowid_reg,
+                    ctx.cursor_id,
+                    ctx.conflict_rowid_reg,
                 ))
             } else {
                 None
@@ -763,7 +758,7 @@ pub fn emit_upsert(
                 resolver,
                 OperationMode::DELETE,
                 cdc_id,
-                conflict_rowid_reg,
+                ctx.conflict_rowid_reg,
                 before_rec,
                 None,
                 None,
@@ -796,7 +791,7 @@ pub fn emit_upsert(
                     table,
                     new_start,
                     rec,
-                    conflict_rowid_reg,
+                    ctx.conflict_rowid_reg,
                 ))
             } else {
                 None
@@ -805,8 +800,8 @@ pub fn emit_upsert(
                 Some(emit_cdc_full_record(
                     program,
                     table.columns(),
-                    tbl_cursor_id,
-                    conflict_rowid_reg,
+                    ctx.cursor_id,
+                    ctx.conflict_rowid_reg,
                 ))
             } else {
                 None
@@ -816,7 +811,7 @@ pub fn emit_upsert(
                 resolver,
                 OperationMode::UPDATE,
                 cdc_id,
-                conflict_rowid_reg,
+                ctx.conflict_rowid_reg,
                 before_rec,
                 after_rec,
                 None,
@@ -828,7 +823,7 @@ pub fn emit_upsert(
     // RETURNING from NEW image + final rowid
     if !returning.is_empty() {
         let regs = ReturningValueRegisters {
-            rowid_register: new_rowid_reg.unwrap_or(conflict_rowid_reg),
+            rowid_register: new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg),
             columns_start_register: new_start,
             num_columns: num_cols,
         };
@@ -836,7 +831,7 @@ pub fn emit_upsert(
     }
 
     program.emit_insn(Insn::Goto {
-        target_pc: row_done_label,
+        target_pc: ctx.row_done_label,
     });
     Ok(())
 }

From 20bdb1133dfed9fb35fe091ededc64a68ab017d1 Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Tue, 14 Oct 2025 13:00:31 -0400
Subject: [PATCH 2/3] fix clippy warnings

---
 core/translate/insert.rs | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/translate/insert.rs b/core/translate/insert.rs
index c1fab97f1..ce6e1b8bb 100644
--- a/core/translate/insert.rs
+++ b/core/translate/insert.rs
@@ -821,6 +821,7 @@ fn emit_notnulls(program: &mut ProgramBuilder, ctx: &InsertEmitCtx, insertion: &
 }
 
 struct BoundInsertResult {
+    #[allow(clippy::vec_box)]
     values: Vec>,
     upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box)>,
     inserting_multiple_rows: bool,
@@ -948,7 +949,7 @@ fn bind_insert(
 /// For DefaultValues, we allocate the cursor and extend the empty values vector with either the
 /// default expressions registered for the columns, or NULLs, so they can be translated into
 /// registers later.
-#[allow(clippy::too_many_arguments)]
+#[allow(clippy::too_many_arguments, clippy::vec_box)]
 fn init_source_emission<'a>(
     mut program: ProgramBuilder,
     table: &Table,
@@ -1112,7 +1113,7 @@ fn init_source_emission<'a>(
         }
         InsertBody::DefaultValues => {
             let num_values = table.columns().len();
-            values.extend(table.columns().into_iter().map(|c| {
+            values.extend(table.columns().iter().map(|c| {
                 c.default
                     .clone()
                     .unwrap_or(Box::new(ast::Expr::Literal(ast::Literal::Null)))
@@ -1685,7 +1686,7 @@ fn emit_preflight_constraint_checks(
                     err_code: SQLITE_CONSTRAINT_UNIQUE,
                     description: format_unique_violation_desc(
                         ctx.table.name.as_str(),
-                        &index,
+                        index,
                     ),
                 });
 
@@ -1705,7 +1706,7 @@ fn emit_preflight_constraint_checks(
                     err_code: SQLITE_CONSTRAINT_UNIQUE,
                     description: format_unique_violation_desc(
                         ctx.table.name.as_str(),
-                        &index,
+                        index,
                     ),
                 });
             program.preassign_label_to_next_insn(ok);

From 792877d421f8db1e669007f5772ae8a7f13576b7 Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Tue, 14 Oct 2025 13:22:32 -0400
Subject: [PATCH 3/3] add doc comments to InsertEmitCtx

---
 core/translate/insert.rs | 33 +++++++++++++++++++++++++--------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/core/translate/insert.rs b/core/translate/insert.rs
index ce6e1b8bb..2fa5b8eec 100644
--- a/core/translate/insert.rs
+++ b/core/translate/insert.rs
@@ -42,12 +42,6 @@ use super::expr::{translate_expr, translate_expr_no_constant_opt, NoConstantOptR
 use super::plan::QueryDestination;
 use super::select::translate_select;
 
-pub struct TempTableCtx {
-    cursor_id: usize,
-    loop_start_label: BranchOffset,
-    loop_end_label: BranchOffset,
-}
-
 /// Validate anything with this insert statement that should throw an early parse error
 fn validate(table_name: &str, resolver: &Resolver, table: &Table) -> Result<()> {
     // Check if this is a system table that should be protected from direct writes
@@ -86,24 +80,48 @@ fn validate(table_name: &str, resolver: &Resolver, table: &Table) -> Result<()>
     Ok(())
 }
 
+pub struct TempTableCtx {
+    cursor_id: usize,
+    loop_start_label: BranchOffset,
+    loop_end_label: BranchOffset,
+}
+
 #[allow(dead_code)]
 pub struct InsertEmitCtx<'a> {
+    /// Parent table being inserted into
     pub table: &'a Arc,
+
+    /// Index cursors we need to populate for this table
+    /// (idx name, root_page, idx cursor id)
     pub idx_cursors: Vec<(&'a String, i64, usize)>,
+
+    /// Context for if the insert values are materialized first
+    /// into a temporary table
     pub temp_table_ctx: Option,
+    /// on conflict, default to ABORT
     pub on_conflict: ResolveType,
+    /// Arity of the insert values
     pub num_values: usize,
+    /// The yield register, if a coroutine is used to yield multiple rows
     pub yield_reg_opt: Option,
+    /// The register to hold the rowid of a conflicting row
     pub conflict_rowid_reg: usize,
+    /// The cursor id of the table being inserted into
     pub cursor_id: usize,
 
-    /// Labels
+    /// Label to jump to on HALT
     pub halt_label: BranchOffset,
+    /// Label to jump to when a row is done processing (either inserted or upserted)
     pub row_done_label: BranchOffset,
+    /// Jump here at the complete end of the statement
     pub stmt_epilogue: BranchOffset,
+    /// Beginning of the loop for multiple-row inserts
     pub loop_start_label: BranchOffset,
+    /// Label to jump to when a generated key is ready for uniqueness check
     pub key_ready_for_uniqueness_check_label: BranchOffset,
+    /// Label to jump to when no key is provided and one must be generated
     pub key_generation_label: BranchOffset,
+    /// Jump here when the insert value SELECT source has been fully exhausted
     pub select_exhausted_label: Option,
 
     /// CDC table info
@@ -123,7 +141,6 @@ impl<'a> InsertEmitCtx<'a> {
         temp_table_ctx: Option,
     ) -> Self {
         // allocate cursor id's for each btree index cursor we'll need to populate the indexes
-        // (idx name, root_page, idx cursor id)
         let idx_cursors = resolver
            .schema
            .get_indices(table.name.as_str())