Adopt Hekaton solution for rollback tx

This commit is contained in:
Duy Dang
2025-10-05 23:51:50 +07:00
parent b52d6ab056
commit c57567d776
3 changed files with 45 additions and 37 deletions

View File

@@ -168,7 +168,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
let mut exists_in_db_file = false;
for (i, version) in row_versions.iter().enumerate() {
let is_last = i == row_versions.len() - 1;
if let TxTimestampOrID::Timestamp(ts) = &version.begin {
if let Some(TxTimestampOrID::Timestamp(ts)) = &version.begin {
if *ts <= self.checkpointed_txid_max_old {
exists_in_db_file = true;
}

View File

@@ -115,9 +115,10 @@ impl Row {
}
/// A row version.
/// TODO: we can optimize this by using bitpacking for the begin and end fields.
#[derive(Clone, Debug, PartialEq)]
pub struct RowVersion {
pub begin: TxTimestampOrID,
pub begin: Option<TxTimestampOrID>,
pub end: Option<TxTimestampOrID>,
pub row: Row,
}
@@ -572,11 +573,11 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
if let Some(row_versions) = mvcc_store.rows.get(id) {
let mut row_versions = row_versions.value().write();
for row_version in row_versions.iter_mut() {
if let TxTimestampOrID::TxID(id) = row_version.begin {
if let Some(TxTimestampOrID::TxID(id)) = row_version.begin {
if id == self.tx_id {
// New version is valid STARTING FROM committing transaction's end timestamp
// See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
row_version.begin = TxTimestampOrID::Timestamp(*end_ts);
row_version.begin = Some(TxTimestampOrID::Timestamp(*end_ts));
mvcc_store.insert_version_raw(
&mut log_record.row_versions,
row_version.clone(),
@@ -1091,7 +1092,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
assert_eq!(tx.state, TransactionState::Active);
let id = row.id;
let row_version = RowVersion {
begin: TxTimestampOrID::TxID(tx.tx_id),
begin: Some(TxTimestampOrID::TxID(tx.tx_id)),
end: None,
row,
};
@@ -1535,14 +1536,14 @@ impl<Clock: LogicalClock> MvStore<Clock> {
// Hekaton uses oldest-to-newest order for row versions, so we reverse iterate to find the newest one
// this transaction changed.
for row_version in row_versions.iter_mut().rev() {
if let TxTimestampOrID::TxID(id) = row_version.begin {
if let Some(TxTimestampOrID::TxID(id)) = row_version.begin {
turso_assert!(
id == tx_id,
"only one tx(0) should exist on loading logical log"
);
// New version is valid STARTING FROM committing transaction's end timestamp
// See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
row_version.begin = TxTimestampOrID::Timestamp(end_ts);
row_version.begin = Some(TxTimestampOrID::Timestamp(end_ts));
}
if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
turso_assert!(
@@ -1578,27 +1579,30 @@ impl<Clock: LogicalClock> MvStore<Clock> {
assert!(tx.state == TransactionState::Active || tx.state == TransactionState::Preparing);
tx.state.store(TransactionState::Aborted);
tracing::trace!("abort(tx_id={})", tx_id);
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
if self.is_exclusive_tx(&tx_id) {
self.commit_coordinator.pager_commit_lock.unlock();
self.release_exclusive_tx(&tx_id);
}
for ref id in write_set {
if let Some(row_versions) = self.rows.get(id) {
for rowid in &tx.write_set {
let rowid = rowid.value();
if let Some(row_versions) = self.rows.get(rowid) {
let mut row_versions = row_versions.value().write();
for rv in row_versions.iter_mut() {
if rv.end == Some(TxTimestampOrID::TxID(tx_id)) {
if let Some(TxTimestampOrID::TxID(id)) = rv.begin {
assert_eq!(id, tx_id);
// If the transaction has aborted,
// it marks all its new versions as garbage and sets their Begin
// and End timestamps to infinity to make them invisible
// See section 2.4: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
rv.begin = None;
rv.end = None;
} else if rv.end == Some(TxTimestampOrID::TxID(tx_id)) {
// undo deletions by this transaction
rv.end = None;
}
}
// remove insertions by this transaction
row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id));
if row_versions.is_empty() {
self.rows.remove(id);
}
}
}
@@ -1747,10 +1751,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
// Extracts the begin timestamp from a transaction
#[inline]
fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 {
fn get_begin_timestamp(&self, ts_or_id: &Option<TxTimestampOrID>) -> u64 {
match ts_or_id {
TxTimestampOrID::Timestamp(ts) => *ts,
TxTimestampOrID::TxID(tx_id) => self.txs.get(tx_id).unwrap().value().begin_ts,
Some(TxTimestampOrID::Timestamp(ts)) => *ts,
Some(TxTimestampOrID::TxID(tx_id)) => self.txs.get(tx_id).unwrap().value().begin_ts,
None => 0,
}
}
@@ -1902,7 +1907,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
self.insert_version(
id,
RowVersion {
begin: TxTimestampOrID::Timestamp(0),
begin: Some(TxTimestampOrID::Timestamp(0)),
end: None,
row: Row::new(id, record.get_payload().to_vec(), column_count),
},
@@ -2068,8 +2073,8 @@ impl RowVersion {
fn is_begin_visible(txs: &SkipMap<TxID, Transaction>, tx: &Transaction, rv: &RowVersion) -> bool {
match rv.begin {
TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts,
TxTimestampOrID::TxID(rv_begin) => {
Some(TxTimestampOrID::Timestamp(rv_begin_ts)) => tx.begin_ts >= rv_begin_ts,
Some(TxTimestampOrID::TxID(rv_begin)) => {
let tb = txs.get(&rv_begin).unwrap();
let tb = tb.value();
let visible = match tb.state.load() {
@@ -2089,6 +2094,7 @@ fn is_begin_visible(txs: &SkipMap<TxID, Transaction>, tx: &Transaction, rv: &Row
);
visible
}
None => false,
}
}

View File

@@ -1076,7 +1076,7 @@ fn test_snapshot_isolation_tx_visible1() {
let current_tx = new_tx(4, 4, TransactionState::Preparing);
let rv_visible = |begin: TxTimestampOrID, end: Option<TxTimestampOrID>| {
let rv_visible = |begin: Option<TxTimestampOrID>, end: Option<TxTimestampOrID>| {
let row_version = RowVersion {
begin,
end,
@@ -1088,60 +1088,60 @@ fn test_snapshot_isolation_tx_visible1() {
// begin visible: transaction committed with ts < current_tx.begin_ts
// end visible: inf
assert!(rv_visible(TxTimestampOrID::TxID(1), None));
assert!(rv_visible(Some(TxTimestampOrID::TxID(1)), None));
// begin invisible: transaction committed with ts > current_tx.begin_ts
assert!(!rv_visible(TxTimestampOrID::TxID(2), None));
assert!(!rv_visible(Some(TxTimestampOrID::TxID(2)), None));
// begin invisible: transaction aborted
assert!(!rv_visible(TxTimestampOrID::TxID(3), None));
assert!(!rv_visible(Some(TxTimestampOrID::TxID(3)), None));
// begin visible: timestamp < current_tx.begin_ts
// end invisible: transaction committed with ts > current_tx.begin_ts
assert!(!rv_visible(
TxTimestampOrID::Timestamp(0),
Some(TxTimestampOrID::Timestamp(0)),
Some(TxTimestampOrID::TxID(1))
));
// begin visible: timestamp < current_tx.begin_ts
// end visible: transaction committed with ts < current_tx.begin_ts
assert!(rv_visible(
TxTimestampOrID::Timestamp(0),
Some(TxTimestampOrID::Timestamp(0)),
Some(TxTimestampOrID::TxID(2))
));
// begin visible: timestamp < current_tx.begin_ts
// end invisible: transaction aborted
assert!(!rv_visible(
TxTimestampOrID::Timestamp(0),
Some(TxTimestampOrID::Timestamp(0)),
Some(TxTimestampOrID::TxID(3))
));
// begin invisible: transaction preparing
assert!(!rv_visible(TxTimestampOrID::TxID(5), None));
assert!(!rv_visible(Some(TxTimestampOrID::TxID(5)), None));
// begin invisible: transaction committed with ts > current_tx.begin_ts
assert!(!rv_visible(TxTimestampOrID::TxID(6), None));
assert!(!rv_visible(Some(TxTimestampOrID::TxID(6)), None));
// begin invisible: transaction active
assert!(!rv_visible(TxTimestampOrID::TxID(7), None));
assert!(!rv_visible(Some(TxTimestampOrID::TxID(7)), None));
// begin invisible: transaction committed with ts > current_tx.begin_ts
assert!(!rv_visible(TxTimestampOrID::TxID(6), None));
assert!(!rv_visible(Some(TxTimestampOrID::TxID(6)), None));
// begin invisible: transaction active
assert!(!rv_visible(TxTimestampOrID::TxID(7), None));
assert!(!rv_visible(Some(TxTimestampOrID::TxID(7)), None));
// begin visible: timestamp < current_tx.begin_ts
// end invisible: transaction preparing
assert!(!rv_visible(
TxTimestampOrID::Timestamp(0),
Some(TxTimestampOrID::Timestamp(0)),
Some(TxTimestampOrID::TxID(5))
));
// begin invisible: timestamp > current_tx.begin_ts
assert!(!rv_visible(
TxTimestampOrID::Timestamp(6),
Some(TxTimestampOrID::Timestamp(6)),
Some(TxTimestampOrID::TxID(6))
));
@@ -1150,9 +1150,11 @@ fn test_snapshot_isolation_tx_visible1() {
// but that hasn't happened
// (this is the https://avi.im/blag/2023/hekaton-paper-typo/ case, I believe!)
assert!(rv_visible(
TxTimestampOrID::Timestamp(0),
Some(TxTimestampOrID::Timestamp(0)),
Some(TxTimestampOrID::TxID(7))
));
assert!(!rv_visible(None, None));
}
#[test]