improve integrity check

- check free list trunk and pages
- use shared hash map to check for duplicate references for pages
- properly check overflow pages
This commit is contained in:
Nikita Sivukhin
2025-08-27 23:14:21 +04:00
parent 30c5473151
commit ae705445bf
2 changed files with 195 additions and 69 deletions

View File

@@ -38,7 +38,7 @@ use super::{
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, MINIMUM_CELL_SIZE,
},
};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::{
cell::{Cell, Ref, RefCell},
cmp::{Ordering, Reverse},
@@ -5504,46 +5504,96 @@ pub enum IntegrityCheckError {
got: usize,
expected: usize,
},
#[error("Page {page_id} referenced multiple times")]
#[error("Page {page_id} referenced multiple times (references={references:?}, page_category={page_category:?})")]
PageReferencedMultipleTimes {
page_id: usize,
is_overflow_page: bool,
page_id: u64,
references: Vec<u64>,
page_category: PageCategory,
},
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum PageCategory {
Normal,
Overflow,
FreeListTrunk,
FreePage,
}
#[derive(Clone)]
struct IntegrityCheckPageEntry {
page_idx: usize,
level: usize,
max_intkey: i64,
page_category: PageCategory,
}
pub struct IntegrityCheckState {
pub current_page: usize,
page_stack: Vec<IntegrityCheckPageEntry>,
first_leaf_level: Option<usize>,
pages_referenced: HashSet<usize>,
page_reference: HashMap<u64, u64>,
page: Option<PageRef>,
}
impl IntegrityCheckState {
pub fn new(page_idx: usize) -> Self {
pub fn new() -> Self {
Self {
current_page: page_idx,
page_stack: vec![IntegrityCheckPageEntry {
page_idx,
level: 0,
max_intkey: i64::MAX,
}],
pages_referenced: HashSet::new(),
page_stack: Vec::new(),
page_reference: HashMap::new(),
first_leaf_level: None,
page: None,
}
}
pub fn start(
&mut self,
page_idx: usize,
page_category: PageCategory,
errors: &mut Vec<IntegrityCheckError>,
) {
assert!(
self.page_stack.len() == 0,
"stack should be empty before integrity check for new root"
);
self.first_leaf_level = None;
let _ = self.page.take();
// root can't be referenced from anywhere - so we insert "zero entry" for it
self.push_page(
IntegrityCheckPageEntry {
page_idx,
level: 0,
max_intkey: i64::MAX,
page_category,
},
0,
errors,
);
}
fn push_page(
&mut self,
entry: IntegrityCheckPageEntry,
referenced_by: u64,
errors: &mut Vec<IntegrityCheckError>,
) {
let page_id = entry.page_idx as u64;
let Some(previous) = self.page_reference.insert(page_id, referenced_by) else {
// do not traverse free pages as they have no meaingful structured content
if entry.page_category != PageCategory::FreePage {
self.page_stack.push(entry);
}
return;
};
errors.push(IntegrityCheckError::PageReferencedMultipleTimes {
page_id,
page_category: entry.page_category,
references: vec![previous, referenced_by],
});
}
}
impl std::fmt::Debug for IntegrityCheckState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IntegrityCheckState")
.field("current_page", &self.current_page)
.field("first_leaf_level", &self.first_leaf_level)
.finish()
}
@@ -5567,6 +5617,7 @@ pub fn integrity_check(
loop {
let Some(IntegrityCheckPageEntry {
page_idx,
page_category,
level,
max_intkey,
}) = state.page_stack.last().cloned()
@@ -5588,28 +5639,55 @@ pub fn integrity_check(
state.page_stack.pop();
let contents = page.get_contents();
let is_overflow_page = contents.maybe_page_type().is_none();
if !state.pages_referenced.insert(page.get().id) {
errors.push(IntegrityCheckError::PageReferencedMultipleTimes {
page_id: page.get().id,
is_overflow_page,
});
continue;
}
let usable_space = pager.usable_space();
let mut coverage_checker = CoverageChecker::new(page.get().id);
if is_overflow_page {
let next_overflow_page = contents.read_u32_no_offset(0);
if next_overflow_page != 0 {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: next_overflow_page as usize,
level,
max_intkey,
});
if page_category == PageCategory::FreeListTrunk {
let next_freelist_trunk_page = contents.read_u32_no_offset(0);
if next_freelist_trunk_page != 0 {
state.push_page(
IntegrityCheckPageEntry {
page_idx: next_freelist_trunk_page as usize,
level,
max_intkey,
page_category: PageCategory::FreeListTrunk,
},
page.get().id as u64,
errors,
);
}
let page_pointers = contents.read_u32_no_offset(4);
for i in 0..page_pointers {
let page_pointer = contents.read_u32_no_offset((8 + 4 * i) as usize);
state.push_page(
IntegrityCheckPageEntry {
page_idx: page_pointer as usize,
level,
max_intkey,
page_category: PageCategory::FreePage,
},
page.get().id as u64,
errors,
);
}
continue;
}
if page_category == PageCategory::Overflow {
let next_overflow_page = contents.read_u32_no_offset(0);
if next_overflow_page != 0 {
state.push_page(
IntegrityCheckPageEntry {
page_idx: next_overflow_page as usize,
level,
max_intkey,
page_category: PageCategory::Overflow,
},
page.get().id as u64,
errors,
);
}
continue;
}
let usable_space = pager.usable_space();
let mut coverage_checker = CoverageChecker::new(page.get().id);
// Now we check every cell for few things:
// 1. Check cell is in correct range. Not exceeds page and not starts before we have marked
@@ -5648,11 +5726,16 @@ pub fn integrity_check(
let cell = contents.cell_get(cell_idx, usable_space)?;
match cell {
BTreeCell::TableInteriorCell(table_interior_cell) => {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: table_interior_cell.left_child_page as usize,
level: level + 1,
max_intkey: table_interior_cell.rowid,
});
state.push_page(
IntegrityCheckPageEntry {
page_idx: table_interior_cell.left_child_page as usize,
level: level + 1,
max_intkey: table_interior_cell.rowid,
page_category: PageCategory::Normal,
},
page.get().id as u64,
errors,
);
let rowid = table_interior_cell.rowid;
if rowid > max_intkey || rowid > next_rowid {
errors.push(IntegrityCheckError::CellRowidOutOfRange {
@@ -5690,25 +5773,40 @@ pub fn integrity_check(
}
next_rowid = rowid;
if let Some(first_overflow_page) = table_leaf_cell.first_overflow_page {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
level,
max_intkey,
});
state.push_page(
IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
level,
max_intkey,
page_category: PageCategory::Overflow,
},
page.get().id as u64,
errors,
);
}
}
BTreeCell::IndexInteriorCell(index_interior_cell) => {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: index_interior_cell.left_child_page as usize,
level: level + 1,
max_intkey, // we don't care about intkey in non-table pages
});
state.push_page(
IntegrityCheckPageEntry {
page_idx: index_interior_cell.left_child_page as usize,
level: level + 1,
max_intkey, // we don't care about intkey in non-table pages
page_category: PageCategory::Normal,
},
page.get().id as u64,
errors,
);
if let Some(first_overflow_page) = index_interior_cell.first_overflow_page {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
level,
max_intkey,
});
state.push_page(
IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
level,
max_intkey,
page_category: PageCategory::Overflow,
},
page.get().id as u64,
errors,
);
}
}
BTreeCell::IndexLeafCell(index_leaf_cell) => {
@@ -5725,22 +5823,32 @@ pub fn integrity_check(
state.first_leaf_level = Some(level);
}
if let Some(first_overflow_page) = index_leaf_cell.first_overflow_page {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
level,
max_intkey,
});
state.push_page(
IntegrityCheckPageEntry {
page_idx: first_overflow_page as usize,
level,
max_intkey,
page_category: PageCategory::Overflow,
},
page.get().id as u64,
errors,
);
}
}
}
}
if let Some(rightmost) = contents.rightmost_pointer() {
state.page_stack.push(IntegrityCheckPageEntry {
page_idx: rightmost as usize,
level: level + 1,
max_intkey,
});
state.push_page(
IntegrityCheckPageEntry {
page_idx: rightmost as usize,
level: level + 1,
max_intkey,
page_category: PageCategory::Normal,
},
page.get().id as u64,
errors,
);
}
// Now we add free blocks to the coverage checker

