From 2a018ea9a3fc17068021fbc919f7898f8eaa360b Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Thu, 20 Apr 2023 15:46:58 +0200 Subject: [PATCH 1/2] fixup: move DatabaseError under a feature --- core/mvcc/database/src/persistent_storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/mvcc/database/src/persistent_storage.rs b/core/mvcc/database/src/persistent_storage.rs index 60c3d0a80..8687c43a0 100644 --- a/core/mvcc/database/src/persistent_storage.rs +++ b/core/mvcc/database/src/persistent_storage.rs @@ -1,5 +1,4 @@ use crate::database::{Result, Mutation}; -use crate::errors::DatabaseError; /// Persistent storage API for storing and retrieving transactions. /// TODO: final design in heavy progress! @@ -65,6 +64,7 @@ impl Storage for JsonOnDisk { type Stream = JsonOnDiskStream; async fn store(&mut self, m: Mutation) -> Result<()> { + use crate::errors::DatabaseError; use tokio::io::AsyncWriteExt; let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; let mut file = tokio::fs::OpenOptions::new() From fb6ce709935ba0d12749291253510cfb4fc8d1cf Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Thu, 20 Apr 2023 15:34:17 +0200 Subject: [PATCH 2/2] database: add dropping unused row versions When a row version is not visible to any transaction, active or future, it should be dropped. 
--- core/mvcc/database/src/database.rs | 89 ++++++++++++++++++++++++------ 1 file changed, 73 insertions(+), 16 deletions(-) diff --git a/core/mvcc/database/src/database.rs b/core/mvcc/database/src/database.rs index 98cbefcea..dccfdcd17 100644 --- a/core/mvcc/database/src/database.rs +++ b/core/mvcc/database/src/database.rs @@ -2,7 +2,7 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; use serde::{Deserialize, Serialize}; use std::cell::RefCell; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -135,6 +135,7 @@ impl< let inner = DatabaseInner { rows: RefCell::new(HashMap::new()), txs: RefCell::new(HashMap::new()), + tx_timestamps: RefCell::new(BTreeMap::new()), tx_ids: AtomicU64::new(0), clock, storage, @@ -260,16 +261,20 @@ impl< inner.rollback_tx(tx_id).await; } + /// Drops all unused row versions from the database. + /// + /// A version is considered unused if it is not visible to any active transaction + /// and it is not the most recent version of the row. + pub async fn drop_unused_row_versions(&self) { + let inner = self.inner.lock().await; + inner.drop_unused_row_versions(); + } + #[cfg(test)] pub(crate) async fn scan_storage(&self) -> Result> { use futures::StreamExt; let inner = self.inner.lock().await; - Ok(inner - .storage - .scan() - .await? 
- .collect::>() - .await) + Ok(inner.storage.scan().await?.collect::>().await) } } @@ -277,6 +282,7 @@ impl< pub struct DatabaseInner { rows: RefCell>>, txs: RefCell>, + tx_timestamps: RefCell>, tx_ids: AtomicU64, clock: Clock, storage: Storage, @@ -356,7 +362,9 @@ impl let tx = Transaction::new(tx_id, begin_ts); tracing::trace!("BEGIN {tx}"); let mut txs = self.txs.borrow_mut(); + let mut tx_timestamps = self.tx_timestamps.borrow_mut(); txs.insert(tx_id, tx); + *tx_timestamps.entry(begin_ts).or_insert(0) += 1; tx_id } @@ -401,6 +409,13 @@ impl // invariant doesn't necessarily hold anymore because another thread // might have speculatively read a version that we want to remove. // But that's a problem for another day. + let mut tx_timestamps = self.tx_timestamps.borrow_mut(); + if let Some(timestamp_entry) = tx_timestamps.get_mut(&tx.begin_ts) { + *timestamp_entry -= 1; + if timestamp_entry == &0 { + tx_timestamps.remove(&tx.begin_ts); + } + } txs.remove(&tx_id); drop(rows); drop(txs); @@ -436,6 +451,54 @@ impl fn get_timestamp(&mut self) -> u64 { self.clock.get_timestamp() } + + /// Drops all rows that are not visible to any transaction. + /// The logic is as follows. If a row version has an end marker + /// which denotes a transaction that is not active, then we can + /// drop the row version -- it is not visible to any transaction. + /// If a row version has an end marker that denotes a timestamp T_END, + /// then we can drop the row version only if all active transactions + /// have a begin timestamp that is greater than timestamp T_END. + /// FIXME: this function is a full scan over all rows and row versions. + /// We can do better by keeping an index of row versions ordered + /// by their end timestamps. 
+ fn drop_unused_row_versions(&self) { + let txs = self.txs.borrow(); + let tx_timestamps = self.tx_timestamps.borrow(); + let mut rows = self.rows.borrow_mut(); + let mut to_remove = Vec::new(); + for (id, row_versions) in rows.iter_mut() { + row_versions.retain(|rv| { + let should_stay = match rv.end { + Some(TxTimestampOrID::Timestamp(version_end_ts)) => { + match tx_timestamps.first_key_value() { + // a transaction started before this row version ended, + // ergo row version is needed + Some((begin_ts, _)) => version_end_ts >= *begin_ts, + // no transaction => row version is not needed + None => false, + } + } + // Let's skip potentially complex logic if the transaction is still + // active/tracked. We will drop the row version when the transaction + // gets garbage-collected itself, it will always happen eventually. + Some(TxTimestampOrID::TxID(tx_id)) => !txs.contains_key(&tx_id), + // this row version is current, ergo visible + None => true, + }; + if !should_stay { + tracing::debug!("Dropping row version {} {:?}-{:?}", id, rv.begin, rv.end); + } + should_stay + }); + if row_versions.is_empty() { + to_remove.push(*id); + } + } + for id in to_remove { + rows.remove(&id); + } + } } /// A write-write conflict happens when transaction T_m attempts to update a @@ -599,6 +662,7 @@ mod tests { let row = db.read(tx2, 1).await.unwrap().unwrap(); db.commit_tx(tx2).await.unwrap(); assert_eq!(tx1_updated_row, row); + db.drop_unused_row_versions().await; } #[traced_test] @@ -885,19 +949,12 @@ mod tests { .await .unwrap(); - let mutation = db - .scan_storage() - .await - .unwrap(); + let mutation = db.scan_storage().await.unwrap(); println!("{:?}", mutation); db.commit_tx(tx4).await.unwrap(); - let mutation = db - .scan_storage() - .await - .unwrap(); + let mutation = db.scan_storage().await.unwrap(); println!("{:?}", mutation); - } }