fix free_cell_space extend content area

This commit is contained in:
Pere Diaz Bou
2025-02-17 11:50:15 +01:00
parent ddbfada8bd
commit e25272adc0

View File

@@ -2172,7 +2172,7 @@ fn edit_page(
cell_array,
usable_space,
)?;
assert!(page.cell_count() >= number_tail_removed);
assert!(count_cells >= number_tail_removed);
count_cells -= number_tail_removed;
}
// TODO: make page_free_array defragment, for now I'm lazy so this will work for now.
@@ -2187,17 +2187,19 @@ fn edit_page(
for i in 0..page.overflow_cells.len() {
let overflow_cell = &page.overflow_cells[i];
// cell index in context of new list of cells that should be in the page
let cell_idx = start_old_cells + overflow_cell.index - start_new_cells;
if cell_idx < number_new_cells {
count_cells += 1;
page_insert_array(
page,
start_new_cells + cell_idx,
1,
cell_array,
cell_idx,
usable_space,
)?;
if start_old_cells + overflow_cell.index >= start_new_cells {
let cell_idx = start_old_cells + overflow_cell.index - start_new_cells;
if cell_idx < number_new_cells {
count_cells += 1;
page_insert_array(
page,
start_new_cells + cell_idx,
1,
cell_array,
cell_idx,
usable_space,
)?;
}
}
}
// TODO: append cells to end
@@ -2283,11 +2285,19 @@ fn free_cell_range(
// then we need to do some more calculation to figure out where to insert the freeblock
// in the freeblock linked list.
let first_block = page.first_freeblock();
let maxpc = usable_space;
let mut pc = first_block;
while pc <= maxpc && pc < offset && pc != 0 {
while pc < offset {
if pc <= pointer_to_pc {
if pc == 0 {
break;
}
return Err(LimboError::Corrupt(
"free cell range free block not in ascending order".into(),
));
}
let next = page.read_u16_no_offset(pc as usize);
pointer_to_pc = pc;
pc = next;
@@ -2310,6 +2320,7 @@ fn free_cell_range(
));
}
size = end - offset;
pc = page.read_u16_no_offset(pc as usize);
}
if pointer_to_pc > page.offset as u16 + 1 {
@@ -2340,7 +2351,7 @@ fn free_cell_range(
return Err(LimboError::Corrupt("Invalid content area merge".into()));
}
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, pc);
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, offset + len);
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, end);
} else {
page.write_u16_no_offset(pointer_to_pc as usize, offset);
page.write_u16_no_offset(offset as usize, pc);
@@ -2351,7 +2362,6 @@ fn free_cell_range(
/// Defragment a page. This means packing all the cells to the end of the page.
fn defragment_page(page: &PageContent, usable_space: u16) {
// TODO: test this
tracing::debug!("defragment_page");
let cloned_page = page.clone();
// TODO(pere): usable space should include offset probably
@@ -2574,7 +2584,7 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -
if gap + 2 + amount > top {
// defragment
defragment_page(page_ref, usable_space as u16);
defragment_page(page_ref, usable_space);
top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize;
}
@@ -2742,6 +2752,7 @@ fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Resu
} else {
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, usable_space);
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0);
}
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
Ok(())
@@ -3572,13 +3583,22 @@ mod tests {
0 => {
// allow appends with extra place to insert
let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1);
let record = Record::new([OwnedValue::Integer(i as i64)].to_vec());
let payload = add_record(i, cell_idx, page, record, &db);
let free = compute_free_space(page, usable_space);
let record = Record::new([OwnedValue::Integer(i as i64)].to_vec());
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page.page_type(),
Some(i as u64),
&mut payload,
&record,
4096,
db.pager.clone(),
);
if (free as usize) < payload.len() - 2 {
// do not try to insert overflow pages because they require balancing
continue;
}
insert_into_cell(page, &payload, cell_idx, 4096).unwrap();
assert!(page.overflow_cells.is_empty());
total_size += payload.len() as u16 + 2;
cells.push(Cell { pos: i, payload });
@@ -3608,6 +3628,20 @@ mod tests {
}
}
#[test]
pub fn test_free_space() {
let db = get_database();
let page = get_page(2);
let page = page.get_contents();
let header_size = 8;
let usable_space = 4096;
let record = Record::new([OwnedValue::Integer(0)].to_vec());
let payload = add_record(0, 0, page, record, &db);
let free = compute_free_space(page, usable_space);
assert_eq!(free, 4096 - payload.len() as u16 - 2 - header_size);
}
#[test]
pub fn test_defragment_1() {
let db = get_database();
@@ -3751,6 +3785,85 @@ mod tests {
let _ = add_record(0, 1, page, record, &db);
}
#[test]
pub fn test_fuzz_victim_2() {
let db = get_database();
let page = get_page(2);
let usable_space = 4096;
let insert = |pos, page| {
let record = Record::new([OwnedValue::Integer(0)].to_vec());
let _ = add_record(0, pos, page, record, &db);
};
let drop = |pos, page| {
drop_cell(page, pos, usable_space).unwrap();
};
let defragment = |page| {
defragment_page(page, usable_space);
};
defragment(page.get_contents());
defragment(page.get_contents());
insert(0, page.get_contents());
drop(0, page.get_contents());
insert(0, page.get_contents());
drop(0, page.get_contents());
insert(0, page.get_contents());
defragment(page.get_contents());
defragment(page.get_contents());
drop(0, page.get_contents());
defragment(page.get_contents());
insert(0, page.get_contents());
drop(0, page.get_contents());
insert(0, page.get_contents());
insert(1, page.get_contents());
insert(1, page.get_contents());
insert(0, page.get_contents());
drop(3, page.get_contents());
drop(2, page.get_contents());
compute_free_space(page.get_contents(), usable_space);
}
#[test]
pub fn test_fuzz_victim_3() {
let db = get_database();
let page = get_page(2);
let usable_space = 4096;
let insert = |pos, page| {
let record = Record::new([OwnedValue::Integer(0)].to_vec());
let _ = add_record(0, pos, page, record, &db);
};
let drop = |pos, page| {
drop_cell(page, pos, usable_space).unwrap();
};
let defragment = |page| {
defragment_page(page, usable_space);
};
let record = Record::new([OwnedValue::Integer(0)].to_vec());
let mut payload: Vec<u8> = Vec::new();
fill_cell_payload(
page.get_contents().page_type(),
Some(0),
&mut payload,
&record,
4096,
db.pager.clone(),
);
insert(0, page.get_contents());
defragment(page.get_contents());
insert(0, page.get_contents());
defragment(page.get_contents());
insert(0, page.get_contents());
drop(2, page.get_contents());
drop(0, page.get_contents());
let free = compute_free_space(page.get_contents(), usable_space);
let total_size = payload.len() + 2;
assert_eq!(
free,
usable_space - page.get_contents().header_size() as u16 - total_size as u16
);
dbg!(free);
}
#[test]
pub fn btree_insert_sequential() {
let (pager, root_page) = empty_btree();