From 5feb9ea2f03986d9a477076d98063f8948abea85 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Sun, 14 Sep 2025 21:23:06 +0300 Subject: [PATCH] mvcc: fix non-concurrent transaction semantics on the main branch, mvcc allows concurrent inserts from multiple txns even without BEGIN CONCURRENT, and then always hangs whenever one of the txns tries to commit. this commit fixes that issue. --- core/mvcc/database/mod.rs | 50 ++++++++++++++++++++++++++++++++------- core/vdbe/execute.rs | 14 ++++++----- core/vdbe/mod.rs | 2 ++ 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index efc91f6e5..6eaefd446 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1262,19 +1262,50 @@ impl MvStore { /// /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need /// to ensure exclusive write access as per SQLite semantics. - pub fn begin_exclusive_tx(&self, pager: Rc) -> Result> { - let tx_id = self.get_tx_id(); + pub fn begin_exclusive_tx( + &self, + pager: Rc, + maybe_existing_tx_id: Option, + ) -> Result> { + self._begin_exclusive_tx(pager, false, maybe_existing_tx_id) + } + + /// Upgrades a read transaction to an exclusive write transaction. + /// + /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need + /// to ensure exclusive write access as per SQLite semantics. + pub fn upgrade_to_exclusive_tx( + &self, + pager: Rc, + maybe_existing_tx_id: Option, + ) -> Result> { + self._begin_exclusive_tx(pager, true, maybe_existing_tx_id) + } + + /// Begins an exclusive write transaction that prevents concurrent writes. + /// + /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need + /// to ensure exclusive write access as per SQLite semantics. + fn _begin_exclusive_tx( + &self, + pager: Rc, + is_upgrade_from_read: bool, + maybe_existing_tx_id: Option, + ) -> Result> { + let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id()); let begin_ts = self.get_timestamp(); self.acquire_exclusive_tx(&tx_id)?; // Try to acquire the pager read lock - match pager.begin_read_tx()? { - LimboResult::Busy => { - self.release_exclusive_tx(&tx_id); - return Err(LimboError::Busy); + if !is_upgrade_from_read { + match pager.begin_read_tx()? { + LimboResult::Busy => { + self.release_exclusive_tx(&tx_id); + return Err(LimboError::Busy); + } + LimboResult::Ok => {} } - LimboResult::Ok => {} } let locked = self.commit_coordinator.pager_commit_lock.write(); if !locked { @@ -1287,7 +1318,9 @@ impl MvStore { LimboResult::Busy => { tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id); // Failed to get pager lock - release our exclusive lock - panic!("begin_exclusive_tx: tx_id={tx_id} failed with Busy, this should never happen as we were able to lock mvcc exclusive write lock"); + self.commit_coordinator.pager_commit_lock.unlock(); + self.release_exclusive_tx(&tx_id); + return Err(LimboError::Busy); } LimboResult::Ok => { let tx = Transaction::new(tx_id, begin_ts); @@ -1336,7 +1369,6 @@ impl MvStore { pager: Rc, connection: &Arc, ) -> Result>> { - tracing::trace!("commit_tx(tx_id={})", tx_id); let state_machine: StateMachine> = StateMachine::>::new(CommitStateMachine::new( CommitState::Initial, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index ea51ec8e9..2bbaea1d7 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2171,7 +2171,7 @@ pub fn op_transaction( mv_store.begin_tx(pager.clone()) } TransactionMode::Write => { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone())) + return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) } }; conn.mv_transactions.borrow_mut().push(tx_id); @@ -2180,11 +2180,13 @@ pub fn op_transaction( && matches!(new_transaction_state, TransactionState::Write { .. }) && matches!(tx_mode, TransactionMode::Write) { - // For MVCC with concurrent transactions, we don't need to upgrade to exclusive. - // The existing MVCC transaction can handle both reads and writes. - // We only upgrade to exclusive for IMMEDIATE/EXCLUSIVE transaction modes. - // Since we already have an MVCC transaction from BEGIN CONCURRENT, - // we can just continue using it for writes. + let is_upgrade_from_read = matches!(current_state, TransactionState::Read); + let tx_id = program.connection.mv_tx_id.get().unwrap(); + if is_upgrade_from_read { + return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id))); + } else { + return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))); + } } } else { if matches!(tx_mode, TransactionMode::Concurrent) { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 52b7bf080..1822813de 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -1082,6 +1082,8 @@ pub fn handle_program_error( LimboError::TxError(_) => {} // Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback. LimboError::TableLocked => {} + // Busy errors do not cause a rollback. + LimboError::Busy => {} _ => { if let Some(mv_store) = mv_store { if let Some(tx_id) = connection.mv_tx_id.get() {