From 1626ef046b2db21ef51ce63d95d513371f5d5c8c Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Wed, 24 Sep 2025 14:41:36 +0300 Subject: [PATCH] mvcc: abort tx if other tx made a schema change in between --- core/mvcc/database/mod.rs | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 348402b16..a1174d2c0 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -288,6 +288,7 @@ struct CommitCoordinator { pub struct CommitStateMachine { state: CommitState, is_finalized: bool, + did_commit_schema_change: bool, tx_id: TxID, connection: Arc, /// Write set sorted by table id and row id @@ -340,6 +341,7 @@ impl CommitStateMachine { Self { state, is_finalized: false, + did_commit_schema_change: false, tx_id, connection, write_set: Vec::new(), @@ -388,6 +390,16 @@ impl StateTransition for CommitStateMachine { assert_eq!(tx.state, TransactionState::Active); } } + + if mvcc_store + .last_committed_schema_change_ts + .load(Ordering::Acquire) + > tx.begin_ts + { + // Schema changes made after the transaction began always cause a [SchemaUpdated] error and the tx must abort. + return Err(LimboError::SchemaUpdated); + } + tx.state.store(TransactionState::Preparing); tracing::trace!("prepare_tx(tx_id={})", self.tx_id); @@ -501,6 +513,10 @@ impl StateTransition for CommitStateMachine { &mut log_record.row_versions, row_version.clone(), ); // FIXME: optimize cloning out + + if row_version.row.id.table_id == 1 { + self.did_commit_schema_change = true; + } } } if let Some(TxTimestampOrID::TxID(id)) = row_version.end { @@ -512,6 +528,10 @@ impl StateTransition for CommitStateMachine { &mut log_record.row_versions, row_version.clone(), ); // FIXME: optimize cloning out + + if row_version.row.id.table_id == 1 { + self.did_commit_schema_change = true; + } } } } @@ -565,10 +585,7 @@ impl StateTransition for CommitStateMachine { } CommitState::EndCommitLogicalLog { end_ts } => { let connection = self.connection.clone(); - let schema_did_change = match connection.get_tx_state() { - crate::TransactionState::Write { schema_did_change } => schema_did_change, - _ => false, - }; + let schema_did_change = self.did_commit_schema_change; if schema_did_change { let schema = connection.schema.read().clone(); connection.db.update_schema_if_newer(schema)?; @@ -591,6 +608,13 @@ impl StateTransition for CommitStateMachine { .global_header .write() .replace(*tx_unlocked.header.read()); + + if self.did_commit_schema_change { + mvcc_store + .last_committed_schema_change_ts + .store(*end_ts, Ordering::Release); + } + // We have now updated all the versions with a reference to the // transaction ID to a timestamp and can, therefore, remove the // transaction. Please note that when we move to lockless, the @@ -820,6 +844,9 @@ pub struct MvStore { /// The highest transaction ID that has been checkpointed. /// Used to skip checkpointing transactions that have already been checkpointed. checkpointed_txid_max: AtomicU64, + /// The timestamp of the last committed schema change. + /// Schema changes always cause a [SchemaUpdated] error. + last_committed_schema_change_ts: AtomicU64, } impl MvStore { @@ -841,6 +868,7 @@ impl MvStore { global_header: Arc::new(RwLock::new(None)), blocking_checkpoint_lock: Arc::new(TursoRwLock::new()), checkpointed_txid_max: AtomicU64::new(0), + last_committed_schema_change_ts: AtomicU64::new(0), } }