mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
Merge 'support multiple conflict clauses in upsert' from Nikita Sivukhin
This PR implements support for `ON CONFLICT` clause chain, e.g. ``` INSERT INTO ct(id, x, y) VALUES (4, 'x', 'y1'), (5, 'a1', 'b'), (3, '_', '_') ON CONFLICT(x) DO UPDATE SET x = excluded.x || '-' || x, y = excluded.y || '@' || y, z = 'x' ON CONFLICT(y) DO UPDATE SET x = excluded.x || '+' || x, y = excluded.y || '!' || y, z = 'y' ON CONFLICT DO UPDATE SET x = excluded.x || '#' || x, y = excluded.y || '%' || y, z = 'fallback'; ``` Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com> Reviewed-by: Preston Thorpe <preston@turso.tech> Closes #3453
This commit is contained in:
@@ -136,6 +136,37 @@ test('offset-bug', async () => {
|
||||
expect(await stmt.all(1, 1)).toEqual([{ id: 2, name: 'John1', verified: 0 }])
|
||||
})
|
||||
|
||||
test('conflict-bug', async () => {
|
||||
const db = await connect(':memory:');
|
||||
|
||||
const create = db.prepare(`create table "conflict_chain_example" (
|
||||
id integer not null unique,
|
||||
name text not null,
|
||||
email text not null,
|
||||
primary key (id, name)
|
||||
)`);
|
||||
await create.run();
|
||||
|
||||
await db.prepare(`insert into "conflict_chain_example" ("id", "name", "email") values (?, ?, ?), (?, ?, ?)`).run(
|
||||
1,
|
||||
'John',
|
||||
'john@example.com',
|
||||
2,
|
||||
'John Second',
|
||||
'2john@example.com',
|
||||
);
|
||||
|
||||
const insert = db.prepare(
|
||||
`insert into "conflict_chain_example" ("id", "name", "email") values (?, ?, ?), (?, ?, ?) on conflict ("conflict_chain_example"."id", "conflict_chain_example"."name") do update set "email" = ? on conflict ("conflict_chain_example"."id") do nothing`,
|
||||
);
|
||||
await insert.run(1, 'John', 'john@example.com', 2, 'Anthony', 'idthief@example.com', 'john1@example.com');
|
||||
|
||||
expect(await db.prepare("SELECT * FROM conflict_chain_example").all()).toEqual([
|
||||
{ id: 1, name: 'John', email: 'john1@example.com' },
|
||||
{ id: 2, name: 'John Second', email: '2john@example.com' }
|
||||
]);
|
||||
})
|
||||
|
||||
test('on-disk db', async () => {
|
||||
const path = `test-${(Math.random() * 10000) | 0}.db`;
|
||||
try {
|
||||
|
||||
@@ -137,10 +137,10 @@ pub fn translate_insert(
|
||||
let root_page = btree_table.root_page;
|
||||
|
||||
let mut values: Option<Vec<Box<Expr>>> = None;
|
||||
let mut upsert_opt: Option<Upsert> = None;
|
||||
let mut upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)> = Vec::new();
|
||||
|
||||
let mut inserting_multiple_rows = false;
|
||||
if let InsertBody::Select(select, upsert) = &mut body {
|
||||
if let InsertBody::Select(select, upsert_opt) = &mut body {
|
||||
match &mut select.body.select {
|
||||
// TODO see how to avoid clone
|
||||
OneSelect::Values(values_expr) if values_expr.len() <= 1 => {
|
||||
@@ -177,7 +177,7 @@ pub fn translate_insert(
|
||||
}
|
||||
_ => inserting_multiple_rows = true,
|
||||
}
|
||||
if let Some(ref mut upsert) = upsert {
|
||||
while let Some(mut upsert) = upsert_opt.take() {
|
||||
if let UpsertDo::Set {
|
||||
ref mut sets,
|
||||
ref mut where_clause,
|
||||
@@ -204,15 +204,16 @@ pub fn translate_insert(
|
||||
)?;
|
||||
}
|
||||
}
|
||||
let next = upsert.next.take();
|
||||
upsert_actions.push((
|
||||
// resolve the constrained target for UPSERT in the chain
|
||||
resolve_upsert_target(resolver.schema, &table, &upsert)?,
|
||||
program.allocate_label(),
|
||||
upsert,
|
||||
));
|
||||
*upsert_opt = next;
|
||||
}
|
||||
upsert_opt = upsert.as_deref().cloned();
|
||||
}
|
||||
// resolve the constrained target for UPSERT if specified
|
||||
let resolved_upsert = if let Some(upsert) = &upsert_opt {
|
||||
Some(resolve_upsert_target(resolver.schema, &table, upsert)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if inserting_multiple_rows && btree_table.has_autoincrement {
|
||||
ensure_sequence_initialized(&mut program, resolver.schema, &btree_table)?;
|
||||
@@ -407,7 +408,7 @@ pub fn translate_insert(
|
||||
)
|
||||
}
|
||||
};
|
||||
let has_upsert = upsert_opt.is_some();
|
||||
let has_upsert = !upsert_actions.is_empty();
|
||||
|
||||
// Set up the program to return result columns if RETURNING is specified
|
||||
if !result_columns.is_empty() {
|
||||
@@ -430,7 +431,6 @@ pub fn translate_insert(
|
||||
|
||||
let insertion = build_insertion(&mut program, &table, &columns, num_values)?;
|
||||
|
||||
let upsert_entry = program.allocate_label();
|
||||
let conflict_rowid_reg = program.alloc_register();
|
||||
|
||||
if inserting_multiple_rows {
|
||||
@@ -624,51 +624,6 @@ pub fn translate_insert(
|
||||
|
||||
program.preassign_label_to_next_insn(key_ready_for_uniqueness_check_label);
|
||||
|
||||
// Check uniqueness constraint for rowid if it was provided by user.
|
||||
// When the DB allocates it there are no need for separate uniqueness checks.
|
||||
|
||||
if has_user_provided_rowid {
|
||||
let make_record_label = program.allocate_label();
|
||||
program.emit_insn(Insn::NotExists {
|
||||
cursor: cursor_id,
|
||||
rowid_reg: insertion.key_register(),
|
||||
target_pc: make_record_label,
|
||||
});
|
||||
let rowid_column_name = insertion.key.column_name();
|
||||
|
||||
// Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint.
|
||||
// emit Halt for every case *except* when upsert handles the conflict
|
||||
'emit_halt: {
|
||||
if let (Some(_), Some(ref target)) = (upsert_opt.as_mut(), resolved_upsert.as_ref()) {
|
||||
if matches!(
|
||||
target,
|
||||
ResolvedUpsertTarget::CatchAll | ResolvedUpsertTarget::PrimaryKey
|
||||
) {
|
||||
// PK conflict: the conflicting rowid is exactly the attempted key
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: insertion.key_register(),
|
||||
dst_reg: conflict_rowid_reg,
|
||||
extra_amount: 0,
|
||||
});
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: upsert_entry,
|
||||
});
|
||||
break 'emit_halt;
|
||||
}
|
||||
}
|
||||
let mut description =
|
||||
String::with_capacity(table_name.as_str().len() + rowid_column_name.len() + 2);
|
||||
description.push_str(table_name.as_str());
|
||||
description.push('.');
|
||||
description.push_str(rowid_column_name);
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
|
||||
description,
|
||||
});
|
||||
}
|
||||
program.preassign_label_to_next_insn(make_record_label);
|
||||
}
|
||||
|
||||
match table.btree() {
|
||||
Some(t) if t.is_strict => {
|
||||
program.emit_insn(Insn::TypeCheck {
|
||||
@@ -681,6 +636,36 @@ pub fn translate_insert(
|
||||
_ => (),
|
||||
}
|
||||
|
||||
let mut constraints_to_check = Vec::new();
|
||||
if has_user_provided_rowid {
|
||||
// Check uniqueness constraint for rowid if it was provided by user.
|
||||
// When the DB allocates it there are no need for separate uniqueness checks.
|
||||
let position = upsert_actions
|
||||
.iter()
|
||||
.position(|(target, ..)| matches!(target, ResolvedUpsertTarget::PrimaryKey));
|
||||
constraints_to_check.push((ResolvedUpsertTarget::PrimaryKey, position));
|
||||
}
|
||||
for index in resolver.schema.get_indices(table_name.as_str()) {
|
||||
let position = upsert_actions
|
||||
.iter()
|
||||
.position(|(target, ..)| matches!(target, ResolvedUpsertTarget::Index(x) if Arc::ptr_eq(x, index)));
|
||||
constraints_to_check.push((ResolvedUpsertTarget::Index(index.clone()), position));
|
||||
}
|
||||
|
||||
constraints_to_check.sort_by(|(_, p1), (_, p2)| match (p1, p2) {
|
||||
(Some(p1), Some(p2)) => p1.cmp(p2),
|
||||
(Some(_), None) => std::cmp::Ordering::Less,
|
||||
(None, Some(_)) => std::cmp::Ordering::Greater,
|
||||
(None, None) => std::cmp::Ordering::Equal,
|
||||
});
|
||||
|
||||
let upsert_catch_all_position =
|
||||
if let Some((ResolvedUpsertTarget::CatchAll, ..)) = upsert_actions.last() {
|
||||
Some(upsert_actions.len() - 1)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// We need to separate index handling and insertion into a `preflight` and a
|
||||
// `commit` phase, because in UPSERT mode we might need to skip the actual insertion, as we can
|
||||
// have a naked ON CONFLICT DO NOTHING, so if we eagerly insert any indexes, we could insert
|
||||
@@ -693,192 +678,221 @@ pub fn translate_insert(
|
||||
// DO UPDATE (matching target) -> fetch conflicting rowid and jump to `upsert_entry`.
|
||||
//
|
||||
// otherwise, raise SQLITE_CONSTRAINT_UNIQUE
|
||||
for index in resolver.schema.get_indices(table_name.as_str()) {
|
||||
let column_mappings = index
|
||||
.columns
|
||||
.iter()
|
||||
.map(|idx_col| insertion.get_col_mapping_by_name(&idx_col.name));
|
||||
// find which cursor we opened earlier for this index
|
||||
let idx_cursor_id = idx_cursors
|
||||
.iter()
|
||||
.find(|(name, _, _)| *name == &index.name)
|
||||
.map(|(_, _, c_id)| *c_id)
|
||||
.expect("no cursor found for index");
|
||||
|
||||
let maybe_skip_probe_label = if let Some(where_clause) = &index.where_clause {
|
||||
let mut where_for_eval = where_clause.as_ref().clone();
|
||||
rewrite_partial_index_where(&mut where_for_eval, &insertion)?;
|
||||
let reg = program.alloc_register();
|
||||
translate_expr_no_constant_opt(
|
||||
&mut program,
|
||||
Some(&TableReferences::new_empty()),
|
||||
&where_for_eval,
|
||||
reg,
|
||||
resolver,
|
||||
NoConstantOptReason::RegisterReuse,
|
||||
)?;
|
||||
let lbl = program.allocate_label();
|
||||
program.emit_insn(Insn::IfNot {
|
||||
reg,
|
||||
target_pc: lbl,
|
||||
jump_if_null: true,
|
||||
});
|
||||
Some(lbl)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let num_cols = index.columns.len();
|
||||
// allocate scratch registers for the index columns plus rowid
|
||||
let idx_start_reg = program.alloc_registers(num_cols + 1);
|
||||
|
||||
// build unpacked key [idx_start_reg .. idx_start_reg+num_cols-1], and rowid in last reg,
|
||||
// copy each index column from the table's column registers into these scratch regs
|
||||
for (i, column_mapping) in column_mappings.clone().enumerate() {
|
||||
// copy from the table's column register over to the index's scratch register
|
||||
let Some(col_mapping) = column_mapping else {
|
||||
return Err(crate::LimboError::PlanningError(
|
||||
"Column not found in INSERT".to_string(),
|
||||
));
|
||||
};
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: col_mapping.register,
|
||||
dst_reg: idx_start_reg + i,
|
||||
extra_amount: 0,
|
||||
});
|
||||
}
|
||||
// last register is the rowid
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: insertion.key_register(),
|
||||
dst_reg: idx_start_reg + num_cols,
|
||||
extra_amount: 0,
|
||||
});
|
||||
|
||||
if index.unique {
|
||||
let aff = index
|
||||
.columns
|
||||
.iter()
|
||||
.map(|ic| table.columns()[ic.pos_in_table].affinity().aff_mask())
|
||||
.collect::<String>();
|
||||
program.emit_insn(Insn::Affinity {
|
||||
start_reg: idx_start_reg,
|
||||
count: NonZeroUsize::new(num_cols).expect("nonzero col count"),
|
||||
affinities: aff,
|
||||
});
|
||||
|
||||
if has_upsert {
|
||||
let next_check = program.allocate_label();
|
||||
program.emit_insn(Insn::NoConflict {
|
||||
cursor_id: idx_cursor_id,
|
||||
target_pc: next_check,
|
||||
record_reg: idx_start_reg,
|
||||
num_regs: num_cols,
|
||||
for (constraint, position) in constraints_to_check {
|
||||
match constraint {
|
||||
ResolvedUpsertTarget::PrimaryKey => {
|
||||
let make_record_label = program.allocate_label();
|
||||
program.emit_insn(Insn::NotExists {
|
||||
cursor: cursor_id,
|
||||
rowid_reg: insertion.key_register(),
|
||||
target_pc: make_record_label,
|
||||
});
|
||||
let rowid_column_name = insertion.key.column_name();
|
||||
|
||||
// Conflict detected, figure out if this UPSERT handles the conflict
|
||||
let upsert_matches_this_index = if let (Some(_u), Some(ref target)) =
|
||||
(upsert_opt.as_ref(), resolved_upsert.as_ref())
|
||||
{
|
||||
match target {
|
||||
ResolvedUpsertTarget::CatchAll => true,
|
||||
ResolvedUpsertTarget::Index(tgt) => Arc::ptr_eq(tgt, index),
|
||||
// note: PK handled earlier by rowid path; this is a secondary index
|
||||
ResolvedUpsertTarget::PrimaryKey => false,
|
||||
// Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint.
|
||||
// emit Halt for every case *except* when upsert handles the conflict
|
||||
'emit_halt: {
|
||||
if let Some(position) = position.or(upsert_catch_all_position) {
|
||||
// PK conflict: the conflicting rowid is exactly the attempted key
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: insertion.key_register(),
|
||||
dst_reg: conflict_rowid_reg,
|
||||
extra_amount: 0,
|
||||
});
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: upsert_actions[position].1,
|
||||
});
|
||||
break 'emit_halt;
|
||||
}
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if upsert_matches_this_index {
|
||||
// Distinguish DO NOTHING vs DO UPDATE
|
||||
match upsert_opt.as_ref().unwrap().do_clause {
|
||||
UpsertDo::Nothing => {
|
||||
// Bail out without writing anything
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: row_done_label,
|
||||
});
|
||||
}
|
||||
UpsertDo::Set { .. } => {
|
||||
// Route to DO UPDATE: capture conflicting rowid then jump
|
||||
program.emit_insn(Insn::IdxRowId {
|
||||
cursor_id: idx_cursor_id,
|
||||
dest: conflict_rowid_reg,
|
||||
});
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: upsert_entry,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No matching UPSERT handler so we emit constraint error
|
||||
let mut description = String::with_capacity(
|
||||
table_name.as_str().len() + rowid_column_name.len() + 2,
|
||||
);
|
||||
description.push_str(table_name.as_str());
|
||||
description.push('.');
|
||||
description.push_str(rowid_column_name);
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: SQLITE_CONSTRAINT_UNIQUE,
|
||||
description: format_unique_violation_desc(table_name.as_str(), index),
|
||||
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
|
||||
description,
|
||||
});
|
||||
}
|
||||
|
||||
// continue preflight with next constraint
|
||||
program.preassign_label_to_next_insn(next_check);
|
||||
} else {
|
||||
// No UPSERT fast-path: probe and immediately insert
|
||||
let ok = program.allocate_label();
|
||||
program.emit_insn(Insn::NoConflict {
|
||||
cursor_id: idx_cursor_id,
|
||||
target_pc: ok,
|
||||
record_reg: idx_start_reg,
|
||||
num_regs: num_cols,
|
||||
});
|
||||
// Unique violation without ON CONFLICT clause -> error
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: SQLITE_CONSTRAINT_UNIQUE,
|
||||
description: format_unique_violation_desc(table_name.as_str(), index),
|
||||
});
|
||||
program.preassign_label_to_next_insn(ok);
|
||||
|
||||
// In the non-UPSERT case, we insert the index
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: idx_start_reg,
|
||||
count: num_cols + 1,
|
||||
dest_reg: record_reg,
|
||||
index_name: Some(index.name.clone()),
|
||||
affinity_str: None,
|
||||
});
|
||||
program.emit_insn(Insn::IdxInsert {
|
||||
cursor_id: idx_cursor_id,
|
||||
record_reg,
|
||||
unpacked_start: Some(idx_start_reg),
|
||||
unpacked_count: Some((num_cols + 1) as u16),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
program.preassign_label_to_next_insn(make_record_label);
|
||||
}
|
||||
} else {
|
||||
// Non-unique index: in UPSERT mode we postpone writes to commit phase.
|
||||
if !has_upsert {
|
||||
// eager insert for non-unique, no UPSERT
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: idx_start_reg,
|
||||
count: num_cols + 1,
|
||||
dest_reg: record_reg,
|
||||
index_name: Some(index.name.clone()),
|
||||
affinity_str: None,
|
||||
});
|
||||
program.emit_insn(Insn::IdxInsert {
|
||||
cursor_id: idx_cursor_id,
|
||||
record_reg,
|
||||
unpacked_start: Some(idx_start_reg),
|
||||
unpacked_count: Some((num_cols + 1) as u16),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
}
|
||||
}
|
||||
ResolvedUpsertTarget::Index(index) => {
|
||||
let column_mappings = index
|
||||
.columns
|
||||
.iter()
|
||||
.map(|idx_col| insertion.get_col_mapping_by_name(&idx_col.name));
|
||||
// find which cursor we opened earlier for this index
|
||||
let idx_cursor_id = idx_cursors
|
||||
.iter()
|
||||
.find(|(name, _, _)| *name == &index.name)
|
||||
.map(|(_, _, c_id)| *c_id)
|
||||
.expect("no cursor found for index");
|
||||
|
||||
// Close the partial-index skip (preflight)
|
||||
if let Some(lbl) = maybe_skip_probe_label {
|
||||
program.resolve_label(lbl, program.offset());
|
||||
let maybe_skip_probe_label = if let Some(where_clause) = &index.where_clause {
|
||||
let mut where_for_eval = where_clause.as_ref().clone();
|
||||
rewrite_partial_index_where(&mut where_for_eval, &insertion)?;
|
||||
let reg = program.alloc_register();
|
||||
translate_expr_no_constant_opt(
|
||||
&mut program,
|
||||
Some(&TableReferences::new_empty()),
|
||||
&where_for_eval,
|
||||
reg,
|
||||
resolver,
|
||||
NoConstantOptReason::RegisterReuse,
|
||||
)?;
|
||||
let lbl = program.allocate_label();
|
||||
program.emit_insn(Insn::IfNot {
|
||||
reg,
|
||||
target_pc: lbl,
|
||||
jump_if_null: true,
|
||||
});
|
||||
Some(lbl)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let num_cols = index.columns.len();
|
||||
// allocate scratch registers for the index columns plus rowid
|
||||
let idx_start_reg = program.alloc_registers(num_cols + 1);
|
||||
|
||||
// build unpacked key [idx_start_reg .. idx_start_reg+num_cols-1], and rowid in last reg,
|
||||
// copy each index column from the table's column registers into these scratch regs
|
||||
for (i, column_mapping) in column_mappings.clone().enumerate() {
|
||||
// copy from the table's column register over to the index's scratch register
|
||||
let Some(col_mapping) = column_mapping else {
|
||||
return Err(crate::LimboError::PlanningError(
|
||||
"Column not found in INSERT".to_string(),
|
||||
));
|
||||
};
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: col_mapping.register,
|
||||
dst_reg: idx_start_reg + i,
|
||||
extra_amount: 0,
|
||||
});
|
||||
}
|
||||
// last register is the rowid
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: insertion.key_register(),
|
||||
dst_reg: idx_start_reg + num_cols,
|
||||
extra_amount: 0,
|
||||
});
|
||||
|
||||
if index.unique {
|
||||
let aff = index
|
||||
.columns
|
||||
.iter()
|
||||
.map(|ic| table.columns()[ic.pos_in_table].affinity().aff_mask())
|
||||
.collect::<String>();
|
||||
program.emit_insn(Insn::Affinity {
|
||||
start_reg: idx_start_reg,
|
||||
count: NonZeroUsize::new(num_cols).expect("nonzero col count"),
|
||||
affinities: aff,
|
||||
});
|
||||
|
||||
if has_upsert {
|
||||
let next_check = program.allocate_label();
|
||||
program.emit_insn(Insn::NoConflict {
|
||||
cursor_id: idx_cursor_id,
|
||||
target_pc: next_check,
|
||||
record_reg: idx_start_reg,
|
||||
num_regs: num_cols,
|
||||
});
|
||||
|
||||
// Conflict detected, figure out if this UPSERT handles the conflict
|
||||
if let Some(position) = position.or(upsert_catch_all_position) {
|
||||
match &upsert_actions[position].2.do_clause {
|
||||
UpsertDo::Nothing => {
|
||||
// Bail out without writing anything
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: row_done_label,
|
||||
});
|
||||
}
|
||||
UpsertDo::Set { .. } => {
|
||||
// Route to DO UPDATE: capture conflicting rowid then jump
|
||||
program.emit_insn(Insn::IdxRowId {
|
||||
cursor_id: idx_cursor_id,
|
||||
dest: conflict_rowid_reg,
|
||||
});
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: upsert_actions[position].1,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
// No matching UPSERT handler so we emit constraint error
|
||||
// (if conflict clause matched - VM will jump to later instructions and skip halt)
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: SQLITE_CONSTRAINT_UNIQUE,
|
||||
description: format_unique_violation_desc(table_name.as_str(), &index),
|
||||
});
|
||||
|
||||
// continue preflight with next constraint
|
||||
program.preassign_label_to_next_insn(next_check);
|
||||
} else {
|
||||
// No UPSERT fast-path: probe and immediately insert
|
||||
let ok = program.allocate_label();
|
||||
program.emit_insn(Insn::NoConflict {
|
||||
cursor_id: idx_cursor_id,
|
||||
target_pc: ok,
|
||||
record_reg: idx_start_reg,
|
||||
num_regs: num_cols,
|
||||
});
|
||||
// Unique violation without ON CONFLICT clause -> error
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: SQLITE_CONSTRAINT_UNIQUE,
|
||||
description: format_unique_violation_desc(table_name.as_str(), &index),
|
||||
});
|
||||
program.preassign_label_to_next_insn(ok);
|
||||
|
||||
// In the non-UPSERT case, we insert the index
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: idx_start_reg,
|
||||
count: num_cols + 1,
|
||||
dest_reg: record_reg,
|
||||
index_name: Some(index.name.clone()),
|
||||
affinity_str: None,
|
||||
});
|
||||
program.emit_insn(Insn::IdxInsert {
|
||||
cursor_id: idx_cursor_id,
|
||||
record_reg,
|
||||
unpacked_start: Some(idx_start_reg),
|
||||
unpacked_count: Some((num_cols + 1) as u16),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// Non-unique index: in UPSERT mode we postpone writes to commit phase.
|
||||
if !has_upsert {
|
||||
// eager insert for non-unique, no UPSERT
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: idx_start_reg,
|
||||
count: num_cols + 1,
|
||||
dest_reg: record_reg,
|
||||
index_name: Some(index.name.clone()),
|
||||
affinity_str: None,
|
||||
});
|
||||
program.emit_insn(Insn::IdxInsert {
|
||||
cursor_id: idx_cursor_id,
|
||||
record_reg,
|
||||
unpacked_start: Some(idx_start_reg),
|
||||
unpacked_count: Some((num_cols + 1) as u16),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Close the partial-index skip (preflight)
|
||||
if let Some(lbl) = maybe_skip_probe_label {
|
||||
program.resolve_label(lbl, program.offset());
|
||||
}
|
||||
}
|
||||
ResolvedUpsertTarget::CatchAll => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
for column_mapping in insertion
|
||||
.col_mappings
|
||||
.iter()
|
||||
@@ -1085,11 +1099,9 @@ pub fn translate_insert(
|
||||
target_pc: row_done_label,
|
||||
});
|
||||
|
||||
// Normal INSERT path is done above
|
||||
// Any conflict routed to UPSERT jumps past all that to here:
|
||||
program.preassign_label_to_next_insn(upsert_entry);
|
||||
if let (Some(mut upsert), Some(_)) = (upsert_opt.take(), resolved_upsert.clone()) {
|
||||
// Only DO UPDATE (SET ...); DO NOTHING should have already jumped to row_done_label earlier.
|
||||
for (_, label, mut upsert) in upsert_actions {
|
||||
program.preassign_label_to_next_insn(label);
|
||||
|
||||
if let UpsertDo::Set {
|
||||
ref mut sets,
|
||||
ref mut where_clause,
|
||||
|
||||
@@ -274,7 +274,7 @@ pub fn upsert_matches_index(upsert: &Upsert, index: &Index, table: &Table) -> bo
|
||||
idx_cols.is_empty()
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum ResolvedUpsertTarget {
|
||||
// ON CONFLICT DO
|
||||
CatchAll,
|
||||
|
||||
35
testing/upsert.test
Normal file → Executable file
35
testing/upsert.test
Normal file → Executable file
@@ -201,6 +201,22 @@ do_execsql_test_in_memory_any_error upsert-composite-target-too-few {
|
||||
ON CONFLICT(a) DO UPDATE SET val = excluded.val; -- only "a" given → no match → error
|
||||
}
|
||||
|
||||
# Composite index requires exact coverage, targeting too few columns must not match.
|
||||
do_execsql_test_on_specific_db {:memory:} upsert-multilple-conflict-targets {
|
||||
CREATE TABLE ct (id INTEGER PRIMARY KEY, x UNIQUE, y UNIQUE, z DEFAULT NULL);
|
||||
INSERT INTO ct(id, x, y) VALUES (1, 'x', 'y');
|
||||
INSERT INTO ct(id, x, y) VALUES (2, 'a', 'b');
|
||||
INSERT INTO ct(id, x, y) VALUES (3, '!', '@');
|
||||
INSERT INTO ct(id, x, y) VALUES (4, 'x', 'y1'), (5, 'a1', 'b'), (3, '_', '_')
|
||||
ON CONFLICT(x) DO UPDATE SET x = excluded.x || '-' || x, y = excluded.y || '@' || y, z = 'x'
|
||||
ON CONFLICT(y) DO UPDATE SET x = excluded.x || '+' || x, y = excluded.y || '!' || y, z = 'y'
|
||||
ON CONFLICT DO UPDATE SET x = excluded.x || '#' || x, y = excluded.y || '%' || y, z = 'fallback';
|
||||
SELECT * FROM ct;
|
||||
} {1|x-x|y1@y|x
|
||||
2|a1+a|b!b|y
|
||||
3|_#!|_%@|fallback
|
||||
}
|
||||
|
||||
# Qualified target (t.a) should match unique index on a.
|
||||
do_execsql_test_on_specific_db {:memory:} upsert-qualified-target {
|
||||
CREATE TABLE qt (a UNIQUE, b);
|
||||
@@ -337,6 +353,25 @@ do_execsql_test_on_specific_db {:memory:} upsert-doubly-qualified-target {
|
||||
SELECT * FROM dq;
|
||||
} {1|new}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} upsert-targets-chain {
|
||||
CREATE TABLE dq (a UNIQUE, b UNIQUE, c UNIQUE, value TEXT);
|
||||
CREATE UNIQUE INDEX dq_ab ON dq(a, b);
|
||||
INSERT INTO dq VALUES ('a1', 'a2', 'a3', 'aaa');
|
||||
INSERT INTO dq VALUES ('b1', 'b2', 'b3', 'bbb');
|
||||
INSERT INTO dq VALUES ('c1', 'c2', 'c3', 'ccc');
|
||||
INSERT INTO dq VALUES ('d1', 'd2', 'd3', 'ddd');
|
||||
INSERT INTO dq VALUES
|
||||
('a1', 'a2', 'a3', 'upd1'), ('b1', 'b1', 'b1', 'upd2'), ('c2', 'c2', 'c2', 'upd3'), ('d3', 'd3', 'd3', 'upd4')
|
||||
ON CONFLICT (a, b) DO UPDATE SET value = excluded.value || '-a'
|
||||
ON CONFLICT (a) DO UPDATE SET value = excluded.value || '-b'
|
||||
ON CONFLICT (b) DO UPDATE SET value = excluded.value || '-c'
|
||||
ON CONFLICT DO UPDATE SET value = excluded.value || '-d';
|
||||
SELECT * FROM dq;
|
||||
} {a1|a2|a3|upd1-a
|
||||
b1|b2|b3|upd2-b
|
||||
c1|c2|c3|upd3-c
|
||||
d1|d2|d3|upd4-d}
|
||||
|
||||
# https://github.com/tursodatabase/turso/issues/3384
|
||||
do_execsql_test_on_specific_db {:memory:} upsert-non-rowid-pk-target {
|
||||
create table phonebook(name text primary key, phonenumber text, validDate date);
|
||||
|
||||
@@ -672,12 +672,12 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
pub fn partial_index_mutation_and_upsert_fuzz() {
|
||||
index_mutation_upsert_fuzz(1.0, 1);
|
||||
index_mutation_upsert_fuzz(1.0, 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn simple_index_mutation_and_upsert_fuzz() {
|
||||
index_mutation_upsert_fuzz(0.0, 0);
|
||||
index_mutation_upsert_fuzz(0.0, 4);
|
||||
}
|
||||
|
||||
fn index_mutation_upsert_fuzz(partial_index_prob: f64, conflict_chain_max_len: u32) {
|
||||
|
||||
@@ -839,3 +839,24 @@ pub fn delete_eq_correct() {
|
||||
limbo_exec_rows(&limbo, &conn, "SELECT * FROM t ORDER BY id")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn upsert_conflict() {
|
||||
let limbo = TempDatabase::new_empty(true);
|
||||
let conn = limbo.db.connect().unwrap();
|
||||
for sql in [
|
||||
"CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, c INT UNIQUE, value INT);",
|
||||
"INSERT INTO t VALUES (1, 2, 100);",
|
||||
"INSERT INTO t VALUES (1, 2, 0) ON CONFLICT (c) DO UPDATE SET value = 42;",
|
||||
] {
|
||||
conn.execute(sql).unwrap();
|
||||
}
|
||||
assert_eq!(
|
||||
vec![vec![
|
||||
rusqlite::types::Value::Integer(1),
|
||||
rusqlite::types::Value::Integer(2),
|
||||
rusqlite::types::Value::Integer(42),
|
||||
]],
|
||||
limbo_exec_rows(&limbo, &conn, "SELECT * FROM t")
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user