Added a unit-test to test the case where clear_overflow_pages function is called on cell with no overflow pages

This commit is contained in:
krishvishal
2025-01-26 17:16:46 +05:30
parent 8c763780a8
commit f62ec61694
2 changed files with 210 additions and 1 deletions

View File

@@ -2139,3 +2139,212 @@ pub fn btree_init_page(
fn to_static_buf(buf: &[u8]) -> &'static [u8] {
unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::io::{Buffer, Completion, MemoryIO, OpenFlags, IO};
use crate::storage::database::FileStorage;
use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::sqlite3_ondisk;
use crate::{BufferPool, DatabaseStorage, WalFile, WalFileShared, WriteCompletion};
use std::cell::RefCell;
use std::sync::{Arc, RwLock};
#[allow(clippy::arc_with_non_send_sync)]
fn setup_test_env(database_size: u32) -> (Rc<Pager>, Rc<RefCell<DatabaseHeader>>) {
let page_size = 512;
let mut db_header = DatabaseHeader::default();
db_header.page_size = page_size;
db_header.database_size = database_size;
let db_header = Rc::new(RefCell::new(db_header));
let buffer_pool = Rc::new(BufferPool::new(10));
// Initialize buffer pool with correctly sized buffers
for _ in 0..10 {
let vec = vec![0; page_size as usize]; // Initialize with correct length, not just capacity
buffer_pool.put(Pin::new(vec));
}
let io: Arc<dyn IO> = Arc::new(MemoryIO::new().unwrap());
let page_io = Rc::new(FileStorage::new(
io.open_file("test.db", OpenFlags::Create, false).unwrap(),
));
let drop_fn = Rc::new(|_buf| {});
let buf = Rc::new(RefCell::new(Buffer::allocate(page_size as usize, drop_fn)));
{
let mut buf_mut = buf.borrow_mut();
let buf_slice = buf_mut.as_mut_slice();
sqlite3_ondisk::write_header_to_buf(buf_slice, &db_header.borrow());
}
let write_complete = Box::new(|_| {});
let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
page_io.write_page(1, buf.clone(), c).unwrap();
let wal_shared = WalFileShared::open_shared(&io, "test.wal", page_size).unwrap();
let wal = Rc::new(RefCell::new(WalFile::new(
io.clone(),
page_size as usize,
wal_shared,
buffer_pool.clone(),
)));
let pager = Rc::new(
Pager::finish_open(
db_header.clone(),
page_io,
wal,
io,
Arc::new(RwLock::new(DumbLruPageCache::new(10))),
buffer_pool,
)
.unwrap(),
);
pager.io.run_once().unwrap();
(pager, db_header)
}
#[test]
fn test_clear_overflow_pages() -> Result<()> {
let (pager, db_header) = setup_test_env(5);
let cursor = BTreeCursor::new(pager.clone(), 1, db_header.clone());
let max_local = cursor.payload_overflow_threshold_max(PageType::TableLeaf);
let usable_size = cursor.usable_space();
// Create a large payload that will definitely trigger overflow
let large_payload = vec![b'A'; max_local + usable_size];
// Setup overflow pages (2, 3, 4) with linking
let mut current_page = 2u32;
while current_page <= 4 {
let drop_fn = Rc::new(|_buf| {});
let buf = Rc::new(RefCell::new(Buffer::allocate(
db_header.borrow().page_size as usize,
drop_fn,
)));
let write_complete = Box::new(|_| {});
let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
pager
.page_io
.write_page(current_page as usize, buf.clone(), c)?;
pager.io.run_once()?;
let page = cursor.pager.read_page(current_page as usize)?;
while page.is_locked() {
cursor.pager.io.run_once()?;
}
{
let contents = page.get().contents.as_mut().unwrap();
let next_page = if current_page < 4 {
current_page + 1
} else {
0
};
contents.write_u32(0, next_page); // Write pointer to next overflow page
let buf = contents.as_ptr();
buf[4..].fill(b'A');
}
current_page += 1;
}
pager.io.run_once()?;
// Create leaf cell pointing to start of overflow chain
let leaf_cell = BTreeCell::TableLeafCell(TableLeafCell {
_rowid: 1,
_payload: large_payload,
first_overflow_page: Some(2), // Point to first overflow page
});
let initial_freelist_pages = db_header.borrow().freelist_pages;
// Clear overflow pages
let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
match clear_result {
CursorResult::Ok(_) => {
// Verify proper number of pages were added to freelist
assert_eq!(
db_header.borrow().freelist_pages,
initial_freelist_pages + 3,
"Expected 3 pages to be added to freelist"
);
// If this is first trunk page
let trunk_page_id = db_header.borrow().freelist_trunk_page;
if trunk_page_id > 0 {
// Verify trunk page structure
let trunk_page = cursor.pager.read_page(trunk_page_id as usize)?;
if let Some(contents) = trunk_page.get().contents.as_ref() {
// Read number of leaf pages in trunk
let n_leaf = contents.read_u32(4);
assert!(n_leaf > 0, "Trunk page should have leaf entries");
for i in 0..n_leaf {
let leaf_page_id = contents.read_u32(8 + (i as usize * 4));
assert!(
(2..=4).contains(&leaf_page_id),
"Leaf page ID {} should be in range 2-4",
leaf_page_id
);
}
}
}
}
CursorResult::IO => {
cursor.pager.io.run_once()?;
}
}
Ok(())
}
#[test]
fn test_clear_overflow_pages_no_overflow() -> Result<()> {
let (pager, db_header) = setup_test_env(5);
let cursor = BTreeCursor::new(pager.clone(), 1, db_header.clone());
let small_payload = vec![b'A'; 10];
// Create leaf cell with no overflow pages
let leaf_cell = BTreeCell::TableLeafCell(TableLeafCell {
_rowid: 1,
_payload: small_payload,
first_overflow_page: None,
});
let initial_freelist_pages = db_header.borrow().freelist_pages;
// Try to clear non-existent overflow pages
let clear_result = cursor.clear_overflow_pages(&leaf_cell)?;
match clear_result {
CursorResult::Ok(_) => {
// Verify freelist was not modified
assert_eq!(
db_header.borrow().freelist_pages,
initial_freelist_pages,
"Freelist should not change when no overflow pages exist"
);
// Verify trunk page wasn't created
assert_eq!(
db_header.borrow().freelist_trunk_page,
0,
"No trunk page should be created when no overflow pages exist"
);
}
CursorResult::IO => {
cursor.pager.io.run_once()?;
}
}
Ok(())
}
}

View File

@@ -333,7 +333,7 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
Ok(())
}
fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
pub fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
buf[0..16].copy_from_slice(&header.magic);
buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());
buf[18] = header.write_version;