diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 362b440c2..e55ba8490 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -304,7 +304,7 @@ impl BTreeCursor { let mem_page_rc = self.stack.top(); let cell_idx = self.stack.current_cell_index() as usize; - debug!("current id={} cell={}", mem_page_rc.get().id, cell_idx); + // debug!("current id={} cell={}", mem_page_rc.get().id, cell_idx); return_if_locked!(mem_page_rc); if !mem_page_rc.is_loaded() { self.pager.load_page(mem_page_rc.clone())?; @@ -870,11 +870,18 @@ impl BTreeCursor { matches!(self.state, CursorState::Write(_)), "Cursor must be in balancing state" ); - let state = self.state.write_info().expect("must be balancing").state; + let state = self + .state + .write_info() + .expect("must be balancing") + .state + .clone(); + tracing::trace!("balance_non_root(state={:?})", state); let (next_write_state, result) = match state { WriteState::Start => todo!(), WriteState::BalanceStart => todo!(), WriteState::BalanceNonRoot => { + let write_info = self.state.write_info().unwrap(); let parent_page = self.stack.top(); if parent_page.is_locked() { return Ok(CursorResult::IO); @@ -884,6 +891,8 @@ impl BTreeCursor { self.pager.load_page(parent_page.clone())?; return Ok(CursorResult::IO); } + parent_page.set_dirty(); + self.pager.add_dirty(parent_page.get().id); let parent_contents = parent_page.get().contents.as_ref().unwrap(); let page_to_balance_idx = self.stack.current_cell_index() as usize; @@ -893,9 +902,9 @@ impl BTreeCursor { page_to_balance_idx ); // Part 1: Find the sibling pages to balance - self.write_info.new_pages.borrow_mut().clear(); - self.write_info.pages_to_balance.borrow_mut().clear(); - self.write_info.divider_cells.borrow_mut().clear(); + write_info.new_pages.borrow_mut().clear(); + write_info.pages_to_balance.borrow_mut().clear(); + write_info.divider_cells.borrow_mut().clear(); let number_of_cells_in_parent = parent_contents.cell_count() + parent_contents.overflow_cells.len(); @@ -922,10 +931,8 @@ impl BTreeCursor { }; (2, next_divider) }; - self.write_info.sibling_count.replace(sibling_pointer + 1); - self.write_info - .first_divider_cell - .replace(first_cell_divider); + write_info.sibling_count.replace(sibling_pointer + 1); + write_info.first_divider_cell.replace(first_cell_divider); let last_sibling_is_right_pointer = sibling_pointer + first_cell_divider - parent_contents.overflow_cells.len() @@ -942,19 +949,21 @@ impl BTreeCursor { // load sibling pages // start loading right page first - let mut pgno: u32 = unsafe { right_pointer.cast::().read() }; + let mut pgno: u32 = unsafe { right_pointer.cast::().read().swap_bytes() }; + dbg!(pgno); let mut current_sibling = sibling_pointer; for i in (0..=current_sibling).rev() { let page = self.pager.read_page(pgno as usize)?; - page.set_dirty(); - self.pager.add_dirty(page.get().id); - self.write_info.pages_to_balance.borrow_mut().push(page); + write_info.pages_to_balance.borrow_mut().push(page); assert_eq!( parent_contents.overflow_cells.len(), 0, "overflow in parent is not yet implented while balancing it" ); - let next_cell_divider = i + first_cell_divider; + if i == 0 { + break; + } + let next_cell_divider = i + first_cell_divider - 1; pgno = match parent_contents.cell_get( next_cell_divider, self.pager.clone(), @@ -989,7 +998,7 @@ impl BTreeCursor { .pages_to_balance .borrow() .iter() - .all(|page| page.is_locked()); + .all(|page| !page.is_locked()); if !all_loaded { return Ok(CursorResult::IO); } @@ -1010,13 +1019,15 @@ impl BTreeCursor { for i in (0..sibling_count).rev() { let sibling_page = &pages_to_balance[i]; let sibling_contents = sibling_page.get_contents(); + sibling_page.set_dirty(); + self.pager.add_dirty(sibling_page.get().id); max_cells += sibling_contents.cell_count(); if i == 0 { // we don't have left sibling from this one so we break break; } // Since we know we have a left sibling, take the divider that points to left sibling of this page - let cell_idx = first_divider_cell - i - 1; + let cell_idx = first_divider_cell + i - 1; let (cell_start, cell_len) = parent_contents.cell_get_raw_region( cell_idx, self.payload_overflow_threshold_max(parent_contents.page_type()), @@ -1073,9 +1084,8 @@ impl BTreeCursor { } // Insert overflow cells into correct place let mut offset = total_cells_inserted; - assert_eq!( - old_page_contents.overflow_cells.len(), - 1, + assert!( + old_page_contents.overflow_cells.len() <= 1, "todo: check this works for more than one overflow cell" ); for overflow_cell in old_page_contents.overflow_cells.iter_mut() { @@ -1109,6 +1119,8 @@ impl BTreeCursor { let mut new_page_sizes = Vec::new(); let mut k = 0; // todo: add leaf correction + // number of bytes beyond header, differnet from global usableSapce which inccludes + // header let usable_space = self.usable_space() - 12; for i in 0..sibling_count { cell_array @@ -1136,15 +1148,28 @@ impl BTreeCursor { } k += 1; } + + for n in cell_array.number_of_cells_per_page.iter().enumerate() { + println!("init count page={}, n={}", n.0, n.1); + } // Try to pack as many cells to the left let mut sibling_count_new = sibling_count; let mut i = 0; while i < sibling_count_new { + for n in cell_array.number_of_cells_per_page.iter().enumerate() { + println!("start count i={} page={}, n={}", i, n.0, n.1); + } // First try to move cells to the right if they do not fit while new_page_sizes[i] > usable_space as u16 { - let needs_new_page = i + 2 >= sibling_count_new; + println!("moving right {}", i); + let needs_new_page = i + 1 >= sibling_count_new; if needs_new_page { sibling_count_new += 1; + println!("adding new page"); + new_page_sizes.push(0); + cell_array + .number_of_cells_per_page + .push(cell_array.cells.len() as u16); assert!( sibling_count_new <= 5, "it is corrupt to require more than 5 pages to balance 3 siblings" @@ -1166,12 +1191,13 @@ impl BTreeCursor { } else { size_of_cell_to_remove_from_left }; - new_page_sizes[i + 1] -= size_of_cell_to_move_right; + new_page_sizes[i + 1] += size_of_cell_to_move_right; cell_array.number_of_cells_per_page[i] -= 1; } // Now try to take from the right if we didn't have enough while cell_array.number_of_cells_per_page[i] < cell_array.cells.len() as u16 { + println!("moving left {}", i); let size_of_cell_to_remove_from_right = 2 + cell_array.cells[cell_array.cell_count(i)].len() as u16; let can_take = new_page_sizes[i] + size_of_cell_to_remove_from_right @@ -1194,15 +1220,22 @@ impl BTreeCursor { size_of_cell_to_remove_from_right }; - new_page_sizes[i + 1] += size_of_cell_to_remove_from_right; + new_page_sizes[i + 1] -= size_of_cell_to_remove_from_right; } let we_still_need_another_page = cell_array.number_of_cells_per_page[i] >= cell_array.cells.len() as u16; if we_still_need_another_page { - sibling_count_new += 1; + dbg!("we need"); + sibling_count_new = i + 1; } i += 1; + if i >= sibling_count_new { + break; + } + } + for n in cell_array.number_of_cells_per_page.iter().enumerate() { + println!("start count page={}, n={}", n.0, n.1); } // Comment borrowed from SQLite src/btree.c @@ -1222,6 +1255,7 @@ impl BTreeCursor { // if leaf_data means we don't have divider, so the one we move from left is // the same we add to right (we don't add divider to right). let mut cell_right = cell_left + 1 - leaf_data as u16; + log::trace!("start cell_left={}", cell_left); loop { let cell_left_size = cell_array.cell_size(cell_left as usize); let cell_right_size = cell_array.cell_size(cell_right as usize); @@ -1244,9 +1278,13 @@ impl BTreeCursor { cell_left -= 1; cell_right -= 1; } + tracing::trace!("end cell_left={}", cell_left); new_page_sizes[i] = size_right_page; new_page_sizes[i - 1] = size_left_page; + for n in cell_array.number_of_cells_per_page.iter().enumerate() { + println!("new count page={}, n={}", n.0, n.1); + } assert!( cell_array.number_of_cells_per_page[i - 1] > if i > 1 { @@ -1281,10 +1319,15 @@ impl BTreeCursor { } } + // Write right pointer in parent page to point to new rightmost page + parent_contents.write_u32( + PAGE_HEADER_OFFSET_RIGHTMOST_PTR, + pages_to_balance_new.last().unwrap().get().id as u32, + ); + // Ensure right-child pointer of the right-most new sibling pge points to the page // that was originally on that place. - let is_leaf_page = - matches!(page_type, PageType::TableInterior | PageType::IndexInterior); + let is_leaf_page = matches!(page_type, PageType::TableLeaf | PageType::IndexLeaf); if !is_leaf_page { let last_page = pages_to_balance.last().unwrap(); let right_pointer = last_page.get_contents().rightmost_pointer().unwrap(); @@ -1301,25 +1344,29 @@ impl BTreeCursor { let divider_cell_idx = cell_array.cell_count(i); let divider_cell = &mut cell_array.cells[divider_cell_idx]; let page = &pages_to_balance_new[i]; + // FIXME: dont use auxiliary space, could be done without allocations + let mut new_divider_cell = Vec::new(); if !is_leaf_page { // Interior page.get_contents() .write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, page.get().id as u32); + new_divider_cell.extend_from_slice(divider_cell); } else if leaf_data { // Leaf table // FIXME: not needed conversion // FIXME: need to update cell size in order to free correctly? // insert into cell with correct range should be enough - let rowid = read_u32(divider_cell, 4); - divider_cell[0..4].copy_from_slice(&(page.get().id as u32).to_be_bytes()); - divider_cell[4..8].copy_from_slice(&rowid.to_be_bytes()); + let (_, n_bytes_payload) = read_varint(divider_cell)?; + let (rowid, _) = read_varint(÷r_cell[n_bytes_payload..])?; + new_divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes()); + write_varint_to_vec(rowid, &mut new_divider_cell); } else { // Leaf index - divider_cell[0..4].copy_from_slice(&(page.get().id as u32).to_be_bytes()); + new_divider_cell.extend_from_slice(divider_cell); } insert_into_cell( parent_contents, - ÷r_cell, + &new_divider_cell, first_divider_cell + i, self.usable_space() as u16, ); @@ -1346,10 +1393,12 @@ impl BTreeCursor { } else { cell_array.cells.len() }; + let start_new_cells = + cell_array.cell_count(page_idx - 1) + (!leaf_data) as usize; ( start_old_cells, - cell_array.cell_count(page_idx - 1) + (!leaf_data) as usize, - cell_array.cell_count(0), + start_new_cells, + cell_array.cell_count(page_idx) - start_new_cells, ) }; let page = pages_to_balance_new[page_idx].get_contents(); @@ -1359,7 +1408,12 @@ impl BTreeCursor { start_new_cells, number_new_cells, &cell_array, - usable_space as u16, + self.usable_space() as u16, + ); + tracing::trace!( + "edit_page page={} cells={}", + pages_to_balance_new[page_idx].get().id, + page.cell_count() ); page.overflow_cells.clear(); @@ -1369,7 +1423,8 @@ impl BTreeCursor { // TODO: balance root self.stack.pop(); // TODO: free pages - return Ok(CursorResult::IO); + write_info.state = WriteState::Finish; + return Ok(CursorResult::Ok(())); } WriteState::Finish => todo!(), }; @@ -2014,11 +2069,11 @@ impl PageStack { /// Push a new page onto the stack. /// This effectively means traversing to a child page. fn push(&self, page: PageRef) { - debug!( - "pagestack::push(current={}, new_page_id={})", - self.current_page.borrow(), - page.get().id - ); + // debug!( + // "pagestack::push(current={}, new_page_id={})", + // self.current_page.borrow(), + // page.get().id + // ); *self.current_page.borrow_mut() += 1; let current = *self.current_page.borrow(); assert!( @@ -2033,7 +2088,7 @@ impl PageStack { /// This effectively means traversing back up to a parent page. fn pop(&self) { let current = *self.current_page.borrow(); - debug!("pagestack::pop(current={})", current); + // debug!("pagestack::pop(current={})", current); self.cell_indices.borrow_mut()[current as usize] = 0; self.stack.borrow_mut()[current as usize] = None; *self.current_page.borrow_mut() -= 1; @@ -2047,11 +2102,11 @@ impl PageStack { .as_ref() .unwrap() .clone(); - debug!( - "pagestack::top(current={}, page_id={})", - current, - page.get().id - ); + // debug!( + // "pagestack::top(current={}, page_id={})", + // current, + // page.get().id + // ); page } @@ -2160,6 +2215,7 @@ pub fn btree_init_page( contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0); let cell_content_area_start = db_header.page_size - db_header.reserved_space as u16; + contents.write_u16( PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, cell_content_area_start, @@ -2200,7 +2256,6 @@ pub fn edit_page( start, ); count_cells -= number_to_shift; - // TODO: shift } if end_new_cells < end_old_cells { let number_tail_removed = page_free_array( @@ -2248,7 +2303,7 @@ pub fn edit_page( usable_space, ); // TODO: noverflow - page.write_u32(PAGE_HEADER_OFFSET_CELL_COUNT, count_cells as u32); + page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, number_new_cells as u16); } pub fn page_free_array( @@ -2269,12 +2324,15 @@ pub fn page_free_array( // check if not overflow cell if cell_pointer.start >= buf_range.start && cell_pointer.start < buf_range.end { assert!( - cell_pointer.end >= buf_range.start && cell_pointer.end < buf_range.end, + cell_pointer.end >= buf_range.start && cell_pointer.end <= buf_range.end, "whole cell should be inside the page" ); + // TODO: remove pointer too let offset = (cell_pointer.start as usize - buf_range.start as usize) as u16; - let len = (cell_pointer.end as usize - buf_range.start as usize) as u16; + let len = (cell_pointer.end as usize - cell_pointer.start as usize) as u16; + println!("removing {}~{}", offset, len); free_cell_range(page, offset, len, usable_space); + page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1); number_of_cells_removed += 1; } } @@ -2288,6 +2346,7 @@ pub fn page_insert_array( mut start_insert: usize, usable_space: u16, ) { + dbg!(count); // TODO: implement faster algorithm, this is doing extra work that's not needed. // See pageInsertArray to understand faster way. for i in first..first + count { @@ -2439,6 +2498,7 @@ fn defragment_page(page: &PageContent, usable_space: u16) { // return SQLITE_CORRUPT_PAGE(pPage); // } assert!(cbrk >= first_cell); + dbg!(cbrk, first_cell); let write_buf = page.as_ptr(); // set new first byte of cell content @@ -2446,8 +2506,7 @@ fn defragment_page(page: &PageContent, usable_space: u16) { // set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); // set unused space to 0 - let first_cell = cloned_page.cell_content_area() as u64; - assert!(first_cell <= cbrk); + dbg!(cbrk, first_cell); write_buf[first_cell as usize..cbrk as usize].fill(0); } @@ -2529,7 +2588,14 @@ fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 { // #2. fragments (isolated 1-3 byte chunks of free space within the cell content area) // #3. freeblocks (linked list of blocks of at least 4 bytes within the cell content area that are not in use due to e.g. deletions) - let mut free_space_bytes = page.unallocated_region_size() + page.num_frag_free_bytes() as usize; + let pointer_size = if matches!(page.page_type(), PageType::TableLeaf | PageType::IndexLeaf) { + 0 + } else { + 4 + }; + let first_cell = page.offset + 8 + pointer_size + (2 * page.cell_count()); + let mut free_space_bytes = + cell_content_area_start as usize + page.num_frag_free_bytes() as usize; // #3 is computed by iterating over the freeblocks linked list let mut cur_freeblock_ptr = page.first_freeblock() as usize; @@ -2585,7 +2651,7 @@ fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 { // return SQLITE_CORRUPT_PAGE(pPage); // } - free_space_bytes as u16 + free_space_bytes as u16 - first_cell as u16 } /// Allocate space for a cell on a page.