Fix several issues with integrity_check

Things that were just wrong:

1. No pages other than the root page were checked, because no looping
was done. Add a loop.
2. Rightmost child page was never added to page stack. Add it.

New integrity check features:

- Add overflow pages to stack as well
- Check that no page is referenced more than once in the tree
This commit is contained in:
Jussi Saurio
2025-08-25 16:20:32 +03:00
parent 1a4a53e6ea
commit 8cae10f744

View File

@@ -37,7 +37,6 @@ use super::{
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, MINIMUM_CELL_SIZE,
},
};
#[cfg(debug_assertions)]
use std::collections::HashSet;
use std::{
cell::{Cell, Ref, RefCell},
@@ -5502,6 +5501,11 @@ pub enum IntegrityCheckError {
got: usize,
expected: usize,
},
#[error("Page {page_id} referenced multiple times")]
PageReferencedMultipleTimes {
page_id: usize,
is_overflow_page: bool,
},
}
#[derive(Clone)]
@@ -5514,6 +5518,7 @@ pub struct IntegrityCheckState {
pub current_page: usize,
page_stack: Vec<IntegrityCheckPageEntry>,
first_leaf_level: Option<usize>,
pages_referenced: HashSet<usize>,
page: Option<PageRef>,
}
@@ -5526,6 +5531,7 @@ impl IntegrityCheckState {
level: 0,
max_intkey: i64::MAX,
}],
pages_referenced: HashSet::new(),
first_leaf_level: None,
page: None,
}
@@ -5555,165 +5561,214 @@ pub fn integrity_check(
errors: &mut Vec<IntegrityCheckError>,
pager: &Rc<Pager>,
) -> Result<IOResult<()>> {
let Some(IntegrityCheckPageEntry {
page_idx,
level,
max_intkey,
}) = state.page_stack.last().cloned()
else {
return Ok(IOResult::Done(()));
};
let page = match state.page.take() {
Some(page) => page,
None => {
let (page, c) = btree_read_page(pager, page_idx)?;
state.page = Some(page.get());
if let Some(c) = c {
io_yield_one!(c);
loop {
let Some(IntegrityCheckPageEntry {
page_idx,
level,
max_intkey,
}) = state.page_stack.last().cloned()
else {
return Ok(IOResult::Done(()));
};
let page = match state.page.take() {
Some(page) => page,
None => {
let (page, c) = btree_read_page(pager, page_idx)?;
state.page = Some(page.get());
if let Some(c) = c {
io_yield_one!(c);
}
state.page.take().expect("page should be present")
}
page.get()
}
};
turso_assert!(page.is_loaded(), "page should be loaded");
state.page_stack.pop();
};
turso_assert!(page.is_loaded(), "page should be loaded");
state.page_stack.pop();
let contents = page.get_contents();
let usable_space = pager.usable_space();
let mut coverage_checker = CoverageChecker::new(page.get().id);
let contents = page.get_contents();
let is_overflow_page = contents.maybe_page_type().is_none();
if !state.pages_referenced.insert(page.get().id) {
errors.push(IntegrityCheckError::PageReferencedMultipleTimes {
page_id: page.get().id,
is_overflow_page,
});
continue;
}
let usable_space = pager.usable_space();
let mut coverage_checker = CoverageChecker::new(page.get().id);
// Now we check every cell for few things:
// 1. Check cell is in correct range. Not exceeds page and not starts before we have marked
// (cell content area).
// 2. We add the cell to coverage checker in order to check if cells do not overlap.
// 3. We check order of rowids in case of table pages. We iterate backwards in order to check
// if current cell's rowid is less than the next cell. We also check rowid is less than the
// parent's divider cell. In case of this page being root page max rowid will be i64::MAX.
// 4. We append pages to the stack to check later.
// 5. In case of leaf page, check if the current level(depth) is equal to other leaf pages we
// have seen.
let mut next_rowid = max_intkey;
for cell_idx in (0..contents.cell_count()).rev() {
let (cell_start, cell_length) = contents.cell_get_raw_region(cell_idx, usable_space);
if cell_start < contents.cell_content_area() as usize || cell_start > usable_space - 4 {
errors.push(IntegrityCheckError::CellOutOfRange {
cell_idx,
page_id: page.get().id,
cell_start,
cell_end: cell_start + cell_length,
content_area: contents.cell_content_area() as usize,
usable_space,
});
}
if cell_start + cell_length > usable_space {
errors.push(IntegrityCheckError::CellOverflowsPage {
cell_idx,
page_id: page.get().id,
cell_start,
cell_end: cell_start + cell_length,
content_area: contents.cell_content_area() as usize,
usable_space,
});
}
coverage_checker.add_cell(cell_start, cell_start + cell_length);
let cell = contents.cell_get(cell_idx, usable_space)?;
match cell {
BTreeCell::TableInteriorCell(table_interior_cell) => {
if is_overflow_page {
let next_overflow_page = contents.read_u32_no_offset(0);
if next_overflow_page != 0 {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: table_interior_cell.left_child_page as usize,
level: level + 1,
max_intkey: table_interior_cell.rowid,
});
let rowid = table_interior_cell.rowid;
if rowid > max_intkey || rowid > next_rowid {
errors.push(IntegrityCheckError::CellRowidOutOfRange {
page_id: page.get().id,
cell_idx,
rowid,
max_intkey,
next_rowid,
});
}
next_rowid = rowid;
}
BTreeCell::TableLeafCell(table_leaf_cell) => {
// check depth of leaf pages are equal
if let Some(expected_leaf_level) = state.first_leaf_level {
if expected_leaf_level != level {
errors.push(IntegrityCheckError::LeafDepthMismatch {
page_id: page.get().id,
this_page_depth: level,
other_page_depth: expected_leaf_level,
});
}
} else {
state.first_leaf_level = Some(level);
}
let rowid = table_leaf_cell.rowid;
if rowid > max_intkey || rowid > next_rowid {
errors.push(IntegrityCheckError::CellRowidOutOfRange {
page_id: page.get().id,
cell_idx,
rowid,
max_intkey,
next_rowid,
});
}
next_rowid = rowid;
}
BTreeCell::IndexInteriorCell(index_interior_cell) => {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: index_interior_cell.left_child_page as usize,
level: level + 1,
max_intkey, // we don't care about intkey in non-table pages
page_idx: next_overflow_page as usize,
level,
max_intkey,
});
}
BTreeCell::IndexLeafCell(_) => {
// check depth of leaf pages are equal
if let Some(expected_leaf_level) = state.first_leaf_level {
if expected_leaf_level != level {
errors.push(IntegrityCheckError::LeafDepthMismatch {
page_id: page.get().id,
this_page_depth: level,
other_page_depth: expected_leaf_level,
});
}
} else {
state.first_leaf_level = Some(level);
}
}
continue;
}
}
// Now we add free blocks to the coverage checker
let first_freeblock = contents.first_freeblock() as usize;
if first_freeblock > 0 {
let mut pc = first_freeblock;
while pc > 0 {
let next = contents.read_u16_no_offset(pc as usize) as usize;
let size = contents.read_u16_no_offset(pc as usize + 2) as usize;
// check it doesn't go out of range
if pc > usable_space - 4 {
errors.push(IntegrityCheckError::FreeBlockOutOfRange {
// Now we check every cell for few things:
// 1. Check cell is in correct range. Not exceeds page and not starts before we have marked
// (cell content area).
// 2. We add the cell to coverage checker in order to check if cells do not overlap.
// 3. We check order of rowids in case of table pages. We iterate backwards in order to check
// if current cell's rowid is less than the next cell. We also check rowid is less than the
// parent's divider cell. In case of this page being root page max rowid will be i64::MAX.
// 4. We append pages to the stack to check later.
// 5. In case of leaf page, check if the current level(depth) is equal to other leaf pages we
// have seen.
let mut next_rowid = max_intkey;
for cell_idx in (0..contents.cell_count()).rev() {
let (cell_start, cell_length) = contents.cell_get_raw_region(cell_idx, usable_space);
if cell_start < contents.cell_content_area() as usize || cell_start > usable_space - 4 {
errors.push(IntegrityCheckError::CellOutOfRange {
cell_idx,
page_id: page.get().id,
start: pc,
end: pc + size,
cell_start,
cell_end: cell_start + cell_length,
content_area: contents.cell_content_area() as usize,
usable_space,
});
break;
}
coverage_checker.add_free_block(pc, pc + size);
pc = next;
if cell_start + cell_length > usable_space {
errors.push(IntegrityCheckError::CellOverflowsPage {
cell_idx,
page_id: page.get().id,
cell_start,
cell_end: cell_start + cell_length,
content_area: contents.cell_content_area() as usize,
usable_space,
});
}
coverage_checker.add_cell(cell_start, cell_start + cell_length);
let cell = contents.cell_get(cell_idx, usable_space)?;
match cell {
BTreeCell::TableInteriorCell(table_interior_cell) => {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: table_interior_cell.left_child_page as usize,
level: level + 1,
max_intkey: table_interior_cell.rowid,
});
let rowid = table_interior_cell.rowid;
if rowid > max_intkey || rowid > next_rowid {
errors.push(IntegrityCheckError::CellRowidOutOfRange {
page_id: page.get().id,
cell_idx,
rowid,
max_intkey,
next_rowid,
});
}
next_rowid = rowid;
}
BTreeCell::TableLeafCell(table_leaf_cell) => {
// check depth of leaf pages are equal
if let Some(expected_leaf_level) = state.first_leaf_level {
if expected_leaf_level != level {
errors.push(IntegrityCheckError::LeafDepthMismatch {
page_id: page.get().id,
this_page_depth: level,
other_page_depth: expected_leaf_level,
});
}
} else {
state.first_leaf_level = Some(level);
}
let rowid = table_leaf_cell.rowid;
if rowid > max_intkey || rowid > next_rowid {
errors.push(IntegrityCheckError::CellRowidOutOfRange {
page_id: page.get().id,
cell_idx,
rowid,
max_intkey,
next_rowid,
});
}
next_rowid = rowid;
if let Some(first_overflow_page) = table_leaf_cell.first_overflow_page {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
level,
max_intkey,
});
}
}
BTreeCell::IndexInteriorCell(index_interior_cell) => {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: index_interior_cell.left_child_page as usize,
level: level + 1,
max_intkey, // we don't care about intkey in non-table pages
});
if let Some(first_overflow_page) = index_interior_cell.first_overflow_page {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
level,
max_intkey,
});
}
}
BTreeCell::IndexLeafCell(index_leaf_cell) => {
// check depth of leaf pages are equal
if let Some(expected_leaf_level) = state.first_leaf_level {
if expected_leaf_level != level {
errors.push(IntegrityCheckError::LeafDepthMismatch {
page_id: page.get().id,
this_page_depth: level,
other_page_depth: expected_leaf_level,
});
}
} else {
state.first_leaf_level = Some(level);
}
if let Some(first_overflow_page) = index_leaf_cell.first_overflow_page {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
level,
max_intkey,
});
}
}
}
}
if let Some(rightmost) = contents.rightmost_pointer() {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: rightmost as usize,
level: level + 1,
max_intkey,
});
}
// Now we add free blocks to the coverage checker
let first_freeblock = contents.first_freeblock() as usize;
if first_freeblock > 0 {
let mut pc = first_freeblock;
while pc > 0 {
let next = contents.read_u16_no_offset(pc as usize) as usize;
let size = contents.read_u16_no_offset(pc as usize + 2) as usize;
// check it doesn't go out of range
if pc > usable_space - 4 {
errors.push(IntegrityCheckError::FreeBlockOutOfRange {
page_id: page.get().id,
start: pc,
end: pc + size,
});
break;
}
coverage_checker.add_free_block(pc, pc + size);
pc = next;
}
}
// Let's check the overlap of freeblocks and cells now that we have collected them all.
coverage_checker.analyze(
usable_space,
contents.cell_content_area() as usize,
errors,
contents.num_frag_free_bytes() as usize,
);
}
// Let's check the overlap of freeblocks and cells now that we have collected them all.
coverage_checker.analyze(
usable_space,
contents.cell_content_area() as usize,
errors,
contents.num_frag_free_bytes() as usize,
);
Ok(IOResult::Done(()))
}
pub fn btree_read_page(