Merge 'Minor improvements and cleanups in btree' from Preston Thorpe

This PR does the following:
1. Replaces a couple `RefCell`'s on the cursor and page stack with
`Cell` since the types implement `Copy`
2. Adds a `return_corrupt!` macro to handle the frequent error
3. Removed `Result` return type from cursor `record` method
4. Removed a couple clones and very minor refactoring (a few clippy
suggestions)

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #1075
This commit is contained in:
Pere Diaz Bou
2025-03-02 22:02:58 +01:00
3 changed files with 81 additions and 81 deletions

View File

@@ -6,9 +6,9 @@ use crate::storage::sqlite3_ondisk::{
};
use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp};
use crate::{LimboError, Result};
use crate::{return_corrupt, LimboError, Result};
use std::cell::{Ref, RefCell};
use std::cell::{Cell, Ref, RefCell};
use std::pin::Pin;
use std::rc::Rc;
@@ -145,7 +145,7 @@ pub struct BTreeCursor {
/// Page id of the root page used to go back up fast.
root_page: usize,
/// Rowid and record are stored before being consumed.
rowid: RefCell<Option<u64>>,
rowid: Cell<Option<u64>>,
record: RefCell<Option<Record>>,
null_flag: bool,
/// Index internal pages are consumed on the way up, so we store going upwards flag in case
@@ -164,7 +164,7 @@ pub struct BTreeCursor {
/// the parent. Using current_page + 1 or higher is undefined behaviour.
struct PageStack {
/// Pointer to the current page being consumed
current_page: RefCell<i32>,
current_page: Cell<i32>,
/// List of pages in the stack. Root page will be in index 0
stack: RefCell<[Option<PageRef>; BTCURSOR_MAX_DEPTH + 1]>,
/// List of cell indices in the stack.
@@ -188,13 +188,13 @@ impl BTreeCursor {
Self {
pager,
root_page,
rowid: RefCell::new(None),
rowid: Cell::new(None),
record: RefCell::new(None),
null_flag: false,
going_upwards: false,
state: CursorState::None,
stack: PageStack {
current_page: RefCell::new(-1),
current_page: Cell::new(-1),
cell_indices: RefCell::new([0; BTCURSOR_MAX_DEPTH + 1]),
stack: RefCell::new([const { None }; BTCURSOR_MAX_DEPTH + 1]),
},
@@ -736,7 +736,7 @@ impl BTreeCursor {
.state
.mut_write_info()
.expect("can't insert while counting");
write_info.state.clone()
write_info.state
};
match write_state {
WriteState::Start => {
@@ -811,7 +811,7 @@ impl BTreeCursor {
};
};
self.state = CursorState::None;
return ret;
ret
}
/// Balance a leaf page.
@@ -867,12 +867,7 @@ impl BTreeCursor {
matches!(self.state, CursorState::Write(_)),
"Cursor must be in balancing state"
);
let state = self
.state
.write_info()
.expect("must be balancing")
.state
.clone();
let state = self.state.write_info().expect("must be balancing").state;
tracing::debug!("balance_non_root(state={:?})", state);
let (next_write_state, result) = match state {
WriteState::Start => todo!(),
@@ -917,25 +912,25 @@ impl BTreeCursor {
// As there will be at maximum 3 pages used to balance:
// sibling_pointer is the index represeneting one of those 3 pages, and we initialize it to the last possible page.
// next_divider is the first divider that contains the first page of the 3 pages.
let (sibling_pointer, first_cell_divider) = if number_of_cells_in_parent < 2 {
(number_of_cells_in_parent, 0)
} else if number_of_cells_in_parent == 2 {
let (sibling_pointer, first_cell_divider) = match number_of_cells_in_parent {
n if n < 2 => (number_of_cells_in_parent, 0),
2 => (2, 0),
// Here we will have at lest 2 cells and one right pointer, therefore we can get 3 siblings.
// In case of 2 we will have all pages to balance.
(2, 0)
} else {
// In case of > 3 we have to check which ones to get
let next_divider = if page_to_balance_idx == 0 {
// first cell, take first 3
0
} else if page_to_balance_idx == number_of_cells_in_parent {
// Page corresponds to right pointer, so take last 3
number_of_cells_in_parent - 2
} else {
// Some cell in the middle, so we want to take sibling on left and right.
page_to_balance_idx - 1
};
(2, next_divider)
_ => {
// In case of > 3 we have to check which ones to get
let next_divider = if page_to_balance_idx == 0 {
// first cell, take first 3
0
} else if page_to_balance_idx == number_of_cells_in_parent {
// Page corresponds to right pointer, so take last 3
number_of_cells_in_parent - 2
} else {
// Some cell in the middle, so we want to take sibling on left and right.
page_to_balance_idx - 1
};
(2, next_divider)
}
};
write_info.sibling_count.replace(sibling_pointer + 1);
write_info.first_divider_cell.replace(first_cell_divider);
@@ -1030,7 +1025,7 @@ impl BTreeCursor {
let parent_page = self.stack.top();
let parent_contents = parent_page.get_contents();
assert!(
parent_contents.overflow_cells.len() == 0,
parent_contents.overflow_cells.is_empty(),
"overflow parent not yet implemented"
);
let sibling_count = *write_info.sibling_count.borrow();
@@ -1170,7 +1165,7 @@ impl BTreeCursor {
.push(count_cells_in_old_pages[i]);
let page = &pages_to_balance[i];
let page_contents = page.get_contents();
let free_space = compute_free_space(&page_contents, self.usable_space() as u16);
let free_space = compute_free_space(page_contents, self.usable_space() as u16);
// If we have an empty page of cells, we ignore it
if k > 0
@@ -1371,12 +1366,14 @@ impl BTreeCursor {
.write_u32(PAGE_HEADER_OFFSET_RIGHTMOST_PTR, right_pointer);
}
// TODO: pointer map update (vacuum support)
for i in 0..sibling_count_new - 1
for (i, page) in pages_to_balance_new
.iter()
.enumerate()
.take(sibling_count_new - 1)
/* do not take last page */
{
let divider_cell_idx = cell_array.cell_count(i);
let mut divider_cell = &mut cell_array.cells[divider_cell_idx];
let page = &pages_to_balance_new[i];
// FIXME: dont use auxiliary space, could be done without allocations
let mut new_divider_cell = Vec::new();
if !is_leaf_page {
@@ -1653,7 +1650,7 @@ impl BTreeCursor {
}
pub fn rowid(&self) -> Result<Option<u64>> {
Ok(*self.rowid.borrow())
Ok(self.rowid.get())
}
pub fn seek(&mut self, key: SeekKey<'_>, op: SeekOp) -> Result<CursorResult<bool>> {
@@ -1663,8 +1660,8 @@ impl BTreeCursor {
Ok(CursorResult::Ok(rowid.is_some()))
}
pub fn record(&self) -> Result<Ref<Option<Record>>> {
Ok(self.record.borrow())
pub fn record(&self) -> Ref<Option<Record>> {
self.record.borrow()
}
pub fn insert(
@@ -1695,8 +1692,8 @@ impl BTreeCursor {
return Ok(CursorResult::IO);
}
let target_rowid = match self.rowid.borrow().as_ref() {
Some(rowid) => *rowid,
let target_rowid = match self.rowid.get() {
Some(rowid) => rowid,
None => return Ok(CursorResult::Ok(())),
};
@@ -1738,11 +1735,11 @@ impl BTreeCursor {
)?;
if cell_idx >= contents.cell_count() {
return Err(LimboError::Corrupt(format!(
return_corrupt!(format!(
"Corrupted page: cell index {} is out of bounds for page with {} cells",
cell_idx,
contents.cell_count()
)));
));
}
let original_child_pointer = match &cell {
@@ -1916,7 +1913,7 @@ impl BTreeCursor {
// Validate overflow page number
if current_page < 2 || current_page as usize > page_count {
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
return_corrupt!("Invalid overflow page number");
}
let page = self.pager.read_page(current_page as usize)?;
@@ -1958,7 +1955,7 @@ impl BTreeCursor {
let n_overflow =
(payload_len - local_size + overflow_page_size).div_ceil(overflow_page_size);
if n_overflow == 0 {
return Err(LimboError::Corrupt("Invalid overflow calculation".into()));
return_corrupt!("Invalid overflow page count");
}
Ok(Some(n_overflow))
@@ -1966,16 +1963,22 @@ impl BTreeCursor {
}
impl PageStack {
fn increment_current(&self) {
self.current_page.set(self.current_page.get() + 1);
}
fn decrement_current(&self) {
self.current_page.set(self.current_page.get() - 1);
}
/// Push a new page onto the stack.
/// This effectively means traversing to a child page.
fn push(&self, page: PageRef) {
tracing::trace!(
"pagestack::push(current={}, new_page_id={})",
self.current_page.borrow(),
self.current_page.get(),
page.get().id
);
*self.current_page.borrow_mut() += 1;
let current = *self.current_page.borrow();
self.increment_current();
let current = self.current_page.get();
assert!(
current < BTCURSOR_MAX_DEPTH as i32,
"corrupted database, stack is bigger than expected"
@@ -1987,24 +1990,23 @@ impl PageStack {
/// Pop a page off the stack.
/// This effectively means traversing back up to a parent page.
fn pop(&self) {
let current = *self.current_page.borrow();
let current = self.current_page.get();
tracing::trace!("pagestack::pop(current={})", current);
self.cell_indices.borrow_mut()[current as usize] = 0;
self.stack.borrow_mut()[current as usize] = None;
*self.current_page.borrow_mut() -= 1;
self.decrement_current();
}
/// Get the top page on the stack.
/// This is the page that is currently being traversed.
fn top(&self) -> PageRef {
let current = *self.current_page.borrow();
let page = self.stack.borrow()[current as usize]
let page = self.stack.borrow()[self.current()]
.as_ref()
.unwrap()
.clone();
tracing::trace!(
"pagestack::top(current={}, page_id={})",
current,
self.current(),
page.get().id
);
page
@@ -2012,7 +2014,7 @@ impl PageStack {
/// Current page pointer being used
fn current(&self) -> usize {
*self.current_page.borrow() as usize
self.current_page.get() as usize
}
/// Cell index of the current page
@@ -2048,11 +2050,11 @@ impl PageStack {
}
fn has_parent(&self) -> bool {
*self.current_page.borrow() > 0
self.current_page.get() > 0
}
fn clear(&self) {
*self.current_page.borrow_mut() = -1;
self.current_page.set(-1);
}
}
@@ -2104,15 +2106,11 @@ fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> R
prev_pc = pc;
pc = next as usize;
if pc <= prev_pc && pc != 0 {
return Err(LimboError::Corrupt(
"Free list not in ascending order".into(),
));
return_corrupt!("Free list not in ascending order");
}
}
if pc > maxpc + amount - 4 {
return Err(LimboError::Corrupt(
"Free block chain extends beyond page end".into(),
));
return_corrupt!("Free block chain extends beyond page end");
}
Ok(0)
}
@@ -2302,9 +2300,7 @@ fn free_cell_range(
if pc == 0 {
break;
}
return Err(LimboError::Corrupt(
"free cell range free block not in ascending order".into(),
));
return_corrupt!("free cell range free block not in ascending order");
}
let next = page.read_u16_no_offset(pc as usize);
@@ -2313,20 +2309,18 @@ fn free_cell_range(
}
if pc > usable_space - 4 {
return Err(LimboError::Corrupt("Free block beyond usable space".into()));
return_corrupt!("Free block beyond usable space");
}
let mut removed_fragmentation = 0;
if pc > 0 && offset + len + 3 >= pc {
removed_fragmentation = (pc - end) as u8;
if end > pc {
return Err(LimboError::Corrupt("Invalid block overlap".into()));
return_corrupt!("Invalid block overlap");
}
end = pc + page.read_u16_no_offset(pc as usize + 2);
if end > usable_space {
return Err(LimboError::Corrupt(
"Coalesced block extends beyond page".into(),
));
return_corrupt!("Coalesced block extends beyond page");
}
size = end - offset;
pc = page.read_u16_no_offset(pc as usize);
@@ -2336,7 +2330,7 @@ fn free_cell_range(
let prev_end = pointer_to_pc + page.read_u16_no_offset(pointer_to_pc as usize + 2);
if prev_end + 3 >= offset {
if prev_end > offset {
return Err(LimboError::Corrupt("Invalid previous block overlap".into()));
return_corrupt!("Invalid previous block overlap");
}
removed_fragmentation += (offset - prev_end) as u8;
size = end - pointer_to_pc;
@@ -2344,7 +2338,7 @@ fn free_cell_range(
}
}
if removed_fragmentation > page.num_frag_free_bytes() {
return Err(LimboError::Corrupt("Invalid fragmentation count".into()));
return_corrupt!("Invalid fragmentation count");
}
let frag = page.num_frag_free_bytes() - removed_fragmentation;
page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag);
@@ -2354,10 +2348,10 @@ fn free_cell_range(
if offset <= page.cell_content_area() {
if offset < page.cell_content_area() {
return Err(LimboError::Corrupt("Free block before content area".into()));
return_corrupt!("Free block before content area");
}
if pointer_to_pc != page.offset as u16 + PAGE_HEADER_OFFSET_FIRST_FREEBLOCK as u16 {
return Err(LimboError::Corrupt("Invalid content area merge".into()));
return_corrupt!("Invalid content area merge");
}
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, pc);
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, end);
@@ -2423,7 +2417,7 @@ fn defragment_page(page: &PageContent, usable_space: u16) {
assert!(cbrk >= first_cell);
// set new first byte of cell content
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, cbrk as u16);
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, cbrk);
// set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0);
@@ -2632,8 +2626,7 @@ fn fill_cell_payload(
write_varint_to_vec(record_buf.len() as u64, cell_payload);
}
let payload_overflow_threshold_max =
payload_overflow_threshold_max(page_type.clone(), usable_space);
let payload_overflow_threshold_max = payload_overflow_threshold_max(page_type, usable_space);
debug!(
"fill_cell_payload(record_size={}, payload_overflow_threshold_max={})",
record_buf.len(),

View File

@@ -19,3 +19,10 @@ pub(crate) mod pager;
pub(crate) mod sqlite3_ondisk;
#[allow(clippy::arc_with_non_send_sync)]
pub(crate) mod wal;
#[macro_export]
macro_rules! return_corrupt {
($msg:expr) => {
return Err(LimboError::Corrupt($msg.into()));
};
}

View File

@@ -1116,7 +1116,7 @@ impl Program {
cursors,
"Column"
);
let record = cursor.record()?;
let record = cursor.record();
if let Some(record) = record.as_ref() {
state.registers[*dest] = if cursor.get_null_flag() {
OwnedValue::Null
@@ -1517,7 +1517,7 @@ impl Program {
let cursor = get_cursor_as_index_mut(&mut cursors, *cursor_id);
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
if let Some(ref idx_record) = *cursor.record()? {
if let Some(ref idx_record) = *cursor.record() {
// Compare against the same number of values
if idx_record.get_values()[..record_from_regs.len()]
>= record_from_regs.get_values()[..]
@@ -1541,7 +1541,7 @@ impl Program {
let cursor = get_cursor_as_index_mut(&mut cursors, *cursor_id);
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
if let Some(ref idx_record) = *cursor.record()? {
if let Some(ref idx_record) = *cursor.record() {
// Compare against the same number of values
if idx_record.get_values()[..record_from_regs.len()]
<= record_from_regs.get_values()[..]
@@ -1565,7 +1565,7 @@ impl Program {
let cursor = get_cursor_as_index_mut(&mut cursors, *cursor_id);
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
if let Some(ref idx_record) = *cursor.record()? {
if let Some(ref idx_record) = *cursor.record() {
// Compare against the same number of values
if idx_record.get_values()[..record_from_regs.len()]
> record_from_regs.get_values()[..]
@@ -1589,7 +1589,7 @@ impl Program {
let cursor = get_cursor_as_index_mut(&mut cursors, *cursor_id);
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
if let Some(ref idx_record) = *cursor.record()? {
if let Some(ref idx_record) = *cursor.record() {
// Compare against the same number of values
if idx_record.get_values()[..record_from_regs.len()]
< record_from_regs.get_values()[..]