From 47eb149214d982f3557dd44e5004018745fdfd3d Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 6 Jun 2023 11:21:10 +0200 Subject: [PATCH] database: drop the mutex Without a critical section, we naturally hit a few unimplemented paths when handling concurrent transactions, which is great news! Visiting previously impossible paths already proves that lock-free is able to handle concurrency > 1. Now, the easy part - fixing all the unimplemented paths and making the Hekaton implementation 100% foolproof. --- core/mvcc/mvcc-rs/src/database/mod.rs | 282 +++++------------- .../mvcc-rs/src/persistent_storage/mod.rs | 2 +- 2 files changed, 78 insertions(+), 206 deletions(-) diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs index 66d82e54d..cfdd43c4c 100644 --- a/core/mvcc/mvcc-rs/src/database/mod.rs +++ b/core/mvcc/mvcc-rs/src/database/mod.rs @@ -2,12 +2,9 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; use crate::persistent_storage::Storage; use crossbeam_skiplist::{SkipMap, SkipSet}; -use parking_lot::Mutex; use serde::{Deserialize, Serialize}; -use std::cell::RefCell; -use std::collections::BTreeMap; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::RwLock; pub type Result = std::result::Result; @@ -175,29 +172,28 @@ enum TransactionState { Aborted, Terminated, } - -/// A database with MVCC. #[derive(Debug)] pub struct Database { - inner: Arc>>, + rows: SkipMap>>, + txs: SkipMap>, + tx_ids: AtomicU64, + clock: Clock, + storage: Storage, } impl Database { + /// Creates a new database. pub fn new(clock: Clock, storage: Storage) -> Self { - let inner = DatabaseInner { + Self { rows: SkipMap::new(), txs: SkipMap::new(), - tx_timestamps: RefCell::new(BTreeMap::new()), tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes clock, storage, - }; - Self { - inner: Arc::new(Mutex::new(inner)), } } - + /// Inserts a new row into the database. /// /// This function inserts a new `row` into the database within the context @@ -209,8 +205,23 @@ impl Database { /// * `row` - the row object containing the values to be inserted. /// pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { - let inner = self.inner.lock(); - inner.insert(tx_id, row) + let tx = self + .txs + .get(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let mut tx = tx.value().write().unwrap(); + assert!(tx.state == TransactionState::Active); + let id = row.id; + let row_version = RowVersion { + begin: TxTimestampOrID::TxID(tx.tx_id), + end: None, + row, + }; + let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); + let mut versions = versions.value().write().unwrap(); + versions.push(row_version); + tx.insert_to_write_set(id); + Ok(()) } /// Updates a row in the database with new values. @@ -254,123 +265,6 @@ impl Database { /// Returns `true` if the row was successfully deleted, and `false` otherwise. /// pub fn delete(&self, tx_id: TxID, id: RowID) -> Result { - let inner = self.inner.lock(); - inner.delete(tx_id, id) - } - - /// Retrieves a row from the table with the given `id`. - /// - /// This operation is performed within the scope of the transaction identified - /// by `tx_id`. - /// - /// # Arguments - /// - /// * `tx_id` - The ID of the transaction to perform the read operation in. - /// * `id` - The ID of the row to retrieve. - /// - /// # Returns - /// - /// Returns `Some(row)` with the row data if the row with the given `id` exists, - /// and `None` otherwise. - pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { - let inner = self.inner.lock(); - inner.read(tx_id, id) - } - - pub fn scan_row_ids(&self) -> Result> { - let inner = self.inner.lock(); - inner.scan_row_ids() - } - - pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { - let inner = self.inner.lock(); - inner.scan_row_ids_for_table(table_id) - } - - /// Begins a new transaction in the database. - /// - /// This function starts a new transaction in the database and returns a `TxID` value - /// that you can use to perform operations within the transaction. All changes made within the - /// transaction are isolated from other transactions until you commit the transaction. - pub fn begin_tx(&self) -> TxID { - let mut inner = self.inner.lock(); - inner.begin_tx() - } - - /// Commits a transaction with the specified transaction ID. - /// - /// This function commits the changes made within the specified transaction and finalizes the - /// transaction. Once a transaction has been committed, all changes made within the transaction - /// are visible to other transactions that access the same data. - /// - /// # Arguments - /// - /// * `tx_id` - The ID of the transaction to commit. - pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { - let mut inner = self.inner.lock(); - inner.commit_tx(tx_id) - } - - /// Rolls back a transaction with the specified ID. - /// - /// This function rolls back a transaction with the specified `tx_id` by - /// discarding any changes made by the transaction. - /// - /// # Arguments - /// - /// * `tx_id` - The ID of the transaction to abort. - pub fn rollback_tx(&self, tx_id: TxID) { - let inner = self.inner.lock(); - inner.rollback_tx(tx_id); - } - - /// Drops all unused row versions from the database. - /// - /// A version is considered unused if it is not visible to any active transaction - /// and it is not the most recent version of the row. - pub fn drop_unused_row_versions(&self) { - let inner = self.inner.lock(); - inner.drop_unused_row_versions(); - } - - pub fn recover(&self) -> Result<()> { - let inner = self.inner.lock(); - inner.recover() - } -} - -#[derive(Debug)] -pub struct DatabaseInner { - rows: SkipMap>>, - txs: SkipMap>, - tx_timestamps: RefCell>, - tx_ids: AtomicU64, - clock: Clock, - storage: Storage, -} - -impl DatabaseInner { - fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { - let tx = self - .txs - .get(&tx_id) - .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; - let mut tx = tx.value().write().unwrap(); - assert!(tx.state == TransactionState::Active); - let id = row.id; - let row_version = RowVersion { - begin: TxTimestampOrID::TxID(tx.tx_id), - end: None, - row, - }; - let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); - let mut versions = versions.value().write().unwrap(); - versions.push(row_version); - tx.insert_to_write_set(id); - Ok(()) - } - - fn delete(&self, tx_id: TxID, id: RowID) -> Result { let row_versions_opt = self.rows.get(&id); if let Some(ref row_versions) = row_versions_opt { let mut row_versions = row_versions.value().write().unwrap(); @@ -404,7 +298,21 @@ impl DatabaseInner { Ok(false) } - fn read(&self, tx_id: TxID, id: RowID) -> Result> { + /// Retrieves a row from the table with the given `id`. + /// + /// This operation is performed within the scope of the transaction identified + /// by `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to perform the read operation in. + /// * `id` - The ID of the row to retrieve. + /// + /// # Returns + /// + /// Returns `Some(row)` with the row data if the row with the given `id` exists, + /// and `None` otherwise. + pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { let tx = self.txs.get(&tx_id).unwrap(); let tx = tx.value().read().unwrap(); assert!(tx.state == TransactionState::Active); @@ -420,12 +328,14 @@ impl DatabaseInner { Ok(None) } - fn scan_row_ids(&self) -> Result> { + /// Gets all row ids in the database. + pub fn scan_row_ids(&self) -> Result> { let keys = self.rows.iter().map(|entry| *entry.key()); Ok(keys.collect()) } - fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { + /// Gets all row ids in the database for a given table. + pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { Ok(self .rows .range( @@ -441,18 +351,30 @@ impl DatabaseInner { .collect()) } - fn begin_tx(&mut self) -> TxID { + /// Begins a new transaction in the database. + /// + /// This function starts a new transaction in the database and returns a `TxID` value + /// that you can use to perform operations within the transaction. All changes made within the + /// transaction are isolated from other transactions until you commit the transaction. + pub fn begin_tx(&self) -> TxID { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); let tx = Transaction::new(tx_id, begin_ts); tracing::trace!("BEGIN {tx}"); - let mut tx_timestamps = self.tx_timestamps.borrow_mut(); self.txs.insert(tx_id, RwLock::new(tx)); - *tx_timestamps.entry(begin_ts).or_insert(0) += 1; tx_id } - fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { + /// Commits a transaction with the specified transaction ID. + /// + /// This function commits the changes made within the specified transaction and finalizes the + /// transaction. Once a transaction has been committed, all changes made within the transaction + /// are visible to other transactions that access the same data. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to commit. + pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { let end_ts = self.get_timestamp(); let tx = self.txs.get(&tx_id).unwrap(); let mut tx = tx.value().write().unwrap(); @@ -487,19 +409,6 @@ impl DatabaseInner { } tx.state = TransactionState::Committed; tracing::trace!("COMMIT {tx}"); - // We have now updated all the versions with a reference to the - // transaction ID to a timestamp and can, therefore, remove the - // transaction. Please note that when we move to lockless, the - // invariant doesn't necessarily hold anymore because another thread - // might have speculatively read a version that we want to remove. - // But that's a problem for another day. - let mut tx_timestamps = self.tx_timestamps.borrow_mut(); - if let Some(timestamp_entry) = tx_timestamps.get_mut(&tx.begin_ts) { - *timestamp_entry -= 1; - if timestamp_entry == &0 { - tx_timestamps.remove(&tx.begin_ts); - } - } self.txs.remove(&tx_id); if !log_record.row_versions.is_empty() { self.storage.log_tx(log_record)?; @@ -507,7 +416,15 @@ impl DatabaseInner { Ok(()) } - fn rollback_tx(&self, tx_id: TxID) { + /// Rolls back a transaction with the specified ID. + /// + /// This function rolls back a transaction with the specified `tx_id` by + /// discarding any changes made by the transaction. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to abort. + pub fn rollback_tx(&self, tx_id: TxID) { let tx = self.txs.get(&tx_id).unwrap(); let mut tx = tx.value().write().unwrap(); assert!(tx.state == TransactionState::Active); @@ -527,64 +444,19 @@ impl DatabaseInner { tracing::trace!("TERMINATE {tx}"); } - fn get_tx_id(&mut self) -> u64 { + /// Generates next unique transaction id + pub fn get_tx_id(&self) -> u64 { self.tx_ids.fetch_add(1, Ordering::SeqCst) } - fn get_timestamp(&mut self) -> u64 { + /// Gets current timestamp + pub fn get_timestamp(&self) -> u64 { self.clock.get_timestamp() } - /// Drops all rows that are not visible to any transaction. - /// The logic is as follows. If a row version has an end marker - /// which denotes a transaction that is not active, then we can - /// drop the row version -- it is not visible to any transaction. - /// If a row version has an end marker that denotes a timestamp T_END, - /// then we can drop the row version only if all active transactions - /// have a begin timestamp that is greater than timestamp T_END. - /// FIXME: this function is a full scan over all rows and row versions. - /// We can do better by keeping an index of row versions ordered - /// by their end timestamps. - fn drop_unused_row_versions(&self) { - let tx_timestamps = self.tx_timestamps.borrow(); - let mut to_remove = Vec::new(); - for entry in self.rows.iter() { - let mut row_versions = entry.value().write().unwrap(); - row_versions.retain(|rv| { - let should_stay = match rv.end { - Some(TxTimestampOrID::Timestamp(version_end_ts)) => { - match tx_timestamps.first_key_value() { - // a transaction started before this row version ended, - // ergo row version is needed - Some((begin_ts, _)) => version_end_ts >= *begin_ts, - // no transaction => row version is not needed - None => false, - } - } - // Let's skip potentially complex logic if the transaction is still - // active/tracked. We will drop the row version when the transaction - // gets garbage-collected itself, it will always happen eventually. - Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id), - // this row version is current, ergo visible - None => true, - }; - if !should_stay { - tracing::debug!( - "Dropping row version {:?} {:?}-{:?}", - entry.key(), - rv.begin, - rv.end - ); - } - should_stay - }); - if row_versions.is_empty() { - to_remove.push(*entry.key()); - } - } - for id in to_remove { - self.rows.remove(&id); - } + /// FIXME: implement in a lock-free manner + pub fn drop_unused_row_versions(&self) { + tracing::error!("Unused rows are not dropped at the moment. Will do!"); } pub fn recover(&self) -> Result<()> { diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs index f927be381..185a432ee 100644 --- a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs +++ b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs @@ -27,7 +27,7 @@ impl Storage { } impl Storage { - pub fn log_tx(&mut self, m: LogRecord) -> Result<()> { + pub fn log_tx(&self, m: LogRecord) -> Result<()> { match self { Self::JsonOnDisk(path) => { use std::io::Write;