mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-26 04:24:21 +01:00
Merge 'core,btree: support overflow pages' from Pere Diaz Bou
This PR adds support for traversing overflow payloads and inserting them too. The test added is a simple one and must be tested with higher degree of stress in the future. Closes #322
This commit is contained in:
@@ -9,10 +9,9 @@ use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue};
|
||||
use crate::Result;
|
||||
|
||||
use std::cell::{Ref, RefCell};
|
||||
use std::mem::swap;
|
||||
use std::rc::Rc;
|
||||
|
||||
use super::sqlite3_ondisk::OverflowCell;
|
||||
use super::sqlite3_ondisk::{write_varint_to_vec, OverflowCell};
|
||||
|
||||
/*
|
||||
These are offsets of fields in the header of a b-tree page.
|
||||
@@ -119,7 +118,13 @@ impl BTreeCursor {
|
||||
},
|
||||
}
|
||||
}
|
||||
let cell = page.cell_get(mem_page.cell_idx())?;
|
||||
let cell = page.cell_get(
|
||||
mem_page.cell_idx(),
|
||||
self.pager.clone(),
|
||||
self.max_local(page.page_type()),
|
||||
self.min_local(page.page_type()),
|
||||
self.usable_space(),
|
||||
)?;
|
||||
match &cell {
|
||||
BTreeCell::TableInteriorCell(TableInteriorCell {
|
||||
_left_child_page,
|
||||
@@ -168,7 +173,13 @@ impl BTreeCursor {
|
||||
let page = page.as_ref().unwrap();
|
||||
|
||||
for cell_idx in 0..page.cell_count() {
|
||||
match &page.cell_get(cell_idx)? {
|
||||
match &page.cell_get(
|
||||
cell_idx,
|
||||
self.pager.clone(),
|
||||
self.max_local(page.page_type()),
|
||||
self.min_local(page.page_type()),
|
||||
self.usable_space(),
|
||||
)? {
|
||||
BTreeCell::TableLeafCell(TableLeafCell {
|
||||
_rowid: cell_rowid,
|
||||
_payload: p,
|
||||
@@ -272,7 +283,13 @@ impl BTreeCursor {
|
||||
|
||||
let mut found_cell = false;
|
||||
for cell_idx in 0..page.cell_count() {
|
||||
match &page.cell_get(cell_idx)? {
|
||||
match &page.cell_get(
|
||||
cell_idx,
|
||||
self.pager.clone(),
|
||||
self.max_local(page.page_type()),
|
||||
self.min_local(page.page_type()),
|
||||
self.usable_space(),
|
||||
)? {
|
||||
BTreeCell::TableInteriorCell(TableInteriorCell {
|
||||
_left_child_page,
|
||||
_rowid,
|
||||
@@ -323,7 +340,7 @@ impl BTreeCursor {
|
||||
fn insert_to_page(
|
||||
&mut self,
|
||||
key: &OwnedValue,
|
||||
_record: &OwnedRecord,
|
||||
record: &OwnedRecord,
|
||||
) -> Result<CursorResult<()>> {
|
||||
let page_ref = self.get_page()?;
|
||||
let int_key = match key {
|
||||
@@ -331,7 +348,7 @@ impl BTreeCursor {
|
||||
_ => unreachable!("btree tables are indexed by integers!"),
|
||||
};
|
||||
|
||||
let cell_idx = {
|
||||
let (cell_idx, page_type) = {
|
||||
let page = RefCell::borrow(&page_ref);
|
||||
if page.is_locked() {
|
||||
return Ok(CursorResult::IO);
|
||||
@@ -345,50 +362,15 @@ impl BTreeCursor {
|
||||
assert!(matches!(page.page_type(), PageType::TableLeaf));
|
||||
|
||||
// find cell
|
||||
find_cell(page, int_key)
|
||||
(self.find_cell(page, int_key), page.page_type())
|
||||
};
|
||||
|
||||
// TODO: if overwrite drop cell
|
||||
|
||||
// insert cell
|
||||
|
||||
let mut cell_payload: Vec<u8> = Vec::new();
|
||||
|
||||
{
|
||||
// Data len will be prepended later
|
||||
// Key
|
||||
let mut key_varint: Vec<u8> = Vec::new();
|
||||
key_varint.extend(std::iter::repeat(0).take(9));
|
||||
let n = write_varint(&mut key_varint.as_mut_slice()[0..9], int_key);
|
||||
write_varint(&mut key_varint, int_key);
|
||||
key_varint.truncate(n);
|
||||
cell_payload.extend_from_slice(&key_varint);
|
||||
}
|
||||
|
||||
// Data payload
|
||||
let payload_size_before_record = cell_payload.len();
|
||||
_record.serialize(&mut cell_payload);
|
||||
let header_size = cell_payload.len() - payload_size_before_record;
|
||||
|
||||
{
|
||||
// Data len
|
||||
let mut data_len_varint: Vec<u8> = Vec::new();
|
||||
data_len_varint.extend(std::iter::repeat(0).take(9));
|
||||
let n = write_varint(
|
||||
&mut data_len_varint.as_mut_slice()[0..9],
|
||||
header_size as u64,
|
||||
);
|
||||
data_len_varint.truncate(n);
|
||||
cell_payload.splice(0..0, data_len_varint.iter().cloned());
|
||||
}
|
||||
|
||||
let usable_space = {
|
||||
let db_header = RefCell::borrow(&self.database_header);
|
||||
(db_header.page_size - db_header.unused_space as u16) as usize
|
||||
};
|
||||
assert!(
|
||||
cell_payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */
|
||||
"need to implemented overflow pages, too big to even add to a an empty page"
|
||||
);
|
||||
self.fill_cell_payload(page_type, Some(int_key), &mut cell_payload, record);
|
||||
|
||||
// insert
|
||||
let overflow = {
|
||||
@@ -507,7 +489,12 @@ impl BTreeCursor {
|
||||
}
|
||||
|
||||
fn drop_cell(&mut self, page: &mut PageContent, cell_idx: usize) {
|
||||
let (cell_start, cell_len) = page.cell_get_raw_region(cell_idx);
|
||||
let (cell_start, cell_len) = page.cell_get_raw_region(
|
||||
cell_idx,
|
||||
self.max_local(page.page_type()),
|
||||
self.min_local(page.page_type()),
|
||||
self.usable_space(),
|
||||
);
|
||||
self.free_cell_range(page, cell_start as u16, cell_len as u16);
|
||||
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1);
|
||||
}
|
||||
@@ -571,7 +558,12 @@ impl BTreeCursor {
|
||||
let mut scratch_cells: Vec<&[u8]> = Vec::new();
|
||||
|
||||
for cell_idx in 0..page_copy.cell_count() {
|
||||
let (start, len) = page_copy.cell_get_raw_region(cell_idx);
|
||||
let (start, len) = page_copy.cell_get_raw_region(
|
||||
cell_idx,
|
||||
self.max_local(page_copy.page_type()),
|
||||
self.min_local(page_copy.page_type()),
|
||||
self.usable_space(),
|
||||
);
|
||||
let buf = page_copy.as_ptr();
|
||||
scratch_cells.push(&buf[start..start + len]);
|
||||
}
|
||||
@@ -601,6 +593,7 @@ impl BTreeCursor {
|
||||
let right_page = right_page.as_mut().unwrap();
|
||||
{
|
||||
let is_leaf = page.is_leaf();
|
||||
let page_type = page.page_type();
|
||||
let mut new_pages = vec![page, right_page];
|
||||
let new_pages_ids = vec![mem_page.page_idx, right_page_id];
|
||||
trace!(
|
||||
@@ -629,7 +622,15 @@ impl BTreeCursor {
|
||||
// Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value
|
||||
let mut right_pointer = BTREE_HEADER_OFFSET_RIGHTMOST;
|
||||
for cell_idx in 0..parent.cell_count() {
|
||||
let cell = parent.cell_get(cell_idx).unwrap();
|
||||
let cell = parent
|
||||
.cell_get(
|
||||
cell_idx,
|
||||
self.pager.clone(),
|
||||
self.max_local(page_type.clone()),
|
||||
self.min_local(page_type.clone()),
|
||||
self.usable_space(),
|
||||
)
|
||||
.unwrap();
|
||||
let found = match cell {
|
||||
BTreeCell::TableInteriorCell(interior) => {
|
||||
interior._left_child_page as usize == mem_page.page_idx
|
||||
@@ -637,7 +638,12 @@ impl BTreeCursor {
|
||||
_ => unreachable!("Parent should always be a "),
|
||||
};
|
||||
if found {
|
||||
let (start, len) = parent.cell_get_raw_region(cell_idx);
|
||||
let (start, len) = parent.cell_get_raw_region(
|
||||
cell_idx,
|
||||
self.max_local(page_type.clone()),
|
||||
self.min_local(page_type.clone()),
|
||||
self.usable_space(),
|
||||
);
|
||||
right_pointer = start;
|
||||
break;
|
||||
}
|
||||
@@ -686,7 +692,15 @@ impl BTreeCursor {
|
||||
if !is_leaf {
|
||||
for page in new_pages.iter_mut().take(new_pages_len - 1) {
|
||||
assert!(page.cell_count() == 1);
|
||||
let last_cell = page.cell_get(page.cell_count() - 1).unwrap();
|
||||
let last_cell = page
|
||||
.cell_get(
|
||||
page.cell_count() - 1,
|
||||
self.pager.clone(),
|
||||
self.max_local(page.page_type()),
|
||||
self.min_local(page.page_type()),
|
||||
self.usable_space(),
|
||||
)
|
||||
.unwrap();
|
||||
let last_cell_pointer = match last_cell {
|
||||
BTreeCell::TableInteriorCell(interior) => interior._left_child_page,
|
||||
_ => unreachable!(),
|
||||
@@ -708,7 +722,17 @@ impl BTreeCursor {
|
||||
assert!(page.cell_count() > 1);
|
||||
let divider_cell_index = divider_cells_index[page_id_index];
|
||||
let cell_payload = scratch_cells[divider_cell_index];
|
||||
let cell = read_btree_cell(cell_payload, &page.page_type(), 0).unwrap();
|
||||
let cell = read_btree_cell(
|
||||
cell_payload,
|
||||
&page.page_type(),
|
||||
0,
|
||||
self.pager.clone(),
|
||||
self.max_local(page.page_type()),
|
||||
self.min_local(page.page_type()),
|
||||
self.usable_space(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
if is_leaf {
|
||||
// create a new divider cell and push
|
||||
let key = match cell {
|
||||
@@ -722,7 +746,7 @@ impl BTreeCursor {
|
||||
divider_cell.extend(std::iter::repeat(0).take(9));
|
||||
let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key);
|
||||
divider_cell.truncate(4 + n);
|
||||
let parent_cell_idx = find_cell(parent, key);
|
||||
let parent_cell_idx = self.find_cell(parent, key);
|
||||
self.insert_into_cell(parent, divider_cell.as_slice(), parent_cell_idx);
|
||||
} else {
|
||||
// move cell
|
||||
@@ -730,7 +754,7 @@ impl BTreeCursor {
|
||||
BTreeCell::TableInteriorCell(interior) => interior._rowid,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let parent_cell_idx = find_cell(page, key);
|
||||
let parent_cell_idx = self.find_cell(page, key);
|
||||
self.insert_into_cell(parent, cell_payload, parent_cell_idx);
|
||||
// self.drop_cell(*page, 0);
|
||||
}
|
||||
@@ -828,6 +852,21 @@ impl BTreeCursor {
|
||||
page
|
||||
}
|
||||
|
||||
fn allocate_overflow_page(&self) -> Rc<RefCell<Page>> {
|
||||
let page = self.pager.allocate_page().unwrap();
|
||||
|
||||
{
|
||||
// setup overflow page
|
||||
let contents = RefCell::borrow(&page);
|
||||
let mut contents = contents.contents.write().unwrap();
|
||||
let contents = contents.as_mut().unwrap();
|
||||
let buf = contents.as_ptr();
|
||||
buf.fill(0);
|
||||
}
|
||||
|
||||
page
|
||||
}
|
||||
|
||||
/*
|
||||
Allocate space for a cell on a page.
|
||||
*/
|
||||
@@ -1020,6 +1059,145 @@ impl BTreeCursor {
|
||||
let mem_page = mem_page.as_ref().unwrap();
|
||||
mem_page.clone()
|
||||
}
|
||||
|
||||
fn fill_cell_payload(
|
||||
&self,
|
||||
page_type: PageType,
|
||||
int_key: Option<u64>,
|
||||
cell_payload: &mut Vec<u8>,
|
||||
record: &OwnedRecord,
|
||||
) {
|
||||
assert!(matches!(
|
||||
page_type,
|
||||
PageType::TableLeaf | PageType::IndexLeaf
|
||||
));
|
||||
// TODO: make record raw from start, having to serialize is not good
|
||||
let mut record_buf = Vec::new();
|
||||
record.serialize(&mut record_buf);
|
||||
|
||||
// fill in header
|
||||
if matches!(page_type, PageType::TableLeaf) {
|
||||
let int_key = int_key.unwrap();
|
||||
write_varint_to_vec(record_buf.len() as u64, cell_payload);
|
||||
write_varint_to_vec(int_key, cell_payload);
|
||||
} else {
|
||||
write_varint_to_vec(record_buf.len() as u64, cell_payload);
|
||||
}
|
||||
|
||||
let max_local = self.max_local(page_type.clone());
|
||||
if record_buf.len() <= max_local {
|
||||
// enough allowed space to fit inside a btree page
|
||||
cell_payload.extend_from_slice(record_buf.as_slice());
|
||||
cell_payload.resize(cell_payload.len() + 4, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
let min_local = self.min_local(page_type);
|
||||
let mut space_left = min_local + (record_buf.len() - min_local) % (self.usable_space() - 4);
|
||||
|
||||
if space_left > max_local {
|
||||
space_left = min_local;
|
||||
}
|
||||
|
||||
// cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page.
|
||||
let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page
|
||||
let mut to_copy_buffer = record_buf.as_slice();
|
||||
|
||||
let prev_size = cell_payload.len();
|
||||
cell_payload.resize(prev_size + space_left + 4, 0);
|
||||
let mut pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) };
|
||||
let mut pointer_to_next = unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) };
|
||||
let mut overflow_pages = Vec::new();
|
||||
|
||||
loop {
|
||||
let to_copy = space_left.min(to_copy_buffer.len());
|
||||
unsafe { std::ptr::copy(to_copy_buffer.as_ptr(), pointer, to_copy) };
|
||||
|
||||
let left = to_copy_buffer.len() - to_copy;
|
||||
if left == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
// we still have bytes to add, we will need to allocate new overflow page
|
||||
let overflow_page = self.allocate_overflow_page();
|
||||
overflow_pages.push(overflow_page.clone());
|
||||
{
|
||||
let page = overflow_page.borrow();
|
||||
let mut contents_lock = page.contents.write().unwrap();
|
||||
let contents = contents_lock.as_mut().unwrap();
|
||||
|
||||
let buf = contents.as_ptr();
|
||||
let id = page.id as u32;
|
||||
let as_bytes = id.to_be_bytes();
|
||||
// update pointer to new overflow page
|
||||
unsafe { std::ptr::copy(as_bytes.as_ptr(), pointer_to_next, 4) };
|
||||
|
||||
pointer = unsafe { buf.as_mut_ptr().add(4) };
|
||||
pointer_to_next = buf.as_mut_ptr();
|
||||
space_left = self.usable_space() - 4;
|
||||
}
|
||||
|
||||
to_copy_buffer = &to_copy_buffer[to_copy..];
|
||||
}
|
||||
|
||||
assert_eq!(cell_size, cell_payload.len());
|
||||
}
|
||||
|
||||
fn max_local(&self, page_type: PageType) -> usize {
|
||||
let usable_space = self.usable_space();
|
||||
match page_type {
|
||||
PageType::IndexInterior | PageType::TableInterior => {
|
||||
(usable_space - 12) * 64 / 255 - 23
|
||||
}
|
||||
PageType::IndexLeaf | PageType::TableLeaf => usable_space - 35,
|
||||
}
|
||||
}
|
||||
|
||||
fn min_local(&self, page_type: PageType) -> usize {
|
||||
let usable_space = self.usable_space();
|
||||
match page_type {
|
||||
PageType::IndexInterior | PageType::TableInterior => {
|
||||
(usable_space - 12) * 32 / 255 - 23
|
||||
}
|
||||
PageType::IndexLeaf | PageType::TableLeaf => (usable_space - 12) * 32 / 255 - 23,
|
||||
}
|
||||
}
|
||||
|
||||
fn usable_space(&self) -> usize {
|
||||
let db_header = RefCell::borrow(&self.database_header);
|
||||
(db_header.page_size - db_header.unused_space as u16) as usize
|
||||
}
|
||||
|
||||
fn find_cell(&self, page: &PageContent, int_key: u64) -> usize {
|
||||
let mut cell_idx = 0;
|
||||
let cell_count = page.cell_count();
|
||||
while cell_idx < cell_count {
|
||||
match page
|
||||
.cell_get(
|
||||
cell_idx,
|
||||
self.pager.clone(),
|
||||
self.max_local(page.page_type()),
|
||||
self.min_local(page.page_type()),
|
||||
self.usable_space(),
|
||||
)
|
||||
.unwrap()
|
||||
{
|
||||
BTreeCell::TableLeafCell(cell) => {
|
||||
if int_key <= cell._rowid {
|
||||
break;
|
||||
}
|
||||
}
|
||||
BTreeCell::TableInteriorCell(cell) => {
|
||||
if int_key <= cell._rowid {
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ => todo!(),
|
||||
}
|
||||
cell_idx += 1;
|
||||
}
|
||||
cell_idx
|
||||
}
|
||||
}
|
||||
|
||||
fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount: usize) -> usize {
|
||||
@@ -1175,11 +1353,17 @@ impl Cursor for BTreeCursor {
|
||||
OwnedValue::Integer(i) => *i as u64,
|
||||
_ => unreachable!("btree tables are indexed by integers!"),
|
||||
};
|
||||
let cell_idx = find_cell(page, int_key);
|
||||
let cell_idx = self.find_cell(page, int_key);
|
||||
if cell_idx >= page.cell_count() {
|
||||
Ok(CursorResult::Ok(false))
|
||||
} else {
|
||||
let equals = match &page.cell_get(cell_idx)? {
|
||||
let equals = match &page.cell_get(
|
||||
cell_idx,
|
||||
self.pager.clone(),
|
||||
self.max_local(page.page_type()),
|
||||
self.min_local(page.page_type()),
|
||||
self.usable_space(),
|
||||
)? {
|
||||
BTreeCell::TableLeafCell(l) => l._rowid == int_key,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
@@ -1187,25 +1371,3 @@ impl Cursor for BTreeCursor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn find_cell(page: &PageContent, int_key: u64) -> usize {
|
||||
let mut cell_idx = 0;
|
||||
let cell_count = page.cell_count();
|
||||
while cell_idx < cell_count {
|
||||
match page.cell_get(cell_idx).unwrap() {
|
||||
BTreeCell::TableLeafCell(cell) => {
|
||||
if int_key <= cell._rowid {
|
||||
break;
|
||||
}
|
||||
}
|
||||
BTreeCell::TableInteriorCell(cell) => {
|
||||
if int_key <= cell._rowid {
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ => todo!(),
|
||||
}
|
||||
cell_idx += 1;
|
||||
}
|
||||
cell_idx
|
||||
}
|
||||
|
||||
@@ -416,4 +416,9 @@ impl Pager {
|
||||
let mut cache = RefCell::borrow_mut(&self.page_cache);
|
||||
cache.insert(id, page);
|
||||
}
|
||||
|
||||
pub fn usable_size(&self) -> usize {
|
||||
let db_header = self.db_header.borrow();
|
||||
(db_header.page_size - db_header.unused_space as u16) as usize
|
||||
}
|
||||
}
|
||||
|
||||
@@ -357,7 +357,14 @@ impl PageContent {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cell_get(&self, idx: usize) -> Result<BTreeCell> {
|
||||
pub fn cell_get(
|
||||
&self,
|
||||
idx: usize,
|
||||
pager: Rc<Pager>,
|
||||
max_local: usize,
|
||||
min_local: usize,
|
||||
usable_size: usize,
|
||||
) -> Result<BTreeCell> {
|
||||
let buf = self.as_ptr();
|
||||
|
||||
let ncells = self.cell_count();
|
||||
@@ -371,7 +378,15 @@ impl PageContent {
|
||||
let cell_pointer = cell_start + (idx * 2);
|
||||
let cell_pointer = self.read_u16(cell_pointer) as usize;
|
||||
|
||||
read_btree_cell(buf, &self.page_type(), cell_pointer)
|
||||
read_btree_cell(
|
||||
buf,
|
||||
&self.page_type(),
|
||||
cell_pointer,
|
||||
pager,
|
||||
max_local,
|
||||
min_local,
|
||||
usable_size,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) {
|
||||
@@ -385,7 +400,13 @@ impl PageContent {
|
||||
}
|
||||
|
||||
/* Get region of a cell's payload */
|
||||
pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) {
|
||||
pub fn cell_get_raw_region(
|
||||
&self,
|
||||
idx: usize,
|
||||
max_local: usize,
|
||||
min_local: usize,
|
||||
usable_size: usize,
|
||||
) -> (usize, usize) {
|
||||
let buf = self.as_ptr();
|
||||
let ncells = self.cell_count();
|
||||
let cell_start = match self.page_type() {
|
||||
@@ -401,7 +422,13 @@ impl PageContent {
|
||||
let len = match self.page_type() {
|
||||
PageType::IndexInterior => {
|
||||
let (len_payload, n_payload) = read_varint(&buf[cell_pointer + 4..]).unwrap();
|
||||
4 + len_payload as usize + n_payload + 4
|
||||
let (overflows, to_read) =
|
||||
payload_overflows(len_payload as usize, max_local, min_local, usable_size);
|
||||
if overflows {
|
||||
4 + to_read + n_payload + 4
|
||||
} else {
|
||||
4 + len_payload as usize + n_payload + 4
|
||||
}
|
||||
}
|
||||
PageType::TableInterior => {
|
||||
let (_, n_rowid) = read_varint(&buf[cell_pointer + 4..]).unwrap();
|
||||
@@ -409,13 +436,24 @@ impl PageContent {
|
||||
}
|
||||
PageType::IndexLeaf => {
|
||||
let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap();
|
||||
len_payload as usize + n_payload + 4
|
||||
let (overflows, to_read) =
|
||||
payload_overflows(len_payload as usize, max_local, min_local, usable_size);
|
||||
if overflows {
|
||||
to_read as usize + n_payload + 4
|
||||
} else {
|
||||
len_payload as usize + n_payload + 4
|
||||
}
|
||||
}
|
||||
PageType::TableLeaf => {
|
||||
let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap();
|
||||
let (_, n_rowid) = read_varint(&buf[cell_pointer + n_payload..]).unwrap();
|
||||
// TODO: add overflow 4 bytes
|
||||
len_payload as usize + n_payload + n_rowid
|
||||
let (overflows, to_read) =
|
||||
payload_overflows(len_payload as usize, max_local, min_local, usable_size);
|
||||
if overflows {
|
||||
to_read + n_payload + n_rowid
|
||||
} else {
|
||||
len_payload as usize + n_payload + n_rowid
|
||||
}
|
||||
}
|
||||
};
|
||||
(start, len)
|
||||
@@ -548,7 +586,15 @@ pub struct IndexLeafCell {
|
||||
pub first_overflow_page: Option<u32>,
|
||||
}
|
||||
|
||||
pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<BTreeCell> {
|
||||
pub fn read_btree_cell(
|
||||
page: &[u8],
|
||||
page_type: &PageType,
|
||||
pos: usize,
|
||||
pager: Rc<Pager>,
|
||||
max_local: usize,
|
||||
min_local: usize,
|
||||
usable_size: usize,
|
||||
) -> Result<BTreeCell> {
|
||||
match page_type {
|
||||
PageType::IndexInterior => {
|
||||
let mut pos = pos;
|
||||
@@ -557,7 +603,13 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<
|
||||
pos += 4;
|
||||
let (payload_size, nr) = read_varint(&page[pos..])?;
|
||||
pos += nr;
|
||||
let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize);
|
||||
|
||||
let (overflows, to_read) =
|
||||
payload_overflows(payload_size as usize, max_local, min_local, usable_size);
|
||||
let to_read = if overflows { to_read } else { page.len() - pos };
|
||||
|
||||
let (payload, first_overflow_page) =
|
||||
read_payload(&page[pos..pos + to_read], payload_size as usize, pager);
|
||||
Ok(BTreeCell::IndexInteriorCell(IndexInteriorCell {
|
||||
left_child_page,
|
||||
payload,
|
||||
@@ -579,7 +631,13 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<
|
||||
let mut pos = pos;
|
||||
let (payload_size, nr) = read_varint(&page[pos..])?;
|
||||
pos += nr;
|
||||
let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize);
|
||||
|
||||
let (overflows, to_read) =
|
||||
payload_overflows(payload_size as usize, max_local, min_local, usable_size);
|
||||
let to_read = if overflows { to_read } else { page.len() - pos };
|
||||
|
||||
let (payload, first_overflow_page) =
|
||||
read_payload(&page[pos..pos + to_read], payload_size as usize, pager);
|
||||
Ok(BTreeCell::IndexLeafCell(IndexLeafCell {
|
||||
payload,
|
||||
first_overflow_page,
|
||||
@@ -591,7 +649,13 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<
|
||||
pos += nr;
|
||||
let (rowid, nr) = read_varint(&page[pos..])?;
|
||||
pos += nr;
|
||||
let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize);
|
||||
|
||||
let (overflows, to_read) =
|
||||
payload_overflows(payload_size as usize, max_local, min_local, usable_size);
|
||||
let to_read = if overflows { to_read } else { page.len() - pos };
|
||||
|
||||
let (payload, first_overflow_page) =
|
||||
read_payload(&page[pos..pos + to_read], payload_size as usize, pager);
|
||||
Ok(BTreeCell::TableLeafCell(TableLeafCell {
|
||||
_rowid: rowid,
|
||||
_payload: payload,
|
||||
@@ -603,20 +667,47 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result<
|
||||
|
||||
/// read_payload takes in the unread bytearray with the payload size
|
||||
/// and returns the payload on the page, and optionally the first overflow page number.
|
||||
fn read_payload(unread: &[u8], payload_size: usize) -> (Vec<u8>, Option<u32>) {
|
||||
let page_len = unread.len();
|
||||
if payload_size <= page_len {
|
||||
fn read_payload(unread: &[u8], payload_size: usize, pager: Rc<Pager>) -> (Vec<u8>, Option<u32>) {
|
||||
let cell_len = unread.len();
|
||||
if payload_size <= cell_len {
|
||||
// fit within 1 page
|
||||
(unread[..payload_size].to_vec(), None)
|
||||
} else {
|
||||
// overflow
|
||||
let first_overflow_page = u32::from_be_bytes([
|
||||
unread[page_len - 4],
|
||||
unread[page_len - 3],
|
||||
unread[page_len - 2],
|
||||
unread[page_len - 1],
|
||||
unread[cell_len - 4],
|
||||
unread[cell_len - 3],
|
||||
unread[cell_len - 2],
|
||||
unread[cell_len - 1],
|
||||
]);
|
||||
(unread[..page_len - 4].to_vec(), Some(first_overflow_page))
|
||||
let usable_size = pager.usable_size();
|
||||
let mut next_overflow = first_overflow_page;
|
||||
let mut payload = unread[..cell_len - 4].to_vec();
|
||||
let mut left_to_read = payload_size - (cell_len - 4); // minus four because last for bytes of a payload cell are the overflow pointer
|
||||
while next_overflow != 0 {
|
||||
assert!(left_to_read > 0);
|
||||
let page;
|
||||
loop {
|
||||
let page_ref = pager.read_page(next_overflow as usize);
|
||||
if let Ok(p) = page_ref {
|
||||
page = p;
|
||||
break;
|
||||
}
|
||||
}
|
||||
let page = page.borrow();
|
||||
let contents = page.contents.write().unwrap();
|
||||
let contents = contents.as_ref().unwrap();
|
||||
|
||||
let to_read = left_to_read.min(usable_size - 4);
|
||||
let buf = contents.as_ptr();
|
||||
payload.extend_from_slice(&buf[4..4 + to_read]);
|
||||
|
||||
next_overflow = contents.read_u32(0);
|
||||
left_to_read -= to_read;
|
||||
}
|
||||
assert_eq!(left_to_read, 0);
|
||||
|
||||
(payload, Some(first_overflow_page))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -761,7 +852,11 @@ pub fn read_value(buf: &[u8], serial_type: &SerialType) -> Result<(OwnedValue, u
|
||||
}
|
||||
SerialType::String(n) => {
|
||||
if buf.len() < n {
|
||||
crate::bail_corrupt_error!("Invalid String value");
|
||||
crate::bail_corrupt_error!(
|
||||
"Invalid String value, length {} < expected length {}",
|
||||
buf.len(),
|
||||
n
|
||||
);
|
||||
}
|
||||
let bytes = buf[0..n].to_vec();
|
||||
let value = unsafe { String::from_utf8_unchecked(bytes) };
|
||||
@@ -828,6 +923,15 @@ pub fn write_varint(buf: &mut [u8], value: u64) -> usize {
|
||||
n
|
||||
}
|
||||
|
||||
pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
|
||||
let mut varint: Vec<u8> = Vec::new();
|
||||
varint.extend(std::iter::repeat(0).take(9));
|
||||
let n = write_varint(&mut varint.as_mut_slice()[0..9], value);
|
||||
write_varint(&mut varint, value);
|
||||
varint.truncate(n);
|
||||
payload.extend_from_slice(&varint);
|
||||
}
|
||||
|
||||
pub fn begin_read_wal_header(io: Rc<dyn File>) -> Result<Rc<RefCell<WalHeader>>> {
|
||||
let drop_fn = Rc::new(|_buf| {});
|
||||
let buf = Rc::new(RefCell::new(Buffer::allocate(32, drop_fn)));
|
||||
@@ -890,6 +994,28 @@ fn finish_read_wal_frame_header(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
Checks if payload will overflow a cell based on max local and
|
||||
it will return the min size that will be stored in that case,
|
||||
including overflow pointer
|
||||
*/
|
||||
pub fn payload_overflows(
|
||||
payload_size: usize,
|
||||
max_local: usize,
|
||||
min_local: usize,
|
||||
usable_size: usize,
|
||||
) -> (bool, usize) {
|
||||
if payload_size <= max_local {
|
||||
return (false, 0);
|
||||
}
|
||||
|
||||
let mut space_left = min_local + (payload_size - min_local) % (usable_size - 4);
|
||||
if space_left > max_local {
|
||||
space_left = min_local;
|
||||
}
|
||||
return (true, space_left + 4);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
214
test/src/lib.rs
214
test/src/lib.rs
@@ -1,28 +1,48 @@
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use limbo_core::{Database, RowResult, Value};
|
||||
use rusqlite::Connection;
|
||||
use std::env::current_dir;
|
||||
use std::sync::Arc;
|
||||
#[test]
|
||||
fn test_sequential_write() -> anyhow::Result<()> {
|
||||
env_logger::init();
|
||||
let path = "test.db";
|
||||
use limbo_core::Database;
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
|
||||
dbg!(path);
|
||||
let mut path = current_dir()?;
|
||||
struct TempDatabase {
|
||||
pub path: PathBuf,
|
||||
pub io: Arc<dyn limbo_core::IO>,
|
||||
}
|
||||
|
||||
impl TempDatabase {
|
||||
pub fn new(table_sql: &str) -> Self {
|
||||
let mut path = env::current_dir().unwrap();
|
||||
path.push("test.db");
|
||||
{
|
||||
if path.exists() {
|
||||
std::fs::remove_file(&path)?;
|
||||
fs::remove_file(&path).unwrap();
|
||||
}
|
||||
let connection = Connection::open(&path)?;
|
||||
connection.execute("CREATE TABLE test (x INTEGER PRIMARY KEY);", ())?;
|
||||
let connection = rusqlite::Connection::open(&path).unwrap();
|
||||
connection.execute(table_sql, ()).unwrap();
|
||||
}
|
||||
let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new().unwrap());
|
||||
|
||||
let db = Database::open_file(io.clone(), path.to_str().unwrap())?;
|
||||
Self { path, io }
|
||||
}
|
||||
|
||||
pub fn connect_limbo(&self) -> limbo_core::Connection {
|
||||
let db = Database::open_file(self.io.clone(), self.path.to_str().unwrap()).unwrap();
|
||||
let conn = db.connect();
|
||||
conn
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use limbo_core::{RowResult, Value};
|
||||
|
||||
#[test]
|
||||
fn test_sequential_write() -> anyhow::Result<()> {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);");
|
||||
let conn = tmp_db.connect_limbo();
|
||||
|
||||
let list_query = "SELECT * FROM test";
|
||||
let max_iterations = 10000;
|
||||
@@ -36,7 +56,7 @@ mod tests {
|
||||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.next_row()? {
|
||||
RowResult::IO => {
|
||||
io.run_once()?;
|
||||
tmp_db.io.run_once()?;
|
||||
}
|
||||
RowResult::Done => break,
|
||||
_ => unreachable!(),
|
||||
@@ -63,7 +83,7 @@ mod tests {
|
||||
current_read_index += 1;
|
||||
}
|
||||
RowResult::IO => {
|
||||
io.run_once()?;
|
||||
tmp_db.io.run_once()?;
|
||||
}
|
||||
RowResult::Done => break,
|
||||
}
|
||||
@@ -77,4 +97,160 @@ mod tests {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_overflow_page() -> anyhow::Result<()> {
|
||||
let _ = env_logger::try_init();
|
||||
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);");
|
||||
let conn = tmp_db.connect_limbo();
|
||||
|
||||
let mut huge_text = String::new();
|
||||
for i in 0..8192 {
|
||||
huge_text.push(('A' as u8 + (i % 24) as u8) as char);
|
||||
}
|
||||
|
||||
let list_query = "SELECT * FROM test LIMIT 1";
|
||||
let insert_query = format!("INSERT INTO test VALUES (1, '{}')", huge_text.as_str());
|
||||
|
||||
match conn.query(insert_query) {
|
||||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.next_row()? {
|
||||
RowResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
}
|
||||
RowResult::Done => break,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
eprintln!("{}", err);
|
||||
}
|
||||
};
|
||||
|
||||
// this flush helped to review hex of test.db
|
||||
conn.cacheflush()?;
|
||||
|
||||
match conn.query(list_query) {
|
||||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.next_row()? {
|
||||
RowResult::Row(row) => {
|
||||
let first_value = &row.values[0];
|
||||
let text = &row.values[1];
|
||||
let id = match first_value {
|
||||
Value::Integer(i) => *i as i32,
|
||||
Value::Float(f) => *f as i32,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let text = match text {
|
||||
Value::Text(t) => *t,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
assert_eq!(1, id);
|
||||
compare_string(&huge_text, text);
|
||||
}
|
||||
RowResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
}
|
||||
RowResult::Done => break,
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
eprintln!("{}", err);
|
||||
}
|
||||
}
|
||||
conn.cacheflush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sequential_overflow_page() -> anyhow::Result<()> {
|
||||
let _ = env_logger::try_init();
|
||||
let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);");
|
||||
let conn = tmp_db.connect_limbo();
|
||||
let iterations = 10 as usize;
|
||||
|
||||
let mut huge_texts = Vec::new();
|
||||
for i in 0..iterations {
|
||||
let mut huge_text = String::new();
|
||||
for j in 0..8192 {
|
||||
huge_text.push(('A' as u8 + i as u8) as char);
|
||||
}
|
||||
huge_texts.push(huge_text);
|
||||
}
|
||||
|
||||
for i in 0..iterations {
|
||||
let huge_text = &huge_texts[i];
|
||||
let insert_query = format!("INSERT INTO test VALUES ({}, '{}')", i, huge_text.as_str());
|
||||
match conn.query(insert_query) {
|
||||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.next_row()? {
|
||||
RowResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
}
|
||||
RowResult::Done => break,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
eprintln!("{}", err);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let list_query = "SELECT * FROM test LIMIT 1";
|
||||
let mut current_index = 0;
|
||||
match conn.query(list_query) {
|
||||
Ok(Some(ref mut rows)) => loop {
|
||||
match rows.next_row()? {
|
||||
RowResult::Row(row) => {
|
||||
let first_value = &row.values[0];
|
||||
let text = &row.values[1];
|
||||
let id = match first_value {
|
||||
Value::Integer(i) => *i as i32,
|
||||
Value::Float(f) => *f as i32,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let text = match text {
|
||||
Value::Text(t) => *t,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let huge_text = &huge_texts[current_index];
|
||||
assert_eq!(current_index, id as usize);
|
||||
compare_string(&huge_text, text);
|
||||
current_index += 1;
|
||||
}
|
||||
RowResult::IO => {
|
||||
tmp_db.io.run_once()?;
|
||||
}
|
||||
RowResult::Done => break,
|
||||
}
|
||||
},
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
eprintln!("{}", err);
|
||||
}
|
||||
}
|
||||
conn.cacheflush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compare_string(a: &String, b: &String) {
|
||||
assert_eq!(a.len(), b.len(), "Strings are not equal in size!");
|
||||
let a = a.as_bytes();
|
||||
let b = b.as_bytes();
|
||||
|
||||
let len = a.len();
|
||||
for i in 0..len {
|
||||
if a[i] != b[i] {
|
||||
println!(
|
||||
"Bytes differ \n\t at index: dec -> {} hex -> {:#02x} \n\t values dec -> {}!={} hex -> {:#02x}!={:#02x}",
|
||||
i, i, a[i], b[i], a[i], b[i]
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user