suspendable checkpoint

This commit is contained in:
Pere Diaz Bou
2024-09-27 13:01:16 +02:00
parent c0e51c4ca6
commit f009eb35c6
11 changed files with 131 additions and 73 deletions

View File

@@ -1,4 +1,4 @@
use limbo_core::{Result, IO};
use limbo_core::{OpenFlags, Result, IO};
use std::rc::Rc;
use std::sync::Arc;
use wasm_bindgen::prelude::*;
@@ -14,7 +14,7 @@ impl Database {
#[wasm_bindgen(constructor)]
pub fn new(path: &str) -> Database {
let io = Arc::new(PlatformIO { vfs: VFS::new() });
let file = io.open_file(path).unwrap();
let file = io.open_file(path, limbo_core::OpenFlags::None).unwrap();
let page_io = Rc::new(DatabaseStorage::new(file));
let wal = Rc::new(Wal {});
let inner = limbo_core::Database::open(io, page_io, wal).unwrap();
@@ -78,7 +78,7 @@ pub struct PlatformIO {
}
impl limbo_core::IO for PlatformIO {
fn open_file(&self, path: &str) -> Result<Rc<dyn limbo_core::File>> {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn limbo_core::File>> {
let fd = self.vfs.open(path);
Ok(Rc::new(File {
vfs: VFS::new(),

View File

@@ -13,7 +13,7 @@ pub mod tests {
// Parent process opens the file
let io1 = create_io().expect("Failed to create IO");
let _file1 = io1
.open_file(&path)
.open_file(&path, crate::io::OpenFlags::None)
.expect("Failed to open file in parent process");
let current_exe = std::env::current_exe().expect("Failed to get current executable path");
@@ -38,7 +38,7 @@ pub mod tests {
if std::env::var("RUST_TEST_CHILD_PROCESS").is_ok() {
let path = std::env::var("RUST_TEST_FILE_PATH")?;
let io = create_io()?;
match io.open_file(&path) {
match io.open_file(&path, crate::io::OpenFlags::None) {
Ok(_) => std::process::exit(0),
Err(_) => std::process::exit(1),
}

View File

@@ -2,7 +2,7 @@ use crate::error::LimboError;
use crate::io::common;
use crate::Result;
use super::{Completion, File, IO};
use super::{Completion, File, OpenFlags, IO};
use libc::{c_short, fcntl, flock, F_SETLK};
use log::trace;
use polling::{Event, Events, Poller};
@@ -31,12 +31,13 @@ impl DarwinIO {
}
impl IO for DarwinIO {
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.write(true)
.create(matches!(flags, OpenFlags::Create))
.open(path)?;
let darwin_file = Rc::new(DarwinFile {

View File

@@ -1,4 +1,4 @@
use super::{common, Completion, File, IO};
use super::{common, Completion, File, OpenFlags, IO};
use crate::{LimboError, Result};
use libc::{c_short, fcntl, flock, iovec, F_SETLK};
use log::{debug, trace};
@@ -102,9 +102,13 @@ impl WrappedIOUring {
}
impl IO for LinuxIO {
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options().read(true).write(true).open(path)?;
let file = std::fs::File::options()
.read(true)
.write(true)
.create(matches!(flags, OpenFlags::Create))
.open(path)?;
// Let's attempt to enable direct I/O. Not all filesystems support it
// so ignore any errors.
let fd = file.as_raw_fd();

View File

@@ -16,8 +16,13 @@ pub trait File {
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<Completion>) -> Result<()>;
}
pub enum OpenFlags {
None,
Create,
}
pub trait IO {
fn open_file(&self, path: &str) -> Result<Rc<dyn File>>;
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>>;
fn run_once(&self) -> Result<()>;

View File

@@ -1,4 +1,4 @@
use crate::{Completion, File, Result, WriteCompletion, IO};
use crate::{Completion, File, OpenFlags, Result, WriteCompletion, IO};
use log::trace;
use std::cell::RefCell;
use std::io::{Read, Seek, Write};
@@ -13,9 +13,13 @@ impl WindowsIO {
}
impl IO for WindowsIO {
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::open(path)?;
let file = std::fs::File::options()
.read(true)
.write(true)
.create(matches!(flags, OpenFlags::Create))
.open(path)?;
Ok(Rc::new(WindowsFile {
file: RefCell::new(file),
}))

View File

@@ -20,7 +20,6 @@ use schema::Schema;
use sqlite3_parser::ast;
use sqlite3_parser::{ast::Cmd, lexer::sql::Parser};
use std::rc::Weak;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use std::{cell::RefCell, rc::Rc};
#[cfg(feature = "fs")]
@@ -36,6 +35,7 @@ use translate::planner::prepare_select_plan;
pub use error::LimboError;
pub type Result<T> = std::result::Result<T, error::LimboError>;
pub use io::OpenFlags;
#[cfg(feature = "fs")]
pub use io::PlatformIO;
pub use io::{Buffer, Completion, File, WriteCompletion, IO};
@@ -63,17 +63,17 @@ pub struct Database {
impl Database {
#[cfg(feature = "fs")]
pub fn open_file(io: Arc<dyn IO>, path: &str) -> Result<Rc<Database>> {
let file = io.open_file(path)?;
let file = io.open_file(path, io::OpenFlags::None)?;
let page_io = Rc::new(FileStorage::new(file));
let wal_path = format!("{}-wal", path);
let wal = Rc::new(WalFile::new(io.clone(), wal_path));
let wal = Rc::new(RefCell::new(WalFile::new(io.clone(), wal_path)));
Self::open(io, page_io, wal)
}
pub fn open(
io: Arc<dyn IO>,
page_io: Rc<dyn DatabaseStorage>,
wal: Rc<dyn Wal>,
wal: Rc<RefCell<dyn Wal>>,
) -> Result<Rc<Database>> {
let db_header = Pager::begin_open(page_io.clone())?;
DATABASE_VERSION.get_or_init(|| {
@@ -271,6 +271,11 @@ impl Connection {
self.pager.cacheflush()?;
Ok(())
}
pub fn clear_page_cache(&self) -> Result<()> {
self.pager.clear_page_cache();
Ok(())
}
}
pub struct Statement {

View File

@@ -13,6 +13,8 @@ use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use super::wal::CheckpointStatus;
pub struct Page {
pub flags: AtomicUsize,
pub contents: RwLock<Option<PageContent>>,
@@ -230,6 +232,13 @@ impl DumbLruPageCache {
}
self.detach(tail);
}
fn clear(&mut self) {
let to_remove: Vec<usize> = self.map.borrow().iter().map(|v| *v.0).collect();
for key in to_remove {
self.delete(key);
}
}
}
#[allow(dead_code)]
@@ -263,7 +272,7 @@ pub struct Pager {
/// Source of the database pages.
pub page_io: Rc<dyn DatabaseStorage>,
/// The write-ahead log (WAL) for the database.
wal: Rc<dyn Wal>,
wal: Rc<RefCell<dyn Wal>>,
/// A page cache for the database.
page_cache: RefCell<DumbLruPageCache>,
/// Buffer pool for temporary data storage.
@@ -284,7 +293,7 @@ impl Pager {
pub fn finish_open(
db_header_ref: Rc<RefCell<DatabaseHeader>>,
page_io: Rc<dyn DatabaseStorage>,
wal: Rc<dyn Wal>,
wal: Rc<RefCell<dyn Wal>>,
io: Arc<dyn crate::io::IO>,
) -> Result<Self> {
let db_header = RefCell::borrow(&db_header_ref);
@@ -303,18 +312,19 @@ impl Pager {
}
pub fn begin_read_tx(&self) -> Result<()> {
self.wal.begin_read_tx()?;
self.wal.borrow().begin_read_tx()?;
Ok(())
}
pub fn begin_write_tx(&self) -> Result<()> {
self.wal.begin_read_tx()?;
self.wal.borrow().begin_read_tx()?;
Ok(())
}
pub fn end_tx(&self) -> Result<()> {
self.wal.end_read_tx()?;
Ok(())
pub fn end_tx(&self) -> Result<CheckpointStatus> {
self.cacheflush()?;
self.wal.borrow().end_read_tx()?;
Ok(CheckpointStatus::Done)
}
/// Reads a page from the database.
@@ -326,9 +336,10 @@ impl Pager {
}
let page = Rc::new(RefCell::new(Page::new(page_idx)));
RefCell::borrow(&page).set_locked();
if let Some(frame_id) = self.wal.find_frame(page_idx as u64)? {
if let Some(frame_id) = self.wal.borrow().find_frame(page_idx as u64)? {
dbg!(frame_id);
self.wal
.borrow()
.read_frame(frame_id, page.clone(), self.buffer_pool.clone())?;
{
let page = page.borrow_mut();
@@ -363,20 +374,37 @@ impl Pager {
dirty_pages.insert(page_id);
}
pub fn cacheflush(&self) -> Result<()> {
let mut dirty_pages = RefCell::borrow_mut(&self.dirty_pages);
if dirty_pages.len() == 0 {
return Ok(());
}
pub fn cacheflush(&self) -> Result<CheckpointStatus> {
let db_size = self.db_header.borrow().database_size;
for page_id in dirty_pages.iter() {
for page_id in self.dirty_pages.borrow().iter() {
let mut cache = self.page_cache.borrow_mut();
let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it.");
self.wal.append_frame(page.clone(), db_size, self)?;
self.wal
.borrow_mut()
.append_frame(page.clone(), db_size, self)?;
}
dirty_pages.clear();
self.io.run_once()?;
Ok(())
// remove before checkpoint so we can retry cacheflush if needed
self.dirty_pages.borrow_mut().clear();
let should_checkpoint = self.wal.borrow().should_checkpoint();
if should_checkpoint {
self.wal.borrow_mut().checkpoint(self)?;
}
Ok(CheckpointStatus::Done)
}
// WARN: used for testing purposes
pub fn clear_page_cache(&self) {
loop {
match self.wal.borrow_mut().checkpoint(self) {
Ok(CheckpointStatus::IO) => {}
Ok(CheckpointStatus::Done) => {
break;
}
Err(err) => panic!("error while clearing cache {}", err),
}
}
self.page_cache.borrow_mut().clear();
}
/*

View File

@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::{cell::RefCell, rc::Rc, sync::Arc};
use crate::io::{File, IO};
@@ -38,14 +38,10 @@ pub trait Wal {
) -> Result<()>;
/// Write a frame to the WAL.
fn append_frame(
&self,
page: Rc<RefCell<Page>>,
db_size: u32,
pager: &Pager,
) -> Result<CheckpointStatus>;
fn append_frame(&mut self, page: Rc<RefCell<Page>>, db_size: u32, pager: &Pager) -> Result<()>;
fn checkpoint(&self, pager: &Pager) -> Result<CheckpointStatus>;
fn should_checkpoint(&self) -> bool;
fn checkpoint(&mut self, pager: &Pager) -> Result<CheckpointStatus>;
}
#[cfg(feature = "fs")]
@@ -60,9 +56,10 @@ pub struct WalFile {
// Maps pgno to frame id and offset in wal file
frame_cache: RefCell<HashMap<u64, Vec<u64>>>, // FIXME: for now let's use a simple hashmap instead of a shm file
checkpoint_threshold: usize,
ongoing_checkpoint: HashSet<usize>,
}
enum CheckpointStatus {
pub enum CheckpointStatus {
Done,
IO,
}
@@ -83,9 +80,7 @@ impl Wal for WalFile {
/// Find the latest frame containing a page.
fn find_frame(&self, page_id: u64) -> Result<Option<u64>> {
let frame_cache = self.frame_cache.borrow();
dbg!(&frame_cache);
let frames = frame_cache.get(&page_id);
dbg!(&frames);
if frames.is_none() {
return Ok(None);
}
@@ -106,7 +101,6 @@ impl Wal for WalFile {
page: Rc<RefCell<Page>>,
buffer_pool: Rc<BufferPool>,
) -> Result<()> {
println!("read frame {}", frame_id);
let offset = self.frame_offset(frame_id);
begin_read_wal_frame(
self.file.borrow().as_ref().unwrap(),
@@ -118,26 +112,23 @@ impl Wal for WalFile {
}
/// Write a frame to the WAL.
fn append_frame(&self, page: Rc<RefCell<Page>>, db_size: u32, pager: &Pager) -> Result<()> {
fn append_frame(&mut self, page: Rc<RefCell<Page>>, db_size: u32, pager: &Pager) -> Result<()> {
self.ensure_init()?;
let page_id = page.borrow().id;
let frame_id = *self.max_frame.borrow();
let offset = self.frame_offset(frame_id);
println!("appending {} at {}", frame_id, offset);
begin_write_wal_frame(self.file.borrow().as_ref().unwrap(), offset, &page, db_size)?;
self.max_frame.replace(frame_id + 1);
let mut frame_cache = self.frame_cache.borrow_mut();
let frames = frame_cache.get_mut(&(page_id as u64));
match frames {
Some(frames) => frames.push(frame_id),
None => {
frame_cache.insert(page_id as u64, vec![frame_id]);
{
let mut frame_cache = self.frame_cache.borrow_mut();
let frames = frame_cache.get_mut(&(page_id as u64));
match frames {
Some(frames) => frames.push(frame_id),
None => {
frame_cache.insert(page_id as u64, vec![frame_id]);
}
}
}
dbg!(&frame_cache);
if (frame_id + 1) as usize >= self.checkpoint_threshold {
self.checkpoint(pager);
}
Ok(())
}
@@ -151,16 +142,31 @@ impl Wal for WalFile {
Ok(())
}
fn checkpoint(&self, pager: &Pager) -> Result<CheckpointStatus> {
for (page_id, frames) in self.frame_cache.borrow().iter() {
fn should_checkpoint(&self) -> bool {
let frame_id = *self.max_frame.borrow() as usize;
if frame_id < self.checkpoint_threshold {
true
} else {
false
}
}
fn checkpoint(&mut self, pager: &Pager) -> Result<CheckpointStatus> {
for (page_id, _frames) in self.frame_cache.borrow().iter() {
// move page from WAL to database file
// TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf
let page = pager.read_page(*page_id as usize)?;
let page_id = *page_id as usize;
let page = pager.read_page(page_id)?;
if page.borrow().is_locked() {
return Ok(CheckpointStatus::IO);
}
pager.put_page(page_id, page);
self.ongoing_checkpoint.insert(page_id);
}
Ok(())
self.frame_cache.borrow_mut().clear();
*self.max_frame.borrow_mut() = 0;
self.ongoing_checkpoint.clear();
Ok(CheckpointStatus::Done)
}
}
@@ -177,14 +183,16 @@ impl WalFile {
max_frame: RefCell::new(0),
nbackfills: RefCell::new(0),
checkpoint_threshold: 1000,
ongoing_checkpoint: HashSet::new(),
}
}
fn ensure_init(&self) -> Result<()> {
println!("ensure");
if self.file.borrow().is_none() {
println!("inside ensure");
match self.io.open_file(&self.wal_path) {
match self
.io
.open_file(&self.wal_path, crate::io::OpenFlags::Create)
{
Ok(file) => {
*self.file.borrow_mut() = Some(file.clone());
let wal_header = match sqlite3_ondisk::begin_read_wal_header(file) {
@@ -194,9 +202,8 @@ impl WalFile {
// TODO: Return a completion instead.
self.io.run_once()?;
self.wal_header.replace(Some(wal_header));
dbg!(&self.wal_header);
}
Err(err) => panic!("{:?}", err),
Err(err) => panic!("{:?} {}", err, &self.wal_path),
};
}
Ok(())

View File

@@ -560,7 +560,6 @@ impl Program {
state: &'a mut ProgramState,
pager: Rc<Pager>,
) -> Result<StepResult<'a>> {
dbg!(&self.connection.upgrade().is_none());
loop {
let insn = &self.insns[state.pc as usize];
trace_insn(self, state.pc as InsnReference, insn);
@@ -1100,9 +1099,14 @@ impl Program {
}
}
if self.auto_commit {
pager.end_tx()?;
return match pager.end_tx() {
Ok(crate::storage::wal::CheckpointStatus::IO) => Ok(StepResult::IO),
Ok(crate::storage::wal::CheckpointStatus::Done) => Ok(StepResult::Done),
Err(e) => Err(e),
};
} else {
return Ok(StepResult::Done);
}
return Ok(StepResult::Done);
}
Insn::Transaction { write } => {
let connection = self.connection.upgrade().unwrap();

View File

@@ -91,7 +91,7 @@ impl SimulatorIO {
}
impl IO for SimulatorIO {
fn open_file(&self, path: &str) -> Result<Rc<dyn limbo_core::File>> {
fn open_file(&self, path: &str, flags: OpenFlags) -> Result<Rc<dyn limbo_core::File>> {
let inner = self.inner.open_file(path)?;
let file = Rc::new(SimulatorFile {
inner,