Fix schema representation and methods for ForeignKey resolution

This commit is contained in:
PThorpe92
2025-09-29 19:44:39 -04:00
parent ae975afe49
commit 37c8abf247
6 changed files with 1705 additions and 50 deletions

View File

@@ -395,31 +395,6 @@ impl Schema {
self.indexes_enabled
}
pub fn get_foreign_keys_for_table(&self, table_name: &str) -> Vec<Arc<ForeignKey>> {
self.get_table(table_name)
.and_then(|t| t.btree())
.map(|t| t.foreign_keys.clone())
.unwrap_or_default()
}
/// Get foreign keys where this table is the parent (referenced by other tables)
pub fn get_referencing_foreign_keys(
&self,
parent_table: &str,
) -> Vec<(String, Arc<ForeignKey>)> {
let mut refs = Vec::new();
for table in self.tables.values() {
if let Table::BTree(btree) = table.deref() {
for fk in &btree.foreign_keys {
if fk.parent_table == parent_table {
refs.push((btree.name.as_str().to_string(), fk.clone()));
}
}
}
}
refs
}
/// Update [Schema] by scanning the first root page (sqlite_schema)
pub fn make_from_btree(
&mut self,
@@ -871,11 +846,9 @@ impl Schema {
Ok(())
}
pub fn incoming_fks_to(&self, table_name: &str) -> Vec<IncomingFkRef> {
pub fn incoming_fks_to(&self, table_name: &str) -> Vec<ResolvedFkRef> {
let target = normalize_ident(table_name);
let mut out = vec![];
// Resolve the parent table once
let parent_tbl = self
.get_btree_table(&target)
.expect("incoming_fks_to: parent table must exist");
@@ -995,7 +968,7 @@ impl Schema {
find_parent_unique(&parent_cols)
};
out.push(IncomingFkRef {
out.push(ResolvedFkRef {
child_table: Arc::clone(&child),
fk: Arc::clone(fk),
parent_cols,
@@ -1010,6 +983,117 @@ impl Schema {
out
}
pub fn outgoing_fks_of(&self, child_table: &str) -> Vec<ResolvedFkRef> {
let child_name = normalize_ident(child_table);
let Some(child) = self.get_btree_table(&child_name) else {
return vec![];
};
// Helper to find the UNIQUE/index on the parent that matches the resolved parent cols
let find_parent_unique =
|parent_tbl: &BTreeTable, cols: &Vec<String>| -> Option<Arc<Index>> {
let matches_pk = !parent_tbl.primary_key_columns.is_empty()
&& parent_tbl.primary_key_columns.len() == cols.len()
&& parent_tbl
.primary_key_columns
.iter()
.zip(cols.iter())
.all(|((n, _), c)| n.eq_ignore_ascii_case(c));
if matches_pk {
return None;
}
self.get_indices(&parent_tbl.name)
.find(|idx| {
idx.unique
&& idx.columns.len() == cols.len()
&& idx
.columns
.iter()
.zip(cols.iter())
.all(|(ic, pc)| ic.name.eq_ignore_ascii_case(pc))
})
.cloned()
};
let mut out = Vec::new();
for fk in &child.foreign_keys {
let parent_name = normalize_ident(&fk.parent_table);
let Some(parent_tbl) = self.get_btree_table(&parent_name) else {
continue;
};
// Normalize columns (same rules you used in validation)
let child_cols: Vec<String> = fk
.child_columns
.iter()
.map(|s| normalize_ident(s))
.collect();
let parent_cols: Vec<String> = if fk.parent_columns.is_empty() {
if !parent_tbl.primary_key_columns.is_empty() {
parent_tbl
.primary_key_columns
.iter()
.map(|(n, _)| normalize_ident(n))
.collect()
} else {
vec!["rowid".to_string()]
}
} else {
fk.parent_columns
.iter()
.map(|s| normalize_ident(s))
.collect()
};
// Positions
let child_pos: Vec<usize> = child_cols
.iter()
.map(|c| child.get_column(c).expect("child col missing").0)
.collect();
let parent_pos: Vec<usize> = parent_cols
.iter()
.map(|c| {
parent_tbl
.get_column(c)
.map(|(i, _)| i)
.or_else(|| c.eq_ignore_ascii_case("rowid").then_some(0))
.expect("parent col missing")
})
.collect();
// Parent uses rowid?
let parent_uses_rowid = parent_cols.len() == 1 && {
let c = parent_cols[0].as_str();
c.eq_ignore_ascii_case("rowid")
|| parent_tbl.columns.iter().any(|col| {
col.is_rowid_alias
&& col
.name
.as_deref()
.is_some_and(|n| n.eq_ignore_ascii_case(c))
})
};
let parent_unique_index = if parent_uses_rowid {
None
} else {
find_parent_unique(&parent_tbl, &parent_cols)
};
out.push(ResolvedFkRef {
child_table: Arc::clone(&child),
fk: Arc::clone(fk),
parent_cols,
child_cols,
child_pos,
parent_pos,
parent_uses_rowid,
parent_unique_index,
});
}
out
}
pub fn any_incoming_fk_to(&self, table_name: &str) -> bool {
self.tables.values().any(|t| {
let Some(bt) = t.btree() else {
@@ -1020,6 +1104,37 @@ impl Schema {
.any(|fk| fk.parent_table == table_name)
})
}
/// Returns if this table declares any outgoing FKs (is a child of some parent)
pub fn has_child_fks(&self, table_name: &str) -> bool {
self.get_table(table_name)
.and_then(|t| t.btree())
.is_some_and(|t| !t.foreign_keys.is_empty())
}
/// Return the *declared* (unresolved) FKs for a table. Callers that need
/// positions/rowid/unique info should use `incoming_fks_to` instead.
pub fn get_fks_for_table(&self, table_name: &str) -> Vec<Arc<ForeignKey>> {
self.get_table(table_name)
.and_then(|t| t.btree())
.map(|t| t.foreign_keys.clone())
.unwrap_or_default()
}
/// Return pairs of (child_table_name, FK) for FKs that reference `parent_table`
pub fn get_referencing_fks(&self, parent_table: &str) -> Vec<(String, Arc<ForeignKey>)> {
let mut refs = Vec::new();
for table in self.tables.values() {
if let Table::BTree(btree) = table.deref() {
for fk in &btree.foreign_keys {
if fk.parent_table == parent_table {
refs.push((btree.name.as_str().to_string(), fk.clone()));
}
}
}
}
refs
}
}
impl Clone for Schema {
@@ -1752,11 +1867,11 @@ pub fn _build_pseudo_table(columns: &[ResultColumn]) -> PseudoCursorType {
#[derive(Debug, Clone)]
pub struct ForeignKey {
/// Columns in this table
/// Columns in this table (child side)
pub child_columns: Vec<String>,
/// Referenced table
/// Referenced (parent) table
pub parent_table: String,
/// Referenced columns
/// Parent-side referenced columns
pub parent_columns: Vec<String>,
pub on_delete: RefAct,
pub on_update: RefAct,
@@ -1765,9 +1880,9 @@ pub struct ForeignKey {
pub deferred: bool,
}
/// A single foreign key where `parent_table == target`.
/// A single resolved foreign key where `parent_table == target`.
#[derive(Clone, Debug)]
pub struct IncomingFkRef {
pub struct ResolvedFkRef {
/// Child table that owns the FK.
pub child_table: Arc<BTreeTable>,
/// The FK as declared on the child table.
@@ -1788,7 +1903,7 @@ pub struct IncomingFkRef {
pub parent_unique_index: Option<Arc<Index>>,
}
impl IncomingFkRef {
impl ResolvedFkRef {
/// Returns if any referenced parent column can change when these column positions are updated.
pub fn parent_key_may_change(
&self,

File diff suppressed because it is too large Load Diff

View File

@@ -87,7 +87,7 @@ pub fn translate_insert(
let has_child_fks = fk_enabled
&& !resolver
.schema
.get_foreign_keys_for_table(table_name.as_str())
.get_fks_for_table(table_name.as_str())
.is_empty();
let has_parent_fks = fk_enabled && resolver.schema.any_incoming_fk_to(table_name.as_str());
@@ -1146,7 +1146,6 @@ pub fn translate_insert(
&mut result_columns,
cdc_table.as_ref().map(|c| c.0),
row_done_label,
connection,
)?;
} else {
// UpsertDo::Nothing case
@@ -1910,7 +1909,7 @@ fn emit_fk_checks_for_insert(
});
// Iterate child FKs declared on this table
for fk in resolver.schema.get_foreign_keys_for_table(table_name) {
for fk in resolver.schema.get_fks_for_table(table_name) {
let fk_ok = program.allocate_label();
// If any child column is NULL, skip this FK

View File

@@ -4,7 +4,7 @@
use chrono::Datelike;
use std::sync::Arc;
use turso_macros::match_ignore_ascii_case;
use turso_parser::ast::{self, ColumnDefinition, Expr, Literal, Name};
use turso_parser::ast::{self, ColumnDefinition, Expr, Literal};
use turso_parser::ast::{PragmaName, QualifiedName};
use super::integrity_check::translate_integrity_check;
@@ -388,10 +388,10 @@ fn update_pragma(
Ok((program, TransactionMode::None))
}
PragmaName::ForeignKeys => {
let enabled = match &value {
Expr::Id(name) | Expr::Name(name) => {
let name_str = name.as_str().as_bytes();
match_ignore_ascii_case!(match name_str {
let enabled = match value {
Expr::Name(name) | Expr::Id(name) => {
let name_bytes = name.as_str().as_bytes();
match_ignore_ascii_case!(match name_bytes {
b"ON" | b"TRUE" | b"YES" | b"1" => true,
_ => false,
})

View File

@@ -5,10 +5,15 @@ use std::{collections::HashMap, sync::Arc};
use turso_parser::ast::{self, Upsert};
use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::translate::emitter::{
emit_fk_child_existence_checks, emit_fk_parent_existence_checks,
emit_fk_parent_pk_change_counters,
};
use crate::translate::expr::{walk_expr, WalkControl};
use crate::translate::insert::format_unique_violation_desc;
use crate::translate::planner::ROWID_STRS;
use crate::vdbe::insn::CmpInsFlags;
use crate::Connection;
use crate::{
bail_parse_error,
error::SQLITE_CONSTRAINT_NOTNULL,
@@ -346,6 +351,7 @@ pub fn emit_upsert(
returning: &mut [ResultSetColumn],
cdc_cursor_id: Option<usize>,
row_done_label: BranchOffset,
connection: &Arc<Connection>,
) -> crate::Result<()> {
// Seek & snapshot CURRENT
program.emit_insn(Insn::SeekRowid {
@@ -464,10 +470,179 @@ pub fn emit_upsert(
}
}
let (changed_cols, rowid_changed) = collect_changed_cols(table, set_pairs);
if let Some(bt) = table.btree() {
if connection.foreign_keys_enabled() {
let rowid_new_reg = new_rowid_reg.unwrap_or(conflict_rowid_reg);
// Child-side checks
if resolver.schema.has_child_fks(bt.name.as_str()) {
emit_fk_child_existence_checks(
program,
resolver,
&bt,
table.get_name(),
new_start,
rowid_new_reg,
&changed_cols,
)?;
}
// Parent-side checks only if any incoming FK could care
if resolver.schema.any_incoming_fk_to(table.get_name()) {
// if parent key can't change, skip
let updated_parent_positions: HashSet<usize> =
set_pairs.iter().map(|(i, _)| *i).collect();
let incoming = resolver.schema.incoming_fks_to(table.get_name());
let parent_key_may_change = incoming
.iter()
.any(|r| r.parent_key_may_change(&updated_parent_positions, &bt));
if parent_key_may_change {
let skip_parent_fk = program.allocate_label();
let pk_len = bt.primary_key_columns.len();
match pk_len {
0 => {
// implicit rowid
program.emit_insn(Insn::Eq {
lhs: rowid_new_reg,
rhs: conflict_rowid_reg,
target_pc: skip_parent_fk,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
emit_fk_parent_existence_checks(
program,
resolver,
table.get_name(),
tbl_cursor_id,
conflict_rowid_reg,
)?;
program.preassign_label_to_next_insn(skip_parent_fk);
}
1 => {
// single-col declared PK
let (pk_name, _) = &bt.primary_key_columns[0];
let (pos, col) = bt.get_column(pk_name).unwrap();
let old_reg = program.alloc_register();
if col.is_rowid_alias {
program.emit_insn(Insn::RowId {
cursor_id: tbl_cursor_id,
dest: old_reg,
});
} else {
program.emit_insn(Insn::Column {
cursor_id: tbl_cursor_id,
column: pos,
dest: old_reg,
default: None,
});
}
let new_reg = new_start + pos;
let skip = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: old_reg,
rhs: new_reg,
target_pc: skip,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
emit_fk_parent_existence_checks(
program,
resolver,
table.get_name(),
tbl_cursor_id,
conflict_rowid_reg,
)?;
program.preassign_label_to_next_insn(skip);
}
_ => {
// composite PK: build OLD/NEW vectors and do the 2-pass counter logic
let old_pk_start = program.alloc_registers(pk_len);
for (i, (pk_name, _)) in bt.primary_key_columns.iter().enumerate() {
let (pos, col) = bt.get_column(pk_name).unwrap();
if col.is_rowid_alias {
// old rowid (UPSERT target) == conflict_rowid_reg
program.emit_insn(Insn::Copy {
src_reg: conflict_rowid_reg,
dst_reg: old_pk_start + i,
extra_amount: 0,
});
} else {
program.emit_insn(Insn::Column {
cursor_id: tbl_cursor_id,
column: pos,
dest: old_pk_start + i,
default: None,
});
}
}
let new_pk_start = program.alloc_registers(pk_len);
for (i, (pk_name, _)) in bt.primary_key_columns.iter().enumerate() {
let (pos, col) = bt.get_column(pk_name).unwrap();
let src = if col.is_rowid_alias {
rowid_new_reg
} else {
new_start + pos
};
program.emit_insn(Insn::Copy {
src_reg: src,
dst_reg: new_pk_start + i,
extra_amount: 0,
});
}
// Compare OLD vs NEW, if all equal then skip
let skip = program.allocate_label();
let changed = program.allocate_label();
for i in 0..pk_len {
if i == pk_len - 1 {
program.emit_insn(Insn::Eq {
lhs: old_pk_start + i,
rhs: new_pk_start + i,
target_pc: skip,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
program.emit_insn(Insn::Goto { target_pc: changed });
} else {
let next = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: old_pk_start + i,
rhs: new_pk_start + i,
target_pc: next,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
program.emit_insn(Insn::Goto { target_pc: changed });
program.preassign_label_to_next_insn(next);
}
}
program.preassign_label_to_next_insn(changed);
emit_fk_parent_pk_change_counters(
program,
&incoming,
resolver,
old_pk_start,
new_pk_start,
pk_len,
)?;
program.preassign_label_to_next_insn(skip);
}
}
}
}
}
}
// Index rebuild (DELETE old, INSERT new), honoring partial-index WHEREs
if let Some(before) = before_start {
let (changed_cols, rowid_changed) = collect_changed_cols(table, set_pairs);
for (idx_name, _root, idx_cid) in idx_cursors {
let idx_meta = resolver
.schema

View File

@@ -3,12 +3,11 @@ pub mod grammar_generator;
#[cfg(test)]
mod tests {
use rand::seq::{IndexedRandom, SliceRandom};
use std::collections::HashSet;
use turso_core::DatabaseOpts;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use rusqlite::{params, types::Value};
use std::{collections::HashSet, io::Write};
use turso_core::DatabaseOpts;
use crate::{
common::{
@@ -646,6 +645,422 @@ mod tests {
"Different results! limbo: {:?}, sqlite: {:?}, seed: {}, query: {}, table def: {}",
limbo_rows, sqlite_rows, seed, query, table_defs[i]
);
}
}
}
}
pub fn fk_single_pk_mutation_fuzz() {
let _ = env_logger::try_init();
let (mut rng, seed) = rng_from_time();
println!("fk_single_pk_mutation_fuzz seed: {seed}");
const OUTER_ITERS: usize = 50;
const INNER_ITERS: usize = 200;
for outer in 0..OUTER_ITERS {
println!("fk_single_pk_mutation_fuzz {}/{}", outer + 1, OUTER_ITERS);
let limbo_db = TempDatabase::new_empty(true);
let sqlite_db = TempDatabase::new_empty(true);
let limbo = limbo_db.connect_limbo();
let sqlite = rusqlite::Connection::open(sqlite_db.path.clone()).unwrap();
// Statement log for this iteration
let mut stmts: Vec<String> = Vec::new();
let mut log_and_exec = |sql: &str| {
stmts.push(sql.to_string());
sql.to_string()
};
// Enable FKs in both engines
let s = log_and_exec("PRAGMA foreign_keys=ON");
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();
// DDL
let s = log_and_exec("CREATE TABLE p(id INTEGER PRIMARY KEY, a INT, b INT)");
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();
let s = log_and_exec(
"CREATE TABLE c(id INTEGER PRIMARY KEY, x INT, y INT, FOREIGN KEY(x) REFERENCES p(id))",
);
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();
// Seed parent
let n_par = rng.random_range(5..=40);
let mut used_ids = std::collections::HashSet::new();
for _ in 0..n_par {
let mut id;
loop {
id = rng.random_range(1..=200) as i64;
if used_ids.insert(id) {
break;
}
}
let a = rng.random_range(-5..=25);
let b = rng.random_range(-5..=25);
let stmt = log_and_exec(&format!("INSERT INTO p VALUES ({id}, {a}, {b})"));
limbo_exec_rows(&limbo_db, &limbo, &stmt);
sqlite.execute(&stmt, params![]).unwrap();
}
// Seed child
let n_child = rng.random_range(5..=80);
for i in 0..n_child {
let id = 1000 + i as i64;
let x = if rng.random_bool(0.8) {
*used_ids.iter().choose(&mut rng).unwrap()
} else {
rng.random_range(1..=220) as i64
};
let y = rng.random_range(-10..=10);
let stmt = log_and_exec(&format!("INSERT INTO c VALUES ({id}, {x}, {y})"));
match (
sqlite.execute(&stmt, params![]),
limbo_exec_rows_fallible(&limbo_db, &limbo, &stmt),
) {
(Ok(_), Ok(_)) => {}
(Err(_), Err(_)) => {}
(x, y) => {
eprintln!("\n=== FK fuzz failure (seeding mismatch) ===");
eprintln!("seed: {seed}, outer: {}", outer + 1);
eprintln!("sqlite: {x:?}, limbo: {y:?}");
eprintln!("last stmt: {stmt}");
eprintln!("--- replay statements ({}) ---", stmts.len());
for (i, s) in stmts.iter().enumerate() {
eprintln!("{:04}: {};", i + 1, s);
}
panic!("Seeding child insert mismatch");
}
}
}
// Mutations
for _ in 0..INNER_ITERS {
let action = rng.random_range(0..6);
let stmt = match action {
// Parent INSERT
0 => {
let mut id;
let mut tries = 0;
loop {
id = rng.random_range(1..=250) as i64;
if !used_ids.contains(&id) || tries > 10 {
break;
}
tries += 1;
}
let a = rng.random_range(-5..=25);
let b = rng.random_range(-5..=25);
format!("INSERT INTO p VALUES({id}, {a}, {b})")
}
// Parent UPDATE
1 => {
if rng.random_bool(0.5) {
let old = rng.random_range(1..=250);
let new_id = rng.random_range(1..=260);
format!("UPDATE p SET id={new_id} WHERE id={old}")
} else {
let a = rng.random_range(-5..=25);
let b = rng.random_range(-5..=25);
let tgt = rng.random_range(1..=260);
format!("UPDATE p SET a={a}, b={b} WHERE id={tgt}")
}
}
// Parent DELETE
2 => {
let del_id = rng.random_range(1..=260);
format!("DELETE FROM p WHERE id={del_id}")
}
// Child INSERT
3 => {
let id = rng.random_range(1000..=2000);
let x = if rng.random_bool(0.7) {
if let Some(p) = used_ids.iter().choose(&mut rng) {
*p
} else {
rng.random_range(1..=260) as i64
}
} else {
rng.random_range(1..=260) as i64
};
let y = rng.random_range(-10..=10);
format!("INSERT INTO c VALUES({id}, {x}, {y})")
}
// Child UPDATE
4 => {
let pick = rng.random_range(1000..=2000);
if rng.random_bool(0.6) {
let new_x = if rng.random_bool(0.7) {
if let Some(p) = used_ids.iter().choose(&mut rng) {
*p
} else {
rng.random_range(1..=260) as i64
}
} else {
rng.random_range(1..=260) as i64
};
format!("UPDATE c SET x={new_x} WHERE id={pick}")
} else {
let new_y = rng.random_range(-10..=10);
format!("UPDATE c SET y={new_y} WHERE id={pick}")
}
}
// Child DELETE
_ => {
let pick = rng.random_range(1000..=2000);
format!("DELETE FROM c WHERE id={pick}")
}
};
let stmt = log_and_exec(&stmt);
let sres = sqlite.execute(&stmt, params![]);
let lres = limbo_exec_rows_fallible(&limbo_db, &limbo, &stmt);
match (sres, lres) {
(Ok(_), Ok(_)) => {
if stmt.starts_with("INSERT INTO p VALUES(") {
if let Some(tok) = stmt.split_whitespace().nth(4) {
if let Some(idtok) = tok.split(['(', ',']).nth(1) {
if let Ok(idnum) = idtok.parse::<i64>() {
used_ids.insert(idnum);
}
}
}
}
let sp = sqlite_exec_rows(&sqlite, "SELECT id,a,b FROM p ORDER BY id");
let sc = sqlite_exec_rows(&sqlite, "SELECT id,x,y FROM c ORDER BY id");
let lp =
limbo_exec_rows(&limbo_db, &limbo, "SELECT id,a,b FROM p ORDER BY id");
let lc =
limbo_exec_rows(&limbo_db, &limbo, "SELECT id,x,y FROM c ORDER BY id");
if sp != lp || sc != lc {
eprintln!("\n=== FK fuzz failure (state mismatch) ===");
eprintln!("seed: {seed}, outer: {}", outer + 1);
eprintln!("last stmt: {stmt}");
eprintln!("sqlite p: {sp:?}\nsqlite c: {sc:?}");
eprintln!("limbo p: {lp:?}\nlimbo c: {lc:?}");
eprintln!("--- replay statements ({}) ---", stmts.len());
for (i, s) in stmts.iter().enumerate() {
eprintln!("{:04}: {};", i + 1, s);
}
panic!("State mismatch");
}
}
(Err(_), Err(_)) => { /* parity OK */ }
(ok_sqlite, ok_limbo) => {
eprintln!("\n=== FK fuzz failure (outcome mismatch) ===");
eprintln!("seed: {seed}, outer: {}", outer + 1);
eprintln!("sqlite: {ok_sqlite:?}, limbo: {ok_limbo:?}");
eprintln!("last stmt: {stmt}");
// dump final states to help decide who is right
let sp = sqlite_exec_rows(&sqlite, "SELECT id,a,b FROM p ORDER BY id");
let sc = sqlite_exec_rows(&sqlite, "SELECT id,x,y FROM c ORDER BY id");
let lp =
limbo_exec_rows(&limbo_db, &limbo, "SELECT id,a,b FROM p ORDER BY id");
let lc =
limbo_exec_rows(&limbo_db, &limbo, "SELECT id,x,y FROM c ORDER BY id");
eprintln!("sqlite p: {sp:?}\nsqlite c: {sc:?}");
eprintln!("turso p: {lp:?}\nturso c: {lc:?}");
eprintln!(
"--- writing ({}) statements to fk_fuzz_statements.sql ---",
stmts.len()
);
let mut file = std::fs::File::create("fk_fuzz_statements.sql").unwrap();
for s in stmts.iter() {
let _ = file.write_fmt(format_args!("{s};\n"));
}
file.flush().unwrap();
panic!("DML outcome mismatch, statements written to tests/fk_fuzz_statements.sql");
}
}
}
}
}
#[test]
#[ignore] // TODO: un-ignore when UNIQUE constraints are fixed
pub fn fk_composite_pk_mutation_fuzz() {
let _ = env_logger::try_init();
let (mut rng, seed) = rng_from_time();
println!("fk_composite_pk_mutation_fuzz seed: {seed}");
const OUTER_ITERS: usize = 30;
const INNER_ITERS: usize = 200;
for outer in 0..OUTER_ITERS {
println!(
"fk_composite_pk_mutation_fuzz {}/{}",
outer + 1,
OUTER_ITERS
);
let limbo_db = TempDatabase::new_empty(true);
let sqlite_db = TempDatabase::new_empty(true);
let limbo = limbo_db.connect_limbo();
let sqlite = rusqlite::Connection::open(sqlite_db.path.clone()).unwrap();
let mut stmts: Vec<String> = Vec::new();
let mut log_and_exec = |sql: &str| {
stmts.push(sql.to_string());
sql.to_string()
};
// Enable FKs in both engines
let _ = log_and_exec("PRAGMA foreign_keys=ON");
limbo_exec_rows(&limbo_db, &limbo, "PRAGMA foreign_keys=ON");
sqlite.execute("PRAGMA foreign_keys=ON", params![]).unwrap();
// Parent PK is composite (a,b). Child references (x,y) -> (a,b).
let s = log_and_exec(
"CREATE TABLE p(a INT NOT NULL, b INT NOT NULL, v INT, PRIMARY KEY(a,b))",
);
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();
let s = log_and_exec(
"CREATE TABLE c(id INTEGER PRIMARY KEY, x INT, y INT, w INT, \
FOREIGN KEY(x,y) REFERENCES p(a,b))",
);
limbo_exec_rows(&limbo_db, &limbo, &s);
sqlite.execute(&s, params![]).unwrap();
// Seed parent: small grid of (a,b)
let mut pairs: Vec<(i64, i64)> = Vec::new();
for _ in 0..rng.random_range(5..=25) {
let a = rng.random_range(-3..=6);
let b = rng.random_range(-3..=6);
if !pairs.contains(&(a, b)) {
pairs.push((a, b));
let v = rng.random_range(0..=20);
let stmt = log_and_exec(&format!("INSERT INTO p VALUES({a},{b},{v})"));
limbo_exec_rows(&limbo_db, &limbo, &stmt);
sqlite.execute(&stmt, params![]).unwrap();
}
}
// Seed child rows, 70% chance to reference existing (a,b)
for i in 0..rng.random_range(5..=60) {
let id = 5000 + i as i64;
let (x, y) = if rng.random_bool(0.7) {
*pairs.choose(&mut rng).unwrap_or(&(0, 0))
} else {
(rng.random_range(-4..=7), rng.random_range(-4..=7))
};
let w = rng.random_range(-10..=10);
let stmt = log_and_exec(&format!("INSERT INTO c VALUES({id}, {x}, {y}, {w})"));
let _ = sqlite.execute(&stmt, params![]);
let _ = limbo_exec_rows_fallible(&limbo_db, &limbo, &stmt);
}
for _ in 0..INNER_ITERS {
let op = rng.random_range(0..6);
let stmt = log_and_exec(&match op {
// INSERT parent
0 => {
let a = rng.random_range(-4..=8);
let b = rng.random_range(-4..=8);
let v = rng.random_range(0..=20);
format!("INSERT INTO p VALUES({a},{b},{v})")
}
// UPDATE parent composite key (a,b)
1 => {
let a_old = rng.random_range(-4..=8);
let b_old = rng.random_range(-4..=8);
let a_new = rng.random_range(-4..=8);
let b_new = rng.random_range(-4..=8);
format!("UPDATE p SET a={a_new}, b={b_new} WHERE a={a_old} AND b={b_old}")
}
// DELETE parent
2 => {
let a = rng.random_range(-4..=8);
let b = rng.random_range(-4..=8);
format!("DELETE FROM p WHERE a={a} AND b={b}")
}
// INSERT child
3 => {
let id = rng.random_range(5000..=7000);
let (x, y) = if rng.random_bool(0.7) {
*pairs.choose(&mut rng).unwrap_or(&(0, 0))
} else {
(rng.random_range(-4..=8), rng.random_range(-4..=8))
};
let w = rng.random_range(-10..=10);
format!("INSERT INTO c VALUES({id},{x},{y},{w})")
}
// UPDATE child FK columns (x,y)
4 => {
let id = rng.random_range(5000..=7000);
let (x, y) = if rng.random_bool(0.7) {
*pairs.choose(&mut rng).unwrap_or(&(0, 0))
} else {
(rng.random_range(-4..=8), rng.random_range(-4..=8))
};
format!("UPDATE c SET x={x}, y={y} WHERE id={id}")
}
// DELETE child
_ => {
let id = rng.random_range(5000..=7000);
format!("DELETE FROM c WHERE id={id}")
}
});
let sres = sqlite.execute(&stmt, params![]);
let lres = limbo_exec_rows_fallible(&limbo_db, &limbo, &stmt);
match (sres, lres) {
(Ok(_), Ok(_)) => {
// Compare canonical states
let sp = sqlite_exec_rows(&sqlite, "SELECT a,b,v FROM p ORDER BY a,b,v");
let sc = sqlite_exec_rows(&sqlite, "SELECT id,x,y,w FROM c ORDER BY id");
let lp = limbo_exec_rows(
&limbo_db,
&limbo,
"SELECT a,b,v FROM p ORDER BY a,b,v",
);
let lc = limbo_exec_rows(
&limbo_db,
&limbo,
"SELECT id,x,y,w FROM c ORDER BY id",
);
assert_eq!(sp, lp, "seed {seed}, stmt {stmt}");
assert_eq!(sc, lc, "seed {seed}, stmt {stmt}");
}
(Err(_), Err(_)) => { /* both errored -> parity OK */ }
(ok_s, ok_l) => {
eprintln!(
"Mismatch sqlite={ok_s:?}, limbo={ok_l:?}, stmt={stmt}, seed={seed}"
);
let sp = sqlite_exec_rows(&sqlite, "SELECT a,b,v FROM p ORDER BY a,b,v");
let sc = sqlite_exec_rows(&sqlite, "SELECT id,x,y,w FROM c ORDER BY id");
let lp = limbo_exec_rows(
&limbo_db,
&limbo,
"SELECT a,b,v FROM p ORDER BY a,b,v",
);
let lc = limbo_exec_rows(
&limbo_db,
&limbo,
"SELECT id,x,y,w FROM c ORDER BY id",
);
eprintln!(
"sqlite p={sp:?}\nsqlite c={sc:?}\nlimbo p={lp:?}\nlimbo c={lc:?}"
);
let mut file =
std::fs::File::create("fk_composite_fuzz_statements.sql").unwrap();
for s in stmts.iter() {
let _ = writeln!(&file, "{s};");
}
file.flush().unwrap();
panic!("DML outcome mismatch, sql file written to tests/fk_composite_fuzz_statements.sql");
}
}
}
}
}