mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-24 03:34:18 +01:00
btree: clear overflow pages when insert overwrites a cell
This commit is contained in:
@@ -145,6 +145,7 @@ macro_rules! return_if_locked_maybe_load {
|
||||
|
||||
/// Wrapper around a page reference used in order to update the reference in case page was unloaded
|
||||
/// and we need to update the reference.
|
||||
#[derive(Debug)]
|
||||
pub struct BTreePageInner {
|
||||
pub page: RefCell<PageRef>,
|
||||
}
|
||||
@@ -225,11 +226,33 @@ struct DeleteInfo {
|
||||
balance_write_info: Option<WriteInfo>,
|
||||
}
|
||||
|
||||
/// Resumable state of `overwrite_cell`.
///
/// Overwriting a cell may have to yield for I/O while the old cell's overflow pages
/// are being cleared, so the progress made so far is stored here and the operation
/// picks up from the recorded step when it is re-entered.
#[derive(Debug, Clone)]
pub enum OverwriteCellState {
    /// Initial step: serialize the new record into a cell payload and look up the
    /// raw region (offset and local size) of the cell that is being replaced.
    FillPayload,
    /// Second step: clear the overflow pages of the old cell, then write the new
    /// payload. It is written in place when its length equals `old_local_size`;
    /// otherwise the old cell is dropped and the new payload is inserted at the
    /// same cell index.
    ClearOverflowPagesAndOverwrite {
        /// Serialized replacement cell built during `FillPayload`.
        new_payload: Vec<u8>,
        /// Offset of the old cell's raw region, captured during `FillPayload`.
        old_offset: usize,
        /// Size of the old cell's raw region, captured during `FillPayload`.
        old_local_size: usize,
    },
}
|
||||
|
||||
/// State machine of a write operation.
|
||||
/// May involve balancing due to overflow.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
#[derive(Debug, Clone)]
|
||||
enum WriteState {
|
||||
Start,
|
||||
Overwrite {
|
||||
page: Arc<BTreePageInner>,
|
||||
cell_idx: usize,
|
||||
state: OverwriteCellState,
|
||||
},
|
||||
Insert {
|
||||
page: Arc<BTreePageInner>,
|
||||
cell_idx: usize,
|
||||
},
|
||||
CheckNeedsBalancing {
|
||||
page: Arc<BTreePageInner>,
|
||||
},
|
||||
BalanceStart,
|
||||
BalanceFreePages {
|
||||
curr_page: usize,
|
||||
@@ -2114,7 +2137,7 @@ impl BTreeCursor {
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("can't insert while counting");
|
||||
write_info.state
|
||||
write_info.state.clone()
|
||||
};
|
||||
match write_state {
|
||||
WriteState::Start => {
|
||||
@@ -2148,20 +2171,16 @@ impl BTreeCursor {
|
||||
BTreeCell::TableLeafCell(tbl_leaf) => {
|
||||
if tbl_leaf.rowid == bkey.to_rowid() {
|
||||
tracing::debug!("TableLeafCell: found exact match with cell_idx={cell_idx}, overwriting");
|
||||
self.overwrite_cell(page.clone(), cell_idx, record)?;
|
||||
self.has_record.set(true);
|
||||
let write_info = self
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("expected write info");
|
||||
if page.get().get_contents().overflow_cells.is_empty() {
|
||||
write_info.state = WriteState::Finish;
|
||||
} else {
|
||||
write_info.state = WriteState::BalanceStart;
|
||||
// If we balance, we must save the cursor position and seek to it later.
|
||||
// FIXME: we shouldn't have both DeleteState::SeekAfterBalancing and
|
||||
// save_context()/restore_context(), they are practically the same thing.
|
||||
self.save_context(CursorContext::TableRowId(bkey.to_rowid()));
|
||||
}
|
||||
write_info.state = WriteState::Overwrite {
|
||||
page: page.clone(),
|
||||
cell_idx,
|
||||
state: OverwriteCellState::FillPayload,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -2178,20 +2197,15 @@ impl BTreeCursor {
|
||||
if cmp == Ordering::Equal {
|
||||
tracing::debug!("IndexLeafCell: found exact match with cell_idx={cell_idx}, overwriting");
|
||||
self.has_record.set(true);
|
||||
self.overwrite_cell(page.clone(), cell_idx, record)?;
|
||||
let write_info = self
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("expected write info");
|
||||
if page.get().get_contents().overflow_cells.is_empty() {
|
||||
write_info.state = WriteState::Finish;
|
||||
} else {
|
||||
write_info.state = WriteState::BalanceStart;
|
||||
// If we balance, we must save the cursor position and seek to it later.
|
||||
// FIXME: we shouldn't have both DeleteState::SeekAfterBalancing and
|
||||
// save_context()/restore_context(), they are practically the same thing.
|
||||
self.save_context(CursorContext::IndexKeyRowId((*record).clone()));
|
||||
}
|
||||
write_info.state = WriteState::Overwrite {
|
||||
page: page.clone(),
|
||||
cell_idx,
|
||||
state: OverwriteCellState::FillPayload,
|
||||
};
|
||||
continue;
|
||||
} else {
|
||||
turso_assert!(
|
||||
@@ -2204,8 +2218,17 @@ impl BTreeCursor {
|
||||
}
|
||||
}
|
||||
|
||||
// insert cell
|
||||
|
||||
let write_info = self
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("write info should be present");
|
||||
write_info.state = WriteState::Insert {
|
||||
page: page.clone(),
|
||||
cell_idx,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
WriteState::Insert { page, cell_idx } => {
|
||||
let mut cell_payload: Vec<u8> = Vec::with_capacity(record_values.len() + 4);
|
||||
fill_cell_payload(
|
||||
page.get().get().contents.as_ref().unwrap(),
|
||||
@@ -2217,8 +2240,7 @@ impl BTreeCursor {
|
||||
self.pager.clone(),
|
||||
);
|
||||
|
||||
// insert
|
||||
let overflow = {
|
||||
{
|
||||
let page = page.get();
|
||||
let contents = page.get().contents.as_mut().unwrap();
|
||||
tracing::debug!(name: "overflow", cell_count = contents.cell_count());
|
||||
@@ -2229,30 +2251,74 @@ impl BTreeCursor {
|
||||
cell_idx,
|
||||
self.usable_space() as u16,
|
||||
)?;
|
||||
!contents.overflow_cells.is_empty()
|
||||
};
|
||||
self.stack.set_cell_index(cell_idx as i32);
|
||||
if overflow {
|
||||
// A balance will happen so save the key we were inserting
|
||||
tracing::debug!(page = page.get().get().id, cell_idx, "balance triggered:");
|
||||
let write_info = self
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("write info should be present");
|
||||
write_info.state = WriteState::CheckNeedsBalancing { page: page.clone() };
|
||||
continue;
|
||||
}
|
||||
WriteState::Overwrite {
|
||||
page,
|
||||
cell_idx,
|
||||
mut state,
|
||||
} => {
|
||||
turso_assert!(
|
||||
page.get().is_loaded(),
|
||||
"page {}is not loaded",
|
||||
page.get().get().id
|
||||
);
|
||||
if matches!(
|
||||
self.overwrite_cell(page.clone(), cell_idx, record, &mut state)?,
|
||||
IOResult::IO
|
||||
) {
|
||||
let write_info = self
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("write info should be present");
|
||||
let WriteState::Overwrite {
|
||||
state: old_state, ..
|
||||
} = &mut write_info.state
|
||||
else {
|
||||
panic!("expected overwrite state");
|
||||
};
|
||||
*old_state = state;
|
||||
return Ok(IOResult::IO);
|
||||
}
|
||||
let write_info = self
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("write info should be present");
|
||||
write_info.state = WriteState::CheckNeedsBalancing { page: page.clone() };
|
||||
continue;
|
||||
}
|
||||
WriteState::CheckNeedsBalancing { page } => {
|
||||
turso_assert!(
|
||||
page.get().is_loaded(),
|
||||
"page {}is not loaded",
|
||||
page.get().get().id
|
||||
);
|
||||
let write_info = self
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("write info should be present");
|
||||
if page.get().get_contents().overflow_cells.is_empty() {
|
||||
write_info.state = WriteState::Finish;
|
||||
} else {
|
||||
write_info.state = WriteState::BalanceStart;
|
||||
// If we balance, we must save the cursor position and seek to it later.
|
||||
// FIXME: we shouldn't have both DeleteState::SeekAfterBalancing and
|
||||
// save_context()/restore_context(), they are practically the same thing.
|
||||
self.save_context(match bkey {
|
||||
BTreeKey::TableRowId(rowid) => CursorContext::TableRowId(rowid.0),
|
||||
BTreeKey::IndexKey(record) => {
|
||||
CursorContext::IndexKeyRowId((*record).clone())
|
||||
}
|
||||
});
|
||||
let write_info = self
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("can't count while inserting");
|
||||
write_info.state = WriteState::BalanceStart;
|
||||
} else {
|
||||
let write_info = self
|
||||
.state
|
||||
.mut_write_info()
|
||||
.expect("can't count while inserting");
|
||||
write_info.state = WriteState::Finish;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
WriteState::BalanceStart
|
||||
| WriteState::BalanceFreePages { .. }
|
||||
@@ -2293,7 +2359,12 @@ impl BTreeCursor {
|
||||
"Cursor must be in balancing state"
|
||||
);
|
||||
loop {
|
||||
let state = self.state.write_info().expect("must be balancing").state;
|
||||
let state = self
|
||||
.state
|
||||
.write_info()
|
||||
.expect("must be balancing")
|
||||
.state
|
||||
.clone();
|
||||
match state {
|
||||
WriteState::BalanceStart => {
|
||||
assert!(
|
||||
@@ -2367,11 +2438,20 @@ impl BTreeCursor {
|
||||
matches!(self.state, CursorState::Write(_)),
|
||||
"Cursor must be in balancing state"
|
||||
);
|
||||
let state = self.state.write_info().expect("must be balancing").state;
|
||||
let state = self
|
||||
.state
|
||||
.write_info()
|
||||
.expect("must be balancing")
|
||||
.state
|
||||
.clone();
|
||||
tracing::debug!(?state);
|
||||
let (next_write_state, result) = match state {
|
||||
WriteState::Start => todo!(),
|
||||
WriteState::BalanceStart => todo!(),
|
||||
WriteState::Start
|
||||
| WriteState::Overwrite { .. }
|
||||
| WriteState::Insert { .. }
|
||||
| WriteState::CheckNeedsBalancing { .. }
|
||||
| WriteState::BalanceStart
|
||||
| WriteState::Finish => panic!("balance_non_root: unexpected state {state:?}"),
|
||||
WriteState::BalanceNonRootPickSiblings => {
|
||||
let parent_page = self.stack.top();
|
||||
return_if_locked_maybe_load!(self.pager, parent_page);
|
||||
@@ -3378,7 +3458,6 @@ impl BTreeCursor {
|
||||
)
|
||||
}
|
||||
}
|
||||
WriteState::Finish => todo!(),
|
||||
};
|
||||
if matches!(next_write_state, WriteState::BalanceStart) {
|
||||
// reset balance state
|
||||
@@ -4946,48 +5025,73 @@ impl BTreeCursor {
|
||||
page_ref: BTreePage,
|
||||
cell_idx: usize,
|
||||
record: &ImmutableRecord,
|
||||
state: &mut OverwriteCellState,
|
||||
) -> Result<IOResult<()>> {
|
||||
// build the new payload
|
||||
let page = page_ref.get();
|
||||
let page_contents = page.get().contents.as_ref().unwrap();
|
||||
let serial_types_len = self.record_cursor.borrow_mut().len(record);
|
||||
let mut new_payload = Vec::with_capacity(serial_types_len);
|
||||
let rowid = return_if_io!(self.rowid());
|
||||
fill_cell_payload(
|
||||
page_contents,
|
||||
rowid,
|
||||
&mut new_payload,
|
||||
cell_idx,
|
||||
record,
|
||||
self.usable_space(),
|
||||
self.pager.clone(),
|
||||
);
|
||||
loop {
|
||||
turso_assert!(
|
||||
page_ref.get().is_loaded(),
|
||||
"page {} is not loaded",
|
||||
page_ref.get().get().id
|
||||
);
|
||||
match state {
|
||||
OverwriteCellState::FillPayload => {
|
||||
let page = page_ref.get();
|
||||
let page_contents = page.get().contents.as_ref().unwrap();
|
||||
let serial_types_len = self.record_cursor.borrow_mut().len(record);
|
||||
let mut new_payload = Vec::with_capacity(serial_types_len);
|
||||
let rowid = return_if_io!(self.rowid());
|
||||
fill_cell_payload(
|
||||
page_contents,
|
||||
rowid,
|
||||
&mut new_payload,
|
||||
cell_idx,
|
||||
record,
|
||||
self.usable_space(),
|
||||
self.pager.clone(),
|
||||
);
|
||||
// figure out old cell offset & size
|
||||
let (old_offset, old_local_size) = {
|
||||
let page_ref = page_ref.get();
|
||||
let page = page_ref.get().contents.as_ref().unwrap();
|
||||
page.cell_get_raw_region(cell_idx, self.usable_space())
|
||||
};
|
||||
|
||||
// figure out old cell offset & size
|
||||
let (old_offset, old_local_size) = {
|
||||
let page_ref = page_ref.get();
|
||||
let page = page_ref.get().contents.as_ref().unwrap();
|
||||
page.cell_get_raw_region(cell_idx, self.usable_space())
|
||||
};
|
||||
*state = OverwriteCellState::ClearOverflowPagesAndOverwrite {
|
||||
new_payload,
|
||||
old_offset,
|
||||
old_local_size,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
OverwriteCellState::ClearOverflowPagesAndOverwrite {
|
||||
new_payload,
|
||||
old_offset,
|
||||
old_local_size,
|
||||
} => {
|
||||
let page = page_ref.get();
|
||||
let page_contents = page.get().contents.as_ref().unwrap();
|
||||
let cell = page_contents.cell_get(cell_idx, self.usable_space())?;
|
||||
return_if_io!(self.clear_overflow_pages(&cell));
|
||||
// if it all fits in local space and old_local_size is enough, do an in-place overwrite
|
||||
if new_payload.len() == *old_local_size {
|
||||
self.overwrite_content(page_ref.clone(), *old_offset, new_payload)?;
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
// if it all fits in local space and old_local_size is enough, do an in-place overwrite
|
||||
if new_payload.len() == old_local_size {
|
||||
self.overwrite_content(page_ref.clone(), old_offset, &new_payload)?;
|
||||
Ok(IOResult::Done(()))
|
||||
} else {
|
||||
// doesn't fit, drop it and insert a new one
|
||||
drop_cell(
|
||||
page_ref.get().get_contents(),
|
||||
cell_idx,
|
||||
self.usable_space() as u16,
|
||||
)?;
|
||||
insert_into_cell(
|
||||
page_ref.get().get_contents(),
|
||||
&new_payload,
|
||||
cell_idx,
|
||||
self.usable_space() as u16,
|
||||
)?;
|
||||
Ok(IOResult::Done(()))
|
||||
drop_cell(
|
||||
page_ref.get().get_contents(),
|
||||
cell_idx,
|
||||
self.usable_space() as u16,
|
||||
)?;
|
||||
insert_into_cell(
|
||||
page_ref.get().get_contents(),
|
||||
new_payload,
|
||||
cell_idx,
|
||||
self.usable_space() as u16,
|
||||
)?;
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6990,6 +7094,116 @@ mod tests {
|
||||
(pager, page2.get().get().id, db, conn)
|
||||
}
|
||||
|
||||
/// Regression test: overwriting a row whose payload spilled onto overflow pages must
/// release those overflow pages to the freelist.
///
/// Steps: insert a record large enough to need overflow pages, overwrite the same
/// rowid with a small record, then check the freelist count and read the row back.
#[test]
pub fn btree_test_overflow_pages_are_cleared_on_overwrite() {
    // Create a database with a table
    let (pager, root_page, _, _) = empty_btree();
    let num_columns = 5;

    // Get the maximum local payload size for table leaf pages
    let max_local = payload_overflow_threshold_max(PageType::TableLeaf, 4096);
    let usable_size = 4096;

    // Create a payload that is definitely larger than the maximum local size.
    // This will force the creation of overflow pages.
    let large_payload_size = max_local + usable_size * 2;
    let large_payload = vec![b'X'; large_payload_size];

    // Create a record with the large payload. The buffer is not needed afterwards,
    // so it is moved into the register instead of being cloned.
    let regs = &[Register::Value(Value::Blob(large_payload))];
    let large_record = ImmutableRecord::from_registers(regs, regs.len());

    // Create cursor for the table
    let mut cursor = BTreeCursor::new_table(None, pager.clone(), root_page, num_columns);

    // Baseline page count, taken before anything has been inserted.
    let initial_pagecount = header_accessor::get_database_size(&pager).unwrap();
    assert_eq!(
        initial_pagecount, 2,
        "Page count should be 2 before the first insert, was {initial_pagecount}"
    );

    // Insert the large record with rowid 1
    run_until_done(
        || {
            let key = SeekKey::TableRowId(1);
            cursor.seek(key, SeekOp::GE { eq_only: true })
        },
        pager.deref(),
    )
    .unwrap();
    let key = BTreeKey::new_table_rowid(1, Some(&large_record));
    run_until_done(|| cursor.insert(&key, true), pager.deref()).unwrap();

    // The freelist must still be empty after the insert: the pages allocated for
    // overflow are in use, not free.
    let freelist_after_insert = header_accessor::get_freelist_pages(&pager).unwrap();
    assert_eq!(
        freelist_after_insert, 0,
        "Freelist count should be 0 after insert, was {freelist_after_insert}"
    );
    // Overflow pages were created if the database grew by the expected number of pages.
    let pagecount_after_insert = header_accessor::get_database_size(&pager).unwrap();
    const EXPECTED_OVERFLOW_PAGES: u32 = 3;
    assert_eq!(
        pagecount_after_insert,
        initial_pagecount + EXPECTED_OVERFLOW_PAGES,
        "Page count should be {} after insert, was {pagecount_after_insert}",
        initial_pagecount + EXPECTED_OVERFLOW_PAGES
    );

    // Create a smaller record to overwrite with
    let small_payload = vec![b'Y'; 100]; // Much smaller payload
    let regs = &[Register::Value(Value::Blob(small_payload.clone()))];
    let small_record = ImmutableRecord::from_registers(regs, regs.len());

    // Seek to the existing record
    run_until_done(
        || {
            let key = SeekKey::TableRowId(1);
            cursor.seek(key, SeekOp::GE { eq_only: true })
        },
        pager.deref(),
    )
    .unwrap();

    // Overwrite the record with the same rowid
    let key = BTreeKey::new_table_rowid(1, Some(&small_record));
    run_until_done(|| cursor.insert(&key, true), pager.deref()).unwrap();

    // Check that the freelist count has increased, indicating overflow pages were cleared
    let freelist_after_overwrite = header_accessor::get_freelist_pages(&pager).unwrap();
    assert_eq!(
        freelist_after_overwrite, EXPECTED_OVERFLOW_PAGES,
        "Freelist count should be {EXPECTED_OVERFLOW_PAGES} after overwrite, was {freelist_after_overwrite}"
    );

    // Verify the record was actually overwritten by reading it back
    run_until_done(
        || {
            let key = SeekKey::TableRowId(1);
            cursor.seek(key, SeekOp::GE { eq_only: true })
        },
        pager.deref(),
    )
    .unwrap();

    let record = run_until_done(|| cursor.record(), pager.deref()).unwrap();
    let record = record.unwrap();

    // The record should now contain the smaller payload.
    // NOTE(review): the constant names below may not match the real layout. The record
    // is built from a single blob register, so the three non-payload bytes are
    // presumably the header-size varint plus a two-byte serial-type varint rather than
    // anything rowid-related. The total is unchanged either way - confirm and rename.
    let record_payload = record.get_payload();
    const RECORD_HEADER_SIZE: usize = 1;
    const ROWID_VARINT_SIZE: usize = 1;
    const ROWID_PAYLOAD_SIZE: usize = 0; // const int 1 doesn't take any space
    const BLOB_PAYLOAD_SIZE: usize = 1; // the size '100 bytes' can be expressed as 1 byte
    assert_eq!(
        record_payload.len(),
        RECORD_HEADER_SIZE
            + ROWID_VARINT_SIZE
            + ROWID_PAYLOAD_SIZE
            + BLOB_PAYLOAD_SIZE
            + small_payload.len(),
        "Record should now contain smaller payload after overwrite"
    );
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
pub fn btree_insert_fuzz_ex() {
|
||||
|
||||
Reference in New Issue
Block a user