adjust wal checkpoint to return completions

This commit is contained in:
pedrocarlo
2025-08-12 14:25:23 -03:00
committed by Jussi Saurio
parent 10ad43d9e0
commit a7f5912e7d
2 changed files with 26 additions and 26 deletions

View File

@@ -949,9 +949,9 @@ pub fn write_pages_vectored(
pager: &Pager,
batch: BTreeMap<usize, Arc<Buffer>>,
flush: &PendingFlush,
) -> Result<()> {
) -> Result<Vec<Completion>> {
if batch.is_empty() {
return Ok(());
return Ok(Vec::new());
}
// batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's
@@ -989,6 +989,7 @@ pub fn write_pages_vectored(
// Iterate through the batch
let mut iter = batch.into_iter().peekable();
let mut completions = Vec::new();
while let Some((id, item)) = iter.next() {
// Track the start of the run
if run_start_id.is_none() {
@@ -1016,23 +1017,28 @@ pub fn write_pages_vectored(
});
// Submit write operation for this run, decrementing the counter if we error
if let Err(e) = pager.db_file.write_pages(
match pager.db_file.write_pages(
start_id,
page_sz,
std::mem::replace(&mut run_bufs, Vec::with_capacity(EST_BUFF_CAPACITY)),
c,
) {
if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
done.store(true, Ordering::Release);
Ok(c) => {
completions.push(c);
}
Err(e) => {
if runs_left.fetch_sub(1, Ordering::AcqRel) == 1 {
done.store(true, Ordering::Release);
}
return Err(e);
}
return Err(e);
}
run_start_id = None;
}
}
tracing::debug!("write_pages_vectored: total runs={run_count}");
Ok(())
Ok(completions)
}
#[instrument(skip_all, level = Level::DEBUG)]

View File

@@ -18,7 +18,7 @@ use crate::storage::sqlite3_ondisk::{
begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame,
write_pages_vectored, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE,
};
use crate::types::IOResult;
use crate::types::{IOCompletions, IOResult};
use crate::{turso_assert, Buffer, LimboError, Result};
use crate::{Completion, Page};
@@ -286,10 +286,9 @@ pub trait Wal {
pub enum CheckpointState {
Start,
ReadFrame,
WaitReadFrame,
AccumulatePage,
FlushBatch,
WaitFlush,
AfterFlush,
Done,
}
@@ -1392,24 +1391,17 @@ impl WalFile {
*frame
);
self.ongoing_checkpoint.scratch_page.get().id = page as usize;
let _ = self.read_frame(
let c = self.read_frame(
*frame,
self.ongoing_checkpoint.scratch_page.clone(),
self.buffer_pool.clone(),
)?;
self.ongoing_checkpoint.state = CheckpointState::WaitReadFrame;
continue 'checkpoint_loop;
self.ongoing_checkpoint.state = CheckpointState::AccumulatePage;
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
self.ongoing_checkpoint.current_page += 1;
}
CheckpointState::WaitReadFrame => {
if self.ongoing_checkpoint.scratch_page.is_locked() {
return Ok(IOResult::IO);
} else {
self.ongoing_checkpoint.state = CheckpointState::AccumulatePage;
}
}
CheckpointState::AccumulatePage => {
// we read the frame into memory, add it to our batch
self.ongoing_checkpoint
@@ -1436,18 +1428,20 @@ impl WalFile {
}
CheckpointState::FlushBatch => {
tracing::trace!("started checkpoint backfilling batch");
write_pages_vectored(
let completions = write_pages_vectored(
pager,
std::mem::take(&mut self.ongoing_checkpoint.batch),
&self.ongoing_checkpoint.pending_flush,
)?;
// batch is queued
self.ongoing_checkpoint.state = CheckpointState::WaitFlush;
self.ongoing_checkpoint.state = CheckpointState::AfterFlush;
return Ok(IOResult::IO(IOCompletions::Many(completions)));
}
CheckpointState::WaitFlush => {
if !self.ongoing_checkpoint.pending_flush.is_done() {
return Ok(IOResult::IO);
}
CheckpointState::AfterFlush => {
turso_assert!(
self.ongoing_checkpoint.pending_flush.is_done(),
"flush should be done"
);
tracing::debug!("finished checkpoint backfilling batch");
// done with batch
let shared = self.get_shared();