core/mvcc: Switch to parking_lot RwLock

This commit is contained in:
Pekka Enberg
2025-07-30 19:36:25 +03:00
parent 951bebfac3
commit b7cb4a3ed4
2 changed files with 20 additions and 28 deletions

View File

@@ -2,9 +2,9 @@ use crate::mvcc::clock::LogicalClock;
use crate::mvcc::errors::DatabaseError;
use crate::mvcc::persistent_storage::Storage;
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
use std::fmt::Debug;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
pub type Result<T> = std::result::Result<T, DatabaseError>;
@@ -265,7 +265,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
.txs
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
let mut tx = tx.value().write().unwrap();
let mut tx = tx.value().write();
assert_eq!(tx.state, TransactionState::Active);
let id = row.id;
let row_version = RowVersion {
@@ -332,13 +332,13 @@ impl<Clock: LogicalClock> MvStore<Clock> {
tracing::trace!("delete(tx_id={}, id={:?})", tx_id, id);
let row_versions_opt = self.rows.get(&id);
if let Some(ref row_versions) = row_versions_opt {
let mut row_versions = row_versions.value().write().unwrap();
let mut row_versions = row_versions.value().write();
for rv in row_versions.iter_mut().rev() {
let tx = self
.txs
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
let tx = tx.value().read().unwrap();
let tx = tx.value().read();
assert_eq!(tx.state, TransactionState::Active);
// A transaction cannot delete a version that it cannot see,
// nor can it conflict with it.
@@ -361,7 +361,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
.txs
.get(&tx_id)
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
let mut tx = tx.value().write().unwrap();
let mut tx = tx.value().write();
tx.insert_to_write_set(id);
return Ok(true);
}
@@ -386,10 +386,10 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
tracing::trace!("read(tx_id={}, id={:?})", tx_id, id);
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read().unwrap();
let tx = tx.value().read();
assert_eq!(tx.state, TransactionState::Active);
if let Some(row_versions) = self.rows.get(&id) {
let row_versions = row_versions.value().read().unwrap();
let row_versions = row_versions.value().read();
if let Some(rv) = row_versions
.iter()
.rev()
@@ -507,7 +507,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
// NOTICE: the first shadowed tx keeps the entry alive in the map
// for the duration of this whole function, which is important for correctness!
let tx = self.txs.get(&tx_id).ok_or(DatabaseError::TxTerminated)?;
let tx = tx.value().write().unwrap();
let tx = tx.value().write();
match tx.state.load() {
TransactionState::Terminated => return Err(DatabaseError::TxTerminated),
_ => {
@@ -599,7 +599,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let mut log_record = LogRecord::new(end_ts);
for ref id in write_set {
if let Some(row_versions) = self.rows.get(id) {
let mut row_versions = row_versions.value().write().unwrap();
let mut row_versions = row_versions.value().write();
for row_version in row_versions.iter_mut() {
if let TxTimestampOrID::TxID(id) = row_version.begin {
if id == tx_id {
@@ -653,7 +653,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// * `tx_id` - The ID of the transaction to abort.
pub fn rollback_tx(&self, tx_id: TxID) {
let tx_unlocked = self.txs.get(&tx_id).unwrap();
let tx = tx_unlocked.value().write().unwrap();
let tx = tx_unlocked.value().write();
assert_eq!(tx.state, TransactionState::Active);
tx.state.store(TransactionState::Aborted);
tracing::trace!("abort(tx_id={})", tx_id);
@@ -662,7 +662,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
for ref id in write_set {
if let Some(row_versions) = self.rows.get(id) {
let mut row_versions = row_versions.value().write().unwrap();
let mut row_versions = row_versions.value().write();
row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id));
if row_versions.is_empty() {
self.rows.remove(id);
@@ -670,7 +670,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
}
}
let tx = tx_unlocked.value().read().unwrap();
let tx = tx_unlocked.value().read();
tx.state.store(TransactionState::Terminated);
tracing::trace!("terminate(tx_id={})", tx_id);
// FIXME: verify that we can already remove the transaction here!
@@ -700,7 +700,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let mut dropped = 0;
let mut to_remove = Vec::new();
for entry in self.rows.iter() {
let mut row_versions = entry.value().write().unwrap();
let mut row_versions = entry.value().write();
row_versions.retain(|rv| {
// FIXME: should take rv.begin into account as well
let should_stay = match rv.end {
@@ -708,7 +708,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
// a transaction started before this row version ended, ergo row version is needed
// NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable
self.txs.iter().any(|tx| {
let tx = tx.value().read().unwrap();
let tx = tx.value().read();
// FIXME: verify!
match tx.state.load() {
TransactionState::Active | TransactionState::Preparing => {
@@ -762,15 +762,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 {
match ts_or_id {
TxTimestampOrID::Timestamp(ts) => *ts,
TxTimestampOrID::TxID(tx_id) => {
self.txs
.get(tx_id)
.unwrap()
.value()
.read()
.unwrap()
.begin_ts
}
TxTimestampOrID::TxID(tx_id) => self.txs.get(tx_id).unwrap().value().read().begin_ts,
}
}
@@ -778,7 +770,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
/// the row version is inserted in the correct order.
fn insert_version(&self, id: RowID, row_version: RowVersion) {
let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
let mut versions = versions.value().write().unwrap();
let mut versions = versions.value().write();
self.insert_version_raw(&mut versions, row_version)
}
@@ -828,7 +820,7 @@ pub(crate) fn is_write_write_conflict(
match rv.end {
Some(TxTimestampOrID::TxID(rv_end)) => {
let te = txs.get(&rv_end).unwrap();
let te = te.value().read().unwrap();
let te = te.value().read();
if te.tx_id == tx.tx_id {
return false;
}
@@ -862,7 +854,7 @@ fn is_begin_visible(
TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts,
TxTimestampOrID::TxID(rv_begin) => {
let tb = txs.get(&rv_begin).unwrap();
let tb = tb.value().read().unwrap();
let tb = tb.value().read();
let visible = match tb.state.load() {
TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(),
TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
@@ -892,7 +884,7 @@ fn is_end_visible(
Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts,
Some(TxTimestampOrID::TxID(rv_end)) => {
let te = txs.get(&rv_end).unwrap();
let te = te.value().read().unwrap();
let te = te.value().read();
let visible = match te.state.load() {
TransactionState::Active => tx.tx_id != te.tx_id,
TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!

View File

@@ -901,7 +901,7 @@ fn test_snapshot_isolation_tx_visible1() {
]);
let current_tx = new_tx(4, 4, TransactionState::Preparing);
let current_tx = current_tx.read().unwrap();
let current_tx = current_tx.read();
let rv_visible = |begin: TxTimestampOrID, end: Option<TxTimestampOrID>| {
let row_version = RowVersion {