mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-22 08:25:29 +01:00
mvcc: fix non-concurrent transaction semantics
on the main branch, mvcc allows concurrent inserts from multiple txns even without BEGIN CONCURRENT, and then always hangs whenever one of the txns tries to commit. this commit fixes that issue.
This commit is contained in:
@@ -1262,19 +1262,50 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
///
|
||||
/// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need
|
||||
/// to ensure exclusive write access as per SQLite semantics.
|
||||
pub fn begin_exclusive_tx(&self, pager: Rc<Pager>) -> Result<IOResult<TxID>> {
|
||||
let tx_id = self.get_tx_id();
|
||||
pub fn begin_exclusive_tx(
|
||||
&self,
|
||||
pager: Rc<Pager>,
|
||||
maybe_existing_tx_id: Option<TxID>,
|
||||
) -> Result<IOResult<TxID>> {
|
||||
self._begin_exclusive_tx(pager, false, maybe_existing_tx_id)
|
||||
}
|
||||
|
||||
/// Upgrades a read transaction to an exclusive write transaction.
|
||||
///
|
||||
/// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need
|
||||
/// to ensure exclusive write access as per SQLite semantics.
|
||||
pub fn upgrade_to_exclusive_tx(
|
||||
&self,
|
||||
pager: Rc<Pager>,
|
||||
maybe_existing_tx_id: Option<TxID>,
|
||||
) -> Result<IOResult<TxID>> {
|
||||
self._begin_exclusive_tx(pager, true, maybe_existing_tx_id)
|
||||
}
|
||||
|
||||
/// Begins an exclusive write transaction that prevents concurrent writes.
|
||||
///
|
||||
/// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need
|
||||
/// to ensure exclusive write access as per SQLite semantics.
|
||||
fn _begin_exclusive_tx(
|
||||
&self,
|
||||
pager: Rc<Pager>,
|
||||
is_upgrade_from_read: bool,
|
||||
maybe_existing_tx_id: Option<TxID>,
|
||||
) -> Result<IOResult<TxID>> {
|
||||
let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id());
|
||||
let begin_ts = self.get_timestamp();
|
||||
|
||||
self.acquire_exclusive_tx(&tx_id)?;
|
||||
|
||||
// Try to acquire the pager read lock
|
||||
match pager.begin_read_tx()? {
|
||||
LimboResult::Busy => {
|
||||
self.release_exclusive_tx(&tx_id);
|
||||
return Err(LimboError::Busy);
|
||||
if !is_upgrade_from_read {
|
||||
match pager.begin_read_tx()? {
|
||||
LimboResult::Busy => {
|
||||
self.release_exclusive_tx(&tx_id);
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
LimboResult::Ok => {}
|
||||
}
|
||||
LimboResult::Ok => {}
|
||||
}
|
||||
let locked = self.commit_coordinator.pager_commit_lock.write();
|
||||
if !locked {
|
||||
@@ -1287,7 +1318,9 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
LimboResult::Busy => {
|
||||
tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id);
|
||||
// Failed to get pager lock - release our exclusive lock
|
||||
panic!("begin_exclusive_tx: tx_id={tx_id} failed with Busy, this should never happen as we were able to lock mvcc exclusive write lock");
|
||||
self.commit_coordinator.pager_commit_lock.unlock();
|
||||
self.release_exclusive_tx(&tx_id);
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
LimboResult::Ok => {
|
||||
let tx = Transaction::new(tx_id, begin_ts);
|
||||
@@ -1336,7 +1369,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
pager: Rc<Pager>,
|
||||
connection: &Arc<Connection>,
|
||||
) -> Result<StateMachine<CommitStateMachine<Clock>>> {
|
||||
tracing::trace!("commit_tx(tx_id={})", tx_id);
|
||||
let state_machine: StateMachine<CommitStateMachine<Clock>> =
|
||||
StateMachine::<CommitStateMachine<Clock>>::new(CommitStateMachine::new(
|
||||
CommitState::Initial,
|
||||
|
||||
@@ -2171,7 +2171,7 @@ pub fn op_transaction(
|
||||
mv_store.begin_tx(pager.clone())
|
||||
}
|
||||
TransactionMode::Write => {
|
||||
return_if_io!(mv_store.begin_exclusive_tx(pager.clone()))
|
||||
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), None))
|
||||
}
|
||||
};
|
||||
conn.mv_transactions.borrow_mut().push(tx_id);
|
||||
@@ -2180,11 +2180,13 @@ pub fn op_transaction(
|
||||
&& matches!(new_transaction_state, TransactionState::Write { .. })
|
||||
&& matches!(tx_mode, TransactionMode::Write)
|
||||
{
|
||||
// For MVCC with concurrent transactions, we don't need to upgrade to exclusive.
|
||||
// The existing MVCC transaction can handle both reads and writes.
|
||||
// We only upgrade to exclusive for IMMEDIATE/EXCLUSIVE transaction modes.
|
||||
// Since we already have an MVCC transaction from BEGIN CONCURRENT,
|
||||
// we can just continue using it for writes.
|
||||
let is_upgrade_from_read = matches!(current_state, TransactionState::Read);
|
||||
let tx_id = program.connection.mv_tx_id.get().unwrap();
|
||||
if is_upgrade_from_read {
|
||||
return_if_io!(mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id)));
|
||||
} else {
|
||||
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if matches!(tx_mode, TransactionMode::Concurrent) {
|
||||
|
||||
@@ -1082,6 +1082,8 @@ pub fn handle_program_error(
|
||||
LimboError::TxError(_) => {}
|
||||
// Table locked errors, e.g. trying to checkpoint in an interactive transaction, do not cause a rollback.
|
||||
LimboError::TableLocked => {}
|
||||
// Busy errors do not cause a rollback.
|
||||
LimboError::Busy => {}
|
||||
_ => {
|
||||
if let Some(mv_store) = mv_store {
|
||||
if let Some(tx_id) = connection.mv_tx_id.get() {
|
||||
|
||||
Reference in New Issue
Block a user