From 0c22382f3c9f8aecfae2072d200a0021733e4377 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Fri, 2 May 2025 16:30:48 -0300 Subject: [PATCH] shared lock on file and throw ReadOnly error in transaction --- Cargo.lock | 1 + core/Cargo.toml | 1 + core/error.rs | 2 ++ core/io/generic.rs | 14 +++++++++----- core/io/io_uring.rs | 16 ++++++++++------ core/io/mod.rs | 23 +++++++++++++---------- core/io/unix.rs | 21 ++++++++++++--------- core/io/vfs.rs | 2 +- core/io/windows.rs | 14 +++++++++----- core/lib.rs | 27 +++++++++++++++++++++++++-- core/vdbe/execute.rs | 13 +++++-------- 11 files changed, 88 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a4c6069c..3fb8760ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1726,6 +1726,7 @@ dependencies = [ name = "limbo_core" version = "0.0.19" dependencies = [ + "bitflags 2.9.0", "built", "cfg_block", "chrono", diff --git a/core/Cargo.toml b/core/Cargo.toml index f23aeeeb0..9fd71b059 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -73,6 +73,7 @@ parking_lot = "0.12.3" crossbeam-skiplist = "0.1.3" tracing = "0.1.41" ryu = "1.0.19" +bitflags = "2.9.0" [build-dependencies] chrono = { version = "0.4.38", default-features = false } diff --git a/core/error.rs b/core/error.rs index e8eb83a5a..1eca50305 100644 --- a/core/error.rs +++ b/core/error.rs @@ -51,6 +51,8 @@ pub enum LimboError { IntegerOverflow, #[error("Schema is locked for write")] SchemaLocked, + #[error("Database Connection is read-only")] + ReadOnly, } #[macro_export] diff --git a/core/io/generic.rs b/core/io/generic.rs index fd59ece88..aab5f2687 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -20,11 +20,15 @@ unsafe impl Sync for GenericIO {} impl IO for GenericIO { fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result> { trace!("open_file(path = {})", path); - let file = std::fs::OpenOptions::new() - .read(true) - .write(true) - .create(matches!(flags, OpenFlags::Create)) - .open(path)?; + let mut file = std::fs::File::options(); + file.read(true); + + if !flags.contains(OpenFlags::ReadOnly) { + file.write(true); + file.create(flags.contains(OpenFlags::Create)); + } + + let file = file.open(path)?; Ok(Arc::new(GenericFile { file: RefCell::new(file), memory_io: Arc::new(MemoryIO::new()), diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index b4b21aca8..25d6aa33e 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -139,11 +139,15 @@ impl WrappedIOUring { impl IO for UringIO { fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result> { trace!("open_file(path = {})", path); - let file = std::fs::File::options() - .read(true) - .write(true) - .create(matches!(flags, OpenFlags::Create)) - .open(path)?; + let mut file = std::fs::File::options(); + file.read(true); + + if !flags.contains(OpenFlags::ReadOnly) { + file.write(true); + file.create(flags.contains(OpenFlags::Create)); + } + + let file = file.open(path)?; // Let's attempt to enable direct I/O. Not all filesystems support it // so ignore any errors. let fd = file.as_fd(); @@ -158,7 +162,7 @@ impl IO for UringIO { file, }); if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { - uring_file.lock_file(true)?; + uring_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?; } Ok(uring_file) } diff --git a/core/io/mod.rs b/core/io/mod.rs index 6f161d114..6f75e9bea 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -1,4 +1,5 @@ use crate::Result; +use bitflags::bitflags; use cfg_block::cfg_block; use std::fmt; use std::sync::Arc; @@ -19,18 +20,20 @@ pub trait File: Send + Sync { fn size(&self) -> Result; } -#[derive(Copy, Clone)] -pub enum OpenFlags { - None, - Create, +#[derive(Debug, Copy, Clone, PartialEq)] +pub struct OpenFlags(i32); + +bitflags! { + impl OpenFlags: i32 { + const None = 0b00000000; + const Create = 0b0000001; + const ReadOnly = 0b0000010; + } } -impl OpenFlags { - pub fn to_flags(&self) -> i32 { - match self { - Self::None => 0, - Self::Create => 1, - } +impl Default for OpenFlags { + fn default() -> Self { + Self::Create } } diff --git a/core/io/unix.rs b/core/io/unix.rs index c232ed3ad..721ba20f3 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -3,6 +3,7 @@ use crate::io::common; use crate::Result; use super::{Completion, File, MemoryIO, OpenFlags, IO}; +use crate::io::clock::{Clock, Instant}; use polling::{Event, Events, Poller}; use rustix::{ fd::{AsFd, AsRawFd}, @@ -18,7 +19,6 @@ use std::{ sync::Arc, }; use tracing::{debug, trace}; -use crate::io::clock::{Clock, Instant}; struct OwnedCallbacks(UnsafeCell); // We assume we locking on IO level is done by user. @@ -197,12 +197,15 @@ impl Clock for UnixIO { impl IO for UnixIO { fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result> { trace!("open_file(path = {})", path); - let file = std::fs::File::options() - .read(true) - .custom_flags(OFlags::NONBLOCK.bits() as i32) - .write(true) - .create(matches!(flags, OpenFlags::Create)) - .open(path)?; + let mut file = std::fs::File::options(); + file.read(true).custom_flags(OFlags::NONBLOCK.bits() as i32); + + if !flags.contains(OpenFlags::ReadOnly) { + file.write(true); + file.create(flags.contains(OpenFlags::Create)); + } + + let file = file.open(path)?; #[allow(clippy::arc_with_non_send_sync)] let unix_file = Arc::new(UnixFile { @@ -211,7 +214,7 @@ impl IO for UnixIO { callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()), }); if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() { - unix_file.lock_file(true)?; + unix_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?; } Ok(unix_file) } @@ -258,7 +261,7 @@ impl IO for UnixIO { getrandom::getrandom(&mut buf).unwrap(); i64::from_ne_bytes(buf) } - + fn get_memory_io(&self) -> Arc { Arc::new(MemoryIO::new()) } diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 95b4055d0..d02f7d345 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -24,7 +24,7 @@ impl IO for VfsMod { })?; let ctx = self.ctx as *mut c_void; let vfs = unsafe { &*self.ctx }; - let file = unsafe { (vfs.open)(ctx, c_path.as_ptr(), flags.to_flags(), direct) }; + let file = unsafe { (vfs.open)(ctx, c_path.as_ptr(), flags.0, direct) }; if file.is_null() { return Err(LimboError::ExtensionError("File not found".to_string())); } diff --git a/core/io/windows.rs b/core/io/windows.rs index 6c46d1973..a329abc14 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -19,11 +19,15 @@ unsafe impl Sync for WindowsIO {} impl IO for WindowsIO { fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result> { trace!("open_file(path = {})", path); - let file = std::fs::File::options() - .read(true) - .write(true) - .create(matches!(flags, OpenFlags::Create)) - .open(path)?; + let mut file = std::fs::File::options(); + file.read(true); + + if !flags.contains(OpenFlags::ReadOnly) { + file.write(true); + file.create(flags.contains(OpenFlags::Create)); + } + + let file = file.open(path)?; Ok(Arc::new(WindowsFile { file: RefCell::new(file), })) diff --git a/core/lib.rs b/core/lib.rs index ddf741ffb..2f7ab0577 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -101,6 +101,7 @@ pub struct Database { // create DB connections. shared_page_cache: Arc>, shared_wal: Arc>, + open_flags: OpenFlags, } unsafe impl Send for Database {} @@ -109,10 +110,20 @@ unsafe impl Sync for Database {} impl Database { #[cfg(feature = "fs")] pub fn open_file(io: Arc, path: &str, enable_mvcc: bool) -> Result> { - let file = io.open_file(path, OpenFlags::Create, true)?; + Self::open_file_with_flags(io, path, OpenFlags::default(), enable_mvcc) + } + + #[cfg(feature = "fs")] + pub fn open_file_with_flags( + io: Arc, + path: &str, + flags: OpenFlags, + enable_mvcc: bool, + ) -> Result> { + let file = io.open_file(path, flags, true)?; maybe_init_database_file(&file, &io)?; let db_file = Arc::new(DatabaseFile::new(file)); - Self::open(io, path, db_file, enable_mvcc) + Self::open_with_flags(io, path, db_file, flags, enable_mvcc) } #[allow(clippy::arc_with_non_send_sync)] @@ -121,6 +132,17 @@ impl Database { path: &str, db_file: Arc, enable_mvcc: bool, + ) -> Result> { + Self::open_with_flags(io, path, db_file, OpenFlags::default(), enable_mvcc) + } + + #[allow(clippy::arc_with_non_send_sync)] + pub fn open_with_flags( + io: Arc, + path: &str, + db_file: Arc, + flags: OpenFlags, + enable_mvcc: bool, ) -> Result> { let db_header = Pager::begin_open(db_file.clone())?; // ensure db header is there @@ -155,6 +177,7 @@ impl Database { db_file, io: io.clone(), page_size, + open_flags: flags, }; let db = Arc::new(db); { diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 948f0a5b8..30f9e6781 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1564,20 +1564,17 @@ pub fn op_transaction( let Insn::Transaction { write } = insn else { unreachable!("unexpected Insn {:?}", insn) }; + let connection = program.connection.upgrade().unwrap(); + if *write && connection._db.open_flags.contains(OpenFlags::ReadOnly) { + return Err(LimboError::ReadOnly); + } if let Some(mv_store) = &mv_store { if state.mv_tx_id.is_none() { let tx_id = mv_store.begin_tx(); - program - .connection - .upgrade() - .unwrap() - .mv_transactions - .borrow_mut() - .push(tx_id); + connection.mv_transactions.borrow_mut().push(tx_id); state.mv_tx_id = Some(tx_id); } } else { - let connection = program.connection.upgrade().unwrap(); let current_state = connection.transaction_state.get(); let (new_transaction_state, updated) = match (current_state, write) { (TransactionState::Write, true) => (TransactionState::Write, false),