treewide: drop storage trait

An enum covers our use case, and async_trait incurs a runtime cost
(heap-allocated, boxed futures per call) that we want to avoid.
This commit is contained in:
Piotr Sarna
2023-05-15 10:50:47 +02:00
parent 7772c65f8d
commit 8b1ef20c08
8 changed files with 72 additions and 127 deletions

View File

@@ -12,7 +12,7 @@ cbindgen = "0.24.0"
[dependencies]
base64 = "0.21.0"
mvcc-rs = { path = "../../mvcc-rs", features = ["tokio"] }
mvcc-rs = { path = "../../mvcc-rs" }
tokio = { version = "1.27.0", features = ["full", "parking_lot"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0" }

View File

@@ -12,13 +12,10 @@ use types::{DbContext, MVCCDatabaseRef, MVCCScanCursorRef, ScanCursorContext};
type Clock = clock::LocalClock;
/// cbindgen:ignore
type Storage = persistent_storage::JsonOnDisk;
type Db = database::Database<Clock>;
/// cbindgen:ignore
type Db = database::Database<Clock, Storage>;
/// cbindgen:ignore
type ScanCursor = cursor::ScanCursor<'static, Clock, Storage>;
type ScanCursor = cursor::ScanCursor<'static, Clock>;
static INIT_RUST_LOG: std::sync::Once = std::sync::Once::new();
@@ -40,7 +37,7 @@ pub unsafe extern "C" fn MVCCDatabaseOpen(path: *const std::ffi::c_char) -> MVCC
}
};
tracing::debug!("mvccrs: opening persistent storage at {path}");
let storage = crate::persistent_storage::JsonOnDisk::new(path);
let storage = crate::persistent_storage::Storage::new_json_on_disk(path);
let db = Db::new(clock, storage);
let runtime = tokio::runtime::Runtime::new().unwrap();
let ctx = DbContext { db, runtime };

View File

