core/storage: Wrap Pager::commit_info with RwLock

Also remove RefCells from CommitInfo because they're not only redundant,
but cause CommitInfo not to be Send.
This commit is contained in:
Pekka Enberg
2025-09-29 12:30:16 +03:00
parent fda1b89540
commit f247b1a2bb

View File

@@ -15,7 +15,7 @@ use crate::{
Result, TransactionState,
};
use parking_lot::RwLock;
use std::cell::{Cell, RefCell, UnsafeCell};
use std::cell::{RefCell, UnsafeCell};
use std::collections::HashSet;
use std::hash;
use std::rc::Rc;
@@ -371,10 +371,10 @@ pub enum BtreePageAllocMode {
/// This will keep track of the state of current cache commit in order to not repeat work
struct CommitInfo {
completions: RefCell<Vec<Completion>>,
result: RefCell<Option<PagerCommitResult>>,
state: Cell<CommitState>,
time: Cell<crate::io::clock::Instant>,
completions: Vec<Completion>,
result: Option<PagerCommitResult>,
state: CommitState,
time: crate::io::clock::Instant,
}
/// Track the state of the auto-vacuum mode.
@@ -508,7 +508,7 @@ pub struct Pager {
pub io: Arc<dyn crate::io::IO>,
dirty_pages: Arc<RwLock<HashSet<usize, hash::BuildHasherDefault<hash::DefaultHasher>>>>,
commit_info: CommitInfo,
commit_info: RwLock<CommitInfo>,
checkpoint_state: RwLock<CheckpointState>,
syncing: Arc<AtomicBool>,
auto_vacuum_mode: AtomicU8,
@@ -618,12 +618,12 @@ impl Pager {
dirty_pages: Arc::new(RwLock::new(HashSet::with_hasher(
hash::BuildHasherDefault::new(),
))),
commit_info: CommitInfo {
result: RefCell::new(None),
completions: RefCell::new(Vec::new()),
state: CommitState::PrepareWal.into(),
time: now.into(),
},
commit_info: RwLock::new(CommitInfo {
result: None,
completions: Vec::new(),
state: CommitState::PrepareWal,
time: now,
}),
syncing: Arc::new(AtomicBool::new(false)),
checkpoint_state: RwLock::new(CheckpointState::Checkpoint),
buffer_pool,
@@ -1421,7 +1421,7 @@ impl Pager {
data_sync_retry,
) {
r @ (Ok(IOResult::Done(..)) | Err(..)) => {
self.commit_info.state.set(CommitState::PrepareWal);
self.commit_info.write().state = CommitState::PrepareWal;
r
}
Ok(IOResult::IO(io)) => Ok(IOResult::IO(io)),
@@ -1442,45 +1442,48 @@ impl Pager {
};
loop {
let state = self.commit_info.state.get();
let state = self.commit_info.read().state;
trace!(?state);
match state {
CommitState::PrepareWal => {
let page_sz = self.get_page_size_unchecked();
let c = wal.borrow_mut().prepare_wal_start(page_sz)?;
let Some(c) = c else {
self.commit_info.state.set(CommitState::GetDbSize);
self.commit_info.write().state = CommitState::GetDbSize;
continue;
};
self.commit_info.state.set(CommitState::PrepareWalSync);
self.commit_info.write().state = CommitState::PrepareWalSync;
if !c.is_completed() {
io_yield_one!(c);
}
}
CommitState::PrepareWalSync => {
let c = wal.borrow_mut().prepare_wal_finish()?;
self.commit_info.state.set(CommitState::GetDbSize);
self.commit_info.write().state = CommitState::GetDbSize;
if !c.is_completed() {
io_yield_one!(c);
}
}
CommitState::GetDbSize => {
let db_size = return_if_io!(self.with_header(|header| header.database_size));
self.commit_info.state.set(CommitState::PrepareFrames {
self.commit_info.write().state = CommitState::PrepareFrames {
db_size: db_size.get(),
});
};
}
CommitState::PrepareFrames { db_size } => {
let now = self.io.now();
self.commit_info.time.set(now);
self.commit_info.write().time = now;
let dirty_ids: Vec<usize> = self.dirty_pages.read().iter().copied().collect();
if dirty_ids.is_empty() {
return Ok(IOResult::Done(PagerCommitResult::WalWritten));
}
let mut completions = self.commit_info.completions.borrow_mut();
completions.clear();
let mut completions = Vec::new();
{
let mut commit_info = self.commit_info.write();
commit_info.completions.clear();
}
let page_sz = self.get_page_size_unchecked();
let mut pages: Vec<PageRef> = Vec::with_capacity(dirty_ids.len().min(IOV_MAX));
let total = dirty_ids.len();
@@ -1530,14 +1533,21 @@ impl Pager {
// Skip sync if synchronous mode is OFF
if sync_mode == crate::SyncMode::Off {
if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() {
*self.commit_info.result.borrow_mut() =
Some(PagerCommitResult::WalWritten);
self.commit_info.state.set(CommitState::Done);
let mut commit_info = self.commit_info.write();
commit_info.completions = completions;
commit_info.result = Some(PagerCommitResult::WalWritten);
commit_info.state = CommitState::Done;
continue;
}
self.commit_info.state.set(CommitState::Checkpoint);
{
let mut commit_info = self.commit_info.write();
commit_info.completions = completions;
commit_info.state = CommitState::Checkpoint;
}
} else {
self.commit_info.state.set(CommitState::SyncWal);
let mut commit_info = self.commit_info.write();
commit_info.completions = completions;
commit_info.state = CommitState::SyncWal;
}
}
}
@@ -1550,37 +1560,41 @@ impl Pager {
}
Err(e) => return Err(e),
};
self.commit_info.completions.borrow_mut().push(c);
self.commit_info.write().completions.push(c);
if wal_auto_checkpoint_disabled || !wal.borrow().should_checkpoint() {
*self.commit_info.result.borrow_mut() = Some(PagerCommitResult::WalWritten);
self.commit_info.state.set(CommitState::Done);
let mut commit_info = self.commit_info.write();
commit_info.result = Some(PagerCommitResult::WalWritten);
commit_info.state = CommitState::Done;
continue;
}
self.commit_info.state.set(CommitState::Checkpoint);
self.commit_info.write().state = CommitState::Checkpoint;
}
CommitState::Checkpoint => {
let mut completions = self.commit_info.completions.borrow_mut();
match self.checkpoint()? {
IOResult::IO(cmp) => {
match cmp {
IOCompletions::Single(c) => {
completions.push(c);
let completions = {
let mut commit_info = self.commit_info.write();
match cmp {
IOCompletions::Single(c) => {
commit_info.completions.push(c);
}
IOCompletions::Many(c) => {
commit_info.completions.extend(c);
}
}
IOCompletions::Many(c) => {
completions.extend(c);
}
}
std::mem::take(&mut commit_info.completions)
};
// TODO: remove serialization of checkpoint path
io_yield_many!(std::mem::take(&mut *completions));
io_yield_many!(completions);
}
IOResult::Done(res) => {
*self.commit_info.result.borrow_mut() =
Some(PagerCommitResult::Checkpointed(res));
let mut commit_info = self.commit_info.write();
commit_info.result = Some(PagerCommitResult::Checkpointed(res));
// Skip sync if synchronous mode is OFF
if sync_mode == crate::SyncMode::Off {
self.commit_info.state.set(CommitState::Done);
commit_info.state = CommitState::Done;
} else {
self.commit_info.state.set(CommitState::SyncDbFile);
commit_info.state = CommitState::SyncDbFile;
}
}
}
@@ -1589,8 +1603,8 @@ impl Pager {
let sync_result =
sqlite3_ondisk::begin_sync(self.db_file.clone(), self.syncing.clone());
self.commit_info
.write()
.completions
.borrow_mut()
.push(match sync_result {
Ok(c) => c,
Err(e) if !data_sync_retry => {
@@ -1598,7 +1612,7 @@ impl Pager {
}
Err(e) => return Err(e),
});
self.commit_info.state.set(CommitState::Done);
self.commit_info.write().state = CommitState::Done;
}
CommitState::Done => {
tracing::debug!(
@@ -1606,19 +1620,25 @@ impl Pager {
self.io
.now()
.to_system_time()
.duration_since(self.commit_info.time.get().to_system_time())
.duration_since(self.commit_info.read().time.to_system_time())
.unwrap()
.as_millis()
);
let mut completions = self.commit_info.completions.borrow_mut();
if completions.iter().all(|c| c.is_completed()) {
completions.clear();
self.commit_info.state.set(CommitState::PrepareWal);
let (should_finish, result, completions) = {
let mut commit_info = self.commit_info.write();
if commit_info.completions.iter().all(|c| c.is_completed()) {
commit_info.completions.clear();
commit_info.state = CommitState::PrepareWal;
(true, commit_info.result.take(), Vec::new())
} else {
(false, None, std::mem::take(&mut commit_info.completions))
}
};
if should_finish {
wal.borrow_mut().finish_append_frames_commit()?;
let result = self.commit_info.result.borrow_mut().take();
return Ok(IOResult::Done(result.expect("commit result should be set")));
}
io_yield_many!(std::mem::take(&mut completions));
io_yield_many!(completions);
}
}
}
@@ -2343,8 +2363,8 @@ impl Pager {
fn reset_internal_states(&self) {
*self.checkpoint_state.write() = CheckpointState::Checkpoint;
self.syncing.store(false, Ordering::SeqCst);
self.commit_info.state.set(CommitState::PrepareWal);
self.commit_info.time.set(self.io.now());
self.commit_info.write().state = CommitState::PrepareWal;
self.commit_info.write().time = self.io.now();
*self.allocate_page_state.write() = AllocatePageState::Start;
*self.free_page_state.write() = FreePageState::Start;
#[cfg(not(feature = "omit_autovacuum"))]