diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index ceed6e2a2..7b9c60aaf 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -310,7 +310,7 @@ impl StateTransition for CommitStateMachine { type SMResult = (); #[tracing::instrument(fields(state = ?self.state), skip(self, mvcc_store))] - fn step<'a>(&mut self, mvcc_store: &Self::Context) -> Result> { + fn step(&mut self, mvcc_store: &Self::Context) -> Result> { match self.state { CommitState::Initial => { let end_ts = mvcc_store.get_timestamp(); @@ -529,7 +529,7 @@ impl StateTransition for CommitStateMachine { } CommitState::Commit { end_ts } => { let mut log_record = LogRecord::new(end_ts); - for ref id in &self.write_set { + for id in &self.write_set { if let Some(row_versions) = mvcc_store.rows.get(id) { let mut row_versions = row_versions.value().write(); for row_version in row_versions.iter_mut() { @@ -579,7 +579,7 @@ impl StateTransition for CommitStateMachine { } } - fn finalize<'a>(&mut self, _context: &Self::Context) -> Result<()> { + fn finalize(&mut self, _context: &Self::Context) -> Result<()> { self.is_finalized = true; Ok(()) } @@ -595,7 +595,7 @@ impl StateTransition for WriteRowStateMachine { type SMResult = (); #[tracing::instrument(fields(state = ?self.state), skip(self, _context))] - fn step<'a>(&mut self, _context: &Self::Context) -> Result> { + fn step(&mut self, _context: &Self::Context) -> Result> { use crate::storage::btree::BTreeCursor; use crate::types::{IOResult, SeekKey, SeekOp}; @@ -666,7 +666,7 @@ impl StateTransition for WriteRowStateMachine { } } - fn finalize<'a>(&mut self, _context: &Self::Context) -> Result<()> { + fn finalize(&mut self, _context: &Self::Context) -> Result<()> { self.is_finalized = true; Ok(()) } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 6af7a5e6a..814ebe61f 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -64,9 +64,7 @@ fn test_insert_read() { .unwrap() .unwrap(); assert_eq!(tx1_row, row); - db.mvcc_store - .commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); let row = db @@ -137,9 +135,7 @@ fn test_delete() { ) .unwrap(); assert!(row.is_none()); - db.mvcc_store - .commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); let row = db @@ -206,9 +202,7 @@ fn test_commit() { .unwrap() .unwrap(); assert_eq!(tx1_updated_row, row); - db.mvcc_store - .commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); let row = db @@ -222,9 +216,7 @@ fn test_commit() { ) .unwrap() .unwrap(); - db.mvcc_store - .commit_tx(tx2, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx2).unwrap(); assert_eq!(tx1_updated_row, row); db.mvcc_store.drop_unused_row_versions(); } @@ -356,9 +348,7 @@ fn test_dirty_read_deleted() { let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); let tx1_row = generate_simple_string_row(1, 1, "Hello"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); - db.mvcc_store - .commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); // T2 deletes row with ID 1, but does not commit. let conn2 = db._db.connect().unwrap(); @@ -412,9 +402,7 @@ fn test_fuzzy_read() { .unwrap() .unwrap(); assert_eq!(tx1_row, row); - db.mvcc_store - .commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); // T2 reads the row with ID 1 within an active transaction. let conn2 = db._db.connect().unwrap(); @@ -439,9 +427,7 @@ fn test_fuzzy_read() { db.mvcc_store .update(tx3, tx3_row, conn3.pager.borrow().clone()) .unwrap(); - db.mvcc_store - .commit_tx(tx3, conn3.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &conn3, tx3).unwrap(); // T2 still reads the same version of the row as before. let row = db @@ -463,7 +449,7 @@ fn test_fuzzy_read() { let update_result = db .mvcc_store .update(tx2, tx2_newrow, conn2.pager.borrow().clone()); - assert_eq!(Err(DatabaseError::WriteWriteConflict), update_result); + assert!(matches!(update_result, Err(LimboError::WriteWriteConflict))); } #[test] @@ -486,9 +472,7 @@ fn test_lost_update() { .unwrap() .unwrap(); assert_eq!(tx1_row, row); - db.mvcc_store - .commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); // T2 attempts to update row ID 1 within an active transaction. let conn2 = db._db.connect().unwrap(); @@ -503,20 +487,17 @@ fn test_lost_update() { let conn3 = db._db.connect().unwrap(); let tx3 = db.mvcc_store.begin_tx(conn3.pager.borrow().clone()); let tx3_row = generate_simple_string_row(1, 1, "Hello, world!"); - assert_eq!( - Err(DatabaseError::WriteWriteConflict), + assert!(matches!( db.mvcc_store - .update(tx3, tx3_row, conn3.pager.borrow().clone()) - ); + .update(tx3, tx3_row, conn3.pager.borrow().clone(),), + Err(LimboError::WriteWriteConflict) + )); - db.mvcc_store - .commit_tx(tx2, conn2.pager.borrow().clone(), &db.conn) - .unwrap(); - assert_eq!( - Err(DatabaseError::TxTerminated), - db.mvcc_store - .commit_tx(tx3, conn3.pager.borrow().clone(), &db.conn) - ); + commit_tx(db.mvcc_store.clone(), &conn2, tx2).unwrap(); + assert!(matches!( + commit_tx(db.mvcc_store.clone(), &conn3, tx3), + Err(LimboError::TxTerminated) + )); let conn4 = db._db.connect().unwrap(); let tx4 = db.mvcc_store.begin_tx(conn4.pager.borrow().clone()); @@ -544,9 +525,7 @@ fn test_committed_visibility() { let tx1 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); let tx1_row = generate_simple_string_row(1, 1, "10"); db.mvcc_store.insert(tx1, tx1_row.clone()).unwrap(); - db.mvcc_store - .commit_tx(tx1, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx1).unwrap(); // but I like more money, so let me try adding $10 more let conn2 = db._db.connect().unwrap(); @@ -612,9 +591,7 @@ fn test_future_row() { assert_eq!(row, None); // lets commit the transaction and check if tx1 can see it - db.mvcc_store - .commit_tx(tx2, conn2.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &conn2, tx2).unwrap(); let row = db .mvcc_store .read( @@ -658,9 +635,7 @@ fn setup_test_db() -> (MvccTestDb, u64) { db.mvcc_store.insert(tx_id, row).unwrap(); } - db.mvcc_store - .commit_tx(tx_id, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx_id).unwrap(); let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); (db, tx_id) @@ -679,14 +654,47 @@ fn setup_lazy_db(initial_keys: &[i64]) -> (MvccTestDb, u64) { db.mvcc_store.insert(tx_id, row).unwrap(); } - db.mvcc_store - .commit_tx(tx_id, db.conn.pager.borrow().clone(), &db.conn) - .unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx_id).unwrap(); let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); (db, tx_id) } +pub(crate) fn commit_tx( + mv_store: Arc>, + conn: &Arc, + tx_id: u64, +) -> Result<()> { + let mut sm = mv_store + .commit_tx(tx_id, conn.pager.borrow().clone(), conn) + .unwrap(); + let result = sm.step(&mv_store)?; + assert!(sm.is_finalized()); + match result { + TransitionResult::Done(()) => Ok(()), + _ => unreachable!(), + } +} + +pub(crate) fn commit_tx_no_conn( + db: &MvccTestDbNoConn, + tx_id: u64, + conn: &Arc, +) -> Result<(), LimboError> { + let mut sm = db + .db + .get_mv_store() + .unwrap() + .commit_tx(tx_id, conn.pager.borrow().clone(), conn) + .unwrap(); + let result = sm.step(db.db.mv_store.as_ref().unwrap())?; + assert!(sm.is_finalized()); + match result { + TransitionResult::Done(()) => Ok(()), + _ => unreachable!(), + } +} + #[test] fn test_lazy_scan_cursor_basic() { let (db, tx_id) = setup_lazy_db(&[1, 2, 3, 4, 5]); @@ -801,7 +809,7 @@ fn test_cursor_with_empty_table() { // FIXME: force page 1 initialization let pager = db.conn.pager.borrow().clone(); let tx_id = db.mvcc_store.begin_tx(pager.clone()); - db.mvcc_store.commit_tx(tx_id, pager, &db.conn).unwrap(); + commit_tx(db.mvcc_store.clone(), &db.conn, tx_id).unwrap(); } let tx_id = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); let table_id = 1; // Empty table diff --git a/core/mvcc/mod.rs b/core/mvcc/mod.rs index b45a281e6..32b8ce807 100644 --- a/core/mvcc/mod.rs +++ b/core/mvcc/mod.rs @@ -41,7 +41,9 @@ pub use database::MvStore; #[cfg(test)] mod tests { - use crate::mvcc::database::tests::{generate_simple_string_row, MvccTestDbNoConn}; + use crate::mvcc::database::tests::{ + commit_tx_no_conn, generate_simple_string_row, MvccTestDbNoConn, + }; use crate::mvcc::database::RowID; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; @@ -71,14 +73,10 @@ mod tests { }; let row = generate_simple_string_row(1, id.row_id, "Hello"); mvcc_store.insert(tx, row.clone()).unwrap(); - mvcc_store - .commit_tx(tx, conn.pager.borrow().clone(), &conn) - .unwrap(); + commit_tx_no_conn(&db, tx, &conn).unwrap(); let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()); let committed_row = mvcc_store.read(tx, id).unwrap(); - mvcc_store - .commit_tx(tx, conn.pager.borrow().clone(), &conn) - .unwrap(); + commit_tx_no_conn(&db, tx, &conn).unwrap(); assert_eq!(committed_row, Some(row)); } }) @@ -96,14 +94,10 @@ mod tests { }; let row = generate_simple_string_row(1, id.row_id, "World"); mvcc_store.insert(tx, row.clone()).unwrap(); - mvcc_store - .commit_tx(tx, conn.pager.borrow().clone(), &conn) - .unwrap(); + commit_tx_no_conn(&db, tx, &conn).unwrap(); let tx = mvcc_store.begin_tx(conn.pager.borrow().clone()); let committed_row = mvcc_store.read(tx, id).unwrap(); - mvcc_store - .commit_tx(tx, conn.pager.borrow().clone(), &conn) - .unwrap(); + commit_tx_no_conn(&db, tx, &conn).unwrap(); assert_eq!(committed_row, Some(row)); } }) @@ -147,9 +141,7 @@ mod tests { continue; } let committed_row = mvcc_store.read(tx, id).unwrap(); - mvcc_store - .commit_tx(tx, conn.pager.borrow().clone(), &conn) - .unwrap(); + commit_tx_no_conn(&db, tx, &conn).unwrap(); assert_eq!(committed_row, Some(row)); } tracing::info!( diff --git a/core/state_machine.rs b/core/state_machine.rs index 228d26879..fc8361480 100644 --- a/core/state_machine.rs +++ b/core/state_machine.rs @@ -17,12 +17,12 @@ pub trait StateTransition { /// Returns `TransitionResult::Io` if the state machine needs to perform an IO operation. /// Returns `TransitionResult::Continue` if the state machine needs to continue. /// Returns `TransitionResult::Done` if the state machine is done. - fn step<'a>(&mut self, context: &Self::Context) -> Result>; + fn step(&mut self, context: &Self::Context) -> Result>; /// Finalize the state machine. /// /// This is called when the state machine is done. - fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()>; + fn finalize(&mut self, context: &Self::Context) -> Result<()>; /// Check if the state machine is finalized. fn is_finalized(&self) -> bool; @@ -48,7 +48,7 @@ impl StateTransition for StateMachine { type Context = State::Context; type SMResult = State::SMResult; - fn step<'a>(&mut self, context: &Self::Context) -> Result> { + fn step(&mut self, context: &Self::Context) -> Result> { loop { if self.is_finalized { unreachable!("StateMachine::transition: state machine is finalized"); @@ -69,7 +69,7 @@ impl StateTransition for StateMachine { } } - fn finalize<'a>(&mut self, context: &Self::Context) -> Result<()> { + fn finalize(&mut self, context: &Self::Context) -> Result<()> { self.state.finalize(context)?; self.is_finalized = true; Ok(()) diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 7751abdee..8e1ec1f67 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -446,7 +446,7 @@ impl Program { let mut state_machine = mv_store.commit_tx(*tx_id, pager.clone(), &conn).unwrap(); state_machine - .step(&mv_store) + .step(mv_store) .map_err(|e| LimboError::InternalError(e.to_string()))?; assert!(state_machine.is_finalized()); }