Merge 'Simple integrity check on btree' from Pere Diaz Bou

This PR adds support for the `IntegrityCk` instruction, which performs an
integrity check on the contents of a single table. In the next PR I will try
to implement the rest of the integrity check, where we would check that
indexes contain the correct amount of data, and some more.
<img width="1151" alt="image" src="https://github.com/user-
attachments/assets/29d54148-55ba-480f-b972-e38587f0a483" />

Closes #1719
This commit is contained in:
Pekka Enberg
2025-06-16 13:46:26 +03:00
12 changed files with 448 additions and 8 deletions

View File

@@ -73,6 +73,10 @@ fn pragma_for(pragma: PragmaName) -> Pragma {
PragmaFlags::NoColumns1 | PragmaFlags::Result0,
&["auto_vacuum"],
),
IntegrityCheck => Pragma::new(
PragmaFlags::NeedSchema | PragmaFlags::ReadOnly | PragmaFlags::Result0,
&["message"],
),
}
}

View File

@@ -24,8 +24,9 @@ use crate::{
use std::collections::HashSet;
use std::{
cell::{Cell, Ref, RefCell},
cmp::Ordering,
fmt::Debug,
cmp::{Ordering, Reverse},
collections::BinaryHeap,
fmt::{Debug, Write},
pin::Pin,
rc::Rc,
sync::Arc,
@@ -5096,11 +5097,7 @@ impl BTreeCursor {
}
pub fn read_page(&self, page_idx: usize) -> Result<BTreePage> {
self.pager.read_page(page_idx).map(|page| {
Arc::new(BTreePageInner {
page: RefCell::new(page),
})
})
btree_read_page(&self.pager, page_idx)
}
pub fn allocate_page(&self, page_type: PageType, offset: usize) -> BTreePage {
@@ -5109,6 +5106,230 @@ impl BTreeCursor {
}
}
/// A b-tree page queued for verification by the integrity check.
///
/// The struct only holds plain integers, so it is `Copy`; `Clone` is kept so existing
/// `.cloned()` call sites continue to work.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IntegrityCheckPageEntry {
    /// Number of the page to read and verify.
    page_idx: usize,
    /// Depth of the page in the tree; the root page is at level 0.
    level: usize,
    /// Largest rowid allowed on this page (the parent's divider rowid, or `i64::MAX` for the
    /// root page).
    max_intkey: i64,
}
/// Resumable state of an integrity check over a single b-tree.
pub struct IntegrityCheckState {
    /// Page number the check was started on (set once by [`IntegrityCheckState::new`]).
    pub current_page: usize,
    /// Pages that still have to be verified; the last element is processed next.
    page_stack: Vec<IntegrityCheckPageEntry>,
    /// Depth of the first leaf page that was seen, used to compare against later leaves.
    first_leaf_level: Option<usize>,
}

impl IntegrityCheckState {
    /// Create a state whose work stack contains only the root page `page_idx`.
    ///
    /// The root is placed at level 0 and accepts any rowid up to `i64::MAX`.
    pub fn new(page_idx: usize) -> Self {
        let root_entry = IntegrityCheckPageEntry {
            page_idx,
            level: 0,
            max_intkey: i64::MAX,
        };
        Self {
            current_page: page_idx,
            page_stack: vec![root_entry],
            first_leaf_level: None,
        }
    }
}

impl std::fmt::Debug for IntegrityCheckState {
    /// Print `current_page` and `first_leaf_level`; the pending page stack is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            current_page,
            first_leaf_level,
            ..
        } = self;
        let mut out = f.debug_struct("IntegrityCheckState");
        out.field("current_page", current_page);
        out.field("first_leaf_level", first_leaf_level);
        out.finish()
    }
}
/// Perform one step of the integrity check of a whole table/index b-tree.
///
/// Every call takes the page on top of `state`'s stack and checks that:
/// 1. Rowids are in the correct order (table pages only).
/// 2. Cells and freeblocks do not overlap.
/// 3. Cells and freeblocks do not escape the expected range of the page.
/// 4. All leaf pages are at the same depth.
/// 5. Overflow pages are correct (TODO).
///
/// Problems are appended to `message` and counted in `error_count`; they are not reported as
/// `Err`. An `Err` is only returned when reading the page or decoding a cell fails.
///
/// In order to keep this reentrant, we keep a stack of pages we still need to check. Ideally,
/// like in SQLite, we would have implemented a recursive solution which would make it easier to
/// check the depth.
///
/// Returns `CursorResult::Ok(())` once the stack is empty and `CursorResult::IO` after every
/// processed page, so the caller has to keep calling until `Ok` is returned. The page entry is
/// only popped after `return_if_locked_maybe_load!` lets execution continue, so a page that is
/// not ready yet is retried on the next call.
pub fn integrity_check(
    state: &mut IntegrityCheckState,
    error_count: &mut usize,
    message: &mut String,
    pager: &Rc<Pager>,
) -> Result<CursorResult<()>> {
    let Some(IntegrityCheckPageEntry {
        page_idx,
        level,
        max_intkey,
    }) = state.page_stack.last().cloned()
    else {
        return Ok(CursorResult::Ok(()));
    };
    let page = btree_read_page(pager, page_idx)?;
    return_if_locked_maybe_load!(pager, page);
    state.page_stack.pop();

    let page = page.get();
    let contents = page.get_contents();
    let usable_space = pager.usable_space() as u16;
    let page_id = page.get().id;
    let mut coverage_checker = CoverageChecker::new(page_id);
    // Like SQLite, give up on the byte-coverage analysis once a range is known to be bogus:
    // feeding such a range to the checker only produces follow-up noise.
    let mut do_coverage_check = true;

    // Now we check every cell for a few things:
    // 1. The cell is in the correct range: it does not exceed the page and does not start
    //    before the cell content area.
    // 2. We add the cell to the coverage checker in order to check that cells do not overlap.
    // 3. We check the order of rowids in case of table pages. We iterate backwards in order to
    //    check that the current cell's rowid is not greater than the next cell's. We also check
    //    the rowid against the parent's divider cell. For the root page the max rowid is
    //    i64::MAX.
    // 4. We push child pages to the stack to check them later.
    // 5. In case of a leaf page, we check that the current level (depth) is equal to that of
    //    the other leaf pages we have seen.
    //
    // NOTE(review): only the left child pointers stored in cells are pushed. The right-most
    // child pointer of interior pages is not visited here -- confirm whether that subtree is
    // meant to be covered by this check.
    let mut next_rowid = max_intkey;
    for cell_idx in (0..contents.cell_count()).rev() {
        let max_local = payload_overflow_threshold_max(contents.page_type(), usable_space);
        let min_local = payload_overflow_threshold_min(contents.page_type(), usable_space);
        let (cell_start, cell_length) =
            contents.cell_get_raw_region(cell_idx, max_local, min_local, usable_space as usize);
        let cell_end = cell_start + cell_length;

        let mut cell_in_range = true;
        if cell_start < contents.cell_content_area() as usize
            || cell_start > usable_space as usize - 4
        {
            integrity_check_report(
                message,
                error_count,
                format_args!(
                    "Cell {} in page {} is out of range. cell_range={}..{}, content_area={}, usable_space={}\n",
                    cell_idx,
                    page_id,
                    cell_start,
                    cell_end,
                    contents.cell_content_area(),
                    usable_space
                ),
            );
            cell_in_range = false;
        }
        if cell_end > usable_space as usize {
            integrity_check_report(
                message,
                error_count,
                format_args!(
                    "Cell {} in page {} extends out of page. cell_range={}..{}, content_area={}, usable_space={}\n",
                    cell_idx,
                    page_id,
                    cell_start,
                    cell_end,
                    contents.cell_content_area(),
                    usable_space
                ),
            );
            cell_in_range = false;
        }
        if !cell_in_range {
            // The cell bytes cannot be trusted: do not decode them and do not let them
            // poison the coverage analysis.
            do_coverage_check = false;
            continue;
        }
        coverage_checker.add_cell(cell_start, cell_end);

        let cell = contents.cell_get(cell_idx, max_local, min_local, usable_space as usize)?;
        match cell {
            BTreeCell::TableInteriorCell(table_interior_cell) => {
                let rowid = table_interior_cell._rowid;
                state.page_stack.push(IntegrityCheckPageEntry {
                    page_idx: table_interior_cell._left_child_page as usize,
                    level: level + 1,
                    max_intkey: rowid,
                });
                integrity_check_rowid_order(
                    message,
                    error_count,
                    page_id,
                    cell_idx,
                    rowid,
                    max_intkey,
                    next_rowid,
                );
                next_rowid = rowid;
            }
            BTreeCell::TableLeafCell(table_leaf_cell) => {
                integrity_check_leaf_depth(state, message, error_count, page_id, level);
                let rowid = table_leaf_cell._rowid;
                integrity_check_rowid_order(
                    message,
                    error_count,
                    page_id,
                    cell_idx,
                    rowid,
                    max_intkey,
                    next_rowid,
                );
                next_rowid = rowid;
            }
            BTreeCell::IndexInteriorCell(index_interior_cell) => {
                state.page_stack.push(IntegrityCheckPageEntry {
                    page_idx: index_interior_cell.left_child_page as usize,
                    level: level + 1,
                    max_intkey, // we don't care about intkey in non-table pages
                });
            }
            BTreeCell::IndexLeafCell(_) => {
                integrity_check_leaf_depth(state, message, error_count, page_id, level);
            }
        }
    }

    // Now we add free blocks to the coverage checker.
    let mut pc = contents.first_freeblock();
    while pc > 0 {
        // A freeblock header is 4 bytes (next pointer + size). Validate the offset *before*
        // reading the header so a corrupt pointer cannot make us read past the page.
        if pc > usable_space - 4 {
            integrity_check_report(
                message,
                error_count,
                format_args!(
                    "Page {} detected freeblock that starts out of range start={} usable_space={}\n",
                    page_id, pc, usable_space
                ),
            );
            do_coverage_check = false;
            break;
        }
        let next = contents.read_u16_no_offset(pc as usize);
        let size = contents.read_u16_no_offset(pc as usize + 2) as usize;
        let end = pc as usize + size;
        if end > usable_space as usize {
            integrity_check_report(
                message,
                error_count,
                format_args!(
                    "Page {} detected freeblock that extends page start={} end={}",
                    page_id, pc, end
                ),
            );
            do_coverage_check = false;
            break;
        }
        coverage_checker.add_free_block(pc as usize, end);
        // Freeblocks are chained in ascending offset order. A pointer that does not move
        // forward means the chain is corrupt, and following it could loop forever.
        if next != 0 && next <= pc {
            integrity_check_report(
                message,
                error_count,
                format_args!(
                    "Page {} freeblocks are not in ascending order: block at {} points to {}\n",
                    page_id, pc, next
                ),
            );
            do_coverage_check = false;
            break;
        }
        pc = next;
    }

    // Let's check the overlap of freeblocks and cells now that we have collected them all.
    if do_coverage_check {
        coverage_checker.analyze(
            usable_space,
            contents.cell_content_area() as usize,
            message,
            error_count,
            contents.num_frag_free_bytes() as usize,
        );
    }
    Ok(CursorResult::IO)
}

