Merge 'core/mvcc: Eliminate row generic types' from Pekka Enberg

The logging code that writes out transactions to disk needs to write out
the byte array that we actually use. The code is less hairly without the
generics so drop them.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #1100
This commit is contained in:
Pekka Enberg
2025-03-05 19:24:54 +02:00
6 changed files with 60 additions and 64 deletions

View File

@@ -4,7 +4,7 @@ use limbo_core::mvcc::clock::LocalClock;
use limbo_core::mvcc::database::{MvStore, Row, RowID};
use pprof::criterion::{Output, PProfProfiler};
fn bench_db() -> MvStore<LocalClock, String> {
fn bench_db() -> MvStore<LocalClock> {
let clock = LocalClock::default();
let storage = limbo_core::mvcc::persistent_storage::Storage::new_noop();
MvStore::new(clock, storage)
@@ -57,7 +57,7 @@ fn bench(c: &mut Criterion) {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
data: "World".to_string().into_bytes(),
},
)
.unwrap();
@@ -74,7 +74,7 @@ fn bench(c: &mut Criterion) {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
},
)
.unwrap();
@@ -100,7 +100,7 @@ fn bench(c: &mut Criterion) {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
},
)
.unwrap();
@@ -113,7 +113,7 @@ fn bench(c: &mut Criterion) {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
data: "World".to_string().into_bytes(),
},
)
.unwrap();

View File

