Compute the final db page or make the commit frame submit a linked pwritev completion

This commit is contained in:
PThorpe92
2025-09-03 16:00:46 -04:00
parent 3831218f6c
commit e3f366963d
2 changed files with 36 additions and 20 deletions

View File

@@ -995,17 +995,14 @@ pub fn write_pages_vectored(
pager: &Pager,
batch: BTreeMap<usize, Arc<Buffer>>,
done_flag: Arc<AtomicBool>,
final_write: bool,
) -> Result<Vec<Completion>> {
if batch.is_empty() {
done_flag.store(true, Ordering::Relaxed);
return Ok(Vec::new());
}
// batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's
// to submit as `writev`/write_pages calls.
let page_sz = pager.page_size.get().expect("page size is not set").get() as usize;
// Count expected number of runs to create the atomic counter we need to track each batch
let mut run_count = 0;
let mut prev_id = None;
@@ -1023,26 +1020,21 @@ pub fn write_pages_vectored(
// Create the atomic counters
let runs_left = Arc::new(AtomicUsize::new(run_count));
let done = done_flag.clone();
// we know how many runs, but we don't know how many buffers per run, so we can only give an
// estimate of the capacity
const EST_BUFF_CAPACITY: usize = 32;
// Iterate through the batch, submitting each run as soon as it ends
// We can reuse this across runs without reallocating
let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
let mut run_start_id: Option<usize> = None;
// Iterate through the batch
// Track which run we're on to identify the last one
let mut current_run = 0;
let mut iter = batch.iter().peekable();
let mut completions = Vec::new();
while let Some((id, item)) = iter.next() {
// Track the start of the run
if run_start_id.is_none() {
run_start_id = Some(*id);
}
// Add this page to the current run
run_bufs.push(item.clone());
// Check if this is the end of a run
@@ -1052,24 +1044,32 @@ pub fn write_pages_vectored(
};
if is_end_of_run {
current_run += 1;
let start_id = run_start_id.expect("should have a start id");
let runs_left_cl = runs_left.clone();
let done_cl = done.clone();
// This is the last chunk if it's the last run AND final_write is true
let is_last_chunk = current_run == run_count && final_write;
let total_sz = (page_sz * run_bufs.len()) as i32;
let c = Completion::new_write(move |res| {
let cmp = move |res| {
let Ok(res) = res else {
return;
};
// writev calls can sometimes return partial writes, but our `pwritev`
// implementation aggregates any partial writes and calls completion with total
turso_assert!(total_sz == res, "failed to write expected size");
if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 {
done_cl.store(true, Ordering::Release);
}
});
};
// Submit write operation for this run, decrementing the counter if we error
let c = if is_last_chunk {
Completion::new_write_linked(cmp)
} else {
Completion::new_write(cmp)
};
// Submit write operation for this run
let io_ctx = &pager.io_ctx.borrow();
match pager.db_file.write_pages(
start_id,

View File

@@ -465,6 +465,13 @@ impl OngoingCheckpoint {
self.state = CheckpointState::Start;
}
#[inline]
fn is_final_write(&self) -> bool {
self.current_page as usize >= self.pages_to_checkpoint.len()
&& self.inflight_reads.is_empty()
&& !self.pending_writes.is_empty()
}
#[inline]
/// Whether or not new reads should be issued during checkpoint processing.
fn should_issue_reads(&self) -> bool {
@@ -1510,7 +1517,7 @@ impl Wal for WalFile {
// single completion for the whole batch
let total_len: i32 = iovecs.iter().map(|b| b.len() as i32).sum();
let page_frame_for_cb = page_frame_and_checksum.clone();
let c = Completion::new_write(move |res: Result<i32, CompletionError>| {
let cmp = move |res: Result<i32, CompletionError>| {
let Ok(bytes_written) = res else {
return;
};
@@ -1523,7 +1530,13 @@ impl Wal for WalFile {
page.clear_dirty();
page.set_wal_tag(*fid, seq);
}
});
};
let c = if db_size_on_commit.is_some() {
Completion::new_write_linked(cmp)
} else {
Completion::new_write(cmp)
};
let c = self.get_shared().file.pwritev(start_off, iovecs, c)?;
Ok(c)
@@ -1821,7 +1834,10 @@ impl WalFile {
let batch_map = self.ongoing_checkpoint.pending_writes.take();
if !batch_map.is_empty() {
let done_flag = self.ongoing_checkpoint.add_write();
completions.extend(write_pages_vectored(pager, batch_map, done_flag)?);
let is_final = self.ongoing_checkpoint.is_final_write();
completions.extend(write_pages_vectored(
pager, batch_map, done_flag, is_final,
)?);
}
}