diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 9b7582e7c..d859509d9 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -45,7 +45,7 @@ impl limbo_core::PageIO for PageIO { &self, _page_idx: usize, _buffer: Rc>, - _c: Rc, + _c: Rc, ) -> Result<()> { todo!() } diff --git a/cli/main.rs b/cli/main.rs index 678bc4afd..bae51058e 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -277,5 +277,7 @@ fn query( eprintln!("{}", err); } } + // for now let's cache flush always + conn.cacheflush()?; Ok(()) } diff --git a/core/btree.rs b/core/btree.rs index 41becb614..9b1318e0a 100644 --- a/core/btree.rs +++ b/core/btree.rs @@ -1,11 +1,25 @@ -use crate::pager::Pager; -use crate::sqlite3_ondisk::{BTreeCell, TableInteriorCell, TableLeafCell}; -use crate::types::{Cursor, CursorResult, OwnedRecord}; +use crate::pager::{Page, Pager}; +use crate::sqlite3_ondisk::{ + read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell, + TableLeafCell, +}; +use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue}; use crate::Result; use std::cell::{Ref, RefCell}; +use std::mem; use std::rc::Rc; +/* + These are offsets of fields in the header of a b-tree page. +*/ +const BTREE_HEADER_OFFSET_TYPE: usize = 0; /* type of btree page -> u8 */ +const BTREE_HEADER_OFFSET_FREEBLOCK: usize = 1; /* pointer to first freeblock -> u16 */ +const BTREE_HEADER_OFFSET_CELL_COUNT: usize = 3; /* number of cells in the page -> u16 */ +const BTREE_HEADER_OFFSET_CELL_CONTENT: usize = 5; /* pointer to first byte of cell allocated content from top -> u16 */ +const BTREE_HEADER_OFFSET_FRAGMENTED: usize = 7; /* number of fragmented bytes -> u8 */ +const BTREE_HEADER_OFFSET_RIGHTMOST: usize = 8; /* if internalnode, pointer right most pointer (saved separately from cells) -> u32 */ + pub struct MemPage { parent: Option>, page_idx: usize, @@ -38,10 +52,15 @@ pub struct BTreeCursor { rowid: RefCell>, record: RefCell>, null_flag: bool, + database_header: Rc>, } impl BTreeCursor { - pub fn new(pager: Rc, root_page: usize) -> Self { + pub fn new( + pager: Rc, + root_page: usize, + database_header: Rc>, + ) -> Self { Self { pager, root_page, @@ -49,6 +68,7 @@ impl BTreeCursor { rowid: RefCell::new(None), record: RefCell::new(None), null_flag: false, + database_header, } } @@ -61,14 +81,15 @@ impl BTreeCursor { }; let page_idx = mem_page.page_idx; let page = self.pager.read_page(page_idx)?; + let page = RefCell::borrow(&page); if page.is_locked() { return Ok(CursorResult::IO); } let page = page.contents.read().unwrap(); let page = page.as_ref().unwrap(); - if mem_page.cell_idx() >= page.cells.len() { + if mem_page.cell_idx() >= page.cell_count() { let parent = mem_page.parent.clone(); - match page.header.right_most_pointer { + match page.rightmost_pointer() { Some(right_most_pointer) => { let mem_page = MemPage::new(parent.clone(), right_most_pointer as usize, 0); self.page.replace(Some(Rc::new(mem_page))); @@ -85,7 +106,7 @@ impl BTreeCursor { }, } } - let cell = &page.cells[mem_page.cell_idx()]; + let cell = page.cell_get(mem_page.cell_idx())?; match &cell { BTreeCell::TableInteriorCell(TableInteriorCell { _left_child_page, @@ -115,6 +136,630 @@ impl BTreeCursor { } } } + + fn move_to_root(&mut self) { + self.page + .replace(Some(Rc::new(MemPage::new(None, self.root_page, 0)))); + } + + pub fn move_to(&mut self, key: u64) -> Result> { + self.move_to_root(); + + loop { + let mem_page = { + let mem_page = self.page.borrow(); + let mem_page = mem_page.as_ref().unwrap(); + mem_page.clone() + }; + let page_idx = mem_page.page_idx; + let page = self.pager.read_page(page_idx)?; + let page = RefCell::borrow(&page); + if page.is_locked() { + return Ok(CursorResult::IO); + } + let page = page.contents.read().unwrap(); + let page = page.as_ref().unwrap(); + if page.is_leaf() { + return Ok(CursorResult::Ok(())); + } + + let mut found_cell = false; + for cell_idx in 0..page.cell_count() { + match &page.cell_get(cell_idx)? { + BTreeCell::TableInteriorCell(TableInteriorCell { + _left_child_page, + _rowid, + }) => { + if key < *_rowid { + mem_page.advance(); + let mem_page = + MemPage::new(Some(mem_page.clone()), *_left_child_page as usize, 0); + self.page.replace(Some(Rc::new(mem_page))); + found_cell = true; + break; + } + } + BTreeCell::TableLeafCell(TableLeafCell { + _rowid: _, + _payload: _, + first_overflow_page: _, + }) => { + unreachable!( + "we don't iterate leaf cells while trying to move to a leaf cell" + ); + } + BTreeCell::IndexInteriorCell(_) => { + unimplemented!(); + } + BTreeCell::IndexLeafCell(_) => { + unimplemented!(); + } + } + } + + if !found_cell { + let parent = mem_page.clone(); + match page.rightmost_pointer() { + Some(right_most_pointer) => { + let mem_page = MemPage::new(Some(parent), right_most_pointer as usize, 0); + self.page.replace(Some(Rc::new(mem_page))); + continue; + } + None => { + unreachable!("we shall not go back up! The only way is down the slope"); + } + } + } + } + } + + fn insert_to_page( + &mut self, + key: &OwnedValue, + _record: &OwnedRecord, + ) -> Result> { + let page_ref = self.get_page()?; + let int_key = match key { + OwnedValue::Integer(i) => *i as u64, + _ => unreachable!("btree tables are indexed by integers!"), + }; + + let cell_idx = { + let page = RefCell::borrow(&page_ref); + if page.is_locked() { + return Ok(CursorResult::IO); + } + + page.set_dirty(); + self.pager.add_dirty(page_ref.clone()); + + let mut page = page.contents.write().unwrap(); + let page = page.as_mut().unwrap(); + assert!(matches!(page.page_type(), PageType::TableLeaf)); + + // find cell + find_cell(page, int_key) + }; + + // TODO: if overwrite drop cell + + // insert cell + let mut payload: Vec = Vec::new(); + + { + // Data len will be prepended later + // Key + let mut key_varint: Vec = Vec::new(); + key_varint.extend(std::iter::repeat(0).take(9)); + let n = write_varint(&mut key_varint.as_mut_slice()[0..9], int_key); + write_varint(&mut key_varint, int_key); + key_varint.truncate(n); + payload.extend_from_slice(&key_varint); + } + + // Data payload + let payload_size_before_record = payload.len(); + _record.serialize(&mut payload); + let header_size = payload.len() - payload_size_before_record; + + { + // Data len + let mut data_len_varint: Vec = Vec::new(); + data_len_varint.extend(std::iter::repeat(0).take(9)); + let n = write_varint( + &mut data_len_varint.as_mut_slice()[0..9], + header_size as u64, + ); + data_len_varint.truncate(n); + payload.splice(0..0, data_len_varint.iter().cloned()); + } + + let usable_space = { + let db_header = RefCell::borrow(&self.database_header); + (db_header.page_size - db_header.unused_space as u16) as usize + }; + let free = { + let page = RefCell::borrow(&page_ref); + let mut page = page.contents.write().unwrap(); + let page = page.as_mut().unwrap(); + self.compute_free_space(page, RefCell::borrow(&self.database_header)) + }; + assert!( + payload.len() <= usable_space - 100, /* 100 bytes minus for precaution to remember */ + "need to implemented overflow pages, too big to even add to a an empty page" + ); + if payload.len() + 2 > free as usize { + // overflow or balance + self.balance_leaf(int_key, payload); + } else { + // insert + let page = RefCell::borrow(&page_ref); + + let mut page = page.contents.write().unwrap(); + let page = page.as_mut().unwrap(); + self.insert_into_cell(page, &payload, cell_idx); + } + + Ok(CursorResult::Ok(())) + } + + /* insert to postion and shift other pointers */ + fn insert_into_cell(&mut self, page: &mut PageContent, payload: &Vec, cell_idx: usize) { + // TODO: insert into cell payload in internal page + let pc = self.allocate_cell_space(page, payload.len() as u16); + let mut buf_ref = RefCell::borrow_mut(&page.buffer); + let buf: &mut [u8] = buf_ref.as_mut_slice(); + + // copy data + buf[pc as usize..pc as usize + payload.len()].copy_from_slice(&payload); + // memmove(pIns+2, pIns, 2*(pPage->nCell - i)); + let (pointer_area_pc_by_idx, _) = page.cell_get_raw_pointer_region(); + let pointer_area_pc_by_idx = pointer_area_pc_by_idx + (2 * cell_idx); + + // move previous pointers forward and insert new pointer there + let n_cells_forward = 2 * (page.cell_count() - cell_idx); + if n_cells_forward > 0 { + buf.copy_within( + pointer_area_pc_by_idx..pointer_area_pc_by_idx + n_cells_forward, + pointer_area_pc_by_idx + 2, + ); + } + page.write_u16(pointer_area_pc_by_idx, pc); + + // update first byte of content area + page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, pc); + + // update cell count + let new_n_cells = (page.cell_count() + 1) as u16; + page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, new_n_cells); + } + + fn get_page(&mut self) -> crate::Result>> { + let mem_page = { + let mem_page = self.page.borrow(); + let mem_page = mem_page.as_ref().unwrap(); + mem_page.clone() + }; + let page_idx = mem_page.page_idx; + let page_ref = self.pager.read_page(page_idx)?; + Ok(page_ref) + } + + fn balance_leaf(&mut self, key: u64, payload: Vec) { + // This is a naive algorithm that doesn't try to distribute cells evenly by content. + // It will try to split the page in half by keys not by content. + // Sqlite tries to have a page at least 40% full. + let mut key = key; + let mut payload = payload; + loop { + let mem_page = { + let mem_page = self.page.borrow(); + let mem_page = mem_page.as_ref().unwrap(); + mem_page.clone() + }; + let page_ref = self.read_page_sync(mem_page.page_idx); + let mut page_rc = RefCell::borrow_mut(&page_ref); + + let right_page_id = { + // split procedure + let mut page = page_rc.contents.write().unwrap(); + let page = page.as_mut().unwrap(); + let free = self.compute_free_space(page, RefCell::borrow(&self.database_header)); + assert!( + matches!( + page.page_type(), + PageType::TableLeaf | PageType::TableInterior + ), + "indexes still not supported " + ); + if payload.len() + 2 <= free as usize { + let cell_idx = find_cell(page, key); + self.insert_into_cell(page, &payload, cell_idx); + break; + } + + let right_page_ref = self.allocate_page(page.page_type()); + let right_page = RefCell::borrow_mut(&right_page_ref); + let right_page_id = right_page.id; + let mut right_page = right_page.contents.write().unwrap(); + let right_page = right_page.as_mut().unwrap(); + { + // move data from one buffer to another + // done in a separate block to satisfy borrow checker + let mut left_buf = RefCell::borrow_mut(&page.buffer); + let left_buf: &mut [u8] = left_buf.as_mut_slice(); + let mut right_buf = RefCell::borrow_mut(&right_page.buffer); + let right_buf: &mut [u8] = right_buf.as_mut_slice(); + + let mut rbrk = right_page.cell_content_area() as usize; + + let cells_to_move = page.cell_count() / 2; + let (mut cell_pointer_idx, _) = page.cell_get_raw_pointer_region(); + // move half of cells to right page + for cell_idx in cells_to_move..page.cell_count() { + let (start, len) = page.cell_get_raw_region_borrowed(cell_idx, left_buf); + // copy data + rbrk -= len; + right_buf[rbrk..rbrk + len].copy_from_slice(&left_buf[start..start + len]); + // set pointer + right_page.write_u16(cell_pointer_idx, rbrk as u16); + cell_pointer_idx += 2; + } + // update cell count in both pages + let keys_moved = page.cell_count() - cells_to_move; + page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, cells_to_move as u16); + right_page.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, keys_moved as u16); + // update cell content are start + right_page.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, rbrk as u16); + } + + let last_cell = page.cell_get(page.cell_count() - 1).unwrap(); + let last_cell_key = match &last_cell { + BTreeCell::TableLeafCell(cell) => cell._rowid, + BTreeCell::TableInteriorCell(cell) => cell._rowid, + _ => unreachable!(), /* not yet supported index tables */ + }; + // if not leaf page update rightmost pointer + if let PageType::TableInterior = page.page_type() { + right_page.write_u32( + BTREE_HEADER_OFFSET_RIGHTMOST, + page.rightmost_pointer().unwrap(), + ); + // convert last cell to rightmost pointer + let BTreeCell::TableInteriorCell(last_cell) = &last_cell else { + unreachable!(); + }; + page.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, last_cell._left_child_page); + // page count now has one less cell because we've added the last one to rightmost pointer + page.write_u16( + BTREE_HEADER_OFFSET_CELL_COUNT, + (page.cell_count() - 1) as u16, + ); + } + + // update free list block by defragmenting page + self.defragment_page(page, RefCell::borrow(&self.database_header)); + // insert into one of the pages + if key < last_cell_key { + let cell_idx = find_cell(page, key); + self.insert_into_cell(page, &payload, cell_idx); + } else { + let cell_idx = find_cell(right_page, key); + self.insert_into_cell(right_page, &payload, cell_idx); + } + // propagate parent split + key = last_cell_key; + right_page_id + }; + + payload = Vec::new(); + if mem_page.page_idx == self.root_page { + /* todo: balance deeper, create child and copy contents of root there. Then split root */ + /* if we are in root page then we just need to create a new root and push key there */ + let new_root_page_ref = self.allocate_page(PageType::TableInterior); + { + let new_root_page = RefCell::borrow_mut(&new_root_page_ref); + let new_root_page_id = new_root_page.id; + let mut new_root_page_contents = new_root_page.contents.write().unwrap(); + let new_root_page_contents = new_root_page_contents.as_mut().unwrap(); + /* + Note that we set cell pointer to point to itself, because we will later swap this page's + content with splitted page in order to not update root page idx. + */ + let new_root_page_id = new_root_page_id as u32; + payload.extend_from_slice(&(new_root_page_id as u32).to_be_bytes()); + payload.extend(std::iter::repeat(0).take(9)); + let n = write_varint(&mut payload.as_mut_slice()[4..], key as u64); + payload.truncate(4 + n); + + // write left child cell + self.insert_into_cell(new_root_page_contents, &payload, 0); + + // write right child cell + new_root_page_contents + .write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, right_page_id as u32); + } + + /* swap splitted page buffer with new root buffer so we don't have to update page idx */ + { + let mut new_root_page = RefCell::borrow_mut(&new_root_page_ref); + mem::swap(&mut *new_root_page, &mut *page_rc); + + // now swap contents + let mut new_root_page_contents = new_root_page.contents.write().unwrap(); + let mut page_contents = page_rc.contents.write().unwrap(); + std::mem::swap(&mut *new_root_page_contents, &mut *page_contents); + + self.page = + RefCell::new(Some(Rc::new(MemPage::new(None, new_root_page.id, 0)))); + } + + break; + } + + // Propagate split divided to top. + payload.extend_from_slice(&(mem_page.page_idx as u32).to_be_bytes()); + payload.extend(std::iter::repeat(0).take(9)); + let n = write_varint(&mut payload.as_mut_slice()[4..], key as u64); + payload.truncate(n); + + self.page = RefCell::new(Some(mem_page.parent.as_ref().unwrap().clone())); + } + } + + fn read_page_sync(&mut self, page_idx: usize) -> Rc> { + loop { + let page_ref = self.pager.read_page(page_idx); + match page_ref { + Ok(p) => return p, + Err(_) => {} + } + } + } + + fn allocate_page(&mut self, page_type: PageType) -> Rc> { + let page = self.pager.allocate_page().unwrap(); + + { + // setup btree page + let contents = RefCell::borrow(&page); + let mut contents = contents.contents.write().unwrap(); + let contents = contents.as_mut().unwrap(); + let id = page_type as u8; + contents.write_u8(BTREE_HEADER_OFFSET_TYPE, id); + contents.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0); + contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0); + + let db_header = RefCell::borrow(&self.database_header); + let cell_content_area_start = db_header.page_size - db_header.unused_space as u16; + contents.write_u16(BTREE_HEADER_OFFSET_CELL_CONTENT, cell_content_area_start); + + contents.write_u8(BTREE_HEADER_OFFSET_FRAGMENTED, 0); + contents.write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, 0); + } + + page + } + + /* + Allocate space for a cell on a page. + */ + fn allocate_cell_space(&mut self, page_ref: &PageContent, amount: u16) -> u16 { + let amount = amount as usize; + + let (cell_offset, _) = page_ref.cell_get_raw_pointer_region(); + let gap = cell_offset + 2 * page_ref.cell_count(); + let mut top = page_ref.cell_content_area() as usize; + + // there are free blocks and enough space + if page_ref.first_freeblock() != 0 && gap + 2 <= top { + // find slot + let db_header = RefCell::borrow(&self.database_header); + let pc = find_free_cell(page_ref, db_header, amount); + if pc != 0 { + return pc as u16; + } + /* fall through, we might need to defragment */ + } + + if gap + 2 + amount as usize > top { + // defragment + self.defragment_page(page_ref, RefCell::borrow(&self.database_header)); + let mut buf_ref = RefCell::borrow_mut(&page_ref.buffer); + let buf = buf_ref.as_mut_slice(); + top = u16::from_be_bytes([buf[5], buf[6]]) as usize; + } + + let db_header = RefCell::borrow(&self.database_header); + top -= amount; + + { + let mut buf_ref = RefCell::borrow_mut(&page_ref.buffer); + let buf = buf_ref.as_mut_slice(); + buf[5..7].copy_from_slice(&(top as u16).to_be_bytes()); + } + + let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize; + assert!(top + amount <= usable_space); + return top as u16; + } + + fn defragment_page(&self, page: &PageContent, db_header: Ref) { + let cloned_page = page.clone(); + let usable_space = (db_header.page_size - db_header.unused_space as u16) as u64; + let mut cbrk = usable_space as u64; + + // TODO: implement fast algorithm + + let last_cell = (usable_space - 4) as u64; + let first_cell = { + let (start, end) = cloned_page.cell_get_raw_pointer_region(); + start + end + }; + + if cloned_page.cell_count() > 0 { + let page_type = page.page_type(); + let buf = RefCell::borrow(&cloned_page.buffer); + let buf = buf.as_slice(); + let mut write_buf = RefCell::borrow_mut(&page.buffer); + let write_buf = write_buf.as_mut_slice(); + + for i in 0..cloned_page.cell_count() { + let cell_offset = page.offset + 8; + let cell_idx = cell_offset + i * 2; + + let pc = u16::from_be_bytes([buf[cell_idx], buf[cell_idx + 1]]) as u64; + if pc > last_cell { + unimplemented!("corrupted page"); + } + + assert!(pc <= last_cell); + + let size = match page_type { + PageType::TableInterior => { + let (_, nr_key) = match read_varint(&buf[pc as usize ..]) { + Ok(v) => v, + Err(_) => todo!( + "error while parsing varint from cell, probably treat this as corruption?" + ), + }; + 4 + nr_key as u64 + } + PageType::TableLeaf => { + let (payload_size, nr_payload) = match read_varint(&buf[pc as usize..]) { + Ok(v) => v, + Err(_) => todo!( + "error while parsing varint from cell, probably treat this as corruption?" + ), + }; + let (_, nr_key) = match read_varint(&buf[pc as usize + nr_payload as usize..]) { + Ok(v) => v, + Err(_) => todo!( + "error while parsing varint from cell, probably treat this as corruption?" + ), + }; + // TODO: add overflow page calculation + payload_size + nr_payload as u64 + nr_key as u64 + } + PageType::IndexInterior => todo!(), + PageType::IndexLeaf => todo!(), + }; + cbrk -= size; + if cbrk < first_cell as u64 || pc as u64 + size > usable_space as u64 { + todo!("corrupt"); + } + assert!(cbrk + size <= usable_space && cbrk >= first_cell as u64); + // set new pointer + write_buf[cell_idx..cell_idx + 2].copy_from_slice(&(cbrk as u16).to_be_bytes()); + // copy payload + write_buf[cbrk as usize..cbrk as usize + size as usize] + .copy_from_slice(&buf[pc as usize..pc as usize + size as usize]); + } + } + + // assert!( nfree >= 0 ); + // if( data[hdr+7]+cbrk-iCellFirst!=pPage->nFree ){ + // return SQLITE_CORRUPT_PAGE(pPage); + // } + assert!(cbrk >= first_cell as u64); + let mut write_buf = RefCell::borrow_mut(&page.buffer); + let write_buf = write_buf.as_mut_slice(); + + // set new first byte of cell content + write_buf[5..7].copy_from_slice(&(cbrk as u16).to_be_bytes()); + // set free block to 0, unused spaced can be retrieved from gap between cell pointer end and content start + write_buf[1] = 0; + write_buf[2] = 0; + // set unused space to 0 + let first_cell = cloned_page.cell_content_area() as u64; + assert!(first_cell <= cbrk); + write_buf[first_cell as usize..cbrk as usize].fill(0); + } + + // Free blocks can be zero, meaning the "real free space" that can be used to allocate is expected to be between first cell byte + // and end of cell pointer area. + fn compute_free_space(&self, page: &PageContent, db_header: Ref) -> u16 { + let buffer = RefCell::borrow(&page.buffer); + let buf = buffer.as_slice(); + + let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize; + let mut first_byte_in_cell_content = page.cell_content_area(); + if first_byte_in_cell_content == 0 { + first_byte_in_cell_content = u16::MAX; + } + + let fragmented_free_bytes = page.num_frag_free_bytes(); + let free_block_pointer = page.first_freeblock(); + let ncell = page.cell_count(); + + // 8 + 4 == header end + let first_cell = (page.offset + 8 + 4 + (2 * ncell)) as u16; + + let mut nfree = fragmented_free_bytes as usize + first_byte_in_cell_content as usize; + + let mut pc = free_block_pointer as usize; + if pc > 0 { + let mut next = 0; + let mut size = 0; + if pc < first_byte_in_cell_content as usize { + // corrupt + todo!("corrupted page"); + } + + loop { + // TODO: check corruption icellast + next = u16::from_be_bytes(buf[pc..pc + 2].try_into().unwrap()) as usize; + size = u16::from_be_bytes(buf[pc + 2..pc + 4].try_into().unwrap()) as usize; + nfree += size as usize; + if next <= pc + size + 3 { + break; + } + pc = next as usize; + } + + if next > 0 { + todo!("corrupted page ascending order"); + } + + if pc + size > usable_space { + todo!("corrupted page last freeblock extends last page end"); + } + } + + // if( nFree>usableSize || nFree, amount: usize) -> usize { + // NOTE: freelist is in ascending order of keys and pc + // unuse_space is reserved bytes at the end of page, therefore we must substract from maxpc + let mut pc = page_ref.first_freeblock() as usize; + + let buf_ref = RefCell::borrow(&page_ref.buffer); + let buf = buf_ref.as_slice(); + + let usable_space = (db_header.page_size - db_header.unused_space as u16) as usize; + let maxpc = (usable_space - amount as usize) as usize; + let mut found = false; + while pc <= maxpc { + let next = u16::from_be_bytes(buf[pc..pc + 2].try_into().unwrap()); + let size = u16::from_be_bytes(buf[pc + 2..pc + 4].try_into().unwrap()); + if amount <= size as usize { + found = true; + break; + } + pc = next as usize; + } + if !found { + 0 + } else { + pc + } } impl Cursor for BTreeCursor { @@ -159,8 +804,27 @@ impl Cursor for BTreeCursor { Ok(self.record.borrow()) } - fn insert(&mut self, _record: &OwnedRecord) -> Result<()> { - unimplemented!() + fn insert( + &mut self, + key: &OwnedValue, + _record: &OwnedRecord, + moved_before: bool, /* Indicate whether it's necessary to traverse to find the leaf page */ + ) -> Result> { + let int_key = match key { + OwnedValue::Integer(i) => i, + _ => unreachable!("btree tables are indexed by integers!"), + }; + if !moved_before { + match self.move_to(*int_key as u64)? { + CursorResult::Ok(_) => {} + CursorResult::IO => return Ok(CursorResult::IO), + }; + } + + match self.insert_to_page(key, _record)? { + CursorResult::Ok(_) => Ok(CursorResult::Ok(())), + CursorResult::IO => Ok(CursorResult::IO), + } } fn set_null_flag(&mut self, flag: bool) { @@ -170,4 +834,61 @@ impl Cursor for BTreeCursor { fn get_null_flag(&self) -> bool { self.null_flag } + + fn exists(&mut self, key: &OwnedValue) -> Result> { + let int_key = match key { + OwnedValue::Integer(i) => i, + _ => unreachable!("btree tables are indexed by integers!"), + }; + match self.move_to(*int_key as u64)? { + CursorResult::Ok(_) => {} + CursorResult::IO => return Ok(CursorResult::IO), + }; + let page_ref = self.get_page()?; + let page = RefCell::borrow(&page_ref); + if page.is_locked() { + return Ok(CursorResult::IO); + } + + let page = page.contents.read().unwrap(); + let page = page.as_ref().unwrap(); + + // find cell + let int_key = match key { + OwnedValue::Integer(i) => *i as u64, + _ => unreachable!("btree tables are indexed by integers!"), + }; + let cell_idx = find_cell(page, int_key); + if cell_idx >= page.cell_count() { + Ok(CursorResult::Ok(false)) + } else { + let equals = match &page.cell_get(cell_idx)? { + BTreeCell::TableLeafCell(l) => l._rowid == int_key, + _ => unreachable!(), + }; + Ok(CursorResult::Ok(equals)) + } + } +} + +fn find_cell(page: &PageContent, int_key: u64) -> usize { + let mut cell_idx = 0; + let cell_count = page.cell_count(); + while cell_idx < cell_count { + match page.cell_get(cell_idx).unwrap() { + BTreeCell::TableLeafCell(cell) => { + if int_key <= cell._rowid { + break; + } + } + BTreeCell::TableInteriorCell(cell) => { + if int_key <= cell._rowid { + break; + } + } + _ => todo!(), + } + cell_idx += 1; + } + cell_idx } diff --git a/core/io/darwin.rs b/core/io/darwin.rs index 2b63da31e..06b057f65 100644 --- a/core/io/darwin.rs +++ b/core/io/darwin.rs @@ -2,7 +2,7 @@ use crate::error::LimboError; use crate::io::common; use crate::Result; -use super::{Completion, File, WriteCompletion, IO}; +use super::{Completion, File, IO}; use libc::{c_short, fcntl, flock, F_SETLK}; use log::trace; use polling::{Event, Events, Poller}; @@ -67,7 +67,12 @@ impl IO for DarwinIO { match cf { CompletionCallback::Read(ref file, ref c, pos) => { let mut file = file.borrow_mut(); - let mut buf = c.buf_mut(); + let c: &Completion = &c; + let r = match c { + Completion::Read(r) => r, + Completion::Write(_) => unreachable!(), + }; + let mut buf = r.buf_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; file.read(buf.as_mut_slice()) } @@ -81,12 +86,12 @@ impl IO for DarwinIO { }; match result { std::result::Result::Ok(n) => { - match cf { + match &cf { CompletionCallback::Read(_, ref c, _) => { - c.complete(); + c.complete(0); } CompletionCallback::Write(_, ref c, _, _) => { - c.complete(n); + c.complete(n as i32); } } return Ok(()); @@ -105,7 +110,7 @@ enum CompletionCallback { Read(Rc>, Rc, usize), Write( Rc>, - Rc, + Rc, Rc>, usize, ), @@ -142,7 +147,10 @@ impl File for DarwinFile { "Failed locking file. File is locked by another process" ))); } else { - return Err(LimboError::LockingError(format!("Failed locking file, {}", err))); + return Err(LimboError::LockingError(format!( + "Failed locking file, {}", + err + ))); } } Ok(()) @@ -168,17 +176,21 @@ impl File for DarwinFile { Ok(()) } - fn pread(&self, pos: usize, c: Rc) -> Result<()> { - let file = self.file.borrow(); + fn pread(&self, pos: usize, c: Rc) -> Result<()> { + let file = self.file.borrow(); let result = { - let mut buf = c.buf_mut(); + let r = match &(*c) { + Completion::Read(r) => r, + Completion::Write(_) => unreachable!(), + }; + let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) }; match result { std::result::Result::Ok(n) => { trace!("pread n: {}", n); // Read succeeded immediately - c.complete(); + c.complete(0); Ok(()) } Err(Errno::AGAIN) => { @@ -204,7 +216,7 @@ impl File for DarwinFile { &self, pos: usize, buffer: Rc>, - c: Rc, + c: Rc, ) -> Result<()> { let file = self.file.borrow(); let result = { @@ -215,7 +227,7 @@ impl File for DarwinFile { std::result::Result::Ok(n) => { trace!("pwrite n: {}", n); // Read succeeded immediately - c.complete(n); + c.complete(n as i32); Ok(()) } Err(Errno::AGAIN) => { diff --git a/core/io/generic.rs b/core/io/generic.rs index 0fb86d3d8..113a3aacd 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -1,4 +1,4 @@ -use crate::{Completion, File, Result, WriteCompletion, IO}; +use crate::{Completion, File, Result, IO}; use log::trace; use std::cell::RefCell; use std::io::{Read, Seek, Write}; @@ -45,11 +45,15 @@ impl File for GenericFile { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { - let mut buf = c.buf_mut(); + let r = match &(*c) { + Completion::Read(r) => r, + Completion::Write(_) => unreachable!(), + }; + let mut buf = r.buf_mut(); let buf = buf.as_mut_slice(); file.read_exact(buf)?; } - c.complete(); + c.complete(0); Ok(()) } @@ -57,7 +61,7 @@ impl File for GenericFile { &self, pos: usize, buffer: Rc>, - c: Rc, + c: Rc, ) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; @@ -72,4 +76,4 @@ impl Drop for GenericFile { fn drop(&mut self) { self.unlock_file().expect("Failed to unlock file"); } -} \ No newline at end of file +} diff --git a/core/io/linux.rs b/core/io/linux.rs index 71a5cd58d..24621dd93 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -1,5 +1,5 @@ -use super::{common, Completion, File, WriteCompletion, IO}; -use crate::{Result, LimboError}; +use super::{common, Completion, File, IO}; +use crate::{LimboError, Result}; use libc::{c_short, fcntl, flock, iovec, F_SETLK}; use log::{debug, trace}; use nix::fcntl::{FcntlArg, OFlag}; @@ -95,10 +95,13 @@ impl IO for LinuxIO { while let Some(cqe) = ring.completion().next() { let result = cqe.result(); if result < 0 { - return Err(LimboError::LinuxIOError(format!("{}", LinuxIOError::IOUringCQError(result)))); + return Err(LimboError::LinuxIOError(format!( + "{}", + LinuxIOError::IOUringCQError(result) + ))); } let c = unsafe { Rc::from_raw(cqe.user_data() as *const Completion) }; - c.complete(); + c.complete(cqe.result()); } Ok(()) } @@ -130,7 +133,9 @@ impl File for LinuxFile { if lock_result == -1 { let err = std::io::Error::last_os_error(); if err.kind() == std::io::ErrorKind::WouldBlock { - return Err(LimboError::LockingError("File is locked by another process".into())); + return Err(LimboError::LockingError( + "File is locked by another process".into(), + )); } else { return Err(LimboError::IOError(err)); } @@ -159,11 +164,15 @@ impl File for LinuxFile { } fn pread(&self, pos: usize, c: Rc) -> Result<()> { - trace!("pread(pos = {}, length = {})", pos, c.buf().len()); + let r = match &(*c) { + Completion::Read(r) => r, + Completion::Write(_) => unreachable!(), + }; + trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); let read_e = { - let mut buf = c.buf_mut(); + let mut buf = r.buf_mut(); let len = buf.len(); let buf = buf.as_mut_ptr(); let ptr = Rc::into_raw(c.clone()); @@ -186,7 +195,7 @@ impl File for LinuxFile { &self, pos: usize, buffer: Rc>, - c: Rc, + c: Rc, ) -> Result<()> { let mut io = self.io.borrow_mut(); let fd = io_uring::types::Fd(self.file.as_raw_fd()); diff --git a/core/io/mod.rs b/core/io/mod.rs index 98d903adc..cb26bb832 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -1,7 +1,9 @@ use crate::Result; use cfg_block::cfg_block; +use std::fmt; use std::{ cell::{Ref, RefCell, RefMut}, + fmt::Debug, mem::ManuallyDrop, pin::Pin, rc::Rc, @@ -11,8 +13,7 @@ pub trait File { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; fn pread(&self, pos: usize, c: Rc) -> Result<()>; - fn pwrite(&self, pos: usize, buffer: Rc>, c: Rc) - -> Result<()>; + fn pwrite(&self, pos: usize, buffer: Rc>, c: Rc) -> Result<()>; } pub trait IO { @@ -21,21 +22,34 @@ pub trait IO { fn run_once(&self) -> Result<()>; } -pub type Complete = dyn Fn(&Buffer); -pub type WriteComplete = dyn Fn(usize); +pub type Complete = dyn Fn(Rc>); +pub type WriteComplete = dyn Fn(i32); -pub struct Completion { - pub buf: RefCell, +pub enum Completion { + Read(ReadCompletion), + Write(WriteCompletion), +} + +pub struct ReadCompletion { + pub buf: Rc>, pub complete: Box, } +impl Completion { + pub fn complete(&self, result: i32) { + match self { + Completion::Read(r) => r.complete(), + Completion::Write(w) => w.complete(result), // fix + } + } +} + pub struct WriteCompletion { pub complete: Box, } -impl Completion { - pub fn new(buf: Buffer, complete: Box) -> Self { - let buf = RefCell::new(buf); +impl ReadCompletion { + pub fn new(buf: Rc>, complete: Box) -> Self { Self { buf, complete } } @@ -48,8 +62,7 @@ impl Completion { } pub fn complete(&self) { - let buf = self.buf.borrow_mut(); - (self.complete)(&buf); + (self.complete)(self.buf.clone()); } } @@ -57,7 +70,7 @@ impl WriteCompletion { pub fn new(complete: Box) -> Self { Self { complete } } - pub fn complete(&self, bytes_written: usize) { + pub fn complete(&self, bytes_written: i32) { (self.complete)(bytes_written); } } @@ -72,6 +85,12 @@ pub struct Buffer { drop: BufferDropFn, } +impl Debug for Buffer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.data) + } +} + impl Drop for Buffer { fn drop(&mut self) { let data = unsafe { ManuallyDrop::take(&mut self.data) }; diff --git a/core/io/windows.rs b/core/io/windows.rs index cd0119b9c..ca48e8d0b 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -43,11 +43,15 @@ impl File for WindowsFile { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { - let mut buf = c.buf_mut(); + let r = match &(*c) { + Completion::Read(r) => r, + Completion::Write(_) => unreachable!(), + }; + let mut buf = r.buf_mut(); let buf = buf.as_mut_slice(); file.read_exact(buf)?; } - c.complete(); + c.complete(0); Ok(()) } @@ -55,7 +59,7 @@ impl File for WindowsFile { &self, pos: usize, buffer: Rc>, - _c: Rc, + c: Rc, ) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; diff --git a/core/lib.rs b/core/lib.rs index 593e8765e..eefd2acde 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -201,6 +201,11 @@ impl Connection { } Ok(()) } + + pub fn cacheflush(&self) -> Result<()> { + self.pager.cacheflush()?; + Ok(()) + } } pub struct Statement { diff --git a/core/pager.rs b/core/pager.rs index de7a1d6bf..876c5c242 100644 --- a/core/pager.rs +++ b/core/pager.rs @@ -1,18 +1,22 @@ use crate::buffer_pool::BufferPool; -use crate::sqlite3_ondisk::BTreePage; +use crate::sqlite3_ondisk::PageContent; use crate::sqlite3_ondisk::{self, DatabaseHeader}; -use crate::{PageSource, Result}; +use crate::{Buffer, PageSource, Result}; use log::trace; use sieve_cache::SieveCache; +use std::borrow::Borrow; use std::cell::RefCell; +use std::collections::HashMap; use std::hash::Hash; +use std::ptr::{drop_in_place, NonNull}; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; pub struct Page { - flags: AtomicUsize, - pub contents: RwLock>, + pub flags: AtomicUsize, + pub contents: RwLock>, + pub id: usize, } /// Page is up-to-date. @@ -21,18 +25,21 @@ const PAGE_UPTODATE: usize = 0b001; const PAGE_LOCKED: usize = 0b010; /// Page had an I/O error. const PAGE_ERROR: usize = 0b100; +/// Page is dirty. Flush needed. +const PAGE_DIRTY: usize = 0b1000; impl Default for Page { fn default() -> Self { - Self::new() + Self::new(0) } } impl Page { - pub fn new() -> Page { + pub fn new(id: usize) -> Page { Page { flags: AtomicUsize::new(0), contents: RwLock::new(None), + id, } } @@ -71,6 +78,162 @@ impl Page { pub fn clear_error(&self) { self.flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst); } + + pub fn is_dirty(&self) -> bool { + self.flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0 + } + + pub fn set_dirty(&self) { + self.flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst); + } + + pub fn clear_dirty(&self) { + self.flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst); + } +} + +struct PageCacheEntry { + key: usize, + page: Rc>, + prev: Option>, + next: Option>, +} + +impl PageCacheEntry { + fn into_non_null(&mut self) -> NonNull { + NonNull::new(&mut *self).unwrap() + } +} + +struct DumbLruPageCache { + capacity: usize, + map: RefCell>>, + head: RefCell>>, + tail: RefCell>>, +} + +impl DumbLruPageCache { + pub fn new(capacity: usize) -> Self { + Self { + capacity: capacity, + map: RefCell::new(HashMap::new()), + head: RefCell::new(None), + tail: RefCell::new(None), + } + } + + pub fn insert(&mut self, key: usize, value: Rc>) { + self.delete(key); + let mut entry = Box::new(PageCacheEntry { + key: key, + next: None, + prev: None, + page: value, + }); + self.touch(&mut entry); + + if self.map.borrow().len() >= self.capacity { + self.pop_if_not_dirty(); + } + let b = Box::into_raw(entry); + let as_non_null = NonNull::new(b).unwrap(); + self.map.borrow_mut().insert(key, as_non_null); + } + + pub fn delete(&mut self, key: usize) { + let ptr = self.map.borrow_mut().remove(&key); + if ptr.is_none() { + return; + } + let mut ptr = ptr.unwrap(); + { + let ptr = unsafe { ptr.as_mut() }; + self.detach(ptr); + } + unsafe { drop_in_place(ptr.as_ptr()) }; + } + + fn get_ptr(&mut self, key: usize) -> Option> { + let m = self.map.borrow_mut(); + let ptr = m.get(&key); + match ptr { + Some(v) => Some(*v), + None => None, + } + } + + pub fn get(&mut self, key: &usize) -> Option>> { + let ptr = self.get_ptr(*key); + if ptr.is_none() { + return None; + } + let ptr = unsafe { ptr.unwrap().as_mut() }; + let page = ptr.page.clone(); + self.detach(ptr); + self.touch(ptr); + return Some(page); + } + + pub fn resize(&mut self, capacity: usize) { + let _ = capacity; + todo!(); + } + + fn detach(&mut self, entry: &mut PageCacheEntry) { + let mut current = entry.into_non_null(); + + let (next, prev) = unsafe { + let c = current.as_mut(); + let next = c.next; + let prev = c.prev; + c.prev = None; + c.next = None; + (next, prev) + }; + + // detach + match (prev, next) { + (None, None) => {} + (None, Some(_)) => todo!(), + (Some(p), None) => { + self.tail = RefCell::new(Some(p)); + } + (Some(mut p), Some(mut n)) => unsafe { + let p_mut = p.as_mut(); + p_mut.next = Some(n); + let n_mut = n.as_mut(); + n_mut.prev = Some(p); + }, + }; + } + + fn touch(&mut self, entry: &mut PageCacheEntry) { + let mut current = entry.into_non_null(); + unsafe { + let c = current.as_mut(); + c.next = *self.head.borrow(); + } + + if let Some(mut head) = *self.head.borrow_mut() { + unsafe { + let head = head.as_mut(); + head.prev = Some(current); + } + } + } + + fn pop_if_not_dirty(&mut self) { + let tail = *self.tail.borrow(); + if tail.is_none() { + return; + } + let tail = unsafe { tail.unwrap().as_mut() }; + if RefCell::borrow(&tail.page).is_dirty() { + // TODO: drop from another clean entry? + return; + } + self.detach(tail); + } } pub struct PageCache { @@ -101,12 +264,13 @@ impl PageCache { pub struct Pager { /// Source of the database pages. pub page_source: PageSource, - /// Cache for storing loaded pages. - page_cache: RefCell>>, + page_cache: RefCell, /// Buffer pool for temporary data storage. buffer_pool: Rc, /// I/O interface for input/output operations. pub io: Arc, + dirty_pages: Rc>>>>, + db_header: Rc>, } impl Pager { @@ -117,32 +281,34 @@ impl Pager { /// Completes opening a database by initializing the Pager with the database header. pub fn finish_open( - db_header: Rc>, + db_header_ref: Rc>, page_source: PageSource, io: Arc, ) -> Result { - let db_header = db_header.borrow(); + let db_header = RefCell::borrow(&db_header_ref); let page_size = db_header.page_size as usize; let buffer_pool = Rc::new(BufferPool::new(page_size)); - let page_cache = RefCell::new(PageCache::new(SieveCache::new(10).unwrap())); + let page_cache = RefCell::new(DumbLruPageCache::new(10)); Ok(Self { page_source, buffer_pool, page_cache, io, + dirty_pages: Rc::new(RefCell::new(Vec::new())), + db_header: db_header_ref.clone(), }) } /// Reads a page from the database. - pub fn read_page(&self, page_idx: usize) -> Result> { + pub fn read_page(&self, page_idx: usize) -> crate::Result>> { trace!("read_page(page_idx = {})", page_idx); let mut page_cache = self.page_cache.borrow_mut(); if let Some(page) = page_cache.get(&page_idx) { return Ok(page.clone()); } - let page = Rc::new(Page::new()); - page.set_locked(); - sqlite3_ondisk::begin_read_btree_page( + let page = Rc::new(RefCell::new(Page::new(page_idx))); + RefCell::borrow(&page).set_locked(); + sqlite3_ondisk::begin_read_page( &self.page_source, self.buffer_pool.clone(), page.clone(), @@ -161,4 +327,66 @@ impl Pager { pub fn change_page_cache_size(&self, capacity: usize) { self.page_cache.borrow_mut().resize(capacity); } + + pub fn add_dirty(&self, page: Rc>) { + // TODO: cehck duplicates? + let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages); + dirty_pages.push(page); + } + + pub fn cacheflush(&self) -> Result<()> { + let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages); + if dirty_pages.len() == 0 { + return Ok(()); + } + loop { + if dirty_pages.len() == 0 { + break; + } + let page = dirty_pages.pop().unwrap(); + sqlite3_ondisk::begin_write_btree_page(self, &page)?; + } + self.io.run_once()?; + Ok(()) + } + + /* + Get's a new page that increasing the size of the page or uses a free page. + Currently free list pages are not yet supported. + */ + pub fn allocate_page(&self) -> Result>> { + let header = &self.db_header; + let mut header = RefCell::borrow_mut(&header); + header.database_size += 1; + { + // update database size + let first_page_ref = self.read_page(1).unwrap(); + let first_page = RefCell::borrow_mut(&first_page_ref); + first_page.set_dirty(); + self.add_dirty(first_page_ref.clone()); + + let contents = first_page.contents.write().unwrap(); + let contents = contents.as_ref().unwrap(); + contents.write_database_header(&header); + } + + let page_ref = Rc::new(RefCell::new(Page::new(0))); + { + // setup page and add to cache + self.add_dirty(page_ref.clone()); + let mut page = RefCell::borrow_mut(&page_ref); + page.set_dirty(); + page.id = header.database_size as usize; + let buffer = self.buffer_pool.get(); + let bp = self.buffer_pool.clone(); + let drop_fn = Rc::new(move |buf| { + bp.put(buf); + }); + let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn))); + page.contents = RwLock::new(Some(PageContent { offset: 0, buffer })); + let mut cache = RefCell::borrow_mut(&self.page_cache); + cache.insert(page.id, page_ref.clone()); + } + Ok(page_ref) + } } diff --git a/core/pseudo.rs b/core/pseudo.rs index dfd3212ad..3cb8e421c 100644 --- a/core/pseudo.rs +++ b/core/pseudo.rs @@ -50,9 +50,16 @@ impl Cursor for PseudoCursor { Ok(self.current.borrow()) } - fn insert(&mut self, record: &OwnedRecord) -> Result<()> { + fn insert( + &mut self, + key: &OwnedValue, + record: &OwnedRecord, + moved_before: bool, + ) -> Result> { + let _ = key; + let _ = moved_before; *self.current.borrow_mut() = Some(record.clone()); - Ok(()) + Ok(CursorResult::Ok(())) } fn get_null_flag(&self) -> bool { @@ -62,4 +69,9 @@ impl Cursor for PseudoCursor { fn set_null_flag(&mut self, _null_flag: bool) { // Do nothing } + + fn exists(&mut self, key: &OwnedValue) -> Result> { + let _ = key; + todo!() + } } diff --git a/core/schema.rs b/core/schema.rs index d91a8a9e8..b910a81e2 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -83,6 +83,13 @@ impl Table { Table::Pseudo(table) => &table.columns, } } + + pub fn has_rowid(&self) -> bool { + match self { + Table::BTree(table) => table.has_rowid, + Table::Pseudo(_) => todo!(), + } + } } impl PartialEq for Table { diff --git a/core/sqlite3_ondisk.rs b/core/sqlite3_ondisk.rs index 8a4216853..d57efef75 100644 --- a/core/sqlite3_ondisk.rs +++ b/core/sqlite3_ondisk.rs @@ -25,7 +25,7 @@ /// For more information, see: https://www.sqlite.org/fileformat.html use crate::buffer_pool::BufferPool; use crate::error::LimboError; -use crate::io::{Buffer, Completion, WriteCompletion}; +use crate::io::{Buffer, Completion, ReadCompletion, WriteCompletion}; use crate::pager::{Page, Pager}; use crate::types::{OwnedRecord, OwnedValue}; use crate::{PageSource, Result}; @@ -47,12 +47,12 @@ pub struct DatabaseHeader { pub page_size: u16, write_version: u8, read_version: u8, - unused_space: u8, + pub unused_space: u8, max_embed_frac: u8, min_embed_frac: u8, min_leaf_frac: u8, change_counter: u32, - database_size: u32, + pub database_size: u32, freelist_trunk_page: u32, freelist_pages: u32, schema_cookie: u32, @@ -70,19 +70,23 @@ pub struct DatabaseHeader { pub fn begin_read_database_header(page_source: &PageSource) -> Result>> { let drop_fn = Rc::new(|_buf| {}); - let buf = Buffer::allocate(512, drop_fn); + let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); let result = Rc::new(RefCell::new(DatabaseHeader::default())); let header = result.clone(); - let complete = Box::new(move |buf: &Buffer| { + let complete = Box::new(move |buf: Rc>| { let header = header.clone(); finish_read_database_header(buf, header).unwrap(); }); - let c = Rc::new(Completion::new(buf, complete)); + let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); page_source.get(1, c.clone())?; Ok(result) } -fn finish_read_database_header(buf: &Buffer, header: Rc>) -> Result<()> { +fn finish_read_database_header( + buf: Rc>, + header: Rc>, +) -> Result<()> { + let buf = buf.borrow(); let buf = buf.as_slice(); let mut header = std::cell::RefCell::borrow_mut(&header); header.magic.copy_from_slice(&buf[0..16]); @@ -123,38 +127,14 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re let buffer_to_copy_in_cb = buffer_to_copy.clone(); let header_cb = header.clone(); - let complete = Box::new(move |buffer: &Buffer| { + let complete = Box::new(move |buffer: Rc>| { let header = header_cb.clone(); - let buffer: Buffer = buffer.clone(); + let buffer: Buffer = buffer.borrow().clone(); let buffer = Rc::new(RefCell::new(buffer)); { let mut buf_mut = std::cell::RefCell::borrow_mut(&buffer); let buf = buf_mut.as_mut_slice(); - buf[0..16].copy_from_slice(&header.magic); - buf[16..18].copy_from_slice(&header.page_size.to_be_bytes()); - buf[18] = header.write_version; - buf[19] = header.read_version; - buf[20] = header.unused_space; - buf[21] = header.max_embed_frac; - buf[22] = header.min_embed_frac; - buf[23] = header.min_leaf_frac; - buf[24..28].copy_from_slice(&header.change_counter.to_be_bytes()); - buf[28..32].copy_from_slice(&header.database_size.to_be_bytes()); - buf[32..36].copy_from_slice(&header.freelist_trunk_page.to_be_bytes()); - buf[36..40].copy_from_slice(&header.freelist_pages.to_be_bytes()); - buf[40..44].copy_from_slice(&header.schema_cookie.to_be_bytes()); - buf[44..48].copy_from_slice(&header.schema_format.to_be_bytes()); - buf[48..52].copy_from_slice(&header.default_cache_size.to_be_bytes()); - - buf[52..56].copy_from_slice(&header.vacuum.to_be_bytes()); - buf[56..60].copy_from_slice(&header.text_encoding.to_be_bytes()); - buf[60..64].copy_from_slice(&header.user_version.to_be_bytes()); - buf[64..68].copy_from_slice(&header.incremental_vacuum.to_be_bytes()); - - buf[68..72].copy_from_slice(&header.application_id.to_be_bytes()); - buf[72..92].copy_from_slice(&header.reserved); - buf[92..96].copy_from_slice(&header.version_valid_for.to_be_bytes()); - buf[96..100].copy_from_slice(&header.version_number.to_be_bytes()); + write_header_to_buf(buf, &header); let mut buffer_to_copy = std::cell::RefCell::borrow_mut(&buffer_to_copy_in_cb); let buffer_to_copy_slice = buffer_to_copy.as_mut_slice(); @@ -163,39 +143,57 @@ pub fn begin_write_database_header(header: &DatabaseHeader, pager: &Pager) -> Re }); let drop_fn = Rc::new(|_buf| {}); - let buf = Buffer::allocate(512, drop_fn); - let c = Rc::new(Completion::new(buf.clone(), complete)); + let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); + let c = Rc::new(Completion::Read(ReadCompletion::new(buf.clone(), complete))); page_source.get(1, c.clone())?; // run get header block pager.io.run_once()?; let buffer_in_cb = buffer_to_copy.clone(); - let write_complete = Box::new(move |bytes_written: usize| { + let write_complete = Box::new(move |bytes_written: i32| { let buf = buffer_in_cb.clone(); let buf_len = std::cell::RefCell::borrow(&buf).len(); - if bytes_written < buf_len { + if bytes_written < buf_len as i32 { log::error!("wrote({bytes_written}) less than expected({buf_len})"); } // finish_read_database_header(buf, header).unwrap(); }); - let c = Rc::new(WriteCompletion::new(write_complete)); + let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete))); page_source.write(0, buffer_to_copy.clone(), c).unwrap(); Ok(()) } -#[derive(Debug)] -pub struct BTreePageHeader { - page_type: PageType, - _first_freeblock_offset: u16, - num_cells: u16, - _cell_content_area: u16, - _num_frag_free_bytes: u8, - pub(crate) right_most_pointer: Option, +fn write_header_to_buf(buf: &mut [u8], header: &DatabaseHeader) { + buf[0..16].copy_from_slice(&header.magic); + buf[16..18].copy_from_slice(&header.page_size.to_be_bytes()); + buf[18] = header.write_version; + buf[19] = header.read_version; + buf[20] = header.unused_space; + buf[21] = header.max_embed_frac; + buf[22] = header.min_embed_frac; + buf[23] = header.min_leaf_frac; + buf[24..28].copy_from_slice(&header.change_counter.to_be_bytes()); + buf[28..32].copy_from_slice(&header.database_size.to_be_bytes()); + buf[32..36].copy_from_slice(&header.freelist_trunk_page.to_be_bytes()); + buf[36..40].copy_from_slice(&header.freelist_pages.to_be_bytes()); + buf[40..44].copy_from_slice(&header.schema_cookie.to_be_bytes()); + buf[44..48].copy_from_slice(&header.schema_format.to_be_bytes()); + buf[48..52].copy_from_slice(&header.default_cache_size.to_be_bytes()); + + buf[52..56].copy_from_slice(&header.vacuum.to_be_bytes()); + buf[56..60].copy_from_slice(&header.text_encoding.to_be_bytes()); + buf[60..64].copy_from_slice(&header.user_version.to_be_bytes()); + buf[64..68].copy_from_slice(&header.incremental_vacuum.to_be_bytes()); + + buf[68..72].copy_from_slice(&header.application_id.to_be_bytes()); + buf[72..92].copy_from_slice(&header.reserved); + buf[92..96].copy_from_slice(&header.version_valid_for.to_be_bytes()); + buf[96..100].copy_from_slice(&header.version_number.to_be_bytes()); } #[repr(u8)] -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum PageType { IndexInterior = 2, TableInterior = 5, @@ -218,15 +216,193 @@ impl TryFrom for PageType { } #[derive(Debug)] -pub struct BTreePage { - pub header: BTreePageHeader, - pub cells: Vec, +pub struct PageContent { + pub offset: usize, + pub buffer: Rc>, } -pub fn begin_read_btree_page( +impl Clone for PageContent { + fn clone(&self) -> Self { + Self { + offset: self.offset, + buffer: Rc::new(RefCell::new((*self.buffer.borrow()).clone())), + } + } +} + +impl PageContent { + pub fn page_type(&self) -> PageType { + self.read_u8(self.offset).try_into().unwrap() + } + + fn read_u8(&self, pos: usize) -> u8 { + // unsafe trick to borrow twice + unsafe { + let buf_pointer = &self.buffer.as_ptr(); + let buf = (*buf_pointer).as_ref().unwrap().as_slice(); + buf[pos] + } + } + + fn read_u16(&self, pos: usize) -> u16 { + unsafe { + let buf_pointer = &self.buffer.as_ptr(); + let buf = (*buf_pointer).as_ref().unwrap().as_slice(); + u16::from_be_bytes([buf[self.offset + pos], buf[self.offset + pos + 1]]) + } + } + + fn read_u32(&self, pos: usize) -> u32 { + unsafe { + let buf_pointer = &self.buffer.as_ptr(); + let buf = (*buf_pointer).as_ref().unwrap().as_slice(); + u32::from_be_bytes([ + buf[self.offset + pos], + buf[self.offset + pos + 1], + buf[self.offset + pos + 2], + buf[self.offset + pos + 3], + ]) + } + } + + pub fn write_u8(&self, pos: usize, value: u8) { + unsafe { + let buf_pointer = &self.buffer.as_ptr(); + let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice(); + buf[self.offset + pos] = value; + } + } + + pub fn write_u16(&self, pos: usize, value: u16) { + unsafe { + let buf_pointer = &self.buffer.as_ptr(); + let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice(); + buf[self.offset + pos..self.offset + pos + 2].copy_from_slice(&value.to_be_bytes()); + } + } + + pub fn write_u32(&self, pos: usize, value: u32) { + unsafe { + let buf_pointer = &self.buffer.as_ptr(); + let buf = (*buf_pointer).as_mut().unwrap().as_mut_slice(); + buf[self.offset + pos..self.offset + pos + 4].copy_from_slice(&value.to_be_bytes()); + } + } + + pub fn first_freeblock(&self) -> u16 { + self.read_u16(1) + } + + pub fn cell_count(&self) -> usize { + self.read_u16(3) as usize + } + + pub fn cell_content_area(&self) -> u16 { + self.read_u16(5) as u16 + } + + pub fn num_frag_free_bytes(&self) -> u8 { + self.read_u8(7) as u8 + } + + pub fn rightmost_pointer(&self) -> Option { + match self.page_type() { + PageType::IndexInterior => Some(self.read_u32(8)), + PageType::TableInterior => Some(self.read_u32(8)), + PageType::IndexLeaf => None, + PageType::TableLeaf => None, + } + } + + pub fn cell_get(&self, idx: usize) -> Result { + let buf = self.buffer.borrow(); + let buf = buf.as_slice(); + + let ncells = self.cell_count(); + let cell_start = match self.page_type() { + PageType::IndexInterior => 12, + PageType::TableInterior => 12, + PageType::IndexLeaf => 8, + PageType::TableLeaf => 8, + }; + assert!(idx < ncells, "cell_get: idx out of bounds"); + let cell_pointer = cell_start + (idx * 2); + let cell_pointer = self.read_u16(cell_pointer) as usize; + + read_btree_cell(buf, &self.page_type(), cell_pointer) + } + + pub fn cell_get_raw_pointer_region(&self) -> (usize, usize) { + let cell_start = match self.page_type() { + PageType::IndexInterior => 12, + PageType::TableInterior => 12, + PageType::IndexLeaf => 8, + PageType::TableLeaf => 8, + }; + (self.offset + cell_start, self.cell_count() * 2) + } + + pub fn cell_get_raw_region(&self, idx: usize) -> (usize, usize) { + let mut buf = self.buffer.borrow_mut(); + let buf = buf.as_mut_slice(); + self.cell_get_raw_region_borrowed(idx, buf) + } + + pub fn cell_get_raw_region_borrowed(&self, idx: usize, buf: &mut [u8]) -> (usize, usize) { + let ncells = self.cell_count(); + let cell_start = match self.page_type() { + PageType::IndexInterior => 12, + PageType::TableInterior => 12, + PageType::IndexLeaf => 8, + PageType::TableLeaf => 8, + }; + assert!(idx < ncells, "cell_get: idx out of bounds"); + let cell_pointer = cell_start + (idx * 2); + let cell_pointer = self.read_u16(cell_pointer) as usize; + let start = cell_pointer; + let len = match self.page_type() { + PageType::IndexInterior => { + let (len_payload, n_payload) = read_varint(&buf[cell_pointer + 4..]).unwrap(); + 4 + len_payload as usize + n_payload + 4 + } + PageType::TableInterior => { + let (_, n_rowid) = read_varint(&buf[cell_pointer + 4..]).unwrap(); + 4 + n_rowid + } + PageType::IndexLeaf => { + let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap(); + len_payload as usize + n_payload + 4 + } + PageType::TableLeaf => { + let (len_payload, n_payload) = read_varint(&buf[cell_pointer..]).unwrap(); + let (_, n_rowid) = read_varint(&buf[cell_pointer + n_payload..]).unwrap(); + // TODO: add overflow 4 bytes + len_payload as usize + n_payload + n_rowid + } + }; + (start, len) + } + + pub fn is_leaf(&self) -> bool { + match self.page_type() { + PageType::IndexInterior => false, + PageType::TableInterior => false, + PageType::IndexLeaf => true, + PageType::TableLeaf => true, + } + } + + pub fn write_database_header(&self, header: &DatabaseHeader) { + let mut buf = self.buffer.borrow_mut(); + let buf = buf.as_mut_slice(); + write_header_to_buf(buf, header); + } +} + +pub fn begin_read_page( page_source: &PageSource, buffer_pool: Rc, - page: Rc, + page: Rc>, page_idx: usize, ) -> Result<()> { trace!("begin_read_btree_page(page_idx = {})", page_idx); @@ -235,59 +411,67 @@ pub fn begin_read_btree_page( let buffer_pool = buffer_pool.clone(); buffer_pool.put(buf); }); - let buf = Buffer::new(buf, drop_fn); - let complete = Box::new(move |buf: &Buffer| { + let buf = Rc::new(RefCell::new(Buffer::new(buf, drop_fn))); + let complete = Box::new(move |buf: Rc>| { let page = page.clone(); - if finish_read_btree_page(page_idx, buf, page.clone()).is_err() { - page.set_error(); + if finish_read_page(page_idx, buf, page.clone()).is_err() { + page.borrow_mut().set_error(); } }); - let c = Rc::new(Completion::new(buf, complete)); + let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); page_source.get(page_idx, c.clone())?; Ok(()) } -fn finish_read_btree_page(page_idx: usize, buf: &Buffer, page: Rc) -> Result<()> { +fn finish_read_page( + page_idx: usize, + buffer_ref: Rc>, + page: Rc>, +) -> Result<()> { trace!("finish_read_btree_page(page_idx = {})", page_idx); - let mut pos = if page_idx == 1 { + let pos = if page_idx == 1 { DATABASE_HEADER_SIZE } else { 0 }; - let buf = buf.as_slice(); - let mut header = BTreePageHeader { - page_type: buf[pos].try_into()?, - _first_freeblock_offset: u16::from_be_bytes([buf[pos + 1], buf[pos + 2]]), - num_cells: u16::from_be_bytes([buf[pos + 3], buf[pos + 4]]), - _cell_content_area: u16::from_be_bytes([buf[pos + 5], buf[pos + 6]]), - _num_frag_free_bytes: buf[pos + 7], - right_most_pointer: None, + let inner = PageContent { + offset: pos, + buffer: buffer_ref.clone(), }; - pos += 8; - if header.page_type == PageType::IndexInterior || header.page_type == PageType::TableInterior { - header.right_most_pointer = Some(u32::from_be_bytes([ - buf[pos], - buf[pos + 1], - buf[pos + 2], - buf[pos + 3], - ])); - pos += 4; + { + let page = page.borrow_mut(); + page.contents.write().unwrap().replace(inner); + page.set_uptodate(); + page.clear_locked(); } - let mut cells = Vec::with_capacity(header.num_cells as usize); - for _ in 0..header.num_cells { - let cell_pointer = u16::from_be_bytes([buf[pos], buf[pos + 1]]); - pos += 2; - let cell = read_btree_cell(buf, &header.page_type, cell_pointer as usize)?; - cells.push(cell); - } - let inner = BTreePage { header, cells }; - page.contents.write().unwrap().replace(inner); - page.set_uptodate(); - page.clear_locked(); Ok(()) } -#[derive(Debug)] +pub fn begin_write_btree_page(pager: &Pager, page: &Rc>) -> Result<()> { + let page_source = &pager.page_source; + let page_finish = page.clone(); + let page = page.borrow(); + + let contents = page.contents.read().unwrap(); + let contents = contents.as_ref().unwrap(); + let buffer = contents.buffer.clone(); + let write_complete = { + let buf_copy = buffer.clone(); + Box::new(move |bytes_written: i32| { + let buf_copy = buf_copy.clone(); + let buf_len = buf_copy.borrow().len(); + page_finish.borrow_mut().clear_dirty(); + if bytes_written < buf_len as i32 { + log::error!("wrote({bytes_written}) less than expected({buf_len})"); + } + }) + }; + let c = Rc::new(Completion::Write(WriteCompletion::new(write_complete))); + page_source.write(page.id, buffer.clone(), c)?; + Ok(()) +} + +#[derive(Debug, Clone)] pub enum BTreeCell { TableInteriorCell(TableInteriorCell), TableLeafCell(TableLeafCell), @@ -295,27 +479,27 @@ pub enum BTreeCell { IndexLeafCell(IndexLeafCell), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TableInteriorCell { pub _left_child_page: u32, pub _rowid: u64, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TableLeafCell { pub _rowid: u64, pub _payload: Vec, pub first_overflow_page: Option, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct IndexInteriorCell { pub left_child_page: u32, pub payload: Vec, pub first_overflow_page: Option, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct IndexLeafCell { pub payload: Vec, pub first_overflow_page: Option, @@ -442,15 +626,14 @@ pub fn read_record(payload: &[u8]) -> Result { let (serial_type, nr) = read_varint(&payload[pos..])?; let serial_type = SerialType::try_from(serial_type)?; serial_types.push(serial_type); - assert!(pos + nr < payload.len()); pos += nr; assert!(header_size >= nr); header_size -= nr; } let mut values = Vec::with_capacity(serial_types.len()); for serial_type in &serial_types { - let (value, usize) = read_value(&payload[pos..], serial_type)?; - pos += usize; + let (value, n) = read_value(&payload[pos..], serial_type)?; + pos += n; values.push(value); } Ok(OwnedRecord::new(values)) @@ -544,7 +727,7 @@ pub fn read_value(buf: &[u8], serial_type: &SerialType) -> Result<(OwnedValue, u } } -fn read_varint(buf: &[u8]) -> Result<(u64, usize)> { +pub fn read_varint(buf: &[u8]) -> Result<(u64, usize)> { let mut v: u64 = 0; for i in 0..8 { match buf.get(i) { @@ -563,6 +746,45 @@ fn read_varint(buf: &[u8]) -> Result<(u64, usize)> { Ok((v, 9)) } +pub fn write_varint(buf: &mut [u8], value: u64) -> usize { + if value <= 0x7f { + buf[0] = (value & 0x7f) as u8; + return 1; + } + + if value <= 0x3fff { + buf[0] = (((value >> 7) & 0x7f) | 0x80) as u8; + buf[1] = (value & 0x7f) as u8; + return 2; + } + + let mut value = value; + if (value & ((0xff000000_u64) << 32)) > 0 { + buf[8] = value as u8; + value >>= 8; + for i in (0..8).rev() { + buf[i] = ((value & 0x7f) | 0x80) as u8; + value >>= 7; + } + return 9; + } + + let mut encoded: [u8; 10] = [0; 10]; + let mut bytes = value; + let mut n = 0; + while bytes != 0 { + let v = 0x80 | (bytes & 0x7f); + encoded[n] = v as u8; + bytes >>= 7; + n += 1; + } + encoded[0] &= 0x7f; + for i in 0..n { + buf[i] = encoded[n - 1 - i]; + } + return n; +} + #[cfg(test)] mod tests { use super::*; @@ -651,4 +873,31 @@ mod tests { let result = read_varint(&buf); assert!(result.is_err()); } + + // ** 0x00 becomes 0x00000000 + // ** 0x7f becomes 0x0000007f + // ** 0x81 0x00 becomes 0x00000080 + // ** 0x82 0x00 becomes 0x00000100 + // ** 0x80 0x7f becomes 0x0000007f + // ** 0x81 0x91 0xd1 0xac 0x78 becomes 0x12345678 + // ** 0x81 0x81 0x81 0x81 0x01 becomes 0x10204081 + #[rstest] + #[case((0, 1), &[0x00])] + #[case((1, 1), &[0x01])] + #[case((129, 2), &[0x81, 0x01] )] + #[case((16513, 3), &[0x81, 0x81, 0x01] )] + #[case((2113665, 4), &[0x81, 0x81, 0x81, 0x01] )] + #[case((270549121, 5), &[0x81, 0x81, 0x81, 0x81, 0x01] )] + #[case((34630287489, 6), &[0x81, 0x81, 0x81, 0x81, 0x81, 0x01] )] + #[case((4432676798593, 7), &[0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x01] )] + #[case((567382630219905, 8), &[0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x01] )] + #[case((145249953336295681, 9), &[0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x81, 0x01] )] + fn test_write_varint(#[case] value: (u64, usize), #[case] output: &[u8]) { + let mut buf: [u8; 10] = [0; 10]; + let n = write_varint(&mut buf, value.0); + assert_eq!(n, value.1); + for i in 0..output.len() { + assert_eq!(buf[i], output[i]); + } + } } diff --git a/core/storage.rs b/core/storage.rs index bb62df0bd..54cab056f 100644 --- a/core/storage.rs +++ b/core/storage.rs @@ -1,10 +1,6 @@ #[cfg(feature = "fs")] use crate::io::File; -use crate::{ - error::LimboError, - io::{Completion, WriteCompletion}, - Buffer, Result, -}; +use crate::{error::LimboError, io::Completion, Buffer, Result}; use std::{cell::RefCell, rc::Rc}; pub struct PageSource { @@ -39,7 +35,7 @@ impl PageSource { &self, page_idx: usize, buffer: Rc>, - c: Rc, + c: Rc, ) -> Result<()> { self.io.write(page_idx, buffer, c) } @@ -47,12 +43,7 @@ impl PageSource { pub trait PageIO { fn get(&self, page_idx: usize, c: Rc) -> Result<()>; - fn write( - &self, - page_idx: usize, - buffer: Rc>, - c: Rc, - ) -> Result<()>; + fn write(&self, page_idx: usize, buffer: Rc>, c: Rc) -> Result<()>; } #[cfg(feature = "fs")] @@ -63,7 +54,11 @@ struct FileStorage { #[cfg(feature = "fs")] impl PageIO for FileStorage { fn get(&self, page_idx: usize, c: Rc) -> Result<()> { - let size = c.buf().len(); + let r = match &(*c) { + Completion::Read(r) => r, + Completion::Write(_) => unreachable!(), + }; + let size = r.buf().len(); assert!(page_idx > 0); if size < 512 || size > 65536 || size & (size - 1) != 0 { return Err(LimboError::NotADB.into()); @@ -73,17 +68,13 @@ impl PageIO for FileStorage { Ok(()) } - fn write( - &self, - page_idx: usize, - buffer: Rc>, - c: Rc, - ) -> Result<()> { + fn write(&self, page_idx: usize, buffer: Rc>, c: Rc) -> Result<()> { let buffer_size = buffer.borrow().len(); assert!(buffer_size >= 512); assert!(buffer_size <= 65536); assert!((buffer_size & (buffer_size - 1)) == 0); - self.file.pwrite(page_idx, buffer, c)?; + let pos = (page_idx - 1) * buffer_size; + self.file.pwrite(pos, buffer, c)?; Ok(()) } } diff --git a/core/translate/expr.rs b/core/translate/expr.rs index 163357978..deb81ecc0 100644 --- a/core/translate/expr.rs +++ b/core/translate/expr.rs @@ -3,7 +3,7 @@ use sqlite3_parser::ast::{self, Expr, UnaryOperator}; use crate::{ function::{Func, ScalarFunc}, - schema::{Schema, Table, Type}, + schema::{Table, Type}, translate::select::{ColumnInfo, Select, SrcTable}, util::normalize_ident, vdbe::{builder::ProgramBuilder, BranchOffset, Insn}, @@ -11,7 +11,7 @@ use crate::{ pub fn translate_expr( program: &mut ProgramBuilder, - select: &Select, + select: Option<&Select>, expr: &ast::Expr, target_register: usize, cursor_hint: Option, @@ -435,7 +435,7 @@ pub fn translate_expr( ast::Expr::Parenthesized(_) => todo!(), ast::Expr::Qualified(tbl, ident) => { let (idx, col_type, cursor_id, is_primary_key) = - resolve_ident_qualified(program, &tbl.0, &ident.0, select, cursor_hint)?; + resolve_ident_qualified(program, &tbl.0, &ident.0, select.unwrap(), cursor_hint)?; if is_primary_key { program.emit_insn(Insn::RowId { cursor_id, @@ -614,12 +614,12 @@ pub fn resolve_ident_qualified( pub fn resolve_ident_table( program: &ProgramBuilder, ident: &String, - select: &Select, + select: Option<&Select>, cursor_hint: Option, ) -> Result<(usize, Type, usize, bool)> { let ident = normalize_ident(ident); let mut found = Vec::new(); - for join in &select.src_tables { + for join in &select.unwrap().src_tables { match join.table { Table::BTree(ref table) => { let res = table diff --git a/core/translate/insert.rs b/core/translate/insert.rs new file mode 100644 index 000000000..a54c3a2ce --- /dev/null +++ b/core/translate/insert.rs @@ -0,0 +1,194 @@ +use std::{cell::RefCell, ops::Deref, rc::Rc}; + +use sqlite3_parser::ast::{ + DistinctNames, InsertBody, QualifiedName, ResolveType, ResultColumn, With, +}; + +use crate::Result; +use crate::{ + schema::{Schema, Table}, + sqlite3_ondisk::DatabaseHeader, + translate::expr::translate_expr, + vdbe::{builder::ProgramBuilder, Insn, Program}, +}; + +pub fn translate_insert( + schema: &Schema, + with: &Option, + or_conflict: &Option, + tbl_name: &QualifiedName, + _columns: &Option, + body: &InsertBody, + _returning: &Option>, + database_header: Rc>, +) -> Result { + assert!(with.is_none()); + assert!(or_conflict.is_none()); + let mut program = ProgramBuilder::new(); + let init_label = program.allocate_label(); + program.emit_insn_with_label_dependency( + Insn::Init { + target_pc: init_label, + }, + init_label, + ); + let start_offset = program.offset(); + + // open table + let table_name = &tbl_name.name; + + let table = match schema.get_table(table_name.0.as_str()) { + Some(table) => table, + None => crate::bail_corrupt_error!("Parse error: no such table: {}", table_name), + }; + let table = Rc::new(Table::BTree(table)); + let cursor_id = program.alloc_cursor_id( + Some(table_name.0.clone()), + Some(table.clone().deref().clone()), + ); + let root_page = match table.as_ref() { + Table::BTree(btree) => btree.root_page, + Table::Pseudo(_) => todo!(), + }; + + let mut num_cols = table.columns().len(); + if table.has_rowid() { + num_cols += 1; + } + // column_registers_start[0] == rowid if has rowid + let column_registers_start = program.alloc_registers(num_cols); + + // Coroutine for values + let yield_reg = program.alloc_register(); + let jump_on_definition_label = program.allocate_label(); + { + program.emit_insn_with_label_dependency( + Insn::InitCoroutine { + yield_reg, + jump_on_definition: jump_on_definition_label, + start_offset: program.offset() + 1, + }, + jump_on_definition_label, + ); + match body { + InsertBody::Select(select, None) => match &select.body.select { + sqlite3_parser::ast::OneSelect::Select { + distinctness: _, + columns: _, + from: _, + where_clause: _, + group_by: _, + window_clause: _, + } => todo!(), + sqlite3_parser::ast::OneSelect::Values(values) => { + for value in values { + for (col, expr) in value.iter().enumerate() { + let mut col = col; + if table.has_rowid() { + col += 1; + } + translate_expr( + &mut program, + None, + expr, + column_registers_start + col, + None, + )?; + } + program.emit_insn(Insn::Yield { + yield_reg, + end_offset: 0, + }); + } + } + }, + InsertBody::DefaultValues => todo!("default values not yet supported"), + _ => todo!(), + } + program.emit_insn(Insn::EndCoroutine { yield_reg }); + } + + program.resolve_label(jump_on_definition_label, program.offset()); + program.emit_insn(Insn::OpenWriteAsync { + cursor_id, + root_page, + }); + program.emit_insn(Insn::OpenWriteAwait {}); + + // Main loop + let record_register = program.alloc_register(); + let halt_label = program.allocate_label(); + let loop_start_offset = program.offset(); + program.emit_insn_with_label_dependency( + Insn::Yield { + yield_reg, + end_offset: halt_label, + }, + halt_label, + ); + + if table.has_rowid() { + let key_reg = column_registers_start + 1; + let row_id_reg = column_registers_start; + // copy key to rowid + program.emit_insn(Insn::Copy { + src_reg: key_reg, + dst_reg: row_id_reg, + amount: 0, + }); + program.emit_insn(Insn::SoftNull { reg: key_reg }); + + let notnull_label = program.allocate_label(); + program.emit_insn_with_label_dependency( + Insn::NotNull { + reg: row_id_reg, + target_pc: notnull_label, + }, + notnull_label, + ); + program.emit_insn(Insn::NewRowid { reg: row_id_reg }); + + program.resolve_label(notnull_label, program.offset()); + program.emit_insn(Insn::MustBeInt { reg: row_id_reg }); + let make_record_label = program.allocate_label(); + program.emit_insn_with_label_dependency( + Insn::NotExists { + cursor: cursor_id, + rowid_reg: row_id_reg, + target_pc: make_record_label, + }, + make_record_label, + ); + program.emit_insn(Insn::Halt); // Add error code 1555 and rollback + program.resolve_label(make_record_label, program.offset()); + program.emit_insn(Insn::MakeRecord { + start_reg: column_registers_start + 1, + count: num_cols - 1, + dest_reg: record_register, + }); + program.emit_insn(Insn::InsertAsync { + cursor: cursor_id, + key_reg: column_registers_start, + record_reg: record_register, + flag: 0, + }); + program.emit_insn(Insn::InsertAwait { + cursor_id: cursor_id, + }); + } + + program.emit_insn(Insn::Goto { + target_pc: loop_start_offset, + }); + + program.resolve_label(halt_label, program.offset()); + program.emit_insn(Insn::Halt); + program.resolve_label(init_label, program.offset()); + program.emit_insn(Insn::Transaction); + program.emit_constant_insns(); + program.emit_insn(Insn::Goto { + target_pc: start_offset, + }); + program.resolve_deferred_labels(); + Ok(program.build(database_header)) +} diff --git a/core/translate/mod.rs b/core/translate/mod.rs index 53c288ab9..280cbc372 100644 --- a/core/translate/mod.rs +++ b/core/translate/mod.rs @@ -8,6 +8,7 @@ //! will read rows from the database and filter them according to a WHERE clause. pub(crate) mod expr; +pub(crate) mod insert; pub(crate) mod select; pub(crate) mod where_clause; @@ -20,6 +21,7 @@ use crate::sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE}; use crate::util::normalize_ident; use crate::vdbe::{builder::ProgramBuilder, Insn, Program}; use crate::{bail_parse_error, Result}; +use insert::translate_insert; use select::{prepare_select, translate_select}; use sqlite3_parser::ast; @@ -49,7 +51,6 @@ pub fn translate( ast::Stmt::DropTable { .. } => bail_parse_error!("DROP TABLE not supported yet"), ast::Stmt::DropTrigger { .. } => bail_parse_error!("DROP TRIGGER not supported yet"), ast::Stmt::DropView { .. } => bail_parse_error!("DROP VIEW not supported yet"), - ast::Stmt::Insert { .. } => bail_parse_error!("INSERT not supported yet"), ast::Stmt::Pragma(name, body) => translate_pragma(&name, body, database_header, pager), ast::Stmt::Reindex { .. } => bail_parse_error!("REINDEX not supported yet"), ast::Stmt::Release(_) => bail_parse_error!("RELEASE not supported yet"), @@ -57,10 +58,27 @@ pub fn translate( ast::Stmt::Savepoint(_) => bail_parse_error!("SAVEPOINT not supported yet"), ast::Stmt::Select(select) => { let select = prepare_select(schema, &select)?; - translate_select(select) + translate_select(select, database_header) } ast::Stmt::Update { .. } => bail_parse_error!("UPDATE not supported yet"), ast::Stmt::Vacuum(_, _) => bail_parse_error!("VACUUM not supported yet"), + ast::Stmt::Insert { + with, + or_conflict, + tbl_name, + columns, + body, + returning, + } => translate_insert( + schema, + &with, + &or_conflict, + &tbl_name, + &columns, + &body, + &returning, + database_header, + ), } } @@ -107,7 +125,12 @@ fn translate_pragma( }, _ => 0, }; - update_pragma(&name.name.0, value_to_update, database_header, pager); + update_pragma( + &name.name.0, + value_to_update, + database_header.clone(), + pager, + ); } Some(ast::PragmaBody::Call(_)) => { todo!() @@ -121,7 +144,7 @@ fn translate_pragma( target_pc: start_offset, }); program.resolve_deferred_labels(); - Ok(program.build()) + Ok(program.build(database_header)) } fn update_pragma(name: &str, value: i64, header: Rc>, pager: Rc) { diff --git a/core/translate/select.rs b/core/translate/select.rs index fe832dcaf..154b91be5 100644 --- a/core/translate/select.rs +++ b/core/translate/select.rs @@ -1,5 +1,6 @@ use crate::function::{AggFunc, Func}; use crate::schema::{Column, PseudoTable, Schema, Table}; +use crate::sqlite3_ondisk::DatabaseHeader; use crate::translate::expr::{analyze_columns, maybe_apply_affinity, translate_expr}; use crate::translate::where_clause::{ process_where, translate_processed_where, translate_tableless_where, ProcessedWhereClause, @@ -11,6 +12,7 @@ use crate::Result; use sqlite3_parser::ast::{self, JoinOperator, JoinType, ResultColumn}; +use std::cell::RefCell; use std::rc::Rc; /// A representation of a `SELECT` statement that has all the information @@ -235,7 +237,10 @@ pub fn prepare_select<'a>(schema: &Schema, select: &'a ast::Select) -> Result Result { +pub fn translate_select( + mut select: Select, + database_header: Rc>, +) -> Result { let mut program = ProgramBuilder::new(); let init_label = program.allocate_label(); let early_terminate_label = program.allocate_label(); @@ -274,7 +279,13 @@ pub fn translate_select(mut select: Select) -> Result { let limit_info = if let Some(limit) = &select.limit { assert!(limit.offset.is_none()); let target_register = program.alloc_register(); - let limit_reg = translate_expr(&mut program, &select, &limit.expr, target_register, None)?; + let limit_reg = translate_expr( + &mut program, + Some(&select), + &limit.expr, + target_register, + None, + )?; let num = if let ast::Expr::Literal(ast::Literal::Numeric(num)) = &limit.expr { num.parse::()? } else { @@ -326,7 +337,7 @@ pub fn translate_select(mut select: Select) -> Result { } else { &col.expr }; - translate_expr(&mut program, &select, sort_col_expr, target, None)?; + translate_expr(&mut program, Some(&select), sort_col_expr, target, None)?; } let (_, result_cols_count) = translate_columns(&mut program, &select, None)?; sort_info @@ -417,7 +428,7 @@ pub fn translate_select(mut select: Select) -> Result { target_pc: start_offset, }); program.resolve_deferred_labels(); - Ok(program.build()) + Ok(program.build(database_header)) } fn emit_limit_insn(limit_info: &Option, program: &mut ProgramBuilder) { @@ -742,7 +753,7 @@ fn translate_column( cursor_hint, )?; } else { - let _ = translate_expr(program, select, expr, target_register, cursor_hint)?; + let _ = translate_expr(program, Some(select), expr, target_register, cursor_hint)?; } } ast::ResultColumn::Star => { @@ -807,7 +818,7 @@ fn translate_aggregation( } let expr = &args[0]; let expr_reg = program.alloc_register(); - let _ = translate_expr(program, select, expr, expr_reg, cursor_hint)?; + let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -822,7 +833,7 @@ fn translate_aggregation( } else { let expr = &args[0]; let expr_reg = program.alloc_register(); - let _ = translate_expr(program, select, expr, expr_reg, cursor_hint); + let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint); expr_reg }; program.emit_insn(Insn::AggStep { @@ -865,8 +876,14 @@ fn translate_aggregation( ast::Expr::Literal(ast::Literal::String(String::from("\",\""))); } - translate_expr(program, select, expr, expr_reg, cursor_hint)?; - translate_expr(program, select, &delimiter_expr, delimiter_reg, cursor_hint)?; + translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?; + translate_expr( + program, + Some(select), + &delimiter_expr, + delimiter_reg, + cursor_hint, + )?; program.emit_insn(Insn::AggStep { acc_reg: target_register, @@ -883,7 +900,7 @@ fn translate_aggregation( } let expr = &args[0]; let expr_reg = program.alloc_register(); - let _ = translate_expr(program, select, expr, expr_reg, cursor_hint); + let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -898,7 +915,7 @@ fn translate_aggregation( } let expr = &args[0]; let expr_reg = program.alloc_register(); - let _ = translate_expr(program, select, expr, expr_reg, cursor_hint); + let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -932,8 +949,14 @@ fn translate_aggregation( _ => crate::bail_parse_error!("Incorrect delimiter parameter"), }; - translate_expr(program, select, expr, expr_reg, cursor_hint)?; - translate_expr(program, select, &delimiter_expr, delimiter_reg, cursor_hint)?; + translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?; + translate_expr( + program, + Some(select), + &delimiter_expr, + delimiter_reg, + cursor_hint, + )?; program.emit_insn(Insn::AggStep { acc_reg: target_register, @@ -950,7 +973,7 @@ fn translate_aggregation( } let expr = &args[0]; let expr_reg = program.alloc_register(); - let _ = translate_expr(program, select, expr, expr_reg, cursor_hint)?; + let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -965,7 +988,7 @@ fn translate_aggregation( } let expr = &args[0]; let expr_reg = program.alloc_register(); - let _ = translate_expr(program, select, expr, expr_reg, cursor_hint)?; + let _ = translate_expr(program, Some(select), expr, expr_reg, cursor_hint)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, diff --git a/core/translate/where_clause.rs b/core/translate/where_clause.rs index 4b6e89acb..fca060a65 100644 --- a/core/translate/where_clause.rs +++ b/core/translate/where_clause.rs @@ -306,12 +306,12 @@ fn translate_condition_expr( ast::Expr::Binary(lhs, op, rhs) => { let lhs_reg = program.alloc_register(); let rhs_reg = program.alloc_register(); - let _ = translate_expr(program, select, lhs, lhs_reg, cursor_hint); + let _ = translate_expr(program, Some(select), lhs, lhs_reg, cursor_hint); match lhs.as_ref() { ast::Expr::Literal(_) => program.mark_last_insn_constant(), _ => {} } - let _ = translate_expr(program, select, rhs, rhs_reg, cursor_hint); + let _ = translate_expr(program, Some(select), rhs, rhs_reg, cursor_hint); match rhs.as_ref() { ast::Expr::Literal(_) => program.mark_last_insn_constant(), _ => {} @@ -554,7 +554,7 @@ fn translate_condition_expr( // The left hand side only needs to be evaluated once we have a list of values to compare against. let lhs_reg = program.alloc_register(); - let _ = translate_expr(program, select, lhs, lhs_reg, cursor_hint)?; + let _ = translate_expr(program, Some(select), lhs, lhs_reg, cursor_hint)?; let rhs = rhs.as_ref().unwrap(); @@ -577,7 +577,7 @@ fn translate_condition_expr( for (i, expr) in rhs.iter().enumerate() { let rhs_reg = program.alloc_register(); let last_condition = i == rhs.len() - 1; - let _ = translate_expr(program, select, expr, rhs_reg, cursor_hint)?; + let _ = translate_expr(program, Some(select), expr, rhs_reg, cursor_hint)?; // If this is not the last condition, we need to jump to the 'jump_target_when_true' label if the condition is true. if !last_condition { program.emit_insn_with_label_dependency( @@ -614,7 +614,7 @@ fn translate_condition_expr( // If it's a NOT IN expression, we need to jump to the 'jump_target_when_false' label if any of the conditions are true. for expr in rhs.iter() { let rhs_reg = program.alloc_register(); - let _ = translate_expr(program, select, expr, rhs_reg, cursor_hint)?; + let _ = translate_expr(program, Some(select), expr, rhs_reg, cursor_hint)?; program.emit_insn_with_label_dependency( Insn::Eq { lhs: lhs_reg, @@ -657,9 +657,9 @@ fn translate_condition_expr( let pattern_reg = program.alloc_register(); let column_reg = program.alloc_register(); // LIKE(pattern, column). We should translate the pattern first before the column - let _ = translate_expr(program, select, rhs, pattern_reg, cursor_hint)?; + let _ = translate_expr(program, Some(select), rhs, pattern_reg, cursor_hint)?; program.mark_last_insn_constant(); - let _ = translate_expr(program, select, lhs, column_reg, cursor_hint)?; + let _ = translate_expr(program, Some(select), lhs, column_reg, cursor_hint)?; program.emit_insn(Insn::Function { func: ScalarFunc::Like, start_reg: pattern_reg, diff --git a/core/types.rs b/core/types.rs index 2562fc9de..67d4162cc 100644 --- a/core/types.rs +++ b/core/types.rs @@ -4,6 +4,8 @@ use std::{cell::Ref, rc::Rc}; use crate::error::LimboError; use crate::Result; +use crate::sqlite3_ondisk::write_varint; + #[derive(Debug, Clone, PartialEq)] pub enum Value<'a> { Null, @@ -301,6 +303,59 @@ impl OwnedRecord { pub fn new(values: Vec) -> Self { Self { values } } + + pub fn serialize(&self, buf: &mut Vec) { + let initial_i = buf.len(); + + for value in &self.values { + let serial_type = match value { + OwnedValue::Null => 0, + OwnedValue::Integer(_) => 6, // for now let's only do i64 + OwnedValue::Float(_) => 7, + OwnedValue::Text(t) => (t.len() * 2 + 13) as u64, + OwnedValue::Blob(b) => (b.len() * 2 + 12) as u64, + // not serializable values + OwnedValue::Agg(_) => unreachable!(), + OwnedValue::Record(_) => unreachable!(), + }; + + buf.resize(buf.len() + 9, 0); // Ensure space for varint + let len = buf.len(); + let n = write_varint(&mut buf[len - 9..], serial_type); + buf.truncate(buf.len() - 9 + n); // Remove unused bytes + } + + let mut header_size = buf.len() - initial_i; + // write content + for value in &self.values { + match value { + OwnedValue::Null => {} + OwnedValue::Integer(i) => buf.extend_from_slice(&i.to_be_bytes()), + OwnedValue::Float(f) => buf.extend_from_slice(&f.to_be_bytes()), + OwnedValue::Text(t) => buf.extend_from_slice(t.as_bytes()), + OwnedValue::Blob(b) => buf.extend_from_slice(b), + // non serializable + OwnedValue::Agg(_) => unreachable!(), + OwnedValue::Record(_) => unreachable!(), + }; + } + + let mut header_bytes_buf: Vec = Vec::new(); + if header_size <= 126 { + // common case + header_size += 1; + } else { + todo!("calculate big header size extra bytes"); + // get header varint len + // header_size += n; + // if( nVarint { @@ -315,7 +370,13 @@ pub trait Cursor { fn wait_for_completion(&mut self) -> Result<()>; fn rowid(&self) -> Result>; fn record(&self) -> Result>>; - fn insert(&mut self, record: &OwnedRecord) -> Result<()>; + fn insert( + &mut self, + key: &OwnedValue, + record: &OwnedRecord, + moved_before: bool, /* Tells inserter that it doesn't need to traverse in order to find leaf page */ + ) -> Result>; // + fn exists(&mut self, key: &OwnedValue) -> Result>; fn set_null_flag(&mut self, flag: bool); fn get_null_flag(&self) -> bool; } diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 382bc8731..101177a4d 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -1,3 +1,7 @@ +use std::{cell::RefCell, rc::Rc}; + +use crate::sqlite3_ondisk::DatabaseHeader; + use super::{BranchOffset, CursorID, Insn, InsnReference, Program, Table}; pub struct ProgramBuilder { @@ -246,6 +250,26 @@ impl ProgramBuilder { assert!(*pc_if_next < 0); *pc_if_next = to_offset; } + Insn::InitCoroutine { + yield_reg: _, + jump_on_definition, + start_offset: _, + } => { + *jump_on_definition = to_offset; + } + Insn::NotExists { + cursor: _, + rowid_reg: _, + target_pc, + } => { + *target_pc = to_offset; + } + Insn::Yield { + yield_reg: _, + end_offset, + } => { + *end_offset = to_offset; + } _ => { todo!("missing resolve_label for {:?}", insn); } @@ -281,7 +305,7 @@ impl ProgramBuilder { self.deferred_label_resolutions.clear(); } - pub fn build(self) -> Program { + pub fn build(self, database_header: Rc>) -> Program { assert!( self.deferred_label_resolutions.is_empty(), "deferred_label_resolutions is not empty when build() is called, did you forget to call resolve_deferred_labels()?" @@ -294,6 +318,7 @@ impl ProgramBuilder { max_registers: self.next_free_register, insns: self.insns, cursor_ref: self.cursor_ref, + database_header, } } } diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index 8f29c360b..242428170 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -507,6 +507,137 @@ pub fn insn_to_str(program: &Program, addr: InsnReference, insn: &Insn, indent: 0, format!("r[{}]=func(r[{}..])", dest, start_reg), ), + Insn::InitCoroutine { + yield_reg, + jump_on_definition, + start_offset, + } => ( + "InitCoroutine", + *yield_reg as i32, + *jump_on_definition as i32, + *start_offset as i32, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::EndCoroutine { yield_reg } => ( + "EndCoroutine", + *yield_reg as i32, + 0, + 0, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::Yield { + yield_reg, + end_offset, + } => ( + "Yield", + *yield_reg as i32, + *end_offset as i32, + 0, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::InsertAsync { + cursor, + key_reg, + record_reg, + flag, + } => ( + "InsertAsync", + *cursor as i32, + *record_reg as i32, + *key_reg as i32, + OwnedValue::Text(Rc::new("".to_string())), + *flag as u16, + format!(""), + ), + Insn::InsertAwait { cursor_id } => ( + "InsertAwait", + *cursor_id as i32, + 0, + 0, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::NewRowid { reg } => ( + "NewRowId", + 0, + *reg as i32, + 0, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::MustBeInt { reg } => ( + "MustBeInt", + *reg as i32, + 0, + 0, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::SoftNull { reg } => ( + "SoftNull", + *reg as i32, + 0, + 0, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::NotExists { + cursor, + rowid_reg, + target_pc, + } => ( + "NotExists", + *cursor as i32, + *target_pc as i32, + *rowid_reg as i32, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::OpenWriteAsync { + cursor_id, + root_page, + } => ( + "OpenWriteAsync", + *cursor_id as i32, + *root_page as i32, + 0, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::OpenWriteAwait {} => ( + "OpenWriteAwait", + 0, + 0, + 0, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), + Insn::Copy { + src_reg, + dst_reg, + amount, + } => ( + "Copy", + *src_reg as i32, + *dst_reg as i32, + *amount as i32, + OwnedValue::Text(Rc::new("".to_string())), + 0, + format!(""), + ), }; format!( "{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}", diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 587536bed..5e68f93b3 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -28,6 +28,7 @@ use crate::function::{AggFunc, ScalarFunc}; use crate::pager::Pager; use crate::pseudo::PseudoCursor; use crate::schema::Table; +use crate::sqlite3_ondisk::DatabaseHeader; use crate::types::{AggContext, Cursor, CursorResult, OwnedRecord, OwnedValue, Record}; use crate::Result; @@ -279,6 +280,63 @@ pub enum Insn { dest: usize, // P3 func: ScalarFunc, // P4 }, + + InitCoroutine { + yield_reg: usize, + jump_on_definition: BranchOffset, + start_offset: BranchOffset, + }, + + EndCoroutine { + yield_reg: usize, + }, + + Yield { + yield_reg: usize, + end_offset: BranchOffset, + }, + + InsertAsync { + cursor: CursorID, + key_reg: usize, // Must be int. + record_reg: usize, // Blob of record data. + flag: usize, // Flags used by insert, for now not used. + }, + + InsertAwait { + cursor_id: usize, + }, + + NewRowid { + reg: usize, + }, + + MustBeInt { + reg: usize, + }, + + SoftNull { + reg: usize, + }, + + NotExists { + cursor: CursorID, + rowid_reg: usize, + target_pc: BranchOffset, + }, + + OpenWriteAsync { + cursor_id: CursorID, + root_page: PageIdx, + }, + + OpenWriteAwait {}, + + Copy { + src_reg: usize, + dst_reg: usize, + amount: usize, // 0 amount means we include src_reg, dst_reg..=dst_reg+amount = src_reg..=src_reg+amount + }, } // Index of insn in list of insns @@ -295,6 +353,7 @@ pub struct ProgramState { pub pc: BranchOffset, cursors: RefCell>>, registers: Vec, + ended_coroutine: bool, // flag to notify yield coroutine finished } impl ProgramState { @@ -306,6 +365,7 @@ impl ProgramState { pc: 0, cursors, registers, + ended_coroutine: false, } } @@ -322,6 +382,7 @@ pub struct Program { pub max_registers: usize, pub insns: Vec, pub cursor_ref: Vec<(Option, Option)>, + pub database_header: Rc>, } impl Program { @@ -580,7 +641,11 @@ impl Program { cursor_id, root_page, } => { - let cursor = Box::new(BTreeCursor::new(pager.clone(), *root_page)); + let cursor = Box::new(BTreeCursor::new( + pager.clone(), + *root_page, + self.database_header.clone(), + )); cursors.insert(*cursor_id, cursor); state.pc += 1; } @@ -997,7 +1062,7 @@ impl Program { }; state.registers[*dest_reg] = OwnedValue::Record(record.clone()); let sorter_cursor = cursors.get_mut(sorter_cursor).unwrap(); - sorter_cursor.insert(&record)?; + sorter_cursor.insert(&OwnedValue::Integer(0), &record, false)?; // fix key later state.pc += 1; } Insn::SorterInsert { @@ -1009,7 +1074,8 @@ impl Program { OwnedValue::Record(record) => record, _ => unreachable!("SorterInsert on non-record register"), }; - cursor.insert(record)?; + // TODO: set correct key + cursor.insert(&OwnedValue::Integer(0), record, false)?; state.pc += 1; } Insn::SorterSort { @@ -1198,6 +1264,121 @@ impl Program { state.pc += 1; } }, + Insn::InitCoroutine { + yield_reg, + jump_on_definition, + start_offset, + } => { + state.registers[*yield_reg] = OwnedValue::Integer(*start_offset); + state.pc = *jump_on_definition; + } + Insn::EndCoroutine { yield_reg } => { + if let OwnedValue::Integer(pc) = state.registers[*yield_reg] { + state.ended_coroutine = true; + state.pc = pc - 1; // yield jump is always next to yield. Here we substract 1 to go back to yield instruction + } else { + unreachable!(); + } + } + Insn::Yield { + yield_reg, + end_offset, + } => { + if let OwnedValue::Integer(pc) = state.registers[*yield_reg] { + if state.ended_coroutine { + state.pc = *end_offset; + } else { + // swap + (state.pc, state.registers[*yield_reg]) = + (pc, OwnedValue::Integer(state.pc + 1)); + } + } else { + unreachable!(); + } + } + Insn::InsertAsync { + cursor, + key_reg, + record_reg, + flag: _, + } => { + let cursor = cursors.get_mut(cursor).unwrap(); + let record = match &state.registers[*record_reg] { + OwnedValue::Record(r) => r, + _ => unreachable!("Not a record! Cannot insert a non record value."), + }; + let key = &state.registers[*key_reg]; + match cursor.insert(key, record, true)? { + CursorResult::Ok(_) => { + state.pc += 1; + } + CursorResult::IO => { + // If there is I/O, the instruction is restarted. + return Ok(StepResult::IO); + } + } + } + Insn::InsertAwait { cursor_id } => { + let cursor = cursors.get_mut(cursor_id).unwrap(); + cursor.wait_for_completion()?; + state.pc += 1; + } + Insn::NewRowid { reg: _ } => todo!(), + Insn::MustBeInt { reg } => { + match state.registers[*reg] { + OwnedValue::Integer(_) => {} + _ => { + crate::bail_parse_error!( + "MustBeInt: the value in the register is not an integer" + ); + } + }; + state.pc += 1; + } + Insn::SoftNull { reg } => { + state.registers[*reg] = OwnedValue::Null; + state.pc += 1; + } + Insn::NotExists { + cursor, + rowid_reg, + target_pc, + } => { + let cursor = cursors.get_mut(cursor).unwrap(); + match cursor.exists(&state.registers[*rowid_reg])? { + CursorResult::Ok(true) => state.pc += 1, + CursorResult::Ok(false) => state.pc = *target_pc, + CursorResult::IO => return Ok(StepResult::IO), + }; + } + // this cursor may be reused for next insert + // Update: tablemoveto is used to travers on not exists, on insert depending on flags if nonseek it traverses again. + // If not there might be some optimizations obviously. + Insn::OpenWriteAsync { + cursor_id, + root_page, + } => { + let cursor = Box::new(BTreeCursor::new( + pager.clone(), + *root_page, + self.database_header.clone(), + )); + cursors.insert(*cursor_id, cursor); + state.pc += 1; + } + Insn::OpenWriteAwait {} => { + state.pc += 1; + } + Insn::Copy { + src_reg, + dst_reg, + amount, + } => { + for i in 0..=*amount { + state.registers[*dst_reg + i] = state.registers[*src_reg + i].clone(); + } + state.pc += 1; + } } } } diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index de4f90296..26704ae4b 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -1,5 +1,5 @@ use crate::{ - types::{Cursor, CursorResult, OwnedRecord}, + types::{Cursor, CursorResult, OwnedRecord, OwnedValue}, Result, }; use std::{ @@ -79,11 +79,18 @@ impl Cursor for Sorter { Ok(self.current.borrow()) } - fn insert(&mut self, record: &OwnedRecord) -> Result<()> { + fn insert( + &mut self, + key: &OwnedValue, + record: &OwnedRecord, + moved_before: bool, + ) -> Result> { + let _ = key; + let _ = moved_before; let key_fields = self.order.len(); let key = OwnedRecord::new(record.values[0..key_fields].to_vec()); self.insert(key, OwnedRecord::new(record.values[key_fields..].to_vec())); - Ok(()) + Ok(CursorResult::Ok(())) } fn set_null_flag(&mut self, _flag: bool) { @@ -93,4 +100,9 @@ impl Cursor for Sorter { fn get_null_flag(&self) -> bool { todo!(); } + + fn exists(&mut self, key: &OwnedValue) -> Result> { + let _ = key; + todo!() + } } diff --git a/simulator/main.rs b/simulator/main.rs index 628ba5526..4e26eea9f 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -162,7 +162,7 @@ impl limbo_core::File for SimulatorFile { &self, pos: usize, buffer: Rc>, - c: Rc, + c: Rc, ) -> Result<()> { if *self.fault.borrow() { *self.nr_pwrite_faults.borrow_mut() += 1;