diff --git a/core/mvcc/mvcc-rs/Cargo.toml b/core/mvcc/mvcc-rs/Cargo.toml
index 21c83167d..27f030a73 100644
--- a/core/mvcc/mvcc-rs/Cargo.toml
+++ b/core/mvcc/mvcc-rs/Cargo.toml
@@ -16,13 +16,12 @@ aws-config = "0.55.2"
 parking_lot = "0.12.1"
 futures = "0.3.28"
 crossbeam-skiplist = "0.1.1"
+tracing-test = "0"
 
 [dev-dependencies]
 criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }
 pprof = { version = "0.11.1", features = ["criterion", "flamegraph"] }
-shuttle = "0.6.0"
 tracing-subscriber = "0"
-tracing-test = "0"
 mvcc-rs = { path = "." }
 
 [[bench]]
diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs
index 01e0c2dce..29bfbdf73 100644
--- a/core/mvcc/mvcc-rs/src/database/mod.rs
+++ b/core/mvcc/mvcc-rs/src/database/mod.rs
@@ -440,7 +440,7 @@ impl Database {
         let tx_id = self.get_tx_id();
         let begin_ts = self.get_timestamp();
         let tx = Transaction::new(tx_id, begin_ts);
-        tracing::trace!("BEGIN {tx}");
+        tracing::trace!("BEGIN {tx}");
         self.txs.insert(tx_id, RwLock::new(tx));
         tx_id
     }
@@ -565,6 +565,7 @@ impl Database {
                 }
             }
         }
+        tracing::trace!("UPDATED {tx}");
         // We have now updated all the versions with a reference to the
         // transaction ID to a timestamp and can, therefore, remove the
         // transaction. Please note that when we move to lockless, the
@@ -577,6 +578,7 @@ impl Database {
         if !log_record.row_versions.is_empty() {
             self.storage.log_tx(log_record)?;
         }
+        tracing::trace!("LOGGED {tx}");
         Ok(())
     }
 
diff --git a/core/mvcc/mvcc-rs/tests/concurrency_test.rs b/core/mvcc/mvcc-rs/tests/concurrency_test.rs
index 12321aa10..e284dd6da 100644
--- a/core/mvcc/mvcc-rs/tests/concurrency_test.rs
+++ b/core/mvcc/mvcc-rs/tests/concurrency_test.rs
@@ -1,65 +1,127 @@
 use mvcc_rs::clock::LocalClock;
 use mvcc_rs::database::{Database, Row, RowID};
-use shuttle::sync::atomic::AtomicU64;
-use shuttle::sync::Arc;
-use shuttle::thread;
+use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
+use std::sync::Arc;
 
+static IDS: AtomicU64 = AtomicU64::new(1);
+
+#[tracing_test::traced_test]
 #[test]
