Merge 'Add support for delete row' from Krishna Vishal

**Core delete tasks**:
- [x] Implement free page functionality
- [x] Clear a cell's overflow pages (if any) via the free-page logic before
deleting the cell.
- [x] Implement delete for leaf page
- [ ] Balance after delete properly
**Auxiliary tasks to make delete work properly**:
- [x] Implement block coalescing in `free_cell_range` to reduce
fragmentation.
- [x] Track page fragmentation in `free_cell_range`.
- [x] Update page offsets in `drop_cell` and update cell pointer array
after dropping a cell.
- [x] Add TCL tests
Closes #455
--------
I will add support for balancing after delete once `balance_nonroot` is
extended. In its current state, `balance_nonroot` cannot rebalance after a
delete and would corrupt the page.
But delete itself is fully functional now.

Closes #785
This commit is contained in:
Pekka Enberg
2025-02-07 12:36:23 +02:00
5 changed files with 762 additions and 117 deletions

View File

@@ -5,8 +5,9 @@ use crate::storage::sqlite3_ondisk::{
read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType,
TableInteriorCell, TableLeafCell,
};
use crate::types::{CursorResult, OwnedValue, Record, SeekKey, SeekOp};
use crate::Result;
use crate::{LimboError, Result};
use std::cell::{Ref, RefCell};
use std::pin::Pin;
@@ -14,7 +15,8 @@ use std::rc::Rc;
use super::pager::PageRef;
use super::sqlite3_ondisk::{
write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE,
payload_overflows, write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell,
DATABASE_HEADER_SIZE,
};
/*
@@ -734,7 +736,6 @@ impl BTreeCursor {
// TODO: if overwrite drop cell
// insert cell
let mut cell_payload: Vec<u8> = Vec::new();
self.fill_cell_payload(page_type, Some(int_key), &mut cell_payload, record);
@@ -784,7 +785,7 @@ impl BTreeCursor {
const CELL_POINTER_SIZE_BYTES: usize = 2;
let enough_space = payload.len() + CELL_POINTER_SIZE_BYTES <= free as usize;
if !enough_space {
// add to overflow cell
// add to overflow cells
page.overflow_cells.push(OverflowCell {
index: cell_idx,
payload: Pin::new(Vec::from(payload)),
@@ -793,32 +794,33 @@ impl BTreeCursor {
}
// TODO: insert into cell payload in internal page
let new_cell_data_pointer = self.allocate_cell_space(page, payload.len() as u16);
let new_cell_data_pointer = self
.allocate_cell_space(page, payload.len() as u16)
.unwrap();
let buf = page.as_ptr();
// copy data
// Copy cell data
buf[new_cell_data_pointer as usize..new_cell_data_pointer as usize + payload.len()]
.copy_from_slice(payload);
// memmove(pIns+2, pIns, 2*(pPage->nCell - i));
let (cell_pointer_array_start, _) = page.cell_pointer_array_offset_and_size();
let cell_pointer_cur_idx = cell_pointer_array_start + (CELL_POINTER_SIZE_BYTES * cell_idx);
// move existing pointers forward by CELL_POINTER_SIZE_BYTES...
let n_cells_forward = page.cell_count() - cell_idx;
let n_bytes_forward = CELL_POINTER_SIZE_BYTES * n_cells_forward;
let cell_count = page.cell_count();
// Move existing pointers if needed
let n_bytes_forward = CELL_POINTER_SIZE_BYTES * (cell_count - cell_idx);
if n_bytes_forward > 0 {
buf.copy_within(
cell_pointer_cur_idx..cell_pointer_cur_idx + n_bytes_forward,
cell_pointer_cur_idx + CELL_POINTER_SIZE_BYTES,
);
}
// ...and insert new cell pointer at the current index
// Insert new cell pointer at the current cell index
page.write_u16(cell_pointer_cur_idx - page.offset, new_cell_data_pointer);
// update first byte of content area (cell data always appended to the left, so cell content area pointer moves to point to the new cell data)
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, new_cell_data_pointer);
// update cell count
// Update cell count
let new_n_cells = (page.cell_count() + 1) as u16;
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_n_cells);
}
@@ -827,84 +829,212 @@ impl BTreeCursor {
/// This function also updates the freeblock list in the page.
/// Freeblocks are used to keep track of free space in the page,
/// and are organized as a linked list.
fn free_cell_range(&self, page: &mut PageContent, offset: u16, len: u16) {
// if the freeblock list is empty, we set this block as the first freeblock in the page header.
if page.first_freeblock() == 0 {
page.write_u16(offset as usize, 0); // next freeblock = null
page.write_u16(offset as usize + 2, len); // size of this freeblock
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, offset); // first freeblock in page = this block
return;
}
let first_block = page.first_freeblock();
fn free_cell_range(&self, page: &mut PageContent, offset: u16, len: u16) -> Result<()> {
let mut cell_block_end = offset as u32 + len as u32;
let mut fragments_reduced = 0;
let mut cell_length = len;
let mut cell_block_start = offset;
let mut next_free_block_ptr = PAGE_HEADER_OFFSET_FIRST_FREEBLOCK as u16;
// if the freeblock list is not empty, and the offset is less than the first freeblock,
// we insert this block at the head of the list
if offset < first_block {
page.write_u16(offset as usize, first_block); // next freeblock = previous first freeblock
page.write_u16(offset as usize + 2, len); // size of this freeblock
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, offset); // first freeblock in page = this block
return;
}
// if we clear space that is at the start of the cell content area,
// we need to update the cell content area pointer forward to account for the removed space
// FIXME: is offset ever < cell_content_area? cell content area grows leftwards and the pointer
// is to the start of the last allocated cell. should we assert!(offset >= page.cell_content_area())
// and change this to if offset == page.cell_content_area()?
if offset <= page.cell_content_area() {
// FIXME: remove the line directly below this, it does not change anything.
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, page.first_freeblock());
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, offset + len);
return;
}
// if the freeblock list is not empty, and the offset is greater than the first freeblock,
// then we need to do some more calculation to figure out where to insert the freeblock
// in the freeblock linked list.
let maxpc = {
let usable_size = {
let db_header = self.pager.db_header.borrow();
let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
usable_space as u16
(db_header.page_size - db_header.reserved_space as u16) as u32
};
let mut pc = first_block;
let mut prev = first_block;
assert!(
cell_block_end <= usable_size,
"cell block end is out of bounds"
);
assert!(cell_block_start >= 4, "minimum cell size is 4");
assert!(
cell_block_start <= self.usable_space() as u16 - 4,
"offset is out of page bounds"
);
while pc <= maxpc && pc < offset {
let next = page.read_u16(pc as usize);
prev = pc;
pc = next;
}
if pc >= maxpc {
// insert into tail
let offset = offset as usize;
let prev = prev as usize;
page.write_u16(prev, offset as u16);
page.write_u16(offset, 0);
page.write_u16(offset + 2, len);
// Check for empty freelist fast path
let mut next_free_block = if page.read_u8(next_free_block_ptr as usize + 1) == 0
&& page.read_u8(next_free_block_ptr as usize) == 0
{
0 // Fast path for empty freelist
} else {
// insert in between
let next = page.read_u16(pc as usize);
let offset = offset as usize;
let prev = prev as usize;
page.write_u16(prev, offset as u16);
page.write_u16(offset, next);
page.write_u16(offset + 2, len);
// Find position in free list
let mut block = page.read_u16(next_free_block_ptr as usize);
while block != 0 && block < cell_block_start {
if block <= next_free_block_ptr {
if block == 0 {
break; // Handle corruption test case
}
return Err(LimboError::Corrupt("Free block list not ascending".into()));
}
next_free_block_ptr = block;
block = page.read_u16(block as usize);
}
block
};
if next_free_block as u32 > usable_size - 4 {
return Err(LimboError::Corrupt("Free block beyond usable space".into()));
}
// Coalesce with next block if adjacent
if next_free_block != 0 && cell_block_end + 3 >= next_free_block as u32 {
fragments_reduced = (next_free_block as u32 - cell_block_end) as u8;
if cell_block_end > next_free_block as u32 {
return Err(LimboError::Corrupt("Invalid block overlap".into()));
}
let next_block_size = page.read_u16(next_free_block as usize + 2) as u32;
cell_block_end = next_free_block as u32 + next_block_size;
if cell_block_end > usable_size {
return Err(LimboError::Corrupt(
"Coalesced block extends beyond page".into(),
));
}
cell_length = cell_block_end as u16 - cell_block_start;
next_free_block = page.read_u16(next_free_block as usize);
}
// Coalesce with previous block if adjacent
if next_free_block_ptr > PAGE_HEADER_OFFSET_FIRST_FREEBLOCK as u16 {
let prev_block_end =
next_free_block_ptr as u32 + page.read_u16(next_free_block_ptr as usize + 2) as u32;
if prev_block_end + 3 >= cell_block_start as u32 {
if prev_block_end > cell_block_start as u32 {
return Err(LimboError::Corrupt("Invalid previous block overlap".into()));
}
fragments_reduced += (cell_block_start as u32 - prev_block_end) as u8;
cell_length = (cell_block_end - next_free_block_ptr as u32) as u16;
cell_block_start = next_free_block_ptr;
}
}
// Update frag count
let current_frags = page.read_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT);
if fragments_reduced > current_frags {
return Err(LimboError::Corrupt("Invalid fragmentation count".into()));
}
page.write_u8(
PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT,
current_frags - fragments_reduced,
);
let content_area_start = page.cell_content_area();
if cell_block_start <= content_area_start {
if cell_block_start < content_area_start {
return Err(LimboError::Corrupt("Free block before content area".into()));
}
if next_free_block_ptr != PAGE_HEADER_OFFSET_FIRST_FREEBLOCK as u16 {
return Err(LimboError::Corrupt("Invalid content area merge".into()));
}
// Extend content area
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, next_free_block);
page.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, cell_block_end as u16);
} else {
// Insert in free list
page.write_u16(next_free_block_ptr as usize, cell_block_start);
page.write_u16(cell_block_start as usize, next_free_block);
page.write_u16(cell_block_start as usize + 2, cell_length);
}
Ok(())
}
/// Drop a cell from a page.
/// This is done by freeing the range of bytes that the cell occupies.
fn drop_cell(&self, page: &mut PageContent, cell_idx: usize) {
let cell_count = page.cell_count();
let (cell_start, cell_len) = page.cell_get_raw_region(
cell_idx,
self.payload_overflow_threshold_max(page.page_type()),
self.payload_overflow_threshold_min(page.page_type()),
self.usable_space(),
);
self.free_cell_range(page, cell_start as u16, cell_len as u16);
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
self.free_cell_range(page, cell_start as u16, cell_len as u16)
.expect("Failed to free cell range");
let new_cell_count = cell_count - 1;
page.write_u16(PAGE_HEADER_OFFSET_CELL_COUNT, new_cell_count as u16);
if new_cell_count == 0 {
page.write_u16(PAGE_HEADER_OFFSET_FIRST_FREEBLOCK, 0);
page.write_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT, 0);
page.write_u16(
PAGE_HEADER_OFFSET_CELL_CONTENT_AREA,
self.usable_space() as u16,
);
} else {
let (pointer_array_start, _) = page.cell_pointer_array_offset_and_size();
let buf = page.as_ptr();
buf.copy_within(
pointer_array_start + (2 * (cell_idx + 1)) // src
..pointer_array_start + (2 * cell_count),
pointer_array_start + (2 * cell_idx), // dst
);
}
}
    /// Search this page's freeblock list for a block of at least `amount` bytes.
    ///
    /// Returns the offset of the allocated space within the page, or `Ok(0)` if
    /// no suitable free block exists (the caller then allocates from the
    /// unallocated gap instead). Returns `Err(Corrupt)` on a malformed list.
    fn find_free_cell(
        &self,
        page_ref: &PageContent,
        amount: usize,
        db_header: Ref<DatabaseHeader>,
    ) -> Result<usize> {
        // NOTE: freelist is in ascending order of keys and pc
        // unused_space is reserved bytes at the end of page, therefore we must substract from maxpc
        let mut free_list_pointer_addr = 1; // header offset of the first-freeblock pointer
        let mut pc = page_ref.first_freeblock() as usize;
        let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
        let maxpc = usable_space - amount;

        if pc == 0 {
            // Empty freelist.
            return Ok(0);
        }
        while pc <= maxpc {
            let size = page_ref.read_u16(pc + 2) as usize;
            if let Some(x) = size.checked_sub(amount) {
                if x < 4 {
                    // Remainder too small to form a freeblock (minimum 4 bytes):
                    // consume the whole block and count the leftover as fragments.
                    if page_ref.read_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT) > 57 {
                        return Ok(0);
                    }
                    // Unlink this block from the freelist.
                    let next_ptr = page_ref.read_u16(pc);
                    page_ref.write_u16(free_list_pointer_addr, next_ptr);
                    let frag_count = page_ref.read_u8(PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT);
                    page_ref.write_u8(
                        PAGE_HEADER_OFFSET_FRAGMENTED_BYTES_COUNT,
                        frag_count + x as u8,
                    );
                    return Ok(pc);
                } else if x + pc > maxpc {
                    return Err(LimboError::Corrupt("Free block extends beyond page".into()));
                } else {
                    // Carve the allocation from the tail of the block and shrink it.
                    page_ref.write_u16(pc + 2, x as u16);
                    return Ok(pc + x);
                }
            }
            free_list_pointer_addr = pc;
            pc = page_ref.read_u16(pc) as usize;
            if pc <= free_list_pointer_addr && pc != 0 {
                return Err(LimboError::Corrupt(
                    "Free list not in ascending order".into(),
                ));
            }
        }
        // NOTE(review): the loop above only exits with pc > maxpc, so this
        // condition seems reachable only via a corrupt chain — confirm intent.
        if pc > maxpc + amount - 4 {
            return Err(LimboError::Corrupt(
                "Free block chain extends beyond page end".into(),
            ));
        }
        Ok(0)
    }
/// Balance a leaf page.
@@ -1358,38 +1488,41 @@ impl BTreeCursor {
}
/// Allocate space for a cell on a page.
fn allocate_cell_space(&self, page_ref: &PageContent, amount: u16) -> u16 {
fn allocate_cell_space(&self, page_ref: &mut PageContent, amount: u16) -> Result<u16> {
let amount = amount as usize;
let (cell_offset, _) = page_ref.cell_pointer_array_offset_and_size();
let gap = cell_offset + 2 * page_ref.cell_count();
let mut top = page_ref.cell_content_area() as usize;
// there are free blocks and enough space
if page_ref.first_freeblock() != 0 && gap + 2 <= top {
// find slot
let db_header = RefCell::borrow(&self.pager.db_header);
let pc = find_free_cell(page_ref, db_header, amount);
let pc = self.find_free_cell(page_ref, amount, db_header)?;
if pc != 0 {
return pc as u16;
// Corruption check
if pc <= gap {
return Err(LimboError::Corrupt(
"Corrupted page: free block overlaps cell pointer array".into(),
));
}
return Ok(pc as u16);
}
/* fall through, we might need to defragment */
}
if gap + 2 + amount > top {
// defragment
self.defragment_page(page_ref, RefCell::borrow(&self.pager.db_header));
top = page_ref.read_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA) as usize;
assert!(gap + 2 + amount <= top);
}
let db_header = RefCell::borrow(&self.pager.db_header);
top -= amount;
page_ref.write_u16(PAGE_HEADER_OFFSET_CELL_CONTENT_AREA, top as u16);
let db_header = RefCell::borrow(&self.pager.db_header);
let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
assert!(top + amount <= usable_space);
top as u16
Ok(top as u16)
}
/// Defragment a page. This means packing all the cells to the end of the page.
@@ -1834,7 +1967,125 @@ impl BTreeCursor {
}
pub fn delete(&mut self) -> Result<CursorResult<()>> {
println!("rowid: {:?}", self.rowid.borrow());
let page = self.stack.top();
return_if_locked!(page);
if !page.is_loaded() {
self.pager.load_page(page.clone())?;
return Ok(CursorResult::IO);
}
let target_rowid = match self.rowid.borrow().as_ref() {
Some(rowid) => *rowid,
None => return Ok(CursorResult::Ok(())),
};
let contents = page.get().contents.as_ref().unwrap();
// TODO(Krishna): We are doing this linear search here because seek() is returning the index of previous cell.
// And the fix is currently not very clear to me.
// This finds the cell with matching rowid with in a page.
let mut cell_idx = None;
for idx in 0..contents.cell_count() {
let cell = contents.cell_get(
idx,
self.pager.clone(),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
self.usable_space(),
)?;
if let BTreeCell::TableLeafCell(leaf_cell) = cell {
if leaf_cell._rowid == target_rowid {
cell_idx = Some(idx);
break;
}
}
}
let cell_idx = match cell_idx {
Some(idx) => idx,
None => return Ok(CursorResult::Ok(())),
};
let contents = page.get().contents.as_ref().unwrap();
let cell = contents.cell_get(
cell_idx,
self.pager.clone(),
self.payload_overflow_threshold_max(contents.page_type()),
self.payload_overflow_threshold_min(contents.page_type()),
self.usable_space(),
)?;
if cell_idx >= contents.cell_count() {
return Err(LimboError::Corrupt(format!(
"Corrupted page: cell index {} is out of bounds for page with {} cells",
cell_idx,
contents.cell_count()
)));
}
let original_child_pointer = match &cell {
BTreeCell::TableInteriorCell(interior) => Some(interior._left_child_page),
_ => None,
};
return_if_io!(self.clear_overflow_pages(&cell));
let page = self.stack.top();
return_if_locked!(page);
if !page.is_loaded() {
self.pager.load_page(page.clone())?;
return Ok(CursorResult::IO);
}
page.set_dirty();
self.pager.add_dirty(page.get().id);
let contents = page.get().contents.as_mut().unwrap();
// If this is an interior node, we need to handle deletion differently
// For interior nodes:
// 1. Move cursor to largest entry in left subtree
// 2. Copy that entry to replace the one being deleted
// 3. Delete the leaf entry
if !contents.is_leaf() {
// 1. Move cursor to largest entry in left subtree
return_if_io!(self.prev());
let leaf_page = self.stack.top();
// 2. Copy that entry to replace the one being deleted
let leaf_contents = leaf_page.get().contents.as_ref().unwrap();
let leaf_cell_idx = self.stack.current_cell_index() as usize - 1;
let predecessor_cell = leaf_contents.cell_get(
leaf_cell_idx,
self.pager.clone(),
self.payload_overflow_threshold_max(leaf_contents.page_type()),
self.payload_overflow_threshold_min(leaf_contents.page_type()),
self.usable_space(),
)?;
// 3. Create an interior cell from the leaf cell
let mut cell_payload: Vec<u8> = Vec::new();
match predecessor_cell {
BTreeCell::TableLeafCell(leaf_cell) => {
// Format: [left child page (4 bytes)][rowid varint]
if let Some(child_pointer) = original_child_pointer {
cell_payload.extend_from_slice(&child_pointer.to_be_bytes());
write_varint_to_vec(leaf_cell._rowid, &mut cell_payload);
}
}
_ => unreachable!("Expected table leaf cell"),
}
self.insert_into_cell(contents, &cell_payload, cell_idx);
self.drop_cell(contents, cell_idx);
} else {
// For leaf nodes, simply remove the cell
self.drop_cell(contents, cell_idx);
}
// TODO(Krishna): Implement balance after delete. I will implement after balance_nonroot is extended.
Ok(CursorResult::Ok(()))
}
@@ -1894,6 +2145,92 @@ impl BTreeCursor {
let id = page.get().id;
id as u32
}
fn clear_overflow_pages(&self, cell: &BTreeCell) -> Result<CursorResult<()>> {
// Get overflow info based on cell type
let (first_overflow_page, n_overflow) = match cell {
BTreeCell::TableLeafCell(leaf_cell) => {
match self.calculate_overflow_info(leaf_cell._payload.len(), PageType::TableLeaf)? {
Some(n_overflow) => (leaf_cell.first_overflow_page, n_overflow),
None => return Ok(CursorResult::Ok(())),
}
}
BTreeCell::IndexLeafCell(leaf_cell) => {
match self.calculate_overflow_info(leaf_cell.payload.len(), PageType::IndexLeaf)? {
Some(n_overflow) => (leaf_cell.first_overflow_page, n_overflow),
None => return Ok(CursorResult::Ok(())),
}
}
BTreeCell::IndexInteriorCell(interior_cell) => {
match self
.calculate_overflow_info(interior_cell.payload.len(), PageType::IndexInterior)?
{
Some(n_overflow) => (interior_cell.first_overflow_page, n_overflow),
None => return Ok(CursorResult::Ok(())),
}
}
BTreeCell::TableInteriorCell(_) => return Ok(CursorResult::Ok(())), // No overflow pages
};
let Some(first_page) = first_overflow_page else {
return Ok(CursorResult::Ok(()));
};
let page_count = self.pager.db_header.borrow().database_size as usize;
let mut pages_left = n_overflow;
let mut current_page = first_page;
// Clear overflow pages
while pages_left > 0 {
pages_left -= 1;
// Validate overflow page number
if current_page < 2 || current_page as usize > page_count {
return Err(LimboError::Corrupt("Invalid overflow page number".into()));
}
let page = self.pager.read_page(current_page as usize)?;
return_if_locked!(page);
let contents = page.get().contents.as_ref().unwrap();
let next_page = if pages_left > 0 {
contents.read_u32(0)
} else {
0
};
// Free the current page
self.pager.free_page(Some(page), current_page as usize)?;
current_page = next_page;
}
Ok(CursorResult::Ok(()))
}
    /// Compute how many overflow pages a cell with `payload_len` payload bytes
    /// is expected to own.
    ///
    /// Panics (assert) if called for a payload that fits entirely locally.
    /// As written this never returns `Ok(None)` — callers matching on `None`
    /// hit dead arms.
    fn calculate_overflow_info(
        &self,
        payload_len: usize,
        page_type: PageType,
    ) -> Result<Option<usize>> {
        let max_local = self.payload_overflow_threshold_max(page_type.clone());
        let min_local = self.payload_overflow_threshold_min(page_type.clone());
        let usable_size = self.usable_space();
        // local_size = number of payload bytes stored directly on the b-tree page.
        let (_, local_size) = payload_overflows(payload_len, max_local, min_local, usable_size);
        assert!(
            local_size != payload_len,
            "Trying to clear overflow pages when there are no overflow pages"
        );
        // Calculate expected overflow pages; each overflow page holds
        // usable_space - 4 payload bytes (4 bytes go to the next-page pointer).
        let overflow_page_size = self.usable_space() - 4;
        // NOTE(review): adding a full overflow_page_size before div_ceil yields
        // ceil((payload_len - local_size) / overflow_page_size) + 1 — verify
        // this matches the page count produced by the cell write path.
        let n_overflow =
            (payload_len - local_size + overflow_page_size).div_ceil(overflow_page_size);
        if n_overflow == 0 {
            return Err(LimboError::Corrupt("Invalid overflow calculation".into()));
        }

        Ok(Some(n_overflow))
    }
}
impl PageStack {
@@ -1993,32 +2330,6 @@ impl PageStack {
}
}
/// Walk the page's freeblock list (kept in ascending offset order) and return
/// the offset of the first block large enough to hold `amount` bytes, or 0 if
/// none fits.
///
/// NOTE: reserved bytes live at the end of the page, so candidates are capped
/// at `usable_space - amount`.
fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount: usize) -> usize {
    let buf = page_ref.as_ptr();
    let usable_space = (db_header.page_size - db_header.reserved_space as u16) as usize;
    let max_start = usable_space - amount;

    let mut cursor = page_ref.first_freeblock() as usize;
    while cursor <= max_start {
        // Each freeblock starts with [next (u16 BE)][size (u16 BE)].
        let next_block = u16::from_be_bytes(buf[cursor..cursor + 2].try_into().unwrap()) as usize;
        let block_size = u16::from_be_bytes(buf[cursor + 2..cursor + 4].try_into().unwrap()) as usize;
        if block_size >= amount {
            return cursor;
        }
        cursor = next_block;
    }
    0
}
pub fn btree_init_page(
page: &PageRef,
page_type: PageType,
@@ -2048,3 +2359,212 @@ pub fn btree_init_page(
/// Extend a borrowed byte slice's lifetime to `'static`.
///
/// SAFETY: the transmute erases the slice's real lifetime; callers must
/// guarantee the underlying buffer outlives every use of the returned slice,
/// otherwise the result is a dangling reference.
fn to_static_buf(buf: &[u8]) -> &'static [u8] {
    unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{Buffer, Completion, MemoryIO, OpenFlags, IO};
    use crate::storage::database::FileStorage;
    use crate::storage::page_cache::DumbLruPageCache;
    use crate::storage::sqlite3_ondisk;
    use crate::{BufferPool, DatabaseStorage, WalFile, WalFileShared, WriteCompletion};
    use std::cell::RefCell;
    use std::sync::Arc;

    /// Build an in-memory pager + database header for b-tree tests.
    /// `database_size` is the page count recorded in the header; page size is 512.
    #[allow(clippy::arc_with_non_send_sync)]
    fn setup_test_env(database_size: u32) -> (Rc<Pager>, Rc<RefCell<DatabaseHeader>>) {
        let page_size = 512;
        let mut db_header = DatabaseHeader::default();
        db_header.page_size = page_size;
        db_header.database_size = database_size;
        let db_header = Rc::new(RefCell::new(db_header));

        let buffer_pool = Rc::new(BufferPool::new(10));

        // Initialize buffer pool with correctly sized buffers
        for _ in 0..10 {
            let vec = vec![0; page_size as usize]; // Initialize with correct length, not just capacity
            buffer_pool.put(Pin::new(vec));
        }

        let io: Arc<dyn IO> = Arc::new(MemoryIO::new().unwrap());
        let page_io = Rc::new(FileStorage::new(
            io.open_file("test.db", OpenFlags::Create, false).unwrap(),
        ));

        // Write a valid database header to page 1 so the pager can open it.
        let drop_fn = Rc::new(|_buf| {});
        let buf = Rc::new(RefCell::new(Buffer::allocate(page_size as usize, drop_fn)));
        {
            let mut buf_mut = buf.borrow_mut();
            let buf_slice = buf_mut.as_mut_slice();
            sqlite3_ondisk::write_header_to_buf(buf_slice, &db_header.borrow());
        }
        let write_complete = Box::new(|_| {});
        let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
        page_io.write_page(1, buf.clone(), c).unwrap();

        let wal_shared = WalFileShared::open_shared(&io, "test.wal", page_size).unwrap();
        let wal = Rc::new(RefCell::new(WalFile::new(
            io.clone(),
            page_size as usize,
            wal_shared,
            buffer_pool.clone(),
        )));

        let pager = Rc::new(
            Pager::finish_open(
                db_header.clone(),
                page_io,
                wal,
                io,
                Arc::new(parking_lot::RwLock::new(DumbLruPageCache::new(10))),
                buffer_pool,
            )
            .unwrap(),
        );

        pager.io.run_once().unwrap();

        (pager, db_header)
    }

    /// Builds a 3-page overflow chain (pages 2 -> 3 -> 4) by hand, then checks
    /// that clear_overflow_pages puts all three pages on the freelist.
    #[test]
    fn test_clear_overflow_pages() -> Result<()> {
        let (pager, db_header) = setup_test_env(5);
        let cursor = BTreeCursor::new(pager.clone(), 1);

        let max_local = cursor.payload_overflow_threshold_max(PageType::TableLeaf);
        let usable_size = cursor.usable_space();

        // Create a large payload that will definitely trigger overflow
        let large_payload = vec![b'A'; max_local + usable_size];

        // Setup overflow pages (2, 3, 4) with linking
        let mut current_page = 2u32;
        while current_page <= 4 {
            let drop_fn = Rc::new(|_buf| {});
            let buf = Rc::new(RefCell::new(Buffer::allocate(
                db_header.borrow().page_size as usize,
                drop_fn,
            )));
            let write_complete = Box::new(|_| {});
            let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
            pager
                .page_io
                .write_page(current_page as usize, buf.clone(), c)?;
            pager.io.run_once()?;

            let page = cursor.pager.read_page(current_page as usize)?;
            while page.is_locked() {
                cursor.pager.io.run_once()?;
            }

            {
                let contents = page.get().contents.as_mut().unwrap();

                // Last page in the chain points to 0 (end of chain).
                let next_page = if current_page < 4 {
                    current_page + 1
                } else {
                    0
                };
                contents.write_u32(0, next_page); // Write pointer to next overflow page

                let buf = contents.as_ptr();
                buf[4..].fill(b'A');
            }

            current_page += 1;
        }
        pager.io.run_once()?;

        // Create leaf cell pointing to start of overflow chain
        let leaf_cell = BTreeCell::TableLeafCell(TableLeafCell {
            _rowid: 1,
            _payload: large_payload,
            first_overflow_page: Some(2), // Point to first overflow page
        });

        let initial_freelist_pages = db_header.borrow().freelist_pages;
        // Clear overflow pages
        let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
        match clear_result {
            CursorResult::Ok(_) => {
                // Verify proper number of pages were added to freelist
                assert_eq!(
                    db_header.borrow().freelist_pages,
                    initial_freelist_pages + 3,
                    "Expected 3 pages to be added to freelist"
                );

                // If this is first trunk page
                let trunk_page_id = db_header.borrow().freelist_trunk_page;
                if trunk_page_id > 0 {
                    // Verify trunk page structure
                    let trunk_page = cursor.pager.read_page(trunk_page_id as usize)?;

                    if let Some(contents) = trunk_page.get().contents.as_ref() {
                        // Read number of leaf pages in trunk
                        let n_leaf = contents.read_u32(4);
                        assert!(n_leaf > 0, "Trunk page should have leaf entries");

                        for i in 0..n_leaf {
                            let leaf_page_id = contents.read_u32(8 + (i as usize * 4));
                            assert!(
                                (2..=4).contains(&leaf_page_id),
                                "Leaf page ID {} should be in range 2-4",
                                leaf_page_id
                            );
                        }
                    }
                }
            }
            CursorResult::IO => {
                cursor.pager.io.run_once()?;
            }
        }

        Ok(())
    }

    /// A cell without overflow pages must leave the freelist untouched.
    #[test]
    fn test_clear_overflow_pages_no_overflow() -> Result<()> {
        let (pager, db_header) = setup_test_env(5);
        let cursor = BTreeCursor::new(pager.clone(), 1);

        let small_payload = vec![b'A'; 10];

        // Create leaf cell with no overflow pages
        let leaf_cell = BTreeCell::TableLeafCell(TableLeafCell {
            _rowid: 1,
            _payload: small_payload,
            first_overflow_page: None,
        });

        let initial_freelist_pages = db_header.borrow().freelist_pages;

        // Try to clear non-existent overflow pages
        let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
        match clear_result {
            CursorResult::Ok(_) => {
                // Verify freelist was not modified
                assert_eq!(
                    db_header.borrow().freelist_pages,
                    initial_freelist_pages,
                    "Freelist should not change when no overflow pages exist"
                );

                // Verify trunk page wasn't created
                assert_eq!(
                    db_header.borrow().freelist_trunk_page,
                    0,
                    "No trunk page should be created when no overflow pages exist"
                );
            }
            CursorResult::IO => {
                cursor.pager.io.run_once()?;
            }
        }

        Ok(())
    }
}

View File

@@ -3,7 +3,7 @@ use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent};
use crate::storage::wal::{CheckpointResult, Wal};
use crate::{Buffer, Result};
use crate::{Buffer, LimboError, Result};
use log::trace;
use parking_lot::RwLock;
use std::cell::{RefCell, UnsafeCell};
@@ -451,6 +451,78 @@ impl Pager {
checkpoint_result
}
    // Providing a page is optional, if provided it will be used to avoid reading the page from disk.
    // This is implemented in accordance with sqlite freepage2() function.
    //
    // Adds `page_id` to the database freelist: either as a leaf entry on the
    // current trunk page, or — when there is no trunk / the trunk is full — by
    // turning the freed page itself into a new trunk.
    pub fn free_page(&self, page: Option<PageRef>, page_id: usize) -> Result<()> {
        const TRUNK_PAGE_HEADER_SIZE: usize = 8;
        const LEAF_ENTRY_SIZE: usize = 4;
        const RESERVED_SLOTS: usize = 2;

        const TRUNK_PAGE_NEXT_PAGE_OFFSET: usize = 0; // Offset to next trunk page pointer
        const TRUNK_PAGE_LEAF_COUNT_OFFSET: usize = 4; // Offset to leaf count

        // Page 1 is the header page and can never be freed; ids beyond the
        // recorded database size are invalid.
        if page_id < 2 || page_id > self.db_header.borrow().database_size as usize {
            return Err(LimboError::Corrupt(format!(
                "Invalid page number {} for free operation",
                page_id
            )));
        }

        let page = match page {
            Some(page) => {
                assert_eq!(page.get().id, page_id, "Page id mismatch");
                page
            }
            None => self.read_page(page_id)?,
        };

        self.db_header.borrow_mut().freelist_pages += 1;

        let trunk_page_id = self.db_header.borrow().freelist_trunk_page;

        if trunk_page_id != 0 {
            // Add as leaf to current trunk
            let trunk_page = self.read_page(trunk_page_id as usize)?;
            let trunk_page_contents = trunk_page.get().contents.as_ref().unwrap();
            let number_of_leaf_pages = trunk_page_contents.read_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET);

            // Reserve 2 slots for the trunk page header which is 8 bytes or 2*LEAF_ENTRY_SIZE
            let max_free_list_entries = (self.usable_size() / LEAF_ENTRY_SIZE) - RESERVED_SLOTS;

            if number_of_leaf_pages < max_free_list_entries as u32 {
                trunk_page.set_dirty();
                self.add_dirty(trunk_page_id as usize);

                // Append this page id as the next leaf entry and bump the count.
                trunk_page_contents
                    .write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, number_of_leaf_pages + 1);
                trunk_page_contents.write_u32(
                    TRUNK_PAGE_HEADER_SIZE + (number_of_leaf_pages as usize * LEAF_ENTRY_SIZE),
                    page_id as u32,
                );
                page.clear_uptodate();
                page.clear_loaded();

                return Ok(());
            }
        }

        // If we get here, need to make this page a new trunk
        page.set_dirty();
        self.add_dirty(page_id);
        let contents = page.get().contents.as_mut().unwrap();
        // Point to previous trunk
        contents.write_u32(TRUNK_PAGE_NEXT_PAGE_OFFSET, trunk_page_id);
        // Zero leaf count
        contents.write_u32(TRUNK_PAGE_LEAF_COUNT_OFFSET, 0);
        // Update page 1 to point to new trunk
        self.db_header.borrow_mut().freelist_trunk_page = page_id as u32;
        // Clear flags
        page.clear_uptodate();
        page.clear_loaded();
        Ok(())
    }
