From ae4cc872b672855b30878c8a41aecb39fa22536a Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 17 May 2023 12:39:08 +0200 Subject: [PATCH] treewide: overhaul the API to be sync again We dropped all occurrences of Tokio to avoid the cost of allocations induced by async runtimes. The only async part of the code is now S3 storage, which is just wrapped in a futures::executor::block_on() --- core/mvcc/bindings/c/Cargo.toml | 1 - core/mvcc/bindings/c/src/lib.rs | 86 ++--- core/mvcc/bindings/c/src/types.rs | 34 +- core/mvcc/mvcc-rs/Cargo.toml | 7 +- core/mvcc/mvcc-rs/benches/my_benchmark.rs | 34 +- core/mvcc/mvcc-rs/src/cursor.rs | 10 +- core/mvcc/mvcc-rs/src/database.rs | 334 ++++++++---------- .../mvcc-rs/src/persistent_storage/mod.rs | 33 +- core/mvcc/mvcc-rs/tests/concurrency_test.rs | 68 ++-- 9 files changed, 274 insertions(+), 333 deletions(-) diff --git a/core/mvcc/bindings/c/Cargo.toml b/core/mvcc/bindings/c/Cargo.toml index 04aa540b8..be23a9a1b 100644 --- a/core/mvcc/bindings/c/Cargo.toml +++ b/core/mvcc/bindings/c/Cargo.toml @@ -13,7 +13,6 @@ cbindgen = "0.24.0" [dependencies] base64 = "0.21.0" mvcc-rs = { path = "../../mvcc-rs" } -tokio = { version = "1.27.0", features = ["full", "parking_lot"] } tracing = "0.1.37" tracing-subscriber = { version = "0" } diff --git a/core/mvcc/bindings/c/src/lib.rs b/core/mvcc/bindings/c/src/lib.rs index 8cb28a477..509fa94cf 100644 --- a/core/mvcc/bindings/c/src/lib.rs +++ b/core/mvcc/bindings/c/src/lib.rs @@ -20,7 +20,7 @@ type ScanCursor = cursor::ScanCursor<'static, Clock>; static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new(); -async fn storage_for(main_db_path: &str) -> database::Result { +fn storage_for(main_db_path: &str) -> database::Result { // TODO: let's accept an URL instead of main_db_path here, so we can // pass custom S3 endpoints, options, etc. if cfg!(feature = "json_on_disk_storage") { @@ -29,7 +29,8 @@ async fn storage_for(main_db_path: &str) -> database::Result { } if cfg!(feature = "s3_storage") { tracing::info!("S3 storage for {main_db_path}"); - return Storage::new_s3(s3::Options::with_create_bucket_if_not_exists(true)).await; + let options = s3::Options::with_create_bucket_if_not_exists(true); + return Storage::new_s3(options); } tracing::info!("No persistent storage for {main_db_path}"); Ok(Storage::new_noop()) @@ -52,10 +53,9 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC return MVCCDatabaseRef::null(); } }; - let runtime = tokio::runtime::Runtime::new().unwrap(); tracing::debug!("mvccrs: opening persistent storage for {main_db_path}"); - let storage = match runtime.block_on(storage_for(main_db_path)) { + let storage = match storage_for(main_db_path) { Ok(storage) => storage, Err(e) => { tracing::error!("Failed to open persistent storage: {e}"); @@ -64,11 +64,10 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC }; let db = Db::new(clock, storage); - runtime.block_on(db.recover()).ok(); + db.recover().ok(); - let ctx = DbContext { db, runtime }; - let ctx = Box::leak(Box::new(ctx)); - MVCCDatabaseRef::from(ctx) + let db = Box::leak(Box::new(DbContext { db })); + MVCCDatabaseRef::from(db) } #[no_mangle] @@ -84,8 +83,7 @@ pub unsafe extern "C" fn MVCCDatabaseClose(db: MVCCDatabaseRef) { #[no_mangle] pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 { let db = db.get_ref(); - let (db, runtime) = (&db.db, &db.runtime); - let tx_id = runtime.block_on(async move { db.begin_tx().await }); + let tx_id = db.begin_tx(); tracing::debug!("MVCCTransactionBegin: {tx_id}"); tx_id } @@ -93,9 +91,8 @@ pub unsafe extern "C" fn MVCCTransactionBegin(db: MVCCDatabaseRef) -> u64 { #[no_mangle] pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError { let db = db.get_ref(); - let (db, runtime) = (&db.db, &db.runtime); tracing::debug!("MVCCTransactionCommit: {tx_id}"); - match runtime.block_on(async move { db.commit_tx(tx_id).await }) { + match db.commit_tx(tx_id) { Ok(()) => MVCCError::MVCC_OK, Err(e) => { tracing::error!("MVCCTransactionCommit: {e}"); @@ -107,9 +104,8 @@ pub unsafe extern "C" fn MVCCTransactionCommit(db: MVCCDatabaseRef, tx_id: u64) #[no_mangle] pub unsafe extern "C" fn MVCCTransactionRollback(db: MVCCDatabaseRef, tx_id: u64) -> MVCCError { let db = db.get_ref(); - let (db, runtime) = (&db.db, &db.runtime); tracing::debug!("MVCCTransactionRollback: {tx_id}"); - runtime.block_on(async move { db.rollback_tx(tx_id).await }); + db.rollback_tx(tx_id); MVCCError::MVCC_OK } @@ -132,11 +128,10 @@ pub unsafe extern "C" fn MVCCDatabaseInsert( general_purpose::STANDARD.encode(value) } }; - let (db, runtime) = (&db.db, &db.runtime); let id = database::RowID { table_id, row_id }; let row = database::Row { id, data }; tracing::debug!("MVCCDatabaseInsert: {row:?}"); - match runtime.block_on(async move { db.insert(tx_id, row).await }) { + match db.insert(tx_id, row) { Ok(_) => { tracing::debug!("MVCCDatabaseInsert: success"); MVCCError::MVCC_OK @@ -158,29 +153,24 @@ pub unsafe extern "C" fn MVCCDatabaseRead( value_len: *mut i64, ) -> MVCCError { let db = db.get_ref(); - let (db, runtime) = (&db.db, &db.runtime); - match runtime.block_on(async move { + match { let id = database::RowID { table_id, row_id }; - let maybe_row = db.read(tx_id, id).await?; + let maybe_row = db.read(tx_id, id); match maybe_row { - Some(row) => { + Ok(Some(row)) => { tracing::debug!("Found row {row:?}"); let str_len = row.data.len() + 1; - let value = std::ffi::CString::new(row.data.as_bytes()).map_err(|e| { - mvcc_rs::errors::DatabaseError::Io(format!( - "Failed to transform read data into CString: {e}" - )) - })?; + let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default(); unsafe { *value_ptr = value.into_raw() as *mut u8; *value_len = str_len as i64; } } - None => unsafe { *value_len = -1 }, + _ => unsafe { *value_len = -1 }, }; Ok::<(), mvcc_rs::errors::DatabaseError>(()) - }) { + } { Ok(_) => { tracing::debug!("MVCCDatabaseRead: success"); MVCCError::MVCC_OK @@ -209,11 +199,8 @@ pub unsafe extern "C" fn MVCCScanCursorOpen( tracing::debug!("MVCCScanCursorOpen()"); // Reference is transmuted to &'static in order to be able to pass the cursor back to C. // The contract with C is to never use a cursor after MVCCDatabaseClose() has been called. - let database = unsafe { std::mem::transmute::<&DbContext, &'static DbContext>(db.get_ref()) }; - let (database, runtime) = (&database.db, &database.runtime); - match runtime - .block_on(async move { mvcc_rs::cursor::ScanCursor::new(database, tx_id, table_id).await }) - { + let db = unsafe { std::mem::transmute::<&Db, &'static Db>(db.get_ref()) }; + match mvcc_rs::cursor::ScanCursor::new(db, tx_id, table_id) { Ok(cursor) => { if cursor.is_empty() { tracing::debug!("Cursor is empty"); @@ -223,7 +210,7 @@ pub unsafe extern "C" fn MVCCScanCursorOpen( } tracing::debug!("Cursor open: {cursor:?}"); MVCCScanCursorRef { - ptr: Box::into_raw(Box::new(ScanCursorContext { cursor, db })), + ptr: Box::into_raw(Box::new(ScanCursorContext { cursor })), } } Err(e) => { @@ -242,10 +229,8 @@ pub unsafe extern "C" fn MVCCScanCursorClose(cursor: MVCCScanCursorRef) { tracing::debug!("warning: `cursor` is null in MVCCScanCursorClose()"); return; } - let cursor_ctx = unsafe { Box::from_raw(cursor.ptr) }; - let db_context = cursor_ctx.db.clone(); - let runtime = &db_context.get_ref().runtime; - runtime.block_on(async move { cursor_ctx.cursor.close().await.ok() }); + let cursor = unsafe { Box::from_raw(cursor.ptr) }.cursor; + cursor.close().ok(); } #[no_mangle] @@ -259,31 +244,24 @@ pub unsafe extern "C" fn MVCCScanCursorRead( tracing::debug!("warning: `cursor` is null in MVCCScanCursorRead()"); return MVCCError::MVCC_IO_ERROR_READ; } - let cursor_ctx = unsafe { &*cursor.ptr }; - let runtime = &cursor_ctx.db.get_ref().runtime; - let cursor = &cursor_ctx.cursor; + let cursor = cursor.get_ref(); - // TODO: deduplicate with MVCCDatabaseRead() - match runtime.block_on(async move { - let maybe_row = cursor.current_row().await?; + match { + let maybe_row = cursor.current_row(); match maybe_row { - Some(row) => { + Ok(Some(row)) => { tracing::debug!("Found row {row:?}"); let str_len = row.data.len() + 1; - let value = std::ffi::CString::new(row.data.as_bytes()).map_err(|e| { - mvcc_rs::errors::DatabaseError::Io(format!( - "Failed to transform read data into CString: {e}" - )) - })?; + let value = std::ffi::CString::new(row.data.as_bytes()).unwrap_or_default(); unsafe { *value_ptr = value.into_raw() as *mut u8; *value_len = str_len as i64; } } - None => unsafe { *value_len = -1 }, + _ => unsafe { *value_len = -1 }, }; Ok::<(), mvcc_rs::errors::DatabaseError>(()) - }) { + } { Ok(_) => { tracing::debug!("MVCCDatabaseRead: success"); MVCCError::MVCC_OK @@ -297,8 +275,7 @@ pub unsafe extern "C" fn MVCCScanCursorRead( #[no_mangle] pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::ffi::c_int { - let cursor_ctx = unsafe { &mut *cursor.ptr }; - let cursor = &mut cursor_ctx.cursor; + let cursor = cursor.get_ref_mut(); tracing::debug!("MVCCScanCursorNext(): {}", cursor.index); if cursor.forward() { tracing::debug!("Forwarded to {}", cursor.index); @@ -311,8 +288,7 @@ pub unsafe extern "C" fn MVCCScanCursorNext(cursor: MVCCScanCursorRef) -> std::f #[no_mangle] pub unsafe extern "C" fn MVCCScanCursorPosition(cursor: MVCCScanCursorRef) -> u64 { - let cursor_ctx = unsafe { &mut *cursor.ptr }; - let cursor = &mut cursor_ctx.cursor; + let cursor = cursor.get_ref(); cursor .current_row_id() .map(|row_id| row_id.row_id) diff --git a/core/mvcc/bindings/c/src/types.rs b/core/mvcc/bindings/c/src/types.rs index 6f7874604..52c21951d 100644 --- a/core/mvcc/bindings/c/src/types.rs +++ b/core/mvcc/bindings/c/src/types.rs @@ -17,14 +17,14 @@ impl MVCCDatabaseRef { self.ptr.is_null() } - pub fn get_ref(&self) -> &DbContext { - unsafe { &*(self.ptr) } + pub fn get_ref(&self) -> &Db { + &unsafe { &*(self.ptr) }.db } #[allow(clippy::mut_from_ref)] - pub fn get_ref_mut(&self) -> &mut DbContext { + pub fn get_ref_mut(&self) -> &mut Db { let ptr_mut = self.ptr as *mut DbContext; - unsafe { &mut (*ptr_mut) } + &mut unsafe { &mut (*ptr_mut) }.db } } @@ -44,12 +44,10 @@ impl From<&mut DbContext> for MVCCDatabaseRef { pub struct DbContext { pub(crate) db: Db, - pub(crate) runtime: tokio::runtime::Runtime, } pub struct ScanCursorContext { - pub cursor: crate::ScanCursor, - pub db: MVCCDatabaseRef, + pub(crate) cursor: crate::ScanCursor, } #[derive(Clone, Debug)] @@ -57,3 +55,25 @@ pub struct ScanCursorContext { pub struct MVCCScanCursorRef { pub ptr: *mut ScanCursorContext, } + +impl MVCCScanCursorRef { + pub fn null() -> MVCCScanCursorRef { + MVCCScanCursorRef { + ptr: std::ptr::null_mut(), + } + } + + pub fn is_null(&self) -> bool { + self.ptr.is_null() + } + + pub fn get_ref(&self) -> &crate::ScanCursor { + &unsafe { &*(self.ptr) }.cursor + } + + #[allow(clippy::mut_from_ref)] + pub fn get_ref_mut(&self) -> &mut crate::ScanCursor { + let ptr_mut = self.ptr as *mut ScanCursorContext; + &mut unsafe { &mut (*ptr_mut) }.cursor + } +} diff --git a/core/mvcc/mvcc-rs/Cargo.toml b/core/mvcc/mvcc-rs/Cargo.toml index 40cbd2a7c..f2087c651 100644 --- a/core/mvcc/mvcc-rs/Cargo.toml +++ b/core/mvcc/mvcc-rs/Cargo.toml @@ -5,19 +5,16 @@ edition = "2021" [dependencies] anyhow = "1.0.70" -futures = "0.3.28" thiserror = "1.0.40" tracing = "0.1.37" -tokio = { version = "1.27.0", features = ["full", "parking_lot"] } -tokio-stream = { version = "0.1.12", features = ["io-util"] } serde = { version = "1.0.160", features = ["derive"] } serde_json = "1.0.96" -pin-project = "1.0.12" tracing-subscriber = { version = "0", optional = true } base64 = "0.21.0" aws-sdk-s3 = "0.27.0" aws-config = "0.55.2" -tokio-util = "0.7.8" +parking_lot = "0.12.1" +futures = "0.3.28" [dev-dependencies] criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] } diff --git a/core/mvcc/mvcc-rs/benches/my_benchmark.rs b/core/mvcc/mvcc-rs/benches/my_benchmark.rs index 8d0c28dce..4a9e3d122 100644 --- a/core/mvcc/mvcc-rs/benches/my_benchmark.rs +++ b/core/mvcc/mvcc-rs/benches/my_benchmark.rs @@ -17,30 +17,30 @@ fn bench(c: &mut Criterion) { let db = bench_db(); group.bench_function("begin_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - db.begin_tx().await; + db.begin_tx(); }) }); let db = bench_db(); group.bench_function("begin_tx + rollback_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx().await; - db.rollback_tx(tx_id).await + let tx_id = db.begin_tx(); + db.rollback_tx(tx_id) }) }); let db = bench_db(); group.bench_function("begin_tx + commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx().await; - db.commit_tx(tx_id).await + let tx_id = db.begin_tx(); + db.commit_tx(tx_id) }) }); let db = bench_db(); group.bench_function("begin_tx-read-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx().await; + let tx_id = db.begin_tx(); db.read( tx_id, RowID { @@ -48,16 +48,15 @@ fn bench(c: &mut Criterion) { row_id: 1, }, ) - .await .unwrap(); - db.commit_tx(tx_id).await + db.commit_tx(tx_id) }) }); let db = bench_db(); group.bench_function("begin_tx-update-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { - let tx_id = db.begin_tx().await; + let tx_id = db.begin_tx(); db.update( tx_id, Row { @@ -68,15 +67,14 @@ fn bench(c: &mut Criterion) { data: "World".to_string(), }, ) - .await .unwrap(); - db.commit_tx(tx_id).await + db.commit_tx(tx_id) }) }); let db = bench_db(); - let tx = futures::executor::block_on(db.begin_tx()); - futures::executor::block_on(db.insert( + let tx = db.begin_tx(); + db.insert( tx, Row { id: RowID { @@ -85,7 +83,7 @@ fn bench(c: &mut Criterion) { }, data: "Hello".to_string(), }, - )) + ) .unwrap(); group.bench_function("read", |b| { b.to_async(FuturesExecutor).iter(|| async { @@ -96,14 +94,13 @@ fn bench(c: &mut Criterion) { row_id: 1, }, ) - .await .unwrap(); }) }); let db = bench_db(); - let tx = futures::executor::block_on(db.begin_tx()); - futures::executor::block_on(db.insert( + let tx = db.begin_tx(); + db.insert( tx, Row { id: RowID { @@ -112,7 +109,7 @@ fn bench(c: &mut Criterion) { }, data: "Hello".to_string(), }, - )) + ) .unwrap(); group.bench_function("update", |b| { b.to_async(FuturesExecutor).iter(|| async { @@ -126,7 +123,6 @@ fn bench(c: &mut Criterion) { data: "World".to_string(), }, ) - .await .unwrap(); }) }); diff --git a/core/mvcc/mvcc-rs/src/cursor.rs b/core/mvcc/mvcc-rs/src/cursor.rs index 1c761f663..7042c090f 100644 --- a/core/mvcc/mvcc-rs/src/cursor.rs +++ b/core/mvcc/mvcc-rs/src/cursor.rs @@ -10,12 +10,12 @@ pub struct ScanCursor<'a, Clock: LogicalClock> { } impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { - pub async fn new( + pub fn new( db: &'a Database, tx_id: u64, table_id: u64, ) -> Result> { - let row_ids = db.scan_row_ids_for_table(table_id).await?; + let row_ids = db.scan_row_ids_for_table(table_id)?; Ok(Self { db, tx_id, @@ -31,15 +31,15 @@ impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { Some(self.row_ids[self.index]) } - pub async fn current_row(&self) -> Result> { + pub fn current_row(&self) -> Result> { if self.index >= self.row_ids.len() { return Ok(None); } let id = self.row_ids[self.index]; - self.db.read(self.tx_id, id).await + self.db.read(self.tx_id, id) } - pub async fn close(self) -> Result<()> { + pub fn close(self) -> Result<()> { Ok(()) } diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs index 84425f1af..c69da6c2e 100644 --- a/core/mvcc/mvcc-rs/src/database.rs +++ b/core/mvcc/mvcc-rs/src/database.rs @@ -1,12 +1,12 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; use crate::persistent_storage::Storage; +use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use tokio::sync::Mutex; pub type Result = std::result::Result; @@ -156,9 +156,9 @@ impl Database { /// * `tx_id` - the ID of the transaction in which to insert the new row. /// * `row` - the row object containing the values to be inserted. /// - pub async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { - let inner = self.inner.lock().await; - inner.insert(tx_id, row).await + pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + let inner = self.inner.lock(); + inner.insert(tx_id, row) } /// Updates a row in the database with new values. @@ -179,11 +179,11 @@ impl Database { /// # Returns /// /// Returns `true` if the row was successfully updated, and `false` otherwise. - pub async fn update(&self, tx_id: TxID, row: Row) -> Result { - if !self.delete(tx_id, row.id).await? { + pub fn update(&self, tx_id: TxID, row: Row) -> Result { + if !self.delete(tx_id, row.id)? { return Ok(false); } - self.insert(tx_id, row).await?; + self.insert(tx_id, row)?; Ok(true) } @@ -201,9 +201,9 @@ impl Database { /// /// Returns `true` if the row was successfully deleted, and `false` otherwise. /// - pub async fn delete(&self, tx_id: TxID, id: RowID) -> Result { - let inner = self.inner.lock().await; - inner.delete(tx_id, id).await + pub fn delete(&self, tx_id: TxID, id: RowID) -> Result { + let inner = self.inner.lock(); + inner.delete(tx_id, id) } /// Retrieves a row from the table with the given `id`. @@ -220,18 +220,18 @@ impl Database { /// /// Returns `Some(row)` with the row data if the row with the given `id` exists, /// and `None` otherwise. - pub async fn read(&self, tx_id: TxID, id: RowID) -> Result> { - let inner = self.inner.lock().await; - inner.read(tx_id, id).await + pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { + let inner = self.inner.lock(); + inner.read(tx_id, id) } - pub async fn scan_row_ids(&self) -> Result> { - let inner = self.inner.lock().await; + pub fn scan_row_ids(&self) -> Result> { + let inner = self.inner.lock(); inner.scan_row_ids() } - pub async fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { - let inner = self.inner.lock().await; + pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { + let inner = self.inner.lock(); inner.scan_row_ids_for_table(table_id) } @@ -240,9 +240,9 @@ impl Database { /// This function starts a new transaction in the database and returns a `TxID` value /// that you can use to perform operations within the transaction. All changes made within the /// transaction are isolated from other transactions until you commit the transaction. - pub async fn begin_tx(&self) -> TxID { - let mut inner = self.inner.lock().await; - inner.begin_tx().await + pub fn begin_tx(&self) -> TxID { + let mut inner = self.inner.lock(); + inner.begin_tx() } /// Commits a transaction with the specified transaction ID. @@ -254,9 +254,9 @@ impl Database { /// # Arguments /// /// * `tx_id` - The ID of the transaction to commit. - pub async fn commit_tx(&self, tx_id: TxID) -> Result<()> { - let mut inner = self.inner.lock().await; - inner.commit_tx(tx_id).await + pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { + let mut inner = self.inner.lock(); + inner.commit_tx(tx_id) } /// Rolls back a transaction with the specified ID. @@ -267,23 +267,23 @@ impl Database { /// # Arguments /// /// * `tx_id` - The ID of the transaction to abort. - pub async fn rollback_tx(&self, tx_id: TxID) { - let inner = self.inner.lock().await; - inner.rollback_tx(tx_id).await; + pub fn rollback_tx(&self, tx_id: TxID) { + let inner = self.inner.lock(); + inner.rollback_tx(tx_id); } /// Drops all unused row versions from the database. /// /// A version is considered unused if it is not visible to any active transaction /// and it is not the most recent version of the row. - pub async fn drop_unused_row_versions(&self) { - let inner = self.inner.lock().await; + pub fn drop_unused_row_versions(&self) { + let inner = self.inner.lock(); inner.drop_unused_row_versions(); } - pub async fn recover(&self) -> Result<()> { - let inner = self.inner.lock().await; - inner.recover().await + pub fn recover(&self) -> Result<()> { + let inner = self.inner.lock(); + inner.recover() } } @@ -298,7 +298,7 @@ pub struct DatabaseInner { } impl DatabaseInner { - async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { + fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { let mut txs = self.txs.borrow_mut(); let tx = txs .get_mut(&tx_id) @@ -317,7 +317,7 @@ impl DatabaseInner { } #[allow(clippy::await_holding_refcell_ref)] - async fn delete(&self, tx_id: TxID, id: RowID) -> Result { + fn delete(&self, tx_id: TxID, id: RowID) -> Result { // NOTICE: They *are* dropped before an await point!!! But the await is conditional, // so I think clippy is just confused. let mut txs = self.txs.borrow_mut(); @@ -331,7 +331,7 @@ impl DatabaseInner { if is_write_write_conflict(&txs, tx, rv) { drop(txs); drop(rows); - self.rollback_tx(tx_id).await; + self.rollback_tx(tx_id); return Err(DatabaseError::WriteWriteConflict); } if is_version_visible(&txs, tx, rv) { @@ -347,7 +347,7 @@ impl DatabaseInner { Ok(false) } - async fn read(&self, tx_id: TxID, id: RowID) -> Result> { + fn read(&self, tx_id: TxID, id: RowID) -> Result> { let txs = self.txs.borrow_mut(); let tx = txs.get(&tx_id).unwrap(); assert!(tx.state == TransactionState::Active); @@ -385,7 +385,7 @@ impl DatabaseInner { .collect()) } - async fn begin_tx(&mut self) -> TxID { + fn begin_tx(&mut self) -> TxID { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); let tx = Transaction::new(tx_id, begin_ts); @@ -397,8 +397,7 @@ impl DatabaseInner { tx_id } - #[allow(clippy::await_holding_refcell_ref)] - async fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { + fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { let end_ts = self.get_timestamp(); let mut txs = self.txs.borrow_mut(); let mut tx = txs.get_mut(&tx_id).unwrap(); @@ -449,12 +448,12 @@ impl DatabaseInner { drop(rows); drop(txs); if !log_record.row_versions.is_empty() { - self.storage.log_tx(log_record).await?; + self.storage.log_tx(log_record)?; } Ok(()) } - async fn rollback_tx(&self, tx_id: TxID) { + fn rollback_tx(&self, tx_id: TxID) { let mut txs = self.txs.borrow_mut(); let mut tx = txs.get_mut(&tx_id).unwrap(); assert!(tx.state == TransactionState::Active); @@ -529,8 +528,8 @@ impl DatabaseInner { } } - pub async fn recover(&self) -> Result<()> { - let tx_log = self.storage.read_tx_log().await?; + pub fn recover(&self) -> Result<()> { + let tx_log = self.storage.read_tx_log()?; for record in tx_log { tracing::debug!("RECOVERING {:?}", record); for version in record.row_versions { @@ -617,11 +616,11 @@ mod tests { } #[traced_test] - #[tokio::test] - async fn test_insert_read() { + #[test] + fn test_insert_read() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -629,7 +628,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -638,13 +637,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row = db .read( tx2, @@ -653,35 +651,32 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[tokio::test] - async fn test_read_nonexistent() { + #[test] + fn test_read_nonexistent() { let db = test_db(); - let tx = db.begin_tx().await; - let row = db - .read( - tx, - RowID { - table_id: 1, - row_id: 1, - }, - ) - .await; + let tx = db.begin_tx(); + let row = db.read( + tx, + RowID { + table_id: 1, + row_id: 1, + }, + ); assert!(row.unwrap().is_none()); } #[traced_test] - #[tokio::test] - async fn test_delete() { + #[test] + fn test_delete() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -689,7 +684,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -698,7 +693,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); @@ -709,7 +703,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); let row = db .read( @@ -719,12 +712,11 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert!(row.is_none()); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row = db .read( tx2, @@ -733,16 +725,15 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert!(row.is_none()); } #[traced_test] - #[tokio::test] - async fn test_delete_nonexistent() { + #[test] + fn test_delete_nonexistent() { let db = test_db(); - let tx = db.begin_tx().await; + let tx = db.begin_tx(); assert!(!db .delete( tx, @@ -751,15 +742,14 @@ mod tests { row_id: 1 } ) - .await .unwrap()); } #[traced_test] - #[tokio::test] - async fn test_commit() { + #[test] + fn test_commit() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -767,7 +757,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -776,7 +766,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); @@ -787,7 +776,7 @@ mod tests { }, data: "World".to_string(), }; - db.update(tx1, tx1_updated_row.clone()).await.unwrap(); + db.update(tx1, tx1_updated_row.clone()).unwrap(); let row = db .read( tx1, @@ -796,13 +785,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_updated_row, row); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row = db .read( tx2, @@ -811,19 +799,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); - db.commit_tx(tx2).await.unwrap(); + db.commit_tx(tx2).unwrap(); assert_eq!(tx1_updated_row, row); - db.drop_unused_row_versions().await; + db.drop_unused_row_versions(); } #[traced_test] - #[tokio::test] - async fn test_rollback() { + #[test] + fn test_rollback() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let row1 = Row { id: RowID { table_id: 1, @@ -831,7 +818,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, row1.clone()).await.unwrap(); + db.insert(tx1, row1.clone()).unwrap(); let row2 = db .read( tx1, @@ -840,7 +827,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(row1, row2); @@ -851,7 +837,7 @@ mod tests { }, data: "World".to_string(), }; - db.update(tx1, row3.clone()).await.unwrap(); + db.update(tx1, row3.clone()).unwrap(); let row4 = db .read( tx1, @@ -860,12 +846,11 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(row3, row4); - db.rollback_tx(tx1).await; - let tx2 = db.begin_tx().await; + db.rollback_tx(tx1); + let tx2 = db.begin_tx(); let row5 = db .read( tx2, @@ -874,18 +859,17 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert_eq!(row5, None); } #[traced_test] - #[tokio::test] - async fn test_dirty_write() { + #[test] + fn test_dirty_write() { let db = test_db(); // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -893,7 +877,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -902,13 +886,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); // T2 attempts to delete row with ID 1, but fails because T1 has not committed. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let tx2_row = Row { id: RowID { table_id: 1, @@ -916,7 +899,7 @@ mod tests { }, data: "World".to_string(), }; - assert!(!db.update(tx2, tx2_row).await.unwrap()); + assert!(!db.update(tx2, tx2_row).unwrap()); let row = db .read( @@ -926,19 +909,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[tokio::test] - async fn test_dirty_read() { + #[test] + fn test_dirty_read() { let db = test_db(); // T1 inserts a row with ID 1, but does not commit. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let row1 = Row { id: RowID { table_id: 1, @@ -946,10 +928,10 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, row1).await.unwrap(); + db.insert(tx1, row1).unwrap(); // T2 attempts to read row with ID 1, but doesn't see one because T1 has not committed. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row2 = db .read( tx2, @@ -958,19 +940,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert_eq!(row2, None); } #[ignore] #[traced_test] - #[tokio::test] - async fn test_dirty_read_deleted() { + #[test] + fn test_dirty_read_deleted() { let db = test_db(); // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -978,11 +959,11 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); - db.commit_tx(tx1).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + db.commit_tx(tx1).unwrap(); // T2 deletes row with ID 1, but does not commit. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); assert!(db .delete( tx2, @@ -991,11 +972,10 @@ mod tests { row_id: 1 } ) - .await .unwrap()); // T3 reads row with ID 1, but doesn't see the delete because T2 hasn't committed. - let tx3 = db.begin_tx().await; + let tx3 = db.begin_tx(); let row = db .read( tx3, @@ -1004,19 +984,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[tokio::test] - async fn test_fuzzy_read() { + #[test] + fn test_fuzzy_read() { let db = test_db(); // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -1024,7 +1003,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -1033,14 +1012,13 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); // T2 reads the row with ID 1 within an active transaction. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let row = db .read( tx2, @@ -1049,13 +1027,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); // T3 updates the row and commits. - let tx3 = db.begin_tx().await; + let tx3 = db.begin_tx(); let tx3_row = Row { id: RowID { table_id: 1, @@ -1063,8 +1040,8 @@ mod tests { }, data: "World".to_string(), }; - db.update(tx3, tx3_row).await.unwrap(); - db.commit_tx(tx3).await.unwrap(); + db.update(tx3, tx3_row).unwrap(); + db.commit_tx(tx3).unwrap(); // T2 still reads the same version of the row as before. let row = db @@ -1075,19 +1052,18 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); } #[traced_test] - #[tokio::test] - async fn test_lost_update() { + #[test] + fn test_lost_update() { let db = test_db(); // T1 inserts a row with ID 1 and commits. - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -1095,7 +1071,7 @@ mod tests { }, data: "Hello".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); let row = db .read( tx1, @@ -1104,14 +1080,13 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); - db.commit_tx(tx1).await.unwrap(); + db.commit_tx(tx1).unwrap(); // T2 attempts to update row ID 1 within an active transaction. - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let tx2_row = Row { id: RowID { table_id: 1, @@ -1119,10 +1094,10 @@ mod tests { }, data: "World".to_string(), }; - assert!(db.update(tx2, tx2_row.clone()).await.unwrap()); + assert!(db.update(tx2, tx2_row.clone()).unwrap()); // T3 also attempts to update row ID 1 within an active transaction. - let tx3 = db.begin_tx().await; + let tx3 = db.begin_tx(); let tx3_row = Row { id: RowID { table_id: 1, @@ -1132,13 +1107,13 @@ mod tests { }; assert_eq!( Err(DatabaseError::WriteWriteConflict), - db.update(tx3, tx3_row).await + db.update(tx3, tx3_row) ); - db.commit_tx(tx2).await.unwrap(); - assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3).await); + db.commit_tx(tx2).unwrap(); + assert_eq!(Err(DatabaseError::TxTerminated), db.commit_tx(tx3)); - let tx4 = db.begin_tx().await; + let tx4 = db.begin_tx(); let row = db .read( tx4, @@ -1147,7 +1122,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx2_row, row); @@ -1156,12 +1130,12 @@ mod tests { // Test for the visibility to check if a new transaction can see old committed values. // This test checks for the typo present in the paper, explained in https://github.com/penberg/mvcc-rs/issues/15 #[traced_test] - #[tokio::test] - async fn test_committed_visibility() { + #[test] + fn test_committed_visibility() { let db = test_db(); // let's add $10 to my account since I like money - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); let tx1_row = Row { id: RowID { table_id: 1, @@ -1169,11 +1143,11 @@ mod tests { }, data: "10".to_string(), }; - db.insert(tx1, tx1_row.clone()).await.unwrap(); - db.commit_tx(tx1).await.unwrap(); + db.insert(tx1, tx1_row.clone()).unwrap(); + db.commit_tx(tx1).unwrap(); // but I like more money, so let me try adding $10 more - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let tx2_row = Row { id: RowID { table_id: 1, @@ -1181,7 +1155,7 @@ mod tests { }, data: "20".to_string(), }; - assert!(db.update(tx2, tx2_row.clone()).await.unwrap()); + assert!(db.update(tx2, tx2_row.clone()).unwrap()); let row = db .read( tx2, @@ -1190,13 +1164,12 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(row, tx2_row); // can I check how much money I have? - let tx3 = db.begin_tx().await; + let tx3 = db.begin_tx(); let row = db .read( tx3, @@ -1205,7 +1178,6 @@ mod tests { row_id: 1, }, ) - .await .unwrap() .unwrap(); assert_eq!(tx1_row, row); @@ -1213,13 +1185,13 @@ mod tests { // Test to check if a older transaction can see (un)committed future rows #[traced_test] - #[tokio::test] - async fn test_future_row() { + #[test] + fn test_future_row() { let db = test_db(); - let tx1 = db.begin_tx().await; + let tx1 = db.begin_tx(); - let tx2 = db.begin_tx().await; + let tx2 = db.begin_tx(); let tx2_row = Row { id: RowID { table_id: 1, @@ -1227,7 +1199,7 @@ mod tests { }, data: "10".to_string(), }; - db.insert(tx2, tx2_row.clone()).await.unwrap(); + db.insert(tx2, tx2_row).unwrap(); // transaction in progress, so tx1 shouldn't be able to see the value let row = db @@ -1238,12 +1210,11 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert_eq!(row, None); // lets commit the transaction and check if tx1 can see it - db.commit_tx(tx2).await.unwrap(); + db.commit_tx(tx2).unwrap(); let row = db .read( tx1, @@ -1252,14 +1223,13 @@ mod tests { row_id: 1, }, ) - .await .unwrap(); assert_eq!(row, None); } #[traced_test] - #[tokio::test] - async fn test_storage1() { + #[test] + fn test_storage1() { let clock = LocalClock::new(); let mut path = std::env::temp_dir(); path.push(format!( @@ -1272,9 +1242,9 @@ mod tests { let storage = crate::persistent_storage::Storage::new_json_on_disk(path.clone()); let db = Database::new(clock, storage); - let tx1 = db.begin_tx().await; - let tx2 = db.begin_tx().await; - let tx3 = db.begin_tx().await; + let tx1 = db.begin_tx(); + let tx2 = db.begin_tx(); + let tx3 = db.begin_tx(); db.insert( tx3, @@ -1286,14 +1256,13 @@ mod tests { data: "testme".to_string(), }, ) - .await .unwrap(); - db.commit_tx(tx1).await.unwrap(); - db.rollback_tx(tx2).await; - db.commit_tx(tx3).await.unwrap(); + db.commit_tx(tx1).unwrap(); + db.rollback_tx(tx2); + db.commit_tx(tx3).unwrap(); - let tx4 = db.begin_tx().await; + let tx4 = db.begin_tx(); db.insert( tx4, Row { @@ -1304,7 +1273,6 @@ mod tests { data: "testme2".to_string(), }, ) - .await .unwrap(); db.insert( tx4, @@ -1316,7 +1284,6 @@ mod tests { data: "testme3".to_string(), }, ) - .await .unwrap(); assert_eq!( @@ -1327,7 +1294,6 @@ mod tests { row_id: 1 } ) - .await .unwrap() .unwrap() .data, @@ -1341,7 +1307,6 @@ mod tests { row_id: 2 } ) - .await .unwrap() .unwrap() .data, @@ -1355,21 +1320,20 @@ mod tests { row_id: 3 } ) - .await .unwrap() .unwrap() .data, "testme3" ); - db.commit_tx(tx4).await.unwrap(); + db.commit_tx(tx4).unwrap(); let clock = LocalClock::new(); let storage = crate::persistent_storage::Storage::new_json_on_disk(path); let db = Database::new(clock, storage); - db.recover().await.unwrap(); + db.recover().unwrap(); println!("{:#?}", db); - let tx5 = db.begin_tx().await; + let tx5 = db.begin_tx(); println!( "{:#?}", db.read( @@ -1379,7 +1343,6 @@ mod tests { row_id: 1 } ) - .await ); assert_eq!( db.read( @@ -1389,7 +1352,6 @@ mod tests { row_id: 1 } ) - .await .unwrap() .unwrap() .data, @@ -1403,7 +1365,6 @@ mod tests { row_id: 2 } ) - .await .unwrap() .unwrap() .data, @@ -1417,7 +1378,6 @@ mod tests { row_id: 3 } ) - .await .unwrap() .unwrap() .data, diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs index 1dd72c02a..f927be381 100644 --- a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs +++ b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs @@ -20,51 +20,48 @@ impl Storage { Self::JsonOnDisk(path) } - pub async fn new_s3(options: s3::Options) -> Result { - Ok(Self::S3(s3::Replicator::new(options).await?)) + pub fn new_s3(options: s3::Options) -> Result { + let replicator = futures::executor::block_on(s3::Replicator::new(options))?; + Ok(Self::S3(replicator)) } } impl Storage { - pub async fn log_tx(&mut self, m: LogRecord) -> Result<()> { + pub fn log_tx(&mut self, m: LogRecord) -> Result<()> { match self { Self::JsonOnDisk(path) => { - use tokio::io::AsyncWriteExt; + use std::io::Write; let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; - let mut file = tokio::fs::OpenOptions::new() + let mut file = std::fs::OpenOptions::new() .create(true) .append(true) - .open(&path) - .await + .open(path) .map_err(|e| DatabaseError::Io(e.to_string()))?; file.write_all(&t) - .await .map_err(|e| DatabaseError::Io(e.to_string()))?; file.write_all(b"\n") - .await .map_err(|e| DatabaseError::Io(e.to_string()))?; } Self::S3(replicator) => { - replicator.replicate_tx(m).await?; + futures::executor::block_on(replicator.replicate_tx(m))?; } Self::Noop => (), } Ok(()) } - pub async fn read_tx_log(&self) -> Result> { + pub fn read_tx_log(&self) -> Result> { match self { Self::JsonOnDisk(path) => { - use tokio::io::AsyncBufReadExt; - let file = tokio::fs::OpenOptions::new() + use std::io::BufRead; + let file = std::fs::OpenOptions::new() .read(true) - .open(&path) - .await + .open(path) .map_err(|e| DatabaseError::Io(e.to_string()))?; let mut records: Vec = Vec::new(); - let mut lines = tokio::io::BufReader::new(file).lines(); - while let Ok(Some(line)) = lines.next_line().await { + let mut lines = std::io::BufReader::new(file).lines(); + while let Some(Ok(line)) = lines.next() { records.push( serde_json::from_str(&line) .map_err(|e| DatabaseError::Io(e.to_string()))?, @@ -72,7 +69,7 @@ impl Storage { } Ok(records) } - Self::S3(replicator) => replicator.read_tx_log().await, + Self::S3(replicator) => futures::executor::block_on(replicator.read_tx_log()), Self::Noop => Err(crate::errors::DatabaseError::Io( "cannot read from Noop storage".to_string(), )), diff --git a/core/mvcc/mvcc-rs/tests/concurrency_test.rs b/core/mvcc/mvcc-rs/tests/concurrency_test.rs index 4ad81c645..12321aa10 100644 --- a/core/mvcc/mvcc-rs/tests/concurrency_test.rs +++ b/core/mvcc/mvcc-rs/tests/concurrency_test.rs @@ -19,48 +19,44 @@ fn test_non_overlapping_concurrent_inserts() { let db = db.clone(); let ids = ids.clone(); thread::spawn(move || { - shuttle::future::block_on(async move { - let tx = db.begin_tx().await; - let id = ids.fetch_add(1, Ordering::SeqCst); - let id = RowID { - table_id: 1, - row_id: id, - }; - let row = Row { - id, - data: "Hello".to_string(), - }; - db.insert(tx, row.clone()).await.unwrap(); - db.commit_tx(tx).await.unwrap(); - let tx = db.begin_tx().await; - let committed_row = db.read(tx, id).await.unwrap(); - db.commit_tx(tx).await.unwrap(); - assert_eq!(committed_row, Some(row)); - }) + let tx = db.begin_tx(); + let id = ids.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: "Hello".to_string(), + }; + db.insert(tx, row.clone()).unwrap(); + db.commit_tx(tx).unwrap(); + let tx = db.begin_tx(); + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); }); } { let db = db.clone(); let ids = ids.clone(); thread::spawn(move || { - shuttle::future::block_on(async move { - let tx = db.begin_tx().await; - let id = ids.fetch_add(1, Ordering::SeqCst); - let id = RowID { - table_id: 1, - row_id: id, - }; - let row = Row { - id, - data: "World".to_string(), - }; - db.insert(tx, row.clone()).await.unwrap(); - db.commit_tx(tx).await.unwrap(); - let tx = db.begin_tx().await; - let committed_row = db.read(tx, id).await.unwrap(); - db.commit_tx(tx).await.unwrap(); - assert_eq!(committed_row, Some(row)); - }); + let tx = db.begin_tx(); + let id = ids.fetch_add(1, Ordering::SeqCst); + let id = RowID { + table_id: 1, + row_id: id, + }; + let row = Row { + id, + data: "World".to_string(), + }; + db.insert(tx, row.clone()).unwrap(); + db.commit_tx(tx).unwrap(); + let tx = db.begin_tx(); + let committed_row = db.read(tx, id).unwrap(); + db.commit_tx(tx).unwrap(); + assert_eq!(committed_row, Some(row)); }); } },