diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 27dd7a35e..0d877fb62 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -949,9 +949,9 @@ pub fn write_pages_vectored( pager: &Pager, batch: BTreeMap<usize, Arc<Buffer>>, flush: &PendingFlush, -) -> Result<()> { +) -> Result<Vec<Completion>> { if batch.is_empty() { - return Ok(()); + return Ok(Vec::new()); } // batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's @@ -989,6 +989,7 @@ pub fn write_pages_vectored( // Iterate through the batch let mut iter = batch.into_iter().peekable(); + let mut completions = Vec::new(); while let Some((id, item)) = iter.next() { // Track the start of the run if run_start_id.is_none() { @@ -1016,23 +1017,28 @@ pub fn write_pages_vectored( }); // Submit write operation for this run, decrementing the counter if we error - if let Err(e) = pager.db_file.write_pages( + match pager.db_file.write_pages( start_id, page_sz, std::mem::replace(&mut run_bufs, Vec::with_capacity(EST_BUFF_CAPACITY)), c, ) { - if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 { - done.store(true, Ordering::Release); + Ok(c) => { + completions.push(c); + } + Err(e) => { + if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 { + done.store(true, Ordering::Release); + } + return Err(e); } - return Err(e); } run_start_id = None; } } tracing::debug!("write_pages_vectored: total runs={run_count}"); - Ok(()) + Ok(completions) } #[instrument(skip_all, level = Level::DEBUG)] diff --git a/core/storage/wal.rs b/core/storage/wal.rs index e6a26efa4..b70bfe583 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -18,7 +18,7 @@ use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame, write_pages_vectored, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; -use crate::types::IOResult; +use crate::types::{IOCompletions, IOResult}; use crate::{turso_assert, Buffer, LimboError, Result}; use 
crate::{Completion, Page}; @@ -286,10 +286,9 @@ pub trait Wal { pub enum CheckpointState { Start, ReadFrame, - WaitReadFrame, AccumulatePage, FlushBatch, - WaitFlush, + AfterFlush, Done, } @@ -1392,24 +1391,17 @@ impl WalFile { *frame ); self.ongoing_checkpoint.scratch_page.get().id = page as usize; - let _ = self.read_frame( + let c = self.read_frame( *frame, self.ongoing_checkpoint.scratch_page.clone(), self.buffer_pool.clone(), )?; - self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame; - continue 'checkpoint_loop; + self.ongoing_checkpoint.state = CheckpointState::AccumulatePage; + return Ok(IOResult::IO(IOCompletions::Single(c))); } } self.ongoing_checkpoint.current_page += 1; } - CheckpointState::WaitReadFrame => { - if self.ongoing_checkpoint.scratch_page.is_locked() { - return Ok(IOResult::IO); - } else { - self.ongoing_checkpoint.state = CheckpointState::AccumulatePage; - } - } CheckpointState::AccumulatePage => { // we read the frame into memory, add it to our batch self.ongoing_checkpoint @@ -1436,18 +1428,20 @@ impl WalFile { } CheckpointState::FlushBatch => { tracing::trace!("started checkpoint backfilling batch"); - write_pages_vectored( + let completions = write_pages_vectored( pager, std::mem::take(&mut self.ongoing_checkpoint.batch), &self.ongoing_checkpoint.pending_flush, )?; // batch is queued - self.ongoing_checkpoint.state = CheckpointState::WaitFlush; + self.ongoing_checkpoint.state = CheckpointState::AfterFlush; + return Ok(IOResult::IO(IOCompletions::Many(completions))); } - CheckpointState::WaitFlush => { - if !self.ongoing_checkpoint.pending_flush.is_done() { - return Ok(IOResult::IO); - } + CheckpointState::AfterFlush => { + turso_assert!( + self.ongoing_checkpoint.pending_flush.is_done(), + "flush should be done" + ); tracing::debug!("finished checkpoint backfilling batch"); // done with batch let shared = self.get_shared();