Fix foreign key constraint enforcement on UNIQUE indexes

Closes #3648

Co-authored-by: Pavan-Nambi <pavannambi999@gmail.com>
This commit is contained in:
Jussi Saurio
2025-10-24 10:52:54 +03:00
parent 5f0bbf1ce5
commit 18e6a23f23
5 changed files with 204 additions and 123 deletions

View File

@@ -33,7 +33,7 @@ use crate::translate::expr::{
use crate::translate::fkeys::{
build_index_affinity_string, emit_fk_child_update_counters,
emit_fk_delete_parent_existence_checks, emit_guarded_fk_decrement,
emit_parent_pk_change_checks, open_read_index, open_read_table, stabilize_new_row_for_fk,
emit_parent_key_change_checks, open_read_index, open_read_table, stabilize_new_row_for_fk,
};
use crate::translate::plan::{DeletePlan, JoinedTable, Plan, QueryDestination, Search};
use crate::translate::planner::ROWID_STRS;
@@ -1346,10 +1346,11 @@ fn emit_update_insns(
.schema
.any_resolved_fks_referencing(table_name)
{
emit_parent_pk_change_checks(
emit_parent_key_change_checks(
program,
&t_ctx.resolver,
&table_btree,
plan.indexes_to_update.iter(),
target_table_cursor_id,
beg,
start,

View File

@@ -264,14 +264,15 @@ pub fn stabilize_new_row_for_fk(
Ok(())
}
/// Parent-side checks when the parent PK might change (UPDATE on parent):
/// Parent-side checks when the parent key might change (UPDATE on parent):
/// Detect if any child references the OLD key (potential violation), and if any references the NEW key
/// (which cancels one potential violation). For composite PKs this builds OLD/NEW vectors first.
/// (which cancels one potential violation). For composite keys this builds OLD/NEW vectors first.
#[allow(clippy::too_many_arguments)]
pub fn emit_parent_pk_change_checks(
pub fn emit_parent_key_change_checks(
program: &mut ProgramBuilder,
resolver: &Resolver,
table_btree: &BTreeTable,
indexes_to_update: impl Iterator<Item = impl AsRef<Index>>,
cursor_id: usize,
old_rowid_reg: usize,
start: usize,
@@ -290,34 +291,32 @@ pub fn emit_parent_pk_change_checks(
return Ok(());
}
match table_btree.primary_key_columns.len() {
0 => emit_rowid_pk_change_check(
let primary_key_is_rowid_alias = table_btree.get_rowid_alias_column().is_some();
if primary_key_is_rowid_alias || table_btree.primary_key_columns.is_empty() {
emit_rowid_pk_change_check(
program,
&incoming,
resolver,
old_rowid_reg,
rowid_set_clause_reg.unwrap_or(old_rowid_reg),
),
1 => emit_single_pk_change_check(
program,
&incoming,
resolver,
table_btree,
cursor_id,
start,
rowid_new_reg,
),
_ => emit_composite_pk_change_check(
program,
&incoming,
resolver,
table_btree,
cursor_id,
old_rowid_reg,
start,
rowid_new_reg,
),
)?;
}
for index in indexes_to_update {
emit_parent_index_key_change_checks(
program,
cursor_id,
start,
old_rowid_reg,
rowid_new_reg,
&incoming,
resolver,
table_btree,
index.as_ref(),
)?;
}
Ok(())
}
/// Rowid-table parent PK change: compare rowid OLD vs NEW; if changed, run two-pass counters.
@@ -355,124 +354,82 @@ pub fn emit_rowid_pk_change_check(
Ok(())
}
/// Single-column PK parent change: load OLD and NEW; if changed, run two-pass counters.
pub fn emit_single_pk_change_check(
program: &mut ProgramBuilder,
incoming: &[ResolvedFkRef],
resolver: &Resolver,
table_btree: &BTreeTable,
cursor_id: usize,
start: usize,
rowid_new_reg: usize,
) -> Result<()> {
let (pk_name, _) = &table_btree.primary_key_columns[0];
let (pos, col) = table_btree.get_column(pk_name).unwrap();
let old_reg = program.alloc_register();
if col.is_rowid_alias {
program.emit_insn(Insn::RowId {
cursor_id,
dest: old_reg,
});
} else {
program.emit_insn(Insn::Column {
cursor_id,
column: pos,
dest: old_reg,
default: None,
});
}
let new_reg = if col.is_rowid_alias {
rowid_new_reg
} else {
start + pos
};
let skip = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: old_reg,
rhs: new_reg,
target_pc: skip,
flags: CmpInsFlags::default(),
collation: None,
});
let old_pk = program.alloc_register();
let new_pk = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: old_reg,
dst_reg: old_pk,
extra_amount: 0,
});
program.emit_insn(Insn::Copy {
src_reg: new_reg,
dst_reg: new_pk,
extra_amount: 0,
});
emit_fk_parent_pk_change_counters(program, incoming, resolver, old_pk, new_pk, 1)?;
program.preassign_label_to_next_insn(skip);
Ok(())
}
/// Composite-PK parent change: build OLD/NEW vectors; if any component differs, run two-pass counters.
/// Foreign keys are only legal if the referenced parent key is:
/// 1. The rowid alias (no separate index)
/// 2. Part of a primary key / unique index (there is no practical difference between the two)
///
/// If the foreign key references a composite key, all of the columns in the key must be referenced.
/// E.g.
/// CREATE TABLE parent (a, b, c, PRIMARY KEY (a, b, c));
/// CREATE TABLE child (a, b, c, FOREIGN KEY (a, b, c) REFERENCES parent (a, b, c));
///
/// Whereas this is not allowed:
/// CREATE TABLE parent (a, b, c, PRIMARY KEY (a, b, c));
/// CREATE TABLE child (a, b, c, FOREIGN KEY (a, b) REFERENCES parent (a, b, c));
///
/// This function checks if the parent key has changed by comparing the OLD and NEW values.
/// If the parent key has changed, it emits the counters for the foreign keys.
/// If the parent key has not changed, it does nothing.
#[allow(clippy::too_many_arguments)]
pub fn emit_composite_pk_change_check(
pub fn emit_parent_index_key_change_checks(
program: &mut ProgramBuilder,
cursor_id: usize,
new_values_start: usize,
old_rowid_reg: usize,
new_rowid_reg: usize,
incoming: &[ResolvedFkRef],
resolver: &Resolver,
table_btree: &BTreeTable,
cursor_id: usize,
old_rowid_reg: usize,
start: usize,
rowid_new_reg: usize,
index: &Index,
) -> Result<()> {
let pk_len = table_btree.primary_key_columns.len();
let idx_len = index.columns.len();
let old_pk = program.alloc_registers(pk_len);
for (i, (pk_name, _)) in table_btree.primary_key_columns.iter().enumerate() {
let (pos, col) = table_btree.get_column(pk_name).unwrap();
if col.is_rowid_alias {
let old_key = program.alloc_registers(idx_len);
for (i, index_col) in index.columns.iter().enumerate() {
let pos_in_table = index_col.pos_in_table;
let column = &table_btree.columns[pos_in_table];
if column.is_rowid_alias {
program.emit_insn(Insn::Copy {
src_reg: old_rowid_reg,
dst_reg: old_pk + i,
dst_reg: old_key + i,
extra_amount: 0,
});
} else {
program.emit_insn(Insn::Column {
cursor_id,
column: pos,
dest: old_pk + i,
column: pos_in_table,
dest: old_key + i,
default: None,
});
}
}
let new_pk = program.alloc_registers(pk_len);
for (i, (pk_name, _)) in table_btree.primary_key_columns.iter().enumerate() {
let (pos, col) = table_btree.get_column(pk_name).unwrap();
let src = if col.is_rowid_alias {
rowid_new_reg
let new_key = program.alloc_registers(idx_len);
for (i, index_col) in index.columns.iter().enumerate() {
let pos_in_table = index_col.pos_in_table;
let column = &table_btree.columns[pos_in_table];
let src = if column.is_rowid_alias {
new_rowid_reg
} else {
start + pos
new_values_start + pos_in_table
};
program.emit_insn(Insn::Copy {
src_reg: src,
dst_reg: new_pk + i,
dst_reg: new_key + i,
extra_amount: 0,
});
}
let skip = program.allocate_label();
let changed = program.allocate_label();
for i in 0..pk_len {
let next = if i + 1 == pk_len {
for i in 0..idx_len {
let next = if i + 1 == idx_len {
None
} else {
Some(program.allocate_label())
};
program.emit_insn(Insn::Eq {
lhs: old_pk + i,
rhs: new_pk + i,
lhs: old_key + i,
rhs: new_key + i,
target_pc: next.unwrap_or(skip),
flags: CmpInsFlags::default(),
collation: None,
@@ -484,7 +441,7 @@ pub fn emit_composite_pk_change_check(
}
program.preassign_label_to_next_insn(changed);
emit_fk_parent_pk_change_counters(program, incoming, resolver, old_pk, new_pk, pk_len)?;
emit_fk_parent_pk_change_counters(program, incoming, resolver, old_key, new_key, idx_len)?;
program.preassign_label_to_next_insn(skip);
Ok(())
}

View File

@@ -8,7 +8,7 @@ use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::schema::ROWID_SENTINEL;
use crate::translate::emitter::UpdateRowSource;
use crate::translate::expr::{walk_expr, WalkControl};
use crate::translate::fkeys::{emit_fk_child_update_counters, emit_parent_pk_change_checks};
use crate::translate::fkeys::{emit_fk_child_update_counters, emit_parent_key_change_checks};
use crate::translate::insert::{format_unique_violation_desc, InsertEmitCtx};
use crate::translate::planner::ROWID_STRS;
use crate::vdbe::insn::CmpInsFlags;
@@ -497,10 +497,13 @@ pub fn emit_upsert(
&changed_cols,
)?;
}
emit_parent_pk_change_checks(
emit_parent_key_change_checks(
program,
resolver,
&bt,
resolver.schema.get_indices(table.get_name()).filter(|idx| {
upsert_index_is_affected(table, idx, &changed_cols, rowid_changed)
}),
ctx.cursor_id,
ctx.conflict_rowid_reg,
new_start,

View File

@@ -1104,3 +1104,106 @@ do_execsql_test_on_specific_db {:memory:} fk-delete-composite-bounds {
-- This should be a no-op (no row (5,3)), and MUST NOT error.
DELETE FROM p WHERE a=5 AND b=3;
} {}
# Single column unique index on parent, FK referenced by child
do_execsql_test_in_memory_any_error fk-update-parent-unique-single-col {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(id UNIQUE);
CREATE TABLE child(pid REFERENCES parent(id));
INSERT INTO parent VALUES(1);
INSERT INTO child VALUES(1);
UPDATE parent SET id = 2 WHERE id = 1;
}
# Single column with explicit CREATE UNIQUE INDEX
do_execsql_test_in_memory_any_error fk-update-parent-explicit-unique-single-col {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(id);
CREATE UNIQUE INDEX parent_id_idx ON parent(id);
CREATE TABLE child(pid REFERENCES parent(id));
INSERT INTO parent VALUES(1);
INSERT INTO child VALUES(1);
UPDATE parent SET id = 2 WHERE id = 1;
}
# Multi-column unique index on parent, FK referenced by multi-column FK in child
do_execsql_test_in_memory_any_error fk-update-parent-unique-multi-col {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(a, b, UNIQUE(a, b));
CREATE TABLE child(ca, cb, FOREIGN KEY(ca, cb) REFERENCES parent(a, b));
INSERT INTO parent VALUES(1, 2);
INSERT INTO child VALUES(1, 2);
UPDATE parent SET a = 3 WHERE a = 1 AND b = 2;
}
# Multi-column unique index on parent, FK referenced by multi-column FK in child
do_execsql_test_in_memory_any_error fk-update-parent-unique-multi-col-2 {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(a, b, UNIQUE(a, b));
CREATE TABLE child(ca, cb, FOREIGN KEY(ca, cb) REFERENCES parent(a, b));
INSERT INTO parent VALUES(1, 2);
INSERT INTO child VALUES(1, 2);
UPDATE parent SET b = 3 WHERE a = 1 AND b = 2;
}
# Multi-column index defined explicitly as CREATE UNIQUE INDEX
do_execsql_test_in_memory_any_error fk-update-parent-explicit-unique-multi-col {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(a, b);
CREATE UNIQUE INDEX parent_ab_idx ON parent(a, b);
CREATE TABLE child(ca, cb, FOREIGN KEY(ca, cb) REFERENCES parent(a, b));
INSERT INTO parent VALUES(1, 2);
INSERT INTO child VALUES(1, 2);
UPDATE parent SET a = 3 WHERE a = 1 AND b = 2;
}
# Multi-column index defined explicitly as CREATE UNIQUE INDEX
do_execsql_test_in_memory_any_error fk-update-parent-explicit-unique-multi-col-2 {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(a, b);
CREATE UNIQUE INDEX parent_ab_idx ON parent(a, b);
CREATE TABLE child(ca, cb, FOREIGN KEY(ca, cb) REFERENCES parent(a, b));
INSERT INTO parent VALUES(1, 2);
INSERT INTO child VALUES(1, 2);
UPDATE parent SET b = 3 WHERE a = 1 AND b = 2;
}
# Single column INTEGER PRIMARY KEY
do_execsql_test_in_memory_any_error fk-update-parent-int-pk {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(id INTEGER PRIMARY KEY);
CREATE TABLE child(pid REFERENCES parent(id));
INSERT INTO parent VALUES(1);
INSERT INTO child VALUES(1);
UPDATE parent SET id = 2 WHERE id = 1;
}
# Single column TEXT PRIMARY KEY
do_execsql_test_in_memory_any_error fk-update-parent-text-pk {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(id PRIMARY KEY);
CREATE TABLE child(pid REFERENCES parent(id));
INSERT INTO parent VALUES('key1');
INSERT INTO child VALUES('key1');
UPDATE parent SET id = 'key2' WHERE id = 'key1';
}
# Multi-column PRIMARY KEY
do_execsql_test_in_memory_any_error fk-update-parent-multi-col-pk {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(a, b, PRIMARY KEY(a, b));
CREATE TABLE child(ca, cb, FOREIGN KEY(ca, cb) REFERENCES parent(a, b));
INSERT INTO parent VALUES(1, 2);
INSERT INTO child VALUES(1, 2);
UPDATE parent SET a = 3 WHERE a = 1 AND b = 2;
}
# Multi-column PRIMARY KEY
do_execsql_test_in_memory_any_error fk-update-parent-multi-col-pk-2 {
PRAGMA foreign_keys=ON;
CREATE TABLE parent(a, b, PRIMARY KEY(a, b));
CREATE TABLE child(ca, cb, FOREIGN KEY(ca, cb) REFERENCES parent(a, b));
INSERT INTO parent VALUES(1, 2);
INSERT INTO child VALUES(1, 2);
UPDATE parent SET b = 3 WHERE a = 1 AND b = 2;
}

View File

@@ -679,30 +679,47 @@ mod fuzz_tests {
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();
let get_constraint_type = |rng: &mut ChaCha8Rng| match rng.random_range(0..3) {
0 => "INTEGER PRIMARY KEY",
1 => "UNIQUE",
2 => "PRIMARY KEY",
_ => unreachable!(),
};
// Mix of immediate and deferred FK constraints
let s = log_and_exec("CREATE TABLE parent(id INTEGER PRIMARY KEY, a INT, b INT)");
let s = log_and_exec(&format!(
"CREATE TABLE parent(id {}, a INT, b INT)",
get_constraint_type(&mut rng)
));
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();
// Child with DEFERRABLE INITIALLY DEFERRED FK
let s = log_and_exec(
"CREATE TABLE child_deferred(id INTEGER PRIMARY KEY, pid INT, x INT, \
let s = log_and_exec(&format!(
"CREATE TABLE child_deferred(id {}, pid INT, x INT, \
FOREIGN KEY(pid) REFERENCES parent(id) DEFERRABLE INITIALLY DEFERRED)",
);
get_constraint_type(&mut rng)
));
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();
// Child with immediate FK (default)
let s = log_and_exec(
"CREATE TABLE child_immediate(id INTEGER PRIMARY KEY, pid INT, y INT, \
let s = log_and_exec(&format!(
"CREATE TABLE child_immediate(id {}, pid INT, y INT, \
FOREIGN KEY(pid) REFERENCES parent(id))",
);
get_constraint_type(&mut rng)
));
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();
let composite_constraint = match rng.random_range(0..2) {
0 => "PRIMARY KEY",
1 => "UNIQUE",
_ => unreachable!(),
};
// Composite key parent for deferred testing
let s = log_and_exec(
"CREATE TABLE parent_comp(a INT NOT NULL, b INT NOT NULL, c INT, PRIMARY KEY(a,b))",
&format!("CREATE TABLE parent_comp(a INT NOT NULL, b INT NOT NULL, c INT, {composite_constraint}(a,b))"),
);
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();