mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-11 03:04:22 +01:00
core/mvcc/logical-log: checkpoint support not updating transaction_state if needed
This commit is contained in:
@@ -88,6 +88,10 @@ pub struct CheckpointStateMachine<Clock: LogicalClock> {
|
||||
destroyed_tables: HashSet<MVTableId>,
|
||||
/// Result of the checkpoint
|
||||
checkpoint_result: Option<CheckpointResult>,
|
||||
/// Update connection's transaction state on checkpoint. If checkpoint was called as automatic
|
||||
/// process in a transaction we don't want to change the state as we assume we are already on a
|
||||
/// write transaction and any failure will be cleared on vdbe error handling.
|
||||
update_transaction_state: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
@@ -109,6 +113,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
|
||||
pager: Arc<Pager>,
|
||||
mvstore: Arc<MvStore<Clock>>,
|
||||
connection: Arc<Connection>,
|
||||
update_transaction_state: bool,
|
||||
) -> Self {
|
||||
let checkpoint_lock = mvstore.blocking_checkpoint_lock.clone();
|
||||
Self {
|
||||
@@ -130,6 +135,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
|
||||
cursors: HashMap::new(),
|
||||
destroyed_tables: HashSet::new(),
|
||||
checkpoint_result: None,
|
||||
update_transaction_state,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -324,6 +330,7 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
|
||||
// Start a pager transaction to write committed versions to B-tree
|
||||
let result = self.pager.begin_read_tx();
|
||||
if let Err(crate::LimboError::Busy) = result {
|
||||
|
||||
return Err(crate::LimboError::Busy);
|
||||
}
|
||||
result?;
|
||||
@@ -334,9 +341,11 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
|
||||
return Err(crate::LimboError::Busy);
|
||||
}
|
||||
result?;
|
||||
*self.connection.transaction_state.write() = TransactionState::Write {
|
||||
schema_did_change: false,
|
||||
}; // TODO: schema_did_change??
|
||||
if self.update_transaction_state {
|
||||
*self.connection.transaction_state.write() = TransactionState::Write {
|
||||
schema_did_change: false,
|
||||
}; // TODO: schema_did_change??
|
||||
}
|
||||
self.lock_states.pager_write_tx = true;
|
||||
self.state = CheckpointState::WriteRow {
|
||||
write_set_index: 0,
|
||||
@@ -546,7 +555,9 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
|
||||
self.state = CheckpointState::TruncateLogicalLog;
|
||||
self.lock_states.pager_read_tx = false;
|
||||
self.lock_states.pager_write_tx = false;
|
||||
*self.connection.transaction_state.write() = TransactionState::None;
|
||||
if self.update_transaction_state {
|
||||
*self.connection.transaction_state.write() = TransactionState::None;
|
||||
}
|
||||
let header = self
|
||||
.pager
|
||||
.io
|
||||
@@ -637,10 +648,14 @@ impl<Clock: LogicalClock> StateTransition for CheckpointStateMachine<Clock> {
|
||||
.io
|
||||
.block(|| self.pager.end_tx(rollback, self.connection.as_ref()))
|
||||
.expect("failed to end pager write tx");
|
||||
*self.connection.transaction_state.write() = TransactionState::None;
|
||||
if self.update_transaction_state {
|
||||
*self.connection.transaction_state.write() = TransactionState::None;
|
||||
}
|
||||
} else if self.lock_states.pager_read_tx {
|
||||
self.pager.end_read_tx().unwrap();
|
||||
*self.connection.transaction_state.write() = TransactionState::None;
|
||||
if self.update_transaction_state {
|
||||
*self.connection.transaction_state.write() = TransactionState::None;
|
||||
}
|
||||
}
|
||||
if self.lock_states.blocking_checkpoint_lock_held {
|
||||
self.checkpoint_lock.unlock();
|
||||
|
||||
Reference in New Issue
Block a user