Merge 'io_uring: prevent out of order operations that could interfere with durability' from Preston Thorpe

closes #1419
When submitting a `pwritev` to flush dirty pages, if it's a commit frame we use a new completion type that tells the io_uring backend to set the `IO_LINK` flag on the submission, which ensures the following:
1. If any operation in the chain fails, subsequent operations are cancelled with `-ECANCELED`.
2. All operations in the chain complete in order.
If there is an ongoing `IO_LINK` chain, it ends at the `fsync` barrier, which ensures everything submitted before it has completed.
In 99% of cases the syscall that immediately follows the `pwritev` is going to be the `fsync`, but just in case, this implementation links everything that comes between the final commit `pwritev` and the next `fsync`.
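To make the intended ordering concrete, here is a minimal, hedged sketch of the linking idea using the `io_uring` crate directly. This is not the patch's code: the file path, keys, and buffer are made up, and the real implementation builds its entries inside `UringFile`/`WrappedIOUring`.

```rust
use std::os::unix::io::AsRawFd;

use io_uring::{opcode, squeue::Flags, types, IoUring};

const COMMIT_KEY: u64 = 0x10; // illustrative user_data values
const FSYNC_KEY: u64 = 0x20;

fn main() -> std::io::Result<()> {
    let file = std::fs::OpenOptions::new()
        .create(true)
        .write(true)
        .open("/tmp/io_link_demo")?;
    let fd = types::Fd(file.as_raw_fd());
    let buf = b"commit frame bytes";

    let mut ring = IoUring::new(8)?;

    // IO_LINK ties this write to the next SQE: the fsync below cannot start
    // before the write completes, and is cancelled with -ECANCELED if the
    // write fails.
    let write = opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32)
        .offset(0)
        .build()
        .user_data(COMMIT_KEY)
        .flags(Flags::IO_LINK);

    // No IO_LINK here: the fsync terminates the chain.
    let fsync = opcode::Fsync::new(fd).build().user_data(FSYNC_KEY);

    unsafe {
        let mut sq = ring.submission();
        sq.push(&write).expect("submission queue full");
        sq.push(&fsync).expect("submission queue full");
    }
    ring.submit_and_wait(2)?;

    for cqe in ring.completion() {
        println!("user_data={:#x} result={}", cqe.user_data(), cqe.result());
    }
    Ok(())
}
```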
In the event that we get a partial write on a linked operation, we resubmit the remainder and, once the write finally completes, force a `submit` and then queue an additional `fsync` with the `IO_DRAIN` flag. This keeps durability intact, because a drained `fsync` will not start until everything already in the submission queue has completed.
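A hedged sketch of that recovery path, again using the `io_uring` crate (`queue_drain_barrier` is an illustrative helper, not a function from this patch; `BARRIER_KEY` mirrors the patch's `BARRIER_USER_DATA`):

```rust
use io_uring::{opcode, squeue::Flags, types, IoUring};

const BARRIER_KEY: u64 = 1;

fn queue_drain_barrier(ring: &mut IoUring, fd: types::Fd) -> std::io::Result<()> {
    // Flush anything already queued so the kernel sees it before the barrier.
    ring.submit()?;

    // IO_DRAIN: this fsync does not start until every previously submitted
    // operation has completed, restoring the durability guarantee that the
    // broken link chain would otherwise have provided.
    let fsync = opcode::Fsync::new(fd)
        .build()
        .user_data(BARRIER_KEY)
        .flags(Flags::IO_DRAIN);

    unsafe {
        ring.submission()
            .push(&fsync)
            .expect("submission queue full");
    }
    ring.submit()?;
    Ok(())
}
```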
The other option in the event of partial writes on commit frames/linked writes is to error out... not sure which is the right move here. I guess it's possible that, since the `fsync` completion already fired, the commit could be over without us being durable on disk. So maybe it's an assertion instead? Thoughts?

Closes #2909
Pekka Enberg
2025-09-05 08:34:35 +03:00
committed by GitHub
4 changed files with 139 additions and 31 deletions


@@ -7,6 +7,7 @@ use crate::{turso_assert, LimboError, Result};
use parking_lot::Mutex;
use rustix::fs::{self, FlockOperation, OFlags};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
collections::{HashMap, VecDeque},
io::ErrorKind,
@@ -43,6 +44,10 @@ const MAX_WAIT: usize = 4;
/// One memory arena for DB pages and another for WAL frames
const ARENA_COUNT: usize = 2;
/// Arbitrary non-zero user_data for barrier operation when handling a partial writev
/// writing a commit frame.
const BARRIER_USER_DATA: u64 = 1;
pub struct UringIO {
inner: Arc<Mutex<InnerUringIO>>,
}
@@ -56,6 +61,7 @@ struct WrappedIOUring {
writev_states: HashMap<u64, WritevState>,
overflow: VecDeque<io_uring::squeue::Entry>,
iov_pool: IovecPool,
pending_link: AtomicBool,
}
struct InnerUringIO {
@@ -122,6 +128,7 @@ impl UringIO {
pending_ops: 0,
writev_states: HashMap::new(),
iov_pool: IovecPool::new(),
pending_link: AtomicBool::new(false),
},
free_files: (0..FILES).collect(),
free_arenas: [const { None }; ARENA_COUNT],
@@ -153,6 +160,7 @@ macro_rules! with_fd {
/// wrapper type to represent a possibly registered file descriptor,
/// only used in WritevState, and piggy-backs on the available methods from
/// `UringFile`, so we don't have to store the file on `WritevState`.
#[derive(Clone)]
enum Fd {
Fixed(u32),
RawFd(i32),
@@ -194,10 +202,12 @@ struct WritevState {
bufs: Vec<Arc<crate::Buffer>>,
/// we keep the last iovec allocation alive until final CQE
last_iov_allocation: Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
had_partial: bool,
linked_op: bool,
}
impl WritevState {
fn new(file: &UringFile, pos: u64, bufs: Vec<Arc<crate::Buffer>>) -> Self {
fn new(file: &UringFile, pos: u64, linked: bool, bufs: Vec<Arc<crate::Buffer>>) -> Self {
let file_id = file
.id()
.map(Fd::Fixed)
@@ -212,6 +222,8 @@ impl WritevState {
bufs,
last_iov_allocation: None,
total_len,
had_partial: false,
linked_op: linked,
}
}
@@ -353,7 +365,7 @@ impl WrappedIOUring {
}
/// Submit or resubmit a writev operation
fn submit_writev(&mut self, key: u64, mut st: WritevState) {
fn submit_writev(&mut self, key: u64, mut st: WritevState, continue_chain: bool) {
st.free_last_iov(&mut self.iov_pool);
let mut iov_allocation = self.iov_pool.acquire().unwrap_or_else(|| {
// Fallback: allocate a new one if pool is exhausted
@@ -391,7 +403,7 @@ impl WrappedIOUring {
}
// If we have coalesced everything into a single iovec, submit as a single`pwrite`
if iov_count == 1 {
let entry = with_fd!(st.file_id, |fd| {
let mut entry = with_fd!(st.file_id, |fd| {
if let Some(id) = st.bufs[st.current_buffer_idx].fixed_id() {
io_uring::opcode::WriteFixed::new(
fd,
@@ -413,6 +425,16 @@ impl WrappedIOUring {
.user_data(key)
}
});
if st.linked_op && !st.had_partial {
// Starting a new link chain
entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
self.pending_link.store(true, Ordering::Release);
} else if continue_chain && !st.had_partial {
// Continue existing chain
entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
}
self.submit_entry(&entry);
return;
}
@@ -422,12 +444,15 @@ impl WrappedIOUring {
let ptr = iov_allocation.as_ptr() as *mut libc::iovec;
st.last_iov_allocation = Some(iov_allocation);
let entry = with_fd!(st.file_id, |fd| {
let mut entry = with_fd!(st.file_id, |fd| {
io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
.offset(st.file_pos)
.build()
.user_data(key)
});
if st.linked_op {
entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
}
// track the current state in case we get a partial write
self.writev_states.insert(key, st);
self.submit_entry(&entry);
@@ -452,6 +477,19 @@ impl WrappedIOUring {
);
// write complete, return iovec to pool
state.free_last_iov(&mut self.iov_pool);
if state.linked_op && state.had_partial {
// if it was a linked operation, we need to submit a fsync after this writev
// to ensure data is on disk
self.ring.submit().expect("submit after writev");
let file_id = state.file_id;
let sync = with_fd!(file_id, |fd| {
io_uring::opcode::Fsync::new(fd)
.build()
.user_data(BARRIER_USER_DATA)
})
.flags(io_uring::squeue::Flags::IO_DRAIN);
self.submit_entry(&sync);
}
completion_from_key(user_data).complete(state.total_written as i32);
}
remaining => {
@@ -461,8 +499,10 @@ impl WrappedIOUring {
written,
remaining
);
// partial write, submit next
self.submit_writev(user_data, state);
// make sure partial write is recorded, because fsync could happen after this
// and we are not finished writing to disk
state.had_partial = true;
self.submit_writev(user_data, state, false);
}
}
}
@@ -530,6 +570,14 @@ impl IO for UringIO {
// if we have ongoing writev state, handle it separately and don't call completion
ring.handle_writev_completion(state, user_data, result);
continue;
} else if user_data == BARRIER_USER_DATA {
// barrier operation, no completion to call
if result < 0 {
let err = std::io::Error::from_raw_os_error(result);
tracing::error!("barrier operation failed: {}", err);
return Err(err.into());
}
continue;
}
completion_from_key(user_data).complete(result)
}
@@ -680,7 +728,7 @@ impl File for UringFile {
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut io = self.io.lock();
let write = {
let mut write = {
let ptr = buffer.as_ptr();
let len = buffer.len();
with_fd!(self, |fd| {
@@ -708,6 +756,15 @@ impl File for UringFile {
}
})
};
if c.needs_link() {
// Start a new link chain
write = write.flags(io_uring::squeue::Flags::IO_LINK);
io.ring.pending_link.store(true, Ordering::Release);
} else if io.ring.pending_link.load(Ordering::Acquire) {
// Continue existing link chain
write = write.flags(io_uring::squeue::Flags::IO_LINK);
}
io.ring.submit_entry(&write);
Ok(c)
}
@@ -720,6 +777,8 @@ impl File for UringFile {
.build()
.user_data(get_key(c.clone()))
});
// sync always ends the chain of linked operations
io.ring.pending_link.store(false, Ordering::Release);
io.ring.submit_entry(&sync);
Ok(c)
}
@@ -734,10 +793,14 @@ impl File for UringFile {
if bufs.len().eq(&1) {
return self.pwrite(pos, bufs[0].clone(), c.clone());
}
let linked = c.needs_link();
tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len());
// create state to track ongoing writev operation
let state = WritevState::new(self, pos, bufs);
self.io.lock().ring.submit_writev(get_key(c.clone()), state);
let state = WritevState::new(self, pos, linked, bufs);
let mut io = self.io.lock();
let continue_chain = !linked && io.ring.pending_link.load(Ordering::Acquire);
io.ring
.submit_writev(get_key(c.clone()), state, continue_chain);
Ok(c)
}
@@ -746,12 +809,16 @@ impl File for UringFile {
}
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
let truncate = with_fd!(self, |fd| {
let mut truncate = with_fd!(self, |fd| {
io_uring::opcode::Ftruncate::new(fd, len)
.build()
.user_data(get_key(c.clone()))
});
self.io.lock().ring.submit_entry(&truncate);
let mut io = self.io.lock();
if io.ring.pending_link.load(Ordering::Acquire) {
truncate = truncate.flags(io_uring::squeue::Flags::IO_LINK);
}
io.ring.submit_entry(&truncate);
Ok(c)
}
}
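To summarize the chain tracking added above, here is a hedged, simplified sketch of the handshake (the real logic lives on `WrappedIOUring`/`UringFile` behind a mutex; `ChainTracker` and its methods are illustrative):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

use io_uring::squeue::{Entry, Flags};

struct ChainTracker {
    pending_link: AtomicBool,
}

impl ChainTracker {
    /// Called while building a write SQE: starts or continues a link chain.
    fn tag_write(&self, entry: Entry, needs_link: bool) -> Entry {
        if needs_link {
            // Commit-frame write: start a new chain.
            self.pending_link.store(true, Ordering::Release);
            entry.flags(Flags::IO_LINK)
        } else if self.pending_link.load(Ordering::Acquire) {
            // Something is already chained; keep linking until the fsync.
            entry.flags(Flags::IO_LINK)
        } else {
            entry
        }
    }

    /// Called while building an fsync SQE: the barrier always ends the chain.
    fn tag_fsync(&self, entry: Entry) -> Entry {
        self.pending_link.store(false, Ordering::Release);
        entry
    }
}
```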


@@ -135,6 +135,7 @@ struct CompletionInner {
/// None means we completed successfully
// Thread safe with OnceLock
result: std::sync::OnceLock<Option<CompletionError>>,
needs_link: bool,
}
impl Debug for CompletionType {
@@ -161,10 +162,34 @@ impl Completion {
inner: Arc::new(CompletionInner {
completion_type,
result: OnceLock::new(),
needs_link: false,
}),
}
}
pub fn new_linked(completion_type: CompletionType) -> Self {
Self {
inner: Arc::new(CompletionInner {
completion_type,
result: OnceLock::new(),
needs_link: true,
}),
}
}
pub fn needs_link(&self) -> bool {
self.inner.needs_link
}
pub fn new_write_linked<F>(complete: F) -> Self
where
F: Fn(Result<i32, CompletionError>) + 'static,
{
Self::new_linked(CompletionType::Write(WriteCompletion::new(Box::new(
complete,
))))
}
pub fn new_write<F>(complete: F) -> Self
where
F: Fn(Result<i32, CompletionError>) + 'static,
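For context, a hedged sketch of how a caller is expected to choose between the two constructors added above, mirroring the `is_last_chunk` decision in `write_pages_vectored` below (a fragment using this crate's `Completion`/`CompletionError` types; `completion_for_run` and its arguments are illustrative):

```rust
fn completion_for_run(is_last_chunk: bool, expected_len: i32) -> Completion {
    let cb = move |res: Result<i32, CompletionError>| {
        if let Ok(written) = res {
            // pwritev aggregates partial writes, so the total should match.
            debug_assert_eq!(written, expected_len);
        }
    };
    if is_last_chunk {
        // Only the final run of the final batch is linked to the fsync barrier.
        Completion::new_write_linked(cb)
    } else {
        Completion::new_write(cb)
    }
}
```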


@@ -996,17 +996,14 @@ pub fn write_pages_vectored(
pager: &Pager,
batch: BTreeMap<usize, Arc<Buffer>>,
done_flag: Arc<AtomicBool>,
final_write: bool,
) -> Result<Vec<Completion>> {
if batch.is_empty() {
done_flag.store(true, Ordering::Relaxed);
return Ok(Vec::new());
}
// batch item array is already sorted by id, so we just need to find contiguous ranges of page_id's
// to submit as `writev`/write_pages calls.
let page_sz = pager.page_size.get().expect("page size is not set").get() as usize;
// Count expected number of runs to create the atomic counter we need to track each batch
let mut run_count = 0;
let mut prev_id = None;
@@ -1024,26 +1021,21 @@ pub fn write_pages_vectored(
// Create the atomic counters
let runs_left = Arc::new(AtomicUsize::new(run_count));
let done = done_flag.clone();
// we know how many runs, but we don't know how many buffers per run, so we can only give an
// estimate of the capacity
const EST_BUFF_CAPACITY: usize = 32;
// Iterate through the batch, submitting each run as soon as it ends
// We can reuse this across runs without reallocating
let mut run_bufs = Vec::with_capacity(EST_BUFF_CAPACITY);
let mut run_start_id: Option<usize> = None;
// Iterate through the batch
// Track which run we're on to identify the last one
let mut current_run = 0;
let mut iter = batch.iter().peekable();
let mut completions = Vec::new();
while let Some((id, item)) = iter.next() {
// Track the start of the run
if run_start_id.is_none() {
run_start_id = Some(*id);
}
// Add this page to the current run
run_bufs.push(item.clone());
// Check if this is the end of a run
@@ -1053,24 +1045,32 @@ pub fn write_pages_vectored(
};
if is_end_of_run {
current_run += 1;
let start_id = run_start_id.expect("should have a start id");
let runs_left_cl = runs_left.clone();
let done_cl = done.clone();
// This is the last chunk if it's the last run AND final_write is true
let is_last_chunk = current_run == run_count && final_write;
let total_sz = (page_sz * run_bufs.len()) as i32;
let c = Completion::new_write(move |res| {
let cmp = move |res| {
let Ok(res) = res else {
return;
};
// writev calls can sometimes return partial writes, but our `pwritev`
// implementation aggregates any partial writes and calls completion with total
turso_assert!(total_sz == res, "failed to write expected size");
if runs_left_cl.fetch_sub(1, Ordering::AcqRel) == 1 {
done_cl.store(true, Ordering::Release);
}
});
};
// Submit write operation for this run, decrementing the counter if we error
let c = if is_last_chunk {
Completion::new_write_linked(cmp)
} else {
Completion::new_write(cmp)
};
// Submit write operation for this run
let io_ctx = &pager.io_ctx.borrow();
match pager.db_file.write_pages(
start_id,


@@ -466,6 +466,13 @@ impl OngoingCheckpoint {
self.state = CheckpointState::Start;
}
#[inline]
fn is_final_write(&self) -> bool {
self.current_page as usize >= self.pages_to_checkpoint.len()
&& self.inflight_reads.is_empty()
&& !self.pending_writes.is_empty()
}
#[inline]
/// Whether or not new reads should be issued during checkpoint processing.
fn should_issue_reads(&self) -> bool {
@@ -1550,7 +1557,7 @@ impl Wal for WalFile {
// single completion for the whole batch
let total_len: i32 = iovecs.iter().map(|b| b.len() as i32).sum();
let page_frame_for_cb = page_frame_and_checksum.clone();
let c = Completion::new_write(move |res: Result<i32, CompletionError>| {
let cmp = move |res: Result<i32, CompletionError>| {
let Ok(bytes_written) = res else {
return;
};
@@ -1563,7 +1570,13 @@ impl Wal for WalFile {
page.clear_dirty();
page.set_wal_tag(*fid, seq);
}
});
};
let c = if db_size_on_commit.is_some() {
Completion::new_write_linked(cmp)
} else {
Completion::new_write(cmp)
};
let shared = self.get_shared();
assert!(
@@ -1884,7 +1897,10 @@ impl WalFile {
let batch_map = self.ongoing_checkpoint.pending_writes.take();
if !batch_map.is_empty() {
let done_flag = self.ongoing_checkpoint.add_write();
completions.extend(write_pages_vectored(pager, batch_map, done_flag)?);
let is_final = self.ongoing_checkpoint.is_final_write();
completions.extend(write_pages_vectored(
pager, batch_map, done_flag, is_final,
)?);
}
}