Merge pull request #47 from penberg/resync

treewide: overhaul the API to be sync again
Pekka Enberg
2023-05-17 13:59:46 +03:00
committed by GitHub
9 changed files with 274 additions and 333 deletions
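The overhaul removes the tokio runtime from the call path: Database operations such as begin_tx, insert, read, and commit_tx are now plain blocking calls, the C bindings no longer spin up a runtime, and only the S3 replicator still bridges to async internally. A minimal sketch of driving the reworked API, with module paths and constructors taken from the tests further down in this diff (they may differ slightly in the actual crate):

```rust
use mvcc_rs::clock::LocalClock;
use mvcc_rs::database::{Database, Result, Row, RowID};
use mvcc_rs::persistent_storage::Storage;

fn main() -> Result<()> {
    // No runtime, no .await: every call blocks until the operation completes.
    let db = Database::new(LocalClock::new(), Storage::new_noop());
    let tx = db.begin_tx();
    db.insert(
        tx,
        Row {
            id: RowID { table_id: 1, row_id: 1 },
            data: "Hello".to_string(),
        },
    )?;
    let row = db.read(tx, RowID { table_id: 1, row_id: 1 })?;
    assert!(row.is_some());
    db.commit_tx(tx)
}
```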

View File

@@ -13,7 +13,6 @@ cbindgen = "0.24.0"
[dependencies]
base64 = "0.21.0"
mvcc-rs = { path = "../../mvcc-rs" }
tokio = { version = "1.27.0", features = ["full", "parking_lot"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0" }

View File

@@ -20,7 +20,7 @@ type ScanCursor = cursor::ScanCursor<'static, Clock>;
static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new();
async fn storage_for(main_db_path: &str) -> database::Result<Storage> {
fn storage_for(main_db_path: &str) -> database::Result<Storage> {
// TODO: let's accept a URL instead of main_db_path here, so we can
// pass custom S3 endpoints, options, etc.
if cfg!(feature = "json_on_disk_storage") {
@@ -29,7 +29,8 @@ async fn storage_for(main_db_path: &str) -> database::Result<Storage> {
}
if cfg!(feature = "s3_storage") {
tracing::info!("S3 storage for {main_db_path}");
return Storage::new_s3(s3::Options::with_create_bucket_if_not_exists(true)).await;
let options = s3::Options::with_create_bucket_if_not_exists(true);
return Storage::new_s3(options);
}
tracing::info!("No persistent storage for {main_db_path}");
Ok(Storage::new_noop())
@@ -52,10 +53,9 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC
return MVCCDatabaseRef::null();
}
};
let runtime = tokio::runtime::Runtime::new().unwrap();
tracing::debug!("mvccrs: opening persistent storage for {main_db_path}");
let storage = match runtime.block_on(storage_for(main_db_path)) {
let storage = match storage_for(main_db_path) {
Ok(storage) => storage,
Err(e) => {
tracing::error!("Failed to open persistent storage: {e}");
@@ -64,11 +64,10 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC
};
let db = Db::new(clock, storage);
runtime.block_on(db.recover()).ok();
db.recover().ok();
let ctx = DbContext { db, runtime };
let ctx = Box::leak(Box::new(ctx));
MVCCDatabaseRef::from(ctx)
let db = Box::leak(Box::new(DbContext { db }));
MVCCDatabaseRef::from(db)
}
#[no_mangle]
@@ -84,8 +83,7 @@ pub unsafe extern "C" fn MVCCDatabaseClose(db: MVCCDatabaseRef) {
#[no_mangle]
pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 {
let db = db.get_ref();
let (db, runtime) = (&db.db, &db.runtime);
let tx_id = runtime.block_on(async move { db.begin_tx().await });
let tx_id = db.begin_tx();
tracing::debug!("MVCCTransactionBegin: {tx_id}");
tx_id
}
@@ -93,9 +91,8 @@ pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 {
#[no_mangle]
pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError {
let db = db.get_ref();
let (db, runtime) = (&db.db, &db.runtime);
tracing::debug!("MVCCTransactionCommit: {tx_id}");
match runtime.block_on(async move { db.commit_tx(tx_id).await }) {
match db.commit_tx(tx_id) {
Ok(()) => MVCCError::MVCC_OK,
Err(e) => {
tracing::error!("MVCCTransactionCommit: {e}");
@@ -107,9 +104,8 @@ pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64)
#[no_mangle]
pub unsafe extern "C" fn MVCCTransactionRollback(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError {
let db = db.get_ref();
let (db, runtime) = (&db.db, &db.runtime);
tracing::debug!("MVCCTransactionRollback: {tx_id}");
runtime.block_on(async move { db.rollback_tx(tx_id).await });
db.rollback_tx(tx_id);
MVCCError::MVCC_OK
}
@@ -132,11 +128,10 @@ pub unsafe extern "C" fn MVCCDatabaseInsert(
general_purpose::STANDARD.encode(value)
}
};
let (db, runtime) = (&db.db, &db.runtime);
let id = database::RowID { table_id, row_id };
let row = database::Row { id, data };
tracing::debug!("MVCCDatabaseInsert: {row:?}");
match runtime.block_on(async move { db.insert(tx_id, row).await }) {
match db.insert(tx_id, row) {
Ok(_) => {
tracing::debug!("MVCCDatabaseInsert: success");
MVCCError::MVCC_OK
@@ -158,29 +153,24 @@ pub unsafe extern "C" fn MVCCDatabaseRead(
value_len: *mut i64,
) -> MVCCError {
let db = db.get_ref();
let (db, runtime) = (&db.db, &db.runtime);
match runtime.block_on(async move {
match {
let id = database::RowID { table_id, row_id };
let maybe_row = db.read(tx_id, id).await?;
let maybe_row = db.read(tx_id, id);
match maybe_row {
Some(row) => {
Ok(Some(row)) => {
tracing::debug!("Found row {row:?}");
let str_len = row.data.len() + 1;
let value = std::ffi::CString::new(row.data.as_bytes()).map_err(|e| {
mvcc_rs::errors::DatabaseError::Io(format!(
"Failed to transform read data into CString: {e}"
))
})?;
let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default();
unsafe {
*value_ptr = value.into_raw() as *mut u8;
*value_len = str_len as i64;
}
}
None => unsafe { *value_len = -1 },
_ => unsafe { *value_len = -1 },
};
Ok::<(), mvcc_rs::errors::DatabaseError>(())
}) {
} {
Ok(_) => {
tracing::debug!("MVCCDatabaseRead: success");
MVCCError::MVCC_OK
@@ -209,11 +199,8 @@ pub unsafe extern "C" fn MVCCScanCursorOpen(
tracing::debug!("MVCCScanCursorOpen()");
// Reference is transmuted to &'static in order to be able to pass the cursor back to C.
// The contract with C is to never use a cursor after MVCCDatabaseClose() has been called.
let database = unsafe { std::mem::transmute::<&DbContext, &'static DbContext>(db.get_ref()) };
let (database, runtime) = (&database.db, &database.runtime);
match runtime
.block_on(async move { mvcc_rs::cursor::ScanCursor::new(database, tx_id, table_id).await })
{
let db = unsafe { std::mem::transmute::<&Db, &'static Db>(db.get_ref()) };
match mvcc_rs::cursor::ScanCursor::new(db, tx_id, table_id) {
Ok(cursor) => {
if cursor.is_empty() {
tracing::debug!("Cursor is empty");
@@ -223,7 +210,7 @@ pub unsafe extern "C" fn MVCCScanCursorOpen(
}
tracing::debug!("Cursor open: {cursor:?}");
MVCCScanCursorRef {
ptr: Box::into_raw(Box::new(ScanCursorContext { cursor, db })),
ptr: Box::into_raw(Box::new(ScanCursorContext { cursor })),
}
}
Err(e) => {
@@ -242,10 +229,8 @@ pub unsafe extern "C" fn MVCCScanCursorClose(cursor: MVCCScanCursorRef) {
tracing::debug!("warning: `cursor` is null in MVCCScanCursorClose()");
return;
}
let cursor_ctx = unsafe { Box::from_raw(cursor.ptr) };
let db_context = cursor_ctx.db.clone();
let runtime = &db_context.get_ref().runtime;
runtime.block_on(async move { cursor_ctx.cursor.close().await.ok() });
let cursor = unsafe { Box::from_raw(cursor.ptr) }.cursor;
cursor.close().ok();
}
#[no_mangle]
@@ -259,31 +244,24 @@ pub unsafe extern "C" fn MVCCScanCursorRead(
tracing::debug!("warning: `cursor` is null in MVCCScanCursorRead()");
return MVCCError::MVCC_IO_ERROR_READ;
}
let cursor_ctx = unsafe { &*cursor.ptr };
let runtime = &cursor_ctx.db.get_ref().runtime;
let cursor = &cursor_ctx.cursor;
let cursor = cursor.get_ref();
// TODO: deduplicate with MVCCDatabaseRead()
match runtime.block_on(async move {
let maybe_row = cursor.current_row().await?;
match {
let maybe_row = cursor.current_row();
match maybe_row {
Some(row) => {
Ok(Some(row)) => {
tracing::debug!("Found row {row:?}");
let str_len = row.data.len() + 1;
let value = std::ffi::CString::new(row.data.as_bytes()).map_err(|e| {
mvcc_rs::errors::DatabaseError::Io(format!(
"Failed to transform read data into CString: {e}"
))
})?;
let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default();
unsafe {
*value_ptr = value.into_raw() as *mut u8;
*value_len = str_len as i64;
}
}
None => unsafe { *value_len = -1 },
_ => unsafe { *value_len = -1 },
};
Ok::<(), mvcc_rs::errors::DatabaseError>(())
}) {
} {
Ok(_) => {
tracing::debug!("MVCCDatabaseRead: success");
MVCCError::MVCC_OK
@@ -297,8 +275,7 @@ pub unsafe extern "C" fn MVCCScanCursorRead(
#[no_mangle]
pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::ffi::c_int {
let cursor_ctx = unsafe { &mut *cursor.ptr };
let cursor = &mut cursor_ctx.cursor;
let cursor = cursor.get_ref_mut();
tracing::debug!("MVCCScanCursorNext(): {}", cursor.index);
if cursor.forward() {
tracing::debug!("Forwarded to {}", cursor.index);
@@ -311,8 +288,7 @@ pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::f
#[no_mangle]
pub unsafe extern "C" fn MVCCScanCursorPosition(cursor: MVCCScanCursorRef) -> u64 {
let cursor_ctx = unsafe { &mut *cursor.ptr };
let cursor = &mut cursor_ctx.cursor;
let cursor = cursor.get_ref();
cursor
.current_row_id()
.map(|row_id| row_id.row_id)
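Both read paths above hand row data to C as a freshly allocated, NUL-terminated buffer: build a CString, release ownership with into_raw(), and report the length through the out-parameters. A stripped-down sketch of that handoff (write_out is a hypothetical helper, not one of the exported bindings):

```rust
use std::ffi::CString;

/// Copy `data` into a heap-allocated, NUL-terminated buffer and report its
/// length (including the NUL) through the out-parameters. The C caller owns
/// the buffer afterwards and must hand it back to Rust (CString::from_raw)
/// to free it. Like the bindings above, an interior NUL falls back to an
/// empty string via unwrap_or_default().
unsafe fn write_out(data: &str, value_ptr: *mut *mut u8, value_len: *mut i64) {
    let str_len = data.len() + 1;
    let value = CString::new(data.as_bytes()).unwrap_or_default();
    *value_ptr = value.into_raw() as *mut u8;
    *value_len = str_len as i64;
}
```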

View File

@@ -17,14 +17,14 @@ impl MVCCDatabaseRef {
self.ptr.is_null()
}
pub fn get_ref(&self) -> &DbContext {
unsafe { &*(self.ptr) }
pub fn get_ref(&self) -> &Db {
&unsafe { &*(self.ptr) }.db
}
#[allow(clippy::mut_from_ref)]
pub fn get_ref_mut(&self) -> &mut DbContext {
pub fn get_ref_mut(&self) -> &mut Db {
let ptr_mut = self.ptr as *mut DbContext;
unsafe { &mut (*ptr_mut) }
&mut unsafe { &mut (*ptr_mut) }.db
}
}
@@ -44,12 +44,10 @@ impl From<&mut DbContext> for MVCCDatabaseRef {
pub struct DbContext {
pub(crate) db: Db,
pub(crate) runtime: tokio::runtime::Runtime,
}
pub struct ScanCursorContext {
pub cursor: crate::ScanCursor,
pub db: MVCCDatabaseRef,
pub(crate) cursor: crate::ScanCursor,
}
#[derive(Clone, Debug)]
@@ -57,3 +55,25 @@ pub struct ScanCursorContext {
pub struct MVCCScanCursorRef {
pub ptr: *mut ScanCursorContext,
}
impl MVCCScanCursorRef {
pub fn null() -> MVCCScanCursorRef {
MVCCScanCursorRef {
ptr: std::ptr::null_mut(),
}
}
pub fn is_null(&self) -> bool {
self.ptr.is_null()
}
pub fn get_ref(&self) -> &crate::ScanCursor {
&unsafe { &*(self.ptr) }.cursor
}
#[allow(clippy::mut_from_ref)]
pub fn get_ref_mut(&self) -> &mut crate::ScanCursor {
let ptr_mut = self.ptr as *mut ScanCursorContext;
&mut unsafe { &mut (*ptr_mut) }.cursor
}
}
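Both context types cross the FFI boundary as raw pointers: the open-style calls heap-allocate and leak the context (Box::leak plus MVCCDatabaseRef::from for the database handle, Box::into_raw for the cursor), and the close-style calls re-box the pointer so Rust can drop it. The ownership round-trip in isolation, with a toy Handle type standing in for DbContext:

```rust
struct Handle {
    payload: String,
}

// "Open": allocate on the heap and turn the Box into a raw pointer the C side
// can hold onto without any Rust lifetime attached.
fn open(payload: &str) -> *mut Handle {
    Box::into_raw(Box::new(Handle {
        payload: payload.to_string(),
    }))
}

// "Close": reclaim ownership from the raw pointer; dropping the Box frees it.
// Calling this twice on the same pointer would be a double free.
unsafe fn close(ptr: *mut Handle) {
    if ptr.is_null() {
        return;
    }
    drop(Box::from_raw(ptr));
}

fn main() {
    let h = open("hello");
    assert_eq!(unsafe { (*h).payload.as_str() }, "hello");
    unsafe { close(h) };
}
```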

View File

@@ -5,19 +5,16 @@ edition = "2021"
[dependencies]
anyhow = "1.0.70"
futures = "0.3.28"
thiserror = "1.0.40"
tracing = "0.1.37"
tokio = { version = "1.27.0", features = ["full", "parking_lot"] }
tokio-stream = { version = "0.1.12", features = ["io-util"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
pin-project = "1.0.12"
tracing-subscriber = { version = "0", optional = true }
base64 = "0.21.0"
aws-sdk-s3 = "0.27.0"
aws-config = "0.55.2"
tokio-util = "0.7.8"
parking_lot = "0.12.1"
futures = "0.3.28"
[dev-dependencies]
criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }

View File

@@ -17,30 +17,30 @@ fn bench(c: &mut Criterion) {
let db = bench_db();
group.bench_function("begin_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
db.begin_tx().await;
db.begin_tx();
})
});
let db = bench_db();
group.bench_function("begin_tx + rollback_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
db.rollback_tx(tx_id).await
let tx_id = db.begin_tx();
db.rollback_tx(tx_id)
})
});
let db = bench_db();
group.bench_function("begin_tx + commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
db.commit_tx(tx_id).await
let tx_id = db.begin_tx();
db.commit_tx(tx_id)
})
});
let db = bench_db();
group.bench_function("begin_tx-read-commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
let tx_id = db.begin_tx();
db.read(
tx_id,
RowID {
@@ -48,16 +48,15 @@ fn bench(c: &mut Criterion) {
row_id: 1,
},
)
.await
.unwrap();
db.commit_tx(tx_id).await
db.commit_tx(tx_id)
})
});
let db = bench_db();
group.bench_function("begin_tx-update-commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
let tx_id = db.begin_tx();
db.update(
tx_id,
Row {
@@ -68,15 +67,14 @@ fn bench(c: &mut Criterion) {
data: "World".to_string(),
},
)
.await
.unwrap();
db.commit_tx(tx_id).await
db.commit_tx(tx_id)
})
});
let db = bench_db();
let tx = futures::executor::block_on(db.begin_tx());
futures::executor::block_on(db.insert(
let tx = db.begin_tx();
db.insert(
tx,
Row {
id: RowID {
@@ -85,7 +83,7 @@ fn bench(c: &mut Criterion) {
},
data: "Hello".to_string(),
},
))
)
.unwrap();
group.bench_function("read", |b| {
b.to_async(FuturesExecutor).iter(|| async {
@@ -96,14 +94,13 @@ fn bench(c: &mut Criterion) {
row_id: 1,
},
)
.await
.unwrap();
})
});
let db = bench_db();
let tx = futures::executor::block_on(db.begin_tx());
futures::executor::block_on(db.insert(
let tx = db.begin_tx();
db.insert(
tx,
Row {
id: RowID {
@@ -112,7 +109,7 @@ fn bench(c: &mut Criterion) {
},
data: "Hello".to_string(),
},
))
)
.unwrap();
group.bench_function("update", |b| {
b.to_async(FuturesExecutor).iter(|| async {
@@ -126,7 +123,6 @@ fn bench(c: &mut Criterion) {
data: "World".to_string(),
},
)
.await
.unwrap();
})
});
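With the .await calls gone, these operations can also be measured without an async executor at all; a minimal synchronous variant of the first benchmark, assuming the bench_db() helper defined in this file:

```rust
use criterion::{criterion_group, criterion_main, Criterion};

fn bench_sync(c: &mut Criterion) {
    let db = bench_db(); // same helper the async harness above uses
    c.bench_function("begin_tx + commit_tx (sync)", |b| {
        b.iter(|| {
            let tx_id = db.begin_tx();
            db.commit_tx(tx_id).unwrap();
        })
    });
}

criterion_group!(benches, bench_sync);
criterion_main!(benches);
```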

View File

@@ -10,12 +10,12 @@ pub struct ScanCursor<'a, Clock: LogicalClock> {
}
impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
pub async fn new(
pub fn new(
db: &'a Database<Clock>,
tx_id: u64,
table_id: u64,
) -> Result<ScanCursor<'a, Clock>> {
let row_ids = db.scan_row_ids_for_table(table_id).await?;
let row_ids = db.scan_row_ids_for_table(table_id)?;
Ok(Self {
db,
tx_id,
@@ -31,15 +31,15 @@ impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
Some(self.row_ids[self.index])
}
pub async fn current_row(&self) -> Result<Option<Row>> {
pub fn current_row(&self) -> Result<Option<Row>> {
if self.index >= self.row_ids.len() {
return Ok(None);
}
let id = self.row_ids[self.index];
self.db.read(self.tx_id, id).await
self.db.read(self.tx_id, id)
}
pub async fn close(self) -> Result<()> {
pub fn close(self) -> Result<()> {
Ok(())
}
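Taken together, the synchronous cursor is driven with new, current_row, forward, and close (forward and current_row_id appear in the C bindings above rather than in this hunk). A usage sketch, with module paths assumed from the rest of the diff:

```rust
use mvcc_rs::clock::LocalClock;
use mvcc_rs::cursor::ScanCursor;
use mvcc_rs::database::{Database, Result};

// Walk every row of `table_id` that is visible to the transaction `tx_id`.
fn dump_table(db: &Database<LocalClock>, tx_id: u64, table_id: u64) -> Result<()> {
    let mut cursor = ScanCursor::new(db, tx_id, table_id)?;
    while let Some(row) = cursor.current_row()? {
        tracing::debug!("Found row {row:?}");
        if !cursor.forward() {
            break;
        }
    }
    cursor.close()
}
```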

View File

@@ -1,12 +1,12 @@
use crate::clock::LogicalClock;
use crate::errors::DatabaseError;
use crate::persistent_storage::Storage;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
pub type Result<T> = std::result::Result<T, DatabaseError>;
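The import swap above carries most of this file's change: tokio::sync::Mutex must be locked with .await, while parking_lot::Mutex blocks the calling thread and returns the guard directly, which is why every inner.lock().await below becomes a plain inner.lock(). A toy illustration of the parking_lot pattern (Counter is hypothetical, not a type from this crate):

```rust
use parking_lot::Mutex;

struct Counter {
    inner: Mutex<u64>,
}

impl Counter {
    fn new() -> Self {
        Self { inner: Mutex::new(0) }
    }

    // No .await and no poisoning Result: lock() hands back the guard directly.
    fn bump(&self) -> u64 {
        let mut n = self.inner.lock();
        *n += 1;
        *n
    }
}

fn main() {
    let c = Counter::new();
    assert_eq!(c.bump(), 1);
    assert_eq!(c.bump(), 2);
}
```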
@@ -156,9 +156,9 @@ impl<Clock: LogicalClock> Database<Clock> {
/// * `tx_id` - the ID of the transaction in which to insert the new row.
/// * `row` - the row object containing the values to be inserted.
///
pub async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let inner = self.inner.lock().await;
inner.insert(tx_id, row).await
pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let inner = self.inner.lock();
inner.insert(tx_id, row)
}
/// Updates a row in the database with new values.
@@ -179,11 +179,11 @@ impl<Clock: LogicalClock> Database<Clock> {
/// # Returns
///
/// Returns `true` if the row was successfully updated, and `false` otherwise.
pub async fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
if !self.delete(tx_id, row.id).await? {
pub fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
if !self.delete(tx_id, row.id)? {
return Ok(false);
}
self.insert(tx_id, row).await?;
self.insert(tx_id, row)?;
Ok(true)
}
@@ -201,9 +201,9 @@ impl<Clock: LogicalClock> Database<Clock> {
///
/// Returns `true` if the row was successfully deleted, and `false` otherwise.
///
pub async fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
let inner = self.inner.lock().await;
inner.delete(tx_id, id).await
pub fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
let inner = self.inner.lock();
inner.delete(tx_id, id)
}
/// Retrieves a row from the table with the given `id`.
@@ -220,18 +220,18 @@ impl<Clock: LogicalClock> Database<Clock> {
///
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
/// and `None` otherwise.
pub async fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
let inner = self.inner.lock().await;
inner.read(tx_id, id).await
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
let inner = self.inner.lock();
inner.read(tx_id, id)
}
pub async fn scan_row_ids(&self) -> Result<Vec<RowID>> {
let inner = self.inner.lock().await;
pub fn scan_row_ids(&self) -> Result<Vec<RowID>> {
let inner = self.inner.lock();
inner.scan_row_ids()
}
pub async fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
let inner = self.inner.lock().await;
pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
let inner = self.inner.lock();
inner.scan_row_ids_for_table(table_id)
}
@@ -240,9 +240,9 @@ impl<Clock: LogicalClock> Database<Clock> {
/// This function starts a new transaction in the database and returns a `TxID` value
/// that you can use to perform operations within the transaction. All changes made within the
/// transaction are isolated from other transactions until you commit the transaction.
pub async fn begin_tx(&self) -> TxID {
let mut inner = self.inner.lock().await;
inner.begin_tx().await
pub fn begin_tx(&self) -> TxID {
let mut inner = self.inner.lock();
inner.begin_tx()
}
/// Commits a transaction with the specified transaction ID.
@@ -254,9 +254,9 @@ impl<Clock: LogicalClock> Database<Clock> {
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to commit.
pub async fn commit_tx(&self, tx_id: TxID) -> Result<()> {
let mut inner = self.inner.lock().await;
inner.commit_tx(tx_id).await
pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
let mut inner = self.inner.lock();
inner.commit_tx(tx_id)
}
/// Rolls back a transaction with the specified ID.
@@ -267,23 +267,23 @@ impl<Clock: LogicalClock> Database<Clock> {
/// # Arguments
///
/// * `tx_id` - The ID of the transaction to abort.
pub async fn rollback_tx(&self, tx_id: TxID) {
let inner = self.inner.lock().await;
inner.rollback_tx(tx_id).await;
pub fn rollback_tx(&self, tx_id: TxID) {
let inner = self.inner.lock();
inner.rollback_tx(tx_id);
}
/// Drops all unused row versions from the database.
///
/// A version is considered unused if it is not visible to any active transaction
/// and it is not the most recent version of the row.
pub async fn drop_unused_row_versions(&self) {
let inner = self.inner.lock().await;
pub fn drop_unused_row_versions(&self) {
let inner = self.inner.lock();
inner.drop_unused_row_versions();
}
pub async fn recover(&self) -> Result<()> {
let inner = self.inner.lock().await;
inner.recover().await
pub fn recover(&self) -> Result<()> {
let inner = self.inner.lock();
inner.recover()
}
}
@@ -298,7 +298,7 @@ pub struct DatabaseInner<Clock: LogicalClock> {
}
impl<Clock: LogicalClock> DatabaseInner<Clock> {
async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let mut txs = self.txs.borrow_mut();
let tx = txs
.get_mut(&tx_id)
@@ -317,7 +317,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
}
#[allow(clippy::await_holding_refcell_ref)]
async fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
// NOTICE: They *are* dropped before an await point!!! But the await is conditional,
// so I think clippy is just confused.
let mut txs = self.txs.borrow_mut();
@@ -331,7 +331,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
if is_write_write_conflict(&txs, tx, rv) {
drop(txs);
drop(rows);
self.rollback_tx(tx_id).await;
self.rollback_tx(tx_id);
return Err(DatabaseError::WriteWriteConflict);
}
if is_version_visible(&txs, tx, rv) {
@@ -347,7 +347,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
Ok(false)
}
async fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
let txs = self.txs.borrow_mut();
let tx = txs.get(&tx_id).unwrap();
assert!(tx.state == TransactionState::Active);
@@ -385,7 +385,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
.collect())
}
async fn begin_tx(&mut self) -> TxID {
fn begin_tx(&mut self) -> TxID {
let tx_id = self.get_tx_id();
let begin_ts = self.get_timestamp();
let tx = Transaction::new(tx_id, begin_ts);
@@ -397,8 +397,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
tx_id
}
#[allow(clippy::await_holding_refcell_ref)]
async fn commit_tx(&mut self, tx_id: TxID) -> Result<()> {
fn commit_tx(&mut self, tx_id: TxID) -> Result<()> {
let end_ts = self.get_timestamp();
let mut txs = self.txs.borrow_mut();
let mut tx = txs.get_mut(&tx_id).unwrap();
@@ -449,12 +448,12 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
drop(rows);
drop(txs);
if !log_record.row_versions.is_empty() {
self.storage.log_tx(log_record).await?;
self.storage.log_tx(log_record)?;
}
Ok(())
}
async fn rollback_tx(&self, tx_id: TxID) {
fn rollback_tx(&self, tx_id: TxID) {
let mut txs = self.txs.borrow_mut();
let mut tx = txs.get_mut(&tx_id).unwrap();
assert!(tx.state == TransactionState::Active);
@@ -529,8 +528,8 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
}
}
pub async fn recover(&self) -> Result<()> {
let tx_log = self.storage.read_tx_log().await?;
pub fn recover(&self) -> Result<()> {
let tx_log = self.storage.read_tx_log()?;
for record in tx_log {
tracing::debug!("RECOVERING {:?}", record);
for version in record.row_versions {
@@ -617,11 +616,11 @@ mod tests {
}
#[traced_test]
#[tokio::test]
async fn test_insert_read() {
#[test]
fn test_insert_read() {
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx1_row = Row {
id: RowID {
table_id: 1,
@@ -629,7 +628,7 @@ mod tests {
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
.read(
tx1,
@@ -638,13 +637,12 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1).await.unwrap();
db.commit_tx(tx1).unwrap();
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
let row = db
.read(
tx2,
@@ -653,35 +651,32 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
}
#[traced_test]
#[tokio::test]
async fn test_read_nonexistent() {
#[test]
fn test_read_nonexistent() {
let db = test_db();
let tx = db.begin_tx().await;
let row = db
.read(
tx,
RowID {
table_id: 1,
row_id: 1,
},
)
.await;
let tx = db.begin_tx();
let row = db.read(
tx,
RowID {
table_id: 1,
row_id: 1,
},
);
assert!(row.unwrap().is_none());
}
#[traced_test]
#[tokio::test]
async fn test_delete() {
#[test]
fn test_delete() {
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx1_row = Row {
id: RowID {
table_id: 1,
@@ -689,7 +684,7 @@ mod tests {
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
.read(
tx1,
@@ -698,7 +693,6 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
@@ -709,7 +703,6 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap();
let row = db
.read(
@@ -719,12 +712,11 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap();
assert!(row.is_none());
db.commit_tx(tx1).await.unwrap();
db.commit_tx(tx1).unwrap();
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
let row = db
.read(
tx2,
@@ -733,16 +725,15 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap();
assert!(row.is_none());
}
#[traced_test]
#[tokio::test]
async fn test_delete_nonexistent() {
#[test]
fn test_delete_nonexistent() {
let db = test_db();
let tx = db.begin_tx().await;
let tx = db.begin_tx();
assert!(!db
.delete(
tx,
@@ -751,15 +742,14 @@ mod tests {
row_id: 1
}
)
.await
.unwrap());
}
#[traced_test]
#[tokio::test]
async fn test_commit() {
#[test]
fn test_commit() {
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx1_row = Row {
id: RowID {
table_id: 1,
@@ -767,7 +757,7 @@ mod tests {
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
.read(
tx1,
@@ -776,7 +766,6 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
@@ -787,7 +776,7 @@ mod tests {
},
data: "World".to_string(),
};
db.update(tx1, tx1_updated_row.clone()).await.unwrap();
db.update(tx1, tx1_updated_row.clone()).unwrap();
let row = db
.read(
tx1,
@@ -796,13 +785,12 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_updated_row, row);
db.commit_tx(tx1).await.unwrap();
db.commit_tx(tx1).unwrap();
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
let row = db
.read(
tx2,
@@ -811,19 +799,18 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
db.commit_tx(tx2).await.unwrap();
db.commit_tx(tx2).unwrap();
assert_eq!(tx1_updated_row, row);
db.drop_unused_row_versions().await;
db.drop_unused_row_versions();
}
#[traced_test]
#[tokio::test]
async fn test_rollback() {
#[test]
fn test_rollback() {
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let row1 = Row {
id: RowID {
table_id: 1,
@@ -831,7 +818,7 @@ mod tests {
},
data: "Hello".to_string(),
};
db.insert(tx1, row1.clone()).await.unwrap();
db.insert(tx1, row1.clone()).unwrap();
let row2 = db
.read(
tx1,
@@ -840,7 +827,6 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(row1, row2);
@@ -851,7 +837,7 @@ mod tests {
},
data: "World".to_string(),
};
db.update(tx1, row3.clone()).await.unwrap();
db.update(tx1, row3.clone()).unwrap();
let row4 = db
.read(
tx1,
@@ -860,12 +846,11 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(row3, row4);
db.rollback_tx(tx1).await;
let tx2 = db.begin_tx().await;
db.rollback_tx(tx1);
let tx2 = db.begin_tx();
let row5 = db
.read(
tx2,
@@ -874,18 +859,17 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap();
assert_eq!(row5, None);
}
#[traced_test]
#[tokio::test]
async fn test_dirty_write() {
#[test]
fn test_dirty_write() {
let db = test_db();
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx1_row = Row {
id: RowID {
table_id: 1,
@@ -893,7 +877,7 @@ mod tests {
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
.read(
tx1,
@@ -902,13 +886,12 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
// T2 attempts to delete row with ID 1, but fails because T1 has not committed.
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
let tx2_row = Row {
id: RowID {
table_id: 1,
@@ -916,7 +899,7 @@ mod tests {
},
data: "World".to_string(),
};
assert!(!db.update(tx2, tx2_row).await.unwrap());
assert!(!db.update(tx2, tx2_row).unwrap());
let row = db
.read(
@@ -926,19 +909,18 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
}
#[traced_test]
#[tokio::test]
async fn test_dirty_read() {
#[test]
fn test_dirty_read() {
let db = test_db();
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let row1 = Row {
id: RowID {
table_id: 1,
@@ -946,10 +928,10 @@ mod tests {
},
data: "Hello".to_string(),
};
db.insert(tx1, row1).await.unwrap();
db.insert(tx1, row1).unwrap();
// T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed.
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
let row2 = db
.read(
tx2,
@@ -958,19 +940,18 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap();
assert_eq!(row2, None);
}
#[ignore]
#[traced_test]
#[tokio::test]
async fn test_dirty_read_deleted() {
#[test]
fn test_dirty_read_deleted() {
let db = test_db();
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx1_row = Row {
id: RowID {
table_id: 1,
@@ -978,11 +959,11 @@ mod tests {
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
db.commit_tx(tx1).await.unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
db.commit_tx(tx1).unwrap();
// T2 deletes row with ID 1, but does not commit.
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
assert!(db
.delete(
tx2,
@@ -991,11 +972,10 @@ mod tests {
row_id: 1
}
)
.await
.unwrap());
// T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed.
let tx3 = db.begin_tx().await;
let tx3 = db.begin_tx();
let row = db
.read(
tx3,
@@ -1004,19 +984,18 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
}
#[traced_test]
#[tokio::test]
async fn test_fuzzy_read() {
#[test]
fn test_fuzzy_read() {
let db = test_db();
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx1_row = Row {
id: RowID {
table_id: 1,
@@ -1024,7 +1003,7 @@ mod tests {
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
.read(
tx1,
@@ -1033,14 +1012,13 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1).await.unwrap();
db.commit_tx(tx1).unwrap();
// T2 reads the row with ID 1 within an active transaction.
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
let row = db
.read(
tx2,
@@ -1049,13 +1027,12 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
// T3 updates the row and commits.
let tx3 = db.begin_tx().await;
let tx3 = db.begin_tx();
let tx3_row = Row {
id: RowID {
table_id: 1,
@@ -1063,8 +1040,8 @@ mod tests {
},
data: "World".to_string(),
};
db.update(tx3, tx3_row).await.unwrap();
db.commit_tx(tx3).await.unwrap();
db.update(tx3, tx3_row).unwrap();
db.commit_tx(tx3).unwrap();
// T2 still reads the same version of the row as before.
let row = db
@@ -1075,19 +1052,18 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
}
#[traced_test]
#[tokio::test]
async fn test_lost_update() {
#[test]
fn test_lost_update() {
let db = test_db();
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx1_row = Row {
id: RowID {
table_id: 1,
@@ -1095,7 +1071,7 @@ mod tests {
},
data: "Hello".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
.read(
tx1,
@@ -1104,14 +1080,13 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.commit_tx(tx1).await.unwrap();
db.commit_tx(tx1).unwrap();
// T2 attempts to update row ID 1 within an active transaction.
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
let tx2_row = Row {
id: RowID {
table_id: 1,
@@ -1119,10 +1094,10 @@ mod tests {
},
data: "World".to_string(),
};
assert!(db.update(tx2, tx2_row.clone()).await.unwrap());
assert!(db.update(tx2, tx2_row.clone()).unwrap());
// T3 also attempts to update row ID 1 within an active transaction.
let tx3 = db.begin_tx().await;
let tx3 = db.begin_tx();
let tx3_row = Row {
id: RowID {
table_id: 1,
@@ -1132,13 +1107,13 @@ mod tests {
};
assert_eq!(
Err(DatabaseError::WriteWriteConflict),
db.update(tx3, tx3_row).await
db.update(tx3, tx3_row)
);
db.commit_tx(tx2).await.unwrap();
assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3).await);
db.commit_tx(tx2).unwrap();
assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3));
let tx4 = db.begin_tx().await;
let tx4 = db.begin_tx();
let row = db
.read(
tx4,
@@ -1147,7 +1122,6 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx2_row, row);
@@ -1156,12 +1130,12 @@ mod tests {
// Visibility test: check whether a new transaction can see old committed values.
// This test checks for the typo present in the paper, explained in https://github.com/penberg/mvcc-rs/issues/15
#[traced_test]
#[tokio::test]
async fn test_committed_visibility() {
#[test]
fn test_committed_visibility() {
let db = test_db();
// let's add $10 to my account since I like money
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx1_row = Row {
id: RowID {
table_id: 1,
@@ -1169,11 +1143,11 @@ mod tests {
},
data: "10".to_string(),
};
db.insert(tx1, tx1_row.clone()).await.unwrap();
db.commit_tx(tx1).await.unwrap();
db.insert(tx1, tx1_row.clone()).unwrap();
db.commit_tx(tx1).unwrap();
// but I like more money, so let me try adding $10 more
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
let tx2_row = Row {
id: RowID {
table_id: 1,
@@ -1181,7 +1155,7 @@ mod tests {
},
data: "20".to_string(),
};
assert!(db.update(tx2, tx2_row.clone()).await.unwrap());
assert!(db.update(tx2, tx2_row.clone()).unwrap());
let row = db
.read(
tx2,
@@ -1190,13 +1164,12 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(row, tx2_row);
// can I check how much money I have?
let tx3 = db.begin_tx().await;
let tx3 = db.begin_tx();
let row = db
.read(
tx3,
@@ -1205,7 +1178,6 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
@@ -1213,13 +1185,13 @@ mod tests {
// Test to check if an older transaction can see (un)committed future rows
#[traced_test]
#[tokio::test]
async fn test_future_row() {
#[test]
fn test_future_row() {
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx2 = db.begin_tx().await;
let tx2 = db.begin_tx();
let tx2_row = Row {
id: RowID {
table_id: 1,
@@ -1227,7 +1199,7 @@ mod tests {
},
data: "10".to_string(),
};
db.insert(tx2, tx2_row.clone()).await.unwrap();
db.insert(tx2, tx2_row).unwrap();
// transaction in progress, so tx1 shouldn't be able to see the value
let row = db
@@ -1238,12 +1210,11 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap();
assert_eq!(row, None);
// let's commit the transaction and check if tx1 can see it
db.commit_tx(tx2).await.unwrap();
db.commit_tx(tx2).unwrap();
let row = db
.read(
tx1,
@@ -1252,14 +1223,13 @@ mod tests {
row_id: 1,
},
)
.await
.unwrap();
assert_eq!(row, None);
}
#[traced_test]
#[tokio::test]
async fn test_storage1() {
#[test]
fn test_storage1() {
let clock = LocalClock::new();
let mut path = std::env::temp_dir();
path.push(format!(
@@ -1272,9 +1242,9 @@ mod tests {
let storage = crate::persistent_storage::Storage::new_json_on_disk(path.clone());
let db = Database::new(clock, storage);
let tx1 = db.begin_tx().await;
let tx2 = db.begin_tx().await;
let tx3 = db.begin_tx().await;
let tx1 = db.begin_tx();
let tx2 = db.begin_tx();
let tx3 = db.begin_tx();
db.insert(
tx3,
@@ -1286,14 +1256,13 @@ mod tests {
data: "testme".to_string(),
},
)
.await
.unwrap();
db.commit_tx(tx1).await.unwrap();
db.rollback_tx(tx2).await;
db.commit_tx(tx3).await.unwrap();
db.commit_tx(tx1).unwrap();
db.rollback_tx(tx2);
db.commit_tx(tx3).unwrap();
let tx4 = db.begin_tx().await;
let tx4 = db.begin_tx();
db.insert(
tx4,
Row {
@@ -1304,7 +1273,6 @@ mod tests {
data: "testme2".to_string(),
},
)
.await
.unwrap();
db.insert(
tx4,
@@ -1316,7 +1284,6 @@ mod tests {
data: "testme3".to_string(),
},
)
.await
.unwrap();
assert_eq!(
@@ -1327,7 +1294,6 @@ mod tests {
row_id: 1
}
)
.await
.unwrap()
.unwrap()
.data,
@@ -1341,7 +1307,6 @@ mod tests {
row_id: 2
}
)
.await
.unwrap()
.unwrap()
.data,
@@ -1355,21 +1320,20 @@ mod tests {
row_id: 3
}
)
.await
.unwrap()
.unwrap()
.data,
"testme3"
);
db.commit_tx(tx4).await.unwrap();
db.commit_tx(tx4).unwrap();
let clock = LocalClock::new();
let storage = crate::persistent_storage::Storage::new_json_on_disk(path);
let db = Database::new(clock, storage);
db.recover().await.unwrap();
db.recover().unwrap();
println!("{:#?}", db);
let tx5 = db.begin_tx().await;
let tx5 = db.begin_tx();
println!(
"{:#?}",
db.read(
@@ -1379,7 +1343,6 @@ mod tests {
row_id: 1
}
)
.await
);
assert_eq!(
db.read(
@@ -1389,7 +1352,6 @@ mod tests {
row_id: 1
}
)
.await
.unwrap()
.unwrap()
.data,
@@ -1403,7 +1365,6 @@ mod tests {
row_id: 2
}
)
.await
.unwrap()
.unwrap()
.data,
@@ -1417,7 +1378,6 @@ mod tests {
row_id: 3
}
)
.await
.unwrap()
.unwrap()
.data,

View File

@@ -20,51 +20,48 @@ impl Storage {
Self::JsonOnDisk(path)
}
pub async fn new_s3(options: s3::Options) -> Result<Self> {
Ok(Self::S3(s3::Replicator::new(options).await?))
pub fn new_s3(options: s3::Options) -> Result<Self> {
let replicator = futures::executor::block_on(s3::Replicator::new(options))?;
Ok(Self::S3(replicator))
}
}
impl Storage {
pub async fn log_tx(&mut self, m: LogRecord) -> Result<()> {
pub fn log_tx(&mut self, m: LogRecord) -> Result<()> {
match self {
Self::JsonOnDisk(path) => {
use tokio::io::AsyncWriteExt;
use std::io::Write;
let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;
let mut file = tokio::fs::OpenOptions::new()
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
.open(path)
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(&t)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(b"\n")
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
}
Self::S3(replicator) => {
replicator.replicate_tx(m).await?;
futures::executor::block_on(replicator.replicate_tx(m))?;
}
Self::Noop => (),
}
Ok(())
}
pub async fn read_tx_log(&self) -> Result<Vec<LogRecord>> {
pub fn read_tx_log(&self) -> Result<Vec<LogRecord>> {
match self {
Self::JsonOnDisk(path) => {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::OpenOptions::new()
use std::io::BufRead;
let file = std::fs::OpenOptions::new()
.read(true)
.open(&path)
.await
.open(path)
.map_err(|e| DatabaseError::Io(e.to_string()))?;
let mut records: Vec<LogRecord> = Vec::new();
let mut lines = tokio::io::BufReader::new(file).lines();
while let Ok(Some(line)) = lines.next_line().await {
let mut lines = std::io::BufReader::new(file).lines();
while let Some(Ok(line)) = lines.next() {
records.push(
serde_json::from_str(&line)
.map_err(|e| DatabaseError::Io(e.to_string()))?,
@@ -72,7 +69,7 @@ impl Storage {
}
Ok(records)
}
Self::S3(replicator) => replicator.read_tx_log().await,
Self::S3(replicator) => futures::executor::block_on(replicator.read_tx_log()),
Self::Noop => Err(crate::errors::DatabaseError::Io(
"cannot read from Noop storage".to_string(),
)),
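Only the S3 replicator stays async; its futures are driven to completion in place with futures::executor::block_on, which keeps Storage's methods synchronous at the price of blocking the calling thread for the duration of the request. The bridging pattern in isolation (fetch_answer is a stand-in for the replicator calls, not part of the crate):

```rust
// A stand-in for an async call such as replicator.replicate_tx(..).
async fn fetch_answer() -> u32 {
    42
}

fn main() {
    // block_on runs the future to completion on the current thread.
    let answer = futures::executor::block_on(fetch_answer());
    assert_eq!(answer, 42);
}
```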

View File

@@ -19,48 +19,44 @@ fn test_non_overlapping_concurrent_inserts() {
let db = db.clone();
let ids = ids.clone();
thread::spawn(move || {
shuttle::future::block_on(async move {
let tx = db.begin_tx().await;
let id = ids.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "Hello".to_string(),
};
db.insert(tx, row.clone()).await.unwrap();
db.commit_tx(tx).await.unwrap();
let tx = db.begin_tx().await;
let committed_row = db.read(tx, id).await.unwrap();
db.commit_tx(tx).await.unwrap();
assert_eq!(committed_row, Some(row));
})
let tx = db.begin_tx();
let id = ids.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "Hello".to_string(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
let tx = db.begin_tx();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
});
}
{
let db = db.clone();
let ids = ids.clone();
thread::spawn(move || {
shuttle::future::block_on(async move {
let tx = db.begin_tx().await;
let id = ids.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "World".to_string(),
};
db.insert(tx, row.clone()).await.unwrap();
db.commit_tx(tx).await.unwrap();
let tx = db.begin_tx().await;
let committed_row = db.read(tx, id).await.unwrap();
db.commit_tx(tx).await.unwrap();
assert_eq!(committed_row, Some(row));
});
let tx = db.begin_tx();
let id = ids.fetch_add(1, Ordering::SeqCst);
let id = RowID {
table_id: 1,
row_id: id,
};
let row = Row {
id,
data: "World".to_string(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
let tx = db.begin_tx();
let committed_row = db.read(tx, id).unwrap();
db.commit_tx(tx).unwrap();
assert_eq!(committed_row, Some(row));
});
}
},