Merge pull request #35 from penberg/recovery

Initial support for recovery
This commit is contained in:
Pekka Enberg
2023-05-10 13:28:16 +03:00
committed by GitHub
3 changed files with 52 additions and 22 deletions

View File

@@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
/// Logical clock.
pub trait LogicalClock {
fn get_timestamp(&self) -> u64;
fn reset(&self, ts: u64);
}
/// A node-local clock backed by an atomic counter.
@@ -23,4 +24,8 @@ impl LogicalClock for LocalClock {
fn get_timestamp(&self) -> u64 {
self.ts_sequence.fetch_add(1, Ordering::SeqCst)
}
fn reset(&self, ts: u64) {
self.ts_sequence.store(ts, Ordering::SeqCst);
}
}

View File

@@ -27,14 +27,14 @@ pub type TxID = u64;
/// A log record contains all the versions inserted and deleted by a transaction.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogRecord {
tx_id: TxID,
tx_timestamp: TxID,
row_versions: Vec<RowVersion>,
}
impl LogRecord {
fn new(tx_id: TxID) -> Self {
fn new(tx_timestamp: TxID) -> Self {
Self {
tx_id,
tx_timestamp,
row_versions: Vec::new(),
}
}
@@ -271,16 +271,9 @@ impl<
inner.drop_unused_row_versions();
}
#[cfg(test)]
pub(crate) async fn scan_storage(&self) -> Result<Vec<LogRecord>> {
use futures::StreamExt;
pub async fn recover(&self) -> Result<()> {
let inner = self.inner.lock().await;
Ok(inner
.storage
.scan()
.await?
.collect::<Vec<LogRecord>>()
.await)
inner.recover().await
}
}
@@ -388,7 +381,7 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
let mut rows = self.rows.borrow_mut();
tx.state = TransactionState::Preparing;
tracing::trace!("PREPARE {tx}");
let mut log_record: LogRecord = LogRecord::new(tx_id);
let mut log_record: LogRecord = LogRecord::new(end_ts);
for id in &tx.write_set {
if let Some(row_versions) = rows.get_mut(id) {
for row_version in row_versions.iter_mut() {
@@ -505,6 +498,26 @@ impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
rows.remove(&id);
}
}
pub async fn recover(&self) -> Result<()> {
use futures::StreamExt;
let tx_log = self
.storage
.read_tx_log()
.await?
.collect::<Vec<LogRecord>>()
.await;
for record in tx_log {
println!("RECOVERING {:?}", record);
for version in record.row_versions {
let mut rows = self.rows.borrow_mut();
let row_versions = rows.entry(version.row.id).or_insert_with(Vec::new);
row_versions.push(version);
}
self.clock.reset(record.tx_timestamp);
}
Ok(())
}
}
/// A write-write conflict happens when transaction T_m attempts to update a
@@ -914,7 +927,7 @@ mod tests {
.unwrap()
.as_nanos(),
));
let storage = crate::persistent_storage::JsonOnDisk { path };
let storage = crate::persistent_storage::JsonOnDisk { path: path.clone() };
let db: Database<_, _, tokio::sync::Mutex<_>> = Database::new(clock, storage);
let tx1 = db.begin_tx().await;
@@ -955,12 +968,21 @@ mod tests {
.await
.unwrap();
let log_record = db.scan_storage().await.unwrap();
println!("{:?}", log_record);
assert_eq!(db.read(tx4, 1).await.unwrap().unwrap().data, "testme");
assert_eq!(db.read(tx4, 2).await.unwrap().unwrap().data, "testme2");
assert_eq!(db.read(tx4, 3).await.unwrap().unwrap().data, "testme3");
db.commit_tx(tx4).await.unwrap();
let log_record = db.scan_storage().await.unwrap();
println!("{:?}", log_record);
let clock = LocalClock::new();
let storage = crate::persistent_storage::JsonOnDisk { path };
let db: Database<_, _, tokio::sync::Mutex<_>> = Database::new(clock, storage);
db.recover().await.unwrap();
println!("{:#?}", db);
let tx5 = db.begin_tx().await;
println!("{:#?}", db.read(tx5, 1).await);
assert_eq!(db.read(tx5, 1).await.unwrap().unwrap().data, "testme");
assert_eq!(db.read(tx5, 2).await.unwrap().unwrap().data, "testme2");
assert_eq!(db.read(tx5, 3).await.unwrap().unwrap().data, "testme3");
}
}

View File

@@ -6,9 +6,11 @@ use crate::database::{LogRecord, Result};
pub trait Storage {
type Stream: futures::stream::Stream<Item = LogRecord>;
/// Append a transaction in the transaction log.
async fn log_tx(&mut self, m: LogRecord) -> Result<()>;
async fn scan(&self) -> Result<Self::Stream>;
/// Read the transaction log for replay.
async fn read_tx_log(&self) -> Result<Self::Stream>;
}
pub struct Noop {}
@@ -21,11 +23,12 @@ impl Storage for Noop {
Ok(())
}
async fn scan(&self) -> Result<Self::Stream> {
async fn read_tx_log(&self) -> Result<Self::Stream> {
Ok(futures::stream::empty())
}
}
#[derive(Debug)]
pub struct JsonOnDisk {
pub path: std::path::PathBuf,
}
@@ -83,7 +86,7 @@ impl Storage for JsonOnDisk {
Ok(())
}
async fn scan(&self) -> Result<Self::Stream> {
async fn read_tx_log(&self) -> Result<Self::Stream> {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::OpenOptions::new()
.read(true)