From 8c56b381c06d3da2b5d6863d5a306f18b372de3a Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 9 May 2023 21:28:42 +0300 Subject: [PATCH] Rename mutations to log records The Hekaton paper talks about "log records" so let's just run with that terminology to avoid confusion. --- core/mvcc/docs/DESIGN.md | 18 ++++++------ core/mvcc/mvcc-rs/src/database.rs | 32 ++++++++++++--------- core/mvcc/mvcc-rs/src/persistent_storage.rs | 15 +++++----- 3 files changed, 36 insertions(+), 29 deletions(-) diff --git a/core/mvcc/docs/DESIGN.md b/core/mvcc/docs/DESIGN.md index cae0d19d9..37943d992 100644 --- a/core/mvcc/docs/DESIGN.md +++ b/core/mvcc/docs/DESIGN.md @@ -2,18 +2,18 @@ ## Persistent storage -Persistent storage must implement the `Storage` trait that the MVCC module uses to essentially store a write-ahead log (WAL) of mutations. +Persistent storage must implement the `Storage` trait that the MVCC module uses for transaction logging. Figure 1 shows an example of write-ahead log across three transactions. -The first transaction T0 executes a `INSERT (id) VALUES (1)` statement, which results in a mutation with `id` set to `1`, begin timestamp to 0 (which is the transaction ID) and end timestamp as infinity (meaning the row version is still visible). -The second transaction T1 executes another `INSERT` statement, which adds another mutation to the WAL with `id` set to `2`, begin timesstamp to 1 and end timestamp as infinity, similar to what T0 did. -Finally, a third transaction T2 executes two statements: `DELETE WHERE id = 1` and `INSERT (id) VALUES (3)`. The first one results in a mutation with `id` set to `1` and begin timestamp set to 0 (which is the transaction that created the entry). However, the end timestamp is now set to 2 (the current transaction), which means the entry is now deleted. -The second statement results in an entry in the WAL similar to the `INSERT` statements in T0 and T1. +The first transaction T0 executes a `INSERT (id) VALUES (1)` statement, which results in a log record with `id` set to `1`, begin timestamp to 0 (which is the transaction ID) and end timestamp as infinity (meaning the row version is still visible). +The second transaction T1 executes another `INSERT` statement, which adds another log record to the transaction log with `id` set to `2`, begin timesstamp to 1 and end timestamp as infinity, similar to what T0 did. +Finally, a third transaction T2 executes two statements: `DELETE WHERE id = 1` and `INSERT (id) VALUES (3)`. The first one results in a log record with `id` set to `1` and begin timestamp set to 0 (which is the transaction that created the entry). However, the end timestamp is now set to 2 (the current transaction), which means the entry is now deleted. +The second statement results in an entry in the transaction log similar to the `INSERT` statements in T0 and T1. -![Mutations](figures/mutations.png) +![Transactions](figures/transactions.png)

-Figure 1. Write-ahead log of mutations across three transactions. +Figure 1. Transaction log of three transactions.

