core/storage: Switch checkpoint_inner() to completion group

This commit is contained in:
Pekka Enberg
2025-10-03 14:36:01 +03:00
parent c27b167c6d
commit be6f3d09ea

View File

@@ -15,6 +15,7 @@ use super::buffer_pool::BufferPool;
use super::pager::{PageRef, Pager};
use super::sqlite3_ondisk::{self, checksum_wal, WalHeader, WAL_MAGIC_BE, WAL_MAGIC_LE};
use crate::fast_lock::SpinLock;
use crate::io::CompletionGroup;
use crate::io::{clock, File, IO};
use crate::storage::database::EncryptionOrChecksum;
use crate::storage::sqlite3_ondisk::{
@@ -23,8 +24,8 @@ use crate::storage::sqlite3_ondisk::{
};
use crate::types::{IOCompletions, IOResult};
use crate::{
bail_corrupt_error, io_yield_many, io_yield_one, return_if_io, turso_assert, Buffer,
Completion, CompletionError, IOContext, LimboError, Result,
bail_corrupt_error, io_yield_one, return_if_io, turso_assert, Buffer, Completion,
CompletionError, IOContext, LimboError, Result,
};
#[derive(Debug, Clone, Default)]
@@ -1823,8 +1824,9 @@ impl WalFile {
// to prevent serialization, and we try to issue reads and flush batches concurrently
// if at all possible, at the cost of some batching potential.
CheckpointState::Processing => {
// Gather I/O completions, estimate with MAX_INFLIGHT_WRITES to prevent realloc
let mut completions = Vec::with_capacity(MAX_INFLIGHT_WRITES);
// Gather I/O completions using a completion group
let mut nr_completions = 0;
let mut group = CompletionGroup::new(|_| {});
// Check and clean any completed writes from pending flush
if self.ongoing_checkpoint.process_inflight_writes() {
@@ -1891,7 +1893,8 @@ impl WalFile {
// the frame requirements
let inflight =
self.issue_wal_read_into_buffer(page_id as usize, target_frame)?;
completions.push(inflight.completion.clone());
group.add(&inflight.completion);
nr_completions += 1;
self.ongoing_checkpoint.inflight_reads.push(inflight);
self.ongoing_checkpoint.current_page += 1;
}
@@ -1903,12 +1906,15 @@ impl WalFile {
let batch_map = self.ongoing_checkpoint.pending_writes.take();
if !batch_map.is_empty() {
let done_flag = self.ongoing_checkpoint.add_write();
completions.extend(write_pages_vectored(pager, batch_map, done_flag)?);
for c in write_pages_vectored(pager, batch_map, done_flag)? {
group.add(&c);
nr_completions += 1;
}
}
}
if !completions.is_empty() {
io_yield_many!(completions);
if nr_completions > 0 {
io_yield_one!(group.build());
} else if self.ongoing_checkpoint.complete() {
self.ongoing_checkpoint.state = CheckpointState::Finalize;
}