From e2896d2f95db8894b87dbea2ac8e168726b8f677 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 11 Aug 2025 16:01:41 -0400 Subject: [PATCH] Minor improvement in flush api --- core/storage/sqlite3_ondisk.rs | 2 +- core/storage/wal.rs | 24 +++++++++++++----------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 4301b6e72..d400a7837 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -975,8 +975,8 @@ pub fn write_pages_vectored( // Create the atomic counters let runs_left = Arc::new(AtomicUsize::new(run_count)); + flush.new_flush(); let done = flush.done.clone(); - done.store(false, Ordering::Release); // we know how many runs, but we don't know how many buffers per run, so we can only give an // estimate of the capacity const EST_BUFF_CAPACITY: usize = 32; diff --git a/core/storage/wal.rs b/core/storage/wal.rs index f4ef2c925..91d0aa7c1 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -393,9 +393,19 @@ impl PendingFlush { done: Arc::new(AtomicBool::new(true)), } } - fn clear(&mut self) { + pub(super) fn new_flush(&self) { + turso_assert!( + self.is_done(), + "should not reset new flush without being in done state" + ); + self.done.store(false, Ordering::Release); + } + fn clear(&self) { self.done.store(true, Ordering::Relaxed); } + fn is_done(&self) -> bool { + self.done.load(Ordering::Acquire) + } } impl fmt::Debug for OngoingCheckpoint { @@ -1476,12 +1486,7 @@ impl WalFile { self.ongoing_checkpoint.state = CheckpointState::WaitFlush; } CheckpointState::WaitFlush => { - if !self - .ongoing_checkpoint - .pending_flush - .done - .load(Ordering::Acquire) - { + if !self.ongoing_checkpoint.pending_flush.is_done() { return Ok(IOResult::IO); } tracing::debug!("finished checkpoint backfilling batch"); @@ -1503,10 +1508,7 @@ impl WalFile { // Release all locks and return the current num of wal frames and the amount we backfilled CheckpointState::Done => { turso_assert!( - self.ongoing_checkpoint - .pending_flush - .done - .load(Ordering::Relaxed), + self.ongoing_checkpoint.pending_flush.is_done(), "checkpoint pending flush must have finished" ); let mut checkpoint_result = {