use super::{Completion, File, WriteCompletion, IO}; use anyhow::{ensure, Result}; use libc::iovec; use log::{debug, trace}; use std::cell::{Ref, RefCell}; use nix::fcntl::{self, FcntlArg, OFlag}; use std::os::unix::io::AsRawFd; use std::rc::Rc; use std::fmt; use thiserror::Error; const MAX_IOVECS: usize = 128; #[derive(Debug, Error)] enum LinuxIOError { IOUringCQError(i32), } // Implement the Display trait to customize error messages impl fmt::Display for LinuxIOError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { LinuxIOError::IOUringCQError(code) => write!(f, "IOUring completion queue error occurred with code {}", code), } } } pub struct LinuxIO { inner: Rc>, } pub struct InnerLinuxIO { ring: io_uring::IoUring, iovecs: [iovec; MAX_IOVECS], next_iovec: usize, } impl LinuxIO { pub fn new() -> Result { let ring = io_uring::IoUring::new(MAX_IOVECS as u32)?; let inner = InnerLinuxIO { ring: ring, iovecs: [iovec { iov_base: std::ptr::null_mut(), iov_len: 0, }; MAX_IOVECS], next_iovec: 0, }; Ok(Self { inner: Rc::new(RefCell::new(inner)), }) } } impl InnerLinuxIO { pub fn get_iovec<'a>(&'a mut self, buf: *const u8, len: usize) -> &'a iovec { let iovec = &mut self.iovecs[self.next_iovec]; iovec.iov_base = buf as *mut std::ffi::c_void; iovec.iov_len = len; self.next_iovec = (self.next_iovec + 1) % MAX_IOVECS; iovec } } impl IO for LinuxIO { fn open_file(&self, path: &str) -> Result> { trace!("open_file(path = {})", path); let file = std::fs::File::options() .read(true) .write(true) .open(path)?; // Let's attempt to enable direct I/O. Not all filesystems support it // so ignore any errors. let fd = file.as_raw_fd(); match nix::fcntl::fcntl(fd, FcntlArg::F_SETFL(OFlag::O_DIRECT)) { Ok(_) => {}, Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"), }; Ok(Rc::new(LinuxFile { io: self.inner.clone(), file, })) } fn run_once(&self) -> Result<()> { trace!("run_once()"); let mut inner = self.inner.borrow_mut(); let mut ring = &mut inner.ring; ring.submit_and_wait(1)?; while let Some(cqe) = ring.completion().next() { let result = cqe.result(); ensure!( result >= 0, LinuxIOError::IOUringCQError(result) ); let c = unsafe { Rc::from_raw(cqe.user_data() as *const Completion) }; c.complete(); } Ok(()) } } pub struct LinuxFile { io: Rc>, file: std::fs::File, } impl File for LinuxFile { fn pread(&self, pos: usize, c: Rc) -> Result<()> { trace!("pread(pos = {}, length = {})", pos, c.buf().len()); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); let read_e = { let mut buf = c.buf_mut(); let len = buf.len(); let buf = buf.as_mut_ptr(); let ptr = Rc::into_raw(c.clone()); let iovec = io.get_iovec(buf, len); io_uring::opcode::Readv::new(fd, iovec, 1) .offset(pos as u64) .build() .user_data(ptr as u64) }; let mut ring = &mut io.ring; unsafe { ring.submission() .push(&read_e) .expect("submission queue is full"); } Ok(()) } fn pwrite( &self, pos: usize, buffer: Rc>, c: Rc, ) -> Result<()> { let mut io = self.io.borrow_mut(); let fd = io_uring::types::Fd(self.file.as_raw_fd()); let write = { let buf = buffer.borrow(); let ptr = Rc::into_raw(c.clone()); let iovec = io.get_iovec(buf.as_ptr(), buf.len()); io_uring::opcode::Writev::new(fd, iovec, 1) .offset(pos as u64) .build() .user_data(ptr as u64) }; let mut ring = &mut io.ring; unsafe { ring.submission() .push(&write) .expect("submission queue is full"); } Ok(()) } }