@@ -5,12 +5,11 @@ edition = "2021"
[dependencies]
anyhow = "1.0.70"
async-trait = "0.1.68"
futures = "0.3.28"
thiserror = "1.0.40"
tracing = "0.1.37"
tokio = { version = "1.27.0", features = ["full", "parking_lot"], optional = true }
tokio-stream = { version = "0.1.12", optional = true, features = ["io-util"] }
tokio = { version = "1.27.0", features = ["full", "parking_lot"] }
tokio-stream = { version = "0.1.12", features = ["io-util"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
pin-project = "1.0.12"
@@ -21,10 +20,9 @@ base64 = "0.21.0"
criterion = { version = "0.4", features = ["html_reports", "async", "async_futures"] }
pprof = { version = "0.11.1", features = ["criterion", "flamegraph"] }
shuttle = "0.6.0"
tokio = { version = "1.27.0", features = ["full", "parking_lot"] }
tracing-subscriber = "0"
tracing-test = "0"
mvcc-rs = { path = ".", features = ["tokio"] }
mvcc-rs = { path = "." }
[[bench]]
name = "my_benchmark"
@@ -32,6 +30,4 @@ harness = false
[features]
default = []
full = ["tokio"]
c_bindings = ["tokio", "dep:tracing-subscriber"]
tokio = ["dep:tokio", "dep:tokio-stream"]
c_bindings = ["dep:tracing-subscriber"]

View File

@@ -4,12 +4,9 @@ use mvcc_rs::clock::LocalClock;
use mvcc_rs::database::{Database, Row, RowID};
use pprof::criterion::{Output, PProfProfiler};
fn bench_db() -> Database<
LocalClock,
mvcc_rs::persistent_storage::Noop,
> {
fn bench_db() -> Database<LocalClock> {
let clock = LocalClock::default();
let storage = mvcc_rs::persistent_storage::Noop {};
let storage = mvcc_rs::persistent_storage::Storage::new_noop();
Database::new(clock, storage)
}

View File

@@ -1,29 +1,16 @@
use crate::clock::LogicalClock;
use crate::database::{Database, Result, Row, RowID};
use crate::persistent_storage::Storage;
#[derive(Debug)]
pub struct ScanCursor<
'a,
Clock: LogicalClock,
StorageImpl: Storage,
> {
pub db: &'a Database<Clock, StorageImpl>,
pub struct ScanCursor<'a, Clock: LogicalClock> {
pub db: &'a Database<Clock>,
pub row_ids: Vec<RowID>,
pub index: usize,
tx_id: u64,
}
impl<
'a,
Clock: LogicalClock,
StorageImpl: Storage,
> ScanCursor<'a, Clock, StorageImpl>
{
pub async fn new(
db: &'a Database<Clock, StorageImpl>,
table_id: u64,
) -> Result<ScanCursor<'a, Clock, StorageImpl>> {
impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
pub async fn new(db: &'a Database<Clock>, table_id: u64) -> Result<ScanCursor<'a, Clock>> {
let tx_id = db.begin_tx().await;
let row_ids = db.scan_row_ids_for_table(table_id).await?;
Ok(Self {

View File

@@ -1,5 +1,6 @@
use crate::clock::LogicalClock;
use crate::errors::DatabaseError;
use crate::persistent_storage::Storage;
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, HashSet};
@@ -125,18 +126,11 @@ enum TransactionState {
/// A database with MVCC.
#[derive(Debug)]
pub struct Database<
Clock: LogicalClock,
Storage: crate::persistent_storage::Storage,
> {
inner: Arc<Mutex<DatabaseInner<Clock, Storage>>>,
pub struct Database<Clock: LogicalClock> {
inner: Arc<Mutex<DatabaseInner<Clock>>>,
}
impl<
Clock: LogicalClock,
Storage: crate::persistent_storage::Storage,
> Database<Clock, Storage>
{
impl<Clock: LogicalClock> Database<Clock> {
/// Creates a new database.
pub fn new(clock: Clock, storage: Storage) -> Self {
let inner = DatabaseInner {
@@ -294,7 +288,7 @@ impl<
}
#[derive(Debug)]
pub struct DatabaseInner<Clock: LogicalClock, Storage: crate::persistent_storage::Storage> {
pub struct DatabaseInner<Clock: LogicalClock> {
rows: RefCell<BTreeMap<RowID, Vec<RowVersion>>>,
txs: RefCell<HashMap<TxID, Transaction>>,
tx_timestamps: RefCell<BTreeMap<u64, usize>>,
@@ -303,9 +297,7 @@ pub struct DatabaseInner<Clock: LogicalClock, Storage: crate::persistent_storage
storage: Storage,
}
impl<Clock: LogicalClock, Storage: crate::persistent_storage::Storage>
DatabaseInner<Clock, Storage>
{
impl<Clock: LogicalClock> DatabaseInner<Clock> {
async fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let mut txs = self.txs.borrow_mut();
let tx = txs
@@ -624,12 +616,9 @@ mod tests {
use crate::clock::LocalClock;
use tracing_test::traced_test;
fn test_db() -> Database<
LocalClock,
crate::persistent_storage::Noop,
> {
fn test_db() -> Database<LocalClock> {
let clock = LocalClock::new();
let storage = crate::persistent_storage::Noop {};
let storage = crate::persistent_storage::Storage::new_noop();
Database::new(clock, storage)
}
@@ -1286,7 +1275,7 @@ mod tests {
.unwrap()
.as_nanos(),
));
let storage = crate::persistent_storage::JsonOnDisk { path: path.clone() };
let storage = crate::persistent_storage::Storage::new_json_on_disk(path.clone());
let db = Database::new(clock, storage);
let tx1 = db.begin_tx().await;
@@ -1381,7 +1370,7 @@ mod tests {
db.commit_tx(tx4).await.unwrap();
let clock = LocalClock::new();
let storage = crate::persistent_storage::JsonOnDisk { path };
let storage = crate::persistent_storage::Storage::new_json_on_disk(path);
let db = Database::new(clock, storage);
db.recover().await.unwrap();
println!("{:#?}", db);

View File

@@ -1,53 +1,29 @@
use crate::database::{LogRecord, Result};
/// Persistent storage API for storing and retrieving transactions.
/// TODO: final design in heavy progress!
#[async_trait::async_trait]
pub trait Storage {
type Stream: futures::stream::Stream<Item = LogRecord>;
/// Append a transaction in the transaction log.
async fn log_tx(&mut self, m: LogRecord) -> Result<()>;
/// Read the transaction log for replay.
async fn read_tx_log(&self) -> Result<Self::Stream>;
}
pub struct Noop {}
#[async_trait::async_trait]
impl Storage for Noop {
type Stream = futures::stream::Empty<LogRecord>;
async fn log_tx(&mut self, _m: LogRecord) -> Result<()> {
Ok(())
}
async fn read_tx_log(&self) -> Result<Self::Stream> {
Ok(futures::stream::empty())
}
}
use crate::errors::DatabaseError;
#[derive(Debug)]
pub struct JsonOnDisk {
pub path: std::path::PathBuf,
pub enum Storage {
Noop,
JsonOnDisk(std::path::PathBuf),
}
impl JsonOnDisk {
pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
impl Storage {
pub fn new_noop() -> Self {
Self::Noop
}
pub fn new_json_on_disk(path: impl Into<std::path::PathBuf>) -> Self {
let path = path.into();
Self { path }
Self::JsonOnDisk(path)
}
}
#[cfg(feature = "tokio")]
#[pin_project::pin_project]
pub struct JsonOnDiskStream {
#[pin]
inner: tokio_stream::wrappers::LinesStream<tokio::io::BufReader<tokio::fs::File>>,
}
#[cfg(feature = "tokio")]
impl futures::stream::Stream for JsonOnDiskStream {
type Item = LogRecord;
@@ -62,41 +38,44 @@ impl futures::stream::Stream for JsonOnDiskStream {
}
}
#[cfg(feature = "tokio")]
#[async_trait::async_trait]
impl Storage for JsonOnDisk {
type Stream = JsonOnDiskStream;
async fn log_tx(&mut self, m: LogRecord) -> Result<()> {
use crate::errors::DatabaseError;
use tokio::io::AsyncWriteExt;
let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(&t)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(b"\n")
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
impl Storage {
pub async fn log_tx(&mut self, m: LogRecord) -> Result<()> {
if let Self::JsonOnDisk(path) = self {
use tokio::io::AsyncWriteExt;
let t = serde_json::to_vec(&m).map_err(|e| DatabaseError::Io(e.to_string()))?;
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(&t)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
file.write_all(b"\n")
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
}
Ok(())
}
async fn read_tx_log(&self) -> Result<Self::Stream> {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::OpenOptions::new()
.read(true)
.open(&self.path)
.await
.unwrap();
Ok(JsonOnDiskStream {
inner: tokio_stream::wrappers::LinesStream::new(
tokio::io::BufReader::new(file).lines(),
),
})
pub async fn read_tx_log(&self) -> Result<JsonOnDiskStream> {
if let Self::JsonOnDisk(path) = self {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::OpenOptions::new()
.read(true)
.open(&path)
.await
.map_err(|e| DatabaseError::Io(e.to_string()))?;
Ok(JsonOnDiskStream {
inner: tokio_stream::wrappers::LinesStream::new(
tokio::io::BufReader::new(file).lines(),
),
})
} else {
Err(crate::errors::DatabaseError::Io(
"cannot read from Noop storage".to_string(),
))
}
}
}

View File

@@ -10,7 +10,7 @@ fn test_non_overlapping_concurrent_inserts() {
// Two threads insert to the database concurrently using non-overlapping
// row IDs.
let clock = LocalClock::default();
let storage = mvcc_rs::persistent_storage::Noop {};
let storage = mvcc_rs::persistent_storage::Storage::new_noop();
let db = Arc::new(Database::new(clock, storage));
let ids = Arc::new(AtomicU64::new(0));
shuttle::check_random(