diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 547cd9e95..da64dda13 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -900,7 +900,7 @@ impl StateTransition for CommitStateMachine { // But that's a problem for another day. // FIXME: it actually just become a problem for today!!! // TODO: test that reproduces this failure, and then a fix - mvcc_store.txs.remove(&self.tx_id); + mvcc_store.remove_tx(self.tx_id); if mvcc_store.is_exclusive_tx(&self.tx_id) { mvcc_store.release_exclusive_tx(&self.tx_id); @@ -1109,6 +1109,14 @@ pub struct MvStore { exclusive_tx: RwLock>, commit_coordinator: Arc, global_header: Arc>>, + /// MVCC checkpoints are always TRUNCATE, plus they block all other transactions. + /// This guarantees that we never need to let transactions read from the SQLite WAL. + /// In MVCC, the checkpoint procedure is roughly as follows: + /// - Take the blocking_checkpoint_lock + /// - Write everything in the logical log to the pager, and from there commit to the SQLite WAL. + /// - Immediately TRUNCATE checkpoint the WAL into the database file. + /// - Release the blocking_checkpoint_lock. + blocking_checkpoint_lock: Arc, } impl MvStore { @@ -1128,6 +1136,7 @@ impl MvStore { commits_waiting: Arc::new(AtomicU64::new(0)), }), global_header: Arc::new(RwLock::new(None)), + blocking_checkpoint_lock: Arc::new(TursoRwLock::new()), } } @@ -1407,6 +1416,11 @@ impl MvStore { pager: Arc, maybe_existing_tx_id: Option, ) -> Result> { + if !self.blocking_checkpoint_lock.read() { + // If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all. 
+ return Err(LimboError::Busy); + } + let unlock = || self.blocking_checkpoint_lock.unlock(); let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id()); let begin_ts = if let Some(tx_id) = maybe_existing_tx_id { self.txs.get(&tx_id).unwrap().value().begin_ts @@ -1414,7 +1428,8 @@ impl MvStore { self.get_timestamp() }; - self.acquire_exclusive_tx(&tx_id)?; + self.acquire_exclusive_tx(&tx_id) + .inspect_err(|_| unlock())?; let locked = self.commit_coordinator.pager_commit_lock.write(); if !locked { @@ -1423,6 +1438,7 @@ impl MvStore { tx_id ); self.release_exclusive_tx(&tx_id); + unlock(); return Err(LimboError::Busy); } @@ -1444,6 +1460,10 @@ impl MvStore { /// that you can use to perform operations within the transaction. All changes made within the /// transaction are isolated from other transactions until you commit the transaction. pub fn begin_tx(&self, pager: Arc) -> Result { + if !self.blocking_checkpoint_lock.read() { + // If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all. + return Err(LimboError::Busy); + } let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); @@ -1456,6 +1476,11 @@ impl MvStore { Ok(tx_id) } + pub fn remove_tx(&self, tx_id: TxID) { + self.txs.remove(&tx_id); + self.blocking_checkpoint_lock.unlock(); + } + fn get_new_transaction_database_header(&self, pager: &Arc) -> DatabaseHeader { if self.global_header.read().is_none() { pager.io.block(|| pager.maybe_allocate_page1()).unwrap(); @@ -1599,7 +1624,7 @@ impl MvStore { tracing::trace!("terminate(tx_id={})", tx_id); // FIXME: verify that we can already remove the transaction here! // Maybe it's fine for snapshot isolation, but too early for serializable? - self.txs.remove(&tx_id); + self.remove_tx(tx_id); Ok(()) }