database: bring back dropping unused row versions

This commit is contained in:
Piotr Sarna
2023-06-06 12:47:40 +02:00
parent 625394000e
commit ddbcd9be79

View File

@@ -182,7 +182,6 @@ pub struct Database<Clock: LogicalClock> {
}
impl<Clock: LogicalClock> Database<Clock> {
/// Creates a new database.
pub fn new(clock: Clock, storage: Storage) -> Self {
Self {
@@ -193,7 +192,7 @@ impl<Clock: LogicalClock> Database<Clock> {
storage,
}
}
/// Inserts a new row into the database.
///
/// This function inserts a new `row` into the database within the context
@@ -456,7 +455,43 @@ impl<Clock: LogicalClock> Database<Clock> {
/// FIXME: implement in a lock-free manner
pub fn drop_unused_row_versions(&self) {
tracing::error!("Unused rows are not dropped at the moment. Will do!");
let mut to_remove = Vec::new();
for entry in self.rows.iter() {
let mut row_versions = entry.value().write().unwrap();
row_versions.retain(|rv| {
let should_stay = match rv.end {
Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
// a transaction started before this row version ended,
// ergo row version is needed
// NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable
self.txs
.iter()
.any(|tx| version_end_ts >= tx.value().read().unwrap().begin_ts)
}
// Let's skip potentially complex logic if the transaction is still
// active/tracked. We will drop the row version when the transaction
// gets garbage-collected itself, it will always happen eventually.
Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id),
// this row version is current, ergo visible
None => true,
};
if !should_stay {
tracing::debug!(
"Dropping row version {:?} {:?}-{:?}",
entry.key(),
rv.begin,
rv.end
);
}
should_stay
});
if row_versions.is_empty() {
to_remove.push(*entry.key());
}
}
for id in to_remove {
self.rows.remove(&id);
}
}
pub fn recover(&self) -> Result<()> {