mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-08 02:34:20 +01:00
Merge 'core: Consolidate libc implementations' from Jorge López Tello
This shaves off `nix` as a dependency in core, which was only being used in the io_uring backend for the `fcntl` advisory record locking. Since a previous PR made `rustix` a dep not only for Mac but for any Unix, and `rustix` also has `fcntl`, `nix` becomes redundant. Furthermore, it reduces `libc` usage across core. The only remaining uses are: ```rust io_uring::opcode::Readv::new(fd, iovec as *const iovec as *const libc::iovec, 1) io_uring::opcode::Writev::new(fd, iovec as *const iovec as *const libc::iovec, 1) ``` These two are a little ugly, but sadly the `io_uring` crate requires both opcodes to take a `libc::iovec` while it doesn't export the type. See tokio-rs/io-uring#259 for a request to use `std::io::IoSlice` or to export the type directly. Apart from those two, there are no other usages of libc, so if those are resolved, we could also drop the dependency on libc. Closes #668
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1217,7 +1217,6 @@ dependencies = [
|
||||
"miette",
|
||||
"mimalloc",
|
||||
"mockall",
|
||||
"nix 0.29.0",
|
||||
"pest",
|
||||
"pest_derive",
|
||||
"polling",
|
||||
|
||||
@@ -22,7 +22,7 @@ json = [
|
||||
"dep:pest_derive",
|
||||
]
|
||||
uuid = ["dep:uuid"]
|
||||
io_uring = ["dep:io-uring"]
|
||||
io_uring = ["dep:io-uring", "rustix/io_uring"]
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
io-uring = { version = "0.6.1", optional = true }
|
||||
@@ -40,7 +40,6 @@ fallible-iterator = "0.3.0"
|
||||
hex = "0.4.3"
|
||||
libc = "0.2.155"
|
||||
log = "0.4.20"
|
||||
nix = { version = "0.29.0", features = ["fs"] }
|
||||
sieve-cache = "0.1.4"
|
||||
sqlite3-parser = { path = "../vendored/sqlite3-parser" }
|
||||
thiserror = "1.0.61"
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
use super::{common, Completion, File, OpenFlags, IO};
|
||||
use crate::{LimboError, Result};
|
||||
use libc::{c_short, fcntl, flock, iovec, F_SETLK};
|
||||
use log::{debug, trace};
|
||||
use nix::fcntl::{FcntlArg, OFlag};
|
||||
use rustix::fs::{self, FlockOperation, OFlags};
|
||||
use rustix::io_uring::iovec;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::io::ErrorKind;
|
||||
use std::os::fd::AsFd;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::rc::Rc;
|
||||
use thiserror::Error;
|
||||
|
||||
const MAX_IOVECS: usize = 128;
|
||||
const MAX_IOVECS: u32 = 128;
|
||||
const SQPOLL_IDLE: u32 = 1000;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@@ -44,7 +46,7 @@ struct WrappedIOUring {
|
||||
|
||||
struct InnerUringIO {
|
||||
ring: WrappedIOUring,
|
||||
iovecs: [iovec; MAX_IOVECS],
|
||||
iovecs: [iovec; MAX_IOVECS as usize],
|
||||
next_iovec: usize,
|
||||
}
|
||||
|
||||
@@ -52,10 +54,10 @@ impl UringIO {
|
||||
pub fn new() -> Result<Self> {
|
||||
let ring = match io_uring::IoUring::builder()
|
||||
.setup_sqpoll(SQPOLL_IDLE)
|
||||
.build(MAX_IOVECS as u32)
|
||||
.build(MAX_IOVECS)
|
||||
{
|
||||
Ok(ring) => ring,
|
||||
Err(_) => io_uring::IoUring::new(MAX_IOVECS as u32)?,
|
||||
Err(_) => io_uring::IoUring::new(MAX_IOVECS)?,
|
||||
};
|
||||
let inner = InnerUringIO {
|
||||
ring: WrappedIOUring {
|
||||
@@ -67,7 +69,7 @@ impl UringIO {
|
||||
iovecs: [iovec {
|
||||
iov_base: std::ptr::null_mut(),
|
||||
iov_len: 0,
|
||||
}; MAX_IOVECS],
|
||||
}; MAX_IOVECS as usize],
|
||||
next_iovec: 0,
|
||||
};
|
||||
debug!("Using IO backend 'io-uring'");
|
||||
@@ -82,14 +84,14 @@ impl InnerUringIO {
|
||||
let iovec = &mut self.iovecs[self.next_iovec];
|
||||
iovec.iov_base = buf as *mut std::ffi::c_void;
|
||||
iovec.iov_len = len;
|
||||
self.next_iovec = (self.next_iovec + 1) % MAX_IOVECS;
|
||||
self.next_iovec = (self.next_iovec + 1) % MAX_IOVECS as usize;
|
||||
iovec
|
||||
}
|
||||
}
|
||||
|
||||
impl WrappedIOUring {
|
||||
fn submit_entry(&mut self, entry: &io_uring::squeue::Entry, c: Rc<Completion>) {
|
||||
log::trace!("submit_entry({:?})", entry);
|
||||
trace!("submit_entry({:?})", entry);
|
||||
self.pending.insert(entry.get_user_data(), c);
|
||||
unsafe {
|
||||
self.ring
|
||||
@@ -109,7 +111,7 @@ impl WrappedIOUring {
|
||||
// NOTE: This works because CompletionQueue's next function pops the head of the queue. This is not normal behaviour of iterators
|
||||
let entry = self.ring.completion().next();
|
||||
if entry.is_some() {
|
||||
log::trace!("get_completion({:?})", entry);
|
||||
trace!("get_completion({:?})", entry);
|
||||
// consumed an entry from completion queue, update pending_ops
|
||||
self.pending_ops -= 1;
|
||||
}
|
||||
@@ -136,12 +138,12 @@ impl IO for UringIO {
|
||||
.open(path)?;
|
||||
// Let's attempt to enable direct I/O. Not all filesystems support it
|
||||
// so ignore any errors.
|
||||
let fd = file.as_raw_fd();
|
||||
let fd = file.as_fd();
|
||||
if direct {
|
||||
match nix::fcntl::fcntl(fd, FcntlArg::F_SETFL(OFlag::O_DIRECT)) {
|
||||
Ok(_) => {},
|
||||
match fs::fcntl_setfl(fd, OFlags::DIRECT) {
|
||||
Ok(_) => {}
|
||||
Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"),
|
||||
};
|
||||
}
|
||||
}
|
||||
let uring_file = Rc::new(UringFile {
|
||||
io: self.inner.clone(),
|
||||
@@ -199,52 +201,39 @@ pub struct UringFile {
|
||||
|
||||
impl File for UringFile {
|
||||
fn lock_file(&self, exclusive: bool) -> Result<()> {
|
||||
let fd = self.file.as_raw_fd();
|
||||
let flock = flock {
|
||||
l_type: if exclusive {
|
||||
libc::F_WRLCK as c_short
|
||||
} else {
|
||||
libc::F_RDLCK as c_short
|
||||
},
|
||||
l_whence: libc::SEEK_SET as c_short,
|
||||
l_start: 0,
|
||||
l_len: 0, // Lock entire file
|
||||
l_pid: 0,
|
||||
};
|
||||
|
||||
let fd = self.file.as_fd();
|
||||
// F_SETLK is a non-blocking lock. The lock will be released when the file is closed
|
||||
// or the process exits or after an explicit unlock.
|
||||
let lock_result = unsafe { fcntl(fd, F_SETLK, &flock) };
|
||||
if lock_result == -1 {
|
||||
let err = std::io::Error::last_os_error();
|
||||
if err.kind() == std::io::ErrorKind::WouldBlock {
|
||||
return Err(LimboError::LockingError(
|
||||
"File is locked by another process".into(),
|
||||
));
|
||||
fs::fcntl_lock(
|
||||
fd,
|
||||
if exclusive {
|
||||
FlockOperation::LockExclusive
|
||||
} else {
|
||||
return Err(LimboError::IOError(err));
|
||||
}
|
||||
}
|
||||
FlockOperation::LockShared
|
||||
},
|
||||
)
|
||||
.map_err(|e| {
|
||||
let io_error = std::io::Error::from(e);
|
||||
let message = match io_error.kind() {
|
||||
ErrorKind::WouldBlock => {
|
||||
"Failed locking file. File is locked by another process".to_string()
|
||||
}
|
||||
_ => format!("Failed locking file, {}", io_error),
|
||||
};
|
||||
LimboError::LockingError(message)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn unlock_file(&self) -> Result<()> {
|
||||
let fd = self.file.as_raw_fd();
|
||||
let flock = flock {
|
||||
l_type: libc::F_UNLCK as c_short,
|
||||
l_whence: libc::SEEK_SET as c_short,
|
||||
l_start: 0,
|
||||
l_len: 0,
|
||||
l_pid: 0,
|
||||
};
|
||||
|
||||
let unlock_result = unsafe { fcntl(fd, F_SETLK, &flock) };
|
||||
if unlock_result == -1 {
|
||||
return Err(LimboError::LockingError(format!(
|
||||
let fd = self.file.as_fd();
|
||||
fs::fcntl_lock(fd, FlockOperation::Unlock).map_err(|e| {
|
||||
LimboError::LockingError(format!(
|
||||
"Failed to release file lock: {}",
|
||||
std::io::Error::last_os_error()
|
||||
)));
|
||||
}
|
||||
std::io::Error::from(e)
|
||||
))
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -261,7 +250,7 @@ impl File for UringFile {
|
||||
let len = buf.len();
|
||||
let buf = buf.as_mut_ptr();
|
||||
let iovec = io.get_iovec(buf, len);
|
||||
io_uring::opcode::Readv::new(fd, iovec, 1)
|
||||
io_uring::opcode::Readv::new(fd, iovec as *const iovec as *const libc::iovec, 1)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(io.ring.get_key())
|
||||
@@ -282,7 +271,7 @@ impl File for UringFile {
|
||||
let buf = buffer.borrow();
|
||||
trace!("pwrite(pos = {}, length = {})", pos, buf.len());
|
||||
let iovec = io.get_iovec(buf.as_ptr(), buf.len());
|
||||
io_uring::opcode::Writev::new(fd, iovec, 1)
|
||||
io_uring::opcode::Writev::new(fd, iovec as *const iovec as *const libc::iovec, 1)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(io.ring.get_key())
|
||||
@@ -303,7 +292,7 @@ impl File for UringFile {
|
||||
}
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
Ok(self.file.metadata().unwrap().len())
|
||||
Ok(self.file.metadata()?.len())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
105
core/io/unix.rs
105
core/io/unix.rs
@@ -3,15 +3,16 @@ use crate::io::common;
|
||||
use crate::Result;
|
||||
|
||||
use super::{Completion, File, OpenFlags, IO};
|
||||
use libc::{c_short, fcntl, flock, F_SETLK};
|
||||
use log::{debug, trace};
|
||||
use polling::{Event, Events, Poller};
|
||||
use rustix::fd::{AsFd, AsRawFd};
|
||||
use rustix::fs::OpenOptionsExt;
|
||||
use rustix::io::Errno;
|
||||
use rustix::{
|
||||
fd::{AsFd, AsRawFd},
|
||||
fs::{self, FlockOperation, OFlags, OpenOptionsExt},
|
||||
io::Errno,
|
||||
};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::io::{Read, Seek, Write};
|
||||
use std::io::{ErrorKind, Read, Seek, Write};
|
||||
use std::rc::Rc;
|
||||
|
||||
pub struct UnixIO {
|
||||
@@ -36,7 +37,7 @@ impl IO for UnixIO {
|
||||
trace!("open_file(path = {})", path);
|
||||
let file = std::fs::File::options()
|
||||
.read(true)
|
||||
.custom_flags(libc::O_NONBLOCK)
|
||||
.custom_flags(OFlags::NONBLOCK.bits() as i32)
|
||||
.write(true)
|
||||
.create(matches!(flags, OpenFlags::Create))
|
||||
.open(path)?;
|
||||
@@ -86,8 +87,8 @@ impl IO for UnixIO {
|
||||
}
|
||||
}
|
||||
};
|
||||
match result {
|
||||
std::result::Result::Ok(n) => {
|
||||
return match result {
|
||||
Ok(n) => {
|
||||
match &cf {
|
||||
CompletionCallback::Read(_, ref c, _) => {
|
||||
c.complete(0);
|
||||
@@ -96,12 +97,10 @@ impl IO for UnixIO {
|
||||
c.complete(n as i32);
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
};
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -130,61 +129,47 @@ enum CompletionCallback {
|
||||
|
||||
pub struct UnixFile {
|
||||
file: Rc<RefCell<std::fs::File>>,
|
||||
poller: Rc<RefCell<polling::Poller>>,
|
||||
poller: Rc<RefCell<Poller>>,
|
||||
callbacks: Rc<RefCell<HashMap<usize, CompletionCallback>>>,
|
||||
}
|
||||
|
||||
impl File for UnixFile {
|
||||
fn lock_file(&self, exclusive: bool) -> Result<()> {
|
||||
let fd = self.file.borrow().as_raw_fd();
|
||||
let flock = flock {
|
||||
l_type: if exclusive {
|
||||
libc::F_WRLCK as c_short
|
||||
} else {
|
||||
libc::F_RDLCK as c_short
|
||||
},
|
||||
l_whence: libc::SEEK_SET as c_short,
|
||||
l_start: 0,
|
||||
l_len: 0, // Lock entire file
|
||||
l_pid: 0,
|
||||
};
|
||||
|
||||
let fd = self.file.borrow();
|
||||
let fd = fd.as_fd();
|
||||
// F_SETLK is a non-blocking lock. The lock will be released when the file is closed
|
||||
// or the process exits or after an explicit unlock.
|
||||
let lock_result = unsafe { fcntl(fd, F_SETLK, &flock) };
|
||||
if lock_result == -1 {
|
||||
let err = std::io::Error::last_os_error();
|
||||
if err.kind() == std::io::ErrorKind::WouldBlock {
|
||||
return Err(LimboError::LockingError(
|
||||
"Failed locking file. File is locked by another process".to_string(),
|
||||
));
|
||||
fs::fcntl_lock(
|
||||
fd,
|
||||
if exclusive {
|
||||
FlockOperation::LockExclusive
|
||||
} else {
|
||||
return Err(LimboError::LockingError(format!(
|
||||
"Failed locking file, {}",
|
||||
err
|
||||
)));
|
||||
}
|
||||
}
|
||||
FlockOperation::LockShared
|
||||
},
|
||||
)
|
||||
.map_err(|e| {
|
||||
let io_error = std::io::Error::from(e);
|
||||
let message = match io_error.kind() {
|
||||
ErrorKind::WouldBlock => {
|
||||
"Failed locking file. File is locked by another process".to_string()
|
||||
}
|
||||
_ => format!("Failed locking file, {}", io_error),
|
||||
};
|
||||
LimboError::LockingError(message)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn unlock_file(&self) -> Result<()> {
|
||||
let fd = self.file.borrow().as_raw_fd();
|
||||
let flock = flock {
|
||||
l_type: libc::F_UNLCK as c_short,
|
||||
l_whence: libc::SEEK_SET as c_short,
|
||||
l_start: 0,
|
||||
l_len: 0,
|
||||
l_pid: 0,
|
||||
};
|
||||
|
||||
let unlock_result = unsafe { fcntl(fd, F_SETLK, &flock) };
|
||||
if unlock_result == -1 {
|
||||
return Err(LimboError::LockingError(format!(
|
||||
let fd = self.file.borrow();
|
||||
let fd = fd.as_fd();
|
||||
fs::fcntl_lock(fd, FlockOperation::Unlock).map_err(|e| {
|
||||
LimboError::LockingError(format!(
|
||||
"Failed to release file lock: {}",
|
||||
std::io::Error::last_os_error()
|
||||
)));
|
||||
}
|
||||
std::io::Error::from(e)
|
||||
))
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -199,7 +184,7 @@ impl File for UnixFile {
|
||||
rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64)
|
||||
};
|
||||
match result {
|
||||
std::result::Result::Ok(n) => {
|
||||
Ok(n) => {
|
||||
trace!("pread n: {}", n);
|
||||
// Read succeeded immediately
|
||||
c.complete(0);
|
||||
@@ -236,7 +221,7 @@ impl File for UnixFile {
|
||||
rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64)
|
||||
};
|
||||
match result {
|
||||
std::result::Result::Ok(n) => {
|
||||
Ok(n) => {
|
||||
trace!("pwrite n: {}", n);
|
||||
// Read succeeded immediately
|
||||
c.complete(n as i32);
|
||||
@@ -263,9 +248,9 @@ impl File for UnixFile {
|
||||
|
||||
fn sync(&self, c: Rc<Completion>) -> Result<()> {
|
||||
let file = self.file.borrow();
|
||||
let result = rustix::fs::fsync(file.as_fd());
|
||||
let result = fs::fsync(file.as_fd());
|
||||
match result {
|
||||
std::result::Result::Ok(()) => {
|
||||
Ok(()) => {
|
||||
trace!("fsync");
|
||||
c.complete(0);
|
||||
Ok(())
|
||||
@@ -276,7 +261,7 @@ impl File for UnixFile {
|
||||
|
||||
fn size(&self) -> Result<u64> {
|
||||
let file = self.file.borrow();
|
||||
Ok(file.metadata().unwrap().len())
|
||||
Ok(file.metadata()?.len())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user