btree: modified the btree_destroy subroutine

modified the btree_destroy subroutine to do an iterative DFS and use the stack cell counters to keep track of whether children have been removed. adds a unit test.
This commit is contained in:
Zaid Humayun
2025-02-09 18:34:17 +05:30
parent dc2bb7cb9b
commit e0105398a6

View File

@@ -2236,6 +2236,20 @@ impl BTreeCursor {
Ok(Some(n_overflow))
}
/// Destroys a B-tree by freeing all its pages in an iterative depth-first order.
/// This ensures child pages are freed before their parents
///
/// # Example
/// For a B-tree with this structure:
/// ```text
/// 1 (root)
/// / \
/// 2 3
/// / \ / \
/// 4 5 6 7
/// ```
///
/// The destruction order would be: [4,5,2,6,7,3,1]
pub fn btree_destroy(&mut self) -> Result<CursorResult<()>> {
self.move_to_root();
@@ -2249,89 +2263,54 @@ impl BTreeCursor {
}
let contents = page.get().contents.as_ref().unwrap();
// TOOD: Uncomment after Krishvishal's PR https://github.com/tursodatabase/limbo/pull/785 merged
// let current_page_id = page.get().id;
let current_page_id = page.get().id;
// if this node is an interior cell, process all it's children first
if !contents.is_leaf() {
let mut has_unprocessed_children = false;
// Process all the regular cells first
for cell_idx in 0..contents.cell_count() {
let cell_idx = self.stack.current_cell_index();
// process interior nodes first
if cell_idx < contents.cell_count() as i32 {
let cell = contents.cell_get(
cell_idx,
cell_idx as usize,
Rc::clone(&self.pager),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
self.usable_space(),
)?;
// load the actual page this cell points to
if let BTreeCell::TableInteriorCell(interior) = cell {
let child_page =
self.pager.read_page(interior._left_child_page as usize)?;
self.stack.advance();
self.stack.push(child_page);
has_unprocessed_children = true;
break;
continue;
}
}
if !has_unprocessed_children {
// process the right child
} else if cell_idx == contents.cell_count() as i32 {
if let Some(rightmost) = contents.rightmost_pointer() {
let rightmost_page = self.pager.read_page(rightmost as usize)?;
self.stack.advance();
self.stack.push(rightmost_page);
continue;
}
}
if has_unprocessed_children {
continue;
}
} else {
for cell_idx in 0..contents.cell_count() {
// if the node is a leaf node, clear overflow pages
let cell_idx = self.stack.current_cell_index();
for i in cell_idx..contents.cell_count() as i32 {
let cell = contents.cell_get(
cell_idx,
i as usize,
Rc::clone(&self.pager),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
self.usable_space(),
)?;
if let BTreeCell::TableLeafCell(TableLeafCell {
_rowid,
_payload,
first_overflow_page: Some(overflow_page_id),
}) = cell
{
let mut current_overflow_id = overflow_page_id;
loop {
let overflow_page =
self.pager.read_page(current_overflow_id as usize)?;
return_if_locked!(overflow_page);
if !overflow_page.is_loaded() {
self.pager.load_page(Arc::clone(&overflow_page))?;
return Ok(CursorResult::IO);
}
let overflow_contents = overflow_page.get().contents.as_ref().unwrap();
let next_overflow_id = u32::from_be_bytes(
overflow_contents.as_ptr()[..4].try_into().unwrap(),
);
// TODO: Uncomment after Krishvishal's PR https://github.com/tursodatabase/limbo/pull/785 is merged
// self.pager
// .free_page(Some(overflow_page), current_overflow_id as usize)?;
if next_overflow_id == 0 {
break;
}
current_overflow_id = next_overflow_id;
}
}
return_if_io!(self.clear_overflow_pages(&cell));
self.stack.advance();
}
}
// All children & overflow pages have been processed
// TODO: Uncomment after Krishvishal's PR https://github.com/tursodatabase/limbo/pull/785 is merged
// self.pager.free_page(Some(page), current_page_id)?;
self.pager.free_page(Some(page), current_page_id)?;
if self.stack.has_parent() {
self.stack.pop();
} else {
@@ -2986,4 +2965,58 @@ mod tests {
Ok(())
}
#[test]
fn test_btree_destroy() -> Result<()> {
let initial_size = 3;
let (pager, db_header) = setup_test_env(initial_size);
let mut cursor = BTreeCursor::new(pager.clone(), 2);
assert_eq!(
db_header.borrow().database_size,
initial_size,
"Database should initially have 3 pages"
);
// Initialize page 2 as a leaf page
let root_page = cursor.pager.read_page(2)?;
{
let contents = root_page.get().contents.as_mut().unwrap();
contents.write_u8(0, PageType::TableLeaf as u8); // Set page type
contents.write_u16(1, 0); // First freeblock
contents.write_u16(3, 0); // Number of cells
contents.write_u16(5, contents.buffer.borrow().len() as u16); // Cell content area
contents.write_u8(7, 0); // Fragment bytes
}
// Insert records until we force a split
for i in 0..100 {
let record = Record::new(vec![OwnedValue::Integer(i)]);
cursor.insert(&OwnedValue::Integer(i), &record, false)?;
}
// Verify split occurred
assert_eq!(
db_header.borrow().database_size,
initial_size + 2,
"Split should add 2 pages"
);
// Track freelist state before destruction
let initial_free_pages = db_header.borrow().freelist_pages;
assert_eq!(initial_free_pages, 0, "Should start with no free pages");
// Destroy the btree
match cursor.btree_destroy()? {
CursorResult::Ok(_) => {
let pages_freed = db_header.borrow().freelist_pages - initial_free_pages;
// We expect 3 pages to be freed: root became interior + 2 leaf pages
assert_eq!(pages_freed, 3, "Should free 3 pages (root + 2 leaves)");
}
CursorResult::IO => {
cursor.pager.io.run_once()?;
}
}
Ok(())
}
}