diff --git a/core/mvcc/database/checkpoint_state_machine.rs b/core/mvcc/database/checkpoint_state_machine.rs index fbe8b2b84..16b5d3a4a 100644 --- a/core/mvcc/database/checkpoint_state_machine.rs +++ b/core/mvcc/database/checkpoint_state_machine.rs @@ -164,92 +164,66 @@ impl CheckpointStateMachine { // 2. A checkpointed table that was destroyed in the logical log. We need to destroy the btree in the pager/btree layer. continue; } + let row_versions = entry.value().read(); + + let mut version_to_checkpoint = None; let mut exists_in_db_file = false; - for (i, version) in row_versions.iter().enumerate() { - let is_last = i == row_versions.len() - 1; - if let TxTimestampOrID::Timestamp(ts) = &version.begin { - if *ts <= self.checkpointed_txid_max_old { + for version in row_versions.iter() { + if let Some(TxTimestampOrID::Timestamp(ts)) = version.begin { + //TODO: garbage collect row versions after checkpointing. + if ts > self.checkpointed_txid_max_old { + version_to_checkpoint = Some(version); + } else { exists_in_db_file = true; } + } + } - let current_version_ts = - if let Some(TxTimestampOrID::Timestamp(ts_end)) = version.end { - ts_end.max(*ts) - } else { - *ts - }; - if current_version_ts <= self.checkpointed_txid_max_old { - // already checkpointed. TODO: garbage collect row versions after checkpointing. - continue; - } + if let Some(version) = version_to_checkpoint { + let is_delete = version.end.is_some(); + if let Some(TxTimestampOrID::Timestamp(ts)) = version.begin { + max_timestamp = max_timestamp.max(ts); + } - // Row versions in sqlite_schema are temporarily assigned a negative root page that is equal to the table id, - // because the root page is not known until it's actually allocated during the checkpoint. - // However, existing tables have a real root page. 
- let get_table_id_or_root_page_from_sqlite_schema = |row_data: &Vec| { - let row_data = ImmutableRecord::from_bin_record(row_data.clone()); + // Only write the row to the B-tree if it is not a delete, or if it is a delete and it exists in + // the database file. + if !is_delete || exists_in_db_file { + let mut special_write = None; + + if version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID { + let row_data = ImmutableRecord::from_bin_record(version.row.data.clone()); let mut record_cursor = RecordCursor::new(); record_cursor.parse_full_header(&row_data).unwrap(); - let ValueRef::Integer(root_page) = + if let ValueRef::Integer(root_page) = record_cursor.get_value(&row_data, 3).unwrap() - else { - panic!( - "Expected integer value for root page, got {:?}", - record_cursor.get_value(&row_data, 3) - ); - }; - root_page - }; + { + if is_delete { + let table_id = self + .mvstore + .table_id_to_rootpage + .iter() + .find(|entry| { + entry.value().is_some_and(|r| r == root_page as u64) + }) + .map(|entry| *entry.key()) + .unwrap(); // This assumes a valid mapping exists. + self.destroyed_tables.insert(table_id); - max_timestamp = max_timestamp.max(current_version_ts); - if is_last { - let is_delete = version.end.is_some(); - let is_delete_of_table = - is_delete && version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID; - let is_create_of_table = !exists_in_db_file - && !is_delete - && version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID; - - // We might need to create or destroy a B-tree in the pager during checkpoint if a row in root page 1 is deleted or created. 
- let special_write = if is_delete_of_table { - let root_page = - get_table_id_or_root_page_from_sqlite_schema(&version.row.data); - assert!(root_page > 0, "rootpage is positive integer"); - let root_page = root_page as u64; - let table_id = *self - .mvstore - .table_id_to_rootpage - .iter() - .find(|entry| entry.value().is_some_and(|r| r == root_page)) - .unwrap() - .key(); - self.destroyed_tables.insert(table_id); - - if exists_in_db_file { - Some(SpecialWrite::BTreeDestroy { + // We might need to create or destroy a B-tree in the pager during checkpoint if a row in root page 1 is deleted or created. + special_write = Some(SpecialWrite::BTreeDestroy { table_id, - root_page, + root_page: root_page as u64, num_columns: version.row.column_count, - }) - } else { - None + }); + } else if !exists_in_db_file { + let table_id = MVTableId::from(root_page); + special_write = Some(SpecialWrite::BTreeCreate { table_id }); } - } else if is_create_of_table { - let table_id = - get_table_id_or_root_page_from_sqlite_schema(&version.row.data); - let table_id = MVTableId::from(table_id); - Some(SpecialWrite::BTreeCreate { table_id }) - } else { - None - }; - - // Only write the row to the B-tree if it is not a delete, or if it is a delete and it exists in the database file. - let should_be_deleted_from_db_file = is_delete && exists_in_db_file; - if !is_delete || should_be_deleted_from_db_file { - self.write_set.push((version.clone(), special_write)); } } + + self.write_set.push((version.clone(), special_write)); } } } diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index c7352fe7c..936a457f0 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -115,9 +115,10 @@ impl Row { } /// A row version. +/// TODO: we can optimize this by using bitpacking for the begin and end fields. 
#[derive(Clone, Debug, PartialEq)] pub struct RowVersion { - pub begin: TxTimestampOrID, + pub begin: Option<TxTimestampOrID>, pub end: Option<TxTimestampOrID>, pub row: Row, } @@ -572,11 +573,11 @@ impl StateTransition for CommitStateMachine { if let Some(row_versions) = mvcc_store.rows.get(id) { let mut row_versions = row_versions.value().write(); for row_version in row_versions.iter_mut() { - if let TxTimestampOrID::TxID(id) = row_version.begin { + if let Some(TxTimestampOrID::TxID(id)) = row_version.begin { if id == self.tx_id { // New version is valid STARTING FROM committing transaction's end timestamp // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf - row_version.begin = TxTimestampOrID::Timestamp(*end_ts); + row_version.begin = Some(TxTimestampOrID::Timestamp(*end_ts)); mvcc_store.insert_version_raw( &mut log_record.row_versions, row_version.clone(), @@ -1091,7 +1092,7 @@ impl MvStore { assert_eq!(tx.state, TransactionState::Active); let id = row.id; let row_version = RowVersion { - begin: TxTimestampOrID::TxID(tx.tx_id), + begin: Some(TxTimestampOrID::TxID(tx.tx_id)), end: None, row, }; @@ -1547,14 +1548,14 @@ impl MvStore { // Hekaton uses oldest-to-newest order for row versions, so we reverse iterate to find the newest one // this transaction changed. 
for row_version in row_versions.iter_mut().rev() { - if let TxTimestampOrID::TxID(id) = row_version.begin { + if let Some(TxTimestampOrID::TxID(id)) = row_version.begin { turso_assert!( id == tx_id, "only one tx(0) should exist on loading logical log" ); // New version is valid STARTING FROM committing transaction's end timestamp // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf - row_version.begin = TxTimestampOrID::Timestamp(end_ts); + row_version.begin = Some(TxTimestampOrID::Timestamp(end_ts)); } if let Some(TxTimestampOrID::TxID(id)) = row_version.end { turso_assert!( @@ -1585,27 +1586,30 @@ impl MvStore { assert!(tx.state == TransactionState::Active || tx.state == TransactionState::Preparing); tx.state.store(TransactionState::Aborted); tracing::trace!("abort(tx_id={})", tx_id); - let write_set: Vec = tx.write_set.iter().map(|v| *v.value()).collect(); if self.is_exclusive_tx(&tx_id) { self.commit_coordinator.pager_commit_lock.unlock(); self.release_exclusive_tx(&tx_id); } - for ref id in write_set { - if let Some(row_versions) = self.rows.get(id) { + for rowid in &tx.write_set { + let rowid = rowid.value(); + if let Some(row_versions) = self.rows.get(rowid) { let mut row_versions = row_versions.value().write(); for rv in row_versions.iter_mut() { - if rv.end == Some(TxTimestampOrID::TxID(tx_id)) { + if let Some(TxTimestampOrID::TxID(id)) = rv.begin { + assert_eq!(id, tx_id); + // If the transaction has aborted, + // it marks all its new versions as garbage and sets their Begin + // and End timestamps to infinity to make them invisible + // See section 2.4: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf + rv.begin = None; + rv.end = None; + } else if rv.end == Some(TxTimestampOrID::TxID(tx_id)) { // undo deletions by this transaction rv.end = None; } } - // remove insertions by this transaction - row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id)); - if row_versions.is_empty() { - self.rows.remove(id); - } } } 
@@ -1752,10 +1756,20 @@ impl MvStore { // Extracts the begin timestamp from a transaction #[inline] - fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 { + fn get_begin_timestamp(&self, ts_or_id: &Option<TxTimestampOrID>) -> u64 { match ts_or_id { - TxTimestampOrID::Timestamp(ts) => *ts, - TxTimestampOrID::TxID(tx_id) => self.txs.get(tx_id).unwrap().value().begin_ts, + Some(TxTimestampOrID::Timestamp(ts)) => *ts, + Some(TxTimestampOrID::TxID(tx_id)) => self.txs.get(tx_id).unwrap().value().begin_ts, + // This function is intended to be used in the ordering of row versions within the row version chain in `insert_version_raw`. + // + // The row version chain should be append-only (aside from garbage collection), + // so the specific ordering handled by this function may not be critical. We might + // be able to append directly to the row version chain in the future. + // + // The value 0 is used here to represent an infinite timestamp value. This is a deliberate + // choice for a planned future bitpacking optimization, reserving 0 for this purpose, + // while actual timestamps will start from 1. 
+ None => 0, } } @@ -1907,7 +1921,7 @@ impl MvStore { self.insert_version( id, RowVersion { - begin: TxTimestampOrID::Timestamp(0), + begin: Some(TxTimestampOrID::Timestamp(0)), end: None, row: Row::new(id, record.get_payload().to_vec(), column_count), }, @@ -2077,8 +2091,8 @@ impl RowVersion { fn is_begin_visible(txs: &SkipMap, tx: &Transaction, rv: &RowVersion) -> bool { match rv.begin { - TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts, - TxTimestampOrID::TxID(rv_begin) => { + Some(TxTimestampOrID::Timestamp(rv_begin_ts)) => tx.begin_ts >= rv_begin_ts, + Some(TxTimestampOrID::TxID(rv_begin)) => { let tb = txs.get(&rv_begin).unwrap(); let tb = tb.value(); let visible = match tb.state.load() { @@ -2098,6 +2112,7 @@ fn is_begin_visible(txs: &SkipMap, tx: &Transaction, rv: &Row ); visible } + None => false, } } diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index 3cc76db23..086391064 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -1074,7 +1074,7 @@ fn test_snapshot_isolation_tx_visible1() { let current_tx = new_tx(4, 4, TransactionState::Preparing); - let rv_visible = |begin: TxTimestampOrID, end: Option<TxTimestampOrID>| { + let rv_visible = |begin: Option<TxTimestampOrID>, end: Option<TxTimestampOrID>| { let row_version = RowVersion { begin, end, @@ -1086,60 +1086,60 @@ fn test_snapshot_isolation_tx_visible1() { // begin visible: transaction committed with ts < current_tx.begin_ts // end visible: inf - assert!(rv_visible(TxTimestampOrID::TxID(1), None)); + assert!(rv_visible(Some(TxTimestampOrID::TxID(1)), None)); // begin invisible: transaction committed with ts > current_tx.begin_ts - assert!(!rv_visible(TxTimestampOrID::TxID(2), None)); + assert!(!rv_visible(Some(TxTimestampOrID::TxID(2)), None)); // begin invisible: transaction aborted - assert!(!rv_visible(TxTimestampOrID::TxID(3), None)); + assert!(!rv_visible(Some(TxTimestampOrID::TxID(3)), None)); // begin visible: timestamp < current_tx.begin_ts // end invisible: transaction 
committed with ts > current_tx.begin_ts assert!(!rv_visible( - TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::Timestamp(0)), Some(TxTimestampOrID::TxID(1)) )); // begin visible: timestamp < current_tx.begin_ts // end visible: transaction committed with ts < current_tx.begin_ts assert!(rv_visible( - TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::Timestamp(0)), Some(TxTimestampOrID::TxID(2)) )); // begin visible: timestamp < current_tx.begin_ts // end invisible: transaction aborted assert!(!rv_visible( - TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::Timestamp(0)), Some(TxTimestampOrID::TxID(3)) )); // begin invisible: transaction preparing - assert!(!rv_visible(TxTimestampOrID::TxID(5), None)); + assert!(!rv_visible(Some(TxTimestampOrID::TxID(5)), None)); // begin invisible: transaction committed with ts > current_tx.begin_ts - assert!(!rv_visible(TxTimestampOrID::TxID(6), None)); + assert!(!rv_visible(Some(TxTimestampOrID::TxID(6)), None)); // begin invisible: transaction active - assert!(!rv_visible(TxTimestampOrID::TxID(7), None)); + assert!(!rv_visible(Some(TxTimestampOrID::TxID(7)), None)); // begin invisible: transaction committed with ts > current_tx.begin_ts - assert!(!rv_visible(TxTimestampOrID::TxID(6), None)); + assert!(!rv_visible(Some(TxTimestampOrID::TxID(6)), None)); // begin invisible: transaction active - assert!(!rv_visible(TxTimestampOrID::TxID(7), None)); + assert!(!rv_visible(Some(TxTimestampOrID::TxID(7)), None)); // begin visible: timestamp < current_tx.begin_ts // end invisible: transaction preparing assert!(!rv_visible( - TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::Timestamp(0)), Some(TxTimestampOrID::TxID(5)) )); // begin invisible: timestamp > current_tx.begin_ts assert!(!rv_visible( - TxTimestampOrID::Timestamp(6), + Some(TxTimestampOrID::Timestamp(6)), Some(TxTimestampOrID::TxID(6)) )); @@ -1148,9 +1148,11 @@ fn test_snapshot_isolation_tx_visible1() { // but that hasn't happened // (this is the 
https://avi.im/blag/2023/hekaton-paper-typo/ case, I believe!) assert!(rv_visible( - TxTimestampOrID::Timestamp(0), + Some(TxTimestampOrID::Timestamp(0)), Some(TxTimestampOrID::TxID(7)) )); + + assert!(!rv_visible(None, None)); } #[test]