From 8b1ef20c0871e4aacf178413d299a459287d960c Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 15 May 2023 10:50:47 +0200 Subject: [PATCH] treewide: drop storage trait We're good with an enum, and async_trait has a runtime cost we don't like. --- core/mvcc/bindings/c/Cargo.toml | 2 +- core/mvcc/bindings/c/src/lib.rs | 9 +- core/mvcc/mvcc-rs/Cargo.toml | 12 +- core/mvcc/mvcc-rs/benches/my_benchmark.rs | 7 +- core/mvcc/mvcc-rs/src/cursor.rs | 21 +--- core/mvcc/mvcc-rs/src/database.rs | 31 ++---- core/mvcc/mvcc-rs/src/persistent_storage.rs | 115 ++++++++------------ core/mvcc/mvcc-rs/tests/concurrency_test.rs | 2 +- 8 files changed, 72 insertions(+), 127 deletions(-) diff --git a/core/mvcc/bindings/c/Cargo.toml b/core/mvcc/bindings/c/Cargo.toml index 3e348ff73..4d8bb1427 100644 --- a/core/mvcc/bindings/c/Cargo.toml +++ b/core/mvcc/bindings/c/Cargo.toml @@ -12,7 +12,7 @@ cbindgen = "0.24.0" [dependencies] base64 = "0.21.0" -mvcc-rs = { path = "../../mvcc-rs", features = ["tokio"] } +mvcc-rs = { path = "../../mvcc-rs" } tokio = { version = "1.27.0", features = ["full", "parking_lot"] } tracing = "0.1.37" tracing-subscriber = { version = "0" } diff --git a/core/mvcc/bindings/c/src/lib.rs b/core/mvcc/bindings/c/src/lib.rs index b602245c8..18e08db0b 100644 --- a/core/mvcc/bindings/c/src/lib.rs +++ b/core/mvcc/bindings/c/src/lib.rs @@ -12,13 +12,10 @@ use types::{DbContext, MVCCDatabaseRef, MVCCScanCursorRef, ScanCursorContext}; type Clock = clock::LocalClock; /// cbindgen:ignore -type Storage = persistent_storage::JsonOnDisk; +type Db = database::Database; /// cbindgen:ignore -type Db = database::Database; - -/// cbindgen:ignore -type ScanCursor = cursor::ScanCursor<'static, Clock, Storage>; +type ScanCursor = cursor::ScanCursor<'static, Clock>; static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new(); @@ -40,7 +37,7 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC } }; tracing::debug!("mvccrs: opening persistent storage at {path}"); - let storage = crate::persistent_storage::JsonOnDisk::new(path); + let storage = crate::persistent_storage::Storage::new_json_on_disk(path); let db = Db::new(clock, storage); let runtime = tokio::runtime::Runtime::new().unwrap(); let ctx = DbContext { db, runtime }; diff --git a/core/mvcc/mvcc-rs/Cargo.toml b/core/mvcc/mvcc-rs/Cargo.toml index 11cb010d6..e5c6ea399 100644 --- a/core/mvcc/mvcc-rs/Cargo.toml +++ b/core/mvcc/mvcc-rs/Cargo.toml @@ -5,12 +5,11 @@ edition = "2021" [dependencies] anyhow = "1.0.70" -async-trait = "0.1.68" futures = "0.3.28" thiserror = "1.0.40" tracing = "0.1.37" -tokio = { version = "1.27.0", features = ["full", "parking_lot"], optional = true } -tokio-stream = { version = "0.1.12", optional = true, features = ["io-util"] } +tokio = { version = "1.27.0", features = ["full", "parking_lot"] } +tokio-stream = { version = "0.1.12", features = ["io-util"] } serde = { version = "1.0.160", features = ["derive"] } serde_json = "1.0.96" pin-project = "1.0.12" @@ -21,10 +20,9 @@ base64 = "0.21.0" criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] } pprof = { version = "0.11.1", features = ["criterion", "flamegraph"] } shuttle = "0.6.0" -tokio = { version = "1.27.0", features = ["full", "parking_lot"] } tracing-subscriber = "0" tracing-test = "0" -mvcc-rs = { path = ".", features = ["tokio"] } +mvcc-rs = { path = "." } [[bench]] name = "my_benchmark" @@ -32,6 +30,4 @@ harness = false [features] default = [] -full = ["tokio"] -c_bindings = ["tokio", "dep:tracing-subscriber"] -tokio = ["dep:tokio", "dep:tokio-stream"] +c_bindings = ["dep:tracing-subscriber"] diff --git a/core/mvcc/mvcc-rs/benches/my_benchmark.rs b/core/mvcc/mvcc-rs/benches/my_benchmark.rs index 494df0cf6..8d0c28dce 100644 --- a/core/mvcc/mvcc-rs/benches/my_benchmark.rs +++ b/core/mvcc/mvcc-rs/benches/my_benchmark.rs @@ -4,12 +4,9 @@ use mvcc_rs::clock::LocalClock; use mvcc_rs::database::{Database, Row, RowID}; use pprof::criterion::{Output, PProfProfiler}; -fn bench_db() -> Database< - LocalClock, - mvcc_rs::persistent_storage::Noop, -> { +fn bench_db() -> Database { let clock = LocalClock::default(); - let storage = mvcc_rs::persistent_storage::Noop {}; + let storage = mvcc_rs::persistent_storage::Storage::new_noop(); Database::new(clock, storage) } diff --git a/core/mvcc/mvcc-rs/src/cursor.rs b/core/mvcc/mvcc-rs/src/cursor.rs index ea988474a..e289093ff 100644 --- a/core/mvcc/mvcc-rs/src/cursor.rs +++ b/core/mvcc/mvcc-rs/src/cursor.rs @@ -1,29 +1,16 @@ use crate::clock::LogicalClock; use crate::database::{Database, Result, Row, RowID}; -use crate::persistent_storage::Storage; #[derive(Debug)] -pub struct ScanCursor< - 'a, - Clock: LogicalClock, - StorageImpl: Storage, -> { - pub db: &'a Database, +pub struct ScanCursor<'a, Clock: LogicalClock> { + pub db: &'a Database, pub row_ids: Vec, pub index: usize, tx_id: u64, } -impl< - 'a, - Clock: LogicalClock, - StorageImpl: Storage, - > ScanCursor<'a, Clock, StorageImpl> -{ - pub async fn new( - db: &'a Database, - table_id: u64, - ) -> Result> { +impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { + pub async fn new(db: &'a Database, table_id: u64) -> Result> { let tx_id = db.begin_tx().await; let row_ids = db.scan_row_ids_for_table(table_id).await?; Ok(Self { diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs index a81b1e16e..8d6521756 100644 --- a/core/mvcc/mvcc-rs/src/database.rs +++ b/core/mvcc/mvcc-rs/src/database.rs @@ -1,5 +1,6 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; +use crate::persistent_storage::Storage; use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -125,18 +126,11 @@ enum TransactionState { /// A database with MVCC. #[derive(Debug)] -pub struct Database< - Clock: LogicalClock, - Storage: crate::persistent_storage::Storage, -> { - inner: Arc>>, +pub struct Database { + inner: Arc>>, } -impl< - Clock: LogicalClock, - Storage: crate::persistent_storage::Storage, - > Database -{ +impl Database { /// Creates a new database. pub fn new(clock: Clock, storage: Storage) -> Self { let inner = DatabaseInner { @@ -294,7 +288,7 @@ impl< } #[derive(Debug)] -pub struct DatabaseInner { +pub struct DatabaseInner { rows: RefCell>>, txs: RefCell>, tx_timestamps: RefCell>, @@ -303,9 +297,7 @@ pub struct DatabaseInner - DatabaseInner -{ +impl DatabaseInner { async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { let mut txs = self.txs.borrow_mut(); let tx = txs @@ -624,12 +616,9 @@ mod tests { use crate::clock::LocalClock; use tracing_test::traced_test; - fn test_db() -> Database< - LocalClock, - crate::persistent_storage::Noop, - > { + fn test_db() -> Database { let clock = LocalClock::new(); - let storage = crate::persistent_storage::Noop {}; + let storage = crate::persistent_storage::Storage::new_noop(); Database::new(clock, storage) } @@ -1286,7 +1275,7 @@ mod tests { .unwrap() .as_nanos(), )); - let storage = crate::persistent_storage::JsonOnDisk { path: path.clone() }; + let storage = crate::persistent_storage::Storage::new_json_on_disk(path.clone()); let db = Database::new(clock, storage); let tx1 = db.begin_tx().await; @@ -1381,7 +1370,7 @@ mod tests { db.commit_tx(tx4).await.unwrap(); let clock = LocalClock::new(); - let storage = crate::persistent_storage::JsonOnDisk { path }; + let storage = crate::persistent_storage::Storage::new_json_on_disk(path); let db = Database::new(clock, storage); db.recover().await.unwrap(); println!("{:#?}", db); diff --git a/core/mvcc/mvcc-rs/src/persistent_storage.rs b/core/mvcc/mvcc-rs/src/persistent_storage.rs index 2277e4d2c..f07379981 100644 --- a/core/mvcc/mvcc-rs/src/persistent_storage.rs +++ b/core/mvcc/mvcc-rs/src/persistent_storage.rs @@ -1,53 +1,29 @@ use crate::database::{LogRecord, Result}; - -/// Persistent storage API for storing and retrieving transactions. -/// TODO: final design in heavy progress! -#[async_trait::async_trait] -pub trait Storage { - type Stream: futures::stream::Stream; - - /// Append a transaction in the transaction log. - async fn log_tx(&mut self, m: LogRecord) -> Result<()>; - - /// Read the transaction log for replay. - async fn read_tx_log(&self) -> Result; -} - -pub struct Noop {} - -#[async_trait::async_trait] -impl Storage for Noop { - type Stream = futures::stream::Empty; - - async fn log_tx(&mut self, _m: LogRecord) -> Result<()> { - Ok(()) - } - - async fn read_tx_log(&self) -> Result { - Ok(futures::stream::empty()) - } -} +use crate::errors::DatabaseError; #[derive(Debug)] -pub struct JsonOnDisk { - pub path: std::path::PathBuf, +pub enum Storage { + Noop, + JsonOnDisk(std::path::PathBuf), } -impl JsonOnDisk { - pub fn new(path: impl Into) -> Self { +impl Storage { + pub fn new_noop() -> Self { + Self::Noop + } + + pub fn new_json_on_disk(path: impl Into) -> Self { let path = path.into(); - Self { path } + Self::JsonOnDisk(path) } } -#[cfg(feature = "tokio")] #[pin_project::pin_project] pub struct JsonOnDiskStream { #[pin] inner: tokio_stream::wrappers::LinesStream>, } -#[cfg(feature = "tokio")] impl futures::stream::Stream for JsonOnDiskStream { type Item = LogRecord; @@ -62,41 +38,44 @@ impl futures::stream::Stream for JsonOnDiskStream { } } -#[cfg(feature = "tokio")] -#[async_trait::async_trait] -impl Storage for JsonOnDisk { - type Stream = JsonOnDiskStream; - - async fn log_tx(&mut self, m: LogRecord) -> Result<()> { - use crate::errors::DatabaseError; - use tokio::io::AsyncWriteExt; - let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; - let mut file = tokio::fs::OpenOptions::new() - .create(true) - .append(true) - .open(&self.path) - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - file.write_all(&t) - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; - file.write_all(b"\n") - .await - .map_err(|e| DatabaseError::Io(e.to_string()))?; +impl Storage { + pub async fn log_tx(&mut self, m: LogRecord) -> Result<()> { + if let Self::JsonOnDisk(path) = self { + use tokio::io::AsyncWriteExt; + let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&path) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(&t) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(b"\n") + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + } Ok(()) } - async fn read_tx_log(&self) -> Result { - use tokio::io::AsyncBufReadExt; - let file = tokio::fs::OpenOptions::new() - .read(true) - .open(&self.path) - .await - .unwrap(); - Ok(JsonOnDiskStream { - inner: tokio_stream::wrappers::LinesStream::new( - tokio::io::BufReader::new(file).lines(), - ), - }) + pub async fn read_tx_log(&self) -> Result { + if let Self::JsonOnDisk(path) = self { + use tokio::io::AsyncBufReadExt; + let file = tokio::fs::OpenOptions::new() + .read(true) + .open(&path) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + Ok(JsonOnDiskStream { + inner: tokio_stream::wrappers::LinesStream::new( + tokio::io::BufReader::new(file).lines(), + ), + }) + } else { + Err(crate::errors::DatabaseError::Io( + "cannot read from Noop storage".to_string(), + )) + } } } diff --git a/core/mvcc/mvcc-rs/tests/concurrency_test.rs b/core/mvcc/mvcc-rs/tests/concurrency_test.rs index 1ea28453c..4ad81c645 100644 --- a/core/mvcc/mvcc-rs/tests/concurrency_test.rs +++ b/core/mvcc/mvcc-rs/tests/concurrency_test.rs @@ -10,7 +10,7 @@ fn test_non_overlapping_concurrent_inserts() { // Two threads insert to the database concurrently using non-overlapping // row IDs. let clock = LocalClock::default(); - let storage = mvcc_rs::persistent_storage::Noop {}; + let storage = mvcc_rs::persistent_storage::Storage::new_noop(); let db = Arc::new(Database::new(clock, storage)); let ids = Arc::new(AtomicU64::new(0)); shuttle::check_random(