From 2215cccebb039560b65991c599d6d08e55817c25 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 17 Sep 2025 13:09:45 +0300 Subject: [PATCH 1/3] core/storage: Wrap Pager::syncing in Arc --- core/storage/pager.rs | 15 +++++++++------ core/storage/sqlite3_ondisk.rs | 10 ++++------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index da5bae08c..b20b651b5 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -19,7 +19,7 @@ use std::cell::{Cell, RefCell, UnsafeCell}; use std::collections::HashSet; use std::hash; use std::rc::Rc; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use tracing::{instrument, trace, Level}; @@ -248,7 +248,7 @@ impl Page { /// Increment the pin count by 1. A pin count >0 means the page is pinned and not eligible for eviction from the page cache. pub fn pin(&self) { - self.get().pin_count.fetch_add(1, Ordering::Relaxed); + self.get().pin_count.fetch_add(1, Ordering::SeqCst); } /// Decrement the pin count by 1. If the count reaches 0, the page is no longer @@ -481,7 +481,7 @@ pub struct Pager { commit_info: CommitInfo, checkpoint_state: RwLock, - syncing: Rc>, + syncing: Arc, auto_vacuum_mode: Cell, /// 0 -> Database is empty, /// 1 -> Database is being initialized, @@ -592,7 +592,7 @@ impl Pager { state: CommitState::PrepareWal.into(), time: now.into(), }, - syncing: Rc::new(Cell::new(false)), + syncing: Arc::new(AtomicBool::new(false)), checkpoint_state: RwLock::new(CheckpointState::Checkpoint), buffer_pool, auto_vacuum_mode: Cell::new(AutoVacuumMode::None), @@ -1635,7 +1635,10 @@ impl Pager { io_yield_one!(c); } CheckpointState::CheckpointDone { res } => { - turso_assert!(!self.syncing.get(), "syncing should be done"); + turso_assert!( + !self.syncing.load(Ordering::SeqCst), + "syncing should be done" + ); *self.checkpoint_state.write() = CheckpointState::Checkpoint; return Ok(IOResult::Done(res)); } @@ -2254,7 +2257,7 @@ impl Pager { fn reset_internal_states(&self) { *self.checkpoint_state.write() = CheckpointState::Checkpoint; - self.syncing.replace(false); + self.syncing.store(false, Ordering::SeqCst); self.commit_info.state.set(CommitState::PrepareWal); self.commit_info.time.set(self.io.now()); self.allocate_page_state.replace(AllocatePageState::Start); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 710566911..56316239e 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -66,11 +66,9 @@ use crate::{ bail_corrupt_error, turso_assert, CompletionError, File, IOContext, Result, WalFileShared, }; use parking_lot::RwLock; -use std::cell::Cell; use std::collections::{BTreeMap, HashMap}; use std::mem::MaybeUninit; use std::pin::Pin; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; @@ -1066,12 +1064,12 @@ pub fn write_pages_vectored( #[instrument(skip_all, level = Level::DEBUG)] pub fn begin_sync( db_file: Arc, - syncing: Rc>, + syncing: Arc, ) -> Result { - assert!(!syncing.get()); - syncing.set(true); + assert!(!syncing.load(Ordering::SeqCst)); + syncing.store(true, Ordering::SeqCst); let completion = Completion::new_sync(move |_| { - syncing.set(false); + syncing.store(false, Ordering::SeqCst); }); #[allow(clippy::arc_with_non_send_sync)] db_file.sync(completion) From 365f606cce406ced1ab71d2ebcc6e7993fbdde81 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 17 Sep 2025 13:15:00 +0300 Subject: [PATCH 2/3] core/storage: Use AtomicU8 for Pager::auto_vacuum_mode --- core/storage/pager.rs | 40 ++++++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index b20b651b5..688d2a80a 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -19,7 +19,7 @@ use std::cell::{Cell, RefCell, UnsafeCell}; use std::collections::HashSet; use std::hash; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use tracing::{instrument, trace, Level}; @@ -377,6 +377,27 @@ pub enum AutoVacuumMode { Incremental, } +impl From for u8 { + fn from(mode: AutoVacuumMode) -> u8 { + match mode { + AutoVacuumMode::None => 0, + AutoVacuumMode::Full => 1, + AutoVacuumMode::Incremental => 2, + } + } +} + +impl From for AutoVacuumMode { + fn from(value: u8) -> AutoVacuumMode { + match value { + 0 => AutoVacuumMode::None, + 1 => AutoVacuumMode::Full, + 2 => AutoVacuumMode::Incremental, + _ => unreachable!("Invalid AutoVacuumMode value: {}", value), + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(usize)] pub enum DbState { @@ -482,7 +503,7 @@ pub struct Pager { commit_info: CommitInfo, checkpoint_state: RwLock, syncing: Arc, - auto_vacuum_mode: Cell, + auto_vacuum_mode: AtomicU8, /// 0 -> Database is empty, /// 1 -> Database is being initialized, /// 2 -> Database is initialized and ready for use. @@ -595,7 +616,7 @@ impl Pager { syncing: Arc::new(AtomicBool::new(false)), checkpoint_state: RwLock::new(CheckpointState::Checkpoint), buffer_pool, - auto_vacuum_mode: Cell::new(AutoVacuumMode::None), + auto_vacuum_mode: AtomicU8::new(AutoVacuumMode::None.into()), db_state, init_lock, allocate_page1_state, @@ -638,11 +659,11 @@ impl Pager { } pub fn get_auto_vacuum_mode(&self) -> AutoVacuumMode { - self.auto_vacuum_mode.get() + self.auto_vacuum_mode.load(Ordering::SeqCst).into() } pub fn set_auto_vacuum_mode(&self, mode: AutoVacuumMode) { - self.auto_vacuum_mode.set(mode); + self.auto_vacuum_mode.store(mode.into(), Ordering::SeqCst); } /// Retrieves the pointer map entry for a given database page. @@ -858,7 +879,8 @@ impl Pager { // If autovacuum is enabled, we need to allocate a new page number that is greater than the largest root page number #[cfg(not(feature = "omit_autovacuum"))] { - let auto_vacuum_mode = self.auto_vacuum_mode.get(); + let auto_vacuum_mode = + AutoVacuumMode::from(self.auto_vacuum_mode.load(Ordering::SeqCst)); match auto_vacuum_mode { AutoVacuumMode::None => { let page = @@ -2013,8 +2035,10 @@ impl Pager { // If the following conditions are met, allocate a pointer map page, add to cache and increment the database size // - autovacuum is enabled // - the last page is a pointer map page - if matches!(self.auto_vacuum_mode.get(), AutoVacuumMode::Full) - && is_ptrmap_page(new_db_size + 1, header.page_size.get() as usize) + if matches!( + AutoVacuumMode::from(self.auto_vacuum_mode.load(Ordering::SeqCst)), + AutoVacuumMode::Full + ) && is_ptrmap_page(new_db_size + 1, header.page_size.get() as usize) { // we will allocate a ptrmap page, so increment size new_db_size += 1; From e6d994dee02184f6f6e740796acb032dd39ce637 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 17 Sep 2025 13:22:54 +0300 Subject: [PATCH 3/3] core/storage: Wrap Pager::allocate_page_state with RwLock --- core/storage/pager.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 688d2a80a..9ed7b162d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -511,9 +511,9 @@ pub struct Pager { /// Mutex for synchronizing database initialization to prevent race conditions init_lock: Arc>, /// The state of the current allocate page operation. - allocate_page_state: RefCell, + allocate_page_state: RwLock, /// The state of the current allocate page1 operation. - allocate_page1_state: RefCell, + allocate_page1_state: RwLock, /// Cache page_size and reserved_space at Pager init and reuse for subsequent /// `usable_space` calls. TODO: Invalidate reserved_space when we add the functionality /// to change it. @@ -594,9 +594,9 @@ impl Pager { init_lock: Arc>, ) -> Result { let allocate_page1_state = if !db_state.is_initialized() { - RefCell::new(AllocatePage1State::Start) + RwLock::new(AllocatePage1State::Start) } else { - RefCell::new(AllocatePage1State::Done) + RwLock::new(AllocatePage1State::Done) }; let now = io.now(); Ok(Self { @@ -623,7 +623,7 @@ impl Pager { page_size: Cell::new(None), reserved_space: Cell::new(None), free_page_state: RefCell::new(FreePageState::Start), - allocate_page_state: RefCell::new(AllocatePageState::Start), + allocate_page_state: RwLock::new(AllocatePageState::Start), max_page_count: Cell::new(DEFAULT_MAX_PAGE_COUNT), #[cfg(not(feature = "omit_autovacuum"))] ptrmap_get_state: RefCell::new(PtrMapGetState::Start), @@ -1923,7 +1923,7 @@ impl Pager { #[instrument(skip_all, level = Level::DEBUG)] pub fn allocate_page1(&self) -> Result> { - let state = self.allocate_page1_state.borrow().clone(); + let state = self.allocate_page1_state.read().clone(); match state { AllocatePage1State::Start => { tracing::trace!("allocate_page1(Start)"); @@ -1973,8 +1973,7 @@ impl Pager { ); let c = begin_write_btree_page(self, &page1)?; - self.allocate_page1_state - .replace(AllocatePage1State::Writing { page: page1 }); + *self.allocate_page1_state.write() = AllocatePage1State::Writing { page: page1 }; io_yield_one!(c); } AllocatePage1State::Writing { page } => { @@ -1986,7 +1985,7 @@ impl Pager { LimboError::InternalError(format!("Failed to insert page 1 into cache: {e:?}")) })?; self.db_state.set(DbState::Initialized); - self.allocate_page1_state.replace(AllocatePage1State::Done); + *self.allocate_page1_state.write() = AllocatePage1State::Done; Ok(IOResult::Done(page.clone())) } AllocatePage1State::Done => unreachable!("cannot try to allocate page 1 again"), @@ -1995,7 +1994,7 @@ impl Pager { pub fn allocating_page1(&self) -> bool { matches!( - *self.allocate_page1_state.borrow(), + *self.allocate_page1_state.read(), AllocatePage1State::Writing { .. } ) } @@ -2019,7 +2018,7 @@ impl Pager { let header = header_ref.borrow_mut(); loop { - let mut state = self.allocate_page_state.borrow_mut(); + let mut state = self.allocate_page_state.write(); tracing::debug!("allocate_page(state={:?})", state); match &mut *state { AllocatePageState::Start => { @@ -2284,7 +2283,7 @@ impl Pager { self.syncing.store(false, Ordering::SeqCst); self.commit_info.state.set(CommitState::PrepareWal); self.commit_info.time.set(self.io.now()); - self.allocate_page_state.replace(AllocatePageState::Start); + *self.allocate_page_state.write() = AllocatePageState::Start; self.free_page_state.replace(FreePageState::Start); #[cfg(not(feature = "omit_autovacuum"))] {