From aba596441c4c658932c7947a5b5e381a50fdda6b Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Sun, 28 Sep 2025 13:38:08 +0300 Subject: [PATCH] core/storage: Wrap WalFile::max_frame_read_lock_index with AtomicUsize --- core/storage/wal.rs | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 6172bb1de..4307345a2 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -8,7 +8,7 @@ use tracing::{instrument, Level}; use parking_lot::RwLock; use std::fmt::{Debug, Formatter}; -use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::{cell::Cell, fmt, sync::Arc}; use super::buffer_pool::BufferPool; @@ -569,7 +569,7 @@ pub struct WalFile { // min and max frames for this connection /// This is the index to the read_lock in WalFileShared that we are holding. This lock contains /// the max frame for this connection. - max_frame_read_lock_index: Cell, + max_frame_read_lock_index: AtomicUsize, /// Max frame allowed to lookup range=(minframe..max_frame) max_frame: u64, /// Start of range to look for frames range=(minframe..max_frame) @@ -812,9 +812,11 @@ impl Wal for WalFile { #[instrument(skip_all, level = Level::DEBUG)] fn begin_read_tx(&mut self) -> Result { turso_assert!( - self.max_frame_read_lock_index.get().eq(&NO_LOCK_HELD), + self.max_frame_read_lock_index + .load(Ordering::Acquire) + .eq(&NO_LOCK_HELD), "cannot start a new read tx without ending an existing one, lock_value={}, expected={}", - self.max_frame_read_lock_index.get(), + self.max_frame_read_lock_index.load(Ordering::Acquire), NO_LOCK_HELD ); let (shared_max, nbackfills, last_checksum, checkpoint_seq) = { @@ -843,7 +845,8 @@ impl Wal for WalFile { // we need to keep self.max_frame set to the appropriate // max frame in the wal at the time this transaction starts. self.max_frame = shared_max; - self.max_frame_read_lock_index.set(lock_0_idx); + self.max_frame_read_lock_index + .store(lock_0_idx, Ordering::Release); self.min_frame = nbackfills + 1; self.last_checksum = last_checksum; return Ok(db_changed); @@ -937,7 +940,8 @@ impl Wal for WalFile { } self.min_frame = nb2 + 1; self.max_frame = best_mark as u64; - self.max_frame_read_lock_index.set(best_idx as usize); + self.max_frame_read_lock_index + .store(best_idx as usize, Ordering::Release); tracing::debug!( "begin_read_tx(min={}, max={}, slot={}, max_frame_in_wal={})", self.min_frame, @@ -952,10 +956,11 @@ impl Wal for WalFile { #[inline(always)] #[instrument(skip_all, level = Level::DEBUG)] fn end_read_tx(&self) { - let slot = self.max_frame_read_lock_index.get(); + let slot = self.max_frame_read_lock_index.load(Ordering::Acquire); if slot != NO_LOCK_HELD { self.get_shared_mut().read_locks[slot].unlock(); - self.max_frame_read_lock_index.set(NO_LOCK_HELD); + self.max_frame_read_lock_index + .store(NO_LOCK_HELD, Ordering::Release); tracing::debug!("end_read_tx(slot={slot})"); } else { tracing::debug!("end_read_tx(slot=no_lock)"); @@ -972,7 +977,7 @@ impl Wal for WalFile { // assert(pWal->readLock >= 0); // assert(pWal->writeLock == 0 && pWal->iReCksum == 0); turso_assert!( - self.max_frame_read_lock_index.get() != NO_LOCK_HELD, + self.max_frame_read_lock_index.load(Ordering::Acquire) != NO_LOCK_HELD, "must have a read transaction to begin a write transaction" ); if !shared.write_lock.write() { @@ -1033,7 +1038,9 @@ impl Wal for WalFile { // min_frame is set to nbackfill + 1 and max_frame is set to shared_max_frame // // by default, SQLite tries to restart log file in this case - but for now let's keep it simple in the turso-db - if self.max_frame_read_lock_index.get() == 0 && self.max_frame < self.min_frame { + if self.max_frame_read_lock_index.load(Ordering::Acquire) == 0 + && self.max_frame < self.min_frame + { tracing::debug!( "find_frame(page_id={}, frame_watermark={:?}): max_frame is 0 - read from DB file", page_id, @@ -1618,7 +1625,7 @@ impl WalFile { checkpoint_seq: AtomicU32::new(0), syncing: Arc::new(AtomicBool::new(false)), min_frame: 0, - max_frame_read_lock_index: NO_LOCK_HELD.into(), + max_frame_read_lock_index: AtomicUsize::new(NO_LOCK_HELD), last_checksum, prev_checkpoint: CheckpointResult::default(), checkpoint_guard: None, @@ -1693,7 +1700,8 @@ impl WalFile { } fn reset_internal_states(&mut self) { - self.max_frame_read_lock_index.set(NO_LOCK_HELD); + self.max_frame_read_lock_index + .store(NO_LOCK_HELD, Ordering::Release); self.ongoing_checkpoint.reset(); self.syncing.store(false, Ordering::SeqCst); } @@ -3037,7 +3045,7 @@ pub mod test { { let wal_any = wal.as_any(); if let Some(wal_file) = wal_any.downcast_ref::() { - return wal_file.max_frame_read_lock_index.get() == expected_slot; + return wal_file.max_frame_read_lock_index.load(Ordering::Acquire) == expected_slot; } }