fn test_non_overlapping_concurrent_inserts() {
+    let _ = tracing_subscriber::fmt::try_init(); // try_init: don't panic if a subscriber is already installed (e.g. by traced_test)
     // Two threads insert to the database concurrently using non-overlapping
     // row IDs.
     let clock = LocalClock::default();
     let storage = mvcc_rs::persistent_storage::Storage::new_noop();
     let db = Arc::new(Database::new(clock, storage));
-    let ids = Arc::new(AtomicU64::new(0));
-    shuttle::check_random(
-        move || {
-            {
-                let db = db.clone();
-                let ids = ids.clone();
-                thread::spawn(move || {
-                    let tx = db.begin_tx();
-                    let id = ids.fetch_add(1, Ordering::SeqCst);
-                    let id = RowID {
-                        table_id: 1,
-                        row_id: id,
-                    };
-                    let row = Row {
-                        id,
-                        data: "Hello".to_string(),
-                    };
-                    db.insert(tx, row.clone()).unwrap();
-                    db.commit_tx(tx).unwrap();
-                    let tx = db.begin_tx();
-                    let committed_row = db.read(tx, id).unwrap();
-                    db.commit_tx(tx).unwrap();
-                    assert_eq!(committed_row, Some(row));
-                });
+    let iterations = 100000;
+
+    let th1 = {
+        let db = db.clone();
+        std::thread::spawn(move || {
+            for _ in 0..iterations {
+                let tx = db.begin_tx();
+                let id = IDS.fetch_add(1, Ordering::SeqCst);
+                let id = RowID {
+                    table_id: 1,
+                    row_id: id,
+                };
+                let row = Row {
+                    id,
+                    data: "Hello".to_string(),
+                };
+                db.insert(tx, row.clone()).unwrap();
+                db.commit_tx(tx).unwrap();
+                let tx = db.begin_tx();
+                let committed_row = db.read(tx, id).unwrap();
+                db.commit_tx(tx).unwrap();
+                assert_eq!(committed_row, Some(row));
             }
-            {
-                let db = db.clone();
-                let ids = ids.clone();
-                thread::spawn(move || {
-                    let tx = db.begin_tx();
-                    let id = ids.fetch_add(1, Ordering::SeqCst);
-                    let id = RowID {
-                        table_id: 1,
-                        row_id: id,
-                    };
-                    let row = Row {
-                        id,
-                        data: "World".to_string(),
-                    };
-                    db.insert(tx, row.clone()).unwrap();
-                    db.commit_tx(tx).unwrap();
-                    let tx = db.begin_tx();
-                    let committed_row = db.read(tx, id).unwrap();
-                    db.commit_tx(tx).unwrap();
-                    assert_eq!(committed_row, Some(row));
-                });
+        })
+    };
+    let th2 = {
+        std::thread::spawn(move || {
+            for _ in 0..iterations {
+                let tx = db.begin_tx();
+                let id = IDS.fetch_add(1, Ordering::SeqCst);
+                let id = RowID {
+                    table_id: 1,
+                    row_id: id,
+                };
+                let row = Row {
+                    id,
+                    data: "World".to_string(),
+                };
+                db.insert(tx, row.clone()).unwrap();
+                db.commit_tx(tx).unwrap();
+                let tx = db.begin_tx();
+                let committed_row = db.read(tx, id).unwrap();
+                db.commit_tx(tx).unwrap();
+                assert_eq!(committed_row, Some(row));
             }
-        },
-        100,
-    );
+        })
+    };
+    th1.join().unwrap();
+    th2.join().unwrap();
+}
+
+#[test]
+fn test_overlapping_concurrent_inserts_read_your_writes() {
+    let _ = tracing_subscriber::fmt::try_init(); // try_init: don't panic if another test already installed a subscriber
+    // Two threads insert to the database concurrently using overlapping row IDs.
+    let clock = LocalClock::default();
+    let storage = mvcc_rs::persistent_storage::Storage::new_noop();
+    let db = Arc::new(Database::new(clock, storage));
+    let iterations = 100000;
+
+    let th1 = {
+        let db = db.clone();
+        std::thread::spawn(move || {
+            for i in 0..iterations {
+                if i % 1000 == 0 {
+                    tracing::debug!("{i}");
+                }
+                let tx = db.begin_tx();
+                let id = i % 16;
+                let id = RowID {
+                    table_id: 1,
+                    row_id: id,
+                };
+                let row = Row {
+                    id,
+                    data: format!("Hello @{tx}"),
+                };
+                db.insert(tx, row.clone()).unwrap();
+                let committed_row = db.read(tx, id).unwrap();
+                db.commit_tx(tx).unwrap();
+                assert_eq!(committed_row, Some(row));
+            }
+        })
+    };
+    let th2 = {
+        std::thread::spawn(move || {
+            for i in 0..iterations {
+                if i % 1000 == 0 {
+                    tracing::debug!("{i}");
+                }
+                let tx = db.begin_tx();
+                let id = i % 16;
+                let id = RowID {
+                    table_id: 1,
+                    row_id: id,
+                };
+                let row = Row {
+                    id,
+                    data: format!("World @{tx}"),
+                };
+                db.insert(tx, row.clone()).unwrap();
+                let committed_row = db.read(tx, id).unwrap();
+                db.commit_tx(tx).unwrap();
+                assert_eq!(committed_row, Some(row));
+            }
+        })
+    };
+    th1.join().unwrap();
+    th2.join().unwrap();
 }