/*
Gets a new page that increasing the size of the page or uses a free page.
Currently free list pages are not yet supported.

View File

@@ -106,10 +106,10 @@ pub struct DatabaseHeader {
pub database_size: u32,
/// Page number of the first freelist trunk page.
freelist_trunk_page: u32,
pub freelist_trunk_page: u32,
/// Total number of freelist pages.
freelist_pages: u32,
pub freelist_pages: u32,
/// The schema cookie. Incremented when the database schema changes.
schema_cookie: u32,
@@ -333,7 +333,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
Ok(())
}
fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
pub fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
buf[0..16].copy_from_slice(&header.magic);
buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());
buf[18] = header.write_version;
@@ -429,7 +429,7 @@ impl PageContent {
}
}
fn read_u8(&self, pos: usize) -> u8 {
pub fn read_u8(&self, pos: usize) -> u8 {
let buf = self.as_ptr();
buf[self.offset + pos]
}
@@ -439,7 +439,7 @@ impl PageContent {
u16::from_be_bytes([buf[self.offset + pos], buf[self.offset + pos + 1]])
}
fn read_u32(&self, pos: usize) -> u32 {
pub fn read_u32(&self, pos: usize) -> u32 {
let buf = self.as_ptr();
u32::from_be_bytes([
buf[self.offset + pos],

View File

@@ -5,6 +5,7 @@ set testdir [file dirname $argv0]
source $testdir/cmdlineshell.test
source $testdir/agg-functions.test
source $testdir/coalesce.test
source $testdir/delete.test
source $testdir/glob.test
source $testdir/join.test
source $testdir/insert.test

52
testing/delete.test Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env tclsh
# Regression tests for DELETE support: single-row deletes, delete/insert
# interleaving (freelist reuse), boundary deletes, and rowid value reuse.

set testdir [file dirname $argv0]
source $testdir/tester.tcl

# Basic single row delete test
do_execsql_test_on_specific_db {:memory:} delete-single-1 {
    CREATE TABLE t1(x INTEGER PRIMARY KEY);
    INSERT INTO t1 VALUES (1);
    INSERT INTO t1 VALUES (2);
    INSERT INTO t1 VALUES (3);
    DELETE FROM t1 WHERE x = 2;
    SELECT * FROM t1 ORDER BY x;
} {1 3}

# Test alternating delete-insert pattern to stress freelist
do_execsql_test_on_specific_db {:memory:} delete-insert-alternate-1 {
    CREATE TABLE t4(x INTEGER PRIMARY KEY);
    INSERT INTO t4 VALUES (1);
    INSERT INTO t4 VALUES (2);
    INSERT INTO t4 VALUES (3);
    DELETE FROM t4 WHERE x = 2;
    INSERT INTO t4 VALUES (4);
    DELETE FROM t4 WHERE x = 1;
    INSERT INTO t4 VALUES (5);
    SELECT * FROM t4 ORDER BY x;
} {3 4 5}

# Test deleting from both ends
do_execsql_test_on_specific_db {:memory:} delete-ends-1 {
    CREATE TABLE t5(x INTEGER PRIMARY KEY);
    INSERT INTO t5 VALUES (1);
    INSERT INTO t5 VALUES (2);
    INSERT INTO t5 VALUES (3);
    INSERT INTO t5 VALUES (4);
    INSERT INTO t5 VALUES (5);
    -- Delete from both ends
    DELETE FROM t5 WHERE x = 1;
    DELETE FROM t5 WHERE x = 5;
    SELECT * FROM t5 ORDER BY x;
} {2 3 4}

# Test delete-insert cycles with value reuse
do_execsql_test_on_specific_db {:memory:} delete-reuse-1 {
    CREATE TABLE t6(x INTEGER PRIMARY KEY);
    INSERT INTO t6 VALUES (1);
    INSERT INTO t6 VALUES (2);
    INSERT INTO t6 VALUES (3);
    DELETE FROM t6 WHERE x = 2;
    INSERT INTO t6 VALUES (2); -- Reuse same value
    SELECT * FROM t6 ORDER BY x;
} {1 2 3}