diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 77ad9e6cb..6527a0da3 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -1,4 +1,4 @@ -use limbo_core::{Result, IO}; +use limbo_core::{OpenFlags, Result, IO}; use std::rc::Rc; use std::sync::Arc; use wasm_bindgen::prelude::*; @@ -14,7 +14,7 @@ impl Database { #[wasm_bindgen(constructor)] pub fn new(path: &str) -> Database { let io = Arc::new(PlatformIO { vfs: VFS::new() }); - let file = io.open_file(path).unwrap(); + let file = io.open_file(path, limbo_core::OpenFlags::None).unwrap(); let page_io = Rc::new(DatabaseStorage::new(file)); let wal = Rc::new(Wal {}); let inner = limbo_core::Database::open(io, page_io, wal).unwrap(); @@ -78,7 +78,7 @@ pub struct PlatformIO { } impl limbo_core::IO for PlatformIO { - fn open_file(&self, path: &str) -> Result> { + fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { let fd = self.vfs.open(path); Ok(Rc::new(File { vfs: VFS::new(), diff --git a/core/io/common.rs b/core/io/common.rs index a29c6b7ab..6627e2076 100644 --- a/core/io/common.rs +++ b/core/io/common.rs @@ -13,7 +13,7 @@ pub mod tests { // Parent process opens the file let io1 = create_io().expect("Failed to create IO"); let _file1 = io1 - .open_file(&path) + .open_file(&path, crate::io::OpenFlags::None) .expect("Failed to open file in parent process"); let current_exe = std::env::current_exe().expect("Failed to get current executable path"); @@ -38,7 +38,7 @@ pub mod tests { if std::env::var("RUST_TEST_CHILD_PROCESS").is_ok() { let path = std::env::var("RUST_TEST_FILE_PATH")?; let io = create_io()?; - match io.open_file(&path) { + match io.open_file(&path, crate::io::OpenFlags::None) { Ok(_) => std::process::exit(0), Err(_) => std::process::exit(1), } diff --git a/core/io/darwin.rs b/core/io/darwin.rs index 141a16541..01be41d4c 100644 --- a/core/io/darwin.rs +++ b/core/io/darwin.rs @@ -2,7 +2,7 @@ use crate::error::LimboError; use crate::io::common; use crate::Result; -use super::{Completion, File, IO}; +use super::{Completion, File, OpenFlags, IO}; use libc::{c_short, fcntl, flock, F_SETLK}; use log::trace; use polling::{Event, Events, Poller}; @@ -31,12 +31,13 @@ impl DarwinIO { } impl IO for DarwinIO { - fn open_file(&self, path: &str) -> Result> { + fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { trace!("open_file(path = {})", path); let file = std::fs::File::options() .read(true) .custom_flags(libc::O_NONBLOCK) .write(true) + .create(matches!(flags, OpenFlags::Create)) .open(path)?; let darwin_file = Rc::new(DarwinFile { diff --git a/core/io/linux.rs b/core/io/linux.rs index 3542e2870..4a4187e41 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -1,4 +1,4 @@ -use super::{common, Completion, File, IO}; +use super::{common, Completion, File, OpenFlags, IO}; use crate::{LimboError, Result}; use libc::{c_short, fcntl, flock, iovec, F_SETLK}; use log::{debug, trace}; @@ -102,9 +102,13 @@ impl WrappedIOUring { } impl IO for LinuxIO { - fn open_file(&self, path: &str) -> Result> { + fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { trace!("open_file(path = {})", path); - let file = std::fs::File::options().read(true).write(true).open(path)?; + let file = std::fs::File::options() + .read(true) + .write(true) + .create(matches!(flags, OpenFlags::Create)) + .open(path)?; // Let's attempt to enable direct I/O. Not all filesystems support it // so ignore any errors. let fd = file.as_raw_fd(); diff --git a/core/io/mod.rs b/core/io/mod.rs index ea4006f92..7de057769 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -16,8 +16,13 @@ pub trait File { fn pwrite(&self, pos: usize, buffer: Rc>, c: Rc) -> Result<()>; } +pub enum OpenFlags { + None, + Create, +} + pub trait IO { - fn open_file(&self, path: &str) -> Result>; + fn open_file(&self, path: &str, flags: OpenFlags) -> Result>; fn run_once(&self) -> Result<()>; diff --git a/core/io/windows.rs b/core/io/windows.rs index 2916f6719..59f40aa1c 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -1,4 +1,4 @@ -use crate::{Completion, File, Result, WriteCompletion, IO}; +use crate::{Completion, File, OpenFlags, Result, WriteCompletion, IO}; use log::trace; use std::cell::RefCell; use std::io::{Read, Seek, Write}; @@ -13,9 +13,13 @@ impl WindowsIO { } impl IO for WindowsIO { - fn open_file(&self, path: &str) -> Result> { + fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { trace!("open_file(path = {})", path); - let file = std::fs::File::open(path)?; + let file = std::fs::File::options() + .read(true) + .write(true) + .create(matches!(flags, OpenFlags::Create)) + .open(path)?; Ok(Rc::new(WindowsFile { file: RefCell::new(file), })) diff --git a/core/lib.rs b/core/lib.rs index 0581c4dd6..c4a332a6d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -20,7 +20,6 @@ use schema::Schema; use sqlite3_parser::ast; use sqlite3_parser::{ast::Cmd, lexer::sql::Parser}; use std::rc::Weak; -use std::sync::Arc; use std::sync::{Arc, OnceLock}; use std::{cell::RefCell, rc::Rc}; #[cfg(feature = "fs")] @@ -36,6 +35,7 @@ use translate::planner::prepare_select_plan; pub use error::LimboError; pub type Result = std::result::Result; +pub use io::OpenFlags; #[cfg(feature = "fs")] pub use io::PlatformIO; pub use io::{Buffer, Completion, File, WriteCompletion, IO}; @@ -63,17 +63,17 @@ pub struct Database { impl Database { #[cfg(feature = "fs")] pub fn open_file(io: Arc, path: &str) -> Result> { - let file = io.open_file(path)?; + let file = io.open_file(path, io::OpenFlags::None)?; let page_io = Rc::new(FileStorage::new(file)); let wal_path = format!("{}-wal", path); - let wal = Rc::new(WalFile::new(io.clone(), wal_path)); + let wal = Rc::new(RefCell::new(WalFile::new(io.clone(), wal_path))); Self::open(io, page_io, wal) } pub fn open( io: Arc, page_io: Rc, - wal: Rc, + wal: Rc>, ) -> Result> { let db_header = Pager::begin_open(page_io.clone())?; DATABASE_VERSION.get_or_init(|| { @@ -271,6 +271,11 @@ impl Connection { self.pager.cacheflush()?; Ok(()) } + + pub fn clear_page_cache(&self) -> Result<()> { + self.pager.clear_page_cache(); + Ok(()) + } } pub struct Statement { diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 025ce0e24..a62710905 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -13,6 +13,8 @@ use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; +use super::wal::CheckpointStatus; + pub struct Page { pub flags: AtomicUsize, pub contents: RwLock>, @@ -230,6 +232,13 @@ impl DumbLruPageCache { } self.detach(tail); } + + fn clear(&mut self) { + let to_remove: Vec = self.map.borrow().iter().map(|v| *v.0).collect(); + for key in to_remove { + self.delete(key); + } + } } #[allow(dead_code)] @@ -263,7 +272,7 @@ pub struct Pager { /// Source of the database pages. pub page_io: Rc, /// The write-ahead log (WAL) for the database. - wal: Rc, + wal: Rc>, /// A page cache for the database. page_cache: RefCell, /// Buffer pool for temporary data storage. @@ -284,7 +293,7 @@ impl Pager { pub fn finish_open( db_header_ref: Rc>, page_io: Rc, - wal: Rc, + wal: Rc>, io: Arc, ) -> Result { let db_header = RefCell::borrow(&db_header_ref); @@ -303,18 +312,19 @@ impl Pager { } pub fn begin_read_tx(&self) -> Result<()> { - self.wal.begin_read_tx()?; + self.wal.borrow().begin_read_tx()?; Ok(()) } pub fn begin_write_tx(&self) -> Result<()> { - self.wal.begin_read_tx()?; + self.wal.borrow().begin_read_tx()?; Ok(()) } - pub fn end_tx(&self) -> Result<()> { - self.wal.end_read_tx()?; - Ok(()) + pub fn end_tx(&self) -> Result { + self.cacheflush()?; + self.wal.borrow().end_read_tx()?; + Ok(CheckpointStatus::Done) } /// Reads a page from the database. @@ -326,9 +336,10 @@ impl Pager { } let page = Rc::new(RefCell::new(Page::new(page_idx))); RefCell::borrow(&page).set_locked(); - if let Some(frame_id) = self.wal.find_frame(page_idx as u64)? { + if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? { dbg!(frame_id); self.wal + .borrow() .read_frame(frame_id, page.clone(), self.buffer_pool.clone())?; { let page = page.borrow_mut(); @@ -363,20 +374,37 @@ impl Pager { dirty_pages.insert(page_id); } - pub fn cacheflush(&self) -> Result<()> { - let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages); - if dirty_pages.len() == 0 { - return Ok(()); - } + pub fn cacheflush(&self) -> Result { let db_size = self.db_header.borrow().database_size; - for page_id in dirty_pages.iter() { + for page_id in self.dirty_pages.borrow().iter() { let mut cache = self.page_cache.borrow_mut(); let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - self.wal.append_frame(page.clone(), db_size, self)?; + self.wal + .borrow_mut() + .append_frame(page.clone(), db_size, self)?; } - dirty_pages.clear(); - self.io.run_once()?; - Ok(()) + // remove before checkpoint so we can retry cacheflush if needed + self.dirty_pages.borrow_mut().clear(); + + let should_checkpoint = self.wal.borrow().should_checkpoint(); + if should_checkpoint { + self.wal.borrow_mut().checkpoint(self)?; + } + Ok(CheckpointStatus::Done) + } + + // WARN: used for testing purposes + pub fn clear_page_cache(&self) { + loop { + match self.wal.borrow_mut().checkpoint(self) { + Ok(CheckpointStatus::IO) => {} + Ok(CheckpointStatus::Done) => { + break; + } + Err(err) => panic!("error while clearing cache {}", err), + } + } + self.page_cache.borrow_mut().clear(); } /* diff --git a/core/storage/wal.rs b/core/storage/wal.rs index eb77c204a..9acef6f83 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::{cell::RefCell, rc::Rc, sync::Arc}; use crate::io::{File, IO}; @@ -38,14 +38,10 @@ pub trait Wal { ) -> Result<()>; /// Write a frame to the WAL. - fn append_frame( - &self, - page: Rc>, - db_size: u32, - pager: &Pager, - ) -> Result; + fn append_frame(&mut self, page: Rc>, db_size: u32, pager: &Pager) -> Result<()>; - fn checkpoint(&self, pager: &Pager) -> Result; + fn should_checkpoint(&self) -> bool; + fn checkpoint(&mut self, pager: &Pager) -> Result; } #[cfg(feature = "fs")] @@ -60,9 +56,10 @@ pub struct WalFile { // Maps pgno to frame id and offset in wal file frame_cache: RefCell>>, // FIXME: for now let's use a simple hashmap instead of a shm file checkpoint_threshold: usize, + ongoing_checkpoint: HashSet, } -enum CheckpointStatus { +pub enum CheckpointStatus { Done, IO, } @@ -83,9 +80,7 @@ impl Wal for WalFile { /// Find the latest frame containing a page. fn find_frame(&self, page_id: u64) -> Result> { let frame_cache = self.frame_cache.borrow(); - dbg!(&frame_cache); let frames = frame_cache.get(&page_id); - dbg!(&frames); if frames.is_none() { return Ok(None); } @@ -106,7 +101,6 @@ impl Wal for WalFile { page: Rc>, buffer_pool: Rc, ) -> Result<()> { - println!("read frame {}", frame_id); let offset = self.frame_offset(frame_id); begin_read_wal_frame( self.file.borrow().as_ref().unwrap(), @@ -118,26 +112,23 @@ impl Wal for WalFile { } /// Write a frame to the WAL. - fn append_frame(&self, page: Rc>, db_size: u32, pager: &Pager) -> Result<()> { + fn append_frame(&mut self, page: Rc>, db_size: u32, pager: &Pager) -> Result<()> { self.ensure_init()?; let page_id = page.borrow().id; let frame_id = *self.max_frame.borrow(); let offset = self.frame_offset(frame_id); - println!("appending {} at {}", frame_id, offset); begin_write_wal_frame(self.file.borrow().as_ref().unwrap(), offset, &page, db_size)?; self.max_frame.replace(frame_id + 1); - let mut frame_cache = self.frame_cache.borrow_mut(); - let frames = frame_cache.get_mut(&(page_id as u64)); - match frames { - Some(frames) => frames.push(frame_id), - None => { - frame_cache.insert(page_id as u64, vec![frame_id]); + { + let mut frame_cache = self.frame_cache.borrow_mut(); + let frames = frame_cache.get_mut(&(page_id as u64)); + match frames { + Some(frames) => frames.push(frame_id), + None => { + frame_cache.insert(page_id as u64, vec![frame_id]); + } } } - dbg!(&frame_cache); - if (frame_id + 1) as usize >= self.checkpoint_threshold { - self.checkpoint(pager); - } Ok(()) } @@ -151,16 +142,31 @@ impl Wal for WalFile { Ok(()) } - fn checkpoint(&self, pager: &Pager) -> Result { - for (page_id, frames) in self.frame_cache.borrow().iter() { + fn should_checkpoint(&self) -> bool { + let frame_id = *self.max_frame.borrow() as usize; + if frame_id < self.checkpoint_threshold { + true + } else { + false + } + } + + fn checkpoint(&mut self, pager: &Pager) -> Result { + for (page_id, _frames) in self.frame_cache.borrow().iter() { // move page from WAL to database file // TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf - let page = pager.read_page(*page_id as usize)?; + let page_id = *page_id as usize; + let page = pager.read_page(page_id)?; if page.borrow().is_locked() { return Ok(CheckpointStatus::IO); } + pager.put_page(page_id, page); + self.ongoing_checkpoint.insert(page_id); } - Ok(()) + self.frame_cache.borrow_mut().clear(); + *self.max_frame.borrow_mut() = 0; + self.ongoing_checkpoint.clear(); + Ok(CheckpointStatus::Done) } } @@ -177,14 +183,16 @@ impl WalFile { max_frame: RefCell::new(0), nbackfills: RefCell::new(0), checkpoint_threshold: 1000, + ongoing_checkpoint: HashSet::new(), } } fn ensure_init(&self) -> Result<()> { - println!("ensure"); if self.file.borrow().is_none() { - println!("inside ensure"); - match self.io.open_file(&self.wal_path) { + match self + .io + .open_file(&self.wal_path, crate::io::OpenFlags::Create) + { Ok(file) => { *self.file.borrow_mut() = Some(file.clone()); let wal_header = match sqlite3_ondisk::begin_read_wal_header(file) { @@ -194,9 +202,8 @@ impl WalFile { // TODO: Return a completion instead. self.io.run_once()?; self.wal_header.replace(Some(wal_header)); - dbg!(&self.wal_header); } - Err(err) => panic!("{:?}", err), + Err(err) => panic!("{:?} {}", err, &self.wal_path), }; } Ok(()) diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 71d4b6f54..0c292ff85 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -560,7 +560,6 @@ impl Program { state: &'a mut ProgramState, pager: Rc, ) -> Result> { - dbg!(&self.connection.upgrade().is_none()); loop { let insn = &self.insns[state.pc as usize]; trace_insn(self, state.pc as InsnReference, insn); @@ -1100,9 +1099,14 @@ impl Program { } } if self.auto_commit { - pager.end_tx()?; + return match pager.end_tx() { + Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO), + Ok(crate::storage::wal::CheckpointStatus::Done) => Ok(StepResult::Done), + Err(e) => Err(e), + }; + } else { + return Ok(StepResult::Done); } - return Ok(StepResult::Done); } Insn::Transaction { write } => { let connection = self.connection.upgrade().unwrap(); diff --git a/simulator/main.rs b/simulator/main.rs index 386c9c0dc..d9ebe320e 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -91,7 +91,7 @@ impl SimulatorIO { } impl IO for SimulatorIO { - fn open_file(&self, path: &str) -> Result> { + fn open_file(&self, path: &str, flags: OpenFlags) -> Result> { let inner = self.inner.open_file(path)?; let file = Rc::new(SimulatorFile { inner,