From a24e1b775ce52b8ae0798c2bc13180e69a2bfcf2 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 11 Jun 2025 17:56:19 +0200 Subject: [PATCH] check order of rowids --- core/storage/btree.rs | 125 +++++++++++++++++++++++++++++++++++------- 1 file changed, 105 insertions(+), 20 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index fff157872..240f9e3de 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -5098,22 +5098,28 @@ impl BTreeCursor { } } +#[derive(Clone)] +struct IntegrityCheckPageEntry { + page_idx: usize, + level: usize, + max_intkey: i64, +} pub struct IntegrityCheckState { pub current_page: usize, - page_stack: PageStack, - start: bool, + page_stack: Vec, + first_leaf_level: Option, } impl IntegrityCheckState { pub fn new(page_idx: usize) -> Self { Self { current_page: page_idx, - page_stack: PageStack { - current_page: Cell::new(-1), - cell_indices: RefCell::new([0; BTCURSOR_MAX_DEPTH + 1]), - stack: RefCell::new([const { None }; BTCURSOR_MAX_DEPTH + 1]), - }, - start: true, + page_stack: vec![IntegrityCheckPageEntry { + page_idx, + level: 0, + max_intkey: i64::MAX, + }], + first_leaf_level: None, } } } @@ -5121,32 +5127,51 @@ impl std::fmt::Debug for IntegrityCheckState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("IntegrityCheckState") .field("current_page", &self.current_page) - .field("start", &self.start) + .field("first_leaf_level", &self.first_leaf_level) .finish() } } +/// Perform integrity check on a whole table/index. We check for: +/// 1. Correct order of keys in case of rowids. +/// 2. There are no overlap between cells. +/// 3. Cells do not scape outside expected range. +/// 4. Depth of leaf pages are equal. +/// 5. Overflow pages are correct (TODO) +/// +/// In order to keep this reentrant, we keep a stack of pages we need to check. Ideally, like in +/// SQLlite, we would have implemented a recursive solution which would make it easier to check the +/// depth. pub fn integrity_check( state: &mut IntegrityCheckState, error_count: &mut usize, message: &mut String, pager: &Rc, ) -> Result> { - if state.start { - let page = btree_read_page(pager, state.current_page)?; - return_if_locked_maybe_load!(pager, page); - state.page_stack.push(page); - state.start = false; - } - let page = state.page_stack.top(); + let Some(IntegrityCheckPageEntry { page_idx, level, max_intkey }) = state.page_stack.last().cloned() else { + return Ok(CursorResult::Ok(())); + }; + let page = btree_read_page(pager, page_idx)?; return_if_locked_maybe_load!(pager, page); + state.page_stack.pop(); let page = page.get(); let contents = page.get_contents(); let usable_space = pager.usable_space() as u16; let mut coverage_checker = CoverageChecker::new(page.get().id); - for cell_idx in 0..contents.cell_count() { + // Now we check every cell for few things: + // 1. Check cell is in correct range. Not exceeds page and not starts before we have marked + // (cell content area). + // 2. We add the cell to coverage checker in order to check if cells do not overlap. + // 3. We check order of rowids in case of table pages. We iterate backwards in order to check + // if current cell's rowid is less than the next cell. We also check rowid is less than the + // parent's divider cell. In case of this page being root page max rowid will be i64::MAX. + // 4. We append pages to the stack to check later. + // 5. In case of leaf page, check if the current level(depth) is equal to other leaf pages we + // have seen. + let mut next_rowid = max_intkey; + for cell_idx in (0..contents.cell_count()).rev() { let (cell_start, cell_length) = contents.cell_get_raw_region( cell_idx, payload_overflow_threshold_max(contents.page_type(), usable_space), @@ -5166,10 +5191,69 @@ pub fn integrity_check( *error_count += 1; } coverage_checker.add_cell(cell_start, cell_start + cell_length); - // TODO: check order - // TODO: check overflow + let cell = contents.cell_get( + cell_idx, + payload_overflow_threshold_max(contents.page_type(), usable_space), + payload_overflow_threshold_min(contents.page_type(), usable_space), + usable_space as usize, + )?; + match cell { + BTreeCell::TableInteriorCell(table_interior_cell) => { + state.page_stack.push(IntegrityCheckPageEntry { + page_idx: table_interior_cell._left_child_page as usize, + level: level + 1, + max_intkey: table_interior_cell._rowid, + }); + let rowid = table_interior_cell._rowid; + if rowid > max_intkey || rowid > next_rowid { + let error_msg = format!("Page {} cell {} has rowid={} in wrong order. Parent cell has parent_rowid={} and next_rowid={}", page.get().id, cell_idx, rowid, max_intkey, next_rowid); + message.write_str(&error_msg).unwrap(); + *error_count += 1; + } + next_rowid = rowid; + } + BTreeCell::TableLeafCell(table_leaf_cell) => { + // check depth of leaf pages are equal + if let Some(expected_leaf_level) = state.first_leaf_level { + if expected_leaf_level != level { + let error_msg = format!("Page {} is at different depth from another leaf page this_page_depth={}, other_page_depth={} ", page.get().id, level, expected_leaf_level); + message.write_str(&error_msg).unwrap(); + *error_count += 1; + } + } else { + state.first_leaf_level = Some(level); + } + let rowid = table_leaf_cell._rowid; + if rowid > max_intkey || rowid > next_rowid { + let error_msg = format!("Page {} cell {} has rowid={} in wrong order. Parent cell has parent_rowid={} and next_rowid={}", page.get().id, cell_idx, rowid, max_intkey, next_rowid); + message.write_str(&error_msg).unwrap(); + *error_count += 1; + } + next_rowid = rowid; + } + BTreeCell::IndexInteriorCell(index_interior_cell) => { + state.page_stack.push(IntegrityCheckPageEntry { + page_idx: index_interior_cell.left_child_page as usize, + level: level + 1, + max_intkey, // we don't care about intkey in non-table pages + }); + } + BTreeCell::IndexLeafCell(_) => { + // check depth of leaf pages are equal + if let Some(expected_leaf_level) = state.first_leaf_level { + if expected_leaf_level != level { + let error_msg = format!("Page {} is at different depth from another leaf page this_page_depth={}, other_page_depth={} ", page.get().id, level, expected_leaf_level); + message.write_str(&error_msg).unwrap(); + *error_count += 1; + } + } else { + state.first_leaf_level = Some(level); + } + } + } } + // Now we add free blocks to the coverage checker let first_freeblock = contents.first_freeblock(); if first_freeblock > 0 { let mut pc = first_freeblock; @@ -5193,6 +5277,7 @@ pub fn integrity_check( } } + // Let's check the overlap of freeblocks and cells now that we have collected them all. coverage_checker.analyze( usable_space, contents.cell_content_area() as usize, @@ -5201,7 +5286,7 @@ pub fn integrity_check( contents.num_frag_free_bytes() as usize, ); - Ok(CursorResult::Ok(())) + Ok(CursorResult::IO) } pub fn btree_read_page(pager: &Rc, page_idx: usize) -> Result {