-When MVCC bootstraps or recovers, it simply reads the write-ahead log into the in-memory index, and it's good to go. -If the WAL grows big, we can compact it by dropping all entries that are no longer visible after the the latest transaction. +When MVCC bootstraps or recovers, it simply redos the transaction log. +If the transaction log grows big, we can checkpoint it it by dropping all entries that are no longer visible after the the latest transaction and create a snapshot. diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs index dccfdcd17..329910237 100644 --- a/core/mvcc/mvcc-rs/src/database.rs +++ b/core/mvcc/mvcc-rs/src/database.rs @@ -24,13 +24,14 @@ pub struct RowVersion { pub type TxID = u64; +/// A log record contains all the versions inserted and deleted by a transaction. #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Mutation { +pub struct LogRecord { tx_id: TxID, row_versions: Vec, } -impl Mutation { +impl LogRecord { fn new(tx_id: TxID) -> Self { Self { tx_id, @@ -271,10 +272,15 @@ impl< } #[cfg(test)] - pub(crate) async fn scan_storage(&self) -> Result> { + pub(crate) async fn scan_storage(&self) -> Result> { use futures::StreamExt; let inner = self.inner.lock().await; - Ok(inner.storage.scan().await?.collect::>().await) + Ok(inner + .storage + .scan() + .await? + .collect::>() + .await) } } @@ -382,20 +388,20 @@ impl let mut rows = self.rows.borrow_mut(); tx.state = TransactionState::Preparing; tracing::trace!("PREPARE {tx}"); - let mut mutation: Mutation = Mutation::new(tx_id); + let mut log_record: LogRecord = LogRecord::new(tx_id); for id in &tx.write_set { if let Some(row_versions) = rows.get_mut(id) { for row_version in row_versions.iter_mut() { if let TxTimestampOrID::TxID(id) = row_version.begin { if id == tx_id { row_version.begin = TxTimestampOrID::Timestamp(tx.begin_ts); - mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out + log_record.row_versions.push(row_version.clone()); // FIXME: optimize cloning out } } if let Some(TxTimestampOrID::TxID(id)) = row_version.end { if id == tx_id { row_version.end = Some(TxTimestampOrID::Timestamp(end_ts)); - mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out + log_record.row_versions.push(row_version.clone()); // FIXME: optimize cloning out } } } @@ -419,8 +425,8 @@ impl txs.remove(&tx_id); drop(rows); drop(txs); - if !mutation.row_versions.is_empty() { - self.storage.store(mutation).await?; + if !log_record.row_versions.is_empty() { + self.storage.log_tx(log_record).await?; } Ok(()) } @@ -949,12 +955,12 @@ mod tests { .await .unwrap(); - let mutation = db.scan_storage().await.unwrap(); - println!("{:?}", mutation); + let log_record = db.scan_storage().await.unwrap(); + println!("{:?}", log_record); db.commit_tx(tx4).await.unwrap(); - let mutation = db.scan_storage().await.unwrap(); - println!("{:?}", mutation); + let log_record = db.scan_storage().await.unwrap(); + println!("{:?}", log_record); } } diff --git a/core/mvcc/mvcc-rs/src/persistent_storage.rs b/core/mvcc/mvcc-rs/src/persistent_storage.rs index 98b1fbd20..380dd2413 100644 --- a/core/mvcc/mvcc-rs/src/persistent_storage.rs +++ b/core/mvcc/mvcc-rs/src/persistent_storage.rs @@ -1,12 +1,13 @@ -use crate::database::{Mutation, Result}; +use crate::database::{LogRecord, Result}; /// Persistent storage API for storing and retrieving transactions. /// TODO: final design in heavy progress! #[async_trait::async_trait] pub trait Storage { - type Stream: futures::stream::Stream; + type Stream: futures::stream::Stream; + + async fn log_tx(&mut self, m: LogRecord) -> Result<()>; - async fn store(&mut self, m: Mutation) -> Result<()>; async fn scan(&self) -> Result; } @@ -14,9 +15,9 @@ pub struct Noop {} #[async_trait::async_trait] impl Storage for Noop { - type Stream = futures::stream::Empty; + type Stream = futures::stream::Empty; - async fn store(&mut self, _m: Mutation) -> Result<()> { + async fn log_tx(&mut self, _m: LogRecord) -> Result<()> { Ok(()) } @@ -45,7 +46,7 @@ pub struct JsonOnDiskStream { #[cfg(feature = "tokio")] impl futures::stream::Stream for JsonOnDiskStream { - type Item = Mutation; + type Item = LogRecord; fn poll_next( self: std::pin::Pin<&mut Self>, @@ -63,7 +64,7 @@ impl futures::stream::Stream for JsonOnDiskStream { impl Storage for JsonOnDisk { type Stream = JsonOnDiskStream; - async fn store(&mut self, m: Mutation) -> Result<()> { + async fn log_tx(&mut self, m: LogRecord) -> Result<()> { use crate::errors::DatabaseError; use tokio::io::AsyncWriteExt; let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;