Files
turso/core/translate/emitter.rs
Pekka Enberg 7f2525ac27 Merge 'Implement create virtual table using vtab modules, more work on virtual tables' from Preston Thorpe
This PR started out as one to improve the API of extensions but I ended
up building on top of this quite a bit and it just kept going. Sorry
this one is so large but there wasn't really a good stopping point, as
it kept leaving stuff in broken states.
**VCreate**: Support for `CREATE VIRTUAL TABLE t USING vtab_module`
**VUpdate**: Support for `INSERT` and `DELETE` methods on virtual
tables.
SQLite uses the `xUpdate` function with the `VUpdate` opcode to handle all
insert/update/delete functionality in virtual tables, so extension authors
would have to rely on the calling convention simply being documented like this:
```
if args[0] == NULL:  INSERT args[1] the values in args[2..]

if args[1] == NULL: DELETE args[0]

if args[0] != NULL && len(args) > 2: Update values=args[2..]  rowid=args[0]
```
I know I asked @jussisaurio on discord about this already, but it just
sucked so bad that I added some internal translation so we could expose
a [nice API](https://github.com/tursodatabase/limbo/pull/996/files#diff-
3e8f8a660b11786745b48b528222d11671e9f19fa00a032a4eefb5412e8200d1R54) and
handle the logic ourselves while keeping with sqlite's opcodes.
I'll change it back if I have to, I just thought it was genuinely awful
to have to rely on comments to explain all that to extension authors.
The included extension is not meant to be a legitimately useful one; it
is there for testing purposes. I did something similar in #960 using a
test extension, so I figure that once they are both merged, I will go back
and combine them into one, since a single extension can provide many kinds
at once; that will reduce the number of crates and therefore compile time.
1. Remaining opcodes.
2. `UPDATE` (when we support the syntax)
3. `xConnect` - expose API for a DB connection to a vtab so it can
perform arbitrary queries.

Closes #996
2025-02-25 15:31:12 +02:00

403 lines
14 KiB
Rust

// This module contains code for emitting bytecode instructions for SQL query execution.
// It handles translating high-level SQL operations into low-level bytecode that can be executed by the virtual machine.
use limbo_sqlite3_parser::ast::{self};
use crate::function::Func;
use crate::translate::plan::{DeletePlan, Plan, Search};
use crate::util::exprs_are_equivalent;
use crate::vdbe::builder::ProgramBuilder;
use crate::vdbe::{insn::Insn, BranchOffset};
use crate::{Result, SymbolTable};
use super::aggregation::emit_ungrouped_aggregation;
use super::expr::{translate_condition_expr, ConditionMetadata};
use super::group_by::{emit_group_by, init_group_by, GroupByMetadata};
use super::main_loop::{close_loop, emit_loop, init_loop, open_loop, LeftJoinMetadata, LoopLabels};
use super::order_by::{emit_order_by, init_order_by, SortMetadata};
use super::plan::Operation;
use super::plan::{SelectPlan, TableReference};
use super::subquery::emit_subqueries;
/// Helper used during translation to resolve function names and to look up
/// registers previously associated with an expression.
#[derive(Debug)]
pub struct Resolver<'a> {
/// Symbol table consulted by `resolve_function` when `Func::resolve_function`
/// does not recognise the function.
pub symbol_table: &'a SymbolTable,
/// `(expression, register)` pairs searched by `resolve_cached_expr_reg`.
pub expr_to_reg_cache: Vec<(&'a ast::Expr, usize)>,
}
impl<'a> Resolver<'a> {
    /// Creates a resolver backed by `symbol_table` with an empty
    /// expression-to-register cache.
    pub fn new(symbol_table: &'a SymbolTable) -> Self {
        let expr_to_reg_cache = Vec::new();
        Self {
            symbol_table,
            expr_to_reg_cache,
        }
    }

    /// Resolves a function by name and argument count.
    ///
    /// `Func::resolve_function` is tried first. Only when it returns an error
    /// is the symbol table consulted; a hit there is cloned and wrapped in
    /// `Func::External`. Returns `None` when neither lookup succeeds.
    pub fn resolve_function(&self, func_name: &str, arg_count: usize) -> Option<Func> {
        Func::resolve_function(func_name, arg_count)
            .ok()
            .or_else(|| {
                self.symbol_table
                    .resolve_function(func_name, arg_count)
                    .map(|external| Func::External(external.clone()))
            })
    }

