some fixes

Pere Diaz Bou
2025-02-10 12:00:23 +01:00
parent 05ca716f82
commit 0c015e43a2


@@ -304,7 +304,7 @@ impl BTreeCursor {
let mem_page_rc = self.stack.top();
let cell_idx = self.stack.current_cell_index() as usize;
debug!("current id={} cell={}", mem_page_rc.get().id, cell_idx);
// debug!("current id={} cell={}", mem_page_rc.get().id, cell_idx);
return_if_locked!(mem_page_rc);
if !mem_page_rc.is_loaded() {
self.pager.load_page(mem_page_rc.clone())?;
@@ -870,11 +870,18 @@ impl BTreeCursor {
matches!(self.state, CursorState::Write(_)),
"Cursor must be in balancing state"
);
let state = self.state.write_info().expect("must be balancing").state;
let state = self
.state
.write_info()
.expect("must be balancing")
.state
.clone();
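// The owned clone releases the borrow of `self.state`, letting the match arms below call `write_info()` again.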
tracing::trace!("balance_non_root(state={:?})", state);
let (next_write_state, result) = match state {
WriteState::Start => todo!(),
WriteState::BalanceStart => todo!(),
WriteState::BalanceNonRoot => {
let write_info = self.state.write_info().unwrap();
let parent_page = self.stack.top();
if parent_page.is_locked() {
return Ok(CursorResult::IO);
@@ -884,6 +891,8 @@ impl BTreeCursor {
self.pager.load_page(parent_page.clone())?;
return Ok(CursorResult::IO);
}
parent_page.set_dirty();
self.pager.add_dirty(parent_page.get().id);
let parent_contents = parent_page.get().contents.as_ref().unwrap();
let page_to_balance_idx = self.stack.current_cell_index() as usize;
@@ -893,9 +902,9 @@ impl BTreeCursor {
page_to_balance_idx
);
// Part 1: Find the sibling pages to balance
self.write_info.new_pages.borrow_mut().clear();
self.write_info.pages_to_balance.borrow_mut().clear();
self.write_info.divider_cells.borrow_mut().clear();
write_info.new_pages.borrow_mut().clear();
write_info.pages_to_balance.borrow_mut().clear();
write_info.divider_cells.borrow_mut().clear();
let number_of_cells_in_parent =
parent_contents.cell_count() + parent_contents.overflow_cells.len();
@@ -922,10 +931,8 @@ impl BTreeCursor {
};
(2, next_divider)
};
self.write_info.sibling_count.replace(sibling_pointer + 1);
self.write_info
.first_divider_cell
.replace(first_cell_divider);
write_info.sibling_count.replace(sibling_pointer + 1);
write_info.first_divider_cell.replace(first_cell_divider);
let last_sibling_is_right_pointer = sibling_pointer + first_cell_divider
- parent_contents.overflow_cells.len()
@@ -942,19 +949,21 @@ impl BTreeCursor {
// load sibling pages
// start loading right page first
let mut pgno: u32 = unsafe { right_pointer.cast::<u32>().read() };
let mut pgno: u32 = unsafe { right_pointer.cast::<u32>().read().swap_bytes() };
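// The right-child pointer is stored big-endian on disk, so the raw 4-byte read is byte-swapped into native order (assuming a little-endian host).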
dbg!(pgno);
let mut current_sibling = sibling_pointer;
for i in (0..=current_sibling).rev() {
let page = self.pager.read_page(pgno as usize)?;
page.set_dirty();
self.pager.add_dirty(page.get().id);
self.write_info.pages_to_balance.borrow_mut().push(page);
write_info.pages_to_balance.borrow_mut().push(page);
assert_eq!(
parent_contents.overflow_cells.len(),
0,
"overflow in parent is not yet implented while balancing it"
);
let next_cell_divider = i + first_cell_divider;
if i == 0 {
break;
}
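// The next page to load is sibling `i - 1`; its page number is the left-child pointer stored in the parent's divider cell at `first_cell_divider + i - 1`.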
let next_cell_divider = i + first_cell_divider - 1;
pgno = match parent_contents.cell_get(
next_cell_divider,
self.pager.clone(),
@@ -989,7 +998,7 @@ impl BTreeCursor {
.pages_to_balance
.borrow()
.iter()
.all(|page| page.is_locked());
.all(|page| !page.is_locked());
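// A page that is still locked has pending I/O, so balancing can only continue once every sibling page is loaded.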
if !all_loaded {
return Ok(CursorResult::IO);
}
@@ -1010,13 +1019,15 @@ impl BTreeCursor {
for i in (0..sibling_count).rev() {
let sibling_page = &pages_to_balance[i];
let sibling_contents = sibling_page.get_contents();
sibling_page.set_dirty();
self.pager.add_dirty(sibling_page.get().id);
max_cells += sibling_contents.cell_count();
if i == 0 {
// we don't have a left sibling of this one, so we break
break;
}
// Since we know we have a left sibling, take the divider that points to the left sibling of this page
let cell_idx = first_divider_cell - i - 1;
let cell_idx = first_divider_cell + i - 1;
let (cell_start, cell_len) = parent_contents.cell_get_raw_region(
cell_idx,
self.payload_overflow_threshold_max(parent_contents.page_type()),
@@ -1073,9 +1084,8 @@ impl BTreeCursor {
}
// Insert overflow cells into correct place
let mut offset = total_cells_inserted;
assert_eq!(
old_page_contents.overflow_cells.len(),
1,
assert!(
old_page_contents.overflow_cells.len() <= 1,
"todo: check this works for more than one overflow cell"
);
for overflow_cell in old_page_contents.overflow_cells.iter_mut() {
@@ -1109,6 +1119,8 @@ impl BTreeCursor {
let mut new_page_sizes = Vec::new();
let mut k = 0;
// todo: add leaf correction
// number of bytes beyond the page header, different from the global usable space, which includes
// the header
let usable_space = self.usable_space() - 12;
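// 12 bytes = the 8-byte page header plus the 4-byte right-most pointer of interior pages; leaf pages only use the 8-byte header, hence the leaf-correction todo above.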
for i in 0..sibling_count {
cell_array
@@ -1136,15 +1148,28 @@ impl BTreeCursor {
}
k += 1;
}
for n in cell_array.number_of_cells_per_page.iter().enumerate() {
println!("init count page={}, n={}", n.0, n.1);
}
// Try to pack as many cells to the left
let mut sibling_count_new = sibling_count;
let mut i = 0;
while i < sibling_count_new {
for n in cell_array.number_of_cells_per_page.iter().enumerate() {
println!("start count i={} page={}, n={}", i, n.0, n.1);
}
// First try to move cells to the right if they do not fit
while new_page_sizes[i] > usable_space as u16 {
let needs_new_page = i + 2 >= sibling_count_new;
println!("moving right {}", i);
let needs_new_page = i + 1 >= sibling_count_new;
if needs_new_page {
sibling_count_new += 1;
println!("adding new page");
new_page_sizes.push(0);
cell_array
.number_of_cells_per_page
.push(cell_array.cells.len() as u16);
assert!(
sibling_count_new <= 5,
"it is corrupt to require more than 5 pages to balance 3 siblings"
@@ -1166,12 +1191,13 @@ impl BTreeCursor {
} else {
size_of_cell_to_remove_from_left
};
new_page_sizes[i + 1] -= size_of_cell_to_move_right;
new_page_sizes[i + 1] += size_of_cell_to_move_right;
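// Page `i` gives up a cell: page `i + 1` grows by the moved cell's size and page `i`'s cell count drops by one.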
cell_array.number_of_cells_per_page[i] -= 1;
}
// Now try to take from the right if we didn't have enough
while cell_array.number_of_cells_per_page[i] < cell_array.cells.len() as u16 {
println!("moving left {}", i);
let size_of_cell_to_remove_from_right =
2 + cell_array.cells[cell_array.cell_count(i)].len() as u16;
let can_take = new_page_sizes[i] + size_of_cell_to_remove_from_right
@@ -1194,15 +1220,22 @@ impl BTreeCursor {
size_of_cell_to_remove_from_right
};
new_page_sizes[i + 1] += size_of_cell_to_remove_from_right;
new_page_sizes[i + 1] -= size_of_cell_to_remove_from_right;
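// Pulling a cell back to page `i` shrinks the right neighbour by that cell's size.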
}
let we_still_need_another_page =
cell_array.number_of_cells_per_page[i] >= cell_array.cells.len() as u16;
if we_still_need_another_page {
sibling_count_new += 1;
dbg!("we need");
sibling_count_new = i + 1;
}
i += 1;
if i >= sibling_count_new {
break;
}
}
for n in cell_array.number_of_cells_per_page.iter().enumerate() {
println!("start count page={}, n={}", n.0, n.1);
}
// Comment borrowed from SQLite src/btree.c
@@ -1222,6 +1255,7 @@ impl BTreeCursor {
// if leaf_data, it means we don't have a divider, so the cell we move from the left is
// the same one we add to the right (we don't add a divider to the right).
let mut cell_right = cell_left + 1 - leaf_data as u16;
log::trace!("start cell_left={}", cell_left);
loop {
let cell_left_size = cell_array.cell_size(cell_left as usize);
let cell_right_size = cell_array.cell_size(cell_right as usize);
@@ -1244,9 +1278,13 @@ impl BTreeCursor {
cell_left -= 1;
cell_right -= 1;
}
tracing::trace!("end cell_left={}", cell_left);
new_page_sizes[i] = size_right_page;
new_page_sizes[i - 1] = size_left_page;
for n in cell_array.number_of_cells_per_page.iter().enumerate() {
println!("new count page={}, n={}", n.0, n.1);
}
assert!(
cell_array.number_of_cells_per_page[i - 1]
> if i > 1 {
@@ -1281,10 +1319,15 @@ impl BTreeCursor {
}
}
// Write right pointer in parent page to point to new rightmost page
parent_contents.write_u32(
PAGE_HEADER_OFFSET_RIGHTMOST_PTR,
pages_to_balance_new.last().unwrap().get().id as u32,
);
// Ensure the right-child pointer of the right-most new sibling page points to the page
// that was originally in that place.
let is_leaf_page =
matches!(page_type, PageType::TableInterior | PageType::IndexInterior);
let is_leaf_page = matches!(page_type, PageType::TableLeaf | PageType::IndexLeaf);
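// Only interior pages (TableInterior/IndexInterior) carry a right-child pointer, so the copy below is skipped for leaf pages.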
if !is_leaf_page {
let last_page = pages_to_balance.last().unwrap();
let right_pointer = last_page.get_contents().rightmost_pointer().unwrap();
@@ -1301,25 +1344,29 @@ impl BTreeCursor {
let divider_cell_idx = cell_array.cell_count(i);
let divider_cell = &mut cell_array.cells[divider_cell_idx];
let page = &pages_to_balance_new[i];
// FIXME: don't use auxiliary space; this could be done without allocations
let mut new_divider_cell = Vec::new();
if !is_leaf_page {
// Interior
page.get_contents()
.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, page.get().id as u32);
new_divider_cell.extend_from_slice(divider_cell);
} else if leaf_data {
// Leaf table
// FIXME: unneeded conversion
// FIXME: need to update cell size in order to free correctly?
// insert into cell with correct range should be enough
let rowid = read_u32(divider_cell, 4);
divider_cell[0..4].copy_from_slice(&(page.get().id as u32).to_be_bytes());
divider_cell[4..8].copy_from_slice(&rowid.to_be_bytes());
let (_, n_bytes_payload) = read_varint(divider_cell)?;
let (rowid, _) = read_varint(&divider_cell[n_bytes_payload..])?;
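// A table-interior divider cell is the 4-byte big-endian left-child page number followed by the rowid as a varint.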
new_divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes());
write_varint_to_vec(rowid, &mut new_divider_cell);
} else {
// Leaf index
divider_cell[0..4].copy_from_slice(&(page.get().id as u32).to_be_bytes());
new_divider_cell.extend_from_slice(divider_cell);
}
insert_into_cell(
parent_contents,
&divider_cell,
&new_divider_cell,
first_divider_cell + i,
self.usable_space() as u16,
);
@@ -1346,10 +1393,12 @@ impl BTreeCursor {
} else {
cell_array.cells.len()
};
let start_new_cells =
cell_array.cell_count(page_idx - 1) + (!leaf_data) as usize;
(
start_old_cells,
cell_array.cell_count(page_idx - 1) + (!leaf_data) as usize,
cell_array.cell_count(0),
start_new_cells,
cell_array.cell_count(page_idx) - start_new_cells,
)
};
let page = pages_to_balance_new[page_idx].get_contents();
@@ -1359,7 +1408,12 @@ impl BTreeCursor {
start_new_cells,
number_new_cells,
&cell_array,
usable_space as u16,
self.usable_space() as u16,
);
tracing::trace!(
"edit_page page={} cells={}",
pages_to_balance_new[page_idx].get().id,
page.cell_count()
);
page.overflow_cells.clear();
@@ -1369,7 +1423,8 @@ impl BTreeCursor {
// TODO: balance root
self.stack.pop();
// TODO: free pages
return Ok(CursorResult::IO);
write_info.state = WriteState::Finish;
return Ok(CursorResult::Ok(()));
}
WriteState::Finish => todo!(),
};
@@ -2014,11 +2069,11 @@ impl PageStack {
/// Push a new page onto the stack.
/// This effectively means traversing to a child page.
fn push(&self, page: PageRef) {
debug!(
"pagestack::push(current={}, new_page_id={})",
self.current_page.borrow(),
page.get().id
);
// debug!(
// "pagestack::push(current={}, new_page_id={})",
// self.current_page.borrow(),
// page.get().id
// );
*self.current_page.borrow_mut() += 1;
let current = *self.current_page.borrow();
assert!(
@@ -2033,7 +2088,7 @@ impl PageStack {
/// This effectively means traversing back up to a parent page.
fn pop(&self) {
let current = *self.current_page.borrow();
debug!("pagestack::pop(current={})", current);
// debug!("pagestack::pop(current={})", current);
self.cell_indices.borrow_mut()[current as usize] = 0;
self.stack.borrow_mut()[current as usize] = None;
*self.current_page.borrow_mut() -= 1;
@@ -2047,11 +2102,11 @@ impl PageStack {
.as_ref()
.unwrap()
.clone();
debug!(
"pagestack::top(current={}, page_id={})",
current,
page.get().id
);
// debug!(
// "pagestack::top(current={}, page_id={})",
// current,
// page.get().id
// );
page
}
@@ -2160,6 +2215,7 @@ pub fn btree_init_page(
contents.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, 0);
let cell_content_area_start = db_header.page_size - db_header.reserved_space as u16;
contents.write_u16(
PAGE_HEADER_OFFSET_CELL_CONTENT_AREA,
cell_content_area_start,
@@ -2200,7 +2256,6 @@ pub fn edit_page(
start,
);
count_cells -= number_to_shift;
// TODO: shift
}
if end_new_cells < end_old_cells {
let number_tail_removed = page_free_array(
@@ -2248,7 +2303,7 @@ pub fn edit_page(
usable_space,
);
// TODO: noverflow
page.write_u32(PAGE_HEADER_OFFSET_CELL_COUNT, count_cells as u32);
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, number_new_cells as u16);
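// The cell-count header field is a 2-byte value and must reflect the number of cells present after the edit.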
}
pub fn page_free_array(
@@ -2269,12 +2324,15 @@ pub fn page_free_array(
// check that this is not an overflow cell
if cell_pointer.start >= buf_range.start && cell_pointer.start < buf_range.end {
assert!(
cell_pointer.end >= buf_range.start && cell_pointer.end < buf_range.end,
cell_pointer.end >= buf_range.start && cell_pointer.end <= buf_range.end,
"whole cell should be inside the page"
);
// TODO: remove pointer too
let offset = (cell_pointer.start as usize - buf_range.start as usize) as u16;
let len = (cell_pointer.end as usize - buf_range.start as usize) as u16;
let len = (cell_pointer.end as usize - cell_pointer.start as usize) as u16;
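// `len` is the byte length of the cell being freed, measured from the cell's own start.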
println!("removing {}~{}", offset, len);
free_cell_range(page, offset, len, usable_space);
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
number_of_cells_removed += 1;
}
}
@@ -2288,6 +2346,7 @@ pub fn page_insert_array(
mut start_insert: usize,
usable_space: u16,
) {
dbg!(count);
// TODO: implement faster algorithm, this is doing extra work that's not needed.
// See pageInsertArray to understand faster way.
for i in first..first + count {
@@ -2439,6 +2498,7 @@ fn defragment_page(page: &PageContent, usable_space: u16) {
// return SQLITE_CORRUPT_PAGE(pPage);
// }
assert!(cbrk >= first_cell);
dbg!(cbrk, first_cell);
let write_buf = page.as_ptr();
// set new first byte of cell content
@@ -2446,8 +2506,7 @@ fn defragment_page(page: &PageContent, usable_space: u16) {
// set free block to 0, unused space can be retrieved from gap between cell pointer end and content start
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
// set unused space to 0
let first_cell = cloned_page.cell_content_area() as u64;
assert!(first_cell <= cbrk);
dbg!(cbrk, first_cell);
write_buf[first_cell as usize..cbrk as usize].fill(0);
}
@@ -2529,7 +2588,14 @@ fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 {
// #2. fragments (isolated 1-3 byte chunks of free space within the cell content area)
// #3. freeblocks (linked list of blocks of at least 4 bytes within the cell content area that are not in use due to e.g. deletions)
let mut free_space_bytes = page.unallocated_region_size() + page.num_frag_free_bytes() as usize;
let pointer_size = if matches!(page.page_type(), PageType::TableLeaf | PageType::IndexLeaf) {
0
} else {
4
};
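// `first_cell` is the first byte past the cell pointer array: the 8-byte page header (12 on interior pages) plus one 2-byte pointer per cell.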
let first_cell = page.offset + 8 + pointer_size + (2 * page.cell_count());
let mut free_space_bytes =
cell_content_area_start as usize + page.num_frag_free_bytes() as usize;
// #3 is computed by iterating over the freeblocks linked list
let mut cur_freeblock_ptr = page.first_freeblock() as usize;
@@ -2585,7 +2651,7 @@ fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 {
// return SQLITE_CORRUPT_PAGE(pPage);
// }
free_space_bytes as u16
free_space_bytes as u16 - first_cell as u16
}
/// Allocate space for a cell on a page.