concurrency test: port to OS threads

Without mutexes, it makes no sense anymore to use shuttle.
Instead, the test cases just spawn OS threads.
Also, a case with overlapping ids is added, to test whether
transactions read their own writes within the same transaction.
This commit is contained in:
Piotr Sarna
2023-06-12 13:33:29 +02:00
parent a93fcdcbcf
commit 57249f2c94
3 changed files with 117 additions and 54 deletions

View File

@@ -16,13 +16,12 @@ aws-config = "0.55.2"
parking_lot = "0.12.1"
futures = "0.3.28"
crossbeam-skiplist = "0.1.1"
tracing-test = "0"
[dev-dependencies]
criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }
pprof = { version = "0.11.1", features = ["criterion", "flamegraph"] }
shuttle = "0.6.0"
tracing-subscriber = "0"
tracing-test = "0"
mvcc-rs = { path = "." }
[[bench]]

View File

@@ -440,7 +440,7 @@ impl<Clock: LogicalClock> Database<Clock> {
let tx_id = self.get_tx_id();
let begin_ts = self.get_timestamp();
let tx = Transaction::new(tx_id, begin_ts);
tracing::trace!("BEGIN {tx}");
tracing::trace!("BEGIN {tx}");
self.txs.insert(tx_id, RwLock::new(tx));
tx_id
}
@@ -565,6 +565,7 @@ impl<Clock: LogicalClock> Database<Clock> {
}
}
}
tracing::trace!("UPDATED {tx}");
// We have now updated all the versions with a reference to the
// transaction ID to a timestamp and can, therefore, remove the
// transaction. Please note that when we move to lockless, the
@@ -577,6 +578,7 @@ impl<Clock: LogicalClock> Database<Clock> {
if !log_record.row_versions.is_empty() {
self.storage.log_tx(log_record)?;
}
tracing::trace!("LOGGED {tx}");
Ok(())
}

View File

@@ -1,65 +1,127 @@
use mvcc_rs::clock::LocalClock;
use mvcc_rs::database::{Database, Row, RowID};
use shuttle::sync::atomic::AtomicU64;
use shuttle::sync::Arc;
use shuttle::thread;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
static IDS: AtomicU64 = AtomicU64::new(1);
#[tracing_test::traced_test]
#[test]
fn test_non_overlapping_concurrent_inserts() {
tracing_subscriber::fmt::init();
// Two threads insert to the database concurrently using non-overlapping
// row IDs.
let clock = LocalClock::default();
let storage = mvcc_rs::persistent_storage::Storage::new_noop();
let db = Arc::new(Database::new(clock, storage));
let ids = Arc::new(AtomicU64::new(0));
shuttle::check_random(
move || {
{
let db = db.clone();
let ids = ids.clone();
thread::spawn(move || {
let tx = db.begin_tx();
let id = ids.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "Hello".to_string(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
let tx = db.begin_tx();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
});
let iterations = 100000;
let th1 = {
let db = db.clone();
std::thread::spawn(move || {
for _ in 0..iterations {
let tx = db.begin_tx();
let id = IDS.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "Hello".to_string(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
let tx = db.begin_tx();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
}
{
let db = db.clone();
let ids = ids.clone();
thread::spawn(move || {
let tx = db.begin_tx();
let id = ids.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "World".to_string(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
let tx = db.begin_tx();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
});
})
};
let th2 = {
std::thread::spawn(move || {
for _ in 0..iterations {
let tx = db.begin_tx();
let id = IDS.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "World".to_string(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
let tx = db.begin_tx();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
}
},
100,
);
})
};
th1.join().unwrap();
th2.join().unwrap();
}
#[test]
fn test_overlapping_concurrent_inserts_read_your_writes() {
tracing_subscriber::fmt::init();
// Two threads insert to the database concurrently using overlapping row IDs.
let clock = LocalClock::default();
let storage = mvcc_rs::persistent_storage::Storage::new_noop();
let db = Arc::new(Database::new(clock, storage));
let iterations = 100000;
let th1 = {
let db = db.clone();
std::thread::spawn(move || {
for i in 0..iterations {
if i % 1000 == 0 {
tracing::debug!("{i}");
}
let tx = db.begin_tx();
let id = i % 16;
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: format!("Hello @{tx}"),
};
db.insert(tx, row.clone()).unwrap();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
}
})
};
let th2 = {
std::thread::spawn(move || {
for i in 0..iterations {
if i % 1000 == 0 {
tracing::debug!("{i}");
}
let tx = db.begin_tx();
let id = i % 16;
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: format!("World @{tx}"),
};
db.insert(tx, row.clone()).unwrap();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
}
})
};
th1.join().unwrap();
th2.join().unwrap();
}