fix rebase

This commit is contained in:
Pere Diaz Bou
2025-02-16 18:31:03 +01:00
parent b5ec5186ea
commit b64cc769b6
3 changed files with 142 additions and 71 deletions

View File

@@ -2,8 +2,7 @@ use tracing::debug;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::{
read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell,
TableLeafCell,
read_varint, BTreeCell, PageContent, PageType, TableInteriorCell, TableLeafCell,
};
use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp};
@@ -96,8 +95,6 @@ struct WriteInfo {
scratch_cells: RefCell<Vec<Vec<u8>>>,
/// Bookkeeping of the rightmost pointer so the PAGE_HEADER_OFFSET_RIGHTMOST_PTR can be updated.
rightmost_pointer: RefCell<Option<*mut u8>>,
/// Copy of the current page needed for buffer references.
page_copy: RefCell<Option<PageContent>>,
/// Divider cells of old pages
divider_cells: RefCell<Vec<Vec<u8>>>,
/// Number of siblings being used to balance
@@ -112,7 +109,6 @@ impl WriteInfo {
state: WriteState::Start,
scratch_cells: RefCell::new(Vec::new()),
rightmost_pointer: RefCell::new(None),
page_copy: RefCell::new(None),
pages_to_balance: RefCell::new(Vec::new()),
divider_cells: RefCell::new(Vec::new()),
sibling_count: RefCell::new(0),
@@ -847,7 +843,6 @@ impl BTreeCursor {
if !self.stack.has_parent() {
self.balance_root();
return Ok(CursorResult::Ok(()));
}
let write_info = self.state.mut_write_info().unwrap();
@@ -859,6 +854,8 @@ impl BTreeCursor {
WriteState::BalanceNonRoot | WriteState::BalanceNonRootWaitLoadPages => {
return_if_io!(self.balance_non_root());
}
WriteState::Finish => return Ok(CursorResult::Ok(())),
_ => panic!("unexpected state on balance {:?}", state),
}
}
}
@@ -875,7 +872,7 @@ impl BTreeCursor {
.expect("must be balancing")
.state
.clone();
tracing::trace!("balance_non_root(state={:?})", state);
tracing::debug!("balance_non_root(state={:?})", state);
let (next_write_state, result) = match state {
WriteState::Start => todo!(),
WriteState::BalanceStart => todo!(),
@@ -900,6 +897,10 @@ impl BTreeCursor {
parent_page.get().id,
page_to_balance_idx
);
assert!(matches!(
parent_contents.page_type(),
PageType::IndexInterior | PageType::TableInterior
));
// Part 1: Find the sibling pages to balance
write_info.new_pages.borrow_mut().clear();
write_info.pages_to_balance.borrow_mut().clear();
@@ -965,7 +966,7 @@ impl BTreeCursor {
// start loading right page first
let mut pgno: u32 = unsafe { right_pointer.cast::<u32>().read().swap_bytes() };
write_info.rightmost_pointer.replace(Some(right_pointer));
let mut current_sibling = sibling_pointer;
let current_sibling = sibling_pointer;
for i in (0..=current_sibling).rev() {
let page = self.pager.read_page(pgno as usize)?;
write_info.pages_to_balance.borrow_mut().push(page);
@@ -1070,7 +1071,6 @@ impl BTreeCursor {
.divider_cells
.borrow_mut()
.push(cell_buf.to_vec());
log::trace!(
tracing::trace!(
"dropping divider cell from parent cell_idx={} count={}",
cell_idx,
@@ -1697,8 +1697,8 @@ impl BTreeCursor {
let cell = contents.cell_get(
idx,
self.pager.clone(),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
payload_overflow_threshold_max(contents.page_type(), self.usable_space() as u16),
payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16),
self.usable_space(),
)?;
@@ -1719,8 +1719,8 @@ impl BTreeCursor {
let cell = contents.cell_get(
cell_idx,
self.pager.clone(),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
payload_overflow_threshold_max(contents.page_type(), self.usable_space() as u16),
payload_overflow_threshold_min(contents.page_type(), self.usable_space() as u16),
self.usable_space(),
)?;
@@ -1768,8 +1768,14 @@ impl BTreeCursor {
let predecessor_cell = leaf_contents.cell_get(
leaf_cell_idx,
self.pager.clone(),
self.payload_overflow_threshold_max(leaf_contents.page_type()),
self.payload_overflow_threshold_min(leaf_contents.page_type()),
payload_overflow_threshold_max(
leaf_contents.page_type(),
self.usable_space() as u16,
),
payload_overflow_threshold_min(
leaf_contents.page_type(),
self.usable_space() as u16,
),
self.usable_space(),
)?;
@@ -1785,11 +1791,16 @@ impl BTreeCursor {
}
_ => unreachable!("Expected table leaf cell"),
}
self.insert_into_cell(contents, &cell_payload, cell_idx);
self.drop_cell(contents, cell_idx);
insert_into_cell(
contents,
&cell_payload,
cell_idx,
self.usable_space() as u16,
);
drop_cell(contents, cell_idx, self.usable_space() as u16);
} else {
// For leaf nodes, simply remove the cell
self.drop_cell(contents, cell_idx);
drop_cell(contents, cell_idx, self.usable_space() as u16);
}
// TODO(Krishna): Implement balance after delete. I will implement after balance_nonroot is extended.
@@ -1811,6 +1822,7 @@ impl BTreeCursor {
};
return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ));
let page = self.stack.top();
dbg!(page.get().id);
// TODO(pere): request load
return_if_locked!(page);
@@ -1917,8 +1929,10 @@ impl BTreeCursor {
payload_len: usize,
page_type: PageType,
) -> Result<Option<usize>> {
let max_local = self.payload_overflow_threshold_max(page_type.clone());
let min_local = self.payload_overflow_threshold_min(page_type.clone());
let max_local =
payload_overflow_threshold_max(page_type.clone(), self.usable_space() as u16);
let min_local =
payload_overflow_threshold_min(page_type.clone(), self.usable_space() as u16);
let usable_size = self.usable_space();
let (_, local_size) = payload_overflows(payload_len, max_local, min_local, usable_size);
@@ -1944,7 +1958,7 @@ impl PageStack {
/// Push a new page onto the stack.
/// This effectively means traversing to a child page.
fn push(&self, page: PageRef) {
debug!(
tracing::trace!(
"pagestack::push(current={}, new_page_id={})",
self.current_page.borrow(),
page.get().id
@@ -1963,7 +1977,7 @@ impl PageStack {
/// This effectively means traversing back up to a parent page.
fn pop(&self) {
let current = *self.current_page.borrow();
debug!("pagestack::pop(current={})", current);
tracing::trace!("pagestack::pop(current={})", current);
self.cell_indices.borrow_mut()[current as usize] = 0;
self.stack.borrow_mut()[current as usize] = None;
*self.current_page.borrow_mut() -= 1;
@@ -1977,11 +1991,11 @@ impl PageStack {
.as_ref()
.unwrap()
.clone();
// debug!(
// "pagestack::top(current={}, page_id={})",
// current,
// page.get().id
// );
tracing::trace!(
"pagestack::top(current={}, page_id={})",
current,
page.get().id
);
page
}
@@ -2016,11 +2030,13 @@ impl PageStack {
/// We usually advance after going traversing a new page
fn advance(&self) {
let current = self.current();
tracing::trace!("advance {}", self.cell_indices.borrow()[current]);
self.cell_indices.borrow_mut()[current] += 1;
}
fn retreat(&self) {
let current = self.current();
tracing::trace!("retreat {}", self.cell_indices.borrow()[current]);
self.cell_indices.borrow_mut()[current] -= 1;
}
@@ -2110,7 +2126,7 @@ fn to_static_buf(buf: &mut [u8]) -> &'static mut [u8] {
unsafe { std::mem::transmute::<&mut [u8], &'static mut [u8]>(buf) }
}
pub fn edit_page(
fn edit_page(
page: &mut PageContent,
start_old_cells: usize,
start_new_cells: usize,
@@ -2118,7 +2134,7 @@ pub fn edit_page(
cell_array: &CellArray,
usable_space: u16,
) {
log::trace!(
tracing::trace!(
"edit_page start_old_cells={} start_new_cells={} number_new_cells={} cell_array={}",
start_old_cells,
start_new_cells,
@@ -2169,7 +2185,7 @@ pub fn edit_page(
let overflow_cell = &page.overflow_cells[i];
// cell index in context of new list of cells that should be in the page
let cell_idx = start_old_cells + overflow_cell.index - start_new_cells;
if cell_idx >= 0 && cell_idx < number_new_cells {
if cell_idx < number_new_cells {
count_cells += 1;
page_insert_array(
page,
@@ -2194,7 +2210,7 @@ pub fn edit_page(
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, number_new_cells as u16);
}
pub fn page_free_array(
fn page_free_array(
page: &mut PageContent,
first: usize,
count: usize,
@@ -2300,7 +2316,7 @@ fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_spa
/// Defragment a page. This means packing all the cells to the end of the page.
fn defragment_page(page: &PageContent, usable_space: u16) {
// TODO: test this
log::debug!("defragment_page");
tracing::debug!("defragment_page");
let cloned_page = page.clone();
// TODO(pere): usable space should include offset probably
let mut cbrk = usable_space;
@@ -2534,7 +2550,7 @@ fn fill_cell_payload(
page_type: PageType,
int_key: Option<u64>,
cell_payload: &mut Vec<u8>,
record: &OwnedRecord,
record: &Record,
usable_space: u16,
pager: Rc<Pager>,
) {
@@ -2712,14 +2728,15 @@ mod tests {
use crate::storage::database::FileStorage;
use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::sqlite3_ondisk;
use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::types::Text;
use crate::{BufferPool, DatabaseStorage, WalFile, WalFileShared, WriteCompletion};
use std::cell::RefCell;
use std::panic;
use std::rc::Rc;
use std::sync::Arc;
use std::{cell::RefCell, panic, rc::Rc, sync::Arc};
use rand::{thread_rng, Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use rand::{thread_rng, Rng};
use tempfile::TempDir;
use crate::{
@@ -2732,14 +2749,14 @@ mod tests {
pager::PageRef,
sqlite3_ondisk::{BTreeCell, PageContent, PageType},
},
types::{LimboText, OwnedRecord, OwnedValue, Record},
Buffer, Database, Page, Pager, PlatformIO, Value, DATABASE_VERSION, IO,
types::{OwnedValue, Record},
Database, Page, Pager, PlatformIO,
};
use super::{btree_init_page, defragment_page, drop_cell, insert_into_cell};
fn get_page(id: usize) -> PageRef {
let page = Arc::new(Page::new(2));
let page = Arc::new(Page::new(id));
let drop_fn = Rc::new(|_| {});
let inner = PageContent {
@@ -2778,7 +2795,7 @@ mod tests {
payload_overflow_threshold_min(page.page_type(), 4096),
4096,
);
log::trace!("cell idx={} start={} len={}", cell_idx, cell.0, cell.1);
tracing::trace!("cell idx={} start={} len={}", cell_idx, cell.0, cell.1);
let buf = &page.as_ptr()[cell.0..cell.0 + cell.1];
assert_eq!(buf.len(), payload.len());
assert_eq!(buf, payload);
@@ -2788,7 +2805,7 @@ mod tests {
id: usize,
pos: usize,
page: &mut PageContent,
record: OwnedRecord,
record: Record,
db: &Arc<Database>,
) -> Vec<u8> {
let mut payload: Vec<u8> = Vec::new();
@@ -2810,7 +2827,7 @@ mod tests {
let page = get_page(2);
let page = page.get_contents();
let header_size = 8;
let record = OwnedRecord::new([OwnedValue::Integer(1)].to_vec());
let record = Record::new([OwnedValue::Integer(1)].to_vec());
let payload = add_record(1, 0, page, record, &db);
assert_eq!(page.cell_count(), 1);
let free = compute_free_space(page, 4096);
@@ -2838,7 +2855,7 @@ mod tests {
let mut cells = Vec::new();
let usable_space = 4096;
for i in 0..3 {
let record = OwnedRecord::new([OwnedValue::Integer(i as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(i as i64)].to_vec());
let payload = add_record(i, i, page, record, &db);
assert_eq!(page.cell_count(), i + 1);
let free = compute_free_space(page, usable_space);
@@ -2872,8 +2889,8 @@ mod tests {
.cell_get(
cell_idx,
pager.clone(),
cursor.payload_overflow_threshold_max(page_type),
cursor.payload_overflow_threshold_min(page_type),
payload_overflow_threshold_max(page_type, 4096),
payload_overflow_threshold_min(page_type, 4096),
cursor.usable_space(),
)
.unwrap();
@@ -2935,8 +2952,8 @@ mod tests {
.cell_get(
cell_idx,
pager.clone(),
cursor.payload_overflow_threshold_max(page_type),
cursor.payload_overflow_threshold_min(page_type),
payload_overflow_threshold_max(page_type, 4096),
payload_overflow_threshold_min(page_type, 4096),
cursor.usable_space(),
)
.unwrap();
@@ -3002,7 +3019,7 @@ mod tests {
};
let pager = Rc::new(pager);
let page1 = pager.allocate_page().unwrap();
btree_init_page(&page1, PageType::TableLeaf, &db_header, 0);
btree_init_page(&page1, PageType::TableLeaf, 0, 4096);
(pager, page1.get().id)
}
@@ -3149,7 +3166,7 @@ mod tests {
let usable_space = 4096;
let total_cells = 10;
for i in 0..total_cells {
let record = OwnedRecord::new([OwnedValue::Integer(i as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(i as i64)].to_vec());
let payload = add_record(i, i, page, record, &db);
assert_eq!(page.cell_count(), i + 1);
let free = compute_free_space(page, usable_space);
@@ -3269,7 +3286,7 @@ mod tests {
let (pager, db_header) = setup_test_env(5);
let cursor = BTreeCursor::new(pager.clone(), 1);
let max_local = cursor.payload_overflow_threshold_max(PageType::TableLeaf);
let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096);
let usable_size = cursor.usable_space();
// Create a large payload that will definitely trigger overflow
@@ -3415,7 +3432,7 @@ mod tests {
let mut cells = Vec::new();
let usable_space = 4096;
for i in 0..3 {
let record = OwnedRecord::new([OwnedValue::Integer(i as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(i as i64)].to_vec());
let payload = add_record(i, i, page, record, &db);
assert_eq!(page.cell_count(), i + 1);
let free = compute_free_space(page, usable_space);
@@ -3455,7 +3472,7 @@ mod tests {
let usable_space = 4096;
let total_cells = 10;
for i in 0..total_cells {
let record = OwnedRecord::new([OwnedValue::Integer(i as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(i as i64)].to_vec());
let payload = add_record(i, i, page, record, &db);
assert_eq!(page.cell_count(), i + 1);
let free = compute_free_space(page, usable_space);
@@ -3504,11 +3521,11 @@ mod tests {
let mut rng = ChaCha8Rng::seed_from_u64(seed);
while i > 0 {
i -= 1;
match rng.gen_range(0..3) {
match rng.next_u64() % 3 {
0 => {
// allow appends with extra place to insert
let cell_idx = rng.gen_range(0..=page.cell_count());
let record = OwnedRecord::new([OwnedValue::Integer(i as i64)].to_vec());
let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1);
let record = Record::new([OwnedValue::Integer(i as i64)].to_vec());
let payload = add_record(i, cell_idx, page, record, &db);
let free = compute_free_space(page, usable_space);
if (free as usize) < payload.len() - 2 {
@@ -3523,7 +3540,7 @@ mod tests {
if page.cell_count() == 0 {
continue;
}
let cell_idx = rng.gen_range(0..page.cell_count());
let cell_idx = rng.next_u64() as usize % page.cell_count();
let (_, len) = page.cell_get_raw_region(
cell_idx,
payload_overflow_threshold_max(page.page_type(), 4096),
@@ -3552,7 +3569,7 @@ mod tests {
let page = page.get_contents();
let usable_space = 4096;
let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let payload = add_record(0, 0, page, record, &db);
assert_eq!(page.cell_count(), 1);
@@ -3576,10 +3593,10 @@ mod tests {
let page = page.get_contents();
let usable_space = 4096;
let record = OwnedRecord::new(
let record = Record::new(
[
OwnedValue::Integer(0 as i64),
OwnedValue::Text(LimboText::new(Rc::new("aaaaaaaa".to_string()))),
OwnedValue::Text(Text::new("aaaaaaaa")),
]
.to_vec(),
);
@@ -3589,7 +3606,7 @@ mod tests {
drop_cell(page, 0, usable_space);
assert_eq!(page.cell_count(), 0);
let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let payload = add_record(0, 0, page, record, &db);
assert_eq!(page.cell_count(), 1);
@@ -3611,10 +3628,10 @@ mod tests {
let page = page.get_contents();
let usable_space = 4096;
let record = OwnedRecord::new(
let record = Record::new(
[
OwnedValue::Integer(0 as i64),
OwnedValue::Text(LimboText::new(Rc::new("aaaaaaaa".to_string()))),
OwnedValue::Text(Text::new("aaaaaaaa")),
]
.to_vec(),
);
@@ -3625,7 +3642,7 @@ mod tests {
drop_cell(page, 0, usable_space);
assert_eq!(page.cell_count(), 0);
let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let payload = add_record(0, 0, page, record, &db);
assert_eq!(page.cell_count(), 1);
@@ -3648,11 +3665,11 @@ mod tests {
let page = page.get_contents();
let usable_space = 4096;
let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let payload = add_record(0, 0, page, record, &db);
let record = OwnedRecord::new([OwnedValue::Integer(1 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(1 as i64)].to_vec());
let _ = add_record(1, 1, page, record, &db);
let record = OwnedRecord::new([OwnedValue::Integer(2 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(2 as i64)].to_vec());
let _ = add_record(2, 2, page, record, &db);
drop_cell(page, 1, usable_space);
@@ -3670,24 +3687,75 @@ mod tests {
let page = page.get_contents();
let usable_space = 4096;
let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let _ = add_record(0, 0, page, record, &db);
let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let _ = add_record(0, 0, page, record, &db);
drop_cell(page, 0, usable_space);
defragment_page(page, usable_space);
let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let _ = add_record(0, 1, page, record, &db);
drop_cell(page, 0, usable_space);
let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec());
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let _ = add_record(0, 1, page, record, &db);
}
#[test]
fn btree_insert_sequential() {
let (pager, root_page) = empty_btree();
let mut keys = Vec::new();
for i in 0..10000 {
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
tracing::info!("INSERT INTO t VALUES ({});", i,);
let key = OwnedValue::Integer(i);
let value = Record::new(vec![OwnedValue::Integer(i)]);
tracing::trace!("before insert {}", i);
loop {
let key = SeekKey::TableRowId(i as u64);
match cursor.move_to(key, SeekOp::EQ).unwrap() {
CursorResult::Ok(_) => break,
CursorResult::IO => {
pager.io.run_once().unwrap();
}
}
}
loop {
match cursor.insert(&key, &value, true).unwrap() {
CursorResult::Ok(_) => break,
CursorResult::IO => {
pager.io.run_once().unwrap();
}
}
}
keys.push(i);
}
if matches!(validate_btree(pager.clone(), root_page), (_, false)) {
panic!("invalid btree");
}
tracing::trace!(
"=========== btree ===========\n{}\n\n",
format_btree(pager.clone(), root_page, 0)
);
for key in keys.iter() {
let mut cursor = BTreeCursor::new(pager.clone(), root_page);
let key = OwnedValue::Integer(*key);
loop {
match cursor.exists(&key).unwrap() {
CursorResult::Ok(exists) => {
assert!(exists, "key {} is not found", key);
break;
}
CursorResult::IO => pager.io.run_once().unwrap(),
}
}
}
}
fn set_breakpoint_panic() {
// Set custom panic hook at start of program
panic::set_hook(Box::new(|panic_info| {

View File

@@ -462,7 +462,7 @@ impl PageContent {
}
pub fn write_u16_no_offset(&self, pos: usize, value: u16) {
log::debug!("write_u16(pos={}, value={})", pos, value);
tracing::debug!("write_u16(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[pos..pos + 2].copy_from_slice(&value.to_be_bytes());
}

View File

@@ -2467,6 +2467,9 @@ impl Program {
_ => unreachable!("Not a record! Cannot insert a non record value."),
};
let key = &state.registers[*key_reg];
// NOTE(pere): Sending moved_before == true is okay because we moved before but
// if we were to set to false after starting a balance procedure, it might
// leave undefined state.
return_if_io!(cursor.insert(key, record, true));
state.pc += 1;
}