From 8f43741513d23ba506475216906fd6ef13a13f07 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Mon, 15 Sep 2025 09:29:08 +0300 Subject: [PATCH] fix mvcc rollback executing ROLLBACK did not rollback the mv-store transaction --- core/benches/mvcc_benchmark.rs | 4 +++- core/mvcc/database/mod.rs | 23 +++++++++++++++++++---- core/mvcc/database/tests.rs | 7 +++++-- core/vdbe/execute.rs | 8 +++++++- core/vdbe/mod.rs | 3 +-- 5 files changed, 35 insertions(+), 10 deletions(-) diff --git a/core/benches/mvcc_benchmark.rs b/core/benches/mvcc_benchmark.rs index ab8484c35..c69392208 100644 --- a/core/benches/mvcc_benchmark.rs +++ b/core/benches/mvcc_benchmark.rs @@ -36,7 +36,9 @@ fn bench(c: &mut Criterion) { b.to_async(FuturesExecutor).iter(|| async { let conn = db.conn.clone(); let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()); - db.mvcc_store.rollback_tx(tx_id, conn.get_pager().clone()) + db.mvcc_store + .rollback_tx(tx_id, conn.get_pager().clone(), &conn) + .unwrap(); }) }); diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index b03a612ba..c5a96a6c6 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1388,17 +1388,28 @@ impl MvStore { /// # Arguments /// /// * `tx_id` - The ID of the transaction to abort. - pub fn rollback_tx(&self, tx_id: TxID, pager: Rc) { + pub fn rollback_tx( + &self, + tx_id: TxID, + pager: Rc, + connection: &Connection, + ) -> Result<()> { let tx_unlocked = self.txs.get(&tx_id).unwrap(); let tx = tx_unlocked.value(); + connection.mv_tx.set(None); assert_eq!(tx.state, TransactionState::Active); tx.state.store(TransactionState::Aborted); tracing::trace!("abort(tx_id={})", tx_id); let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); - if self.is_exclusive_tx(&tx_id) { + let pager_rollback_done = if self.is_exclusive_tx(&tx_id) { + self.commit_coordinator.pager_commit_lock.unlock(); self.release_exclusive_tx(&tx_id); - } + pager.io.block(|| pager.end_tx(true, connection))?; + true + } else { + false + }; for ref id in write_set { if let Some(row_versions) = self.rows.get(id) { @@ -1420,10 +1431,14 @@ impl MvStore { let tx = tx_unlocked.value(); tx.state.store(TransactionState::Terminated); tracing::trace!("terminate(tx_id={})", tx_id); - pager.end_read_tx().unwrap(); + if !pager_rollback_done { + pager.end_read_tx()?; + } // FIXME: verify that we can already remove the transaction here! // Maybe it's fine for snapshot isolation, but too early for serializable? self.txs.remove(&tx_id); + + Ok(()) } /// Returns true if the given transaction is the exclusive transaction. diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index ee6ef58c0..604095efa 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -296,7 +296,8 @@ fn test_rollback() { .unwrap(); assert_eq!(row3, row4); db.mvcc_store - .rollback_tx(tx1, db.conn.pager.borrow().clone()); + .rollback_tx(tx1, db.conn.pager.borrow().clone(), &db.conn) + .unwrap(); let tx2 = db.mvcc_store.begin_tx(db.conn.pager.borrow().clone()); let row5 = db .mvcc_store @@ -521,7 +522,9 @@ fn test_lost_update() { Err(LimboError::WriteWriteConflict) )); // hack: in the actual tursodb database we rollback the mvcc tx ourselves, so manually roll it back here - db.mvcc_store.rollback_tx(tx3, conn3.pager.borrow().clone()); + db.mvcc_store + .rollback_tx(tx3, conn3.pager.borrow().clone(), &conn3) + .unwrap(); commit_tx(db.mvcc_store.clone(), &conn2, tx2).unwrap(); assert!(matches!( diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 3913e4c22..53fc9ee76 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2300,7 +2300,13 @@ pub fn op_auto_commit( if *auto_commit != conn.auto_commit.get() { if *rollback { // TODO(pere): add rollback I/O logic once we implement rollback journal - return_if_io!(pager.end_tx(true, &conn)); + if let Some(mv_store) = mv_store { + if let Some((tx_id, _)) = conn.mv_tx.get() { + mv_store.rollback_tx(tx_id, pager.clone(), &conn)?; + } + } else { + return_if_io!(pager.end_tx(true, &conn)); + } conn.transaction_state.replace(TransactionState::None); conn.auto_commit.replace(true); } else { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index ebc2715c6..0606f2e08 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -1080,10 +1080,9 @@ pub fn handle_program_error( _ => { if let Some(mv_store) = mv_store { if let Some((tx_id, _)) = connection.mv_tx.get() { - connection.mv_tx.set(None); connection.transaction_state.replace(TransactionState::None); connection.auto_commit.replace(true); - mv_store.rollback_tx(tx_id, pager.clone()); + mv_store.rollback_tx(tx_id, pager.clone(), connection)?; } } else { pager