mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-05 01:04:22 +01:00
Merge 'MVCC: fix write conflict handling' from Jussi Saurio
Fixes three bugs:
- When a transaction commits, it should:
1. set the end timestamp of the old version to its own **end
timestamp**,
2. set the begin timestamp of the new version to its own **end
timestamp**
* we were erroneously setting it to the transaction's begin
timestamp. see https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf page
299 diagram.
- When checking for write conflicts, the system should ignore versions
that the transaction cannot see. We were checking for write conflicts
before visibility, allowing a tx to always see the latest committed
version and to always overwrite it, since latest committed versions don't
have an end timestamp.
- When checking for write conflicts, the _presence_ of an end timestamp
should always indicate that it is not the latest version and thus a
newer committed version exists, forcing the transaction to abort.
see https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf page 301
section 2.6. Updating a version
Reviewed-by: Pekka Enberg <penberg@iki.fi>
Closes #927
This commit is contained in:
@@ -58,7 +58,9 @@ impl<T> LogRecord<T> {
|
||||
/// versions switch to tracking timestamps.
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd)]
|
||||
enum TxTimestampOrID {
|
||||
/// A committed transaction's timestamp.
|
||||
Timestamp(u64),
|
||||
/// The ID of a non-committed transaction.
|
||||
TxID(TxID),
|
||||
}
|
||||
|
||||
@@ -365,6 +367,10 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let tx = tx.value().read().unwrap();
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
let version_is_visible_to_current_tx = is_version_visible(&self.txs, &tx, rv);
|
||||
if !version_is_visible_to_current_tx {
|
||||
continue;
|
||||
}
|
||||
if is_write_write_conflict(&self.txs, &tx, rv) {
|
||||
drop(row_versions);
|
||||
drop(row_versions_opt);
|
||||
@@ -372,19 +378,18 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
|
||||
self.rollback_tx(tx_id);
|
||||
return Err(DatabaseError::WriteWriteConflict);
|
||||
}
|
||||
if is_version_visible(&self.txs, &tx, rv) {
|
||||
rv.end = Some(TxTimestampOrID::TxID(tx.tx_id));
|
||||
drop(row_versions);
|
||||
drop(row_versions_opt);
|
||||
drop(tx);
|
||||
let tx = self
|
||||
.txs
|
||||
.get(&tx_id)
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let mut tx = tx.value().write().unwrap();
|
||||
tx.insert_to_write_set(id);
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
rv.end = Some(TxTimestampOrID::TxID(tx.tx_id));
|
||||
drop(row_versions);
|
||||
drop(row_versions_opt);
|
||||
drop(tx);
|
||||
let tx = self
|
||||
.txs
|
||||
.get(&tx_id)
|
||||
.ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
|
||||
let mut tx = tx.value().write().unwrap();
|
||||
tx.insert_to_write_set(id);
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
@@ -556,7 +561,6 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
|
||||
*/
|
||||
tx.state.store(TransactionState::Committed(end_ts));
|
||||
tracing::trace!("COMMIT {tx}");
|
||||
let tx_begin_ts = tx.begin_ts;
|
||||
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
|
||||
drop(tx);
|
||||
// Postprocessing: inserting row versions and logging the transaction to persistent storage.
|
||||
@@ -568,7 +572,9 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
|
||||
for row_version in row_versions.iter_mut() {
|
||||
if let TxTimestampOrID::TxID(id) = row_version.begin {
|
||||
if id == tx_id {
|
||||
row_version.begin = TxTimestampOrID::Timestamp(tx_begin_ts);
|
||||
// New version is valid STARTING FROM committing transaction's end timestamp
|
||||
// See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
|
||||
row_version.begin = TxTimestampOrID::Timestamp(end_ts);
|
||||
self.insert_version_raw(
|
||||
&mut log_record.row_versions,
|
||||
row_version.clone(),
|
||||
@@ -577,6 +583,8 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
|
||||
}
|
||||
if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
|
||||
if id == tx_id {
|
||||
// Old version is valid UNTIL committing transaction's end timestamp
|
||||
// See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
|
||||
row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
|
||||
self.insert_version_raw(
|
||||
&mut log_record.row_versions,
|
||||
@@ -720,8 +728,18 @@ impl<Clock: LogicalClock, T: Sync + Send + Clone + Debug + 'static> MvStore<Cloc
|
||||
}
|
||||
}
|
||||
|
||||
/// A write-write conflict happens when transaction T_m attempts to update a
|
||||
/// row version that is currently being updated by an active transaction T_n.
|
||||
/// A write-write conflict happens when transaction T_current attempts to update a
|
||||
/// row version that is:
|
||||
/// a) currently being updated by an active transaction T_previous, or
|
||||
/// b) was updated by an ended transaction T_previous that committed AFTER T_current started
|
||||
/// but BEFORE T_current commits.
|
||||
///
|
||||
/// "Suppose transaction T wants to update a version V. V is updatable
|
||||
/// only if it is the latest version, that is, it has an end timestamp equal
|
||||
/// to infinity or its End field contains the ID of a transaction TE and
|
||||
/// TE’s state is Aborted"
|
||||
/// Ref: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf , page 301,
|
||||
/// 2.6. Updating a Version.
|
||||
pub(crate) fn is_write_write_conflict<T>(
|
||||
txs: &SkipMap<TxID, RwLock<Transaction>>,
|
||||
tx: &Transaction,
|
||||
@@ -731,12 +749,16 @@ pub(crate) fn is_write_write_conflict<T>(
|
||||
Some(TxTimestampOrID::TxID(rv_end)) => {
|
||||
let te = txs.get(&rv_end).unwrap();
|
||||
let te = te.value().read().unwrap();
|
||||
match te.state.load() {
|
||||
TransactionState::Active | TransactionState::Preparing => tx.tx_id != te.tx_id,
|
||||
_ => false,
|
||||
if te.tx_id == tx.tx_id {
|
||||
return false;
|
||||
}
|
||||
te.state.load() != TransactionState::Aborted
|
||||
}
|
||||
Some(TxTimestampOrID::Timestamp(_)) => false,
|
||||
// A non-"infinity" end timestamp (here modeled by Some(ts)) functions as a write lock
|
||||
// on the row, so it can never be updated by another transaction.
|
||||
// Ref: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf , page 301,
|
||||
// 2.6. Updating a Version.
|
||||
Some(TxTimestampOrID::Timestamp(_)) => true,
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -382,7 +382,7 @@ fn test_fuzzy_read() {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Hello".to_string(),
|
||||
data: "First".to_string(),
|
||||
};
|
||||
db.insert(tx1, tx1_row.clone()).unwrap();
|
||||
let row = db
|
||||
@@ -419,7 +419,7 @@ fn test_fuzzy_read() {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "World".to_string(),
|
||||
data: "Second".to_string(),
|
||||
};
|
||||
db.update(tx3, tx3_row).unwrap();
|
||||
db.commit_tx(tx3).unwrap();
|
||||
@@ -436,6 +436,18 @@ fn test_fuzzy_read() {
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(tx1_row, row);
|
||||
|
||||
// T2 tries to update the row, but fails because T3 has already committed an update to the row,
|
||||
// so T2 trying to write would violate snapshot isolation if it succeeded.
|
||||
let tx2_newrow = Row {
|
||||
id: RowID {
|
||||
table_id: 1,
|
||||
row_id: 1,
|
||||
},
|
||||
data: "Third".to_string(),
|
||||
};
|
||||
let update_result = db.update(tx2, tx2_newrow);
|
||||
assert_eq!(Err(DatabaseError::WriteWriteConflict), update_result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user