diff --git a/core/storage/btree.rs b/core/storage/btree.rs
index 80fc27e56..5a47d6665 100644
--- a/core/storage/btree.rs
+++ b/core/storage/btree.rs
@@ -2234,7 +2234,7 @@ impl BTreeCursor {
             }
             WriteState::Insert { page, cell_idx } => {
                 let mut cell_payload: Vec<u8> = Vec::with_capacity(record_values.len() + 4);
-                fill_cell_payload(
+                return_if_io!(fill_cell_payload(
                     page.get().get().contents.as_ref().unwrap(),
                     bkey.maybe_rowid(),
                     &mut cell_payload,
@@ -2242,7 +2242,7 @@ impl BTreeCursor {
                     record,
                     self.usable_space(),
                     self.pager.clone(),
-                );
+                ));

                 {
                     let page = page.get();
@@ -3160,7 +3160,17 @@ impl BTreeCursor {
                         pages_to_balance_new[i].replace(page.clone());
                     } else {
                         // FIXME: handle page cache is full
-                        let page = self.allocate_page(page_type, 0)?;
+                        let mut page = self.allocate_page(page_type, 0)?;
+                        // FIXME: add new state machine state instead of this sync IO hack
+                        while matches!(page, IOResult::IO) {
+                            self.pager.io.run_once()?;
+                            page = self.allocate_page(page_type, 0)?;
+                        }
+                        let IOResult::Done(page) = page else {
+                            return Err(LimboError::InternalError(
+                                "Failed to allocate page".into(),
+                            ));
+                        };
                         pages_to_balance_new[i].replace(page);
                         // Since this page didn't exist before, we can set it to cells length as it
                         // marks them as empty since it is a prefix sum of cells.
@@ -4030,7 +4040,7 @@ impl BTreeCursor {
     /// Balance the root page.
     /// This is done when the root page overflows, and we need to create a new root page.
     /// See e.g. https://en.wikipedia.org/wiki/B-tree
-    fn balance_root(&mut self) -> Result<()> {
+    fn balance_root(&mut self) -> Result<IOResult<()>> {
         /* todo: balance deeper, create child and copy contents of root there. Then split root */
         /* if we are in root page then we just need to create a new root and push key there */

@@ -4045,9 +4055,19 @@ impl BTreeCursor {
         let root = root_btree.get();
         let root_contents = root.get_contents();
         // FIXME: handle page cache is full
-        let child_btree =
-            self.pager
-                .do_allocate_page(root_contents.page_type(), 0, BtreePageAllocMode::Any)?;
+        // FIXME: remove sync IO hack
+        let child_btree = loop {
+            match self.pager.do_allocate_page(
+                root_contents.page_type(),
+                0,
+                BtreePageAllocMode::Any,
+            )? {
+                IOResult::IO => {
+                    self.pager.io.run_once()?;
+                }
+                IOResult::Done(page) => break page,
+            }
+        };

         tracing::debug!(
             "balance_root(root={}, rightmost={}, page_type={:?})",
@@ -4108,7 +4128,7 @@ impl BTreeCursor {
         self.stack.push(root_btree.clone());
         self.stack.set_cell_index(0); // leave parent pointing at the rightmost pointer (in this case 0, as there are no cells), since we will be balancing the rightmost child page.
         self.stack.push(child_btree.clone());
-        Ok(())
+        Ok(IOResult::Done(()))
     }

     fn usable_space(&self) -> usize {
@@ -5163,7 +5183,7 @@ impl BTreeCursor {
         let serial_types_len = self.record_cursor.borrow_mut().len(record);
         let mut new_payload = Vec::with_capacity(serial_types_len);
         let rowid = return_if_io!(self.rowid());
-        fill_cell_payload(
+        return_if_io!(fill_cell_payload(
             page_contents,
             rowid,
             &mut new_payload,
@@ -5171,7 +5191,7 @@ impl BTreeCursor {
             record,
             self.usable_space(),
             self.pager.clone(),
-        );
+        ));
         // figure out old cell offset & size
         let (old_offset, old_local_size) = {
             let page_ref = page_ref.get();
@@ -5393,7 +5413,7 @@ impl BTreeCursor {
         btree_read_page(&self.pager, page_idx)
     }

-    pub fn allocate_page(&self, page_type: PageType, offset: usize) -> Result<BTreePage> {
+    pub fn allocate_page(&self, page_type: PageType, offset: usize) -> Result<IOResult<BTreePage>> {
         self.pager
             .do_allocate_page(page_type, offset, BtreePageAllocMode::Any)
     }
@@ -6711,7 +6731,7 @@ fn fill_cell_payload(
     record: &ImmutableRecord,
     usable_space: usize,
     pager: Rc<Pager>,
-) {
+) -> Result<IOResult<()>> {
     // TODO: make record raw from start, having to serialize is not good
     let record_buf = record.get_payload().to_vec();

@@ -6740,7 +6760,7 @@ fn fill_cell_payload(
     if record_buf.len() <= payload_overflow_threshold_max {
         // enough allowed space to fit inside a btree page
         cell_payload.extend_from_slice(record_buf.as_slice());
-        return;
+        return Ok(IOResult::Done(()));
     }

     let payload_overflow_threshold_min = payload_overflow_threshold_min(page_type, usable_space);
@@ -6772,7 +6792,9 @@ fn fill_cell_payload(

             // we still have bytes to add, we will need to allocate new overflow page
             // FIXME: handle page cache is full
-            let overflow_page = pager.allocate_overflow_page();
+            // FIXME: not re-entrant!!!!!!!!!!!!!!!
+            let overflow_page = return_if_io!(pager.allocate_overflow_page());
+            turso_assert!(overflow_page.is_loaded(), "overflow page is not loaded");
             {
                 let id = overflow_page.get().id as u32;
                 let contents = overflow_page.get().contents.as_mut().unwrap();
@@ -6792,6 +6814,7 @@ fn fill_cell_payload(
     }

     assert_eq!(cell_size, cell_payload.len());
+    Ok(IOResult::Done(()))
 }

 /// Returns the maximum payload size (X) that can be stored directly on a b-tree page without spilling to overflow pages.
@@ -6960,15 +6983,21 @@ mod tests {
         conn: &Arc<Connection>,
     ) -> Vec<u8> {
         let mut payload: Vec<u8> = Vec::new();
-        fill_cell_payload(
-            page,
-            Some(id as i64),
-            &mut payload,
-            pos,
-            &record,
-            4096,
-            conn.pager.borrow().clone(),
-        );
+        run_until_done(
+            || {
+                fill_cell_payload(
+                    page,
+                    Some(id as i64),
+                    &mut payload,
+                    pos,
+                    &record,
+                    4096,
+                    conn.pager.borrow().clone(),
+                )
+            },
+            &conn.pager.borrow().clone(),
+        )
+        .unwrap();
         insert_into_cell(page, &payload, pos, 4096).unwrap();
         payload
     }
@@ -7209,7 +7238,7 @@ mod tests {

         // FIXME: handle page cache is full
         let _ = run_until_done(|| pager.allocate_page1(), &pager);
-        let page2 = pager.allocate_page().unwrap();
+        let page2 = run_until_done(|| pager.allocate_page(), &pager).unwrap();
         let page2 = Arc::new(BTreePageInner {
             page: RefCell::new(page2),
         });
@@ -8320,11 +8349,20 @@ mod tests {
         let mut cursor = BTreeCursor::new_table(None, pager.clone(), 2, num_columns);

         // Initialize page 2 as a root page (interior)
-        let root_page = cursor.allocate_page(PageType::TableInterior, 0)?;
+        let root_page = run_until_done(
+            || cursor.allocate_page(PageType::TableInterior, 0),
+            &cursor.pager,
+        )?;

         // Allocate two leaf pages
-        let page3 = cursor.allocate_page(PageType::TableLeaf, 0)?;
-        let page4 = cursor.allocate_page(PageType::TableLeaf, 0)?;
+        let page3 = run_until_done(
+            || cursor.allocate_page(PageType::TableLeaf, 0),
+            &cursor.pager,
+        )?;
+        let page4 = run_until_done(
+            || cursor.allocate_page(PageType::TableLeaf, 0),
+            &cursor.pager,
+        )?;

         // Configure the root page to point to the two leaf pages
         {
@@ -8502,15 +8540,21 @@ mod tests {
            let regs = &[Register::Value(Value::Integer(i as i64))];
            let record = ImmutableRecord::from_registers(regs, regs.len());
            let mut payload: Vec<u8> = Vec::new();
-            fill_cell_payload(
-                page,
-                Some(i as i64),
-                &mut payload,
-                cell_idx,
-                &record,
-                4096,
-                conn.pager.borrow().clone(),
-            );
+            run_until_done(
+                || {
+                    fill_cell_payload(
+                        page,
+                        Some(i as i64),
+                        &mut payload,
+                        cell_idx,
+                        &record,
+                        4096,
+                        conn.pager.borrow().clone(),
+                    )
+                },
+                &conn.pager.borrow().clone(),
+            )
+            .unwrap();
            if (free as usize) < payload.len() + 2 {
                // do not try to insert overflow pages because they require balancing
                continue;
@@ -8576,15 +8620,21 @@ mod tests {
            let regs = &[Register::Value(Value::Integer(i))];
            let record = ImmutableRecord::from_registers(regs, regs.len());
            let mut payload: Vec<u8> = Vec::new();
-            fill_cell_payload(
-                page,
-                Some(i),
-                &mut payload,
-                cell_idx,
-                &record,
-                4096,
-                conn.pager.borrow().clone(),
-            );
+            run_until_done(
+                || {
+                    fill_cell_payload(
+                        page,
+                        Some(i),
+                        &mut payload,
+                        cell_idx,
+                        &record,
+                        4096,
+                        conn.pager.borrow().clone(),
+                    )
+                },
+                &conn.pager.borrow().clone(),
+            )
+            .unwrap();
            if (free as usize) < payload.len() - 2 {
                // do not try to insert overflow pages because they require balancing
                continue;
@@ -8941,15 +8991,21 @@ mod tests {
         let regs = &[Register::Value(Value::Integer(0))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
         let mut payload: Vec<u8> = Vec::new();
-        fill_cell_payload(
-            page.get().get_contents(),
-            Some(0),
-            &mut payload,
-            0,
-            &record,
-            4096,
-            conn.pager.borrow().clone(),
-        );
+        run_until_done(
+            || {
+                fill_cell_payload(
+                    page.get().get_contents(),
+                    Some(0),
+                    &mut payload,
+                    0,
+                    &record,
+                    4096,
+                    conn.pager.borrow().clone(),
+                )
+            },
+            &conn.pager.borrow().clone(),
+        )
+        .unwrap();
         let page = page.get();
         insert(0, page.get_contents());
         defragment(page.get_contents());
@@ -9019,15 +9075,21 @@ mod tests {
         let regs = &[Register::Value(Value::Blob(vec![0; 3600]))];
         let record = ImmutableRecord::from_registers(regs, regs.len());
         let mut payload: Vec<u8> = Vec::new();
-        fill_cell_payload(
-            page.get().get_contents(),
-            Some(0),
-            &mut payload,
-            0,
-            &record,
-            4096,
-            conn.pager.borrow().clone(),
-        );
+        run_until_done(
+            || {
+                fill_cell_payload(
+                    page.get().get_contents(),
+                    Some(0),
+                    &mut payload,
+                    0,
+                    &record,
+                    4096,
+                    conn.pager.borrow().clone(),
+                )
+            },
+            &conn.pager.borrow().clone(),
+        )
+        .unwrap();
         insert_into_cell(page.get().get_contents(), &payload, 0, 4096).unwrap();
         let free = compute_free_space(page.get().get_contents(), usable_space);
         let total_size = payload.len() + 2;
@@ -9355,7 +9417,7 @@ mod tests {
         let mut cells_cloned = Vec::new();
         let (pager, _, _, _) = empty_btree();
         let page_type = PageType::TableLeaf;
-        let page = pager.allocate_page().unwrap();
+        let page = run_until_done(|| pager.allocate_page(), &pager).unwrap();
         let page = Arc::new(BTreePageInner {
             page: RefCell::new(page),
         });
@@ -9427,15 +9489,21 @@ mod tests {
                let mut payload = Vec::new();
                let regs = &[Register::Value(Value::Blob(vec![0; size as usize]))];
                let record = ImmutableRecord::from_registers(regs, regs.len());
-                fill_cell_payload(
-                    contents,
-                    Some(cell_idx as i64),
-                    &mut payload,
-                    cell_idx as usize,
-                    &record,
-                    pager.usable_space(),
-                    pager.clone(),
-                );
+                run_until_done(
+                    || {
+                        fill_cell_payload(
+                            contents,
+                            Some(cell_idx as i64),
+                            &mut payload,
+                            cell_idx as usize,
+                            &record,
+                            pager.usable_space(),
+                            pager.clone(),
+                        )
+                    },
+                    &pager,
+                )
+                .unwrap();
                insert_into_cell(
                    contents,
                    &payload,
diff --git a/core/storage/pager.rs b/core/storage/pager.rs
index 3b4d4a06f..ee4269dff 100644
--- a/core/storage/pager.rs
+++ b/core/storage/pager.rs
@@ -335,6 +335,9 @@ pub struct Pager {
     pub db_state: Arc<AtomicDbState>,
     /// Mutex for synchronizing database initialization to prevent race conditions
     init_lock: Arc<Mutex<()>>,
+    /// The state of the current allocate page operation.
+    allocate_page_state: RefCell<AllocatePageState>,
+    /// The state of the current allocate page1 operation.
     allocate_page1_state: RefCell<AllocatePage1State>,
     /// Cache page_size and reserved_space at Pager init and reuse for subsequent
     /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality
@@ -355,6 +358,34 @@ pub enum PagerCommitResult {
     Rollback,
 }

+#[derive(Debug, Clone)]
+enum AllocatePageState {
+    Start,
+    /// Load the first freelist trunk page into memory.
+    LoadFreelistTrunkPage {
+        current_trunk: u32,
+        current_db_size: u32,
+    },
+    /// Search the trunk page for an available free list leaf.
+    /// If none are found, there are two options:
+    /// - If there are no more trunk pages, the freelist is empty, so allocate a new page.
+    /// - If there are more trunk pages, use the current first trunk page as the new allocation,
+    ///   and set the next trunk page as the database's "first freelist trunk page".
+    SearchAvailableFreeListLeaf {
+        trunk_page: PageRef,
+        current_db_size: u32,
+    },
+    /// If a freelist leaf is found, reuse it for the page allocation and remove it from the trunk page.
+    ReuseFreelistLeaf {
+        trunk_page: PageRef,
+        number_of_freelist_leaves: u32,
+    },
+    /// If a suitable freelist leaf is not found, allocate an entirely new page.
+    AllocateNewPage {
+        current_db_size: u32,
+    },
+}
+
 #[derive(Clone)]
 enum AllocatePage1State {
     Start,
@@ -421,6 +452,7 @@ impl Pager {
                 dirty_pages: Vec::new(),
             }),
             free_page_state: RefCell::new(FreePageState::Start),
+            allocate_page_state: RefCell::new(AllocatePageState::Start),
         })
     }

@@ -612,9 +644,8 @@ impl Pager {
         };
         #[cfg(feature = "omit_autovacuum")]
         {
-            let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
-            let page_id = page.get().get().id;
-            Ok(IOResult::Done(page_id as u32))
+            let page = return_if_io!(self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any));
+            Ok(IOResult::Done(page.get().get().id as u32))
         }

         // If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number
@@ -623,9 +654,9 @@ impl Pager {
            let auto_vacuum_mode = self.auto_vacuum_mode.borrow();
            match *auto_vacuum_mode {
                AutoVacuumMode::None => {
-                    let page = self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any)?;
-                    let page_id = page.get().get().id;
-                    Ok(IOResult::Done(page_id as u32))
+                    let page =
+                        return_if_io!(self.do_allocate_page(page_type, 0, BtreePageAllocMode::Any));
+                    Ok(IOResult::Done(page.get().get().id as u32))
                }
                AutoVacuumMode::Full => {
                    let mut root_page_num =
@@ -648,11 +679,11 @@ impl Pager {
                    assert!(root_page_num >= 3); // the very first root page is page 3

                    // root_page_num here is the desired root page
-                    let page = self.do_allocate_page(
+                    let page = return_if_io!(self.do_allocate_page(
                        page_type,
                        0,
                        BtreePageAllocMode::Exact(root_page_num),
-                    )?;
+                    ));
                    let allocated_page_id = page.get().get().id as u32;
                    if allocated_page_id != root_page_num {
                        // TODO(Zaid): Handle swapping the allocated page with the desired root page
@@ -676,8 +707,8 @@ impl Pager {
     /// Allocate a new overflow page.
     /// This is done when a cell overflows and new space is needed.
     // FIXME: handle no room in page cache
-    pub fn allocate_overflow_page(&self) -> PageRef {
-        let page = self.allocate_page().unwrap();
+    pub fn allocate_overflow_page(&self) -> Result<IOResult<PageRef>> {
+        let page = return_if_io!(self.allocate_page());
         tracing::debug!("Pager::allocate_overflow_page(id={})", page.get().id);

         // setup overflow page
@@ -685,7 +716,7 @@ impl Pager {
         let buf = contents.as_ptr();
         buf.fill(0);

-        page
+        Ok(IOResult::Done(page))
     }

     /// Allocate a new page to the btree via the pager.
@@ -696,8 +727,8 @@ impl Pager {
         page_type: PageType,
         offset: usize,
         _alloc_mode: BtreePageAllocMode,
-    ) -> Result<BTreePage> {
-        let page = self.allocate_page()?;
+    ) -> Result<IOResult<BTreePage>> {
+        let page = return_if_io!(self.allocate_page());
         let page = Arc::new(BTreePageInner {
             page: RefCell::new(page),
         });
@@ -707,7 +738,7 @@ impl Pager {
             page.get().get().id,
             page.get().get_contents().page_type()
         );
-        Ok(page)
+        Ok(IOResult::Done(page))
     }

     /// The "usable size" of a database page is the page size specified by the 2-byte integer at offset 16
@@ -1412,7 +1443,7 @@ impl Pager {
         if let Some(size) = self.page_size.get() {
             default_header.update_page_size(size);
         }
-        let page = allocate_page(1, &self.buffer_pool, 0);
+        let page = allocate_new_page(1, &self.buffer_pool, 0);
         let contents = page.get_contents();

         contents.write_database_header(&default_header);
@@ -1470,63 +1501,249 @@ impl Pager {
         )
     }

-    /*
-        Gets a new page that increasing the size of the page or uses a free page.
-        Currently free list pages are not yet supported.
-    */
-    // FIXME: handle no room in page cache
+    /// Tries to reuse a page from the freelist if available.
+    /// If not, allocates a new page which increases the database size.
+    ///
+    /// FIXME: implement sqlite's 'nearby' parameter and use AllocMode.
+    /// SQLite's allocate_page() equivalent has a parameter 'nearby' which is a hint about the page number we want to have for the allocated page.
+    /// We should use this parameter to allocate the page in the same way as SQLite does; instead now we just either take the first available freelist page
+    /// or allocate a new page.
+    /// FIXME: handle no room in page cache
     #[allow(clippy::readonly_write_lock)]
     #[instrument(skip_all, level = Level::DEBUG)]
-    pub fn allocate_page(&self) -> Result<PageRef> {
-        let old_db_size = header_accessor::get_database_size(self)?;
-        #[allow(unused_mut)]
-        let mut new_db_size = old_db_size + 1;
+    pub fn allocate_page(&self) -> Result<IOResult<PageRef>> {
+        const FREELIST_TRUNK_OFFSET_NEXT_TRUNK: usize = 0;
+        const FREELIST_TRUNK_OFFSET_LEAF_COUNT: usize = 4;
+        const FREELIST_TRUNK_OFFSET_FIRST_LEAF: usize = 8;

-        tracing::debug!("allocate_page(database_size={})", new_db_size);
+        loop {
+            let mut state = self.allocate_page_state.borrow_mut();
+            tracing::debug!("allocate_page(state={:?})", state);
+            match &mut *state {
+                AllocatePageState::Start => {
+                    let old_db_size = header_accessor::get_database_size(self)?;
+                    #[cfg(not(feature = "omit_autovacuum"))]
+                    let mut new_db_size = old_db_size;
+                    #[cfg(feature = "omit_autovacuum")]
+                    let new_db_size = old_db_size;

-        #[cfg(not(feature = "omit_autovacuum"))]
-        {
-            // If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
-            // - autovacuum is enabled
-            // - the last page is a pointer map page
-            if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
-                && is_ptrmap_page(new_db_size, header_accessor::get_page_size(self)? as usize)
-            {
-                let page = allocate_page(new_db_size as usize, &self.buffer_pool, 0);
-                self.add_dirty(&page);
+                    tracing::debug!("allocate_page(database_size={})", new_db_size);
+                    #[cfg(not(feature = "omit_autovacuum"))]
+                    {
+                        // If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
+                        // - autovacuum is enabled
+                        // - the last page is a pointer map page
+                        if matches!(*self.auto_vacuum_mode.borrow(), AutoVacuumMode::Full)
+                            && is_ptrmap_page(
+                                new_db_size,
+                                header_accessor::get_page_size(self)? as usize,
+                            )
+                        {
+                            let page =
+                                allocate_new_page(new_db_size as usize, &self.buffer_pool, 0);
+                            self.add_dirty(&page);
+                            let page_key = PageCacheKey::new(page.get().id);
+                            let mut cache = self.page_cache.write();
+                            match cache.insert(page_key, page.clone()) {
+                                Ok(_) => (),
+                                Err(CacheError::Full) => return Err(LimboError::CacheFull),
+                                Err(_) => {
+                                    return Err(LimboError::InternalError(
+                                        "Unknown error inserting page to cache".into(),
+                                    ))
+                                }
+                            }
+                            // we allocated a ptrmap page, so the next data page will be at new_db_size + 1
+                            new_db_size += 1;
+                        }
+                    }

-                let page_key = PageCacheKey::new(page.get().id);
-                let mut cache = self.page_cache.write();
-                match cache.insert(page_key, page.clone()) {
-                    Ok(_) => (),
-                    Err(CacheError::Full) => return Err(LimboError::CacheFull),
-                    Err(_) => {
-                        return Err(LimboError::InternalError(
-                            "Unknown error inserting page to cache".into(),
-                        ))
+                    let first_freelist_trunk_page_id =
+                        header_accessor::get_freelist_trunk_page(self)?;
+                    if first_freelist_trunk_page_id == 0 {
+                        *state = AllocatePageState::AllocateNewPage {
+                            current_db_size: new_db_size,
+                        };
+                        continue;
+                    }
+                    *state = AllocatePageState::LoadFreelistTrunkPage {
+                        current_trunk: first_freelist_trunk_page_id,
+                        current_db_size: new_db_size,
+                    };
+                    continue;
+                }
+                AllocatePageState::LoadFreelistTrunkPage {
+                    current_trunk,
+                    current_db_size,
+                } => {
+                    let page = self.read_page(*current_trunk as usize)?;
+                    if page.is_locked() {
+                        return Ok(IOResult::IO);
+                    }
+                    *state = AllocatePageState::SearchAvailableFreeListLeaf {
+                        trunk_page: page,
+                        current_db_size: *current_db_size,
+                    };
+                    continue;
+                }
+                AllocatePageState::SearchAvailableFreeListLeaf {
+                    trunk_page,
+                    current_db_size,
+                } => {
+                    turso_assert!(
+                        trunk_page.is_loaded(),
+                        "Freelist trunk page {} is not loaded",
+                        trunk_page.get().id
+                    );
+                    let page_contents = trunk_page.get().contents.as_ref().unwrap();
+                    let next_trunk_page_id =
+                        page_contents.read_u32(FREELIST_TRUNK_OFFSET_NEXT_TRUNK);
+                    let number_of_freelist_leaves =
+                        page_contents.read_u32(FREELIST_TRUNK_OFFSET_LEAF_COUNT);
+
+                    // There are leaf pointers on this trunk page, so we can reuse one of the pages
+                    // for the allocation.
+                    if number_of_freelist_leaves != 0 {
+                        *state = AllocatePageState::ReuseFreelistLeaf {
+                            trunk_page: trunk_page.clone(),
+                            number_of_freelist_leaves,
+                        };
+                        continue;
+                    }
+
+                    // No freelist leaves on this trunk page.
+                    // If the freelist is completely empty, allocate a new page.
+                    if next_trunk_page_id == 0 {
+                        *state = AllocatePageState::AllocateNewPage {
+                            current_db_size: *current_db_size,
+                        };
+                        continue;
+                    }
+
+                    // Freelist is not empty, so we can reuse the trunk itself as a new page
+                    // and update the database's first freelist trunk page to the next trunk page.
+                    header_accessor::set_freelist_trunk_page(self, next_trunk_page_id)?;
+                    header_accessor::set_freelist_pages(
+                        self,
+                        header_accessor::get_freelist_pages(self)? - 1,
+                    )?;
+                    self.add_dirty(trunk_page);
+                    // zero out the page
+                    turso_assert!(
+                        trunk_page.get_contents().overflow_cells.is_empty(),
+                        "Freelist leaf page {} has overflow cells",
+                        trunk_page.get().id
+                    );
+                    trunk_page.get().contents.as_ref().unwrap().as_ptr().fill(0);
+                    let page_key = PageCacheKey::new(trunk_page.get().id);
+                    {
+                        let mut page_cache = self.page_cache.write();
+                        turso_assert!(
+                            page_cache.contains_key(&page_key),
+                            "page {} is not in cache",
+                            trunk_page.get().id
+                        );
+                    }
+                    let trunk_page = trunk_page.clone();
+                    *state = AllocatePageState::Start;
+                    return Ok(IOResult::Done(trunk_page));
+                }
+                AllocatePageState::ReuseFreelistLeaf {
+                    trunk_page,
+                    number_of_freelist_leaves,
+                } => {
+                    turso_assert!(
+                        trunk_page.is_loaded(),
+                        "Freelist trunk page {} is not loaded",
+                        trunk_page.get().id
+                    );
+                    turso_assert!(
+                        *number_of_freelist_leaves > 0,
+                        "Freelist trunk page {} has no leaves",
+                        trunk_page.get().id
+                    );
+                    let page_contents = trunk_page.get().contents.as_ref().unwrap();
+                    let next_leaf_page_id =
+                        page_contents.read_u32(FREELIST_TRUNK_OFFSET_FIRST_LEAF);
+                    let leaf_page = self.read_page(next_leaf_page_id as usize)?;
+                    if leaf_page.is_locked() {
+                        return Ok(IOResult::IO);
+                    }
+                    self.add_dirty(&leaf_page);
+                    // zero out the page
+                    turso_assert!(
+                        leaf_page.get_contents().overflow_cells.is_empty(),
+                        "Freelist leaf page {} has overflow cells",
+                        leaf_page.get().id
+                    );
+                    leaf_page.get().contents.as_ref().unwrap().as_ptr().fill(0);
+                    let page_key = PageCacheKey::new(leaf_page.get().id);
+                    {
+                        let mut page_cache = self.page_cache.write();
+                        turso_assert!(
+                            page_cache.contains_key(&page_key),
+                            "page {} is not in cache",
+                            leaf_page.get().id
+                        );
+                    }
+
+                    // Shift left all the other leaf pages in the trunk page and subtract 1 from the leaf count
+                    let remaining_leaves_count = (*number_of_freelist_leaves - 1) as usize;
+                    {
+                        let buf = page_contents.as_ptr();
+                        // use copy within the same page
+                        const LEAF_PTR_SIZE_BYTES: usize = 4;
+                        let offset_remaining_leaves_start =
+                            FREELIST_TRUNK_OFFSET_FIRST_LEAF + LEAF_PTR_SIZE_BYTES;
+                        let offset_remaining_leaves_end = offset_remaining_leaves_start
+                            + remaining_leaves_count * LEAF_PTR_SIZE_BYTES;
+                        buf.copy_within(
+                            offset_remaining_leaves_start..offset_remaining_leaves_end,
+                            FREELIST_TRUNK_OFFSET_FIRST_LEAF,
+                        );
+                    }
+                    // write the new leaf count
+                    page_contents.write_u32(
+                        FREELIST_TRUNK_OFFSET_LEAF_COUNT,
+                        remaining_leaves_count as u32,
+                    );
+                    self.add_dirty(trunk_page);
+
+                    header_accessor::set_freelist_pages(
+                        self,
+                        header_accessor::get_freelist_pages(self)? - 1,
+                    )?;
+
+                    *state = AllocatePageState::Start;
+                    return Ok(IOResult::Done(leaf_page));
+                }
+                AllocatePageState::AllocateNewPage { current_db_size } => {
+                    let new_db_size = *current_db_size + 1;
+                    // FIXME: should reserve page cache entry before modifying the database
+                    let page = allocate_new_page(new_db_size as usize, &self.buffer_pool, 0);
+                    {
+                        // setup page and add to cache
+                        self.add_dirty(&page);
+
+                        let page_key = PageCacheKey::new(page.get().id);
+                        {
+                            // Run in separate block to avoid deadlock on page cache write lock
+                            let mut cache = self.page_cache.write();
+                            match cache.insert(page_key, page.clone()) {
+                                Err(CacheError::Full) => return Err(LimboError::CacheFull),
+                                Err(_) => {
+                                    return Err(LimboError::InternalError(
+                                        "Unknown error inserting page to cache".into(),
+                                    ))
+                                }
+                                Ok(_) => {}
+                            };
+                        }
+                        header_accessor::set_database_size(self, new_db_size)?;
+                        *state = AllocatePageState::Start;
+                        return Ok(IOResult::Done(page));
                    }
                }
-                // we allocated a ptrmap page, so the next data page will be at new_db_size + 1
-                new_db_size += 1;
-            }
-        }
-
-        header_accessor::set_database_size(self, new_db_size)?;
-
-        // FIXME: should reserve page cache entry before modifying the database
-        let page = allocate_page(new_db_size as usize, &self.buffer_pool, 0);
-        {
-            // setup page and add to cache
-            self.add_dirty(&page);
-
-            let page_key = PageCacheKey::new(page.get().id);
-            let mut cache = self.page_cache.write();
-            match cache.insert(page_key, page.clone()) {
-                Err(CacheError::Full) => Err(LimboError::CacheFull),
-                Err(_) => Err(LimboError::InternalError(
-                    "Unknown error inserting page to cache".into(),
-                )),
-                Ok(_) => Ok(page),
            }
        }
    }
@@ -1597,7 +1814,7 @@ impl Pager {
     }
 }

-pub fn allocate_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
+pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
     let page = Arc::new(Page::new(page_id));
     {
         let buffer = buffer_pool.get();
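
For reference, the patch makes page allocation re-entrant: allocate_page() now returns Ok(IOResult::IO) while a freelist trunk or leaf read is still in flight, and resumes from allocate_page_state on the next call. A minimal caller-side sketch of driving it to completion, in the spirit of the run_until_done()/io.run_once() loops already used in the updated tests and in the interim sync-IO hacks in balance_root()/balance_non_root() (the helper name below is illustrative, not part of the patch):

    // Drive Pager::allocate_page() until its state machine finishes.
    // Each IOResult::IO return means "IO submitted, call again"; progress is kept
    // in pager.allocate_page_state between calls.
    fn allocate_page_to_completion(pager: &Pager) -> Result<PageRef> {
        loop {
            match pager.allocate_page()? {
                // Allocation finished: either a reused freelist page or a brand new one.
                IOResult::Done(page) => return Ok(page),
                // A freelist page read is pending: pump the IO loop once and re-enter.
                IOResult::IO => pager.io.run_once()?,
            }
        }
    }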