diff --git a/core/mvcc/mvcc-rs/src/clock.rs b/core/mvcc/mvcc-rs/src/clock.rs index e6ef0dfc4..7bab1fe5d 100644 --- a/core/mvcc/mvcc-rs/src/clock.rs +++ b/core/mvcc/mvcc-rs/src/clock.rs @@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; /// Logical clock. pub trait LogicalClock { fn get_timestamp(&self) -> u64; + fn reset(&self, ts: u64); } /// A node-local clock backed by an atomic counter. @@ -23,4 +24,8 @@ impl LogicalClock for LocalClock { fn get_timestamp(&self) -> u64 { self.ts_sequence.fetch_add(1, Ordering::SeqCst) } + + fn reset(&self, ts: u64) { + self.ts_sequence.store(ts, Ordering::SeqCst); + } } diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs index 329910237..42d0d1588 100644 --- a/core/mvcc/mvcc-rs/src/database.rs +++ b/core/mvcc/mvcc-rs/src/database.rs @@ -27,14 +27,14 @@ pub type TxID = u64; /// A log record contains all the versions inserted and deleted by a transaction. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LogRecord { - tx_id: TxID, + tx_timestamp: TxID, row_versions: Vec, } impl LogRecord { - fn new(tx_id: TxID) -> Self { + fn new(tx_timestamp: TxID) -> Self { Self { - tx_id, + tx_timestamp, row_versions: Vec::new(), } } @@ -271,16 +271,9 @@ impl< inner.drop_unused_row_versions(); } - #[cfg(test)] - pub(crate) async fn scan_storage(&self) -> Result> { - use futures::StreamExt; + pub async fn recover(&self) -> Result<()> { let inner = self.inner.lock().await; - Ok(inner - .storage - .scan() - .await? - .collect::>() - .await) + inner.recover().await } } @@ -388,7 +381,7 @@ impl let mut rows = self.rows.borrow_mut(); tx.state = TransactionState::Preparing; tracing::trace!("PREPARE {tx}"); - let mut log_record: LogRecord = LogRecord::new(tx_id); + let mut log_record: LogRecord = LogRecord::new(end_ts); for id in &tx.write_set { if let Some(row_versions) = rows.get_mut(id) { for row_version in row_versions.iter_mut() { @@ -505,6 +498,26 @@ impl rows.remove(&id); } } + + pub async fn recover(&self) -> Result<()> { + use futures::StreamExt; + let tx_log = self + .storage + .read_tx_log() + .await? + .collect::>() + .await; + for record in tx_log { + println!("RECOVERING {:?}", record); + for version in record.row_versions { + let mut rows = self.rows.borrow_mut(); + let row_versions = rows.entry(version.row.id).or_insert_with(Vec::new); + row_versions.push(version); + } + self.clock.reset(record.tx_timestamp); + } + Ok(()) + } } /// A write-write conflict happens when transaction T_m attempts to update a @@ -914,7 +927,7 @@ mod tests { .unwrap() .as_nanos(), )); - let storage = crate::persistent_storage::JsonOnDisk { path }; + let storage = crate::persistent_storage::JsonOnDisk { path: path.clone() }; let db: Database<_, _, tokio::sync::Mutex<_>> = Database::new(clock, storage); let tx1 = db.begin_tx().await; @@ -955,12 +968,21 @@ mod tests { .await .unwrap(); - let log_record = db.scan_storage().await.unwrap(); - println!("{:?}", log_record); - + assert_eq!(db.read(tx4, 1).await.unwrap().unwrap().data, "testme"); + assert_eq!(db.read(tx4, 2).await.unwrap().unwrap().data, "testme2"); + assert_eq!(db.read(tx4, 3).await.unwrap().unwrap().data, "testme3"); db.commit_tx(tx4).await.unwrap(); - let log_record = db.scan_storage().await.unwrap(); - println!("{:?}", log_record); + let clock = LocalClock::new(); + let storage = crate::persistent_storage::JsonOnDisk { path }; + let db: Database<_, _, tokio::sync::Mutex<_>> = Database::new(clock, storage); + db.recover().await.unwrap(); + println!("{:#?}", db); + + let tx5 = db.begin_tx().await; + println!("{:#?}", db.read(tx5, 1).await); + assert_eq!(db.read(tx5, 1).await.unwrap().unwrap().data, "testme"); + assert_eq!(db.read(tx5, 2).await.unwrap().unwrap().data, "testme2"); + assert_eq!(db.read(tx5, 3).await.unwrap().unwrap().data, "testme3"); } } diff --git a/core/mvcc/mvcc-rs/src/persistent_storage.rs b/core/mvcc/mvcc-rs/src/persistent_storage.rs index 380dd2413..2277e4d2c 100644 --- a/core/mvcc/mvcc-rs/src/persistent_storage.rs +++ b/core/mvcc/mvcc-rs/src/persistent_storage.rs @@ -6,9 +6,11 @@ use crate::database::{LogRecord, Result}; pub trait Storage { type Stream: futures::stream::Stream; + /// Append a transaction in the transaction log. async fn log_tx(&mut self, m: LogRecord) -> Result<()>; - async fn scan(&self) -> Result; + /// Read the transaction log for replay. + async fn read_tx_log(&self) -> Result; } pub struct Noop {} @@ -21,11 +23,12 @@ impl Storage for Noop { Ok(()) } - async fn scan(&self) -> Result { + async fn read_tx_log(&self) -> Result { Ok(futures::stream::empty()) } } +#[derive(Debug)] pub struct JsonOnDisk { pub path: std::path::PathBuf, } @@ -83,7 +86,7 @@ impl Storage for JsonOnDisk { Ok(()) } - async fn scan(&self) -> Result { + async fn read_tx_log(&self) -> Result { use tokio::io::AsyncBufReadExt; let file = tokio::fs::OpenOptions::new() .read(true)