From 8cae10f744fa0362e2f127227b705bde8216b8e2 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Mon, 25 Aug 2025 16:20:32 +0300 Subject: [PATCH] Fix several issues with integrity_check Things that were just wrong: 1. No pages other than the root page were checked, because no looping was done. Add a loop. 2. Rightmost child page was never added to page stack. Add it. New integrity check features: - Add overflow pages to stack as well - Check that no page is referenced more than once in the tree --- core/storage/btree.rs | 349 ++++++++++++++++++++++++------------------ 1 file changed, 202 insertions(+), 147 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index cc14856a7..b2c36a560 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -37,7 +37,6 @@ use super::{ write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, MINIMUM_CELL_SIZE, }, }; -#[cfg(debug_assertions)] use std::collections::HashSet; use std::{ cell::{Cell, Ref, RefCell}, @@ -5502,6 +5501,11 @@ pub enum IntegrityCheckError { got: usize, expected: usize, }, + #[error("Page {page_id} referenced multiple times")] + PageReferencedMultipleTimes { + page_id: usize, + is_overflow_page: bool, + }, } #[derive(Clone)] @@ -5514,6 +5518,7 @@ pub struct IntegrityCheckState { pub current_page: usize, page_stack: Vec, first_leaf_level: Option, + pages_referenced: HashSet, page: Option, } @@ -5526,6 +5531,7 @@ impl IntegrityCheckState { level: 0, max_intkey: i64::MAX, }], + pages_referenced: HashSet::new(), first_leaf_level: None, page: None, } @@ -5555,165 +5561,214 @@ pub fn integrity_check( errors: &mut Vec, pager: &Rc, ) -> Result> { - let Some(IntegrityCheckPageEntry { - page_idx, - level, - max_intkey, - }) = state.page_stack.last().cloned() - else { - return Ok(IOResult::Done(())); - }; - let page = match state.page.take() { - Some(page) => page, - None => { - let (page, c) = btree_read_page(pager, page_idx)?; - state.page = Some(page.get()); - if let Some(c) = c { - io_yield_one!(c); + loop { + let Some(IntegrityCheckPageEntry { + page_idx, + level, + max_intkey, + }) = state.page_stack.last().cloned() + else { + return Ok(IOResult::Done(())); + }; + let page = match state.page.take() { + Some(page) => page, + None => { + let (page, c) = btree_read_page(pager, page_idx)?; + state.page = Some(page.get()); + if let Some(c) = c { + io_yield_one!(c); + } + state.page.take().expect("page should be present") } - page.get() - } - }; - turso_assert!(page.is_loaded(), "page should be loaded"); - state.page_stack.pop(); + }; + turso_assert!(page.is_loaded(), "page should be loaded"); + state.page_stack.pop(); - let contents = page.get_contents(); - let usable_space = pager.usable_space(); - let mut coverage_checker = CoverageChecker::new(page.get().id); + let contents = page.get_contents(); + let is_overflow_page = contents.maybe_page_type().is_none(); + if !state.pages_referenced.insert(page.get().id) { + errors.push(IntegrityCheckError::PageReferencedMultipleTimes { + page_id: page.get().id, + is_overflow_page, + }); + continue; + } + let usable_space = pager.usable_space(); + let mut coverage_checker = CoverageChecker::new(page.get().id); - // Now we check every cell for few things: - // 1. Check cell is in correct range. Not exceeds page and not starts before we have marked - // (cell content area). - // 2. We add the cell to coverage checker in order to check if cells do not overlap. - // 3. We check order of rowids in case of table pages. We iterate backwards in order to check - // if current cell's rowid is less than the next cell. We also check rowid is less than the - // parent's divider cell. In case of this page being root page max rowid will be i64::MAX. - // 4. We append pages to the stack to check later. - // 5. In case of leaf page, check if the current level(depth) is equal to other leaf pages we - // have seen. - let mut next_rowid = max_intkey; - for cell_idx in (0..contents.cell_count()).rev() { - let (cell_start, cell_length) = contents.cell_get_raw_region(cell_idx, usable_space); - if cell_start < contents.cell_content_area() as usize || cell_start > usable_space - 4 { - errors.push(IntegrityCheckError::CellOutOfRange { - cell_idx, - page_id: page.get().id, - cell_start, - cell_end: cell_start + cell_length, - content_area: contents.cell_content_area() as usize, - usable_space, - }); - } - if cell_start + cell_length > usable_space { - errors.push(IntegrityCheckError::CellOverflowsPage { - cell_idx, - page_id: page.get().id, - cell_start, - cell_end: cell_start + cell_length, - content_area: contents.cell_content_area() as usize, - usable_space, - }); - } - coverage_checker.add_cell(cell_start, cell_start + cell_length); - let cell = contents.cell_get(cell_idx, usable_space)?; - match cell { - BTreeCell::TableInteriorCell(table_interior_cell) => { + if is_overflow_page { + let next_overflow_page = contents.read_u32_no_offset(0); + if next_overflow_page != 0 { state.page_stack.push(IntegrityCheckPageEntry { - page_idx: table_interior_cell.left_child_page as usize, - level: level + 1, - max_intkey: table_interior_cell.rowid, - }); - let rowid = table_interior_cell.rowid; - if rowid > max_intkey || rowid > next_rowid { - errors.push(IntegrityCheckError::CellRowidOutOfRange { - page_id: page.get().id, - cell_idx, - rowid, - max_intkey, - next_rowid, - }); - } - next_rowid = rowid; - } - BTreeCell::TableLeafCell(table_leaf_cell) => { - // check depth of leaf pages are equal - if let Some(expected_leaf_level) = state.first_leaf_level { - if expected_leaf_level != level { - errors.push(IntegrityCheckError::LeafDepthMismatch { - page_id: page.get().id, - this_page_depth: level, - other_page_depth: expected_leaf_level, - }); - } - } else { - state.first_leaf_level = Some(level); - } - let rowid = table_leaf_cell.rowid; - if rowid > max_intkey || rowid > next_rowid { - errors.push(IntegrityCheckError::CellRowidOutOfRange { - page_id: page.get().id, - cell_idx, - rowid, - max_intkey, - next_rowid, - }); - } - next_rowid = rowid; - } - BTreeCell::IndexInteriorCell(index_interior_cell) => { - state.page_stack.push(IntegrityCheckPageEntry { - page_idx: index_interior_cell.left_child_page as usize, - level: level + 1, - max_intkey, // we don't care about intkey in non-table pages + page_idx: next_overflow_page as usize, + level, + max_intkey, }); } - BTreeCell::IndexLeafCell(_) => { - // check depth of leaf pages are equal - if let Some(expected_leaf_level) = state.first_leaf_level { - if expected_leaf_level != level { - errors.push(IntegrityCheckError::LeafDepthMismatch { - page_id: page.get().id, - this_page_depth: level, - other_page_depth: expected_leaf_level, - }); - } - } else { - state.first_leaf_level = Some(level); - } - } + continue; } - } - // Now we add free blocks to the coverage checker - let first_freeblock = contents.first_freeblock() as usize; - if first_freeblock > 0 { - let mut pc = first_freeblock; - while pc > 0 { - let next = contents.read_u16_no_offset(pc as usize) as usize; - let size = contents.read_u16_no_offset(pc as usize + 2) as usize; - // check it doesn't go out of range - if pc > usable_space - 4 { - errors.push(IntegrityCheckError::FreeBlockOutOfRange { + // Now we check every cell for few things: + // 1. Check cell is in correct range. Not exceeds page and not starts before we have marked + // (cell content area). + // 2. We add the cell to coverage checker in order to check if cells do not overlap. + // 3. We check order of rowids in case of table pages. We iterate backwards in order to check + // if current cell's rowid is less than the next cell. We also check rowid is less than the + // parent's divider cell. In case of this page being root page max rowid will be i64::MAX. + // 4. We append pages to the stack to check later. + // 5. In case of leaf page, check if the current level(depth) is equal to other leaf pages we + // have seen. + let mut next_rowid = max_intkey; + for cell_idx in (0..contents.cell_count()).rev() { + let (cell_start, cell_length) = contents.cell_get_raw_region(cell_idx, usable_space); + if cell_start < contents.cell_content_area() as usize || cell_start > usable_space - 4 { + errors.push(IntegrityCheckError::CellOutOfRange { + cell_idx, page_id: page.get().id, - start: pc, - end: pc + size, + cell_start, + cell_end: cell_start + cell_length, + content_area: contents.cell_content_area() as usize, + usable_space, }); - break; } - coverage_checker.add_free_block(pc, pc + size); - pc = next; + if cell_start + cell_length > usable_space { + errors.push(IntegrityCheckError::CellOverflowsPage { + cell_idx, + page_id: page.get().id, + cell_start, + cell_end: cell_start + cell_length, + content_area: contents.cell_content_area() as usize, + usable_space, + }); + } + coverage_checker.add_cell(cell_start, cell_start + cell_length); + let cell = contents.cell_get(cell_idx, usable_space)?; + match cell { + BTreeCell::TableInteriorCell(table_interior_cell) => { + state.page_stack.push(IntegrityCheckPageEntry { + page_idx: table_interior_cell.left_child_page as usize, + level: level + 1, + max_intkey: table_interior_cell.rowid, + }); + let rowid = table_interior_cell.rowid; + if rowid > max_intkey || rowid > next_rowid { + errors.push(IntegrityCheckError::CellRowidOutOfRange { + page_id: page.get().id, + cell_idx, + rowid, + max_intkey, + next_rowid, + }); + } + next_rowid = rowid; + } + BTreeCell::TableLeafCell(table_leaf_cell) => { + // check depth of leaf pages are equal + if let Some(expected_leaf_level) = state.first_leaf_level { + if expected_leaf_level != level { + errors.push(IntegrityCheckError::LeafDepthMismatch { + page_id: page.get().id, + this_page_depth: level, + other_page_depth: expected_leaf_level, + }); + } + } else { + state.first_leaf_level = Some(level); + } + let rowid = table_leaf_cell.rowid; + if rowid > max_intkey || rowid > next_rowid { + errors.push(IntegrityCheckError::CellRowidOutOfRange { + page_id: page.get().id, + cell_idx, + rowid, + max_intkey, + next_rowid, + }); + } + next_rowid = rowid; + if let Some(first_overflow_page) = table_leaf_cell.first_overflow_page { + state.page_stack.push(IntegrityCheckPageEntry { + page_idx: first_overflow_page as usize, + level, + max_intkey, + }); + } + } + BTreeCell::IndexInteriorCell(index_interior_cell) => { + state.page_stack.push(IntegrityCheckPageEntry { + page_idx: index_interior_cell.left_child_page as usize, + level: level + 1, + max_intkey, // we don't care about intkey in non-table pages + }); + if let Some(first_overflow_page) = index_interior_cell.first_overflow_page { + state.page_stack.push(IntegrityCheckPageEntry { + page_idx: first_overflow_page as usize, + level, + max_intkey, + }); + } + } + BTreeCell::IndexLeafCell(index_leaf_cell) => { + // check depth of leaf pages are equal + if let Some(expected_leaf_level) = state.first_leaf_level { + if expected_leaf_level != level { + errors.push(IntegrityCheckError::LeafDepthMismatch { + page_id: page.get().id, + this_page_depth: level, + other_page_depth: expected_leaf_level, + }); + } + } else { + state.first_leaf_level = Some(level); + } + if let Some(first_overflow_page) = index_leaf_cell.first_overflow_page { + state.page_stack.push(IntegrityCheckPageEntry { + page_idx: first_overflow_page as usize, + level, + max_intkey, + }); + } + } + } } + + if let Some(rightmost) = contents.rightmost_pointer() { + state.page_stack.push(IntegrityCheckPageEntry { + page_idx: rightmost as usize, + level: level + 1, + max_intkey, + }); + } + + // Now we add free blocks to the coverage checker + let first_freeblock = contents.first_freeblock() as usize; + if first_freeblock > 0 { + let mut pc = first_freeblock; + while pc > 0 { + let next = contents.read_u16_no_offset(pc as usize) as usize; + let size = contents.read_u16_no_offset(pc as usize + 2) as usize; + // check it doesn't go out of range + if pc > usable_space - 4 { + errors.push(IntegrityCheckError::FreeBlockOutOfRange { + page_id: page.get().id, + start: pc, + end: pc + size, + }); + break; + } + coverage_checker.add_free_block(pc, pc + size); + pc = next; + } + } + + // Let's check the overlap of freeblocks and cells now that we have collected them all. + coverage_checker.analyze( + usable_space, + contents.cell_content_area() as usize, + errors, + contents.num_frag_free_bytes() as usize, + ); } - - // Let's check the overlap of freeblocks and cells now that we have collected them all. - coverage_checker.analyze( - usable_space, - contents.cell_content_area() as usize, - errors, - contents.num_frag_free_bytes() as usize, - ); - - Ok(IOResult::Done(())) } pub fn btree_read_page(