core: fix split cells to right page

This commit is contained in:
Pere Diaz Bou
2024-07-31 13:03:16 +02:00
parent 68e7a062a4
commit 836aa6ee07
3 changed files with 75 additions and 82 deletions

View File

@@ -219,24 +219,27 @@ impl BTreeCursor {
_record: &OwnedRecord,
) -> Result<CursorResult<()>> {
let page_ref = self.get_page()?;
let page = RefCell::borrow(&page_ref);
if page.is_locked() {
return Ok(CursorResult::IO);
}
page.set_dirty();
self.pager.add_dirty(page_ref.clone());
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
assert!(matches!(page.page_type(), PageType::TableLeaf));
// find cell
let int_key = match key {
OwnedValue::Integer(i) => *i as u64,
_ => unreachable!("btree tables are indexed by integers!"),
};
let cell_idx = find_cell(page, int_key);
let cell_idx = {
let page = RefCell::borrow(&page_ref);
if page.is_locked() {
return Ok(CursorResult::IO);
}
page.set_dirty();
self.pager.add_dirty(page_ref.clone());
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
assert!(matches!(page.page_type(), PageType::TableLeaf));
// find cell
find_cell(page, int_key)
};
// TODO: if overwrite drop cell
@@ -275,7 +278,12 @@ impl BTreeCursor {
let db_header = RefCell::borrow(&self.database_header);
(db_header.page_size - db_header.unused_space as u16) as usize
};
let free = self.compute_free_space(page, RefCell::borrow(&self.database_header));
let free = {
let page = RefCell::borrow(&page_ref);
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
self.compute_free_space(page, RefCell::borrow(&self.database_header))
};
assert!(
payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */
"need to implemented overflow pages, too big to even add to a an empty page"
@@ -285,6 +293,10 @@ impl BTreeCursor {
self.balance_leaf(int_key, payload);
} else {
// insert
let page = RefCell::borrow(&page_ref);
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
self.insert_into_cell(page, &payload, cell_idx);
}
@@ -293,10 +305,6 @@ impl BTreeCursor {
/* insert to postion and shift other pointers */
fn insert_into_cell(&mut self, page: &mut PageContent, payload: &Vec<u8>, cell_idx: usize) {
assert!(
page.is_leaf() || (!page.is_leaf() && cell_idx < page.cell_count()),
"if it's greater it might mean we need to insert in a rightmost pointer?"
);
// TODO: insert into cell payload in internal page
let pc = self.allocate_cell_space(page, payload.len() as u16);
let mut buf_ref = RefCell::borrow_mut(&page.buffer);
@@ -305,22 +313,25 @@ impl BTreeCursor {
// copy data
buf[pc as usize..pc as usize + payload.len()].copy_from_slice(&payload);
// memmove(pIns+2, pIns, 2*(pPage->nCell - i));
let pointer_area_pc_by_idx = page.offset + 8 + 2 * cell_idx;
let (pointer_area_pc_by_idx, _) = page.cell_get_raw_pointer_region();
let pointer_area_pc_by_idx = pointer_area_pc_by_idx + (2 * cell_idx);
// move previous pointers forward and insert new pointer there
let n_cells_forward = 2 * (page.cell_count() - cell_idx);
buf.copy_within(
pointer_area_pc_by_idx..pointer_area_pc_by_idx + n_cells_forward,
pointer_area_pc_by_idx + 2,
);
buf[pointer_area_pc_by_idx..pointer_area_pc_by_idx + 2].copy_from_slice(&pc.to_be_bytes());
if n_cells_forward > 0 {
buf.copy_within(
pointer_area_pc_by_idx..pointer_area_pc_by_idx + n_cells_forward,
pointer_area_pc_by_idx + 2,
);
}
page.write_u16(pointer_area_pc_by_idx, pc);
// update first byte of content area
buf[5..7].copy_from_slice(&pc.to_be_bytes());
page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, pc);
// update cell count
let new_n_cells = (page.cell_count() + 1) as u16;
buf[3..5].copy_from_slice(&new_n_cells.to_be_bytes());
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, new_n_cells);
}
fn get_page(&mut self) -> crate::Result<Rc<RefCell<Page>>> {
@@ -382,45 +393,26 @@ impl BTreeCursor {
let mut rbrk = right_page.cell_content_area() as usize;
let cells_to_move = page.cell_count() / 2;
let (mut cell_pointer_idx, _) = page.cell_get_raw_pointer_region();
// move half of cells to right page
for cell_idx in 0..page.cell_count() {
let (start, len) = page.cell_get_raw_region(cell_idx);
for cell_idx in cells_to_move..page.cell_count() {
let (start, len) = page.cell_get_raw_region_borrowed(cell_idx, left_buf);
// copy data
rbrk -= len;
right_buf[rbrk..rbrk + len].copy_from_slice(&left_buf[start..start + len]);
// set pointer
right_page.write_u16(cell_pointer_idx, rbrk as u16);
cell_pointer_idx += 2;
}
// move half of keys to right page
let (src_pointers_start, src_pointers_len) = page.cell_get_raw_pointer_region();
assert!(page.cell_count() >= 2);
let keys_to_move_start = page.cell_count() / 2;
let (dst_pointers_start, _) = right_page.cell_get_raw_pointer_region();
/*
Copy half
count = 8
k-v = 2 bytes
keys_to_move_start
V
-------------------------------------------------
| 0k-v | 1k-v | 2k-v | 3k-v | 4k-v | 5k-v | 7k-v |
-------------------------------------------------
*/
let pointer_data_to_move = (page.cell_count() - keys_to_move_start - 1) * 2;
right_buf[dst_pointers_start + pointer_data_to_move
..dst_pointers_start + pointer_data_to_move]
.copy_from_slice(
&left_buf[src_pointers_start + pointer_data_to_move
..src_pointers_start + (pointer_data_to_move * 2)],
);
// update cell count in both pages
let keys_moved = page.cell_count() - keys_to_move_start + 1;
page.write_u16(
BTREE_HEADER_OFFSET_CELL_COUNT,
(page.cell_count() - keys_moved) as u16,
);
let keys_moved = page.cell_count() - cells_to_move;
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, cells_to_move as u16);
right_page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, keys_moved as u16);
// update cell content are start
right_page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, rbrk as u16);
}
let last_cell = page.cell_get(page.cell_count() - 1).unwrap();
let last_cell_key = match &last_cell {
BTreeCell::TableLeafCell(cell) => cell._rowid,
@@ -462,23 +454,23 @@ impl BTreeCursor {
payload = Vec::new();
if mem_page.page_idx == self.root_page {
/* todo: balance deeper, create child and copy contents of root there. Then split root */
/* if we are in root page then we just need to create a new root and push key there */
let new_root_page_ref = self.allocate_page(PageType::TableInterior);
let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref);
let new_root_page_id = new_root_page.id;
new_root_page.set_dirty();
self.pager.add_dirty(new_root_page_ref.clone());
{
let new_root_page = RefCell::borrow_mut(&new_root_page_ref);
let new_root_page_id = new_root_page.id;
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
let new_root_page_contents = new_root_page_contents.as_mut().unwrap();
/*
Note that we set cell pointer to point to itself, because we will later swap this page's
content with splitted page in order to not update root page idx.
*/
let new_root_page_id = new_root_page_id as u32;
payload.extend_from_slice(&(new_root_page_id as u32).to_be_bytes());
payload.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut payload.as_mut_slice()[0..9], key as u64);
payload.truncate(n);
let n = write_varint(&mut payload.as_mut_slice()[4..], key as u64);
payload.truncate(4 + n);
// write left child cell
self.insert_into_cell(new_root_page_contents, &payload, 0);
@@ -490,20 +482,17 @@ impl BTreeCursor {
/* swap splitted page buffer with new root buffer so we don't have to update page idx */
{
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
let new_root_page_contents = new_root_page_contents.as_mut().unwrap();
let root_buf = new_root_page_contents.buffer.as_ptr();
let root_buf = unsafe { (*root_buf).as_mut_slice() };
let mut page = page_rc.contents.write().unwrap();
let page = page.as_mut().unwrap();
let mut left_buf = RefCell::borrow_mut(&page.buffer);
let left_buf: &mut [u8] = left_buf.as_mut_slice();
let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref);
mem::swap(&mut *new_root_page, &mut *page_rc);
left_buf.swap_with_slice(root_buf);
// now swap contents
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
let mut page_contents = page_rc.contents.write().unwrap();
std::mem::swap(&mut *new_root_page_contents, &mut *page_contents);
self.page =
RefCell::new(Some(Rc::new(MemPage::new(None, new_root_page.id, 0))));
}
// swap in memory state of pages
mem::swap(&mut page_rc.id, &mut new_root_page.id);
self.page = RefCell::new(Some(Rc::new(MemPage::new(None, new_root_page.id, 0))));
break;
}

View File

@@ -14,7 +14,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
pub struct Page {
flags: AtomicUsize,
pub flags: AtomicUsize,
pub contents: RwLock<Option<PageContent>>,
pub id: usize,
}

View File

@@ -232,12 +232,11 @@ impl Clone for PageContent {
impl PageContent {
pub fn page_type(&self) -> PageType {
let buf = self.buffer.borrow();
let buf = buf.as_slice();
buf[self.offset].try_into().unwrap()
self.read_u8(self.offset).try_into().unwrap()
}
fn read_u8(&self, pos: usize) -> u8 {
// unsafe trick to borrow twice
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_ref().unwrap().as_slice();
@@ -344,9 +343,12 @@ impl PageContent {
}
pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) {
let buf = self.buffer.borrow();
let buf = buf.as_slice();
let mut buf = self.buffer.borrow_mut();
let buf = buf.as_mut_slice();
self.cell_get_raw_region_borrowed(idx, buf)
}
pub fn cell_get_raw_region_borrowed(&self, idx: usize, buf: &mut [u8]) -> (usize, usize) {
let ncells = self.cell_count();
let cell_start = match self.page_type() {
PageType::IndexInterior => 12,
@@ -374,7 +376,8 @@ impl PageContent {
PageType::TableLeaf => {
let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap();
let (_, n_rowid) = read_varint(&buf[cell_pointer + n_payload..]).unwrap();
len_payload as usize + n_payload + n_rowid + 4
// TODO: add overflow 4 bytes
len_payload as usize + n_payload + n_rowid
}
};
(start, len)
@@ -448,6 +451,7 @@ pub fn begin_write_btree_page(pager: &Pager, page: &Rc<RefCell<Page>>) -> Result
let page_source = &pager.page_source;
let page_finish = page.clone();
let page = page.borrow();
let contents = page.contents.read().unwrap();
let contents = contents.as_ref().unwrap();
let buffer = contents.buffer.clone();