diff --git a/core/storage/btree.rs b/core/storage/btree.rs index b33109f5b..ebe83174e 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -2172,7 +2172,7 @@ fn edit_page( cell_array, usable_space, )?; - assert!(page.cell_count() >= number_tail_removed); + assert!(count_cells >= number_tail_removed); count_cells -= number_tail_removed; } // TODO: make page_free_array defragment, for now I'm lazy so this will work for now. @@ -2187,17 +2187,19 @@ fn edit_page( for i in 0..page.overflow_cells.len() { let overflow_cell = &page.overflow_cells[i]; // cell index in context of new list of cells that should be in the page - let cell_idx = start_old_cells + overflow_cell.index - start_new_cells; - if cell_idx < number_new_cells { - count_cells += 1; - page_insert_array( - page, - start_new_cells + cell_idx, - 1, - cell_array, - cell_idx, - usable_space, - )?; + if start_old_cells + overflow_cell.index >= start_new_cells { + let cell_idx = start_old_cells + overflow_cell.index - start_new_cells; + if cell_idx < number_new_cells { + count_cells += 1; + page_insert_array( + page, + start_new_cells + cell_idx, + 1, + cell_array, + cell_idx, + usable_space, + )?; + } } } // TODO: append cells to end @@ -2283,11 +2285,19 @@ fn free_cell_range( // then we need to do some more calculation to figure out where to insert the freeblock // in the freeblock linked list. let first_block = page.first_freeblock(); - let maxpc = usable_space; let mut pc = first_block; - while pc <= maxpc && pc < offset && pc != 0 { + while pc < offset { + if pc <= pointer_to_pc { + if pc == 0 { + break; + } + return Err(LimboError::Corrupt( + "free cell range free block not in ascending order".into(), + )); + } + let next = page.read_u16_no_offset(pc as usize); pointer_to_pc = pc; pc = next; @@ -2310,6 +2320,7 @@ fn free_cell_range( )); } size = end - offset; + pc = page.read_u16_no_offset(pc as usize); } if pointer_to_pc > page.offset as u16 + 1 { @@ -2340,7 +2351,7 @@ fn free_cell_range( return Err(LimboError::Corrupt("Invalid content area merge".into())); } page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, pc); - page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, offset + len); + page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, end); } else { page.write_u16_no_offset(pointer_to_pc as usize, offset); page.write_u16_no_offset(offset as usize, pc); @@ -2351,7 +2362,6 @@ fn free_cell_range( /// Defragment a page. This means packing all the cells to the end of the page. fn defragment_page(page: &PageContent, usable_space: u16) { - // TODO: test this tracing::debug!("defragment_page"); let cloned_page = page.clone(); // TODO(pere): usable space should include offset probably @@ -2574,7 +2584,7 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) - if gap + 2 + amount > top { // defragment - defragment_page(page_ref, usable_space as u16); + defragment_page(page_ref, usable_space); top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize; } @@ -2742,6 +2752,7 @@ fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Resu } else { page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, usable_space); page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); + page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0); } page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1); Ok(()) @@ -3572,13 +3583,22 @@ mod tests { 0 => { // allow appends with extra place to insert let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1); - let record = Record::new([OwnedValue::Integer(i as i64)].to_vec()); - let payload = add_record(i, cell_idx, page, record, &db); let free = compute_free_space(page, usable_space); + let record = Record::new([OwnedValue::Integer(i as i64)].to_vec()); + let mut payload: Vec = Vec::new(); + fill_cell_payload( + page.page_type(), + Some(i as u64), + &mut payload, + &record, + 4096, + db.pager.clone(), + ); if (free as usize) < payload.len() - 2 { // do not try to insert overflow pages because they require balancing continue; } + insert_into_cell(page, &payload, cell_idx, 4096).unwrap(); assert!(page.overflow_cells.is_empty()); total_size += payload.len() as u16 + 2; cells.push(Cell { pos: i, payload }); @@ -3608,6 +3628,20 @@ mod tests { } } + #[test] + pub fn test_free_space() { + let db = get_database(); + let page = get_page(2); + let page = page.get_contents(); + let header_size = 8; + let usable_space = 4096; + + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let payload = add_record(0, 0, page, record, &db); + let free = compute_free_space(page, usable_space); + assert_eq!(free, 4096 - payload.len() as u16 - 2 - header_size); + } + #[test] pub fn test_defragment_1() { let db = get_database(); @@ -3751,6 +3785,85 @@ mod tests { let _ = add_record(0, 1, page, record, &db); } + #[test] + pub fn test_fuzz_victim_2() { + let db = get_database(); + + let page = get_page(2); + let usable_space = 4096; + let insert = |pos, page| { + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let _ = add_record(0, pos, page, record, &db); + }; + let drop = |pos, page| { + drop_cell(page, pos, usable_space).unwrap(); + }; + let defragment = |page| { + defragment_page(page, usable_space); + }; + defragment(page.get_contents()); + defragment(page.get_contents()); + insert(0, page.get_contents()); + drop(0, page.get_contents()); + insert(0, page.get_contents()); + drop(0, page.get_contents()); + insert(0, page.get_contents()); + defragment(page.get_contents()); + defragment(page.get_contents()); + drop(0, page.get_contents()); + defragment(page.get_contents()); + insert(0, page.get_contents()); + drop(0, page.get_contents()); + insert(0, page.get_contents()); + insert(1, page.get_contents()); + insert(1, page.get_contents()); + insert(0, page.get_contents()); + drop(3, page.get_contents()); + drop(2, page.get_contents()); + compute_free_space(page.get_contents(), usable_space); + } + + #[test] + pub fn test_fuzz_victim_3() { + let db = get_database(); + + let page = get_page(2); + let usable_space = 4096; + let insert = |pos, page| { + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let _ = add_record(0, pos, page, record, &db); + }; + let drop = |pos, page| { + drop_cell(page, pos, usable_space).unwrap(); + }; + let defragment = |page| { + defragment_page(page, usable_space); + }; + let record = Record::new([OwnedValue::Integer(0)].to_vec()); + let mut payload: Vec = Vec::new(); + fill_cell_payload( + page.get_contents().page_type(), + Some(0), + &mut payload, + &record, + 4096, + db.pager.clone(), + ); + insert(0, page.get_contents()); + defragment(page.get_contents()); + insert(0, page.get_contents()); + defragment(page.get_contents()); + insert(0, page.get_contents()); + drop(2, page.get_contents()); + drop(0, page.get_contents()); + let free = compute_free_space(page.get_contents(), usable_space); + let total_size = payload.len() + 2; + assert_eq!( + free, + usable_space - page.get_contents().header_size() as u16 - total_size as u16 + ); + dbg!(free); + } #[test] pub fn btree_insert_sequential() { let (pager, root_page) = empty_btree();