From 735026b50269937878be667593fa25b130f8ce4c Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 26 Jul 2025 16:35:40 -0400 Subject: [PATCH] Use Arc pointer for user data and save indirection when processing sqe/cqes --- core/io/io_uring.rs | 66 +++++++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 39 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 3983d61dc..aa0f21f3c 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -2,7 +2,7 @@ use super::{common, Completion, File, OpenFlags, IO}; use crate::io::clock::{Clock, Instant}; -use crate::{LimboError, MemoryIO, Result}; +use crate::{turso_assert, LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; use std::cell::RefCell; use std::collections::VecDeque; @@ -17,6 +17,7 @@ use tracing::{debug, trace}; const ENTRIES: u32 = 512; const SQPOLL_IDLE: u32 = 1000; +const FILES: u32 = 8; #[derive(Debug, Error)] enum UringIOError { @@ -44,8 +45,6 @@ unsafe impl Sync for UringIO {} struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, - pub pending: [Option>; ENTRIES as usize + 1], - key: u64, } struct InnerUringIO { @@ -65,15 +64,13 @@ impl UringIO { Err(_) => io_uring::IoUring::new(ENTRIES)?, }; // we only ever have 2 files open at a time for the moment - ring.submitter().register_files_sparse(8)?; + ring.submitter().register_files_sparse(FILES)?; let inner = InnerUringIO { ring: WrappedIOUring { ring, pending_ops: 0, - pending: [const { None }; ENTRIES as usize + 1], - key: 0, }, - free_files: (0..8).collect(), + free_files: (0..FILES).collect(), }; debug!("Using IO backend 'io-uring'"); Ok(Self { @@ -106,9 +103,8 @@ impl InnerUringIO { } impl WrappedIOUring { - fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Arc) { + fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) { trace!("submit_entry({:?})", entry); - self.pending[entry.get_user_data() as usize] = Some(c); unsafe { self.ring .submission() @@ -137,16 +133,6 @@ impl WrappedIOUring { fn empty(&self) -> bool { self.pending_ops == 0 } - - fn get_key(&mut self) -> u64 { - self.key += 1; - if self.key == ENTRIES as u64 { - let key = self.key; - self.key = 0; - return key; - } - self.key - } } impl IO for UringIO { @@ -208,12 +194,9 @@ impl IO for UringIO { cqe ))); } - { - if let Some(c) = ring.pending[cqe.user_data() as usize].as_ref() { - c.complete(cqe.result()); - } - } - ring.pending[cqe.user_data() as usize] = None; + let ud = cqe.user_data(); + turso_assert!(ud > 0, "therea are no linked timeouts or cancelations, all cqe user_data should be valid arc pointers"); + completion_from_key(ud).complete(result); } Ok(()) } @@ -239,6 +222,19 @@ impl Clock for UringIO { } } +#[inline(always)] +/// use the callback pointer as the user_data for the operation as is +/// common practice for io_uring to prevent more indirection +fn get_key(c: Arc) -> u64 { + Arc::into_raw(c) as u64 +} + +#[inline(always)] +/// convert the user_data back to an Arc pointer +fn completion_from_key(key: u64) -> Arc { + unsafe { Arc::from_raw(key as *const Completion) } +} + pub struct UringFile { io: Rc>, file: std::fs::File, @@ -313,10 +309,10 @@ impl File for UringFile { io_uring::opcode::Read::new(fd, buf, len as u32) .offset(pos as u64) .build() - .user_data(io.ring.get_key()) + .user_data(get_key(c.clone())) }) }; - io.ring.submit_entry(&read_e, c.clone()); + io.ring.submit_entry(&read_e); Ok(c) } @@ -334,18 +330,10 @@ impl File for UringFile { io_uring::opcode::Write::new(fd, buf.as_ptr(), buf.len() as u32) .offset(pos as u64) .build() - .user_data(io.ring.get_key()) + .user_data(get_key(c.clone())) }) }; - let c_uring = c.clone(); - io.ring.submit_entry( - &write, - Arc::new(Completion::new_write(move |result| { - c_uring.complete(result); - // NOTE: Explicitly reference buffer to ensure it lives until here - let _ = buffer.borrow(); - })), - ); + io.ring.submit_entry(&write); Ok(c) } @@ -355,9 +343,9 @@ impl File for UringFile { let sync = with_fd!(self, |fd| { io_uring::opcode::Fsync::new(fd) .build() - .user_data(io.ring.get_key()) + .user_data(get_key(c.clone())) }); - io.ring.submit_entry(&sync, c.clone()); + io.ring.submit_entry(&sync); Ok(c) }