In order to prepare for #3, the APIs are made asynchronous.
It also applies to tests and benches.
This commit is contained in:
Piotr Sarna
2023-04-14 13:38:18 +02:00
parent 620b8c6362
commit bfe3bcef71
4 changed files with 177 additions and 168 deletions

View File

@@ -5,14 +5,16 @@ edition = "2021"
[dependencies]
anyhow = "1.0.70"
futures = "0.3.28"
parking_lot = "0.12.1"
thiserror = "1.0.40"
tracing = "0.1.37"
[dev-dependencies]
criterion = { version = "0.4", features = ["html_reports"] }
criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }
pprof = { version = "0.11.1", features = ["criterion", "flamegraph"] }
shuttle = "0.6.0"
tokio = { version = "1.27.0", features = ["full"] }
tracing-subscriber = "0"
tracing-test = "0"

View File

@@ -1,3 +1,4 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use mvcc_rs::clock::LocalClock;
use mvcc_rs::database::{Database, Row};
@@ -10,44 +11,44 @@ fn bench(c: &mut Criterion) {
let clock = LocalClock::default();
let db = Database::new(clock);
group.bench_function("begin_tx", |b| {
b.iter(|| {
db.begin_tx();
b.to_async(FuturesExecutor).iter(|| async {
db.begin_tx().await;
})
});
let clock = LocalClock::default();
let db = Database::new(clock);
group.bench_function("begin_tx + rollback_tx", |b| {
b.iter(|| {
let tx_id = db.begin_tx();
db.rollback_tx(tx_id)
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
db.rollback_tx(tx_id).await
})
});
let clock = LocalClock::default();
let db = Database::new(clock);
group.bench_function("begin_tx + commit_tx", |b| {
b.iter(|| {
let tx_id = db.begin_tx();
db.commit_tx(tx_id)
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
db.commit_tx(tx_id).await
})
});
let clock = LocalClock::default();
let db = Database::new(clock);
group.bench_function("begin_tx-read-commit_tx", |b| {
b.iter(|| {
let tx_id = db.begin_tx();
db.read(tx_id, 1).unwrap();
db.commit_tx(tx_id)
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
db.read(tx_id, 1).await.unwrap();
db.commit_tx(tx_id).await
})
});
let clock = LocalClock::default();
let db = Database::new(clock);
group.bench_function("begin_tx-update-commit_tx", |b| {
b.iter(|| {
let tx_id = db.begin_tx();
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
db.update(
tx_id,
Row {
@@ -55,41 +56,42 @@ fn bench(c: &mut Criterion) {
data: "World".to_string(),
},
)
.await
.unwrap();
db.commit_tx(tx_id)
db.commit_tx(tx_id).await
})
});
let clock = LocalClock::default();
let db = Database::new(clock);
let tx = db.begin_tx();
db.insert(
let tx = futures::executor::block_on(db.begin_tx());
futures::executor::block_on(db.insert(
tx,
Row {
id: 1,
data: "Hello".to_string(),
},
)
))
.unwrap();
group.bench_function("read", |b| {
b.iter(|| {
db.read(tx, 1).unwrap();
b.to_async(FuturesExecutor).iter(|| async {
db.read(tx, 1).await.unwrap();
})
});
let clock = LocalClock::default();
let db = Database::new(clock);
let tx = db.begin_tx();
db.insert(
let tx = futures::executor::block_on(db.begin_tx());
futures::executor::block_on(db.insert(
tx,
Row {
id: 1,
data: "Hello".to_string(),
},
)
))
.unwrap();
group.bench_function("update", |b| {
b.iter(|| {
b.to_async(FuturesExecutor).iter(|| async {
db.update(
tx,
Row {
@@ -97,6 +99,7 @@ fn bench(c: &mut Criterion) {
data: "World".to_string(),
},
)
.await
.unwrap();
})
});

View File

@@ -1,8 +1,8 @@
use crate::clock::LogicalClock;
use crate::errors::DatabaseError;
use parking_lot::Mutex;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use parking_lot::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@@ -129,9 +129,9 @@ impl<Clock: LogicalClock> Database<Clock> {
/// * `tx_id` - the ID of the transaction in which to insert the new row.
/// * `row` - the row object containing the values to be inserted.
///
pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
pub async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let inner = self.inner.lock();
inner.insert(tx_id, row)
inner.insert(tx_id, row).await
}
/// Updates a row in the database with new values.
@@ -152,11 +152,11 @@ impl<Clock: LogicalClock> Database<Clock> {
/// # Returns
///
/// Returns `true` if the row was successfully updated, and `false` otherwise.
pub fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
if !self.delete(tx_id, row.id)? {
pub async fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
if !self.delete(tx_id, row.id).await? {
return Ok(false);
}
self.insert(tx_id, row)?;
self.insert(tx_id, row).await?;
Ok(true)
}
@@ -174,9 +174,9 @@ impl<Clock: LogicalClock> Database<Clock> {
///
/// Returns `true` if the row was successfully deleted, and `false` otherwise.
///
pub fn delete(&self, tx_id: TxID, id: u64) -> Result<bool> {
pub async fn delete(&self, tx_id: TxID, id: u64) -> Result<bool> {
let inner = self.inner.lock();
inner.delete(tx_id, id)
inner.delete(tx_id, id).await
}
/// Retrieves a row from the table with the given `id`.
@@ -193,9 +193,9 @@ impl<Clock: LogicalClock> Database<Clock> {
///
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
/// and `None` otherwise.
pub fn read(&self, tx_id: TxID, id: u64) -> Result<Option<Row>> {
pub async fn read(&self, tx_id: TxID, id: u64) -> Result<Option<Row>> {
let inner = self.inner.lock();
inner.read(tx_id, id)
inner.read(tx_id, id).await
}
/// Begins a new transaction in the database.
@@ -203,9 +203,9 @@ impl<Clock: LogicalClock> Database<Clock> {
/// This function starts a new transaction in the database and returns a `TxID` value
/// that you can use to perform operations within the transaction. All changes made within the
/// transaction are isolated from other transactions until you commit the transaction.
pub fn begin_tx(&self) -> TxID {
pub async fn begin_tx(&self) -> TxID {
let mut inner = self.inner.lock();
inner.begin_tx()
inner.begin_tx().await
}
/// Commits a transaction with the specified transaction ID.
@@ -217,9 +217,9 @@ impl<Clock: LogicalClock> Database<Clock> {
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to commit.
pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
pub async fn commit_tx(&self, tx_id: TxID) -> Result<()> {
let mut inner = self.inner.lock();
inner.commit_tx(tx_id)
inner.commit_tx(tx_id).await
}
/// Rolls back a transaction with the specified ID.
@@ -230,9 +230,9 @@ impl<Clock: LogicalClock> Database<Clock> {
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to abort.
pub fn rollback_tx(&self, tx_id: TxID) {
pub async fn rollback_tx(&self, tx_id: TxID) {
let inner = self.inner.lock();
inner.rollback_tx(tx_id);
inner.rollback_tx(tx_id).await;
}
}
@@ -245,7 +245,7 @@ pub struct DatabaseInner<Clock: LogicalClock> {
}
impl<Clock: LogicalClock> DatabaseInner<Clock> {
fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let mut txs = self.txs.borrow_mut();
let tx = txs
.get_mut(&tx_id)
@@ -263,7 +263,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
Ok(())
}
fn delete(&self, tx_id: TxID, id: u64) -> Result<bool> {
async fn delete(&self, tx_id: TxID, id: u64) -> Result<bool> {
let mut rows = self.rows.borrow_mut();
let mut txs = self.txs.borrow_mut();
if let Some(row_versions) = rows.get_mut(&id) {
@@ -275,7 +275,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
if is_write_write_conflict(&txs, tx, rv) {
drop(txs);
drop(rows);
self.rollback_tx(tx_id);
self.rollback_tx(tx_id).await;
return Err(DatabaseError::WriteWriteConflict);
}
if is_version_visible(&txs, tx, rv) {
@@ -291,7 +291,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
Ok(false)
}
fn read(&self, tx_id: TxID, id: u64) -> Result<Option<Row>> {
async fn read(&self, tx_id: TxID, id: u64) -> Result<Option<Row>> {
let txs = self.txs.borrow_mut();
let tx = txs.get(&tx_id).unwrap();
assert!(tx.state == TransactionState::Active);
@@ -307,7 +307,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
Ok(None)
}
fn begin_tx(&mut self) -> TxID {
async fn begin_tx(&mut self) -> TxID {
let tx_id = self.get_tx_id();
let begin_ts = self.get_timestamp();
let tx = Transaction::new(tx_id, begin_ts);
@@ -317,7 +317,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
tx_id
}
fn commit_tx(&mut self, tx_id: TxID) -> Result<()> {
async fn commit_tx(&mut self, tx_id: TxID) -> Result<()> {
let end_ts = self.get_timestamp();
let mut txs = self.txs.borrow_mut();
let mut tx = txs.get_mut(&tx_id).unwrap();
@@ -351,7 +351,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
Ok(())
}
fn rollback_tx(&self, tx_id: TxID) {
async fn rollback_tx(&self, tx_id: TxID) {
let mut txs = self.txs.borrow_mut();
let mut tx = txs.get_mut(&tx_id).unwrap();
assert!(tx.state == TransactionState::Active);
@@ -446,274 +446,274 @@ mod tests {
use tracing_test::traced_test;
#[traced_test]
#[test]
fn test_insert_read() {
#[tokio::test]
async fn test_insert_read() {
let clock = LocalClock::default();
let db = Database::new(clock);
let tx1 = db.begin_tx();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1).unwrap();
db.commit_tx(tx1).await.unwrap();
let tx2 = db.begin_tx();
let row = db.read(tx2, 1).unwrap().unwrap();
let tx2 = db.begin_tx().await;
let row = db.read(tx2, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
}
#[traced_test]
#[test]
fn test_read_nonexistent() {
#[tokio::test]
async fn test_read_nonexistent() {
let clock = LocalClock::default();
let db = Database::new(clock);
let tx = db.begin_tx();
let row = db.read(tx, 1);
let tx = db.begin_tx().await;
let row = db.read(tx, 1).await;
assert!(row.unwrap().is_none());
}
#[traced_test]
#[test]
fn test_delete() {
#[tokio::test]
async fn test_delete() {
let clock = LocalClock::default();
let db = Database::new(clock);
let tx1 = db.begin_tx();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
db.delete(tx1, 1).unwrap();
let row = db.read(tx1, 1).unwrap();
db.delete(tx1, 1).await.unwrap();
let row = db.read(tx1, 1).await.unwrap();
assert!(row.is_none());
db.commit_tx(tx1).unwrap();
db.commit_tx(tx1).await.unwrap();
let tx2 = db.begin_tx();
let row = db.read(tx2, 1).unwrap();
let tx2 = db.begin_tx().await;
let row = db.read(tx2, 1).await.unwrap();
assert!(row.is_none());
}
#[traced_test]
#[test]
fn test_delete_nonexistent() {
#[tokio::test]
async fn test_delete_nonexistent() {
let clock = LocalClock::default();
let db = Database::new(clock);
let tx = db.begin_tx();
assert!(!db.delete(tx, 1).unwrap());
let tx = db.begin_tx().await;
assert!(!db.delete(tx, 1).await.unwrap());
}
#[traced_test]
#[test]
fn test_commit() {
#[tokio::test]
async fn test_commit() {
let clock = LocalClock::default();
let db = Database::new(clock);
let tx1 = db.begin_tx();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
let tx1_updated_row = Row {
id: 1,
data: "World".to_string(),
};
db.update(tx1, tx1_updated_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
db.update(tx1, tx1_updated_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(tx1_updated_row, row);
db.commit_tx(tx1).unwrap();
db.commit_tx(tx1).await.unwrap();
let tx2 = db.begin_tx();
let row = db.read(tx2, 1).unwrap().unwrap();
db.commit_tx(tx2).unwrap();
let tx2 = db.begin_tx().await;
let row = db.read(tx2, 1).await.unwrap().unwrap();
db.commit_tx(tx2).await.unwrap();
assert_eq!(tx1_updated_row, row);
}
#[traced_test]
#[test]
fn test_rollback() {
#[tokio::test]
async fn test_rollback() {
let clock = LocalClock::default();
let db = Database::new(clock);
let tx1 = db.begin_tx();
let tx1 = db.begin_tx().await;
let row1 = Row {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, row1.clone()).unwrap();
let row2 = db.read(tx1, 1).unwrap().unwrap();
db.insert(tx1, row1.clone()).await.unwrap();
let row2 = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(row1, row2);
let row3 = Row {
id: 1,
data: "World".to_string(),
};
db.update(tx1, row3.clone()).unwrap();
let row4 = db.read(tx1, 1).unwrap().unwrap();
db.update(tx1, row3.clone()).await.unwrap();
let row4 = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(row3, row4);
db.rollback_tx(tx1);
let tx2 = db.begin_tx();
let row5 = db.read(tx2, 1).unwrap();
db.rollback_tx(tx1).await;
let tx2 = db.begin_tx().await;
let row5 = db.read(tx2, 1).await.unwrap();
assert_eq!(row5, None);
}
#[traced_test]
#[test]
fn test_dirty_write() {
#[tokio::test]
async fn test_dirty_write() {
let clock = LocalClock::default();
let db = Database::new(clock);
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
// T2 attempts to delete row with ID 1, but fails because T1 has not committed.
let tx2 = db.begin_tx();
let tx2 = db.begin_tx().await;
let tx2_row = Row {
id: 1,
data: "World".to_string(),
};
assert!(!db.update(tx2, tx2_row).unwrap());
assert!(!db.update(tx2, tx2_row).await.unwrap());
let row = db.read(tx1, 1).unwrap().unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
}
#[traced_test]
#[test]
fn test_dirty_read() {
#[tokio::test]
async fn test_dirty_read() {
let clock = LocalClock::default();
let db = Database::new(clock);
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx();
let tx1 = db.begin_tx().await;
let row1 = Row {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, row1).unwrap();
db.insert(tx1, row1).await.unwrap();
// T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed.
let tx2 = db.begin_tx();
let row2 = db.read(tx2, 1).unwrap();
let tx2 = db.begin_tx().await;
let row2 = db.read(tx2, 1).await.unwrap();
assert_eq!(row2, None);
}
#[ignore]
#[traced_test]
#[test]
fn test_dirty_read_deleted() {
#[tokio::test]
async fn test_dirty_read_deleted() {
let clock = LocalClock::default();
let db = Database::new(clock);
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
db.commit_tx(tx1).unwrap();
db.insert(tx1, tx1_row.clone()).await.unwrap();
db.commit_tx(tx1).await.unwrap();
// T2 deletes row with ID 1, but does not commit.
let tx2 = db.begin_tx();
assert!(db.delete(tx2, 1).unwrap());
let tx2 = db.begin_tx().await;
assert!(db.delete(tx2, 1).await.unwrap());
// T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed.
let tx3 = db.begin_tx();
let row = db.read(tx3, 1).unwrap().unwrap();
let tx3 = db.begin_tx().await;
let row = db.read(tx3, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
}
#[traced_test]
#[test]
fn test_fuzzy_read() {
#[tokio::test]
async fn test_fuzzy_read() {
let clock = LocalClock::default();
let db = Database::new(clock);
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1).unwrap();
db.commit_tx(tx1).await.unwrap();
// T2 reads the row with ID 1 within an active transaction.
let tx2 = db.begin_tx();
let row = db.read(tx2, 1).unwrap().unwrap();
let tx2 = db.begin_tx().await;
let row = db.read(tx2, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
// T3 updates the row and commits.
let tx3 = db.begin_tx();
let tx3 = db.begin_tx().await;
let tx3_row = Row {
id: 1,
data: "World".to_string(),
};
db.update(tx3, tx3_row).unwrap();
db.commit_tx(tx3).unwrap();
db.update(tx3, tx3_row).await.unwrap();
db.commit_tx(tx3).await.unwrap();
// T2 still reads the same version of the row as before.
let row = db.read(tx2, 1).unwrap().unwrap();
let row = db.read(tx2, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
}
#[traced_test]
#[test]
fn test_lost_update() {
#[tokio::test]
async fn test_lost_update() {
let clock = LocalClock::default();
let db = Database::new(clock);
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db.read(tx1, 1).unwrap().unwrap();
db.insert(tx1, tx1_row.clone()).await.unwrap();
let row = db.read(tx1, 1).await.unwrap().unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1).unwrap();
db.commit_tx(tx1).await.unwrap();
// T2 attempts to update row ID 1 within an active transaction.
let tx2 = db.begin_tx();
let tx2 = db.begin_tx().await;
let tx2_row = Row {
id: 1,
data: "World".to_string(),
};
assert!(db.update(tx2, tx2_row.clone()).unwrap());
assert!(db.update(tx2, tx2_row.clone()).await.unwrap());
// T3 also attempts to update row ID 1 within an active transaction.
let tx3 = db.begin_tx();
let tx3 = db.begin_tx().await;
let tx3_row = Row {
id: 1,
data: "Hello, world!".to_string(),
};
assert_eq!(
Err(DatabaseError::WriteWriteConflict),
db.update(tx3, tx3_row)
db.update(tx3, tx3_row).await
);
db.commit_tx(tx2).unwrap();
assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3));
db.commit_tx(tx2).await.unwrap();
assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3).await);
let tx4 = db.begin_tx();
let row = db.read(tx4, 1).unwrap().unwrap();
let tx4 = db.begin_tx().await;
let row = db.read(tx4, 1).await.unwrap().unwrap();
assert_eq!(tx2_row, row);
}
}

View File

@@ -18,36 +18,40 @@ fn test_non_overlapping_concurrent_inserts() {
let db = db.clone();
let ids = ids.clone();
thread::spawn(move || {
let tx = db.begin_tx();
let id = ids.fetch_add(1, Ordering::SeqCst);
let row = Row {
id,
data: "Hello".to_string(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
let tx = db.begin_tx();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
shuttle::future::block_on(async move {
let tx = db.begin_tx().await;
let id = ids.fetch_add(1, Ordering::SeqCst);
let row = Row {
id,
data: "Hello".to_string(),
};
db.insert(tx, row.clone()).await.unwrap();
db.commit_tx(tx).await.unwrap();
let tx = db.begin_tx().await;
let committed_row = db.read(tx, id).await.unwrap();
db.commit_tx(tx).await.unwrap();
assert_eq!(committed_row, Some(row));
})
});
}
{
let db = db.clone();
let ids = ids.clone();
thread::spawn(move || {
let tx = db.begin_tx();
let id = ids.fetch_add(1, Ordering::SeqCst);
let row = Row {
id,
data: "World".to_string(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
let tx = db.begin_tx();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
shuttle::future::block_on(async move {
let tx = db.begin_tx().await;
let id = ids.fetch_add(1, Ordering::SeqCst);
let row = Row {
id,
data: "World".to_string(),
};
db.insert(tx, row.clone()).await.unwrap();
db.commit_tx(tx).await.unwrap();
let tx = db.begin_tx().await;
let committed_row = db.read(tx, id).await.unwrap();
db.commit_tx(tx).await.unwrap();
assert_eq!(committed_row, Some(row));
});
});
}
},