Merge 'Validate cells inside a page after each operation' from Pere Diaz Bou

We need to ensure an operation doesn't leave the cells inside a page in
an invalid state. In debug builds we add a `debug_validate_cells` macro
guarded by `#[cfg(debug_assertions)]`, so the checks are compiled out in
release mode. This catches corruption early, at the operation that
caused it, instead of surfacing much later as a page in an inexplicably
broken state.

Closes #1222
Pekka Enberg
2025-04-01 17:31:08 +03:00
2 changed files with 111 additions and 6 deletions
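
For context, here is a minimal standalone sketch of the debug-only validation pattern described above. The names (`debug_validate`, `validate_core`) and the invariant checked are illustrative only; the actual macro added in the diff below is `debug_validate_cells!`.

// Sketch: the macro body is compiled only when debug_assertions is on,
// so release builds pay no cost for the validation.
macro_rules! debug_validate {
    ($value:expr) => {
        #[cfg(debug_assertions)]
        {
            validate_core($value);
        }
    };
}

// The checking function itself only exists in debug builds.
#[cfg(debug_assertions)]
fn validate_core(value: &[u8]) {
    // Hypothetical invariant: the buffer must not be empty.
    assert!(!value.is_empty(), "value must not be empty");
}

fn main() {
    let buf = vec![1u8, 2, 3];
    // Expands to nothing in release builds; panics early in debug builds
    // if the invariant is violated.
    debug_validate!(&buf);
    println!("buf has {} bytes", buf.len());
}

In the diff, the same pattern wraps `debug_validate_cells_core`, which walks every cell pointer on the page and asserts that each cell region is at least 4 bytes and, on leaf pages, that the first byte of the cell is non-zero.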

View File

@@ -76,6 +76,16 @@ macro_rules! return_if_locked {
}};
}
/// Validate that the cells in a page are in a valid state. Only runs in debug mode.
macro_rules! debug_validate_cells {
($page_contents:expr, $usable_space:expr) => {
#[cfg(debug_assertions)]
{
debug_validate_cells_core($page_contents, $usable_space);
}
};
}
/// State machine of destroy operations
/// Keep track of traversal so that it can be resumed when IO is encountered
#[derive(Debug, Clone)]
@@ -1292,6 +1302,7 @@ impl BTreeCursor {
let current_sibling = sibling_pointer;
for i in (0..=current_sibling).rev() {
let page = self.pager.read_page(pgno as usize)?;
debug_validate_cells!(&page.get_contents(), self.usable_space() as u16);
pages_to_balance.push(page);
assert_eq!(
parent_contents.overflow_cells.len(),
@@ -1327,6 +1338,16 @@ impl BTreeCursor {
}
// Reverse in order to keep the right order
pages_to_balance.reverse();
#[cfg(debug_assertions)]
{
let page_type_of_siblings = pages_to_balance[0].get_contents().page_type();
for page in &pages_to_balance {
let contents = page.get_contents();
debug_validate_cells!(&contents, self.usable_space() as u16);
assert_eq!(contents.page_type(), page_type_of_siblings);
}
}
self.state
.write_info()
.unwrap()
@@ -1371,6 +1392,7 @@ impl BTreeCursor {
sibling_page.set_dirty();
self.pager.add_dirty(sibling_page.get().id);
max_cells += sibling_contents.cell_count();
max_cells += sibling_contents.overflow_cells.len();
if i == 0 {
// we don't have left sibling from this one so we break
break;
@@ -1391,6 +1413,7 @@ impl BTreeCursor {
);
let buf = parent_contents.as_ptr();
let cell_buf = &buf[cell_start..cell_start + cell_len];
max_cells += 1;
// TODO(pere): make this reference and not copy
balance_info.divider_cells.push(cell_buf.to_vec());
@@ -1413,6 +1436,7 @@ impl BTreeCursor {
cells: Vec::with_capacity(max_cells),
number_of_cells_per_page: Vec::new(),
};
let cells_capacity_start = cell_array.cells.capacity();
let mut total_cells_inserted = 0;
// count_cells_in_old_pages is the prefix sum of cells of each page
@@ -1423,6 +1447,7 @@ impl BTreeCursor {
let leaf = matches!(page_type, PageType::TableLeaf | PageType::IndexLeaf);
for (i, old_page) in balance_info.pages_to_balance.iter().enumerate() {
let old_page_contents = old_page.get_contents();
debug_validate_cells!(&old_page_contents, self.usable_space() as u16);
for cell_idx in 0..old_page_contents.cell_count() {
let (cell_start, cell_len) = old_page_contents.cell_get_raw_region(
cell_idx,
@@ -1471,6 +1496,21 @@ impl BTreeCursor {
total_cells_inserted += cells_inserted;
}
assert_eq!(
cell_array.cells.capacity(),
cells_capacity_start,
"calculation of max cells was wrong"
);
#[cfg(debug_assertions)]
{
for cell in &cell_array.cells {
assert!(cell.len() >= 4);
if leaf_data {
assert!(cell[0] != 0, "payload is {:?}", cell);
}
}
}
// calculate how many pages to allocate
let mut new_page_sizes = Vec::new();
let mut k = 0;
@@ -1685,6 +1725,7 @@ impl BTreeCursor {
.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, right_pointer);
}
// TODO: pointer map update (vacuum support)
// Update divider cells in parent
for (i, page) in pages_to_balance_new
.iter()
.enumerate()
@@ -1769,6 +1810,7 @@ impl BTreeCursor {
&cell_array,
self.usable_space() as u16,
)?;
debug_validate_cells!(page, self.usable_space() as u16);
tracing::trace!(
"edit_page page={} cells={}",
pages_to_balance_new[page_idx].get().id,
@@ -1813,9 +1855,10 @@ impl BTreeCursor {
let child = self.pager.do_allocate_page(root_contents.page_type(), 0);
tracing::debug!(
"Balancing root. root={}, rightmost={}",
"balance_root(root={}, rightmost={}, page_type={:?})",
root.get().id,
child.get().id,
root.get_contents().page_type()
);
self.pager.add_dirty(root.get().id);
@@ -2955,6 +2998,7 @@ fn edit_page(
let end_new_cells = start_new_cells + number_new_cells;
let mut count_cells = page.cell_count();
if start_old_cells < start_new_cells {
debug_validate_cells!(page, usable_space);
let number_to_shift = page_free_array(
page,
start_old_cells,
@@ -2970,8 +3014,10 @@ fn edit_page(
start,
);
count_cells -= number_to_shift;
debug_validate_cells!(page, usable_space);
}
if end_new_cells < end_old_cells {
debug_validate_cells!(page, usable_space);
let number_tail_removed = page_free_array(
page,
end_new_cells,
@@ -2981,6 +3027,7 @@ fn edit_page(
)?;
assert!(count_cells >= number_tail_removed);
count_cells -= number_tail_removed;
debug_validate_cells!(page, usable_space);
}
// TODO: make page_free_array defragment, for now I'm lazy so this will work for now.
defragment_page(page, usable_space);
@@ -2991,6 +3038,7 @@ fn edit_page(
count_cells += count;
}
// TODO: overflow cells
debug_validate_cells!(page, usable_space);
for i in 0..page.overflow_cells.len() {
let overflow_cell = &page.overflow_cells[i];
// cell index in context of new list of cells that should be in the page
@@ -3009,6 +3057,7 @@ fn edit_page(
}
}
}
debug_validate_cells!(page, usable_space);
// TODO: append cells to end
page_insert_array(
page,
@@ -3018,6 +3067,7 @@ fn edit_page(
count_cells,
usable_space,
)?;
debug_validate_cells!(page, usable_space);
// TODO: noverflow
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, number_new_cells as u16);
Ok(())
@@ -3065,9 +3115,17 @@ fn page_insert_array(
) -> Result<()> {
// TODO: implement faster algorithm, this is doing extra work that's not needed.
// See pageInsertArray to understand faster way.
tracing::debug!("page_insert_array {}..{}", first, first + count);
tracing::debug!(
"page_insert_array(cell_array.cells={}..{}, cell_count={}, page_type={:?})",
first,
first + count,
page.cell_count(),
page.page_type()
);
for i in first..first + count {
debug_validate_cells!(page, usable_space);
insert_into_cell(page, cell_array.cells[i], start_insert, usable_space)?;
debug_validate_cells!(page, usable_space);
start_insert += 1;
}
Ok(())
@@ -3175,6 +3233,7 @@ fn free_cell_range(
/// Defragment a page. This means packing all the cells to the end of the page.
fn defragment_page(page: &PageContent, usable_space: u16) {
debug_validate_cells!(page, usable_space);
tracing::debug!("defragment_page");
let cloned_page = page.clone();
// TODO(pere): usable space should include offset probably
@@ -3231,6 +3290,25 @@ fn defragment_page(page: &PageContent, usable_space: u16) {
// set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0);
debug_validate_cells!(page, usable_space);
}
#[cfg(debug_assertions)]
/// Only enabled in debug mode, where we ensure that all cells are valid.
fn debug_validate_cells_core(page: &PageContent, usable_space: u16) {
for i in 0..page.cell_count() {
// println!("Debug function: i={}", i);
let (offset, size) = page.cell_get_raw_region(
i,
payload_overflow_threshold_max(page.page_type(), usable_space),
payload_overflow_threshold_min(page.page_type(), usable_space),
usable_space as usize,
);
if page.is_leaf() {
assert!(page.as_ptr()[offset] != 0);
}
assert!(size >= 4, "cell size should be at least 4 bytes idx={}", i);
}
}
/// Insert a record into a cell.
@@ -3263,7 +3341,11 @@ fn insert_into_cell(
}
let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space)?;
tracing::debug!("insert_into_cell pc={}", new_cell_data_pointer);
tracing::trace!(
"insert_into_cell(idx={}, pc={})",
cell_idx,
new_cell_data_pointer
);
let buf = page.as_ptr();
// copy data
@@ -3288,6 +3370,7 @@ fn insert_into_cell(
// update cell count
let new_n_cells = (page.cell_count() + 1) as u16;
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells);
debug_validate_cells!(page, usable_space);
Ok(())
}
@@ -3551,6 +3634,7 @@ fn payload_overflow_threshold_min(_page_type: PageType, usable_space: u16) -> us
/// Drop a cell from a page.
/// This is done by freeing the range of bytes that the cell occupies.
fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Result<()> {
debug_validate_cells!(page, usable_space);
let (cell_start, cell_len) = page.cell_get_raw_region(
cell_idx,
payload_overflow_threshold_max(page.page_type(), usable_space),
@@ -3566,6 +3650,7 @@ fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Resu
page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0);
}
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
debug_validate_cells!(page, usable_space);
Ok(())
}
@@ -3761,6 +3846,8 @@ mod tests {
let mut previous_key = None;
let mut valid = true;
let mut depth = None;
debug_validate_cells!(contents, pager.usable_space() as u16);
let mut child_pages = Vec::new();
for cell_idx in 0..contents.cell_count() {
let cell = contents
.cell_get(
@@ -3775,6 +3862,7 @@ mod tests {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page, ..
}) => {
child_pages.push(pager.read_page(_left_child_page as usize).unwrap());
let (child_depth, child_valid) =
validate_btree(pager.clone(), _left_child_page as usize);
valid &= child_valid;
@@ -3812,6 +3900,18 @@ mod tests {
valid = false;
}
}
let first_page_type = child_pages.first().map(|p| p.get_contents().page_type());
if let Some(child_type) = first_page_type {
for page in child_pages.iter().skip(1) {
if page.get_contents().page_type() != child_type {
tracing::error!("child pages have different types");
valid = false;
}
}
}
if contents.rightmost_pointer().is_none() && contents.cell_count() == 0 {
valid = false;
}
(depth.unwrap(), valid)
}

View File

@@ -223,6 +223,11 @@ impl Pager {
pub fn do_allocate_page(&self, page_type: PageType, offset: usize) -> PageRef {
let page = self.allocate_page().unwrap();
crate::btree_init_page(&page, page_type, offset, self.usable_space() as u16);
tracing::debug!(
"do_allocate_page(id={}, page_type={:?})",
page.get().id,
page.get_contents().page_type()
);
page
}
@@ -262,11 +267,11 @@ impl Pager {
/// Reads a page from the database.
pub fn read_page(&self, page_idx: usize) -> Result<PageRef> {
tracing::debug!("read_page(page_idx = {})", page_idx);
tracing::trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame()));
if let Some(page) = page_cache.get(&page_key) {
tracing::debug!("read_page(page_idx = {}) = cached", page_idx);
tracing::trace!("read_page(page_idx = {}) = cached", page_idx);
return Ok(page.clone());
}
let page = Arc::new(Page::new(page_idx));