support multiple conflict clauses in upsert

This commit is contained in:
Nikita Sivukhin
2025-09-29 18:45:13 +04:00
parent 0157ffec7f
commit 3590f9882d
3 changed files with 84 additions and 53 deletions

View File

@@ -23,7 +23,7 @@ use crate::translate::upsert::{
};
use crate::util::normalize_ident;
use crate::vdbe::builder::ProgramBuilderOpts;
use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral};
use crate::vdbe::insn::{CmpInsFlags, IdxInsertFlags, InsertFlags, RegisterOrLiteral};
use crate::vdbe::BranchOffset;
use crate::{
schema::{Column, Schema},
@@ -137,10 +137,11 @@ pub fn translate_insert(
let root_page = btree_table.root_page;
let mut values: Option<Vec<Box<Expr>>> = None;
let mut upsert_opt: Option<Upsert> = None;
let mut upsert_actions: Vec<(ResolvedUpsertTarget, Box<Upsert>)> = Vec::new();
let upsert_matched_idx_reg = program.alloc_register();
let mut inserting_multiple_rows = false;
if let InsertBody::Select(select, upsert) = &mut body {
if let InsertBody::Select(select, upsert_opt) = &mut body {
match &mut select.body.select {
// TODO see how to avoid clone
OneSelect::Values(values_expr) if values_expr.len() <= 1 => {
@@ -177,7 +178,7 @@ pub fn translate_insert(
}
_ => inserting_multiple_rows = true,
}
if let Some(ref mut upsert) = upsert {
while let Some(mut upsert) = upsert_opt.take() {
if let UpsertDo::Set {
ref mut sets,
ref mut where_clause,
@@ -192,6 +193,7 @@ pub fn translate_insert(
&mut program.param_ctx,
BindingBehavior::ResultColumnsNotAllowed,
)?;
tracing::info!("rewrite expr: {:?}", set.expr);
}
if let Some(ref mut where_expr) = where_clause {
bind_and_rewrite_expr(
@@ -204,15 +206,15 @@ pub fn translate_insert(
)?;
}
}
let next = upsert.next.take();
upsert_actions.push((
// resolve the constrained target for UPSERT in the chain
resolve_upsert_target(resolver.schema, &table, &upsert)?,
upsert,
));
*upsert_opt = next;
}
upsert_opt = upsert.as_deref().cloned();
}
// resolve the constrained target for UPSERT if specified
let resolved_upsert = if let Some(upsert) = &upsert_opt {
Some(resolve_upsert_target(resolver.schema, &table, upsert)?)
} else {
None
};
if inserting_multiple_rows && btree_table.has_autoincrement {
ensure_sequence_initialized(&mut program, resolver.schema, &btree_table)?;
@@ -407,7 +409,7 @@ pub fn translate_insert(
)
}
};
let has_upsert = upsert_opt.is_some();
let has_upsert = !upsert_actions.is_empty();
// Set up the program to return result columns if RETURNING is specified
if !result_columns.is_empty() {
@@ -639,21 +641,22 @@ pub fn translate_insert(
// Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint.
// emit Halt for every case *except* when upsert handles the conflict
'emit_halt: {
if let (Some(_), Some(ref target)) = (upsert_opt.as_mut(), resolved_upsert.as_ref()) {
if matches!(
target,
ResolvedUpsertTarget::CatchAll | ResolvedUpsertTarget::PrimaryKey
) {
// PK conflict: the conflicting rowid is exactly the attempted key
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: conflict_rowid_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Goto {
target_pc: upsert_entry,
});
break 'emit_halt;
for (i, (target, _)) in upsert_actions.iter().enumerate() {
match target {
ResolvedUpsertTarget::CatchAll | ResolvedUpsertTarget::PrimaryKey => {
program.emit_int(i as i64, upsert_matched_idx_reg);
// PK conflict: the conflicting rowid is exactly the attempted key
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: conflict_rowid_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Goto {
target_pc: upsert_entry,
});
break 'emit_halt;
}
_ => {}
}
}
let mut description =
@@ -776,22 +779,14 @@ pub fn translate_insert(
});
// Conflict detected, figure out if this UPSERT handles the conflict
let upsert_matches_this_index = if let (Some(_u), Some(ref target)) =
(upsert_opt.as_ref(), resolved_upsert.as_ref())
{
for (i, (target, upsert)) in upsert_actions.iter().enumerate() {
match target {
ResolvedUpsertTarget::CatchAll => true,
ResolvedUpsertTarget::Index(tgt) => Arc::ptr_eq(tgt, index),
// note: PK handled earlier by rowid path; this is a secondary index
ResolvedUpsertTarget::PrimaryKey => false,
ResolvedUpsertTarget::CatchAll => {}
ResolvedUpsertTarget::Index(tgt) if Arc::ptr_eq(tgt, index) => {}
_ => continue,
}
} else {
false
};
if upsert_matches_this_index {
// Distinguish DO NOTHING vs DO UPDATE
match upsert_opt.as_ref().unwrap().do_clause {
program.emit_int(i as i64, upsert_matched_idx_reg);
match &upsert.do_clause {
UpsertDo::Nothing => {
// Bail out without writing anything
program.emit_insn(Insn::Goto {
@@ -809,13 +804,14 @@ pub fn translate_insert(
});
}
}
} else {
// No matching UPSERT handler so we emit constraint error
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_UNIQUE,
description: format_unique_violation_desc(table_name.as_str(), index),
});
break;
}
// No matching UPSERT handler so we emit constraint error
// (if conflict clause matched - VM will jump to later instructions and skip halt)
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_UNIQUE,
description: format_unique_violation_desc(table_name.as_str(), index),
});
// continue preflight with next constraint
program.preassign_label_to_next_insn(next_check);
@@ -1085,11 +1081,12 @@ pub fn translate_insert(
target_pc: row_done_label,
});
// Normal INSERT path is done above
// Any conflict routed to UPSERT jumps past all that to here:
program.preassign_label_to_next_insn(upsert_entry);
if let (Some(mut upsert), Some(_)) = (upsert_opt.take(), resolved_upsert.clone()) {
// Only DO UPDATE (SET ...); DO NOTHING should have already jumped to row_done_label earlier.
let mut upsert_action_labels = Vec::new();
for (_, mut upsert) in upsert_actions {
let label = program.allocate_label();
upsert_action_labels.push(label);
program.preassign_label_to_next_insn(label);
if let UpsertDo::Set {
ref mut sets,
ref mut where_clause,
@@ -1120,6 +1117,24 @@ pub fn translate_insert(
}
}
// Normal INSERT path is done above
// Any conflict routed to UPSERT jumps past all that to here:
program.preassign_label_to_next_insn(upsert_entry);
for (i, label) in upsert_action_labels.iter().enumerate() {
let upsert_action_id = program.alloc_register();
program.emit_int(i as i64, upsert_action_id);
program.mark_last_insn_constant();
program.emit_insn(Insn::Eq {
lhs: upsert_matched_idx_reg,
rhs: upsert_action_id,
target_pc: *label,
flags: CmpInsFlags::default(),
collation: None,
});
}
if inserting_multiple_rows {
if let Some(temp_table_ctx) = temp_table_ctx {
program.resolve_label(row_done_label, program.offset());

View File

@@ -274,7 +274,7 @@ pub fn upsert_matches_index(upsert: &Upsert, index: &Index, table: &Table) -> bo
idx_cols.is_empty()
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum ResolvedUpsertTarget {
// ON CONFLICT DO
CatchAll,

16
testing/upsert.test Normal file → Executable file
View File

@@ -201,6 +201,22 @@ do_execsql_test_in_memory_any_error upsert-composite-target-too-few {
ON CONFLICT(a) DO UPDATE SET val = excluded.val; -- only "a" given → no match → error
}
# Composite index requires exact coverage, targeting too few columns must not match.
do_execsql_test_on_specific_db {:memory:} upsert-multilple-conflict-targets {
CREATE TABLE ct (id INTEGER PRIMARY KEY, x UNIQUE, y UNIQUE, z DEFAULT NULL);
INSERT INTO ct(id, x, y) VALUES (1, 'x', 'y');
INSERT INTO ct(id, x, y) VALUES (2, 'a', 'b');
INSERT INTO ct(id, x, y) VALUES (3, '!', '@');
INSERT INTO ct(id, x, y) VALUES (4, 'x', 'y1'), (5, 'a1', 'b'), (3, '_', '_')
ON CONFLICT(x) DO UPDATE SET x = excluded.x || '-' || x, y = excluded.y || '@' || y, z = 'x'
ON CONFLICT(y) DO UPDATE SET x = excluded.x || '+' || x, y = excluded.y || '!' || y, z = 'y'
ON CONFLICT DO UPDATE SET x = excluded.x || '#' || x, y = excluded.y || '%' || y, z = 'fallback';
SELECT * FROM ct;
} {1|x-x|y1@y|x
2|a1+a|b!b|y
3|_#!|_%@|fallback
}
# Qualified target (t.a) should match unique index on a.
do_execsql_test_on_specific_db {:memory:} upsert-qualified-target {
CREATE TABLE qt (a UNIQUE, b);