    /// Returns the register paired with the first cached expression that
    /// `exprs_are_equivalent` considers equivalent to `expr`, if any.
    pub fn resolve_cached_expr_reg(&self, expr: &ast::Expr) -> Option<usize> {
        for (cached_expr, reg) in &self.expr_to_reg_cache {
            if exprs_are_equivalent(expr, cached_expr) {
                return Some(*reg);
            }
        }
        None
    }
}
/// The TranslateCtx struct holds various information and labels used during bytecode generation.
/// It is used for maintaining state and control flow during the bytecode
/// generation process.
///
/// A fresh context is built by `prologue`; most optional fields start out as `None`
/// and are filled in while the query is being emitted.
#[derive(Debug)]
pub struct TranslateCtx<'a> {
/// A typical query plan is a nested loop. Each loop has its own LoopLabels (see the definition of LoopLabels for more details).
/// `prologue` creates one entry per table.
pub labels_main_loop: Vec<LoopLabels>,
/// Label for the instruction that jumps to the next phase of the query after the main loop.
/// We don't know ahead of time what that is (GROUP BY, ORDER BY, etc.).
pub label_main_loop_end: Option<BranchOffset>,
/// First register of the aggregation results.
pub reg_agg_start: Option<usize>,
/// First register of the result columns of the query.
pub reg_result_cols_start: Option<usize>,
/// The register holding the limit value, if any.
pub reg_limit: Option<usize>,
/// The register holding the offset value, if any.
pub reg_offset: Option<usize>,
/// The register holding the limit+offset value, if any.
pub reg_limit_offset_sum: Option<usize>,
/// Metadata for the group by operator.
pub meta_group_by: Option<GroupByMetadata>,
/// Metadata for the order by operator.
pub meta_sort: Option<SortMetadata>,
/// Mapping between table loop index and associated metadata (for left joins only).
/// This metadata exists for the right table in a given left join.
pub meta_left_joins: Vec<Option<LeftJoinMetadata>>,
/// We need to emit result columns in the order they are present in the SELECT, but they may not be in the same order in the ORDER BY sorter.
/// This vector holds the indexes of the result columns in the ORDER BY sorter.
/// `prologue` initialises it to the identity mapping `0..result_column_count`.
pub result_column_indexes_in_orderby_sorter: Vec<usize>,
/// We might skip adding a SELECT result column into the ORDER BY sorter if it is an exact match in the ORDER BY keys.
/// This vector holds the indexes of the result columns that we need to skip.
pub result_columns_to_skip_in_orderby_sorter: Option<Vec<usize>>,
/// Function resolver and expression-to-register cache used while translating expressions.
pub resolver: Resolver<'a>,
}
/// Used to distinguish database operations.
///
/// In this file the mode is passed to `init_loop`: `SELECT` by `emit_query` and
/// `DELETE` by `emit_program_for_delete`.
// The variants are spelled like SQL keywords, hence the `upper_case_acronyms` allow.
// NOTE(review): `dead_code` is presumably allowed because not every variant is
// constructed yet (no `INSERT`/`UPDATE` use is visible in this file) — confirm.
#[allow(clippy::upper_case_acronyms, dead_code)]
#[derive(Debug, Clone)]
pub enum OperationMode {
SELECT,
INSERT,
UPDATE,
DELETE,
}
/// Initialize the program with basic setup and return initial metadata and labels
fn prologue<'a>(
program: &mut ProgramBuilder,
syms: &'a SymbolTable,
table_count: usize,
result_column_count: usize,
) -> Result<(TranslateCtx<'a>, BranchOffset, BranchOffset)> {
let init_label = program.allocate_label();
program.emit_insn(Insn::Init {
target_pc: init_label,
});
let start_offset = program.offset();
let t_ctx = TranslateCtx {
labels_main_loop: (0..table_count).map(|_| LoopLabels::new(program)).collect(),
label_main_loop_end: None,
reg_agg_start: None,
reg_limit: None,
reg_offset: None,
reg_limit_offset_sum: None,
reg_result_cols_start: None,
meta_group_by: None,
meta_left_joins: (0..table_count).map(|_| None).collect(),
meta_sort: None,
result_column_indexes_in_orderby_sorter: (0..result_column_count).collect(),
result_columns_to_skip_in_orderby_sorter: None,
resolver: Resolver::new(syms),
};
Ok((t_ctx, init_label, start_offset))
}
/// Finalizes the program and resolves `init_label`.
///
/// Emits, in order: a `Halt` that ends the query body, then (at the position
/// `init_label` is resolved to) a `Transaction`, the instructions previously
/// marked constant, and a `Goto` back to `start_offset`.
///
/// Although these are the last instructions in the program, execution
/// typically reaches the `Transaction` instruction first by jumping to
/// `init_label`.
fn epilogue(
    program: &mut ProgramBuilder,
    init_label: BranchOffset,
    start_offset: BranchOffset,
) -> Result<()> {
    let halt = Insn::Halt {
        err_code: 0,
        description: String::new(),
    };
    program.emit_insn(halt);

    // Everything emitted from here on is the target of the initial jump.
    let init_pc = program.offset();
    program.resolve_label(init_label, init_pc);
    program.emit_insn(Insn::Transaction { write: false });
    program.emit_constant_insns();

    let jump_to_body = Insn::Goto {
        target_pc: start_offset,
    };
    program.emit_insn(jump_to_body);

    Ok(())
}
/// Entry point for bytecode emission: consumes a query `plan` and appends the
/// corresponding instructions to `program`.
///
/// Dispatches on the plan kind to the SELECT or DELETE emitter and returns
/// whatever that emitter returns.
pub fn emit_program(program: &mut ProgramBuilder, plan: Plan, syms: &SymbolTable) -> Result<()> {
    match plan {
        Plan::Delete(delete_plan) => emit_program_for_delete(program, delete_plan, syms),
        Plan::Select(select_plan) => emit_program_for_select(program, select_plan, syms),
    }
}
/// Emits the complete bytecode program for a SELECT plan.
///
/// The program is bracketed by `prologue` and `epilogue`; the query body is
/// produced by `emit_query`. Afterwards the plan's result columns and table
/// references are moved into `program`.
///
/// A `LIMIT 0` query can never produce rows, so the query body is skipped
/// entirely and only the prologue/epilogue scaffolding is emitted.
///
/// # Errors
///
/// Propagates any error returned by `prologue`, `emit_query` or `epilogue`.
fn emit_program_for_select(
    program: &mut ProgramBuilder,
    mut plan: SelectPlan,
    syms: &SymbolTable,
) -> Result<()> {
    let (mut t_ctx, init_label, start_offset) = prologue(
        program,
        syms,
        plan.table_references.len(),
        plan.result_columns.len(),
    )?;

    // Trivial exit on LIMIT 0: emit no query body at all. `epilogue` must run
    // exactly once, because it resolves `init_label` and emits the closing
    // Halt/Transaction/Goto sequence; running it here and again after the
    // query body would resolve the label twice and emit the body regardless.
    let is_limit_zero = matches!(plan.limit, Some(0));
    if !is_limit_zero {
        // Emit main parts of query
        emit_query(program, &mut plan, &mut t_ctx)?;
    }

    // Finalize program
    epilogue(program, init_label, start_offset)?;
    program.result_columns = plan.result_columns;
    program.table_references = plan.table_references;
    Ok(())
}
/// Emits the bytecode for the body of a SELECT query described by `plan`.
///
/// The steps are emitted in this order: subqueries, LIMIT/OFFSET register
/// allocation, result-column register allocation, ORDER BY / GROUP BY
/// initialisation, loop initialisation, constant WHERE terms, the main loop
/// (open, body, close), and finally GROUP BY / ungrouped aggregation and
/// ORDER BY output where applicable.
///
/// Returns the first register of the result columns.
///
/// # Errors
///
/// Propagates any error returned by the helper emitters it calls.
pub fn emit_query<'a>(
program: &'a mut ProgramBuilder,
plan: &'a mut SelectPlan,
t_ctx: &'a mut TranslateCtx<'a>,
) -> Result<usize> {
// Emit subqueries first so the results can be read in the main query loop.
emit_subqueries(program, t_ctx, &mut plan.table_references)?;
// Allocate LIMIT/OFFSET registers only if the context does not already carry them.
if t_ctx.reg_limit.is_none() {
t_ctx.reg_limit = plan.limit.map(|_| program.alloc_register());
}
if t_ctx.reg_offset.is_none() {
t_ctx.reg_offset = plan.offset.map(|_| program.alloc_register());
}
// The limit+offset register is only allocated when the plan has an OFFSET.
if t_ctx.reg_limit_offset_sum.is_none() {
t_ctx.reg_limit_offset_sum = plan.offset.map(|_| program.alloc_register());
}
// No rows will be read from source table loops if there is a constant false condition eg. WHERE 0
// however an aggregation might still happen,
// e.g. SELECT COUNT(*) WHERE 0 returns a row with 0, not an empty result set
let after_main_loop_label = program.allocate_label();
t_ctx.label_main_loop_end = Some(after_main_loop_label);
if plan.contains_constant_false_condition {
program.emit_insn(Insn::Goto {
target_pc: after_main_loop_label,
});
}
// Allocate registers for result columns
t_ctx.reg_result_cols_start = Some(program.alloc_registers(plan.result_columns.len()));
// Initialize cursors and other resources needed for query execution
if let Some(ref mut order_by) = plan.order_by {
init_order_by(program, t_ctx, order_by)?;
}
if let Some(ref mut group_by) = plan.group_by {
init_group_by(program, t_ctx, group_by, &plan.aggregates)?;
}
init_loop(
program,
t_ctx,
&plan.table_references,
&OperationMode::SELECT,
)?;
// Constant WHERE terms are translated once, before the main loop is opened:
// a false term jumps straight past the loop, a true term falls through.
for where_term in plan.where_clause.iter().filter(|wt| wt.is_constant()) {
let jump_target_when_true = program.allocate_label();
let condition_metadata = ConditionMetadata {
jump_if_condition_is_true: false,
jump_target_when_false: after_main_loop_label,
jump_target_when_true,
};
translate_condition_expr(
program,
&plan.table_references,
&where_term.expr,
condition_metadata,
&mut t_ctx.resolver,
)?;
// The "true" target is simply the next instruction.
program.resolve_label(jump_target_when_true, program.offset());
}
// Set up main query execution loop
open_loop(program, t_ctx, &plan.table_references, &plan.where_clause)?;
// Process result columns and expressions in the inner loop
emit_loop(program, t_ctx, plan)?;
// Clean up and close the main execution loop
close_loop(program, t_ctx, &plan.table_references)?;
// Everything that skips the main loop lands here.
program.resolve_label(after_main_loop_label, program.offset());
// ORDER BY output is skipped when a constant-false condition is present.
let mut order_by_necessary = plan.order_by.is_some() && !plan.contains_constant_false_condition;
let order_by = plan.order_by.as_ref();
// Handle GROUP BY and aggregation processing
if plan.group_by.is_some() {
emit_group_by(program, t_ctx, plan)?;
} else if !plan.aggregates.is_empty() {
// Handle aggregation without GROUP BY
emit_ungrouped_aggregation(program, t_ctx, plan)?;
// Single row result for aggregates without GROUP BY, so ORDER BY not needed
order_by_necessary = false;
}
// Process ORDER BY results if needed
if order_by.is_some() && order_by_necessary {
emit_order_by(program, t_ctx, plan)?;
}
// `reg_result_cols_start` was set unconditionally above, so this unwrap cannot fail.
Ok(t_ctx.reg_result_cols_start.unwrap())
}
/// Emits the complete bytecode program for a DELETE plan.
///
/// The program is bracketed by `prologue` and `epilogue`. In between, the
/// table loop is initialised and opened with the plan's WHERE clause, the
/// per-row delete instructions are emitted inside it, and the loop is closed.
/// Finally the plan's result columns and table references are moved into
/// `program`.
///
/// # Errors
///
/// Propagates any error returned by the helper emitters it calls.
fn emit_program_for_delete(
    program: &mut ProgramBuilder,
    plan: DeletePlan,
    syms: &SymbolTable,
) -> Result<()> {
    let table_count = plan.table_references.len();
    let result_column_count = plan.result_columns.len();
    let (mut t_ctx, init_label, start_offset) =
        prologue(program, syms, table_count, result_column_count)?;

    // A constant-false condition (e.g. WHERE 0) means no rows are read from
    // the table loops, so jump straight past the main loop.
    let loop_end_label = program.allocate_label();
    t_ctx.label_main_loop_end = Some(loop_end_label);
    if plan.contains_constant_false_condition {
        program.emit_insn(Insn::Goto {
            target_pc: loop_end_label,
        });
    }

    let tables = &plan.table_references;

    // Cursors and other per-table resources.
    init_loop(program, &mut t_ctx, tables, &OperationMode::DELETE)?;

    // Main loop: open, delete the current row, close.
    open_loop(program, &mut t_ctx, tables, &plan.where_clause)?;
    emit_delete_insns(program, &mut t_ctx, tables, &plan.limit)?;
    close_loop(program, &mut t_ctx, tables)?;

    let after_loop_pc = program.offset();
    program.resolve_label(loop_end_label, after_loop_pc);

    // Wrap up the program and hand plan metadata over to the builder.
    epilogue(program, init_label, start_offset)?;
    program.result_columns = plan.result_columns;
    program.table_references = plan.table_references;
    Ok(())
}
/// Emits the instructions that delete the row the loop cursor currently
/// points at, for the first table in `table_references`.
///
/// The cursor is looked up by table identifier for scans and rowid searches,
/// and by index name for index searches; any other operation emits nothing.
/// The row id is read into a register, then either a `VUpdate` (virtual
/// tables) or a `DeleteAsync`/`DeleteAwait` pair is emitted. When `limit` is
/// set, a constant limit register and a `DecrJumpZero` targeting the end of
/// the main loop are emitted as well.
///
/// # Panics
///
/// Panics if `table_references` is empty, or if `limit` is set while
/// `t_ctx.label_main_loop_end` is `None`.
fn emit_delete_insns(
    program: &mut ProgramBuilder,
    t_ctx: &mut TranslateCtx,
    table_references: &[TableReference],
    limit: &Option<isize>,
) -> Result<()> {
    let target_table = table_references.first().unwrap();

    let cursor_id = match &target_table.op {
        Operation::Scan { .. }
        | Operation::Search(Search::RowidEq { .. } | Search::RowidSearch { .. }) => {
            program.resolve_cursor_id(&target_table.identifier)
        }
        Operation::Search(Search::IndexSearch { index, .. }) => {
            program.resolve_cursor_id(&index.name)
        }
        _ => return Ok(()),
    };

    // Read the row id of the current row; it identifies the row to delete.
    let rowid_reg = program.alloc_register();
    program.emit_insn(Insn::RowId {
        cursor_id,
        dest: rowid_reg,
    });

    match target_table.virtual_table() {
        Some(vtab) => {
            // Two arguments starting at the row id register: the row id
            // itself, followed by a register set to NULL.
            let null_rowid_reg = program.alloc_register();
            program.emit_insn(Insn::Null {
                dest: null_rowid_reg,
                dest_end: None,
            });
            program.emit_insn(Insn::VUpdate {
                cursor_id,
                arg_count: 2,
                start_reg: rowid_reg,
                vtab_ptr: vtab.implementation.as_ref().ctx as usize,
                conflict_action: 0u16,
            });
        }
        None => {
            program.emit_insn(Insn::DeleteAsync { cursor_id });
            program.emit_insn(Insn::DeleteAwait { cursor_id });
        }
    }

    if let Some(max_rows) = *limit {
        let limit_reg = program.alloc_register();
        program.emit_insn(Insn::Integer {
            value: max_rows as i64,
            dest: limit_reg,
        });
        // The limit load is marked constant so it is emitted with the other
        // constant instructions rather than inside the loop body.
        program.mark_last_insn_constant();
        program.emit_insn(Insn::DecrJumpZero {
            reg: limit_reg,
            target_pc: t_ctx.label_main_loop_end.unwrap(),
        });
    }

    Ok(())
}