Add append_frames_vectored to WAL api

In addition to the existing `append_frame` which will write an individual frame
to the WAL, we add a method `append_frames_vectored` that takes N frames and the
db size which will need to be set for the last (commit) frame, and it
calculates the checksums and submits them as a single `pwritev` call,
reducing the number of syscalls needed for each write operation.
This commit is contained in:
PThorpe92
2025-08-23 16:37:36 -04:00
parent 1a4a53e6ea
commit 46e288ac26

View File

@@ -1,6 +1,7 @@
#![allow(clippy::not_unsafe_ptr_arg_deref)]
use std::array;
use std::borrow::Cow;
use std::cell::{RefCell, UnsafeCell};
use std::collections::{BTreeMap, HashMap, HashSet};
use strum::EnumString;
@@ -275,6 +276,13 @@ pub trait Wal: Debug {
db_size: u32,
) -> Result<Completion>;
fn append_frames_vectored(
&mut self,
pages: Vec<PageRef>,
page_sz: PageSize,
db_size_on_commit: Option<u32>,
) -> Result<Completion>;
/// Complete append of frames by updating shared wal state. Before this
/// all changes were stored locally.
fn finish_append_frames_commit(&mut self) -> Result<()>;
@@ -317,7 +325,8 @@ pub const CKPT_BATCH_PAGES: usize = 512;
const MIN_AVG_RUN_FOR_FLUSH: f32 = 32.0;
const MIN_BATCH_LEN_FOR_FLUSH: usize = 512;
const MAX_INFLIGHT_WRITES: usize = 64;
const MAX_INFLIGHT_READS: usize = 512;
pub const MAX_INFLIGHT_READS: usize = 512;
pub const IOV_MAX: usize = 1024;
type PageId = usize;
struct InflightRead {
@@ -1391,6 +1400,110 @@ impl Wal for WalFile {
Ok(pages)
}
/// Use pwritev to append many frames to the log at once
fn append_frames_vectored(
&mut self,
pages: Vec<PageRef>,
page_sz: PageSize,
db_size_on_commit: Option<u32>,
) -> Result<Completion> {
turso_assert!(
pages.len() <= IOV_MAX,
"we limit number of iovecs to IOV_MAX"
);
self.ensure_header_if_needed(page_sz)?;
let (header, shared_page_size, seq) = {
let shared = self.get_shared();
let hdr_guard = shared.wal_header.lock();
let header: WalHeader = *hdr_guard;
let shared_page_size = header.page_size;
let seq = header.checkpoint_seq;
(header, shared_page_size, seq)
};
turso_assert!(
shared_page_size == page_sz.get(),
"page size mismatch, tried to change page size after WAL header was already initialized: shared.page_size={shared_page_size}, page_size={}",
page_sz.get()
);
// Prepare write buffers and bookkeeping
let mut iovecs: Vec<Arc<Buffer>> = Vec::with_capacity(pages.len());
let mut page_frame_and_checksum: Vec<(PageRef, u64, (u32, u32))> =
Vec::with_capacity(pages.len());
// Rolling checksum input to each frame build
let mut rolling_csum: (u32, u32) = self.last_checksum;
let mut next_frame_id = self.max_frame + 1;
// Build every frame in order, updating the rolling checksum
for (idx, page) in pages.iter().enumerate() {
let page_id = page.get().id as u64;
let plain = page.get_contents().as_ptr();
let data_to_write: std::borrow::Cow<[u8]> = {
let key = self.encryption_key.borrow();
if let Some(k) = key.as_ref() {
Cow::Owned(encrypt_page(plain, page_id as usize, k)?)
} else {
Cow::Borrowed(plain)
}
};
let frame_db_size = if idx + 1 == pages.len() {
db_size_on_commit.unwrap_or(0)
} else {
0
};
let (new_csum, frame_bytes) = prepare_wal_frame(
&self.buffer_pool,
&header,
rolling_csum,
shared_page_size,
page_id as u32,
frame_db_size,
&data_to_write,
);
iovecs.push(frame_bytes);
// (page, assigned_frame_id, cumulative_checksum_at_this_frame)
page_frame_and_checksum.push((page.clone(), next_frame_id, new_csum));
// Advance for the next frame
rolling_csum = new_csum;
next_frame_id += 1;
}
let first_frame_id = self.max_frame + 1;
let start_off = self.frame_offset(first_frame_id);
// pre-advance in-memory WAL state like the single-frame path
for (page, fid, csum) in &page_frame_and_checksum {
self.complete_append_frame(page.get().id as u64, *fid, *csum);
}
// single completion for the whole batch
let total_len: i32 = iovecs.iter().map(|b| b.len() as i32).sum();
let page_frame_for_cb = page_frame_and_checksum.clone();
let c = Completion::new_write(move |res: Result<i32, CompletionError>| {
let Ok(bytes_written) = res else {
return;
};
turso_assert!(
bytes_written == total_len,
"pwritev wrote {bytes_written} bytes, expected {total_len}"
);
for (page, fid, _csum) in &page_frame_for_cb {
page.clear_dirty();
page.set_wal_tag(*fid, seq);
}
});
let c = self.get_shared().file.pwritev(start_off, iovecs, c)?;
Ok(c)
}
#[cfg(debug_assertions)]
fn as_any(&self) -> &dyn std::any::Any {
self