/// Append one problem description to `message` and count it in `error_count`.
fn integrity_check_report(
    message: &mut String,
    error_count: &mut usize,
    problem: std::fmt::Arguments<'_>,
) {
    message
        .write_fmt(problem)
        .expect("writing to a String cannot fail");
    *error_count += 1;
}

/// Report `rowid` if it is greater than the parent's divider (`max_intkey`) or than the rowid of
/// the cell that follows it on the page (`next_rowid`).
fn integrity_check_rowid_order(
    message: &mut String,
    error_count: &mut usize,
    page_id: usize,
    cell_idx: usize,
    rowid: i64,
    max_intkey: i64,
    next_rowid: i64,
) {
    if rowid > max_intkey || rowid > next_rowid {
        integrity_check_report(
            message,
            error_count,
            format_args!(
                "Page {} cell {} has rowid={} in wrong order. Parent cell has parent_rowid={} and next_rowid={}",
                page_id, cell_idx, rowid, max_intkey, next_rowid
            ),
        );
    }
}

/// Remember the depth of the first leaf that is seen and report every later leaf cell whose page
/// is at a different depth.
fn integrity_check_leaf_depth(
    state: &mut IntegrityCheckState,
    message: &mut String,
    error_count: &mut usize,
    page_id: usize,
    level: usize,
) {
    match state.first_leaf_level {
        Some(expected_leaf_level) if expected_leaf_level != level => {
            integrity_check_report(
                message,
                error_count,
                format_args!(
                    "Page {} is at different depth from another leaf page this_page_depth={}, other_page_depth={} ",
                    page_id, level, expected_leaf_level
                ),
            );
        }
        Some(_) => {}
        None => state.first_leaf_level = Some(level),
    }
}
/// Read page `page_idx` through `pager` and wrap it in a shared [`BTreePage`] handle.
///
/// Any error returned by `Pager::read_page` is passed on unchanged.
pub fn btree_read_page(pager: &Rc<Pager>, page_idx: usize) -> Result<BTreePage> {
    let page = pager.read_page(page_idx)?;
    let wrapped = BTreePageInner {
        page: RefCell::new(page),
    };
    Ok(Arc::new(wrapped))
}
/// A byte range of a page that is claimed by either a cell or a freeblock.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IntegrityCheckCellRange {
    /// Offset of the first byte of the range.
    start: usize,
    /// Offset one past the last byte of the range.
    end: usize,
    /// `true` when the range is a freeblock, `false` when it is a cell.
    is_free_block: bool,
}

