triggers: add execution plumbing to translation and vdbe layers

This commit is contained in:
Jussi Saurio
2025-11-18 12:35:46 +02:00
parent 3d00686f48
commit e60e37da7d
8 changed files with 1075 additions and 43 deletions

View File

@@ -41,6 +41,7 @@ pub mod numeric;
mod numeric;
use crate::index_method::IndexMethod;
use crate::schema::Trigger;
use crate::storage::checksum::CHECKSUM_REQUIRED_RESERVED_BYTES;
use crate::storage::encryption::AtomicCipherMode;
use crate::storage::pager::{AutoVacuumMode, HeaderRef};
@@ -642,6 +643,8 @@ impl Database {
view_transaction_states: AllViewsTxState::new(),
metrics: RwLock::new(ConnectionMetrics::new()),
nestedness: AtomicI32::new(0),
compiling_triggers: RwLock::new(Vec::new()),
executing_triggers: RwLock::new(Vec::new()),
encryption_key: RwLock::new(None),
encryption_cipher_mode: AtomicCipherMode::new(CipherMode::None),
sync_mode: AtomicSyncMode::new(SyncMode::Full),
@@ -1167,6 +1170,11 @@ pub struct Connection {
/// The state is integer as we may want to spawn deep nested programs (e.g. Root -[run]-> S1 -[run]-> S2 -[run]-> ...)
/// and we need to track current nestedness depth in order to properly understand when we will reach the root back again
nestedness: AtomicI32,
/// Stack of currently compiling triggers to prevent recursive trigger subprogram compilation
compiling_triggers: RwLock<Vec<Arc<Trigger>>>,
/// Stack of currently executing triggers to prevent recursive trigger execution
/// Only prevents the same trigger from firing again, allowing different triggers on the same table to fire
executing_triggers: RwLock<Vec<Arc<Trigger>>>,
encryption_key: RwLock<Option<EncryptionKey>>,
encryption_cipher_mode: AtomicCipherMode,
sync_mode: AtomicSyncMode,
@@ -1212,6 +1220,52 @@ impl Connection {
pub fn end_nested(&self) {
self.nestedness.fetch_add(-1, Ordering::SeqCst);
}
/// Check if a specific trigger is currently compiling (for recursive trigger prevention)
pub fn trigger_is_compiling(&self, trigger: impl AsRef<Trigger>) -> bool {
let compiling = self.compiling_triggers.read();
if let Some(trigger) = compiling.iter().find(|t| t.name == trigger.as_ref().name) {
tracing::debug!("Trigger is already compiling: {}", trigger.name);
return true;
}
false
}
pub fn start_trigger_compilation(&self, trigger: Arc<Trigger>) {
tracing::debug!("Starting trigger compilation: {}", trigger.name);
self.compiling_triggers.write().push(trigger.clone());
}
pub fn end_trigger_compilation(&self) {
tracing::debug!(
"Ending trigger compilation: {:?}",
self.compiling_triggers.read().last().map(|t| &t.name)
);
self.compiling_triggers.write().pop();
}
/// Check if a specific trigger is currently executing (for recursive trigger prevention)
pub fn is_trigger_executing(&self, trigger: impl AsRef<Trigger>) -> bool {
let executing = self.executing_triggers.read();
if let Some(trigger) = executing.iter().find(|t| t.name == trigger.as_ref().name) {
tracing::debug!("Trigger is already executing: {}", trigger.name);
return true;
}
false
}
pub fn start_trigger_execution(&self, trigger: Arc<Trigger>) {
tracing::debug!("Starting trigger execution: {}", trigger.name);
self.executing_triggers.write().push(trigger.clone());
}
pub fn end_trigger_execution(&self) {
tracing::debug!(
"Ending trigger execution: {:?}",
self.executing_triggers.read().last().map(|t| &t.name)
);
self.executing_triggers.write().pop();
}
pub fn prepare(self: &Arc<Connection>, sql: impl AsRef<str>) -> Result<Statement> {
if self.is_mvcc_bootstrap_connection() {
// Never use MV store for bootstrapping - we read state directly from sqlite_schema in the DB file.
@@ -2053,6 +2107,10 @@ impl Connection {
self.db.mvcc_enabled()
}
pub fn mv_store(&self) -> Option<&Arc<MvStore>> {
self.db.mv_store.as_ref()
}
/// Query the current value(s) of `pragma_name` associated to
/// `pragma_value`.
///
@@ -2536,6 +2594,12 @@ pub struct Statement {
busy_timeout: Option<BusyTimeout>,
}
impl std::fmt::Debug for Statement {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Statement").finish()
}
}
impl Drop for Statement {
fn drop(&mut self) {
self.reset();
@@ -2567,6 +2631,11 @@ impl Statement {
busy_timeout: None,
}
}
pub fn get_trigger(&self) -> Option<Arc<Trigger>> {
self.program.trigger.clone()
}
pub fn get_query_mode(&self) -> QueryMode {
self.query_mode
}
@@ -2881,12 +2950,8 @@ impl Statement {
fn reset_internal(&mut self, max_registers: Option<usize>, max_cursors: Option<usize>) {
// as abort uses auto_txn_cleanup value - it needs to be called before state.reset
self.program.abort(
self.mv_store.as_ref(),
&self.pager,
None,
&mut self.state.auto_txn_cleanup,
);
self.program
.abort(self.mv_store.as_ref(), &self.pager, None, &mut self.state);
self.state.reset(max_registers, max_cursors);
self.busy = false;
self.busy_timeout = None;

View File

@@ -36,6 +36,7 @@ pub(crate) mod select;
pub(crate) mod subquery;
pub(crate) mod transaction;
pub(crate) mod trigger;
pub(crate) mod trigger_exec;
pub(crate) mod update;
pub(crate) mod upsert;
mod values;

View File

@@ -0,0 +1,796 @@
use crate::schema::{BTreeTable, Trigger};
use crate::translate::emitter::Resolver;
use crate::translate::expr::translate_expr;
use crate::translate::{translate_inner, ProgramBuilder, ProgramBuilderOpts};
use crate::util::normalize_ident;
use crate::vdbe::insn::Insn;
use crate::{bail_parse_error, QueryMode, Result, Statement};
use parking_lot::RwLock;
use std::collections::HashSet;
use std::num::NonZero;
use std::sync::Arc;
use turso_parser::ast::{self, Expr, TriggerEvent, TriggerTime};
/// Context for trigger execution
#[derive(Debug)]
pub struct TriggerContext {
/// Table the trigger is attached to
pub table: Arc<BTreeTable>,
/// NEW row registers (for INSERT/UPDATE). The last element is always the rowid.
pub new_registers: Option<Vec<usize>>,
/// OLD row registers (for UPDATE/DELETE). The last element is always the rowid.
pub old_registers: Option<Vec<usize>>,
}
impl TriggerContext {
pub fn new(
table: Arc<BTreeTable>,
new_registers: Option<Vec<usize>>,
old_registers: Option<Vec<usize>>,
) -> Self {
Self {
table,
new_registers,
old_registers,
}
}
}
/// Context for compiling trigger subprograms - maps NEW/OLD to parameter indices
#[derive(Debug)]
struct TriggerSubprogramContext {
/// Map from column index to parameter index for NEW values (1-indexed)
new_param_map: Option<Vec<NonZero<usize>>>,
/// Map from column index to parameter index for OLD values (1-indexed)
old_param_map: Option<Vec<NonZero<usize>>>,
table: Arc<BTreeTable>,
}
/// Rewrite NEW and OLD references in trigger expressions to use Variable instructions (parameters)
fn rewrite_trigger_expr_for_subprogram(
expr: &mut ast::Expr,
table: &BTreeTable,
ctx: &TriggerSubprogramContext,
) -> Result<()> {
use crate::translate::expr::walk_expr_mut;
use crate::translate::expr::WalkControl;
walk_expr_mut(expr, &mut |e: &mut ast::Expr| -> Result<WalkControl> {
match e {
Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => {
let ns = normalize_ident(ns.as_str());
let col = normalize_ident(col.as_str());
// Handle NEW.column references
if ns.eq_ignore_ascii_case("new") {
if let Some(new_params) = &ctx.new_param_map {
// Check if this is a rowid alias column first
if let Some((idx, col_def)) = table.get_column(&col) {
if col_def.is_rowid_alias() {
// Rowid alias columns map to the rowid parameter, not the column register
*e = Expr::Variable(format!(
"{}",
*ctx.new_param_map
.as_ref()
.expect("NEW parameters must be provided")
.last()
.expect("NEW parameters must be provided")
));
return Ok(WalkControl::Continue);
}
if idx < new_params.len() {
*e = Expr::Variable(format!("{}", new_params[idx].get()));
return Ok(WalkControl::Continue);
}
}
// Handle NEW.rowid
if crate::translate::planner::ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&col))
{
*e = Expr::Variable(format!(
"{}",
ctx.new_param_map
.as_ref()
.expect("NEW parameters must be provided")
.last()
.expect("NEW parameters must be provided")
.get()
));
return Ok(WalkControl::Continue);
}
bail_parse_error!("no such column in NEW: {}", col);
} else {
bail_parse_error!(
"NEW references are only valid in INSERT and UPDATE triggers"
);
}
}
// Handle OLD.column references
if ns.eq_ignore_ascii_case("old") {
if let Some(old_params) = &ctx.old_param_map {
if let Some((idx, _)) = table.get_column(&col) {
if idx < old_params.len() {
*e = Expr::Variable(format!("{}", old_params[idx].get()));
return Ok(WalkControl::Continue);
}
}
// Handle OLD.rowid
if crate::translate::planner::ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&col))
{
*e = Expr::Variable(format!(
"{}",
ctx.old_param_map
.as_ref()
.expect("OLD parameters must be provided")
.last()
.expect("OLD parameters must be provided")
.get()
));
return Ok(WalkControl::Continue);
}
bail_parse_error!("no such column in OLD: {}", col);
} else {
bail_parse_error!(
"OLD references are only valid in UPDATE and DELETE triggers"
);
}
}
// Handle unqualified column references - they refer to NEW if available, else OLD
if let Some((idx, _)) = table.get_column(&col) {
if let Some(new_params) = &ctx.new_param_map {
if idx < new_params.len() {
*e = Expr::Variable(format!("{}", new_params[idx].get()));
return Ok(WalkControl::Continue);
}
}
if let Some(old_params) = &ctx.old_param_map {
if idx < old_params.len() {
*e = Expr::Variable(format!("{}", old_params[idx].get()));
return Ok(WalkControl::Continue);
}
}
}
Ok(WalkControl::Continue)
}
_ => Ok(WalkControl::Continue),
}
})?;
Ok(())
}
/// Convert TriggerCmd to Stmt, rewriting NEW/OLD to Variable expressions (for subprogram compilation)
fn trigger_cmd_to_stmt_for_subprogram(
cmd: &ast::TriggerCmd,
subprogram_ctx: &TriggerSubprogramContext,
) -> Result<ast::Stmt> {
use turso_parser::ast::{InsertBody, QualifiedName};
match cmd {
ast::TriggerCmd::Insert {
or_conflict,
tbl_name,
col_names,
select,
upsert,
returning,
} => {
// Rewrite NEW/OLD references in the SELECT
let mut select_clone = select.clone();
rewrite_expressions_in_select_for_subprogram(&mut select_clone, subprogram_ctx)?;
let body = InsertBody::Select(select_clone, upsert.clone());
Ok(ast::Stmt::Insert {
with: None,
or_conflict: *or_conflict,
tbl_name: QualifiedName {
db_name: None,
name: tbl_name.clone(),
alias: None,
},
columns: col_names.clone(),
body,
returning: returning.clone(),
})
}
ast::TriggerCmd::Update {
or_conflict,
tbl_name,
sets,
from,
where_clause,
} => {
// Rewrite NEW/OLD references in SET clauses and WHERE clause
let mut sets_clone = sets.clone();
for set in &mut sets_clone {
rewrite_trigger_expr_for_subprogram(
&mut set.expr,
&subprogram_ctx.table,
subprogram_ctx,
)?;
}
let mut where_clause_clone = where_clause.clone();
if let Some(ref mut where_expr) = where_clause_clone {
rewrite_trigger_expr_for_subprogram(
where_expr,
&subprogram_ctx.table,
subprogram_ctx,
)?;
}
Ok(ast::Stmt::Update(ast::Update {
with: None,
or_conflict: *or_conflict,
tbl_name: QualifiedName {
db_name: None,
name: tbl_name.clone(),
alias: None,
},
indexed: None,
sets: sets_clone,
from: from.clone(),
where_clause: where_clause_clone,
returning: vec![],
order_by: vec![],
limit: None,
}))
}
ast::TriggerCmd::Delete {
tbl_name,
where_clause,
} => {
// Rewrite NEW/OLD references in WHERE clause
let mut where_clause_clone = where_clause.clone();
if let Some(ref mut where_expr) = where_clause_clone {
rewrite_trigger_expr_for_subprogram(
where_expr,
&subprogram_ctx.table,
subprogram_ctx,
)?;
}
Ok(ast::Stmt::Delete {
tbl_name: QualifiedName {
db_name: None,
name: tbl_name.clone(),
alias: None,
},
where_clause: where_clause_clone,
limit: None,
returning: vec![],
indexed: None,
order_by: vec![],
with: None,
})
}
ast::TriggerCmd::Select(select) => {
// Rewrite NEW/OLD references in the SELECT
let mut select_clone = select.clone();
rewrite_expressions_in_select_for_subprogram(&mut select_clone, subprogram_ctx)?;
Ok(ast::Stmt::Select(select_clone))
}
}
}
/// Rewrite NEW/OLD references in all expressions within a SELECT statement for subprogram
fn rewrite_expressions_in_select_for_subprogram(
select: &mut ast::Select,
ctx: &TriggerSubprogramContext,
) -> Result<()> {
use crate::translate::expr::walk_expr_mut;
// Rewrite expressions in the SELECT body
match &mut select.body.select {
ast::OneSelect::Select {
columns,
where_clause,
group_by,
..
} => {
// Rewrite in columns
for col in columns {
if let ast::ResultColumn::Expr(ref mut expr, _) = col {
walk_expr_mut(expr, &mut |e: &mut ast::Expr| {
rewrite_trigger_expr_single_for_subprogram(e, ctx)?;
Ok(crate::translate::expr::WalkControl::Continue)
})?;
}
}
// Rewrite in WHERE clause
if let Some(ref mut where_expr) = where_clause {
walk_expr_mut(where_expr, &mut |e: &mut ast::Expr| {
rewrite_trigger_expr_single_for_subprogram(e, ctx)?;
Ok(crate::translate::expr::WalkControl::Continue)
})?;
}
// Rewrite in GROUP BY expressions and HAVING clause
if let Some(ref mut group_by) = group_by {
for expr in &mut group_by.exprs {
walk_expr_mut(expr, &mut |e: &mut ast::Expr| {
rewrite_trigger_expr_single_for_subprogram(e, ctx)?;
Ok(crate::translate::expr::WalkControl::Continue)
})?;
}
// Rewrite in HAVING clause
if let Some(ref mut having_expr) = group_by.having {
walk_expr_mut(having_expr, &mut |e: &mut ast::Expr| {
rewrite_trigger_expr_single_for_subprogram(e, ctx)?;
Ok(crate::translate::expr::WalkControl::Continue)
})?;
}
}
}
ast::OneSelect::Values(values) => {
for row in values {
for expr in row {
walk_expr_mut(expr, &mut |e: &mut ast::Expr| {
rewrite_trigger_expr_single_for_subprogram(e, ctx)?;
Ok(crate::translate::expr::WalkControl::Continue)
})?;
}
}
}
}
Ok(())
}
/// Rewrite a single NEW/OLD reference for subprogram (called from walk_expr_mut)
fn rewrite_trigger_expr_single_for_subprogram(
e: &mut ast::Expr,
ctx: &TriggerSubprogramContext,
) -> Result<()> {
match e {
Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => {
let ns = normalize_ident(ns.as_str());
let col = normalize_ident(col.as_str());
// Handle NEW.column references
if ns.eq_ignore_ascii_case("new") {
if let Some(new_params) = &ctx.new_param_map {
if let Some((idx, col_def)) = ctx.table.get_column(&col) {
if col_def.is_rowid_alias() {
*e = Expr::Variable(format!(
"{}",
ctx.new_param_map
.as_ref()
.expect("NEW parameters must be provided")
.last()
.expect("NEW parameters must be provided")
.get()
));
return Ok(());
}
if idx < new_params.len() {
*e = Expr::Variable(format!("{}", new_params[idx].get()));
return Ok(());
}
}
// Handle NEW.rowid
if crate::translate::planner::ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&col))
{
*e = Expr::Variable(format!(
"{}",
ctx.new_param_map
.as_ref()
.expect("NEW parameters must be provided")
.last()
.expect("NEW parameters must be provided")
.get()
));
return Ok(());
}
bail_parse_error!("no such column in NEW: {}", col);
} else {
bail_parse_error!(
"NEW references are only valid in INSERT and UPDATE triggers"
);
}
}
// Handle OLD.column references
if ns.eq_ignore_ascii_case("old") {
if let Some(old_params) = &ctx.old_param_map {
if let Some((idx, col_def)) = ctx.table.get_column(&col) {
if col_def.is_rowid_alias() {
*e = Expr::Variable(format!(
"{}",
ctx.old_param_map
.as_ref()
.expect("OLD parameters must be provided")
.last()
.expect("OLD parameters must be provided")
.get()
));
return Ok(());
}
if idx < old_params.len() {
*e = Expr::Variable(format!("{}", old_params[idx].get()));
return Ok(());
}
}
// Handle OLD.rowid
if crate::translate::planner::ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&col))
{
*e = Expr::Variable(format!(
"{}",
ctx.old_param_map
.as_ref()
.expect("OLD parameters must be provided")
.last()
.expect("OLD parameters must be provided")
.get()
));
return Ok(());
}
bail_parse_error!("no such column in OLD: {}", col);
} else {
bail_parse_error!(
"OLD references are only valid in UPDATE and DELETE triggers"
);
}
}
// Handle unqualified column references - they refer to NEW if available, else OLD
if let Some((idx, col_def)) = ctx.table.get_column(&col) {
if col_def.is_rowid_alias() {
*e = Expr::Variable(format!(
"{}",
ctx.new_param_map
.as_ref()
.expect("NEW parameters must be provided")
.last()
.expect("NEW parameters must be provided")
.get()
));
return Ok(());
}
if let Some(new_params) = &ctx.new_param_map {
if idx < new_params.len() {
*e = Expr::Variable(format!("{}", new_params[idx].get()));
return Ok(());
}
}
if let Some(old_params) = &ctx.old_param_map {
if idx < old_params.len() {
*e = Expr::Variable(format!("{}", old_params[idx].get()));
return Ok(());
}
}
}
}
_ => {}
}
Ok(())
}
/// Execute trigger commands by compiling them as a subprogram and emitting Program instruction
/// Returns true if there are triggers that will fire.
fn execute_trigger_commands(
program: &mut ProgramBuilder,
resolver: &mut Resolver,
trigger: &Arc<Trigger>,
ctx: &TriggerContext,
connection: &Arc<crate::Connection>,
) -> Result<bool> {
if connection.trigger_is_compiling(trigger) {
// Do not recursively compile the same trigger
return Ok(false);
}
connection.start_trigger_compilation(trigger.clone());
// Build parameter mapping: parameters are 1-indexed and sequential
// Order: [NEW values..., OLD values..., rowid]
// So if we have 2 NEW columns, 2 OLD columns: NEW params are 1,2; OLD params are 3,4; rowid is 5
let num_new = ctx.new_registers.as_ref().map(|r| r.len()).unwrap_or(0);
let new_param_map: Option<Vec<NonZero<usize>>> = ctx.new_registers.as_ref().map(|new_regs| {
(1..=new_regs.len())
.map(|i| NonZero::new(i).unwrap())
.collect()
});
let old_param_map: Option<Vec<NonZero<usize>>> = ctx.old_registers.as_ref().map(|old_regs| {
(1..=old_regs.len())
.map(|i| NonZero::new(i + num_new).unwrap())
.collect()
});
let subprogram_ctx = TriggerSubprogramContext {
new_param_map,
old_param_map,
table: ctx.table.clone(),
};
let mut subprogram_builder = ProgramBuilder::new_for_trigger(
QueryMode::Normal,
program.capture_data_changes_mode().clone(),
ProgramBuilderOpts {
num_cursors: 1,
approx_num_insns: 32,
approx_num_labels: 2,
},
trigger.clone(),
);
for command in trigger.commands.iter() {
let stmt = trigger_cmd_to_stmt_for_subprogram(command, &subprogram_ctx)?;
subprogram_builder.prologue();
subprogram_builder = translate_inner(
stmt,
resolver,
subprogram_builder,
connection,
"trigger subprogram",
)?;
}
subprogram_builder.epilogue(resolver.schema);
let built_subprogram = subprogram_builder.build(connection.clone(), true, "trigger subprogram");
let mut param_register_indices = Vec::new();
if let Some(new_regs) = &ctx.new_registers {
param_register_indices.extend(new_regs.iter().copied());
}
if let Some(old_regs) = &ctx.old_registers {
param_register_indices.extend(old_regs.iter().copied());
}
let params: Vec<crate::types::Value> = param_register_indices
.iter()
.map(|&reg_idx| {
// Use a special encoding: negative integer represents register index
// This is a hack - ideally we'd modify Program to accept register indices directly
crate::types::Value::Integer(-(reg_idx as i64 + 1))
})
.collect();
let turso_stmt = Statement::new(
built_subprogram,
connection.mv_store().cloned(),
connection.pager.load().clone(),
QueryMode::Normal,
);
program.emit_insn(Insn::Program {
params,
program: Arc::new(RwLock::new(turso_stmt)),
});
connection.end_trigger_compilation();
Ok(true)
}
/// Check if there are any triggers for a given event (regardless of time).
/// This is used during plan preparation to determine if materialization is needed.
pub fn has_relevant_triggers_type_only(
schema: &crate::schema::Schema,
event: TriggerEvent,
updated_column_indices: Option<&HashSet<usize>>,
table: &BTreeTable,
) -> bool {
let mut triggers = schema.get_triggers_for_table(table.name.as_str());
// Filter triggers by event
triggers.any(|trigger| {
// Check event matches
let event_matches = match (&trigger.event, &event) {
(TriggerEvent::Delete, TriggerEvent::Delete) => true,
(TriggerEvent::Insert, TriggerEvent::Insert) => true,
(TriggerEvent::Update, TriggerEvent::Update) => true,
(TriggerEvent::UpdateOf(trigger_cols), TriggerEvent::Update) => {
// For UPDATE OF, we need to check if any of the specified columns
// are in the UPDATE SET clause
let updated_cols =
updated_column_indices.expect("UPDATE should contain some updated columns");
// Check if any of the trigger's specified columns are being updated
trigger_cols.iter().any(|col_name| {
let normalized_col = normalize_ident(col_name.as_str());
if let Some((col_idx, _)) = table.get_column(&normalized_col) {
updated_cols.contains(&col_idx)
} else {
// Column doesn't exist - according to SQLite docs, unrecognized
// column names in UPDATE OF are silently ignored
false
}
})
}
_ => false,
};
event_matches
})
}
/// Check if there are any triggers for a given event (regardless of time).
/// This is used during plan preparation to determine if materialization is needed.
pub fn get_relevant_triggers_type_and_time<'a>(
schema: &'a crate::schema::Schema,
event: TriggerEvent,
time: TriggerTime,
updated_column_indices: Option<HashSet<usize>>,
table: &'a BTreeTable,
) -> impl Iterator<Item = Arc<Trigger>> + 'a + Clone {
let triggers = schema.get_triggers_for_table(table.name.as_str());
// Filter triggers by event
triggers
.filter(move |trigger| -> bool {
// Check event matches
let event_matches = match (&trigger.event, &event) {
(TriggerEvent::Delete, TriggerEvent::Delete) => true,
(TriggerEvent::Insert, TriggerEvent::Insert) => true,
(TriggerEvent::Update, TriggerEvent::Update) => true,
(TriggerEvent::UpdateOf(trigger_cols), TriggerEvent::Update) => {
// For UPDATE OF, we need to check if any of the specified columns
// are in the UPDATE SET clause
if let Some(ref updated_cols) = updated_column_indices {
// Check if any of the trigger's specified columns are being updated
trigger_cols.iter().any(|col_name| {
let normalized_col = normalize_ident(col_name.as_str());
if let Some((col_idx, _)) = table.get_column(&normalized_col) {
updated_cols.contains(&col_idx)
} else {
// Column doesn't exist - according to SQLite docs, unrecognized
// column names in UPDATE OF are silently ignored
false
}
})
} else {
false
}
}
_ => false,
};
if !event_matches {
return false;
}
trigger.time == time
})
.cloned()
}
#[allow(clippy::too_many_arguments)]
pub fn fire_trigger(
program: &mut ProgramBuilder,
resolver: &mut Resolver,
trigger: Arc<Trigger>,
ctx: &TriggerContext,
connection: &Arc<crate::Connection>,
) -> Result<()> {
// Evaluate WHEN clause if present
if let Some(mut when_expr) = trigger.when_clause.clone() {
// Rewrite NEW/OLD references in WHEN clause to use registers
rewrite_trigger_expr_for_when_clause(&mut when_expr, &ctx.table, ctx)?;
let when_reg = program.alloc_register();
translate_expr(program, None, &when_expr, when_reg, resolver)?;
let skip_label = program.allocate_label();
program.emit_insn(Insn::IfNot {
reg: when_reg,
jump_if_null: true,
target_pc: skip_label,
});
// Execute trigger commands if WHEN clause is true
execute_trigger_commands(program, resolver, &trigger, ctx, connection)?;
program.preassign_label_to_next_insn(skip_label);
} else {
// No WHEN clause - always execute
execute_trigger_commands(program, resolver, &trigger, ctx, connection)?;
}
Ok(())
}
/// Rewrite NEW/OLD references in WHEN clause expressions (uses Register expressions, not Variable)
fn rewrite_trigger_expr_for_when_clause(
expr: &mut ast::Expr,
table: &BTreeTable,
ctx: &TriggerContext,
) -> Result<()> {
use crate::translate::expr::walk_expr_mut;
use crate::translate::expr::WalkControl;
walk_expr_mut(expr, &mut |e: &mut ast::Expr| -> Result<WalkControl> {
match e {
Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => {
let ns = normalize_ident(ns.as_str());
let col = normalize_ident(col.as_str());
// Handle NEW.column references
if ns.eq_ignore_ascii_case("new") {
if let Some(new_regs) = &ctx.new_registers {
if let Some((idx, _)) = table.get_column(&col) {
if idx < new_regs.len() {
*e = Expr::Register(new_regs[idx]);
return Ok(WalkControl::Continue);
}
}
// Handle NEW.rowid
if crate::translate::planner::ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&col))
{
*e = Expr::Register(
*ctx.new_registers
.as_ref()
.expect("NEW registers must be provided")
.last()
.expect("NEW registers must be provided"),
);
return Ok(WalkControl::Continue);
}
bail_parse_error!("no such column in NEW: {}", col);
} else {
bail_parse_error!(
"NEW references are only valid in INSERT and UPDATE triggers"
);
}
}
// Handle OLD.column references
if ns.eq_ignore_ascii_case("old") {
if let Some(old_regs) = &ctx.old_registers {
if let Some((idx, _)) = table.get_column(&col) {
if idx < old_regs.len() {
*e = Expr::Register(old_regs[idx]);
return Ok(WalkControl::Continue);
}
}
// Handle OLD.rowid
if crate::translate::planner::ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&col))
{
*e = Expr::Register(
*ctx.old_registers
.as_ref()
.expect("OLD registers must be provided")
.last()
.expect("OLD registers must be provided"),
);
return Ok(WalkControl::Continue);
}
bail_parse_error!("no such column in OLD: {}", col);
} else {
bail_parse_error!(
"OLD references are only valid in UPDATE and DELETE triggers"
);
}
}
// Handle unqualified column references - they refer to NEW if available, else OLD
if let Some((idx, _)) = table.get_column(&col) {
if let Some(new_regs) = &ctx.new_registers {
if idx < new_regs.len() {
*e = Expr::Register(new_regs[idx]);
return Ok(WalkControl::Continue);
}
}
if let Some(old_regs) = &ctx.old_registers {
if idx < old_regs.len() {
*e = Expr::Register(old_regs[idx]);
return Ok(WalkControl::Continue);
}
}
}
Ok(WalkControl::Continue)
}
_ => Ok(WalkControl::Continue),
}
})?;
Ok(())
}

