Mirror of https://github.com/aljazceru/turso.git (synced 2025-12-27 13:04:20 +01:00)
Merge 'Update epoch on each checkpoint to prevent using stale pages for backfilling' from Preston Thorpe
Using this epoch, which is incremented on each checkpoint, combined with snapshotting the page, allows us to use cached pages even during passive-mode checkpoints (without the write lock), because we check again after the snapshot that the page is still valid. https://github.com/tursodatabase/turso/pull/3053#issuecomment-3285093103

This PR also removes the `WalFile` copy of the `WalHeader`, to prevent us from forgetting that it exists and using potentially stale data, and adds a `checkpoint_seq` atomic to `WalFile` to help us determine whether the log has changed and a read transaction's snapshot is stale.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>
Closes #3058
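To make the mechanism concrete, here is a minimal standalone sketch of the snapshot-then-revalidate fast path. The types and function names below are hypothetical simplifications, not the actual pager/WAL API; the real code checks a packed (frame, epoch) tag via `Page::is_valid_for_checkpoint` and falls back to reading the frame from the WAL file.

use std::sync::atomic::{AtomicU64, Ordering};

const TAG_UNSET: u64 = u64::MAX;

// Hypothetical stand-in for a cached page: page bytes plus a (frame, epoch)
// tag packed into one atomic word, loosely modelled on PageInner::wal_tag.
struct CachedPage {
    wal_tag: AtomicU64,
    data: [u8; 4096],
}

impl CachedPage {
    // Simplified validity check; the real one also rejects dirty, unloaded
    // and locked pages.
    fn is_valid_for_checkpoint(&self, expected_tag: u64) -> bool {
        let tag = self.wal_tag.load(Ordering::Acquire);
        tag != TAG_UNSET && tag == expected_tag
    }
}

// Checkpointer fast path: copy the cached contents, then re-check the tag.
// Because every checkpoint bumps the epoch and every reload retags the page,
// an unchanged tag across the copy means the snapshot is safe to backfill,
// even without holding the write lock (i.e. in passive mode).
fn try_copy_cached(page: &CachedPage, expected_tag: u64) -> Option<Vec<u8>> {
    if !page.is_valid_for_checkpoint(expected_tag) {
        return None;
    }
    // Pay the memcpy so later mutations of the cached page cannot reach us.
    let snapshot = page.data.to_vec();
    // Close the TOCTOU window: if the tag moved while we copied, discard the
    // snapshot and fall through to reading the frame from disk.
    if page.is_valid_for_checkpoint(expected_tag) {
        Some(snapshot)
    } else {
        None
    }
}

fn main() {
    let page = CachedPage { wal_tag: AtomicU64::new(42), data: [0u8; 4096] };
    assert!(try_copy_cached(&page, 42).is_some());
    // Simulate a concurrent reload or checkpoint invalidating the tag.
    page.wal_tag.store(TAG_UNSET, Ordering::Release);
    assert!(try_copy_cached(&page, 42).is_none());
}

The 20-bit epoch in the tag is what makes the recheck meaningful across WAL resets: once a checkpoint increments it, any tag taken before the reset can no longer match.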
@@ -141,24 +141,24 @@ pub struct PageInner {
pub const TAG_UNSET: u64 = u64::MAX;

/// Bit layout:
-/// seq: 20
+/// epoch: 20
/// frame: 44
const EPOCH_BITS: u32 = 20;
const FRAME_BITS: u32 = 64 - EPOCH_BITS;
-const SEQ_SHIFT: u32 = FRAME_BITS;
-const SEQ_MAX: u32 = (1u32 << EPOCH_BITS) - 1;
+const EPOCH_SHIFT: u32 = FRAME_BITS;
+const EPOCH_MAX: u32 = (1u32 << EPOCH_BITS) - 1;
const FRAME_MAX: u64 = (1u64 << FRAME_BITS) - 1;

#[inline]
pub fn pack_tag_pair(frame: u64, seq: u32) -> u64 {
-((seq as u64) << SEQ_SHIFT) | (frame & FRAME_MAX)
+((seq as u64) << EPOCH_SHIFT) | (frame & FRAME_MAX)
}

#[inline]
pub fn unpack_tag_pair(tag: u64) -> (u64, u32) {
-let seq = ((tag >> SEQ_SHIFT) & (SEQ_MAX as u64)) as u32;
+let epoch = ((tag >> EPOCH_SHIFT) & (EPOCH_MAX as u64)) as u32;
let frame = tag & FRAME_MAX;
-(frame, seq)
+(frame, epoch)
}

#[derive(Debug)]

@@ -285,15 +285,15 @@ impl Page {
}

#[inline]
-/// Set the WAL tag from a (frame, seq) pair.
+/// Set the WAL tag from a (frame, epoch) pair.
/// If inputs are invalid, stores TAG_UNSET, which will prevent
/// the cached page from being used during checkpoint.
-pub fn set_wal_tag(&self, frame: u64, seq: u32) {
+pub fn set_wal_tag(&self, frame: u64, epoch: u32) {
// use only first 20 bits for seq (max: 1048576)
-let seq20 = seq & SEQ_MAX;
+let e = epoch & EPOCH_MAX;
self.get()
.wal_tag
-.store(pack_tag_pair(frame, seq20), Ordering::Release);
+.store(pack_tag_pair(frame, e), Ordering::Release);
}

#[inline]

@@ -308,9 +308,9 @@ impl Page {
}

#[inline]
-pub fn is_valid_for_checkpoint(&self, target_frame: u64, seq: u32) -> bool {
+pub fn is_valid_for_checkpoint(&self, target_frame: u64, epoch: u32) -> bool {
let (f, s) = self.wal_tag_pair();
-f == target_frame && s == seq && !self.is_dirty() && self.is_loaded() && !self.is_locked()
+f == target_frame && s == epoch && !self.is_dirty() && self.is_loaded() && !self.is_locked()
}
}

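For reference, a quick round-trip of the tag packing defined in the hunk above. The constants and helpers are copied from the diff; the `main` is purely illustrative.

pub const TAG_UNSET: u64 = u64::MAX;

const EPOCH_BITS: u32 = 20;
const FRAME_BITS: u32 = 64 - EPOCH_BITS;
const EPOCH_SHIFT: u32 = FRAME_BITS;
const EPOCH_MAX: u32 = (1u32 << EPOCH_BITS) - 1;
const FRAME_MAX: u64 = (1u64 << FRAME_BITS) - 1;

pub fn pack_tag_pair(frame: u64, seq: u32) -> u64 {
    ((seq as u64) << EPOCH_SHIFT) | (frame & FRAME_MAX)
}

pub fn unpack_tag_pair(tag: u64) -> (u64, u32) {
    let epoch = ((tag >> EPOCH_SHIFT) & (EPOCH_MAX as u64)) as u32;
    let frame = tag & FRAME_MAX;
    (frame, epoch)
}

fn main() {
    // The 44 low bits hold the frame number, the top 20 bits the checkpoint epoch.
    let tag = pack_tag_pair(123_456, 7);
    assert_eq!(unpack_tag_pair(tag), (123_456, 7));
    assert_ne!(tag, TAG_UNSET);
    // set_wal_tag masks the epoch with EPOCH_MAX before packing, so callers
    // never shift bits out of the 20-bit field.
    assert_eq!(unpack_tag_pair(pack_tag_pair(9, 5 & EPOCH_MAX)), (9, 5));
}

With 44 bits for the frame and 20 for the epoch, the whole tag fits in one u64, so validity can be checked with a single atomic load.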
@@ -71,7 +71,7 @@ use std::collections::{BTreeMap, HashMap};
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::rc::Rc;
-use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

/// The minimum size of a cell in bytes.

@@ -1652,6 +1652,7 @@ pub fn build_shared_wal(
loaded: AtomicBool::new(false),
checkpoint_lock: TursoRwLock::new(),
initialized: AtomicBool::new(false),
+epoch: AtomicU32::new(0),
}));

if size < WAL_HEADER_SIZE as u64 {

@@ -9,7 +9,7 @@ use tracing::{instrument, Level};

use parking_lot::RwLock;
use std::fmt::{Debug, Formatter};
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::{cell::Cell, fmt, rc::Rc, sync::Arc};

use super::buffer_pool::BufferPool;

@@ -576,6 +576,7 @@ pub struct WalFile {
min_frame: u64,
/// Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL
last_checksum: (u32, u32),
+checkpoint_seq: AtomicU32,

/// Count of possible pages to checkpoint, and number of backfilled
prev_checkpoint: CheckpointResult,

@@ -693,6 +694,9 @@ pub struct WalFileShared {
pub checkpoint_lock: TursoRwLock,
pub loaded: AtomicBool,
pub initialized: AtomicBool,
+/// Increments on each checkpoint, used to prevent stale cached pages being used for
+/// backfilling.
+pub epoch: AtomicU32,
}

impl fmt::Debug for WalFileShared {

@@ -822,7 +826,7 @@ impl Wal for WalFile {
};
let db_changed = shared_max != self.max_frame
|| last_checksum != self.last_checksum
-|| checkpoint_seq != self.header.checkpoint_seq;
+|| checkpoint_seq != self.checkpoint_seq.load(Ordering::Acquire);

// WAL is already fully back‑filled into the main DB image
// (mxFrame == nBackfill). Readers can therefore ignore the

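Roughly what the `checkpoint_seq` comparison above buys, as a hedged sketch with invented names (the real `WalFile`/`WalFileShared` layout differs and also compares `last_checksum`): each connection remembers the checkpoint sequence it last synced with and can tell that the log was reset underneath its read snapshot.

use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;

// Hypothetical shared WAL state; the real WalFileShared holds much more.
struct SharedWal {
    max_frame: AtomicU64,
    checkpoint_seq: AtomicU32,
}

// Hypothetical per-connection view of the WAL.
struct WalView {
    shared: Arc<SharedWal>,
    max_frame: u64,
    checkpoint_seq: u32,
}

impl WalView {
    // Mirrors the db_changed test above: the snapshot is stale if new frames
    // were appended or a checkpoint restarted the log since we last synced.
    fn db_changed(&self) -> bool {
        self.shared.max_frame.load(Ordering::Acquire) != self.max_frame
            || self.shared.checkpoint_seq.load(Ordering::Acquire) != self.checkpoint_seq
    }

    fn resync(&mut self) {
        self.max_frame = self.shared.max_frame.load(Ordering::Acquire);
        self.checkpoint_seq = self.shared.checkpoint_seq.load(Ordering::Acquire);
    }
}

fn main() {
    let shared = Arc::new(SharedWal {
        max_frame: AtomicU64::new(10),
        checkpoint_seq: AtomicU32::new(3),
    });
    let mut view = WalView { shared: shared.clone(), max_frame: 0, checkpoint_seq: 0 };
    view.resync();
    assert!(!view.db_changed());
    // A checkpoint that restarts the log bumps the sequence even if max_frame
    // later comes back around to the same value.
    shared.checkpoint_seq.fetch_add(1, Ordering::Release);
    assert!(view.db_changed());
}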
@@ -1077,7 +1081,7 @@ impl Wal for WalFile {
page.set_locked();
let frame = page.clone();
let page_idx = page.get().id;
-let header = self.get_shared().wal_header.clone();
+let shared_file = self.shared.clone();
let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((buf, bytes_read)) = res else {
page.clear_locked();

@@ -1091,8 +1095,8 @@ impl Wal for WalFile {
);
let cloned = frame.clone();
finish_read_page(page.get().id, buf, cloned);
-let seq = header.lock().checkpoint_seq;
-frame.set_wal_tag(frame_id, seq);
+let epoch = shared_file.read().epoch.load(Ordering::Acquire);
+frame.set_wal_tag(frame_id, epoch);
});
let shared = self.get_shared();
assert!(

@@ -1301,7 +1305,7 @@ impl Wal for WalFile {
tracing::debug!(frame_id, offset, page_id);
let (c, checksums) = {
let shared = self.get_shared();
-let shared_header = shared.wal_header.clone();
+let shared_file = self.shared.clone();
let header = shared.wal_header.lock();
let checksums = self.last_checksum;
let page_content = page.get_contents();

@@ -1345,7 +1349,7 @@ impl Wal for WalFile {
);

page.clear_dirty();
-let seq = shared_header.lock().checkpoint_seq;
+let seq = shared_file.read().epoch.load(Ordering::Acquire);
page.set_wal_tag(frame_id, seq);
}
});

@@ -1410,7 +1414,7 @@ impl Wal for WalFile {
}

fn get_checkpoint_seq(&self) -> u32 {
-self.header.checkpoint_seq
+self.get_shared().wal_header.lock().checkpoint_seq
}

fn get_max_frame(&self) -> u64 {

@@ -1487,13 +1491,13 @@ impl Wal for WalFile {
);
self.ensure_header_if_needed(page_sz)?;

-let (header, shared_page_size, seq) = {
+let (header, shared_page_size, epoch) = {
let shared = self.get_shared();
let hdr_guard = shared.wal_header.lock();
let header: WalHeader = *hdr_guard;
let shared_page_size = header.page_size;
-let seq = header.checkpoint_seq;
-(header, shared_page_size, seq)
+let epoch = shared.epoch.load(Ordering::Acquire);
+(header, shared_page_size, epoch)
};
turso_assert!(
shared_page_size == page_sz.get(),

@@ -1573,7 +1577,7 @@ impl Wal for WalFile {

for (page, fid, _csum) in &page_frame_for_cb {
page.clear_dirty();
-page.set_wal_tag(*fid, seq);
+page.set_wal_tag(*fid, epoch);
}
};

@@ -1642,6 +1646,7 @@ impl WalFile {
},
checkpoint_threshold: 1000,
buffer_pool,
+checkpoint_seq: AtomicU32::new(0),
syncing: Rc::new(Cell::new(false)),
min_frame: 0,
max_frame_read_lock_index: NO_LOCK_HELD.into(),

@@ -1852,20 +1857,16 @@ impl WalFile {
if self.ongoing_checkpoint.process_pending_reads() {
tracing::trace!("Drained reads into batch");
}

-let seq = self.header.checkpoint_seq;
+let epoch = self.get_shared().epoch.load(Ordering::Acquire);
// Issue reads until we hit limits
-while self.ongoing_checkpoint.should_issue_reads() {
+'inner: while self.ongoing_checkpoint.should_issue_reads() {
let (page_id, target_frame) = self.ongoing_checkpoint.pages_to_checkpoint
[self.ongoing_checkpoint.current_page as usize];

// dont use cached pages unless we hold the write lock, preventing them
// from being touched from under us.
if matches!(self.checkpoint_guard, Some(CheckpointLocks::Writer { .. })) {
'fast_path: {
if let Some(cached_page) = pager.cache_get_for_checkpoint(
page_id as usize,
target_frame,
-seq,
+epoch,
)? {
let contents = cached_page.get_contents().buffer.clone();
// to avoid TOCTOU issues with using cached pages, we snapshot the contents and pay the memcpy

@@ -1873,6 +1874,12 @@ impl WalFile {
let buffer = Arc::new(self.buffer_pool.get_page());
buffer.as_mut_slice()[..contents.len()]
.copy_from_slice(contents.as_slice());
+if !cached_page.is_valid_for_checkpoint(target_frame, epoch) {
+// check again, atomically, if the page is still valid after we
+// copied a snapshot of it, if not: fallthrough to reading
+// from disk
+break 'fast_path;
+}
// TODO: remove this eventually to actually benefit from the
// performance.. for now we assert that the cached page has the
// exact contents as one read from the WAL.

@@ -1889,12 +1896,8 @@ impl WalFile {
let (_, wal_page) =
sqlite3_ondisk::parse_wal_frame_header(&raw);
let cached = buffer.as_slice();
-turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}");
+turso_assert!(wal_page == cached, "cached page content differs from WAL read for page_id={page_id}, frame_id={target_frame}");
}
-turso_assert!(
-cached_page.is_valid_for_checkpoint(target_frame, seq),
-" should still be valid after snapshotting the page"
-);
self.ongoing_checkpoint
.pending_writes
.insert(page_id as usize, buffer);

@@ -1904,7 +1907,7 @@ impl WalFile {
[self.ongoing_checkpoint.current_page as usize] =
(page_id, target_frame);
self.ongoing_checkpoint.current_page += 1;
-continue;
+continue 'inner;
}
}
// Issue read if page wasn't found in the page cache or doesnt meet

@@ -1993,6 +1996,8 @@ impl WalFile {
if mode.should_restart_log() {
self.restart_log(mode)?;
}
+// increment wal epoch to ensure no stale pages are used for backfilling
+self.get_shared().epoch.fetch_add(1, Ordering::Release);

// store a copy of the checkpoint result to return in the future if pragma
// wal_checkpoint is called and we haven't backfilled again since.

@@ -2136,16 +2141,11 @@ impl WalFile {
.inspect_err(|e| {
unlock(Some(e));
})?;
-let (header, cksm) = {
-let shared = self.get_shared();
-let header = *shared.wal_header.lock();
-let cksm = shared.last_checksum;
-(header, cksm)
-};
+let cksm = self.get_shared().last_checksum;
self.last_checksum = cksm;
-self.header = header;
self.max_frame = 0;
self.min_frame = 0;
+self.checkpoint_seq.fetch_add(1, Ordering::Release);

// For TRUNCATE mode: shrink the WAL file to 0 B
if matches!(mode, CheckpointMode::Truncate { .. }) {

@@ -2298,6 +2298,7 @@ impl WalFileShared {
checkpoint_lock: TursoRwLock::new(),
loaded: AtomicBool::new(true),
initialized: AtomicBool::new(false),
+epoch: AtomicU32::new(0),
};
Ok(Arc::new(RwLock::new(shared)))
}

@@ -2341,6 +2342,7 @@ impl WalFileShared {
checkpoint_lock: TursoRwLock::new(),
loaded: AtomicBool::new(true),
initialized: AtomicBool::new(false),
+epoch: AtomicU32::new(0),
};
Ok(Arc::new(RwLock::new(shared)))
}