mvcc: add blocking checkpoint lock

MVCC checkpoints are always TRUNCATE, plus they block all other transactions.
This guarantees that we never need to let transactions read from the SQLite WAL.

In MVCC, the checkpoint procedure is roughly as follows:
- Take the blocking_checkpoint_lock
- Write everything in the logical log to the pager, and from there commit to the SQLite WAL.
- Immediately TRUNCATE checkpoint the WAL into the database file.
- Release the blocking_checkpoint_lock.
This commit is contained in:
Jussi Saurio
2025-09-22 12:40:19 +03:00
parent 37866e74e5
commit 6a20735fe0

View File

@@ -900,7 +900,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
// But that's a problem for another day.
// FIXME: it actually just became a problem for today!!!
// TODO: test that reproduces this failure, and then a fix
mvcc_store.txs.remove(&self.tx_id);
mvcc_store.remove_tx(self.tx_id);
if mvcc_store.is_exclusive_tx(&self.tx_id) {
mvcc_store.release_exclusive_tx(&self.tx_id);
@@ -1109,6 +1109,14 @@ pub struct MvStore<Clock: LogicalClock> {
exclusive_tx: RwLock<Option<TxID>>,
commit_coordinator: Arc<CommitCoordinator>,
global_header: Arc<RwLock<Option<DatabaseHeader>>>,
/// MVCC checkpoints are always TRUNCATE, plus they block all other transactions.
/// This guarantees that we never need to let transactions read from the SQLite WAL.
/// In MVCC, the checkpoint procedure is roughly as follows:
/// - Take the blocking_checkpoint_lock
/// - Write everything in the logical log to the pager, and from there commit to the SQLite WAL.
/// - Immediately TRUNCATE checkpoint the WAL into the database file.
/// - Release the blocking_checkpoint_lock.
blocking_checkpoint_lock: Arc<TursoRwLock>,
}
impl<Clock: LogicalClock> MvStore<Clock> {
@@ -1128,6 +1136,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
commits_waiting: Arc::new(AtomicU64::new(0)),
}),
global_header: Arc::new(RwLock::new(None)),
blocking_checkpoint_lock: Arc::new(TursoRwLock::new()),
}
}
@@ -1407,6 +1416,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pager: Arc<Pager>,
maybe_existing_tx_id: Option<TxID>,
) -> Result<IOResult<TxID>> {
if !self.blocking_checkpoint_lock.read() {
// If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all.
return Err(LimboError::Busy);
}
let unlock = || self.blocking_checkpoint_lock.unlock();
let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id());
let begin_ts = if let Some(tx_id) = maybe_existing_tx_id {
self.txs.get(&tx_id).unwrap().value().begin_ts
@@ -1414,7 +1428,8 @@ impl<Clock: LogicalClock> MvStore<Clock> {
self.get_timestamp()
};
self.acquire_exclusive_tx(&tx_id)?;
self.acquire_exclusive_tx(&tx_id)
.inspect_err(|_| unlock())?;
let locked = self.commit_coordinator.pager_commit_lock.write();
if !locked {
@@ -1423,6 +1438,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
tx_id
);
self.release_exclusive_tx(&tx_id);
unlock();
return Err(LimboError::Busy);
}
@@ -1444,6 +1460,10 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// that you can use to perform operations within the transaction. All changes made within the
/// transaction are isolated from other transactions until you commit the transaction.
pub fn begin_tx(&self, pager: Arc<Pager>) -> Result<TxID> {
if !self.blocking_checkpoint_lock.read() {
// If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all.
return Err(LimboError::Busy);
}
let tx_id = self.get_tx_id();
let begin_ts = self.get_timestamp();
@@ -1456,6 +1476,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
Ok(tx_id)
}
/// Removes the transaction `tx_id` from `self.txs` and then calls `unlock()` on
/// `blocking_checkpoint_lock`.
///
/// `begin_tx` acquires that lock with `read()` before creating a transaction, so this
/// is presumably the matching release of that hold — TODO confirm against the
/// `TursoRwLock` API.
///
/// NOTE(review): the unlock is unconditional; it runs even when `tx_id` is not present
/// in `self.txs`. Callers appear to be expected to invoke this exactly once per
/// successful lock acquisition — verify against all call sites.
pub fn remove_tx(&self, tx_id: TxID) {
// Drop the transaction's entry from the transaction map first.
self.txs.remove(&tx_id);
// Then give up this transaction's hold on the checkpoint lock.
self.blocking_checkpoint_lock.unlock();
}
fn get_new_transaction_database_header(&self, pager: &Arc<Pager>) -> DatabaseHeader {
if self.global_header.read().is_none() {
pager.io.block(|| pager.maybe_allocate_page1()).unwrap();
@@ -1599,7 +1624,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
tracing::trace!("terminate(tx_id={})", tx_id);
// FIXME: verify that we can already remove the transaction here!
// Maybe it's fine for snapshot isolation, but too early for serializable?
self.txs.remove(&tx_id);
self.remove_tx(tx_id);
Ok(())
}