commit_txn track rollback case

This commit is contained in:
Pere Diaz Bou
2025-06-24 19:23:40 +02:00
parent b6a948e513
commit 22f9cd695d
5 changed files with 44 additions and 16 deletions

View File

@@ -7045,7 +7045,7 @@ mod tests {
)
.unwrap();
loop {
match pager.end_tx().unwrap() {
match pager.end_tx(false).unwrap() {
crate::PagerCacheflushStatus::Done(_) => break,
crate::PagerCacheflushStatus::IO => {
pager.io.run_once().unwrap();

View File

@@ -234,6 +234,7 @@ pub enum PagerCacheflushResult {
/// The WAL was written, fsynced, and a checkpoint was performed.
/// The database file was then also fsynced.
Checkpointed(CheckpointResult),
Rollback,
}
impl Pager {
@@ -573,7 +574,13 @@ impl Pager {
self.wal.borrow_mut().begin_write_tx()
}
pub fn end_tx(&self) -> Result<PagerCacheflushStatus> {
pub fn end_tx(&self, rollback: bool) -> Result<PagerCacheflushStatus> {
tracing::trace!("end_tx(rollback={})", rollback);
if rollback {
self.wal.borrow().end_write_tx()?;
self.wal.borrow().end_read_tx()?;
return Ok(PagerCacheflushStatus::Done(PagerCacheflushResult::Rollback));
}
let cacheflush_status = self.cacheflush()?;
match cacheflush_status {
PagerCacheflushStatus::IO => Ok(PagerCacheflushStatus::IO),
@@ -1064,8 +1071,10 @@ impl Pager {
pub fn rollback(&self) -> Result<(), LimboError> {
let mut cache = self.page_cache.write();
cache.clear();
cache.unset_dirty_all_pages();
cache.clear().expect("failed to clear page cache");
self.wal.borrow_mut().rollback()?;
Ok(())
}
}

View File

@@ -909,6 +909,7 @@ impl Wal for WalFile {
// TODO(pere): implement proper hashmap, this sucks :).
let shared = self.get_shared();
let max_frame = shared.max_frame.load(Ordering::SeqCst);
tracing::trace!("rollback(to_max_frame={})", max_frame);
let mut frame_cache = shared.frame_cache.lock();
for (_, frames) in frame_cache.iter_mut() {
let mut last_valid_frame = frames.len();
@@ -927,7 +928,6 @@ impl Wal for WalFile {
}
fn finish_append_frames_commit(&mut self) -> Result<()> {
println!("finish_append_frames_commit");
let shared = self.get_shared();
shared.max_frame.store(self.max_frame, Ordering::SeqCst);
tracing::trace!(

View File

@@ -1589,7 +1589,7 @@ pub fn halt(
)));
}
}
match program.commit_txn(pager.clone(), state, mv_store)? {
match program.commit_txn(pager.clone(), state, mv_store, false)? {
StepResult::Done => Ok(InsnFunctionStepResult::Done),
StepResult::IO => Ok(InsnFunctionStepResult::IO),
StepResult::Row => Ok(InsnFunctionStepResult::Row),
@@ -1637,12 +1637,18 @@ pub fn op_halt(
)));
}
}
match program.commit_txn(pager.clone(), state, mv_store)? {
StepResult::Done => Ok(InsnFunctionStepResult::Done),
StepResult::IO => Ok(InsnFunctionStepResult::IO),
StepResult::Row => Ok(InsnFunctionStepResult::Row),
StepResult::Interrupt => Ok(InsnFunctionStepResult::Interrupt),
StepResult::Busy => Ok(InsnFunctionStepResult::Busy),
let auto_commit = program.connection.auto_commit.get();
tracing::trace!("op_halt(auto_commit={})", auto_commit);
if auto_commit {
match program.commit_txn(pager.clone(), state, mv_store, false)? {
StepResult::Done => Ok(InsnFunctionStepResult::Done),
StepResult::IO => Ok(InsnFunctionStepResult::IO),
StepResult::Row => Ok(InsnFunctionStepResult::Row),
StepResult::Interrupt => Ok(InsnFunctionStepResult::Interrupt),
StepResult::Busy => Ok(InsnFunctionStepResult::Busy),
}
} else {
Ok(InsnFunctionStepResult::Done)
}
}
@@ -1743,7 +1749,7 @@ pub fn op_auto_commit(
};
let conn = program.connection.clone();
if state.commit_state == CommitState::Committing {
return match program.commit_txn(pager.clone(), state, mv_store)? {
return match program.commit_txn(pager.clone(), state, mv_store, *rollback)? {
super::StepResult::Done => Ok(InsnFunctionStepResult::Done),
super::StepResult::IO => Ok(InsnFunctionStepResult::IO),
super::StepResult::Row => Ok(InsnFunctionStepResult::Row),
@@ -1773,7 +1779,7 @@ pub fn op_auto_commit(
"cannot commit - no transaction is active".to_string(),
));
}
match program.commit_txn(pager.clone(), state, mv_store)? {
match program.commit_txn(pager.clone(), state, mv_store, *rollback)? {
super::StepResult::Done => Ok(InsnFunctionStepResult::Done),
super::StepResult::IO => Ok(InsnFunctionStepResult::IO),
super::StepResult::Row => Ok(InsnFunctionStepResult::Row),

View File

@@ -395,6 +395,7 @@ impl Program {
pager: Rc<Pager>,
program_state: &mut ProgramState,
mv_store: Option<&Rc<MvStore>>,
rollback: bool,
) -> Result<StepResult> {
if let Some(mv_store) = mv_store {
let conn = self.connection.clone();
@@ -410,16 +411,27 @@ impl Program {
} else {
let connection = self.connection.clone();
let auto_commit = connection.auto_commit.get();
tracing::trace!("Halt auto_commit {}", auto_commit);
tracing::trace!(
"Halt auto_commit {}, state={:?}",
auto_commit,
program_state.commit_state
);
if program_state.commit_state == CommitState::Committing {
self.step_end_write_txn(&pager, &mut program_state.commit_state, &connection)
self.step_end_write_txn(
&pager,
&mut program_state.commit_state,
&connection,
rollback,
)
} else if auto_commit {
let current_state = connection.transaction_state.get();
tracing::trace!("Auto-commit state: {:?}", current_state);
match current_state {
TransactionState::Write => self.step_end_write_txn(
&pager,
&mut program_state.commit_state,
&connection,
rollback,
),
TransactionState::Read => {
connection.transaction_state.replace(TransactionState::None);
@@ -443,8 +455,9 @@ impl Program {
pager: &Rc<Pager>,
commit_state: &mut CommitState,
connection: &Connection,
rollback: bool,
) -> Result<StepResult> {
let cacheflush_status = pager.end_tx()?;
let cacheflush_status = pager.end_tx(rollback)?;
match cacheflush_status {
PagerCacheflushStatus::Done(_) => {
if self.change_cnt_on {