diff --git a/core/mvcc/database/Cargo.toml b/core/mvcc/database/Cargo.toml index 36809c922..a39fb4069 100644 --- a/core/mvcc/database/Cargo.toml +++ b/core/mvcc/database/Cargo.toml @@ -10,6 +10,10 @@ futures = "0.3.28" thiserror = "1.0.40" tracing = "0.1.37" tokio = { version = "1.27.0", features = ["full"], optional = true } +tokio-stream = { version = "0.1.12", optional = true, features = ["io-util"] } +serde = { version = "1.0.160", features = ["derive"] } +serde_json = "1.0.96" +pin-project = "1.0.12" [dev-dependencies] criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] } @@ -27,4 +31,4 @@ harness = false [features] default = [] full = ["tokio"] -tokio = ["dep:tokio"] +tokio = ["dep:tokio", "dep:tokio-stream"] diff --git a/core/mvcc/database/benches/my_benchmark.rs b/core/mvcc/database/benches/my_benchmark.rs index 1515b7579..3c360107a 100644 --- a/core/mvcc/database/benches/my_benchmark.rs +++ b/core/mvcc/database/benches/my_benchmark.rs @@ -4,20 +4,30 @@ use mvcc_rs::clock::LocalClock; use mvcc_rs::database::{Database, Row}; use pprof::criterion::{Output, PProfProfiler}; +fn bench_db() -> Database< + LocalClock, + mvcc_rs::persistent_storage::Noop, + tokio::sync::Mutex< + mvcc_rs::database::DatabaseInner, + >, +> { + let clock = LocalClock::default(); + let storage = mvcc_rs::persistent_storage::Noop {}; + Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage) +} + fn bench(c: &mut Criterion) { let mut group = c.benchmark_group("mvcc-ops-throughput"); group.throughput(Throughput::Elements(1)); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { db.begin_tx().await; }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx + rollback_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let tx_id = db.begin_tx().await; @@ -25,8 +35,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx + commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let tx_id = db.begin_tx().await; @@ -34,8 +43,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx-read-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let tx_id = db.begin_tx().await; @@ -44,8 +52,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx-update-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let tx_id = db.begin_tx().await; @@ -62,8 +69,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); let tx = futures::executor::block_on(db.begin_tx()); futures::executor::block_on(db.insert( tx, @@ -79,8 +85,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); let tx = futures::executor::block_on(db.begin_tx()); futures::executor::block_on(db.insert( tx, diff --git a/core/mvcc/database/src/database.rs b/core/mvcc/database/src/database.rs index c601e7a38..98cbefcea 100644 --- a/core/mvcc/database/src/database.rs +++ b/core/mvcc/database/src/database.rs @@ -1,27 +1,43 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; +use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -type Result = std::result::Result; +pub type Result = std::result::Result; -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Row { pub id: u64, pub data: String, } /// A row version. -#[derive(Clone, Debug)] -struct RowVersion { +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RowVersion { begin: TxTimestampOrID, end: Option, row: Row, } -type TxID = u64; +pub type TxID = u64; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Mutation { + tx_id: TxID, + row_versions: Vec, +} + +impl Mutation { + fn new(tx_id: TxID) -> Self { + Self { + tx_id, + row_versions: Vec::new(), + } + } +} /// A transaction timestamp or ID. /// @@ -29,14 +45,14 @@ type TxID = u64; /// phase of the transaction. During the active phase, new versions track the /// transaction ID in the `begin` and `end` fields. After a transaction commits, /// versions switch to tracking timestamps. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] enum TxTimestampOrID { Timestamp(u64), TxID(TxID), } /// Transaction -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Transaction { /// The state of the transaction. state: TransactionState, @@ -89,7 +105,7 @@ impl std::fmt::Display for Transaction { } /// Transaction state. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] enum TransactionState { Active, Preparing, @@ -102,21 +118,26 @@ enum TransactionState { #[derive(Debug)] pub struct Database< Clock: LogicalClock, - AsyncMutex: crate::sync::AsyncMutex>, + Storage: crate::persistent_storage::Storage, + AsyncMutex: crate::sync::AsyncMutex>, > { inner: Arc, } -impl>> - Database +impl< + Clock: LogicalClock, + Storage: crate::persistent_storage::Storage, + AsyncMutex: crate::sync::AsyncMutex>, + > Database { /// Creates a new database. - pub fn new(clock: Clock) -> Self { + pub fn new(clock: Clock, storage: Storage) -> Self { let inner = DatabaseInner { rows: RefCell::new(HashMap::new()), txs: RefCell::new(HashMap::new()), tx_ids: AtomicU64::new(0), clock, + storage, }; Self { inner: Arc::new(AsyncMutex::new(inner)), @@ -238,17 +259,32 @@ impl Result> { + use futures::StreamExt; + let inner = self.inner.lock().await; + Ok(inner + .storage + .scan() + .await? + .collect::>() + .await) + } } #[derive(Debug)] -pub struct DatabaseInner { +pub struct DatabaseInner { rows: RefCell>>, txs: RefCell>, tx_ids: AtomicU64, clock: Clock, + storage: Storage, } -impl DatabaseInner { +impl + DatabaseInner +{ async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { let mut txs = self.txs.borrow_mut(); let tx = txs @@ -324,6 +360,7 @@ impl DatabaseInner { tx_id } + #[allow(clippy::await_holding_refcell_ref)] async fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { let end_ts = self.get_timestamp(); let mut txs = self.txs.borrow_mut(); @@ -337,17 +374,20 @@ impl DatabaseInner { let mut rows = self.rows.borrow_mut(); tx.state = TransactionState::Preparing; tracing::trace!("PREPARE {tx}"); + let mut mutation: Mutation = Mutation::new(tx_id); for id in &tx.write_set { if let Some(row_versions) = rows.get_mut(id) { for row_version in row_versions.iter_mut() { if let TxTimestampOrID::TxID(id) = row_version.begin { if id == tx_id { row_version.begin = TxTimestampOrID::Timestamp(tx.begin_ts); + mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out } } if let Some(TxTimestampOrID::TxID(id)) = row_version.end { if id == tx_id { row_version.end = Some(TxTimestampOrID::Timestamp(end_ts)); + mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out } } } @@ -362,6 +402,11 @@ impl DatabaseInner { // might have speculatively read a version that we want to remove. // But that's a problem for another day. txs.remove(&tx_id); + drop(rows); + drop(txs); + if !mutation.row_versions.is_empty() { + self.storage.store(mutation).await?; + } Ok(()) } @@ -459,11 +504,20 @@ mod tests { use crate::clock::LocalClock; use tracing_test::traced_test; + fn test_db() -> Database< + LocalClock, + crate::persistent_storage::Noop, + tokio::sync::Mutex>, + > { + let clock = LocalClock::new(); + let storage = crate::persistent_storage::Noop {}; + Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage) + } + #[traced_test] #[tokio::test] async fn test_insert_read() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; let tx1_row = Row { @@ -483,8 +537,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_read_nonexistent() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx = db.begin_tx().await; let row = db.read(tx, 1).await; assert!(row.unwrap().is_none()); @@ -493,8 +546,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_delete() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; let tx1_row = Row { @@ -517,8 +569,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_delete_nonexistent() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx = db.begin_tx().await; assert!(!db.delete(tx, 1).await.unwrap()); } @@ -526,8 +577,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_commit() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; let tx1_row = Row { id: 1, @@ -554,8 +604,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_rollback() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; let row1 = Row { id: 1, @@ -580,8 +629,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_dirty_write() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1, but does not commit. let tx1 = db.begin_tx().await; @@ -608,8 +656,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_dirty_read() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1, but does not commit. let tx1 = db.begin_tx().await; @@ -629,8 +676,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_dirty_read_deleted() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1 and commits. let tx1 = db.begin_tx().await; @@ -654,8 +700,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_fuzzy_read() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1 and commits. let tx1 = db.begin_tx().await; @@ -690,8 +735,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_lost_update() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1 and commits. let tx1 = db.begin_tx().await; @@ -736,8 +780,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_committed_visibility() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // let's add $10 to my account since I like money let tx1 = db.begin_tx().await; @@ -768,8 +811,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_future_row() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; @@ -789,4 +831,73 @@ mod tests { let row = db.read(tx1, 1).await.unwrap(); assert_eq!(row, None); } + + #[traced_test] + #[tokio::test] + async fn test_storage1() { + let clock = LocalClock::new(); + let mut path = std::env::temp_dir(); + path.push(format!( + "mvcc-rs-storage-test-{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(), + )); + let storage = crate::persistent_storage::JsonOnDisk { path }; + let db: Database<_, _, tokio::sync::Mutex<_>> = Database::new(clock, storage); + + let tx1 = db.begin_tx().await; + let tx2 = db.begin_tx().await; + let tx3 = db.begin_tx().await; + + db.insert( + tx3, + Row { + id: 1, + data: "testme".to_string(), + }, + ) + .await + .unwrap(); + + db.commit_tx(tx1).await.unwrap(); + db.rollback_tx(tx2).await; + db.commit_tx(tx3).await.unwrap(); + + let tx4 = db.begin_tx().await; + db.insert( + tx4, + Row { + id: 2, + data: "testme2".to_string(), + }, + ) + .await + .unwrap(); + db.insert( + tx4, + Row { + id: 3, + data: "testme3".to_string(), + }, + ) + .await + .unwrap(); + + let mutation = db + .scan_storage() + .await + .unwrap(); + println!("{:?}", mutation); + + db.commit_tx(tx4).await.unwrap(); + + let mutation = db + .scan_storage() + .await + .unwrap(); + println!("{:?}", mutation); + + } } diff --git a/core/mvcc/database/src/errors.rs b/core/mvcc/database/src/errors.rs index 7bd5bab57..6cdad8ca3 100644 --- a/core/mvcc/database/src/errors.rs +++ b/core/mvcc/database/src/errors.rs @@ -8,4 +8,6 @@ pub enum DatabaseError { WriteWriteConflict, #[error("transaction is terminated")] TxTerminated, + #[error("I/O error: {0}")] + Io(String), } diff --git a/core/mvcc/database/src/lib.rs b/core/mvcc/database/src/lib.rs index a6b1082ba..d88011290 100644 --- a/core/mvcc/database/src/lib.rs +++ b/core/mvcc/database/src/lib.rs @@ -34,4 +34,5 @@ pub mod clock; pub mod database; pub mod errors; +pub mod persistent_storage; pub mod sync; diff --git a/core/mvcc/database/src/persistent_storage.rs b/core/mvcc/database/src/persistent_storage.rs new file mode 100644 index 000000000..6ec92c810 --- /dev/null +++ b/core/mvcc/database/src/persistent_storage.rs @@ -0,0 +1,97 @@ +use crate::database::{Result, Mutation}; +use crate::errors::DatabaseError; + +/// Persistent storage API for storing and retrieving transactions. +/// TODO: final design in heavy progress! +#[async_trait::async_trait] +pub trait Storage { + type Stream: futures::stream::Stream; + + async fn store(&mut self, m: Mutation) -> Result<()>; + async fn scan(&self) -> Result; +} + +pub struct Noop {} + +#[async_trait::async_trait] +impl Storage for Noop { + type Stream = futures::stream::Empty; + + async fn store(&mut self, _m: Mutation) -> Result<()> { + Ok(()) + } + + async fn scan(&self) -> Result { + Ok(futures::stream::empty()) + } +} + +pub struct JsonOnDisk { + pub path: std::path::PathBuf, +} + +impl JsonOnDisk { + pub fn new(path: impl Into) -> Self { + let path = path.into(); + Self { path } + } +} + +#[cfg(feature = "tokio")] +#[pin_project::pin_project] +pub struct JsonOnDiskStream { + #[pin] + inner: tokio_stream::wrappers::LinesStream>, +} + +impl futures::stream::Stream for JsonOnDiskStream { + type Item = Mutation; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.inner + .poll_next(cx) + .map(|x| x.and_then(|x| x.ok().and_then(|x| serde_json::from_str(x.as_str()).ok()))) + } +} + +#[cfg(feature = "tokio")] +#[async_trait::async_trait] +impl Storage for JsonOnDisk { + type Stream = JsonOnDiskStream; + + async fn store(&mut self, m: Mutation) -> Result<()> { + use tokio::io::AsyncWriteExt; + let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.path) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(&t) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(b"\n") + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + Ok(()) + } + + async fn scan(&self) -> Result { + use tokio::io::AsyncBufReadExt; + let file = tokio::fs::OpenOptions::new() + .read(true) + .open(&self.path) + .await + .unwrap(); + Ok(JsonOnDiskStream { + inner: tokio_stream::wrappers::LinesStream::new( + tokio::io::BufReader::new(file).lines(), + ), + }) + } +} diff --git a/core/mvcc/database/tests/concurrency_test.rs b/core/mvcc/database/tests/concurrency_test.rs index b7b22b0e4..0626baab3 100644 --- a/core/mvcc/database/tests/concurrency_test.rs +++ b/core/mvcc/database/tests/concurrency_test.rs @@ -10,7 +10,8 @@ fn test_non_overlapping_concurrent_inserts() { // Two threads insert to the database concurrently using non-overlapping // row IDs. let clock = LocalClock::default(); - let db = Arc::new(Database::>::new(clock)); + let storage = mvcc_rs::persistent_storage::Noop {}; + let db = Arc::new(Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage)); let ids = Arc::new(AtomicU64::new(0)); shuttle::check_random( move || {