fix upsert conflict handling

This commit is contained in:
Nikita Sivukhin
2025-09-30 22:39:55 +04:00
parent 73f68dfcfb
commit 9ef05adc5e
4 changed files with 279 additions and 216 deletions

View File

@@ -138,7 +138,6 @@ pub fn translate_insert(
let mut values: Option<Vec<Box<Expr>>> = None; let mut values: Option<Vec<Box<Expr>>> = None;
let mut upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)> = Vec::new(); let mut upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)> = Vec::new();
let upsert_matched_idx_reg = program.alloc_register();
let mut inserting_multiple_rows = false; let mut inserting_multiple_rows = false;
if let InsertBody::Select(select, upsert_opt) = &mut body { if let InsertBody::Select(select, upsert_opt) = &mut body {
@@ -625,50 +624,6 @@ pub fn translate_insert(
program.preassign_label_to_next_insn(key_ready_for_uniqueness_check_label); program.preassign_label_to_next_insn(key_ready_for_uniqueness_check_label);
// Check uniqueness constraint for rowid if it was provided by user.
// When the DB allocates it there are no need for separate uniqueness checks.
if has_user_provided_rowid {
let make_record_label = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: cursor_id,
rowid_reg: insertion.key_register(),
target_pc: make_record_label,
});
let rowid_column_name = insertion.key.column_name();
// Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint.
// emit Halt for every case *except* when upsert handles the conflict
'emit_halt: {
for (i, (target, label, _)) in upsert_actions.iter().enumerate() {
match target {
ResolvedUpsertTarget::CatchAll | ResolvedUpsertTarget::PrimaryKey => {
program.emit_int(i as i64, upsert_matched_idx_reg);
// PK conflict: the conflicting rowid is exactly the attempted key
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: conflict_rowid_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Goto { target_pc: *label });
break 'emit_halt;
}
_ => {}
}
}
let mut description =
String::with_capacity(table_name.as_str().len() + rowid_column_name.len() + 2);
description.push_str(table_name.as_str());
description.push('.');
description.push_str(rowid_column_name);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
});
}
program.preassign_label_to_next_insn(make_record_label);
}
match table.btree() { match table.btree() {
Some(t) if t.is_strict => { Some(t) if t.is_strict => {
program.emit_insn(Insn::TypeCheck { program.emit_insn(Insn::TypeCheck {
@@ -681,6 +636,36 @@ pub fn translate_insert(
_ => (), _ => (),
} }
let mut constraints_to_check = Vec::new();
if has_user_provided_rowid {
// Check uniqueness constraint for rowid if it was provided by user.
// When the DB allocates it there are no need for separate uniqueness checks.
let position = upsert_actions
.iter()
.position(|(target, ..)| matches!(target, ResolvedUpsertTarget::PrimaryKey));
constraints_to_check.push((ResolvedUpsertTarget::PrimaryKey, position));
}
for index in resolver.schema.get_indices(table_name.as_str()) {
let position = upsert_actions
.iter()
.position(|(target, ..)| matches!(target, ResolvedUpsertTarget::Index(x) if Arc::ptr_eq(&x, index)));
constraints_to_check.push((ResolvedUpsertTarget::Index(index.clone()), position));
}
constraints_to_check.sort_by(|(_, p1), (_, p2)| match (p1, p2) {
(Some(p1), Some(p2)) => p1.cmp(p2),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => std::cmp::Ordering::Equal,
});
let upsert_catch_all_position =
if let Some((ResolvedUpsertTarget::CatchAll, ..)) = upsert_actions.last() {
Some(upsert_actions.len() - 1)
} else {
None
};
// We need to separate index handling and insertion into a `preflight` and a // We need to separate index handling and insertion into a `preflight` and a
// `commit` phase, because in UPSERT mode we might need to skip the actual insertion, as we can // `commit` phase, because in UPSERT mode we might need to skip the actual insertion, as we can
// have a naked ON CONFLICT DO NOTHING, so if we eagerly insert any indexes, we could insert // have a naked ON CONFLICT DO NOTHING, so if we eagerly insert any indexes, we could insert
@@ -693,7 +678,46 @@ pub fn translate_insert(
// DO UPDATE (matching target) -> fetch conflicting rowid and jump to `upsert_entry`. // DO UPDATE (matching target) -> fetch conflicting rowid and jump to `upsert_entry`.
// //
// otherwise, raise SQLITE_CONSTRAINT_UNIQUE // otherwise, raise SQLITE_CONSTRAINT_UNIQUE
for index in resolver.schema.get_indices(table_name.as_str()) { for (constraint, position) in constraints_to_check {
match constraint {
ResolvedUpsertTarget::PrimaryKey => {
let make_record_label = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: cursor_id,
rowid_reg: insertion.key_register(),
target_pc: make_record_label,
});
let rowid_column_name = insertion.key.column_name();
// Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint.
// emit Halt for every case *except* when upsert handles the conflict
'emit_halt: {
if let Some(position) = position.or(upsert_catch_all_position) {
// PK conflict: the conflicting rowid is exactly the attempted key
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: conflict_rowid_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Goto {
target_pc: upsert_actions[position].1,
});
break 'emit_halt;
}
let mut description = String::with_capacity(
table_name.as_str().len() + rowid_column_name.len() + 2,
);
description.push_str(table_name.as_str());
description.push('.');
description.push_str(rowid_column_name);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
});
}
program.preassign_label_to_next_insn(make_record_label);
}
ResolvedUpsertTarget::Index(index) => {
let column_mappings = index let column_mappings = index
.columns .columns
.iter() .iter()
@@ -776,14 +800,8 @@ pub fn translate_insert(
}); });
// Conflict detected, figure out if this UPSERT handles the conflict // Conflict detected, figure out if this UPSERT handles the conflict
for (i, (target, label, upsert)) in upsert_actions.iter().enumerate() { if let Some(position) = position.or(upsert_catch_all_position) {
match target { match &upsert_actions[position].2.do_clause {
ResolvedUpsertTarget::CatchAll => {}
ResolvedUpsertTarget::Index(tgt) if Arc::ptr_eq(tgt, index) => {}
_ => continue,
}
program.emit_int(i as i64, upsert_matched_idx_reg);
match &upsert.do_clause {
UpsertDo::Nothing => { UpsertDo::Nothing => {
// Bail out without writing anything // Bail out without writing anything
program.emit_insn(Insn::Goto { program.emit_insn(Insn::Goto {
@@ -796,16 +814,17 @@ pub fn translate_insert(
cursor_id: idx_cursor_id, cursor_id: idx_cursor_id,
dest: conflict_rowid_reg, dest: conflict_rowid_reg,
}); });
program.emit_insn(Insn::Goto { target_pc: *label }); program.emit_insn(Insn::Goto {
target_pc: upsert_actions[position].1,
});
} }
} }
break;
} }
// No matching UPSERT handler so we emit constraint error // No matching UPSERT handler so we emit constraint error
// (if conflict clause matched - VM will jump to later instructions and skip halt) // (if conflict clause matched - VM will jump to later instructions and skip halt)
program.emit_insn(Insn::Halt { program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_UNIQUE, err_code: SQLITE_CONSTRAINT_UNIQUE,
description: format_unique_violation_desc(table_name.as_str(), index), description: format_unique_violation_desc(table_name.as_str(), &index),
}); });
// continue preflight with next constraint // continue preflight with next constraint
@@ -822,7 +841,7 @@ pub fn translate_insert(
// Unique violation without ON CONFLICT clause -> error // Unique violation without ON CONFLICT clause -> error
program.emit_insn(Insn::Halt { program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_UNIQUE, err_code: SQLITE_CONSTRAINT_UNIQUE,
description: format_unique_violation_desc(table_name.as_str(), index), description: format_unique_violation_desc(table_name.as_str(), &index),
}); });
program.preassign_label_to_next_insn(ok); program.preassign_label_to_next_insn(ok);
@@ -870,6 +889,10 @@ pub fn translate_insert(
program.resolve_label(lbl, program.offset()); program.resolve_label(lbl, program.offset());
} }
} }
ResolvedUpsertTarget::CatchAll => unreachable!(),
}
}
for column_mapping in insertion for column_mapping in insertion
.col_mappings .col_mappings
.iter() .iter()

