From 708208606140f025a2e81598da5fb98a1c09ad8a Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 19 Aug 2025 11:53:30 -0400 Subject: [PATCH] Remove ENV var and enable cache by default, track which pages were cached --- core/storage/pager.rs | 6 +- core/storage/wal.rs | 144 +++++++++++------------ scripts/limbo-sqlite3 | 4 +- scripts/limbo-sqlite3-index-experimental | 4 +- scripts/run-sim | 4 +- 5 files changed, 76 insertions(+), 86 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index f74e696cc..103276285 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -288,8 +288,8 @@ impl Page { } #[inline] - pub fn is_valid_for_checkpoint(&self, target_frame: u64, seq: u32) -> bool { - let (f, s) = self.wal_tag_pair(); + pub fn is_valid_for_checkpoint(&self, target_frame: u64) -> bool { + let (f, _s) = self.wal_tag_pair(); f == target_frame && !self.is_dirty() } } @@ -1168,7 +1168,7 @@ impl Pager { let mut page_cache = self.page_cache.write(); let page_key = PageCacheKey::new(page_idx); page_cache.get(&page_key).and_then(|page| { - if page.is_valid_for_checkpoint(target_frame, seq) { + if page.is_valid_for_checkpoint(target_frame) { tracing::trace!( "cache_get_for_checkpoint: page {} frame {} is valid", page_idx, diff --git a/core/storage/wal.rs b/core/storage/wal.rs index b8d09fae3..358c259be 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -15,7 +15,7 @@ use super::buffer_pool::BufferPool; use super::pager::{PageRef, Pager}; use super::sqlite3_ondisk::{self, checksum_wal, WalHeader, WAL_MAGIC_BE, WAL_MAGIC_LE}; use crate::fast_lock::SpinLock; -use crate::io::{File, IO}; +use crate::io::{clock, File, IO}; use crate::result::LimboResult; use crate::storage::encryption::{decrypt_page, encrypt_page, EncryptionKey}; use crate::storage::sqlite3_ondisk::{ @@ -299,7 +299,7 @@ pub enum CheckpointState { Done, } -/// IOV_MAX is 1024 on most systems +/// IOV_MAX is 1024 on most systems, lets use 512 to be safe pub const CKPT_BATCH_PAGES: usize = 512; /// TODO: *ALL* of these need to be tuned for perf. It is tricky @@ -358,7 +358,8 @@ impl WriteBatch { } (true, true) => { // merges two runs into one - self.run_count = self.run_count.saturating_sub(1); + turso_assert!(self.run_count >= 2, "should have at least two runs here"); + self.run_count -= 1; } } self.items.insert(page_id, buf); @@ -414,7 +415,7 @@ impl std::ops::DerefMut for WriteBatch { /// Information and structures for processing a checkpoint operation. struct OngoingCheckpoint { /// Used for benchmarking/debugging a checkpoint operation. - time: std::time::Instant, + time: clock::Instant, /// minimum frame number to be backfilled by this checkpoint operation. min_frame: u64, /// maximum safe frame number that will be backfilled by this checkpoint operation. @@ -429,8 +430,9 @@ struct OngoingCheckpoint { inflight_reads: Vec, /// Array of atomic counters representing write operations that are currently in flight. inflight_writes: Vec>, - /// List of all (page_id, frame_id) combinations to be backfilled. - pages_to_checkpoint: Vec<(u64, u64)>, + /// List of all page_id + frame_id combinations to be backfilled, with a boolean + /// to denote that a cached page was used + pages_to_checkpoint: Vec<(u64, u64, bool)>, } impl OngoingCheckpoint { @@ -476,7 +478,7 @@ impl OngoingCheckpoint { #[inline] /// Remove any completed write operations from `inflight_writes`, /// returns whether any progress was made. - fn process_pending_writes(&mut self) -> bool { + fn process_inflight_writes(&mut self) -> bool { let before_len = self.inflight_writes.len(); self.inflight_writes .retain(|done| !done.load(Ordering::Acquire)); @@ -1024,10 +1026,15 @@ impl Wal for WalFile { let key = self.encryption_key.borrow().clone(); let seq = self.header.checkpoint_seq; let complete = Box::new(move |res: Result<(Arc, i32), CompletionError>| { - let Ok((buf, _bytes_read)) = res else { + let Ok((buf, bytes_read)) = res else { page.clear_locked(); return; }; + let buf_len = buf.len(); + turso_assert!( + bytes_read == buf_len as i32, + "read({bytes_read}) less than expected({buf_len}): frame_id={frame_id}" + ); let cloned = frame.clone(); if let Some(key) = key.clone() { match decrypt_page(buf.as_slice(), page_idx, &key) { @@ -1364,22 +1371,14 @@ impl WalFile { ) -> Self { let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() }; let last_checksum = unsafe { (*shared.get()).last_checksum }; - let enable_checkpoint_cache = ["true", "1", "on"].iter().any(|s| { - s.eq_ignore_ascii_case( - std::env::var("TURSO_ENABLE_CHECKPOINT_CACHE") - .unwrap_or_default() - .to_lowercase() - .as_str(), - ) - }); - set_enable_checkpoint_cache(enable_checkpoint_cache); + let now = io.now(); Self { io, // default to max frame in WAL, so that when we read schema we can read from WAL too if it's there. max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::Acquire) }, shared, ongoing_checkpoint: OngoingCheckpoint { - time: std::time::Instant::now(), + time: now, pending_writes: WriteBatch::new(), inflight_writes: Vec::new(), state: CheckpointState::Start, @@ -1532,7 +1531,7 @@ impl WalFile { f >= self.ongoing_checkpoint.min_frame && f <= self.ongoing_checkpoint.max_frame }) { - list.push((page_id, frame)); + list.push((page_id, frame, false)); } } // sort by frame_id for read locality @@ -1544,7 +1543,7 @@ impl WalFile { self.ongoing_checkpoint.inflight_writes.clear(); self.ongoing_checkpoint.inflight_reads.clear(); self.ongoing_checkpoint.state = CheckpointState::Processing; - self.ongoing_checkpoint.time = std::time::Instant::now(); + self.ongoing_checkpoint.time = self.io.now(); tracing::trace!( "checkpoint_start(min_frame={}, max_frame={})", self.ongoing_checkpoint.min_frame, @@ -1557,11 +1556,11 @@ impl WalFile { // to prevent serialization, and we try to issue reads and flush batches concurrently // if at all possible, at the cost of some batching potential. CheckpointState::Processing => { - // Gather I/O completions, estimate with MAX_PENDING_WRITES to prevent realloc + // Gather I/O completions, estimate with MAX_INFLIGHT_WRITES to prevent realloc let mut completions = Vec::with_capacity(MAX_INFLIGHT_WRITES); // Check and clean any completed writes from pending flush - if self.ongoing_checkpoint.process_pending_writes() { + if self.ongoing_checkpoint.process_inflight_writes() { tracing::trace!("Completed a write batch"); } // Process completed reads into current batch @@ -1572,40 +1571,40 @@ impl WalFile { let seq = self.header.checkpoint_seq; // Issue reads until we hit limits while self.ongoing_checkpoint.should_issue_reads() { - let (page_id, target_frame) = self.ongoing_checkpoint.pages_to_checkpoint - [self.ongoing_checkpoint.current_page as usize]; + let (page_id, target_frame, _) = + self.ongoing_checkpoint.pages_to_checkpoint + [self.ongoing_checkpoint.current_page as usize]; // Try cache first, if enabled - if is_checkpoint_cache_enabled() { - if let Some(cached_page) = - pager.cache_get_for_checkpoint(page_id as usize, target_frame, seq) + if let Some(cached_page) = + pager.cache_get_for_checkpoint(page_id as usize, target_frame, seq) + { + let contents = cached_page.get_contents(); + let buffer = contents.buffer.clone(); + // TODO: remove this eventually to actually benefit from the + // performance.. for now we assert that the cached page has the + // exact contents as one read from the WAL. + #[cfg(debug_assertions)] { - let contents = cached_page.get_contents(); - let buffer = contents.buffer.clone(); - // TODO: remove this eventually to actually benefit from the - // performance.. for now we assert that the cached page has the - // exact contents as one read from the WAL. - #[cfg(debug_assertions)] - { - let mut raw = vec![ - 0u8; - self.page_size() as usize - + WAL_FRAME_HEADER_SIZE - ]; - self.io.wait_for_completion( - self.read_frame_raw(target_frame, &mut raw)?, - )?; - let (_, wal_page) = - sqlite3_ondisk::parse_wal_frame_header(&raw); - let cached = cached_page.get_contents().buffer.as_slice(); - turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}"); - } - self.ongoing_checkpoint - .pending_writes - .insert(page_id as usize, buffer); - self.ongoing_checkpoint.current_page += 1; - continue; + let mut raw = + vec![0u8; self.page_size() as usize + WAL_FRAME_HEADER_SIZE]; + self.io.wait_for_completion( + self.read_frame_raw(target_frame, &mut raw)?, + )?; + let (_, wal_page) = sqlite3_ondisk::parse_wal_frame_header(&raw); + let cached = cached_page.get_contents().buffer.as_slice(); + turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}"); } + self.ongoing_checkpoint + .pending_writes + .insert(page_id as usize, buffer); + + // signify that a cached page was used, so it can be unpinned + self.ongoing_checkpoint.pages_to_checkpoint + [self.ongoing_checkpoint.current_page as usize] = + (page_id, target_frame, true); + self.ongoing_checkpoint.current_page += 1; + continue; } // Issue read if page wasn't found in the page cache or doesnt meet // the frame requirements @@ -1635,23 +1634,21 @@ impl WalFile { if !completions.is_empty() { io_yield_many!(completions); } else if self.ongoing_checkpoint.complete() { - // if we are completely done backfilling, we need to unpin any pages we - // used from the page cache. - for (page_id, frame_id) in + // if we are completely done backfilling, we need to unpin any pages we used from the page cache. + for (page_id, _, cached) in self.ongoing_checkpoint.pages_to_checkpoint.iter() { - if let Some(page) = pager.cache_get((*page_id) as usize) { - // we have to check validity to ensure we dont unpin anything we didnt pin - if page - .is_valid_for_checkpoint(*frame_id, self.header.checkpoint_seq) - { - page.try_unpin(); - } + if *cached { + let page = pager.cache_get((*page_id) as usize); + turso_assert!( + page.is_some(), + "page should still exist in the page cache" + ); + // if we used a cached page, unpin it + page.map(|p| p.try_unpin()); } } self.ongoing_checkpoint.state = CheckpointState::Done; - } else if !completions.is_empty() { - io_yield_many!(completions); } } // All eligible frames copied to the db file @@ -1731,9 +1728,12 @@ impl WalFile { self.ongoing_checkpoint.pages_to_checkpoint.clear(); self.ongoing_checkpoint.current_page = 0; tracing::debug!( - "total time spent checkpointing: {}", - std::time::Instant::now() - .duration_since(self.ongoing_checkpoint.time) + "total time spent checkpointing: {:?}", + self.io + .now() + .to_system_time() + .duration_since(self.ongoing_checkpoint.time.to_system_time()) + .expect("time") .as_millis() ); self.ongoing_checkpoint.state = CheckpointState::Start; @@ -2061,16 +2061,6 @@ impl WalFileShared { } } -/// Enable experimental using cached pages to backfill .db file during checkpoint. -static ENABLE_CKPT_CACHE: AtomicBool = AtomicBool::new(false); -fn set_enable_checkpoint_cache(v: bool) { - ENABLE_CKPT_CACHE.store(v, Ordering::Relaxed); -} -#[inline] -fn is_checkpoint_cache_enabled() -> bool { - ENABLE_CKPT_CACHE.load(Ordering::Relaxed) -} - #[cfg(test)] pub mod test { use crate::{ diff --git a/scripts/limbo-sqlite3 b/scripts/limbo-sqlite3 index 3225910d8..9054eca05 100755 --- a/scripts/limbo-sqlite3 +++ b/scripts/limbo-sqlite3 @@ -11,7 +11,7 @@ EXPERIMENTAL_FLAGS="--experimental-views" # if RUST_LOG is non-empty, enable tracing output if [ -n "$RUST_LOG" ]; then - TURSO_ENABLE_CHECKPOINT_CACHE=1 TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" + TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" else - TURSO_ENABLE_CHECKPOINT_CACHE=1 TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" + TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" fi diff --git a/scripts/limbo-sqlite3-index-experimental b/scripts/limbo-sqlite3-index-experimental index 5a512496d..a0ec545aa 100755 --- a/scripts/limbo-sqlite3-index-experimental +++ b/scripts/limbo-sqlite3-index-experimental @@ -11,7 +11,7 @@ EXPERIMENTAL_FLAGS="--experimental-indexes" # if RUST_LOG is non-empty, enable tracing output if [ -n "$RUST_LOG" ]; then - TURSO_ENABLE_CHECKPOINT_CACHE=1 "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" + "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" else - TURSO_ENABLE_CHECKPOINT_CACHE=1 "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" + "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" fi diff --git a/scripts/run-sim b/scripts/run-sim index 4489238e5..661062c54 100755 --- a/scripts/run-sim +++ b/scripts/run-sim @@ -3,10 +3,10 @@ set -e if [[ -n "$@" ]]; then - TURSO_ENABLE_CHECKPOINT_CACHE=1 cargo run -p limbo_sim -- "$@" + cargo run -p limbo_sim -- "$@" else echo "Running limbo_sim in infinite loop..." while true; do - TURSO_ENABLE_CHECKPOINT_CACHE=1 cargo run -p limbo_sim + cargo run -p limbo_sim done fi