mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-01 07:24:19 +01:00
Merge 'core: core btree splitting' from Pere Diaz Bou
This PR adds a new crate to increase the expressivity of tests with complex things like btree node splitting. To run tests: ```bash cd core_tester cargo test test_sequential_write -- --nocapture ``` This PR improves btree balancing with simple page splitting and some minor refactors. Closes #316
This commit is contained in:
14
Cargo.lock
generated
14
Cargo.lock
generated
@@ -388,6 +388,20 @@ version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
|
||||
|
||||
[[package]]
|
||||
name = "core_tester"
|
||||
version = "0.0.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"dirs",
|
||||
"env_logger 0.10.2",
|
||||
"limbo_core",
|
||||
"rstest",
|
||||
"rusqlite",
|
||||
"rustyline",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cpp_demangle"
|
||||
version = "0.4.3"
|
||||
|
||||
20
Cargo.toml
20
Cargo.toml
@@ -3,12 +3,13 @@
|
||||
[workspace]
|
||||
resolver = "2"
|
||||
members = [
|
||||
"bindings/python",
|
||||
"bindings/wasm",
|
||||
"cli",
|
||||
"sqlite3",
|
||||
"core",
|
||||
"simulator",
|
||||
"bindings/python",
|
||||
"bindings/wasm",
|
||||
"cli",
|
||||
"sqlite3",
|
||||
"core",
|
||||
"simulator",
|
||||
"test",
|
||||
]
|
||||
exclude = ["perf/latency/limbo"]
|
||||
|
||||
@@ -28,7 +29,12 @@ ci = "github"
|
||||
# The installers to generate for each app
|
||||
installers = ["shell", "powershell"]
|
||||
# Target platforms to build apps for (Rust target-triple syntax)
|
||||
targets = ["aarch64-apple-darwin", "x86_64-apple-darwin", "x86_64-unknown-linux-gnu", "x86_64-pc-windows-msvc"]
|
||||
targets = [
|
||||
"aarch64-apple-darwin",
|
||||
"x86_64-apple-darwin",
|
||||
"x86_64-unknown-linux-gnu",
|
||||
"x86_64-pc-windows-msvc",
|
||||
]
|
||||
# Which actions to run on pull requests
|
||||
pr-run-mode = "plan"
|
||||
# Path that installers should place binaries in
|
||||
|
||||
@@ -1,15 +1,19 @@
|
||||
use log::trace;
|
||||
|
||||
use crate::storage::pager::{Page, Pager};
|
||||
use crate::storage::sqlite3_ondisk::{
|
||||
read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell,
|
||||
TableLeafCell,
|
||||
read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType,
|
||||
TableInteriorCell, TableLeafCell,
|
||||
};
|
||||
use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue};
|
||||
use crate::Result;
|
||||
|
||||
use std::cell::{Ref, RefCell};
|
||||
use std::mem;
|
||||
use std::mem::swap;
|
||||
use std::rc::Rc;
|
||||
|
||||
use super::sqlite3_ondisk::OverflowCell;
|
||||
|
||||
/*
|
||||
These are offsets of fields in the header of a b-tree page.
|
||||
*/
|
||||
@@ -86,11 +90,7 @@ impl BTreeCursor {
|
||||
|
||||
fn get_next_record(&mut self) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
|
||||
loop {
|
||||
let mem_page = {
|
||||
let mem_page = self.page.borrow();
|
||||
let mem_page = mem_page.as_ref().unwrap();
|
||||
mem_page.clone()
|
||||
};
|
||||
let mem_page = self.get_mem_page();
|
||||
let page_idx = mem_page.page_idx;
|
||||
let page = self.pager.read_page(page_idx)?;
|
||||
let page = RefCell::borrow(&page);
|
||||
@@ -99,6 +99,7 @@ impl BTreeCursor {
|
||||
}
|
||||
let page = page.contents.read().unwrap();
|
||||
let page = page.as_ref().unwrap();
|
||||
|
||||
if mem_page.cell_idx() >= page.cell_count() {
|
||||
let parent = mem_page.parent.clone();
|
||||
match page.rightmost_pointer() {
|
||||
@@ -155,11 +156,7 @@ impl BTreeCursor {
|
||||
) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
|
||||
self.move_to(rowid)?;
|
||||
|
||||
let mem_page = {
|
||||
let mem_page = self.page.borrow();
|
||||
let mem_page = mem_page.as_ref().unwrap();
|
||||
mem_page.clone()
|
||||
};
|
||||
let mem_page = self.get_mem_page();
|
||||
|
||||
let page_idx = mem_page.page_idx;
|
||||
let page = self.pager.read_page(page_idx)?;
|
||||
@@ -259,17 +256,14 @@ impl BTreeCursor {
|
||||
self.move_to_root();
|
||||
|
||||
loop {
|
||||
let mem_page = {
|
||||
let mem_page = self.page.borrow();
|
||||
let mem_page = mem_page.as_ref().unwrap();
|
||||
mem_page.clone()
|
||||
};
|
||||
let mem_page = self.get_mem_page();
|
||||
let page_idx = mem_page.page_idx;
|
||||
let page = self.pager.read_page(page_idx)?;
|
||||
let page = RefCell::borrow(&page);
|
||||
if page.is_locked() {
|
||||
return Ok(CursorResult::IO);
|
||||
}
|
||||
|
||||
let page = page.contents.read().unwrap();
|
||||
let page = page.as_ref().unwrap();
|
||||
if page.is_leaf() {
|
||||
@@ -344,7 +338,7 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
page.set_dirty();
|
||||
self.pager.add_dirty(page_ref.clone());
|
||||
self.pager.add_dirty(page.id);
|
||||
|
||||
let mut page = page.contents.write().unwrap();
|
||||
let page = page.as_mut().unwrap();
|
||||
@@ -357,7 +351,7 @@ impl BTreeCursor {
|
||||
// TODO: if overwrite drop cell
|
||||
|
||||
// insert cell
|
||||
let mut payload: Vec<u8> = Vec::new();
|
||||
let mut cell_payload: Vec<u8> = Vec::new();
|
||||
|
||||
{
|
||||
// Data len will be prepended later
|
||||
@@ -367,13 +361,13 @@ impl BTreeCursor {
|
||||
let n = write_varint(&mut key_varint.as_mut_slice()[0..9], int_key);
|
||||
write_varint(&mut key_varint, int_key);
|
||||
key_varint.truncate(n);
|
||||
payload.extend_from_slice(&key_varint);
|
||||
cell_payload.extend_from_slice(&key_varint);
|
||||
}
|
||||
|
||||
// Data payload
|
||||
let payload_size_before_record = payload.len();
|
||||
_record.serialize(&mut payload);
|
||||
let header_size = payload.len() - payload_size_before_record;
|
||||
let payload_size_before_record = cell_payload.len();
|
||||
_record.serialize(&mut cell_payload);
|
||||
let header_size = cell_payload.len() - payload_size_before_record;
|
||||
|
||||
{
|
||||
// Data len
|
||||
@@ -384,40 +378,48 @@ impl BTreeCursor {
|
||||
header_size as u64,
|
||||
);
|
||||
data_len_varint.truncate(n);
|
||||
payload.splice(0..0, data_len_varint.iter().cloned());
|
||||
cell_payload.splice(0..0, data_len_varint.iter().cloned());
|
||||
}
|
||||
|
||||
let usable_space = {
|
||||
let db_header = RefCell::borrow(&self.database_header);
|
||||
(db_header.page_size - db_header.unused_space as u16) as usize
|
||||
};
|
||||
let free = {
|
||||
let page = RefCell::borrow(&page_ref);
|
||||
let mut page = page.contents.write().unwrap();
|
||||
let page = page.as_mut().unwrap();
|
||||
self.compute_free_space(page, RefCell::borrow(&self.database_header))
|
||||
};
|
||||
assert!(
|
||||
payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */
|
||||
cell_payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */
|
||||
"need to implemented overflow pages, too big to even add to a an empty page"
|
||||
);
|
||||
if payload.len() + 2 > free as usize {
|
||||
// overflow or balance
|
||||
self.balance_leaf(int_key, payload);
|
||||
} else {
|
||||
// insert
|
||||
|
||||
// insert
|
||||
let overflow = {
|
||||
let page = RefCell::borrow(&page_ref);
|
||||
|
||||
let mut page = page.contents.write().unwrap();
|
||||
let page = page.as_mut().unwrap();
|
||||
self.insert_into_cell(page, &payload, cell_idx);
|
||||
self.insert_into_cell(page, &cell_payload.as_slice(), cell_idx);
|
||||
page.overflow_cells.len()
|
||||
};
|
||||
|
||||
if overflow > 0 {
|
||||
self.balance_leaf();
|
||||
}
|
||||
|
||||
Ok(CursorResult::Ok(()))
|
||||
}
|
||||
|
||||
/* insert to postion and shift other pointers */
|
||||
fn insert_into_cell(&mut self, page: &mut PageContent, payload: &Vec<u8>, cell_idx: usize) {
|
||||
fn insert_into_cell(&mut self, page: &mut PageContent, payload: &[u8], cell_idx: usize) {
|
||||
let free = self.compute_free_space(page, RefCell::borrow(&self.database_header));
|
||||
let enough_space = payload.len() + 2 <= free as usize;
|
||||
if !enough_space {
|
||||
// add to overflow cell
|
||||
page.overflow_cells.push(OverflowCell {
|
||||
index: cell_idx,
|
||||
payload: Vec::from(payload),
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: insert into cell payload in internal page
|
||||
let pc = self.allocate_cell_space(page, payload.len() as u16);
|
||||
let buf = page.as_ptr();
|
||||
@@ -446,6 +448,70 @@ impl BTreeCursor {
|
||||
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, new_n_cells);
|
||||
}
|
||||
|
||||
fn free_cell_range(&mut self, page: &mut PageContent, offset: u16, len: u16) {
|
||||
if page.first_freeblock() == 0 {
|
||||
// insert into empty list
|
||||
page.write_u16(offset as usize, 0);
|
||||
page.write_u16(offset as usize + 2, len as u16);
|
||||
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, offset as u16);
|
||||
return;
|
||||
}
|
||||
let first_block = page.first_freeblock();
|
||||
|
||||
if offset < first_block {
|
||||
// insert into head of list
|
||||
page.write_u16(offset as usize, first_block);
|
||||
page.write_u16(offset as usize + 2, len as u16);
|
||||
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, offset as u16);
|
||||
return;
|
||||
}
|
||||
|
||||
if offset <= page.cell_content_area() {
|
||||
// extend boundary of content area
|
||||
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, page.first_freeblock());
|
||||
page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, offset + len);
|
||||
return;
|
||||
}
|
||||
|
||||
let maxpc = {
|
||||
let db_header = self.database_header.borrow();
|
||||
let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize;
|
||||
usable_space as u16
|
||||
};
|
||||
|
||||
let mut pc = first_block;
|
||||
let mut prev = first_block;
|
||||
|
||||
while pc <= maxpc && pc < offset as u16 {
|
||||
let next = page.read_u16(pc as usize);
|
||||
prev = pc;
|
||||
pc = next;
|
||||
}
|
||||
|
||||
if pc >= maxpc {
|
||||
// insert into tail
|
||||
let offset = offset as usize;
|
||||
let prev = prev as usize;
|
||||
page.write_u16(prev, offset as u16);
|
||||
page.write_u16(offset, 0);
|
||||
page.write_u16(offset + 2, len);
|
||||
} else {
|
||||
// insert in between
|
||||
let next = page.read_u16(pc as usize);
|
||||
let offset = offset as usize;
|
||||
let prev = prev as usize;
|
||||
page.write_u16(prev, offset as u16);
|
||||
page.write_u16(offset, next);
|
||||
page.write_u16(offset + 2, len);
|
||||
}
|
||||
}
|
||||
|
||||
fn drop_cell(&mut self, page: &mut PageContent, cell_idx: usize) {
|
||||
let (cell_start, cell_len) = page.cell_get_raw_region(cell_idx);
|
||||
self.free_cell_range(page, cell_start as u16, cell_len as u16);
|
||||
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
|
||||
}
|
||||
|
||||
fn get_page(&mut self) -> crate::Result<Rc<RefCell<Page>>> {
|
||||
let mem_page = {
|
||||
let mem_page = self.page.borrow();
|
||||
@@ -457,26 +523,69 @@ impl BTreeCursor {
|
||||
Ok(page_ref)
|
||||
}
|
||||
|
||||
fn balance_leaf(&mut self, key: u64, payload: Vec<u8>) {
|
||||
fn balance_leaf(&mut self) {
|
||||
// This is a naive algorithm that doesn't try to distribute cells evenly by content.
|
||||
// It will try to split the page in half by keys not by content.
|
||||
// Sqlite tries to have a page at least 40% full.
|
||||
let mut key = key;
|
||||
let mut payload = payload;
|
||||
loop {
|
||||
let mem_page = {
|
||||
let mem_page = self.page.borrow();
|
||||
let mem_page = mem_page.as_ref().unwrap();
|
||||
mem_page.clone()
|
||||
};
|
||||
let page_ref = self.read_page_sync(mem_page.page_idx);
|
||||
let mut page_rc = RefCell::borrow_mut(&page_ref);
|
||||
|
||||
let right_page_id = {
|
||||
{
|
||||
// check if we don't need to balance
|
||||
let page_ref = self.read_page_sync(mem_page.page_idx);
|
||||
let page_rc = RefCell::borrow(&page_ref);
|
||||
|
||||
{
|
||||
// don't continue if there are no overflow cells
|
||||
let mut page = page_rc.contents.write().unwrap();
|
||||
let page = page.as_mut().unwrap();
|
||||
if page.overflow_cells.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Balancing leaf. leaf={}", mem_page.page_idx);
|
||||
if mem_page.parent.is_none() {
|
||||
self.balance_root();
|
||||
continue;
|
||||
}
|
||||
|
||||
let page_ref = self.read_page_sync(mem_page.page_idx);
|
||||
let page_rc = RefCell::borrow(&page_ref);
|
||||
|
||||
// Copy of page used to reference cell bytes.
|
||||
let page_copy = {
|
||||
let mut page = page_rc.contents.write().unwrap();
|
||||
let page = page.as_mut().unwrap();
|
||||
page.clone()
|
||||
};
|
||||
|
||||
// In memory in order copy of all cells in pages we want to balance. For now let's do a 2 page split.
|
||||
// Right pointer in interior cells should be converted to regular cells if more than 2 pages are used for balancing.
|
||||
let (scratch_cells, right_most_pointer) = {
|
||||
let mut scratch_cells: Vec<&[u8]> = Vec::new();
|
||||
|
||||
for cell_idx in 0..page_copy.cell_count() {
|
||||
let (start, len) = page_copy.cell_get_raw_region(cell_idx);
|
||||
let buf = page_copy.as_ptr();
|
||||
scratch_cells.push(&buf[start..start + len]);
|
||||
}
|
||||
for overflow_cell in &page_copy.overflow_cells {
|
||||
scratch_cells.insert(overflow_cell.index, &overflow_cell.payload);
|
||||
}
|
||||
(scratch_cells, page_copy.rightmost_pointer())
|
||||
};
|
||||
|
||||
// allocate new pages and move cells to those new pages
|
||||
{
|
||||
// split procedure
|
||||
let mut page = page_rc.contents.write().unwrap();
|
||||
let page = page.as_mut().unwrap();
|
||||
let free = self.compute_free_space(page, RefCell::borrow(&self.database_header));
|
||||
assert!(
|
||||
matches!(
|
||||
page.page_type(),
|
||||
@@ -484,11 +593,6 @@ impl BTreeCursor {
|
||||
),
|
||||
"indexes still not supported "
|
||||
);
|
||||
if payload.len() + 2 <= free as usize {
|
||||
let cell_idx = find_cell(page, key);
|
||||
self.insert_into_cell(page, &payload, cell_idx);
|
||||
break;
|
||||
}
|
||||
|
||||
let right_page_ref = self.allocate_page(page.page_type());
|
||||
let right_page = RefCell::borrow_mut(&right_page_ref);
|
||||
@@ -496,127 +600,201 @@ impl BTreeCursor {
|
||||
let mut right_page = right_page.contents.write().unwrap();
|
||||
let right_page = right_page.as_mut().unwrap();
|
||||
{
|
||||
// move data from one buffer to another
|
||||
// done in a separate block to satisfy borrow checker
|
||||
let left_buf: &mut [u8] = page.as_ptr();
|
||||
let right_buf: &mut [u8] = right_page.as_ptr();
|
||||
let is_leaf = page.is_leaf();
|
||||
let mut new_pages = vec![page, right_page];
|
||||
let new_pages_ids = vec![mem_page.page_idx, right_page_id];
|
||||
trace!(
|
||||
"splitting left={} right={}",
|
||||
new_pages_ids[0],
|
||||
new_pages_ids[1]
|
||||
);
|
||||
|
||||
let mut rbrk = right_page.cell_content_area() as usize;
|
||||
// drop divider cells and find right pointer
|
||||
// NOTE: since we are doing a simple split we only finding the pointer we want to update (right pointer).
|
||||
// Right pointer means cell that points to the last page, as we don't really want to drop this one. This one
|
||||
// can be a "rightmost pointer" or a "cell".
|
||||
// TODO(pere): simplify locking...
|
||||
// we always asumme there is a parent
|
||||
let parent_rc = mem_page.parent.as_ref().unwrap();
|
||||
|
||||
let cells_to_move = page.cell_count() / 2;
|
||||
let (mut cell_pointer_idx, _) = page.cell_get_raw_pointer_region();
|
||||
// move half of cells to right page
|
||||
for cell_idx in cells_to_move..page.cell_count() {
|
||||
let (start, len) = page.cell_get_raw_region(cell_idx);
|
||||
// copy data
|
||||
rbrk -= len;
|
||||
right_buf[rbrk..rbrk + len].copy_from_slice(&left_buf[start..start + len]);
|
||||
// set pointer
|
||||
right_page.write_u16(cell_pointer_idx, rbrk as u16);
|
||||
cell_pointer_idx += 2;
|
||||
let parent_ref = self.read_page_sync(parent_rc.page_idx);
|
||||
let parent = RefCell::borrow_mut(&parent_ref);
|
||||
parent.set_dirty();
|
||||
self.pager.add_dirty(parent.id);
|
||||
let mut parent = parent.contents.write().unwrap();
|
||||
let parent = parent.as_mut().unwrap();
|
||||
// if this isn't empty next loop won't work
|
||||
assert!(parent.overflow_cells.is_empty());
|
||||
|
||||
// Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value
|
||||
let mut right_pointer = BTREE_HEADER_OFFSET_RIGHTMOST;
|
||||
for cell_idx in 0..parent.cell_count() {
|
||||
let cell = parent.cell_get(cell_idx).unwrap();
|
||||
let found = match cell {
|
||||
BTreeCell::TableInteriorCell(interior) => {
|
||||
interior._left_child_page as usize == mem_page.page_idx
|
||||
}
|
||||
_ => unreachable!("Parent should always be a "),
|
||||
};
|
||||
if found {
|
||||
let (start, len) = parent.cell_get_raw_region(cell_idx);
|
||||
right_pointer = start;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// reset pages
|
||||
for page in &new_pages {
|
||||
page.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0);
|
||||
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0);
|
||||
|
||||
let db_header = RefCell::borrow(&self.database_header);
|
||||
let cell_content_area_start =
|
||||
db_header.page_size - db_header.unused_space as u16;
|
||||
page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cell_content_area_start);
|
||||
|
||||
page.write_u8(BTREE_HEADER_OFFSET_FRAGMENTED, 0);
|
||||
page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0);
|
||||
}
|
||||
|
||||
// distribute cells
|
||||
let new_pages_len = new_pages.len();
|
||||
let cells_per_page = scratch_cells.len() / new_pages.len();
|
||||
let mut current_cell_index = 0_usize;
|
||||
let mut divider_cells_index = Vec::new(); /* index to scratch cells that will be used as dividers in order */
|
||||
|
||||
for (i, page) in new_pages.iter_mut().enumerate() {
|
||||
let last_page = i == new_pages_len - 1;
|
||||
let cells_to_copy = if last_page {
|
||||
// last cells is remaining pages if division was odd
|
||||
scratch_cells.len() - current_cell_index
|
||||
} else {
|
||||
cells_per_page
|
||||
};
|
||||
|
||||
let mut i = 0;
|
||||
for cell_idx in current_cell_index..current_cell_index + cells_to_copy {
|
||||
let cell = scratch_cells[cell_idx];
|
||||
self.insert_into_cell(*page, cell, i);
|
||||
i += 1;
|
||||
}
|
||||
divider_cells_index.push(current_cell_index + cells_to_copy - 1);
|
||||
current_cell_index += cells_to_copy;
|
||||
}
|
||||
|
||||
// update rightmost pointer for each page if we are in interior page
|
||||
if !is_leaf {
|
||||
for page in new_pages.iter_mut().take(new_pages_len - 1) {
|
||||
assert!(page.cell_count() == 1);
|
||||
let last_cell = page.cell_get(page.cell_count() - 1).unwrap();
|
||||
let last_cell_pointer = match last_cell {
|
||||
BTreeCell::TableInteriorCell(interior) => interior._left_child_page,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
self.drop_cell(*page, page.cell_count() - 1);
|
||||
page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, last_cell_pointer);
|
||||
}
|
||||
// last page right most pointer points to previous right most pointer before splitting
|
||||
let last_page = new_pages.last().unwrap();
|
||||
last_page
|
||||
.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, right_most_pointer.unwrap());
|
||||
}
|
||||
|
||||
// insert dividers in parent
|
||||
// we can consider dividers the first cell of each page starting from the second page
|
||||
for (page_id_index, page) in
|
||||
new_pages.iter_mut().take(new_pages_len - 1).enumerate()
|
||||
{
|
||||
assert!(page.cell_count() > 1);
|
||||
let divider_cell_index = divider_cells_index[page_id_index];
|
||||
let cell_payload = scratch_cells[divider_cell_index];
|
||||
let cell = read_btree_cell(cell_payload, &page.page_type(), 0).unwrap();
|
||||
if is_leaf {
|
||||
// create a new divider cell and push
|
||||
let key = match cell {
|
||||
BTreeCell::TableLeafCell(leaf) => leaf._rowid,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let mut divider_cell = Vec::new();
|
||||
divider_cell.extend_from_slice(
|
||||
&(new_pages_ids[page_id_index] as u32).to_be_bytes(),
|
||||
);
|
||||
divider_cell.extend(std::iter::repeat(0).take(9));
|
||||
let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key);
|
||||
divider_cell.truncate(4 + n);
|
||||
let parent_cell_idx = find_cell(parent, key);
|
||||
self.insert_into_cell(parent, divider_cell.as_slice(), parent_cell_idx);
|
||||
} else {
|
||||
// move cell
|
||||
let key = match cell {
|
||||
BTreeCell::TableInteriorCell(interior) => interior._rowid,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let parent_cell_idx = find_cell(page, key);
|
||||
self.insert_into_cell(parent, cell_payload, parent_cell_idx);
|
||||
// self.drop_cell(*page, 0);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// copy last page id to right pointer
|
||||
let last_pointer = *new_pages_ids.last().unwrap() as u32;
|
||||
parent.write_u32(right_pointer, last_pointer);
|
||||
}
|
||||
// update cell count in both pages
|
||||
let keys_moved = page.cell_count() - cells_to_move;
|
||||
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, cells_to_move as u16);
|
||||
right_page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, keys_moved as u16);
|
||||
// update cell content are start
|
||||
right_page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, rbrk as u16);
|
||||
}
|
||||
|
||||
let last_cell = page.cell_get(page.cell_count() - 1).unwrap();
|
||||
let last_cell_key = match &last_cell {
|
||||
BTreeCell::TableLeafCell(cell) => cell._rowid,
|
||||
BTreeCell::TableInteriorCell(cell) => cell._rowid,
|
||||
_ => unreachable!(), /* not yet supported index tables */
|
||||
};
|
||||
// if not leaf page update rightmost pointer
|
||||
if let PageType::TableInterior = page.page_type() {
|
||||
right_page.write_u32(
|
||||
BTREE_HEADER_OFFSET_RIGHTMOST,
|
||||
page.rightmost_pointer().unwrap(),
|
||||
);
|
||||
// convert last cell to rightmost pointer
|
||||
let BTreeCell::TableInteriorCell(last_cell) = &last_cell else {
|
||||
unreachable!();
|
||||
};
|
||||
page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, last_cell._left_child_page);
|
||||
// page count now has one less cell because we've added the last one to rightmost pointer
|
||||
page.write_u16(
|
||||
BTREE_HEADER_OFFSET_CELL_COUNT,
|
||||
(page.cell_count() - 1) as u16,
|
||||
);
|
||||
}
|
||||
|
||||
// update free list block by defragmenting page
|
||||
self.defragment_page(page, RefCell::borrow(&self.database_header));
|
||||
// insert into one of the pages
|
||||
if key < last_cell_key {
|
||||
let cell_idx = find_cell(page, key);
|
||||
self.insert_into_cell(page, &payload, cell_idx);
|
||||
} else {
|
||||
let cell_idx = find_cell(right_page, key);
|
||||
self.insert_into_cell(right_page, &payload, cell_idx);
|
||||
}
|
||||
// propagate parent split
|
||||
key = last_cell_key;
|
||||
right_page_id
|
||||
};
|
||||
|
||||
payload = Vec::new();
|
||||
if mem_page.page_idx == self.root_page {
|
||||
/* todo: balance deeper, create child and copy contents of root there. Then split root */
|
||||
/* if we are in root page then we just need to create a new root and push key there */
|
||||
let new_root_page_ref = self.allocate_page(PageType::TableInterior);
|
||||
{
|
||||
let new_root_page = RefCell::borrow_mut(&new_root_page_ref);
|
||||
let new_root_page_id = new_root_page.id;
|
||||
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
|
||||
let new_root_page_contents = new_root_page_contents.as_mut().unwrap();
|
||||
/*
|
||||
Note that we set cell pointer to point to itself, because we will later swap this page's
|
||||
content with splitted page in order to not update root page idx.
|
||||
*/
|
||||
let new_root_page_id = new_root_page_id as u32;
|
||||
payload.extend_from_slice(&new_root_page_id.to_be_bytes());
|
||||
payload.extend(std::iter::repeat(0).take(9));
|
||||
let n = write_varint(&mut payload.as_mut_slice()[4..], key);
|
||||
payload.truncate(4 + n);
|
||||
|
||||
// write left child cell
|
||||
self.insert_into_cell(new_root_page_contents, &payload, 0);
|
||||
|
||||
// write right child cell
|
||||
new_root_page_contents
|
||||
.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, right_page_id as u32);
|
||||
}
|
||||
|
||||
/* swap splitted page buffer with new root buffer so we don't have to update page idx */
|
||||
{
|
||||
let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref);
|
||||
mem::swap(&mut *new_root_page, &mut *page_rc);
|
||||
|
||||
// now swap contents
|
||||
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
|
||||
let mut page_contents = page_rc.contents.write().unwrap();
|
||||
std::mem::swap(&mut *new_root_page_contents, &mut *page_contents);
|
||||
|
||||
self.page =
|
||||
RefCell::new(Some(Rc::new(MemPage::new(None, new_root_page.id, 0))));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
// Propagate split divided to top.
|
||||
payload.extend_from_slice(&(mem_page.page_idx as u32).to_be_bytes());
|
||||
payload.extend(std::iter::repeat(0).take(9));
|
||||
let n = write_varint(&mut payload.as_mut_slice()[4..], key);
|
||||
payload.truncate(n);
|
||||
|
||||
self.page = RefCell::new(Some(mem_page.parent.as_ref().unwrap().clone()));
|
||||
}
|
||||
}
|
||||
|
||||
fn balance_root(&mut self) {
|
||||
/* todo: balance deeper, create child and copy contents of root there. Then split root */
|
||||
/* if we are in root page then we just need to create a new root and push key there */
|
||||
let mem_page = {
|
||||
let mem_page = self.page.borrow();
|
||||
let mem_page = mem_page.as_ref().unwrap();
|
||||
mem_page.clone()
|
||||
};
|
||||
|
||||
let new_root_page_ref = self.allocate_page(PageType::TableInterior);
|
||||
{
|
||||
let new_root_page = RefCell::borrow(&new_root_page_ref);
|
||||
let new_root_page_id = new_root_page.id;
|
||||
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
|
||||
let new_root_page_contents = new_root_page_contents.as_mut().unwrap();
|
||||
// point new root right child to previous root
|
||||
new_root_page_contents
|
||||
.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, new_root_page_id as u32);
|
||||
new_root_page_contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0);
|
||||
}
|
||||
|
||||
/* swap splitted page buffer with new root buffer so we don't have to update page idx */
|
||||
{
|
||||
let page_ref = self.read_page_sync(mem_page.page_idx);
|
||||
let (root_id, child_id) = {
|
||||
let mut page_rc = RefCell::borrow_mut(&page_ref);
|
||||
let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref);
|
||||
|
||||
// Swap the entire Page structs
|
||||
std::mem::swap(&mut page_rc.id, &mut new_root_page.id);
|
||||
|
||||
self.pager.add_dirty(new_root_page.id);
|
||||
self.pager.add_dirty(page_rc.id);
|
||||
(new_root_page.id, page_rc.id)
|
||||
};
|
||||
|
||||
let root = new_root_page_ref.clone();
|
||||
let child = page_ref.clone();
|
||||
|
||||
let parent = Some(Rc::new(MemPage::new(None, root_id, 0)));
|
||||
self.page = RefCell::new(Some(Rc::new(MemPage::new(parent, child_id, 0))));
|
||||
trace!("Balancing root. root={}, rightmost={}", root_id, child_id);
|
||||
self.pager.put_page(root_id, root);
|
||||
self.pager.put_page(child_id, child);
|
||||
}
|
||||
}
|
||||
|
||||
fn read_page_sync(&mut self, page_idx: usize) -> Rc<RefCell<Page>> {
|
||||
loop {
|
||||
let page_ref = self.pager.read_page(page_idx);
|
||||
@@ -836,6 +1014,12 @@ impl BTreeCursor {
|
||||
nfree -= first_cell as usize;
|
||||
nfree as u16
|
||||
}
|
||||
|
||||
fn get_mem_page(&self) -> Rc<MemPage> {
|
||||
let mem_page = self.page.borrow();
|
||||
let mem_page = mem_page.as_ref().unwrap();
|
||||
mem_page.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount: usize) -> usize {
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::{Buffer, Result};
|
||||
use log::trace;
|
||||
use sieve_cache::SieveCache;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::hash::Hash;
|
||||
use std::ptr::{drop_in_place, NonNull};
|
||||
use std::rc::Rc;
|
||||
@@ -267,7 +267,7 @@ pub struct Pager {
|
||||
buffer_pool: Rc<BufferPool>,
|
||||
/// I/O interface for input/output operations.
|
||||
pub io: Arc<dyn crate::io::IO>,
|
||||
dirty_pages: Rc<RefCell<Vec<Rc<RefCell<Page>>>>>,
|
||||
dirty_pages: Rc<RefCell<HashSet<usize>>>,
|
||||
db_header: Rc<RefCell<DatabaseHeader>>,
|
||||
}
|
||||
|
||||
@@ -294,7 +294,7 @@ impl Pager {
|
||||
buffer_pool,
|
||||
page_cache,
|
||||
io,
|
||||
dirty_pages: Rc::new(RefCell::new(Vec::new())),
|
||||
dirty_pages: Rc::new(RefCell::new(HashSet::new())),
|
||||
db_header: db_header_ref.clone(),
|
||||
})
|
||||
}
|
||||
@@ -347,10 +347,10 @@ impl Pager {
|
||||
self.page_cache.borrow_mut().resize(capacity);
|
||||
}
|
||||
|
||||
pub fn add_dirty(&self, page: Rc<RefCell<Page>>) {
|
||||
pub fn add_dirty(&self, page_id: usize) {
|
||||
// TODO: cehck duplicates?
|
||||
let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
|
||||
dirty_pages.push(page);
|
||||
dirty_pages.insert(page_id);
|
||||
}
|
||||
|
||||
pub fn cacheflush(&self) -> Result<()> {
|
||||
@@ -358,13 +358,12 @@ impl Pager {
|
||||
if dirty_pages.len() == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
loop {
|
||||
if dirty_pages.len() == 0 {
|
||||
break;
|
||||
}
|
||||
let page = dirty_pages.pop().unwrap();
|
||||
for page_id in dirty_pages.iter() {
|
||||
let mut cache = self.page_cache.borrow_mut();
|
||||
let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
|
||||
sqlite3_ondisk::begin_write_btree_page(self, &page)?;
|
||||
}
|
||||
dirty_pages.clear();
|
||||
self.io.run_once()?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -382,7 +381,7 @@ impl Pager {
|
||||
let first_page_ref = self.read_page(1).unwrap();
|
||||
let first_page = RefCell::borrow_mut(&first_page_ref);
|
||||
first_page.set_dirty();
|
||||
self.add_dirty(first_page_ref.clone());
|
||||
self.add_dirty(1);
|
||||
|
||||
let contents = first_page.contents.write().unwrap();
|
||||
let contents = contents.as_ref().unwrap();
|
||||
@@ -392,20 +391,29 @@ impl Pager {
|
||||
let page_ref = Rc::new(RefCell::new(Page::new(0)));
|
||||
{
|
||||
// setup page and add to cache
|
||||
self.add_dirty(page_ref.clone());
|
||||
let mut page = RefCell::borrow_mut(&page_ref);
|
||||
page.set_dirty();
|
||||
page.id = header.database_size as usize;
|
||||
page.set_dirty();
|
||||
self.add_dirty(page.id);
|
||||
let buffer = self.buffer_pool.get();
|
||||
let bp = self.buffer_pool.clone();
|
||||
let drop_fn = Rc::new(move |buf| {
|
||||
bp.put(buf);
|
||||
});
|
||||
let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
|
||||
page.contents = RwLock::new(Some(PageContent { offset: 0, buffer }));
|
||||
page.contents = RwLock::new(Some(PageContent {
|
||||
offset: 0,
|
||||
buffer,
|
||||
overflow_cells: Vec::new(),
|
||||
}));
|
||||
let mut cache = RefCell::borrow_mut(&self.page_cache);
|
||||
cache.insert(page.id, page_ref.clone());
|
||||
}
|
||||
Ok(page_ref)
|
||||
}
|
||||
|
||||
pub fn put_page(&self, id: usize, page: Rc<RefCell<Page>>) {
|
||||
let mut cache = RefCell::borrow_mut(&self.page_cache);
|
||||
cache.insert(id, page);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,10 +260,17 @@ impl TryFrom<u8> for PageType {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OverflowCell {
|
||||
pub index: usize,
|
||||
pub payload: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PageContent {
|
||||
pub offset: usize,
|
||||
pub buffer: Rc<RefCell<Buffer>>,
|
||||
pub overflow_cells: Vec<OverflowCell>,
|
||||
}
|
||||
|
||||
impl Clone for PageContent {
|
||||
@@ -271,6 +278,7 @@ impl Clone for PageContent {
|
||||
Self {
|
||||
offset: self.offset,
|
||||
buffer: Rc::new(RefCell::new((*self.buffer.borrow()).clone())),
|
||||
overflow_cells: self.overflow_cells.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -294,7 +302,7 @@ impl PageContent {
|
||||
buf[pos]
|
||||
}
|
||||
|
||||
fn read_u16(&self, pos: usize) -> u16 {
|
||||
pub fn read_u16(&self, pos: usize) -> u16 {
|
||||
let buf = self.as_ptr();
|
||||
u16::from_be_bytes([buf[self.offset + pos], buf[self.offset + pos + 1]])
|
||||
}
|
||||
@@ -376,6 +384,7 @@ impl PageContent {
|
||||
(self.offset + cell_start, self.cell_count() * 2)
|
||||
}
|
||||
|
||||
/* Get region of a cell's payload */
|
||||
pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) {
|
||||
let buf = self.as_ptr();
|
||||
let ncells = self.cell_count();
|
||||
@@ -465,6 +474,7 @@ fn finish_read_page(
|
||||
let inner = PageContent {
|
||||
offset: pos,
|
||||
buffer: buffer_ref.clone(),
|
||||
overflow_cells: Vec::new(),
|
||||
};
|
||||
{
|
||||
let page = page.borrow_mut();
|
||||
|
||||
25
test/Cargo.toml
Normal file
25
test/Cargo.toml
Normal file
@@ -0,0 +1,25 @@
|
||||
[package]
|
||||
name = "core_tester"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
repository.workspace = true
|
||||
description = "Internal tester of write path"
|
||||
|
||||
[lib]
|
||||
name = "test"
|
||||
path = "src/lib.rs"
|
||||
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.75"
|
||||
clap = { version = "4.4.0", features = ["derive"] }
|
||||
dirs = "5.0.1"
|
||||
env_logger = "0.10.1"
|
||||
limbo_core = { path = "../core" }
|
||||
rustyline = "12.0.0"
|
||||
rusqlite = "0.29.0"
|
||||
|
||||
[dev-dependencies]
|
||||
rstest = "0.18.2"
|
||||
5
test/README.md
Normal file
5
test/README.md
Normal file
@@ -0,0 +1,5 @@
|
||||
Currently the best way to run these tests are like this due to long running tests:
|
||||
|
||||
```bash
|
||||
cargo test test_sequential_write -- --nocapture
|
||||
```
|
||||
80
test/src/lib.rs
Normal file
80
test/src/lib.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use limbo_core::{Database, RowResult, Value};
|
||||
use rusqlite::Connection;
|
||||
use std::env::current_dir;
|
||||
use std::sync::Arc;
|
||||
#[test]
|
||||
fn test_sequential_write() -> anyhow::Result<()> {
|
||||
env_logger::init();
|
||||
let path = "test.db";
|
||||
|
||||
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
|
||||
dbg!(path);
|
||||
let mut path = current_dir()?;
|
||||
path.push("test.db");
|
||||
{
|
||||
if path.exists() {
|
||||
std::fs::remove_file(&path)?;
|
||||
}
|
||||
let connection = Connection::open(&path)?;
|
||||
connection.execute("CREATE TABLE test (x INTEGER PRIMARY KEY);", ())?;
|
||||
}
|
||||
|
||||
let db = Database::open_file(io.clone(), path.to_str().unwrap())?;
|
||||
let conn = db.connect();
|
||||
|
||||
let list_query = "SELECT * FROM test";
|
||||
let max_iterations = 10000;
|
||||
for i in 0..max_iterations {
|
||||
if (i % 100) == 0 {
|
||||
let progress = (i as f64 / max_iterations as f64) * 100.0;
|
||||
println!("progress {:.1}%", progress);
|
||||
}
|
||||
let insert_query = format!("INSERT INTO test VALUES ({})", i);
|
||||
match conn.query(insert_query) {
|
||||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.next_row()? {
|
||||
RowResult::IO => {
|
||||
io.run_once()?;
|
||||
}
|
||||
RowResult::Done => break,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
eprintln!("{}", err);
|
||||
}
|
||||
};
|
||||
|
||||
let mut current_read_index = 0;
|
||||
match conn.query(list_query) {
|
||||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.next_row()? {
|
||||
RowResult::Row(row) => {
|
||||
let first_value = row.values.first().expect("missing id");
|
||||
let id = match first_value {
|
||||
Value::Integer(i) => *i as i32,
|
||||
Value::Float(f) => *f as i32,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
assert_eq!(current_read_index, id);
|
||||
current_read_index += 1;
|
||||
}
|
||||
RowResult::IO => {
|
||||
io.run_once()?;
|
||||
}
|
||||
RowResult::Done => break,
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
eprintln!("{}", err);
|
||||
}
|
||||
}
|
||||
conn.cacheflush()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user