From 9ef212edd36c0a825b8aa54bd9933497a86606fd Mon Sep 17 00:00:00 2001 From: gandeevanr Date: Sun, 14 Jul 2024 18:32:57 -0700 Subject: [PATCH 1/2] Added support to disallow multiple processes from opening the same database file in linux and darwin --- Cargo.lock | 1 + core/Cargo.toml | 1 + core/io/common.rs | 51 ++++++++++++++++++++++++ core/io/darwin.rs | 73 +++++++++++++++++++++++++++++++++- core/io/generic.rs | 10 +++++ core/io/linux.rs | 99 +++++++++++++++++++++++++++++++++++++--------- core/io/mod.rs | 4 ++ core/io/windows.rs | 8 ++++ simulator/main.rs | 14 +++++++ 9 files changed, 240 insertions(+), 21 deletions(-) create mode 100644 core/io/common.rs diff --git a/Cargo.lock b/Cargo.lock index 9d178321f..094f4bf99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1011,6 +1011,7 @@ dependencies = [ "rustix", "sieve-cache", "sqlite3-parser", + "tempfile", "thiserror", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index b1ed2244d..fd0d36cc4 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -47,6 +47,7 @@ pprof = { version = "0.12.1", features = ["criterion", "flamegraph"] } criterion = { version = "0.5", features = ["html_reports", "async", "async_futures"] } rstest = "0.18.2" rusqlite = "0.29.0" +tempfile = "3.8.0" [[bench]] name = "benchmark" diff --git a/core/io/common.rs b/core/io/common.rs new file mode 100644 index 000000000..a8456cd95 --- /dev/null +++ b/core/io/common.rs @@ -0,0 +1,51 @@ +pub const ENV_DISABLE_FILE_LOCK: &str = "LIMBO_DISABLE_FILE_LOCK"; + +#[cfg(test)] +pub mod tests { + use crate::IO; + use anyhow::Result; + use tempfile::NamedTempFile; + use std::process::{Command, Stdio}; + + fn run_test_parent_process(create_io: fn() -> Result) { + let temp_file: NamedTempFile = NamedTempFile::new().expect("Failed to create temp file"); + let path = temp_file.path().to_str().unwrap().to_string(); + + // Parent process opens the file + let io1 = create_io().expect("Failed to create IO"); + let _file1 = io1 + .open_file(&path) + .expect("Failed to open file in parent process"); + + let current_exe = std::env::current_exe().expect("Failed to get current executable path"); + + // Spawn a child process and try to open the same file + let child = Command::new(current_exe) + .env("RUST_TEST_CHILD_PROCESS", "1") + .env("RUST_TEST_FILE_PATH", &path) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("Failed to spawn child process"); + + let output = child.wait_with_output().expect("Failed to wait on child"); + assert!(!output.status.success(), "Child process should have failed to open the file"); + } + + fn run_test_child_process(create_io: fn() -> Result) -> Result<()> { + if std::env::var("RUST_TEST_CHILD_PROCESS").is_ok() { + let path = std::env::var("RUST_TEST_FILE_PATH")?; + let io = create_io()?; + match io.open_file(&path) { + Ok(_) => std::process::exit(0), + Err(_) => std::process::exit(1), + } + } + Ok(()) + } + + pub fn test_multiple_processes_cannot_open_file(create_io: fn() -> Result) { + run_test_child_process(create_io).unwrap(); + run_test_parent_process(create_io); + } +} diff --git a/core/io/darwin.rs b/core/io/darwin.rs index 21f248785..45c1eee87 100644 --- a/core/io/darwin.rs +++ b/core/io/darwin.rs @@ -1,5 +1,8 @@ +use crate::io::common; + use super::{Completion, File, WriteCompletion, IO}; use anyhow::{Ok, Result}; +use libc::{c_short, fcntl, flock, F_SETLK}; use log::trace; use polling::{Event, Events, Poller}; use rustix::fd::{AsFd, AsRawFd}; @@ -32,12 +35,18 @@ impl IO for DarwinIO { let file = std::fs::File::options() .read(true) .custom_flags(libc::O_NONBLOCK) + .write(true) .open(path)?; - Ok(Rc::new(DarwinFile { + + let darwin_file = Rc::new(DarwinFile { file: Rc::new(RefCell::new(file)), poller: self.poller.clone(), callbacks: self.callbacks.clone(), - })) + }); + if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { + darwin_file.lock_file(true)?; + } + Ok(darwin_file) } fn run_once(&self) -> Result<()> { @@ -108,6 +117,56 @@ pub struct DarwinFile { } impl File for DarwinFile { + fn lock_file(&self, exclusive: bool) -> Result<()> { + let fd = self.file.borrow().as_raw_fd(); + let flock = flock { + l_type: if exclusive { + libc::F_WRLCK as c_short + } else { + libc::F_RDLCK as c_short + }, + l_whence: libc::SEEK_SET as c_short, + l_start: 0, + l_len: 0, // Lock entire file + l_pid: 0, + }; + + // F_SETLK is a non-blocking lock. The lock will be released when the file is closed + // or the process exits or after an explicit unlock. + let lock_result = unsafe { fcntl(fd, F_SETLK, &flock) }; + if lock_result == -1 { + let err = std::io::Error::last_os_error(); + if err.kind() == std::io::ErrorKind::WouldBlock { + return Err(anyhow::anyhow!( + "Failed locking file. File is locked by another process" + )); + } else { + return Err(anyhow::anyhow!("Failed locking file, {}", err)); + } + } + Ok(()) + } + + fn unlock_file(&self) -> Result<()> { + let fd = self.file.borrow().as_raw_fd(); + let flock = flock { + l_type: libc::F_UNLCK as c_short, + l_whence: libc::SEEK_SET as c_short, + l_start: 0, + l_len: 0, + l_pid: 0, + }; + + let unlock_result = unsafe { fcntl(fd, F_SETLK, &flock) }; + if unlock_result == -1 { + return Err(anyhow::anyhow!( + "Failed to release file lock: {}", + std::io::Error::last_os_error() + )); + } + Ok(()) + } + fn pread(&self, pos: usize, c: Rc) -> Result<()> { let file = self.file.borrow(); let result = { @@ -177,3 +236,13 @@ impl File for DarwinFile { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_multiple_processes_cannot_open_file() { + common::tests::test_multiple_processes_cannot_open_file(DarwinIO::new); + } +} diff --git a/core/io/generic.rs b/core/io/generic.rs index 55dbdfb90..3665189bd 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -32,6 +32,16 @@ pub struct GenericFile { } impl File for GenericFile { + // Since we let the OS handle the locking, file locking is not supported on the generic IO implementation + // No-op implementation allows compilation but provides no actual file locking. + fn lock_file(&self, exclusive: bool) -> Result<()> { + Ok(()) + } + + fn unlock_file(&self) -> Result<()> { + Ok(()) + } + fn pread(&self, pos: usize, c: Rc) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; diff --git a/core/io/linux.rs b/core/io/linux.rs index 9d29feb22..b27c47981 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -1,12 +1,12 @@ -use super::{Completion, File, WriteCompletion, IO}; +use super::{common, Completion, File, WriteCompletion, IO}; use anyhow::{ensure, Result}; -use libc::iovec; +use libc::{c_short, fcntl, flock, iovec, F_SETLK}; use log::{debug, trace}; -use std::cell::{Ref, RefCell}; -use nix::fcntl::{self, FcntlArg, OFlag}; +use nix::fcntl::{FcntlArg, OFlag}; +use std::cell::RefCell; +use std::fmt; use std::os::unix::io::AsRawFd; use std::rc::Rc; -use std::fmt; use thiserror::Error; const MAX_IOVECS: usize = 128; @@ -20,7 +20,11 @@ enum LinuxIOError { impl fmt::Display for LinuxIOError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - LinuxIOError::IOUringCQError(code) => write!(f, "IOUring completion queue error occurred with code {}", code), + LinuxIOError::IOUringCQError(code) => write!( + f, + "IOUring completion queue error occurred with code {}", + code + ), } } } @@ -65,10 +69,7 @@ impl InnerLinuxIO { impl IO for LinuxIO { fn open_file(&self, path: &str) -> Result> { trace!("open_file(path = {})", path); - let file = std::fs::File::options() - .read(true) - .write(true) - .open(path)?; + let file = std::fs::File::options().read(true).write(true).open(path)?; // Let's attempt to enable direct I/O. Not all filesystems support it // so ignore any errors. let fd = file.as_raw_fd(); @@ -76,23 +77,24 @@ impl IO for LinuxIO { Ok(_) => {}, Err(error) => debug!("Error {error:?} returned when setting O_DIRECT flag to read file. The performance of the system may be affected"), }; - Ok(Rc::new(LinuxFile { + let linux_file = Rc::new(LinuxFile { io: self.inner.clone(), file, - })) + }); + if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { + linux_file.lock_file(true)?; + } + Ok(linux_file) } fn run_once(&self) -> Result<()> { trace!("run_once()"); let mut inner = self.inner.borrow_mut(); - let mut ring = &mut inner.ring; + let ring = &mut inner.ring; ring.submit_and_wait(1)?; while let Some(cqe) = ring.completion().next() { let result = cqe.result(); - ensure!( - result >= 0, - LinuxIOError::IOUringCQError(result) - ); + ensure!(result >= 0, LinuxIOError::IOUringCQError(result)); let c = unsafe { Rc::from_raw(cqe.user_data() as *const Completion) }; c.complete(); } @@ -106,6 +108,54 @@ pub struct LinuxFile { } impl File for LinuxFile { + fn lock_file(&self, exclusive: bool) -> Result<()> { + let fd = self.file.as_raw_fd(); + let flock = flock { + l_type: if exclusive { + libc::F_WRLCK as c_short + } else { + libc::F_RDLCK as c_short + }, + l_whence: libc::SEEK_SET as c_short, + l_start: 0, + l_len: 0, // Lock entire file + l_pid: 0, + }; + + // F_SETLK is a non-blocking lock. The lock will be released when the file is closed + // or the process exits or after an explicit unlock. + let lock_result = unsafe { fcntl(fd, F_SETLK, &flock) }; + if lock_result == -1 { + let err = std::io::Error::last_os_error(); + if err.kind() == std::io::ErrorKind::WouldBlock { + return Err(anyhow::anyhow!("File is locked by another process")); + } else { + return Err(anyhow::anyhow!(err)); + } + } + Ok(()) + } + + fn unlock_file(&self) -> Result<()> { + let fd = self.file.as_raw_fd(); + let flock = flock { + l_type: libc::F_UNLCK as c_short, + l_whence: libc::SEEK_SET as c_short, + l_start: 0, + l_len: 0, + l_pid: 0, + }; + + let unlock_result = unsafe { fcntl(fd, F_SETLK, &flock) }; + if unlock_result == -1 { + return Err(anyhow::anyhow!( + "Failed to release file lock: {}", + std::io::Error::last_os_error() + )); + } + Ok(()) + } + fn pread(&self, pos: usize, c: Rc) -> Result<()> { trace!("pread(pos = {}, length = {})", pos, c.buf().len()); let fd = io_uring::types::Fd(self.file.as_raw_fd()); @@ -121,7 +171,7 @@ impl File for LinuxFile { .build() .user_data(ptr as u64) }; - let mut ring = &mut io.ring; + let ring = &mut io.ring; unsafe { ring.submission() .push(&read_e) @@ -147,7 +197,7 @@ impl File for LinuxFile { .build() .user_data(ptr as u64) }; - let mut ring = &mut io.ring; + let ring = &mut io.ring; unsafe { ring.submission() .push(&write) @@ -156,3 +206,14 @@ impl File for LinuxFile { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::common; + + #[test] + fn test_multiple_processes_cannot_open_file() { + common::tests::test_multiple_processes_cannot_open_file(LinuxIO::new); + } +} diff --git a/core/io/mod.rs b/core/io/mod.rs index 549ded5b0..f0021959c 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -8,6 +8,8 @@ use std::{ }; pub trait File { + fn lock_file(&self, exclusive: bool) -> Result<()>; + fn unlock_file(&self) -> Result<()>; fn pread(&self, pos: usize, c: Rc) -> Result<()>; fn pwrite(&self, pos: usize, buffer: Rc>, c: Rc) -> Result<()>; @@ -134,3 +136,5 @@ cfg_block! { pub use generic::GenericIO as PlatformIO; } } + +mod common; \ No newline at end of file diff --git a/core/io/windows.rs b/core/io/windows.rs index cdd99a6c0..40ac91055 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -32,6 +32,14 @@ pub struct WindowsFile { } impl File for WindowsFile { + fn lock_file(&self, exclusive: bool) -> Result<()> { + unimplemented!() + } + + fn unlock_file(&self) -> Result<()> { + unimplemented!() + } + fn pread(&self, pos: usize, c: Rc) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; diff --git a/simulator/main.rs b/simulator/main.rs index ce048dfc5..07698526a 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -99,6 +99,20 @@ impl SimulatorFile { } impl limbo_core::File for SimulatorFile { + fn lock_file(&self, exclusive: bool) -> Result<()> { + if *self.fault.borrow() { + return Err(anyhow::anyhow!("Injected fault")); + } + self.inner.lock_file(exclusive) + } + + fn unlock_file(&self) -> Result<()> { + if *self.fault.borrow() { + return Err(anyhow::anyhow!("Injected fault")); + } + self.inner.unlock_file() + } + fn pread(&self, pos: usize, c: Rc) -> Result<()> { if *self.fault.borrow() { return Err(anyhow::anyhow!("Injected fault")); From c391b7b9a65fbb52453d6180336d698da83d0536 Mon Sep 17 00:00:00 2001 From: gandeevanr Date: Wed, 17 Jul 2024 11:34:19 -0700 Subject: [PATCH 2/2] implemented the drop trait for all the limbo_core::File implementations --- core/io/darwin.rs | 6 ++++++ core/io/generic.rs | 6 ++++++ core/io/linux.rs | 6 ++++++ simulator/main.rs | 6 ++++++ 4 files changed, 24 insertions(+) diff --git a/core/io/darwin.rs b/core/io/darwin.rs index 45c1eee87..6c6bbf1de 100644 --- a/core/io/darwin.rs +++ b/core/io/darwin.rs @@ -237,6 +237,12 @@ impl File for DarwinFile { } } +impl Drop for DarwinFile { + fn drop(&mut self) { + self.unlock_file().expect("Failed to unlock file"); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/io/generic.rs b/core/io/generic.rs index 3665189bd..fce69859a 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -68,3 +68,9 @@ impl File for GenericFile { Ok(()) } } + +impl Drop for GenericFile { + fn drop(&mut self) { + self.unlock_file().expect("Failed to unlock file"); + } +} \ No newline at end of file diff --git a/core/io/linux.rs b/core/io/linux.rs index b27c47981..237538441 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -207,6 +207,12 @@ impl File for LinuxFile { } } +impl Drop for LinuxFile { + fn drop(&mut self) { + self.unlock_file().expect("Failed to unlock file"); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/simulator/main.rs b/simulator/main.rs index 07698526a..11fccd67c 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -132,3 +132,9 @@ impl limbo_core::File for SimulatorFile { self.inner.pwrite(pos, buffer, c) } } + +impl Drop for SimulatorFile { + fn drop(&mut self) { + self.inner.unlock_file().expect("Failed to unlock file"); + } +}