Mirror of https://github.com/aljazceru/turso.git (synced 2026-01-08 18:54:21 +01:00)
Make fill_cell_payload() safe for async IO and cache spilling
Problems:

1. fill_cell_payload() is not re-entrant because it can yield IO on allocating a new overflow page, resulting in losing some of the input data.
2. fill_cell_payload() in its current form is not safe for cache spilling because the previous overflow page in the chain of allocated overflow pages can be evicted by a spill caused by the next overflow page allocation, invalidating the page pointer and causing corruption.
3. fill_cell_payload() uses raw pointers and `unsafe` as a workaround from a previous time when we used to clone `WriteState`, resulting in hard-to-read code.

Solutions:

1. Introduce a new substate to the fill_cell_payload state machine to handle re-entrancy wrt. allocating overflow pages.
2. Always pin the current overflow page so that it cannot be evicted during the overflow chain construction. Also pin the regular page the overflow chain is attached to, because it is accessed immediately after fill_cell_payload is done.
3. Remove all explicit usages of `unsafe` from `fill_cell_payload` (although our pager is ofc still extremely unsafe under the hood :] )

Note that solution 2 addresses a problem that arose in the development of page cache spilling, which is not yet implemented but will be soon.

Miscellanea:

1. Renamed a bunch of variables to be clearer.
2. Added more comments about what is happening in fill_cell_payload.
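For context, the fix leans on page pinning: a pinned page is skipped by cache eviction/spilling, so any reference held across an I/O yield stays valid until it is explicitly unpinned. Below is a minimal sketch of that idea, using simplified stand-in types rather than turso's actual pager API.

use std::sync::atomic::{AtomicU32, Ordering};

// Simplified stand-in for a cached page; turso's real Page/PageRef types differ.
struct Page {
    pin_count: AtomicU32,
    // ... contents, id, dirty flag, etc.
}

impl Page {
    fn pin(&self) {
        self.pin_count.fetch_add(1, Ordering::SeqCst);
    }
    fn unpin(&self) {
        let prev = self.pin_count.fetch_sub(1, Ordering::SeqCst);
        assert!(prev > 0, "unpin without a matching pin");
    }
    fn is_pinned(&self) -> bool {
        self.pin_count.load(Ordering::SeqCst) > 0
    }
}

// An eviction/spill pass must leave pinned pages alone, which is what makes it
// safe to hold on to an overflow page across the next allocation's I/O yield.
fn can_evict(page: &Page) -> bool {
    !page.is_pinned()
}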
@@ -2274,7 +2274,7 @@ impl BTreeCursor {
                ref mut fill_cell_payload_state,
            } => {
                return_if_io!(fill_cell_payload(
                    page.get().get().contents.as_ref().unwrap(),
                    page.get(),
                    bkey.maybe_rowid(),
                    new_payload,
                    *cell_idx,
@@ -5176,10 +5176,9 @@ impl BTreeCursor {
                fill_cell_payload_state,
            } => {
                let page = page_ref.get();
                let page_contents = page.get().contents.as_ref().unwrap();
                {
                    return_if_io!(fill_cell_payload(
                        page_contents,
                        page,
                        *rowid,
                        new_payload,
                        cell_idx,
@@ -6963,39 +6962,66 @@ fn allocate_cell_space(

#[derive(Debug, Clone)]
pub enum FillCellPayloadState {
    /// Determine whether we can fit the record on the current page.
    /// If yes, return immediately after copying the data.
    /// Otherwise move to [CopyData] state.
    Start,
    AllocateOverflowPages {
        /// Arc because we clone [WriteState] for some reason and we use unsafe pointer dereferences in [FillCellPayloadState::AllocateOverflowPages]
        /// so the underlying bytes must not be cloned in upper layers.
        record_buf: Arc<[u8]>,
        space_left: usize,
        to_copy_buffer_ptr: *const u8,
        to_copy_buffer_len: usize,
        pointer: *mut u8,
        pointer_to_next: *mut u8,
    /// Copy the next chunk of data from the record buffer to the cell payload.
    /// If we can't fit all of the remaining data on the current page,
    /// move the internal state to [CopyDataState::AllocateOverflowPage]
    CopyData {
        /// Internal state of the copy data operation.
        /// We can either be copying data or allocating an overflow page.
        state: CopyDataState,
        /// Track how much space we have left on the current page we are copying data into.
        /// This is reset whenever a new overflow page is allocated.
        space_left_on_cur_page: usize,
        /// Offset into the record buffer to copy from.
        src_data_offset: usize,
        /// Offset into the destination buffer we are copying data into.
        /// This is either:
        /// - an offset in the btree page where the cell is, or
        /// - an offset in an overflow page
        dst_data_offset: usize,
        /// If this is Some, we will copy data into this overflow page.
        /// If this is None, we will copy data into the cell payload on the btree page.
        /// Also: to safely form a chain of overflow pages, the current page must be pinned to the page cache
        /// so that e.g. a spilling operation does not evict it to disk.
        current_overflow_page: Option<PageRef>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum CopyDataState {
    /// Copy the next chunk of data from the record buffer to the cell payload.
    Copy,
    /// Allocate a new overflow page if we couldn't fit all data to the current page.
    AllocateOverflowPage,
}

/// Fill in the cell payload with the record.
/// If the record is too large to fit in the cell, it will spill onto overflow pages.
/// This function needs a separate [FillCellPayloadState] because allocating overflow pages
/// may require I/O.
#[allow(clippy::too_many_arguments)]
fn fill_cell_payload(
    page_contents: &PageContent,
    page: PageRef,
    int_key: Option<i64>,
    cell_payload: &mut Vec<u8>,
    cell_idx: usize,
    record: &ImmutableRecord,
    usable_space: usize,
    pager: Rc<Pager>,
    state: &mut FillCellPayloadState,
    fill_cell_payload_state: &mut FillCellPayloadState,
) -> Result<IOResult<()>> {
    let overflow_page_pointer_size = 4;
    let overflow_page_data_size = usable_space - overflow_page_pointer_size;
    loop {
        match state {
        let record_buf = record.get_payload();
        match fill_cell_payload_state {
            FillCellPayloadState::Start => {
                // TODO: make record raw from start, having to serialize is not good
                let record_buf: Arc<[u8]> = Arc::from(record.get_payload());
                page.pin(); // We need to pin this page because we will be accessing its contents after fill_cell_payload is done.
                let page_contents = page.get().contents.as_ref().unwrap();

                let page_type = page_contents.page_type();
                // fill in header
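To see why the extra substate makes the function re-entrant: whenever overflow-page allocation needs I/O, the function returns early, and because all progress lives in FillCellPayloadState, the caller can simply invoke it again and it resumes where it left off. A stripped-down sketch of that calling pattern follows; names and the chunk size are hypothetical, not the code in this diff.

// Hypothetical, simplified mirror of the two-level state machine above.
enum IOResult<T> {
    Done(T),
    IO, // the caller must run pending I/O and call step() again
}

enum CopyState {
    Copy,
    AllocateOverflowPage,
}

enum FillState {
    Start,
    CopyData { copy_state: CopyState, src_data_offset: usize },
}

fn step(state: &mut FillState, total_len: usize) -> IOResult<()> {
    loop {
        match state {
            FillState::Start => {
                *state = FillState::CopyData { copy_state: CopyState::Copy, src_data_offset: 0 };
            }
            FillState::CopyData { copy_state, src_data_offset } => match copy_state {
                CopyState::Copy => {
                    // Copy up to one (made-up) page worth of bytes.
                    let chunk = 512usize.min(total_len - *src_data_offset);
                    *src_data_offset += chunk;
                    if *src_data_offset == total_len {
                        return IOResult::Done(()); // everything copied
                    }
                    *copy_state = CopyState::AllocateOverflowPage;
                }
                CopyState::AllocateOverflowPage => {
                    // Allocating a page may need I/O: yield to the caller.
                    // Progress is already saved in `state`, so the next call
                    // resumes here instead of restarting from Start.
                    *copy_state = CopyState::Copy;
                    return IOResult::IO;
                }
            },
        }
    }
}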
@@ -7024,25 +7050,25 @@ fn fill_cell_payload(
                if record_buf.len() <= payload_overflow_threshold_max {
                    // enough allowed space to fit inside a btree page
                    cell_payload.extend_from_slice(record_buf.as_ref());
                    return Ok(IOResult::Done(()));
                    break;
                }

                let payload_overflow_threshold_min =
                    payload_overflow_threshold_min(page_type, usable_space);
                // see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371
                let mut space_left = payload_overflow_threshold_min
                    + (record_buf.len() - payload_overflow_threshold_min) % (usable_space - 4);
                    + (record_buf.len() - payload_overflow_threshold_min) % overflow_page_data_size;

                if space_left > payload_overflow_threshold_max {
                    space_left = payload_overflow_threshold_min;
                }

                // cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page.
                let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page
                let to_copy_buffer = record_buf.as_ref();
                let cell_size = space_left + cell_payload.len() + overflow_page_pointer_size;

                let prev_size = cell_payload.len();
                cell_payload.resize(prev_size + space_left + 4, 0);
                let new_data_size = prev_size + space_left;
                cell_payload.resize(new_data_size + overflow_page_pointer_size, 0);
                assert_eq!(
                    cell_size,
                    cell_payload.len(),
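A worked example of the local-payload computation above may help. The numbers assume a 4096-byte usable page size, for which the usual SQLite thresholds for a table leaf page are min = 489 and max = 4061, and a hypothetical 10,000-byte record.

// Sketch of the spill computation above with concrete numbers (hypothetical
// record size; threshold values are the usual ones for 4096-byte pages).
fn local_payload_size(payload_len: usize, min: usize, max: usize, usable_space: usize) -> usize {
    if payload_len <= max {
        return payload_len; // fits entirely in the b-tree cell, no overflow needed
    }
    let overflow_page_data_size = usable_space - 4; // 4 bytes go to the next-page pointer
    let k = min + (payload_len - min) % overflow_page_data_size;
    if k > max {
        min
    } else {
        k
    }
}

fn main() {
    // 1816 bytes stay in the cell; the remaining 8184 bytes fill exactly
    // two 4092-byte overflow pages, which is the point of the modulo.
    assert_eq!(local_payload_size(10_000, 489, 4061, 4096), 1816);
}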
@@ -7051,87 +7077,100 @@ fn fill_cell_payload(
                    cell_payload.len()
                );

                // SAFETY: this pointer is valid because it points to a buffer in an Arc<Mutex<Vec<u8>>> that lives at least as long as this function,
                // and the Vec will not be mutated in FillCellPayloadState::AllocateOverflowPages, which we will move to next.
                let pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) };
                let pointer_to_next =
                    unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) };

                let to_copy_buffer_ptr = to_copy_buffer.as_ptr();
                let to_copy_buffer_len = to_copy_buffer.len();

                *state = FillCellPayloadState::AllocateOverflowPages {
                    record_buf,
                    space_left,
                    to_copy_buffer_ptr,
                    to_copy_buffer_len,
                    pointer,
                    pointer_to_next,
                *fill_cell_payload_state = FillCellPayloadState::CopyData {
                    state: CopyDataState::Copy,
                    space_left_on_cur_page: space_left,
                    src_data_offset: 0,
                    dst_data_offset: prev_size,
                    current_overflow_page: None,
                };
                continue;
            }
            FillCellPayloadState::AllocateOverflowPages {
                record_buf: _record_buf,
                space_left,
                to_copy_buffer_ptr,
                to_copy_buffer_len,
                pointer,
                pointer_to_next,
            FillCellPayloadState::CopyData {
                state,
                src_data_offset,
                space_left_on_cur_page,
                dst_data_offset,
                current_overflow_page,
            } => {
                let to_copy;
                {
                    let to_copy_buffer_ptr = *to_copy_buffer_ptr;
                    let to_copy_buffer_len = *to_copy_buffer_len;
                    let pointer = *pointer;
                    let space_left = *space_left;
                match state {
                    CopyDataState::Copy => {
                        turso_assert!(*src_data_offset < record_buf.len(), "trying to read past end of record buffer: record_offset={} < record_buf.len()={}", src_data_offset, record_buf.len());
                        let record_offset_slice = &record_buf[*src_data_offset..];
                        let amount_to_copy =
                            (*space_left_on_cur_page).min(record_offset_slice.len());
                        let record_offset_slice_to_copy = &record_offset_slice[..amount_to_copy];
                        if let Some(cur_page) = current_overflow_page {
                            // Copy data into the current overflow page.
                            turso_assert!(
                                cur_page.is_loaded(),
                                "current overflow page is not loaded"
                            );
                            turso_assert!(*dst_data_offset == overflow_page_pointer_size, "data must be copied to offset {overflow_page_pointer_size} on overflow pages, instead tried to copy to offset {dst_data_offset}");
                            let contents = cur_page.get_contents();
                            let buf = &mut contents.as_ptr()
                                [*dst_data_offset..*dst_data_offset + amount_to_copy];
                            buf.copy_from_slice(record_offset_slice_to_copy);
                        } else {
                            // Copy data into the cell payload on the btree page.
                            let buf = &mut cell_payload
                                [*dst_data_offset..*dst_data_offset + amount_to_copy];
                            buf.copy_from_slice(record_offset_slice_to_copy);
                        }

                        // SAFETY: we know to_copy_buffer_ptr is valid because it refers to record_buf which lives at least as long as this function,
                        // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
                        let to_copy_buffer = unsafe {
                            std::slice::from_raw_parts(to_copy_buffer_ptr, to_copy_buffer_len)
                        };
                        to_copy = space_left.min(to_copy_buffer_len);
                        // SAFETY: we know 'pointer' is valid because it refers to cell_payload which lives at least as long as this function,
                        // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
                        unsafe { std::ptr::copy(to_copy_buffer_ptr, pointer, to_copy) };
                        if record_offset_slice.len() - amount_to_copy == 0 {
                            if let Some(cur_page) = current_overflow_page {
                                cur_page.unpin(); // We can safely unpin the current overflow page now.
                            }
                            // Everything copied.
                            break;
                        }
                        *state = CopyDataState::AllocateOverflowPage;
                        *src_data_offset += amount_to_copy;
                    }
                    CopyDataState::AllocateOverflowPage => {
                        let new_overflow_page = return_if_io!(pager.allocate_overflow_page());
                        new_overflow_page.pin(); // Pin the current overflow page so the cache won't evict it because we need this page to be in memory for the next iteration of FillCellPayloadState::CopyData.
                        if let Some(prev_page) = current_overflow_page {
                            prev_page.unpin(); // We can safely unpin the previous overflow page now.
                        }

                        let left = to_copy_buffer.len() - to_copy;
                        if left == 0 {
                            break;
                        turso_assert!(
                            new_overflow_page.is_loaded(),
                            "new overflow page is not loaded"
                        );
                        let new_overflow_page_id = new_overflow_page.get().id as u32;

                        if let Some(prev_page) = current_overflow_page {
                            // Update the previous overflow page's "next overflow page" pointer to point to the new overflow page.
                            turso_assert!(
                                prev_page.is_loaded(),
                                "previous overflow page is not loaded"
                            );
                            let contents = prev_page.get_contents();
                            let buf = &mut contents.as_ptr()[..overflow_page_pointer_size];
                            buf.copy_from_slice(&new_overflow_page_id.to_be_bytes());
                        } else {
                            // Update the cell payload's "next overflow page" pointer to point to the new overflow page.
                            let first_overflow_page_ptr_offset =
                                cell_payload.len() - overflow_page_pointer_size;
                            let buf = &mut cell_payload[first_overflow_page_ptr_offset
                                ..first_overflow_page_ptr_offset + overflow_page_pointer_size];
                            buf.copy_from_slice(&new_overflow_page_id.to_be_bytes());
                        }

                        *dst_data_offset = overflow_page_pointer_size;
                        *space_left_on_cur_page = overflow_page_data_size;
                        *current_overflow_page = Some(new_overflow_page.clone());
                        *state = CopyDataState::Copy;
                    }
                }

                // we still have bytes to add, we will need to allocate new overflow page
                // FIXME: handle page cache is full
                let overflow_page = return_if_io!(pager.allocate_overflow_page());
                turso_assert!(overflow_page.is_loaded(), "overflow page is not loaded");
                {
                    let id = overflow_page.get().id as u32;
                    let contents = overflow_page.get_contents();

                    // TODO: take into account offset here?
                    let buf = contents.as_ptr();
                    let as_bytes = id.to_be_bytes();
                    // update pointer to new overflow page
                    // SAFETY: we know 'pointer_to_next' is valid because it refers to an offset in cell_payload which is less than space_left + 4,
                    // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
                    unsafe { std::ptr::copy(as_bytes.as_ptr(), *pointer_to_next, 4) };

                    *pointer = unsafe { buf.as_mut_ptr().add(4) };
                    *pointer_to_next = buf.as_mut_ptr();
                    *space_left = usable_space - 4;
                }

                *to_copy_buffer_len -= to_copy;
                // SAFETY: we know 'to_copy_buffer_ptr' is valid because it refers to record_buf which lives at least as long as this function,
                // and that the offset is less than its length, and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages.
                *to_copy_buffer_ptr = unsafe { to_copy_buffer_ptr.add(to_copy) };
                }
            }
        }
    page.unpin();
    Ok(IOResult::Done(()))
}
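For reference, the chaining performed in the AllocateOverflowPage step boils down to writing the new page's number as a 4-byte big-endian pointer, either into the last 4 bytes of the cell payload (first link) or into the first 4 bytes of the previous overflow page (subsequent links). A standalone sketch with plain byte buffers standing in for page contents, not the actual Page/PageContent types:

// Plain-buffer sketch of the pointer updates above.
fn link_first_overflow(cell_payload: &mut [u8], new_page_id: u32) {
    // The last 4 bytes of a spilled cell hold the first overflow page number.
    let n = cell_payload.len();
    cell_payload[n - 4..].copy_from_slice(&new_page_id.to_be_bytes());
}

fn link_next_overflow(prev_overflow_page: &mut [u8], new_page_id: u32) {
    // The first 4 bytes of an overflow page hold the next overflow page number
    // (0 meaning "no next page"); payload data starts at offset 4.
    prev_overflow_page[..4].copy_from_slice(&new_page_id.to_be_bytes());
}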

/// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages.
///
/// For table leaf pages: X = usable_size - 35
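For example, applying the formula quoted above to the common 4096-byte usable size:

// X = usable_size - 35 for table leaf pages, so:
fn max_local_table_leaf(usable_size: usize) -> usize {
    usable_size - 35
}
// max_local_table_leaf(4096) == 4061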