diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 3d3290426..a1febb5d7 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -1,4 +1,4 @@ -use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile}; +use limbo_core::{maybe_init_database_file, OpenFlags, Pager, Result, WalFile, WalFileShared}; use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; @@ -22,13 +22,21 @@ impl Database { maybe_init_database_file(&file, &io).unwrap(); let page_io = Rc::new(DatabaseStorage::new(file)); let db_header = Pager::begin_open(page_io.clone()).unwrap(); + + // ensure db header is there + io.run_once().unwrap(); + let wal_path = format!("{}-wal", path); + let wal_shared = + WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size) + .unwrap(); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), - wal_path, db_header.borrow().page_size as usize, + wal_shared.clone(), ))); - let db = limbo_core::Database::open(io, page_io, wal).unwrap(); + + let db = limbo_core::Database::open(io, page_io, wal, wal_shared).unwrap(); let conn = db.connect(); Database { db, conn } } diff --git a/core/lib.rs b/core/lib.rs index c883acc65..d5619cac4 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -28,8 +28,9 @@ use storage::btree::btree_init_page; #[cfg(feature = "fs")] use storage::database::FileStorage; use storage::pager::{allocate_page, DumbLruPageCache}; -use storage::sqlite3_ondisk::{DatabaseHeader, DATABASE_HEADER_SIZE}; +use storage::sqlite3_ondisk::{DatabaseHeader, WalHeader, DATABASE_HEADER_SIZE}; pub use storage::wal::WalFile; +pub use storage::wal::WalFileShared; use util::parse_schema_rows; use translate::optimizer::optimize_plan; @@ -64,30 +65,38 @@ pub struct Database { schema: Rc>, header: Rc>, transaction_state: RefCell, + // Shared structures of a Database are the parts that are common to multiple threads that might + // create DB connections. shared_page_cache: Arc>, + shared_wal: Arc>, } impl Database { #[cfg(feature = "fs")] pub fn open_file(io: Arc, path: &str) -> Result> { + use storage::wal::WalFileShared; + let file = io.open_file(path, io::OpenFlags::Create, true)?; maybe_init_database_file(&file, &io)?; let page_io = Rc::new(FileStorage::new(file)); let wal_path = format!("{}-wal", path); let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; + let wal_shared = + WalFileShared::open_shared(&io, wal_path.as_str(), db_header.borrow().page_size)?; let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), - wal_path, db_header.borrow().page_size as usize, + wal_shared.clone(), ))); - Self::open(io, page_io, wal) + Self::open(io, page_io, wal, wal_shared) } pub fn open( io: Arc, page_io: Rc, wal: Rc>, + shared_wal: Arc>, ) -> Result> { let db_header = Pager::begin_open(page_io.clone())?; io.run_once()?; @@ -122,6 +131,7 @@ impl Database { header, transaction_state: RefCell::new(TransactionState::None), shared_page_cache, + shared_wal, })) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 3f7935491..dd992615d 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -52,6 +52,7 @@ use log::trace; use std::cell::RefCell; use std::pin::Pin; use std::rc::Rc; +use std::sync::{Arc, RwLock}; /// The size of the database header in bytes. pub const DATABASE_HEADER_SIZE: usize = 100; @@ -95,7 +96,7 @@ pub const WAL_FRAME_HEADER_SIZE: usize = 24; pub const WAL_MAGIC_LE: u32 = 0x377f0682; pub const WAL_MAGIC_BE: u32 = 0x377f0683; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] #[repr(C)] // This helps with encoding because rust does not respect the order in structs, so in // this case we want to keep the order pub struct WalHeader { @@ -1006,10 +1007,10 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec) { payload.extend_from_slice(&varint); } -pub fn begin_read_wal_header(io: &Rc) -> Result>> { +pub fn begin_read_wal_header(io: &Rc) -> Result>> { let drop_fn = Rc::new(|_buf| {}); let buf = Rc::new(RefCell::new(Buffer::allocate(512, drop_fn))); - let result = Rc::new(RefCell::new(WalHeader::default())); + let result = Arc::new(RwLock::new(WalHeader::default())); let header = result.clone(); let complete = Box::new(move |buf: Rc>| { let header = header.clone(); @@ -1020,10 +1021,10 @@ pub fn begin_read_wal_header(io: &Rc) -> Result> Ok(result) } -fn finish_read_wal_header(buf: Rc>, header: Rc>) -> Result<()> { +fn finish_read_wal_header(buf: Rc>, header: Arc>) -> Result<()> { let buf = buf.borrow(); let buf = buf.as_slice(); - let mut header = header.borrow_mut(); + let mut header = header.write().unwrap(); header.magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]); header.file_format = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]); header.page_size = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index ad5e9eb40..68264ca35 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::sync::RwLock; use std::{cell::RefCell, rc::Rc, sync::Arc}; use log::{debug, trace}; @@ -7,8 +8,8 @@ use crate::io::{File, SyncCompletion, IO}; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; -use crate::Completion; use crate::{storage::pager::Page, Result}; +use crate::{Completion, OpenFlags}; use self::sqlite3_ondisk::{checksum_wal, WAL_MAGIC_BE, WAL_MAGIC_LE}; @@ -61,22 +62,24 @@ pub trait Wal { pub struct WalFile { io: Arc, - wal_path: String, - file: RefCell>>, - wal_header: RefCell>>>, - min_frame: RefCell, - max_frame: RefCell, - nbackfills: RefCell, - // Maps pgno to frame id and offset in wal file - frame_cache: RefCell>>, // FIXME: for now let's use a simple hashmap instead of a shm file - checkpoint_threshold: usize, - ongoing_checkpoint: HashSet, syncing: Rc>, page_size: usize, - last_checksum: RefCell<(u32, u32)>, // Check of last frame in WAL, this is a cumulative checksum - // over all frames in the WAL + ongoing_checkpoint: HashSet, + shared: Arc>, + checkpoint_threshold: usize, +} + +pub struct WalFileShared { + wal_header: Arc>, + min_frame: u64, + max_frame: u64, + nbackfills: u64, + // Maps pgno to frame id and offset in wal file + frame_cache: HashMap>, // FIXME: for now let's use a simple hashmap instead of a shm file + last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL + file: Rc, } pub enum CheckpointStatus { @@ -87,7 +90,8 @@ pub enum CheckpointStatus { impl Wal for WalFile { /// Begin a read transaction. fn begin_read_tx(&self) -> Result<()> { - self.min_frame.replace(*self.nbackfills.borrow() + 1); + let mut shared = self.shared.write().unwrap(); + shared.min_frame = shared.nbackfills + 1; Ok(()) } @@ -98,15 +102,14 @@ impl Wal for WalFile { /// Find the latest frame containing a page. fn find_frame(&self, page_id: u64) -> Result> { - let frame_cache = self.frame_cache.borrow(); - let frames = frame_cache.get(&page_id); + let shared = self.shared.read().unwrap(); + let frames = shared.frame_cache.get(&page_id); if frames.is_none() { return Ok(None); } - self.ensure_init()?; let frames = frames.unwrap(); for frame in frames.iter().rev() { - if *frame <= *self.max_frame.borrow() { + if *frame <= shared.max_frame { return Ok(Some(*frame)); } } @@ -122,8 +125,9 @@ impl Wal for WalFile { ) -> Result<()> { debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); + let shared = self.shared.read().unwrap(); begin_read_wal_frame( - self.file.borrow().as_ref().unwrap(), + &shared.file, offset + WAL_FRAME_HEADER_SIZE, buffer_pool, page, @@ -139,9 +143,9 @@ impl Wal for WalFile { _pager: &Pager, write_counter: Rc>, ) -> Result<()> { - self.ensure_init()?; let page_id = page.borrow().id; - let frame_id = *self.max_frame.borrow(); + let mut shared = self.shared.write().unwrap(); + let frame_id = shared.max_frame; let offset = self.frame_offset(frame_id); trace!( "append_frame(frame={}, offset={}, page_id={})", @@ -149,12 +153,11 @@ impl Wal for WalFile { offset, page_id ); - let header = self.wal_header.borrow(); - let header = header.as_ref().unwrap(); - let header = header.borrow(); - let checksums = *self.last_checksum.borrow(); + let header = shared.wal_header.clone(); + let header = header.read().unwrap(); + let checksums = shared.last_checksum; let checksums = begin_write_wal_frame( - self.file.borrow().as_ref().unwrap(), + &shared.file, offset, &page, db_size, @@ -162,15 +165,14 @@ impl Wal for WalFile { &header, checksums, )?; - self.last_checksum.replace(checksums); - self.max_frame.replace(frame_id + 1); + shared.last_checksum = checksums; + shared.max_frame = frame_id + 1; { - let mut frame_cache = self.frame_cache.borrow_mut(); - let frames = frame_cache.get_mut(&(page_id as u64)); + let frames = shared.frame_cache.get_mut(&(page_id as u64)); match frames { Some(frames) => frames.push(frame_id), None => { - frame_cache.insert(page_id as u64, vec![frame_id]); + shared.frame_cache.insert(page_id as u64, vec![frame_id]); } } } @@ -188,7 +190,8 @@ impl Wal for WalFile { } fn should_checkpoint(&self) -> bool { - let frame_id = *self.max_frame.borrow() as usize; + let shared = self.shared.read().unwrap(); + let frame_id = shared.max_frame as usize; frame_id >= self.checkpoint_threshold } @@ -197,7 +200,8 @@ impl Wal for WalFile { pager: &Pager, write_counter: Rc>, ) -> Result { - for (page_id, _frames) in self.frame_cache.borrow().iter() { + let mut shared = self.shared.write().unwrap(); + for (page_id, _frames) in shared.frame_cache.iter() { // move page from WAL to database file // TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf let page_id = *page_id as usize; @@ -214,16 +218,15 @@ impl Wal for WalFile { self.ongoing_checkpoint.insert(page_id); } - self.frame_cache.borrow_mut().clear(); - *self.max_frame.borrow_mut() = 0; + // TODO: only clear checkpointed frames + shared.frame_cache.clear(); + shared.max_frame = 0; self.ongoing_checkpoint.clear(); Ok(CheckpointStatus::Done) } fn sync(&mut self) -> Result { - self.ensure_init()?; - let file = self.file.borrow(); - let file = file.as_ref().unwrap(); + let shared = self.shared.write().unwrap(); { let syncing = self.syncing.clone(); let completion = Completion::Sync(SyncCompletion { @@ -231,7 +234,7 @@ impl Wal for WalFile { *syncing.borrow_mut() = false; }), }); - file.sync(Rc::new(completion))?; + shared.file.sync(Rc::new(completion))?; } if *self.syncing.borrow() { @@ -243,87 +246,86 @@ impl Wal for WalFile { } impl WalFile { - pub fn new(io: Arc, wal_path: String, page_size: usize) -> Self { + pub fn new(io: Arc, page_size: usize, shared: Arc>) -> Self { Self { io, - wal_path, - file: RefCell::new(None), - wal_header: RefCell::new(None), - frame_cache: RefCell::new(HashMap::new()), - min_frame: RefCell::new(0), - max_frame: RefCell::new(0), - nbackfills: RefCell::new(0), - checkpoint_threshold: 1000, + shared, ongoing_checkpoint: HashSet::new(), syncing: Rc::new(RefCell::new(false)), + checkpoint_threshold: 1000, page_size, - last_checksum: RefCell::new((0, 0)), } } - fn ensure_init(&self) -> Result<()> { - if self.file.borrow().is_none() { - match self - .io - .open_file(&self.wal_path, crate::io::OpenFlags::Create, false) - { - Ok(file) => { - if file.size()? > 0 { - let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) { - Ok(header) => header, - Err(err) => panic!("Couldn't read header page: {:?}", err), - }; - // TODO: Return a completion instead. - self.io.run_once()?; - self.wal_header.replace(Some(wal_header)); - } else { - let magic = if cfg!(target_endian = "big") { - WAL_MAGIC_BE - } else { - WAL_MAGIC_LE - }; - let mut wal_header = WalHeader { - magic, - file_format: 3007000, - page_size: self.page_size as u32, - checkpoint_seq: 0, // TODO implement sequence number - salt_1: 0, // TODO implement salt - salt_2: 0, - checksum_1: 0, - checksum_2: 0, - }; - let native = cfg!(target_endian = "big"); // if target_endian is - // already big then we don't care but if isn't, header hasn't yet been - // encoded to big endian, therefore we wan't to swap bytes to compute this - // checksum. - let checksums = *self.last_checksum.borrow_mut(); - let checksums = checksum_wal( - &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes - &wal_header, - checksums, - native, // this is false because we haven't encoded the wal header yet - ); - wal_header.checksum_1 = checksums.0; - wal_header.checksum_2 = checksums.1; - self.last_checksum.replace(checksums); - sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; - self.wal_header - .replace(Some(Rc::new(RefCell::new(wal_header)))); - } - *self.file.borrow_mut() = Some(file); - } - Err(err) => panic!("{:?} {}", err, &self.wal_path), - }; - } - Ok(()) - } - fn frame_offset(&self, frame_id: u64) -> usize { - let header = self.wal_header.borrow(); - let header = header.as_ref().unwrap().borrow(); - let page_size = header.page_size; + let page_size = self.page_size; let page_offset = frame_id * (page_size as u64 + WAL_FRAME_HEADER_SIZE as u64); let offset = WAL_HEADER_SIZE as u64 + page_offset; offset as usize } } + +impl WalFileShared { + pub fn open_shared( + io: &Arc, + path: &str, + page_size: u16, + ) -> Result>> { + let file = io.open_file(path, crate::io::OpenFlags::Create, false)?; + let header = if file.size()? > 0 { + let wal_header = match sqlite3_ondisk::begin_read_wal_header(&file) { + Ok(header) => header, + Err(err) => panic!("Couldn't read header page: {:?}", err), + }; + log::info!("recover not implemented yet"); + // TODO: Return a completion instead. + io.run_once()?; + wal_header + } else { + let magic = if cfg!(target_endian = "big") { + WAL_MAGIC_BE + } else { + WAL_MAGIC_LE + }; + let mut wal_header = WalHeader { + magic, + file_format: 3007000, + page_size: page_size as u32, + checkpoint_seq: 0, // TODO implement sequence number + salt_1: 0, // TODO implement salt + salt_2: 0, + checksum_1: 0, + checksum_2: 0, + }; + let native = cfg!(target_endian = "big"); // if target_endian is + // already big then we don't care but if isn't, header hasn't yet been + // encoded to big endian, therefore we wan't to swap bytes to compute this + // checksum. + let checksums = (0, 0); + let checksums = checksum_wal( + &wal_header.as_bytes()[..WAL_HEADER_SIZE - 2 * 4], // first 24 bytes + &wal_header, + checksums, + native, // this is false because we haven't encoded the wal header yet + ); + wal_header.checksum_1 = checksums.0; + wal_header.checksum_2 = checksums.1; + sqlite3_ondisk::begin_write_wal_header(&file, &wal_header)?; + Arc::new(RwLock::new(wal_header)) + }; + let checksum = { + let checksum = header.read().unwrap(); + (checksum.checksum_1, checksum.checksum_2) + }; + let shared = WalFileShared { + wal_header: header, + min_frame: 0, + max_frame: 0, + nbackfills: 0, + frame_cache: HashMap::new(), + last_checksum: checksum, + file, + }; + Ok(Arc::new(RwLock::new(shared))) + } +}