state machine for ptrmap_get

This commit is contained in:
pedrocarlo
2025-08-04 23:30:03 -03:00
parent 0ac040cc87
commit aa8d17cbf1

View File

@@ -374,6 +374,19 @@ impl AtomicDbState {
}
}
#[derive(Debug, Clone)]
#[cfg(not(feature = "omit_autovacuum"))]
enum PtrMapGetState {
Start,
ReadPage {
page_size: usize,
},
Deserialize {
ptrmap_page: PageRef,
offset_in_ptrmap_page: usize,
},
}
/// The pager interface implements the persistence layer by providing access
/// to pages of the database file, including caching, concurrency control, and
/// transaction management.
@@ -413,6 +426,9 @@ pub struct Pager {
pub(crate) page_size: Cell<Option<u32>>,
reserved_space: OnceCell<u8>,
free_page_state: RefCell<FreePageState>,
#[cfg(not(feature = "omit_autovacuum"))]
/// State machine for [Pager::ptrmap_get]
ptrmap_get_state: RefCell<PtrMapGetState>,
}
#[derive(Debug, Clone)]
@@ -516,6 +532,8 @@ impl Pager {
}),
free_page_state: RefCell::new(FreePageState::Start),
allocate_page_state: RefCell::new(AllocatePageState::Start),
#[cfg(not(feature = "omit_autovacuum"))]
ptrmap_get_state: RefCell::new(PtrMapGetState::Start),
})
}
@@ -536,71 +554,101 @@ impl Pager {
/// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself).
#[cfg(not(feature = "omit_autovacuum"))]
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<IOResult<Option<PtrmapEntry>>> {
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
let configured_page_size =
return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
loop {
let ptrmap_get_state = self.ptrmap_get_state.borrow().clone();
match ptrmap_get_state {
PtrMapGetState::Start => {
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
let configured_page_size =
return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
if target_page_num < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(target_page_num, configured_page_size)
{
return Ok(IOResult::Done(None));
}
if target_page_num < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(target_page_num, configured_page_size)
{
return Ok(IOResult::Done(None));
}
let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size);
let offset_in_ptrmap_page =
get_ptrmap_offset_in_page(target_page_num, ptrmap_pg_no, configured_page_size)?;
tracing::trace!(
"ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}",
target_page_num,
ptrmap_pg_no
);
self.ptrmap_get_state.replace(PtrMapGetState::ReadPage {
page_size: configured_page_size,
});
}
PtrMapGetState::ReadPage {
page_size: configured_page_size,
} => {
let ptrmap_pg_no =
get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size);
let offset_in_ptrmap_page = get_ptrmap_offset_in_page(
target_page_num,
ptrmap_pg_no,
configured_page_size,
)?;
tracing::trace!(
"ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}",
target_page_num,
ptrmap_pg_no
);
let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?;
if ptrmap_page.is_locked() {
return Ok(IOResult::IO);
}
if !ptrmap_page.is_loaded() {
return Ok(IOResult::IO);
}
let ptrmap_page_inner = ptrmap_page.get();
let (ptrmap_page, _c) = self.read_page(ptrmap_pg_no as usize)?;
self.ptrmap_get_state.replace(PtrMapGetState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
});
return Ok(IOResult::IO);
}
PtrMapGetState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
} => {
if ptrmap_page.is_locked() {
return Ok(IOResult::IO);
}
if !ptrmap_page.is_loaded() {
return Ok(IOResult::IO);
}
let ptrmap_page_inner = ptrmap_page.get();
let ptrmap_pg_no = ptrmap_page_inner.id;
let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() {
Some(content) => content,
None => {
return Err(LimboError::InternalError(format!(
"Ptrmap page {ptrmap_pg_no} content not loaded"
)))
let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() {
Some(content) => content,
None => {
return Err(LimboError::InternalError(format!(
"Ptrmap page {ptrmap_pg_no} content not loaded"
)));
}
};
let page_buffer_guard: std::cell::Ref<IoBuffer> = page_content.buffer.borrow();
let full_buffer_slice: &[u8] = page_buffer_guard.as_slice();
// Ptrmap pages are not page 1, so their internal offset within their buffer should be 0.
// The actual page data starts at page_content.offset within the full_buffer_slice.
if ptrmap_pg_no != 1 && page_content.offset != 0 {
return Err(LimboError::Corrupt(format!(
"Ptrmap page {} has unexpected internal offset {}",
ptrmap_pg_no, page_content.offset
)));
}
let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..];
let actual_data_length = ptrmap_page_data_slice.len();
// Check if the calculated offset for the entry is within the bounds of the actual page data length.
if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length {
return Err(LimboError::InternalError(format!(
"Ptrmap offset {offset_in_ptrmap_page} + entry size {PTRMAP_ENTRY_SIZE} out of bounds for page {ptrmap_pg_no} (actual data len {actual_data_length})"
)));
}
let entry_slice = &ptrmap_page_data_slice
[offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE];
self.ptrmap_get_state.replace(PtrMapGetState::Start);
return match PtrmapEntry::deserialize(entry_slice) {
Some(entry) => Ok(IOResult::Done(Some(entry))),
None => Err(LimboError::Corrupt(format!(
"Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}"
))),
};
}
}
};
let page_buffer_guard: std::cell::Ref<IoBuffer> = page_content.buffer.borrow();
let full_buffer_slice: &[u8] = page_buffer_guard.as_slice();
// Ptrmap pages are not page 1, so their internal offset within their buffer should be 0.
// The actual page data starts at page_content.offset within the full_buffer_slice.
if ptrmap_pg_no != 1 && page_content.offset != 0 {
return Err(LimboError::Corrupt(format!(
"Ptrmap page {} has unexpected internal offset {}",
ptrmap_pg_no, page_content.offset
)));
}
let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..];
let actual_data_length = ptrmap_page_data_slice.len();
// Check if the calculated offset for the entry is within the bounds of the actual page data length.
if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length {
return Err(LimboError::InternalError(format!(
"Ptrmap offset {offset_in_ptrmap_page} + entry size {PTRMAP_ENTRY_SIZE} out of bounds for page {ptrmap_pg_no} (actual data len {actual_data_length})"
)));
}
let entry_slice = &ptrmap_page_data_slice
[offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE];
match PtrmapEntry::deserialize(entry_slice) {
Some(entry) => Ok(IOResult::Done(Some(entry))),
None => Err(LimboError::Corrupt(format!(
"Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}"
))),
}
}
@@ -2382,11 +2430,11 @@ mod ptrmap_tests {
); // (1+1) -> (header + ptrmap)
// Read the entry from the ptrmap page and verify it
let entry = pager.ptrmap_get(db_page_to_update).unwrap();
assert!(matches!(entry, IOResult::Done(Some(_))));
let IOResult::Done(Some(entry)) = entry else {
panic!("entry is not Some");
};
let entry = pager
.io
.block(|| pager.ptrmap_get(db_page_to_update))
.unwrap()
.unwrap();
assert_eq!(entry.entry_type, PtrmapType::RootPage);
assert_eq!(entry.parent_page_no, 0);
}