mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-09 03:04:20 +01:00
Merge 'mvcc: add blocking checkpoint lock' from Jussi Saurio
This PR does not implement MVCC checkpoint yet, just adds a lock for it. MVCC checkpoints are always TRUNCATE, plus they block all other transactions. This guarantees that never need to let transactions read from the SQLite WAL. In MVCC, the checkpoint procedure is roughly as follows: - Take the blocking_checkpoint_lock - Write everything in the logical log to the pager, and from there commit to the SQLite WAL. - Immediately TRUNCATE checkpoint the WAL into the database file. - Release the blocking_checkpoint_lock. Reviewed-by: Pere Diaz Bou <pere-altea@homail.com> Closes #3244
This commit is contained in:
@@ -900,7 +900,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
// But that's a problem for another day.
|
||||
// FIXME: it actually just become a problem for today!!!
|
||||
// TODO: test that reproduces this failure, and then a fix
|
||||
mvcc_store.txs.remove(&self.tx_id);
|
||||
mvcc_store.remove_tx(self.tx_id);
|
||||
|
||||
if mvcc_store.is_exclusive_tx(&self.tx_id) {
|
||||
mvcc_store.release_exclusive_tx(&self.tx_id);
|
||||
@@ -1109,6 +1109,14 @@ pub struct MvStore<Clock: LogicalClock> {
|
||||
exclusive_tx: RwLock<Option<TxID>>,
|
||||
commit_coordinator: Arc<CommitCoordinator>,
|
||||
global_header: Arc<RwLock<Option<DatabaseHeader>>>,
|
||||
/// MVCC checkpoints are always TRUNCATE, plus they block all other transactions.
|
||||
/// This guarantees that never need to let transactions read from the SQLite WAL.
|
||||
/// In MVCC, the checkpoint procedure is roughly as follows:
|
||||
/// - Take the blocking_checkpoint_lock
|
||||
/// - Write everything in the logical log to the pager, and from there commit to the SQLite WAL.
|
||||
/// - Immediately TRUNCATE checkpoint the WAL into the database file.
|
||||
/// - Release the blocking_checkpoint_lock.
|
||||
blocking_checkpoint_lock: Arc<TursoRwLock>,
|
||||
}
|
||||
|
||||
impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
@@ -1128,6 +1136,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
commits_waiting: Arc::new(AtomicU64::new(0)),
|
||||
}),
|
||||
global_header: Arc::new(RwLock::new(None)),
|
||||
blocking_checkpoint_lock: Arc::new(TursoRwLock::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1407,6 +1416,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
pager: Arc<Pager>,
|
||||
maybe_existing_tx_id: Option<TxID>,
|
||||
) -> Result<IOResult<TxID>> {
|
||||
if !self.blocking_checkpoint_lock.read() {
|
||||
// If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all.
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
let unlock = || self.blocking_checkpoint_lock.unlock();
|
||||
let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id());
|
||||
let begin_ts = if let Some(tx_id) = maybe_existing_tx_id {
|
||||
self.txs.get(&tx_id).unwrap().value().begin_ts
|
||||
@@ -1414,7 +1428,8 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
self.get_timestamp()
|
||||
};
|
||||
|
||||
self.acquire_exclusive_tx(&tx_id)?;
|
||||
self.acquire_exclusive_tx(&tx_id)
|
||||
.inspect_err(|_| unlock())?;
|
||||
|
||||
let locked = self.commit_coordinator.pager_commit_lock.write();
|
||||
if !locked {
|
||||
@@ -1423,6 +1438,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
tx_id
|
||||
);
|
||||
self.release_exclusive_tx(&tx_id);
|
||||
unlock();
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
|
||||
@@ -1444,6 +1460,10 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
/// that you can use to perform operations within the transaction. All changes made within the
|
||||
/// transaction are isolated from other transactions until you commit the transaction.
|
||||
pub fn begin_tx(&self, pager: Arc<Pager>) -> Result<TxID> {
|
||||
if !self.blocking_checkpoint_lock.read() {
|
||||
// If there is a stop-the-world checkpoint in progress, we cannot begin any transaction at all.
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
let tx_id = self.get_tx_id();
|
||||
let begin_ts = self.get_timestamp();
|
||||
|
||||
@@ -1456,6 +1476,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
Ok(tx_id)
|
||||
}
|
||||
|
||||
pub fn remove_tx(&self, tx_id: TxID) {
|
||||
self.txs.remove(&tx_id);
|
||||
self.blocking_checkpoint_lock.unlock();
|
||||
}
|
||||
|
||||
fn get_new_transaction_database_header(&self, pager: &Arc<Pager>) -> DatabaseHeader {
|
||||
if self.global_header.read().is_none() {
|
||||
pager.io.block(|| pager.maybe_allocate_page1()).unwrap();
|
||||
@@ -1599,7 +1624,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
tracing::trace!("terminate(tx_id={})", tx_id);
|
||||
// FIXME: verify that we can already remove the transaction here!
|
||||
// Maybe it's fine for snapshot isolation, but too early for serializable?
|
||||
self.txs.remove(&tx_id);
|
||||
self.remove_tx(tx_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user