From 1d24925e215d4e57541f7966d48a0e05b6493e0e Mon Sep 17 00:00:00 2001
From: Jussi Saurio
Date: Sat, 23 Aug 2025 00:21:50 +0300
Subject: [PATCH 1/5] Make fill_cell_payload() safe for async IO and cache
 spilling

Problems:

1. fill_cell_payload() is not re-entrant because it can yield IO when
   allocating a new overflow page, resulting in losing some of the input
   data.
2. fill_cell_payload() in its current form is not safe for cache spilling
   because the previous overflow page in the chain of allocated overflow
   pages can be evicted by a spill caused by the next overflow page
   allocation, invalidating the page pointer and causing corruption.
3. fill_cell_payload() uses raw pointers and `unsafe` as a workaround from
   a previous time when we used to clone `WriteState`, resulting in
   hard-to-read code.

Solutions:

1. Introduce a new substate to the fill_cell_payload state machine to
   handle re-entrancy wrt. allocating overflow pages.
2. Always pin the current overflow page so that it cannot be evicted
   during the overflow chain construction. Also pin the regular page the
   overflow chain is attached to, because it is immediately accessed after
   fill_cell_payload is done.
3. Remove all explicit usages of `unsafe` from `fill_cell_payload`
   (although our pager is ofc still extremely unsafe under the hood :] )

Note that solution 2 addresses a problem that arose in the development of
page cache spilling, which is not yet implemented, but will be soon.

Miscellanea:

1. Renamed a bunch of variables to be clearer
2. Added more comments about what is happening in fill_cell_payload
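
To illustrate the re-entrancy pattern with a minimal stand-alone sketch
(hypothetical types, not the actual core code — `IOResult` here is a
stand-in for the real type): every piece of copy progress lives in the
state value, so the caller can simply re-invoke the function after a
yield without losing any input data:

    enum IOResult<T> {
        Done(T),
        IO, // caller must wait for the IO to complete, then call again
    }

    enum CopyState {
        Start,
        Copying { src_offset: usize },
    }

    // Copies `src` into `dst` one 4-byte chunk per call; all progress is
    // kept in `state`, so a yield between calls loses nothing.
    fn copy_step(state: &mut CopyState, src: &[u8], dst: &mut Vec<u8>) -> IOResult<()> {
        loop {
            match state {
                CopyState::Start => *state = CopyState::Copying { src_offset: 0 },
                CopyState::Copying { src_offset } => {
                    let end = (*src_offset + 4).min(src.len());
                    dst.extend_from_slice(&src[*src_offset..end]);
                    *src_offset = end;
                    if end == src.len() {
                        return IOResult::Done(());
                    }
                    return IOResult::IO; // pretend each chunk requires IO
                }
            }
        }
    }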
---
 core/storage/btree.rs | 219 +++++++++++++++++++++++++-----------------
 1 file changed, 129 insertions(+), 90 deletions(-)

diff --git a/core/storage/btree.rs b/core/storage/btree.rs
index ca5624220..ab263e406 100644
--- a/core/storage/btree.rs
+++ b/core/storage/btree.rs
@@ -2274,7 +2274,7 @@ impl BTreeCursor {
                 ref mut fill_cell_payload_state,
             } => {
                 return_if_io!(fill_cell_payload(
-                    page.get().get().contents.as_ref().unwrap(),
+                    page.get(),
                     bkey.maybe_rowid(),
                     new_payload,
                     *cell_idx,
@@ -5176,10 +5176,9 @@ impl BTreeCursor {
                 fill_cell_payload_state,
             } => {
                 let page = page_ref.get();
-                let page_contents = page.get().contents.as_ref().unwrap();
                 {
                     return_if_io!(fill_cell_payload(
-                        page_contents,
+                        page,
                         *rowid,
                         new_payload,
                         cell_idx,
@@ -6963,39 +6962,66 @@ fn allocate_cell_space(

 #[derive(Debug, Clone)]
 pub enum FillCellPayloadState {
+    /// Determine whether we can fit the record on the current page.
+    /// If yes, return immediately after copying the data.
+    /// Otherwise move to [CopyData] state.
     Start,
-    AllocateOverflowPages {
-        /// Arc because we clone [WriteState] for some reason and we use unsafe pointer dereferences in [FillCellPayloadState::AllocateOverflowPages]
-        /// so the underlying bytes must not be cloned in upper layers.
-        record_buf: Arc<[u8]>,
-        space_left: usize,
-        to_copy_buffer_ptr: *const u8,
-        to_copy_buffer_len: usize,
-        pointer: *mut u8,
-        pointer_to_next: *mut u8,
+    /// Copy the next chunk of data from the record buffer to the cell payload.
+    /// If we can't fit all of the remaining data on the current page,
+    /// move the internal state to [CopyDataState::AllocateOverflowPage].
+    CopyData {
+        /// Internal state of the copy data operation.
+        /// We can either be copying data or allocating an overflow page.
+        state: CopyDataState,
+        /// Track how much space we have left on the current page we are copying data into.
+        /// This is reset whenever a new overflow page is allocated.
+        space_left_on_cur_page: usize,
+        /// Offset into the record buffer to copy from.
+        src_data_offset: usize,
+        /// Offset into the destination buffer we are copying data into.
+        /// This is either:
+        /// - an offset in the btree page where the cell is, or
+        /// - an offset in an overflow page
+        dst_data_offset: usize,
+        /// If this is Some, we will copy data into this overflow page.
+        /// If this is None, we will copy data into the cell payload on the btree page.
+        /// Also: to safely form a chain of overflow pages, the current page must be pinned to the page cache
+        /// so that e.g. a spilling operation does not evict it to disk.
+        current_overflow_page: Option<PageRef>,
     },
 }

+#[derive(Debug, Clone, PartialEq, Eq, Copy)]
+pub enum CopyDataState {
+    /// Copy the next chunk of data from the record buffer to the cell payload.
+    Copy,
+    /// Allocate a new overflow page if we couldn't fit all data on the current page.
+    AllocateOverflowPage,
+}
+
 /// Fill in the cell payload with the record.
 /// If the record is too large to fit in the cell, it will spill onto overflow pages.
 /// This function needs a separate [FillCellPayloadState] because allocating overflow pages
 /// may require I/O.
 #[allow(clippy::too_many_arguments)]
 fn fill_cell_payload(
-    page_contents: &PageContent,
+    page: PageRef,
     int_key: Option<i64>,
     cell_payload: &mut Vec<u8>,
     cell_idx: usize,
     record: &ImmutableRecord,
     usable_space: usize,
     pager: Rc<Pager>,
-    state: &mut FillCellPayloadState,
+    fill_cell_payload_state: &mut FillCellPayloadState,
 ) -> Result<IOResult<()>> {
+    let overflow_page_pointer_size = 4;
+    let overflow_page_data_size = usable_space - overflow_page_pointer_size;
     loop {
-        match state {
+        let record_buf = record.get_payload();
+        match fill_cell_payload_state {
             FillCellPayloadState::Start => {
-                // TODO: make record raw from start, having to serialize is not good
-                let record_buf: Arc<[u8]> = Arc::from(record.get_payload());
+                page.pin(); // We need to pin this page because we will be accessing its contents after fill_cell_payload is done.
+                let page_contents = page.get().contents.as_ref().unwrap();
                 let page_type = page_contents.page_type();

                 // fill in header
@@ -7024,25 +7050,25 @@ fn fill_cell_payload(
                 if record_buf.len() <= payload_overflow_threshold_max {
                     // enough allowed space to fit inside a btree page
                     cell_payload.extend_from_slice(record_buf.as_ref());
-                    return Ok(IOResult::Done(()));
+                    break;
                 }

                 let payload_overflow_threshold_min =
                     payload_overflow_threshold_min(page_type, usable_space);
                 // see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371
                 let mut space_left = payload_overflow_threshold_min
-                    + (record_buf.len() - payload_overflow_threshold_min) % (usable_space - 4);
+                    + (record_buf.len() - payload_overflow_threshold_min) % overflow_page_data_size;

                 if space_left > payload_overflow_threshold_max {
                     space_left = payload_overflow_threshold_min;
                 }

                 // cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page.
-                let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page
-                let to_copy_buffer = record_buf.as_ref();
+                let cell_size = space_left + cell_payload.len() + overflow_page_pointer_size;

                 let prev_size = cell_payload.len();
-                cell_payload.resize(prev_size + space_left + 4, 0);
+                let new_data_size = prev_size + space_left;
+                cell_payload.resize(new_data_size + overflow_page_pointer_size, 0);
                 assert_eq!(
                     cell_size,
                     cell_payload.len(),
                     "cell_size={} != cell_payload.len()={}",
                     cell_size,
                     cell_payload.len()
                 );

@@ -7051,87 +7077,100 @@ fn fill_cell_payload(
-                // SAFETY: this pointer is valid because it points to a buffer in an Arc<Vec<u8>> that lives at least as long as this function,
-                // and the Vec will not be mutated in FillCellPayloadState::AllocateOverflowPages, which we will move to next.
-                let pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) };
-                let pointer_to_next =
-                    unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) };
-
-                let to_copy_buffer_ptr = to_copy_buffer.as_ptr();
-                let to_copy_buffer_len = to_copy_buffer.len();
-
-                *state = FillCellPayloadState::AllocateOverflowPages {
-                    record_buf,
-                    space_left,
-                    to_copy_buffer_ptr,
-                    to_copy_buffer_len,
-                    pointer,
-                    pointer_to_next,
+                *fill_cell_payload_state = FillCellPayloadState::CopyData {
+                    state: CopyDataState::Copy,
+                    space_left_on_cur_page: space_left,
+                    src_data_offset: 0,
+                    dst_data_offset: prev_size,
+                    current_overflow_page: None,
                 };
                 continue;
             }
-            FillCellPayloadState::AllocateOverflowPages {
-                record_buf: _record_buf,
-                space_left,
-                to_copy_buffer_ptr,
-                to_copy_buffer_len,
-                pointer,
-                pointer_to_next,
+            FillCellPayloadState::CopyData {
+                state,
+                src_data_offset,
+                space_left_on_cur_page,
+                dst_data_offset,
+                current_overflow_page,
             } => {
-                let to_copy;
-                {
-                    let to_copy_buffer_ptr = *to_copy_buffer_ptr;
-                    let to_copy_buffer_len = *to_copy_buffer_len;
-                    let pointer = *pointer;
-                    let space_left = *space_left;
+                match state {
+                    CopyDataState::Copy => {
+                        turso_assert!(*src_data_offset < record_buf.len(), "trying to read past end of record buffer: record_offset={} < record_buf.len()={}", src_data_offset, record_buf.len());
+                        let record_offset_slice = &record_buf[*src_data_offset..];
+                        let amount_to_copy =
+                            (*space_left_on_cur_page).min(record_offset_slice.len());
+                        let record_offset_slice_to_copy = &record_offset_slice[..amount_to_copy];
+                        if let Some(cur_page) = current_overflow_page {
+                            // Copy data into the current overflow page.
+                            turso_assert!(
+                                cur_page.is_loaded(),
+                                "current overflow page is not loaded"
+                            );
+                            turso_assert!(*dst_data_offset == overflow_page_pointer_size, "data must be copied to offset {overflow_page_pointer_size} on overflow pages, instead tried to copy to offset {dst_data_offset}");
+                            let contents = cur_page.get_contents();
+                            let buf = &mut contents.as_ptr()
+                                [*dst_data_offset..*dst_data_offset + amount_to_copy];
+                            buf.copy_from_slice(record_offset_slice_to_copy);
+                        } else {
+                            // Copy data into the cell payload on the btree page.
+                            let buf = &mut cell_payload
+                                [*dst_data_offset..*dst_data_offset + amount_to_copy];
+                            buf.copy_from_slice(record_offset_slice_to_copy);
+                        }

-                    // SAFETY: we know to_copy_buffer_ptr is valid because it refers to record_buf which lives at least as long as this function,
-                    // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
-                    let to_copy_buffer = unsafe {
-                        std::slice::from_raw_parts(to_copy_buffer_ptr, to_copy_buffer_len)
-                    };
-                    to_copy = space_left.min(to_copy_buffer_len);
-                    // SAFETY: we know 'pointer' is valid because it refers to cell_payload which lives at least as long as this function,
-                    // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
-                    unsafe { std::ptr::copy(to_copy_buffer_ptr, pointer, to_copy) };

+                        if record_offset_slice.len() - amount_to_copy == 0 {
+                            if let Some(cur_page) = current_overflow_page {
+                                cur_page.unpin(); // We can safely unpin the current overflow page now.
+                            }
+                            // Everything copied.
+                            break;
+                        }
+                        *state = CopyDataState::AllocateOverflowPage;
+                        *src_data_offset += amount_to_copy;
+                    }
+                    CopyDataState::AllocateOverflowPage => {
+                        let new_overflow_page = return_if_io!(pager.allocate_overflow_page());
+                        new_overflow_page.pin(); // Pin the new overflow page so the cache won't evict it; we need it to stay in memory for the next iteration of FillCellPayloadState::CopyData.
+                        if let Some(prev_page) = current_overflow_page {
+                            prev_page.unpin(); // We can safely unpin the previous overflow page now.
+                        }

-                    let left = to_copy_buffer.len() - to_copy;
-                    if left == 0 {
-                        break;
+                        turso_assert!(
+                            new_overflow_page.is_loaded(),
+                            "new overflow page is not loaded"
+                        );
+                        let new_overflow_page_id = new_overflow_page.get().id as u32;
+
+                        if let Some(prev_page) = current_overflow_page {
+                            // Update the previous overflow page's "next overflow page" pointer to point to the new overflow page.
+                            turso_assert!(
+                                prev_page.is_loaded(),
+                                "previous overflow page is not loaded"
+                            );
+                            let contents = prev_page.get_contents();
+                            let buf = &mut contents.as_ptr()[..overflow_page_pointer_size];
+                            buf.copy_from_slice(&new_overflow_page_id.to_be_bytes());
+                        } else {
+                            // Update the cell payload's "next overflow page" pointer to point to the new overflow page.
+                            let first_overflow_page_ptr_offset =
+                                cell_payload.len() - overflow_page_pointer_size;
+                            let buf = &mut cell_payload[first_overflow_page_ptr_offset
+                                ..first_overflow_page_ptr_offset + overflow_page_pointer_size];
+                            buf.copy_from_slice(&new_overflow_page_id.to_be_bytes());
+                        }
+
+                        *dst_data_offset = overflow_page_pointer_size;
+                        *space_left_on_cur_page = overflow_page_data_size;
+                        *current_overflow_page = Some(new_overflow_page.clone());
+                        *state = CopyDataState::Copy;
+                    }
                 }
-
-                // we still have bytes to add, we will need to allocate new overflow page
-                // FIXME: handle page cache is full
-                let overflow_page = return_if_io!(pager.allocate_overflow_page());
-                turso_assert!(overflow_page.is_loaded(), "overflow page is not loaded");
-                {
-                    let id = overflow_page.get().id as u32;
-                    let contents = overflow_page.get_contents();
-
-                    // TODO: take into account offset here?
-                    let buf = contents.as_ptr();
-                    let as_bytes = id.to_be_bytes();
-                    // update pointer to new overflow page
-                    // SAFETY: we know 'pointer_to_next' is valid because it refers to an offset in cell_payload which is less than space_left + 4,
-                    // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
-                    unsafe { std::ptr::copy(as_bytes.as_ptr(), *pointer_to_next, 4) };
-
-                    *pointer = unsafe { buf.as_mut_ptr().add(4) };
-                    *pointer_to_next = buf.as_mut_ptr();
-                    *space_left = usable_space - 4;
-                }
-
-                *to_copy_buffer_len -= to_copy;
-                // SAFETY: we know 'to_copy_buffer_ptr' is valid because it refers to record_buf which lives at least as long as this function,
-                // and that the offset is less than its length, and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
-                *to_copy_buffer_ptr = unsafe { to_copy_buffer_ptr.add(to_copy) };
             }
         }
     }
+    page.unpin();
     Ok(IOResult::Done(()))
 }

-
 /// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages.
 ///
 /// For table leaf pages: X = usable_size - 35

From b4ee40dd3d048612aa823e738ba2b19e2964b61d Mon Sep 17 00:00:00 2001
From: Jussi Saurio
Date: Sat, 23 Aug 2025 00:25:50 +0300
Subject: [PATCH 2/5] fix tests

---
 core/storage/btree.rs | 249 +++++++++++++++++++++---------------------
 1 file changed, 125 insertions(+), 124 deletions(-)

diff --git a/core/storage/btree.rs b/core/storage/btree.rs
index ab263e406..962e5d51a 100644
--- a/core/storage/btree.rs
+++ b/core/storage/btree.rs
@@ -7324,7 +7324,7 @@ mod tests {
     fn add_record(
         id: usize,
         pos: usize,
-        page: &mut PageContent,
+        page: PageRef,
         record: ImmutableRecord,
         conn: &Arc<Connection>,
     ) -> Vec<u8> {
@@ -7333,7 +7333,7 @@ mod tests {
         run_until_done(
             || {
                 fill_cell_payload(
-                    page,
+                    page.clone(),
                     Some(id as i64),
                     &mut payload,
                     pos,
@@ -7346,7 +7346,7 @@ mod tests {
             &conn.pager.borrow().clone(),
         )
         .unwrap();
-        insert_into_cell(page, &payload, pos, 4096).unwrap();
+        insert_into_cell(page.get_contents(), &payload, pos, 4096).unwrap();

         payload
     }
@@ -7356,17 +7356,17 @@ mod tests {
         let conn = db.connect().unwrap();
         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
         let header_size = 8;

         let regs = &[Register::Value(Value::Integer(1))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let payload = add_record(1, 0, page, record, &conn);
-        assert_eq!(page.cell_count(), 1);
-        let free = compute_free_space(page, 4096);
+        let payload = add_record(1, 0, page.clone(), record, &conn);
+        let page_contents = page.get_contents();
+        assert_eq!(page_contents.cell_count(), 1);
+        let free = compute_free_space(page_contents, 4096);
         assert_eq!(free, 4096 - payload.len() - 2 - header_size);

         let cell_idx = 0;
-        ensure_cell(page, cell_idx, &payload);
+        ensure_cell(page_contents, cell_idx, &payload);
     }

     struct Cell {
@@ -7381,7 +7381,7 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let header_size = 8;

         let mut total_size = 0;
@@ -7390,22 +7390,22 @@ mod tests {
         for i in 0..3 {
             let regs = &[Register::Value(Value::Integer(i as i64))];
             let record = ImmutableRecord::from_registers(regs, regs.len());
-            let payload = add_record(i, i, page, record, &conn);
-            assert_eq!(page.cell_count(), i + 1);
-            let free = compute_free_space(page, usable_space);
+            let payload = add_record(i, i, page.clone(), record, &conn);
+            assert_eq!(page_contents.cell_count(), i + 1);
+            let free = compute_free_space(page_contents, usable_space);
             total_size += payload.len() + 2;
             assert_eq!(free, 4096 - total_size - header_size);
             cells.push(Cell { pos: i, payload });
         }

         for (i, cell) in cells.iter().enumerate() {
-            ensure_cell(page, i, &cell.payload);
+            ensure_cell(page_contents, i, &cell.payload);
         }

         cells.remove(1);
-        drop_cell(page, 1, usable_space).unwrap();
+        drop_cell(page_contents, 1, usable_space).unwrap();

         for (i, cell) in cells.iter().enumerate() {
-            ensure_cell(page, i, &cell.payload);
+            ensure_cell(page_contents, i, &cell.payload);
         }
     }

@@ -8360,7 +8360,7 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let header_size = 8;

         let mut total_size = 0;
@@ -8370,9 +8370,9 @@ mod tests {
         for i in 0..total_cells {
             let regs = &[Register::Value(Value::Integer(i as i64))];
             let record = ImmutableRecord::from_registers(regs, regs.len());
-            let payload = add_record(i, i, page, record, &conn);
-            assert_eq!(page.cell_count(), i + 1);
-            let free = compute_free_space(page, usable_space);
+            let payload = add_record(i, i, page.clone(), record, &conn);
+            assert_eq!(page_contents.cell_count(), i + 1);
+            let free = compute_free_space(page_contents, usable_space);
             total_size += payload.len() + 2;
             assert_eq!(free, 4096 - total_size - header_size);
             cells.push(Cell { pos: i, payload });
@@ -8382,7 +8382,7 @@ mod tests {
         let mut new_cells = Vec::new();
         for cell in cells {
             if cell.pos % 2 == 1 {
-                drop_cell(page, cell.pos - removed, usable_space).unwrap();
+                drop_cell(page_contents, cell.pos - removed, usable_space).unwrap();
                 removed += 1;
             } else {
                 new_cells.push(cell);
@@ -8390,11 +8390,11 @@ mod tests {
         }
         let cells = new_cells;
         for (i, cell) in cells.iter().enumerate() {
-            ensure_cell(page, i, &cell.payload);
+            ensure_cell(page_contents, i, &cell.payload);
         }

         for (i, cell) in cells.iter().enumerate() {
-            ensure_cell(page, i, &cell.payload);
+            ensure_cell(page_contents, i, &cell.payload);
         }
     }

@@ -8812,7 +8812,7 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let header_size = 8;

         let mut total_size = 0;
@@ -8821,28 +8821,28 @@ mod tests {
         for i in 0..3 {
             let regs = &[Register::Value(Value::Integer(i as i64))];
             let record = ImmutableRecord::from_registers(regs, regs.len());
-            let payload = add_record(i, i, page, record, &conn);
-            assert_eq!(page.cell_count(), i + 1);
-            let free = compute_free_space(page, usable_space);
+            let payload = add_record(i, i, page.clone(), record, &conn);
+            assert_eq!(page_contents.cell_count(), i + 1);
+            let free = compute_free_space(page_contents, usable_space);
             total_size += payload.len() + 2;
             assert_eq!(free, 4096 - total_size - header_size);
             cells.push(Cell { pos: i, payload });
         }

         for (i, cell) in cells.iter().enumerate() {
-            ensure_cell(page, i, &cell.payload);
+            ensure_cell(page_contents, i, &cell.payload);
         }

         cells.remove(1);
-        drop_cell(page, 1, usable_space).unwrap();
+        drop_cell(page_contents, 1, usable_space).unwrap();

         for (i, cell) in cells.iter().enumerate() {
-            ensure_cell(page, i, &cell.payload);
+            ensure_cell(page_contents, i, &cell.payload);
         }

-        defragment_page(page, usable_space, 4).unwrap();
+        defragment_page(page_contents, usable_space, 4).unwrap();

         for (i, cell) in cells.iter().enumerate() {
-            ensure_cell(page, i, &cell.payload);
+            ensure_cell(page_contents, i, &cell.payload);
         }
     }

@@ -8853,7 +8853,7 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let header_size = 8;

         let mut total_size = 0;
@@ -8863,9 +8863,9 @@ mod tests {
         for i in 0..total_cells {
             let regs = &[Register::Value(Value::Integer(i as i64))];
             let record = ImmutableRecord::from_registers(regs, regs.len());
-            let payload = add_record(i, i, page, record, &conn);
-            assert_eq!(page.cell_count(), i + 1);
-            let free = compute_free_space(page, usable_space);
+            let payload = add_record(i, i, page.clone(), record, &conn);
+            assert_eq!(page_contents.cell_count(), i + 1);
+            let free = compute_free_space(page_contents, usable_space);
             total_size += payload.len() + 2;
             assert_eq!(free, 4096 - total_size - header_size);
             cells.push(Cell { pos: i, payload });
@@ -8875,7 +8875,7 @@ mod tests {
         let mut new_cells = Vec::new();
         for cell in cells {
             if cell.pos % 2 == 1 {
-                drop_cell(page, cell.pos - removed, usable_space).unwrap();
+                drop_cell(page_contents, cell.pos - removed, usable_space).unwrap();
                 removed += 1;
             } else {
                 new_cells.push(cell);
@@ -8883,13 +8883,13 @@ mod tests {
         }
         let cells = new_cells;
         for (i, cell) in cells.iter().enumerate() {
-            ensure_cell(page, i, &cell.payload);
+            ensure_cell(page_contents, i, &cell.payload);
         }

-        defragment_page(page, usable_space, 4).unwrap();
+        defragment_page(page_contents, usable_space, 4).unwrap();

         for (i, cell) in cells.iter().enumerate() {
-            ensure_cell(page, i, &cell.payload);
+            ensure_cell(page_contents, i, &cell.payload);
         }
     }

@@ -8900,7 +8900,7 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let header_size = 8;

         let mut total_size = 0;
@@ -8915,8 +8915,8 @@ mod tests {
             match rng.next_u64() % 4 {
                 0 => {
                     // allow appends with extra place to insert
-                    let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1);
-                    let free = compute_free_space(page, usable_space);
+                    let cell_idx = rng.next_u64() as usize % (page_contents.cell_count() + 1);
+                    let free = compute_free_space(page_contents, usable_space);
                     let regs = &[Register::Value(Value::Integer(i as i64))];
                     let record = ImmutableRecord::from_registers(regs, regs.len());
                     let mut payload: Vec<u8> = Vec::new();
                     run_until_done(
                         || {
                             fill_cell_payload(
-                                page,
+                                page.clone(),
                                 Some(i as i64),
                                 &mut payload,
                                 cell_idx,
@@ -8941,34 +8941,34 @@ mod tests {
                         // do not try to insert overflow pages because they require balancing
                         continue;
                     }
-                    insert_into_cell(page, &payload, cell_idx, 4096).unwrap();
-                    assert!(page.overflow_cells.is_empty());
+                    insert_into_cell(page_contents, &payload, cell_idx, 4096).unwrap();
+                    assert!(page_contents.overflow_cells.is_empty());
                     total_size += payload.len() + 2;
                     cells.insert(cell_idx, Cell { pos: i, payload });
                 }
                 1 => {
-                    if page.cell_count() == 0 {
+                    if page_contents.cell_count() == 0 {
                         continue;
                     }
-                    let cell_idx = rng.next_u64() as usize % page.cell_count();
-                    let (_, len) = page.cell_get_raw_region(cell_idx, usable_space);
-                    drop_cell(page, cell_idx, usable_space).unwrap();
+                    let cell_idx = rng.next_u64() as usize % page_contents.cell_count();
+                    let (_, len) = page_contents.cell_get_raw_region(cell_idx, usable_space);
+                    drop_cell(page_contents, cell_idx, usable_space).unwrap();
                     total_size -= len + 2;
                     cells.remove(cell_idx);
                 }
                 2 => {
-                    defragment_page(page, usable_space, 4).unwrap();
+                    defragment_page(page_contents, usable_space, 4).unwrap();
                 }
                 3 => {
                     // check cells
                     for (i, cell) in cells.iter().enumerate() {
-                        ensure_cell(page, i, &cell.payload);
+                        ensure_cell(page_contents, i, &cell.payload);
                     }
-                    assert_eq!(page.cell_count(), cells.len());
+                    assert_eq!(page_contents.cell_count(), cells.len());
                 }
                 _ => unreachable!(),
             }
-            let free = compute_free_space(page, usable_space);
+            let free = compute_free_space(page_contents, usable_space);
             assert_eq!(free, 4096 - total_size - header_size);
         }
     }
@@ -8982,7 +8982,7 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let header_size = 8;

         let mut total_size = 0;
@@ -8997,8 +8997,8 @@ mod tests {
             match rng.next_u64() % 3 {
                 0 => {
                     // allow appends with extra place to insert
-                    let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1);
-                    let free = compute_free_space(page, usable_space);
+                    let cell_idx = rng.next_u64() as usize % (page_contents.cell_count() + 1);
+                    let free = compute_free_space(page_contents, usable_space);
                     let regs = &[Register::Value(Value::Integer(i))];
                     let record = ImmutableRecord::from_registers(regs, regs.len());
                     let mut payload: Vec<u8> = Vec::new();
                     run_until_done(
                         || {
                             fill_cell_payload(
-                                page,
+                                page.clone(),
                                 Some(i),
                                 &mut payload,
                                 cell_idx,
@@ -9023,8 +9023,8 @@ mod tests {
                         // do not try to insert overflow pages because they require balancing
                         continue;
                     }
-                    insert_into_cell(page, &payload, cell_idx, 4096).unwrap();
-                    assert!(page.overflow_cells.is_empty());
+                    insert_into_cell(page_contents, &payload, cell_idx, 4096).unwrap();
+                    assert!(page_contents.overflow_cells.is_empty());
                     total_size += payload.len() + 2;
                     cells.push(Cell {
                         pos: i as usize,
@@ -9032,21 +9032,21 @@ mod tests {
                     });
                 }
                 1 => {
-                    if page.cell_count() == 0 {
+                    if page_contents.cell_count() == 0 {
                         continue;
                     }
-                    let cell_idx = rng.next_u64() as usize % page.cell_count();
-                    let (_, len) = page.cell_get_raw_region(cell_idx, usable_space);
-                    drop_cell(page, cell_idx, usable_space).unwrap();
+                    let cell_idx = rng.next_u64() as usize % page_contents.cell_count();
+                    let (_, len) = page_contents.cell_get_raw_region(cell_idx, usable_space);
+                    drop_cell(page_contents, cell_idx, usable_space).unwrap();
                     total_size -= len + 2;
                     cells.remove(cell_idx);
                 }
                 2 => {
-                    defragment_page(page, usable_space, 4).unwrap();
+                    defragment_page(page_contents, usable_space, 4).unwrap();
                 }
                 _ => unreachable!(),
             }
-            let free = compute_free_space(page, usable_space);
+            let free = compute_free_space(page_contents, usable_space);
             assert_eq!(free, 4096 - total_size - header_size);
         }
     }
@@ -9158,14 +9158,14 @@ mod tests {
         let conn = db.connect().unwrap();
         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let header_size = 8;
         let usable_space = 4096;

         let regs = &[Register::Value(Value::Integer(0))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let payload = add_record(0, 0, page, record, &conn);
-        let free = compute_free_space(page, usable_space);
+        let payload = add_record(0, 0, page.clone(), record, &conn);
+        let free = compute_free_space(page_contents, usable_space);
         assert_eq!(free, 4096 - payload.len() - 2 - header_size);
     }

@@ -9176,18 +9176,18 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let usable_space = 4096;

         let regs = &[Register::Value(Value::Integer(0))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let payload = add_record(0, 0, page, record, &conn);
+        let payload = add_record(0, 0, page.clone(), record, &conn);

-        assert_eq!(page.cell_count(), 1);
-        defragment_page(page, usable_space, 4).unwrap();
-        assert_eq!(page.cell_count(), 1);
-        let (start, len) = page.cell_get_raw_region(0, usable_space);
-        let buf = page.as_ptr();
+        assert_eq!(page_contents.cell_count(), 1);
+        defragment_page(page_contents, usable_space, 4).unwrap();
+        assert_eq!(page_contents.cell_count(), 1);
+        let (start, len) = page_contents.cell_get_raw_region(0, usable_space);
+        let buf = page_contents.as_ptr();
         assert_eq!(&payload, &buf[start..start + len]);
     }

@@ -9198,7 +9198,7 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let usable_space = 4096;

         let regs = &[
             Register::Value(Value::Integer(0)),
             Register::Value(Value::Text(Text::new("aaaaaaaa"))),
         ];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let _ = add_record(0, 0, page, record, &conn);
+        let _ = add_record(0, 0, page.clone(), record, &conn);

-        assert_eq!(page.cell_count(), 1);
-        drop_cell(page, 0, usable_space).unwrap();
-        assert_eq!(page.cell_count(), 0);
+        assert_eq!(page_contents.cell_count(), 1);
+        drop_cell(page_contents, 0, usable_space).unwrap();
+        assert_eq!(page_contents.cell_count(), 0);

         let regs = &[Register::Value(Value::Integer(0))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let payload = add_record(0, 0, page, record, &conn);
-        assert_eq!(page.cell_count(), 1);
+        let payload = add_record(0, 0, page.clone(), record, &conn);
+        assert_eq!(page_contents.cell_count(), 1);

-        let (start, len) = page.cell_get_raw_region(0, usable_space);
-        let buf = page.as_ptr();
+        let (start, len) = page_contents.cell_get_raw_region(0, usable_space);
+        let buf = page_contents.as_ptr();
         assert_eq!(&payload, &buf[start..start + len]);
     }

@@ -9229,7 +9229,7 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let usable_space = 4096;

         let regs = &[
             Register::Value(Value::Integer(0)),
             Register::Value(Value::Text(Text::new("aaaaaaaa"))),
         ];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let _ = add_record(0, 0, page, record, &conn);
+        let _ = add_record(0, 0, page.clone(), record, &conn);

         for _ in 0..100 {
-            assert_eq!(page.cell_count(), 1);
-            drop_cell(page, 0, usable_space).unwrap();
-            assert_eq!(page.cell_count(), 0);
+            assert_eq!(page_contents.cell_count(), 1);
+            drop_cell(page_contents, 0, usable_space).unwrap();
+            assert_eq!(page_contents.cell_count(), 0);

             let regs = &[Register::Value(Value::Integer(0))];
             let record = ImmutableRecord::from_registers(regs, regs.len());
-            let payload = add_record(0, 0, page, record, &conn);
-            assert_eq!(page.cell_count(), 1);
+            let payload = add_record(0, 0, page.clone(), record, &conn);
+            assert_eq!(page_contents.cell_count(), 1);

-            let (start, len) = page.cell_get_raw_region(0, usable_space);
-            let buf = page.as_ptr();
+            let (start, len) = page_contents.cell_get_raw_region(0, usable_space);
+            let buf = page_contents.as_ptr();
             assert_eq!(&payload, &buf[start..start + len]);
         }
     }

@@ -9262,23 +9262,23 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let usable_space = 4096;

         let regs = &[Register::Value(Value::Integer(0))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let payload = add_record(0, 0, page, record, &conn);
+        let payload = add_record(0, 0, page.clone(), record, &conn);
         let regs = &[Register::Value(Value::Integer(1))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let _ = add_record(1, 1, page, record, &conn);
+        let _ = add_record(1, 1, page.clone(), record, &conn);
         let regs = &[Register::Value(Value::Integer(2))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let _ = add_record(2, 2, page, record, &conn);
+        let _ = add_record(2, 2, page.clone(), record, &conn);
-        drop_cell(page, 1, usable_space).unwrap();
-        drop_cell(page, 1, usable_space).unwrap();
+        drop_cell(page_contents, 1, usable_space).unwrap();
+        drop_cell(page_contents, 1, usable_space).unwrap();

-        ensure_cell(page, 0, &payload);
+        ensure_cell(page_contents, 0, &payload);
     }

     #[test]
@@ -9288,29 +9288,29 @@ mod tests {

         let page = get_page(2);
         let page = page.get();
-        let page = page.get_contents();
+        let page_contents = page.get_contents();
         let usable_space = 4096;

         let regs = &[Register::Value(Value::Integer(0))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let _ = add_record(0, 0, page, record, &conn);
+        let _ = add_record(0, 0, page.clone(), record, &conn);

         let regs = &[Register::Value(Value::Integer(0))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let _ = add_record(0, 0, page, record, &conn);
-        drop_cell(page, 0, usable_space).unwrap();
+        let _ = add_record(0, 0, page.clone(), record, &conn);
+        drop_cell(page_contents, 0, usable_space).unwrap();

-        defragment_page(page, usable_space, 4).unwrap();
+        defragment_page(page_contents, usable_space, 4).unwrap();

         let regs = &[Register::Value(Value::Integer(0))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let _ = add_record(0, 1, page, record, &conn);
+        let _ = add_record(0, 1, page.clone(), record, &conn);

-        drop_cell(page, 0, usable_space).unwrap();
+        drop_cell(page_contents, 0, usable_space).unwrap();

         let regs = &[Register::Value(Value::Integer(0))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
-        let _ = add_record(0, 1, page, record, &conn);
+        let _ = add_record(0, 1, page.clone(), record, &conn);
     }

     #[test]
@@ -9334,21 +9334,21 @@ mod tests {
         let page = page.get();
         defragment(page.get_contents());
         defragment(page.get_contents());
-        insert(0, page.get_contents());
+        insert(0, page.clone());
         drop(0, page.get_contents());
-        insert(0, page.get_contents());
+        insert(0, page.clone());
         drop(0, page.get_contents());
-        insert(0, page.get_contents());
+        insert(0, page.clone());
         defragment(page.get_contents());
         defragment(page.get_contents());
         drop(0, page.get_contents());
         defragment(page.get_contents());
-        insert(0, page.get_contents());
+        insert(0, page.clone());
         drop(0, page.get_contents());
-        insert(0, page.get_contents());
-        insert(1, page.get_contents());
-        insert(1, page.get_contents());
-        insert(0, page.get_contents());
+        insert(0, page.clone());
+        insert(1, page.clone());
+        insert(1, page.clone());
+        insert(0, page.clone());
         drop(3, page.get_contents());
         drop(2, page.get_contents());
         compute_free_space(page.get_contents(), usable_space);
@@ -9379,7 +9379,7 @@ mod tests {
         run_until_done(
             || {
                 fill_cell_payload(
-                    page.get().get_contents(),
+                    page.get(),
                     Some(0),
                     &mut payload,
                     0,
@@ -9393,11 +9393,11 @@ mod tests {
         )
         .unwrap();
         let page = page.get();
-        insert(0, page.get_contents());
+        insert(0, page.clone());
         defragment(page.get_contents());
-        insert(0, page.get_contents());
+        insert(0, page.clone());
         defragment(page.get_contents());
-        insert(0, page.get_contents());
+        insert(0, page.clone());
         drop(2, page.get_contents());
         drop(0, page.get_contents());
         let free = compute_free_space(page.get_contents(), usable_space);
@@ -9465,7 +9465,7 @@ mod tests {
         run_until_done(
             || {
                 fill_cell_payload(
-                    page.get().get_contents(),
+                    page.get(),
                     Some(0),
                     &mut payload,
                     0,
@@ -9817,7 +9817,7 @@ mod tests {
             while compute_free_space(page.get_contents(), pager.usable_space())
                 >= size as usize + 10
             {
-                insert_cell(i, size, page.get_contents(), pager.clone());
+                insert_cell(i, size, page.clone(), pager.clone());
                 i += 1;
                 size = (rng.next_u64() % 1024) as u16;
             }
@@ -9868,15 +9868,16 @@
         }
     }

-    fn insert_cell(cell_idx: u64, size: u16, contents: &mut PageContent, pager: Rc<Pager>) {
+    fn insert_cell(cell_idx: u64, size: u16, page: PageRef, pager: Rc<Pager>) {
         let mut payload = Vec::new();
         let regs = &[Register::Value(Value::Blob(vec![0; size as usize]))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
         let mut fill_cell_payload_state = FillCellPayloadState::Start;
+        let contents = page.get_contents();
         run_until_done(
             || {
                 fill_cell_payload(
-                    contents,
+                    page.clone(),
                     Some(cell_idx as i64),
                     &mut payload,
                     cell_idx as usize,

From 42c8a77bb7d3a35c7889f3ce0d6d7043c6458a08 Mon Sep 17 00:00:00 2001
From: Jussi Saurio
Date: Mon, 25 Aug 2025 15:03:10 +0300
Subject: [PATCH 3/5] use existing payload_overflows() utility in local space
 calculation
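
For reference, the split that payload_overflows() computes is the same one
the removed code computed by hand. A worked example (hypothetical numbers:
a table leaf page with usable_space = 4096, so max_local = 4096 - 35 = 4061
and, per SQLite's formula, min_local = (4096 - 12) * 32 / 255 - 23 = 489):

    fn local_size(record_len: usize, usable_space: usize,
                  min_local: usize, max_local: usize) -> usize {
        let overflow_page_data_size = usable_space - 4; // minus next-page pointer
        let local = min_local + (record_len - min_local) % overflow_page_data_size;
        if local > max_local { min_local } else { local }
    }

    // For a 10_000-byte record: 489 + (10_000 - 489) % 4092 = 1816 bytes stay
    // in the cell, and the remaining 8_184 bytes fill exactly two overflow
    // pages of 4_092 data bytes each.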
---
 core/storage/btree.rs | 52 +++++++++++++------------------------------
 1 file changed, 16 insertions(+), 36 deletions(-)

diff --git a/core/storage/btree.rs b/core/storage/btree.rs
index 962e5d51a..b919e7c0e 100644
--- a/core/storage/btree.rs
+++ b/core/storage/btree.rs
@@ -6,9 +6,10 @@ use crate::{
     storage::{
         pager::{BtreePageAllocMode, Pager},
         sqlite3_ondisk::{
-            read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent, PageSize, PageType,
-            TableInteriorCell, TableLeafCell, CELL_PTR_SIZE_BYTES, INTERIOR_PAGE_HEADER_SIZE_BYTES,
-            LEAF_PAGE_HEADER_SIZE_BYTES, LEFT_CHILD_PTR_SIZE_BYTES,
+            payload_overflows, read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent,
+            PageSize, PageType, TableInteriorCell, TableLeafCell, CELL_PTR_SIZE_BYTES,
+            INTERIOR_PAGE_HEADER_SIZE_BYTES, LEAF_PAGE_HEADER_SIZE_BYTES,
+            LEFT_CHILD_PTR_SIZE_BYTES,
         },
         state_machines::{
             AdvanceState, CountState, EmptyTableState, MoveToRightState, MoveToState, RewindState,
@@ -7040,48 +7041,27 @@ fn fill_cell_payload(
                     write_varint_to_vec(record_buf.len() as u64, cell_payload);
                 }

-                let payload_overflow_threshold_max =
-                    payload_overflow_threshold_max(page_type, usable_space);
-                tracing::debug!(
-                    "fill_cell_payload(record_size={}, payload_overflow_threshold_max={})",
-                    record_buf.len(),
-                    payload_overflow_threshold_max
-                );
-                if record_buf.len() <= payload_overflow_threshold_max {
+                let max_local = payload_overflow_threshold_max(page_type, usable_space);
+                let min_local = payload_overflow_threshold_min(page_type, usable_space);
+
+                let (overflows, local_size_if_overflow) =
+                    payload_overflows(record_buf.len(), max_local, min_local, usable_space);
+                if !overflows {
                     // enough allowed space to fit inside a btree page
                     cell_payload.extend_from_slice(record_buf.as_ref());
                     break;
                 }

-                let payload_overflow_threshold_min =
-                    payload_overflow_threshold_min(page_type, usable_space);
-                // see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371
-                let mut space_left = payload_overflow_threshold_min
-                    + (record_buf.len() - payload_overflow_threshold_min) % overflow_page_data_size;
-
-                if space_left > payload_overflow_threshold_max {
-                    space_left = payload_overflow_threshold_min;
-                }
-
-                // cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page.
-                let cell_size = space_left + cell_payload.len() + overflow_page_pointer_size;
-
-                let prev_size = cell_payload.len();
-                let new_data_size = prev_size + space_left;
-                cell_payload.resize(new_data_size + overflow_page_pointer_size, 0);
-                assert_eq!(
-                    cell_size,
-                    cell_payload.len(),
-                    "cell_size={} != cell_payload.len()={}",
-                    cell_size,
-                    cell_payload.len()
-                );
+                // so far we've written any of: left child page, rowid, payload size (depending on page type)
+                let cell_non_payload_elems_size = cell_payload.len();
+                let new_total_local_size = cell_non_payload_elems_size + local_size_if_overflow;
+                cell_payload.resize(new_total_local_size, 0);

                 *fill_cell_payload_state = FillCellPayloadState::CopyData {
                     state: CopyDataState::Copy,
-                    space_left_on_cur_page: space_left,
+                    space_left_on_cur_page: local_size_if_overflow - overflow_page_pointer_size, // local_size_if_overflow includes the overflow page pointer, but we don't want to write payload data there.
                     src_data_offset: 0,
-                    dst_data_offset: prev_size,
+                    dst_data_offset: cell_non_payload_elems_size,
                     current_overflow_page: None,
                 };
                 continue;

From c6553d82b84ec7cd29a31329923a64a83e10f204 Mon Sep 17 00:00:00 2001
From: Jussi Saurio
Date: Mon, 25 Aug 2025 15:05:04 +0300
Subject: [PATCH 4/5] Clarify expected behavior with assertion

---
 core/storage/btree.rs | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/storage/btree.rs b/core/storage/btree.rs
index b919e7c0e..30530d691 100644
--- a/core/storage/btree.rs
+++ b/core/storage/btree.rs
@@ -7099,10 +7099,9 @@ fn fill_cell_payload(
                         }

                         if record_offset_slice.len() - amount_to_copy == 0 {
-                            if let Some(cur_page) = current_overflow_page {
-                                cur_page.unpin(); // We can safely unpin the current overflow page now.
-                            }
-                            // Everything copied.
+                            let cur_page = current_overflow_page.as_ref().expect("we must have overflowed if the remaining payload fits on the current page");
+                            cur_page.unpin(); // We can safely unpin the current overflow page now.
+                            // Everything copied.
                             break;
                         }
                         *state = CopyDataState::AllocateOverflowPage;

From 16b1ae4a9f40b7b650c35a260f5f2044043ff061 Mon Sep 17 00:00:00 2001
From: Jussi Saurio
Date: Mon, 25 Aug 2025 15:12:37 +0300
Subject: [PATCH 5/5] Handle unpinning btree page in case of allocate overflow
 page error
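
The manual unpin calls on the error path are easy to get wrong; a possible
follow-up (sketch only, assuming PageRef keeps exposing pin()/unpin() as
used in this file) would be an RAII guard stored in FillCellPayloadState,
so that every exit path, including errors, unpins automatically:

    /// Hypothetical RAII pin guard: pins on construction, unpins on drop.
    struct PinGuard {
        page: PageRef,
    }

    impl PinGuard {
        fn new(page: PageRef) -> Self {
            page.pin();
            Self { page }
        }
    }

    impl Drop for PinGuard {
        fn drop(&mut self) {
            self.page.unpin();
        }
    }

Because the guard would live inside the state machine (e.g. replacing
current_overflow_page with Option<PinGuard>), the pin would still survive
across IO yields, and dropping the state on error would release it.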
---
 core/storage/btree.rs | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/core/storage/btree.rs b/core/storage/btree.rs
index 30530d691..d3e9d50d3 100644
--- a/core/storage/btree.rs
+++ b/core/storage/btree.rs
@@ -7017,7 +7017,7 @@ fn fill_cell_payload(
 ) -> Result<IOResult<()>> {
     let overflow_page_pointer_size = 4;
     let overflow_page_data_size = usable_space - overflow_page_pointer_size;
-    loop {
+    let result = loop {
         let record_buf = record.get_payload();
         match fill_cell_payload_state {
             FillCellPayloadState::Start => {
@@ -7049,7 +7049,7 @@ fn fill_cell_payload(
                 if !overflows {
                     // enough allowed space to fit inside a btree page
                     cell_payload.extend_from_slice(record_buf.as_ref());
-                    break;
+                    break Ok(IOResult::Done(()));
                 }

@@ -7102,13 +7102,22 @@ fn fill_cell_payload(
                             let cur_page = current_overflow_page.as_ref().expect("we must have overflowed if the remaining payload fits on the current page");
                             cur_page.unpin(); // We can safely unpin the current overflow page now.
                             // Everything copied.
-                            break;
+                            break Ok(IOResult::Done(()));
                         }
                         *state = CopyDataState::AllocateOverflowPage;
                         *src_data_offset += amount_to_copy;
                     }
                     CopyDataState::AllocateOverflowPage => {
-                        let new_overflow_page = return_if_io!(pager.allocate_overflow_page());
+                        let new_overflow_page = match pager.allocate_overflow_page() {
+                            Ok(IOResult::Done(new_overflow_page)) => new_overflow_page,
+                            Ok(IOResult::IO(io_result)) => return Ok(IOResult::IO(io_result)),
+                            Err(e) => {
+                                if let Some(cur_page) = current_overflow_page {
+                                    cur_page.unpin();
+                                }
+                                break Err(e);
+                            }
+                        };
                         new_overflow_page.pin(); // Pin the new overflow page so the cache won't evict it; we need it to stay in memory for the next iteration of FillCellPayloadState::CopyData.
                         if let Some(prev_page) = current_overflow_page {
                             prev_page.unpin(); // We can safely unpin the previous overflow page now.
@@ -7146,9 +7155,9 @@ fn fill_cell_payload(
                 }
             }
         }
-    }
+    };
     page.unpin();
-    Ok(IOResult::Done(()))
+    result
 }

 /// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages.
 ///