Merge 'io_uring: use Arc pointer for user data of entries' from Preston Thorpe

Trying to pull bite-sized adjustments out of other open PRs.

Closes #2281
This commit is contained in:
Pekka Enberg
2025-07-27 09:04:35 +03:00

View File

@@ -2,7 +2,7 @@
use super::{common, Completion, File, OpenFlags, IO};
use crate::io::clock::{Clock, Instant};
use crate::{LimboError, MemoryIO, Result};
use crate::{turso_assert, LimboError, MemoryIO, Result};
use rustix::fs::{self, FlockOperation, OFlags};
use std::cell::RefCell;
use std::collections::VecDeque;
@@ -17,6 +17,7 @@ use tracing::{debug, trace};
/// Queue depth passed to `io_uring::IoUring::new`; also used to size the `pending` completion table.
const ENTRIES: u32 = 512;
/// NOTE(review): presumably the SQPOLL kernel-thread idle timeout in milliseconds; its use is not
/// visible in this view, so confirm against the ring builder.
const SQPOLL_IDLE: u32 = 1000;
/// Number of sparse fixed-file slots registered with the ring, and the size of the free-slot pool.
const FILES: u32 = 8;
#[derive(Debug, Error)]
enum UringIOError {
@@ -44,8 +45,6 @@ unsafe impl Sync for UringIO {}
/// Bundles the raw `io_uring::IoUring` with the bookkeeping used for submitted operations.
struct WrappedIOUring {
/// The underlying io_uring instance.
ring: io_uring::IoUring,
/// Counter of pending operations; `empty()` reports true when this is zero.
pending_ops: usize,
/// Completions indexed by an entry's `user_data`. Sized `ENTRIES + 1` because `get_key` hands
/// out keys from 1 up to and including `ENTRIES`.
pub pending: [Option<Arc<Completion>>; ENTRIES as usize + 1],
/// Last key handed out by `get_key`; reset to 0 once it reaches `ENTRIES`.
key: u64,
}
struct InnerUringIO {
@@ -65,15 +64,13 @@ impl UringIO {
Err(_) => io_uring::IoUring::new(ENTRIES)?,
};
// we only ever have 2 files open at a time for the moment
ring.submitter().register_files_sparse(8)?;
ring.submitter().register_files_sparse(FILES)?;
let inner = InnerUringIO {
ring: WrappedIOUring {
ring,
pending_ops: 0,
pending: [const { None }; ENTRIES as usize + 1],
key: 0,
},
free_files: (0..8).collect(),
free_files: (0..FILES).collect(),
};
debug!("Using IO backend 'io-uring'");
Ok(Self {
@@ -106,9 +103,8 @@ impl InnerUringIO {
}
impl WrappedIOUring {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc<Completion>) {
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
trace!("submit_entry({:?})", entry);
self.pending[entry.get_user_data() as usize] = Some(c);
unsafe {
self.ring
.submission()
@@ -137,16 +133,6 @@ impl WrappedIOUring {
/// Reports whether the pending-operation counter is currently zero.
fn empty(&self) -> bool {
    let outstanding = self.pending_ops;
    outstanding == 0
}
/// Advances the key counter and returns the newly issued key.
///
/// Keys cycle through `1..=ENTRIES`: when the counter reaches `ENTRIES` that value is still
/// returned, but the stored counter is reset to 0 so the next call yields 1 again.
fn get_key(&mut self) -> u64 {
    self.key += 1;
    let issued = self.key;
    if issued == ENTRIES as u64 {
        // Wrap around; the value just issued is unaffected.
        self.key = 0;
    }
    issued
}
}
impl IO for UringIO {
@@ -208,12 +194,9 @@ impl IO for UringIO {
cqe
)));
}
{
if let Some(c) = ring.pending[cqe.user_data() as usize].as_ref() {
c.complete(cqe.result());
}
}
ring.pending[cqe.user_data() as usize] = None;
let ud = cqe.user_data();
turso_assert!(ud > 0, "therea are no linked timeouts or cancelations, all cqe user_data should be valid arc pointers");
completion_from_key(ud).complete(result);
}
Ok(())
}
@@ -239,6 +222,19 @@ impl Clock for UringIO {
}
}
#[inline(always)]
/// use the callback pointer as the user_data for the operation as is
/// common practice for io_uring to prevent more indirection
fn get_key(c: Arc<Completion>) -> u64 {
Arc::into_raw(c) as u64
}
/// Rebuilds the `Arc<Completion>` whose raw pointer was stored as an entry's `user_data`.
#[inline(always)]
fn completion_from_key(key: u64) -> Arc<Completion> {
    let raw = key as *const Completion;
    // SAFETY: `Arc::from_raw` requires a pointer obtained from `Arc::into_raw` that has not
    // already been reclaimed; callers must pass a key produced by `get_key`, and only once.
    unsafe { Arc::from_raw(raw) }
}
pub struct UringFile {
io: Rc<RefCell<InnerUringIO>>,
file: std::fs::File,
@@ -313,10 +309,10 @@ impl File for UringFile {
io_uring::opcode::Read::new(fd, buf, len as u32)
.offset(pos as u64)
.build()
.user_data(io.ring.get_key())
.user_data(get_key(c.clone()))
})
};
io.ring.submit_entry(&read_e, c.clone());
io.ring.submit_entry(&read_e);
Ok(c)
}
@@ -334,18 +330,10 @@ impl File for UringFile {
io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32)
.offset(pos as u64)
.build()
.user_data(io.ring.get_key())
.user_data(get_key(c.clone()))
})
};
let c_uring = c.clone();
io.ring.submit_entry(
&write,
Arc::new(Completion::new_write(move |result| {
c_uring.complete(result);
// NOTE: Explicitly reference buffer to ensure it lives until here
let _ = buffer.borrow();
})),
);
io.ring.submit_entry(&write);
Ok(c)
}
@@ -355,9 +343,9 @@ impl File for UringFile {
let sync = with_fd!(self, |fd| {
io_uring::opcode::Fsync::new(fd)
.build()
.user_data(io.ring.get_key())
.user_data(get_key(c.clone()))
});
io.ring.submit_entry(&sync, c.clone());
io.ring.submit_entry(&sync);
Ok(c)
}