From 9dfadf78720280749f31b17338152abd6a3f263f Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Wed, 16 Jul 2025 22:41:47 -0400 Subject: [PATCH] Add registered file descriptors to io_uring IO module --- core/io/io_uring.rs | 86 +++++++++++++++++++++++++++++---------------- 1 file changed, 55 insertions(+), 31 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 4beb624d5..d4baee34a 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -5,8 +5,8 @@ use crate::io::clock::{Clock, Instant}; use crate::io::CompletionType; use crate::{LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; -use rustix::io_uring::iovec; use std::cell::RefCell; +use std::collections::VecDeque; use std::fmt; use std::io::ErrorKind; use std::os::fd::AsFd; @@ -16,7 +16,7 @@ use std::sync::Arc; use thiserror::Error; use tracing::{debug, trace}; -const MAX_IOVECS: u32 = 128; +const ENTRIES: u32 = 128; const SQPOLL_IDLE: u32 = 1000; #[derive(Debug, Error)] @@ -45,37 +45,34 @@ unsafe impl Sync for UringIO {} struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, - pub pending: [Option>; MAX_IOVECS as usize + 1], + pub pending: [Option>; ENTRIES as usize + 1], key: u64, } struct InnerUringIO { ring: WrappedIOUring, - iovecs: [iovec; MAX_IOVECS as usize], - next_iovec: usize, + free_files: VecDeque, } impl UringIO { pub fn new() -> Result { let ring = match io_uring::IoUring::builder() .setup_sqpoll(SQPOLL_IDLE) - .build(MAX_IOVECS) + .build(ENTRIES) { Ok(ring) => ring, - Err(_) => io_uring::IoUring::new(MAX_IOVECS)?, + Err(_) => io_uring::IoUring::new(ENTRIES)?, }; + // we only ever have 2 files open at a time for the moment + ring.submitter().register_files_sparse(8)?; let inner = InnerUringIO { ring: WrappedIOUring { ring, pending_ops: 0, - pending: [const { None }; MAX_IOVECS as usize + 1], + pending: [const { None }; ENTRIES as usize + 1], key: 0, }, - iovecs: [iovec { - iov_base: std::ptr::null_mut(), - iov_len: 0, - }; MAX_IOVECS as usize], - next_iovec: 0, + free_files: (0..8).collect(), }; debug!("Using IO backend 'io-uring'"); Ok(Self { @@ -85,12 +82,30 @@ impl UringIO { } impl InnerUringIO { - pub fn get_iovec(&mut self, buf: *const u8, len: usize) -> &iovec { - let iovec = &mut self.iovecs[self.next_iovec]; - iovec.iov_base = buf as *mut std::ffi::c_void; - iovec.iov_len = len; - self.next_iovec = (self.next_iovec + 1) % MAX_IOVECS as usize; - iovec + fn register_file(&mut self, fd: i32) -> Result { + if let Some(slot) = self.free_files.pop_front() { + self.ring + .ring + .submitter() + .register_files_update(slot, &[fd.as_raw_fd()])?; + return Ok(slot); + } + Err(LimboError::UringIOError( + "unable to register file, no free slots available".to_string(), + )) + } + fn unregister_file(&mut self, id: u32) -> Result<()> { + if self.free_files.len() >= ENTRIES as usize { + return Err(LimboError::UringIOError( + "unable to unregister file, too many free slots".to_string(), + )); + } + self.ring + .ring + .submitter() + .register_files_update(id, &[-1])?; + self.free_files.push_back(id); + Ok(()) } } @@ -129,7 +144,7 @@ impl WrappedIOUring { fn get_key(&mut self) -> u64 { self.key += 1; - if self.key == MAX_IOVECS as u64 { + if self.key == ENTRIES as u64 { let key = self.key; self.key = 0; return key; @@ -159,9 +174,11 @@ impl IO for UringIO { Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"), } } + let id = self.inner.borrow_mut().register_file(file.as_raw_fd())?; let uring_file = Arc::new(UringFile { io: self.inner.clone(), file, + id, }); if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { uring_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?; @@ -229,6 +246,7 @@ impl Clock for UringIO { pub struct UringFile { io: Rc>, file: std::fs::File, + id: u32, } unsafe impl Send for UringFile {} @@ -275,14 +293,12 @@ impl File for UringFile { fn pread(&self, pos: usize, c: Completion) -> Result> { let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); - let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); let read_e = { let mut buf = r.buf_mut(); let len = buf.len(); let buf = buf.as_mut_ptr(); - let iovec = io.get_iovec(buf, len); - io_uring::opcode::Readv::new(fd, iovec as *const iovec as *const libc::iovec, 1) + io_uring::opcode::Read::new(io_uring::types::Fixed(self.id), buf, len as u32) .offset(pos as u64) .build() .user_data(io.ring.get_key()) @@ -299,15 +315,17 @@ impl File for UringFile { c: Completion, ) -> Result> { let mut io = self.io.borrow_mut(); - let fd = io_uring::types::Fd(self.file.as_raw_fd()); let write = { let buf = buffer.borrow(); trace!("pwrite(pos = {}, length = {})", pos, buf.len()); - let iovec = io.get_iovec(buf.as_ptr(), buf.len()); - io_uring::opcode::Writev::new(fd, iovec as *const iovec as *const libc::iovec, 1) - .offset(pos as u64) - .build() - .user_data(io.ring.get_key()) + io_uring::opcode::Write::new( + io_uring::types::Fixed(self.id), + buf.as_ptr(), + buf.len() as u32, + ) + .offset(pos as u64) + .build() + .user_data(io.ring.get_key()) }; let c = Arc::new(c); let c_uring = c.clone(); @@ -325,10 +343,9 @@ impl File for UringFile { } fn sync(&self, c: Completion) -> Result> { - let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); trace!("sync()"); - let sync = io_uring::opcode::Fsync::new(fd) + let sync = io_uring::opcode::Fsync::new(io_uring::types::Fixed(self.id)) .build() .user_data(io.ring.get_key()); let c = Arc::new(c); @@ -344,6 +361,13 @@ impl File for UringFile { impl Drop for UringFile { fn drop(&mut self) { self.unlock_file().expect("Failed to unlock file"); + self.io + .borrow_mut() + .unregister_file(self.id) + .inspect_err(|e| { + debug!("Failed to unregister file: {e}"); + }) + .ok(); } }