From 45288b1297418ea588d65f3165b573e9c77db62b Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 11 Sep 2025 19:54:08 +0300 Subject: [PATCH] core/mvcc: Eliminate RwLock wrapping Transaction The write and read sets in Transaction use SkipSet, which is thread-safe. Therefore, drop the RwLock wrapping Transaction everywhere, increasing MVCC throughput by almost 30%. Before: ``` Running write throughput benchmark with 1 threads, 1000 batch size, 1000 iterations, mode: Mvcc Database created at: write_throughput_test.db Thread 0: 1000000 inserts in 6.50s (153927.21 inserts/sec) === BENCHMARK RESULTS === Total inserts: 1000000 Total time: 6.50s Overall throughput: 153758.85 inserts/sec Threads: 1 Batch size: 1000 Iterations per thread: 1000 ``` After: ``` Running write throughput benchmark with 1 threads, 1000 batch size, 1000 iterations, mode: Mvcc Database created at: write_throughput_test.db Thread 0: 1000000 inserts in 5.10s (195927.13 inserts/sec) === BENCHMARK RESULTS === Total inserts: 1000000 Total time: 5.11s Overall throughput: 195663.94 inserts/sec Threads: 1 Batch size: 1000 Iterations per thread: 1000 ``` --- core/mvcc/database/mod.rs | 68 +++++++++++++++---------------------- core/mvcc/database/tests.rs | 9 +++-- 2 files changed, 32 insertions(+), 45 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 6727784e5..80e02d3d0 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -125,7 +125,7 @@ impl Transaction { self.read_set.insert(id); } - fn insert_to_write_set(&mut self, id: RowID) { + fn insert_to_write_set(&self, id: RowID) { self.write_set.insert(id); } } @@ -350,7 +350,7 @@ impl StateTransition for CommitStateMachine { .txs .get(&self.tx_id) .ok_or(LimboError::TxTerminated)?; - let tx = tx.value().write(); + let tx = tx.value(); match tx.state.load() { TransactionState::Terminated => { return Err(LimboError::TxTerminated); @@ -810,7 +810,7 @@ impl DeleteRowStateMachine { #[derive(Debug)] pub struct MvStore { rows: SkipMap>>, - txs: SkipMap>, + txs: SkipMap, tx_ids: AtomicU64, next_rowid: AtomicU64, clock: Clock, @@ -852,7 +852,7 @@ impl MvStore { .txs .get(&tx_id) .ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?; - let mut tx = tx.value().write(); + let tx = tx.value(); assert_eq!(tx.state, TransactionState::Active); let id = row.id; let row_version = RowVersion { @@ -861,7 +861,6 @@ impl MvStore { row, }; tx.insert_to_write_set(id); - drop(tx); self.insert_version(id, row_version); Ok(()) } @@ -925,17 +924,16 @@ impl MvStore { .txs .get(&tx_id) .ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?; - let tx = tx.value().read(); + let tx = tx.value(); assert_eq!(tx.state, TransactionState::Active); // A transaction cannot delete a version that it cannot see, // nor can it conflict with it. - if !rv.is_visible_to(&tx, &self.txs) { + if !rv.is_visible_to(tx, &self.txs) { continue; } - if is_write_write_conflict(&self.txs, &tx, rv) { + if is_write_write_conflict(&self.txs, tx, rv) { drop(row_versions); drop(row_versions_opt); - drop(tx); self.rollback_tx(tx_id, pager); return Err(LimboError::WriteWriteConflict); } @@ -943,12 +941,11 @@ impl MvStore { rv.end = Some(TxTimestampOrID::TxID(tx.tx_id)); drop(row_versions); drop(row_versions_opt); - drop(tx); let tx = self .txs .get(&tx_id) .ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?; - let mut tx = tx.value().write(); + let tx = tx.value(); tx.insert_to_write_set(id); return Ok(true); } @@ -973,14 +970,14 @@ impl MvStore { pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { tracing::trace!("read(tx_id={}, id={:?})", tx_id, id); let tx = self.txs.get(&tx_id).unwrap(); - let tx = tx.value().read(); + let tx = tx.value(); assert_eq!(tx.state, TransactionState::Active); if let Some(row_versions) = self.rows.get(&id) { let row_versions = row_versions.value().read(); if let Some(rv) = row_versions .iter() .rev() - .find(|rv| rv.is_visible_to(&tx, &self.txs)) + .find(|rv| rv.is_visible_to(tx, &self.txs)) { tx.insert_to_read_set(id); return Ok(Some(rv.row.clone())); @@ -1048,7 +1045,7 @@ impl MvStore { }; let tx = self.txs.get(&tx_id).unwrap(); - let tx = tx.value().read(); + let tx = tx.value(); let mut rows = self.rows.range(min_bound..max_bound); loop { // We are moving forward, so if a row was deleted we just need to skip it. Therefore, we need @@ -1057,7 +1054,7 @@ impl MvStore { let row = next_row?; // We found a row, let's check if it's visible to the transaction. - if let Some(visible_row) = self.find_last_visible_version(&tx, row) { + if let Some(visible_row) = self.find_last_visible_version(tx, row) { return Some(visible_row); } // If this row is not visible, continue to the next row @@ -1066,7 +1063,7 @@ impl MvStore { fn find_last_visible_version( &self, - tx: &parking_lot::lock_api::RwLockReadGuard<'_, parking_lot::RawRwLock, Transaction>, + tx: &Transaction, row: crossbeam_skiplist::map::Entry< '_, RowID, @@ -1090,15 +1087,15 @@ impl MvStore { tracing::trace!("seek_rowid(bound={:?}, lower_bound={})", bound, lower_bound,); let tx = self.txs.get(&tx_id).unwrap(); - let tx = tx.value().read(); + let tx = tx.value(); if lower_bound { self.rows .lower_bound(bound) - .and_then(|entry| self.find_last_visible_version(&tx, entry)) + .and_then(|entry| self.find_last_visible_version(tx, entry)) } else { self.rows .upper_bound(bound) - .and_then(|entry| self.find_last_visible_version(&tx, entry)) + .and_then(|entry| self.find_last_visible_version(tx, entry)) } } @@ -1112,7 +1109,7 @@ impl MvStore { let begin_ts = self.get_timestamp(); let tx = Transaction::new(tx_id, begin_ts); tracing::trace!("begin_tx(tx_id={})", tx_id); - self.txs.insert(tx_id, RwLock::new(tx)); + self.txs.insert(tx_id, tx); // TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read // pages from WAL/DB read from a consistent state to maintiain snapshot isolation. @@ -1154,12 +1151,11 @@ impl MvStore { /// * `tx_id` - The ID of the transaction to abort. pub fn rollback_tx(&self, tx_id: TxID, pager: Rc) { let tx_unlocked = self.txs.get(&tx_id).unwrap(); - let tx = tx_unlocked.value().write(); + let tx = tx_unlocked.value(); assert_eq!(tx.state, TransactionState::Active); tx.state.store(TransactionState::Aborted); tracing::trace!("abort(tx_id={})", tx_id); let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); - drop(tx); for ref id in write_set { if let Some(row_versions) = self.rows.get(id) { @@ -1171,7 +1167,7 @@ impl MvStore { } } - let tx = tx_unlocked.value().read(); + let tx = tx_unlocked.value(); tx.state.store(TransactionState::Terminated); tracing::trace!("terminate(tx_id={})", tx_id); pager.end_read_tx().unwrap(); @@ -1210,7 +1206,7 @@ impl MvStore { // a transaction started before this row version ended, ergo row version is needed // NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable self.txs.iter().any(|tx| { - let tx = tx.value().read(); + let tx = tx.value(); // FIXME: verify! match tx.state.load() { TransactionState::Active | TransactionState::Preparing => { @@ -1264,7 +1260,7 @@ impl MvStore { fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 { match ts_or_id { TxTimestampOrID::Timestamp(ts) => *ts, - TxTimestampOrID::TxID(tx_id) => self.txs.get(tx_id).unwrap().value().read().begin_ts, + TxTimestampOrID::TxID(tx_id) => self.txs.get(tx_id).unwrap().value().begin_ts, } } @@ -1455,14 +1451,14 @@ impl MvStore { /// Ref: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf , page 301, /// 2.6. Updating a Version. pub(crate) fn is_write_write_conflict( - txs: &SkipMap>, + txs: &SkipMap, tx: &Transaction, rv: &RowVersion, ) -> bool { match rv.end { Some(TxTimestampOrID::TxID(rv_end)) => { let te = txs.get(&rv_end).unwrap(); - let te = te.value().read(); + let te = te.value(); if te.tx_id == tx.tx_id { return false; } @@ -1478,25 +1474,17 @@ pub(crate) fn is_write_write_conflict( } impl RowVersion { - pub fn is_visible_to( - &self, - tx: &Transaction, - txs: &SkipMap>, - ) -> bool { + pub fn is_visible_to(&self, tx: &Transaction, txs: &SkipMap) -> bool { is_begin_visible(txs, tx, self) && is_end_visible(txs, tx, self) } } -fn is_begin_visible( - txs: &SkipMap>, - tx: &Transaction, - rv: &RowVersion, -) -> bool { +fn is_begin_visible(txs: &SkipMap, tx: &Transaction, rv: &RowVersion) -> bool { match rv.begin { TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts, TxTimestampOrID::TxID(rv_begin) => { let tb = txs.get(&rv_begin).unwrap(); - let tb = tb.value().read(); + let tb = tb.value(); let visible = match tb.state.load() { TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(), TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! @@ -1518,7 +1506,7 @@ fn is_begin_visible( } fn is_end_visible( - txs: &SkipMap>, + txs: &SkipMap, current_tx: &Transaction, row_version: &RowVersion, ) -> bool { @@ -1526,7 +1514,7 @@ fn is_end_visible( Some(TxTimestampOrID::Timestamp(rv_end_ts)) => current_tx.begin_ts < rv_end_ts, Some(TxTimestampOrID::TxID(rv_end)) => { let other_tx = txs.get(&rv_end).unwrap(); - let other_tx = other_tx.value().read(); + let other_tx = other_tx.value(); let visible = match other_tx.state.load() { // V's sharp mind discovered an issue with the hekaton paper which basically states that a // transaction can see a row version if the end is a TXId only if it isn't the same transaction. diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 481f9cdaf..f5d7f7d08 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -983,20 +983,20 @@ Terminated | Irrelevant | Reread V’s End field. TE has terminated so or not found | | the timestamp. */ -fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> RwLock { +fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> Transaction { let state = state.into(); - RwLock::new(Transaction { + Transaction { state, tx_id, begin_ts, write_set: SkipSet::new(), read_set: SkipSet::new(), - }) + } } #[test] fn test_snapshot_isolation_tx_visible1() { - let txs: SkipMap> = SkipMap::from_iter([ + let txs: SkipMap = SkipMap::from_iter([ (1, new_tx(1, 1, TransactionState::Committed(2))), (2, new_tx(2, 2, TransactionState::Committed(5))), (3, new_tx(3, 3, TransactionState::Aborted)), @@ -1006,7 +1006,6 @@ fn test_snapshot_isolation_tx_visible1() { ]); let current_tx = new_tx(4, 4, TransactionState::Preparing); - let current_tx = current_tx.read(); let rv_visible = |begin: TxTimestampOrID, end: Option| { let row_version = RowVersion {