mvcc: abort tx if other tx made a schema change in between

This commit is contained in:
Jussi Saurio
2025-09-24 14:41:36 +03:00
parent 864fa379dd
commit 1626ef046b

View File

@@ -288,6 +288,7 @@ struct CommitCoordinator {
pub struct CommitStateMachine<Clock: LogicalClock> {
state: CommitState,
is_finalized: bool,
did_commit_schema_change: bool,
tx_id: TxID,
connection: Arc<Connection>,
/// Write set sorted by table id and row id
@@ -340,6 +341,7 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
Self {
state,
is_finalized: false,
did_commit_schema_change: false,
tx_id,
connection,
write_set: Vec::new(),
@@ -388,6 +390,16 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
assert_eq!(tx.state, TransactionState::Active);
}
}
if mvcc_store
.last_committed_schema_change_ts
.load(Ordering::Acquire)
> tx.begin_ts
{
// Schema changes made after the transaction began always cause a [SchemaUpdated] error and the tx must abort.
return Err(LimboError::SchemaUpdated);
}
tx.state.store(TransactionState::Preparing);
tracing::trace!("prepare_tx(tx_id={})", self.tx_id);
@@ -501,6 +513,10 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
&mut log_record.row_versions,
row_version.clone(),
); // FIXME: optimize cloning out
if row_version.row.id.table_id == 1 {
self.did_commit_schema_change = true;
}
}
}
if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
@@ -512,6 +528,10 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
&mut log_record.row_versions,
row_version.clone(),
); // FIXME: optimize cloning out
if row_version.row.id.table_id == 1 {
self.did_commit_schema_change = true;
}
}
}
}
@@ -565,10 +585,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
}
CommitState::EndCommitLogicalLog { end_ts } => {
let connection = self.connection.clone();
let schema_did_change = match connection.get_tx_state() {
crate::TransactionState::Write { schema_did_change } => schema_did_change,
_ => false,
};
let schema_did_change = self.did_commit_schema_change;
if schema_did_change {
let schema = connection.schema.read().clone();
connection.db.update_schema_if_newer(schema)?;
@@ -591,6 +608,13 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
.global_header
.write()
.replace(*tx_unlocked.header.read());
if self.did_commit_schema_change {
mvcc_store
.last_committed_schema_change_ts
.store(*end_ts, Ordering::Release);
}
// We have now updated all the versions with a reference to the
// transaction ID to a timestamp and can, therefore, remove the
// transaction. Please note that when we move to lockless, the
@@ -820,6 +844,9 @@ pub struct MvStore<Clock: LogicalClock> {
/// The highest transaction ID that has been checkpointed.
/// Used to skip checkpointing transactions that have already been checkpointed.
checkpointed_txid_max: AtomicU64,
/// The timestamp of the last committed schema change.
/// Schema changes always cause a [SchemaUpdated] error.
last_committed_schema_change_ts: AtomicU64,
}
impl<Clock: LogicalClock> MvStore<Clock> {
@@ -841,6 +868,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
global_header: Arc::new(RwLock::new(None)),
blocking_checkpoint_lock: Arc::new(TursoRwLock::new()),
checkpointed_txid_max: AtomicU64::new(0),
last_committed_schema_change_ts: AtomicU64::new(0),
}
}