diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 7bed4b521..688e6f63c 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -26,7 +26,7 @@ use std::{ cell::{Cell, Ref, RefCell}, cmp::{Ordering, Reverse}, collections::BinaryHeap, - fmt::{Debug, Write}, + fmt::Debug, pin::Pin, rc::Rc, sync::Arc, @@ -5106,6 +5106,62 @@ impl BTreeCursor { } } +#[derive(Debug, thiserror::Error)] +pub enum IntegrityCheckError { + #[error("Cell {cell_idx} in page {page_id} is out of range. cell_range={cell_start}..{cell_end}, content_area={content_area}, usable_space={usable_space}")] + CellOutOfRange { + cell_idx: usize, + page_id: usize, + cell_start: usize, + cell_end: usize, + content_area: usize, + usable_space: usize, + }, + #[error("Cell {cell_idx} in page {page_id} extends out of page. cell_range={cell_start}..{cell_end}, content_area={content_area}, usable_space={usable_space}")] + CellOverflowsPage { + cell_idx: usize, + page_id: usize, + cell_start: usize, + cell_end: usize, + content_area: usize, + usable_space: usize, + }, + #[error("Page {page_id} cell {cell_idx} has rowid={rowid} in wrong order. Parent cell has parent_rowid={max_intkey} and next_rowid={next_rowid}")] + CellRowidOutOfRange { + page_id: usize, + cell_idx: usize, + rowid: i64, + max_intkey: i64, + next_rowid: i64, + }, + #[error("Page {page_id} is at different depth from another leaf page this_page_depth={this_page_depth}, other_page_depth={other_page_depth} ")] + LeafDepthMismatch { + page_id: usize, + this_page_depth: usize, + other_page_depth: usize, + }, + #[error("Page {page_id} detected freeblock that extends page start={start} end={end}")] + FreeBlockOutOfRange { + page_id: usize, + start: usize, + end: usize, + }, + #[error("Page {page_id} cell overlap detected at position={start} with previous_end={prev_end}. content_area={content_area}, is_free_block={is_free_block}")] + CellOverlap { + page_id: usize, + start: usize, + prev_end: usize, + content_area: usize, + is_free_block: bool, + }, + #[error("Page {page_id} unexpected fragmentation got={got}, expected={expected}")] + UnexpectedFragmentation { + page_id: usize, + got: usize, + expected: usize, + }, +} + #[derive(Clone)] struct IntegrityCheckPageEntry { page_idx: usize, @@ -5152,8 +5208,7 @@ impl std::fmt::Debug for IntegrityCheckState { /// depth. pub fn integrity_check( state: &mut IntegrityCheckState, - error_count: &mut usize, - message: &mut String, + errors: &mut Vec, pager: &Rc, ) -> Result> { let Some(IntegrityCheckPageEntry { @@ -5194,14 +5249,24 @@ pub fn integrity_check( if cell_start < contents.cell_content_area() as usize || cell_start > usable_space as usize - 4 { - let error_msg = format!("Cell {} in page {} is out of range. cell_range={}..{}, content_area={}, usable_space={}\n", cell_idx, page.get().id, cell_start, cell_start+cell_length, contents.cell_content_area(), usable_space); - message.write_str(&error_msg).unwrap(); - *error_count += 1; + errors.push(IntegrityCheckError::CellOutOfRange { + cell_idx, + page_id: page.get().id, + cell_start, + cell_end: cell_start + cell_length, + content_area: contents.cell_content_area() as usize, + usable_space: usable_space as usize, + }); } if cell_start + cell_length > usable_space as usize { - let error_msg = format!("Cell {} in page {} extends out of page. cell_range={}..{}, content_area={}, usable_space={}\n", cell_idx, page.get().id, cell_start, cell_start+cell_length, contents.cell_content_area(), usable_space); - message.write_str(&error_msg).unwrap(); - *error_count += 1; + errors.push(IntegrityCheckError::CellOverflowsPage { + cell_idx, + page_id: page.get().id, + cell_start, + cell_end: cell_start + cell_length, + content_area: contents.cell_content_area() as usize, + usable_space: usable_space as usize, + }); } coverage_checker.add_cell(cell_start, cell_start + cell_length); let cell = contents.cell_get( @@ -5219,9 +5284,13 @@ pub fn integrity_check( }); let rowid = table_interior_cell._rowid; if rowid > max_intkey || rowid > next_rowid { - let error_msg = format!("Page {} cell {} has rowid={} in wrong order. Parent cell has parent_rowid={} and next_rowid={}", page.get().id, cell_idx, rowid, max_intkey, next_rowid); - message.write_str(&error_msg).unwrap(); - *error_count += 1; + errors.push(IntegrityCheckError::CellRowidOutOfRange { + page_id: page.get().id, + cell_idx, + rowid, + max_intkey, + next_rowid, + }); } next_rowid = rowid; } @@ -5229,18 +5298,24 @@ pub fn integrity_check( // check depth of leaf pages are equal if let Some(expected_leaf_level) = state.first_leaf_level { if expected_leaf_level != level { - let error_msg = format!("Page {} is at different depth from another leaf page this_page_depth={}, other_page_depth={} ", page.get().id, level, expected_leaf_level); - message.write_str(&error_msg).unwrap(); - *error_count += 1; + errors.push(IntegrityCheckError::LeafDepthMismatch { + page_id: page.get().id, + this_page_depth: level, + other_page_depth: expected_leaf_level, + }); } } else { state.first_leaf_level = Some(level); } let rowid = table_leaf_cell._rowid; if rowid > max_intkey || rowid > next_rowid { - let error_msg = format!("Page {} cell {} has rowid={} in wrong order. Parent cell has parent_rowid={} and next_rowid={}", page.get().id, cell_idx, rowid, max_intkey, next_rowid); - message.write_str(&error_msg).unwrap(); - *error_count += 1; + errors.push(IntegrityCheckError::CellRowidOutOfRange { + page_id: page.get().id, + cell_idx, + rowid, + max_intkey, + next_rowid, + }); } next_rowid = rowid; } @@ -5255,9 +5330,11 @@ pub fn integrity_check( // check depth of leaf pages are equal if let Some(expected_leaf_level) = state.first_leaf_level { if expected_leaf_level != level { - let error_msg = format!("Page {} is at different depth from another leaf page this_page_depth={}, other_page_depth={} ", page.get().id, level, expected_leaf_level); - message.write_str(&error_msg).unwrap(); - *error_count += 1; + errors.push(IntegrityCheckError::LeafDepthMismatch { + page_id: page.get().id, + this_page_depth: level, + other_page_depth: expected_leaf_level, + }); } } else { state.first_leaf_level = Some(level); @@ -5275,14 +5352,11 @@ pub fn integrity_check( let size = contents.read_u16_no_offset(pc as usize + 2) as usize; // check it doesn't go out of range if pc > usable_space - 4 { - let error_msg = format!( - "Page {} detected freeblock that extends page start={} end={}", - page.get().id, - pc, - pc as usize + size - ); - message.write_str(&error_msg).unwrap(); - *error_count += 1; + errors.push(IntegrityCheckError::FreeBlockOutOfRange { + page_id: page.get().id, + start: pc as usize, + end: pc as usize + size, + }); break; } coverage_checker.add_free_block(pc as usize, pc as usize + size); @@ -5294,8 +5368,7 @@ pub fn integrity_check( coverage_checker.analyze( usable_space, contents.cell_content_area() as usize, - message, - error_count, + errors, contents.num_frag_free_bytes() as usize, ); @@ -5375,8 +5448,7 @@ impl CoverageChecker { &mut self, usable_space: u16, content_area: usize, - message: &mut String, - error_count: &mut usize, + errors: &mut Vec, expected_fragmentation: usize, ) { let mut fragmentation = 0; @@ -5384,12 +5456,13 @@ impl CoverageChecker { while let Some(cell) = self.heap.pop() { let start = cell.0.start; if prev_end > start { - let error_msg = format!( - "Page {} cell overlap detected at position={} with previous_end={}. content_area={}, is_free_block={}", - self.page_idx, start, prev_end, content_area, cell.0.is_free_block - ); - message.push_str(&error_msg); - *error_count += 1; + errors.push(IntegrityCheckError::CellOverlap { + page_id: self.page_idx, + start, + prev_end, + content_area, + is_free_block: cell.0.is_free_block, + }); break; } else { fragmentation += start - prev_end; @@ -5398,12 +5471,11 @@ impl CoverageChecker { } fragmentation += usable_space as usize - prev_end; if fragmentation != expected_fragmentation { - let error_msg = format!( - "Page {} unexpected fragmentation got={}, expected={}", - self.page_idx, fragmentation, expected_fragmentation - ); - message.push_str(&error_msg); - *error_count += 1; + errors.push(IntegrityCheckError::UnexpectedFragmentation { + page_id: self.page_idx, + got: fragmentation, + expected: expected_fragmentation, + }); } } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 36258c407..635ebf101 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2,7 +2,7 @@ use crate::function::AlterTableFunc; use crate::numeric::{NullableInteger, Numeric}; use crate::schema::Schema; -use crate::storage::btree::{integrity_check, IntegrityCheckState}; +use crate::storage::btree::{integrity_check, IntegrityCheckError, IntegrityCheckState}; use crate::storage::database::FileMemoryStorage; use crate::storage::page_cache::DumbLruPageCache; use crate::storage::pager::CreateBTreeFlags; @@ -24,7 +24,6 @@ use crate::{ }, types::compare_immutable, }; -use std::fmt::Write; use std::{borrow::BorrowMut, rc::Rc, sync::Arc}; use crate::{pseudo::PseudoCursor, result::LimboResult}; @@ -5408,8 +5407,7 @@ pub fn op_count( pub enum OpIntegrityCheckState { Start, Checking { - error_count: usize, - message: String, + errors: Vec, current_root_idx: usize, state: IntegrityCheckState, }, @@ -5432,37 +5430,31 @@ pub fn op_integrity_check( match &mut state.op_integrity_check_state { OpIntegrityCheckState::Start => { state.op_integrity_check_state = OpIntegrityCheckState::Checking { - error_count: 0, - message: String::new(), + errors: Vec::new(), current_root_idx: 0, state: IntegrityCheckState::new(roots[0]), }; } OpIntegrityCheckState::Checking { - error_count, - message, + errors, current_root_idx, state: integrity_check_state, } => { - return_if_io!(integrity_check( - integrity_check_state, - error_count, - message, - pager - )); + return_if_io!(integrity_check(integrity_check_state, errors, pager)); *current_root_idx += 1; if *current_root_idx < roots.len() { *integrity_check_state = IntegrityCheckState::new(roots[*current_root_idx]); return Ok(InsnFunctionStepResult::Step); } else { - if *error_count == 0 { - message.write_str("ok").map_err(|err| { - LimboError::InternalError(format!( - "error appending message to integrity check {:?}", - err - )) - })?; - } + let message = if errors.is_empty() { + "ok".to_string() + } else { + errors + .iter() + .map(|e| e.to_string()) + .collect::>() + .join("\n") + }; state.registers[*message_register] = Register::Value(Value::build_text(message)); state.op_integrity_check_state = OpIntegrityCheckState::Start; state.pc += 1;