database: change insert to upsert in concurrency tests

Using insert() was a violation of our API, kind of, because
inserts are not expected to be called twice on the same id.
Instead, update or upsert should delete the version first,
and that's what's done in this patch.

At the same time, write-write conflict detection needed to be
implemented, because we started hitting it with rollback().

Finally, garbage collection is modified to actually work
and garbage-collect row versions. Without it, the number of tracked
row versions very quickly goes out of hand.
This commit is contained in:
Piotr Sarna
2023-06-13 13:47:38 +02:00
parent 46e3f7e3c4
commit 0338e14814
2 changed files with 32 additions and 4 deletions

View File

@@ -314,6 +314,12 @@ impl<Clock: LogicalClock> Database<Clock> {
})
.map(|p| p + 1)
.unwrap_or(0);
if versions.len() - position > 3 {
tracing::debug!(
"Inserting an element {} positions from the end",
versions.len() - position
);
}
versions.insert(position, row_version);
}
@@ -372,6 +378,13 @@ impl<Clock: LogicalClock> Database<Clock> {
Ok(true)
}
/// Inserts a row in the database with new values, previously deleting
/// any old data if it existed.
pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
self.delete(tx_id, row.id).ok();
self.insert(tx_id, row)
}
/// Deletes a row from the table with the given `id`.
///
/// This function deletes an existing row `id` in the database within the
@@ -676,12 +689,21 @@ impl<Clock: LogicalClock> Database<Clock> {
self.clock.get_timestamp()
}
/// FIXME: implement in a lock-free manner
pub fn drop_unused_row_versions(&self) {
/// Removes unused row versions with very loose heuristics,
/// which sometimes leaves versions intact for too long.
/// Returns the number of removed versions.
pub fn drop_unused_row_versions(&self) -> usize {
tracing::debug!(
"transactions: {}; rows: {}",
self.txs.len(),
self.rows.len()
);
let mut to_remove = Vec::new();
for entry in self.rows.iter() {
let mut row_versions = entry.value().write().unwrap();
tracing::debug!("versions: {}", row_versions.len());
row_versions.retain(|rv| {
tracing::debug!("inspecting {rv:?}");
let should_stay = match rv.end {
Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
// a transaction started before this row version ended,
@@ -699,7 +721,7 @@ impl<Clock: LogicalClock> Database<Clock> {
None => true,
};
if !should_stay {
tracing::debug!(
tracing::trace!(
"Dropping row version {:?} {:?}-{:?}",
entry.key(),
rv.begin,
@@ -712,9 +734,11 @@ impl<Clock: LogicalClock> Database<Clock> {
to_remove.push(*entry.key());
}
}
let dropped = to_remove.len();
for id in to_remove {
self.rows.remove(&id);
}
dropped
}
pub fn recover(&self) -> Result<()> {

View File

@@ -86,6 +86,10 @@ fn test_overlapping_concurrent_inserts_read_your_writes() {
if i % 1000 == 0 {
tracing::debug!("{prefix}: {i}");
}
if i % 10000 == 0 {
let dropped = db.drop_unused_row_versions();
tracing::debug!("garbage collected {dropped} versions");
}
let tx = db.begin_tx();
let id = i % 16;
let id = RowID {
@@ -96,7 +100,7 @@ fn test_overlapping_concurrent_inserts_read_your_writes() {
id,
data: format!("{prefix} @{tx}"),
};
db.insert(tx, row.clone()).unwrap();
db.upsert(tx, row.clone()).unwrap();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));