Merge pull request #34 from penberg/log-records

Rename mutations to log records
Pekka Enberg
2023-05-10 12:18:50 +03:00
committed by GitHub
3 changed files with 36 additions and 29 deletions

View File

@@ -2,18 +2,18 @@
## Persistent storage
Persistent storage must implement the `Storage` trait that the MVCC module uses to essentially store a write-ahead log (WAL) of mutations.
Persistent storage must implement the `Storage` trait that the MVCC module uses for transaction logging.
Figure 1 shows an example of a write-ahead log across three transactions.
The first transaction T0 executes an `INSERT (id) VALUES (1)` statement, which results in a mutation with `id` set to `1`, begin timestamp set to 0 (which is the transaction ID), and end timestamp set to infinity (meaning the row version is still visible).
The second transaction T1 executes another `INSERT` statement, which adds another mutation to the WAL with `id` set to `2`, begin timestamp set to 1, and end timestamp set to infinity, similar to what T0 did.
Finally, a third transaction T2 executes two statements: `DELETE WHERE id = 1` and `INSERT (id) VALUES (3)`. The first one results in a mutation with `id` set to `1` and begin timestamp set to 0 (which is the transaction that created the entry). However, the end timestamp is now set to 2 (the current transaction), which means the entry is now deleted.
The second statement results in an entry in the WAL similar to the `INSERT` statements in T0 and T1.
The first transaction T0 executes an `INSERT (id) VALUES (1)` statement, which results in a log record with `id` set to `1`, begin timestamp set to 0 (which is the transaction ID), and end timestamp set to infinity (meaning the row version is still visible).
The second transaction T1 executes another `INSERT` statement, which adds another log record to the transaction log with `id` set to `2`, begin timestamp set to 1, and end timestamp set to infinity, similar to what T0 did.
Finally, a third transaction T2 executes two statements: `DELETE WHERE id = 1` and `INSERT (id) VALUES (3)`. The first one results in a log record with `id` set to `1` and begin timestamp set to 0 (which is the transaction that created the entry). However, the end timestamp is now set to 2 (the current transaction), which means the entry is now deleted.
The second statement results in an entry in the transaction log similar to the `INSERT` statements in T0 and T1.
![Mutations](figures/mutations.png)
![Transactions](figures/transactions.png)
<p align="center">
Figure 1. Write-ahead log of mutations across three transactions.
Figure 1. Transaction log of three transactions.
</p>
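
To make the timestamps concrete, here is a rough sketch of what the log from Figure 1 could look like as Rust values. The `RowVersion` and `LogRecord` shapes below are simplified stand-ins for illustration (the real types carry the row data and use `TxTimestampOrID`); only the begin/end timestamps from the example are meant to match.

```rust
// Simplified stand-ins, not the module's actual types.
#[derive(Debug)]
struct RowVersion {
    id: u64,
    begin: u64,       // commit timestamp of the creating transaction
    end: Option<u64>, // None = infinity, i.e. the version is still visible
}

#[derive(Debug)]
struct LogRecord {
    tx_id: u64,
    row_versions: Vec<RowVersion>,
}

fn main() {
    let log = vec![
        // T0: INSERT (id) VALUES (1) -> row 1 created at timestamp 0, visible forever.
        LogRecord { tx_id: 0, row_versions: vec![RowVersion { id: 1, begin: 0, end: None }] },
        // T1: INSERT (id) VALUES (2) -> row 2 created at timestamp 1, visible forever.
        LogRecord { tx_id: 1, row_versions: vec![RowVersion { id: 2, begin: 1, end: None }] },
        // T2: DELETE WHERE id = 1 ends row 1 at timestamp 2,
        //     INSERT (id) VALUES (3) creates row 3 at timestamp 2.
        LogRecord {
            tx_id: 2,
            row_versions: vec![
                RowVersion { id: 1, begin: 0, end: Some(2) },
                RowVersion { id: 3, begin: 2, end: None },
            ],
        },
    ];
    println!("{log:#?}");
}
```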
When MVCC bootstraps or recovers, it simply reads the write-ahead log into the in-memory index, and it's good to go.
If the WAL grows big, we can compact it by dropping all entries that are no longer visible after the latest transaction.
When MVCC bootstraps or recovers, it simply replays the transaction log into the in-memory index.
If the transaction log grows large, we can checkpoint it by dropping all entries that are no longer visible after the latest transaction and writing the result out as a snapshot.
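
Recovery and checkpointing can be sketched over the same simplified record shape as above. This is an illustration only, not the module's code, and it follows the simplification in the text: recovery replays every record, and a checkpoint keeps only versions whose end timestamp is still infinity.

```rust
use std::collections::HashMap;

// Same simplified stand-ins as in the previous sketch.
#[derive(Clone, Debug)]
struct RowVersion { id: u64, begin: u64, end: Option<u64> }

#[derive(Clone, Debug)]
struct LogRecord { tx_id: u64, row_versions: Vec<RowVersion> }

/// Recovery: replay the log in commit order. For simplicity this keeps only the
/// latest version per row id; the real in-memory index keeps version chains.
fn recover(log: &[LogRecord]) -> HashMap<u64, RowVersion> {
    let mut index = HashMap::new();
    for record in log {
        for version in &record.row_versions {
            index.insert(version.id, version.clone());
        }
    }
    index
}

/// Checkpoint: drop entries that are no longer visible after the latest
/// transaction, keeping only versions whose end timestamp is still infinity.
fn checkpoint(log: &[LogRecord]) -> Vec<RowVersion> {
    recover(log).into_values().filter(|v| v.end.is_none()).collect()
}

fn main() {
    let log: Vec<LogRecord> = Vec::new(); // e.g. read back from persistent storage
    println!("index: {:?}", recover(&log));
    println!("snapshot: {:?}", checkpoint(&log));
}
```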

View File

@@ -24,13 +24,14 @@ pub struct RowVersion {
pub type TxID = u64;
/// A log record contains all the versions inserted and deleted by a transaction.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Mutation {
pub struct LogRecord {
tx_id: TxID,
row_versions: Vec<RowVersion>,
}
impl Mutation {
impl LogRecord {
fn new(tx_id: TxID) -> Self {
Self {
tx_id,
@@ -271,10 +272,15 @@ impl<
}
#[cfg(test)]
pub(crate) async fn scan_storage(&self) -> Result<Vec<Mutation>> {
pub(crate) async fn scan_storage(&self) -> Result<Vec<LogRecord>> {
use futures::StreamExt;
let inner = self.inner.lock().await;
Ok(inner.storage.scan().await?.collect::<Vec<Mutation>>().await)
Ok(inner
.storage
.scan()
.await?
.collect::<Vec<LogRecord>>()
.await)
}
}
@@ -382,20 +388,20 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
let mut rows = self.rows.borrow_mut();
tx.state = TransactionState::Preparing;
tracing::trace!("PREPARE {tx}");
let mut mutation: Mutation = Mutation::new(tx_id);
let mut log_record: LogRecord = LogRecord::new(tx_id);
for id in &tx.write_set {
if let Some(row_versions) = rows.get_mut(id) {
for row_version in row_versions.iter_mut() {
if let TxTimestampOrID::TxID(id) = row_version.begin {
if id == tx_id {
row_version.begin = TxTimestampOrID::Timestamp(tx.begin_ts);
mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
log_record.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
}
}
if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
if id == tx_id {
row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
log_record.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
}
}
}
@@ -419,8 +425,8 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
txs.remove(&tx_id);
drop(rows);
drop(txs);
if !mutation.row_versions.is_empty() {
self.storage.store(mutation).await?;
if !log_record.row_versions.is_empty() {
self.storage.log_tx(log_record).await?;
}
Ok(())
}
@@ -949,12 +955,12 @@ mod tests {
.await
.unwrap();
let mutation = db.scan_storage().await.unwrap();
println!("{:?}", mutation);
let log_record = db.scan_storage().await.unwrap();
println!("{:?}", log_record);
db.commit_tx(tx4).await.unwrap();
let mutation = db.scan_storage().await.unwrap();
println!("{:?}", mutation);
let log_record = db.scan_storage().await.unwrap();
println!("{:?}", log_record);
}
}

View File

@@ -1,12 +1,13 @@
use crate::database::{Mutation, Result};
use crate::database::{LogRecord, Result};
/// Persistent storage API for storing and retrieving transactions.
/// TODO: final design in heavy progress!
#[async_trait::async_trait]
pub trait Storage {
type Stream: futures::stream::Stream<Item = Mutation>;
type Stream: futures::stream::Stream<Item = LogRecord>;
async fn log_tx(&mut self, m: LogRecord) -> Result<()>;
async fn store(&mut self, m: Mutation) -> Result<()>;
async fn scan(&self) -> Result<Self::Stream>;
}
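
As a sketch of how small a `Storage` backend can be, here is a hypothetical in-memory implementation. It is not part of this pull request, the name `InMemory` is made up, and it assumes it lives alongside the trait in the persistent storage module so that `Storage` is in scope.

```rust
use crate::database::{LogRecord, Result};

/// Hypothetical in-memory backend: keeps log records in a Vec, so everything
/// is lost on drop. Useful mainly for tests.
pub struct InMemory {
    records: Vec<LogRecord>,
}

#[async_trait::async_trait]
impl Storage for InMemory {
    // futures::stream::iter over a cloned Vec satisfies the Stream bound,
    // since its item type is LogRecord.
    type Stream = futures::stream::Iter<std::vec::IntoIter<LogRecord>>;

    async fn log_tx(&mut self, m: LogRecord) -> Result<()> {
        // Appending is the in-memory equivalent of writing the record to the log.
        self.records.push(m);
        Ok(())
    }

    async fn scan(&self) -> Result<Self::Stream> {
        // Replay everything logged so far, in commit order.
        Ok(futures::stream::iter(self.records.clone()))
    }
}
```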
@@ -14,9 +15,9 @@ pub struct Noop {}
#[async_trait::async_trait]
impl Storage for Noop {
type Stream = futures::stream::Empty<Mutation>;
type Stream = futures::stream::Empty<LogRecord>;
async fn store(&mut self, _m: Mutation) -> Result<()> {
async fn log_tx(&mut self, _m: LogRecord) -> Result<()> {
Ok(())
}
@@ -45,7 +46,7 @@ pub struct JsonOnDiskStream {
#[cfg(feature = "tokio")]
impl futures::stream::Stream for JsonOnDiskStream {
type Item = Mutation;
type Item = LogRecord;
fn poll_next(
self: std::pin::Pin<&mut Self>,
@@ -63,7 +64,7 @@ impl futures::stream::Stream for JsonOnDiskStream {
impl Storage for JsonOnDisk {
type Stream = JsonOnDiskStream;
async fn store(&mut self, m: Mutation) -> Result<()> {
async fn log_tx(&mut self, m: LogRecord) -> Result<()> {
use crate::errors::DatabaseError;
use tokio::io::AsyncWriteExt;
let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;