From e2f73106177aa7f52fd7bea84f562edb37daa8e0 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 6 Oct 2025 17:51:43 +0400 Subject: [PATCH] add explicit tracker for Txn cleanup necessary for statement --- core/incremental/expr_compiler.rs | 5 -- core/lib.rs | 28 +++--- core/mvcc/database/mod.rs | 4 +- core/vdbe/builder.rs | 2 - core/vdbe/execute.rs | 19 ++-- core/vdbe/mod.rs | 88 ++++++++----------- .../query_processing/test_transactions.rs | 1 + 7 files changed, 62 insertions(+), 85 deletions(-) diff --git a/core/incremental/expr_compiler.rs b/core/incremental/expr_compiler.rs index 44b2cef49..f823c870d 100644 --- a/core/incremental/expr_compiler.rs +++ b/core/incremental/expr_compiler.rs @@ -458,11 +458,6 @@ impl CompiledExpression { "Expression evaluation produced unexpected row".to_string(), )); } - crate::vdbe::execute::InsnFunctionStepResult::Interrupt => { - return Err(crate::LimboError::InternalError( - "Expression evaluation was interrupted".to_string(), - )); - } crate::vdbe::execute::InsnFunctionStepResult::Step => { pc = state.pc as usize; } diff --git a/core/lib.rs b/core/lib.rs index f331a8952..44297643b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -47,7 +47,6 @@ use crate::types::{WalFrameInfo, WalState}; #[cfg(feature = "fs")] use crate::util::{OpenMode, OpenOptions}; use crate::vdbe::metrics::ConnectionMetrics; -use crate::vdbe::PROGRAM_STATE_DONE; use crate::vtab::VirtualTable; use crate::{incremental::view::AllViewsTxState, translate::emitter::TransactionMode}; use core::str; @@ -2424,6 +2423,12 @@ pub struct Statement { busy_timeout: Option, } +impl Drop for Statement { + fn drop(&mut self) { + self.reset(); + } +} + impl Statement { pub fn new( program: vdbe::Program, @@ -2719,16 +2724,15 @@ impl Statement { self.reset_internal(None, None); } - pub(crate) fn mark_as_done(&self) { - self.program - .program_state - .store(PROGRAM_STATE_DONE, Ordering::SeqCst); - } - fn reset_internal(&mut self, max_registers: Option, max_cursors: Option) { + // as abort uses auto_txn_cleanup value - it needs to be called before state.reset + self.program.abort( + self.mv_store.as_ref(), + &self.pager, + None, + &mut self.state.auto_txn_cleanup, + ); self.state.reset(max_registers, max_cursors); - self.program - .abort(self.mv_store.as_ref(), &self.pager, None); self.busy = false; self.busy_timeout = None; } @@ -2746,12 +2750,6 @@ impl Statement { } } -impl Drop for Statement { - fn drop(&mut self) { - self.reset(); - } -} - pub type Row = vdbe::Row; pub type StepResult = vdbe::StepResult; diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index a03fba7ba..d9fd32f4b 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1342,7 +1342,7 @@ impl MvStore { &self, pager: Arc, maybe_existing_tx_id: Option, - ) -> Result> { + ) -> Result { if !self.blocking_checkpoint_lock.read() { // If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all. return Err(LimboError::Busy); @@ -1378,7 +1378,7 @@ impl MvStore { ); tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id); self.txs.insert(tx_id, tx); - Ok(IOResult::Done(tx_id)) + Ok(tx_id) } /// Begins a new transaction in the database. diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index c2b6741aa..3d1a333ec 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -16,7 +16,6 @@ use crate::{ expr::ParamState, plan::{ResultSetColumn, TableReferences}, }, - vdbe::PROGRAM_STATE_ACTIVE, CaptureDataChangesMode, Connection, Value, VirtualTable, }; @@ -1020,7 +1019,6 @@ impl ProgramBuilder { table_references: self.table_references, sql: sql.to_string(), accesses_db: !matches!(self.txn_mode, TransactionMode::None), - program_state: PROGRAM_STATE_ACTIVE.into(), } } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index d3ef412f1..01def8029 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -19,7 +19,7 @@ use crate::types::{ }; use crate::util::normalize_ident; use crate::vdbe::insn::InsertFlags; -use crate::vdbe::registers_to_ref_values; +use crate::vdbe::{registers_to_ref_values, TxnCleanup}; use crate::vector::{vector_concat, vector_slice}; use crate::{ error::{ @@ -157,7 +157,6 @@ pub enum InsnFunctionStepResult { Done, IO(IOCompletions), Row, - Interrupt, Step, } @@ -2328,7 +2327,7 @@ pub fn op_transaction_inner( | TransactionMode::Read | TransactionMode::Concurrent => mv_store.begin_tx(pager.clone())?, TransactionMode::Write => { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) + mv_store.begin_exclusive_tx(pager.clone(), None)? } }; *program.connection.mv_tx.write() = Some((tx_id, *tx_mode)); @@ -2343,7 +2342,7 @@ pub fn op_transaction_inner( if matches!(new_transaction_state, TransactionState::Write { .. }) && matches!(actual_tx_mode, TransactionMode::Write) { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))); + mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))?; } } } else { @@ -2359,6 +2358,7 @@ pub fn op_transaction_inner( "nested stmt should not begin a new read transaction" ); pager.begin_read_tx()?; + state.auto_txn_cleanup = TxnCleanup::RollbackTxn; } if updated && matches!(new_transaction_state, TransactionState::Write { .. }) { @@ -2374,6 +2374,7 @@ pub fn op_transaction_inner( if matches!(current_state, TransactionState::None) { pager.end_read_tx(); conn.set_tx_state(TransactionState::None); + state.auto_txn_cleanup = TxnCleanup::None; } assert_eq!(conn.get_tx_state(), current_state); return Err(LimboError::Busy); @@ -8018,15 +8019,12 @@ pub fn op_drop_column( let schema = conn.schema.read(); for (view_name, view) in schema.views.iter() { let view_select_sql = format!("SELECT * FROM {view_name}"); - let stmt = conn.prepare(view_select_sql.as_str()).map_err(|e| { + let _ = conn.prepare(view_select_sql.as_str()).map_err(|e| { LimboError::ParseError(format!( "cannot drop column \"{}\": referenced in VIEW {view_name}: {}", column_name, view.sql, )) })?; - // this is internal statement running during active Program execution - // so, we must not interact with transaction state and explicitly mark this statement as done avoiding cleanup on reset - stmt.mark_as_done(); } } @@ -8152,15 +8150,12 @@ pub fn op_alter_column( for (view_name, view) in schema.views.iter() { let view_select_sql = format!("SELECT * FROM {view_name}"); // FIXME: this should rewrite the view to reference the new column name - let stmt = conn.prepare(view_select_sql.as_str()).map_err(|e| { + let _ = conn.prepare(view_select_sql.as_str()).map_err(|e| { LimboError::ParseError(format!( "cannot rename column \"{}\": referenced in VIEW {view_name}: {}", old_column_name, view.sql, )) })?; - // this is internal statement running during active Program execution - // so, we must not interact with transaction state and explicitly mark this statement as done avoiding cleanup on reset - stmt.mark_as_done(); } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index ed09762e1..4b11c8c1d 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -66,7 +66,7 @@ use std::{ collections::HashMap, num::NonZero, sync::{ - atomic::{AtomicI64, AtomicU32, Ordering}, + atomic::{AtomicI64, Ordering}, Arc, }, }; @@ -265,6 +265,12 @@ pub struct Row { count: usize, } +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum TxnCleanup { + None, + RollbackTxn, +} + /// The program state describes the environment in which the program executes. pub struct ProgramState { pub io_completions: Option, @@ -302,6 +308,10 @@ pub struct ProgramState { op_checkpoint_state: OpCheckpointState, /// State machine for committing view deltas with I/O handling view_delta_state: ViewDeltaCommitState, + /// Marker which tells about auto transaction cleanup necessary for that connection in case of reset + /// This is used when statement in auto-commit mode reseted after previous uncomplete execution - in which case we may need to rollback transaction started on previous attempt + /// Note, that MVCC transactions are always explicit - so they do not update auto_txn_cleanup marker + pub(crate) auto_txn_cleanup: TxnCleanup, } impl ProgramState { @@ -346,6 +356,7 @@ impl ProgramState { op_transaction_state: OpTransactionState::Start, op_checkpoint_state: OpCheckpointState::StartCheckpoint, view_delta_state: ViewDeltaCommitState::NotStarted, + auto_txn_cleanup: TxnCleanup::None, } } @@ -428,6 +439,7 @@ impl ProgramState { self.op_column_state = OpColumnState::Start; self.op_row_id_state = OpRowIdState::Start; self.view_delta_state = ViewDeltaCommitState::NotStarted; + self.auto_txn_cleanup = TxnCleanup::None; } pub fn get_cursor(&mut self, cursor_id: CursorID) -> &mut Cursor { @@ -483,10 +495,6 @@ macro_rules! get_cursor { }; } -pub(crate) const PROGRAM_STATE_ACTIVE: u32 = 1; -pub(crate) const PROGRAM_STATE_ABORTED: u32 = 2; -pub(crate) const PROGRAM_STATE_DONE: u32 = 3; - pub struct Program { pub max_registers: usize, // we store original indices because we don't want to create new vec from @@ -505,9 +513,6 @@ pub struct Program { /// Used to determine whether we need to check for schema changes when /// starting a transaction. pub accesses_db: bool, - /// Current state of the program - /// Used to execute abort only once - pub program_state: AtomicU32, } impl Program { @@ -648,7 +653,7 @@ impl Program { return Err(LimboError::InternalError("Connection closed".to_string())); } if state.is_interrupted() { - self.abort(mv_store, &pager, None); + self.abort(mv_store, &pager, None, &mut state.auto_txn_cleanup); return Ok(StepResult::Interrupt); } if let Some(io) = &state.io_completions { @@ -657,7 +662,7 @@ impl Program { } if let Some(err) = io.get_error() { let err = err.into(); - self.abort(mv_store, &pager, Some(&err)); + self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup); return Err(err); } state.io_completions = None; @@ -680,8 +685,7 @@ impl Program { Ok(InsnFunctionStepResult::Done) => { // Instruction completed execution state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1); - self.program_state - .store(PROGRAM_STATE_DONE, Ordering::SeqCst); + state.auto_txn_cleanup = TxnCleanup::None; return Ok(StepResult::Done); } Ok(InsnFunctionStepResult::IO(io)) => { @@ -694,16 +698,12 @@ impl Program { state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1); return Ok(StepResult::Row); } - Ok(InsnFunctionStepResult::Interrupt) => { - // Instruction interrupted - may resume at same PC - return Ok(StepResult::Interrupt); - } Err(LimboError::Busy) => { // Instruction blocked - will retry at same PC return Ok(StepResult::Busy); } Err(err) => { - self.abort(mv_store, &pager, Some(&err)); + self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup); return Err(err); } } @@ -964,43 +964,33 @@ impl Program { mv_store: Option<&Arc>, pager: &Arc, err: Option<&LimboError>, + cleanup: &mut TxnCleanup, ) { - let Ok(..) = self.program_state.compare_exchange( - PROGRAM_STATE_ACTIVE, - PROGRAM_STATE_ABORTED, - Ordering::SeqCst, - Ordering::SeqCst, - ) else { - // no need to abort: program was either already aborted or executed to completion successfully - return; - }; - - if self.connection.is_nested_stmt.load(Ordering::SeqCst) { - // Errors from nested statements are handled by the parent statement. - return; - } - if self.connection.get_tx_state() == TransactionState::None { - return; - } - match err { - // Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback. - Some(LimboError::TxError(_)) => {} - // Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback. - Some(LimboError::TableLocked) => {} - // Busy errors do not cause a rollback. - Some(LimboError::Busy) => {} - _ => { - if let Some(mv_store) = mv_store { - if let Some(tx_id) = self.connection.get_mv_tx_id() { - self.connection.auto_commit.store(true, Ordering::SeqCst); - mv_store.rollback_tx(tx_id, pager.clone(), &self.connection); + // Errors from nested statements are handled by the parent statement. + if !self.connection.is_nested_stmt.load(Ordering::SeqCst) { + match err { + // Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback. + Some(LimboError::TxError(_)) => {} + // Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback. + Some(LimboError::TableLocked) => {} + // Busy errors do not cause a rollback. + Some(LimboError::Busy) => {} + _ => { + if *cleanup != TxnCleanup::None || err.is_some() { + if let Some(mv_store) = mv_store { + if let Some(tx_id) = self.connection.get_mv_tx_id() { + self.connection.auto_commit.store(true, Ordering::SeqCst); + mv_store.rollback_tx(tx_id, pager.clone(), &self.connection); + } + } else { + pager.rollback_tx(&self.connection); + } + self.connection.set_tx_state(TransactionState::None); } - } else { - pager.rollback_tx(&self.connection); } - self.connection.set_tx_state(TransactionState::None); } } + *cleanup = TxnCleanup::None; } } diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index 53ab7f00e..a96235153 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -95,6 +95,7 @@ fn test_deferred_transaction_no_restart() { .execute("INSERT INTO test (id, value) VALUES (2, 'second')") .unwrap(); conn2.execute("COMMIT").unwrap(); + drop(stmt); let mut stmt = conn1.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); if let StepResult::Row = stmt.step().unwrap() {