From fdfc4fd5b4312408fc5eaa13c2ac2bd860f50964 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 5 Jun 2023 13:58:24 +0200 Subject: [PATCH 1/2] database: drop RefCell from SkipMap not needed, the structure is already Send&Sync --- core/mvcc/mvcc-rs/src/database/mod.rs | 41 ++++++++++--------------- core/mvcc/mvcc-rs/src/database/tests.rs | 1 - 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs index 07d532374..df9cc7bb1 100644 --- a/core/mvcc/mvcc-rs/src/database/mod.rs +++ b/core/mvcc/mvcc-rs/src/database/mod.rs @@ -138,7 +138,7 @@ impl Database { /// Creates a new database. pub fn new(clock: Clock, storage: Storage) -> Self { let inner = DatabaseInner { - rows: RefCell::new(SkipMap::new()), + rows: SkipMap::new(), txs: RefCell::new(HashMap::new()), tx_timestamps: RefCell::new(BTreeMap::new()), tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes @@ -293,7 +293,7 @@ impl Database { #[derive(Debug)] pub struct DatabaseInner { - rows: RefCell>>>, + rows: SkipMap>>, txs: RefCell>, tx_timestamps: RefCell>, tx_ids: AtomicU64, @@ -314,8 +314,7 @@ impl DatabaseInner { end: None, row, }; - let rows = self.rows.borrow_mut(); - let versions = rows.get_or_insert_with(id, || RwLock::new(Vec::new())); + let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); let mut versions = versions.value().write().unwrap(); versions.push(row_version); tx.insert_to_write_set(id); @@ -327,8 +326,7 @@ impl DatabaseInner { // NOTICE: They *are* dropped before an await point!!! But the await is conditional, // so I think clippy is just confused. let mut txs = self.txs.borrow_mut(); - let rows = self.rows.borrow_mut(); - let row_versions_opt = rows.get(&id); + let row_versions_opt = self.rows.get(&id); if let Some(ref row_versions) = row_versions_opt { let mut row_versions = row_versions.value().write().unwrap(); for rv in row_versions.iter_mut().rev() { @@ -340,7 +338,6 @@ impl DatabaseInner { drop(txs); drop(row_versions); drop(row_versions_opt); - drop(rows); self.rollback_tx(tx_id); return Err(DatabaseError::WriteWriteConflict); } @@ -361,8 +358,7 @@ impl DatabaseInner { let txs = self.txs.borrow_mut(); let tx = txs.get(&tx_id).unwrap(); assert!(tx.state == TransactionState::Active); - let rows = self.rows.borrow(); - if let Some(row_versions) = rows.get(&id) { + if let Some(row_versions) = self.rows.get(&id) { let row_versions = row_versions.value().read().unwrap(); for rv in row_versions.iter().rev() { if is_version_visible(&txs, tx, rv) { @@ -375,14 +371,13 @@ impl DatabaseInner { } fn scan_row_ids(&self) -> Result> { - let rows = self.rows.borrow(); - let keys = rows.iter().map(|entry| *entry.key()); + let keys = self.rows.iter().map(|entry| *entry.key()); Ok(keys.collect()) } fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { - let rows = &self.rows.borrow(); - Ok(rows + Ok(self + .rows .range( RowID { table_id, @@ -418,12 +413,11 @@ impl DatabaseInner { assert!(tx.state == TransactionState::Active); } } - let rows = self.rows.borrow_mut(); tx.state = TransactionState::Preparing; tracing::trace!("PREPARE {tx}"); let mut log_record: LogRecord = LogRecord::new(end_ts); for id in &tx.write_set { - if let Some(row_versions) = rows.get(id) { + if let Some(row_versions) = self.rows.get(id) { let mut row_versions = row_versions.value().write().unwrap(); for row_version in row_versions.iter_mut() { if let TxTimestampOrID::TxID(id) = row_version.begin { @@ -457,7 +451,6 @@ impl DatabaseInner { } } txs.remove(&tx_id); - drop(rows); drop(txs); if !log_record.row_versions.is_empty() { self.storage.log_tx(log_record)?; @@ -471,13 +464,12 @@ impl DatabaseInner { assert!(tx.state == TransactionState::Active); tx.state = TransactionState::Aborted; tracing::trace!("ABORT {tx}"); - let rows = self.rows.borrow_mut(); for id in &tx.write_set { - if let Some(row_versions) = rows.get(id) { + if let Some(row_versions) = self.rows.get(id) { let mut row_versions = row_versions.value().write().unwrap(); row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); if row_versions.is_empty() { - rows.remove(id); + self.rows.remove(id); } } } @@ -506,9 +498,8 @@ impl DatabaseInner { fn drop_unused_row_versions(&self) { let txs = self.txs.borrow(); let tx_timestamps = self.tx_timestamps.borrow(); - let rows = self.rows.borrow_mut(); let mut to_remove = Vec::new(); - for entry in rows.iter() { + for entry in self.rows.iter() { let mut row_versions = entry.value().write().unwrap(); row_versions.retain(|rv| { let should_stay = match rv.end { @@ -543,7 +534,7 @@ impl DatabaseInner { } } for id in to_remove { - rows.remove(&id); + self.rows.remove(&id); } } @@ -552,9 +543,9 @@ impl DatabaseInner { for record in tx_log { tracing::debug!("RECOVERING {:?}", record); for version in record.row_versions { - let rows = self.rows.borrow_mut(); - let row_versions = - rows.get_or_insert_with(version.row.id, || RwLock::new(Vec::new())); + let row_versions = self + .rows + .get_or_insert_with(version.row.id, || RwLock::new(Vec::new())); let mut row_versions = row_versions.value().write().unwrap(); row_versions.push(version); } diff --git a/core/mvcc/mvcc-rs/src/database/tests.rs b/core/mvcc/mvcc-rs/src/database/tests.rs index adb29856e..29bf1c4f7 100644 --- a/core/mvcc/mvcc-rs/src/database/tests.rs +++ b/core/mvcc/mvcc-rs/src/database/tests.rs @@ -1,4 +1,3 @@ - use super::*; use crate::clock::LocalClock; use tracing_test::traced_test; From a8faffa9f59cf33628e51cf3214161c2a70937c0 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 5 Jun 2023 14:25:31 +0200 Subject: [PATCH 2/2] database: migrate txs to SkipMap --- core/mvcc/mvcc-rs/src/database/mod.rs | 170 ++++++++++++++++++-------- 1 file changed, 117 insertions(+), 53 deletions(-) diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs index df9cc7bb1..66d82e54d 100644 --- a/core/mvcc/mvcc-rs/src/database/mod.rs +++ b/core/mvcc/mvcc-rs/src/database/mod.rs @@ -1,11 +1,11 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; use crate::persistent_storage::Storage; -use crossbeam_skiplist::SkipMap; +use crossbeam_skiplist::{SkipMap, SkipSet}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::cell::RefCell; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::BTreeMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, RwLock}; @@ -66,7 +66,7 @@ enum TxTimestampOrID { } /// Transaction -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Transaction { /// The state of the transaction. state: TransactionState, @@ -75,9 +75,55 @@ pub struct Transaction { /// The transaction begin timestamp. begin_ts: u64, /// The transaction write set. - write_set: HashSet, + #[serde(with = "skipset_rowid")] + write_set: SkipSet, /// The transaction read set. - read_set: RefCell>, + #[serde(with = "skipset_rowid")] + read_set: SkipSet, +} + +mod skipset_rowid { + use super::*; + use serde::{de, ser, ser::SerializeSeq}; + + struct SkipSetDeserializer; + + impl<'de> serde::de::Visitor<'de> for SkipSetDeserializer { + type Value = SkipSet; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("SkipSet key value sequence.") + } + + fn visit_seq(self, mut seq: A) -> std::result::Result + where + A: serde::de::SeqAccess<'de>, + { + let new_skipset = SkipSet::new(); + while let Some(elem) = seq.next_element()? { + new_skipset.insert(elem); + } + + Ok(new_skipset) + } + } + + pub fn serialize( + value: &SkipSet, + ser: S, + ) -> std::result::Result { + let mut set = ser.serialize_seq(Some(value.len()))?; + for v in value { + set.serialize_element(v.value())?; + } + set.end() + } + + pub fn deserialize<'de, D: de::Deserializer<'de>>( + de: D, + ) -> std::result::Result, D::Error> { + de.deserialize_seq(SkipSetDeserializer) + } } impl Transaction { @@ -86,14 +132,13 @@ impl Transaction { state: TransactionState::Active, tx_id, begin_ts, - write_set: HashSet::new(), - read_set: RefCell::new(HashSet::new()), + write_set: SkipSet::new(), + read_set: SkipSet::new(), } } fn insert_to_read_set(&self, id: RowID) { - let mut read_set = self.read_set.borrow_mut(); - read_set.insert(id); + self.read_set.insert(id); } fn insert_to_write_set(&mut self, id: RowID) { @@ -103,18 +148,21 @@ impl Transaction { impl std::fmt::Display for Transaction { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { - match self.read_set.try_borrow() { - Ok(read_set) => write!( - f, - "{{ id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?} }}", - self.tx_id, self.begin_ts, self.write_set, read_set - ), - Err(_) => write!( - f, - "{{ id: {}, begin_ts: {}, write_set: {:?}, read_set: }}", - self.tx_id, self.begin_ts, self.write_set - ), - } + write!( + f, + "{{ id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}", + self.tx_id, + self.begin_ts, + // FIXME: I'm sorry, we obviously shouldn't be cloning here. + self.write_set + .iter() + .map(|v| *v.value()) + .collect::>(), + self.read_set + .iter() + .map(|v| *v.value()) + .collect::>() + ) } } @@ -139,7 +187,7 @@ impl Database { pub fn new(clock: Clock, storage: Storage) -> Self { let inner = DatabaseInner { rows: SkipMap::new(), - txs: RefCell::new(HashMap::new()), + txs: SkipMap::new(), tx_timestamps: RefCell::new(BTreeMap::new()), tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes clock, @@ -294,7 +342,7 @@ impl Database { #[derive(Debug)] pub struct DatabaseInner { rows: SkipMap>>, - txs: RefCell>, + txs: SkipMap>, tx_timestamps: RefCell>, tx_ids: AtomicU64, clock: Clock, @@ -303,10 +351,11 @@ pub struct DatabaseInner { impl DatabaseInner { fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { - let mut txs = self.txs.borrow_mut(); - let tx = txs - .get_mut(&tx_id) + let tx = self + .txs + .get(&tx_id) .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let mut tx = tx.value().write().unwrap(); assert!(tx.state == TransactionState::Active); let id = row.id; let row_version = RowVersion { @@ -321,31 +370,32 @@ impl DatabaseInner { Ok(()) } - #[allow(clippy::await_holding_refcell_ref)] fn delete(&self, tx_id: TxID, id: RowID) -> Result { - // NOTICE: They *are* dropped before an await point!!! But the await is conditional, - // so I think clippy is just confused. - let mut txs = self.txs.borrow_mut(); let row_versions_opt = self.rows.get(&id); if let Some(ref row_versions) = row_versions_opt { let mut row_versions = row_versions.value().write().unwrap(); for rv in row_versions.iter_mut().rev() { - let tx = txs + let tx = self + .txs .get(&tx_id) .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let tx = tx.value().read().unwrap(); assert!(tx.state == TransactionState::Active); - if is_write_write_conflict(&txs, tx, rv) { - drop(txs); + if is_write_write_conflict(&self.txs, &tx, rv) { drop(row_versions); drop(row_versions_opt); + drop(tx); self.rollback_tx(tx_id); return Err(DatabaseError::WriteWriteConflict); } - if is_version_visible(&txs, tx, rv) { + if is_version_visible(&self.txs, &tx, rv) { rv.end = Some(TxTimestampOrID::TxID(tx.tx_id)); - let tx = txs - .get_mut(&tx_id) + drop(tx); // FIXME: maybe just grab the write lock above? Do we ever expect conflicts? + let tx = self + .txs + .get(&tx_id) .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let mut tx = tx.value().write().unwrap(); tx.insert_to_write_set(id); return Ok(true); } @@ -355,13 +405,13 @@ impl DatabaseInner { } fn read(&self, tx_id: TxID, id: RowID) -> Result> { - let txs = self.txs.borrow_mut(); - let tx = txs.get(&tx_id).unwrap(); + let tx = self.txs.get(&tx_id).unwrap(); + let tx = tx.value().read().unwrap(); assert!(tx.state == TransactionState::Active); if let Some(row_versions) = self.rows.get(&id) { let row_versions = row_versions.value().read().unwrap(); for rv in row_versions.iter().rev() { - if is_version_visible(&txs, tx, rv) { + if is_version_visible(&self.txs, &tx, rv) { tx.insert_to_read_set(id); return Ok(Some(rv.row.clone())); } @@ -396,17 +446,16 @@ impl DatabaseInner { let begin_ts = self.get_timestamp(); let tx = Transaction::new(tx_id, begin_ts); tracing::trace!("BEGIN {tx}"); - let mut txs = self.txs.borrow_mut(); let mut tx_timestamps = self.tx_timestamps.borrow_mut(); - txs.insert(tx_id, tx); + self.txs.insert(tx_id, RwLock::new(tx)); *tx_timestamps.entry(begin_ts).or_insert(0) += 1; tx_id } fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { let end_ts = self.get_timestamp(); - let mut txs = self.txs.borrow_mut(); - let tx = txs.get_mut(&tx_id).unwrap(); + let tx = self.txs.get(&tx_id).unwrap(); + let mut tx = tx.value().write().unwrap(); match tx.state { TransactionState::Terminated => return Err(DatabaseError::TxTerminated), _ => { @@ -417,6 +466,7 @@ impl DatabaseInner { tracing::trace!("PREPARE {tx}"); let mut log_record: LogRecord = LogRecord::new(end_ts); for id in &tx.write_set { + let id = id.value(); if let Some(row_versions) = self.rows.get(id) { let mut row_versions = row_versions.value().write().unwrap(); for row_version in row_versions.iter_mut() { @@ -450,8 +500,7 @@ impl DatabaseInner { tx_timestamps.remove(&tx.begin_ts); } } - txs.remove(&tx_id); - drop(txs); + self.txs.remove(&tx_id); if !log_record.row_versions.is_empty() { self.storage.log_tx(log_record)?; } @@ -459,12 +508,13 @@ impl DatabaseInner { } fn rollback_tx(&self, tx_id: TxID) { - let mut txs = self.txs.borrow_mut(); - let tx = txs.get_mut(&tx_id).unwrap(); + let tx = self.txs.get(&tx_id).unwrap(); + let mut tx = tx.value().write().unwrap(); assert!(tx.state == TransactionState::Active); tx.state = TransactionState::Aborted; tracing::trace!("ABORT {tx}"); for id in &tx.write_set { + let id = id.value(); if let Some(row_versions) = self.rows.get(id) { let mut row_versions = row_versions.value().write().unwrap(); row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); @@ -496,7 +546,6 @@ impl DatabaseInner { /// We can do better by keeping an index of row versions ordered /// by their end timestamps. fn drop_unused_row_versions(&self) { - let txs = self.txs.borrow(); let tx_timestamps = self.tx_timestamps.borrow(); let mut to_remove = Vec::new(); for entry in self.rows.iter() { @@ -515,7 +564,7 @@ impl DatabaseInner { // Let's skip potentially complex logic if the transaction is still // active/tracked. We will drop the row version when the transaction // gets garbage-collected itself, it will always happen eventually. - Some(TxTimestampOrID::TxID(tx_id)) => !txs.contains_key(&tx_id), + Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id), // this row version is current, ergo visible None => true, }; @@ -558,13 +607,14 @@ impl DatabaseInner { /// A write-write conflict happens when transaction T_m attempts to update a /// row version that is currently being updated by an active transaction T_n. fn is_write_write_conflict( - txs: &HashMap, + txs: &SkipMap>, tx: &Transaction, rv: &RowVersion, ) -> bool { match rv.end { Some(TxTimestampOrID::TxID(rv_end)) => { let te = txs.get(&rv_end).unwrap(); + let te = te.value().read().unwrap(); match te.state { TransactionState::Active => tx.tx_id != te.tx_id, TransactionState::Preparing => todo!(), @@ -578,15 +628,24 @@ fn is_write_write_conflict( } } -fn is_version_visible(txs: &HashMap, tx: &Transaction, rv: &RowVersion) -> bool { +fn is_version_visible( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { is_begin_visible(txs, tx, rv) && is_end_visible(txs, tx, rv) } -fn is_begin_visible(txs: &HashMap, tx: &Transaction, rv: &RowVersion) -> bool { +fn is_begin_visible( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { match rv.begin { TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts, TxTimestampOrID::TxID(rv_begin) => { let tb = txs.get(&rv_begin).unwrap(); + let tb = tb.value().read().unwrap(); match tb.state { TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(), TransactionState::Preparing => todo!(), @@ -598,11 +657,16 @@ fn is_begin_visible(txs: &HashMap, tx: &Transaction, rv: &Row } } -fn is_end_visible(txs: &HashMap, tx: &Transaction, rv: &RowVersion) -> bool { +fn is_end_visible( + txs: &SkipMap>, + tx: &Transaction, + rv: &RowVersion, +) -> bool { match rv.end { Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts, Some(TxTimestampOrID::TxID(rv_end)) => { let te = txs.get(&rv_end).unwrap(); + let te = te.value().read().unwrap(); match te.state { TransactionState::Active => tx.tx_id != te.tx_id, TransactionState::Preparing => todo!(),