mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-24 03:34:18 +01:00
Fix UPSERT handling, properly rebuild indexes only based on what columns they touch
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
use std::collections::HashSet;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use turso_parser::ast::{self, Upsert};
|
||||
|
||||
use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
|
||||
use crate::translate::expr::WalkControl;
|
||||
use crate::translate::expr::{walk_expr, WalkControl};
|
||||
use crate::translate::insert::format_unique_violation_desc;
|
||||
use crate::vdbe::insn::CmpInsFlags;
|
||||
use crate::{
|
||||
bail_parse_error,
|
||||
@@ -109,82 +112,139 @@ pub fn upsert_matches_pk(upsert: &Upsert, table: &Table) -> bool {
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Hash, Debug, Eq, PartialEq, Clone)]
|
||||
/// A hashable descriptor of a single index key term used when
|
||||
/// matching an `ON CONFLICT` target against a UNIQUE index.
|
||||
/// captures only the attributes (name and effective collation) that
|
||||
/// determine whether two key terms are equivalent for conflict detection.
|
||||
pub struct KeySig {
|
||||
/// column name, normalized to lowercase
|
||||
name: String,
|
||||
/// defaults to "binary" if not specified on the target or col
|
||||
coll: String,
|
||||
/// Returns array of chaned column indicies and whether rowid was changed.
|
||||
fn collect_changed_cols(
|
||||
table: &Table,
|
||||
set_pairs: &[(usize, Box<ast::Expr>)],
|
||||
) -> (HashSet<usize>, bool) {
|
||||
let mut cols_changed = HashSet::with_capacity(table.columns().len());
|
||||
let mut rowid_changed = false;
|
||||
for (col_idx, _) in set_pairs {
|
||||
if let Some(c) = table.columns().get(*col_idx) {
|
||||
if c.is_rowid_alias {
|
||||
rowid_changed = true;
|
||||
} else {
|
||||
cols_changed.insert(*col_idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
(cols_changed, rowid_changed)
|
||||
}
|
||||
|
||||
/// Match ON CONFLICT target to a UNIQUE index, ignoring order, requiring exact
|
||||
/// coverage, and honoring collations. `table` is used to derive effective collation.
|
||||
#[inline]
|
||||
fn upsert_index_is_affected(
|
||||
table: &Table,
|
||||
idx: &Index,
|
||||
changed_cols: &HashSet<usize>,
|
||||
rowid_changed: bool,
|
||||
) -> bool {
|
||||
if rowid_changed {
|
||||
return true;
|
||||
}
|
||||
let km = index_keys(idx);
|
||||
let pm = partial_index_cols(idx, table);
|
||||
for c in km.iter().chain(pm.iter()) {
|
||||
if changed_cols.contains(c) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Columns used by index key
|
||||
#[inline]
|
||||
fn index_keys(idx: &Index) -> Vec<usize> {
|
||||
idx.columns.iter().map(|ic| ic.pos_in_table).collect()
|
||||
}
|
||||
|
||||
/// Columns referenced by the partial WHERE (empty if none).
|
||||
fn partial_index_cols(idx: &Index, table: &Table) -> HashSet<usize> {
|
||||
use ast::{Expr, Name};
|
||||
let Some(expr) = &idx.where_clause else {
|
||||
return HashSet::new();
|
||||
};
|
||||
let mut out = HashSet::new();
|
||||
let _ = walk_expr(expr, &mut |e: &ast::Expr| -> crate::Result<WalkControl> {
|
||||
match e {
|
||||
Expr::Id(Name::Ident(n) | Name::Quoted(n)) => {
|
||||
if let Some((i, _)) = table.get_column_by_name(&normalize_ident(n.as_str())) {
|
||||
out.insert(i);
|
||||
}
|
||||
}
|
||||
Expr::Qualified(ns, Name::Ident(c) | Name::Quoted(c))
|
||||
| Expr::DoublyQualified(_, ns, Name::Ident(c) | Name::Quoted(c)) => {
|
||||
// Only count columns that belong to this table
|
||||
let nsn = normalize_ident(ns.as_str());
|
||||
let tname = normalize_ident(table.get_name());
|
||||
if nsn.eq_ignore_ascii_case(&tname) {
|
||||
if let Some((i, _)) = table.get_column_by_name(&normalize_ident(c.as_str())) {
|
||||
out.insert(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Ok(WalkControl::Continue)
|
||||
});
|
||||
out
|
||||
}
|
||||
|
||||
/// Match ON CONFLICT target to a UNIQUE index, *ignoring order* but requiring
|
||||
/// exact coverage (same column multiset). If the target specifies a COLLATED
|
||||
/// column, the collation must match the index column's effective collation.
|
||||
/// If the target omits collation, any index collation is accepted.
|
||||
/// Partial (WHERE) indexes never match.
|
||||
pub fn upsert_matches_index(upsert: &Upsert, index: &Index, table: &Table) -> bool {
|
||||
let Some(target) = upsert.index.as_ref() else {
|
||||
// catch-all
|
||||
return true;
|
||||
};
|
||||
// if not unique or column count differs, no match
|
||||
if !index.unique || target.targets.len() != index.columns.len() {
|
||||
// must be a non-partial UNIQUE index with identical arity
|
||||
if !index.unique || index.where_clause.is_some() || target.targets.len() != index.columns.len()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
let mut need: HashMap<KeySig, usize> = HashMap::new();
|
||||
for ic in &index.columns {
|
||||
let sig = KeySig {
|
||||
name: normalize_ident(&ic.name).to_string(),
|
||||
coll: effective_collation_for_index_col(ic, table),
|
||||
};
|
||||
*need.entry(sig).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
// Consume from the multiset using target entries, order-insensitive
|
||||
// Build a multiset of index columns: (normalized name, effective collation)
|
||||
// effective collation = index collation if set, else table column default, else "binary"
|
||||
let mut idx_cols: Vec<(String, String)> = index
|
||||
.columns
|
||||
.iter()
|
||||
.map(|ic| {
|
||||
(
|
||||
normalize_ident(&ic.name),
|
||||
effective_collation_for_index_col(ic, table),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
// For each target key, locate a matching index column (name equal ignoring case,
|
||||
// and collation equal iff the target specifies one). Consume each match once.
|
||||
for te in &target.targets {
|
||||
let tk = match extract_target_key(&te.expr) {
|
||||
Some(x) => x,
|
||||
None => return false, // not a simple column ref
|
||||
let Some(tk) = extract_target_key(&te.expr) else {
|
||||
return false;
|
||||
};
|
||||
let tname = tk.col_name;
|
||||
let mut found = None;
|
||||
|
||||
// Candidate signatures for this target:
|
||||
// If target specifies COLLATE, require exact match on (name, coll).
|
||||
// Otherwise, accept any collation currently present for that name.
|
||||
let mut matched = false;
|
||||
if let Some(ref coll) = tk.collate {
|
||||
let sig = KeySig {
|
||||
name: tk.col_name.to_string(),
|
||||
coll: coll.clone(),
|
||||
};
|
||||
if let Some(cnt) = need.get_mut(&sig) {
|
||||
*cnt -= 1;
|
||||
if *cnt == 0 {
|
||||
need.remove(&sig);
|
||||
for (i, (iname, icoll)) in idx_cols.iter().enumerate() {
|
||||
if tname.eq_ignore_ascii_case(iname)
|
||||
&& match tk.collate.as_ref() {
|
||||
Some(c) => c.eq_ignore_ascii_case(icoll),
|
||||
None => true, // unspecified collation -> accept any
|
||||
}
|
||||
matched = true;
|
||||
}
|
||||
} else {
|
||||
// Try any available collation for this column name
|
||||
if let Some((sig, cnt)) = need
|
||||
.iter_mut()
|
||||
.find(|(k, _)| k.name.eq_ignore_ascii_case(&tk.col_name))
|
||||
{
|
||||
*cnt -= 1;
|
||||
if *cnt == 0 {
|
||||
let key = sig.clone();
|
||||
need.remove(&key);
|
||||
}
|
||||
matched = true;
|
||||
found = Some(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !matched {
|
||||
if let Some(i) = found {
|
||||
// consume this index column once (multiset match)
|
||||
idx_cols.swap_remove(i);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// All targets matched exactly.
|
||||
need.is_empty()
|
||||
// All target columns matched exactly once
|
||||
idx_cols.is_empty()
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -213,7 +273,7 @@ pub fn resolve_upsert_target(
|
||||
}
|
||||
|
||||
for idx in schema.get_indices(table.get_name()) {
|
||||
if idx.unique && idx.where_clause.is_none() && upsert_matches_index(upsert, idx, table) {
|
||||
if idx.unique && upsert_matches_index(upsert, idx, table) {
|
||||
return Ok(ResolvedUpsertTarget::Index(Arc::clone(idx)));
|
||||
}
|
||||
}
|
||||
@@ -261,7 +321,7 @@ pub fn emit_upsert(
|
||||
cdc_cursor_id: Option<usize>,
|
||||
row_done_label: BranchOffset,
|
||||
) -> crate::Result<()> {
|
||||
// Seek and snapshot current row
|
||||
// Seek & snapshot CURRENT
|
||||
program.emit_insn(Insn::SeekRowid {
|
||||
cursor_id: tbl_cursor_id,
|
||||
src_reg: conflict_rowid_reg,
|
||||
@@ -285,7 +345,7 @@ pub fn emit_upsert(
|
||||
}
|
||||
}
|
||||
|
||||
// Keep BEFORE snapshot if needed
|
||||
// BEFORE for index maintenance / CDC
|
||||
let before_start = if cdc_cursor_id.is_some() || !idx_cursors.is_empty() {
|
||||
let s = program.alloc_registers(num_cols);
|
||||
program.emit_insn(Insn::Copy {
|
||||
@@ -298,8 +358,7 @@ pub fn emit_upsert(
|
||||
None
|
||||
};
|
||||
|
||||
// NEW snapshot starts as a copy of CURRENT, then SET expressions overwrite
|
||||
// the assigned columns. matching SQLite semantics of UPDATE reading the old row.
|
||||
// NEW = CURRENT, then apply SET
|
||||
let new_start = program.alloc_registers(num_cols);
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: current_start,
|
||||
@@ -307,15 +366,16 @@ pub fn emit_upsert(
|
||||
extra_amount: num_cols - 1,
|
||||
});
|
||||
|
||||
// WHERE predicate on the target row. If false or NULL, skip the UPDATE.
|
||||
// WHERE on target row
|
||||
if let Some(pred) = where_clause.as_mut() {
|
||||
rewrite_upsert_expr_in_place(
|
||||
rewrite_expr_to_registers(
|
||||
pred,
|
||||
table,
|
||||
table.get_name(),
|
||||
current_start,
|
||||
conflict_rowid_reg,
|
||||
insertion,
|
||||
Some(table.get_name()),
|
||||
Some(insertion),
|
||||
true,
|
||||
)?;
|
||||
let pr = program.alloc_register();
|
||||
translate_expr(program, None, pred, pr, resolver)?;
|
||||
@@ -326,15 +386,17 @@ pub fn emit_upsert(
|
||||
});
|
||||
}
|
||||
|
||||
// Evaluate each SET expression into the NEW row img
|
||||
// Apply SET; capture rowid change if any
|
||||
let mut new_rowid_reg: Option<usize> = None;
|
||||
for (col_idx, expr) in set_pairs.iter_mut() {
|
||||
rewrite_upsert_expr_in_place(
|
||||
rewrite_expr_to_registers(
|
||||
expr,
|
||||
table,
|
||||
table.get_name(),
|
||||
current_start,
|
||||
conflict_rowid_reg,
|
||||
insertion,
|
||||
Some(table.get_name()),
|
||||
Some(insertion),
|
||||
true,
|
||||
)?;
|
||||
translate_expr_no_constant_opt(
|
||||
program,
|
||||
@@ -349,12 +411,22 @@ pub fn emit_upsert(
|
||||
program.emit_insn(Insn::HaltIfNull {
|
||||
target_reg: new_start + *col_idx,
|
||||
err_code: SQLITE_CONSTRAINT_NOTNULL,
|
||||
description: format!("{}.{}", table.get_name(), col.name.as_ref().unwrap()),
|
||||
description: String::from(table.get_name()) + col.name.as_ref().unwrap(),
|
||||
});
|
||||
}
|
||||
if col.is_rowid_alias {
|
||||
// Must be integer; remember the NEW rowid value
|
||||
let r = program.alloc_register();
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: new_start + *col_idx,
|
||||
dst_reg: r,
|
||||
extra_amount: 0,
|
||||
});
|
||||
program.emit_insn(Insn::MustBeInt { reg: r });
|
||||
new_rowid_reg = Some(r);
|
||||
}
|
||||
}
|
||||
|
||||
// If STRICT, perform type checks on the NEW image
|
||||
if let Some(bt) = table.btree() {
|
||||
if bt.is_strict {
|
||||
program.emit_insn(Insn::TypeCheck {
|
||||
@@ -366,49 +438,34 @@ pub fn emit_upsert(
|
||||
}
|
||||
}
|
||||
|
||||
// Rebuild indexes: remove keys corresponding to BEFORE and insert keys for NEW.
|
||||
// Index rebuild (DELETE old, INSERT new), honoring partial-index WHEREs
|
||||
if let Some(before) = before_start {
|
||||
let (changed_cols, rowid_changed) = collect_changed_cols(table, set_pairs);
|
||||
|
||||
for (idx_name, _root, idx_cid) in idx_cursors {
|
||||
let idx_meta = schema
|
||||
.get_index(table.get_name(), idx_name)
|
||||
.expect("index exists");
|
||||
|
||||
if !upsert_index_is_affected(table, idx_meta, &changed_cols, rowid_changed) {
|
||||
continue; // skip untouched index completely
|
||||
}
|
||||
let k = idx_meta.columns.len();
|
||||
|
||||
let (before_pred_reg, new_pred_reg) = if let Some(where_clause) = &idx_meta.where_clause
|
||||
{
|
||||
// BEFORE image predicate
|
||||
let mut before_where = where_clause.as_ref().clone();
|
||||
rewrite_partial_index_where_for_image(
|
||||
&mut before_where,
|
||||
table,
|
||||
before_start.expect("before_start must exist for index maintenance"),
|
||||
conflict_rowid_reg,
|
||||
)?;
|
||||
let before_reg = program.alloc_register();
|
||||
translate_expr_no_constant_opt(
|
||||
program,
|
||||
None,
|
||||
&before_where,
|
||||
before_reg,
|
||||
resolver,
|
||||
NoConstantOptReason::RegisterReuse,
|
||||
)?;
|
||||
let before_pred_reg = eval_partial_pred_for_row_image(
|
||||
program,
|
||||
table,
|
||||
idx_meta,
|
||||
before,
|
||||
conflict_rowid_reg,
|
||||
resolver,
|
||||
);
|
||||
let new_rowid = new_rowid_reg.unwrap_or(conflict_rowid_reg);
|
||||
let new_pred_reg = eval_partial_pred_for_row_image(
|
||||
program, table, idx_meta, new_start, new_rowid, resolver,
|
||||
);
|
||||
|
||||
// NEW image predicate
|
||||
let mut new_where = where_clause.as_ref().clone();
|
||||
rewrite_partial_index_where_for_image(
|
||||
&mut new_where,
|
||||
table,
|
||||
new_start,
|
||||
conflict_rowid_reg,
|
||||
)?;
|
||||
let new_reg = program.alloc_register();
|
||||
translate_expr(program, None, &new_where, new_reg, resolver)?;
|
||||
|
||||
(Some(before_reg), Some(new_reg))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
// Skip delete if BEFORE predicate false/NULL
|
||||
let maybe_skip_del = before_pred_reg.map(|r| {
|
||||
let lbl = program.allocate_label();
|
||||
program.emit_insn(Insn::IfNot {
|
||||
@@ -419,6 +476,7 @@ pub fn emit_upsert(
|
||||
lbl
|
||||
});
|
||||
|
||||
// DELETE old key
|
||||
let del = program.alloc_registers(k + 1);
|
||||
for (i, ic) in idx_meta.columns.iter().enumerate() {
|
||||
let (ci, _) = table.get_column_by_name(&ic.name).unwrap();
|
||||
@@ -439,13 +497,11 @@ pub fn emit_upsert(
|
||||
cursor_id: *idx_cid,
|
||||
raise_error_if_no_matching_entry: false,
|
||||
});
|
||||
|
||||
// resolve skipping the delete if it was false/NULL
|
||||
if let Some(label) = maybe_skip_del {
|
||||
program.resolve_label(label, program.offset());
|
||||
}
|
||||
|
||||
// if NEW does not satisfy partial index, skip the insert
|
||||
// Skip insert if NEW predicate false/NULL
|
||||
let maybe_skip_ins = new_pred_reg.map(|r| {
|
||||
let lbl = program.allocate_label();
|
||||
program.emit_insn(Insn::IfNot {
|
||||
@@ -456,6 +512,7 @@ pub fn emit_upsert(
|
||||
lbl
|
||||
});
|
||||
|
||||
// INSERT new key (use NEW rowid if present)
|
||||
let ins = program.alloc_registers(k + 1);
|
||||
for (i, ic) in idx_meta.columns.iter().enumerate() {
|
||||
let (ci, _) = table.get_column_by_name(&ic.name).unwrap();
|
||||
@@ -466,7 +523,7 @@ pub fn emit_upsert(
|
||||
});
|
||||
}
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: conflict_rowid_reg,
|
||||
src_reg: new_rowid,
|
||||
dst_reg: ins + k,
|
||||
extra_amount: 0,
|
||||
});
|
||||
@@ -480,54 +537,51 @@ pub fn emit_upsert(
|
||||
affinity_str: None,
|
||||
});
|
||||
|
||||
// If unique, perform NoConflict + self-check before IdxInsert
|
||||
if idx_meta.unique {
|
||||
let ok_lbl = program.allocate_label();
|
||||
// Affinity on the key columns for the NoConflict probe
|
||||
let ok = program.allocate_label();
|
||||
let aff: String = idx_meta
|
||||
.columns
|
||||
.iter()
|
||||
.map(|c| {
|
||||
table
|
||||
.get_column_by_name(&c.name)
|
||||
.map(|(_, col)| col.affinity().aff_mask())
|
||||
.unwrap_or('B')
|
||||
})
|
||||
.collect();
|
||||
|
||||
program.emit_insn(Insn::Affinity {
|
||||
start_reg: ins,
|
||||
count: NonZeroUsize::new(k).unwrap(),
|
||||
affinities: aff,
|
||||
});
|
||||
program.emit_insn(Insn::NoConflict {
|
||||
cursor_id: *idx_cid,
|
||||
target_pc: ok_lbl,
|
||||
target_pc: ok,
|
||||
record_reg: ins,
|
||||
num_regs: k,
|
||||
});
|
||||
|
||||
// If there’s a hit, skip it if it’s self, otherwise raise constraint
|
||||
let hit_rowid = program.alloc_register();
|
||||
let hit = program.alloc_register();
|
||||
program.emit_insn(Insn::IdxRowId {
|
||||
cursor_id: *idx_cid,
|
||||
dest: hit_rowid,
|
||||
dest: hit,
|
||||
});
|
||||
program.emit_insn(Insn::Eq {
|
||||
lhs: conflict_rowid_reg,
|
||||
rhs: hit_rowid,
|
||||
target_pc: ok_lbl,
|
||||
lhs: new_rowid,
|
||||
rhs: hit,
|
||||
target_pc: ok,
|
||||
flags: CmpInsFlags::default(),
|
||||
collation: program.curr_collation(),
|
||||
});
|
||||
let mut description = String::with_capacity(
|
||||
table.get_name().len()
|
||||
+ idx_meta
|
||||
.columns
|
||||
.iter()
|
||||
.map(|c| c.name.len() + 2)
|
||||
.sum::<usize>(),
|
||||
);
|
||||
description.push_str(table.get_name());
|
||||
description.push_str(".(");
|
||||
description.push_str(
|
||||
&idx_meta
|
||||
.columns
|
||||
.iter()
|
||||
.map(|c| c.name.as_str())
|
||||
.collect::<Vec<_>>()
|
||||
.join(", "),
|
||||
);
|
||||
description.push(')');
|
||||
let description = format_unique_violation_desc(table.get_name(), idx_meta);
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
|
||||
description,
|
||||
});
|
||||
program.preassign_label_to_next_insn(ok_lbl);
|
||||
program.preassign_label_to_next_insn(ok);
|
||||
}
|
||||
|
||||
program.emit_insn(Insn::IdxInsert {
|
||||
cursor_id: *idx_cid,
|
||||
record_reg: rec,
|
||||
@@ -535,21 +589,20 @@ pub fn emit_upsert(
|
||||
unpacked_count: Some((k + 1) as u16),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
|
||||
if let Some(lbl) = maybe_skip_ins {
|
||||
program.resolve_label(lbl, program.offset());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write table row (same rowid, new payload)
|
||||
// Build NEW table payload
|
||||
let rec = program.alloc_register();
|
||||
|
||||
let affinity_str = table
|
||||
.columns()
|
||||
.iter()
|
||||
.map(|col| col.affinity().aff_mask())
|
||||
.map(|c| c.affinity().aff_mask())
|
||||
.collect::<String>();
|
||||
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: new_start,
|
||||
count: num_cols,
|
||||
@@ -557,59 +610,155 @@ pub fn emit_upsert(
|
||||
index_name: None,
|
||||
affinity_str: Some(affinity_str),
|
||||
});
|
||||
program.emit_insn(Insn::Insert {
|
||||
cursor: tbl_cursor_id,
|
||||
key_reg: conflict_rowid_reg,
|
||||
record_reg: rec,
|
||||
flag: InsertFlags::new(),
|
||||
table_name: table.get_name().to_string(),
|
||||
});
|
||||
|
||||
if let Some(cdc_id) = cdc_cursor_id {
|
||||
let after_rec = if program.capture_data_changes_mode().has_after() {
|
||||
Some(emit_cdc_patch_record(
|
||||
program,
|
||||
table,
|
||||
new_start,
|
||||
rec,
|
||||
conflict_rowid_reg,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Build BEFORE if needed
|
||||
let before_rec = if program.capture_data_changes_mode().has_before() {
|
||||
Some(emit_cdc_full_record(
|
||||
program,
|
||||
table.columns(),
|
||||
tbl_cursor_id,
|
||||
conflict_rowid_reg,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
emit_cdc_insns(
|
||||
program,
|
||||
resolver,
|
||||
OperationMode::UPDATE,
|
||||
cdc_id,
|
||||
conflict_rowid_reg,
|
||||
before_rec,
|
||||
after_rec,
|
||||
None,
|
||||
table.get_name(),
|
||||
)?;
|
||||
// If rowid changed, first ensure no other row owns it, then delete+insert
|
||||
if let Some(rnew) = new_rowid_reg {
|
||||
let ok = program.allocate_label();
|
||||
|
||||
// If equal to old rowid, skip uniqueness probe
|
||||
program.emit_insn(Insn::Eq {
|
||||
lhs: rnew,
|
||||
rhs: conflict_rowid_reg,
|
||||
target_pc: ok,
|
||||
flags: CmpInsFlags::default(),
|
||||
collation: program.curr_collation(),
|
||||
});
|
||||
|
||||
// If another row already has rnew -> constraint
|
||||
program.emit_insn(Insn::NotExists {
|
||||
cursor: tbl_cursor_id,
|
||||
rowid_reg: rnew,
|
||||
target_pc: ok,
|
||||
});
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
|
||||
description: format!(
|
||||
"{}.{}",
|
||||
table.get_name(),
|
||||
table
|
||||
.columns()
|
||||
.iter()
|
||||
.find(|c| c.is_rowid_alias)
|
||||
.and_then(|c| c.name.as_ref())
|
||||
.unwrap_or(&"rowid".to_string())
|
||||
),
|
||||
});
|
||||
program.preassign_label_to_next_insn(ok);
|
||||
|
||||
// Now replace the row
|
||||
program.emit_insn(Insn::Delete {
|
||||
cursor_id: tbl_cursor_id,
|
||||
table_name: table.get_name().to_string(),
|
||||
});
|
||||
program.emit_insn(Insn::Insert {
|
||||
cursor: tbl_cursor_id,
|
||||
key_reg: rnew,
|
||||
record_reg: rec,
|
||||
flag: InsertFlags::new().require_seek().update_rowid_change(),
|
||||
table_name: table.get_name().to_string(),
|
||||
});
|
||||
} else {
|
||||
program.emit_insn(Insn::Insert {
|
||||
cursor: tbl_cursor_id,
|
||||
key_reg: conflict_rowid_reg,
|
||||
record_reg: rec,
|
||||
flag: InsertFlags::new(),
|
||||
table_name: table.get_name().to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// emit CDC instructions
|
||||
if let Some(cdc_id) = cdc_cursor_id {
|
||||
let new_rowid = new_rowid_reg.unwrap_or(conflict_rowid_reg);
|
||||
if new_rowid_reg.is_some() {
|
||||
// DELETE (before)
|
||||
let before_rec = if program.capture_data_changes_mode().has_before() {
|
||||
Some(emit_cdc_full_record(
|
||||
program,
|
||||
table.columns(),
|
||||
tbl_cursor_id,
|
||||
conflict_rowid_reg,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
emit_cdc_insns(
|
||||
program,
|
||||
resolver,
|
||||
OperationMode::DELETE,
|
||||
cdc_id,
|
||||
conflict_rowid_reg,
|
||||
before_rec,
|
||||
None,
|
||||
None,
|
||||
table.get_name(),
|
||||
)?;
|
||||
|
||||
// INSERT (after)
|
||||
let after_rec = if program.capture_data_changes_mode().has_after() {
|
||||
Some(emit_cdc_patch_record(
|
||||
program, table, new_start, rec, new_rowid,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
emit_cdc_insns(
|
||||
program,
|
||||
resolver,
|
||||
OperationMode::INSERT,
|
||||
cdc_id,
|
||||
new_rowid,
|
||||
None,
|
||||
after_rec,
|
||||
None,
|
||||
table.get_name(),
|
||||
)?;
|
||||
} else {
|
||||
let after_rec = if program.capture_data_changes_mode().has_after() {
|
||||
Some(emit_cdc_patch_record(
|
||||
program,
|
||||
table,
|
||||
new_start,
|
||||
rec,
|
||||
conflict_rowid_reg,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let before_rec = if program.capture_data_changes_mode().has_before() {
|
||||
Some(emit_cdc_full_record(
|
||||
program,
|
||||
table.columns(),
|
||||
tbl_cursor_id,
|
||||
conflict_rowid_reg,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
emit_cdc_insns(
|
||||
program,
|
||||
resolver,
|
||||
OperationMode::UPDATE,
|
||||
cdc_id,
|
||||
conflict_rowid_reg,
|
||||
before_rec,
|
||||
after_rec,
|
||||
None,
|
||||
table.get_name(),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
// RETURNING from NEW image + final rowid
|
||||
if !returning.is_empty() {
|
||||
let regs = ReturningValueRegisters {
|
||||
rowid_register: conflict_rowid_reg,
|
||||
rowid_register: new_rowid_reg.unwrap_or(conflict_rowid_reg),
|
||||
columns_start_register: new_start,
|
||||
num_columns: num_cols,
|
||||
};
|
||||
|
||||
emit_returning_results(program, returning, ®s)?;
|
||||
}
|
||||
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: row_done_label,
|
||||
});
|
||||
@@ -620,7 +769,6 @@ pub fn emit_upsert(
|
||||
///
|
||||
/// Supports multi-target row-value SETs: `SET (a, b) = (expr1, expr2)`.
|
||||
/// Enforces same number of column names and RHS values.
|
||||
/// Rewrites `EXCLUDED.*` references to direct `Register` reads from the insertion registers
|
||||
/// If the same column is assigned multiple times, the last assignment wins.
|
||||
pub fn collect_set_clauses_for_upsert(
|
||||
table: &Table,
|
||||
@@ -661,108 +809,109 @@ pub fn collect_set_clauses_for_upsert(
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Rewrite an UPSERT expression so that:
|
||||
/// EXCLUDED.x -> Register(insertion.x)
|
||||
/// t.x / x -> Register(CURRENT.x) when t == target table or unqualified
|
||||
/// rowid -> Register(conflict_rowid_reg)
|
||||
fn eval_partial_pred_for_row_image(
|
||||
prg: &mut ProgramBuilder,
|
||||
table: &Table,
|
||||
idx: &Index,
|
||||
row_start: usize, // base of CURRENT or NEW image
|
||||
rowid_reg: usize, // rowid for that image
|
||||
resolver: &Resolver,
|
||||
) -> Option<usize> {
|
||||
let Some(where_expr) = &idx.where_clause else {
|
||||
return None;
|
||||
};
|
||||
let mut e = where_expr.as_ref().clone();
|
||||
rewrite_expr_to_registers(
|
||||
&mut e, table, row_start, rowid_reg, None, // table_name
|
||||
None, // insertion
|
||||
false, // dont allow EXCLUDED
|
||||
)
|
||||
.ok()?;
|
||||
let r = prg.alloc_register();
|
||||
translate_expr_no_constant_opt(
|
||||
prg,
|
||||
None,
|
||||
&e,
|
||||
r,
|
||||
resolver,
|
||||
NoConstantOptReason::RegisterReuse,
|
||||
)
|
||||
.ok()?;
|
||||
Some(r)
|
||||
}
|
||||
|
||||
/// Generic rewriter that maps column references to registers for a given row image.
|
||||
///
|
||||
/// Only rewrites names in the current expression scope, does not enter subqueries.
|
||||
fn rewrite_upsert_expr_in_place(
|
||||
/// - Id/Qualified refs to the *target table* (when `table_name` is provided) resolve
|
||||
/// to the CURRENT/NEW row image starting at `base_start`, with `rowid` (or the
|
||||
/// rowid-alias) mapped to `rowid_reg`.
|
||||
/// - If `allow_excluded` and `insertion` are provided, `EXCLUDED.x` resolves to the
|
||||
/// insertion registers (and `EXCLUDED.rowid` resolves to `insertion.key_register()`).
|
||||
/// - If `table_name` is `None`, qualified refs never match
|
||||
/// - Leaves names from other tables/namespaces untouched.
|
||||
fn rewrite_expr_to_registers(
|
||||
e: &mut ast::Expr,
|
||||
table: &Table,
|
||||
table_name: &str,
|
||||
current_start: usize,
|
||||
conflict_rowid_reg: usize,
|
||||
insertion: &Insertion,
|
||||
base_start: usize,
|
||||
rowid_reg: usize,
|
||||
table_name: Option<&str>,
|
||||
insertion: Option<&Insertion>,
|
||||
allow_excluded: bool,
|
||||
) -> crate::Result<WalkControl> {
|
||||
use ast::{Expr, Name};
|
||||
let table_name_norm = table_name.map(normalize_ident);
|
||||
|
||||
let col_reg = |name: &str| -> Option<usize> {
|
||||
// Map a column name to a register within the row image at `base_start`.
|
||||
let col_reg_from_row_image = |name: &str| -> Option<usize> {
|
||||
if name.eq_ignore_ascii_case("rowid") {
|
||||
return Some(conflict_rowid_reg);
|
||||
return Some(rowid_reg);
|
||||
}
|
||||
let (idx, c) = table.get_column_by_name(name)?;
|
||||
if c.is_rowid_alias {
|
||||
Some(rowid_reg)
|
||||
} else {
|
||||
Some(base_start + idx)
|
||||
}
|
||||
let (idx, _) = table.get_column_by_name(name)?;
|
||||
Some(current_start + idx)
|
||||
};
|
||||
|
||||
walk_expr_mut(
|
||||
e,
|
||||
&mut |expr: &mut ast::Expr| -> crate::Result<WalkControl> {
|
||||
match expr {
|
||||
// EXCLUDED.x or t.x (t may be quoted)
|
||||
Expr::Qualified(ns, Name::Ident(c) | Name::Quoted(c))
|
||||
| Expr::DoublyQualified(_, ns, Name::Ident(c) | Name::Quoted(c)) => {
|
||||
let ns = normalize_ident(ns.as_str());
|
||||
let c = normalize_ident(c.as_str());
|
||||
if ns.eq_ignore_ascii_case("excluded") {
|
||||
let Some(reg) = insertion.get_col_mapping_by_name(&c) else {
|
||||
bail_parse_error!("no such column in EXCLUDED: {}", c);
|
||||
};
|
||||
*expr = Expr::Register(reg.register);
|
||||
} else if ns.eq_ignore_ascii_case(table_name) {
|
||||
if let Some(reg) = col_reg(c.as_str()) {
|
||||
*expr = Expr::Register(reg);
|
||||
// Handle EXCLUDED.* if enabled
|
||||
if allow_excluded && ns.eq_ignore_ascii_case("excluded") {
|
||||
if let Some(ins) = insertion {
|
||||
if c.eq_ignore_ascii_case("rowid") {
|
||||
*expr = Expr::Register(ins.key_register());
|
||||
} else if let Some(cm) = ins.get_col_mapping_by_name(&c) {
|
||||
*expr = Expr::Register(cm.register);
|
||||
} else {
|
||||
bail_parse_error!("no such column in EXCLUDED: {}", c);
|
||||
}
|
||||
}
|
||||
// If insertion is None, leave EXCLUDED.* untouched.
|
||||
return Ok(WalkControl::Continue);
|
||||
}
|
||||
|
||||
// Match the target table namespace if provided
|
||||
if let Some(ref tn) = table_name_norm {
|
||||
if ns.eq_ignore_ascii_case(tn) {
|
||||
if let Some(r) = col_reg_from_row_image(&c) {
|
||||
*expr = Expr::Register(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Unqualified column id -> CURRENT
|
||||
// Unqualified id -> row image (CURRENT/NEW depending on caller)
|
||||
Expr::Id(Name::Ident(name)) | Expr::Id(Name::Quoted(name)) => {
|
||||
if let Some(reg) = col_reg(&normalize_ident(name.as_str())) {
|
||||
*expr = Expr::Register(reg);
|
||||
if let Some(r) = col_reg_from_row_image(&normalize_ident(name.as_str())) {
|
||||
*expr = Expr::Register(r);
|
||||
}
|
||||
}
|
||||
Expr::RowId { .. } => {
|
||||
*expr = Expr::Register(conflict_rowid_reg);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Ok(WalkControl::Continue)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Rewrite partial-index WHERE to read from a contiguous row image starting at `base_start`.
|
||||
/// Maps rowid (and the rowid-alias column) to `rowid_reg`... Very similar to the above method
|
||||
/// but simpler because there is no EXCLUDED or table name to consider.
|
||||
fn rewrite_partial_index_where_for_image(
|
||||
expr: &mut ast::Expr,
|
||||
table: &Table,
|
||||
base_start: usize,
|
||||
rowid_reg: usize,
|
||||
) -> crate::Result<WalkControl> {
|
||||
walk_expr_mut(
|
||||
expr,
|
||||
&mut |e: &mut ast::Expr| -> crate::Result<WalkControl> {
|
||||
match e {
|
||||
ast::Expr::Id(n) => {
|
||||
let nm = normalize_ident(n.as_str());
|
||||
if nm.eq_ignore_ascii_case("rowid") {
|
||||
*e = ast::Expr::Register(rowid_reg);
|
||||
} else if let Some((col_idx, _)) = table.get_column_by_name(&nm) {
|
||||
let col = &table.columns()[col_idx];
|
||||
*e = ast::Expr::Register(if col.is_rowid_alias {
|
||||
rowid_reg
|
||||
} else {
|
||||
base_start + col_idx
|
||||
});
|
||||
}
|
||||
}
|
||||
ast::Expr::Qualified(_, cn) | ast::Expr::DoublyQualified(_, _, cn) => {
|
||||
let nm = normalize_ident(cn.as_str());
|
||||
if nm.eq_ignore_ascii_case("rowid") {
|
||||
*e = ast::Expr::Register(rowid_reg);
|
||||
} else if let Some((col_idx, _)) = table.get_column_by_name(&nm) {
|
||||
let col = &table.columns()[col_idx];
|
||||
*e = ast::Expr::Register(if col.is_rowid_alias {
|
||||
rowid_reg
|
||||
} else {
|
||||
base_start + col_idx
|
||||
});
|
||||
}
|
||||
}
|
||||
ast::Expr::RowId { .. } => {
|
||||
*e = ast::Expr::Register(rowid_reg);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Ok(WalkControl::Continue)
|
||||
|
||||
Reference in New Issue
Block a user