mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 08:55:40 +01:00
Merge 'add metrics and implement the .stats command' from Glauber Costa
This adds basic statement and connection metrics like SQLite (and
libSQL) have.
This is particularly useful to show that materialized views are working:
turso> create table t(a);
turso> insert into t(a) values (1) , (2), (3), (4), (5), (6), (7), (8),
(9), (10); turso> create materialized view v as select count(*) from t;
turso> .stats on
Stats display enabled.
turso> select count(*) from t;
┌───────────┐
│ count (*) │
├───────────┤
│ 10 │
└───────────┘
Statement Metrics:
Row Operations:
Rows read: 10
Rows written: 0
[ ... other metrics ... ]
turso> select * from v;
┌───────────┐
│ count (*) │
├───────────┤
│ 10 │
└───────────┘
Statement Metrics:
Row Operations:
Rows read: 1
Rows written: 0
[ ... other metrics ... ]
Reviewed-by: Preston Thorpe <preston@turso.tech>
Closes #2651
This commit is contained in:
54
cli/app.rs
54
cli/app.rs
@@ -240,6 +240,40 @@ impl Limbo {
|
||||
self.writeln(opts)
|
||||
}
|
||||
|
||||
fn display_stats(&mut self, args: crate::commands::args::StatsArgs) -> io::Result<()> {
|
||||
use crate::commands::args::StatsToggle;
|
||||
|
||||
// Handle on/off toggle
|
||||
if let Some(toggle) = args.toggle {
|
||||
match toggle {
|
||||
StatsToggle::On => {
|
||||
self.opts.stats = true;
|
||||
self.writeln("Stats display enabled.")?;
|
||||
}
|
||||
StatsToggle::Off => {
|
||||
self.opts.stats = false;
|
||||
self.writeln("Stats display disabled.")?;
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Display all metrics
|
||||
let output = {
|
||||
let metrics = self.conn.metrics.borrow();
|
||||
format!("{metrics}")
|
||||
};
|
||||
|
||||
self.writeln(output)?;
|
||||
|
||||
if args.reset {
|
||||
self.conn.metrics.borrow_mut().reset();
|
||||
self.writeln("Statistics reset.")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn reset_input(&mut self) {
|
||||
self.prompt = PROMPT.to_string();
|
||||
self.input_buff.clear();
|
||||
@@ -383,6 +417,21 @@ impl Limbo {
|
||||
}
|
||||
}
|
||||
self.print_query_performance_stats(start, stats);
|
||||
|
||||
// Display stats if enabled
|
||||
if self.opts.stats {
|
||||
let stats_output = {
|
||||
let metrics = self.conn.metrics.borrow();
|
||||
metrics
|
||||
.last_statement
|
||||
.as_ref()
|
||||
.map(|last| format!("\n{last}"))
|
||||
};
|
||||
if let Some(output) = stats_output {
|
||||
let _ = self.writeln(output);
|
||||
}
|
||||
}
|
||||
|
||||
self.reset_input();
|
||||
}
|
||||
|
||||
@@ -564,6 +613,11 @@ impl Limbo {
|
||||
Command::ShowInfo => {
|
||||
let _ = self.show_info();
|
||||
}
|
||||
Command::Stats(args) => {
|
||||
if let Err(e) = self.display_stats(args) {
|
||||
let _ = self.writeln(e.to_string());
|
||||
}
|
||||
}
|
||||
Command::Import(args) => {
|
||||
let w = self.writer.as_mut().unwrap();
|
||||
let mut import_file = ImportFile::new(self.conn.clone(), w);
|
||||
|
||||
@@ -89,6 +89,24 @@ pub struct NullValueArgs {
|
||||
pub value: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Args)]
|
||||
pub struct StatsArgs {
|
||||
/// Toggle stats mode: on or off
|
||||
#[arg(value_enum)]
|
||||
pub toggle: Option<StatsToggle>,
|
||||
/// Reset stats after displaying
|
||||
#[arg(long, short, default_value_t = false)]
|
||||
pub reset: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, ValueEnum, Clone)]
|
||||
pub enum StatsToggle {
|
||||
/// Enable automatic stats display after each statement
|
||||
On,
|
||||
/// Disable automatic stats display
|
||||
Off,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Args)]
|
||||
pub struct EchoArgs {
|
||||
#[arg(value_enum)]
|
||||
|
||||
@@ -3,8 +3,8 @@ pub mod import;
|
||||
|
||||
use args::{
|
||||
CwdArgs, DbConfigArgs, EchoArgs, ExitArgs, HeadersArgs, IndexesArgs, LoadExtensionArgs,
|
||||
NullValueArgs, OpcodesArgs, OpenArgs, OutputModeArgs, SchemaArgs, SetOutputArgs, TablesArgs,
|
||||
TimerArgs,
|
||||
NullValueArgs, OpcodesArgs, OpenArgs, OutputModeArgs, SchemaArgs, SetOutputArgs, StatsArgs,
|
||||
TablesArgs, TimerArgs,
|
||||
};
|
||||
use clap::Parser;
|
||||
use import::ImportArgs;
|
||||
@@ -78,6 +78,9 @@ pub enum Command {
|
||||
/// Print or set the current configuration for the database. Currently ignored.
|
||||
#[command(name = "dbconfig", display_name = ".dbconfig")]
|
||||
DbConfig(DbConfigArgs),
|
||||
/// Display database statistics
|
||||
#[command(name = "stats", display_name = ".stats")]
|
||||
Stats(StatsArgs),
|
||||
/// List vfs modules available
|
||||
#[command(name = "vfslist", display_name = ".vfslist")]
|
||||
ListVfs,
|
||||
|
||||
@@ -85,6 +85,7 @@ pub struct Settings {
|
||||
pub timer: bool,
|
||||
pub headers: bool,
|
||||
pub mcp: bool,
|
||||
pub stats: bool,
|
||||
}
|
||||
|
||||
impl From<Opts> for Settings {
|
||||
@@ -110,6 +111,7 @@ impl From<Opts> for Settings {
|
||||
timer: false,
|
||||
headers: false,
|
||||
mcp: opts.mcp,
|
||||
stats: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
48
core/lib.rs
48
core/lib.rs
@@ -47,6 +47,7 @@ use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
|
||||
use crate::types::WalFrameInfo;
|
||||
#[cfg(feature = "fs")]
|
||||
use crate::util::{OpenMode, OpenOptions};
|
||||
use crate::vdbe::metrics::ConnectionMetrics;
|
||||
use crate::vtab::VirtualTable;
|
||||
use core::str;
|
||||
pub use error::LimboError;
|
||||
@@ -422,6 +423,7 @@ impl Database {
|
||||
attached_databases: RefCell::new(DatabaseCatalog::new()),
|
||||
query_only: Cell::new(false),
|
||||
view_transaction_states: RefCell::new(HashMap::new()),
|
||||
metrics: RefCell::new(ConnectionMetrics::new()),
|
||||
});
|
||||
let builtin_syms = self.builtin_syms.borrow();
|
||||
// add built-in extensions symbols to the connection to prevent having to load each time
|
||||
@@ -844,6 +846,8 @@ pub struct Connection {
|
||||
/// Per-connection view transaction states for uncommitted changes. This represents
|
||||
/// one entry per view that was touched in the transaction.
|
||||
view_transaction_states: RefCell<HashMap<String, ViewTransactionState>>,
|
||||
/// Connection-level metrics aggregation
|
||||
pub metrics: RefCell<ConnectionMetrics>,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
@@ -1956,26 +1960,32 @@ impl Statement {
|
||||
}
|
||||
|
||||
pub fn step(&mut self) -> Result<StepResult> {
|
||||
if !self.accesses_db {
|
||||
return self
|
||||
.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
}
|
||||
|
||||
const MAX_SCHEMA_RETRY: usize = 50;
|
||||
let mut res = self
|
||||
.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
for attempt in 0..MAX_SCHEMA_RETRY {
|
||||
// Only reprepare if we still need to update schema
|
||||
if !matches!(res, Err(LimboError::SchemaUpdated)) {
|
||||
break;
|
||||
let res = if !self.accesses_db {
|
||||
self.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone())
|
||||
} else {
|
||||
const MAX_SCHEMA_RETRY: usize = 50;
|
||||
let mut res =
|
||||
self.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
for attempt in 0..MAX_SCHEMA_RETRY {
|
||||
// Only reprepare if we still need to update schema
|
||||
if !matches!(res, Err(LimboError::SchemaUpdated)) {
|
||||
break;
|
||||
}
|
||||
tracing::debug!("reprepare: attempt={}", attempt);
|
||||
self.reprepare()?;
|
||||
res = self
|
||||
.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
}
|
||||
tracing::debug!("reprepare: attempt={}", attempt);
|
||||
self.reprepare()?;
|
||||
res = self
|
||||
.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
res
|
||||
};
|
||||
|
||||
// Aggregate metrics when statement completes
|
||||
if matches!(res, Ok(StepResult::Done)) {
|
||||
let mut conn_metrics = self.program.connection.metrics.borrow_mut();
|
||||
conn_metrics.record_statement(self.state.metrics.clone());
|
||||
}
|
||||
|
||||
res
|
||||
|
||||
@@ -1171,7 +1171,7 @@ fn emit_update_insns(
|
||||
record_reg,
|
||||
unpacked_start: Some(start),
|
||||
unpacked_count: Some((index.columns.len() + 1) as u16),
|
||||
flags: IdxInsertFlags::new(),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -523,7 +523,7 @@ pub fn translate_insert(
|
||||
unpacked_start: Some(idx_start_reg), // TODO: enable optimization
|
||||
unpacked_count: Some((num_cols + 1) as u16),
|
||||
// TODO: figure out how to determine whether or not we need to seek prior to insert.
|
||||
flags: IdxInsertFlags::new(),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1086,9 +1086,13 @@ pub fn op_vfilter(
|
||||
};
|
||||
cursor.filter(*idx_num as i32, idx_str, *arg_count, args)?
|
||||
};
|
||||
// Increment filter_operations metric for virtual table filter
|
||||
state.metrics.filter_operations = state.metrics.filter_operations.saturating_add(1);
|
||||
if !has_rows {
|
||||
state.pc = pc_if_empty.as_offset_int();
|
||||
} else {
|
||||
// VFilter positions to the first row if any exist, which counts as a read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.pc += 1;
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -1203,6 +1207,8 @@ pub fn op_vnext(
|
||||
cursor.next()?
|
||||
};
|
||||
if has_more {
|
||||
// Increment metrics for row read from virtual table (including materialized views)
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.pc = pc_if_next.as_offset_int();
|
||||
} else {
|
||||
state.pc += 1;
|
||||
@@ -1283,6 +1289,8 @@ pub fn op_rewind(
|
||||
if is_empty {
|
||||
state.pc = pc_if_empty.as_offset_int();
|
||||
} else {
|
||||
// Rewind positions to the first row, which is effectively a read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.pc += 1;
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -1312,6 +1320,8 @@ pub fn op_last(
|
||||
if is_empty {
|
||||
state.pc = pc_if_empty.as_offset_int();
|
||||
} else {
|
||||
// Last positions to the last row, which is effectively a read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.pc += 1;
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -1839,6 +1849,17 @@ pub fn op_next(
|
||||
cursor.is_empty()
|
||||
};
|
||||
if !is_empty {
|
||||
// Increment metrics for row read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.metrics.btree_next = state.metrics.btree_next.saturating_add(1);
|
||||
// Track if this is a full table scan or index scan
|
||||
if let Some((_, cursor_type)) = program.cursor_ref.get(*cursor_id) {
|
||||
if cursor_type.is_index() {
|
||||
state.metrics.index_steps = state.metrics.index_steps.saturating_add(1);
|
||||
} else if matches!(cursor_type, CursorType::BTreeTable(_)) {
|
||||
state.metrics.fullscan_steps = state.metrics.fullscan_steps.saturating_add(1);
|
||||
}
|
||||
}
|
||||
state.pc = pc_if_next.as_offset_int();
|
||||
} else {
|
||||
state.pc += 1;
|
||||
@@ -1870,6 +1891,17 @@ pub fn op_prev(
|
||||
cursor.is_empty()
|
||||
};
|
||||
if !is_empty {
|
||||
// Increment metrics for row read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.metrics.btree_prev = state.metrics.btree_prev.saturating_add(1);
|
||||
// Track if this is a full table scan or index scan
|
||||
if let Some((_, cursor_type)) = program.cursor_ref.get(*cursor_id) {
|
||||
if cursor_type.is_index() {
|
||||
state.metrics.index_steps = state.metrics.index_steps.saturating_add(1);
|
||||
} else if matches!(cursor_type, CursorType::BTreeTable(_)) {
|
||||
state.metrics.fullscan_steps = state.metrics.fullscan_steps.saturating_add(1);
|
||||
}
|
||||
}
|
||||
state.pc = pc_if_prev.as_offset_int();
|
||||
} else {
|
||||
state.pc += 1;
|
||||
@@ -2449,7 +2481,7 @@ pub fn op_seek_rowid(
|
||||
insn
|
||||
);
|
||||
assert!(target_pc.is_offset());
|
||||
let pc = {
|
||||
let (pc, did_seek) = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let rowid = match state.registers[*src_reg].get_value() {
|
||||
@@ -2476,15 +2508,20 @@ pub fn op_seek_rowid(
|
||||
let seek_result = return_if_io!(
|
||||
cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })
|
||||
);
|
||||
if !matches!(seek_result, SeekResult::Found) {
|
||||
let pc = if !matches!(seek_result, SeekResult::Found) {
|
||||
target_pc.as_offset_int()
|
||||
} else {
|
||||
state.pc + 1
|
||||
}
|
||||
};
|
||||
(pc, true)
|
||||
}
|
||||
None => target_pc.as_offset_int(),
|
||||
None => (target_pc.as_offset_int(), false),
|
||||
}
|
||||
};
|
||||
// Increment btree_seeks metric for SeekRowid operation after cursor is dropped
|
||||
if did_seek {
|
||||
state.metrics.btree_seeks = state.metrics.btree_seeks.saturating_add(1);
|
||||
}
|
||||
state.pc = pc;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
@@ -2790,6 +2827,8 @@ pub fn seek_internal(
|
||||
IOResult::IO(io) => return Ok(SeekInternalResult::IO(io)),
|
||||
}
|
||||
};
|
||||
// Increment btree_seeks metric after seek operation and cursor is dropped
|
||||
state.metrics.btree_seeks = state.metrics.btree_seeks.saturating_add(1);
|
||||
let found = match seek_result {
|
||||
SeekResult::Found => true,
|
||||
SeekResult::NotFound => false,
|
||||
@@ -3768,15 +3807,19 @@ pub fn op_sorter_sort(
|
||||
},
|
||||
insn
|
||||
);
|
||||
let is_empty = {
|
||||
let (is_empty, did_sort) = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_sorter_mut();
|
||||
let is_empty = cursor.is_empty();
|
||||
if !is_empty {
|
||||
return_if_io!(cursor.sort());
|
||||
}
|
||||
is_empty
|
||||
(is_empty, !is_empty)
|
||||
};
|
||||
// Increment metrics for sort operation after cursor is dropped
|
||||
if did_sort {
|
||||
state.metrics.sort_operations = state.metrics.sort_operations.saturating_add(1);
|
||||
}
|
||||
if is_empty {
|
||||
state.pc = pc_if_empty.as_offset_int();
|
||||
} else {
|
||||
@@ -5211,6 +5254,8 @@ pub fn op_insert(
|
||||
moved_before
|
||||
));
|
||||
}
|
||||
// Increment metrics for row write
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
|
||||
// Only update last_insert_rowid for regular table inserts, not schema modifications
|
||||
let root_page = {
|
||||
@@ -5421,6 +5466,8 @@ pub fn op_delete(
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.delete());
|
||||
}
|
||||
// Increment metrics for row write (DELETE is a write operation)
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
if dependent_views.is_empty() {
|
||||
@@ -5542,6 +5589,8 @@ pub fn op_idx_delete(
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.delete());
|
||||
}
|
||||
// Increment metrics for index write (delete is a write operation)
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
let n_change = program.n_change.get();
|
||||
program.n_change.set(n_change + 1);
|
||||
state.pc += 1;
|
||||
@@ -5688,9 +5737,12 @@ pub fn op_idx_insert(
|
||||
cursor.insert(&BTreeKey::new_index_key(record_to_insert), moved_before)
|
||||
);
|
||||
}
|
||||
// Increment metrics for index write
|
||||
if flags.has(IdxInsertFlags::NCHANGE) {
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
}
|
||||
state.op_idx_insert_state = OpIdxInsertState::SeekIfUnique;
|
||||
state.pc += 1;
|
||||
// TODO: flag optimizations, update n_change if OPFLAG_NCHANGE
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
}
|
||||
@@ -7002,6 +7054,12 @@ pub fn op_count(
|
||||
|
||||
state.registers[*target_reg] = Register::Value(Value::Integer(count as i64));
|
||||
|
||||
// For optimized COUNT(*) queries, the count represents rows that would be read
|
||||
// SQLite tracks this differently (as pages read), but for consistency we track as rows
|
||||
if *exact {
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(count as u64);
|
||||
}
|
||||
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
199
core/vdbe/metrics.rs
Normal file
199
core/vdbe/metrics.rs
Normal file
@@ -0,0 +1,199 @@
|
||||
use std::fmt;
|
||||
|
||||
/// Statement-level execution metrics
|
||||
///
|
||||
/// These metrics are collected unconditionally during statement execution
|
||||
/// with minimal overhead (simple counter increments). The cost of incrementing
|
||||
/// these counters is negligible compared to the actual work being measured.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct StatementMetrics {
|
||||
// Row operations
|
||||
pub rows_read: u64,
|
||||
pub rows_written: u64,
|
||||
|
||||
// Execution statistics
|
||||
pub vm_steps: u64,
|
||||
pub insn_executed: u64,
|
||||
|
||||
// Table scan metrics
|
||||
pub fullscan_steps: u64,
|
||||
pub index_steps: u64,
|
||||
|
||||
// Sort and filter operations
|
||||
pub sort_operations: u64,
|
||||
pub filter_operations: u64,
|
||||
|
||||
// B-tree operations
|
||||
pub btree_seeks: u64,
|
||||
pub btree_next: u64,
|
||||
pub btree_prev: u64,
|
||||
}
|
||||
|
||||
impl StatementMetrics {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Get total row operations
|
||||
pub fn total_row_ops(&self) -> u64 {
|
||||
self.rows_read + self.rows_written
|
||||
}
|
||||
|
||||
/// Merge another metrics instance into this one (for aggregation)
|
||||
pub fn merge(&mut self, other: &StatementMetrics) {
|
||||
self.rows_read = self.rows_read.saturating_add(other.rows_read);
|
||||
self.rows_written = self.rows_written.saturating_add(other.rows_written);
|
||||
self.vm_steps = self.vm_steps.saturating_add(other.vm_steps);
|
||||
self.insn_executed = self.insn_executed.saturating_add(other.insn_executed);
|
||||
self.fullscan_steps = self.fullscan_steps.saturating_add(other.fullscan_steps);
|
||||
self.index_steps = self.index_steps.saturating_add(other.index_steps);
|
||||
self.sort_operations = self.sort_operations.saturating_add(other.sort_operations);
|
||||
self.filter_operations = self
|
||||
.filter_operations
|
||||
.saturating_add(other.filter_operations);
|
||||
self.btree_seeks = self.btree_seeks.saturating_add(other.btree_seeks);
|
||||
self.btree_next = self.btree_next.saturating_add(other.btree_next);
|
||||
self.btree_prev = self.btree_prev.saturating_add(other.btree_prev);
|
||||
}
|
||||
|
||||
/// Reset all counters to zero
|
||||
pub fn reset(&mut self) {
|
||||
*self = Self::default();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for StatementMetrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
writeln!(f, "Statement Metrics:")?;
|
||||
writeln!(f, " Row Operations:")?;
|
||||
writeln!(f, " Rows read: {}", self.rows_read)?;
|
||||
writeln!(f, " Rows written: {}", self.rows_written)?;
|
||||
writeln!(f, " Execution:")?;
|
||||
writeln!(f, " VM steps: {}", self.vm_steps)?;
|
||||
writeln!(f, " Instructions: {}", self.insn_executed)?;
|
||||
writeln!(f, " Table Access:")?;
|
||||
writeln!(f, " Full scan steps: {}", self.fullscan_steps)?;
|
||||
writeln!(f, " Index steps: {}", self.index_steps)?;
|
||||
writeln!(f, " Operations:")?;
|
||||
writeln!(f, " Sort operations: {}", self.sort_operations)?;
|
||||
writeln!(f, " Filter operations:{}", self.filter_operations)?;
|
||||
writeln!(f, " B-tree Operations:")?;
|
||||
writeln!(f, " Seeks: {}", self.btree_seeks)?;
|
||||
writeln!(f, " Next: {}", self.btree_next)?;
|
||||
writeln!(f, " Prev: {}", self.btree_prev)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Connection-level metrics aggregation
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ConnectionMetrics {
|
||||
/// Total number of statements executed
|
||||
pub total_statements: u64,
|
||||
|
||||
/// Aggregate metrics from all statements
|
||||
pub aggregate: StatementMetrics,
|
||||
|
||||
/// Metrics from the last executed statement
|
||||
pub last_statement: Option<StatementMetrics>,
|
||||
|
||||
/// High-water marks for monitoring
|
||||
pub max_vm_steps_per_statement: u64,
|
||||
pub max_rows_read_per_statement: u64,
|
||||
}
|
||||
|
||||
impl ConnectionMetrics {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Record a completed statement's metrics
|
||||
pub fn record_statement(&mut self, metrics: StatementMetrics) {
|
||||
self.total_statements = self.total_statements.saturating_add(1);
|
||||
|
||||
// Update high-water marks
|
||||
self.max_vm_steps_per_statement = self.max_vm_steps_per_statement.max(metrics.vm_steps);
|
||||
self.max_rows_read_per_statement = self.max_rows_read_per_statement.max(metrics.rows_read);
|
||||
|
||||
// Aggregate into total
|
||||
self.aggregate.merge(&metrics);
|
||||
|
||||
// Keep last statement for debugging
|
||||
self.last_statement = Some(metrics);
|
||||
}
|
||||
|
||||
/// Reset connection metrics
|
||||
pub fn reset(&mut self) {
|
||||
*self = Self::default();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ConnectionMetrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
writeln!(f, "Connection Metrics:")?;
|
||||
writeln!(f, " Total statements: {}", self.total_statements)?;
|
||||
writeln!(f, " High-water marks:")?;
|
||||
writeln!(
|
||||
f,
|
||||
" Max VM steps: {}",
|
||||
self.max_vm_steps_per_statement
|
||||
)?;
|
||||
writeln!(
|
||||
f,
|
||||
" Max rows read: {}",
|
||||
self.max_rows_read_per_statement
|
||||
)?;
|
||||
writeln!(f)?;
|
||||
writeln!(f, "Aggregate Statistics:")?;
|
||||
write!(f, "{}", self.aggregate)?;
|
||||
|
||||
if let Some(ref last) = self.last_statement {
|
||||
writeln!(f)?;
|
||||
writeln!(f, "Last Statement:")?;
|
||||
write!(f, "{last}")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_metrics_merge() {
|
||||
let mut m1 = StatementMetrics::new();
|
||||
m1.rows_read = 100;
|
||||
m1.vm_steps = 50;
|
||||
|
||||
let mut m2 = StatementMetrics::new();
|
||||
m2.rows_read = 200;
|
||||
m2.vm_steps = 75;
|
||||
|
||||
m1.merge(&m2);
|
||||
assert_eq!(m1.rows_read, 300);
|
||||
assert_eq!(m1.vm_steps, 125);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_connection_metrics_high_water() {
|
||||
let mut conn_metrics = ConnectionMetrics::new();
|
||||
|
||||
let mut stmt1 = StatementMetrics::new();
|
||||
stmt1.vm_steps = 100;
|
||||
stmt1.rows_read = 50;
|
||||
conn_metrics.record_statement(stmt1);
|
||||
|
||||
let mut stmt2 = StatementMetrics::new();
|
||||
stmt2.vm_steps = 75;
|
||||
stmt2.rows_read = 100;
|
||||
conn_metrics.record_statement(stmt2);
|
||||
|
||||
assert_eq!(conn_metrics.max_vm_steps_per_statement, 100);
|
||||
assert_eq!(conn_metrics.max_rows_read_per_statement, 100);
|
||||
assert_eq!(conn_metrics.total_statements, 2);
|
||||
assert_eq!(conn_metrics.aggregate.vm_steps, 175);
|
||||
assert_eq!(conn_metrics.aggregate.rows_read, 150);
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ pub mod execute;
|
||||
pub mod explain;
|
||||
pub mod insn;
|
||||
pub mod likeop;
|
||||
pub mod metrics;
|
||||
pub mod sorter;
|
||||
|
||||
use crate::{
|
||||
@@ -35,6 +36,7 @@ use crate::{
|
||||
OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState,
|
||||
OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState,
|
||||
},
|
||||
vdbe::metrics::StatementMetrics,
|
||||
IOExt, RefValue,
|
||||
};
|
||||
|
||||
@@ -257,6 +259,8 @@ pub struct ProgramState {
|
||||
op_delete_state: OpDeleteState,
|
||||
op_idx_delete_state: Option<OpIdxDeleteState>,
|
||||
op_integrity_check_state: OpIntegrityCheckState,
|
||||
/// Metrics collected during statement execution
|
||||
pub metrics: StatementMetrics,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState,
|
||||
op_new_rowid_state: OpNewRowidState,
|
||||
op_idx_insert_state: OpIdxInsertState,
|
||||
@@ -297,6 +301,7 @@ impl ProgramState {
|
||||
},
|
||||
op_idx_delete_state: None,
|
||||
op_integrity_check_state: OpIntegrityCheckState::Start,
|
||||
metrics: StatementMetrics::new(),
|
||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||
op_new_rowid_state: OpNewRowidState::Start,
|
||||
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
|
||||
@@ -446,16 +451,37 @@ impl Program {
|
||||
let _ = state.result_row.take();
|
||||
let (insn, insn_function) = &self.insns[state.pc as usize];
|
||||
trace_insn(self, state.pc as InsnReference, insn);
|
||||
// Always increment VM steps for every loop iteration
|
||||
state.metrics.vm_steps = state.metrics.vm_steps.saturating_add(1);
|
||||
|
||||
match insn_function(self, state, insn, &pager, mv_store.as_ref()) {
|
||||
Ok(InsnFunctionStepResult::Step) => {}
|
||||
Ok(InsnFunctionStepResult::Done) => return Ok(StepResult::Done),
|
||||
Ok(InsnFunctionStepResult::Step) => {
|
||||
// Instruction completed, moving to next
|
||||
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Done) => {
|
||||
// Instruction completed execution
|
||||
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
|
||||
return Ok(StepResult::Done);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::IO(io)) => {
|
||||
// Instruction not complete - waiting for I/O, will resume at same PC
|
||||
state.io_completions = Some(io);
|
||||
return Ok(StepResult::IO);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Row) => return Ok(StepResult::Row),
|
||||
Ok(InsnFunctionStepResult::Interrupt) => return Ok(StepResult::Interrupt),
|
||||
Ok(InsnFunctionStepResult::Busy) => return Ok(StepResult::Busy),
|
||||
Ok(InsnFunctionStepResult::Row) => {
|
||||
// Instruction completed (ResultRow already incremented PC)
|
||||
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
|
||||
return Ok(StepResult::Row);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Interrupt) => {
|
||||
// Instruction interrupted - may resume at same PC
|
||||
return Ok(StepResult::Interrupt);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Busy) => {
|
||||
// Instruction blocked - will retry at same PC
|
||||
return Ok(StepResult::Busy);
|
||||
}
|
||||
Err(err) => {
|
||||
handle_program_error(&pager, &self.connection, &err)?;
|
||||
return Err(err);
|
||||
|
||||
Reference in New Issue
Block a user