diff --git a/Cargo.lock b/Cargo.lock index ced65e88c..d0ea15bc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1151,6 +1151,7 @@ dependencies = [ name = "limbo_core" version = "0.0.9" dependencies = [ + "bumpalo", "cfg_block", "chrono", "criterion", diff --git a/Cargo.toml b/Cargo.toml index a8beb431a..a9dccf37a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,10 @@ codegen-units = 1 panic = "abort" lto = true +[profile.bench-profile] +inherits = "release" +debug = true + [profile.dist] inherits = "release" lto = "thin" diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 06a3a43ee..a1febb5d7 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -1,4 +1,4 @@ -use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile}; +use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile, WalFileShared}; use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; @@ -6,7 +6,7 @@ use wasm_bindgen::prelude::*; #[wasm_bindgen] pub struct Database { - db: Rc, + db: Arc, conn: Rc, } @@ -22,13 +22,21 @@ impl Database { maybe_init_database_file(&file, &io).unwrap(); let page_io = Rc::new(DatabaseStorage::new(file)); let db_header = Pager::begin_open(page_io.clone()).unwrap(); + + // ensure db header is there + io.run_once().unwrap(); + let wal_path = format!("{}-wal", path); + let wal_shared = + WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size) + .unwrap(); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), - wal_path, db_header.borrow().page_size as usize, + wal_shared.clone(), ))); - let db = limbo_core::Database::open(io, page_io, wal).unwrap(); + + let db = limbo_core::Database::open(io, page_io, wal, wal_shared).unwrap(); let conn = db.connect(); Database { db, conn } } diff --git a/core/Cargo.toml b/core/Cargo.toml index 52acb9e49..9c9ed5521 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -52,6 +52,7 @@ serde = { version = "1.0", features = ["derive"] } pest = { version = 
"2.0", optional = true } pest_derive = { version = "2.0", optional = true } rand = "0.8.5" +bumpalo = { version = "3.16.0", features = ["collections", "boxed"] } [target.'cfg(not(target_family = "windows"))'.dev-dependencies] pprof = { version = "0.14.0", features = ["criterion", "flamegraph"] } diff --git a/core/lib.rs b/core/lib.rs index 83210483a..486e56ea6 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -21,15 +21,16 @@ use schema::Schema; use sqlite3_parser::ast; use sqlite3_parser::{ast::Cmd, lexer::sql::Parser}; use std::cell::Cell; -use std::rc::Weak; -use std::sync::{Arc, OnceLock}; +use std::sync::Weak; +use std::sync::{Arc, OnceLock, RwLock}; use std::{cell::RefCell, rc::Rc}; use storage::btree::btree_init_page; #[cfg(feature = "fs")] use storage::database::FileStorage; -use storage::pager::allocate_page; +use storage::pager::{allocate_page, DumbLruPageCache}; use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE}; pub use storage::wal::WalFile; +pub use storage::wal::WalFileShared; use util::parse_schema_rows; use translate::optimizer::optimize_plan; @@ -64,41 +65,52 @@ pub struct Database { schema: Rc>, header: Rc>, transaction_state: RefCell, + // Shared structures of a Database are the parts that are common to multiple threads that might + // create DB connections. 
+ shared_page_cache: Arc>, + shared_wal: Arc>, } impl Database { #[cfg(feature = "fs")] - pub fn open_file(io: Arc, path: &str) -> Result> { + pub fn open_file(io: Arc, path: &str) -> Result> { + use storage::wal::WalFileShared; + let file = io.open_file(path, io::OpenFlags::Create, true)?; maybe_init_database_file(&file, &io)?; let page_io = Rc::new(FileStorage::new(file)); let wal_path = format!("{}-wal", path); let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; + let wal_shared = + WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)?; let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), - wal_path, db_header.borrow().page_size as usize, + wal_shared.clone(), ))); - Self::open(io, page_io, wal) + Self::open(io, page_io, wal, wal_shared) } pub fn open( io: Arc, page_io: Rc, wal: Rc>, - ) -> Result> { + shared_wal: Arc>, + ) -> Result> { let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; DATABASE_VERSION.get_or_init(|| { let version = db_header.borrow().version_number; version.to_string() }); + let shared_page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); let pager = Rc::new(Pager::finish_open( db_header.clone(), page_io, wal, io.clone(), + shared_page_cache.clone(), )?); let bootstrap_schema = Rc::new(RefCell::new(Schema::new())); let conn = Rc::new(Connection { @@ -113,21 +125,23 @@ impl Database { parse_schema_rows(rows, &mut schema, io)?; let schema = Rc::new(RefCell::new(schema)); let header = db_header; - Ok(Rc::new(Database { + Ok(Arc::new(Database { pager, schema, header, transaction_state: RefCell::new(TransactionState::None), + shared_page_cache, + shared_wal, })) } - pub fn connect(self: &Rc) -> Rc { + pub fn connect(self: &Arc) -> Rc { Rc::new(Connection { pager: self.pager.clone(), schema: self.schema.clone(), header: self.header.clone(), - db: Rc::downgrade(self), last_insert_rowid: Cell::new(0), + db: Arc::downgrade(self), }) } } @@ -153,8 +167,7 @@ pub fn 
maybe_init_database_file(file: &Rc, io: &Arc) -> Result DATABASE_HEADER_SIZE, ); - let mut page = page1.borrow_mut(); - let contents = page.contents.as_mut().unwrap(); + let contents = page1.get().contents.as_mut().unwrap(); contents.write_database_header(&db_header); // write the first page to disk synchronously let flag_complete = Rc::new(RefCell::new(false)); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 58eaa04d2..5ef2d8d6c 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -1,6 +1,6 @@ use log::debug; -use crate::storage::pager::{Page, Pager}; +use crate::storage::pager::Pager; use crate::storage::sqlite3_ondisk::{ read_btree_cell, read_varint, write_varint, BTreeCell, DatabaseHeader, PageContent, PageType, TableInteriorCell, TableLeafCell, @@ -12,6 +12,7 @@ use std::cell::{Ref, RefCell}; use std::pin::Pin; use std::rc::Rc; +use super::pager::PageRef; use super::sqlite3_ondisk::{ write_varint_to_vec, IndexInteriorCell, IndexLeafCell, OverflowCell, DATABASE_HEADER_SIZE, }; @@ -67,7 +68,7 @@ enum WriteState { struct WriteInfo { state: WriteState, - new_pages: RefCell>>>, + new_pages: RefCell>, scratch_cells: RefCell>, rightmost_pointer: RefCell>, page_copy: RefCell>, // this holds the copy a of a page needed for buffer references @@ -101,7 +102,7 @@ struct PageStack { /// Pointer to the currenet page being consumed current_page: RefCell, /// List of pages in the stack. Root page will be in index 0 - stack: RefCell<[Option>>; BTCURSOR_MAX_DEPTH + 1]>, + stack: RefCell<[Option; BTCURSOR_MAX_DEPTH + 1]>, /// List of cell indices in the stack. /// cell_indices[current_page] is the current cell index being consumed. 
Similarly /// cell_indices[current_page-1] is the cell index of the parent of the current page @@ -143,16 +144,15 @@ impl BTreeCursor { fn is_empty_table(&mut self) -> Result> { let page = self.pager.read_page(self.root_page)?; - let page = RefCell::borrow(&page); return_if_locked!(page); - let cell_count = page.contents.as_ref().unwrap().cell_count(); + let cell_count = page.get().contents.as_ref().unwrap().cell_count(); Ok(CursorResult::Ok(cell_count == 0)) } fn get_prev_record(&mut self) -> Result, Option)>> { loop { - let mem_page_rc = self.stack.top(); + let page = self.stack.top(); let cell_idx = self.stack.current_index(); // moved to current page begin @@ -177,18 +177,15 @@ impl BTreeCursor { let cell_idx = cell_idx as usize; debug!( "get_prev_record current id={} cell={}", - mem_page_rc.borrow().id, + page.get().id, cell_idx ); - if mem_page_rc.borrow().is_locked() { + return_if_locked!(page); + if !page.is_loaded() { + self.pager.load_page(page.clone())?; return Ok(CursorResult::IO); } - if !mem_page_rc.borrow().is_loaded() { - self.pager.load_page(mem_page_rc.clone())?; - return Ok(CursorResult::IO); - } - let mem_page = mem_page_rc.borrow(); - let contents = mem_page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); let cell_count = contents.cell_count(); let cell_idx = if cell_idx >= cell_count { @@ -239,13 +236,13 @@ impl BTreeCursor { let mem_page_rc = self.stack.top(); let cell_idx = self.stack.current_index() as usize; - debug!("current id={} cell={}", mem_page_rc.borrow().id, cell_idx); - return_if_locked!(mem_page_rc.borrow()); - if !mem_page_rc.borrow().is_loaded() { + debug!("current id={} cell={}", mem_page_rc.get().id, cell_idx); + return_if_locked!(mem_page_rc); + if !mem_page_rc.is_loaded() { self.pager.load_page(mem_page_rc.clone())?; return Ok(CursorResult::IO); } - let mem_page = mem_page_rc.borrow(); + let mem_page = mem_page_rc.get(); let contents = mem_page.contents.as_ref().unwrap(); @@ -397,11 
+394,10 @@ impl BTreeCursor { return_if_io!(self.move_to(key.clone(), op.clone())); { - let page_rc = self.stack.top(); - let page = page_rc.borrow(); + let page = self.stack.top(); return_if_locked!(page); - let contents = page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); for cell_idx in 0..contents.cell_count() { let cell = contents.cell_get( @@ -491,11 +487,10 @@ impl BTreeCursor { loop { let mem_page = self.stack.top(); - let page_idx = mem_page.borrow().id; + let page_idx = mem_page.get().id; let page = self.pager.read_page(page_idx)?; - let page = RefCell::borrow(&page); return_if_locked!(page); - let contents = page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); if contents.is_leaf() { if contents.cell_count() > 0 { self.stack.set_cell_index(contents.cell_count() as i32 - 1); @@ -545,11 +540,10 @@ impl BTreeCursor { self.move_to_root(); loop { - let page_rc = self.stack.top(); - let page = RefCell::borrow(&page_rc); + let page = self.stack.top(); return_if_locked!(page); - let contents = page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); if contents.is_leaf() { return Ok(CursorResult::Ok(())); } @@ -649,7 +643,7 @@ impl BTreeCursor { let state = &self.write_info.state; match state { WriteState::Start => { - let page_ref = self.stack.top(); + let page = self.stack.top(); let int_key = match key { OwnedValue::Integer(i) => *i as u64, _ => unreachable!("btree tables are indexed by integers!"), @@ -657,13 +651,12 @@ impl BTreeCursor { // get page and find cell let (cell_idx, page_type) = { - let mut page = page_ref.borrow_mut(); return_if_locked!(page); page.set_dirty(); - self.pager.add_dirty(page.id); + self.pager.add_dirty(page.get().id); - let page = page.contents.as_mut().unwrap(); + let page = page.get().contents.as_mut().unwrap(); assert!(matches!(page.page_type(), PageType::TableLeaf)); // find cell @@ -679,8 +672,7 @@ impl BTreeCursor { // 
insert let overflow = { - let mut page = page_ref.borrow_mut(); - let contents = page.contents.as_mut().unwrap(); + let contents = page.get().contents.as_mut().unwrap(); log::debug!( "insert_into_page(overflow, cell_count={})", contents.cell_count() @@ -831,11 +823,10 @@ impl BTreeCursor { // can be a "rightmost pointer" or a "cell". // we always asumme there is a parent let current_page = self.stack.top(); - let mut page_rc = current_page.borrow_mut(); { // check if we don't need to balance // don't continue if there are no overflow cells - let page = page_rc.contents.as_mut().unwrap(); + let page = current_page.get().contents.as_mut().unwrap(); if page.overflow_cells.is_empty() { self.write_info.state = WriteState::Finish; return Ok(CursorResult::Ok(())); @@ -843,17 +834,15 @@ impl BTreeCursor { } if !self.stack.has_parent() { - drop(page_rc); - drop(current_page); self.balance_root(); return Ok(CursorResult::Ok(())); } - debug!("Balancing leaf. leaf={}", page_rc.id); + debug!("Balancing leaf. leaf={}", current_page.get().id); // Copy of page used to reference cell bytes. // This needs to be saved somewhere safe so taht references still point to here, // this will be store in write_info below - let page_copy = page_rc.contents.as_ref().unwrap().clone(); + let page_copy = current_page.get().contents.as_ref().unwrap().clone(); // In memory in order copy of all cells in pages we want to balance. For now let's do a 2 page split. // Right pointer in interior cells should be converted to regular cells if more than 2 pages are used for balancing. 
@@ -880,7 +869,7 @@ impl BTreeCursor { // allocate new pages and move cells to those new pages // split procedure - let page = page_rc.contents.as_mut().unwrap(); + let page = current_page.get().contents.as_mut().unwrap(); assert!( matches!( page.page_type(), @@ -889,9 +878,8 @@ impl BTreeCursor { "indexes still not supported " ); - let right_page_ref = self.allocate_page(page.page_type(), 0); - let right_page = right_page_ref.borrow_mut(); - let right_page_id = right_page.id; + let right_page = self.allocate_page(page.page_type(), 0); + let right_page_id = right_page.get().id; self.write_info.new_pages.borrow_mut().clear(); self.write_info @@ -901,41 +889,43 @@ impl BTreeCursor { self.write_info .new_pages .borrow_mut() - .push(right_page_ref.clone()); + .push(right_page.clone()); - debug!("splitting left={} right={}", page_rc.id, right_page_id); + debug!( + "splitting left={} right={}", + current_page.get().id, + right_page_id + ); self.write_info.state = WriteState::BalanceGetParentPage; Ok(CursorResult::Ok(())) } WriteState::BalanceGetParentPage => { - let parent_rc = self.stack.parent(); - let loaded = parent_rc.borrow().is_loaded(); - return_if_locked!(parent_rc.borrow()); + let parent = self.stack.parent(); + let loaded = parent.is_loaded(); + return_if_locked!(parent); if !loaded { debug!("balance_leaf(loading page)"); - self.pager.load_page(parent_rc.clone())?; + self.pager.load_page(parent.clone())?; return Ok(CursorResult::IO); } - parent_rc.borrow_mut().set_dirty(); + parent.set_dirty(); self.write_info.state = WriteState::BalanceMoveUp; Ok(CursorResult::Ok(())) } WriteState::BalanceMoveUp => { - let parent_ref = self.stack.parent(); - let mut parent = parent_ref.borrow_mut(); + let parent = self.stack.parent(); let (page_type, current_idx) = { let current_page = self.stack.top(); - let page_ref = current_page.borrow(); - let contents = page_ref.contents.as_ref().unwrap(); - (contents.page_type().clone(), page_ref.id) + let contents = 
current_page.get().contents.as_ref().unwrap(); + (contents.page_type().clone(), current_page.get().id) }; parent.set_dirty(); - self.pager.add_dirty(parent.id); - let parent_contents = parent.contents.as_mut().unwrap(); + self.pager.add_dirty(parent.get().id); + let parent_contents = parent.get().contents.as_mut().unwrap(); // if this isn't empty next loop won't work assert_eq!(parent_contents.overflow_cells.len(), 0); @@ -974,9 +964,8 @@ impl BTreeCursor { // reset pages for page in new_pages.iter() { - let mut page = page.borrow_mut(); assert!(page.is_dirty()); - let contents = page.contents.as_mut().unwrap(); + let contents = page.get().contents.as_mut().unwrap(); contents.write_u16(BTREE_HEADER_OFFSET_FREEBLOCK, 0); contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0); @@ -1005,9 +994,8 @@ impl BTreeCursor { ); for (i, page) in new_pages.iter_mut().enumerate() { - let mut page = page.borrow_mut(); - let page_id = page.id; - let contents = page.contents.as_mut().unwrap(); + let page_id = page.get().id; + let contents = page.get().contents.as_mut().unwrap(); let last_page = i == new_pages_len - 1; let cells_to_copy = if last_page { @@ -1033,16 +1021,14 @@ impl BTreeCursor { } let is_leaf = { let page = self.stack.top(); - let page = page.borrow(); - let page = page.contents.as_ref().unwrap(); + let page = page.get().contents.as_ref().unwrap(); page.is_leaf() }; // update rightmost pointer for each page if we are in interior page if !is_leaf { for page in new_pages.iter_mut().take(new_pages_len - 1) { - let mut page = page.borrow_mut(); - let contents = page.contents.as_mut().unwrap(); + let contents = page.get().contents.as_mut().unwrap(); assert!(contents.cell_count() == 1); let last_cell = contents @@ -1063,8 +1049,7 @@ impl BTreeCursor { } // last page right most pointer points to previous right most pointer before splitting let last_page = new_pages.last().unwrap(); - let mut last_page = last_page.borrow_mut(); - let last_page_contents = 
last_page.contents.as_mut().unwrap(); + let last_page_contents = last_page.get().contents.as_mut().unwrap(); last_page_contents.write_u32( BTREE_HEADER_OFFSET_RIGHTMOST, self.write_info.rightmost_pointer.borrow().unwrap(), @@ -1076,8 +1061,7 @@ impl BTreeCursor { for (page_id_index, page) in new_pages.iter_mut().take(new_pages_len - 1).enumerate() { - let mut page = page.borrow_mut(); - let contents = page.contents.as_mut().unwrap(); + let contents = page.get().contents.as_mut().unwrap(); let divider_cell_index = divider_cells_index[page_id_index]; let cell_payload = scratch_cells[divider_cell_index]; let cell = read_btree_cell( @@ -1098,7 +1082,7 @@ impl BTreeCursor { _ => unreachable!(), }; let mut divider_cell = Vec::new(); - divider_cell.extend_from_slice(&(page.id as u32).to_be_bytes()); + divider_cell.extend_from_slice(&(page.get().id as u32).to_be_bytes()); divider_cell.extend(std::iter::repeat(0).take(9)); let n = write_varint(&mut divider_cell.as_mut_slice()[4..], key); divider_cell.truncate(4 + n); @@ -1122,7 +1106,7 @@ impl BTreeCursor { { // copy last page id to right pointer - let last_pointer = new_pages.last().unwrap().borrow().id as u32; + let last_pointer = new_pages.last().unwrap().get().id as u32; parent_contents.write_u32(right_pointer, last_pointer); } self.stack.pop(); @@ -1141,20 +1125,17 @@ impl BTreeCursor { let is_page_1 = { let current_root = self.stack.top(); - let current_root_ref = current_root.borrow(); - current_root_ref.id == 1 + current_root.get().id == 1 }; let offset = if is_page_1 { DATABASE_HEADER_SIZE } else { 0 }; - let new_root_page_ref = self.allocate_page(PageType::TableInterior, offset); + let new_root_page = self.allocate_page(PageType::TableInterior, offset); { let current_root = self.stack.top(); - let current_root_ref = current_root.borrow(); - let current_root_contents = current_root_ref.contents.as_ref().unwrap(); + let current_root_contents = current_root.get().contents.as_ref().unwrap(); - let mut new_root_page = 
new_root_page_ref.borrow_mut(); - let new_root_page_id = new_root_page.id; - let new_root_page_contents = new_root_page.contents.as_mut().unwrap(); + let new_root_page_id = new_root_page.get().id; + let new_root_page_contents = new_root_page.get().contents.as_mut().unwrap(); if is_page_1 { // Copy header let current_root_buf = current_root_contents.as_ptr(); @@ -1166,8 +1147,6 @@ impl BTreeCursor { new_root_page_contents .write_u32(BTREE_HEADER_OFFSET_RIGHTMOST, new_root_page_id as u32); new_root_page_contents.write_u16(BTREE_HEADER_OFFSET_CELL_COUNT, 0); - // TODO:: this page should have offset - // copy header bytes to here } /* swap splitted page buffer with new root buffer so we don't have to update page idx */ @@ -1175,18 +1154,16 @@ impl BTreeCursor { let (root_id, child_id, child) = { let page_ref = self.stack.top(); let child = page_ref.clone(); - let mut child_rc = page_ref.borrow_mut(); - let mut new_root_page = new_root_page_ref.borrow_mut(); // Swap the entire Page structs - std::mem::swap(&mut child_rc.id, &mut new_root_page.id); + std::mem::swap(&mut child.get().id, &mut new_root_page.get().id); // TODO:: shift bytes by offset to left on child because now child has offset 100 // and header bytes // Also change the offset of page // if is_page_1 { // Remove header from child and set offset to 0 - let contents = child_rc.contents.as_mut().unwrap(); + let contents = child.get().contents.as_mut().unwrap(); let (cell_pointer_offset, _) = contents.cell_get_raw_pointer_region(); // change cell pointers for cell_idx in 0..contents.cell_count() { @@ -1200,13 +1177,13 @@ impl BTreeCursor { buf.copy_within(DATABASE_HEADER_SIZE.., 0); } - self.pager.add_dirty(new_root_page.id); - self.pager.add_dirty(child_rc.id); - (new_root_page.id, child_rc.id, child) + self.pager.add_dirty(new_root_page.get().id); + self.pager.add_dirty(child.get().id); + (new_root_page.get().id, child.get().id, child) }; debug!("Balancing root. 
root={}, rightmost={}", root_id, child_id); - let root = new_root_page_ref.clone(); + let root = new_root_page.clone(); self.root_page = root_id; self.stack.clear(); @@ -1218,22 +1195,19 @@ impl BTreeCursor { } } - fn allocate_page(&self, page_type: PageType, offset: usize) -> Rc> { + fn allocate_page(&self, page_type: PageType, offset: usize) -> PageRef { let page = self.pager.allocate_page().unwrap(); btree_init_page(&page, page_type, &self.database_header.borrow(), offset); page } - fn allocate_overflow_page(&self) -> Rc> { + fn allocate_overflow_page(&self) -> PageRef { let page = self.pager.allocate_page().unwrap(); - { - // setup overflow page - let mut contents = page.borrow_mut(); - let contents = contents.contents.as_mut().unwrap(); - let buf = contents.as_ptr(); - buf.fill(0); - } + // setup overflow page + let contents = page.get().contents.as_mut().unwrap(); + let buf = contents.as_ptr(); + buf.fill(0); page } @@ -1493,9 +1467,8 @@ impl BTreeCursor { let overflow_page = self.allocate_overflow_page(); overflow_pages.push(overflow_page.clone()); { - let mut page = overflow_page.borrow_mut(); - let id = page.id as u32; - let contents = page.contents.as_mut().unwrap(); + let id = overflow_page.get().id as u32; + let contents = overflow_page.get().contents.as_mut().unwrap(); // TODO: take into account offset here? 
let buf = contents.as_ptr(); @@ -1572,11 +1545,11 @@ impl BTreeCursor { } impl PageStack { - fn push(&self, page: Rc>) { + fn push(&self, page: PageRef) { debug!( "pagestack::push(current={}, new_page_id={})", self.current_page.borrow(), - page.borrow().id + page.get().id ); *self.current_page.borrow_mut() += 1; let current = *self.current_page.borrow(); @@ -1596,7 +1569,7 @@ impl PageStack { *self.current_page.borrow_mut() -= 1; } - fn top(&self) -> Rc> { + fn top(&self) -> PageRef { let current = *self.current_page.borrow(); let page = self.stack.borrow()[current as usize] .as_ref() @@ -1605,12 +1578,12 @@ impl PageStack { debug!( "pagestack::top(current={}, page_id={})", current, - page.borrow().id + page.get().id ); page } - fn parent(&self) -> Rc> { + fn parent(&self) -> PageRef { let current = *self.current_page.borrow(); self.stack.borrow()[current as usize - 1] .as_ref() @@ -1794,12 +1767,11 @@ impl Cursor for BTreeCursor { _ => unreachable!("btree tables are indexed by integers!"), }; return_if_io!(self.move_to(SeekKey::TableRowId(*int_key as u64), SeekOp::EQ)); - let page_ref = self.stack.top(); - let page = page_ref.borrow(); + let page = self.stack.top(); // TODO(pere): request load return_if_locked!(page); - let contents = page.contents.as_ref().unwrap(); + let contents = page.get().contents.as_ref().unwrap(); // find cell let int_key = match key { @@ -1834,19 +1806,19 @@ impl Cursor for BTreeCursor { ), }; let page = self.allocate_page(page_type, 0); - let id = page.borrow().id; + let id = page.get().id; id as u32 } } pub fn btree_init_page( - page: &Rc>, + page: &PageRef, page_type: PageType, db_header: &DatabaseHeader, offset: usize, ) { // setup btree page - let mut contents = page.borrow_mut(); + let contents = page.get(); debug!("btree_init_page(id={}, offset={})", contents.id, offset); let contents = contents.contents.as_mut().unwrap(); contents.offset = offset; diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 
1bfbe8392..43ed17e97 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -5,22 +5,30 @@ use crate::storage::wal::Wal; use crate::{Buffer, Result}; use log::{debug, trace}; use sieve_cache::SieveCache; -use std::cell::RefCell; +use std::cell::{RefCell, UnsafeCell}; use std::collections::{HashMap, HashSet}; use std::hash::Hash; use std::ptr::{drop_in_place, NonNull}; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use super::wal::CheckpointStatus; -pub struct Page { +pub struct PageInner { pub flags: AtomicUsize, pub contents: Option, pub id: usize, } +pub struct Page { + pub inner: UnsafeCell, +} + +// Concurrency control of pages will be handled by the pager, we won't wrap Page with RwLock +// because that is bad bad. +pub type PageRef = Arc; + /// Page is up-to-date. const PAGE_UPTODATE: usize = 0b001; /// Page is locked for I/O to prevent concurrent access. @@ -35,78 +43,84 @@ const PAGE_LOADED: usize = 0b10000; impl Page { pub fn new(id: usize) -> Page { Page { - flags: AtomicUsize::new(0), - contents: None, - id, + inner: UnsafeCell::new(PageInner { + flags: AtomicUsize::new(0), + contents: None, + id, + }), } } + pub fn get(&self) -> &mut PageInner { + unsafe { &mut *self.inner.get() } + } + pub fn is_uptodate(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_UPTODATE != 0 } pub fn set_uptodate(&self) { - self.flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst); + self.get().flags.fetch_or(PAGE_UPTODATE, Ordering::SeqCst); } pub fn clear_uptodate(&self) { - self.flags.fetch_and(!PAGE_UPTODATE, Ordering::SeqCst); + self.get().flags.fetch_and(!PAGE_UPTODATE, Ordering::SeqCst); } pub fn is_locked(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_LOCKED != 0 } pub fn set_locked(&self) { - self.flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst); + 
self.get().flags.fetch_or(PAGE_LOCKED, Ordering::SeqCst); } pub fn clear_locked(&self) { - self.flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst); + self.get().flags.fetch_and(!PAGE_LOCKED, Ordering::SeqCst); } pub fn is_error(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_ERROR != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_ERROR != 0 } pub fn set_error(&self) { - self.flags.fetch_or(PAGE_ERROR, Ordering::SeqCst); + self.get().flags.fetch_or(PAGE_ERROR, Ordering::SeqCst); } pub fn clear_error(&self) { - self.flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst); + self.get().flags.fetch_and(!PAGE_ERROR, Ordering::SeqCst); } pub fn is_dirty(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_DIRTY != 0 } pub fn set_dirty(&self) { - self.flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst); + self.get().flags.fetch_or(PAGE_DIRTY, Ordering::SeqCst); } pub fn clear_dirty(&self) { - self.flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst); + self.get().flags.fetch_and(!PAGE_DIRTY, Ordering::SeqCst); } pub fn is_loaded(&self) -> bool { - self.flags.load(Ordering::SeqCst) & PAGE_LOADED != 0 + self.get().flags.load(Ordering::SeqCst) & PAGE_LOADED != 0 } pub fn set_loaded(&self) { - self.flags.fetch_or(PAGE_LOADED, Ordering::SeqCst); + self.get().flags.fetch_or(PAGE_LOADED, Ordering::SeqCst); } pub fn clear_loaded(&self) { - log::debug!("clear loaded {}", self.id); - self.flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst); + log::debug!("clear loaded {}", self.get().id); + self.get().flags.fetch_and(!PAGE_LOADED, Ordering::SeqCst); } } #[allow(dead_code)] struct PageCacheEntry { key: usize, - page: Rc>, + page: PageRef, prev: Option>, next: Option>, } @@ -117,12 +131,14 @@ impl PageCacheEntry { } } -struct DumbLruPageCache { +pub struct DumbLruPageCache { capacity: usize, map: RefCell>>, head: RefCell>>, tail: RefCell>>, } +unsafe impl Send for DumbLruPageCache {} +unsafe impl Sync for DumbLruPageCache {} impl 
DumbLruPageCache { pub fn new(capacity: usize) -> Self { @@ -138,7 +154,7 @@ impl DumbLruPageCache { self.map.borrow().contains_key(&key) } - pub fn insert(&mut self, key: usize, value: Rc>) { + pub fn insert(&mut self, key: usize, value: PageRef) { self._delete(key, false); debug!("cache_insert(key={})", key); let mut entry = Box::new(PageCacheEntry { @@ -181,7 +197,7 @@ impl DumbLruPageCache { ptr.copied() } - pub fn get(&mut self, key: &usize) -> Option>> { + pub fn get(&mut self, key: &usize) -> Option { debug!("cache_get(key={})", key); let ptr = self.get_ptr(*key); ptr?; @@ -202,10 +218,10 @@ impl DumbLruPageCache { if clean_page { // evict buffer - let mut page = entry.page.borrow_mut(); + let page = &entry.page; page.clear_loaded(); - debug!("cleaning up page {}", page.id); - let _ = page.contents.take(); + debug!("cleaning up page {}", page.get().id); + let _ = page.get().contents.take(); } let (next, prev) = unsafe { @@ -254,7 +270,7 @@ impl DumbLruPageCache { return; } let tail = unsafe { tail.unwrap().as_mut() }; - if RefCell::borrow(&tail.page).is_dirty() { + if tail.page.is_dirty() { // TODO: drop from another clean entry? return; } @@ -325,7 +341,7 @@ pub struct Pager { /// The write-ahead log (WAL) for the database. wal: Rc>, /// A page cache for the database. - page_cache: RefCell, + page_cache: Arc>, /// Buffer pool for temporary data storage. buffer_pool: Rc, /// I/O interface for input/output operations. @@ -351,11 +367,11 @@ impl Pager { page_io: Rc, wal: Rc>, io: Arc, + page_cache: Arc>, ) -> Result { let db_header = RefCell::borrow(&db_header_ref); let page_size = db_header.page_size as usize; let buffer_pool = Rc::new(BufferPool::new(page_size)); - let page_cache = RefCell::new(DumbLruPageCache::new(10)); Ok(Self { page_io, wal, @@ -394,21 +410,21 @@ impl Pager { } /// Reads a page from the database. 
- pub fn read_page(&self, page_idx: usize) -> crate::Result>> { + pub fn read_page(&self, page_idx: usize) -> crate::Result { trace!("read_page(page_idx = {})", page_idx); - let mut page_cache = self.page_cache.borrow_mut(); + let mut page_cache = self.page_cache.write().unwrap(); if let Some(page) = page_cache.get(&page_idx) { trace!("read_page(page_idx = {}) = cached", page_idx); return Ok(page.clone()); } - let page = Rc::new(RefCell::new(Page::new(page_idx))); - RefCell::borrow(&page).set_locked(); + let page = Arc::new(Page::new(page_idx)); + page.set_locked(); + if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? { self.wal .borrow() .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; { - let page = page.borrow_mut(); page.set_uptodate(); } // TODO(pere) ensure page is inserted, we should probably first insert to page cache @@ -428,17 +444,16 @@ impl Pager { } /// Loads pages if not loaded - pub fn load_page(&self, page: Rc>) -> Result<()> { - let id = page.borrow().id; + pub fn load_page(&self, page: PageRef) -> Result<()> { + let id = page.get().id; trace!("load_page(page_idx = {})", id); - let mut page_cache = self.page_cache.borrow_mut(); - page.borrow_mut().set_locked(); + let mut page_cache = self.page_cache.write().unwrap(); + page.set_locked(); if let Some(frame_id) = self.wal.borrow().find_frame(id as u64)? { self.wal .borrow() .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; { - let page = page.borrow_mut(); page.set_uptodate(); } // TODO(pere) ensure page is inserted @@ -467,7 +482,8 @@ impl Pager { /// Changes the size of the page cache. 
pub fn change_page_cache_size(&self, capacity: usize) { - self.page_cache.borrow_mut().resize(capacity); + let mut page_cache = self.page_cache.write().unwrap(); + page_cache.resize(capacity); } pub fn add_dirty(&self, page_id: usize) { @@ -483,9 +499,9 @@ impl Pager { FlushState::Start => { let db_size = self.db_header.borrow().database_size; for page_id in self.dirty_pages.borrow().iter() { - let mut cache = self.page_cache.borrow_mut(); + let mut cache = self.page_cache.write().unwrap(); let page = cache.get(page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - let page_type = page.borrow().contents.as_ref().unwrap().maybe_page_type(); + let page_type = page.get().contents.as_ref().unwrap().maybe_page_type(); debug!("appending frame {} {:?}", page_id, page_type); self.wal.borrow_mut().append_frame( page.clone(), @@ -589,7 +605,7 @@ impl Pager { Err(err) => panic!("error while clearing cache {}", err), } } - self.page_cache.borrow_mut().clear(); + self.page_cache.write().unwrap().clear(); } /* @@ -597,7 +613,7 @@ impl Pager { Currently free list pages are not yet supported. 
*/ #[allow(clippy::readonly_write_lock)] - pub fn allocate_page(&self) -> Result>> { + pub fn allocate_page(&self) -> Result { let header = &self.db_header; let mut header = RefCell::borrow_mut(header); header.database_size += 1; @@ -606,38 +622,35 @@ impl Pager { // read sync for now loop { let first_page_ref = self.read_page(1)?; - let first_page = RefCell::borrow_mut(&first_page_ref); - if first_page.is_locked() { - drop(first_page); + if first_page_ref.is_locked() { self.io.run_once()?; continue; } - first_page.set_dirty(); + first_page_ref.set_dirty(); self.add_dirty(1); - let contents = first_page.contents.as_ref().unwrap(); + let contents = first_page_ref.get().contents.as_ref().unwrap(); contents.write_database_header(&header); break; } } - let page_ref = allocate_page(header.database_size as usize, &self.buffer_pool, 0); + let page = allocate_page(header.database_size as usize, &self.buffer_pool, 0); { // setup page and add to cache - let page = page_ref.borrow_mut(); page.set_dirty(); - self.add_dirty(page.id); - let mut cache = self.page_cache.borrow_mut(); - cache.insert(page.id, page_ref.clone()); + self.add_dirty(page.get().id); + let mut cache = self.page_cache.write().unwrap(); + cache.insert(page.get().id, page.clone()); } - Ok(page_ref) + Ok(page) } - pub fn put_loaded_page(&self, id: usize, page: Rc>) { - let mut cache = RefCell::borrow_mut(&self.page_cache); + pub fn put_loaded_page(&self, id: usize, page: PageRef) { + let mut cache = self.page_cache.write().unwrap(); // cache insert invalidates previous page cache.insert(id, page.clone()); - page.borrow_mut().set_loaded(); + page.set_loaded(); } pub fn usable_size(&self) -> usize { @@ -646,14 +659,9 @@ impl Pager { } } -pub fn allocate_page( - page_id: usize, - buffer_pool: &Rc, - offset: usize, -) -> Rc> { - let page_ref = Rc::new(RefCell::new(Page::new(page_id))); +pub fn allocate_page(page_id: usize, buffer_pool: &Rc, offset: usize) -> PageRef { + let page = Arc::new(Page::new(page_id)); { - 
let mut page = RefCell::borrow_mut(&page_ref); let buffer = buffer_pool.get(); let bp = buffer_pool.clone(); let drop_fn = Rc::new(move |buf| { @@ -661,11 +669,36 @@ pub fn allocate_page( }); let buffer = Rc::new(RefCell::new(Buffer::new(buffer, drop_fn))); page.set_loaded(); - page.contents = Some(PageContent { + page.get().contents = Some(PageContent { offset, buffer, overflow_cells: Vec::new(), }); } - page_ref + page +} + +#[cfg(test)] +mod tests { + use std::sync::{Arc, RwLock}; + + use super::{DumbLruPageCache, Page}; + + #[test] + fn test_shared_cache() { + // ensure cache can be shared between threads + let cache = Arc::new(RwLock::new(DumbLruPageCache::new(10))); + + let thread = { + let cache = cache.clone(); + std::thread::spawn(move || { + let mut cache = cache.write().unwrap(); + cache.insert(1, Arc::new(Page::new(1))); + }) + }; + let _ = thread.join(); + let mut cache = cache.write().unwrap(); + let page = cache.get(&1); + assert_eq!(page.unwrap().get().id, 1); + } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 3f7935491..3d7b7ae11 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -45,13 +45,16 @@ use crate::error::LimboError; use crate::io::{Buffer, Completion, ReadCompletion, SyncCompletion, WriteCompletion}; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; -use crate::storage::pager::{Page, Pager}; +use crate::storage::pager::Pager; use crate::types::{OwnedRecord, OwnedValue}; use crate::{File, Result}; use log::trace; use std::cell::RefCell; use std::pin::Pin; use std::rc::Rc; +use std::sync::{Arc, RwLock}; + +use super::pager::PageRef; /// The size of the database header in bytes. 
pub const DATABASE_HEADER_SIZE: usize = 100; @@ -95,7 +98,7 @@ pub const WAL_FRAME_HEADER_SIZE: usize = 24; pub const WAL_MAGIC_LE: u32 = 0x377f0682; pub const WAL_MAGIC_BE: u32 = 0x377f0683; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] #[repr(C)] // This helps with encoding because rust does not respect the order in structs, so in // this case we want to keep the order pub struct WalHeader { @@ -531,7 +534,7 @@ impl PageContent { pub fn begin_read_page( page_io: Rc, buffer_pool: Rc, - page: Rc>, + page: PageRef, page_idx: usize, ) -> Result<()> { trace!("begin_read_btree_page(page_idx = {})", page_idx); @@ -544,7 +547,7 @@ pub fn begin_read_page( let complete = Box::new(move |buf: Rc>| { let page = page.clone(); if finish_read_page(page_idx, buf, page.clone()).is_err() { - page.borrow_mut().set_error(); + page.set_error(); } }); let c = Rc::new(Completion::Read(ReadCompletion::new(buf, complete))); @@ -552,11 +555,7 @@ pub fn begin_read_page( Ok(()) } -fn finish_read_page( - page_idx: usize, - buffer_ref: Rc>, - page: Rc>, -) -> Result<()> { +fn finish_read_page(page_idx: usize, buffer_ref: Rc>, page: PageRef) -> Result<()> { trace!("finish_read_btree_page(page_idx = {})", page_idx); let pos = if page_idx == 1 { DATABASE_HEADER_SIZE @@ -569,8 +568,7 @@ fn finish_read_page( overflow_cells: Vec::new(), }; { - let mut page = page.borrow_mut(); - page.contents.replace(inner); + page.get().contents.replace(inner); page.set_uptodate(); page.clear_locked(); page.set_loaded(); @@ -580,16 +578,16 @@ fn finish_read_page( pub fn begin_write_btree_page( pager: &Pager, - page: &Rc>, + page: &PageRef, write_counter: Rc>, ) -> Result<()> { let page_source = &pager.page_io; let page_finish = page.clone(); - let page_id = page.borrow().id; + let page_id = page.get().id; log::trace!("begin_write_btree_page(page_id={})", page_id); let buffer = { - let page = page.borrow(); + let page = page.get(); let contents = page.contents.as_ref().unwrap(); 
contents.buffer.clone() }; @@ -603,7 +601,7 @@ pub fn begin_write_btree_page( let buf_len = buf_copy.borrow().len(); *write_counter.borrow_mut() -= 1; - page_finish.borrow_mut().clear_dirty(); + page_finish.clear_dirty(); if bytes_written < buf_len as i32 { log::error!("wrote({bytes_written}) less than expected({buf_len})"); } @@ -770,7 +768,7 @@ fn read_payload(unread: &[u8], payload_size: usize, pager: Rc) -> (Vec) { payload.extend_from_slice(&varint); } -pub fn begin_read_wal_header(io: &Rc) -> Result>> { +pub fn begin_read_wal_header(io: &Rc) -> Result>> { let drop_fn = Rc::new(|_buf| {}); let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); - let result = Rc::new(RefCell::new(WalHeader::default())); + let result = Arc::new(RwLock::new(WalHeader::default())); let header = result.clone(); let complete = Box::new(move |buf: Rc>| { let header = header.clone(); @@ -1020,10 +1018,10 @@ pub fn begin_read_wal_header(io: &Rc) -> Result> Ok(result) } -fn finish_read_wal_header(buf: Rc>, header: Rc>) -> Result<()> { +fn finish_read_wal_header(buf: Rc>, header: Arc>) -> Result<()> { let buf = buf.borrow(); let buf = buf.as_slice(); - let mut header = header.borrow_mut(); + let mut header = header.write().unwrap(); header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]); header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]); @@ -1039,7 +1037,7 @@ pub fn begin_read_wal_frame( io: &Rc, offset: usize, buffer_pool: Rc, - page: Rc>, + page: PageRef, ) -> Result<()> { let buf = buffer_pool.get(); let drop_fn = Rc::new(move |buf| { @@ -1060,14 +1058,14 @@ pub fn begin_read_wal_frame( pub fn begin_write_wal_frame( io: &Rc, offset: usize, - page: &Rc>, + page: &PageRef, db_size: u32, write_counter: Rc>, wal_header: &WalHeader, checksums: (u32, u32), ) -> Result<(u32, u32)> { let page_finish = page.clone(); - let page_id = page.borrow().id; + let page_id = 
page.get().id; trace!("begin_write_wal_frame(offset={}, page={})", offset, page_id); let mut header = WalFrameHeader { @@ -1079,7 +1077,7 @@ pub fn begin_write_wal_frame( checksum_2: 0, }; let (buffer, checksums) = { - let page = page.borrow(); + let page = page.get(); let contents = page.contents.as_ref().unwrap(); let drop_fn = Rc::new(|_buf| {}); @@ -1122,7 +1120,7 @@ pub fn begin_write_wal_frame( let buf_len = buf_copy.borrow().len(); *write_counter.borrow_mut() -= 1; - page_finish.borrow_mut().clear_dirty(); + page_finish.clear_dirty(); if bytes_written < buf_len as i32 { log::error!("wrote({bytes_written}) less than expected({buf_len})"); } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 774e57cc9..3ae854879 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::sync::RwLock; use std::{cell::RefCell, rc::Rc, sync::Arc}; use log::{debug, trace}; @@ -8,12 +9,12 @@ use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; use crate::Completion; -use crate::{storage::pager::Page, Result}; +use crate::Result; use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE}; use super::buffer_pool::BufferPool; -use super::pager::Pager; +use super::pager::{PageRef, Pager}; use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader}; /// Write-ahead log (WAL). @@ -34,17 +35,12 @@ pub trait Wal { fn find_frame(&self, page_id: u64) -> Result>; /// Read a frame from the WAL. - fn read_frame( - &self, - frame_id: u64, - page: Rc>, - buffer_pool: Rc, - ) -> Result<()>; + fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc) -> Result<()>; /// Write a frame to the WAL. 
fn append_frame( &mut self, - page: Rc>, + page: PageRef, db_size: u32, pager: &Pager, write_counter: Rc>, @@ -61,22 +57,24 @@ pub trait Wal { pub struct WalFile { io: Arc, - wal_path: String, - file: RefCell>>, - wal_header: RefCell>>>, - min_frame: RefCell, - max_frame: RefCell, - nbackfills: RefCell, - // Maps pgno to frame id and offset in wal file - frame_cache: RefCell>>, // FIXME: for now let's use a simple hashmap instead of a shm file - checkpoint_threshold: usize, - ongoing_checkpoint: HashSet, syncing: Rc>, page_size: usize, - last_checksum: RefCell<(u32, u32)>, // Check of last frame in WAL, this is a cumulative checksum - // over all frames in the WAL + ongoing_checkpoint: HashSet, + shared: Arc>, + checkpoint_threshold: usize, +} + +pub struct WalFileShared { + wal_header: Arc>, + min_frame: u64, + max_frame: u64, + nbackfills: u64, + // Maps pgno to frame id and offset in wal file + frame_cache: HashMap>, // FIXME: for now let's use a simple hashmap instead of a shm file + last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL + file: Rc, } pub enum CheckpointStatus { @@ -87,7 +85,8 @@ pub enum CheckpointStatus { impl Wal for WalFile { /// Begin a read transaction. fn begin_read_tx(&self) -> Result<()> { - self.min_frame.replace(*self.nbackfills.borrow() + 1); + let mut shared = self.shared.write().unwrap(); + shared.min_frame = shared.nbackfills + 1; Ok(()) } @@ -98,15 +97,14 @@ impl Wal for WalFile { /// Find the latest frame containing a page. 
fn find_frame(&self, page_id: u64) -> Result> { - let frame_cache = self.frame_cache.borrow(); - let frames = frame_cache.get(&page_id); + let shared = self.shared.read().unwrap(); + let frames = shared.frame_cache.get(&page_id); if frames.is_none() { return Ok(None); } - self.ensure_init()?; let frames = frames.unwrap(); for frame in frames.iter().rev() { - if *frame <= *self.max_frame.borrow() { + if *frame <= shared.max_frame { return Ok(Some(*frame)); } } @@ -114,16 +112,12 @@ impl Wal for WalFile { } /// Read a frame from the WAL. - fn read_frame( - &self, - frame_id: u64, - page: Rc>, - buffer_pool: Rc, - ) -> Result<()> { + fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Rc) -> Result<()> { debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); + let shared = self.shared.read().unwrap(); begin_read_wal_frame( - self.file.borrow().as_ref().unwrap(), + &shared.file, offset + WAL_FRAME_HEADER_SIZE, buffer_pool, page, @@ -134,14 +128,14 @@ impl Wal for WalFile { /// Write a frame to the WAL. 
fn append_frame( &mut self, - page: Rc>, + page: PageRef, db_size: u32, _pager: &Pager, write_counter: Rc>, ) -> Result<()> { - self.ensure_init()?; - let page_id = page.borrow().id; - let frame_id = *self.max_frame.borrow(); + let page_id = page.get().id; + let mut shared = self.shared.write().unwrap(); + let frame_id = shared.max_frame; let offset = self.frame_offset(frame_id); trace!( "append_frame(frame={}, offset={}, page_id={})", @@ -149,12 +143,11 @@ impl Wal for WalFile { offset, page_id ); - let header = self.wal_header.borrow(); - let header = header.as_ref().unwrap(); - let header = header.borrow(); - let checksums = *self.last_checksum.borrow(); + let header = shared.wal_header.clone(); + let header = header.read().unwrap(); + let checksums = shared.last_checksum; let checksums = begin_write_wal_frame( - self.file.borrow().as_ref().unwrap(), + &shared.file, offset, &page, db_size, @@ -162,15 +155,14 @@ impl Wal for WalFile { &header, checksums, )?; - self.last_checksum.replace(checksums); - self.max_frame.replace(frame_id + 1); + shared.last_checksum = checksums; + shared.max_frame = frame_id + 1; { - let mut frame_cache = self.frame_cache.borrow_mut(); - let frames = frame_cache.get_mut(&(page_id as u64)); + let frames = shared.frame_cache.get_mut(&(page_id as u64)); match frames { Some(frames) => frames.push(frame_id), None => { - frame_cache.insert(page_id as u64, vec![frame_id]); + shared.frame_cache.insert(page_id as u64, vec![frame_id]); } } } @@ -188,7 +180,8 @@ impl Wal for WalFile { } fn should_checkpoint(&self) -> bool { - let frame_id = *self.max_frame.borrow() as usize; + let shared = self.shared.read().unwrap(); + let frame_id = shared.max_frame as usize; frame_id >= self.checkpoint_threshold } @@ -197,7 +190,8 @@ impl Wal for WalFile { pager: &Pager, write_counter: Rc>, ) -> Result { - for (page_id, _frames) in self.frame_cache.borrow().iter() { + let mut shared = self.shared.write().unwrap(); + for (page_id, _frames) in 
shared.frame_cache.iter() { // move page from WAL to database file // TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf let page_id = *page_id as usize; @@ -206,7 +200,7 @@ impl Wal for WalFile { } let page = pager.read_page(page_id)?; - if page.borrow().is_locked() { + if page.is_locked() { return Ok(CheckpointStatus::IO); } @@ -214,16 +208,15 @@ impl Wal for WalFile { self.ongoing_checkpoint.insert(page_id); } - self.frame_cache.borrow_mut().clear(); - *self.max_frame.borrow_mut() = 0; + // TODO: only clear checkpointed frames + shared.frame_cache.clear(); + shared.max_frame = 0; self.ongoing_checkpoint.clear(); Ok(CheckpointStatus::Done) } fn sync(&mut self) -> Result { - self.ensure_init()?; - let file = self.file.borrow(); - let file = file.as_ref().unwrap(); + let shared = self.shared.write().unwrap(); { let syncing = self.syncing.clone(); let completion = Completion::Sync(SyncCompletion { @@ -231,7 +224,7 @@ impl Wal for WalFile { *syncing.borrow_mut() = false; }), }); - file.sync(Rc::new(completion))?; + shared.file.sync(Rc::new(completion))?; } if *self.syncing.borrow() { @@ -243,89 +236,86 @@ impl Wal for WalFile { } impl WalFile { - pub fn new(io: Arc, wal_path: String, page_size: usize) -> Self { + pub fn new(io: Arc, page_size: usize, shared: Arc>) -> Self { Self { io, - wal_path, - file: RefCell::new(None), - wal_header: RefCell::new(None), - frame_cache: RefCell::new(HashMap::new()), - min_frame: RefCell::new(0), - max_frame: RefCell::new(0), - nbackfills: RefCell::new(0), - checkpoint_threshold: 1000, + shared, ongoing_checkpoint: HashSet::new(), syncing: Rc::new(RefCell::new(false)), + checkpoint_threshold: 1000, page_size, - last_checksum: RefCell::new((0, 0)), } } - fn ensure_init(&self) -> Result<()> { - if self.file.borrow().is_none() { - match self - .io - .open_file(&self.wal_path, crate::io::OpenFlags::Create, false) - { - Ok(file) => { - if file.size()? 
> 0 { - let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) { - Ok(header) => header, - Err(err) => panic!("Couldn't read header page: {:?}", err), - }; - // TODO: Return a completion instead. - self.io.run_once()?; - self.wal_header.replace(Some(wal_header)); - } else { - // magic is a single number represented as WAL_MAGIC_LE but the big endian - // counterpart is just the same number with LSB set to 1. - let magic = if cfg!(target_endian = "big") { - WAL_MAGIC_BE - } else { - WAL_MAGIC_LE - }; - let mut wal_header = WalHeader { - magic, - file_format: 3007000, - page_size: self.page_size as u32, - checkpoint_seq: 0, // TODO implement sequence number - salt_1: 0, // TODO implement salt - salt_2: 0, - checksum_1: 0, - checksum_2: 0, - }; - let native = cfg!(target_endian = "big"); // if target_endian is - // already big then we don't care but if isn't, header hasn't yet been - // encoded to big endian, therefore we wan't to swap bytes to compute this - // checksum. - let checksums = *self.last_checksum.borrow_mut(); - let checksums = checksum_wal( - &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes - &wal_header, - checksums, - native, // this is false because we haven't encoded the wal header yet - ); - wal_header.checksum_1 = checksums.0; - wal_header.checksum_2 = checksums.1; - self.last_checksum.replace(checksums); - sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; - self.wal_header - .replace(Some(Rc::new(RefCell::new(wal_header)))); - } - *self.file.borrow_mut() = Some(file); - } - Err(err) => panic!("{:?} {}", err, &self.wal_path), - }; - } - Ok(()) - } - fn frame_offset(&self, frame_id: u64) -> usize { - let header = self.wal_header.borrow(); - let header = header.as_ref().unwrap().borrow(); - let page_size = header.page_size; + let page_size = self.page_size; let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64); let offset = WAL_HEADER_SIZE as u64 + page_offset; offset as usize } } + 
+impl WalFileShared { + pub fn open_shared( + io: &Arc, + path: &str, + page_size: u16, + ) -> Result>> { + let file = io.open_file(path, crate::io::OpenFlags::Create, false)?; + let header = if file.size()? > 0 { + let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) { + Ok(header) => header, + Err(err) => panic!("Couldn't read header page: {:?}", err), + }; + log::info!("recover not implemented yet"); + // TODO: Return a completion instead. + io.run_once()?; + wal_header + } else { + let magic = if cfg!(target_endian = "big") { + WAL_MAGIC_BE + } else { + WAL_MAGIC_LE + }; + let mut wal_header = WalHeader { + magic, + file_format: 3007000, + page_size: page_size as u32, + checkpoint_seq: 0, // TODO implement sequence number + salt_1: 0, // TODO implement salt + salt_2: 0, + checksum_1: 0, + checksum_2: 0, + }; + let native = cfg!(target_endian = "big"); // if target_endian is + // already big then we don't care but if it isn't, header hasn't yet been + // encoded to big endian, therefore we want to swap bytes to compute this + // checksum.
+ let checksums = (0, 0); + let checksums = checksum_wal( + &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes + &wal_header, + checksums, + native, // this is false because we haven't encoded the wal header yet + ); + wal_header.checksum_1 = checksums.0; + wal_header.checksum_2 = checksums.1; + sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; + Arc::new(RwLock::new(wal_header)) + }; + let checksum = { + let checksum = header.read().unwrap(); + (checksum.checksum_1, checksum.checksum_2) + }; + let shared = WalFileShared { + wal_header: header, + min_frame: 0, + max_frame: 0, + nbackfills: 0, + frame_cache: HashMap::new(), + last_checksum: checksum, + file, + }; + Ok(Arc::new(RwLock::new(shared))) + } +} diff --git a/simulator/main.rs b/simulator/main.rs index 3c71bfef5..b7af9854e 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -13,7 +13,7 @@ struct SimulatorEnv { tables: Vec, connections: Vec, io: Arc, - db: Rc, + db: Arc, rng: ChaCha8Rng, } diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index ac37d8ea2..6a86a04fe 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -33,7 +33,7 @@ pub mod util; use util::sqlite3_safety_check_sick_or_ok; pub struct sqlite3 { - pub(crate) _db: Rc, + pub(crate) _db: Arc, pub(crate) conn: Rc, pub(crate) err_code: ffi::c_int, pub(crate) err_mask: ffi::c_int, @@ -43,7 +43,7 @@ pub struct sqlite3 { } impl sqlite3 { - pub fn new(db: Rc, conn: Rc) -> Self { + pub fn new(db: Arc, conn: Rc) -> Self { Self { _db: db, conn,