diff --git a/core/storage/btree.rs b/core/storage/btree.rs index aa70651d8..c018f75ed 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -786,7 +786,8 @@ impl BTreeCursor { cell_payload.as_slice(), cell_idx, self.usable_space() as u16, - ); + ) + .unwrap(); contents.overflow_cells.len() }; let write_info = self @@ -1076,7 +1077,7 @@ impl BTreeCursor { cell_idx, parent_contents.cell_count() ); - drop_cell(parent_contents, cell_idx, self.usable_space() as u16); + drop_cell(parent_contents, cell_idx, self.usable_space() as u16)?; } assert_eq!( write_info.divider_cells.borrow().len(), @@ -1393,7 +1394,8 @@ impl BTreeCursor { &new_divider_cell, first_divider_cell + i, self.usable_space() as u16, - ); + ) + .unwrap(); } // TODO: update pages let mut done = vec![false; sibling_count_new]; @@ -1796,11 +1798,12 @@ impl BTreeCursor { &cell_payload, cell_idx, self.usable_space() as u16, - ); - drop_cell(contents, cell_idx, self.usable_space() as u16); + ) + .unwrap(); + drop_cell(contents, cell_idx, self.usable_space() as u16)?; } else { // For leaf nodes, simply remove the cell - drop_cell(contents, cell_idx, self.usable_space() as u16); + drop_cell(contents, cell_idx, self.usable_space() as u16)?; } // TODO(Krishna): Implement balance after delete. I will implement after balance_nonroot is extended. 
@@ -2064,7 +2067,7 @@ impl CellArray { } /// Try to find a free block available and allocate it if found -fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> usize { +fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> Result { // NOTE: freelist is in ascending order of keys and pc // unuse_space is reserved bytes at the end of page, therefore we must substract from maxpc let mut pc = page_ref.first_freeblock() as usize; @@ -2095,13 +2098,23 @@ fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> u page_ref.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag); } pc += new_size as usize; - return pc; + return Ok(pc); } } prev_pc = pc; pc = next as usize; + if pc <= prev_pc && pc != 0 { + return Err(LimboError::Corrupt( + "Free list not in ascending order".into(), + )); + } } - 0 + if pc > maxpc + amount - 4 { + return Err(LimboError::Corrupt( + "Free block chain extends beyond page end".into(), + )); + } + Ok(0) } pub fn btree_init_page(page: &PageRef, page_type: PageType, offset: usize, usable_space: u16) { @@ -2132,7 +2145,7 @@ fn edit_page( number_new_cells: usize, cell_array: &CellArray, usable_space: u16, -) { +) -> Result<()> { tracing::trace!( "edit_page start_old_cells={} start_new_cells={} number_new_cells={} cell_array={}", start_old_cells, @@ -2150,7 +2163,7 @@ fn edit_page( start_new_cells - start_old_cells, cell_array, usable_space, - ); + )?; // shift pointers left let buf = page.as_ptr(); let (start, _) = page.cell_pointer_array_offset_and_size(); @@ -2167,7 +2180,7 @@ fn edit_page( end_old_cells - end_new_cells, cell_array, usable_space, - ); + )?; assert!(page.cell_count() >= number_tail_removed); count_cells -= number_tail_removed; } @@ -2176,7 +2189,7 @@ fn edit_page( // TODO: add to start if start_new_cells < start_old_cells { let count = number_new_cells.min(start_old_cells - start_new_cells); - page_insert_array(page, start_new_cells, count, cell_array, 0, 
usable_space); + page_insert_array(page, start_new_cells, count, cell_array, 0, usable_space)?; count_cells += count; } // TODO: overflow cells @@ -2193,7 +2206,7 @@ fn edit_page( cell_array, cell_idx, usable_space, - ); + )?; } } // TODO: append cells to end @@ -2204,9 +2217,10 @@ fn edit_page( cell_array, count_cells, usable_space, - ); + )?; // TODO: noverflow page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, number_new_cells as u16); + Ok(()) } fn page_free_array( @@ -2215,7 +2229,7 @@ fn page_free_array( count: usize, cell_array: &CellArray, usable_space: u16, -) -> usize { +) -> Result { let buf = &mut page.as_ptr()[page.offset..usable_space as usize]; let buf_range = buf.as_ptr_range(); let mut number_of_cells_removed = 0; @@ -2233,12 +2247,12 @@ fn page_free_array( // TODO: remove pointer too let offset = (cell_pointer.start as usize - buf_range.start as usize) as u16; let len = (cell_pointer.end as usize - cell_pointer.start as usize) as u16; - free_cell_range(page, offset, len, usable_space); + free_cell_range(page, offset, len, usable_space)?; page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1); number_of_cells_removed += 1; } } - number_of_cells_removed + Ok(number_of_cells_removed) } pub fn page_insert_array( page: &mut PageContent, @@ -2247,20 +2261,26 @@ pub fn page_insert_array( cell_array: &CellArray, mut start_insert: usize, usable_space: u16, -) { +) -> Result<()> { // TODO: implement faster algorithm, this is doing extra work that's not needed. // See pageInsertArray to understand faster way. for i in first..first + count { - insert_into_cell(page, cell_array.cells[i], start_insert, usable_space); + insert_into_cell(page, cell_array.cells[i], start_insert, usable_space)?; start_insert += 1; } + Ok(()) } /// Free the range of bytes that a cell occupies. /// This function also updates the freeblock list in the page. /// Freeblocks are used to keep track of free space in the page, /// and are organized as a linked list. 
-fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_space: u16) { +fn free_cell_range( + page: &mut PageContent, + mut offset: u16, + len: u16, + usable_space: u16, +) -> Result<()> { let mut size = len; let mut end = offset + len; let mut pointer_to_pc = page.offset as u16 + 1; @@ -2281,21 +2301,40 @@ fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_spa pointer_to_pc = pc; pc = next; } + + if pc > usable_space - 4 { + return Err(LimboError::Corrupt("Free block beyond usable space".into())); + } let mut removed_fragmentation = 0; if pc > 0 && offset + len + 3 >= pc { + + if end > pc { + return Err(LimboError::Corrupt("Invalid block overlap".into())); + } removed_fragmentation = (pc - end) as u8; end = pc + page.read_u16_no_offset(pc as usize); + if end > usable_space { + return Err(LimboError::Corrupt( + "Coalesced block extends beyond page".into(), + )); + } size = end - offset; } if pointer_to_pc > page.offset as u16 + 1 { let prev_end = pointer_to_pc + page.read_u16_no_offset(pointer_to_pc as usize + 2); if prev_end + 3 >= offset { + if prev_end > offset { + return Err(LimboError::Corrupt("Invalid previous block overlap".into())); + } removed_fragmentation += (offset - prev_end) as u8; size = end - pointer_to_pc; offset = pointer_to_pc; } } + if removed_fragmentation > page.num_frag_free_bytes() { + return Err(LimboError::Corrupt("Invalid fragmentation count".into())); + } let frag = page.num_frag_free_bytes() - removed_fragmentation; page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag); @@ -2303,6 +2342,12 @@ fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_spa }; if offset <= page.cell_content_area() { + if offset < page.cell_content_area() { + return Err(LimboError::Corrupt("Free block before content area".into())); + } + if pointer_to_pc != page.offset as u16 + 1 { + return Err(LimboError::Corrupt("Invalid content area merge".into())); + }
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, pc); page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, offset + len); } else { @@ -2310,6 +2355,7 @@ fn free_cell_range(page: &mut PageContent, mut offset: u16, len: u16, usable_spa page.write_u16_no_offset(offset as usize, pc); page.write_u16_no_offset(offset as usize + 2, size); } + Ok(()) } /// Defragment a page. This means packing all the cells to the end of the page. @@ -2378,7 +2424,12 @@ fn defragment_page(page: &PageContent, usable_space: u16) { /// insert_into_cell() is called from insert_into_page(), /// and the overflow cell count is used to determine if the page overflows, /// i.e. whether we need to balance the btree after the insert. -fn insert_into_cell(page: &mut PageContent, payload: &[u8], cell_idx: usize, usable_space: u16) { +fn insert_into_cell( + page: &mut PageContent, + payload: &[u8], + cell_idx: usize, + usable_space: u16, +) -> Result<()> { assert!( cell_idx <= page.cell_count(), "attempting to add cell to an incorrect place cell_idx={} cell_count={}", @@ -2394,11 +2445,11 @@ fn insert_into_cell(page: &mut PageContent, payload: &[u8], cell_idx: usize, usa index: cell_idx, payload: Pin::new(Vec::from(payload)), }); - return; + return Ok(()); } // TODO: insert into cell payload in internal page - let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space); + let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space)?; let buf = page.as_ptr(); // copy data @@ -2423,6 +2474,7 @@ fn insert_into_cell(page: &mut PageContent, payload: &[u8], cell_idx: usize, usa // update cell count let new_n_cells = (page.cell_count() + 1) as u16; page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells); + Ok(()) } /// Free blocks can be zero, meaning the "real free space" that can be used to allocate is expected to be between first cell byte @@ -2512,7 +2564,7 @@ fn compute_free_space(page: &PageContent, usable_space: u16) -> u16 { } /// Allocate 
space for a cell on a page. -fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -> u16 { +fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) -> Result { let amount = amount as usize; let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size(); @@ -2522,9 +2574,9 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) - // there are free blocks and enough space if page_ref.first_freeblock() != 0 && gap + 2 <= top { // find slot - let pc = find_free_cell(page_ref, usable_space, amount); + let pc = find_free_cell(page_ref, usable_space, amount)?; if pc != 0 { - return pc as u16; + return Ok(pc as u16); } /* fall through, we might need to defragment */ } @@ -2540,7 +2592,7 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) - page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16); assert!(top + amount <= usable_space as usize); - top as u16 + Ok(top as u16) } /// Fill in the cell payload with the record. @@ -2686,14 +2738,14 @@ fn payload_overflow_threshold_min(_page_type: PageType, usable_space: u16) -> us /// Drop a cell from a page. /// This is done by freeing the range of bytes that the cell occupies. 
-fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) { +fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Result<()> { let (cell_start, cell_len) = page.cell_get_raw_region( cell_idx, payload_overflow_threshold_max(page.page_type(), usable_space), payload_overflow_threshold_min(page.page_type(), usable_space), usable_space as usize, ); - free_cell_range(page, cell_start as u16, cell_len as u16, usable_space); + free_cell_range(page, cell_start as u16, cell_len as u16, usable_space)?; if page.cell_count() > 1 { shift_pointers_left(page, cell_idx); } else { @@ -2701,6 +2753,7 @@ fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) { page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0); } page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1); + Ok(()) } /// Shift pointers to the left once starting from a cell position @@ -2816,7 +2869,7 @@ mod tests { 4096, db.pager.clone(), ); - insert_into_cell(page, &payload, pos, 4096); + insert_into_cell(page, &payload, pos, 4096).unwrap(); payload } @@ -2867,7 +2920,7 @@ mod tests { ensure_cell(page, i, &cell.payload); } cells.remove(1); - drop_cell(page, 1, usable_space); + drop_cell(page, 1, usable_space).unwrap(); for (i, cell) in cells.iter().enumerate() { ensure_cell(page, i, &cell.payload); @@ -3178,7 +3231,7 @@ mod tests { let mut new_cells = Vec::new(); for cell in cells { if cell.pos % 2 == 1 { - drop_cell(page, cell.pos - removed, usable_space); + drop_cell(page, cell.pos - removed, usable_space).unwrap(); removed += 1; } else { new_cells.push(cell); @@ -3444,7 +3497,7 @@ mod tests { ensure_cell(page, i, &cell.payload); } cells.remove(1); - drop_cell(page, 1, usable_space); + drop_cell(page, 1, usable_space).unwrap(); for (i, cell) in cells.iter().enumerate() { ensure_cell(page, i, &cell.payload); @@ -3484,7 +3537,7 @@ mod tests { let mut new_cells = Vec::new(); for cell in cells { if cell.pos % 2 == 1 { - drop_cell(page, cell.pos 
- removed, usable_space); + drop_cell(page, cell.pos - removed, usable_space).unwrap(); removed += 1; } else { new_cells.push(cell); @@ -3546,7 +3599,7 @@ mod tests { payload_overflow_threshold_min(page.page_type(), 4096), usable_space as usize, ); - drop_cell(page, cell_idx, usable_space); + drop_cell(page, cell_idx, usable_space).unwrap(); total_size -= len as u16 + 2; cells.remove(cell_idx); } @@ -3602,7 +3655,7 @@ mod tests { let payload = add_record(0, 0, page, record, &db); assert_eq!(page.cell_count(), 1); - drop_cell(page, 0, usable_space); + drop_cell(page, 0, usable_space).unwrap(); assert_eq!(page.cell_count(), 0); let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec()); @@ -3638,7 +3691,7 @@ mod tests { for i in 0..100 { assert_eq!(page.cell_count(), 1); - drop_cell(page, 0, usable_space); + drop_cell(page, 0, usable_space).unwrap(); assert_eq!(page.cell_count(), 0); let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec()); @@ -3671,8 +3724,8 @@ mod tests { let record = Record::new([OwnedValue::Integer(2 as i64)].to_vec()); let _ = add_record(2, 2, page, record, &db); - drop_cell(page, 1, usable_space); - drop_cell(page, 1, usable_space); + drop_cell(page, 1, usable_space).unwrap(); + drop_cell(page, 1, usable_space).unwrap(); ensure_cell(page, 0, &payload); } @@ -3691,14 +3744,14 @@ mod tests { let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec()); let _ = add_record(0, 0, page, record, &db); - drop_cell(page, 0, usable_space); + drop_cell(page, 0, usable_space).unwrap(); defragment_page(page, usable_space); let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec()); let _ = add_record(0, 1, page, record, &db); - drop_cell(page, 0, usable_space); + drop_cell(page, 0, usable_space).unwrap(); let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec()); let _ = add_record(0, 1, page, record, &db);