From 1f554c2707865541f9960dc63b8b161170940c4a Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Mon, 11 Aug 2025 15:40:36 -0400
Subject: [PATCH 1/4] Clean up some minor checkpointing issues

---
 core/storage/sqlite3_ondisk.rs | 21 ++++--------
 core/storage/wal.rs            | 61 +++++++++++++---------------------
 2 files changed, 31 insertions(+), 51 deletions(-)

diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 9ca59d5e7..4301b6e72 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -948,9 +948,10 @@ pub fn begin_write_btree_page(
 pub fn write_pages_vectored(
     pager: &Pager,
     batch: BTreeMap<usize, Arc<Buffer>>,
-) -> Result<PendingFlush> {
+    flush: &PendingFlush,
+) -> Result<()> {
     if batch.is_empty() {
-        return Ok(PendingFlush::default());
+        return Ok(());
     }
 
     // batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's
@@ -974,7 +975,8 @@ pub fn write_pages_vectored(
 
     // Create the atomic counters
     let runs_left = Arc::new(AtomicUsize::new(run_count));
-    let done = Arc::new(AtomicBool::new(false));
+    let done = flush.done.clone();
+    done.store(false, Ordering::Release);
     // we know how many runs, but we don't know how many buffers per run, so we can only give an
     // estimate of the capacity
     const EST_BUFF_CAPACITY: usize = 32;
@@ -983,7 +985,6 @@ pub fn write_pages_vectored(
     // We can reuse this across runs without reallocating
     let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
     let mut run_start_id: Option<usize> = None;
-    let mut all_ids = Vec::with_capacity(batch.len());
 
     // Iterate through the batch
     let mut iter = batch.into_iter().peekable();
@@ -996,7 +997,6 @@ pub fn write_pages_vectored(
 
         // Add this page to the current run
         run_bufs.push(item);
-        all_ids.push(id);
 
         // Check if this is the end of a run
         let is_end_of_run = match iter.peek() {
@@ -1032,15 +1032,8 @@ pub fn write_pages_vectored(
         }
     }
 
-    tracing::debug!(
-        "write_pages_vectored: {} pages to write, runs: {run_count}",
-        all_ids.len()
-    );
-
-    Ok(PendingFlush {
-        pages: all_ids,
-        done,
-    })
+    tracing::debug!("write_pages_vectored: total runs={run_count}");
+    Ok(())
 }
 
 #[instrument(skip_all, level = Level::DEBUG)]
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index d4ae837fb..f4ef2c925 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -370,15 +370,13 @@ struct OngoingCheckpoint {
     scratch_page: PageRef,
     batch: Batch,
     state: CheckpointState,
-    pending_flush: Option<PendingFlush>,
+    pending_flush: PendingFlush,
     min_frame: u64,
     max_frame: u64,
     current_page: u64,
 }
 
 pub(super) struct PendingFlush {
-    // page ids to clear
-    pub(super) pages: Vec<usize>,
     // completion flag set by IO callback
     pub(super) done: Arc<AtomicBool>,
 }
@@ -392,17 +390,11 @@ impl Default for PendingFlush {
 
 impl PendingFlush {
     pub fn new() -> Self {
         Self {
-            pages: Vec::with_capacity(CKPT_BATCH_PAGES),
-            done: Arc::new(AtomicBool::new(false)),
+            done: Arc::new(AtomicBool::new(true)),
         }
     }
-    // clear the dirty flag of all pages in the pending flush batch
-    fn clear_dirty(&self, pager: &Pager) {
-        for id in &self.pages {
-            if let Some(p) = pager.cache_get(*id) {
-                p.clear_dirty();
-            }
-        }
+    fn clear(&mut self) {
+        self.done.store(true, Ordering::Relaxed);
     }
 }
@@ -1239,7 +1231,7 @@ impl WalFile {
             ongoing_checkpoint: OngoingCheckpoint {
                 scratch_page: checkpoint_page,
                 batch: Batch::new(),
-                pending_flush: None,
+                pending_flush: PendingFlush::new(),
                 state: CheckpointState::Start,
                 min_frame: 0,
                 max_frame: 0,
@@ -1303,7 +1295,7 @@ impl WalFile {
         self.ongoing_checkpoint.current_page = 0;
         self.max_frame_read_lock_index.set(NO_LOCK_HELD);
         self.ongoing_checkpoint.batch.clear();
-        let _ = self.ongoing_checkpoint.pending_flush.take();
+        self.ongoing_checkpoint.pending_flush.clear();
         self.sync_state.set(SyncState::NotSyncing);
         self.syncing.set(false);
     }
@@ -1475,30 +1467,24 @@ impl WalFile {
             }
             CheckpointState::FlushBatch => {
                 tracing::trace!("started checkpoint backfilling batch");
-                self.ongoing_checkpoint.pending_flush = Some(write_pages_vectored(
+                write_pages_vectored(
                     pager,
                     std::mem::take(&mut self.ongoing_checkpoint.batch),
-                )?);
+                    &self.ongoing_checkpoint.pending_flush,
+                )?;
                 // batch is queued
-                self.ongoing_checkpoint.batch.clear();
                 self.ongoing_checkpoint.state = CheckpointState::WaitFlush;
             }
             CheckpointState::WaitFlush => {
-                match self.ongoing_checkpoint.pending_flush.as_ref() {
-                    Some(pf) if pf.done.load(Ordering::Acquire) => {
-                        // flush is done, we can continue
-                        tracing::trace!("checkpoint backfilling batch done");
-                    }
-                    Some(_) => return Ok(IOResult::IO),
-                    None => panic!("we should have a pending flush here"),
-                }
-                tracing::debug!("finished checkpoint backfilling batch");
-                let pf = self
+                if !self
                     .ongoing_checkpoint
                     .pending_flush
-                    .as_ref()
-                    .expect("we should have a pending flush here");
-                pf.clear_dirty(pager);
+                    .done
+                    .load(Ordering::Acquire)
+                {
+                    return Ok(IOResult::IO);
+                }
+                tracing::debug!("finished checkpoint backfilling batch");
                 // done with batch
                 let shared = self.get_shared();
                 if (self.ongoing_checkpoint.current_page as usize)
@@ -1516,12 +1502,13 @@ impl WalFile {
             // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file
             // Release all locks and return the current num of wal frames and the amount we backfilled
             CheckpointState::Done => {
-                if let Some(pf) = self.ongoing_checkpoint.pending_flush.as_ref() {
-                    turso_assert!(
-                        pf.done.load(Ordering::Relaxed),
-                        "checkpoint pending flush must have finished"
-                    );
-                }
+                turso_assert!(
+                    self.ongoing_checkpoint
+                        .pending_flush
+                        .done
+                        .load(Ordering::Relaxed),
+                    "checkpoint pending flush must have finished"
+                );
                 let mut checkpoint_result = {
                     let shared = self.get_shared();
                     let current_mx = shared.max_frame.load(Ordering::Acquire);
@@ -1587,7 +1574,7 @@ impl WalFile {
                 self.ongoing_checkpoint.scratch_page.clear_dirty();
                 self.ongoing_checkpoint.scratch_page.get().id = 0;
                 self.ongoing_checkpoint.scratch_page.get().contents = None;
-                let _ = self.ongoing_checkpoint.pending_flush.take();
+                self.ongoing_checkpoint.pending_flush.clear();
                 self.ongoing_checkpoint.batch.clear();
                 self.ongoing_checkpoint.state = CheckpointState::Start;
                 return Ok(IOResult::Done(checkpoint_result));

From e2896d2f95db8894b87dbea2ac8e168726b8f677 Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Mon, 11 Aug 2025 16:01:41 -0400
Subject: [PATCH 2/4] Minor improvement in flush API

---
 core/storage/sqlite3_ondisk.rs |  2 +-
 core/storage/wal.rs            | 24 +++++++++++++-----------
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 4301b6e72..d400a7837 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -975,8 +975,8 @@ pub fn write_pages_vectored(
 
     // Create the atomic counters
     let runs_left = Arc::new(AtomicUsize::new(run_count));
+    flush.new_flush();
     let done = flush.done.clone();
-    done.store(false, Ordering::Release);
     // we know how many runs, but we don't know how many buffers per run, so we can only give an
     // estimate of the capacity
     const EST_BUFF_CAPACITY: usize = 32;
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index f4ef2c925..91d0aa7c1 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -393,9 +393,19 @@ impl PendingFlush {
             done: Arc::new(AtomicBool::new(true)),
         }
     }
-    fn clear(&mut self) {
+    pub(super) fn new_flush(&self) {
+        turso_assert!(
+            self.is_done(),
+            "should not reset new flush without being in done state"
+        );
+        self.done.store(false, Ordering::Release);
+    }
+    fn clear(&self) {
         self.done.store(true, Ordering::Relaxed);
     }
+    fn is_done(&self) -> bool {
+        self.done.load(Ordering::Acquire)
+    }
 }
 
 impl fmt::Debug for OngoingCheckpoint {
@@ -1476,12 +1486,7 @@ impl WalFile {
                 self.ongoing_checkpoint.state = CheckpointState::WaitFlush;
             }
             CheckpointState::WaitFlush => {
-                if !self
-                    .ongoing_checkpoint
-                    .pending_flush
-                    .done
-                    .load(Ordering::Acquire)
-                {
+                if !self.ongoing_checkpoint.pending_flush.is_done() {
                     return Ok(IOResult::IO);
                 }
                 tracing::debug!("finished checkpoint backfilling batch");
@@ -1503,10 +1508,7 @@ impl WalFile {
             // Release all locks and return the current num of wal frames and the amount we backfilled
             CheckpointState::Done => {
                 turso_assert!(
-                    self.ongoing_checkpoint
-                        .pending_flush
-                        .done
-                        .load(Ordering::Relaxed),
+                    self.ongoing_checkpoint.pending_flush.is_done(),
                     "checkpoint pending flush must have finished"
                 );
                 let mut checkpoint_result = {

From 688851c97a4db22ded728e9e1e779bd44c0e9e7b Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Mon, 11 Aug 2025 16:18:43 -0400
Subject: [PATCH 3/4] Take instead of cloning the vec of buffers in
 write_pages_vectored

---
 core/storage/sqlite3_ondisk.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index d400a7837..2054b703f 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -1016,9 +1016,10 @@ pub fn write_pages_vectored(
         });
 
         // Submit write operation for this run, decrementing the counter if we error
-        if let Err(e) = pager
-            .db_file
-            .write_pages(start_id, page_sz, run_bufs.clone(), c)
+        if let Err(e) =
+            pager
+                .db_file
+                .write_pages(start_id, page_sz, std::mem::take(&mut run_bufs), c)
         {
             if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
                 done.store(true, Ordering::Release);

From c098a48924243f860db6388a1098254438dac758 Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Mon, 11 Aug 2025 16:40:05 -0400
Subject: [PATCH 4/4] Remove clear dirty from checkpoint state machine in
 scratch page

---
 core/storage/sqlite3_ondisk.rs | 14 ++++++--------
 core/storage/wal.rs            |  3 ---
 2 files changed, 6 insertions(+), 11 deletions(-)

diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 2054b703f..c188256ff 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -1016,19 +1016,17 @@ pub fn write_pages_vectored(
         });
 
         // Submit write operation for this run, decrementing the counter if we error
-        if let Err(e) =
-            pager
-                .db_file
-                .write_pages(start_id, page_sz, std::mem::take(&mut run_bufs), c)
-        {
+        if let Err(e) = pager.db_file.write_pages(
+            start_id,
+            page_sz,
+            std::mem::replace(&mut run_bufs, Vec::with_capacity(EST_BUFF_CAPACITY)),
+            c,
+        ) {
             if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
                 done.store(true, Ordering::Release);
             }
             return Err(e);
         }
-
-        // Reset for next run
-        run_bufs.clear();
         run_start_id = None;
     }
 }
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index 91d0aa7c1..cd6b30e30 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -1450,8 +1450,6 @@ impl WalFile {
                 }
             }
             CheckpointState::AccumulatePage => {
-                // mark before batching
-                self.ongoing_checkpoint.scratch_page.set_dirty();
                 // we read the frame into memory, add it to our batch
                 self.ongoing_checkpoint
                     .batch
@@ -1573,7 +1571,6 @@ impl WalFile {
                 } else {
                     let _ = self.checkpoint_guard.take();
                 }
-                self.ongoing_checkpoint.scratch_page.clear_dirty();
                 self.ongoing_checkpoint.scratch_page.get().id = 0;
                 self.ongoing_checkpoint.scratch_page.get().contents = None;
                 self.ongoing_checkpoint.pending_flush.clear();
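
Taken together, the four patches converge on one pattern: a single reusable
PendingFlush whose `done` flag starts out (and is reset to) true, is re-armed
by `new_flush()` when a batch of vectored writes is queued, and is flipped
back to true by whichever write run completes last. The sketch below models
that handshake in isolation. It is a simplified illustration, not the turso
code: `PendingFlush`, `new_flush`, and `is_done` mirror the diff, while
`submit_runs` and the thread-based completions are hypothetical stand-ins for
`write_pages_vectored` and the IO backend's callbacks.

    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    struct PendingFlush {
        done: Arc<AtomicBool>,
    }

    impl PendingFlush {
        fn new() -> Self {
            // Starts "done" so a checkpoint that never queued a batch can
            // pass straight through its WaitFlush state.
            Self { done: Arc::new(AtomicBool::new(true)) }
        }
        fn new_flush(&self) {
            // Mirrors the patch's turso_assert!: never re-arm an in-flight flush.
            assert!(self.is_done(), "should not reset new flush without being in done state");
            self.done.store(false, Ordering::Release);
        }
        fn is_done(&self) -> bool {
            self.done.load(Ordering::Acquire)
        }
    }

    // Hypothetical stand-in for write_pages_vectored: queue `run_count`
    // asynchronous runs; the completion of the last one flips `done`.
    fn submit_runs(flush: &PendingFlush, run_count: usize) {
        flush.new_flush();
        let runs_left = Arc::new(AtomicUsize::new(run_count));
        for _ in 0..run_count {
            let runs_left = runs_left.clone();
            let done = flush.done.clone();
            // A thread models the IO backend invoking this run's completion.
            thread::spawn(move || {
                // fetch_sub returns the previous value; 1 means this run was last.
                if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
                    done.store(true, Ordering::Release);
                }
            });
        }
    }

    fn main() {
        let flush = PendingFlush::new();
        submit_runs(&flush, 4);
        // The WaitFlush state reduces to polling this flag and returning
        // IOResult::IO until it flips.
        while !flush.is_done() {
            thread::yield_now();
        }
        println!("all runs completed");
    }

Keeping the flag true when idle is what lets `clear()` and the Done-state
assertion treat "no flush in flight" and "flush finished" identically, which
in turn removes the Option and the panic paths from the WaitFlush arm.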