alpaylan
2025-04-08 17:48:20 -04:00
4 changed files with 575 additions and 50 deletions

View File

@@ -6,10 +6,10 @@ runs:
steps:
- name: Install SQLite
env:
SQLITE_VERSION: "3470200"
YEAR: 2024
SQLITE_VERSION: "3490100"
YEAR: 2025
run: |
curl -o /tmp/sqlite.zip https://www.sqlite.org/$YEAR/sqlite-tools-linux-x64-$SQLITE_VERSION.zip > /dev/null
curl -o /tmp/sqlite.zip https://sqlite.org/$YEAR/sqlite-tools-linux-x64-$SQLITE_VERSION.zip > /dev/null
unzip -j /tmp/sqlite.zip sqlite3 -d /usr/local/bin/
sqlite3 --version
shell: bash

View File

@@ -13,6 +13,8 @@ use crate::{return_corrupt, LimboError, Result};
use std::cell::{Cell, Ref, RefCell};
use std::cmp::Ordering;
#[cfg(debug_assertions)]
use std::collections::HashSet;
use std::pin::Pin;
use std::rc::Rc;
@@ -607,7 +609,7 @@ impl BTreeCursor {
}
None => {
if has_parent {
debug!("moving simple upwards");
tracing::trace!("moving simple upwards");
self.going_upwards = true;
self.stack.pop();
continue;
@@ -955,7 +957,7 @@ impl BTreeCursor {
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
tracing::debug!("move_to(key={:?} cmp={:?})", key, cmp);
tracing::trace!("move_to(key={:?} cmp={:?})", key, cmp);
// For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers.
// B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data.
//
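To make the descent concrete, here is a minimal, self-contained sketch (illustrative types, not this crate's) of the single step repeated at every interior page: pick the child whose subtree can contain the target rowid.

struct InteriorPage {
    dividers: Vec<(u64, u32)>, // (divider rowid, left child page number), sorted by rowid
    rightmost_child: u32,
}

fn choose_child(page: &InteriorPage, key: u64) -> u32 {
    // Rowids less than or equal to a divider live in that divider's left child;
    // anything larger than every divider follows the rightmost pointer.
    match page.dividers.binary_search_by(|(rowid, _)| rowid.cmp(&key)) {
        Ok(i) => page.dividers[i].1,
        Err(i) if i < page.dividers.len() => page.dividers[i].1,
        Err(_) => page.rightmost_child,
    }
}

fn main() {
    let page = InteriorPage { dividers: vec![(10, 2), (20, 3)], rightmost_child: 4 };
    assert_eq!(choose_child(&page, 5), 2);
    assert_eq!(choose_child(&page, 15), 3);
    assert_eq!(choose_child(&page, 99), 4);
}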
@@ -1478,12 +1480,14 @@ impl BTreeCursor {
self.pager.add_dirty(sibling_page.get().id);
max_cells += sibling_contents.cell_count();
max_cells += sibling_contents.overflow_cells.len();
if i == 0 {
// we don't have left sibling from this one so we break
break;
// The right pointer is not dropped; we simply update it at the end. It could be a divider cell that points
// to the last page in the list of pages to balance, or the rightmost pointer that points to a page.
if i == balance_info.sibling_count - 1 {
continue;
}
// Take the divider cell to the right of this page; its left pointer points to this page
let cell_idx = balance_info.first_divider_cell + i - 1;
let cell_idx = balance_info.first_divider_cell + i;
let (cell_start, cell_len) = parent_contents.cell_get_raw_region(
cell_idx,
payload_overflow_threshold_max(
@@ -1500,6 +1504,13 @@ impl BTreeCursor {
let cell_buf = &buf[cell_start..cell_start + cell_len];
max_cells += 1;
tracing::debug!(
"balance_non_root(drop_divider_cell, first_divider_cell={}, divider_cell={}, left_pointer={})",
balance_info.first_divider_cell,
i,
read_u32(cell_buf, 0)
);
// TODO(pere): make this reference and not copy
balance_info.divider_cells.push(cell_buf.to_vec());
tracing::trace!(
@@ -1576,6 +1587,11 @@ impl BTreeCursor {
// TODO(pere): in case the old pages are leaf pages (i.e. index leaf pages), we need to strip page pointers
// from divider cells in index interior pages (the parent) because those should not be included.
cells_inserted += 1;
if !leaf {
// This divider cell needs to be updated with a new left pointer.
let right_pointer = old_page_contents.rightmost_pointer().unwrap();
divider_cell[..4].copy_from_slice(&right_pointer.to_be_bytes());
}
cell_array.cells.push(to_static_buf(divider_cell.as_mut()));
}
total_cells_inserted += cells_inserted;
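For reference, a small self-contained sketch (illustrative helpers, not this crate's page API) of the 4-byte patch performed above: an interior divider cell begins with a big-endian child page number, so redirecting it means overwriting its first four bytes and leaving the rest of the cell intact.

fn set_left_pointer(divider_cell: &mut [u8], child_page: u32) {
    divider_cell[..4].copy_from_slice(&child_page.to_be_bytes());
}

fn left_pointer(divider_cell: &[u8]) -> u32 {
    u32::from_be_bytes(divider_cell[..4].try_into().unwrap())
}

fn main() {
    // 4-byte pointer followed by a one-byte varint rowid (7) as an example cell.
    let mut cell = vec![0, 0, 0, 5, 0x07];
    assert_eq!(left_pointer(&cell), 5);
    set_left_pointer(&mut cell, 42);
    assert_eq!(left_pointer(&cell), 42);
    assert_eq!(cell[4], 0x07); // the payload after the pointer is untouched
}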
@@ -1586,6 +1602,17 @@ impl BTreeCursor {
cells_capacity_start,
"calculation of max cells was wrong"
);
// Let's copy all cells for later checks
#[cfg(debug_assertions)]
let mut cells_debug = Vec::new();
#[cfg(debug_assertions)]
{
for cell in &cell_array.cells {
cells_debug.push(cell.to_vec());
}
}
#[cfg(debug_assertions)]
{
for cell in &cell_array.cells {
@@ -1597,9 +1624,9 @@ impl BTreeCursor {
}
}
// calculate how many pages to allocate
let mut new_page_sizes = Vec::new();
let mut new_page_sizes: Vec<usize> = Vec::new();
let leaf_correction = if leaf { 4 } else { 0 };
// number of bytes beyond header, different from global usableSapce which inccludes
// number of bytes beyond the header, different from the global usableSpace which includes
// header
let usable_space = self.usable_space() - 12 + leaf_correction;
for i in 0..balance_info.sibling_count {
@@ -1610,11 +1637,16 @@ impl BTreeCursor {
let page_contents = page.get_contents();
let free_space = compute_free_space(page_contents, self.usable_space() as u16);
new_page_sizes.push(usable_space as u16 - free_space);
new_page_sizes.push(usable_space as usize - free_space as usize);
for overflow in &page_contents.overflow_cells {
let size = new_page_sizes.last_mut().unwrap();
// 2 to account for the cell pointer
*size += 2 + overflow.payload.len() as u16;
*size += 2 + overflow.payload.len() as usize;
}
if !leaf && i < balance_info.sibling_count - 1 {
// Account for divider cell which is included in this page.
let size = new_page_sizes.last_mut().unwrap();
*size += cell_array.cells[cell_array.cell_count(i)].len() as usize;
}
}
@@ -1623,7 +1655,7 @@ impl BTreeCursor {
let mut i = 0;
while i < sibling_count_new {
// First try to move cells to the right if they do not fit
while new_page_sizes[i] > usable_space as u16 {
while new_page_sizes[i] > usable_space as usize {
let needs_new_page = i + 1 >= sibling_count_new;
if needs_new_page {
sibling_count_new += 1;
@@ -1637,7 +1669,7 @@ impl BTreeCursor {
);
}
let size_of_cell_to_remove_from_left =
2 + cell_array.cells[cell_array.cell_count(i) - 1].len() as u16;
2 + cell_array.cells[cell_array.cell_count(i) - 1].len() as usize;
new_page_sizes[i] -= size_of_cell_to_remove_from_left;
let size_of_cell_to_move_right = if !leaf_data {
if cell_array.number_of_cells_per_page[i]
@@ -1645,23 +1677,23 @@ impl BTreeCursor {
{
// This means we move the divider cell to the right page and
// promote the left cell to divider
2 + cell_array.cells[cell_array.cell_count(i)].len() as u16
2 + cell_array.cells[cell_array.cell_count(i)].len() as usize
} else {
0
}
} else {
size_of_cell_to_remove_from_left
};
new_page_sizes[i + 1] += size_of_cell_to_move_right;
new_page_sizes[i + 1] += size_of_cell_to_move_right as usize;
cell_array.number_of_cells_per_page[i] -= 1;
}
// Now try to take from the right if we didn't have enough
while cell_array.number_of_cells_per_page[i] < cell_array.cells.len() as u16 {
let size_of_cell_to_remove_from_right =
2 + cell_array.cells[cell_array.cell_count(i)].len() as u16;
2 + cell_array.cells[cell_array.cell_count(i)].len() as usize;
let can_take = new_page_sizes[i] + size_of_cell_to_remove_from_right
> usable_space as u16;
> usable_space as usize;
if can_take {
break;
}
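The two loops above (push cells to the right while a page overflows, then pull cells back while there is room) are what decide number_of_cells_per_page. As a rough intuition, here is a deliberately simplified single-pass sketch of the same packing, assuming a fixed per-page capacity, a 2-byte pointer slot per cell, and the leaf-table case where no cell is promoted to a divider:

fn pack_cells_greedy(cell_sizes: &[usize], capacity: usize) -> Vec<usize> {
    // How many cells land on each page, left to right: start a new page as soon
    // as the next cell (plus its 2-byte pointer slot) no longer fits.
    let mut cells_per_page = vec![0usize];
    let mut used = 0usize;
    for &size in cell_sizes {
        let entry = 2 + size;
        if used + entry > capacity && *cells_per_page.last().unwrap() > 0 {
            cells_per_page.push(0);
            used = 0;
        }
        *cells_per_page.last_mut().unwrap() += 1;
        used += entry;
    }
    cells_per_page
}

fn main() {
    // Ten 1000-byte cells with ~4000 usable bytes per page pack as 3-3-3-1.
    assert_eq!(pack_cells_greedy(&[1000; 10], 4000), vec![3usize, 3, 3, 1]);
}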
@@ -1672,7 +1704,7 @@ impl BTreeCursor {
if cell_array.number_of_cells_per_page[i]
< cell_array.cells.len() as u16
{
2 + cell_array.cells[cell_array.cell_count(i)].len() as u16
2 + cell_array.cells[cell_array.cell_count(i)].len() as usize
} else {
0
}
@@ -1718,8 +1750,8 @@ impl BTreeCursor {
// the same we add to right (we don't add divider to right).
let mut cell_right = cell_left + 1 - leaf_data as u16;
loop {
let cell_left_size = cell_array.cell_size(cell_left as usize);
let cell_right_size = cell_array.cell_size(cell_right as usize);
let cell_left_size = cell_array.cell_size(cell_left as usize) as usize;
let cell_right_size = cell_array.cell_size(cell_right as usize) as usize;
// TODO: add assert nMaxCells
let pointer_size = if i == sibling_count_new - 1 { 0 } else { 2 };
@@ -1779,13 +1811,35 @@ impl BTreeCursor {
}
}
// Write right pointer in parent page to point to new rightmost page
#[cfg(debug_assertions)]
{
tracing::debug!("balance_non_root(parent page_id={})", parent_page.get().id);
for page in &pages_to_balance_new {
tracing::debug!("balance_non_root(new_sibling page_id={})", page.get().id);
}
}
// pages_pointed_to helps us verify in debug builds that we did in fact create divider cells for all the new pages
// and that the rightmost pointer also points to the last page.
#[cfg(debug_assertions)]
let mut pages_pointed_to = HashSet::new();
// Write the right pointer in the parent page to point to the new rightmost page. Keep in mind
// we update the rightmost pointer first because inserting cells could defragment the parent page,
// therefore invalidating the pointer.
let right_page_id = pages_to_balance_new.last().unwrap().get().id as u32;
let rightmost_pointer = balance_info.rightmost_pointer;
let rightmost_pointer =
unsafe { std::slice::from_raw_parts_mut(rightmost_pointer, 4) };
rightmost_pointer[0..4].copy_from_slice(&right_page_id.to_be_bytes());
#[cfg(debug_assertions)]
pages_pointed_to.insert(right_page_id);
tracing::debug!(
"balance_non_root(rightmost_pointer_update, rightmost_pointer={})",
right_page_id
);
// Ensure the right-child pointer of the right-most new sibling page points to the page
// that was originally in that place.
let is_leaf_page = matches!(page_type, PageType::TableLeaf | PageType::IndexLeaf);
@@ -1816,8 +1870,15 @@ impl BTreeCursor {
page.get_contents()
.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, previous_pointer_divider);
// divider cell now points to this page
divider_cell[0..4].copy_from_slice(&(page.get().id as u32).to_be_bytes());
new_divider_cell.extend_from_slice(divider_cell);
new_divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes());
// now copy the rest of the divider cell:
// Table Interior page:
// * varint rowid
// Index Interior page:
// * varint payload size
// * payload
// * first overflow page (u32 optional)
new_divider_cell.extend_from_slice(&divider_cell[4..]);
} else if leaf_data {
// Leaf table
// FIXME: not needed conversion
@@ -1832,8 +1893,26 @@ impl BTreeCursor {
// Leaf index
new_divider_cell.extend_from_slice(divider_cell);
}
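For the table-interior case described in the comment above, the assembled divider cell is just a 4-byte big-endian left-child pointer followed by the rowid as a SQLite varint. A self-contained sketch (illustrative function; the 9-byte varint case for rowids >= 2^56 is omitted):

fn table_interior_divider_cell(left_child: u32, rowid: u64) -> Vec<u8> {
    assert!(rowid < (1u64 << 56), "the 9-byte varint case is omitted in this sketch");
    let mut cell = Vec::new();
    cell.extend_from_slice(&left_child.to_be_bytes()); // 4-byte big-endian child pointer
    // SQLite varint: big-endian groups of 7 bits, high bit set on every byte but the last.
    let mut groups = vec![(rowid & 0x7f) as u8];
    let mut v = rowid >> 7;
    while v > 0 {
        groups.push(((v & 0x7f) | 0x80) as u8);
        v >>= 7;
    }
    groups.reverse();
    cell.extend_from_slice(&groups);
    cell
}

fn main() {
    let cell = table_interior_divider_cell(42, 300);
    assert_eq!(&cell[..4], &42u32.to_be_bytes()[..]);
    assert_eq!(&cell[4..], &[0x82u8, 0x2c][..]); // varint encoding of rowid 300
}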
let left_pointer = read_u32(&new_divider_cell[..4], 0);
assert!(left_pointer != parent_page.get().id as u32);
#[cfg(debug_assertions)]
pages_pointed_to.insert(left_pointer);
tracing::debug!(
"balance_non_root(insert_divider_cell, first_divider_cell={}, divider_cell={}, left_pointer={})",
balance_info.first_divider_cell,
i,
left_pointer
);
assert_eq!(left_pointer, page.get().id as u32);
// FIXME: remove this lock
assert!(
left_pointer <= self.pager.db_header.lock().database_size,
"invalid page number divider left pointer {} > database number of pages",
left_pointer,
);
// FIXME: defragment shouldn't be needed
defragment_page(parent_contents, self.usable_space() as u16);
// defragment_page(parent_contents, self.usable_space() as u16);
insert_into_cell(
parent_contents,
&new_divider_cell,
@@ -1841,6 +1920,57 @@ impl BTreeCursor {
self.usable_space() as u16,
)
.unwrap();
#[cfg(debug_assertions)]
{
let left_pointer = if parent_contents.overflow_cells.len() == 0 {
let (cell_start, cell_len) = parent_contents.cell_get_raw_region(
balance_info.first_divider_cell + i,
payload_overflow_threshold_max(
parent_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
parent_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
);
tracing::debug!(
"balance_non_root(cell_start={}, cell_len={})",
cell_start,
cell_len
);
let left_pointer = read_u32(
&parent_contents.as_ptr()[cell_start..cell_start + cell_len],
0,
);
left_pointer
} else {
let cell = &parent_contents.overflow_cells[0];
assert_eq!(cell.index, balance_info.first_divider_cell + i);
read_u32(&cell.payload, 0)
};
assert_eq!(left_pointer, page.get().id as u32, "the cell we just inserted doesn't point to the correct page. points to {}, should point to {}",
left_pointer,
page.get().id as u32
);
}
}
tracing::debug!(
"balance_non_root(parent_overflow={})",
parent_contents.overflow_cells.len()
);
#[cfg(debug_assertions)]
{
for page in &pages_to_balance_new {
assert!(
pages_pointed_to.contains(&(page.get().id as u32)),
"page {} not pointed to by divider cell or rightmost pointer",
page.get().id
);
}
}
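The debug pass above boils down to a reachability invariant: every new sibling page must be pointed to either by one of the parent's divider cells or by the parent's rightmost pointer. A distilled, self-contained version of that check (hypothetical page ids):

use std::collections::HashSet;

fn children_covered(new_sibling_ids: &[u32], divider_left_ptrs: &[u32], rightmost: u32) -> bool {
    let mut pointed_to: HashSet<u32> = divider_left_ptrs.iter().copied().collect();
    pointed_to.insert(rightmost);
    new_sibling_ids.iter().all(|id| pointed_to.contains(id))
}

fn main() {
    assert!(children_covered(&[7, 9, 12], &[7, 9], 12));
    assert!(!children_covered(&[7, 9, 12], &[7, 9], 13)); // page 12 is unreachable
}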
// TODO: update pages
let mut done = vec![false; sibling_count_new];
@@ -1858,6 +1988,7 @@ impl BTreeCursor {
(0, 0, cell_array.cell_count(0))
} else {
let this_was_old_page = page_idx < balance_info.sibling_count;
// We add !leaf_data because we want to skip 1 in the case of a divider cell, which is encountered between the cells assigned to adjacent pages
let start_old_cells = if this_was_old_page {
count_cells_in_old_pages[page_idx - 1] as usize
+ (!leaf_data) as usize
@@ -1894,6 +2025,19 @@ impl BTreeCursor {
done[page_idx] = true;
}
}
#[cfg(debug_assertions)]
self.post_balance_non_root_validation(
&parent_page,
balance_info,
parent_contents,
pages_to_balance_new,
page_type,
leaf_data,
cells_debug,
sibling_count_new,
rightmost_pointer,
);
// TODO: balance root
// TODO: free pages
(WriteState::BalanceStart, Ok(CursorResult::Ok(())))
@@ -1909,6 +2053,338 @@ impl BTreeCursor {
result
}
#[cfg(debug_assertions)]
fn post_balance_non_root_validation(
&self,
parent_page: &PageRef,
balance_info: &mut BalanceInfo,
parent_contents: &mut PageContent,
pages_to_balance_new: Vec<std::sync::Arc<crate::Page>>,
page_type: PageType,
leaf_data: bool,
mut cells_debug: Vec<Vec<u8>>,
sibling_count_new: usize,
rightmost_pointer: &mut [u8],
) {
let mut valid = true;
let mut current_index_cell = 0;
for cell_idx in 0..parent_contents.cell_count() {
let cell = parent_contents
.cell_get(
cell_idx,
payload_overflow_threshold_max(
parent_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
parent_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
)
.unwrap();
match cell {
BTreeCell::TableInteriorCell(table_interior_cell) => {
let left_child_page = table_interior_cell._left_child_page;
if left_child_page == parent_page.get().id as u32 {
tracing::error!("balance_non_root(parent_divider_points_to_same_page, page_id={}, cell_left_child_page={})",
parent_page.get().id,
left_child_page,
);
valid = false;
}
}
BTreeCell::IndexInteriorCell(index_interior_cell) => {
let left_child_page = index_interior_cell.left_child_page;
if left_child_page == parent_page.get().id as u32 {
tracing::error!("balance_non_root(parent_divider_points_to_same_page, page_id={}, cell_left_child_page={})",
parent_page.get().id,
left_child_page,
);
valid = false;
}
}
_ => {}
}
}
// Let's now make an in-depth check that we did in fact add every cell somewhere and none were lost
for (page_idx, page) in pages_to_balance_new.iter().enumerate() {
let contents = page.get_contents();
// Cells are distributed in order
for cell_idx in 0..contents.cell_count() {
let (cell_start, cell_len) = contents.cell_get_raw_region(
cell_idx,
payload_overflow_threshold_max(
contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
);
let buf = contents.as_ptr();
let cell_buf = to_static_buf(&mut buf[cell_start..cell_start + cell_len]);
let cell_buf_in_array = &cells_debug[current_index_cell];
if cell_buf != cell_buf_in_array {
tracing::error!("balance_non_root(cell_not_found_debug, page_id={}, cell_in_cell_array_idx={})",
page.get().id,
current_index_cell,
);
valid = false;
}
let cell = crate::storage::sqlite3_ondisk::read_btree_cell(
cell_buf,
&page_type,
0,
payload_overflow_threshold_max(
parent_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
parent_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
)
.unwrap();
match &cell {
BTreeCell::TableInteriorCell(table_interior_cell) => {
let left_child_page = table_interior_cell._left_child_page;
if left_child_page == page.get().id as u32 {
tracing::error!("balance_non_root(child_page_points_same_page, page_id={}, cell_left_child_page={}, page_idx={})",
page.get().id,
left_child_page,
page_idx
);
valid = false;
}
if left_child_page == parent_page.get().id as u32 {
tracing::error!("balance_non_root(child_page_points_parent_of_child, page_id={}, cell_left_child_page={}, page_idx={})",
page.get().id,
left_child_page,
page_idx
);
valid = false;
}
}
BTreeCell::IndexInteriorCell(index_interior_cell) => {
let left_child_page = index_interior_cell.left_child_page;
if left_child_page == page.get().id as u32 {
tracing::error!("balance_non_root(child_page_points_same_page, page_id={}, cell_left_child_page={}, page_idx={})",
page.get().id,
left_child_page,
page_idx
);
valid = false;
}
if left_child_page == parent_page.get().id as u32 {
tracing::error!("balance_non_root(child_page_points_parent_of_child, page_id={}, cell_left_child_page={}, page_idx={})",
page.get().id,
left_child_page,
page_idx
);
valid = false;
}
}
_ => {}
}
current_index_cell += 1;
}
// Now check divider cells and their pointers.
let parent_buf = parent_contents.as_ptr();
let cell_divider_idx = balance_info.first_divider_cell + page_idx;
if page_idx == sibling_count_new - 1 {
// We only validate the rightmost pointer of the parent page; we do not validate it when it is a divider cell
// rather than the last pointer, because inserting cells could have defragmented the page and invalidated the pointer.
// For the right pointer we just check that it points to this page.
if cell_divider_idx == parent_contents.cell_count() {
let rightmost = read_u32(rightmost_pointer, 0);
if rightmost != page.get().id as u32 {
tracing::error!("balance_non_root(cell_divider_right_pointer, should point to {}, but points to {})",
page.get().id,
rightmost
);
valid = false;
}
}
} else {
// divider cell might be an overflow cell
let mut was_overflow = false;
for overflow_cell in &parent_contents.overflow_cells {
if overflow_cell.index == cell_divider_idx {
let left_pointer = read_u32(&overflow_cell.payload, 0);
if left_pointer != page.get().id as u32 {
tracing::error!("balance_non_root(cell_divider_left_pointer_overflow, should point to page_id={}, but points to {}, divider_cell={}, overflow_cells_parent={})",
page.get().id,
left_pointer,
page_idx,
parent_contents.overflow_cells.len()
);
valid = false;
}
was_overflow = true;
break;
}
}
if was_overflow {
continue;
}
// check if overflow
// check if right pointer, this is the last page. Do we update rightmost pointer and defragment moves it?
let (cell_start, cell_len) = parent_contents.cell_get_raw_region(
cell_divider_idx,
payload_overflow_threshold_max(
parent_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
parent_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
);
let cell_left_pointer = read_u32(&parent_buf[cell_start..cell_start + cell_len], 0);
if cell_left_pointer != page.get().id as u32 {
tracing::error!("balance_non_root(cell_divider_left_pointer, should point to page_id={}, but points to {}, divider_cell={}, overflow_cells_parent={})",
page.get().id,
cell_left_pointer,
page_idx,
parent_contents.overflow_cells.len()
);
valid = false;
}
if leaf_data {
// If we are on a table leaf page, we just need to check that the cell acting as a divider is present in the parent.
// Cells were already checked on the leaf pages but not on the parent, so we don't advance current_index_cell
if page_idx >= balance_info.sibling_count - 1 {
// This means we are in the last page and we don't need to check anything
continue;
}
let cell_buf: &'static mut [u8] =
to_static_buf(&mut cells_debug[current_index_cell - 1]);
let cell = crate::storage::sqlite3_ondisk::read_btree_cell(
cell_buf,
&page_type,
0,
payload_overflow_threshold_max(
parent_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
parent_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
)
.unwrap();
let parent_cell = parent_contents
.cell_get(
cell_divider_idx,
payload_overflow_threshold_max(
parent_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
parent_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
)
.unwrap();
let rowid = match cell {
BTreeCell::TableLeafCell(table_leaf_cell) => table_leaf_cell._rowid,
_ => unreachable!(),
};
let rowid_parent = match parent_cell {
BTreeCell::TableInteriorCell(table_interior_cell) => {
table_interior_cell._rowid
}
_ => unreachable!(),
};
if rowid_parent != rowid {
tracing::error!("balance_non_root(cell_divider_rowid, page_id={}, cell_divider_idx={}, rowid_parent={}, rowid={})",
page.get().id,
cell_divider_idx,
rowid_parent,
rowid
);
valid = false;
}
} else {
// In any other case, we need to check that this cell was moved to the parent as a divider cell
let mut was_overflow = false;
for overflow_cell in &parent_contents.overflow_cells {
if overflow_cell.index == cell_divider_idx {
let left_pointer = read_u32(&overflow_cell.payload, 0);
if left_pointer != page.get().id as u32 {
tracing::error!("balance_non_root(cell_divider_divider_cell_overflow should point to page_id={}, but points to {}, divider_cell={}, overflow_cells_parent={})",
page.get().id,
left_pointer,
page_idx,
parent_contents.overflow_cells.len()
);
valid = false;
}
was_overflow = true;
break;
}
}
if was_overflow {
continue;
}
let (parent_cell_start, parent_cell_len) = parent_contents.cell_get_raw_region(
cell_divider_idx,
payload_overflow_threshold_max(
parent_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
parent_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
);
let cell_buf_in_array = &cells_debug[current_index_cell];
let left_pointer = read_u32(
&parent_buf[parent_cell_start..parent_cell_start + parent_cell_len],
0,
);
if left_pointer != page.get().id as u32 {
tracing::error!("balance_non_root(divider_cell_left_pointer_interior should point to page_id={}, but points to {}, divider_cell={}, overflow_cells_parent={})",
page.get().id,
left_pointer,
page_idx,
parent_contents.overflow_cells.len()
);
valid = false;
}
match page_type {
PageType::TableInterior | PageType::IndexInterior => {
let parent_cell_buf =
&parent_buf[parent_cell_start..parent_cell_start + parent_cell_len];
if parent_cell_buf[4..] != cell_buf_in_array[4..] {
tracing::error!("balance_non_root(cell_divider_cell, page_id={}, cell_divider_idx={})",
page.get().id,
cell_divider_idx,
);
valid = false;
}
}
PageType::IndexLeaf => todo!(),
_ => {
unreachable!()
}
}
current_index_cell += 1;
}
}
}
assert!(valid, "corrupted database, cells were not balanced properly");
}
/// Balance the root page.
/// This is done when the root page overflows, and we need to create a new root page.
/// See e.g. https://en.wikipedia.org/wiki/B-tree
@@ -3133,7 +3609,7 @@ fn edit_page(
cell_array: &CellArray,
usable_space: u16,
) -> Result<()> {
tracing::trace!(
tracing::debug!(
"edit_page start_old_cells={} start_new_cells={} number_new_cells={} cell_array={}",
start_old_cells,
start_new_cells,
@@ -3443,7 +3919,6 @@ fn defragment_page(page: &PageContent, usable_space: u16) {
/// Only enabled in debug mode, where we ensure that all cells are valid.
fn debug_validate_cells_core(page: &PageContent, usable_space: u16) {
for i in 0..page.cell_count() {
// println!("Debug function: i={}", i);
let (offset, size) = page.cell_get_raw_region(
i,
payload_overflow_threshold_max(page.page_type(), usable_space),
@@ -3487,7 +3962,7 @@ fn insert_into_cell(
}
let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space)?;
tracing::trace!(
tracing::debug!(
"insert_into_cell(idx={}, pc={})",
cell_idx,
new_cell_data_pointer
@@ -3815,6 +4290,8 @@ fn shift_pointers_left(page: &mut PageContent, cell_idx: usize) {
#[cfg(test)]
mod tests {
use rand::thread_rng;
use rand::Rng;
use rand_chacha::rand_core::RngCore;
use rand_chacha::rand_core::SeedableRng;
use rand_chacha::ChaCha8Rng;
@@ -4009,6 +4486,14 @@ mod tests {
_left_child_page, ..
}) => {
child_pages.push(pager.read_page(_left_child_page as usize).unwrap());
if _left_child_page == page.id as u32 {
valid = false;
tracing::error!(
"left child page is the same as parent {}",
_left_child_page
);
continue;
}
let (child_depth, child_valid) =
validate_btree(pager.clone(), _left_child_page as usize);
valid &= child_valid;
@@ -4016,6 +4501,10 @@ mod tests {
}
_ => panic!("unsupported btree cell: {:?}", cell),
};
if current_depth >= 100 {
tracing::error!("depth is too big");
return (100, false);
}
depth = Some(depth.unwrap_or(current_depth + 1));
if depth != Some(current_depth + 1) {
tracing::error!("depth is different for child of page {}", page_idx);
@@ -4251,9 +4740,7 @@ mod tests {
let (pager, root_page) = empty_btree();
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
let mut keys = Vec::new();
let seed = rng.next_u64();
tracing::info!("seed: {}", seed);
let mut rng = ChaCha8Rng::seed_from_u64(seed);
for insert_id in 0..inserts {
let size = size(&mut rng);
let key = {
@@ -4288,12 +4775,31 @@ mod tests {
let value = ImmutableRecord::from_registers(&[Register::OwnedValue(
OwnedValue::Blob(vec![0; size]),
)]);
let btree_before = format_btree(pager.clone(), root_page, 0);
run_until_done(
|| cursor.insert(&BTreeKey::new_table_rowid(key as u64, Some(&value)), true),
pager.deref(),
)
.unwrap();
if matches!(validate_btree(pager.clone(), root_page), (_, false)) {
// FIXME: maintain a sorted vector instead; sorting on every insert is acceptable for small numbers of keys for now
keys.sort();
cursor.move_to_root();
let mut valid = true;
for key in keys.iter() {
tracing::trace!("seeking key: {}", key);
run_until_done(|| cursor.next(), pager.deref()).unwrap();
let cursor_rowid = cursor.rowid().unwrap().unwrap();
if *key as u64 != cursor_rowid {
valid = false;
println!("key {} is not found, got {}", key, cursor_rowid);
break;
}
}
// let's validate the btree too so that we understand where the btree failed
if matches!(validate_btree(pager.clone(), root_page), (_, false)) || !valid {
let btree_after = format_btree(pager.clone(), root_page, 0);
println!("btree before:\n{}", btree_before);
println!("btree after:\n{}", btree_after);
panic!("invalid btree");
}
}
@@ -4304,13 +4810,17 @@ mod tests {
if matches!(validate_btree(pager.clone(), root_page), (_, false)) {
panic!("invalid btree");
}
keys.sort();
cursor.move_to_root();
for key in keys.iter() {
let seek_key = SeekKey::TableRowId(*key as u64);
tracing::debug!("seeking key: {}", key);
let found =
run_until_done(|| cursor.seek(seek_key.clone(), SeekOp::EQ), pager.deref())
.unwrap();
assert!(found, "key {} is not found", key);
tracing::trace!("seeking key: {}", key);
run_until_done(|| cursor.next(), pager.deref()).unwrap();
let cursor_rowid = cursor.rowid().unwrap().unwrap();
assert_eq!(
*key as u64, cursor_rowid,
"key {} is not found, got {}",
key, cursor_rowid
);
}
}
}
@@ -4368,25 +4878,21 @@ mod tests {
}
#[test]
#[ignore]
pub fn btree_insert_fuzz_run_random() {
btree_insert_fuzz_run(128, 16, |rng| (rng.next_u32() % 4096) as usize);
}
#[test]
#[ignore]
pub fn btree_insert_fuzz_run_small() {
btree_insert_fuzz_run(1, 1024, |rng| (rng.next_u32() % 128) as usize);
btree_insert_fuzz_run(1, 100, |rng| (rng.next_u32() % 128) as usize);
}
#[test]
#[ignore]
pub fn btree_insert_fuzz_run_big() {
btree_insert_fuzz_run(64, 32, |rng| 3 * 1024 + (rng.next_u32() % 1024) as usize);
}
#[test]
#[ignore]
pub fn btree_insert_fuzz_run_overflow() {
btree_insert_fuzz_run(64, 32, |rng| (rng.next_u32() % 32 * 1024) as usize);
}
@@ -4775,15 +5281,13 @@ mod tests {
let mut total_size = 0;
let mut cells = Vec::new();
let usable_space = 4096;
let mut i = 1000;
// let seed = thread_rng().gen();
// let seed = 15292777653676891381;
let seed = 9261043168681395159;
let mut i = 100000;
let seed = thread_rng().gen();
tracing::info!("seed {}", seed);
let mut rng = ChaCha8Rng::seed_from_u64(seed);
while i > 0 {
i -= 1;
match rng.next_u64() % 3 {
match rng.next_u64() % 4 {
0 => {
// allow appends with extra place to insert
let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1);
@@ -4800,14 +5304,14 @@ mod tests {
4096,
conn.pager.clone(),
);
if (free as usize) < payload.len() - 2 {
if (free as usize) < payload.len() + 2 {
// do not try to insert cells that would overflow the page, since overflow cells require balancing
continue;
}
insert_into_cell(page, &payload, cell_idx, 4096).unwrap();
assert!(page.overflow_cells.is_empty());
total_size += payload.len() as u16 + 2;
cells.push(Cell { pos: i, payload });
cells.insert(cell_idx, Cell { pos: i, payload });
}
1 => {
if page.cell_count() == 0 {
@@ -4827,6 +5331,13 @@ mod tests {
2 => {
defragment_page(page, usable_space);
}
3 => {
// check cells
for (i, cell) in cells.iter().enumerate() {
ensure_cell(page, i, &cell.payload);
}
assert_eq!(page.cell_count(), cells.len());
}
_ => unreachable!(),
}
let free = compute_free_space(page, usable_space);
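The loop above follows a seeded, model-based fuzzing pattern: log a random seed so failures reproduce, drive random actions against the page, mirror every action in the cells vector, and occasionally compare the two. A distilled, self-contained version of the same pattern, with a plain Vec standing in for the page (identifiers are illustrative):

use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;

fn main() {
    let seed: u64 = rand::thread_rng().gen();
    println!("seed {seed}"); // re-run with this seed to reproduce a failure
    let mut rng = ChaCha8Rng::seed_from_u64(seed);
    let mut model: Vec<u64> = Vec::new();   // trivially correct reference
    let mut subject: Vec<u64> = Vec::new(); // stand-in for the structure under test
    for _ in 0..1000 {
        match rng.gen_range(0..3) {
            0 => {
                // insert at a random position
                let idx = rng.gen_range(0..=model.len());
                let value = rng.gen();
                model.insert(idx, value);
                subject.insert(idx, value);
            }
            1 => {
                // remove at a random position
                if !model.is_empty() {
                    let idx = rng.gen_range(0..model.len());
                    model.remove(idx);
                    subject.remove(idx);
                }
            }
            _ => {
                // compare the structure under test against the model
                assert_eq!(model, subject);
            }
        }
    }
}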

View File

@@ -197,6 +197,12 @@ impl Display for OwnedValue {
}
Self::Float(fl) => {
let fl = *fl;
if fl == f64::INFINITY {
return write!(f, "Inf");
}
if fl == f64::NEG_INFINITY {
return write!(f, "-Inf");
}
if fl.is_nan() {
return write!(f, "");
}
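A standalone sketch of the behaviour these branches add (the function below is illustrative, not the crate's Display impl): positive and negative infinity render as "Inf"/"-Inf", and NaN renders as an empty string, matching the new test expectations.

fn format_float_special(fl: f64) -> Option<String> {
    if fl == f64::INFINITY {
        return Some("Inf".to_string());
    }
    if fl == f64::NEG_INFINITY {
        return Some("-Inf".to_string());
    }
    if fl.is_nan() {
        return Some(String::new());
    }
    None // fall through to the regular float formatting path
}

fn main() {
    assert_eq!(format_float_special(f64::INFINITY), Some("Inf".to_string()));
    assert_eq!(format_float_special(f64::NEG_INFINITY), Some("-Inf".to_string()));
    assert_eq!(format_float_special(f64::NAN), Some(String::new()));
    assert_eq!(format_float_special(2.5), None);
}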

View File

@@ -166,6 +166,14 @@ do_execsql_test select-like-expression {
select 2 % 0.5
} {}
do_execsql_test select_positive_infinite_float {
SELECT 1.7976931348623157E+308 + 1e308; -- f64::MAX + 1e308
} {Inf}
do_execsql_test select_negative_infinite_float {
SELECT -1.7976931348623157E+308 - 1e308 -- f64::MIN - 1e308
} {-Inf}
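A quick Rust check of the arithmetic these two tests exercise (nothing here is part of the test suite): IEEE 754 doubles saturate to positive or negative infinity once a result exceeds f64::MAX in magnitude, and 1.7976931348623157E+308 is exactly f64::MAX.

fn main() {
    let max: f64 = 1.7976931348623157E+308;
    assert_eq!(max, f64::MAX);
    assert_eq!(max + 1e308, f64::INFINITY);      // displayed as "Inf"
    assert_eq!(-max - 1e308, f64::NEG_INFINITY); // displayed as "-Inf"
}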
do_execsql_test select_shl_large_negative_float {
SELECT 1 << -1e19;
SELECT 1 << -9223372036854775808; -- i64::MIN