diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 595400a21..c31520a82 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -143,6 +143,11 @@ impl Cursor { limbo_core::RowResult::Done => { return Ok(None); } + limbo_core::RowResult::Busy => { + return Err( + PyErr::new::("Busy error".to_string()).into() + ); + } } } } else { @@ -177,6 +182,11 @@ impl Cursor { limbo_core::RowResult::Done => { return Ok(results); } + limbo_core::RowResult::Busy => { + return Err( + PyErr::new::("Busy error".to_string()).into() + ); + } } } } else { diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index ec2762b91..a2ae5b266 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -85,7 +85,8 @@ impl Statement { } Ok(limbo_core::RowResult::IO) | Ok(limbo_core::RowResult::Done) - | Ok(limbo_core::RowResult::Interrupt) => JsValue::UNDEFINED, + | Ok(limbo_core::RowResult::Interrupt) + | Ok(limbo_core::RowResult::Busy) => JsValue::UNDEFINED, Err(e) => panic!("Error: {:?}", e), } } @@ -105,6 +106,7 @@ impl Statement { Ok(limbo_core::RowResult::IO) => {} Ok(limbo_core::RowResult::Interrupt) => break, Ok(limbo_core::RowResult::Done) => break, + Ok(limbo_core::RowResult::Busy) => break, Err(e) => panic!("Error: {:?}", e), } } diff --git a/cli/app.rs b/cli/app.rs index 34ab20481..cbce1ca5c 100644 --- a/cli/app.rs +++ b/cli/app.rs @@ -525,6 +525,10 @@ impl Limbo { Ok(RowResult::Done) => { break; } + Ok(RowResult::Busy) => { + self.writeln("database is busy"); + break; + } Err(err) => { let _ = self.writeln(err.to_string()); break; @@ -560,6 +564,10 @@ impl Limbo { } Ok(RowResult::Interrupt) => break, Ok(RowResult::Done) => break, + Ok(RowResult::Busy) => { + self.writeln("database is busy"); + break; + } Err(err) => { let _ = self.write_fmt(format_args!("{}", err)); break; @@ -610,6 +618,10 @@ impl Limbo { } RowResult::Interrupt => break, RowResult::Done => break, + RowResult::Busy => { + self.writeln("database is busy"); + break; + } } } if !found { @@ -663,6 +675,10 @@ impl Limbo { } RowResult::Interrupt => break, RowResult::Done => break, + RowResult::Busy => { + self.writeln("database is busy"); + break; + } } } diff --git a/core/lib.rs b/core/lib.rs index 79e06abfb..255c47217 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -5,6 +5,7 @@ mod io; #[cfg(feature = "json")] mod json; mod pseudo; +mod result; mod schema; mod storage; mod translate; @@ -66,7 +67,6 @@ pub struct Database { pager: Rc, schema: Rc>, header: Rc>, - transaction_state: RefCell, // Shared structures of a Database are the parts that are common to multiple threads that might // create DB connections. shared_page_cache: Arc>, @@ -123,6 +123,7 @@ impl Database { pager: pager.clone(), schema: bootstrap_schema.clone(), header: db_header.clone(), + transaction_state: RefCell::new(TransactionState::None), db: Weak::new(), last_insert_rowid: Cell::new(0), }); @@ -135,7 +136,6 @@ impl Database { pager, schema, header, - transaction_state: RefCell::new(TransactionState::None), shared_page_cache, shared_wal, })) @@ -148,6 +148,7 @@ impl Database { header: self.header.clone(), last_insert_rowid: Cell::new(0), db: Arc::downgrade(self), + transaction_state: RefCell::new(TransactionState::None), }) } } @@ -206,6 +207,7 @@ pub struct Connection { schema: Rc>, header: Rc>, db: Weak, // backpointer to the database holding this connection + transaction_state: RefCell, last_insert_rowid: Cell, } @@ -379,6 +381,7 @@ impl Statement { vdbe::StepResult::IO => Ok(RowResult::IO), vdbe::StepResult::Done => Ok(RowResult::Done), vdbe::StepResult::Interrupt => Ok(RowResult::Interrupt), + vdbe::StepResult::Busy => Ok(RowResult::Busy), } } @@ -395,6 +398,7 @@ pub enum RowResult<'a> { IO, Done, Interrupt, + Busy, } pub struct Row<'a> { diff --git a/core/result.rs b/core/result.rs new file mode 100644 index 000000000..3056528ce --- /dev/null +++ b/core/result.rs @@ -0,0 +1,6 @@ +/// Common results that different functions can return in limbo. +pub enum LimboResult { + /// Couldn't acquire a lock + Busy, + Ok, +} diff --git a/core/storage/pager.rs b/core/storage/pager.rs index cd934d42a..0e0d0304c 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1,3 +1,4 @@ +use crate::result::LimboResult; use crate::storage::buffer_pool::BufferPool; use crate::storage::database::DatabaseStorage; use crate::storage::sqlite3_ondisk::{self, DatabaseHeader, PageContent}; @@ -196,14 +197,12 @@ impl Pager { }) } - pub fn begin_read_tx(&self) -> Result<()> { - self.wal.borrow_mut().begin_read_tx()?; - Ok(()) + pub fn begin_read_tx(&self) -> Result { + self.wal.borrow_mut().begin_read_tx() } - pub fn begin_write_tx(&self) -> Result<()> { - self.wal.borrow_mut().begin_write_tx()?; - Ok(()) + pub fn begin_write_tx(&self) -> Result { + self.wal.borrow_mut().begin_write_tx() } pub fn end_tx(&self) -> Result { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 13323dac3..ef8a718ce 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -1,10 +1,12 @@ use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::RwLock; use std::{cell::RefCell, rc::Rc, sync::Arc}; use log::{debug, trace}; use crate::io::{File, SyncCompletion, IO}; +use crate::result::LimboResult; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_write_wal_frame, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, }; @@ -18,19 +20,110 @@ use super::page_cache::PageCacheKey; use super::pager::{PageRef, Pager}; use super::sqlite3_ondisk::{self, begin_write_btree_page, WalHeader}; +pub const READMARK_NOT_USED: u32 = 0xffffffff; + +pub const NO_LOCK: u32 = 0; +pub const SHARED_LOCK: u32 = 1; +pub const WRITE_LOCK: u32 = 2; + +#[derive(Debug)] +struct LimboRwLock { + lock: AtomicU32, + nreads: AtomicU32, + value: AtomicU32, +} + +impl LimboRwLock { + /// Shared lock. Returns true if it was successful, false if it couldn't lock it + pub fn read(&mut self) -> bool { + let lock = self.lock.load(Ordering::SeqCst); + match lock { + NO_LOCK => { + let res = self.lock.compare_exchange( + lock, + SHARED_LOCK, + Ordering::SeqCst, + Ordering::SeqCst, + ); + let ok = res.is_ok(); + if ok { + dbg!("adding"); + self.nreads.fetch_add(1, Ordering::SeqCst); + } + ok + } + SHARED_LOCK => { + self.nreads.fetch_add(1, Ordering::SeqCst); + true + } + WRITE_LOCK => false, + _ => unreachable!(), + } + } + + /// Locks exlusively. Returns true if it was successful, false if it couldn't lock it + pub fn write(&mut self) -> bool { + let lock = self.lock.load(Ordering::SeqCst); + match lock { + NO_LOCK => { + let res = self.lock.compare_exchange( + lock, + WRITE_LOCK, + Ordering::SeqCst, + Ordering::SeqCst, + ); + res.is_ok() + } + SHARED_LOCK => { + // no op + false + } + WRITE_LOCK => true, + _ => unreachable!(), + } + } + + /// Unlock the current held lock. + pub fn unlock(&mut self) { + let lock = self.lock.load(Ordering::SeqCst); + match lock { + NO_LOCK => {} + SHARED_LOCK => { + let prev = self.nreads.fetch_sub(1, Ordering::SeqCst); + if prev == 1 { + let res = self.lock.compare_exchange( + lock, + NO_LOCK, + Ordering::SeqCst, + Ordering::SeqCst, + ); + assert!(res.is_ok()); + } + } + WRITE_LOCK => { + let res = + self.lock + .compare_exchange(lock, NO_LOCK, Ordering::SeqCst, Ordering::SeqCst); + assert!(res.is_ok()); + } + _ => unreachable!(), + } + } +} + /// Write-ahead log (WAL). pub trait Wal { /// Begin a read transaction. - fn begin_read_tx(&mut self) -> Result<()>; + fn begin_read_tx(&mut self) -> Result; /// Begin a write transaction. - fn begin_write_tx(&mut self) -> Result<()>; + fn begin_write_tx(&mut self) -> Result; /// End a read transaction. - fn end_read_tx(&self) -> Result<()>; + fn end_read_tx(&self) -> Result; /// End a write transaction. - fn end_write_tx(&self) -> Result<()>; + fn end_write_tx(&self) -> Result; /// Find the latest frame containing a page. fn find_frame(&self, page_id: u64) -> Result>; @@ -108,10 +201,16 @@ pub struct WalFile { ongoing_checkpoint: OngoingCheckpoint, checkpoint_threshold: usize, // min and max frames for this connection + /// This is the index to the read_lock in WalFileShared that we are holding. This lock contains + /// the max frame for this connection. + max_frame_read_lock_index: usize, + /// Max frame allowed to lookup range=(minframe..max_frame) max_frame: u64, + /// Start of range to look for frames range=(minframe..max_frame) min_frame: u64, } +// TODO(pere): lock only important parts + pin WalFileShared /// WalFileShared is the part of a WAL that will be shared between threads. A wal has information /// that needs to be communicated between threads so this struct does the job. pub struct WalFileShared { @@ -130,20 +229,94 @@ pub struct WalFileShared { pages_in_frames: Vec, last_checksum: (u32, u32), // Check of last frame in WAL, this is a cumulative checksum over all frames in the WAL file: Rc, + /// read_locks is a list of read locks that can coexist with the max_frame nubmer stored in + /// value. There is a limited amount because and unbounded amount of connections could be + /// fatal. Therefore, for now we copy how SQLite behaves with limited amounts of read max + /// frames that is equal to 5 + read_locks: [LimboRwLock; 5], + /// There is only one write allowed in WAL mode. This lock takes care of ensuring there is only + /// one used. + write_lock: LimboRwLock, } impl Wal for WalFile { /// Begin a read transaction. - fn begin_read_tx(&mut self) -> Result<()> { - let shared = self.shared.read().unwrap(); + fn begin_read_tx(&mut self) -> Result { + let mut shared = self.shared.write().unwrap(); + let max_frame_in_wal = shared.max_frame; self.min_frame = shared.nbackfills + 1; - self.max_frame = shared.max_frame; - Ok(()) + + let mut max_read_mark = 0; + let mut max_read_mark_index = -1; + // Find the largest mark we can find, ignore frames that are impossible to be in range and + // that are not set + for (index, lock) in shared.read_locks.iter().enumerate() { + let this_mark = lock.value.load(Ordering::SeqCst); + if this_mark > max_read_mark && this_mark <= max_frame_in_wal as u32 { + max_read_mark = this_mark; + max_read_mark_index = index as i64; + } + } + + // If we didn't find any mark, then let's add a new one + if max_read_mark_index == -1 { + for (index, lock) in shared.read_locks.iter_mut().enumerate() { + let busy = !lock.write(); + if !busy { + // If this was busy then it must mean >1 threads tried to set this read lock + lock.value.store(max_frame_in_wal as u32, Ordering::SeqCst); + max_read_mark = max_frame_in_wal as u32; + max_read_mark_index = index as i64; + lock.unlock(); + break; + } + } + } + + if max_read_mark_index == -1 { + return Ok(LimboResult::Busy); + } + + let lock = &mut shared.read_locks[max_read_mark_index as usize]; + let busy = !lock.read(); + if busy { + return Ok(LimboResult::Busy); + } + self.max_frame_read_lock_index = max_read_mark_index as usize; + self.max_frame = max_read_mark as u64; + self.min_frame = shared.nbackfills + 1; + log::trace!( + "begin_read_tx(min_frame={}, max_frame={}, lock={})", + self.min_frame, + self.max_frame, + self.max_frame_read_lock_index + ); + Ok(LimboResult::Ok) } /// End a read transaction. - fn end_read_tx(&self) -> Result<()> { - Ok(()) + fn end_read_tx(&self) -> Result { + let mut shared = self.shared.write().unwrap(); + let read_lock = &mut shared.read_locks[self.max_frame_read_lock_index]; + read_lock.unlock(); + Ok(LimboResult::Ok) + } + + /// Begin a write transaction + fn begin_write_tx(&mut self) -> Result { + let mut shared = self.shared.write().unwrap(); + let busy = !shared.write_lock.write(); + if busy { + return Ok(LimboResult::Busy); + } + Ok(LimboResult::Ok) + } + + /// End a write transaction + fn end_write_tx(&self) -> Result { + let mut shared = self.shared.write().unwrap(); + shared.write_lock.unlock(); + Ok(LimboResult::Ok) } /// Find the latest frame containing a page. @@ -186,7 +359,11 @@ impl Wal for WalFile { ) -> Result<()> { let page_id = page.get().id; let mut shared = self.shared.write().unwrap(); - let frame_id = shared.max_frame; + let frame_id = if shared.max_frame == 0 { + 1 + } else { + shared.max_frame + }; let offset = self.frame_offset(frame_id); trace!( "append_frame(frame={}, offset={}, page_id={})", @@ -221,16 +398,6 @@ impl Wal for WalFile { Ok(()) } - /// Begin a write transaction - fn begin_write_tx(&mut self) -> Result<()> { - Ok(()) - } - - /// End a write transaction - fn end_write_tx(&self) -> Result<()> { - Ok(()) - } - fn should_checkpoint(&self) -> bool { let shared = self.shared.read().unwrap(); let frame_id = shared.max_frame as usize; @@ -249,9 +416,29 @@ impl Wal for WalFile { CheckpointState::Start => { // TODO(pere): check what frames are safe to checkpoint between many readers! self.ongoing_checkpoint.min_frame = self.min_frame; - self.ongoing_checkpoint.max_frame = self.max_frame; + let mut shared = self.shared.write().unwrap(); + let max_frame_in_wal = shared.max_frame as u32; + let mut max_safe_frame = shared.max_frame; + for read_lock in shared.read_locks.iter_mut() { + let this_mark = read_lock.value.load(Ordering::SeqCst); + if this_mark < max_safe_frame as u32 { + let busy = !read_lock.write(); + if !busy { + read_lock.value.store(max_frame_in_wal, Ordering::SeqCst); + read_lock.unlock(); + } else { + max_safe_frame = this_mark as u64; + } + } + } + self.ongoing_checkpoint.max_frame = max_safe_frame; self.ongoing_checkpoint.current_page = 0; self.ongoing_checkpoint.state = CheckpointState::ReadFrame; + log::trace!( + "checkpoint_start(min_frame={}, max_frame={})", + self.ongoing_checkpoint.max_frame, + self.ongoing_checkpoint.min_frame + ); } CheckpointState::ReadFrame => { let shared = self.shared.read().unwrap(); @@ -412,10 +599,11 @@ impl WalFile { syncing: Rc::new(RefCell::new(false)), checkpoint_threshold: 1000, page_size, - max_frame: 0, - min_frame: 0, buffer_pool, sync_state: RefCell::new(SyncState::NotSyncing), + max_frame: 0, + min_frame: 0, + max_frame_read_lock_index: 0, } } @@ -488,6 +676,38 @@ impl WalFileShared { last_checksum: checksum, file, pages_in_frames: Vec::new(), + read_locks: [ + LimboRwLock { + lock: AtomicU32::new(NO_LOCK), + nreads: AtomicU32::new(0), + value: AtomicU32::new(READMARK_NOT_USED), + }, + LimboRwLock { + lock: AtomicU32::new(NO_LOCK), + nreads: AtomicU32::new(0), + value: AtomicU32::new(READMARK_NOT_USED), + }, + LimboRwLock { + lock: AtomicU32::new(NO_LOCK), + nreads: AtomicU32::new(0), + value: AtomicU32::new(READMARK_NOT_USED), + }, + LimboRwLock { + lock: AtomicU32::new(NO_LOCK), + nreads: AtomicU32::new(0), + value: AtomicU32::new(READMARK_NOT_USED), + }, + LimboRwLock { + lock: AtomicU32::new(NO_LOCK), + nreads: AtomicU32::new(0), + value: AtomicU32::new(READMARK_NOT_USED), + }, + ], + write_lock: LimboRwLock { + lock: AtomicU32::new(NO_LOCK), + nreads: AtomicU32::new(0), + value: AtomicU32::new(READMARK_NOT_USED), + }, }; Ok(Arc::new(RwLock::new(shared))) } diff --git a/core/util.rs b/core/util.rs index 6320f6612..a57186890 100644 --- a/core/util.rs +++ b/core/util.rs @@ -60,6 +60,7 @@ pub fn parse_schema_rows(rows: Option, schema: &mut Schema, io: Arc break, RowResult::Done => break, + RowResult::Busy => break, } } } diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 520100463..debabb067 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -28,6 +28,7 @@ use crate::error::{LimboError, SQLITE_CONSTRAINT_PRIMARYKEY}; use crate::ext::{exec_ts_from_uuid7, exec_uuid, exec_uuidblob, exec_uuidstr, ExtFunc, UuidFunc}; use crate::function::{AggFunc, FuncCtx, MathFunc, MathFuncArity, ScalarFunc}; use crate::pseudo::PseudoCursor; +use crate::result::LimboResult; use crate::schema::Table; use crate::storage::sqlite3_ondisk::DatabaseHeader; use crate::storage::{btree::BTreeCursor, pager::Pager}; @@ -537,6 +538,7 @@ pub enum StepResult<'a> { IO, Row(Record<'a>), Interrupt, + Busy, } /// If there is I/O, the instruction is restarted. @@ -1657,29 +1659,34 @@ impl Program { } Insn::Transaction { write } => { let connection = self.connection.upgrade().unwrap(); - if let Some(db) = connection.db.upgrade() { - // TODO(pere): are backpointers good ?? this looks ugly af - // upgrade transaction if needed - let new_transaction_state = - match (db.transaction_state.borrow().clone(), write) { - (crate::TransactionState::Write, true) => TransactionState::Write, - (crate::TransactionState::Write, false) => TransactionState::Write, - (crate::TransactionState::Read, true) => TransactionState::Write, - (crate::TransactionState::Read, false) => TransactionState::Read, - (crate::TransactionState::None, true) => TransactionState::Read, - (crate::TransactionState::None, false) => TransactionState::Read, - }; - // TODO(Pere): - // 1. lock wal - // 2. lock shared - // 3. lock write db if write - db.transaction_state.replace(new_transaction_state.clone()); - if matches!(new_transaction_state, TransactionState::Write) { - pager.begin_read_tx()?; - } else { - pager.begin_write_tx()?; + let current_state = connection.transaction_state.borrow().clone(); + let (new_transaction_state, updated) = match (¤t_state, write) { + (crate::TransactionState::Write, true) => (TransactionState::Write, false), + (crate::TransactionState::Write, false) => (TransactionState::Write, false), + (crate::TransactionState::Read, true) => (TransactionState::Write, true), + (crate::TransactionState::Read, false) => (TransactionState::Read, false), + (crate::TransactionState::None, true) => (TransactionState::Write, true), + (crate::TransactionState::None, false) => (TransactionState::Read, true), + }; + + if updated && matches!(current_state, TransactionState::None) { + if let LimboResult::Busy = pager.begin_read_tx()? { + log::trace!("begin_read_tx busy"); + return Ok(StepResult::Busy); } } + + if updated && matches!(new_transaction_state, TransactionState::Write) { + if let LimboResult::Busy = pager.begin_write_tx()? { + log::trace!("begin_write_tx busy"); + return Ok(StepResult::Busy); + } + } + if updated { + connection + .transaction_state + .replace(new_transaction_state.clone()); + } state.pc += 1; } Insn::Goto { target_pc } => { diff --git a/simulator/generation/plan.rs b/simulator/generation/plan.rs index fd194de66..61b115f01 100644 --- a/simulator/generation/plan.rs +++ b/simulator/generation/plan.rs @@ -235,6 +235,7 @@ impl Interaction { RowResult::Done => { break; } + RowResult::Busy => {} } } diff --git a/simulator/main.rs b/simulator/main.rs index 085711391..7959c655e 100644 --- a/simulator/main.rs +++ b/simulator/main.rs @@ -298,6 +298,9 @@ fn get_all_rows( RowResult::Done => { break; } + RowResult::Busy => { + // for now let's retry? + } } } Ok(out) diff --git a/sqlite3/src/lib.rs b/sqlite3/src/lib.rs index da057df79..b183648b2 100644 --- a/sqlite3/src/lib.rs +++ b/sqlite3/src/lib.rs @@ -246,6 +246,7 @@ pub unsafe extern "C" fn sqlite3_step(stmt: *mut sqlite3_stmt) -> std::ffi::c_in stmt.row.replace(Some(row)); SQLITE_ROW } + limbo_core::RowResult::Busy => SQLITE_BUSY, } } else { SQLITE_ERROR @@ -1032,7 +1033,7 @@ fn sqlite3_errstr_impl(rc: i32) -> *const std::ffi::c_char { "datatype mismatch", // SQLITE_MISMATCH "bad parameter or other API misuse", // SQLITE_MISUSE #[cfg(feature = "lfs")] - "", // SQLITE_NOLFS + "", // SQLITE_NOLFS #[cfg(not(feature = "lfs"))] "large file support is disabled", // SQLITE_NOLFS "authorization denied", // SQLITE_AUTH diff --git a/test/src/lib.rs b/test/src/lib.rs index 53cec37a4..8bd6feea2 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -95,6 +95,9 @@ mod tests { } RowResult::Interrupt => break, RowResult::Done => break, + RowResult::Busy => { + panic!("Database is busy"); + } } }, Ok(None) => {} @@ -163,6 +166,7 @@ mod tests { } RowResult::Interrupt => break, RowResult::Done => break, + RowResult::Busy => unreachable!(), } }, Ok(None) => {} @@ -237,6 +241,7 @@ mod tests { } RowResult::Interrupt => break, RowResult::Done => break, + RowResult::Busy => unreachable!(), } }, Ok(None) => {} @@ -300,6 +305,7 @@ mod tests { } RowResult::Interrupt => break, RowResult::Done => break, + RowResult::Busy => unreachable!(), } }, Ok(None) => {} @@ -361,6 +367,7 @@ mod tests { } RowResult::Interrupt => break, RowResult::Done => break, + RowResult::Busy => panic!("Database is busy"), } } } @@ -453,6 +460,7 @@ mod tests { } RowResult::Interrupt => break, RowResult::Done => break, + RowResult::Busy => panic!("Database is busy"), } } } @@ -487,6 +495,7 @@ mod tests { } RowResult::Interrupt => break, RowResult::Done => break, + RowResult::Busy => panic!("Database is busy"), } }, Ok(None) => {}