mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-18 09:04:19 +01:00
add metrics and implement the .stats command
This adds basic statement and connection metrics like SQLite (and
libSQL) have.
This is particularly useful to show that materialized views are working:
turso> create table t(a);
turso> insert into t(a) values (1) , (2), (3), (4), (5), (6), (7), (8), (9), (10);
turso> create materialized view v as select count(*) from t;
turso> .stats on
Stats display enabled.
turso> select count(*) from t;
┌───────────┐
│ count (*) │
├───────────┤
│ 10 │
└───────────┘
Statement Metrics:
Row Operations:
Rows read: 10
Rows written: 0
[ ... other metrics ... ]
turso> select * from v;
┌───────────┐
│ count (*) │
├───────────┤
│ 10 │
└───────────┘
Statement Metrics:
Row Operations:
Rows read: 1
Rows written: 0
[ ... other metrics ... ]
This commit is contained in:
54
cli/app.rs
54
cli/app.rs
@@ -240,6 +240,40 @@ impl Limbo {
|
||||
self.writeln(opts)
|
||||
}
|
||||
|
||||
fn display_stats(&mut self, args: crate::commands::args::StatsArgs) -> io::Result<()> {
|
||||
use crate::commands::args::StatsToggle;
|
||||
|
||||
// Handle on/off toggle
|
||||
if let Some(toggle) = args.toggle {
|
||||
match toggle {
|
||||
StatsToggle::On => {
|
||||
self.opts.stats = true;
|
||||
self.writeln("Stats display enabled.")?;
|
||||
}
|
||||
StatsToggle::Off => {
|
||||
self.opts.stats = false;
|
||||
self.writeln("Stats display disabled.")?;
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Display all metrics
|
||||
let output = {
|
||||
let metrics = self.conn.metrics.borrow();
|
||||
format!("{metrics}")
|
||||
};
|
||||
|
||||
self.writeln(output)?;
|
||||
|
||||
if args.reset {
|
||||
self.conn.metrics.borrow_mut().reset();
|
||||
self.writeln("Statistics reset.")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn reset_input(&mut self) {
|
||||
self.prompt = PROMPT.to_string();
|
||||
self.input_buff.clear();
|
||||
@@ -383,6 +417,21 @@ impl Limbo {
|
||||
}
|
||||
}
|
||||
self.print_query_performance_stats(start, stats);
|
||||
|
||||
// Display stats if enabled
|
||||
if self.opts.stats {
|
||||
let stats_output = {
|
||||
let metrics = self.conn.metrics.borrow();
|
||||
metrics
|
||||
.last_statement
|
||||
.as_ref()
|
||||
.map(|last| format!("\n{last}"))
|
||||
};
|
||||
if let Some(output) = stats_output {
|
||||
let _ = self.writeln(output);
|
||||
}
|
||||
}
|
||||
|
||||
self.reset_input();
|
||||
}
|
||||
|
||||
@@ -564,6 +613,11 @@ impl Limbo {
|
||||
Command::ShowInfo => {
|
||||
let _ = self.show_info();
|
||||
}
|
||||
Command::Stats(args) => {
|
||||
if let Err(e) = self.display_stats(args) {
|
||||
let _ = self.writeln(e.to_string());
|
||||
}
|
||||
}
|
||||
Command::Import(args) => {
|
||||
let w = self.writer.as_mut().unwrap();
|
||||
let mut import_file = ImportFile::new(self.conn.clone(), w);
|
||||
|
||||
@@ -89,6 +89,24 @@ pub struct NullValueArgs {
|
||||
pub value: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Args)]
|
||||
pub struct StatsArgs {
|
||||
/// Toggle stats mode: on or off
|
||||
#[arg(value_enum)]
|
||||
pub toggle: Option<StatsToggle>,
|
||||
/// Reset stats after displaying
|
||||
#[arg(long, short, default_value_t = false)]
|
||||
pub reset: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, ValueEnum, Clone)]
|
||||
pub enum StatsToggle {
|
||||
/// Enable automatic stats display after each statement
|
||||
On,
|
||||
/// Disable automatic stats display
|
||||
Off,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Args)]
|
||||
pub struct EchoArgs {
|
||||
#[arg(value_enum)]
|
||||
|
||||
@@ -3,8 +3,8 @@ pub mod import;
|
||||
|
||||
use args::{
|
||||
CwdArgs, DbConfigArgs, EchoArgs, ExitArgs, HeadersArgs, IndexesArgs, LoadExtensionArgs,
|
||||
NullValueArgs, OpcodesArgs, OpenArgs, OutputModeArgs, SchemaArgs, SetOutputArgs, TablesArgs,
|
||||
TimerArgs,
|
||||
NullValueArgs, OpcodesArgs, OpenArgs, OutputModeArgs, SchemaArgs, SetOutputArgs, StatsArgs,
|
||||
TablesArgs, TimerArgs,
|
||||
};
|
||||
use clap::Parser;
|
||||
use import::ImportArgs;
|
||||
@@ -78,6 +78,9 @@ pub enum Command {
|
||||
/// Print or set the current configuration for the database. Currently ignored.
|
||||
#[command(name = "dbconfig", display_name = ".dbconfig")]
|
||||
DbConfig(DbConfigArgs),
|
||||
/// Display database statistics
|
||||
#[command(name = "stats", display_name = ".stats")]
|
||||
Stats(StatsArgs),
|
||||
/// List vfs modules available
|
||||
#[command(name = "vfslist", display_name = ".vfslist")]
|
||||
ListVfs,
|
||||
|
||||
@@ -85,6 +85,7 @@ pub struct Settings {
|
||||
pub timer: bool,
|
||||
pub headers: bool,
|
||||
pub mcp: bool,
|
||||
pub stats: bool,
|
||||
}
|
||||
|
||||
impl From<Opts> for Settings {
|
||||
@@ -110,6 +111,7 @@ impl From<Opts> for Settings {
|
||||
timer: false,
|
||||
headers: false,
|
||||
mcp: opts.mcp,
|
||||
stats: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
48
core/lib.rs
48
core/lib.rs
@@ -47,6 +47,7 @@ use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
|
||||
use crate::types::WalFrameInfo;
|
||||
#[cfg(feature = "fs")]
|
||||
use crate::util::{OpenMode, OpenOptions};
|
||||
use crate::vdbe::metrics::ConnectionMetrics;
|
||||
use crate::vtab::VirtualTable;
|
||||
use core::str;
|
||||
pub use error::LimboError;
|
||||
@@ -422,6 +423,7 @@ impl Database {
|
||||
attached_databases: RefCell::new(DatabaseCatalog::new()),
|
||||
query_only: Cell::new(false),
|
||||
view_transaction_states: RefCell::new(HashMap::new()),
|
||||
metrics: RefCell::new(ConnectionMetrics::new()),
|
||||
});
|
||||
let builtin_syms = self.builtin_syms.borrow();
|
||||
// add built-in extensions symbols to the connection to prevent having to load each time
|
||||
@@ -844,6 +846,8 @@ pub struct Connection {
|
||||
/// Per-connection view transaction states for uncommitted changes. This represents
|
||||
/// one entry per view that was touched in the transaction.
|
||||
view_transaction_states: RefCell<HashMap<String, ViewTransactionState>>,
|
||||
/// Connection-level metrics aggregation
|
||||
pub metrics: RefCell<ConnectionMetrics>,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
@@ -1956,26 +1960,32 @@ impl Statement {
|
||||
}
|
||||
|
||||
pub fn step(&mut self) -> Result<StepResult> {
|
||||
if !self.accesses_db {
|
||||
return self
|
||||
.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
}
|
||||
|
||||
const MAX_SCHEMA_RETRY: usize = 50;
|
||||
let mut res = self
|
||||
.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
for attempt in 0..MAX_SCHEMA_RETRY {
|
||||
// Only reprepare if we still need to update schema
|
||||
if !matches!(res, Err(LimboError::SchemaUpdated)) {
|
||||
break;
|
||||
let res = if !self.accesses_db {
|
||||
self.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone())
|
||||
} else {
|
||||
const MAX_SCHEMA_RETRY: usize = 50;
|
||||
let mut res =
|
||||
self.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
for attempt in 0..MAX_SCHEMA_RETRY {
|
||||
// Only reprepare if we still need to update schema
|
||||
if !matches!(res, Err(LimboError::SchemaUpdated)) {
|
||||
break;
|
||||
}
|
||||
tracing::debug!("reprepare: attempt={}", attempt);
|
||||
self.reprepare()?;
|
||||
res = self
|
||||
.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
}
|
||||
tracing::debug!("reprepare: attempt={}", attempt);
|
||||
self.reprepare()?;
|
||||
res = self
|
||||
.program
|
||||
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
|
||||
res
|
||||
};
|
||||
|
||||
// Aggregate metrics when statement completes
|
||||
if matches!(res, Ok(StepResult::Done)) {
|
||||
let mut conn_metrics = self.program.connection.metrics.borrow_mut();
|
||||
conn_metrics.record_statement(self.state.metrics.clone());
|
||||
}
|
||||
|
||||
res
|
||||
|
||||
@@ -1171,7 +1171,7 @@ fn emit_update_insns(
|
||||
record_reg,
|
||||
unpacked_start: Some(start),
|
||||
unpacked_count: Some((index.columns.len() + 1) as u16),
|
||||
flags: IdxInsertFlags::new(),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -523,7 +523,7 @@ pub fn translate_insert(
|
||||
unpacked_start: Some(idx_start_reg), // TODO: enable optimization
|
||||
unpacked_count: Some((num_cols + 1) as u16),
|
||||
// TODO: figure out how to determine whether or not we need to seek prior to insert.
|
||||
flags: IdxInsertFlags::new(),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1086,9 +1086,13 @@ pub fn op_vfilter(
|
||||
};
|
||||
cursor.filter(*idx_num as i32, idx_str, *arg_count, args)?
|
||||
};
|
||||
// Increment filter_operations metric for virtual table filter
|
||||
state.metrics.filter_operations = state.metrics.filter_operations.saturating_add(1);
|
||||
if !has_rows {
|
||||
state.pc = pc_if_empty.as_offset_int();
|
||||
} else {
|
||||
// VFilter positions to the first row if any exist, which counts as a read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.pc += 1;
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -1203,6 +1207,8 @@ pub fn op_vnext(
|
||||
cursor.next()?
|
||||
};
|
||||
if has_more {
|
||||
// Increment metrics for row read from virtual table (including materialized views)
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.pc = pc_if_next.as_offset_int();
|
||||
} else {
|
||||
state.pc += 1;
|
||||
@@ -1283,6 +1289,8 @@ pub fn op_rewind(
|
||||
if is_empty {
|
||||
state.pc = pc_if_empty.as_offset_int();
|
||||
} else {
|
||||
// Rewind positions to the first row, which is effectively a read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.pc += 1;
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -1312,6 +1320,8 @@ pub fn op_last(
|
||||
if is_empty {
|
||||
state.pc = pc_if_empty.as_offset_int();
|
||||
} else {
|
||||
// Last positions to the last row, which is effectively a read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.pc += 1;
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -1839,6 +1849,17 @@ pub fn op_next(
|
||||
cursor.is_empty()
|
||||
};
|
||||
if !is_empty {
|
||||
// Increment metrics for row read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.metrics.btree_next = state.metrics.btree_next.saturating_add(1);
|
||||
// Track if this is a full table scan or index scan
|
||||
if let Some((_, cursor_type)) = program.cursor_ref.get(*cursor_id) {
|
||||
if cursor_type.is_index() {
|
||||
state.metrics.index_steps = state.metrics.index_steps.saturating_add(1);
|
||||
} else if matches!(cursor_type, CursorType::BTreeTable(_)) {
|
||||
state.metrics.fullscan_steps = state.metrics.fullscan_steps.saturating_add(1);
|
||||
}
|
||||
}
|
||||
state.pc = pc_if_next.as_offset_int();
|
||||
} else {
|
||||
state.pc += 1;
|
||||
@@ -1870,6 +1891,17 @@ pub fn op_prev(
|
||||
cursor.is_empty()
|
||||
};
|
||||
if !is_empty {
|
||||
// Increment metrics for row read
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(1);
|
||||
state.metrics.btree_prev = state.metrics.btree_prev.saturating_add(1);
|
||||
// Track if this is a full table scan or index scan
|
||||
if let Some((_, cursor_type)) = program.cursor_ref.get(*cursor_id) {
|
||||
if cursor_type.is_index() {
|
||||
state.metrics.index_steps = state.metrics.index_steps.saturating_add(1);
|
||||
} else if matches!(cursor_type, CursorType::BTreeTable(_)) {
|
||||
state.metrics.fullscan_steps = state.metrics.fullscan_steps.saturating_add(1);
|
||||
}
|
||||
}
|
||||
state.pc = pc_if_prev.as_offset_int();
|
||||
} else {
|
||||
state.pc += 1;
|
||||
@@ -2441,7 +2473,7 @@ pub fn op_seek_rowid(
|
||||
insn
|
||||
);
|
||||
assert!(target_pc.is_offset());
|
||||
let pc = {
|
||||
let (pc, did_seek) = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let rowid = match state.registers[*src_reg].get_value() {
|
||||
@@ -2468,15 +2500,20 @@ pub fn op_seek_rowid(
|
||||
let seek_result = return_if_io!(
|
||||
cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })
|
||||
);
|
||||
if !matches!(seek_result, SeekResult::Found) {
|
||||
let pc = if !matches!(seek_result, SeekResult::Found) {
|
||||
target_pc.as_offset_int()
|
||||
} else {
|
||||
state.pc + 1
|
||||
}
|
||||
};
|
||||
(pc, true)
|
||||
}
|
||||
None => target_pc.as_offset_int(),
|
||||
None => (target_pc.as_offset_int(), false),
|
||||
}
|
||||
};
|
||||
// Increment btree_seeks metric for SeekRowid operation after cursor is dropped
|
||||
if did_seek {
|
||||
state.metrics.btree_seeks = state.metrics.btree_seeks.saturating_add(1);
|
||||
}
|
||||
state.pc = pc;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
@@ -2782,6 +2819,8 @@ pub fn seek_internal(
|
||||
IOResult::IO(io) => return Ok(SeekInternalResult::IO(io)),
|
||||
}
|
||||
};
|
||||
// Increment btree_seeks metric after seek operation and cursor is dropped
|
||||
state.metrics.btree_seeks = state.metrics.btree_seeks.saturating_add(1);
|
||||
let found = match seek_result {
|
||||
SeekResult::Found => true,
|
||||
SeekResult::NotFound => false,
|
||||
@@ -3760,15 +3799,19 @@ pub fn op_sorter_sort(
|
||||
},
|
||||
insn
|
||||
);
|
||||
let is_empty = {
|
||||
let (is_empty, did_sort) = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_sorter_mut();
|
||||
let is_empty = cursor.is_empty();
|
||||
if !is_empty {
|
||||
return_if_io!(cursor.sort());
|
||||
}
|
||||
is_empty
|
||||
(is_empty, !is_empty)
|
||||
};
|
||||
// Increment metrics for sort operation after cursor is dropped
|
||||
if did_sort {
|
||||
state.metrics.sort_operations = state.metrics.sort_operations.saturating_add(1);
|
||||
}
|
||||
if is_empty {
|
||||
state.pc = pc_if_empty.as_offset_int();
|
||||
} else {
|
||||
@@ -5203,6 +5246,8 @@ pub fn op_insert(
|
||||
moved_before
|
||||
));
|
||||
}
|
||||
// Increment metrics for row write
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
|
||||
// Only update last_insert_rowid for regular table inserts, not schema modifications
|
||||
let root_page = {
|
||||
@@ -5413,6 +5458,8 @@ pub fn op_delete(
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.delete());
|
||||
}
|
||||
// Increment metrics for row write (DELETE is a write operation)
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
if dependent_views.is_empty() {
|
||||
@@ -5534,6 +5581,8 @@ pub fn op_idx_delete(
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.delete());
|
||||
}
|
||||
// Increment metrics for index write (delete is a write operation)
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
let n_change = program.n_change.get();
|
||||
program.n_change.set(n_change + 1);
|
||||
state.pc += 1;
|
||||
@@ -5680,9 +5729,12 @@ pub fn op_idx_insert(
|
||||
cursor.insert(&BTreeKey::new_index_key(record_to_insert), moved_before)
|
||||
);
|
||||
}
|
||||
// Increment metrics for index write
|
||||
if flags.has(IdxInsertFlags::NCHANGE) {
|
||||
state.metrics.rows_written = state.metrics.rows_written.saturating_add(1);
|
||||
}
|
||||
state.op_idx_insert_state = OpIdxInsertState::SeekIfUnique;
|
||||
state.pc += 1;
|
||||
// TODO: flag optimizations, update n_change if OPFLAG_NCHANGE
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
}
|
||||
@@ -6996,6 +7048,12 @@ pub fn op_count(
|
||||
|
||||
state.registers[*target_reg] = Register::Value(Value::Integer(count as i64));
|
||||
|
||||
// For optimized COUNT(*) queries, the count represents rows that would be read
|
||||
// SQLite tracks this differently (as pages read), but for consistency we track as rows
|
||||
if *exact {
|
||||
state.metrics.rows_read = state.metrics.rows_read.saturating_add(count as u64);
|
||||
}
|
||||
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
199
core/vdbe/metrics.rs
Normal file
199
core/vdbe/metrics.rs
Normal file
@@ -0,0 +1,199 @@
|
||||
use std::fmt;
|
||||
|
||||
/// Statement-level execution metrics
|
||||
///
|
||||
/// These metrics are collected unconditionally during statement execution
|
||||
/// with minimal overhead (simple counter increments). The cost of incrementing
|
||||
/// these counters is negligible compared to the actual work being measured.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct StatementMetrics {
|
||||
// Row operations
|
||||
pub rows_read: u64,
|
||||
pub rows_written: u64,
|
||||
|
||||
// Execution statistics
|
||||
pub vm_steps: u64,
|
||||
pub insn_executed: u64,
|
||||
|
||||
// Table scan metrics
|
||||
pub fullscan_steps: u64,
|
||||
pub index_steps: u64,
|
||||
|
||||
// Sort and filter operations
|
||||
pub sort_operations: u64,
|
||||
pub filter_operations: u64,
|
||||
|
||||
// B-tree operations
|
||||
pub btree_seeks: u64,
|
||||
pub btree_next: u64,
|
||||
pub btree_prev: u64,
|
||||
}
|
||||
|
||||
impl StatementMetrics {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Get total row operations
|
||||
pub fn total_row_ops(&self) -> u64 {
|
||||
self.rows_read + self.rows_written
|
||||
}
|
||||
|
||||
/// Merge another metrics instance into this one (for aggregation)
|
||||
pub fn merge(&mut self, other: &StatementMetrics) {
|
||||
self.rows_read = self.rows_read.saturating_add(other.rows_read);
|
||||
self.rows_written = self.rows_written.saturating_add(other.rows_written);
|
||||
self.vm_steps = self.vm_steps.saturating_add(other.vm_steps);
|
||||
self.insn_executed = self.insn_executed.saturating_add(other.insn_executed);
|
||||
self.fullscan_steps = self.fullscan_steps.saturating_add(other.fullscan_steps);
|
||||
self.index_steps = self.index_steps.saturating_add(other.index_steps);
|
||||
self.sort_operations = self.sort_operations.saturating_add(other.sort_operations);
|
||||
self.filter_operations = self
|
||||
.filter_operations
|
||||
.saturating_add(other.filter_operations);
|
||||
self.btree_seeks = self.btree_seeks.saturating_add(other.btree_seeks);
|
||||
self.btree_next = self.btree_next.saturating_add(other.btree_next);
|
||||
self.btree_prev = self.btree_prev.saturating_add(other.btree_prev);
|
||||
}
|
||||
|
||||
/// Reset all counters to zero
|
||||
pub fn reset(&mut self) {
|
||||
*self = Self::default();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for StatementMetrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
writeln!(f, "Statement Metrics:")?;
|
||||
writeln!(f, " Row Operations:")?;
|
||||
writeln!(f, " Rows read: {}", self.rows_read)?;
|
||||
writeln!(f, " Rows written: {}", self.rows_written)?;
|
||||
writeln!(f, " Execution:")?;
|
||||
writeln!(f, " VM steps: {}", self.vm_steps)?;
|
||||
writeln!(f, " Instructions: {}", self.insn_executed)?;
|
||||
writeln!(f, " Table Access:")?;
|
||||
writeln!(f, " Full scan steps: {}", self.fullscan_steps)?;
|
||||
writeln!(f, " Index steps: {}", self.index_steps)?;
|
||||
writeln!(f, " Operations:")?;
|
||||
writeln!(f, " Sort operations: {}", self.sort_operations)?;
|
||||
writeln!(f, " Filter operations:{}", self.filter_operations)?;
|
||||
writeln!(f, " B-tree Operations:")?;
|
||||
writeln!(f, " Seeks: {}", self.btree_seeks)?;
|
||||
writeln!(f, " Next: {}", self.btree_next)?;
|
||||
writeln!(f, " Prev: {}", self.btree_prev)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Connection-level metrics aggregation
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ConnectionMetrics {
|
||||
/// Total number of statements executed
|
||||
pub total_statements: u64,
|
||||
|
||||
/// Aggregate metrics from all statements
|
||||
pub aggregate: StatementMetrics,
|
||||
|
||||
/// Metrics from the last executed statement
|
||||
pub last_statement: Option<StatementMetrics>,
|
||||
|
||||
/// High-water marks for monitoring
|
||||
pub max_vm_steps_per_statement: u64,
|
||||
pub max_rows_read_per_statement: u64,
|
||||
}
|
||||
|
||||
impl ConnectionMetrics {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Record a completed statement's metrics
|
||||
pub fn record_statement(&mut self, metrics: StatementMetrics) {
|
||||
self.total_statements = self.total_statements.saturating_add(1);
|
||||
|
||||
// Update high-water marks
|
||||
self.max_vm_steps_per_statement = self.max_vm_steps_per_statement.max(metrics.vm_steps);
|
||||
self.max_rows_read_per_statement = self.max_rows_read_per_statement.max(metrics.rows_read);
|
||||
|
||||
// Aggregate into total
|
||||
self.aggregate.merge(&metrics);
|
||||
|
||||
// Keep last statement for debugging
|
||||
self.last_statement = Some(metrics);
|
||||
}
|
||||
|
||||
/// Reset connection metrics
|
||||
pub fn reset(&mut self) {
|
||||
*self = Self::default();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ConnectionMetrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
writeln!(f, "Connection Metrics:")?;
|
||||
writeln!(f, " Total statements: {}", self.total_statements)?;
|
||||
writeln!(f, " High-water marks:")?;
|
||||
writeln!(
|
||||
f,
|
||||
" Max VM steps: {}",
|
||||
self.max_vm_steps_per_statement
|
||||
)?;
|
||||
writeln!(
|
||||
f,
|
||||
" Max rows read: {}",
|
||||
self.max_rows_read_per_statement
|
||||
)?;
|
||||
writeln!(f)?;
|
||||
writeln!(f, "Aggregate Statistics:")?;
|
||||
write!(f, "{}", self.aggregate)?;
|
||||
|
||||
if let Some(ref last) = self.last_statement {
|
||||
writeln!(f)?;
|
||||
writeln!(f, "Last Statement:")?;
|
||||
write!(f, "{last}")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_metrics_merge() {
|
||||
let mut m1 = StatementMetrics::new();
|
||||
m1.rows_read = 100;
|
||||
m1.vm_steps = 50;
|
||||
|
||||
let mut m2 = StatementMetrics::new();
|
||||
m2.rows_read = 200;
|
||||
m2.vm_steps = 75;
|
||||
|
||||
m1.merge(&m2);
|
||||
assert_eq!(m1.rows_read, 300);
|
||||
assert_eq!(m1.vm_steps, 125);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_connection_metrics_high_water() {
|
||||
let mut conn_metrics = ConnectionMetrics::new();
|
||||
|
||||
let mut stmt1 = StatementMetrics::new();
|
||||
stmt1.vm_steps = 100;
|
||||
stmt1.rows_read = 50;
|
||||
conn_metrics.record_statement(stmt1);
|
||||
|
||||
let mut stmt2 = StatementMetrics::new();
|
||||
stmt2.vm_steps = 75;
|
||||
stmt2.rows_read = 100;
|
||||
conn_metrics.record_statement(stmt2);
|
||||
|
||||
assert_eq!(conn_metrics.max_vm_steps_per_statement, 100);
|
||||
assert_eq!(conn_metrics.max_rows_read_per_statement, 100);
|
||||
assert_eq!(conn_metrics.total_statements, 2);
|
||||
assert_eq!(conn_metrics.aggregate.vm_steps, 175);
|
||||
assert_eq!(conn_metrics.aggregate.rows_read, 150);
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ pub mod execute;
|
||||
pub mod explain;
|
||||
pub mod insn;
|
||||
pub mod likeop;
|
||||
pub mod metrics;
|
||||
pub mod sorter;
|
||||
|
||||
use crate::{
|
||||
@@ -35,6 +36,7 @@ use crate::{
|
||||
OpColumnState, OpDeleteState, OpDeleteSubState, OpIdxInsertState, OpInsertState,
|
||||
OpInsertSubState, OpNewRowidState, OpNoConflictState, OpRowIdState, OpSeekState,
|
||||
},
|
||||
vdbe::metrics::StatementMetrics,
|
||||
IOExt, RefValue,
|
||||
};
|
||||
|
||||
@@ -257,6 +259,8 @@ pub struct ProgramState {
|
||||
op_delete_state: OpDeleteState,
|
||||
op_idx_delete_state: Option<OpIdxDeleteState>,
|
||||
op_integrity_check_state: OpIntegrityCheckState,
|
||||
/// Metrics collected during statement execution
|
||||
pub metrics: StatementMetrics,
|
||||
op_open_ephemeral_state: OpOpenEphemeralState,
|
||||
op_new_rowid_state: OpNewRowidState,
|
||||
op_idx_insert_state: OpIdxInsertState,
|
||||
@@ -297,6 +301,7 @@ impl ProgramState {
|
||||
},
|
||||
op_idx_delete_state: None,
|
||||
op_integrity_check_state: OpIntegrityCheckState::Start,
|
||||
metrics: StatementMetrics::new(),
|
||||
op_open_ephemeral_state: OpOpenEphemeralState::Start,
|
||||
op_new_rowid_state: OpNewRowidState::Start,
|
||||
op_idx_insert_state: OpIdxInsertState::SeekIfUnique,
|
||||
@@ -446,16 +451,37 @@ impl Program {
|
||||
let _ = state.result_row.take();
|
||||
let (insn, insn_function) = &self.insns[state.pc as usize];
|
||||
trace_insn(self, state.pc as InsnReference, insn);
|
||||
// Always increment VM steps for every loop iteration
|
||||
state.metrics.vm_steps = state.metrics.vm_steps.saturating_add(1);
|
||||
|
||||
match insn_function(self, state, insn, &pager, mv_store.as_ref()) {
|
||||
Ok(InsnFunctionStepResult::Step) => {}
|
||||
Ok(InsnFunctionStepResult::Done) => return Ok(StepResult::Done),
|
||||
Ok(InsnFunctionStepResult::Step) => {
|
||||
// Instruction completed, moving to next
|
||||
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Done) => {
|
||||
// Instruction completed execution
|
||||
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
|
||||
return Ok(StepResult::Done);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::IO(io)) => {
|
||||
// Instruction not complete - waiting for I/O, will resume at same PC
|
||||
state.io_completions = Some(io);
|
||||
return Ok(StepResult::IO);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Row) => return Ok(StepResult::Row),
|
||||
Ok(InsnFunctionStepResult::Interrupt) => return Ok(StepResult::Interrupt),
|
||||
Ok(InsnFunctionStepResult::Busy) => return Ok(StepResult::Busy),
|
||||
Ok(InsnFunctionStepResult::Row) => {
|
||||
// Instruction completed (ResultRow already incremented PC)
|
||||
state.metrics.insn_executed = state.metrics.insn_executed.saturating_add(1);
|
||||
return Ok(StepResult::Row);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Interrupt) => {
|
||||
// Instruction interrupted - may resume at same PC
|
||||
return Ok(StepResult::Interrupt);
|
||||
}
|
||||
Ok(InsnFunctionStepResult::Busy) => {
|
||||
// Instruction blocked - will retry at same PC
|
||||
return Ok(StepResult::Busy);
|
||||
}
|
||||
Err(err) => {
|
||||
handle_program_error(&pager, &self.connection, &err)?;
|
||||
return Err(err);
|
||||
|
||||
Reference in New Issue
Block a user