From ec4bf19fc7b2f1e8965b475113b23c19ab2d14ff Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Wed, 13 Aug 2025 12:07:41 -0400 Subject: [PATCH] Gracefully handle submission queue overflow in io_uring backend --- core/io/io_uring.rs | 52 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index a5b32c661..e780e049f 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -55,6 +55,7 @@ struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, writev_states: HashMap, + overflow: VecDeque, iov_pool: IovecPool, } @@ -118,6 +119,7 @@ impl UringIO { let inner = InnerUringIO { ring: WrappedIOUring { ring, + overflow: VecDeque::new(), pending_ops: 0, writev_states: HashMap::new(), iov_pool: IovecPool::new(), @@ -287,18 +289,51 @@ impl InnerUringIO { impl WrappedIOUring { fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { trace!("submit_entry({:?})", entry); - unsafe { - let mut sub = self.ring.submission_shared(); - match sub.push(entry) { - Ok(_) => self.pending_ops += 1, - Err(e) => { - tracing::error!("Failed to submit entry: {e}"); - self.ring.submit().expect("failed to submit entry"); - sub.push(entry).expect("failed to push entry after submit"); + // we cannot push the current entry ahead of any queued overflow entries + if self.flush_overflow().is_ok() { + let pushed = unsafe { + let mut sub = self.ring.submission(); + sub.push(entry).is_ok() + }; + if pushed { + self.pending_ops += 1; + return; + } + } + // if we were unable to push, add to overflow + self.overflow.push_back(entry.clone()); + self.ring.submit().expect("submiting when full"); + } + + /// Flush overflow entries to submission queue when possible + fn flush_overflow(&mut self) -> Result<()> { + while !self.overflow.is_empty() { + let sub_len = self.ring.submission().len(); + // safe subtraction as submission len will always be <= ENTRIES + let available_space = ENTRIES as usize - sub_len; + if
available_space == 0 { + // No space available, always return error if we don't flush all overflow entries + // to prevent out-of-order I/O operations + return Err(LimboError::UringIOError("squeue full".into())); + } + // Push as many as we can + let to_push = std::cmp::min(available_space, self.overflow.len()); + unsafe { + let mut sq = self.ring.submission(); + for _ in 0..to_push { + let entry = self.overflow.pop_front().unwrap(); + if sq.push(&entry).is_err() { + // Unexpected failure, put it back + self.overflow.push_front(entry); + // No space available, always return error if we don't flush all overflow entries + // to prevent out-of-order I/O operations + return Err(LimboError::UringIOError("squeue full".into())); + } self.pending_ops += 1; } } } + Ok(()) } fn submit_and_wait(&mut self) -> Result<()> { @@ -475,6 +510,7 @@ impl IO for UringIO { trace!("run_once()"); let mut inner = self.inner.borrow_mut(); let ring = &mut inner.ring; + ring.flush_overflow()?; if ring.empty() { return Ok(()); }