overflow pages support

This commit is contained in:
Pere Diaz Bou
2024-09-13 20:32:33 +02:00
parent 54615d752d
commit e9bc4b04a7
3 changed files with 180 additions and 54 deletions

View File

@@ -118,7 +118,7 @@ impl BTreeCursor {
},
}
}
let cell = page.cell_get(mem_page.cell_idx())?;
let cell = page.cell_get(mem_page.cell_idx(), self.pager.clone())?;
match &cell {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page,
@@ -167,7 +167,7 @@ impl BTreeCursor {
let page = page.as_ref().unwrap();
for cell_idx in 0..page.cell_count() {
match &page.cell_get(cell_idx)? {
match &page.cell_get(cell_idx, self.pager.clone())? {
BTreeCell::TableLeafCell(TableLeafCell {
_rowid: cell_rowid,
_payload: p,
@@ -271,7 +271,7 @@ impl BTreeCursor {
let mut found_cell = false;
for cell_idx in 0..page.cell_count() {
match &page.cell_get(cell_idx)? {
match &page.cell_get(cell_idx, self.pager.clone())? {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page,
_rowid,
@@ -344,7 +344,7 @@ impl BTreeCursor {
assert!(matches!(page.page_type(), PageType::TableLeaf));
// find cell
(find_cell(page, int_key), page.page_type())
(self.find_cell(page, int_key), page.page_type())
};
// TODO: if overwrite drop cell
@@ -593,7 +593,7 @@ impl BTreeCursor {
// Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value
let mut right_pointer = BTREE_HEADER_OFFSET_RIGHTMOST;
for cell_idx in 0..parent.cell_count() {
let cell = parent.cell_get(cell_idx).unwrap();
let cell = parent.cell_get(cell_idx, self.pager.clone()).unwrap();
let found = match cell {
BTreeCell::TableInteriorCell(interior) => {
interior._left_child_page as usize == mem_page.page_idx
@@ -650,7 +650,9 @@ impl BTreeCursor {
if !is_leaf {
for page in new_pages.iter_mut().take(new_pages_len - 1) {
assert!(page.cell_count() == 1);
let last_cell = page.cell_get(page.cell_count() - 1).unwrap();
let last_cell = page
.cell_get(page.cell_count() - 1, self.pager.clone())
.unwrap();
let last_cell_pointer = match last_cell {
BTreeCell::TableInteriorCell(interior) => interior._left_child_page,
_ => unreachable!(),
@@ -672,7 +674,9 @@ impl BTreeCursor {
assert!(page.cell_count() > 1);
let divider_cell_index = divider_cells_index[page_id_index];
let cell_payload = scratch_cells[divider_cell_index];
let cell = read_btree_cell(cell_payload, &page.page_type(), 0).unwrap();
let cell =
read_btree_cell(cell_payload, &page.page_type(), 0, self.pager.clone())
.unwrap();
if is_leaf {
// create a new divider cell and push
let key = match cell {
@@ -686,7 +690,7 @@ impl BTreeCursor {
divider_cell.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key);
divider_cell.truncate(4 + n);
let parent_cell_idx = find_cell(parent, key);
let parent_cell_idx = self.find_cell(parent, key);
self.insert_into_cell(parent, divider_cell.as_slice(), parent_cell_idx);
} else {
// move cell
@@ -694,7 +698,7 @@ impl BTreeCursor {
BTreeCell::TableInteriorCell(interior) => interior._rowid,
_ => unreachable!(),
};
let parent_cell_idx = find_cell(page, key);
let parent_cell_idx = self.find_cell(page, key);
self.insert_into_cell(parent, cell_payload, parent_cell_idx);
// self.drop_cell(*page, 0);
}
@@ -792,6 +796,21 @@ impl BTreeCursor {
page
}
/// Allocates a new page through the pager and zero-fills its buffer so it
/// can be used as an overflow page.
///
/// Panics if the pager cannot allocate a page, if the contents lock is
/// poisoned, or if the page has no contents.
fn allocate_overflow_page(&self) -> Rc<RefCell<Page>> {
    let overflow_page = self.pager.allocate_page().unwrap();
    {
        // Scope the guards so every borrow is released before the page is returned.
        let page_guard = RefCell::borrow(&overflow_page);
        let mut contents_guard = page_guard.contents.write().unwrap();
        let page_contents = contents_guard.as_mut().unwrap();
        // Start from an all-zero buffer.
        page_contents.as_ptr().fill(0);
    }
    overflow_page
}
/*
Allocate space for a cell on a page.
*/
@@ -1009,19 +1028,67 @@ impl BTreeCursor {
write_varint_to_vec(record_buf.len() as u64, cell_payload);
}
if record_buf.len() <= self.max_local(page_type) {
let max_local = self.max_local(page_type.clone());
if record_buf.len() <= max_local {
// enough allowed space to fit inside a btree page
cell_payload.extend_from_slice(record_buf.as_slice());
cell_payload.resize(cell_payload.len() + 4, 0);
return;
}
todo!("implement overflow page");
let min_local = self.min_local(page_type);
let mut space_left = min_local + (record_buf.len() - min_local) % (self.usable_space() - 4);
if space_left > max_local {
space_left = min_local;
}
// cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page.
let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page
let mut to_copy_buffer = record_buf.as_slice();
let prev_size = cell_payload.len();
cell_payload.resize(prev_size + space_left + 4, 0);
let mut pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) };
let mut pointer_to_next = unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) };
let mut overflow_pages = Vec::new();
loop {
let to_copy = space_left.min(to_copy_buffer.len());
unsafe { std::ptr::copy(to_copy_buffer.as_ptr(), pointer, to_copy) };
let left = to_copy_buffer.len() - to_copy;
if left == 0 {
break;
}
// we still have bytes to add, we will need to allocate new overflow page
let overflow_page = self.allocate_overflow_page();
overflow_pages.push(overflow_page.clone());
{
let page = overflow_page.borrow();
let mut contents_lock = page.contents.write().unwrap();
let contents = contents_lock.as_mut().unwrap();
let buf = contents.as_ptr();
let id = page.id as u32;
let as_bytes = id.to_be_bytes();
// update pointer to new overflow page
unsafe { std::ptr::copy(as_bytes.as_ptr(), pointer_to_next, 4) };
pointer = unsafe { buf.as_mut_ptr().add(4) };
pointer_to_next = buf.as_mut_ptr();
space_left = self.usable_space() - 4;
}
to_copy_buffer = &to_copy_buffer[to_copy..];
}
assert_eq!(cell_size, cell_payload.len());
}
fn max_local(&self, page_type: PageType) -> usize {
let usable_space = {
let db_header = RefCell::borrow(&self.database_header);
(db_header.page_size - db_header.unused_space as u16) as usize
};
let usable_space = self.usable_space();
match page_type {
PageType::IndexInterior | PageType::TableInterior => {
(usable_space - 12) * 64 / 255 - 23
@@ -1029,6 +1096,43 @@ impl BTreeCursor {
PageType::IndexLeaf | PageType::TableLeaf => usable_space - 35,
}
}
/// Returns the minimum number of payload bytes stored locally in a cell,
/// derived from the usable page space.
///
/// The formula is the same for every page type; the match is kept so the
/// function stays exhaustive over `PageType`.
fn min_local(&self, page_type: PageType) -> usize {
    let usable = self.usable_space();
    let local = (usable - 12) * 32 / 255 - 23;
    match page_type {
        PageType::IndexInterior
        | PageType::TableInterior
        | PageType::IndexLeaf
        | PageType::TableLeaf => local,
    }
}
fn usable_space(&self) -> usize {
let db_header = RefCell::borrow(&self.database_header);
(db_header.page_size - db_header.unused_space as u16) as usize
}
/// Returns the index of the first cell on `page` whose rowid is greater than
/// or equal to `int_key`, or `page.cell_count()` when no such cell exists.
///
/// Only table leaf and table interior cells are handled; any other cell type
/// hits `todo!()`. Panics if a cell cannot be read.
fn find_cell(&self, page: &PageContent, int_key: u64) -> usize {
    let cell_count = page.cell_count();
    for cell_idx in 0..cell_count {
        let rowid = match page.cell_get(cell_idx, self.pager.clone()).unwrap() {
            BTreeCell::TableLeafCell(cell) => cell._rowid,
            BTreeCell::TableInteriorCell(cell) => cell._rowid,
            _ => todo!(),
        };
        if int_key <= rowid {
            return cell_idx;
        }
    }
    cell_count
}
}
fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount: usize) -> usize {
@@ -1184,11 +1288,11 @@ impl Cursor for BTreeCursor {
OwnedValue::Integer(i) => *i as u64,
_ => unreachable!("btree tables are indexed by integers!"),
};
let cell_idx = find_cell(page, int_key);
let cell_idx = self.find_cell(page, int_key);
if cell_idx >= page.cell_count() {
Ok(CursorResult::Ok(false))
} else {
let equals = match &page.cell_get(cell_idx)? {
let equals = match &page.cell_get(cell_idx, self.pager.clone())? {
BTreeCell::TableLeafCell(l) => l._rowid == int_key,
_ => unreachable!(),
};
@@ -1196,25 +1300,3 @@ impl Cursor for BTreeCursor {
}
}
}
/// Returns the index of the first cell on `page` whose rowid is greater than
/// or equal to `int_key`, or `page.cell_count()` when every rowid is smaller.
///
/// Only table leaf and table interior cells are handled; any other cell type
/// hits `todo!()`. Panics if a cell cannot be read.
fn find_cell(page: &PageContent, int_key: u64) -> usize {
    let cell_count = page.cell_count();
    (0..cell_count)
        .find(|&idx| {
            let rowid = match page.cell_get(idx).unwrap() {
                BTreeCell::TableLeafCell(cell) => cell._rowid,
                BTreeCell::TableInteriorCell(cell) => cell._rowid,
                _ => todo!(),
            };
            int_key <= rowid
        })
        .unwrap_or(cell_count)
}

View File

@@ -416,4 +416,9 @@ impl Pager {
let mut cache = RefCell::borrow_mut(&self.page_cache);
cache.insert(id, page);
}
/// Returns the database header's page size minus its `unused_space`, as a `usize`.
pub fn usable_size(&self) -> usize {
    let header = self.db_header.borrow();
    let reserved = header.unused_space as u16;
    (header.page_size - reserved) as usize
}
}

View File

@@ -357,7 +357,7 @@ impl PageContent {
}
}
pub fn cell_get(&self, idx: usize) -> Result<BTreeCell> {
pub fn cell_get(&self, idx: usize, pager: Rc<Pager>) -> Result<BTreeCell> {
let buf = self.as_ptr();
let ncells = self.cell_count();
@@ -371,7 +371,7 @@ impl PageContent {
let cell_pointer = cell_start + (idx * 2);
let cell_pointer = self.read_u16(cell_pointer) as usize;
read_btree_cell(buf, &self.page_type(), cell_pointer)
read_btree_cell(buf, &self.page_type(), cell_pointer, pager)
}
pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) {
@@ -548,7 +548,12 @@ pub struct IndexLeafCell {
pub first_overflow_page: Option<u32>,
}
pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<BTreeCell> {
pub fn read_btree_cell(
page: &[u8],
page_type: &PageType,
pos: usize,
pager: Rc<Pager>,
) -> Result<BTreeCell> {
match page_type {
PageType::IndexInterior => {
let mut pos = pos;
@@ -557,7 +562,8 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<
pos += 4;
let (payload_size, nr) = read_varint(&page[pos..])?;
pos += nr;
let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize);
let (payload, first_overflow_page) =
read_payload(&page[pos..], payload_size as usize, pager);
Ok(BTreeCell::IndexInteriorCell(IndexInteriorCell {
left_child_page,
payload,
@@ -579,7 +585,8 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<
let mut pos = pos;
let (payload_size, nr) = read_varint(&page[pos..])?;
pos += nr;
let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize);
let (payload, first_overflow_page) =
read_payload(&page[pos..], payload_size as usize, pager);
Ok(BTreeCell::IndexLeafCell(IndexLeafCell {
payload,
first_overflow_page,
@@ -591,7 +598,8 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<
pos += nr;
let (rowid, nr) = read_varint(&page[pos..])?;
pos += nr;
let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize);
let (payload, first_overflow_page) =
read_payload(&page[pos..], payload_size as usize, pager);
Ok(BTreeCell::TableLeafCell(TableLeafCell {
_rowid: rowid,
_payload: payload,
@@ -603,20 +611,47 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<
/// read_payload takes in the unread bytearray with the payload size
/// and returns the payload, and optionally the first overflow page number.
///
/// When `payload_size` fits in `unread`, the payload is copied from it and no
/// overflow page is reported. Otherwise the last 4 bytes of `unread` are read as
/// the big-endian number of the first overflow page. The variant that takes
/// `pager` then walks the overflow chain and appends each page's bytes, so the
/// returned payload is complete; the variant without `pager` returns only the
/// bytes found in `unread`.
///
/// NOTE(review): callers pass `&page[pos..]` (the rest of the page), so
/// `payload_size <= unread.len()` compares against the remaining page bytes,
/// not the cell's local payload size — confirm this is the intended overflow test.
fn read_payload(unread: &[u8], payload_size: usize) -> (Vec<u8>, Option<u32>) {
let page_len = unread.len();
if payload_size <= page_len {
fn read_payload(unread: &[u8], payload_size: usize, pager: Rc<Pager>) -> (Vec<u8>, Option<u32>) {
let cell_len = unread.len();
if payload_size <= cell_len {
// fit within 1 page
(unread[..payload_size].to_vec(), None)
} else {
// overflow
let first_overflow_page = u32::from_be_bytes([
unread[page_len - 4],
unread[page_len - 3],
unread[page_len - 2],
unread[page_len - 1],
unread[cell_len - 4],
unread[cell_len - 3],
unread[cell_len - 2],
unread[cell_len - 1],
]);
(unread[..page_len - 4].to_vec(), Some(first_overflow_page))
let usable_size = pager.usable_size();
let mut next_overflow = first_overflow_page;
let mut payload = unread[..cell_len - 4].to_vec();
let mut left_to_read = payload_size - (cell_len - 4); // minus four because the last four bytes of a payload cell are the overflow pointer
// A next-page number of 0 terminates the overflow chain.
while next_overflow != 0 {
assert!(left_to_read > 0);
let page;
// Keep asking the pager until it returns the page.
// NOTE(review): this retries on every Err without any back-off — confirm
// that errors from read_page here can only be transient.
loop {
let page_ref = pager.read_page(next_overflow as usize);
if let Ok(p) = page_ref {
page = p;
break;
}
}
let page = page.borrow();
let contents = page.contents.write().unwrap();
let contents = contents.as_ref().unwrap();
// Each overflow page contributes at most usable_size - 4 payload bytes.
let to_read = left_to_read.min(usable_size - 4);
let buf = contents.as_ptr();
// Bytes 0..4 hold the next overflow page number; payload bytes start at offset 4.
payload.extend_from_slice(&buf[4..4 + to_read]);
next_overflow = contents.read_u32(0);
left_to_read -= to_read;
}
// The chain must supply exactly the bytes that did not fit locally.
assert_eq!(left_to_read, 0);
(payload, Some(first_overflow_page))
}
}
@@ -761,7 +796,11 @@ pub fn read_value(buf: &[u8], serial_type: &SerialType) -> Result<(OwnedValue, u
}
SerialType::String(n) => {
if buf.len() < n {
crate::bail_corrupt_error!("Invalid String value");
crate::bail_corrupt_error!(
"Invalid String value, length {} < expected length {}",
buf.len(),
n
);
}
let bytes = buf[0..n].to_vec();
let value = unsafe { String::from_utf8_unchecked(bytes) };