PageContent: make read_x/write_x methods private and add dedicated methods

Problem:

A very easy source of bugs is to mistakenly use e.g. PageContent::read_u16()
instead of PageContent::read_u16_no_offset(). The difference between the two
is that `read_u16()` adds 100 bytes to the requested byte offset if and only if
the page in question is page 1, which contains a 100-byte database header.

Case in point: see #2491.

Observation:

In all of the cases where we want to read from or write to a page  "header-sensitively",
those reads/writes are to so-called "well known offsets", e.g. specific bytes in a btree
page header.

In all other cases, the "no-offset" versions, i.e. the ones taking the absolute byte offset
as parameter, should be used.

Solution:

1. Make all the offset-sensitive versions (read_u16() and friends) private methods of
`PageContent`.
2. Expose dedicated methods for things like updating rightmost pointer, updating fragmented
bytes count and so on, and use them instead of the plain read/write methods universally.
This commit is contained in:
Jussi Saurio
2025-08-07 16:27:42 +03:00
parent 3f181c9145
commit 1fe32dadf3
3 changed files with 128 additions and 75 deletions

View File

@@ -3249,7 +3249,7 @@ impl BTreeCursor {
new_last_page
.get()
.get_contents()
.write_u32(offset::BTREE_RIGHTMOST_PTR, right_pointer);
.write_rightmost_ptr(right_pointer);
}
turso_assert!(
parent_contents.overflow_cells.is_empty(),
@@ -3276,7 +3276,7 @@ impl BTreeCursor {
let previous_pointer_divider = read_u32(divider_cell, 0);
page.get()
.get_contents()
.write_u32(offset::BTREE_RIGHTMOST_PTR, previous_pointer_divider);
.write_rightmost_ptr(previous_pointer_divider);
// divider cell now points to this page
new_divider_cell
.extend_from_slice(&(page.get().get().id as u32).to_be_bytes());
@@ -4102,13 +4102,13 @@ impl BTreeCursor {
other => other,
} as u8;
// set new page type
root_contents.write_u8(offset::BTREE_PAGE_TYPE, new_root_page_type);
root_contents.write_u32(offset::BTREE_RIGHTMOST_PTR, child.get().id as u32);
root_contents.write_u16(offset::BTREE_CELL_CONTENT_AREA, self.usable_space() as u16);
root_contents.write_u16(offset::BTREE_CELL_COUNT, 0);
root_contents.write_u16(offset::BTREE_FIRST_FREEBLOCK, 0);
root_contents.write_page_type(new_root_page_type);
root_contents.write_rightmost_ptr(child.get().id as u32);
root_contents.write_cell_content_area(self.usable_space() as u16);
root_contents.write_cell_count(0);
root_contents.write_first_freeblock(0);
root_contents.write_u8(offset::BTREE_FRAGMENTED_BYTES_COUNT, 0);
root_contents.write_fragmented_bytes_count(0);
root_contents.overflow_cells.clear();
self.root_page = root.get().id;
self.stack.clear();
@@ -4924,7 +4924,7 @@ impl BTreeCursor {
return_if_locked!(page);
let contents = page.get_contents();
let next = contents.read_u32(0);
let next = contents.read_u32_no_offset(0);
let next_page_id = page.get().id;
return_if_io!(self.pager.free_page(Some(page), next_page_id));
@@ -6101,7 +6101,7 @@ fn find_free_cell(page_ref: &PageContent, usable_space: u16, amount: usize) -> R
// Delete the slot from freelist and update the page's fragment count.
page_ref.write_u16_no_offset(prev_pc, next);
let frag = page_ref.num_frag_free_bytes() + new_size as u8;
page_ref.write_u8(offset::BTREE_FRAGMENTED_BYTES_COUNT, frag);
page_ref.write_fragmented_bytes_count(frag);
return Ok(pc);
} else if new_size + pc > maxpc {
return_corrupt!("Free block extends beyond page end");
@@ -6139,14 +6139,14 @@ pub fn btree_init_page(page: &BTreePage, page_type: PageType, offset: usize, usa
let contents = contents.get().contents.as_mut().unwrap();
contents.offset = offset;
let id = page_type as u8;
contents.write_u8(offset::BTREE_PAGE_TYPE, id);
contents.write_u16(offset::BTREE_FIRST_FREEBLOCK, 0);
contents.write_u16(offset::BTREE_CELL_COUNT, 0);
contents.write_page_type(id);
contents.write_first_freeblock(0);
contents.write_cell_count(0);
contents.write_u16(offset::BTREE_CELL_CONTENT_AREA, usable_space);
contents.write_cell_content_area(usable_space);
contents.write_u8(offset::BTREE_FRAGMENTED_BYTES_COUNT, 0);
contents.write_u32(offset::BTREE_RIGHTMOST_PTR, 0);
contents.write_fragmented_bytes_count(0);
contents.write_rightmost_ptr(0);
}
fn to_static_buf(buf: &mut [u8]) -> &'static mut [u8] {
@@ -6238,7 +6238,7 @@ fn edit_page(
)?;
debug_validate_cells!(page, usable_space);
// TODO: noverflow
page.write_u16(offset::BTREE_CELL_COUNT, number_new_cells as u16);
page.write_cell_count(number_new_cells as u16);
Ok(())
}
@@ -6339,10 +6339,7 @@ fn page_free_array(
usable_space,
)?;
}
page.write_u16(
offset::BTREE_CELL_COUNT,
page.cell_count() as u16 - number_of_cells_removed as u16,
);
page.write_cell_count(page.cell_count() as u16 - number_of_cells_removed as u16);
Ok(number_of_cells_removed)
}
fn page_insert_array(
@@ -6457,7 +6454,7 @@ fn free_cell_range(
));
}
let frag = page.num_frag_free_bytes() - removed_fragmentation;
page.write_u8(offset::BTREE_FRAGMENTED_BYTES_COUNT, frag);
page.write_fragmented_bytes_count(frag);
pc
};
@@ -6468,8 +6465,8 @@ fn free_cell_range(
if pointer_to_pc != page.offset as u16 + offset::BTREE_FIRST_FREEBLOCK as u16 {
return_corrupt!("Invalid content area merge");
}
page.write_u16(offset::BTREE_FIRST_FREEBLOCK, pc);
page.write_u16(offset::BTREE_CELL_CONTENT_AREA, end);
page.write_first_freeblock(pc);
page.write_cell_content_area(end);
} else {
page.write_u16_no_offset(pointer_to_pc as usize, offset);
page.write_u16_no_offset(offset as usize, pc);
@@ -6569,11 +6566,8 @@ fn defragment_page_fast(
}
// Update page header
page.write_u16(
offset::BTREE_CELL_CONTENT_AREA,
new_cell_content_area as u16,
);
page.write_u16(offset::BTREE_FIRST_FREEBLOCK, 0);
page.write_cell_content_area(new_cell_content_area as u16);
page.write_first_freeblock(0);
debug_validate_cells!(page, usable_space);
@@ -6592,9 +6586,9 @@ fn defragment_page(page: &PageContent, usable_space: u16, max_frag_bytes: isize)
let cell_count = page.cell_count();
if cell_count == 0 {
page.write_u16(offset::BTREE_CELL_CONTENT_AREA, usable_space);
page.write_u16(offset::BTREE_FIRST_FREEBLOCK, 0);
page.write_u8(offset::BTREE_FRAGMENTED_BYTES_COUNT, 0);
page.write_cell_content_area(usable_space);
page.write_first_freeblock(0);
page.write_fragmented_bytes_count(0);
debug_validate_cells!(page, usable_space);
return Ok(());
}
@@ -6686,9 +6680,9 @@ fn defragment_page(page: &PageContent, usable_space: u16, max_frag_bytes: isize)
page.write_u16_no_offset(pointer_location, new_offset);
}
page.write_u16(offset::BTREE_CELL_CONTENT_AREA, cbrk);
page.write_u16(offset::BTREE_FIRST_FREEBLOCK, 0);
page.write_u8(offset::BTREE_FRAGMENTED_BYTES_COUNT, 0);
page.write_cell_content_area(cbrk);
page.write_first_freeblock(0);
page.write_fragmented_bytes_count(0);
debug_validate_cells!(page, usable_space);
Ok(())
@@ -6789,7 +6783,7 @@ fn _insert_into_cell(
// update cell count
let new_n_cells = (page.cell_count() + 1) as u16;
page.write_u16(offset::BTREE_CELL_COUNT, new_n_cells);
page.write_cell_count(new_n_cells);
debug_validate_cells!(page, usable_space);
Ok(())
}
@@ -6930,10 +6924,7 @@ fn allocate_cell_space(
// insert the cell -> content area start moves left by that amount.
cell_content_area_start -= amount;
page_ref.write_u16(
offset::BTREE_CELL_CONTENT_AREA,
cell_content_area_start as u16,
);
page_ref.write_cell_content_area(cell_content_area_start as u16);
assert!(cell_content_area_start + amount <= usable_space as usize);
// we can just return the start of the cell content area, since the cell is inserted to the very left of the cell content area.
@@ -7154,11 +7145,11 @@ fn drop_cell(page: &mut PageContent, cell_idx: usize, usable_space: u16) -> Resu
if page.cell_count() > 1 {
shift_pointers_left(page, cell_idx);
} else {
page.write_u16(offset::BTREE_CELL_CONTENT_AREA, usable_space);
page.write_u16(offset::BTREE_FIRST_FREEBLOCK, 0);
page.write_u8(offset::BTREE_FRAGMENTED_BYTES_COUNT, 0);
page.write_cell_content_area(usable_space);
page.write_first_freeblock(0);
page.write_fragmented_bytes_count(0);
}
page.write_u16(offset::BTREE_CELL_COUNT, page.cell_count() as u16 - 1);
page.write_cell_count(page.cell_count() as u16 - 1);
debug_validate_cells!(page, usable_space);
Ok(())
}
@@ -8549,7 +8540,7 @@ mod tests {
} else {
0
};
contents.write_u32(0, next_page); // Write pointer to next overflow page
contents.write_u32_no_offset(0, next_page); // Write pointer to next overflow page
let buf = contents.as_ptr();
buf[4..].fill(b'A');
@@ -8601,11 +8592,11 @@ mod tests {
let (trunk_page, _c) = cursor.read_page(trunk_page_id as usize)?;
if let Some(contents) = trunk_page.get().get().contents.as_ref() {
// Read number of leaf pages in trunk
let n_leaf = contents.read_u32(4);
let n_leaf = contents.read_u32_no_offset(4);
assert!(n_leaf > 0, "Trunk page should have leaf entries");
for i in 0..n_leaf {
let leaf_page_id = contents.read_u32(8 + (i as usize * 4));
let leaf_page_id = contents.read_u32_no_offset(8 + (i as usize * 4));
assert!(
(2..=4).contains(&leaf_page_id),
"Leaf page ID {leaf_page_id} should be in range 2-4"
@@ -8707,7 +8698,7 @@ mod tests {
let contents = root_page.get().contents.as_mut().unwrap();
// Set rightmost pointer to page4
contents.write_u32(offset::BTREE_RIGHTMOST_PTR, page4.get().get().id as u32);
contents.write_rightmost_ptr(page4.get().get().id as u32);
// Create a cell with pointer to page3
let cell_content = vec![

View File

@@ -1696,7 +1696,7 @@ impl Pager {
let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap();
let number_of_leaf_pages =
trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET);
trunk_page_contents.read_u32_no_offset(TRUNK_PAGE_LEAF_COUNT_OFFSET);
// Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE
let max_free_list_entries =
@@ -1709,9 +1709,11 @@ impl Pager {
);
self.add_dirty(trunk_page);
trunk_page_contents
.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1);
trunk_page_contents.write_u32(
trunk_page_contents.write_u32_no_offset(
TRUNK_PAGE_LEAF_COUNT_OFFSET,
number_of_leaf_pages + 1,
);
trunk_page_contents.write_u32_no_offset(
TRUNK_PAGE_HEADER_SIZE
+ (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE),
page_id as u32,
@@ -1733,9 +1735,9 @@ impl Pager {
let contents = page.get().contents.as_mut().unwrap();
// Point to previous trunk
contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id);
contents.write_u32_no_offset(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id);
// Zero leaf count
contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
contents.write_u32_no_offset(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
// Update page 1 to point to new trunk
header.freelist_trunk_page = (page_id as u32).into();
break;
@@ -1904,9 +1906,9 @@ impl Pager {
);
let page_contents = trunk_page.get().contents.as_ref().unwrap();
let next_trunk_page_id =
page_contents.read_u32(FREELIST_TRUNK_OFFSET_NEXT_TRUNK);
page_contents.read_u32_no_offset(FREELIST_TRUNK_OFFSET_NEXT_TRUNK);
let number_of_freelist_leaves =
page_contents.read_u32(FREELIST_TRUNK_OFFSET_LEAF_COUNT);
page_contents.read_u32_no_offset(FREELIST_TRUNK_OFFSET_LEAF_COUNT);
// There are leaf pointers on this trunk page, so we can reuse one of the pages
// for the allocation.
@@ -1968,7 +1970,7 @@ impl Pager {
);
let page_contents = trunk_page.get().contents.as_ref().unwrap();
let next_leaf_page_id =
page_contents.read_u32(FREELIST_TRUNK_OFFSET_FIRST_LEAF);
page_contents.read_u32_no_offset(FREELIST_TRUNK_OFFSET_FIRST_LEAF);
let (leaf_page, _c) = self.read_page(next_leaf_page_id as usize)?;
if leaf_page.is_locked() {
return Ok(IOResult::IO);
@@ -2007,7 +2009,7 @@ impl Pager {
);
}
// write the new leaf count
page_contents.write_u32(
page_contents.write_u32_no_offset(
FREELIST_TRUNK_OFFSET_LEAF_COUNT,
remaining_leaves_count as u32,
);

View File

@@ -456,53 +456,113 @@ impl PageContent {
self.buffer.as_mut_slice()
}
pub fn read_u8(&self, pos: usize) -> u8 {
/// Read a u8 from the page content at the given offset, taking account the possible db header on page 1 (self.offset).
/// Do not make this method public.
fn read_u8(&self, pos: usize) -> u8 {
let buf = self.as_ptr();
buf[self.offset + pos]
}
pub fn read_u16(&self, pos: usize) -> u16 {
/// Read a u16 from the page content at the given offset, taking account the possible db header on page 1 (self.offset).
/// Do not make this method public.
fn read_u16(&self, pos: usize) -> u16 {
let buf = self.as_ptr();
u16::from_be_bytes([buf[self.offset + pos], buf[self.offset + pos + 1]])
}
pub fn read_u16_no_offset(&self, pos: usize) -> u16 {
let buf = self.as_ptr();
u16::from_be_bytes([buf[pos], buf[pos + 1]])
}
pub fn read_u32_no_offset(&self, pos: usize) -> u32 {
let buf = self.as_ptr();
u32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
}
pub fn read_u32(&self, pos: usize) -> u32 {
/// Read a u32 from the page content at the given offset, taking account the possible db header on page 1 (self.offset).
/// Do not make this method public.
fn read_u32(&self, pos: usize) -> u32 {
let buf = self.as_ptr();
read_u32(buf, self.offset + pos)
}
pub fn write_u8(&self, pos: usize, value: u8) {
/// Write a u8 to the page content at the given offset, taking account the possible db header on page 1 (self.offset).
/// Do not make this method public.
fn write_u8(&self, pos: usize, value: u8) {
tracing::trace!("write_u8(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos] = value;
}
pub fn write_u16(&self, pos: usize, value: u16) {
/// Write a u16 to the page content at the given offset, taking account the possible db header on page 1 (self.offset).
/// Do not make this method public.
fn write_u16(&self, pos: usize, value: u16) {
tracing::trace!("write_u16(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes());
}
/// Write a u32 to the page content at the given offset, taking account the possible db header on page 1 (self.offset).
/// Do not make this method public.
fn write_u32(&self, pos: usize, value: u32) {
tracing::trace!("write_u32(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes());
}
/// Read a u16 from the page content at the given absolute offset, i.e. NOT taking account the possible db header on page 1 (self.offset).
/// This is useful when you want to read a location that you read from another location on the page, or when writing a field of an overflow
/// or freelist page, for example.
pub fn read_u16_no_offset(&self, pos: usize) -> u16 {
let buf = self.as_ptr();
u16::from_be_bytes([buf[pos], buf[pos + 1]])
}
/// Read a u32 from the page content at the given absolute offset, i.e. NOT taking account the possible db header on page 1 (self.offset).
/// This is useful when you want to read a location that you read from another location on the page, or when writing a field of an overflow
/// or freelist page, for example.
pub fn read_u32_no_offset(&self, pos: usize) -> u32 {
let buf = self.as_ptr();
u32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
}
/// Write a u16 to the page content at the given absolute offset, i.e. NOT taking account the possible db header on page 1 (self.offset).
/// This is useful when you want to write a location that you read from another location on the page, or when writing a field of an overflow
/// or freelist page, for example.
pub fn write_u16_no_offset(&self, pos: usize, value: u16) {
tracing::trace!("write_u16(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[pos..pos + 2].copy_from_slice(&value.to_be_bytes());
}
pub fn write_u32(&self, pos: usize, value: u32) {
/// Write a u32 to the page content at the given absolute offset, i.e. NOT taking account the possible db header on page 1 (self.offset).
/// This is useful when you want to write a location that you read from another location on the page, or when writing a field of an overflow
/// or freelist page, for example.
pub fn write_u32_no_offset(&self, pos: usize, value: u32) {
tracing::trace!("write_u32(pos={}, value={})", pos, value);
let buf = self.as_ptr();
buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes());
buf[pos..pos + 4].copy_from_slice(&value.to_be_bytes());
}
/// Assign a new page type to this page.
pub fn write_page_type(&self, value: u8) {
self.write_u8(BTREE_PAGE_TYPE, value);
}
/// Assign a new rightmost pointer to this page.
pub fn write_rightmost_ptr(&self, value: u32) {
self.write_u32(BTREE_RIGHTMOST_PTR, value);
}
/// Write the location (byte offset) of the first freeblock on this page, or zero if there are no freeblocks on the page.
pub fn write_first_freeblock(&self, value: u16) {
self.write_u16(BTREE_FIRST_FREEBLOCK, value);
}
/// Write the number of cells on this page.
pub fn write_cell_count(&self, value: u16) {
self.write_u16(BTREE_CELL_COUNT, value);
}
/// Write the beginning of the cell content area on this page.
pub fn write_cell_content_area(&self, value: u16) {
self.write_u16(BTREE_CELL_CONTENT_AREA, value);
}
/// Write the number of fragmented bytes on this page.
pub fn write_fragmented_bytes_count(&self, value: u8) {
self.write_u8(BTREE_FRAGMENTED_BYTES_COUNT, value);
}
/// The offset of the first freeblock, or zero if there are no freeblocks on the page.