treewide: add persistent storage trait

This draft adds a persistent storage trait that can be used
to store transaction logs and read the log for recovery purposes.
This is still very much a work in progress: ideally the design should
also allow reading versions from the storage, so that data can be
spilled from memory to disk if there's not enough RAM available.
This commit is contained in:
Piotr Sarna
2023-04-17 12:52:42 +02:00
parent 7ca68b3d96
commit 04a78f73fb
6 changed files with 268 additions and 50 deletions

View File

@@ -10,6 +10,10 @@ futures = "0.3.28"
thiserror = "1.0.40"
tracing = "0.1.37"
tokio = { version = "1.27.0", features = ["full"], optional = true }
tokio-stream = { version = "0.1.12", optional = true, features = ["io-util"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
pin-project = "1.0.12"
[dev-dependencies]
criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }
@@ -27,4 +31,4 @@ harness = false
[features]
default = []
full = ["tokio"]
tokio = ["dep:tokio"]
tokio = ["dep:tokio", "dep:tokio-stream"]

View File

@@ -4,20 +4,30 @@ use mvcc_rs::clock::LocalClock;
use mvcc_rs::database::{Database, Row};
use pprof::criterion::{Output, PProfProfiler};
/// Builds the database used by every benchmark: a `LocalClock`, the `Noop`
/// storage backend, and a tokio mutex guarding the inner state.
fn bench_db() -> Database<
    LocalClock,
    mvcc_rs::persistent_storage::Noop,
    tokio::sync::Mutex<
        mvcc_rs::database::DatabaseInner<LocalClock, mvcc_rs::persistent_storage::Noop>,
    >,
> {
    // The declared return type pins all three generic parameters, so the
    // constructor call needs no turbofish.
    Database::new(
        LocalClock::default(),
        mvcc_rs::persistent_storage::Noop {},
    )
}
fn bench(c: &mut Criterion) {
let mut group = c.benchmark_group("mvcc-ops-throughput");
group.throughput(Throughput::Elements(1));
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
db.begin_tx().await;
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx + rollback_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
@@ -25,8 +35,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx + commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
@@ -34,8 +43,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx-read-commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
@@ -44,8 +52,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx-update-commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
@@ -62,8 +69,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
let tx = futures::executor::block_on(db.begin_tx());
futures::executor::block_on(db.insert(
tx,
@@ -79,8 +85,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
let tx = futures::executor::block_on(db.begin_tx());
futures::executor::block_on(db.insert(
tx,

View File

@@ -16,13 +16,28 @@ pub struct Row {
/// A row version.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct RowVersion {
pub struct RowVersion {
begin: TxTimestampOrID,
end: Option<TxTimestampOrID>,
row: Row,
}
type TxID = u64;
pub type TxID = u64;
/// The row versions changed by a single transaction, tagged with that
/// transaction's ID.
///
/// A `Mutation` is built while a transaction commits and is the value handed
/// to the persistent storage backend.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Mutation {
// ID of the transaction that produced these row versions.
tx_id: TxID,
// Row versions collected for the transaction, in the order they were pushed.
row_versions: Vec<RowVersion>,
}
impl Mutation {
    /// Creates a mutation for transaction `tx_id` with no row versions
    /// recorded yet.
    fn new(tx_id: TxID) -> Self {
        let row_versions = Vec::new();
        Self {
            tx_id,
            row_versions,
        }
    }
}
/// A transaction timestamp or ID.
///
@@ -103,21 +118,26 @@ enum TransactionState {
#[derive(Debug)]
pub struct Database<
Clock: LogicalClock,
AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseInner<Clock>>,
Storage: crate::persistent_storage::Storage,
AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseInner<Clock, Storage>>,
> {
inner: Arc<AsyncMutex>,
}
impl<Clock: LogicalClock, AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseInner<Clock>>>
Database<Clock, AsyncMutex>
impl<
Clock: LogicalClock,
Storage: crate::persistent_storage::Storage,
AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseInner<Clock, Storage>>,
> Database<Clock, Storage, AsyncMutex>
{
/// Creates a new database.
pub fn new(clock: Clock) -> Self {
pub fn new(clock: Clock, storage: Storage) -> Self {
let inner = DatabaseInner {
rows: RefCell::new(HashMap::new()),
txs: RefCell::new(HashMap::new()),
tx_ids: AtomicU64::new(0),
clock,
storage,
};
Self {
inner: Arc::new(AsyncMutex::new(inner)),
@@ -239,17 +259,32 @@ impl<Clock: LogicalClock, AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseIn
let inner = self.inner.lock().await;
inner.rollback_tx(tx_id).await;
}
/// Test-only helper: drains the storage backend's scan stream into a vector.
///
/// Returns the error from `Storage::scan` if the scan cannot be started.
#[cfg(test)]
pub(crate) async fn scan_storage(&self) -> Result<Vec<Mutation>> {
    use futures::StreamExt;

    let guard = self.inner.lock().await;
    let stream = guard.storage.scan().await?;
    let mutations = stream.collect::<Vec<Mutation>>().await;
    Ok(mutations)
}
}
#[derive(Debug)]
pub struct DatabaseInner<Clock: LogicalClock> {
pub struct DatabaseInner<Clock: LogicalClock, Storage: crate::persistent_storage::Storage> {
rows: RefCell<HashMap<u64, Vec<RowVersion>>>,
txs: RefCell<HashMap<TxID, Transaction>>,
tx_ids: AtomicU64,
clock: Clock,
storage: Storage,
}
impl<Clock: LogicalClock> DatabaseInner<Clock> {
impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
DatabaseInner<Clock, Storage>
{
async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let mut txs = self.txs.borrow_mut();
let tx = txs
@@ -325,6 +360,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
tx_id
}
#[allow(clippy::await_holding_refcell_ref)]
async fn commit_tx(&mut self, tx_id: TxID) -> Result<()> {
let end_ts = self.get_timestamp();
let mut txs = self.txs.borrow_mut();
@@ -338,17 +374,20 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
let mut rows = self.rows.borrow_mut();
tx.state = TransactionState::Preparing;
tracing::trace!("PREPARE {tx}");
let mut mutation: Mutation = Mutation::new(tx_id);
for id in &tx.write_set {
if let Some(row_versions) = rows.get_mut(id) {
for row_version in row_versions.iter_mut() {
if let TxTimestampOrID::TxID(id) = row_version.begin {
if id == tx_id {
row_version.begin = TxTimestampOrID::Timestamp(tx.begin_ts);
mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
}
}
if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
if id == tx_id {
row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
}
}
}
@@ -363,6 +402,11 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
// might have speculatively read a version that we want to remove.
// But that's a problem for another day.
txs.remove(&tx_id);
drop(rows);
drop(txs);
if !mutation.row_versions.is_empty() {
self.storage.store(mutation).await?;
}
Ok(())
}
@@ -460,11 +504,20 @@ mod tests {
use crate::clock::LocalClock;
use tracing_test::traced_test;
/// Builds the database shared by the unit tests: a fresh `LocalClock`, the
/// `Noop` storage backend, and a tokio mutex around the inner state.
fn test_db() -> Database<
    LocalClock,
    crate::persistent_storage::Noop,
    tokio::sync::Mutex<DatabaseInner<LocalClock, crate::persistent_storage::Noop>>,
> {
    // The return type fixes the generic parameters; no turbofish required.
    Database::new(LocalClock::new(), crate::persistent_storage::Noop {})
}
#[traced_test]
#[tokio::test]
async fn test_insert_read() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
@@ -484,8 +537,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_read_nonexistent() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx = db.begin_tx().await;
let row = db.read(tx, 1).await;
assert!(row.unwrap().is_none());
@@ -494,8 +546,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_delete() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
@@ -518,8 +569,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_delete_nonexistent() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx = db.begin_tx().await;
assert!(!db.delete(tx, 1).await.unwrap());
}
@@ -527,8 +577,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_commit() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
@@ -555,8 +604,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_rollback() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
let row1 = Row {
id: 1,
@@ -581,8 +629,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_dirty_write() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx().await;
@@ -609,8 +656,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_dirty_read() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx().await;
@@ -630,8 +676,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_dirty_read_deleted() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
@@ -655,8 +700,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_fuzzy_read() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
@@ -691,8 +735,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_lost_update() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
@@ -737,8 +780,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_committed_visibility() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// let's add $10 to my account since I like money
let tx1 = db.begin_tx().await;
@@ -769,8 +811,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_future_row() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
@@ -790,4 +831,73 @@ mod tests {
let row = db.read(tx1, 1).await.unwrap();
assert_eq!(row, None);
}
/// Smoke test for the JSON-on-disk storage: commits a few transactions and
/// prints what the storage scan returns before and after the last commit.
#[traced_test]
#[tokio::test]
async fn test_storage1() {
    let clock = LocalClock::new();

    // Use a nanosecond timestamp so each run writes to its own log file.
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let path = std::env::temp_dir().join(format!("mvcc-rs-storage-test-{}", nanos));

    let storage = crate::persistent_storage::JsonOnDisk { path };
    let db: Database<_, _, tokio::sync::Mutex<_>> = Database::new(clock, storage);

    let tx1 = db.begin_tx().await;
    let tx2 = db.begin_tx().await;
    let tx3 = db.begin_tx().await;

    let first_row = Row {
        id: 1,
        data: "testme".to_string(),
    };
    db.insert(tx3, first_row).await.unwrap();

    // tx1 and tx2 wrote nothing; only tx3 carries a row.
    db.commit_tx(tx1).await.unwrap();
    db.rollback_tx(tx2).await;
    db.commit_tx(tx3).await.unwrap();

    let tx4 = db.begin_tx().await;
    let second_row = Row {
        id: 2,
        data: "testme2".to_string(),
    };
    db.insert(tx4, second_row).await.unwrap();
    let third_row = Row {
        id: 3,
        data: "testme3".to_string(),
    };
    db.insert(tx4, third_row).await.unwrap();

    // Scan while tx4 is still open ...
    let before_commit = db.scan_storage().await.unwrap();
    println!("{:?}", before_commit);

    // ... and again once it has committed.
    db.commit_tx(tx4).await.unwrap();
    let after_commit = db.scan_storage().await.unwrap();
    println!("{:?}", after_commit);
}
}

View File

@@ -34,4 +34,5 @@
pub mod clock;
pub mod database;
pub mod errors;
pub mod persistent_storage;
pub mod sync;

View File

@@ -0,0 +1,97 @@
use crate::database::{Result, Mutation};
use crate::errors::DatabaseError;
/// Persistent storage API for storing and retrieving transactions.
///
/// TODO: the design is not final and is still changing heavily.
#[async_trait::async_trait]
pub trait Storage {
/// Stream type returned by [`Storage::scan`]; it yields `Mutation` values.
type Stream: futures::stream::Stream<Item = Mutation>;
/// Persists the mutation `m`.
///
/// Returns an error if the backend fails to store it.
async fn store(&mut self, m: Mutation) -> Result<()>;
/// Opens a stream over the mutations held by this storage.
///
/// Returns an error if the scan cannot be started.
async fn scan(&self) -> Result<Self::Stream>;
}
/// Storage backend that persists nothing.
///
/// `store` accepts and discards every mutation, and `scan` always returns an
/// empty stream.
///
/// `Debug` is derived so that a `Database` parameterized with `Noop` still
/// satisfies the `Storage: Debug` bound that its own derived `Debug` needs.
#[derive(Debug, Clone, Copy, Default)]
pub struct Noop {}

#[async_trait::async_trait]
impl Storage for Noop {
    type Stream = futures::stream::Empty<Mutation>;

    /// Discards `_m` and reports success.
    async fn store(&mut self, _m: Mutation) -> Result<()> {
        Ok(())
    }

    /// Returns a stream that yields no mutations.
    async fn scan(&self) -> Result<Self::Stream> {
        Ok(futures::stream::empty())
    }
}
/// Storage backend that keeps mutations in a file at `path`.
///
/// `Debug` is derived so that a `Database` parameterized with `JsonOnDisk`
/// still satisfies the `Storage: Debug` bound that its own derived `Debug`
/// needs.
#[derive(Debug, Clone)]
pub struct JsonOnDisk {
    /// Location of the log file.
    pub path: std::path::PathBuf,
}

impl JsonOnDisk {
    /// Creates a storage handle for the file at `path`.
    ///
    /// No file is opened or created here.
    pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
        Self { path: path.into() }
    }
}
/// Stream of `Mutation`s backed by the lines of a buffered tokio file.
///
/// Only available when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
#[pin_project::pin_project]
pub struct JsonOnDiskStream {
// Line-by-line reader over the file; pinned so it can be polled through
// the projection generated by `pin_project`.
#[pin]
inner: tokio_stream::wrappers::LinesStream<tokio::io::BufReader<tokio::fs::File>>,
}
// Gated like `JsonOnDiskStream` itself: without the `tokio` feature the type
// does not exist, so an ungated impl would break the build.
#[cfg(feature = "tokio")]
impl futures::stream::Stream for JsonOnDiskStream {
    type Item = Mutation;

    /// Polls the underlying line stream and decodes the next line as JSON.
    ///
    /// Yields `None` when the file is exhausted, and also when a line cannot
    /// be read or does not decode into a `Mutation`; the stream's item type
    /// has no way to carry an error.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(cx).map(|next| {
            // `next` is `Option<io::Result<String>>`: bail out with `None`
            // on end-of-file or on a read error.
            let line = next?.ok()?;
            serde_json::from_str(line.as_str()).ok()
        })
    }
}
/// File-backed implementation: each stored mutation is appended to the file
/// as one line of JSON, and `scan` reads the file back line by line.
#[cfg(feature = "tokio")]
#[async_trait::async_trait]
impl Storage for JsonOnDisk {
    type Stream = JsonOnDiskStream;

    /// Serializes `m` as JSON and appends it to the log file as one line,
    /// creating the file if it does not exist.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::Io` if serialization, opening the file,
    /// writing, or flushing fails.
    async fn store(&mut self, m: Mutation) -> Result<()> {
        use tokio::io::AsyncWriteExt;

        // Build the whole record, newline included, so it goes out in a
        // single append rather than as two separate writes.
        let mut record = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;
        record.push(b'\n');

        let mut file = tokio::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .await
            .map_err(|e| DatabaseError::Io(e.to_string()))?;
        file.write_all(&record)
            .await
            .map_err(|e| DatabaseError::Io(e.to_string()))?;
        // Flush before the file is dropped so the write has completed and
        // any error from it is reported here instead of being lost.
        file.flush()
            .await
            .map_err(|e| DatabaseError::Io(e.to_string()))?;
        Ok(())
    }

    /// Opens the log file for reading and returns a stream over its lines.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::Io` if the file cannot be opened, for example
    /// because nothing has been stored yet.
    async fn scan(&self) -> Result<Self::Stream> {
        use tokio::io::AsyncBufReadExt;

        let file = tokio::fs::OpenOptions::new()
            .read(true)
            .open(&self.path)
            .await
            .map_err(|e| DatabaseError::Io(e.to_string()))?;
        Ok(JsonOnDiskStream {
            inner: tokio_stream::wrappers::LinesStream::new(
                tokio::io::BufReader::new(file).lines(),
            ),
        })
    }
}

View File

@@ -10,7 +10,8 @@ fn test_non_overlapping_concurrent_inserts() {
// Two threads insert to the database concurrently using non-overlapping
// row IDs.
let clock = LocalClock::default();
let db = Arc::new(Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock));
let storage = mvcc_rs::persistent_storage::Noop {};
let db = Arc::new(Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage));
let ids = Arc::new(AtomicU64::new(0));
shuttle::check_random(
move || {