From b590b353eb2a49aae8ae15f06fd06107793da400 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 24 Sep 2025 10:36:56 +0300 Subject: [PATCH] core/mvcc: Wrap Transaction::database_header with RwLock --- core/mvcc/database/mod.rs | 15 +++++++-------- core/mvcc/database/tests.rs | 6 +++--- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 3621b5c97..95b36beb9 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -19,7 +19,6 @@ use crate::Result; use crate::{Connection, Pager}; use crossbeam_skiplist::{SkipMap, SkipSet}; use parking_lot::RwLock; -use std::cell::RefCell; use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; @@ -118,7 +117,7 @@ pub struct Transaction { /// The transaction read set. read_set: SkipSet, /// The transaction header. - header: RefCell, + header: RwLock, } impl Transaction { @@ -129,7 +128,7 @@ impl Transaction { begin_ts, write_set: SkipSet::new(), read_set: SkipSet::new(), - header: RefCell::new(header), + header: RwLock::new(header), } } @@ -770,7 +769,7 @@ impl StateTransition for CommitStateMachine { // FIXME: hack for now to keep database header updated for pager commit let tx = mvcc_store.txs.get(&self.tx_id).unwrap(); let tx_unlocked = tx.value(); - self.header.write().replace(*tx_unlocked.header.borrow()); + self.header.write().replace(*tx_unlocked.header.read()); self.commit_coordinator.pager_commit_lock.unlock(); // TODO: here mark we are ready for a batch self.state = CommitState::Commit { end_ts: *end_ts }; @@ -874,7 +873,7 @@ impl StateTransition for CommitStateMachine { } let tx = mvcc_store.txs.get(&self.tx_id).unwrap(); let tx_unlocked = tx.value(); - self.header.write().replace(*tx_unlocked.header.borrow()); + self.header.write().replace(*tx_unlocked.header.read()); tracing::trace!("end_commit_logical_log(tx_id={})", self.tx_id); self.commit_coordinator.pager_commit_lock.unlock(); self.state = CommitState::CommitEnd { end_ts: *end_ts }; @@ -1502,7 +1501,7 @@ impl MvStore { .get(tx_id) .expect("transaction not found when trying to get header"); let header = tx.value(); - let header = header.header.borrow(); + let header = header.header.read(); tracing::debug!("get_transaction_database_header read: header={:?}", header); *header } @@ -1514,7 +1513,7 @@ impl MvStore { if let Some(tx_id) = tx_id { let tx = self.txs.get(tx_id).unwrap(); let header = tx.value(); - let header = header.header.borrow(); + let header = header.header.read(); tracing::debug!("with_header read: header={:?}", header); Ok(f(&header)) } else { @@ -1531,7 +1530,7 @@ impl MvStore { if let Some(tx_id) = tx_id { let tx = self.txs.get(tx_id).unwrap(); let header = tx.value(); - let mut header = header.header.borrow_mut(); + let mut header = header.header.write(); tracing::debug!("with_header_mut read: header={:?}", header); Ok(f(&mut header)) } else { diff --git a/core/mvcc/database/tests.rs b/core/mvcc/database/tests.rs index cc693abdf..ff35ba9ab 100644 --- a/core/mvcc/database/tests.rs +++ b/core/mvcc/database/tests.rs @@ -2,7 +2,7 @@ use super::*; use crate::io::PlatformIO; use crate::mvcc::clock::LocalClock; use crate::storage::sqlite3_ondisk::DatabaseHeader; -use std::cell::RefCell; +use parking_lot::RwLock; pub(crate) struct MvccTestDbNoConn { pub(crate) db: Option>, @@ -1041,7 +1041,7 @@ fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> Transaction { begin_ts, write_set: SkipSet::new(), read_set: SkipSet::new(), - header: RefCell::new(DatabaseHeader::default()), + header: RwLock::new(DatabaseHeader::default()), } } @@ -1453,7 +1453,7 @@ fn transaction_display() { begin_ts, write_set, read_set, - header: RefCell::new(DatabaseHeader::default()), + header: RwLock::new(DatabaseHeader::default()), }; let expected = "{ state: Preparing, id: 42, begin_ts: 20250914, write_set: [RowID { table_id: 1, row_id: 11 }, RowID { table_id: 1, row_id: 13 }], read_set: [RowID { table_id: 2, row_id: 17 }, RowID { table_id: 2, row_id: 19 }] }";