diff --git a/core/mvcc/bindings/c/Cargo.toml b/core/mvcc/bindings/c/Cargo.toml index 04aa540b8..be23a9a1b 100644 --- a/core/mvcc/bindings/c/Cargo.toml +++ b/core/mvcc/bindings/c/Cargo.toml @@ -13,7 +13,6 @@ cbindgen = "0.24.0" [dependencies] base64 = "0.21.0" mvcc-rs = { path = "../../mvcc-rs" } -tokio = { version = "1.27.0", features = ["full", "parking_lot"] } tracing = "0.1.37" tracing-subscriber = { version = "0" } diff --git a/core/mvcc/bindings/c/src/lib.rs b/core/mvcc/bindings/c/src/lib.rs index 8cb28a477..509fa94cf 100644 --- a/core/mvcc/bindings/c/src/lib.rs +++ b/core/mvcc/bindings/c/src/lib.rs @@ -20,7 +20,7 @@ type ScanCursor = cursor::ScanCursor<'static, Clock>; static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new(); -async fn storage_for(main_db_path: &str) -> database::Result { +fn storage_for(main_db_path: &str) -> database::Result { // TODO: let's accept an URL instead of main_db_path here, so we can // pass custom S3 endpoints, options, etc. if cfg!(feature = "json_on_disk_storage") { @@ -29,7 +29,8 @@ async fn storage_for(main_db_path: &str) -> database::Result { } if cfg!(feature = "s3_storage") { tracing::info!("S3 storage for {main_db_path}"); - return Storage::new_s3(s3::Options::with_create_bucket_if_not_exists(true)).await; + let options = s3::Options::with_create_bucket_if_not_exists(true); + return Storage::new_s3(options); } tracing::info!("No persistent storage for {main_db_path}"); Ok(Storage::new_noop()) @@ -52,10 +53,9 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC return MVCCDatabaseRef::null(); } }; - let runtime = tokio::runtime::Runtime::new().unwrap(); tracing::debug!("mvccrs: opening persistent storage for {main_db_path}"); - let storage = match runtime.block_on(storage_for(main_db_path)) { + let storage = match storage_for(main_db_path) { Ok(storage) => storage, Err(e) => { tracing::error!("Failed to open persistent storage: {e}"); @@ -64,11 +64,10 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC }; let db = Db::new(clock, storage); - runtime.block_on(db.recover()).ok(); + db.recover().ok(); - let ctx = DbContext { db, runtime }; - let ctx = Box::leak(Box::new(ctx)); - MVCCDatabaseRef::from(ctx) + let db = Box::leak(Box::new(DbContext { db })); + MVCCDatabaseRef::from(db) } #[no_mangle] @@ -84,8 +83,7 @@ pub unsafe extern "C" fn MVCCDatabaseClose(db: MVCCDatabaseRef) { #[no_mangle] pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 { let db = db.get_ref(); - let (db, runtime) = (&db.db, &db.runtime); - let tx_id = runtime.block_on(async move { db.begin_tx().await }); + let tx_id = db.begin_tx(); tracing::debug!("MVCCTransactionBegin: {tx_id}"); tx_id } @@ -93,9 +91,8 @@ pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 { #[no_mangle] pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError { let db = db.get_ref(); - let (db, runtime) = (&db.db, &db.runtime); tracing::debug!("MVCCTransactionCommit: {tx_id}"); - match runtime.block_on(async move { db.commit_tx(tx_id).await }) { + match db.commit_tx(tx_id) { Ok(()) => MVCCError::MVCC_OK, Err(e) => { tracing::error!("MVCCTransactionCommit: {e}"); @@ -107,9 +104,8 @@ pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64) #[no_mangle] pub unsafe extern "C" fn MVCCTransactionRollback(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError { let db = db.get_ref(); - let (db, runtime) = (&db.db, &db.runtime); tracing::debug!("MVCCTransactionRollback: {tx_id}"); - runtime.block_on(async move { db.rollback_tx(tx_id).await }); + db.rollback_tx(tx_id); MVCCError::MVCC_OK } @@ -132,11 +128,10 @@ pub unsafe extern "C" fn MVCCDatabaseInsert( general_purpose::STANDARD.encode(value) } }; - let (db, runtime) = (&db.db, &db.runtime); let id = database::RowID { table_id, row_id }; let row = database::Row { id, data }; tracing::debug!("MVCCDatabaseInsert: {row:?}"); - match runtime.block_on(async move { db.insert(tx_id, row).await }) { + match db.insert(tx_id, row) { Ok(_) => { tracing::debug!("MVCCDatabaseInsert: success"); MVCCError::MVCC_OK @@ -158,29 +153,24 @@ pub unsafe extern "C" fn MVCCDatabaseRead( value_len: *mut i64, ) -> MVCCError { let db = db.get_ref(); - let (db, runtime) = (&db.db, &db.runtime); - match runtime.block_on(async move { + match { let id = database::RowID { table_id, row_id }; - let maybe_row = db.read(tx_id, id).await?; + let maybe_row = db.read(tx_id, id); match maybe_row { - Some(row) => { + Ok(Some(row)) => { tracing::debug!("Found row {row:?}"); let str_len = row.data.len() + 1; - let value = std::ffi::CString::new(row.data.as_bytes()).map_err(|e| { - mvcc_rs::errors::DatabaseError::Io(format!( - "Failed to transform read data into CString: {e}" - )) - })?; + let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default(); unsafe { *value_ptr = value.into_raw() as *mut u8; *value_len = str_len as i64; } } - None => unsafe { *value_len = -1 }, + _ => unsafe { *value_len = -1 }, }; Ok::<(), mvcc_rs::errors::DatabaseError>(()) - }) { + } { Ok(_) => { tracing::debug!("MVCCDatabaseRead: success"); MVCCError::MVCC_OK @@ -209,11 +199,8 @@ pub unsafe extern "C" fn MVCCScanCursorOpen( tracing::debug!("MVCCScanCursorOpen()"); // Reference is transmuted to &'static in order to be able to pass the cursor back to C. // The contract with C is to never use a cursor after MVCCDatabaseClose() has been called. - let database = unsafe { std::mem::transmute::<&DbContext, &'static DbContext>(db.get_ref()) }; - let (database, runtime) = (&database.db, &database.runtime); - match runtime - .block_on(async move { mvcc_rs::cursor::ScanCursor::new(database, tx_id, table_id).await }) - { + let db = unsafe { std::mem::transmute::<&Db, &'static Db>(db.get_ref()) }; + match mvcc_rs::cursor::ScanCursor::new(db, tx_id, table_id) { Ok(cursor) => { if cursor.is_empty() { tracing::debug!("Cursor is empty"); @@ -223,7 +210,7 @@ pub unsafe extern "C" fn MVCCScanCursorOpen( } tracing::debug!("Cursor open: {cursor:?}"); MVCCScanCursorRef { - ptr: Box::into_raw(Box::new(ScanCursorContext { cursor, db })), + ptr: Box::into_raw(Box::new(ScanCursorContext { cursor })), } } Err(e) => { @@ -242,10 +229,8 @@ pub unsafe extern "C" fn MVCCScanCursorClose(cursor: MVCCScanCursorRef) { tracing::debug!("warning: `cursor` is null in MVCCScanCursorClose()"); return; } - let cursor_ctx = unsafe { Box::from_raw(cursor.ptr) }; - let db_context = cursor_ctx.db.clone(); - let runtime = &db_context.get_ref().runtime; - runtime.block_on(async move { cursor_ctx.cursor.close().await.ok() }); + let cursor = unsafe { Box::from_raw(cursor.ptr) }.cursor; + cursor.close().ok(); } #[no_mangle] @@ -259,31 +244,24 @@ pub unsafe extern "C" fn MVCCScanCursorRead( tracing::debug!("warning: `cursor` is null in MVCCScanCursorRead()"); return MVCCError::MVCC_IO_ERROR_READ; } - let cursor_ctx = unsafe { &*cursor.ptr }; - let runtime = &cursor_ctx.db.get_ref().runtime; - let cursor = &cursor_ctx.cursor; + let cursor = cursor.get_ref(); - // TODO: deduplicate with MVCCDatabaseRead() - match runtime.block_on(async move { - let maybe_row = cursor.current_row().await?; + match { + let maybe_row = cursor.current_row(); match maybe_row { - Some(row) => { + Ok(Some(row)) => { tracing::debug!("Found row {row:?}"); let str_len = row.data.len() + 1; - let value = std::ffi::CString::new(row.data.as_bytes()).map_err(|e| { - mvcc_rs::errors::DatabaseError::Io(format!( - "Failed to transform read data into CString: {e}" - )) - })?; + let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default(); unsafe { *value_ptr = value.into_raw() as *mut u8; *value_len = str_len as i64; } } - None => unsafe { *value_len = -1 }, + _ => unsafe { *value_len = -1 }, }; Ok::<(), mvcc_rs::errors::DatabaseError>(()) - }) { + } { Ok(_) => { tracing::debug!("MVCCDatabaseRead: success"); MVCCError::MVCC_OK @@ -297,8 +275,7 @@ pub unsafe extern "C" fn MVCCScanCursorRead( #[no_mangle] pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::ffi::c_int { - let cursor_ctx = unsafe { &mut *cursor.ptr }; - let cursor = &mut cursor_ctx.cursor; + let cursor = cursor.get_ref_mut(); tracing::debug!("MVCCScanCursorNext(): {}", cursor.index); if cursor.forward() { tracing::debug!("Forwarded to {}", cursor.index); @@ -311,8 +288,7 @@ pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::f #[no_mangle] pub unsafe extern "C" fn MVCCScanCursorPosition(cursor: MVCCScanCursorRef) -> u64 { - let cursor_ctx = unsafe { &mut *cursor.ptr }; - let cursor = &mut cursor_ctx.cursor; + let cursor = cursor.get_ref(); cursor .current_row_id() .map(|row_id| row_id.row_id) diff --git a/core/mvcc/bindings/c/src/types.rs b/core/mvcc/bindings/c/src/types.rs index 6f7874604..52c21951d 100644 --- a/core/mvcc/bindings/c/src/types.rs +++ b/core/mvcc/bindings/c/src/types.rs @@ -17,14 +17,14 @@ impl MVCCDatabaseRef { self.ptr.is_null() } - pub fn get_ref(&self) -> &DbContext { - unsafe { &*(self.ptr) } + pub fn get_ref(&self) -> &Db { + &unsafe { &*(self.ptr) }.db } #[allow(clippy::mut_from_ref)] - pub fn get_ref_mut(&self) -> &mut DbContext { + pub fn get_ref_mut(&self) -> &mut Db { let ptr_mut = self.ptr as *mut DbContext; - unsafe { &mut (*ptr_mut) } + &mut unsafe { &mut (*ptr_mut) }.db } } @@ -44,12 +44,10 @@ impl From<&mut DbContext> for MVCCDatabaseRef { pub struct DbContext { pub(crate) db: Db, - pub(crate) runtime: tokio::runtime::Runtime, } pub struct ScanCursorContext { - pub cursor: crate::ScanCursor, - pub db: MVCCDatabaseRef, + pub(crate) cursor: crate::ScanCursor, } #[derive(Clone, Debug)] @@ -57,3 +55,25 @@ pub struct ScanCursorContext { pub struct MVCCScanCursorRef { pub ptr: *mut ScanCursorContext, } + +impl MVCCScanCursorRef { + pub fn null() -> MVCCScanCursorRef { + MVCCScanCursorRef { + ptr: std::ptr::null_mut(), + } + } + + pub fn is_null(&self) -> bool { + self.ptr.is_null() + } + + pub fn get_ref(&self) -> &crate::ScanCursor { + &unsafe { &*(self.ptr) }.cursor + } + + #[allow(clippy::mut_from_ref)] + pub fn get_ref_mut(&self) -> &mut crate::ScanCursor { + let ptr_mut = self.ptr as *mut ScanCursorContext; + &mut unsafe { &mut (*ptr_mut) }.cursor + } +} diff --git a/core/mvcc/mvcc-rs/Cargo.toml b/core/mvcc/mvcc-rs/Cargo.toml index 40cbd2a7c..f2087c651 100644 --- a/core/mvcc/mvcc-rs/Cargo.toml +++ b/core/mvcc/mvcc-rs/Cargo.toml @@ -5,19 +5,16 @@ edition = "2021" [dependencies] anyhow = "1.0.70" -futures = "0.3.28" thiserror = "1.0.40" tracing = "0.1.37" -tokio = { version = "1.27.0", features = ["full", "parking_lot"] } -tokio-stream = { version = "0.1.12", features = ["io-util"] } serde = { version = "1.0.160", features = ["derive"] } serde_json = "1.0.96" -pin-project = "1.0.12" tracing-subscriber = { version = "0", optional = true } base64 = "0.21.0" aws-sdk-s3 = "0.27.0" aws-config = "0.55.2" -tokio-util = "0.7.8" +parking_lot = "0.12.1" +futures = "0.3.28" [dev-dependencies] criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] } diff --git a/core/mvcc/mvcc-rs/benches/my_benchmark.rs b/core/mvcc/mvcc-rs/benches/my_benchmark.rs index 8d0c28dce..4a9e3d122 100644 --- a/core/mvcc/mvcc-rs/benches/my_benchmark.rs +++ b/core/mvcc/mvcc-rs/benches/my_benchmark.rs @@ -17,30 +17,30 @@ fn bench(c: &mut Criterion) { let db = bench_db(); group.bench_function("begin_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - db.begin_tx().await; + db.begin_tx(); }) }); let db = bench_db(); group.bench_function("begin_tx + rollback_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx().await; - db.rollback_tx(tx_id).await + let tx_id = db.begin_tx(); + db.rollback_tx(tx_id) }) }); let db = bench_db(); group.bench_function("begin_tx + commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx().await; - db.commit_tx(tx_id).await + let tx_id = db.begin_tx(); + db.commit_tx(tx_id) }) }); let db = bench_db(); group.bench_function("begin_tx-read-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx().await; + let tx_id = db.begin_tx(); db.read( tx_id, RowID { @@ -48,16 +48,15 @@ fn bench(c: &mut Criterion) { row_id: 1, }, ) - .await .unwrap(); - db.commit_tx(tx_id).await + db.commit_tx(tx_id) }) }); let db = bench_db(); group.bench_function("begin_tx-update-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx().await; + let tx_id = db.begin_tx(); db.update( tx_id, Row { @@ -68,15 +67,14 @@ fn bench(c: &mut Criterion) { data: "World".to_string(), }, ) - .await .unwrap(); - db.commit_tx(tx_id).await + db.commit_tx(tx_id) }) }); let db = bench_db(); - let tx = futures::executor::block_on(db.begin_tx()); - futures::executor::block_on(db.insert( + let tx = db.begin_tx(); + db.insert( tx, Row { id: RowID { @@ -85,7 +83,7 @@ fn bench(c: &mut Criterion) { }, data: "Hello".to_string(), }, - )) + ) .unwrap(); group.bench_function("read", |b| { b.to_async(FuturesExecutor).iter(|| async { @@ -96,14 +94,13 @@ fn bench(c: &mut Criterion) { row_id: 1, }, ) - .await .unwrap(); }) }); let db = bench_db(); - let tx = futures::executor::block_on(db.begin_tx()); - futures::executor::block_on(db.insert( + let tx = db.begin_tx(); + db.insert( tx, Row { id: RowID { @@ -112,7 +109,7 @@ fn bench(c: &mut Criterion) { }, data: "Hello".to_string(), }, - )) + ) .unwrap(); group.bench_function("update", |b| { b.to_async(FuturesExecutor).iter(|| async { @@ -126,7 +123,6 @@ fn bench(c: &mut Criterion) { data: "World".to_string(), }, ) - .await .unwrap(); }) }); diff --git a/core/mvcc/mvcc-rs/src/cursor.rs b/core/mvcc/mvcc-rs/src/cursor.rs index 1c761f663..7042c090f 100644 --- a/core/mvcc/mvcc-rs/src/cursor.rs +++ b/core/mvcc/mvcc-rs/src/cursor.rs @@ -10,12 +10,12 @@ pub struct ScanCursor<'a, Clock: LogicalClock> { } impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { - pub async fn new( + pub fn new( db: &'a Database, tx_id: u64, table_id: u64, ) -> Result> { - let row_ids = db.scan_row_ids_for_table(table_id).await?; + let row_ids = db.scan_row_ids_for_table(table_id)?; Ok(Self { db, tx_id, @@ -31,15 +31,15 @@ impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { Some(self.row_ids[self.index]) } - pub async fn current_row(&self) -> Result> { + pub fn current_row(&self) -> Result> { if self.index >= self.row_ids.len() { return Ok(None); } let id = self.row_ids[self.index]; - self.db.read(self.tx_id, id).await + self.db.read(self.tx_id, id) } - pub async fn close(self) -> Result<()> { + pub fn close(self) -> Result<()> { Ok(()) } diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs index 84425f1af..c69da6c2e 100644 --- a/core/mvcc/mvcc-rs/src/database.rs +++ b/core/mvcc/mvcc-rs/src/database.rs @@ -1,12 +1,12 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; use crate::persistent_storage::Storage; +use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use tokio::sync::Mutex; pub type Result = std::result::Result; @@ -156,9 +156,9 @@ impl Database { /// * `tx_id` - the ID of the transaction in which to insert the new row. /// * `row` - the row object containing the values to be inserted. /// - pub async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { - let inner = self.inner.lock().await; - inner.insert(tx_id, row).await + pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + let inner = self.inner.lock(); + inner.insert(tx_id, row) } /// Updates a row in the database with new values. @@ -179,11 +179,11 @@ impl Database { /// # Returns /// /// Returns `true` if the row was successfully updated, and `false` otherwise. - pub async fn update(&self, tx_id: TxID, row: Row) -> Result { - if !self.delete(tx_id, row.id).await? { + pub fn update(&self, tx_id: TxID, row: Row) -> Result { + if !self.delete(tx_id, row.id)? { return Ok(false); } - self.insert(tx_id, row).await?; + self.insert(tx_id, row)?; Ok(true) } @@ -201,9 +201,9 @@ impl Database { /// /// Returns `true` if the row was successfully deleted, and `false` otherwise. /// - pub async fn delete(&self, tx_id: TxID, id: RowID) -> Result { - let inner = self.inner.lock().await; - inner.delete(tx_id, id).await + pub fn delete(&self, tx_id: TxID, id: RowID) -> Result { + let inner = self.inner.lock(); + inner.delete(tx_id, id) } /// Retrieves a row from the table with the given `id`. @@ -220,18 +220,18 @@ impl Database { /// /// Returns `Some(row)` with the row data if the row with the given `id` exists, /// and `None` otherwise. - pub async fn read(&self, tx_id: TxID, id: RowID) -> Result> { - let inner = self.inner.lock().await; - inner.read(tx_id, id).await + pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { + let inner = self.inner.lock(); + inner.read(tx_id, id) } - pub async fn scan_row_ids(&self) -> Result> { - let inner = self.inner.lock().await; + pub fn scan_row_ids(&self) -> Result> { + let inner = self.inner.lock(); inner.scan_row_ids() } - pub async fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { - let inner = self.inner.lock().await; + pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { + let inner = self.inner.lock(); inner.scan_row_ids_for_table(table_id) } @@ -240,9 +240,9 @@ impl Database { /// This function starts a new transaction in the database and returns a `TxID` value /// that you can use to perform operations within the transaction. All changes made within the /// transaction are isolated from other transactions until you commit the transaction. - pub async fn begin_tx(&self) -> TxID { - let mut inner = self.inner.lock().await; - inner.begin_tx().await + pub fn begin_tx(&self) -> TxID { + let mut inner = self.inner.lock(); + inner.begin_tx() } /// Commits a transaction with the specified transaction ID. @@ -254,9 +254,9 @@ impl Database { /// # Arguments /// /// * `tx_id` - The ID of the transaction to commit. - pub async fn commit_tx(&self, tx_id: TxID) -> Result<()> { - let mut inner = self.inner.lock().await; - inner.commit_tx(tx_id).await + pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { + let mut inner = self.inner.lock(); + inner.commit_tx(tx_id) } /// Rolls back a transaction with the specified ID. @@ -267,23 +267,23 @@ impl Database { /// # Arguments /// /// * `tx_id` - The ID of the transaction to abort. - pub async fn rollback_tx(&self, tx_id: TxID) { - let inner = self.inner.lock().await; - inner.rollback_tx(tx_id).await; + pub fn rollback_tx(&self, tx_id: TxID) { + let inner = self.inner.lock(); + inner.rollback_tx(tx_id); } /// Drops all unused row versions from the database. /// /// A version is considered unused if it is not visible to any active transaction /// and it is not the most recent version of the row. - pub async fn drop_unused_row_versions(&self) { - let inner = self.inner.lock().await; + pub fn drop_unused_row_versions(&self) { + let inner = self.inner.lock(); inner.drop_unused_row_versions(); } - pub async fn recover(&self) -> Result<()> { - let inner = self.inner.lock().await; - inner.recover().await + pub fn recover(&self) -> Result<()> { + let inner = self.inner.lock(); + inner.recover() } } @@ -298,7 +298,7 @@ pub struct DatabaseInner { } impl DatabaseInner { - async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { let mut txs = self.txs.borrow_mut(); let tx = txs .get_mut(&tx_id) @@ -317,7 +317,7 @@ impl DatabaseInner { } #[allow(clippy::await_holding_refcell_ref)] - async fn delete(&self, tx_id: TxID, id: RowID) -> Result { + fn delete(&self, tx_id: TxID, id: RowID) -> Result { // NOTICE: They *are* dropped before an await point!!! But the await is conditional, // so I think clippy is just confused. let mut txs = self.txs.borrow_mut(); @@ -331,7 +331,7 @@ impl DatabaseInner { if is_write_write_conflict(&txs, tx, rv) { drop(txs); drop(rows); - self.rollback_tx(tx_id).await; + self.rollback_tx(tx_id); return Err(DatabaseError::WriteWriteConflict); } if is_version_visible(&txs, tx, rv) { @@ -347,7 +347,7 @@ impl DatabaseInner { Ok(false) } - async fn read(&self, tx_id: TxID, id: RowID) -> Result> { + fn read(&self, tx_id: TxID, id: RowID) -> Result> { let txs = self.txs.borrow_mut(); let tx = txs.get(&tx_id).unwrap(); assert!(tx.state == TransactionState::Active); @@ -385,7 +385,7 @@ impl DatabaseInner { .collect()) } - async fn begin_tx(&mut self) -> TxID { + fn begin_tx(&mut self) -> TxID { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); let tx = Transaction::new(tx_id, begin_ts); @@ -397,8 +397,7 @@ impl DatabaseInner { tx_id } - #[allow(clippy::await_holding_refcell_ref)] - async fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { + fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { let end_ts = self.get_timestamp(); let mut txs = self.txs.borrow_mut(); let mut tx = txs.get_mut(&tx_id).unwrap(); @@ -449,12 +448,12 @@ impl DatabaseInner { drop(rows); drop(txs); if !log_record.row_versions.is_empty() { - self.storage.log_tx(log_record).await?; + self.storage.log_tx(log_record)?; } Ok(()) } - async fn rollback_tx(&self, tx_id: TxID) { + fn rollback_tx(&self, tx_id: TxID) { let mut txs = self.txs.borrow_mut(); let mut tx = txs.get_mut(&tx_id).unwrap(); assert!(tx.state == TransactionState::Active); @@ -529,8 +528,8 @@ impl DatabaseInner { } } - pub async fn recover(&self) -> Result<()> { - let tx_log = self.storage.read_tx_log().await?; + pub fn recover(&self) -> Result<()> { + let tx_log = self.storage.read_tx_log()?; for record in tx_log { tracing::debug!("RECOVERING {:?}", record); for version in record.row_versions { @@ -617,11 +616,11 @@ mod tests { } #[traced_test] - #[tokio::test] - async fn test_insert_read() { + #[test] + fn test_insert_read() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -629,7 +628,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -638,13 +637,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row = db .read( tx2, @@ -653,35 +651,32 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[tokio::test] - async fn test_read_nonexistent() { + #[test] + fn test_read_nonexistent() { let db = test_db(); - let tx = db.begin_tx().await; - let row = db - .read( - tx, - RowID { - table_id: 1, - row_id: 1, - }, - ) - .await; + let tx = db.begin_tx(); + let row = db.read( + tx, + RowID { + table_id: 1, + row_id: 1, + }, + ); assert!(row.unwrap().is_none()); } #[traced_test] - #[tokio::test] - async fn test_delete() { + #[test] + fn test_delete() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -689,7 +684,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -698,7 +693,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); @@ -709,7 +703,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); let row = db .read( @@ -719,12 +712,11 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert!(row.is_none()); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row = db .read( tx2, @@ -733,16 +725,15 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert!(row.is_none()); } #[traced_test] - #[tokio::test] - async fn test_delete_nonexistent() { + #[test] + fn test_delete_nonexistent() { let db = test_db(); - let tx = db.begin_tx().await; + let tx = db.begin_tx(); assert!(!db .delete( tx, @@ -751,15 +742,14 @@ mod tests { row_id: 1 } ) - .await .unwrap()); } #[traced_test] - #[tokio::test] - async fn test_commit() { + #[test] + fn test_commit() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -767,7 +757,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -776,7 +766,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); @@ -787,7 +776,7 @@ mod tests { }, data: "World".to_string(), }; - db.update(tx1, tx1_updated_row.clone()).await.unwrap(); + db.update(tx1, tx1_updated_row.clone()).unwrap(); let row = db .read( tx1, @@ -796,13 +785,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_updated_row, row); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row = db .read( tx2, @@ -811,19 +799,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); - db.commit_tx(tx2).await.unwrap(); + db.commit_tx(tx2).unwrap(); assert_eq!(tx1_updated_row, row); - db.drop_unused_row_versions().await; + db.drop_unused_row_versions(); } #[traced_test] - #[tokio::test] - async fn test_rollback() { + #[test] + fn test_rollback() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let row1 = Row { id: RowID { table_id: 1, @@ -831,7 +818,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, row1.clone()).await.unwrap(); + db.insert(tx1, row1.clone()).unwrap(); let row2 = db .read( tx1, @@ -840,7 +827,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(row1, row2); @@ -851,7 +837,7 @@ mod tests { }, data: "World".to_string(), }; - db.update(tx1, row3.clone()).await.unwrap(); + db.update(tx1, row3.clone()).unwrap(); let row4 = db .read( tx1, @@ -860,12 +846,11 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(row3, row4); - db.rollback_tx(tx1).await; - let tx2 = db.begin_tx().await; + db.rollback_tx(tx1); + let tx2 = db.begin_tx(); let row5 = db .read( tx2, @@ -874,18 +859,17 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert_eq!(row5, None); } #[traced_test] - #[tokio::test] - async fn test_dirty_write() { + #[test] + fn test_dirty_write() { let db = test_db(); // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -893,7 +877,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -902,13 +886,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); // T2 attempts to delete row with ID 1, but fails because T1 has not committed. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let tx2_row = Row { id: RowID { table_id: 1, @@ -916,7 +899,7 @@ mod tests { }, data: "World".to_string(), }; - assert!(!db.update(tx2, tx2_row).await.unwrap()); + assert!(!db.update(tx2, tx2_row).unwrap()); let row = db .read( @@ -926,19 +909,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[tokio::test] - async fn test_dirty_read() { + #[test] + fn test_dirty_read() { let db = test_db(); // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let row1 = Row { id: RowID { table_id: 1, @@ -946,10 +928,10 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, row1).await.unwrap(); + db.insert(tx1, row1).unwrap(); // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row2 = db .read( tx2, @@ -958,19 +940,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert_eq!(row2, None); } #[ignore] #[traced_test] - #[tokio::test] - async fn test_dirty_read_deleted() { + #[test] + fn test_dirty_read_deleted() { let db = test_db(); // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -978,11 +959,11 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); - db.commit_tx(tx1).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + db.commit_tx(tx1).unwrap(); // T2 deletes row with ID 1, but does not commit. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); assert!(db .delete( tx2, @@ -991,11 +972,10 @@ mod tests { row_id: 1 } ) - .await .unwrap()); // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. - let tx3 = db.begin_tx().await; + let tx3 = db.begin_tx(); let row = db .read( tx3, @@ -1004,19 +984,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[tokio::test] - async fn test_fuzzy_read() { + #[test] + fn test_fuzzy_read() { let db = test_db(); // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -1024,7 +1003,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -1033,14 +1012,13 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); // T2 reads the row with ID 1 within an active transaction. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row = db .read( tx2, @@ -1049,13 +1027,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); // T3 updates the row and commits. - let tx3 = db.begin_tx().await; + let tx3 = db.begin_tx(); let tx3_row = Row { id: RowID { table_id: 1, @@ -1063,8 +1040,8 @@ mod tests { }, data: "World".to_string(), }; - db.update(tx3, tx3_row).await.unwrap(); - db.commit_tx(tx3).await.unwrap(); + db.update(tx3, tx3_row).unwrap(); + db.commit_tx(tx3).unwrap(); // T2 still reads the same version of the row as before. let row = db @@ -1075,19 +1052,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[tokio::test] - async fn test_lost_update() { + #[test] + fn test_lost_update() { let db = test_db(); // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -1095,7 +1071,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -1104,14 +1080,13 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); // T2 attempts to update row ID 1 within an active transaction. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let tx2_row = Row { id: RowID { table_id: 1, @@ -1119,10 +1094,10 @@ mod tests { }, data: "World".to_string(), }; - assert!(db.update(tx2, tx2_row.clone()).await.unwrap()); + assert!(db.update(tx2, tx2_row.clone()).unwrap()); // T3 also attempts to update row ID 1 within an active transaction. - let tx3 = db.begin_tx().await; + let tx3 = db.begin_tx(); let tx3_row = Row { id: RowID { table_id: 1, @@ -1132,13 +1107,13 @@ mod tests { }; assert_eq!( Err(DatabaseError::WriteWriteConflict), - db.update(tx3, tx3_row).await + db.update(tx3, tx3_row) ); - db.commit_tx(tx2).await.unwrap(); - assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3).await); + db.commit_tx(tx2).unwrap(); + assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3)); - let tx4 = db.begin_tx().await; + let tx4 = db.begin_tx(); let row = db .read( tx4, @@ -1147,7 +1122,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx2_row, row); @@ -1156,12 +1130,12 @@ mod tests { // Test for the visibility to check if a new transaction can see old committed values. // This test checks for the typo present in the paper, explained in https://github.com/penberg/mvcc-rs/issues/15 #[traced_test] - #[tokio::test] - async fn test_committed_visibility() { + #[test] + fn test_committed_visibility() { let db = test_db(); // let's add $10 to my account since I like money - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -1169,11 +1143,11 @@ mod tests { }, data: "10".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); - db.commit_tx(tx1).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + db.commit_tx(tx1).unwrap(); // but I like more money, so let me try adding $10 more - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let tx2_row = Row { id: RowID { table_id: 1, @@ -1181,7 +1155,7 @@ mod tests { }, data: "20".to_string(), }; - assert!(db.update(tx2, tx2_row.clone()).await.unwrap()); + assert!(db.update(tx2, tx2_row.clone()).unwrap()); let row = db .read( tx2, @@ -1190,13 +1164,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(row, tx2_row); // can I check how much money I have? - let tx3 = db.begin_tx().await; + let tx3 = db.begin_tx(); let row = db .read( tx3, @@ -1205,7 +1178,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); @@ -1213,13 +1185,13 @@ mod tests { // Test to check if a older transaction can see (un)committed future rows #[traced_test] - #[tokio::test] - async fn test_future_row() { + #[test] + fn test_future_row() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let tx2_row = Row { id: RowID { table_id: 1, @@ -1227,7 +1199,7 @@ mod tests { }, data: "10".to_string(), }; - db.insert(tx2, tx2_row.clone()).await.unwrap(); + db.insert(tx2, tx2_row).unwrap(); // transaction in progress, so tx1 shouldn't be able to see the value let row = db @@ -1238,12 +1210,11 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert_eq!(row, None); // lets commit the transaction and check if tx1 can see it - db.commit_tx(tx2).await.unwrap(); + db.commit_tx(tx2).unwrap(); let row = db .read( tx1, @@ -1252,14 +1223,13 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert_eq!(row, None); } #[traced_test] - #[tokio::test] - async fn test_storage1() { + #[test] + fn test_storage1() { let clock = LocalClock::new(); let mut path = std::env::temp_dir(); path.push(format!( @@ -1272,9 +1242,9 @@ mod tests { let storage = crate::persistent_storage::Storage::new_json_on_disk(path.clone()); let db = Database::new(clock, storage); - let tx1 = db.begin_tx().await; - let tx2 = db.begin_tx().await; - let tx3 = db.begin_tx().await; + let tx1 = db.begin_tx(); + let tx2 = db.begin_tx(); + let tx3 = db.begin_tx(); db.insert( tx3, @@ -1286,14 +1256,13 @@ mod tests { data: "testme".to_string(), }, ) - .await .unwrap(); - db.commit_tx(tx1).await.unwrap(); - db.rollback_tx(tx2).await; - db.commit_tx(tx3).await.unwrap(); + db.commit_tx(tx1).unwrap(); + db.rollback_tx(tx2); + db.commit_tx(tx3).unwrap(); - let tx4 = db.begin_tx().await; + let tx4 = db.begin_tx(); db.insert( tx4, Row { @@ -1304,7 +1273,6 @@ mod tests { data: "testme2".to_string(), }, ) - .await .unwrap(); db.insert( tx4, @@ -1316,7 +1284,6 @@ mod tests { data: "testme3".to_string(), }, ) - .await .unwrap(); assert_eq!( @@ -1327,7 +1294,6 @@ mod tests { row_id: 1 } ) - .await .unwrap() .unwrap() .data, @@ -1341,7 +1307,6 @@ mod tests { row_id: 2 } ) - .await .unwrap() .unwrap() .data, @@ -1355,21 +1320,20 @@ mod tests { row_id: 3 } ) - .await .unwrap() .unwrap() .data, "testme3" ); - db.commit_tx(tx4).await.unwrap(); + db.commit_tx(tx4).unwrap(); let clock = LocalClock::new(); let storage = crate::persistent_storage::Storage::new_json_on_disk(path); let db = Database::new(clock, storage); - db.recover().await.unwrap(); + db.recover().unwrap(); println!("{:#?}", db); - let tx5 = db.begin_tx().await; + let tx5 = db.begin_tx(); println!( "{:#?}", db.read( @@ -1379,7 +1343,6 @@ mod tests { row_id: 1 } ) - .await ); assert_eq!( db.read( @@ -1389,7 +1352,6 @@ mod tests { row_id: 1 } ) - .await .unwrap() .unwrap() .data, @@ -1403,7 +1365,6 @@ mod tests { row_id: 2 } ) - .await .unwrap() .unwrap() .data, @@ -1417,7 +1378,6 @@ mod tests { row_id: 3 } ) - .await .unwrap() .unwrap() .data, diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs index 1dd72c02a..f927be381 100644 --- a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs +++ b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs @@ -20,51 +20,48 @@ impl Storage { Self::JsonOnDisk(path) } - pub async fn new_s3(options: s3::Options) -> Result { - Ok(Self::S3(s3::Replicator::new(options).await?)) + pub fn new_s3(options: s3::Options) -> Result { + let replicator = futures::executor::block_on(s3::Replicator::new(options))?; + Ok(Self::S3(replicator)) } } impl Storage { - pub async fn log_tx(&mut self, m: LogRecord) -> Result<()> { + pub fn log_tx(&mut self, m: LogRecord) -> Result<()> { match self { Self::JsonOnDisk(path) => { - use tokio::io::AsyncWriteExt; + use std::io::Write; let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; - let mut file = tokio::fs::OpenOptions::new() + let mut file = std::fs::OpenOptions::new() .create(true) .append(true) - .open(&path) - .await + .open(path) .map_err(|e| DatabaseError::Io(e.to_string()))?; file.write_all(&t) - .await .map_err(|e| DatabaseError::Io(e.to_string()))?; file.write_all(b"\n") - .await .map_err(|e| DatabaseError::Io(e.to_string()))?; } Self::S3(replicator) => { - replicator.replicate_tx(m).await?; + futures::executor::block_on(replicator.replicate_tx(m))?; } Self::Noop => (), } Ok(()) } - pub async fn read_tx_log(&self) -> Result> { + pub fn read_tx_log(&self) -> Result> { match self { Self::JsonOnDisk(path) => { - use tokio::io::AsyncBufReadExt; - let file = tokio::fs::OpenOptions::new() + use std::io::BufRead; + let file = std::fs::OpenOptions::new() .read(true) - .open(&path) - .await + .open(path) .map_err(|e| DatabaseError::Io(e.to_string()))?; let mut records: Vec = Vec::new(); - let mut lines = tokio::io::BufReader::new(file).lines(); - while let Ok(Some(line)) = lines.next_line().await { + let mut lines = std::io::BufReader::new(file).lines(); + while let Some(Ok(line)) = lines.next() { records.push( serde_json::from_str(&line) .map_err(|e| DatabaseError::Io(e.to_string()))?, @@ -72,7 +69,7 @@ impl Storage { } Ok(records) } - Self::S3(replicator) => replicator.read_tx_log().await, + Self::S3(replicator) => futures::executor::block_on(replicator.read_tx_log()), Self::Noop => Err(crate::errors::DatabaseError::Io( "cannot read from Noop storage".to_string(), )), diff --git a/core/mvcc/mvcc-rs/tests/concurrency_test.rs b/core/mvcc/mvcc-rs/tests/concurrency_test.rs index 4ad81c645..12321aa10 100644 --- a/core/mvcc/mvcc-rs/tests/concurrency_test.rs +++ b/core/mvcc/mvcc-rs/tests/concurrency_test.rs @@ -19,48 +19,44 @@ fn test_non_overlapping_concurrent_inserts() { let db = db.clone(); let ids = ids.clone(); thread::spawn(move || { - shuttle::future::block_on(async move { - let tx = db.begin_tx().await; - let id = ids.fetch_add(1, Ordering::SeqCst); - let id = RowID { - table_id: 1, - row_id: id, - }; - let row = Row { - id, - data: "Hello".to_string(), - }; - db.insert(tx, row.clone()).await.unwrap(); - db.commit_tx(tx).await.unwrap(); - let tx = db.begin_tx().await; - let committed_row = db.read(tx, id).await.unwrap(); - db.commit_tx(tx).await.unwrap(); - assert_eq!(committed_row, Some(row)); - }) + let tx = db.begin_tx(); + let id = ids.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: "Hello".to_string(), + }; + db.insert(tx, row.clone()).unwrap(); + db.commit_tx(tx).unwrap(); + let tx = db.begin_tx(); + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); }); } { let db = db.clone(); let ids = ids.clone(); thread::spawn(move || { - shuttle::future::block_on(async move { - let tx = db.begin_tx().await; - let id = ids.fetch_add(1, Ordering::SeqCst); - let id = RowID { - table_id: 1, - row_id: id, - }; - let row = Row { - id, - data: "World".to_string(), - }; - db.insert(tx, row.clone()).await.unwrap(); - db.commit_tx(tx).await.unwrap(); - let tx = db.begin_tx().await; - let committed_row = db.read(tx, id).await.unwrap(); - db.commit_tx(tx).await.unwrap(); - assert_eq!(committed_row, Some(row)); - }); + let tx = db.begin_tx(); + let id = ids.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: "World".to_string(), + }; + db.insert(tx, row.clone()).unwrap(); + db.commit_tx(tx).unwrap(); + let tx = db.begin_tx(); + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); }); } },