diff --git a/core/mvcc/database/Cargo.toml b/core/mvcc/database/Cargo.toml index 34b9c5792..366c9a36e 100644 --- a/core/mvcc/database/Cargo.toml +++ b/core/mvcc/database/Cargo.toml @@ -5,14 +5,16 @@ edition = "2021" [dependencies] anyhow = "1.0.70" +futures = "0.3.28" parking_lot = "0.12.1" thiserror = "1.0.40" tracing = "0.1.37" [dev-dependencies] -criterion = { version = "0.4", features = ["html_reports"] } +criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] } pprof = { version = "0.11.1", features = ["criterion", "flamegraph"] } shuttle = "0.6.0" +tokio = { version = "1.27.0", features = ["full"] } tracing-subscriber = "0" tracing-test = "0" diff --git a/core/mvcc/database/benches/my_benchmark.rs b/core/mvcc/database/benches/my_benchmark.rs index 8f67d6eac..56331d1ff 100644 --- a/core/mvcc/database/benches/my_benchmark.rs +++ b/core/mvcc/database/benches/my_benchmark.rs @@ -1,3 +1,4 @@ +use criterion::async_executor::FuturesExecutor; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use mvcc_rs::clock::LocalClock; use mvcc_rs::database::{Database, Row}; @@ -10,44 +11,44 @@ fn bench(c: &mut Criterion) { let clock = LocalClock::default(); let db = Database::new(clock); group.bench_function("begin_tx", |b| { - b.iter(|| { - db.begin_tx(); + b.to_async(FuturesExecutor).iter(|| async { + db.begin_tx().await; }) }); let clock = LocalClock::default(); let db = Database::new(clock); group.bench_function("begin_tx + rollback_tx", |b| { - b.iter(|| { - let tx_id = db.begin_tx(); - db.rollback_tx(tx_id) + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx().await; + db.rollback_tx(tx_id).await }) }); let clock = LocalClock::default(); let db = Database::new(clock); group.bench_function("begin_tx + commit_tx", |b| { - b.iter(|| { - let tx_id = db.begin_tx(); - db.commit_tx(tx_id) + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx().await; + db.commit_tx(tx_id).await }) }); let clock = LocalClock::default(); let db = Database::new(clock); group.bench_function("begin_tx-read-commit_tx", |b| { - b.iter(|| { - let tx_id = db.begin_tx(); - db.read(tx_id, 1).unwrap(); - db.commit_tx(tx_id) + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx().await; + db.read(tx_id, 1).await.unwrap(); + db.commit_tx(tx_id).await }) }); let clock = LocalClock::default(); let db = Database::new(clock); group.bench_function("begin_tx-update-commit_tx", |b| { - b.iter(|| { - let tx_id = db.begin_tx(); + b.to_async(FuturesExecutor).iter(|| async { + let tx_id = db.begin_tx().await; db.update( tx_id, Row { @@ -55,41 +56,42 @@ fn bench(c: &mut Criterion) { data: "World".to_string(), }, ) + .await .unwrap(); - db.commit_tx(tx_id) + db.commit_tx(tx_id).await }) }); let clock = LocalClock::default(); let db = Database::new(clock); - let tx = db.begin_tx(); - db.insert( + let tx = futures::executor::block_on(db.begin_tx()); + futures::executor::block_on(db.insert( tx, Row { id: 1, data: "Hello".to_string(), }, - ) + )) .unwrap(); group.bench_function("read", |b| { - b.iter(|| { - db.read(tx, 1).unwrap(); + b.to_async(FuturesExecutor).iter(|| async { + db.read(tx, 1).await.unwrap(); }) }); let clock = LocalClock::default(); let db = Database::new(clock); - let tx = db.begin_tx(); - db.insert( + let tx = futures::executor::block_on(db.begin_tx()); + futures::executor::block_on(db.insert( tx, Row { id: 1, data: "Hello".to_string(), }, - ) + )) .unwrap(); group.bench_function("update", |b| { - b.iter(|| { + b.to_async(FuturesExecutor).iter(|| async { db.update( tx, Row { @@ -97,6 +99,7 @@ fn bench(c: &mut Criterion) { data: "World".to_string(), }, ) + .await .unwrap(); }) }); diff --git a/core/mvcc/database/src/database.rs b/core/mvcc/database/src/database.rs index 6f196f291..fa0248915 100644 --- a/core/mvcc/database/src/database.rs +++ b/core/mvcc/database/src/database.rs @@ -1,8 +1,8 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; +use parking_lot::Mutex; use std::cell::RefCell; use std::collections::{HashMap, HashSet}; -use parking_lot::Mutex; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -129,9 +129,9 @@ impl Database { /// * `tx_id` - the ID of the transaction in which to insert the new row. /// * `row` - the row object containing the values to be inserted. /// - pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + pub async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { let inner = self.inner.lock(); - inner.insert(tx_id, row) + inner.insert(tx_id, row).await } /// Updates a row in the database with new values. @@ -152,11 +152,11 @@ impl Database { /// # Returns /// /// Returns `true` if the row was successfully updated, and `false` otherwise. - pub fn update(&self, tx_id: TxID, row: Row) -> Result { - if !self.delete(tx_id, row.id)? { + pub async fn update(&self, tx_id: TxID, row: Row) -> Result { + if !self.delete(tx_id, row.id).await? { return Ok(false); } - self.insert(tx_id, row)?; + self.insert(tx_id, row).await?; Ok(true) } @@ -174,9 +174,9 @@ impl Database { /// /// Returns `true` if the row was successfully deleted, and `false` otherwise. /// - pub fn delete(&self, tx_id: TxID, id: u64) -> Result { + pub async fn delete(&self, tx_id: TxID, id: u64) -> Result { let inner = self.inner.lock(); - inner.delete(tx_id, id) + inner.delete(tx_id, id).await } /// Retrieves a row from the table with the given `id`. @@ -193,9 +193,9 @@ impl Database { /// /// Returns `Some(row)` with the row data if the row with the given `id` exists, /// and `None` otherwise. - pub fn read(&self, tx_id: TxID, id: u64) -> Result> { + pub async fn read(&self, tx_id: TxID, id: u64) -> Result> { let inner = self.inner.lock(); - inner.read(tx_id, id) + inner.read(tx_id, id).await } /// Begins a new transaction in the database. @@ -203,9 +203,9 @@ impl Database { /// This function starts a new transaction in the database and returns a `TxID` value /// that you can use to perform operations within the transaction. All changes made within the /// transaction are isolated from other transactions until you commit the transaction. - pub fn begin_tx(&self) -> TxID { + pub async fn begin_tx(&self) -> TxID { let mut inner = self.inner.lock(); - inner.begin_tx() + inner.begin_tx().await } /// Commits a transaction with the specified transaction ID. @@ -217,9 +217,9 @@ impl Database { /// # Arguments /// /// * `tx_id` - The ID of the transaction to commit. - pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { + pub async fn commit_tx(&self, tx_id: TxID) -> Result<()> { let mut inner = self.inner.lock(); - inner.commit_tx(tx_id) + inner.commit_tx(tx_id).await } /// Rolls back a transaction with the specified ID. @@ -230,9 +230,9 @@ impl Database { /// # Arguments /// /// * `tx_id` - The ID of the transaction to abort. - pub fn rollback_tx(&self, tx_id: TxID) { + pub async fn rollback_tx(&self, tx_id: TxID) { let inner = self.inner.lock(); - inner.rollback_tx(tx_id); + inner.rollback_tx(tx_id).await; } } @@ -245,7 +245,7 @@ pub struct DatabaseInner { } impl DatabaseInner { - fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { let mut txs = self.txs.borrow_mut(); let tx = txs .get_mut(&tx_id) @@ -263,7 +263,7 @@ impl DatabaseInner { Ok(()) } - fn delete(&self, tx_id: TxID, id: u64) -> Result { + async fn delete(&self, tx_id: TxID, id: u64) -> Result { let mut rows = self.rows.borrow_mut(); let mut txs = self.txs.borrow_mut(); if let Some(row_versions) = rows.get_mut(&id) { @@ -275,7 +275,7 @@ impl DatabaseInner { if is_write_write_conflict(&txs, tx, rv) { drop(txs); drop(rows); - self.rollback_tx(tx_id); + self.rollback_tx(tx_id).await; return Err(DatabaseError::WriteWriteConflict); } if is_version_visible(&txs, tx, rv) { @@ -291,7 +291,7 @@ impl DatabaseInner { Ok(false) } - fn read(&self, tx_id: TxID, id: u64) -> Result> { + async fn read(&self, tx_id: TxID, id: u64) -> Result> { let txs = self.txs.borrow_mut(); let tx = txs.get(&tx_id).unwrap(); assert!(tx.state == TransactionState::Active); @@ -307,7 +307,7 @@ impl DatabaseInner { Ok(None) } - fn begin_tx(&mut self) -> TxID { + async fn begin_tx(&mut self) -> TxID { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); let tx = Transaction::new(tx_id, begin_ts); @@ -317,7 +317,7 @@ impl DatabaseInner { tx_id } - fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { + async fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { let end_ts = self.get_timestamp(); let mut txs = self.txs.borrow_mut(); let mut tx = txs.get_mut(&tx_id).unwrap(); @@ -351,7 +351,7 @@ impl DatabaseInner { Ok(()) } - fn rollback_tx(&self, tx_id: TxID) { + async fn rollback_tx(&self, tx_id: TxID) { let mut txs = self.txs.borrow_mut(); let mut tx = txs.get_mut(&tx_id).unwrap(); assert!(tx.state == TransactionState::Active); @@ -446,274 +446,274 @@ mod tests { use tracing_test::traced_test; #[traced_test] - #[test] - fn test_insert_read() { + #[tokio::test] + async fn test_insert_read() { let clock = LocalClock::default(); let db = Database::new(clock); - let tx1 = db.begin_tx(); + let tx1 = db.begin_tx().await; let tx1_row = Row { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); + db.insert(tx1, tx1_row.clone()).await.unwrap(); + let row = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); - db.commit_tx(tx1).unwrap(); + db.commit_tx(tx1).await.unwrap(); - let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap().unwrap(); + let tx2 = db.begin_tx().await; + let row = db.read(tx2, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[test] - fn test_read_nonexistent() { + #[tokio::test] + async fn test_read_nonexistent() { let clock = LocalClock::default(); let db = Database::new(clock); - let tx = db.begin_tx(); - let row = db.read(tx, 1); + let tx = db.begin_tx().await; + let row = db.read(tx, 1).await; assert!(row.unwrap().is_none()); } #[traced_test] - #[test] - fn test_delete() { + #[tokio::test] + async fn test_delete() { let clock = LocalClock::default(); let db = Database::new(clock); - let tx1 = db.begin_tx(); + let tx1 = db.begin_tx().await; let tx1_row = Row { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); + db.insert(tx1, tx1_row.clone()).await.unwrap(); + let row = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); - db.delete(tx1, 1).unwrap(); - let row = db.read(tx1, 1).unwrap(); + db.delete(tx1, 1).await.unwrap(); + let row = db.read(tx1, 1).await.unwrap(); assert!(row.is_none()); - db.commit_tx(tx1).unwrap(); + db.commit_tx(tx1).await.unwrap(); - let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap(); + let tx2 = db.begin_tx().await; + let row = db.read(tx2, 1).await.unwrap(); assert!(row.is_none()); } #[traced_test] - #[test] - fn test_delete_nonexistent() { + #[tokio::test] + async fn test_delete_nonexistent() { let clock = LocalClock::default(); let db = Database::new(clock); - let tx = db.begin_tx(); - assert!(!db.delete(tx, 1).unwrap()); + let tx = db.begin_tx().await; + assert!(!db.delete(tx, 1).await.unwrap()); } #[traced_test] - #[test] - fn test_commit() { + #[tokio::test] + async fn test_commit() { let clock = LocalClock::default(); let db = Database::new(clock); - let tx1 = db.begin_tx(); + let tx1 = db.begin_tx().await; let tx1_row = Row { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); + db.insert(tx1, tx1_row.clone()).await.unwrap(); + let row = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); let tx1_updated_row = Row { id: 1, data: "World".to_string(), }; - db.update(tx1, tx1_updated_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); + db.update(tx1, tx1_updated_row.clone()).await.unwrap(); + let row = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(tx1_updated_row, row); - db.commit_tx(tx1).unwrap(); + db.commit_tx(tx1).await.unwrap(); - let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap().unwrap(); - db.commit_tx(tx2).unwrap(); + let tx2 = db.begin_tx().await; + let row = db.read(tx2, 1).await.unwrap().unwrap(); + db.commit_tx(tx2).await.unwrap(); assert_eq!(tx1_updated_row, row); } #[traced_test] - #[test] - fn test_rollback() { + #[tokio::test] + async fn test_rollback() { let clock = LocalClock::default(); let db = Database::new(clock); - let tx1 = db.begin_tx(); + let tx1 = db.begin_tx().await; let row1 = Row { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, row1.clone()).unwrap(); - let row2 = db.read(tx1, 1).unwrap().unwrap(); + db.insert(tx1, row1.clone()).await.unwrap(); + let row2 = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(row1, row2); let row3 = Row { id: 1, data: "World".to_string(), }; - db.update(tx1, row3.clone()).unwrap(); - let row4 = db.read(tx1, 1).unwrap().unwrap(); + db.update(tx1, row3.clone()).await.unwrap(); + let row4 = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(row3, row4); - db.rollback_tx(tx1); - let tx2 = db.begin_tx(); - let row5 = db.read(tx2, 1).unwrap(); + db.rollback_tx(tx1).await; + let tx2 = db.begin_tx().await; + let row5 = db.read(tx2, 1).await.unwrap(); assert_eq!(row5, None); } #[traced_test] - #[test] - fn test_dirty_write() { + #[tokio::test] + async fn test_dirty_write() { let clock = LocalClock::default(); let db = Database::new(clock); // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.begin_tx(); + let tx1 = db.begin_tx().await; let tx1_row = Row { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); + db.insert(tx1, tx1_row.clone()).await.unwrap(); + let row = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); // T2 attempts to delete row with ID 1, but fails because T1 has not committed. - let tx2 = db.begin_tx(); + let tx2 = db.begin_tx().await; let tx2_row = Row { id: 1, data: "World".to_string(), }; - assert!(!db.update(tx2, tx2_row).unwrap()); + assert!(!db.update(tx2, tx2_row).await.unwrap()); - let row = db.read(tx1, 1).unwrap().unwrap(); + let row = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[test] - fn test_dirty_read() { + #[tokio::test] + async fn test_dirty_read() { let clock = LocalClock::default(); let db = Database::new(clock); // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.begin_tx(); + let tx1 = db.begin_tx().await; let row1 = Row { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, row1).unwrap(); + db.insert(tx1, row1).await.unwrap(); // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. - let tx2 = db.begin_tx(); - let row2 = db.read(tx2, 1).unwrap(); + let tx2 = db.begin_tx().await; + let row2 = db.read(tx2, 1).await.unwrap(); assert_eq!(row2, None); } #[ignore] #[traced_test] - #[test] - fn test_dirty_read_deleted() { + #[tokio::test] + async fn test_dirty_read_deleted() { let clock = LocalClock::default(); let db = Database::new(clock); // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx(); + let tx1 = db.begin_tx().await; let tx1_row = Row { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).unwrap(); - db.commit_tx(tx1).unwrap(); + db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.commit_tx(tx1).await.unwrap(); // T2 deletes row with ID 1, but does not commit. - let tx2 = db.begin_tx(); - assert!(db.delete(tx2, 1).unwrap()); + let tx2 = db.begin_tx().await; + assert!(db.delete(tx2, 1).await.unwrap()); // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. - let tx3 = db.begin_tx(); - let row = db.read(tx3, 1).unwrap().unwrap(); + let tx3 = db.begin_tx().await; + let row = db.read(tx3, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[test] - fn test_fuzzy_read() { + #[tokio::test] + async fn test_fuzzy_read() { let clock = LocalClock::default(); let db = Database::new(clock); // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx(); + let tx1 = db.begin_tx().await; let tx1_row = Row { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); + db.insert(tx1, tx1_row.clone()).await.unwrap(); + let row = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); - db.commit_tx(tx1).unwrap(); + db.commit_tx(tx1).await.unwrap(); // T2 reads the row with ID 1 within an active transaction. - let tx2 = db.begin_tx(); - let row = db.read(tx2, 1).unwrap().unwrap(); + let tx2 = db.begin_tx().await; + let row = db.read(tx2, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); // T3 updates the row and commits. - let tx3 = db.begin_tx(); + let tx3 = db.begin_tx().await; let tx3_row = Row { id: 1, data: "World".to_string(), }; - db.update(tx3, tx3_row).unwrap(); - db.commit_tx(tx3).unwrap(); + db.update(tx3, tx3_row).await.unwrap(); + db.commit_tx(tx3).await.unwrap(); // T2 still reads the same version of the row as before. - let row = db.read(tx2, 1).unwrap().unwrap(); + let row = db.read(tx2, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[test] - fn test_lost_update() { + #[tokio::test] + async fn test_lost_update() { let clock = LocalClock::default(); let db = Database::new(clock); // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx(); + let tx1 = db.begin_tx().await; let tx1_row = Row { id: 1, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).unwrap(); - let row = db.read(tx1, 1).unwrap().unwrap(); + db.insert(tx1, tx1_row.clone()).await.unwrap(); + let row = db.read(tx1, 1).await.unwrap().unwrap(); assert_eq!(tx1_row, row); - db.commit_tx(tx1).unwrap(); + db.commit_tx(tx1).await.unwrap(); // T2 attempts to update row ID 1 within an active transaction. - let tx2 = db.begin_tx(); + let tx2 = db.begin_tx().await; let tx2_row = Row { id: 1, data: "World".to_string(), }; - assert!(db.update(tx2, tx2_row.clone()).unwrap()); + assert!(db.update(tx2, tx2_row.clone()).await.unwrap()); // T3 also attempts to update row ID 1 within an active transaction. - let tx3 = db.begin_tx(); + let tx3 = db.begin_tx().await; let tx3_row = Row { id: 1, data: "Hello, world!".to_string(), }; assert_eq!( Err(DatabaseError::WriteWriteConflict), - db.update(tx3, tx3_row) + db.update(tx3, tx3_row).await ); - db.commit_tx(tx2).unwrap(); - assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3)); + db.commit_tx(tx2).await.unwrap(); + assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3).await); - let tx4 = db.begin_tx(); - let row = db.read(tx4, 1).unwrap().unwrap(); + let tx4 = db.begin_tx().await; + let row = db.read(tx4, 1).await.unwrap().unwrap(); assert_eq!(tx2_row, row); } } diff --git a/core/mvcc/database/tests/concurrency_test.rs b/core/mvcc/database/tests/concurrency_test.rs index bc1e4f90a..7673afba7 100644 --- a/core/mvcc/database/tests/concurrency_test.rs +++ b/core/mvcc/database/tests/concurrency_test.rs @@ -18,36 +18,40 @@ fn test_non_overlapping_concurrent_inserts() { let db = db.clone(); let ids = ids.clone(); thread::spawn(move || { - let tx = db.begin_tx(); - let id = ids.fetch_add(1, Ordering::SeqCst); - let row = Row { - id, - data: "Hello".to_string(), - }; - db.insert(tx, row.clone()).unwrap(); - db.commit_tx(tx).unwrap(); - let tx = db.begin_tx(); - let committed_row = db.read(tx, id).unwrap(); - db.commit_tx(tx).unwrap(); - assert_eq!(committed_row, Some(row)); + shuttle::future::block_on(async move { + let tx = db.begin_tx().await; + let id = ids.fetch_add(1, Ordering::SeqCst); + let row = Row { + id, + data: "Hello".to_string(), + }; + db.insert(tx, row.clone()).await.unwrap(); + db.commit_tx(tx).await.unwrap(); + let tx = db.begin_tx().await; + let committed_row = db.read(tx, id).await.unwrap(); + db.commit_tx(tx).await.unwrap(); + assert_eq!(committed_row, Some(row)); + }) }); } { let db = db.clone(); let ids = ids.clone(); thread::spawn(move || { - let tx = db.begin_tx(); - let id = ids.fetch_add(1, Ordering::SeqCst); - let row = Row { - id, - data: "World".to_string(), - }; - db.insert(tx, row.clone()).unwrap(); - db.commit_tx(tx).unwrap(); - let tx = db.begin_tx(); - let committed_row = db.read(tx, id).unwrap(); - db.commit_tx(tx).unwrap(); - assert_eq!(committed_row, Some(row)); + shuttle::future::block_on(async move { + let tx = db.begin_tx().await; + let id = ids.fetch_add(1, Ordering::SeqCst); + let row = Row { + id, + data: "World".to_string(), + }; + db.insert(tx, row.clone()).await.unwrap(); + db.commit_tx(tx).await.unwrap(); + let tx = db.begin_tx().await; + let committed_row = db.read(tx, id).await.unwrap(); + db.commit_tx(tx).await.unwrap(); + assert_eq!(committed_row, Some(row)); + }); }); } },