From ddbcd9be792a19347fd31b8b88b0018f4138e3a2 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 6 Jun 2023 12:47:40 +0200 Subject: [PATCH] database: bring back dropping unused row versions --- core/mvcc/mvcc-rs/src/database/mod.rs | 41 +++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs index cfdd43c4c..3f6049a94 100644 --- a/core/mvcc/mvcc-rs/src/database/mod.rs +++ b/core/mvcc/mvcc-rs/src/database/mod.rs @@ -182,7 +182,6 @@ pub struct Database { } impl Database { - /// Creates a new database. pub fn new(clock: Clock, storage: Storage) -> Self { Self { @@ -193,7 +192,7 @@ impl Database { storage, } } - + /// Inserts a new row into the database. /// /// This function inserts a new `row` into the database within the context @@ -456,7 +455,43 @@ impl Database { /// FIXME: implement in a lock-free manner pub fn drop_unused_row_versions(&self) { - tracing::error!("Unused rows are not dropped at the moment. Will do!"); + let mut to_remove = Vec::new(); + for entry in self.rows.iter() { + let mut row_versions = entry.value().write().unwrap(); + row_versions.retain(|rv| { + let should_stay = match rv.end { + Some(TxTimestampOrID::Timestamp(version_end_ts)) => { + // a transaction started before this row version ended, + // ergo row version is needed + // NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable + self.txs + .iter() + .any(|tx| version_end_ts >= tx.value().read().unwrap().begin_ts) + } + // Let's skip potentially complex logic if the transaction is still + // active/tracked. We will drop the row version when the transaction + // gets garbage-collected itself, it will always happen eventually. + Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id), + // this row version is current, ergo visible + None => true, + }; + if !should_stay { + tracing::debug!( + "Dropping row version {:?} {:?}-{:?}", + entry.key(), + rv.begin, + rv.end + ); + } + should_stay + }); + if row_versions.is_empty() { + to_remove.push(*entry.key()); + } + } + for id in to_remove { + self.rows.remove(&id); + } } pub fn recover(&self) -> Result<()> {