Merge 'core/mvcc: Eliminate RwLock wrapping Transaction' from Pekka Enberg

The write and read sets in Transaction use SkipSet, which is thread-
safe. Therefore, drop the RwLock wrapping Transaction everywhere,
increasing MVCC throughput by almost 30%.
Before:
```
Running write throughput benchmark with 1 threads, 1000 batch size, 1000 iterations, mode: Mvcc
Database created at: write_throughput_test.db
Thread 0: 1000000 inserts in 6.50s (153927.21 inserts/sec)

=== BENCHMARK RESULTS ===
Total inserts: 1000000
Total time: 6.50s
Overall throughput: 153758.85 inserts/sec
Threads: 1
Batch size: 1000
Iterations per thread: 1000
```
After:
```
Running write throughput benchmark with 1 threads, 1000 batch size, 1000 iterations, mode: Mvcc
Database created at: write_throughput_test.db
Thread 0: 1000000 inserts in 5.10s (195927.13 inserts/sec)

=== BENCHMARK RESULTS ===
Total inserts: 1000000
Total time: 5.11s
Overall throughput: 195663.94 inserts/sec
Threads: 1
Batch size: 1000
Iterations per thread: 1000
```

Closes #3035
This commit is contained in:
Pekka Enberg
2025-09-11 20:55:14 +03:00
committed by GitHub
2 changed files with 32 additions and 45 deletions

View File

