From f81bc3236a0ae6a700d0db6fff6c864cd33fc18f Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Sat, 9 Aug 2025 14:08:25 -0400
Subject: [PATCH] Use a single packed u64 for LimboRwLock and relax memory
 ordering

---
 core/storage/sqlite3_ondisk.rs |  22 +--
 core/storage/wal.rs            | 346 ++++++++++++++++-----------------
 2 files changed, 177 insertions(+), 191 deletions(-)

diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index d2c24770a..9ca59d5e7 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -48,7 +48,7 @@ use pack1::{I32BE, U16BE, U32BE};
 use tracing::{instrument, Level};
 
 use super::pager::PageRef;
-use super::wal::LimboRwLock;
+use super::wal::TursoRwLock;
 use crate::error::LimboError;
 use crate::fast_lock::SpinLock;
 use crate::io::{Buffer, Complete, Completion};
@@ -60,7 +60,7 @@ use crate::storage::btree::{payload_overflow_threshold_max, payload_overflow_thr
 use crate::storage::buffer_pool::BufferPool;
 use crate::storage::database::DatabaseStorage;
 use crate::storage::pager::Pager;
-use crate::storage::wal::PendingFlush;
+use crate::storage::wal::{PendingFlush, READMARK_NOT_USED};
 use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype};
 use crate::{turso_assert, File, Result, WalFileShared};
 use std::cell::{RefCell, UnsafeCell};
@@ -1557,6 +1557,12 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFileShared>>> {

diff --git a/core/storage/wal.rs b/core/storage/wal.rs
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
-    pub fn new() -> Self {
-        Self {
-            lock: AtomicU32::new(NO_LOCK),
-            nreads: AtomicU32::new(0),
-            value: AtomicU32::new(READMARK_NOT_USED),
+pub const READMARK_NOT_USED: u32 = 0xffffffff;
+const NO_LOCK_HELD: usize = usize::MAX;
+
+impl TursoRwLock {
+    /// Bit 0: writer flag
+    const WRITER: u64 = 0b1;
+
+    /// Reader increment value (bit 1)
+    const READER_INC: u64 = 0b10;
+
+    /// Reader count starts at bit 1
+    const READER_SHIFT: u32 = 1;
+
+    /// Mask for the 31 reader bits [31:1]
+    const READER_COUNT_MASK: u64 = 0x7fff_ffffu64 << Self::READER_SHIFT;
+
+    /// Value starts at bit 32
+    const VALUE_SHIFT: u32 = 32;
+
+    /// Mask for the 32 value bits [63:32]
+    const VALUE_MASK: u64 = 0xffff_ffffu64 << Self::VALUE_SHIFT;
+
+    #[inline]
+    pub const fn new() -> Self {
+        Self(AtomicU64::new(0))
+    }
+
+    const fn has_writer(val: u64) -> bool {
+        val & Self::WRITER != 0
+    }
+
+    const fn has_readers(val: u64) -> bool {
+        val & Self::READER_COUNT_MASK != 0
+    }
+
+    /// Try to acquire a shared read lock.
+    #[inline]
+    pub fn read(&self) -> bool {
+        let cur = self.0.load(Ordering::Acquire);
+        // If a writer is present we cannot proceed.
+        if Self::has_writer(cur) {
+            return false;
+        }
+        // 31 bits allow ~2 billion concurrent readers, which is high enough
+        // that we skip the overflow check and assume the count never wraps :)
+        let desired = cur.wrapping_add(Self::READER_INC);
+        // On success, Acquire establishes a happens-before relationship with the
+        // previous Release in unlock; on failure we return false and the caller
+        // retries, so Relaxed is enough for the failed load.
+        self.0
+            .compare_exchange_weak(cur, desired, Ordering::Acquire, Ordering::Relaxed)
+            .is_ok()
+    }
+
+    /// Try to take an exclusive lock. Succeeds only if there are no readers and no writer.
+    #[inline]
+    pub fn write(&self) -> bool {
+        let cur = self.0.load(Ordering::Acquire);
+        // Exclusive lock: require no readers and no writer.
+        if Self::has_writer(cur) || Self::has_readers(cur) {
+            return false;
+        }
+        let desired = cur | Self::WRITER;
+        self.0
+            .compare_exchange(cur, desired, Ordering::Acquire, Ordering::Relaxed)
+            .is_ok()
+    }
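The packed layout is easiest to see in isolation. A standalone sketch in plain Rust, mirroring the constants above (the values in `main` are illustrative, not from the patch):

```rust
// Sketch of the packed-u64 layout used by TursoRwLock:
// bit 0 = writer flag, bits [31:1] = reader count, bits [63:32] = value.
const WRITER: u64 = 0b1;
const READER_INC: u64 = 0b10;
const READER_SHIFT: u32 = 1;
const READER_COUNT_MASK: u64 = 0x7fff_ffffu64 << READER_SHIFT;
const VALUE_SHIFT: u32 = 32;

fn pack(writer: bool, readers: u32, value: u32) -> u64 {
    (writer as u64)
        | (((readers as u64) << READER_SHIFT) & READER_COUNT_MASK)
        | ((value as u64) << VALUE_SHIFT)
}

fn unpack(word: u64) -> (bool, u32, u32) {
    (
        word & WRITER != 0,
        ((word & READER_COUNT_MASK) >> READER_SHIFT) as u32,
        (word >> VALUE_SHIFT) as u32,
    )
}

fn main() {
    // Two readers observing read mark 42, no writer.
    let w = pack(false, 2, 42);
    assert_eq!(unpack(w), (false, 2, 42));
    // Adding a reader is a single add of READER_INC on the whole word:
    assert_eq!(unpack(w + READER_INC), (false, 3, 42));
}
```

Because all three fields live in one word, a reader increment or a writer-bit CAS atomically rules out every conflicting state in a single operation, which is what lets the old three-atomic dance below be deleted.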
+
+    /// Unlock whatever lock is currently held.
+    /// For a write lock: clear the writer bit.
+    /// For a read lock: decrement the reader count.
+    #[inline]
+    pub fn unlock(&self) {
+        let cur = self.0.load(Ordering::Acquire);
+        if (cur & Self::WRITER) != 0 {
+            // Clear the writer bit, preserving everything else (including the value).
+            // Release ordering makes all our writes visible to the next acquirer.
+            let prev = self.0.fetch_and(!Self::WRITER, Ordering::Release);
+            turso_assert!(!Self::has_readers(prev), "write lock was held with readers");
+        } else {
+            // Drop one reader; the last reader leaves the value intact.
+            let prev = self.0.fetch_sub(Self::READER_INC, Ordering::Release);
+            turso_assert!(
+                (prev & Self::READER_COUNT_MASK) >= Self::READER_INC,
+                "unlock called with no readers"
+            );
         }
     }
 
-    /// Shared lock. Returns true if it was successful, false if it couldn't lock it
-    pub fn read(&mut self) -> bool {
-        let lock = self.lock.load(Ordering::SeqCst);
-        let ok = match lock {
-            NO_LOCK => {
-                let res = self.lock.compare_exchange(
-                    lock,
-                    SHARED_LOCK,
-                    Ordering::SeqCst,
-                    Ordering::SeqCst,
-                );
-                let ok = res.is_ok();
-                if ok {
-                    self.nreads.fetch_add(1, Ordering::SeqCst);
-                }
-                ok
-            }
-            SHARED_LOCK => {
-                // There is this race condition where we could've unlocked after loading lock ==
-                // SHARED_LOCK.
-                self.nreads.fetch_add(1, Ordering::SeqCst);
-                let lock_after_load = self.lock.load(Ordering::SeqCst);
-                if lock_after_load != lock {
-                    // try to lock it again
-                    let res = self.lock.compare_exchange(
-                        lock_after_load,
-                        SHARED_LOCK,
-                        Ordering::SeqCst,
-                        Ordering::SeqCst,
-                    );
-                    let ok = res.is_ok();
-                    if ok {
-                        // we were able to acquire it back
-                        true
-                    } else {
-                        // we couldn't acquire it back, reduce number again
-                        self.nreads.fetch_sub(1, Ordering::SeqCst);
-                        false
-                    }
-                } else {
-                    true
-                }
-            }
-            WRITE_LOCK => false,
-            _ => unreachable!(),
-        };
-        tracing::trace!("read_lock({})", ok);
-        ok
-    }
 
+    /// Read the embedded 32-bit value atomically, regardless of slot occupancy.
+    #[inline]
+    pub fn get_value(&self) -> u32 {
+        (self.0.load(Ordering::Acquire) >> Self::VALUE_SHIFT) as u32
+    }
 
-    /// Locks exclusively. Returns true if it was successful, false if it couldn't lock it
-    pub fn write(&mut self) -> bool {
-        let lock = self.lock.load(Ordering::SeqCst);
-        let ok = match lock {
-            NO_LOCK => {
-                let res = self.lock.compare_exchange(
-                    lock,
-                    WRITE_LOCK,
-                    Ordering::SeqCst,
-                    Ordering::SeqCst,
-                );
-                res.is_ok()
-            }
-            SHARED_LOCK => {
-                // no op
-                false
-            }
-            WRITE_LOCK => false,
-            _ => unreachable!(),
-        };
-        tracing::trace!("write_lock({})", ok);
-        ok
-    }
-
-    /// Unlock the current held lock.
-    pub fn unlock(&mut self) {
-        let lock = self.lock.load(Ordering::SeqCst);
-        tracing::trace!("unlock(value={})", lock);
-        match lock {
-            NO_LOCK => {}
-            SHARED_LOCK => {
-                let prev = self.nreads.fetch_sub(1, Ordering::SeqCst);
-                if prev == 1 {
-                    let res = self.lock.compare_exchange(
-                        lock,
-                        NO_LOCK,
-                        Ordering::SeqCst,
-                        Ordering::SeqCst,
-                    );
-                    assert!(res.is_ok());
-                }
-            }
-            WRITE_LOCK => {
-                let res =
-                    self.lock
-                        .compare_exchange(lock, NO_LOCK, Ordering::SeqCst, Ordering::SeqCst);
-                assert!(res.is_ok());
-            }
-            _ => unreachable!(),
-        }
+    /// Set the embedded value while holding the write lock.
+    #[inline]
+    pub fn set_value_exclusive(&self, v: u32) {
+        // Must be called only while the WRITER bit is set; the subsequent
+        // unlock() publishes the new value via its Release ordering.
+        let cur = self.0.load(Ordering::Relaxed);
+        turso_assert!(Self::has_writer(cur), "must hold exclusive lock");
+        let desired = (cur & !Self::VALUE_MASK) | ((v as u64) << Self::VALUE_SHIFT);
+        self.0.store(desired, Ordering::Relaxed);
     }
 }
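Note that every acquire above is a non-blocking attempt returning `bool`; nothing ever parks a thread. A caller-side sketch of the intended contract, assuming the `TursoRwLock` defined in this patch is in scope (the two helper functions are hypothetical):

```rust
// Illustrative helpers only; the TursoRwLock methods are the ones defined
// above. A `false` from read()/write() means contention, which the WAL
// code surfaces as Busy rather than blocking on.
fn try_snapshot_mark(lock: &TursoRwLock) -> Option<u32> {
    if !lock.read() {
        return None; // writer present (or the CAS lost a race): caller retries
    }
    let mark = lock.get_value(); // stable while any reader holds the slot
    lock.unlock();
    Some(mark)
}

fn try_bump_mark(lock: &TursoRwLock, new_mark: u32) -> bool {
    if !lock.write() {
        return false; // readers or another writer hold the slot
    }
    // Value updates happen only under the writer bit, so readers never
    // observe a torn or half-published mark.
    lock.set_value_exclusive(new_mark);
    lock.unlock();
    true
}
```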
@@ -343,7 +342,7 @@ impl Batch {
             let inner = &mut *scratch.inner.get();
             inner.contents = Some(PageContent::new(0, new_buf));
             // reset flags on scratch so it won't be cleared later with the real page
-            inner.flags.store(0, Ordering::SeqCst);
+            inner.flags.store(0, Ordering::Release);
         }
     }
 }
@@ -550,13 +549,13 @@ pub struct WalFileShared {
     /// from a partially checkpointed db file.
     /// Slots 1-4 carry a frame number in value and may be shared by many readers. Slot 1 is the
     /// default read lock and holds the max_frame of the WAL.
-    pub read_locks: [LimboRwLock; 5],
+    pub read_locks: [TursoRwLock; 5],
     /// Only one writer is allowed in WAL mode; this lock ensures there is only one.
-    pub write_lock: LimboRwLock,
+    pub write_lock: TursoRwLock,
     /// Serialises checkpointer threads; only one checkpoint can be in flight at any time.
     /// Blocking and exclusive only.
-    pub checkpoint_lock: LimboRwLock,
+    pub checkpoint_lock: TursoRwLock,
     pub loaded: AtomicBool,
 }
@@ -671,8 +670,8 @@ impl Wal for WalFile {
         );
         let (shared_max, nbackfills, last_checksum, checkpoint_seq) = {
             let shared = self.get_shared();
-            let mx = shared.max_frame.load(Ordering::SeqCst);
-            let nb = shared.nbackfills.load(Ordering::SeqCst);
+            let mx = shared.max_frame.load(Ordering::Acquire);
+            let nb = shared.nbackfills.load(Ordering::Acquire);
             let ck = shared.last_checksum;
             let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
             (mx, nb, ck, checkpoint_seq)
@@ -707,7 +706,7 @@
         let mut best_idx: i64 = -1;
         let mut best_mark: u32 = 0;
         for (idx, lock) in self.get_shared().read_locks.iter().enumerate().skip(1) {
-            let m = lock.value.load(Ordering::SeqCst);
+            let m = lock.get_value();
             if m != READMARK_NOT_USED && m <= shared_max as u32 && m > best_mark {
                 best_mark = m;
                 best_idx = idx as i64;
             }
@@ -721,7 +720,7 @@
                 continue; // busy slot
             }
             // claim or bump this slot
-            lock.value.store(shared_max as u32, Ordering::SeqCst);
+            lock.set_value_exclusive(shared_max as u32);
             best_idx = idx as i64;
             best_mark = shared_max as u32;
             lock.unlock();
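The scan above is SQLite-style read-mark selection. A simplified, hypothetical refactoring of that logic into one function (the real code stays inline in `begin_read_tx` and also handles the Busy result path):

```rust
// Simplified model of the read-mark selection: prefer the largest existing
// mark <= shared_max; otherwise try to claim a slot and stamp it with
// shared_max. Slot 0 ("read directly from the db file") is skipped.
fn pick_read_slot(locks: &[TursoRwLock; 5], shared_max: u32) -> Option<usize> {
    let mut best: Option<(usize, u32)> = None;
    for (idx, lock) in locks.iter().enumerate().skip(1) {
        let m = lock.get_value();
        if m != READMARK_NOT_USED && m <= shared_max && best.map_or(true, |(_, b)| m > b) {
            best = Some((idx, m));
        }
    }
    // If no slot already carries shared_max exactly, try to claim/bump one.
    if best.map_or(true, |(_, m)| m != shared_max) {
        for (idx, lock) in locks.iter().enumerate().skip(1) {
            if !lock.write() {
                continue; // busy slot: a reader or checkpointer holds it
            }
            lock.set_value_exclusive(shared_max);
            lock.unlock();
            return Some(idx);
        }
    }
    best.map(|(idx, _)| idx)
}
```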
@@ -742,8 +741,8 @@
                 return Ok((LimboResult::Busy, db_changed));
             }
             (
-                shared.max_frame.load(Ordering::SeqCst),
-                shared.nbackfills.load(Ordering::SeqCst),
+                shared.max_frame.load(Ordering::Acquire),
+                shared.nbackfills.load(Ordering::Acquire),
                 shared.last_checksum,
                 shared.wal_header.lock().checkpoint_seq,
             )
@@ -823,8 +822,8 @@
         let (shared_max, nbackfills, last_checksum) = {
             let shared = self.get_shared();
             (
-                shared.max_frame.load(Ordering::SeqCst),
-                shared.nbackfills.load(Ordering::SeqCst),
+                shared.max_frame.load(Ordering::Acquire),
+                shared.nbackfills.load(Ordering::Acquire),
                 shared.last_checksum,
             )
         };
@@ -866,8 +865,8 @@
         // if it's not, then pages from WAL range [frame_watermark..nBackfill] are already in the DB file,
         // and if a page's first occurrence in WAL was after frame_watermark, we will be unable to read the proper previous version of the page
         turso_assert!(
-            frame_watermark.is_none() || frame_watermark.unwrap() >= self.get_shared().nbackfills.load(Ordering::SeqCst),
-            "frame_watermark must be >= than current WAL backfill 
-amount: frame_watermark={:?}, nBackfill={}", frame_watermark, self.get_shared().nbackfills.load(Ordering::SeqCst)
+            frame_watermark.is_none() || frame_watermark.unwrap() >= self.get_shared().nbackfills.load(Ordering::Acquire),
+            "frame_watermark must be >= current WAL backfill amount: frame_watermark={:?}, nBackfill={}", frame_watermark, self.get_shared().nbackfills.load(Ordering::Acquire)
         );
 
         // if we are holding read_lock 0, skip and read right from db file.
@@ -1030,7 +1029,7 @@
         write_counter: Rc<RefCell<usize>>,
     ) -> Result<Completion> {
         let shared = self.get_shared();
-        if shared.max_frame.load(Ordering::SeqCst).eq(&0) {
+        if shared.max_frame.load(Ordering::Acquire).eq(&0) {
             self.ensure_header_if_needed()?;
         }
         let page_id = page.get().id;
@@ -1083,8 +1082,8 @@
     #[instrument(skip_all, level = Level::DEBUG)]
     fn should_checkpoint(&self) -> bool {
         let shared = self.get_shared();
-        let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
-        let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize;
+        let frame_id = shared.max_frame.load(Ordering::Acquire) as usize;
+        let nbackfills = shared.nbackfills.load(Ordering::Acquire) as usize;
         frame_id > self.checkpoint_threshold + nbackfills
     }
@@ -1136,7 +1135,7 @@
     fn get_max_frame_in_wal(&self) -> u64 {
-        self.get_shared().max_frame.load(Ordering::SeqCst)
+        self.get_shared().max_frame.load(Ordering::Acquire)
     }
 
     fn get_max_frame(&self) -> u64 {
@@ -1154,7 +1153,7 @@
     {
         // TODO(pere): implement proper hashmap, this sucks :).
         let shared = self.get_shared();
-        let max_frame = shared.max_frame.load(Ordering::SeqCst);
+        let max_frame = shared.max_frame.load(Ordering::Acquire);
         tracing::debug!(to_max_frame = max_frame);
         {
             let mut frame_cache = shared.frame_cache.lock();
@@ -1182,7 +1181,7 @@
     #[instrument(skip_all, level = Level::DEBUG)]
     fn finish_append_frames_commit(&mut self) -> Result<()> {
         let shared = self.get_shared();
-        shared.max_frame.store(self.max_frame, Ordering::SeqCst);
+        shared.max_frame.store(self.max_frame, Ordering::Release);
         tracing::trace!(self.max_frame, ?self.last_checksum);
         shared.last_checksum = self.last_checksum;
         Ok(())
     }
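The Release store above is the commit point that the Acquire loads of `max_frame` elsewhere in this patch pair with: classic release/acquire message passing, which is why SeqCst is unnecessary here. A minimal self-contained model using only std atomics (names are illustrative, not from the patch):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // `frame` stands in for frame data written before commit;
    // `max_frame` is the published high-water mark.
    let frame = Arc::new(AtomicU64::new(0));
    let max_frame = Arc::new(AtomicU64::new(0));

    let (f, m) = (frame.clone(), max_frame.clone());
    let committer = thread::spawn(move || {
        f.store(42, Ordering::Relaxed); // write the frame...
        // ...then publish it. Release orders the frame write before the
        // max_frame store for any thread that observes the new max_frame.
        m.store(1, Ordering::Release);
    });

    let (f, m) = (frame.clone(), max_frame.clone());
    let reader = thread::spawn(move || {
        // Acquire pairs with the Release store: if we observe max_frame == 1,
        // we are guaranteed to observe the frame contents as well.
        if m.load(Ordering::Acquire) == 1 {
            assert_eq!(f.load(Ordering::Relaxed), 42);
        }
    });

    committer.join().unwrap();
    reader.join().unwrap();
}
```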
@@ -1228,7 +1227,7 @@ impl WalFile {
         Self {
             io,
             // default to max frame in WAL, so that when we read the schema we can read from the WAL too if it's there.
-            max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::SeqCst) },
+            max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::Acquire) },
             shared,
             ongoing_checkpoint: OngoingCheckpoint {
                 scratch_page: checkpoint_page,
@@ -1308,7 +1307,7 @@
         tracing::debug!("ensure_header_if_needed");
         self.last_checksum = {
             let shared = self.get_shared();
-            if shared.max_frame.load(Ordering::SeqCst) != 0 {
+            if shared.max_frame.load(Ordering::Acquire) != 0 {
                 return Ok(());
             }
             if shared.file.size()? >= WAL_HEADER_SIZE as u64 {
@@ -1359,8 +1358,8 @@
             CheckpointState::Start => {
                 let (max_frame, nbackfills) = {
                     let shared = self.get_shared();
-                    let max_frame = shared.max_frame.load(Ordering::SeqCst);
-                    let n_backfills = shared.nbackfills.load(Ordering::SeqCst);
+                    let max_frame = shared.max_frame.load(Ordering::Acquire);
+                    let n_backfills = shared.nbackfills.load(Ordering::Acquire);
                     (max_frame, n_backfills)
                 };
                 let needs_backfill = max_frame > nbackfills;
@@ -1473,7 +1472,7 @@
             CheckpointState::WaitFlush => {
                 match self.ongoing_checkpoint.pending_flush.as_ref() {
-                    Some(pf) if pf.done.load(Ordering::SeqCst) => {
+                    Some(pf) if pf.done.load(Ordering::Acquire) => {
                         // flush is done, we can continue
                         tracing::trace!("checkpoint backfilling batch done");
                     }
@@ -1512,8 +1511,8 @@
                 }
                 let mut checkpoint_result = {
                     let shared = self.get_shared();
-                    let current_mx = shared.max_frame.load(Ordering::SeqCst);
-                    let nbackfills = shared.nbackfills.load(Ordering::SeqCst);
+                    let current_mx = shared.max_frame.load(Ordering::Acquire);
+                    let nbackfills = shared.nbackfills.load(Ordering::Acquire);
 
                     // Record two num pages fields to return as checkpoint result to caller.
                     // Ref: pnLog, pnCkpt on https://www.sqlite.org/c3ref/wal_checkpoint_v2.html
@@ -1545,7 +1544,7 @@
                 // store the max frame we were able to successfully checkpoint.
                 self.get_shared()
                     .nbackfills
-                    .store(self.ongoing_checkpoint.max_frame, Ordering::SeqCst);
+                    .store(self.ongoing_checkpoint.max_frame, Ordering::Release);
 
                 if matches!(mode, CheckpointMode::Restart | CheckpointMode::Truncate) {
                     if checkpoint_result.everything_backfilled() {
@@ -1618,10 +1617,10 @@
     /// We never modify slot values while a reader holds that slot.
     fn determine_max_safe_checkpoint_frame(&self) -> u64 {
         let shared = self.get_shared();
-        let mut max_safe_frame = shared.max_frame.load(Ordering::SeqCst);
+        let mut max_safe_frame = shared.max_frame.load(Ordering::Acquire);
 
         for (read_lock_idx, read_lock) in shared.read_locks.iter_mut().enumerate().skip(1) {
-            let this_mark = read_lock.value.load(Ordering::SeqCst);
+            let this_mark = read_lock.get_value();
             if this_mark < max_safe_frame as u32 {
                 let busy = !read_lock.write();
                 if !busy {
                     let val = if read_lock_idx == 1 {
                         max_safe_frame as u32
                     } else {
                         READMARK_NOT_USED
                     };
-                    read_lock.value.store(val, Ordering::SeqCst);
+                    read_lock.set_value_exclusive(val);
                     read_lock.unlock();
                 } else {
                     max_safe_frame = this_mark as u64;
                 }
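The loop above mirrors SQLite's mxSafeFrame computation: the checkpointer may only backfill past a reader's mark if it can briefly take that slot's write lock, so a busy slot (a pinned reader) clamps the bound. The invariant, as a hypothetical pure function (not part of the patch):

```rust
// Model of the bound computed above: start from max_frame and clamp at the
// mark of every slot whose write lock could not be taken; a reader pinned
// there may still need frames past its mark to stay in the WAL.
fn max_safe_frame(max_frame: u64, pinned_marks: &[u32]) -> u64 {
    pinned_marks
        .iter()
        .fold(max_frame, |safe, &mark| safe.min(mark as u64))
}

// e.g. max_frame = 100 with one reader pinned at mark 40: only frames up
// to 40 may be backfilled; everything later must remain in the WAL.
// assert_eq!(max_safe_frame(100, &[40]), 40);
```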
@@ -1668,7 +1667,7 @@
                 return Err(LimboError::Busy);
             }
             // after the log is reset, we must set all secondary marks to READMARK_NOT_USED so the next reader selects a fresh slot
-            lock.value.store(READMARK_NOT_USED, Ordering::SeqCst);
+            lock.set_value_exclusive(READMARK_NOT_USED);
         }
     }
@@ -1759,7 +1758,7 @@
         let mut max_loops = 100_000;
         while !unsafe { &*wal_file_shared.get() }
             .loaded
-            .load(Ordering::SeqCst)
+            .load(Ordering::Acquire)
         {
             io.run_once()?;
             max_loops -= 1;
@@ -1814,18 +1813,15 @@
         };
         io.wait_for_completion(c)?;
         tracing::debug!("new_shared(header={:?})", header);
-        let read_locks = array::from_fn(|_| LimboRwLock {
-            lock: AtomicU32::new(NO_LOCK),
-            nreads: AtomicU32::new(0),
-            value: AtomicU32::new(READMARK_NOT_USED),
-        });
-
+        let read_locks = array::from_fn(|_| TursoRwLock::new());
         // slot zero is always zero as it signifies that reads can be done from the db file
         // directly, and slot 1 is the default read mark containing the max frame. In this case
        // our max frame is zero, so both slots 0 and 1 begin at 0.
-        read_locks[0].value.store(0, Ordering::SeqCst);
-        read_locks[1].value.store(0, Ordering::SeqCst);
-
+        for (i, lock) in read_locks.iter().enumerate() {
+            lock.write();
+            lock.set_value_exclusive(if i < 2 { 0 } else { READMARK_NOT_USED });
+            lock.unlock();
+        }
         let shared = WalFileShared {
             wal_header: Arc::new(SpinLock::new(wal_header)),
             min_frame: AtomicU64::new(0),
             file,
             pages_in_frames: Arc::new(SpinLock::new(Vec::new())),
             read_locks,
-            write_lock: LimboRwLock {
-                lock: AtomicU32::new(NO_LOCK),
-                nreads: AtomicU32::new(0),
-                value: AtomicU32::new(READMARK_NOT_USED),
-            },
-            checkpoint_lock: LimboRwLock::new(),
+            write_lock: TursoRwLock::new(),
+            checkpoint_lock: TursoRwLock::new(),
             loaded: AtomicBool::new(true),
         };
         Ok(Arc::new(UnsafeCell::new(shared)))
@@ -1878,8 +1870,8 @@
         hdr.salt_1 = hdr.salt_1.wrapping_add(1);
         hdr.salt_2 = io.generate_random_number() as u32;
 
-        self.max_frame.store(0, Ordering::SeqCst);
-        self.nbackfills.store(0, Ordering::SeqCst);
+        self.max_frame.store(0, Ordering::Release);
+        self.nbackfills.store(0, Ordering::Release);
         self.last_checksum = (hdr.checksum_1, hdr.checksum_2);
     }
 
@@ -1887,10 +1879,10 @@
         self.pages_in_frames.lock().clear();
 
         // read-marks
-        self.read_locks[0].value.store(0, Ordering::SeqCst);
-        self.read_locks[1].value.store(0, Ordering::SeqCst);
-        for l in &self.read_locks[2..] {
-            l.value.store(READMARK_NOT_USED, Ordering::SeqCst);
+        self.read_locks[0].set_value_exclusive(0);
+        self.read_locks[1].set_value_exclusive(0);
+        for lock in &self.read_locks[2..] {
+            lock.set_value_exclusive(READMARK_NOT_USED);
         }
         Ok(())
     }
@@ -2302,9 +2294,7 @@ pub mod test {
         // Verify read marks after restart
         let read_marks_after: Vec<_> = unsafe {
            let s = &*wal_shared.get();
-            (0..5)
-                .map(|i| s.read_locks[i].value.load(Ordering::SeqCst))
-                .collect()
+            (0..5).map(|i| s.read_locks[i].get_value()).collect()
        };
 
         assert_eq!(read_marks_after[0], 0, "Slot 0 should remain 0");
@@ -2402,11 +2392,7 @@ pub mod test {
         }
 
         // check that read mark 1 (default reader) was updated to max_frame
-        let read_mark_1 = unsafe {
-            (*wal_shared.get()).read_locks[1]
-                .value
-                .load(Ordering::SeqCst)
-        };
+        let read_mark_1 = unsafe { (*wal_shared.get()).read_locks[1].get_value() };
         assert_eq!(
             read_mark_1 as u64,
             max_frame_before,