Merge pull request #229 from pereman2/ww

core: write path
This commit is contained in:
Pekka Enberg
2024-07-31 20:04:06 +03:00
committed by GitHub
25 changed files with 2138 additions and 225 deletions

View File

@@ -45,7 +45,7 @@ impl limbo_core::PageIO for PageIO {
&self,
_page_idx: usize,
_buffer: Rc<std::cell::RefCell<limbo_core::Buffer>>,
_c: Rc<limbo_core::WriteCompletion>,
_c: Rc<limbo_core::Completion>,
) -> Result<()> {
todo!()
}

View File

@@ -277,5 +277,7 @@ fn query(
eprintln!("{}", err);
}
}
// for now let's cache flush always
conn.cacheflush()?;
Ok(())
}

View File

@@ -1,11 +1,25 @@
use crate::pager::Pager;
use crate::sqlite3_ondisk::{BTreeCell, TableInteriorCell, TableLeafCell};
use crate::types::{Cursor, CursorResult, OwnedRecord};
use crate::pager::{Page, Pager};
use crate::sqlite3_ondisk::{
read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell,
TableLeafCell,
};
use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue};
use crate::Result;
use std::cell::{Ref, RefCell};
use std::mem;
use std::rc::Rc;
/*
These are offsets of fields in the header of a b-tree page.
*/
const BTREE_HEADER_OFFSET_TYPE: usize = 0; /* type of btree page -> u8 */
const BTREE_HEADER_OFFSET_FREEBLOCK: usize = 1; /* pointer to first freeblock -> u16 */
const BTREE_HEADER_OFFSET_CELL_COUNT: usize = 3; /* number of cells in the page -> u16 */
const BTREE_HEADER_OFFSET_CELL_CONTENT: usize = 5; /* pointer to first byte of cell content, allocated from the top -> u16 */
const BTREE_HEADER_OFFSET_FRAGMENTED: usize = 7; /* number of fragmented bytes -> u8 */
const BTREE_HEADER_OFFSET_RIGHTMOST: usize = 8; /* if interior node, the right-most pointer (stored separately from cells) -> u32 */
pub struct MemPage {
parent: Option<Rc<MemPage>>,
page_idx: usize,
@@ -38,10 +52,15 @@ pub struct BTreeCursor {
rowid: RefCell<Option<u64>>,
record: RefCell<Option<OwnedRecord>>,
null_flag: bool,
database_header: Rc<RefCell<DatabaseHeader>>,
}
impl BTreeCursor {
pub fn new(pager: Rc<Pager>, root_page: usize) -> Self {
pub fn new(
pager: Rc<Pager>,
root_page: usize,
database_header: Rc<RefCell<DatabaseHeader>>,
) -> Self {
Self {
pager,
root_page,
@@ -49,6 +68,7 @@ impl BTreeCursor {
rowid: RefCell::new(None),
record: RefCell::new(None),
null_flag: false,
database_header,
}
}
@@ -61,14 +81,15 @@ impl BTreeCursor {
};
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
let page = RefCell::borrow(&page);
if page.is_locked() {
return Ok(CursorResult::IO);
}
let page = page.contents.read().unwrap();
let page = page.as_ref().unwrap();
if mem_page.cell_idx() >= page.cells.len() {
if mem_page.cell_idx() >= page.cell_count() {
let parent = mem_page.parent.clone();
match page.header.right_most_pointer {
match page.rightmost_pointer() {
Some(right_most_pointer) => {
let mem_page = MemPage::new(parent.clone(), right_most_pointer as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
@@ -85,7 +106,7 @@ impl BTreeCursor {
},
}
}
let cell = &page.cells[mem_page.cell_idx()];
let cell = page.cell_get(mem_page.cell_idx())?;
match &cell {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page,
@@ -115,6 +136,630 @@ impl BTreeCursor {
}
}
}
fn move_to_root(&mut self) {
self.page
.replace(Some(Rc::new(MemPage::new(None, self.root_page, 0))));
}
pub fn move_to(&mut self, key: u64) -> Result<CursorResult<()>> {
self.move_to_root();
loop {
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
let page = RefCell::borrow(&page);
if page.is_locked() {
return Ok(CursorResult::IO);
}
let page = page.contents.read().unwrap();
let page = page.as_ref().unwrap();
if page.is_leaf() {
return Ok(CursorResult::Ok(()));
}
let mut found_cell = false;
for cell_idx in 0..page.cell_count() {
match &page.cell_get(cell_idx)? {
BTreeCell::TableInteriorCell(TableInteriorCell {
_left_child_page,
_rowid,
}) => {
if key < *_rowid {
mem_page.advance();
let mem_page =
MemPage::new(Some(mem_page.clone()), *_left_child_page as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
found_cell = true;
break;
}
}
BTreeCell::TableLeafCell(TableLeafCell {
_rowid: _,
_payload: _,
first_overflow_page: _,
}) => {
unreachable!(
"we don't iterate leaf cells while trying to move to a leaf cell"
);
}
BTreeCell::IndexInteriorCell(_) => {
unimplemented!();
}
BTreeCell::IndexLeafCell(_) => {
unimplemented!();
}
}
}
if !found_cell {
let parent = mem_page.clone();
match page.rightmost_pointer() {
Some(right_most_pointer) => {
let mem_page = MemPage::new(Some(parent), right_most_pointer as usize, 0);
self.page.replace(Some(Rc::new(mem_page)));
continue;
}
None => {
unreachable!("we shall not go back up! The only way is down the slope");
}
}
}
}
}
fn insert_to_page(
&mut self,
key: &OwnedValue,
_record: &OwnedRecord,
) -> Result<CursorResult<()>> {
let page_ref = self.get_page()?;
let int_key = match key {
OwnedValue::Integer(i) => *i as u64,
_ => unreachable!("btree tables are indexed by integers!"),
};
let cell_idx = {
let page = RefCell::borrow(&page_ref);
if page.is_locked() {
return Ok(CursorResult::IO);
}
page.set_dirty();
self.pager.add_dirty(page_ref.clone());
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
assert!(matches!(page.page_type(), PageType::TableLeaf));
// find cell
find_cell(page, int_key)
};
// TODO: if overwrite drop cell
// insert cell
let mut payload: Vec<u8> = Vec::new();
{
// Data len will be prepended later
// Key
let mut key_varint: Vec<u8> = Vec::new();
key_varint.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut key_varint.as_mut_slice()[0..9], int_key);
write_varint(&mut key_varint, int_key);
key_varint.truncate(n);
payload.extend_from_slice(&key_varint);
}
// Data payload
let payload_size_before_record = payload.len();
_record.serialize(&mut payload);
let header_size = payload.len() - payload_size_before_record;
{
// Data len
let mut data_len_varint: Vec<u8> = Vec::new();
data_len_varint.extend(std::iter::repeat(0).take(9));
let n = write_varint(
&mut data_len_varint.as_mut_slice()[0..9],
header_size as u64,
);
data_len_varint.truncate(n);
payload.splice(0..0, data_len_varint.iter().cloned());
}
let usable_space = {
let db_header = RefCell::borrow(&self.database_header);
(db_header.page_size - db_header.unused_space as u16) as usize
};
let free = {
let page = RefCell::borrow(&page_ref);
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
self.compute_free_space(page, RefCell::borrow(&self.database_header))
};
assert!(
payload.len() <= usable_space - 100, /* subtract 100 bytes as a precaution */
"need to implement overflow pages, payload too big to even add to an empty page"
);
if payload.len() + 2 > free as usize {
// overflow or balance
self.balance_leaf(int_key, payload);
} else {
// insert
let page = RefCell::borrow(&page_ref);
let mut page = page.contents.write().unwrap();
let page = page.as_mut().unwrap();
self.insert_into_cell(page, &payload, cell_idx);
}
Ok(CursorResult::Ok(()))
}
/* insert into position and shift other pointers */
fn insert_into_cell(&mut self, page: &mut PageContent, payload: &Vec<u8>, cell_idx: usize) {
// TODO: insert into cell payload in internal page
let pc = self.allocate_cell_space(page, payload.len() as u16);
let mut buf_ref = RefCell::borrow_mut(&page.buffer);
let buf: &mut [u8] = buf_ref.as_mut_slice();
// copy data
buf[pc as usize..pc as usize + payload.len()].copy_from_slice(&payload);
// memmove(pIns+2, pIns, 2*(pPage->nCell - i));
let (pointer_area_pc_by_idx, _) = page.cell_get_raw_pointer_region();
let pointer_area_pc_by_idx = pointer_area_pc_by_idx + (2 * cell_idx);
// move previous pointers forward and insert new pointer there
let n_cells_forward = 2 * (page.cell_count() - cell_idx);
if n_cells_forward > 0 {
buf.copy_within(
pointer_area_pc_by_idx..pointer_area_pc_by_idx + n_cells_forward,
pointer_area_pc_by_idx + 2,
);
}
page.write_u16(pointer_area_pc_by_idx, pc);
// update first byte of content area
page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, pc);
// update cell count
let new_n_cells = (page.cell_count() + 1) as u16;
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, new_n_cells);
}
fn get_page(&mut self) -> crate::Result<Rc<RefCell<Page>>> {
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_idx = mem_page.page_idx;
let page_ref = self.pager.read_page(page_idx)?;
Ok(page_ref)
}
fn balance_leaf(&mut self, key: u64, payload: Vec<u8>) {
// This is a naive algorithm that doesn't try to distribute cells evenly by content.
// It will try to split the page in half by keys, not by content.
// SQLite tries to keep a page at least 40% full.
let mut key = key;
let mut payload = payload;
loop {
let mem_page = {
let mem_page = self.page.borrow();
let mem_page = mem_page.as_ref().unwrap();
mem_page.clone()
};
let page_ref = self.read_page_sync(mem_page.page_idx);
let mut page_rc = RefCell::borrow_mut(&page_ref);
let right_page_id = {
// split procedure
let mut page = page_rc.contents.write().unwrap();
let page = page.as_mut().unwrap();
let free = self.compute_free_space(page, RefCell::borrow(&self.database_header));
assert!(
matches!(
page.page_type(),
PageType::TableLeaf | PageType::TableInterior
),
"indexes still not supported "
);
if payload.len() + 2 <= free as usize {
let cell_idx = find_cell(page, key);
self.insert_into_cell(page, &payload, cell_idx);
break;
}
let right_page_ref = self.allocate_page(page.page_type());
let right_page = RefCell::borrow_mut(&right_page_ref);
let right_page_id = right_page.id;
let mut right_page = right_page.contents.write().unwrap();
let right_page = right_page.as_mut().unwrap();
{
// move data from one buffer to another
// done in a separate block to satisfy borrow checker
let mut left_buf = RefCell::borrow_mut(&page.buffer);
let left_buf: &mut [u8] = left_buf.as_mut_slice();
let mut right_buf = RefCell::borrow_mut(&right_page.buffer);
let right_buf: &mut [u8] = right_buf.as_mut_slice();
let mut rbrk = right_page.cell_content_area() as usize;
let cells_to_move = page.cell_count() / 2;
let (mut cell_pointer_idx, _) = page.cell_get_raw_pointer_region();
// move half of cells to right page
for cell_idx in cells_to_move..page.cell_count() {
let (start, len) = page.cell_get_raw_region_borrowed(cell_idx, left_buf);
// copy data
rbrk -= len;
right_buf[rbrk..rbrk + len].copy_from_slice(&left_buf[start..start + len]);
// set pointer
right_page.write_u16(cell_pointer_idx, rbrk as u16);
cell_pointer_idx += 2;
}
// update cell count in both pages
let keys_moved = page.cell_count() - cells_to_move;
page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, cells_to_move as u16);
right_page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, keys_moved as u16);
// update cell content area start
right_page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, rbrk as u16);
}
let last_cell = page.cell_get(page.cell_count() - 1).unwrap();
let last_cell_key = match &last_cell {
BTreeCell::TableLeafCell(cell) => cell._rowid,
BTreeCell::TableInteriorCell(cell) => cell._rowid,
_ => unreachable!(), /* index tables are not yet supported */
};
// if not leaf page update rightmost pointer
if let PageType::TableInterior = page.page_type() {
right_page.write_u32(
BTREE_HEADER_OFFSET_RIGHTMOST,
page.rightmost_pointer().unwrap(),
);
// convert last cell to rightmost pointer
let BTreeCell::TableInteriorCell(last_cell) = &last_cell else {
unreachable!();
};
page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, last_cell._left_child_page);
// page now has one less cell because we've moved the last one into the rightmost pointer
page.write_u16(
BTREE_HEADER_OFFSET_CELL_COUNT,
(page.cell_count() - 1) as u16,
);
}
// update free list block by defragmenting page
self.defragment_page(page, RefCell::borrow(&self.database_header));
// insert into one of the pages
if key < last_cell_key {
let cell_idx = find_cell(page, key);
self.insert_into_cell(page, &payload, cell_idx);
} else {
let cell_idx = find_cell(right_page, key);
self.insert_into_cell(right_page, &payload, cell_idx);
}
// propagate parent split
key = last_cell_key;
right_page_id
};
payload = Vec::new();
if mem_page.page_idx == self.root_page {
/* todo: balance deeper, create child and copy contents of root there. Then split root */
/* if we are in root page then we just need to create a new root and push key there */
let new_root_page_ref = self.allocate_page(PageType::TableInterior);
{
let new_root_page = RefCell::borrow_mut(&new_root_page_ref);
let new_root_page_id = new_root_page.id;
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
let new_root_page_contents = new_root_page_contents.as_mut().unwrap();
/*
Note that we set the cell pointer to point to itself, because we will later swap this page's
content with the split page so that the root page index doesn't have to be updated.
*/
let new_root_page_id = new_root_page_id as u32;
payload.extend_from_slice(&(new_root_page_id as u32).to_be_bytes());
payload.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut payload.as_mut_slice()[4..], key as u64);
payload.truncate(4 + n);
// write left child cell
self.insert_into_cell(new_root_page_contents, &payload, 0);
// write right child cell
new_root_page_contents
.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, right_page_id as u32);
}
/* swap split page buffer with new root buffer so we don't have to update page idx */
{
let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref);
mem::swap(&mut *new_root_page, &mut *page_rc);
// now swap contents
let mut new_root_page_contents = new_root_page.contents.write().unwrap();
let mut page_contents = page_rc.contents.write().unwrap();
std::mem::swap(&mut *new_root_page_contents, &mut *page_contents);
self.page =
RefCell::new(Some(Rc::new(MemPage::new(None, new_root_page.id, 0))));
}
break;
}
// Propagate the split upwards to the parent.
payload.extend_from_slice(&(mem_page.page_idx as u32).to_be_bytes());
payload.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut payload.as_mut_slice()[4..], key as u64);
payload.truncate(n);
self.page = RefCell::new(Some(mem_page.parent.as_ref().unwrap().clone()));
}
}
fn read_page_sync(&mut self, page_idx: usize) -> Rc<RefCell<Page>> {
loop {
let page_ref = self.pager.read_page(page_idx);
match page_ref {
Ok(p) => return p,
Err(_) => {}
}
}
}
fn allocate_page(&mut self, page_type: PageType) -> Rc<RefCell<Page>> {
let page = self.pager.allocate_page().unwrap();
{
// setup btree page
let contents = RefCell::borrow(&page);
let mut contents = contents.contents.write().unwrap();
let contents = contents.as_mut().unwrap();
let id = page_type as u8;
contents.write_u8(BTREE_HEADER_OFFSET_TYPE, id);
contents.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0);
contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0);
let db_header = RefCell::borrow(&self.database_header);
let cell_content_area_start = db_header.page_size - db_header.unused_space as u16;
contents.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cell_content_area_start);
contents.write_u8(BTREE_HEADER_OFFSET_FRAGMENTED, 0);
contents.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0);
}
page
}
/*
Allocate space for a cell on a page.
*/
fn allocate_cell_space(&mut self, page_ref: &PageContent, amount: u16) -> u16 {
let amount = amount as usize;
let (cell_offset, _) = page_ref.cell_get_raw_pointer_region();
let gap = cell_offset + 2 * page_ref.cell_count();
let mut top = page_ref.cell_content_area() as usize;
// there are free blocks and enough space
if page_ref.first_freeblock() != 0 && gap + 2 <= top {
// find slot
let db_header = RefCell::borrow(&self.database_header);
let pc = find_free_cell(page_ref, db_header, amount);
if pc != 0 {
return pc as u16;
}
/* fall through, we might need to defragment */
}
if gap + 2 + amount as usize > top {
// defragment
self.defragment_page(page_ref, RefCell::borrow(&self.database_header));
let mut buf_ref = RefCell::borrow_mut(&page_ref.buffer);
let buf = buf_ref.as_mut_slice();
top = u16::from_be_bytes([buf[5], buf[6]]) as usize;
}
let db_header = RefCell::borrow(&self.database_header);
top -= amount;
{
let mut buf_ref = RefCell::borrow_mut(&page_ref.buffer);
let buf = buf_ref.as_mut_slice();
buf[5..7].copy_from_slice(&(top as u16).to_be_bytes());
}
let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize;
assert!(top + amount <= usable_space);
return top as u16;
}
fn defragment_page(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) {
let cloned_page = page.clone();
let usable_space = (db_header.page_size - db_header.unused_space as u16) as u64;
let mut cbrk = usable_space as u64;
// TODO: implement fast algorithm
let last_cell = (usable_space - 4) as u64;
let first_cell = {
let (start, end) = cloned_page.cell_get_raw_pointer_region();
start + end
};
if cloned_page.cell_count() > 0 {
let page_type = page.page_type();
let buf = RefCell::borrow(&cloned_page.buffer);
let buf = buf.as_slice();
let mut write_buf = RefCell::borrow_mut(&page.buffer);
let write_buf = write_buf.as_mut_slice();
for i in 0..cloned_page.cell_count() {
let cell_offset = page.offset + 8;
let cell_idx = cell_offset + i * 2;
let pc = u16::from_be_bytes([buf[cell_idx], buf[cell_idx + 1]]) as u64;
if pc > last_cell {
unimplemented!("corrupted page");
}
assert!(pc <= last_cell);
let size = match page_type {
PageType::TableInterior => {
let (_, nr_key) = match read_varint(&buf[pc as usize ..]) {
Ok(v) => v,
Err(_) => todo!(
"error while parsing varint from cell, probably treat this as corruption?"
),
};
4 + nr_key as u64
}
PageType::TableLeaf => {
let (payload_size, nr_payload) = match read_varint(&buf[pc as usize..]) {
Ok(v) => v,
Err(_) => todo!(
"error while parsing varint from cell, probably treat this as corruption?"
),
};
let (_, nr_key) = match read_varint(&buf[pc as usize + nr_payload as usize..]) {
Ok(v) => v,
Err(_) => todo!(
"error while parsing varint from cell, probably treat this as corruption?"
),
};
// TODO: add overflow page calculation
payload_size + nr_payload as u64 + nr_key as u64
}
PageType::IndexInterior => todo!(),
PageType::IndexLeaf => todo!(),
};
cbrk -= size;
if cbrk < first_cell as u64 || pc as u64 + size > usable_space as u64 {
todo!("corrupt");
}
assert!(cbrk + size <= usable_space && cbrk >= first_cell as u64);
// set new pointer
write_buf[cell_idx..cell_idx + 2].copy_from_slice(&(cbrk as u16).to_be_bytes());
// copy payload
write_buf[cbrk as usize..cbrk as usize + size as usize]
.copy_from_slice(&buf[pc as usize..pc as usize + size as usize]);
}
}
// assert!( nfree >= 0 );
// if( data[hdr+7]+cbrk-iCellFirst!=pPage->nFree ){
// return SQLITE_CORRUPT_PAGE(pPage);
// }
assert!(cbrk >= first_cell as u64);
let mut write_buf = RefCell::borrow_mut(&page.buffer);
let write_buf = write_buf.as_mut_slice();
// set new first byte of cell content
write_buf[5..7].copy_from_slice(&(cbrk as u16).to_be_bytes());
// set freeblock pointer to 0; unused space can be recovered from the gap between the end of the cell pointer array and the start of cell content
write_buf[1] = 0;
write_buf[2] = 0;
// set unused space to 0
let first_cell = cloned_page.cell_content_area() as u64;
assert!(first_cell <= cbrk);
write_buf[first_cell as usize..cbrk as usize].fill(0);
}
// Free blocks can be zero, meaning the "real free space" that can be used for allocation is expected to lie between
// the end of the cell pointer area and the first byte of cell content.
fn compute_free_space(&self, page: &PageContent, db_header: Ref<DatabaseHeader>) -> u16 {
let buffer = RefCell::borrow(&page.buffer);
let buf = buffer.as_slice();
let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize;
let mut first_byte_in_cell_content = page.cell_content_area();
if first_byte_in_cell_content == 0 {
first_byte_in_cell_content = u16::MAX;
}
let fragmented_free_bytes = page.num_frag_free_bytes();
let free_block_pointer = page.first_freeblock();
let ncell = page.cell_count();
// 8 + 4 == header end
let first_cell = (page.offset + 8 + 4 + (2 * ncell)) as u16;
let mut nfree = fragmented_free_bytes as usize + first_byte_in_cell_content as usize;
let mut pc = free_block_pointer as usize;
if pc > 0 {
let mut next = 0;
let mut size = 0;
if pc < first_byte_in_cell_content as usize {
// corrupt
todo!("corrupted page");
}
loop {
// TODO: check corruption icellast
next = u16::from_be_bytes(buf[pc..pc + 2].try_into().unwrap()) as usize;
size = u16::from_be_bytes(buf[pc + 2..pc + 4].try_into().unwrap()) as usize;
nfree += size as usize;
if next <= pc + size + 3 {
break;
}
pc = next as usize;
}
if next > 0 {
todo!("corrupted page ascending order");
}
if pc + size > usable_space {
todo!("corrupted page last freeblock extends last page end");
}
}
// if( nFree>usableSize || nFree<iCellFirst ){
// return SQLITE_CORRUPT_PAGE(pPage);
// }
// don't count header and cell pointers?
nfree = nfree - first_cell as usize;
return nfree as u16;
}
}
fn find_free_cell(page_ref: &PageContent, db_header: Ref<DatabaseHeader>, amount: usize) -> usize {
// NOTE: the freelist is in ascending order of keys and pc
// unused_space is the reserved bytes at the end of the page, therefore we must subtract it from maxpc
let mut pc = page_ref.first_freeblock() as usize;
let buf_ref = RefCell::borrow(&page_ref.buffer);
let buf = buf_ref.as_slice();
let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize;
let maxpc = (usable_space - amount as usize) as usize;
let mut found = false;
while pc <= maxpc {
let next = u16::from_be_bytes(buf[pc..pc + 2].try_into().unwrap());
let size = u16::from_be_bytes(buf[pc + 2..pc + 4].try_into().unwrap());
if amount <= size as usize {
found = true;
break;
}
pc = next as usize;
}
if !found {
0
} else {
pc
}
}
impl Cursor for BTreeCursor {
@@ -159,8 +804,27 @@ impl Cursor for BTreeCursor {
Ok(self.record.borrow())
}
fn insert(&mut self, _record: &OwnedRecord) -> Result<()> {
unimplemented!()
fn insert(
&mut self,
key: &OwnedValue,
_record: &OwnedRecord,
moved_before: bool, /* Indicates whether the cursor was already moved to the insert position, so we can skip the traversal to the leaf page */
) -> Result<CursorResult<()>> {
let int_key = match key {
OwnedValue::Integer(i) => i,
_ => unreachable!("btree tables are indexed by integers!"),
};
if !moved_before {
match self.move_to(*int_key as u64)? {
CursorResult::Ok(_) => {}
CursorResult::IO => return Ok(CursorResult::IO),
};
}
match self.insert_to_page(key, _record)? {
CursorResult::Ok(_) => Ok(CursorResult::Ok(())),
CursorResult::IO => Ok(CursorResult::IO),
}
}
fn set_null_flag(&mut self, flag: bool) {
@@ -170,4 +834,61 @@ impl Cursor for BTreeCursor {
fn get_null_flag(&self) -> bool {
self.null_flag
}
fn exists(&mut self, key: &OwnedValue) -> Result<CursorResult<bool>> {
let int_key = match key {
OwnedValue::Integer(i) => i,
_ => unreachable!("btree tables are indexed by integers!"),
};
match self.move_to(*int_key as u64)? {
CursorResult::Ok(_) => {}
CursorResult::IO => return Ok(CursorResult::IO),
};
let page_ref = self.get_page()?;
let page = RefCell::borrow(&page_ref);
if page.is_locked() {
return Ok(CursorResult::IO);
}
let page = page.contents.read().unwrap();
let page = page.as_ref().unwrap();
// find cell
let int_key = match key {
OwnedValue::Integer(i) => *i as u64,
_ => unreachable!("btree tables are indexed by integers!"),
};
let cell_idx = find_cell(page, int_key);
if cell_idx >= page.cell_count() {
Ok(CursorResult::Ok(false))
} else {
let equals = match &page.cell_get(cell_idx)? {
BTreeCell::TableLeafCell(l) => l._rowid == int_key,
_ => unreachable!(),
};
Ok(CursorResult::Ok(equals))
}
}
}
fn find_cell(page: &PageContent, int_key: u64) -> usize {
let mut cell_idx = 0;
let cell_count = page.cell_count();
while cell_idx < cell_count {
match page.cell_get(cell_idx).unwrap() {
BTreeCell::TableLeafCell(cell) => {
if int_key <= cell._rowid {
break;
}
}
BTreeCell::TableInteriorCell(cell) => {
if int_key <= cell._rowid {
break;
}
}
_ => todo!(),
}
cell_idx += 1;
}
cell_idx
}
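
For reference, the bytes assembled in insert_to_page above follow SQLite's table leaf cell layout: a varint with the size of the serialized record, a varint with the rowid, then the record itself (the variable named header_size actually holds the record length). A minimal sketch of that layout, assuming the write_varint helper added in sqlite3_ondisk.rs below; build_table_leaf_cell is a hypothetical illustration, not part of this diff:

fn build_table_leaf_cell(rowid: u64, record_bytes: &[u8]) -> Vec<u8> {
    let mut cell = Vec::new();
    // payload-size varint: number of bytes in the serialized record
    let mut size_buf = [0u8; 9];
    let n_size = write_varint(&mut size_buf, record_bytes.len() as u64);
    cell.extend_from_slice(&size_buf[..n_size]);
    // rowid varint
    let mut rowid_buf = [0u8; 9];
    let n_rowid = write_varint(&mut rowid_buf, rowid);
    cell.extend_from_slice(&rowid_buf[..n_rowid]);
    // the record body, as produced by OwnedRecord::serialize
    cell.extend_from_slice(record_bytes);
    cell
}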

View File

@@ -2,7 +2,7 @@ use crate::error::LimboError;
use crate::io::common;
use crate::Result;
use super::{Completion, File, WriteCompletion, IO};
use super::{Completion, File, IO};
use libc::{c_short, fcntl, flock, F_SETLK};
use log::trace;
use polling::{Event, Events, Poller};
@@ -67,7 +67,12 @@ impl IO for DarwinIO {
match cf {
CompletionCallback::Read(ref file, ref c, pos) => {
let mut file = file.borrow_mut();
let mut buf = c.buf_mut();
let c: &Completion = &c;
let r = match c {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
};
let mut buf = r.buf_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
file.read(buf.as_mut_slice())
}
@@ -81,12 +86,12 @@ impl IO for DarwinIO {
};
match result {
std::result::Result::Ok(n) => {
match cf {
match &cf {
CompletionCallback::Read(_, ref c, _) => {
c.complete();
c.complete(0);
}
CompletionCallback::Write(_, ref c, _, _) => {
c.complete(n);
c.complete(n as i32);
}
}
return Ok(());
@@ -105,7 +110,7 @@ enum CompletionCallback {
Read(Rc<RefCell<std::fs::File>>, Rc<Completion>, usize),
Write(
Rc<RefCell<std::fs::File>>,
Rc<WriteCompletion>,
Rc<Completion>,
Rc<RefCell<crate::Buffer>>,
usize,
),
@@ -142,7 +147,10 @@ impl File for DarwinFile {
"Failed locking file. File is locked by another process"
)));
} else {
return Err(LimboError::LockingError(format!("Failed locking file, {}", err)));
return Err(LimboError::LockingError(format!(
"Failed locking file, {}",
err
)));
}
}
Ok(())
@@ -168,17 +176,21 @@ impl File for DarwinFile {
Ok(())
}
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
let file = self.file.borrow();
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
let file = self.file.borrow();
let result = {
let mut buf = c.buf_mut();
let r = match &(*c) {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
};
let mut buf = r.buf_mut();
rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64)
};
match result {
std::result::Result::Ok(n) => {
trace!("pread n: {}", n);
// Read succeeded immediately
c.complete();
c.complete(0);
Ok(())
}
Err(Errno::AGAIN) => {
@@ -204,7 +216,7 @@ impl File for DarwinFile {
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<WriteCompletion>,
c: Rc<Completion>,
) -> Result<()> {
let file = self.file.borrow();
let result = {
@@ -215,7 +227,7 @@ impl File for DarwinFile {
std::result::Result::Ok(n) => {
trace!("pwrite n: {}", n);
// Write succeeded immediately
c.complete(n);
c.complete(n as i32);
Ok(())
}
Err(Errno::AGAIN) => {

View File

@@ -1,4 +1,4 @@
use crate::{Completion, File, Result, WriteCompletion, IO};
use crate::{Completion, File, Result, IO};
use log::trace;
use std::cell::RefCell;
use std::io::{Read, Seek, Write};
@@ -45,11 +45,15 @@ impl File for GenericFile {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
{
let mut buf = c.buf_mut();
let r = match &(*c) {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
};
let mut buf = r.buf_mut();
let buf = buf.as_mut_slice();
file.read_exact(buf)?;
}
c.complete();
c.complete(0);
Ok(())
}
@@ -57,7 +61,7 @@ impl File for GenericFile {
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<WriteCompletion>,
c: Rc<Completion>,
) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
@@ -72,4 +76,4 @@ impl Drop for GenericFile {
fn drop(&mut self) {
self.unlock_file().expect("Failed to unlock file");
}
}
}

View File

@@ -1,5 +1,5 @@
use super::{common, Completion, File, WriteCompletion, IO};
use crate::{Result, LimboError};
use super::{common, Completion, File, IO};
use crate::{LimboError, Result};
use libc::{c_short, fcntl, flock, iovec, F_SETLK};
use log::{debug, trace};
use nix::fcntl::{FcntlArg, OFlag};
@@ -95,10 +95,13 @@ impl IO for LinuxIO {
while let Some(cqe) = ring.completion().next() {
let result = cqe.result();
if result < 0 {
return Err(LimboError::LinuxIOError(format!("{}", LinuxIOError::IOUringCQError(result))));
return Err(LimboError::LinuxIOError(format!(
"{}",
LinuxIOError::IOUringCQError(result)
)));
}
let c = unsafe { Rc::from_raw(cqe.user_data() as *const Completion) };
c.complete();
c.complete(cqe.result());
}
Ok(())
}
@@ -130,7 +133,9 @@ impl File for LinuxFile {
if lock_result == -1 {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::WouldBlock {
return Err(LimboError::LockingError("File is locked by another process".into()));
return Err(LimboError::LockingError(
"File is locked by another process".into(),
));
} else {
return Err(LimboError::IOError(err));
}
@@ -159,11 +164,15 @@ impl File for LinuxFile {
}
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
trace!("pread(pos = {}, length = {})", pos, c.buf().len());
let r = match &(*c) {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
};
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
let fd = io_uring::types::Fd(self.file.as_raw_fd());
let mut io = self.io.borrow_mut();
let read_e = {
let mut buf = c.buf_mut();
let mut buf = r.buf_mut();
let len = buf.len();
let buf = buf.as_mut_ptr();
let ptr = Rc::into_raw(c.clone());
@@ -186,7 +195,7 @@ impl File for LinuxFile {
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
c: Rc<WriteCompletion>,
c: Rc<Completion>,
) -> Result<()> {
let mut io = self.io.borrow_mut();
let fd = io_uring::types::Fd(self.file.as_raw_fd());

View File

@@ -1,7 +1,9 @@
use crate::Result;
use cfg_block::cfg_block;
use std::fmt;
use std::{
cell::{Ref, RefCell, RefMut},
fmt::Debug,
mem::ManuallyDrop,
pin::Pin,
rc::Rc,
@@ -11,8 +13,7 @@ pub trait File {
fn lock_file(&self, exclusive: bool) -> Result<()>;
fn unlock_file(&self) -> Result<()>;
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()>;
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<WriteCompletion>)
-> Result<()>;
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()>;
}
pub trait IO {
@@ -21,21 +22,34 @@ pub trait IO {
fn run_once(&self) -> Result<()>;
}
pub type Complete = dyn Fn(&Buffer);
pub type WriteComplete = dyn Fn(usize);
pub type Complete = dyn Fn(Rc<RefCell<Buffer>>);
pub type WriteComplete = dyn Fn(i32);
pub struct Completion {
pub buf: RefCell<Buffer>,
pub enum Completion {
Read(ReadCompletion),
Write(WriteCompletion),
}
pub struct ReadCompletion {
pub buf: Rc<RefCell<Buffer>>,
pub complete: Box<Complete>,
}
impl Completion {
pub fn complete(&self, result: i32) {
match self {
Completion::Read(r) => r.complete(),
Completion::Write(w) => w.complete(result), // fix
}
}
}
pub struct WriteCompletion {
pub complete: Box<WriteComplete>,
}
impl Completion {
pub fn new(buf: Buffer, complete: Box<Complete>) -> Self {
let buf = RefCell::new(buf);
impl ReadCompletion {
pub fn new(buf: Rc<RefCell<Buffer>>, complete: Box<Complete>) -> Self {
Self { buf, complete }
}
@@ -48,8 +62,7 @@ impl Completion {
}
pub fn complete(&self) {
let buf = self.buf.borrow_mut();
(self.complete)(&buf);
(self.complete)(self.buf.clone());
}
}
@@ -57,7 +70,7 @@ impl WriteCompletion {
pub fn new(complete: Box<WriteComplete>) -> Self {
Self { complete }
}
pub fn complete(&self, bytes_written: usize) {
pub fn complete(&self, bytes_written: i32) {
(self.complete)(bytes_written);
}
}
@@ -72,6 +85,12 @@ pub struct Buffer {
drop: BufferDropFn,
}
impl Debug for Buffer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.data)
}
}
impl Drop for Buffer {
fn drop(&mut self) {
let data = unsafe { ManuallyDrop::take(&mut self.data) };
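
The Completion refactor above folds the old read and write completion types into one enum, so pread and pwrite can share a single Rc<Completion> handle. A minimal sketch of constructing both variants, assuming the types and signatures shown in this diff (Buffer::allocate, ReadCompletion::new, WriteCompletion::new, Completion::complete):

use std::cell::RefCell;
use std::rc::Rc;

fn completion_examples() {
    // Read side: owns the destination buffer and a callback that receives it back.
    let drop_fn = Rc::new(|_buf| {});
    let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
    let read_done = Box::new(move |buf: Rc<RefCell<Buffer>>| {
        println!("read {} bytes", buf.borrow().len());
    });
    let read_c = Rc::new(Completion::Read(ReadCompletion::new(buf, read_done)));

    // Write side: only needs the number of bytes written.
    let write_done = Box::new(move |bytes_written: i32| {
        println!("wrote {} bytes", bytes_written);
    });
    let write_c = Rc::new(Completion::Write(WriteCompletion::new(write_done)));

    // IO backends call complete(result): the read variant ignores the result and
    // hands its buffer to the callback, the write variant forwards the byte count.
    read_c.complete(0);
    write_c.complete(512);
}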

View File

@@ -43,11 +43,15 @@ impl File for WindowsFile {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
{
let mut buf = c.buf_mut();
let r = match &(*c) {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
};
let mut buf = r.buf_mut();
let buf = buf.as_mut_slice();
file.read_exact(buf)?;
}
c.complete();
c.complete(0);
Ok(())
}
@@ -55,7 +59,7 @@ impl File for WindowsFile {
&self,
pos: usize,
buffer: Rc<RefCell<crate::Buffer>>,
_c: Rc<WriteCompletion>,
c: Rc<Completion>,
) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;

View File

@@ -201,6 +201,11 @@ impl Connection {
}
Ok(())
}
pub fn cacheflush(&self) -> Result<()> {
self.pager.cacheflush()?;
Ok(())
}
}
pub struct Statement {

View File

@@ -1,18 +1,22 @@
use crate::buffer_pool::BufferPool;
use crate::sqlite3_ondisk::BTreePage;
use crate::sqlite3_ondisk::PageContent;
use crate::sqlite3_ondisk::{self, DatabaseHeader};
use crate::{PageSource, Result};
use crate::{Buffer, PageSource, Result};
use log::trace;
use sieve_cache::SieveCache;
use std::borrow::Borrow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::Hash;
use std::ptr::{drop_in_place, NonNull};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
pub struct Page {
flags: AtomicUsize,
pub contents: RwLock<Option<BTreePage>>,
pub flags: AtomicUsize,
pub contents: RwLock<Option<PageContent>>,
pub id: usize,
}
/// Page is up-to-date.
@@ -21,18 +25,21 @@ const PAGE_UPTODATE: usize = 0b001;
const PAGE_LOCKED: usize = 0b010;
/// Page had an I/O error.
const PAGE_ERROR: usize = 0b100;
/// Page is dirty. Flush needed.
const PAGE_DIRTY: usize = 0b1000;
impl Default for Page {
fn default() -> Self {
Self::new()
Self::new(0)
}
}
impl Page {
pub fn new() -> Page {
pub fn new(id: usize) -> Page {
Page {
flags: AtomicUsize::new(0),
contents: RwLock::new(None),
id,
}
}
@@ -71,6 +78,162 @@ impl Page {
pub fn clear_error(&self) {
self.flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst);
}
pub fn is_dirty(&self) -> bool {
self.flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0
}
pub fn set_dirty(&self) {
self.flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst);
}
pub fn clear_dirty(&self) {
self.flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst);
}
}
struct PageCacheEntry {
key: usize,
page: Rc<RefCell<Page>>,
prev: Option<NonNull<PageCacheEntry>>,
next: Option<NonNull<PageCacheEntry>>,
}
impl PageCacheEntry {
fn into_non_null(&mut self) -> NonNull<PageCacheEntry> {
NonNull::new(&mut *self).unwrap()
}
}
struct DumbLruPageCache {
capacity: usize,
map: RefCell<HashMap<usize, NonNull<PageCacheEntry>>>,
head: RefCell<Option<NonNull<PageCacheEntry>>>,
tail: RefCell<Option<NonNull<PageCacheEntry>>>,
}
impl DumbLruPageCache {
pub fn new(capacity: usize) -> Self {
Self {
capacity: capacity,
map: RefCell::new(HashMap::new()),
head: RefCell::new(None),
tail: RefCell::new(None),
}
}
pub fn insert(&mut self, key: usize, value: Rc<RefCell<Page>>) {
self.delete(key);
let mut entry = Box::new(PageCacheEntry {
key: key,
next: None,
prev: None,
page: value,
});
self.touch(&mut entry);
if self.map.borrow().len() >= self.capacity {
self.pop_if_not_dirty();
}
let b = Box::into_raw(entry);
let as_non_null = NonNull::new(b).unwrap();
self.map.borrow_mut().insert(key, as_non_null);
}
pub fn delete(&mut self, key: usize) {
let ptr = self.map.borrow_mut().remove(&key);
if ptr.is_none() {
return;
}
let mut ptr = ptr.unwrap();
{
let ptr = unsafe { ptr.as_mut() };
self.detach(ptr);
}
unsafe { drop_in_place(ptr.as_ptr()) };
}
fn get_ptr(&mut self, key: usize) -> Option<NonNull<PageCacheEntry>> {
let m = self.map.borrow_mut();
let ptr = m.get(&key);
match ptr {
Some(v) => Some(*v),
None => None,
}
}
pub fn get(&mut self, key: &usize) -> Option<Rc<RefCell<Page>>> {
let ptr = self.get_ptr(*key);
if ptr.is_none() {
return None;
}
let ptr = unsafe { ptr.unwrap().as_mut() };
let page = ptr.page.clone();
self.detach(ptr);
self.touch(ptr);
return Some(page);
}
pub fn resize(&mut self, capacity: usize) {
let _ = capacity;
todo!();
}
fn detach(&mut self, entry: &mut PageCacheEntry) {
let mut current = entry.into_non_null();
let (next, prev) = unsafe {
let c = current.as_mut();
let next = c.next;
let prev = c.prev;
c.prev = None;
c.next = None;
(next, prev)
};
// detach
match (prev, next) {
(None, None) => {}
(None, Some(_)) => todo!(),
(Some(p), None) => {
self.tail = RefCell::new(Some(p));
}
(Some(mut p), Some(mut n)) => unsafe {
let p_mut = p.as_mut();
p_mut.next = Some(n);
let n_mut = n.as_mut();
n_mut.prev = Some(p);
},
};
}
fn touch(&mut self, entry: &mut PageCacheEntry) {
let mut current = entry.into_non_null();
unsafe {
let c = current.as_mut();
c.next = *self.head.borrow();
}
if let Some(mut head) = *self.head.borrow_mut() {
unsafe {
let head = head.as_mut();
head.prev = Some(current);
}
}
}
fn pop_if_not_dirty(&mut self) {
let tail = *self.tail.borrow();
if tail.is_none() {
return;
}
let tail = unsafe { tail.unwrap().as_mut() };
if RefCell::borrow(&tail.page).is_dirty() {
// TODO: drop from another clean entry?
return;
}
self.detach(tail);
}
}
pub struct PageCache<K: Eq + Hash + Clone, V> {
@@ -101,12 +264,13 @@ impl<K: Eq + Hash + Clone, V> PageCache<K, V> {
pub struct Pager {
/// Source of the database pages.
pub page_source: PageSource,
/// Cache for storing loaded pages.
page_cache: RefCell<PageCache<usize, Rc<Page>>>,
page_cache: RefCell<DumbLruPageCache>,
/// Buffer pool for temporary data storage.
buffer_pool: Rc<BufferPool>,
/// I/O interface for input/output operations.
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Rc<RefCell<Vec<Rc<RefCell<Page>>>>>,
db_header: Rc<RefCell<DatabaseHeader>>,
}
impl Pager {
@@ -117,32 +281,34 @@ impl Pager {
/// Completes opening a database by initializing the Pager with the database header.
pub fn finish_open(
db_header: Rc<RefCell<DatabaseHeader>>,
db_header_ref: Rc<RefCell<DatabaseHeader>>,
page_source: PageSource,
io: Arc<dyn crate::io::IO>,
) -> Result<Self> {
let db_header = db_header.borrow();
let db_header = RefCell::borrow(&db_header_ref);
let page_size = db_header.page_size as usize;
let buffer_pool = Rc::new(BufferPool::new(page_size));
let page_cache = RefCell::new(PageCache::new(SieveCache::new(10).unwrap()));
let page_cache = RefCell::new(DumbLruPageCache::new(10));
Ok(Self {
page_source,
buffer_pool,
page_cache,
io,
dirty_pages: Rc::new(RefCell::new(Vec::new())),
db_header: db_header_ref.clone(),
})
}
/// Reads a page from the database.
pub fn read_page(&self, page_idx: usize) -> Result<Rc<Page>> {
pub fn read_page(&self, page_idx: usize) -> crate::Result<Rc<RefCell<Page>>> {
trace!("read_page(page_idx = {})", page_idx);
let mut page_cache = self.page_cache.borrow_mut();
if let Some(page) = page_cache.get(&page_idx) {
return Ok(page.clone());
}
let page = Rc::new(Page::new());
page.set_locked();
sqlite3_ondisk::begin_read_btree_page(
let page = Rc::new(RefCell::new(Page::new(page_idx)));
RefCell::borrow(&page).set_locked();
sqlite3_ondisk::begin_read_page(
&self.page_source,
self.buffer_pool.clone(),
page.clone(),
@@ -161,4 +327,66 @@ impl Pager {
pub fn change_page_cache_size(&self, capacity: usize) {
self.page_cache.borrow_mut().resize(capacity);
}
pub fn add_dirty(&self, page: Rc<RefCell<Page>>) {
// TODO: check duplicates?
let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
dirty_pages.push(page);
}
pub fn cacheflush(&self) -> Result<()> {
let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
if dirty_pages.len() == 0 {
return Ok(());
}
loop {
if dirty_pages.len() == 0 {
break;
}
let page = dirty_pages.pop().unwrap();
sqlite3_ondisk::begin_write_btree_page(self, &page)?;
}
self.io.run_once()?;
Ok(())
}
/*
Gets a new page by increasing the size of the database, or reuses a free page.
Currently free list pages are not yet supported.
*/
pub fn allocate_page(&self) -> Result<Rc<RefCell<Page>>> {
let header = &self.db_header;
let mut header = RefCell::borrow_mut(&header);
header.database_size += 1;
{
// update database size
let first_page_ref = self.read_page(1).unwrap();
let first_page = RefCell::borrow_mut(&first_page_ref);
first_page.set_dirty();
self.add_dirty(first_page_ref.clone());
let contents = first_page.contents.write().unwrap();
let contents = contents.as_ref().unwrap();
contents.write_database_header(&header);
}
let page_ref = Rc::new(RefCell::new(Page::new(0)));
{
// setup page and add to cache
self.add_dirty(page_ref.clone());
let mut page = RefCell::borrow_mut(&page_ref);
page.set_dirty();
page.id = header.database_size as usize;
let buffer = self.buffer_pool.get();
let bp = self.buffer_pool.clone();
let drop_fn = Rc::new(move |buf| {
bp.put(buf);
});
let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn)));
page.contents = RwLock::new(Some(PageContent { offset: 0, buffer }));
let mut cache = RefCell::borrow_mut(&self.page_cache);
cache.insert(page.id, page_ref.clone());
}
Ok(page_ref)
}
}
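
Taken together, the write path through the pager is: mutate a page's contents, mark it dirty, register it with the pager, then flush. A minimal sketch of that sequence, assuming an already-constructed Pager and a cached page (mark_and_flush is a hypothetical helper, not part of this diff):

use std::cell::RefCell;
use std::rc::Rc;

// Page, Pager and Result come from limbo_core as shown above.
fn mark_and_flush(pager: &Pager, page_ref: Rc<RefCell<Page>>) -> Result<()> {
    {
        // flag the page so cacheflush knows it has to be written back
        let page = RefCell::borrow(&page_ref);
        page.set_dirty();
    }
    // the pager keeps a plain Vec of dirty pages (no deduplication yet, see the TODO above)
    pager.add_dirty(page_ref.clone());
    // pops every dirty page, issues begin_write_btree_page for each, then drives the IO backend once
    pager.cacheflush()?;
    Ok(())
}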

View File

@@ -50,9 +50,16 @@ impl Cursor for PseudoCursor {
Ok(self.current.borrow())
}
fn insert(&mut self, record: &OwnedRecord) -> Result<()> {
fn insert(
&mut self,
key: &OwnedValue,
record: &OwnedRecord,
moved_before: bool,
) -> Result<CursorResult<()>> {
let _ = key;
let _ = moved_before;
*self.current.borrow_mut() = Some(record.clone());
Ok(())
Ok(CursorResult::Ok(()))
}
fn get_null_flag(&self) -> bool {
@@ -62,4 +69,9 @@ impl Cursor for PseudoCursor {
fn set_null_flag(&mut self, _null_flag: bool) {
// Do nothing
}
fn exists(&mut self, key: &OwnedValue) -> Result<CursorResult<bool>> {
let _ = key;
todo!()
}
}

View File

@@ -83,6 +83,13 @@ impl Table {
Table::Pseudo(table) => &table.columns,
}
}
pub fn has_rowid(&self) -> bool {
match self {
Table::BTree(table) => table.has_rowid,
Table::Pseudo(_) => todo!(),
}
}
}
impl PartialEq for Table {

View File

@@ -25,7 +25,7 @@
/// For more information, see: https://www.sqlite.org/fileformat.html
use crate::buffer_pool::BufferPool;
use crate::error::LimboError;
use crate::io::{Buffer, Completion, WriteCompletion};
use crate::io::{Buffer, Completion, ReadCompletion, WriteCompletion};
use crate::pager::{Page, Pager};
use crate::types::{OwnedRecord, OwnedValue};
use crate::{PageSource, Result};
@@ -47,12 +47,12 @@ pub struct DatabaseHeader {
pub page_size: u16,
write_version: u8,
read_version: u8,
unused_space: u8,
pub unused_space: u8,
max_embed_frac: u8,
min_embed_frac: u8,
min_leaf_frac: u8,
change_counter: u32,
database_size: u32,
pub database_size: u32,
freelist_trunk_page: u32,
freelist_pages: u32,
schema_cookie: u32,
@@ -70,19 +70,23 @@ pub struct DatabaseHeader {
pub fn begin_read_database_header(page_source: &PageSource) -> Result<Rc<RefCell<DatabaseHeader>>> {
let drop_fn = Rc::new(|_buf| {});
let buf = Buffer::allocate(512, drop_fn);
let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
let result = Rc::new(RefCell::new(DatabaseHeader::default()));
let header = result.clone();
let complete = Box::new(move |buf: &Buffer| {
let complete = Box::new(move |buf: Rc<RefCell<Buffer>>| {
let header = header.clone();
finish_read_database_header(buf, header).unwrap();
});
let c = Rc::new(Completion::new(buf, complete));
let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete)));
page_source.get(1, c.clone())?;
Ok(result)
}
fn finish_read_database_header(buf: &Buffer, header: Rc<RefCell<DatabaseHeader>>) -> Result<()> {
fn finish_read_database_header(
buf: Rc<RefCell<Buffer>>,
header: Rc<RefCell<DatabaseHeader>>,
) -> Result<()> {
let buf = buf.borrow();
let buf = buf.as_slice();
let mut header = std::cell::RefCell::borrow_mut(&header);
header.magic.copy_from_slice(&buf[0..16]);
@@ -123,38 +127,14 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
let buffer_to_copy_in_cb = buffer_to_copy.clone();
let header_cb = header.clone();
let complete = Box::new(move |buffer: &Buffer| {
let complete = Box::new(move |buffer: Rc<RefCell<Buffer>>| {
let header = header_cb.clone();
let buffer: Buffer = buffer.clone();
let buffer: Buffer = buffer.borrow().clone();
let buffer = Rc::new(RefCell::new(buffer));
{
let mut buf_mut = std::cell::RefCell::borrow_mut(&buffer);
let buf = buf_mut.as_mut_slice();
buf[0..16].copy_from_slice(&header.magic);
buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());
buf[18] = header.write_version;
buf[19] = header.read_version;
buf[20] = header.unused_space;
buf[21] = header.max_embed_frac;
buf[22] = header.min_embed_frac;
buf[23] = header.min_leaf_frac;
buf[24..28].copy_from_slice(&header.change_counter.to_be_bytes());
buf[28..32].copy_from_slice(&header.database_size.to_be_bytes());
buf[32..36].copy_from_slice(&header.freelist_trunk_page.to_be_bytes());
buf[36..40].copy_from_slice(&header.freelist_pages.to_be_bytes());
buf[40..44].copy_from_slice(&header.schema_cookie.to_be_bytes());
buf[44..48].copy_from_slice(&header.schema_format.to_be_bytes());
buf[48..52].copy_from_slice(&header.default_cache_size.to_be_bytes());
buf[52..56].copy_from_slice(&header.vacuum.to_be_bytes());
buf[56..60].copy_from_slice(&header.text_encoding.to_be_bytes());
buf[60..64].copy_from_slice(&header.user_version.to_be_bytes());
buf[64..68].copy_from_slice(&header.incremental_vacuum.to_be_bytes());
buf[68..72].copy_from_slice(&header.application_id.to_be_bytes());
buf[72..92].copy_from_slice(&header.reserved);
buf[92..96].copy_from_slice(&header.version_valid_for.to_be_bytes());
buf[96..100].copy_from_slice(&header.version_number.to_be_bytes());
write_header_to_buf(buf, &header);
let mut buffer_to_copy = std::cell::RefCell::borrow_mut(&buffer_to_copy_in_cb);
let buffer_to_copy_slice = buffer_to_copy.as_mut_slice();
@@ -163,39 +143,57 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re
});
let drop_fn = Rc::new(|_buf| {});
let buf = Buffer::allocate(512, drop_fn);
let c = Rc::new(Completion::new(buf.clone(), complete));
let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn)));
let c = Rc::new(Completion::Read(ReadCompletion::new(buf.clone(), complete)));
page_source.get(1, c.clone())?;
// run get header block
pager.io.run_once()?;
let buffer_in_cb = buffer_to_copy.clone();
let write_complete = Box::new(move |bytes_written: usize| {
let write_complete = Box::new(move |bytes_written: i32| {
let buf = buffer_in_cb.clone();
let buf_len = std::cell::RefCell::borrow(&buf).len();
if bytes_written < buf_len {
if bytes_written < buf_len as i32 {
log::error!("wrote({bytes_written}) less than expected({buf_len})");
}
// finish_read_database_header(buf, header).unwrap();
});
let c = Rc::new(WriteCompletion::new(write_complete));
let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
page_source.write(0, buffer_to_copy.clone(), c).unwrap();
Ok(())
}
#[derive(Debug)]
pub struct BTreePageHeader {
page_type: PageType,
_first_freeblock_offset: u16,
num_cells: u16,
_cell_content_area: u16,
_num_frag_free_bytes: u8,
pub(crate) right_most_pointer: Option<u32>,
fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) {
buf[0..16].copy_from_slice(&header.magic);
buf[16..18].copy_from_slice(&header.page_size.to_be_bytes());
buf[18] = header.write_version;
buf[19] = header.read_version;
buf[20] = header.unused_space;
buf[21] = header.max_embed_frac;
buf[22] = header.min_embed_frac;
buf[23] = header.min_leaf_frac;
buf[24..28].copy_from_slice(&header.change_counter.to_be_bytes());
buf[28..32].copy_from_slice(&header.database_size.to_be_bytes());
buf[32..36].copy_from_slice(&header.freelist_trunk_page.to_be_bytes());
buf[36..40].copy_from_slice(&header.freelist_pages.to_be_bytes());
buf[40..44].copy_from_slice(&header.schema_cookie.to_be_bytes());
buf[44..48].copy_from_slice(&header.schema_format.to_be_bytes());
buf[48..52].copy_from_slice(&header.default_cache_size.to_be_bytes());
buf[52..56].copy_from_slice(&header.vacuum.to_be_bytes());
buf[56..60].copy_from_slice(&header.text_encoding.to_be_bytes());
buf[60..64].copy_from_slice(&header.user_version.to_be_bytes());
buf[64..68].copy_from_slice(&header.incremental_vacuum.to_be_bytes());
buf[68..72].copy_from_slice(&header.application_id.to_be_bytes());
buf[72..92].copy_from_slice(&header.reserved);
buf[92..96].copy_from_slice(&header.version_valid_for.to_be_bytes());
buf[96..100].copy_from_slice(&header.version_number.to_be_bytes());
}
#[repr(u8)]
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum PageType {
IndexInterior = 2,
TableInterior = 5,
@@ -218,15 +216,193 @@ impl TryFrom<u8> for PageType {
}
#[derive(Debug)]
pub struct BTreePage {
pub header: BTreePageHeader,
pub cells: Vec<BTreeCell>,
pub struct PageContent {
pub offset: usize,
pub buffer: Rc<RefCell<Buffer>>,
}
pub fn begin_read_btree_page(
impl Clone for PageContent {
fn clone(&self) -> Self {
Self {
offset: self.offset,
buffer: Rc::new(RefCell::new((*self.buffer.borrow()).clone())),
}
}
}
impl PageContent {
pub fn page_type(&self) -> PageType {
self.read_u8(self.offset).try_into().unwrap()
}
fn read_u8(&self, pos: usize) -> u8 {
// unsafe trick to borrow twice
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_ref().unwrap().as_slice();
buf[pos]
}
}
fn read_u16(&self, pos: usize) -> u16 {
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_ref().unwrap().as_slice();
u16::from_be_bytes([buf[self.offset + pos], buf[self.offset + pos + 1]])
}
}
fn read_u32(&self, pos: usize) -> u32 {
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_ref().unwrap().as_slice();
u32::from_be_bytes([
buf[self.offset + pos],
buf[self.offset + pos + 1],
buf[self.offset + pos + 2],
buf[self.offset + pos + 3],
])
}
}
pub fn write_u8(&self, pos: usize, value: u8) {
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice();
buf[self.offset + pos] = value;
}
}
pub fn write_u16(&self, pos: usize, value: u16) {
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice();
buf[self.offset + pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes());
}
}
pub fn write_u32(&self, pos: usize, value: u32) {
unsafe {
let buf_pointer = &self.buffer.as_ptr();
let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice();
buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes());
}
}
pub fn first_freeblock(&self) -> u16 {
self.read_u16(1)
}
pub fn cell_count(&self) -> usize {
self.read_u16(3) as usize
}
pub fn cell_content_area(&self) -> u16 {
self.read_u16(5) as u16
}
pub fn num_frag_free_bytes(&self) -> u8 {
self.read_u8(7) as u8
}
pub fn rightmost_pointer(&self) -> Option<u32> {
match self.page_type() {
PageType::IndexInterior => Some(self.read_u32(8)),
PageType::TableInterior => Some(self.read_u32(8)),
PageType::IndexLeaf => None,
PageType::TableLeaf => None,
}
}
pub fn cell_get(&self, idx: usize) -> Result<BTreeCell> {
let buf = self.buffer.borrow();
let buf = buf.as_slice();
let ncells = self.cell_count();
let cell_start = match self.page_type() {
PageType::IndexInterior => 12,
PageType::TableInterior => 12,
PageType::IndexLeaf => 8,
PageType::TableLeaf => 8,
};
assert!(idx < ncells, "cell_get: idx out of bounds");
let cell_pointer = cell_start + (idx * 2);
let cell_pointer = self.read_u16(cell_pointer) as usize;
read_btree_cell(buf, &self.page_type(), cell_pointer)
}
pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) {
let cell_start = match self.page_type() {
PageType::IndexInterior => 12,
PageType::TableInterior => 12,
PageType::IndexLeaf => 8,
PageType::TableLeaf => 8,
};
(self.offset + cell_start, self.cell_count() * 2)
}
pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) {
let mut buf = self.buffer.borrow_mut();
let buf = buf.as_mut_slice();
self.cell_get_raw_region_borrowed(idx, buf)
}
pub fn cell_get_raw_region_borrowed(&self, idx: usize, buf: &mut [u8]) -> (usize, usize) {
let ncells = self.cell_count();
let cell_start = match self.page_type() {
PageType::IndexInterior => 12,
PageType::TableInterior => 12,
PageType::IndexLeaf => 8,
PageType::TableLeaf => 8,
};
assert!(idx < ncells, "cell_get: idx out of bounds");
let cell_pointer = cell_start + (idx * 2);
let cell_pointer = self.read_u16(cell_pointer) as usize;
let start = cell_pointer;
let len = match self.page_type() {
PageType::IndexInterior => {
let (len_payload, n_payload) = read_varint(&buf[cell_pointer + 4..]).unwrap();
4 + len_payload as usize + n_payload + 4
}
PageType::TableInterior => {
let (_, n_rowid) = read_varint(&buf[cell_pointer + 4..]).unwrap();
4 + n_rowid
}
PageType::IndexLeaf => {
let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap();
len_payload as usize + n_payload + 4
}
PageType::TableLeaf => {
let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap();
let (_, n_rowid) = read_varint(&buf[cell_pointer + n_payload..]).unwrap();
// TODO: add overflow 4 bytes
len_payload as usize + n_payload + n_rowid
}
};
(start, len)
}
pub fn is_leaf(&self) -> bool {
match self.page_type() {
PageType::IndexInterior => false,
PageType::TableInterior => false,
PageType::IndexLeaf => true,
PageType::TableLeaf => true,
}
}
pub fn write_database_header(&self, header: &DatabaseHeader) {
let mut buf = self.buffer.borrow_mut();
let buf = buf.as_mut_slice();
write_header_to_buf(buf, header);
}
}
pub fn begin_read_page(
page_source: &PageSource,
buffer_pool: Rc<BufferPool>,
page: Rc<Page>,
page: Rc<RefCell<Page>>,
page_idx: usize,
) -> Result<()> {
trace!("begin_read_btree_page(page_idx = {})", page_idx);
@@ -235,59 +411,67 @@ pub fn begin_read_btree_page(
let buffer_pool = buffer_pool.clone();
buffer_pool.put(buf);
});
let buf = Buffer::new(buf, drop_fn);
let complete = Box::new(move |buf: &Buffer| {
let buf = Rc::new(RefCell::new(Buffer::new(buf, drop_fn)));
let complete = Box::new(move |buf: Rc<RefCell<Buffer>>| {
let page = page.clone();
if finish_read_btree_page(page_idx, buf, page.clone()).is_err() {
page.set_error();
if finish_read_page(page_idx, buf, page.clone()).is_err() {
page.borrow_mut().set_error();
}
});
let c = Rc::new(Completion::new(buf, complete));
let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete)));
page_source.get(page_idx, c.clone())?;
Ok(())
}
fn finish_read_btree_page(page_idx: usize, buf: &Buffer, page: Rc<Page>) -> Result<()> {
fn finish_read_page(
page_idx: usize,
buffer_ref: Rc<RefCell<Buffer>>,
page: Rc<RefCell<Page>>,
) -> Result<()> {
trace!("finish_read_btree_page(page_idx = {})", page_idx);
let mut pos = if page_idx == 1 {
let pos = if page_idx == 1 {
DATABASE_HEADER_SIZE
} else {
0
};
let buf = buf.as_slice();
let mut header = BTreePageHeader {
page_type: buf[pos].try_into()?,
_first_freeblock_offset: u16::from_be_bytes([buf[pos + 1], buf[pos + 2]]),
num_cells: u16::from_be_bytes([buf[pos + 3], buf[pos + 4]]),
_cell_content_area: u16::from_be_bytes([buf[pos + 5], buf[pos + 6]]),
_num_frag_free_bytes: buf[pos + 7],
right_most_pointer: None,
let inner = PageContent {
offset: pos,
buffer: buffer_ref.clone(),
};
pos += 8;
if header.page_type == PageType::IndexInterior || header.page_type == PageType::TableInterior {
header.right_most_pointer = Some(u32::from_be_bytes([
buf[pos],
buf[pos + 1],
buf[pos + 2],
buf[pos + 3],
]));
pos += 4;
{
let page = page.borrow_mut();
page.contents.write().unwrap().replace(inner);
page.set_uptodate();
page.clear_locked();
}
let mut cells = Vec::with_capacity(header.num_cells as usize);
for _ in 0..header.num_cells {
let cell_pointer = u16::from_be_bytes([buf[pos], buf[pos + 1]]);
pos += 2;
let cell = read_btree_cell(buf, &header.page_type, cell_pointer as usize)?;
cells.push(cell);
}
let inner = BTreePage { header, cells };
page.contents.write().unwrap().replace(inner);
page.set_uptodate();
page.clear_locked();
Ok(())
}
#[derive(Debug)]
pub fn begin_write_btree_page(pager: &Pager, page: &Rc<RefCell<Page>>) -> Result<()> {
let page_source = &pager.page_source;
let page_finish = page.clone();
let page = page.borrow();
let contents = page.contents.read().unwrap();
let contents = contents.as_ref().unwrap();
let buffer = contents.buffer.clone();
let write_complete = {
let buf_copy = buffer.clone();
Box::new(move |bytes_written: i32| {
let buf_copy = buf_copy.clone();
let buf_len = buf_copy.borrow().len();
page_finish.borrow_mut().clear_dirty();
if bytes_written < buf_len as i32 {
log::error!("wrote({bytes_written}) less than expected({buf_len})");
}
})
};
let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete)));
page_source.write(page.id, buffer.clone(), c)?;
Ok(())
}
#[derive(Debug, Clone)]
pub enum BTreeCell {
TableInteriorCell(TableInteriorCell),
TableLeafCell(TableLeafCell),
@@ -295,27 +479,27 @@ pub enum BTreeCell {
IndexLeafCell(IndexLeafCell),
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TableInteriorCell {
pub _left_child_page: u32,
pub _rowid: u64,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TableLeafCell {
pub _rowid: u64,
pub _payload: Vec<u8>,
pub first_overflow_page: Option<u32>,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct IndexInteriorCell {
pub left_child_page: u32,
pub payload: Vec<u8>,
pub first_overflow_page: Option<u32>,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct IndexLeafCell {
pub payload: Vec<u8>,
pub first_overflow_page: Option<u32>,
@@ -442,15 +626,14 @@ pub fn read_record(payload: &[u8]) -> Result<OwnedRecord> {
let (serial_type, nr) = read_varint(&payload[pos..])?;
let serial_type = SerialType::try_from(serial_type)?;
serial_types.push(serial_type);
assert!(pos + nr < payload.len());
pos += nr;
assert!(header_size >= nr);
header_size -= nr;
}
let mut values = Vec::with_capacity(serial_types.len());
for serial_type in &serial_types {
let (value, usize) = read_value(&payload[pos..], serial_type)?;
pos += usize;
let (value, n) = read_value(&payload[pos..], serial_type)?;
pos += n;
values.push(value);
}
Ok(OwnedRecord::new(values))
@@ -544,7 +727,7 @@ pub fn read_value(buf: &[u8], serial_type: &SerialType) -> Result<(OwnedValue, u
}
}
fn read_varint(buf: &[u8]) -> Result<(u64, usize)> {
pub fn read_varint(buf: &[u8]) -> Result<(u64, usize)> {
let mut v: u64 = 0;
for i in 0..8 {
match buf.get(i) {
@@ -563,6 +746,45 @@ fn read_varint(buf: &[u8]) -> Result<(u64, usize)> {
Ok((v, 9))
}
pub fn write_varint(buf: &mut [u8], value: u64) -> usize {
if value <= 0x7f {
buf[0] = (value & 0x7f) as u8;
return 1;
}
if value <= 0x3fff {
buf[0] = (((value >> 7) & 0x7f) | 0x80) as u8;
buf[1] = (value & 0x7f) as u8;
return 2;
}
let mut value = value;
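// The check below asks whether any of the top 8 bits are set; such values need the full
// 9-byte encoding, where the last byte carries 8 raw bits and the preceding eight bytes
// carry 7 bits each.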
if (value & ((0xff000000_u64) << 32)) > 0 {
buf[8] = value as u8;
value >>= 8;
for i in (0..8).rev() {
buf[i] = ((value & 0x7f) | 0x80) as u8;
value >>= 7;
}
return 9;
}
let mut encoded: [u8; 10] = [0; 10];
let mut bytes = value;
let mut n = 0;
while bytes != 0 {
let v = 0x80 | (bytes & 0x7f);
encoded[n] = v as u8;
bytes >>= 7;
n += 1;
}
encoded[0] &= 0x7f;
for i in 0..n {
buf[i] = encoded[n - 1 - i];
}
return n;
}
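// A minimal round-trip sketch (illustrative only, not part of this change; the test name is
// hypothetical): write_varint and read_varint should agree on the encoding listed in the
// comments further below.
#[cfg(test)]
#[test]
fn write_varint_round_trip_sketch() {
    let mut buf = [0u8; 9];
    let n = write_varint(&mut buf, 0x12345678);
    // per the table below, 0x12345678 encodes as 0x81 0x91 0xd1 0xac 0x78
    assert_eq!(buf[..n].to_vec(), vec![0x81u8, 0x91, 0xd1, 0xac, 0x78]);
    let (value, consumed) = read_varint(&buf[..n]).unwrap();
    assert_eq!((value, consumed), (0x12345678, n));
}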
#[cfg(test)]
mod tests {
use super::*;
@@ -651,4 +873,31 @@ mod tests {
let result = read_varint(&buf);
assert!(result.is_err());
}
// ** 0x00 becomes 0x00000000
// ** 0x7f becomes 0x0000007f
// ** 0x81 0x00 becomes 0x00000080
// ** 0x82 0x00 becomes 0x00000100
// ** 0x80 0x7f becomes 0x0000007f
// ** 0x81 0x91 0xd1 0xac 0x78 becomes 0x12345678
// ** 0x81 0x81 0x81 0x81 0x01 becomes 0x10204081
#[rstest]
#[case((0, 1), &[0x00])]
#[case((1, 1), &[0x01])]
#[case((129, 2), &[0x81, 0x01] )]
#[case((16513, 3), &[0x81, 0x81, 0x01] )]
#[case((2113665, 4), &[0x81, 0x81, 0x81, 0x01] )]
#[case((270549121, 5), &[0x81, 0x81, 0x81, 0x81, 0x01] )]
#[case((34630287489, 6), &[0x81, 0x81, 0x81, 0x81, 0x81, 0x01] )]
#[case((4432676798593, 7), &[0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x01] )]
#[case((567382630219905, 8), &[0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x01] )]
#[case((145249953336295681, 9), &[0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x01] )]
fn test_write_varint(#[case] value: (u64, usize), #[case] output: &[u8]) {
let mut buf: [u8; 10] = [0; 10];
let n = write_varint(&mut buf, value.0);
assert_eq!(n, value.1);
for i in 0..output.len() {
assert_eq!(buf[i], output[i]);
}
}
}

View File

@@ -1,10 +1,6 @@
#[cfg(feature = "fs")]
use crate::io::File;
use crate::{
error::LimboError,
io::{Completion, WriteCompletion},
Buffer, Result,
};
use crate::{error::LimboError, io::Completion, Buffer, Result};
use std::{cell::RefCell, rc::Rc};
pub struct PageSource {
@@ -39,7 +35,7 @@ impl PageSource {
&self,
page_idx: usize,
buffer: Rc<RefCell<Buffer>>,
c: Rc<WriteCompletion>,
c: Rc<Completion>,
) -> Result<()> {
self.io.write(page_idx, buffer, c)
}
@@ -47,12 +43,7 @@ impl PageSource {
pub trait PageIO {
fn get(&self, page_idx: usize, c: Rc<Completion>) -> Result<()>;
fn write(
&self,
page_idx: usize,
buffer: Rc<RefCell<Buffer>>,
c: Rc<WriteCompletion>,
) -> Result<()>;
fn write(&self, page_idx: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()>;
}
#[cfg(feature = "fs")]
@@ -63,7 +54,11 @@ struct FileStorage {
#[cfg(feature = "fs")]
impl PageIO for FileStorage {
fn get(&self, page_idx: usize, c: Rc<Completion>) -> Result<()> {
let size = c.buf().len();
let r = match &(*c) {
Completion::Read(r) => r,
Completion::Write(_) => unreachable!(),
};
let size = r.buf().len();
assert!(page_idx > 0);
if size < 512 || size > 65536 || size & (size - 1) != 0 {
return Err(LimboError::NotADB.into());
@@ -73,17 +68,13 @@ impl PageIO for FileStorage {
Ok(())
}
fn write(
&self,
page_idx: usize,
buffer: Rc<RefCell<Buffer>>,
c: Rc<WriteCompletion>,
) -> Result<()> {
fn write(&self, page_idx: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()> {
let buffer_size = buffer.borrow().len();
assert!(buffer_size >= 512);
assert!(buffer_size <= 65536);
assert!((buffer_size & (buffer_size - 1)) == 0);
self.file.pwrite(page_idx, buffer, c)?;
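// pages are numbered from 1, so the byte offset of page_idx in the file is
// (page_idx - 1) * page size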
let pos = (page_idx - 1) * buffer_size;
self.file.pwrite(pos, buffer, c)?;
Ok(())
}
}

View File

@@ -3,7 +3,7 @@ use sqlite3_parser::ast::{self, Expr, UnaryOperator};
use crate::{
function::{Func, ScalarFunc},
schema::{Schema, Table, Type},
schema::{Table, Type},
translate::select::{ColumnInfo, Select, SrcTable},
util::normalize_ident,
vdbe::{builder::ProgramBuilder, BranchOffset, Insn},
@@ -11,7 +11,7 @@ use crate::{
pub fn translate_expr(
program: &mut ProgramBuilder,
select: &Select,
select: Option<&Select>,
expr: &ast::Expr,
target_register: usize,
cursor_hint: Option<usize>,
@@ -435,7 +435,7 @@ pub fn translate_expr(
ast::Expr::Parenthesized(_) => todo!(),
ast::Expr::Qualified(tbl, ident) => {
let (idx, col_type, cursor_id, is_primary_key) =
resolve_ident_qualified(program, &tbl.0, &ident.0, select, cursor_hint)?;
resolve_ident_qualified(program, &tbl.0, &ident.0, select.unwrap(), cursor_hint)?;
if is_primary_key {
program.emit_insn(Insn::RowId {
cursor_id,
@@ -614,12 +614,12 @@ pub fn resolve_ident_qualified(
pub fn resolve_ident_table(
program: &ProgramBuilder,
ident: &String,
select: &Select,
select: Option<&Select>,
cursor_hint: Option<usize>,
) -> Result<(usize, Type, usize, bool)> {
let ident = normalize_ident(ident);
let mut found = Vec::new();
for join in &select.src_tables {
for join in &select.unwrap().src_tables {
match join.table {
Table::BTree(ref table) => {
let res = table

194
core/translate/insert.rs Normal file
View File

@@ -0,0 +1,194 @@
use std::{cell::RefCell, ops::Deref, rc::Rc};
use sqlite3_parser::ast::{
DistinctNames, InsertBody, QualifiedName, ResolveType, ResultColumn, With,
};
use crate::Result;
use crate::{
schema::{Schema, Table},
sqlite3_ondisk::DatabaseHeader,
translate::expr::translate_expr,
vdbe::{builder::ProgramBuilder, Insn, Program},
};
pub fn translate_insert(
schema: &Schema,
with: &Option<With>,
or_conflict: &Option<ResolveType>,
tbl_name: &QualifiedName,
_columns: &Option<DistinctNames>,
body: &InsertBody,
_returning: &Option<Vec<ResultColumn>>,
database_header: Rc<RefCell<DatabaseHeader>>,
) -> Result<Program> {
assert!(with.is_none());
assert!(or_conflict.is_none());
let mut program = ProgramBuilder::new();
let init_label = program.allocate_label();
program.emit_insn_with_label_dependency(
Insn::Init {
target_pc: init_label,
},
init_label,
);
let start_offset = program.offset();
// open table
let table_name = &tbl_name.name;
let table = match schema.get_table(table_name.0.as_str()) {
Some(table) => table,
None => crate::bail_corrupt_error!("Parse error: no such table: {}", table_name),
};
let table = Rc::new(Table::BTree(table));
let cursor_id = program.alloc_cursor_id(
Some(table_name.0.clone()),
Some(table.clone().deref().clone()),
);
let root_page = match table.as_ref() {
Table::BTree(btree) => btree.root_page,
Table::Pseudo(_) => todo!(),
};
let mut num_cols = table.columns().len();
if table.has_rowid() {
num_cols += 1;
}
// column_registers_start[0] == rowid if has rowid
let column_registers_start = program.alloc_registers(num_cols);
// Coroutine for values
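// The VALUES rows are produced by a coroutine: InitCoroutine/Yield/EndCoroutine below fill
// the column registers one row at a time, and the main loop after OpenWriteAwait consumes
// each row with a matching Yield.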
let yield_reg = program.alloc_register();
let jump_on_definition_label = program.allocate_label();
{
program.emit_insn_with_label_dependency(
Insn::InitCoroutine {
yield_reg,
jump_on_definition: jump_on_definition_label,
start_offset: program.offset() + 1,
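// +1 skips the InitCoroutine instruction itself, so the coroutine body
// starts at the next emitted instruction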
},
jump_on_definition_label,
);
match body {
InsertBody::Select(select, None) => match &select.body.select {
sqlite3_parser::ast::OneSelect::Select {
distinctness: _,
columns: _,
from: _,
where_clause: _,
group_by: _,
window_clause: _,
} => todo!(),
sqlite3_parser::ast::OneSelect::Values(values) => {
for value in values {
for (col, expr) in value.iter().enumerate() {
let mut col = col;
if table.has_rowid() {
col += 1;
}
translate_expr(
&mut program,
None,
expr,
column_registers_start + col,
None,
)?;
}
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: 0,
});
}
}
},
InsertBody::DefaultValues => todo!("default values not yet supported"),
_ => todo!(),
}
program.emit_insn(Insn::EndCoroutine { yield_reg });
}
program.resolve_label(jump_on_definition_label, program.offset());
program.emit_insn(Insn::OpenWriteAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
// Main loop
let record_register = program.alloc_register();
let halt_label = program.allocate_label();
let loop_start_offset = program.offset();
program.emit_insn_with_label_dependency(
Insn::Yield {
yield_reg,
end_offset: halt_label,
},
halt_label,
);
if table.has_rowid() {
let key_reg = column_registers_start + 1;
let row_id_reg = column_registers_start;
// copy key to rowid
program.emit_insn(Insn::Copy {
src_reg: key_reg,
dst_reg: row_id_reg,
amount: 0,
});
program.emit_insn(Insn::SoftNull { reg: key_reg });
let notnull_label = program.allocate_label();
program.emit_insn_with_label_dependency(
Insn::NotNull {
reg: row_id_reg,
target_pc: notnull_label,
},
notnull_label,
);
program.emit_insn(Insn::NewRowid { reg: row_id_reg });
program.resolve_label(notnull_label, program.offset());
program.emit_insn(Insn::MustBeInt { reg: row_id_reg });
let make_record_label = program.allocate_label();
program.emit_insn_with_label_dependency(
Insn::NotExists {
cursor: cursor_id,
rowid_reg: row_id_reg,
target_pc: make_record_label,
},
make_record_label,
);
program.emit_insn(Insn::Halt); // TODO: raise error code 1555 (SQLITE_CONSTRAINT_PRIMARYKEY) and roll back
program.resolve_label(make_record_label, program.offset());
program.emit_insn(Insn::MakeRecord {
start_reg: column_registers_start + 1,
count: num_cols - 1,
dest_reg: record_register,
});
program.emit_insn(Insn::InsertAsync {
cursor: cursor_id,
key_reg: column_registers_start,
record_reg: record_register,
flag: 0,
});
program.emit_insn(Insn::InsertAwait {
cursor_id: cursor_id,
});
}
program.emit_insn(Insn::Goto {
target_pc: loop_start_offset,
});
program.resolve_label(halt_label, program.offset());
program.emit_insn(Insn::Halt);
program.resolve_label(init_label, program.offset());
program.emit_insn(Insn::Transaction);
program.emit_constant_insns();
program.emit_insn(Insn::Goto {
target_pc: start_offset,
});
program.resolve_deferred_labels();
Ok(program.build(database_header))
}
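// Rough shape of the emitted program for `INSERT INTO t VALUES (...)` on a rowid table
// (an illustrative summary of the builder calls above, not a verbatim EXPLAIN listing):
//
//   Init -> (at the end) Transaction, constant instructions, Goto start
//   InitCoroutine
//     per row: translate_expr into the column registers, Yield
//   EndCoroutine
//   OpenWriteAsync / OpenWriteAwait
//   loop: Yield -> Copy rowid, SoftNull, NotNull?/NewRowid, MustBeInt, NotExists
//         -> MakeRecord -> InsertAsync / InsertAwait -> Goto loop
//   Halt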

View File

@@ -8,6 +8,7 @@
//! will read rows from the database and filter them according to a WHERE clause.
pub(crate) mod expr;
pub(crate) mod insert;
pub(crate) mod select;
pub(crate) mod where_clause;
@@ -20,6 +21,7 @@ use crate::sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE};
use crate::util::normalize_ident;
use crate::vdbe::{builder::ProgramBuilder, Insn, Program};
use crate::{bail_parse_error, Result};
use insert::translate_insert;
use select::{prepare_select, translate_select};
use sqlite3_parser::ast;
@@ -49,7 +51,6 @@ pub fn translate(
ast::Stmt::DropTable { .. } => bail_parse_error!("DROP TABLE not supported yet"),
ast::Stmt::DropTrigger { .. } => bail_parse_error!("DROP TRIGGER not supported yet"),
ast::Stmt::DropView { .. } => bail_parse_error!("DROP VIEW not supported yet"),
ast::Stmt::Insert { .. } => bail_parse_error!("INSERT not supported yet"),
ast::Stmt::Pragma(name, body) => translate_pragma(&name, body, database_header, pager),
ast::Stmt::Reindex { .. } => bail_parse_error!("REINDEX not supported yet"),
ast::Stmt::Release(_) => bail_parse_error!("RELEASE not supported yet"),
@@ -57,10 +58,27 @@ pub fn translate(
ast::Stmt::Savepoint(_) => bail_parse_error!("SAVEPOINT not supported yet"),
ast::Stmt::Select(select) => {
let select = prepare_select(schema, &select)?;
translate_select(select)
translate_select(select, database_header)
}
ast::Stmt::Update { .. } => bail_parse_error!("UPDATE not supported yet"),
ast::Stmt::Vacuum(_, _) => bail_parse_error!("VACUUM not supported yet"),
ast::Stmt::Insert {
with,
or_conflict,
tbl_name,
columns,
body,
returning,
} => translate_insert(
schema,
&with,
&or_conflict,
&tbl_name,
&columns,
&body,
&returning,
database_header,
),
}
}
@@ -107,7 +125,12 @@ fn translate_pragma(
},
_ => 0,
};
update_pragma(&name.name.0, value_to_update, database_header, pager);
update_pragma(
&name.name.0,
value_to_update,
database_header.clone(),
pager,
);
}
Some(ast::PragmaBody::Call(_)) => {
todo!()
@@ -121,7 +144,7 @@ fn translate_pragma(
target_pc: start_offset,
});
program.resolve_deferred_labels();
Ok(program.build())
Ok(program.build(database_header))
}
fn update_pragma(name: &str, value: i64, header: Rc<RefCell<DatabaseHeader>>, pager: Rc<Pager>) {

View File

@@ -1,5 +1,6 @@
use crate::function::{AggFunc, Func};
use crate::schema::{Column, PseudoTable, Schema, Table};
use crate::sqlite3_ondisk::DatabaseHeader;
use crate::translate::expr::{analyze_columns, maybe_apply_affinity, translate_expr};
use crate::translate::where_clause::{
process_where, translate_processed_where, translate_tableless_where, ProcessedWhereClause,
@@ -11,6 +12,7 @@ use crate::Result;
use sqlite3_parser::ast::{self, JoinOperator, JoinType, ResultColumn};
use std::cell::RefCell;
use std::rc::Rc;
/// A representation of a `SELECT` statement that has all the information
@@ -235,7 +237,10 @@ pub fn prepare_select<'a>(schema: &Schema, select: &'a ast::Select) -> Result<Se
}
/// Generate code for a SELECT statement.
pub fn translate_select(mut select: Select) -> Result<Program> {
pub fn translate_select(
mut select: Select,
database_header: Rc<RefCell<DatabaseHeader>>,
) -> Result<Program> {
let mut program = ProgramBuilder::new();
let init_label = program.allocate_label();
let early_terminate_label = program.allocate_label();
@@ -274,7 +279,13 @@ pub fn translate_select(mut select: Select) -> Result<Program> {
let limit_info = if let Some(limit) = &select.limit {
assert!(limit.offset.is_none());
let target_register = program.alloc_register();
let limit_reg = translate_expr(&mut program, &select, &limit.expr, target_register, None)?;
let limit_reg = translate_expr(
&mut program,
Some(&select),
&limit.expr,
target_register,
None,
)?;
let num = if let ast::Expr::Literal(ast::Literal::Numeric(num)) = &limit.expr {
num.parse::<i64>()?
} else {
@@ -326,7 +337,7 @@ pub fn translate_select(mut select: Select) -> Result<Program> {
} else {
&col.expr
};
translate_expr(&mut program, &select, sort_col_expr, target, None)?;
translate_expr(&mut program, Some(&select), sort_col_expr, target, None)?;
}
let (_, result_cols_count) = translate_columns(&mut program, &select, None)?;
sort_info
@@ -417,7 +428,7 @@ pub fn translate_select(mut select: Select) -> Result<Program> {
target_pc: start_offset,
});
program.resolve_deferred_labels();
Ok(program.build())
Ok(program.build(database_header))
}
fn emit_limit_insn(limit_info: &Option<LimitInfo>, program: &mut ProgramBuilder) {
@@ -742,7 +753,7 @@ fn translate_column(
cursor_hint,
)?;
} else {
let _ = translate_expr(program, select, expr, target_register, cursor_hint)?;
let _ = translate_expr(program, Some(select), expr, target_register, cursor_hint)?;
}
}
ast::ResultColumn::Star => {
@@ -807,7 +818,7 @@ fn translate_aggregation(
}
let expr = &args[0];
let expr_reg = program.alloc_register();
let _ = translate_expr(program, select, expr, expr_reg, cursor_hint)?;
let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?;
program.emit_insn(Insn::AggStep {
acc_reg: target_register,
col: expr_reg,
@@ -822,7 +833,7 @@ fn translate_aggregation(
} else {
let expr = &args[0];
let expr_reg = program.alloc_register();
let _ = translate_expr(program, select, expr, expr_reg, cursor_hint);
let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint);
expr_reg
};
program.emit_insn(Insn::AggStep {
@@ -865,8 +876,14 @@ fn translate_aggregation(
ast::Expr::Literal(ast::Literal::String(String::from("\",\"")));
}
translate_expr(program, select, expr, expr_reg, cursor_hint)?;
translate_expr(program, select, &delimiter_expr, delimiter_reg, cursor_hint)?;
translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?;
translate_expr(
program,
Some(select),
&delimiter_expr,
delimiter_reg,
cursor_hint,
)?;
program.emit_insn(Insn::AggStep {
acc_reg: target_register,
@@ -883,7 +900,7 @@ fn translate_aggregation(
}
let expr = &args[0];
let expr_reg = program.alloc_register();
let _ = translate_expr(program, select, expr, expr_reg, cursor_hint);
let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint);
program.emit_insn(Insn::AggStep {
acc_reg: target_register,
col: expr_reg,
@@ -898,7 +915,7 @@ fn translate_aggregation(
}
let expr = &args[0];
let expr_reg = program.alloc_register();
let _ = translate_expr(program, select, expr, expr_reg, cursor_hint);
let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint);
program.emit_insn(Insn::AggStep {
acc_reg: target_register,
col: expr_reg,
@@ -932,8 +949,14 @@ fn translate_aggregation(
_ => crate::bail_parse_error!("Incorrect delimiter parameter"),
};
translate_expr(program, select, expr, expr_reg, cursor_hint)?;
translate_expr(program, select, &delimiter_expr, delimiter_reg, cursor_hint)?;
translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?;
translate_expr(
program,
Some(select),
&delimiter_expr,
delimiter_reg,
cursor_hint,
)?;
program.emit_insn(Insn::AggStep {
acc_reg: target_register,
@@ -950,7 +973,7 @@ fn translate_aggregation(
}
let expr = &args[0];
let expr_reg = program.alloc_register();
let _ = translate_expr(program, select, expr, expr_reg, cursor_hint)?;
let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?;
program.emit_insn(Insn::AggStep {
acc_reg: target_register,
col: expr_reg,
@@ -965,7 +988,7 @@ fn translate_aggregation(
}
let expr = &args[0];
let expr_reg = program.alloc_register();
let _ = translate_expr(program, select, expr, expr_reg, cursor_hint)?;
let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?;
program.emit_insn(Insn::AggStep {
acc_reg: target_register,
col: expr_reg,

View File

@@ -306,12 +306,12 @@ fn translate_condition_expr(
ast::Expr::Binary(lhs, op, rhs) => {
let lhs_reg = program.alloc_register();
let rhs_reg = program.alloc_register();
let _ = translate_expr(program, select, lhs, lhs_reg, cursor_hint);
let _ = translate_expr(program, Some(select), lhs, lhs_reg, cursor_hint);
match lhs.as_ref() {
ast::Expr::Literal(_) => program.mark_last_insn_constant(),
_ => {}
}
let _ = translate_expr(program, select, rhs, rhs_reg, cursor_hint);
let _ = translate_expr(program, Some(select), rhs, rhs_reg, cursor_hint);
match rhs.as_ref() {
ast::Expr::Literal(_) => program.mark_last_insn_constant(),
_ => {}
@@ -554,7 +554,7 @@ fn translate_condition_expr(
// The left hand side only needs to be evaluated once we have a list of values to compare against.
let lhs_reg = program.alloc_register();
let _ = translate_expr(program, select, lhs, lhs_reg, cursor_hint)?;
let _ = translate_expr(program, Some(select), lhs, lhs_reg, cursor_hint)?;
let rhs = rhs.as_ref().unwrap();
@@ -577,7 +577,7 @@ fn translate_condition_expr(
for (i, expr) in rhs.iter().enumerate() {
let rhs_reg = program.alloc_register();
let last_condition = i == rhs.len() - 1;
let _ = translate_expr(program, select, expr, rhs_reg, cursor_hint)?;
let _ = translate_expr(program, Some(select), expr, rhs_reg, cursor_hint)?;
// If this is not the last condition, we need to jump to the 'jump_target_when_true' label if the condition is true.
if !last_condition {
program.emit_insn_with_label_dependency(
@@ -614,7 +614,7 @@ fn translate_condition_expr(
// If it's a NOT IN expression, we need to jump to the 'jump_target_when_false' label if any of the conditions are true.
for expr in rhs.iter() {
let rhs_reg = program.alloc_register();
let _ = translate_expr(program, select, expr, rhs_reg, cursor_hint)?;
let _ = translate_expr(program, Some(select), expr, rhs_reg, cursor_hint)?;
program.emit_insn_with_label_dependency(
Insn::Eq {
lhs: lhs_reg,
@@ -657,9 +657,9 @@ fn translate_condition_expr(
let pattern_reg = program.alloc_register();
let column_reg = program.alloc_register();
// LIKE(pattern, column). We should translate the pattern first before the column
let _ = translate_expr(program, select, rhs, pattern_reg, cursor_hint)?;
let _ = translate_expr(program, Some(select), rhs, pattern_reg, cursor_hint)?;
program.mark_last_insn_constant();
let _ = translate_expr(program, select, lhs, column_reg, cursor_hint)?;
let _ = translate_expr(program, Some(select), lhs, column_reg, cursor_hint)?;
program.emit_insn(Insn::Function {
func: ScalarFunc::Like,
start_reg: pattern_reg,

View File

@@ -4,6 +4,8 @@ use std::{cell::Ref, rc::Rc};
use crate::error::LimboError;
use crate::Result;
use crate::sqlite3_ondisk::write_varint;
#[derive(Debug, Clone, PartialEq)]
pub enum Value<'a> {
Null,
@@ -301,6 +303,59 @@ impl OwnedRecord {
pub fn new(values: Vec<OwnedValue>) -> Self {
Self { values }
}
pub fn serialize(&self, buf: &mut Vec<u8>) {
let initial_i = buf.len();
for value in &self.values {
let serial_type = match value {
OwnedValue::Null => 0,
OwnedValue::Integer(_) => 6, // for now let's only do i64
OwnedValue::Float(_) => 7,
OwnedValue::Text(t) => (t.len() * 2 + 13) as u64,
OwnedValue::Blob(b) => (b.len() * 2 + 12) as u64,
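// These follow SQLite's record serial types: 0 = NULL, 6 = 8-byte big-endian integer,
// 7 = 8-byte IEEE float, odd values >= 13 = text of (n-13)/2 bytes, even values >= 12 =
// blob of (n-12)/2 bytes.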
// not serializable values
OwnedValue::Agg(_) => unreachable!(),
OwnedValue::Record(_) => unreachable!(),
};
buf.resize(buf.len() + 9, 0); // Ensure space for varint
let len = buf.len();
let n = write_varint(&mut buf[len - 9..], serial_type);
buf.truncate(buf.len() - 9 + n); // Remove unused bytes
}
let mut header_size = buf.len() - initial_i;
// write content
for value in &self.values {
match value {
OwnedValue::Null => {}
OwnedValue::Integer(i) => buf.extend_from_slice(&i.to_be_bytes()),
OwnedValue::Float(f) => buf.extend_from_slice(&f.to_be_bytes()),
OwnedValue::Text(t) => buf.extend_from_slice(t.as_bytes()),
OwnedValue::Blob(b) => buf.extend_from_slice(b),
// non serializable
OwnedValue::Agg(_) => unreachable!(),
OwnedValue::Record(_) => unreachable!(),
};
}
let mut header_bytes_buf: Vec<u8> = Vec::new();
if header_size <= 126 {
// common case
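// the header size varint counts itself; one extra byte is enough while the total stays <= 126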
header_size += 1;
} else {
todo!("calculate big header size extra bytes");
// get header varint len
// header_size += n;
// if( nVarint<sqlite3VarintLen(nHdr) ) nHdr++;
}
assert!(header_size <= 126);
header_bytes_buf.extend(std::iter::repeat(0).take(9));
let n = write_varint(&mut header_bytes_buf.as_mut_slice(), header_size as u64);
header_bytes_buf.truncate(n);
buf.splice(initial_i..initial_i, header_bytes_buf.iter().cloned());
}
}
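// A minimal sketch of the record layout serialize() produces (illustrative test only, not
// part of this change; the test name is hypothetical): a varint header of serial types
// followed by the row body.
#[cfg(test)]
#[test]
fn serialize_record_sketch() {
    let record = OwnedRecord::new(vec![
        OwnedValue::Integer(5),
        OwnedValue::Text(Rc::new("hi".to_string())),
    ]);
    let mut buf = Vec::new();
    record.serialize(&mut buf);
    // header: total size 3, serial type 6 (8-byte int), serial type 17 (2-byte text)
    // body:   5 as big-endian i64, then the bytes of "hi"
    assert_eq!(
        buf,
        vec![0x03u8, 0x06, 0x11, 0, 0, 0, 0, 0, 0, 0, 0x05, b'h', b'i']
    );
}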
pub enum CursorResult<T> {
@@ -315,7 +370,13 @@ pub trait Cursor {
fn wait_for_completion(&mut self) -> Result<()>;
fn rowid(&self) -> Result<Option<u64>>;
fn record(&self) -> Result<Ref<Option<OwnedRecord>>>;
fn insert(&mut self, record: &OwnedRecord) -> Result<()>;
fn insert(
&mut self,
key: &OwnedValue,
record: &OwnedRecord,
moved_before: bool, /* Tells the inserter the cursor is already positioned, so it does not need to traverse the tree again to find the leaf page */
) -> Result<CursorResult<()>>;
fn exists(&mut self, key: &OwnedValue) -> Result<CursorResult<bool>>;
fn set_null_flag(&mut self, flag: bool);
fn get_null_flag(&self) -> bool;
}

View File

@@ -1,3 +1,7 @@
use std::{cell::RefCell, rc::Rc};
use crate::sqlite3_ondisk::DatabaseHeader;
use super::{BranchOffset, CursorID, Insn, InsnReference, Program, Table};
pub struct ProgramBuilder {
@@ -246,6 +250,26 @@ impl ProgramBuilder {
assert!(*pc_if_next < 0);
*pc_if_next = to_offset;
}
Insn::InitCoroutine {
yield_reg: _,
jump_on_definition,
start_offset: _,
} => {
*jump_on_definition = to_offset;
}
Insn::NotExists {
cursor: _,
rowid_reg: _,
target_pc,
} => {
*target_pc = to_offset;
}
Insn::Yield {
yield_reg: _,
end_offset,
} => {
*end_offset = to_offset;
}
_ => {
todo!("missing resolve_label for {:?}", insn);
}
@@ -281,7 +305,7 @@ impl ProgramBuilder {
self.deferred_label_resolutions.clear();
}
pub fn build(self) -> Program {
pub fn build(self, database_header: Rc<RefCell<DatabaseHeader>>) -> Program {
assert!(
self.deferred_label_resolutions.is_empty(),
"deferred_label_resolutions is not empty when build() is called, did you forget to call resolve_deferred_labels()?"
@@ -294,6 +318,7 @@ impl ProgramBuilder {
max_registers: self.next_free_register,
insns: self.insns,
cursor_ref: self.cursor_ref,
database_header,
}
}
}

View File

@@ -507,6 +507,137 @@ pub fn insn_to_str(program: &Program, addr: InsnReference, insn: &Insn, indent:
0,
format!("r[{}]=func(r[{}..])", dest, start_reg),
),
Insn::InitCoroutine {
yield_reg,
jump_on_definition,
start_offset,
} => (
"InitCoroutine",
*yield_reg as i32,
*jump_on_definition as i32,
*start_offset as i32,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::EndCoroutine { yield_reg } => (
"EndCoroutine",
*yield_reg as i32,
0,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::Yield {
yield_reg,
end_offset,
} => (
"Yield",
*yield_reg as i32,
*end_offset as i32,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::InsertAsync {
cursor,
key_reg,
record_reg,
flag,
} => (
"InsertAsync",
*cursor as i32,
*record_reg as i32,
*key_reg as i32,
OwnedValue::Text(Rc::new("".to_string())),
*flag as u16,
format!(""),
),
Insn::InsertAwait { cursor_id } => (
"InsertAwait",
*cursor_id as i32,
0,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::NewRowid { reg } => (
"NewRowId",
0,
*reg as i32,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::MustBeInt { reg } => (
"MustBeInt",
*reg as i32,
0,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::SoftNull { reg } => (
"SoftNull",
*reg as i32,
0,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::NotExists {
cursor,
rowid_reg,
target_pc,
} => (
"NotExists",
*cursor as i32,
*target_pc as i32,
*rowid_reg as i32,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::OpenWriteAsync {
cursor_id,
root_page,
} => (
"OpenWriteAsync",
*cursor_id as i32,
*root_page as i32,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::OpenWriteAwait {} => (
"OpenWriteAwait",
0,
0,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
Insn::Copy {
src_reg,
dst_reg,
amount,
} => (
"Copy",
*src_reg as i32,
*dst_reg as i32,
*amount as i32,
OwnedValue::Text(Rc::new("".to_string())),
0,
format!(""),
),
};
format!(
"{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}",

View File

@@ -28,6 +28,7 @@ use crate::function::{AggFunc, ScalarFunc};
use crate::pager::Pager;
use crate::pseudo::PseudoCursor;
use crate::schema::Table;
use crate::sqlite3_ondisk::DatabaseHeader;
use crate::types::{AggContext, Cursor, CursorResult, OwnedRecord, OwnedValue, Record};
use crate::Result;
@@ -279,6 +280,63 @@ pub enum Insn {
dest: usize, // P3
func: ScalarFunc, // P4
},
InitCoroutine {
yield_reg: usize,
jump_on_definition: BranchOffset,
start_offset: BranchOffset,
},
EndCoroutine {
yield_reg: usize,
},
Yield {
yield_reg: usize,
end_offset: BranchOffset,
},
InsertAsync {
cursor: CursorID,
key_reg: usize, // Must be int.
record_reg: usize, // Blob of record data.
flag: usize, // Flags used by insert, for now not used.
},
InsertAwait {
cursor_id: usize,
},
NewRowid {
reg: usize,
},
MustBeInt {
reg: usize,
},
SoftNull {
reg: usize,
},
NotExists {
cursor: CursorID,
rowid_reg: usize,
target_pc: BranchOffset,
},
OpenWriteAsync {
cursor_id: CursorID,
root_page: PageIdx,
},
OpenWriteAwait {},
Copy {
src_reg: usize,
dst_reg: usize,
amount: usize, // number of additional registers to copy; amount == 0 still copies one register (dst_reg..=dst_reg+amount = src_reg..=src_reg+amount)
},
}
// Index of insn in list of insns
@@ -295,6 +353,7 @@ pub struct ProgramState {
pub pc: BranchOffset,
cursors: RefCell<BTreeMap<CursorID, Box<dyn Cursor>>>,
registers: Vec<OwnedValue>,
ended_coroutine: bool, // flag to notify yield coroutine finished
}
impl ProgramState {
@@ -306,6 +365,7 @@ impl ProgramState {
pc: 0,
cursors,
registers,
ended_coroutine: false,
}
}
@@ -322,6 +382,7 @@ pub struct Program {
pub max_registers: usize,
pub insns: Vec<Insn>,
pub cursor_ref: Vec<(Option<String>, Option<Table>)>,
pub database_header: Rc<RefCell<DatabaseHeader>>,
}
impl Program {
@@ -580,7 +641,11 @@ impl Program {
cursor_id,
root_page,
} => {
let cursor = Box::new(BTreeCursor::new(pager.clone(), *root_page));
let cursor = Box::new(BTreeCursor::new(
pager.clone(),
*root_page,
self.database_header.clone(),
));
cursors.insert(*cursor_id, cursor);
state.pc += 1;
}
@@ -997,7 +1062,7 @@ impl Program {
};
state.registers[*dest_reg] = OwnedValue::Record(record.clone());
let sorter_cursor = cursors.get_mut(sorter_cursor).unwrap();
sorter_cursor.insert(&record)?;
sorter_cursor.insert(&OwnedValue::Integer(0), &record, false)?; // fix key later
state.pc += 1;
}
Insn::SorterInsert {
@@ -1009,7 +1074,8 @@ impl Program {
OwnedValue::Record(record) => record,
_ => unreachable!("SorterInsert on non-record register"),
};
cursor.insert(record)?;
// TODO: set correct key
cursor.insert(&OwnedValue::Integer(0), record, false)?;
state.pc += 1;
}
Insn::SorterSort {
@@ -1198,6 +1264,121 @@ impl Program {
state.pc += 1;
}
},
Insn::InitCoroutine {
yield_reg,
jump_on_definition,
start_offset,
} => {
state.registers[*yield_reg] = OwnedValue::Integer(*start_offset);
state.pc = *jump_on_definition;
}
Insn::EndCoroutine { yield_reg } => {
if let OwnedValue::Integer(pc) = state.registers[*yield_reg] {
state.ended_coroutine = true;
state.pc = pc - 1; // a Yield's resume address points at the instruction after it; subtract 1 to land back on the Yield itself
} else {
unreachable!();
}
}
Insn::Yield {
yield_reg,
end_offset,
} => {
if let OwnedValue::Integer(pc) = state.registers[*yield_reg] {
if state.ended_coroutine {
state.pc = *end_offset;
} else {
// swap: jump into the coroutine at `pc` and stash the caller's resume
// address (the instruction after this Yield) in yield_reg
(state.pc, state.registers[*yield_reg]) =
(pc, OwnedValue::Integer(state.pc + 1));
}
} else {
unreachable!();
}
}
Insn::InsertAsync {
cursor,
key_reg,
record_reg,
flag: _,
} => {
let cursor = cursors.get_mut(cursor).unwrap();
let record = match &state.registers[*record_reg] {
OwnedValue::Record(r) => r,
_ => unreachable!("Not a record! Cannot insert a non record value."),
};
let key = &state.registers[*key_reg];
match cursor.insert(key, record, true)? {
CursorResult::Ok(_) => {
state.pc += 1;
}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
}
Insn::InsertAwait { cursor_id } => {
let cursor = cursors.get_mut(cursor_id).unwrap();
cursor.wait_for_completion()?;
state.pc += 1;
}
Insn::NewRowid { reg: _ } => todo!(),
Insn::MustBeInt { reg } => {
match state.registers[*reg] {
OwnedValue::Integer(_) => {}
_ => {
crate::bail_parse_error!(
"MustBeInt: the value in the register is not an integer"
);
}
};
state.pc += 1;
}
Insn::SoftNull { reg } => {
state.registers[*reg] = OwnedValue::Null;
state.pc += 1;
}
Insn::NotExists {
cursor,
rowid_reg,
target_pc,
} => {
let cursor = cursors.get_mut(cursor).unwrap();
match cursor.exists(&state.registers[*rowid_reg])? {
CursorResult::Ok(true) => state.pc += 1,
CursorResult::Ok(false) => state.pc = *target_pc,
CursorResult::IO => return Ok(StepResult::IO),
};
}
// This cursor may be reused for the next insert.
// NotExists already positions the cursor with a table move-to; on insert, the b-tree is
// traversed again unless the moved_before flag says it is unnecessary. Skipping that second
// traversal is an obvious future optimization.
Insn::OpenWriteAsync {
cursor_id,
root_page,
} => {
let cursor = Box::new(BTreeCursor::new(
pager.clone(),
*root_page,
self.database_header.clone(),
));
cursors.insert(*cursor_id, cursor);
state.pc += 1;
}
Insn::OpenWriteAwait {} => {
state.pc += 1;
}
Insn::Copy {
src_reg,
dst_reg,
amount,
} => {
for i in 0..=*amount {
state.registers[*dst_reg + i] = state.registers[*src_reg + i].clone();
}
state.pc += 1;
}
}
}
}

View File

@@ -1,5 +1,5 @@
use crate::{
types::{Cursor, CursorResult, OwnedRecord},
types::{Cursor, CursorResult, OwnedRecord, OwnedValue},
Result,
};
use std::{
@@ -79,11 +79,18 @@ impl Cursor for Sorter {
Ok(self.current.borrow())
}
fn insert(&mut self, record: &OwnedRecord) -> Result<()> {
fn insert(
&mut self,
key: &OwnedValue,
record: &OwnedRecord,
moved_before: bool,
) -> Result<CursorResult<()>> {
let _ = key;
let _ = moved_before;
let key_fields = self.order.len();
let key = OwnedRecord::new(record.values[0..key_fields].to_vec());
self.insert(key, OwnedRecord::new(record.values[key_fields..].to_vec()));
Ok(())
Ok(CursorResult::Ok(()))
}
fn set_null_flag(&mut self, _flag: bool) {
@@ -93,4 +100,9 @@ impl Cursor for Sorter {
fn get_null_flag(&self) -> bool {
todo!();
}
fn exists(&mut self, key: &OwnedValue) -> Result<CursorResult<bool>> {
let _ = key;
todo!()
}
}

View File

@@ -162,7 +162,7 @@ impl limbo_core::File for SimulatorFile {
&self,
pos: usize,
buffer: Rc<std::cell::RefCell<limbo_core::Buffer>>,
c: Rc<limbo_core::WriteCompletion>,
c: Rc<limbo_core::Completion>,
) -> Result<()> {
if *self.fault.borrow() {
*self.nr_pwrite_faults.borrow_mut() += 1;