Merge 'core/translate: implement basic foreign key constraint support' from Preston Thorpe

This PR introduces support for foreign key constraints, the `PRAGMA
foreign_keys` pragma, and the relevant opcodes `FkCounter` and `FkIfZero`.
Extensive fuzz tests were added for both regular and composite
PK/rowid/unique index constraints, as well as some really weird edge
cases, to make sure our affinity handling is also correct when the
constraints are triggered.
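As a quick illustration (a hypothetical session, not taken from this PR's test suite), enabling the pragma makes an immediate, non-deferred violation fail the statement right away:

```sql
PRAGMA foreign_keys = ON;

CREATE TABLE parent (id INTEGER PRIMARY KEY);
CREATE TABLE child (
    id INTEGER PRIMARY KEY,
    -- NOT DEFERRABLE / immediate enforcement by default
    parent_id INTEGER REFERENCES parent(id)
);

INSERT INTO parent VALUES (1);
INSERT INTO child VALUES (1, 1);  -- ok: parent row 1 exists
INSERT INTO child VALUES (2, 9);  -- fails immediately: FOREIGN KEY constraint failed
```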
Foreign-key checking is driven by two VDBE ops: `FkCounter` and
`FkIfZero`.
`FkCounter` is a running counter on the `Connection` for deferred FK
violations. When an `insert/delete/update` operation creates a potential
orphan (we insert a child row that doesn’t have a matching parent, or we
delete/update a parent that children still point at), this counter is
incremented. When a later operation fixes that (e.g. we insert the
missing parent or re-target the child), we decrement the counter. If any
violations remain at commit time, the commit fails. For immediate
constraints, we emit Halt right away on the violation path.
`FkIfZero` can either guard a decrement of `FkCounter` to prevent
underflow, or potentially (in the future) be used to skip the work of
checking whether any constraints need resolving.
NOTE: this PR does not implement `pragma defer_foreign_keys` for global
deferred constraint semantics. Only explicit `col INT REFERENCES t(id)
DEFERRABLE INITIALLY DEFERRED` is supported in this PR.
This PR does not add support for `ON UPDATE|DELETE CASCADE`, only the
basic implicit `NO ACTION` behavior.
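To make the deferred path described above concrete, here is a hypothetical session (table and column names are illustrative) showing how the counter is expected to behave:

```sql
PRAGMA foreign_keys = ON;

CREATE TABLE parent (id INTEGER PRIMARY KEY);
CREATE TABLE child (
    id INTEGER PRIMARY KEY,
    parent_id INTEGER REFERENCES parent(id) DEFERRABLE INITIALLY DEFERRED
);

BEGIN;
INSERT INTO child VALUES (1, 42);  -- no parent 42 yet: deferred counter goes to 1
INSERT INTO parent VALUES (42);    -- orphan resolved: guarded decrement brings it back to 0
COMMIT;                            -- counter is zero, commit succeeds

BEGIN;
INSERT INTO child VALUES (2, 99);  -- deferred counter goes to 1
COMMIT;                            -- counter still non-zero: commit fails
```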
~~NOTE: I did notice that, as referenced here: #3463,~~
~~our current handling of unique constraints does not pass fuzz tests, I
believe only in the case of composite primary keys,~~ ~~because the fuzz
test for an FK referencing a composite PK fails, but only for UNIQUE
constraints, never (or at least as many times as I tried) for foreign
key constraints.~~
EDIT: all fuzzers are passing now that @sivukhin fixed the unique
constraint issue.
The reason the `deferred` fuzzer is `#[ignore]`'d is that sqlite uses
sub-transactions: even though the fuzzing only does one entry per
transaction, the fuzzer can lose track of _when_ it is inside a
transaction. When it hits an FK constraint and both databases report an
error, it can just continue running regular statements, and the eventual
ROLLBACK then reverts different things in sqlite vs. turso, so for now
we leave it `#[ignore]`'d.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3510
Jussi Saurio committed 2025-10-08 11:44:24 +03:00 (committed by GitHub)
31 changed files with 4765 additions and 86 deletions

1
.gitignore vendored
View File

@@ -43,6 +43,7 @@ simulator.log
**/*.txt
profile.json.gz
simulator-output/
tests/*.sql
&1
bisected.sql

View File

@@ -448,8 +448,8 @@ Modifiers:
| Eq | Yes | |
| Expire | No | |
| Explain | No | |
| FkCounter | No | |
| FkIfZero | No | |
| FkCounter | Yes | |
| FkIfZero | Yes | |
| Found | Yes | |
| Function | Yes | |
| Ge | Yes | |

View File

@@ -163,6 +163,7 @@ impl From<turso_ext::ResultCode> for LimboError {
pub const SQLITE_CONSTRAINT: usize = 19;
pub const SQLITE_CONSTRAINT_PRIMARYKEY: usize = SQLITE_CONSTRAINT | (6 << 8);
pub const SQLITE_CONSTRAINT_FOREIGNKEY: usize = SQLITE_CONSTRAINT | (7 << 8);
pub const SQLITE_CONSTRAINT_NOTNULL: usize = SQLITE_CONSTRAINT | (5 << 8);
pub const SQLITE_FULL: usize = 13; // we want this in autoincrement - incase if user inserts max allowed int
pub const SQLITE_CONSTRAINT_UNIQUE: usize = 2067;

View File

@@ -2245,6 +2245,7 @@ mod tests {
is_strict: false,
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
};
schema.add_btree_table(Arc::new(users_table));
@@ -2298,6 +2299,7 @@ mod tests {
is_strict: false,
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
};
schema.add_btree_table(Arc::new(products_table));
@@ -2363,6 +2365,7 @@ mod tests {
has_autoincrement: false,
is_strict: false,
unique_sets: vec![],
foreign_keys: vec![],
};
schema.add_btree_table(Arc::new(orders_table));
@@ -2401,6 +2404,7 @@ mod tests {
is_strict: false,
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
};
schema.add_btree_table(Arc::new(customers_table));
@@ -2463,6 +2467,7 @@ mod tests {
is_strict: false,
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
};
schema.add_btree_table(Arc::new(purchases_table));
@@ -2513,6 +2518,7 @@ mod tests {
is_strict: false,
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
};
schema.add_btree_table(Arc::new(vendors_table));
@@ -2550,6 +2556,7 @@ mod tests {
is_strict: false,
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
};
schema.add_btree_table(Arc::new(sales_table));

View File

@@ -1411,6 +1411,7 @@ mod tests {
has_rowid: true,
is_strict: false,
unique_sets: vec![],
foreign_keys: vec![],
has_autoincrement: false,
};
@@ -1460,6 +1461,7 @@ mod tests {
has_rowid: true,
is_strict: false,
has_autoincrement: false,
foreign_keys: vec![],
unique_sets: vec![],
};
@@ -1509,6 +1511,7 @@ mod tests {
has_rowid: true,
is_strict: false,
has_autoincrement: false,
foreign_keys: vec![],
unique_sets: vec![],
};
@@ -1558,6 +1561,7 @@ mod tests {
has_rowid: true, // Has implicit rowid but no alias
is_strict: false,
has_autoincrement: false,
foreign_keys: vec![],
unique_sets: vec![],
};

View File

@@ -63,17 +63,16 @@ pub use io::{
};
use parking_lot::RwLock;
use schema::Schema;
use std::cell::Cell;
use std::{
borrow::Cow,
cell::RefCell,
cell::{Cell, RefCell},
collections::HashMap,
fmt::{self, Display},
num::NonZero,
ops::Deref,
rc::Rc,
sync::{
atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU16, AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicIsize, AtomicU16, AtomicUsize, Ordering},
Arc, LazyLock, Mutex, Weak,
},
time::Duration,
@@ -583,6 +582,8 @@ impl Database {
data_sync_retry: AtomicBool::new(false),
busy_timeout: RwLock::new(Duration::new(0, 0)),
is_mvcc_bootstrap_connection: AtomicBool::new(is_mvcc_bootstrap_connection),
fk_pragma: AtomicBool::new(false),
fk_deferred_violations: AtomicIsize::new(0),
});
self.n_connections
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
@@ -1100,6 +1101,9 @@ pub struct Connection {
busy_timeout: RwLock<std::time::Duration>,
/// Whether this is an internal connection used for MVCC bootstrap
is_mvcc_bootstrap_connection: AtomicBool,
/// Whether pragma foreign_keys=ON for this connection
fk_pragma: AtomicBool,
fk_deferred_violations: AtomicIsize,
}
impl Drop for Connection {
@@ -1532,6 +1536,21 @@ impl Connection {
Ok(db)
}
pub fn set_foreign_keys_enabled(&self, enable: bool) {
self.fk_pragma.store(enable, Ordering::Release);
}
pub fn foreign_keys_enabled(&self) -> bool {
self.fk_pragma.load(Ordering::Acquire)
}
pub(crate) fn clear_deferred_foreign_key_violations(&self) -> isize {
self.fk_deferred_violations.swap(0, Ordering::Release)
}
pub(crate) fn get_deferred_foreign_key_violations(&self) -> isize {
self.fk_deferred_violations.load(Ordering::Acquire)
}
pub fn maybe_update_schema(&self) {
let current_schema_version = self.schema.read().schema_version;
let schema = self.db.schema.lock().unwrap();

View File

@@ -131,6 +131,10 @@ pub fn pragma_for(pragma: &PragmaName) -> Pragma {
PragmaFlags::NoColumns1 | PragmaFlags::Result0,
&["mvcc_checkpoint_threshold"],
),
ForeignKeys => Pragma::new(
PragmaFlags::NoColumns1 | PragmaFlags::Result0,
&["foreign_keys"],
),
}
}

View File

@@ -89,7 +89,9 @@ use std::ops::Deref;
use std::sync::Arc;
use std::sync::Mutex;
use tracing::trace;
use turso_parser::ast::{self, ColumnDefinition, Expr, Literal, SortOrder, TableOptions};
use turso_parser::ast::{
self, ColumnDefinition, Expr, InitDeferredPred, Literal, RefAct, SortOrder, TableOptions,
};
use turso_parser::{
ast::{Cmd, CreateTableBody, ResultColumn, Stmt},
parser::Parser,
@@ -646,6 +648,7 @@ impl Schema {
has_rowid: true,
is_strict: false,
has_autoincrement: false,
foreign_keys: vec![],
unique_sets: vec![],
})));
@@ -842,6 +845,266 @@ impl Schema {
Ok(())
}
/// Compute all resolved FKs *referencing* `table_name` (arg: `table_name` is the parent).
/// Each item contains the child table, normalized columns/positions, and the parent lookup
/// strategy (rowid vs. UNIQUE index or PK).
pub fn resolved_fks_referencing(&self, table_name: &str) -> Result<Vec<ResolvedFkRef>> {
let fk_mismatch_err = |child: &str, parent: &str| -> crate::LimboError {
crate::LimboError::Constraint(format!(
"foreign key mismatch - \"{child}\" referencing \"{parent}\""
))
};
let target = normalize_ident(table_name);
let mut out = Vec::with_capacity(4); // arbitrary estimate
let parent_tbl = self
.get_btree_table(&target)
.ok_or_else(|| fk_mismatch_err("<unknown>", &target))?;
// Precompute helper to find parent unique index, if it's not the rowid
let find_parent_unique = |cols: &Vec<String>| -> Option<Arc<Index>> {
self.get_indices(&parent_tbl.name)
.find(|idx| {
idx.unique
&& idx.columns.len() == cols.len()
&& idx
.columns
.iter()
.zip(cols.iter())
.all(|(ic, pc)| ic.name.eq_ignore_ascii_case(pc))
})
.cloned()
};
for t in self.tables.values() {
let Some(child) = t.btree() else {
continue;
};
for fk in &child.foreign_keys {
if !fk.parent_table.eq_ignore_ascii_case(&target) {
continue;
}
if fk.child_columns.is_empty() {
// SQLite requires an explicit child column list for table-level FOREIGN KEY
// constraints (column-level REFERENCES clauses record their single child column),
// so an empty list here means the FK cannot be resolved.
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
let child_cols: Vec<String> = fk.child_columns.clone();
let mut child_pos = Vec::with_capacity(child_cols.len());
for cname in &child_cols {
let (i, _) = child
.get_column(cname)
.ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?;
child_pos.push(i);
}
let parent_cols: Vec<String> = if fk.parent_columns.is_empty() {
if !parent_tbl.primary_key_columns.is_empty() {
parent_tbl
.primary_key_columns
.iter()
.map(|(col, _)| col)
.cloned()
.collect()
} else {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
} else {
fk.parent_columns.clone()
};
// Same length required
if parent_cols.len() != child_cols.len() {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
let mut parent_pos = Vec::with_capacity(parent_cols.len());
for pc in &parent_cols {
let pos = parent_tbl.get_column(pc).map(|(i, _)| i).or_else(|| {
ROWID_STRS
.iter()
.any(|s| pc.eq_ignore_ascii_case(s))
.then_some(0)
});
let Some(p) = pos else {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
};
parent_pos.push(p);
}
// Determine if parent key is ROWID/alias
let parent_uses_rowid = parent_tbl.primary_key_columns.len().eq(&1) && {
if parent_tbl.primary_key_columns.len() == 1 {
let pk_name = &parent_tbl.primary_key_columns[0].0;
// rowid or alias INTEGER PRIMARY KEY; either is ok implicitly
parent_tbl.columns.iter().any(|c| {
c.is_rowid_alias
&& c.name
.as_deref()
.is_some_and(|n| n.eq_ignore_ascii_case(pk_name))
}) || ROWID_STRS.iter().any(|&r| r.eq_ignore_ascii_case(pk_name))
} else {
false
}
};
// If not rowid, there must be a non-partial UNIQUE exactly on parent_cols
let parent_unique_index = if parent_uses_rowid {
None
} else {
find_parent_unique(&parent_cols)
.ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?
.into()
};
fk.validate()?;
out.push(ResolvedFkRef {
child_table: Arc::clone(&child),
fk: Arc::clone(fk),
parent_cols,
child_cols,
child_pos,
parent_pos,
parent_uses_rowid,
parent_unique_index,
});
}
}
Ok(out)
}
/// Compute all resolved FKs *declared by* `child_table`
pub fn resolved_fks_for_child(&self, child_table: &str) -> crate::Result<Vec<ResolvedFkRef>> {
let fk_mismatch_err = |child: &str, parent: &str| -> crate::LimboError {
crate::LimboError::Constraint(format!(
"foreign key mismatch - \"{child}\" referencing \"{parent}\""
))
};
let child_name = normalize_ident(child_table);
let child = self
.get_btree_table(&child_name)
.ok_or_else(|| fk_mismatch_err(&child_name, "<unknown>"))?;
let mut out = Vec::with_capacity(child.foreign_keys.len());
for fk in &child.foreign_keys {
let parent_name = normalize_ident(&fk.parent_table);
let parent_tbl = self
.get_btree_table(&parent_name)
.ok_or_else(|| fk_mismatch_err(&child.name, &parent_name))?;
let child_cols: Vec<String> = fk.child_columns.clone();
if child_cols.is_empty() {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
// Child positions exist
let mut child_pos = Vec::with_capacity(child_cols.len());
for cname in &child_cols {
let (i, _) = child
.get_column(cname)
.ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?;
child_pos.push(i);
}
let parent_cols: Vec<String> = if fk.parent_columns.is_empty() {
if !parent_tbl.primary_key_columns.is_empty() {
parent_tbl
.primary_key_columns
.iter()
.map(|(col, _)| col)
.cloned()
.collect()
} else {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
} else {
fk.parent_columns.clone()
};
if parent_cols.len() != child_cols.len() {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
}
// Parent positions exist, or rowid sentinel
let mut parent_pos = Vec::with_capacity(parent_cols.len());
for pc in &parent_cols {
let pos = parent_tbl.get_column(pc).map(|(i, _)| i).or_else(|| {
ROWID_STRS
.iter()
.any(|&r| r.eq_ignore_ascii_case(pc))
.then_some(0)
});
let Some(p) = pos else {
return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
};
parent_pos.push(p);
}
let parent_uses_rowid = parent_cols.len().eq(&1) && {
let c = parent_cols[0].as_str();
ROWID_STRS.iter().any(|&r| r.eq_ignore_ascii_case(c))
|| parent_tbl.columns.iter().any(|col| {
col.is_rowid_alias
&& col
.name
.as_deref()
.is_some_and(|n| n.eq_ignore_ascii_case(c))
})
};
// Must be PK or a non-partial UNIQUE on exactly those columns.
let parent_unique_index = if parent_uses_rowid {
None
} else {
self.get_indices(&parent_tbl.name)
.find(|idx| {
idx.unique
&& idx.where_clause.is_none()
&& idx.columns.len() == parent_cols.len()
&& idx
.columns
.iter()
.zip(parent_cols.iter())
.all(|(ic, pc)| ic.name.eq_ignore_ascii_case(pc))
})
.cloned()
.ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?
.into()
};
fk.validate()?;
out.push(ResolvedFkRef {
child_table: Arc::clone(&child),
fk: Arc::clone(fk),
parent_cols,
child_cols,
child_pos,
parent_pos,
parent_uses_rowid,
parent_unique_index,
});
}
Ok(out)
}
/// Returns if any table declares a FOREIGN KEY whose parent is `table_name`.
pub fn any_resolved_fks_referencing(&self, table_name: &str) -> bool {
self.tables.values().any(|t| {
let Some(bt) = t.btree() else {
return false;
};
bt.foreign_keys
.iter()
.any(|fk| fk.parent_table == table_name)
})
}
/// Returns true if `table_name` declares any FOREIGN KEYs
pub fn has_child_fks(&self, table_name: &str) -> bool {
self.get_table(table_name)
.and_then(|t| t.btree())
.is_some_and(|t| !t.foreign_keys.is_empty())
}
}
impl Clone for Schema {
@@ -1016,6 +1279,7 @@ pub struct BTreeTable {
pub is_strict: bool,
pub has_autoincrement: bool,
pub unique_sets: Vec<UniqueSet>,
pub foreign_keys: Vec<Arc<ForeignKey>>,
}
impl BTreeTable {
@@ -1146,6 +1410,7 @@ pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> R
let mut has_rowid = true;
let mut has_autoincrement = false;
let mut primary_key_columns = vec![];
let mut foreign_keys = vec![];
let mut cols = vec![];
let is_strict: bool;
let mut unique_sets: Vec<UniqueSet> = vec![];
@@ -1219,6 +1484,73 @@ pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> R
is_primary_key: false,
};
unique_sets.push(unique_set);
} else if let ast::TableConstraint::ForeignKey {
columns,
clause,
defer_clause,
} = &c.constraint
{
let child_columns: Vec<String> = columns
.iter()
.map(|ic| normalize_ident(ic.col_name.as_str()))
.collect();
// derive parent columns: explicit or default to parent PK
let parent_table = normalize_ident(clause.tbl_name.as_str());
let parent_columns: Vec<String> = clause
.columns
.iter()
.map(|ic| normalize_ident(ic.col_name.as_str()))
.collect();
// Only check arity if parent columns were explicitly listed
if !parent_columns.is_empty() && child_columns.len() != parent_columns.len() {
crate::bail_parse_error!(
"foreign key on \"{}\" has {} child column(s) but {} parent column(s)",
tbl_name,
child_columns.len(),
parent_columns.len()
);
}
// deferrable semantics
let deferred = match defer_clause {
Some(d) => {
d.deferrable
&& matches!(
d.init_deferred,
Some(InitDeferredPred::InitiallyDeferred)
)
}
None => false, // NOT DEFERRABLE INITIALLY IMMEDIATE by default
};
let fk = ForeignKey {
parent_table,
parent_columns,
child_columns,
on_delete: clause
.args
.iter()
.find_map(|a| {
if let ast::RefArg::OnDelete(x) = a {
Some(*x)
} else {
None
}
})
.unwrap_or(RefAct::NoAction),
on_update: clause
.args
.iter()
.find_map(|a| {
if let ast::RefArg::OnUpdate(x) = a {
Some(*x)
} else {
None
}
})
.unwrap_or(RefAct::NoAction),
deferred,
};
foreign_keys.push(Arc::new(fk));
}
}
for ast::ColumnDefinition {
@@ -1259,7 +1591,7 @@ pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> R
let mut unique = false;
let mut collation = None;
for c_def in constraints {
match c_def.constraint {
match &c_def.constraint {
ast::ColumnConstraint::PrimaryKey {
order: o,
auto_increment,
@@ -1272,11 +1604,11 @@ pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> R
);
}
primary_key = true;
if auto_increment {
if *auto_increment {
has_autoincrement = true;
}
if let Some(o) = o {
order = o;
order = *o;
}
unique_sets.push(UniqueSet {
columns: vec![(name.clone(), order)],
@@ -1305,6 +1637,53 @@ pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> R
ast::ColumnConstraint::Collate { ref collation_name } => {
collation = Some(CollationSeq::new(collation_name.as_str())?);
}
ast::ColumnConstraint::ForeignKey {
clause,
defer_clause,
} => {
let fk = ForeignKey {
parent_table: normalize_ident(clause.tbl_name.as_str()),
parent_columns: clause
.columns
.iter()
.map(|c| normalize_ident(c.col_name.as_str()))
.collect(),
on_delete: clause
.args
.iter()
.find_map(|arg| {
if let ast::RefArg::OnDelete(act) = arg {
Some(*act)
} else {
None
}
})
.unwrap_or(RefAct::NoAction),
on_update: clause
.args
.iter()
.find_map(|arg| {
if let ast::RefArg::OnUpdate(act) = arg {
Some(*act)
} else {
None
}
})
.unwrap_or(RefAct::NoAction),
child_columns: vec![name.clone()],
deferred: match defer_clause {
Some(d) => {
d.deferrable
&& matches!(
d.init_deferred,
Some(InitDeferredPred::InitiallyDeferred)
)
}
None => false,
},
};
foreign_keys.push(Arc::new(fk));
}
_ => {}
}
}
@@ -1384,6 +1763,7 @@ pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> R
has_autoincrement,
columns: cols,
is_strict,
foreign_keys,
unique_sets: {
// If there are any unique sets that have identical column names in the same order (even if they are PRIMARY KEY and UNIQUE and have different sort orders), remove the duplicates.
// Examples:
@@ -1441,6 +1821,115 @@ pub fn _build_pseudo_table(columns: &[ResultColumn]) -> PseudoCursorType {
table
}
#[derive(Debug, Clone)]
pub struct ForeignKey {
/// Columns in this table (child side)
pub child_columns: Vec<String>,
/// Referenced (parent) table
pub parent_table: String,
/// Parent-side referenced columns
pub parent_columns: Vec<String>,
pub on_delete: RefAct,
pub on_update: RefAct,
/// DEFERRABLE INITIALLY DEFERRED
pub deferred: bool,
}
impl ForeignKey {
fn validate(&self) -> Result<()> {
// TODO: remove this when actions are implemented
if !(matches!(self.on_update, RefAct::NoAction)
&& matches!(self.on_delete, RefAct::NoAction))
{
crate::bail_parse_error!(
"foreign key actions other than NO ACTION are not implemented"
);
}
if self
.parent_columns
.iter()
.any(|c| ROWID_STRS.iter().any(|&r| r.eq_ignore_ascii_case(c)))
{
return Err(crate::LimboError::Constraint(format!(
"foreign key mismatch referencing \"{}\"",
self.parent_table
)));
}
Ok(())
}
}
/// A single resolved foreign key where `parent_table == target`.
#[derive(Clone, Debug)]
pub struct ResolvedFkRef {
/// Child table that owns the FK.
pub child_table: Arc<BTreeTable>,
/// The FK as declared on the child table.
pub fk: Arc<ForeignKey>,
/// Resolved, normalized column names.
pub parent_cols: Vec<String>,
pub child_cols: Vec<String>,
/// Column positions in the child/parent tables (pos_in_table)
pub child_pos: Vec<usize>,
pub parent_pos: Vec<usize>,
/// If the parent key is rowid or a rowid-alias (single-column only)
pub parent_uses_rowid: bool,
/// For non-rowid parents: the UNIQUE index that enforces the parent key.
/// (None when `parent_uses_rowid == true`.)
pub parent_unique_index: Option<Arc<Index>>,
}
impl ResolvedFkRef {
/// Returns if any referenced parent column can change when these column positions are updated.
pub fn parent_key_may_change(
&self,
updated_parent_positions: &HashSet<usize>,
parent_tbl: &BTreeTable,
) -> bool {
if self.parent_uses_rowid {
// parent rowid changes if the parent's rowid or alias is updated
if let Some((idx, _)) = parent_tbl
.columns
.iter()
.enumerate()
.find(|(_, c)| c.is_rowid_alias)
{
return updated_parent_positions.contains(&idx);
}
// Without a rowid alias, a direct rowid update is represented separately with ROWID_SENTINEL
return true;
}
self.parent_pos
.iter()
.any(|p| updated_parent_positions.contains(p))
}
/// Returns if any child column of this FK is in `updated_child_positions`
pub fn child_key_changed(
&self,
updated_child_positions: &HashSet<usize>,
child_tbl: &BTreeTable,
) -> bool {
if self
.child_pos
.iter()
.any(|p| updated_child_positions.contains(p))
{
return true;
}
// special case: if FK uses a rowid alias on child, and rowid changed
if self.child_cols.len() == 1 {
let (i, col) = child_tbl.get_column(&self.child_cols[0]).unwrap();
if col.is_rowid_alias && updated_child_positions.contains(&i) {
return true;
}
}
false
}
}
#[derive(Debug, Clone)]
pub struct Column {
pub name: Option<String>,
@@ -1782,6 +2271,7 @@ pub fn sqlite_schema_table() -> BTreeTable {
hidden: false,
},
],
foreign_keys: vec![],
unique_sets: vec![],
}
}
@@ -2392,6 +2882,7 @@ mod tests {
hidden: false,
}],
unique_sets: vec![],
foreign_keys: vec![],
};
let result =

View File

@@ -371,6 +371,7 @@ mod tests {
hidden: false,
}],
unique_sets: vec![],
foreign_keys: vec![],
})),
});
@@ -413,6 +414,7 @@ mod tests {
hidden: false,
}],
unique_sets: vec![],
foreign_keys: vec![],
})),
});
// Right table t2(id=2)
@@ -446,6 +448,7 @@ mod tests {
hidden: false,
}],
unique_sets: vec![],
foreign_keys: vec![],
})),
});
table_references
@@ -486,6 +489,7 @@ mod tests {
hidden: false,
}],
unique_sets: vec![],
foreign_keys: vec![],
})),
});
table_references

View File

@@ -1,6 +1,7 @@
// This module contains code for emitting bytecode instructions for SQL query execution.
// It handles translating high-level SQL operations into low-level bytecode that can be executed by the virtual machine.
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
@@ -29,6 +30,11 @@ use crate::translate::expr::{
emit_returning_results, translate_expr_no_constant_opt, walk_expr_mut, NoConstantOptReason,
ReturningValueRegisters, WalkControl,
};
use crate::translate::fkeys::{
build_index_affinity_string, emit_fk_child_update_counters,
emit_fk_delete_parent_existence_checks, emit_guarded_fk_decrement,
emit_parent_pk_change_checks, open_read_index, open_read_table, stabilize_new_row_for_fk,
};
use crate::translate::plan::{DeletePlan, JoinedTable, Plan, QueryDestination, Search};
use crate::translate::planner::ROWID_STRS;
use crate::translate::result_row::try_fold_expr_to_i64;
@@ -469,13 +475,149 @@ fn emit_program_for_delete(
None,
)?;
program.preassign_label_to_next_insn(after_main_loop_label);
// Finalize program
program.result_columns = plan.result_columns;
program.table_references.extend(plan.table_references);
Ok(())
}
pub fn emit_fk_child_decrement_on_delete(
program: &mut ProgramBuilder,
resolver: &Resolver,
child_tbl: &BTreeTable,
child_table_name: &str,
child_cursor_id: usize,
child_rowid_reg: usize,
) -> crate::Result<()> {
for fk_ref in resolver.schema.resolved_fks_for_child(child_table_name)? {
if !fk_ref.fk.deferred {
continue;
}
// Fast path: if any FK column is NULL can't be a violation
let null_skip = program.allocate_label();
for cname in &fk_ref.child_cols {
let (pos, col) = child_tbl.get_column(cname).unwrap();
let src = if col.is_rowid_alias {
child_rowid_reg
} else {
let tmp = program.alloc_register();
program.emit_insn(Insn::Column {
cursor_id: child_cursor_id,
column: pos,
dest: tmp,
default: None,
});
tmp
};
program.emit_insn(Insn::IsNull {
reg: src,
target_pc: null_skip,
});
}
if fk_ref.parent_uses_rowid {
// Probe parent table by rowid
let parent_tbl = resolver
.schema
.get_btree_table(&fk_ref.fk.parent_table)
.expect("parent btree");
let pcur = open_read_table(program, &parent_tbl);
let (pos, col) = child_tbl.get_column(&fk_ref.child_cols[0]).unwrap();
let val = if col.is_rowid_alias {
child_rowid_reg
} else {
let tmp = program.alloc_register();
program.emit_insn(Insn::Column {
cursor_id: child_cursor_id,
column: pos,
dest: tmp,
default: None,
});
tmp
};
let tmpi = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: val,
dst_reg: tmpi,
extra_amount: 0,
});
program.emit_insn(Insn::MustBeInt { reg: tmpi });
// NotExists jumps when the parent key is missing, so we decrement there
let missing = program.allocate_label();
let done = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: pcur,
rowid_reg: tmpi,
target_pc: missing,
});
// Parent FOUND, no decrement
program.emit_insn(Insn::Close { cursor_id: pcur });
program.emit_insn(Insn::Goto { target_pc: done });
// Parent MISSING, decrement is guarded by FkIfZero to avoid underflow
program.preassign_label_to_next_insn(missing);
program.emit_insn(Insn::Close { cursor_id: pcur });
emit_guarded_fk_decrement(program, done);
program.preassign_label_to_next_insn(done);
} else {
// Probe parent unique index
let parent_tbl = resolver
.schema
.get_btree_table(&fk_ref.fk.parent_table)
.expect("parent btree");
let idx = fk_ref.parent_unique_index.as_ref().expect("unique index");
let icur = open_read_index(program, idx);
// Build probe from current child row
let n = fk_ref.child_cols.len();
let probe = program.alloc_registers(n);
for (i, cname) in fk_ref.child_cols.iter().enumerate() {
let (pos, col) = child_tbl.get_column(cname).unwrap();
let src = if col.is_rowid_alias {
child_rowid_reg
} else {
let r = program.alloc_register();
program.emit_insn(Insn::Column {
cursor_id: child_cursor_id,
column: pos,
dest: r,
default: None,
});
r
};
program.emit_insn(Insn::Copy {
src_reg: src,
dst_reg: probe + i,
extra_amount: 0,
});
}
program.emit_insn(Insn::Affinity {
start_reg: probe,
count: std::num::NonZeroUsize::new(n).unwrap(),
affinities: build_index_affinity_string(idx, &parent_tbl),
});
let ok = program.allocate_label();
program.emit_insn(Insn::Found {
cursor_id: icur,
target_pc: ok,
record_reg: probe,
num_regs: n,
});
program.emit_insn(Insn::Close { cursor_id: icur });
emit_guarded_fk_decrement(program, ok);
program.preassign_label_to_next_insn(ok);
program.emit_insn(Insn::Close { cursor_id: icur });
}
program.preassign_label_to_next_insn(null_skip);
}
Ok(())
}
fn emit_delete_insns(
connection: &Arc<Connection>,
program: &mut ProgramBuilder,
@@ -514,6 +656,34 @@ fn emit_delete_insns(
dest: key_reg,
});
if connection.foreign_keys_enabled() {
if let Some(table) = unsafe { &*table_reference }.btree() {
if t_ctx
.resolver
.schema
.any_resolved_fks_referencing(table_name)
{
emit_fk_delete_parent_existence_checks(
program,
&t_ctx.resolver,
table_name,
main_table_cursor_id,
key_reg,
)?;
}
if t_ctx.resolver.schema.has_child_fks(table_name) {
emit_fk_child_decrement_on_delete(
program,
&t_ctx.resolver,
&table,
table_name,
main_table_cursor_id,
key_reg,
)?;
}
}
}
if unsafe { &*table_reference }.virtual_table().is_some() {
let conflict_action = 0u16;
let start_reg = key_reg;
@@ -802,7 +972,6 @@ fn emit_program_for_update(
)?;
program.preassign_label_to_next_insn(after_main_loop_label);
after(program);
program.result_columns = plan.returning.unwrap_or_default();
@@ -1067,6 +1236,59 @@ fn emit_update_insns(
}
}
if connection.foreign_keys_enabled() {
let rowid_new_reg = rowid_set_clause_reg.unwrap_or(beg);
if let Some(table_btree) = unsafe { &*table_ref }.btree() {
stabilize_new_row_for_fk(
program,
&table_btree,
&plan.set_clauses,
cursor_id,
start,
rowid_new_reg,
)?;
if t_ctx.resolver.schema.has_child_fks(table_name) {
// Child-side checks:
// this ensures updated row still satisfies child FKs that point OUT from this table
emit_fk_child_update_counters(
program,
&t_ctx.resolver,
&table_btree,
table_name,
cursor_id,
start,
rowid_new_reg,
&plan
.set_clauses
.iter()
.map(|(i, _)| *i)
.collect::<HashSet<_>>(),
)?;
}
// Parent-side checks:
// We only need to do work if the referenced key (the parent key) might change.
// we detect that by comparing OLD vs NEW primary key representation
// then run parent FK checks only when it actually changes.
if t_ctx
.resolver
.schema
.any_resolved_fks_referencing(table_name)
{
emit_parent_pk_change_checks(
program,
&t_ctx.resolver,
&table_btree,
cursor_id,
beg,
start,
rowid_new_reg,
rowid_set_clause_reg,
&plan.set_clauses,
)?;
}
}
}
for (index, (idx_cursor_id, record_reg)) in plan.indexes_to_update.iter().zip(&index_cursors) {
// We need to know whether or not the OLD values satisfied the predicate on the
// partial index, so we can know whether or not to delete the old index entry,

991
core/translate/fkeys.rs Normal file
View File

@@ -0,0 +1,991 @@
use turso_parser::ast::Expr;
use super::ProgramBuilder;
use crate::{
schema::{BTreeTable, ForeignKey, Index, ResolvedFkRef, ROWID_SENTINEL},
translate::{emitter::Resolver, planner::ROWID_STRS},
vdbe::{
builder::CursorType,
insn::{CmpInsFlags, Insn},
BranchOffset,
},
Result,
};
use std::{collections::HashSet, num::NonZeroUsize, sync::Arc};
#[inline]
pub fn emit_guarded_fk_decrement(program: &mut ProgramBuilder, label: BranchOffset) {
program.emit_insn(Insn::FkIfZero {
is_scope: false,
target_pc: label,
});
program.emit_insn(Insn::FkCounter {
increment_value: -1,
is_scope: false,
});
}
/// Open a read cursor on an index and return its cursor id.
#[inline]
pub fn open_read_index(program: &mut ProgramBuilder, idx: &Arc<Index>) -> usize {
let icur = program.alloc_cursor_id(CursorType::BTreeIndex(idx.clone()));
program.emit_insn(Insn::OpenRead {
cursor_id: icur,
root_page: idx.root_page,
db: 0,
});
icur
}
/// Open a read cursor on a table and return its cursor id.
#[inline]
pub fn open_read_table(program: &mut ProgramBuilder, tbl: &Arc<BTreeTable>) -> usize {
let tcur = program.alloc_cursor_id(CursorType::BTreeTable(tbl.clone()));
program.emit_insn(Insn::OpenRead {
cursor_id: tcur,
root_page: tbl.root_page,
db: 0,
});
tcur
}
/// Copy `len` registers starting at `src_start` to a fresh block and apply index affinities.
/// Returns the destination start register.
#[inline]
fn copy_with_affinity(
program: &mut ProgramBuilder,
src_start: usize,
len: usize,
idx: &Index,
aff_from_tbl: &BTreeTable,
) -> usize {
let dst = program.alloc_registers(len);
for i in 0..len {
program.emit_insn(Insn::Copy {
src_reg: src_start + i,
dst_reg: dst + i,
extra_amount: 0,
});
}
if let Some(count) = NonZeroUsize::new(len) {
program.emit_insn(Insn::Affinity {
start_reg: dst,
count,
affinities: build_index_affinity_string(idx, aff_from_tbl),
});
}
dst
}
/// Issue an index probe using `Found`/`NotFound` and route to `on_found`/`on_not_found`.
pub fn index_probe<F, G>(
program: &mut ProgramBuilder,
icur: usize,
record_reg: usize,
num_regs: usize,
mut on_found: F,
mut on_not_found: G,
) -> Result<()>
where
F: FnMut(&mut ProgramBuilder) -> Result<()>,
G: FnMut(&mut ProgramBuilder) -> Result<()>,
{
let lbl_found = program.allocate_label();
let lbl_join = program.allocate_label();
program.emit_insn(Insn::Found {
cursor_id: icur,
target_pc: lbl_found,
record_reg,
num_regs,
});
// NOT FOUND path
on_not_found(program)?;
program.emit_insn(Insn::Goto {
target_pc: lbl_join,
});
// FOUND path
program.preassign_label_to_next_insn(lbl_found);
on_found(program)?;
// Join & close once
program.preassign_label_to_next_insn(lbl_join);
program.emit_insn(Insn::Close { cursor_id: icur });
Ok(())
}
/// Iterate a table and call `on_match` when all child columns equal the key at `parent_key_start`.
/// Skips rows where any FK column is NULL. If `self_exclude_rowid` is Some, the row with that rowid is skipped.
fn table_scan_match_any<F>(
program: &mut ProgramBuilder,
child_tbl: &Arc<BTreeTable>,
child_cols: &[String],
parent_key_start: usize,
self_exclude_rowid: Option<usize>,
mut on_match: F,
) -> Result<()>
where
F: FnMut(&mut ProgramBuilder) -> Result<()>,
{
let ccur = open_read_table(program, child_tbl);
let done = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: ccur,
pc_if_empty: done,
});
let loop_top = program.allocate_label();
program.preassign_label_to_next_insn(loop_top);
let next_row = program.allocate_label();
// Compare each FK column to parent key component.
for (i, cname) in child_cols.iter().enumerate() {
let (pos, _) = child_tbl.get_column(cname).ok_or_else(|| {
crate::LimboError::InternalError(format!("child col {cname} missing"))
})?;
let tmp = program.alloc_register();
program.emit_insn(Insn::Column {
cursor_id: ccur,
column: pos,
dest: tmp,
default: None,
});
program.emit_insn(Insn::IsNull {
reg: tmp,
target_pc: next_row,
});
let cont = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: tmp,
rhs: parent_key_start + i,
target_pc: cont,
flags: CmpInsFlags::default().jump_if_null(),
collation: Some(super::collate::CollationSeq::Binary),
});
program.emit_insn(Insn::Goto {
target_pc: next_row,
});
program.preassign_label_to_next_insn(cont);
}
//self-reference exclusion on rowid
if let Some(parent_rowid) = self_exclude_rowid {
let child_rowid = program.alloc_register();
let skip = program.allocate_label();
program.emit_insn(Insn::RowId {
cursor_id: ccur,
dest: child_rowid,
});
program.emit_insn(Insn::Eq {
lhs: child_rowid,
rhs: parent_rowid,
target_pc: skip,
flags: CmpInsFlags::default(),
collation: None,
});
on_match(program)?;
program.preassign_label_to_next_insn(skip);
} else {
on_match(program)?;
}
program.preassign_label_to_next_insn(next_row);
program.emit_insn(Insn::Next {
cursor_id: ccur,
pc_if_next: loop_top,
});
program.preassign_label_to_next_insn(done);
program.emit_insn(Insn::Close { cursor_id: ccur });
Ok(())
}
/// Build the index affinity mask string (one char per indexed column).
#[inline]
pub fn build_index_affinity_string(idx: &Index, table: &BTreeTable) -> String {
idx.columns
.iter()
.map(|ic| table.columns[ic.pos_in_table].affinity().aff_mask())
.collect()
}
/// For deferred FKs: increment the global counter; for immediate FKs: halt with FK error.
pub fn emit_fk_violation(program: &mut ProgramBuilder, fk: &ForeignKey) -> Result<()> {
if fk.deferred {
program.emit_insn(Insn::FkCounter {
increment_value: 1,
is_scope: false,
});
} else {
program.emit_insn(Insn::Halt {
err_code: crate::error::SQLITE_CONSTRAINT_FOREIGNKEY,
description: "FOREIGN KEY constraint failed".to_string(),
});
}
Ok(())
}
/// Stabilize the NEW row image for FK checks (UPDATE):
/// fill in unmodified PK columns from the current row so the NEW PK vector is complete.
pub fn stabilize_new_row_for_fk(
program: &mut ProgramBuilder,
table_btree: &BTreeTable,
set_clauses: &[(usize, Box<Expr>)],
cursor_id: usize,
start: usize,
rowid_new_reg: usize,
) -> Result<()> {
if table_btree.primary_key_columns.is_empty() {
return Ok(());
}
let set_cols: HashSet<usize> = set_clauses
.iter()
.filter_map(|(i, _)| if *i == ROWID_SENTINEL { None } else { Some(*i) })
.collect();
for (pk_name, _) in &table_btree.primary_key_columns {
let (pos, col) = table_btree
.get_column(pk_name)
.ok_or_else(|| crate::LimboError::InternalError(format!("pk col {pk_name} missing")))?;
if !set_cols.contains(&pos) {
if col.is_rowid_alias {
program.emit_insn(Insn::Copy {
src_reg: rowid_new_reg,
dst_reg: start + pos,
extra_amount: 0,
});
} else {
program.emit_insn(Insn::Column {
cursor_id,
column: pos,
dest: start + pos,
default: None,
});
}
}
}
Ok(())
}
/// Parent-side checks when the parent PK might change (UPDATE on parent):
/// Detect if any child references the OLD key (potential violation), and if any references the NEW key
/// (which cancels one potential violation). For composite PKs this builds OLD/NEW vectors first.
#[allow(clippy::too_many_arguments)]
pub fn emit_parent_pk_change_checks(
program: &mut ProgramBuilder,
resolver: &Resolver,
table_btree: &BTreeTable,
cursor_id: usize,
old_rowid_reg: usize,
start: usize,
rowid_new_reg: usize,
rowid_set_clause_reg: Option<usize>,
set_clauses: &[(usize, Box<Expr>)],
) -> Result<()> {
let updated_positions: HashSet<usize> = set_clauses.iter().map(|(i, _)| *i).collect();
let incoming = resolver
.schema
.resolved_fks_referencing(&table_btree.name)?;
let affects_pk = incoming
.iter()
.any(|r| r.parent_key_may_change(&updated_positions, table_btree));
if !affects_pk {
return Ok(());
}
match table_btree.primary_key_columns.len() {
0 => emit_rowid_pk_change_check(
program,
&incoming,
resolver,
old_rowid_reg,
rowid_set_clause_reg.unwrap_or(old_rowid_reg),
),
1 => emit_single_pk_change_check(
program,
&incoming,
resolver,
table_btree,
cursor_id,
start,
rowid_new_reg,
),
_ => emit_composite_pk_change_check(
program,
&incoming,
resolver,
table_btree,
cursor_id,
old_rowid_reg,
start,
rowid_new_reg,
),
}
}
/// Rowid-table parent PK change: compare rowid OLD vs NEW; if changed, run two-pass counters.
pub fn emit_rowid_pk_change_check(
program: &mut ProgramBuilder,
incoming: &[ResolvedFkRef],
resolver: &Resolver,
old_rowid_reg: usize,
new_rowid_reg: usize,
) -> Result<()> {
let skip = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: new_rowid_reg,
rhs: old_rowid_reg,
target_pc: skip,
flags: CmpInsFlags::default(),
collation: None,
});
let old_pk = program.alloc_register();
let new_pk = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: old_rowid_reg,
dst_reg: old_pk,
extra_amount: 0,
});
program.emit_insn(Insn::Copy {
src_reg: new_rowid_reg,
dst_reg: new_pk,
extra_amount: 0,
});
emit_fk_parent_pk_change_counters(program, incoming, resolver, old_pk, new_pk, 1)?;
program.preassign_label_to_next_insn(skip);
Ok(())
}
/// Single-column PK parent change: load OLD and NEW; if changed, run two-pass counters.
pub fn emit_single_pk_change_check(
program: &mut ProgramBuilder,
incoming: &[ResolvedFkRef],
resolver: &Resolver,
table_btree: &BTreeTable,
cursor_id: usize,
start: usize,
rowid_new_reg: usize,
) -> Result<()> {
let (pk_name, _) = &table_btree.primary_key_columns[0];
let (pos, col) = table_btree.get_column(pk_name).unwrap();
let old_reg = program.alloc_register();
if col.is_rowid_alias {
program.emit_insn(Insn::RowId {
cursor_id,
dest: old_reg,
});
} else {
program.emit_insn(Insn::Column {
cursor_id,
column: pos,
dest: old_reg,
default: None,
});
}
let new_reg = if col.is_rowid_alias {
rowid_new_reg
} else {
start + pos
};
let skip = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: old_reg,
rhs: new_reg,
target_pc: skip,
flags: CmpInsFlags::default(),
collation: None,
});
let old_pk = program.alloc_register();
let new_pk = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: old_reg,
dst_reg: old_pk,
extra_amount: 0,
});
program.emit_insn(Insn::Copy {
src_reg: new_reg,
dst_reg: new_pk,
extra_amount: 0,
});
emit_fk_parent_pk_change_counters(program, incoming, resolver, old_pk, new_pk, 1)?;
program.preassign_label_to_next_insn(skip);
Ok(())
}
/// Composite-PK parent change: build OLD/NEW vectors; if any component differs, run two-pass counters.
#[allow(clippy::too_many_arguments)]
pub fn emit_composite_pk_change_check(
program: &mut ProgramBuilder,
incoming: &[ResolvedFkRef],
resolver: &Resolver,
table_btree: &BTreeTable,
cursor_id: usize,
old_rowid_reg: usize,
start: usize,
rowid_new_reg: usize,
) -> Result<()> {
let pk_len = table_btree.primary_key_columns.len();
let old_pk = program.alloc_registers(pk_len);
for (i, (pk_name, _)) in table_btree.primary_key_columns.iter().enumerate() {
let (pos, col) = table_btree.get_column(pk_name).unwrap();
if col.is_rowid_alias {
program.emit_insn(Insn::Copy {
src_reg: old_rowid_reg,
dst_reg: old_pk + i,
extra_amount: 0,
});
} else {
program.emit_insn(Insn::Column {
cursor_id,
column: pos,
dest: old_pk + i,
default: None,
});
}
}
let new_pk = program.alloc_registers(pk_len);
for (i, (pk_name, _)) in table_btree.primary_key_columns.iter().enumerate() {
let (pos, col) = table_btree.get_column(pk_name).unwrap();
let src = if col.is_rowid_alias {
rowid_new_reg
} else {
start + pos
};
program.emit_insn(Insn::Copy {
src_reg: src,
dst_reg: new_pk + i,
extra_amount: 0,
});
}
let skip = program.allocate_label();
let changed = program.allocate_label();
for i in 0..pk_len {
let next = if i + 1 == pk_len {
None
} else {
Some(program.allocate_label())
};
program.emit_insn(Insn::Eq {
lhs: old_pk + i,
rhs: new_pk + i,
target_pc: next.unwrap_or(skip),
flags: CmpInsFlags::default(),
collation: None,
});
program.emit_insn(Insn::Goto { target_pc: changed });
if let Some(n) = next {
program.preassign_label_to_next_insn(n);
}
}
program.preassign_label_to_next_insn(changed);
emit_fk_parent_pk_change_counters(program, incoming, resolver, old_pk, new_pk, pk_len)?;
program.preassign_label_to_next_insn(skip);
Ok(())
}
/// Two-pass parent-side maintenance for UPDATE of a parent key:
/// 1. Probe child for OLD key, increment deferred counter if any references exist.
/// 2. Probe child for NEW key, guarded decrement cancels exactly one increment if present
pub fn emit_fk_parent_pk_change_counters(
program: &mut ProgramBuilder,
incoming: &[ResolvedFkRef],
resolver: &Resolver,
old_pk_start: usize,
new_pk_start: usize,
n_cols: usize,
) -> Result<()> {
for fk_ref in incoming {
emit_fk_parent_key_probe(
program,
resolver,
fk_ref,
old_pk_start,
n_cols,
ParentProbePass::Old,
)?;
emit_fk_parent_key_probe(
program,
resolver,
fk_ref,
new_pk_start,
n_cols,
ParentProbePass::New,
)?;
}
Ok(())
}
#[derive(Clone, Copy)]
enum ParentProbePass {
Old,
New,
}
/// Probe the child side for a given parent key
fn emit_fk_parent_key_probe(
program: &mut ProgramBuilder,
resolver: &Resolver,
fk_ref: &ResolvedFkRef,
parent_key_start: usize,
n_cols: usize,
pass: ParentProbePass,
) -> Result<()> {
let child_tbl = &fk_ref.child_table;
let child_cols = &fk_ref.fk.child_columns;
let is_deferred = fk_ref.fk.deferred;
let on_match = |p: &mut ProgramBuilder| -> Result<()> {
match (is_deferred, pass) {
// OLD key referenced by a child
(false, ParentProbePass::Old) => {
// Immediate FK: fail now.
emit_fk_violation(p, &fk_ref.fk)?; // HALT for immediate
}
(true, ParentProbePass::Old) => {
// Deferred FK: increment counter.
p.emit_insn(Insn::FkCounter {
increment_value: 1,
is_scope: false,
});
}
// NEW key referenced by a child (cancel one deferred violation)
(true, ParentProbePass::New) => {
// Guard to avoid underflow if OLD pass didn't increment.
let skip = p.allocate_label();
emit_guarded_fk_decrement(p, skip);
p.preassign_label_to_next_insn(skip);
}
// Immediate FK on NEW pass: nothing to cancel; do nothing.
(false, ParentProbePass::New) => {}
}
Ok(())
};
// Prefer exact child index on (child_cols...)
let idx = resolver.schema.get_indices(&child_tbl.name).find(|ix| {
ix.columns.len() == child_cols.len()
&& ix
.columns
.iter()
.zip(child_cols.iter())
.all(|(ic, cc)| ic.name.eq_ignore_ascii_case(cc))
});
if let Some(ix) = idx {
let icur = open_read_index(program, ix);
let probe = copy_with_affinity(program, parent_key_start, n_cols, ix, child_tbl);
// FOUND => on_match; NOT FOUND => no-op
index_probe(program, icur, probe, n_cols, on_match, |_p| Ok(()))?;
} else {
// Table scan fallback
table_scan_match_any(
program,
child_tbl,
child_cols,
parent_key_start,
None,
on_match,
)?;
}
Ok(())
}
/// Build a parent key vector (in FK parent-column order) into `dest_start`.
/// Handles rowid aliasing and explicit ROWID names; uses current row for non-rowid columns.
fn build_parent_key(
program: &mut ProgramBuilder,
parent_bt: &BTreeTable,
parent_cols: &[String],
parent_cursor_id: usize,
parent_rowid_reg: usize,
dest_start: usize,
) -> Result<()> {
for (i, pcol) in parent_cols.iter().enumerate() {
let src = if ROWID_STRS.iter().any(|s| pcol.eq_ignore_ascii_case(s)) {
parent_rowid_reg
} else {
let (pos, col) = parent_bt
.get_column(pcol)
.ok_or_else(|| crate::LimboError::InternalError(format!("col {pcol} missing")))?;
if col.is_rowid_alias {
parent_rowid_reg
} else {
program.emit_insn(Insn::Column {
cursor_id: parent_cursor_id,
column: pos,
dest: dest_start + i,
default: None,
});
continue;
}
};
program.emit_insn(Insn::Copy {
src_reg: src,
dst_reg: dest_start + i,
extra_amount: 0,
});
}
Ok(())
}
/// Child-side FK maintenance for UPDATE/UPSERT:
/// If any FK columns of this child row changed:
/// Pass 1 (OLD tuple): if OLD is non-NULL and parent is missing: decrement deferred counter (guarded).
/// Pass 2 (NEW tuple): if NEW is non-NULL and parent is missing: immediate error or deferred(+1).
#[allow(clippy::too_many_arguments)]
pub fn emit_fk_child_update_counters(
program: &mut ProgramBuilder,
resolver: &Resolver,
child_tbl: &BTreeTable,
child_table_name: &str,
child_cursor_id: usize,
new_start_reg: usize,
new_rowid_reg: usize,
updated_cols: &HashSet<usize>,
) -> crate::Result<()> {
// Helper: materialize OLD tuple for this FK; returns (start_reg, ncols) or None if any component is NULL.
let load_old_tuple =
|program: &mut ProgramBuilder, fk_cols: &[String]| -> Option<(usize, usize)> {
let n = fk_cols.len();
let start = program.alloc_registers(n);
let null_jmp = program.allocate_label();
for (k, cname) in fk_cols.iter().enumerate() {
let (pos, _col) = match child_tbl.get_column(cname) {
Some(v) => v,
None => {
return None;
}
};
program.emit_column_or_rowid(child_cursor_id, pos, start + k);
program.emit_insn(Insn::IsNull {
reg: start + k,
target_pc: null_jmp,
});
}
// No NULLs, proceed
let cont = program.allocate_label();
program.emit_insn(Insn::Goto { target_pc: cont });
// NULL encountered: invalidate tuple by jumping here
program.preassign_label_to_next_insn(null_jmp);
program.preassign_label_to_next_insn(cont);
Some((start, n))
};
for fk_ref in resolver.schema.resolved_fks_for_child(child_table_name)? {
// If the child-side FK columns did not change, there is nothing to do.
if !fk_ref.child_key_changed(updated_cols, child_tbl) {
continue;
}
let ncols = fk_ref.child_cols.len();
// Pass 1: OLD tuple handling only for deferred FKs
if fk_ref.fk.deferred {
if let Some((old_start, _)) = load_old_tuple(program, &fk_ref.child_cols) {
if fk_ref.parent_uses_rowid {
// Parent key is rowid: probe parent table by rowid
let parent_tbl = resolver
.schema
.get_btree_table(&fk_ref.fk.parent_table)
.expect("parent btree");
let pcur = open_read_table(program, &parent_tbl);
// first FK col is the rowid value
let rid = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: old_start,
dst_reg: rid,
extra_amount: 0,
});
program.emit_insn(Insn::MustBeInt { reg: rid });
// If NOT exists => decrement
let miss = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: pcur,
rowid_reg: rid,
target_pc: miss,
});
// found: close & continue
let join = program.allocate_label();
program.emit_insn(Insn::Close { cursor_id: pcur });
program.emit_insn(Insn::Goto { target_pc: join });
// missing: guarded decrement
program.preassign_label_to_next_insn(miss);
program.emit_insn(Insn::Close { cursor_id: pcur });
let skip = program.allocate_label();
emit_guarded_fk_decrement(program, skip);
program.preassign_label_to_next_insn(skip);
program.preassign_label_to_next_insn(join);
} else {
// Parent key is a unique index: use index probe and guarded decrement on NOT FOUND
let parent_tbl = resolver
.schema
.get_btree_table(&fk_ref.fk.parent_table)
.expect("parent btree");
let idx = fk_ref
.parent_unique_index
.as_ref()
.expect("parent unique index required");
let icur = open_read_index(program, idx);
// Copy OLD tuple and apply parent index affinities
let probe = copy_with_affinity(program, old_start, ncols, idx, &parent_tbl);
// Found: nothing; Not found: guarded decrement
index_probe(
program,
icur,
probe,
ncols,
|_p| Ok(()),
|p| {
let skip = p.allocate_label();
emit_guarded_fk_decrement(p, skip);
p.preassign_label_to_next_insn(skip);
Ok(())
},
)?;
}
}
}
// Pass 2: NEW tuple handling
let fk_ok = program.allocate_label();
for cname in &fk_ref.fk.child_columns {
let (i, col) = child_tbl.get_column(cname).unwrap();
let src = if col.is_rowid_alias {
new_rowid_reg
} else {
new_start_reg + i
};
program.emit_insn(Insn::IsNull {
reg: src,
target_pc: fk_ok,
});
}
if fk_ref.parent_uses_rowid {
let parent_tbl = resolver
.schema
.get_btree_table(&fk_ref.fk.parent_table)
.expect("parent btree");
let pcur = open_read_table(program, &parent_tbl);
// Take the first child column value from NEW image
let (i_child, col_child) = child_tbl.get_column(&fk_ref.child_cols[0]).unwrap();
let val_reg = if col_child.is_rowid_alias {
new_rowid_reg
} else {
new_start_reg + i_child
};
let tmp = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: val_reg,
dst_reg: tmp,
extra_amount: 0,
});
program.emit_insn(Insn::MustBeInt { reg: tmp });
let violation = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: pcur,
rowid_reg: tmp,
target_pc: violation,
});
// found: close and continue
program.emit_insn(Insn::Close { cursor_id: pcur });
program.emit_insn(Insn::Goto { target_pc: fk_ok });
// missing: violation (immediate HALT or deferred +1)
program.preassign_label_to_next_insn(violation);
program.emit_insn(Insn::Close { cursor_id: pcur });
emit_fk_violation(program, &fk_ref.fk)?;
} else {
let parent_tbl = resolver
.schema
.get_btree_table(&fk_ref.fk.parent_table)
.expect("parent btree");
let idx = fk_ref
.parent_unique_index
.as_ref()
.expect("parent unique index required");
let icur = open_read_index(program, idx);
// Build NEW probe (in FK child column order, aligns with parent index columns)
let probe = {
let start = program.alloc_registers(ncols);
for (k, cname) in fk_ref.child_cols.iter().enumerate() {
let (i, col) = child_tbl.get_column(cname).unwrap();
program.emit_insn(Insn::Copy {
src_reg: if col.is_rowid_alias {
new_rowid_reg
} else {
new_start_reg + i
},
dst_reg: start + k,
extra_amount: 0,
});
}
// Apply affinities of the parent index/table
if let Some(cnt) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: start,
count: cnt,
affinities: build_index_affinity_string(idx, &parent_tbl),
});
}
start
};
// FOUND: ok; NOT FOUND: violation path
index_probe(
program,
icur,
probe,
ncols,
|_p| Ok(()),
|p| {
emit_fk_violation(p, &fk_ref.fk)?;
Ok(())
},
)?;
program.emit_insn(Insn::Goto { target_pc: fk_ok });
}
// Skip label for NEW tuple NULL short-circuit
program.preassign_label_to_next_insn(fk_ok);
}
Ok(())
}
/// Prevent deleting a parent row that is still referenced by any child.
/// For each incoming FK referencing `parent_table_name`:
/// 1. Build the parent key vector from the current parent row (FK parent-column order,
/// or the table's PK columns when the FK omits parent columns).
/// 2. Look for referencing child rows:
/// - Prefer an exact child index on (child_columns...). If found, probe the index.
/// - Otherwise scan the child table. For self-referential FKs, exclude the current rowid.
/// 3. If a referencing child exists:
/// - Immediate FK: HALT with SQLITE_CONSTRAINT_FOREIGNKEY
/// - Deferred FK: FkCounter +1
pub fn emit_fk_delete_parent_existence_checks(
program: &mut ProgramBuilder,
resolver: &Resolver,
parent_table_name: &str,
parent_cursor_id: usize,
parent_rowid_reg: usize,
) -> Result<()> {
let parent_bt = resolver
.schema
.get_btree_table(parent_table_name)
.ok_or_else(|| crate::LimboError::InternalError("parent not btree".into()))?;
for fk_ref in resolver
.schema
.resolved_fks_referencing(parent_table_name)?
{
let is_self_ref = fk_ref
.child_table
.name
.eq_ignore_ascii_case(parent_table_name);
// Build parent key in FK's parent-column order (or table PK columns if unspecified).
let parent_cols: Vec<String> = if fk_ref.fk.parent_columns.is_empty() {
parent_bt
.primary_key_columns
.iter()
.map(|(n, _)| n.clone())
.collect()
} else {
fk_ref.fk.parent_columns.clone()
};
let ncols = parent_cols.len();
let parent_key_start = program.alloc_registers(ncols);
build_parent_key(
program,
&parent_bt,
&parent_cols,
parent_cursor_id,
parent_rowid_reg,
parent_key_start,
)?;
// Try an exact child index on (child_columns...) if available and not self-ref
let child_cols = &fk_ref.fk.child_columns;
let child_idx = if !is_self_ref {
resolver
.schema
.get_indices(&fk_ref.child_table.name)
.find(|idx| {
idx.columns.len() == child_cols.len()
&& idx
.columns
.iter()
.zip(child_cols.iter())
.all(|(ic, cc)| ic.name.eq_ignore_ascii_case(cc))
})
} else {
None
};
if let Some(idx) = child_idx {
// Index probe: FOUND => violation; NOT FOUND => ok.
let icur = open_read_index(program, idx);
let probe =
copy_with_affinity(program, parent_key_start, ncols, idx, &fk_ref.child_table);
index_probe(
program,
icur,
probe,
ncols,
|p| {
emit_fk_violation(p, &fk_ref.fk)?;
Ok(())
},
|_p| Ok(()),
)?;
} else {
// Table scan fallback; for self-ref, exclude the same parent row by rowid.
table_scan_match_any(
program,
&fk_ref.child_table,
child_cols,
parent_key_start,
if is_self_ref {
Some(parent_rowid_reg)
} else {
None
},
|p| {
emit_fk_violation(p, &fk_ref.fk)?;
Ok(())
},
)?;
}
}
Ok(())
}

View File

@@ -8,7 +8,7 @@ use turso_parser::ast::{
use crate::error::{
SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY, SQLITE_CONSTRAINT_UNIQUE,
};
use crate::schema::{self, Affinity, Index, Table};
use crate::schema::{self, Affinity, BTreeTable, Index, ResolvedFkRef, Table};
use crate::translate::emitter::{
emit_cdc_insns, emit_cdc_patch_record, prepare_cdc_if_necessary, OperationMode,
};
@@ -16,6 +16,10 @@ use crate::translate::expr::{
bind_and_rewrite_expr, emit_returning_results, process_returning_clause, walk_expr_mut,
BindingBehavior, ReturningValueRegisters, WalkControl,
};
use crate::translate::fkeys::{
build_index_affinity_string, emit_fk_violation, emit_guarded_fk_decrement, index_probe,
open_read_index, open_read_table,
};
use crate::translate::plan::TableReferences;
use crate::translate::planner::ROWID_STRS;
use crate::translate::upsert::{
@@ -23,7 +27,7 @@ use crate::translate::upsert::{
};
use crate::util::normalize_ident;
use crate::vdbe::builder::ProgramBuilderOpts;
use crate::vdbe::insn::{IdxInsertFlags, InsertFlags, RegisterOrLiteral};
use crate::vdbe::insn::{CmpInsFlags, IdxInsertFlags, InsertFlags, RegisterOrLiteral};
use crate::vdbe::BranchOffset;
use crate::{
schema::{Column, Schema},
@@ -93,6 +97,7 @@ pub fn translate_insert(
Some(table) => table,
None => crate::bail_parse_error!("no such table: {}", table_name),
};
let fk_enabled = connection.foreign_keys_enabled();
// Check if this is a materialized view
if resolver.schema.is_materialized_view(table_name.as_str()) {
@@ -222,6 +227,8 @@ pub fn translate_insert(
let halt_label = program.allocate_label();
let loop_start_label = program.allocate_label();
let row_done_label = program.allocate_label();
let stmt_epilogue = program.allocate_label();
let mut select_exhausted_label: Option<BranchOffset> = None;
let cdc_table = prepare_cdc_if_necessary(&mut program, resolver.schema, table.get_name())?;
@@ -234,6 +241,11 @@ pub fn translate_insert(
connection,
)?;
let has_fks = fk_enabled
&& (resolver.schema.has_child_fks(table_name.as_str())
|| resolver
.schema
.any_resolved_fks_referencing(table_name.as_str()));
let mut yield_reg_opt = None;
let mut temp_table_ctx = None;
let (num_values, cursor_id) = match body {
@@ -254,7 +266,6 @@ pub fn translate_insert(
jump_on_definition: jump_on_definition_label,
start_offset: start_offset_label,
});
program.preassign_label_to_next_insn(start_offset_label);
let query_destination = QueryDestination::CoroutineYield {
@@ -298,18 +309,14 @@ pub fn translate_insert(
});
// Main loop
// FIXME: rollback is not implemented. E.g. if you insert 2 rows and one fails to unique constraint violation,
// the other row will still be inserted.
program.preassign_label_to_next_insn(loop_start_label);
let yield_label = program.allocate_label();
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: yield_label,
end_offset: yield_label, // stays local, we'll route at loop end
});
let record_reg = program.alloc_register();
let record_reg = program.alloc_register();
let affinity_str = if columns.is_empty() {
btree_table
.columns
@@ -352,7 +359,6 @@ pub fn translate_insert(
rowid_reg,
prev_largest_reg: 0,
});
program.emit_insn(Insn::Insert {
cursor: temp_cursor_id,
key_reg: rowid_reg,
@@ -361,12 +367,10 @@ pub fn translate_insert(
flag: InsertFlags::new().require_seek(),
table_name: "".to_string(),
});
// loop back
program.emit_insn(Insn::Goto {
target_pc: loop_start_label,
});
program.preassign_label_to_next_insn(yield_label);
program.emit_insn(Insn::OpenWrite {
@@ -381,13 +385,14 @@ pub fn translate_insert(
db: 0,
});
// Main loop
// FIXME: rollback is not implemented. E.g. if you insert 2 rows and one fails due to a unique constraint violation,
// the other row will still be inserted.
program.preassign_label_to_next_insn(loop_start_label);
// on EOF, jump to select_exhausted to check FK constraints
let select_exhausted = program.allocate_label();
select_exhausted_label = Some(select_exhausted);
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: halt_label,
end_offset: select_exhausted,
});
}
@@ -1033,6 +1038,16 @@ pub fn translate_insert(
}
}
}
if has_fks {
// Child-side check must run before Insert (may HALT or increment deferred counter)
emit_fk_child_insert_checks(
&mut program,
resolver,
&btree_table,
insertion.first_col_register(),
insertion.key_register(),
)?;
}
program.emit_insn(Insn::Insert {
cursor: cursor_id,
@@ -1042,6 +1057,11 @@ pub fn translate_insert(
table_name: table_name.to_string(),
});
if has_fks {
// After the row is actually present, repair deferred counters for children referencing this NEW parent key.
emit_parent_side_fk_decrement_on_insert(&mut program, resolver, &btree_table, &insertion)?;
}
if let Some((seq_cursor_id, r_seq, r_seq_rowid, table_name_reg)) = autoincrement_meta {
let no_update_needed_label = program.allocate_label();
program.emit_insn(Insn::Le {
@@ -1132,6 +1152,7 @@ pub fn translate_insert(
&mut result_columns,
cdc_table.as_ref().map(|c| c.0),
row_done_label,
connection,
)?;
} else {
// UpsertDo::Nothing case
@@ -1154,17 +1175,31 @@ pub fn translate_insert(
program.emit_insn(Insn::Close {
cursor_id: temp_table_ctx.cursor_id,
});
program.emit_insn(Insn::Goto {
target_pc: stmt_epilogue,
});
} else {
// For multiple rows that do not require a temp table, loop back
program.resolve_label(row_done_label, program.offset());
program.emit_insn(Insn::Goto {
target_pc: loop_start_label,
});
if let Some(sel_eof) = select_exhausted_label {
program.preassign_label_to_next_insn(sel_eof);
program.emit_insn(Insn::Goto {
target_pc: stmt_epilogue,
});
}
}
} else {
program.resolve_label(row_done_label, program.offset());
// single-row falls through to epilogue
program.emit_insn(Insn::Goto {
target_pc: stmt_epilogue,
});
}
program.preassign_label_to_next_insn(stmt_epilogue);
program.resolve_label(halt_label, program.offset());
Ok(program)
@@ -1857,3 +1892,385 @@ fn emit_update_sqlite_sequence(
Ok(())
}
/// Child-side FK checks for INSERT of a single row:
/// For each outgoing FK on `child_tbl`, if the NEW tuple's FK columns are all non-NULL,
/// verify that the referenced parent key exists.
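/// As a hedged illustration only (hypothetical schema, not part of this change), assuming
/// CREATE TABLE parent(id INTEGER PRIMARY KEY); CREATE TABLE child(pid INTEGER REFERENCES parent(id));
///   INSERT INTO child(pid) VALUES (NULL); -- a NULL FK component short-circuits the check
///   INSERT INTO child(pid) VALUES (7);    -- probes parent for id = 7; violation path if absent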
pub fn emit_fk_child_insert_checks(
program: &mut ProgramBuilder,
resolver: &Resolver,
child_tbl: &BTreeTable,
new_start_reg: usize,
new_rowid_reg: usize,
) -> crate::Result<()> {
for fk_ref in resolver.schema.resolved_fks_for_child(&child_tbl.name)? {
let is_self_ref = fk_ref.fk.parent_table.eq_ignore_ascii_case(&child_tbl.name);
// Short-circuit if any NEW component is NULL
let fk_ok = program.allocate_label();
for cname in &fk_ref.child_cols {
let (i, col) = child_tbl.get_column(cname).unwrap();
let src = if col.is_rowid_alias {
new_rowid_reg
} else {
new_start_reg + i
};
program.emit_insn(Insn::IsNull {
reg: src,
target_pc: fk_ok,
});
}
let parent_tbl = resolver
.schema
.get_btree_table(&fk_ref.fk.parent_table)
.expect("parent btree");
if fk_ref.parent_uses_rowid {
let pcur = open_read_table(program, &parent_tbl);
// first child col carries rowid
let (i_child, col_child) = child_tbl.get_column(&fk_ref.child_cols[0]).unwrap();
let val_reg = if col_child.is_rowid_alias {
new_rowid_reg
} else {
new_start_reg + i_child
};
// Normalize rowid to integer for both the probe and the same-row fast path.
let tmp = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: val_reg,
dst_reg: tmp,
extra_amount: 0,
});
program.emit_insn(Insn::MustBeInt { reg: tmp });
// If this is a self-reference *and* the child FK equals NEW rowid,
// the constraint will be satisfied once this row is inserted
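// Illustrative sketch (hypothetical self-referencing schema):
//   CREATE TABLE t(id INTEGER PRIMARY KEY, pid REFERENCES t(id));
//   INSERT INTO t(id, pid) VALUES (5, 5); -- pid equals the NEW rowid, so no parent probe is needed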
if is_self_ref {
program.emit_insn(Insn::Eq {
lhs: tmp,
rhs: new_rowid_reg,
target_pc: fk_ok,
flags: CmpInsFlags::default(),
collation: None,
});
}
let violation = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: pcur,
rowid_reg: tmp,
target_pc: violation,
});
program.emit_insn(Insn::Close { cursor_id: pcur });
program.emit_insn(Insn::Goto { target_pc: fk_ok });
// Missing parent: immediate vs deferred as usual
program.preassign_label_to_next_insn(violation);
program.emit_insn(Insn::Close { cursor_id: pcur });
emit_fk_violation(program, &fk_ref.fk)?;
program.preassign_label_to_next_insn(fk_ok);
} else {
let idx = fk_ref
.parent_unique_index
.as_ref()
.expect("parent unique index required");
let icur = open_read_index(program, idx);
let ncols = fk_ref.child_cols.len();
// Build NEW child probe from child NEW values, apply parent-index affinities.
let probe = {
let start = program.alloc_registers(ncols);
for (k, cname) in fk_ref.child_cols.iter().enumerate() {
let (i, col) = child_tbl.get_column(cname).unwrap();
program.emit_insn(Insn::Copy {
src_reg: if col.is_rowid_alias {
new_rowid_reg
} else {
new_start_reg + i
},
dst_reg: start + k,
extra_amount: 0,
});
}
if let Some(cnt) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: start,
count: cnt,
affinities: build_index_affinity_string(idx, &parent_tbl),
});
}
start
};
if is_self_ref {
// Determine the parent column order to compare against:
let parent_cols: Vec<&str> =
idx.columns.iter().map(|ic| ic.name.as_str()).collect();
// Build new parent-key image from this same row's new values, in the index order.
let parent_new = program.alloc_registers(ncols);
for (i, pname) in parent_cols.iter().enumerate() {
let (pos, col) = child_tbl.get_column(pname).unwrap();
program.emit_insn(Insn::Copy {
src_reg: if col.is_rowid_alias {
new_rowid_reg
} else {
new_start_reg + pos
},
dst_reg: parent_new + i,
extra_amount: 0,
});
}
if let Some(cnt) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: parent_new,
count: cnt,
affinities: build_index_affinity_string(idx, &parent_tbl),
});
}
// Compare child probe to NEW parent image column-by-column.
let mismatch = program.allocate_label();
for i in 0..ncols {
let cont = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: probe + i,
rhs: parent_new + i,
target_pc: cont,
flags: CmpInsFlags::default().jump_if_null(),
collation: Some(super::collate::CollationSeq::Binary),
});
program.emit_insn(Insn::Goto {
target_pc: mismatch,
});
program.preassign_label_to_next_insn(cont);
}
// All equal: same-row OK
program.emit_insn(Insn::Goto { target_pc: fk_ok });
program.preassign_label_to_next_insn(mismatch);
}
index_probe(
program,
icur,
probe,
ncols,
// on_found: parent exists, FK satisfied
|_p| Ok(()),
// on_not_found: missing parent, emit the usual FK violation
|p| {
emit_fk_violation(p, &fk_ref.fk)?;
Ok(())
},
)?;
program.emit_insn(Insn::Goto { target_pc: fk_ok });
program.preassign_label_to_next_insn(fk_ok);
}
}
Ok(())
}
/// Build NEW parent key image in FK parent-column order into a contiguous register block.
/// Handles 3 shapes:
/// - parent_uses_rowid: single "rowid" component
/// - explicit fk.parent_columns
/// - fk.parent_columns empty => use parent's declared PK columns (order-preserving)
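/// Illustrative declarations for the three shapes (hypothetical, not from this change):
///   pid REFERENCES p(id)                  -- id is INTEGER PRIMARY KEY: single "rowid" component
///   FOREIGN KEY (x, y) REFERENCES p(a, b) -- explicit fk.parent_columns
///   FOREIGN KEY (x, y) REFERENCES p       -- no column list: p's declared PK columns, in order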
fn build_parent_key_image_for_insert(
program: &mut ProgramBuilder,
parent_table: &BTreeTable,
pref: &ResolvedFkRef,
insertion: &Insertion,
) -> crate::Result<(usize, usize)> {
// Decide column list
let parent_cols: Vec<String> = if pref.parent_uses_rowid {
vec!["rowid".to_string()]
} else if !pref.fk.parent_columns.is_empty() {
pref.fk.parent_columns.clone()
} else {
// fall back to the declared PK of the parent table, in schema order
parent_table
.primary_key_columns
.iter()
.map(|(n, _)| n.clone())
.collect()
};
let ncols = parent_cols.len();
let start = program.alloc_registers(ncols);
// Copy from the would-be parent insertion
for (i, pname) in parent_cols.iter().enumerate() {
let src = if pname.eq_ignore_ascii_case("rowid") {
insertion.key_register()
} else {
// For rowid-alias parents, get_col_mapping_by_name will return the key mapping,
// not the NULL placeholder in col_mappings.
insertion
.get_col_mapping_by_name(pname)
.ok_or_else(|| {
crate::LimboError::PlanningError(format!(
"Column '{}' not present in INSERT image for parent {}",
pname, parent_table.name
))
})?
.register
};
program.emit_insn(Insn::Copy {
src_reg: src,
dst_reg: start + i,
extra_amount: 0,
});
}
// Apply affinities of the parent columns (or integer for rowid)
let aff: String = if pref.parent_uses_rowid {
"i".to_string()
} else {
parent_cols
.iter()
.map(|name| {
let (_, col) = parent_table.get_column(name).ok_or_else(|| {
crate::LimboError::InternalError(format!("parent col {name} missing"))
})?;
Ok::<_, crate::LimboError>(col.affinity().aff_mask())
})
.collect::<Result<String, _>>()?
};
if let Some(count) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: start,
count,
affinities: aff,
});
}
Ok((start, ncols))
}
/// Parent-side: when inserting into the parent, decrement the counter
/// if any child rows reference the NEW parent key.
/// We *always* do this for deferred FKs, and we *also* do it for
/// self-referential FKs (even if immediate) because the insert can
/// “repair” a prior child-insert count recorded earlier in the same statement.
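/// Illustrative repair sequence (hypothetical schema; child.pid declared DEFERRABLE INITIALLY DEFERRED):
///   BEGIN;
///   INSERT INTO child(pid) VALUES (7); -- parent 7 missing: deferred counter goes to 1
///   INSERT INTO parent(id) VALUES (7); -- this pass finds the waiting child and decrements the counter
///   COMMIT;                            -- succeeds, the counter is back to zero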
pub fn emit_parent_side_fk_decrement_on_insert(
program: &mut ProgramBuilder,
resolver: &Resolver,
parent_table: &BTreeTable,
insertion: &Insertion,
) -> crate::Result<()> {
for pref in resolver
.schema
.resolved_fks_referencing(&parent_table.name)?
{
let is_self_ref = pref
.child_table
.name
.eq_ignore_ascii_case(&parent_table.name);
// Skip only when it cannot repair anything: non-deferred and not self-referencing
if !pref.fk.deferred && !is_self_ref {
continue;
}
let (new_pk_start, n_cols) =
build_parent_key_image_for_insert(program, parent_table, &pref, insertion)?;
let child_tbl = &pref.child_table;
let child_cols = &pref.fk.child_columns;
let idx = resolver.schema.get_indices(&child_tbl.name).find(|ix| {
ix.columns.len() == child_cols.len()
&& ix
.columns
.iter()
.zip(child_cols.iter())
.all(|(ic, cc)| ic.name.eq_ignore_ascii_case(cc))
});
if let Some(ix) = idx {
let icur = open_read_index(program, ix);
// Copy key into probe regs and apply child-index affinities
let probe_start = program.alloc_registers(n_cols);
for i in 0..n_cols {
program.emit_insn(Insn::Copy {
src_reg: new_pk_start + i,
dst_reg: probe_start + i,
extra_amount: 0,
});
}
if let Some(count) = NonZeroUsize::new(n_cols) {
program.emit_insn(Insn::Affinity {
start_reg: probe_start,
count,
affinities: build_index_affinity_string(ix, child_tbl),
});
}
let found = program.allocate_label();
program.emit_insn(Insn::Found {
cursor_id: icur,
target_pc: found,
record_reg: probe_start,
num_regs: n_cols,
});
// Not found, nothing to decrement
program.emit_insn(Insn::Close { cursor_id: icur });
let skip = program.allocate_label();
program.emit_insn(Insn::Goto { target_pc: skip });
// Found: guarded counter decrement
program.resolve_label(found, program.offset());
program.emit_insn(Insn::Close { cursor_id: icur });
emit_guarded_fk_decrement(program, skip);
program.resolve_label(skip, program.offset());
} else {
// fallback scan :(
let ccur = open_read_table(program, child_tbl);
let done = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: ccur,
pc_if_empty: done,
});
let loop_top = program.allocate_label();
let next_row = program.allocate_label();
program.resolve_label(loop_top, program.offset());
for (i, child_name) in child_cols.iter().enumerate() {
let (pos, _) = child_tbl.get_column(child_name).ok_or_else(|| {
crate::LimboError::InternalError(format!("child col {child_name} missing"))
})?;
let tmp = program.alloc_register();
program.emit_insn(Insn::Column {
cursor_id: ccur,
column: pos,
dest: tmp,
default: None,
});
program.emit_insn(Insn::IsNull {
reg: tmp,
target_pc: next_row,
});
let cont = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: tmp,
rhs: new_pk_start + i,
target_pc: cont,
flags: CmpInsFlags::default().jump_if_null(),
collation: Some(super::collate::CollationSeq::Binary),
});
program.emit_insn(Insn::Goto {
target_pc: next_row,
});
program.resolve_label(cont, program.offset());
}
// Matched one child row: guarded decrement of counter
emit_guarded_fk_decrement(program, next_row);
program.resolve_label(next_row, program.offset());
program.emit_insn(Insn::Next {
cursor_id: ccur,
pc_if_next: loop_top,
});
program.resolve_label(done, program.offset());
program.emit_insn(Insn::Close { cursor_id: ccur });
}
}
Ok(())
}

View File

@@ -2389,6 +2389,7 @@ mod tests {
name: "users".to_string(),
root_page: 2,
primary_key_columns: vec![("id".to_string(), turso_parser::ast::SortOrder::Asc)],
foreign_keys: vec![],
columns: vec![
SchemaColumn {
name: Some("id".to_string()),
@@ -2505,6 +2506,7 @@ mod tests {
is_strict: false,
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
};
schema.add_btree_table(Arc::new(orders_table));
@@ -2567,6 +2569,7 @@ mod tests {
is_strict: false,
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
};
schema.add_btree_table(Arc::new(products_table));

View File

@@ -17,6 +17,7 @@ pub(crate) mod delete;
pub(crate) mod display;
pub(crate) mod emitter;
pub(crate) mod expr;
pub(crate) mod fkeys;
pub(crate) mod group_by;
pub(crate) mod index;
pub(crate) mod insert;

View File

@@ -1664,6 +1664,7 @@ mod tests {
has_rowid: true,
is_strict: false,
unique_sets: vec![],
foreign_keys: vec![],
})
}

View File

@@ -478,6 +478,7 @@ fn parse_table(
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
});
drop(view_guard);

View File

@@ -95,6 +95,20 @@ fn update_pragma(
connection: Arc<crate::Connection>,
mut program: ProgramBuilder,
) -> crate::Result<(ProgramBuilder, TransactionMode)> {
let parse_pragma_enabled = |expr: &ast::Expr| -> bool {
if let Expr::Literal(Literal::Numeric(n)) = expr {
return !matches!(n.as_str(), "0");
};
let name_bytes = match expr {
Expr::Literal(Literal::Keyword(name)) => name.as_bytes(),
Expr::Name(name) | Expr::Id(name) => name.as_str().as_bytes(),
_ => "".as_bytes(),
};
match_ignore_ascii_case!(match name_bytes {
b"ON" | b"TRUE" | b"YES" | b"1" => true,
_ => false,
})
};
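// Illustrative accepted forms (sketch, not exhaustive): PRAGMA foreign_keys = ON|TRUE|YES|1
// and any non-zero numeric enable; other identifiers such as OFF, NO or FALSE disable.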
match pragma {
PragmaName::ApplicationId => {
let data = parse_signed_number(&value)?;
@@ -343,38 +357,15 @@ fn update_pragma(
}
PragmaName::Synchronous => {
use crate::SyncMode;
let mode = match value {
Expr::Name(name) => {
let name_bytes = name.as_str().as_bytes();
match_ignore_ascii_case!(match name_bytes {
b"OFF" | b"FALSE" | b"NO" | b"0" => SyncMode::Off,
_ => SyncMode::Full,
})
}
Expr::Literal(Literal::Numeric(n)) => match n.as_str() {
"0" => SyncMode::Off,
_ => SyncMode::Full,
},
_ => SyncMode::Full,
let mode = match parse_pragma_enabled(&value) {
true => SyncMode::Full,
false => SyncMode::Off,
};
connection.set_sync_mode(mode);
Ok((program, TransactionMode::None))
}
PragmaName::DataSyncRetry => {
let retry_enabled = match value {
Expr::Name(name) => {
let name_bytes = name.as_str().as_bytes();
match_ignore_ascii_case!(match name_bytes {
b"ON" | b"TRUE" | b"YES" | b"1" => true,
_ => false,
})
}
Expr::Literal(Literal::Numeric(n)) => !matches!(n.as_str(), "0"),
_ => false,
};
let retry_enabled = parse_pragma_enabled(&value);
connection.set_data_sync_retry(retry_enabled);
Ok((program, TransactionMode::None))
}
@@ -387,6 +378,11 @@ fn update_pragma(
connection.set_mvcc_checkpoint_threshold(threshold)?;
Ok((program, TransactionMode::None))
}
PragmaName::ForeignKeys => {
let enabled = parse_pragma_enabled(&value);
connection.set_foreign_keys_enabled(enabled);
Ok((program, TransactionMode::None))
}
}
}
@@ -704,6 +700,14 @@ fn query_pragma(
program.add_pragma_result_column(pragma.to_string());
Ok((program, TransactionMode::None))
}
PragmaName::ForeignKeys => {
let enabled = connection.foreign_keys_enabled();
let register = program.alloc_register();
program.emit_int(enabled as i64, register);
program.emit_result_row(register, 1);
program.add_pragma_result_column(pragma.to_string());
Ok((program, TransactionMode::None))
}
}
}

View File

@@ -812,6 +812,7 @@ pub fn translate_drop_table(
}],
is_strict: false,
unique_sets: vec![],
foreign_keys: vec![],
});
// cursor id 2
let ephemeral_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(simple_table_rc));

View File

@@ -353,6 +353,7 @@ pub fn prepare_update_plan(
}],
is_strict: false,
unique_sets: vec![],
foreign_keys: vec![],
});
let temp_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));

View File

@@ -5,10 +5,13 @@ use std::{collections::HashMap, sync::Arc};
use turso_parser::ast::{self, Upsert};
use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::schema::ROWID_SENTINEL;
use crate::translate::expr::{walk_expr, WalkControl};
use crate::translate::fkeys::{emit_fk_child_update_counters, emit_parent_pk_change_checks};
use crate::translate::insert::format_unique_violation_desc;
use crate::translate::planner::ROWID_STRS;
use crate::vdbe::insn::CmpInsFlags;
use crate::Connection;
use crate::{
bail_parse_error,
error::SQLITE_CONSTRAINT_NOTNULL,
@@ -346,6 +349,7 @@ pub fn emit_upsert(
returning: &mut [ResultSetColumn],
cdc_cursor_id: Option<usize>,
row_done_label: BranchOffset,
connection: &Arc<Connection>,
) -> crate::Result<()> {
// Seek & snapshot CURRENT
program.emit_insn(Insn::SeekRowid {
@@ -464,10 +468,55 @@ pub fn emit_upsert(
}
}
let (changed_cols, rowid_changed) = collect_changed_cols(table, set_pairs);
let rowid_alias_idx = table.columns().iter().position(|c| c.is_rowid_alias);
let has_direct_rowid_update = set_pairs
.iter()
.any(|(idx, _)| *idx == rowid_alias_idx.unwrap_or(ROWID_SENTINEL));
let has_user_provided_rowid = if let Some(i) = rowid_alias_idx {
set_pairs.iter().any(|(idx, _)| *idx == i) || has_direct_rowid_update
} else {
has_direct_rowid_update
};
let rowid_set_clause_reg = if has_user_provided_rowid {
Some(new_rowid_reg.unwrap_or(conflict_rowid_reg))
} else {
None
};
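// Illustrative trigger for rowid_set_clause_reg (hypothetical table t with rowid alias id):
//   INSERT INTO t(id, x) VALUES (1, 'a') ON CONFLICT(id) DO UPDATE SET id = 2;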
if let Some(bt) = table.btree() {
if connection.foreign_keys_enabled() {
let rowid_new_reg = new_rowid_reg.unwrap_or(conflict_rowid_reg);
// Child-side checks
if resolver.schema.has_child_fks(bt.name.as_str()) {
emit_fk_child_update_counters(
program,
resolver,
&bt,
table.get_name(),
tbl_cursor_id,
new_start,
rowid_new_reg,
&changed_cols,
)?;
}
emit_parent_pk_change_checks(
program,
resolver,
&bt,
tbl_cursor_id,
conflict_rowid_reg,
new_start,
new_rowid_reg.unwrap_or(conflict_rowid_reg),
rowid_set_clause_reg,
set_pairs,
)?;
}
}
// Index rebuild (DELETE old, INSERT new), honoring partial-index WHEREs
if let Some(before) = before_start {
let (changed_cols, rowid_changed) = collect_changed_cols(table, set_pairs);
for (idx_name, _root, idx_cid) in idx_cursors {
let idx_meta = resolver
.schema

View File

@@ -80,6 +80,7 @@ pub fn translate_create_materialized_view(
has_autoincrement: false,
unique_sets: vec![],
foreign_keys: vec![],
});
// Allocate a cursor for writing to the view's btree during population

View File

@@ -505,6 +505,7 @@ pub fn init_window<'a>(
is_strict: false,
unique_sets: vec![],
has_autoincrement: false,
foreign_keys: vec![],
});
let cursor_buffer_read = program.alloc_cursor_id(CursorType::BTreeTable(buffer_table.clone()));
let cursor_buffer_write = program.alloc_cursor_id(CursorType::BTreeTable(buffer_table.clone()));

View File

@@ -792,6 +792,9 @@ impl ProgramBuilder {
Insn::NotFound { target_pc, .. } => {
resolve(target_pc, "NotFound");
}
Insn::FkIfZero { target_pc, .. } => {
resolve(target_pc, "FkIfZero");
}
_ => {}
}
}

View File

@@ -1,5 +1,5 @@
#![allow(unused_variables)]
use crate::error::SQLITE_CONSTRAINT_UNIQUE;
use crate::error::{SQLITE_CONSTRAINT_FOREIGNKEY, SQLITE_CONSTRAINT_UNIQUE};
use crate::function::AlterTableFunc;
use crate::mvcc::database::CheckpointStateMachine;
use crate::numeric::{NullableInteger, Numeric};
@@ -35,7 +35,7 @@ use crate::{
},
translate::emitter::TransactionMode,
};
use crate::{get_cursor, CheckpointMode, MvCursor};
use crate::{get_cursor, CheckpointMode, Connection, MvCursor};
use std::env::temp_dir;
use std::ops::DerefMut;
use std::{
@@ -2156,6 +2156,9 @@ pub fn halt(
"UNIQUE constraint failed: {description} (19)"
)));
}
SQLITE_CONSTRAINT_FOREIGNKEY => {
return Err(LimboError::Constraint(format!("{description} (19)")));
}
_ => {
return Err(LimboError::Constraint(format!(
"undocumented halt error code {description}"
@@ -2166,9 +2169,21 @@ pub fn halt(
let auto_commit = program.connection.auto_commit.load(Ordering::SeqCst);
tracing::trace!("halt(auto_commit={})", auto_commit);
if auto_commit {
program
.commit_txn(pager.clone(), state, mv_store, false)
.map(Into::into)
let res = program.commit_txn(pager.clone(), state, mv_store, false);
if res.is_ok()
&& program.connection.foreign_keys_enabled()
&& program
.connection
.fk_deferred_violations
.swap(0, Ordering::AcqRel)
> 0
{
// In autocommit mode, a statement that leaves deferred violations must fail here.
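// Illustrative case (hypothetical schema, outside an explicit transaction): inserting a child row
// whose DEFERRABLE INITIALLY DEFERRED parent key does not exist commits its implicit statement
// transaction here, so the deferred violation surfaces immediately.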
return Err(LimboError::Constraint(
"foreign key constraint failed".to_string(),
));
}
res.map(Into::into)
} else {
Ok(InsnFunctionStepResult::Done)
}
@@ -2441,20 +2456,47 @@ pub fn op_auto_commit(
load_insn!(
AutoCommit {
auto_commit,
rollback,
rollback
},
insn
);
let conn = program.connection.clone();
let fk_on = conn.foreign_keys_enabled();
let had_autocommit = conn.auto_commit.load(Ordering::SeqCst); // true => not currently inside an explicit transaction
// Drive any multi-step commit/rollback that's already in progress.
if matches!(state.commit_state, CommitState::Committing) {
return program
let res = program
.commit_txn(pager.clone(), state, mv_store, *rollback)
.map(Into::into);
// Only clear after a final, successful non-rollback COMMIT.
if fk_on
&& !*rollback
&& matches!(
res,
Ok(InsnFunctionStepResult::Step | InsnFunctionStepResult::Done)
)
{
conn.clear_deferred_foreign_key_violations();
}
return res;
}
if *auto_commit != conn.auto_commit.load(Ordering::SeqCst) {
if *rollback {
// TODO(pere): add rollback I/O logic once we implement rollback journal
// The logic in this opcode can be a bit confusing, so to make things clearer let's be
// very explicit about the existing state and the requested state.
let requested_autocommit = *auto_commit;
let requested_rollback = *rollback;
let changed = requested_autocommit != had_autocommit;
// classify the requested operation
let is_begin_req = had_autocommit && !requested_autocommit && !requested_rollback;
let is_commit_req = !had_autocommit && requested_autocommit && !requested_rollback;
let is_rollback_req = !had_autocommit && requested_autocommit && requested_rollback;
if changed {
if requested_rollback {
// ROLLBACK transition
if let Some(mv_store) = mv_store {
if let Some(tx_id) = conn.get_mv_tx_id() {
mv_store.rollback_tx(tx_id, pager.clone(), &conn);
@@ -2465,16 +2507,23 @@ pub fn op_auto_commit(
conn.set_tx_state(TransactionState::None);
conn.auto_commit.store(true, Ordering::SeqCst);
} else {
conn.auto_commit.store(*auto_commit, Ordering::SeqCst);
// BEGIN (true->false) or COMMIT (false->true)
if is_commit_req {
// Pre-check deferred FKs; leave tx open and do NOT clear violations
check_deferred_fk_on_commit(&conn)?;
}
conn.auto_commit
.store(requested_autocommit, Ordering::SeqCst);
}
} else {
let mvcc_tx_active = program.connection.get_mv_tx().is_some();
// No autocommit flip
let mvcc_tx_active = conn.get_mv_tx().is_some();
if !mvcc_tx_active {
if !*auto_commit {
if !requested_autocommit {
return Err(LimboError::TxError(
"cannot start a transaction within a transaction".to_string(),
));
} else if *rollback {
} else if requested_rollback {
return Err(LimboError::TxError(
"cannot rollback - no transaction is active".to_string(),
));
@@ -2483,19 +2532,41 @@ pub fn op_auto_commit(
"cannot commit - no transaction is active".to_string(),
));
}
} else {
let is_begin = !*auto_commit && !*rollback;
if is_begin {
return Err(LimboError::TxError(
"cannot use BEGIN after BEGIN CONCURRENT".to_string(),
));
}
} else if is_begin_req {
return Err(LimboError::TxError(
"cannot use BEGIN after BEGIN CONCURRENT".to_string(),
));
}
}
program
.commit_txn(pager.clone(), state, mv_store, *rollback)
.map(Into::into)
let res = program
.commit_txn(pager.clone(), state, mv_store, requested_rollback)
.map(Into::into);
// Clear deferred FK counters only after FINAL success of COMMIT/ROLLBACK.
if fk_on
&& matches!(
res,
Ok(InsnFunctionStepResult::Step | InsnFunctionStepResult::Done)
)
&& (is_rollback_req || is_commit_req)
{
conn.clear_deferred_foreign_key_violations();
}
res
}
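/// Illustrative failure at explicit COMMIT (hypothetical schema; FK declared DEFERRABLE INITIALLY DEFERRED):
///   BEGIN;
///   INSERT INTO child(pid) VALUES (7); -- no parent 7: deferred counter is now 1
///   COMMIT;                            -- rejected here; the transaction stays open to be fixed or rolled back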
fn check_deferred_fk_on_commit(conn: &Connection) -> Result<()> {
if !conn.foreign_keys_enabled() {
return Ok(());
}
if conn.get_deferred_foreign_key_violations() > 0 {
return Err(LimboError::Constraint(
"FOREIGN KEY constraint failed".into(),
));
}
Ok(())
}
pub fn op_goto(
@@ -8287,6 +8358,72 @@ fn handle_text_sum(acc: &mut Value, sum_state: &mut SumAggState, parsed_number:
}
}
pub fn op_fk_counter(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(
FkCounter {
increment_value,
is_scope,
},
insn
);
if *is_scope {
state.fk_scope_counter = state.fk_scope_counter.saturating_add(*increment_value);
} else {
// Transaction-level counter: add/subtract for deferred FKs.
program
.connection
.fk_deferred_violations
.fetch_add(*increment_value, Ordering::AcqRel);
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
pub fn op_fk_if_zero(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
_pager: &Arc<Pager>,
_mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(
FkIfZero {
is_scope,
target_pc,
},
insn
);
let fk_enabled = program.connection.foreign_keys_enabled();
// Jump if any of the following holds:
// - foreign keys are disabled globally
// - is_scope is false and the connection's deferred constraint counter is zero
// - is_scope is true and the statement-scope constraint counter is zero
if !fk_enabled {
state.pc = target_pc.as_offset_int();
return Ok(InsnFunctionStepResult::Step);
}
let v = if !*is_scope {
program.connection.get_deferred_foreign_key_violations()
} else {
state.fk_scope_counter
};
state.pc = if v == 0 {
target_pc.as_offset_int()
} else {
state.pc + 1
};
Ok(InsnFunctionStepResult::Step)
}
mod cmath {
extern "C" {
pub fn exp(x: f64) -> f64;

View File

@@ -1804,7 +1804,25 @@ pub fn insn_to_row(
0,
String::new(),
),
}
Insn::FkCounter { increment_value, is_scope } => (
"FkCounter",
*increment_value as i32,
*is_scope as i32,
0,
Value::build_text(""),
0,
String::new(),
),
Insn::FkIfZero { target_pc, is_scope } => (
"FkIfZero",
target_pc.as_debug_int(),
*is_scope as i32,
0,
Value::build_text(""),
0,
String::new(),
),
}
}
pub fn insn_to_row_with_comment(

View File

@@ -1169,6 +1169,20 @@ pub enum Insn {
p2: Option<usize>, // P2: address of parent explain instruction
detail: String, // P4: detail text
},
// Adjust a foreign key constraint counter by increment_value (which may be negative).
// If is_scope is false, the connection-level counter for deferred foreign key constraints is adjusted.
// If is_scope is true, the statement-scope counter (immediate foreign key constraints) is adjusted.
FkCounter {
increment_value: isize,
is_scope: bool,
},
// Tests whether a foreign key constraint counter is currently zero. If so, jump to target_pc; otherwise fall through to the next instruction. The jump is always taken when foreign keys are disabled.
// If is_scope is false, the connection-level deferred constraint counter is tested.
// If is_scope is true, the statement-scope counter (immediate foreign key constraint violations) is tested.
FkIfZero {
is_scope: bool,
target_pc: BranchOffset,
},
}
const fn get_insn_virtual_table() -> [InsnFunction; InsnVariants::COUNT] {
@@ -1335,6 +1349,8 @@ impl InsnVariants {
InsnVariants::MemMax => execute::op_mem_max,
InsnVariants::Sequence => execute::op_sequence,
InsnVariants::SequenceTest => execute::op_sequence_test,
InsnVariants::FkCounter => execute::op_fk_counter,
InsnVariants::FkIfZero => execute::op_fk_if_zero,
}
}
}

View File

@@ -313,6 +313,7 @@ pub struct ProgramState {
/// This is used when a statement in auto-commit mode is reset after a previous incomplete execution, in which case we may need to roll back a transaction started on the previous attempt.
/// Note that MVCC transactions are always explicit, so they do not update the auto_txn_cleanup marker.
pub(crate) auto_txn_cleanup: TxnCleanup,
fk_scope_counter: isize,
}
impl ProgramState {
@@ -359,6 +360,7 @@ impl ProgramState {
op_checkpoint_state: OpCheckpointState::StartCheckpoint,
view_delta_state: ViewDeltaCommitState::NotStarted,
auto_txn_cleanup: TxnCleanup::None,
fk_scope_counter: 0,
}
}
@@ -830,7 +832,6 @@ impl Program {
// Reset state for next use
program_state.view_delta_state = ViewDeltaCommitState::NotStarted;
if self.connection.get_tx_state() == TransactionState::None {
// No need to do any work here if not in tx. Current MVCC logic doesn't work with this assumption,
// hence the mv_store.is_none() check.

View File

@@ -1416,6 +1416,8 @@ pub enum PragmaName {
Encoding,
/// Current free page count.
FreelistCount,
/// Enable or disable foreign key constraint enforcement
ForeignKeys,
/// Run integrity check on the database file
IntegrityCheck,
/// `journal_mode` pragma

View File

@@ -47,3 +47,4 @@ source $testdir/vtab.test
source $testdir/upsert.test
source $testdir/window.test
source $testdir/partial_idx.test
source $testdir/foreign_keys.test

1106
testing/foreign_keys.test Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff