From e9bc4b04a736f0abf8e2328d479e206bbd66bceb Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 13 Sep 2024 20:32:33 +0200 Subject: [PATCH] overflow pages support --- core/storage/btree.rs | 160 +++++++++++++++++++++++++-------- core/storage/pager.rs | 5 ++ core/storage/sqlite3_ondisk.rs | 69 ++++++++++---- 3 files changed, 180 insertions(+), 54 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index d7d4f956c..7d3af851a 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -118,7 +118,7 @@ impl BTreeCursor { }, } } - let cell = page.cell_get(mem_page.cell_idx())?; + let cell = page.cell_get(mem_page.cell_idx(), self.pager.clone())?; match &cell { BTreeCell::TableInteriorCell(TableInteriorCell { _left_child_page, @@ -167,7 +167,7 @@ impl BTreeCursor { let page = page.as_ref().unwrap(); for cell_idx in 0..page.cell_count() { - match &page.cell_get(cell_idx)? { + match &page.cell_get(cell_idx, self.pager.clone())? { BTreeCell::TableLeafCell(TableLeafCell { _rowid: cell_rowid, _payload: p, @@ -271,7 +271,7 @@ impl BTreeCursor { let mut found_cell = false; for cell_idx in 0..page.cell_count() { - match &page.cell_get(cell_idx)? { + match &page.cell_get(cell_idx, self.pager.clone())? { BTreeCell::TableInteriorCell(TableInteriorCell { _left_child_page, _rowid, @@ -344,7 +344,7 @@ impl BTreeCursor { assert!(matches!(page.page_type(), PageType::TableLeaf)); // find cell - (find_cell(page, int_key), page.page_type()) + (self.find_cell(page, int_key), page.page_type()) }; // TODO: if overwrite drop cell @@ -593,7 +593,7 @@ impl BTreeCursor { // Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value let mut right_pointer = BTREE_HEADER_OFFSET_RIGHTMOST; for cell_idx in 0..parent.cell_count() { - let cell = parent.cell_get(cell_idx).unwrap(); + let cell = parent.cell_get(cell_idx, self.pager.clone()).unwrap(); let found = match cell { BTreeCell::TableInteriorCell(interior) => { interior._left_child_page as usize == mem_page.page_idx @@ -650,7 +650,9 @@ impl BTreeCursor { if !is_leaf { for page in new_pages.iter_mut().take(new_pages_len - 1) { assert!(page.cell_count() == 1); - let last_cell = page.cell_get(page.cell_count() - 1).unwrap(); + let last_cell = page + .cell_get(page.cell_count() - 1, self.pager.clone()) + .unwrap(); let last_cell_pointer = match last_cell { BTreeCell::TableInteriorCell(interior) => interior._left_child_page, _ => unreachable!(), @@ -672,7 +674,9 @@ impl BTreeCursor { assert!(page.cell_count() > 1); let divider_cell_index = divider_cells_index[page_id_index]; let cell_payload = scratch_cells[divider_cell_index]; - let cell = read_btree_cell(cell_payload, &page.page_type(), 0).unwrap(); + let cell = + read_btree_cell(cell_payload, &page.page_type(), 0, self.pager.clone()) + .unwrap(); if is_leaf { // create a new divider cell and push let key = match cell { @@ -686,7 +690,7 @@ impl BTreeCursor { divider_cell.extend(std::iter::repeat(0).take(9)); let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key); divider_cell.truncate(4 + n); - let parent_cell_idx = find_cell(parent, key); + let parent_cell_idx = self.find_cell(parent, key); self.insert_into_cell(parent, divider_cell.as_slice(), parent_cell_idx); } else { // move cell @@ -694,7 +698,7 @@ impl BTreeCursor { BTreeCell::TableInteriorCell(interior) => interior._rowid, _ => unreachable!(), }; - let parent_cell_idx = find_cell(page, key); + let parent_cell_idx = self.find_cell(page, key); self.insert_into_cell(parent, cell_payload, parent_cell_idx); // self.drop_cell(*page, 0); } @@ -792,6 +796,21 @@ impl BTreeCursor { page } + fn allocate_overflow_page(&self) -> Rc> { + let page = self.pager.allocate_page().unwrap(); + + { + // setup overflow page + let contents = RefCell::borrow(&page); + let mut contents = contents.contents.write().unwrap(); + let contents = contents.as_mut().unwrap(); + let buf = contents.as_ptr(); + buf.fill(0); + } + + page + } + /* Allocate space for a cell on a page. */ @@ -1009,19 +1028,67 @@ impl BTreeCursor { write_varint_to_vec(record_buf.len() as u64, cell_payload); } - if record_buf.len() <= self.max_local(page_type) { + let max_local = self.max_local(page_type.clone()); + if record_buf.len() <= max_local { // enough allowed space to fit inside a btree page cell_payload.extend_from_slice(record_buf.as_slice()); + cell_payload.resize(cell_payload.len() + 4, 0); return; } - todo!("implement overflow page"); + + let min_local = self.min_local(page_type); + let mut space_left = min_local + (record_buf.len() - min_local) % (self.usable_space() - 4); + + if space_left > max_local { + space_left = min_local; + } + + // cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page. + let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page + let mut to_copy_buffer = record_buf.as_slice(); + + let prev_size = cell_payload.len(); + cell_payload.resize(prev_size + space_left + 4, 0); + let mut pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) }; + let mut pointer_to_next = unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) }; + let mut overflow_pages = Vec::new(); + + loop { + let to_copy = space_left.min(to_copy_buffer.len()); + unsafe { std::ptr::copy(to_copy_buffer.as_ptr(), pointer, to_copy) }; + + let left = to_copy_buffer.len() - to_copy; + if left == 0 { + break; + } + + // we still have bytes to add, we will need to allocate new overflow page + let overflow_page = self.allocate_overflow_page(); + overflow_pages.push(overflow_page.clone()); + { + let page = overflow_page.borrow(); + let mut contents_lock = page.contents.write().unwrap(); + let contents = contents_lock.as_mut().unwrap(); + + let buf = contents.as_ptr(); + let id = page.id as u32; + let as_bytes = id.to_be_bytes(); + // update pointer to new overflow page + unsafe { std::ptr::copy(as_bytes.as_ptr(), pointer_to_next, 4) }; + + pointer = unsafe { buf.as_mut_ptr().add(4) }; + pointer_to_next = buf.as_mut_ptr(); + space_left = self.usable_space() - 4; + } + + to_copy_buffer = &to_copy_buffer[to_copy..]; + } + + assert_eq!(cell_size, cell_payload.len()); } fn max_local(&self, page_type: PageType) -> usize { - let usable_space = { - let db_header = RefCell::borrow(&self.database_header); - (db_header.page_size - db_header.unused_space as u16) as usize - }; + let usable_space = self.usable_space(); match page_type { PageType::IndexInterior | PageType::TableInterior => { (usable_space - 12) * 64 / 255 - 23 @@ -1029,6 +1096,43 @@ impl BTreeCursor { PageType::IndexLeaf | PageType::TableLeaf => usable_space - 35, } } + + fn min_local(&self, page_type: PageType) -> usize { + let usable_space = self.usable_space(); + match page_type { + PageType::IndexInterior | PageType::TableInterior => { + (usable_space - 12) * 32 / 255 - 23 + } + PageType::IndexLeaf | PageType::TableLeaf => (usable_space - 12) * 32 / 255 - 23, + } + } + + fn usable_space(&self) -> usize { + let db_header = RefCell::borrow(&self.database_header); + (db_header.page_size - db_header.unused_space as u16) as usize + } + + fn find_cell(&self, page: &PageContent, int_key: u64) -> usize { + let mut cell_idx = 0; + let cell_count = page.cell_count(); + while cell_idx < cell_count { + match page.cell_get(cell_idx, self.pager.clone()).unwrap() { + BTreeCell::TableLeafCell(cell) => { + if int_key <= cell._rowid { + break; + } + } + BTreeCell::TableInteriorCell(cell) => { + if int_key <= cell._rowid { + break; + } + } + _ => todo!(), + } + cell_idx += 1; + } + cell_idx + } } fn find_free_cell(page_ref: &PageContent, db_header: Ref, amount: usize) -> usize { @@ -1184,11 +1288,11 @@ impl Cursor for BTreeCursor { OwnedValue::Integer(i) => *i as u64, _ => unreachable!("btree tables are indexed by integers!"), }; - let cell_idx = find_cell(page, int_key); + let cell_idx = self.find_cell(page, int_key); if cell_idx >= page.cell_count() { Ok(CursorResult::Ok(false)) } else { - let equals = match &page.cell_get(cell_idx)? { + let equals = match &page.cell_get(cell_idx, self.pager.clone())? { BTreeCell::TableLeafCell(l) => l._rowid == int_key, _ => unreachable!(), }; @@ -1196,25 +1300,3 @@ impl Cursor for BTreeCursor { } } } - -fn find_cell(page: &PageContent, int_key: u64) -> usize { - let mut cell_idx = 0; - let cell_count = page.cell_count(); - while cell_idx < cell_count { - match page.cell_get(cell_idx).unwrap() { - BTreeCell::TableLeafCell(cell) => { - if int_key <= cell._rowid { - break; - } - } - BTreeCell::TableInteriorCell(cell) => { - if int_key <= cell._rowid { - break; - } - } - _ => todo!(), - } - cell_idx += 1; - } - cell_idx -} diff --git a/core/storage/pager.rs b/core/storage/pager.rs index ae26314f8..8d4ce4747 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -416,4 +416,9 @@ impl Pager { let mut cache = RefCell::borrow_mut(&self.page_cache); cache.insert(id, page); } + + pub fn usable_size(&self) -> usize { + let db_header = self.db_header.borrow(); + (db_header.page_size - db_header.unused_space as u16) as usize + } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index f2a216131..7bbeea346 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -357,7 +357,7 @@ impl PageContent { } } - pub fn cell_get(&self, idx: usize) -> Result { + pub fn cell_get(&self, idx: usize, pager: Rc) -> Result { let buf = self.as_ptr(); let ncells = self.cell_count(); @@ -371,7 +371,7 @@ impl PageContent { let cell_pointer = cell_start + (idx * 2); let cell_pointer = self.read_u16(cell_pointer) as usize; - read_btree_cell(buf, &self.page_type(), cell_pointer) + read_btree_cell(buf, &self.page_type(), cell_pointer, pager) } pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) { @@ -548,7 +548,12 @@ pub struct IndexLeafCell { pub first_overflow_page: Option, } -pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result { +pub fn read_btree_cell( + page: &[u8], + page_type: &PageType, + pos: usize, + pager: Rc, +) -> Result { match page_type { PageType::IndexInterior => { let mut pos = pos; @@ -557,7 +562,8 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result< pos += 4; let (payload_size, nr) = read_varint(&page[pos..])?; pos += nr; - let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize); + let (payload, first_overflow_page) = + read_payload(&page[pos..], payload_size as usize, pager); Ok(BTreeCell::IndexInteriorCell(IndexInteriorCell { left_child_page, payload, @@ -579,7 +585,8 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result< let mut pos = pos; let (payload_size, nr) = read_varint(&page[pos..])?; pos += nr; - let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize); + let (payload, first_overflow_page) = + read_payload(&page[pos..], payload_size as usize, pager); Ok(BTreeCell::IndexLeafCell(IndexLeafCell { payload, first_overflow_page, @@ -591,7 +598,8 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result< pos += nr; let (rowid, nr) = read_varint(&page[pos..])?; pos += nr; - let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize); + let (payload, first_overflow_page) = + read_payload(&page[pos..], payload_size as usize, pager); Ok(BTreeCell::TableLeafCell(TableLeafCell { _rowid: rowid, _payload: payload, @@ -603,20 +611,47 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result< /// read_payload takes in the unread bytearray with the payload size /// and returns the payload on the page, and optionally the first overflow page number. -fn read_payload(unread: &[u8], payload_size: usize) -> (Vec, Option) { - let page_len = unread.len(); - if payload_size <= page_len { +fn read_payload(unread: &[u8], payload_size: usize, pager: Rc) -> (Vec, Option) { + let cell_len = unread.len(); + if payload_size <= cell_len { // fit within 1 page (unread[..payload_size].to_vec(), None) } else { // overflow let first_overflow_page = u32::from_be_bytes([ - unread[page_len - 4], - unread[page_len - 3], - unread[page_len - 2], - unread[page_len - 1], + unread[cell_len - 4], + unread[cell_len - 3], + unread[cell_len - 2], + unread[cell_len - 1], ]); - (unread[..page_len - 4].to_vec(), Some(first_overflow_page)) + let usable_size = pager.usable_size(); + let mut next_overflow = first_overflow_page; + let mut payload = unread[..cell_len - 4].to_vec(); + let mut left_to_read = payload_size - (cell_len - 4); // minus four because last for bytes of a payload cell are the overflow pointer + while next_overflow != 0 { + assert!(left_to_read > 0); + let page; + loop { + let page_ref = pager.read_page(next_overflow as usize); + if let Ok(p) = page_ref { + page = p; + break; + } + } + let page = page.borrow(); + let contents = page.contents.write().unwrap(); + let contents = contents.as_ref().unwrap(); + + let to_read = left_to_read.min(usable_size - 4); + let buf = contents.as_ptr(); + payload.extend_from_slice(&buf[4..4 + to_read]); + + next_overflow = contents.read_u32(0); + left_to_read -= to_read; + } + assert_eq!(left_to_read, 0); + + (payload, Some(first_overflow_page)) } } @@ -761,7 +796,11 @@ pub fn read_value(buf: &[u8], serial_type: &SerialType) -> Result<(OwnedValue, u } SerialType::String(n) => { if buf.len() < n { - crate::bail_corrupt_error!("Invalid String value"); + crate::bail_corrupt_error!( + "Invalid String value, length {} < expected length {}", + buf.len(), + n + ); } let bytes = buf[0..n].to_vec(); let value = unsafe { String::from_utf8_unchecked(bytes) };