diff --git a/cli/app.rs b/cli/app.rs index 3c1c9b295..5e2ae801c 100644 --- a/cli/app.rs +++ b/cli/app.rs @@ -240,6 +240,40 @@ impl Limbo { self.writeln(opts) } + fn display_stats(&mut self, args: crate::commands::args::StatsArgs) -> io::Result<()> { + use crate::commands::args::StatsToggle; + + // Handle on/off toggle + if let Some(toggle) = args.toggle { + match toggle { + StatsToggle::On => { + self.opts.stats = true; + self.writeln("Stats display enabled.")?; + } + StatsToggle::Off => { + self.opts.stats = false; + self.writeln("Stats display disabled.")?; + } + } + return Ok(()); + } + + // Display all metrics + let output = { + let metrics = self.conn.metrics.borrow(); + format!("{metrics}") + }; + + self.writeln(output)?; + + if args.reset { + self.conn.metrics.borrow_mut().reset(); + self.writeln("Statistics reset.")?; + } + + Ok(()) + } + pub fn reset_input(&mut self) { self.prompt = PROMPT.to_string(); self.input_buff.clear(); @@ -383,6 +417,21 @@ impl Limbo { } } self.print_query_performance_stats(start, stats); + + // Display stats if enabled + if self.opts.stats { + let stats_output = { + let metrics = self.conn.metrics.borrow(); + metrics + .last_statement + .as_ref() + .map(|last| format!("\n{last}")) + }; + if let Some(output) = stats_output { + let _ = self.writeln(output); + } + } + self.reset_input(); } @@ -564,6 +613,11 @@ impl Limbo { Command::ShowInfo => { let _ = self.show_info(); } + Command::Stats(args) => { + if let Err(e) = self.display_stats(args) { + let _ = self.writeln(e.to_string()); + } + } Command::Import(args) => { let w = self.writer.as_mut().unwrap(); let mut import_file = ImportFile::new(self.conn.clone(), w); diff --git a/cli/commands/args.rs b/cli/commands/args.rs index 5b83c08b3..62fa43d7c 100644 --- a/cli/commands/args.rs +++ b/cli/commands/args.rs @@ -89,6 +89,24 @@ pub struct NullValueArgs { pub value: String, } +#[derive(Debug, Clone, Args)] +pub struct StatsArgs { + /// Toggle stats mode: on or off + #[arg(value_enum)] + pub toggle: Option, + /// Reset stats after displaying + #[arg(long, short, default_value_t = false)] + pub reset: bool, +} + +#[derive(Debug, ValueEnum, Clone)] +pub enum StatsToggle { + /// Enable automatic stats display after each statement + On, + /// Disable automatic stats display + Off, +} + #[derive(Debug, Clone, Args)] pub struct EchoArgs { #[arg(value_enum)] diff --git a/cli/commands/mod.rs b/cli/commands/mod.rs index ce4ec920f..e9c19a925 100644 --- a/cli/commands/mod.rs +++ b/cli/commands/mod.rs @@ -3,8 +3,8 @@ pub mod import; use args::{ CwdArgs, DbConfigArgs, EchoArgs, ExitArgs, HeadersArgs, IndexesArgs, LoadExtensionArgs, - NullValueArgs, OpcodesArgs, OpenArgs, OutputModeArgs, SchemaArgs, SetOutputArgs, TablesArgs, - TimerArgs, + NullValueArgs, OpcodesArgs, OpenArgs, OutputModeArgs, SchemaArgs, SetOutputArgs, StatsArgs, + TablesArgs, TimerArgs, }; use clap::Parser; use import::ImportArgs; @@ -78,6 +78,9 @@ pub enum Command { /// Print or set the current configuration for the database. Currently ignored. #[command(name = "dbconfig", display_name = ".dbconfig")] DbConfig(DbConfigArgs), + /// Display database statistics + #[command(name = "stats", display_name = ".stats")] + Stats(StatsArgs), /// List vfs modules available #[command(name = "vfslist", display_name = ".vfslist")] ListVfs, diff --git a/cli/input.rs b/cli/input.rs index 1a045866e..e26a091a6 100644 --- a/cli/input.rs +++ b/cli/input.rs @@ -85,6 +85,7 @@ pub struct Settings { pub timer: bool, pub headers: bool, pub mcp: bool, + pub stats: bool, } impl From for Settings { @@ -110,6 +111,7 @@ impl From for Settings { timer: false, headers: false, mcp: opts.mcp, + stats: false, } } } diff --git a/core/lib.rs b/core/lib.rs index 5dd947673..f6899a081 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -47,6 +47,7 @@ use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME; use crate::types::WalFrameInfo; #[cfg(feature = "fs")] use crate::util::{OpenMode, OpenOptions}; +use crate::vdbe::metrics::ConnectionMetrics; use crate::vtab::VirtualTable; use core::str; pub use error::LimboError; @@ -422,6 +423,7 @@ impl Database { attached_databases: RefCell::new(DatabaseCatalog::new()), query_only: Cell::new(false), view_transaction_states: RefCell::new(HashMap::new()), + metrics: RefCell::new(ConnectionMetrics::new()), }); let builtin_syms = self.builtin_syms.borrow(); // add built-in extensions symbols to the connection to prevent having to load each time @@ -844,6 +846,8 @@ pub struct Connection { /// Per-connection view transaction states for uncommitted changes. This represents /// one entry per view that was touched in the transaction. view_transaction_states: RefCell>, + /// Connection-level metrics aggregation + pub metrics: RefCell, } impl Connection { @@ -1956,26 +1960,32 @@ impl Statement { } pub fn step(&mut self) -> Result { - if !self.accesses_db { - return self - .program - .step(&mut self.state, self.mv_store.clone(), self.pager.clone()); - } - - const MAX_SCHEMA_RETRY: usize = 50; - let mut res = self - .program - .step(&mut self.state, self.mv_store.clone(), self.pager.clone()); - for attempt in 0..MAX_SCHEMA_RETRY { - // Only reprepare if we still need to update schema - if !matches!(res, Err(LimboError::SchemaUpdated)) { - break; + let res = if !self.accesses_db { + self.program + .step(&mut self.state, self.mv_store.clone(), self.pager.clone()) + } else { + const MAX_SCHEMA_RETRY: usize = 50; + let mut res = + self.program + .step(&mut self.state, self.mv_store.clone(), self.pager.clone()); + for attempt in 0..MAX_SCHEMA_RETRY { + // Only reprepare if we still need to update schema + if !matches!(res, Err(LimboError::SchemaUpdated)) { + break; + } + tracing::debug!("reprepare: attempt={}", attempt); + self.reprepare()?; + res = self + .program + .step(&mut self.state, self.mv_store.clone(), self.pager.clone()); } - tracing::debug!("reprepare: attempt={}", attempt); - self.reprepare()?; - res = self - .program - .step(&mut self.state, self.mv_store.clone(), self.pager.clone()); + res + }; + + // Aggregate metrics when statement completes + if matches!(res, Ok(StepResult::Done)) { + let mut conn_metrics = self.program.connection.metrics.borrow_mut(); + conn_metrics.record_statement(self.state.metrics.clone()); } res diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index fe8742d14..e2498f121 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -1171,7 +1171,7 @@ fn emit_update_insns( record_reg, unpacked_start: Some(start), unpacked_count: Some((index.columns.len() + 1) as u16), - flags: IdxInsertFlags::new(), + flags: IdxInsertFlags::new().nchange(true), }); } diff --git a/core/translate/insert.rs b/core/translate/insert.rs index c3dfa6208..5d82139e6 100644 --- a/core/translate/insert.rs +++ b/core/translate/insert.rs @@ -523,7 +523,7 @@ pub fn translate_insert( unpacked_start: Some(idx_start_reg), // TODO: enable optimization unpacked_count: Some((num_cols + 1) as u16), // TODO: figure out how to determine whether or not we need to seek prior to insert. - flags: IdxInsertFlags::new(), + flags: IdxInsertFlags::new().nchange(true), }); } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 89ac0a2d6..3e27d6931 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -1086,9 +1086,13 @@ pub fn op_vfilter( }; cursor.filter(*idx_num as i32, idx_str, *arg_count, args)? }; + // Increment filter_operations metric for virtual table filter + state.metrics.filter_operations = state.metrics.filter_operations.saturating_add(1); if !has_rows { state.pc = pc_if_empty.as_offset_int(); } else { + // VFilter positions to the first row if any exist, which counts as a read + state.metrics.rows_read = state.metrics.rows_read.saturating_add(1); state.pc += 1; } Ok(InsnFunctionStepResult::Step) @@ -1203,6 +1207,8 @@ pub fn op_vnext( cursor.next()? }; if has_more { + // Increment metrics for row read from virtual table (including materialized views) + state.metrics.rows_read = state.metrics.rows_read.saturating_add(1); state.pc = pc_if_next.as_offset_int(); } else { state.pc += 1; @@ -1283,6 +1289,8 @@ pub fn op_rewind( if is_empty { state.pc = pc_if_empty.as_offset_int(); } else { + // Rewind positions to the first row, which is effectively a read + state.metrics.rows_read = state.metrics.rows_read.saturating_add(1); state.pc += 1; } Ok(InsnFunctionStepResult::Step) @@ -1312,6 +1320,8 @@ pub fn op_last( if is_empty { state.pc = pc_if_empty.as_offset_int(); } else { + // Last positions to the last row, which is effectively a read + state.metrics.rows_read = state.metrics.rows_read.saturating_add(1); state.pc += 1; } Ok(InsnFunctionStepResult::Step) @@ -1839,6 +1849,17 @@ pub fn op_next( cursor.is_empty() }; if !is_empty { + // Increment metrics for row read + state.metrics.rows_read = state.metrics.rows_read.saturating_add(1); + state.metrics.btree_next = state.metrics.btree_next.saturating_add(1); + // Track if this is a full table scan or index scan + if let Some((_, cursor_type)) = program.cursor_ref.get(*cursor_id) { + if cursor_type.is_index() { + state.metrics.index_steps = state.metrics.index_steps.saturating_add(1); + } else if matches!(cursor_type, CursorType::BTreeTable(_)) { + state.metrics.fullscan_steps = state.metrics.fullscan_steps.saturating_add(1); + } + } state.pc = pc_if_next.as_offset_int(); } else { state.pc += 1; @@ -1870,6 +1891,17 @@ pub fn op_prev( cursor.is_empty() }; if !is_empty { + // Increment metrics for row read + state.metrics.rows_read = state.metrics.rows_read.saturating_add(1); + state.metrics.btree_prev = state.metrics.btree_prev.saturating_add(1); + // Track if this is a full table scan or index scan + if let Some((_, cursor_type)) = program.cursor_ref.get(*cursor_id) { + if cursor_type.is_index() { + state.metrics.index_steps = state.metrics.index_steps.saturating_add(1); + } else if matches!(cursor_type, CursorType::BTreeTable(_)) { + state.metrics.fullscan_steps = state.metrics.fullscan_steps.saturating_add(1); + } + } state.pc = pc_if_prev.as_offset_int(); } else { state.pc += 1; @@ -2441,7 +2473,7 @@ pub fn op_seek_rowid( insn ); assert!(target_pc.is_offset()); - let pc = { + let (pc, did_seek) = { let mut cursor = state.get_cursor(*cursor_id); let cursor = cursor.as_btree_mut(); let rowid = match state.registers[*src_reg].get_value() { @@ -2468,15 +2500,20 @@ pub fn op_seek_rowid( let seek_result = return_if_io!( cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true }) ); - if !matches!(seek_result, SeekResult::Found) { + let pc = if !matches!(seek_result, SeekResult::Found) { target_pc.as_offset_int() } else { state.pc + 1 - } + }; + (pc, true) } - None => target_pc.as_offset_int(), + None => (target_pc.as_offset_int(), false), } }; + // Increment btree_seeks metric for SeekRowid operation after cursor is dropped + if did_seek { + state.metrics.btree_seeks = state.metrics.btree_seeks.saturating_add(1); + } state.pc = pc; Ok(InsnFunctionStepResult::Step) } @@ -2782,6 +2819,8 @@ pub fn seek_internal( IOResult::IO(io) => return Ok(SeekInternalResult::IO(io)), } }; + // Increment btree_seeks metric after seek operation and cursor is dropped + state.metrics.btree_seeks = state.metrics.btree_seeks.saturating_add(1); let found = match seek_result { SeekResult::Found => true, SeekResult::NotFound => false, @@ -3760,15 +3799,19 @@ pub fn op_sorter_sort( }, insn ); - let is_empty = { + let (is_empty, did_sort) = { let mut cursor = state.get_cursor(*cursor_id); let cursor = cursor.as_sorter_mut(); let is_empty = cursor.is_empty(); if !is_empty { return_if_io!(cursor.sort()); } - is_empty + (is_empty, !is_empty) }; + // Increment metrics for sort operation after cursor is dropped + if did_sort { + state.metrics.sort_operations = state.metrics.sort_operations.saturating_add(1); + } if is_empty { state.pc = pc_if_empty.as_offset_int(); } else { @@ -5203,6 +5246,8 @@ pub fn op_insert( moved_before )); } + // Increment metrics for row write + state.metrics.rows_written = state.metrics.rows_written.saturating_add(1); // Only update last_insert_rowid for regular table inserts, not schema modifications let root_page = { @@ -5413,6 +5458,8 @@ pub fn op_delete( let cursor = cursor.as_btree_mut(); return_if_io!(cursor.delete()); } + // Increment metrics for row write (DELETE is a write operation) + state.metrics.rows_written = state.metrics.rows_written.saturating_add(1); let schema = program.connection.schema.borrow(); let dependent_views = schema.get_dependent_materialized_views(table_name); if dependent_views.is_empty() { @@ -5534,6 +5581,8 @@ pub fn op_idx_delete( let cursor = cursor.as_btree_mut(); return_if_io!(cursor.delete()); } + // Increment metrics for index write (delete is a write operation) + state.metrics.rows_written = state.metrics.rows_written.saturating_add(1); let n_change = program.n_change.get(); program.n_change.set(n_change + 1); state.pc += 1; @@ -5680,9 +5729,12 @@ pub fn op_idx_insert( cursor.insert(&BTreeKey::new_index_key(record_to_insert), moved_before) ); } + // Increment metrics for index write + if flags.has(IdxInsertFlags::NCHANGE) { + state.metrics.rows_written = state.metrics.rows_written.saturating_add(1); + } state.op_idx_insert_state = OpIdxInsertState::SeekIfUnique; state.pc += 1; - // TODO: flag optimizations, update n_change if OPFLAG_NCHANGE Ok(InsnFunctionStepResult::Step) } } @@ -6996,6 +7048,12 @@ pub fn op_count( state.registers[*target_reg] = Register::Value(Value::Integer(count as i64)); + // For optimized COUNT(*) queries, the count represents rows that would be read + // SQLite tracks this differently (as pages read), but for consistency we track as rows + if *exact { + state.metrics.rows_read = state.metrics.rows_read.saturating_add(count as u64); + } + state.pc += 1; Ok(InsnFunctionStepResult::Step) } diff --git a/core/vdbe/metrics.rs b/core/vdbe/metrics.rs new file mode 100644 index 000000000..a9f891556 --- /dev/null +++ b/core/vdbe/metrics.rs @@ -0,0 +1,199 @@ +use std::fmt; + +/// Statement-level execution metrics +/// +/// These metrics are collected unconditionally during statement execution +/// with minimal overhead (simple counter increments). The cost of incrementing +/// these counters is negligible compared to the actual work being measured. +#[derive(Debug, Default, Clone)] +pub struct StatementMetrics { + // Row operations + pub rows_read: u64, + pub rows_written: u64, + + // Execution statistics + pub vm_steps: u64, + pub insn_executed: u64, + + // Table scan metrics + pub fullscan_steps: u64, + pub index_steps: u64, + + // Sort and filter operations + pub sort_operations: u64, + pub filter_operations: u64, + + // B-tree operations + pub btree_seeks: u64, + pub btree_next: u64, + pub btree_prev: u64, +} + +impl StatementMetrics { + pub fn new() -> Self { + Self::default() + } + + /// Get total row operations + pub fn total_row_ops(&self) -> u64 { + self.rows_read + self.rows_written + } + + /// Merge another metrics instance into this one (for aggregation) + pub fn merge(&mut self, other: &StatementMetrics) { + self.rows_read = self.rows_read.saturating_add(other.rows_read); + self.rows_written = self.rows_written.saturating_add(other.rows_written); + self.vm_steps = self.vm_steps.saturating_add(other.vm_steps); + self.insn_executed = self.insn_executed.saturating_add(other.insn_executed); + self.fullscan_steps = self.fullscan_steps.saturating_add(other.fullscan_steps); + self.index_steps = self.index_steps.saturating_add(other.index_steps); + self.sort_operations = self.sort_operations.saturating_add(other.sort_operations); + self.filter_operations = self + .filter_operations + .saturating_add(other.filter_operations); + self.btree_seeks = self.btree_seeks.saturating_add(other.btree_seeks); + self.btree_next = self.btree_next.saturating_add(other.btree_next); + self.btree_prev = self.btree_prev.saturating_add(other.btree_prev); + } + + /// Reset all counters to zero + pub fn reset(&mut self) { + *self = Self::default(); + } +} + +impl fmt::Display for StatementMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "Statement Metrics:")?; + writeln!(f, " Row Operations:")?; + writeln!(f, " Rows read: {}", self.rows_read)?; + writeln!(f, " Rows written: {}", self.rows_written)?; + writeln!(f, " Execution:")?; + writeln!(f, " VM steps: {}", self.vm_steps)?; + writeln!(f, " Instructions: {}", self.insn_executed)?; + writeln!(f, " Table Access:")?; + writeln!(f, " Full scan steps: {}", self.fullscan_steps)?; + writeln!(f, " Index steps: {}", self.index_steps)?; + writeln!(f, " Operations:")?; + writeln!(f, " Sort operations: {}", self.sort_operations)?; + writeln!(f, " Filter operations:{}", self.filter_operations)?; + writeln!(f, " B-tree Operations:")?; + writeln!(f, " Seeks: {}", self.btree_seeks)?; + writeln!(f, " Next: {}", self.btree_next)?; + writeln!(f, " Prev: {}", self.btree_prev)?; + Ok(()) + } +} + +/// Connection-level metrics aggregation +#[derive(Debug, Default, Clone)] +pub struct ConnectionMetrics { + /// Total number of statements executed + pub total_statements: u64, + + /// Aggregate metrics from all statements + pub aggregate: StatementMetrics, + + /// Metrics from the last executed statement + pub last_statement: Option, + + /// High-water marks for monitoring + pub max_vm_steps_per_statement: u64, + pub max_rows_read_per_statement: u64, +} + +impl ConnectionMetrics { + pub fn new() -> Self { + Self::default() + } + + /// Record a completed statement's metrics + pub fn record_statement(&mut self, metrics: StatementMetrics) { + self.total_statements = self.total_statements.saturating_add(1); + + // Update high-water marks + self.max_vm_steps_per_statement = self.max_vm_steps_per_statement.max(metrics.vm_steps); + self.max_rows_read_per_statement = self.max_rows_read_per_statement.max(metrics.rows_read); + + // Aggregate into total + self.aggregate.merge(&metrics); + + // Keep last statement for debugging + self.last_statement = Some(metrics); + } + + /// Reset connection metrics + pub fn reset(&mut self) { + *self = Self::default(); + } +} + +impl fmt::Display for ConnectionMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "Connection Metrics:")?; + writeln!(f, " Total statements: {}", self.total_statements)?; + writeln!(f, " High-water marks:")?; + writeln!( + f, + " Max VM steps: {}", + self.max_vm_steps_per_statement + )?; + writeln!( + f, + " Max rows read: {}", + self.max_rows_read_per_statement + )?; + writeln!(f)?; + writeln!(f, "Aggregate Statistics:")?; + write!(f, "{}", self.aggregate)?; + + if let Some(ref last) = self.last_statement { + writeln!(f)?; + writeln!(f, "Last Statement:")?; + write!(f, "{last}")?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_metrics_merge() { + let mut m1 = StatementMetrics::new(); + m1.rows_read = 100; + m1.vm_steps = 50; + + let mut m2 = StatementMetrics::new(); + m2.rows_read = 200; + m2.vm_steps = 75; + + m1.merge(&m2); + assert_eq!(m1.rows_read, 300); + assert_eq!(m1.vm_steps, 125); + } + + #[test] + fn test_connection_metrics_high_water() { + let mut conn_metrics = ConnectionMetrics::new(); + + let mut stmt1 = StatementMetrics::new(); + stmt1.vm_steps = 100; + stmt1.rows_read = 50; + conn_metrics.record_statement(stmt1); + + let mut stmt2 = StatementMetrics::new(); + stmt2.vm_steps = 75; + stmt2.rows_read = 100; + conn_metrics.record_statement(stmt2); + + assert_eq!(conn_metrics.max_vm_steps_per_statement, 100); + assert_eq!(conn_metrics.max_rows_read_per_statement, 100); + assert_eq!(conn_metrics.total_statements, 2); + assert_eq!(conn_metrics.aggregate.vm_steps, 175); + assert_eq!(conn_metrics.aggregate.rows_read, 150); + } +} diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 4ac1455e3..bc62960c9 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -22,6 +22,7 @@ pub mod execute; pub mod explain; pub mod insn; pub mod likeop; +pub mod metrics; pub mod sorter; use crate::{ @@ -35,6 +36,7 @@ use crate::{ OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState, OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState, }, + vdbe::metrics::StatementMetrics, IOExt, RefValue, }; @@ -257,6 +259,8 @@ pub struct ProgramState { op_delete_state: OpDeleteState, op_idx_delete_state: Option, op_integrity_check_state: OpIntegrityCheckState, + /// Metrics collected during statement execution + pub metrics: StatementMetrics, op_open_ephemeral_state: OpOpenEphemeralState, op_new_rowid_state: OpNewRowidState, op_idx_insert_state: OpIdxInsertState, @@ -297,6 +301,7 @@ impl ProgramState { }, op_idx_delete_state: None, op_integrity_check_state: OpIntegrityCheckState::Start, + metrics: StatementMetrics::new(), op_open_ephemeral_state: OpOpenEphemeralState::Start, op_new_rowid_state: OpNewRowidState::Start, op_idx_insert_state: OpIdxInsertState::SeekIfUnique, @@ -446,16 +451,37 @@ impl Program { let _ = state.result_row.take(); let (insn, insn_function) = &self.insns[state.pc as usize]; trace_insn(self, state.pc as InsnReference, insn); + // Always increment VM steps for every loop iteration + state.metrics.vm_steps = state.metrics.vm_steps.saturating_add(1); + match insn_function(self, state, insn, &pager, mv_store.as_ref()) { - Ok(InsnFunctionStepResult::Step) => {} - Ok(InsnFunctionStepResult::Done) => return Ok(StepResult::Done), + Ok(InsnFunctionStepResult::Step) => { + // Instruction completed, moving to next + state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1); + } + Ok(InsnFunctionStepResult::Done) => { + // Instruction completed execution + state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1); + return Ok(StepResult::Done); + } Ok(InsnFunctionStepResult::IO(io)) => { + // Instruction not complete - waiting for I/O, will resume at same PC state.io_completions = Some(io); return Ok(StepResult::IO); } - Ok(InsnFunctionStepResult::Row) => return Ok(StepResult::Row), - Ok(InsnFunctionStepResult::Interrupt) => return Ok(StepResult::Interrupt), - Ok(InsnFunctionStepResult::Busy) => return Ok(StepResult::Busy), + Ok(InsnFunctionStepResult::Row) => { + // Instruction completed (ResultRow already incremented PC) + state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1); + return Ok(StepResult::Row); + } + Ok(InsnFunctionStepResult::Interrupt) => { + // Instruction interrupted - may resume at same PC + return Ok(StepResult::Interrupt); + } + Ok(InsnFunctionStepResult::Busy) => { + // Instruction blocked - will retry at same PC + return Ok(StepResult::Busy); + } Err(err) => { handle_program_error(&pager, &self.connection, &err)?; return Err(err);