From 62f004c8986333cb029dc52fb8ee95321fbbdf69 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 25 Jul 2025 20:31:23 -0400 Subject: [PATCH] Fix write counter for writev batching in checkpoint --- core/storage/pager.rs | 5 +++-- core/storage/sqlite3_ondisk.rs | 26 +++++++++++++++++++------- core/storage/wal.rs | 6 +++--- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 9889988ea..768b8c6c1 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1303,11 +1303,12 @@ impl Pager { return Ok(CheckpointResult::default()); } - let counter = Rc::new(RefCell::new(0)); - let mut checkpoint_result = self.io.block(|| { + let write_counter = Rc::new(RefCell::new(0)); + let checkpoint_result = self.io.block(|| { self.wal .borrow_mut() .checkpoint(self, counter.clone(), mode) + .map_err(|err| panic!("error while clearing cache {err}")) })?; if checkpoint_result.everything_backfilled() diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 78d6a5dd5..b1c07a86a 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -61,7 +61,7 @@ use crate::storage::pager::Pager; use crate::storage::wal::{BatchItem, PendingFlush}; use crate::types::{RawSlice, RefValue, SerialType, SerialTypeKind, TextRef, TextSubtype}; use crate::{turso_assert, File, Result, WalFileShared}; -use std::cell::{RefCell, UnsafeCell}; +use std::cell::{Cell, RefCell, UnsafeCell}; use std::collections::HashMap; use std::mem::MaybeUninit; use std::pin::Pin; @@ -857,7 +857,8 @@ pub fn begin_write_btree_page( pub fn begin_write_btree_pages_writev( pager: &Pager, batch: &[BatchItem], - write_counter: Rc>, + // track writes for each flush series + write_counter: Rc>, ) -> Result { if batch.is_empty() { return Ok(PendingFlush::default()); @@ -878,23 +879,34 @@ pub fn begin_write_btree_pages_writev( } // submit contiguous run - let first = run[start].id; + let first_id = run[start].id; let bufs: Vec<_> = run[start..end].iter().map(|b| b.buf.clone()).collect(); all_ids.extend(run[start..end].iter().map(|b| b.id)); - *write_counter.borrow_mut() += 1; + write_counter.set(write_counter.get() + 1); let wc = write_counter.clone(); let done_clone = done.clone(); let c = Completion::new_write(move |_| { // one run finished - *wc.borrow_mut() -= 1; - if wc.borrow().eq(&0) { + wc.set(wc.get() - 1); + if wc.get().eq(&0) { // last run of this batch is done done_clone.store(true, Ordering::Release); } }); - pager.db_file.write_pages(first, page_sz, bufs, c)?; + pager + .db_file + .write_pages(first_id, page_sz, bufs, c) + .inspect_err(|e| { + tracing::error!( + "Failed to write pages {}-{}: {}", + first_id, + first_id + (end - start) - 1, + e + ); + write_counter.set(write_counter.get() - 1); + })?; start = end; } Ok(PendingFlush { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index c4f265e7b..9e345a59c 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -399,7 +399,7 @@ pub enum CheckpointState { Done, } -const CKPT_BATCH_PAGES: usize = 256; +const CKPT_BATCH_PAGES: usize = 512; #[derive(Clone)] pub(super) struct BatchItem { @@ -416,7 +416,7 @@ pub(super) struct BatchItem { // current_page is a helper to iterate through all the pages that might have a frame in the safe // range. This is inefficient for now. struct OngoingCheckpoint { - scratch: PageRef, + scratch_page: PageRef, batch: Vec, state: CheckpointState, pending_flushes: Vec, @@ -1261,7 +1261,7 @@ impl WalFile { max_frame: unsafe { (*shared.get()).max_frame.load(Ordering::SeqCst) }, shared, ongoing_checkpoint: OngoingCheckpoint { - scratch: checkpoint_page, + scratch_page: checkpoint_page, batch: Vec::new(), pending_flushes: Vec::new(), state: CheckpointState::Start,