mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-06 17:54:20 +01:00
Merge 'Make io_uring sound for connections on multiple threads' from Preston Thorpe
closes #1232 Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com> Closes #2908
This commit is contained in:
@@ -4,15 +4,14 @@ use super::{common, Completion, CompletionInner, File, OpenFlags, IO};
|
||||
use crate::io::clock::{Clock, Instant};
|
||||
use crate::storage::wal::CKPT_BATCH_PAGES;
|
||||
use crate::{turso_assert, LimboError, Result};
|
||||
use parking_lot::Mutex;
|
||||
use rustix::fs::{self, FlockOperation, OFlags};
|
||||
use std::ptr::NonNull;
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
collections::{HashMap, VecDeque},
|
||||
io::ErrorKind,
|
||||
ops::Deref,
|
||||
os::{fd::AsFd, unix::io::AsRawFd},
|
||||
rc::Rc,
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::{debug, trace};
|
||||
@@ -45,7 +44,7 @@ const MAX_WAIT: usize = 4;
|
||||
const ARENA_COUNT: usize = 2;
|
||||
|
||||
pub struct UringIO {
|
||||
inner: Rc<RefCell<InnerUringIO>>,
|
||||
inner: Arc<Mutex<InnerUringIO>>,
|
||||
}
|
||||
|
||||
unsafe impl Send for UringIO {}
|
||||
@@ -129,7 +128,7 @@ impl UringIO {
|
||||
};
|
||||
debug!("Using IO backend 'io-uring'");
|
||||
Ok(Self {
|
||||
inner: Rc::new(RefCell::new(inner)),
|
||||
inner: Arc::new(Mutex::new(inner)),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -490,7 +489,7 @@ impl IO for UringIO {
|
||||
Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"),
|
||||
}
|
||||
}
|
||||
let id = self.inner.borrow_mut().register_file(file.as_raw_fd()).ok();
|
||||
let id = self.inner.lock().register_file(file.as_raw_fd()).ok();
|
||||
let uring_file = Arc::new(UringFile {
|
||||
io: self.inner.clone(),
|
||||
file,
|
||||
@@ -509,7 +508,7 @@ impl IO for UringIO {
|
||||
|
||||
fn run_once(&self) -> Result<()> {
|
||||
trace!("run_once()");
|
||||
let mut inner = self.inner.borrow_mut();
|
||||
let mut inner = self.inner.lock();
|
||||
let ring = &mut inner.ring;
|
||||
ring.flush_overflow()?;
|
||||
if ring.empty() {
|
||||
@@ -541,7 +540,7 @@ impl IO for UringIO {
|
||||
len % 512 == 0,
|
||||
"fixed buffer length must be logical block aligned"
|
||||
);
|
||||
let mut inner = self.inner.borrow_mut();
|
||||
let mut inner = self.inner.lock();
|
||||
let slot = inner.free_arenas.iter().position(|e| e.is_none()).ok_or(
|
||||
crate::error::CompletionError::UringIOError("no free fixed buffer slots"),
|
||||
)?;
|
||||
@@ -585,7 +584,7 @@ fn completion_from_key(key: u64) -> Completion {
|
||||
}
|
||||
|
||||
pub struct UringFile {
|
||||
io: Rc<RefCell<InnerUringIO>>,
|
||||
io: Arc<Mutex<InnerUringIO>>,
|
||||
file: std::fs::File,
|
||||
id: Option<u32>,
|
||||
}
|
||||
@@ -645,7 +644,6 @@ impl File for UringFile {
|
||||
|
||||
fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
|
||||
let r = c.as_read();
|
||||
let mut io = self.io.borrow_mut();
|
||||
let read_e = {
|
||||
let buf = r.buf();
|
||||
let ptr = buf.as_mut_ptr();
|
||||
@@ -660,7 +658,7 @@ impl File for UringFile {
|
||||
);
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
io.debug_check_fixed(idx, ptr, len);
|
||||
self.io.lock().debug_check_fixed(idx, ptr, len);
|
||||
}
|
||||
io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16)
|
||||
.offset(pos)
|
||||
@@ -676,12 +674,12 @@ impl File for UringFile {
|
||||
}
|
||||
})
|
||||
};
|
||||
io.ring.submit_entry(&read_e);
|
||||
self.io.lock().ring.submit_entry(&read_e);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
let mut io = self.io.borrow_mut();
|
||||
let mut io = self.io.lock();
|
||||
let write = {
|
||||
let ptr = buffer.as_ptr();
|
||||
let len = buffer.len();
|
||||
@@ -715,7 +713,7 @@ impl File for UringFile {
|
||||
}
|
||||
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
let mut io = self.io.borrow_mut();
|
||||
let mut io = self.io.lock();
|
||||
trace!("sync()");
|
||||
let sync = with_fd!(self, |fd| {
|
||||
io_uring::opcode::Fsync::new(fd)
|
||||
@@ -737,10 +735,9 @@ impl File for UringFile {
|
||||
return self.pwrite(pos, bufs[0].clone(), c.clone());
|
||||
}
|
||||
tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len());
|
||||
let mut io = self.io.borrow_mut();
|
||||
// create state to track ongoing writev operation
|
||||
let state = WritevState::new(self, pos, bufs);
|
||||
io.ring.submit_writev(get_key(c.clone()), state);
|
||||
self.io.lock().ring.submit_writev(get_key(c.clone()), state);
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
@@ -749,13 +746,12 @@ impl File for UringFile {
|
||||
}
|
||||
|
||||
fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
|
||||
let mut io = self.io.borrow_mut();
|
||||
let truncate = with_fd!(self, |fd| {
|
||||
io_uring::opcode::Ftruncate::new(fd, len)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
});
|
||||
io.ring.submit_entry(&truncate);
|
||||
self.io.lock().ring.submit_entry(&truncate);
|
||||
Ok(c)
|
||||
}
|
||||
}
|
||||
@@ -765,7 +761,7 @@ impl Drop for UringFile {
|
||||
self.unlock_file().expect("Failed to unlock file");
|
||||
if let Some(id) = self.id {
|
||||
self.io
|
||||
.borrow_mut()
|
||||
.lock()
|
||||
.unregister_file(id)
|
||||
.inspect_err(|e| {
|
||||
debug!("Failed to unregister file: {e}");
|
||||
|
||||
Reference in New Issue
Block a user