diff --git a/core/lib.rs b/core/lib.rs index a32e5497b..39c2cb1c4 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -41,6 +41,7 @@ pub mod numeric; mod numeric; use crate::index_method::IndexMethod; +use crate::schema::Trigger; use crate::storage::checksum::CHECKSUM_REQUIRED_RESERVED_BYTES; use crate::storage::encryption::AtomicCipherMode; use crate::storage::pager::{AutoVacuumMode, HeaderRef}; @@ -642,6 +643,8 @@ impl Database { view_transaction_states: AllViewsTxState::new(), metrics: RwLock::new(ConnectionMetrics::new()), nestedness: AtomicI32::new(0), + compiling_triggers: RwLock::new(Vec::new()), + executing_triggers: RwLock::new(Vec::new()), encryption_key: RwLock::new(None), encryption_cipher_mode: AtomicCipherMode::new(CipherMode::None), sync_mode: AtomicSyncMode::new(SyncMode::Full), @@ -1167,6 +1170,11 @@ pub struct Connection { /// The state is integer as we may want to spawn deep nested programs (e.g. Root -[run]-> S1 -[run]-> S2 -[run]-> ...) /// and we need to track current nestedness depth in order to properly understand when we will reach the root back again nestedness: AtomicI32, + /// Stack of currently compiling triggers to prevent recursive trigger subprogram compilation + compiling_triggers: RwLock>>, + /// Stack of currently executing triggers to prevent recursive trigger execution + /// Only prevents the same trigger from firing again, allowing different triggers on the same table to fire + executing_triggers: RwLock>>, encryption_key: RwLock>, encryption_cipher_mode: AtomicCipherMode, sync_mode: AtomicSyncMode, @@ -1212,6 +1220,52 @@ impl Connection { pub fn end_nested(&self) { self.nestedness.fetch_add(-1, Ordering::SeqCst); } + + /// Check if a specific trigger is currently compiling (for recursive trigger prevention) + pub fn trigger_is_compiling(&self, trigger: impl AsRef) -> bool { + let compiling = self.compiling_triggers.read(); + if let Some(trigger) = compiling.iter().find(|t| t.name == trigger.as_ref().name) { + 
tracing::debug!("Trigger is already compiling: {}", trigger.name); + return true; + } + false + } + + pub fn start_trigger_compilation(&self, trigger: Arc) { + tracing::debug!("Starting trigger compilation: {}", trigger.name); + self.compiling_triggers.write().push(trigger.clone()); + } + + pub fn end_trigger_compilation(&self) { + tracing::debug!( + "Ending trigger compilation: {:?}", + self.compiling_triggers.read().last().map(|t| &t.name) + ); + self.compiling_triggers.write().pop(); + } + + /// Check if a specific trigger is currently executing (for recursive trigger prevention) + pub fn is_trigger_executing(&self, trigger: impl AsRef) -> bool { + let executing = self.executing_triggers.read(); + if let Some(trigger) = executing.iter().find(|t| t.name == trigger.as_ref().name) { + tracing::debug!("Trigger is already executing: {}", trigger.name); + return true; + } + false + } + + pub fn start_trigger_execution(&self, trigger: Arc) { + tracing::debug!("Starting trigger execution: {}", trigger.name); + self.executing_triggers.write().push(trigger.clone()); + } + + pub fn end_trigger_execution(&self) { + tracing::debug!( + "Ending trigger execution: {:?}", + self.executing_triggers.read().last().map(|t| &t.name) + ); + self.executing_triggers.write().pop(); + } pub fn prepare(self: &Arc, sql: impl AsRef) -> Result { if self.is_mvcc_bootstrap_connection() { // Never use MV store for bootstrapping - we read state directly from sqlite_schema in the DB file. @@ -2053,6 +2107,10 @@ impl Connection { self.db.mvcc_enabled() } + pub fn mv_store(&self) -> Option<&Arc> { + self.db.mv_store.as_ref() + } + /// Query the current value(s) of `pragma_name` associated to /// `pragma_value`. 
/// @@ -2536,6 +2594,12 @@ pub struct Statement { busy_timeout: Option, } +impl std::fmt::Debug for Statement { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Statement").finish() + } +} + impl Drop for Statement { fn drop(&mut self) { self.reset(); @@ -2567,6 +2631,11 @@ impl Statement { busy_timeout: None, } } + + pub fn get_trigger(&self) -> Option> { + self.program.trigger.clone() + } + pub fn get_query_mode(&self) -> QueryMode { self.query_mode } @@ -2881,12 +2950,8 @@ impl Statement { fn reset_internal(&mut self, max_registers: Option, max_cursors: Option) { // as abort uses auto_txn_cleanup value - it needs to be called before state.reset - self.program.abort( - self.mv_store.as_ref(), - &self.pager, - None, - &mut self.state.auto_txn_cleanup, - ); + self.program + .abort(self.mv_store.as_ref(), &self.pager, None, &mut self.state); self.state.reset(max_registers, max_cursors); self.busy = false; self.busy_timeout = None; diff --git a/core/translate/mod.rs b/core/translate/mod.rs index ae27718d8..11beca000 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -36,6 +36,7 @@ pub(crate) mod select; pub(crate) mod subquery; pub(crate) mod transaction; pub(crate) mod trigger; +pub(crate) mod trigger_exec; pub(crate) mod update; pub(crate) mod upsert; mod values; diff --git a/core/translate/trigger_exec.rs b/core/translate/trigger_exec.rs new file mode 100644 index 000000000..85e9c5435 --- /dev/null +++ b/core/translate/trigger_exec.rs @@ -0,0 +1,796 @@ +use crate::schema::{BTreeTable, Trigger}; +use crate::translate::emitter::Resolver; +use crate::translate::expr::translate_expr; +use crate::translate::{translate_inner, ProgramBuilder, ProgramBuilderOpts}; +use crate::util::normalize_ident; +use crate::vdbe::insn::Insn; +use crate::{bail_parse_error, QueryMode, Result, Statement}; +use parking_lot::RwLock; +use std::collections::HashSet; +use std::num::NonZero; +use std::sync::Arc; +use 
turso_parser::ast::{self, Expr, TriggerEvent, TriggerTime}; + +/// Context for trigger execution +#[derive(Debug)] +pub struct TriggerContext { + /// Table the trigger is attached to + pub table: Arc, + /// NEW row registers (for INSERT/UPDATE). The last element is always the rowid. + pub new_registers: Option>, + /// OLD row registers (for UPDATE/DELETE). The last element is always the rowid. + pub old_registers: Option>, +} + +impl TriggerContext { + pub fn new( + table: Arc, + new_registers: Option>, + old_registers: Option>, + ) -> Self { + Self { + table, + new_registers, + old_registers, + } + } +} + +/// Context for compiling trigger subprograms - maps NEW/OLD to parameter indices +#[derive(Debug)] +struct TriggerSubprogramContext { + /// Map from column index to parameter index for NEW values (1-indexed) + new_param_map: Option>>, + /// Map from column index to parameter index for OLD values (1-indexed) + old_param_map: Option>>, + table: Arc, +} + +/// Rewrite NEW and OLD references in trigger expressions to use Variable instructions (parameters) +fn rewrite_trigger_expr_for_subprogram( + expr: &mut ast::Expr, + table: &BTreeTable, + ctx: &TriggerSubprogramContext, +) -> Result<()> { + use crate::translate::expr::walk_expr_mut; + use crate::translate::expr::WalkControl; + + walk_expr_mut(expr, &mut |e: &mut ast::Expr| -> Result { + match e { + Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => { + let ns = normalize_ident(ns.as_str()); + let col = normalize_ident(col.as_str()); + + // Handle NEW.column references + if ns.eq_ignore_ascii_case("new") { + if let Some(new_params) = &ctx.new_param_map { + // Check if this is a rowid alias column first + if let Some((idx, col_def)) = table.get_column(&col) { + if col_def.is_rowid_alias() { + // Rowid alias columns map to the rowid parameter, not the column register + *e = Expr::Variable(format!( + "{}", + *ctx.new_param_map + .as_ref() + .expect("NEW parameters must be provided") + .last() + 
.expect("NEW parameters must be provided") + )); + return Ok(WalkControl::Continue); + } + if idx < new_params.len() { + *e = Expr::Variable(format!("{}", new_params[idx].get())); + return Ok(WalkControl::Continue); + } + } + // Handle NEW.rowid + if crate::translate::planner::ROWID_STRS + .iter() + .any(|s| s.eq_ignore_ascii_case(&col)) + { + *e = Expr::Variable(format!( + "{}", + ctx.new_param_map + .as_ref() + .expect("NEW parameters must be provided") + .last() + .expect("NEW parameters must be provided") + .get() + )); + return Ok(WalkControl::Continue); + } + bail_parse_error!("no such column in NEW: {}", col); + } else { + bail_parse_error!( + "NEW references are only valid in INSERT and UPDATE triggers" + ); + } + } + + // Handle OLD.column references + if ns.eq_ignore_ascii_case("old") { + if let Some(old_params) = &ctx.old_param_map { + if let Some((idx, _)) = table.get_column(&col) { + if idx < old_params.len() { + *e = Expr::Variable(format!("{}", old_params[idx].get())); + return Ok(WalkControl::Continue); + } + } + // Handle OLD.rowid + if crate::translate::planner::ROWID_STRS + .iter() + .any(|s| s.eq_ignore_ascii_case(&col)) + { + *e = Expr::Variable(format!( + "{}", + ctx.old_param_map + .as_ref() + .expect("OLD parameters must be provided") + .last() + .expect("OLD parameters must be provided") + .get() + )); + return Ok(WalkControl::Continue); + } + bail_parse_error!("no such column in OLD: {}", col); + } else { + bail_parse_error!( + "OLD references are only valid in UPDATE and DELETE triggers" + ); + } + } + + // Handle unqualified column references - they refer to NEW if available, else OLD + if let Some((idx, _)) = table.get_column(&col) { + if let Some(new_params) = &ctx.new_param_map { + if idx < new_params.len() { + *e = Expr::Variable(format!("{}", new_params[idx].get())); + return Ok(WalkControl::Continue); + } + } + if let Some(old_params) = &ctx.old_param_map { + if idx < old_params.len() { + *e = Expr::Variable(format!("{}", 
old_params[idx].get())); + return Ok(WalkControl::Continue); + } + } + } + + Ok(WalkControl::Continue) + } + _ => Ok(WalkControl::Continue), + } + })?; + Ok(()) +} + +/// Convert TriggerCmd to Stmt, rewriting NEW/OLD to Variable expressions (for subprogram compilation) +fn trigger_cmd_to_stmt_for_subprogram( + cmd: &ast::TriggerCmd, + subprogram_ctx: &TriggerSubprogramContext, +) -> Result { + use turso_parser::ast::{InsertBody, QualifiedName}; + + match cmd { + ast::TriggerCmd::Insert { + or_conflict, + tbl_name, + col_names, + select, + upsert, + returning, + } => { + // Rewrite NEW/OLD references in the SELECT + let mut select_clone = select.clone(); + rewrite_expressions_in_select_for_subprogram(&mut select_clone, subprogram_ctx)?; + + let body = InsertBody::Select(select_clone, upsert.clone()); + Ok(ast::Stmt::Insert { + with: None, + or_conflict: *or_conflict, + tbl_name: QualifiedName { + db_name: None, + name: tbl_name.clone(), + alias: None, + }, + columns: col_names.clone(), + body, + returning: returning.clone(), + }) + } + ast::TriggerCmd::Update { + or_conflict, + tbl_name, + sets, + from, + where_clause, + } => { + // Rewrite NEW/OLD references in SET clauses and WHERE clause + let mut sets_clone = sets.clone(); + for set in &mut sets_clone { + rewrite_trigger_expr_for_subprogram( + &mut set.expr, + &subprogram_ctx.table, + subprogram_ctx, + )?; + } + + let mut where_clause_clone = where_clause.clone(); + if let Some(ref mut where_expr) = where_clause_clone { + rewrite_trigger_expr_for_subprogram( + where_expr, + &subprogram_ctx.table, + subprogram_ctx, + )?; + } + + Ok(ast::Stmt::Update(ast::Update { + with: None, + or_conflict: *or_conflict, + tbl_name: QualifiedName { + db_name: None, + name: tbl_name.clone(), + alias: None, + }, + indexed: None, + sets: sets_clone, + from: from.clone(), + where_clause: where_clause_clone, + returning: vec![], + order_by: vec![], + limit: None, + })) + } + ast::TriggerCmd::Delete { + tbl_name, + where_clause, + } 
=> { + // Rewrite NEW/OLD references in WHERE clause + let mut where_clause_clone = where_clause.clone(); + if let Some(ref mut where_expr) = where_clause_clone { + rewrite_trigger_expr_for_subprogram( + where_expr, + &subprogram_ctx.table, + subprogram_ctx, + )?; + } + + Ok(ast::Stmt::Delete { + tbl_name: QualifiedName { + db_name: None, + name: tbl_name.clone(), + alias: None, + }, + where_clause: where_clause_clone, + limit: None, + returning: vec![], + indexed: None, + order_by: vec![], + with: None, + }) + } + ast::TriggerCmd::Select(select) => { + // Rewrite NEW/OLD references in the SELECT + let mut select_clone = select.clone(); + rewrite_expressions_in_select_for_subprogram(&mut select_clone, subprogram_ctx)?; + Ok(ast::Stmt::Select(select_clone)) + } + } +} + +/// Rewrite NEW/OLD references in all expressions within a SELECT statement for subprogram +fn rewrite_expressions_in_select_for_subprogram( + select: &mut ast::Select, + ctx: &TriggerSubprogramContext, +) -> Result<()> { + use crate::translate::expr::walk_expr_mut; + + // Rewrite expressions in the SELECT body + match &mut select.body.select { + ast::OneSelect::Select { + columns, + where_clause, + group_by, + .. 
+ } => { + // Rewrite in columns + for col in columns { + if let ast::ResultColumn::Expr(ref mut expr, _) = col { + walk_expr_mut(expr, &mut |e: &mut ast::Expr| { + rewrite_trigger_expr_single_for_subprogram(e, ctx)?; + Ok(crate::translate::expr::WalkControl::Continue) + })?; + } + } + + // Rewrite in WHERE clause + if let Some(ref mut where_expr) = where_clause { + walk_expr_mut(where_expr, &mut |e: &mut ast::Expr| { + rewrite_trigger_expr_single_for_subprogram(e, ctx)?; + Ok(crate::translate::expr::WalkControl::Continue) + })?; + } + + // Rewrite in GROUP BY expressions and HAVING clause + if let Some(ref mut group_by) = group_by { + for expr in &mut group_by.exprs { + walk_expr_mut(expr, &mut |e: &mut ast::Expr| { + rewrite_trigger_expr_single_for_subprogram(e, ctx)?; + Ok(crate::translate::expr::WalkControl::Continue) + })?; + } + + // Rewrite in HAVING clause + if let Some(ref mut having_expr) = group_by.having { + walk_expr_mut(having_expr, &mut |e: &mut ast::Expr| { + rewrite_trigger_expr_single_for_subprogram(e, ctx)?; + Ok(crate::translate::expr::WalkControl::Continue) + })?; + } + } + } + ast::OneSelect::Values(values) => { + for row in values { + for expr in row { + walk_expr_mut(expr, &mut |e: &mut ast::Expr| { + rewrite_trigger_expr_single_for_subprogram(e, ctx)?; + Ok(crate::translate::expr::WalkControl::Continue) + })?; + } + } + } + } + + Ok(()) +} + +/// Rewrite a single NEW/OLD reference for subprogram (called from walk_expr_mut) +fn rewrite_trigger_expr_single_for_subprogram( + e: &mut ast::Expr, + ctx: &TriggerSubprogramContext, +) -> Result<()> { + match e { + Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => { + let ns = normalize_ident(ns.as_str()); + let col = normalize_ident(col.as_str()); + + // Handle NEW.column references + if ns.eq_ignore_ascii_case("new") { + if let Some(new_params) = &ctx.new_param_map { + if let Some((idx, col_def)) = ctx.table.get_column(&col) { + if col_def.is_rowid_alias() { + *e = 
Expr::Variable(format!( + "{}", + ctx.new_param_map + .as_ref() + .expect("NEW parameters must be provided") + .last() + .expect("NEW parameters must be provided") + .get() + )); + return Ok(()); + } + if idx < new_params.len() { + *e = Expr::Variable(format!("{}", new_params[idx].get())); + return Ok(()); + } + } + // Handle NEW.rowid + if crate::translate::planner::ROWID_STRS + .iter() + .any(|s| s.eq_ignore_ascii_case(&col)) + { + *e = Expr::Variable(format!( + "{}", + ctx.new_param_map + .as_ref() + .expect("NEW parameters must be provided") + .last() + .expect("NEW parameters must be provided") + .get() + )); + return Ok(()); + } + bail_parse_error!("no such column in NEW: {}", col); + } else { + bail_parse_error!( + "NEW references are only valid in INSERT and UPDATE triggers" + ); + } + } + + // Handle OLD.column references + if ns.eq_ignore_ascii_case("old") { + if let Some(old_params) = &ctx.old_param_map { + if let Some((idx, col_def)) = ctx.table.get_column(&col) { + if col_def.is_rowid_alias() { + *e = Expr::Variable(format!( + "{}", + ctx.old_param_map + .as_ref() + .expect("OLD parameters must be provided") + .last() + .expect("OLD parameters must be provided") + .get() + )); + return Ok(()); + } + if idx < old_params.len() { + *e = Expr::Variable(format!("{}", old_params[idx].get())); + return Ok(()); + } + } + // Handle OLD.rowid + if crate::translate::planner::ROWID_STRS + .iter() + .any(|s| s.eq_ignore_ascii_case(&col)) + { + *e = Expr::Variable(format!( + "{}", + ctx.old_param_map + .as_ref() + .expect("OLD parameters must be provided") + .last() + .expect("OLD parameters must be provided") + .get() + )); + return Ok(()); + } + bail_parse_error!("no such column in OLD: {}", col); + } else { + bail_parse_error!( + "OLD references are only valid in UPDATE and DELETE triggers" + ); + } + } + + // Handle unqualified column references - they refer to NEW if available, else OLD + if let Some((idx, col_def)) = ctx.table.get_column(&col) { + if 
col_def.is_rowid_alias() { + *e = Expr::Variable(format!( + "{}", + ctx.new_param_map + .as_ref() + .expect("NEW parameters must be provided") + .last() + .expect("NEW parameters must be provided") + .get() + )); + return Ok(()); + } + if let Some(new_params) = &ctx.new_param_map { + if idx < new_params.len() { + *e = Expr::Variable(format!("{}", new_params[idx].get())); + return Ok(()); + } + } + if let Some(old_params) = &ctx.old_param_map { + if idx < old_params.len() { + *e = Expr::Variable(format!("{}", old_params[idx].get())); + return Ok(()); + } + } + } + } + _ => {} + } + Ok(()) +} + +/// Execute trigger commands by compiling them as a subprogram and emitting Program instruction +/// Returns true if there are triggers that will fire. +fn execute_trigger_commands( + program: &mut ProgramBuilder, + resolver: &mut Resolver, + trigger: &Arc, + ctx: &TriggerContext, + connection: &Arc, +) -> Result { + if connection.trigger_is_compiling(trigger) { + // Do not recursively compile the same trigger + return Ok(false); + } + connection.start_trigger_compilation(trigger.clone()); + // Build parameter mapping: parameters are 1-indexed and sequential + // Order: [NEW values..., OLD values..., rowid] + // So if we have 2 NEW columns, 2 OLD columns: NEW params are 1,2; OLD params are 3,4; rowid is 5 + let num_new = ctx.new_registers.as_ref().map(|r| r.len()).unwrap_or(0); + + let new_param_map: Option>> = ctx.new_registers.as_ref().map(|new_regs| { + (1..=new_regs.len()) + .map(|i| NonZero::new(i).unwrap()) + .collect() + }); + + let old_param_map: Option>> = ctx.old_registers.as_ref().map(|old_regs| { + (1..=old_regs.len()) + .map(|i| NonZero::new(i + num_new).unwrap()) + .collect() + }); + let subprogram_ctx = TriggerSubprogramContext { + new_param_map, + old_param_map, + table: ctx.table.clone(), + }; + let mut subprogram_builder = ProgramBuilder::new_for_trigger( + QueryMode::Normal, + program.capture_data_changes_mode().clone(), + ProgramBuilderOpts { + 
num_cursors: 1, + approx_num_insns: 32, + approx_num_labels: 2, + }, + trigger.clone(), + ); + for command in trigger.commands.iter() { + let stmt = trigger_cmd_to_stmt_for_subprogram(command, &subprogram_ctx)?; + subprogram_builder.prologue(); + subprogram_builder = translate_inner( + stmt, + resolver, + subprogram_builder, + connection, + "trigger subprogram", + )?; + } + subprogram_builder.epilogue(resolver.schema); + let built_subprogram = subprogram_builder.build(connection.clone(), true, "trigger subprogram"); + + let mut param_register_indices = Vec::new(); + if let Some(new_regs) = &ctx.new_registers { + param_register_indices.extend(new_regs.iter().copied()); + } + if let Some(old_regs) = &ctx.old_registers { + param_register_indices.extend(old_regs.iter().copied()); + } + + let params: Vec = param_register_indices + .iter() + .map(|®_idx| { + // Use a special encoding: negative integer represents register index + // This is a hack - ideally we'd modify Program to accept register indices directly + crate::types::Value::Integer(-(reg_idx as i64 + 1)) + }) + .collect(); + + let turso_stmt = Statement::new( + built_subprogram, + connection.mv_store().cloned(), + connection.pager.load().clone(), + QueryMode::Normal, + ); + program.emit_insn(Insn::Program { + params, + program: Arc::new(RwLock::new(turso_stmt)), + }); + connection.end_trigger_compilation(); + + Ok(true) +} + +/// Check if there are any triggers for a given event (regardless of time). +/// This is used during plan preparation to determine if materialization is needed. 
+///
+/// `UPDATE OF` triggers only count when at least one of their listed columns is present in
+/// `updated_column_indices`. If no set of updated columns is supplied, such triggers are
+/// treated as not matching (same rule as `get_relevant_triggers_type_and_time`) instead of
+/// panicking.
+pub fn has_relevant_triggers_type_only(
+    schema: &crate::schema::Schema,
+    event: TriggerEvent,
+    updated_column_indices: Option<&HashSet<usize>>,
+    table: &BTreeTable,
+) -> bool {
+    schema
+        .get_triggers_for_table(table.name.as_str())
+        .any(|trigger| match (&trigger.event, &event) {
+            // Plain event kinds match one-to-one.
+            (TriggerEvent::Delete, TriggerEvent::Delete)
+            | (TriggerEvent::Insert, TriggerEvent::Insert)
+            | (TriggerEvent::Update, TriggerEvent::Update) => true,
+            // UPDATE OF <cols>: relevant only if the UPDATE's SET clause touches one of <cols>.
+            (TriggerEvent::UpdateOf(trigger_cols), TriggerEvent::Update) => {
+                updated_column_indices.is_some_and(|updated_cols| {
+                    trigger_cols.iter().any(|col_name| {
+                        let normalized_col = normalize_ident(col_name.as_str());
+                        // Column doesn't exist - according to SQLite docs, unrecognized
+                        // column names in UPDATE OF are silently ignored, hence `false`.
+                        table
+                            .get_column(&normalized_col)
+                            .is_some_and(|(col_idx, _)| updated_cols.contains(&col_idx))
+                    })
+                })
+            }
+            _ => false,
+        })
+}
+
+/// Get the triggers attached to `table` that match both the given event and the given time.
+/// `UPDATE OF` triggers match only when one of their listed columns is among the updated columns.
+pub fn get_relevant_triggers_type_and_time<'a>( + schema: &'a crate::schema::Schema, + event: TriggerEvent, + time: TriggerTime, + updated_column_indices: Option>, + table: &'a BTreeTable, +) -> impl Iterator> + 'a + Clone { + let triggers = schema.get_triggers_for_table(table.name.as_str()); + + // Filter triggers by event + triggers + .filter(move |trigger| -> bool { + // Check event matches + let event_matches = match (&trigger.event, &event) { + (TriggerEvent::Delete, TriggerEvent::Delete) => true, + (TriggerEvent::Insert, TriggerEvent::Insert) => true, + (TriggerEvent::Update, TriggerEvent::Update) => true, + (TriggerEvent::UpdateOf(trigger_cols), TriggerEvent::Update) => { + // For UPDATE OF, we need to check if any of the specified columns + // are in the UPDATE SET clause + if let Some(ref updated_cols) = updated_column_indices { + // Check if any of the trigger's specified columns are being updated + trigger_cols.iter().any(|col_name| { + let normalized_col = normalize_ident(col_name.as_str()); + if let Some((col_idx, _)) = table.get_column(&normalized_col) { + updated_cols.contains(&col_idx) + } else { + // Column doesn't exist - according to SQLite docs, unrecognized + // column names in UPDATE OF are silently ignored + false + } + }) + } else { + false + } + } + _ => false, + }; + + if !event_matches { + return false; + } + + trigger.time == time + }) + .cloned() +} + +#[allow(clippy::too_many_arguments)] +pub fn fire_trigger( + program: &mut ProgramBuilder, + resolver: &mut Resolver, + trigger: Arc, + ctx: &TriggerContext, + connection: &Arc, +) -> Result<()> { + // Evaluate WHEN clause if present + if let Some(mut when_expr) = trigger.when_clause.clone() { + // Rewrite NEW/OLD references in WHEN clause to use registers + rewrite_trigger_expr_for_when_clause(&mut when_expr, &ctx.table, ctx)?; + + let when_reg = program.alloc_register(); + translate_expr(program, None, &when_expr, when_reg, resolver)?; + + let skip_label = program.allocate_label(); + 
program.emit_insn(Insn::IfNot { + reg: when_reg, + jump_if_null: true, + target_pc: skip_label, + }); + + // Execute trigger commands if WHEN clause is true + execute_trigger_commands(program, resolver, &trigger, ctx, connection)?; + + program.preassign_label_to_next_insn(skip_label); + } else { + // No WHEN clause - always execute + execute_trigger_commands(program, resolver, &trigger, ctx, connection)?; + } + + Ok(()) +} + +/// Rewrite NEW/OLD references in WHEN clause expressions (uses Register expressions, not Variable) +fn rewrite_trigger_expr_for_when_clause( + expr: &mut ast::Expr, + table: &BTreeTable, + ctx: &TriggerContext, +) -> Result<()> { + use crate::translate::expr::walk_expr_mut; + use crate::translate::expr::WalkControl; + + walk_expr_mut(expr, &mut |e: &mut ast::Expr| -> Result { + match e { + Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => { + let ns = normalize_ident(ns.as_str()); + let col = normalize_ident(col.as_str()); + + // Handle NEW.column references + if ns.eq_ignore_ascii_case("new") { + if let Some(new_regs) = &ctx.new_registers { + if let Some((idx, _)) = table.get_column(&col) { + if idx < new_regs.len() { + *e = Expr::Register(new_regs[idx]); + return Ok(WalkControl::Continue); + } + } + // Handle NEW.rowid + if crate::translate::planner::ROWID_STRS + .iter() + .any(|s| s.eq_ignore_ascii_case(&col)) + { + *e = Expr::Register( + *ctx.new_registers + .as_ref() + .expect("NEW registers must be provided") + .last() + .expect("NEW registers must be provided"), + ); + return Ok(WalkControl::Continue); + } + bail_parse_error!("no such column in NEW: {}", col); + } else { + bail_parse_error!( + "NEW references are only valid in INSERT and UPDATE triggers" + ); + } + } + + // Handle OLD.column references + if ns.eq_ignore_ascii_case("old") { + if let Some(old_regs) = &ctx.old_registers { + if let Some((idx, _)) = table.get_column(&col) { + if idx < old_regs.len() { + *e = Expr::Register(old_regs[idx]); + return 
Ok(WalkControl::Continue); + } + } + // Handle OLD.rowid + if crate::translate::planner::ROWID_STRS + .iter() + .any(|s| s.eq_ignore_ascii_case(&col)) + { + *e = Expr::Register( + *ctx.old_registers + .as_ref() + .expect("OLD registers must be provided") + .last() + .expect("OLD registers must be provided"), + ); + return Ok(WalkControl::Continue); + } + bail_parse_error!("no such column in OLD: {}", col); + } else { + bail_parse_error!( + "OLD references are only valid in UPDATE and DELETE triggers" + ); + } + } + + // Handle unqualified column references - they refer to NEW if available, else OLD + if let Some((idx, _)) = table.get_column(&col) { + if let Some(new_regs) = &ctx.new_registers { + if idx < new_regs.len() { + *e = Expr::Register(new_regs[idx]); + return Ok(WalkControl::Continue); + } + } + if let Some(old_regs) = &ctx.old_registers { + if idx < old_regs.len() { + *e = Expr::Register(old_regs[idx]); + return Ok(WalkControl::Continue); + } + } + } + + Ok(WalkControl::Continue) + } + _ => Ok(WalkControl::Continue), + } + })?; + Ok(()) +} diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 8b522dfea..6df882fdf 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -4,19 +4,14 @@ use std::{ }; use tracing::{instrument, Level}; -use turso_parser::ast::{self, TableInternalId}; +use turso_parser::ast::{self, ResolveType, TableInternalId}; use crate::{ - index_method::IndexMethodAttachment, - numeric::Numeric, - parameters::Parameters, - schema::{BTreeTable, Index, PseudoCursorType, Schema, Table}, - translate::{ + CaptureDataChangesMode, Connection, Value, VirtualTable, index_method::IndexMethodAttachment, numeric::Numeric, parameters::Parameters, schema::{BTreeTable, Index, PseudoCursorType, Schema, Table, Trigger}, translate::{ collate::CollationSeq, emitter::TransactionMode, plan::{ResultSetColumn, TableReferences}, - }, - CaptureDataChangesMode, Connection, Value, VirtualTable, + } }; #[derive(Default)] @@ -127,6 +122,12 @@ pub struct 
ProgramBuilder { /// i.e. the individual statement may need to be aborted due to a constraint conflict, etc. /// instead of the entire transaction. needs_stmt_subtransactions: bool, + /// If this ProgramBuilder is building trigger subprogram, a ref to the trigger is stored here. + pub trigger: Option>, + /// The type of resolution to perform if a constraint violation occurs during the execution of the program. + /// At present this is required only for ignoring errors when there is an INSERT OR IGNORE statement that triggers a trigger subprogram + /// which causes a conflict. + pub resolve_type: ResolveType, } #[derive(Debug, Clone)] @@ -189,6 +190,22 @@ impl ProgramBuilder { query_mode: QueryMode, capture_data_changes_mode: CaptureDataChangesMode, opts: ProgramBuilderOpts, + ) -> Self { + ProgramBuilder::_new(query_mode, capture_data_changes_mode, opts, None) + } + pub fn new_for_trigger( + query_mode: QueryMode, + capture_data_changes_mode: CaptureDataChangesMode, + opts: ProgramBuilderOpts, + trigger: Arc, + ) -> Self { + ProgramBuilder::_new(query_mode, capture_data_changes_mode, opts, Some(trigger)) + } + fn _new( + query_mode: QueryMode, + capture_data_changes_mode: CaptureDataChangesMode, + opts: ProgramBuilderOpts, + trigger: Option>, ) -> Self { Self { table_reference_counter: TableRefIdCounter::new(), @@ -215,9 +232,15 @@ impl ProgramBuilder { current_parent_explain_idx: None, reg_result_cols_start: None, needs_stmt_subtransactions: false, + trigger, + resolve_type: ResolveType::Abort, } } + pub fn set_resolve_type(&mut self, resolve_type: ResolveType) { + self.resolve_type = resolve_type; + } + pub fn set_needs_stmt_subtransactions(&mut self, needs_stmt_subtransactions: bool) { self.needs_stmt_subtransactions = needs_stmt_subtransactions; } @@ -1110,6 +1133,8 @@ impl ProgramBuilder { sql: sql.to_string(), accesses_db: !matches!(self.txn_mode, TransactionMode::None), needs_stmt_subtransactions: self.needs_stmt_subtransactions, + trigger: self.trigger, + 
resolve_type: self.resolve_type, } } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 8f94213fe..c7fbdc008 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -39,10 +39,11 @@ use crate::{ }, translate::emitter::TransactionMode, }; -use crate::{get_cursor, CheckpointMode, Connection, DatabaseStorage, MvCursor}; +use crate::{CheckpointMode, Completion, Connection, DatabaseStorage, MvCursor, StepResult, get_cursor}; use either::Either; use std::any::Any; use std::env::temp_dir; +use std::num::NonZero; use std::ops::DerefMut; use std::{ borrow::BorrowMut, @@ -75,7 +76,7 @@ use super::{ CommitState, }; use parking_lot::RwLock; -use turso_parser::ast::{self, ForeignKeyClause, Name}; +use turso_parser::ast::{self, ForeignKeyClause, Name, ResolveType}; use turso_parser::parser::Parser; use super::{ @@ -2643,6 +2644,105 @@ pub fn op_integer( Ok(InsnFunctionStepResult::Step) } +pub enum OpProgramState { + Start, + Step, +} + +/// Execute a trigger subprogram (Program opcode). 
+pub fn op_program( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Arc, + mv_store: Option<&Arc>, +) -> Result { + load_insn!( + Program { + params, + program: subprogram, + }, + insn + ); + loop { + match &mut state.op_program_state { + OpProgramState::Start => { + let mut statement = subprogram.write(); + statement.reset(); + let Some(ref trigger) = statement.get_trigger() else { + crate::bail_parse_error!("trigger subprogram has no trigger"); + }; + program.connection.start_trigger_execution(trigger.clone()); + + // Extract register values from params (which contain register indices encoded as negative integers) + // and bind them to the subprogram's parameters + for (param_idx, param_value) in params.iter().enumerate() { + if let Value::Integer(reg_idx_encoded) = param_value { + if *reg_idx_encoded < 0 { + // This is a register index encoded as negative integer + let actual_reg_idx = (-reg_idx_encoded - 1) as usize; + if actual_reg_idx < state.registers.len() { + let value = state.registers[actual_reg_idx].get_value().clone(); + let param_index = NonZero::::new(param_idx + 1).unwrap(); + statement.bind_at(param_index, value); + } else { + crate::bail_corrupt_error!( + "Register index {} out of bounds (len={})", + actual_reg_idx, + state.registers.len() + ); + } + } else { + crate::bail_parse_error!("Register indices for triggers should be encoded as negative integers, got {}", reg_idx_encoded); + } + } else { + crate::bail_parse_error!( + "Trigger parameters should be integers, got {:?}", + param_value + ); + } + } + + state.op_program_state = OpProgramState::Step; + } + OpProgramState::Step => { + loop { + let mut statement = subprogram.write(); + let res = statement.step(); + match res { + Ok(step_result) => match step_result { + StepResult::Done => break, + StepResult::IO => { + return Ok(InsnFunctionStepResult::IO(IOCompletions::Single( + Completion::new_yield(), + ))); + } + StepResult::Row => continue, + StepResult::Interrupt | 
StepResult::Busy => { + return Err(LimboError::Busy); + } + }, + Err(LimboError::Constraint(constraint_err)) => { + if program.resolve_type != ResolveType::Ignore { + return Err(LimboError::Constraint(constraint_err)); + } + break; + } + Err(err) => { + return Err(err); + } + } + } + program.connection.end_trigger_execution(); + + state.op_program_state = OpProgramState::Start; + state.pc += 1; + return Ok(InsnFunctionStepResult::Step); + } + } + } +} + pub fn op_real( program: &Program, state: &mut ProgramState, diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 457dbe7ba..938343940 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -727,6 +727,23 @@ pub fn insn_to_row( 0, format!("r[{dest}]={value}"), ), + Insn::Program { + params, + .. + } => ( + "Program", + // First register that contains a param + params.first().map(|v| match v { + crate::types::Value::Integer(i) if *i < 0 => (-i - 1) as i32, + _ => 0, + }).unwrap_or(0), + // Number of registers that contain params + params.len() as i32, + 0, + Value::build_text(program.sql.clone()), + 0, + format!("subprogram={}", program.sql), + ), Insn::Real { value, dest } => ( "Real", 0, diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index 9a9358d9f..9f4cd1f94 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -5,13 +5,9 @@ use std::{ use super::{execute, AggFunc, BranchOffset, CursorID, FuncCtx, InsnFunction, PageIdx}; use crate::{ - schema::{BTreeTable, Column, Index}, - storage::{pager::CreateBTreeFlags, wal::CheckpointMode}, - translate::{collate::CollationSeq, emitter::TransactionMode}, - types::KeyInfo, - vdbe::affinity::Affinity, - Value, + Statement, Value, schema::{BTreeTable, Column, Index}, storage::{pager::CreateBTreeFlags, wal::CheckpointMode}, translate::{collate::CollationSeq, emitter::TransactionMode}, types::KeyInfo, vdbe::affinity::Affinity }; +use parking_lot::RwLock; use strum::EnumCount; use strum_macros::{EnumDiscriminants, FromRepr, VariantArray}; use 
turso_macros::Description; @@ -530,6 +526,18 @@ pub enum Insn { can_fallthrough: bool, }, + /// Invoke a trigger subprogram. + /// + /// According to SQLite documentation (https://sqlite.org/opcode.html): + /// "The Program opcode invokes the trigger subprogram. The Program instruction + /// allocates and initializes a fresh register set for each invocation of the + /// subprogram, so subprograms can be reentrant and recursive. The Param opcode + /// is used by subprograms to access content in registers of the calling bytecode program." + Program { + params: Vec, + program: Arc>, + }, + /// Write an integer value into a register. Integer { value: i64, @@ -1336,6 +1344,7 @@ impl InsnVariants { InsnVariants::Gosub => execute::op_gosub, InsnVariants::Return => execute::op_return, InsnVariants::Integer => execute::op_integer, + InsnVariants::Program => execute::op_program, InsnVariants::Real => execute::op_real, InsnVariants::RealAffinity => execute::op_real_affinity, InsnVariants::String8 => execute::op_string8, diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index bebb41aaa..6ab9cb326 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -29,23 +29,12 @@ pub mod sorter; pub mod value; use crate::{ - error::LimboError, - function::{AggFunc, FuncCtx}, - mvcc::{database::CommitStateMachine, LocalClock}, - return_if_io, - state_machine::StateMachine, - storage::{pager::PagerCommitResult, sqlite3_ondisk::SmallVec}, - translate::{collate::CollationSeq, plan::TableReferences}, - types::{IOCompletions, IOResult}, - vdbe::{ + ValueRef, error::LimboError, function::{AggFunc, FuncCtx}, mvcc::{LocalClock, database::CommitStateMachine}, return_if_io, schema::Trigger, state_machine::StateMachine, storage::{pager::PagerCommitResult, sqlite3_ondisk::SmallVec}, translate::{collate::CollationSeq, plan::TableReferences}, types::{IOCompletions, IOResult}, vdbe::{ execute::{ - OpCheckpointState, OpColumnState, OpDeleteState, OpDeleteSubState, OpDestroyState, - OpIdxInsertState, 
OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, - OpRowIdState, OpSeekState, OpTransactionState, + OpCheckpointState, OpColumnState, OpDeleteState, OpDeleteSubState, OpDestroyState, OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, OpProgramState, OpRowIdState, OpSeekState, OpTransactionState }, metrics::StatementMetrics, - }, - ValueRef, + } }; use crate::{ @@ -63,6 +52,7 @@ use execute::{ InsnFunction, InsnFunctionStepResult, OpIdxDeleteState, OpIntegrityCheckState, OpOpenEphemeralState, }; +use turso_parser::ast::ResolveType; use crate::vdbe::rowset::RowSet; use explain::{insn_to_row_with_comment, EXPLAIN_COLUMNS, EXPLAIN_QUERY_PLAN_COLUMNS}; @@ -296,6 +286,7 @@ pub struct ProgramState { /// Metrics collected during statement execution pub metrics: StatementMetrics, op_open_ephemeral_state: OpOpenEphemeralState, + op_program_state: OpProgramState, op_new_rowid_state: OpNewRowidState, op_idx_insert_state: OpIdxInsertState, op_insert_state: OpInsertState, @@ -324,6 +315,12 @@ pub struct ProgramState { rowsets: HashMap, } +impl std::fmt::Debug for Program { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Program").finish() + } +} + // SAFETY: This needs to be audited for thread safety. // See: https://github.com/tursodatabase/turso/issues/1552 unsafe impl Send for ProgramState {} @@ -360,6 +357,7 @@ impl ProgramState { op_integrity_check_state: OpIntegrityCheckState::Start, metrics: StatementMetrics::new(), op_open_ephemeral_state: OpOpenEphemeralState::Start, + op_program_state: OpProgramState::Start, op_new_rowid_state: OpNewRowidState::Start, op_idx_insert_state: OpIdxInsertState::MaybeSeek, op_insert_state: OpInsertState { @@ -605,6 +603,12 @@ pub struct Program { /// is determined by the parser flags "mayAbort" and "isMultiWrite". Essentially this means that the individual /// statement may need to be aborted due to a constraint conflict, etc. 
instead of the entire transaction. pub needs_stmt_subtransactions: bool, + /// If this is a trigger subprogram, this is the trigger that is being executed. + pub trigger: Option>, + /// The type of resolution to perform if a constraint violation occurs during the execution of the program. + /// At present this is required only for ignoring errors when there is an INSERT OR IGNORE statement that triggers a trigger subprogram + /// which causes a conflict. + pub resolve_type: ResolveType, } impl Program { @@ -747,7 +751,7 @@ impl Program { return Err(LimboError::InternalError("Connection closed".to_string())); } if state.is_interrupted() { - self.abort(mv_store, &pager, None, &mut state.auto_txn_cleanup); + self.abort(mv_store, &pager, None, state); return Ok(StepResult::Interrupt); } if let Some(io) = &state.io_completions { @@ -757,7 +761,7 @@ impl Program { } if let Some(err) = io.get_error() { let err = err.into(); - self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup); + self.abort(mv_store, &pager, Some(&err), state); return Err(err); } state.io_completions = None; @@ -799,7 +803,7 @@ impl Program { return Ok(StepResult::Busy); } Err(err) => { - self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup); + self.abort(mv_store, &pager, Some(&err), state); return Err(err); } } @@ -1059,10 +1063,21 @@ impl Program { mv_store: Option<&Arc>, pager: &Arc, err: Option<&LimboError>, - cleanup: &mut TxnCleanup, + state: &mut ProgramState, ) { + if self.is_trigger_subprogram() { + self.connection.end_trigger_execution(); + } // Errors from nested statements are handled by the parent statement. - if !self.connection.is_nested_stmt() { + if !self.connection.is_nested_stmt() && !self.is_trigger_subprogram() { + if err.is_some() { + // Any error apart from deferred FK volations causes the statement subtransaction to roll back. 
+ let res = + state.end_statement(&self.connection, pager, EndStatement::RollbackSavepoint); + if let Err(e) = res { + tracing::error!("Error rolling back statement: {}", e); + } + } match err { // Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback. Some(LimboError::TxError(_)) => {} @@ -1075,7 +1090,7 @@ impl Program { // and op_halt. Some(LimboError::Constraint(_)) => {} _ => { - if *cleanup != TxnCleanup::None || err.is_some() { + if state.auto_txn_cleanup != TxnCleanup::None || err.is_some() { if let Some(mv_store) = mv_store { if let Some(tx_id) = self.connection.get_mv_tx_id() { self.connection.auto_commit.store(true, Ordering::SeqCst); @@ -1090,7 +1105,11 @@ impl Program { } } } - *cleanup = TxnCleanup::None; + state.auto_txn_cleanup = TxnCleanup::None; + } + + pub fn is_trigger_subprogram(&self) -> bool { + self.trigger.is_some() } }