From 38d263096915636cd942f6f81935233e3be07d29 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 6 Oct 2025 12:15:15 +0400 Subject: [PATCH 1/8] remove unnecessary SchemaLocked error - lock() return error in case when another thread panicked while holding the same lock - we better to just panic too in any such case --- core/error.rs | 2 -- core/lib.rs | 50 +++++++++++++-------------------------- core/mvcc/database/mod.rs | 4 ++-- core/storage/pager.rs | 4 ++-- 4 files changed, 21 insertions(+), 39 deletions(-) diff --git a/core/error.rs b/core/error.rs index 3dd4841ad..76bac45f0 100644 --- a/core/error.rs +++ b/core/error.rs @@ -49,8 +49,6 @@ pub enum LimboError { ExtensionError(String), #[error("Runtime error: integer overflow")] IntegerOverflow, - #[error("Schema is locked for write")] - SchemaLocked, #[error("Runtime error: database table is locked")] TableLocked, #[error("Error: Resource is read-only")] diff --git a/core/lib.rs b/core/lib.rs index 11b85be81..29450c471 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -558,12 +558,7 @@ impl Database { let conn = Arc::new(Connection { db: self.clone(), pager: RwLock::new(pager), - schema: RwLock::new( - self.schema - .lock() - .map_err(|_| LimboError::SchemaLocked)? - .clone(), - ), + schema: RwLock::new(self.schema.lock().unwrap().clone()), database_schemas: RwLock::new(std::collections::HashMap::new()), auto_commit: AtomicBool::new(true), transaction_state: RwLock::new(TransactionState::None), @@ -835,17 +830,17 @@ impl Database { #[inline] pub(crate) fn with_schema_mut(&self, f: impl FnOnce(&mut Schema) -> Result) -> Result { - let mut schema_ref = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?; + let mut schema_ref = self.schema.lock().unwrap(); let schema = Arc::make_mut(&mut *schema_ref); f(schema) } - pub(crate) fn clone_schema(&self) -> Result> { - let schema = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?; - Ok(schema.clone()) + pub(crate) fn clone_schema(&self) -> Arc { + let schema = self.schema.lock().unwrap(); + schema.clone() } - pub(crate) fn update_schema_if_newer(&self, another: Arc) -> Result<()> { - let mut schema = self.schema.lock().map_err(|_| LimboError::SchemaLocked)?; + pub(crate) fn update_schema_if_newer(&self, another: Arc) { + let mut schema = self.schema.lock().unwrap(); if schema.schema_version < another.schema_version { tracing::debug!( "DB schema is outdated: {} < {}", @@ -860,7 +855,6 @@ impl Database { another.schema_version ); } - Ok(()) } pub fn get_mv_store(&self) -> Option<&Arc> { @@ -1154,7 +1148,7 @@ impl Connection { let input = str::from_utf8(&sql.as_bytes()[..byte_offset_end]) .unwrap() .trim(); - self.maybe_update_schema()?; + self.maybe_update_schema(); let pager = self.pager.read().clone(); let mode = QueryMode::new(&cmd); let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd; @@ -1248,7 +1242,8 @@ impl Connection { reparse_result?; let schema = self.schema.read().clone(); - self.db.update_schema_if_newer(schema) + self.db.update_schema_if_newer(schema); + Ok(()) } fn reparse_schema(self: &Arc) -> Result<()> { @@ -1303,7 +1298,7 @@ impl Connection { "The supplied SQL string contains no statements".to_string(), )); } - self.maybe_update_schema()?; + self.maybe_update_schema(); let sql = sql.as_ref(); tracing::trace!("Preparing and executing batch: {}", sql); let mut parser = Parser::new(sql.as_bytes()); @@ -1337,7 +1332,7 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let sql = sql.as_ref(); - self.maybe_update_schema()?; + self.maybe_update_schema(); tracing::trace!("Querying: {}", sql); let mut parser = Parser::new(sql.as_bytes()); let cmd = parser.next_cmd()?; @@ -1389,7 +1384,7 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let sql = sql.as_ref(); - self.maybe_update_schema()?; + self.maybe_update_schema(); let mut parser = Parser::new(sql.as_bytes()); while let Some(cmd) = parser.next_cmd()? { let syms = self.syms.read(); @@ -1540,20 +1535,14 @@ impl Connection { Ok(db) } - pub fn maybe_update_schema(&self) -> Result<()> { + pub fn maybe_update_schema(&self) { let current_schema_version = self.schema.read().schema_version; - let schema = self - .db - .schema - .lock() - .map_err(|_| LimboError::SchemaLocked)?; + let schema = self.db.schema.lock().unwrap(); if matches!(self.get_tx_state(), TransactionState::None) && current_schema_version != schema.schema_version { *self.schema.write() = schema.clone(); } - - Ok(()) } /// Read schema version at current transaction @@ -2075,12 +2064,7 @@ impl Connection { ))); } - let use_indexes = self - .db - .schema - .lock() - .map_err(|_| LimboError::SchemaLocked)? - .indexes_enabled(); + let use_indexes = self.db.schema.lock().unwrap().indexes_enabled(); let use_mvcc = self.db.mv_store.is_some(); let use_views = self.db.experimental_views_enabled(); let use_strict = self.db.experimental_strict_enabled(); @@ -2598,7 +2582,7 @@ impl Statement { fn reprepare(&mut self) -> Result<()> { tracing::trace!("repreparing statement"); let conn = self.program.connection.clone(); - *conn.schema.write() = conn.db.clone_schema()?; + *conn.schema.write() = conn.db.clone_schema(); self.program = { let mut parser = Parser::new(self.program.sql.as_bytes()); let cmd = parser.next_cmd()?; diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index f00690aff..45b7cb7e8 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -654,7 +654,7 @@ impl StateTransition for CommitStateMachine { let schema_did_change = self.did_commit_schema_change; if schema_did_change { let schema = connection.schema.read().clone(); - connection.db.update_schema_if_newer(schema)?; + connection.db.update_schema_if_newer(schema); } let tx = mvcc_store.txs.get(&self.tx_id).unwrap(); let tx_unlocked = tx.value(); @@ -1606,7 +1606,7 @@ impl MvStore { > connection.db.schema.lock().unwrap().schema_version { // Connection made schema changes during tx and rolled back -> revert connection-local schema. - *connection.schema.write() = connection.db.clone_schema()?; + *connection.schema.write() = connection.db.clone_schema(); } let tx = tx_unlocked.value(); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index bf11ef1d3..f2bde3d04 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1198,7 +1198,7 @@ impl Pager { if schema_did_change { let schema = connection.schema.read().clone(); - connection.db.update_schema_if_newer(schema)?; + connection.db.update_schema_if_newer(schema); } Ok(IOResult::Done(commit_status)) } @@ -2411,7 +2411,7 @@ impl Pager { } self.reset_internal_states(); if schema_did_change { - *connection.schema.write() = connection.db.clone_schema()?; + *connection.schema.write() = connection.db.clone_schema(); } if is_write { if let Some(wal) = self.wal.as_ref() { From 8dae601fac879cd28a5fc04326cae7db7db17fc5 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 6 Oct 2025 13:21:45 +0400 Subject: [PATCH 2/8] make rollback non-failing method --- core/benches/mvcc_benchmark.rs | 3 +- core/lib.rs | 33 ++++------- .../mvcc/database/checkpoint_state_machine.rs | 10 +--- core/mvcc/database/mod.rs | 9 +-- core/mvcc/database/tests.rs | 6 +- core/schema.rs | 2 +- core/storage/btree.rs | 18 +++--- core/storage/pager.rs | 59 ++++++++++--------- core/storage/wal.rs | 9 ++- core/vdbe/execute.rs | 6 +- core/vdbe/mod.rs | 19 +++--- 11 files changed, 77 insertions(+), 97 deletions(-) diff --git a/core/benches/mvcc_benchmark.rs b/core/benches/mvcc_benchmark.rs index 0ebd33fa5..7d316707d 100644 --- a/core/benches/mvcc_benchmark.rs +++ b/core/benches/mvcc_benchmark.rs @@ -36,8 +36,7 @@ fn bench(c: &mut Criterion) { let conn = db.conn.clone(); let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()).unwrap(); db.mvcc_store - .rollback_tx(tx_id, conn.get_pager().clone(), &conn) - .unwrap(); + .rollback_tx(tx_id, conn.get_pager().clone(), &conn); }) }); diff --git a/core/lib.rs b/core/lib.rs index 29450c471..f45715dc2 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -498,7 +498,7 @@ impl Database { let result = schema .make_from_btree(None, pager.clone(), &syms) .or_else(|e| { - pager.end_read_tx()?; + pager.end_read_tx(); Err(e) }); if let Err(LimboError::ExtensionError(e)) = result { @@ -1195,11 +1195,11 @@ impl Connection { 0 } Err(err) => { - pager.end_read_tx().expect("read txn must be finished"); + pager.end_read_tx(); return Err(err); } }; - pager.end_read_tx().expect("read txn must be finished"); + pager.end_read_tx(); let db_schema_version = self.db.schema.lock().unwrap().schema_version; tracing::debug!( @@ -1236,7 +1236,7 @@ impl Connection { // close opened transaction if it was kept open // (in most cases, it will be automatically closed if stmt was executed properly) if previous == TransactionState::Read { - pager.end_read_tx().expect("read txn must be finished"); + pager.end_read_tx(); } reparse_result?; @@ -1654,7 +1654,7 @@ impl Connection { let pager = self.pager.read(); pager.begin_read_tx()?; pager.io.block(|| pager.begin_write_tx()).inspect_err(|_| { - pager.end_read_tx().expect("read txn must be closed"); + pager.end_read_tx(); })?; // start write transaction and disable auto-commit mode as SQL can be executed within WAL session (at caller own risk) @@ -1702,13 +1702,11 @@ impl Connection { wal.end_read_tx(); } - let rollback_err = if !force_commit { + if !force_commit { // remove all non-commited changes in case if WAL session left some suffix without commit frame - pager.rollback(false, self, true).err() - } else { - None - }; - if let Some(err) = commit_err.or(rollback_err) { + pager.rollback(false, self, true); + } + if let Some(err) = commit_err { return Err(err); } } @@ -1752,12 +1750,7 @@ impl Connection { _ => { if !self.mvcc_enabled() { let pager = self.pager.read(); - pager.io.block(|| { - pager.end_tx( - true, // rollback = true for close - self, - ) - })?; + pager.rollback_tx(self); } self.set_tx_state(TransactionState::None); } @@ -2632,12 +2625,8 @@ impl Statement { } let state = self.program.connection.get_tx_state(); if let TransactionState::Write { .. } = state { - let end_tx_res = self.pager.end_tx(true, &self.program.connection)?; + self.pager.rollback_tx(&self.program.connection); self.program.connection.set_tx_state(TransactionState::None); - assert!( - matches!(end_tx_res, IOResult::Done(_)), - "end_tx should not return IO as it should just end txn without flushing anything. Got {end_tx_res:?}" - ); } } res diff --git a/core/mvcc/database/checkpoint_state_machine.rs b/core/mvcc/database/checkpoint_state_machine.rs index a207d5ad2..fee93c2d8 100644 --- a/core/mvcc/database/checkpoint_state_machine.rs +++ b/core/mvcc/database/checkpoint_state_machine.rs @@ -548,7 +548,7 @@ impl CheckpointStateMachine { CheckpointState::CommitPagerTxn => { tracing::debug!("Committing pager transaction"); - let result = self.pager.end_tx(false, &self.connection)?; + let result = self.pager.commit_tx(&self.connection)?; match result { IOResult::Done(_) => { self.state = CheckpointState::TruncateLogicalLog; @@ -642,16 +642,12 @@ impl StateTransition for CheckpointStateMachine { Err(err) => { tracing::info!("Error in checkpoint state machine: {err}"); if self.lock_states.pager_write_tx { - let rollback = true; - self.pager - .io - .block(|| self.pager.end_tx(rollback, self.connection.as_ref())) - .expect("failed to end pager write tx"); + self.pager.rollback_tx(self.connection.as_ref()); if self.update_transaction_state { *self.connection.transaction_state.write() = TransactionState::None; } } else if self.lock_states.pager_read_tx { - self.pager.end_read_tx().unwrap(); + self.pager.end_read_tx(); if self.update_transaction_state { *self.connection.transaction_state.write() = TransactionState::None; } diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 45b7cb7e8..a03fba7ba 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1566,12 +1566,7 @@ impl MvStore { /// # Arguments /// /// * `tx_id` - The ID of the transaction to abort. - pub fn rollback_tx( - &self, - tx_id: TxID, - _pager: Arc, - connection: &Connection, - ) -> Result<()> { + pub fn rollback_tx(&self, tx_id: TxID, _pager: Arc, connection: &Connection) { let tx_unlocked = self.txs.get(&tx_id).unwrap(); let tx = tx_unlocked.value(); *connection.mv_tx.write() = None; @@ -1615,8 +1610,6 @@ impl MvStore { // FIXME: verify that we can already remove the transaction here! // Maybe it's fine for snapshot isolation, but too early for serializable? self.remove_tx(tx_id); - - Ok(()) } /// Returns true if the given transaction is the exclusive transaction. diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 35a45e728..e559bda52 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -347,8 +347,7 @@ fn test_rollback() { .unwrap(); assert_eq!(row3, row4); db.mvcc_store - .rollback_tx(tx1, db.conn.pager.read().clone(), &db.conn) - .unwrap(); + .rollback_tx(tx1, db.conn.pager.read().clone(), &db.conn); let tx2 = db .mvcc_store .begin_tx(db.conn.pager.read().clone()) @@ -592,8 +591,7 @@ fn test_lost_update() { )); // hack: in the actual tursodb database we rollback the mvcc tx ourselves, so manually roll it back here db.mvcc_store - .rollback_tx(tx3, conn3.pager.read().clone(), &conn3) - .unwrap(); + .rollback_tx(tx3, conn3.pager.read().clone(), &conn3); commit_tx(db.mvcc_store.clone(), &conn2, tx2).unwrap(); assert!(matches!( diff --git a/core/schema.rs b/core/schema.rs index 5188f962f..9208c5d1d 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -472,7 +472,7 @@ impl Schema { pager.io.block(|| cursor.next())?; } - pager.end_read_tx()?; + pager.end_read_tx(); self.populate_indices(from_sql_indexes, automatic_indices)?; diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 0dfaff8c1..9ab4c689e 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -8183,7 +8183,7 @@ mod tests { // force allocate page1 with a transaction pager.begin_read_tx().unwrap(); run_until_done(|| pager.begin_write_tx(), &pager).unwrap(); - run_until_done(|| pager.end_tx(false, &conn), &pager).unwrap(); + run_until_done(|| pager.commit_tx(&conn), &pager).unwrap(); let page2 = run_until_done(|| pager.allocate_page(), &pager).unwrap(); btree_init_page(&page2, PageType::TableLeaf, 0, pager.usable_space()); @@ -8495,7 +8495,7 @@ mod tests { pager.deref(), ) .unwrap(); - pager.io.block(|| pager.end_tx(false, &conn)).unwrap(); + pager.io.block(|| pager.commit_tx(&conn)).unwrap(); pager.begin_read_tx().unwrap(); // FIXME: add sorted vector instead, should be okay for small amounts of keys for now :P, too lazy to fix right now let _c = cursor.move_to_root().unwrap(); @@ -8524,7 +8524,7 @@ mod tests { println!("btree after:\n{btree_after}"); panic!("invalid btree"); } - pager.end_read_tx().unwrap(); + pager.end_read_tx(); } pager.begin_read_tx().unwrap(); tracing::info!( @@ -8546,7 +8546,7 @@ mod tests { "key {key} is not found, got {cursor_rowid}" ); } - pager.end_read_tx().unwrap(); + pager.end_read_tx(); } } @@ -8641,7 +8641,7 @@ mod tests { if let Some(c) = c { pager.io.wait_for_completion(c).unwrap(); } - pager.io.block(|| pager.end_tx(false, &conn)).unwrap(); + pager.io.block(|| pager.commit_tx(&conn)).unwrap(); } // Check that all keys can be found by seeking @@ -8702,7 +8702,7 @@ mod tests { } prev = Some(cur); } - pager.end_read_tx().unwrap(); + pager.end_read_tx(); } } @@ -8848,7 +8848,7 @@ mod tests { if let Some(c) = c { pager.io.wait_for_completion(c).unwrap(); } - pager.io.block(|| pager.end_tx(false, &conn)).unwrap(); + pager.io.block(|| pager.commit_tx(&conn)).unwrap(); } // Final validation @@ -8856,7 +8856,7 @@ mod tests { sorted_keys.sort(); validate_expected_keys(&pager, &mut cursor, &sorted_keys, seed); - pager.end_read_tx().unwrap(); + pager.end_read_tx(); } } @@ -8939,7 +8939,7 @@ mod tests { "key {key:?} is not found, seed: {seed}" ); } - pager.end_read_tx().unwrap(); + pager.end_read_tx(); } #[test] diff --git a/core/storage/pager.rs b/core/storage/pager.rs index f2bde3d04..32f9e834e 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1161,33 +1161,20 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn end_tx( - &self, - rollback: bool, - connection: &Connection, - ) -> Result> { + pub fn commit_tx(&self, connection: &Connection) -> Result> { if connection.is_nested_stmt.load(Ordering::SeqCst) { // Parent statement will handle the transaction rollback. return Ok(IOResult::Done(PagerCommitResult::Rollback)); } - tracing::trace!("end_tx(rollback={})", rollback); let Some(wal) = self.wal.as_ref() else { // TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes. return Ok(IOResult::Done(PagerCommitResult::Rollback)); }; - let (is_write, schema_did_change) = match connection.get_tx_state() { + let (_, schema_did_change) = match connection.get_tx_state() { TransactionState::Write { schema_did_change } => (true, schema_did_change), _ => (false, false), }; - tracing::trace!("end_tx(schema_did_change={})", schema_did_change); - if rollback { - if is_write { - wal.borrow().end_write_tx(); - } - wal.borrow().end_read_tx(); - self.rollback(schema_did_change, connection, is_write)?; - return Ok(IOResult::Done(PagerCommitResult::Rollback)); - } + tracing::trace!("commit_tx(schema_did_change={})", schema_did_change); let commit_status = return_if_io!(self.commit_dirty_pages( connection.is_wal_auto_checkpoint_disabled(), connection.get_sync_mode(), @@ -1204,12 +1191,33 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn end_read_tx(&self) -> Result<()> { + pub fn rollback_tx(&self, connection: &Connection) { + if connection.is_nested_stmt.load(Ordering::SeqCst) { + // Parent statement will handle the transaction rollback. + return; + } let Some(wal) = self.wal.as_ref() else { - return Ok(()); + // TODO: Unsure what the semantics of "end_tx" is for in-memory databases, ephemeral tables and ephemeral indexes. + return; + }; + let (is_write, schema_did_change) = match connection.get_tx_state() { + TransactionState::Write { schema_did_change } => (true, schema_did_change), + _ => (false, false), + }; + tracing::trace!("rollback_tx(schema_did_change={})", schema_did_change); + if is_write { + wal.borrow().end_write_tx(); + } + wal.borrow().end_read_tx(); + self.rollback(schema_did_change, connection, is_write); + } + + #[instrument(skip_all, level = Level::DEBUG)] + pub fn end_read_tx(&self) { + let Some(wal) = self.wal.as_ref() else { + return; }; wal.borrow().end_read_tx(); - Ok(()) } /// Reads a page from disk (either WAL or DB file) bypassing page-cache @@ -2393,12 +2401,7 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn rollback( - &self, - schema_did_change: bool, - connection: &Connection, - is_write: bool, - ) -> Result<(), LimboError> { + pub fn rollback(&self, schema_did_change: bool, connection: &Connection, is_write: bool) { tracing::debug!(schema_did_change); self.clear_page_cache(); if is_write { @@ -2415,11 +2418,9 @@ impl Pager { } if is_write { if let Some(wal) = self.wal.as_ref() { - wal.borrow_mut().rollback()?; + wal.borrow_mut().rollback(); } } - - Ok(()) } fn reset_internal_states(&self) { @@ -2764,7 +2765,7 @@ mod ptrmap_tests { use super::*; use crate::io::{MemoryIO, OpenFlags, IO}; use crate::storage::buffer_pool::BufferPool; - use crate::storage::database::{DatabaseFile, DatabaseStorage}; + use crate::storage::database::DatabaseFile; use crate::storage::page_cache::PageCache; use crate::storage::pager::Pager; use crate::storage::sqlite3_ondisk::PageSize; diff --git a/core/storage/wal.rs b/core/storage/wal.rs index c590219bd..1f55e1cc0 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -302,7 +302,7 @@ pub trait Wal: Debug { fn get_checkpoint_seq(&self) -> u32; fn get_max_frame(&self) -> u64; fn get_min_frame(&self) -> u64; - fn rollback(&mut self) -> Result<()>; + fn rollback(&mut self); /// Return unique set of pages changed **after** frame_watermark position and until current WAL session max_frame_no fn changed_pages_after(&self, frame_watermark: u64) -> Result>; @@ -1351,8 +1351,8 @@ impl Wal for WalFile { self.min_frame.load(Ordering::Acquire) } - #[instrument(err, skip_all, level = Level::DEBUG)] - fn rollback(&mut self) -> Result<()> { + #[instrument(skip_all, level = Level::DEBUG)] + fn rollback(&mut self) { let (max_frame, last_checksum) = { let shared = self.get_shared(); let max_frame = shared.max_frame.load(Ordering::Acquire); @@ -1369,7 +1369,6 @@ impl Wal for WalFile { self.last_checksum = last_checksum; self.max_frame.store(max_frame, Ordering::Release); self.reset_internal_states(); - Ok(()) } #[instrument(skip_all, level = Level::DEBUG)] @@ -2825,7 +2824,7 @@ pub mod test { } } drop(w); - conn2.pager.write().end_read_tx().unwrap(); + conn2.pager.write().end_read_tx(); conn1 .execute("create table test(id integer primary key, value text)") diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 87fbaec0a..ec5d6f3f9 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2372,7 +2372,7 @@ pub fn op_transaction_inner( // That is, if the transaction had not started, end the read transaction so that next time we // start a new one. if matches!(current_state, TransactionState::None) { - pager.end_read_tx()?; + pager.end_read_tx(); conn.set_tx_state(TransactionState::None); } assert_eq!(conn.get_tx_state(), current_state); @@ -2456,10 +2456,10 @@ pub fn op_auto_commit( // TODO(pere): add rollback I/O logic once we implement rollback journal if let Some(mv_store) = mv_store { if let Some(tx_id) = conn.get_mv_tx_id() { - mv_store.rollback_tx(tx_id, pager.clone(), &conn)?; + mv_store.rollback_tx(tx_id, pager.clone(), &conn); } } else { - return_if_io!(pager.end_tx(true, &conn)); + pager.rollback_tx(&conn); } conn.set_tx_state(TransactionState::None); conn.auto_commit.store(true, Ordering::SeqCst); diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index fa1a88df8..ad3c7ad3b 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -30,7 +30,7 @@ use crate::{ function::{AggFunc, FuncCtx}, mvcc::{database::CommitStateMachine, LocalClock}, state_machine::StateMachine, - storage::sqlite3_ondisk::SmallVec, + storage::{pager::PagerCommitResult, sqlite3_ondisk::SmallVec}, translate::{collate::CollationSeq, plan::TableReferences}, types::{IOCompletions, IOResult, RawSlice, TextRef}, vdbe::{ @@ -41,7 +41,7 @@ use crate::{ }, metrics::StatementMetrics, }, - IOExt, RefValue, + RefValue, }; use crate::{ @@ -533,7 +533,7 @@ impl Program { // Connection is closed for whatever reason, rollback the transaction. let state = self.connection.get_tx_state(); if let TransactionState::Write { .. } = state { - pager.io.block(|| pager.end_tx(true, &self.connection))?; + pager.rollback_tx(&self.connection); } return Err(LimboError::InternalError("Connection closed".to_string())); } @@ -588,7 +588,7 @@ impl Program { // Connection is closed for whatever reason, rollback the transaction. let state = self.connection.get_tx_state(); if let TransactionState::Write { .. } = state { - pager.io.block(|| pager.end_tx(true, &self.connection))?; + pager.rollback_tx(&self.connection); } return Err(LimboError::InternalError("Connection closed".to_string())); } @@ -636,7 +636,7 @@ impl Program { // Connection is closed for whatever reason, rollback the transaction. let state = self.connection.get_tx_state(); if let TransactionState::Write { .. } = state { - pager.io.block(|| pager.end_tx(true, &self.connection))?; + pager.rollback_tx(&self.connection); } return Err(LimboError::InternalError("Connection closed".to_string())); } @@ -888,7 +888,7 @@ impl Program { ), TransactionState::Read => { connection.set_tx_state(TransactionState::None); - pager.end_read_tx()?; + pager.end_read_tx(); Ok(IOResult::Done(())) } TransactionState::None => Ok(IOResult::Done(())), @@ -914,7 +914,12 @@ impl Program { connection: &Connection, rollback: bool, ) -> Result> { - let cacheflush_status = pager.end_tx(rollback, connection)?; + let cacheflush_status = if !rollback { + pager.commit_tx(connection)? + } else { + pager.rollback_tx(connection); + IOResult::Done(PagerCommitResult::Rollback) + }; match cacheflush_status { IOResult::Done(_) => { if self.change_cnt_on { From 48ca3864b86da611afac297f1c71e489d00bdb5d Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 6 Oct 2025 13:22:26 +0400 Subject: [PATCH 3/8] properly abort statement in case of reset (when statement wasn't executed till completion) and interrupt --- core/lib.rs | 2 + core/vdbe/builder.rs | 2 + core/vdbe/mod.rs | 101 +++++++++++++++++++++++++------------------ 3 files changed, 63 insertions(+), 42 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index f45715dc2..afbcfa09b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -2722,6 +2722,8 @@ impl Statement { pub fn _reset(&mut self, max_registers: Option, max_cursors: Option) { self.state.reset(max_registers, max_cursors); + self.program + .abort(self.mv_store.as_ref(), &self.pager, None); self.busy = false; self.busy_timeout = None; } diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 3d1a333ec..c2b6741aa 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -16,6 +16,7 @@ use crate::{ expr::ParamState, plan::{ResultSetColumn, TableReferences}, }, + vdbe::PROGRAM_STATE_ACTIVE, CaptureDataChangesMode, Connection, Value, VirtualTable, }; @@ -1019,6 +1020,7 @@ impl ProgramBuilder { table_references: self.table_references, sql: sql.to_string(), accesses_db: !matches!(self.txn_mode, TransactionMode::None), + program_state: PROGRAM_STATE_ACTIVE.into(), } } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index ad3c7ad3b..ed7fa4a12 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -66,7 +66,7 @@ use std::{ collections::HashMap, num::NonZero, sync::{ - atomic::{AtomicI64, Ordering}, + atomic::{AtomicI64, AtomicU32, Ordering}, Arc, }, }; @@ -483,6 +483,10 @@ macro_rules! get_cursor { }; } +const PROGRAM_STATE_ACTIVE: u32 = 0; +const PROGRAM_STATE_ABORTED: u32 = 1; +const PROGRAM_STATE_DONE: u32 = 2; + pub struct Program { pub max_registers: usize, // we store original indices because we don't want to create new vec from @@ -501,6 +505,9 @@ pub struct Program { /// Used to determine whether we need to check for schema changes when /// starting a transaction. pub accesses_db: bool, + /// Current state of the program + /// Used to execute abort only once + pub program_state: AtomicU32, } impl Program { @@ -641,6 +648,7 @@ impl Program { return Err(LimboError::InternalError("Connection closed".to_string())); } if state.is_interrupted() { + self.abort(mv_store, &pager, None); return Ok(StepResult::Interrupt); } if let Some(io) = &state.io_completions { @@ -649,7 +657,7 @@ impl Program { } if let Some(err) = io.get_error() { let err = err.into(); - handle_program_error(&pager, &self.connection, &err, mv_store)?; + self.abort(mv_store, &pager, Some(&err)); return Err(err); } state.io_completions = None; @@ -672,6 +680,8 @@ impl Program { Ok(InsnFunctionStepResult::Done) => { // Instruction completed execution state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1); + self.program_state + .store(PROGRAM_STATE_DONE, Ordering::SeqCst); return Ok(StepResult::Done); } Ok(InsnFunctionStepResult::IO(io)) => { @@ -693,7 +703,7 @@ impl Program { return Ok(StepResult::Busy); } Err(err) => { - handle_program_error(&pager, &self.connection, &err, mv_store)?; + self.abort(mv_store, &pager, Some(&err)); return Err(err); } } @@ -946,6 +956,52 @@ impl Program { ) -> Result> { commit_state.step(mv_store) } + + /// Aborts the program due to various conditions (explicit error, interrupt or reset of unfinished statement) by rolling back the transaction + /// This method is no-op if program was already finished (either aborted or executed to completion) + pub fn abort( + &self, + mv_store: Option<&Arc>, + pager: &Arc, + err: Option<&LimboError>, + ) { + let Ok(..) = self.program_state.compare_exchange( + PROGRAM_STATE_ACTIVE, + PROGRAM_STATE_ABORTED, + Ordering::SeqCst, + Ordering::SeqCst, + ) else { + // no need to abort: program was either already aborted or executed to completion successfully + return; + }; + + if self.connection.is_nested_stmt.load(Ordering::SeqCst) { + // Errors from nested statements are handled by the parent statement. + return; + } + if self.connection.get_tx_state() == TransactionState::None { + return; + } + match err { + // Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback. + Some(LimboError::TxError(_)) => {} + // Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback. + Some(LimboError::TableLocked) => {} + // Busy errors do not cause a rollback. + Some(LimboError::Busy) => {} + _ => { + if let Some(mv_store) = mv_store { + if let Some(tx_id) = self.connection.get_mv_tx_id() { + self.connection.auto_commit.store(true, Ordering::SeqCst); + mv_store.rollback_tx(tx_id, pager.clone(), &self.connection); + } + } else { + pager.rollback_tx(&self.connection); + } + self.connection.set_tx_state(TransactionState::None); + } + } + } } fn make_record(registers: &[Register], start_reg: &usize, count: &usize) -> ImmutableRecord { @@ -1068,42 +1124,3 @@ impl Row { self.count } } - -/// Handle a program error by rolling back the transaction -pub fn handle_program_error( - pager: &Arc, - connection: &Connection, - err: &LimboError, - mv_store: Option<&Arc>, -) -> Result<()> { - if connection.is_nested_stmt.load(Ordering::SeqCst) { - // Errors from nested statements are handled by the parent statement. - return Ok(()); - } - match err { - // Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback. - LimboError::TxError(_) => {} - // Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback. - LimboError::TableLocked => {} - // Busy errors do not cause a rollback. - LimboError::Busy => {} - _ => { - if let Some(mv_store) = mv_store { - if let Some(tx_id) = connection.get_mv_tx_id() { - connection.set_tx_state(TransactionState::None); - connection.auto_commit.store(true, Ordering::SeqCst); - mv_store.rollback_tx(tx_id, pager.clone(), connection)?; - } - } else { - pager - .io - .block(|| pager.end_tx(true, connection)) - .inspect_err(|e| { - tracing::error!("end_tx failed: {e}"); - })?; - } - connection.set_tx_state(TransactionState::None); - } - } - Ok(()) -} From a3ca5f6bf26c22f9adc94e1bfe5382dce03a6fa7 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 6 Oct 2025 13:27:42 +0400 Subject: [PATCH 4/8] implement Drop for Statement --- core/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/lib.rs b/core/lib.rs index afbcfa09b..d4818f3d7 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -2741,6 +2741,12 @@ impl Statement { } } +impl Drop for Statement { + fn drop(&mut self) { + self.reset(); + } +} + pub type Row = vdbe::Row; pub type StepResult = vdbe::StepResult; From afe9a1949641934bb1bcf8898c195ecc9fac1fd4 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 6 Oct 2025 13:30:05 +0400 Subject: [PATCH 5/8] add simple integration test --- .../query_processing/test_read_path.rs | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/tests/integration/query_processing/test_read_path.rs b/tests/integration/query_processing/test_read_path.rs index a0b72cd60..98874283b 100644 --- a/tests/integration/query_processing/test_read_path.rs +++ b/tests/integration/query_processing/test_read_path.rs @@ -1,4 +1,4 @@ -use crate::common::TempDatabase; +use crate::common::{limbo_exec_rows, TempDatabase}; use turso_core::{StepResult, Value}; #[test] @@ -876,3 +876,24 @@ fn test_upsert_parameters_order() -> anyhow::Result<()> { ); Ok(()) } + +#[test] +fn test_multiple_connections_visibility() -> anyhow::Result<()> { + let tmp_db = TempDatabase::new_with_rusqlite( + "CREATE TABLE test (k INTEGER PRIMARY KEY, v INTEGER);", + false, + ); + let conn1 = tmp_db.connect_limbo(); + let conn2 = tmp_db.connect_limbo(); + conn1.execute("BEGIN")?; + conn1.execute("INSERT INTO test VALUES (1, 2), (3, 4)")?; + let mut stmt = conn2.prepare("SELECT COUNT(*) FROM test").unwrap(); + let _ = stmt.step().unwrap(); + // intentionally drop not-fully-consumed statement in order to check that on Drop statement will execute reset with proper cleanup + drop(stmt); + conn1.execute("COMMIT")?; + + let rows = limbo_exec_rows(&tmp_db, &conn2, "SELECT COUNT(*) FROM test"); + assert_eq!(rows, vec![vec![rusqlite::types::Value::Integer(2)]]); + Ok(()) +} From 4877180784acfad61dde30bd0325406f16497765 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 6 Oct 2025 13:34:16 +0400 Subject: [PATCH 6/8] fix clippy --- core/lib.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index d4818f3d7..75b85849d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -497,10 +497,7 @@ impl Database { schema.schema_version = header_schema_cookie; let result = schema .make_from_btree(None, pager.clone(), &syms) - .or_else(|e| { - pager.end_read_tx(); - Err(e) - }); + .inspect_err(|_| pager.end_read_tx()); if let Err(LimboError::ExtensionError(e)) = result { // this means that a vtab exists and we no longer have the module loaded. we print // a warning to the user to load the module From 0ace1f9d903edfbb4f08199c7d81bb6b92d414fc Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 6 Oct 2025 15:10:37 +0400 Subject: [PATCH 7/8] fix code in order to not reset internal prepared statements created during DDL execution --- core/lib.rs | 14 +++++++++++--- core/vdbe/execute.rs | 10 ++++++++-- core/vdbe/mod.rs | 6 +++--- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 75b85849d..f331a8952 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -47,6 +47,7 @@ use crate::types::{WalFrameInfo, WalState}; #[cfg(feature = "fs")] use crate::util::{OpenMode, OpenOptions}; use crate::vdbe::metrics::ConnectionMetrics; +use crate::vdbe::PROGRAM_STATE_DONE; use crate::vtab::VirtualTable; use crate::{incremental::view::AllViewsTxState, translate::emitter::TransactionMode}; use core::str; @@ -2572,6 +2573,7 @@ impl Statement { fn reprepare(&mut self) -> Result<()> { tracing::trace!("repreparing statement"); let conn = self.program.connection.clone(); + *conn.schema.write() = conn.db.clone_schema(); self.program = { let mut parser = Parser::new(self.program.sql.as_bytes()); @@ -2600,7 +2602,7 @@ impl Statement { QueryMode::Explain => (EXPLAIN_COLUMNS.len(), 0), QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0), }; - self._reset(Some(max_registers), Some(cursor_count)); + self.reset_internal(Some(max_registers), Some(cursor_count)); // Load the parameters back into the state self.state.parameters = parameters; Ok(()) @@ -2714,10 +2716,16 @@ impl Statement { } pub fn reset(&mut self) { - self._reset(None, None); + self.reset_internal(None, None); } - pub fn _reset(&mut self, max_registers: Option, max_cursors: Option) { + pub(crate) fn mark_as_done(&self) { + self.program + .program_state + .store(PROGRAM_STATE_DONE, Ordering::SeqCst); + } + + fn reset_internal(&mut self, max_registers: Option, max_cursors: Option) { self.state.reset(max_registers, max_cursors); self.program .abort(self.mv_store.as_ref(), &self.pager, None); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index ec5d6f3f9..d3ef412f1 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -8018,12 +8018,15 @@ pub fn op_drop_column( let schema = conn.schema.read(); for (view_name, view) in schema.views.iter() { let view_select_sql = format!("SELECT * FROM {view_name}"); - conn.prepare(view_select_sql.as_str()).map_err(|e| { + let stmt = conn.prepare(view_select_sql.as_str()).map_err(|e| { LimboError::ParseError(format!( "cannot drop column \"{}\": referenced in VIEW {view_name}: {}", column_name, view.sql, )) })?; + // this is internal statement running during active Program execution + // so, we must not interact with transaction state and explicitly mark this statement as done avoiding cleanup on reset + stmt.mark_as_done(); } } @@ -8149,12 +8152,15 @@ pub fn op_alter_column( for (view_name, view) in schema.views.iter() { let view_select_sql = format!("SELECT * FROM {view_name}"); // FIXME: this should rewrite the view to reference the new column name - conn.prepare(view_select_sql.as_str()).map_err(|e| { + let stmt = conn.prepare(view_select_sql.as_str()).map_err(|e| { LimboError::ParseError(format!( "cannot rename column \"{}\": referenced in VIEW {view_name}: {}", old_column_name, view.sql, )) })?; + // this is internal statement running during active Program execution + // so, we must not interact with transaction state and explicitly mark this statement as done avoiding cleanup on reset + stmt.mark_as_done(); } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index ed7fa4a12..ed09762e1 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -483,9 +483,9 @@ macro_rules! get_cursor { }; } -const PROGRAM_STATE_ACTIVE: u32 = 0; -const PROGRAM_STATE_ABORTED: u32 = 1; -const PROGRAM_STATE_DONE: u32 = 2; +pub(crate) const PROGRAM_STATE_ACTIVE: u32 = 1; +pub(crate) const PROGRAM_STATE_ABORTED: u32 = 2; +pub(crate) const PROGRAM_STATE_DONE: u32 = 3; pub struct Program { pub max_registers: usize, From e2f73106177aa7f52fd7bea84f562edb37daa8e0 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Mon, 6 Oct 2025 17:51:43 +0400 Subject: [PATCH 8/8] add explicit tracker for Txn cleanup necessary for statement --- core/incremental/expr_compiler.rs | 5 -- core/lib.rs | 28 +++--- core/mvcc/database/mod.rs | 4 +- core/vdbe/builder.rs | 2 - core/vdbe/execute.rs | 19 ++-- core/vdbe/mod.rs | 88 ++++++++----------- .../query_processing/test_transactions.rs | 1 + 7 files changed, 62 insertions(+), 85 deletions(-) diff --git a/core/incremental/expr_compiler.rs b/core/incremental/expr_compiler.rs index 44b2cef49..f823c870d 100644 --- a/core/incremental/expr_compiler.rs +++ b/core/incremental/expr_compiler.rs @@ -458,11 +458,6 @@ impl CompiledExpression { "Expression evaluation produced unexpected row".to_string(), )); } - crate::vdbe::execute::InsnFunctionStepResult::Interrupt => { - return Err(crate::LimboError::InternalError( - "Expression evaluation was interrupted".to_string(), - )); - } crate::vdbe::execute::InsnFunctionStepResult::Step => { pc = state.pc as usize; } diff --git a/core/lib.rs b/core/lib.rs index f331a8952..44297643b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -47,7 +47,6 @@ use crate::types::{WalFrameInfo, WalState}; #[cfg(feature = "fs")] use crate::util::{OpenMode, OpenOptions}; use crate::vdbe::metrics::ConnectionMetrics; -use crate::vdbe::PROGRAM_STATE_DONE; use crate::vtab::VirtualTable; use crate::{incremental::view::AllViewsTxState, translate::emitter::TransactionMode}; use core::str; @@ -2424,6 +2423,12 @@ pub struct Statement { busy_timeout: Option, } +impl Drop for Statement { + fn drop(&mut self) { + self.reset(); + } +} + impl Statement { pub fn new( program: vdbe::Program, @@ -2719,16 +2724,15 @@ impl Statement { self.reset_internal(None, None); } - pub(crate) fn mark_as_done(&self) { - self.program - .program_state - .store(PROGRAM_STATE_DONE, Ordering::SeqCst); - } - fn reset_internal(&mut self, max_registers: Option, max_cursors: Option) { + // as abort uses auto_txn_cleanup value - it needs to be called before state.reset + self.program.abort( + self.mv_store.as_ref(), + &self.pager, + None, + &mut self.state.auto_txn_cleanup, + ); self.state.reset(max_registers, max_cursors); - self.program - .abort(self.mv_store.as_ref(), &self.pager, None); self.busy = false; self.busy_timeout = None; } @@ -2746,12 +2750,6 @@ impl Statement { } } -impl Drop for Statement { - fn drop(&mut self) { - self.reset(); - } -} - pub type Row = vdbe::Row; pub type StepResult = vdbe::StepResult; diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index a03fba7ba..d9fd32f4b 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1342,7 +1342,7 @@ impl MvStore { &self, pager: Arc, maybe_existing_tx_id: Option, - ) -> Result> { + ) -> Result { if !self.blocking_checkpoint_lock.read() { // If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all. return Err(LimboError::Busy); @@ -1378,7 +1378,7 @@ impl MvStore { ); tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id); self.txs.insert(tx_id, tx); - Ok(IOResult::Done(tx_id)) + Ok(tx_id) } /// Begins a new transaction in the database. diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index c2b6741aa..3d1a333ec 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -16,7 +16,6 @@ use crate::{ expr::ParamState, plan::{ResultSetColumn, TableReferences}, }, - vdbe::PROGRAM_STATE_ACTIVE, CaptureDataChangesMode, Connection, Value, VirtualTable, }; @@ -1020,7 +1019,6 @@ impl ProgramBuilder { table_references: self.table_references, sql: sql.to_string(), accesses_db: !matches!(self.txn_mode, TransactionMode::None), - program_state: PROGRAM_STATE_ACTIVE.into(), } } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index d3ef412f1..01def8029 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -19,7 +19,7 @@ use crate::types::{ }; use crate::util::normalize_ident; use crate::vdbe::insn::InsertFlags; -use crate::vdbe::registers_to_ref_values; +use crate::vdbe::{registers_to_ref_values, TxnCleanup}; use crate::vector::{vector_concat, vector_slice}; use crate::{ error::{ @@ -157,7 +157,6 @@ pub enum InsnFunctionStepResult { Done, IO(IOCompletions), Row, - Interrupt, Step, } @@ -2328,7 +2327,7 @@ pub fn op_transaction_inner( | TransactionMode::Read | TransactionMode::Concurrent => mv_store.begin_tx(pager.clone())?, TransactionMode::Write => { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) + mv_store.begin_exclusive_tx(pager.clone(), None)? } }; *program.connection.mv_tx.write() = Some((tx_id, *tx_mode)); @@ -2343,7 +2342,7 @@ pub fn op_transaction_inner( if matches!(new_transaction_state, TransactionState::Write { .. }) && matches!(actual_tx_mode, TransactionMode::Write) { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))); + mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))?; } } } else { @@ -2359,6 +2358,7 @@ pub fn op_transaction_inner( "nested stmt should not begin a new read transaction" ); pager.begin_read_tx()?; + state.auto_txn_cleanup = TxnCleanup::RollbackTxn; } if updated && matches!(new_transaction_state, TransactionState::Write { .. }) { @@ -2374,6 +2374,7 @@ pub fn op_transaction_inner( if matches!(current_state, TransactionState::None) { pager.end_read_tx(); conn.set_tx_state(TransactionState::None); + state.auto_txn_cleanup = TxnCleanup::None; } assert_eq!(conn.get_tx_state(), current_state); return Err(LimboError::Busy); @@ -8018,15 +8019,12 @@ pub fn op_drop_column( let schema = conn.schema.read(); for (view_name, view) in schema.views.iter() { let view_select_sql = format!("SELECT * FROM {view_name}"); - let stmt = conn.prepare(view_select_sql.as_str()).map_err(|e| { + let _ = conn.prepare(view_select_sql.as_str()).map_err(|e| { LimboError::ParseError(format!( "cannot drop column \"{}\": referenced in VIEW {view_name}: {}", column_name, view.sql, )) })?; - // this is internal statement running during active Program execution - // so, we must not interact with transaction state and explicitly mark this statement as done avoiding cleanup on reset - stmt.mark_as_done(); } } @@ -8152,15 +8150,12 @@ pub fn op_alter_column( for (view_name, view) in schema.views.iter() { let view_select_sql = format!("SELECT * FROM {view_name}"); // FIXME: this should rewrite the view to reference the new column name - let stmt = conn.prepare(view_select_sql.as_str()).map_err(|e| { + let _ = conn.prepare(view_select_sql.as_str()).map_err(|e| { LimboError::ParseError(format!( "cannot rename column \"{}\": referenced in VIEW {view_name}: {}", old_column_name, view.sql, )) })?; - // this is internal statement running during active Program execution - // so, we must not interact with transaction state and explicitly mark this statement as done avoiding cleanup on reset - stmt.mark_as_done(); } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index ed09762e1..4b11c8c1d 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -66,7 +66,7 @@ use std::{ collections::HashMap, num::NonZero, sync::{ - atomic::{AtomicI64, AtomicU32, Ordering}, + atomic::{AtomicI64, Ordering}, Arc, }, }; @@ -265,6 +265,12 @@ pub struct Row { count: usize, } +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum TxnCleanup { + None, + RollbackTxn, +} + /// The program state describes the environment in which the program executes. pub struct ProgramState { pub io_completions: Option, @@ -302,6 +308,10 @@ pub struct ProgramState { op_checkpoint_state: OpCheckpointState, /// State machine for committing view deltas with I/O handling view_delta_state: ViewDeltaCommitState, + /// Marker which tells about auto transaction cleanup necessary for that connection in case of reset + /// This is used when statement in auto-commit mode reseted after previous uncomplete execution - in which case we may need to rollback transaction started on previous attempt + /// Note, that MVCC transactions are always explicit - so they do not update auto_txn_cleanup marker + pub(crate) auto_txn_cleanup: TxnCleanup, } impl ProgramState { @@ -346,6 +356,7 @@ impl ProgramState { op_transaction_state: OpTransactionState::Start, op_checkpoint_state: OpCheckpointState::StartCheckpoint, view_delta_state: ViewDeltaCommitState::NotStarted, + auto_txn_cleanup: TxnCleanup::None, } } @@ -428,6 +439,7 @@ impl ProgramState { self.op_column_state = OpColumnState::Start; self.op_row_id_state = OpRowIdState::Start; self.view_delta_state = ViewDeltaCommitState::NotStarted; + self.auto_txn_cleanup = TxnCleanup::None; } pub fn get_cursor(&mut self, cursor_id: CursorID) -> &mut Cursor { @@ -483,10 +495,6 @@ macro_rules! get_cursor { }; } -pub(crate) const PROGRAM_STATE_ACTIVE: u32 = 1; -pub(crate) const PROGRAM_STATE_ABORTED: u32 = 2; -pub(crate) const PROGRAM_STATE_DONE: u32 = 3; - pub struct Program { pub max_registers: usize, // we store original indices because we don't want to create new vec from @@ -505,9 +513,6 @@ pub struct Program { /// Used to determine whether we need to check for schema changes when /// starting a transaction. pub accesses_db: bool, - /// Current state of the program - /// Used to execute abort only once - pub program_state: AtomicU32, } impl Program { @@ -648,7 +653,7 @@ impl Program { return Err(LimboError::InternalError("Connection closed".to_string())); } if state.is_interrupted() { - self.abort(mv_store, &pager, None); + self.abort(mv_store, &pager, None, &mut state.auto_txn_cleanup); return Ok(StepResult::Interrupt); } if let Some(io) = &state.io_completions { @@ -657,7 +662,7 @@ impl Program { } if let Some(err) = io.get_error() { let err = err.into(); - self.abort(mv_store, &pager, Some(&err)); + self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup); return Err(err); } state.io_completions = None; @@ -680,8 +685,7 @@ impl Program { Ok(InsnFunctionStepResult::Done) => { // Instruction completed execution state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1); - self.program_state - .store(PROGRAM_STATE_DONE, Ordering::SeqCst); + state.auto_txn_cleanup = TxnCleanup::None; return Ok(StepResult::Done); } Ok(InsnFunctionStepResult::IO(io)) => { @@ -694,16 +698,12 @@ impl Program { state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1); return Ok(StepResult::Row); } - Ok(InsnFunctionStepResult::Interrupt) => { - // Instruction interrupted - may resume at same PC - return Ok(StepResult::Interrupt); - } Err(LimboError::Busy) => { // Instruction blocked - will retry at same PC return Ok(StepResult::Busy); } Err(err) => { - self.abort(mv_store, &pager, Some(&err)); + self.abort(mv_store, &pager, Some(&err), &mut state.auto_txn_cleanup); return Err(err); } } @@ -964,43 +964,33 @@ impl Program { mv_store: Option<&Arc>, pager: &Arc, err: Option<&LimboError>, + cleanup: &mut TxnCleanup, ) { - let Ok(..) = self.program_state.compare_exchange( - PROGRAM_STATE_ACTIVE, - PROGRAM_STATE_ABORTED, - Ordering::SeqCst, - Ordering::SeqCst, - ) else { - // no need to abort: program was either already aborted or executed to completion successfully - return; - }; - - if self.connection.is_nested_stmt.load(Ordering::SeqCst) { - // Errors from nested statements are handled by the parent statement. - return; - } - if self.connection.get_tx_state() == TransactionState::None { - return; - } - match err { - // Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback. - Some(LimboError::TxError(_)) => {} - // Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback. - Some(LimboError::TableLocked) => {} - // Busy errors do not cause a rollback. - Some(LimboError::Busy) => {} - _ => { - if let Some(mv_store) = mv_store { - if let Some(tx_id) = self.connection.get_mv_tx_id() { - self.connection.auto_commit.store(true, Ordering::SeqCst); - mv_store.rollback_tx(tx_id, pager.clone(), &self.connection); + // Errors from nested statements are handled by the parent statement. + if !self.connection.is_nested_stmt.load(Ordering::SeqCst) { + match err { + // Transaction errors, e.g. trying to start a nested transaction, do not cause a rollback. + Some(LimboError::TxError(_)) => {} + // Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback. + Some(LimboError::TableLocked) => {} + // Busy errors do not cause a rollback. + Some(LimboError::Busy) => {} + _ => { + if *cleanup != TxnCleanup::None || err.is_some() { + if let Some(mv_store) = mv_store { + if let Some(tx_id) = self.connection.get_mv_tx_id() { + self.connection.auto_commit.store(true, Ordering::SeqCst); + mv_store.rollback_tx(tx_id, pager.clone(), &self.connection); + } + } else { + pager.rollback_tx(&self.connection); + } + self.connection.set_tx_state(TransactionState::None); } - } else { - pager.rollback_tx(&self.connection); } - self.connection.set_tx_state(TransactionState::None); } } + *cleanup = TxnCleanup::None; } } diff --git a/tests/integration/query_processing/test_transactions.rs b/tests/integration/query_processing/test_transactions.rs index 53ab7f00e..a96235153 100644 --- a/tests/integration/query_processing/test_transactions.rs +++ b/tests/integration/query_processing/test_transactions.rs @@ -95,6 +95,7 @@ fn test_deferred_transaction_no_restart() { .execute("INSERT INTO test (id, value) VALUES (2, 'second')") .unwrap(); conn2.execute("COMMIT").unwrap(); + drop(stmt); let mut stmt = conn1.query("SELECT COUNT(*) FROM test").unwrap().unwrap(); if let StepResult::Row = stmt.step().unwrap() {