Merge pull request #151 from gvos94/graghura/lock-files

This commit is contained in:
Pekka Enberg
2024-07-17 22:11:47 +03:00
committed by GitHub
9 changed files with 264 additions and 21 deletions

1
Cargo.lock generated
View File

@@ -1011,6 +1011,7 @@ dependencies = [
"rustix",
"sieve-cache",
"sqlite3-parser",
"tempfile",
"thiserror",
]

View File

@@ -47,6 +47,7 @@ pprof = { version = "0.12.1", features = ["criterion", "flamegraph"] }
criterion = { version = "0.5", features = ["html_reports", "async", "async_futures"] }
rstest = "0.18.2"
rusqlite = "0.29.0"
tempfile = "3.8.0"
[[bench]]
name = "benchmark"

51
core/io/common.rs Normal file
View File

@@ -0,0 +1,51 @@
pub const ENV_DISABLE_FILE_LOCK: &str = "LIMBO_DISABLE_FILE_LOCK";
#[cfg(test)]
pub mod tests {
use crate::IO;
use anyhow::Result;
use tempfile::NamedTempFile;
use std::process::{Command, Stdio};
fn run_test_parent_process<T: IO>(create_io: fn() -> Result<T>) {
let temp_file: NamedTempFile = NamedTempFile::new().expect("Failed to create temp file");
let path = temp_file.path().to_str().unwrap().to_string();
// Parent process opens the file
let io1 = create_io().expect("Failed to create IO");
let _file1 = io1
.open_file(&path)
.expect("Failed to open file in parent process");
let current_exe = std::env::current_exe().expect("Failed to get current executable path");
// Spawn a child process and try to open the same file
let child = Command::new(current_exe)
.env("RUST_TEST_CHILD_PROCESS", "1")
.env("RUST_TEST_FILE_PATH", &path)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("Failed to spawn child process");
let output = child.wait_with_output().expect("Failed to wait on child");
assert!(!output.status.success(), "Child process should have failed to open the file");
}
fn run_test_child_process<T: IO>(create_io: fn() -> Result<T>) -> Result<()> {
if std::env::var("RUST_TEST_CHILD_PROCESS").is_ok() {
let path = std::env::var("RUST_TEST_FILE_PATH")?;
let io = create_io()?;
match io.open_file(&path) {
Ok(_) => std::process::exit(0),
Err(_) => std::process::exit(1),
}
}
Ok(())
}
pub fn test_multiple_processes_cannot_open_file<T: IO>(create_io: fn() -> Result<T>) {
run_test_child_process(create_io).unwrap();
run_test_parent_process(create_io);
}
}

View File

