From 703cb4a70f87ad44f0a4a2ca11b0caef773c443f Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Sun, 14 Sep 2025 10:39:52 -0400
Subject: [PATCH] Link all writes to the fsync barrier, not just the commit frame

---
 core/storage/sqlite3_ondisk.rs | 75 +++++++++-------------------------
 core/storage/wal.rs            | 18 +-------
 2 files changed, 21 insertions(+), 72 deletions(-)

diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index 96dc77e29..5031e50da 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -996,7 +996,6 @@ pub fn write_pages_vectored(
     pager: &Pager,
     batch: BTreeMap<usize, Arc<Buffer>>,
     done_flag: Arc<AtomicBool>,
-    final_write: bool,
 ) -> Result<Vec<Completion>> {
     if batch.is_empty() {
         done_flag.store(true, Ordering::Relaxed);
@@ -1004,90 +1003,54 @@
     }
     let page_sz = pager.page_size.get().expect("page size is not set").get() as usize;
 
-    // Count expected number of runs to create the atomic counter we need to track each batch
-    let mut run_count = 0;
-    let mut prev_id = None;
-    for &id in batch.keys() {
-        if let Some(prev) = prev_id {
-            if id != prev + 1 {
-                run_count += 1;
-            }
-        } else {
-            run_count = 1; // First run
-        }
-        prev_id = Some(id);
-    }
-
-    // Create the atomic counters
+    let run_count = batch
+        .keys()
+        .zip(batch.keys().skip(1))
+        .filter(|(&curr, &next)| next != curr + 1)
+        .count()
+        + 1;
     let runs_left = Arc::new(AtomicUsize::new(run_count));
-    let done = done_flag.clone();
-    const EST_BUFF_CAPACITY: usize = 32;
 
+    const EST_BUFF_CAPACITY: usize = 32;
     let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
     let mut run_start_id: Option<usize> = None;
-
-    // Track which run we're on to identify the last one
-    let mut current_run = 0;
-    let mut iter = batch.iter().peekable();
     let mut completions = Vec::new();
-    while let Some((id, item)) = iter.next() {
-        // Track the start of the run
+    let mut iter = batch.iter().peekable();
+    while let Some((id, buffer)) = iter.next() {
         if run_start_id.is_none() {
             run_start_id = Some(*id);
        }
-        run_bufs.push(item.clone());
+        run_bufs.push(buffer.clone());
 
-        // Check if this is the end of a run
-        let is_end_of_run = match iter.peek() {
-            Some(&(next_id, _)) => *next_id != id + 1,
-            None => true,
-        };
-
-        if is_end_of_run {
-            current_run += 1;
+        if iter.peek().is_none_or(|(next_id, _)| **next_id != id + 1) {
             let start_id = run_start_id.expect("should have a start id");
             let runs_left_cl = runs_left.clone();
-            let done_cl = done.clone();
-
-            // This is the last chunk if it's the last run AND final_write is true
-            let is_last_chunk = current_run == run_count && final_write;
-
+            let done_cl = done_flag.clone();
             let total_sz = (page_sz * run_bufs.len()) as i32;
-            let cmp = move |res| {
-                let Ok(res) = res else {
-                    return;
-                };
+
+            let cmp = Completion::new_write_linked(move |res| {
+                let Ok(res) = res else { return };
                 turso_assert!(total_sz == res, "failed to write expected size");
                 if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 {
                     done_cl.store(true, Ordering::Release);
                 }
-            };
+            });
 
-            let c = if is_last_chunk {
-                Completion::new_write_linked(cmp)
-            } else {
-                Completion::new_write(cmp)
-            };
-
-            // Submit write operation for this run
             let io_ctx = &pager.io_ctx.borrow();
             match pager.db_file.write_pages(
                 start_id,
                 page_sz,
                 std::mem::replace(&mut run_bufs, Vec::with_capacity(EST_BUFF_CAPACITY)),
                 io_ctx,
-                c,
+                cmp,
             ) {
-                Ok(c) => {
-                    completions.push(c);
-                }
+                Ok(c) => completions.push(c),
                 Err(e) => {
                     if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
-                        done.store(true, Ordering::Release);
+                        done_flag.store(true, Ordering::Release);
                     }
                     pager.io.cancel(&completions)?;
-                    // cancel any submitted completions and drain the IO before returning an error
                     pager.io.drain()?;
                     return Err(e);
                 }
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index 637216caa..2a45646df 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -471,13 +471,6 @@ impl OngoingCheckpoint {
         self.state = CheckpointState::Start;
     }
 
-    #[inline]
-    fn is_final_write(&self) -> bool {
-        self.current_page as usize >= self.pages_to_checkpoint.len()
-            && self.inflight_reads.is_empty()
-            && !self.pending_writes.is_empty()
-    }
-
     #[inline]
     /// Whether or not new reads should be issued during checkpoint processing.
     fn should_issue_reads(&self) -> bool {
@@ -1585,11 +1578,7 @@ impl Wal for WalFile {
             }
         };
 
-        let c = if db_size_on_commit.is_some() {
-            Completion::new_write_linked(cmp)
-        } else {
-            Completion::new_write(cmp)
-        };
+        let c = Completion::new_write_linked(cmp);
 
         let shared = self.get_shared();
         assert!(
@@ -1930,10 +1919,7 @@ impl WalFile {
         let batch_map = self.ongoing_checkpoint.pending_writes.take();
         if !batch_map.is_empty() {
             let done_flag = self.ongoing_checkpoint.add_write();
-            let is_final = self.ongoing_checkpoint.is_final_write();
-            completions.extend(write_pages_vectored(
-                pager, batch_map, done_flag, is_final,
-            )?);
+            completions.extend(write_pages_vectored(pager, batch_map, done_flag)?);
         }
     }
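Note (reviewer sketch, not part of the patch): the new run_count expression pairs each key in the sorted batch with its successor, counts the breaks in contiguity, and adds one for the first run. A minimal standalone illustration using only std, with a placeholder payload type instead of the real page buffers:

    use std::collections::BTreeMap;

    fn main() {
        // Placeholder payload; the real batch maps page ids to page buffers.
        let batch: BTreeMap<usize, &str> = BTreeMap::from([
            (3, "p3"),
            (4, "p4"),
            (5, "p5"),   // run 1: pages 3..=5
            (9, "p9"),   // run 2: page 9
            (12, "p12"),
            (13, "p13"), // run 3: pages 12..=13
        ]);

        // Same counting idea as the patch: BTreeMap iterates keys in sorted
        // order, so every gap between adjacent keys starts a new contiguous run.
        let run_count = batch
            .keys()
            .zip(batch.keys().skip(1))
            .filter(|(&curr, &next)| next != curr + 1)
            .count()
            + 1;

        assert_eq!(run_count, 3);
        println!("{run_count} contiguous runs");
    }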