fix read overflow page procedure

This commit is contained in:
Pere Diaz Bou
2025-03-19 16:07:24 +01:00
parent f51c20adf0
commit 83d0c9a1b6
3 changed files with 80 additions and 18 deletions

View File

@@ -145,6 +145,7 @@ enum ReadPayloadOverlow {
payload: Vec<u8>,
next_page: u32,
remaining_to_read: usize,
page: PageRef,
},
}
@@ -400,7 +401,7 @@ impl BTreeCursor {
{
record
} else {
continue;
return Ok(CursorResult::IO);
}
} else {
crate::storage::sqlite3_ondisk::read_record(_payload)?
@@ -424,12 +425,13 @@ impl BTreeCursor {
) -> Result<Option<ImmutableRecord>> {
let res = match &mut self.state {
CursorState::None => {
tracing::debug!("start reading overflow page payload_size={}", payload_size);
let page = self.pager.read_page(start_next_page as usize)?;
self.stack.push(page);
self.state = CursorState::Read(ReadPayloadOverlow::ProcessPage {
payload: payload.to_vec(),
next_page: start_next_page,
remaining_to_read: payload_size as usize - payload.len(),
page,
});
None
}
@@ -437,17 +439,18 @@ impl BTreeCursor {
payload,
next_page,
remaining_to_read,
page,
}) => {
let page = self.stack.top();
if page.is_locked() {
return Ok(None);
}
self.stack.pop();
tracing::debug!("reading overflow page {} {}", next_page, remaining_to_read);
let contents = page.get_contents();
let next = contents.read_u32_no_offset(0);
let buf = contents.as_ptr();
let to_read = (*remaining_to_read).min(buf.len() - 4);
payload.extend_from_slice(&buf[4..to_read]);
let usable_space = self.pager.usable_space();
let to_read = (*remaining_to_read).min(usable_space - 4);
payload.extend_from_slice(&buf[4..4 + to_read]);
*remaining_to_read -= to_read;
if *remaining_to_read == 0 || next == 0 {
assert!(
@@ -458,7 +461,7 @@ impl BTreeCursor {
Some(record)
} else {
let new_page = self.pager.read_page(next as usize)?;
self.stack.push(new_page);
*page = new_page;
*next_page = next;
None
}
@@ -571,7 +574,7 @@ impl BTreeCursor {
{
record
} else {
continue;
return Ok(CursorResult::IO);
}
} else {
crate::storage::sqlite3_ondisk::read_record(_payload)?
@@ -596,7 +599,7 @@ impl BTreeCursor {
{
record
} else {
continue;
return Ok(CursorResult::IO);
}
} else {
crate::storage::sqlite3_ondisk::read_record(payload)?
@@ -644,7 +647,7 @@ impl BTreeCursor {
{
record
} else {
continue;
return Ok(CursorResult::IO);
}
} else {
crate::storage::sqlite3_ondisk::read_record(payload)?
@@ -735,7 +738,7 @@ impl BTreeCursor {
{
record
} else {
continue;
return Ok(CursorResult::IO);
}
} else {
crate::storage::sqlite3_ondisk::read_record(payload)?
@@ -760,7 +763,7 @@ impl BTreeCursor {
{
record
} else {
continue;
return Ok(CursorResult::IO);
}
} else {
crate::storage::sqlite3_ondisk::read_record(payload)?
@@ -854,6 +857,7 @@ impl BTreeCursor {
pub fn move_to(&mut self, key: SeekKey<'_>, cmp: SeekOp) -> Result<CursorResult<()>> {
assert!(self.mv_cursor.is_none());
tracing::debug!("move_to(key={:?} cmp={:?})", key, cmp);
// For a table with N rows, we can find any row by row id in O(log(N)) time by starting at the root page and following the B-tree pointers.
// B-trees consist of interior pages and leaf pages. Interior pages contain pointers to other pages, while leaf pages contain the actual row data.
//
@@ -947,7 +951,7 @@ impl BTreeCursor {
{
record
} else {
continue;
return Ok(CursorResult::IO);
}
} else {
crate::storage::sqlite3_ondisk::read_record(payload)?
@@ -1030,6 +1034,7 @@ impl BTreeCursor {
// find cell
(self.find_cell(page, int_key), page.page_type())
};
tracing::debug!("insert_into_page(cell_idx={})", cell_idx);
// if the cell index is less than the total cells, check: if its an existing
// rowid, we are going to update / overwrite the cell
@@ -1719,7 +1724,9 @@ impl BTreeCursor {
cell_array.cell_count(page_idx) - start_new_cells,
)
};
let page = pages_to_balance_new[page_idx].get_contents();
let page = &pages_to_balance_new[page_idx];
tracing::debug!("pre_edit_page(page={})", page.get().id);
let page = page.get_contents();
edit_page(
page,
start_old_cells,
@@ -2775,7 +2782,7 @@ impl PageStack {
/// We usually advance after going traversing a new page
fn advance(&self) {
let current = self.current();
tracing::trace!("advance {}", self.cell_indices.borrow()[current]);
tracing::trace!("advance {}", self.cell_indices.borrow()[current],);
self.cell_indices.borrow_mut()[current] += 1;
}
@@ -2979,6 +2986,7 @@ fn page_free_array(
cell_array: &CellArray,
usable_space: u16,
) -> Result<usize> {
tracing::debug!("page_free_array {}..{}", first, first + count);
let buf = &mut page.as_ptr()[page.offset..usable_space as usize];
let buf_range = buf.as_ptr_range();
let mut number_of_cells_removed = 0;
@@ -3013,6 +3021,7 @@ fn page_insert_array(
) -> Result<()> {
// TODO: implement faster algorithm, this is doing extra work that's not needed.
// See pageInsertArray to understand faster way.
tracing::debug!("page_insert_array {}..{}", first, first + count);
for i in first..first + count {
insert_into_cell(page, cell_array.cells[i], start_insert, usable_space)?;
start_insert += 1;
@@ -3209,8 +3218,8 @@ fn insert_into_cell(
return Ok(());
}
// TODO: insert into cell payload in internal page
let new_cell_data_pointer = allocate_cell_space(page, payload.len() as u16, usable_space)?;
tracing::debug!("insert_into_cell pc={}", new_cell_data_pointer);
let buf = page.as_ptr();
// copy data
@@ -3451,6 +3460,7 @@ fn fill_cell_payload(
/// This is done when a cell overflows and new space is needed.
fn allocate_overflow_page(pager: Rc<Pager>) -> PageRef {
let page = pager.allocate_page().unwrap();
tracing::debug!("allocate_overflow_page(id={})", page.get().id);
// setup overflow page
let contents = page.get().contents.as_mut().unwrap();
@@ -4946,6 +4956,59 @@ mod tests {
}
}
#[test]
pub fn test_overflow_cells() {
let iterations = 10_usize;
let mut huge_texts = Vec::new();
for i in 0..iterations {
let mut huge_text = String::new();
for _j in 0..8192 {
huge_text.push((b'A' + i as u8) as char);
}
huge_texts.push(huge_text);
}
let (pager, root_page) = empty_btree();
for i in 0..iterations {
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
tracing::info!("INSERT INTO t VALUES ({});", i,);
let key = OwnedValue::Integer(i as i64);
let value = Record::new(
[OwnedValue::Text(Text {
value: Rc::new(huge_texts[i].as_bytes().to_vec()),
subtype: crate::types::TextSubtype::Text,
})]
.to_vec(),
);
tracing::trace!("before insert {}", i);
tracing::debug!(
"=========== btree before ===========\n{}\n\n",
format_btree(pager.clone(), root_page, 0)
);
run_until_done(
|| {
let key = SeekKey::TableRowId(i as u64);
cursor.move_to(key, SeekOp::EQ)
},
pager.deref(),
)
.unwrap();
run_until_done(|| cursor.insert(&key, &value, true), pager.deref()).unwrap();
tracing::debug!(
"=========== btree after ===========\n{}\n\n",
format_btree(pager.clone(), root_page, 0)
);
}
let mut cursor = BTreeCursor::new(None, pager.clone(), root_page);
cursor.move_to_root();
for i in 0..iterations {
let (rowid, _) =
run_until_done(|| cursor.get_next_record(None), pager.deref()).unwrap();
assert_eq!(rowid.unwrap(), i as u64, "got!=expected");
}
}
fn run_until_done<T>(
mut action: impl FnMut() -> Result<CursorResult<T>>,
pager: &Pager,

View File

@@ -452,7 +452,7 @@ impl Wal for WalFile {
shared.max_frame + 1
};
let offset = self.frame_offset(frame_id);
trace!(
tracing::debug!(
"append_frame(frame={}, offset={}, page_id={})",
frame_id,
offset,

View File

@@ -80,7 +80,6 @@ use std::num::NonZero;
use std::ops::Deref;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use tracing::instrument;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Represents a target for a jump instruction.