diff --git a/core/lib.rs b/core/lib.rs index 00ff043c4..6cf030fc5 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -16,7 +16,6 @@ pub mod mvcc; mod parameters; mod pragma; mod pseudo; -pub mod result; mod schema; #[cfg(feature = "series")] mod series; @@ -1519,19 +1518,10 @@ impl Connection { #[cfg(all(feature = "fs", feature = "conn_raw_api"))] pub fn wal_insert_begin(&self) -> Result<()> { let pager = self.pager.borrow(); - match pager.begin_read_tx()? { - result::LimboResult::Busy => return Err(LimboError::Busy), - result::LimboResult::Ok => {} - } - match pager.io.block(|| pager.begin_write_tx()).inspect_err(|_| { + pager.begin_read_tx()?; + pager.io.block(|| pager.begin_write_tx()).inspect_err(|_| { pager.end_read_tx().expect("read txn must be closed"); - })? { - result::LimboResult::Busy => { - pager.end_read_tx().expect("read txn must be closed"); - return Err(LimboError::Busy); - } - result::LimboResult::Ok => {} - } + })?; // start write transaction and disable auto-commit mode as SQL can be executed within WAL session (at caller own risk) self.transaction_state.replace(TransactionState::Write { diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 62a7a3b11..f4b3ae8c4 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1,6 +1,5 @@ use crate::mvcc::clock::LogicalClock; use crate::mvcc::persistent_storage::Storage; -use crate::result::LimboResult; use crate::return_if_io; use crate::state_machine::StateMachine; use crate::state_machine::StateTransition; @@ -573,18 +572,20 @@ impl StateTransition for CommitStateMachine { // Note that this would be incredibly unsafe in the regular transaction model, but in MVCC we trust // the MV-store to uphold the guarantee that no write-write conflicts happened. self.pager.end_read_tx().expect("end_read_tx cannot fail"); - let result = self.pager.begin_read_tx()?; - if let crate::result::LimboResult::Busy = result { + let result = self.pager.begin_read_tx(); + if let Err(LimboError::Busy) = result { // We cannot obtain a WAL read lock due to contention, so we must abort. self.commit_coordinator.pager_commit_lock.unlock(); return Err(LimboError::WriteWriteConflict); } - let result = self.pager.io.block(|| self.pager.begin_write_tx())?; - if let crate::result::LimboResult::Busy = result { + result?; + let result = self.pager.io.block(|| self.pager.begin_write_tx()); + if let Err(LimboError::Busy) = result { // There is a non-CONCURRENT transaction holding the write lock. We must abort. self.commit_coordinator.pager_commit_lock.unlock(); return Err(LimboError::WriteWriteConflict); } + result?; self.state = CommitState::WriteRow { end_ts, write_set_index: 0, @@ -1342,13 +1343,9 @@ impl MvStore { // Try to acquire the pager read lock if !is_upgrade_from_read { - match pager.begin_read_tx()? { - LimboResult::Busy => { - self.release_exclusive_tx(&tx_id); - return Err(LimboError::Busy); - } - LimboResult::Ok => {} - } + pager.begin_read_tx().inspect_err(|_| { + self.release_exclusive_tx(&tx_id); + })?; } let locked = self.commit_coordinator.pager_commit_lock.write(); if !locked { @@ -1357,30 +1354,28 @@ impl MvStore { return Err(LimboError::Busy); } // Try to acquire the pager write lock - match return_if_io!(pager.begin_write_tx()) { - LimboResult::Busy => { - tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id); - // Failed to get pager lock - release our exclusive lock - self.commit_coordinator.pager_commit_lock.unlock(); - self.release_exclusive_tx(&tx_id); - if maybe_existing_tx_id.is_none() { - // If we were upgrading an existing non-CONCURRENT mvcc transaction to write, we don't end the read tx on Busy. - // But if we were beginning a completely new non-CONCURRENT mvcc transaction, we do end it because the next time the connection - // attempts to do something, it will open a new read tx, which will fail if we don't end this one here. - pager.end_read_tx()?; - } - return Err(LimboError::Busy); - } - LimboResult::Ok => { - let tx = Transaction::new(tx_id, begin_ts); - tracing::trace!( - "begin_exclusive_tx(tx_id={}) - exclusive write transaction", - tx_id - ); - tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id); - self.txs.insert(tx_id, tx); + let begin_w_tx_res = pager.begin_write_tx(); + if let Err(LimboError::Busy) = begin_w_tx_res { + tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id); + // Failed to get pager lock - release our exclusive lock + self.commit_coordinator.pager_commit_lock.unlock(); + self.release_exclusive_tx(&tx_id); + if maybe_existing_tx_id.is_none() { + // If we were upgrading an existing non-CONCURRENT mvcc transaction to write, we don't end the read tx on Busy. + // But if we were beginning a completely new non-CONCURRENT mvcc transaction, we do end it because the next time the connection + // attempts to do something, it will open a new read tx, which will fail if we don't end this one here. + pager.end_read_tx()?; } + return Err(LimboError::Busy); } + return_if_io!(begin_w_tx_res); + let tx = Transaction::new(tx_id, begin_ts); + tracing::trace!( + "begin_exclusive_tx(tx_id={}) - exclusive write transaction", + tx_id + ); + tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id); + self.txs.insert(tx_id, tx); Ok(IOResult::Done(tx_id)) } @@ -1399,10 +1394,7 @@ impl MvStore { // TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read // pages from WAL/DB read from a consistent state to maintiain snapshot isolation. - let result = pager.begin_read_tx()?; - if let crate::result::LimboResult::Busy = result { - return Err(LimboError::Busy); - } + pager.begin_read_tx()?; Ok(tx_id) } diff --git a/core/result.rs b/core/result.rs deleted file mode 100644 index a13754b32..000000000 --- a/core/result.rs +++ /dev/null @@ -1,7 +0,0 @@ -/// Common results that different functions can return in limbo. -#[derive(Debug)] -pub enum LimboResult { - /// Couldn't acquire a lock - Busy, - Ok, -} diff --git a/core/schema.rs b/core/schema.rs index cb2817d77..14b19daa4 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -12,7 +12,6 @@ pub struct View { /// Type alias for regular views collection pub type ViewsMap = HashMap; -use crate::result::LimboResult; use crate::storage::btree::BTreeCursor; use crate::translate::collate::CollationSeq; use crate::translate::plan::SelectPlan; @@ -311,9 +310,7 @@ impl Schema { // Store materialized view info (SQL and root page) for later creation let mut materialized_view_info: HashMap = HashMap::new(); - if matches!(pager.begin_read_tx()?, LimboResult::Busy) { - return Err(LimboError::Busy); - } + pager.begin_read_tx()?; pager.io.block(|| cursor.rewind())?; diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 1ce9e689b..f1823b85d 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1,4 +1,3 @@ -use crate::result::LimboResult; use crate::storage::wal::IOV_MAX; use crate::storage::{ buffer_pool::BufferPool, @@ -981,16 +980,16 @@ impl Pager { #[inline(always)] #[instrument(skip_all, level = Level::DEBUG)] - pub fn begin_read_tx(&self) -> Result { + pub fn begin_read_tx(&self) -> Result<()> { let Some(wal) = self.wal.as_ref() else { - return Ok(LimboResult::Ok); + return Ok(()); }; - let (result, changed) = wal.borrow_mut().begin_read_tx()?; + let changed = wal.borrow_mut().begin_read_tx()?; if changed { // Someone else changed the database -> assume our page cache is invalid (this is default SQLite behavior, we can probably do better with more granular invalidation) self.clear_page_cache(); } - Ok(result) + Ok(()) } #[instrument(skip_all, level = Level::DEBUG)] @@ -1019,12 +1018,12 @@ impl Pager { #[inline(always)] #[instrument(skip_all, level = Level::DEBUG)] - pub fn begin_write_tx(&self) -> Result> { + pub fn begin_write_tx(&self) -> Result> { // TODO(Diego): The only possibly allocate page1 here is because OpenEphemeral needs a write transaction // we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans return_if_io!(self.maybe_allocate_page1()); let Some(wal) = self.wal.as_ref() else { - return Ok(IOResult::Done(LimboResult::Ok)); + return Ok(IOResult::Done(())); }; Ok(IOResult::Done(wal.borrow_mut().begin_write_tx()?)) } diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 988245b28..d7368a5ce 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -17,7 +17,6 @@ use super::pager::{PageRef, Pager}; use super::sqlite3_ondisk::{self, checksum_wal, WalHeader, WAL_MAGIC_BE, WAL_MAGIC_LE}; use crate::fast_lock::SpinLock; use crate::io::{clock, File, IO}; -use crate::result::LimboResult; use crate::storage::database::EncryptionOrChecksum; use crate::storage::sqlite3_ondisk::{ begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame, @@ -226,10 +225,11 @@ impl TursoRwLock { /// Write-ahead log (WAL). pub trait Wal: Debug { /// Begin a read transaction. - fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)>; + /// Returns whether the database state has changed since the last read transaction. + fn begin_read_tx(&mut self) -> Result; /// Begin a write transaction. - fn begin_write_tx(&mut self) -> Result; + fn begin_write_tx(&mut self) -> Result<()>; /// End a read transaction. fn end_read_tx(&self); @@ -807,10 +807,11 @@ impl Drop for CheckpointLocks { impl Wal for WalFile { /// Begin a read transaction. The caller must ensure that there is not already /// an ongoing read transaction. + /// Returns whether the database state has changed since the last read transaction. /// sqlite/src/wal.c 3023 /// assert(pWal->readLock < 0); /* Not currently locked */ #[instrument(skip_all, level = Level::DEBUG)] - fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)> { + fn begin_read_tx(&mut self) -> Result { turso_assert!( self.max_frame_read_lock_index.get().eq(&NO_LOCK_HELD), "cannot start a new read tx without ending an existing one, lock_value={}, expected={}", @@ -836,7 +837,7 @@ impl Wal for WalFile { if shared_max == nbackfills { let lock_0_idx = 0; if !self.get_shared().read_locks[lock_0_idx].read() { - return Ok((LimboResult::Busy, db_changed)); + return Err(LimboError::Busy); } // we need to keep self.max_frame set to the appropriate // max frame in the wal at the time this transaction starts. @@ -844,7 +845,7 @@ impl Wal for WalFile { self.max_frame_read_lock_index.set(lock_0_idx); self.min_frame = nbackfills + 1; self.last_checksum = last_checksum; - return Ok((LimboResult::Ok, db_changed)); + return Ok(db_changed); } // If we get this far, it means that the reader will want to use @@ -886,7 +887,7 @@ impl Wal for WalFile { if best_idx == -1 || best_mark != shared_max as u32 { // If we cannot find a valid slot or the highest readmark has a stale max frame, we must return busy; // otherwise we would not see some committed changes. - return Ok((LimboResult::Busy, db_changed)); + return Err(LimboError::Busy); } // Now take a shared read on that slot, and if we are successful, @@ -895,7 +896,7 @@ impl Wal for WalFile { let shared = self.get_shared(); if !shared.read_locks[best_idx as usize].read() { // TODO: we should retry here instead of always returning Busy - return Ok((LimboResult::Busy, db_changed)); + return Err(LimboError::Busy); } let checkpoint_seq = shared.wal_header.lock().checkpoint_seq; ( @@ -926,7 +927,6 @@ impl Wal for WalFile { // file that has not yet been checkpointed. This client will not need // to read any frames earlier than minFrame from the wal file - they // can be safely read directly from the database file. - self.min_frame = nb2 + 1; if mx2 != shared_max || nb2 != nbackfills || cksm2 != last_checksum @@ -934,6 +934,7 @@ impl Wal for WalFile { { return Err(LimboError::Busy); } + self.min_frame = nb2 + 1; self.max_frame = best_mark as u64; self.max_frame_read_lock_index.set(best_idx as usize); tracing::debug!( @@ -943,7 +944,7 @@ impl Wal for WalFile { best_idx, shared_max ); - Ok((LimboResult::Ok, db_changed)) + Ok(db_changed) } /// End a read transaction. @@ -962,7 +963,7 @@ impl Wal for WalFile { /// Begin a write transaction #[instrument(skip_all, level = Level::DEBUG)] - fn begin_write_tx(&mut self) -> Result { + fn begin_write_tx(&mut self) -> Result<()> { let shared = self.get_shared_mut(); // sqlite/src/wal.c 3702 // Cannot start a write transaction without first holding a read @@ -974,7 +975,7 @@ impl Wal for WalFile { "must have a read transaction to begin a write transaction" ); if !shared.write_lock.write() { - return Ok(LimboResult::Busy); + return Err(LimboError::Busy); } let (shared_max, nbackfills, last_checksum) = ( shared.max_frame.load(Ordering::Acquire), @@ -986,13 +987,13 @@ impl Wal for WalFile { drop(shared); self.last_checksum = last_checksum; self.min_frame = nbackfills + 1; - return Ok(LimboResult::Ok); + return Ok(()); } // Snapshot is stale, give up and let caller retry from scratch tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame, shared_max); shared.write_lock.unlock(); - Ok(LimboResult::Busy) + Err(LimboError::Busy) } /// End a write transaction @@ -2446,7 +2447,6 @@ impl WalFileShared { #[cfg(test)] pub mod test { use crate::{ - result::LimboResult, storage::{ sqlite3_ondisk::{self, WAL_HEADER_SIZE}, wal::READMARK_NOT_USED, @@ -2710,7 +2710,7 @@ pub mod test { let readmark = { let pager = conn2.pager.borrow_mut(); let mut wal2 = pager.wal.as_ref().unwrap().borrow_mut(); - assert!(matches!(wal2.begin_read_tx().unwrap().0, LimboResult::Ok)); + wal2.begin_read_tx().unwrap(); wal2.get_max_frame() }; @@ -2892,7 +2892,7 @@ pub mod test { let r1_max_frame = { let pager = conn_r1.pager.borrow_mut(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); - assert!(matches!(wal.begin_read_tx().unwrap().0, LimboResult::Ok)); + wal.begin_read_tx().unwrap(); wal.get_max_frame() }; bulk_inserts(&conn_writer, 5, 10); @@ -2901,7 +2901,7 @@ pub mod test { let r2_max_frame = { let pager = conn_r2.pager.borrow_mut(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); - assert!(matches!(wal.begin_read_tx().unwrap().0, LimboResult::Ok)); + wal.begin_read_tx().unwrap(); wal.get_max_frame() }; @@ -2992,8 +2992,7 @@ pub mod test { let pager = conn2.pager.borrow_mut(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); let _ = wal.begin_read_tx().unwrap(); - let res = wal.begin_write_tx().unwrap(); - assert!(matches!(res, LimboResult::Ok), "result: {res:?}"); + wal.begin_write_tx().unwrap(); } // should fail because writer lock is held @@ -3325,8 +3324,7 @@ pub mod test { { let pager = conn2.pager.borrow_mut(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); - let (res, _) = wal.begin_read_tx().unwrap(); - assert!(matches!(res, LimboResult::Ok)); + wal.begin_read_tx().unwrap(); } // Make changes using conn1 bulk_inserts(&conn1, 5, 5); @@ -3337,15 +3335,14 @@ pub mod test { wal.begin_write_tx() }; // Should get Busy due to stale snapshot - assert!(matches!(result.unwrap(), LimboResult::Busy)); + assert!(matches!(result, Err(LimboError::Busy))); // End read transaction and start a fresh one { let pager = conn2.pager.borrow(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); wal.end_read_tx(); - let (res, _) = wal.begin_read_tx().unwrap(); - assert!(matches!(res, LimboResult::Ok)); + wal.begin_read_tx().unwrap(); } // Now write transaction should work let result = { @@ -3353,7 +3350,7 @@ pub mod test { let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); wal.begin_write_tx() }; - assert!(matches!(result.unwrap(), LimboResult::Ok)); + assert!(matches!(result, Ok(()))); } #[test] @@ -3383,8 +3380,7 @@ pub mod test { { let pager = conn2.pager.borrow_mut(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); - let (res, _) = wal.begin_read_tx().unwrap(); - assert!(matches!(res, LimboResult::Ok)); + wal.begin_read_tx().unwrap(); } // should use slot 0, as everything is backfilled assert!(check_read_lock_slot(&conn2, 0)); @@ -3476,8 +3472,7 @@ pub mod test { { let pager = reader.pager.borrow_mut(); let mut wal = pager.wal.as_ref().unwrap().borrow_mut(); - let (res, _) = wal.begin_read_tx().unwrap(); - assert!(matches!(res, LimboResult::Ok)); + wal.begin_read_tx().unwrap(); } let r_snapshot = { let pager = reader.pager.borrow(); diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 9ac5be7e6..53fc247df 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -42,7 +42,7 @@ use std::{ }; use turso_macros::match_ignore_ascii_case; -use crate::{pseudo::PseudoCursor, result::LimboResult}; +use crate::pseudo::PseudoCursor; use crate::{ schema::{affinity, Affinity}, @@ -2205,9 +2205,11 @@ pub fn op_transaction( !conn.is_nested_stmt.get(), "nested stmt should not begin a new read transaction" ); - if let LimboResult::Busy = pager.begin_read_tx()? { + let res = pager.begin_read_tx(); + if let Err(LimboError::Busy) = res { return Ok(InsnFunctionStepResult::Busy); } + res?; } if updated && matches!(new_transaction_state, TransactionState::Write { .. }) { @@ -2215,29 +2217,26 @@ pub fn op_transaction( !conn.is_nested_stmt.get(), "nested stmt should not begin a new write transaction" ); - match pager.begin_write_tx()? { - IOResult::Done(r) => { - if let LimboResult::Busy = r { - // We failed to upgrade to write transaction so put the transaction into its original state. - // That is, if the transaction had not started, end the read transaction so that next time we - // start a new one. - if matches!(current_state, TransactionState::None) { - pager.end_read_tx()?; - conn.transaction_state.replace(TransactionState::None); - } - assert_eq!(conn.transaction_state.get(), current_state); - return Ok(InsnFunctionStepResult::Busy); - } - } - IOResult::IO(io) => { - // set the transaction state to pending so we don't have to - // end the read transaction. - program - .connection - .transaction_state - .replace(TransactionState::PendingUpgrade); - return Ok(InsnFunctionStepResult::IO(io)); + let begin_w_tx_res = pager.begin_write_tx(); + if let Err(LimboError::Busy) = begin_w_tx_res { + // We failed to upgrade to write transaction so put the transaction into its original state. + // That is, if the transaction had not started, end the read transaction so that next time we + // start a new one. + if matches!(current_state, TransactionState::None) { + pager.end_read_tx()?; + conn.transaction_state.replace(TransactionState::None); } + assert_eq!(conn.transaction_state.get(), current_state); + return Ok(InsnFunctionStepResult::Busy); + } + if let IOResult::IO(io) = begin_w_tx_res? { + // set the transaction state to pending so we don't have to + // end the read transaction. + program + .connection + .transaction_state + .replace(TransactionState::PendingUpgrade); + return Ok(InsnFunctionStepResult::IO(io)); } } }