View File

@@ -353,6 +353,25 @@ do_execsql_test_on_specific_db {:memory:} upsert-doubly-qualified-target {
SELECT * FROM dq; SELECT * FROM dq;
} {1|new} } {1|new}
do_execsql_test_on_specific_db {:memory:} upsert-targets-chain {
CREATE TABLE dq (a UNIQUE, b UNIQUE, c UNIQUE, value TEXT);
CREATE UNIQUE INDEX dq_ab ON dq(a, b);
INSERT INTO dq VALUES ('a1', 'a2', 'a3', 'aaa');
INSERT INTO dq VALUES ('b1', 'b2', 'b3', 'bbb');
INSERT INTO dq VALUES ('c1', 'c2', 'c3', 'ccc');
INSERT INTO dq VALUES ('d1', 'd2', 'd3', 'ddd');
INSERT INTO dq VALUES
('a1', 'a2', 'a3', 'upd1'), ('b1', 'b1', 'b1', 'upd2'), ('c2', 'c2', 'c2', 'upd3'), ('d3', 'd3', 'd3', 'upd4')
ON CONFLICT (a, b) DO UPDATE SET value = excluded.value || '-a'
ON CONFLICT (a) DO UPDATE SET value = excluded.value || '-b'
ON CONFLICT (b) DO UPDATE SET value = excluded.value || '-c'
ON CONFLICT DO UPDATE SET value = excluded.value || '-d';
SELECT * FROM dq;
} {a1|a2|a3|upd1-a
b1|b2|b3|upd2-b
c1|c2|c3|upd3-c
d1|d2|d3|upd4-d}
# https://github.com/tursodatabase/turso/issues/3384 # https://github.com/tursodatabase/turso/issues/3384
do_execsql_test_on_specific_db {:memory:} upsert-non-rowid-pk-target { do_execsql_test_on_specific_db {:memory:} upsert-non-rowid-pk-target {
create table phonebook(name text primary key, phonenumber text, validDate date); create table phonebook(name text primary key, phonenumber text, validDate date);

View File

@@ -672,12 +672,12 @@ mod tests {
#[test] #[test]
pub fn partial_index_mutation_and_upsert_fuzz() { pub fn partial_index_mutation_and_upsert_fuzz() {
index_mutation_upsert_fuzz(1.0, 1); index_mutation_upsert_fuzz(1.0, 4);
} }
#[test] #[test]
pub fn simple_index_mutation_and_upsert_fuzz() { pub fn simple_index_mutation_and_upsert_fuzz() {
index_mutation_upsert_fuzz(0.0, 0); index_mutation_upsert_fuzz(0.0, 4);
} }
fn index_mutation_upsert_fuzz(partial_index_prob: f64, conflict_chain_max_len: u32) { fn index_mutation_upsert_fuzz(partial_index_prob: f64, conflict_chain_max_len: u32) {

View File

@@ -839,3 +839,24 @@ pub fn delete_eq_correct() {
limbo_exec_rows(&limbo, &conn, "SELECT * FROM t ORDER BY id") limbo_exec_rows(&limbo, &conn, "SELECT * FROM t ORDER BY id")
); );
} }
#[test]
pub fn upsert_conflict() {
let limbo = TempDatabase::new_empty(true);
let conn = limbo.db.connect().unwrap();
for sql in [
"CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, c INT UNIQUE, value INT);",
"INSERT INTO t VALUES (1, 2, 100);",
"INSERT INTO t VALUES (1, 2, 0) ON CONFLICT (c) DO UPDATE SET value = 42;",
] {
conn.execute(sql).unwrap();
}
assert_eq!(
vec![vec![
rusqlite::types::Value::Integer(1),
rusqlite::types::Value::Integer(2),
rusqlite::types::Value::Integer(42),
]],
limbo_exec_rows(&limbo, &conn, "SELECT * FROM t")
);
}