diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 4ae59cd74..ca376055c 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1657,7 +1657,7 @@ impl BTreeCursor { } pub fn rewind(&mut self) -> Result> { - if let Some(_) = &self.mv_cursor { + if self.mv_cursor.is_some() { let (rowid, record) = return_if_io!(self.get_next_record(None)); self.rowid.replace(rowid); self.record.replace(record); @@ -2313,41 +2313,49 @@ impl CellArray { fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> Result { // NOTE: freelist is in ascending order of keys and pc // unuse_space is reserved bytes at the end of page, therefore we must substract from maxpc - let mut pc = page_ref.first_freeblock() as usize; let mut prev_pc = page_ref.offset + PAGE_HEADER_OFFSET_FIRST_FREEBLOCK; + let mut pc = page_ref.first_freeblock() as usize; + let maxpc = usable_space as usize - amount; - let buf = page_ref.as_ptr(); - - let usable_space = usable_space as usize; - let maxpc = usable_space - amount; while pc <= maxpc { - let next = u16::from_be_bytes(buf[pc..pc + 2].try_into().unwrap()); - let size = u16::from_be_bytes(buf[pc + 2..pc + 4].try_into().unwrap()); - if amount <= size as usize { - if amount == size as usize { - // delete whole thing - page_ref.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, next); - } else { - // take only the part we are interested in by reducing the size - let new_size = size - amount as u16; - // size includes 4 bytes of freeblock - // we need to leave the free block at least - if new_size >= 4 { - buf[pc + 2..pc + 4].copy_from_slice(&new_size.to_be_bytes()); - } else { - // increase fragment size and delete entry from free list - buf[prev_pc..prev_pc + 2].copy_from_slice(&next.to_be_bytes()); - let frag = page_ref.num_frag_free_bytes() + new_size as u8; - page_ref.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag); - } - pc += new_size as usize; - } - return Ok(pc); + if pc + 4 > usable_space as usize { + return_corrupt!("Free block header extends beyond page"); } + + let next = page_ref.read_u16_no_offset(pc); + let size = page_ref.read_u16_no_offset(pc + 2); + + if amount <= size as usize { + let new_size = size as usize - amount; + if new_size < 4 { + // The code is checking if using a free slot that would leave behind a very small fragment (x < 4 bytes) + // would cause the total fragmentation to exceed the limit of 60 bytes + // check sqlite docs https://www.sqlite.org/fileformat.html#:~:text=A%20freeblock%20requires,not%20exceed%2060 + if page_ref.num_frag_free_bytes() > 57 { + return Ok(0); + } + // Delete the slot from freelist and update the page's fragment count. + page_ref.write_u16(prev_pc, next); + let frag = page_ref.num_frag_free_bytes() + new_size as u8; + page_ref.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag); + return Ok(pc); + } else if new_size + pc > maxpc { + return_corrupt!("Free block extends beyond page end"); + } else { + // Requested amount fits inside the current free slot so we reduce its size + // to account for newly allocated space. + page_ref.write_u16(pc + 2, new_size as u16); + return Ok(pc + new_size); + } + } + prev_pc = pc; pc = next as usize; - if pc <= prev_pc && pc != 0 { - return_corrupt!("Free list not in ascending order"); + if pc <= prev_pc { + if pc != 0 { + return_corrupt!("Free list not in ascending order"); + } + return Ok(0); } } if pc > maxpc + amount - 4 { @@ -2522,6 +2530,14 @@ fn free_cell_range( len: u16, usable_space: u16, ) -> Result<()> { + if len < 4 { + return_corrupt!("Minimum cell size is 4"); + } + + if offset > usable_space.saturating_sub(4) { + return_corrupt!("Start offset beyond usable space"); + } + let mut size = len; let mut end = offset + len; let mut pointer_to_pc = page.offset as u16 + 1; @@ -2583,7 +2599,6 @@ fn free_cell_range( } let frag = page.num_frag_free_bytes() - removed_fragmentation; page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, frag); - pc }; @@ -2601,6 +2616,7 @@ fn free_cell_range( page.write_u16_no_offset(offset as usize, pc); page.write_u16_no_offset(offset as usize + 2, size); } + Ok(()) } @@ -3036,7 +3052,6 @@ mod tests { use std::sync::Arc; use std::sync::Mutex; - use rand::{thread_rng, Rng}; use tempfile::TempDir; use crate::{ @@ -3923,7 +3938,9 @@ mod tests { let mut cells = Vec::new(); let usable_space = 4096; let mut i = 1000; - let seed = thread_rng().gen(); + // let seed = thread_rng().gen(); + // let seed = 15292777653676891381; + let seed = 9261043168681395159; tracing::info!("seed {}", seed); let mut rng = ChaCha8Rng::seed_from_u64(seed); while i > 0 { @@ -3977,6 +3994,76 @@ mod tests { } } + #[test] + pub fn test_fuzz_drop_defragment_insert_issue_1085() { + // This test is used to demonstrate that issue at https://github.com/tursodatabase/limbo/issues/1085 + // is FIXED. + let db = get_database(); + let conn = db.connect().unwrap(); + + let page = get_page(2); + let page = page.get_contents(); + let header_size = 8; + + let mut total_size = 0; + let mut cells = Vec::new(); + let usable_space = 4096; + let mut i = 1000; + for seed in [15292777653676891381, 9261043168681395159] { + tracing::info!("seed {}", seed); + let mut rng = ChaCha8Rng::seed_from_u64(seed); + while i > 0 { + i -= 1; + match rng.next_u64() % 3 { + 0 => { + // allow appends with extra place to insert + let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1); + let free = compute_free_space(page, usable_space); + let record = Record::new([OwnedValue::Integer(i as i64)].to_vec()); + let mut payload: Vec = Vec::new(); + fill_cell_payload( + page.page_type(), + Some(i as u64), + &mut payload, + &record, + 4096, + conn.pager.clone(), + ); + if (free as usize) < payload.len() - 2 { + // do not try to insert overflow pages because they require balancing + continue; + } + insert_into_cell(page, &payload, cell_idx, 4096).unwrap(); + assert!(page.overflow_cells.is_empty()); + total_size += payload.len() as u16 + 2; + cells.push(Cell { pos: i, payload }); + } + 1 => { + if page.cell_count() == 0 { + continue; + } + let cell_idx = rng.next_u64() as usize % page.cell_count(); + let (_, len) = page.cell_get_raw_region( + cell_idx, + payload_overflow_threshold_max(page.page_type(), 4096), + payload_overflow_threshold_min(page.page_type(), 4096), + usable_space as usize, + ); + drop_cell(page, cell_idx, usable_space).unwrap(); + total_size -= len as u16 + 2; + cells.remove(cell_idx); + } + 2 => { + defragment_page(page, usable_space); + } + _ => unreachable!(), + } + let free = compute_free_space(page, usable_space); + assert_eq!(free, 4096 - total_size - header_size); + } + } + } + #[test] pub fn test_free_space() { let db = get_database(); @@ -4001,7 +4088,7 @@ mod tests { let page = page.get_contents(); let usable_space = 4096; - let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec()); + let record = Record::new([OwnedValue::Integer(0)].to_vec()); let payload = add_record(0, 0, page, record, &conn); assert_eq!(page.cell_count(), 1); @@ -4039,7 +4126,7 @@ mod tests { drop_cell(page, 0, usable_space).unwrap(); assert_eq!(page.cell_count(), 0); - let record = Record::new([OwnedValue::Integer(0 as i64)].to_vec()); + let record = Record::new([OwnedValue::Integer(0)].to_vec()); let payload = add_record(0, 0, page, record, &conn); assert_eq!(page.cell_count(), 1); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index d3c7ff431..7ec197517 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -674,6 +674,28 @@ impl PageContent { let buf = self.as_ptr(); write_header_to_buf(buf, header); } + + pub fn debug_print_freelist(&self, usable_space: u16) { + let mut pc = self.first_freeblock() as usize; + let mut block_num = 0; + println!("---- Free List Blocks ----"); + println!("first freeblock pointer: {}", pc); + println!("cell content area: {}", self.cell_content_area()); + println!("fragmented bytes: {}", self.num_frag_free_bytes()); + + while pc != 0 && pc <= usable_space as usize { + let next = self.read_u16_no_offset(pc); + let size = self.read_u16_no_offset(pc + 2); + + println!( + "block {}: position={}, size={}, next={}", + block_num, pc, size, next + ); + pc = next as usize; + block_num += 1; + } + println!("--------------"); + } } pub fn begin_read_page(