Merge 'core/wal: Minor checkpointing cleanups and optimizations' from Preston Thorpe

Small contribution to my current work on making checkpointing efficient.
We hold a write lock, and especially here on main there is no reason to
mark the pages as dirty in the cache, so we can do away with that
`Vec<usize>` of page ids and just track whether the flush is `Done`.

Closes #2545
This commit is contained in:
Jussi Saurio
2025-08-12 10:16:51 +03:00
committed by GitHub
2 changed files with 39 additions and 61 deletions

View File

@@ -948,9 +948,10 @@ pub fn begin_write_btree_page(
pub fn write_pages_vectored(
pager: &Pager,
batch: BTreeMap<usize, Arc<Buffer>>,
) -> Result<PendingFlush> {
flush: &PendingFlush,
) -> Result<()> {
if batch.is_empty() {
return Ok(PendingFlush::default());
return Ok(());
}
// batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's
@@ -974,7 +975,8 @@ pub fn write_pages_vectored(
// Create the atomic counters
let runs_left = Arc::new(AtomicUsize::new(run_count));
let done = Arc::new(AtomicBool::new(false));
flush.new_flush();
let done = flush.done.clone();
// we know how many runs, but we don't know how many buffers per run, so we can only give an
// estimate of the capacity
const EST_BUFF_CAPACITY: usize = 32;
@@ -983,7 +985,6 @@ pub fn write_pages_vectored(
// We can reuse this across runs without reallocating
let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
let mut run_start_id: Option<usize> = None;
let mut all_ids = Vec::with_capacity(batch.len());
// Iterate through the batch
let mut iter = batch.into_iter().peekable();
@@ -996,7 +997,6 @@ pub fn write_pages_vectored(
// Add this page to the current run
run_bufs.push(item);
all_ids.push(id);
// Check if this is the end of a run
let is_end_of_run = match iter.peek() {
@@ -1016,31 +1016,23 @@ pub fn write_pages_vectored(
});
// Submit write operation for this run, decrementing the counter if we error
if let Err(e) = pager
.db_file
.write_pages(start_id, page_sz, run_bufs.clone(), c)
{
if let Err(e) = pager.db_file.write_pages(
start_id,
page_sz,
std::mem::replace(&mut run_bufs, Vec::with_capacity(EST_BUFF_CAPACITY)),
c,
) {
if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
done.store(true, Ordering::Release);
}
return Err(e);
}
// Reset for next run
run_bufs.clear();
run_start_id = None;
}
}
tracing::debug!(
"write_pages_vectored: {} pages to write, runs: {run_count}",
all_ids.len()
);
Ok(PendingFlush {
pages: all_ids,
done,
})
tracing::debug!("write_pages_vectored: total runs={run_count}");
Ok(())
}
#[instrument(skip_all, level = Level::DEBUG)]

View File

@@ -370,15 +370,13 @@ struct OngoingCheckpoint {
scratch_page: PageRef,
batch: Batch,
state: CheckpointState,
pending_flush: Option<PendingFlush>,
pending_flush: PendingFlush,
min_frame: u64,
max_frame: u64,
current_page: u64,
}
pub(super) struct PendingFlush {
// page ids to clear
pub(super) pages: Vec<usize>,
// completion flag set by IO callback
pub(super) done: Arc<AtomicBool>,
}
@@ -392,17 +390,21 @@ impl Default for PendingFlush {
impl PendingFlush {
pub fn new() -> Self {
Self {
pages: Vec::with_capacity(CKPT_BATCH_PAGES),
done: Arc::new(AtomicBool::new(false)),
done: Arc::new(AtomicBool::new(true)),
}
}
// clear the dirty flag of all pages in the pending flush batch
fn clear_dirty(&self, pager: &Pager) {
for id in &self.pages {
if let Some(p) = pager.cache_get(*id) {
p.clear_dirty();
}
}
pub(super) fn new_flush(&self) {
turso_assert!(
self.is_done(),
"should not reset new flush without being in done state"
);
self.done.store(false, Ordering::Release);
}
fn clear(&self) {
self.done.store(true, Ordering::Relaxed);
}
fn is_done(&self) -> bool {
self.done.load(Ordering::Acquire)
}
}
@@ -1239,7 +1241,7 @@ impl WalFile {
ongoing_checkpoint: OngoingCheckpoint {
scratch_page: checkpoint_page,
batch: Batch::new(),
pending_flush: None,
pending_flush: PendingFlush::new(),
state: CheckpointState::Start,
min_frame: 0,
max_frame: 0,
@@ -1303,7 +1305,7 @@ impl WalFile {
self.ongoing_checkpoint.current_page = 0;
self.max_frame_read_lock_index.set(NO_LOCK_HELD);
self.ongoing_checkpoint.batch.clear();
let _ = self.ongoing_checkpoint.pending_flush.take();
self.ongoing_checkpoint.pending_flush.clear();
self.sync_state.set(SyncState::NotSyncing);
self.syncing.set(false);
}
@@ -1448,8 +1450,6 @@ impl WalFile {
}
}
CheckpointState::AccumulatePage => {
// mark before batching
self.ongoing_checkpoint.scratch_page.set_dirty();
// we read the frame into memory, add it to our batch
self.ongoing_checkpoint
.batch
@@ -1475,30 +1475,19 @@ impl WalFile {
}
CheckpointState::FlushBatch => {
tracing::trace!("started checkpoint backfilling batch");
self.ongoing_checkpoint.pending_flush = Some(write_pages_vectored(
write_pages_vectored(
pager,
std::mem::take(&mut self.ongoing_checkpoint.batch),
)?);
&self.ongoing_checkpoint.pending_flush,
)?;
// batch is queued
self.ongoing_checkpoint.batch.clear();
self.ongoing_checkpoint.state = CheckpointState::WaitFlush;
}
CheckpointState::WaitFlush => {
match self.ongoing_checkpoint.pending_flush.as_ref() {
Some(pf) if pf.done.load(Ordering::Acquire) => {
// flush is done, we can continue
tracing::trace!("checkpoint backfilling batch done");
}
Some(_) => return Ok(IOResult::IO),
None => panic!("we should have a pending flush here"),
if !self.ongoing_checkpoint.pending_flush.is_done() {
return Ok(IOResult::IO);
}
tracing::debug!("finished checkpoint backfilling batch");
let pf = self
.ongoing_checkpoint
.pending_flush
.as_ref()
.expect("we should have a pending flush here");
pf.clear_dirty(pager);
// done with batch
let shared = self.get_shared();
if (self.ongoing_checkpoint.current_page as usize)
@@ -1516,12 +1505,10 @@ impl WalFile {
// In Restart or Truncate mode, we need to restart the log over and possibly truncate the file
// Release all locks and return the current num of wal frames and the amount we backfilled
CheckpointState::Done => {
if let Some(pf) = self.ongoing_checkpoint.pending_flush.as_ref() {
turso_assert!(
pf.done.load(Ordering::Relaxed),
"checkpoint pending flush must have finished"
);
}
turso_assert!(
self.ongoing_checkpoint.pending_flush.is_done(),
"checkpoint pending flush must have finished"
);
let mut checkpoint_result = {
let shared = self.get_shared();
let current_mx = shared.max_frame.load(Ordering::Acquire);
@@ -1584,10 +1571,9 @@ impl WalFile {
} else {
let _ = self.checkpoint_guard.take();
}
self.ongoing_checkpoint.scratch_page.clear_dirty();
self.ongoing_checkpoint.scratch_page.get().id = 0;
self.ongoing_checkpoint.scratch_page.get().contents = None;
let _ = self.ongoing_checkpoint.pending_flush.take();
self.ongoing_checkpoint.pending_flush.clear();
self.ongoing_checkpoint.batch.clear();
self.ongoing_checkpoint.state = CheckpointState::Start;
return Ok(IOResult::Done(checkpoint_result));