adjust clear_overflow_pages

This commit is contained in:
pedrocarlo
2025-08-04 15:10:44 -03:00
parent 718ad5e7fd
commit f2d84a534c

View File

@@ -452,9 +452,10 @@ impl Debug for CursorState {
}
}
#[derive(Debug, Clone)]
enum OverflowState {
Start,
ProcessPage { next_page: u32 },
ProcessPage { next_page: PageRef },
Done,
}
@@ -539,7 +540,7 @@ pub struct BTreeCursor {
state: CursorState,
/// Information maintained while freeing overflow pages. Maintained separately from cursor state since
/// any method could require freeing overflow pages
overflow_state: Option<OverflowState>,
overflow_state: OverflowState,
/// Page stack used to traverse the btree.
/// Each cursor has a stack because each cursor traverses the btree independently.
stack: PageStack,
@@ -626,7 +627,7 @@ impl BTreeCursor {
null_flag: false,
going_upwards: false,
state: CursorState::None,
overflow_state: None,
overflow_state: OverflowState::Start,
stack: PageStack {
current_page: Cell::new(-1),
node_states: RefCell::new([BTreeNodeState::default(); BTCURSOR_MAX_DEPTH + 1]),
@@ -5008,9 +5009,7 @@ impl BTreeCursor {
#[instrument(skip_all, level = Level::DEBUG)]
fn clear_overflow_pages(&mut self, cell: &BTreeCell) -> Result<IOResult<()>> {
loop {
let state = self.overflow_state.take().unwrap_or(OverflowState::Start);
match state {
match self.overflow_state.clone() {
OverflowState::Start => {
let first_overflow_page = match cell {
BTreeCell::TableLeafCell(leaf_cell) => leaf_cell.first_overflow_page,
@@ -5021,42 +5020,63 @@ impl BTreeCursor {
BTreeCell::TableInteriorCell(_) => return Ok(IOResult::Done(())), // No overflow pages
};
if let Some(page) = first_overflow_page {
self.overflow_state = Some(OverflowState::ProcessPage { next_page: page });
continue;
if let Some(next_page) = first_overflow_page {
if next_page < 2
|| next_page
> self
.pager
.io
.block(|| {
self.pager.with_header(|header| header.database_size)
})?
.get()
{
self.overflow_state = OverflowState::Start;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, _c) = self.read_page(next_page as usize)?;
self.overflow_state = OverflowState::ProcessPage {
next_page: page.get(),
};
return Ok(IOResult::IO);
} else {
self.overflow_state = Some(OverflowState::Done);
self.overflow_state = OverflowState::Done;
}
}
OverflowState::ProcessPage { next_page } => {
if next_page < 2
|| next_page as usize
> self
.pager
.io
.block(|| self.pager.with_header(|header| header.database_size))?
.get() as usize
{
self.overflow_state = None;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, _c) = self.read_page(next_page as usize)?;
return_if_locked_maybe_load!(self.pager, page);
OverflowState::ProcessPage { next_page: page } => {
return_if_locked!(page);
let page = page.get();
let contents = page.get().contents.as_ref().unwrap();
let contents = page.get_contents();
let next = contents.read_u32(0);
let next_page_id = page.get().id;
return_if_io!(self.pager.free_page(Some(page), next_page as usize));
return_if_io!(self.pager.free_page(Some(page), next_page_id));
if next != 0 {
self.overflow_state = Some(OverflowState::ProcessPage { next_page: next });
if next < 2
|| next
> self
.pager
.io
.block(|| {
self.pager.with_header(|header| header.database_size)
})?
.get()
{
self.overflow_state = OverflowState::Start;
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let (page, _c) = self.read_page(next as usize)?;
self.overflow_state = OverflowState::ProcessPage {
next_page: page.get(),
};
return Ok(IOResult::IO);
} else {
self.overflow_state = Some(OverflowState::Done);
self.overflow_state = OverflowState::Done;
}
}
OverflowState::Done => {
self.overflow_state = None;
self.overflow_state = OverflowState::Start;
return Ok(IOResult::Done(()));
}
};