Merge 'Fix checkpoint fast-path, don't use cached pages w/o write lock' from Preston Thorpe

closes #3024
Don't use pages from the cache unless we hold an exclusive write lock,
because a page could be updated by a writer in-memory at any point
before we backfill it.
Clear the WAL tag in other areas to prevent any stale tags. Also, once
we determine that a cached page is eligible, we snapshot it into a new
buffer, paying a memcpy instead of a read from disk. This further guards
against in-memory changes to the page (TOCTOU issues), and we also assert
that the page is still eligible after copying it to the new buffer.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3036
This commit is contained in:
Preston Thorpe
2025-09-12 07:39:32 -04:00
committed by GitHub
2 changed files with 64 additions and 60 deletions

View File

@@ -217,12 +217,14 @@ impl Page {
/// Marks this page as dirty.
///
/// Emits a debug trace with the page id, clears the page's WAL tag, and then sets the
/// `PAGE_DIRTY` bit in the page flags using `Ordering::Release`.
pub fn set_dirty(&self) {
tracing::debug!("set_dirty(page={})", self.get().id);
// The WAL tag is dropped before the dirty bit is set. `is_valid_for_checkpoint` compares the
// tag pair against a (frame, seq) target, so a page that is about to be modified should not
// keep a tag from an earlier frame.
// NOTE(review): the exact effect of `clear_wal_tag` is not visible here; confirm it resets the tag.
self.clear_wal_tag();
self.get().flags.fetch_or(PAGE_DIRTY, Ordering::Release);
}
/// Clears the dirty mark on this page.
///
/// Emits a debug trace with the page id, removes the `PAGE_DIRTY` bit from the page flags using
/// `Ordering::Release`, and then clears the page's WAL tag.
pub fn clear_dirty(&self) {
tracing::debug!("clear_dirty(page={})", self.get().id);
self.get().flags.fetch_and(!PAGE_DIRTY, Ordering::Release);
// Clearing the tag here means a caller that wants the page tagged after cleaning it must call
// `set_wal_tag` after this method returns, not before.
self.clear_wal_tag();
}
pub fn is_loaded(&self) -> bool {
@@ -308,7 +310,7 @@ impl Page {
/// Reports whether this page is eligible to be used for checkpointing `target_frame` under the
/// checkpoint sequence `seq`, based on the page's WAL tag pair and its state flags.
#[inline]
pub fn is_valid_for_checkpoint(&self, target_frame: u64, seq: u32) -> bool {
let (f, s) = self.wal_tag_pair();
// NOTE(review): two predicate lines follow because this text is a diff rendering without +/-
// markers. The first looks like the removed version (tag matches and page is not dirty); the
// second looks like its replacement, which additionally requires the page to be loaded and
// not locked. Confirm against the real file.
f == target_frame && s == seq && !self.is_dirty()
f == target_frame && s == seq && !self.is_dirty() && self.is_loaded() && !self.is_locked()
}
}
@@ -1196,12 +1198,9 @@ impl Pager {
let page_key = PageCacheKey::new(page_idx);
let page = page_cache.get(&page_key)?.and_then(|page| {
if page.is_valid_for_checkpoint(target_frame, seq) {
tracing::trace!(
"cache_get_for_checkpoint: page {} frame {} is valid",
page_idx,
target_frame
tracing::debug!(
"cache_get_for_checkpoint: page {page_idx} frame {target_frame} is valid",
);
page.pin();
Some(page.clone())
} else {
tracing::trace!(
@@ -2110,6 +2109,7 @@ impl Pager {
))
})?;
page.set_loaded();
page.clear_wal_tag();
Ok(())
}
@@ -2203,6 +2203,7 @@ pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset:
let buffer = buffer_pool.get_page();
let buffer = Arc::new(buffer);
page.set_loaded();
page.clear_wal_tag();
page.get().contents = Some(PageContent::new(offset, buffer));
}
page

View File

@@ -449,9 +449,8 @@ struct OngoingCheckpoint {
inflight_reads: Vec<InflightRead>,
/// Array of atomic counters representing write operations that are currently in flight.
inflight_writes: Vec<Arc<AtomicBool>>,
/// List of all page_id + frame_id combinations to be backfilled, with a boolean
/// to denote that a cached page was used
pages_to_checkpoint: Vec<(u64, u64, bool)>,
/// List of all page_id + frame_id combinations to be backfilled
pages_to_checkpoint: Vec<(u64, u64)>,
}
impl OngoingCheckpoint {
@@ -1073,10 +1072,11 @@ impl Wal for WalFile {
page.set_locked();
let frame = page.clone();
let page_idx = page.get().id;
let seq = self.header.checkpoint_seq;
let header = self.get_shared().wal_header.clone();
let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
let Ok((buf, bytes_read)) = res else {
page.clear_locked();
page.clear_wal_tag();
return;
};
let buf_len = buf.len();
@@ -1086,6 +1086,7 @@ impl Wal for WalFile {
);
let cloned = frame.clone();
finish_read_page(page.get().id, buf, cloned);
let seq = header.lock().checkpoint_seq;
frame.set_wal_tag(frame_id, seq);
});
let shared = self.get_shared();
@@ -1295,8 +1296,8 @@ impl Wal for WalFile {
tracing::debug!(frame_id, offset, page_id);
let (c, checksums) = {
let shared = self.get_shared();
let header = shared.wal_header.clone();
let header = header.lock();
let shared_header = shared.wal_header.clone();
let header = shared.wal_header.lock();
let checksums = self.last_checksum;
let page_content = page.get_contents();
let page_buf = page_content.as_ptr();
@@ -1316,7 +1317,6 @@ impl Wal for WalFile {
page_buf
};
let seq = header.checkpoint_seq;
let (frame_checksums, frame_bytes) = prepare_wal_frame(
&self.buffer_pool,
&header,
@@ -1340,6 +1340,7 @@ impl Wal for WalFile {
);
page.clear_dirty();
let seq = shared_header.lock().checkpoint_seq;
page.set_wal_tag(frame_id, seq);
}
});
@@ -1805,7 +1806,7 @@ impl WalFile {
f >= self.ongoing_checkpoint.min_frame
&& f <= self.ongoing_checkpoint.max_frame
}) {
list.push((page_id, frame, false));
list.push((page_id, frame));
}
}
// sort by frame_id for read locality
@@ -1845,40 +1846,56 @@ impl WalFile {
let seq = self.header.checkpoint_seq;
// Issue reads until we hit limits
while self.ongoing_checkpoint.should_issue_reads() {
let (page_id, target_frame, _) =
self.ongoing_checkpoint.pages_to_checkpoint
[self.ongoing_checkpoint.current_page as usize];
let (page_id, target_frame) = self.ongoing_checkpoint.pages_to_checkpoint
[self.ongoing_checkpoint.current_page as usize];
// Try cache first, if enabled
if let Some(cached_page) =
pager.cache_get_for_checkpoint(page_id as usize, target_frame, seq)?
{
let contents = cached_page.get_contents();
let buffer = contents.buffer.clone();
// TODO: remove this eventually to actually benefit from the
// performance.. for now we assert that the cached page has the
// exact contents as one read from the WAL.
#[cfg(debug_assertions)]
{
let mut raw =
vec![0u8; self.page_size() as usize + WAL_FRAME_HEADER_SIZE];
self.io.wait_for_completion(
self.read_frame_raw(target_frame, &mut raw)?,
)?;
let (_, wal_page) = sqlite3_ondisk::parse_wal_frame_header(&raw);
let cached = cached_page.get_contents().buffer.as_slice();
turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}");
// dont use cached pages unless we hold the write lock, preventing them
// from being touched from under us.
if matches!(self.checkpoint_guard, Some(CheckpointLocks::Writer { .. })) {
if let Some(cached_page) = pager.cache_get_for_checkpoint(
page_id as usize,
target_frame,
seq,
)? {
let contents = cached_page.get_contents().buffer.clone();
// to avoid TOCTOU issues with using cached pages, we snapshot the contents and pay the memcpy
// instead of risking the page changing out from under us.
let buffer = Arc::new(self.buffer_pool.get_page());
buffer.as_mut_slice()[..contents.len()]
.copy_from_slice(contents.as_slice());
// TODO: remove this eventually to actually benefit from the
// performance.. for now we assert that the cached page has the
// exact contents as one read from the WAL.
#[cfg(debug_assertions)]
{
let mut raw = vec![
0u8;
self.page_size() as usize
+ WAL_FRAME_HEADER_SIZE
];
self.io.wait_for_completion(
self.read_frame_raw(target_frame, &mut raw)?,
)?;
let (_, wal_page) =
sqlite3_ondisk::parse_wal_frame_header(&raw);
let cached = buffer.as_slice();
turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}");
}
turso_assert!(
cached_page.is_valid_for_checkpoint(target_frame, seq),
" should still be valid after snapshotting the page"
);
self.ongoing_checkpoint
.pending_writes
.insert(page_id as usize, buffer);
// signify that a cached page was used, so it can be unpinned
self.ongoing_checkpoint.pages_to_checkpoint
[self.ongoing_checkpoint.current_page as usize] =
(page_id, target_frame);
self.ongoing_checkpoint.current_page += 1;
continue;
}
self.ongoing_checkpoint
.pending_writes
.insert(page_id as usize, buffer);
// signify that a cached page was used, so it can be unpinned
self.ongoing_checkpoint.pages_to_checkpoint
[self.ongoing_checkpoint.current_page as usize] =
(page_id, target_frame, true);
self.ongoing_checkpoint.current_page += 1;
continue;
}
// Issue read if page wasn't found in the page cache or doesnt meet
// the frame requirements
@@ -1906,20 +1923,6 @@ impl WalFile {
if !completions.is_empty() {
io_yield_many!(completions);
} else if self.ongoing_checkpoint.complete() {
// if we are completely done backfilling, we need to unpin any pages we used from the page cache.
for (page_id, _, cached) in
self.ongoing_checkpoint.pages_to_checkpoint.iter()
{
if *cached {
let page = pager.cache_get((*page_id) as usize)?;
turso_assert!(
page.is_some(),
"page should still exist in the page cache"
);
// if we used a cached page, unpin it
page.map(|p| p.try_unpin());
}
}
self.ongoing_checkpoint.state = CheckpointState::Done;
}
}