diff --git a/core/storage/database.rs b/core/storage/database.rs index 5039facf3..75d835734 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -14,6 +14,7 @@ pub trait DatabaseStorage { buffer: Rc>, c: Rc, ) -> Result<()>; + fn sync(&self, c: Rc) -> Result<()>; } #[cfg(feature = "fs")] @@ -52,6 +53,10 @@ impl DatabaseStorage for FileStorage { self.file.pwrite(pos, buffer, c)?; Ok(()) } + + fn sync(&self, c: Rc) -> Result<()> { + self.file.sync(c) + } } #[cfg(feature = "fs")] diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 17927e1b6..e5d8c4b99 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -268,9 +268,11 @@ impl PageCache { #[derive(Clone)] enum FlushState { Start, - FramesDone, + SyncWal, + Checkpoint, CheckpointDone, - Syncing, + SyncDbFile, + WaitSyncDbFile, } /// This will keep track of the state of current cache flush in order to not repeat work @@ -298,6 +300,7 @@ pub struct Pager { db_header: Rc>, flush_info: RefCell, + syncing: Rc>, } impl Pager { @@ -329,6 +332,7 @@ impl Pager { state: FlushState::Start, in_flight_writes: Rc::new(RefCell::new(0)), }), + syncing: Rc::new(RefCell::new(false)), }) } @@ -396,58 +400,72 @@ impl Pager { } pub fn cacheflush(&self) -> Result { - if matches!(self.flush_info.borrow().state.clone(), FlushState::Start) { - let db_size = self.db_header.borrow().database_size; - for page_id in self.dirty_pages.borrow().iter() { - let mut cache = self.page_cache.borrow_mut(); - let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); - self.wal.borrow_mut().append_frame( - page.clone(), - db_size, - self, - self.flush_info.borrow().in_flight_writes.clone(), - )?; - } - self.dirty_pages.borrow_mut().clear(); - self.flush_info.borrow_mut().state = FlushState::FramesDone; - } + loop { + let state = self.flush_info.borrow().state.clone(); + match state { + FlushState::Start => { + let db_size = self.db_header.borrow().database_size; + for page_id in self.dirty_pages.borrow().iter() { + let mut cache = self.page_cache.borrow_mut(); + let page = cache.get(&page_id).expect("we somehow added a page to dirty list but we didn't mark it as dirty, causing cache to drop it."); + self.wal.borrow_mut().append_frame( + page.clone(), + db_size, + self, + self.flush_info.borrow().in_flight_writes.clone(), + )?; + } + self.dirty_pages.borrow_mut().clear(); + self.flush_info.borrow_mut().state = FlushState::SyncWal; + } + FlushState::Checkpoint => { + let in_flight = self.flush_info.borrow().in_flight_writes.clone(); + dbg!("checkpoint"); + match self.wal.borrow_mut().checkpoint(self, in_flight)? { + CheckpointStatus::IO => return Ok(CheckpointStatus::IO), + CheckpointStatus::Done => { + self.flush_info.borrow_mut().state = FlushState::CheckpointDone; + } + }; + } + FlushState::CheckpointDone => { + dbg!("checkpoint done"); + let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); + if in_flight == 0 { + self.flush_info.borrow_mut().state = FlushState::SyncDbFile; + } else { + return Ok(CheckpointStatus::IO); + } + } + FlushState::SyncWal => { + match self.wal.borrow_mut().sync() { + Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), + Ok(CheckpointStatus::Done) => {} + Err(e) => return Err(e), + } - if matches!( - self.flush_info.borrow().state.clone(), - FlushState::FramesDone - ) { - let should_checkpoint = self.wal.borrow().should_checkpoint(); - if should_checkpoint { - match self - .wal - .borrow_mut() - .checkpoint(self, self.flush_info.borrow().in_flight_writes.clone()) - { - Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), - Ok(CheckpointStatus::Done) => {} - Err(e) => return Err(e), - }; + let should_checkpoint = self.wal.borrow().should_checkpoint(); + if should_checkpoint { + self.flush_info.borrow_mut().state = FlushState::Checkpoint; + } else { + self.flush_info.borrow_mut().state = FlushState::Start; + break; + } + } + FlushState::SyncDbFile => { + dbg!("sync db"); + sqlite3_ondisk::begin_sync(self.page_io.clone(), self.syncing.clone())?; + self.flush_info.borrow_mut().state = FlushState::WaitSyncDbFile; + } + FlushState::WaitSyncDbFile => { + if *self.syncing.borrow() { + return Ok(CheckpointStatus::IO); + } else { + self.flush_info.borrow_mut().state = FlushState::Start; + break; + } + } } - self.flush_info.borrow_mut().state = FlushState::CheckpointDone; - } - - if matches!( - self.flush_info.borrow().state.clone(), - FlushState::CheckpointDone - ) { - let in_flight = *self.flush_info.borrow().in_flight_writes.borrow(); - if in_flight == 0 { - self.flush_info.borrow_mut().state = FlushState::Syncing; - } - } - - if matches!(self.flush_info.borrow().state.clone(), FlushState::Syncing) { - match self.wal.borrow_mut().sync() { - Ok(CheckpointStatus::IO) => return Ok(CheckpointStatus::IO), - Ok(CheckpointStatus::Done) => {} - Err(e) => return Err(e), - } - self.flush_info.borrow_mut().state = FlushState::Start; } Ok(CheckpointStatus::Done) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 493ece414..51235c7d7 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -42,7 +42,7 @@ //! https://www.sqlite.org/fileformat.html use crate::error::LimboError; -use crate::io::{Buffer, Completion, ReadCompletion, WriteCompletion}; +use crate::io::{Buffer, Completion, ReadCompletion, SyncCompletion, WriteCompletion}; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::pager::{Page, Pager}; @@ -563,6 +563,18 @@ pub fn begin_write_btree_page( Ok(()) } +pub fn begin_sync(page_io: Rc, syncing: Rc>) -> Result<()> { + assert!(!*syncing.borrow()); + *syncing.borrow_mut() = true; + let completion = Completion::Sync(SyncCompletion { + complete: Box::new(move |_| { + *syncing.borrow_mut() = false; + }), + }); + page_io.sync(Rc::new(completion))?; + Ok(()) +} + #[allow(clippy::enum_variant_names)] #[derive(Debug, Clone)] pub enum BTreeCell { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index aa5c92e38..704ac03fe 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -170,11 +170,7 @@ impl Wal for WalFile { fn should_checkpoint(&self) -> bool { let frame_id = *self.max_frame.borrow() as usize; - if frame_id < self.checkpoint_threshold { - true - } else { - false - } + frame_id >= self.checkpoint_threshold } fn checkpoint( @@ -195,7 +191,7 @@ impl Wal for WalFile { return Ok(CheckpointStatus::IO); } - begin_write_btree_page(pager, &page, write_counter.clone()); + begin_write_btree_page(pager, &page, write_counter.clone())?; self.ongoing_checkpoint.insert(page_id); }