Add plumbing in io_uring to handle linked writes to ensure consistency

This commit is contained in:
PThorpe92
2025-09-03 16:01:12 -04:00
parent e3f366963d
commit d894a62132

View File

@@ -7,6 +7,7 @@ use crate::{turso_assert, LimboError, Result};
use parking_lot::Mutex;
use rustix::fs::{self, FlockOperation, OFlags};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
collections::{HashMap, VecDeque},
io::ErrorKind,
@@ -43,6 +44,10 @@ const MAX_WAIT: usize = 4;
/// One memory arena for DB pages and another for WAL frames
const ARENA_COUNT: usize = 2;
/// Arbitrary non-zero `user_data` value identifying the barrier (fsync) operation that is
/// submitted after a linked writev, e.g. one writing a commit frame, had a partial write.
const BARRIER_USER_DATA: u64 = 1;
/// Public handle to the io_uring-based I/O backend (this type implements `IO`).
///
/// All backend state lives in `InnerUringIO`, which is reference-counted and
/// guarded by a mutex.
pub struct UringIO {
// Shared backend state; callers lock it before touching the ring.
inner: Arc<Mutex<InnerUringIO>>,
}
@@ -56,6 +61,7 @@ struct WrappedIOUring {
writev_states: HashMap<u64, WritevState>,
overflow: VecDeque<io_uring::squeue::Entry>,
iov_pool: IovecPool,
pending_link: AtomicBool,
}
struct InnerUringIO {
@@ -122,6 +128,7 @@ impl UringIO {
pending_ops: 0,
writev_states: HashMap::new(),
iov_pool: IovecPool::new(),
pending_link: AtomicBool::new(false),
},
free_files: (0..FILES).collect(),
free_arenas: [const { None }; ARENA_COUNT],
@@ -153,6 +160,7 @@ macro_rules! with_fd {
/// wrapper type to represent a possibly registered file descriptor,
/// only used in WritevState, and piggy-backs on the available methods from
/// `UringFile`, so we don't have to store the file on `WritevState`.
#[derive(Clone)]
enum Fd {
Fixed(u32),
RawFd(i32),
@@ -194,10 +202,12 @@ struct WritevState {
bufs: Vec<Arc<crate::Buffer>>,
/// we keep the last iovec allocation alive until final CQE
last_iov_allocation: Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
had_partial: bool,
linked_op: bool,
}
impl WritevState {
fn new(file: &UringFile, pos: u64, bufs: Vec<Arc<crate::Buffer>>) -> Self {
fn new(file: &UringFile, pos: u64, linked: bool, bufs: Vec<Arc<crate::Buffer>>) -> Self {
let file_id = file
.id()
.map(Fd::Fixed)
@@ -212,6 +222,8 @@ impl WritevState {
bufs,
last_iov_allocation: None,
total_len,
had_partial: false,
linked_op: linked,
}
}
@@ -353,7 +365,7 @@ impl WrappedIOUring {
}
/// Submit or resubmit a writev operation
fn submit_writev(&mut self, key: u64, mut st: WritevState) {
fn submit_writev(&mut self, key: u64, mut st: WritevState, continue_chain: bool) {
st.free_last_iov(&mut self.iov_pool);
let mut iov_allocation = self.iov_pool.acquire().unwrap_or_else(|| {
// Fallback: allocate a new one if pool is exhausted
@@ -391,7 +403,7 @@ impl WrappedIOUring {
}
// If we have coalesced everything into a single iovec, submit as a single `pwrite`
if iov_count == 1 {
let entry = with_fd!(st.file_id, |fd| {
let mut entry = with_fd!(st.file_id, |fd| {
if let Some(id) = st.bufs[st.current_buffer_idx].fixed_id() {
io_uring::opcode::WriteFixed::new(
fd,
@@ -413,6 +425,16 @@ impl WrappedIOUring {
.user_data(key)
}
});
if st.linked_op && !st.had_partial {
// Starting a new link chain
entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
self.pending_link.store(true, Ordering::Release);
} else if continue_chain && !st.had_partial {
// Continue existing chain
entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
}
self.submit_entry(&entry);
return;
}
@@ -422,12 +444,15 @@ impl WrappedIOUring {
let ptr = iov_allocation.as_ptr() as *mut libc::iovec;
st.last_iov_allocation = Some(iov_allocation);
let entry = with_fd!(st.file_id, |fd| {
let mut entry = with_fd!(st.file_id, |fd| {
io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
.offset(st.file_pos)
.build()
.user_data(key)
});
if st.linked_op {
entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
}
// track the current state in case we get a partial write
self.writev_states.insert(key, st);
self.submit_entry(&entry);
@@ -452,6 +477,19 @@ impl WrappedIOUring {
);
// write complete, return iovec to pool
state.free_last_iov(&mut self.iov_pool);
if state.linked_op && state.had_partial {
// if it was a linked operation, we need to submit a fsync after this writev
// to ensure data is on disk
self.ring.submit().expect("submit after writev");
let file_id = state.file_id;
let sync = with_fd!(file_id, |fd| {
io_uring::opcode::Fsync::new(fd)
.build()
.user_data(BARRIER_USER_DATA)
})
.flags(io_uring::squeue::Flags::IO_DRAIN);
self.submit_entry(&sync);
}
completion_from_key(user_data).complete(state.total_written as i32);
}
remaining => {
@@ -461,8 +499,10 @@ impl WrappedIOUring {
written,
remaining
);
// partial write, submit next
self.submit_writev(user_data, state);
// make sure partial write is recorded, because fsync could happen after this
// and we are not finished writing to disk
state.had_partial = true;
self.submit_writev(user_data, state, false);
}
}
}
@@ -530,6 +570,14 @@ impl IO for UringIO {
// if we have ongoing writev state, handle it separately and don't call completion
ring.handle_writev_completion(state, user_data, result);
continue;
} else if user_data == BARRIER_USER_DATA {
// barrier operation, no completion to call
if result < 0 {
let err = std::io::Error::from_raw_os_error(result);
tracing::error!("barrier operation failed: {}", err);
return Err(err.into());
}
continue;
}
completion_from_key(user_data).complete(result)
}
@@ -680,7 +728,7 @@ impl File for UringFile {
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut io = self.io.lock();
let write = {
let mut write = {
let ptr = buffer.as_ptr();
let len = buffer.len();
with_fd!(self, |fd| {
@@ -708,6 +756,15 @@ impl File for UringFile {
}
})
};
if c.needs_link() {
// Start a new link chain
write = write.flags(io_uring::squeue::Flags::IO_LINK);
io.ring.pending_link.store(true, Ordering::Release);
} else if io.ring.pending_link.load(Ordering::Acquire) {
// Continue existing link chain
write = write.flags(io_uring::squeue::Flags::IO_LINK);
}
io.ring.submit_entry(&write);
Ok(c)
}
@@ -720,6 +777,8 @@ impl File for UringFile {
.build()
.user_data(get_key(c.clone()))
});
// sync always ends the chain of linked operations
io.ring.pending_link.store(false, Ordering::Release);
io.ring.submit_entry(&sync);
Ok(c)
}
@@ -734,10 +793,14 @@ impl File for UringFile {
if bufs.len().eq(&1) {
return self.pwrite(pos, bufs[0].clone(), c.clone());
}
let linked = c.needs_link();
tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len());
// create state to track ongoing writev operation
let state = WritevState::new(self, pos, bufs);
self.io.lock().ring.submit_writev(get_key(c.clone()), state);
let state = WritevState::new(self, pos, linked, bufs);
let mut io = self.io.lock();
let continue_chain = !linked && io.ring.pending_link.load(Ordering::Acquire);
io.ring
.submit_writev(get_key(c.clone()), state, continue_chain);
Ok(c)
}
@@ -746,12 +809,16 @@ impl File for UringFile {
}
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
let truncate = with_fd!(self, |fd| {
let mut truncate = with_fd!(self, |fd| {
io_uring::opcode::Ftruncate::new(fd, len)
.build()
.user_data(get_key(c.clone()))
});
self.io.lock().ring.submit_entry(&truncate);
let mut io = self.io.lock();
if io.ring.pending_link.load(Ordering::Acquire) {
truncate = truncate.flags(io_uring::squeue::Flags::IO_LINK);
}
io.ring.submit_entry(&truncate);
Ok(c)
}
}