diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 80fc27e56..60a708f38 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1,3 +1,4 @@ +use parking_lot::Mutex; use tracing::{instrument, Level}; use crate::{ @@ -228,11 +229,20 @@ struct DeleteInfo { #[derive(Debug, Clone)] pub enum OverwriteCellState { - /// Fill the cell payload with the new value. - FillPayload, - /// Clear the overflow pages of the old celland overwrite the cell. + /// Allocate a new payload for the cell. + AllocatePayload, + /// Fill the cell payload with the new payload. + FillPayload { + /// Dumb double-indirection via Arc because we clone [WriteState] for some reason and we use unsafe in [FillCellPayloadState::AllocateOverflowPages] + /// so the underlying Vec must not be cloned in upper layers. + new_payload: Arc>>, + rowid: Option, + fill_cell_payload_state: FillCellPayloadState, + }, + /// Clear the old cell's overflow pages and add them to the freelist. + /// Overwrite the cell with the new payload. ClearOverflowPagesAndOverwrite { - new_payload: Vec, + new_payload: Arc>>, old_offset: usize, old_local_size: usize, }, @@ -256,6 +266,8 @@ enum WriteState { Insert { page: Arc, cell_idx: usize, + new_payload: Vec, + fill_cell_payload_state: FillCellPayloadState, }, BalanceStart, BalanceFreePages { @@ -2183,7 +2195,7 @@ impl BTreeCursor { write_info.state = WriteState::Overwrite { page: page.clone(), cell_idx, - state: OverwriteCellState::FillPayload, + state: OverwriteCellState::AllocatePayload, }; continue; } @@ -2208,7 +2220,7 @@ impl BTreeCursor { write_info.state = WriteState::Overwrite { page: page.clone(), cell_idx, - state: OverwriteCellState::FillPayload, + state: OverwriteCellState::AllocatePayload, }; continue; } else { @@ -2229,20 +2241,27 @@ impl BTreeCursor { write_info.state = WriteState::Insert { page: page.clone(), cell_idx, + new_payload: Vec::with_capacity(record_values.len() + 4), + fill_cell_payload_state: FillCellPayloadState::Start, }; continue; } - WriteState::Insert { page, cell_idx } => { - let mut cell_payload: Vec = Vec::with_capacity(record_values.len() + 4); - fill_cell_payload( + WriteState::Insert { + page, + cell_idx, + mut new_payload, + mut fill_cell_payload_state, + } => { + return_if_io!(fill_cell_payload( page.get().get().contents.as_ref().unwrap(), bkey.maybe_rowid(), - &mut cell_payload, + &mut new_payload, cell_idx, record, self.usable_space(), self.pager.clone(), - ); + &mut fill_cell_payload_state, + )); { let page = page.get(); @@ -2251,7 +2270,7 @@ impl BTreeCursor { insert_into_cell( contents, - cell_payload.as_slice(), + new_payload.as_slice(), cell_idx, self.usable_space() as u16, )?; @@ -3160,7 +3179,17 @@ impl BTreeCursor { pages_to_balance_new[i].replace(page.clone()); } else { // FIXME: handle page cache is full - let page = self.allocate_page(page_type, 0)?; + let mut page = self.allocate_page(page_type, 0)?; + // FIXME: add new state machine state instead of this sync IO hack + while matches!(page, IOResult::IO) { + self.pager.io.run_once()?; + page = self.allocate_page(page_type, 0)?; + } + let IOResult::Done(page) = page else { + return Err(LimboError::InternalError( + "Failed to allocate page".into(), + )); + }; pages_to_balance_new[i].replace(page); // Since this page didn't exist before, we can set it to cells length as it // marks them as empty since it is a prefix sum of cells. @@ -4030,7 +4059,7 @@ impl BTreeCursor { /// Balance the root page. /// This is done when the root page overflows, and we need to create a new root page. 
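// --- Illustrative sketch (reviewer note, not part of the patch) ---
// The pattern this diff threads through fill_cell_payload, allocate_page and the
// Overwrite/Insert write states: an operation keeps its progress in an explicit
// state value, returns IOResult::IO whenever it must wait for the pager, and is
// simply called again later. IOResult, return_if_io! and the types below are
// simplified stand-ins for the crate's own definitions.
enum IOResult<T> {
    Done(T),
    IO, // caller must let pending IO complete and call again
}

macro_rules! return_if_io {
    ($e:expr) => {
        match $e {
            IOResult::Done(v) => v,
            IOResult::IO => return Ok(IOResult::IO),
        }
    };
}

struct PendingRead {
    polls_left: u32,
}

impl PendingRead {
    // Pretend IO source: reports "still in flight" twice, then completes.
    fn poll(&mut self) -> IOResult<u32> {
        if self.polls_left > 0 {
            self.polls_left -= 1;
            IOResult::IO
        } else {
            IOResult::Done(42)
        }
    }
}

// A resumable two-phase operation: each call either makes progress or bubbles
// IOResult::IO up without losing its place, much like the new WriteState::Insert
// carrying a FillCellPayloadState across calls.
enum Step {
    Start,
    Waiting(PendingRead),
}

fn advance(state: &mut Step) -> Result<IOResult<u32>, String> {
    loop {
        match state {
            Step::Start => *state = Step::Waiting(PendingRead { polls_left: 2 }),
            Step::Waiting(pending) => {
                let value = return_if_io!(pending.poll());
                *state = Step::Start;
                return Ok(IOResult::Done(value));
            }
        }
    }
}

fn main() {
    let mut state = Step::Start;
    loop {
        match advance(&mut state).unwrap() {
            IOResult::Done(v) => {
                println!("done: {v}");
                break;
            }
            IOResult::IO => { /* the real code calls pager.io.run_once() here */ }
        }
    }
}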
/// See e.g. https://en.wikipedia.org/wiki/B-tree - fn balance_root(&mut self) -> Result<()> { + fn balance_root(&mut self) -> Result> { /* todo: balance deeper, create child and copy contents of root there. Then split root */ /* if we are in root page then we just need to create a new root and push key there */ @@ -4045,9 +4074,19 @@ impl BTreeCursor { let root = root_btree.get(); let root_contents = root.get_contents(); // FIXME: handle page cache is full - let child_btree = - self.pager - .do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any)?; + // FIXME: remove sync IO hack + let child_btree = loop { + match self.pager.do_allocate_page( + root_contents.page_type(), + 0, + BtreePageAllocMode::Any, + )? { + IOResult::IO => { + self.pager.io.run_once()?; + } + IOResult::Done(page) => break page, + } + }; tracing::debug!( "balance_root(root={}, rightmost={}, page_type={:?})", @@ -4108,7 +4147,7 @@ impl BTreeCursor { self.stack.push(root_btree.clone()); self.stack.set_cell_index(0); // leave parent pointing at the rightmost pointer (in this case 0, as there are no cells), since we will be balancing the rightmost child page. self.stack.push(child_btree.clone()); - Ok(()) + Ok(IOResult::Done(())) } fn usable_space(&self) -> usize { @@ -5157,21 +5196,38 @@ impl BTreeCursor { page_ref.get().get().id ); match state { - OverwriteCellState::FillPayload => { + OverwriteCellState::AllocatePayload => { + let serial_types_len = self.record_cursor.borrow_mut().len(record); + let new_payload = Vec::with_capacity(serial_types_len); + let rowid = return_if_io!(self.rowid()); + *state = OverwriteCellState::FillPayload { + new_payload: Arc::new(Mutex::new(new_payload)), + rowid, + fill_cell_payload_state: FillCellPayloadState::Start, + }; + continue; + } + OverwriteCellState::FillPayload { + new_payload, + rowid, + fill_cell_payload_state, + } => { let page = page_ref.get(); let page_contents = page.get().contents.as_ref().unwrap(); - let serial_types_len = self.record_cursor.borrow_mut().len(record); - let mut new_payload = Vec::with_capacity(serial_types_len); - let rowid = return_if_io!(self.rowid()); - fill_cell_payload( - page_contents, - rowid, - &mut new_payload, - cell_idx, - record, - self.usable_space(), - self.pager.clone(), - ); + { + let mut new_payload_mut = new_payload.lock(); + let new_payload_mut = &mut *new_payload_mut; + return_if_io!(fill_cell_payload( + page_contents, + *rowid, + new_payload_mut, + cell_idx, + record, + self.usable_space(), + self.pager.clone(), + fill_cell_payload_state, + )); + } // figure out old cell offset & size let (old_offset, old_local_size) = { let page_ref = page_ref.get(); @@ -5180,7 +5236,7 @@ impl BTreeCursor { }; *state = OverwriteCellState::ClearOverflowPagesAndOverwrite { - new_payload, + new_payload: new_payload.clone(), old_offset, old_local_size, }; @@ -5195,6 +5251,9 @@ impl BTreeCursor { let page_contents = page.get().contents.as_ref().unwrap(); let cell = page_contents.cell_get(cell_idx, self.usable_space())?; return_if_io!(self.clear_overflow_pages(&cell)); + + let mut new_payload = new_payload.lock(); + let new_payload = &mut *new_payload; // if it all fits in local space and old_local_size is enough, do an in-place overwrite if new_payload.len() == *old_local_size { self.overwrite_content(page_ref.clone(), *old_offset, new_payload)?; @@ -5393,7 +5452,7 @@ impl BTreeCursor { btree_read_page(&self.pager, page_idx) } - pub fn allocate_page(&self, page_type: PageType, offset: usize) -> Result { + pub fn allocate_page(&self, 
page_type: PageType, offset: usize) -> Result> { self.pager .do_allocate_page(page_type, offset, BtreePageAllocMode::Any) } @@ -6701,8 +6760,26 @@ fn allocate_cell_space(page_ref: &PageContent, amount: u16, usable_space: u16) - Ok(top as u16) } +#[derive(Debug, Clone)] +pub enum FillCellPayloadState { + Start, + AllocateOverflowPages { + /// Arc because we clone [WriteState] for some reason and we use unsafe pointer dereferences in [FillCellPayloadState::AllocateOverflowPages] + /// so the underlying bytes must not be cloned in upper layers. + record_buf: Arc<[u8]>, + space_left: usize, + to_copy_buffer_ptr: *const u8, + to_copy_buffer_len: usize, + pointer: *mut u8, + pointer_to_next: *mut u8, + }, +} + /// Fill in the cell payload with the record. /// If the record is too large to fit in the cell, it will spill onto overflow pages. +/// This function needs a separate [FillCellPayloadState] because allocating overflow pages +/// may require I/O. +#[allow(clippy::too_many_arguments)] fn fill_cell_payload( page_contents: &PageContent, int_key: Option, @@ -6711,87 +6788,147 @@ fn fill_cell_payload( record: &ImmutableRecord, usable_space: usize, pager: Rc, -) { - // TODO: make record raw from start, having to serialize is not good - let record_buf = record.get_payload().to_vec(); - - let page_type = page_contents.page_type(); - // fill in header - if matches!(page_type, PageType::IndexInterior) { - // if a write happened on an index interior page, it is always an overwrite. - // we must copy the left child pointer of the replaced cell to the new cell. - let left_child_page = page_contents.cell_interior_read_left_child_page(cell_idx); - cell_payload.extend_from_slice(&left_child_page.to_be_bytes()); - } - if matches!(page_type, PageType::TableLeaf) { - let int_key = int_key.unwrap(); - write_varint_to_vec(record_buf.len() as u64, cell_payload); - write_varint_to_vec(int_key as u64, cell_payload); - } else { - write_varint_to_vec(record_buf.len() as u64, cell_payload); - } - - let payload_overflow_threshold_max = payload_overflow_threshold_max(page_type, usable_space); - tracing::debug!( - "fill_cell_payload(record_size={}, payload_overflow_threshold_max={})", - record_buf.len(), - payload_overflow_threshold_max - ); - if record_buf.len() <= payload_overflow_threshold_max { - // enough allowed space to fit inside a btree page - cell_payload.extend_from_slice(record_buf.as_slice()); - return; - } - - let payload_overflow_threshold_min = payload_overflow_threshold_min(page_type, usable_space); - // see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371 - let mut space_left = payload_overflow_threshold_min - + (record_buf.len() - payload_overflow_threshold_min) % (usable_space - 4); - - if space_left > payload_overflow_threshold_max { - space_left = payload_overflow_threshold_min; - } - - // cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page. 
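// --- Illustrative sketch (reviewer note, not part of the patch) ---
// Why new_payload sits behind Arc<Mutex<Vec<u8>>> and why the Vec must not be
// cloned while FillCellPayloadState::AllocateOverflowPages holds raw pointers
// into it: cloning the Arc handle shares one heap buffer, so addresses taken
// into the buffer stay meaningful across cloned states; cloning the Vec itself
// would copy the bytes to a new allocation that the saved pointers never see.
// The patch uses parking_lot::Mutex; std::sync::Mutex keeps this sketch
// dependency-free.
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct FillPayloadLikeState {
    new_payload: Arc<Mutex<Vec<u8>>>,
}

fn main() {
    let state = FillPayloadLikeState {
        new_payload: Arc::new(Mutex::new(vec![0u8; 16])),
    };
    // Address of the shared buffer before any clone of the state.
    let addr_before = state.new_payload.lock().unwrap().as_ptr() as usize;

    // Upper layers clone WriteState; with Arc this only bumps a refcount.
    let cloned = state.clone();
    let addr_in_clone = cloned.new_payload.lock().unwrap().as_ptr() as usize;
    assert_eq!(addr_before, addr_in_clone, "clone sees the very same buffer");

    // A plain Vec clone, by contrast, duplicates the bytes elsewhere.
    let detached: Vec<u8> = state.new_payload.lock().unwrap().clone();
    assert_ne!(addr_before, detached.as_ptr() as usize);
    println!("shared buffer at {addr_before:#x}, detached copy elsewhere");
}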
- let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page - let mut to_copy_buffer = record_buf.as_slice(); - - let prev_size = cell_payload.len(); - cell_payload.resize(prev_size + space_left + 4, 0); - let mut pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) }; - let mut pointer_to_next = unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) }; - + state: &mut FillCellPayloadState, +) -> Result> { loop { - let to_copy = space_left.min(to_copy_buffer.len()); - unsafe { std::ptr::copy(to_copy_buffer.as_ptr(), pointer, to_copy) }; + match state { + FillCellPayloadState::Start => { + // TODO: make record raw from start, having to serialize is not good + let record_buf: Arc<[u8]> = Arc::from(record.get_payload()); - let left = to_copy_buffer.len() - to_copy; - if left == 0 { - break; + let page_type = page_contents.page_type(); + // fill in header + if matches!(page_type, PageType::IndexInterior) { + // if a write happened on an index interior page, it is always an overwrite. + // we must copy the left child pointer of the replaced cell to the new cell. + let left_child_page = + page_contents.cell_interior_read_left_child_page(cell_idx); + cell_payload.extend_from_slice(&left_child_page.to_be_bytes()); + } + if matches!(page_type, PageType::TableLeaf) { + let int_key = int_key.unwrap(); + write_varint_to_vec(record_buf.len() as u64, cell_payload); + write_varint_to_vec(int_key as u64, cell_payload); + } else { + write_varint_to_vec(record_buf.len() as u64, cell_payload); + } + + let payload_overflow_threshold_max = + payload_overflow_threshold_max(page_type, usable_space); + tracing::debug!( + "fill_cell_payload(record_size={}, payload_overflow_threshold_max={})", + record_buf.len(), + payload_overflow_threshold_max + ); + if record_buf.len() <= payload_overflow_threshold_max { + // enough allowed space to fit inside a btree page + cell_payload.extend_from_slice(record_buf.as_ref()); + return Ok(IOResult::Done(())); + } + + let payload_overflow_threshold_min = + payload_overflow_threshold_min(page_type, usable_space); + // see e.g. https://github.com/sqlite/sqlite/blob/9591d3fe93936533c8c3b0dc4d025ac999539e11/src/dbstat.c#L371 + let mut space_left = payload_overflow_threshold_min + + (record_buf.len() - payload_overflow_threshold_min) % (usable_space - 4); + + if space_left > payload_overflow_threshold_max { + space_left = payload_overflow_threshold_min; + } + + // cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page. + let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page + let to_copy_buffer = record_buf.as_ref(); + + let prev_size = cell_payload.len(); + cell_payload.resize(prev_size + space_left + 4, 0); + assert_eq!( + cell_size, + cell_payload.len(), + "cell_size={} != cell_payload.len()={}", + cell_size, + cell_payload.len() + ); + + // SAFETY: this pointer is valid because it points to a buffer in an Arc>> that lives at least as long as this function, + // and the Vec will not be mutated in FillCellPayloadState::AllocateOverflowPages, which we will move to next. 
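// --- Illustrative sketch (reviewer note, not part of the patch) ---
// How the local-vs-overflow split in FillCellPayloadState::Start is computed.
// The threshold formulas shown are SQLite's documented ones for table leaf
// cells (max = usable - 35, min = (usable - 12) * 32 / 255 - 23); the patch
// gets them from payload_overflow_threshold_max/min instead of inlining them.
fn local_payload_size(record_len: usize, usable_space: usize) -> usize {
    let threshold_max = usable_space - 35;
    let threshold_min = (usable_space - 12) * 32 / 255 - 23;

    if record_len <= threshold_max {
        // Whole record fits on the b-tree page, no overflow chain needed.
        return record_len;
    }
    // Mirrors the diff: keep min local bytes plus whatever remainder does not
    // fill a whole overflow page (each overflow page holds usable - 4 bytes).
    let mut space_left = threshold_min + (record_len - threshold_min) % (usable_space - 4);
    if space_left > threshold_max {
        space_left = threshold_min;
    }
    space_left
}

fn main() {
    let usable = 4096;
    for record_len in [100usize, 4061, 4062, 10_000, 100_000] {
        let local = local_payload_size(record_len, usable);
        let spilled = record_len - local;
        println!("record {record_len:>6} bytes -> {local:>4} local, {spilled:>6} spilled");
    }
}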
+ let pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) }; + let pointer_to_next = + unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) }; + + let to_copy_buffer_ptr = to_copy_buffer.as_ptr(); + let to_copy_buffer_len = to_copy_buffer.len(); + + *state = FillCellPayloadState::AllocateOverflowPages { + record_buf, + space_left, + to_copy_buffer_ptr, + to_copy_buffer_len, + pointer, + pointer_to_next, + }; + continue; + } + FillCellPayloadState::AllocateOverflowPages { + record_buf: _record_buf, + space_left, + to_copy_buffer_ptr, + to_copy_buffer_len, + pointer, + pointer_to_next, + } => { + let to_copy; + { + let to_copy_buffer_ptr = *to_copy_buffer_ptr; + let to_copy_buffer_len = *to_copy_buffer_len; + let pointer = *pointer; + let space_left = *space_left; + + // SAFETY: we know to_copy_buffer_ptr is valid because it refers to record_buf which lives at least as long as this function, + // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages. + let to_copy_buffer = unsafe { + std::slice::from_raw_parts(to_copy_buffer_ptr, to_copy_buffer_len) + }; + to_copy = space_left.min(to_copy_buffer_len); + // SAFETY: we know 'pointer' is valid because it refers to cell_payload which lives at least as long as this function, + // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages. + unsafe { std::ptr::copy(to_copy_buffer_ptr, pointer, to_copy) }; + + let left = to_copy_buffer.len() - to_copy; + if left == 0 { + break; + } + } + + // we still have bytes to add, we will need to allocate new overflow page + // FIXME: handle page cache is full + let overflow_page = return_if_io!(pager.allocate_overflow_page()); + turso_assert!(overflow_page.is_loaded(), "overflow page is not loaded"); + { + let id = overflow_page.get().id as u32; + let contents = overflow_page.get_contents(); + + // TODO: take into account offset here? + let buf = contents.as_ptr(); + let as_bytes = id.to_be_bytes(); + // update pointer to new overflow page + // SAFETY: we know 'pointer_to_next' is valid because it refers to an offset in cell_payload which is less than space_left + 4, + // and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages. + unsafe { std::ptr::copy(as_bytes.as_ptr(), *pointer_to_next, 4) }; + + *pointer = unsafe { buf.as_mut_ptr().add(4) }; + *pointer_to_next = buf.as_mut_ptr(); + *space_left = usable_space - 4; + } + + *to_copy_buffer_len -= to_copy; + // SAFETY: we know 'to_copy_buffer_ptr' is valid because it refers to record_buf which lives at least as long as this function, + // and that the offset is less than its length, and the underlying bytes are not mutated in FillCellPayloadState::AllocateOverflowPages. + *to_copy_buffer_ptr = unsafe { to_copy_buffer_ptr.add(to_copy) }; + } } - - // we still have bytes to add, we will need to allocate new overflow page - // FIXME: handle page cache is full - let overflow_page = pager.allocate_overflow_page(); - { - let id = overflow_page.get().id as u32; - let contents = overflow_page.get().contents.as_mut().unwrap(); - - // TODO: take into account offset here? 
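// --- Illustrative sketch (reviewer note, not part of the patch) ---
// The on-disk shape that the AllocateOverflowPages arm writes, modeled with
// plain Vecs instead of the pager. The local part of the cell ends with a
// 4-byte pointer to the first overflow page, and each overflow page starts
// with a 4-byte big-endian "next page" id (0 terminates the chain) followed by
// up to usable_space - 4 payload bytes.
fn split_into_overflow_pages(payload: &[u8], usable_space: usize) -> Vec<Vec<u8>> {
    let chunk = usable_space - 4;
    let chunks: Vec<&[u8]> = payload.chunks(chunk).collect();
    let mut pages = Vec::new();
    for (i, part) in chunks.iter().enumerate() {
        // Toy page numbering: the next chunk lives on "page" i + 2, 0 if last.
        let next_id: u32 = if i + 1 < chunks.len() { (i as u32) + 2 } else { 0 };
        let mut page = Vec::with_capacity(usable_space);
        page.extend_from_slice(&next_id.to_be_bytes());
        page.extend_from_slice(part);
        pages.push(page);
    }
    pages
}

fn main() {
    let spilled_payload = vec![7u8; 10_000];
    let pages = split_into_overflow_pages(&spilled_payload, 4096);
    let total: usize = pages.iter().map(|p| p.len() - 4).sum();
    assert_eq!(total, spilled_payload.len());
    for (i, p) in pages.iter().enumerate() {
        let next = u32::from_be_bytes([p[0], p[1], p[2], p[3]]);
        println!("overflow page {}: {} payload bytes, next = {}", i + 1, p.len() - 4, next);
    }
}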
- let buf = contents.as_ptr(); - let as_bytes = id.to_be_bytes(); - // update pointer to new overflow page - unsafe { std::ptr::copy(as_bytes.as_ptr(), pointer_to_next, 4) }; - - pointer = unsafe { buf.as_mut_ptr().add(4) }; - pointer_to_next = buf.as_mut_ptr(); - space_left = usable_space - 4; - } - - to_copy_buffer = &to_copy_buffer[to_copy..]; } - - assert_eq!(cell_size, cell_payload.len()); + Ok(IOResult::Done(())) } /// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages. @@ -6960,15 +7097,23 @@ mod tests { conn: &Arc, ) -> Vec { let mut payload: Vec = Vec::new(); - fill_cell_payload( - page, - Some(id as i64), - &mut payload, - pos, - &record, - 4096, - conn.pager.borrow().clone(), - ); + let mut fill_cell_payload_state = FillCellPayloadState::Start; + run_until_done( + || { + fill_cell_payload( + page, + Some(id as i64), + &mut payload, + pos, + &record, + 4096, + conn.pager.borrow().clone(), + &mut fill_cell_payload_state, + ) + }, + &conn.pager.borrow().clone(), + ) + .unwrap(); insert_into_cell(page, &payload, pos, 4096).unwrap(); payload } @@ -7209,7 +7354,7 @@ mod tests { // FIXME: handle page cache is full let _ = run_until_done(|| pager.allocate_page1(), &pager); - let page2 = pager.allocate_page().unwrap(); + let page2 = run_until_done(|| pager.allocate_page(), &pager).unwrap(); let page2 = Arc::new(BTreePageInner { page: RefCell::new(page2), }); @@ -8320,11 +8465,20 @@ mod tests { let mut cursor = BTreeCursor::new_table(None, pager.clone(), 2, num_columns); // Initialize page 2 as a root page (interior) - let root_page = cursor.allocate_page(PageType::TableInterior, 0)?; + let root_page = run_until_done( + || cursor.allocate_page(PageType::TableInterior, 0), + &cursor.pager, + )?; // Allocate two leaf pages - let page3 = cursor.allocate_page(PageType::TableLeaf, 0)?; - let page4 = cursor.allocate_page(PageType::TableLeaf, 0)?; + let page3 = run_until_done( + || cursor.allocate_page(PageType::TableLeaf, 0), + &cursor.pager, + )?; + let page4 = run_until_done( + || cursor.allocate_page(PageType::TableLeaf, 0), + &cursor.pager, + )?; // Configure the root page to point to the two leaf pages { @@ -8502,15 +8656,23 @@ mod tests { let regs = &[Register::Value(Value::Integer(i as i64))]; let record = ImmutableRecord::from_registers(regs, regs.len()); let mut payload: Vec = Vec::new(); - fill_cell_payload( - page, - Some(i as i64), - &mut payload, - cell_idx, - &record, - 4096, - conn.pager.borrow().clone(), - ); + let mut fill_cell_payload_state = FillCellPayloadState::Start; + run_until_done( + || { + fill_cell_payload( + page, + Some(i as i64), + &mut payload, + cell_idx, + &record, + 4096, + conn.pager.borrow().clone(), + &mut fill_cell_payload_state, + ) + }, + &conn.pager.borrow().clone(), + ) + .unwrap(); if (free as usize) < payload.len() + 2 { // do not try to insert overflow pages because they require balancing continue; @@ -8576,15 +8738,23 @@ mod tests { let regs = &[Register::Value(Value::Integer(i))]; let record = ImmutableRecord::from_registers(regs, regs.len()); let mut payload: Vec = Vec::new(); - fill_cell_payload( - page, - Some(i), - &mut payload, - cell_idx, - &record, - 4096, - conn.pager.borrow().clone(), - ); + let mut fill_cell_payload_state = FillCellPayloadState::Start; + run_until_done( + || { + fill_cell_payload( + page, + Some(i), + &mut payload, + cell_idx, + &record, + 4096, + conn.pager.borrow().clone(), + &mut fill_cell_payload_state, + ) + }, + &conn.pager.borrow().clone(), + ) + 
.unwrap(); if (free as usize) < payload.len() - 2 { // do not try to insert overflow pages because they require balancing continue; @@ -8941,15 +9111,23 @@ mod tests { let regs = &[Register::Value(Value::Integer(0))]; let record = ImmutableRecord::from_registers(regs, regs.len()); let mut payload: Vec = Vec::new(); - fill_cell_payload( - page.get().get_contents(), - Some(0), - &mut payload, - 0, - &record, - 4096, - conn.pager.borrow().clone(), - ); + let mut fill_cell_payload_state = FillCellPayloadState::Start; + run_until_done( + || { + fill_cell_payload( + page.get().get_contents(), + Some(0), + &mut payload, + 0, + &record, + 4096, + conn.pager.borrow().clone(), + &mut fill_cell_payload_state, + ) + }, + &conn.pager.borrow().clone(), + ) + .unwrap(); let page = page.get(); insert(0, page.get_contents()); defragment(page.get_contents()); @@ -9019,15 +9197,23 @@ mod tests { let regs = &[Register::Value(Value::Blob(vec![0; 3600]))]; let record = ImmutableRecord::from_registers(regs, regs.len()); let mut payload: Vec = Vec::new(); - fill_cell_payload( - page.get().get_contents(), - Some(0), - &mut payload, - 0, - &record, - 4096, - conn.pager.borrow().clone(), - ); + let mut fill_cell_payload_state = FillCellPayloadState::Start; + run_until_done( + || { + fill_cell_payload( + page.get().get_contents(), + Some(0), + &mut payload, + 0, + &record, + 4096, + conn.pager.borrow().clone(), + &mut fill_cell_payload_state, + ) + }, + &conn.pager.borrow().clone(), + ) + .unwrap(); insert_into_cell(page.get().get_contents(), &payload, 0, 4096).unwrap(); let free = compute_free_space(page.get().get_contents(), usable_space); let total_size = payload.len() + 2; @@ -9355,7 +9541,7 @@ mod tests { let mut cells_cloned = Vec::new(); let (pager, _, _, _) = empty_btree(); let page_type = PageType::TableLeaf; - let page = pager.allocate_page().unwrap(); + let page = run_until_done(|| pager.allocate_page(), &pager).unwrap(); let page = Arc::new(BTreePageInner { page: RefCell::new(page), }); @@ -9427,15 +9613,23 @@ mod tests { let mut payload = Vec::new(); let regs = &[Register::Value(Value::Blob(vec![0; size as usize]))]; let record = ImmutableRecord::from_registers(regs, regs.len()); - fill_cell_payload( - contents, - Some(cell_idx as i64), - &mut payload, - cell_idx as usize, - &record, - pager.usable_space(), - pager.clone(), - ); + let mut fill_cell_payload_state = FillCellPayloadState::Start; + run_until_done( + || { + fill_cell_payload( + contents, + Some(cell_idx as i64), + &mut payload, + cell_idx as usize, + &record, + pager.usable_space(), + pager.clone(), + &mut fill_cell_payload_state, + ) + }, + &pager, + ) + .unwrap(); insert_into_cell( contents, &payload, diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 3b4d4a06f..17f8e7d61 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -221,6 +221,10 @@ enum CheckpointState { } /// The mode of allocating a btree page. +/// SQLite defines the following: +/// #define BTALLOC_ANY 0 /* Allocate any page */ +/// #define BTALLOC_EXACT 1 /* Allocate exact page if possible */ +/// #define BTALLOC_LE 2 /* Allocate any page <= the parameter */ pub enum BtreePageAllocMode { /// Allocate any btree page Any, @@ -335,6 +339,9 @@ pub struct Pager { pub db_state: Arc, /// Mutex for synchronizing database initialization to prevent race conditions init_lock: Arc>, + /// The state of the current allocate page operation. + allocate_page_state: RefCell, + /// The state of the current allocate page1 operation. 
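// --- Illustrative sketch (reviewer note, not part of the patch) ---
// The driving loop the tests now wrap around fill_cell_payload and
// allocate_page. The crate's run_until_done takes the pager so it can call
// io.run_once(); this standalone version fakes that part and only shows the
// retry-until-Done shape.
use std::cell::Cell;

enum IOResult<T> {
    Done(T),
    IO,
}

fn run_until_done<T>(
    mut step: impl FnMut() -> Result<IOResult<T>, String>,
    mut run_once: impl FnMut(),
) -> Result<T, String> {
    loop {
        match step()? {
            IOResult::Done(v) => return Ok(v),
            IOResult::IO => run_once(), // let pending reads/writes complete
        }
    }
}

fn main() {
    let remaining_io = Cell::new(3u32);
    let value = run_until_done(
        || {
            if remaining_io.get() > 0 {
                Ok(IOResult::IO)
            } else {
                Ok(IOResult::Done(123u32))
            }
        },
        || remaining_io.set(remaining_io.get() - 1),
    )
    .unwrap();
    assert_eq!(value, 123);
    println!("completed with value {value} after pending IO drained");
}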
allocate_page1_state: RefCell, /// Cache page_size and reserved_space at Pager init and reuse for subsequent /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality @@ -355,6 +362,29 @@ pub enum PagerCommitResult { Rollback, } +#[derive(Debug, Clone)] +enum AllocatePageState { + Start, + /// Search the trunk page for an available free list leaf. + /// If none are found, there are two options: + /// - If there are no more trunk pages, the freelist is empty, so allocate a new page. + /// - If there are more trunk pages, use the current first trunk page as the new allocation, + /// and set the next trunk page as the database's "first freelist trunk page". + SearchAvailableFreeListLeaf { + trunk_page: PageRef, + current_db_size: u32, + }, + /// If a freelist leaf is found, reuse it for the page allocation and remove it from the trunk page. + ReuseFreelistLeaf { + trunk_page: PageRef, + number_of_freelist_leaves: u32, + }, + /// If a suitable freelist leaf is not found, allocate an entirely new page. + AllocateNewPage { + current_db_size: u32, + }, +} + #[derive(Clone)] enum AllocatePage1State { Start, @@ -421,6 +451,7 @@ impl Pager { dirty_pages: Vec::new(), }), free_page_state: RefCell::new(FreePageState::Start), + allocate_page_state: RefCell::new(AllocatePageState::Start), }) } @@ -612,9 +643,8 @@ impl Pager { }; #[cfg(feature = "omit_autovacuum")] { - let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?; - let page_id = page.get().get().id; - Ok(IOResult::Done(page_id as u32)) + let page = return_if_io!(self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)); + Ok(IOResult::Done(page.get().get().id as u32)) } // If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number @@ -623,9 +653,9 @@ impl Pager { let auto_vacuum_mode = self.auto_vacuum_mode.borrow(); match *auto_vacuum_mode { AutoVacuumMode::None => { - let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?; - let page_id = page.get().get().id; - Ok(IOResult::Done(page_id as u32)) + let page = + return_if_io!(self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)); + Ok(IOResult::Done(page.get().get().id as u32)) } AutoVacuumMode::Full => { let mut root_page_num = @@ -648,11 +678,11 @@ impl Pager { assert!(root_page_num >= 3); // the very first root page is page 3 // root_page_num here is the desired root page - let page = self.do_allocate_page( + let page = return_if_io!(self.do_allocate_page( page_type, 0, BtreePageAllocMode::Exact(root_page_num), - )?; + )); let allocated_page_id = page.get().get().id as u32; if allocated_page_id != root_page_num { // TODO(Zaid): Handle swapping the allocated page with the desired root page @@ -676,8 +706,8 @@ impl Pager { /// Allocate a new overflow page. /// This is done when a cell overflows and new space is needed. // FIXME: handle no room in page cache - pub fn allocate_overflow_page(&self) -> PageRef { - let page = self.allocate_page().unwrap(); + pub fn allocate_overflow_page(&self) -> Result> { + let page = return_if_io!(self.allocate_page()); tracing::debug!("Pager::allocate_overflow_page(id={})", page.get().id); // setup overflow page @@ -685,7 +715,7 @@ impl Pager { let buf = contents.as_ptr(); buf.fill(0); - page + Ok(IOResult::Done(page)) } /// Allocate a new page to the btree via the pager. 
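// --- Illustrative sketch (reviewer note, not part of the patch) ---
// The decision structure of the new AllocatePageState machine, modeled over an
// in-memory freelist instead of real pages. Trunk/leaf byte layout, header
// bookkeeping and IO are elided; only the three outcomes are shown: reuse a
// freelist leaf, reuse an empty trunk, or grow the database file.
struct Trunk {
    next_trunk: Option<usize>, // index into `trunks`; None models a 0 next-trunk pointer
    page_id: u32,
    leaves: Vec<u32>,
}

struct FakeDb {
    first_trunk: Option<usize>,
    trunks: Vec<Trunk>,
    db_size: u32,
}

fn allocate_page(db: &mut FakeDb) -> u32 {
    match db.first_trunk {
        None => {
            // Freelist empty: AllocateNewPage — extend the database file.
            db.db_size += 1;
            db.db_size
        }
        Some(t) => {
            if !db.trunks[t].leaves.is_empty() {
                // ReuseFreelistLeaf: take the first leaf pointer, shift the rest left.
                db.trunks[t].leaves.remove(0)
            } else {
                // No leaves left: reuse the trunk page itself and promote its
                // successor to "first freelist trunk page", as the patch does.
                db.first_trunk = db.trunks[t].next_trunk;
                db.trunks[t].page_id
            }
        }
    }
}

fn main() {
    let mut db = FakeDb {
        first_trunk: Some(0),
        trunks: vec![Trunk { next_trunk: None, page_id: 5, leaves: vec![9, 7] }],
        db_size: 10,
    };
    // Two leaves, then the trunk itself, then the file grows.
    assert_eq!(allocate_page(&mut db), 9);
    assert_eq!(allocate_page(&mut db), 7);
    assert_eq!(allocate_page(&mut db), 5);
    assert_eq!(allocate_page(&mut db), 11);
    println!("final db size: {}", db.db_size);
}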
@@ -696,8 +726,8 @@ impl Pager { page_type: PageType, offset: usize, _alloc_mode: BtreePageAllocMode, - ) -> Result { - let page = self.allocate_page()?; + ) -> Result> { + let page = return_if_io!(self.allocate_page()); let page = Arc::new(BTreePageInner { page: RefCell::new(page), }); @@ -707,7 +737,7 @@ impl Pager { page.get().get().id, page.get().get_contents().page_type() ); - Ok(page) + Ok(IOResult::Done(page)) } /// The "usable size" of a database page is the page size specified by the 2-byte integer at offset 16 @@ -1412,7 +1442,7 @@ impl Pager { if let Some(size) = self.page_size.get() { default_header.update_page_size(size); } - let page = allocate_page(1, &self.buffer_pool, 0); + let page = allocate_new_page(1, &self.buffer_pool, 0); let contents = page.get_contents(); contents.write_database_header(&default_header); @@ -1470,63 +1500,239 @@ impl Pager { ) } - /* - Gets a new page that increasing the size of the page or uses a free page. - Currently free list pages are not yet supported. - */ - // FIXME: handle no room in page cache + /// Tries to reuse a page from the freelist if available. + /// If not, allocates a new page which increases the database size. + /// + /// FIXME: implement sqlite's 'nearby' parameter and use AllocMode. + /// SQLite's allocate_page() equivalent has a parameter 'nearby' which is a hint about the page number we want to have for the allocated page. + /// We should use this parameter to allocate the page in the same way as SQLite does; instead now we just either take the first available freelist page + /// or allocate a new page. + /// FIXME: handle no room in page cache #[allow(clippy::readonly_write_lock)] #[instrument(skip_all, level = Level::DEBUG)] - pub fn allocate_page(&self) -> Result { - let old_db_size = header_accessor::get_database_size(self)?; - #[allow(unused_mut)] - let mut new_db_size = old_db_size + 1; + pub fn allocate_page(&self) -> Result> { + const FREELIST_TRUNK_OFFSET_NEXT_TRUNK: usize = 0; + const FREELIST_TRUNK_OFFSET_LEAF_COUNT: usize = 4; + const FREELIST_TRUNK_OFFSET_FIRST_LEAF: usize = 8; - tracing::debug!("allocate_page(database_size={})", new_db_size); + loop { + let mut state = self.allocate_page_state.borrow_mut(); + tracing::debug!("allocate_page(state={:?})", state); + match &mut *state { + AllocatePageState::Start => { + let old_db_size = header_accessor::get_database_size(self)?; + #[cfg(not(feature = "omit_autovacuum"))] + let mut new_db_size = old_db_size; + #[cfg(feature = "omit_autovacuum")] + let new_db_size = old_db_size; - #[cfg(not(feature = "omit_autovacuum"))] - { - // If the following conditions are met, allocate a pointer map page, add to cache and increment the database size - // - autovacuum is enabled - // - the last page is a pointer map page - if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full) - && is_ptrmap_page(new_db_size, header_accessor::get_page_size(self)? as usize) - { - let page = allocate_page(new_db_size as usize, &self.buffer_pool, 0); - self.add_dirty(&page); + tracing::debug!("allocate_page(database_size={})", new_db_size); + #[cfg(not(feature = "omit_autovacuum"))] + { + // If the following conditions are met, allocate a pointer map page, add to cache and increment the database size + // - autovacuum is enabled + // - the last page is a pointer map page + if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full) + && is_ptrmap_page( + new_db_size + 1, + header_accessor::get_page_size(self)? 
as usize, + ) + { + // we will allocate a ptrmap page, so increment size + new_db_size += 1; + let page = + allocate_new_page(new_db_size as usize, &self.buffer_pool, 0); + self.add_dirty(&page); + let page_key = PageCacheKey::new(page.get().id); + let mut cache = self.page_cache.write(); + match cache.insert(page_key, page.clone()) { + Ok(_) => (), + Err(CacheError::Full) => return Err(LimboError::CacheFull), + Err(_) => { + return Err(LimboError::InternalError( + "Unknown error inserting page to cache".into(), + )) + } + } + } + } - let page_key = PageCacheKey::new(page.get().id); - let mut cache = self.page_cache.write(); - match cache.insert(page_key, page.clone()) { - Ok(_) => (), - Err(CacheError::Full) => return Err(LimboError::CacheFull), - Err(_) => { - return Err(LimboError::InternalError( - "Unknown error inserting page to cache".into(), - )) + let first_freelist_trunk_page_id = + header_accessor::get_freelist_trunk_page(self)?; + if first_freelist_trunk_page_id == 0 { + *state = AllocatePageState::AllocateNewPage { + current_db_size: new_db_size, + }; + continue; + } + let trunk_page = self.read_page(first_freelist_trunk_page_id as usize)?; + *state = AllocatePageState::SearchAvailableFreeListLeaf { + trunk_page, + current_db_size: new_db_size, + }; + continue; + } + AllocatePageState::SearchAvailableFreeListLeaf { + trunk_page, + current_db_size, + } => { + if trunk_page.is_locked() { + return Ok(IOResult::IO); + } + turso_assert!( + trunk_page.is_loaded(), + "Freelist trunk page {} is not loaded", + trunk_page.get().id + ); + let page_contents = trunk_page.get().contents.as_ref().unwrap(); + let next_trunk_page_id = + page_contents.read_u32(FREELIST_TRUNK_OFFSET_NEXT_TRUNK); + let number_of_freelist_leaves = + page_contents.read_u32(FREELIST_TRUNK_OFFSET_LEAF_COUNT); + + // There are leaf pointers on this trunk page, so we can reuse one of the pages + // for the allocation. + if number_of_freelist_leaves != 0 { + *state = AllocatePageState::ReuseFreelistLeaf { + trunk_page: trunk_page.clone(), + number_of_freelist_leaves, + }; + continue; + } + + // No freelist leaves on this trunk page. + // If the freelist is completely empty, allocate a new page. + if next_trunk_page_id == 0 { + *state = AllocatePageState::AllocateNewPage { + current_db_size: *current_db_size, + }; + continue; + } + + // Freelist is not empty, so we can reuse the trunk itself as a new page + // and update the database's first freelist trunk page to the next trunk page. + header_accessor::set_freelist_trunk_page(self, next_trunk_page_id)?; + header_accessor::set_freelist_pages( + self, + header_accessor::get_freelist_pages(self)? 
- 1, + )?; + self.add_dirty(trunk_page); + // zero out the page + turso_assert!( + trunk_page.get_contents().overflow_cells.is_empty(), + "Freelist leaf page {} has overflow cells", + trunk_page.get().id + ); + trunk_page.get().contents.as_ref().unwrap().as_ptr().fill(0); + let page_key = PageCacheKey::new(trunk_page.get().id); + { + let mut page_cache = self.page_cache.write(); + turso_assert!( + page_cache.contains_key(&page_key), + "page {} is not in cache", + trunk_page.get().id + ); + } + let trunk_page = trunk_page.clone(); + *state = AllocatePageState::Start; + return Ok(IOResult::Done(trunk_page)); + } + AllocatePageState::ReuseFreelistLeaf { + trunk_page, + number_of_freelist_leaves, + } => { + turso_assert!( + trunk_page.is_loaded(), + "Freelist trunk page {} is not loaded", + trunk_page.get().id + ); + turso_assert!( + *number_of_freelist_leaves > 0, + "Freelist trunk page {} has no leaves", + trunk_page.get().id + ); + let page_contents = trunk_page.get().contents.as_ref().unwrap(); + let next_leaf_page_id = + page_contents.read_u32(FREELIST_TRUNK_OFFSET_FIRST_LEAF); + let leaf_page = self.read_page(next_leaf_page_id as usize)?; + if leaf_page.is_locked() { + return Ok(IOResult::IO); + } + self.add_dirty(&leaf_page); + // zero out the page + turso_assert!( + leaf_page.get_contents().overflow_cells.is_empty(), + "Freelist leaf page {} has overflow cells", + leaf_page.get().id + ); + leaf_page.get().contents.as_ref().unwrap().as_ptr().fill(0); + let page_key = PageCacheKey::new(leaf_page.get().id); + { + let mut page_cache = self.page_cache.write(); + turso_assert!( + page_cache.contains_key(&page_key), + "page {} is not in cache", + leaf_page.get().id + ); + } + + // Shift left all the other leaf pages in the trunk page and subtract 1 from the leaf count + let remaining_leaves_count = (*number_of_freelist_leaves - 1) as usize; + { + let buf = page_contents.as_ptr(); + // use copy within the same page + const LEAF_PTR_SIZE_BYTES: usize = 4; + let offset_remaining_leaves_start = + FREELIST_TRUNK_OFFSET_FIRST_LEAF + LEAF_PTR_SIZE_BYTES; + let offset_remaining_leaves_end = offset_remaining_leaves_start + + remaining_leaves_count * LEAF_PTR_SIZE_BYTES; + buf.copy_within( + offset_remaining_leaves_start..offset_remaining_leaves_end, + FREELIST_TRUNK_OFFSET_FIRST_LEAF, + ); + } + // write the new leaf count + page_contents.write_u32( + FREELIST_TRUNK_OFFSET_LEAF_COUNT, + remaining_leaves_count as u32, + ); + self.add_dirty(trunk_page); + + header_accessor::set_freelist_pages( + self, + header_accessor::get_freelist_pages(self)? 
- 1, + )?; + + *state = AllocatePageState::Start; + return Ok(IOResult::Done(leaf_page)); + } + AllocatePageState::AllocateNewPage { current_db_size } => { + let new_db_size = *current_db_size + 1; + // FIXME: should reserve page cache entry before modifying the database + let page = allocate_new_page(new_db_size as usize, &self.buffer_pool, 0); + { + // setup page and add to cache + self.add_dirty(&page); + + let page_key = PageCacheKey::new(page.get().id); + { + // Run in separate block to avoid deadlock on page cache write lock + let mut cache = self.page_cache.write(); + match cache.insert(page_key, page.clone()) { + Err(CacheError::Full) => return Err(LimboError::CacheFull), + Err(_) => { + return Err(LimboError::InternalError( + "Unknown error inserting page to cache".into(), + )) + } + Ok(_) => {} + }; + } + header_accessor::set_database_size(self, new_db_size)?; + *state = AllocatePageState::Start; + return Ok(IOResult::Done(page)); } } - // we allocated a ptrmap page, so the next data page will be at new_db_size + 1 - new_db_size += 1; - } - } - - header_accessor::set_database_size(self, new_db_size)?; - - // FIXME: should reserve page cache entry before modifying the database - let page = allocate_page(new_db_size as usize, &self.buffer_pool, 0); - { - // setup page and add to cache - self.add_dirty(&page); - - let page_key = PageCacheKey::new(page.get().id); - let mut cache = self.page_cache.write(); - match cache.insert(page_key, page.clone()) { - Err(CacheError::Full) => Err(LimboError::CacheFull), - Err(_) => Err(LimboError::InternalError( - "Unknown error inserting page to cache".into(), - )), - Ok(_) => Ok(page), } } } @@ -1594,10 +1800,11 @@ impl Pager { in_flight_writes: Rc::new(RefCell::new(0)), dirty_pages: Vec::new(), }); + self.allocate_page_state.replace(AllocatePageState::Start); } } -pub fn allocate_page(page_id: usize, buffer_pool: &Arc, offset: usize) -> PageRef { +pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc, offset: usize) -> PageRef { let page = Arc::new(Page::new(page_id)); { let buffer = buffer_pool.get(); @@ -1913,11 +2120,14 @@ mod ptrmap_tests { pager.set_auto_vacuum_mode(AutoVacuumMode::Full); // Allocate all the pages as btree root pages - for _ in 0..initial_db_pages { - match pager.btree_create(&CreateBTreeFlags::new_table()) { - Ok(IOResult::Done(_root_page_id)) => (), - Ok(IOResult::IO) => { - panic!("test_pager_setup: btree_create returned IOResult::IO unexpectedly"); + const EXPECTED_FIRST_ROOT_PAGE_ID: u32 = 3; // page1 = 1, first ptrmap page = 2, root page = 3 + for i in 0..initial_db_pages { + match run_until_done( + || pager.btree_create(&CreateBTreeFlags::new_table()), + &pager, + ) { + Ok(root_page_id) => { + assert_eq!(root_page_id, EXPECTED_FIRST_ROOT_PAGE_ID + i); } Err(e) => { panic!("test_pager_setup: btree_create failed: {e:?}");
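// --- Illustrative sketch (reviewer note, not part of the patch) ---
// The byte-level trunk page manipulation behind the FREELIST_TRUNK_OFFSET_*
// constants and the ReuseFreelistLeaf arm: bytes 0..4 hold the next trunk page
// id, 4..8 the leaf count, and 8.. the array of 4-byte big-endian leaf page
// ids. Taking the first leaf shifts the remaining pointers left with
// copy_within and decrements the count, as in the patch; the pager, cache and
// header updates are left out here.
const OFFSET_NEXT_TRUNK: usize = 0;
const OFFSET_LEAF_COUNT: usize = 4;
const OFFSET_FIRST_LEAF: usize = 8;
const LEAF_PTR_SIZE_BYTES: usize = 4;

fn read_u32(page: &[u8], offset: usize) -> u32 {
    u32::from_be_bytes(page[offset..offset + 4].try_into().unwrap())
}

fn take_first_leaf(trunk: &mut [u8]) -> u32 {
    let count = read_u32(trunk, OFFSET_LEAF_COUNT);
    assert!(count > 0, "trunk has no leaves");
    let first = read_u32(trunk, OFFSET_FIRST_LEAF);

    // Shift the remaining pointers left over the slot we just consumed.
    let remaining = (count - 1) as usize;
    let src_start = OFFSET_FIRST_LEAF + LEAF_PTR_SIZE_BYTES;
    let src_end = src_start + remaining * LEAF_PTR_SIZE_BYTES;
    trunk.copy_within(src_start..src_end, OFFSET_FIRST_LEAF);

    // Write back the decremented leaf count.
    trunk[OFFSET_LEAF_COUNT..OFFSET_LEAF_COUNT + 4]
        .copy_from_slice(&(remaining as u32).to_be_bytes());
    first
}

fn main() {
    // Fake 4096-byte trunk page: next trunk = 17, three leaves: 9, 12, 30.
    let mut trunk = vec![0u8; 4096];
    trunk[OFFSET_NEXT_TRUNK..OFFSET_NEXT_TRUNK + 4].copy_from_slice(&17u32.to_be_bytes());
    trunk[OFFSET_LEAF_COUNT..OFFSET_LEAF_COUNT + 4].copy_from_slice(&3u32.to_be_bytes());
    for (i, leaf) in [9u32, 12, 30].iter().enumerate() {
        let at = OFFSET_FIRST_LEAF + i * LEAF_PTR_SIZE_BYTES;
        trunk[at..at + 4].copy_from_slice(&leaf.to_be_bytes());
    }

    assert_eq!(read_u32(&trunk, OFFSET_NEXT_TRUNK), 17);
    assert_eq!(take_first_leaf(&mut trunk), 9);
    assert_eq!(take_first_leaf(&mut trunk), 12);
    assert_eq!(take_first_leaf(&mut trunk), 30);
    assert_eq!(read_u32(&trunk, OFFSET_LEAF_COUNT), 0);
    println!("all leaves consumed");
}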