@@ -3,19 +3,15 @@ use crate::mvcc::database::{MvStore, Result, Row, RowID};
use std::fmt::Debug;
#[derive(Debug)]
pub struct ScanCursor<'a, Clock: LogicalClock, T: Sync + Send + Clone + Debug> {
pub db: &'a MvStore<Clock, T>,
pub struct ScanCursor<'a, Clock: LogicalClock> {
pub db: &'a MvStore<Clock>,
pub row_ids: Vec<RowID>,
pub index: usize,
tx_id: u64,
}
impl<'a, Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> ScanCursor<'a, Clock, T> {
pub fn new(
db: &'a MvStore<Clock, T>,
tx_id: u64,
table_id: u64,
) -> Result<ScanCursor<'a, Clock, T>> {
impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> {
pub fn new(db: &'a MvStore<Clock>, tx_id: u64, table_id: u64) -> Result<ScanCursor<'a, Clock>> {
let row_ids = db.scan_row_ids_for_table(table_id)?;
Ok(Self {
db,
@@ -32,7 +28,7 @@ impl<'a, Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> ScanCurs
Some(self.row_ids[self.index])
}
pub fn current_row(&self) -> Result<Option<Row<T>>> {
pub fn current_row(&self) -> Result<Option<Row>> {
if self.index >= self.row_ids.len() {
return Ok(None);
}

View File

@@ -19,29 +19,29 @@ pub struct RowID {
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct Row<T> {
pub struct Row {
pub id: RowID,
pub data: T,
pub data: Vec<u8>,
}
/// A row version.
#[derive(Clone, Debug, PartialEq)]
pub struct RowVersion<T> {
pub struct RowVersion {
begin: TxTimestampOrID,
end: Option<TxTimestampOrID>,
row: Row<T>,
row: Row,
}
pub type TxID = u64;
/// A log record contains all the versions inserted and deleted by a transaction.
#[derive(Clone, Debug)]
pub struct LogRecord<T> {
pub struct LogRecord {
pub(crate) tx_timestamp: TxID,
row_versions: Vec<RowVersion<T>>,
row_versions: Vec<RowVersion>,
}
impl<T> LogRecord<T> {
impl LogRecord {
fn new(tx_timestamp: TxID) -> Self {
Self {
tx_timestamp,
@@ -211,15 +211,15 @@ impl AtomicTransactionState {
/// A multi-version concurrency control database.
#[derive(Debug)]
pub struct MvStore<Clock: LogicalClock, T: Sync + Send + Clone + Debug> {
rows: SkipMap<RowID, RwLock<Vec<RowVersion<T>>>>,
pub struct MvStore<Clock: LogicalClock> {
rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
txs: SkipMap<TxID, RwLock<Transaction>>,
tx_ids: AtomicU64,
clock: Clock,
storage: Storage,
}
impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Clock, T> {
impl<Clock: LogicalClock> MvStore<Clock> {
/// Creates a new database.
pub fn new(clock: Clock, storage: Storage) -> Self {
Self {
@@ -241,7 +241,7 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
/// * `tx_id` - the ID of the transaction in which to insert the new row.
/// * `row` - the row object containing the values to be inserted.
///
pub fn insert(&self, tx_id: TxID, row: Row<T>) -> Result<()> {
pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
let tx = self
.txs
.get(&tx_id)
@@ -278,7 +278,7 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
/// # Returns
///
/// Returns `true` if the row was successfully updated, and `false` otherwise.
pub fn update(&self, tx_id: TxID, row: Row<T>) -> Result<bool> {
pub fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
if !self.delete(tx_id, row.id)? {
return Ok(false);
}
@@ -288,7 +288,7 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
/// Inserts a row in the database with new values, previously deleting
/// any old data if it existed. Bails on a delete error, e.g. write-write conflict.
pub fn upsert(&self, tx_id: TxID, row: Row<T>) -> Result<()> {
pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
self.delete(tx_id, row.id)?;
self.insert(tx_id, row)
}
@@ -361,7 +361,7 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
///
/// Returns `Some(row)` with the row data if the row with the given `id` exists,
/// and `None` otherwise.
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row<T>>> {
pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
let tx = self.txs.get(&tx_id).unwrap();
let tx = tx.value().read().unwrap();
assert_eq!(tx.state, TransactionState::Active);
@@ -520,7 +520,7 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
drop(tx);
// Postprocessing: inserting row versions and logging the transaction to persistent storage.
// TODO: we should probably save to persistent storage first, and only then update the in-memory structures.
let mut log_record: LogRecord<T> = LogRecord::new(end_ts);
let mut log_record = LogRecord::new(end_ts);
for ref id in write_set {
if let Some(row_versions) = self.rows.get(id) {
let mut row_versions = row_versions.value().write().unwrap();
@@ -700,7 +700,7 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
/// Inserts a new row version into the database, while making sure that
/// the row version is inserted in the correct order.
fn insert_version(&self, id: RowID, row_version: RowVersion<T>) {
fn insert_version(&self, id: RowID, row_version: RowVersion) {
let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
let mut versions = versions.value().write().unwrap();
self.insert_version_raw(&mut versions, row_version)
@@ -708,7 +708,7 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
/// Inserts a new row version into the internal data structure for versions,
/// while making sure that the row version is inserted in the correct order.
fn insert_version_raw(&self, versions: &mut Vec<RowVersion<T>>, row_version: RowVersion<T>) {
fn insert_version_raw(&self, versions: &mut Vec<RowVersion>, row_version: RowVersion) {
// NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity.
// However, we expect the number of versions to be nearly sorted, so we deem it worthy
// to search linearly for the insertion point instead of paying the price of using
@@ -744,10 +744,10 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
/// TEs state is Aborted"
/// Ref: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf , page 301,
/// 2.6. Updating a Version.
pub(crate) fn is_write_write_conflict<T>(
pub(crate) fn is_write_write_conflict(
txs: &SkipMap<TxID, RwLock<Transaction>>,
tx: &Transaction,
rv: &RowVersion<T>,
rv: &RowVersion,
) -> bool {
match rv.end {
Some(TxTimestampOrID::TxID(rv_end)) => {
@@ -767,7 +767,7 @@ pub(crate) fn is_write_write_conflict<T>(
}
}
impl<T> RowVersion<T> {
impl RowVersion {
pub fn is_visible_to(
&self,
tx: &Transaction,
@@ -777,10 +777,10 @@ impl<T> RowVersion<T> {
}
}
fn is_begin_visible<T>(
fn is_begin_visible(
txs: &SkipMap<TxID, RwLock<Transaction>>,
tx: &Transaction,
rv: &RowVersion<T>,
rv: &RowVersion,
) -> bool {
match rv.begin {
TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts,
@@ -807,10 +807,10 @@ fn is_begin_visible<T>(
}
}
fn is_end_visible<T>(
fn is_end_visible(
txs: &SkipMap<TxID, RwLock<Transaction>>,
tx: &Transaction,
rv: &RowVersion<T>,
rv: &RowVersion,
) -> bool {
match rv.end {
Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts,

View File

@@ -1,7 +1,7 @@
use super::*;
use crate::mvcc::clock::LocalClock;
fn test_db() -> MvStore<LocalClock, String> {
fn test_db() -> MvStore<LocalClock> {
let clock = LocalClock::new();
let storage = crate::mvcc::persistent_storage::Storage::new_noop();
MvStore::new(clock, storage)
@@ -17,7 +17,7 @@ fn test_insert_read() {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
@@ -71,7 +71,7 @@ fn test_delete() {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
@@ -142,7 +142,7 @@ fn test_commit() {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
@@ -161,7 +161,7 @@ fn test_commit() {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
data: "World".to_string().into_bytes(),
};
db.update(tx1, tx1_updated_row.clone()).unwrap();
let row = db
@@ -202,7 +202,7 @@ fn test_rollback() {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
};
db.insert(tx1, row1.clone()).unwrap();
let row2 = db
@@ -221,7 +221,7 @@ fn test_rollback() {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
data: "World".to_string().into_bytes(),
};
db.update(tx1, row3.clone()).unwrap();
let row4 = db
@@ -260,7 +260,7 @@ fn test_dirty_write() {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
@@ -282,7 +282,7 @@ fn test_dirty_write() {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
data: "World".to_string().into_bytes(),
};
assert!(!db.update(tx2, tx2_row).unwrap());
@@ -310,7 +310,7 @@ fn test_dirty_read() {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
};
db.insert(tx1, row1).unwrap();
@@ -339,7 +339,7 @@ fn test_dirty_read_deleted() {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
db.commit_tx(tx1).unwrap();
@@ -382,7 +382,7 @@ fn test_fuzzy_read() {
table_id: 1,
row_id: 1,
},
data: "First".to_string(),
data: "First".to_string().into_bytes(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
@@ -419,7 +419,7 @@ fn test_fuzzy_read() {
table_id: 1,
row_id: 1,
},
data: "Second".to_string(),
data: "Second".to_string().into_bytes(),
};
db.update(tx3, tx3_row).unwrap();
db.commit_tx(tx3).unwrap();
@@ -444,7 +444,7 @@ fn test_fuzzy_read() {
table_id: 1,
row_id: 1,
},
data: "Third".to_string(),
data: "Third".to_string().into_bytes(),
};
let update_result = db.update(tx2, tx2_newrow);
assert_eq!(Err(DatabaseError::WriteWriteConflict), update_result);
@@ -461,7 +461,7 @@ fn test_lost_update() {
table_id: 1,
row_id: 1,
},
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
let row = db
@@ -484,7 +484,7 @@ fn test_lost_update() {
table_id: 1,
row_id: 1,
},
data: "World".to_string(),
data: "World".to_string().into_bytes(),
};
assert!(db.update(tx2, tx2_row.clone()).unwrap());
@@ -495,7 +495,7 @@ fn test_lost_update() {
table_id: 1,
row_id: 1,
},
data: "Hello, world!".to_string(),
data: "Hello, world!".to_string().into_bytes(),
};
assert_eq!(
Err(DatabaseError::WriteWriteConflict),
@@ -532,7 +532,7 @@ fn test_committed_visibility() {
table_id: 1,
row_id: 1,
},
data: "10".to_string(),
data: "10".to_string().into_bytes(),
};
db.insert(tx1, tx1_row.clone()).unwrap();
db.commit_tx(tx1).unwrap();
@@ -544,7 +544,7 @@ fn test_committed_visibility() {
table_id: 1,
row_id: 1,
},
data: "20".to_string(),
data: "20".to_string().into_bytes(),
};
assert!(db.update(tx2, tx2_row.clone()).unwrap());
let row = db
@@ -587,7 +587,7 @@ fn test_future_row() {
table_id: 1,
row_id: 1,
},
data: "10".to_string(),
data: "10".to_string().into_bytes(),
};
db.insert(tx2, tx2_row).unwrap();
@@ -695,7 +695,7 @@ fn test_snapshot_isolation_tx_visible1() {
table_id: 1,
row_id: 1,
},
data: "testme".to_string(),
data: "testme".to_string().into_bytes(),
},
};
tracing::debug!("Testing visibility of {row_version:?}");

View File

@@ -68,7 +68,7 @@ mod tests {
};
let row = Row {
id,
data: "Hello".to_string(),
data: "Hello".to_string().into_bytes(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
@@ -90,7 +90,7 @@ mod tests {
};
let row = Row {
id,
data: "World".to_string(),
data: "World".to_string().into_bytes(),
};
db.insert(tx, row.clone()).unwrap();
db.commit_tx(tx).unwrap();
@@ -134,7 +134,7 @@ mod tests {
};
let row = Row {
id,
data: format!("{prefix} @{tx}"),
data: format!("{prefix} @{tx}").into_bytes(),
};
if let Err(e) = db.upsert(tx, row.clone()) {
tracing::trace!("upsert failed: {e}");

View File

@@ -15,14 +15,14 @@ impl Storage {
}
impl Storage {
pub fn log_tx<T>(&self, _m: LogRecord<T>) -> Result<()> {
pub fn log_tx(&self, _m: LogRecord) -> Result<()> {
match self {
Self::Noop => (),
}
Ok(())
}
pub fn read_tx_log<T>(&self) -> Result<Vec<LogRecord<T>>> {
pub fn read_tx_log(&self) -> Result<Vec<LogRecord>> {
match self {
Self::Noop => Err(DatabaseError::Io(
"cannot read from Noop storage".to_string(),