Remove LimboResult

LimboResult is only used for returning LimboResult::Busy, and we already
have LimboError::Busy, so it only adds confusion.

Moreover, the current busy handler was not handling LimboError::Busy,
because it was returned as an error, not as Ok. So this may fix the
"busy handler not working" issue in the perf thrpt benchmark.
Jussi Saurio
2025-09-17 11:04:44 +03:00
parent 104b8dd083
commit dc103da2ed
7 changed files with 88 additions and 123 deletions
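The shape of the change, in one self-contained sketch: lock contention used to travel in-band as Ok(LimboResult::Busy), which the ? operator treats as success, so any caller that forgot to match on the value silently ignored contention. It now travels as Err(LimboError::Busy) and propagates like any other error. The stand-in types and functions below are simplified for illustration, not the actual turso-db API:

    #[derive(Debug)]
    enum LimboError {
        Busy,
    }

    // Before: Busy was a *success* value, a sibling of Ok.
    #[allow(dead_code)]
    #[derive(Debug)]
    enum LimboResult {
        Busy,
        Ok,
    }

    // Old convention: `?` passes Ok(LimboResult::Busy) straight through.
    fn begin_tx_old() -> Result<LimboResult, LimboError> {
        Ok(LimboResult::Busy)
    }

    // New convention: Busy takes the error path automatically.
    fn begin_tx_new() -> Result<(), LimboError> {
        Err(LimboError::Busy)
    }

    fn caller() -> Result<(), LimboError> {
        // Compiles, but discards Busy unless the caller remembers to match on it.
        let _ignored_busy = begin_tx_old()?;
        // Busy is now impossible to miss: it propagates via `?`.
        begin_tx_new()?;
        Ok(())
    }

    fn main() {
        assert!(matches!(caller(), Err(LimboError::Busy)));
    }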

View File

@@ -16,7 +16,6 @@ pub mod mvcc;
 mod parameters;
 mod pragma;
 mod pseudo;
-pub mod result;
 mod schema;
 #[cfg(feature = "series")]
 mod series;
@@ -1519,19 +1518,10 @@ impl Connection {
     #[cfg(all(feature = "fs", feature = "conn_raw_api"))]
     pub fn wal_insert_begin(&self) -> Result<()> {
         let pager = self.pager.borrow();
-        match pager.begin_read_tx()? {
-            result::LimboResult::Busy => return Err(LimboError::Busy),
-            result::LimboResult::Ok => {}
-        }
-        match pager.io.block(|| pager.begin_write_tx()).inspect_err(|_| {
+        pager.begin_read_tx()?;
+        pager.io.block(|| pager.begin_write_tx()).inspect_err(|_| {
             pager.end_read_tx().expect("read txn must be closed");
-        })? {
-            result::LimboResult::Busy => {
-                pager.end_read_tx().expect("read txn must be closed");
-                return Err(LimboError::Busy);
-            }
-            result::LimboResult::Ok => {}
-        }
+        })?;
         // start write transaction and disable auto-commit mode as SQL can be executed within the WAL session (at the caller's own risk)
         self.transaction_state.replace(TransactionState::Write {

View File

@@ -1,6 +1,5 @@
 use crate::mvcc::clock::LogicalClock;
 use crate::mvcc::persistent_storage::Storage;
-use crate::result::LimboResult;
 use crate::return_if_io;
 use crate::state_machine::StateMachine;
 use crate::state_machine::StateTransition;
@@ -573,18 +572,20 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
                 // Note that this would be incredibly unsafe in the regular transaction model, but in MVCC we trust
                 // the MV-store to uphold the guarantee that no write-write conflicts happened.
                 self.pager.end_read_tx().expect("end_read_tx cannot fail");
-                let result = self.pager.begin_read_tx()?;
-                if let crate::result::LimboResult::Busy = result {
+                let result = self.pager.begin_read_tx();
+                if let Err(LimboError::Busy) = result {
                     // We cannot obtain a WAL read lock due to contention, so we must abort.
                     self.commit_coordinator.pager_commit_lock.unlock();
                     return Err(LimboError::WriteWriteConflict);
                 }
-                let result = self.pager.io.block(|| self.pager.begin_write_tx())?;
-                if let crate::result::LimboResult::Busy = result {
+                result?;
+                let result = self.pager.io.block(|| self.pager.begin_write_tx());
+                if let Err(LimboError::Busy) = result {
                     // There is a non-CONCURRENT transaction holding the write lock. We must abort.
                     self.commit_coordinator.pager_commit_lock.unlock();
                     return Err(LimboError::WriteWriteConflict);
                 }
+                result?;
                 self.state = CommitState::WriteRow {
                     end_ts,
                     write_set_index: 0,
@@ -1342,13 +1343,9 @@ impl<Clock: LogicalClock> MvStore<Clock> {
         // Try to acquire the pager read lock
         if !is_upgrade_from_read {
-            match pager.begin_read_tx()? {
-                LimboResult::Busy => {
-                    self.release_exclusive_tx(&tx_id);
-                    return Err(LimboError::Busy);
-                }
-                LimboResult::Ok => {}
-            }
+            pager.begin_read_tx().inspect_err(|_| {
+                self.release_exclusive_tx(&tx_id);
+            })?;
         }
         let locked = self.commit_coordinator.pager_commit_lock.write();
         if !locked {
@@ -1357,30 +1354,28 @@ impl<Clock: LogicalClock> MvStore<Clock> {
             return Err(LimboError::Busy);
         }
         // Try to acquire the pager write lock
-        match return_if_io!(pager.begin_write_tx()) {
-            LimboResult::Busy => {
-                tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id);
-                // Failed to get pager lock - release our exclusive lock
-                self.commit_coordinator.pager_commit_lock.unlock();
-                self.release_exclusive_tx(&tx_id);
-                if maybe_existing_tx_id.is_none() {
-                    // If we were upgrading an existing non-CONCURRENT mvcc transaction to write, we don't end the read tx on Busy.
-                    // But if we were beginning a completely new non-CONCURRENT mvcc transaction, we do end it because the next time the connection
-                    // attempts to do something, it will open a new read tx, which will fail if we don't end this one here.
-                    pager.end_read_tx()?;
-                }
-                return Err(LimboError::Busy);
-            }
-            LimboResult::Ok => {
-                let tx = Transaction::new(tx_id, begin_ts);
-                tracing::trace!(
-                    "begin_exclusive_tx(tx_id={}) - exclusive write transaction",
-                    tx_id
-                );
-                tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id);
-                self.txs.insert(tx_id, tx);
-            }
-        }
+        let begin_w_tx_res = pager.begin_write_tx();
+        if let Err(LimboError::Busy) = begin_w_tx_res {
+            tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id);
+            // Failed to get pager lock - release our exclusive lock
+            self.commit_coordinator.pager_commit_lock.unlock();
+            self.release_exclusive_tx(&tx_id);
+            if maybe_existing_tx_id.is_none() {
+                // If we were upgrading an existing non-CONCURRENT mvcc transaction to write, we don't end the read tx on Busy.
+                // But if we were beginning a completely new non-CONCURRENT mvcc transaction, we do end it because the next time the connection
+                // attempts to do something, it will open a new read tx, which will fail if we don't end this one here.
+                pager.end_read_tx()?;
+            }
+            return Err(LimboError::Busy);
+        }
+        return_if_io!(begin_w_tx_res);
+        let tx = Transaction::new(tx_id, begin_ts);
+        tracing::trace!(
+            "begin_exclusive_tx(tx_id={}) - exclusive write transaction",
+            tx_id
+        );
+        tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id);
+        self.txs.insert(tx_id, tx);
         Ok(IOResult::Done(tx_id))
     }
@@ -1399,10 +1394,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
         // TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read
         // pages from WAL/DB read from a consistent state to maintain snapshot isolation.
-        let result = pager.begin_read_tx()?;
-        if let crate::result::LimboResult::Busy = result {
-            return Err(LimboError::Busy);
-        }
+        pager.begin_read_tx()?;
         Ok(tx_id)
     }

View File

@@ -1,7 +0,0 @@
-/// Common results that different functions can return in limbo.
-#[derive(Debug)]
-pub enum LimboResult {
-    /// Couldn't acquire a lock
-    Busy,
-    Ok,
-}
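Every call site in this commit applies the same mechanical mapping: Ok(LimboResult::Busy) becomes Err(LimboError::Busy), and Ok(LimboResult::Ok) becomes Ok(()). A hypothetical adapter (not part of the commit) makes the fold explicit:

    #[derive(Debug)]
    enum LimboError {
        Busy,
    }

    #[derive(Debug)]
    enum LimboResult {
        Busy,
        Ok,
    }

    // Hypothetical helper: fold the old in-band result into the error channel.
    fn into_new_convention(old: Result<LimboResult, LimboError>) -> Result<(), LimboError> {
        match old? {
            LimboResult::Busy => Err(LimboError::Busy),
            LimboResult::Ok => Ok(()),
        }
    }

    fn main() {
        assert!(matches!(into_new_convention(Ok(LimboResult::Busy)), Err(LimboError::Busy)));
        assert!(matches!(into_new_convention(Ok(LimboResult::Ok)), Ok(())));
    }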

View File

@@ -12,7 +12,6 @@ pub struct View {
 /// Type alias for regular views collection
 pub type ViewsMap = HashMap<String, View>;
-use crate::result::LimboResult;
 use crate::storage::btree::BTreeCursor;
 use crate::translate::collate::CollationSeq;
 use crate::translate::plan::SelectPlan;
@@ -311,9 +310,7 @@ impl Schema {
         // Store materialized view info (SQL and root page) for later creation
         let mut materialized_view_info: HashMap<String, (String, usize)> = HashMap::new();
-        if matches!(pager.begin_read_tx()?, LimboResult::Busy) {
-            return Err(LimboError::Busy);
-        }
+        pager.begin_read_tx()?;
         pager.io.block(|| cursor.rewind())?;

View File

@@ -1,4 +1,3 @@
-use crate::result::LimboResult;
 use crate::storage::wal::IOV_MAX;
 use crate::storage::{
     buffer_pool::BufferPool,
@@ -981,16 +980,16 @@ impl Pager {
     #[inline(always)]
     #[instrument(skip_all, level = Level::DEBUG)]
-    pub fn begin_read_tx(&self) -> Result<LimboResult> {
+    pub fn begin_read_tx(&self) -> Result<()> {
         let Some(wal) = self.wal.as_ref() else {
-            return Ok(LimboResult::Ok);
+            return Ok(());
         };
-        let (result, changed) = wal.borrow_mut().begin_read_tx()?;
+        let changed = wal.borrow_mut().begin_read_tx()?;
         if changed {
             // Someone else changed the database -> assume our page cache is invalid (this is default SQLite behavior, we can probably do better with more granular invalidation)
             self.clear_page_cache();
         }
-        Ok(result)
+        Ok(())
     }

     #[instrument(skip_all, level = Level::DEBUG)]
@@ -1019,12 +1018,12 @@ impl Pager {
     #[inline(always)]
     #[instrument(skip_all, level = Level::DEBUG)]
-    pub fn begin_write_tx(&self) -> Result<IOResult<LimboResult>> {
+    pub fn begin_write_tx(&self) -> Result<IOResult<()>> {
         // TODO(Diego): The only reason we allocate page1 here is that OpenEphemeral needs a write transaction;
         // we should have a unique API to begin transactions, something like sqlite3BtreeBeginTrans
         return_if_io!(self.maybe_allocate_page1());
         let Some(wal) = self.wal.as_ref() else {
-            return Ok(IOResult::Done(LimboResult::Ok));
+            return Ok(IOResult::Done(()));
         };
         Ok(IOResult::Done(wal.borrow_mut().begin_write_tx()?))
     }
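With both pager entry points funneling contention into Err(LimboError::Busy), a busy handler reduces to a retry loop around the error channel. A minimal sketch of that pattern, with a hypothetical begin_tx stand-in and a naive backoff policy (not this repo's actual busy-handler code):

    use std::{thread, time::Duration};

    #[derive(Debug)]
    enum LimboError {
        Busy,
    }

    // Stand-in for pager.begin_read_tx() / begin_write_tx(): reports
    // Busy for the first two attempts, then succeeds.
    fn begin_tx(attempt: u32) -> Result<(), LimboError> {
        if attempt < 2 {
            Err(LimboError::Busy)
        } else {
            Ok(())
        }
    }

    // The busy handler only has to watch one channel now: Err(Busy).
    fn with_busy_retry(max_retries: u32) -> Result<(), LimboError> {
        let mut attempt = 0;
        loop {
            match begin_tx(attempt) {
                Err(LimboError::Busy) if attempt < max_retries => {
                    attempt += 1;
                    thread::sleep(Duration::from_millis(1)); // naive backoff
                }
                other => return other,
            }
        }
    }

    fn main() {
        assert!(with_busy_retry(5).is_ok());
    }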

View File

@@ -17,7 +17,6 @@ use super::pager::{PageRef, Pager};
 use super::sqlite3_ondisk::{self, checksum_wal, WalHeader, WAL_MAGIC_BE, WAL_MAGIC_LE};
 use crate::fast_lock::SpinLock;
 use crate::io::{clock, File, IO};
-use crate::result::LimboResult;
 use crate::storage::database::EncryptionOrChecksum;
 use crate::storage::sqlite3_ondisk::{
     begin_read_wal_frame, begin_read_wal_frame_raw, finish_read_page, prepare_wal_frame,
@@ -226,10 +225,11 @@ impl TursoRwLock {
 /// Write-ahead log (WAL).
 pub trait Wal: Debug {
     /// Begin a read transaction.
-    fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)>;
+    /// Returns whether the database state has changed since the last read transaction.
+    fn begin_read_tx(&mut self) -> Result<bool>;

     /// Begin a write transaction.
-    fn begin_write_tx(&mut self) -> Result<LimboResult>;
+    fn begin_write_tx(&mut self) -> Result<()>;

     /// End a read transaction.
     fn end_read_tx(&self);
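The reshaped trait is easiest to see with a toy implementor: contention surfaces as Err(LimboError::Busy), and begin_read_tx's Ok payload carries only the "database changed since the last read transaction" flag that the pager uses to invalidate its page cache. This is an illustrative model, not WalFile:

    #[derive(Debug)]
    enum LimboError {
        Busy,
    }

    type Result<T> = std::result::Result<T, LimboError>;

    struct ToyWal {
        write_locked: bool,
        db_changed: bool,
    }

    impl ToyWal {
        fn begin_read_tx(&mut self) -> Result<bool> {
            // Report (and reset) whether the database changed under us.
            Ok(std::mem::take(&mut self.db_changed))
        }

        fn begin_write_tx(&mut self) -> Result<()> {
            if self.write_locked {
                return Err(LimboError::Busy);
            }
            self.write_locked = true;
            Ok(())
        }
    }

    fn main() -> Result<()> {
        let mut wal = ToyWal { write_locked: false, db_changed: true };
        let changed = wal.begin_read_tx()?; // true: caller should clear its page cache
        assert!(changed);
        wal.begin_write_tx()?; // acquires the write lock
        assert!(matches!(wal.begin_write_tx(), Err(LimboError::Busy)));
        Ok(())
    }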
@@ -807,10 +807,11 @@ impl Drop for CheckpointLocks {
 impl Wal for WalFile {
     /// Begin a read transaction. The caller must ensure that there is not already
     /// an ongoing read transaction.
+    /// Returns whether the database state has changed since the last read transaction.
     /// sqlite/src/wal.c 3023
     /// assert(pWal->readLock < 0); /* Not currently locked */
     #[instrument(skip_all, level = Level::DEBUG)]
-    fn begin_read_tx(&mut self) -> Result<(LimboResult, bool)> {
+    fn begin_read_tx(&mut self) -> Result<bool> {
         turso_assert!(
             self.max_frame_read_lock_index.get().eq(&NO_LOCK_HELD),
             "cannot start a new read tx without ending an existing one, lock_value={}, expected={}",
@@ -836,7 +837,7 @@ impl Wal for WalFile {
         if shared_max == nbackfills {
             let lock_0_idx = 0;
             if !self.get_shared().read_locks[lock_0_idx].read() {
-                return Ok((LimboResult::Busy, db_changed));
+                return Err(LimboError::Busy);
             }
             // we need to keep self.max_frame set to the appropriate
             // max frame in the wal at the time this transaction starts.
@@ -844,7 +845,7 @@ impl Wal for WalFile {
             self.max_frame_read_lock_index.set(lock_0_idx);
             self.min_frame = nbackfills + 1;
             self.last_checksum = last_checksum;
-            return Ok((LimboResult::Ok, db_changed));
+            return Ok(db_changed);
         }

         // If we get this far, it means that the reader will want to use
@@ -886,7 +887,7 @@ impl Wal for WalFile {
         if best_idx == -1 || best_mark != shared_max as u32 {
             // If we cannot find a valid slot or the highest readmark has a stale max frame, we must return busy;
             // otherwise we would not see some committed changes.
-            return Ok((LimboResult::Busy, db_changed));
+            return Err(LimboError::Busy);
         }

         // Now take a shared read on that slot, and if we are successful,
@@ -895,7 +896,7 @@ impl Wal for WalFile {
         let shared = self.get_shared();
         if !shared.read_locks[best_idx as usize].read() {
             // TODO: we should retry here instead of always returning Busy
-            return Ok((LimboResult::Busy, db_changed));
+            return Err(LimboError::Busy);
         }
         let checkpoint_seq = shared.wal_header.lock().checkpoint_seq;
         (
@@ -926,7 +927,6 @@ impl Wal for WalFile {
         // file that has not yet been checkpointed. This client will not need
         // to read any frames earlier than minFrame from the wal file - they
         // can be safely read directly from the database file.
-        self.min_frame = nb2 + 1;
         if mx2 != shared_max
             || nb2 != nbackfills
             || cksm2 != last_checksum
@@ -934,6 +934,7 @@ impl Wal for WalFile {
         {
             return Err(LimboError::Busy);
         }
+        self.min_frame = nb2 + 1;
         self.max_frame = best_mark as u64;
         self.max_frame_read_lock_index.set(best_idx as usize);
         tracing::debug!(
@@ -943,7 +944,7 @@ impl Wal for WalFile {
             best_idx,
             shared_max
         );
-        Ok((LimboResult::Ok, db_changed))
+        Ok(db_changed)
     }

     /// End a read transaction.
@@ -962,7 +963,7 @@ impl Wal for WalFile {
     /// Begin a write transaction
     #[instrument(skip_all, level = Level::DEBUG)]
-    fn begin_write_tx(&mut self) -> Result<LimboResult> {
+    fn begin_write_tx(&mut self) -> Result<()> {
         let shared = self.get_shared_mut();
         // sqlite/src/wal.c 3702
         // Cannot start a write transaction without first holding a read
@@ -974,7 +975,7 @@
             "must have a read transaction to begin a write transaction"
         );
         if !shared.write_lock.write() {
-            return Ok(LimboResult::Busy);
+            return Err(LimboError::Busy);
         }
         let (shared_max, nbackfills, last_checksum) = (
             shared.max_frame.load(Ordering::Acquire),
@@ -986,13 +987,13 @@ impl Wal for WalFile {
             drop(shared);
             self.last_checksum = last_checksum;
             self.min_frame = nbackfills + 1;
-            return Ok(LimboResult::Ok);
+            return Ok(());
         }
         // Snapshot is stale, give up and let caller retry from scratch
         tracing::debug!("unable to upgrade transaction from read to write: snapshot is stale, give up and let caller retry from scratch, self.max_frame={}, shared_max={}", self.max_frame, shared_max);
         shared.write_lock.unlock();
-        Ok(LimboResult::Busy)
+        Err(LimboError::Busy)
     }

     /// End a write transaction
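The upgrade rule above, in miniature: the writer keeps the lock only if its read snapshot still matches the shared WAL state; otherwise it unlocks and returns Busy so the caller retries from a fresh read transaction. A stripped-down model with toy fields, not WalFile:

    #[derive(Debug)]
    enum LimboError {
        Busy,
    }

    struct ToyWal {
        max_frame: u64, // last WAL frame visible to this reader's snapshot
    }

    // Stripped-down model of the check in begin_write_tx: keep the write
    // lock only if our snapshot is still current.
    fn try_upgrade(wal: &ToyWal, shared_max: u64) -> Result<(), LimboError> {
        if wal.max_frame == shared_max {
            return Ok(()); // snapshot current: the write transaction begins
        }
        // Snapshot is stale: drop the write lock (elided here) and let the
        // caller retry from scratch with a fresh read transaction.
        Err(LimboError::Busy)
    }

    fn main() {
        let wal = ToyWal { max_frame: 10 };
        assert!(try_upgrade(&wal, 10).is_ok());
        assert!(matches!(try_upgrade(&wal, 12), Err(LimboError::Busy)));
    }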
@@ -2446,7 +2447,6 @@ impl WalFileShared {
 #[cfg(test)]
 pub mod test {
     use crate::{
-        result::LimboResult,
         storage::{
             sqlite3_ondisk::{self, WAL_HEADER_SIZE},
             wal::READMARK_NOT_USED,
@@ -2710,7 +2710,7 @@ pub mod test {
         let readmark = {
             let pager = conn2.pager.borrow_mut();
             let mut wal2 = pager.wal.as_ref().unwrap().borrow_mut();
-            assert!(matches!(wal2.begin_read_tx().unwrap().0, LimboResult::Ok));
+            wal2.begin_read_tx().unwrap();
             wal2.get_max_frame()
         };
@@ -2892,7 +2892,7 @@ pub mod test {
         let r1_max_frame = {
             let pager = conn_r1.pager.borrow_mut();
             let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
-            assert!(matches!(wal.begin_read_tx().unwrap().0, LimboResult::Ok));
+            wal.begin_read_tx().unwrap();
             wal.get_max_frame()
         };
         bulk_inserts(&conn_writer, 5, 10);
@@ -2901,7 +2901,7 @@ pub mod test {
         let r2_max_frame = {
             let pager = conn_r2.pager.borrow_mut();
             let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
-            assert!(matches!(wal.begin_read_tx().unwrap().0, LimboResult::Ok));
+            wal.begin_read_tx().unwrap();
             wal.get_max_frame()
         };
@@ -2992,8 +2992,7 @@ pub mod test {
         let pager = conn2.pager.borrow_mut();
         let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
         let _ = wal.begin_read_tx().unwrap();
-        let res = wal.begin_write_tx().unwrap();
-        assert!(matches!(res, LimboResult::Ok), "result: {res:?}");
+        wal.begin_write_tx().unwrap();
     }

     // should fail because writer lock is held
@@ -3325,8 +3324,7 @@ pub mod test {
         {
             let pager = conn2.pager.borrow_mut();
             let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
-            let (res, _) = wal.begin_read_tx().unwrap();
-            assert!(matches!(res, LimboResult::Ok));
+            wal.begin_read_tx().unwrap();
         }
         // Make changes using conn1
         bulk_inserts(&conn1, 5, 5);
@@ -3337,15 +3335,14 @@ pub mod test {
             wal.begin_write_tx()
         };
         // Should get Busy due to stale snapshot
-        assert!(matches!(result.unwrap(), LimboResult::Busy));
+        assert!(matches!(result, Err(LimboError::Busy)));
         // End read transaction and start a fresh one
         {
             let pager = conn2.pager.borrow();
             let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
             wal.end_read_tx();
-            let (res, _) = wal.begin_read_tx().unwrap();
-            assert!(matches!(res, LimboResult::Ok));
+            wal.begin_read_tx().unwrap();
         }
         // Now write transaction should work
         let result = {
@@ -3353,7 +3350,7 @@ pub mod test {
             let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
             wal.begin_write_tx()
         };
-        assert!(matches!(result.unwrap(), LimboResult::Ok));
+        assert!(matches!(result, Ok(())));
     }

     #[test]
@@ -3383,8 +3380,7 @@ pub mod test {
         {
             let pager = conn2.pager.borrow_mut();
             let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
-            let (res, _) = wal.begin_read_tx().unwrap();
-            assert!(matches!(res, LimboResult::Ok));
+            wal.begin_read_tx().unwrap();
         }
         // should use slot 0, as everything is backfilled
         assert!(check_read_lock_slot(&conn2, 0));
@@ -3476,8 +3472,7 @@ pub mod test {
         {
             let pager = reader.pager.borrow_mut();
             let mut wal = pager.wal.as_ref().unwrap().borrow_mut();
-            let (res, _) = wal.begin_read_tx().unwrap();
-            assert!(matches!(res, LimboResult::Ok));
+            wal.begin_read_tx().unwrap();
         }
         let r_snapshot = {
             let pager = reader.pager.borrow();

View File

@@ -42,7 +42,7 @@ use std::{
 };
 use turso_macros::match_ignore_ascii_case;

-use crate::{pseudo::PseudoCursor, result::LimboResult};
+use crate::pseudo::PseudoCursor;
 use crate::{
     schema::{affinity, Affinity},
@@ -2205,9 +2205,11 @@ pub fn op_transaction(
             !conn.is_nested_stmt.get(),
             "nested stmt should not begin a new read transaction"
         );
-        if let LimboResult::Busy = pager.begin_read_tx()? {
+        let res = pager.begin_read_tx();
+        if let Err(LimboError::Busy) = res {
             return Ok(InsnFunctionStepResult::Busy);
         }
+        res?;
     }
     if updated && matches!(new_transaction_state, TransactionState::Write { .. }) {
@@ -2215,29 +2217,26 @@ pub fn op_transaction(
             !conn.is_nested_stmt.get(),
             "nested stmt should not begin a new write transaction"
         );
-        match pager.begin_write_tx()? {
-            IOResult::Done(r) => {
-                if let LimboResult::Busy = r {
-                    // We failed to upgrade to write transaction so put the transaction into its original state.
-                    // That is, if the transaction had not started, end the read transaction so that next time we
-                    // start a new one.
-                    if matches!(current_state, TransactionState::None) {
-                        pager.end_read_tx()?;
-                        conn.transaction_state.replace(TransactionState::None);
-                    }
-                    assert_eq!(conn.transaction_state.get(), current_state);
-                    return Ok(InsnFunctionStepResult::Busy);
-                }
-            }
-            IOResult::IO(io) => {
-                // set the transaction state to pending so we don't have to
-                // end the read transaction.
-                program
-                    .connection
-                    .transaction_state
-                    .replace(TransactionState::PendingUpgrade);
-                return Ok(InsnFunctionStepResult::IO(io));
-            }
-        }
+        let begin_w_tx_res = pager.begin_write_tx();
+        if let Err(LimboError::Busy) = begin_w_tx_res {
+            // We failed to upgrade to write transaction so put the transaction into its original state.
+            // That is, if the transaction had not started, end the read transaction so that next time we
+            // start a new one.
+            if matches!(current_state, TransactionState::None) {
+                pager.end_read_tx()?;
+                conn.transaction_state.replace(TransactionState::None);
+            }
+            assert_eq!(conn.transaction_state.get(), current_state);
+            return Ok(InsnFunctionStepResult::Busy);
+        }
+        if let IOResult::IO(io) = begin_w_tx_res? {
+            // set the transaction state to pending so we don't have to
+            // end the read transaction.
+            program
+                .connection
+                .transaction_state
+                .replace(TransactionState::PendingUpgrade);
+            return Ok(InsnFunctionStepResult::IO(io));
+        }
     }
 }