add loop left pointer validation

This commit is contained in:
Pere Diaz Bou
2025-04-07 17:55:50 +02:00
parent 6ac2368ae2
commit f137ddfdf8

View File

@@ -2,8 +2,7 @@ use tracing::debug;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::{
read_btree_cell, read_u32, read_varint, BTreeCell, PageContent, PageType, TableInteriorCell,
TableLeafCell,
read_u32, read_varint, BTreeCell, PageContent, PageType, TableInteriorCell, TableLeafCell,
};
use crate::MvCursor;
@@ -14,6 +13,7 @@ use crate::{return_corrupt, LimboError, Result};
use std::cell::{Cell, Ref, RefCell};
use std::cmp::Ordering;
#[cfg(debug_assertions)]
use std::collections::HashSet;
use std::pin::Pin;
use std::rc::Rc;
@@ -2023,6 +2023,7 @@ impl BTreeCursor {
#[cfg(debug_assertions)]
self.post_balance_non_root_validation(
&parent_page,
balance_info,
parent_contents,
pages_to_balance_new,
@@ -2047,8 +2048,10 @@ impl BTreeCursor {
result
}
#[cfg(debug_assertions)]
fn post_balance_non_root_validation(
&self,
parent_page: &PageRef,
balance_info: &mut BalanceInfo,
parent_contents: &mut PageContent,
pages_to_balance_new: Vec<Arc<crate::Page>>,
@@ -2060,6 +2063,45 @@ impl BTreeCursor {
) {
let mut valid = true;
let mut current_index_cell = 0;
for cell_idx in 0..parent_contents.cell_count() {
let cell = parent_contents
.cell_get(
cell_idx,
payload_overflow_threshold_max(
parent_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
parent_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
)
.unwrap();
match cell {
BTreeCell::TableInteriorCell(table_interior_cell) => {
let left_child_page = table_interior_cell._left_child_page;
if left_child_page == parent_page.get().id as u32 {
tracing::error!("balance_non_root(parent_divider_points_to_same_page, page_id={}, cell_left_child_page={})",
parent_page.get().id,
left_child_page,
);
valid = false;
}
}
BTreeCell::IndexInteriorCell(index_interior_cell) => {
let left_child_page = index_interior_cell.left_child_page;
if left_child_page == parent_page.get().id as u32 {
tracing::error!("balance_non_root(parent_divider_points_to_same_page, page_id={}, cell_left_child_page={})",
parent_page.get().id,
left_child_page,
);
valid = false;
}
}
_ => {}
}
}
// Let's now make a in depth check that we in fact added all possible cells somewhere and they are not lost
for (page_idx, page) in pages_to_balance_new.iter().enumerate() {
let contents = page.get_contents();
@@ -2078,7 +2120,7 @@ impl BTreeCursor {
self.usable_space(),
);
let buf = contents.as_ptr();
let cell_buf = &buf[cell_start..cell_start + cell_len];
let cell_buf = to_static_buf(&mut buf[cell_start..cell_start + cell_len]);
let cell_buf_in_array = &cells_debug[current_index_cell];
if cell_buf != cell_buf_in_array {
tracing::error!("balance_non_root(cell_not_found_debug, page_id={}, cell_in_cell_array_idx={})",
@@ -2087,6 +2129,63 @@ impl BTreeCursor {
);
valid = false;
}
let cell = crate::storage::sqlite3_ondisk::read_btree_cell(
cell_buf,
&page_type,
0,
payload_overflow_threshold_max(
parent_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
parent_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
)
.unwrap();
match &cell {
BTreeCell::TableInteriorCell(table_interior_cell) => {
let left_child_page = table_interior_cell._left_child_page;
if left_child_page == page.get().id as u32 {
tracing::error!("balance_non_root(child_page_points_same_page, page_id={}, cell_left_child_page={}, page_idx={})",
page.get().id,
left_child_page,
page_idx
);
valid = false;
}
if left_child_page == parent_page.get().id as u32 {
tracing::error!("balance_non_root(child_page_points_parent_of_child, page_id={}, cell_left_child_page={}, page_idx={})",
page.get().id,
left_child_page,
page_idx
);
valid = false;
}
}
BTreeCell::IndexInteriorCell(index_interior_cell) => {
let left_child_page = index_interior_cell.left_child_page;
if left_child_page == page.get().id as u32 {
tracing::error!("balance_non_root(child_page_points_same_page, page_id={}, cell_left_child_page={}, page_idx={})",
page.get().id,
left_child_page,
page_idx
);
valid = false;
}
if left_child_page == parent_page.get().id as u32 {
tracing::error!("balance_non_root(child_page_points_parent_of_child, page_id={}, cell_left_child_page={}, page_idx={})",
page.get().id,
left_child_page,
page_idx
);
valid = false;
}
}
_ => {}
}
current_index_cell += 1;
}
// Now check divider cells and their pointers.
@@ -4637,7 +4736,6 @@ mod tests {
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
let mut keys = Vec::new();
let seed = rng.next_u64();
let seed = 1743883058;
tracing::info!("seed: {}", seed);
let mut rng = ChaCha8Rng::seed_from_u64(seed);
for insert_id in 0..inserts {
@@ -4680,6 +4778,7 @@ mod tests {
pager.deref(),
)
.unwrap();
// FIXME: add sorted vector instead, should be okay for small amounts of keys for now :P, too lazy to fix right now
keys.sort();
cursor.move_to_root();
let mut valid = true;
@@ -4711,7 +4810,6 @@ mod tests {
keys.sort();
cursor.move_to_root();
for key in keys.iter() {
let seek_key = SeekKey::TableRowId(*key as u64);
tracing::trace!("seeking key: {}", key);
run_until_done(|| cursor.next(), pager.deref()).unwrap();
let cursor_rowid = cursor.rowid().unwrap().unwrap();