up to finish without fixing stuff

Pere Diaz Bou
2025-02-09 21:59:23 +01:00
parent 0035b9d1bd
commit 05ca716f82


@@ -2,8 +2,8 @@ use tracing::debug;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::{
read_btree_cell, read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent, PageType,
TableInteriorCell, TableLeafCell,
read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell,
TableLeafCell,
};
use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp};
@@ -82,8 +82,6 @@ enum WriteState {
BalanceStart,
BalanceNonRoot,
BalanceNonRootWaitLoadPages,
BalanceGetParentPage,
BalanceMoveUp,
Finish,
}
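// Sketch (not part of this commit) of the simplified write state machine once
// BalanceGetParentPage and BalanceMoveUp are removed; transitions are inferred
// from the match arms further down:
//
//   Start -> BalanceStart -> BalanceNonRoot -> BalanceNonRootWaitLoadPages -> Finish
//
// Balancing may re-enter BalanceStart for the parent page until no page overflows.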
@@ -763,14 +761,18 @@ impl BTreeCursor {
// insert
let overflow = {
let contents = page.get().contents.as_mut().unwrap();
self.insert_into_cell(contents, cell_payload.as_slice(), cell_idx);
let overflow_cells = contents.overflow_cells.len();
debug!(
"insert_into_page(overflow, cell_count={}, overflow_cells={})",
contents.cell_count(),
overflow_cells
"insert_into_page(overflow, cell_count={})",
contents.cell_count()
);
overflow_cells
insert_into_cell(
contents,
cell_payload.as_slice(),
cell_idx,
self.usable_space() as u16,
);
contents.overflow_cells.len()
};
let write_info = self
.state
@@ -784,9 +786,7 @@ impl BTreeCursor {
}
WriteState::BalanceStart
| WriteState::BalanceNonRoot
| WriteState::BalanceNonRootWaitLoadPages
| WriteState::BalanceMoveUp
| WriteState::BalanceGetParentPage => {
| WriteState::BalanceNonRootWaitLoadPages => {
return_if_io!(self.balance());
}
WriteState::Finish => {
@@ -798,56 +798,6 @@ impl BTreeCursor {
return ret;
}
/// Insert a record into a cell.
/// If the cell overflows, an overflow cell is created.
/// insert_into_cell() is called from insert_into_page(),
/// and the overflow cell count is used to determine if the page overflows,
/// i.e. whether we need to balance the btree after the insert.
fn insert_into_cell(&self, page: &mut PageContent, payload: &[u8], cell_idx: usize) {
let free = self.compute_free_space(page, RefCell::borrow(&self.pager.db_header));
const CELL_POINTER_SIZE_BYTES: usize = 2;
let enough_space = payload.len() + CELL_POINTER_SIZE_BYTES <= free as usize;
if !enough_space {
// add to overflow cells
page.overflow_cells.push(OverflowCell {
index: cell_idx,
payload: Pin::new(Vec::from(payload)),
});
return;
}
// TODO: insert into cell payload in internal page
let new_cell_data_pointer = self
.allocate_cell_space(page, payload.len() as u16)
.unwrap();
let buf = page.as_ptr();
// Copy cell data
buf[new_cell_data_pointer as usize..new_cell_data_pointer as usize + payload.len()]
.copy_from_slice(payload);
// memmove(pIns+2, pIns, 2*(pPage->nCell - i));
let (cell_pointer_array_start, _) = page.cell_pointer_array_offset_and_size();
let cell_pointer_cur_idx = cell_pointer_array_start + (CELL_POINTER_SIZE_BYTES * cell_idx);
let cell_count = page.cell_count();
// Move existing pointers if needed
let n_bytes_forward = CELL_POINTER_SIZE_BYTES * (cell_count - cell_idx);
if n_bytes_forward > 0 {
buf.copy_within(
cell_pointer_cur_idx..cell_pointer_cur_idx + n_bytes_forward,
cell_pointer_cur_idx + CELL_POINTER_SIZE_BYTES,
);
}
// Insert new cell pointer at the current cell index
page.write_u16(cell_pointer_cur_idx - page.offset, new_cell_data_pointer);
// Update cell count
let new_n_cells = (page.cell_count() + 1) as u16;
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells);
}
/// Drop a cell from a page.
/// This is done by freeing the range of bytes that the cell occupies.
fn drop_cell(&self, page: &mut PageContent, cell_idx: usize) {
@@ -905,9 +855,7 @@ impl BTreeCursor {
self.stack.pop();
return_if_io!(self.balance_non_root());
}
WriteState::BalanceNonRoot
| WriteState::BalanceGetParentPage
| WriteState::BalanceMoveUp => {
WriteState::BalanceNonRoot | WriteState::BalanceNonRootWaitLoadPages => {
return_if_io!(self.balance_non_root());
}
@@ -927,11 +875,6 @@ impl BTreeCursor {
WriteState::Start => todo!(),
WriteState::BalanceStart => todo!(),
WriteState::BalanceNonRoot => {
// drop divider cells and find right pointer
// NOTE: since we are doing a simple split, we are only finding the pointer we want to update (the right pointer).
// "Right pointer" means the cell that points to the last page, as we don't really want to drop that one. It
// can be a "rightmost pointer" or a "cell".
// we always assume there is a parent
let parent_page = self.stack.top();
if parent_page.is_locked() {
return Ok(CursorResult::IO);
@@ -1050,6 +993,7 @@ impl BTreeCursor {
if !all_loaded {
return Ok(CursorResult::IO);
}
// Now do real balancing
let parent_page = self.stack.top();
let parent_contents = parent_page.get_contents();
assert!(
@@ -1062,7 +1006,7 @@ impl BTreeCursor {
// Get divider cells and max_cells
let mut max_cells = 0;
let pages_to_balance = write_info.pages_to_balance.borrow();
let pages_to_balance_new = write_info.pages_to_balance.borrow();
let mut pages_to_balance_new = Vec::new();
for i in (0..sibling_count).rev() {
let sibling_page = &pages_to_balance[i];
let sibling_contents = sibling_page.get_contents();
@@ -1101,7 +1045,6 @@ impl BTreeCursor {
.scratch_cells
.replace(Vec::with_capacity(max_cells));
let scratch_cells = write_info.scratch_cells.borrow_mut();
let mut cell_array = CellArray {
cells: Vec::new(),
number_of_cells_per_page: Vec::new(),
@@ -1116,7 +1059,6 @@ impl BTreeCursor {
let leaf_data = matches!(page_type, PageType::TableLeaf);
for (i, old_page) in pages_to_balance.iter().enumerate() {
let old_page_contents = old_page.get_contents();
let old_page_type = old_page_contents.page_type();
for cell_idx in 0..old_page_contents.cell_count() {
let (cell_start, cell_len) = old_page_contents.cell_get_raw_region(
cell_idx,
@@ -1125,9 +1067,9 @@ impl BTreeCursor {
self.usable_space(),
);
let buf = old_page_contents.as_ptr();
let cell_buf = &buf[cell_start..cell_start + cell_len];
let cell_buf = &mut buf[cell_start..cell_start + cell_len];
// TODO(pere): make this reference and not copy
cell_array.cells.push(cell_buf);
cell_array.cells.push(to_static_buf(cell_buf));
}
// Insert overflow cells into correct place
let mut offset = total_cells_inserted;
@@ -1136,10 +1078,11 @@ impl BTreeCursor {
1,
"todo: check this works for more than one overflow cell"
);
for overflow_cell in &old_page_contents.overflow_cells {
cell_array
.cells
.insert(offset + overflow_cell.index, &overflow_cell.payload);
for overflow_cell in old_page_contents.overflow_cells.iter_mut() {
cell_array.cells.insert(
offset + overflow_cell.index,
to_static_buf(&mut Pin::as_mut(&mut overflow_cell.payload)),
);
}
count_cells_in_old_pages.push(cell_array.cells.len() as u16);
@@ -1155,7 +1098,9 @@ impl BTreeCursor {
// from divider cells in index interior pages (parent) because those should not be included.
cells_inserted += 1;
divider_cells.push(divider_cell);
cell_array.cells.push(&divider_cells.last().unwrap());
cell_array
.cells
.push(to_static_buf(divider_cells.last_mut().unwrap().as_mut()));
}
total_cells_inserted += cells_inserted;
}
@@ -1169,10 +1114,9 @@ impl BTreeCursor {
cell_array
.number_of_cells_per_page
.push(count_cells_in_old_pages[i]);
let page = pages_to_balance[i];
let page = &pages_to_balance[i];
let page_contents = page.get_contents();
let free_space =
self.compute_free_space(&page_contents, self.database_header.borrow());
let free_space = compute_free_space(&page_contents, self.usable_space() as u16);
// If a page has no cells, we ignore it
if k > 0
@@ -1319,7 +1263,7 @@ impl BTreeCursor {
pages_to_balance[i].set_dirty();
pages_to_balance_new.push(pages_to_balance[i].clone());
} else {
let page = self.allocate_page(page_type, 0);
let page = self.allocate_page(page_type.clone(), 0);
pages_to_balance_new.push(page);
}
}
@@ -1373,7 +1317,12 @@ impl BTreeCursor {
// Leaf index
divider_cell[0..4].copy_from_slice(&(page.get().id as u32).to_be_bytes());
}
self.insert_into_cell(parent_contents, &divider_cell, first_divider_cell + i);
insert_into_cell(
parent_contents,
&divider_cell,
first_divider_cell + i,
self.usable_space() as u16,
);
}
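// Layout note (inferred from the divider construction above and the 4 + 9
// capacity used elsewhere in this file): a table-interior divider cell is
//   [left child page number: 4-byte big-endian u32][rowid: varint, at most 9 bytes]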
// TODO: update pages
let mut done = vec![false; sibling_count_new];
@@ -1412,196 +1361,15 @@ impl BTreeCursor {
&cell_array,
usable_space as u16,
);
page.overflow_cells.clear();
done[page_idx] = true;
}
}
// TODO: balance root
return Ok(CursorResult::IO);
}
WriteState::BalanceGetParentPage => {
let parent = self.stack.parent();
let loaded = parent.is_loaded();
return_if_locked!(parent);
if !loaded {
debug!("balance_leaf(loading page)");
self.pager.load_page(parent.clone())?;
return Ok(CursorResult::IO);
}
parent.set_dirty();
(WriteState::BalanceMoveUp, Ok(CursorResult::Ok(())))
}
WriteState::BalanceMoveUp => {
let parent = self.stack.parent();
let (page_type, current_idx) = {
let current_page = self.stack.top();
let contents = current_page.get().contents.as_ref().unwrap();
(contents.page_type().clone(), current_page.get().id)
};
parent.set_dirty();
self.pager.add_dirty(parent.get().id);
let parent_contents = parent.get().contents.as_mut().unwrap();
// if this isn't empty, the next loop won't work
assert_eq!(parent_contents.overflow_cells.len(), 0);
// The right page pointer is a u32 in the rightmost pointer, and a u32 inside a cell too, so we can use a single u32 offset to hold the location we want to update
let mut right_pointer = PAGE_HEADER_OFFSET_RIGHTMOST_PTR;
for cell_idx in 0..parent_contents.cell_count() {
let cell = parent_contents.cell_get(
cell_idx,
self.pager.clone(),
self.payload_overflow_threshold_max(page_type.clone()),
self.payload_overflow_threshold_min(page_type.clone()),
self.usable_space(),
)?;
let found = match cell {
BTreeCell::TableInteriorCell(interior) => {
interior._left_child_page as usize == current_idx
}
_ => unreachable!("Parent should always be an interior page"),
};
if found {
let (start, _len) = parent_contents.cell_get_raw_region(
cell_idx,
self.payload_overflow_threshold_max(page_type.clone()),
self.payload_overflow_threshold_min(page_type.clone()),
self.usable_space(),
);
right_pointer = start;
break;
}
}
let write_info = self.state.write_info().unwrap();
let mut split_pages = write_info.split_pages.borrow_mut();
let split_pages_len = split_pages.len();
let scratch_cells = write_info.scratch_cells.borrow();
// reset pages
for page in split_pages.iter() {
assert!(page.is_dirty());
let contents = page.get().contents.as_mut().unwrap();
contents.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0);
contents.write_u16(
PAGE_HEADER_OFFSET_CELL_CONTENT_AREA,
self.usable_space() as u16,
);
contents.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0);
if !contents.is_leaf() {
contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, 0);
}
}
let mut current_cell_index = 0_usize;
/* indices into the scratch cells that will be used as dividers, in order */
let mut divider_cells_index = Vec::with_capacity(split_pages.len());
debug!("balance_leaf::distribute(cells={})", scratch_cells.len());
for (i, page) in split_pages.iter_mut().enumerate() {
let page_id = page.get().id;
let contents = page.get().contents.as_mut().unwrap();
let cells_to_copy = write_info.split_pages_cells_count.borrow()[i];
debug!(
"balance_leaf::distribute(page={}, cells_to_copy={})",
page_id, cells_to_copy
);
let cell_index_range = current_cell_index..current_cell_index + cells_to_copy;
for (j, cell_idx) in cell_index_range.enumerate() {
debug!("balance_leaf::distribute_in_page(page={}, cells_to_copy={}, j={}, cell_idx={})", page_id, cells_to_copy, j, cell_idx);
let cell = scratch_cells[cell_idx];
self.insert_into_cell(contents, cell, j);
}
divider_cells_index.push(current_cell_index + cells_to_copy - 1);
current_cell_index += cells_to_copy;
}
let is_leaf = {
let page = self.stack.top();
let page = page.get().contents.as_ref().unwrap();
page.is_leaf()
};
// update rightmost pointer for each page if we are in interior page
if !is_leaf {
for page in split_pages.iter_mut().take(split_pages_len - 1) {
let contents = page.get().contents.as_mut().unwrap();
assert!(contents.cell_count() >= 1);
let last_cell = contents.cell_get(
contents.cell_count() - 1,
self.pager.clone(),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
self.usable_space(),
)?;
let last_cell_pointer = match last_cell {
BTreeCell::TableInteriorCell(interior) => interior._left_child_page,
_ => unreachable!(),
};
self.drop_cell(contents, contents.cell_count() - 1);
contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, last_cell_pointer);
}
// the last page's rightmost pointer points to the rightmost pointer that existed before splitting
let last_page = split_pages.last().unwrap();
let last_page_contents = last_page.get().contents.as_mut().unwrap();
last_page_contents.write_u32(
PAGE_HEADER_OFFSET_RIGHTMOST_PTR,
write_info.rightmost_pointer.borrow().unwrap(),
);
}
// insert dividers in parent
// we can consider dividers the first cell of each page starting from the second page
for (page_id_index, page) in
split_pages.iter_mut().take(split_pages_len - 1).enumerate()
{
let contents = page.get().contents.as_mut().unwrap();
let divider_cell_index = divider_cells_index[page_id_index];
let cell_payload = scratch_cells[divider_cell_index];
let cell = read_btree_cell(
cell_payload,
&contents.page_type(),
0,
self.pager.clone(),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
self.usable_space(),
)?;
let key = match cell {
BTreeCell::TableLeafCell(TableLeafCell { _rowid, .. })
| BTreeCell::TableInteriorCell(TableInteriorCell { _rowid, .. }) => _rowid,
_ => unreachable!(),
};
let mut divider_cell = Vec::with_capacity(4 + 9); // 4 - page id, 9 - max length of varint
divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes());
write_varint_to_vec(key, &mut divider_cell);
let parent_cell_idx = self.find_cell(parent_contents, key);
self.insert_into_cell(parent_contents, &divider_cell, parent_cell_idx);
}
{
// copy last page id to right pointer
let last_pointer = split_pages.last().unwrap().get().id as u32;
parent_contents.write_u32(right_pointer, last_pointer);
}
self.stack.pop();
let _ = write_info.page_copy.take();
(WriteState::BalanceStart, Ok(CursorResult::Ok(())))
// TODO: free pages
return Ok(CursorResult::IO);
}
WriteState::Finish => todo!(),
};
@@ -1711,133 +1479,6 @@ impl BTreeCursor {
page
}
/// Allocate space for a cell on a page.
fn allocate_cell_space(&self, page_ref: &mut PageContent, amount: u16) -> Result<u16> {
let amount = amount as usize;
let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size();
let gap = cell_offset + 2 * page_ref.cell_count();
let mut top = page_ref.cell_content_area() as usize;
if page_ref.first_freeblock() != 0 && gap + 2 <= top {
let db_header = RefCell::borrow(&self.pager.db_header);
let pc = self.find_free_cell(page_ref, amount, db_header)?;
if pc != 0 {
// Corruption check
if pc <= gap {
return Err(LimboError::Corrupt(
"Corrupted page: free block overlaps cell pointer array".into(),
));
}
return Ok(pc as u16);
}
}
if gap + 2 + amount > top {
// defragment
defragment_page(page_ref, self.usable_space() as u16);
top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize;
assert!(gap + 2 + amount <= top);
}
top -= amount;
page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16);
let db_header = RefCell::borrow(&self.pager.db_header);
let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
assert!(top + amount <= usable_space);
Ok(top as u16)
}
/// The freeblock list can be empty (first freeblock pointer is zero), in which case the "real free space"
/// available for allocation is just the gap between the end of the cell pointer area and the first cell byte.
#[allow(unused_assignments)]
fn compute_free_space(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) -> u16 {
// TODO(pere): maybe free space is not calculated correctly with offset
// Usable space, not the same as free space, simply means:
// space that is not reserved for extensions by sqlite. Usually reserved_space is 0.
let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
let mut cell_content_area_start = page.cell_content_area();
// A zero value for the cell content area pointer is interpreted as 65536.
// See https://www.sqlite.org/fileformat.html
// The max page size for a sqlite database is 64 KiB, i.e. 65536 bytes.
// 65536 is u16::MAX + 1, and since cell content grows from right to left, this means
// the cell content area pointer is at the end of the page,
// i.e.
// 1. the page size is 64 KiB
// 2. there are no cells on the page
// 3. there is no reserved space at the end of the page
if cell_content_area_start == 0 {
cell_content_area_start = u16::MAX;
}
// The amount of free space is the sum of:
// #1. the size of the unallocated region
// #2. fragments (isolated 1-3 byte chunks of free space within the cell content area)
// #3. freeblocks (linked list of blocks of at least 4 bytes within the cell content area that are not in use due to e.g. deletions)
let mut free_space_bytes =
page.unallocated_region_size() + page.num_frag_free_bytes() as usize;
// #3 is computed by iterating over the freeblocks linked list
let mut cur_freeblock_ptr = page.first_freeblock() as usize;
let page_buf = page.as_ptr();
if cur_freeblock_ptr > 0 {
if cur_freeblock_ptr < cell_content_area_start as usize {
// Freeblocks exist in the cell content area e.g. after deletions
// They should never exist in the unused area of the page.
todo!("corrupted page");
}
let mut next = 0;
let mut size = 0;
loop {
// TODO: check corruption (iCellLast)
next = u16::from_be_bytes(
page_buf[cur_freeblock_ptr..cur_freeblock_ptr + 2]
.try_into()
.unwrap(),
) as usize; // first 2 bytes in freeblock = next freeblock pointer
size = u16::from_be_bytes(
page_buf[cur_freeblock_ptr + 2..cur_freeblock_ptr + 4]
.try_into()
.unwrap(),
) as usize; // next 2 bytes in freeblock = size of current freeblock
free_space_bytes += size;
// Freeblocks are in order from left to right on the page,
// so the next pointer should be > current pointer + its size, or 0 if no next block exists.
if next <= cur_freeblock_ptr + size + 3 {
break;
}
cur_freeblock_ptr = next;
}
// Next should always be 0 (NULL) at this point since we have reached the end of the freeblocks linked list
assert_eq!(
next, 0,
"corrupted page: freeblocks list not in ascending order"
);
assert!(
cur_freeblock_ptr + size <= usable_space,
"corrupted page: last freeblock extends past page end"
);
}
assert!(
free_space_bytes <= usable_space,
"corrupted page: free space is greater than usable space"
);
// if( nFree>usableSize || nFree<iCellFirst ){
// return SQLITE_CORRUPT_PAGE(pPage);
// }
free_space_bytes as u16
}
/// Fill in the cell payload with the record.
/// If the record is too large to fit in the cell, it will spill onto overflow pages.
fn fill_cell_payload(
@@ -2476,14 +2117,14 @@ impl CellArray {
}
}
fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount: usize) -> usize {
fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> usize {
// NOTE: freelist is in ascending order of keys and pc
// reserved bytes at the end of the page are unusable, therefore we must subtract them when computing maxpc
let mut pc = page_ref.first_freeblock() as usize;
let buf = page_ref.as_ptr();
let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
let usable_space = usable_space as usize;
let maxpc = usable_space - amount;
let mut found = false;
while pc <= maxpc {
@@ -2528,8 +2169,8 @@ pub fn btree_init_page(
contents.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, 0);
}
fn to_static_buf(buf: &[u8]) -> &'static [u8] {
unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) }
fn to_static_buf(buf: &mut [u8]) -> &'static mut [u8] {
unsafe { std::mem::transmute::<&mut [u8], &'static mut [u8]>(buf) }
}
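// Safety sketch (assumption; this commit does not document it): the 'static
// lifetime produced here is a promise to the borrow checker, not a fact. It is
// only sound because the buffers come from pages (and pinned overflow payloads)
// that are kept alive for the whole balance operation, e.g.:
//
//     let buf = old_page_contents.as_ptr();
//     let cell = to_static_buf(&mut buf[cell_start..cell_start + cell_len]);
//     // `cell` must not outlive the page that owns `buf`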
pub fn edit_page(
@@ -2551,6 +2192,13 @@ pub fn edit_page(
cell_array,
usable_space,
);
// shift pointers left
let buf = page.as_ptr();
let (start, _) = page.cell_pointer_array_offset_and_size();
buf.copy_within(
start + (number_to_shift * 2)..start + (count_cells * 2),
start,
);
count_cells -= number_to_shift;
// TODO: shift
}
@@ -2568,10 +2216,39 @@ pub fn edit_page(
// TODO: make page_free_array defragment; I'm lazy, so this will do for now.
defragment_page(page, usable_space);
// TODO: add to start
if start_new_cells < start_old_cells {
let count = number_new_cells.min(start_old_cells - start_new_cells);
page_insert_array(page, start_new_cells, count, cell_array, 0, usable_space);
count_cells += count;
}
// TODO: overflow cells
for i in 0..page.overflow_cells.len() {
let overflow_cell = &page.overflow_cells[i];
// cell index in context of new list of cells that should be in the page
let cell_idx = start_old_cells + overflow_cell.index - start_new_cells;
if cell_idx >= 0 && cell_idx < start_new_cells {
count_cells += 1;
page_insert_array(
page,
cell_idx + start_new_cells,
1,
cell_array,
cell_idx,
usable_space,
);
}
}
// TODO: append cells to end
// TODO: update ncell, noverflow
// TODO: update ncell
page_insert_array(
page,
start_new_cells + count_cells,
number_new_cells - count_cells,
cell_array,
count_cells,
usable_space,
);
// TODO: noverflow
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, count_cells as u16);
}
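// Summary sketch of edit_page (inferred from the code above; this commit ships
// no doc comment for it): given the page's old cell window starting at
// start_old_cells and the desired new window
// [start_new_cells, start_new_cells + number_new_cells) into cell_array, it
//   1. shifts surviving cell pointers left,
//   2. prepends new cells that precede the old window,
//   3. re-inserts overflow cells that land inside the new window,
//   4. appends the remaining new cells, then writes the final cell count.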
pub fn page_free_array(
@@ -2608,8 +2285,15 @@ pub fn page_insert_array(
first: usize,
count: usize,
cell_array: &CellArray,
mut start_insert: usize,
usable_space: u16,
) {
// TODO: implement a faster algorithm; this one does extra work that isn't needed.
// See SQLite's pageInsertArray to understand the faster way.
for i in first..first + count {
insert_into_cell(page, cell_array.cells[i], start_insert, usable_space);
start_insert += 1;
}
}
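// Usage sketch (hypothetical call): copy cells 3 and 4 of the array into the
// page as its first two cells. Each insertion goes through insert_into_cell,
// so a cell that does not fit ends up in page.overflow_cells instead of failing.
//
//     page_insert_array(page, 3, 2, &cell_array, 0, usable_space);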
/// Free the range of bytes that a cell occupies.
@@ -2766,6 +2450,176 @@ fn defragment_page(page: &PageContent, usable_space: u16) {
assert!(first_cell <= cbrk);
write_buf[first_cell as usize..cbrk as usize].fill(0);
}
/// Insert a record into a cell.
/// If the cell overflows, an overflow cell is created.
/// insert_into_cell() is called from insert_into_page(),
/// and the overflow cell count is used to determine if the page overflows,
/// i.e. whether we need to balance the btree after the insert.
fn insert_into_cell(page: &mut PageContent, payload: &[u8], cell_idx: usize, usable_space: u16) {
let free = compute_free_space(page, usable_space);
const CELL_POINTER_SIZE_BYTES: usize = 2;
let enough_space = payload.len() + CELL_POINTER_SIZE_BYTES <= free as usize;
if !enough_space {
// add to overflow cells
page.overflow_cells.push(OverflowCell {
index: cell_idx,
payload: Pin::new(Vec::from(payload)),
});
return;
}
// TODO: insert into cell payload in internal page
let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space);
let buf = page.as_ptr();
// copy data
buf[new_cell_data_pointer as usize..new_cell_data_pointer as usize + payload.len()]
.copy_from_slice(payload);
// memmove(pIns+2, pIns, 2*(pPage->nCell - i));
let (cell_pointer_array_start, _) = page.cell_pointer_array_offset_and_size();
let cell_pointer_cur_idx = cell_pointer_array_start + (CELL_POINTER_SIZE_BYTES * cell_idx);
// move existing pointers forward by CELL_POINTER_SIZE_BYTES...
let n_cells_forward = page.cell_count() - cell_idx;
let n_bytes_forward = CELL_POINTER_SIZE_BYTES * n_cells_forward;
if n_bytes_forward > 0 {
buf.copy_within(
cell_pointer_cur_idx..cell_pointer_cur_idx + n_bytes_forward,
cell_pointer_cur_idx + CELL_POINTER_SIZE_BYTES,
);
}
// ...and insert new cell pointer at the current index
page.write_u16(cell_pointer_cur_idx - page.offset, new_cell_data_pointer);
// update first byte of content area (cell data always appended to the left, so cell content area pointer moves to point to the new cell data)
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, new_cell_data_pointer);
// update cell count
let new_n_cells = (page.cell_count() + 1) as u16;
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells);
}
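// Usage sketch mirroring the call site in insert_into_page() above: after the
// call, a non-empty overflow list is the signal that the b-tree must be balanced.
//
//     insert_into_cell(contents, cell_payload.as_slice(), cell_idx, usable_space);
//     if !contents.overflow_cells.is_empty() {
//         // page overflowed -> transition into WriteState::BalanceStart
//     }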
/// The freeblock list can be empty (first freeblock pointer is zero), in which case the "real free space"
/// available for allocation is just the gap between the end of the cell pointer area and the first cell byte.
#[allow(unused_assignments)]
fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 {
// TODO(pere): maybe free space is not calculated correctly with offset
// Usable space, not the same as free space, simply means:
// space that is not reserved for extensions by sqlite. Usually reserved_space is 0.
let usable_space = usable_space as usize;
let mut cell_content_area_start = page.cell_content_area();
// A zero value for the cell content area pointer is interpreted as 65536.
// See https://www.sqlite.org/fileformat.html
// The max page size for a sqlite database is 64 KiB, i.e. 65536 bytes.
// 65536 is u16::MAX + 1, and since cell content grows from right to left, this means
// the cell content area pointer is at the end of the page,
// i.e.
// 1. the page size is 64 KiB
// 2. there are no cells on the page
// 3. there is no reserved space at the end of the page
if cell_content_area_start == 0 {
cell_content_area_start = u16::MAX;
}
// The amount of free space is the sum of:
// #1. the size of the unallocated region
// #2. fragments (isolated 1-3 byte chunks of free space within the cell content area)
// #3. freeblocks (linked list of blocks of at least 4 bytes within the cell content area that are not in use due to e.g. deletions)
let mut free_space_bytes = page.unallocated_region_size() + page.num_frag_free_bytes() as usize;
// #3 is computed by iterating over the freeblocks linked list
let mut cur_freeblock_ptr = page.first_freeblock() as usize;
let page_buf = page.as_ptr();
if cur_freeblock_ptr > 0 {
if cur_freeblock_ptr < cell_content_area_start as usize {
// Freeblocks exist in the cell content area e.g. after deletions
// They should never exist in the unused area of the page.
todo!("corrupted page");
}
let mut next = 0;
let mut size = 0;
loop {
// TODO: check corruption (iCellLast)
next = u16::from_be_bytes(
page_buf[cur_freeblock_ptr..cur_freeblock_ptr + 2]
.try_into()
.unwrap(),
) as usize; // first 2 bytes in freeblock = next freeblock pointer
size = u16::from_be_bytes(
page_buf[cur_freeblock_ptr + 2..cur_freeblock_ptr + 4]
.try_into()
.unwrap(),
) as usize; // next 2 bytes in freeblock = size of current freeblock
free_space_bytes += size;
// Freeblocks are in order from left to right on the page,
// so the next pointer should be > current pointer + its size, or 0 if no next block exists.
if next <= cur_freeblock_ptr + size + 3 {
break;
}
cur_freeblock_ptr = next;
}
// Next should always be 0 (NULL) at this point since we have reached the end of the freeblocks linked list
assert!(
next == 0,
"corrupted page: freeblocks list not in ascending order"
);
assert!(
cur_freeblock_ptr + size <= usable_space,
"corrupted page: last freeblock extends past page end"
);
}
assert!(
free_space_bytes <= usable_space,
"corrupted page: free space is greater than usable space"
);
// if( nFree>usableSize || nFree<iCellFirst ){
// return SQLITE_CORRUPT_PAGE(pPage);
// }
free_space_bytes as u16
}
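// Worked example (hypothetical numbers): for a page with usable_space = 4096,
// an unallocated region of 1000 bytes, 2 fragmented bytes and one freeblock of
// 50 bytes, compute_free_space returns 1000 (#1) + 2 (#2) + 50 (#3) = 1052.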
/// Allocate space for a cell on a page.
fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -> u16 {
let amount = amount as usize;
let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size();
let gap = cell_offset + 2 * page_ref.cell_count();
let mut top = page_ref.cell_content_area() as usize;
// there are free blocks and enough space
if page_ref.first_freeblock() != 0 && gap + 2 <= top {
// find slot
let pc = find_free_cell(page_ref, usable_space, amount);
if pc != 0 {
return pc as u16;
}
/* fall through, we might need to defragment */
}
if gap + 2 + amount > top {
// defragment
defragment_page(page_ref, usable_space);
top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize;
}
top -= amount;
page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16);
assert!(top + amount <= usable_space as usize);
top as u16
}
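// Layout sketch of the quantities used above (standard SQLite page layout):
//
//   [header | cell pointer array | gap ... | cell content area | reserved]
//                                 ^gap      ^top grows to the left
//
// Allocation carves `amount` bytes off the top of the content area; if the gap
// cannot also fit the 2-byte cell pointer plus the payload, the page is
// defragmented first.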
#[cfg(test)]
mod tests {
use rand_chacha::rand_core::RngCore;