From 0ea6e5714dec061b3077b159bb8ca927a32d9d1d Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sun, 21 Sep 2025 13:27:50 -0400 Subject: [PATCH] Separate UPSERT behavior into preflight and commit state to prevent inserting idx before violating unique constraint --- core/translate/insert.rs | 482 ++++++++++++++++++++++++++------------- 1 file changed, 324 insertions(+), 158 deletions(-) diff --git a/core/translate/insert.rs b/core/translate/insert.rs index 9c942d83e..509eda6ce 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -1,18 +1,20 @@ +use std::num::NonZeroUsize; use std::sync::Arc; use turso_parser::ast::{ self, Expr, InsertBody, OneSelect, QualifiedName, ResolveType, ResultColumn, Upsert, UpsertDo, With, }; -use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY}; -use crate::schema::{self, Table}; +use crate::error::{ + SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY, SQLITE_CONSTRAINT_UNIQUE, +}; +use crate::schema::{self, Index, Table}; use crate::translate::emitter::{ emit_cdc_insns, emit_cdc_patch_record, prepare_cdc_if_necessary, OperationMode, }; use crate::translate::expr::{ - bind_and_rewrite_expr, emit_returning_results, process_returning_clause, - translate_condition_expr, walk_expr_mut, ConditionMetadata, ParamState, - ReturningValueRegisters, WalkControl, + bind_and_rewrite_expr, emit_returning_results, process_returning_clause, walk_expr_mut, + ParamState, ReturningValueRegisters, WalkControl, }; use crate::translate::plan::TableReferences; use crate::translate::planner::ROWID; @@ -351,6 +353,7 @@ pub fn translate_insert( program.alloc_cursor_id(CursorType::BTreeTable(btree_table.clone())), ), }; + let has_upsert = upsert_opt.is_some(); // Set up the program to return result columns if RETURNING is specified if !result_columns.is_empty() { @@ -373,6 +376,9 @@ pub fn translate_insert( let insertion = build_insertion(&mut program, &table, &columns, num_values)?; + let upsert_entry = program.allocate_label(); + let conflict_rowid_reg = program.alloc_register(); + if inserting_multiple_rows { translate_rows_multiple( &mut program, @@ -444,41 +450,20 @@ pub fn translate_insert( // Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint. // emit Halt for every case *except* when upsert handles the conflict 'emit_halt: { - if let (Some(ref mut upsert), Some(ref target)) = - (upsert_opt.as_mut(), resolved_upsert.as_ref()) - { + if let (Some(_), Some(ref target)) = (upsert_opt.as_mut(), resolved_upsert.as_ref()) { if matches!( target, ResolvedUpsertTarget::CatchAll | ResolvedUpsertTarget::PrimaryKey ) { - match upsert.do_clause { - UpsertDo::Nothing => { - program.emit_insn(Insn::Goto { - target_pc: row_done_label, - }); - } - UpsertDo::Set { - ref mut sets, - ref mut where_clause, - } => { - let mut rewritten_sets = collect_set_clauses_for_upsert(&table, sets)?; - emit_upsert( - &mut program, - schema, - &table, - &insertion, - cursor_id, - insertion.key_register(), - &mut rewritten_sets, - where_clause, - &resolver, - &idx_cursors, - &mut result_columns, - cdc_table.as_ref().map(|c| c.0), - row_done_label, - )?; - } - } + // PK conflict: the conflicting rowid is exactly the attempted key + program.emit_insn(Insn::Copy { + src_reg: insertion.key_register(), + dst_reg: conflict_rowid_reg, + extra_amount: 0, + }); + program.emit_insn(Insn::Goto { + target_pc: upsert_entry, + }); break 'emit_halt; } } @@ -507,6 +492,18 @@ pub fn translate_insert( _ => (), } + // We need to separate index handling and insertion into a `preflight` and a + // `commit` phase, because in UPSERT mode we might need to skip the actual insertion, as we can + // have a naked ON CONFLICT DO NOTHING, so if we eagerly insert any indexes, we could insert + // invalid index entries before we hit a conflict down the line. + // + // Preflight phase: evaluate each applicable UNIQUE constraint and probe with NoConflict. + // If any probe hits: + // DO NOTHING -> jump to row_done_label. + // + // DO UPDATE (matching target) -> fetch conflicting rowid and jump to `upsert_entry`. + // + // otherwise, raise SQLITE_CONSTRAINT_UNIQUE for index in schema.get_indices(table_name.as_str()) { let column_mappings = index .columns @@ -519,29 +516,25 @@ pub fn translate_insert( .map(|(_, _, c_id)| *c_id) .expect("no cursor found for index"); - let skip_index_label = if let Some(where_clause) = &index.where_clause { - // Clone and rewrite WHERE to use insertion registers + let maybe_skip_probe_label = if let Some(where_clause) = &index.where_clause { let mut where_for_eval = where_clause.as_ref().clone(); rewrite_partial_index_where(&mut where_for_eval, &insertion)?; - - // Evaluate rewritten WHERE clause - let skip_label = program.allocate_label(); - // We can use an empty TableReferences here because we shouldn't have any - // Expr::Column's in the partial index WHERE clause after rewriting it to use - // regsisters - let table_references = TableReferences::new_empty(); - translate_condition_expr( + let reg = program.alloc_register(); + translate_expr_no_constant_opt( &mut program, - &table_references, + Some(&TableReferences::new_empty()), &where_for_eval, - ConditionMetadata { - jump_if_condition_is_true: false, - jump_target_when_false: skip_label, - jump_target_when_true: BranchOffset::Placeholder, - }, + reg, &resolver, + NoConstantOptReason::RegisterReuse, )?; - Some(skip_label) + let lbl = program.allocate_label(); + program.emit_insn(Insn::IfNot { + reg, + target_pc: lbl, + jump_if_null: true, + }); + Some(lbl) } else { None }; @@ -550,6 +543,7 @@ pub fn translate_insert( // allocate scratch registers for the index columns plus rowid let idx_start_reg = program.alloc_registers(num_cols + 1); + // build unpacked key [idx_start_reg .. idx_start_reg+num_cols-1], and rowid in last reg, // copy each index column from the table's column registers into these scratch regs for (i, column_mapping) in column_mappings.clone().enumerate() { // copy from the table's column register over to the index's scratch register @@ -571,104 +565,131 @@ pub fn translate_insert( extra_amount: 0, }); - let record_reg = program.alloc_register(); - program.emit_insn(Insn::MakeRecord { - start_reg: idx_start_reg, - count: num_cols + 1, - dest_reg: record_reg, - index_name: Some(index.name.clone()), - affinity_str: None, - }); - if index.unique { - let label_idx_insert = program.allocate_label(); - program.emit_insn(Insn::NoConflict { - cursor_id: idx_cursor_id, - target_pc: label_idx_insert, - record_reg: idx_start_reg, - num_regs: num_cols, + let aff = index + .columns + .iter() + .map(|ic| table.columns()[ic.pos_in_table].affinity().aff_mask()) + .collect::(); + program.emit_insn(Insn::Affinity { + start_reg: idx_start_reg, + count: NonZeroUsize::new(num_cols).expect("nonzero col count"), + affinities: aff, }); - let column_names = index.columns.iter().enumerate().fold( - String::with_capacity(50), - |mut accum, (idx, column)| { - if idx > 0 { - accum.push_str(", "); - } - accum.push_str(table_name.as_str()); - accum.push('.'); - accum.push_str(&column.name); - accum - }, - ); - // again, emit halt for every case *except* when upsert handles the conflict - 'emit_halt: { - if let (Some(ref mut upsert), Some(ref target)) = - (upsert_opt.as_mut(), resolved_upsert.as_ref()) + + if has_upsert { + let next_check = program.allocate_label(); + program.emit_insn(Insn::NoConflict { + cursor_id: idx_cursor_id, + target_pc: next_check, + record_reg: idx_start_reg, + num_regs: num_cols, + }); + + // Conflict detected, figure out if this UPSERT handles the conflict + let upsert_matches_this_index = if let (Some(_u), Some(ref target)) = + (upsert_opt.as_ref(), resolved_upsert.as_ref()) { - if match target { + match target { ResolvedUpsertTarget::CatchAll => true, ResolvedUpsertTarget::Index(tgt) => Arc::ptr_eq(tgt, index), + // note: PK handled earlier by rowid path; this is a secondary index ResolvedUpsertTarget::PrimaryKey => false, - } { - match upsert.do_clause { - UpsertDo::Nothing => { - program.emit_insn(Insn::Goto { - target_pc: row_done_label, - }); - } - UpsertDo::Set { - ref mut sets, - ref mut where_clause, - } => { - let mut rewritten_sets = - collect_set_clauses_for_upsert(&table, sets)?; - let conflict_rowid_reg = program.alloc_register(); - program.emit_insn(Insn::IdxRowId { - cursor_id: idx_cursor_id, - dest: conflict_rowid_reg, - }); - emit_upsert( - &mut program, - schema, - &table, - &insertion, - cursor_id, - conflict_rowid_reg, - &mut rewritten_sets, - where_clause, - &resolver, - &idx_cursors, - &mut result_columns, - cdc_table.as_ref().map(|c| c.0), - row_done_label, - )?; - } - } - break 'emit_halt; } + } else { + false + }; + + if upsert_matches_this_index { + // Distinguish DO NOTHING vs DO UPDATE + match upsert_opt.as_ref().unwrap().do_clause { + UpsertDo::Nothing => { + // Bail out without writing anything + program.emit_insn(Insn::Goto { + target_pc: row_done_label, + }); + } + UpsertDo::Set { .. } => { + // Route to DO UPDATE: capture conflicting rowid then jump + program.emit_insn(Insn::IdxRowId { + cursor_id: idx_cursor_id, + dest: conflict_rowid_reg, + }); + program.emit_insn(Insn::Goto { + target_pc: upsert_entry, + }); + } + } + } else { + // No matching UPSERT handler so we emit constraint error + program.emit_insn(Insn::Halt { + err_code: SQLITE_CONSTRAINT_UNIQUE, + description: format_unique_violation_desc(table_name.as_str(), index), + }); } - // No matching UPSERT rule: unique constraint violation. + + // continue preflight with next constraint + program.preassign_label_to_next_insn(next_check); + } else { + // No UPSERT fast-path: probe and immediately insert + let ok = program.allocate_label(); + program.emit_insn(Insn::NoConflict { + cursor_id: idx_cursor_id, + target_pc: ok, + record_reg: idx_start_reg, + num_regs: num_cols, + }); + // Unique violation without ON CONFLICT clause -> error program.emit_insn(Insn::Halt { - err_code: SQLITE_CONSTRAINT_PRIMARYKEY, - description: column_names, + err_code: SQLITE_CONSTRAINT_UNIQUE, + description: format_unique_violation_desc(table_name.as_str(), index), + }); + program.preassign_label_to_next_insn(ok); + + // In the non-UPSERT case, we insert the index + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: idx_start_reg, + count: num_cols + 1, + dest_reg: record_reg, + index_name: Some(index.name.clone()), + affinity_str: None, + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: idx_cursor_id, + record_reg, + unpacked_start: Some(idx_start_reg), + unpacked_count: Some((num_cols + 1) as u16), + flags: IdxInsertFlags::new().nchange(true), + }); + } + } else { + // Non-unique index: in UPSERT mode we postpone writes to commit phase. + if !has_upsert { + // eager insert for non-unique, no UPSERT + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: idx_start_reg, + count: num_cols + 1, + dest_reg: record_reg, + index_name: Some(index.name.clone()), + affinity_str: None, + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: idx_cursor_id, + record_reg, + unpacked_start: Some(idx_start_reg), + unpacked_count: Some((num_cols + 1) as u16), + flags: IdxInsertFlags::new().nchange(true), }); } - program.resolve_label(label_idx_insert, program.offset()); } - // now do the actual index insertion using the unpacked registers - program.emit_insn(Insn::IdxInsert { - cursor_id: idx_cursor_id, - record_reg, - unpacked_start: Some(idx_start_reg), // TODO: enable optimization - unpacked_count: Some((num_cols + 1) as u16), - // TODO: figure out how to determine whether or not we need to seek prior to insert. - flags: IdxInsertFlags::new().nchange(true), - }); - if let Some(skip_label) = skip_index_label { - program.resolve_label(skip_label, program.offset()); + + // Close the partial-index skip (preflight) + if let Some(lbl) = maybe_skip_probe_label { + program.resolve_label(lbl, program.offset()); } } - for column_mapping in insertion .col_mappings .iter() @@ -705,6 +726,7 @@ pub fn translate_insert( }, }); } + // Create and insert the record let affinity_str = insertion .col_mappings @@ -719,6 +741,87 @@ pub fn translate_insert( index_name: None, affinity_str: Some(affinity_str), }); + + if has_upsert { + // COMMIT PHASE: no preflight jumps happened; emit the actual index writes now + // We re-check partial-index predicates against the NEW image, produce packed records, + // and insert into all applicable indexes, we do not re-probe uniqueness here, as preflight + // already guaranteed non-conflict. + for index in schema.get_indices(table_name.as_str()) { + let idx_cursor_id = idx_cursors + .iter() + .find(|(name, _, _)| *name == &index.name) + .map(|(_, _, c_id)| *c_id) + .expect("no cursor found for index"); + + // Re-evaluate partial predicate on the would-be inserted image + let commit_skip_label = if let Some(where_clause) = &index.where_clause { + let mut where_for_eval = where_clause.as_ref().clone(); + rewrite_partial_index_where(&mut where_for_eval, &insertion)?; + let reg = program.alloc_register(); + translate_expr_no_constant_opt( + &mut program, + Some(&TableReferences::new_empty()), + &where_for_eval, + reg, + &resolver, + NoConstantOptReason::RegisterReuse, + )?; + let lbl = program.allocate_label(); + program.emit_insn(Insn::IfNot { + reg, + target_pc: lbl, + jump_if_null: true, + }); + Some(lbl) + } else { + None + }; + + let num_cols = index.columns.len(); + let idx_start_reg = program.alloc_registers(num_cols + 1); + + // Build [key cols..., rowid] from insertion registers + for (i, idx_col) in index.columns.iter().enumerate() { + let Some(cm) = insertion.get_col_mapping_by_name(&idx_col.name) else { + return Err(crate::LimboError::PlanningError( + "Column not found in INSERT (commit phase)".to_string(), + )); + }; + program.emit_insn(Insn::Copy { + src_reg: cm.register, + dst_reg: idx_start_reg + i, + extra_amount: 0, + }); + } + program.emit_insn(Insn::Copy { + src_reg: insertion.key_register(), + dst_reg: idx_start_reg + num_cols, + extra_amount: 0, + }); + + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: idx_start_reg, + count: num_cols + 1, + dest_reg: record_reg, + index_name: Some(index.name.clone()), + affinity_str: None, + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: idx_cursor_id, + record_reg, + unpacked_start: Some(idx_start_reg), + unpacked_count: Some((num_cols + 1) as u16), + flags: IdxInsertFlags::new().nchange(true), + }); + + if let Some(lbl) = commit_skip_label { + program.resolve_label(lbl, program.offset()); + } + } + } + program.emit_insn(Insn::Insert { cursor: cursor_id, key_reg: insertion.key_register(), @@ -764,6 +867,45 @@ pub fn translate_insert( emit_returning_results(&mut program, &result_columns, &value_registers)?; } + program.emit_insn(Insn::Goto { + target_pc: row_done_label, + }); + + // Normal INSERT path is done above + // Any conflict routed to UPSERT jumps past all that to here: + program.preassign_label_to_next_insn(upsert_entry); + if let (Some(mut upsert), Some(_)) = (upsert_opt.take(), resolved_upsert.clone()) { + // Only DO UPDATE (SET ...); DO NOTHING should have already jumped to row_done_label earlier. + if let UpsertDo::Set { + ref mut sets, + ref mut where_clause, + } = upsert.do_clause + { + // Normalize SET pairs once + let mut rewritten_sets = collect_set_clauses_for_upsert(&table, sets)?; + + emit_upsert( + &mut program, + schema, + &table, + &insertion, + cursor_id, + conflict_rowid_reg, + &mut rewritten_sets, + where_clause, + &resolver, + &idx_cursors, + &mut result_columns, + cdc_table.as_ref().map(|c| c.0), + row_done_label, + )?; + } else { + // UpsertDo::Nothing case + program.emit_insn(Insn::Goto { + target_pc: row_done_label, + }); + } + } if inserting_multiple_rows { if let Some(temp_table_ctx) = temp_table_ctx { @@ -1231,11 +1373,52 @@ fn translate_virtual_table_insert( Ok(program) } +#[inline] +/// Build the UNIQUE constraint error description to match sqlite +/// single column: `t.c1` +/// multi-column: `t.(k, c1)` +pub fn format_unique_violation_desc(table_name: &str, index: &Index) -> String { + if index.columns.len() == 1 { + let mut s = String::with_capacity(table_name.len() + 1 + index.columns[0].name.len()); + s.push_str(table_name); + s.push('.'); + s.push_str(&index.columns[0].name); + s + } else { + let mut s = String::with_capacity(table_name.len() + 3 + 4 * index.columns.len()); + s.push_str(table_name); + s.push_str(".("); + s.push_str( + &index + .columns + .iter() + .map(|c| c.name.as_str()) + .collect::>() + .join(", "), + ); + s.push(')'); + s + } +} + /// Rewrite WHERE clause for partial index to reference insertion registers pub fn rewrite_partial_index_where( expr: &mut ast::Expr, insertion: &Insertion, ) -> crate::Result { + let col_reg = |name: &str| -> Option { + if name.eq_ignore_ascii_case("rowid") { + Some(insertion.key_register()) + } else if let Some(c) = insertion.get_col_mapping_by_name(name) { + if c.column.is_rowid_alias { + Some(insertion.key_register()) + } else { + Some(c.register) + } + } else { + None + } + }; walk_expr_mut( expr, &mut |e: &mut ast::Expr| -> crate::Result { @@ -1243,33 +1426,16 @@ pub fn rewrite_partial_index_where( // NOTE: should not have ANY Expr::Columns bound to the expr Expr::Id(ast::Name::Ident(name)) | Expr::Id(ast::Name::Quoted(name)) => { let normalized = normalize_ident(name.as_str()); - if normalized.eq_ignore_ascii_case("rowid") { - *e = Expr::Register(insertion.key_register()); - } else if let Some(col_mapping) = insertion.get_col_mapping_by_name(&normalized) - { - if col_mapping.column.is_rowid_alias { - *e = Expr::Register(insertion.key_register()); - } else { - *e = Expr::Register(col_mapping.register); - } + if let Some(reg) = col_reg(&normalized) { + *e = Expr::Register(reg); } } Expr::Qualified(_, col) | Expr::DoublyQualified(_, _, col) => { let normalized = normalize_ident(col.as_str()); - if normalized.eq_ignore_ascii_case("rowid") { - *e = Expr::Register(insertion.key_register()); - } else if let Some(col_mapping) = insertion.get_col_mapping_by_name(&normalized) - { - if col_mapping.column.is_rowid_alias { - *e = Expr::Register(insertion.key_register()); - } else { - *e = Expr::Register(col_mapping.register); - } + if let Some(reg) = col_reg(&normalized) { + *e = Expr::Register(reg); } } - Expr::RowId { .. } => { - *e = Expr::Register(insertion.key_register()); - } _ => {} } Ok(WalkControl::Continue)