diff --git a/core/mvcc/docs/DESIGN.md b/core/mvcc/docs/DESIGN.md
index cae0d19d9..37943d992 100644
--- a/core/mvcc/docs/DESIGN.md
+++ b/core/mvcc/docs/DESIGN.md
@@ -2,18 +2,18 @@
## Persistent storage
-Persistent storage must implement the `Storage` trait that the MVCC module uses to essentially store a write-ahead log (WAL) of mutations.
+Persistent storage must implement the `Storage` trait that the MVCC module uses for transaction logging.
Figure 1 shows an example of write-ahead log across three transactions.
-The first transaction T0 executes a `INSERT (id) VALUES (1)` statement, which results in a mutation with `id` set to `1`, begin timestamp to 0 (which is the transaction ID) and end timestamp as infinity (meaning the row version is still visible).
-The second transaction T1 executes another `INSERT` statement, which adds another mutation to the WAL with `id` set to `2`, begin timesstamp to 1 and end timestamp as infinity, similar to what T0 did.
-Finally, a third transaction T2 executes two statements: `DELETE WHERE id = 1` and `INSERT (id) VALUES (3)`. The first one results in a mutation with `id` set to `1` and begin timestamp set to 0 (which is the transaction that created the entry). However, the end timestamp is now set to 2 (the current transaction), which means the entry is now deleted.
-The second statement results in an entry in the WAL similar to the `INSERT` statements in T0 and T1.
+The first transaction T0 executes an `INSERT (id) VALUES (1)` statement, which results in a log record with `id` set to `1`, the begin timestamp set to 0 (which is the transaction ID), and the end timestamp set to infinity (meaning the row version is still visible).
+The second transaction T1 executes another `INSERT` statement, which adds another log record to the transaction log with `id` set to `2`, the begin timestamp set to 1, and the end timestamp set to infinity, similar to what T0 did.
+Finally, a third transaction T2 executes two statements: `DELETE WHERE id = 1` and `INSERT (id) VALUES (3)`. The first one results in a log record with `id` set to `1` and begin timestamp set to 0 (which is the transaction that created the entry). However, the end timestamp is now set to 2 (the current transaction), which means the entry is now deleted.
+The second statement results in an entry in the transaction log similar to the `INSERT` statements in T0 and T1.
-
+
-Figure 1. Write-ahead log of mutations across three transactions.
+Figure 1. Transaction log of three transactions.
-When MVCC bootstraps or recovers, it simply reads the write-ahead log into the in-memory index, and it's good to go.
-If the WAL grows big, we can compact it by dropping all entries that are no longer visible after the the latest transaction.
+When MVCC bootstraps or recovers, it simply redoes the transaction log.
+If the transaction log grows big, we can checkpoint it by dropping all entries that are no longer visible after the latest transaction and creating a snapshot.
diff --git a/core/mvcc/mvcc-rs/src/database.rs b/core/mvcc/mvcc-rs/src/database.rs
index dccfdcd17..329910237 100644
--- a/core/mvcc/mvcc-rs/src/database.rs
+++ b/core/mvcc/mvcc-rs/src/database.rs
@@ -24,13 +24,14 @@ pub struct RowVersion {
pub type TxID = u64;
+/// A log record contains all the versions inserted and deleted by a transaction.
#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct Mutation {
+pub struct LogRecord {
tx_id: TxID,
row_versions: Vec,
}
-impl Mutation {
+impl LogRecord {
fn new(tx_id: TxID) -> Self {
Self {
tx_id,
@@ -271,10 +272,15 @@ impl<
}
#[cfg(test)]
- pub(crate) async fn scan_storage(&self) -> Result> {
+ pub(crate) async fn scan_storage(&self) -> Result> {
use futures::StreamExt;
let inner = self.inner.lock().await;
- Ok(inner.storage.scan().await?.collect::>().await)
+ Ok(inner
+ .storage
+ .scan()
+ .await?
+ .collect::>()
+ .await)
}
}
@@ -382,20 +388,20 @@ impl
let mut rows = self.rows.borrow_mut();
tx.state = TransactionState::Preparing;
tracing::trace!("PREPARE {tx}");
- let mut mutation: Mutation = Mutation::new(tx_id);
+ let mut log_record: LogRecord = LogRecord::new(tx_id);
for id in &tx.write_set {
if let Some(row_versions) = rows.get_mut(id) {
for row_version in row_versions.iter_mut() {
if let TxTimestampOrID::TxID(id) = row_version.begin {
if id == tx_id {
row_version.begin = TxTimestampOrID::Timestamp(tx.begin_ts);
- mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
+ log_record.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
}
}
if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
if id == tx_id {
row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
- mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
+ log_record.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
}
}
}
@@ -419,8 +425,8 @@ impl
txs.remove(&tx_id);
drop(rows);
drop(txs);
- if !mutation.row_versions.is_empty() {
- self.storage.store(mutation).await?;
+ if !log_record.row_versions.is_empty() {
+ self.storage.log_tx(log_record).await?;
}
Ok(())
}
@@ -949,12 +955,12 @@ mod tests {
.await
.unwrap();
- let mutation = db.scan_storage().await.unwrap();
- println!("{:?}", mutation);
+ let log_record = db.scan_storage().await.unwrap();
+ println!("{:?}", log_record);
db.commit_tx(tx4).await.unwrap();
- let mutation = db.scan_storage().await.unwrap();
- println!("{:?}", mutation);
+ let log_record = db.scan_storage().await.unwrap();
+ println!("{:?}", log_record);
}
}
diff --git a/core/mvcc/mvcc-rs/src/persistent_storage.rs b/core/mvcc/mvcc-rs/src/persistent_storage.rs
index 98b1fbd20..380dd2413 100644
--- a/core/mvcc/mvcc-rs/src/persistent_storage.rs
+++ b/core/mvcc/mvcc-rs/src/persistent_storage.rs
@@ -1,12 +1,13 @@
-use crate::database::{Mutation, Result};
+use crate::database::{LogRecord, Result};
/// Persistent storage API for storing and retrieving transactions.
/// TODO: final design in heavy progress!
#[async_trait::async_trait]
pub trait Storage {
- type Stream: futures::stream::Stream- ;
+ type Stream: futures::stream::Stream
- ;
+
+ async fn log_tx(&mut self, m: LogRecord) -> Result<()>;
- async fn store(&mut self, m: Mutation) -> Result<()>;
async fn scan(&self) -> Result;
}
@@ -14,9 +15,9 @@ pub struct Noop {}
#[async_trait::async_trait]
impl Storage for Noop {
- type Stream = futures::stream::Empty;
+ type Stream = futures::stream::Empty;
- async fn store(&mut self, _m: Mutation) -> Result<()> {
+ async fn log_tx(&mut self, _m: LogRecord) -> Result<()> {
Ok(())
}
@@ -45,7 +46,7 @@ pub struct JsonOnDiskStream {
#[cfg(feature = "tokio")]
impl futures::stream::Stream for JsonOnDiskStream {
- type Item = Mutation;
+ type Item = LogRecord;
fn poll_next(
self: std::pin::Pin<&mut Self>,
@@ -63,7 +64,7 @@ impl futures::stream::Stream for JsonOnDiskStream {
impl Storage for JsonOnDisk {
type Stream = JsonOnDiskStream;
- async fn store(&mut self, m: Mutation) -> Result<()> {
+ async fn log_tx(&mut self, m: LogRecord) -> Result<()> {
use crate::errors::DatabaseError;
use tokio::io::AsyncWriteExt;
let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;