diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 0b85be394..d656290b0 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -15,7 +15,7 @@ use crate::{ Result, TransactionState, }; use parking_lot::RwLock; -use std::cell::{Cell, RefCell, UnsafeCell}; +use std::cell::{RefCell, UnsafeCell}; use std::collections::HashSet; use std::hash; use std::rc::Rc; @@ -371,10 +371,10 @@ pub enum BtreePageAllocMode { /// This will keep track of the state of current cache commit in order to not repeat work struct CommitInfo { - completions: RefCell>, - result: RefCell>, - state: Cell, - time: Cell, + completions: Vec, + result: Option, + state: CommitState, + time: crate::io::clock::Instant, } /// Track the state of the auto-vacuum mode. @@ -508,7 +508,7 @@ pub struct Pager { pub io: Arc, dirty_pages: Arc>>>, - commit_info: CommitInfo, + commit_info: RwLock, checkpoint_state: RwLock, syncing: Arc, auto_vacuum_mode: AtomicU8, @@ -618,12 +618,12 @@ impl Pager { dirty_pages: Arc::new(RwLock::new(HashSet::with_hasher( hash::BuildHasherDefault::new(), ))), - commit_info: CommitInfo { - result: RefCell::new(None), - completions: RefCell::new(Vec::new()), - state: CommitState::PrepareWal.into(), - time: now.into(), - }, + commit_info: RwLock::new(CommitInfo { + result: None, + completions: Vec::new(), + state: CommitState::PrepareWal, + time: now, + }), syncing: Arc::new(AtomicBool::new(false)), checkpoint_state: RwLock::new(CheckpointState::Checkpoint), buffer_pool, @@ -1421,7 +1421,7 @@ impl Pager { data_sync_retry, ) { r @ (Ok(IOResult::Done(..)) | Err(..)) => { - self.commit_info.state.set(CommitState::PrepareWal); + self.commit_info.write().state = CommitState::PrepareWal; r } Ok(IOResult::IO(io)) => Ok(IOResult::IO(io)), @@ -1442,45 +1442,48 @@ impl Pager { }; loop { - let state = self.commit_info.state.get(); + let state = self.commit_info.read().state; trace!(?state); match state { CommitState::PrepareWal => { let page_sz = self.get_page_size_unchecked(); let c = wal.borrow_mut().prepare_wal_start(page_sz)?; let Some(c) = c else { - self.commit_info.state.set(CommitState::GetDbSize); + self.commit_info.write().state = CommitState::GetDbSize; continue; }; - self.commit_info.state.set(CommitState::PrepareWalSync); + self.commit_info.write().state = CommitState::PrepareWalSync; if !c.is_completed() { io_yield_one!(c); } } CommitState::PrepareWalSync => { let c = wal.borrow_mut().prepare_wal_finish()?; - self.commit_info.state.set(CommitState::GetDbSize); + self.commit_info.write().state = CommitState::GetDbSize; if !c.is_completed() { io_yield_one!(c); } } CommitState::GetDbSize => { let db_size = return_if_io!(self.with_header(|header| header.database_size)); - self.commit_info.state.set(CommitState::PrepareFrames { + self.commit_info.write().state = CommitState::PrepareFrames { db_size: db_size.get(), - }); + }; } CommitState::PrepareFrames { db_size } => { let now = self.io.now(); - self.commit_info.time.set(now); + self.commit_info.write().time = now; let dirty_ids: Vec = self.dirty_pages.read().iter().copied().collect(); if dirty_ids.is_empty() { return Ok(IOResult::Done(PagerCommitResult::WalWritten)); } - let mut completions = self.commit_info.completions.borrow_mut(); - completions.clear(); + let mut completions = Vec::new(); + { + let mut commit_info = self.commit_info.write(); + commit_info.completions.clear(); + } let page_sz = self.get_page_size_unchecked(); let mut pages: Vec = Vec::with_capacity(dirty_ids.len().min(IOV_MAX)); let total = dirty_ids.len(); @@ -1530,14 +1533,21 @@ impl Pager { // Skip sync if synchronous mode is OFF if sync_mode == crate::SyncMode::Off { if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() { - *self.commit_info.result.borrow_mut() = - Some(PagerCommitResult::WalWritten); - self.commit_info.state.set(CommitState::Done); + let mut commit_info = self.commit_info.write(); + commit_info.completions = completions; + commit_info.result = Some(PagerCommitResult::WalWritten); + commit_info.state = CommitState::Done; continue; } - self.commit_info.state.set(CommitState::Checkpoint); + { + let mut commit_info = self.commit_info.write(); + commit_info.completions = completions; + commit_info.state = CommitState::Checkpoint; + } } else { - self.commit_info.state.set(CommitState::SyncWal); + let mut commit_info = self.commit_info.write(); + commit_info.completions = completions; + commit_info.state = CommitState::SyncWal; } } } @@ -1550,37 +1560,41 @@ impl Pager { } Err(e) => return Err(e), }; - self.commit_info.completions.borrow_mut().push(c); + self.commit_info.write().completions.push(c); if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() { - *self.commit_info.result.borrow_mut() = Some(PagerCommitResult::WalWritten); - self.commit_info.state.set(CommitState::Done); + let mut commit_info = self.commit_info.write(); + commit_info.result = Some(PagerCommitResult::WalWritten); + commit_info.state = CommitState::Done; continue; } - self.commit_info.state.set(CommitState::Checkpoint); + self.commit_info.write().state = CommitState::Checkpoint; } CommitState::Checkpoint => { - let mut completions = self.commit_info.completions.borrow_mut(); match self.checkpoint()? { IOResult::IO(cmp) => { - match cmp { - IOCompletions::Single(c) => { - completions.push(c); + let completions = { + let mut commit_info = self.commit_info.write(); + match cmp { + IOCompletions::Single(c) => { + commit_info.completions.push(c); + } + IOCompletions::Many(c) => { + commit_info.completions.extend(c); + } } - IOCompletions::Many(c) => { - completions.extend(c); - } - } + std::mem::take(&mut commit_info.completions) + }; // TODO: remove serialization of checkpoint path - io_yield_many!(std::mem::take(&mut *completions)); + io_yield_many!(completions); } IOResult::Done(res) => { - *self.commit_info.result.borrow_mut() = - Some(PagerCommitResult::Checkpointed(res)); + let mut commit_info = self.commit_info.write(); + commit_info.result = Some(PagerCommitResult::Checkpointed(res)); // Skip sync if synchronous mode is OFF if sync_mode == crate::SyncMode::Off { - self.commit_info.state.set(CommitState::Done); + commit_info.state = CommitState::Done; } else { - self.commit_info.state.set(CommitState::SyncDbFile); + commit_info.state = CommitState::SyncDbFile; } } } @@ -1589,8 +1603,8 @@ impl Pager { let sync_result = sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone()); self.commit_info + .write() .completions - .borrow_mut() .push(match sync_result { Ok(c) => c, Err(e) if !data_sync_retry => { @@ -1598,7 +1612,7 @@ impl Pager { } Err(e) => return Err(e), }); - self.commit_info.state.set(CommitState::Done); + self.commit_info.write().state = CommitState::Done; } CommitState::Done => { tracing::debug!( @@ -1606,19 +1620,25 @@ impl Pager { self.io .now() .to_system_time() - .duration_since(self.commit_info.time.get().to_system_time()) + .duration_since(self.commit_info.read().time.to_system_time()) .unwrap() .as_millis() ); - let mut completions = self.commit_info.completions.borrow_mut(); - if completions.iter().all(|c| c.is_completed()) { - completions.clear(); - self.commit_info.state.set(CommitState::PrepareWal); + let (should_finish, result, completions) = { + let mut commit_info = self.commit_info.write(); + if commit_info.completions.iter().all(|c| c.is_completed()) { + commit_info.completions.clear(); + commit_info.state = CommitState::PrepareWal; + (true, commit_info.result.take(), Vec::new()) + } else { + (false, None, std::mem::take(&mut commit_info.completions)) + } + }; + if should_finish { wal.borrow_mut().finish_append_frames_commit()?; - let result = self.commit_info.result.borrow_mut().take(); return Ok(IOResult::Done(result.expect("commit result should be set"))); } - io_yield_many!(std::mem::take(&mut completions)); + io_yield_many!(completions); } } } @@ -2343,8 +2363,8 @@ impl Pager { fn reset_internal_states(&self) { *self.checkpoint_state.write() = CheckpointState::Checkpoint; self.syncing.store(false, Ordering::SeqCst); - self.commit_info.state.set(CommitState::PrepareWal); - self.commit_info.time.set(self.io.now()); + self.commit_info.write().state = CommitState::PrepareWal; + self.commit_info.write().time = self.io.now(); *self.allocate_page_state.write() = AllocatePageState::Start; *self.free_page_state.write() = FreePageState::Start; #[cfg(not(feature = "omit_autovacuum"))]