// Ordering for the min-heap: smallest start address first. `end` and `is_free_block` only act
// as tie-breakers so that `cmp` returns `Equal` exactly when the derived `==` holds, as the
// `Ord` contract requires.
impl Ord for IntegrityCheckCellRange {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.start
            .cmp(&other.start)
            .then_with(|| self.end.cmp(&other.end))
            .then_with(|| self.is_free_block.cmp(&other.is_free_block))
    }
}

impl PartialOrd for IntegrityCheckCellRange {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
#[cfg(debug_assertions)]
fn validate_cells_after_insertion(cell_array: &CellArray, leaf_data: bool) {
for cell in &cell_array.cells {
@@ -5120,6 +5341,73 @@ fn validate_cells_after_insertion(cell_array: &CellArray, leaf_data: bool) {
}
}
/// Collects the byte ranges used by cells and freeblocks of one page and verifies that they
/// neither overlap nor leave more unaccounted bytes than the page header claims.
pub struct CoverageChecker {
    /// Min-heap ordered by cell start
    heap: BinaryHeap<Reverse<IntegrityCheckCellRange>>,
    /// Page the ranges belong to; only used in error messages.
    page_idx: usize,
}

impl CoverageChecker {
    /// Create an empty checker for page `page_idx`.
    pub fn new(page_idx: usize) -> Self {
        Self {
            heap: BinaryHeap::new(),
            page_idx,
        }
    }

    /// Register the range `cell_start..cell_end` as either a cell or a freeblock.
    fn add_range(&mut self, cell_start: usize, cell_end: usize, is_free_block: bool) {
        self.heap.push(Reverse(IntegrityCheckCellRange {
            start: cell_start,
            end: cell_end,
            is_free_block,
        }));
    }

    /// Register the bytes `cell_start..cell_end` as used by a cell.
    pub fn add_cell(&mut self, cell_start: usize, cell_end: usize) {
        self.add_range(cell_start, cell_end, false);
    }

    /// Register the bytes `cell_start..cell_end` as used by a freeblock.
    pub fn add_free_block(&mut self, cell_start: usize, cell_end: usize) {
        self.add_range(cell_start, cell_end, true);
    }

    /// Walk the registered ranges in ascending start order and report problems.
    ///
    /// * An overlap (a range starting before the previous one ended, or before `content_area`)
    ///   is appended to `message`, counted in `error_count`, and ends the analysis: the gap
    ///   accounting is meaningless once ranges overlap, so no fragmentation error is added.
    /// * Otherwise the gaps between `content_area`, the ranges and `usable_space` are summed up
    ///   and compared with `expected_fragmentation`.
    ///
    /// A range ending past `usable_space` contributes no trailing gap instead of underflowing.
    pub fn analyze(
        &mut self,
        usable_space: u16,
        content_area: usize,
        message: &mut String,
        error_count: &mut usize,
        expected_fragmentation: usize,
    ) {
        let mut fragmentation = 0;
        let mut prev_end = content_area;
        while let Some(Reverse(range)) = self.heap.pop() {
            if prev_end > range.start {
                let error_msg = format!(
                    "Page {} cell overlap detected at position={} with previous_end={}. content_area={}, is_free_block={}",
                    self.page_idx, range.start, prev_end, content_area, range.is_free_block
                );
                message.push_str(&error_msg);
                *error_count += 1;
                return;
            }
            fragmentation += range.start - prev_end;
            prev_end = range.end;
        }
        // `prev_end` can exceed `usable_space` on a corrupt page; never underflow here.
        fragmentation += (usable_space as usize).saturating_sub(prev_end);
        if fragmentation != expected_fragmentation {
            let error_msg = format!(
                "Page {} unexpected fragmentation got={}, expected={}",
                self.page_idx, fragmentation, expected_fragmentation
            );
            message.push_str(&error_msg);
            *error_count += 1;
        }
    }
}
/// Stack of pages representing the tree traversal order.
/// current_page represents the current page being used in the tree and current_page - 1 would be
/// the parent. Using current_page + 1 or higher is undefined behaviour.

View File

@@ -0,0 +1,32 @@
use crate::{
schema::Schema,
vdbe::{builder::ProgramBuilder, insn::Insn},
};
/// Maximum number of errors to report with integrity check. If we exceed this number we will short
/// circuit the procedure and return early to not waste time.
const MAX_INTEGRITY_CHECK_ERRORS: usize = 10;
/// Emit the bytecode for `PRAGMA integrity_check`.
///
/// The root page of every b-tree table in `schema` is handed to a single `IntegrityCk`
/// instruction, whose message register is then returned as a one-column result row.
pub fn translate_integrity_check(
    schema: &Schema,
    program: &mut ProgramBuilder,
) -> crate::Result<()> {
    // Collect root pages to run integrity check on. Only b-tree tables are pushed; the
    // capacity also leaves room for the indexes.
    let mut root_pages = Vec::with_capacity(schema.tables.len() + schema.indexes.len());
    root_pages.extend(
        schema
            .tables
            .values()
            .filter_map(|table| match table.as_ref() {
                crate::schema::Table::BTree(btree) => Some(btree.root_page),
                _ => None,
            }),
    );

    let message_register = program.alloc_register();
    let check = Insn::IntegrityCk {
        max_errors: MAX_INTEGRITY_CHECK_ERRORS,
        roots: root_pages,
        message_register,
    };
    program.emit_insn(check);

    let result_row = Insn::ResultRow {
        start_reg: message_register,
        count: 1,
    };
    program.emit_insn(result_row);
    Ok(())
}

View File

@@ -18,6 +18,7 @@ pub(crate) mod expr;
pub(crate) mod group_by;
pub(crate) mod index;
pub(crate) mod insert;
pub(crate) mod integrity_check;
pub(crate) mod main_loop;
pub(crate) mod optimizer;
pub(crate) mod order_by;

View File

@@ -18,6 +18,8 @@ use crate::{bail_parse_error, LimboError, Pager, Value};
use std::str::FromStr;
use strum::IntoEnumIterator;
use super::integrity_check::translate_integrity_check;
fn list_pragmas(program: &mut ProgramBuilder) {
for x in PragmaName::iter() {
let register = program.emit_string8_new_reg(x.to_string());
@@ -259,6 +261,7 @@ fn update_pragma(
});
Ok(())
}
PragmaName::IntegrityCheck => unreachable!("integrity_check cannot be set"),
}
}
@@ -392,6 +395,9 @@ fn query_pragma(
});
program.emit_result_row(register, 1);
}
PragmaName::IntegrityCheck => {
translate_integrity_check(schema, program)?;
}
}
Ok(())

