diff --git a/core/lib.rs b/core/lib.rs index 634823807..4f5f11796 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -227,8 +227,8 @@ pub fn maybe_init_database_file(file: &Rc<dyn DatabaseStorage>, io: &Arc<dyn IO>) -> Result<()> btree_init_page( &page1, storage::sqlite3_ondisk::PageType::TableLeaf, - &db_header, DATABASE_HEADER_SIZE, + db_header.page_size - db_header.reserved_space as u16, ); let contents = page1.get().contents.as_mut().unwrap(); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 40d9c59d8..ebe83174e 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -2,8 +2,7 @@ use tracing::debug; use crate::storage::pager::Pager; use crate::storage::sqlite3_ondisk::{ - read_btree_cell, read_varint, BTreeCell, DatabaseHeader, PageContent, PageType, - TableInteriorCell, TableLeafCell, + read_varint, BTreeCell, PageContent, PageType, TableInteriorCell, TableLeafCell, }; use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp}; @@ -81,35 +80,40 @@ enum WriteState { Start, BalanceStart, BalanceNonRoot, - BalanceGetParentPage, - BalanceMoveUp, + BalanceNonRootWaitLoadPages, Finish, } struct WriteInfo { /// State of the write operation state machine. state: WriteState, - /// Pages involved in the split of the page due to balancing (split_pages[0] is the balancing page, while the others are freshly allocated pages) - split_pages: RefCell<Vec<PageRef>>, - /// Number of cells from the balancing page for every split page - split_pages_cells_count: RefCell<Vec<usize>>, + /// Old pages being balanced. + pages_to_balance: RefCell<Vec<PageRef>>, + /// Pages allocated during the write operation due to balancing. + new_pages: RefCell<Vec<PageRef>>, /// Scratch space used during balancing. - scratch_cells: RefCell<Vec<&'static [u8]>>, + scratch_cells: RefCell<Vec<&'static mut [u8]>>, /// Bookkeeping of the rightmost pointer so the PAGE_HEADER_OFFSET_RIGHTMOST_PTR can be updated. - rightmost_pointer: RefCell<Option<u32>>, - /// Copy of the current page needed for buffer references.
- page_copy: RefCell<Option<PageContent>>, + rightmost_pointer: RefCell<Option<*mut u8>>, + /// Divider cells of old pages + divider_cells: RefCell<Vec<Vec<u8>>>, + /// Number of siblings being used to balance + sibling_count: RefCell<usize>, + /// First divider cell to remove that marks the first sibling + first_divider_cell: RefCell<usize>, } impl WriteInfo { fn new() -> WriteInfo { WriteInfo { state: WriteState::Start, - split_pages: RefCell::new(Vec::with_capacity(4)), - split_pages_cells_count: RefCell::new(Vec::with_capacity(4)), scratch_cells: RefCell::new(Vec::new()), rightmost_pointer: RefCell::new(None), - page_copy: RefCell::new(None), + pages_to_balance: RefCell::new(Vec::new()), + divider_cells: RefCell::new(Vec::new()), + sibling_count: RefCell::new(0), + first_divider_cell: RefCell::new(0), + new_pages: RefCell::new(Vec::new()), } } } @@ -173,6 +177,12 @@ struct PageStack { cell_indices: RefCell<[i32; BTCURSOR_MAX_DEPTH + 1]>, } +struct CellArray { + cells: Vec<&'static mut [u8]>, // TODO(pere): make these proper references + + number_of_cells_per_page: Vec<u16>, // number of cells in each page +} + impl BTreeCursor { pub fn new(pager: Rc<Pager>, root_page: usize) -> Self { Self { @@ -251,8 +261,8 @@ impl BTreeCursor { let cell = contents.cell_get( cell_idx, self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), + payload_overflow_threshold_max(contents.page_type(), self.usable_space() as u16), + payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16), self.usable_space(), )?; @@ -340,8 +350,8 @@ impl BTreeCursor { let cell = contents.cell_get( cell_idx, self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), + payload_overflow_threshold_max(contents.page_type(), self.usable_space() as u16), + payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16), self.usable_space(), )?; match &cell { @@ -461,8 +471,14 @@ impl BTreeCursor { let cell = contents.cell_get( cell_idx, self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), + payload_overflow_threshold_max( contents.page_type(), self.usable_space() as u16, ), + payload_overflow_threshold_min( contents.page_type(), self.usable_space() as u16, ), self.usable_space(), )?; match &cell { @@ -621,8 +637,14 @@ impl BTreeCursor { match &contents.cell_get( cell_idx, self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), + payload_overflow_threshold_max( contents.page_type(), self.usable_space() as u16, ), + payload_overflow_threshold_min( contents.page_type(), self.usable_space() as u16, ), self.usable_space(), )?
{ BTreeCell::TableInteriorCell(TableInteriorCell { @@ -742,19 +764,31 @@ impl BTreeCursor { // insert cell let mut cell_payload: Vec<u8> = Vec::new(); - self.fill_cell_payload(page_type, Some(int_key), &mut cell_payload, record); + fill_cell_payload( + page_type, + Some(int_key), + &mut cell_payload, + record, + self.usable_space() as u16, + self.pager.clone(), + ); // insert let overflow = { let contents = page.get().contents.as_mut().unwrap(); - self.insert_into_cell(contents, cell_payload.as_slice(), cell_idx); - let overflow_cells = contents.overflow_cells.len(); debug!( - "insert_into_page(overflow, cell_count={}, overflow_cells={})", - contents.cell_count(), - overflow_cells + "insert_into_page(overflow, cell_count={})", + contents.cell_count() ); - overflow_cells + + insert_into_cell( + contents, + cell_payload.as_slice(), + cell_idx, + self.usable_space() as u16, + ) + .unwrap(); + contents.overflow_cells.len() }; let write_info = self .state @@ -768,8 +802,7 @@ impl BTreeCursor { } WriteState::BalanceStart | WriteState::BalanceNonRoot - | WriteState::BalanceMoveUp - | WriteState::BalanceGetParentPage => { + | WriteState::BalanceNonRootWaitLoadPages => { return_if_io!(self.balance()); } WriteState::Finish => { @@ -781,268 +814,6 @@ impl BTreeCursor { return ret; } - /// Insert a record into a cell. - /// If the cell overflows, an overflow cell is created. - /// insert_into_cell() is called from insert_into_page(), - /// and the overflow cell count is used to determine if the page overflows, - /// i.e. whether we need to balance the btree after the insert. - fn insert_into_cell(&self, page: &mut PageContent, payload: &[u8], cell_idx: usize) { - let free = self.compute_free_space(page, RefCell::borrow(&self.pager.db_header)); - const CELL_POINTER_SIZE_BYTES: usize = 2; - let enough_space = payload.len() + CELL_POINTER_SIZE_BYTES <= free as usize; - if !enough_space { - // add to overflow cells - page.overflow_cells.push(OverflowCell { - index: cell_idx, - payload: Pin::new(Vec::from(payload)), - }); - return; - } - - // TODO: insert into cell payload in internal page - let new_cell_data_pointer = self - .allocate_cell_space(page, payload.len() as u16) - .unwrap(); - let buf = page.as_ptr(); - - // Copy cell data - buf[new_cell_data_pointer as usize..new_cell_data_pointer as usize + payload.len()] - .copy_from_slice(payload); - // memmove(pIns+2, pIns, 2*(pPage->nCell - i)); - let (cell_pointer_array_start, _) = page.cell_pointer_array_offset_and_size(); - let cell_pointer_cur_idx = cell_pointer_array_start + (CELL_POINTER_SIZE_BYTES * cell_idx); - - let cell_count = page.cell_count(); - - // Move existing pointers if needed - let n_bytes_forward = CELL_POINTER_SIZE_BYTES * (cell_count - cell_idx); - if n_bytes_forward > 0 { - buf.copy_within( - cell_pointer_cur_idx..cell_pointer_cur_idx + n_bytes_forward, - cell_pointer_cur_idx + CELL_POINTER_SIZE_BYTES, - ); - } - - // Insert new cell pointer at the current cell index - page.write_u16(cell_pointer_cur_idx - page.offset, new_cell_data_pointer); - - // Update cell count - let new_n_cells = (page.cell_count() + 1) as u16; - page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells); - } - - /// Free the range of bytes that a cell occupies. - /// This function also updates the freeblock list in the page. - /// Freeblocks are used to keep track of free space in the page, - /// and are organized as a linked list.
- fn free_cell_range(&self, page: &mut PageContent, offset: u16, len: u16) -> Result<()> { - let mut cell_block_end = offset as u32 + len as u32; - let mut fragments_reduced = 0; - let mut cell_length = len; - let mut cell_block_start = offset; - let mut next_free_block_ptr = PAGE_HEADER_OFFSET_FIRST_FREEBLOCK as u16; - - let usable_size = { - let db_header = self.pager.db_header.borrow(); - (db_header.page_size - db_header.reserved_space as u16) as u32 - }; - - assert!( - cell_block_end <= usable_size, - "cell block end is out of bounds" - ); - assert!(cell_block_start >= 4, "minimum cell size is 4"); - assert!( - cell_block_start <= self.usable_space() as u16 - 4, - "offset is out of page bounds" - ); - - // Check for empty freelist fast path - let mut next_free_block = if page.read_u8(next_free_block_ptr as usize + 1) == 0 - && page.read_u8(next_free_block_ptr as usize) == 0 - { - 0 // Fast path for empty freelist - } else { - // Find position in free list - let mut block = page.read_u16(next_free_block_ptr as usize); - while block != 0 && block < cell_block_start { - if block <= next_free_block_ptr { - if block == 0 { - break; // Handle corruption test case - } - return Err(LimboError::Corrupt("Free block list not ascending".into())); - } - next_free_block_ptr = block; - block = page.read_u16(block as usize); - } - block - }; - - if next_free_block as u32 > usable_size - 4 { - return Err(LimboError::Corrupt("Free block beyond usable space".into())); - } - - // Coalesce with next block if adjacent - if next_free_block != 0 && cell_block_end + 3 >= next_free_block as u32 { - fragments_reduced = (next_free_block as u32 - cell_block_end) as u8; - if cell_block_end > next_free_block as u32 { - return Err(LimboError::Corrupt("Invalid block overlap".into())); - } - - let next_block_size = page.read_u16(next_free_block as usize + 2) as u32; - cell_block_end = next_free_block as u32 + next_block_size; - if cell_block_end > usable_size { - return Err(LimboError::Corrupt( - "Coalesced block extends beyond page".into(), - )); - } - - cell_length = cell_block_end as u16 - cell_block_start; - next_free_block = page.read_u16(next_free_block as usize); - } - - // Coalesce with previous block if adjacent - if next_free_block_ptr > PAGE_HEADER_OFFSET_FIRST_FREEBLOCK as u16 { - let prev_block_end = - next_free_block_ptr as u32 + page.read_u16(next_free_block_ptr as usize + 2) as u32; - - if prev_block_end + 3 >= cell_block_start as u32 { - if prev_block_end > cell_block_start as u32 { - return Err(LimboError::Corrupt("Invalid previous block overlap".into())); - } - fragments_reduced += (cell_block_start as u32 - prev_block_end) as u8; - cell_length = (cell_block_end - next_free_block_ptr as u32) as u16; - cell_block_start = next_free_block_ptr; - } - } - - // Update frag count - let current_frags = page.read_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT); - if fragments_reduced > current_frags { - return Err(LimboError::Corrupt("Invalid fragmentation count".into())); - } - page.write_u8( - PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, - current_frags - fragments_reduced, - ); - - let content_area_start = page.cell_content_area(); - if cell_block_start <= content_area_start { - if cell_block_start < content_area_start { - return Err(LimboError::Corrupt("Free block before content area".into())); - } - if next_free_block_ptr != PAGE_HEADER_OFFSET_FIRST_FREEBLOCK as u16 { - return Err(LimboError::Corrupt("Invalid content area merge".into())); - } - // Extend content area - 
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, next_free_block); - page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, cell_block_end as u16); - } else { - // Insert in free list - page.write_u16(next_free_block_ptr as usize, cell_block_start); - page.write_u16(cell_block_start as usize, next_free_block); - page.write_u16(cell_block_start as usize + 2, cell_length); - } - - Ok(()) - } - - /// Drop a cell from a page. - /// This is done by freeing the range of bytes that the cell occupies. - fn drop_cell(&self, page: &mut PageContent, cell_idx: usize) { - let cell_count = page.cell_count(); - let (cell_start, cell_len) = page.cell_get_raw_region( - cell_idx, - self.payload_overflow_threshold_max(page.page_type()), - self.payload_overflow_threshold_min(page.page_type()), - self.usable_space(), - ); - - self.free_cell_range(page, cell_start as u16, cell_len as u16) - .expect("Failed to free cell range"); - - let new_cell_count = cell_count - 1; - page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_cell_count as u16); - - if new_cell_count == 0 { - page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); - page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0); - page.write_u16( - PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, - self.usable_space() as u16, - ); - } else { - let (pointer_array_start, _) = page.cell_pointer_array_offset_and_size(); - let buf = page.as_ptr(); - buf.copy_within( - pointer_array_start + (2 * (cell_idx + 1)) // src - ..pointer_array_start + (2 * cell_count), - pointer_array_start + (2 * cell_idx), // dst - ); - } - } - - fn find_free_cell( - &self, - page_ref: &PageContent, - amount: usize, - db_header: Ref<DatabaseHeader>, - ) -> Result<usize> { - // NOTE: freelist is in ascending order of keys and pc - // unused_space is reserved bytes at the end of page, therefore we must subtract from maxpc - let mut free_list_pointer_addr = 1; - let mut pc = page_ref.first_freeblock() as usize; - - let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize; - let maxpc = usable_space - amount; - - if pc == 0 { - return Ok(0); - } - - while pc <= maxpc { - let size = page_ref.read_u16(pc + 2) as usize; - - if let Some(x) = size.checked_sub(amount) { - if x < 4 { - if page_ref.read_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT) > 57 { - return Ok(0); - } - - let next_ptr = page_ref.read_u16(pc); - page_ref.write_u16(free_list_pointer_addr, next_ptr); - - let frag_count = page_ref.read_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT); - page_ref.write_u8( - PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, - frag_count + x as u8, - ); - return Ok(pc); - } else if x + pc > maxpc { - return Err(LimboError::Corrupt("Free block extends beyond page".into())); - } else { - page_ref.write_u16(pc + 2, x as u16); - return Ok(pc + x); - } - } - - free_list_pointer_addr = pc; - pc = page_ref.read_u16(pc) as usize; - if pc <= free_list_pointer_addr && pc != 0 { - return Err(LimboError::Corrupt( - "Free list not in ascending order".into(), - )); - } - } - - if pc > maxpc + amount - 4 { - return Err(LimboError::Corrupt( - "Free block chain extends beyond page end".into(), - )); - } - Ok(0) - } - /// Balance a leaf page. /// Balancing is done when a page overflows. /// see e.g.
https://en.wikipedia.org/wiki/B-tree @@ -1073,312 +844,612 @@ impl BTreeCursor { if !self.stack.has_parent() { self.balance_root(); - return Ok(CursorResult::Ok(())); } let write_info = self.state.mut_write_info().unwrap(); write_info.state = WriteState::BalanceNonRoot; - } - WriteState::BalanceNonRoot - | WriteState::BalanceGetParentPage - | WriteState::BalanceMoveUp => { + self.stack.pop(); + self.stack.retreat(); return_if_io!(self.balance_non_root()); } - - _ => unreachable!("invalid balance leaf state {:?}", state), + WriteState::BalanceNonRoot | WriteState::BalanceNonRootWaitLoadPages => { + return_if_io!(self.balance_non_root()); + } + WriteState::Finish => return Ok(CursorResult::Ok(())), + _ => panic!("unexpected state on balance {:?}", state), } } } + /// Balance a non-root page by redistributing cells among at most 3 sibling pages neighboring the page that overflowed/underflowed. fn balance_non_root(&mut self) -> Result<CursorResult<()>> { assert!( matches!(self.state, CursorState::Write(_)), "Cursor must be in balancing state" ); - let state = self.state.write_info().expect("must be balancing").state; + let state = self + .state + .write_info() + .expect("must be balancing") + .state + .clone(); + tracing::debug!("balance_non_root(state={:?})", state); let (next_write_state, result) = match state { WriteState::Start => todo!(), WriteState::BalanceStart => todo!(), WriteState::BalanceNonRoot => { - // drop divider cells and find right pointer - // NOTE: since we are doing a simple split we are only finding the pointer we want to update (right pointer). - // Right pointer means the cell that points to the last page, as we don't really want to drop this one. This one - // can be a "rightmost pointer" or a "cell". - // we always assume there is a parent - let current_page = self.stack.top(); debug!("balance_non_root(page={})", current_page.get().id); - - let current_page_inner = current_page.get(); - let current_page_contents = &mut current_page_inner.contents; - let current_page_contents = current_page_contents.as_mut().unwrap(); - // Copy of page used to reference cell bytes. - // This needs to be saved somewhere safe so that references still point to here, - // this will be stored in write_info below - let page_copy = current_page_contents.clone(); - current_page_contents.overflow_cells.clear(); - - // In-memory, in-order copy of all cells in pages we want to balance. For now let's do a 2 page split. - // Right pointer in interior cells should be converted to regular cells if more than 2 pages are used for balancing. let write_info = self.state.write_info().unwrap(); - let mut scratch_cells = write_info.scratch_cells.borrow_mut(); - scratch_cells.clear(); - - let usable_space = self.usable_space(); - for cell_idx in 0..page_copy.cell_count() { - let (start, len) = page_copy.cell_get_raw_region( - cell_idx, - self.payload_overflow_threshold_max(page_copy.page_type()), - self.payload_overflow_threshold_min(page_copy.page_type()), - usable_space, - ); - let cell_buffer = to_static_buf(&page_copy.as_ptr()[start..start + len]); - scratch_cells.push(cell_buffer); - } - // overflow_cells are stored in order - so we need to insert them in reverse order - for cell in page_copy.overflow_cells.iter().rev() { - scratch_cells.insert(cell.index, to_static_buf(&cell.payload)); - } - - // number of cells for the pages involved in the split - // the algorithm accumulates cells in a greedy manner with 2 conditions for a split: - // 1.
new cell will overflow single cell (accumulated + new > usable_space - header_size) - // 2. accumulated size already reach >50% of content_usable_size - // second condition is necessary, otherwise in case of small cells we will create a lot of almost empty pages - // - // if we have single overflow cell in a table leaf node - we still can have 3 split pages - // - // for example, if current page has 4 entries with size ~1/4 page size, and new cell has size ~page size - // then we will need 3 pages to distribute cells between them - let split_pages_cells_count = &mut write_info.split_pages_cells_count.borrow_mut(); - split_pages_cells_count.clear(); - let mut last_page_cells_count = 0; - let mut last_page_cells_size = 0; - let content_usable_space = usable_space - page_copy.header_size(); - for scratch_cell in scratch_cells.iter() { - let cell_size = scratch_cell.len() + 2; // + cell pointer size (u16) - if last_page_cells_size + cell_size > content_usable_space - || 2 * last_page_cells_size > content_usable_space - { - split_pages_cells_count.push(last_page_cells_count); - last_page_cells_count = 0; - last_page_cells_size = 0; - } - last_page_cells_count += 1; - last_page_cells_size += cell_size; - assert!(last_page_cells_size <= content_usable_space); - } - split_pages_cells_count.push(last_page_cells_count); - let new_pages_count = split_pages_cells_count.len(); - - debug!( - "splitting left={} new_pages={}, cells_count={:?}", - current_page.get().id, - new_pages_count - 1, - split_pages_cells_count - ); - - *write_info.rightmost_pointer.borrow_mut() = page_copy.rightmost_pointer(); - write_info.page_copy.replace(Some(page_copy)); - - let page = current_page.get().contents.as_mut().unwrap(); - let page_type = page.page_type(); - assert!( - matches!(page_type, PageType::TableLeaf | PageType::TableInterior), - "indexes still not supported" - ); - - write_info.split_pages.borrow_mut().clear(); - write_info.split_pages.borrow_mut().push(current_page); - // allocate new pages - for _ in 1..new_pages_count { - let new_page = self.allocate_page(page_type, 0); - write_info.split_pages.borrow_mut().push(new_page); - } - - (WriteState::BalanceGetParentPage, Ok(CursorResult::Ok(()))) - } - WriteState::BalanceGetParentPage => { - let parent = self.stack.parent(); - let loaded = parent.is_loaded(); - return_if_locked!(parent); - - if !loaded { - debug!("balance_leaf(loading page)"); - self.pager.load_page(parent.clone())?; + let parent_page = self.stack.top(); + if parent_page.is_locked() { return Ok(CursorResult::IO); } - parent.set_dirty(); - (WriteState::BalanceMoveUp, Ok(CursorResult::Ok(()))) - } - WriteState::BalanceMoveUp => { - let parent = self.stack.parent(); + return_if_locked!(parent_page); + if !parent_page.is_loaded() { + self.pager.load_page(parent_page.clone())?; + return Ok(CursorResult::IO); + } + parent_page.set_dirty(); + self.pager.add_dirty(parent_page.get().id); + let parent_contents = parent_page.get().contents.as_ref().unwrap(); + let page_to_balance_idx = self.stack.current_cell_index() as usize; - let (page_type, current_idx) = { - let current_page = self.stack.top(); - let contents = current_page.get().contents.as_ref().unwrap(); - (contents.page_type().clone(), current_page.get().id) + debug!( + "balance_non_root(parent_id={} page_to_balance_idx={})", + parent_page.get().id, + page_to_balance_idx + ); + assert!(matches!( + parent_contents.page_type(), + PageType::IndexInterior | PageType::TableInterior + )); + // Part 1: Find the sibling pages to balance + 
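The sibling-window rule implemented below can be distilled into a standalone function. A minimal sketch under the same conventions (sibling_window is a hypothetical helper, not part of the patch; the returned pair mirrors (sibling_pointer, first_cell_divider), and number_of_cells_in_parent counts cells plus overflow cells):

    /// Hypothetical distillation of the sibling-window rule: returns the
    /// index of the last sibling in the window and the first divider cell.
    fn sibling_window(number_of_cells_in_parent: usize, page_to_balance_idx: usize) -> (usize, usize) {
        if number_of_cells_in_parent < 2 {
            // Fewer than two divider cells: balance every child the parent has.
            (number_of_cells_in_parent, 0)
        } else if number_of_cells_in_parent == 2 {
            // Two dividers plus the right pointer give exactly three children.
            (2, 0)
        } else if page_to_balance_idx == 0 {
            // Leftmost child: take the first three siblings.
            (2, 0)
        } else if page_to_balance_idx == number_of_cells_in_parent {
            // Child behind the rightmost pointer: take the last three siblings.
            (2, number_of_cells_in_parent - 2)
        } else {
            // Interior child: take one sibling on each side.
            (2, page_to_balance_idx - 1)
        }
    }

    fn main() {
        assert_eq!(sibling_window(5, 3), (2, 2)); // window is children 2, 3 and 4
        assert_eq!(sibling_window(5, 0), (2, 0)); // leftmost: children 0, 1 and 2
        assert_eq!(sibling_window(1, 1), (1, 0)); // tiny parent: both children
    }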
write_info.new_pages.borrow_mut().clear(); + write_info.pages_to_balance.borrow_mut().clear(); + write_info.divider_cells.borrow_mut().clear(); + let number_of_cells_in_parent = + parent_contents.cell_count() + parent_contents.overflow_cells.len(); + + assert!( + parent_contents.overflow_cells.is_empty(), + "balancing child page with overflowed parent not yet implemented" + ); + assert!(page_to_balance_idx <= parent_contents.cell_count()); + // As there will be at maximum 3 pages used to balance: + // sibling_pointer is the index representing one of those 3 pages, and we initialize it to the last possible page. + // next_divider is the first divider that contains the first page of the 3 pages. + let (sibling_pointer, first_cell_divider) = if number_of_cells_in_parent < 2 { + (number_of_cells_in_parent, 0) + } else if number_of_cells_in_parent == 2 { + // Here we will have at least 2 cells and one right pointer, therefore we can get 3 siblings. + // In case of 2 we will have all pages to balance. + (2, 0) + } else { + // In case of more than 2 we have to check which ones to take + let next_divider = if page_to_balance_idx == 0 { + // first cell, take first 3 + 0 + } else if page_to_balance_idx == number_of_cells_in_parent { + // Page corresponds to right pointer, so take last 3 + number_of_cells_in_parent - 2 + } else { + // Some cell in the middle, so we want to take sibling on left and right. + page_to_balance_idx - 1 + }; + (2, next_divider) + }; + write_info.sibling_count.replace(sibling_pointer + 1); + write_info.first_divider_cell.replace(first_cell_divider); + + let last_sibling_is_right_pointer = sibling_pointer + first_cell_divider + - parent_contents.overflow_cells.len() + == parent_contents.cell_count(); + // Get the right page pointer that we will need to update later + let right_pointer = if last_sibling_is_right_pointer { + parent_contents.rightmost_pointer_raw().unwrap() + } else { + let (start_of_cell, _) = parent_contents.cell_get_raw_region( + first_cell_divider + sibling_pointer, + payload_overflow_threshold_max( + parent_contents.page_type(), + self.usable_space() as u16, + ), + payload_overflow_threshold_min( + parent_contents.page_type(), + self.usable_space() as u16, + ), + self.usable_space(), + ); + let buf = parent_contents.as_ptr().as_mut_ptr(); + unsafe { buf.add(start_of_cell) } }; - parent.set_dirty(); - self.pager.add_dirty(parent.get().id); - let parent_contents = parent.get().contents.as_mut().unwrap(); - // if this isn't empty next loop won't work - assert_eq!(parent_contents.overflow_cells.len(), 0); - - // Right page pointer is a u32 in the rightmost pointer, and in a cell it is a u32 too, so we can use a *u32 to hold where we want to change this value - let mut right_pointer = PAGE_HEADER_OFFSET_RIGHTMOST_PTR; - for cell_idx in 0..parent_contents.cell_count() { - let cell = parent_contents.cell_get( - cell_idx, + // load sibling pages + // start loading right page first + let mut pgno: u32 = unsafe { right_pointer.cast::<u32>().read().swap_bytes() }; + write_info.rightmost_pointer.replace(Some(right_pointer)); + let current_sibling = sibling_pointer; + for i in (0..=current_sibling).rev() { + let page = self.pager.read_page(pgno as usize)?; + write_info.pages_to_balance.borrow_mut().push(page); + assert_eq!( + parent_contents.overflow_cells.len(), + 0, + "overflow in parent is not yet implemented while balancing it" + ); + if i == 0 { + break; + } + let next_cell_divider = i + first_cell_divider - 1; + pgno = match parent_contents.cell_get( + next_cell_divider, self.pager.clone(),
- self.payload_overflow_threshold_max(page_type.clone()), - self.payload_overflow_threshold_min(page_type.clone()), + payload_overflow_threshold_max( + parent_contents.page_type(), + self.usable_space() as u16, + ), + payload_overflow_threshold_min( + parent_contents.page_type(), + self.usable_space() as u16, + ), self.usable_space(), - )?; - let found = match cell { - BTreeCell::TableInteriorCell(interior) => { - interior._left_child_page as usize == current_idx + )? { + BTreeCell::TableInteriorCell(table_interior_cell) => { + table_interior_cell._left_child_page + } + BTreeCell::IndexInteriorCell(index_interior_cell) => { + index_interior_cell.left_child_page + } + BTreeCell::TableLeafCell(..) | BTreeCell::IndexLeafCell(..) => { + unreachable!() } - _ => unreachable!("Parent should always be an interior page"), }; - if found { - let (start, _len) = parent_contents.cell_get_raw_region( + } + // Reverse in order to keep the right order + self.state + .write_info() + .unwrap() + .pages_to_balance + .borrow_mut() + .reverse(); + ( + WriteState::BalanceNonRootWaitLoadPages, + Ok(CursorResult::IO), + ) + } + WriteState::BalanceNonRootWaitLoadPages => { + let write_info = self.state.write_info().unwrap(); + let all_loaded = write_info + .pages_to_balance + .borrow() + .iter() + .all(|page| !page.is_locked()); + if !all_loaded { + return Ok(CursorResult::IO); + } + // Now do real balancing + let parent_page = self.stack.top(); + let parent_contents = parent_page.get_contents(); + assert!( + parent_contents.overflow_cells.len() == 0, + "overflow parent not yet implemented" + ); + let sibling_count = *write_info.sibling_count.borrow(); + let first_divider_cell = *write_info.first_divider_cell.borrow(); + + // Get divider cells and max_cells + let mut max_cells = 0; + let pages_to_balance = write_info.pages_to_balance.borrow(); + let mut pages_to_balance_new = Vec::new(); + for i in (0..sibling_count).rev() { + let sibling_page = &pages_to_balance[i]; + let sibling_contents = sibling_page.get_contents(); + sibling_page.set_dirty(); + self.pager.add_dirty(sibling_page.get().id); + max_cells += sibling_contents.cell_count(); + if i == 0 { + // we don't have left sibling from this one so we break + break; + } + // Since we know we have a left sibling, take the divider that points to left sibling of this page + let cell_idx = first_divider_cell + i - 1; + let (cell_start, cell_len) = parent_contents.cell_get_raw_region( + cell_idx, + payload_overflow_threshold_max( + parent_contents.page_type(), + self.usable_space() as u16, + ), + payload_overflow_threshold_min( + parent_contents.page_type(), + self.usable_space() as u16, + ), + self.usable_space(), + ); + let buf = parent_contents.as_ptr(); + let cell_buf = &buf[cell_start..cell_start + cell_len]; + + // TODO(pere): make this reference and not copy + write_info + .divider_cells + .borrow_mut() + .push(cell_buf.to_vec()); + tracing::trace!( + "dropping divider cell from parent cell_idx={} count={}", + cell_idx, + parent_contents.cell_count() + ); + drop_cell(parent_contents, cell_idx, self.usable_space() as u16)?; + } + assert_eq!( + write_info.divider_cells.borrow().len(), + sibling_count - 1, + "the number of pages balancing must be divided by one less divider" + ); + // Reverse divider cells to be in order + write_info.divider_cells.borrow_mut().reverse(); + + write_info + .scratch_cells + .replace(Vec::with_capacity(max_cells)); + + let mut cell_array = CellArray { + cells: Vec::new(), + number_of_cells_per_page: Vec::new(), + }; + + let mut 
total_cells_inserted = 0; + // count_cells_in_old_pages is the prefix sum of the cell counts of each page + let mut count_cells_in_old_pages = Vec::new(); + let mut divider_cells = Vec::new(); + + let page_type = pages_to_balance[0].get_contents().page_type(); + let leaf_data = matches!(page_type, PageType::TableLeaf); + for (i, old_page) in pages_to_balance.iter().enumerate() { + let old_page_contents = old_page.get_contents(); + for cell_idx in 0..old_page_contents.cell_count() { + let (cell_start, cell_len) = old_page_contents.cell_get_raw_region( cell_idx, - self.payload_overflow_threshold_max(page_type.clone()), - self.payload_overflow_threshold_min(page_type.clone()), + payload_overflow_threshold_max( + old_page_contents.page_type(), + self.usable_space() as u16, + ), + payload_overflow_threshold_min( + old_page_contents.page_type(), + self.usable_space() as u16, + ), self.usable_space(), ); - right_pointer = start; + let buf = old_page_contents.as_ptr(); + let cell_buf = &mut buf[cell_start..cell_start + cell_len]; + // TODO(pere): make this a reference and not a copy + cell_array.cells.push(to_static_buf(cell_buf)); + } + // Insert overflow cells into correct place + let offset = total_cells_inserted; + assert!( + old_page_contents.overflow_cells.len() <= 1, + "todo: check this works for more than one overflow cell" + ); + for overflow_cell in old_page_contents.overflow_cells.iter_mut() { + cell_array.cells.insert( + offset + overflow_cell.index, + to_static_buf(&mut Pin::as_mut(&mut overflow_cell.payload)), + ); + } + + count_cells_in_old_pages.push(cell_array.cells.len() as u16); + + let mut cells_inserted = + old_page_contents.cell_count() + old_page_contents.overflow_cells.len(); + + if i < pages_to_balance.len() - 1 && !leaf_data { + // If this is an index page or an interior table page we need to take the divider cell too. + // But we don't need the last divider as it will remain the same. + let divider_cell = write_info.divider_cells.borrow()[i].clone(); + // TODO(pere): when the old pages are index leaf pages, we need to strip the child page pointers + // from divider cells taken from the index interior parent, because those should not be included.
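For table-leaf siblings the divider keys can be recomputed from the last cell of each new page, so only non-leaf-data pages thread the parent's divider cells into the flat cell array, as the branch above does. A toy model of that flattening, with owned Vec<Vec<u8>> standing in for the &'static mut buffers CellArray actually holds:

    /// Toy model: flatten sibling pages' cells into one ordered array,
    /// interleaving parent divider cells only when they carry keys that
    /// cannot be recomputed from the pages themselves.
    fn flatten_cells(pages: &[Vec<Vec<u8>>], dividers: &[Vec<u8>], leaf_data: bool) -> Vec<Vec<u8>> {
        let mut flat = Vec::new();
        for (i, page_cells) in pages.iter().enumerate() {
            flat.extend(page_cells.iter().cloned());
            // Table leaves skip dividers; the last divider always stays in the parent.
            if !leaf_data && i < pages.len() - 1 {
                flat.push(dividers[i].clone());
            }
        }
        flat
    }

    fn main() {
        let pages = vec![vec![b"a".to_vec()], vec![b"c".to_vec()]];
        let dividers = vec![b"b".to_vec()];
        assert_eq!(flatten_cells(&pages, &dividers, false).len(), 3); // index/interior pages
        assert_eq!(flatten_cells(&pages, &dividers, true).len(), 2); // table leaves
    }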
+ cells_inserted += 1; + divider_cells.push(divider_cell); + cell_array + .cells + .push(to_static_buf(divider_cells.last_mut().unwrap().as_mut())); + } + total_cells_inserted += cells_inserted; + } + + // calculate how many pages to allocate + let mut new_page_sizes = Vec::new(); + let mut k = 0; + // todo: add leaf correction + // number of bytes beyond header, differnet from global usableSapce which inccludes + // header + let usable_space = self.usable_space() - 12; + for i in 0..sibling_count { + cell_array + .number_of_cells_per_page + .push(count_cells_in_old_pages[i]); + let page = &pages_to_balance[i]; + let page_contents = page.get_contents(); + let free_space = compute_free_space(&page_contents, self.usable_space() as u16); + + // If we have an empty page of cells, we ignore it + if k > 0 + && cell_array.number_of_cells_per_page[k - 1] + == cell_array.number_of_cells_per_page[k] + { + k -= 1; + } + if !leaf_data { + k += 1; + } + new_page_sizes.push(usable_space as u16 - free_space); + for overflow in &page_contents.overflow_cells { + let size = new_page_sizes.last_mut().unwrap(); + // 2 to account of pointer + *size += 2 + overflow.payload.len() as u16; + } + k += 1; + } + + // Try to pack as many cells to the left + let mut sibling_count_new = sibling_count; + let mut i = 0; + while i < sibling_count_new { + // First try to move cells to the right if they do not fit + while new_page_sizes[i] > usable_space as u16 { + let needs_new_page = i + 1 >= sibling_count_new; + if needs_new_page { + sibling_count_new += 1; + new_page_sizes.push(0); + cell_array + .number_of_cells_per_page + .push(cell_array.cells.len() as u16); + assert!( + sibling_count_new <= 5, + "it is corrupt to require more than 5 pages to balance 3 siblings" + ); + } + let size_of_cell_to_remove_from_left = + 2 + cell_array.cells[cell_array.cell_count(i) - 1].len() as u16; + new_page_sizes[i] -= size_of_cell_to_remove_from_left; + let size_of_cell_to_move_right = if !leaf_data { + if cell_array.number_of_cells_per_page[i] + < cell_array.cells.len() as u16 + { + // This means we move to the right page the divider cell and we + // promote left cell to divider + 2 + cell_array.cells[cell_array.cell_count(i)].len() as u16 + } else { + 0 + } + } else { + size_of_cell_to_remove_from_left + }; + new_page_sizes[i + 1] += size_of_cell_to_move_right; + cell_array.number_of_cells_per_page[i] -= 1; + } + + // Now try to take from the right if we didn't have enough + while cell_array.number_of_cells_per_page[i] < cell_array.cells.len() as u16 { + let size_of_cell_to_remove_from_right = + 2 + cell_array.cells[cell_array.cell_count(i)].len() as u16; + let can_take = new_page_sizes[i] + size_of_cell_to_remove_from_right + > usable_space as u16; + if can_take { + break; + } + new_page_sizes[i] += size_of_cell_to_remove_from_right; + cell_array.number_of_cells_per_page[i] += 1; + + let size_of_cell_to_remove_from_right = if !leaf_data { + if cell_array.number_of_cells_per_page[i] + < cell_array.cells.len() as u16 + { + 2 + cell_array.cells[cell_array.cell_count(i)].len() as u16 + } else { + 0 + } + } else { + size_of_cell_to_remove_from_right + }; + + new_page_sizes[i + 1] -= size_of_cell_to_remove_from_right; + } + + let we_still_need_another_page = + cell_array.number_of_cells_per_page[i] >= cell_array.cells.len() as u16; + if we_still_need_another_page { + sibling_count_new = i + 1; + } + i += 1; + if i >= sibling_count_new { break; } } - let write_info = self.state.write_info().unwrap(); - let mut split_pages = 
write_info.split_pages.borrow_mut(); - let split_pages_len = split_pages.len(); - let scratch_cells = write_info.scratch_cells.borrow(); + // Comment borrowed from SQLite src/btree.c + // The packing computed by the previous block is biased toward the siblings + // on the left side (siblings with smaller keys). The left siblings are + // always nearly full, while the right-most sibling might be nearly empty. + // The next block of code attempts to adjust the packing of siblings to + // get a better balance. + // + // This adjustment is more than an optimization. The packing above might + // be so out of balance as to be illegal. For example, the right-most + // sibling might be completely empty. This adjustment is not optional. + for i in (1..sibling_count_new).rev() { + let mut size_right_page = new_page_sizes[i]; + let mut size_left_page = new_page_sizes[i - 1]; + let mut cell_left = cell_array.number_of_cells_per_page[i - 1] - 1; + // With leaf_data there is no divider to move, so the cell we remove from the left is + // the same one we add to the right (we don't add a divider to the right). + let mut cell_right = cell_left + 1 - leaf_data as u16; + loop { + let cell_left_size = cell_array.cell_size(cell_left as usize); + let cell_right_size = cell_array.cell_size(cell_right as usize); + // TODO: add assert nMaxCells - // reset pages - for page in split_pages.iter() { - assert!(page.is_dirty()); - let contents = page.get().contents.as_mut().unwrap(); + let pointer_size = if i == sibling_count_new - 1 { 0 } else { 2 }; + let would_not_improve_balance = size_right_page + cell_right_size + 2 + > size_left_page - (cell_left_size + pointer_size); + if size_right_page != 0 && would_not_improve_balance { + break; + } - contents.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); - contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0); + size_left_page -= cell_left_size + 2; + size_right_page += cell_right_size + 2; + cell_array.number_of_cells_per_page[i - 1] = cell_left; - contents.write_u16( - PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, + if cell_left == 0 { + break; + } + cell_left -= 1; + cell_right -= 1; + } + + new_page_sizes[i] = size_right_page; + new_page_sizes[i - 1] = size_left_page; + assert!( + cell_array.number_of_cells_per_page[i - 1] + > if i > 1 { + cell_array.number_of_cells_per_page[i - 2] + } else { + 0 + } + ); + } + + // Reuse the old pages (marking them dirty) and allocate any new pages needed + for i in 0..sibling_count_new { + if i < sibling_count { + pages_to_balance[i].set_dirty(); + pages_to_balance_new.push(pages_to_balance[i].clone()); + } else { + let page = self.allocate_page(page_type.clone(), 0); + pages_to_balance_new.push(page); + } + } + + // Reassign page numbers in increasing order + let mut page_numbers = Vec::new(); + for page in pages_to_balance_new.iter() { + page_numbers.push(page.get().id); + } + page_numbers.sort(); + for (page, new_id) in pages_to_balance_new.iter().zip(page_numbers) { + if new_id != page.get().id { + page.get().id = new_id; + self.pager.put_loaded_page(new_id, page.clone()); + } + } + + // Write right pointer in parent page to point to new rightmost page + let right_page_id = pages_to_balance_new.last().unwrap().get().id as u32; + let rightmost_pointer = write_info.rightmost_pointer.borrow_mut().unwrap(); + let rightmost_pointer = + unsafe { std::slice::from_raw_parts_mut(rightmost_pointer, 4) }; + rightmost_pointer[0..4].copy_from_slice(&right_page_id.to_be_bytes()); + + // Ensure right-child pointer of the right-most new sibling page points to the page + // that was originally in that place.
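The divider rebuild that follows produces, for table trees, an interior cell laid out as a 4-byte big-endian left-child page number followed by the rowid as a varint. A self-contained sketch of that layout, using push_varint as a simplified stand-in for the crate's write_varint_to_vec (it ignores the 9-byte varint case):

    /// Append n as a SQLite-style varint: big-endian 7-bit groups, with a
    /// continuation bit on every byte except the last (9-byte case omitted).
    fn push_varint(out: &mut Vec<u8>, mut n: u64) {
        let mut groups = Vec::new();
        loop {
            groups.push((n & 0x7f) as u8);
            n >>= 7;
            if n == 0 {
                break;
            }
        }
        while let Some(b) = groups.pop() {
            out.push(if groups.is_empty() { b } else { b | 0x80 });
        }
    }

    /// A table-interior divider cell: left-child page number, then rowid.
    fn table_interior_divider(left_child_page: u32, rowid: u64) -> Vec<u8> {
        let mut cell = Vec::with_capacity(4 + 9); // 4-byte pointer + at most 9 varint bytes
        cell.extend_from_slice(&left_child_page.to_be_bytes());
        push_varint(&mut cell, rowid);
        cell
    }

    fn main() {
        let cell = table_interior_divider(7, 300);
        assert_eq!(&cell[..4], &7u32.to_be_bytes());
        assert_eq!(&cell[4..], &[0x82, 0x2c]); // 300 encodes as two varint bytes
    }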
+ let is_leaf_page = matches!(page_type, PageType::TableLeaf | PageType::IndexLeaf); + if !is_leaf_page { + let last_page = pages_to_balance.last().unwrap(); + let right_pointer = last_page.get_contents().rightmost_pointer().unwrap(); + let new_last_page = pages_to_balance_new.last().unwrap(); + new_last_page + .get_contents() + .write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, right_pointer); + } + // TODO: pointer map update (vacuum support) + for i in 0..sibling_count_new - 1 + /* do not take last page */ + { + let divider_cell_idx = cell_array.cell_count(i); + let mut divider_cell = &mut cell_array.cells[divider_cell_idx]; + let page = &pages_to_balance_new[i]; + // FIXME: don't use auxiliary space, could be done without allocations + let mut new_divider_cell = Vec::new(); + if !is_leaf_page { + // Interior + page.get_contents() + .write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, page.get().id as u32); + new_divider_cell.extend_from_slice(divider_cell); + } else if leaf_data { + // Leaf table + // FIXME: conversion not needed + // FIXME: need to update cell size in order to free correctly? + // insert into cell with correct range should be enough + divider_cell = &mut cell_array.cells[divider_cell_idx - 1]; + let (_, n_bytes_payload) = read_varint(divider_cell)?; + let (rowid, _) = read_varint(&divider_cell[n_bytes_payload..])?; + new_divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes()); + write_varint_to_vec(rowid, &mut new_divider_cell); + } else { + // Leaf index + new_divider_cell.extend_from_slice(divider_cell); + } + // FIXME: defragment shouldn't be needed + insert_into_cell( + parent_contents, + &new_divider_cell, + first_divider_cell + i, self.usable_space() as u16, - ); - - contents.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0); - if !contents.is_leaf() { - contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, 0); - } + ) + .unwrap(); } - - let mut current_cell_index = 0_usize; - /* index to scratch cells that will be used as dividers in order */ - let mut divider_cells_index = Vec::with_capacity(split_pages.len()); - - debug!("balance_leaf::distribute(cells={})", scratch_cells.len()); - - for (i, page) in split_pages.iter_mut().enumerate() { - let page_id = page.get().id; - let contents = page.get().contents.as_mut().unwrap(); - - let cells_to_copy = write_info.split_pages_cells_count.borrow()[i]; - debug!( - "balance_leaf::distribute(page={}, cells_to_copy={})", - page_id, cells_to_copy - ); - - let cell_index_range = current_cell_index..current_cell_index + cells_to_copy; - for (j, cell_idx) in cell_index_range.enumerate() { - debug!("balance_leaf::distribute_in_page(page={}, cells_to_copy={}, j={}, cell_idx={})", page_id, cells_to_copy, j, cell_idx); - - let cell = scratch_cells[cell_idx]; - self.insert_into_cell(contents, cell, j); + // TODO: update pages + let mut done = vec![false; sibling_count_new]; + for i in (1 - sibling_count_new as i64)..sibling_count_new as i64 { + let page_idx = i.unsigned_abs() as usize; + if done[page_idx] { + continue; } - divider_cells_index.push(current_cell_index + cells_to_copy - 1); - current_cell_index += cells_to_copy; - } - - let is_leaf = { - let page = self.stack.top(); - let page = page.get().contents.as_ref().unwrap(); - page.is_leaf() - }; - - // update rightmost pointer for each page if we are in interior page - if !is_leaf { - for page in split_pages.iter_mut().take(split_pages_len - 1) { - let contents = page.get().contents.as_mut().unwrap(); - - assert!(contents.cell_count() >= 1); - let last_cell = 
contents.cell_get( - contents.cell_count() - 1, - self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), - self.usable_space(), - )?; - let last_cell_pointer = match last_cell { - BTreeCell::TableInteriorCell(interior) => interior._left_child_page, - _ => unreachable!(), + if i >= 0 + || count_cells_in_old_pages[page_idx - 1] + >= cell_array.number_of_cells_per_page[page_idx - 1] + { + let (start_old_cells, start_new_cells, number_new_cells) = if page_idx == 0 + { + (0, 0, cell_array.cell_count(0)) + } else { + let this_was_old_page = page_idx < sibling_count; + let start_old_cells = if this_was_old_page { + count_cells_in_old_pages[page_idx - 1] as usize + + (!leaf_data) as usize + } else { + cell_array.cells.len() + }; + let start_new_cells = + cell_array.cell_count(page_idx - 1) + (!leaf_data) as usize; + ( + start_old_cells, + start_new_cells, + cell_array.cell_count(page_idx) - start_new_cells, + ) }; - self.drop_cell(contents, contents.cell_count() - 1); - contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, last_cell_pointer); + let page = pages_to_balance_new[page_idx].get_contents(); + edit_page( + page, + start_old_cells, + start_new_cells, + number_new_cells, + &cell_array, + self.usable_space() as u16, + )?; + tracing::trace!( + "edit_page page={} cells={}", + pages_to_balance_new[page_idx].get().id, + page.cell_count() + ); + page.overflow_cells.clear(); + + done[page_idx] = true; } - // last page right most pointer points to previous right most pointer before splitting - let last_page = split_pages.last().unwrap(); - let last_page_contents = last_page.get().contents.as_mut().unwrap(); - last_page_contents.write_u32( - PAGE_HEADER_OFFSET_RIGHTMOST_PTR, - write_info.rightmost_pointer.borrow().unwrap(), - ); - } - - // insert dividers in parent - // we can consider dividers the first cell of each page starting from the second page - for (page_id_index, page) in - split_pages.iter_mut().take(split_pages_len - 1).enumerate() - { - let contents = page.get().contents.as_mut().unwrap(); - let divider_cell_index = divider_cells_index[page_id_index]; - let cell_payload = scratch_cells[divider_cell_index]; - let cell = read_btree_cell( - cell_payload, - &contents.page_type(), - 0, - self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), - self.usable_space(), - )?; - - let key = match cell { - BTreeCell::TableLeafCell(TableLeafCell { _rowid, .. }) - | BTreeCell::TableInteriorCell(TableInteriorCell { _rowid, .. 
}) => _rowid, - _ => unreachable!(), - }; - - let mut divider_cell = Vec::with_capacity(4 + 9); // 4 - page id, 9 - max length of varint - divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes()); - write_varint_to_vec(key, &mut divider_cell); - - let parent_cell_idx = self.find_cell(parent_contents, key); - self.insert_into_cell(parent_contents, &divider_cell, parent_cell_idx); - } - - { - // copy last page id to right pointer - let last_pointer = split_pages.last().unwrap().get().id as u32; - parent_contents.write_u32(right_pointer, last_pointer); } + // TODO: balance root self.stack.pop(); - let _ = write_info.page_copy.take(); - (WriteState::BalanceStart, Ok(CursorResult::Ok(()))) + // TODO: free pages + (WriteState::Finish, Ok(CursorResult::Ok(()))) } WriteState::Finish => todo!(), }; @@ -1400,439 +1471,74 @@ impl BTreeCursor { }; let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 }; - let new_root_page = self.allocate_page(PageType::TableInterior, offset); - { - let current_root = self.stack.top(); - let current_root_contents = current_root.get().contents.as_ref().unwrap(); - let new_root_page_contents = new_root_page.get().contents.as_mut().unwrap(); - if is_page_1 { - // Copy header - let current_root_buf = current_root_contents.as_ptr(); - let new_root_buf = new_root_page_contents.as_ptr(); - new_root_buf[0..DATABASE_HEADER_SIZE] - .copy_from_slice(&current_root_buf[0..DATABASE_HEADER_SIZE]); - } - // point new root right child to previous root - new_root_page_contents.write_u32( - PAGE_HEADER_OFFSET_RIGHTMOST_PTR, - current_root.get().id as u32, - ); - new_root_page_contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0); - } + let root = self.stack.top(); + let root_contents = root.get_contents(); + let child = self.allocate_page(root_contents.page_type(), 0); - /* swap split page buffer with new root buffer so we don't have to update page idx */ - { - let (root_id, child_id, child) = { - let page_ref = self.stack.top(); - let child = page_ref.clone(); + tracing::debug!( + "Balancing root. root={}, rightmost={}", + root.get().id, + child.get().id + ); - // Swap the entire Page structs - std::mem::swap(&mut child.get().id, &mut new_root_page.get().id); - // TODO: shift bytes by offset to left on child because now child has offset 100 - // and header bytes - // Also change the offset of page - // - if is_page_1 { - // Remove header from child and set offset to 0 - let contents = child.get().contents.as_mut().unwrap(); - let (cell_pointer_offset, _) = contents.cell_pointer_array_offset_and_size(); - // change cell pointers - for cell_idx in 0..contents.cell_count() { - let cell_pointer_offset = cell_pointer_offset + (2 * cell_idx) - offset; - let pc = contents.read_u16(cell_pointer_offset); - contents.write_u16(cell_pointer_offset, pc - offset as u16); - } + self.pager.add_dirty(root.get().id); + self.pager.add_dirty(child.get().id); - contents.offset = 0; - let buf = contents.as_ptr(); - buf.copy_within(DATABASE_HEADER_SIZE.., 0); - } + let root_buf = root_contents.as_ptr(); + let child_contents = child.get_contents(); + let child_buf = child_contents.as_ptr(); + let (root_pointer_start, root_pointer_len) = + root_contents.cell_pointer_array_offset_and_size(); + let (child_pointer_start, _) = child.get_contents().cell_pointer_array_offset_and_size(); - self.pager.add_dirty(new_root_page.get().id); - self.pager.add_dirty(child.get().id); - (new_root_page.get().id, child.get().id, child) - }; + let top = root_contents.cell_content_area() as usize; - debug!("Balancing root.
root={}, rightmost={}", root_id, child_id); - let root = new_root_page.clone(); + // 1. Modify child + // Copy pointers + child_buf[child_pointer_start..child_pointer_start + root_pointer_len] + .copy_from_slice(&root_buf[root_pointer_start..root_pointer_start + root_pointer_len]); + // Copy cell contents + child_buf[top..].copy_from_slice(&root_buf[top..]); + // Copy header + child_buf[0..root_contents.header_size()] + .copy_from_slice(&root_buf[offset..offset + root_contents.header_size()]); + // Copy overflow cells + child_contents.overflow_cells = root_contents.overflow_cells.clone(); - self.root_page = root_id; - self.stack.clear(); - self.stack.push(root.clone()); - self.stack.push(child.clone()); + // 2. Modify root + let new_root_page_type = match root_contents.page_type() { + PageType::IndexLeaf => PageType::IndexInterior, + PageType::TableLeaf => PageType::TableInterior, + _ => unreachable!("invalid root non leaf page type"), + } as u8; + // set new page type + root_contents.write_u8(PAGE_HEADER_OFFSET_PAGE_TYPE, new_root_page_type); + root_contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, child.get().id as u32); + root_contents.write_u16( + PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, + self.usable_space() as u16, + ); + root_contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0); + root_contents.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); - self.pager.put_loaded_page(root_id, root); - self.pager.put_loaded_page(child_id, child); - } + root_contents.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0); + root_contents.overflow_cells.clear(); + self.root_page = root.get().id; + self.stack.clear(); + self.stack.push(root.clone()); + self.stack.advance(); + self.stack.push(child.clone()); } /// Allocate a new page to the btree via the pager. /// This marks the page as dirty and writes the page header. fn allocate_page(&self, page_type: PageType, offset: usize) -> PageRef { let page = self.pager.allocate_page().unwrap(); - btree_init_page(&page, page_type, &self.pager.db_header.borrow(), offset); + btree_init_page(&page, page_type, offset, self.usable_space() as u16); page } - /// Allocate a new overflow page. - /// This is done when a cell overflows and new space is needed. - fn allocate_overflow_page(&self) -> PageRef { - let page = self.pager.allocate_page().unwrap(); - - // setup overflow page - let contents = page.get().contents.as_mut().unwrap(); - let buf = contents.as_ptr(); - buf.fill(0); - - page - } - - /// Allocate space for a cell on a page. 
- fn allocate_cell_space(&self, page_ref: &mut PageContent, amount: u16) -> Result<u16> { - let amount = amount as usize; - let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size(); - let gap = cell_offset + 2 * page_ref.cell_count(); - let mut top = page_ref.cell_content_area() as usize; - - if page_ref.first_freeblock() != 0 && gap + 2 <= top { - let db_header = RefCell::borrow(&self.pager.db_header); - let pc = self.find_free_cell(page_ref, amount, db_header)?; - if pc != 0 { - // Corruption check - if pc <= gap { - return Err(LimboError::Corrupt( - "Corrupted page: free block overlaps cell pointer array".into(), - )); - } - return Ok(pc as u16); - } - } - - if gap + 2 + amount > top { - // defragment - self.defragment_page(page_ref, RefCell::borrow(&self.pager.db_header)); - top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize; - assert!(gap + 2 + amount <= top); - } - - top -= amount; - page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16); - - let db_header = RefCell::borrow(&self.pager.db_header); - let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize; - assert!(top + amount <= usable_space); - - Ok(top as u16) - } - - /// Defragment a page. This means packing all the cells to the end of the page. - fn defragment_page(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) { - debug!("defragment_page"); - let cloned_page = page.clone(); - // TODO(pere): usable space should include offset probably - let usable_space = (db_header.page_size - db_header.reserved_space as u16) as u64; - let mut cbrk = usable_space; - - // TODO: implement fast algorithm - - let last_cell = usable_space - 4; - let first_cell = cloned_page.unallocated_region_start() as u64; - - if cloned_page.cell_count() > 0 { - let page_type = page.page_type(); - let read_buf = cloned_page.as_ptr(); - let write_buf = page.as_ptr(); - - for i in 0..cloned_page.cell_count() { - let cell_offset = page.offset + 8; - let cell_idx = cell_offset + i * 2; - - let pc = u16::from_be_bytes([read_buf[cell_idx], read_buf[cell_idx + 1]]) as u64; - if pc > last_cell { - unimplemented!("corrupted page"); - } - - assert!(pc <= last_cell); - - let size = match page_type { - PageType::TableInterior => { - let (_, nr_key) = match read_varint(&read_buf[pc as usize ..]) { - Ok(v) => v, - Err(_) => todo!( - "error while parsing varint from cell, probably treat this as corruption?" - ), - }; - 4 + nr_key as u64 - } - PageType::TableLeaf => { - let (payload_size, nr_payload) = match read_varint(&read_buf[pc as usize..]) { - Ok(v) => v, - Err(_) => todo!( - "error while parsing varint from cell, probably treat this as corruption?" - ), - }; - let (_, nr_key) = match read_varint(&read_buf[pc as usize + nr_payload..]) { - Ok(v) => v, - Err(_) => todo!( - "error while parsing varint from cell, probably treat this as corruption?"
- ), - }; - // TODO: add overflow page calculation - payload_size + nr_payload as u64 + nr_key as u64 - } - PageType::IndexInterior => todo!(), - PageType::IndexLeaf => todo!(), - }; - cbrk -= size; - if cbrk < first_cell || pc + size > usable_space { - todo!("corrupt"); - } - assert!(cbrk + size <= usable_space && cbrk >= first_cell); - // set new pointer - write_buf[cell_idx..cell_idx + 2].copy_from_slice(&(cbrk as u16).to_be_bytes()); - // copy payload - write_buf[cbrk as usize..cbrk as usize + size as usize] - .copy_from_slice(&read_buf[pc as usize..pc as usize + size as usize]); - } - } - - // assert!( nfree >= 0 ); - // if( data[hdr+7]+cbrk-iCellFirst!=pPage->nFree ){ - // return SQLITE_CORRUPT_PAGE(pPage); - // } - assert!(cbrk >= first_cell); - let write_buf = page.as_ptr(); - - // set new first byte of cell content - page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, cbrk as u16); - // set free block to 0, unused space can be retrieved from gap between cell pointer end and content start - page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); - // set fragmented bytes counter to zero - page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0); - // set unused space to 0 - let first_cell = cloned_page.cell_content_area() as u64; - assert!(first_cell <= cbrk); - write_buf[first_cell as usize..cbrk as usize].fill(0); - } - - /// Free blocks can be zero, meaning the "real free space" that can be used to allocate is expected to be between first cell byte - /// and end of cell pointer area. - #[allow(unused_assignments)] - fn compute_free_space(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) -> u16 { - // TODO(pere): maybe free space is not calculated correctly with offset - - // Usable space, not the same as free space, simply means: - // space that is not reserved for extensions by sqlite. Usually reserved_space is 0. - let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize; - - let mut cell_content_area_start = page.cell_content_area(); - // A zero value for the cell content area pointer is interpreted as 65536. - // See https://www.sqlite.org/fileformat.html - // The max page size for a sqlite database is 64kiB i.e. 65536 bytes. - // 65536 is u16::MAX + 1, and since cell content grows from right to left, this means - // the cell content area pointer is at the end of the page, - // i.e. - // 1. the page size is 64kiB - // 2. there are no cells on the page - // 3. there is no reserved space at the end of the page - if cell_content_area_start == 0 { - cell_content_area_start = u16::MAX; - } - - // The amount of free space is the sum of: - // #1. the size of the unallocated region - // #2. fragments (isolated 1-3 byte chunks of free space within the cell content area) - // #3. freeblocks (linked list of blocks of at least 4 bytes within the cell content area that are not in use due to e.g. deletions) - - let mut free_space_bytes = - page.unallocated_region_size() + page.num_frag_free_bytes() as usize; - - // #3 is computed by iterating over the freeblocks linked list - let mut cur_freeblock_ptr = page.first_freeblock() as usize; - let page_buf = page.as_ptr(); - if cur_freeblock_ptr > 0 { - if cur_freeblock_ptr < cell_content_area_start as usize { - // Freeblocks exist in the cell content area e.g. after deletions - // They should never exist in the unused area of the page.
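Free space on a page is the unallocated region plus the fragment counter plus the freeblock chain, where each freeblock starts with two big-endian u16s: the offset of the next freeblock and the size of this one. A minimal sketch of the chain walk over a raw page buffer (freeblock_bytes is a hypothetical helper; unlike the loop below, it reports an out-of-order chain as None instead of breaking out):

    /// Sum the sizes of all freeblocks, starting from the offset stored in
    /// the page header. Returns None if the chain is not ascending.
    fn freeblock_bytes(page: &[u8], mut ptr: usize) -> Option<usize> {
        let mut total = 0;
        while ptr != 0 {
            let next = u16::from_be_bytes([page[ptr], page[ptr + 1]]) as usize;
            let size = u16::from_be_bytes([page[ptr + 2], page[ptr + 3]]) as usize;
            total += size;
            // Freeblocks must be ordered left to right and non-overlapping.
            if next != 0 && next < ptr + size + 4 {
                return None;
            }
            ptr = next;
        }
        Some(total)
    }

    fn main() {
        let mut page = vec![0u8; 512];
        // Freeblock at 100: next = 200, size = 20. Freeblock at 200: last (next = 0), size = 8.
        page[100..102].copy_from_slice(&200u16.to_be_bytes());
        page[102..104].copy_from_slice(&20u16.to_be_bytes());
        page[202..204].copy_from_slice(&8u16.to_be_bytes());
        assert_eq!(freeblock_bytes(&page, 100), Some(28));
    }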
- todo!("corrupted page"); - } - - let mut next = 0; - let mut size = 0; - loop { - // TODO: check corruption icellast - next = u16::from_be_bytes( - page_buf[cur_freeblock_ptr..cur_freeblock_ptr + 2] - .try_into() - .unwrap(), - ) as usize; // first 2 bytes in freeblock = next freeblock pointer - size = u16::from_be_bytes( - page_buf[cur_freeblock_ptr + 2..cur_freeblock_ptr + 4] - .try_into() - .unwrap(), - ) as usize; // next 2 bytes in freeblock = size of current freeblock - free_space_bytes += size; - // Freeblocks are in order from left to right on the page, - // so next pointer should > current pointer + its size, or 0 if no next block exists. - if next <= cur_freeblock_ptr + size + 3 { - break; - } - cur_freeblock_ptr = next; - } - - // Next should always be 0 (NULL) at this point since we have reached the end of the freeblocks linked list - assert_eq!( - next, 0, - "corrupted page: freeblocks list not in ascending order" - ); - - assert!( - cur_freeblock_ptr + size <= usable_space, - "corrupted page: last freeblock extends last page end" - ); - } - - assert!( - free_space_bytes <= usable_space, - "corrupted page: free space is greater than usable space" - ); - - // if( nFree>usableSize || nFree, - cell_payload: &mut Vec, - record: &Record, - ) { - assert!(matches!( - page_type, - PageType::TableLeaf | PageType::IndexLeaf - )); - // TODO: make record raw from start, having to serialize is not good - let mut record_buf = Vec::new(); - record.serialize(&mut record_buf); - - // fill in header - if matches!(page_type, PageType::TableLeaf) { - let int_key = int_key.unwrap(); - write_varint_to_vec(record_buf.len() as u64, cell_payload); - write_varint_to_vec(int_key, cell_payload); - } else { - write_varint_to_vec(record_buf.len() as u64, cell_payload); - } - - let payload_overflow_threshold_max = self.payload_overflow_threshold_max(page_type.clone()); - debug!( - "fill_cell_payload(record_size={}, payload_overflow_threshold_max={})", - record_buf.len(), - payload_overflow_threshold_max - ); - if record_buf.len() <= payload_overflow_threshold_max { - // enough allowed space to fit inside a btree page - cell_payload.extend_from_slice(record_buf.as_slice()); - return; - } - debug!("fill_cell_payload(overflow)"); - - let payload_overflow_threshold_min = self.payload_overflow_threshold_min(page_type); - // see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371 - let mut space_left = payload_overflow_threshold_min - + (record_buf.len() - payload_overflow_threshold_min) % (self.usable_space() - 4); - - if space_left > payload_overflow_threshold_max { - space_left = payload_overflow_threshold_min; - } - - // cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page. 
- let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page - let mut to_copy_buffer = record_buf.as_slice(); - - let prev_size = cell_payload.len(); - cell_payload.resize(prev_size + space_left + 4, 0); - let mut pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) }; - let mut pointer_to_next = unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) }; - let mut overflow_pages = Vec::new(); - - loop { - let to_copy = space_left.min(to_copy_buffer.len()); - unsafe { std::ptr::copy(to_copy_buffer.as_ptr(), pointer, to_copy) }; - - let left = to_copy_buffer.len() - to_copy; - if left == 0 { - break; - } - - // we still have bytes to add, we will need to allocate new overflow page - let overflow_page = self.allocate_overflow_page(); - overflow_pages.push(overflow_page.clone()); - { - let id = overflow_page.get().id as u32; - let contents = overflow_page.get().contents.as_mut().unwrap(); - - // TODO: take into account offset here? - let buf = contents.as_ptr(); - let as_bytes = id.to_be_bytes(); - // update pointer to new overflow page - unsafe { std::ptr::copy(as_bytes.as_ptr(), pointer_to_next, 4) }; - - pointer = unsafe { buf.as_mut_ptr().add(4) }; - pointer_to_next = buf.as_mut_ptr(); - space_left = self.usable_space() - 4; - } - - to_copy_buffer = &to_copy_buffer[to_copy..]; - } - - assert_eq!(cell_size, cell_payload.len()); - } - - /// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages. - /// - /// For table leaf pages: X = usable_size - 35 - /// For index pages: X = ((usable_size - 12) * 64/255) - 23 - /// - /// The usable size is the total page size less the reserved space at the end of each page. - /// These thresholds are designed to: - /// - Give a minimum fanout of 4 for index b-trees - /// - Ensure enough payload is on the b-tree page that the record header can usually be accessed - /// without consulting an overflow page - fn payload_overflow_threshold_max(&self, page_type: PageType) -> usize { - let usable_size = self.usable_space(); - match page_type { - PageType::IndexInterior | PageType::IndexLeaf => { - ((usable_size - 12) * 64 / 255) - 23 // Index page formula - } - PageType::TableInterior | PageType::TableLeaf => { - usable_size - 35 // Table leaf page formula - } - } - } - - /// Returns the minimum payload size (M) that must be stored on the b-tree page before spilling to overflow pages is allowed. - /// - /// For all page types: M = ((usable_size - 12) * 32/255) - 23 - /// - /// When payload size P exceeds max_local(): - /// - If K = M + ((P-M) % (usable_size-4)) <= max_local(): store K bytes on page - /// - Otherwise: store M bytes on page - /// - /// The remaining bytes are stored on overflow pages in both cases. - fn payload_overflow_threshold_min(&self, _page_type: PageType) -> usize { - let usable_size = self.usable_space(); - // Same formula for all page types - ((usable_size - 12) * 32 / 255) - 23 - } - /// The "usable size" of a database page is the page size specified by the 2-byte integer at offset 16 /// in the header, minus the "reserved" space size recorded in the 1-byte integer at offset 20 in the header. /// The usable size of a page might be an odd number. However, the usable size is not allowed to be less than 480. 
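Reviewer note: a minimal sketch (not part of the patch) of the usable-size rule restated by the doc comment above, assuming `page_size` and `reserved_space` were read from byte offsets 16 and 20 of the database header. The header encodes a 65536-byte page as the value 1, hence the widening to `u32` here.

```rust
/// Usable size = page size minus reserved space; SQLite forbids values below 480.
fn usable_size(page_size: u32, reserved_space: u8) -> u32 {
    let usable = page_size - reserved_space as u32;
    assert!(usable >= 480, "usable size is not allowed to be less than 480");
    usable
}
```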
@@ -1852,8 +1558,8 @@ impl BTreeCursor { .cell_get( cell_idx, self.pager.clone(), - self.payload_overflow_threshold_max(page.page_type()), - self.payload_overflow_threshold_min(page.page_type()), + payload_overflow_threshold_max(page.page_type(), self.usable_space() as u16), + payload_overflow_threshold_min(page.page_type(), self.usable_space() as u16), self.usable_space(), ) .unwrap() @@ -1993,8 +1699,8 @@ impl BTreeCursor { let cell = contents.cell_get( idx, self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), + payload_overflow_threshold_max(contents.page_type(), self.usable_space() as u16), + payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16), self.usable_space(), )?; @@ -2015,8 +1721,8 @@ impl BTreeCursor { let cell = contents.cell_get( cell_idx, self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), + payload_overflow_threshold_max(contents.page_type(), self.usable_space() as u16), + payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16), self.usable_space(), )?; @@ -2064,8 +1770,14 @@ impl BTreeCursor { let predecessor_cell = leaf_contents.cell_get( leaf_cell_idx, self.pager.clone(), - self.payload_overflow_threshold_max(leaf_contents.page_type()), - self.payload_overflow_threshold_min(leaf_contents.page_type()), + payload_overflow_threshold_max( + leaf_contents.page_type(), + self.usable_space() as u16, + ), + payload_overflow_threshold_min( + leaf_contents.page_type(), + self.usable_space() as u16, + ), self.usable_space(), )?; @@ -2081,11 +1793,17 @@ impl BTreeCursor { } _ => unreachable!("Expected table leaf cell"), } - self.insert_into_cell(contents, &cell_payload, cell_idx); - self.drop_cell(contents, cell_idx); + insert_into_cell( + contents, + &cell_payload, + cell_idx, + self.usable_space() as u16, + ) + .unwrap(); + drop_cell(contents, cell_idx, self.usable_space() as u16)?; } else { // For leaf nodes, simply remove the cell - self.drop_cell(contents, cell_idx); + drop_cell(contents, cell_idx, self.usable_space() as u16)?; } // TODO(Krishna): Implement balance after delete. I will implement after balance_nonroot is extended. @@ -2124,8 +1842,8 @@ impl BTreeCursor { let equals = match &contents.cell_get( cell_idx, self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), + payload_overflow_threshold_max(contents.page_type(), self.usable_space() as u16), + payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16), self.usable_space(), )? { BTreeCell::TableLeafCell(l) => l._rowid == int_key, @@ -2213,8 +1931,10 @@ impl BTreeCursor { payload_len: usize, page_type: PageType, ) -> Result> { - let max_local = self.payload_overflow_threshold_max(page_type.clone()); - let min_local = self.payload_overflow_threshold_min(page_type.clone()); + let max_local = + payload_overflow_threshold_max(page_type.clone(), self.usable_space() as u16); + let min_local = + payload_overflow_threshold_min(page_type.clone(), self.usable_space() as u16); let usable_size = self.usable_space(); let (_, local_size) = payload_overflows(payload_len, max_local, min_local, usable_size); @@ -2240,7 +1960,7 @@ impl PageStack { /// Push a new page onto the stack. /// This effectively means traversing to a child page. 
    fn push(&self, page: PageRef) {
-        debug!(
+        tracing::trace!(
            "pagestack::push(current={}, new_page_id={})",
            self.current_page.borrow(),
            page.get().id
@@ -2259,7 +1979,7 @@
    /// This effectively means traversing back up to a parent page.
    fn pop(&self) {
        let current = *self.current_page.borrow();
-        debug!("pagestack::pop(current={})", current);
+        tracing::trace!("pagestack::pop(current={})", current);
        self.cell_indices.borrow_mut()[current as usize] = 0;
        self.stack.borrow_mut()[current as usize] = None;
        *self.current_page.borrow_mut() -= 1;
@@ -2273,7 +1993,7 @@
            .as_ref()
            .unwrap()
            .clone();
-        debug!(
+        tracing::trace!(
            "pagestack::top(current={}, page_id={})",
            current,
            page.get().id
@@ -2281,15 +2001,6 @@
        page
    }

-    /// Get the parent page of the current page.
-    fn parent(&self) -> PageRef {
-        let current = *self.current_page.borrow();
-        self.stack.borrow()[current as usize - 1]
-            .as_ref()
-            .unwrap()
-            .clone()
-    }
-
    /// Current page pointer being used
    fn current(&self) -> usize {
        *self.current_page.borrow() as usize
@@ -2309,13 +2020,16 @@
    }

    /// Advance the current cell index of the current page to the next cell.
+    /// We usually advance after traversing to a new page.
    fn advance(&self) {
        let current = self.current();
+        tracing::trace!("advance {}", self.cell_indices.borrow()[current]);
        self.cell_indices.borrow_mut()[current] += 1;
    }

    fn retreat(&self) {
        let current = self.current();
+        tracing::trace!("retreat {}", self.cell_indices.borrow()[current]);
        self.cell_indices.borrow_mut()[current] -= 1;
    }

@@ -2333,12 +2047,68 @@
    }
}

-pub fn btree_init_page(
-    page: &PageRef,
-    page_type: PageType,
-    db_header: &DatabaseHeader,
-    offset: usize,
-) {
+impl CellArray {
+    pub fn cell_size(&self, cell_idx: usize) -> u16 {
+        self.cells[cell_idx].len() as u16
+    }
+
+    pub fn cell_count(&self, page_idx: usize) -> usize {
+        self.number_of_cells_per_page[page_idx] as usize
+    }
+}
+
+/// Try to find an available free block and allocate it if one is found.
+fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> Result<usize> {
+    // NOTE: freelist is in ascending order of keys and pc
+    // reserved bytes at the end of the page are not usable, so we must subtract them from maxpc
+    let mut pc = page_ref.first_freeblock() as usize;
+    let mut prev_pc = page_ref.offset + PAGE_HEADER_OFFSET_FIRST_FREEBLOCK;
+
+    let buf = page_ref.as_ptr();
+
+    let usable_space = usable_space as usize;
+    let maxpc = usable_space - amount;
+    while pc <= maxpc {
+        let next = u16::from_be_bytes(buf[pc..pc + 2].try_into().unwrap());
+        let size = u16::from_be_bytes(buf[pc + 2..pc + 4].try_into().unwrap());
+        if amount <= size as usize {
+            if amount == size as usize {
+                // delete whole thing
+                page_ref.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, next);
+            } else {
+                // take only the part we are interested in by reducing the size
+                let new_size = size - amount as u16;
+                // size includes the 4 header bytes of the freeblock, and a block
+                // must keep at least 4 bytes to remain in the free list
+                if new_size >= 4 {
+                    buf[pc + 2..pc + 4].copy_from_slice(&new_size.to_be_bytes());
+                } else {
+                    // increase fragment size and delete entry from free list
+                    buf[prev_pc..prev_pc + 2].copy_from_slice(&next.to_be_bytes());
+                    let frag = page_ref.num_frag_free_bytes() + new_size as u8;
+                    page_ref.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag);
+                }
+                pc += new_size as usize;
+            }
+            return Ok(pc);
+        }
+        prev_pc = pc;
+        pc = next as usize;
+        if pc <= prev_pc && pc != 0 {
+            return
Err(LimboError::Corrupt(
+                "Free list not in ascending order".into(),
+            ));
+        }
+    }
+    if pc > maxpc + amount - 4 {
+        return Err(LimboError::Corrupt(
+            "Free block chain extends beyond page end".into(),
+        ));
+    }
+    Ok(0)
+}
+
+pub fn btree_init_page(page: &PageRef, page_type: PageType, offset: usize, usable_space: u16) {
    // setup btree page
    let contents = page.get();
    debug!("btree_init_page(id={}, offset={})", contents.id, offset);
@@ -2349,18 +2119,656 @@
    contents.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
    contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0);

-    let cell_content_area_start = db_header.page_size - db_header.reserved_space as u16;
-    contents.write_u16(
-        PAGE_HEADER_OFFSET_CELL_CONTENT_AREA,
-        cell_content_area_start,
-    );
+    contents.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, usable_space);

    contents.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0);
    contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, 0);
}

-fn to_static_buf(buf: &[u8]) -> &'static [u8] {
-    unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) }
+fn to_static_buf(buf: &mut [u8]) -> &'static mut [u8] {
+    unsafe { std::mem::transmute::<&mut [u8], &'static mut [u8]>(buf) }
+}
+
+fn edit_page(
+    page: &mut PageContent,
+    start_old_cells: usize,
+    start_new_cells: usize,
+    number_new_cells: usize,
+    cell_array: &CellArray,
+    usable_space: u16,
+) -> Result<()> {
+    tracing::trace!(
+        "edit_page start_old_cells={} start_new_cells={} number_new_cells={} cell_array={}",
+        start_old_cells,
+        start_new_cells,
+        number_new_cells,
+        cell_array.cells.len()
+    );
+    let end_old_cells = start_old_cells + page.cell_count() + page.overflow_cells.len();
+    let end_new_cells = start_new_cells + number_new_cells;
+    let mut count_cells = page.cell_count();
+    if start_old_cells < start_new_cells {
+        let number_to_shift = page_free_array(
+            page,
+            start_old_cells,
+            start_new_cells - start_old_cells,
+            cell_array,
+            usable_space,
+        )?;
+        // shift pointers left
+        let buf = page.as_ptr();
+        let (start, _) = page.cell_pointer_array_offset_and_size();
+        buf.copy_within(
+            start + (number_to_shift * 2)..start + (count_cells * 2),
+            start,
+        );
+        count_cells -= number_to_shift;
+    }
+    if end_new_cells < end_old_cells {
+        let number_tail_removed = page_free_array(
+            page,
+            end_new_cells,
+            end_old_cells - end_new_cells,
+            cell_array,
+            usable_space,
+        )?;
+        assert!(count_cells >= number_tail_removed);
+        count_cells -= number_tail_removed;
+    }
+    // TODO: make page_free_array defragment; for now this simpler approach works.
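+    // Review note (added commentary): at this point every cell outside the new window
+    // [start_new_cells, start_new_cells + number_new_cells) has been freed from this
+    // page, so defragmenting packs the surviving cells against the end of the page and
+    // leaves one contiguous gap for the page_insert_array() calls below.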
+    defragment_page(page, usable_space);
+    // TODO: add to start
+    if start_new_cells < start_old_cells {
+        let count = number_new_cells.min(start_old_cells - start_new_cells);
+        page_insert_array(page, start_new_cells, count, cell_array, 0, usable_space)?;
+        count_cells += count;
+    }
+    // TODO: overflow cells
+    for i in 0..page.overflow_cells.len() {
+        let overflow_cell = &page.overflow_cells[i];
+        // cell index in context of new list of cells that should be in the page
+        if start_old_cells + overflow_cell.index >= start_new_cells {
+            let cell_idx = start_old_cells + overflow_cell.index - start_new_cells;
+            if cell_idx < number_new_cells {
+                count_cells += 1;
+                page_insert_array(
+                    page,
+                    start_new_cells + cell_idx,
+                    1,
+                    cell_array,
+                    cell_idx,
+                    usable_space,
+                )?;
+            }
+        }
+    }
+    // TODO: append cells to end
+    page_insert_array(
+        page,
+        start_new_cells + count_cells,
+        number_new_cells - count_cells,
+        cell_array,
+        count_cells,
+        usable_space,
+    )?;
+    // TODO: noverflow
+    page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, number_new_cells as u16);
+    Ok(())
+}
+
+fn page_free_array(
+    page: &mut PageContent,
+    first: usize,
+    count: usize,
+    cell_array: &CellArray,
+    usable_space: u16,
+) -> Result<usize> {
+    let buf = &mut page.as_ptr()[page.offset..usable_space as usize];
+    let buf_range = buf.as_ptr_range();
+    let mut number_of_cells_removed = 0;
+    // TODO: implement a smarter free-block coalescing procedure instead of naively
+    // freeing and then defragmenting
+    for i in first..first + count {
+        let cell = &cell_array.cells[i];
+        let cell_pointer = cell.as_ptr_range();
+        // check if not overflow cell
+        if cell_pointer.start >= buf_range.start && cell_pointer.start < buf_range.end {
+            assert!(
+                cell_pointer.end >= buf_range.start && cell_pointer.end <= buf_range.end,
+                "whole cell should be inside the page"
+            );
+            // TODO: remove pointer too
+            let offset = (cell_pointer.start as usize - buf_range.start as usize) as u16;
+            let len = (cell_pointer.end as usize - cell_pointer.start as usize) as u16;
+            free_cell_range(page, offset, len, usable_space)?;
+            page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
+            number_of_cells_removed += 1;
+        }
+    }
+    Ok(number_of_cells_removed)
+}
+fn page_insert_array(
+    page: &mut PageContent,
+    first: usize,
+    count: usize,
+    cell_array: &CellArray,
+    mut start_insert: usize,
+    usable_space: u16,
+) -> Result<()> {
+    // TODO: implement faster algorithm, this is doing extra work that's not needed.
+    // See pageInsertArray to understand faster way.
+    for i in first..first + count {
+        insert_into_cell(page, cell_array.cells[i], start_insert, usable_space)?;
+        start_insert += 1;
+    }
+    Ok(())
+}
+
+/// Free the range of bytes that a cell occupies.
+/// This function also updates the freeblock list in the page.
+/// Freeblocks are used to keep track of free space in the page,
+/// and are organized as a linked list.
+fn free_cell_range(
+    page: &mut PageContent,
+    mut offset: u16,
+    len: u16,
+    usable_space: u16,
+) -> Result<()> {
+    let mut size = len;
+    let mut end = offset + len;
+    let mut pointer_to_pc = page.offset as u16 + 1;
+    // if the freeblock list is empty, we set this block as the first freeblock in the page header.
+    let pc = if page.first_freeblock() == 0 {
+        0
+    } else {
+        // if the freeblock list is not empty, and the offset is greater than the first freeblock,
+        // then we need to do some more calculation to figure out where to insert the freeblock
+        // in the freeblock linked list.
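+        // Worked example (added commentary): with freeblocks at 1000 and 2000, freeing a
+        // cell at offset 1500 walks the list until pc (2000) passes the new offset; the
+        // freed range is then linked between the two blocks, and is coalesced with a
+        // neighbor whenever the gap separating them is 3 bytes or less.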
+ let first_block = page.first_freeblock(); + + let mut pc = first_block; + + while pc < offset { + if pc <= pointer_to_pc { + if pc == 0 { + break; + } + return Err(LimboError::Corrupt( + "free cell range free block not in ascending order".into(), + )); + } + + let next = page.read_u16_no_offset(pc as usize); + pointer_to_pc = pc; + pc = next; + } + + if pc > usable_space - 4 { + return Err(LimboError::Corrupt("Free block beyond usable space".into())); + } + let mut removed_fragmentation = 0; + if pc > 0 && offset + len + 3 >= pc { + removed_fragmentation = (pc - end) as u8; + + if end > pc { + return Err(LimboError::Corrupt("Invalid block overlap".into())); + } + end = pc + page.read_u16_no_offset(pc as usize + 2); + if end > usable_space { + return Err(LimboError::Corrupt( + "Coalesced block extends beyond page".into(), + )); + } + size = end - offset; + pc = page.read_u16_no_offset(pc as usize); + } + + if pointer_to_pc > page.offset as u16 + 1 { + let prev_end = pointer_to_pc + page.read_u16_no_offset(pointer_to_pc as usize + 2); + if prev_end + 3 >= offset { + if prev_end > offset { + return Err(LimboError::Corrupt("Invalid previous block overlap".into())); + } + removed_fragmentation += (offset - prev_end) as u8; + size = end - pointer_to_pc; + offset = pointer_to_pc; + } + } + if removed_fragmentation > page.num_frag_free_bytes() { + return Err(LimboError::Corrupt("Invalid fragmentation count".into())); + } + let frag = page.num_frag_free_bytes() - removed_fragmentation; + page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag); + + pc + }; + + if offset <= page.cell_content_area() { + if offset < page.cell_content_area() { + return Err(LimboError::Corrupt("Free block before content area".into())); + } + if pointer_to_pc != page.offset as u16 + PAGE_HEADER_OFFSET_FIRST_FREEBLOCK as u16 { + return Err(LimboError::Corrupt("Invalid content area merge".into())); + } + page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, pc); + page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, end); + } else { + page.write_u16_no_offset(pointer_to_pc as usize, offset); + page.write_u16_no_offset(offset as usize, pc); + page.write_u16_no_offset(offset as usize + 2, size); + } + Ok(()) +} + +/// Defragment a page. This means packing all the cells to the end of the page. 
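+/// A sketch of the resulting invariants (reviewer-added; `total_cell_bytes` is a
+/// hypothetical name for the sum of all cell sizes on the page):
+/// ```ignore
+/// defragment_page(page, usable_space);
+/// assert_eq!(page.first_freeblock(), 0);
+/// assert_eq!(page.num_frag_free_bytes(), 0);
+/// assert_eq!(page.cell_content_area(), usable_space - total_cell_bytes);
+/// ```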
+fn defragment_page(page: &PageContent, usable_space: u16) {
+    tracing::debug!("defragment_page");
+    let cloned_page = page.clone();
+    // TODO(pere): usable space should include offset probably
+    let mut cbrk = usable_space;
+
+    // TODO: implement fast algorithm
+
+    let last_cell = usable_space - 4;
+    let first_cell = cloned_page.unallocated_region_start() as u16;
+
+    if cloned_page.cell_count() > 0 {
+        let read_buf = cloned_page.as_ptr();
+        let write_buf = page.as_ptr();
+
+        for i in 0..cloned_page.cell_count() {
+            let (cell_offset, _) = page.cell_pointer_array_offset_and_size();
+            let cell_idx = cell_offset + (i * 2);
+
+            let pc = cloned_page.read_u16_no_offset(cell_idx);
+            if pc > last_cell {
+                unimplemented!("corrupted page");
+            }
+
+            assert!(pc <= last_cell);
+
+            let (_, size) = cloned_page.cell_get_raw_region(
+                i,
+                payload_overflow_threshold_max(page.page_type(), usable_space),
+                payload_overflow_threshold_min(page.page_type(), usable_space),
+                usable_space as usize,
+            );
+            let size = size as u16;
+            cbrk -= size;
+            if cbrk < first_cell || pc + size > usable_space {
+                todo!("corrupt");
+            }
+            assert!(cbrk + size <= usable_space && cbrk >= first_cell);
+            // set new pointer
+            page.write_u16_no_offset(cell_idx, cbrk);
+            // copy payload
+            write_buf[cbrk as usize..cbrk as usize + size as usize]
+                .copy_from_slice(&read_buf[pc as usize..pc as usize + size as usize]);
+        }
+    }
+
+    // assert!( nfree >= 0 );
+    // if( data[hdr+7]+cbrk-iCellFirst!=pPage->nFree ){
+    //   return SQLITE_CORRUPT_PAGE(pPage);
+    // }
+    assert!(cbrk >= first_cell);
+
+    // set new first byte of cell content
+    page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, cbrk as u16);
+    // set free block to 0; unused space can be retrieved from the gap between the end of the cell pointer array and the content start
+    page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
+    page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0);
+}
+
+/// Insert a record into a cell.
+/// If the cell overflows, an overflow cell is created.
+/// insert_into_cell() is called from insert_into_page(),
+/// and the overflow cell count is used to determine if the page overflows,
+/// i.e. whether we need to balance the btree after the insert.
+fn insert_into_cell(
+    page: &mut PageContent,
+    payload: &[u8],
+    cell_idx: usize,
+    usable_space: u16,
+) -> Result<()> {
+    assert!(
+        cell_idx <= page.cell_count(),
+        "attempting to add cell to an incorrect place cell_idx={} cell_count={}",
+        cell_idx,
+        page.cell_count()
+    );
+    let free = compute_free_space(page, usable_space);
+    const CELL_POINTER_SIZE_BYTES: usize = 2;
+    let enough_space = payload.len() + CELL_POINTER_SIZE_BYTES <= free as usize;
+    if !enough_space {
+        // add to overflow cell
+        page.overflow_cells.push(OverflowCell {
+            index: cell_idx,
+            payload: Pin::new(Vec::from(payload)),
+        });
+        return Ok(());
+    }
+
+    // TODO: insert into cell payload in internal page
+    let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space)?;
+    let buf = page.as_ptr();
+
+    // copy data
+    buf[new_cell_data_pointer as usize..new_cell_data_pointer as usize + payload.len()]
+        .copy_from_slice(payload);
+    //  memmove(pIns+2, pIns, 2*(pPage->nCell - i));
+    let (cell_pointer_array_start, _) = page.cell_pointer_array_offset_and_size();
+    let cell_pointer_cur_idx = cell_pointer_array_start + (CELL_POINTER_SIZE_BYTES * cell_idx);
+
+    // move existing pointers forward by CELL_POINTER_SIZE_BYTES...
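+    // (added example: with 3 cells and cell_idx=1, pointer slots [1, 2] are copied to
+    // [2, 3], and the new cell's offset is then written into slot 1)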
+ let n_cells_forward = page.cell_count() - cell_idx; + let n_bytes_forward = CELL_POINTER_SIZE_BYTES * n_cells_forward; + if n_bytes_forward > 0 { + buf.copy_within( + cell_pointer_cur_idx..cell_pointer_cur_idx + n_bytes_forward, + cell_pointer_cur_idx + CELL_POINTER_SIZE_BYTES, + ); + } + // ...and insert new cell pointer at the current index + page.write_u16_no_offset(cell_pointer_cur_idx, new_cell_data_pointer); + + // update cell count + let new_n_cells = (page.cell_count() + 1) as u16; + page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells); + Ok(()) +} + +/// Free blocks can be zero, meaning the "real free space" that can be used to allocate is expected to be between first cell byte +/// and end of cell pointer area. +#[allow(unused_assignments)] +fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 { + // TODO(pere): maybe free space is not calculated correctly with offset + + // Usable space, not the same as free space, simply means: + // space that is not reserved for extensions by sqlite. Usually reserved_space is 0. + let usable_space = usable_space as usize; + + let mut cell_content_area_start = page.cell_content_area(); + // A zero value for the cell content area pointer is interpreted as 65536. + // See https://www.sqlite.org/fileformat.html + // The max page size for a sqlite database is 64kiB i.e. 65536 bytes. + // 65536 is u16::MAX + 1, and since cell content grows from right to left, this means + // the cell content area pointer is at the end of the page, + // i.e. + // 1. the page size is 64kiB + // 2. there are no cells on the page + // 3. there is no reserved space at the end of the page + if cell_content_area_start == 0 { + cell_content_area_start = u16::MAX; + } + + // The amount of free space is the sum of: + // #1. the size of the unallocated region + // #2. fragments (isolated 1-3 byte chunks of free space within the cell content area) + // #3. freeblocks (linked list of blocks of at least 4 bytes within the cell content area that are not in use due to e.g. deletions) + + let pointer_size = if matches!(page.page_type(), PageType::TableLeaf | PageType::IndexLeaf) { + 0 + } else { + 4 + }; + let first_cell = page.offset + 8 + pointer_size + (2 * page.cell_count()); + let mut free_space_bytes = + cell_content_area_start as usize + page.num_frag_free_bytes() as usize; + + // #3 is computed by iterating over the freeblocks linked list + let mut cur_freeblock_ptr = page.first_freeblock() as usize; + if cur_freeblock_ptr > 0 { + if cur_freeblock_ptr < cell_content_area_start as usize { + // Freeblocks exist in the cell content area e.g. after deletions + // They should never exist in the unused area of the page. + todo!("corrupted page"); + } + + let mut next = 0; + let mut size = 0; + loop { + // TODO: check corruption icellast + next = page.read_u16_no_offset(cur_freeblock_ptr) as usize; // first 2 bytes in freeblock = next freeblock pointer + size = page.read_u16_no_offset(cur_freeblock_ptr + 2) as usize; // next 2 bytes in freeblock = size of current freeblock + free_space_bytes += size; + // Freeblocks are in order from left to right on the page, + // so next pointer should > current pointer + its size, or 0 if no next block exists. 
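+            // Added note: a well-formed successor starts at least 4 bytes (the minimum
+            // freeblock size) past the current block, i.e. next >= cur_freeblock_ptr +
+            // size + 4; anything smaller is either the terminating 0 or a corrupt list.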
+            if next <= cur_freeblock_ptr + size + 3 {
+                break;
+            }
+            cur_freeblock_ptr = next;
+        }
+
+        // Next should always be 0 (NULL) at this point since we have reached the end of the freeblocks linked list
+        assert!(
+            next == 0,
+            "corrupted page: freeblocks list not in ascending order"
+        );
+
+        assert!(
+            cur_freeblock_ptr + size <= usable_space,
+            "corrupted page: last freeblock extends past the page end"
+        );
+    }
+
+    assert!(
+        free_space_bytes <= usable_space,
+        "corrupted page: free space is greater than usable space"
+    );
+
+    // if( nFree>usableSize || nFree<iCellFirst ){
+    //   return SQLITE_CORRUPT_PAGE(pPage);
+    // }
+    free_space_bytes as u16 - first_cell as u16
+}
+
+/// Allocate space for a cell on a page.
+fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -> Result<u16> {
+    let amount = amount as usize;
+
+    let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size();
+    let gap = cell_offset + 2 * page_ref.cell_count();
+    let mut top = page_ref.cell_content_area() as usize;
+
+    // there are free blocks and enough space
+    if page_ref.first_freeblock() != 0 && gap + 2 <= top {
+        // find slot
+        let pc = find_free_cell(page_ref, usable_space, amount)?;
+        if pc != 0 {
+            return Ok(pc as u16);
+        }
+        /* fall through, we might need to defragment */
+    }
+
+    if gap + 2 + amount > top {
+        // defragment
+        defragment_page(page_ref, usable_space);
+        top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize;
+    }
+
+    top -= amount;
+
+    page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16);
+
+    assert!(top + amount <= usable_space as usize);
+    Ok(top as u16)
+}
+
+/// Fill in the cell payload with the record.
+/// If the record is too large to fit in the cell, it will spill onto overflow pages.
+fn fill_cell_payload(
+    page_type: PageType,
+    int_key: Option<u64>,
+    cell_payload: &mut Vec<u8>,
+    record: &Record,
+    usable_space: u16,
+    pager: Rc<Pager>,
+) {
+    assert!(matches!(
+        page_type,
+        PageType::TableLeaf | PageType::IndexLeaf
+    ));
+    // TODO: make record raw from start, having to serialize is not good
+    let mut record_buf = Vec::new();
+    record.serialize(&mut record_buf);
+
+    // fill in header
+    if matches!(page_type, PageType::TableLeaf) {
+        let int_key = int_key.unwrap();
+        write_varint_to_vec(record_buf.len() as u64, cell_payload);
+        write_varint_to_vec(int_key, cell_payload);
+    } else {
+        write_varint_to_vec(record_buf.len() as u64, cell_payload);
+    }
+
+    let payload_overflow_threshold_max =
+        payload_overflow_threshold_max(page_type.clone(), usable_space);
+    debug!(
+        "fill_cell_payload(record_size={}, payload_overflow_threshold_max={})",
+        record_buf.len(),
+        payload_overflow_threshold_max
+    );
+    if record_buf.len() <= payload_overflow_threshold_max {
+        // enough allowed space to fit inside a btree page
+        cell_payload.extend_from_slice(record_buf.as_slice());
+        return;
+    }
+    debug!("fill_cell_payload(overflow)");
+
+    let payload_overflow_threshold_min = payload_overflow_threshold_min(page_type, usable_space);
+    // see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371
+    let mut space_left = payload_overflow_threshold_min
+        + (record_buf.len() - payload_overflow_threshold_min) % (usable_space as usize - 4);
+
+    if space_left > payload_overflow_threshold_max {
+        space_left = payload_overflow_threshold_min;
+    }
+
+    // cell_size must equal this initial value of space_left, as these are the bytes copied to the non-overflow page.
+ let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page + let mut to_copy_buffer = record_buf.as_slice(); + + let prev_size = cell_payload.len(); + cell_payload.resize(prev_size + space_left + 4, 0); + let mut pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) }; + let mut pointer_to_next = unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) }; + let mut overflow_pages = Vec::new(); + + loop { + let to_copy = space_left.min(to_copy_buffer.len()); + unsafe { std::ptr::copy(to_copy_buffer.as_ptr(), pointer, to_copy) }; + + let left = to_copy_buffer.len() - to_copy; + if left == 0 { + break; + } + + // we still have bytes to add, we will need to allocate new overflow page + let overflow_page = allocate_overflow_page(pager.clone()); + overflow_pages.push(overflow_page.clone()); + { + let id = overflow_page.get().id as u32; + let contents = overflow_page.get().contents.as_mut().unwrap(); + + // TODO: take into account offset here? + let buf = contents.as_ptr(); + let as_bytes = id.to_be_bytes(); + // update pointer to new overflow page + unsafe { std::ptr::copy(as_bytes.as_ptr(), pointer_to_next, 4) }; + + pointer = unsafe { buf.as_mut_ptr().add(4) }; + pointer_to_next = buf.as_mut_ptr(); + space_left = usable_space as usize - 4; + } + + to_copy_buffer = &to_copy_buffer[to_copy..]; + } + + assert_eq!(cell_size, cell_payload.len()); +} + +/// Allocate a new overflow page. +/// This is done when a cell overflows and new space is needed. +fn allocate_overflow_page(pager: Rc) -> PageRef { + let page = pager.allocate_page().unwrap(); + + // setup overflow page + let contents = page.get().contents.as_mut().unwrap(); + let buf = contents.as_ptr(); + buf.fill(0); + + page +} + +/// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages. +/// +/// For table leaf pages: X = usable_size - 35 +/// For index pages: X = ((usable_size - 12) * 64/255) - 23 +/// +/// The usable size is the total page size less the reserved space at the end of each page. +/// These thresholds are designed to: +/// - Give a minimum fanout of 4 for index b-trees +/// - Ensure enough payload is on the b-tree page that the record header can usually be accessed +/// without consulting an overflow page +fn payload_overflow_threshold_max(page_type: PageType, usable_space: u16) -> usize { + match page_type { + PageType::IndexInterior | PageType::IndexLeaf => { + ((usable_space as usize - 12) * 64 / 255) - 23 // Index page formula + } + PageType::TableInterior | PageType::TableLeaf => { + usable_space as usize - 35 // Table leaf page formula + } + } +} + +/// Returns the minimum payload size (M) that must be stored on the b-tree page before spilling to overflow pages is allowed. +/// +/// For all page types: M = ((usable_size - 12) * 32/255) - 23 +/// +/// When payload size P exceeds max_local(): +/// - If K = M + ((P-M) % (usable_size-4)) <= max_local(): store K bytes on page +/// - Otherwise: store M bytes on page +/// +/// The remaining bytes are stored on overflow pages in both cases. +fn payload_overflow_threshold_min(_page_type: PageType, usable_space: u16) -> usize { + // Same formula for all page types + ((usable_space as usize - 12) * 32 / 255) - 23 +} + +/// Drop a cell from a page. +/// This is done by freeing the range of bytes that the cell occupies. 
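+/// E.g. (reviewer-added): dropping cell 1 of 3 returns its byte range to the freeblock
+/// list and shifts pointer slot 2 down into slot 1; dropping the only remaining cell
+/// instead resets the content area and freeblock header outright.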
+fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Result<()> {
+    let (cell_start, cell_len) = page.cell_get_raw_region(
+        cell_idx,
+        payload_overflow_threshold_max(page.page_type(), usable_space),
+        payload_overflow_threshold_min(page.page_type(), usable_space),
+        usable_space as usize,
+    );
+    free_cell_range(page, cell_start as u16, cell_len as u16, usable_space)?;
+    if page.cell_count() > 1 {
+        shift_pointers_left(page, cell_idx);
+    } else {
+        page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, usable_space);
+        page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
+        page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0);
+    }
+    page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
+    Ok(())
+}
+
+/// Shift cell pointers left by one slot, starting from a given cell position.
+/// This is used after removing a cell: the pointers to its right are moved one
+/// slot left to fill the gap left by the removed cell's pointer.
+fn shift_pointers_left(page: &mut PageContent, cell_idx: usize) {
+    assert!(page.cell_count() > 0);
+    let buf = page.as_ptr();
+    let (start, _) = page.cell_pointer_array_offset_and_size();
+    let start = start + (cell_idx * 2) + 2;
+    let right_cells = page.cell_count() - cell_idx - 1;
+    let amount_to_shift = right_cells * 2;
+    buf.copy_within(start..start + amount_to_shift, start - 2);
}

#[cfg(test)]
@@ -2374,10 +2782,153 @@ mod tests {
    use crate::storage::database::FileStorage;
    use crate::storage::page_cache::DumbLruPageCache;
    use crate::storage::sqlite3_ondisk;
+    use crate::storage::sqlite3_ondisk::DatabaseHeader;
+    use crate::types::Text;
    use crate::{BufferPool, DatabaseStorage, WalFile, WalFileShared, WriteCompletion};
    use std::cell::RefCell;
+    use std::ops::Deref;
+    use std::panic;
+    use std::rc::Rc;
    use std::sync::Arc;

+    use rand::{thread_rng, Rng, RngCore, SeedableRng};
+    use rand_chacha::ChaCha8Rng;
+    use tempfile::TempDir;
+
+    use crate::{
+        io::BufferData,
+        storage::{
+            btree::{
+                compute_free_space, fill_cell_payload, payload_overflow_threshold_max,
+                payload_overflow_threshold_min,
+            },
+            pager::PageRef,
+            sqlite3_ondisk::{BTreeCell, PageContent, PageType},
+        },
+        types::{OwnedValue, Record},
+        Database, Page, Pager, PlatformIO, IO,
+    };
+
+    use super::{btree_init_page, defragment_page, drop_cell, insert_into_cell};
+
+    fn get_page(id: usize) -> PageRef {
+        let page = Arc::new(Page::new(id));
+
+        let drop_fn = Rc::new(|_| {});
+        let inner = PageContent {
+            offset: 0,
+            buffer: Rc::new(RefCell::new(Buffer::new(
+                BufferData::new(vec![0; 4096]),
+                drop_fn,
+            ))),
+            overflow_cells: Vec::new(),
+        };
+        page.get().contents.replace(inner);
+
+        btree_init_page(&page, PageType::TableLeaf, 0, 4096);
+        page
+    }
+
+    fn get_database() -> Arc<Database> {
+        let mut path = TempDir::new().unwrap().into_path();
+        path.push("test.db");
+        {
+            let connection = rusqlite::Connection::open(&path).unwrap();
+            connection
+                .pragma_update(None, "journal_mode", "wal")
+                .unwrap();
+        }
+        let io: Arc<dyn IO> = Arc::new(PlatformIO::new().unwrap());
+        let db = Database::open_file(io.clone(), path.to_str().unwrap()).unwrap();
+
+        db
+    }
+
+    fn ensure_cell(page: &mut PageContent, cell_idx: usize, payload: &Vec<u8>) {
+        let cell = page.cell_get_raw_region(
+            cell_idx,
+            payload_overflow_threshold_max(page.page_type(), 4096),
+            payload_overflow_threshold_min(page.page_type(), 4096),
+            4096,
+        );
+        tracing::trace!("cell idx={} start={} len={}", cell_idx, cell.0, cell.1);
+        let buf = &page.as_ptr()[cell.0..cell.0 + cell.1];
+        assert_eq!(buf.len(), payload.len());
+        assert_eq!(buf, payload);
+    }
+
+    fn add_record(
+        id: usize,
+        pos: usize,
+
page: &mut PageContent, + record: Record, + db: &Arc, + ) -> Vec { + let mut payload: Vec = Vec::new(); + fill_cell_payload( + page.page_type(), + Some(id as u64), + &mut payload, + &record, + 4096, + db.pager.clone(), + ); + insert_into_cell(page, &payload, pos, 4096).unwrap(); + payload + } + + #[test] + fn test_insert_cell() { + let db = get_database(); + let page = get_page(2); + let page = page.get_contents(); + let header_size = 8; + let record = Record::new([OwnedValue::Integer(1)].to_vec()); + let payload = add_record(1, 0, page, record, &db); + assert_eq!(page.cell_count(), 1); + let free = compute_free_space(page, 4096); + assert_eq!(free, 4096 - payload.len() as u16 - 2 - header_size); + + let cell_idx = 0; + ensure_cell(page, cell_idx, &payload); + } + + struct Cell { + pos: usize, + payload: Vec, + } + + #[test] + fn test_drop_1() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let header_size = 8; + + let mut total_size = 0; + let mut cells = Vec::new(); + let usable_space = 4096; + for i in 0..3 { + let record = Record::new([OwnedValue::Integer(i as i64)].to_vec()); + let payload = add_record(i, i, page, record, &db); + assert_eq!(page.cell_count(), i + 1); + let free = compute_free_space(page, usable_space); + total_size += payload.len() as u16 + 2; + assert_eq!(free, 4096 - total_size - header_size); + cells.push(Cell { pos: i, payload }); + } + + for (i, cell) in cells.iter().enumerate() { + ensure_cell(page, i, &cell.payload); + } + cells.remove(1); + drop_cell(page, 1, usable_space).unwrap(); + + for (i, cell) in cells.iter().enumerate() { + ensure_cell(page, i, &cell.payload); + } + } + fn validate_btree(pager: Rc, page_idx: usize) -> (usize, bool) { let cursor = BTreeCursor::new(pager.clone(), page_idx); let page = pager.read_page(page_idx).unwrap(); @@ -2392,8 +2943,8 @@ mod tests { .cell_get( cell_idx, pager.clone(), - cursor.payload_overflow_threshold_max(page_type), - cursor.payload_overflow_threshold_min(page_type), + payload_overflow_threshold_max(page_type, 4096), + payload_overflow_threshold_min(page_type, 4096), cursor.usable_space(), ) .unwrap(); @@ -2455,8 +3006,8 @@ mod tests { .cell_get( cell_idx, pager.clone(), - cursor.payload_overflow_threshold_max(page_type), - cursor.payload_overflow_threshold_min(page_type), + payload_overflow_threshold_max(page_type, 4096), + payload_overflow_threshold_min(page_type, 4096), cursor.usable_space(), ) .unwrap(); @@ -2522,7 +3073,7 @@ mod tests { }; let pager = Rc::new(pager); let page1 = pager.allocate_page().unwrap(); - btree_init_page(&page1, PageType::TableLeaf, &db_header, 0); + btree_init_page(&page1, PageType::TableLeaf, 0, 4096); (pager, page1.get().id) } @@ -2631,9 +3182,18 @@ mod tests { size, insert_id ); + run_until_done( + || { + let key = SeekKey::TableRowId(key as u64); + cursor.move_to(key, SeekOp::EQ) + }, + pager.deref(), + ) + .unwrap(); + let key = OwnedValue::Integer(key); let value = Record::new(vec![OwnedValue::Blob(Rc::new(vec![0; size]))]); - cursor.insert(&key, &value, false).unwrap(); + run_until_done(|| cursor.insert(&key, &value, true), pager.deref()).unwrap(); } tracing::info!( "=========== btree ===========\n{}\n\n", @@ -2655,6 +3215,47 @@ mod tests { } } } + #[test] + pub fn test_drop_odd() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let header_size = 8; + + let mut total_size = 0; + let mut cells = Vec::new(); + let usable_space = 4096; + let total_cells = 10; + for i in 0..total_cells { + let 
record = Record::new([OwnedValue::Integer(i as i64)].to_vec()); + let payload = add_record(i, i, page, record, &db); + assert_eq!(page.cell_count(), i + 1); + let free = compute_free_space(page, usable_space); + total_size += payload.len() as u16 + 2; + assert_eq!(free, 4096 - total_size - header_size); + cells.push(Cell { pos: i, payload }); + } + + let mut removed = 0; + let mut new_cells = Vec::new(); + for cell in cells { + if cell.pos % 2 == 1 { + drop_cell(page, cell.pos - removed, usable_space).unwrap(); + removed += 1; + } else { + new_cells.push(cell); + } + } + let cells = new_cells; + for (i, cell) in cells.iter().enumerate() { + ensure_cell(page, i, &cell.payload); + } + + for (i, cell) in cells.iter().enumerate() { + ensure_cell(page, i, &cell.payload); + } + } #[test] pub fn btree_insert_fuzz_run_equal_size() { @@ -2743,11 +3344,11 @@ mod tests { } #[test] - fn test_clear_overflow_pages() -> Result<()> { + pub fn test_clear_overflow_pages() -> Result<()> { let (pager, db_header) = setup_test_env(5); let cursor = BTreeCursor::new(pager.clone(), 1); - let max_local = cursor.payload_overflow_threshold_max(PageType::TableLeaf); + let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096); let usable_size = cursor.usable_space(); // Create a large payload that will definitely trigger overflow @@ -2840,7 +3441,7 @@ mod tests { } #[test] - fn test_clear_overflow_pages_no_overflow() -> Result<()> { + pub fn test_clear_overflow_pages_no_overflow() -> Result<()> { let (pager, db_header) = setup_test_env(5); let cursor = BTreeCursor::new(pager.clone(), 1); @@ -2880,4 +3481,436 @@ mod tests { Ok(()) } + #[test] + pub fn test_defragment() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let header_size = 8; + + let mut total_size = 0; + let mut cells = Vec::new(); + let usable_space = 4096; + for i in 0..3 { + let record = Record::new([OwnedValue::Integer(i as i64)].to_vec()); + let payload = add_record(i, i, page, record, &db); + assert_eq!(page.cell_count(), i + 1); + let free = compute_free_space(page, usable_space); + total_size += payload.len() as u16 + 2; + assert_eq!(free, 4096 - total_size - header_size); + cells.push(Cell { pos: i, payload }); + } + + for (i, cell) in cells.iter().enumerate() { + ensure_cell(page, i, &cell.payload); + } + cells.remove(1); + drop_cell(page, 1, usable_space).unwrap(); + + for (i, cell) in cells.iter().enumerate() { + ensure_cell(page, i, &cell.payload); + } + + defragment_page(page, usable_space); + + for (i, cell) in cells.iter().enumerate() { + ensure_cell(page, i, &cell.payload); + } + } + + #[test] + pub fn test_drop_odd_with_defragment() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let header_size = 8; + + let mut total_size = 0; + let mut cells = Vec::new(); + let usable_space = 4096; + let total_cells = 10; + for i in 0..total_cells { + let record = Record::new([OwnedValue::Integer(i as i64)].to_vec()); + let payload = add_record(i, i, page, record, &db); + assert_eq!(page.cell_count(), i + 1); + let free = compute_free_space(page, usable_space); + total_size += payload.len() as u16 + 2; + assert_eq!(free, 4096 - total_size - header_size); + cells.push(Cell { pos: i, payload }); + } + + let mut removed = 0; + let mut new_cells = Vec::new(); + for cell in cells { + if cell.pos % 2 == 1 { + drop_cell(page, cell.pos - removed, usable_space).unwrap(); + removed += 1; + } else { + new_cells.push(cell); + } + } + let cells = new_cells; + for 
(i, cell) in cells.iter().enumerate() { + ensure_cell(page, i, &cell.payload); + } + + defragment_page(page, usable_space); + + for (i, cell) in cells.iter().enumerate() { + ensure_cell(page, i, &cell.payload); + } + } + + #[test] + pub fn test_fuzz_drop_defragment_insert() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let header_size = 8; + + let mut total_size = 0; + let mut cells = Vec::new(); + let usable_space = 4096; + let mut i = 1000; + let seed = thread_rng().gen(); + let mut rng = ChaCha8Rng::seed_from_u64(seed); + while i > 0 { + i -= 1; + match rng.next_u64() % 3 { + 0 => { + // allow appends with extra place to insert + let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1); + let free = compute_free_space(page, usable_space); + let record = Record::new([OwnedValue::Integer(i as i64)].to_vec()); + let mut payload: Vec = Vec::new(); + fill_cell_payload( + page.page_type(), + Some(i as u64), + &mut payload, + &record, + 4096, + db.pager.clone(), + ); + if (free as usize) < payload.len() - 2 { + // do not try to insert overflow pages because they require balancing + continue; + } + insert_into_cell(page, &payload, cell_idx, 4096).unwrap(); + assert!(page.overflow_cells.is_empty()); + total_size += payload.len() as u16 + 2; + cells.push(Cell { pos: i, payload }); + } + 1 => { + if page.cell_count() == 0 { + continue; + } + let cell_idx = rng.next_u64() as usize % page.cell_count(); + let (_, len) = page.cell_get_raw_region( + cell_idx, + payload_overflow_threshold_max(page.page_type(), 4096), + payload_overflow_threshold_min(page.page_type(), 4096), + usable_space as usize, + ); + drop_cell(page, cell_idx, usable_space).unwrap(); + total_size -= len as u16 + 2; + cells.remove(cell_idx); + } + 2 => { + defragment_page(page, usable_space); + } + _ => unreachable!(), + } + let free = compute_free_space(page, usable_space); + assert_eq!(free, 4096 - total_size - header_size); + } + } + + #[test] + pub fn test_free_space() { + let db = get_database(); + let page = get_page(2); + let page = page.get_contents(); + let header_size = 8; + let usable_space = 4096; + + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let payload = add_record(0, 0, page, record, &db); + let free = compute_free_space(page, usable_space); + assert_eq!(free, 4096 - payload.len() as u16 - 2 - header_size); + } + + #[test] + pub fn test_defragment_1() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let usable_space = 4096; + + let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec()); + let payload = add_record(0, 0, page, record, &db); + + assert_eq!(page.cell_count(), 1); + defragment_page(page, usable_space); + assert_eq!(page.cell_count(), 1); + let (start, len) = page.cell_get_raw_region( + 0, + payload_overflow_threshold_max(page.page_type(), 4096), + payload_overflow_threshold_min(page.page_type(), 4096), + usable_space as usize, + ); + let buf = page.as_ptr(); + assert_eq!(&payload, &buf[start..start + len]); + } + + #[test] + pub fn test_insert_drop_insert() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let usable_space = 4096; + + let record = Record::new( + [ + OwnedValue::Integer(0), + OwnedValue::Text(Text::new("aaaaaaaa")), + ] + .to_vec(), + ); + let _ = add_record(0, 0, page, record, &db); + + assert_eq!(page.cell_count(), 1); + drop_cell(page, 0, usable_space).unwrap(); + assert_eq!(page.cell_count(), 0); + + let record = 
Record::new([OwnedValue::Integer(0 as i64)].to_vec()); + let payload = add_record(0, 0, page, record, &db); + assert_eq!(page.cell_count(), 1); + + let (start, len) = page.cell_get_raw_region( + 0, + payload_overflow_threshold_max(page.page_type(), 4096), + payload_overflow_threshold_min(page.page_type(), 4096), + usable_space as usize, + ); + let buf = page.as_ptr(); + assert_eq!(&payload, &buf[start..start + len]); + } + + #[test] + pub fn test_insert_drop_insert_multiple() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let usable_space = 4096; + + let record = Record::new( + [ + OwnedValue::Integer(0), + OwnedValue::Text(Text::new("aaaaaaaa")), + ] + .to_vec(), + ); + let _ = add_record(0, 0, page, record, &db); + + for _ in 0..100 { + assert_eq!(page.cell_count(), 1); + drop_cell(page, 0, usable_space).unwrap(); + assert_eq!(page.cell_count(), 0); + + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let payload = add_record(0, 0, page, record, &db); + assert_eq!(page.cell_count(), 1); + + let (start, len) = page.cell_get_raw_region( + 0, + payload_overflow_threshold_max(page.page_type(), 4096), + payload_overflow_threshold_min(page.page_type(), 4096), + usable_space as usize, + ); + let buf = page.as_ptr(); + assert_eq!(&payload, &buf[start..start + len]); + } + } + + #[test] + pub fn test_drop_a_few_insert() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let usable_space = 4096; + + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let payload = add_record(0, 0, page, record, &db); + let record = Record::new([OwnedValue::Integer(1)].to_vec()); + let _ = add_record(1, 1, page, record, &db); + let record = Record::new([OwnedValue::Integer(2)].to_vec()); + let _ = add_record(2, 2, page, record, &db); + + drop_cell(page, 1, usable_space).unwrap(); + drop_cell(page, 1, usable_space).unwrap(); + + ensure_cell(page, 0, &payload); + } + + #[test] + pub fn test_fuzz_victim_1() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let usable_space = 4096; + + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let _ = add_record(0, 0, page, record, &db); + + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let _ = add_record(0, 0, page, record, &db); + drop_cell(page, 0, usable_space).unwrap(); + + defragment_page(page, usable_space); + + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let _ = add_record(0, 1, page, record, &db); + + drop_cell(page, 0, usable_space).unwrap(); + + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let _ = add_record(0, 1, page, record, &db); + } + + #[test] + pub fn test_fuzz_victim_2() { + let db = get_database(); + + let page = get_page(2); + let usable_space = 4096; + let insert = |pos, page| { + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let _ = add_record(0, pos, page, record, &db); + }; + let drop = |pos, page| { + drop_cell(page, pos, usable_space).unwrap(); + }; + let defragment = |page| { + defragment_page(page, usable_space); + }; + defragment(page.get_contents()); + defragment(page.get_contents()); + insert(0, page.get_contents()); + drop(0, page.get_contents()); + insert(0, page.get_contents()); + drop(0, page.get_contents()); + insert(0, page.get_contents()); + defragment(page.get_contents()); + defragment(page.get_contents()); + drop(0, page.get_contents()); + defragment(page.get_contents()); + insert(0, 
page.get_contents()); + drop(0, page.get_contents()); + insert(0, page.get_contents()); + insert(1, page.get_contents()); + insert(1, page.get_contents()); + insert(0, page.get_contents()); + drop(3, page.get_contents()); + drop(2, page.get_contents()); + compute_free_space(page.get_contents(), usable_space); + } + + #[test] + pub fn test_fuzz_victim_3() { + let db = get_database(); + + let page = get_page(2); + let usable_space = 4096; + let insert = |pos, page| { + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let _ = add_record(0, pos, page, record, &db); + }; + let drop = |pos, page| { + drop_cell(page, pos, usable_space).unwrap(); + }; + let defragment = |page| { + defragment_page(page, usable_space); + }; + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let mut payload: Vec = Vec::new(); + fill_cell_payload( + page.get_contents().page_type(), + Some(0), + &mut payload, + &record, + 4096, + db.pager.clone(), + ); + insert(0, page.get_contents()); + defragment(page.get_contents()); + insert(0, page.get_contents()); + defragment(page.get_contents()); + insert(0, page.get_contents()); + drop(2, page.get_contents()); + drop(0, page.get_contents()); + let free = compute_free_space(page.get_contents(), usable_space); + let total_size = payload.len() + 2; + assert_eq!( + free, + usable_space - page.get_contents().header_size() as u16 - total_size as u16 + ); + dbg!(free); + } + #[test] + pub fn btree_insert_sequential() { + let (pager, root_page) = empty_btree(); + let mut keys = Vec::new(); + for i in 0..10000 { + let mut cursor = BTreeCursor::new(pager.clone(), root_page); + tracing::info!("INSERT INTO t VALUES ({});", i,); + let key = OwnedValue::Integer(i); + let value = Record::new(vec![OwnedValue::Integer(i)]); + tracing::trace!("before insert {}", i); + run_until_done( + || { + let key = SeekKey::TableRowId(i as u64); + cursor.move_to(key, SeekOp::EQ) + }, + pager.deref(), + ) + .unwrap(); + run_until_done(|| cursor.insert(&key, &value, true), pager.deref()).unwrap(); + keys.push(i); + } + if matches!(validate_btree(pager.clone(), root_page), (_, false)) { + panic!("invalid btree"); + } + tracing::trace!( + "=========== btree ===========\n{}\n\n", + format_btree(pager.clone(), root_page, 0) + ); + for key in keys.iter() { + let mut cursor = BTreeCursor::new(pager.clone(), root_page); + let key = OwnedValue::Integer(*key); + let exists = run_until_done(|| cursor.exists(&key), pager.deref()).unwrap(); + assert!(exists, "key not found {}", key); + } + } + + fn run_until_done( + mut action: impl FnMut() -> Result>, + pager: &Pager, + ) -> Result { + loop { + match action()? 
{ + CursorResult::Ok(res) => { + return Ok(res); + } + CursorResult::IO => pager.io.run_once().unwrap(), + } + } + } } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index a67e4dd3f..20e7f9e6b 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -56,6 +56,10 @@ impl Page { unsafe { &mut *self.inner.get() } } + pub fn get_contents(&self) -> &mut PageContent { + self.get().contents.as_mut().unwrap() + } + pub fn is_uptodate(&self) -> bool { self.get().flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0 } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 4fd9c1cf6..0e49d30ae 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -439,14 +439,14 @@ impl PageContent { u16::from_be_bytes([buf[self.offset + pos], buf[self.offset + pos + 1]]) } + pub fn read_u16_no_offset(&self, pos: usize) -> u16 { + let buf = self.as_ptr(); + u16::from_be_bytes([buf[pos], buf[pos + 1]]) + } + pub fn read_u32(&self, pos: usize) -> u32 { let buf = self.as_ptr(); - u32::from_be_bytes([ - buf[self.offset + pos], - buf[self.offset + pos + 1], - buf[self.offset + pos + 2], - buf[self.offset + pos + 3], - ]) + read_u32(buf, self.offset + pos) } pub fn write_u8(&self, pos: usize, value: u8) { @@ -461,6 +461,12 @@ impl PageContent { buf[self.offset + pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes()); } + pub fn write_u16_no_offset(&self, pos: usize, value: u16) { + tracing::debug!("write_u16(pos={}, value={})", pos, value); + let buf = self.as_ptr(); + buf[pos..pos + 2].copy_from_slice(&value.to_be_bytes()); + } + pub fn write_u32(&self, pos: usize, value: u32) { tracing::debug!("write_u32(pos={}, value={})", pos, value); let buf = self.as_ptr(); @@ -534,6 +540,16 @@ impl PageContent { } } + pub fn rightmost_pointer_raw(&self) -> Option<*mut u8> { + match self.page_type() { + PageType::IndexInterior | PageType::TableInterior => { + Some(unsafe { self.as_ptr().as_mut_ptr().add(self.offset + 8) }) + } + PageType::IndexLeaf => None, + PageType::TableLeaf => None, + } + } + pub fn cell_get( &self, idx: usize, @@ -574,7 +590,7 @@ impl PageContent { (self.offset + header_size, self.cell_pointer_array_size()) } - /* Get region of a cell's payload */ + /// Get region of a cell's payload pub fn cell_get_raw_region( &self, idx: usize, @@ -584,10 +600,10 @@ impl PageContent { ) -> (usize, usize) { let buf = self.as_ptr(); let ncells = self.cell_count(); - let cell_pointer_array_start = self.header_size(); + let (cell_pointer_array_start, _) = self.cell_pointer_array_offset_and_size(); assert!(idx < ncells, "cell_get: idx out of bounds"); let cell_pointer = cell_pointer_array_start + (idx * 2); // pointers are 2 bytes each - let cell_pointer = self.read_u16(cell_pointer) as usize; + let cell_pointer = self.read_u16_no_offset(cell_pointer) as usize; let start = cell_pointer; let len = match self.page_type() { PageType::IndexInterior => { @@ -888,6 +904,7 @@ fn read_payload(unread: &[u8], payload_size: usize, pager: Rc) -> (Vec 0); let page; loop { + // FIXME(pere): this looks terrible, what did i do lmao let page_ref = pager.read_page(next_overflow as usize); if let Ok(p) = page_ref { page = p; @@ -1471,6 +1488,10 @@ impl WalHeader { } } +pub fn read_u32(buf: &[u8], pos: usize) -> u32 { + u32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]) +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 38b2092db..e001a5e6c 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ 
-2707,6 +2707,9 @@ impl Program { _ => unreachable!("Not a record! Cannot insert a non record value."), }; let key = &state.registers[*key_reg]; + // NOTE(pere): Sending moved_before == true is okay because we moved before but + // if we were to set to false after starting a balance procedure, it might + // leave undefined state. return_if_io!(cursor.insert(key, record, true)); state.pc += 1; }
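
Taken together, the patch turns the page-local byte manipulation (`insert_into_cell`, `drop_cell`, `compute_free_space`, `defragment_page`, the overflow thresholds) into free functions parameterized by `usable_space` instead of methods reading `db_header` through the cursor. A condensed sketch of the resulting calling convention, assuming a table-leaf `PageContent`, a payload small enough to stay on the page, and the tests' default of a 4096-byte page with no reserved bytes:

```rust
// Hypothetical round-trip in the spirit of the new unit tests above.
fn insert_then_drop(page: &mut PageContent, payload: &[u8]) -> Result<()> {
    let usable_space: u16 = 4096; // page_size - reserved_space
    let free_before = compute_free_space(page, usable_space);
    // Would stash the cell in page.overflow_cells if it did not fit.
    insert_into_cell(page, payload, 0, usable_space)?;
    drop_cell(page, 0, usable_space)?;
    // Freeing the cell returns its bytes (plus the 2-byte pointer) to the page.
    assert_eq!(compute_free_space(page, usable_space), free_before);
    Ok(())
}
```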