diff --git a/core/storage/btree.rs b/core/storage/btree.rs index cabfa903b..02b616461 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -76,6 +76,16 @@ macro_rules! return_if_locked { }}; } +/// Validate cells in a page are in a valid state. Only in debug mode. +macro_rules! debug_validate_cells { + ($page_contents:expr, $usable_space:expr) => { + #[cfg(debug_assertions)] + { + debug_validate_cells_core($page_contents, $usable_space); + } + }; +} + /// State machine of destroy operations /// Keep track of traversal so that it can be resumed when IO is encountered #[derive(Debug, Clone)] @@ -1292,6 +1302,7 @@ impl BTreeCursor { let current_sibling = sibling_pointer; for i in (0..=current_sibling).rev() { let page = self.pager.read_page(pgno as usize)?; + debug_validate_cells!(&page.get_contents(), self.usable_space() as u16); pages_to_balance.push(page); assert_eq!( parent_contents.overflow_cells.len(), @@ -1327,6 +1338,16 @@ impl BTreeCursor { } // Reverse in order to keep the right order pages_to_balance.reverse(); + + #[cfg(debug_assertions)] + { + let page_type_of_siblings = pages_to_balance[0].get_contents().page_type(); + for page in &pages_to_balance { + let contents = page.get_contents(); + debug_validate_cells!(&contents, self.usable_space() as u16); + assert_eq!(contents.page_type(), page_type_of_siblings); + } + } self.state .write_info() .unwrap() @@ -1371,6 +1392,7 @@ impl BTreeCursor { sibling_page.set_dirty(); self.pager.add_dirty(sibling_page.get().id); max_cells += sibling_contents.cell_count(); + max_cells += sibling_contents.overflow_cells.len(); if i == 0 { // we don't have left sibling from this one so we break break; @@ -1391,6 +1413,7 @@ impl BTreeCursor { ); let buf = parent_contents.as_ptr(); let cell_buf = &buf[cell_start..cell_start + cell_len]; + max_cells += 1; // TODO(pere): make this reference and not copy balance_info.divider_cells.push(cell_buf.to_vec()); @@ -1413,6 +1436,7 @@ impl BTreeCursor { cells: Vec::with_capacity(max_cells), number_of_cells_per_page: Vec::new(), }; + let cells_capacity_start = cell_array.cells.capacity(); let mut total_cells_inserted = 0; // count_cells_in_old_pages is the prefix sum of cells of each page @@ -1423,6 +1447,7 @@ impl BTreeCursor { let leaf = matches!(page_type, PageType::TableLeaf | PageType::IndexLeaf); for (i, old_page) in balance_info.pages_to_balance.iter().enumerate() { let old_page_contents = old_page.get_contents(); + debug_validate_cells!(&old_page_contents, self.usable_space() as u16); for cell_idx in 0..old_page_contents.cell_count() { let (cell_start, cell_len) = old_page_contents.cell_get_raw_region( cell_idx, @@ -1471,6 +1496,21 @@ impl BTreeCursor { total_cells_inserted += cells_inserted; } + assert_eq!( + cell_array.cells.capacity(), + cells_capacity_start, + "calculation of max cells was wrong" + ); + #[cfg(debug_assertions)] + { + for cell in &cell_array.cells { + assert!(cell.len() >= 4); + + if leaf_data { + assert!(cell[0] != 0, "payload is {:?}", cell); + } + } + } // calculate how many pages to allocate let mut new_page_sizes = Vec::new(); let mut k = 0; @@ -1685,6 +1725,7 @@ impl BTreeCursor { .write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, right_pointer); } // TODO: pointer map update (vacuum support) + // Update divider cells in parent for (i, page) in pages_to_balance_new .iter() .enumerate() @@ -1769,6 +1810,7 @@ impl BTreeCursor { &cell_array, self.usable_space() as u16, )?; + debug_validate_cells!(page, self.usable_space() as u16); tracing::trace!( "edit_page page={} cells={}", pages_to_balance_new[page_idx].get().id, @@ -1813,9 +1855,10 @@ impl BTreeCursor { let child = self.pager.do_allocate_page(root_contents.page_type(), 0); tracing::debug!( - "Balancing root. root={}, rightmost={}", + "balance_root(root={}, rightmost={}, page_type={:?})", root.get().id, - child.get().id + child.get().id, + root.get_contents().page_type() ); self.pager.add_dirty(root.get().id); @@ -2955,6 +2998,7 @@ fn edit_page( let end_new_cells = start_new_cells + number_new_cells; let mut count_cells = page.cell_count(); if start_old_cells < start_new_cells { + debug_validate_cells!(page, usable_space); let number_to_shift = page_free_array( page, start_old_cells, @@ -2970,8 +3014,10 @@ fn edit_page( start, ); count_cells -= number_to_shift; + debug_validate_cells!(page, usable_space); } if end_new_cells < end_old_cells { + debug_validate_cells!(page, usable_space); let number_tail_removed = page_free_array( page, end_new_cells, @@ -2981,6 +3027,7 @@ fn edit_page( )?; assert!(count_cells >= number_tail_removed); count_cells -= number_tail_removed; + debug_validate_cells!(page, usable_space); } // TODO: make page_free_array defragment, for now I'm lazy so this will work for now. defragment_page(page, usable_space); @@ -2991,6 +3038,7 @@ fn edit_page( count_cells += count; } // TODO: overflow cells + debug_validate_cells!(page, usable_space); for i in 0..page.overflow_cells.len() { let overflow_cell = &page.overflow_cells[i]; // cell index in context of new list of cells that should be in the page @@ -3009,6 +3057,7 @@ fn edit_page( } } } + debug_validate_cells!(page, usable_space); // TODO: append cells to end page_insert_array( page, @@ -3018,6 +3067,7 @@ fn edit_page( count_cells, usable_space, )?; + debug_validate_cells!(page, usable_space); // TODO: noverflow page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, number_new_cells as u16); Ok(()) @@ -3065,9 +3115,17 @@ fn page_insert_array( ) -> Result<()> { // TODO: implement faster algorithm, this is doing extra work that's not needed. // See pageInsertArray to understand faster way. - tracing::debug!("page_insert_array {}..{}", first, first + count); + tracing::debug!( + "page_insert_array(cell_array.cells={}..{}, cell_count={}, page_type={:?})", + first, + first + count, + page.cell_count(), + page.page_type() + ); for i in first..first + count { + debug_validate_cells!(page, usable_space); insert_into_cell(page, cell_array.cells[i], start_insert, usable_space)?; + debug_validate_cells!(page, usable_space); start_insert += 1; } Ok(()) @@ -3175,6 +3233,7 @@ fn free_cell_range( /// Defragment a page. This means packing all the cells to the end of the page. fn defragment_page(page: &PageContent, usable_space: u16) { + debug_validate_cells!(page, usable_space); tracing::debug!("defragment_page"); let cloned_page = page.clone(); // TODO(pere): usable space should include offset probably @@ -3231,6 +3290,25 @@ fn defragment_page(page: &PageContent, usable_space: u16) { // set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0); + debug_validate_cells!(page, usable_space); +} + +#[cfg(debug_assertions)] +/// Only enabled in debug mode, where we ensure that all cells are valid. +fn debug_validate_cells_core(page: &PageContent, usable_space: u16) { + for i in 0..page.cell_count() { + // println!("Debug function: i={}", i); + let (offset, size) = page.cell_get_raw_region( + i, + payload_overflow_threshold_max(page.page_type(), usable_space), + payload_overflow_threshold_min(page.page_type(), usable_space), + usable_space as usize, + ); + if page.is_leaf() { + assert!(page.as_ptr()[offset] != 0); + } + assert!(size >= 4, "cell size should be at least 4 bytes idx={}", i); + } } /// Insert a record into a cell. @@ -3263,7 +3341,11 @@ fn insert_into_cell( } let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space)?; - tracing::debug!("insert_into_cell pc={}", new_cell_data_pointer); + tracing::trace!( + "insert_into_cell(idx={}, pc={})", + cell_idx, + new_cell_data_pointer + ); let buf = page.as_ptr(); // copy data @@ -3288,6 +3370,7 @@ fn insert_into_cell( // update cell count let new_n_cells = (page.cell_count() + 1) as u16; page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells); + debug_validate_cells!(page, usable_space); Ok(()) } @@ -3551,6 +3634,7 @@ fn payload_overflow_threshold_min(_page_type: PageType, usable_space: u16) -> us /// Drop a cell from a page. /// This is done by freeing the range of bytes that the cell occupies. fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Result<()> { + debug_validate_cells!(page, usable_space); let (cell_start, cell_len) = page.cell_get_raw_region( cell_idx, payload_overflow_threshold_max(page.page_type(), usable_space), @@ -3566,6 +3650,7 @@ fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Resu page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0); } page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1); + debug_validate_cells!(page, usable_space); Ok(()) } @@ -3761,6 +3846,8 @@ mod tests { let mut previous_key = None; let mut valid = true; let mut depth = None; + debug_validate_cells!(contents, pager.usable_space() as u16); + let mut child_pages = Vec::new(); for cell_idx in 0..contents.cell_count() { let cell = contents .cell_get( @@ -3775,6 +3862,7 @@ mod tests { BTreeCell::TableInteriorCell(TableInteriorCell { _left_child_page, .. }) => { + child_pages.push(pager.read_page(_left_child_page as usize).unwrap()); let (child_depth, child_valid) = validate_btree(pager.clone(), _left_child_page as usize); valid &= child_valid; @@ -3812,6 +3900,18 @@ mod tests { valid = false; } } + let first_page_type = child_pages.first().map(|p| p.get_contents().page_type()); + if let Some(child_type) = first_page_type { + for page in child_pages.iter().skip(1) { + if page.get_contents().page_type() != child_type { + tracing::error!("child pages have different types"); + valid = false; + } + } + } + if contents.rightmost_pointer().is_none() && contents.cell_count() == 0 { + valid = false; + } (depth.unwrap(), valid) } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index ff40c4ab7..01bae1478 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -223,6 +223,11 @@ impl Pager { pub fn do_allocate_page(&self, page_type: PageType, offset: usize) -> PageRef { let page = self.allocate_page().unwrap(); crate::btree_init_page(&page, page_type, offset, self.usable_space() as u16); + tracing::debug!( + "do_allocate_page(id={}, page_type={:?})", + page.get().id, + page.get_contents().page_type() + ); page } @@ -262,11 +267,11 @@ impl Pager { /// Reads a page from the database. pub fn read_page(&self, page_idx: usize) -> Result { - tracing::debug!("read_page(page_idx = {})", page_idx); + tracing::trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.write(); let page_key = PageCacheKey::new(page_idx, Some(self.wal.borrow().get_max_frame())); if let Some(page) = page_cache.get(&page_key) { - tracing::debug!("read_page(page_idx = {}) = cached", page_idx); + tracing::trace!("read_page(page_idx = {}) = cached", page_idx); return Ok(page.clone()); } let page = Arc::new(Page::new(page_idx));