diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs index 4941b5305..064864a7c 100644 --- a/core/mvcc/mvcc-rs/src/database/mod.rs +++ b/core/mvcc/mvcc-rs/src/database/mod.rs @@ -314,6 +314,12 @@ impl Database { }) .map(|p| p + 1) .unwrap_or(0); + if versions.len() - position > 3 { + tracing::debug!( + "Inserting an element {} positions from the end", + versions.len() - position + ); + } versions.insert(position, row_version); } @@ -372,6 +378,13 @@ impl Database { Ok(true) } + /// Replaces a row: attempts to delete any row with `row.id`, then inserts `row`. + /// NOTE(review): the delete result, including any error, is dropped via `.ok()`. + pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> { + self.delete(tx_id, row.id).ok(); + self.insert(tx_id, row) + } + /// Deletes a row from the table with the given `id`. /// /// This function deletes an existing row `id` in the database within the @@ -676,12 +689,21 @@ impl Database { self.clock.get_timestamp() } - /// FIXME: implement in a lock-free manner - pub fn drop_unused_row_versions(&self) { + /// Removes unused row versions with very loose heuristics, + /// which sometimes leaves versions intact for too long. + /// Returns the number of row entries removed from `self.rows`, not individual versions. 
+ pub fn drop_unused_row_versions(&self) -> usize { + tracing::debug!( + "transactions: {}; rows: {}", + self.txs.len(), + self.rows.len() + ); let mut to_remove = Vec::new(); for entry in self.rows.iter() { let mut row_versions = entry.value().write().unwrap(); + tracing::debug!("versions: {}", row_versions.len()); row_versions.retain(|rv| { + tracing::debug!("inspecting {rv:?}"); let should_stay = match rv.end { Some(TxTimestampOrID::Timestamp(version_end_ts)) => { // a transaction started before this row version ended, @@ -699,7 +721,7 @@ impl Database { None => true, }; if !should_stay { - tracing::debug!( + tracing::trace!( "Dropping row version {:?} {:?}-{:?}", entry.key(), rv.begin, @@ -712,9 +734,11 @@ impl Database { to_remove.push(*entry.key()); } } + let dropped = to_remove.len(); for id in to_remove { self.rows.remove(&id); } + dropped } pub fn recover(&self) -> Result<()> { diff --git a/core/mvcc/mvcc-rs/tests/concurrency_test.rs b/core/mvcc/mvcc-rs/tests/concurrency_test.rs index fced575d7..5cf7ccbd1 100644 --- a/core/mvcc/mvcc-rs/tests/concurrency_test.rs +++ b/core/mvcc/mvcc-rs/tests/concurrency_test.rs @@ -86,6 +86,10 @@ fn test_overlapping_concurrent_inserts_read_your_writes() { if i % 1000 == 0 { tracing::debug!("{prefix}: {i}"); } + if i % 10000 == 0 { + let dropped = db.drop_unused_row_versions(); + tracing::debug!("garbage collected {dropped} versions"); + } let tx = db.begin_tx(); let id = i % 16; let id = RowID { @@ -96,7 +100,7 @@ fn test_overlapping_concurrent_inserts_read_your_writes() { id, data: format!("{prefix} @{tx}"), }; - db.insert(tx, row.clone()).unwrap(); + db.upsert(tx, row.clone()).unwrap(); let committed_row = db.read(tx, id).unwrap(); db.commit_tx(tx).unwrap(); assert_eq!(committed_row, Some(row));