diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 94fe30750..e66f70510 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -2155,6 +2155,7 @@ fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> u while pc <= maxpc { let next = u16::from_be_bytes(buf[pc..pc + 2].try_into().unwrap()); let size = u16::from_be_bytes(buf[pc + 2..pc + 4].try_into().unwrap()); + dbg!(next, size); if amount <= size as usize { if amount == size as usize { // delete whole thing @@ -2347,68 +2348,57 @@ pub fn page_insert_array( /// This function also updates the freeblock list in the page. /// Freeblocks are used to keep track of free space in the page, /// and are organized as a linked list. -fn free_cell_range(page: &mut PageContent, offset: u16, len: u16, usable_space: u16) { +fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_space: u16) { + let mut size = len; + let mut end = offset + len; + let mut pointer_to_pc = page.offset as u16 + 1; // if the freeblock list is empty, we set this block as the first freeblock in the page header. - if page.first_freeblock() == 0 { - page.write_u16(offset as usize, 0); // next freeblock = null - page.write_u16(offset as usize + 2, len); // size of this freeblock - page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, offset); // first freeblock in page = this block - return; - } - let first_block = page.first_freeblock(); - - // if the freeblock list is not empty, and the offset is less than the first freeblock, - // we insert this block at the head of the list - if offset < first_block { - page.write_u16(offset as usize, first_block); // next freeblock = previous first freeblock - page.write_u16(offset as usize + 2, len); // size of this freeblock - page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, offset); // first freeblock in page = this block - return; - } - - // if we clear space that is at the start of the cell content area, - // we need to update the cell content area pointer forward to account for the removed space - // FIXME: is offset ever < cell_content_area? cell content area grows leftwards and the pointer - // is to the start of the last allocated cell. should we assert!(offset >= page.cell_content_area()) - // and change this to if offset == page.cell_content_area()? - if offset <= page.cell_content_area() { - // FIXME: remove the line directly below this, it does not change anything. - page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, page.first_freeblock()); - page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, offset + len); - return; - } - - // if the freeblock list is not empty, and the offset is greater than the first freeblock, - // then we need to do some more calculation to figure out where to insert the freeblock - // in the freeblock linked list. - let maxpc = usable_space; - - let mut pc = first_block; - let mut prev = first_block; - - dbg!(pc, prev, offset); - while pc <= maxpc && pc < offset && pc != 0 { - let next = page.read_u16(pc as usize); - prev = pc; - pc = next; - dbg!(pc, prev); - } - - if pc == 0 || pc >= maxpc { - // insert into tail - let offset = offset as usize; - let prev = prev as usize; - page.write_u16(prev, offset as u16); - page.write_u16(offset, 0); - page.write_u16(offset + 2, len); + let pc = if page.first_freeblock() == 0 { + 0 } else { - // insert in between - let next = page.read_u16(pc as usize); - let offset = offset as usize; - let prev = prev as usize; - page.write_u16(prev, offset as u16); - page.write_u16(offset, next); - page.write_u16(offset + 2, len); + // if the freeblock list is not empty, and the offset is greater than the first freeblock, + // then we need to do some more calculation to figure out where to insert the freeblock + // in the freeblock linked list. + let first_block = page.first_freeblock(); + let maxpc = usable_space; + + let mut pc = first_block; + + dbg!(pc, pointer_to_pc, offset); + while pc <= maxpc && pc < offset && pc != 0 { + let next = page.read_u16_no_offset(pc as usize); + pointer_to_pc = pc; + pc = next; + dbg!(pc, pointer_to_pc); + } + let mut removed_fragmentation = 0; + if pc > 0 && offset + len + 3 >= pc { + removed_fragmentation = (pc - end) as u8; + end = pc + page.read_u16_no_offset(pc as usize); + size = end - offset; + } + + if pointer_to_pc > page.offset as u16 + 1 { + let prev_end = pointer_to_pc + page.read_u16_no_offset(pointer_to_pc as usize + 2); + if prev_end + 3 >= offset { + removed_fragmentation += (offset - prev_end) as u8; + size = end - pointer_to_pc; + offset = pointer_to_pc; + } + } + let frag = page.num_frag_free_bytes() - removed_fragmentation; + page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag); + + pc + }; + + if offset < page.cell_content_area() { + page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, pc); + page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, offset + len); + } else { + page.write_u16_no_offset(pointer_to_pc as usize, offset); + page.write_u16_no_offset(offset as usize, pc); + page.write_u16_no_offset(offset as usize + 2, size); } } @@ -2495,6 +2485,7 @@ fn defragment_page(page: &PageContent, usable_space: u16) { page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, cbrk as u16); // set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); + page.write_u16(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0); // set unused space to 0 dbg!(cbrk, first_cell); write_buf[first_cell as usize..cbrk as usize].fill(0); @@ -2546,7 +2537,7 @@ fn insert_into_cell(page: &mut PageContent, payload: &[u8], cell_idx: usize, usa ); } // ...and insert new cell pointer at the current index - page.write_u16(cell_pointer_cur_idx - page.offset, new_cell_data_pointer); + page.write_u16_no_offset(cell_pointer_cur_idx, new_cell_data_pointer); // update cell count let new_n_cells = (page.cell_count() + 1) as u16; @@ -2593,7 +2584,6 @@ fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 { // #3 is computed by iterating over the freeblocks linked list let mut cur_freeblock_ptr = page.first_freeblock() as usize; - let page_buf = page.as_ptr(); if cur_freeblock_ptr > 0 { if cur_freeblock_ptr < cell_content_area_start as usize { // Freeblocks exist in the cell content area e.g. after deletions @@ -2605,16 +2595,8 @@ fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 { let mut size = 0; loop { // TODO: check corruption icellast - next = u16::from_be_bytes( - page_buf[cur_freeblock_ptr..cur_freeblock_ptr + 2] - .try_into() - .unwrap(), - ) as usize; // first 2 bytes in freeblock = next freeblock pointer - size = u16::from_be_bytes( - page_buf[cur_freeblock_ptr + 2..cur_freeblock_ptr + 4] - .try_into() - .unwrap(), - ) as usize; // next 2 bytes in freeblock = size of current freeblock + next = page.read_u16_no_offset(cur_freeblock_ptr) as usize; // first 2 bytes in freeblock = next freeblock pointer + size = page.read_u16_no_offset(cur_freeblock_ptr + 2) as usize; // next 2 bytes in freeblock = size of current freeblock free_space_bytes += size; dbg!(cur_freeblock_ptr, next, size); // Freeblocks are in order from left to right on the page, @@ -2657,6 +2639,7 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) - let gap = cell_offset + 2 * page_ref.cell_count(); let mut top = page_ref.cell_content_area() as usize; + dbg!("allocate_cell_space"); // there are free blocks and enough space if page_ref.first_freeblock() != 0 && gap + 2 <= top { // find slot @@ -3650,17 +3633,16 @@ mod tests { let mut cells = Vec::new(); let usable_space = 4096; let total_cells = 10; - let mut ticks = 0; - let mut rng = ChaCha8Rng::seed_from_u64(0); - while ticks > 0 { - ticks -= 1; - } - let mut total_size = 0; - for i in 0..total_cells { + let mut i = 1000; + let seed = thread_rng().gen(); + let mut rng = ChaCha8Rng::seed_from_u64(seed); + while i > 0 { + i -= 1; match rng.gen_range(0..3) { 0 => { // allow appends with extra place to insert let cell_idx = rng.gen_range(0..=page.cell_count()); + println!("insert {}", cell_idx); let record = OwnedRecord::new([OwnedValue::Integer(i as i64)].to_vec()); let payload = add_record(i, cell_idx, page, record, &db); let free = compute_free_space(page, usable_space); @@ -3673,10 +3655,12 @@ mod tests { cells.push(Cell { pos: i, payload }); } 1 => { + dbg!("drop"); if page.cell_count() == 0 { continue; } let cell_idx = rng.gen_range(0..page.cell_count()); + println!("drop {}", cell_idx); let (_, len) = page.cell_get_raw_region( cell_idx, payload_overflow_threshold_max(page.page_type(), 4096), @@ -3688,10 +3672,12 @@ mod tests { cells.remove(cell_idx); } 2 => { + println!("defragment_page"); defragment_page(page, usable_space); } _ => unreachable!(), } + dbg!("compute free"); let free = compute_free_space(page, usable_space); assert_eq!(free, 4096 - total_size - header_size); } @@ -3793,6 +3779,54 @@ mod tests { } } + #[test] + fn test_drop_a_few_insert() { + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let usable_space = 4096; + + let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec()); + let payload = add_record(0, 0, page, record, &db); + let record = OwnedRecord::new([OwnedValue::Integer(1 as i64)].to_vec()); + let _ = add_record(1, 1, page, record, &db); + let record = OwnedRecord::new([OwnedValue::Integer(2 as i64)].to_vec()); + let _ = add_record(2, 2, page, record, &db); + + drop_cell(page, 1, usable_space); + drop_cell(page, 1, usable_space); + + ensure_cell(page, 0, &payload); + } + + #[test] + fn test_fuzz_victim_1() { + set_breakpoint_panic(); + let db = get_database(); + + let page = get_page(2); + let page = page.get_contents(); + let usable_space = 4096; + + let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec()); + let _ = add_record(0, 0, page, record, &db); + + let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec()); + let _ = add_record(0, 0, page, record, &db); + drop_cell(page, 0, usable_space); + + defragment_page(page, usable_space); + + let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec()); + let _ = add_record(0, 1, page, record, &db); + + drop_cell(page, 0, usable_space); + + let record = OwnedRecord::new([OwnedValue::Integer(0 as i64)].to_vec()); + let _ = add_record(0, 1, page, record, &db); + } + fn set_breakpoint_panic() { // Set custom panic hook at start of program panic::set_hook(Box::new(|panic_info| {