diff --git a/.gitignore b/.gitignore index 61a87d17e..70c4e5c88 100644 --- a/.gitignore +++ b/.gitignore @@ -45,4 +45,5 @@ simulator-output/ &1 bisected.sql -*.log \ No newline at end of file +*.log + diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 5fff92f93..96c7d3db8 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -327,8 +327,6 @@ enum CommitState { Checkpoint, /// Fsync the database file. SyncDbFile, - /// After database file is fsynced. - AfterSyncDbFile, } #[derive(Clone, Debug, Default)] @@ -359,6 +357,7 @@ pub enum BtreePageAllocMode { /// This will keep track of the state of current cache commit in order to not repeat work struct CommitInfo { + completions: RefCell>, state: Cell, time: Cell, } @@ -586,6 +585,7 @@ impl Pager { hash::BuildHasherDefault::new(), ))), commit_info: CommitInfo { + completions: RefCell::new(Vec::new()), state: CommitState::Start.into(), time: now.into(), }, @@ -1345,11 +1345,11 @@ impl Pager { return Ok(IOResult::Done(PagerCommitResult::WalWritten)); } + let mut completions = self.commit_info.completions.borrow_mut(); + completions.clear(); let page_sz = self.page_size.get().expect("page size not set"); - let mut completions: Vec = Vec::new(); let mut pages: Vec = Vec::with_capacity(dirty_ids.len().min(IOV_MAX)); let total = dirty_ids.len(); - let mut cache = self.page_cache.write(); for (i, page_id) in dirty_ids.into_iter().enumerate() { let page = { @@ -1400,9 +1400,6 @@ impl Pager { self.commit_info.state.set(CommitState::SyncWal); } } - if !completions.iter().all(|c| c.is_completed()) { - io_yield_many!(completions); - } } CommitState::SyncWal => { self.commit_info.state.set(CommitState::AfterSyncWal); @@ -1414,12 +1411,10 @@ impl Pager { } Err(e) => return Err(e), }; - if !c.is_completed() { - io_yield_one!(c); - } + self.commit_info.completions.borrow_mut().push(c); } CommitState::AfterSyncWal => { - turso_assert!(!wal.borrow().is_syncing(), "wal should have synced"); + // turso_assert!(!wal.borrow().is_syncing(), "wal should have synced"); if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() { self.commit_info.state.set(CommitState::Start); break PagerCommitResult::WalWritten; @@ -1427,12 +1422,30 @@ impl Pager { self.commit_info.state.set(CommitState::Checkpoint); } CommitState::Checkpoint => { - checkpoint_result = return_if_io!(self.checkpoint()); - // Skip sync if synchronous mode is OFF - if sync_mode == crate::SyncMode::Off { - self.commit_info.state.set(CommitState::AfterSyncDbFile); - } else { - self.commit_info.state.set(CommitState::SyncDbFile); + let mut completions = self.commit_info.completions.borrow_mut(); + match self.checkpoint()? { + IOResult::IO(cmp) => { + match cmp { + IOCompletions::Single(c) => { + completions.push(c); + } + IOCompletions::Many(c) => { + completions.extend(c); + } + } + // TODO: remove serialization of checkpoint path + io_yield_many!(std::mem::take(&mut *completions)); + } + IOResult::Done(res) => { + checkpoint_result = res; + // Skip sync if synchronous mode is OFF + if sync_mode == crate::SyncMode::Off { + self.commit_info.state.set(CommitState::Start); + break PagerCommitResult::Checkpointed(checkpoint_result); + } else { + self.commit_info.state.set(CommitState::SyncDbFile); + } + } } } CommitState::SyncDbFile => { @@ -1445,13 +1458,7 @@ impl Pager { } Err(e) => return Err(e), }; - self.commit_info.state.set(CommitState::AfterSyncDbFile); - if !c.is_completed() { - io_yield_one!(c); - } - } - CommitState::AfterSyncDbFile => { - turso_assert!(!self.syncing.get(), "should have finished syncing"); + self.commit_info.completions.borrow_mut().push(c); self.commit_info.state.set(CommitState::Start); break PagerCommitResult::Checkpointed(checkpoint_result); } @@ -1466,8 +1473,13 @@ impl Pager { .unwrap() .as_millis() ); - wal.borrow_mut().finish_append_frames_commit()?; - Ok(IOResult::Done(res)) + let mut completions = self.commit_info.completions.borrow_mut(); + if completions.is_empty() || completions.iter().all(|c| c.is_completed()) { + completions.clear(); + wal.borrow_mut().finish_append_frames_commit()?; + return Ok(IOResult::Done(res)); + } + io_yield_many!(std::mem::take(&mut completions)); } #[instrument(skip_all, level = Level::DEBUG)]