@@ -1,5 +1,8 @@
use crate::io::common;
use super::{Completion, File, WriteCompletion, IO};
use anyhow::{Ok, Result};
use libc::{c_short, fcntl, flock, F_SETLK};
use log::trace;
use polling::{Event, Events, Poller};
use rustix::fd::{AsFd, AsRawFd};
@@ -32,12 +35,18 @@ impl IO for DarwinIO {
let file = std::fs::File::options()
.read(true)
.custom_flags(libc::O_NONBLOCK)
.write(true)
.open(path)?;
Ok(Rc::new(DarwinFile {
let darwin_file = Rc::new(DarwinFile {
file: Rc::new(RefCell::new(file)),
poller: self.poller.clone(),
callbacks: self.callbacks.clone(),
}))
});
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
darwin_file.lock_file(true)?;
}
Ok(darwin_file)
}
fn run_once(&self) -> Result<()> {
@@ -108,6 +117,56 @@ pub struct DarwinFile {
}
impl File for DarwinFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
let fd = self.file.borrow().as_raw_fd();
let flock = flock {
l_type: if exclusive {
libc::F_WRLCK as c_short
} else {
libc::F_RDLCK as c_short
},
l_whence: libc::SEEK_SET as c_short,
l_start: 0,
l_len: 0, // Lock entire file
l_pid: 0,
};
// F_SETLK is a non-blocking lock. The lock will be released when the file is closed
// or the process exits or after an explicit unlock.
let lock_result = unsafe { fcntl(fd, F_SETLK, &flock) };
if lock_result == -1 {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::WouldBlock {
return Err(anyhow::anyhow!(
"Failed locking file. File is locked by another process"
));
} else {
return Err(anyhow::anyhow!("Failed locking file, {}", err));
}
}
Ok(())
}
fn unlock_file(&self) -> Result<()> {
let fd = self.file.borrow().as_raw_fd();
let flock = flock {
l_type: libc::F_UNLCK as c_short,
l_whence: libc::SEEK_SET as c_short,
l_start: 0,
l_len: 0,
l_pid: 0,
};
let unlock_result = unsafe { fcntl(fd, F_SETLK, &flock) };
if unlock_result == -1 {
return Err(anyhow::anyhow!(
"Failed to release file lock: {}",
std::io::Error::last_os_error()
));
}
Ok(())
}
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
let file = self.file.borrow();
let result = {
@@ -177,3 +236,19 @@ impl File for DarwinFile {
}
}
}
impl Drop for DarwinFile {
fn drop(&mut self) {
self.unlock_file().expect("Failed to unlock file");
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_multiple_processes_cannot_open_file() {
common::tests::test_multiple_processes_cannot_open_file(DarwinIO::new);
}
}

View File

@@ -32,6 +32,16 @@ pub struct GenericFile {
}
impl File for GenericFile {
// Since we let the OS handle the locking, file locking is not supported on the generic IO implementation
// No-op implementation allows compilation but provides no actual file locking.
fn lock_file(&self, exclusive: bool) -> Result<()> {
Ok(())
}
fn unlock_file(&self) -> Result<()> {
Ok(())
}
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;
@@ -58,3 +68,9 @@ impl File for GenericFile {
Ok(())
}
}
impl Drop for GenericFile {
fn drop(&mut self) {
self.unlock_file().expect("Failed to unlock file");
}
}

View File

@@ -1,12 +1,12 @@
use super::{Completion, File, WriteCompletion, IO};
use super::{common, Completion, File, WriteCompletion, IO};
use anyhow::{ensure, Result};
use libc::iovec;
use libc::{c_short, fcntl, flock, iovec, F_SETLK};
use log::{debug, trace};
use std::cell::{Ref, RefCell};
use nix::fcntl::{self, FcntlArg, OFlag};
use nix::fcntl::{FcntlArg, OFlag};
use std::cell::RefCell;
use std::fmt;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use std::fmt;
use thiserror::Error;
const MAX_IOVECS: usize = 128;
@@ -20,7 +20,11 @@ enum LinuxIOError {
impl fmt::Display for LinuxIOError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LinuxIOError::IOUringCQError(code) => write!(f, "IOUring completion queue error occurred with code {}", code),
LinuxIOError::IOUringCQError(code) => write!(
f,
"IOUring completion queue error occurred with code {}",
code
),
}
}
}
@@ -65,10 +69,7 @@ impl InnerLinuxIO {
impl IO for LinuxIO {
fn open_file(&self, path: &str) -> Result<Rc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
.read(true)
.write(true)
.open(path)?;
let file = std::fs::File::options().read(true).write(true).open(path)?;
// Let's attempt to enable direct I/O. Not all filesystems support it
// so ignore any errors.
let fd = file.as_raw_fd();
@@ -76,23 +77,24 @@ impl IO for LinuxIO {
Ok(_) => {},
Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"),
};
Ok(Rc::new(LinuxFile {
let linux_file = Rc::new(LinuxFile {
io: self.inner.clone(),
file,
}))
});
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
linux_file.lock_file(true)?;
}
Ok(linux_file)
}
fn run_once(&self) -> Result<()> {
trace!("run_once()");
let mut inner = self.inner.borrow_mut();
let mut ring = &mut inner.ring;
let ring = &mut inner.ring;
ring.submit_and_wait(1)?;
while let Some(cqe) = ring.completion().next() {
let result = cqe.result();
ensure!(
result >= 0,
LinuxIOError::IOUringCQError(result)
);
ensure!(result >= 0, LinuxIOError::IOUringCQError(result));
let c = unsafe { Rc::from_raw(cqe.user_data() as *const Completion) };
c.complete();
}
@@ -106,6 +108,54 @@ pub struct LinuxFile {
}
impl File for LinuxFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
let fd = self.file.as_raw_fd();
let flock = flock {
l_type: if exclusive {
libc::F_WRLCK as c_short
} else {
libc::F_RDLCK as c_short
},
l_whence: libc::SEEK_SET as c_short,
l_start: 0,
l_len: 0, // Lock entire file
l_pid: 0,
};
// F_SETLK is a non-blocking lock. The lock will be released when the file is closed
// or the process exits or after an explicit unlock.
let lock_result = unsafe { fcntl(fd, F_SETLK, &flock) };
if lock_result == -1 {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::WouldBlock {
return Err(anyhow::anyhow!("File is locked by another process"));
} else {
return Err(anyhow::anyhow!(err));
}
}
Ok(())
}
fn unlock_file(&self) -> Result<()> {
let fd = self.file.as_raw_fd();
let flock = flock {
l_type: libc::F_UNLCK as c_short,
l_whence: libc::SEEK_SET as c_short,
l_start: 0,
l_len: 0,
l_pid: 0,
};
let unlock_result = unsafe { fcntl(fd, F_SETLK, &flock) };
if unlock_result == -1 {
return Err(anyhow::anyhow!(
"Failed to release file lock: {}",
std::io::Error::last_os_error()
));
}
Ok(())
}
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
trace!("pread(pos = {}, length = {})", pos, c.buf().len());
let fd = io_uring::types::Fd(self.file.as_raw_fd());
@@ -121,7 +171,7 @@ impl File for LinuxFile {
.build()
.user_data(ptr as u64)
};
let mut ring = &mut io.ring;
let ring = &mut io.ring;
unsafe {
ring.submission()
.push(&read_e)
@@ -147,7 +197,7 @@ impl File for LinuxFile {
.build()
.user_data(ptr as u64)
};
let mut ring = &mut io.ring;
let ring = &mut io.ring;
unsafe {
ring.submission()
.push(&write)
@@ -156,3 +206,20 @@ impl File for LinuxFile {
Ok(())
}
}
impl Drop for LinuxFile {
fn drop(&mut self) {
self.unlock_file().expect("Failed to unlock file");
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::io::common;
#[test]
fn test_multiple_processes_cannot_open_file() {
common::tests::test_multiple_processes_cannot_open_file(LinuxIO::new);
}
}

View File

@@ -8,6 +8,8 @@ use std::{
};
pub trait File {
fn lock_file(&self, exclusive: bool) -> Result<()>;
fn unlock_file(&self) -> Result<()>;
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()>;
fn pwrite(&self, pos: usize, buffer: Rc<RefCell<Buffer>>, c: Rc<WriteCompletion>)
-> Result<()>;
@@ -134,3 +136,5 @@ cfg_block! {
pub use generic::GenericIO as PlatformIO;
}
}
mod common;

View File

@@ -32,6 +32,14 @@ pub struct WindowsFile {
}
impl File for WindowsFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
unimplemented!()
}
fn unlock_file(&self) -> Result<()> {
unimplemented!()
}
fn pread(&self, pos: usize, c: Rc<Completion>) -> Result<()> {
let mut file = self.file.borrow_mut();
file.seek(std::io::SeekFrom::Start(pos as u64))?;

View File

@@ -99,6 +99,20 @@ impl SimulatorFile {
}
impl limbo_core::File for SimulatorFile {
fn lock_file(&self, exclusive: bool) -> Result<()> {
if *self.fault.borrow() {
return Err(anyhow::anyhow!("Injected fault"));
}
self.inner.lock_file(exclusive)
}
fn unlock_file(&self) -> Result<()> {
if *self.fault.borrow() {
return Err(anyhow::anyhow!("Injected fault"));
}
self.inner.unlock_file()
}
fn pread(&self, pos: usize, c: Rc<limbo_core::Completion>) -> Result<()> {
if *self.fault.borrow() {
return Err(anyhow::anyhow!("Injected fault"));
@@ -118,3 +132,9 @@ impl limbo_core::File for SimulatorFile {
self.inner.pwrite(pos, buffer, c)
}
}
impl Drop for SimulatorFile {
fn drop(&mut self) {
self.inner.unlock_file().expect("Failed to unlock file");
}
}