core: multiple level btree page split

Signed-off-by: Pere Diaz Bou <pere-altea@hotmail.com>
This commit is contained in:
Pere Diaz Bou
2024-09-03 18:35:53 +02:00
parent ec94770d08
commit d87f9c9774
3 changed files with 370 additions and 161 deletions

View File

@@ -1,15 +1,17 @@
use crate::storage::pager::{Page, Pager};
use crate::storage::sqlite3_ondisk::{
read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell,
TableLeafCell,
read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType,
TableInteriorCell, TableLeafCell,
};
use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue};
use crate::Result;
use std::cell::{Ref, RefCell};
use std::mem;
use std::mem::swap;
use std::rc::Rc;
use super::sqlite3_ondisk::OverflowCell;
/*
These are offsets of fields in the header of a b-tree page.
*/
@@ -99,6 +101,7 @@ impl BTreeCursor {
}
let page = page.contents.read().unwrap();
let page = page.as_ref().unwrap();
if mem_page.cell_idx() >= page.cell_count() {
let parent = mem_page.parent.clone();
match page.rightmost_pointer() {
@@ -270,6 +273,7 @@ impl BTreeCursor {
if page.is_locked() {
return Ok(CursorResult::IO);
}
let page = page.contents.read().unwrap();
let page = page.as_ref().unwrap();
if page.is_leaf() {
@@ -344,7 +348,7 @@ impl BTreeCursor {
}
page.set_dirty();
self.pager.add_dirty(page_ref.clone());
self.pager.add_dirty(page.id);
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
@@ -357,7 +361,7 @@ impl BTreeCursor {
// TODO: if overwrite drop cell
// insert cell
let mut payload: Vec<u8> = Vec::new();
let mut cell_payload: Vec<u8> = Vec::new();
{
// Data len will be prepended later
@@ -367,13 +371,13 @@ impl BTreeCursor {
let n = write_varint(&mut key_varint.as_mut_slice()[0..9], int_key);
write_varint(&mut key_varint, int_key);
key_varint.truncate(n);
payload.extend_from_slice(&key_varint);
cell_payload.extend_from_slice(&key_varint);
}
// Data payload
let payload_size_before_record = payload.len();
_record.serialize(&mut payload);
let header_size = payload.len() - payload_size_before_record;
let payload_size_before_record = cell_payload.len();
_record.serialize(&mut cell_payload);
let header_size = cell_payload.len() - payload_size_before_record;
{
// Data len
@@ -384,40 +388,48 @@ impl BTreeCursor {
header_size as u64,
);
data_len_varint.truncate(n);
payload.splice(0..0, data_len_varint.iter().cloned());
cell_payload.splice(0..0, data_len_varint.iter().cloned());
}
let usable_space = {
let db_header = RefCell::borrow(&self.database_header);
(db_header.page_size - db_header.unused_space as u16) as usize
};
let free = {
let page = RefCell::borrow(&page_ref);
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
self.compute_free_space(page, RefCell::borrow(&self.database_header))
};
assert!(
payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */
cell_payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */
"need to implemented overflow pages, too big to even add to a an empty page"
);
if payload.len() + 2 > free as usize {
// overflow or balance
self.balance_leaf(int_key, payload);
} else {
// insert
// insert
let overflow = {
let page = RefCell::borrow(&page_ref);
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
self.insert_into_cell(page, &payload, cell_idx);
self.insert_into_cell(page, &cell_payload.as_slice(), cell_idx);
page.overflow_cells.len()
};
if overflow > 0 {
self.balance_leaf();
}
Ok(CursorResult::Ok(()))
}
/* insert to postion and shift other pointers */
fn insert_into_cell(&mut self, page: &mut PageContent, payload: &Vec<u8>, cell_idx: usize) {
fn insert_into_cell(&mut self, page: &mut PageContent, payload: &[u8], cell_idx: usize) {
let free = self.compute_free_space(page, RefCell::borrow(&self.database_header));
let enough_space = payload.len() + 2 <= free as usize;
if !enough_space {
// add to overflow cell
page.overflow_cells.push(OverflowCell {
index: cell_idx,
payload: Vec::from(payload),
});
return;
}
// TODO: insert into cell payload in internal page
let pc = self.allocate_cell_space(page, payload.len() as u16);
let buf = page.as_ptr();
@@ -446,6 +458,70 @@ impl BTreeCursor {
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, new_n_cells);
}
fn free_cell_range(&mut self, page: &mut PageContent, offset: u16, len: u16) {
if page.first_freeblock() == 0 {
// insert into empty list
page.write_u16(offset as usize, 0);
page.write_u16(offset as usize + 2, len as u16);
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, offset as u16);
return;
}
let first_block = page.first_freeblock();
if offset < first_block {
// insert into head of list
page.write_u16(offset as usize, first_block);
page.write_u16(offset as usize + 2, len as u16);
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, offset as u16);
return;
}
if offset <= page.cell_content_area() {
// extend boundary of content area
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, page.first_freeblock());
page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, offset + len);
return;
}
let maxpc = {
let db_header = self.database_header.borrow();
let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize;
usable_space as u16
};
let mut pc = first_block;
let mut prev = first_block;
while pc <= maxpc && pc < offset as u16 {
let next = page.read_u16(pc as usize);
prev = pc;
pc = next;
}
if pc >= maxpc {
// insert into tail
let offset = offset as usize;
let prev = prev as usize;
page.write_u16(prev, offset as u16);
page.write_u16(offset, 0);
page.write_u16(offset + 2, len);
} else {
// insert in between
let next = page.read_u16(pc as usize);
let offset = offset as usize;
let prev = prev as usize;
page.write_u16(prev, offset as u16);
page.write_u16(offset, next);
page.write_u16(offset + 2, len);
}
}
fn drop_cell(&mut self, page: &mut PageContent, cell_idx: usize) {
let (cell_start, cell_len) = page.cell_get_raw_region(cell_idx);
self.free_cell_range(page, cell_start as u16, cell_len as u16);
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
}
fn get_page(&mut self) -> crate::Result<Rc<RefCell<Page>>> {
let mem_page = {
let mem_page = self.page.borrow();
@@ -457,22 +533,66 @@ impl BTreeCursor {
Ok(page_ref)
}
fn balance_leaf(&mut self, key: u64, payload: Vec<u8>) {
fn balance_leaf(&mut self) {
// This is a naive algorithm that doesn't try to distribute cells evenly by content.
// It will try to split the page in half by keys not by content.
// Sqlite tries to have a page at least 40% full.
let mut key = key;
let mut payload = payload;
loop {
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_ref = self.read_page_sync(mem_page.page_idx);
let mut page_rc = RefCell::borrow_mut(&page_ref);
let right_page_id = {
{
// check if we don't need to balance
let page_ref = self.read_page_sync(mem_page.page_idx);
let page_rc = RefCell::borrow(&page_ref);
{
// don't continue if there are no overflow cells
let mut page = page_rc.contents.write().unwrap();
let page = page.as_mut().unwrap();
if page.overflow_cells.is_empty() {
break;
}
}
}
println!("Balancing leaf. leaf={}", mem_page.page_idx);
if mem_page.parent.is_none() {
self.balance_root();
continue;
}
let page_ref = self.read_page_sync(mem_page.page_idx);
let page_rc = RefCell::borrow(&page_ref);
// Copy of page used to reference cell bytes.
let page_copy = {
let mut page = page_rc.contents.write().unwrap();
let page = page.as_mut().unwrap();
page.clone()
};
// In memory in order copy of all cells in pages we want to balance. For now let's do a 2 page split.
// Right pointer in interior cells should be converted to regular cells if more than 2 pages are used for balancing.
let (scratch_cells, right_most_pointer) = {
let mut scratch_cells: Vec<&[u8]> = Vec::new();
for cell_idx in 0..page_copy.cell_count() {
let (start, len) = page_copy.cell_get_raw_region(cell_idx);
let buf = page_copy.as_ptr();
scratch_cells.push(&buf[start..start + len]);
}
for overflow_cell in &page_copy.overflow_cells {
scratch_cells.insert(overflow_cell.index, &overflow_cell.payload);
}
(scratch_cells, page_copy.rightmost_pointer())
};
// allocate new pages and move cells to those new pages
{
// split procedure
let mut page = page_rc.contents.write().unwrap();
let page = page.as_mut().unwrap();
@@ -484,11 +604,6 @@ impl BTreeCursor {
),
"indexes still not supported "
);
if payload.len() + 2 <= free as usize {
let cell_idx = find_cell(page, key);
self.insert_into_cell(page, &payload, cell_idx);
break;
}
let right_page_ref = self.allocate_page(page.page_type());
let right_page = RefCell::borrow_mut(&right_page_ref);
@@ -496,127 +611,203 @@ impl BTreeCursor {
let mut right_page = right_page.contents.write().unwrap();
let right_page = right_page.as_mut().unwrap();
{
// move data from one buffer to another
// done in a separate block to satisfy borrow checker
let left_buf: &mut [u8] = page.as_ptr();
let right_buf: &mut [u8] = right_page.as_ptr();
let is_leaf = page.is_leaf();
let mut new_pages = vec![page, right_page];
let new_pages_ids = vec![mem_page.page_idx, right_page_id];
println!(
"splitting left={} right={}",
new_pages_ids[0], new_pages_ids[1]
);
let mut rbrk = right_page.cell_content_area() as usize;
// drop divider cells and find right pointer
// NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer).
// Right pointer means cell that points to the last page, as we don't really want to drop this one. This one
// can be a "rightmost pointer" or a "cell".
// TODO(pere): simplify locking...
// we always asumme there is a parent
let parent_rc = mem_page.parent.as_ref().unwrap();
let cells_to_move = page.cell_count() / 2;
let (mut cell_pointer_idx, _) = page.cell_get_raw_pointer_region();
// move half of cells to right page
for cell_idx in cells_to_move..page.cell_count() {
let (start, len) = page.cell_get_raw_region(cell_idx);
// copy data
rbrk -= len;
right_buf[rbrk..rbrk + len].copy_from_slice(&left_buf[start..start + len]);
// set pointer
right_page.write_u16(cell_pointer_idx, rbrk as u16);
cell_pointer_idx += 2;
let parent_ref = self.read_page_sync(parent_rc.page_idx);
let parent = RefCell::borrow_mut(&parent_ref);
parent.set_dirty();
self.pager.add_dirty(parent.id);
let mut parent = parent.contents.write().unwrap();
let parent = parent.as_mut().unwrap();
// if this isn't empty next loop won't work
assert!(parent.overflow_cells.is_empty());
// Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value
let mut right_pointer = BTREE_HEADER_OFFSET_RIGHTMOST;
for cell_idx in 0..parent.cell_count() {
let cell = parent.cell_get(cell_idx).unwrap();
let found = match cell {
BTreeCell::TableInteriorCell(interior) => {
interior._left_child_page as usize == mem_page.page_idx
}
_ => unreachable!("Parent should always be a "),
};
if found {
let (start, len) = parent.cell_get_raw_region(cell_idx);
right_pointer = start;
break;
}
}
// reset pages
for page in &new_pages {
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0);
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0);
let db_header = RefCell::borrow(&self.database_header);
let cell_content_area_start =
db_header.page_size - db_header.unused_space as u16;
page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cell_content_area_start);
page.write_u8(BTREE_HEADER_OFFSET_FRAGMENTED, 0);
page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0);
}
// distribute cells
let new_pages_len = new_pages.len();
let cells_per_page = scratch_cells.len() / new_pages.len();
let mut current_cell_index = 0_usize;
let mut divider_cells_index = Vec::new(); /* index to scratch cells that will be used as dividers in order */
for (i, page) in new_pages.iter_mut().enumerate() {
let last_page = i == new_pages_len - 1;
let cells_to_copy = if last_page {
// last cells is remaining pages if division was odd
scratch_cells.len() - current_cell_index
} else {
cells_per_page
};
let mut i = 0;
for cell_idx in current_cell_index..current_cell_index + cells_to_copy {
let cell = scratch_cells[cell_idx];
self.insert_into_cell(*page, cell, i);
i += 1;
}
divider_cells_index.push(current_cell_index + cells_to_copy - 1);
current_cell_index += cells_to_copy;
}
// update rightmost pointer for each page if we are in interior page
if !is_leaf {
for page in new_pages.iter_mut().take(new_pages_len - 1) {
assert!(page.cell_count() == 1);
let last_cell = page.cell_get(page.cell_count() - 1).unwrap();
let last_cell_pointer = match last_cell {
BTreeCell::TableInteriorCell(interior) => interior._left_child_page,
_ => unreachable!(),
};
self.drop_cell(*page, page.cell_count() - 1);
page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, last_cell_pointer);
}
// last page right most pointer points to previous right most pointer before splitting
let last_page = new_pages.last().unwrap();
last_page
.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, right_most_pointer.unwrap());
}
// insert dividers in parent
// we can consider dividers the first cell of each page starting from the second page
for (page_id_index, page) in
new_pages.iter_mut().take(new_pages_len - 1).enumerate()
{
assert!(page.cell_count() > 1);
let divider_cell_index = divider_cells_index[page_id_index];
let cell_payload = scratch_cells[divider_cell_index];
let cell = read_btree_cell(cell_payload, &page.page_type(), 0).unwrap();
if is_leaf {
// create a new divider cell and push
let key = match cell {
BTreeCell::TableLeafCell(leaf) => leaf._rowid,
_ => unreachable!(),
};
let mut divider_cell = Vec::new();
divider_cell.extend_from_slice(
&(new_pages_ids[page_id_index] as u32).to_be_bytes(),
);
divider_cell.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key);
divider_cell.truncate(4 + n);
let parent_cell_idx = find_cell(parent, key);
self.insert_into_cell(parent, divider_cell.as_slice(), parent_cell_idx);
} else {
// move cell
let key = match cell {
BTreeCell::TableInteriorCell(interior) => interior._rowid,
_ => unreachable!(),
};
let parent_cell_idx = find_cell(page, key);
self.insert_into_cell(parent, cell_payload, parent_cell_idx);
// self.drop_cell(*page, 0);
}
}
{
// copy last page id to right pointer
let last_pointer = *new_pages_ids.last().unwrap() as u32;
parent.write_u32(right_pointer, last_pointer);
}
// update cell count in both pages
let keys_moved = page.cell_count() - cells_to_move;
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, cells_to_move as u16);
right_page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, keys_moved as u16);
// update cell content are start
right_page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, rbrk as u16);
}
let last_cell = page.cell_get(page.cell_count() - 1).unwrap();
let last_cell_key = match &last_cell {
BTreeCell::TableLeafCell(cell) => cell._rowid,
BTreeCell::TableInteriorCell(cell) => cell._rowid,
_ => unreachable!(), /* not yet supported index tables */
};
// if not leaf page update rightmost pointer
if let PageType::TableInterior = page.page_type() {
right_page.write_u32(
BTREE_HEADER_OFFSET_RIGHTMOST,
page.rightmost_pointer().unwrap(),
);
// convert last cell to rightmost pointer
let BTreeCell::TableInteriorCell(last_cell) = &last_cell else {
unreachable!();
};
page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, last_cell._left_child_page);
// page count now has one less cell because we've added the last one to rightmost pointer
page.write_u16(
BTREE_HEADER_OFFSET_CELL_COUNT,
(page.cell_count() - 1) as u16,
);
}
// update free list block by defragmenting page
self.defragment_page(page, RefCell::borrow(&self.database_header));
// insert into one of the pages
if key < last_cell_key {
let cell_idx = find_cell(page, key);
self.insert_into_cell(page, &payload, cell_idx);
} else {
let cell_idx = find_cell(right_page, key);
self.insert_into_cell(right_page, &payload, cell_idx);
}
// propagate parent split
key = last_cell_key;
right_page_id
};
payload = Vec::new();
if mem_page.page_idx == self.root_page {
/* todo: balance deeper, create child and copy contents of root there. Then split root */
/* if we are in root page then we just need to create a new root and push key there */
let new_root_page_ref = self.allocate_page(PageType::TableInterior);
{
let new_root_page = RefCell::borrow_mut(&new_root_page_ref);
let new_root_page_id = new_root_page.id;
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
let new_root_page_contents = new_root_page_contents.as_mut().unwrap();
/*
Note that we set cell pointer to point to itself, because we will later swap this page's
content with splitted page in order to not update root page idx.
*/
let new_root_page_id = new_root_page_id as u32;
payload.extend_from_slice(&new_root_page_id.to_be_bytes());
payload.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut payload.as_mut_slice()[4..], key);
payload.truncate(4 + n);
// write left child cell
self.insert_into_cell(new_root_page_contents, &payload, 0);
// write right child cell
new_root_page_contents
.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, right_page_id as u32);
}
/* swap splitted page buffer with new root buffer so we don't have to update page idx */
{
let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref);
mem::swap(&mut *new_root_page, &mut *page_rc);
// now swap contents
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
let mut page_contents = page_rc.contents.write().unwrap();
std::mem::swap(&mut *new_root_page_contents, &mut *page_contents);
self.page =
RefCell::new(Some(Rc::new(MemPage::new(None, new_root_page.id, 0))));
}
break;
}
// Propagate split divided to top.
payload.extend_from_slice(&(mem_page.page_idx as u32).to_be_bytes());
payload.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut payload.as_mut_slice()[4..], key);
payload.truncate(n);
self.page = RefCell::new(Some(mem_page.parent.as_ref().unwrap().clone()));
}
}
fn balance_root(&mut self) {
/* todo: balance deeper, create child and copy contents of root there. Then split root */
/* if we are in root page then we just need to create a new root and push key there */
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let new_root_page_ref = self.allocate_page(PageType::TableInterior);
{
let new_root_page = RefCell::borrow(&new_root_page_ref);
let new_root_page_id = new_root_page.id;
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
let new_root_page_contents = new_root_page_contents.as_mut().unwrap();
// point new root right child to previous root
new_root_page_contents
.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, new_root_page_id as u32);
new_root_page_contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0);
}
/* swap splitted page buffer with new root buffer so we don't have to update page idx */
{
let page_ref = self.read_page_sync(mem_page.page_idx);
let (root_id, child_id) = {
let mut page_rc = RefCell::borrow_mut(&page_ref);
let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref);
// Swap the entire Page structs
std::mem::swap(&mut page_rc.id, &mut new_root_page.id);
self.pager.add_dirty(new_root_page.id);
self.pager.add_dirty(page_rc.id);
(new_root_page.id, page_rc.id)
};
{
let page_rc = RefCell::borrow_mut(&page_ref);
}
let root = new_root_page_ref.clone();
let child = page_ref.clone();
let parent = Some(Rc::new(MemPage::new(None, root_id, 0)));
self.page = RefCell::new(Some(Rc::new(MemPage::new(parent, child_id, 0))));
println!("Balancing root. root={}, rightmost={}", root_id, child_id);
self.pager.put_page(root_id, root);
self.pager.put_page(child_id, child);
}
}
fn read_page_sync(&mut self, page_idx: usize) -> Rc<RefCell<Page>> {
loop {
let page_ref = self.pager.read_page(page_idx);

View File

@@ -6,7 +6,7 @@ use crate::{Buffer, Result};
use log::trace;
use sieve_cache::SieveCache;
use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::ptr::{drop_in_place, NonNull};
use std::rc::Rc;
@@ -267,7 +267,7 @@ pub struct Pager {
buffer_pool: Rc<BufferPool>,
/// I/O interface for input/output operations.
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Rc<RefCell<Vec<Rc<RefCell<Page>>>>>,
dirty_pages: Rc<RefCell<HashSet<usize>>>,
db_header: Rc<RefCell<DatabaseHeader>>,
}
@@ -294,7 +294,7 @@ impl Pager {
buffer_pool,
page_cache,
io,
dirty_pages: Rc::new(RefCell::new(Vec::new())),
dirty_pages: Rc::new(RefCell::new(HashSet::new())),
db_header: db_header_ref.clone(),
})
}
@@ -347,10 +347,10 @@ impl Pager {
self.page_cache.borrow_mut().resize(capacity);
}
pub fn add_dirty(&self, page: Rc<RefCell<Page>>) {
pub fn add_dirty(&self, page_id: usize) {
// TODO: cehck duplicates?
let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
dirty_pages.push(page);
dirty_pages.insert(page_id);
}
pub fn cacheflush(&self) -> Result<()> {
@@ -358,13 +358,12 @@ impl Pager {
if dirty_pages.len() == 0 {
return Ok(());
}
loop {
if dirty_pages.len() == 0 {
break;
}
let page = dirty_pages.pop().unwrap();
for page_id in dirty_pages.iter() {
let mut cache = self.page_cache.borrow_mut();
let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
sqlite3_ondisk::begin_write_btree_page(self, &page)?;
}
dirty_pages.clear();
self.io.run_once()?;
Ok(())
}
@@ -382,7 +381,7 @@ impl Pager {
let first_page_ref = self.read_page(1).unwrap();
let first_page = RefCell::borrow_mut(&first_page_ref);
first_page.set_dirty();
self.add_dirty(first_page_ref.clone());
self.add_dirty(1);
let contents = first_page.contents.write().unwrap();
let contents = contents.as_ref().unwrap();
@@ -392,20 +391,29 @@ impl Pager {
let page_ref = Rc::new(RefCell::new(Page::new(0)));
{
// setup page and add to cache
self.add_dirty(page_ref.clone());
let mut page = RefCell::borrow_mut(&page_ref);
page.set_dirty();
page.id = header.database_size as usize;
page.set_dirty();
self.add_dirty(page.id);
let buffer = self.buffer_pool.get();
let bp = self.buffer_pool.clone();
let drop_fn = Rc::new(move |buf| {
bp.put(buf);
});
let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
page.contents = RwLock::new(Some(PageContent { offset: 0, buffer }));
page.contents = RwLock::new(Some(PageContent {
offset: 0,
buffer,
overflow_cells: Vec::new(),
}));
let mut cache = RefCell::borrow_mut(&self.page_cache);
cache.insert(page.id, page_ref.clone());
}
Ok(page_ref)
}
pub fn put_page(&self, id: usize, page: Rc<RefCell<Page>>) {
let mut cache = RefCell::borrow_mut(&self.page_cache);
cache.insert(id, page);
}
}

View File

@@ -260,10 +260,17 @@ impl TryFrom<u8> for PageType {
}
}
#[derive(Debug, Clone)]
pub struct OverflowCell {
pub index: usize,
pub payload: Vec<u8>,
}
#[derive(Debug)]
pub struct PageContent {
pub offset: usize,
pub buffer: Rc<RefCell<Buffer>>,
pub overflow_cells: Vec<OverflowCell>,
}
impl Clone for PageContent {
@@ -271,6 +278,7 @@ impl Clone for PageContent {
Self {
offset: self.offset,
buffer: Rc::new(RefCell::new((*self.buffer.borrow()).clone())),
overflow_cells: self.overflow_cells.clone(),
}
}
}
@@ -294,7 +302,7 @@ impl PageContent {
buf[pos]
}
fn read_u16(&self, pos: usize) -> u16 {
pub fn read_u16(&self, pos: usize) -> u16 {
let buf = self.as_ptr();
u16::from_be_bytes([buf[self.offset + pos], buf[self.offset + pos + 1]])
}
@@ -376,6 +384,7 @@ impl PageContent {
(self.offset + cell_start, self.cell_count() * 2)
}
/* Get region of a cell's payload */
pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) {
let buf = self.as_ptr();
let ncells = self.cell_count();
@@ -465,6 +474,7 @@ fn finish_read_page(
let inner = PageContent {
offset: pos,
buffer: buffer_ref.clone(),
overflow_cells: Vec::new(),
};
{
let page = page.borrow_mut();