diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 84c343adb..b3f71a871 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -2,9 +2,9 @@ use crate::mvcc::clock::LogicalClock; use crate::mvcc::errors::DatabaseError; use crate::mvcc::persistent_storage::Storage; use crossbeam_skiplist::{SkipMap, SkipSet}; +use parking_lot::RwLock; use std::fmt::Debug; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::RwLock; pub type Result = std::result::Result; @@ -265,7 +265,7 @@ impl MvStore { .txs .get(&tx_id) .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; - let mut tx = tx.value().write().unwrap(); + let mut tx = tx.value().write(); assert_eq!(tx.state, TransactionState::Active); let id = row.id; let row_version = RowVersion { @@ -332,13 +332,13 @@ impl MvStore { tracing::trace!("delete(tx_id={}, id={:?})", tx_id, id); let row_versions_opt = self.rows.get(&id); if let Some(ref row_versions) = row_versions_opt { - let mut row_versions = row_versions.value().write().unwrap(); + let mut row_versions = row_versions.value().write(); for rv in row_versions.iter_mut().rev() { let tx = self .txs .get(&tx_id) .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; - let tx = tx.value().read().unwrap(); + let tx = tx.value().read(); assert_eq!(tx.state, TransactionState::Active); // A transaction cannot delete a version that it cannot see, // nor can it conflict with it. @@ -361,7 +361,7 @@ impl MvStore { .txs .get(&tx_id) .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; - let mut tx = tx.value().write().unwrap(); + let mut tx = tx.value().write(); tx.insert_to_write_set(id); return Ok(true); } @@ -386,10 +386,10 @@ impl MvStore { pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { tracing::trace!("read(tx_id={}, id={:?})", tx_id, id); let tx = self.txs.get(&tx_id).unwrap(); - let tx = tx.value().read().unwrap(); + let tx = tx.value().read(); assert_eq!(tx.state, TransactionState::Active); if let Some(row_versions) = self.rows.get(&id) { - let row_versions = row_versions.value().read().unwrap(); + let row_versions = row_versions.value().read(); if let Some(rv) = row_versions .iter() .rev() @@ -507,7 +507,7 @@ impl MvStore { // NOTICE: the first shadowed tx keeps the entry alive in the map // for the duration of this whole function, which is important for correctness! let tx = self.txs.get(&tx_id).ok_or(DatabaseError::TxTerminated)?; - let tx = tx.value().write().unwrap(); + let tx = tx.value().write(); match tx.state.load() { TransactionState::Terminated => return Err(DatabaseError::TxTerminated), _ => { @@ -599,7 +599,7 @@ impl MvStore { let mut log_record = LogRecord::new(end_ts); for ref id in write_set { if let Some(row_versions) = self.rows.get(id) { - let mut row_versions = row_versions.value().write().unwrap(); + let mut row_versions = row_versions.value().write(); for row_version in row_versions.iter_mut() { if let TxTimestampOrID::TxID(id) = row_version.begin { if id == tx_id { @@ -653,7 +653,7 @@ impl MvStore { /// * `tx_id` - The ID of the transaction to abort. pub fn rollback_tx(&self, tx_id: TxID) { let tx_unlocked = self.txs.get(&tx_id).unwrap(); - let tx = tx_unlocked.value().write().unwrap(); + let tx = tx_unlocked.value().write(); assert_eq!(tx.state, TransactionState::Active); tx.state.store(TransactionState::Aborted); tracing::trace!("abort(tx_id={})", tx_id); @@ -662,7 +662,7 @@ impl MvStore { for ref id in write_set { if let Some(row_versions) = self.rows.get(id) { - let mut row_versions = row_versions.value().write().unwrap(); + let mut row_versions = row_versions.value().write(); row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); if row_versions.is_empty() { self.rows.remove(id); @@ -670,7 +670,7 @@ impl MvStore { } } - let tx = tx_unlocked.value().read().unwrap(); + let tx = tx_unlocked.value().read(); tx.state.store(TransactionState::Terminated); tracing::trace!("terminate(tx_id={})", tx_id); // FIXME: verify that we can already remove the transaction here! @@ -700,7 +700,7 @@ impl MvStore { let mut dropped = 0; let mut to_remove = Vec::new(); for entry in self.rows.iter() { - let mut row_versions = entry.value().write().unwrap(); + let mut row_versions = entry.value().write(); row_versions.retain(|rv| { // FIXME: should take rv.begin into account as well let should_stay = match rv.end { @@ -708,7 +708,7 @@ impl MvStore { // a transaction started before this row version ended, ergo row version is needed // NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable self.txs.iter().any(|tx| { - let tx = tx.value().read().unwrap(); + let tx = tx.value().read(); // FIXME: verify! match tx.state.load() { TransactionState::Active | TransactionState::Preparing => { @@ -762,15 +762,7 @@ impl MvStore { fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 { match ts_or_id { TxTimestampOrID::Timestamp(ts) => *ts, - TxTimestampOrID::TxID(tx_id) => { - self.txs - .get(tx_id) - .unwrap() - .value() - .read() - .unwrap() - .begin_ts - } + TxTimestampOrID::TxID(tx_id) => self.txs.get(tx_id).unwrap().value().read().begin_ts, } } @@ -778,7 +770,7 @@ impl MvStore { /// the row version is inserted in the correct order. fn insert_version(&self, id: RowID, row_version: RowVersion) { let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); - let mut versions = versions.value().write().unwrap(); + let mut versions = versions.value().write(); self.insert_version_raw(&mut versions, row_version) } @@ -828,7 +820,7 @@ pub(crate) fn is_write_write_conflict( match rv.end { Some(TxTimestampOrID::TxID(rv_end)) => { let te = txs.get(&rv_end).unwrap(); - let te = te.value().read().unwrap(); + let te = te.value().read(); if te.tx_id == tx.tx_id { return false; } @@ -862,7 +854,7 @@ fn is_begin_visible( TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts, TxTimestampOrID::TxID(rv_begin) => { let tb = txs.get(&rv_begin).unwrap(); - let tb = tb.value().read().unwrap(); + let tb = tb.value().read(); let visible = match tb.state.load() { TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(), TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! @@ -892,7 +884,7 @@ fn is_end_visible( Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts, Some(TxTimestampOrID::TxID(rv_end)) => { let te = txs.get(&rv_end).unwrap(); - let te = te.value().read().unwrap(); + let te = te.value().read(); let visible = match te.state.load() { TransactionState::Active => tx.tx_id != te.tx_id, TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index d4949d3af..6ae69456f 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -901,7 +901,7 @@ fn test_snapshot_isolation_tx_visible1() { ]); let current_tx = new_tx(4, 4, TransactionState::Preparing); - let current_tx = current_tx.read().unwrap(); + let current_tx = current_tx.read(); let rv_visible = |begin: TxTimestampOrID, end: Option| { let row_version = RowVersion {