diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index efc91f6e5..6eaefd446 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1262,19 +1262,50 @@ impl MvStore { /// /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need /// to ensure exclusive write access as per SQLite semantics. - pub fn begin_exclusive_tx(&self, pager: Rc) -> Result> { - let tx_id = self.get_tx_id(); + pub fn begin_exclusive_tx( + &self, + pager: Rc, + maybe_existing_tx_id: Option, + ) -> Result> { + self._begin_exclusive_tx(pager, false, maybe_existing_tx_id) + } + + /// Upgrades a read transaction to an exclusive write transaction. + /// + /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need + /// to ensure exclusive write access as per SQLite semantics. + pub fn upgrade_to_exclusive_tx( + &self, + pager: Rc, + maybe_existing_tx_id: Option, + ) -> Result> { + self._begin_exclusive_tx(pager, true, maybe_existing_tx_id) + } + + /// Begins an exclusive write transaction that prevents concurrent writes. + /// + /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need + /// to ensure exclusive write access as per SQLite semantics. + fn _begin_exclusive_tx( + &self, + pager: Rc, + is_upgrade_from_read: bool, + maybe_existing_tx_id: Option, + ) -> Result> { + let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id()); let begin_ts = self.get_timestamp(); self.acquire_exclusive_tx(&tx_id)?; // Try to acquire the pager read lock - match pager.begin_read_tx()? { - LimboResult::Busy => { - self.release_exclusive_tx(&tx_id); - return Err(LimboError::Busy); + if !is_upgrade_from_read { + match pager.begin_read_tx()? { + LimboResult::Busy => { + self.release_exclusive_tx(&tx_id); + return Err(LimboError::Busy); + } + LimboResult::Ok => {} } - LimboResult::Ok => {} } let locked = self.commit_coordinator.pager_commit_lock.write(); if !locked { @@ -1287,7 +1318,9 @@ impl MvStore { LimboResult::Busy => { tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id); // Failed to get pager lock - release our exclusive lock - panic!("begin_exclusive_tx: tx_id={tx_id} failed with Busy, this should never happen as we were able to lock mvcc exclusive write lock"); + self.commit_coordinator.pager_commit_lock.unlock(); + self.release_exclusive_tx(&tx_id); + return Err(LimboError::Busy); } LimboResult::Ok => { let tx = Transaction::new(tx_id, begin_ts); @@ -1336,7 +1369,6 @@ impl MvStore { pager: Rc, connection: &Arc, ) -> Result>> { - tracing::trace!("commit_tx(tx_id={})", tx_id); let state_machine: StateMachine> = StateMachine::>::new(CommitStateMachine::new( CommitState::Initial, diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index ea51ec8e9..2bbaea1d7 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2171,7 +2171,7 @@ pub fn op_transaction( mv_store.begin_tx(pager.clone()) } TransactionMode::Write => { - return_if_io!(mv_store.begin_exclusive_tx(pager.clone())) + return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None)) } }; conn.mv_transactions.borrow_mut().push(tx_id); @@ -2180,11 +2180,13 @@ pub fn op_transaction( && matches!(new_transaction_state, TransactionState::Write { .. }) && matches!(tx_mode, TransactionMode::Write) { - // For MVCC with concurrent transactions, we don't need to upgrade to exclusive. - // The existing MVCC transaction can handle both reads and writes. - // We only upgrade to exclusive for IMMEDIATE/EXCLUSIVE transaction modes. - // Since we already have an MVCC transaction from BEGIN CONCURRENT, - // we can just continue using it for writes. + let is_upgrade_from_read = matches!(current_state, TransactionState::Read); + let tx_id = program.connection.mv_tx_id.get().unwrap(); + if is_upgrade_from_read { + return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id))); + } else { + return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))); + } } } else { if matches!(tx_mode, TransactionMode::Concurrent) { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 52b7bf080..1822813de 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -1082,6 +1082,8 @@ pub fn handle_program_error( LimboError::TxError(_) => {} // Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback. LimboError::TableLocked => {} + // Busy errors do not cause a rollback. + LimboError::Busy => {} _ => { if let Some(mv_store) = mv_store { if let Some(tx_id) = connection.mv_tx_id.get() {