diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 881afacd0..c590219bd 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -15,6 +15,7 @@ use super::buffer_pool::BufferPool; use super::pager::{PageRef, Pager}; use super::sqlite3_ondisk::{self, checksum_wal, WalHeader, WAL_MAGIC_BE, WAL_MAGIC_LE}; use crate::fast_lock::SpinLock; +use crate::io::CompletionGroup; use crate::io::{clock, File, IO}; use crate::storage::database::EncryptionOrChecksum; use crate::storage::sqlite3_ondisk::{ @@ -23,8 +24,8 @@ use crate::storage::sqlite3_ondisk::{ }; use crate::types::{IOCompletions, IOResult}; use crate::{ - bail_corrupt_error, io_yield_many, io_yield_one, return_if_io, turso_assert, Buffer, - Completion, CompletionError, IOContext, LimboError, Result, + bail_corrupt_error, io_yield_one, return_if_io, turso_assert, Buffer, Completion, + CompletionError, IOContext, LimboError, Result, }; #[derive(Debug, Clone, Default)] @@ -1823,8 +1824,9 @@ impl WalFile { // to prevent serialization, and we try to issue reads and flush batches concurrently // if at all possible, at the cost of some batching potential. CheckpointState::Processing => { - // Gather I/O completions, estimate with MAX_INFLIGHT_WRITES to prevent realloc - let mut completions = Vec::with_capacity(MAX_INFLIGHT_WRITES); + // Gather I/O completions using a completion group + let mut nr_completions = 0; + let mut group = CompletionGroup::new(|_| {}); // Check and clean any completed writes from pending flush if self.ongoing_checkpoint.process_inflight_writes() { @@ -1891,7 +1893,8 @@ impl WalFile { // the frame requirements let inflight = self.issue_wal_read_into_buffer(page_id as usize, target_frame)?; - completions.push(inflight.completion.clone()); + group.add(&inflight.completion); + nr_completions += 1; self.ongoing_checkpoint.inflight_reads.push(inflight); self.ongoing_checkpoint.current_page += 1; } @@ -1903,12 +1906,15 @@ impl WalFile { let batch_map = self.ongoing_checkpoint.pending_writes.take(); if !batch_map.is_empty() { let done_flag = self.ongoing_checkpoint.add_write(); - completions.extend(write_pages_vectored(pager, batch_map, done_flag)?); + for c in write_pages_vectored(pager, batch_map, done_flag)? { + group.add(&c); + nr_completions += 1; + } } } - if !completions.is_empty() { - io_yield_many!(completions); + if nr_completions > 0 { + io_yield_one!(group.build()); } else if self.ongoing_checkpoint.complete() { self.ongoing_checkpoint.state = CheckpointState::Finalize; }