diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 3f75331a4..afc68722d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -374,6 +374,19 @@ impl AtomicDbState { } } +#[derive(Debug, Clone)] +#[cfg(not(feature = "omit_autovacuum"))] +enum PtrMapGetState { + Start, + ReadPage { + page_size: usize, + }, + Deserialize { + ptrmap_page: PageRef, + offset_in_ptrmap_page: usize, + }, +} + /// The pager interface implements the persistence layer by providing access /// to pages of the database file, including caching, concurrency control, and /// transaction management. @@ -413,6 +426,9 @@ pub struct Pager { pub(crate) page_size: Cell>, reserved_space: OnceCell, free_page_state: RefCell, + #[cfg(not(feature = "omit_autovacuum"))] + /// State machine for [Pager::ptrmap_get] + ptrmap_get_state: RefCell, } #[derive(Debug, Clone)] @@ -516,6 +532,8 @@ impl Pager { }), free_page_state: RefCell::new(FreePageState::Start), allocate_page_state: RefCell::new(AllocatePageState::Start), + #[cfg(not(feature = "omit_autovacuum"))] + ptrmap_get_state: RefCell::new(PtrMapGetState::Start), }) } @@ -536,71 +554,101 @@ impl Pager { /// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself). #[cfg(not(feature = "omit_autovacuum"))] pub fn ptrmap_get(&self, target_page_num: u32) -> Result>> { - tracing::trace!("ptrmap_get(page_idx = {})", target_page_num); - let configured_page_size = - return_if_io!(self.with_header(|header| header.page_size)).get() as usize; + loop { + let ptrmap_get_state = self.ptrmap_get_state.borrow().clone(); + match ptrmap_get_state { + PtrMapGetState::Start => { + tracing::trace!("ptrmap_get(page_idx = {})", target_page_num); + let configured_page_size = + return_if_io!(self.with_header(|header| header.page_size)).get() as usize; - if target_page_num < FIRST_PTRMAP_PAGE_NO - || is_ptrmap_page(target_page_num, configured_page_size) - { - return Ok(IOResult::Done(None)); - } + if target_page_num < FIRST_PTRMAP_PAGE_NO + || is_ptrmap_page(target_page_num, configured_page_size) + { + return Ok(IOResult::Done(None)); + } - let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size); - let offset_in_ptrmap_page = - get_ptrmap_offset_in_page(target_page_num, ptrmap_pg_no, configured_page_size)?; - tracing::trace!( - "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}", - target_page_num, - ptrmap_pg_no - ); + self.ptrmap_get_state.replace(PtrMapGetState::ReadPage { + page_size: configured_page_size, + }); + } + PtrMapGetState::ReadPage { + page_size: configured_page_size, + } => { + let ptrmap_pg_no = + get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size); + let offset_in_ptrmap_page = get_ptrmap_offset_in_page( + target_page_num, + ptrmap_pg_no, + configured_page_size, + )?; + tracing::trace!( + "ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}", + target_page_num, + ptrmap_pg_no + ); - let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?; - if ptrmap_page.is_locked() { - return Ok(IOResult::IO); - } - if !ptrmap_page.is_loaded() { - return Ok(IOResult::IO); - } - let ptrmap_page_inner = ptrmap_page.get(); + let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?; + self.ptrmap_get_state.replace(PtrMapGetState::Deserialize { + ptrmap_page, + offset_in_ptrmap_page, + }); + return Ok(IOResult::IO); + } + PtrMapGetState::Deserialize { + ptrmap_page, + offset_in_ptrmap_page, + } => { + if ptrmap_page.is_locked() { + return Ok(IOResult::IO); + } + if !ptrmap_page.is_loaded() { + return Ok(IOResult::IO); + } + let ptrmap_page_inner = ptrmap_page.get(); + let ptrmap_pg_no = ptrmap_page_inner.id; - let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() { - Some(content) => content, - None => { - return Err(LimboError::InternalError(format!( - "Ptrmap page {ptrmap_pg_no} content not loaded" - ))) + let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() { + Some(content) => content, + None => { + return Err(LimboError::InternalError(format!( + "Ptrmap page {ptrmap_pg_no} content not loaded" + ))); + } + }; + + let page_buffer_guard: std::cell::Ref = page_content.buffer.borrow(); + let full_buffer_slice: &[u8] = page_buffer_guard.as_slice(); + + // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0. + // The actual page data starts at page_content.offset within the full_buffer_slice. + if ptrmap_pg_no != 1 && page_content.offset != 0 { + return Err(LimboError::Corrupt(format!( + "Ptrmap page {} has unexpected internal offset {}", + ptrmap_pg_no, page_content.offset + ))); + } + let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..]; + let actual_data_length = ptrmap_page_data_slice.len(); + + // Check if the calculated offset for the entry is within the bounds of the actual page data length. + if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length { + return Err(LimboError::InternalError(format!( + "Ptrmap offset {offset_in_ptrmap_page} + entry size {PTRMAP_ENTRY_SIZE} out of bounds for page {ptrmap_pg_no} (actual data len {actual_data_length})" + ))); + } + + let entry_slice = &ptrmap_page_data_slice + [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE]; + self.ptrmap_get_state.replace(PtrMapGetState::Start); + return match PtrmapEntry::deserialize(entry_slice) { + Some(entry) => Ok(IOResult::Done(Some(entry))), + None => Err(LimboError::Corrupt(format!( + "Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}" + ))), + }; + } } - }; - - let page_buffer_guard: std::cell::Ref = page_content.buffer.borrow(); - let full_buffer_slice: &[u8] = page_buffer_guard.as_slice(); - - // Ptrmap pages are not page 1, so their internal offset within their buffer should be 0. - // The actual page data starts at page_content.offset within the full_buffer_slice. - if ptrmap_pg_no != 1 && page_content.offset != 0 { - return Err(LimboError::Corrupt(format!( - "Ptrmap page {} has unexpected internal offset {}", - ptrmap_pg_no, page_content.offset - ))); - } - let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..]; - let actual_data_length = ptrmap_page_data_slice.len(); - - // Check if the calculated offset for the entry is within the bounds of the actual page data length. - if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length { - return Err(LimboError::InternalError(format!( - "Ptrmap offset {offset_in_ptrmap_page} + entry size {PTRMAP_ENTRY_SIZE} out of bounds for page {ptrmap_pg_no} (actual data len {actual_data_length})" - ))); - } - - let entry_slice = &ptrmap_page_data_slice - [offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE]; - match PtrmapEntry::deserialize(entry_slice) { - Some(entry) => Ok(IOResult::Done(Some(entry))), - None => Err(LimboError::Corrupt(format!( - "Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}" - ))), } } @@ -2382,11 +2430,11 @@ mod ptrmap_tests { ); // (1+1) -> (header + ptrmap) // Read the entry from the ptrmap page and verify it - let entry = pager.ptrmap_get(db_page_to_update).unwrap(); - assert!(matches!(entry, IOResult::Done(Some(_)))); - let IOResult::Done(Some(entry)) = entry else { - panic!("entry is not Some"); - }; + let entry = pager + .io + .block(|| pager.ptrmap_get(db_page_to_update)) + .unwrap() + .unwrap(); assert_eq!(entry.entry_type, PtrmapType::RootPage); assert_eq!(entry.parent_page_no, 0); }