diff --git a/core/storage/pager.rs b/core/storage/pager.rs index da5bae08c..9ed7b162d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -19,7 +19,7 @@ use std::cell::{Cell, RefCell, UnsafeCell}; use std::collections::HashSet; use std::hash; use std::rc::Rc; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use tracing::{instrument, trace, Level}; @@ -248,7 +248,7 @@ impl Page { /// Increment the pin count by 1. A pin count >0 means the page is pinned and not eligible for eviction from the page cache. pub fn pin(&self) { - self.get().pin_count.fetch_add(1, Ordering::Relaxed); + self.get().pin_count.fetch_add(1, Ordering::SeqCst); } /// Decrement the pin count by 1. If the count reaches 0, the page is no longer @@ -377,6 +377,27 @@ pub enum AutoVacuumMode { Incremental, } +impl From for u8 { + fn from(mode: AutoVacuumMode) -> u8 { + match mode { + AutoVacuumMode::None => 0, + AutoVacuumMode::Full => 1, + AutoVacuumMode::Incremental => 2, + } + } +} + +impl From for AutoVacuumMode { + fn from(value: u8) -> AutoVacuumMode { + match value { + 0 => AutoVacuumMode::None, + 1 => AutoVacuumMode::Full, + 2 => AutoVacuumMode::Incremental, + _ => unreachable!("Invalid AutoVacuumMode value: {}", value), + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(usize)] pub enum DbState { @@ -481,8 +502,8 @@ pub struct Pager { commit_info: CommitInfo, checkpoint_state: RwLock, - syncing: Rc>, - auto_vacuum_mode: Cell, + syncing: Arc, + auto_vacuum_mode: AtomicU8, /// 0 -> Database is empty, /// 1 -> Database is being initialized, /// 2 -> Database is initialized and ready for use. @@ -490,9 +511,9 @@ pub struct Pager { /// Mutex for synchronizing database initialization to prevent race conditions init_lock: Arc>, /// The state of the current allocate page operation. - allocate_page_state: RefCell, + allocate_page_state: RwLock, /// The state of the current allocate page1 operation. - allocate_page1_state: RefCell, + allocate_page1_state: RwLock, /// Cache page_size and reserved_space at Pager init and reuse for subsequent /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality /// to change it. @@ -573,9 +594,9 @@ impl Pager { init_lock: Arc>, ) -> Result { let allocate_page1_state = if !db_state.is_initialized() { - RefCell::new(AllocatePage1State::Start) + RwLock::new(AllocatePage1State::Start) } else { - RefCell::new(AllocatePage1State::Done) + RwLock::new(AllocatePage1State::Done) }; let now = io.now(); Ok(Self { @@ -592,17 +613,17 @@ impl Pager { state: CommitState::PrepareWal.into(), time: now.into(), }, - syncing: Rc::new(Cell::new(false)), + syncing: Arc::new(AtomicBool::new(false)), checkpoint_state: RwLock::new(CheckpointState::Checkpoint), buffer_pool, - auto_vacuum_mode: Cell::new(AutoVacuumMode::None), + auto_vacuum_mode: AtomicU8::new(AutoVacuumMode::None.into()), db_state, init_lock, allocate_page1_state, page_size: Cell::new(None), reserved_space: Cell::new(None), free_page_state: RefCell::new(FreePageState::Start), - allocate_page_state: RefCell::new(AllocatePageState::Start), + allocate_page_state: RwLock::new(AllocatePageState::Start), max_page_count: Cell::new(DEFAULT_MAX_PAGE_COUNT), #[cfg(not(feature = "omit_autovacuum"))] ptrmap_get_state: RefCell::new(PtrMapGetState::Start), @@ -638,11 +659,11 @@ impl Pager { } pub fn get_auto_vacuum_mode(&self) -> AutoVacuumMode { - self.auto_vacuum_mode.get() + self.auto_vacuum_mode.load(Ordering::SeqCst).into() } pub fn set_auto_vacuum_mode(&self, mode: AutoVacuumMode) { - self.auto_vacuum_mode.set(mode); + self.auto_vacuum_mode.store(mode.into(), Ordering::SeqCst); } /// Retrieves the pointer map entry for a given database page. @@ -858,7 +879,8 @@ impl Pager { // If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number #[cfg(not(feature = "omit_autovacuum"))] { - let auto_vacuum_mode = self.auto_vacuum_mode.get(); + let auto_vacuum_mode = + AutoVacuumMode::from(self.auto_vacuum_mode.load(Ordering::SeqCst)); match auto_vacuum_mode { AutoVacuumMode::None => { let page = @@ -1635,7 +1657,10 @@ impl Pager { io_yield_one!(c); } CheckpointState::CheckpointDone { res } => { - turso_assert!(!self.syncing.get(), "syncing should be done"); + turso_assert!( + !self.syncing.load(Ordering::SeqCst), + "syncing should be done" + ); *self.checkpoint_state.write() = CheckpointState::Checkpoint; return Ok(IOResult::Done(res)); } @@ -1898,7 +1923,7 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn allocate_page1(&self) -> Result> { - let state = self.allocate_page1_state.borrow().clone(); + let state = self.allocate_page1_state.read().clone(); match state { AllocatePage1State::Start => { tracing::trace!("allocate_page1(Start)"); @@ -1948,8 +1973,7 @@ impl Pager { ); let c = begin_write_btree_page(self, &page1)?; - self.allocate_page1_state - .replace(AllocatePage1State::Writing { page: page1 }); + *self.allocate_page1_state.write() = AllocatePage1State::Writing { page: page1 }; io_yield_one!(c); } AllocatePage1State::Writing { page } => { @@ -1961,7 +1985,7 @@ impl Pager { LimboError::InternalError(format!("Failed to insert page 1 into cache: {e:?}")) })?; self.db_state.set(DbState::Initialized); - self.allocate_page1_state.replace(AllocatePage1State::Done); + *self.allocate_page1_state.write() = AllocatePage1State::Done; Ok(IOResult::Done(page.clone())) } AllocatePage1State::Done => unreachable!("cannot try to allocate page 1 again"), @@ -1970,7 +1994,7 @@ impl Pager { pub fn allocating_page1(&self) -> bool { matches!( - *self.allocate_page1_state.borrow(), + *self.allocate_page1_state.read(), AllocatePage1State::Writing { .. } ) } @@ -1994,7 +2018,7 @@ impl Pager { let header = header_ref.borrow_mut(); loop { - let mut state = self.allocate_page_state.borrow_mut(); + let mut state = self.allocate_page_state.write(); tracing::debug!("allocate_page(state={:?})", state); match &mut *state { AllocatePageState::Start => { @@ -2010,8 +2034,10 @@ impl Pager { // If the following conditions are met, allocate a pointer map page, add to cache and increment the database size // - autovacuum is enabled // - the last page is a pointer map page - if matches!(self.auto_vacuum_mode.get(), AutoVacuumMode::Full) - && is_ptrmap_page(new_db_size + 1, header.page_size.get() as usize) + if matches!( + AutoVacuumMode::from(self.auto_vacuum_mode.load(Ordering::SeqCst)), + AutoVacuumMode::Full + ) && is_ptrmap_page(new_db_size + 1, header.page_size.get() as usize) { // we will allocate a ptrmap page, so increment size new_db_size += 1; @@ -2254,10 +2280,10 @@ impl Pager { fn reset_internal_states(&self) { *self.checkpoint_state.write() = CheckpointState::Checkpoint; - self.syncing.replace(false); + self.syncing.store(false, Ordering::SeqCst); self.commit_info.state.set(CommitState::PrepareWal); self.commit_info.time.set(self.io.now()); - self.allocate_page_state.replace(AllocatePageState::Start); + *self.allocate_page_state.write() = AllocatePageState::Start; self.free_page_state.replace(FreePageState::Start); #[cfg(not(feature = "omit_autovacuum"))] { diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 710566911..56316239e 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -66,11 +66,9 @@ use crate::{ bail_corrupt_error, turso_assert, CompletionError, File, IOContext, Result, WalFileShared, }; use parking_lot::RwLock; -use std::cell::Cell; use std::collections::{BTreeMap, HashMap}; use std::mem::MaybeUninit; use std::pin::Pin; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; @@ -1066,12 +1064,12 @@ pub fn write_pages_vectored( #[instrument(skip_all, level = Level::DEBUG)] pub fn begin_sync( db_file: Arc, - syncing: Rc>, + syncing: Arc, ) -> Result { - assert!(!syncing.get()); - syncing.set(true); + assert!(!syncing.load(Ordering::SeqCst)); + syncing.store(true, Ordering::SeqCst); let completion = Completion::new_sync(move |_| { - syncing.set(false); + syncing.store(false, Ordering::SeqCst); }); #[allow(clippy::arc_with_non_send_sync)] db_file.sync(completion)