diff --git a/core/storage/btree.rs b/core/storage/btree.rs index f8ebbef24..362b440c2 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -2,8 +2,8 @@ use tracing::debug; use crate::storage::pager::Pager; use crate::storage::sqlite3_ondisk::{ - read_btree_cell, read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent, PageType, - TableInteriorCell, TableLeafCell, + read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell, + TableLeafCell, }; use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp}; @@ -82,8 +82,6 @@ enum WriteState { BalanceStart, BalanceNonRoot, BalanceNonRootWaitLoadPages, - BalanceGetParentPage, - BalanceMoveUp, Finish, } @@ -763,14 +761,18 @@ impl BTreeCursor { // insert let overflow = { let contents = page.get().contents.as_mut().unwrap(); - self.insert_into_cell(contents, cell_payload.as_slice(), cell_idx); - let overflow_cells = contents.overflow_cells.len(); debug!( - "insert_into_page(overflow, cell_count={}, overflow_cells={})", - contents.cell_count(), - overflow_cells + "insert_into_page(overflow, cell_count={})", + contents.cell_count() ); - overflow_cells + + insert_into_cell( + contents, + cell_payload.as_slice(), + cell_idx, + self.usable_space() as u16, + ); + contents.overflow_cells.len() }; let write_info = self .state @@ -784,9 +786,7 @@ impl BTreeCursor { } WriteState::BalanceStart | WriteState::BalanceNonRoot - | WriteState::BalanceNonRootWaitLoadPages - | WriteState::BalanceMoveUp - | WriteState::BalanceGetParentPage => { + | WriteState::BalanceNonRootWaitLoadPages => { return_if_io!(self.balance()); } WriteState::Finish => { @@ -798,56 +798,6 @@ impl BTreeCursor { return ret; } - /// Insert a record into a cell. - /// If the cell overflows, an overflow cell is created. - /// insert_into_cell() is called from insert_into_page(), - /// and the overflow cell count is used to determine if the page overflows, - /// i.e. 
whether we need to balance the btree after the insert. - fn insert_into_cell(&self, page: &mut PageContent, payload: &[u8], cell_idx: usize) { - let free = self.compute_free_space(page, RefCell::borrow(&self.pager.db_header)); - const CELL_POINTER_SIZE_BYTES: usize = 2; - let enough_space = payload.len() + CELL_POINTER_SIZE_BYTES <= free as usize; - if !enough_space { - // add to overflow cells - page.overflow_cells.push(OverflowCell { - index: cell_idx, - payload: Pin::new(Vec::from(payload)), - }); - return; - } - - // TODO: insert into cell payload in internal page - let new_cell_data_pointer = self - .allocate_cell_space(page, payload.len() as u16) - .unwrap(); - let buf = page.as_ptr(); - - // Copy cell data - buf[new_cell_data_pointer as usize..new_cell_data_pointer as usize + payload.len()] - .copy_from_slice(payload); - // memmove(pIns+2, pIns, 2*(pPage->nCell - i)); - let (cell_pointer_array_start, _) = page.cell_pointer_array_offset_and_size(); - let cell_pointer_cur_idx = cell_pointer_array_start + (CELL_POINTER_SIZE_BYTES * cell_idx); - - let cell_count = page.cell_count(); - - // Move existing pointers if needed - let n_bytes_forward = CELL_POINTER_SIZE_BYTES * (cell_count - cell_idx); - if n_bytes_forward > 0 { - buf.copy_within( - cell_pointer_cur_idx..cell_pointer_cur_idx + n_bytes_forward, - cell_pointer_cur_idx + CELL_POINTER_SIZE_BYTES, - ); - } - - // Insert new cell pointer at the current cell index - page.write_u16(cell_pointer_cur_idx - page.offset, new_cell_data_pointer); - - // Update cell count - let new_n_cells = (page.cell_count() + 1) as u16; - page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells); - } - /// Drop a cell from a page. /// This is done by freeing the range of bytes that the cell occupies. 
fn drop_cell(&self, page: &mut PageContent, cell_idx: usize) { @@ -905,9 +855,7 @@ impl BTreeCursor { self.stack.pop(); return_if_io!(self.balance_non_root()); } - WriteState::BalanceNonRoot - | WriteState::BalanceGetParentPage - | WriteState::BalanceMoveUp => { + WriteState::BalanceNonRoot | WriteState::BalanceNonRootWaitLoadPages => { return_if_io!(self.balance_non_root()); } @@ -927,11 +875,6 @@ impl BTreeCursor { WriteState::Start => todo!(), WriteState::BalanceStart => todo!(), WriteState::BalanceNonRoot => { - // drop divider cells and find right pointer - // NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer). - // Right pointer means cell that points to the last page, as we don't really want to drop this one. This one - // can be a "rightmost pointer" or a "cell". - // we always asumme there is a parent let parent_page = self.stack.top(); if parent_page.is_locked() { return Ok(CursorResult::IO); @@ -1050,6 +993,7 @@ impl BTreeCursor { if !all_loaded { return Ok(CursorResult::IO); } + // Now do real balancing let parent_page = self.stack.top(); let parent_contents = parent_page.get_contents(); assert!( @@ -1062,7 +1006,7 @@ impl BTreeCursor { // Get divider cells and max_cells let mut max_cells = 0; let pages_to_balance = write_info.pages_to_balance.borrow(); - let pages_to_balance_new = write_info.pages_to_balance.borrow(); + let mut pages_to_balance_new = Vec::new(); for i in (0..sibling_count).rev() { let sibling_page = &pages_to_balance[i]; let sibling_contents = sibling_page.get_contents(); @@ -1101,7 +1045,6 @@ impl BTreeCursor { .scratch_cells .replace(Vec::with_capacity(max_cells)); - let scratch_cells = write_info.scratch_cells.borrow_mut(); let mut cell_array = CellArray { cells: Vec::new(), number_of_cells_per_page: Vec::new(), @@ -1116,7 +1059,6 @@ impl BTreeCursor { let leaf_data = matches!(page_type, PageType::TableLeaf); for (i, old_page) in pages_to_balance.iter().enumerate() { let 
old_page_contents = old_page.get_contents(); - let old_page_type = old_page_contents.page_type(); for cell_idx in 0..old_page_contents.cell_count() { let (cell_start, cell_len) = old_page_contents.cell_get_raw_region( cell_idx, @@ -1125,9 +1067,9 @@ impl BTreeCursor { self.usable_space(), ); let buf = old_page_contents.as_ptr(); - let cell_buf = &buf[cell_start..cell_start + cell_len]; + let cell_buf = &mut buf[cell_start..cell_start + cell_len]; // TODO(pere): make this reference and not copy - cell_array.cells.push(cell_buf); + cell_array.cells.push(to_static_buf(cell_buf)); } // Insert overflow cells into correct place let mut offset = total_cells_inserted; @@ -1136,10 +1078,11 @@ impl BTreeCursor { 1, "todo: check this works for more than one overflow cell" ); - for overflow_cell in &old_page_contents.overflow_cells { - cell_array - .cells - .insert(offset + overflow_cell.index, &overflow_cell.payload); + for overflow_cell in old_page_contents.overflow_cells.iter_mut() { + cell_array.cells.insert( + offset + overflow_cell.index, + to_static_buf(&mut Pin::as_mut(&mut overflow_cell.payload)), + ); } count_cells_in_old_pages.push(cell_array.cells.len() as u16); @@ -1155,7 +1098,9 @@ impl BTreeCursor { // from divider cells in index interior pages (parent) because those should not be included. 
cells_inserted += 1; divider_cells.push(divider_cell); - cell_array.cells.push(&divider_cells.last().unwrap()); + cell_array + .cells + .push(to_static_buf(divider_cells.last_mut().unwrap().as_mut())); } total_cells_inserted += cells_inserted; } @@ -1169,10 +1114,9 @@ impl BTreeCursor { cell_array .number_of_cells_per_page .push(count_cells_in_old_pages[i]); - let page = pages_to_balance[i]; + let page = &pages_to_balance[i]; let page_contents = page.get_contents(); - let free_space = - self.compute_free_space(&page_contents, self.database_header.borrow()); + let free_space = compute_free_space(&page_contents, self.usable_space() as u16); // If we have an empty page of cells, we ignore it if k > 0 @@ -1319,7 +1263,7 @@ impl BTreeCursor { pages_to_balance[i].set_dirty(); pages_to_balance_new.push(pages_to_balance[i].clone()); } else { - let page = self.allocate_page(page_type, 0); + let page = self.allocate_page(page_type.clone(), 0); pages_to_balance_new.push(page); } } @@ -1373,7 +1317,12 @@ impl BTreeCursor { // Leaf index divider_cell[0..4].copy_from_slice(&(page.get().id as u32).to_be_bytes()); } - self.insert_into_cell(parent_contents, &divider_cell, first_divider_cell + i); + insert_into_cell( + parent_contents, + &divider_cell, + first_divider_cell + i, + self.usable_space() as u16, + ); } // TODO: update pages let mut done = vec![false; sibling_count_new]; @@ -1412,196 +1361,15 @@ impl BTreeCursor { &cell_array, usable_space as u16, ); + page.overflow_cells.clear(); done[page_idx] = true; } } // TODO: balance root - - return Ok(CursorResult::IO); - } - WriteState::BalanceGetParentPage => { - let parent = self.stack.parent(); - let loaded = parent.is_loaded(); - return_if_locked!(parent); - - if !loaded { - debug!("balance_leaf(loading page)"); - self.pager.load_page(parent.clone())?; - return Ok(CursorResult::IO); - } - parent.set_dirty(); - (WriteState::BalanceMoveUp, Ok(CursorResult::Ok(()))) - } - WriteState::BalanceMoveUp => { - let parent = self.stack.parent(); - - let 
(page_type, current_idx) = { - let current_page = self.stack.top(); - let contents = current_page.get().contents.as_ref().unwrap(); - (contents.page_type().clone(), current_page.get().id) - }; - - parent.set_dirty(); - self.pager.add_dirty(parent.get().id); - let parent_contents = parent.get().contents.as_mut().unwrap(); - // if this isn't empty next loop won't work - assert_eq!(parent_contents.overflow_cells.len(), 0); - - // Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value - let mut right_pointer = PAGE_HEADER_OFFSET_RIGHTMOST_PTR; - for cell_idx in 0..parent_contents.cell_count() { - let cell = parent_contents.cell_get( - cell_idx, - self.pager.clone(), - self.payload_overflow_threshold_max(page_type.clone()), - self.payload_overflow_threshold_min(page_type.clone()), - self.usable_space(), - )?; - let found = match cell { - BTreeCell::TableInteriorCell(interior) => { - interior._left_child_page as usize == current_idx - } - _ => unreachable!("Parent should always be an interior page"), - }; - if found { - let (start, _len) = parent_contents.cell_get_raw_region( - cell_idx, - self.payload_overflow_threshold_max(page_type.clone()), - self.payload_overflow_threshold_min(page_type.clone()), - self.usable_space(), - ); - right_pointer = start; - break; - } - } - - let write_info = self.state.write_info().unwrap(); - let mut split_pages = write_info.split_pages.borrow_mut(); - let split_pages_len = split_pages.len(); - let scratch_cells = write_info.scratch_cells.borrow(); - - // reset pages - for page in split_pages.iter() { - assert!(page.is_dirty()); - let contents = page.get().contents.as_mut().unwrap(); - - contents.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); - contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0); - - contents.write_u16( - PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, - self.usable_space() as u16, - ); - - 
contents.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0); - if !contents.is_leaf() { - contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, 0); - } - } - - let mut current_cell_index = 0_usize; - /* index to scratch cells that will be used as dividers in order */ - let mut divider_cells_index = Vec::with_capacity(split_pages.len()); - - debug!("balance_leaf::distribute(cells={})", scratch_cells.len()); - - for (i, page) in split_pages.iter_mut().enumerate() { - let page_id = page.get().id; - let contents = page.get().contents.as_mut().unwrap(); - - let cells_to_copy = write_info.split_pages_cells_count.borrow()[i]; - debug!( - "balance_leaf::distribute(page={}, cells_to_copy={})", - page_id, cells_to_copy - ); - - let cell_index_range = current_cell_index..current_cell_index + cells_to_copy; - for (j, cell_idx) in cell_index_range.enumerate() { - debug!("balance_leaf::distribute_in_page(page={}, cells_to_copy={}, j={}, cell_idx={})", page_id, cells_to_copy, j, cell_idx); - - let cell = scratch_cells[cell_idx]; - self.insert_into_cell(contents, cell, j); - } - divider_cells_index.push(current_cell_index + cells_to_copy - 1); - current_cell_index += cells_to_copy; - } - - let is_leaf = { - let page = self.stack.top(); - let page = page.get().contents.as_ref().unwrap(); - page.is_leaf() - }; - - // update rightmost pointer for each page if we are in interior page - if !is_leaf { - for page in split_pages.iter_mut().take(split_pages_len - 1) { - let contents = page.get().contents.as_mut().unwrap(); - - assert!(contents.cell_count() >= 1); - let last_cell = contents.cell_get( - contents.cell_count() - 1, - self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), - self.usable_space(), - )?; - let last_cell_pointer = match last_cell { - BTreeCell::TableInteriorCell(interior) => interior._left_child_page, - _ => unreachable!(), - }; - self.drop_cell(contents, contents.cell_count() 
- 1); - contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, last_cell_pointer); - } - // last page right most pointer points to previous right most pointer before splitting - let last_page = split_pages.last().unwrap(); - let last_page_contents = last_page.get().contents.as_mut().unwrap(); - last_page_contents.write_u32( - PAGE_HEADER_OFFSET_RIGHTMOST_PTR, - write_info.rightmost_pointer.borrow().unwrap(), - ); - } - - // insert dividers in parent - // we can consider dividers the first cell of each page starting from the second page - for (page_id_index, page) in - split_pages.iter_mut().take(split_pages_len - 1).enumerate() - { - let contents = page.get().contents.as_mut().unwrap(); - let divider_cell_index = divider_cells_index[page_id_index]; - let cell_payload = scratch_cells[divider_cell_index]; - let cell = read_btree_cell( - cell_payload, - &contents.page_type(), - 0, - self.pager.clone(), - self.payload_overflow_threshold_max(contents.page_type()), - self.payload_overflow_threshold_min(contents.page_type()), - self.usable_space(), - )?; - - let key = match cell { - BTreeCell::TableLeafCell(TableLeafCell { _rowid, .. }) - | BTreeCell::TableInteriorCell(TableInteriorCell { _rowid, .. 
}) => _rowid, - _ => unreachable!(), - }; - - let mut divider_cell = Vec::with_capacity(4 + 9); // 4 - page id, 9 - max length of varint - divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes()); - write_varint_to_vec(key, &mut divider_cell); - - let parent_cell_idx = self.find_cell(parent_contents, key); - self.insert_into_cell(parent_contents, &divider_cell, parent_cell_idx); - } - - { - // copy last page id to right pointer - let last_pointer = split_pages.last().unwrap().get().id as u32; - parent_contents.write_u32(right_pointer, last_pointer); - } self.stack.pop(); - let _ = write_info.page_copy.take(); - (WriteState::BalanceStart, Ok(CursorResult::Ok(()))) + // TODO: free pages + return Ok(CursorResult::IO); } WriteState::Finish => todo!(), }; @@ -1711,133 +1479,6 @@ impl BTreeCursor { page } - /// Allocate space for a cell on a page. - fn allocate_cell_space(&self, page_ref: &mut PageContent, amount: u16) -> Result<u16> { - let amount = amount as usize; - let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size(); - let gap = cell_offset + 2 * page_ref.cell_count(); - let mut top = page_ref.cell_content_area() as usize; - - if page_ref.first_freeblock() != 0 && gap + 2 <= top { - let db_header = RefCell::borrow(&self.pager.db_header); - let pc = self.find_free_cell(page_ref, amount, db_header)?; - if pc != 0 { - // Corruption check - if pc <= gap { - return Err(LimboError::Corrupt( - "Corrupted page: free block overlaps cell pointer array".into(), - )); - } - return Ok(pc as u16); - } - } - - if gap + 2 + amount > top { - // defragment - defragment_page(page_ref, self.usable_space() as u16); - top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize; - assert!(gap + 2 + amount <= top); - } - - top -= amount; - page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16); - - let db_header = RefCell::borrow(&self.pager.db_header); - let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize; - 
assert!(top + amount <= usable_space); - - Ok(top as u16) - } - - /// Free blocks can be zero, meaning the "real free space" that can be used to allocate is expected to be between first cell byte - /// and end of cell pointer area. - #[allow(unused_assignments)] - fn compute_free_space(&self, page: &PageContent, db_header: Ref) -> u16 { - // TODO(pere): maybe free space is not calculated correctly with offset - - // Usable space, not the same as free space, simply means: - // space that is not reserved for extensions by sqlite. Usually reserved_space is 0. - let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize; - - let mut cell_content_area_start = page.cell_content_area(); - // A zero value for the cell content area pointer is interpreted as 65536. - // See https://www.sqlite.org/fileformat.html - // The max page size for a sqlite database is 64kiB i.e. 65536 bytes. - // 65536 is u16::MAX + 1, and since cell content grows from right to left, this means - // the cell content area pointer is at the end of the page, - // i.e. - // 1. the page size is 64kiB - // 2. there are no cells on the page - // 3. there is no reserved space at the end of the page - if cell_content_area_start == 0 { - cell_content_area_start = u16::MAX; - } - - // The amount of free space is the sum of: - // #1. the size of the unallocated region - // #2. fragments (isolated 1-3 byte chunks of free space within the cell content area) - // #3. freeblocks (linked list of blocks of at least 4 bytes within the cell content area that are not in use due to e.g. deletions) - - let mut free_space_bytes = - page.unallocated_region_size() + page.num_frag_free_bytes() as usize; - - // #3 is computed by iterating over the freeblocks linked list - let mut cur_freeblock_ptr = page.first_freeblock() as usize; - let page_buf = page.as_ptr(); - if cur_freeblock_ptr > 0 { - if cur_freeblock_ptr < cell_content_area_start as usize { - // Freeblocks exist in the cell content area e.g. 
after deletions - // They should never exist in the unused area of the page. - todo!("corrupted page"); - } - - let mut next = 0; - let mut size = 0; - loop { - // TODO: check corruption icellast - next = u16::from_be_bytes( - page_buf[cur_freeblock_ptr..cur_freeblock_ptr + 2] - .try_into() - .unwrap(), - ) as usize; // first 2 bytes in freeblock = next freeblock pointer - size = u16::from_be_bytes( - page_buf[cur_freeblock_ptr + 2..cur_freeblock_ptr + 4] - .try_into() - .unwrap(), - ) as usize; // next 2 bytes in freeblock = size of current freeblock - free_space_bytes += size; - // Freeblocks are in order from left to right on the page, - // so next pointer should > current pointer + its size, or 0 if no next block exists. - if next <= cur_freeblock_ptr + size + 3 { - break; - } - cur_freeblock_ptr = next; - } - - // Next should always be 0 (NULL) at this point since we have reached the end of the freeblocks linked list - assert_eq!( - next, 0, - "corrupted page: freeblocks list not in ascending order" - ); - - assert!( - cur_freeblock_ptr + size <= usable_space, - "corrupted page: last freeblock extends last page end" - ); - } - - assert!( - free_space_bytes <= usable_space, - "corrupted page: free space is greater than usable space" - ); - - // if( nFree>usableSize || nFree, amount: usize) -> usize { +fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> usize { // NOTE: freelist is in ascending order of keys and pc // unuse_space is reserved bytes at the end of page, therefore we must substract from maxpc let mut pc = page_ref.first_freeblock() as usize; let buf = page_ref.as_ptr(); - let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize; + let usable_space = usable_space as usize; let maxpc = usable_space - amount; let mut found = false; while pc <= maxpc { @@ -2528,8 +2169,8 @@ pub fn btree_init_page( contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, 0); } -fn to_static_buf(buf: &[u8]) -> &'static 
[u8] { - unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) } +fn to_static_buf(buf: &mut [u8]) -> &'static mut [u8] { + unsafe { std::mem::transmute::<&mut [u8], &'static mut [u8]>(buf) } } pub fn edit_page( @@ -2551,6 +2192,13 @@ pub fn edit_page( cell_array, usable_space, ); + // shift pointers left + let buf = page.as_ptr(); + let (start, _) = page.cell_pointer_array_offset_and_size(); + buf.copy_within( + start + (number_to_shift * 2)..start + (count_cells * 2), + start, + ); count_cells -= number_to_shift; // TODO: shift } @@ -2568,10 +2216,39 @@ pub fn edit_page( // TODO: make page_free_array defragment, for now I'm lazy so this will work for now. defragment_page(page, usable_space); // TODO: add to start + if start_new_cells < start_old_cells { + let count = number_new_cells.min(start_old_cells - start_new_cells); + page_insert_array(page, start_new_cells, count, cell_array, 0, usable_space); + count_cells += count; + } // TODO: overflow cells + for i in 0..page.overflow_cells.len() { + let overflow_cell = &page.overflow_cells[i]; + // cell index in context of new list of cells that should be in the page + let cell_idx = start_old_cells + overflow_cell.index - start_new_cells; + if cell_idx >= 0 && cell_idx < start_new_cells { + count_cells += 1; + page_insert_array( + page, + cell_idx + start_new_cells, + 1, + cell_array, + cell_idx, + usable_space, + ); + } + } // TODO: append cells to end - // TODO: update ncell, noverflow - // TODO: update ncell + page_insert_array( + page, + start_new_cells + count_cells, + number_new_cells - count_cells, + cell_array, + count_cells, + usable_space, + ); + // TODO: noverflow + page.write_u32(PAGE_HEADER_OFFSET_CELL_COUNT, count_cells as u32); } pub fn page_free_array( @@ -2608,8 +2285,15 @@ pub fn page_insert_array( first: usize, count: usize, cell_array: &CellArray, + mut start_insert: usize, usable_space: u16, ) { + // TODO: implement faster algorithm, this is doing extra work that's not needed. 
+ // See pageInsertArray to understand faster way. + for i in first..first + count { + insert_into_cell(page, cell_array.cells[i], start_insert, usable_space); + start_insert += 1; + } } /// Free the range of bytes that a cell occupies. @@ -2766,6 +2450,176 @@ fn defragment_page(page: &PageContent, usable_space: u16) { assert!(first_cell <= cbrk); write_buf[first_cell as usize..cbrk as usize].fill(0); } + +/// Insert a record into a cell. +/// If the cell overflows, an overflow cell is created. +/// insert_into_cell() is called from insert_into_page(), +/// and the overflow cell count is used to determine if the page overflows, +/// i.e. whether we need to balance the btree after the insert. +fn insert_into_cell(page: &mut PageContent, payload: &[u8], cell_idx: usize, usable_space: u16) { + let free = compute_free_space(page, usable_space); + const CELL_POINTER_SIZE_BYTES: usize = 2; + let enough_space = payload.len() + CELL_POINTER_SIZE_BYTES <= free as usize; + if !enough_space { + // add to overflow cell + page.overflow_cells.push(OverflowCell { + index: cell_idx, + payload: Pin::new(Vec::from(payload)), + }); + return; + } + + // TODO: insert into cell payload in internal page + let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space); + let buf = page.as_ptr(); + + // copy data + buf[new_cell_data_pointer as usize..new_cell_data_pointer as usize + payload.len()] + .copy_from_slice(payload); + // memmove(pIns+2, pIns, 2*(pPage->nCell - i)); + let (cell_pointer_array_start, _) = page.cell_pointer_array_offset_and_size(); + let cell_pointer_cur_idx = cell_pointer_array_start + (CELL_POINTER_SIZE_BYTES * cell_idx); + + // move existing pointers forward by CELL_POINTER_SIZE_BYTES... 
+ let n_cells_forward = page.cell_count() - cell_idx; + let n_bytes_forward = CELL_POINTER_SIZE_BYTES * n_cells_forward; + if n_bytes_forward > 0 { + buf.copy_within( + cell_pointer_cur_idx..cell_pointer_cur_idx + n_bytes_forward, + cell_pointer_cur_idx + CELL_POINTER_SIZE_BYTES, + ); + } + // ...and insert new cell pointer at the current index + page.write_u16(cell_pointer_cur_idx - page.offset, new_cell_data_pointer); + + // update first byte of content area (cell data always appended to the left, so cell content area pointer moves to point to the new cell data) + page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, new_cell_data_pointer); + + // update cell count + let new_n_cells = (page.cell_count() + 1) as u16; + page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells); +} + +/// Free blocks can be zero, meaning the "real free space" that can be used to allocate is expected to be between first cell byte +/// and end of cell pointer area. +#[allow(unused_assignments)] +fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 { + // TODO(pere): maybe free space is not calculated correctly with offset + + // Usable space, not the same as free space, simply means: + // space that is not reserved for extensions by sqlite. Usually reserved_space is 0. + let usable_space = usable_space as usize; + + let mut cell_content_area_start = page.cell_content_area(); + // A zero value for the cell content area pointer is interpreted as 65536. + // See https://www.sqlite.org/fileformat.html + // The max page size for a sqlite database is 64kiB i.e. 65536 bytes. + // 65536 is u16::MAX + 1, and since cell content grows from right to left, this means + // the cell content area pointer is at the end of the page, + // i.e. + // 1. the page size is 64kiB + // 2. there are no cells on the page + // 3. 
there is no reserved space at the end of the page + if cell_content_area_start == 0 { + cell_content_area_start = u16::MAX; + } + + // The amount of free space is the sum of: + // #1. the size of the unallocated region + // #2. fragments (isolated 1-3 byte chunks of free space within the cell content area) + // #3. freeblocks (linked list of blocks of at least 4 bytes within the cell content area that are not in use due to e.g. deletions) + + let mut free_space_bytes = page.unallocated_region_size() + page.num_frag_free_bytes() as usize; + + // #3 is computed by iterating over the freeblocks linked list + let mut cur_freeblock_ptr = page.first_freeblock() as usize; + let page_buf = page.as_ptr(); + if cur_freeblock_ptr > 0 { + if cur_freeblock_ptr < cell_content_area_start as usize { + // Freeblocks exist in the cell content area e.g. after deletions + // They should never exist in the unused area of the page. + todo!("corrupted page"); + } + + let mut next = 0; + let mut size = 0; + loop { + // TODO: check corruption icellast + next = u16::from_be_bytes( + page_buf[cur_freeblock_ptr..cur_freeblock_ptr + 2] + .try_into() + .unwrap(), + ) as usize; // first 2 bytes in freeblock = next freeblock pointer + size = u16::from_be_bytes( + page_buf[cur_freeblock_ptr + 2..cur_freeblock_ptr + 4] + .try_into() + .unwrap(), + ) as usize; // next 2 bytes in freeblock = size of current freeblock + free_space_bytes += size; + // Freeblocks are in order from left to right on the page, + // so next pointer should > current pointer + its size, or 0 if no next block exists. 
+ if next <= cur_freeblock_ptr + size + 3 { + break; + } + cur_freeblock_ptr = next; + } + + // Next should always be 0 (NULL) at this point since we have reached the end of the freeblocks linked list + assert!( + next == 0, + "corrupted page: freeblocks list not in ascending order" + ); + + assert!( + cur_freeblock_ptr + size <= usable_space, + "corrupted page: last freeblock extends last page end" + ); + } + + assert!( + free_space_bytes <= usable_space, + "corrupted page: free space is greater than usable space" + ); + + // if( nFree>usableSize || nFree u16 { + let amount = amount as usize; + + let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size(); + let gap = cell_offset + 2 * page_ref.cell_count(); + let mut top = page_ref.cell_content_area() as usize; + + // there are free blocks and enough space + if page_ref.first_freeblock() != 0 && gap + 2 <= top { + // find slot + let pc = find_free_cell(page_ref, usable_space, amount); + if pc != 0 { + return pc as u16; + } + /* fall through, we might need to defragment */ + } + + if gap + 2 + amount > top { + // defragment + defragment_page(page_ref, usable_space as u16); + top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize; + } + + top -= amount; + + page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16); + + assert!(top + amount <= usable_space as usize); + top as u16 +} + #[cfg(test)] mod tests { use rand_chacha::rand_core::RngCore;