remove io.block from op_checkpoint

This commit is contained in:
Nikita Sivukhin
2025-09-17 16:35:36 +04:00
parent 27627bdb8d
commit c4c9d409c9
2 changed files with 96 additions and 22 deletions

View File

@@ -330,12 +330,35 @@ pub fn op_bit_not(
Ok(InsnFunctionStepResult::Step)
}
#[derive(Debug)]
pub enum OpCheckpointState {
StartCheckpoint,
FinishCheckpoint { result: Option<CheckpointResult> },
CompleteResult { result: Result<CheckpointResult> },
}
pub fn op_checkpoint(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
match op_checkpoint_inner(program, state, insn, pager, mv_store) {
Ok(result) => Ok(result),
Err(err) => {
state.op_checkpoint_state = OpCheckpointState::StartCheckpoint;
return Err(err);
}
}
}
pub fn op_checkpoint_inner(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Rc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(
Checkpoint {
@@ -352,26 +375,75 @@ pub fn op_checkpoint(
// however.
return Err(LimboError::TableLocked);
}
let result = program.connection.checkpoint(*checkpoint_mode);
match result {
Ok(CheckpointResult {
num_attempted,
num_backfilled,
..
}) => {
// https://sqlite.org/pragma.html#pragma_wal_checkpoint
// 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
state.registers[*dest] = Register::Value(Value::Integer(0));
// 2nd col: # modified pages written to wal file
state.registers[*dest + 1] = Register::Value(Value::Integer(num_attempted as i64));
// 3rd col: # pages moved to db after checkpoint
state.registers[*dest + 2] = Register::Value(Value::Integer(num_backfilled as i64));
}
Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)),
}
loop {
match &mut state.op_checkpoint_state {
OpCheckpointState::StartCheckpoint => {
let step_result = program
.connection
.pager
.borrow_mut()
.wal_checkpoint_start(*checkpoint_mode);
match step_result {
Ok(IOResult::Done(result)) => {
state.op_checkpoint_state = OpCheckpointState::FinishCheckpoint {
result: Some(result),
};
continue;
}
Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)),
Err(err) => {
state.op_checkpoint_state =
OpCheckpointState::CompleteResult { result: Err(err) };
continue;
}
}
}
OpCheckpointState::FinishCheckpoint { result } => {
let step_result = program
.connection
.pager
.borrow_mut()
.wal_checkpoint_finish(result.as_mut().unwrap());
match step_result {
Ok(IOResult::Done(())) => {
state.op_checkpoint_state = OpCheckpointState::CompleteResult {
result: Ok(result.take().unwrap()),
};
continue;
}
Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)),
Err(err) => {
state.op_checkpoint_state =
OpCheckpointState::CompleteResult { result: Err(err) };
continue;
}
}
}
OpCheckpointState::CompleteResult { result } => {
match result {
Ok(CheckpointResult {
num_attempted,
num_backfilled,
..
}) => {
// https://sqlite.org/pragma.html#pragma_wal_checkpoint
// 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy).
state.registers[*dest] = Register::Value(Value::Integer(0));
// 2nd col: # modified pages written to wal file
state.registers[*dest + 1] =
Register::Value(Value::Integer(*num_attempted as i64));
// 3rd col: # pages moved to db after checkpoint
state.registers[*dest + 2] =
Register::Value(Value::Integer(*num_backfilled as i64));
}
Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)),
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
}
}
}
pub fn op_null(

View File

@@ -35,9 +35,9 @@ use crate::{
types::{IOCompletions, IOResult, RawSlice, TextRef},
vdbe::{
execute::{
OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState,
OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState,
OpTransactionState,
OpCheckpointState, OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState,
OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState,
OpSeekState, OpTransactionState,
},
metrics::StatementMetrics,
},
@@ -292,6 +292,7 @@ pub struct ProgramState {
op_column_state: OpColumnState,
op_row_id_state: OpRowIdState,
op_transaction_state: OpTransactionState,
op_checkpoint_state: OpCheckpointState,
/// State machine for committing view deltas with I/O handling
view_delta_state: ViewDeltaCommitState,
}
@@ -336,6 +337,7 @@ impl ProgramState {
op_column_state: OpColumnState::Start,
op_row_id_state: OpRowIdState::Start,
op_transaction_state: OpTransactionState::Start,
op_checkpoint_state: OpCheckpointState::StartCheckpoint,
view_delta_state: ViewDeltaCommitState::NotStarted,
}
}