From 81b79bc8335a38c5d6e6c908dcc0560489079414 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 5 Mar 2025 18:52:02 +0200 Subject: [PATCH] core/mvcc: Eliminate row generic types The logging code that writes out transactions to disk needs to write out the byte array that we actually use. The code is less hairy without the generics so drop them. --- core/benches/mvcc_benchmark.rs | 10 +++--- core/mvcc/cursor.rs | 14 +++------ core/mvcc/database/mod.rs | 48 ++++++++++++++--------------- core/mvcc/database/tests.rs | 42 ++++++++++++------------- core/mvcc/mod.rs | 6 ++-- core/mvcc/persistent_storage/mod.rs | 4 +-- 6 files changed, 60 insertions(+), 64 deletions(-) diff --git a/core/benches/mvcc_benchmark.rs b/core/benches/mvcc_benchmark.rs index b18ab93c9..3afd9b9cd 100644 --- a/core/benches/mvcc_benchmark.rs +++ b/core/benches/mvcc_benchmark.rs @@ -4,7 +4,7 @@ use limbo_core::mvcc::clock::LocalClock; use limbo_core::mvcc::database::{MvStore, Row, RowID}; use pprof::criterion::{Output, PProfProfiler}; -fn bench_db() -> MvStore { +fn bench_db() -> MvStore { let clock = LocalClock::default(); let storage = limbo_core::mvcc::persistent_storage::Storage::new_noop(); MvStore::new(clock, storage) @@ -57,7 +57,7 @@ fn bench(c: &mut Criterion) { table_id: 1, row_id: 1, }, - data: "World".to_string(), + data: "World".to_string().into_bytes(), }, ) .unwrap(); @@ -74,7 +74,7 @@ fn bench(c: &mut Criterion) { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }, ) .unwrap(); @@ -100,7 +100,7 @@ fn bench(c: &mut Criterion) { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }, ) .unwrap(); @@ -113,7 +113,7 @@ fn bench(c: &mut Criterion) { table_id: 1, row_id: 1, }, - data: "World".to_string(), + data: "World".to_string().into_bytes(), }, ) .unwrap(); diff --git a/core/mvcc/cursor.rs b/core/mvcc/cursor.rs index a0cc9e24c..b52fdd3d0 100644 --- a/core/mvcc/cursor.rs +++ 
b/core/mvcc/cursor.rs @@ -3,19 +3,15 @@ use crate::mvcc::database::{MvStore, Result, Row, RowID}; use std::fmt::Debug; #[derive(Debug)] -pub struct ScanCursor<'a, Clock: LogicalClock, T: Sync + Send + Clone + Debug> { - pub db: &'a MvStore, +pub struct ScanCursor<'a, Clock: LogicalClock> { + pub db: &'a MvStore, pub row_ids: Vec, pub index: usize, tx_id: u64, } -impl<'a, Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> ScanCursor<'a, Clock, T> { - pub fn new( - db: &'a MvStore, - tx_id: u64, - table_id: u64, - ) -> Result> { +impl<'a, Clock: LogicalClock> ScanCursor<'a, Clock> { + pub fn new(db: &'a MvStore, tx_id: u64, table_id: u64) -> Result> { let row_ids = db.scan_row_ids_for_table(table_id)?; Ok(Self { db, @@ -32,7 +28,7 @@ impl<'a, Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> ScanCurs Some(self.row_ids[self.index]) } - pub fn current_row(&self) -> Result>> { + pub fn current_row(&self) -> Result> { if self.index >= self.row_ids.len() { return Ok(None); } diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index ca0425ecb..21cf91f0a 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -19,29 +19,29 @@ pub struct RowID { #[derive(Clone, Debug, PartialEq, PartialOrd)] -pub struct Row { +pub struct Row { pub id: RowID, - pub data: T, + pub data: Vec, } /// A row version. #[derive(Clone, Debug, PartialEq)] -pub struct RowVersion { +pub struct RowVersion { begin: TxTimestampOrID, end: Option, - row: Row, + row: Row, } pub type TxID = u64; /// A log record contains all the versions inserted and deleted by a transaction. #[derive(Clone, Debug)] -pub struct LogRecord { +pub struct LogRecord { pub(crate) tx_timestamp: TxID, - row_versions: Vec>, + row_versions: Vec, } -impl LogRecord { +impl LogRecord { fn new(tx_timestamp: TxID) -> Self { Self { tx_timestamp, @@ -211,15 +211,15 @@ impl AtomicTransactionState { /// A multi-version concurrency control database. 
#[derive(Debug)] -pub struct MvStore { - rows: SkipMap>>>, +pub struct MvStore { + rows: SkipMap>>, txs: SkipMap>, tx_ids: AtomicU64, clock: Clock, storage: Storage, } -impl MvStore { +impl MvStore { /// Creates a new database. pub fn new(clock: Clock, storage: Storage) -> Self { Self { @@ -241,7 +241,7 @@ impl MvStore) -> Result<()> { + pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { let tx = self .txs .get(&tx_id) @@ -278,7 +278,7 @@ impl MvStore) -> Result { + pub fn update(&self, tx_id: TxID, row: Row) -> Result { if !self.delete(tx_id, row.id)? { return Ok(false); } @@ -288,7 +288,7 @@ impl MvStore) -> Result<()> { + pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> { self.delete(tx_id, row.id)?; self.insert(tx_id, row) } @@ -361,7 +361,7 @@ impl MvStore Result>> { + pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { let tx = self.txs.get(&tx_id).unwrap(); let tx = tx.value().read().unwrap(); assert_eq!(tx.state, TransactionState::Active); @@ -520,7 +520,7 @@ impl MvStore = LogRecord::new(end_ts); + let mut log_record = LogRecord::new(end_ts); for ref id in write_set { if let Some(row_versions) = self.rows.get(id) { let mut row_versions = row_versions.value().write().unwrap(); @@ -700,7 +700,7 @@ impl MvStore) { + fn insert_version(&self, id: RowID, row_version: RowVersion) { let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); let mut versions = versions.value().write().unwrap(); self.insert_version_raw(&mut versions, row_version) @@ -708,7 +708,7 @@ impl MvStore>, row_version: RowVersion) { + fn insert_version_raw(&self, versions: &mut Vec, row_version: RowVersion) { // NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity. 
// However, we expect the number of versions to be nearly sorted, so we deem it worthy // to search linearly for the insertion point instead of paying the price of using @@ -744,10 +744,10 @@ impl MvStore( +pub(crate) fn is_write_write_conflict( txs: &SkipMap>, tx: &Transaction, - rv: &RowVersion, + rv: &RowVersion, ) -> bool { match rv.end { Some(TxTimestampOrID::TxID(rv_end)) => { @@ -767,7 +767,7 @@ pub(crate) fn is_write_write_conflict( } } -impl RowVersion { +impl RowVersion { pub fn is_visible_to( &self, tx: &Transaction, @@ -777,10 +777,10 @@ impl RowVersion { } } -fn is_begin_visible( +fn is_begin_visible( txs: &SkipMap>, tx: &Transaction, - rv: &RowVersion, + rv: &RowVersion, ) -> bool { match rv.begin { TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts, @@ -807,10 +807,10 @@ fn is_begin_visible( } } -fn is_end_visible( +fn is_end_visible( txs: &SkipMap>, tx: &Transaction, - rv: &RowVersion, + rv: &RowVersion, ) -> bool { match rv.end { Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts, diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index f3dcabca9..7d99d5414 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1,7 +1,7 @@ use super::*; use crate::mvcc::clock::LocalClock; -fn test_db() -> MvStore { +fn test_db() -> MvStore { let clock = LocalClock::new(); let storage = crate::mvcc::persistent_storage::Storage::new_noop(); MvStore::new(clock, storage) @@ -17,7 +17,7 @@ fn test_insert_read() { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }; db.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -71,7 +71,7 @@ fn test_delete() { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }; db.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -142,7 +142,7 @@ fn test_commit() { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: 
"Hello".to_string().into_bytes(), }; db.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -161,7 +161,7 @@ fn test_commit() { table_id: 1, row_id: 1, }, - data: "World".to_string(), + data: "World".to_string().into_bytes(), }; db.update(tx1, tx1_updated_row.clone()).unwrap(); let row = db @@ -202,7 +202,7 @@ fn test_rollback() { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }; db.insert(tx1, row1.clone()).unwrap(); let row2 = db @@ -221,7 +221,7 @@ fn test_rollback() { table_id: 1, row_id: 1, }, - data: "World".to_string(), + data: "World".to_string().into_bytes(), }; db.update(tx1, row3.clone()).unwrap(); let row4 = db @@ -260,7 +260,7 @@ fn test_dirty_write() { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }; db.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -282,7 +282,7 @@ fn test_dirty_write() { table_id: 1, row_id: 1, }, - data: "World".to_string(), + data: "World".to_string().into_bytes(), }; assert!(!db.update(tx2, tx2_row).unwrap()); @@ -310,7 +310,7 @@ fn test_dirty_read() { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }; db.insert(tx1, row1).unwrap(); @@ -339,7 +339,7 @@ fn test_dirty_read_deleted() { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }; db.insert(tx1, tx1_row.clone()).unwrap(); db.commit_tx(tx1).unwrap(); @@ -382,7 +382,7 @@ fn test_fuzzy_read() { table_id: 1, row_id: 1, }, - data: "First".to_string(), + data: "First".to_string().into_bytes(), }; db.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -419,7 +419,7 @@ fn test_fuzzy_read() { table_id: 1, row_id: 1, }, - data: "Second".to_string(), + data: "Second".to_string().into_bytes(), }; db.update(tx3, tx3_row).unwrap(); db.commit_tx(tx3).unwrap(); @@ -444,7 +444,7 @@ fn test_fuzzy_read() { table_id: 1, row_id: 1, }, - data: "Third".to_string(), + data: 
"Third".to_string().into_bytes(), }; let update_result = db.update(tx2, tx2_newrow); assert_eq!(Err(DatabaseError::WriteWriteConflict), update_result); @@ -461,7 +461,7 @@ fn test_lost_update() { table_id: 1, row_id: 1, }, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }; db.insert(tx1, tx1_row.clone()).unwrap(); let row = db @@ -484,7 +484,7 @@ fn test_lost_update() { table_id: 1, row_id: 1, }, - data: "World".to_string(), + data: "World".to_string().into_bytes(), }; assert!(db.update(tx2, tx2_row.clone()).unwrap()); @@ -495,7 +495,7 @@ fn test_lost_update() { table_id: 1, row_id: 1, }, - data: "Hello, world!".to_string(), + data: "Hello, world!".to_string().into_bytes(), }; assert_eq!( Err(DatabaseError::WriteWriteConflict), @@ -532,7 +532,7 @@ fn test_committed_visibility() { table_id: 1, row_id: 1, }, - data: "10".to_string(), + data: "10".to_string().into_bytes(), }; db.insert(tx1, tx1_row.clone()).unwrap(); db.commit_tx(tx1).unwrap(); @@ -544,7 +544,7 @@ fn test_committed_visibility() { table_id: 1, row_id: 1, }, - data: "20".to_string(), + data: "20".to_string().into_bytes(), }; assert!(db.update(tx2, tx2_row.clone()).unwrap()); let row = db @@ -587,7 +587,7 @@ fn test_future_row() { table_id: 1, row_id: 1, }, - data: "10".to_string(), + data: "10".to_string().into_bytes(), }; db.insert(tx2, tx2_row).unwrap(); @@ -695,7 +695,7 @@ fn test_snapshot_isolation_tx_visible1() { table_id: 1, row_id: 1, }, - data: "testme".to_string(), + data: "testme".to_string().into_bytes(), }, }; tracing::debug!("Testing visibility of {row_version:?}"); diff --git a/core/mvcc/mod.rs b/core/mvcc/mod.rs index 382736f0d..70d36461f 100644 --- a/core/mvcc/mod.rs +++ b/core/mvcc/mod.rs @@ -68,7 +68,7 @@ mod tests { }; let row = Row { id, - data: "Hello".to_string(), + data: "Hello".to_string().into_bytes(), }; db.insert(tx, row.clone()).unwrap(); db.commit_tx(tx).unwrap(); @@ -90,7 +90,7 @@ mod tests { }; let row = Row { id, - data: "World".to_string(), + data: 
"World".to_string().into_bytes(), }; db.insert(tx, row.clone()).unwrap(); db.commit_tx(tx).unwrap(); @@ -134,7 +134,7 @@ mod tests { }; let row = Row { id, - data: format!("{prefix} @{tx}"), + data: format!("{prefix} @{tx}").into_bytes(), }; if let Err(e) = db.upsert(tx, row.clone()) { tracing::trace!("upsert failed: {e}"); diff --git a/core/mvcc/persistent_storage/mod.rs b/core/mvcc/persistent_storage/mod.rs index 3f9ff2171..4b9b06407 100644 --- a/core/mvcc/persistent_storage/mod.rs +++ b/core/mvcc/persistent_storage/mod.rs @@ -15,14 +15,14 @@ impl Storage { } impl Storage { - pub fn log_tx(&self, _m: LogRecord) -> Result<()> { + pub fn log_tx(&self, _m: LogRecord) -> Result<()> { match self { Self::Noop => (), } Ok(()) } - pub fn read_tx_log(&self) -> Result>> { + pub fn read_tx_log(&self) -> Result> { match self { Self::Noop => Err(DatabaseError::Io( "cannot read from Noop storage".to_string(),