Fix taking page content from cached pages in checkpoint loop

This commit is contained in:
PThorpe92
2025-07-26 15:49:20 -04:00
parent b04128b585
commit b8e6cd5ae2

View File

@@ -20,8 +20,8 @@ use crate::fast_lock::SpinLock;
use crate::io::{File, IO};
use crate::result::LimboResult;
use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_read_wal_frame_raw, begin_write_btree_pages_writev,
finish_read_page, prepare_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame,
write_pages_vectored, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
};
use crate::types::IOResult;
use crate::{turso_assert, Buffer, LimboError, Result};
@@ -399,7 +399,7 @@ pub enum CheckpointState {
Done,
}
pub const CKPT_BATCH_PAGES: usize = 512;
pub const CKPT_BATCH_PAGES: usize = 1024;
#[derive(Clone)]
pub(super) struct BatchItem {
@@ -699,27 +699,28 @@ impl Drop for CheckpointLocks {
}
fn take_page_into_batch(scratch: &PageRef, pool: &Arc<BufferPool>, batch: &mut Vec<BatchItem>) {
// grab id and buffer
let id = scratch.get().id;
let buf = scratch.get_contents().buffer.clone();
scratch.pin(); // ensure it isnt evicted
batch.push(BatchItem { id, buf });
// give scratch a brand-new empty buffer for the next read
reinit_scratch_buffer(scratch, pool);
}
let (id, buf_clone) = unsafe {
let inner = &*scratch.inner.get();
let id = inner.id;
let contents = inner.contents.as_ref().expect("scratch has contents");
let buf = contents.buffer.clone();
(id, buf)
};
fn reinit_scratch_buffer(scratch: &PageRef, pool: &Arc<BufferPool>) {
// Push into batch
batch.push(BatchItem { id, buf: buf_clone });
// Re-initialize scratch with a fresh buffer
let raw = pool.get();
let pool_clone = pool.clone();
let drop_fn = Rc::new(move |b| {
pool_clone.put(b);
});
let drop_fn = Rc::new(move |b| pool_clone.put(b));
let new_buf = Arc::new(RefCell::new(Buffer::new(raw, drop_fn)));
// replace contents
unsafe {
let inner = &mut *scratch.inner.get();
inner.contents = Some(PageContent::new(0, new_buf));
inner.flags.store(0, Ordering::Relaxed);
// reset flags on scratch so it won't be cleared later with the real page
inner.flags.store(0, Ordering::SeqCst);
}
}
@@ -1128,8 +1129,8 @@ impl Wal for WalFile {
#[instrument(skip_all, level = Level::DEBUG)]
fn should_checkpoint(&self) -> bool {
let shared = self.get_shared();
let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
let nbackfills = shared.nbackfills.load(Ordering::SeqCst) as usize;
let frame_id = shared.max_frame.load(Ordering::SeqCst) as usize;
frame_id > self.checkpoint_threshold + nbackfills
}
@@ -1137,7 +1138,7 @@ impl Wal for WalFile {
fn checkpoint(
&mut self,
pager: &Pager,
write_counter: Rc<RefCell<usize>>,
_write_counter: Rc<RefCell<usize>>,
mode: CheckpointMode,
) -> Result<IOResult<CheckpointResult>> {
if matches!(mode, CheckpointMode::Full) {
@@ -1323,6 +1324,8 @@ impl WalFile {
self.ongoing_checkpoint.max_frame = 0;
self.ongoing_checkpoint.current_page = 0;
self.max_frame_read_lock_index.set(NO_LOCK_HELD);
self.ongoing_checkpoint.batch.clear();
self.ongoing_checkpoint.pending_flushes.clear();
self.sync_state.set(SyncState::NotSyncing);
self.syncing.set(false);
}
@@ -1455,17 +1458,16 @@ impl WalFile {
}
CheckpointState::AccumulatePage => {
// mark before batching
self.ongoing_checkpoint.scratch.set_dirty();
self.ongoing_checkpoint.scratch_page.set_dirty();
take_page_into_batch(
&self.ongoing_checkpoint.scratch,
&self.ongoing_checkpoint.scratch_page,
&self.buffer_pool,
&mut self.ongoing_checkpoint.batch,
);
let more_pages = (self.ongoing_checkpoint.current_page as usize)
< self.get_shared().pages_in_frames.lock().len() - 1;
if self.ongoing_checkpoint.batch.len() < CKPT_BATCH_PAGES && more_pages {
if more_pages {
self.ongoing_checkpoint.current_page += 1;
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
} else {
@@ -1473,17 +1475,13 @@ impl WalFile {
}
}
CheckpointState::FlushBatch => {
tracing::trace!("started checkpoint backfilling batch");
self.ongoing_checkpoint
.pending_flushes
.push(begin_write_btree_pages_writev(
pager,
&self.ongoing_checkpoint.batch,
write_counter.clone(),
)?);
.push(write_pages_vectored(pager, &self.ongoing_checkpoint.batch)?);
// batch is queued
self.ongoing_checkpoint.batch.clear();
self.ongoing_checkpoint.state = CheckpointState::WaitFlush;
return Ok(IOResult::IO);
}
CheckpointState::WaitFlush => {
if self
@@ -1494,7 +1492,12 @@ impl WalFile {
{
return Ok(IOResult::IO);
}
for pf in self.ongoing_checkpoint.pending_flushes.drain(..) {
tracing::debug!("finished checkpoint backfilling batch");
for pf in self
.ongoing_checkpoint
.pending_flushes
.drain(std::ops::RangeFull)
{
for id in pf.pages {
if let Some(p) = pager.cache_get(id) {
p.clear_dirty();
@@ -1509,6 +1512,7 @@ impl WalFile {
self.ongoing_checkpoint.current_page += 1;
self.ongoing_checkpoint.state = CheckpointState::ReadFrame;
} else {
tracing::info!("transitioning checkpoint to done");
self.ongoing_checkpoint.state = CheckpointState::Done;
}
}