add explicit tracker for Txn cleanup necessary for statement

This commit is contained in:
Nikita Sivukhin
2025-10-06 17:51:43 +04:00
parent 0ace1f9d90
commit e2f7310617
7 changed files with 62 additions and 85 deletions

View File

@@ -458,11 +458,6 @@ impl CompiledExpression {
"Expression evaluation produced unexpected row".to_string(),
));
}
crate::vdbe::execute::InsnFunctionStepResult::Interrupt => {
return Err(crate::LimboError::InternalError(
"Expression evaluation was interrupted".to_string(),
));
}
crate::vdbe::execute::InsnFunctionStepResult::Step => {
pc = state.pc as usize;
}

View File

@@ -47,7 +47,6 @@ use crate::types::{WalFrameInfo, WalState};
#[cfg(feature = "fs")]
use crate::util::{OpenMode, OpenOptions};
use crate::vdbe::metrics::ConnectionMetrics;
use crate::vdbe::PROGRAM_STATE_DONE;
use crate::vtab::VirtualTable;
use crate::{incremental::view::AllViewsTxState, translate::emitter::TransactionMode};
use core::str;
@@ -2424,6 +2423,12 @@ pub struct Statement {
busy_timeout: Option<BusyTimeout>,
}
impl Drop for Statement {
fn drop(&mut self) {
self.reset();
}
}
impl Statement {
pub fn new(
program: vdbe::Program,
@@ -2719,16 +2724,15 @@ impl Statement {
self.reset_internal(None, None);
}
pub(crate) fn mark_as_done(&self) {
self.program
.program_state
.store(PROGRAM_STATE_DONE, Ordering::SeqCst);
}
fn reset_internal(&mut self, max_registers: Option<usize>, max_cursors: Option<usize>) {
// as abort uses auto_txn_cleanup value - it needs to be called before state.reset
self.program.abort(
self.mv_store.as_ref(),
&self.pager,
None,
&mut self.state.auto_txn_cleanup,
);
self.state.reset(max_registers, max_cursors);
self.program
.abort(self.mv_store.as_ref(), &self.pager, None);
self.busy = false;
self.busy_timeout = None;
}
@@ -2746,12 +2750,6 @@ impl Statement {
}
}
impl Drop for Statement {
fn drop(&mut self) {
self.reset();
}
}
pub type Row = vdbe::Row;
pub type StepResult = vdbe::StepResult;

View File

@@ -1342,7 +1342,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
&self,
pager: Arc<Pager>,
maybe_existing_tx_id: Option<TxID>,
) -> Result<IOResult<TxID>> {
) -> Result<TxID> {
if !self.blocking_checkpoint_lock.read() {
// If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all.
return Err(LimboError::Busy);
@@ -1378,7 +1378,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
);
tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id);
self.txs.insert(tx_id, tx);
Ok(IOResult::Done(tx_id))
Ok(tx_id)
}
/// Begins a new transaction in the database.

View File

@@ -16,7 +16,6 @@ use crate::{
expr::ParamState,
plan::{ResultSetColumn, TableReferences},
},
vdbe::PROGRAM_STATE_ACTIVE,
CaptureDataChangesMode, Connection, Value, VirtualTable,
};
@@ -1020,7 +1019,6 @@ impl ProgramBuilder {
table_references: self.table_references,
sql: sql.to_string(),
accesses_db: !matches!(self.txn_mode, TransactionMode::None),
program_state: PROGRAM_STATE_ACTIVE.into(),
}
}
}

View File

