From b07e57d9d17ba13625d1b9d0a5239347c8764edb Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 23 Jul 2025 11:59:54 +0200 Subject: [PATCH] review fixes --- core/lib.rs | 7 +-- core/storage/pager.rs | 122 +++++++++++++++++++++--------------------- core/vdbe/mod.rs | 59 ++++++++++---------- 3 files changed, 92 insertions(+), 96 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 2dc6d6bf7..0a661f6f8 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -805,11 +805,8 @@ impl Connection { return Err(LimboError::InternalError("Connection closed".to_string())); } let res = self._db.io.run_once(); - if res.is_err() { - let state = self.transaction_state.get(); - if let TransactionState::Write { schema_did_change } = state { - self.pager.borrow().rollback(schema_did_change, self)? - } + if let Err(ref e) = res { + vdbe::handle_program_error(&self.pager.borrow(), self, e)?; } res } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index ca5392a58..286de874d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -953,36 +953,36 @@ impl Pager { current_page_to_append_idx, } => { let in_flight = self.flush_info.borrow().in_flight_writes.clone(); - if *in_flight.borrow() == 0 { - // Clear dirty now - let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; - let page = { - let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!( - "commit_dirty_pages(page={}, page_type={:?}", - page_id, - page_type - ); - page - }; - page.clear_dirty(); - // Continue with next page - let is_last_page = current_page_to_append_idx - == self.flush_info.borrow().dirty_pages.len() - 1; - if is_last_page { - self.dirty_pages.borrow_mut().clear(); - self.flush_info.borrow_mut().state = CacheFlushState::Start; - Ok(IOResult::Done(())) - } else { - self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame { - current_page_to_append_idx: current_page_to_append_idx + 1, - }; - Ok(IOResult::IO) - } + if *in_flight.borrow() > 0 { + return Ok(IOResult::IO); + } + + // Clear dirty now + let page_id = self.flush_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + page.clear_dirty(); + // Continue with next page + let is_last_page = + current_page_to_append_idx == self.flush_info.borrow().dirty_pages.len() - 1; + if is_last_page { + self.dirty_pages.borrow_mut().clear(); + self.flush_info.borrow_mut().state = CacheFlushState::Start; + Ok(IOResult::Done(())) } else { + self.flush_info.borrow_mut().state = CacheFlushState::AppendFrame { + current_page_to_append_idx: current_page_to_append_idx + 1, + }; Ok(IOResult::IO) } } @@ -1064,46 +1064,44 @@ impl Pager { current_page_to_append_idx, } => { let in_flight = self.commit_info.borrow().in_flight_writes.clone(); - if *in_flight.borrow() == 0 { - // First clear dirty - let page_id = - self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; - let page = { - let mut cache = self.page_cache.write(); - let page_key = PageCacheKey::new(page_id); - let page = cache.get(&page_key).unwrap_or_else(|| { + if *in_flight.borrow() > 0 { + return Ok(IOResult::IO); + } + // First clear dirty + let page_id = self.commit_info.borrow().dirty_pages[current_page_to_append_idx]; + let page = { + let mut cache = self.page_cache.write(); + let page_key = PageCacheKey::new(page_id); + let page = cache.get(&page_key).unwrap_or_else(|| { panic!( "we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it. page={page_id}" ) }); - let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); - trace!( - "commit_dirty_pages(page={}, page_type={:?}", - page_id, - page_type - ); - page - }; - page.clear_dirty(); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); + trace!( + "commit_dirty_pages(page={}, page_type={:?}", + page_id, + page_type + ); + page + }; + page.clear_dirty(); - // Now advance to next page if there are more - let is_last_frame = current_page_to_append_idx - == self.commit_info.borrow().dirty_pages.len() - 1; - if is_last_frame { - // Let's clear the page cache now - { - let mut cache = self.page_cache.write(); - cache.clear().unwrap(); - } - self.dirty_pages.borrow_mut().clear(); - self.commit_info.borrow_mut().state = CommitState::SyncWal; - } else { - self.commit_info.borrow_mut().state = CommitState::AppendFrame { - current_page_to_append_idx: current_page_to_append_idx + 1, - } + // Now advance to next page if there are more + let is_last_frame = current_page_to_append_idx + == self.commit_info.borrow().dirty_pages.len() - 1; + if is_last_frame { + // Let's clear the page cache now + { + let mut cache = self.page_cache.write(); + cache.clear().unwrap(); } + self.dirty_pages.borrow_mut().clear(); + self.commit_info.borrow_mut().state = CommitState::SyncWal; } else { - return Ok(IOResult::IO); + self.commit_info.borrow_mut().state = CommitState::AppendFrame { + current_page_to_append_idx: current_page_to_append_idx + 1, + } } } CommitState::SyncWal => { diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 25c05dab3..6a337d934 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -413,35 +413,8 @@ impl Program { Ok(InsnFunctionStepResult::Interrupt) => return Ok(StepResult::Interrupt), Ok(InsnFunctionStepResult::Busy) => return Ok(StepResult::Busy), Err(err) => { - match err { - LimboError::TxError(_) => {} - _ => { - let state = self.connection.transaction_state.get(); - if let TransactionState::Write { schema_did_change } = state { - if let Err(e) = pager.rollback(schema_did_change, &self.connection) - { - tracing::error!("rollback failed: {e}"); - } - if let Err(e) = - pager.end_tx(false, schema_did_change, &self.connection, false) - { - tracing::error!("end_tx failed: {e}"); - } - self.connection - .transaction_state - .replace(TransactionState::None); - } else { - if let Err(e) = pager.end_read_tx() { - tracing::error!("end_read_tx failed: {e}"); - } - self.connection - .transaction_state - .replace(TransactionState::None); - } - } - } - let err = Err(err); - return err; + handle_program_error(&pager, &self.connection, &err)?; + return Err(err); } } } @@ -769,3 +742,31 @@ impl Row { self.count } } + +/// Handle a program error by rolling back the transaction +pub fn handle_program_error( + pager: &Rc, + connection: &Connection, + err: &LimboError, +) -> Result<()> { + match err { + LimboError::TxError(_) => {} + _ => { + let state = connection.transaction_state.get(); + if let TransactionState::Write { schema_did_change } = state { + if let Err(e) = pager.rollback(schema_did_change, connection) { + tracing::error!("rollback failed: {e}"); + } + if let Err(e) = pager.end_tx(false, schema_did_change, connection, false) { + tracing::error!("end_tx failed: {e}"); + } + } else { + if let Err(e) = pager.end_read_tx() { + tracing::error!("end_read_tx failed: {e}"); + } + } + connection.transaction_state.replace(TransactionState::None); + } + } + Ok(()) +}