diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 54cd1d0cf..547cd9e95 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1,6 +1,5 @@ use crate::mvcc::clock::LogicalClock; use crate::mvcc::persistent_storage::Storage; -use crate::return_if_io; use crate::state_machine::StateMachine; use crate::state_machine::StateTransition; use crate::state_machine::TransitionResult; @@ -542,24 +541,11 @@ impl StateTransition for CommitStateMachine { if mvcc_store.is_exclusive_tx(&self.tx_id) { mvcc_store.release_exclusive_tx(&self.tx_id); self.commit_coordinator.pager_commit_lock.unlock(); - if !mvcc_store.storage.is_logical_log() { - // FIXME: this function isnt re-entrant - self.pager - .io - .block(|| self.pager.end_tx(false, &self.connection))?; - } - } else if !mvcc_store.storage.is_logical_log() { - self.pager.end_read_tx()?; } self.finalize(mvcc_store)?; return Ok(TransitionResult::Done(())); } - if mvcc_store.storage.is_logical_log() { - self.state = CommitState::Commit { end_ts }; - return Ok(TransitionResult::Continue); - } else { - self.state = CommitState::BeginPagerTxn { end_ts }; - } + self.state = CommitState::Commit { end_ts }; Ok(TransitionResult::Continue) } CommitState::BeginPagerTxn { end_ts } => { @@ -851,7 +837,6 @@ impl StateTransition for CommitStateMachine { return Ok(TransitionResult::Continue); } CommitState::BeginCommitLogicalLog { end_ts, log_record } => { - assert!(mvcc_store.storage.is_logical_log()); if !mvcc_store.is_exclusive_tx(&self.tx_id) { // logical log needs to be serialized let locked = self.commit_coordinator.pager_commit_lock.write(); @@ -866,10 +851,6 @@ impl StateTransition for CommitStateMachine { match result { IOResult::Done(_) => {} IOResult::IO(io) => { - assert!( - mvcc_store.storage.is_logical_log(), - "for now logical log is the only storage that can return IO" - ); if !io.finished() { return Ok(TransitionResult::Io(io)); } @@ -897,13 +878,11 @@ impl StateTransition for CommitStateMachine { let schema = connection.schema.borrow().clone(); connection.db.update_schema_if_newer(schema)?; } - if mvcc_store.storage.is_logical_log() { - let tx = mvcc_store.txs.get(&self.tx_id).unwrap(); - let tx_unlocked = tx.value(); - self.header.write().replace(*tx_unlocked.header.borrow()); - tracing::trace!("end_commit_logical_log(tx_id={})", self.tx_id); - self.commit_coordinator.pager_commit_lock.unlock(); - } + let tx = mvcc_store.txs.get(&self.tx_id).unwrap(); + let tx_unlocked = tx.value(); + self.header.write().replace(*tx_unlocked.header.borrow()); + tracing::trace!("end_commit_logical_log(tx_id={})", self.tx_id); + self.commit_coordinator.pager_commit_lock.unlock(); self.state = CommitState::CommitEnd { end_ts: *end_ts }; return Ok(TransitionResult::Continue); } @@ -1422,38 +1401,12 @@ impl MvStore { /// /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need /// to ensure exclusive write access as per SQLite semantics. + #[instrument(skip_all, level = Level::DEBUG)] pub fn begin_exclusive_tx( &self, pager: Arc, maybe_existing_tx_id: Option, ) -> Result> { - self._begin_exclusive_tx(pager, false, maybe_existing_tx_id) - } - - /// Upgrades a read transaction to an exclusive write transaction. - /// - /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need - /// to ensure exclusive write access as per SQLite semantics. - pub fn upgrade_to_exclusive_tx( - &self, - pager: Arc, - maybe_existing_tx_id: Option, - ) -> Result> { - self._begin_exclusive_tx(pager, true, maybe_existing_tx_id) - } - - /// Begins an exclusive write transaction that prevents concurrent writes. - /// - /// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need - /// to ensure exclusive write access as per SQLite semantics. - #[instrument(skip_all, level = Level::DEBUG)] - fn _begin_exclusive_tx( - &self, - pager: Arc, - is_upgrade_from_read: bool, - maybe_existing_tx_id: Option, - ) -> Result> { - let is_logical_log = self.storage.is_logical_log(); let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id()); let begin_ts = if let Some(tx_id) = maybe_existing_tx_id { self.txs.get(&tx_id).unwrap().value().begin_ts @@ -1463,16 +1416,6 @@ impl MvStore { self.acquire_exclusive_tx(&tx_id)?; - // Try to acquire the pager read lock - if !is_upgrade_from_read && !is_logical_log { - pager.begin_read_tx().inspect_err(|_| { - tracing::debug!( - "begin_exclusive_tx: tx_id={} failed with Busy on pager_read_lock", - tx_id - ); - self.release_exclusive_tx(&tx_id); - })?; - } let locked = self.commit_coordinator.pager_commit_lock.write(); if !locked { tracing::debug!( @@ -1480,46 +1423,18 @@ impl MvStore { tx_id ); self.release_exclusive_tx(&tx_id); - pager.end_read_tx()?; return Err(LimboError::Busy); } let header = self.get_new_transaction_database_header(&pager); - if is_logical_log { - let tx = Transaction::new(tx_id, begin_ts, header); - tracing::trace!( - "begin_exclusive_tx(tx_id={}) - exclusive write logical log transaction", - tx_id - ); - tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id); - self.txs.insert(tx_id, tx); - return Ok(IOResult::Done(tx_id)); - } - // Try to acquire the pager write lock - let begin_w_tx_res = pager.begin_write_tx(); - if let Err(LimboError::Busy) = begin_w_tx_res { - tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id); - // Failed to get pager lock - release our exclusive lock - self.commit_coordinator.pager_commit_lock.unlock(); - self.release_exclusive_tx(&tx_id); - if maybe_existing_tx_id.is_none() { - // If we were upgrading an existing non-CONCURRENT mvcc transaction to write, we don't end the read tx on Busy. - // But if we were beginning a completely new non-CONCURRENT mvcc transaction, we do end it because the next time the connection - // attempts to do something, it will open a new read tx, which will fail if we don't end this one here. - pager.end_read_tx()?; - } - return Err(LimboError::Busy); - } - return_if_io!(begin_w_tx_res); let tx = Transaction::new(tx_id, begin_ts, header); tracing::trace!( - "begin_exclusive_tx(tx_id={}) - exclusive write transaction", + "begin_exclusive_tx(tx_id={}) - exclusive write logical log transaction", tx_id ); tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id); self.txs.insert(tx_id, tx); - Ok(IOResult::Done(tx_id)) } @@ -1532,12 +1447,6 @@ impl MvStore { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); - // TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read - // pages from WAL/DB read from a consistent state to maintiain snapshot isolation. - if !self.storage.is_logical_log() { - pager.begin_read_tx()?; - } - // Set txn's header to the global header let header = self.get_new_transaction_database_header(&pager); let tx = Transaction::new(tx_id, begin_ts, header); diff --git a/core/mvcc/persistent_storage/mod.rs b/core/mvcc/persistent_storage/mod.rs index b92bf081e..ac12e77c4 100644 --- a/core/mvcc/persistent_storage/mod.rs +++ b/core/mvcc/persistent_storage/mod.rs @@ -29,10 +29,6 @@ impl Storage { todo!() } - pub fn is_logical_log(&self) -> bool { - true - } - pub fn sync(&self) -> Result> { self.logical_log.borrow_mut().sync() } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 16f32ba79..5099f7831 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2276,16 +2276,8 @@ pub fn op_transaction_inner( if matches!(new_transaction_state, TransactionState::Write { .. }) && matches!(actual_tx_mode, TransactionMode::Write) { - let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap(); - if mv_tx_mode == TransactionMode::Read { - return_if_io!( - mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id)) - ); - } else { - return_if_io!( - mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)) - ); - } + let (tx_id, _) = program.connection.mv_tx.get().unwrap(); + return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))); } } } else {