@@ -19,7 +19,7 @@ use crate::types::{
};
use crate::util::normalize_ident;
use crate::vdbe::insn::InsertFlags;
use crate::vdbe::registers_to_ref_values;
use crate::vdbe::{registers_to_ref_values, TxnCleanup};
use crate::vector::{vector_concat, vector_slice};
use crate::{
error::{
@@ -157,7 +157,6 @@ pub enum InsnFunctionStepResult {
Done,
IO(IOCompletions),
Row,
Interrupt,
Step,
}
@@ -2328,7 +2327,7 @@ pub fn op_transaction_inner(
| TransactionMode::Read
| TransactionMode::Concurrent => mv_store.begin_tx(pager.clone())?,
TransactionMode::Write => {
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None))
mv_store.begin_exclusive_tx(pager.clone(), None)?
}
};
*program.connection.mv_tx.write() = Some((tx_id, *tx_mode));
@@ -2343,7 +2342,7 @@ pub fn op_transaction_inner(
if matches!(new_transaction_state, TransactionState::Write { .. })
&& matches!(actual_tx_mode, TransactionMode::Write)
{
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)));
mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))?;
}
}
} else {
@@ -2359,6 +2358,7 @@ pub fn op_transaction_inner(
"nested stmt should not begin a new read transaction"
);
pager.begin_read_tx()?;
state.auto_txn_cleanup = TxnCleanup::RollbackTxn;
}
if updated && matches!(new_transaction_state, TransactionState::Write { .. }) {
@@ -2374,6 +2374,7 @@ pub fn op_transaction_inner(
if matches!(current_state, TransactionState::None) {
pager.end_read_tx();
conn.set_tx_state(TransactionState::None);
state.auto_txn_cleanup = TxnCleanup::None;
}
assert_eq!(conn.get_tx_state(), current_state);
return Err(LimboError::Busy);
@@ -8018,15 +8019,12 @@ pub fn op_drop_column(
let schema = conn.schema.read();
for (view_name, view) in schema.views.iter() {
let view_select_sql = format!("SELECT * FROM {view_name}");
let stmt = conn.prepare(view_select_sql.as_str()).map_err(|e| {
let _ = conn.prepare(view_select_sql.as_str()).map_err(|e| {
LimboError::ParseError(format!(
"cannot drop column \"{}\": referenced in VIEW {view_name}: {}",
column_name, view.sql,
))
})?;
// this is internal statement running during active Program execution
// so, we must not interact with transaction state and explicitly mark this statement as done avoiding cleanup on reset
stmt.mark_as_done();
}
}
@@ -8152,15 +8150,12 @@ pub fn op_alter_column(
for (view_name, view) in schema.views.iter() {
let view_select_sql = format!("SELECT * FROM {view_name}");
// FIXME: this should rewrite the view to reference the new column name
let stmt = conn.prepare(view_select_sql.as_str()).map_err(|e| {
let _ = conn.prepare(view_select_sql.as_str()).map_err(|e| {
LimboError::ParseError(format!(
"cannot rename column \"{}\": referenced in VIEW {view_name}: {}",
old_column_name, view.sql,
))
})?;
// this is internal statement running during active Program execution
// so, we must not interact with transaction state and explicitly mark this statement as done avoiding cleanup on reset
stmt.mark_as_done();
}
}

View File

@@ -66,7 +66,7 @@ use std::{
collections::HashMap,
num::NonZero,
sync::{
atomic::{AtomicI64, AtomicU32, Ordering},
atomic::{AtomicI64, Ordering},
Arc,
},
};
@@ -265,6 +265,12 @@ pub struct Row {
count: usize,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TxnCleanup {
None,
RollbackTxn,
}
/// The program state describes the environment in which the program executes.
pub struct ProgramState {
pub io_completions: Option<IOCompletions>,
@@ -302,6 +308,10 @@ pub struct ProgramState {
op_checkpoint_state: OpCheckpointState,
/// State machine for committing view deltas with I/O handling
view_delta_state: ViewDeltaCommitState,
/// Marker which tells about auto transaction cleanup necessary for that connection in case of reset
/// This is used when statement in auto-commit mode reseted after previous uncomplete execution - in which case we may need to rollback transaction started on previous attempt
/// Note, that MVCC transactions are always explicit - so they do not update auto_txn_cleanup marker
pub(crate) auto_txn_cleanup: TxnCleanup,
}
impl ProgramState {
@@ -346,6 +356,7 @@ impl ProgramState {
op_transaction_state: OpTransactionState::Start,
op_checkpoint_state: OpCheckpointState::StartCheckpoint,
view_delta_state: ViewDeltaCommitState::NotStarted,
auto_txn_cleanup: TxnCleanup::None,
}
}
@@ -428,6 +439,7 @@ impl ProgramState {
self.op_column_state = OpColumnState::Start;
self.op_row_id_state = OpRowIdState::Start;
self.view_delta_state = ViewDeltaCommitState::NotStarted;
self.auto_txn_cleanup = TxnCleanup::None;
}
pub fn get_cursor(&mut self, cursor_id: CursorID) -> &mut Cursor {
@@ -483,10 +495,6 @@ macro_rules! get_cursor {
};
}
pub(crate) const PROGRAM_STATE_ACTIVE: u32 = 1;
pub(crate) const PROGRAM_STATE_ABORTED: u32 = 2;
pub(crate) const PROGRAM_STATE_DONE: u32 = 3;
pub struct Program {
pub max_registers: usize,
// we store original indices because we don't want to create new vec from
@@ -505,9 +513,6 @@ pub struct Program {
/// Used to determine whether we need to check for schema changes when
/// starting a transaction.
pub accesses_db: bool,
/// Current state of the program
/// Used to execute abort only once
pub program_state: AtomicU32,
}
impl Program {
@@ -648,7 +653,7 @@ impl Program {
return Err(LimboError::InternalError("Connection closed".to_string()));
}
if state.is_interrupted() {
self.abort(mv_store, &pager, None);
self.abort(mv_store, &pager, None, &mut state.auto_txn_cleanup);
return Ok(StepResult::Interrupt);
}
if let Some(io) = &state.io_completions {
@@ -657,7 +662,7 @@ impl Program {
}
if let Some(err) = io.get_error() {
let err = err.into();
self.abort(mv_store, &pager, Some(&err));
self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup);
return Err(err);
}
state.io_completions = None;
@@ -680,8 +685,7 @@ impl Program {
Ok(InsnFunctionStepResult::Done) => {
// Instruction completed execution
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
self.program_state
.store(PROGRAM_STATE_DONE, Ordering::SeqCst);
state.auto_txn_cleanup = TxnCleanup::None;
return Ok(StepResult::Done);
}
Ok(InsnFunctionStepResult::IO(io)) => {
@@ -694,16 +698,12 @@ impl Program {
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
return Ok(StepResult::Row);
}
Ok(InsnFunctionStepResult::Interrupt) => {
// Instruction interrupted - may resume at same PC
return Ok(StepResult::Interrupt);
}
Err(LimboError::Busy) => {
// Instruction blocked - will retry at same PC
return Ok(StepResult::Busy);
}
Err(err) => {
self.abort(mv_store, &pager, Some(&err));
self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup);
return Err(err);
}
}
@@ -964,43 +964,33 @@ impl Program {
mv_store: Option<&Arc<MvStore>>,
pager: &Arc<Pager>,
err: Option<&LimboError>,
cleanup: &mut TxnCleanup,
) {
let Ok(..) = self.program_state.compare_exchange(
PROGRAM_STATE_ACTIVE,
PROGRAM_STATE_ABORTED,
Ordering::SeqCst,
Ordering::SeqCst,
) else {
// no need to abort: program was either already aborted or executed to completion successfully
return;
};
if self.connection.is_nested_stmt.load(Ordering::SeqCst) {
// Errors from nested statements are handled by the parent statement.
return;
}
if self.connection.get_tx_state() == TransactionState::None {
return;
}
match err {
// Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback.
Some(LimboError::TxError(_)) => {}
// Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback.
Some(LimboError::TableLocked) => {}
// Busy errors do not cause a rollback.
Some(LimboError::Busy) => {}
_ => {
if let Some(mv_store) = mv_store {
if let Some(tx_id) = self.connection.get_mv_tx_id() {
self.connection.auto_commit.store(true, Ordering::SeqCst);
mv_store.rollback_tx(tx_id, pager.clone(), &self.connection);
// Errors from nested statements are handled by the parent statement.
if !self.connection.is_nested_stmt.load(Ordering::SeqCst) {
match err {
// Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback.
Some(LimboError::TxError(_)) => {}
// Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback.
Some(LimboError::TableLocked) => {}
// Busy errors do not cause a rollback.
Some(LimboError::Busy) => {}
_ => {
if *cleanup != TxnCleanup::None || err.is_some() {
if let Some(mv_store) = mv_store {
if let Some(tx_id) = self.connection.get_mv_tx_id() {
self.connection.auto_commit.store(true, Ordering::SeqCst);
mv_store.rollback_tx(tx_id, pager.clone(), &self.connection);
}
} else {
pager.rollback_tx(&self.connection);
}
self.connection.set_tx_state(TransactionState::None);
}
} else {
pager.rollback_tx(&self.connection);
}
self.connection.set_tx_state(TransactionState::None);
}
}
*cleanup = TxnCleanup::None;
}
}

View File

@@ -95,6 +95,7 @@ fn test_deferred_transaction_no_restart() {
.execute("INSERT INTO test (id, value) VALUES (2, 'second')")
.unwrap();
conn2.execute("COMMIT").unwrap();
drop(stmt);
let mut stmt = conn1.query("SELECT COUNT(*) FROM test").unwrap().unwrap();
if let StepResult::Row = stmt.step().unwrap() {