View File

@@ -2,6 +2,7 @@
use crate::function::AlterTableFunc;
use crate::numeric::{NullableInteger, Numeric};
use crate::schema::Schema;
use crate::storage::btree::{integrity_check, IntegrityCheckState};
use crate::storage::database::FileMemoryStorage;
use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::pager::CreateBTreeFlags;
@@ -23,6 +24,7 @@ use crate::{
},
types::compare_immutable,
};
use std::fmt::Write;
use std::{borrow::BorrowMut, rc::Rc, sync::Arc};
use crate::{pseudo::PseudoCursor, result::LimboResult};
@@ -5402,6 +5404,75 @@ pub fn op_count(
Ok(InsnFunctionStepResult::Step)
}
/// Progress of an `IntegrityCk` instruction, kept in `ProgramState` so that
/// `op_integrity_check` can be re-entered.
#[derive(Debug)]
pub enum OpIntegrityCheckState {
/// No check is in progress; the next execution of the instruction starts one.
Start,
/// A check is in progress.
Checking {
/// Number of problems found so far.
error_count: usize,
/// Accumulated text describing the problems found so far.
message: String,
/// Index into the instruction's `roots` of the b-tree currently being checked.
current_root_idx: usize,
/// Traversal state of the b-tree currently being checked.
state: IntegrityCheckState,
},
}
/// Execute one step of the `IntegrityCk` instruction.
///
/// The instruction is re-entered until every b-tree in `roots` has been checked or the error
/// budget `max_errors` is used up:
/// * In the `Start` state a check of the first root is set up (or, when `roots` is empty, the
///   text "ok" is stored right away).
/// * In the `Checking` state `integrity_check` is driven one step further. Once a b-tree is
///   finished the next root is started; after the last one the collected message (or "ok" when
///   no error was found) is stored in `message_register`, the state is reset to `Start` and the
///   program counter advances.
///
/// `program` and `mv_store` are not used; they are presumably part of the signature shared by
/// all instruction handlers.
pub fn op_integrity_check(
    program: &Program,
    state: &mut ProgramState,
    insn: &Insn,
    pager: &Rc<Pager>,
    mv_store: Option<&Rc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
    let Insn::IntegrityCk {
        max_errors,
        roots,
        message_register,
    } = insn
    else {
        unreachable!("unexpected Insn {:?}", insn)
    };
    match &mut state.op_integrity_check_state {
        OpIntegrityCheckState::Start => {
            let Some(first_root) = roots.first() else {
                // Nothing to check: report success instead of panicking on `roots[0]`.
                state.registers[*message_register] = Register::Value(Value::build_text("ok"));
                state.pc += 1;
                return Ok(InsnFunctionStepResult::Step);
            };
            state.op_integrity_check_state = OpIntegrityCheckState::Checking {
                error_count: 0,
                message: String::new(),
                current_root_idx: 0,
                state: IntegrityCheckState::new(*first_root),
            };
        }
        OpIntegrityCheckState::Checking {
            error_count,
            message,
            current_root_idx,
            state: integrity_check_state,
        } => {
            // Stop early once enough errors were collected. `.max(1)` makes sure a budget of
            // zero can never turn into an unchecked "ok".
            let within_error_budget = *error_count < (*max_errors).max(1);
            if within_error_budget {
                return_if_io!(integrity_check(
                    integrity_check_state,
                    error_count,
                    message,
                    pager
                ));
                *current_root_idx += 1;
                if *current_root_idx < roots.len() {
                    *integrity_check_state = IntegrityCheckState::new(roots[*current_root_idx]);
                    return Ok(InsnFunctionStepResult::Step);
                }
            }
            if *error_count == 0 {
                message.write_str("ok").map_err(|err| {
                    LimboError::InternalError(format!(
                        "error appending message to integrity check {:?}",
                        err
                    ))
                })?;
            }
            state.registers[*message_register] = Register::Value(Value::build_text(message));
            state.op_integrity_check_state = OpIntegrityCheckState::Start;
            state.pc += 1;
        }
    }
    Ok(InsnFunctionStepResult::Step)
}
impl Value {
pub fn exec_lower(&self) -> Option<Self> {
match self {

View File

@@ -1590,6 +1590,19 @@ pub fn insn_to_str(
0,
format!("r[{}]={}", *out_reg, *value),
),
Insn::IntegrityCk {
max_errors,
roots,
message_register,
} => (
"IntegrityCk",
*max_errors as i32,
0,
0,
Value::build_text(""),
0,
format!("roots={:?} message_register={}", roots, message_register),
),
};
format!(
"{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}",

View File

@@ -936,6 +936,18 @@ pub enum Insn {
target_reg: usize,
exact: bool,
},
/// Do an analysis of the currently open database (modelled on SQLite's `IntegrityCk` opcode).
/// The b-trees rooted at the pages listed in `roots` are checked, and the text of an error
/// message describing any problems is stored in register `message_register`.
/// If no problems are found, the text "ok" is stored instead.
/// `max_errors` is the maximum number of errors to report.
/// Unlike SQLite's opcode, the error limit and the root pages are carried directly by the
/// instruction rather than in a register and a P4_INTARRAY argument, and there is no P5 flag
/// for an auxiliary database file. This opcode is used to implement the integrity_check pragma.
IntegrityCk {
max_errors: usize,
roots: Vec<usize>,
message_register: usize,
},
}
impl Insn {
@@ -1060,6 +1072,7 @@ impl Insn {
Insn::Affinity { .. } => execute::op_affinity,
Insn::IdxDelete { .. } => execute::op_idx_delete,
Insn::Count { .. } => execute::op_count,
Insn::IntegrityCk { .. } => execute::op_integrity_check,
}
}
}

View File

@@ -43,7 +43,7 @@ use crate::{
use crate::json::JsonCacheCell;
use crate::{Connection, MvStore, Result, TransactionState};
use builder::CursorKey;
use execute::{InsnFunction, InsnFunctionStepResult, OpIdxDeleteState};
use execute::{InsnFunction, InsnFunctionStepResult, OpIdxDeleteState, OpIntegrityCheckState};
use rand::{
distributions::{Distribution, Uniform},
@@ -248,6 +248,7 @@ pub struct ProgramState {
#[cfg(feature = "json")]
json_cache: JsonCacheCell,
op_idx_delete_state: Option<OpIdxDeleteState>,
op_integrity_check_state: OpIntegrityCheckState,
}
impl ProgramState {
@@ -272,6 +273,7 @@ impl ProgramState {
#[cfg(feature = "json")]
json_cache: JsonCacheCell::new(),
op_idx_delete_state: None,
op_integrity_check_state: OpIntegrityCheckState::Start,
}
}

View File

@@ -36,3 +36,4 @@ source $testdir/null.test
source $testdir/create_table.test
source $testdir/collate.test
source $testdir/values.test
source $testdir/integrity_check.test

View File

@@ -0,0 +1,7 @@
set testdir [file dirname $argv0]
source $testdir/tester.tcl
do_execsql_test integrity-check {
PRAGMA integrity_check;
} {ok}

View File

@@ -1730,6 +1730,8 @@ pub enum PragmaName {
AutoVacuum,
/// `cache_size` pragma
CacheSize,
/// Run integrity check on the database file
IntegrityCheck,
/// `journal_mode` pragma
JournalMode,
/// Noop as per SQLite docs