diff --git a/core/storage/btree.rs b/core/storage/btree.rs index b2c36a560..3204bce3b 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -6,9 +6,10 @@ use crate::{ storage::{ pager::{BtreePageAllocMode, Pager}, sqlite3_ondisk::{ - read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent, PageSize, PageType, - TableInteriorCell, TableLeafCell, CELL_PTR_SIZE_BYTES, INTERIOR_PAGE_HEADER_SIZE_BYTES, - LEAF_PAGE_HEADER_SIZE_BYTES, LEFT_CHILD_PTR_SIZE_BYTES, + payload_overflows, read_u32, read_varint, BTreeCell, DatabaseHeader, PageContent, + PageSize, PageType, TableInteriorCell, TableLeafCell, CELL_PTR_SIZE_BYTES, + INTERIOR_PAGE_HEADER_SIZE_BYTES, LEAF_PAGE_HEADER_SIZE_BYTES, + LEFT_CHILD_PTR_SIZE_BYTES, }, state_machines::{ AdvanceState, CountState, EmptyTableState, MoveToRightState, MoveToState, RewindState, @@ -2273,7 +2274,7 @@ impl BTreeCursor { ref mut fill_cell_payload_state, } => { return_if_io!(fill_cell_payload( - page.get().get().contents.as_ref().unwrap(), + page.get(), bkey.maybe_rowid(), new_payload, *cell_idx, @@ -5175,10 +5176,9 @@ impl BTreeCursor { fill_cell_payload_state, } => { let page = page_ref.get(); - let page_contents = page.get().contents.as_ref().unwrap(); { return_if_io!(fill_cell_payload( - page_contents, + page, *rowid, new_payload, cell_idx, @@ -7117,39 +7117,66 @@ fn allocate_cell_space( #[derive(Debug, Clone)] pub enum FillCellPayloadState { + /// Determine whether we can fit the record on the current page. + /// If yes, return immediately after copying the data. + /// Otherwise move to [CopyData] state. Start, - AllocateOverflowPages { - /// Arc because we clone [WriteState] for some reason and we use unsafe pointer dereferences in [FillCellPayloadState::AllocateOverflowPages] - /// so the underlying bytes must not be cloned in upper layers. - record_buf: Arc<[u8]>, - space_left: usize, - to_copy_buffer_ptr: *const u8, - to_copy_buffer_len: usize, - pointer: *mut u8, - pointer_to_next: *mut u8, + /// Copy the next chunk of data from the record buffer to the cell payload. + /// If we can't fit all of the remaining data on the current page, + /// move the internal state to [CopyDataState::AllocateOverflowPage] + CopyData { + /// Internal state of the copy data operation. + /// We can either be copying data or allocating an overflow page. + state: CopyDataState, + /// Track how much space we have left on the current page we are copying data into. + /// This is reset whenever a new overflow page is allocated. + space_left_on_cur_page: usize, + /// Offset into the record buffer to copy from. + src_data_offset: usize, + /// Offset into the destination buffer we are copying data into. + /// This is either: + /// - an offset in the btree page where the cell is, or + /// - an offset in an overflow page + dst_data_offset: usize, + /// If this is Some, we will copy data into this overflow page. + /// If this is None, we will copy data into the cell payload on the btree page. + /// Also: to safely form a chain of overflow pages, the current page must be pinned to the page cache + /// so that e.g. a spilling operation does not evict it to disk. + current_overflow_page: Option, }, } +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +pub enum CopyDataState { + /// Copy the next chunk of data from the record buffer to the cell payload. + Copy, + /// Allocate a new overflow page if we couldn't fit all data to the current page. + AllocateOverflowPage, +} + /// Fill in the cell payload with the record. /// If the record is too large to fit in the cell, it will spill onto overflow pages. /// This function needs a separate [FillCellPayloadState] because allocating overflow pages /// may require I/O. #[allow(clippy::too_many_arguments)] fn fill_cell_payload( - page_contents: &PageContent, + page: PageRef, int_key: Option, cell_payload: &mut Vec, cell_idx: usize, record: &ImmutableRecord, usable_space: usize, pager: Rc, - state: &mut FillCellPayloadState, + fill_cell_payload_state: &mut FillCellPayloadState, ) -> Result> { - loop { - match state { + let overflow_page_pointer_size = 4; + let overflow_page_data_size = usable_space - overflow_page_pointer_size; + let result = loop { + let record_buf = record.get_payload(); + match fill_cell_payload_state { FillCellPayloadState::Start => { - // TODO: make record raw from start, having to serialize is not good - let record_buf: Arc<[u8]> = Arc::from(record.get_payload()); + page.pin(); // We need to pin this page because we will be accessing its contents after fill_cell_payload is done. + let page_contents = page.get().contents.as_ref().unwrap(); let page_type = page_contents.page_type(); // fill in header @@ -7168,124 +7195,124 @@ fn fill_cell_payload( write_varint_to_vec(record_buf.len() as u64, cell_payload); } - let payload_overflow_threshold_max = - payload_overflow_threshold_max(page_type, usable_space); - tracing::debug!( - "fill_cell_payload(record_size={}, payload_overflow_threshold_max={})", - record_buf.len(), - payload_overflow_threshold_max - ); - if record_buf.len() <= payload_overflow_threshold_max { + let max_local = payload_overflow_threshold_max(page_type, usable_space); + let min_local = payload_overflow_threshold_min(page_type, usable_space); + + let (overflows, local_size_if_overflow) = + payload_overflows(record_buf.len(), max_local, min_local, usable_space); + if !overflows { // enough allowed space to fit inside a btree page cell_payload.extend_from_slice(record_buf.as_ref()); - return Ok(IOResult::Done(())); + break Ok(IOResult::Done(())); } - let payload_overflow_threshold_min = - payload_overflow_threshold_min(page_type, usable_space); - // see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371 - let mut space_left = payload_overflow_threshold_min - + (record_buf.len() - payload_overflow_threshold_min) % (usable_space - 4); + // so far we've written any of: left child page, rowid, payload size (depending on page type) + let cell_non_payload_elems_size = cell_payload.len(); + let new_total_local_size = cell_non_payload_elems_size + local_size_if_overflow; + cell_payload.resize(new_total_local_size, 0); - if space_left > payload_overflow_threshold_max { - space_left = payload_overflow_threshold_min; - } - - // cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page. - let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page - let to_copy_buffer = record_buf.as_ref(); - - let prev_size = cell_payload.len(); - cell_payload.resize(prev_size + space_left + 4, 0); - assert_eq!( - cell_size, - cell_payload.len(), - "cell_size={} != cell_payload.len()={}", - cell_size, - cell_payload.len() - ); - - // SAFETY: this pointer is valid because it points to a buffer in an Arc>> that lives at least as long as this function, - // and the Vec will not be mutated in FillCellPayloadState::AllocateOverflowPages, which we will move to next. - let pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) }; - let pointer_to_next = - unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) }; - - let to_copy_buffer_ptr = to_copy_buffer.as_ptr(); - let to_copy_buffer_len = to_copy_buffer.len(); - - *state = FillCellPayloadState::AllocateOverflowPages { - record_buf, - space_left, - to_copy_buffer_ptr, - to_copy_buffer_len, - pointer, - pointer_to_next, + *fill_cell_payload_state = FillCellPayloadState::CopyData { + state: CopyDataState::Copy, + space_left_on_cur_page: local_size_if_overflow - overflow_page_pointer_size, // local_size_if_overflow includes the overflow page pointer, but we don't want to write payload data there. + src_data_offset: 0, + dst_data_offset: cell_non_payload_elems_size, + current_overflow_page: None, }; continue; } - FillCellPayloadState::AllocateOverflowPages { - record_buf: _record_buf, - space_left, - to_copy_buffer_ptr, - to_copy_buffer_len, - pointer, - pointer_to_next, + FillCellPayloadState::CopyData { + state, + src_data_offset, + space_left_on_cur_page, + dst_data_offset, + current_overflow_page, } => { - let to_copy; - { - let to_copy_buffer_ptr = *to_copy_buffer_ptr; - let to_copy_buffer_len = *to_copy_buffer_len; - let pointer = *pointer; - let space_left = *space_left; + match state { + CopyDataState::Copy => { + turso_assert!(*src_data_offset < record_buf.len(), "trying to read past end of record buffer: record_offset={} < record_buf.len()={}", src_data_offset, record_buf.len()); + let record_offset_slice = &record_buf[*src_data_offset..]; + let amount_to_copy = + (*space_left_on_cur_page).min(record_offset_slice.len()); + let record_offset_slice_to_copy = &record_offset_slice[..amount_to_copy]; + if let Some(cur_page) = current_overflow_page { + // Copy data into the current overflow page. + turso_assert!( + cur_page.is_loaded(), + "current overflow page is not loaded" + ); + turso_assert!(*dst_data_offset == overflow_page_pointer_size, "data must be copied to offset {overflow_page_pointer_size} on overflow pages, instead tried to copy to offset {dst_data_offset}"); + let contents = cur_page.get_contents(); + let buf = &mut contents.as_ptr() + [*dst_data_offset..*dst_data_offset + amount_to_copy]; + buf.copy_from_slice(record_offset_slice_to_copy); + } else { + // Copy data into the cell payload on the btree page. + let buf = &mut cell_payload + [*dst_data_offset..*dst_data_offset + amount_to_copy]; + buf.copy_from_slice(record_offset_slice_to_copy); + } - // SAFETY: we know to_copy_buffer_ptr is valid because it refers to record_buf which lives at least as long as this function, - // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages. - let to_copy_buffer = unsafe { - std::slice::from_raw_parts(to_copy_buffer_ptr, to_copy_buffer_len) - }; - to_copy = space_left.min(to_copy_buffer_len); - // SAFETY: we know 'pointer' is valid because it refers to cell_payload which lives at least as long as this function, - // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages. - unsafe { std::ptr::copy(to_copy_buffer_ptr, pointer, to_copy) }; + if record_offset_slice.len() - amount_to_copy == 0 { + let cur_page = current_overflow_page.as_ref().expect("we must have overflowed if the remaining payload fits on the current page"); + cur_page.unpin(); // We can safely unpin the current overflow page now. + // Everything copied. + break Ok(IOResult::Done(())); + } + *state = CopyDataState::AllocateOverflowPage; + *src_data_offset += amount_to_copy; + } + CopyDataState::AllocateOverflowPage => { + let new_overflow_page = match pager.allocate_overflow_page() { + Ok(IOResult::Done(new_overflow_page)) => new_overflow_page, + Ok(IOResult::IO(io_result)) => return Ok(IOResult::IO(io_result)), + Err(e) => { + if let Some(cur_page) = current_overflow_page { + cur_page.unpin(); + } + break Err(e); + } + }; + new_overflow_page.pin(); // Pin the current overflow page so the cache won't evict it because we need this page to be in memory for the next iteration of FillCellPayloadState::CopyData. + if let Some(prev_page) = current_overflow_page { + prev_page.unpin(); // We can safely unpin the previous overflow page now. + } - let left = to_copy_buffer.len() - to_copy; - if left == 0 { - break; + turso_assert!( + new_overflow_page.is_loaded(), + "new overflow page is not loaded" + ); + let new_overflow_page_id = new_overflow_page.get().id as u32; + + if let Some(prev_page) = current_overflow_page { + // Update the previous overflow page's "next overflow page" pointer to point to the new overflow page. + turso_assert!( + prev_page.is_loaded(), + "previous overflow page is not loaded" + ); + let contents = prev_page.get_contents(); + let buf = &mut contents.as_ptr()[..overflow_page_pointer_size]; + buf.copy_from_slice(&new_overflow_page_id.to_be_bytes()); + } else { + // Update the cell payload's "next overflow page" pointer to point to the new overflow page. + let first_overflow_page_ptr_offset = + cell_payload.len() - overflow_page_pointer_size; + let buf = &mut cell_payload[first_overflow_page_ptr_offset + ..first_overflow_page_ptr_offset + overflow_page_pointer_size]; + buf.copy_from_slice(&new_overflow_page_id.to_be_bytes()); + } + + *dst_data_offset = overflow_page_pointer_size; + *space_left_on_cur_page = overflow_page_data_size; + *current_overflow_page = Some(new_overflow_page.clone()); + *state = CopyDataState::Copy; } } - - // we still have bytes to add, we will need to allocate new overflow page - // FIXME: handle page cache is full - let overflow_page = return_if_io!(pager.allocate_overflow_page()); - turso_assert!(overflow_page.is_loaded(), "overflow page is not loaded"); - { - let id = overflow_page.get().id as u32; - let contents = overflow_page.get_contents(); - - // TODO: take into account offset here? - let buf = contents.as_ptr(); - let as_bytes = id.to_be_bytes(); - // update pointer to new overflow page - // SAFETY: we know 'pointer_to_next' is valid because it refers to an offset in cell_payload which is less than space_left + 4, - // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages. - unsafe { std::ptr::copy(as_bytes.as_ptr(), *pointer_to_next, 4) }; - - *pointer = unsafe { buf.as_mut_ptr().add(4) }; - *pointer_to_next = buf.as_mut_ptr(); - *space_left = usable_space - 4; - } - - *to_copy_buffer_len -= to_copy; - // SAFETY: we know 'to_copy_buffer_ptr' is valid because it refers to record_buf which lives at least as long as this function, - // and that the offset is less than its length, and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages. - *to_copy_buffer_ptr = unsafe { to_copy_buffer_ptr.add(to_copy) }; } } - } - Ok(IOResult::Done(())) + }; + page.unpin(); + result } - /// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages. /// /// For table leaf pages: X = usable_size - 35 @@ -7439,7 +7466,7 @@ mod tests { fn add_record( id: usize, pos: usize, - page: &mut PageContent, + page: PageRef, record: ImmutableRecord, conn: &Arc, ) -> Vec { @@ -7448,7 +7475,7 @@ mod tests { run_until_done( || { fill_cell_payload( - page, + page.clone(), Some(id as i64), &mut payload, pos, @@ -7461,7 +7488,7 @@ mod tests { &conn.pager.borrow().clone(), ) .unwrap(); - insert_into_cell(page, &payload, pos, 4096).unwrap(); + insert_into_cell(page.get_contents(), &payload, pos, 4096).unwrap(); payload } @@ -7471,17 +7498,17 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); let page = page.get(); - let page = page.get_contents(); let header_size = 8; let regs = &[Register::Value(Value::Integer(1))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(1, 0, page, record, &conn); - assert_eq!(page.cell_count(), 1); - let free = compute_free_space(page, 4096); + let payload = add_record(1, 0, page.clone(), record, &conn); + let page_contents = page.get_contents(); + assert_eq!(page_contents.cell_count(), 1); + let free = compute_free_space(page_contents, 4096); assert_eq!(free, 4096 - payload.len() - 2 - header_size); let cell_idx = 0; - ensure_cell(page, cell_idx, &payload); + ensure_cell(page_contents, cell_idx, &payload); } struct Cell { @@ -7496,7 +7523,7 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let header_size = 8; let mut total_size = 0; @@ -7505,22 +7532,22 @@ mod tests { for i in 0..3 { let regs = &[Register::Value(Value::Integer(i as i64))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(i, i, page, record, &conn); - assert_eq!(page.cell_count(), i + 1); - let free = compute_free_space(page, usable_space); + let payload = add_record(i, i, page.clone(), record, &conn); + assert_eq!(page_contents.cell_count(), i + 1); + let free = compute_free_space(page_contents, usable_space); total_size += payload.len() + 2; assert_eq!(free, 4096 - total_size - header_size); cells.push(Cell { pos: i, payload }); } for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } cells.remove(1); - drop_cell(page, 1, usable_space).unwrap(); + drop_cell(page_contents, 1, usable_space).unwrap(); for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } } @@ -8466,7 +8493,7 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let header_size = 8; let mut total_size = 0; @@ -8476,9 +8503,9 @@ mod tests { for i in 0..total_cells { let regs = &[Register::Value(Value::Integer(i as i64))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(i, i, page, record, &conn); - assert_eq!(page.cell_count(), i + 1); - let free = compute_free_space(page, usable_space); + let payload = add_record(i, i, page.clone(), record, &conn); + assert_eq!(page_contents.cell_count(), i + 1); + let free = compute_free_space(page_contents, usable_space); total_size += payload.len() + 2; assert_eq!(free, 4096 - total_size - header_size); cells.push(Cell { pos: i, payload }); @@ -8488,7 +8515,7 @@ mod tests { let mut new_cells = Vec::new(); for cell in cells { if cell.pos % 2 == 1 { - drop_cell(page, cell.pos - removed, usable_space).unwrap(); + drop_cell(page_contents, cell.pos - removed, usable_space).unwrap(); removed += 1; } else { new_cells.push(cell); @@ -8496,11 +8523,11 @@ mod tests { } let cells = new_cells; for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } } @@ -8918,7 +8945,7 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let header_size = 8; let mut total_size = 0; @@ -8927,28 +8954,28 @@ mod tests { for i in 0..3 { let regs = &[Register::Value(Value::Integer(i as i64))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(i, i, page, record, &conn); - assert_eq!(page.cell_count(), i + 1); - let free = compute_free_space(page, usable_space); + let payload = add_record(i, i, page.clone(), record, &conn); + assert_eq!(page_contents.cell_count(), i + 1); + let free = compute_free_space(page_contents, usable_space); total_size += payload.len() + 2; assert_eq!(free, 4096 - total_size - header_size); cells.push(Cell { pos: i, payload }); } for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } cells.remove(1); - drop_cell(page, 1, usable_space).unwrap(); + drop_cell(page_contents, 1, usable_space).unwrap(); for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } - defragment_page(page, usable_space, 4).unwrap(); + defragment_page(page_contents, usable_space, 4).unwrap(); for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } } @@ -8959,7 +8986,7 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let header_size = 8; let mut total_size = 0; @@ -8969,9 +8996,9 @@ mod tests { for i in 0..total_cells { let regs = &[Register::Value(Value::Integer(i as i64))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(i, i, page, record, &conn); - assert_eq!(page.cell_count(), i + 1); - let free = compute_free_space(page, usable_space); + let payload = add_record(i, i, page.clone(), record, &conn); + assert_eq!(page_contents.cell_count(), i + 1); + let free = compute_free_space(page_contents, usable_space); total_size += payload.len() + 2; assert_eq!(free, 4096 - total_size - header_size); cells.push(Cell { pos: i, payload }); @@ -8981,7 +9008,7 @@ mod tests { let mut new_cells = Vec::new(); for cell in cells { if cell.pos % 2 == 1 { - drop_cell(page, cell.pos - removed, usable_space).unwrap(); + drop_cell(page_contents, cell.pos - removed, usable_space).unwrap(); removed += 1; } else { new_cells.push(cell); @@ -8989,13 +9016,13 @@ mod tests { } let cells = new_cells; for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } - defragment_page(page, usable_space, 4).unwrap(); + defragment_page(page_contents, usable_space, 4).unwrap(); for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } } @@ -9006,7 +9033,7 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let header_size = 8; let mut total_size = 0; @@ -9021,8 +9048,8 @@ mod tests { match rng.next_u64() % 4 { 0 => { // allow appends with extra place to insert - let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1); - let free = compute_free_space(page, usable_space); + let cell_idx = rng.next_u64() as usize % (page_contents.cell_count() + 1); + let free = compute_free_space(page_contents, usable_space); let regs = &[Register::Value(Value::Integer(i as i64))]; let record = ImmutableRecord::from_registers(regs, regs.len()); let mut payload: Vec = Vec::new(); @@ -9030,7 +9057,7 @@ mod tests { run_until_done( || { fill_cell_payload( - page, + page.clone(), Some(i as i64), &mut payload, cell_idx, @@ -9047,34 +9074,34 @@ mod tests { // do not try to insert overflow pages because they require balancing continue; } - insert_into_cell(page, &payload, cell_idx, 4096).unwrap(); - assert!(page.overflow_cells.is_empty()); + insert_into_cell(page_contents, &payload, cell_idx, 4096).unwrap(); + assert!(page_contents.overflow_cells.is_empty()); total_size += payload.len() + 2; cells.insert(cell_idx, Cell { pos: i, payload }); } 1 => { - if page.cell_count() == 0 { + if page_contents.cell_count() == 0 { continue; } - let cell_idx = rng.next_u64() as usize % page.cell_count(); - let (_, len) = page.cell_get_raw_region(cell_idx, usable_space); - drop_cell(page, cell_idx, usable_space).unwrap(); + let cell_idx = rng.next_u64() as usize % page_contents.cell_count(); + let (_, len) = page_contents.cell_get_raw_region(cell_idx, usable_space); + drop_cell(page_contents, cell_idx, usable_space).unwrap(); total_size -= len + 2; cells.remove(cell_idx); } 2 => { - defragment_page(page, usable_space, 4).unwrap(); + defragment_page(page_contents, usable_space, 4).unwrap(); } 3 => { // check cells for (i, cell) in cells.iter().enumerate() { - ensure_cell(page, i, &cell.payload); + ensure_cell(page_contents, i, &cell.payload); } - assert_eq!(page.cell_count(), cells.len()); + assert_eq!(page_contents.cell_count(), cells.len()); } _ => unreachable!(), } - let free = compute_free_space(page, usable_space); + let free = compute_free_space(page_contents, usable_space); assert_eq!(free, 4096 - total_size - header_size); } } @@ -9088,7 +9115,7 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let header_size = 8; let mut total_size = 0; @@ -9103,8 +9130,8 @@ mod tests { match rng.next_u64() % 3 { 0 => { // allow appends with extra place to insert - let cell_idx = rng.next_u64() as usize % (page.cell_count() + 1); - let free = compute_free_space(page, usable_space); + let cell_idx = rng.next_u64() as usize % (page_contents.cell_count() + 1); + let free = compute_free_space(page_contents, usable_space); let regs = &[Register::Value(Value::Integer(i))]; let record = ImmutableRecord::from_registers(regs, regs.len()); let mut payload: Vec = Vec::new(); @@ -9112,7 +9139,7 @@ mod tests { run_until_done( || { fill_cell_payload( - page, + page.clone(), Some(i), &mut payload, cell_idx, @@ -9129,8 +9156,8 @@ mod tests { // do not try to insert overflow pages because they require balancing continue; } - insert_into_cell(page, &payload, cell_idx, 4096).unwrap(); - assert!(page.overflow_cells.is_empty()); + insert_into_cell(page_contents, &payload, cell_idx, 4096).unwrap(); + assert!(page_contents.overflow_cells.is_empty()); total_size += payload.len() + 2; cells.push(Cell { pos: i as usize, @@ -9138,21 +9165,21 @@ mod tests { }); } 1 => { - if page.cell_count() == 0 { + if page_contents.cell_count() == 0 { continue; } - let cell_idx = rng.next_u64() as usize % page.cell_count(); - let (_, len) = page.cell_get_raw_region(cell_idx, usable_space); - drop_cell(page, cell_idx, usable_space).unwrap(); + let cell_idx = rng.next_u64() as usize % page_contents.cell_count(); + let (_, len) = page_contents.cell_get_raw_region(cell_idx, usable_space); + drop_cell(page_contents, cell_idx, usable_space).unwrap(); total_size -= len + 2; cells.remove(cell_idx); } 2 => { - defragment_page(page, usable_space, 4).unwrap(); + defragment_page(page_contents, usable_space, 4).unwrap(); } _ => unreachable!(), } - let free = compute_free_space(page, usable_space); + let free = compute_free_space(page_contents, usable_space); assert_eq!(free, 4096 - total_size - header_size); } } @@ -9264,14 +9291,14 @@ mod tests { let conn = db.connect().unwrap(); let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let header_size = 8; let usable_space = 4096; let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(0, 0, page, record, &conn); - let free = compute_free_space(page, usable_space); + let payload = add_record(0, 0, page.clone(), record, &conn); + let free = compute_free_space(page_contents, usable_space); assert_eq!(free, 4096 - payload.len() - 2 - header_size); } @@ -9282,18 +9309,18 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let usable_space = 4096; let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(0, 0, page, record, &conn); + let payload = add_record(0, 0, page.clone(), record, &conn); - assert_eq!(page.cell_count(), 1); - defragment_page(page, usable_space, 4).unwrap(); - assert_eq!(page.cell_count(), 1); - let (start, len) = page.cell_get_raw_region(0, usable_space); - let buf = page.as_ptr(); + assert_eq!(page_contents.cell_count(), 1); + defragment_page(page_contents, usable_space, 4).unwrap(); + assert_eq!(page_contents.cell_count(), 1); + let (start, len) = page_contents.cell_get_raw_region(0, usable_space); + let buf = page_contents.as_ptr(); assert_eq!(&payload, &buf[start..start + len]); } @@ -9304,7 +9331,7 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let usable_space = 4096; let regs = &[ @@ -9312,19 +9339,19 @@ mod tests { Register::Value(Value::Text(Text::new("aaaaaaaa"))), ]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let _ = add_record(0, 0, page, record, &conn); + let _ = add_record(0, 0, page.clone(), record, &conn); - assert_eq!(page.cell_count(), 1); - drop_cell(page, 0, usable_space).unwrap(); - assert_eq!(page.cell_count(), 0); + assert_eq!(page_contents.cell_count(), 1); + drop_cell(page_contents, 0, usable_space).unwrap(); + assert_eq!(page_contents.cell_count(), 0); let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(0, 0, page, record, &conn); - assert_eq!(page.cell_count(), 1); + let payload = add_record(0, 0, page.clone(), record, &conn); + assert_eq!(page_contents.cell_count(), 1); - let (start, len) = page.cell_get_raw_region(0, usable_space); - let buf = page.as_ptr(); + let (start, len) = page_contents.cell_get_raw_region(0, usable_space); + let buf = page_contents.as_ptr(); assert_eq!(&payload, &buf[start..start + len]); } @@ -9335,7 +9362,7 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let usable_space = 4096; let regs = &[ @@ -9343,20 +9370,20 @@ mod tests { Register::Value(Value::Text(Text::new("aaaaaaaa"))), ]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let _ = add_record(0, 0, page, record, &conn); + let _ = add_record(0, 0, page.clone(), record, &conn); for _ in 0..100 { - assert_eq!(page.cell_count(), 1); - drop_cell(page, 0, usable_space).unwrap(); - assert_eq!(page.cell_count(), 0); + assert_eq!(page_contents.cell_count(), 1); + drop_cell(page_contents, 0, usable_space).unwrap(); + assert_eq!(page_contents.cell_count(), 0); let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(0, 0, page, record, &conn); - assert_eq!(page.cell_count(), 1); + let payload = add_record(0, 0, page.clone(), record, &conn); + assert_eq!(page_contents.cell_count(), 1); - let (start, len) = page.cell_get_raw_region(0, usable_space); - let buf = page.as_ptr(); + let (start, len) = page_contents.cell_get_raw_region(0, usable_space); + let buf = page_contents.as_ptr(); assert_eq!(&payload, &buf[start..start + len]); } } @@ -9368,23 +9395,23 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let usable_space = 4096; let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let payload = add_record(0, 0, page, record, &conn); + let payload = add_record(0, 0, page.clone(), record, &conn); let regs = &[Register::Value(Value::Integer(1))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let _ = add_record(1, 1, page, record, &conn); + let _ = add_record(1, 1, page.clone(), record, &conn); let regs = &[Register::Value(Value::Integer(2))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let _ = add_record(2, 2, page, record, &conn); + let _ = add_record(2, 2, page.clone(), record, &conn); - drop_cell(page, 1, usable_space).unwrap(); - drop_cell(page, 1, usable_space).unwrap(); + drop_cell(page_contents, 1, usable_space).unwrap(); + drop_cell(page_contents, 1, usable_space).unwrap(); - ensure_cell(page, 0, &payload); + ensure_cell(page_contents, 0, &payload); } #[test] @@ -9394,29 +9421,29 @@ mod tests { let page = get_page(2); let page = page.get(); - let page = page.get_contents(); + let page_contents = page.get_contents(); let usable_space = 4096; let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let _ = add_record(0, 0, page, record, &conn); + let _ = add_record(0, 0, page.clone(), record, &conn); let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let _ = add_record(0, 0, page, record, &conn); - drop_cell(page, 0, usable_space).unwrap(); + let _ = add_record(0, 0, page.clone(), record, &conn); + drop_cell(page_contents, 0, usable_space).unwrap(); - defragment_page(page, usable_space, 4).unwrap(); + defragment_page(page_contents, usable_space, 4).unwrap(); let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let _ = add_record(0, 1, page, record, &conn); + let _ = add_record(0, 1, page.clone(), record, &conn); - drop_cell(page, 0, usable_space).unwrap(); + drop_cell(page_contents, 0, usable_space).unwrap(); let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - let _ = add_record(0, 1, page, record, &conn); + let _ = add_record(0, 1, page.clone(), record, &conn); } #[test] @@ -9440,21 +9467,21 @@ mod tests { let page = page.get(); defragment(page.get_contents()); defragment(page.get_contents()); - insert(0, page.get_contents()); + insert(0, page.clone()); drop(0, page.get_contents()); - insert(0, page.get_contents()); + insert(0, page.clone()); drop(0, page.get_contents()); - insert(0, page.get_contents()); + insert(0, page.clone()); defragment(page.get_contents()); defragment(page.get_contents()); drop(0, page.get_contents()); defragment(page.get_contents()); - insert(0, page.get_contents()); + insert(0, page.clone()); drop(0, page.get_contents()); - insert(0, page.get_contents()); - insert(1, page.get_contents()); - insert(1, page.get_contents()); - insert(0, page.get_contents()); + insert(0, page.clone()); + insert(1, page.clone()); + insert(1, page.clone()); + insert(0, page.clone()); drop(3, page.get_contents()); drop(2, page.get_contents()); compute_free_space(page.get_contents(), usable_space); @@ -9485,7 +9512,7 @@ mod tests { run_until_done( || { fill_cell_payload( - page.get().get_contents(), + page.get(), Some(0), &mut payload, 0, @@ -9499,11 +9526,11 @@ mod tests { ) .unwrap(); let page = page.get(); - insert(0, page.get_contents()); + insert(0, page.clone()); defragment(page.get_contents()); - insert(0, page.get_contents()); + insert(0, page.clone()); defragment(page.get_contents()); - insert(0, page.get_contents()); + insert(0, page.clone()); drop(2, page.get_contents()); drop(0, page.get_contents()); let free = compute_free_space(page.get_contents(), usable_space); @@ -9571,7 +9598,7 @@ mod tests { run_until_done( || { fill_cell_payload( - page.get().get_contents(), + page.get(), Some(0), &mut payload, 0, @@ -9923,7 +9950,7 @@ mod tests { while compute_free_space(page.get_contents(), pager.usable_space()) >= size as usize + 10 { - insert_cell(i, size, page.get_contents(), pager.clone()); + insert_cell(i, size, page.clone(), pager.clone()); i += 1; size = (rng.next_u64() % 1024) as u16; } @@ -9974,15 +10001,16 @@ mod tests { } } - fn insert_cell(cell_idx: u64, size: u16, contents: &mut PageContent, pager: Rc) { + fn insert_cell(cell_idx: u64, size: u16, page: PageRef, pager: Rc) { let mut payload = Vec::new(); let regs = &[Register::Value(Value::Blob(vec![0; size as usize]))]; let record = ImmutableRecord::from_registers(regs, regs.len()); let mut fill_cell_payload_state = FillCellPayloadState::Start; + let contents = page.get_contents(); run_until_done( || { fill_cell_payload( - contents, + page.clone(), Some(cell_idx as i64), &mut payload, cell_idx as usize,