diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs index 07d532374..df9cc7bb1 100644 --- a/core/mvcc/mvcc-rs/src/database/mod.rs +++ b/core/mvcc/mvcc-rs/src/database/mod.rs @@ -138,7 +138,7 @@ impl Database { /// Creates a new database. pub fn new(clock: Clock, storage: Storage) -> Self { let inner = DatabaseInner { - rows: RefCell::new(SkipMap::new()), + rows: SkipMap::new(), txs: RefCell::new(HashMap::new()), tx_timestamps: RefCell::new(BTreeMap::new()), tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes @@ -293,7 +293,7 @@ impl Database { #[derive(Debug)] pub struct DatabaseInner { - rows: RefCell>>>, + rows: SkipMap>>, txs: RefCell>, tx_timestamps: RefCell>, tx_ids: AtomicU64, @@ -314,8 +314,7 @@ impl DatabaseInner { end: None, row, }; - let rows = self.rows.borrow_mut(); - let versions = rows.get_or_insert_with(id, || RwLock::new(Vec::new())); + let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); let mut versions = versions.value().write().unwrap(); versions.push(row_version); tx.insert_to_write_set(id); @@ -327,8 +326,7 @@ impl DatabaseInner { // NOTICE: They *are* dropped before an await point!!! But the await is conditional, // so I think clippy is just confused. let mut txs = self.txs.borrow_mut(); - let rows = self.rows.borrow_mut(); - let row_versions_opt = rows.get(&id); + let row_versions_opt = self.rows.get(&id); if let Some(ref row_versions) = row_versions_opt { let mut row_versions = row_versions.value().write().unwrap(); for rv in row_versions.iter_mut().rev() { @@ -340,7 +338,6 @@ impl DatabaseInner { drop(txs); drop(row_versions); drop(row_versions_opt); - drop(rows); self.rollback_tx(tx_id); return Err(DatabaseError::WriteWriteConflict); } @@ -361,8 +358,7 @@ impl DatabaseInner { let txs = self.txs.borrow_mut(); let tx = txs.get(&tx_id).unwrap(); assert!(tx.state == TransactionState::Active); - let rows = self.rows.borrow(); - if let Some(row_versions) = rows.get(&id) { + if let Some(row_versions) = self.rows.get(&id) { let row_versions = row_versions.value().read().unwrap(); for rv in row_versions.iter().rev() { if is_version_visible(&txs, tx, rv) { @@ -375,14 +371,13 @@ impl DatabaseInner { } fn scan_row_ids(&self) -> Result> { - let rows = self.rows.borrow(); - let keys = rows.iter().map(|entry| *entry.key()); + let keys = self.rows.iter().map(|entry| *entry.key()); Ok(keys.collect()) } fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { - let rows = &self.rows.borrow(); - Ok(rows + Ok(self + .rows .range( RowID { table_id, @@ -418,12 +413,11 @@ impl DatabaseInner { assert!(tx.state == TransactionState::Active); } } - let rows = self.rows.borrow_mut(); tx.state = TransactionState::Preparing; tracing::trace!("PREPARE {tx}"); let mut log_record: LogRecord = LogRecord::new(end_ts); for id in &tx.write_set { - if let Some(row_versions) = rows.get(id) { + if let Some(row_versions) = self.rows.get(id) { let mut row_versions = row_versions.value().write().unwrap(); for row_version in row_versions.iter_mut() { if let TxTimestampOrID::TxID(id) = row_version.begin { @@ -457,7 +451,6 @@ impl DatabaseInner { } } txs.remove(&tx_id); - drop(rows); drop(txs); if !log_record.row_versions.is_empty() { self.storage.log_tx(log_record)?; @@ -471,13 +464,12 @@ impl DatabaseInner { assert!(tx.state == TransactionState::Active); tx.state = TransactionState::Aborted; tracing::trace!("ABORT {tx}"); - let rows = self.rows.borrow_mut(); for id in &tx.write_set { - if let Some(row_versions) = rows.get(id) { + if let Some(row_versions) = self.rows.get(id) { let mut row_versions = row_versions.value().write().unwrap(); row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); if row_versions.is_empty() { - rows.remove(id); + self.rows.remove(id); } } } @@ -506,9 +498,8 @@ impl DatabaseInner { fn drop_unused_row_versions(&self) { let txs = self.txs.borrow(); let tx_timestamps = self.tx_timestamps.borrow(); - let rows = self.rows.borrow_mut(); let mut to_remove = Vec::new(); - for entry in rows.iter() { + for entry in self.rows.iter() { let mut row_versions = entry.value().write().unwrap(); row_versions.retain(|rv| { let should_stay = match rv.end { @@ -543,7 +534,7 @@ impl DatabaseInner { } } for id in to_remove { - rows.remove(&id); + self.rows.remove(&id); } } @@ -552,9 +543,9 @@ impl DatabaseInner { for record in tx_log { tracing::debug!("RECOVERING {:?}", record); for version in record.row_versions { - let rows = self.rows.borrow_mut(); - let row_versions = - rows.get_or_insert_with(version.row.id, || RwLock::new(Vec::new())); + let row_versions = self + .rows + .get_or_insert_with(version.row.id, || RwLock::new(Vec::new())); let mut row_versions = row_versions.value().write().unwrap(); row_versions.push(version); } diff --git a/core/mvcc/mvcc-rs/src/database/tests.rs b/core/mvcc/mvcc-rs/src/database/tests.rs index adb29856e..29bf1c4f7 100644 --- a/core/mvcc/mvcc-rs/src/database/tests.rs +++ b/core/mvcc/mvcc-rs/src/database/tests.rs @@ -1,4 +1,3 @@ - use super::*; use crate::clock::LocalClock; use tracing_test::traced_test;