Merge pull request #19 from psarna/storage_v1

Add persistent storage trait
This commit is contained in:
Pekka Enberg
2023-04-17 17:50:18 +03:00
committed by GitHub
7 changed files with 277 additions and 56 deletions

View File

@@ -10,6 +10,10 @@ futures = "0.3.28"
thiserror = "1.0.40"
tracing = "0.1.37"
tokio = { version = "1.27.0", features = ["full"], optional = true }
tokio-stream = { version = "0.1.12", optional = true, features = ["io-util"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
pin-project = "1.0.12"
[dev-dependencies]
criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }
@@ -27,4 +31,4 @@ harness = false
[features]
default = []
full = ["tokio"]
tokio = ["dep:tokio"]
tokio = ["dep:tokio", "dep:tokio-stream"]

View File

@@ -4,20 +4,30 @@ use mvcc_rs::clock::LocalClock;
use mvcc_rs::database::{Database, Row};
use pprof::criterion::{Output, PProfProfiler};
fn bench_db() -> Database<
LocalClock,
mvcc_rs::persistent_storage::Noop,
tokio::sync::Mutex<
mvcc_rs::database::DatabaseInner<LocalClock, mvcc_rs::persistent_storage::Noop>,
>,
> {
let clock = LocalClock::default();
let storage = mvcc_rs::persistent_storage::Noop {};
Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage)
}
fn bench(c: &mut Criterion) {
let mut group = c.benchmark_group("mvcc-ops-throughput");
group.throughput(Throughput::Elements(1));
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
db.begin_tx().await;
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx + rollback_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
@@ -25,8 +35,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx + commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
@@ -34,8 +43,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx-read-commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
@@ -44,8 +52,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
group.bench_function("begin_tx-update-commit_tx", |b| {
b.to_async(FuturesExecutor).iter(|| async {
let tx_id = db.begin_tx().await;
@@ -62,8 +69,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
let tx = futures::executor::block_on(db.begin_tx());
futures::executor::block_on(db.insert(
tx,
@@ -79,8 +85,7 @@ fn bench(c: &mut Criterion) {
})
});
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = bench_db();
let tx = futures::executor::block_on(db.begin_tx());
futures::executor::block_on(db.insert(
tx,

View File

@@ -1,27 +1,43 @@
use crate::clock::LogicalClock;
use crate::errors::DatabaseError;
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
type Result<T> = std::result::Result<T, DatabaseError>;
pub type Result<T> = std::result::Result<T, DatabaseError>;
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Row {
pub id: u64,
pub data: String,
}
/// A row version.
#[derive(Clone, Debug)]
struct RowVersion {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RowVersion {
begin: TxTimestampOrID,
end: Option<TxTimestampOrID>,
row: Row,
}
type TxID = u64;
pub type TxID = u64;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Mutation {
tx_id: TxID,
row_versions: Vec<RowVersion>,
}
impl Mutation {
fn new(tx_id: TxID) -> Self {
Self {
tx_id,
row_versions: Vec::new(),
}
}
}
/// A transaction timestamp or ID.
///
@@ -29,14 +45,14 @@ type TxID = u64;
/// phase of the transaction. During the active phase, new versions track the
/// transaction ID in the `begin` and `end` fields. After a transaction commits,
/// versions switch to tracking timestamps.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
enum TxTimestampOrID {
Timestamp(u64),
TxID(TxID),
}
/// Transaction
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Transaction {
/// The state of the transaction.
state: TransactionState,
@@ -89,7 +105,7 @@ impl std::fmt::Display for Transaction {
}
/// Transaction state.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
enum TransactionState {
Active,
Preparing,
@@ -102,21 +118,26 @@ enum TransactionState {
#[derive(Debug)]
pub struct Database<
Clock: LogicalClock,
AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseInner<Clock>>,
Storage: crate::persistent_storage::Storage,
AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseInner<Clock, Storage>>,
> {
inner: Arc<AsyncMutex>,
}
impl<Clock: LogicalClock, AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseInner<Clock>>>
Database<Clock, AsyncMutex>
impl<
Clock: LogicalClock,
Storage: crate::persistent_storage::Storage,
AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseInner<Clock, Storage>>,
> Database<Clock, Storage, AsyncMutex>
{
/// Creates a new database.
pub fn new(clock: Clock) -> Self {
pub fn new(clock: Clock, storage: Storage) -> Self {
let inner = DatabaseInner {
rows: RefCell::new(HashMap::new()),
txs: RefCell::new(HashMap::new()),
tx_ids: AtomicU64::new(0),
clock,
storage,
};
Self {
inner: Arc::new(AsyncMutex::new(inner)),
@@ -238,17 +259,32 @@ impl<Clock: LogicalClock, AsyncMutex: crate::sync::AsyncMutex<Inner = DatabaseIn
let inner = self.inner.lock().await;
inner.rollback_tx(tx_id).await;
}
#[cfg(test)]
pub(crate) async fn scan_storage(&self) -> Result<Vec<Mutation>> {
use futures::StreamExt;
let inner = self.inner.lock().await;
Ok(inner
.storage
.scan()
.await?
.collect::<Vec<Mutation>>()
.await)
}
}
#[derive(Debug)]
pub struct DatabaseInner<Clock: LogicalClock> {
pub struct DatabaseInner<Clock: LogicalClock, Storage: crate::persistent_storage::Storage> {
rows: RefCell<HashMap<u64, Vec<RowVersion>>>,
txs: RefCell<HashMap<TxID, Transaction>>,
tx_ids: AtomicU64,
clock: Clock,
storage: Storage,
}
impl<Clock: LogicalClock> DatabaseInner<Clock> {
impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
DatabaseInner<Clock, Storage>
{
async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let mut txs = self.txs.borrow_mut();
let tx = txs
@@ -324,6 +360,7 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
tx_id
}
#[allow(clippy::await_holding_refcell_ref)]
async fn commit_tx(&mut self, tx_id: TxID) -> Result<()> {
let end_ts = self.get_timestamp();
let mut txs = self.txs.borrow_mut();
@@ -337,17 +374,20 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
let mut rows = self.rows.borrow_mut();
tx.state = TransactionState::Preparing;
tracing::trace!("PREPARE {tx}");
let mut mutation: Mutation = Mutation::new(tx_id);
for id in &tx.write_set {
if let Some(row_versions) = rows.get_mut(id) {
for row_version in row_versions.iter_mut() {
if let TxTimestampOrID::TxID(id) = row_version.begin {
if id == tx_id {
row_version.begin = TxTimestampOrID::Timestamp(tx.begin_ts);
mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
}
}
if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
if id == tx_id {
row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
mutation.row_versions.push(row_version.clone()); // FIXME: optimize cloning out
}
}
}
@@ -362,6 +402,11 @@ impl<Clock: LogicalClock> DatabaseInner<Clock> {
// might have speculatively read a version that we want to remove.
// But that's a problem for another day.
txs.remove(&tx_id);
drop(rows);
drop(txs);
if !mutation.row_versions.is_empty() {
self.storage.store(mutation).await?;
}
Ok(())
}
@@ -459,11 +504,20 @@ mod tests {
use crate::clock::LocalClock;
use tracing_test::traced_test;
fn test_db() -> Database<
LocalClock,
crate::persistent_storage::Noop,
tokio::sync::Mutex<DatabaseInner<LocalClock, crate::persistent_storage::Noop>>,
> {
let clock = LocalClock::new();
let storage = crate::persistent_storage::Noop {};
Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage)
}
#[traced_test]
#[tokio::test]
async fn test_insert_read() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
@@ -483,8 +537,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_read_nonexistent() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx = db.begin_tx().await;
let row = db.read(tx, 1).await;
assert!(row.unwrap().is_none());
@@ -493,8 +546,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_delete() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
@@ -517,8 +569,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_delete_nonexistent() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx = db.begin_tx().await;
assert!(!db.delete(tx, 1).await.unwrap());
}
@@ -526,8 +577,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_commit() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
let tx1_row = Row {
id: 1,
@@ -554,8 +604,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_rollback() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
let row1 = Row {
id: 1,
@@ -580,8 +629,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_dirty_write() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx().await;
@@ -608,8 +656,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_dirty_read() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1, but does not commit.
let tx1 = db.begin_tx().await;
@@ -629,8 +676,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_dirty_read_deleted() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
@@ -654,8 +700,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_fuzzy_read() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
@@ -690,8 +735,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_lost_update() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// T1 inserts a row with ID 1 and commits.
let tx1 = db.begin_tx().await;
@@ -736,8 +780,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_committed_visibility() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
// let's add $10 to my account since I like money
let tx1 = db.begin_tx().await;
@@ -768,8 +811,7 @@ mod tests {
#[traced_test]
#[tokio::test]
async fn test_future_row() {
let clock = LocalClock::default();
let db = Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock);
let db = test_db();
let tx1 = db.begin_tx().await;
@@ -789,4 +831,73 @@ mod tests {
let row = db.read(tx1, 1).await.unwrap();
assert_eq!(row, None);
}
#[traced_test]
#[tokio::test]
async fn test_storage1() {
let clock = LocalClock::new();
let mut path = std::env::temp_dir();
path.push(format!(
"mvcc-rs-storage-test-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos(),
));
let storage = crate::persistent_storage::JsonOnDisk { path };
let db: Database<_, _, tokio::sync::Mutex<_>> = Database::new(clock, storage);
let tx1 = db.begin_tx().await;
let tx2 = db.begin_tx().await;
let tx3 = db.begin_tx().await;
db.insert(
tx3,
Row {
id: 1,
data: "testme".to_string(),
},
)
.await
.unwrap();
db.commit_tx(tx1).await.unwrap();
db.rollback_tx(tx2).await;
db.commit_tx(tx3).await.unwrap();
let tx4 = db.begin_tx().await;
db.insert(
tx4,
Row {
id: 2,
data: "testme2".to_string(),
},
)
.await
.unwrap();
db.insert(
tx4,
Row {
id: 3,
data: "testme3".to_string(),
},
)
.await
.unwrap();
let mutation = db
.scan_storage()
.await
.unwrap();
println!("{:?}", mutation);
db.commit_tx(tx4).await.unwrap();
let mutation = db
.scan_storage()
.await
.unwrap();
println!("{:?}", mutation);
}
}

View File

@@ -8,4 +8,6 @@ pub enum DatabaseError {
WriteWriteConflict,
#[error("transaction is terminated")]
TxTerminated,
#[error("I/O error: {0}")]
Io(String),
}

View File

@@ -34,4 +34,5 @@
pub mod clock;
pub mod database;
pub mod errors;
pub mod persistent_storage;
pub mod sync;

View File

@@ -0,0 +1,97 @@
use crate::database::{Result, Mutation};
use crate::errors::DatabaseError;
/// Persistent storage API for storing and retrieving transactions.
/// TODO: final design in heavy progress!
#[async_trait::async_trait]
pub trait Storage {
type Stream: futures::stream::Stream<Item = Mutation>;
async fn store(&mut self, m: Mutation) -> Result<()>;
async fn scan(&self) -> Result<Self::Stream>;
}
pub struct Noop {}
#[async_trait::async_trait]
impl Storage for Noop {
type Stream = futures::stream::Empty<Mutation>;
async fn store(&mut self, _m: Mutation) -> Result<()> {
Ok(())
}
async fn scan(&self) -> Result<Self::Stream> {
Ok(futures::stream::empty())
}
}
pub struct JsonOnDisk {
pub path: std::path::PathBuf,
}
impl JsonOnDisk {
pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
let path = path.into();
Self { path }
}
}
#[cfg(feature = "tokio")]
#[pin_project::pin_project]
pub struct JsonOnDiskStream {
#[pin]
inner: tokio_stream::wrappers::LinesStream<tokio::io::BufReader<tokio::fs::File>>,
}
impl futures::stream::Stream for JsonOnDiskStream {
type Item = Mutation;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
this.inner
.poll_next(cx)
.map(|x| x.and_then(|x| x.ok().and_then(|x| serde_json::from_str(x.as_str()).ok())))
}
}
#[cfg(feature = "tokio")]
#[async_trait::async_trait]
impl Storage for JsonOnDisk {
type Stream = JsonOnDiskStream;
async fn store(&mut self, m: Mutation) -> Result<()> {
use tokio::io::AsyncWriteExt;
let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(&t)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(b"\n")
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
Ok(())
}
async fn scan(&self) -> Result<Self::Stream> {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::OpenOptions::new()
.read(true)
.open(&self.path)
.await
.unwrap();
Ok(JsonOnDiskStream {
inner: tokio_stream::wrappers::LinesStream::new(
tokio::io::BufReader::new(file).lines(),
),
})
}
}

View File

@@ -10,7 +10,8 @@ fn test_non_overlapping_concurrent_inserts() {
// Two threads insert to the database concurrently using non-overlapping
// row IDs.
let clock = LocalClock::default();
let db = Arc::new(Database::<LocalClock, tokio::sync::Mutex<_>>::new(clock));
let storage = mvcc_rs::persistent_storage::Noop {};
let db = Arc::new(Database::<_, _, tokio::sync::Mutex<_>>::new(clock, storage));
let ids = Arc::new(AtomicU64::new(0));
shuttle::check_random(
move || {