diff --git a/core/mvcc/bindings/c/include/mvcc.h b/core/mvcc/bindings/c/include/mvcc.h index bd6aaa4d8..e0c432cd9 100644 --- a/core/mvcc/bindings/c/include/mvcc.h +++ b/core/mvcc/bindings/c/include/mvcc.h @@ -25,13 +25,21 @@ MVCCDatabaseRef MVCCDatabaseOpen(const char *path); void MVCCDatabaseClose(MVCCDatabaseRef db); -MVCCError MVCCDatabaseInsert(MVCCDatabaseRef db, uint64_t id, const void *value_ptr, uintptr_t value_len); +MVCCError MVCCDatabaseInsert(MVCCDatabaseRef db, + uint64_t table_id, + uint64_t row_id, + const void *value_ptr, + uintptr_t value_len); -MVCCError MVCCDatabaseRead(MVCCDatabaseRef db, uint64_t id, uint8_t **value_ptr, int64_t *value_len); +MVCCError MVCCDatabaseRead(MVCCDatabaseRef db, + uint64_t table_id, + uint64_t row_id, + uint8_t **value_ptr, + int64_t *value_len); void MVCCFreeStr(void *ptr); -MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db); +MVCCScanCursorRef MVCCScanCursorOpen(MVCCDatabaseRef db, uint64_t table_id); void MVCCScanCursorClose(MVCCScanCursorRef cursor); diff --git a/core/mvcc/bindings/c/src/lib.rs b/core/mvcc/bindings/c/src/lib.rs index 6da877c12..a68235835 100644 --- a/core/mvcc/bindings/c/src/lib.rs +++ b/core/mvcc/bindings/c/src/lib.rs @@ -64,7 +64,8 @@ pub unsafe extern "C" fn MVCCDatabaseClose(db: MVCCDatabaseRef) { #[no_mangle] pub unsafe extern "C" fn MVCCDatabaseInsert( db: MVCCDatabaseRef, - id: u64, + table_id: u64, + row_id: u64, value_ptr: *const std::ffi::c_void, value_len: usize, ) -> MVCCError { @@ -79,6 +80,7 @@ pub unsafe extern "C" fn MVCCDatabaseInsert( } }; let (db, runtime) = (&db.db, &db.runtime); + let id = database::RowID { table_id, row_id }; let row = database::Row { id, data }; tracing::debug!("MVCCDatabaseInsert: {row:?}"); match runtime.block_on(async move { @@ -100,7 +102,8 @@ pub unsafe extern "C" fn MVCCDatabaseInsert( #[no_mangle] pub unsafe extern "C" fn MVCCDatabaseRead( db: MVCCDatabaseRef, - id: u64, + table_id: u64, + row_id: u64, value_ptr: *mut *mut u8, value_len: *mut i64, ) -> MVCCError { @@ -109,6 +112,7 @@ pub unsafe extern "C" fn MVCCDatabaseRead( match runtime.block_on(async move { let tx = db.begin_tx().await; + let id = database::RowID { table_id, row_id }; let maybe_row = db.read(tx, id).await?; match maybe_row { Some(row) => { @@ -148,13 +152,18 @@ pub unsafe extern "C" fn MVCCFreeStr(ptr: *mut std::ffi::c_void) { } #[no_mangle] -pub unsafe extern "C" fn MVCCScanCursorOpen(db: MVCCDatabaseRef) -> MVCCScanCursorRef { +pub unsafe extern "C" fn MVCCScanCursorOpen( + db: MVCCDatabaseRef, + table_id: u64, +) -> MVCCScanCursorRef { tracing::debug!("MVCCScanCursorOpen()"); // Reference is transmuted to &'static in order to be able to pass the cursor back to C. // The contract with C is to never use a cursor after MVCCDatabaseClose() has been called. let database = unsafe { std::mem::transmute::<&DbContext, &'static DbContext>(db.get_ref()) }; let (database, runtime) = (&database.db, &database.runtime); - match runtime.block_on(async move { mvcc_rs::cursor::ScanCursor::new(database).await }) { + match runtime + .block_on(async move { mvcc_rs::cursor::ScanCursor::new(database, table_id).await }) + { Ok(cursor) => { if cursor.is_empty() { tracing::debug!("Cursor is empty"); @@ -254,5 +263,8 @@ pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::f pub unsafe extern "C" fn MVCCScanCursorPosition(cursor: MVCCScanCursorRef) -> u64 { let cursor_ctx = unsafe { &mut *cursor.ptr }; let cursor = &mut cursor_ctx.cursor; - cursor.current_row_id().unwrap_or(0) + cursor + .current_row_id() + .map(|row_id| row_id.row_id) + .unwrap_or(0) } diff --git a/core/mvcc/mvcc-rs/benches/my_benchmark.rs b/core/mvcc/mvcc-rs/benches/my_benchmark.rs index 3c360107a..36ccf45eb 100644 --- a/core/mvcc/mvcc-rs/benches/my_benchmark.rs +++ b/core/mvcc/mvcc-rs/benches/my_benchmark.rs @@ -1,7 +1,7 @@ use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use mvcc_rs::clock::LocalClock; -use mvcc_rs::database::{Database, Row}; +use mvcc_rs::database::{Database, Row, RowID}; use pprof::criterion::{Output, PProfProfiler}; fn bench_db() -> Database< @@ -47,7 +47,15 @@ fn bench(c: &mut Criterion) { group.bench_function("begin_tx-read-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let tx_id = db.begin_tx().await; - db.read(tx_id, 1).await.unwrap(); + db.read( + tx_id, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap(); db.commit_tx(tx_id).await }) }); @@ -59,7 +67,10 @@ fn bench(c: &mut Criterion) { db.update( tx_id, Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "World".to_string(), }, ) @@ -74,14 +85,25 @@ fn bench(c: &mut Criterion) { futures::executor::block_on(db.insert( tx, Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }, )) .unwrap(); group.bench_function("read", |b| { b.to_async(FuturesExecutor).iter(|| async { - db.read(tx, 1).await.unwrap(); + db.read( + tx, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap(); }) }); @@ -90,7 +112,10 @@ fn bench(c: &mut Criterion) { futures::executor::block_on(db.insert( tx, Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }, )) @@ -100,7 +125,10 @@ fn bench(c: &mut Criterion) { db.update( tx, Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "World".to_string(), }, ) diff --git a/core/mvcc/mvcc-rs/src/cursor.rs b/core/mvcc/mvcc-rs/src/cursor.rs index 7d9455426..397265d7c 100644 --- a/core/mvcc/mvcc-rs/src/cursor.rs +++ b/core/mvcc/mvcc-rs/src/cursor.rs @@ -1,5 +1,5 @@ use crate::clock::LogicalClock; -use crate::database::{Database, DatabaseInner, Result, Row}; +use crate::database::{Database, DatabaseInner, Result, Row, RowID}; use crate::persistent_storage::Storage; use crate::sync::AsyncMutex; @@ -11,7 +11,7 @@ pub struct ScanCursor< Mutex: AsyncMutex>, > { pub db: &'a Database, - pub row_ids: Vec, + pub row_ids: Vec, pub index: usize, tx_id: u64, } @@ -25,9 +25,10 @@ impl< { pub async fn new( db: &'a Database, + table_id: u64, ) -> Result> { let tx_id = db.begin_tx().await; - let row_ids = db.scan_row_ids().await?; + let row_ids = db.scan_row_ids_for_table(table_id).await?; Ok(Self { db, tx_id, @@ -36,7 +37,7 @@ impl< }) } - pub fn current_row_id(&self) -> Option { + pub fn current_row_id(&self) -> Option { if self.index >= self.row_ids.len() { return None; } diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs index efd45d966..8611f392f 100644 --- a/core/mvcc/mvcc-rs/src/database.rs +++ b/core/mvcc/mvcc-rs/src/database.rs @@ -8,9 +8,16 @@ use std::sync::Arc; pub type Result = std::result::Result; +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)] +pub struct RowID { + pub table_id: u64, + pub row_id: u64, +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] + pub struct Row { - pub id: u64, + pub id: RowID, pub data: String, } @@ -62,9 +69,9 @@ pub struct Transaction { /// The transaction begin timestamp. begin_ts: u64, /// The transaction write set. - write_set: HashSet, + write_set: HashSet, /// The transaction read set. - read_set: RefCell>, + read_set: RefCell>, } impl Transaction { @@ -78,12 +85,12 @@ impl Transaction { } } - fn insert_to_read_set(&self, id: u64) { + fn insert_to_read_set(&self, id: RowID) { let mut read_set = self.read_set.borrow_mut(); read_set.insert(id); } - fn insert_to_write_set(&mut self, id: u64) { + fn insert_to_write_set(&mut self, id: RowID) { self.write_set.insert(id); } } @@ -201,7 +208,7 @@ impl< /// /// Returns `true` if the row was successfully deleted, and `false` otherwise. /// - pub async fn delete(&self, tx_id: TxID, id: u64) -> Result { + pub async fn delete(&self, tx_id: TxID, id: RowID) -> Result { let inner = self.inner.lock().await; inner.delete(tx_id, id).await } @@ -220,16 +227,21 @@ impl< /// /// Returns `Some(row)` with the row data if the row with the given `id` exists, /// and `None` otherwise. - pub async fn read(&self, tx_id: TxID, id: u64) -> Result> { + pub async fn read(&self, tx_id: TxID, id: RowID) -> Result> { let inner = self.inner.lock().await; inner.read(tx_id, id).await } - pub async fn scan_row_ids(&self) -> Result> { + pub async fn scan_row_ids(&self) -> Result> { let inner = self.inner.lock().await; inner.scan_row_ids() } + pub async fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { + let inner = self.inner.lock().await; + inner.scan_row_ids_for_table(table_id) + } + /// Begins a new transaction in the database. /// /// This function starts a new transaction in the database and returns a `TxID` value @@ -284,7 +296,7 @@ impl< #[derive(Debug)] pub struct DatabaseInner { - rows: RefCell>>, + rows: RefCell>>, txs: RefCell>, tx_timestamps: RefCell>, tx_ids: AtomicU64, @@ -314,7 +326,7 @@ impl } #[allow(clippy::await_holding_refcell_ref)] - async fn delete(&self, tx_id: TxID, id: u64) -> Result { + async fn delete(&self, tx_id: TxID, id: RowID) -> Result { // NOTICE: They *are* dropped before an await point!!! But the await is conditional, // so I think clippy is just confused. let mut txs = self.txs.borrow_mut(); @@ -344,7 +356,7 @@ impl Ok(false) } - async fn read(&self, tx_id: TxID, id: u64) -> Result> { + async fn read(&self, tx_id: TxID, id: RowID) -> Result> { let txs = self.txs.borrow_mut(); let tx = txs.get(&tx_id).unwrap(); assert!(tx.state == TransactionState::Active); @@ -360,11 +372,28 @@ impl Ok(None) } - fn scan_row_ids(&self) -> Result> { + fn scan_row_ids(&self) -> Result> { let rows = self.rows.borrow(); Ok(rows.keys().cloned().collect()) } + fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { + let rows = self.rows.borrow(); + Ok(rows + .range( + RowID { + table_id, + row_id: 0, + }..RowID { + table_id, + row_id: u64::MAX, + }, + ) + .map(|(k, _)| k) + .cloned() + .collect()) + } + async fn begin_tx(&mut self) -> TxID { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); @@ -496,7 +525,7 @@ impl None => true, }; if !should_stay { - tracing::debug!("Dropping row version {} {:?}-{:?}", id, rv.begin, rv.end); + tracing::debug!("Dropping row version {:?} {:?}-{:?}", id, rv.begin, rv.end); } should_stay }); @@ -613,16 +642,39 @@ mod tests { let tx1 = db.begin_tx().await; let tx1_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }; db.insert(tx1, tx1_row.clone()).await.unwrap(); - let row = db.read(tx1, 1).await.unwrap().unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); db.commit_tx(tx1).await.unwrap(); let tx2 = db.begin_tx().await; - let row = db.read(tx2, 1).await.unwrap().unwrap(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); } @@ -631,7 +683,15 @@ mod tests { async fn test_read_nonexistent() { let db = test_db(); let tx = db.begin_tx().await; - let row = db.read(tx, 1).await; + let row = db + .read( + tx, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await; assert!(row.unwrap().is_none()); } @@ -642,19 +702,58 @@ mod tests { let tx1 = db.begin_tx().await; let tx1_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }; db.insert(tx1, tx1_row.clone()).await.unwrap(); - let row = db.read(tx1, 1).await.unwrap().unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); - db.delete(tx1, 1).await.unwrap(); - let row = db.read(tx1, 1).await.unwrap(); + db.delete( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap(); assert!(row.is_none()); db.commit_tx(tx1).await.unwrap(); let tx2 = db.begin_tx().await; - let row = db.read(tx2, 1).await.unwrap(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap(); assert!(row.is_none()); } @@ -663,7 +762,16 @@ mod tests { async fn test_delete_nonexistent() { let db = test_db(); let tx = db.begin_tx().await; - assert!(!db.delete(tx, 1).await.unwrap()); + assert!(!db + .delete( + tx, + RowID { + table_id: 1, + row_id: 1 + } + ) + .await + .unwrap()); } #[traced_test] @@ -672,23 +780,59 @@ mod tests { let db = test_db(); let tx1 = db.begin_tx().await; let tx1_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }; db.insert(tx1, tx1_row.clone()).await.unwrap(); - let row = db.read(tx1, 1).await.unwrap().unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); let tx1_updated_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "World".to_string(), }; db.update(tx1, tx1_updated_row.clone()).await.unwrap(); - let row = db.read(tx1, 1).await.unwrap().unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_updated_row, row); db.commit_tx(tx1).await.unwrap(); let tx2 = db.begin_tx().await; - let row = db.read(tx2, 1).await.unwrap().unwrap(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); db.commit_tx(tx2).await.unwrap(); assert_eq!(tx1_updated_row, row); db.drop_unused_row_versions().await; @@ -700,22 +844,57 @@ mod tests { let db = test_db(); let tx1 = db.begin_tx().await; let row1 = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }; db.insert(tx1, row1.clone()).await.unwrap(); - let row2 = db.read(tx1, 1).await.unwrap().unwrap(); + let row2 = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(row1, row2); let row3 = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "World".to_string(), }; db.update(tx1, row3.clone()).await.unwrap(); - let row4 = db.read(tx1, 1).await.unwrap().unwrap(); + let row4 = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(row3, row4); db.rollback_tx(tx1).await; let tx2 = db.begin_tx().await; - let row5 = db.read(tx2, 1).await.unwrap(); + let row5 = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap(); assert_eq!(row5, None); } @@ -727,22 +906,48 @@ mod tests { // T1 inserts a row with ID 1, but does not commit. let tx1 = db.begin_tx().await; let tx1_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }; db.insert(tx1, tx1_row.clone()).await.unwrap(); - let row = db.read(tx1, 1).await.unwrap().unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); // T2 attempts to delete row with ID 1, but fails because T1 has not committed. let tx2 = db.begin_tx().await; let tx2_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "World".to_string(), }; assert!(!db.update(tx2, tx2_row).await.unwrap()); - let row = db.read(tx1, 1).await.unwrap().unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); } @@ -754,14 +959,26 @@ mod tests { // T1 inserts a row with ID 1, but does not commit. let tx1 = db.begin_tx().await; let row1 = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }; db.insert(tx1, row1).await.unwrap(); // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. let tx2 = db.begin_tx().await; - let row2 = db.read(tx2, 1).await.unwrap(); + let row2 = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap(); assert_eq!(row2, None); } @@ -774,7 +991,10 @@ mod tests { // T1 inserts a row with ID 1 and commits. let tx1 = db.begin_tx().await; let tx1_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }; db.insert(tx1, tx1_row.clone()).await.unwrap(); @@ -782,11 +1002,30 @@ mod tests { // T2 deletes row with ID 1, but does not commit. let tx2 = db.begin_tx().await; - assert!(db.delete(tx2, 1).await.unwrap()); + assert!(db + .delete( + tx2, + RowID { + table_id: 1, + row_id: 1 + } + ) + .await + .unwrap()); // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. let tx3 = db.begin_tx().await; - let row = db.read(tx3, 1).await.unwrap().unwrap(); + let row = db + .read( + tx3, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); } @@ -798,30 +1037,66 @@ mod tests { // T1 inserts a row with ID 1 and commits. let tx1 = db.begin_tx().await; let tx1_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }; db.insert(tx1, tx1_row.clone()).await.unwrap(); - let row = db.read(tx1, 1).await.unwrap().unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); db.commit_tx(tx1).await.unwrap(); // T2 reads the row with ID 1 within an active transaction. let tx2 = db.begin_tx().await; - let row = db.read(tx2, 1).await.unwrap().unwrap(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); // T3 updates the row and commits. let tx3 = db.begin_tx().await; let tx3_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "World".to_string(), }; db.update(tx3, tx3_row).await.unwrap(); db.commit_tx(tx3).await.unwrap(); // T2 still reads the same version of the row as before. - let row = db.read(tx2, 1).await.unwrap().unwrap(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); } @@ -833,18 +1108,34 @@ mod tests { // T1 inserts a row with ID 1 and commits. let tx1 = db.begin_tx().await; let tx1_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello".to_string(), }; db.insert(tx1, tx1_row.clone()).await.unwrap(); - let row = db.read(tx1, 1).await.unwrap().unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); db.commit_tx(tx1).await.unwrap(); // T2 attempts to update row ID 1 within an active transaction. let tx2 = db.begin_tx().await; let tx2_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "World".to_string(), }; assert!(db.update(tx2, tx2_row.clone()).await.unwrap()); @@ -852,7 +1143,10 @@ mod tests { // T3 also attempts to update row ID 1 within an active transaction. let tx3 = db.begin_tx().await; let tx3_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "Hello, world!".to_string(), }; assert_eq!( @@ -864,7 +1158,17 @@ mod tests { assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3).await); let tx4 = db.begin_tx().await; - let row = db.read(tx4, 1).await.unwrap().unwrap(); + let row = db + .read( + tx4, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx2_row, row); } @@ -878,7 +1182,10 @@ mod tests { // let's add $10 to my account since I like money let tx1 = db.begin_tx().await; let tx1_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "10".to_string(), }; db.insert(tx1, tx1_row.clone()).await.unwrap(); @@ -887,16 +1194,39 @@ mod tests { // but I like more money, so let me try adding $10 more let tx2 = db.begin_tx().await; let tx2_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "20".to_string(), }; assert!(db.update(tx2, tx2_row.clone()).await.unwrap()); - let row = db.read(tx2, 1).await.unwrap().unwrap(); + let row = db + .read( + tx2, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(row, tx2_row); // can I check how much money I have? let tx3 = db.begin_tx().await; - let row = db.read(tx3, 1).await.unwrap().unwrap(); + let row = db + .read( + tx3, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap() + .unwrap(); assert_eq!(tx1_row, row); } @@ -910,18 +1240,39 @@ mod tests { let tx2 = db.begin_tx().await; let tx2_row = Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "10".to_string(), }; db.insert(tx2, tx2_row.clone()).await.unwrap(); // transaction in progress, so tx1 shouldn't be able to see the value - let row = db.read(tx1, 1).await.unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap(); assert_eq!(row, None); // lets commit the transaction and check if tx1 can see it db.commit_tx(tx2).await.unwrap(); - let row = db.read(tx1, 1).await.unwrap(); + let row = db + .read( + tx1, + RowID { + table_id: 1, + row_id: 1, + }, + ) + .await + .unwrap(); assert_eq!(row, None); } @@ -947,7 +1298,10 @@ mod tests { db.insert( tx3, Row { - id: 1, + id: RowID { + table_id: 1, + row_id: 1, + }, data: "testme".to_string(), }, ) @@ -962,7 +1316,10 @@ mod tests { db.insert( tx4, Row { - id: 2, + id: RowID { + table_id: 1, + row_id: 2, + }, data: "testme2".to_string(), }, ) @@ -971,16 +1328,58 @@ mod tests { db.insert( tx4, Row { - id: 3, + id: RowID { + table_id: 1, + row_id: 3, + }, data: "testme3".to_string(), }, ) .await .unwrap(); - assert_eq!(db.read(tx4, 1).await.unwrap().unwrap().data, "testme"); - assert_eq!(db.read(tx4, 2).await.unwrap().unwrap().data, "testme2"); - assert_eq!(db.read(tx4, 3).await.unwrap().unwrap().data, "testme3"); + assert_eq!( + db.read( + tx4, + RowID { + table_id: 1, + row_id: 1 + } + ) + .await + .unwrap() + .unwrap() + .data, + "testme" + ); + assert_eq!( + db.read( + tx4, + RowID { + table_id: 1, + row_id: 2 + } + ) + .await + .unwrap() + .unwrap() + .data, + "testme2" + ); + assert_eq!( + db.read( + tx4, + RowID { + table_id: 1, + row_id: 3 + } + ) + .await + .unwrap() + .unwrap() + .data, + "testme3" + ); db.commit_tx(tx4).await.unwrap(); let clock = LocalClock::new(); @@ -990,9 +1389,58 @@ mod tests { println!("{:#?}", db); let tx5 = db.begin_tx().await; - println!("{:#?}", db.read(tx5, 1).await); - assert_eq!(db.read(tx5, 1).await.unwrap().unwrap().data, "testme"); - assert_eq!(db.read(tx5, 2).await.unwrap().unwrap().data, "testme2"); - assert_eq!(db.read(tx5, 3).await.unwrap().unwrap().data, "testme3"); + println!( + "{:#?}", + db.read( + tx5, + RowID { + table_id: 1, + row_id: 1 + } + ) + .await + ); + assert_eq!( + db.read( + tx5, + RowID { + table_id: 1, + row_id: 1 + } + ) + .await + .unwrap() + .unwrap() + .data, + "testme" + ); + assert_eq!( + db.read( + tx5, + RowID { + table_id: 1, + row_id: 2 + } + ) + .await + .unwrap() + .unwrap() + .data, + "testme2" + ); + assert_eq!( + db.read( + tx5, + RowID { + table_id: 1, + row_id: 3 + } + ) + .await + .unwrap() + .unwrap() + .data, + "testme3" + ); } } diff --git a/core/mvcc/mvcc-rs/tests/concurrency_test.rs b/core/mvcc/mvcc-rs/tests/concurrency_test.rs index 0626baab3..f1806d4ae 100644 --- a/core/mvcc/mvcc-rs/tests/concurrency_test.rs +++ b/core/mvcc/mvcc-rs/tests/concurrency_test.rs @@ -1,5 +1,5 @@ use mvcc_rs::clock::LocalClock; -use mvcc_rs::database::{Database, Row}; +use mvcc_rs::database::{Database, Row, RowID}; use shuttle::sync::atomic::AtomicU64; use shuttle::sync::Arc; use shuttle::thread; @@ -22,6 +22,10 @@ fn test_non_overlapping_concurrent_inserts() { shuttle::future::block_on(async move { let tx = db.begin_tx().await; let id = ids.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; let row = Row { id, data: "Hello".to_string(), @@ -42,6 +46,10 @@ fn test_non_overlapping_concurrent_inserts() { shuttle::future::block_on(async move { let tx = db.begin_tx().await; let id = ids.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; let row = Row { id, data: "World".to_string(),