From 6a20735fe0057d82aa50044e210b569855a97d0d Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Mon, 22 Sep 2025 12:40:19 +0300 Subject: [PATCH] mvcc: add blocking checkpoint lock MVCC checkpoints are always TRUNCATE, plus they block all other transactions. This guarantees that never need to let transactions read from the SQLite WAL. In MVCC, the checkpoint procedure is roughly as follows: - Take the blocking_checkpoint_lock - Write everything in the logical log to the pager, and from there commit to the SQLite WAL. - Immediately TRUNCATE checkpoint the WAL into the database file. - Release the blocking_checkpoint_lock. --- core/mvcc/database/mod.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 547cd9e95..da64dda13 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -900,7 +900,7 @@ impl StateTransition for CommitStateMachine { // But that's a problem for another day. // FIXME: it actually just become a problem for today!!! // TODO: test that reproduces this failure, and then a fix - mvcc_store.txs.remove(&self.tx_id); + mvcc_store.remove_tx(self.tx_id); if mvcc_store.is_exclusive_tx(&self.tx_id) { mvcc_store.release_exclusive_tx(&self.tx_id); @@ -1109,6 +1109,14 @@ pub struct MvStore { exclusive_tx: RwLock>, commit_coordinator: Arc, global_header: Arc>>, + /// MVCC checkpoints are always TRUNCATE, plus they block all other transactions. + /// This guarantees that never need to let transactions read from the SQLite WAL. + /// In MVCC, the checkpoint procedure is roughly as follows: + /// - Take the blocking_checkpoint_lock + /// - Write everything in the logical log to the pager, and from there commit to the SQLite WAL. + /// - Immediately TRUNCATE checkpoint the WAL into the database file. + /// - Release the blocking_checkpoint_lock. + blocking_checkpoint_lock: Arc, } impl MvStore { @@ -1128,6 +1136,7 @@ impl MvStore { commits_waiting: Arc::new(AtomicU64::new(0)), }), global_header: Arc::new(RwLock::new(None)), + blocking_checkpoint_lock: Arc::new(TursoRwLock::new()), } } @@ -1407,6 +1416,11 @@ impl MvStore { pager: Arc, maybe_existing_tx_id: Option, ) -> Result> { + if !self.blocking_checkpoint_lock.read() { + // If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all. + return Err(LimboError::Busy); + } + let unlock = || self.blocking_checkpoint_lock.unlock(); let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id()); let begin_ts = if let Some(tx_id) = maybe_existing_tx_id { self.txs.get(&tx_id).unwrap().value().begin_ts @@ -1414,7 +1428,8 @@ impl MvStore { self.get_timestamp() }; - self.acquire_exclusive_tx(&tx_id)?; + self.acquire_exclusive_tx(&tx_id) + .inspect_err(|_| unlock())?; let locked = self.commit_coordinator.pager_commit_lock.write(); if !locked { @@ -1423,6 +1438,7 @@ impl MvStore { tx_id ); self.release_exclusive_tx(&tx_id); + unlock(); return Err(LimboError::Busy); } @@ -1444,6 +1460,10 @@ impl MvStore { /// that you can use to perform operations within the transaction. All changes made within the /// transaction are isolated from other transactions until you commit the transaction. pub fn begin_tx(&self, pager: Arc) -> Result { + if !self.blocking_checkpoint_lock.read() { + // If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all. + return Err(LimboError::Busy); + } let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); @@ -1456,6 +1476,11 @@ impl MvStore { Ok(tx_id) } + pub fn remove_tx(&self, tx_id: TxID) { + self.txs.remove(&tx_id); + self.blocking_checkpoint_lock.unlock(); + } + fn get_new_transaction_database_header(&self, pager: &Arc) -> DatabaseHeader { if self.global_header.read().is_none() { pager.io.block(|| pager.maybe_allocate_page1()).unwrap(); @@ -1599,7 +1624,7 @@ impl MvStore { tracing::trace!("terminate(tx_id={})", tx_id); // FIXME: verify that we can already remove the transaction here! // Maybe it's fine for snapshot isolation, but too early for serializable? - self.txs.remove(&tx_id); + self.remove_tx(tx_id); Ok(()) }