diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 4c521cfb6..8a12a3199 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1,15 +1,17 @@ use crate::storage::pager::{Page, Pager}; use crate::storage::sqlite3_ondisk::{ - read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell, - TableLeafCell, + read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, + TableInteriorCell, TableLeafCell, }; use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue}; use crate::Result; use std::cell::{Ref, RefCell}; -use std::mem; +use std::mem::swap; use std::rc::Rc; +use super::sqlite3_ondisk::OverflowCell; + /* These are offsets of fields in the header of a b-tree page. */ @@ -99,6 +101,7 @@ impl BTreeCursor { } let page = page.contents.read().unwrap(); let page = page.as_ref().unwrap(); + if mem_page.cell_idx() >= page.cell_count() { let parent = mem_page.parent.clone(); match page.rightmost_pointer() { @@ -270,6 +273,7 @@ impl BTreeCursor { if page.is_locked() { return Ok(CursorResult::IO); } + let page = page.contents.read().unwrap(); let page = page.as_ref().unwrap(); if page.is_leaf() { @@ -344,7 +348,7 @@ impl BTreeCursor { } page.set_dirty(); - self.pager.add_dirty(page_ref.clone()); + self.pager.add_dirty(page.id); let mut page = page.contents.write().unwrap(); let page = page.as_mut().unwrap(); @@ -357,7 +361,7 @@ impl BTreeCursor { // TODO: if overwrite drop cell // insert cell - let mut payload: Vec = Vec::new(); + let mut cell_payload: Vec = Vec::new(); { // Data len will be prepended later @@ -367,13 +371,13 @@ impl BTreeCursor { let n = write_varint(&mut key_varint.as_mut_slice()[0..9], int_key); write_varint(&mut key_varint, int_key); key_varint.truncate(n); - payload.extend_from_slice(&key_varint); + cell_payload.extend_from_slice(&key_varint); } // Data payload - let payload_size_before_record = payload.len(); - _record.serialize(&mut payload); - let header_size = payload.len() - payload_size_before_record; + let payload_size_before_record = cell_payload.len(); + _record.serialize(&mut cell_payload); + let header_size = cell_payload.len() - payload_size_before_record; { // Data len @@ -384,40 +388,48 @@ impl BTreeCursor { header_size as u64, ); data_len_varint.truncate(n); - payload.splice(0..0, data_len_varint.iter().cloned()); + cell_payload.splice(0..0, data_len_varint.iter().cloned()); } let usable_space = { let db_header = RefCell::borrow(&self.database_header); (db_header.page_size - db_header.unused_space as u16) as usize }; - let free = { - let page = RefCell::borrow(&page_ref); - let mut page = page.contents.write().unwrap(); - let page = page.as_mut().unwrap(); - self.compute_free_space(page, RefCell::borrow(&self.database_header)) - }; assert!( - payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */ + cell_payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */ "need to implemented overflow pages, too big to even add to a an empty page" ); - if payload.len() + 2 > free as usize { - // overflow or balance - self.balance_leaf(int_key, payload); - } else { - // insert + + // insert + let overflow = { let page = RefCell::borrow(&page_ref); let mut page = page.contents.write().unwrap(); let page = page.as_mut().unwrap(); - self.insert_into_cell(page, &payload, cell_idx); + self.insert_into_cell(page, &cell_payload.as_slice(), cell_idx); + page.overflow_cells.len() + }; + + if overflow > 0 { + self.balance_leaf(); } Ok(CursorResult::Ok(())) } /* insert to postion and shift other pointers */ - fn insert_into_cell(&mut self, page: &mut PageContent, payload: &Vec, cell_idx: usize) { + fn insert_into_cell(&mut self, page: &mut PageContent, payload: &[u8], cell_idx: usize) { + let free = self.compute_free_space(page, RefCell::borrow(&self.database_header)); + let enough_space = payload.len() + 2 <= free as usize; + if !enough_space { + // add to overflow cell + page.overflow_cells.push(OverflowCell { + index: cell_idx, + payload: Vec::from(payload), + }); + return; + } + // TODO: insert into cell payload in internal page let pc = self.allocate_cell_space(page, payload.len() as u16); let buf = page.as_ptr(); @@ -446,6 +458,70 @@ impl BTreeCursor { page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, new_n_cells); } + fn free_cell_range(&mut self, page: &mut PageContent, offset: u16, len: u16) { + if page.first_freeblock() == 0 { + // insert into empty list + page.write_u16(offset as usize, 0); + page.write_u16(offset as usize + 2, len as u16); + page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, offset as u16); + return; + } + let first_block = page.first_freeblock(); + + if offset < first_block { + // insert into head of list + page.write_u16(offset as usize, first_block); + page.write_u16(offset as usize + 2, len as u16); + page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, offset as u16); + return; + } + + if offset <= page.cell_content_area() { + // extend boundary of content area + page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, page.first_freeblock()); + page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, offset + len); + return; + } + + let maxpc = { + let db_header = self.database_header.borrow(); + let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize; + usable_space as u16 + }; + + let mut pc = first_block; + let mut prev = first_block; + + while pc <= maxpc && pc < offset as u16 { + let next = page.read_u16(pc as usize); + prev = pc; + pc = next; + } + + if pc >= maxpc { + // insert into tail + let offset = offset as usize; + let prev = prev as usize; + page.write_u16(prev, offset as u16); + page.write_u16(offset, 0); + page.write_u16(offset + 2, len); + } else { + // insert in between + let next = page.read_u16(pc as usize); + let offset = offset as usize; + let prev = prev as usize; + page.write_u16(prev, offset as u16); + page.write_u16(offset, next); + page.write_u16(offset + 2, len); + } + } + + fn drop_cell(&mut self, page: &mut PageContent, cell_idx: usize) { + let (cell_start, cell_len) = page.cell_get_raw_region(cell_idx); + self.free_cell_range(page, cell_start as u16, cell_len as u16); + page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1); + } + fn get_page(&mut self) -> crate::Result>> { let mem_page = { let mem_page = self.page.borrow(); @@ -457,22 +533,66 @@ impl BTreeCursor { Ok(page_ref) } - fn balance_leaf(&mut self, key: u64, payload: Vec) { + fn balance_leaf(&mut self) { // This is a naive algorithm that doesn't try to distribute cells evenly by content. // It will try to split the page in half by keys not by content. // Sqlite tries to have a page at least 40% full. - let mut key = key; - let mut payload = payload; loop { let mem_page = { let mem_page = self.page.borrow(); let mem_page = mem_page.as_ref().unwrap(); mem_page.clone() }; - let page_ref = self.read_page_sync(mem_page.page_idx); - let mut page_rc = RefCell::borrow_mut(&page_ref); - let right_page_id = { + { + // check if we don't need to balance + let page_ref = self.read_page_sync(mem_page.page_idx); + let page_rc = RefCell::borrow(&page_ref); + + { + // don't continue if there are no overflow cells + let mut page = page_rc.contents.write().unwrap(); + let page = page.as_mut().unwrap(); + if page.overflow_cells.is_empty() { + break; + } + } + } + + println!("Balancing leaf. leaf={}", mem_page.page_idx); + if mem_page.parent.is_none() { + self.balance_root(); + continue; + } + + let page_ref = self.read_page_sync(mem_page.page_idx); + let page_rc = RefCell::borrow(&page_ref); + + // Copy of page used to reference cell bytes. + let page_copy = { + let mut page = page_rc.contents.write().unwrap(); + let page = page.as_mut().unwrap(); + page.clone() + }; + + // In memory in order copy of all cells in pages we want to balance. For now let's do a 2 page split. + // Right pointer in interior cells should be converted to regular cells if more than 2 pages are used for balancing. + let (scratch_cells, right_most_pointer) = { + let mut scratch_cells: Vec<&[u8]> = Vec::new(); + + for cell_idx in 0..page_copy.cell_count() { + let (start, len) = page_copy.cell_get_raw_region(cell_idx); + let buf = page_copy.as_ptr(); + scratch_cells.push(&buf[start..start + len]); + } + for overflow_cell in &page_copy.overflow_cells { + scratch_cells.insert(overflow_cell.index, &overflow_cell.payload); + } + (scratch_cells, page_copy.rightmost_pointer()) + }; + + // allocate new pages and move cells to those new pages + { // split procedure let mut page = page_rc.contents.write().unwrap(); let page = page.as_mut().unwrap(); @@ -484,11 +604,6 @@ impl BTreeCursor { ), "indexes still not supported " ); - if payload.len() + 2 <= free as usize { - let cell_idx = find_cell(page, key); - self.insert_into_cell(page, &payload, cell_idx); - break; - } let right_page_ref = self.allocate_page(page.page_type()); let right_page = RefCell::borrow_mut(&right_page_ref); @@ -496,127 +611,203 @@ impl BTreeCursor { let mut right_page = right_page.contents.write().unwrap(); let right_page = right_page.as_mut().unwrap(); { - // move data from one buffer to another - // done in a separate block to satisfy borrow checker - let left_buf: &mut [u8] = page.as_ptr(); - let right_buf: &mut [u8] = right_page.as_ptr(); + let is_leaf = page.is_leaf(); + let mut new_pages = vec![page, right_page]; + let new_pages_ids = vec![mem_page.page_idx, right_page_id]; + println!( + "splitting left={} right={}", + new_pages_ids[0], new_pages_ids[1] + ); - let mut rbrk = right_page.cell_content_area() as usize; + // drop divider cells and find right pointer + // NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer). + // Right pointer means cell that points to the last page, as we don't really want to drop this one. This one + // can be a "rightmost pointer" or a "cell". + // TODO(pere): simplify locking... + // we always asumme there is a parent + let parent_rc = mem_page.parent.as_ref().unwrap(); - let cells_to_move = page.cell_count() / 2; - let (mut cell_pointer_idx, _) = page.cell_get_raw_pointer_region(); - // move half of cells to right page - for cell_idx in cells_to_move..page.cell_count() { - let (start, len) = page.cell_get_raw_region(cell_idx); - // copy data - rbrk -= len; - right_buf[rbrk..rbrk + len].copy_from_slice(&left_buf[start..start + len]); - // set pointer - right_page.write_u16(cell_pointer_idx, rbrk as u16); - cell_pointer_idx += 2; + let parent_ref = self.read_page_sync(parent_rc.page_idx); + let parent = RefCell::borrow_mut(&parent_ref); + parent.set_dirty(); + self.pager.add_dirty(parent.id); + let mut parent = parent.contents.write().unwrap(); + let parent = parent.as_mut().unwrap(); + // if this isn't empty next loop won't work + assert!(parent.overflow_cells.is_empty()); + + // Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value + let mut right_pointer = BTREE_HEADER_OFFSET_RIGHTMOST; + for cell_idx in 0..parent.cell_count() { + let cell = parent.cell_get(cell_idx).unwrap(); + let found = match cell { + BTreeCell::TableInteriorCell(interior) => { + interior._left_child_page as usize == mem_page.page_idx + } + _ => unreachable!("Parent should always be a "), + }; + if found { + let (start, len) = parent.cell_get_raw_region(cell_idx); + right_pointer = start; + break; + } + } + + // reset pages + for page in &new_pages { + page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0); + page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0); + + let db_header = RefCell::borrow(&self.database_header); + let cell_content_area_start = + db_header.page_size - db_header.unused_space as u16; + page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cell_content_area_start); + + page.write_u8(BTREE_HEADER_OFFSET_FRAGMENTED, 0); + page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0); + } + + // distribute cells + let new_pages_len = new_pages.len(); + let cells_per_page = scratch_cells.len() / new_pages.len(); + let mut current_cell_index = 0_usize; + let mut divider_cells_index = Vec::new(); /* index to scratch cells that will be used as dividers in order */ + + for (i, page) in new_pages.iter_mut().enumerate() { + let last_page = i == new_pages_len - 1; + let cells_to_copy = if last_page { + // last cells is remaining pages if division was odd + scratch_cells.len() - current_cell_index + } else { + cells_per_page + }; + + let mut i = 0; + for cell_idx in current_cell_index..current_cell_index + cells_to_copy { + let cell = scratch_cells[cell_idx]; + self.insert_into_cell(*page, cell, i); + i += 1; + } + divider_cells_index.push(current_cell_index + cells_to_copy - 1); + current_cell_index += cells_to_copy; + } + + // update rightmost pointer for each page if we are in interior page + if !is_leaf { + for page in new_pages.iter_mut().take(new_pages_len - 1) { + assert!(page.cell_count() == 1); + let last_cell = page.cell_get(page.cell_count() - 1).unwrap(); + let last_cell_pointer = match last_cell { + BTreeCell::TableInteriorCell(interior) => interior._left_child_page, + _ => unreachable!(), + }; + self.drop_cell(*page, page.cell_count() - 1); + page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, last_cell_pointer); + } + // last page right most pointer points to previous right most pointer before splitting + let last_page = new_pages.last().unwrap(); + last_page + .write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, right_most_pointer.unwrap()); + } + + // insert dividers in parent + // we can consider dividers the first cell of each page starting from the second page + for (page_id_index, page) in + new_pages.iter_mut().take(new_pages_len - 1).enumerate() + { + assert!(page.cell_count() > 1); + let divider_cell_index = divider_cells_index[page_id_index]; + let cell_payload = scratch_cells[divider_cell_index]; + let cell = read_btree_cell(cell_payload, &page.page_type(), 0).unwrap(); + if is_leaf { + // create a new divider cell and push + let key = match cell { + BTreeCell::TableLeafCell(leaf) => leaf._rowid, + _ => unreachable!(), + }; + let mut divider_cell = Vec::new(); + divider_cell.extend_from_slice( + &(new_pages_ids[page_id_index] as u32).to_be_bytes(), + ); + divider_cell.extend(std::iter::repeat(0).take(9)); + let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key); + divider_cell.truncate(4 + n); + let parent_cell_idx = find_cell(parent, key); + self.insert_into_cell(parent, divider_cell.as_slice(), parent_cell_idx); + } else { + // move cell + let key = match cell { + BTreeCell::TableInteriorCell(interior) => interior._rowid, + _ => unreachable!(), + }; + let parent_cell_idx = find_cell(page, key); + self.insert_into_cell(parent, cell_payload, parent_cell_idx); + // self.drop_cell(*page, 0); + } + } + + { + // copy last page id to right pointer + let last_pointer = *new_pages_ids.last().unwrap() as u32; + parent.write_u32(right_pointer, last_pointer); } - // update cell count in both pages - let keys_moved = page.cell_count() - cells_to_move; - page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, cells_to_move as u16); - right_page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, keys_moved as u16); - // update cell content are start - right_page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, rbrk as u16); } - - let last_cell = page.cell_get(page.cell_count() - 1).unwrap(); - let last_cell_key = match &last_cell { - BTreeCell::TableLeafCell(cell) => cell._rowid, - BTreeCell::TableInteriorCell(cell) => cell._rowid, - _ => unreachable!(), /* not yet supported index tables */ - }; - // if not leaf page update rightmost pointer - if let PageType::TableInterior = page.page_type() { - right_page.write_u32( - BTREE_HEADER_OFFSET_RIGHTMOST, - page.rightmost_pointer().unwrap(), - ); - // convert last cell to rightmost pointer - let BTreeCell::TableInteriorCell(last_cell) = &last_cell else { - unreachable!(); - }; - page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, last_cell._left_child_page); - // page count now has one less cell because we've added the last one to rightmost pointer - page.write_u16( - BTREE_HEADER_OFFSET_CELL_COUNT, - (page.cell_count() - 1) as u16, - ); - } - - // update free list block by defragmenting page - self.defragment_page(page, RefCell::borrow(&self.database_header)); - // insert into one of the pages - if key < last_cell_key { - let cell_idx = find_cell(page, key); - self.insert_into_cell(page, &payload, cell_idx); - } else { - let cell_idx = find_cell(right_page, key); - self.insert_into_cell(right_page, &payload, cell_idx); - } - // propagate parent split - key = last_cell_key; - right_page_id - }; - - payload = Vec::new(); - if mem_page.page_idx == self.root_page { - /* todo: balance deeper, create child and copy contents of root there. Then split root */ - /* if we are in root page then we just need to create a new root and push key there */ - let new_root_page_ref = self.allocate_page(PageType::TableInterior); - { - let new_root_page = RefCell::borrow_mut(&new_root_page_ref); - let new_root_page_id = new_root_page.id; - let mut new_root_page_contents = new_root_page.contents.write().unwrap(); - let new_root_page_contents = new_root_page_contents.as_mut().unwrap(); - /* - Note that we set cell pointer to point to itself, because we will later swap this page's - content with splitted page in order to not update root page idx. - */ - let new_root_page_id = new_root_page_id as u32; - payload.extend_from_slice(&new_root_page_id.to_be_bytes()); - payload.extend(std::iter::repeat(0).take(9)); - let n = write_varint(&mut payload.as_mut_slice()[4..], key); - payload.truncate(4 + n); - - // write left child cell - self.insert_into_cell(new_root_page_contents, &payload, 0); - - // write right child cell - new_root_page_contents - .write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, right_page_id as u32); - } - - /* swap splitted page buffer with new root buffer so we don't have to update page idx */ - { - let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref); - mem::swap(&mut *new_root_page, &mut *page_rc); - - // now swap contents - let mut new_root_page_contents = new_root_page.contents.write().unwrap(); - let mut page_contents = page_rc.contents.write().unwrap(); - std::mem::swap(&mut *new_root_page_contents, &mut *page_contents); - - self.page = - RefCell::new(Some(Rc::new(MemPage::new(None, new_root_page.id, 0)))); - } - - break; } - // Propagate split divided to top. - payload.extend_from_slice(&(mem_page.page_idx as u32).to_be_bytes()); - payload.extend(std::iter::repeat(0).take(9)); - let n = write_varint(&mut payload.as_mut_slice()[4..], key); - payload.truncate(n); - self.page = RefCell::new(Some(mem_page.parent.as_ref().unwrap().clone())); } } + fn balance_root(&mut self) { + /* todo: balance deeper, create child and copy contents of root there. Then split root */ + /* if we are in root page then we just need to create a new root and push key there */ + let mem_page = { + let mem_page = self.page.borrow(); + let mem_page = mem_page.as_ref().unwrap(); + mem_page.clone() + }; + + let new_root_page_ref = self.allocate_page(PageType::TableInterior); + { + let new_root_page = RefCell::borrow(&new_root_page_ref); + let new_root_page_id = new_root_page.id; + let mut new_root_page_contents = new_root_page.contents.write().unwrap(); + let new_root_page_contents = new_root_page_contents.as_mut().unwrap(); + // point new root right child to previous root + new_root_page_contents + .write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, new_root_page_id as u32); + new_root_page_contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0); + } + + /* swap splitted page buffer with new root buffer so we don't have to update page idx */ + { + let page_ref = self.read_page_sync(mem_page.page_idx); + let (root_id, child_id) = { + let mut page_rc = RefCell::borrow_mut(&page_ref); + let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref); + + // Swap the entire Page structs + std::mem::swap(&mut page_rc.id, &mut new_root_page.id); + + self.pager.add_dirty(new_root_page.id); + self.pager.add_dirty(page_rc.id); + (new_root_page.id, page_rc.id) + }; + { + let page_rc = RefCell::borrow_mut(&page_ref); + } + + let root = new_root_page_ref.clone(); + let child = page_ref.clone(); + + let parent = Some(Rc::new(MemPage::new(None, root_id, 0))); + self.page = RefCell::new(Some(Rc::new(MemPage::new(parent, child_id, 0)))); + println!("Balancing root. root={}, rightmost={}", root_id, child_id); + self.pager.put_page(root_id, root); + self.pager.put_page(child_id, child); + } + } + fn read_page_sync(&mut self, page_idx: usize) -> Rc> { loop { let page_ref = self.pager.read_page(page_idx); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index de4c0615e..ae26314f8 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -6,7 +6,7 @@ use crate::{Buffer, Result}; use log::trace; use sieve_cache::SieveCache; use std::cell::RefCell; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::hash::Hash; use std::ptr::{drop_in_place, NonNull}; use std::rc::Rc; @@ -267,7 +267,7 @@ pub struct Pager { buffer_pool: Rc, /// I/O interface for input/output operations. pub io: Arc, - dirty_pages: Rc>>>>, + dirty_pages: Rc>>, db_header: Rc>, } @@ -294,7 +294,7 @@ impl Pager { buffer_pool, page_cache, io, - dirty_pages: Rc::new(RefCell::new(Vec::new())), + dirty_pages: Rc::new(RefCell::new(HashSet::new())), db_header: db_header_ref.clone(), }) } @@ -347,10 +347,10 @@ impl Pager { self.page_cache.borrow_mut().resize(capacity); } - pub fn add_dirty(&self, page: Rc>) { + pub fn add_dirty(&self, page_id: usize) { // TODO: cehck duplicates? let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages); - dirty_pages.push(page); + dirty_pages.insert(page_id); } pub fn cacheflush(&self) -> Result<()> { @@ -358,13 +358,12 @@ impl Pager { if dirty_pages.len() == 0 { return Ok(()); } - loop { - if dirty_pages.len() == 0 { - break; - } - let page = dirty_pages.pop().unwrap(); + for page_id in dirty_pages.iter() { + let mut cache = self.page_cache.borrow_mut(); + let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); sqlite3_ondisk::begin_write_btree_page(self, &page)?; } + dirty_pages.clear(); self.io.run_once()?; Ok(()) } @@ -382,7 +381,7 @@ impl Pager { let first_page_ref = self.read_page(1).unwrap(); let first_page = RefCell::borrow_mut(&first_page_ref); first_page.set_dirty(); - self.add_dirty(first_page_ref.clone()); + self.add_dirty(1); let contents = first_page.contents.write().unwrap(); let contents = contents.as_ref().unwrap(); @@ -392,20 +391,29 @@ impl Pager { let page_ref = Rc::new(RefCell::new(Page::new(0))); { // setup page and add to cache - self.add_dirty(page_ref.clone()); let mut page = RefCell::borrow_mut(&page_ref); - page.set_dirty(); page.id = header.database_size as usize; + page.set_dirty(); + self.add_dirty(page.id); let buffer = self.buffer_pool.get(); let bp = self.buffer_pool.clone(); let drop_fn = Rc::new(move |buf| { bp.put(buf); }); let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn))); - page.contents = RwLock::new(Some(PageContent { offset: 0, buffer })); + page.contents = RwLock::new(Some(PageContent { + offset: 0, + buffer, + overflow_cells: Vec::new(), + })); let mut cache = RefCell::borrow_mut(&self.page_cache); cache.insert(page.id, page_ref.clone()); } Ok(page_ref) } + + pub fn put_page(&self, id: usize, page: Rc>) { + let mut cache = RefCell::borrow_mut(&self.page_cache); + cache.insert(id, page); + } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 64ea91b29..99dd6e362 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -260,10 +260,17 @@ impl TryFrom for PageType { } } +#[derive(Debug, Clone)] +pub struct OverflowCell { + pub index: usize, + pub payload: Vec, +} + #[derive(Debug)] pub struct PageContent { pub offset: usize, pub buffer: Rc>, + pub overflow_cells: Vec, } impl Clone for PageContent { @@ -271,6 +278,7 @@ impl Clone for PageContent { Self { offset: self.offset, buffer: Rc::new(RefCell::new((*self.buffer.borrow()).clone())), + overflow_cells: self.overflow_cells.clone(), } } } @@ -294,7 +302,7 @@ impl PageContent { buf[pos] } - fn read_u16(&self, pos: usize) -> u16 { + pub fn read_u16(&self, pos: usize) -> u16 { let buf = self.as_ptr(); u16::from_be_bytes([buf[self.offset + pos], buf[self.offset + pos + 1]]) } @@ -376,6 +384,7 @@ impl PageContent { (self.offset + cell_start, self.cell_count() * 2) } + /* Get region of a cell's payload */ pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) { let buf = self.as_ptr(); let ncells = self.cell_count(); @@ -465,6 +474,7 @@ fn finish_read_page( let inner = PageContent { offset: pos, buffer: buffer_ref.clone(), + overflow_cells: Vec::new(), }; { let page = page.borrow_mut();