Merge 'Convert more Pager fields towards being Send' from Pekka Enberg

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3177
This commit is contained in:
Pekka Enberg
2025-09-18 11:21:20 +03:00
committed by GitHub
2 changed files with 55 additions and 31 deletions

View File

@@ -19,7 +19,7 @@ use std::cell::{Cell, RefCell, UnsafeCell};
use std::collections::HashSet;
use std::hash;
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tracing::{instrument, trace, Level};
@@ -248,7 +248,7 @@ impl Page {
/// Increment the pin count by 1. A pin count >0 means the page is pinned and not eligible for eviction from the page cache.
pub fn pin(&self) {
self.get().pin_count.fetch_add(1, Ordering::Relaxed);
self.get().pin_count.fetch_add(1, Ordering::SeqCst);
}
/// Decrement the pin count by 1. If the count reaches 0, the page is no longer
@@ -377,6 +377,27 @@ pub enum AutoVacuumMode {
Incremental,
}
impl From<AutoVacuumMode> for u8 {
fn from(mode: AutoVacuumMode) -> u8 {
match mode {
AutoVacuumMode::None => 0,
AutoVacuumMode::Full => 1,
AutoVacuumMode::Incremental => 2,
}
}
}
impl From<u8> for AutoVacuumMode {
fn from(value: u8) -> AutoVacuumMode {
match value {
0 => AutoVacuumMode::None,
1 => AutoVacuumMode::Full,
2 => AutoVacuumMode::Incremental,
_ => unreachable!("Invalid AutoVacuumMode value: {}", value),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(usize)]
pub enum DbState {
@@ -481,8 +502,8 @@ pub struct Pager {
commit_info: CommitInfo,
checkpoint_state: RwLock<CheckpointState>,
syncing: Rc<Cell<bool>>,
auto_vacuum_mode: Cell<AutoVacuumMode>,
syncing: Arc<AtomicBool>,
auto_vacuum_mode: AtomicU8,
/// 0 -> Database is empty,
/// 1 -> Database is being initialized,
/// 2 -> Database is initialized and ready for use.
@@ -490,9 +511,9 @@ pub struct Pager {
/// Mutex for synchronizing database initialization to prevent race conditions
init_lock: Arc<Mutex<()>>,
/// The state of the current allocate page operation.
allocate_page_state: RefCell<AllocatePageState>,
allocate_page_state: RwLock<AllocatePageState>,
/// The state of the current allocate page1 operation.
allocate_page1_state: RefCell<AllocatePage1State>,
allocate_page1_state: RwLock<AllocatePage1State>,
/// Cache page_size and reserved_space at Pager init and reuse for subsequent
/// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality
/// to change it.
@@ -573,9 +594,9 @@ impl Pager {
init_lock: Arc<Mutex<()>>,
) -> Result<Self> {
let allocate_page1_state = if !db_state.is_initialized() {
RefCell::new(AllocatePage1State::Start)
RwLock::new(AllocatePage1State::Start)
} else {
RefCell::new(AllocatePage1State::Done)
RwLock::new(AllocatePage1State::Done)
};
let now = io.now();
Ok(Self {
@@ -592,17 +613,17 @@ impl Pager {
state: CommitState::PrepareWal.into(),
time: now.into(),
},
syncing: Rc::new(Cell::new(false)),
syncing: Arc::new(AtomicBool::new(false)),
checkpoint_state: RwLock::new(CheckpointState::Checkpoint),
buffer_pool,
auto_vacuum_mode: Cell::new(AutoVacuumMode::None),
auto_vacuum_mode: AtomicU8::new(AutoVacuumMode::None.into()),
db_state,
init_lock,
allocate_page1_state,
page_size: Cell::new(None),
reserved_space: Cell::new(None),
free_page_state: RefCell::new(FreePageState::Start),
allocate_page_state: RefCell::new(AllocatePageState::Start),
allocate_page_state: RwLock::new(AllocatePageState::Start),
max_page_count: Cell::new(DEFAULT_MAX_PAGE_COUNT),
#[cfg(not(feature = "omit_autovacuum"))]
ptrmap_get_state: RefCell::new(PtrMapGetState::Start),
@@ -638,11 +659,11 @@ impl Pager {
}
pub fn get_auto_vacuum_mode(&self) -> AutoVacuumMode {
self.auto_vacuum_mode.get()
self.auto_vacuum_mode.load(Ordering::SeqCst).into()
}
pub fn set_auto_vacuum_mode(&self, mode: AutoVacuumMode) {
self.auto_vacuum_mode.set(mode);
self.auto_vacuum_mode.store(mode.into(), Ordering::SeqCst);
}
/// Retrieves the pointer map entry for a given database page.
@@ -858,7 +879,8 @@ impl Pager {
// If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number
#[cfg(not(feature = "omit_autovacuum"))]
{
let auto_vacuum_mode = self.auto_vacuum_mode.get();
let auto_vacuum_mode =
AutoVacuumMode::from(self.auto_vacuum_mode.load(Ordering::SeqCst));
match auto_vacuum_mode {
AutoVacuumMode::None => {
let page =
@@ -1635,7 +1657,10 @@ impl Pager {
io_yield_one!(c);
}
CheckpointState::CheckpointDone { res } => {
turso_assert!(!self.syncing.get(), "syncing should be done");
turso_assert!(
!self.syncing.load(Ordering::SeqCst),
"syncing should be done"
);
*self.checkpoint_state.write() = CheckpointState::Checkpoint;
return Ok(IOResult::Done(res));
}
@@ -1898,7 +1923,7 @@ impl Pager {
#[instrument(skip_all, level = Level::DEBUG)]
pub fn allocate_page1(&self) -> Result<IOResult<PageRef>> {
let state = self.allocate_page1_state.borrow().clone();
let state = self.allocate_page1_state.read().clone();
match state {
AllocatePage1State::Start => {
tracing::trace!("allocate_page1(Start)");
@@ -1948,8 +1973,7 @@ impl Pager {
);
let c = begin_write_btree_page(self, &page1)?;
self.allocate_page1_state
.replace(AllocatePage1State::Writing { page: page1 });
*self.allocate_page1_state.write() = AllocatePage1State::Writing { page: page1 };
io_yield_one!(c);
}
AllocatePage1State::Writing { page } => {
@@ -1961,7 +1985,7 @@ impl Pager {
LimboError::InternalError(format!("Failed to insert page 1 into cache: {e:?}"))
})?;
self.db_state.set(DbState::Initialized);
self.allocate_page1_state.replace(AllocatePage1State::Done);
*self.allocate_page1_state.write() = AllocatePage1State::Done;
Ok(IOResult::Done(page.clone()))
}
AllocatePage1State::Done => unreachable!("cannot try to allocate page 1 again"),
@@ -1970,7 +1994,7 @@ impl Pager {
pub fn allocating_page1(&self) -> bool {
matches!(
*self.allocate_page1_state.borrow(),
*self.allocate_page1_state.read(),
AllocatePage1State::Writing { .. }
)
}
@@ -1994,7 +2018,7 @@ impl Pager {
let header = header_ref.borrow_mut();
loop {
let mut state = self.allocate_page_state.borrow_mut();
let mut state = self.allocate_page_state.write();
tracing::debug!("allocate_page(state={:?})", state);
match &mut *state {
AllocatePageState::Start => {
@@ -2010,8 +2034,10 @@ impl Pager {
// If the following conditions are met, allocate a pointer map page, add to cache and increment the database size
// - autovacuum is enabled
// - the last page is a pointer map page
if matches!(self.auto_vacuum_mode.get(), AutoVacuumMode::Full)
&& is_ptrmap_page(new_db_size + 1, header.page_size.get() as usize)
if matches!(
AutoVacuumMode::from(self.auto_vacuum_mode.load(Ordering::SeqCst)),
AutoVacuumMode::Full
) && is_ptrmap_page(new_db_size + 1, header.page_size.get() as usize)
{
// we will allocate a ptrmap page, so increment size
new_db_size += 1;
@@ -2254,10 +2280,10 @@ impl Pager {
fn reset_internal_states(&self) {
*self.checkpoint_state.write() = CheckpointState::Checkpoint;
self.syncing.replace(false);
self.syncing.store(false, Ordering::SeqCst);
self.commit_info.state.set(CommitState::PrepareWal);
self.commit_info.time.set(self.io.now());
self.allocate_page_state.replace(AllocatePageState::Start);
*self.allocate_page_state.write() = AllocatePageState::Start;
self.free_page_state.replace(FreePageState::Start);
#[cfg(not(feature = "omit_autovacuum"))]
{

View File

@@ -66,11 +66,9 @@ use crate::{
bail_corrupt_error, turso_assert, CompletionError, File, IOContext, Result, WalFileShared,
};
use parking_lot::RwLock;
use std::cell::Cell;
use std::collections::{BTreeMap, HashMap};
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
@@ -1066,12 +1064,12 @@ pub fn write_pages_vectored(
#[instrument(skip_all, level = Level::DEBUG)]
pub fn begin_sync(
db_file: Arc<dyn DatabaseStorage>,
syncing: Rc<Cell<bool>>,
syncing: Arc<AtomicBool>,
) -> Result<Completion> {
assert!(!syncing.get());
syncing.set(true);
assert!(!syncing.load(Ordering::SeqCst));
syncing.store(true, Ordering::SeqCst);
let completion = Completion::new_sync(move |_| {
syncing.set(false);
syncing.store(false, Ordering::SeqCst);
});
#[allow(clippy::arc_with_non_send_sync)]
db_file.sync(completion)