From 22f9cd695da203dc0141d5a0c39aaec4323f6316 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 24 Jun 2025 19:23:40 +0200 Subject: [PATCH] commit_txn track rollback case --- core/storage/btree.rs | 2 +- core/storage/pager.rs | 13 +++++++++++-- core/storage/wal.rs | 2 +- core/vdbe/execute.rs | 24 +++++++++++++++--------- core/vdbe/mod.rs | 19 ++++++++++++++++--- 5 files changed, 44 insertions(+), 16 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 6f237e784..d5c172fa6 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -7045,7 +7045,7 @@ mod tests { ) .unwrap(); loop { - match pager.end_tx().unwrap() { + match pager.end_tx(false).unwrap() { crate::PagerCacheflushStatus::Done(_) => break, crate::PagerCacheflushStatus::IO => { pager.io.run_once().unwrap(); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 955a46d22..ee7cf8ba1 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -234,6 +234,7 @@ pub enum PagerCacheflushResult { /// The WAL was written, fsynced, and a checkpoint was performed. /// The database file was then also fsynced. Checkpointed(CheckpointResult), + Rollback, } impl Pager { @@ -573,7 +574,13 @@ impl Pager { self.wal.borrow_mut().begin_write_tx() } - pub fn end_tx(&self) -> Result { + pub fn end_tx(&self, rollback: bool) -> Result { + tracing::trace!("end_tx(rollback={})", rollback); + if rollback { + self.wal.borrow().end_write_tx()?; + self.wal.borrow().end_read_tx()?; + return Ok(PagerCacheflushStatus::Done(PagerCacheflushResult::Rollback)); + } let cacheflush_status = self.cacheflush()?; match cacheflush_status { PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO), @@ -1064,8 +1071,10 @@ impl Pager { pub fn rollback(&self) -> Result<(), LimboError> { let mut cache = self.page_cache.write(); - cache.clear(); + cache.unset_dirty_all_pages(); + cache.clear().expect("failed to clear page cache"); self.wal.borrow_mut().rollback()?; + Ok(()) } } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 3f918a8d1..df7ca340d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -909,6 +909,7 @@ impl Wal for WalFile { // TODO(pere): implement proper hashmap, this sucks :). let shared = self.get_shared(); let max_frame = shared.max_frame.load(Ordering::SeqCst); + tracing::trace!("rollback(to_max_frame={})", max_frame); let mut frame_cache = shared.frame_cache.lock(); for (_, frames) in frame_cache.iter_mut() { let mut last_valid_frame = frames.len(); @@ -927,7 +928,6 @@ impl Wal for WalFile { } fn finish_append_frames_commit(&mut self) -> Result<()> { - println!("finish_append_frames_commit"); let shared = self.get_shared(); shared.max_frame.store(self.max_frame, Ordering::SeqCst); tracing::trace!( diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index db281c758..656ce774d 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1589,7 +1589,7 @@ pub fn halt( ))); } } - match program.commit_txn(pager.clone(), state, mv_store)? { + match program.commit_txn(pager.clone(), state, mv_store, false)? { StepResult::Done => Ok(InsnFunctionStepResult::Done), StepResult::IO => Ok(InsnFunctionStepResult::IO), StepResult::Row => Ok(InsnFunctionStepResult::Row), @@ -1637,12 +1637,18 @@ pub fn op_halt( ))); } } - match program.commit_txn(pager.clone(), state, mv_store)? { - StepResult::Done => Ok(InsnFunctionStepResult::Done), - StepResult::IO => Ok(InsnFunctionStepResult::IO), - StepResult::Row => Ok(InsnFunctionStepResult::Row), - StepResult::Interrupt => Ok(InsnFunctionStepResult::Interrupt), - StepResult::Busy => Ok(InsnFunctionStepResult::Busy), + let auto_commit = program.connection.auto_commit.get(); + tracing::trace!("op_halt(auto_commit={})", auto_commit); + if auto_commit { + match program.commit_txn(pager.clone(), state, mv_store, false)? { + StepResult::Done => Ok(InsnFunctionStepResult::Done), + StepResult::IO => Ok(InsnFunctionStepResult::IO), + StepResult::Row => Ok(InsnFunctionStepResult::Row), + StepResult::Interrupt => Ok(InsnFunctionStepResult::Interrupt), + StepResult::Busy => Ok(InsnFunctionStepResult::Busy), + } + } else { + Ok(InsnFunctionStepResult::Done) } } @@ -1743,7 +1749,7 @@ pub fn op_auto_commit( }; let conn = program.connection.clone(); if state.commit_state == CommitState::Committing { - return match program.commit_txn(pager.clone(), state, mv_store)? { + return match program.commit_txn(pager.clone(), state, mv_store, *rollback)? { super::StepResult::Done => Ok(InsnFunctionStepResult::Done), super::StepResult::IO => Ok(InsnFunctionStepResult::IO), super::StepResult::Row => Ok(InsnFunctionStepResult::Row), @@ -1773,7 +1779,7 @@ pub fn op_auto_commit( "cannot commit - no transaction is active".to_string(), )); } - match program.commit_txn(pager.clone(), state, mv_store)? { + match program.commit_txn(pager.clone(), state, mv_store, *rollback)? { super::StepResult::Done => Ok(InsnFunctionStepResult::Done), super::StepResult::IO => Ok(InsnFunctionStepResult::IO), super::StepResult::Row => Ok(InsnFunctionStepResult::Row), diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 738300116..93e3c704b 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -395,6 +395,7 @@ impl Program { pager: Rc, program_state: &mut ProgramState, mv_store: Option<&Rc>, + rollback: bool, ) -> Result { if let Some(mv_store) = mv_store { let conn = self.connection.clone(); @@ -410,16 +411,27 @@ impl Program { } else { let connection = self.connection.clone(); let auto_commit = connection.auto_commit.get(); - tracing::trace!("Halt auto_commit {}", auto_commit); + tracing::trace!( + "Halt auto_commit {}, state={:?}", + auto_commit, + program_state.commit_state + ); if program_state.commit_state == CommitState::Committing { - self.step_end_write_txn(&pager, &mut program_state.commit_state, &connection) + self.step_end_write_txn( + &pager, + &mut program_state.commit_state, + &connection, + rollback, + ) } else if auto_commit { let current_state = connection.transaction_state.get(); + tracing::trace!("Auto-commit state: {:?}", current_state); match current_state { TransactionState::Write => self.step_end_write_txn( &pager, &mut program_state.commit_state, &connection, + rollback, ), TransactionState::Read => { connection.transaction_state.replace(TransactionState::None); @@ -443,8 +455,9 @@ impl Program { pager: &Rc, commit_state: &mut CommitState, connection: &Connection, + rollback: bool, ) -> Result { - let cacheflush_status = pager.end_tx()?; + let cacheflush_status = pager.end_tx(rollback)?; match cacheflush_status { PagerCacheflushStatus::Done(_) => { if self.change_cnt_on {