core/mvcc: fix tests with state machines

This commit is contained in:
Pere Diaz Bou
2025-08-01 15:48:09 +02:00
parent 69b20d9d43
commit 764523a8bb
5 changed files with 76 additions and 76 deletions

View File

@@ -310,7 +310,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
type SMResult = ();
#[tracing::instrument(fields(state = ?self.state), skip(self, mvcc_store))]
fn step<'a>(&mut self, mvcc_store: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
fn step(&mut self, mvcc_store: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
match self.state {
CommitState::Initial => {
let end_ts = mvcc_store.get_timestamp();
@@ -529,7 +529,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
}
CommitState::Commit { end_ts } => {
let mut log_record = LogRecord::new(end_ts);
for ref id in &self.write_set {
for id in &self.write_set {
if let Some(row_versions) = mvcc_store.rows.get(id) {
let mut row_versions = row_versions.value().write();
for row_version in row_versions.iter_mut() {
@@ -579,7 +579,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
}
}
fn finalize<'a>(&mut self, _context: &Self::Context) -> Result<()> {
fn finalize(&mut self, _context: &Self::Context) -> Result<()> {
self.is_finalized = true;
Ok(())
}
@@ -595,7 +595,7 @@ impl StateTransition for WriteRowStateMachine {
type SMResult = ();
#[tracing::instrument(fields(state = ?self.state), skip(self, _context))]
fn step<'a>(&mut self, _context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
fn step(&mut self, _context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
use crate::storage::btree::BTreeCursor;
use crate::types::{IOResult, SeekKey, SeekOp};
@@ -666,7 +666,7 @@ impl StateTransition for WriteRowStateMachine {
}
}
fn finalize<'a>(&mut self, _context: &Self::Context) -> Result<()> {
fn finalize(&mut self, _context: &Self::Context) -> Result<()> {
self.is_finalized = true;
Ok(())
}

View File

@@ -64,9 +64,7 @@ fn test_insert_read() {
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.mvcc_store
.commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let row = db
@@ -137,9 +135,7 @@ fn test_delete() {
)
.unwrap();
assert!(row.is_none());
db.mvcc_store
.commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let row = db
@@ -206,9 +202,7 @@ fn test_commit() {
.unwrap()
.unwrap();
assert_eq!(tx1_updated_row, row);
db.mvcc_store
.commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let row = db
@@ -222,9 +216,7 @@ fn test_commit() {
)
.unwrap()
.unwrap();
db.mvcc_store
.commit_tx(tx2, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx2).unwrap();
assert_eq!(tx1_updated_row, row);
db.mvcc_store.drop_unused_row_versions();
}
@@ -356,9 +348,7 @@ fn test_dirty_read_deleted() {
let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let tx1_row = generate_simple_string_row(1, 1, "Hello");
db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap();
db.mvcc_store
.commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
// T2 deletes row with ID 1, but does not commit.
let conn2 = db._db.connect().unwrap();
@@ -412,9 +402,7 @@ fn test_fuzzy_read() {
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.mvcc_store
.commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
// T2 reads the row with ID 1 within an active transaction.
let conn2 = db._db.connect().unwrap();
@@ -439,9 +427,7 @@ fn test_fuzzy_read() {
db.mvcc_store
.update(tx3, tx3_row, conn3.pager.borrow().clone())
.unwrap();
db.mvcc_store
.commit_tx(tx3, conn3.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &conn3, tx3).unwrap();
// T2 still reads the same version of the row as before.
let row = db
@@ -463,7 +449,7 @@ fn test_fuzzy_read() {
let update_result = db
.mvcc_store
.update(tx2, tx2_newrow, conn2.pager.borrow().clone());
assert_eq!(Err(DatabaseError::WriteWriteConflict), update_result);
assert!(matches!(update_result, Err(LimboError::WriteWriteConflict)));
}
#[test]
@@ -486,9 +472,7 @@ fn test_lost_update() {
.unwrap()
.unwrap();
assert_eq!(tx1_row, row);
db.mvcc_store
.commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
// T2 attempts to update row ID 1 within an active transaction.
let conn2 = db._db.connect().unwrap();
@@ -503,20 +487,17 @@ fn test_lost_update() {
let conn3 = db._db.connect().unwrap();
let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone());
let tx3_row = generate_simple_string_row(1, 1, "Hello, world!");
assert_eq!(
Err(DatabaseError::WriteWriteConflict),
assert!(matches!(
db.mvcc_store
.update(tx3, tx3_row, conn3.pager.borrow().clone())
);
.update(tx3, tx3_row, conn3.pager.borrow().clone(),),
Err(LimboError::WriteWriteConflict)
));
db.mvcc_store
.commit_tx(tx2, conn2.pager.borrow().clone(), &db.conn)
.unwrap();
assert_eq!(
Err(DatabaseError::TxTerminated),
db.mvcc_store
.commit_tx(tx3, conn3.pager.borrow().clone(), &db.conn)
);
commit_tx(db.mvcc_store.clone(), &conn2, tx2).unwrap();
assert!(matches!(
commit_tx(db.mvcc_store.clone(), &conn3, tx3),
Err(LimboError::TxTerminated)
));
let conn4 = db._db.connect().unwrap();
let tx4 = db.mvcc_store.begin_tx(conn4.pager.borrow().clone());
@@ -544,9 +525,7 @@ fn test_committed_visibility() {
let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let tx1_row = generate_simple_string_row(1, 1, "10");
db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap();
db.mvcc_store
.commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap();
// but I like more money, so let me try adding $10 more
let conn2 = db._db.connect().unwrap();
@@ -612,9 +591,7 @@ fn test_future_row() {
assert_eq!(row, None);
// lets commit the transaction and check if tx1 can see it
db.mvcc_store
.commit_tx(tx2, conn2.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &conn2, tx2).unwrap();
let row = db
.mvcc_store
.read(
@@ -658,9 +635,7 @@ fn setup_test_db() -> (MvccTestDb, u64) {
db.mvcc_store.insert(tx_id, row).unwrap();
}
db.mvcc_store
.commit_tx(tx_id, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx_id).unwrap();
let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
(db, tx_id)
@@ -679,14 +654,47 @@ fn setup_lazy_db(initial_keys: &[i64]) -> (MvccTestDb, u64) {
db.mvcc_store.insert(tx_id, row).unwrap();
}
db.mvcc_store
.commit_tx(tx_id, db.conn.pager.borrow().clone(), &db.conn)
.unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx_id).unwrap();
let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
(db, tx_id)
}
pub(crate) fn commit_tx(
mv_store: Arc<MvStore<LocalClock>>,
conn: &Arc<Connection>,
tx_id: u64,
) -> Result<()> {
let mut sm = mv_store
.commit_tx(tx_id, conn.pager.borrow().clone(), conn)
.unwrap();
let result = sm.step(&mv_store)?;
assert!(sm.is_finalized());
match result {
TransitionResult::Done(()) => Ok(()),
_ => unreachable!(),
}
}
pub(crate) fn commit_tx_no_conn(
db: &MvccTestDbNoConn,
tx_id: u64,
conn: &Arc<Connection>,
) -> Result<(), LimboError> {
let mut sm = db
.db
.get_mv_store()
.unwrap()
.commit_tx(tx_id, conn.pager.borrow().clone(), conn)
.unwrap();
let result = sm.step(db.db.mv_store.as_ref().unwrap())?;
assert!(sm.is_finalized());
match result {
TransitionResult::Done(()) => Ok(()),
_ => unreachable!(),
}
}
#[test]
fn test_lazy_scan_cursor_basic() {
let (db, tx_id) = setup_lazy_db(&[1, 2, 3, 4, 5]);
@@ -801,7 +809,7 @@ fn test_cursor_with_empty_table() {
// FIXME: force page 1 initialization
let pager = db.conn.pager.borrow().clone();
let tx_id = db.mvcc_store.begin_tx(pager.clone());
db.mvcc_store.commit_tx(tx_id, pager, &db.conn).unwrap();
commit_tx(db.mvcc_store.clone(), &db.conn, tx_id).unwrap();
}
let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone());
let table_id = 1; // Empty table

View File

@@ -41,7 +41,9 @@ pub use database::MvStore;
#[cfg(test)]
mod tests {
use crate::mvcc::database::tests::{generate_simple_string_row, MvccTestDbNoConn};
use crate::mvcc::database::tests::{
commit_tx_no_conn, generate_simple_string_row, MvccTestDbNoConn,
};
use crate::mvcc::database::RowID;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
@@ -71,14 +73,10 @@ mod tests {
};
let row = generate_simple_string_row(1, id.row_id, "Hello");
mvcc_store.insert(tx, row.clone()).unwrap();
mvcc_store
.commit_tx(tx, conn.pager.borrow().clone(), &conn)
.unwrap();
commit_tx_no_conn(&db, tx, &conn).unwrap();
let tx = mvcc_store.begin_tx(conn.pager.borrow().clone());
let committed_row = mvcc_store.read(tx, id).unwrap();
mvcc_store
.commit_tx(tx, conn.pager.borrow().clone(), &conn)
.unwrap();
commit_tx_no_conn(&db, tx, &conn).unwrap();
assert_eq!(committed_row, Some(row));
}
})
@@ -96,14 +94,10 @@ mod tests {
};
let row = generate_simple_string_row(1, id.row_id, "World");
mvcc_store.insert(tx, row.clone()).unwrap();
mvcc_store
.commit_tx(tx, conn.pager.borrow().clone(), &conn)
.unwrap();
commit_tx_no_conn(&db, tx, &conn).unwrap();
let tx = mvcc_store.begin_tx(conn.pager.borrow().clone());
let committed_row = mvcc_store.read(tx, id).unwrap();
mvcc_store
.commit_tx(tx, conn.pager.borrow().clone(), &conn)
.unwrap();
commit_tx_no_conn(&db, tx, &conn).unwrap();
assert_eq!(committed_row, Some(row));
}
})
@@ -147,9 +141,7 @@ mod tests {
continue;
}
let committed_row = mvcc_store.read(tx, id).unwrap();
mvcc_store
.commit_tx(tx, conn.pager.borrow().clone(), &conn)
.unwrap();
commit_tx_no_conn(&db, tx, &conn).unwrap();
assert_eq!(committed_row, Some(row));
}
tracing::info!(

View File

@@ -17,12 +17,12 @@ pub trait StateTransition {
/// Returns `TransitionResult::Io` if the state machine needs to perform an IO operation.
/// Returns `TransitionResult::Continue` if the state machine needs to continue.
/// Returns `TransitionResult::Done` if the state machine is done.
fn step<'a>(&mut self, context: &Self::Context) -> Result<TransitionResult<Self::SMResult>>;
fn step(&mut self, context: &Self::Context) -> Result<TransitionResult<Self::SMResult>>;
/// Finalize the state machine.
///
/// This is called when the state machine is done.
fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()>;
fn finalize(&mut self, context: &Self::Context) -> Result<()>;
/// Check if the state machine is finalized.
fn is_finalized(&self) -> bool;
@@ -48,7 +48,7 @@ impl<State: StateTransition> StateTransition for StateMachine<State> {
type Context = State::Context;
type SMResult = State::SMResult;
fn step<'a>(&mut self, context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
fn step(&mut self, context: &Self::Context) -> Result<TransitionResult<Self::SMResult>> {
loop {
if self.is_finalized {
unreachable!("StateMachine::transition: state machine is finalized");
@@ -69,7 +69,7 @@ impl<State: StateTransition> StateTransition for StateMachine<State> {
}
}
fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()> {
fn finalize(&mut self, context: &Self::Context) -> Result<()> {
self.state.finalize(context)?;
self.is_finalized = true;
Ok(())

View File

@@ -446,7 +446,7 @@ impl Program {
let mut state_machine =
mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap();
state_machine
.step(&mv_store)
.step(mv_store)
.map_err(|e| LimboError::InternalError(e.to_string()))?;
assert!(state_machine.is_finalized());
}