finish EXPLAIN

This commit is contained in:
TcMits
2025-09-11 18:04:59 +07:00
parent a7373c9a97
commit b574b4bcea
5 changed files with 260 additions and 121 deletions

View File

@@ -29,7 +29,9 @@ use std::{
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use turso_core::{Connection, Database, LimboError, OpenFlags, Statement, StepResult, Value};
use turso_core::{
Connection, Database, LimboError, OpenFlags, QueryMode, Statement, StepResult, Value,
};
#[derive(Parser, Debug)]
#[command(name = "Turso")]
@@ -460,34 +462,15 @@ impl Limbo {
} else {
None
};
// TODO this is a quickfix. Some ideas to do case insensitive comparisons is to use
// Uncased or Unicase.
let explain_str = "explain";
if input
.trim_start()
.get(..explain_str.len())
.map(|s| s.eq_ignore_ascii_case(explain_str))
.unwrap_or(false)
{
match self.conn.query(input) {
Ok(Some(stmt)) => {
let _ = self.writeln(stmt.explain().as_bytes());
}
Err(e) => {
let _ = self.writeln(e.to_string());
}
_ => {}
}
} else {
let conn = self.conn.clone();
let runner = conn.query_runner(input.as_bytes());
for output in runner {
if self
.print_query_result(input, output, stats.as_mut())
.is_err()
{
break;
}
let conn = self.conn.clone();
let runner = conn.query_runner(input.as_bytes());
for output in runner {
if self
.print_query_result(input, output, stats.as_mut())
.is_err()
{
break;
}
}
@@ -727,8 +710,56 @@ impl Limbo {
mut statistics: Option<&mut QueryStatistics>,
) -> anyhow::Result<()> {
match output {
Ok(Some(ref mut rows)) => match self.opts.output_mode {
OutputMode::List => {
Ok(Some(ref mut rows)) => match (self.opts.output_mode, rows.get_query_mode()) {
(_, QueryMode::Explain) => {
fn get_explain_indent(
indent_count: usize,
curr_insn: &str,
prev_insn: &str,
) -> usize {
let indent_count = match prev_insn {
"Rewind" | "Last" | "SorterSort" | "SeekGE" | "SeekGT" | "SeekLE"
| "SeekLT" => indent_count + 1,
_ => indent_count,
};
match curr_insn {
"Next" | "SorterNext" | "Prev" => indent_count - 1,
_ => indent_count,
}
}
let _ = self.writeln(
"addr opcode p1 p2 p3 p4 p5 comment",
);
let _ = self.writeln(
"---- ----------------- ---- ---- ---- ------------- -- -------",
);
let mut prev_insn: String = "".to_string();
let mut indent_count = 0;
let indent = " ";
loop {
row_step_result_query!(self, sql, rows, statistics, {
let row = rows.row().unwrap();
let insn = row.get_value(1).to_string();
indent_count = get_explain_indent(indent_count, &insn, &prev_insn);
let _ = self.writeln(format!(
"{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}",
row.get_value(0).to_string(),
&(indent.repeat(indent_count) + &insn),
row.get_value(2).to_string(),
row.get_value(3).to_string(),
row.get_value(4).to_string(),
row.get_value(5).to_string(),
row.get_value(6).to_string(),
row.get_value(7),
));
prev_insn = insn;
});
}
}
(OutputMode::List, _) => {
let mut headers_printed = false;
loop {
row_step_result_query!(self, sql, rows, statistics, {
@@ -759,7 +790,7 @@ impl Limbo {
});
}
}
OutputMode::Pretty => {
(OutputMode::Pretty, _) => {
let config = self.config.as_ref().unwrap();
let mut table = Table::new();
table
@@ -808,7 +839,7 @@ impl Limbo {
writeln!(self, "{table}")?;
}
}
OutputMode::Line => {
(OutputMode::Line, _) => {
let mut first_row_printed = false;
let max_width = (0..rows.num_columns())

View File

@@ -99,8 +99,8 @@ pub use types::RefValue;
pub use types::Value;
use util::parse_schema_rows;
pub use util::IOExt;
use vdbe::{builder::QueryMode, explain::EXPLAIN_COLUMNS};
use vdbe::{builder::TableRefIdCounter, explain::EXPLAIN_QUERY_PLAN_COLUMNS};
use vdbe::builder::TableRefIdCounter;
pub use vdbe::{builder::QueryMode, explain::EXPLAIN_COLUMNS, explain::EXPLAIN_QUERY_PLAN_COLUMNS};
/// Configuration for database features
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -970,26 +970,22 @@ impl Connection {
self.maybe_update_schema()?;
let pager = self.pager.borrow().clone();
let mode = QueryMode::new(&cmd);
match cmd {
Cmd::Stmt(stmt) | Cmd::Explain(stmt) => {
let program = translate::translate(
self.schema.borrow().deref(),
stmt,
pager.clone(),
self.clone(),
&syms,
mode,
input,
)?;
Ok(Statement::new(
program,
self._db.mv_store.clone(),
pager,
mode,
))
}
Cmd::ExplainQueryPlan(_) => todo!(),
}
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
stmt,
pager.clone(),
self.clone(),
&syms,
mode,
input,
)?;
Ok(Statement::new(
program,
self._db.mv_store.clone(),
pager,
mode,
))
}
/// Parse schema from scratch if version of schema for the connection differs from the schema cookie in the root page
@@ -1123,23 +1119,18 @@ impl Connection {
.unwrap()
.trim();
let mode = QueryMode::new(&cmd);
match cmd {
Cmd::Stmt(stmt) | Cmd::Explain(stmt) => {
let program = translate::translate(
self.schema.borrow().deref(),
stmt,
pager.clone(),
self.clone(),
&syms,
mode,
input,
)?;
Statement::new(program, self._db.mv_store.clone(), pager.clone(), mode)
.run_ignore_rows()?;
}
Cmd::ExplainQueryPlan(_) => todo!(),
}
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
stmt,
pager.clone(),
self.clone(),
&syms,
mode,
input,
)?;
Statement::new(program, self._db.mv_store.clone(), pager.clone(), mode)
.run_ignore_rows()?;
}
Ok(())
}
@@ -1191,6 +1182,8 @@ impl Connection {
}
Cmd::ExplainQueryPlan(stmt) => {
let mut table_ref_counter = TableRefIdCounter::new();
// TODO: we need OP_Explain
match stmt {
ast::Stmt::Select(select) => {
let mut plan = prepare_select_plan(
@@ -1234,22 +1227,18 @@ impl Connection {
.trim();
self.maybe_update_schema()?;
let mode = QueryMode::new(&cmd);
match cmd {
Cmd::ExplainQueryPlan(_stmt) => todo!(),
Cmd::Stmt(stmt) | Cmd::Explain(stmt) => {
let program = translate::translate(
self.schema.borrow().deref(),
stmt,
pager.clone(),
self.clone(),
&syms,
mode,
input,
)?;
Statement::new(program, self._db.mv_store.clone(), pager.clone(), mode)
.run_ignore_rows()?;
}
}
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let program = translate::translate(
self.schema.borrow().deref(),
stmt,
pager.clone(),
self.clone(),
&syms,
mode,
input,
)?;
Statement::new(program, self._db.mv_store.clone(), pager.clone(), mode)
.run_ignore_rows()?;
}
Ok(())
}
@@ -2097,7 +2086,14 @@ impl Statement {
query_mode: QueryMode,
) -> Self {
let accesses_db = program.accesses_db;
let state = vdbe::ProgramState::new(program.max_registers, program.cursor_ref.len());
let state = vdbe::ProgramState::new(
match query_mode {
QueryMode::Normal => program.max_registers,
QueryMode::Explain => EXPLAIN_COLUMNS.len(),
QueryMode::ExplainQueryPlan => EXPLAIN_QUERY_PLAN_COLUMNS.len(),
},
program.cursor_ref.len(),
);
Self {
program,
state,
@@ -2125,13 +2121,20 @@ impl Statement {
pub fn step(&mut self) -> Result<StepResult> {
let res = if !self.accesses_db {
self.program
.step(&mut self.state, self.mv_store.clone(), self.pager.clone())
self.program.step(
&mut self.state,
self.mv_store.clone(),
self.pager.clone(),
self.query_mode,
)
} else {
const MAX_SCHEMA_RETRY: usize = 50;
let mut res =
self.program
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
let mut res = self.program.step(
&mut self.state,
self.mv_store.clone(),
self.pager.clone(),
self.query_mode,
);
for attempt in 0..MAX_SCHEMA_RETRY {
// Only reprepare if we still need to update schema
if !matches!(res, Err(LimboError::SchemaUpdated)) {
@@ -2139,9 +2142,12 @@ impl Statement {
}
tracing::debug!("reprepare: attempt={}", attempt);
self.reprepare()?;
res = self
.program
.step(&mut self.state, self.mv_store.clone(), self.pager.clone());
res = self.program.step(
&mut self.state,
self.mv_store.clone(),
self.pager.clone(),
self.query_mode,
);
}
res
};
@@ -2196,20 +2202,18 @@ impl Statement {
let cmd = cmd.expect("Same SQL string should be able to be parsed");
let syms = conn.syms.borrow();
match cmd {
Cmd::Stmt(stmt) => translate::translate(
conn.schema.borrow().deref(),
stmt,
self.pager.clone(),
conn.clone(),
&syms,
QueryMode::Normal,
&self.program.sql,
)?,
Cmd::Explain(_stmt) => todo!(),
Cmd::ExplainQueryPlan(_stmt) => todo!(),
}
let mode = self.query_mode;
debug_assert_eq!(QueryMode::new(&cmd), mode,);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
translate::translate(
conn.schema.borrow().deref(),
stmt,
self.pager.clone(),
conn.clone(),
&syms,
mode,
&self.program.sql,
)?
};
// Save parameters before they are reset
let parameters = std::mem::take(&mut self.state.parameters);

View File

@@ -8,13 +8,10 @@ use crate::function::{Func, ScalarFunc};
pub const EXPLAIN_COLUMNS: [&str; 8] = ["addr", "opcode", "p1", "p2", "p3", "p4", "p5", "comment"];
pub const EXPLAIN_QUERY_PLAN_COLUMNS: [&str; 4] = ["id", "parent", "notused", "detail"];
pub fn insn_to_str(
pub fn insn_to_row(
program: &Program,
addr: InsnReference,
insn: &Insn,
indent: String,
manual_comment: Option<&'static str>,
) -> String {
) -> (&'static str, i32, i32, i32, Value, u16, String) {
let get_table_or_index_name = |cursor_id: usize| {
let cursor_type = &program.cursor_ref[cursor_id].1;
match cursor_type {
@@ -26,8 +23,7 @@ pub fn insn_to_str(
CursorType::Sorter => "sorter",
}
};
let (opcode, p1, p2, p3, p4, p5, comment): (&str, i32, i32, i32, Value, u16, String) =
match insn {
match insn {
Insn::Init { target_pc } => (
"Init",
0,
@@ -1727,7 +1723,34 @@ pub fn insn_to_str(
0,
format!("if (r[{}] < 0) goto {}", reg, target_pc.as_debug_int()),
),
};
}
}
pub fn insn_to_row_with_comment(
program: &Program,
insn: &Insn,
manual_comment: Option<&str>,
) -> (&'static str, i32, i32, i32, Value, u16, String) {
let (opcode, p1, p2, p3, p4, p5, comment) = insn_to_row(program, insn);
(
opcode,
p1,
p2,
p3,
p4,
p5,
manual_comment.map_or(comment.to_string(), |mc| format!("{comment}; {mc}")),
)
}
pub fn insn_to_str(
program: &Program,
addr: InsnReference,
insn: &Insn,
indent: String,
manual_comment: Option<&str>,
) -> String {
let (opcode, p1, p2, p3, p4, p5, comment) = insn_to_row(program, insn);
format!(
"{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}",
addr,

View File

@@ -53,12 +53,13 @@ use crate::{
#[cfg(feature = "json")]
use crate::json::JsonCacheCell;
use crate::{Connection, MvStore, Result, TransactionState};
use builder::CursorKey;
use builder::{CursorKey, QueryMode};
use execute::{
InsnFunction, InsnFunctionStepResult, OpIdxDeleteState, OpIntegrityCheckState,
OpOpenEphemeralState,
};
use explain::{insn_to_row_with_comment, EXPLAIN_COLUMNS, EXPLAIN_QUERY_PLAN_COLUMNS};
use regex::Regex;
use std::{cell::Cell, collections::HashMap, num::NonZero, rc::Rc, sync::Arc};
use tracing::{instrument, Level};
@@ -465,12 +466,92 @@ impl Program {
self.connection.get_pager_from_database_index(idx)
}
#[instrument(skip_all, level = Level::DEBUG)]
pub fn step(
&self,
state: &mut ProgramState,
mv_store: Option<Arc<MvStore>>,
pager: Rc<Pager>,
query_mode: QueryMode,
) -> Result<StepResult> {
match query_mode {
QueryMode::Normal => self.normal_step(state, mv_store, pager),
QueryMode::Explain => self.explain_step(state, mv_store, pager),
QueryMode::ExplainQueryPlan => self.explain_query_plan_step(state, mv_store, pager),
}
}
fn explain_step(
&self,
state: &mut ProgramState,
_mv_store: Option<Arc<MvStore>>,
pager: Rc<Pager>,
) -> Result<StepResult> {
debug_assert!(state.column_count() == EXPLAIN_COLUMNS.len());
if self.connection.closed.get() {
// Connection is closed for whatever reason, rollback the transaction.
let state = self.connection.transaction_state.get();
if let TransactionState::Write { .. } = state {
pager.io.block(|| pager.end_tx(true, &self.connection))?;
}
return Err(LimboError::InternalError("Connection closed".to_string()));
}
if state.is_interrupted() {
return Ok(StepResult::Interrupt);
}
// FIXME: do we need this?
state.metrics.vm_steps = state.metrics.vm_steps.saturating_add(1);
if state.pc as usize >= self.insns.len() {
return Ok(StepResult::Done);
}
let (current_insn, _) = &self.insns[state.pc as usize];
let (opcode, p1, p2, p3, p4, p5, comment) = insn_to_row_with_comment(
self,
current_insn,
self.comments.as_ref().and_then(|comments| {
comments
.iter()
.find(|(offset, _)| *offset == state.pc)
.map(|(_, comment)| comment)
.copied()
}),
);
state.registers[0] = Register::Value(Value::Integer(state.pc as i64));
state.registers[1] = Register::Value(Value::from_text(&opcode));
state.registers[2] = Register::Value(Value::Integer(p1 as i64));
state.registers[3] = Register::Value(Value::Integer(p2 as i64));
state.registers[4] = Register::Value(Value::Integer(p3 as i64));
state.registers[5] = Register::Value(p4);
state.registers[6] = Register::Value(Value::Integer(p5 as i64));
state.registers[7] = Register::Value(Value::from_text(&comment));
state.result_row = Some(Row {
values: &state.registers[0] as *const Register,
count: EXPLAIN_COLUMNS.len(),
});
state.pc += 1;
Ok(StepResult::Row)
}
fn explain_query_plan_step(
&self,
state: &mut ProgramState,
_mv_store: Option<Arc<MvStore>>,
_pager: Rc<Pager>,
) -> Result<StepResult> {
debug_assert!(state.column_count() == EXPLAIN_QUERY_PLAN_COLUMNS.len());
todo!("we need OP_Explain to be implemented first")
}
#[instrument(skip_all, level = Level::DEBUG)]
fn normal_step(
&self,
state: &mut ProgramState,
mv_store: Option<Arc<MvStore>>,
pager: Rc<Pager>,
) -> Result<StepResult> {
let enable_tracing = tracing::enabled!(tracing::Level::TRACE);
loop {

0
tursodb Normal file
View File