View File

@@ -4,19 +4,14 @@ use std::{
};
use tracing::{instrument, Level};
use turso_parser::ast::{self, TableInternalId};
use turso_parser::ast::{self, ResolveType, TableInternalId};
use crate::{
index_method::IndexMethodAttachment,
numeric::Numeric,
parameters::Parameters,
schema::{BTreeTable, Index, PseudoCursorType, Schema, Table},
translate::{
CaptureDataChangesMode, Connection, Value, VirtualTable, index_method::IndexMethodAttachment, numeric::Numeric, parameters::Parameters, schema::{BTreeTable, Index, PseudoCursorType, Schema, Table, Trigger}, translate::{
collate::CollationSeq,
emitter::TransactionMode,
plan::{ResultSetColumn, TableReferences},
},
CaptureDataChangesMode, Connection, Value, VirtualTable,
}
};
#[derive(Default)]
@@ -127,6 +122,12 @@ pub struct ProgramBuilder {
/// i.e. the individual statement may need to be aborted due to a constraint conflict, etc.
/// instead of the entire transaction.
needs_stmt_subtransactions: bool,
/// If this ProgramBuilder is building trigger subprogram, a ref to the trigger is stored here.
pub trigger: Option<Arc<Trigger>>,
/// The type of resolution to perform if a constraint violation occurs during the execution of the program.
/// At present this is required only for ignoring errors when there is an INSERT OR IGNORE statement that triggers a trigger subprogram
/// which causes a conflict.
pub resolve_type: ResolveType,
}
#[derive(Debug, Clone)]
@@ -189,6 +190,22 @@ impl ProgramBuilder {
query_mode: QueryMode,
capture_data_changes_mode: CaptureDataChangesMode,
opts: ProgramBuilderOpts,
) -> Self {
ProgramBuilder::_new(query_mode, capture_data_changes_mode, opts, None)
}
pub fn new_for_trigger(
query_mode: QueryMode,
capture_data_changes_mode: CaptureDataChangesMode,
opts: ProgramBuilderOpts,
trigger: Arc<Trigger>,
) -> Self {
ProgramBuilder::_new(query_mode, capture_data_changes_mode, opts, Some(trigger))
}
fn _new(
query_mode: QueryMode,
capture_data_changes_mode: CaptureDataChangesMode,
opts: ProgramBuilderOpts,
trigger: Option<Arc<Trigger>>,
) -> Self {
Self {
table_reference_counter: TableRefIdCounter::new(),
@@ -215,9 +232,15 @@ impl ProgramBuilder {
current_parent_explain_idx: None,
reg_result_cols_start: None,
needs_stmt_subtransactions: false,
trigger,
resolve_type: ResolveType::Abort,
}
}
pub fn set_resolve_type(&mut self, resolve_type: ResolveType) {
self.resolve_type = resolve_type;
}
pub fn set_needs_stmt_subtransactions(&mut self, needs_stmt_subtransactions: bool) {
self.needs_stmt_subtransactions = needs_stmt_subtransactions;
}
@@ -1110,6 +1133,8 @@ impl ProgramBuilder {
sql: sql.to_string(),
accesses_db: !matches!(self.txn_mode, TransactionMode::None),
needs_stmt_subtransactions: self.needs_stmt_subtransactions,
trigger: self.trigger,
resolve_type: self.resolve_type,
}
}
}

