Merge 'io/unix: wrap file with Mutex' from Pere Diaz Bou

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2301
This commit is contained in:
Pekka Enberg
2025-07-28 12:53:38 +03:00

View File

@@ -13,6 +13,7 @@ use rustix::{
use std::{
cell::{RefCell, UnsafeCell},
mem::MaybeUninit,
sync::Mutex,
};
use std::{io::ErrorKind, sync::Arc};
@@ -227,7 +228,7 @@ impl IO for UnixIO {
#[allow(clippy::arc_with_non_send_sync)]
let unix_file = Arc::new(UnixFile {
file: Arc::new(RefCell::new(file)),
file: Arc::new(Mutex::new(file)),
poller: BorrowedPollHandler(self.poller.as_mut().into()),
callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()),
});
@@ -250,13 +251,13 @@ impl IO for UnixIO {
if let Some(cf) = self.callbacks.get(event.key) {
let result = match cf {
CompletionCallback::Read(ref file, ref c, pos) => {
let file = file.borrow_mut();
let file = file.lock().unwrap();
let r = c.as_read();
let mut buf = r.buf_mut();
rustix::io::pread(file.as_fd(), buf.as_mut_slice(), *pos as u64)
}
CompletionCallback::Write(ref file, _, ref buf, pos) => {
let file = file.borrow_mut();
let file = file.lock().unwrap();
let buf = buf.borrow();
rustix::io::pwrite(file.as_fd(), buf.as_slice(), *pos as u64)
}
@@ -304,9 +305,9 @@ impl IO for UnixIO {
}
enum CompletionCallback {
Read(Arc<RefCell<std::fs::File>>, Arc<Completion>, usize),
Read(Arc<Mutex<std::fs::File>>, Arc<Completion>, usize),
Write(
Arc<RefCell<std::fs::File>>,
Arc<Mutex<std::fs::File>>,
Arc<Completion>,
Arc<RefCell<crate::Buffer>>,
usize,
@@ -315,7 +316,7 @@ enum CompletionCallback {
pub struct UnixFile<'io> {
#[allow(clippy::arc_with_non_send_sync)]
file: Arc<RefCell<std::fs::File>>,
file: Arc<Mutex<std::fs::File>>,
poller: BorrowedPollHandler<'io>,
callbacks: BorrowedCallbacks<'io>,
}
@@ -324,7 +325,7 @@ unsafe impl Sync for UnixFile<'_> {}
impl File for UnixFile<'_> {
fn lock_file(&self, exclusive: bool) -> Result<()> {
let fd = self.file.borrow();
let fd = self.file.lock().unwrap();
let fd = fd.as_fd();
// F_SETLK is a non-blocking lock. The lock will be released when the file is closed
// or the process exits or after an explicit unlock.
@@ -351,7 +352,7 @@ impl File for UnixFile<'_> {
}
fn unlock_file(&self) -> Result<()> {
let fd = self.file.borrow();
let fd = self.file.lock().unwrap();
let fd = fd.as_fd();
fs::fcntl_lock(fd, FlockOperation::NonBlockingUnlock).map_err(|e| {
LimboError::LockingError(format!(
@@ -364,7 +365,7 @@ impl File for UnixFile<'_> {
#[instrument(err, skip_all, level = Level::TRACE)]
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
let file = self.file.borrow();
let file = self.file.lock().unwrap();
let result = {
let r = c.as_read();
let mut buf = r.buf_mut();
@@ -402,7 +403,7 @@ impl File for UnixFile<'_> {
buffer: Arc<RefCell<crate::Buffer>>,
c: Arc<Completion>,
) -> Result<Arc<Completion>> {
let file = self.file.borrow();
let file = self.file.lock().unwrap();
let result = {
let buf = buffer.borrow();
rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64)
@@ -432,7 +433,7 @@ impl File for UnixFile<'_> {
#[instrument(err, skip_all, level = Level::TRACE)]
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
let file = self.file.borrow();
let file = self.file.lock().unwrap();
let result = fs::fsync(file.as_fd());
match result {
Ok(()) => {
@@ -446,7 +447,7 @@ impl File for UnixFile<'_> {
#[instrument(err, skip_all, level = Level::TRACE)]
fn size(&self) -> Result<u64> {
let file = self.file.borrow();
let file = self.file.lock().unwrap();
Ok(file.metadata()?.len())
}
}