diff --git a/core/storage/btree.rs b/core/storage/btree.rs index e29a9f8f7..befb43189 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1470,7 +1470,7 @@ impl BTreeCursor { "overflow parent not yet implemented" ); - // Get divider cells and max_cells + /* 1. Get divider cells and max_cells */ let mut max_cells = 0; let mut pages_to_balance_new = Vec::new(); for i in (0..balance_info.sibling_count).rev() { @@ -1528,6 +1528,7 @@ impl BTreeCursor { // Reverse divider cells to be in order balance_info.divider_cells.reverse(); + /* 2. Initialize CellArray with all the cells used for distribution, this includes divider cells if !leaf. */ let mut cell_array = CellArray { cells: Vec::with_capacity(max_cells), number_of_cells_per_page: Vec::new(), @@ -1564,10 +1565,6 @@ impl BTreeCursor { } // Insert overflow cells into correct place let offset = total_cells_inserted; - assert!( - old_page_contents.overflow_cells.len() <= 1, - "todo: check this works for more than one overflow cell" - ); for overflow_cell in old_page_contents.overflow_cells.iter_mut() { cell_array.cells.insert( offset + overflow_cell.index, @@ -1614,17 +1611,10 @@ impl BTreeCursor { } #[cfg(debug_assertions)] - { - for cell in &cell_array.cells { - assert!(cell.len() >= 4); + validate_cells_after_insertion(&cell_array, leaf_data); - if leaf_data { - assert!(cell[0] != 0, "payload is {:?}", cell); - } - } - } - // calculate how many pages to allocate - let mut new_page_sizes: Vec = Vec::new(); + /* 3. Initiliaze current size of every page including overflow cells and divider cells that might be included. */ + let mut new_page_sizes: Vec = Vec::new(); let leaf_correction = if leaf { 4 } else { 0 }; // number of bytes beyond header, different from global usableSapce which includes // header @@ -1637,27 +1627,37 @@ impl BTreeCursor { let page_contents = page.get_contents(); let free_space = compute_free_space(page_contents, self.usable_space() as u16); - new_page_sizes.push(usable_space as usize - free_space as usize); + new_page_sizes.push(usable_space as i64 - free_space as i64); for overflow in &page_contents.overflow_cells { let size = new_page_sizes.last_mut().unwrap(); // 2 to account of pointer - *size += 2 + overflow.payload.len() as usize; + *size += 2 + overflow.payload.len() as i64; } if !leaf && i < balance_info.sibling_count - 1 { // Account for divider cell which is included in this page. let size = new_page_sizes.last_mut().unwrap(); - *size += cell_array.cells[cell_array.cell_count(i)].len() as usize; + *size += cell_array.cells[cell_array.cell_count(i)].len() as i64; } } + /* 4. Now let's try to move cells to the left trying to stack them without exceeding the maximum size of a page. + There are two cases: + * If current page has too many cells, it will move them to the next page. + * If it still has space, and it can take a cell from the right it will take them. + Here there is a caveat. Taking a cell from the right might take cells from page i+1, i+2, i+3, so not necessarily + adjacent. But we decrease the size of the adjacent page if we move from the right. This might cause a intermitent state + where page can have size <0. + This will also calculate how many pages are required to balance the cells and store in sibling_count_new. + */ // Try to pack as many cells to the left let mut sibling_count_new = balance_info.sibling_count; let mut i = 0; while i < sibling_count_new { // First try to move cells to the right if they do not fit - while new_page_sizes[i] > usable_space as usize { + while new_page_sizes[i] > usable_space as i64 { let needs_new_page = i + 1 >= sibling_count_new; if needs_new_page { + // FIXME: this doesn't remove pages if not needed sibling_count_new += 1; new_page_sizes.push(0); cell_array @@ -1669,7 +1669,7 @@ impl BTreeCursor { ); } let size_of_cell_to_remove_from_left = - 2 + cell_array.cells[cell_array.cell_count(i) - 1].len() as usize; + 2 + cell_array.cells[cell_array.cell_count(i) - 1].len() as i64; new_page_sizes[i] -= size_of_cell_to_remove_from_left; let size_of_cell_to_move_right = if !leaf_data { if cell_array.number_of_cells_per_page[i] @@ -1677,23 +1677,23 @@ impl BTreeCursor { { // This means we move to the right page the divider cell and we // promote left cell to divider - 2 + cell_array.cells[cell_array.cell_count(i)].len() as usize + 2 + cell_array.cells[cell_array.cell_count(i)].len() as i64 } else { 0 } } else { size_of_cell_to_remove_from_left }; - new_page_sizes[i + 1] += size_of_cell_to_move_right as usize; + new_page_sizes[i + 1] += size_of_cell_to_move_right as i64; cell_array.number_of_cells_per_page[i] -= 1; } // Now try to take from the right if we didn't have enough while cell_array.number_of_cells_per_page[i] < cell_array.cells.len() as u16 { let size_of_cell_to_remove_from_right = - 2 + cell_array.cells[cell_array.cell_count(i)].len() as usize; + 2 + cell_array.cells[cell_array.cell_count(i)].len() as i64; let can_take = new_page_sizes[i] + size_of_cell_to_remove_from_right - > usable_space as usize; + > usable_space as i64; if can_take { break; } @@ -1704,7 +1704,7 @@ impl BTreeCursor { if cell_array.number_of_cells_per_page[i] < cell_array.cells.len() as u16 { - 2 + cell_array.cells[cell_array.cell_count(i)].len() as usize + 2 + cell_array.cells[cell_array.cell_count(i)].len() as i64 } else { 0 } @@ -1732,6 +1732,10 @@ impl BTreeCursor { cell_array.cells.len() ); + /* 5. Balance pages starting from a left stacked cell state and move them to right trying to maintain a balanced state + where we only move from left to right if it will not unbalance both pages, meaning moving left to right won't make + right page bigger than left page. + */ // Comment borrowed from SQLite src/btree.c // The packing computed by the previous block is biased toward the siblings // on the left side (siblings with smaller keys). The left siblings are @@ -1750,8 +1754,8 @@ impl BTreeCursor { // the same we add to right (we don't add divider to right). let mut cell_right = cell_left + 1 - leaf_data as u16; loop { - let cell_left_size = cell_array.cell_size(cell_left as usize) as usize; - let cell_right_size = cell_array.cell_size(cell_right as usize) as usize; + let cell_left_size = cell_array.cell_size(cell_left as usize) as i64; + let cell_right_size = cell_array.cell_size(cell_right as usize) as i64; // TODO: add assert nMaxCells let pointer_size = if i == sibling_count_new - 1 { 0 } else { 2 }; @@ -1840,6 +1844,7 @@ impl BTreeCursor { right_page_id ); + /* 6. Update parent pointers. Update right pointer and insert divider cells with newly created distribution of cells */ // Ensure right-child pointer of the right-most new sibling pge points to the page // that was originally on that place. let is_leaf_page = matches!(page_type, PageType::TableLeaf | PageType::IndexLeaf); @@ -1921,41 +1926,12 @@ impl BTreeCursor { ) .unwrap(); #[cfg(debug_assertions)] - { - let left_pointer = if parent_contents.overflow_cells.len() == 0 { - let (cell_start, cell_len) = parent_contents.cell_get_raw_region( - balance_info.first_divider_cell + i, - payload_overflow_threshold_max( - parent_contents.page_type(), - self.usable_space() as u16, - ), - payload_overflow_threshold_min( - parent_contents.page_type(), - self.usable_space() as u16, - ), - self.usable_space(), - ); - tracing::debug!( - "balance_non_root(cell_start={}, cell_len={})", - cell_start, - cell_len - ); - - let left_pointer = read_u32( - &parent_contents.as_ptr()[cell_start..cell_start + cell_len], - 0, - ); - left_pointer - } else { - let cell = &parent_contents.overflow_cells[0]; - assert_eq!(cell.index, balance_info.first_divider_cell + i); - read_u32(&cell.payload, 0) - }; - assert_eq!(left_pointer, page.get().id as u32, "the cell we just inserted doesn't point to the correct page. points to {}, should point to {}", - left_pointer, - page.get().id as u32 - ); - } + self.validate_balance_non_root_divider_cell_insertion( + balance_info, + parent_contents, + i, + page, + ); } tracing::debug!( "balance_non_root(parent_overflow={})", @@ -1964,6 +1940,7 @@ impl BTreeCursor { #[cfg(debug_assertions)] { + // Let's ensure every page is pointed to by the divider cell or the rightmost pointer. for page in &pages_to_balance_new { assert!( pages_pointed_to.contains(&(page.get().id as u32)), @@ -1972,7 +1949,29 @@ impl BTreeCursor { ); } } - // TODO: update pages + /* 7. Start real movement of cells. Next comment is borrowed from SQLite: */ + /* Now update the actual sibling pages. The order in which they are updated + ** is important, as this code needs to avoid disrupting any page from which + ** cells may still to be read. In practice, this means: + ** + ** (1) If cells are moving left (from apNew[iPg] to apNew[iPg-1]) + ** then it is not safe to update page apNew[iPg] until after + ** the left-hand sibling apNew[iPg-1] has been updated. + ** + ** (2) If cells are moving right (from apNew[iPg] to apNew[iPg+1]) + ** then it is not safe to update page apNew[iPg] until after + ** the right-hand sibling apNew[iPg+1] has been updated. + ** + ** If neither of the above apply, the page is safe to update. + ** + ** The iPg value in the following loop starts at nNew-1 goes down + ** to 0, then back up to nNew-1 again, thus making two passes over + ** the pages. On the initial downward pass, only condition (1) above + ** needs to be tested because (2) will always be true from the previous + ** step. On the upward pass, both conditions are always true, so the + ** upwards pass simply processes pages that were missed on the downward + ** pass. + */ let mut done = vec![false; sibling_count_new]; for i in (1 - sibling_count_new as i64)..sibling_count_new as i64 { let page_idx = i.unsigned_abs() as usize; @@ -2053,6 +2052,53 @@ impl BTreeCursor { result } + #[cfg(debug_assertions)] + fn validate_balance_non_root_divider_cell_insertion( + &self, + balance_info: &mut BalanceInfo, + parent_contents: &mut PageContent, + i: usize, + page: &std::sync::Arc, + ) { + let left_pointer = if parent_contents.overflow_cells.len() == 0 { + let (cell_start, cell_len) = parent_contents.cell_get_raw_region( + balance_info.first_divider_cell + i, + payload_overflow_threshold_max( + parent_contents.page_type(), + self.usable_space() as u16, + ), + payload_overflow_threshold_min( + parent_contents.page_type(), + self.usable_space() as u16, + ), + self.usable_space(), + ); + tracing::debug!( + "balance_non_root(cell_start={}, cell_len={})", + cell_start, + cell_len + ); + + let left_pointer = read_u32( + &parent_contents.as_ptr()[cell_start..cell_start + cell_len], + 0, + ); + left_pointer + } else { + let mut left_pointer = None; + for cell in parent_contents.overflow_cells.iter() { + if cell.index == balance_info.first_divider_cell + i { + left_pointer = Some(read_u32(&cell.payload, 0)) + } + } + left_pointer.expect("overflow cell with divider cell was not found") + }; + assert_eq!(left_pointer, page.get().id as u32, "the cell we just inserted doesn't point to the correct page. points to {}, should point to {}", + left_pointer, + page.get().id as u32 + ); + } + #[cfg(debug_assertions)] fn post_balance_non_root_validation( &self, @@ -3419,6 +3465,17 @@ impl BTreeCursor { } } +#[cfg(debug_assertions)] +fn validate_cells_after_insertion(cell_array: &CellArray, leaf_data: bool) { + for cell in &cell_array.cells { + assert!(cell.len() >= 4); + + if leaf_data { + assert!(cell[0] != 0, "payload is {:?}", cell); + } + } +} + impl PageStack { fn increment_current(&self) { self.current_page.set(self.current_page.get() + 1); @@ -3944,7 +4001,7 @@ fn insert_into_cell( usable_space: u16, ) -> Result<()> { assert!( - cell_idx <= page.cell_count(), + cell_idx <= page.cell_count() + page.overflow_cells.len(), "attempting to add cell to an incorrect place cell_idx={} cell_count={}", cell_idx, page.cell_count()