core/mvcc: Wrap Transaction::database_header with RwLock

This commit is contained in:
Pekka Enberg
2025-09-24 10:36:56 +03:00
parent c894dcf438
commit b590b353eb
2 changed files with 10 additions and 11 deletions

View File

@@ -19,7 +19,6 @@ use crate::Result;
use crate::{Connection, Pager};
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
@@ -118,7 +117,7 @@ pub struct Transaction {
/// The transaction read set.
read_set: SkipSet<RowID>,
/// The transaction header.
header: RefCell<DatabaseHeader>,
header: RwLock<DatabaseHeader>,
}
impl Transaction {
@@ -129,7 +128,7 @@ impl Transaction {
begin_ts,
write_set: SkipSet::new(),
read_set: SkipSet::new(),
header: RefCell::new(header),
header: RwLock::new(header),
}
}
@@ -770,7 +769,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
// FIXME: hack for now to keep database header updated for pager commit
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
let tx_unlocked = tx.value();
self.header.write().replace(*tx_unlocked.header.borrow());
self.header.write().replace(*tx_unlocked.header.read());
self.commit_coordinator.pager_commit_lock.unlock();
// TODO: here mark we are ready for a batch
self.state = CommitState::Commit { end_ts: *end_ts };
@@ -874,7 +873,7 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
}
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
let tx_unlocked = tx.value();
self.header.write().replace(*tx_unlocked.header.borrow());
self.header.write().replace(*tx_unlocked.header.read());
tracing::trace!("end_commit_logical_log(tx_id={})", self.tx_id);
self.commit_coordinator.pager_commit_lock.unlock();
self.state = CommitState::CommitEnd { end_ts: *end_ts };
@@ -1502,7 +1501,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
.get(tx_id)
.expect("transaction not found when trying to get header");
let header = tx.value();
let header = header.header.borrow();
let header = header.header.read();
tracing::debug!("get_transaction_database_header read: header={:?}", header);
*header
}
@@ -1514,7 +1513,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
if let Some(tx_id) = tx_id {
let tx = self.txs.get(tx_id).unwrap();
let header = tx.value();
let header = header.header.borrow();
let header = header.header.read();
tracing::debug!("with_header read: header={:?}", header);
Ok(f(&header))
} else {
@@ -1531,7 +1530,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
if let Some(tx_id) = tx_id {
let tx = self.txs.get(tx_id).unwrap();
let header = tx.value();
let mut header = header.header.borrow_mut();
let mut header = header.header.write();
tracing::debug!("with_header_mut read: header={:?}", header);
Ok(f(&mut header))
} else {

View File

@@ -2,7 +2,7 @@ use super::*;
use crate::io::PlatformIO;
use crate::mvcc::clock::LocalClock;
use crate::storage::sqlite3_ondisk::DatabaseHeader;
use std::cell::RefCell;
use parking_lot::RwLock;
pub(crate) struct MvccTestDbNoConn {
pub(crate) db: Option<Arc<Database>>,
@@ -1041,7 +1041,7 @@ fn new_tx(tx_id: TxID, begin_ts: u64, state: TransactionState) -> Transaction {
begin_ts,
write_set: SkipSet::new(),
read_set: SkipSet::new(),
header: RefCell::new(DatabaseHeader::default()),
header: RwLock::new(DatabaseHeader::default()),
}
}
@@ -1453,7 +1453,7 @@ fn transaction_display() {
begin_ts,
write_set,
read_set,
header: RefCell::new(DatabaseHeader::default()),
header: RwLock::new(DatabaseHeader::default()),
};
let expected = "{ state: Preparing, id: 42, begin_ts: 20250914, write_set: [RowID { table_id: 1, row_id: 11 }, RowID { table_id: 1, row_id: 13 }], read_set: [RowID { table_id: 2, row_id: 17 }, RowID { table_id: 2, row_id: 19 }] }";