bring back corrupt errors

This commit is contained in:
Pere Diaz Bou
2025-02-16 18:51:09 +01:00
parent 1687072d77
commit aea4560422

View File

@@ -786,7 +786,8 @@ impl BTreeCursor {
cell_payload.as_slice(),
cell_idx,
self.usable_space() as u16,
);
)
.unwrap();
contents.overflow_cells.len()
};
let write_info = self
@@ -1076,7 +1077,7 @@ impl BTreeCursor {
cell_idx,
parent_contents.cell_count()
);
drop_cell(parent_contents, cell_idx, self.usable_space() as u16);
drop_cell(parent_contents, cell_idx, self.usable_space() as u16)?;
}
assert_eq!(
write_info.divider_cells.borrow().len(),
@@ -1393,7 +1394,8 @@ impl BTreeCursor {
&new_divider_cell,
first_divider_cell + i,
self.usable_space() as u16,
);
)
.unwrap();
}
// TODO: update pages
let mut done = vec![false; sibling_count_new];
@@ -1796,11 +1798,12 @@ impl BTreeCursor {
&cell_payload,
cell_idx,
self.usable_space() as u16,
);
drop_cell(contents, cell_idx, self.usable_space() as u16);
)
.unwrap();
drop_cell(contents, cell_idx, self.usable_space() as u16)?;
} else {
// For leaf nodes, simply remove the cell
drop_cell(contents, cell_idx, self.usable_space() as u16);
drop_cell(contents, cell_idx, self.usable_space() as u16)?;
}
// TODO(Krishna): Implement balance after delete. I will implement after balance_nonroot is extended.
@@ -2064,7 +2067,7 @@ impl CellArray {
}
/// Try to find a free block available and allocate it if found
fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> usize {
fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> Result<usize> {
// NOTE: freelist is in ascending order of keys and pc
// unuse_space is reserved bytes at the end of page, therefore we must subtract from maxpc
let mut pc = page_ref.first_freeblock() as usize;
@@ -2095,13 +2098,23 @@ fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> u
page_ref.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag);
}
pc += new_size as usize;
return pc;
return Ok(pc);
}
}
prev_pc = pc;
if pc <= prev_pc && pc != 0 {
return Err(LimboError::Corrupt(
"Free list not in ascending order".into(),
));
}
pc = next as usize;
}
0
if pc > maxpc + amount - 4 {
return Err(LimboError::Corrupt(
"Free block chain extends beyond page end".into(),
));
}
Ok(0)
}
pub fn btree_init_page(page: &PageRef, page_type: PageType, offset: usize, usable_space: u16) {
@@ -2132,7 +2145,7 @@ fn edit_page(
number_new_cells: usize,
cell_array: &CellArray,
usable_space: u16,
) {
) -> Result<()> {
tracing::trace!(
"edit_page start_old_cells={} start_new_cells={} number_new_cells={} cell_array={}",
start_old_cells,
@@ -2150,7 +2163,7 @@ fn edit_page(
start_new_cells - start_old_cells,
cell_array,
usable_space,
);
)?;
// shift pointers left
let buf = page.as_ptr();
let (start, _) = page.cell_pointer_array_offset_and_size();
@@ -2167,7 +2180,7 @@ fn edit_page(
end_old_cells - end_new_cells,
cell_array,
usable_space,
);
)?;
assert!(page.cell_count() >= number_tail_removed);
count_cells -= number_tail_removed;
}
@@ -2176,7 +2189,7 @@ fn edit_page(
// TODO: add to start
if start_new_cells < start_old_cells {
let count = number_new_cells.min(start_old_cells - start_new_cells);
page_insert_array(page, start_new_cells, count, cell_array, 0, usable_space);
page_insert_array(page, start_new_cells, count, cell_array, 0, usable_space)?;
count_cells += count;
}
// TODO: overflow cells
@@ -2193,7 +2206,7 @@ fn edit_page(
cell_array,
cell_idx,
usable_space,
);
)?;
}
}
// TODO: append cells to end
@@ -2204,9 +2217,10 @@ fn edit_page(
cell_array,
count_cells,
usable_space,
);
)?;
// TODO: noverflow
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, number_new_cells as u16);
Ok(())
}
fn page_free_array(
@@ -2215,7 +2229,7 @@ fn page_free_array(
count: usize,
cell_array: &CellArray,
usable_space: u16,
) -> usize {
) -> Result<usize> {
let buf = &mut page.as_ptr()[page.offset..usable_space as usize];
let buf_range = buf.as_ptr_range();
let mut number_of_cells_removed = 0;
@@ -2233,12 +2247,12 @@ fn page_free_array(
// TODO: remove pointer too
let offset = (cell_pointer.start as usize - buf_range.start as usize) as u16;
let len = (cell_pointer.end as usize - cell_pointer.start as usize) as u16;
free_cell_range(page, offset, len, usable_space);
free_cell_range(page, offset, len, usable_space)?;
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
number_of_cells_removed += 1;
}
}
number_of_cells_removed
Ok(number_of_cells_removed)
}
pub fn page_insert_array(
page: &mut PageContent,
@@ -2247,20 +2261,26 @@ pub fn page_insert_array(
cell_array: &CellArray,
mut start_insert: usize,
usable_space: u16,
) {
) -> Result<()> {
// TODO: implement faster algorithm, this is doing extra work that's not needed.
// See pageInsertArray to understand faster way.
for i in first..first + count {
insert_into_cell(page, cell_array.cells[i], start_insert, usable_space);
insert_into_cell(page, cell_array.cells[i], start_insert, usable_space)?;
start_insert += 1;
}
Ok(())
}
/// Free the range of bytes that a cell occupies.
/// This function also updates the freeblock list in the page.
/// Freeblocks are used to keep track of free space in the page,
/// and are organized as a linked list.
fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_space: u16) {
fn free_cell_range(
page: &mut PageContent,
mut offset: u16,
len: u16,
usable_space: u16,
) -> Result<()> {
let mut size = len;
let mut end = offset + len;
let mut pointer_to_pc = page.offset as u16 + 1;
@@ -2281,21 +2301,40 @@ fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_spa
pointer_to_pc = pc;
pc = next;
}
if pc > usable_space - 4 {
return Err(LimboError::Corrupt("Free block beyond usable space".into()));
}
let mut removed_fragmentation = 0;
if pc > 0 && offset + len + 3 >= pc {
removed_fragmentation = (pc - end) as u8;
if end > pc {
return Err(LimboError::Corrupt("Invalid block overlap".into()));
}
end = pc + page.read_u16_no_offset(pc as usize);
if end > usable_space {
return Err(LimboError::Corrupt(
"Coalesced block extends beyond page".into(),
));
}
size = end - offset;
}
if pointer_to_pc > page.offset as u16 + 1 {
let prev_end = pointer_to_pc + page.read_u16_no_offset(pointer_to_pc as usize + 2);
if prev_end + 3 >= offset {
if prev_end > offset {
return Err(LimboError::Corrupt("Invalid previous block overlap".into()));
}
removed_fragmentation += (offset - prev_end) as u8;
size = end - pointer_to_pc;
offset = pointer_to_pc;
}
}
if removed_fragmentation > page.num_frag_free_bytes() {
return Err(LimboError::Corrupt("Invalid fragmentation count".into()));
}
let frag = page.num_frag_free_bytes() - removed_fragmentation;
page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag);
@@ -2303,6 +2342,12 @@ fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_spa
};
if offset <= page.cell_content_area() {
if offset < page.cell_content_area() {
return Err(LimboError::Corrupt("Free block before content area".into()));
}
if offset != PAGE_HEADER_OFFSET_FIRST_FREEBLOCK as u16 {
return Err(LimboError::Corrupt("Invalid content area merge".into()));
}
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, pc);
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, offset + len);
} else {
@@ -2310,6 +2355,7 @@ fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_spa
page.write_u16_no_offset(offset as usize, pc);
page.write_u16_no_offset(offset as usize + 2, size);
}
Ok(())
}
/// Defragment a page. This means packing all the cells to the end of the page.
@@ -2378,7 +2424,12 @@ fn defragment_page(page: &PageContent, usable_space: u16) {
/// insert_into_cell() is called from insert_into_page(),
/// and the overflow cell count is used to determine if the page overflows,
/// i.e. whether we need to balance the btree after the insert.
fn insert_into_cell(page: &mut PageContent, payload: &[u8], cell_idx: usize, usable_space: u16) {
fn insert_into_cell(
page: &mut PageContent,
payload: &[u8],
cell_idx: usize,
usable_space: u16,
) -> Result<()> {
assert!(
cell_idx <= page.cell_count(),
"attempting to add cell to an incorrect place cell_idx={} cell_count={}",
@@ -2394,11 +2445,11 @@ fn insert_into_cell(page: &mut PageContent, payload: &[u8], cell_idx: usize, usa
index: cell_idx,
payload: Pin::new(Vec::from(payload)),
});
return;
return Ok(());
}
// TODO: insert into cell payload in internal page
let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space);
let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space)?;
let buf = page.as_ptr();
// copy data
@@ -2423,6 +2474,7 @@ fn insert_into_cell(page: &mut PageContent, payload: &[u8], cell_idx: usize, usa
// update cell count
let new_n_cells = (page.cell_count() + 1) as u16;
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells);
Ok(())
}
/// Free blocks can be zero, meaning the "real free space" that can be used to allocate is expected to be between first cell byte
@@ -2512,7 +2564,7 @@ fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 {
}
/// Allocate space for a cell on a page.
fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -> u16 {
fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -> Result<u16> {
let amount = amount as usize;
let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size();
@@ -2522,9 +2574,9 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -
// there are free blocks and enough space
if page_ref.first_freeblock() != 0 && gap + 2 <= top {
// find slot
let pc = find_free_cell(page_ref, usable_space, amount);
let pc = find_free_cell(page_ref, usable_space, amount)?;
if pc != 0 {
return pc as u16;
return Ok(pc as u16);
}
/* fall through, we might need to defragment */
}
@@ -2540,7 +2592,7 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -
page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16);
assert!(top + amount <= usable_space as usize);
top as u16
Ok(top as u16)
}
/// Fill in the cell payload with the record.
@@ -2686,14 +2738,14 @@ fn payload_overflow_threshold_min(_page_type: PageType, usable_space: u16) -> us
/// Drop a cell from a page.
/// This is done by freeing the range of bytes that the cell occupies.
fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) {
fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Result<()> {
let (cell_start, cell_len) = page.cell_get_raw_region(
cell_idx,
payload_overflow_threshold_max(page.page_type(), usable_space),
payload_overflow_threshold_min(page.page_type(), usable_space),
usable_space as usize,
);
free_cell_range(page, cell_start as u16, cell_len as u16, usable_space);
free_cell_range(page, cell_start as u16, cell_len as u16, usable_space)?;
if page.cell_count() > 1 {
shift_pointers_left(page, cell_idx);
} else {
@@ -2701,6 +2753,7 @@ fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) {
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
}
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
Ok(())
}
/// Shift pointers to the left once starting from a cell position
@@ -2816,7 +2869,7 @@ mod tests {
4096,
db.pager.clone(),
);
insert_into_cell(page, &payload, pos, 4096);
insert_into_cell(page, &payload, pos, 4096).unwrap();
payload
}
@@ -2867,7 +2920,7 @@ mod tests {
ensure_cell(page, i, &cell.payload);
}
cells.remove(1);
drop_cell(page, 1, usable_space);
drop_cell(page, 1, usable_space).unwrap();
for (i, cell) in cells.iter().enumerate() {
ensure_cell(page, i, &cell.payload);
@@ -3178,7 +3231,7 @@ mod tests {
let mut new_cells = Vec::new();
for cell in cells {
if cell.pos % 2 == 1 {
drop_cell(page, cell.pos - removed, usable_space);
drop_cell(page, cell.pos - removed, usable_space).unwrap();
removed += 1;
} else {
new_cells.push(cell);
@@ -3444,7 +3497,7 @@ mod tests {
ensure_cell(page, i, &cell.payload);
}
cells.remove(1);
drop_cell(page, 1, usable_space);
drop_cell(page, 1, usable_space).unwrap();
for (i, cell) in cells.iter().enumerate() {
ensure_cell(page, i, &cell.payload);
@@ -3484,7 +3537,7 @@ mod tests {
let mut new_cells = Vec::new();
for cell in cells {
if cell.pos % 2 == 1 {
drop_cell(page, cell.pos - removed, usable_space);
drop_cell(page, cell.pos - removed, usable_space).unwrap();
removed += 1;
} else {
new_cells.push(cell);
@@ -3546,7 +3599,7 @@ mod tests {
payload_overflow_threshold_min(page.page_type(), 4096),
usable_space as usize,
);
drop_cell(page, cell_idx, usable_space);
drop_cell(page, cell_idx, usable_space).unwrap();
total_size -= len as u16 + 2;
cells.remove(cell_idx);
}
@@ -3602,7 +3655,7 @@ mod tests {
let payload = add_record(0, 0, page, record, &db);
assert_eq!(page.cell_count(), 1);
drop_cell(page, 0, usable_space);
drop_cell(page, 0, usable_space).unwrap();
assert_eq!(page.cell_count(), 0);
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
@@ -3638,7 +3691,7 @@ mod tests {
for i in 0..100 {
assert_eq!(page.cell_count(), 1);
drop_cell(page, 0, usable_space);
drop_cell(page, 0, usable_space).unwrap();
assert_eq!(page.cell_count(), 0);
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
@@ -3671,8 +3724,8 @@ mod tests {
let record = Record::new([OwnedValue::Integer(2 as i64)].to_vec());
let _ = add_record(2, 2, page, record, &db);
drop_cell(page, 1, usable_space);
drop_cell(page, 1, usable_space);
drop_cell(page, 1, usable_space).unwrap();
drop_cell(page, 1, usable_space).unwrap();
ensure_cell(page, 0, &payload);
}
@@ -3691,14 +3744,14 @@ mod tests {
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let _ = add_record(0, 0, page, record, &db);
drop_cell(page, 0, usable_space);
drop_cell(page, 0, usable_space).unwrap();
defragment_page(page, usable_space);
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let _ = add_record(0, 1, page, record, &db);
drop_cell(page, 0, usable_space);
drop_cell(page, 0, usable_space).unwrap();
let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec());
let _ = add_record(0, 1, page, record, &db);