From c4c9d409c95c5e2c8bc93cb6057ae3171820da17 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 17 Sep 2025 16:35:36 +0400 Subject: [PATCH] remove io.block from op_checkpoint --- core/vdbe/execute.rs | 110 +++++++++++++++++++++++++++++++++++-------- core/vdbe/mod.rs | 8 ++-- 2 files changed, 96 insertions(+), 22 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index e193cd05f..ccb7f4b3e 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -330,12 +330,35 @@ pub fn op_bit_not( Ok(InsnFunctionStepResult::Step) } +#[derive(Debug)] +pub enum OpCheckpointState { + StartCheckpoint, + FinishCheckpoint { result: Option }, + CompleteResult { result: Result }, +} + pub fn op_checkpoint( program: &Program, state: &mut ProgramState, insn: &Insn, pager: &Arc, mv_store: Option<&Arc>, +) -> Result { + match op_checkpoint_inner(program, state, insn, pager, mv_store) { + Ok(result) => Ok(result), + Err(err) => { + state.op_checkpoint_state = OpCheckpointState::StartCheckpoint; + return Err(err); + } + } +} + +pub fn op_checkpoint_inner( + program: &Program, + state: &mut ProgramState, + insn: &Insn, + pager: &Rc, + mv_store: Option<&Arc>, ) -> Result { load_insn!( Checkpoint { @@ -352,26 +375,75 @@ pub fn op_checkpoint( // however. return Err(LimboError::TableLocked); } - let result = program.connection.checkpoint(*checkpoint_mode); - match result { - Ok(CheckpointResult { - num_attempted, - num_backfilled, - .. - }) => { - // https://sqlite.org/pragma.html#pragma_wal_checkpoint - // 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy). - state.registers[*dest] = Register::Value(Value::Integer(0)); - // 2nd col: # modified pages written to wal file - state.registers[*dest + 1] = Register::Value(Value::Integer(num_attempted as i64)); - // 3rd col: # pages moved to db after checkpoint - state.registers[*dest + 2] = Register::Value(Value::Integer(num_backfilled as i64)); - } - Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)), - } + loop { + match &mut state.op_checkpoint_state { + OpCheckpointState::StartCheckpoint => { + let step_result = program + .connection + .pager + .borrow_mut() + .wal_checkpoint_start(*checkpoint_mode); + match step_result { + Ok(IOResult::Done(result)) => { + state.op_checkpoint_state = OpCheckpointState::FinishCheckpoint { + result: Some(result), + }; + continue; + } + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), + Err(err) => { + state.op_checkpoint_state = + OpCheckpointState::CompleteResult { result: Err(err) }; + continue; + } + } + } + OpCheckpointState::FinishCheckpoint { result } => { + let step_result = program + .connection + .pager + .borrow_mut() + .wal_checkpoint_finish(result.as_mut().unwrap()); + match step_result { + Ok(IOResult::Done(())) => { + state.op_checkpoint_state = OpCheckpointState::CompleteResult { + result: Ok(result.take().unwrap()), + }; + continue; + } + Ok(IOResult::IO(io)) => return Ok(InsnFunctionStepResult::IO(io)), + Err(err) => { + state.op_checkpoint_state = + OpCheckpointState::CompleteResult { result: Err(err) }; + continue; + } + } + } + OpCheckpointState::CompleteResult { result } => { + match result { + Ok(CheckpointResult { + num_attempted, + num_backfilled, + .. + }) => { + // https://sqlite.org/pragma.html#pragma_wal_checkpoint + // 1st col: 1 (checkpoint SQLITE_BUSY) or 0 (not busy). + state.registers[*dest] = Register::Value(Value::Integer(0)); + // 2nd col: # modified pages written to wal file + state.registers[*dest + 1] = + Register::Value(Value::Integer(*num_attempted as i64)); + // 3rd col: # pages moved to db after checkpoint + state.registers[*dest + 2] = + Register::Value(Value::Integer(*num_backfilled as i64)); + } + Err(_err) => state.registers[*dest] = Register::Value(Value::Integer(1)), + } - state.pc += 1; - Ok(InsnFunctionStepResult::Step) + state.pc += 1; + return Ok(InsnFunctionStepResult::Step); + } + } + } } pub fn op_null( diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index a39902bf0..66fc2a774 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -35,9 +35,9 @@ use crate::{ types::{IOCompletions, IOResult, RawSlice, TextRef}, vdbe::{ execute::{ - OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, - OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState, - OpTransactionState, + OpCheckpointState, OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, + OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, + OpSeekState, OpTransactionState, }, metrics::StatementMetrics, }, @@ -292,6 +292,7 @@ pub struct ProgramState { op_column_state: OpColumnState, op_row_id_state: OpRowIdState, op_transaction_state: OpTransactionState, + op_checkpoint_state: OpCheckpointState, /// State machine for committing view deltas with I/O handling view_delta_state: ViewDeltaCommitState, } @@ -336,6 +337,7 @@ impl ProgramState { op_column_state: OpColumnState::Start, op_row_id_state: OpRowIdState::Start, op_transaction_state: OpTransactionState::Start, + op_checkpoint_state: OpCheckpointState::StartCheckpoint, view_delta_state: ViewDeltaCommitState::NotStarted, } }