@@ -125,7 +125,7 @@ impl Transaction {
self.read_set.insert(id);
}
fn insert_to_write_set(&mut self, id: RowID) {
fn insert_to_write_set(&self, id: RowID) {
self.write_set.insert(id);
}
}
@@ -350,7 +350,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
.txs
.get(&self.tx_id)
.ok_or(LimboError::TxTerminated)?;
let tx = tx.value().write();
let tx = tx.value();
match tx.state.load() {
TransactionState::Terminated => {
return Err(LimboError::TxTerminated);
@@ -810,7 +810,7 @@ impl DeleteRowStateMachine {
#[derive(Debug)]
pub struct MvStore<Clock: LogicalClock> {
rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
txs: SkipMap<TxID, RwLock<Transaction>>,
txs: SkipMap<TxID, Transaction>,
tx_ids: AtomicU64,
next_rowid: AtomicU64,
clock: Clock,
@@ -852,7 +852,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
.txs
.get(&tx_id)
.ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?;
let mut tx = tx.value().write();
let tx = tx.value();
assert_eq!(tx.state, TransactionState::Active);
let id = row.id;
let row_version = RowVersion {
@@ -861,7 +861,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
row,
};
tx.insert_to_write_set(id);
drop(tx);
self.insert_version(id, row_version);
Ok(())
}
@@ -925,17 +924,16 @@ impl<Clock: LogicalClock> MvStore<Clock> {
.txs
.get(&tx_id)
.ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?;
let tx = tx.value().read();
let tx = tx.value();
assert_eq!(tx.state, TransactionState::Active);
// A transaction cannot delete a version that it cannot see,
// nor can it conflict with it.
if !rv.is_visible_to(&tx, &self.txs) {
if !rv.is_visible_to(tx, &self.txs) {
continue;
}
if is_write_write_conflict(&self.txs, &tx, rv) {
if is_write_write_conflict(&self.txs, tx, rv) {
drop(row_versions);
drop(row_versions_opt);
drop(tx);
self.rollback_tx(tx_id, pager);
return Err(LimboError::WriteWriteConflict);
}
@@ -943,12 +941,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
rv.end = Some(TxTimestampOrID::TxID(tx.tx_id));
drop(row_versions);
drop(row_versions_opt);
drop(tx);
let tx = self
.txs
.get(&tx_id)
.ok_or(LimboError::NoSuchTransactionID(tx_id.to_string()))?;
let mut tx = tx.value().write();
let tx = tx.value();
tx.insert_to_write_set(id);
return Ok(true);
}
@@ -973,14 +970,14 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
tracing::trace!("read(tx_id={}, id={:?})", tx_id, id);
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read();
let tx = tx.value();
assert_eq!(tx.state, TransactionState::Active);
if let Some(row_versions) = self.rows.get(&id) {
let row_versions = row_versions.value().read();
if let Some(rv) = row_versions
.iter()
.rev()
.find(|rv| rv.is_visible_to(&tx, &self.txs))
.find(|rv| rv.is_visible_to(tx, &self.txs))
{
tx.insert_to_read_set(id);
return Ok(Some(rv.row.clone()));
@@ -1048,7 +1045,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
};
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read();
let tx = tx.value();
let mut rows = self.rows.range(min_bound..max_bound);
loop {
// We are moving forward, so if a row was deleted we just need to skip it. Therefore, we need
@@ -1057,7 +1054,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let row = next_row?;
// We found a row, let's check if it's visible to the transaction.
if let Some(visible_row) = self.find_last_visible_version(&tx, row) {
if let Some(visible_row) = self.find_last_visible_version(tx, row) {
return Some(visible_row);
}
// If this row is not visible, continue to the next row
@@ -1066,7 +1063,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
fn find_last_visible_version(
&self,
tx: &parking_lot::lock_api::RwLockReadGuard<'_, parking_lot::RawRwLock, Transaction>,
tx: &Transaction,
row: crossbeam_skiplist::map::Entry<
'_,
RowID,
@@ -1090,15 +1087,15 @@ impl<Clock: LogicalClock> MvStore<Clock> {
tracing::trace!("seek_rowid(bound={:?}, lower_bound={})", bound, lower_bound,);
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read();
let tx = tx.value();
if lower_bound {
self.rows
.lower_bound(bound)
.and_then(|entry| self.find_last_visible_version(&tx, entry))
.and_then(|entry| self.find_last_visible_version(tx, entry))
} else {
self.rows
.upper_bound(bound)
.and_then(|entry| self.find_last_visible_version(&tx, entry))
.and_then(|entry| self.find_last_visible_version(tx, entry))
}
}
@@ -1112,7 +1109,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let begin_ts = self.get_timestamp();
let tx = Transaction::new(tx_id, begin_ts);
tracing::trace!("begin_tx(tx_id={})", tx_id);
self.txs.insert(tx_id, RwLock::new(tx));
self.txs.insert(tx_id, tx);
// TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read
// pages from WAL/DB read from a consistent state to maintiain snapshot isolation.
@@ -1154,12 +1151,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// * `tx_id` - The ID of the transaction to abort.
pub fn rollback_tx(&self, tx_id: TxID, pager: Rc<Pager>) {
let tx_unlocked = self.txs.get(&tx_id).unwrap();
let tx = tx_unlocked.value().write();
let tx = tx_unlocked.value();
assert_eq!(tx.state, TransactionState::Active);
tx.state.store(TransactionState::Aborted);
tracing::trace!("abort(tx_id={})", tx_id);
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
drop(tx);
for ref id in write_set {
if let Some(row_versions) = self.rows.get(id) {
@@ -1171,7 +1167,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
}
let tx = tx_unlocked.value().read();
let tx = tx_unlocked.value();
tx.state.store(TransactionState::Terminated);
tracing::trace!("terminate(tx_id={})", tx_id);
pager.end_read_tx().unwrap();
@@ -1210,7 +1206,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
// a transaction started before this row version ended, ergo row version is needed
// NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable
self.txs.iter().any(|tx| {
let tx = tx.value().read();
let tx = tx.value();
// FIXME: verify!
match tx.state.load() {
TransactionState::Active | TransactionState::Preparing => {
@@ -1264,7 +1260,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 {
match ts_or_id {
TxTimestampOrID::Timestamp(ts) => *ts,
TxTimestampOrID::TxID(tx_id) => self.txs.get(tx_id).unwrap().value().read().begin_ts,
TxTimestampOrID::TxID(tx_id) => self.txs.get(tx_id).unwrap().value().begin_ts,
}
}
@@ -1455,14 +1451,14 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// Ref: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf , page 301,
/// 2.6. Updating a Version.
pub(crate) fn is_write_write_conflict(
txs: &SkipMap<TxID, RwLock<Transaction>>,
txs: &SkipMap<TxID, Transaction>,
tx: &Transaction,
rv: &RowVersion,
) -> bool {
match rv.end {
Some(TxTimestampOrID::TxID(rv_end)) => {
let te = txs.get(&rv_end).unwrap();
let te = te.value().read();
let te = te.value();
if te.tx_id == tx.tx_id {
return false;
}
@@ -1478,25 +1474,17 @@ pub(crate) fn is_write_write_conflict(
}
impl RowVersion {
pub fn is_visible_to(
&self,
tx: &Transaction,
txs: &SkipMap<TxID, RwLock<Transaction>>,
) -> bool {
pub fn is_visible_to(&self, tx: &Transaction, txs: &SkipMap<TxID, Transaction>) -> bool {
is_begin_visible(txs, tx, self) && is_end_visible(txs, tx, self)
}
}
fn is_begin_visible(
txs: &SkipMap<TxID, RwLock<Transaction>>,
tx: &Transaction,
rv: &RowVersion,
) -> bool {
fn is_begin_visible(txs: &SkipMap<TxID, Transaction>, tx: &Transaction, rv: &RowVersion) -> bool {
match rv.begin {
TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts,
TxTimestampOrID::TxID(rv_begin) => {
let tb = txs.get(&rv_begin).unwrap();
let tb = tb.value().read();
let tb = tb.value();
let visible = match tb.state.load() {
TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(),
TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
@@ -1518,7 +1506,7 @@ fn is_begin_visible(
}
fn is_end_visible(
txs: &SkipMap<TxID, RwLock<Transaction>>,
txs: &SkipMap<TxID, Transaction>,
current_tx: &Transaction,
row_version: &RowVersion,
) -> bool {
@@ -1526,7 +1514,7 @@ fn is_end_visible(
Some(TxTimestampOrID::Timestamp(rv_end_ts)) => current_tx.begin_ts < rv_end_ts,
Some(TxTimestampOrID::TxID(rv_end)) => {
let other_tx = txs.get(&rv_end).unwrap();
let other_tx = other_tx.value().read();
let other_tx = other_tx.value();
let visible = match other_tx.state.load() {
// V's sharp mind discovered an issue with the hekaton paper which basically states that a
// transaction can see a row version if the end is a TXId only if it isn't the same transaction.

View File

@@ -983,20 +983,20 @@ Terminated | Irrelevant | Reread Vs End field. TE has terminated so
or not found | | the timestamp.
*/
fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> RwLock<Transaction> {
fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> Transaction {
let state = state.into();
RwLock::new(Transaction {
Transaction {
state,
tx_id,
begin_ts,
write_set: SkipSet::new(),
read_set: SkipSet::new(),
})
}
}
#[test]
fn test_snapshot_isolation_tx_visible1() {
let txs: SkipMap<TxID, RwLock<Transaction>> = SkipMap::from_iter([
let txs: SkipMap<TxID, Transaction> = SkipMap::from_iter([
(1, new_tx(1, 1, TransactionState::Committed(2))),
(2, new_tx(2, 2, TransactionState::Committed(5))),
(3, new_tx(3, 3, TransactionState::Aborted)),
@@ -1006,7 +1006,6 @@ fn test_snapshot_isolation_tx_visible1() {
]);
let current_tx = new_tx(4, 4, TransactionState::Preparing);
let current_tx = current_tx.read();
let rv_visible = |begin: TxTimestampOrID, end: Option<TxTimestampOrID>| {
let row_version = RowVersion {