Merge pull request #55 from penberg/upsert_tests

database: change insert to upsert in concurrency tests
This commit is contained in:
Pekka Enberg
2023-06-13 16:55:44 +03:00
committed by GitHub
2 changed files with 32 additions and 4 deletions

View File

@@ -314,6 +314,12 @@ impl<Clock: LogicalClock> Database<Clock> {
})
.map(|p| p + 1)
.unwrap_or(0);
if versions.len() - position > 3 {
tracing::debug!(
"Inserting an element {} positions from the end",
versions.len() - position
);
}
versions.insert(position, row_version);
}
@@ -372,6 +378,13 @@ impl<Clock: LogicalClock> Database<Clock> {
Ok(true)
}
/// Inserts a row in the database with new values, previously deleting
/// any old data if it existed.
pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
self.delete(tx_id, row.id).ok();
self.insert(tx_id, row)
}
/// Deletes a row from the table with the given `id`.
///
/// This function deletes an existing row `id` in the database within the
@@ -676,12 +689,21 @@ impl<Clock: LogicalClock> Database<Clock> {
self.clock.get_timestamp()
}
/// FIXME: implement in a lock-free manner
pub fn drop_unused_row_versions(&self) {
/// Removes unused row versions with very loose heuristics,
/// which sometimes leaves versions intact for too long.
/// Returns the number of removed versions.
pub fn drop_unused_row_versions(&self) -> usize {
tracing::debug!(
"transactions: {}; rows: {}",
self.txs.len(),
self.rows.len()
);
let mut to_remove = Vec::new();
for entry in self.rows.iter() {
let mut row_versions = entry.value().write().unwrap();
tracing::debug!("versions: {}", row_versions.len());
row_versions.retain(|rv| {
tracing::debug!("inspecting {rv:?}");
let should_stay = match rv.end {
Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
// a transaction started before this row version ended,
@@ -699,7 +721,7 @@ impl<Clock: LogicalClock> Database<Clock> {
None => true,
};
if !should_stay {
tracing::debug!(
tracing::trace!(
"Dropping row version {:?} {:?}-{:?}",
entry.key(),
rv.begin,
@@ -712,9 +734,11 @@ impl<Clock: LogicalClock> Database<Clock> {
to_remove.push(*entry.key());
}
}
let dropped = to_remove.len();
for id in to_remove {
self.rows.remove(&id);
}
dropped
}
pub fn recover(&self) -> Result<()> {

View File

@@ -86,6 +86,10 @@ fn test_overlapping_concurrent_inserts_read_your_writes() {
if i % 1000 == 0 {
tracing::debug!("{prefix}: {i}");
}
if i % 10000 == 0 {
let dropped = db.drop_unused_row_versions();
tracing::debug!("garbage collected {dropped} versions");
}
let tx = db.begin_tx();
let id = i % 16;
let id = RowID {
@@ -96,7 +100,7 @@ fn test_overlapping_concurrent_inserts_read_your_writes() {
id,
data: format!("{prefix} @{tx}"),
};
db.insert(tx, row.clone()).unwrap();
db.upsert(tx, row.clone()).unwrap();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));