diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 9ca59d5e7..c188256ff 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -948,9 +948,10 @@ pub fn begin_write_btree_page(
 pub fn write_pages_vectored(
     pager: &Pager,
     batch: BTreeMap<usize, Arc<Buffer>>,
-) -> Result<PendingFlush> {
+    flush: &PendingFlush,
+) -> Result<()> {
     if batch.is_empty() {
-        return Ok(PendingFlush::default());
+        return Ok(());
     }
 
     // batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's
@@ -974,7 +975,8 @@ pub fn write_pages_vectored(
 
     // Create the atomic counters
     let runs_left = Arc::new(AtomicUsize::new(run_count));
-    let done = Arc::new(AtomicBool::new(false));
+    flush.new_flush();
+    let done = flush.done.clone();
     // we know how many runs, but we don't know how many buffers per run, so we can only give an
     // estimate of the capacity
     const EST_BUFF_CAPACITY: usize = 32;
@@ -983,7 +985,6 @@ pub fn write_pages_vectored(
     // We can reuse this across runs without reallocating
     let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
     let mut run_start_id: Option<usize> = None;
-    let mut all_ids = Vec::with_capacity(batch.len());
 
     // Iterate through the batch
     let mut iter = batch.into_iter().peekable();
@@ -996,7 +997,6 @@ pub fn write_pages_vectored(
 
         // Add this page to the current run
        run_bufs.push(item);
-        all_ids.push(id);
 
         // Check if this is the end of a run
         let is_end_of_run = match iter.peek() {
@@ -1016,31 +1016,23 @@ pub fn write_pages_vectored(
             });
 
             // Submit write operation for this run, decrementing the counter if we error
-            if let Err(e) = pager
-                .db_file
-                .write_pages(start_id, page_sz, run_bufs.clone(), c)
-            {
+            if let Err(e) = pager.db_file.write_pages(
+                start_id,
+                page_sz,
+                std::mem::replace(&mut run_bufs, Vec::with_capacity(EST_BUFF_CAPACITY)),
+                c,
+            ) {
                 if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
                     done.store(true, Ordering::Release);
                 }
                 return Err(e);
             }
-
-            // Reset for next run
-            run_bufs.clear();
             run_start_id = None;
         }
     }
 
-    tracing::debug!(
-        "write_pages_vectored: {} pages to write, runs: {run_count}",
-        all_ids.len()
-    );
-
-    Ok(PendingFlush {
-        pages: all_ids,
-        done,
-    })
+    tracing::debug!("write_pages_vectored: total runs={run_count}");
+    Ok(())
 }
 
 #[instrument(skip_all, level = Level::DEBUG)]
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index d4ae837fb..cd6b30e30 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -370,15 +370,13 @@ struct OngoingCheckpoint {
     scratch_page: PageRef,
     batch: Batch,
     state: CheckpointState,
-    pending_flush: Option<PendingFlush>,
+    pending_flush: PendingFlush,
     min_frame: u64,
     max_frame: u64,
     current_page: u64,
 }
 
 pub(super) struct PendingFlush {
-    // page ids to clear
-    pub(super) pages: Vec<usize>,
     // completion flag set by IO callback
     pub(super) done: Arc<AtomicBool>,
 }
@@ -392,17 +390,21 @@ impl Default for PendingFlush {
 impl PendingFlush {
     pub fn new() -> Self {
         Self {
-            pages: Vec::with_capacity(CKPT_BATCH_PAGES),
-            done: Arc::new(AtomicBool::new(false)),
+            done: Arc::new(AtomicBool::new(true)),
         }
     }
 
-    // clear the dirty flag of all pages in the pending flush batch
-    fn clear_dirty(&self, pager: &Pager) {
-        for id in &self.pages {
-            if let Some(p) = pager.cache_get(*id) {
-                p.clear_dirty();
-            }
-        }
+    pub(super) fn new_flush(&self) {
+        turso_assert!(
+            self.is_done(),
+            "should not reset new flush without being in done state"
+        );
+        self.done.store(false, Ordering::Release);
+    }
+    fn clear(&self) {
+        self.done.store(true, Ordering::Relaxed);
+    }
+    fn is_done(&self) -> bool {
+        self.done.load(Ordering::Acquire)
     }
 }
@@ -1239,7 +1241,7 @@ impl WalFile {
             ongoing_checkpoint: OngoingCheckpoint {
                 scratch_page: checkpoint_page,
                 batch: Batch::new(),
-                pending_flush: None,
+                pending_flush: PendingFlush::new(),
                 state: CheckpointState::Start,
                 min_frame: 0,
                 max_frame: 0,
@@ -1303,7 +1305,7 @@ impl WalFile {
         self.ongoing_checkpoint.current_page = 0;
         self.max_frame_read_lock_index.set(NO_LOCK_HELD);
         self.ongoing_checkpoint.batch.clear();
-        let _ = self.ongoing_checkpoint.pending_flush.take();
+        self.ongoing_checkpoint.pending_flush.clear();
         self.sync_state.set(SyncState::NotSyncing);
         self.syncing.set(false);
     }
@@ -1448,8 +1450,6 @@ impl WalFile {
                 }
             }
             CheckpointState::AccumulatePage => {
-                // mark before batching
-                self.ongoing_checkpoint.scratch_page.set_dirty();
                 // we read the frame into memory, add it to our batch
                 self.ongoing_checkpoint
                     .batch
@@ -1475,30 +1475,19 @@ impl WalFile {
             }
             CheckpointState::FlushBatch => {
                 tracing::trace!("started checkpoint backfilling batch");
-                self.ongoing_checkpoint.pending_flush = Some(write_pages_vectored(
+                write_pages_vectored(
                     pager,
                     std::mem::take(&mut self.ongoing_checkpoint.batch),
-                )?);
+                    &self.ongoing_checkpoint.pending_flush,
+                )?;
                 // batch is queued
-                self.ongoing_checkpoint.batch.clear();
                 self.ongoing_checkpoint.state = CheckpointState::WaitFlush;
             }
             CheckpointState::WaitFlush => {
-                match self.ongoing_checkpoint.pending_flush.as_ref() {
-                    Some(pf) if pf.done.load(Ordering::Acquire) => {
-                        // flush is done, we can continue
-                        tracing::trace!("checkpoint backfilling batch done");
-                    }
-                    Some(_) => return Ok(IOResult::IO),
-                    None => panic!("we should have a pending flush here"),
+                if !self.ongoing_checkpoint.pending_flush.is_done() {
+                    return Ok(IOResult::IO);
                 }
                 tracing::debug!("finished checkpoint backfilling batch");
-                let pf = self
-                    .ongoing_checkpoint
-                    .pending_flush
-                    .as_ref()
-                    .expect("we should have a pending flush here");
-                pf.clear_dirty(pager);
                 // done with batch
                 let shared = self.get_shared();
                 if (self.ongoing_checkpoint.current_page as usize)
@@ -1516,12 +1505,10 @@ impl WalFile {
             // In Restart or Truncate mode, we need to restart the log over and possibly truncate the file
             // Release all locks and return the current num of wal frames and the amount we backfilled
             CheckpointState::Done => {
-                if let Some(pf) = self.ongoing_checkpoint.pending_flush.as_ref() {
-                    turso_assert!(
-                        pf.done.load(Ordering::Relaxed),
-                        "checkpoint pending flush must have finished"
-                    );
-                }
+                turso_assert!(
+                    self.ongoing_checkpoint.pending_flush.is_done(),
+                    "checkpoint pending flush must have finished"
+                );
                 let mut checkpoint_result = {
                     let shared = self.get_shared();
                     let current_mx = shared.max_frame.load(Ordering::Acquire);
@@ -1584,10 +1571,9 @@ impl WalFile {
                 } else {
                     let _ = self.checkpoint_guard.take();
                 }
-                self.ongoing_checkpoint.scratch_page.clear_dirty();
                 self.ongoing_checkpoint.scratch_page.get().id = 0;
                 self.ongoing_checkpoint.scratch_page.get().contents = None;
-                let _ = self.ongoing_checkpoint.pending_flush.take();
+                self.ongoing_checkpoint.pending_flush.clear();
                 self.ongoing_checkpoint.batch.clear();
                 self.ongoing_checkpoint.state = CheckpointState::Start;
                 return Ok(IOResult::Done(checkpoint_result));
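For context, the lifecycle of the reusable `PendingFlush` after this change can be sketched as follows. This is a minimal, self-contained illustration, not the crate's actual module: it uses a plain `assert!` in place of `turso_assert!`, and the `main` function stands in for the checkpoint state machine and the pager's I/O completion callback.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

/// Reusable completion flag for an in-flight checkpoint flush.
/// It starts in the "done" state so the first flush can be armed right away.
struct PendingFlush {
    done: Arc<AtomicBool>,
}

impl PendingFlush {
    fn new() -> Self {
        Self {
            done: Arc::new(AtomicBool::new(true)),
        }
    }

    /// Arm the flag before submitting a new batch of writes.
    /// Must not be called while a previous flush is still in flight.
    fn new_flush(&self) {
        assert!(self.is_done(), "previous flush must have completed");
        self.done.store(false, Ordering::Release);
    }

    /// Polled by the checkpoint state machine while waiting for the batch.
    fn is_done(&self) -> bool {
        self.done.load(Ordering::Acquire)
    }

    /// Return to the idle state once the checkpoint step finishes or aborts.
    fn clear(&self) {
        self.done.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let flush = PendingFlush::new();

    // FlushBatch: arm the flag and hand a clone of `done` to the write path,
    // whose completion callback stores `true` once the last run has landed.
    flush.new_flush();
    let done = flush.done.clone();
    done.store(true, Ordering::Release); // stand-in for the I/O completion

    // WaitFlush: the state machine simply polls the flag.
    assert!(flush.is_done());

    // Done / reset: return the flag to its idle state for the next batch.
    flush.clear();
}
```

Starting the flag in the done state is what lets `new_flush` assert that the previous batch has fully drained before a new one is queued, replacing the earlier `Option<PendingFlush>` take/replace handling and the per-batch `pages` vector.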