From 30454336a6663245aaa101ce9f010408dc0b49d1 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Wed, 3 Sep 2025 10:54:42 -0400 Subject: [PATCH 1/2] Make io_uring sound for connections across multiple threads --- core/io/io_uring.rs | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index aca0ed42f..5de73849f 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -1,18 +1,17 @@ #![allow(clippy::arc_with_non_send_sync)] use super::{common, Completion, CompletionInner, File, OpenFlags, IO}; +use crate::fast_lock::SpinLock; use crate::io::clock::{Clock, Instant}; use crate::storage::wal::CKPT_BATCH_PAGES; use crate::{turso_assert, LimboError, Result}; use rustix::fs::{self, FlockOperation, OFlags}; use std::ptr::NonNull; use std::{ - cell::RefCell, collections::{HashMap, VecDeque}, io::ErrorKind, ops::Deref, os::{fd::AsFd, unix::io::AsRawFd}, - rc::Rc, sync::Arc, }; use tracing::{debug, trace}; @@ -45,7 +44,7 @@ const MAX_WAIT: usize = 4; const ARENA_COUNT: usize = 2; pub struct UringIO { - inner: Rc>, + inner: Arc>, } unsafe impl Send for UringIO {} @@ -129,7 +128,7 @@ impl UringIO { }; debug!("Using IO backend 'io-uring'"); Ok(Self { - inner: Rc::new(RefCell::new(inner)), + inner: Arc::new(SpinLock::new(inner)), }) } } @@ -490,7 +489,7 @@ impl IO for UringIO { Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"), } } - let id = self.inner.borrow_mut().register_file(file.as_raw_fd()).ok(); + let id = self.inner.lock().register_file(file.as_raw_fd()).ok(); let uring_file = Arc::new(UringFile { io: self.inner.clone(), file, @@ -509,7 +508,7 @@ impl IO for UringIO { fn run_once(&self) -> Result<()> { trace!("run_once()"); - let mut inner = self.inner.borrow_mut(); + let mut inner = self.inner.lock(); let ring = &mut inner.ring; ring.flush_overflow()?; if ring.empty() { @@ -541,7 +540,7 @@ impl IO for UringIO { len % 512 == 0, "fixed buffer length must be logical block aligned" ); - let mut inner = self.inner.borrow_mut(); + let mut inner = self.inner.lock(); let slot = inner.free_arenas.iter().position(|e| e.is_none()).ok_or( crate::error::CompletionError::UringIOError("no free fixed buffer slots"), )?; @@ -585,7 +584,7 @@ fn completion_from_key(key: u64) -> Completion { } pub struct UringFile { - io: Rc>, + io: Arc>, file: std::fs::File, id: Option, } @@ -645,7 +644,6 @@ impl File for UringFile { fn pread(&self, pos: u64, c: Completion) -> Result { let r = c.as_read(); - let mut io = self.io.borrow_mut(); let read_e = { let buf = r.buf(); let ptr = buf.as_mut_ptr(); @@ -660,7 +658,7 @@ impl File for UringFile { ); #[cfg(debug_assertions)] { - io.debug_check_fixed(idx, ptr, len); + self.io.lock().debug_check_fixed(idx, ptr, len); } io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16) .offset(pos) @@ -676,12 +674,12 @@ impl File for UringFile { } }) }; - io.ring.submit_entry(&read_e); + self.io.lock().ring.submit_entry(&read_e); Ok(c) } fn pwrite(&self, pos: u64, buffer: Arc, c: Completion) -> Result { - let mut io = self.io.borrow_mut(); + let mut io = self.io.lock(); let write = { let ptr = buffer.as_ptr(); let len = buffer.len(); @@ -715,7 +713,7 @@ impl File for UringFile { } fn sync(&self, c: Completion) -> Result { - let mut io = self.io.borrow_mut(); + let mut io = self.io.lock(); trace!("sync()"); let sync = with_fd!(self, |fd| { io_uring::opcode::Fsync::new(fd) @@ -737,10 +735,9 @@ impl File for UringFile { return self.pwrite(pos, bufs[0].clone(), c.clone()); } tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len()); - let mut io = self.io.borrow_mut(); // create state to track ongoing writev operation let state = WritevState::new(self, pos, bufs); - io.ring.submit_writev(get_key(c.clone()), state); + self.io.lock().ring.submit_writev(get_key(c.clone()), state); Ok(c) } @@ -749,13 +746,12 @@ impl File for UringFile { } fn truncate(&self, len: u64, c: Completion) -> Result { - let mut io = self.io.borrow_mut(); let truncate = with_fd!(self, |fd| { io_uring::opcode::Ftruncate::new(fd, len) .build() .user_data(get_key(c.clone())) }); - io.ring.submit_entry(&truncate); + self.io.lock().ring.submit_entry(&truncate); Ok(c) } } @@ -765,7 +761,7 @@ impl Drop for UringFile { self.unlock_file().expect("Failed to unlock file"); if let Some(id) = self.id { self.io - .borrow_mut() + .lock() .unregister_file(id) .inspect_err(|e| { debug!("Failed to unregister file: {e}"); From c5b6df4249b13878403633fbbb027081560a3e65 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Wed, 3 Sep 2025 11:12:33 -0400 Subject: [PATCH 2/2] Use mutex in place of spinlock for io_uring --- core/io/io_uring.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 5de73849f..393169028 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -1,10 +1,10 @@ #![allow(clippy::arc_with_non_send_sync)] use super::{common, Completion, CompletionInner, File, OpenFlags, IO}; -use crate::fast_lock::SpinLock; use crate::io::clock::{Clock, Instant}; use crate::storage::wal::CKPT_BATCH_PAGES; use crate::{turso_assert, LimboError, Result}; +use parking_lot::Mutex; use rustix::fs::{self, FlockOperation, OFlags}; use std::ptr::NonNull; use std::{ @@ -44,7 +44,7 @@ const MAX_WAIT: usize = 4; const ARENA_COUNT: usize = 2; pub struct UringIO { - inner: Arc>, + inner: Arc>, } unsafe impl Send for UringIO {} @@ -128,7 +128,7 @@ impl UringIO { }; debug!("Using IO backend 'io-uring'"); Ok(Self { - inner: Arc::new(SpinLock::new(inner)), + inner: Arc::new(Mutex::new(inner)), }) } } @@ -584,7 +584,7 @@ fn completion_from_key(key: u64) -> Completion { } pub struct UringFile { - io: Arc>, + io: Arc>, file: std::fs::File, id: Option, }