diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 01c38396d..17a49a69a 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -141,24 +141,24 @@ pub struct PageInner { pub const TAG_UNSET: u64 = u64::MAX; /// Bit layout: -/// seq: 20 +/// epoch: 20 /// frame: 44 const EPOCH_BITS: u32 = 20; const FRAME_BITS: u32 = 64 - EPOCH_BITS; -const SEQ_SHIFT: u32 = FRAME_BITS; -const SEQ_MAX: u32 = (1u32 << EPOCH_BITS) - 1; +const EPOCH_SHIFT: u32 = FRAME_BITS; +const EPOCH_MAX: u32 = (1u32 << EPOCH_BITS) - 1; const FRAME_MAX: u64 = (1u64 << FRAME_BITS) - 1; #[inline] pub fn pack_tag_pair(frame: u64, seq: u32) -> u64 { - ((seq as u64) << SEQ_SHIFT) | (frame & FRAME_MAX) + ((seq as u64) << EPOCH_SHIFT) | (frame & FRAME_MAX) } #[inline] pub fn unpack_tag_pair(tag: u64) -> (u64, u32) { - let seq = ((tag >> SEQ_SHIFT) & (SEQ_MAX as u64)) as u32; + let epoch = ((tag >> EPOCH_SHIFT) & (EPOCH_MAX as u64)) as u32; let frame = tag & FRAME_MAX; - (frame, seq) + (frame, epoch) } #[derive(Debug)] @@ -285,15 +285,15 @@ impl Page { } #[inline] - /// Set the WAL tag from a (frame, seq) pair. + /// Set the WAL tag from a (frame, epoch) pair. /// If inputs are invalid, stores TAG_UNSET, which will prevent /// the cached page from being used during checkpoint. 
- pub fn set_wal_tag(&self, frame: u64, seq: u32) { + pub fn set_wal_tag(&self, frame: u64, epoch: u32) { - // use only first 20 bits for seq (max: 1048576) + // keep only the low 20 bits of the epoch (EPOCH_MAX = 1048575) - let seq20 = seq & SEQ_MAX; + let e = epoch & EPOCH_MAX; self.get() .wal_tag - .store(pack_tag_pair(frame, seq20), Ordering::Release); + .store(pack_tag_pair(frame, e), Ordering::Release); } #[inline] @@ -308,9 +308,9 @@ impl Page { } #[inline] - pub fn is_valid_for_checkpoint(&self, target_frame: u64, seq: u32) -> bool { + pub fn is_valid_for_checkpoint(&self, target_frame: u64, epoch: u32) -> bool { let (f, s) = self.wal_tag_pair(); - f == target_frame && s == seq && !self.is_dirty() && self.is_loaded() && !self.is_locked() + f == target_frame && s == epoch && !self.is_dirty() && self.is_loaded() && !self.is_locked() } } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 59e2ca612..d2af9a6b0 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -71,7 +71,7 @@ use std::collections::{BTreeMap, HashMap}; use std::mem::MaybeUninit; use std::pin::Pin; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; /// The minimum size of a cell in bytes. 
@@ -1652,6 +1652,7 @@ pub fn build_shared_wal( loaded: AtomicBool::new(false), checkpoint_lock: TursoRwLock::new(), initialized: AtomicBool::new(false), + epoch: AtomicU32::new(0), })); if size < WAL_HEADER_SIZE as u64 { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index dff796790..8fbf8eaa2 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -9,7 +9,7 @@ use tracing::{instrument, Level}; use parking_lot::RwLock; use std::fmt::{Debug, Formatter}; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::{cell::Cell, fmt, rc::Rc, sync::Arc}; use super::buffer_pool::BufferPool; @@ -576,6 +576,7 @@ pub struct WalFile { min_frame: u64, /// Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL last_checksum: (u32, u32), + checkpoint_seq: AtomicU32, /// Count of possible pages to checkpoint, and number of backfilled prev_checkpoint: CheckpointResult, @@ -693,6 +694,9 @@ pub struct WalFileShared { pub checkpoint_lock: TursoRwLock, pub loaded: AtomicBool, pub initialized: AtomicBool, + /// Increments on each checkpoint, used to prevent stale cached pages being used for + /// backfilling. + pub epoch: AtomicU32, } impl fmt::Debug for WalFileShared { @@ -822,7 +826,7 @@ impl Wal for WalFile { }; let db_changed = shared_max != self.max_frame || last_checksum != self.last_checksum - || checkpoint_seq != self.header.checkpoint_seq; + || checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire); // WAL is already fully back‑filled into the main DB image // (mxFrame == nBackfill). 
Readers can therefore ignore the @@ -1077,7 +1081,7 @@ impl Wal for WalFile { page.set_locked(); let frame = page.clone(); let page_idx = page.get().id; - let header = self.get_shared().wal_header.clone(); + let shared_file = self.shared.clone(); let complete = Box::new(move |res: Result<(Arc, i32), CompletionError>| { let Ok((buf, bytes_read)) = res else { page.clear_locked(); @@ -1091,8 +1095,8 @@ impl Wal for WalFile { ); let cloned = frame.clone(); finish_read_page(page.get().id, buf, cloned); - let seq = header.lock().checkpoint_seq; - frame.set_wal_tag(frame_id, seq); + let epoch = shared_file.read().epoch.load(Ordering::Acquire); + frame.set_wal_tag(frame_id, epoch); }); let shared = self.get_shared(); assert!( @@ -1301,7 +1305,7 @@ impl Wal for WalFile { tracing::debug!(frame_id, offset, page_id); let (c, checksums) = { let shared = self.get_shared(); - let shared_header = shared.wal_header.clone(); + let shared_file = self.shared.clone(); let header = shared.wal_header.lock(); let checksums = self.last_checksum; let page_content = page.get_contents(); @@ -1345,7 +1349,7 @@ impl Wal for WalFile { ); page.clear_dirty(); - let seq = shared_header.lock().checkpoint_seq; + let seq = shared_file.read().epoch.load(Ordering::Acquire); page.set_wal_tag(frame_id, seq); } }); @@ -1410,7 +1414,7 @@ impl Wal for WalFile { } fn get_checkpoint_seq(&self) -> u32 { - self.header.checkpoint_seq + self.get_shared().wal_header.lock().checkpoint_seq } fn get_max_frame(&self) -> u64 { @@ -1487,13 +1491,13 @@ impl Wal for WalFile { ); self.ensure_header_if_needed(page_sz)?; - let (header, shared_page_size, seq) = { + let (header, shared_page_size, epoch) = { let shared = self.get_shared(); let hdr_guard = shared.wal_header.lock(); let header: WalHeader = *hdr_guard; let shared_page_size = header.page_size; - let seq = header.checkpoint_seq; - (header, shared_page_size, seq) + let epoch = shared.epoch.load(Ordering::Acquire); + (header, shared_page_size, epoch) }; turso_assert!( 
shared_page_size == page_sz.get(), @@ -1573,7 +1577,7 @@ impl Wal for WalFile { for (page, fid, _csum) in &page_frame_for_cb { page.clear_dirty(); - page.set_wal_tag(*fid, seq); + page.set_wal_tag(*fid, epoch); } }; @@ -1642,6 +1646,7 @@ impl WalFile { }, checkpoint_threshold: 1000, buffer_pool, + checkpoint_seq: AtomicU32::new(0), syncing: Rc::new(Cell::new(false)), min_frame: 0, max_frame_read_lock_index: NO_LOCK_HELD.into(), @@ -1852,20 +1857,16 @@ impl WalFile { if self.ongoing_checkpoint.process_pending_reads() { tracing::trace!("Drained reads into batch"); } - - let seq = self.header.checkpoint_seq; + let epoch = self.get_shared().epoch.load(Ordering::Acquire); // Issue reads until we hit limits - while self.ongoing_checkpoint.should_issue_reads() { + 'inner: while self.ongoing_checkpoint.should_issue_reads() { let (page_id, target_frame) = self.ongoing_checkpoint.pages_to_checkpoint [self.ongoing_checkpoint.current_page as usize]; - - // dont use cached pages unless we hold the write lock, preventing them - // from being touched from under us. - if matches!(self.checkpoint_guard, Some(CheckpointLocks::Writer { .. })) { + 'fast_path: { if let Some(cached_page) = pager.cache_get_for_checkpoint( page_id as usize, target_frame, - seq, + epoch, )? { let contents = cached_page.get_contents().buffer.clone(); // to avoid TOCTOU issues with using cached pages, we snapshot the contents and pay the memcpy @@ -1873,6 +1874,12 @@ impl WalFile { let buffer = Arc::new(self.buffer_pool.get_page()); buffer.as_mut_slice()[..contents.len()] .copy_from_slice(contents.as_slice()); + if !cached_page.is_valid_for_checkpoint(target_frame, epoch) { + // check again, atomically, if the page is still valid after we + // copied a snapshot of it, if not: fallthrough to reading + // from disk + break 'fast_path; + } // TODO: remove this eventually to actually benefit from the // performance.. for now we assert that the cached page has the // exact contents as one read from the WAL. 
@@ -1889,12 +1896,8 @@ impl WalFile { let (_, wal_page) = sqlite3_ondisk::parse_wal_frame_header(&raw); let cached = buffer.as_slice(); - turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}"); + turso_assert!(wal_page == cached, "cached page content differs from WAL read for page_id={page_id}, frame_id={target_frame}"); } - turso_assert!( - cached_page.is_valid_for_checkpoint(target_frame, seq), - " should still be valid after snapshotting the page" - ); self.ongoing_checkpoint .pending_writes .insert(page_id as usize, buffer); @@ -1904,7 +1907,7 @@ impl WalFile { [self.ongoing_checkpoint.current_page as usize] = (page_id, target_frame); self.ongoing_checkpoint.current_page += 1; - continue; + continue 'inner; } } // Issue read if page wasn't found in the page cache or doesnt meet @@ -1993,6 +1996,8 @@ impl WalFile { if mode.should_restart_log() { self.restart_log(mode)?; } + // increment wal epoch to ensure no stale pages are used for backfilling + self.get_shared().epoch.fetch_add(1, Ordering::Release); // store a copy of the checkpoint result to return in the future if pragma // wal_checkpoint is called and we haven't backfilled again since. @@ -2136,16 +2141,11 @@ impl WalFile { .inspect_err(|e| { unlock(Some(e)); })?; - let (header, cksm) = { - let shared = self.get_shared(); - let header = *shared.wal_header.lock(); - let cksm = shared.last_checksum; - (header, cksm) - }; + let cksm = self.get_shared().last_checksum; self.last_checksum = cksm; - self.header = header; self.max_frame = 0; self.min_frame = 0; + self.checkpoint_seq.fetch_add(1, Ordering::Release); // For TRUNCATE mode: shrink the WAL file to 0 B if matches!(mode, CheckpointMode::Truncate { .. 
}) { @@ -2298,6 +2298,7 @@ impl WalFileShared { checkpoint_lock: TursoRwLock::new(), loaded: AtomicBool::new(true), initialized: AtomicBool::new(false), + epoch: AtomicU32::new(0), }; Ok(Arc::new(RwLock::new(shared))) } @@ -2341,6 +2342,7 @@ impl WalFileShared { checkpoint_lock: TursoRwLock::new(), loaded: AtomicBool::new(true), initialized: AtomicBool::new(false), + epoch: AtomicU32::new(0), }; Ok(Arc::new(RwLock::new(shared))) }