Pager: subjournal page if required when it's marked as dirty

This commit is contained in:
Jussi Saurio
2025-10-21 12:05:48 +03:00
parent 97177dae02
commit a8cf8e4594
2 changed files with 36 additions and 26 deletions

View File

@@ -1049,7 +1049,13 @@ impl BTreeCursor {
local_amount = local_size as u32 - offset;
}
if is_write {
self.write_payload_to_page(offset, local_amount, payload, buffer, page.clone());
self.write_payload_to_page(
offset,
local_amount,
payload,
buffer,
page.clone(),
)?;
} else {
self.read_payload_from_page(offset, local_amount, payload, buffer);
}
@@ -1175,7 +1181,7 @@ impl BTreeCursor {
page_payload,
buffer,
page.clone(),
);
)?;
} else {
self.read_payload_from_page(
payload_offset as u32,
@@ -1247,13 +1253,14 @@ impl BTreeCursor {
payload: &[u8],
buffer: &mut [u8],
page: PageRef,
) {
self.pager.add_dirty(&page);
) -> Result<()> {
self.pager.add_dirty(&page)?;
// SAFETY: This is safe as long as the page is not evicted from the cache.
let payload_mut =
unsafe { std::slice::from_raw_parts_mut(payload.as_ptr() as *mut u8, payload.len()) };
payload_mut[payload_offset as usize..payload_offset as usize + num_bytes as usize]
.copy_from_slice(&buffer[..num_bytes as usize]);
Ok(())
}
/// Check if any ancestor pages still have cells to iterate.
@@ -2275,7 +2282,7 @@ impl BTreeCursor {
// get page and find cell
let cell_idx = {
self.pager.add_dirty(&page);
self.pager.add_dirty(&page)?;
self.stack.current_cell_index()
};
if cell_idx == -1 {
@@ -2643,8 +2650,8 @@ impl BTreeCursor {
usable_space,
)?;
parent_contents.write_rightmost_ptr(new_rightmost_leaf.get().id as u32);
self.pager.add_dirty(parent);
self.pager.add_dirty(&new_rightmost_leaf);
self.pager.add_dirty(parent)?;
self.pager.add_dirty(&new_rightmost_leaf)?;
// Continue balance from the parent page (inserting the new divider cell may have overflowed the parent)
self.stack.pop();
@@ -2721,7 +2728,7 @@ impl BTreeCursor {
overflow_cell.index
);
}
self.pager.add_dirty(parent_page);
self.pager.add_dirty(parent_page)?;
let parent_contents = parent_page.get_contents();
let page_to_balance_idx = self.stack.current_cell_index() as usize;
@@ -2824,7 +2831,7 @@ impl BTreeCursor {
}
Ok((page, c)) => {
// mark as dirty
self.pager.add_dirty(&page);
self.pager.add_dirty(&page)?;
pages_to_balance[i].replace(page);
if let Some(c) = c {
group.add(&c);
@@ -4678,7 +4685,7 @@ impl BTreeCursor {
destroy_info.state = DestroyState::ProcessPage;
} else {
if keep_root {
self.clear_root(&page);
self.clear_root(&page)?;
} else {
return_if_io!(self.pager.free_page(Some(page), page_id));
}
@@ -4693,7 +4700,7 @@ impl BTreeCursor {
}
}
fn clear_root(&mut self, root_page: &PageRef) {
fn clear_root(&mut self, root_page: &PageRef) -> Result<()> {
let page_ref = root_page.get();
let contents = page_ref.contents.as_ref().unwrap();
@@ -4702,8 +4709,9 @@ impl BTreeCursor {
PageType::IndexLeaf | PageType::IndexInterior => PageType::IndexLeaf,
};
self.pager.add_dirty(root_page);
self.pager.add_dirty(root_page)?;
btree_init_page(root_page, page_type, 0, self.pager.usable_space());
Ok(())
}
pub fn overwrite_cell(
@@ -5072,7 +5080,7 @@ impl CursorTrait for BTreeCursor {
match delete_state {
DeleteState::Start => {
let page = self.stack.top_ref();
self.pager.add_dirty(page);
self.pager.add_dirty(page)?;
if matches!(
page.get_contents().page_type(),
PageType::TableLeaf | PageType::TableInterior
@@ -5269,8 +5277,8 @@ impl CursorTrait for BTreeCursor {
let leaf_page = self.stack.top_ref();
self.pager.add_dirty(page);
self.pager.add_dirty(leaf_page);
self.pager.add_dirty(page)?;
self.pager.add_dirty(leaf_page)?;
// Step 2: Replace the cell in the parent (interior) page.
{

View File

@@ -117,7 +117,7 @@ impl HeaderRefMut {
"incorrect header page id"
);
pager.add_dirty(&page);
pager.add_dirty(&page)?;
*pager.header_ref_state.write() = HeaderRefState::Start;
break Ok(IOResult::Done(Self(page)));
}
@@ -1118,7 +1118,7 @@ impl Pager {
ptrmap_page.get().id == ptrmap_pg_no,
"ptrmap page has unexpected number"
);
self.add_dirty(&ptrmap_page);
self.add_dirty(&ptrmap_page)?;
self.vacuum_state.write().ptrmap_put_state = PtrMapPutState::Start;
break Ok(IOResult::Done(()));
}
@@ -1574,11 +1574,13 @@ impl Pager {
Ok(page_cache.resize(capacity))
}
pub fn add_dirty(&self, page: &Page) {
pub fn add_dirty(&self, page: &Page) -> Result<()> {
self.subjournal_page_if_required(page)?;
// TODO: check duplicates?
let mut dirty_pages = self.dirty_pages.write();
dirty_pages.insert(page.get().id);
page.set_dirty();
Ok(())
}
pub fn wal_state(&self) -> Result<WalState> {
@@ -2235,7 +2237,7 @@ impl Pager {
trunk_page.get().id == trunk_page_id as usize,
"trunk page has unexpected id"
);
self.add_dirty(&trunk_page);
self.add_dirty(&trunk_page)?;
trunk_page_contents.write_u32_no_offset(
TRUNK_PAGE_LEAF_COUNT_OFFSET,
@@ -2255,7 +2257,7 @@ impl Pager {
turso_assert!(page.is_loaded(), "page should be loaded");
// If we get here, need to make this page a new trunk
turso_assert!(page.get().id == page_id, "page has unexpected id");
self.add_dirty(page);
self.add_dirty(page)?;
let trunk_page_id = header.freelist_trunk_page.get();
@@ -2395,7 +2397,7 @@ impl Pager {
// we will allocate a ptrmap page, so increment size
new_db_size += 1;
let page = allocate_new_page(new_db_size as i64, &self.buffer_pool, 0);
self.add_dirty(&page);
self.add_dirty(&page)?;
let page_key = PageCacheKey::new(page.get().id as usize);
let mut cache = self.page_cache.write();
cache.insert(page_key, page.clone())?;
@@ -2471,7 +2473,7 @@ impl Pager {
// and update the database's first freelist trunk page to the next trunk page.
header.freelist_trunk_page = next_trunk_page_id.into();
header.freelist_pages = (header.freelist_pages.get() - 1).into();
self.add_dirty(trunk_page);
self.add_dirty(trunk_page)?;
// zero out the page
turso_assert!(
trunk_page.get_contents().overflow_cells.is_empty(),
@@ -2503,7 +2505,7 @@ impl Pager {
leaf_page.get().id
);
let page_contents = trunk_page.get_contents();
self.add_dirty(leaf_page);
self.add_dirty(leaf_page)?;
// zero out the page
turso_assert!(
leaf_page.get_contents().overflow_cells.is_empty(),
@@ -2541,7 +2543,7 @@ impl Pager {
FREELIST_TRUNK_OFFSET_LEAF_COUNT,
remaining_leaves_count as u32,
);
self.add_dirty(trunk_page);
self.add_dirty(trunk_page)?;
header.freelist_pages = (header.freelist_pages.get() - 1).into();
let leaf_page = leaf_page.clone();
@@ -2555,7 +2557,7 @@ impl Pager {
if Some(new_db_size) == self.pending_byte_page_id() {
let richard_hipp_special_page =
allocate_new_page(new_db_size as i64, &self.buffer_pool, 0);
self.add_dirty(&richard_hipp_special_page);
self.add_dirty(&richard_hipp_special_page)?;
let page_key = PageCacheKey::new(richard_hipp_special_page.get().id);
{
let mut cache = self.page_cache.write();
@@ -2579,7 +2581,7 @@ impl Pager {
let page = allocate_new_page(new_db_size as i64, &self.buffer_pool, 0);
{
// setup page and add to cache
self.add_dirty(&page);
self.add_dirty(&page)?;
let page_key = PageCacheKey::new(page.get().id as usize);
{