From 0f94cdef034194d3cb2dda12c358c19eca037f13 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 26 Jul 2025 15:36:33 -0400 Subject: [PATCH] Fix io_uring pwritev to properly handle partial writes --- core/io/io_uring.rs | 221 +++++++++++++++++++++++++++++--------------- core/storage/wal.rs | 2 +- 2 files changed, 145 insertions(+), 78 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 963e2fe28..ccf213e12 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -2,15 +2,18 @@ use super::{common, Completion, CompletionInner, File, OpenFlags, IO}; use crate::io::clock::{Clock, Instant}; +use crate::storage::wal::CKPT_BATCH_PAGES; use crate::{turso_assert, LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; -use std::cell::RefCell; -use std::collections::{HashMap, VecDeque}; -use std::io::ErrorKind; -use std::os::fd::AsFd; -use std::os::unix::io::AsRawFd; -use std::rc::Rc; -use std::sync::Arc; +use std::{ + cell::RefCell, + collections::{HashMap, VecDeque}, + io::ErrorKind, + ops::Deref, + os::{fd::AsFd, unix::io::AsRawFd}, + rc::Rc, + sync::Arc, +}; use tracing::{debug, trace}; const ENTRIES: u32 = 512; @@ -65,31 +68,50 @@ impl UringIO { macro_rules! with_fd { ($file:expr, |$fd:ident| $body:expr) => { - match $file.id { + match $file.id() { Some(id) => { let $fd = io_uring::types::Fixed(id); $body } None => { - let $fd = io_uring::types::Fd($file.file.as_raw_fd()); + let $fd = io_uring::types::Fd($file.as_raw_fd()); $body } } }; } +enum Fd { + Fixed(u32), + RawFd(i32), +} + +impl Fd { + fn as_raw_fd(&self) -> i32 { + match self { + Fd::RawFd(fd) => *fd, + _ => unreachable!("only to be called on RawFd variant"), + } + } + fn id(&self) -> Option { + match self { + Fd::Fixed(id) => Some(*id), + Fd::RawFd(_) => None, + } + } +} + struct WritevState { - // fixed fd slot or 0xFFFF_FFFF if none - file_id: u32, + // fixed fd slot + file_id: Fd, // absolute file offset for next submit - pos: usize, + file_pos: usize, // current buffer index in `bufs` - idx: usize, + current_buffer_idx: usize, // intra-buffer offset - off: usize, + current_buffer_offset: usize, + total_written: usize, bufs: Vec>>, - // completion returned to caller - user_c: Arc, // we keep the last iovec allocation alive until CQE: // raw ptr to Box<[iovec]> last_iov: *mut libc::iovec, @@ -97,12 +119,32 @@ struct WritevState { } impl WritevState { + fn new(file: &UringFile, pos: usize, bufs: Vec>>) -> Self { + let file_id = match file.id() { + Some(id) => Fd::Fixed(id), + None => Fd::RawFd(file.as_raw_fd()), + }; + Self { + file_id, + file_pos: pos, + current_buffer_idx: 0, + current_buffer_offset: 0, + total_written: 0, + bufs, + last_iov: core::ptr::null_mut(), + last_iov_len: 0, + } + } fn remaining(&self) -> usize { let mut total = 0; - for (i, b) in self.bufs.iter().enumerate().skip(self.idx) { + for (i, b) in self.bufs.iter().enumerate().skip(self.current_buffer_idx) { let r = b.borrow(); let len = r.len(); - total += if i == self.idx { len - self.off } else { len }; + total += if i == self.current_buffer_idx { + len - self.current_buffer_offset + } else { + len + }; } total } @@ -112,21 +154,22 @@ impl WritevState { let mut rem = written; while rem > 0 { let len = { - let r = self.bufs[self.idx].borrow(); + let r = self.bufs[self.current_buffer_idx].borrow(); r.len() }; - let left = len - self.off; + let left = len - self.current_buffer_offset; if rem < left { - self.off += rem; - self.pos += rem; + self.current_buffer_offset += rem; + self.file_pos += rem; rem = 0; } else { rem -= left; - self.pos += left; - self.idx += 1; - self.off = 0; + self.file_pos += left; + self.current_buffer_idx += 1; + self.current_buffer_offset = 0; } } + self.total_written += written; } fn free_last_iov(&mut self) { @@ -188,7 +231,7 @@ impl WrappedIOUring { return Ok(()); } let wants = std::cmp::min(self.pending_ops, 8); - tracing::info!("Waiting for {wants} pending operations to complete"); + tracing::trace!("submit_and_wait for {wants} pending operations to complete"); self.ring.submit_and_wait(wants)?; Ok(()) } @@ -199,13 +242,23 @@ impl WrappedIOUring { fn submit_writev(&mut self, key: u64) { let st = self.writev_states.get_mut(&key).expect("state must exist"); - // build iovecs for the remaining slice (respect UIO_MAXIOV) - const MAX_IOV: usize = libc::UIO_MAXIOV as usize; - let mut iov = Vec::with_capacity(MAX_IOV); - for (i, b) in st.bufs.iter().enumerate().skip(st.idx).take(MAX_IOV) { + // the likelyhood of the whole batch size being contiguous is very low, so lets not pre-allocate more than half + let max = CKPT_BATCH_PAGES / 2; + let mut iov = Vec::with_capacity(max); + for (i, b) in st + .bufs + .iter() + .enumerate() + .skip(st.current_buffer_idx) + .take(max) + { let r = b.borrow(); let s = r.as_slice(); - let slice = if i == st.idx { &s[st.off..] } else { s }; + let slice = if i == st.current_buffer_idx { + &s[st.current_buffer_offset..] + } else { + s + }; if slice.is_empty() { continue; } @@ -224,11 +277,12 @@ impl WrappedIOUring { st.last_iov_len = len; // leak; freed when CQE processed let _ = Box::into_raw(boxed); - let entry = - io_uring::opcode::Writev::new(io_uring::types::Fixed(st.file_id), ptr, len as u32) - .offset(st.pos as u64) + let entry = with_fd!(st.file_id, |fd| { + io_uring::opcode::Writev::new(fd, ptr, len as u32) + .offset(st.file_pos as u64) .build() - .user_data(key); + .user_data(key) + }); self.submit_entry(&entry); } } @@ -274,53 +328,58 @@ impl IO for UringIO { } fn run_once(&self) -> Result<()> { - { - let mut inner = self.inner.borrow_mut(); - let ring = &mut inner.ring; + let mut inner = self.inner.borrow_mut(); + let ring = &mut inner.ring; - if ring.empty() { - return Ok(()); - } - let ud = cqe.user_data(); - turso_assert!(ud > 0, "therea are no linked timeouts or cancelations, all cqe user_data should be valid arc pointers"); - ring.wait_for_completion()?; + if ring.empty() { + return Ok(()); } - loop { - let (had, user_data, result) = { - let mut inner = self.inner.borrow_mut(); - let mut cq = inner.ring.ring.completion(); - if let Some(cqe) = cq.next() { - (true, cqe.user_data(), cqe.result()) - } else { - (false, 0, 0) + ring.wait_for_completion()?; + const MAX: usize = ENTRIES as usize; + // to circumvent borrowing rules, collect everything without the heap + let mut uds = [0u64; MAX]; + let mut ress = [0i32; MAX]; + let mut count = 0; + { + let cq = ring.ring.completion(); + for cqe in cq { + ring.pending_ops -= 1; + uds[count] = cqe.user_data(); + ress[count] = cqe.result(); + count += 1; + if count == MAX { + break; } - }; - if !had { - break; } - self.inner.borrow_mut().ring.pending_ops -= 1; + } - let mut inner = self.inner.borrow_mut(); - if let Some(mut st) = inner.ring.writev_states.remove(&user_data) { + for i in 0..count { + ring.pending_ops -= 1; + let user_data = uds[i]; + let result = ress[i]; + turso_assert!( + user_data != 0, + "user_data must not be zero, we dont submit linked timeouts or cancelations that would cause this" + ); + if let Some(mut st) = ring.writev_states.remove(&user_data) { if result < 0 { st.free_last_iov(); - st.user_c.complete(result); + completion_from_key(user_data).complete(result); } else { let written = result as usize; st.free_last_iov(); st.advance(written); if st.remaining() == 0 { - st.user_c.complete(st.pos as i32); + completion_from_key(user_data).complete(st.total_written as i32); } else { - inner.ring.writev_states.insert(user_data, st); - inner.ring.submit_writev(user_data); // safe: no CQ borrow alive + ring.writev_states.insert(user_data, st); + ring.submit_writev(user_data); } } } else { completion_from_key(user_data).complete(result); } } - Ok(()) } @@ -365,6 +424,19 @@ pub struct UringFile { id: Option, } +impl Deref for UringFile { + type Target = std::fs::File; + fn deref(&self) -> &Self::Target { + &self.file + } +} + +impl UringFile { + fn id(&self) -> Option { + self.id + } +} + unsafe impl Send for UringFile {} unsafe impl Sync for UringFile {} @@ -462,26 +534,21 @@ impl File for UringFile { &self, pos: usize, bufs: Vec>>, - user_c: Arc, + c: Arc, ) -> Result> { + // for a single buffer use pwrite directly + if bufs.len().eq(&1) { + return self.pwrite(pos, bufs[0].clone(), c.clone()); + } + tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len()); // create state - let key = get_key(user_c.clone()); + let key = get_key(c.clone()); let mut io = self.io.borrow_mut(); - let state = WritevState { - file_id: self.id.unwrap_or(u32::MAX), - pos, - idx: 0, - off: 0, - bufs, - user_c: user_c.clone(), - last_iov: core::ptr::null_mut(), - last_iov_len: 0, - }; + let state = WritevState::new(self, pos, bufs); io.ring.writev_states.insert(key, state); io.ring.submit_writev(key); - - Ok(user_c.clone()) + Ok(c.clone()) } fn size(&self) -> Result { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 7577849c2..5aa325af3 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -399,7 +399,7 @@ pub enum CheckpointState { Done, } -const CKPT_BATCH_PAGES: usize = 512; +pub const CKPT_BATCH_PAGES: usize = 512; #[derive(Clone)] pub(super) struct BatchItem {