mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-30 22:44:21 +01:00
Gracefully handle submission queue overflow in io_uring backend
This commit is contained in:
@@ -55,6 +55,7 @@ struct WrappedIOUring {
|
||||
ring: io_uring::IoUring,
|
||||
pending_ops: usize,
|
||||
writev_states: HashMap<u64, WritevState>,
|
||||
overflow: VecDeque<io_uring::squeue::Entry>,
|
||||
iov_pool: IovecPool,
|
||||
}
|
||||
|
||||
@@ -118,6 +119,7 @@ impl UringIO {
|
||||
let inner = InnerUringIO {
|
||||
ring: WrappedIOUring {
|
||||
ring,
|
||||
overflow: VecDeque::new(),
|
||||
pending_ops: 0,
|
||||
writev_states: HashMap::new(),
|
||||
iov_pool: IovecPool::new(),
|
||||
@@ -287,18 +289,51 @@ impl InnerUringIO {
|
||||
impl WrappedIOUring {
|
||||
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
|
||||
trace!("submit_entry({:?})", entry);
|
||||
unsafe {
|
||||
let mut sub = self.ring.submission_shared();
|
||||
match sub.push(entry) {
|
||||
Ok(_) => self.pending_ops += 1,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to submit entry: {e}");
|
||||
self.ring.submit().expect("failed to submit entry");
|
||||
sub.push(entry).expect("failed to push entry after submit");
|
||||
// we cannot push current entries before any overflow
|
||||
if self.flush_overflow().is_ok() {
|
||||
let pushed = unsafe {
|
||||
let mut sub = self.ring.submission();
|
||||
sub.push(entry).is_ok()
|
||||
};
|
||||
if pushed {
|
||||
self.pending_ops += 1;
|
||||
return;
|
||||
}
|
||||
}
|
||||
// if we were unable to push, add to overflow
|
||||
self.overflow.push_back(entry.clone());
|
||||
self.ring.submit().expect("submiting when full");
|
||||
}
|
||||
|
||||
/// Flush overflow entries to submission queue when possible
|
||||
fn flush_overflow(&mut self) -> Result<()> {
|
||||
while !self.overflow.is_empty() {
|
||||
let sub_len = self.ring.submission().len();
|
||||
// safe subtraction as submission len will always be < ENTRIES
|
||||
let available_space = ENTRIES as usize - sub_len;
|
||||
if available_space == 0 {
|
||||
// No space available, always return error if we dont flush all overflow entries
|
||||
// to prevent out of order I/O operations
|
||||
return Err(LimboError::UringIOError("squeue full".into()));
|
||||
}
|
||||
// Push as many as we can
|
||||
let to_push = std::cmp::min(available_space, self.overflow.len());
|
||||
unsafe {
|
||||
let mut sq = self.ring.submission();
|
||||
for _ in 0..to_push {
|
||||
let entry = self.overflow.pop_front().unwrap();
|
||||
if sq.push(&entry).is_err() {
|
||||
// Unexpected failure, put it back
|
||||
self.overflow.push_front(entry);
|
||||
// No space available, always return error if we dont flush all overflow entries
|
||||
// to prevent out of order I/O operations
|
||||
return Err(LimboError::UringIOError("squeue full".into()));
|
||||
}
|
||||
self.pending_ops += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn submit_and_wait(&mut self) -> Result<()> {
|
||||
@@ -475,6 +510,7 @@ impl IO for UringIO {
|
||||
trace!("run_once()");
|
||||
let mut inner = self.inner.borrow_mut();
|
||||
let ring = &mut inner.ring;
|
||||
ring.flush_overflow()?;
|
||||
if ring.empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user