diff --git a/core/mvcc/mvcc-rs/src/database/mod.rs b/core/mvcc/mvcc-rs/src/database/mod.rs index 66d82e54d..3307b831b 100644 --- a/core/mvcc/mvcc-rs/src/database/mod.rs +++ b/core/mvcc/mvcc-rs/src/database/mod.rs @@ -2,12 +2,9 @@ use crate::clock::LogicalClock; use crate::errors::DatabaseError; use crate::persistent_storage::Storage; use crossbeam_skiplist::{SkipMap, SkipSet}; -use parking_lot::Mutex; use serde::{Deserialize, Serialize}; -use std::cell::RefCell; -use std::collections::BTreeMap; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::RwLock; pub type Result = std::result::Result; @@ -150,7 +147,8 @@ impl std::fmt::Display for Transaction { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { write!( f, - "{{ id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}", + "{{ state: {}, id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}", + self.state, self.tx_id, self.begin_ts, // FIXME: I'm sorry, we obviously shouldn't be cloning here. @@ -171,30 +169,41 @@ impl std::fmt::Display for Transaction { enum TransactionState { Active, Preparing, - Committed, + Committed(u64), Aborted, Terminated, } -/// A database with MVCC. +impl std::fmt::Display for TransactionState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + match self { + TransactionState::Active => write!(f, "Active"), + TransactionState::Preparing => write!(f, "Preparing"), + TransactionState::Committed(ts) => write!(f, "Committed({ts})"), + TransactionState::Aborted => write!(f, "Aborted"), + TransactionState::Terminated => write!(f, "Terminated"), + } + } +} + #[derive(Debug)] pub struct Database { - inner: Arc>>, + rows: SkipMap>>, + txs: SkipMap>, + tx_ids: AtomicU64, + clock: Clock, + storage: Storage, } impl Database { /// Creates a new database. pub fn new(clock: Clock, storage: Storage) -> Self { - let inner = DatabaseInner { + Self { rows: SkipMap::new(), txs: SkipMap::new(), - tx_timestamps: RefCell::new(BTreeMap::new()), tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes clock, storage, - }; - Self { - inner: Arc::new(Mutex::new(inner)), } } @@ -209,8 +218,23 @@ impl Database { /// * `row` - the row object containing the values to be inserted. /// pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { - let inner = self.inner.lock(); - inner.insert(tx_id, row) + let tx = self + .txs + .get(&tx_id) + .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; + let mut tx = tx.value().write().unwrap(); + assert!(tx.state == TransactionState::Active); + let id = row.id; + let row_version = RowVersion { + begin: TxTimestampOrID::TxID(tx.tx_id), + end: None, + row, + }; + let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); + let mut versions = versions.value().write().unwrap(); + versions.push(row_version); + tx.insert_to_write_set(id); + Ok(()) } /// Updates a row in the database with new values. @@ -254,123 +278,6 @@ impl Database { /// Returns `true` if the row was successfully deleted, and `false` otherwise. /// pub fn delete(&self, tx_id: TxID, id: RowID) -> Result { - let inner = self.inner.lock(); - inner.delete(tx_id, id) - } - - /// Retrieves a row from the table with the given `id`. - /// - /// This operation is performed within the scope of the transaction identified - /// by `tx_id`. - /// - /// # Arguments - /// - /// * `tx_id` - The ID of the transaction to perform the read operation in. - /// * `id` - The ID of the row to retrieve. - /// - /// # Returns - /// - /// Returns `Some(row)` with the row data if the row with the given `id` exists, - /// and `None` otherwise. - pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { - let inner = self.inner.lock(); - inner.read(tx_id, id) - } - - pub fn scan_row_ids(&self) -> Result> { - let inner = self.inner.lock(); - inner.scan_row_ids() - } - - pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { - let inner = self.inner.lock(); - inner.scan_row_ids_for_table(table_id) - } - - /// Begins a new transaction in the database. - /// - /// This function starts a new transaction in the database and returns a `TxID` value - /// that you can use to perform operations within the transaction. All changes made within the - /// transaction are isolated from other transactions until you commit the transaction. - pub fn begin_tx(&self) -> TxID { - let mut inner = self.inner.lock(); - inner.begin_tx() - } - - /// Commits a transaction with the specified transaction ID. - /// - /// This function commits the changes made within the specified transaction and finalizes the - /// transaction. Once a transaction has been committed, all changes made within the transaction - /// are visible to other transactions that access the same data. - /// - /// # Arguments - /// - /// * `tx_id` - The ID of the transaction to commit. - pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { - let mut inner = self.inner.lock(); - inner.commit_tx(tx_id) - } - - /// Rolls back a transaction with the specified ID. - /// - /// This function rolls back a transaction with the specified `tx_id` by - /// discarding any changes made by the transaction. - /// - /// # Arguments - /// - /// * `tx_id` - The ID of the transaction to abort. - pub fn rollback_tx(&self, tx_id: TxID) { - let inner = self.inner.lock(); - inner.rollback_tx(tx_id); - } - - /// Drops all unused row versions from the database. - /// - /// A version is considered unused if it is not visible to any active transaction - /// and it is not the most recent version of the row. - pub fn drop_unused_row_versions(&self) { - let inner = self.inner.lock(); - inner.drop_unused_row_versions(); - } - - pub fn recover(&self) -> Result<()> { - let inner = self.inner.lock(); - inner.recover() - } -} - -#[derive(Debug)] -pub struct DatabaseInner { - rows: SkipMap>>, - txs: SkipMap>, - tx_timestamps: RefCell>, - tx_ids: AtomicU64, - clock: Clock, - storage: Storage, -} - -impl DatabaseInner { - fn insert(&self, tx_id: TxID, row: Row) -> Result<()> { - let tx = self - .txs - .get(&tx_id) - .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?; - let mut tx = tx.value().write().unwrap(); - assert!(tx.state == TransactionState::Active); - let id = row.id; - let row_version = RowVersion { - begin: TxTimestampOrID::TxID(tx.tx_id), - end: None, - row, - }; - let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new())); - let mut versions = versions.value().write().unwrap(); - versions.push(row_version); - tx.insert_to_write_set(id); - Ok(()) - } - - fn delete(&self, tx_id: TxID, id: RowID) -> Result { let row_versions_opt = self.rows.get(&id); if let Some(ref row_versions) = row_versions_opt { let mut row_versions = row_versions.value().write().unwrap(); @@ -404,7 +311,21 @@ impl DatabaseInner { Ok(false) } - fn read(&self, tx_id: TxID, id: RowID) -> Result> { + /// Retrieves a row from the table with the given `id`. + /// + /// This operation is performed within the scope of the transaction identified + /// by `tx_id`. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to perform the read operation in. + /// * `id` - The ID of the row to retrieve. + /// + /// # Returns + /// + /// Returns `Some(row)` with the row data if the row with the given `id` exists, + /// and `None` otherwise. + pub fn read(&self, tx_id: TxID, id: RowID) -> Result> { let tx = self.txs.get(&tx_id).unwrap(); let tx = tx.value().read().unwrap(); assert!(tx.state == TransactionState::Active); @@ -420,12 +341,14 @@ impl DatabaseInner { Ok(None) } - fn scan_row_ids(&self) -> Result> { + /// Gets all row ids in the database. + pub fn scan_row_ids(&self) -> Result> { let keys = self.rows.iter().map(|entry| *entry.key()); Ok(keys.collect()) } - fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { + /// Gets all row ids in the database for a given table. + pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result> { Ok(self .rows .range( @@ -441,18 +364,30 @@ impl DatabaseInner { .collect()) } - fn begin_tx(&mut self) -> TxID { + /// Begins a new transaction in the database. + /// + /// This function starts a new transaction in the database and returns a `TxID` value + /// that you can use to perform operations within the transaction. All changes made within the + /// transaction are isolated from other transactions until you commit the transaction. + pub fn begin_tx(&self) -> TxID { let tx_id = self.get_tx_id(); let begin_ts = self.get_timestamp(); let tx = Transaction::new(tx_id, begin_ts); tracing::trace!("BEGIN {tx}"); - let mut tx_timestamps = self.tx_timestamps.borrow_mut(); self.txs.insert(tx_id, RwLock::new(tx)); - *tx_timestamps.entry(begin_ts).or_insert(0) += 1; tx_id } - fn commit_tx(&mut self, tx_id: TxID) -> Result<()> { + /// Commits a transaction with the specified transaction ID. + /// + /// This function commits the changes made within the specified transaction and finalizes the + /// transaction. Once a transaction has been committed, all changes made within the transaction + /// are visible to other transactions that access the same data. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to commit. + pub fn commit_tx(&self, tx_id: TxID) -> Result<()> { let end_ts = self.get_timestamp(); let tx = self.txs.get(&tx_id).unwrap(); let mut tx = tx.value().write().unwrap(); @@ -464,6 +399,84 @@ impl DatabaseInner { } tx.state = TransactionState::Preparing; tracing::trace!("PREPARE {tx}"); + + /* TODO: The code we have here is sufficient for snapshot isolation. + ** In order to implement serializability, we need the following steps: + ** + ** 1. Validate if all read versions are still visible by inspecting the read_set + ** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet) + ** - a phantom is a version that became visible in the middle of our transaction, + ** but wasn't taken into account during one of the scans from the scan_set + ** 3. Wait for commit dependencies, which we don't even track yet... + ** Excerpt from what's a commit dependency and how it's tracked in the original paper: + ** """ + A transaction T1 has a commit dependency on another transaction + T2, if T1 is allowed to commit only if T2 commits. If T2 aborts, + T1 must also abort, so cascading aborts are possible. T1 acquires a + commit dependency either by speculatively reading or speculatively ignoring a version, + instead of waiting for T2 to commit. + We implement commit dependencies by a register-and-report + approach: T1 registers its dependency with T2 and T2 informs T1 + when it has committed or aborted. Each transaction T contains a + counter, CommitDepCounter, that counts how many unresolved + commit dependencies it still has. A transaction cannot commit + until this counter is zero. In addition, T has a Boolean variable + AbortNow that other transactions can set to tell T to abort. Each + transaction T also has a set, CommitDepSet, that stores transaction IDs + of the transactions that depend on T. + To take a commit dependency on a transaction T2, T1 increments + its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet. + When T2 has committed, it locates each transaction in + its CommitDepSet and decrements their CommitDepCounter. If + T2 aborted, it tells the dependent transactions to also abort by + setting their AbortNow flags. If a dependent transaction is not + found, this means that it has already aborted. + Note that a transaction with commit dependencies may not have to + wait at all - the dependencies may have been resolved before it is + ready to commit. Commit dependencies consolidate all waits into + a single wait and postpone the wait to just before commit. + Some transactions may have to wait before commit. + Waiting raises a concern of deadlocks. + However, deadlocks cannot occur because an older transaction never + waits on a younger transaction. In + a wait-for graph the direction of edges would always be from a + younger transaction (higher end timestamp) to an older transaction + (lower end timestamp) so cycles are impossible. + """ + ** If you're wondering when a speculative read happens, here you go: + ** Case 1: speculative read of TB: + """ + If transaction TB is in the Preparing state, it has acquired an end + timestamp TS which will be V’s begin timestamp if TB commits. + A safe approach in this situation would be to have transaction T + wait until transaction TB commits. However, we want to avoid all + blocking during normal processing so instead we continue with + the visibility test and, if the test returns true, allow T to + speculatively read V. Transaction T acquires a commit dependency on + TB, restricting the serialization order of the two transactions. That + is, T is allowed to commit only if TB commits. + """ + ** Case 2: speculative ignore of TE: + """ + If TE’s state is Preparing, it has an end timestamp TS that will become + the end timestamp of V if TE does commit. If TS is greater than the read + time RT, it is obvious that V will be visible if TE commits. If TE + aborts, V will still be visible, because any transaction that updates + V after TE has aborted will obtain an end timestamp greater than + TS. If TS is less than RT, we have a more complicated situation: + if TE commits, V will not be visible to T but if TE aborts, it will + be visible. We could handle this by forcing T to wait until TE + commits or aborts but we want to avoid all blocking during normal processing. + Instead we allow T to speculatively ignore V and + proceed with its processing. Transaction T acquires a commit + dependency (see Section 2.7) on TE, that is, T is allowed to commit + only if TE commits. + """ + */ + tx.state = TransactionState::Committed(end_ts); + tracing::trace!("COMMIT {tx}"); + // Postprocessing: inserting row versions and logging the transaction to persistent storage. + // TODO: we should probably save to persistent storage first, and only then update the in-memory structures. let mut log_record: LogRecord = LogRecord::new(end_ts); for id in &tx.write_set { let id = id.value(); @@ -485,21 +498,14 @@ impl DatabaseInner { } } } - tx.state = TransactionState::Committed; - tracing::trace!("COMMIT {tx}"); // We have now updated all the versions with a reference to the // transaction ID to a timestamp and can, therefore, remove the // transaction. Please note that when we move to lockless, the // invariant doesn't necessarily hold anymore because another thread // might have speculatively read a version that we want to remove. // But that's a problem for another day. - let mut tx_timestamps = self.tx_timestamps.borrow_mut(); - if let Some(timestamp_entry) = tx_timestamps.get_mut(&tx.begin_ts) { - *timestamp_entry -= 1; - if timestamp_entry == &0 { - tx_timestamps.remove(&tx.begin_ts); - } - } + // FIXME: it actually just become a problem for today!!! + // TODO: test that reproduces this failure, and then a fix self.txs.remove(&tx_id); if !log_record.row_versions.is_empty() { self.storage.log_tx(log_record)?; @@ -507,7 +513,15 @@ impl DatabaseInner { Ok(()) } - fn rollback_tx(&self, tx_id: TxID) { + /// Rolls back a transaction with the specified ID. + /// + /// This function rolls back a transaction with the specified `tx_id` by + /// discarding any changes made by the transaction. + /// + /// # Arguments + /// + /// * `tx_id` - The ID of the transaction to abort. + pub fn rollback_tx(&self, tx_id: TxID) { let tx = self.txs.get(&tx_id).unwrap(); let mut tx = tx.value().write().unwrap(); assert!(tx.state == TransactionState::Active); @@ -527,39 +541,30 @@ impl DatabaseInner { tracing::trace!("TERMINATE {tx}"); } - fn get_tx_id(&mut self) -> u64 { + /// Generates next unique transaction id + pub fn get_tx_id(&self) -> u64 { self.tx_ids.fetch_add(1, Ordering::SeqCst) } - fn get_timestamp(&mut self) -> u64 { + /// Gets current timestamp + pub fn get_timestamp(&self) -> u64 { self.clock.get_timestamp() } - /// Drops all rows that are not visible to any transaction. - /// The logic is as follows. If a row version has an end marker - /// which denotes a transaction that is not active, then we can - /// drop the row version -- it is not visible to any transaction. - /// If a row version has an end marker that denotes a timestamp T_END, - /// then we can drop the row version only if all active transactions - /// have a begin timestamp that is greater than timestamp T_END. - /// FIXME: this function is a full scan over all rows and row versions. - /// We can do better by keeping an index of row versions ordered - /// by their end timestamps. - fn drop_unused_row_versions(&self) { - let tx_timestamps = self.tx_timestamps.borrow(); + /// FIXME: implement in a lock-free manner + pub fn drop_unused_row_versions(&self) { let mut to_remove = Vec::new(); for entry in self.rows.iter() { let mut row_versions = entry.value().write().unwrap(); row_versions.retain(|rv| { let should_stay = match rv.end { Some(TxTimestampOrID::Timestamp(version_end_ts)) => { - match tx_timestamps.first_key_value() { - // a transaction started before this row version ended, - // ergo row version is needed - Some((begin_ts, _)) => version_end_ts >= *begin_ts, - // no transaction => row version is not needed - None => false, - } + // a transaction started before this row version ended, + // ergo row version is needed + // NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable + self.txs + .iter() + .any(|tx| version_end_ts >= tx.value().read().unwrap().begin_ts) } // Let's skip potentially complex logic if the transaction is still // active/tracked. We will drop the row version when the transaction @@ -606,7 +611,7 @@ impl DatabaseInner { /// A write-write conflict happens when transaction T_m attempts to update a /// row version that is currently being updated by an active transaction T_n. -fn is_write_write_conflict( +pub(crate) fn is_write_write_conflict( txs: &SkipMap>, tx: &Transaction, rv: &RowVersion, @@ -618,7 +623,7 @@ fn is_write_write_conflict( match te.state { TransactionState::Active => tx.tx_id != te.tx_id, TransactionState::Preparing => todo!(), - TransactionState::Committed => todo!(), + TransactionState::Committed(_end_ts) => todo!(), TransactionState::Aborted => todo!(), TransactionState::Terminated => todo!(), } @@ -628,7 +633,7 @@ fn is_write_write_conflict( } } -fn is_version_visible( +pub(crate) fn is_version_visible( txs: &SkipMap>, tx: &Transaction, rv: &RowVersion, @@ -646,13 +651,22 @@ fn is_begin_visible( TxTimestampOrID::TxID(rv_begin) => { let tb = txs.get(&rv_begin).unwrap(); let tb = tb.value().read().unwrap(); - match tb.state { + let visible = match tb.state { TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(), - TransactionState::Preparing => todo!(), - TransactionState::Committed => todo!(), - TransactionState::Aborted => todo!(), - TransactionState::Terminated => todo!(), - } + TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! + TransactionState::Committed(committed_ts) => tx.begin_ts >= committed_ts, + TransactionState::Aborted => false, + TransactionState::Terminated => { + tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now"); + false + } + }; + tracing::trace!( + "is_begin_visible: tx={tx}, tb={tb} rv = {:?}-{:?} visible = {visible}", + rv.begin, + rv.end + ); + visible } } } @@ -667,13 +681,22 @@ fn is_end_visible( Some(TxTimestampOrID::TxID(rv_end)) => { let te = txs.get(&rv_end).unwrap(); let te = te.value().read().unwrap(); - match te.state { + let visible = match te.state { TransactionState::Active => tx.tx_id != te.tx_id, - TransactionState::Preparing => todo!(), - TransactionState::Committed => todo!(), - TransactionState::Aborted => todo!(), - TransactionState::Terminated => todo!(), - } + TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable! + TransactionState::Committed(committed_ts) => tx.begin_ts < committed_ts, + TransactionState::Aborted => false, + TransactionState::Terminated => { + tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now"); + false + } + }; + tracing::trace!( + "is_end_visible: tx={tx}, te={te} rv = {:?}-{:?} visible = {visible}", + rv.begin, + rv.end + ); + visible } None => true, } diff --git a/core/mvcc/mvcc-rs/src/database/tests.rs b/core/mvcc/mvcc-rs/src/database/tests.rs index 29bf1c4f7..ada842218 100644 --- a/core/mvcc/mvcc-rs/src/database/tests.rs +++ b/core/mvcc/mvcc-rs/src/database/tests.rs @@ -337,7 +337,6 @@ fn test_dirty_read() { assert_eq!(row2, None); } -#[ignore] #[traced_test] #[test] fn test_dirty_read_deleted() { @@ -777,3 +776,157 @@ fn test_storage1() { "testme3" ); } + +/* States described in the Hekaton paper *for serializability*: + +Table 1: Case analysis of action to take when version V’s +Begin field contains the ID of transaction TB +------------------------------------------------------------------------------------------------------ +TB’s state | TB’s end timestamp | Action to take when transaction T checks visibility of version V. +------------------------------------------------------------------------------------------------------ +Active | Not set | V is visible only if TB=T and V’s end timestamp equals infinity. +------------------------------------------------------------------------------------------------------ +Preparing | TS | V’s begin timestamp will be TS ut V is not yet committed. Use TS + | as V’s begin time when testing visibility. If the test is true, + | allow T to speculatively read V. Committed TS V’s begin timestamp + | will be TS and V is committed. Use TS as V’s begin time to test + | visibility. +------------------------------------------------------------------------------------------------------ +Committed | TS | V’s begin timestamp will be TS and V is committed. Use TS as V’s + | begin time to test visibility. +------------------------------------------------------------------------------------------------------ +Aborted | Irrelevant | Ignore V; it’s a garbage version. +------------------------------------------------------------------------------------------------------ +Terminated | Irrelevant | Reread V’s Begin field. TB has terminated so it must have finalized +or not found | | the timestamp. +------------------------------------------------------------------------------------------------------ + +Table 2: Case analysis of action to take when V's End field +contains a transaction ID TE. +------------------------------------------------------------------------------------------------------ +TE’s state | TE’s end timestamp | Action to take when transaction T checks visibility of a version V + | | as of read time RT. +------------------------------------------------------------------------------------------------------ +Active | Not set | V is visible only if TE is not T. +------------------------------------------------------------------------------------------------------ +Preparing | TS | V’s end timestamp will be TS provided that TE commits. If TS > RT, + | V is visible to T. If TS < RT, T speculatively ignores V. +------------------------------------------------------------------------------------------------------ +Committed | TS | V’s end timestamp will be TS and V is committed. Use TS as V’s end + | timestamp when testing visibility. +------------------------------------------------------------------------------------------------------ +Aborted | Irrelevant | V is visible. +------------------------------------------------------------------------------------------------------ +Terminated | Irrelevant | Reread V’s End field. TE has terminated so it must have finalized +or not found | | the timestamp. +*/ + +fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> RwLock { + RwLock::new(Transaction { + state, + tx_id, + begin_ts, + write_set: SkipSet::new(), + read_set: SkipSet::new(), + }) +} + +#[traced_test] +#[test] +fn test_snapshot_isolation_tx_visible1() { + let txs: SkipMap> = SkipMap::from_iter([ + (1, new_tx(1, 1, TransactionState::Committed(2))), + (2, new_tx(2, 2, TransactionState::Committed(5))), + (3, new_tx(3, 3, TransactionState::Aborted)), + (5, new_tx(5, 5, TransactionState::Preparing)), + (6, new_tx(6, 6, TransactionState::Committed(10))), + (7, new_tx(7, 7, TransactionState::Active)), + ]); + + let current_tx = new_tx(4, 4, TransactionState::Preparing); + let current_tx = current_tx.read().unwrap(); + + let rv_visible = |begin: TxTimestampOrID, end: Option| { + let row_version = RowVersion { + begin, + end, + row: Row { + id: RowID { + table_id: 1, + row_id: 1, + }, + data: "testme".to_string(), + }, + }; + tracing::debug!("Testing visibility of {row_version:?}"); + is_version_visible(&txs, ¤t_tx, &row_version) + }; + + // begin visible: transaction committed with ts < current_tx.begin_ts + // end visible: inf + assert!(rv_visible(TxTimestampOrID::TxID(1), None)); + + // begin invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible(TxTimestampOrID::TxID(2), None)); + + // begin invisible: transaction aborted + assert!(!rv_visible(TxTimestampOrID::TxID(3), None)); + + // begin visible: timestamp < current_tx.begin_ts + // end invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(1)) + )); + + // begin visible: timestamp < current_tx.begin_ts + // end visible: transaction committed with ts < current_tx.begin_ts + assert!(rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(2)) + )); + + // begin visible: timestamp < current_tx.begin_ts + // end invisible: transaction aborted + assert!(!rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(3)) + )); + + // begin invisible: transaction preparing + assert!(!rv_visible(TxTimestampOrID::TxID(5), None)); + + // begin invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible(TxTimestampOrID::TxID(6), None)); + + // begin invisible: transaction active + assert!(!rv_visible(TxTimestampOrID::TxID(7), None)); + + // begin invisible: transaction committed with ts > current_tx.begin_ts + assert!(!rv_visible(TxTimestampOrID::TxID(6), None)); + + // begin invisible: transaction active + assert!(!rv_visible(TxTimestampOrID::TxID(7), None)); + + // begin visible: timestamp < current_tx.begin_ts + // end invisible: transaction preparing + assert!(!rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(5)) + )); + + // begin invisible: timestamp > current_tx.begin_ts + assert!(!rv_visible( + TxTimestampOrID::Timestamp(6), + Some(TxTimestampOrID::TxID(6)) + )); + + // begin visible: timestamp < current_tx.begin_ts + // end visible: some active transaction will eventually overwrite this version, + // but that hasn't happened + // (this is the https://avi.im/blag/2023/hekaton-paper-typo/ case, I believe!) + assert!(rv_visible( + TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::TxID(7)) + )); +} diff --git a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs index f927be381..185a432ee 100644 --- a/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs +++ b/core/mvcc/mvcc-rs/src/persistent_storage/mod.rs @@ -27,7 +27,7 @@ impl Storage { } impl Storage { - pub fn log_tx(&mut self, m: LogRecord) -> Result<()> { + pub fn log_tx(&self, m: LogRecord) -> Result<()> { match self { Self::JsonOnDisk(path) => { use std::io::Write;