From b7debabd817d60e71f322ceffc7377555f6b9a86 Mon Sep 17 00:00:00 2001 From: Arpit Saxena Date: Sat, 28 Sep 2024 00:10:05 +0530 Subject: [PATCH] Wrap IoUring to ensure pending_ops is always correctly updated Adds a struct WrappedIOUring which contains a IoUring and a pending_ops field. Entry submission and popping from the queue is done through functions operating on it, which also maintains pending_ops count NOTE: This is a bit weird/hacky since in get_completion we create a CompletionQueue and just call its next(). If it were a normal iterator it would always return the same first item. However it is a queue posing as an iterator which makes this work. --- core/io/linux.rs | 73 +++++++++++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/core/io/linux.rs b/core/io/linux.rs index a23ab5a2f..3542e2870 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -3,7 +3,7 @@ use crate::{LimboError, Result}; use libc::{c_short, fcntl, flock, iovec, F_SETLK}; use log::{debug, trace}; use nix::fcntl::{FcntlArg, OFlag}; -use std::cell::{RefCell, RefMut}; +use std::cell::RefCell; use std::fmt; use std::os::unix::io::AsRawFd; use std::rc::Rc; @@ -33,24 +33,27 @@ pub struct LinuxIO { inner: Rc>, } -pub struct InnerLinuxIO { +struct WrappedIOUring { ring: io_uring::IoUring, + pending_ops: usize, +} + +struct InnerLinuxIO { + ring: WrappedIOUring, iovecs: [iovec; MAX_IOVECS], next_iovec: usize, - pending_ops: usize, } impl LinuxIO { pub fn new() -> Result { let ring = io_uring::IoUring::new(MAX_IOVECS as u32)?; let inner = InnerLinuxIO { - ring: ring, + ring: WrappedIOUring{ring, pending_ops: 0}, iovecs: [iovec { iov_base: std::ptr::null_mut(), iov_len: 0, }; MAX_IOVECS], next_iovec: 0, - pending_ops: 0, }; Ok(Self { inner: Rc::new(RefCell::new(inner)), @@ -68,6 +71,36 @@ impl InnerLinuxIO { } } +impl WrappedIOUring { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { + unsafe { + self.ring.submission() + .push(entry) + .expect("submission queue is full"); + } + self.pending_ops += 1; + } + + fn wait_for_completion(&mut self) -> Result<()> { + self.ring.submit_and_wait(1)?; + Ok(()) + } + + fn get_completion(&mut self) -> Option { + // NOTE: This works because CompletionQueue's next function pops the head of the queue. This is not normal behaviour of iterators + let entry = self.ring.completion().next(); + if entry.is_some() { + // consumed an entry from completion queue, update pending_ops + self.pending_ops -= 1; + } + entry + } + + fn empty(&self) -> bool { + self.pending_ops == 0 + } +} + impl IO for LinuxIO { fn open_file(&self, path: &str) -> Result> { trace!("open_file(path = {})", path); @@ -91,15 +124,15 @@ impl IO for LinuxIO { fn run_once(&self) -> Result<()> { trace!("run_once()"); - let inner = self.inner.borrow_mut(); - let (mut pending_ops, mut ring) = RefMut::map_split(inner, |inner_ref: &mut InnerLinuxIO| (&mut inner_ref.pending_ops, &mut inner_ref.ring)); - if *pending_ops == 0 { - return Ok(()); + let mut inner = self.inner.borrow_mut(); + let ring = &mut inner.ring; + + if ring.empty() { + return Ok(()) } - ring.submit_and_wait(1)?; - while let Some(cqe) = ring.completion().next() { - *pending_ops -= 1; + ring.wait_for_completion()?; + while let Some(cqe) = ring.get_completion() { let result = cqe.result(); if result < 0 { return Err(LimboError::LinuxIOError(format!( @@ -199,13 +232,7 @@ impl File for LinuxFile { .build() .user_data(ptr as u64) }; - let ring = &mut io.ring; - unsafe { - ring.submission() - .push(&read_e) - .expect("submission queue is full"); - } - io.pending_ops += 1; + io.ring.submit_entry(&read_e); Ok(()) } @@ -226,13 +253,7 @@ impl File for LinuxFile { .build() .user_data(ptr as u64) }; - let ring = &mut io.ring; - unsafe { - ring.submission() - .push(&write) - .expect("submission queue is full"); - } - io.pending_ops += 1; + io.ring.submit_entry(&write); Ok(()) } }