From 0338e14814875235f63e40901d9be6fa69b9cdc6 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 13 Jun 2023 13:47:38 +0200 Subject: [PATCH] database: change insert to upsert in concurrency tests Using insert() was a violation of our API, kind of, because inserts are not expected to be called twice on the same id. Instead, update or upsert should delete the version first, and that's what's done in this patch. At the same time, write-write conflict detection needed to be implemented, because we started hitting it with rollback(). Finally, garbage collection is modified to actually work and garbage-collect row versions. Without it, the number of tracked row versions very quickly goes out of hand. --- core/mvcc/mvcc-rs/src/database/mod.rs | 30 ++++++++++++++++++--- core/mvcc/mvcc-rs/tests/concurrency_test.rs | 6 ++++- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs index 4941b5305..064864a7c 100644 --- a/core/mvcc/mvcc-rs/src/database/mod.rs +++ b/core/mvcc/mvcc-rs/src/database/mod.rs @@ -314,6 +314,12 @@ impl Database { }) .map(|p| p + 1) .unwrap_or(0); + if versions.len() - position > 3 { + tracing::debug!( + "Inserting an element {} positions from the end", + versions.len() - position + ); + } versions.insert(position, row_version); } @@ -372,6 +378,13 @@ impl Database { Ok(true) } + /// Inserts a row in the database with new values, previously deleting + /// any old data if it existed. + pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> { + self.delete(tx_id, row.id).ok(); + self.insert(tx_id, row) + } + /// Deletes a row from the table with the given `id`. /// /// This function deletes an existing row `id` in the database within the @@ -676,12 +689,21 @@ impl Database { self.clock.get_timestamp() } - /// FIXME: implement in a lock-free manner - pub fn drop_unused_row_versions(&self) { + /// Removes unused row versions with very loose heuristics, + /// which sometimes leaves versions intact for too long. + /// Returns the number of removed versions. + pub fn drop_unused_row_versions(&self) -> usize { + tracing::debug!( + "transactions: {}; rows: {}", + self.txs.len(), + self.rows.len() + ); let mut to_remove = Vec::new(); for entry in self.rows.iter() { let mut row_versions = entry.value().write().unwrap(); + tracing::debug!("versions: {}", row_versions.len()); row_versions.retain(|rv| { + tracing::debug!("inspecting {rv:?}"); let should_stay = match rv.end { Some(TxTimestampOrID::Timestamp(version_end_ts)) => { // a transaction started before this row version ended, @@ -699,7 +721,7 @@ impl Database { None => true, }; if !should_stay { - tracing::debug!( + tracing::trace!( "Dropping row version {:?} {:?}-{:?}", entry.key(), rv.begin, @@ -712,9 +734,11 @@ impl Database { to_remove.push(*entry.key()); } } + let dropped = to_remove.len(); for id in to_remove { self.rows.remove(&id); } + dropped } pub fn recover(&self) -> Result<()> { diff --git a/core/mvcc/mvcc-rs/tests/concurrency_test.rs b/core/mvcc/mvcc-rs/tests/concurrency_test.rs index fced575d7..5cf7ccbd1 100644 --- a/core/mvcc/mvcc-rs/tests/concurrency_test.rs +++ b/core/mvcc/mvcc-rs/tests/concurrency_test.rs @@ -86,6 +86,10 @@ fn test_overlapping_concurrent_inserts_read_your_writes() { if i % 1000 == 0 { tracing::debug!("{prefix}: {i}"); } + if i % 10000 == 0 { + let dropped = db.drop_unused_row_versions(); + tracing::debug!("garbage collected {dropped} versions"); + } let tx = db.begin_tx(); let id = i % 16; let id = RowID { @@ -96,7 +100,7 @@ fn test_overlapping_concurrent_inserts_read_your_writes() { id, data: format!("{prefix} @{tx}"), }; - db.insert(tx, row.clone()).unwrap(); + db.upsert(tx, row.clone()).unwrap(); let committed_row = db.read(tx, id).unwrap(); db.commit_tx(tx).unwrap(); assert_eq!(committed_row, Some(row));