#![allow(clippy::arc_with_non_send_sync)]

use super::{common, Completion, CompletionInner, File, OpenFlags, IO};
use crate::io::clock::{Clock, Instant};
use crate::storage::wal::CKPT_BATCH_PAGES;
use crate::{turso_assert, CompletionError, LimboError, Result};
use parking_lot::Mutex;
use rustix::fs::{self, FlockOperation, OFlags};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
    collections::{HashMap, VecDeque},
    io::ErrorKind,
    ops::Deref,
    os::{fd::AsFd, unix::io::AsRawFd},
    sync::Arc,
};
use tracing::{debug, trace};

/// Size of the io_uring submission and completion queues.
const ENTRIES: u32 = 512;

/// Idle timeout for the sqpoll kernel thread before it needs to be woken
/// back up by a call to `io_uring_enter` with the IORING_ENTER_SQ_WAKEUP flag
/// (handled by the io_uring crate in `submit_and_wait`).
const SQPOLL_IDLE: u32 = 1000;

/// Number of file descriptors we preallocate for io_uring.
/// NOTE: we may need to increase this when `attach` is fully implemented.
const FILES: u32 = 8;

/// Number of iovec arrays (`Box<[libc::iovec; MAX_IOVEC_ENTRIES]>`) we preallocate on initialization.
const IOVEC_POOL_SIZE: usize = 64;

/// Maximum number of iovec entries per writev operation.
/// IOV_MAX is typically 1024.
const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES;

/// Maximum number of I/O operations to wait for in a single run.
/// Waiting for > 1 can reduce the number of `io_uring_enter` syscalls we
/// make, but can increase single-operation latency.
const MAX_WAIT: usize = 4;

/// One memory arena for DB pages and another for WAL frames.
const ARENA_COUNT: usize = 2;

/// Arbitrary non-zero user_data for the barrier (fsync) operation submitted after
/// handling a partial writev of a commit frame.
const BARRIER_USER_DATA: u64 = 1;

/// user_data tag for cancellation operations.
const CANCEL_TAG: u64 = 1;
pub struct UringIO {
    inner: Arc<Mutex<InnerUringIO>>,
}

unsafe impl Send for UringIO {}
unsafe impl Sync for UringIO {}

struct WrappedIOUring {
    ring: io_uring::IoUring,
    pending_ops: usize,
    writev_states: HashMap<u64, WritevState>,
    overflow: VecDeque<io_uring::squeue::Entry>,
    iov_pool: IovecPool,
    pending_link: AtomicBool,
}

struct InnerUringIO {
    ring: WrappedIOUring,
    free_files: VecDeque<u32>,
    free_arenas: [Option<(NonNull<u8>, usize)>; ARENA_COUNT],
}

/// preallocated vec of iovec arrays to avoid allocations during writev operations
struct IovecPool {
    pool: Vec<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
}

impl IovecPool {
    fn new() -> Self {
        let pool = (0..IOVEC_POOL_SIZE)
            .map(|_| {
                Box::new(
                    [libc::iovec {
                        iov_base: std::ptr::null_mut(),
                        iov_len: 0,
                    }; MAX_IOVEC_ENTRIES],
                )
            })
            .collect();
        Self { pool }
    }

    #[inline(always)]
    fn acquire(&mut self) -> Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>> {
        self.pool.pop()
    }

    #[inline(always)]
    fn release(&mut self, iovec: Box<[libc::iovec; MAX_IOVEC_ENTRIES]>) {
        if self.pool.len() < IOVEC_POOL_SIZE {
            self.pool.push(iovec);
        }
    }
}
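
// The pool hands out whole MAX_IOVEC_ENTRIES-sized arrays: `acquire` pops one and
// returns `None` once the pool is empty (in which case `submit_writev` falls back to
// a fresh heap allocation), and `release` only puts an array back while the pool is
// below `IOVEC_POOL_SIZE`, so the pool never grows beyond its initial capacity.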

impl UringIO {
    pub fn new() -> Result<Self> {
        let ring = match io_uring::IoUring::builder()
            .setup_single_issuer()
            .setup_coop_taskrun()
            .setup_sqpoll(SQPOLL_IDLE)
            .build(ENTRIES)
        {
            Ok(ring) => ring,
            Err(_) => io_uring::IoUring::new(ENTRIES)?,
        };
        // we only ever have 2 files open at a time for the moment
        ring.submitter().register_files_sparse(FILES)?;
        // The RLIMIT_MEMLOCK cap is typically 8MB. The current design is to have one large
        // arena registered at startup, so we can simply use index zero, falling back to
        // logic similar to the existing buffer pool when we are over capacity.
        ring.submitter()
            .register_buffers_sparse(ARENA_COUNT as u32)?;
        let inner = InnerUringIO {
            ring: WrappedIOUring {
                ring,
                overflow: VecDeque::new(),
                pending_ops: 0,
                writev_states: HashMap::new(),
                iov_pool: IovecPool::new(),
                pending_link: AtomicBool::new(false),
            },
            free_files: (0..FILES).collect(),
            free_arenas: [const { None }; ARENA_COUNT],
        };
        debug!("Using IO backend 'io-uring'");
        Ok(Self {
            inner: Arc::new(Mutex::new(inner)),
        })
    }
}

/// The io_uring crate does not export its `UseFixed` trait, so we use a macro
/// here to handle either fixed or raw file descriptors.
macro_rules! with_fd {
    ($file:expr, |$fd:ident| $body:expr) => {
        match $file.id() {
            Some(id) => {
                let $fd = io_uring::types::Fixed(id);
                $body
            }
            None => {
                let $fd = io_uring::types::Fd($file.as_raw_fd());
                $body
            }
        }
    };
}
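
// Usage sketch (mirroring the call sites below): the macro accepts anything exposing
// `id()` / `as_raw_fd()` (a `UringFile` or the `Fd` wrapper) and binds `fd` to either
// `io_uring::types::Fixed(id)` or `io_uring::types::Fd(raw)` before evaluating the body:
//
//     let sync = with_fd!(self, |fd| {
//         io_uring::opcode::Fsync::new(fd)
//             .build()
//             .user_data(get_key(c.clone()))
//     });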

/// wrapper type to represent a possibly registered file descriptor,
/// only used in WritevState, and piggy-backs on the available methods from
/// `UringFile`, so we don't have to store the file on `WritevState`.
#[derive(Clone)]
enum Fd {
    Fixed(u32),
    RawFd(i32),
}

impl Fd {
    /// to match the behavior of the File, we need to implement the same methods
    fn id(&self) -> Option<u32> {
        match self {
            Fd::Fixed(id) => Some(*id),
            Fd::RawFd(_) => None,
        }
    }
    /// ONLY to be called by the macro, in the case where id() is None
    fn as_raw_fd(&self) -> i32 {
        match self {
            Fd::RawFd(fd) => *fd,
            _ => panic!("Cannot call as_raw_fd on a Fixed Fd"),
        }
    }
}

/// State to track an ongoing writev operation in
/// the case of a partial write.
struct WritevState {
    /// File descriptor/id of the file we are writing to
    file_id: Fd,
    /// absolute file offset for next submit
    file_pos: u64,
    /// current buffer index in `bufs`
    current_buffer_idx: usize,
    /// intra-buffer offset
    current_buffer_offset: usize,
    /// total bytes written so far
    total_written: usize,
    /// cache the sum of all buffer lengths for the total expected write
    total_len: usize,
    /// buffers to write
    bufs: Vec<Arc<crate::Buffer>>,
    /// we keep the last iovec allocation alive until final CQE
    last_iov_allocation: Option<Box<[libc::iovec; MAX_IOVEC_ENTRIES]>>,
    had_partial: bool,
    linked_op: bool,
}

impl WritevState {
    fn new(file: &UringFile, pos: u64, linked: bool, bufs: Vec<Arc<crate::Buffer>>) -> Self {
        let file_id = file
            .id()
            .map(Fd::Fixed)
            .unwrap_or_else(|| Fd::RawFd(file.as_raw_fd()));
        let total_len = bufs.iter().map(|b| b.len()).sum();
        Self {
            file_id,
            file_pos: pos,
            current_buffer_idx: 0,
            current_buffer_offset: 0,
            total_written: 0,
            bufs,
            last_iov_allocation: None,
            total_len,
            had_partial: false,
            linked_op: linked,
        }
    }

    #[inline(always)]
    fn remaining(&self) -> usize {
        self.total_len - self.total_written
    }

    /// Advance (idx, off, pos) after written bytes
    #[inline(always)]
    fn advance(&mut self, written: u64) {
        let mut remaining = written;
        while remaining > 0 {
            let current_buf_len = self.bufs[self.current_buffer_idx].len();
            let left = current_buf_len - self.current_buffer_offset;
            if remaining < left as u64 {
                self.current_buffer_offset += remaining as usize;
                self.file_pos += remaining;
                remaining = 0;
            } else {
                remaining -= left as u64;
                self.file_pos += left as u64;
                self.current_buffer_idx += 1;
                self.current_buffer_offset = 0;
            }
        }
        self.total_written += written as usize;
    }
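
    // Worked example: with two 4096-byte buffers and a partial write of 6000 bytes,
    // `advance(6000)` consumes all of buffer 0 and 1904 bytes of buffer 1, leaving
    // current_buffer_idx = 1, current_buffer_offset = 1904, file_pos advanced by 6000
    // and remaining() = 2192, so the resubmitted writev resumes mid-buffer at the
    // correct file offset.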

    #[inline(always)]
    /// Free the allocation that keeps the iovec array alive while writev is ongoing
    fn free_last_iov(&mut self, pool: &mut IovecPool) {
        if let Some(allocation) = self.last_iov_allocation.take() {
            pool.release(allocation);
        }
    }
}

impl InnerUringIO {
    fn register_file(&mut self, fd: i32) -> Result<u32> {
        if let Some(slot) = self.free_files.pop_front() {
            self.ring
                .ring
                .submitter()
                .register_files_update(slot, &[fd.as_raw_fd()])?;
            return Ok(slot);
        }
        Err(crate::error::CompletionError::UringIOError(
            "unable to register file, no free slots available",
        )
        .into())
    }
    fn unregister_file(&mut self, id: u32) -> Result<()> {
        self.ring
            .ring
            .submitter()
            .register_files_update(id, &[-1])?;
        self.free_files.push_back(id);
        Ok(())
    }

    #[cfg(debug_assertions)]
    fn debug_check_fixed(&self, idx: u32, ptr: *const u8, len: usize) {
        let (base, blen) = self.free_arenas[idx as usize].expect("slot not registered");
        let start = base.as_ptr() as usize;
        let end = start + blen;
        let p = ptr as usize;
        turso_assert!(
            p >= start && p + len <= end,
            "Fixed operation, pointer out of registered range"
        );
    }
}

impl WrappedIOUring {
    fn submit_entry(&mut self, entry: &io_uring::squeue::Entry) {
        trace!("submit_entry({:?})", entry);
        // we cannot push current entries before any overflow
        if self.flush_overflow().is_ok() {
            let pushed = unsafe {
                let mut sub = self.ring.submission();
                sub.push(entry).is_ok()
            };
            if pushed {
                self.pending_ops += 1;
                return;
            }
        }
        // if we were unable to push, add to overflow
        self.overflow.push_back(entry.clone());
        self.ring.submit().expect("submitting when full");
    }

    fn submit_cancel_urgent(&mut self, entry: &io_uring::squeue::Entry) -> Result<()> {
        let pushed = unsafe { self.ring.submission().push(entry).is_ok() };
        if pushed {
            self.pending_ops += 1;
            return Ok(());
        }
        // if we overflowed, place the cancel op at the front
        self.overflow.push_front(entry.clone());
        self.ring.submit()?;
        Ok(())
    }

    /// Flush overflow entries to the submission queue when possible
    fn flush_overflow(&mut self) -> Result<()> {
        while !self.overflow.is_empty() {
            let sub_len = self.ring.submission().len();
            // safe subtraction, as the submission queue length is always < ENTRIES
            let available_space = ENTRIES as usize - sub_len;
            if available_space == 0 {
                // No space available. Always return an error if we don't flush all overflow
                // entries, to prevent out-of-order I/O operations.
                return Err(crate::error::CompletionError::UringIOError("squeue full").into());
            }
            // Push as many as we can
            let to_push = std::cmp::min(available_space, self.overflow.len());
            unsafe {
                let mut sq = self.ring.submission();
                for _ in 0..to_push {
                    let entry = self.overflow.pop_front().unwrap();
                    if sq.push(&entry).is_err() {
                        // Unexpected failure, put it back
                        self.overflow.push_front(entry);
                        // No space available. Always return an error if we don't flush all
                        // overflow entries, to prevent out-of-order I/O operations.
                        return Err(
                            crate::error::CompletionError::UringIOError("squeue full").into()
                        );
                    }
                    self.pending_ops += 1;
                }
            }
        }
        Ok(())
    }

    fn submit_and_wait(&mut self) -> Result<()> {
        if self.empty() {
            return Ok(());
        }
        let wants = std::cmp::min(self.pending_ops, MAX_WAIT);
        tracing::trace!("submit_and_wait for {wants} pending operations to complete");
        self.ring.submit_and_wait(wants)?;
        Ok(())
    }

    fn empty(&self) -> bool {
        self.pending_ops == 0
    }
|
/// Submit or resubmit a writev operation
|
|
fn submit_writev(&mut self, key: u64, mut st: WritevState, continue_chain: bool) {
|
|
st.free_last_iov(&mut self.iov_pool);
|
|
let mut iov_allocation = self.iov_pool.acquire().unwrap_or_else(|| {
|
|
// Fallback: allocate a new one if pool is exhausted
|
|
Box::new(
|
|
[libc::iovec {
|
|
iov_base: std::ptr::null_mut(),
|
|
iov_len: 0,
|
|
}; MAX_IOVEC_ENTRIES],
|
|
)
|
|
});
|
|
let mut iov_count = 0;
|
|
let mut last_end: Option<(*const u8, usize)> = None;
|
|
for buffer in st.bufs.iter().skip(st.current_buffer_idx) {
|
|
let ptr = buffer.as_ptr();
|
|
let len = buffer.len();
|
|
if let Some((last_ptr, last_len)) = last_end {
|
|
// Check if this buffer is adjacent to the last
|
|
if unsafe { last_ptr.add(last_len) } == ptr {
|
|
// Extend the last iovec instead of adding new
|
|
iov_allocation[iov_count - 1].iov_len += len;
|
|
last_end = Some((last_ptr, last_len + len));
|
|
continue;
|
|
}
|
|
}
|
|
// Add new iovec
|
|
iov_allocation[iov_count] = libc::iovec {
|
|
iov_base: ptr as *mut _,
|
|
iov_len: len,
|
|
};
|
|
last_end = Some((ptr, len));
|
|
iov_count += 1;
|
|
if iov_count >= MAX_IOVEC_ENTRIES {
|
|
break;
|
|
}
|
|
}
|
|
// If we have coalesced everything into a single iovec, submit as a single`pwrite`
|
|
if iov_count == 1 {
|
|
let mut entry = with_fd!(st.file_id, |fd| {
|
|
if let Some(id) = st.bufs[st.current_buffer_idx].fixed_id() {
|
|
io_uring::opcode::WriteFixed::new(
|
|
fd,
|
|
iov_allocation[0].iov_base as *const u8,
|
|
iov_allocation[0].iov_len as u32,
|
|
id as u16,
|
|
)
|
|
.offset(st.file_pos)
|
|
.build()
|
|
.user_data(key)
|
|
} else {
|
|
io_uring::opcode::Write::new(
|
|
fd,
|
|
iov_allocation[0].iov_base as *const u8,
|
|
iov_allocation[0].iov_len as u32,
|
|
)
|
|
.offset(st.file_pos)
|
|
.build()
|
|
.user_data(key)
|
|
}
|
|
});
|
|
|
|
if st.linked_op && !st.had_partial {
|
|
// Starting a new link chain
|
|
entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
|
|
self.pending_link.store(true, Ordering::Release);
|
|
} else if continue_chain && !st.had_partial {
|
|
// Continue existing chain
|
|
entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
|
|
}
|
|
|
|
self.submit_entry(&entry);
|
|
return;
|
|
}
|
|
|
|
// Store the pointers and get the pointer to the iovec array that we pass
|
|
// to the writev operation, and keep the array itself alive
|
|
let ptr = iov_allocation.as_ptr() as *mut libc::iovec;
|
|
st.last_iov_allocation = Some(iov_allocation);
|
|
|
|
let mut entry = with_fd!(st.file_id, |fd| {
|
|
io_uring::opcode::Writev::new(fd, ptr, iov_count as u32)
|
|
.offset(st.file_pos)
|
|
.build()
|
|
.user_data(key)
|
|
});
|
|
if st.linked_op {
|
|
entry = entry.flags(io_uring::squeue::Flags::IO_LINK);
|
|
}
|
|
// track the current state in case we get a partial write
|
|
self.writev_states.insert(key, st);
|
|
self.submit_entry(&entry);
|
|
}

    fn handle_writev_completion(&mut self, mut state: WritevState, user_data: u64, result: i32) {
        if result < 0 {
            // negate the result: from_raw_os_error expects a positive errno
            let err = std::io::Error::from_raw_os_error(-result);
            tracing::error!("writev failed (user_data: {}): {}", user_data, err);
            state.free_last_iov(&mut self.iov_pool);
            completion_from_key(user_data).complete(result);
            return;
        }

        let written = result;

        // guard against a no-progress loop
        if written == 0 && state.remaining() > 0 {
            state.free_last_iov(&mut self.iov_pool);
            completion_from_key(user_data).error(CompletionError::ShortWrite);
            return;
        }
        state.advance(written as u64);

        match state.remaining() {
            0 => {
                tracing::debug!(
                    "writev operation completed: wrote {} bytes",
                    state.total_written
                );
                // write complete, return the iovec to the pool
                state.free_last_iov(&mut self.iov_pool);
                if state.linked_op && state.had_partial {
                    // if it was a linked operation, we need to submit an fsync after this
                    // writev to ensure the data is on disk
                    self.ring.submit().expect("submit after writev");
                    let file_id = state.file_id;
                    let sync = with_fd!(file_id, |fd| {
                        io_uring::opcode::Fsync::new(fd)
                            .build()
                            .user_data(BARRIER_USER_DATA)
                    })
                    .flags(io_uring::squeue::Flags::IO_DRAIN);
                    self.submit_entry(&sync);
                }
                completion_from_key(user_data).complete(state.total_written as i32);
            }
            remaining => {
                tracing::trace!(
                    "resubmitting writev operation for user_data {}: wrote {} bytes, remaining {}",
                    user_data,
                    written,
                    remaining
                );
                // make sure the partial write is recorded, because an fsync could happen
                // after this and we are not finished writing to disk
                state.had_partial = true;
                self.submit_writev(user_data, state, false);
            }
        }
    }
}

impl IO for UringIO {
    fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
        trace!("open_file(path = {})", path);
        let mut file = std::fs::File::options();
        file.read(true);

        if !flags.contains(OpenFlags::ReadOnly) {
            file.write(true);
            file.create(flags.contains(OpenFlags::Create));
        }

        let file = file.open(path)?;
        // Let's attempt to enable direct I/O. Not all filesystems support it,
        // so ignore any errors.
        let fd = file.as_fd();
        if direct {
            match fs::fcntl_setfl(fd, OFlags::DIRECT) {
                Ok(_) => {}
                Err(error) => debug!("Error {error:?} returned when setting the O_DIRECT flag on the file. The performance of the system may be affected"),
            }
        }
        let id = self.inner.lock().register_file(file.as_raw_fd()).ok();
        let uring_file = Arc::new(UringFile {
            io: self.inner.clone(),
            file,
            id,
        });
        if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
            uring_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?;
        }
        Ok(uring_file)
    }

    fn remove_file(&self, path: &str) -> Result<()> {
        std::fs::remove_file(path)?;
        Ok(())
    }

    /// Drain repeats the same completion-processing loop as `step` until the ring is empty.
    /// To prevent mutex churn from re-checking `ring.empty()` on each iteration, we violate DRY.
    fn drain(&self) -> Result<()> {
        trace!("drain()");
        let mut inner = self.inner.lock();
        let ring = &mut inner.ring;
        loop {
            ring.flush_overflow()?;
            if ring.empty() {
                return Ok(());
            }
            ring.submit_and_wait()?;
            'inner: loop {
                let Some(cqe) = ring.ring.completion().next() else {
                    break 'inner;
                };
                ring.pending_ops -= 1;
                let user_data = cqe.user_data();
                if user_data == CANCEL_TAG {
                    // ignore if this is a cancellation CQE
                    continue 'inner;
                }
                let result = cqe.result();
                turso_assert!(
                    user_data != 0,
                    "user_data must not be zero, we don't submit linked timeouts that would cause this"
                );
                if let Some(state) = ring.writev_states.remove(&user_data) {
                    // if we have ongoing writev state, handle it separately and don't call the completion
                    ring.handle_writev_completion(state, user_data, result);
                    continue 'inner;
                }
                if result < 0 {
                    let errno = -result;
                    let err = std::io::Error::from_raw_os_error(errno);
                    completion_from_key(user_data).error(err.into());
                } else {
                    completion_from_key(user_data).complete(result)
                }
            }
        }
    }

    fn cancel(&self, completions: &[Completion]) -> Result<()> {
        let mut inner = self.inner.lock();
        for c in completions {
            c.abort();
            let e = io_uring::opcode::AsyncCancel::new(get_key(c.clone()))
                .build()
                .user_data(CANCEL_TAG);
            inner.ring.submit_cancel_urgent(&e)?;
        }
        Ok(())
    }

    fn step(&self) -> Result<()> {
        trace!("step()");
        let mut inner = self.inner.lock();
        let ring = &mut inner.ring;
        ring.flush_overflow()?;
        if ring.empty() {
            return Ok(());
        }
        ring.submit_and_wait()?;
        loop {
            let Some(cqe) = ring.ring.completion().next() else {
                return Ok(());
            };
            ring.pending_ops -= 1;
            let user_data = cqe.user_data();
            if user_data == CANCEL_TAG {
                // ignore if this is a cancellation CQE
                continue;
            }
            let result = cqe.result();
            turso_assert!(
                user_data != 0,
                "user_data must not be zero, we don't submit linked timeouts that would cause this"
            );
            if let Some(state) = ring.writev_states.remove(&user_data) {
                // if we have ongoing writev state, handle it separately and don't call the completion
                ring.handle_writev_completion(state, user_data, result);
                continue;
            } else if user_data == BARRIER_USER_DATA {
                // barrier operation, no completion to call
                if result < 0 {
                    // negate the result: from_raw_os_error expects a positive errno
                    let err = std::io::Error::from_raw_os_error(-result);
                    tracing::error!("barrier operation failed: {}", err);
                    return Err(err.into());
                }
                continue;
            }
            if result < 0 {
                let errno = -result;
                let err = std::io::Error::from_raw_os_error(errno);
                completion_from_key(user_data).error(err.into());
            } else {
                completion_from_key(user_data).complete(result)
            }
        }
    }

    fn register_fixed_buffer(&self, ptr: std::ptr::NonNull<u8>, len: usize) -> Result<u32> {
        turso_assert!(
            len % 512 == 0,
            "fixed buffer length must be logical block aligned"
        );
        let mut inner = self.inner.lock();
        let slot = inner.free_arenas.iter().position(|e| e.is_none()).ok_or(
            crate::error::CompletionError::UringIOError("no free fixed buffer slots"),
        )?;
        unsafe {
            inner.ring.ring.submitter().register_buffers_update(
                slot as u32,
                &[libc::iovec {
                    iov_base: ptr.as_ptr() as *mut libc::c_void,
                    iov_len: len,
                }],
                None,
            )?
        };
        inner.free_arenas[slot] = Some((ptr, len));
        Ok(slot as u32)
    }
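
    // The returned slot index is the arena's registered-buffer index: buffers carved out
    // of that arena are expected to report it via `fixed_id()`, which is what routes reads
    // and writes through the `ReadFixed`/`WriteFixed` opcodes in `pread`/`pwrite` below
    // (with `debug_check_fixed` verifying the pointer stays inside the registered arena).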
}

impl Clock for UringIO {
    fn now(&self) -> Instant {
        let now = chrono::Local::now();
        Instant {
            secs: now.timestamp(),
            micros: now.timestamp_subsec_micros(),
        }
    }
}

#[inline(always)]
/// use the callback pointer as the user_data for the operation, as is
/// common practice for io_uring, to avoid another layer of indirection
fn get_key(c: Completion) -> u64 {
    Arc::into_raw(c.get_inner().clone()) as u64
}

#[inline(always)]
/// convert the user_data back to a Completion pointer
fn completion_from_key(key: u64) -> Completion {
    let c_inner = unsafe { Arc::from_raw(key as *const CompletionInner) };
    Completion {
        inner: Some(c_inner),
    }
}
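
// Note on ownership: `get_key` leaks one strong `Arc<CompletionInner>` reference
// (via `Arc::into_raw`) for every SQE it tags, and `completion_from_key` reclaims
// exactly that reference (via `Arc::from_raw`) when the matching CQE is processed,
// so each submitted key must be converted back exactly once to avoid leaking or
// double-freeing the completion.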

pub struct UringFile {
    io: Arc<Mutex<InnerUringIO>>,
    file: std::fs::File,
    id: Option<u32>,
}

impl Deref for UringFile {
    type Target = std::fs::File;
    fn deref(&self) -> &Self::Target {
        &self.file
    }
}

impl UringFile {
    fn id(&self) -> Option<u32> {
        self.id
    }
}
unsafe impl Send for UringFile {}
unsafe impl Sync for UringFile {}

impl File for UringFile {
    fn lock_file(&self, exclusive: bool) -> Result<()> {
        let fd = self.file.as_fd();
        // F_SETLK is a non-blocking lock. The lock will be released when the file is closed
        // or the process exits or after an explicit unlock.
        fs::fcntl_lock(
            fd,
            if exclusive {
                FlockOperation::NonBlockingLockExclusive
            } else {
                FlockOperation::NonBlockingLockShared
            },
        )
        .map_err(|e| {
            let io_error = std::io::Error::from(e);
            let message = match io_error.kind() {
                ErrorKind::WouldBlock => {
                    "Failed locking file. File is locked by another process".to_string()
                }
                _ => format!("Failed locking file, {io_error}"),
            };
            LimboError::LockingError(message)
        })?;

        Ok(())
    }

    fn unlock_file(&self) -> Result<()> {
        let fd = self.file.as_fd();
        fs::fcntl_lock(fd, FlockOperation::NonBlockingUnlock).map_err(|e| {
            LimboError::LockingError(format!(
                "Failed to release file lock: {}",
                std::io::Error::from(e)
            ))
        })?;
        Ok(())
    }

    fn pread(&self, pos: u64, c: Completion) -> Result<Completion> {
        let r = c.as_read();
        let read_e = {
            let buf = r.buf();
            let ptr = buf.as_mut_ptr();
            let len = buf.len();
            with_fd!(self, |fd| {
                if let Some(idx) = buf.fixed_id() {
                    trace!(
                        "pread_fixed(pos = {}, length = {}, idx = {})",
                        pos,
                        len,
                        idx
                    );
                    #[cfg(debug_assertions)]
                    {
                        self.io.lock().debug_check_fixed(idx, ptr, len);
                    }
                    io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16)
                        .offset(pos)
                        .build()
                        .user_data(get_key(c.clone()))
                } else {
                    trace!("pread(pos = {}, length = {})", pos, len);
                    // Use Read opcode if fixed buffer is not available
                    io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32)
                        .offset(pos)
                        .build()
                        .user_data(get_key(c.clone()))
                }
            })
        };
        self.io.lock().ring.submit_entry(&read_e);
        Ok(c)
    }

    fn pwrite(&self, pos: u64, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
        let mut io = self.io.lock();
        let mut write = {
            let ptr = buffer.as_ptr();
            let len = buffer.len();
            with_fd!(self, |fd| {
                if let Some(idx) = buffer.fixed_id() {
                    trace!(
                        "pwrite_fixed(pos = {}, length = {}, idx = {})",
                        pos,
                        len,
                        idx
                    );
                    #[cfg(debug_assertions)]
                    {
                        io.debug_check_fixed(idx, ptr, len);
                    }
                    io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16)
                        .offset(pos)
                        .build()
                        .user_data(get_key(c.clone()))
                } else {
                    trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
                    io_uring::opcode::Write::new(fd, ptr, len as u32)
                        .offset(pos)
                        .build()
                        .user_data(get_key(c.clone()))
                }
            })
        };
        if c.needs_link() {
            // Start a new link chain
            write = write.flags(io_uring::squeue::Flags::IO_LINK);
            io.ring.pending_link.store(true, Ordering::Release);
        } else if io.ring.pending_link.load(Ordering::Acquire) {
            // Continue existing link chain
            write = write.flags(io_uring::squeue::Flags::IO_LINK);
        }

        io.ring.submit_entry(&write);
        Ok(c)
    }

    fn sync(&self, c: Completion) -> Result<Completion> {
        let mut io = self.io.lock();
        trace!("sync()");
        let sync = with_fd!(self, |fd| {
            io_uring::opcode::Fsync::new(fd)
                .build()
                .user_data(get_key(c.clone()))
        });
        // sync always ends the chain of linked operations
        io.ring.pending_link.store(false, Ordering::Release);
        io.ring.submit_entry(&sync);
        Ok(c)
    }

    fn pwritev(
        &self,
        pos: u64,
        bufs: Vec<Arc<crate::Buffer>>,
        c: Completion,
    ) -> Result<Completion> {
        // for a single buffer, use pwrite directly
        if bufs.len() == 1 {
            return self.pwrite(pos, bufs[0].clone(), c.clone());
        }
        let linked = c.needs_link();
        tracing::trace!("pwritev(pos = {}, bufs.len() = {})", pos, bufs.len());
        // create state to track the ongoing writev operation
        let state = WritevState::new(self, pos, linked, bufs);
        let mut io = self.io.lock();
        let continue_chain = !linked && io.ring.pending_link.load(Ordering::Acquire);
        io.ring
            .submit_writev(get_key(c.clone()), state, continue_chain);
        Ok(c)
    }

    fn size(&self) -> Result<u64> {
        Ok(self.file.metadata()?.len())
    }

    fn truncate(&self, len: u64, c: Completion) -> Result<Completion> {
        let mut truncate = with_fd!(self, |fd| {
            io_uring::opcode::Ftruncate::new(fd, len)
                .build()
                .user_data(get_key(c.clone()))
        });
        let mut io = self.io.lock();
        if io.ring.pending_link.load(Ordering::Acquire) {
            truncate = truncate.flags(io_uring::squeue::Flags::IO_LINK);
        }
        io.ring.submit_entry(&truncate);
        Ok(c)
    }
}

impl Drop for UringFile {
    fn drop(&mut self) {
        self.unlock_file().expect("Failed to unlock file");
        if let Some(id) = self.id {
            self.io
                .lock()
                .unregister_file(id)
                .inspect_err(|e| {
                    debug!("Failed to unregister file: {e}");
                })
                .ok();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::common;

    #[test]
    fn test_multiple_processes_cannot_open_file() {
        common::tests::test_multiple_processes_cannot_open_file(UringIO::new);
    }
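
    /// Minimal smoke-test sketch added for illustration; it only uses APIs defined in
    /// this file and assumes a writable temp dir and that passing `OpenFlags::Create`
    /// alone is enough to open a fresh read-write file (the file name is made up here).
    #[test]
    fn test_open_file_smoke() {
        let io = UringIO::new().expect("io_uring backend should initialize");
        let path = std::env::temp_dir().join(format!("turso_uring_smoke_{}.db", std::process::id()));
        let path = path.to_str().expect("temp path should be valid UTF-8").to_string();
        // A freshly created file starts out empty.
        let file = io
            .open_file(&path, OpenFlags::Create, false)
            .expect("open_file should succeed");
        assert_eq!(file.size().expect("size should succeed"), 0);
        // Dropping the file releases the lock and the registered fd slot.
        drop(file);
        io.remove_file(&path).expect("remove_file should succeed");
    }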
}