From f2d84a534c4afadf64fa538b69cb8a760867ea73 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Mon, 4 Aug 2025 15:10:44 -0300 Subject: [PATCH] adjust `clear_overflow_pages` --- core/storage/btree.rs | 80 +++++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 2aba362bd..37955e90e 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -452,9 +452,10 @@ impl Debug for CursorState { } } +#[derive(Debug, Clone)] enum OverflowState { Start, - ProcessPage { next_page: u32 }, + ProcessPage { next_page: PageRef }, Done, } @@ -539,7 +540,7 @@ pub struct BTreeCursor { state: CursorState, /// Information maintained while freeing overflow pages. Maintained separately from cursor state since /// any method could require freeing overflow pages - overflow_state: Option, + overflow_state: OverflowState, /// Page stack used to traverse the btree. /// Each cursor has a stack because each cursor traverses the btree independently. stack: PageStack, @@ -626,7 +627,7 @@ impl BTreeCursor { null_flag: false, going_upwards: false, state: CursorState::None, - overflow_state: None, + overflow_state: OverflowState::Start, stack: PageStack { current_page: Cell::new(-1), node_states: RefCell::new([BTreeNodeState::default(); BTCURSOR_MAX_DEPTH + 1]), @@ -5008,9 +5009,7 @@ impl BTreeCursor { #[instrument(skip_all, level = Level::DEBUG)] fn clear_overflow_pages(&mut self, cell: &BTreeCell) -> Result> { loop { - let state = self.overflow_state.take().unwrap_or(OverflowState::Start); - - match state { + match self.overflow_state.clone() { OverflowState::Start => { let first_overflow_page = match cell { BTreeCell::TableLeafCell(leaf_cell) => leaf_cell.first_overflow_page, @@ -5021,42 +5020,63 @@ impl BTreeCursor { BTreeCell::TableInteriorCell(_) => return Ok(IOResult::Done(())), // No overflow pages }; - if let Some(page) = first_overflow_page { - self.overflow_state = Some(OverflowState::ProcessPage { next_page: page }); - continue; + if let Some(next_page) = first_overflow_page { + if next_page < 2 + || next_page + > self + .pager + .io + .block(|| { + self.pager.with_header(|header| header.database_size) + })? + .get() + { + self.overflow_state = OverflowState::Start; + return Err(LimboError::Corrupt("Invalid overflow page number".into())); + } + let (page, _c) = self.read_page(next_page as usize)?; + self.overflow_state = OverflowState::ProcessPage { + next_page: page.get(), + }; + return Ok(IOResult::IO); } else { - self.overflow_state = Some(OverflowState::Done); + self.overflow_state = OverflowState::Done; } } - OverflowState::ProcessPage { next_page } => { - if next_page < 2 - || next_page as usize - > self - .pager - .io - .block(|| self.pager.with_header(|header| header.database_size))? - .get() as usize - { - self.overflow_state = None; - return Err(LimboError::Corrupt("Invalid overflow page number".into())); - } - let (page, _c) = self.read_page(next_page as usize)?; - return_if_locked_maybe_load!(self.pager, page); + OverflowState::ProcessPage { next_page: page } => { + return_if_locked!(page); - let page = page.get(); - let contents = page.get().contents.as_ref().unwrap(); + let contents = page.get_contents(); let next = contents.read_u32(0); + let next_page_id = page.get().id; - return_if_io!(self.pager.free_page(Some(page), next_page as usize)); + return_if_io!(self.pager.free_page(Some(page), next_page_id)); if next != 0 { - self.overflow_state = Some(OverflowState::ProcessPage { next_page: next }); + if next < 2 + || next + > self + .pager + .io + .block(|| { + self.pager.with_header(|header| header.database_size) + })? + .get() + { + self.overflow_state = OverflowState::Start; + return Err(LimboError::Corrupt("Invalid overflow page number".into())); + } + let (page, _c) = self.read_page(next as usize)?; + self.overflow_state = OverflowState::ProcessPage { + next_page: page.get(), + }; + return Ok(IOResult::IO); } else { - self.overflow_state = Some(OverflowState::Done); + self.overflow_state = OverflowState::Done; } } OverflowState::Done => { - self.overflow_state = None; + self.overflow_state = OverflowState::Start; return Ok(IOResult::Done(())); } };