mvcc: remove unused states and fields from CommitStateMachine

None of the pager-commit-related data and logic are used anymore, so let's delete them.
Jussi Saurio
2025-09-24 08:02:34 +03:00
parent 0a78ea87d5
commit 949e6dd728
4 changed files with 11 additions and 326 deletions
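The caller-facing change is the same in all four files below: commit_tx drops its pager argument, since the commit state machine no longer drives a pager transaction. A minimal sketch of the call-shape change, assuming the mv_store, tx_id and conn bindings from the benchmark setup in the first hunk:

// Sketch only; bindings come from the surrounding benchmark/test setup.
//
// Before this commit, the pager had to be passed explicitly:
//     let mut sm = mv_store.commit_tx(tx_id, conn.get_pager().clone(), conn).unwrap();
//
// After it, only the transaction id and the connection are needed:
let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
// The returned state machine is then stepped to completion exactly as before
// (the "sync IO hack" loops shown in the hunks below).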


@@ -47,9 +47,7 @@ fn bench(c: &mut Criterion) {
let conn = &db.conn;
let tx_id = db.mvcc_store.begin_tx(conn.get_pager().clone()).unwrap();
let mv_store = &db.mvcc_store;
let mut sm = mv_store
.commit_tx(tx_id, conn.get_pager().clone(), conn)
.unwrap();
let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
// TODO: sync IO hack
loop {
let res = sm.step(mv_store).unwrap();
@@ -76,9 +74,7 @@ fn bench(c: &mut Criterion) {
)
.unwrap();
let mv_store = &db.mvcc_store;
let mut sm = mv_store
.commit_tx(tx_id, conn.get_pager().clone(), conn)
.unwrap();
let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
// TODO: sync IO hack
loop {
let res = sm.step(mv_store).unwrap();
@@ -111,9 +107,7 @@ fn bench(c: &mut Criterion) {
)
.unwrap();
let mv_store = &db.mvcc_store;
let mut sm = mv_store
.commit_tx(tx_id, conn.get_pager().clone(), conn)
.unwrap();
let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
// TODO: sync IO hack
loop {
let res = sm.step(mv_store).unwrap();


@@ -19,7 +19,6 @@ use crate::Result;
use crate::{Connection, Pager};
use crossbeam_skiplist::{SkipMap, SkipSet};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
@@ -262,41 +261,11 @@ impl AtomicTransactionState {
#[derive(Debug)]
pub enum CommitState {
Initial,
BeginPagerTxn {
end_ts: u64,
},
WriteRow {
end_ts: u64,
write_set_index: usize,
requires_seek: bool,
},
WriteRowStateMachine {
end_ts: u64,
write_set_index: usize,
},
DeleteRowStateMachine {
end_ts: u64,
write_set_index: usize,
},
CommitPagerTxn {
end_ts: u64,
},
Commit {
end_ts: u64,
},
BeginCommitLogicalLog {
end_ts: u64,
log_record: LogRecord,
},
EndCommitLogicalLog {
end_ts: u64,
},
SyncLogicalLog {
end_ts: u64,
},
CommitEnd {
end_ts: u64,
},
Commit { end_ts: u64 },
BeginCommitLogicalLog { end_ts: u64, log_record: LogRecord },
EndCommitLogicalLog { end_ts: u64 },
SyncLogicalLog { end_ts: u64 },
CommitEnd { end_ts: u64 },
}
#[derive(Debug)]
@@ -311,21 +280,16 @@ pub enum WriteRowState {
#[derive(Debug)]
struct CommitCoordinator {
pager_commit_lock: Arc<TursoRwLock>,
commits_waiting: Arc<AtomicU64>,
}
pub struct CommitStateMachine<Clock: LogicalClock> {
state: CommitState,
is_finalized: bool,
pager: Arc<Pager>,
tx_id: TxID,
connection: Arc<Connection>,
/// Write set sorted by table id and row id
write_set: Vec<RowID>,
write_row_state_machine: Option<StateMachine<WriteRowStateMachine>>,
delete_row_state_machine: Option<StateMachine<DeleteRowStateMachine>>,
commit_coordinator: Arc<CommitCoordinator>,
cursors: HashMap<u64, Arc<RwLock<BTreeCursor>>>,
header: Arc<RwLock<Option<DatabaseHeader>>>,
_phantom: PhantomData<Clock>,
}
@@ -365,7 +329,6 @@ pub struct DeleteRowStateMachine {
impl<Clock: LogicalClock> CommitStateMachine<Clock> {
fn new(
state: CommitState,
pager: Arc<Pager>,
tx_id: TxID,
connection: Arc<Connection>,
commit_coordinator: Arc<CommitCoordinator>,
@@ -374,46 +337,14 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
Self {
state,
is_finalized: false,
pager,
tx_id,
connection,
write_set: Vec::new(),
write_row_state_machine: None,
delete_row_state_machine: None,
commit_coordinator,
cursors: HashMap::new(),
header,
_phantom: PhantomData,
}
}
/// We need to update the pager's header to account for changes made by other transactions.
fn update_pager_header(&self, mvcc_store: &MvStore<Clock>) -> Result<()> {
let header = self.header.read();
let last_commited_header = header.as_ref().expect("Header not found");
self.pager.io.block(|| self.pager.maybe_allocate_page1())?;
let _ = self.pager.io.block(|| {
self.pager.with_header_mut(|header_in_pager| {
let header_in_transaction = mvcc_store.get_transaction_database_header(&self.tx_id);
tracing::debug!("update header here {}", header_in_transaction.schema_cookie);
// database_size should only be updated in each commit so it should be safe to assume correct database_size is in last_commited_header
header_in_pager.database_size = last_commited_header.database_size;
if header_in_transaction.schema_cookie < last_commited_header.schema_cookie {
tracing::error!("txn's schema cookie went back in time, aborting");
return Err(LimboError::SchemaUpdated);
}
assert!(
header_in_transaction.schema_cookie >= last_commited_header.schema_cookie,
"txn's schema cookie went back in time"
);
header_in_pager.schema_cookie = header_in_transaction.schema_cookie;
// TODO: deal with other fields
Ok(())
})
})?;
Ok(())
}
}
impl WriteRowStateMachine {
@@ -547,239 +478,6 @@ impl<Clock: LogicalClock> StateTransition for CommitStateMachine<Clock> {
self.state = CommitState::Commit { end_ts };
Ok(TransitionResult::Continue)
}
CommitState::BeginPagerTxn { end_ts } => {
// FIXME: how do we deal with multiple concurrent writes?
// WAL requires a txn to be written sequentially. Either we:
// 1. Wait for the current writer to finish before the second txn starts.
// 2. Choose a txn to write depending on some heuristics like amount of frames will be written.
// 3. ..
// If this is the exclusive transaction, we already acquired a write transaction
// on the pager in begin_exclusive_tx() and don't need to acquire it.
if mvcc_store.is_exclusive_tx(&self.tx_id) {
self.update_pager_header(mvcc_store)?;
self.state = CommitState::WriteRow {
end_ts: *end_ts,
write_set_index: 0,
requires_seek: true,
};
return Ok(TransitionResult::Continue);
} else if mvcc_store.has_exclusive_tx() {
// There is an exclusive transaction holding the write lock. We must abort.
return Err(LimboError::WriteWriteConflict);
}
// Currently txns are queued without any heuristics whatsoever. This is important because
// we need to ensure writes to disk happen sequentially.
// * We don't want txns to write to WAL in parallel.
// * We don't want BTree modifications to happen in parallel.
// If any of these were to happen, we would find ourselves in a bad corruption situation.
// NOTE: since we are blocking for `begin_write_tx` we do not care about re-entrancy right now.
let locked = self.commit_coordinator.pager_commit_lock.write();
if !locked {
self.commit_coordinator
.commits_waiting
.fetch_add(1, Ordering::SeqCst);
// FIXME: IOCompletions still needs a yield variant...
return Ok(TransitionResult::Io(crate::types::IOCompletions::Single(
Completion::new_dummy(),
)));
}
self.update_pager_header(mvcc_store)?;
{
let mut wal = self.pager.wal.as_ref().unwrap().borrow_mut();
// we need to update the max frame to the latest shared max frame in order to avoid snapshot staleness
wal.update_max_frame();
}
// We started a pager read transaction at the beginning of the MV transaction, because
// any reads we do from the database file and WAL must uphold snapshot isolation.
// However, now we must end and immediately restart the read transaction before committing.
// This is because other transactions may have committed writes to the DB file or WAL,
// and our pager must read in those changes when applying our writes; otherwise we would overwrite
// the changes from the previous committed transactions.
//
// Note that this would be incredibly unsafe in the regular transaction model, but in MVCC we trust
// the MV-store to uphold the guarantee that no write-write conflicts happened.
self.pager.end_read_tx().expect("end_read_tx cannot fail");
let result = self.pager.begin_read_tx();
if let Err(LimboError::Busy) = result {
// We cannot obtain a WAL read lock due to contention, so we must abort.
self.commit_coordinator.pager_commit_lock.unlock();
return Err(LimboError::WriteWriteConflict);
}
result?;
let result = self.pager.io.block(|| self.pager.begin_write_tx());
if let Err(LimboError::Busy) = result {
// There is a non-CONCURRENT transaction holding the write lock. We must abort.
self.commit_coordinator.pager_commit_lock.unlock();
return Err(LimboError::WriteWriteConflict);
}
result?;
self.state = CommitState::WriteRow {
end_ts: *end_ts,
write_set_index: 0,
requires_seek: true,
};
return Ok(TransitionResult::Continue);
}
CommitState::WriteRow {
end_ts,
write_set_index,
requires_seek,
} => {
if *write_set_index == self.write_set.len() {
self.state = CommitState::CommitPagerTxn { end_ts: *end_ts };
return Ok(TransitionResult::Continue);
}
let id = &self.write_set[*write_set_index];
if let Some(row_versions) = mvcc_store.rows.get(id) {
let row_versions = row_versions.value().read();
// Find rows that were written by this transaction.
// Hekaton uses oldest-to-newest order for row versions, so we reverse iterate to find the newest one
// this transaction changed.
for row_version in row_versions.iter().rev() {
if let TxTimestampOrID::TxID(row_tx_id) = row_version.begin {
if row_tx_id == self.tx_id {
let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) {
cursor.clone()
} else {
let cursor = BTreeCursor::new_table(
None, // Write directly to B-tree
self.pager.clone(),
id.table_id as usize,
row_version.row.column_count,
);
let cursor = Arc::new(RwLock::new(cursor));
self.cursors.insert(id.table_id, cursor.clone());
cursor
};
let state_machine = mvcc_store.write_row_to_pager(
&row_version.row,
cursor,
*requires_seek,
)?;
self.write_row_state_machine = Some(state_machine);
self.state = CommitState::WriteRowStateMachine {
end_ts: *end_ts,
write_set_index: *write_set_index,
};
break;
}
}
if let Some(TxTimestampOrID::TxID(row_tx_id)) = row_version.end {
if row_tx_id == self.tx_id {
let column_count = row_version.row.column_count;
let cursor = if let Some(cursor) = self.cursors.get(&id.table_id) {
cursor.clone()
} else {
let cursor = BTreeCursor::new_table(
None, // Write directly to B-tree
self.pager.clone(),
id.table_id as usize,
column_count,
);
let cursor = Arc::new(RwLock::new(cursor));
self.cursors.insert(id.table_id, cursor.clone());
cursor
};
let state_machine =
mvcc_store.delete_row_from_pager(row_version.row.id, cursor)?;
self.delete_row_state_machine = Some(state_machine);
self.state = CommitState::DeleteRowStateMachine {
end_ts: *end_ts,
write_set_index: *write_set_index,
};
break;
}
}
}
}
Ok(TransitionResult::Continue)
}
CommitState::WriteRowStateMachine {
end_ts,
write_set_index,
} => {
let write_row_state_machine = self.write_row_state_machine.as_mut().unwrap();
match write_row_state_machine.step(&())? {
IOResult::IO(io) => return Ok(TransitionResult::Io(io)),
IOResult::Done(_) => {
let requires_seek = {
if let Some(next_id) = self.write_set.get(*write_set_index + 1) {
let current_id = &self.write_set[*write_set_index];
if current_id.table_id == next_id.table_id
&& current_id.row_id + 1 == next_id.row_id
{
// simple optimization for sequential inserts with row ids increasing by 1;
// we should probably just check the record in the next row and see if it requires a seek
false
} else {
true
}
} else {
false
}
};
self.state = CommitState::WriteRow {
end_ts: *end_ts,
write_set_index: *write_set_index + 1,
requires_seek,
};
return Ok(TransitionResult::Continue);
}
}
}
CommitState::DeleteRowStateMachine {
end_ts,
write_set_index,
} => {
let delete_row_state_machine = self.delete_row_state_machine.as_mut().unwrap();
match delete_row_state_machine.step(&())? {
IOResult::IO(io) => return Ok(TransitionResult::Io(io)),
IOResult::Done(_) => {
self.state = CommitState::WriteRow {
end_ts: *end_ts,
write_set_index: *write_set_index + 1,
requires_seek: true,
};
return Ok(TransitionResult::Continue);
}
}
}
CommitState::CommitPagerTxn { end_ts } => {
// Write committed data to pager for persistence
// Flush dirty pages to WAL - this is critical for data persistence
// Similar to what step_end_write_txn does for legacy transactions
let result = self
.pager
.end_tx(
false, // rollback = false since we're committing
&self.connection,
)
.map_err(|e| LimboError::InternalError(e.to_string()))
.unwrap();
match result {
IOResult::Done(_) => {
// FIXME: hack for now to keep database header updated for pager commit
let tx = mvcc_store.txs.get(&self.tx_id).unwrap();
let tx_unlocked = tx.value();
self.header.write().replace(*tx_unlocked.header.read());
self.commit_coordinator.pager_commit_lock.unlock();
// TODO: here mark we are ready for a batch
self.state = CommitState::Commit { end_ts: *end_ts };
return Ok(TransitionResult::Continue);
}
IOResult::IO(io) => {
return Ok(TransitionResult::Io(io));
}
}
}
CommitState::Commit { end_ts } => {
let mut log_record = LogRecord::new(*end_ts);
if !mvcc_store.is_exclusive_tx(&self.tx_id) && mvcc_store.has_exclusive_tx() {
@@ -1126,7 +824,6 @@ impl<Clock: LogicalClock> MvStore<Clock> {
exclusive_tx: RwLock::new(None),
commit_coordinator: Arc::new(CommitCoordinator {
pager_commit_lock: Arc::new(TursoRwLock::new()),
commits_waiting: Arc::new(AtomicU64::new(0)),
}),
global_header: Arc::new(RwLock::new(None)),
blocking_checkpoint_lock: Arc::new(TursoRwLock::new()),
@@ -1553,13 +1250,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
pub fn commit_tx(
&self,
tx_id: TxID,
pager: Arc<Pager>,
connection: &Arc<Connection>,
) -> Result<StateMachine<CommitStateMachine<Clock>>> {
let state_machine: StateMachine<CommitStateMachine<Clock>> =
StateMachine::<CommitStateMachine<Clock>>::new(CommitStateMachine::new(
CommitState::Initial,
pager,
tx_id,
connection.clone(),
self.commit_coordinator.clone(),
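
For orientation, after this file's changes only the logical-log commit path remains in the state enum; the pager-transaction states (BeginPagerTxn, WriteRow, WriteRowStateMachine, DeleteRowStateMachine, CommitPagerTxn) are deleted. A sketch of the surviving enum, reconstructed from the kept context and added lines above rather than copied from the file:

// Reconstructed sketch, not verbatim source.
#[derive(Debug)]
pub enum CommitState {
    Initial,
    // Builds the LogRecord for the transaction's end timestamp (see the Commit arm
    // of step() earlier in this diff).
    Commit { end_ts: u64 },
    BeginCommitLogicalLog { end_ts: u64, log_record: LogRecord },
    EndCommitLogicalLog { end_ts: u64 },
    SyncLogicalLog { end_ts: u64 },
    CommitEnd { end_ts: u64 },
}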


@@ -760,9 +760,7 @@ pub(crate) fn commit_tx(
conn: &Arc<Connection>,
tx_id: u64,
) -> Result<()> {
let mut sm = mv_store
.commit_tx(tx_id, conn.pager.read().clone(), conn)
.unwrap();
let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
// TODO: sync IO hack
loop {
let res = sm.step(&mv_store)?;
@@ -783,9 +781,7 @@ pub(crate) fn commit_tx_no_conn(
conn: &Arc<Connection>,
) -> Result<(), LimboError> {
let mv_store = db.get_mvcc_store();
let mut sm = mv_store
.commit_tx(tx_id, conn.pager.read().clone(), conn)
.unwrap();
let mut sm = mv_store.commit_tx(tx_id, conn).unwrap();
// TODO: sync IO hack
loop {
let res = sm.step(&mv_store)?;


@@ -837,7 +837,7 @@ impl Program {
let Some((tx_id, _)) = conn.mv_tx.get() else {
return Ok(IOResult::Done(()));
};
let state_machine = mv_store.commit_tx(tx_id, pager.clone(), &conn).unwrap();
let state_machine = mv_store.commit_tx(tx_id, &conn).unwrap();
program_state.commit_state = CommitState::CommitingMvcc { state_machine };
}
let CommitState::CommitingMvcc { state_machine } = &mut program_state.commit_state