From db71c7e4e38cea013dc5b9d417a8794923697ab2 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 17 Apr 2023 10:45:25 +0200 Subject: [PATCH 1/3] database: make transactions (de)serializable --- core/mvcc/database/src/database.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/mvcc/database/src/database.rs b/core/mvcc/database/src/database.rs index c601e7a38..0e4616f5d 100644 --- a/core/mvcc/database/src/database.rs +++ b/core/mvcc/database/src/database.rs @@ -1,20 +1,21 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; +use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -type Result = std::result::Result; +pub type Result = std::result::Result; -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Row { pub id: u64, pub data: String, } /// A row version. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] struct RowVersion { begin: TxTimestampOrID, end: Option, @@ -29,14 +30,14 @@ type TxID = u64; /// phase of the transaction. During the active phase, new versions track the /// transaction ID in the `begin` and `end` fields. After a transaction commits, /// versions switch to tracking timestamps. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] enum TxTimestampOrID { Timestamp(u64), TxID(TxID), } /// Transaction -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Transaction { /// The state of the transaction. state: TransactionState, @@ -89,7 +90,7 @@ impl std::fmt::Display for Transaction { } /// Transaction state. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] enum TransactionState { Active, Preparing, From 7ca68b3d9691ae163d05bf71aba7a01030a6ad50 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 17 Apr 2023 12:06:34 +0200 Subject: [PATCH 2/3] errors: Add I/O error class --- core/mvcc/database/src/errors.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/mvcc/database/src/errors.rs b/core/mvcc/database/src/errors.rs index 7bd5bab57..6cdad8ca3 100644 --- a/core/mvcc/database/src/errors.rs +++ b/core/mvcc/database/src/errors.rs @@ -8,4 +8,6 @@ pub enum DatabaseError { WriteWriteConflict, #[error("transaction is terminated")] TxTerminated, + #[error("I/O error: {0}")] + Io(String), } From 04a78f73fb7a40d816a73aeb97a493569b85ea47 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 17 Apr 2023 12:52:42 +0200 Subject: [PATCH 3/3] treewide: add persistent storage trait This draft adds a persistent storage trait that can be used to store transaction logs and read the log for recovery purposes. Work in heavy progress, because ideally the design should also allow reading versions from the storage, so that data can be spilled from memory to disk if there's not enough RAM available. --- core/mvcc/database/Cargo.toml | 6 +- core/mvcc/database/benches/my_benchmark.rs | 33 ++-- core/mvcc/database/src/database.rs | 178 +++++++++++++++---- core/mvcc/database/src/lib.rs | 1 + core/mvcc/database/src/persistent_storage.rs | 97 ++++++++++ core/mvcc/database/tests/concurrency_test.rs | 3 +- 6 files changed, 268 insertions(+), 50 deletions(-) create mode 100644 core/mvcc/database/src/persistent_storage.rs diff --git a/core/mvcc/database/Cargo.toml b/core/mvcc/database/Cargo.toml index 36809c922..a39fb4069 100644 --- a/core/mvcc/database/Cargo.toml +++ b/core/mvcc/database/Cargo.toml @@ -10,6 +10,10 @@ futures = "0.3.28" thiserror = "1.0.40" tracing = "0.1.37" tokio = { version = "1.27.0", features = ["full"], optional = true } +tokio-stream = { version = "0.1.12", optional = true, features = ["io-util"] } +serde = { version = "1.0.160", features = ["derive"] } +serde_json = "1.0.96" +pin-project = "1.0.12" [dev-dependencies] criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] } @@ -27,4 +31,4 @@ harness = false [features] default = [] full = ["tokio"] -tokio = ["dep:tokio"] +tokio = ["dep:tokio", "dep:tokio-stream"] diff --git a/core/mvcc/database/benches/my_benchmark.rs b/core/mvcc/database/benches/my_benchmark.rs index 1515b7579..3c360107a 100644 --- a/core/mvcc/database/benches/my_benchmark.rs +++ b/core/mvcc/database/benches/my_benchmark.rs @@ -4,20 +4,30 @@ use mvcc_rs::clock::LocalClock; use mvcc_rs::database::{Database, Row}; use pprof::criterion::{Output, PProfProfiler}; +fn bench_db() -> Database< + LocalClock, + mvcc_rs::persistent_storage::Noop, + tokio::sync::Mutex< + mvcc_rs::database::DatabaseInner, + >, +> { + let clock = LocalClock::default(); + let storage = mvcc_rs::persistent_storage::Noop {}; + Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage) +} + fn bench(c: &mut Criterion) { let mut group = c.benchmark_group("mvcc-ops-throughput"); group.throughput(Throughput::Elements(1)); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { db.begin_tx().await; }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx + rollback_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let tx_id = db.begin_tx().await; @@ -25,8 +35,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx + commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let tx_id = db.begin_tx().await; @@ -34,8 +43,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx-read-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let tx_id = db.begin_tx().await; @@ -44,8 +52,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); group.bench_function("begin_tx-update-commit_tx", |b| { b.to_async(FuturesExecutor).iter(|| async { let tx_id = db.begin_tx().await; @@ -62,8 +69,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); let tx = futures::executor::block_on(db.begin_tx()); futures::executor::block_on(db.insert( tx, @@ -79,8 +85,7 @@ fn bench(c: &mut Criterion) { }) }); - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = bench_db(); let tx = futures::executor::block_on(db.begin_tx()); futures::executor::block_on(db.insert( tx, diff --git a/core/mvcc/database/src/database.rs b/core/mvcc/database/src/database.rs index 0e4616f5d..98cbefcea 100644 --- a/core/mvcc/database/src/database.rs +++ b/core/mvcc/database/src/database.rs @@ -16,13 +16,28 @@ pub struct Row { /// A row version. #[derive(Clone, Debug, Serialize, Deserialize)] -struct RowVersion { +pub struct RowVersion { begin: TxTimestampOrID, end: Option, row: Row, } -type TxID = u64; +pub type TxID = u64; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Mutation { + tx_id: TxID, + row_versions: Vec, +} + +impl Mutation { + fn new(tx_id: TxID) -> Self { + Self { + tx_id, + row_versions: Vec::new(), + } + } +} /// A transaction timestamp or ID. /// @@ -103,21 +118,26 @@ enum TransactionState { #[derive(Debug)] pub struct Database< Clock: LogicalClock, - AsyncMutex: crate::sync::AsyncMutex>, + Storage: crate::persistent_storage::Storage, + AsyncMutex: crate::sync::AsyncMutex>, > { inner: Arc, } -impl>> - Database +impl< + Clock: LogicalClock, + Storage: crate::persistent_storage::Storage, + AsyncMutex: crate::sync::AsyncMutex>, + > Database { /// Creates a new database. - pub fn new(clock: Clock) -> Self { + pub fn new(clock: Clock, storage: Storage) -> Self { let inner = DatabaseInner { rows: RefCell::new(HashMap::new()), txs: RefCell::new(HashMap::new()), tx_ids: AtomicU64::new(0), clock, + storage, }; Self { inner: Arc::new(AsyncMutex::new(inner)), @@ -239,17 +259,32 @@ impl Result> { + use futures::StreamExt; + let inner = self.inner.lock().await; + Ok(inner + .storage + .scan() + .await? + .collect::>() + .await) + } } #[derive(Debug)] -pub struct DatabaseInner { +pub struct DatabaseInner { rows: RefCell>>, txs: RefCell>, tx_ids: AtomicU64, clock: Clock, + storage: Storage, } -impl DatabaseInner { +impl + DatabaseInner +{ async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { let mut txs = self.txs.borrow_mut(); let tx = txs @@ -325,6 +360,7 @@ impl DatabaseInner { tx_id } + #[allow(clippy::await_holding_refcell_ref)] async fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { let end_ts = self.get_timestamp(); let mut txs = self.txs.borrow_mut(); @@ -338,17 +374,20 @@ impl DatabaseInner { let mut rows = self.rows.borrow_mut(); tx.state = TransactionState::Preparing; tracing::trace!("PREPARE {tx}"); + let mut mutation: Mutation = Mutation::new(tx_id); for id in &tx.write_set { if let Some(row_versions) = rows.get_mut(id) { for row_version in row_versions.iter_mut() { if let TxTimestampOrID::TxID(id) = row_version.begin { if id == tx_id { row_version.begin = TxTimestampOrID::Timestamp(tx.begin_ts); + mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out } } if let Some(TxTimestampOrID::TxID(id)) = row_version.end { if id == tx_id { row_version.end = Some(TxTimestampOrID::Timestamp(end_ts)); + mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out } } } @@ -363,6 +402,11 @@ impl DatabaseInner { // might have speculatively read a version that we want to remove. // But that's a problem for another day. txs.remove(&tx_id); + drop(rows); + drop(txs); + if !mutation.row_versions.is_empty() { + self.storage.store(mutation).await?; + } Ok(()) } @@ -460,11 +504,20 @@ mod tests { use crate::clock::LocalClock; use tracing_test::traced_test; + fn test_db() -> Database< + LocalClock, + crate::persistent_storage::Noop, + tokio::sync::Mutex>, + > { + let clock = LocalClock::new(); + let storage = crate::persistent_storage::Noop {}; + Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage) + } + #[traced_test] #[tokio::test] async fn test_insert_read() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; let tx1_row = Row { @@ -484,8 +537,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_read_nonexistent() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx = db.begin_tx().await; let row = db.read(tx, 1).await; assert!(row.unwrap().is_none()); @@ -494,8 +546,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_delete() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; let tx1_row = Row { @@ -518,8 +569,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_delete_nonexistent() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx = db.begin_tx().await; assert!(!db.delete(tx, 1).await.unwrap()); } @@ -527,8 +577,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_commit() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; let tx1_row = Row { id: 1, @@ -555,8 +604,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_rollback() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; let row1 = Row { id: 1, @@ -581,8 +629,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_dirty_write() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1, but does not commit. let tx1 = db.begin_tx().await; @@ -609,8 +656,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_dirty_read() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1, but does not commit. let tx1 = db.begin_tx().await; @@ -630,8 +676,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_dirty_read_deleted() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1 and commits. let tx1 = db.begin_tx().await; @@ -655,8 +700,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_fuzzy_read() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1 and commits. let tx1 = db.begin_tx().await; @@ -691,8 +735,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_lost_update() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // T1 inserts a row with ID 1 and commits. let tx1 = db.begin_tx().await; @@ -737,8 +780,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_committed_visibility() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); // let's add $10 to my account since I like money let tx1 = db.begin_tx().await; @@ -769,8 +811,7 @@ mod tests { #[traced_test] #[tokio::test] async fn test_future_row() { - let clock = LocalClock::default(); - let db = Database::>::new(clock); + let db = test_db(); let tx1 = db.begin_tx().await; @@ -790,4 +831,73 @@ mod tests { let row = db.read(tx1, 1).await.unwrap(); assert_eq!(row, None); } + + #[traced_test] + #[tokio::test] + async fn test_storage1() { + let clock = LocalClock::new(); + let mut path = std::env::temp_dir(); + path.push(format!( + "mvcc-rs-storage-test-{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(), + )); + let storage = crate::persistent_storage::JsonOnDisk { path }; + let db: Database<_, _, tokio::sync::Mutex<_>> = Database::new(clock, storage); + + let tx1 = db.begin_tx().await; + let tx2 = db.begin_tx().await; + let tx3 = db.begin_tx().await; + + db.insert( + tx3, + Row { + id: 1, + data: "testme".to_string(), + }, + ) + .await + .unwrap(); + + db.commit_tx(tx1).await.unwrap(); + db.rollback_tx(tx2).await; + db.commit_tx(tx3).await.unwrap(); + + let tx4 = db.begin_tx().await; + db.insert( + tx4, + Row { + id: 2, + data: "testme2".to_string(), + }, + ) + .await + .unwrap(); + db.insert( + tx4, + Row { + id: 3, + data: "testme3".to_string(), + }, + ) + .await + .unwrap(); + + let mutation = db + .scan_storage() + .await + .unwrap(); + println!("{:?}", mutation); + + db.commit_tx(tx4).await.unwrap(); + + let mutation = db + .scan_storage() + .await + .unwrap(); + println!("{:?}", mutation); + + } } diff --git a/core/mvcc/database/src/lib.rs b/core/mvcc/database/src/lib.rs index a6b1082ba..d88011290 100644 --- a/core/mvcc/database/src/lib.rs +++ b/core/mvcc/database/src/lib.rs @@ -34,4 +34,5 @@ pub mod clock; pub mod database; pub mod errors; +pub mod persistent_storage; pub mod sync; diff --git a/core/mvcc/database/src/persistent_storage.rs b/core/mvcc/database/src/persistent_storage.rs new file mode 100644 index 000000000..6ec92c810 --- /dev/null +++ b/core/mvcc/database/src/persistent_storage.rs @@ -0,0 +1,97 @@ +use crate::database::{Result, Mutation}; +use crate::errors::DatabaseError; + +/// Persistent storage API for storing and retrieving transactions. +/// TODO: final design in heavy progress! +#[async_trait::async_trait] +pub trait Storage { + type Stream: futures::stream::Stream; + + async fn store(&mut self, m: Mutation) -> Result<()>; + async fn scan(&self) -> Result; +} + +pub struct Noop {} + +#[async_trait::async_trait] +impl Storage for Noop { + type Stream = futures::stream::Empty; + + async fn store(&mut self, _m: Mutation) -> Result<()> { + Ok(()) + } + + async fn scan(&self) -> Result { + Ok(futures::stream::empty()) + } +} + +pub struct JsonOnDisk { + pub path: std::path::PathBuf, +} + +impl JsonOnDisk { + pub fn new(path: impl Into) -> Self { + let path = path.into(); + Self { path } + } +} + +#[cfg(feature = "tokio")] +#[pin_project::pin_project] +pub struct JsonOnDiskStream { + #[pin] + inner: tokio_stream::wrappers::LinesStream>, +} + +impl futures::stream::Stream for JsonOnDiskStream { + type Item = Mutation; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.inner + .poll_next(cx) + .map(|x| x.and_then(|x| x.ok().and_then(|x| serde_json::from_str(x.as_str()).ok()))) + } +} + +#[cfg(feature = "tokio")] +#[async_trait::async_trait] +impl Storage for JsonOnDisk { + type Stream = JsonOnDiskStream; + + async fn store(&mut self, m: Mutation) -> Result<()> { + use tokio::io::AsyncWriteExt; + let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?; + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.path) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(&t) + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + file.write_all(b"\n") + .await + .map_err(|e| DatabaseError::Io(e.to_string()))?; + Ok(()) + } + + async fn scan(&self) -> Result { + use tokio::io::AsyncBufReadExt; + let file = tokio::fs::OpenOptions::new() + .read(true) + .open(&self.path) + .await + .unwrap(); + Ok(JsonOnDiskStream { + inner: tokio_stream::wrappers::LinesStream::new( + tokio::io::BufReader::new(file).lines(), + ), + }) + } +} diff --git a/core/mvcc/database/tests/concurrency_test.rs b/core/mvcc/database/tests/concurrency_test.rs index b7b22b0e4..0626baab3 100644 --- a/core/mvcc/database/tests/concurrency_test.rs +++ b/core/mvcc/database/tests/concurrency_test.rs @@ -10,7 +10,8 @@ fn test_non_overlapping_concurrent_inserts() { // Two threads insert to the database concurrently using non-overlapping // row IDs. let clock = LocalClock::default(); - let db = Arc::new(Database::>::new(clock)); + let storage = mvcc_rs::persistent_storage::Noop {}; + let db = Arc::new(Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage)); let ids = Arc::new(AtomicU64::new(0)); shuttle::check_random( move || {