shared lock on file and throw ReadOnly error in transaction

This commit is contained in:
pedrocarlo
2025-05-02 16:30:48 -03:00
parent 6096cfb3d8
commit 0c22382f3c
11 changed files with 88 additions and 46 deletions

1
Cargo.lock generated
View File

@@ -1726,6 +1726,7 @@ dependencies = [
name = "limbo_core"
version = "0.0.19"
dependencies = [
"bitflags 2.9.0",
"built",
"cfg_block",
"chrono",

View File

@@ -73,6 +73,7 @@ parking_lot = "0.12.3"
crossbeam-skiplist = "0.1.3"
tracing = "0.1.41"
ryu = "1.0.19"
bitflags = "2.9.0"
[build-dependencies]
chrono = { version = "0.4.38", default-features = false }

View File

@@ -51,6 +51,8 @@ pub enum LimboError {
IntegerOverflow,
#[error("Schema is locked for write")]
SchemaLocked,
#[error("Database Connection is read-only")]
ReadOnly,
}
#[macro_export]

View File

@@ -20,11 +20,15 @@ unsafe impl Sync for GenericIO {}
impl IO for GenericIO {
fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result<Arc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(matches!(flags, OpenFlags::Create))
.open(path)?;
let mut file = std::fs::File::options();
file.read(true);
if !flags.contains(OpenFlags::ReadOnly) {
file.write(true);
file.create(flags.contains(OpenFlags::Create));
}
let file = file.open(path)?;
Ok(Arc::new(GenericFile {
file: RefCell::new(file),
memory_io: Arc::new(MemoryIO::new()),

View File

@@ -139,11 +139,15 @@ impl WrappedIOUring {
impl IO for UringIO {
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
.read(true)
.write(true)
.create(matches!(flags, OpenFlags::Create))
.open(path)?;
let mut file = std::fs::File::options();
file.read(true);
if !flags.contains(OpenFlags::ReadOnly) {
file.write(true);
file.create(flags.contains(OpenFlags::Create));
}
let file = file.open(path)?;
// Let's attempt to enable direct I/O. Not all filesystems support it
// so ignore any errors.
let fd = file.as_fd();
@@ -158,7 +162,7 @@ impl IO for UringIO {
file,
});
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
uring_file.lock_file(true)?;
uring_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?;
}
Ok(uring_file)
}

View File

@@ -1,4 +1,5 @@
use crate::Result;
use bitflags::bitflags;
use cfg_block::cfg_block;
use std::fmt;
use std::sync::Arc;
@@ -19,18 +20,20 @@ pub trait File: Send + Sync {
fn size(&self) -> Result<u64>;
}
#[derive(Copy, Clone)]
pub enum OpenFlags {
None,
Create,
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct OpenFlags(i32);
bitflags! {
impl OpenFlags: i32 {
const None = 0b00000000;
const Create = 0b0000001;
const ReadOnly = 0b0000010;
}
}
impl OpenFlags {
pub fn to_flags(&self) -> i32 {
match self {
Self::None => 0,
Self::Create => 1,
}
impl Default for OpenFlags {
fn default() -> Self {
Self::Create
}
}

View File

@@ -3,6 +3,7 @@ use crate::io::common;
use crate::Result;
use super::{Completion, File, MemoryIO, OpenFlags, IO};
use crate::io::clock::{Clock, Instant};
use polling::{Event, Events, Poller};
use rustix::{
fd::{AsFd, AsRawFd},
@@ -18,7 +19,6 @@ use std::{
sync::Arc,
};
use tracing::{debug, trace};
use crate::io::clock::{Clock, Instant};
struct OwnedCallbacks(UnsafeCell<Callbacks>);
// We assume we locking on IO level is done by user.
@@ -197,12 +197,15 @@ impl Clock for UnixIO {
impl IO for UnixIO {
fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result<Arc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
.read(true)
.custom_flags(OFlags::NONBLOCK.bits() as i32)
.write(true)
.create(matches!(flags, OpenFlags::Create))
.open(path)?;
let mut file = std::fs::File::options();
file.read(true).custom_flags(OFlags::NONBLOCK.bits() as i32);
if !flags.contains(OpenFlags::ReadOnly) {
file.write(true);
file.create(flags.contains(OpenFlags::Create));
}
let file = file.open(path)?;
#[allow(clippy::arc_with_non_send_sync)]
let unix_file = Arc::new(UnixFile {
@@ -211,7 +214,7 @@ impl IO for UnixIO {
callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()),
});
if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
unix_file.lock_file(true)?;
unix_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?;
}
Ok(unix_file)
}
@@ -258,7 +261,7 @@ impl IO for UnixIO {
getrandom::getrandom(&mut buf).unwrap();
i64::from_ne_bytes(buf)
}
fn get_memory_io(&self) -> Arc<MemoryIO> {
Arc::new(MemoryIO::new())
}

View File

@@ -24,7 +24,7 @@ impl IO for VfsMod {
})?;
let ctx = self.ctx as *mut c_void;
let vfs = unsafe { &*self.ctx };
let file = unsafe { (vfs.open)(ctx, c_path.as_ptr(), flags.to_flags(), direct) };
let file = unsafe { (vfs.open)(ctx, c_path.as_ptr(), flags.0, direct) };
if file.is_null() {
return Err(LimboError::ExtensionError("File not found".to_string()));
}

View File

@@ -19,11 +19,15 @@ unsafe impl Sync for WindowsIO {}
impl IO for WindowsIO {
fn open_file(&self, path: &str, flags: OpenFlags, direct: bool) -> Result<Arc<dyn File>> {
trace!("open_file(path = {})", path);
let file = std::fs::File::options()
.read(true)
.write(true)
.create(matches!(flags, OpenFlags::Create))
.open(path)?;
let mut file = std::fs::File::options();
file.read(true);
if !flags.contains(OpenFlags::ReadOnly) {
file.write(true);
file.create(flags.contains(OpenFlags::Create));
}
let file = file.open(path)?;
Ok(Arc::new(WindowsFile {
file: RefCell::new(file),
}))

View File

@@ -101,6 +101,7 @@ pub struct Database {
// create DB connections.
shared_page_cache: Arc<RwLock<DumbLruPageCache>>,
shared_wal: Arc<UnsafeCell<WalFileShared>>,
open_flags: OpenFlags,
}
unsafe impl Send for Database {}
@@ -109,10 +110,20 @@ unsafe impl Sync for Database {}
impl Database {
#[cfg(feature = "fs")]
pub fn open_file(io: Arc<dyn IO>, path: &str, enable_mvcc: bool) -> Result<Arc<Database>> {
let file = io.open_file(path, OpenFlags::Create, true)?;
Self::open_file_with_flags(io, path, OpenFlags::default(), enable_mvcc)
}
#[cfg(feature = "fs")]
pub fn open_file_with_flags(
io: Arc<dyn IO>,
path: &str,
flags: OpenFlags,
enable_mvcc: bool,
) -> Result<Arc<Database>> {
let file = io.open_file(path, flags, true)?;
maybe_init_database_file(&file, &io)?;
let db_file = Arc::new(DatabaseFile::new(file));
Self::open(io, path, db_file, enable_mvcc)
Self::open_with_flags(io, path, db_file, flags, enable_mvcc)
}
#[allow(clippy::arc_with_non_send_sync)]
@@ -121,6 +132,17 @@ impl Database {
path: &str,
db_file: Arc<dyn DatabaseStorage>,
enable_mvcc: bool,
) -> Result<Arc<Database>> {
Self::open_with_flags(io, path, db_file, OpenFlags::default(), enable_mvcc)
}
#[allow(clippy::arc_with_non_send_sync)]
pub fn open_with_flags(
io: Arc<dyn IO>,
path: &str,
db_file: Arc<dyn DatabaseStorage>,
flags: OpenFlags,
enable_mvcc: bool,
) -> Result<Arc<Database>> {
let db_header = Pager::begin_open(db_file.clone())?;
// ensure db header is there
@@ -155,6 +177,7 @@ impl Database {
db_file,
io: io.clone(),
page_size,
open_flags: flags,
};
let db = Arc::new(db);
{

View File

@@ -1564,20 +1564,17 @@ pub fn op_transaction(
let Insn::Transaction { write } = insn else {
unreachable!("unexpected Insn {:?}", insn)
};
let connection = program.connection.upgrade().unwrap();
if *write && connection._db.open_flags.contains(OpenFlags::ReadOnly) {
return Err(LimboError::ReadOnly);
}
if let Some(mv_store) = &mv_store {
if state.mv_tx_id.is_none() {
let tx_id = mv_store.begin_tx();
program
.connection
.upgrade()
.unwrap()
.mv_transactions
.borrow_mut()
.push(tx_id);
connection.mv_transactions.borrow_mut().push(tx_id);
state.mv_tx_id = Some(tx_id);
}
} else {
let connection = program.connection.upgrade().unwrap();
let current_state = connection.transaction_state.get();
let (new_transaction_state, updated) = match (current_state, write) {
(TransactionState::Write, true) => (TransactionState::Write, false),