Remove ENV var and enable cache by default, track which pages were cached

PThorpe92
2025-08-19 11:53:30 -04:00
parent bf48101db2
commit 7082086061
5 changed files with 76 additions and 86 deletions

View File

@@ -288,8 +288,8 @@ impl Page {
     }
     #[inline]
-    pub fn is_valid_for_checkpoint(&self, target_frame: u64, seq: u32) -> bool {
-        let (f, s) = self.wal_tag_pair();
+    pub fn is_valid_for_checkpoint(&self, target_frame: u64) -> bool {
+        let (f, _s) = self.wal_tag_pair();
         f == target_frame && !self.is_dirty()
     }
 }
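
With the `seq` parameter dropped, checkpoint validity reduces to two checks: the page's WAL tag must name exactly the target frame, and the page must be clean. A minimal standalone sketch of the predicate (the Page fields below are illustrative stand-ins for the real struct; wal_tag_pair and is_dirty are the accessors named in the diff):

    struct Page {
        wal_frame: u64,
        wal_seq: u32,
        dirty: bool,
    }

    impl Page {
        fn wal_tag_pair(&self) -> (u64, u32) {
            (self.wal_frame, self.wal_seq)
        }
        fn is_dirty(&self) -> bool {
            self.dirty
        }
        // The sequence half of the tag is now ignored: a cached page is
        // usable for backfill iff it holds exactly the target frame and
        // carries no unflushed modifications.
        pub fn is_valid_for_checkpoint(&self, target_frame: u64) -> bool {
            let (f, _s) = self.wal_tag_pair();
            f == target_frame && !self.is_dirty()
        }
    }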
@@ -1168,7 +1168,7 @@ impl Pager {
         let mut page_cache = self.page_cache.write();
         let page_key = PageCacheKey::new(page_idx);
         page_cache.get(&page_key).and_then(|page| {
-            if page.is_valid_for_checkpoint(target_frame, seq) {
+            if page.is_valid_for_checkpoint(target_frame) {
                 tracing::trace!(
                     "cache_get_for_checkpoint: page {} frame {} is valid",
                     page_idx,
View File

@@ -15,7 +15,7 @@ use super::buffer_pool::BufferPool;
 use super::pager::{PageRef, Pager};
 use super::sqlite3_ondisk::{self, checksum_wal, WalHeader, WAL_MAGIC_BE, WAL_MAGIC_LE};
 use crate::fast_lock::SpinLock;
-use crate::io::{File, IO};
+use crate::io::{clock, File, IO};
 use crate::result::LimboResult;
 use crate::storage::encryption::{decrypt_page, encrypt_page, EncryptionKey};
 use crate::storage::sqlite3_ondisk::{
@@ -299,7 +299,7 @@ pub enum CheckpointState {
     Done,
 }
-/// IOV_MAX is 1024 on most systems
+/// IOV_MAX is 1024 on most systems, let's use 512 to be safe
 pub const CKPT_BATCH_PAGES: usize = 512;
 /// TODO: *ALL* of these need to be tuned for perf. It is tricky
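
CKPT_BATCH_PAGES caps a checkpoint write batch at 512 buffers, half the usual IOV_MAX of 1024, so a single vectored submission never brushes against the kernel limit. A hedged sketch of what such a cap implies for batch formation (the chunking helper below is illustrative, not the WriteBatch code):

    pub const CKPT_BATCH_PAGES: usize = 512;

    // Split the queued (page_id, frame_id, cached) entries into groups that
    // each fit comfortably under IOV_MAX, so every group can be issued as
    // one vectored write.
    fn batches(pages: &[(u64, u64, bool)]) -> impl Iterator<Item = &[(u64, u64, bool)]> + '_ {
        pages.chunks(CKPT_BATCH_PAGES)
    }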
@@ -358,7 +358,8 @@ impl WriteBatch {
             }
             (true, true) => {
                 // merges two runs into one
-                self.run_count = self.run_count.saturating_sub(1);
+                turso_assert!(self.run_count >= 2, "should have at least two runs here");
+                self.run_count -= 1;
             }
         }
         self.items.insert(page_id, buf);
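
The (true, true) arm fires when the inserted page is adjacent to runs on both sides, so two runs collapse into one; asserting run_count >= 2 before the decrement turns silent bookkeeping corruption (which saturating_sub would have masked) into a loud failure. A toy model of the invariant, assuming a run is a maximal span of consecutive page ids (this is not the WriteBatch implementation):

    // Count maximal runs of consecutive ids in a sorted list.
    fn run_count(sorted_ids: &[usize]) -> usize {
        let breaks = sorted_ids.windows(2).filter(|w| w[1] != w[0] + 1).count();
        breaks + usize::from(!sorted_ids.is_empty())
    }

    fn main() {
        // Pages {3,4} and {6,7} form two runs.
        assert_eq!(run_count(&[3, 4, 6, 7]), 2);
        // Inserting page 5 bridges them: two runs become one, which is why
        // at least two runs must exist before the merge decrements the count.
        assert_eq!(run_count(&[3, 4, 5, 6, 7]), 1);
    }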
@@ -414,7 +415,7 @@ impl std::ops::DerefMut for WriteBatch {
 /// Information and structures for processing a checkpoint operation.
 struct OngoingCheckpoint {
     /// Used for benchmarking/debugging a checkpoint operation.
-    time: std::time::Instant,
+    time: clock::Instant,
     /// minimum frame number to be backfilled by this checkpoint operation.
     min_frame: u64,
     /// maximum safe frame number that will be backfilled by this checkpoint operation.
@@ -429,8 +430,9 @@ struct OngoingCheckpoint {
     inflight_reads: Vec<InflightRead>,
     /// Array of atomic counters representing write operations that are currently in flight.
     inflight_writes: Vec<Arc<AtomicBool>>,
-    /// List of all (page_id, frame_id) combinations to be backfilled.
-    pages_to_checkpoint: Vec<(u64, u64)>,
+    /// List of all page_id + frame_id combinations to be backfilled, with a boolean
+    /// to denote that a cached page was used.
+    pages_to_checkpoint: Vec<(u64, u64, bool)>,
 }
 impl OngoingCheckpoint {
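
Each entry in pages_to_checkpoint now carries a third field recording whether that page was served from the page cache; only those entries were pinned and need unpinning once backfill finishes. A small sketch of the bookkeeping (the helper names are hypothetical; only the tuple shape comes from the diff):

    /// (page_id, frame_id, served_from_cache)
    type CheckpointEntry = (u64, u64, bool);

    // Entries start out `false` when the backfill list is built...
    fn queue_page(list: &mut Vec<CheckpointEntry>, page_id: u64, frame_id: u64) {
        list.push((page_id, frame_id, false));
    }

    // ...and flip to `true` only when the cache fast-path is taken, so the
    // final unpin pass touches exactly the pages that were pinned.
    fn mark_cached(list: &mut [CheckpointEntry], idx: usize) {
        let (page_id, frame_id, _) = list[idx];
        list[idx] = (page_id, frame_id, true);
    }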
@@ -476,7 +478,7 @@ impl OngoingCheckpoint {
     #[inline]
     /// Remove any completed write operations from `inflight_writes`,
     /// returning whether any progress was made.
-    fn process_pending_writes(&mut self) -> bool {
+    fn process_inflight_writes(&mut self) -> bool {
         let before_len = self.inflight_writes.len();
         self.inflight_writes
             .retain(|done| !done.load(Ordering::Acquire));
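
The new name matches the behavior: the function drops every write whose completion flag has been set by the I/O layer and reports whether the list shrank. The pattern in isolation (a self-contained sketch using the same Arc<AtomicBool> shape as the diff):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    // Retain only writes still in flight; return true if any completed.
    fn process_inflight_writes(inflight: &mut Vec<Arc<AtomicBool>>) -> bool {
        let before_len = inflight.len();
        // Acquire pairs with the Release store performed on completion, so
        // the finished write's effects are visible before its handle is dropped.
        inflight.retain(|done| !done.load(Ordering::Acquire));
        inflight.len() < before_len
    }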
@@ -1024,10 +1026,15 @@ impl Wal for WalFile {
         let key = self.encryption_key.borrow().clone();
         let seq = self.header.checkpoint_seq;
         let complete = Box::new(move |res: Result<(Arc<Buffer>, i32), CompletionError>| {
-            let Ok((buf, _bytes_read)) = res else {
+            let Ok((buf, bytes_read)) = res else {
                 page.clear_locked();
                 return;
             };
+            let buf_len = buf.len();
+            turso_assert!(
+                bytes_read == buf_len as i32,
+                "read({bytes_read}) less than expected({buf_len}): frame_id={frame_id}"
+            );
             let cloned = frame.clone();
             if let Some(key) = key.clone() {
                 match decrypt_page(buf.as_slice(), page_idx, &key) {
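
The callback previously discarded the byte count; it now insists the read filled the entire buffer, so a short WAL read fails immediately instead of flowing into decryption as a half-filled page. The check in isolation (plain assert! stands in for the project's turso_assert!):

    // A partial frame read must never be treated as a complete frame.
    fn verify_full_read(bytes_read: i32, buf: &[u8], frame_id: u64) {
        let buf_len = buf.len();
        assert!(
            bytes_read == buf_len as i32,
            "read({bytes_read}) less than expected({buf_len}): frame_id={frame_id}"
        );
    }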
@@ -1364,22 +1371,14 @@ impl WalFile {
     ) -> Self {
         let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() };
         let last_checksum = unsafe { (*shared.get()).last_checksum };
-        let enable_checkpoint_cache = ["true", "1", "on"].iter().any(|s| {
-            s.eq_ignore_ascii_case(
-                std::env::var("TURSO_ENABLE_CHECKPOINT_CACHE")
-                    .unwrap_or_default()
-                    .to_lowercase()
-                    .as_str(),
-            )
-        });
-        set_enable_checkpoint_cache(enable_checkpoint_cache);
+        let now = io.now();
         Self {
             io,
             // default to max frame in WAL, so that when we read schema we can read from WAL too if it's there.
             max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::Acquire) },
             shared,
             ongoing_checkpoint: OngoingCheckpoint {
-                time: std::time::Instant::now(),
+                time: now,
                 pending_writes: WriteBatch::new(),
                 inflight_writes: Vec::new(),
                 state: CheckpointState::Start,
@@ -1532,7 +1531,7 @@ impl WalFile {
                 f >= self.ongoing_checkpoint.min_frame
                     && f <= self.ongoing_checkpoint.max_frame
             }) {
-                list.push((page_id, frame));
+                list.push((page_id, frame, false));
             }
         }
         // sort by frame_id for read locality
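
Every entry is seeded with cached = false, and the list is then ordered by frame number so WAL reads advance sequentially through the file. A tiny sketch of that ordering (illustrative, using the tuple shape from the diff):

    // Sort checkpoint entries by frame_id so reads move monotonically
    // through the WAL file, improving read locality.
    fn order_for_reads(list: &mut [(u64, u64, bool)]) {
        list.sort_unstable_by_key(|(_page_id, frame_id, _cached)| *frame_id);
    }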
@@ -1544,7 +1543,7 @@
         self.ongoing_checkpoint.inflight_writes.clear();
         self.ongoing_checkpoint.inflight_reads.clear();
         self.ongoing_checkpoint.state = CheckpointState::Processing;
-        self.ongoing_checkpoint.time = std::time::Instant::now();
+        self.ongoing_checkpoint.time = self.io.now();
         tracing::trace!(
             "checkpoint_start(min_frame={}, max_frame={})",
             self.ongoing_checkpoint.min_frame,
@@ -1557,11 +1556,11 @@
             // to prevent serialization, and we try to issue reads and flush batches concurrently
             // if at all possible, at the cost of some batching potential.
             CheckpointState::Processing => {
-                // Gather I/O completions, estimate with MAX_PENDING_WRITES to prevent realloc
+                // Gather I/O completions, estimate with MAX_INFLIGHT_WRITES to prevent realloc
                 let mut completions = Vec::with_capacity(MAX_INFLIGHT_WRITES);
                 // Check and clean any completed writes from pending flush
-                if self.ongoing_checkpoint.process_pending_writes() {
+                if self.ongoing_checkpoint.process_inflight_writes() {
                     tracing::trace!("Completed a write batch");
                 }
                 // Process completed reads into current batch
@@ -1572,40 +1571,40 @@
                 let seq = self.header.checkpoint_seq;
                 // Issue reads until we hit limits
                 while self.ongoing_checkpoint.should_issue_reads() {
-                    let (page_id, target_frame) = self.ongoing_checkpoint.pages_to_checkpoint
-                        [self.ongoing_checkpoint.current_page as usize];
+                    let (page_id, target_frame, _) =
+                        self.ongoing_checkpoint.pages_to_checkpoint
+                            [self.ongoing_checkpoint.current_page as usize];
                     // Try cache first, if enabled
-                    if is_checkpoint_cache_enabled() {
-                        if let Some(cached_page) =
-                            pager.cache_get_for_checkpoint(page_id as usize, target_frame, seq)
+                    if let Some(cached_page) =
+                        pager.cache_get_for_checkpoint(page_id as usize, target_frame, seq)
                     {
-                            let contents = cached_page.get_contents();
-                            let buffer = contents.buffer.clone();
-                            // TODO: remove this eventually to actually benefit from the
-                            // performance.. for now we assert that the cached page has the
-                            // exact contents as one read from the WAL.
-                            #[cfg(debug_assertions)]
-                            {
+                        let contents = cached_page.get_contents();
+                        let buffer = contents.buffer.clone();
+                        // TODO: remove this eventually to actually benefit from the
+                        // performance.. for now we assert that the cached page has the
+                        // exact contents as one read from the WAL.
+                        #[cfg(debug_assertions)]
+                        {
-                                let mut raw = vec![
-                                    0u8;
-                                    self.page_size() as usize
-                                        + WAL_FRAME_HEADER_SIZE
-                                ];
-                                self.io.wait_for_completion(
-                                    self.read_frame_raw(target_frame, &mut raw)?,
-                                )?;
-                                let (_, wal_page) =
-                                    sqlite3_ondisk::parse_wal_frame_header(&raw);
-                                let cached = cached_page.get_contents().buffer.as_slice();
-                                turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}");
-                            }
-                            self.ongoing_checkpoint
-                                .pending_writes
-                                .insert(page_id as usize, buffer);
-                            self.ongoing_checkpoint.current_page += 1;
-                            continue;
-                        }
+                            let mut raw =
+                                vec![0u8; self.page_size() as usize + WAL_FRAME_HEADER_SIZE];
+                            self.io.wait_for_completion(
+                                self.read_frame_raw(target_frame, &mut raw)?,
+                            )?;
+                            let (_, wal_page) = sqlite3_ondisk::parse_wal_frame_header(&raw);
+                            let cached = cached_page.get_contents().buffer.as_slice();
+                            turso_assert!(wal_page == cached, "cache fast-path returned wrong content for page {page_id} frame {target_frame}");
+                        }
+                        self.ongoing_checkpoint
+                            .pending_writes
+                            .insert(page_id as usize, buffer);
+                        // signify that a cached page was used, so it can be unpinned
+                        self.ongoing_checkpoint.pages_to_checkpoint
+                            [self.ongoing_checkpoint.current_page as usize] =
+                            (page_id, target_frame, true);
+                        self.ongoing_checkpoint.current_page += 1;
+                        continue;
                     }
                     // Issue read if page wasn't found in the page cache or doesn't meet
                     // the frame requirements
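
Taken together, the fast path now is: look the page up in the cache, cross-check its bytes against a raw WAL read in debug builds, queue its buffer into the pending write batch, and flip its checkpoint entry to cached. A condensed, compilable model of that control flow (the traits are stand-ins for the pager and WAL methods named in the diff; async I/O and error handling are elided):

    type PageBuf = Vec<u8>;

    trait Cache {
        fn get_for_checkpoint(&self, page_id: u64, frame: u64) -> Option<PageBuf>;
    }
    trait Wal {
        fn read_frame(&self, frame: u64) -> PageBuf;
    }

    fn backfill_one(
        cache: &impl Cache,
        wal: &impl Wal,
        entry: &mut (u64, u64, bool),
        pending: &mut Vec<(u64, PageBuf)>,
    ) {
        let (page_id, frame, _) = *entry;
        if let Some(buf) = cache.get_for_checkpoint(page_id, frame) {
            // Debug builds verify the cached bytes match the WAL contents.
            debug_assert_eq!(buf, wal.read_frame(frame));
            pending.push((page_id, buf));
            // Record that the cache pinned this page so it is unpinned later.
            entry.2 = true;
        } else {
            // Cache miss: fall back to reading the frame from the WAL.
            pending.push((page_id, wal.read_frame(frame)));
        }
    }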
@@ -1635,23 +1634,21 @@
                 if !completions.is_empty() {
                     io_yield_many!(completions);
                 } else if self.ongoing_checkpoint.complete() {
-                    // if we are completely done backfilling, we need to unpin any pages we
-                    // used from the page cache.
-                    for (page_id, frame_id) in
+                    // if we are completely done backfilling, we need to unpin any pages we used from the page cache.
+                    for (page_id, _, cached) in
                         self.ongoing_checkpoint.pages_to_checkpoint.iter()
                     {
-                        if let Some(page) = pager.cache_get((*page_id) as usize) {
-                            // we have to check validity to ensure we don't unpin anything we didn't pin
-                            if page
-                                .is_valid_for_checkpoint(*frame_id, self.header.checkpoint_seq)
-                            {
-                                page.try_unpin();
-                            }
+                        if *cached {
+                            let page = pager.cache_get((*page_id) as usize);
+                            turso_assert!(
+                                page.is_some(),
+                                "page should still exist in the page cache"
+                            );
+                            // if we used a cached page, unpin it
+                            page.map(|p| p.try_unpin());
                         }
                     }
                     self.ongoing_checkpoint.state = CheckpointState::Done;
-                } else if !completions.is_empty() {
-                    io_yield_many!(completions);
                 }
             }
             // All eligible frames copied to the db file
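
The unpin pass no longer re-derives validity from frame and sequence numbers; the boolean recorded during backfill identifies exactly which pages were pinned, and a pinned page cannot have been evicted, hence the assertion. A sketch under the same assumptions (cache_get and try_unpin are the names from the diff; the trait and handle types are illustrative):

    struct PageHandle;
    impl PageHandle {
        fn try_unpin(&self) {}
    }
    trait PinCache {
        fn cache_get(&self, page_id: usize) -> Option<PageHandle>;
    }

    // Unpin exactly the pages the fast path pinned; a miss here would mean
    // a pinned page was evicted, i.e. a bookkeeping bug.
    fn unpin_cached(cache: &impl PinCache, entries: &[(u64, u64, bool)]) {
        for (page_id, _, cached) in entries {
            if *cached {
                let page = cache.cache_get(*page_id as usize);
                assert!(page.is_some(), "page should still exist in the page cache");
                if let Some(p) = page {
                    p.try_unpin();
                }
            }
        }
    }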
@@ -1731,9 +1728,12 @@ impl WalFile {
         self.ongoing_checkpoint.pages_to_checkpoint.clear();
         self.ongoing_checkpoint.current_page = 0;
         tracing::debug!(
-            "total time spent checkpointing: {}",
-            std::time::Instant::now()
-                .duration_since(self.ongoing_checkpoint.time)
+            "total time spent checkpointing: {:?}",
+            self.io
+                .now()
+                .to_system_time()
+                .duration_since(self.ongoing_checkpoint.time.to_system_time())
+                .expect("time")
                 .as_millis()
         );
         self.ongoing_checkpoint.state = CheckpointState::Start;
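
Since clock::Instant now comes from the pluggable IO layer (which keeps simulated clocks honest), elapsed time is computed by converting both samples to SystemTime. A minimal sketch of that arithmetic (std::time stands in for the project's clock types; to_system_time is the conversion named in the diff):

    use std::time::SystemTime;

    // SystemTime is not monotonic, so duration_since can fail; the diff
    // surfaces that with .expect("time") instead of reporting a bogus zero.
    fn elapsed_ms(start: SystemTime, end: SystemTime) -> u128 {
        end.duration_since(start).expect("time").as_millis()
    }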
@@ -2061,16 +2061,6 @@ impl WalFileShared {
         }
     }
-/// Enable experimental using cached pages to backfill .db file during checkpoint.
-static ENABLE_CKPT_CACHE: AtomicBool = AtomicBool::new(false);
-fn set_enable_checkpoint_cache(v: bool) {
-    ENABLE_CKPT_CACHE.store(v, Ordering::Relaxed);
-}
-#[inline]
-fn is_checkpoint_cache_enabled() -> bool {
-    ENABLE_CKPT_CACHE.load(Ordering::Relaxed)
-}
 #[cfg(test)]
 pub mod test {
     use crate::{

View File

@@ -11,7 +11,7 @@ EXPERIMENTAL_FLAGS="--experimental-views"
 # if RUST_LOG is non-empty, enable tracing output
 if [ -n "$RUST_LOG" ]; then
-    TURSO_ENABLE_CHECKPOINT_CACHE=1 TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@"
+    TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@"
 else
-    TURSO_ENABLE_CHECKPOINT_CACHE=1 TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@"
+    TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@"
 fi

View File

@@ -11,7 +11,7 @@ EXPERIMENTAL_FLAGS="--experimental-indexes"
 # if RUST_LOG is non-empty, enable tracing output
 if [ -n "$RUST_LOG" ]; then
-    TURSO_ENABLE_CHECKPOINT_CACHE=1 "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@"
+    "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@"
 else
-    TURSO_ENABLE_CHECKPOINT_CACHE=1 "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@"
+    "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@"
 fi

View File

@@ -3,10 +3,10 @@
 set -e
 if [[ -n "$@" ]]; then
-    TURSO_ENABLE_CHECKPOINT_CACHE=1 cargo run -p limbo_sim -- "$@"
+    cargo run -p limbo_sim -- "$@"
 else
     echo "Running limbo_sim in infinite loop..."
     while true; do
-        TURSO_ENABLE_CHECKPOINT_CACHE=1 cargo run -p limbo_sim
+        cargo run -p limbo_sim
     done
 fi