From fc65c5096dfd6bb773abf9501ef943114c2ff7fa Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Fri, 27 Sep 2024 18:17:27 +0200 Subject: [PATCH] cacheflush state machine --- core/io/darwin.rs | 17 ++++++- core/io/linux.rs | 16 ++++++ core/io/mod.rs | 21 +++++++- core/io/windows.rs | 6 +++ core/storage/database.rs | 2 +- core/storage/pager.rs | 90 +++++++++++++++++++++++++++++----- core/storage/sqlite3_ondisk.rs | 11 ++++- core/storage/wal.rs | 74 +++++++++++++++++++++++++--- 8 files changed, 211 insertions(+), 26 deletions(-) diff --git a/core/io/darwin.rs b/core/io/darwin.rs index 01be41d4c..4a4c9ae58 100644 --- a/core/io/darwin.rs +++ b/core/io/darwin.rs @@ -71,7 +71,7 @@ impl IO for DarwinIO { let c: &Completion = c; let r = match c { Completion::Read(r) => r, - Completion::Write(_) => unreachable!(), + _ => unreachable!(), }; let mut buf = r.buf_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; @@ -192,7 +192,7 @@ impl File for DarwinFile { let result = { let r = match &(*c) { Completion::Read(r) => r, - Completion::Write(_) => unreachable!(), + _ => unreachable!(), }; let mut buf = r.buf_mut(); rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64) @@ -259,6 +259,19 @@ impl File for DarwinFile { Err(e) => Err(e.into()), } } + + fn sync(&self, c: Rc) -> Result<()> { + let file = self.file.borrow(); + let result = rustix::fs::fsync(file.as_fd()); + match result { + std::result::Result::Ok(()) => { + trace!("fsync"); + c.complete(0); + Ok(()) + } + Err(e) => Err(e.into()), + } + } } impl Drop for DarwinFile { diff --git a/core/io/linux.rs b/core/io/linux.rs index 4a4187e41..27846696e 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -260,6 +260,22 @@ impl File for LinuxFile { io.ring.submit_entry(&write); Ok(()) } + + fn sync(&self, c: Rc) -> Result<()> { + let mut io = self.io.borrow_mut(); + let fd = io_uring::types::Fd(self.file.as_raw_fd()); + let ptr = Rc::into_raw(c.clone()); + let sync = io_uring::opcode::Fsync::new(fd) + .build() + .user_data(ptr as u64); + let ring = &mut io.ring; + unsafe { + ring.submission() + .push(&sync) + .expect("submission queue is full"); + } + Ok(()) + } } impl Drop for LinuxFile { diff --git a/core/io/mod.rs b/core/io/mod.rs index 7de057769..905bb5a75 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -14,6 +14,7 @@ pub trait File { fn unlock_file(&self) -> Result<()>; fn pread(&self, pos: usize, c: Rc) -> Result<()>; fn pwrite(&self, pos: usize, buffer: Rc>, c: Rc) -> Result<()>; + fn sync(&self, c: Rc) -> Result<()>; } pub enum OpenFlags { @@ -33,10 +34,12 @@ pub trait IO { pub type Complete = dyn Fn(Rc>); pub type WriteComplete = dyn Fn(i32); +pub type SyncComplete = dyn Fn(i32); pub enum Completion { Read(ReadCompletion), Write(WriteCompletion), + Sync(SyncCompletion), } pub struct ReadCompletion { @@ -48,7 +51,8 @@ impl Completion { pub fn complete(&self, result: i32) { match self { Completion::Read(r) => r.complete(), - Completion::Write(w) => w.complete(result), // fix + Completion::Write(w) => w.complete(result), + Completion::Sync(s) => s.complete(result), // fix } } } @@ -57,6 +61,10 @@ pub struct WriteCompletion { pub complete: Box, } +pub struct SyncCompletion { + pub complete: Box, +} + impl ReadCompletion { pub fn new(buf: Rc>, complete: Box) -> Self { Self { buf, complete } @@ -79,11 +87,22 @@ impl WriteCompletion { pub fn new(complete: Box) -> Self { Self { complete } } + pub fn complete(&self, bytes_written: i32) { (self.complete)(bytes_written); } } +impl SyncCompletion { + pub fn new(complete: Box) -> Self { + Self { complete } + } + + pub fn complete(&self, res: i32) { + (self.complete)(res); + } +} + pub type BufferData = Pin>; pub type BufferDropFn = Rc; diff --git a/core/io/windows.rs b/core/io/windows.rs index 59f40aa1c..1330989bc 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -82,4 +82,10 @@ impl File for WindowsFile { file.write_all(buf)?; Ok(()) } + + fn sync(&self, c: Rc) -> Result<()> { + let mut file = self.file.borrow_mut(); + file.sync_all()?; + Ok(()) + } } diff --git a/core/storage/database.rs b/core/storage/database.rs index 16da1ed3f..5039facf3 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -26,7 +26,7 @@ impl DatabaseStorage for FileStorage { fn read_page(&self, page_idx: usize, c: Rc) -> Result<()> { let r = match &(*c) { Completion::Read(r) => r, - Completion::Write(_) => unreachable!(), + _ => unreachable!(), }; let size = r.buf().len(); assert!(page_idx > 0); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index a62710905..7c28de2ca 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -265,6 +265,21 @@ impl PageCache { } } +#[derive(Clone)] +enum FlushState { + Start, + FramesDone, + CheckpointDone, + Syncing, +} + +/// This will keep track of the state of current cache flush in order to not repeat work +struct FlushInfo { + state: FlushState, + /// Number of writes taking place. When in_flight gets to 0 we can schedule a fsync. + in_flight_writes: Rc>, +} + /// The pager interface implements the persistence layer by providing access /// to pages of the database file, including caching, concurrency control, and /// transaction management. @@ -281,6 +296,8 @@ pub struct Pager { pub io: Arc, dirty_pages: Rc>>, db_header: Rc>, + + flush_info: RefCell, } impl Pager { @@ -308,6 +325,10 @@ impl Pager { io, dirty_pages: Rc::new(RefCell::new(HashSet::new())), db_header: db_header_ref.clone(), + flush_info: RefCell::new(FlushInfo { + state: FlushState::Start, + in_flight_writes: Rc::new(RefCell::new(0)), + }), }) } @@ -375,20 +396,59 @@ impl Pager { } pub fn cacheflush(&self) -> Result { - let db_size = self.db_header.borrow().database_size; - for page_id in self.dirty_pages.borrow().iter() { - let mut cache = self.page_cache.borrow_mut(); - let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - self.wal - .borrow_mut() - .append_frame(page.clone(), db_size, self)?; + if matches!(self.flush_info.borrow().state.clone(), FlushState::Start) { + let db_size = self.db_header.borrow().database_size; + for page_id in self.dirty_pages.borrow().iter() { + let mut cache = self.page_cache.borrow_mut(); + let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + self.wal.borrow_mut().append_frame( + page.clone(), + db_size, + self, + self.flush_info.borrow().in_flight_writes.clone(), + )?; + } + self.dirty_pages.borrow_mut().clear(); + self.flush_info.borrow_mut().state = FlushState::FramesDone; } - // remove before checkpoint so we can retry cacheflush if needed - self.dirty_pages.borrow_mut().clear(); - let should_checkpoint = self.wal.borrow().should_checkpoint(); - if should_checkpoint { - self.wal.borrow_mut().checkpoint(self)?; + if matches!( + self.flush_info.borrow().state.clone(), + FlushState::FramesDone + ) { + let should_checkpoint = self.wal.borrow().should_checkpoint(); + if should_checkpoint { + match self + .wal + .borrow_mut() + .checkpoint(self, self.flush_info.borrow().in_flight_writes.clone()) + { + Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), + Ok(CheckpointStatus::Done) => {} + Err(e) => return Err(e), + }; + } + self.flush_info.borrow_mut().state = FlushState::CheckpointDone; + } + + if matches!( + self.flush_info.borrow().state.clone(), + FlushState::CheckpointDone + ) { + let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); + if in_flight == 0 { + self.flush_info.borrow_mut().state = FlushState::Syncing; + } + } + + if matches!(self.flush_info.borrow().state.clone(), FlushState::Syncing) { + println!("syncing"); + match self.wal.borrow_mut().sync() { + Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), + Ok(CheckpointStatus::Done) => {} + Err(e) => return Err(e), + } + self.flush_info.borrow_mut().state = FlushState::Start; } Ok(CheckpointStatus::Done) } @@ -396,7 +456,11 @@ impl Pager { // WARN: used for testing purposes pub fn clear_page_cache(&self) { loop { - match self.wal.borrow_mut().checkpoint(self) { + match self + .wal + .borrow_mut() + .checkpoint(self, Rc::new(RefCell::new(0))) + { Ok(CheckpointStatus::IO) => {} Ok(CheckpointStatus::Done) => { break; diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 7cd2b959f..493ece414 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -528,7 +528,11 @@ fn finish_read_page( Ok(()) } -pub fn begin_write_btree_page(pager: &Pager, page: &Rc>) -> Result<()> { +pub fn begin_write_btree_page( + pager: &Pager, + page: &Rc>, + write_counter: Rc>, +) -> Result<()> { let page_source = &pager.page_io; let page_finish = page.clone(); @@ -540,11 +544,13 @@ pub fn begin_write_btree_page(pager: &Pager, page: &Rc>) -> Result contents.buffer.clone() }; + *write_counter.borrow_mut() += 1; let write_complete = { let buf_copy = buffer.clone(); Box::new(move |bytes_written: i32| { let buf_copy = buf_copy.clone(); let buf_len = buf_copy.borrow().len(); + *write_counter.borrow_mut() -= 1; page_finish.borrow_mut().clear_dirty(); if bytes_written < buf_len as i32 { @@ -994,6 +1000,7 @@ pub fn begin_write_wal_frame( offset: usize, page: &Rc>, db_size: u32, + write_counter: Rc>, ) -> Result<()> { let page_finish = page.clone(); let page_id = page.borrow().id; @@ -1029,11 +1036,13 @@ pub fn begin_write_wal_frame( Rc::new(RefCell::new(buffer)) }; + *write_counter.borrow_mut() += 1; let write_complete = { let buf_copy = buffer.clone(); Box::new(move |bytes_written: i32| { let buf_copy = buf_copy.clone(); let buf_len = buf_copy.borrow().len(); + *write_counter.borrow_mut() -= 1; page_finish.borrow_mut().clear_dirty(); if bytes_written < buf_len as i32 { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 9acef6f83..aa5c92e38 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,16 +1,17 @@ use std::collections::{HashMap, HashSet}; use std::{cell::RefCell, rc::Rc, sync::Arc}; -use crate::io::{File, IO}; +use crate::io::{File, SyncCompletion, IO}; use crate::storage::sqlite3_ondisk::{ begin_read_page, begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; +use crate::Completion; use crate::{storage::pager::Page, Result}; use super::buffer_pool::BufferPool; use super::pager::Pager; -use super::sqlite3_ondisk; +use super::sqlite3_ondisk::{self, begin_write_btree_page}; /// Write-ahead log (WAL). pub trait Wal { @@ -38,10 +39,21 @@ pub trait Wal { ) -> Result<()>; /// Write a frame to the WAL. - fn append_frame(&mut self, page: Rc>, db_size: u32, pager: &Pager) -> Result<()>; + fn append_frame( + &mut self, + page: Rc>, + db_size: u32, + pager: &Pager, + write_counter: Rc>, + ) -> Result<()>; fn should_checkpoint(&self) -> bool; - fn checkpoint(&mut self, pager: &Pager) -> Result; + fn checkpoint( + &mut self, + pager: &Pager, + write_counter: Rc>, + ) -> Result; + fn sync(&mut self) -> Result; } #[cfg(feature = "fs")] @@ -57,6 +69,8 @@ pub struct WalFile { frame_cache: RefCell>>, // FIXME: for now let's use a simple hashmap instead of a shm file checkpoint_threshold: usize, ongoing_checkpoint: HashSet, + + syncing: Rc>, } pub enum CheckpointStatus { @@ -112,12 +126,24 @@ impl Wal for WalFile { } /// Write a frame to the WAL. - fn append_frame(&mut self, page: Rc>, db_size: u32, pager: &Pager) -> Result<()> { + fn append_frame( + &mut self, + page: Rc>, + db_size: u32, + pager: &Pager, + write_counter: Rc>, + ) -> Result<()> { self.ensure_init()?; let page_id = page.borrow().id; let frame_id = *self.max_frame.borrow(); let offset = self.frame_offset(frame_id); - begin_write_wal_frame(self.file.borrow().as_ref().unwrap(), offset, &page, db_size)?; + begin_write_wal_frame( + self.file.borrow().as_ref().unwrap(), + offset, + &page, + db_size, + write_counter, + )?; self.max_frame.replace(frame_id + 1); { let mut frame_cache = self.frame_cache.borrow_mut(); @@ -151,23 +177,54 @@ impl Wal for WalFile { } } - fn checkpoint(&mut self, pager: &Pager) -> Result { + fn checkpoint( + &mut self, + pager: &Pager, + write_counter: Rc>, + ) -> Result { for (page_id, _frames) in self.frame_cache.borrow().iter() { // move page from WAL to database file // TODO(Pere): use splice syscall in linux to do zero-copy file page movements to improve perf let page_id = *page_id as usize; + if self.ongoing_checkpoint.contains(&page_id) { + continue; + } + let page = pager.read_page(page_id)?; if page.borrow().is_locked() { return Ok(CheckpointStatus::IO); } - pager.put_page(page_id, page); + + begin_write_btree_page(pager, &page, write_counter.clone()); self.ongoing_checkpoint.insert(page_id); } + self.frame_cache.borrow_mut().clear(); *self.max_frame.borrow_mut() = 0; self.ongoing_checkpoint.clear(); Ok(CheckpointStatus::Done) } + + fn sync(&mut self) -> Result { + self.ensure_init()?; + let file = self.file.borrow(); + let file = file.as_ref().unwrap(); + { + let syncing = self.syncing.clone(); + let completion = Completion::Sync(SyncCompletion { + complete: Box::new(move |_| { + *syncing.borrow_mut() = false; + }), + }); + file.sync(Rc::new(completion))?; + } + + if *self.syncing.borrow() { + return Ok(CheckpointStatus::IO); + } else { + return Ok(CheckpointStatus::Done); + } + } } #[cfg(feature = "fs")] @@ -184,6 +241,7 @@ impl WalFile { nbackfills: RefCell::new(0), checkpoint_threshold: 1000, ongoing_checkpoint: HashSet::new(), + syncing: Rc::new(RefCell::new(false)), } }