diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 596ff3b9e..54cd1d0cf 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -803,11 +803,10 @@ impl StateTransition for CommitStateMachine { } CommitState::Commit { end_ts } => { let mut log_record = LogRecord::new(*end_ts); - let tx = mvcc_store.txs.get(&self.tx_id).unwrap(); - let tx_unlocked = tx.value(); - tx_unlocked - .state - .store(TransactionState::Committed(*end_ts)); + if !mvcc_store.is_exclusive_tx(&self.tx_id) && mvcc_store.has_exclusive_tx() { + // A non-CONCURRENT transaction is holding the exclusive lock, we must abort. + return Err(LimboError::WriteWriteConflict); + } for id in &self.write_set { if let Some(row_versions) = mvcc_store.rows.get(id) { let mut row_versions = row_versions.value().write(); @@ -909,6 +908,11 @@ impl StateTransition for CommitStateMachine { return Ok(TransitionResult::Continue); } CommitState::CommitEnd { end_ts } => { + let tx = mvcc_store.txs.get(&self.tx_id).unwrap(); + let tx_unlocked = tx.value(); + tx_unlocked + .state + .store(TransactionState::Committed(*end_ts)); // We have now updated all the versions with a reference to the // transaction ID to a timestamp and can, therefore, remove the // transaction. Please note that when we move to lockless, the @@ -1451,7 +1455,11 @@ impl MvStore { ) -> Result> { let is_logical_log = self.storage.is_logical_log(); let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id()); - let begin_ts = self.get_timestamp(); + let begin_ts = if let Some(tx_id) = maybe_existing_tx_id { + self.txs.get(&tx_id).unwrap().value().begin_ts + } else { + self.get_timestamp() + }; self.acquire_exclusive_tx(&tx_id)?; @@ -1644,7 +1652,7 @@ impl MvStore { pub fn rollback_tx( &self, tx_id: TxID, - pager: Arc, + _pager: Arc, connection: &Connection, ) -> Result<()> { let tx_unlocked = self.txs.get(&tx_id).unwrap(); @@ -1655,14 +1663,10 @@ impl MvStore { tracing::trace!("abort(tx_id={})", tx_id); let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); - let pager_rollback_done = if self.is_exclusive_tx(&tx_id) { + if self.is_exclusive_tx(&tx_id) { self.commit_coordinator.pager_commit_lock.unlock(); self.release_exclusive_tx(&tx_id); - pager.io.block(|| pager.end_tx(true, connection))?; - true - } else { - false - }; + } for ref id in write_set { if let Some(row_versions) = self.rows.get(id) { @@ -1684,9 +1688,6 @@ impl MvStore { let tx = tx_unlocked.value(); tx.state.store(TransactionState::Terminated); tracing::trace!("terminate(tx_id={})", tx_id); - if !pager_rollback_done { - pager.end_read_tx()?; - } // FIXME: verify that we can already remove the transaction here! // Maybe it's fine for snapshot isolation, but too early for serializable? self.txs.remove(&tx_id); diff --git a/tests/integration/fuzz_transaction/mod.rs b/tests/integration/fuzz_transaction/mod.rs index b8667da2a..cc8e35724 100644 --- a/tests/integration/fuzz_transaction/mod.rs +++ b/tests/integration/fuzz_transaction/mod.rs @@ -490,15 +490,14 @@ async fn test_multiple_connections_fuzz() { } #[tokio::test] -#[ignore = "MVCC is currently under development, it is expected to fail"] // Same as test_multiple_connections_fuzz, but with MVCC enabled. async fn test_multiple_connections_fuzz_mvcc() { let mvcc_fuzz_options = FuzzOptions { mvcc_enabled: true, max_num_connections: 8, query_gen_options: QueryGenOptions { - weight_begin_deferred: 8, - weight_begin_concurrent: 8, + weight_begin_deferred: 4, + weight_begin_concurrent: 12, weight_commit: 8, weight_rollback: 8, weight_checkpoint: 0,