mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
Add registered file descriptors to io_uring IO module
This commit is contained in:
@@ -5,8 +5,8 @@ use crate::io::clock::{Clock, Instant};
|
||||
use crate::io::CompletionType;
|
||||
use crate::{LimboError, MemoryIO, Result};
|
||||
use rustix::fs::{self, FlockOperation, OFlags};
|
||||
use rustix::io_uring::iovec;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt;
|
||||
use std::io::ErrorKind;
|
||||
use std::os::fd::AsFd;
|
||||
@@ -16,7 +16,7 @@ use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
const MAX_IOVECS: u32 = 128;
|
||||
const ENTRIES: u32 = 128;
|
||||
const SQPOLL_IDLE: u32 = 1000;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -45,37 +45,34 @@ unsafe impl Sync for UringIO {}
|
||||
struct WrappedIOUring {
|
||||
ring: io_uring::IoUring,
|
||||
pending_ops: usize,
|
||||
pub pending: [Option<Arc<Completion>>; MAX_IOVECS as usize + 1],
|
||||
pub pending: [Option<Arc<Completion>>; ENTRIES as usize + 1],
|
||||
key: u64,
|
||||
}
|
||||
|
||||
struct InnerUringIO {
|
||||
ring: WrappedIOUring,
|
||||
iovecs: [iovec; MAX_IOVECS as usize],
|
||||
next_iovec: usize,
|
||||
free_files: VecDeque<u32>,
|
||||
}
|
||||
|
||||
impl UringIO {
|
||||
pub fn new() -> Result<Self> {
|
||||
let ring = match io_uring::IoUring::builder()
|
||||
.setup_sqpoll(SQPOLL_IDLE)
|
||||
.build(MAX_IOVECS)
|
||||
.build(ENTRIES)
|
||||
{
|
||||
Ok(ring) => ring,
|
||||
Err(_) => io_uring::IoUring::new(MAX_IOVECS)?,
|
||||
Err(_) => io_uring::IoUring::new(ENTRIES)?,
|
||||
};
|
||||
// we only ever have 2 files open at a time for the moment
|
||||
ring.submitter().register_files_sparse(8)?;
|
||||
let inner = InnerUringIO {
|
||||
ring: WrappedIOUring {
|
||||
ring,
|
||||
pending_ops: 0,
|
||||
pending: [const { None }; MAX_IOVECS as usize + 1],
|
||||
pending: [const { None }; ENTRIES as usize + 1],
|
||||
key: 0,
|
||||
},
|
||||
iovecs: [iovec {
|
||||
iov_base: std::ptr::null_mut(),
|
||||
iov_len: 0,
|
||||
}; MAX_IOVECS as usize],
|
||||
next_iovec: 0,
|
||||
free_files: (0..8).collect(),
|
||||
};
|
||||
debug!("Using IO backend 'io-uring'");
|
||||
Ok(Self {
|
||||
@@ -85,12 +82,30 @@ impl UringIO {
|
||||
}
|
||||
|
||||
impl InnerUringIO {
|
||||
pub fn get_iovec(&mut self, buf: *const u8, len: usize) -> &iovec {
|
||||
let iovec = &mut self.iovecs[self.next_iovec];
|
||||
iovec.iov_base = buf as *mut std::ffi::c_void;
|
||||
iovec.iov_len = len;
|
||||
self.next_iovec = (self.next_iovec + 1) % MAX_IOVECS as usize;
|
||||
iovec
|
||||
fn register_file(&mut self, fd: i32) -> Result<u32> {
|
||||
if let Some(slot) = self.free_files.pop_front() {
|
||||
self.ring
|
||||
.ring
|
||||
.submitter()
|
||||
.register_files_update(slot, &[fd.as_raw_fd()])?;
|
||||
return Ok(slot);
|
||||
}
|
||||
Err(LimboError::UringIOError(
|
||||
"unable to register file, no free slots available".to_string(),
|
||||
))
|
||||
}
|
||||
fn unregister_file(&mut self, id: u32) -> Result<()> {
|
||||
if self.free_files.len() >= ENTRIES as usize {
|
||||
return Err(LimboError::UringIOError(
|
||||
"unable to unregister file, too many free slots".to_string(),
|
||||
));
|
||||
}
|
||||
self.ring
|
||||
.ring
|
||||
.submitter()
|
||||
.register_files_update(id, &[-1])?;
|
||||
self.free_files.push_back(id);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,7 +144,7 @@ impl WrappedIOUring {
|
||||
|
||||
fn get_key(&mut self) -> u64 {
|
||||
self.key += 1;
|
||||
if self.key == MAX_IOVECS as u64 {
|
||||
if self.key == ENTRIES as u64 {
|
||||
let key = self.key;
|
||||
self.key = 0;
|
||||
return key;
|
||||
@@ -159,9 +174,11 @@ impl IO for UringIO {
|
||||
Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"),
|
||||
}
|
||||
}
|
||||
let id = self.inner.borrow_mut().register_file(file.as_raw_fd())?;
|
||||
let uring_file = Arc::new(UringFile {
|
||||
io: self.inner.clone(),
|
||||
file,
|
||||
id,
|
||||
});
|
||||
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
|
||||
uring_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?;
|
||||
@@ -229,6 +246,7 @@ impl Clock for UringIO {
|
||||
pub struct UringFile {
|
||||
io: Rc<RefCell<InnerUringIO>>,
|
||||
file: std::fs::File,
|
||||
id: u32,
|
||||
}
|
||||
|
||||
unsafe impl Send for UringFile {}
|
||||
@@ -275,14 +293,12 @@ impl File for UringFile {
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Arc<Completion>> {
|
||||
let r = c.as_read();
|
||||
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
|
||||
let fd = io_uring::types::Fd(self.file.as_raw_fd());
|
||||
let mut io = self.io.borrow_mut();
|
||||
let read_e = {
|
||||
let mut buf = r.buf_mut();
|
||||
let len = buf.len();
|
||||
let buf = buf.as_mut_ptr();
|
||||
let iovec = io.get_iovec(buf, len);
|
||||
io_uring::opcode::Readv::new(fd, iovec as *const iovec as *const libc::iovec, 1)
|
||||
io_uring::opcode::Read::new(io_uring::types::Fixed(self.id), buf, len as u32)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(io.ring.get_key())
|
||||
@@ -299,15 +315,17 @@ impl File for UringFile {
|
||||
c: Completion,
|
||||
) -> Result<Arc<Completion>> {
|
||||
let mut io = self.io.borrow_mut();
|
||||
let fd = io_uring::types::Fd(self.file.as_raw_fd());
|
||||
let write = {
|
||||
let buf = buffer.borrow();
|
||||
trace!("pwrite(pos = {}, length = {})", pos, buf.len());
|
||||
let iovec = io.get_iovec(buf.as_ptr(), buf.len());
|
||||
io_uring::opcode::Writev::new(fd, iovec as *const iovec as *const libc::iovec, 1)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(io.ring.get_key())
|
||||
io_uring::opcode::Write::new(
|
||||
io_uring::types::Fixed(self.id),
|
||||
buf.as_ptr(),
|
||||
buf.len() as u32,
|
||||
)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(io.ring.get_key())
|
||||
};
|
||||
let c = Arc::new(c);
|
||||
let c_uring = c.clone();
|
||||
@@ -325,10 +343,9 @@ impl File for UringFile {
|
||||
}
|
||||
|
||||
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
|
||||
let fd = io_uring::types::Fd(self.file.as_raw_fd());
|
||||
let mut io = self.io.borrow_mut();
|
||||
trace!("sync()");
|
||||
let sync = io_uring::opcode::Fsync::new(fd)
|
||||
let sync = io_uring::opcode::Fsync::new(io_uring::types::Fixed(self.id))
|
||||
.build()
|
||||
.user_data(io.ring.get_key());
|
||||
let c = Arc::new(c);
|
||||
@@ -344,6 +361,13 @@ impl File for UringFile {
|
||||
impl Drop for UringFile {
|
||||
fn drop(&mut self) {
|
||||
self.unlock_file().expect("Failed to unlock file");
|
||||
self.io
|
||||
.borrow_mut()
|
||||
.unregister_file(self.id)
|
||||
.inspect_err(|e| {
|
||||
debug!("Failed to unregister file: {e}");
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user