From d616a375eeafed6cf9dd647bbf6d893cf50de2fe Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Thu, 31 Jul 2025 17:26:02 +0200 Subject: [PATCH] core/mvcc: commit_tx state machine --- core/mvcc/database/mod.rs | 559 ++++++++++++++++++++++++-------------- 1 file changed, 353 insertions(+), 206 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index fedce8802..54eedeebb 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -10,6 +10,7 @@ use crossbeam_skiplist::{SkipMap, SkipSet}; use parking_lot::RwLock; use std::collections::HashSet; use std::fmt::Debug; +use std::marker::PhantomData; use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -235,6 +236,349 @@ impl AtomicTransactionState { } } +pub enum TransitionResult { + Io, + Continue, + Done, +} + +pub trait StateTransition { + type State; + type Context; + + fn transition<'a>(&mut self, context: &Self::Context) -> Result; + fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()>; + fn is_finalized(&self) -> bool; +} + +pub struct StateMachine { + state: State, + is_finalized: bool, +} + +impl StateMachine { + fn new(state: State) -> Self { + Self { + state, + is_finalized: false, + } + } +} + +impl StateTransition for StateMachine { + type State = State; + type Context = State::Context; + + fn transition<'a>(&mut self, context: &Self::Context) -> Result { + loop { + if self.is_finalized { + unreachable!("StateMachine::transition: state machine is finalized"); + } + match self.state.transition(context)? { + TransitionResult::Io => { + return Ok(TransitionResult::Io); + } + TransitionResult::Continue => { + continue; + } + TransitionResult::Done => { + assert!(self.state.is_finalized()); + self.is_finalized = true; + return Ok(TransitionResult::Done); + } + } + } + } + + fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()> { + self.state.finalize(context)?; + self.is_finalized = true; + Ok(()) + } + + fn is_finalized(&self) -> bool { + self.is_finalized + } +} + +#[derive(Debug)] +pub enum CommitState { + Initial, + BeginPagerTxn { end_ts: u64 }, + WriteRows { end_ts: u64 }, + CommitPagerTxn { end_ts: u64 }, + Commit { end_ts: u64 }, +} + +struct CommitStateMachine { + state: CommitState, + is_finalized: bool, + pager: Rc, + tx_id: TxID, + connection: Arc, + write_set: Vec, + _phantom: PhantomData, +} + +impl CommitStateMachine { + fn new(state: CommitState, pager: Rc, tx_id: TxID, connection: Arc) -> Self { + Self { + state, + is_finalized: false, + pager, + tx_id, + connection, + write_set: Vec::new(), + _phantom: PhantomData, + } + } +} + +impl StateTransition for CommitStateMachine { + type State = CommitStateMachine; + type Context = MvStore; + + #[tracing::instrument(fields(state = ?self.state), skip(self, mvcc_store))] + fn transition<'a>(&mut self, mvcc_store: &Self::Context) -> Result { + match self.state { + CommitState::Initial => { + let end_ts = mvcc_store.get_timestamp(); + // NOTICE: the first shadowed tx keeps the entry alive in the map + // for the duration of this whole function, which is important for correctness! + let tx = mvcc_store + .txs + .get(&self.tx_id) + .ok_or(DatabaseError::TxTerminated)?; + let tx = tx.value().write(); + match tx.state.load() { + TransactionState::Terminated => return Err(DatabaseError::TxTerminated), + _ => { + assert_eq!(tx.state, TransactionState::Active); + } + } + tx.state.store(TransactionState::Preparing); + tracing::trace!("prepare_tx(tx_id={})", self.tx_id); + + /* TODO: The code we have here is sufficient for snapshot isolation. + ** In order to implement serializability, we need the following steps: + ** + ** 1. Validate if all read versions are still visible by inspecting the read_set + ** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet) + ** - a phantom is a version that became visible in the middle of our transaction, + ** but wasn't taken into account during one of the scans from the scan_set + ** 3. Wait for commit dependencies, which we don't even track yet... + ** Excerpt from what's a commit dependency and how it's tracked in the original paper: + ** """ + A transaction T1 has a commit dependency on another transaction + T2, if T1 is allowed to commit only if T2 commits. If T2 aborts, + T1 must also abort, so cascading aborts are possible. T1 acquires a + commit dependency either by speculatively reading or speculatively ignoring a version, + instead of waiting for T2 to commit. + We implement commit dependencies by a register-and-report + approach: T1 registers its dependency with T2 and T2 informs T1 + when it has committed or aborted. Each transaction T contains a + counter, CommitDepCounter, that counts how many unresolved + commit dependencies it still has. A transaction cannot commit + until this counter is zero. In addition, T has a Boolean variable + AbortNow that other transactions can set to tell T to abort. Each + transaction T also has a set, CommitDepSet, that stores transaction IDs + of the transactions that depend on T. + To take a commit dependency on a transaction T2, T1 increments + its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet. + When T2 has committed, it locates each transaction in + its CommitDepSet and decrements their CommitDepCounter. If + T2 aborted, it tells the dependent transactions to also abort by + setting their AbortNow flags. If a dependent transaction is not + found, this means that it has already aborted. + Note that a transaction with commit dependencies may not have to + wait at all - the dependencies may have been resolved before it is + ready to commit. Commit dependencies consolidate all waits into + a single wait and postpone the wait to just before commit. + Some transactions may have to wait before commit. + Waiting raises a concern of deadlocks. + However, deadlocks cannot occur because an older transaction never + waits on a younger transaction. In + a wait-for graph the direction of edges would always be from a + younger transaction (higher end timestamp) to an older transaction + (lower end timestamp) so cycles are impossible. + """ + ** If you're wondering when a speculative read happens, here you go: + ** Case 1: speculative read of TB: + """ + If transaction TB is in the Preparing state, it has acquired an end + timestamp TS which will be V’s begin timestamp if TB commits. + A safe approach in this situation would be to have transaction T + wait until transaction TB commits. However, we want to avoid all + blocking during normal processing so instead we continue with + the visibility test and, if the test returns true, allow T to + speculatively read V. Transaction T acquires a commit dependency on + TB, restricting the serialization order of the two transactions. That + is, T is allowed to commit only if TB commits. + """ + ** Case 2: speculative ignore of TE: + """ + If TE’s state is Preparing, it has an end timestamp TS that will become + the end timestamp of V if TE does commit. If TS is greater than the read + time RT, it is obvious that V will be visible if TE commits. If TE + aborts, V will still be visible, because any transaction that updates + V after TE has aborted will obtain an end timestamp greater than + TS. If TS is less than RT, we have a more complicated situation: + if TE commits, V will not be visible to T but if TE aborts, it will + be visible. We could handle this by forcing T to wait until TE + commits or aborts but we want to avoid all blocking during normal processing. + Instead we allow T to speculatively ignore V and + proceed with its processing. Transaction T acquires a commit + dependency (see Section 2.7) on TE, that is, T is allowed to commit + only if TE commits. + """ + */ + tx.state.store(TransactionState::Committed(end_ts)); + tracing::trace!("commit_tx(tx_id={})", self.tx_id); + self.write_set + .extend(tx.write_set.iter().map(|v| *v.value())); + self.state = CommitState::BeginPagerTxn { end_ts }; + Ok(TransitionResult::Continue) + } + CommitState::BeginPagerTxn { end_ts } => { + // FIXME: how do we deal with multiple concurrent writes? + // WAL requires a txn to be written sequentially. Either we: + // 1. Wait for currently writer to finish before second txn starts. + // 2. Choose a txn to write depending on some heuristics like amount of frames will be written. + // 3. .. + // + loop { + match self.pager.begin_write_tx() { + Ok(crate::types::IOResult::Done(result)) => { + if let crate::result::LimboResult::Busy = result { + return Err(DatabaseError::Io( + "Pager write transaction busy".to_string(), + )); + } + break; + } + Ok(crate::types::IOResult::IO) => { + // FIXME: this is a hack to make the pager run the IO loop + self.pager.io.run_once().unwrap(); + continue; + } + Err(e) => { + return Err(DatabaseError::Io(e.to_string())); + } + } + } + self.state = CommitState::WriteRows { end_ts }; + return Ok(TransitionResult::Continue); + } + CommitState::WriteRows { end_ts } => { + for id in &self.write_set { + if let Some(row_versions) = mvcc_store.rows.get(id) { + let row_versions = row_versions.value().read(); + // Find rows that were written by this transaction + for row_version in row_versions.iter() { + if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin { + if row_tx_id == self.tx_id { + mvcc_store + .write_row_to_pager(self.pager.clone(), &row_version.row)?; + break; + } + } + if let Some(TxTimestampOrID::Timestamp(row_tx_id)) = row_version.end { + if row_tx_id == self.tx_id { + mvcc_store + .write_row_to_pager(self.pager.clone(), &row_version.row)?; + break; + } + } + } + } + } + self.state = CommitState::CommitPagerTxn { end_ts }; + Ok(TransitionResult::Continue) + } + CommitState::CommitPagerTxn { end_ts } => { + // Write committed data to pager for persistence + // Flush dirty pages to WAL - this is critical for data persistence + // Similar to what step_end_write_txn does for legacy transactions + loop { + let result = self + .pager + .end_tx( + false, // rollback = false since we're committing + false, // schema_did_change = false for now (could be improved) + &self.connection, + self.connection.wal_checkpoint_disabled.get(), + ) + .map_err(|e| DatabaseError::Io(e.to_string())) + .unwrap(); + if let crate::types::IOResult::Done(_) = result { + break; + } + } + self.state = CommitState::Commit { end_ts }; + Ok(TransitionResult::Continue) + } + CommitState::Commit { end_ts } => { + let mut log_record = LogRecord::new(end_ts); + for ref id in &self.write_set { + if let Some(row_versions) = mvcc_store.rows.get(id) { + let mut row_versions = row_versions.value().write(); + for row_version in row_versions.iter_mut() { + if let TxTimestampOrID::TxID(id) = row_version.begin { + if id == self.tx_id { + // New version is valid STARTING FROM committing transaction's end timestamp + // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf + row_version.begin = TxTimestampOrID::Timestamp(end_ts); + mvcc_store.insert_version_raw( + &mut log_record.row_versions, + row_version.clone(), + ); // FIXME: optimize cloning out + } + } + if let Some(TxTimestampOrID::TxID(id)) = row_version.end { + if id == self.tx_id { + // Old version is valid UNTIL committing transaction's end timestamp + // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf + row_version.end = Some(TxTimestampOrID::Timestamp(end_ts)); + mvcc_store.insert_version_raw( + &mut log_record.row_versions, + row_version.clone(), + ); // FIXME: optimize cloning out + } + } + } + } + } + tracing::trace!("updated(tx_id={})", self.tx_id); + + // We have now updated all the versions with a reference to the + // transaction ID to a timestamp and can, therefore, remove the + // transaction. Please note that when we move to lockless, the + // invariant doesn't necessarily hold anymore because another thread + // might have speculatively read a version that we want to remove. + // But that's a problem for another day. + // FIXME: it actually just become a problem for today!!! + // TODO: test that reproduces this failure, and then a fix + mvcc_store.txs.remove(&self.tx_id); + if !log_record.row_versions.is_empty() { + mvcc_store.storage.log_tx(log_record)?; + } + tracing::trace!("logged(tx_id={})", self.tx_id); + self.finalize(mvcc_store)?; + Ok(TransitionResult::Done) + } + } + } + + fn finalize<'a>(&mut self, _context: &Self::Context) -> Result<()> { + self.is_finalized = true; + Ok(()) + } + + fn is_finalized(&self) -> bool { + self.is_finalized + } +} + /// A multi-version concurrency control database. #[derive(Debug)] pub struct MvStore { @@ -510,210 +854,13 @@ impl MvStore { pager: Rc, connection: &Arc, ) -> Result<()> { - let end_ts = self.get_timestamp(); - // NOTICE: the first shadowed tx keeps the entry alive in the map - // for the duration of this whole function, which is important for correctness! - let tx = self.txs.get(&tx_id).ok_or(DatabaseError::TxTerminated)?; - let tx = tx.value().write(); - match tx.state.load() { - TransactionState::Terminated => return Err(DatabaseError::TxTerminated), - _ => { - assert_eq!(tx.state, TransactionState::Active); - } - } - tx.state.store(TransactionState::Preparing); - tracing::trace!("prepare_tx(tx_id={})", tx_id); - - /* TODO: The code we have here is sufficient for snapshot isolation. - ** In order to implement serializability, we need the following steps: - ** - ** 1. Validate if all read versions are still visible by inspecting the read_set - ** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet) - ** - a phantom is a version that became visible in the middle of our transaction, - ** but wasn't taken into account during one of the scans from the scan_set - ** 3. Wait for commit dependencies, which we don't even track yet... - ** Excerpt from what's a commit dependency and how it's tracked in the original paper: - ** """ - A transaction T1 has a commit dependency on another transaction - T2, if T1 is allowed to commit only if T2 commits. If T2 aborts, - T1 must also abort, so cascading aborts are possible. T1 acquires a - commit dependency either by speculatively reading or speculatively ignoring a version, - instead of waiting for T2 to commit. - We implement commit dependencies by a register-and-report - approach: T1 registers its dependency with T2 and T2 informs T1 - when it has committed or aborted. Each transaction T contains a - counter, CommitDepCounter, that counts how many unresolved - commit dependencies it still has. A transaction cannot commit - until this counter is zero. In addition, T has a Boolean variable - AbortNow that other transactions can set to tell T to abort. Each - transaction T also has a set, CommitDepSet, that stores transaction IDs - of the transactions that depend on T. - To take a commit dependency on a transaction T2, T1 increments - its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet. - When T2 has committed, it locates each transaction in - its CommitDepSet and decrements their CommitDepCounter. If - T2 aborted, it tells the dependent transactions to also abort by - setting their AbortNow flags. If a dependent transaction is not - found, this means that it has already aborted. - Note that a transaction with commit dependencies may not have to - wait at all - the dependencies may have been resolved before it is - ready to commit. Commit dependencies consolidate all waits into - a single wait and postpone the wait to just before commit. - Some transactions may have to wait before commit. - Waiting raises a concern of deadlocks. - However, deadlocks cannot occur because an older transaction never - waits on a younger transaction. In - a wait-for graph the direction of edges would always be from a - younger transaction (higher end timestamp) to an older transaction - (lower end timestamp) so cycles are impossible. - """ - ** If you're wondering when a speculative read happens, here you go: - ** Case 1: speculative read of TB: - """ - If transaction TB is in the Preparing state, it has acquired an end - timestamp TS which will be V’s begin timestamp if TB commits. - A safe approach in this situation would be to have transaction T - wait until transaction TB commits. However, we want to avoid all - blocking during normal processing so instead we continue with - the visibility test and, if the test returns true, allow T to - speculatively read V. Transaction T acquires a commit dependency on - TB, restricting the serialization order of the two transactions. That - is, T is allowed to commit only if TB commits. - """ - ** Case 2: speculative ignore of TE: - """ - If TE’s state is Preparing, it has an end timestamp TS that will become - the end timestamp of V if TE does commit. If TS is greater than the read - time RT, it is obvious that V will be visible if TE commits. If TE - aborts, V will still be visible, because any transaction that updates - V after TE has aborted will obtain an end timestamp greater than - TS. If TS is less than RT, we have a more complicated situation: - if TE commits, V will not be visible to T but if TE aborts, it will - be visible. We could handle this by forcing T to wait until TE - commits or aborts but we want to avoid all blocking during normal processing. - Instead we allow T to speculatively ignore V and - proceed with its processing. Transaction T acquires a commit - dependency (see Section 2.7) on TE, that is, T is allowed to commit - only if TE commits. - """ - */ - tx.state.store(TransactionState::Committed(end_ts)); - tracing::trace!("commit_tx(tx_id={})", tx_id); - let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); - drop(tx); - // Postprocessing: inserting row versions and logging the transaction to persistent storage. - - // FIXME: how do we deal with multiple concurrent writes? - // WAL requires a txn to be written sequentially. Either we: - // 1. Wait for currently writer to finish before second txn starts. - // 2. Choose a txn to write depending on some heuristics like amount of frames will be written. - // 3. .. - // - loop { - match pager.begin_write_tx() { - Ok(crate::types::IOResult::Done(result)) => { - if let crate::result::LimboResult::Busy = result { - return Err(DatabaseError::Io( - "Pager write transaction busy".to_string(), - )); - } - break; - } - Ok(crate::types::IOResult::IO) => { - // FIXME: this is a hack to make the pager run the IO loop - pager.io.run_once().unwrap(); - continue; - } - Err(e) => { - return Err(DatabaseError::Io(e.to_string())); - } - } - } - - // 1. Write rows to btree for persistence - for id in &write_set { - if let Some(row_versions) = self.rows.get(id) { - let row_versions = row_versions.value().read(); - // Find rows that were written by this transaction - for row_version in row_versions.iter() { - if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin { - if row_tx_id == tx_id { - self.write_row_to_pager(pager.clone(), &row_version.row)?; - break; - } - } - if let Some(TxTimestampOrID::Timestamp(row_tx_id)) = row_version.end { - if row_tx_id == tx_id { - self.write_row_to_pager(pager.clone(), &row_version.row)?; - break; - } - } - } - } - } - // Write committed data to pager for persistence - // Flush dirty pages to WAL - this is critical for data persistence - // Similar to what step_end_write_txn does for legacy transactions - loop { - let result = pager - .end_tx( - false, // rollback = false since we're committing - false, // schema_did_change = false for now (could be improved) - connection, - connection.wal_checkpoint_disabled.get(), - ) - .map_err(|e| DatabaseError::Io(e.to_string())) - .unwrap(); - if let crate::types::IOResult::Done(_) = result { - break; - } - } - // 2. Commit rows to log - let mut log_record = LogRecord::new(end_ts); - for ref id in write_set { - if let Some(row_versions) = self.rows.get(id) { - let mut row_versions = row_versions.value().write(); - for row_version in row_versions.iter_mut() { - if let TxTimestampOrID::TxID(id) = row_version.begin { - if id == tx_id { - // New version is valid STARTING FROM committing transaction's end timestamp - // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf - row_version.begin = TxTimestampOrID::Timestamp(end_ts); - self.insert_version_raw( - &mut log_record.row_versions, - row_version.clone(), - ); // FIXME: optimize cloning out - } - } - if let Some(TxTimestampOrID::TxID(id)) = row_version.end { - if id == tx_id { - // Old version is valid UNTIL committing transaction's end timestamp - // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf - row_version.end = Some(TxTimestampOrID::Timestamp(end_ts)); - self.insert_version_raw( - &mut log_record.row_versions, - row_version.clone(), - ); // FIXME: optimize cloning out - } - } - } - } - } - tracing::trace!("updated(tx_id={})", tx_id); - - // We have now updated all the versions with a reference to the - // transaction ID to a timestamp and can, therefore, remove the - // transaction. Please note that when we move to lockless, the - // invariant doesn't necessarily hold anymore because another thread - // might have speculatively read a version that we want to remove. - // But that's a problem for another day. - // FIXME: it actually just become a problem for today!!! - // TODO: test that reproduces this failure, and then a fix - self.txs.remove(&tx_id); - if !log_record.row_versions.is_empty() { - self.storage.log_tx(log_record)?; - } - tracing::trace!("logged(tx_id={})", tx_id); + let mut state_machine: StateMachine> = StateMachine::< + CommitStateMachine, + >::new( + CommitStateMachine::new(CommitState::Initial, pager, tx_id, connection.clone()), + ); + state_machine.transition(self)?; + assert!(state_machine.is_finalized()); Ok(()) } @@ -851,7 +998,7 @@ impl MvStore { /// Inserts a new row version into the internal data structure for versions, /// while making sure that the row version is inserted in the correct order. - fn insert_version_raw(&self, versions: &mut Vec, row_version: RowVersion) { + pub fn insert_version_raw(&self, versions: &mut Vec, row_version: RowVersion) { // NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity. // However, we expect the number of versions to be nearly sorted, so we deem it worthy // to search linearly for the insertion point instead of paying the price of using @@ -874,7 +1021,7 @@ impl MvStore { versions.insert(position, row_version); } - fn write_row_to_pager(&self, pager: Rc, row: &Row) -> Result<()> { + pub fn write_row_to_pager(&self, pager: Rc, row: &Row) -> Result<()> { use crate::storage::btree::BTreeCursor; use crate::types::{IOResult, SeekKey, SeekOp};