diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 28c3c4b17..f66933034 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -9,10 +9,9 @@ use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue}; use crate::Result; use std::cell::{Ref, RefCell}; -use std::mem::swap; use std::rc::Rc; -use super::sqlite3_ondisk::OverflowCell; +use super::sqlite3_ondisk::{write_varint_to_vec, OverflowCell}; /* These are offsets of fields in the header of a b-tree page. @@ -119,7 +118,13 @@ impl BTreeCursor { }, } } - let cell = page.cell_get(mem_page.cell_idx())?; + let cell = page.cell_get( + mem_page.cell_idx(), + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + )?; match &cell { BTreeCell::TableInteriorCell(TableInteriorCell { _left_child_page, @@ -168,7 +173,13 @@ impl BTreeCursor { let page = page.as_ref().unwrap(); for cell_idx in 0..page.cell_count() { - match &page.cell_get(cell_idx)? { + match &page.cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + )? { BTreeCell::TableLeafCell(TableLeafCell { _rowid: cell_rowid, _payload: p, @@ -272,7 +283,13 @@ impl BTreeCursor { let mut found_cell = false; for cell_idx in 0..page.cell_count() { - match &page.cell_get(cell_idx)? { + match &page.cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + )? { BTreeCell::TableInteriorCell(TableInteriorCell { _left_child_page, _rowid, @@ -323,7 +340,7 @@ impl BTreeCursor { fn insert_to_page( &mut self, key: &OwnedValue, - _record: &OwnedRecord, + record: &OwnedRecord, ) -> Result> { let page_ref = self.get_page()?; let int_key = match key { @@ -331,7 +348,7 @@ impl BTreeCursor { _ => unreachable!("btree tables are indexed by integers!"), }; - let cell_idx = { + let (cell_idx, page_type) = { let page = RefCell::borrow(&page_ref); if page.is_locked() { return Ok(CursorResult::IO); @@ -345,50 +362,15 @@ impl BTreeCursor { assert!(matches!(page.page_type(), PageType::TableLeaf)); // find cell - find_cell(page, int_key) + (self.find_cell(page, int_key), page.page_type()) }; // TODO: if overwrite drop cell // insert cell + let mut cell_payload: Vec = Vec::new(); - - { - // Data len will be prepended later - // Key - let mut key_varint: Vec = Vec::new(); - key_varint.extend(std::iter::repeat(0).take(9)); - let n = write_varint(&mut key_varint.as_mut_slice()[0..9], int_key); - write_varint(&mut key_varint, int_key); - key_varint.truncate(n); - cell_payload.extend_from_slice(&key_varint); - } - - // Data payload - let payload_size_before_record = cell_payload.len(); - _record.serialize(&mut cell_payload); - let header_size = cell_payload.len() - payload_size_before_record; - - { - // Data len - let mut data_len_varint: Vec = Vec::new(); - data_len_varint.extend(std::iter::repeat(0).take(9)); - let n = write_varint( - &mut data_len_varint.as_mut_slice()[0..9], - header_size as u64, - ); - data_len_varint.truncate(n); - cell_payload.splice(0..0, data_len_varint.iter().cloned()); - } - - let usable_space = { - let db_header = RefCell::borrow(&self.database_header); - (db_header.page_size - db_header.unused_space as u16) as usize - }; - assert!( - cell_payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */ - "need to implemented overflow pages, too big to even add to a an empty page" - ); + self.fill_cell_payload(page_type, Some(int_key), &mut cell_payload, record); // insert let overflow = { @@ -507,7 +489,12 @@ impl BTreeCursor { } fn drop_cell(&mut self, page: &mut PageContent, cell_idx: usize) { - let (cell_start, cell_len) = page.cell_get_raw_region(cell_idx); + let (cell_start, cell_len) = page.cell_get_raw_region( + cell_idx, + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + ); self.free_cell_range(page, cell_start as u16, cell_len as u16); page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, page.cell_count() as u16 - 1); } @@ -571,7 +558,12 @@ impl BTreeCursor { let mut scratch_cells: Vec<&[u8]> = Vec::new(); for cell_idx in 0..page_copy.cell_count() { - let (start, len) = page_copy.cell_get_raw_region(cell_idx); + let (start, len) = page_copy.cell_get_raw_region( + cell_idx, + self.max_local(page_copy.page_type()), + self.min_local(page_copy.page_type()), + self.usable_space(), + ); let buf = page_copy.as_ptr(); scratch_cells.push(&buf[start..start + len]); } @@ -601,6 +593,7 @@ impl BTreeCursor { let right_page = right_page.as_mut().unwrap(); { let is_leaf = page.is_leaf(); + let page_type = page.page_type(); let mut new_pages = vec![page, right_page]; let new_pages_ids = vec![mem_page.page_idx, right_page_id]; trace!( @@ -629,7 +622,15 @@ impl BTreeCursor { // Right page pointer is u32 in right most pointer, and in cell is u32 too, so we can use a *u32 to hold where we want to change this value let mut right_pointer = BTREE_HEADER_OFFSET_RIGHTMOST; for cell_idx in 0..parent.cell_count() { - let cell = parent.cell_get(cell_idx).unwrap(); + let cell = parent + .cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page_type.clone()), + self.min_local(page_type.clone()), + self.usable_space(), + ) + .unwrap(); let found = match cell { BTreeCell::TableInteriorCell(interior) => { interior._left_child_page as usize == mem_page.page_idx @@ -637,7 +638,12 @@ impl BTreeCursor { _ => unreachable!("Parent should always be a "), }; if found { - let (start, len) = parent.cell_get_raw_region(cell_idx); + let (start, len) = parent.cell_get_raw_region( + cell_idx, + self.max_local(page_type.clone()), + self.min_local(page_type.clone()), + self.usable_space(), + ); right_pointer = start; break; } @@ -686,7 +692,15 @@ impl BTreeCursor { if !is_leaf { for page in new_pages.iter_mut().take(new_pages_len - 1) { assert!(page.cell_count() == 1); - let last_cell = page.cell_get(page.cell_count() - 1).unwrap(); + let last_cell = page + .cell_get( + page.cell_count() - 1, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + ) + .unwrap(); let last_cell_pointer = match last_cell { BTreeCell::TableInteriorCell(interior) => interior._left_child_page, _ => unreachable!(), @@ -708,7 +722,17 @@ impl BTreeCursor { assert!(page.cell_count() > 1); let divider_cell_index = divider_cells_index[page_id_index]; let cell_payload = scratch_cells[divider_cell_index]; - let cell = read_btree_cell(cell_payload, &page.page_type(), 0).unwrap(); + let cell = read_btree_cell( + cell_payload, + &page.page_type(), + 0, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + ) + .unwrap(); + if is_leaf { // create a new divider cell and push let key = match cell { @@ -722,7 +746,7 @@ impl BTreeCursor { divider_cell.extend(std::iter::repeat(0).take(9)); let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key); divider_cell.truncate(4 + n); - let parent_cell_idx = find_cell(parent, key); + let parent_cell_idx = self.find_cell(parent, key); self.insert_into_cell(parent, divider_cell.as_slice(), parent_cell_idx); } else { // move cell @@ -730,7 +754,7 @@ impl BTreeCursor { BTreeCell::TableInteriorCell(interior) => interior._rowid, _ => unreachable!(), }; - let parent_cell_idx = find_cell(page, key); + let parent_cell_idx = self.find_cell(page, key); self.insert_into_cell(parent, cell_payload, parent_cell_idx); // self.drop_cell(*page, 0); } @@ -828,6 +852,21 @@ impl BTreeCursor { page } + fn allocate_overflow_page(&self) -> Rc> { + let page = self.pager.allocate_page().unwrap(); + + { + // setup overflow page + let contents = RefCell::borrow(&page); + let mut contents = contents.contents.write().unwrap(); + let contents = contents.as_mut().unwrap(); + let buf = contents.as_ptr(); + buf.fill(0); + } + + page + } + /* Allocate space for a cell on a page. */ @@ -1020,6 +1059,145 @@ impl BTreeCursor { let mem_page = mem_page.as_ref().unwrap(); mem_page.clone() } + + fn fill_cell_payload( + &self, + page_type: PageType, + int_key: Option, + cell_payload: &mut Vec, + record: &OwnedRecord, + ) { + assert!(matches!( + page_type, + PageType::TableLeaf | PageType::IndexLeaf + )); + // TODO: make record raw from start, having to serialize is not good + let mut record_buf = Vec::new(); + record.serialize(&mut record_buf); + + // fill in header + if matches!(page_type, PageType::TableLeaf) { + let int_key = int_key.unwrap(); + write_varint_to_vec(record_buf.len() as u64, cell_payload); + write_varint_to_vec(int_key, cell_payload); + } else { + write_varint_to_vec(record_buf.len() as u64, cell_payload); + } + + let max_local = self.max_local(page_type.clone()); + if record_buf.len() <= max_local { + // enough allowed space to fit inside a btree page + cell_payload.extend_from_slice(record_buf.as_slice()); + cell_payload.resize(cell_payload.len() + 4, 0); + return; + } + + let min_local = self.min_local(page_type); + let mut space_left = min_local + (record_buf.len() - min_local) % (self.usable_space() - 4); + + if space_left > max_local { + space_left = min_local; + } + + // cell_size must be equal to first value of space_left as this will be the bytes copied to non-overflow page. + let cell_size = space_left + cell_payload.len() + 4; // 4 is the number of bytes of pointer to first overflow page + let mut to_copy_buffer = record_buf.as_slice(); + + let prev_size = cell_payload.len(); + cell_payload.resize(prev_size + space_left + 4, 0); + let mut pointer = unsafe { cell_payload.as_mut_ptr().add(prev_size) }; + let mut pointer_to_next = unsafe { cell_payload.as_mut_ptr().add(prev_size + space_left) }; + let mut overflow_pages = Vec::new(); + + loop { + let to_copy = space_left.min(to_copy_buffer.len()); + unsafe { std::ptr::copy(to_copy_buffer.as_ptr(), pointer, to_copy) }; + + let left = to_copy_buffer.len() - to_copy; + if left == 0 { + break; + } + + // we still have bytes to add, we will need to allocate new overflow page + let overflow_page = self.allocate_overflow_page(); + overflow_pages.push(overflow_page.clone()); + { + let page = overflow_page.borrow(); + let mut contents_lock = page.contents.write().unwrap(); + let contents = contents_lock.as_mut().unwrap(); + + let buf = contents.as_ptr(); + let id = page.id as u32; + let as_bytes = id.to_be_bytes(); + // update pointer to new overflow page + unsafe { std::ptr::copy(as_bytes.as_ptr(), pointer_to_next, 4) }; + + pointer = unsafe { buf.as_mut_ptr().add(4) }; + pointer_to_next = buf.as_mut_ptr(); + space_left = self.usable_space() - 4; + } + + to_copy_buffer = &to_copy_buffer[to_copy..]; + } + + assert_eq!(cell_size, cell_payload.len()); + } + + fn max_local(&self, page_type: PageType) -> usize { + let usable_space = self.usable_space(); + match page_type { + PageType::IndexInterior | PageType::TableInterior => { + (usable_space - 12) * 64 / 255 - 23 + } + PageType::IndexLeaf | PageType::TableLeaf => usable_space - 35, + } + } + + fn min_local(&self, page_type: PageType) -> usize { + let usable_space = self.usable_space(); + match page_type { + PageType::IndexInterior | PageType::TableInterior => { + (usable_space - 12) * 32 / 255 - 23 + } + PageType::IndexLeaf | PageType::TableLeaf => (usable_space - 12) * 32 / 255 - 23, + } + } + + fn usable_space(&self) -> usize { + let db_header = RefCell::borrow(&self.database_header); + (db_header.page_size - db_header.unused_space as u16) as usize + } + + fn find_cell(&self, page: &PageContent, int_key: u64) -> usize { + let mut cell_idx = 0; + let cell_count = page.cell_count(); + while cell_idx < cell_count { + match page + .cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + ) + .unwrap() + { + BTreeCell::TableLeafCell(cell) => { + if int_key <= cell._rowid { + break; + } + } + BTreeCell::TableInteriorCell(cell) => { + if int_key <= cell._rowid { + break; + } + } + _ => todo!(), + } + cell_idx += 1; + } + cell_idx + } } fn find_free_cell(page_ref: &PageContent, db_header: Ref, amount: usize) -> usize { @@ -1175,11 +1353,17 @@ impl Cursor for BTreeCursor { OwnedValue::Integer(i) => *i as u64, _ => unreachable!("btree tables are indexed by integers!"), }; - let cell_idx = find_cell(page, int_key); + let cell_idx = self.find_cell(page, int_key); if cell_idx >= page.cell_count() { Ok(CursorResult::Ok(false)) } else { - let equals = match &page.cell_get(cell_idx)? { + let equals = match &page.cell_get( + cell_idx, + self.pager.clone(), + self.max_local(page.page_type()), + self.min_local(page.page_type()), + self.usable_space(), + )? { BTreeCell::TableLeafCell(l) => l._rowid == int_key, _ => unreachable!(), }; @@ -1187,25 +1371,3 @@ impl Cursor for BTreeCursor { } } } - -fn find_cell(page: &PageContent, int_key: u64) -> usize { - let mut cell_idx = 0; - let cell_count = page.cell_count(); - while cell_idx < cell_count { - match page.cell_get(cell_idx).unwrap() { - BTreeCell::TableLeafCell(cell) => { - if int_key <= cell._rowid { - break; - } - } - BTreeCell::TableInteriorCell(cell) => { - if int_key <= cell._rowid { - break; - } - } - _ => todo!(), - } - cell_idx += 1; - } - cell_idx -} diff --git a/core/storage/pager.rs b/core/storage/pager.rs index ae26314f8..8d4ce4747 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -416,4 +416,9 @@ impl Pager { let mut cache = RefCell::borrow_mut(&self.page_cache); cache.insert(id, page); } + + pub fn usable_size(&self) -> usize { + let db_header = self.db_header.borrow(); + (db_header.page_size - db_header.unused_space as u16) as usize + } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 99dd6e362..dc80339af 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -357,7 +357,14 @@ impl PageContent { } } - pub fn cell_get(&self, idx: usize) -> Result { + pub fn cell_get( + &self, + idx: usize, + pager: Rc, + max_local: usize, + min_local: usize, + usable_size: usize, + ) -> Result { let buf = self.as_ptr(); let ncells = self.cell_count(); @@ -371,7 +378,15 @@ impl PageContent { let cell_pointer = cell_start + (idx * 2); let cell_pointer = self.read_u16(cell_pointer) as usize; - read_btree_cell(buf, &self.page_type(), cell_pointer) + read_btree_cell( + buf, + &self.page_type(), + cell_pointer, + pager, + max_local, + min_local, + usable_size, + ) } pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) { @@ -385,7 +400,13 @@ impl PageContent { } /* Get region of a cell's payload */ - pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) { + pub fn cell_get_raw_region( + &self, + idx: usize, + max_local: usize, + min_local: usize, + usable_size: usize, + ) -> (usize, usize) { let buf = self.as_ptr(); let ncells = self.cell_count(); let cell_start = match self.page_type() { @@ -401,7 +422,13 @@ impl PageContent { let len = match self.page_type() { PageType::IndexInterior => { let (len_payload, n_payload) = read_varint(&buf[cell_pointer + 4..]).unwrap(); - 4 + len_payload as usize + n_payload + 4 + let (overflows, to_read) = + payload_overflows(len_payload as usize, max_local, min_local, usable_size); + if overflows { + 4 + to_read + n_payload + 4 + } else { + 4 + len_payload as usize + n_payload + 4 + } } PageType::TableInterior => { let (_, n_rowid) = read_varint(&buf[cell_pointer + 4..]).unwrap(); @@ -409,13 +436,24 @@ impl PageContent { } PageType::IndexLeaf => { let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap(); - len_payload as usize + n_payload + 4 + let (overflows, to_read) = + payload_overflows(len_payload as usize, max_local, min_local, usable_size); + if overflows { + to_read as usize + n_payload + 4 + } else { + len_payload as usize + n_payload + 4 + } } PageType::TableLeaf => { let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap(); let (_, n_rowid) = read_varint(&buf[cell_pointer + n_payload..]).unwrap(); - // TODO: add overflow 4 bytes - len_payload as usize + n_payload + n_rowid + let (overflows, to_read) = + payload_overflows(len_payload as usize, max_local, min_local, usable_size); + if overflows { + to_read + n_payload + n_rowid + } else { + len_payload as usize + n_payload + n_rowid + } } }; (start, len) @@ -548,7 +586,15 @@ pub struct IndexLeafCell { pub first_overflow_page: Option, } -pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result { +pub fn read_btree_cell( + page: &[u8], + page_type: &PageType, + pos: usize, + pager: Rc, + max_local: usize, + min_local: usize, + usable_size: usize, +) -> Result { match page_type { PageType::IndexInterior => { let mut pos = pos; @@ -557,7 +603,13 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result< pos += 4; let (payload_size, nr) = read_varint(&page[pos..])?; pos += nr; - let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize); + + let (overflows, to_read) = + payload_overflows(payload_size as usize, max_local, min_local, usable_size); + let to_read = if overflows { to_read } else { page.len() - pos }; + + let (payload, first_overflow_page) = + read_payload(&page[pos..pos + to_read], payload_size as usize, pager); Ok(BTreeCell::IndexInteriorCell(IndexInteriorCell { left_child_page, payload, @@ -579,7 +631,13 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result< let mut pos = pos; let (payload_size, nr) = read_varint(&page[pos..])?; pos += nr; - let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize); + + let (overflows, to_read) = + payload_overflows(payload_size as usize, max_local, min_local, usable_size); + let to_read = if overflows { to_read } else { page.len() - pos }; + + let (payload, first_overflow_page) = + read_payload(&page[pos..pos + to_read], payload_size as usize, pager); Ok(BTreeCell::IndexLeafCell(IndexLeafCell { payload, first_overflow_page, @@ -591,7 +649,13 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result< pos += nr; let (rowid, nr) = read_varint(&page[pos..])?; pos += nr; - let (payload, first_overflow_page) = read_payload(&page[pos..], payload_size as usize); + + let (overflows, to_read) = + payload_overflows(payload_size as usize, max_local, min_local, usable_size); + let to_read = if overflows { to_read } else { page.len() - pos }; + + let (payload, first_overflow_page) = + read_payload(&page[pos..pos + to_read], payload_size as usize, pager); Ok(BTreeCell::TableLeafCell(TableLeafCell { _rowid: rowid, _payload: payload, @@ -603,20 +667,47 @@ pub fn read_btree_cell(page: &[u8], page_type: &PageType, pos: usize) -> Result< /// read_payload takes in the unread bytearray with the payload size /// and returns the payload on the page, and optionally the first overflow page number. -fn read_payload(unread: &[u8], payload_size: usize) -> (Vec, Option) { - let page_len = unread.len(); - if payload_size <= page_len { +fn read_payload(unread: &[u8], payload_size: usize, pager: Rc) -> (Vec, Option) { + let cell_len = unread.len(); + if payload_size <= cell_len { // fit within 1 page (unread[..payload_size].to_vec(), None) } else { // overflow let first_overflow_page = u32::from_be_bytes([ - unread[page_len - 4], - unread[page_len - 3], - unread[page_len - 2], - unread[page_len - 1], + unread[cell_len - 4], + unread[cell_len - 3], + unread[cell_len - 2], + unread[cell_len - 1], ]); - (unread[..page_len - 4].to_vec(), Some(first_overflow_page)) + let usable_size = pager.usable_size(); + let mut next_overflow = first_overflow_page; + let mut payload = unread[..cell_len - 4].to_vec(); + let mut left_to_read = payload_size - (cell_len - 4); // minus four because last for bytes of a payload cell are the overflow pointer + while next_overflow != 0 { + assert!(left_to_read > 0); + let page; + loop { + let page_ref = pager.read_page(next_overflow as usize); + if let Ok(p) = page_ref { + page = p; + break; + } + } + let page = page.borrow(); + let contents = page.contents.write().unwrap(); + let contents = contents.as_ref().unwrap(); + + let to_read = left_to_read.min(usable_size - 4); + let buf = contents.as_ptr(); + payload.extend_from_slice(&buf[4..4 + to_read]); + + next_overflow = contents.read_u32(0); + left_to_read -= to_read; + } + assert_eq!(left_to_read, 0); + + (payload, Some(first_overflow_page)) } } @@ -761,7 +852,11 @@ pub fn read_value(buf: &[u8], serial_type: &SerialType) -> Result<(OwnedValue, u } SerialType::String(n) => { if buf.len() < n { - crate::bail_corrupt_error!("Invalid String value"); + crate::bail_corrupt_error!( + "Invalid String value, length {} < expected length {}", + buf.len(), + n + ); } let bytes = buf[0..n].to_vec(); let value = unsafe { String::from_utf8_unchecked(bytes) }; @@ -828,6 +923,15 @@ pub fn write_varint(buf: &mut [u8], value: u64) -> usize { n } +pub fn write_varint_to_vec(value: u64, payload: &mut Vec) { + let mut varint: Vec = Vec::new(); + varint.extend(std::iter::repeat(0).take(9)); + let n = write_varint(&mut varint.as_mut_slice()[0..9], value); + write_varint(&mut varint, value); + varint.truncate(n); + payload.extend_from_slice(&varint); +} + pub fn begin_read_wal_header(io: Rc) -> Result>> { let drop_fn = Rc::new(|_buf| {}); let buf = Rc::new(RefCell::new(Buffer::allocate(32, drop_fn))); @@ -890,6 +994,28 @@ fn finish_read_wal_frame_header( Ok(()) } +/* + Checks if payload will overflow a cell based on max local and + it will return the min size that will be stored in that case, + including overflow pointer +*/ +pub fn payload_overflows( + payload_size: usize, + max_local: usize, + min_local: usize, + usable_size: usize, +) -> (bool, usize) { + if payload_size <= max_local { + return (false, 0); + } + + let mut space_left = min_local + (payload_size - min_local) % (usable_size - 4); + if space_left > max_local { + space_left = min_local; + } + return (true, space_left + 4); +} + #[cfg(test)] mod tests { use super::*; diff --git a/test/src/lib.rs b/test/src/lib.rs index b74d59e97..6bc6ab18d 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -1,28 +1,48 @@ -#[cfg(test)] -mod tests { - use limbo_core::{Database, RowResult, Value}; - use rusqlite::Connection; - use std::env::current_dir; - use std::sync::Arc; - #[test] - fn test_sequential_write() -> anyhow::Result<()> { - env_logger::init(); - let path = "test.db"; +use limbo_core::Database; +use std::env; +use std::fs; +use std::path::PathBuf; +use std::sync::Arc; - let io: Arc = Arc::new(limbo_core::PlatformIO::new()?); - dbg!(path); - let mut path = current_dir()?; +struct TempDatabase { + pub path: PathBuf, + pub io: Arc, +} + +impl TempDatabase { + pub fn new(table_sql: &str) -> Self { + let mut path = env::current_dir().unwrap(); path.push("test.db"); { if path.exists() { - std::fs::remove_file(&path)?; + fs::remove_file(&path).unwrap(); } - let connection = Connection::open(&path)?; - connection.execute("CREATE TABLE test (x INTEGER PRIMARY KEY);", ())?; + let connection = rusqlite::Connection::open(&path).unwrap(); + connection.execute(table_sql, ()).unwrap(); } + let io: Arc = Arc::new(limbo_core::PlatformIO::new().unwrap()); - let db = Database::open_file(io.clone(), path.to_str().unwrap())?; + Self { path, io } + } + + pub fn connect_limbo(&self) -> limbo_core::Connection { + let db = Database::open_file(self.io.clone(), self.path.to_str().unwrap()).unwrap(); let conn = db.connect(); + conn + } +} + +#[cfg(test)] +mod tests { + use super::*; + use limbo_core::{RowResult, Value}; + + #[test] + fn test_sequential_write() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + + let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY);"); + let conn = tmp_db.connect_limbo(); let list_query = "SELECT * FROM test"; let max_iterations = 10000; @@ -36,7 +56,7 @@ mod tests { Ok(Some(ref mut rows)) => loop { match rows.next_row()? { RowResult::IO => { - io.run_once()?; + tmp_db.io.run_once()?; } RowResult::Done => break, _ => unreachable!(), @@ -63,7 +83,7 @@ mod tests { current_read_index += 1; } RowResult::IO => { - io.run_once()?; + tmp_db.io.run_once()?; } RowResult::Done => break, } @@ -77,4 +97,160 @@ mod tests { } Ok(()) } + + #[test] + fn test_simple_overflow_page() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);"); + let conn = tmp_db.connect_limbo(); + + let mut huge_text = String::new(); + for i in 0..8192 { + huge_text.push(('A' as u8 + (i % 24) as u8) as char); + } + + let list_query = "SELECT * FROM test LIMIT 1"; + let insert_query = format!("INSERT INTO test VALUES (1, '{}')", huge_text.as_str()); + + match conn.query(insert_query) { + Ok(Some(ref mut rows)) => loop { + match rows.next_row()? { + RowResult::IO => { + tmp_db.io.run_once()?; + } + RowResult::Done => break, + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + + // this flush helped to review hex of test.db + conn.cacheflush()?; + + match conn.query(list_query) { + Ok(Some(ref mut rows)) => loop { + match rows.next_row()? { + RowResult::Row(row) => { + let first_value = &row.values[0]; + let text = &row.values[1]; + let id = match first_value { + Value::Integer(i) => *i as i32, + Value::Float(f) => *f as i32, + _ => unreachable!(), + }; + let text = match text { + Value::Text(t) => *t, + _ => unreachable!(), + }; + assert_eq!(1, id); + compare_string(&huge_text, text); + } + RowResult::IO => { + tmp_db.io.run_once()?; + } + RowResult::Done => break, + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + } + conn.cacheflush()?; + Ok(()) + } + + #[test] + fn test_sequential_overflow_page() -> anyhow::Result<()> { + let _ = env_logger::try_init(); + let tmp_db = TempDatabase::new("CREATE TABLE test (x INTEGER PRIMARY KEY, t TEXT);"); + let conn = tmp_db.connect_limbo(); + let iterations = 10 as usize; + + let mut huge_texts = Vec::new(); + for i in 0..iterations { + let mut huge_text = String::new(); + for j in 0..8192 { + huge_text.push(('A' as u8 + i as u8) as char); + } + huge_texts.push(huge_text); + } + + for i in 0..iterations { + let huge_text = &huge_texts[i]; + let insert_query = format!("INSERT INTO test VALUES ({}, '{}')", i, huge_text.as_str()); + match conn.query(insert_query) { + Ok(Some(ref mut rows)) => loop { + match rows.next_row()? { + RowResult::IO => { + tmp_db.io.run_once()?; + } + RowResult::Done => break, + _ => unreachable!(), + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + }; + } + + let list_query = "SELECT * FROM test LIMIT 1"; + let mut current_index = 0; + match conn.query(list_query) { + Ok(Some(ref mut rows)) => loop { + match rows.next_row()? { + RowResult::Row(row) => { + let first_value = &row.values[0]; + let text = &row.values[1]; + let id = match first_value { + Value::Integer(i) => *i as i32, + Value::Float(f) => *f as i32, + _ => unreachable!(), + }; + let text = match text { + Value::Text(t) => *t, + _ => unreachable!(), + }; + let huge_text = &huge_texts[current_index]; + assert_eq!(current_index, id as usize); + compare_string(&huge_text, text); + current_index += 1; + } + RowResult::IO => { + tmp_db.io.run_once()?; + } + RowResult::Done => break, + } + }, + Ok(None) => {} + Err(err) => { + eprintln!("{}", err); + } + } + conn.cacheflush()?; + Ok(()) + } + + fn compare_string(a: &String, b: &String) { + assert_eq!(a.len(), b.len(), "Strings are not equal in size!"); + let a = a.as_bytes(); + let b = b.as_bytes(); + + let len = a.len(); + for i in 0..len { + if a[i] != b[i] { + println!( + "Bytes differ \n\t at index: dec -> {} hex -> {:#02x} \n\t values dec -> {}!={} hex -> {:#02x}!={:#02x}", + i, i, a[i], b[i], a[i], b[i] + ); + break; + } + } + } }