Merge 'Fix MVCC concurrency bugs' from Jussi Saurio

Reviewed-by: Pekka Enberg <penberg@iki.fi>

Closes #3214
This commit is contained in:
Jussi Saurio
2025-09-19 10:00:43 +03:00
committed by GitHub
2 changed files with 19 additions and 19 deletions

View File

@@ -803,11 +803,10 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
}
CommitState::Commit { end_ts } => {
let mut log_record = LogRecord::new(*end_ts);
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
let tx_unlocked = tx.value();
tx_unlocked
.state
.store(TransactionState::Committed(*end_ts));
if !mvcc_store.is_exclusive_tx(&self.tx_id) && mvcc_store.has_exclusive_tx() {
// A non-CONCURRENT transaction is holding the exclusive lock, we must abort.
return Err(LimboError::WriteWriteConflict);
}
for id in &self.write_set {
if let Some(row_versions) = mvcc_store.rows.get(id) {
let mut row_versions = row_versions.value().write();
@@ -909,6 +908,11 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
return Ok(TransitionResult::Continue);
}
CommitState::CommitEnd { end_ts } => {
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
let tx_unlocked = tx.value();
tx_unlocked
.state
.store(TransactionState::Committed(*end_ts));
// We have now updated all the versions with a reference to the
// transaction ID to a timestamp and can, therefore, remove the
// transaction. Please note that when we move to lockless, the
@@ -1451,7 +1455,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
) -> Result<IOResult<TxID>> {
let is_logical_log = self.storage.is_logical_log();
let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id());
let begin_ts = self.get_timestamp();
let begin_ts = if let Some(tx_id) = maybe_existing_tx_id {
self.txs.get(&tx_id).unwrap().value().begin_ts
} else {
self.get_timestamp()
};
self.acquire_exclusive_tx(&tx_id)?;
@@ -1644,7 +1652,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn rollback_tx(
&self,
tx_id: TxID,
pager: Arc<Pager>,
_pager: Arc<Pager>,
connection: &Connection,
) -> Result<()> {
let tx_unlocked = self.txs.get(&tx_id).unwrap();
@@ -1655,14 +1663,10 @@ impl<Clock: LogicalClock> MvStore<Clock> {
tracing::trace!("abort(tx_id={})", tx_id);
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
let pager_rollback_done = if self.is_exclusive_tx(&tx_id) {
if self.is_exclusive_tx(&tx_id) {
self.commit_coordinator.pager_commit_lock.unlock();
self.release_exclusive_tx(&tx_id);
pager.io.block(|| pager.end_tx(true, connection))?;
true
} else {
false
};
}
for ref id in write_set {
if let Some(row_versions) = self.rows.get(id) {
@@ -1684,9 +1688,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx = tx_unlocked.value();
tx.state.store(TransactionState::Terminated);
tracing::trace!("terminate(tx_id={})", tx_id);
if !pager_rollback_done {
pager.end_read_tx()?;
}
// FIXME: verify that we can already remove the transaction here!
// Maybe it's fine for snapshot isolation, but too early for serializable?
self.txs.remove(&tx_id);

View File

@@ -490,15 +490,14 @@ async fn test_multiple_connections_fuzz() {
}
#[tokio::test]
#[ignore = "MVCC is currently under development, it is expected to fail"]
// Same as test_multiple_connections_fuzz, but with MVCC enabled.
async fn test_multiple_connections_fuzz_mvcc() {
let mvcc_fuzz_options = FuzzOptions {
mvcc_enabled: true,
max_num_connections: 8,
query_gen_options: QueryGenOptions {
weight_begin_deferred: 8,
weight_begin_concurrent: 8,
weight_begin_deferred: 4,
weight_begin_concurrent: 12,
weight_commit: 8,
weight_rollback: 8,
weight_checkpoint: 0,