mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-22 16:35:30 +01:00
Merge 'MVCC: Don't modify the row version chain on rollback' from Duy Dang
Rollback shouldn't modify the row version chain. This is crucial for implementing a non-blocking row version chain in #3499.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3583
This commit is contained in:
@@ -164,92 +164,66 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
|
||||
// 2. A checkpointed table that was destroyed in the logical log. We need to destroy the btree in the pager/btree layer.
|
||||
continue;
|
||||
}
|
||||
|
||||
let row_versions = entry.value().read();
|
||||
|
||||
let mut version_to_checkpoint = None;
|
||||
let mut exists_in_db_file = false;
|
||||
for (i, version) in row_versions.iter().enumerate() {
|
||||
let is_last = i == row_versions.len() - 1;
|
||||
if let TxTimestampOrID::Timestamp(ts) = &version.begin {
|
||||
if *ts <= self.checkpointed_txid_max_old {
|
||||
for version in row_versions.iter() {
|
||||
if let Some(TxTimestampOrID::Timestamp(ts)) = version.begin {
|
||||
//TODO: garbage collect row versions after checkpointing.
|
||||
if ts > self.checkpointed_txid_max_old {
|
||||
version_to_checkpoint = Some(version);
|
||||
} else {
|
||||
exists_in_db_file = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let current_version_ts =
|
||||
if let Some(TxTimestampOrID::Timestamp(ts_end)) = version.end {
|
||||
ts_end.max(*ts)
|
||||
} else {
|
||||
*ts
|
||||
};
|
||||
if current_version_ts <= self.checkpointed_txid_max_old {
|
||||
// already checkpointed. TODO: garbage collect row versions after checkpointing.
|
||||
continue;
|
||||
}
|
||||
if let Some(version) = version_to_checkpoint {
|
||||
let is_delete = version.end.is_some();
|
||||
if let Some(TxTimestampOrID::Timestamp(ts)) = version.begin {
|
||||
max_timestamp = max_timestamp.max(ts);
|
||||
}
|
||||
|
||||
// Row versions in sqlite_schema are temporarily assigned a negative root page that is equal to the table id,
|
||||
// because the root page is not known until it's actually allocated during the checkpoint.
|
||||
// However, existing tables have a real root page.
|
||||
let get_table_id_or_root_page_from_sqlite_schema = |row_data: &Vec<u8>| {
|
||||
let row_data = ImmutableRecord::from_bin_record(row_data.clone());
|
||||
// Only write the row to the B-tree if it is not a delete, or if it is a delete and it exists in
|
||||
// the database file.
|
||||
if !is_delete || exists_in_db_file {
|
||||
let mut special_write = None;
|
||||
|
||||
if version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID {
|
||||
let row_data = ImmutableRecord::from_bin_record(version.row.data.clone());
|
||||
let mut record_cursor = RecordCursor::new();
|
||||
record_cursor.parse_full_header(&row_data).unwrap();
|
||||
let ValueRef::Integer(root_page) =
|
||||
if let ValueRef::Integer(root_page) =
|
||||
record_cursor.get_value(&row_data, 3).unwrap()
|
||||
else {
|
||||
panic!(
|
||||
"Expected integer value for root page, got {:?}",
|
||||
record_cursor.get_value(&row_data, 3)
|
||||
);
|
||||
};
|
||||
root_page
|
||||
};
|
||||
{
|
||||
if is_delete {
|
||||
let table_id = self
|
||||
.mvstore
|
||||
.table_id_to_rootpage
|
||||
.iter()
|
||||
.find(|entry| {
|
||||
entry.value().is_some_and(|r| r == root_page as u64)
|
||||
})
|
||||
.map(|entry| *entry.key())
|
||||
.unwrap(); // This assumes a valid mapping exists.
|
||||
self.destroyed_tables.insert(table_id);
|
||||
|
||||
max_timestamp = max_timestamp.max(current_version_ts);
|
||||
if is_last {
|
||||
let is_delete = version.end.is_some();
|
||||
let is_delete_of_table =
|
||||
is_delete && version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID;
|
||||
let is_create_of_table = !exists_in_db_file
|
||||
&& !is_delete
|
||||
&& version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID;
|
||||
|
||||
// We might need to create or destroy a B-tree in the pager during checkpoint if a row in root page 1 is deleted or created.
|
||||
let special_write = if is_delete_of_table {
|
||||
let root_page =
|
||||
get_table_id_or_root_page_from_sqlite_schema(&version.row.data);
|
||||
assert!(root_page > 0, "rootpage is positive integer");
|
||||
let root_page = root_page as u64;
|
||||
let table_id = *self
|
||||
.mvstore
|
||||
.table_id_to_rootpage
|
||||
.iter()
|
||||
.find(|entry| entry.value().is_some_and(|r| r == root_page))
|
||||
.unwrap()
|
||||
.key();
|
||||
self.destroyed_tables.insert(table_id);
|
||||
|
||||
if exists_in_db_file {
|
||||
Some(SpecialWrite::BTreeDestroy {
|
||||
// We might need to create or destroy a B-tree in the pager during checkpoint if a row in root page 1 is deleted or created.
|
||||
special_write = Some(SpecialWrite::BTreeDestroy {
|
||||
table_id,
|
||||
root_page,
|
||||
root_page: root_page as u64,
|
||||
num_columns: version.row.column_count,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
});
|
||||
} else if !exists_in_db_file {
|
||||
let table_id = MVTableId::from(root_page);
|
||||
special_write = Some(SpecialWrite::BTreeCreate { table_id });
|
||||
}
|
||||
} else if is_create_of_table {
|
||||
let table_id =
|
||||
get_table_id_or_root_page_from_sqlite_schema(&version.row.data);
|
||||
let table_id = MVTableId::from(table_id);
|
||||
Some(SpecialWrite::BTreeCreate { table_id })
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Only write the row to the B-tree if it is not a delete, or if it is a delete and it exists in the database file.
|
||||
let should_be_deleted_from_db_file = is_delete && exists_in_db_file;
|
||||
if !is_delete || should_be_deleted_from_db_file {
|
||||
self.write_set.push((version.clone(), special_write));
|
||||
}
|
||||
}
|
||||
|
||||
self.write_set.push((version.clone(), special_write));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,9 +115,10 @@ impl Row {
|
||||
}
|
||||
|
||||
/// A row version.
|
||||
/// TODO: we can optimize this by using bitpacking for the begin and end fields.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct RowVersion {
|
||||
pub begin: TxTimestampOrID,
|
||||
pub begin: Option<TxTimestampOrID>,
|
||||
pub end: Option<TxTimestampOrID>,
|
||||
pub row: Row,
|
||||
}
|
||||
@@ -572,11 +573,11 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
|
||||
if let Some(row_versions) = mvcc_store.rows.get(id) {
|
||||
let mut row_versions = row_versions.value().write();
|
||||
for row_version in row_versions.iter_mut() {
|
||||
if let TxTimestampOrID::TxID(id) = row_version.begin {
|
||||
if let Some(TxTimestampOrID::TxID(id)) = row_version.begin {
|
||||
if id == self.tx_id {
|
||||
// New version is valid STARTING FROM committing transaction's end timestamp
|
||||
// See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
|
||||
row_version.begin = TxTimestampOrID::Timestamp(*end_ts);
|
||||
row_version.begin = Some(TxTimestampOrID::Timestamp(*end_ts));
|
||||
mvcc_store.insert_version_raw(
|
||||
&mut log_record.row_versions,
|
||||
row_version.clone(),
|
||||
@@ -1091,7 +1092,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
assert_eq!(tx.state, TransactionState::Active);
|
||||
let id = row.id;
|
||||
let row_version = RowVersion {
|
||||
begin: TxTimestampOrID::TxID(tx.tx_id),
|
||||
begin: Some(TxTimestampOrID::TxID(tx.tx_id)),
|
||||
end: None,
|
||||
row,
|
||||
};
|
||||
@@ -1547,14 +1548,14 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
// Hekaton uses oldest-to-newest order for row versions, so we reverse iterate to find the newest one
|
||||
// this transaction changed.
|
||||
for row_version in row_versions.iter_mut().rev() {
|
||||
if let TxTimestampOrID::TxID(id) = row_version.begin {
|
||||
if let Some(TxTimestampOrID::TxID(id)) = row_version.begin {
|
||||
turso_assert!(
|
||||
id == tx_id,
|
||||
"only one tx(0) should exist on loading logical log"
|
||||
);
|
||||
// New version is valid STARTING FROM committing transaction's end timestamp
|
||||
// See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
|
||||
row_version.begin = TxTimestampOrID::Timestamp(end_ts);
|
||||
row_version.begin = Some(TxTimestampOrID::Timestamp(end_ts));
|
||||
}
|
||||
if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
|
||||
turso_assert!(
|
||||
@@ -1585,27 +1586,30 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
assert!(tx.state == TransactionState::Active || tx.state == TransactionState::Preparing);
|
||||
tx.state.store(TransactionState::Aborted);
|
||||
tracing::trace!("abort(tx_id={})", tx_id);
|
||||
let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
|
||||
|
||||
if self.is_exclusive_tx(&tx_id) {
|
||||
self.commit_coordinator.pager_commit_lock.unlock();
|
||||
self.release_exclusive_tx(&tx_id);
|
||||
}
|
||||
|
||||
for ref id in write_set {
|
||||
if let Some(row_versions) = self.rows.get(id) {
|
||||
for rowid in &tx.write_set {
|
||||
let rowid = rowid.value();
|
||||
if let Some(row_versions) = self.rows.get(rowid) {
|
||||
let mut row_versions = row_versions.value().write();
|
||||
for rv in row_versions.iter_mut() {
|
||||
if rv.end == Some(TxTimestampOrID::TxID(tx_id)) {
|
||||
if let Some(TxTimestampOrID::TxID(id)) = rv.begin {
|
||||
assert_eq!(id, tx_id);
|
||||
// If the transaction has aborted,
|
||||
// it marks all its new versions as garbage and sets their Begin
|
||||
// and End timestamps to infinity to make them invisible
|
||||
// See section 2.4: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
|
||||
rv.begin = None;
|
||||
rv.end = None;
|
||||
} else if rv.end == Some(TxTimestampOrID::TxID(tx_id)) {
|
||||
// undo deletions by this transaction
|
||||
rv.end = None;
|
||||
}
|
||||
}
|
||||
// remove insertions by this transaction
|
||||
row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id));
|
||||
if row_versions.is_empty() {
|
||||
self.rows.remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1752,10 +1756,20 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
|
||||
// Extracts the begin timestamp from a transaction
|
||||
#[inline]
|
||||
fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 {
|
||||
fn get_begin_timestamp(&self, ts_or_id: &Option<TxTimestampOrID>) -> u64 {
|
||||
match ts_or_id {
|
||||
TxTimestampOrID::Timestamp(ts) => *ts,
|
||||
TxTimestampOrID::TxID(tx_id) => self.txs.get(tx_id).unwrap().value().begin_ts,
|
||||
Some(TxTimestampOrID::Timestamp(ts)) => *ts,
|
||||
Some(TxTimestampOrID::TxID(tx_id)) => self.txs.get(tx_id).unwrap().value().begin_ts,
|
||||
// This function is intended to be used in the ordering of row versions within the row version chain in `insert_version_raw`.
|
||||
//
|
||||
// The row version chain should be append-only (aside from garbage collection),
|
||||
// so the specific ordering handled by this function may not be critical. We might
|
||||
// be able to append directly to the row version chain in the future.
|
||||
//
|
||||
// The value 0 is used here to represent an infinite timestamp value. This is a deliberate
|
||||
// choice for a planned future bitpacking optimization, reserving 0 for this purpose,
|
||||
// while actual timestamps will start from 1.
|
||||
None => 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1907,7 +1921,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
|
||||
self.insert_version(
|
||||
id,
|
||||
RowVersion {
|
||||
begin: TxTimestampOrID::Timestamp(0),
|
||||
begin: Some(TxTimestampOrID::Timestamp(0)),
|
||||
end: None,
|
||||
row: Row::new(id, record.get_payload().to_vec(), column_count),
|
||||
},
|
||||
@@ -2077,8 +2091,8 @@ impl RowVersion {
|
||||
|
||||
fn is_begin_visible(txs: &SkipMap<TxID, Transaction>, tx: &Transaction, rv: &RowVersion) -> bool {
|
||||
match rv.begin {
|
||||
TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts,
|
||||
TxTimestampOrID::TxID(rv_begin) => {
|
||||
Some(TxTimestampOrID::Timestamp(rv_begin_ts)) => tx.begin_ts >= rv_begin_ts,
|
||||
Some(TxTimestampOrID::TxID(rv_begin)) => {
|
||||
let tb = txs.get(&rv_begin).unwrap();
|
||||
let tb = tb.value();
|
||||
let visible = match tb.state.load() {
|
||||
@@ -2098,6 +2112,7 @@ fn is_begin_visible(txs: &SkipMap<TxID, Transaction>, tx: &Transaction, rv: &Row
|
||||
);
|
||||
visible
|
||||
}
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1074,7 +1074,7 @@ fn test_snapshot_isolation_tx_visible1() {
|
||||
|
||||
let current_tx = new_tx(4, 4, TransactionState::Preparing);
|
||||
|
||||
let rv_visible = |begin: TxTimestampOrID, end: Option<TxTimestampOrID>| {
|
||||
let rv_visible = |begin: Option<TxTimestampOrID>, end: Option<TxTimestampOrID>| {
|
||||
let row_version = RowVersion {
|
||||
begin,
|
||||
end,
|
||||
@@ -1086,60 +1086,60 @@ fn test_snapshot_isolation_tx_visible1() {
|
||||
|
||||
// begin visible: transaction committed with ts < current_tx.begin_ts
|
||||
// end visible: inf
|
||||
assert!(rv_visible(TxTimestampOrID::TxID(1), None));
|
||||
assert!(rv_visible(Some(TxTimestampOrID::TxID(1)), None));
|
||||
|
||||
// begin invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(2), None));
|
||||
assert!(!rv_visible(Some(TxTimestampOrID::TxID(2)), None));
|
||||
|
||||
// begin invisible: transaction aborted
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(3), None));
|
||||
assert!(!rv_visible(Some(TxTimestampOrID::TxID(3)), None));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::Timestamp(0)),
|
||||
Some(TxTimestampOrID::TxID(1))
|
||||
));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end visible: transaction committed with ts < current_tx.begin_ts
|
||||
assert!(rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::Timestamp(0)),
|
||||
Some(TxTimestampOrID::TxID(2))
|
||||
));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end invisible: transaction aborted
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::Timestamp(0)),
|
||||
Some(TxTimestampOrID::TxID(3))
|
||||
));
|
||||
|
||||
// begin invisible: transaction preparing
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(5), None));
|
||||
assert!(!rv_visible(Some(TxTimestampOrID::TxID(5)), None));
|
||||
|
||||
// begin invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(6), None));
|
||||
assert!(!rv_visible(Some(TxTimestampOrID::TxID(6)), None));
|
||||
|
||||
// begin invisible: transaction active
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(7), None));
|
||||
assert!(!rv_visible(Some(TxTimestampOrID::TxID(7)), None));
|
||||
|
||||
// begin invisible: transaction committed with ts > current_tx.begin_ts
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(6), None));
|
||||
assert!(!rv_visible(Some(TxTimestampOrID::TxID(6)), None));
|
||||
|
||||
// begin invisible: transaction active
|
||||
assert!(!rv_visible(TxTimestampOrID::TxID(7), None));
|
||||
assert!(!rv_visible(Some(TxTimestampOrID::TxID(7)), None));
|
||||
|
||||
// begin visible: timestamp < current_tx.begin_ts
|
||||
// end invisible: transaction preparing
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::Timestamp(0)),
|
||||
Some(TxTimestampOrID::TxID(5))
|
||||
));
|
||||
|
||||
// begin invisible: timestamp > current_tx.begin_ts
|
||||
assert!(!rv_visible(
|
||||
TxTimestampOrID::Timestamp(6),
|
||||
Some(TxTimestampOrID::Timestamp(6)),
|
||||
Some(TxTimestampOrID::TxID(6))
|
||||
));
|
||||
|
||||
@@ -1148,9 +1148,11 @@ fn test_snapshot_isolation_tx_visible1() {
|
||||
// but that hasn't happened
|
||||
// (this is the https://avi.im/blag/2023/hekaton-paper-typo/ case, I believe!)
|
||||
assert!(rv_visible(
|
||||
TxTimestampOrID::Timestamp(0),
|
||||
Some(TxTimestampOrID::Timestamp(0)),
|
||||
Some(TxTimestampOrID::TxID(7))
|
||||
));
|
||||
|
||||
assert!(!rv_visible(None, None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user