View File

@@ -2,7 +2,9 @@
use crate::function::AlterTableFunc;
use crate::numeric::{NullableInteger, Numeric};
use crate::schema::Table;
use crate::storage::btree::{integrity_check, IntegrityCheckError, IntegrityCheckState};
use crate::storage::btree::{
integrity_check, IntegrityCheckError, IntegrityCheckState, PageCategory,
};
use crate::storage::database::DatabaseFile;
use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState};
@@ -7108,10 +7110,26 @@ pub fn op_integrity_check(
);
match &mut state.op_integrity_check_state {
OpIntegrityCheckState::Start => {
let freelist_trunk_page =
return_if_io!(pager.with_header(|header| header.freelist_trunk_page.get()));
let mut errors = Vec::new();
let mut integrity_check_state = IntegrityCheckState::new();
let mut current_root_idx = 0;
// check freelist pages first, if there are any for database
if freelist_trunk_page > 0 {
integrity_check_state.start(
freelist_trunk_page as usize,
PageCategory::FreeListTrunk,
&mut errors,
);
} else {
integrity_check_state.start(roots[0], PageCategory::Normal, &mut errors);
current_root_idx += 1;
}
state.op_integrity_check_state = OpIntegrityCheckState::Checking {
errors: Vec::new(),
current_root_idx: 0,
state: IntegrityCheckState::new(roots[0]),
errors,
state: integrity_check_state,
current_root_idx,
};
}
OpIntegrityCheckState::Checking {
@@ -7120,9 +7138,9 @@ pub fn op_integrity_check(
state: integrity_check_state,
} => {
return_if_io!(integrity_check(integrity_check_state, errors, pager));
*current_root_idx += 1;
if *current_root_idx < roots.len() {
*integrity_check_state = IntegrityCheckState::new(roots[*current_root_idx]);
integrity_check_state.start(roots[*current_root_idx], PageCategory::Normal, errors);
*current_root_idx += 1;
return Ok(InsnFunctionStepResult::Step);
} else {
let message = if errors.is_empty() {