View File

@@ -39,10 +39,11 @@ use crate::{
},
translate::emitter::TransactionMode,
};
use crate::{get_cursor, CheckpointMode, Connection, DatabaseStorage, MvCursor};
use crate::{CheckpointMode, Completion, Connection, DatabaseStorage, MvCursor, StepResult, get_cursor};
use either::Either;
use std::any::Any;
use std::env::temp_dir;
use std::num::NonZero;
use std::ops::DerefMut;
use std::{
borrow::BorrowMut,
@@ -75,7 +76,7 @@ use super::{
CommitState,
};
use parking_lot::RwLock;
use turso_parser::ast::{self, ForeignKeyClause, Name};
use turso_parser::ast::{self, ForeignKeyClause, Name, ResolveType};
use turso_parser::parser::Parser;
use super::{
@@ -2643,6 +2644,105 @@ pub fn op_integer(
Ok(InsnFunctionStepResult::Step)
}
pub enum OpProgramState {
Start,
Step,
}
/// Execute a trigger subprogram (Program opcode).
pub fn op_program(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(
Program {
params,
program: subprogram,
},
insn
);
loop {
match &mut state.op_program_state {
OpProgramState::Start => {
let mut statement = subprogram.write();
statement.reset();
let Some(ref trigger) = statement.get_trigger() else {
crate::bail_parse_error!("trigger subprogram has no trigger");
};
program.connection.start_trigger_execution(trigger.clone());
// Extract register values from params (which contain register indices encoded as negative integers)
// and bind them to the subprogram's parameters
for (param_idx, param_value) in params.iter().enumerate() {
if let Value::Integer(reg_idx_encoded) = param_value {
if *reg_idx_encoded < 0 {
// This is a register index encoded as negative integer
let actual_reg_idx = (-reg_idx_encoded - 1) as usize;
if actual_reg_idx < state.registers.len() {
let value = state.registers[actual_reg_idx].get_value().clone();
let param_index = NonZero::<usize>::new(param_idx + 1).unwrap();
statement.bind_at(param_index, value);
} else {
crate::bail_corrupt_error!(
"Register index {} out of bounds (len={})",
actual_reg_idx,
state.registers.len()
);
}
} else {
crate::bail_parse_error!("Register indices for triggers should be encoded as negative integers, got {}", reg_idx_encoded);
}
} else {
crate::bail_parse_error!(
"Trigger parameters should be integers, got {:?}",
param_value
);
}
}
state.op_program_state = OpProgramState::Step;
}
OpProgramState::Step => {
loop {
let mut statement = subprogram.write();
let res = statement.step();
match res {
Ok(step_result) => match step_result {
StepResult::Done => break,
StepResult::IO => {
return Ok(InsnFunctionStepResult::IO(IOCompletions::Single(
Completion::new_yield(),
)));
}
StepResult::Row => continue,
StepResult::Interrupt | StepResult::Busy => {
return Err(LimboError::Busy);
}
},
Err(LimboError::Constraint(constraint_err)) => {
if program.resolve_type != ResolveType::Ignore {
return Err(LimboError::Constraint(constraint_err));
}
break;
}
Err(err) => {
return Err(err);
}
}
}
program.connection.end_trigger_execution();
state.op_program_state = OpProgramState::Start;
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
}
}
}
pub fn op_real(
program: &Program,
state: &mut ProgramState,

View File

@@ -727,6 +727,23 @@ pub fn insn_to_row(
0,
format!("r[{dest}]={value}"),
),
Insn::Program {
params,
..
} => (
"Program",
// First register that contains a param
params.first().map(|v| match v {
crate::types::Value::Integer(i) if *i < 0 => (-i - 1) as i32,
_ => 0,
}).unwrap_or(0),
// Number of registers that contain params
params.len() as i32,
0,
Value::build_text(program.sql.clone()),
0,
format!("subprogram={}", program.sql),
),
Insn::Real { value, dest } => (
"Real",
0,

View File

@@ -5,13 +5,9 @@ use std::{
use super::{execute, AggFunc, BranchOffset, CursorID, FuncCtx, InsnFunction, PageIdx};
use crate::{
schema::{BTreeTable, Column, Index},
storage::{pager::CreateBTreeFlags, wal::CheckpointMode},
translate::{collate::CollationSeq, emitter::TransactionMode},
types::KeyInfo,
vdbe::affinity::Affinity,
Value,
Statement, Value, schema::{BTreeTable, Column, Index}, storage::{pager::CreateBTreeFlags, wal::CheckpointMode}, translate::{collate::CollationSeq, emitter::TransactionMode}, types::KeyInfo, vdbe::affinity::Affinity
};
use parking_lot::RwLock;
use strum::EnumCount;
use strum_macros::{EnumDiscriminants, FromRepr, VariantArray};
use turso_macros::Description;
@@ -530,6 +526,18 @@ pub enum Insn {
can_fallthrough: bool,
},
/// Invoke a trigger subprogram.
///
/// According to SQLite documentation (https://sqlite.org/opcode.html):
/// "The Program opcode invokes the trigger subprogram. The Program instruction
/// allocates and initializes a fresh register set for each invocation of the
/// subprogram, so subprograms can be reentrant and recursive. The Param opcode
/// is used by subprograms to access content in registers of the calling bytecode program."
Program {
params: Vec<Value>,
program: Arc<RwLock<Statement>>,
},
/// Write an integer value into a register.
Integer {
value: i64,
@@ -1336,6 +1344,7 @@ impl InsnVariants {
InsnVariants::Gosub => execute::op_gosub,
InsnVariants::Return => execute::op_return,
InsnVariants::Integer => execute::op_integer,
InsnVariants::Program => execute::op_program,
InsnVariants::Real => execute::op_real,
InsnVariants::RealAffinity => execute::op_real_affinity,
InsnVariants::String8 => execute::op_string8,

View File

@@ -29,23 +29,12 @@ pub mod sorter;
pub mod value;
use crate::{
error::LimboError,
function::{AggFunc, FuncCtx},
mvcc::{database::CommitStateMachine, LocalClock},
return_if_io,
state_machine::StateMachine,
storage::{pager::PagerCommitResult, sqlite3_ondisk::SmallVec},
translate::{collate::CollationSeq, plan::TableReferences},
types::{IOCompletions, IOResult},
vdbe::{
ValueRef, error::LimboError, function::{AggFunc, FuncCtx}, mvcc::{LocalClock, database::CommitStateMachine}, return_if_io, schema::Trigger, state_machine::StateMachine, storage::{pager::PagerCommitResult, sqlite3_ondisk::SmallVec}, translate::{collate::CollationSeq, plan::TableReferences}, types::{IOCompletions, IOResult}, vdbe::{
execute::{
OpCheckpointState, OpColumnState, OpDeleteState, OpDeleteSubState, OpDestroyState,
OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState,
OpRowIdState, OpSeekState, OpTransactionState,
OpCheckpointState, OpColumnState, OpDeleteState, OpDeleteSubState, OpDestroyState, OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, OpProgramState, OpRowIdState, OpSeekState, OpTransactionState
},
metrics::StatementMetrics,
},
ValueRef,
}
};
use crate::{
@@ -63,6 +52,7 @@ use execute::{
InsnFunction, InsnFunctionStepResult, OpIdxDeleteState, OpIntegrityCheckState,
OpOpenEphemeralState,
};
use turso_parser::ast::ResolveType;
use crate::vdbe::rowset::RowSet;
use explain::{insn_to_row_with_comment, EXPLAIN_COLUMNS, EXPLAIN_QUERY_PLAN_COLUMNS};
@@ -296,6 +286,7 @@ pub struct ProgramState {
/// Metrics collected during statement execution
pub metrics: StatementMetrics,
op_open_ephemeral_state: OpOpenEphemeralState,
op_program_state: OpProgramState,
op_new_rowid_state: OpNewRowidState,
op_idx_insert_state: OpIdxInsertState,
op_insert_state: OpInsertState,
@@ -324,6 +315,12 @@ pub struct ProgramState {
rowsets: HashMap<usize, RowSet>,
}
impl std::fmt::Debug for Program {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Program").finish()
}
}
// SAFETY: This needs to be audited for thread safety.
// See: https://github.com/tursodatabase/turso/issues/1552
unsafe impl Send for ProgramState {}
@@ -360,6 +357,7 @@ impl ProgramState {
op_integrity_check_state: OpIntegrityCheckState::Start,
metrics: StatementMetrics::new(),
op_open_ephemeral_state: OpOpenEphemeralState::Start,
op_program_state: OpProgramState::Start,
op_new_rowid_state: OpNewRowidState::Start,
op_idx_insert_state: OpIdxInsertState::MaybeSeek,
op_insert_state: OpInsertState {
@@ -605,6 +603,12 @@ pub struct Program {
/// is determined by the parser flags "mayAbort" and "isMultiWrite". Essentially this means that the individual
/// statement may need to be aborted due to a constraint conflict, etc. instead of the entire transaction.
pub needs_stmt_subtransactions: bool,
/// If this is a trigger subprogram, this is the trigger that is being executed.
pub trigger: Option<Arc<Trigger>>,
/// The type of resolution to perform if a constraint violation occurs during the execution of the program.
/// At present this is required only for ignoring errors when there is an INSERT OR IGNORE statement that triggers a trigger subprogram
/// which causes a conflict.
pub resolve_type: ResolveType,
}
impl Program {
@@ -747,7 +751,7 @@ impl Program {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
if state.is_interrupted() {
self.abort(mv_store, &pager, None, &mut state.auto_txn_cleanup);
self.abort(mv_store, &pager, None, state);
return Ok(StepResult::Interrupt);
}
if let Some(io) = &state.io_completions {
@@ -757,7 +761,7 @@ impl Program {
}
if let Some(err) = io.get_error() {
let err = err.into();
self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup);
self.abort(mv_store, &pager, Some(&err), state);
return Err(err);
}
state.io_completions = None;
@@ -799,7 +803,7 @@ impl Program {
return Ok(StepResult::Busy);
}
Err(err) => {
self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup);
self.abort(mv_store, &pager, Some(&err), state);
return Err(err);
}
}
@@ -1059,10 +1063,21 @@ impl Program {
mv_store: Option<&Arc<MvStore>>,
pager: &Arc<Pager>,
err: Option<&LimboError>,
cleanup: &mut TxnCleanup,
state: &mut ProgramState,
) {
if self.is_trigger_subprogram() {
self.connection.end_trigger_execution();
}
// Errors from nested statements are handled by the parent statement.
if !self.connection.is_nested_stmt() {
if !self.connection.is_nested_stmt() && !self.is_trigger_subprogram() {
if err.is_some() {
// Any error apart from deferred FK volations causes the statement subtransaction to roll back.
let res =
state.end_statement(&self.connection, pager, EndStatement::RollbackSavepoint);
if let Err(e) = res {
tracing::error!("Error rolling back statement: {}", e);
}
}
match err {
// Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback.
Some(LimboError::TxError(_)) => {}
@@ -1075,7 +1090,7 @@ impl Program {
// and op_halt.
Some(LimboError::Constraint(_)) => {}
_ => {
if *cleanup != TxnCleanup::None || err.is_some() {
if state.auto_txn_cleanup != TxnCleanup::None || err.is_some() {
if let Some(mv_store) = mv_store {
if let Some(tx_id) = self.connection.get_mv_tx_id() {
self.connection.auto_commit.store(true, Ordering::SeqCst);
@@ -1090,7 +1105,11 @@ impl Program {
}
}
}
*cleanup = TxnCleanup::None;
state.auto_txn_cleanup = TxnCleanup::None;
}
pub fn is_trigger_subprogram(&self) -> bool {
self.trigger.is_some()
}
}