Merge ' should not return a Completion when there is a page cache hit' from Pedro Muniz

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2723
This commit is contained in:
Pekka Enberg
2025-08-22 07:33:28 +03:00
committed by GitHub
2 changed files with 688 additions and 566 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -41,26 +41,30 @@ pub struct HeaderRef(PageRef);
impl HeaderRef {
pub fn from_pager(pager: &Pager) -> Result<IOResult<Self>> {
let state = pager.header_ref_state.borrow().clone();
tracing::trace!(?state);
match state {
HeaderRefState::Start => {
if !pager.db_state.is_initialized() {
return Err(LimboError::Page1NotAlloc);
}
loop {
let state = pager.header_ref_state.borrow().clone();
tracing::trace!(?state);
match state {
HeaderRefState::Start => {
if !pager.db_state.is_initialized() {
return Err(LimboError::Page1NotAlloc);
}
let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
*pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page };
io_yield_one!(c);
}
HeaderRefState::CreateHeader { page } => {
turso_assert!(page.is_loaded(), "page should be loaded");
turso_assert!(
page.get().id == DatabaseHeader::PAGE_ID,
"incorrect header page id"
);
*pager.header_ref_state.borrow_mut() = HeaderRefState::Start;
Ok(IOResult::Done(Self(page)))
let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
*pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page };
if let Some(c) = c {
io_yield_one!(c);
}
}
HeaderRefState::CreateHeader { page } => {
turso_assert!(page.is_loaded(), "page should be loaded");
turso_assert!(
page.get().id == DatabaseHeader::PAGE_ID,
"incorrect header page id"
);
*pager.header_ref_state.borrow_mut() = HeaderRefState::Start;
break Ok(IOResult::Done(Self(page)));
}
}
}
}
@@ -77,28 +81,32 @@ pub struct HeaderRefMut(PageRef);
impl HeaderRefMut {
pub fn from_pager(pager: &Pager) -> Result<IOResult<Self>> {
let state = pager.header_ref_state.borrow().clone();
tracing::trace!(?state);
match state {
HeaderRefState::Start => {
if !pager.db_state.is_initialized() {
return Err(LimboError::Page1NotAlloc);
loop {
let state = pager.header_ref_state.borrow().clone();
tracing::trace!(?state);
match state {
HeaderRefState::Start => {
if !pager.db_state.is_initialized() {
return Err(LimboError::Page1NotAlloc);
}
let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
*pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page };
if let Some(c) = c {
io_yield_one!(c);
}
}
HeaderRefState::CreateHeader { page } => {
turso_assert!(page.is_loaded(), "page should be loaded");
turso_assert!(
page.get().id == DatabaseHeader::PAGE_ID,
"incorrect header page id"
);
let (page, c) = pager.read_page(DatabaseHeader::PAGE_ID)?;
*pager.header_ref_state.borrow_mut() = HeaderRefState::CreateHeader { page };
io_yield_one!(c);
}
HeaderRefState::CreateHeader { page } => {
turso_assert!(page.is_loaded(), "page should be loaded");
turso_assert!(
page.get().id == DatabaseHeader::PAGE_ID,
"incorrect header page id"
);
pager.add_dirty(&page);
*pager.header_ref_state.borrow_mut() = HeaderRefState::Start;
Ok(IOResult::Done(Self(page)))
pager.add_dirty(&page);
*pager.header_ref_state.borrow_mut() = HeaderRefState::Start;
break Ok(IOResult::Done(Self(page)));
}
}
}
}
@@ -624,81 +632,88 @@ impl Pager {
/// Returns `Ok(None)` if the page is not supposed to have a ptrmap entry (e.g. header, or a ptrmap page itself).
#[cfg(not(feature = "omit_autovacuum"))]
pub fn ptrmap_get(&self, target_page_num: u32) -> Result<IOResult<Option<PtrmapEntry>>> {
let ptrmap_get_state = self.ptrmap_get_state.borrow().clone();
match ptrmap_get_state {
PtrMapGetState::Start => {
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
let configured_page_size =
return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
loop {
let ptrmap_get_state = self.ptrmap_get_state.borrow().clone();
match ptrmap_get_state {
PtrMapGetState::Start => {
tracing::trace!("ptrmap_get(page_idx = {})", target_page_num);
let configured_page_size =
return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
if target_page_num < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(target_page_num, configured_page_size)
{
return Ok(IOResult::Done(None));
if target_page_num < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(target_page_num, configured_page_size)
{
return Ok(IOResult::Done(None));
}
let ptrmap_pg_no =
get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size);
let offset_in_ptrmap_page = get_ptrmap_offset_in_page(
target_page_num,
ptrmap_pg_no,
configured_page_size,
)?;
tracing::trace!(
"ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}",
target_page_num,
ptrmap_pg_no
);
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
self.ptrmap_get_state.replace(PtrMapGetState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
});
if let Some(c) = c {
io_yield_one!(c);
}
}
let ptrmap_pg_no =
get_ptrmap_page_no_for_db_page(target_page_num, configured_page_size);
let offset_in_ptrmap_page =
get_ptrmap_offset_in_page(target_page_num, ptrmap_pg_no, configured_page_size)?;
tracing::trace!(
"ptrmap_get(page_idx = {}) = ptrmap_pg_no = {}",
target_page_num,
ptrmap_pg_no
);
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
self.ptrmap_get_state.replace(PtrMapGetState::Deserialize {
PtrMapGetState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
});
io_yield_one!(c);
}
PtrMapGetState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
} => {
turso_assert!(ptrmap_page.is_loaded(), "ptrmap_page should be loaded");
let ptrmap_page_inner = ptrmap_page.get();
let ptrmap_pg_no = ptrmap_page_inner.id;
} => {
turso_assert!(ptrmap_page.is_loaded(), "ptrmap_page should be loaded");
let ptrmap_page_inner = ptrmap_page.get();
let ptrmap_pg_no = ptrmap_page_inner.id;
let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() {
Some(content) => content,
None => {
return Err(LimboError::InternalError(format!(
"Ptrmap page {ptrmap_pg_no} content not loaded"
let page_content: &PageContent = match ptrmap_page_inner.contents.as_ref() {
Some(content) => content,
None => {
return Err(LimboError::InternalError(format!(
"Ptrmap page {ptrmap_pg_no} content not loaded"
)));
}
};
let full_buffer_slice: &[u8] = page_content.buffer.as_slice();
// Ptrmap pages are not page 1, so their internal offset within their buffer should be 0.
// The actual page data starts at page_content.offset within the full_buffer_slice.
if ptrmap_pg_no != 1 && page_content.offset != 0 {
return Err(LimboError::Corrupt(format!(
"Ptrmap page {} has unexpected internal offset {}",
ptrmap_pg_no, page_content.offset
)));
}
};
let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..];
let actual_data_length = ptrmap_page_data_slice.len();
let full_buffer_slice: &[u8] = page_content.buffer.as_slice();
// Ptrmap pages are not page 1, so their internal offset within their buffer should be 0.
// The actual page data starts at page_content.offset within the full_buffer_slice.
if ptrmap_pg_no != 1 && page_content.offset != 0 {
return Err(LimboError::Corrupt(format!(
"Ptrmap page {} has unexpected internal offset {}",
ptrmap_pg_no, page_content.offset
)));
}
let ptrmap_page_data_slice: &[u8] = &full_buffer_slice[page_content.offset..];
let actual_data_length = ptrmap_page_data_slice.len();
// Check if the calculated offset for the entry is within the bounds of the actual page data length.
if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length {
return Err(LimboError::InternalError(format!(
// Check if the calculated offset for the entry is within the bounds of the actual page data length.
if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > actual_data_length {
return Err(LimboError::InternalError(format!(
"Ptrmap offset {offset_in_ptrmap_page} + entry size {PTRMAP_ENTRY_SIZE} out of bounds for page {ptrmap_pg_no} (actual data len {actual_data_length})"
)));
}
}
let entry_slice = &ptrmap_page_data_slice
[offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE];
self.ptrmap_get_state.replace(PtrMapGetState::Start);
match PtrmapEntry::deserialize(entry_slice) {
Some(entry) => Ok(IOResult::Done(Some(entry))),
None => Err(LimboError::Corrupt(format!(
"Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}"
))),
let entry_slice = &ptrmap_page_data_slice
[offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE];
self.ptrmap_get_state.replace(PtrMapGetState::Start);
break match PtrmapEntry::deserialize(entry_slice) {
Some(entry) => Ok(IOResult::Done(Some(entry))),
None => Err(LimboError::Corrupt(format!(
"Failed to deserialize ptrmap entry for page {target_page_num} from ptrmap page {ptrmap_pg_no}"
))),
};
}
}
}
@@ -720,24 +735,26 @@ impl Pager {
entry_type,
parent_page_no
);
let ptrmap_put_state = self.ptrmap_put_state.borrow().clone();
match ptrmap_put_state {
PtrMapPutState::Start => {
let page_size =
return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
loop {
let ptrmap_put_state = self.ptrmap_put_state.borrow().clone();
match ptrmap_put_state {
PtrMapPutState::Start => {
let page_size =
return_if_io!(self.with_header(|header| header.page_size)).get() as usize;
if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(db_page_no_to_update, page_size)
{
return Err(LimboError::InternalError(format!(
if db_page_no_to_update < FIRST_PTRMAP_PAGE_NO
|| is_ptrmap_page(db_page_no_to_update, page_size)
{
return Err(LimboError::InternalError(format!(
"Cannot set ptrmap entry for page {db_page_no_to_update}: it's a header/ptrmap page or invalid."
)));
}
}
let ptrmap_pg_no = get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size);
let offset_in_ptrmap_page =
get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?;
tracing::trace!(
let ptrmap_pg_no =
get_ptrmap_page_no_for_db_page(db_page_no_to_update, page_size);
let offset_in_ptrmap_page =
get_ptrmap_offset_in_page(db_page_no_to_update, ptrmap_pg_no, page_size)?;
tracing::trace!(
"ptrmap_put(page_idx = {}, entry_type = {:?}, parent_page_no = {}) = ptrmap_pg_no = {}, offset_in_ptrmap_page = {}",
db_page_no_to_update,
entry_type,
@@ -746,58 +763,61 @@ impl Pager {
offset_in_ptrmap_page
);
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
self.ptrmap_put_state.replace(PtrMapPutState::Deserialize {
let (ptrmap_page, c) = self.read_page(ptrmap_pg_no as usize)?;
self.ptrmap_put_state.replace(PtrMapPutState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
});
if let Some(c) = c {
io_yield_one!(c);
}
}
PtrMapPutState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
});
io_yield_one!(c);
}
PtrMapPutState::Deserialize {
ptrmap_page,
offset_in_ptrmap_page,
} => {
turso_assert!(ptrmap_page.is_loaded(), "page should be loaded");
let ptrmap_page_inner = ptrmap_page.get();
let ptrmap_pg_no = ptrmap_page_inner.id;
} => {
turso_assert!(ptrmap_page.is_loaded(), "page should be loaded");
let ptrmap_page_inner = ptrmap_page.get();
let ptrmap_pg_no = ptrmap_page_inner.id;
let page_content = match ptrmap_page_inner.contents.as_ref() {
Some(content) => content,
None => {
let page_content = match ptrmap_page_inner.contents.as_ref() {
Some(content) => content,
None => {
return Err(LimboError::InternalError(format!(
"Ptrmap page {ptrmap_pg_no} content not loaded"
)))
}
};
let full_buffer_slice = page_content.buffer.as_mut_slice();
if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() {
return Err(LimboError::InternalError(format!(
"Ptrmap page {ptrmap_pg_no} content not loaded"
)))
}
};
let full_buffer_slice = page_content.buffer.as_mut_slice();
if offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE > full_buffer_slice.len() {
return Err(LimboError::InternalError(format!(
"Ptrmap offset {} + entry size {} out of bounds for page {} (actual data len {})",
offset_in_ptrmap_page,
PTRMAP_ENTRY_SIZE,
ptrmap_pg_no,
full_buffer_slice.len()
)));
}
let entry = PtrmapEntry {
entry_type,
parent_page_no,
};
entry.serialize(
&mut full_buffer_slice
[offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE],
)?;
turso_assert!(
ptrmap_page.get().id == ptrmap_pg_no,
"ptrmap page has unexpected number"
);
self.add_dirty(&ptrmap_page);
self.ptrmap_put_state.replace(PtrMapPutState::Start);
break Ok(IOResult::Done(()));
}
let entry = PtrmapEntry {
entry_type,
parent_page_no,
};
entry.serialize(
&mut full_buffer_slice
[offset_in_ptrmap_page..offset_in_ptrmap_page + PTRMAP_ENTRY_SIZE],
)?;
turso_assert!(
ptrmap_page.get().id == ptrmap_pg_no,
"ptrmap page has unexpected number"
);
self.add_dirty(&ptrmap_page);
self.ptrmap_put_state.replace(PtrMapPutState::Start);
Ok(IOResult::Done(()))
}
}
}
@@ -1097,18 +1117,17 @@ impl Pager {
/// Reads a page from the database.
#[tracing::instrument(skip_all, level = Level::DEBUG)]
pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Completion)> {
pub fn read_page(&self, page_idx: usize) -> Result<(PageRef, Option<Completion>)> {
tracing::trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.write();
let page_key = PageCacheKey::new(page_idx);
if let Some(page) = page_cache.get(&page_key) {
tracing::trace!("read_page(page_idx = {}) = cached", page_idx);
// Dummy completion being passed, as we do not need to read from database or wal
return Ok((page.clone(), Completion::new_dummy()));
return Ok((page.clone(), None));
}
let (page, c) = self.read_page_no_cache(page_idx, None, false)?;
self.cache_insert(page_idx, page.clone(), &mut page_cache)?;
Ok((page, c))
Ok((page, Some(c)))
}
fn begin_read_disk_page(
@@ -1633,7 +1652,9 @@ impl Pager {
// Add as leaf to current trunk
let (page, c) = self.read_page(trunk_page_id as usize)?;
trunk_page.replace(page);
io_yield_one!(c);
if let Some(c) = c {
io_yield_one!(c);
}
}
let trunk_page = trunk_page.as_ref().unwrap();
turso_assert!(trunk_page.is_loaded(), "trunk_page should be loaded");
@@ -1830,7 +1851,9 @@ impl Pager {
trunk_page,
current_db_size: new_db_size,
};
io_yield_one!(c);
if let Some(c) = c {
io_yield_one!(c);
}
}
AllocatePageState::SearchAvailableFreeListLeaf {
trunk_page,
@@ -1866,7 +1889,10 @@ impl Pager {
leaf_page,
number_of_freelist_leaves,
};
io_yield_one!(c);
if let Some(c) = c {
io_yield_one!(c);
}
continue;
}
// No freelist leaves on this trunk page.