Merge 'mvcc: remove unused code related to is_logical_log()' from Jussi Saurio

is always logical log

Closes #3220
This commit is contained in:
Jussi Saurio
2025-09-19 13:31:34 +03:00
committed by GitHub
3 changed files with 10 additions and 113 deletions

View File

@@ -1,6 +1,5 @@
use crate::mvcc::clock::LogicalClock;
use crate::mvcc::persistent_storage::Storage;
use crate::return_if_io;
use crate::state_machine::StateMachine;
use crate::state_machine::StateTransition;
use crate::state_machine::TransitionResult;
@@ -542,24 +541,11 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
if mvcc_store.is_exclusive_tx(&self.tx_id) {
mvcc_store.release_exclusive_tx(&self.tx_id);
self.commit_coordinator.pager_commit_lock.unlock();
if !mvcc_store.storage.is_logical_log() {
// FIXME: this function isnt re-entrant
self.pager
.io
.block(|| self.pager.end_tx(false, &self.connection))?;
}
} else if !mvcc_store.storage.is_logical_log() {
self.pager.end_read_tx()?;
}
self.finalize(mvcc_store)?;
return Ok(TransitionResult::Done(()));
}
if mvcc_store.storage.is_logical_log() {
self.state = CommitState::Commit { end_ts };
return Ok(TransitionResult::Continue);
} else {
self.state = CommitState::BeginPagerTxn { end_ts };
}
self.state = CommitState::Commit { end_ts };
Ok(TransitionResult::Continue)
}
CommitState::BeginPagerTxn { end_ts } => {
@@ -851,7 +837,6 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
return Ok(TransitionResult::Continue);
}
CommitState::BeginCommitLogicalLog { end_ts, log_record } => {
assert!(mvcc_store.storage.is_logical_log());
if !mvcc_store.is_exclusive_tx(&self.tx_id) {
// logical log needs to be serialized
let locked = self.commit_coordinator.pager_commit_lock.write();
@@ -866,10 +851,6 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
match result {
IOResult::Done(_) => {}
IOResult::IO(io) => {
assert!(
mvcc_store.storage.is_logical_log(),
"for now logical log is the only storage that can return IO"
);
if !io.finished() {
return Ok(TransitionResult::Io(io));
}
@@ -897,13 +878,11 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
let schema = connection.schema.borrow().clone();
connection.db.update_schema_if_newer(schema)?;
}
if mvcc_store.storage.is_logical_log() {
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
let tx_unlocked = tx.value();
self.header.write().replace(*tx_unlocked.header.borrow());
tracing::trace!("end_commit_logical_log(tx_id={})", self.tx_id);
self.commit_coordinator.pager_commit_lock.unlock();
}
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
let tx_unlocked = tx.value();
self.header.write().replace(*tx_unlocked.header.borrow());
tracing::trace!("end_commit_logical_log(tx_id={})", self.tx_id);
self.commit_coordinator.pager_commit_lock.unlock();
self.state = CommitState::CommitEnd { end_ts: *end_ts };
return Ok(TransitionResult::Continue);
}
@@ -1422,38 +1401,12 @@ impl<Clock: LogicalClock> MvStore<Clock> {
///
/// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need
/// to ensure exclusive write access as per SQLite semantics.
#[instrument(skip_all, level = Level::DEBUG)]
pub fn begin_exclusive_tx(
&self,
pager: Arc<Pager>,
maybe_existing_tx_id: Option<TxID>,
) -> Result<IOResult<TxID>> {
self._begin_exclusive_tx(pager, false, maybe_existing_tx_id)
}
/// Upgrades a read transaction to an exclusive write transaction.
///
/// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need
/// to ensure exclusive write access as per SQLite semantics.
pub fn upgrade_to_exclusive_tx(
&self,
pager: Arc<Pager>,
maybe_existing_tx_id: Option<TxID>,
) -> Result<IOResult<TxID>> {
self._begin_exclusive_tx(pager, true, maybe_existing_tx_id)
}
/// Begins an exclusive write transaction that prevents concurrent writes.
///
/// This is used for IMMEDIATE and EXCLUSIVE transaction types where we need
/// to ensure exclusive write access as per SQLite semantics.
#[instrument(skip_all, level = Level::DEBUG)]
fn _begin_exclusive_tx(
&self,
pager: Arc<Pager>,
is_upgrade_from_read: bool,
maybe_existing_tx_id: Option<TxID>,
) -> Result<IOResult<TxID>> {
let is_logical_log = self.storage.is_logical_log();
let tx_id = maybe_existing_tx_id.unwrap_or_else(|| self.get_tx_id());
let begin_ts = if let Some(tx_id) = maybe_existing_tx_id {
self.txs.get(&tx_id).unwrap().value().begin_ts
@@ -1463,16 +1416,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
self.acquire_exclusive_tx(&tx_id)?;
// Try to acquire the pager read lock
if !is_upgrade_from_read && !is_logical_log {
pager.begin_read_tx().inspect_err(|_| {
tracing::debug!(
"begin_exclusive_tx: tx_id={} failed with Busy on pager_read_lock",
tx_id
);
self.release_exclusive_tx(&tx_id);
})?;
}
let locked = self.commit_coordinator.pager_commit_lock.write();
if !locked {
tracing::debug!(
@@ -1480,46 +1423,18 @@ impl<Clock: LogicalClock> MvStore<Clock> {
tx_id
);
self.release_exclusive_tx(&tx_id);
pager.end_read_tx()?;
return Err(LimboError::Busy);
}
let header = self.get_new_transaction_database_header(&pager);
if is_logical_log {
let tx = Transaction::new(tx_id, begin_ts, header);
tracing::trace!(
"begin_exclusive_tx(tx_id={}) - exclusive write logical log transaction",
tx_id
);
tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id);
self.txs.insert(tx_id, tx);
return Ok(IOResult::Done(tx_id));
}
// Try to acquire the pager write lock
let begin_w_tx_res = pager.begin_write_tx();
if let Err(LimboError::Busy) = begin_w_tx_res {
tracing::debug!("begin_exclusive_tx: tx_id={} failed with Busy", tx_id);
// Failed to get pager lock - release our exclusive lock
self.commit_coordinator.pager_commit_lock.unlock();
self.release_exclusive_tx(&tx_id);
if maybe_existing_tx_id.is_none() {
// If we were upgrading an existing non-CONCURRENT mvcc transaction to write, we don't end the read tx on Busy.
// But if we were beginning a completely new non-CONCURRENT mvcc transaction, we do end it because the next time the connection
// attempts to do something, it will open a new read tx, which will fail if we don't end this one here.
pager.end_read_tx()?;
}
return Err(LimboError::Busy);
}
return_if_io!(begin_w_tx_res);
let tx = Transaction::new(tx_id, begin_ts, header);
tracing::trace!(
"begin_exclusive_tx(tx_id={}) - exclusive write transaction",
"begin_exclusive_tx(tx_id={}) - exclusive write logical log transaction",
tx_id
);
tracing::debug!("begin_exclusive_tx: tx_id={} succeeded", tx_id);
self.txs.insert(tx_id, tx);
Ok(IOResult::Done(tx_id))
}
@@ -1532,12 +1447,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let tx_id = self.get_tx_id();
let begin_ts = self.get_timestamp();
// TODO: we need to tie a pager's read transaction to a transaction ID, so that future refactors to read
// pages from WAL/DB read from a consistent state to maintiain snapshot isolation.
if !self.storage.is_logical_log() {
pager.begin_read_tx()?;
}
// Set txn's header to the global header
let header = self.get_new_transaction_database_header(&pager);
let tx = Transaction::new(tx_id, begin_ts, header);

View File

@@ -29,10 +29,6 @@ impl Storage {
todo!()
}
pub fn is_logical_log(&self) -> bool {
true
}
pub fn sync(&self) -> Result<IOResult<()>> {
self.logical_log.borrow_mut().sync()
}

View File

@@ -2276,16 +2276,8 @@ pub fn op_transaction_inner(
if matches!(new_transaction_state, TransactionState::Write { .. })
&& matches!(actual_tx_mode, TransactionMode::Write)
{
let (tx_id, mv_tx_mode) = program.connection.mv_tx.get().unwrap();
if mv_tx_mode == TransactionMode::Read {
return_if_io!(
mv_store.upgrade_to_exclusive_tx(pager.clone(), Some(tx_id))
);
} else {
return_if_io!(
mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id))
);
}
let (tx_id, _) = program.connection.mv_tx.get().unwrap();
return_if_io!(mv_store.begin_exclusive_tx(pager.clone(), Some(tx_id)));
}
}
} else {