mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-04 00:44:19 +01:00
Merge 'Cleanup emitter some more' from Jussi Saurio
No functional changes, just move almost everything out of `emitter.rs` into smaller modules with more distinct responsibilities. Also, from `expr.rs`, move `translate_aggregation` into `aggregation.rs` and `translate_aggregation_groupby` into `group_by.rs` Closes #610
This commit is contained in:
234
core/translate/aggregation.rs
Normal file
234
core/translate/aggregation.rs
Normal file
@@ -0,0 +1,234 @@
|
||||
use sqlite3_parser::ast;
|
||||
|
||||
use crate::{
|
||||
function::AggFunc,
|
||||
vdbe::{builder::ProgramBuilder, insn::Insn},
|
||||
Result,
|
||||
};
|
||||
|
||||
use super::{
|
||||
emitter::{Resolver, TranslateCtx},
|
||||
expr::translate_expr,
|
||||
plan::{Aggregate, SelectPlan, TableReference},
|
||||
result_row::emit_select_result,
|
||||
};
|
||||
|
||||
/// Emits the bytecode for processing an aggregate without a GROUP BY clause.
|
||||
/// This is called when the main query execution loop has finished processing,
|
||||
/// and we can now materialize the aggregate results.
|
||||
pub fn emit_ungrouped_aggregation<'a>(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx<'a>,
|
||||
plan: &'a SelectPlan,
|
||||
) -> Result<()> {
|
||||
let agg_start_reg = t_ctx.reg_agg_start.unwrap();
|
||||
for (i, agg) in plan.aggregates.iter().enumerate() {
|
||||
let agg_result_reg = agg_start_reg + i;
|
||||
program.emit_insn(Insn::AggFinal {
|
||||
register: agg_result_reg,
|
||||
func: agg.func.clone(),
|
||||
});
|
||||
}
|
||||
// we now have the agg results in (agg_start_reg..agg_start_reg + aggregates.len() - 1)
|
||||
// we need to call translate_expr on each result column, but replace the expr with a register copy in case any part of the
|
||||
// result column expression matches a) a group by column or b) an aggregation result.
|
||||
for (i, agg) in plan.aggregates.iter().enumerate() {
|
||||
t_ctx
|
||||
.resolver
|
||||
.expr_to_reg_cache
|
||||
.push((&agg.original_expr, agg_start_reg + i));
|
||||
}
|
||||
|
||||
// This always emits a ResultRow because currently it can only be used for a single row result
|
||||
// Limit is None because we early exit on limit 0 and the max rows here is 1
|
||||
emit_select_result(program, t_ctx, plan, None)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for processing an aggregate step.
|
||||
/// E.g. in `SELECT SUM(price) FROM t`, 'price' is evaluated for every row, and the result is added to the accumulator.
|
||||
///
|
||||
/// This is distinct from the final step, which is called after the main loop has finished processing
|
||||
/// and the actual result value of the aggregation is materialized.
|
||||
pub fn translate_aggregation_step(
|
||||
program: &mut ProgramBuilder,
|
||||
referenced_tables: &[TableReference],
|
||||
agg: &Aggregate,
|
||||
target_register: usize,
|
||||
resolver: &Resolver,
|
||||
) -> Result<usize> {
|
||||
let dest = match agg.func {
|
||||
AggFunc::Avg => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("avg bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Avg,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Count => {
|
||||
let expr_reg = if agg.args.is_empty() {
|
||||
program.alloc_register()
|
||||
} else {
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
expr_reg
|
||||
};
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Count,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::GroupConcat => {
|
||||
if agg.args.len() != 1 && agg.args.len() != 2 {
|
||||
crate::bail_parse_error!("group_concat bad number of arguments");
|
||||
}
|
||||
|
||||
let expr_reg = program.alloc_register();
|
||||
let delimiter_reg = program.alloc_register();
|
||||
|
||||
let expr = &agg.args[0];
|
||||
let delimiter_expr: ast::Expr;
|
||||
|
||||
if agg.args.len() == 2 {
|
||||
match &agg.args[1] {
|
||||
ast::Expr::Column { .. } => {
|
||||
delimiter_expr = agg.args[1].clone();
|
||||
}
|
||||
ast::Expr::Literal(ast::Literal::String(s)) => {
|
||||
delimiter_expr = ast::Expr::Literal(ast::Literal::String(s.to_string()));
|
||||
}
|
||||
_ => crate::bail_parse_error!("Incorrect delimiter parameter"),
|
||||
};
|
||||
} else {
|
||||
delimiter_expr = ast::Expr::Literal(ast::Literal::String(String::from("\",\"")));
|
||||
}
|
||||
|
||||
translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
&delimiter_expr,
|
||||
delimiter_reg,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: delimiter_reg,
|
||||
func: AggFunc::GroupConcat,
|
||||
});
|
||||
|
||||
target_register
|
||||
}
|
||||
AggFunc::Max => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("max bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Max,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Min => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("min bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Min,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::StringAgg => {
|
||||
if agg.args.len() != 2 {
|
||||
crate::bail_parse_error!("string_agg bad number of arguments");
|
||||
}
|
||||
|
||||
let expr_reg = program.alloc_register();
|
||||
let delimiter_reg = program.alloc_register();
|
||||
|
||||
let expr = &agg.args[0];
|
||||
let delimiter_expr = match &agg.args[1] {
|
||||
ast::Expr::Column { .. } => agg.args[1].clone(),
|
||||
ast::Expr::Literal(ast::Literal::String(s)) => {
|
||||
ast::Expr::Literal(ast::Literal::String(s.to_string()))
|
||||
}
|
||||
_ => crate::bail_parse_error!("Incorrect delimiter parameter"),
|
||||
};
|
||||
|
||||
translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
&delimiter_expr,
|
||||
delimiter_reg,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: delimiter_reg,
|
||||
func: AggFunc::StringAgg,
|
||||
});
|
||||
|
||||
target_register
|
||||
}
|
||||
AggFunc::Sum => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("sum bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Sum,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Total => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("total bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Total,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
};
|
||||
Ok(dest)
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -4,14 +4,14 @@ use sqlite3_parser::ast::{self, UnaryOperator};
|
||||
use crate::ext::{ExtFunc, UuidFunc};
|
||||
#[cfg(feature = "json")]
|
||||
use crate::function::JsonFunc;
|
||||
use crate::function::{AggFunc, Func, FuncCtx, MathFuncArity, ScalarFunc};
|
||||
use crate::function::{Func, FuncCtx, MathFuncArity, ScalarFunc};
|
||||
use crate::schema::Type;
|
||||
use crate::util::normalize_ident;
|
||||
use crate::vdbe::{builder::ProgramBuilder, insn::Insn, BranchOffset};
|
||||
use crate::Result;
|
||||
|
||||
use super::emitter::Resolver;
|
||||
use super::plan::{Aggregate, TableReference, TableReferenceType};
|
||||
use super::plan::{TableReference, TableReferenceType};
|
||||
|
||||
#[derive(Default, Debug, Clone, Copy)]
|
||||
pub struct ConditionMetadata {
|
||||
@@ -2049,366 +2049,6 @@ pub fn maybe_apply_affinity(col_type: Type, target_register: usize, program: &mu
|
||||
}
|
||||
}
|
||||
|
||||
pub fn translate_aggregation(
|
||||
program: &mut ProgramBuilder,
|
||||
referenced_tables: &[TableReference],
|
||||
agg: &Aggregate,
|
||||
target_register: usize,
|
||||
resolver: &Resolver,
|
||||
) -> Result<usize> {
|
||||
let dest = match agg.func {
|
||||
AggFunc::Avg => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("avg bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Avg,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Count => {
|
||||
let expr_reg = if agg.args.is_empty() {
|
||||
program.alloc_register()
|
||||
} else {
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
expr_reg
|
||||
};
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Count,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::GroupConcat => {
|
||||
if agg.args.len() != 1 && agg.args.len() != 2 {
|
||||
crate::bail_parse_error!("group_concat bad number of arguments");
|
||||
}
|
||||
|
||||
let expr_reg = program.alloc_register();
|
||||
let delimiter_reg = program.alloc_register();
|
||||
|
||||
let expr = &agg.args[0];
|
||||
let delimiter_expr: ast::Expr;
|
||||
|
||||
if agg.args.len() == 2 {
|
||||
match &agg.args[1] {
|
||||
ast::Expr::Column { .. } => {
|
||||
delimiter_expr = agg.args[1].clone();
|
||||
}
|
||||
ast::Expr::Literal(ast::Literal::String(s)) => {
|
||||
delimiter_expr = ast::Expr::Literal(ast::Literal::String(s.to_string()));
|
||||
}
|
||||
_ => crate::bail_parse_error!("Incorrect delimiter parameter"),
|
||||
};
|
||||
} else {
|
||||
delimiter_expr = ast::Expr::Literal(ast::Literal::String(String::from("\",\"")));
|
||||
}
|
||||
|
||||
translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
&delimiter_expr,
|
||||
delimiter_reg,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: delimiter_reg,
|
||||
func: AggFunc::GroupConcat,
|
||||
});
|
||||
|
||||
target_register
|
||||
}
|
||||
AggFunc::Max => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("max bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Max,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Min => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("min bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Min,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::StringAgg => {
|
||||
if agg.args.len() != 2 {
|
||||
crate::bail_parse_error!("string_agg bad number of arguments");
|
||||
}
|
||||
|
||||
let expr_reg = program.alloc_register();
|
||||
let delimiter_reg = program.alloc_register();
|
||||
|
||||
let expr = &agg.args[0];
|
||||
let delimiter_expr = match &agg.args[1] {
|
||||
ast::Expr::Column { .. } => agg.args[1].clone(),
|
||||
ast::Expr::Literal(ast::Literal::String(s)) => {
|
||||
ast::Expr::Literal(ast::Literal::String(s.to_string()))
|
||||
}
|
||||
_ => crate::bail_parse_error!("Incorrect delimiter parameter"),
|
||||
};
|
||||
|
||||
translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
&delimiter_expr,
|
||||
delimiter_reg,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: delimiter_reg,
|
||||
func: AggFunc::StringAgg,
|
||||
});
|
||||
|
||||
target_register
|
||||
}
|
||||
AggFunc::Sum => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("sum bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Sum,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Total => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("total bad number of arguments");
|
||||
}
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Total,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
};
|
||||
Ok(dest)
|
||||
}
|
||||
|
||||
pub fn translate_aggregation_groupby(
|
||||
program: &mut ProgramBuilder,
|
||||
referenced_tables: &[TableReference],
|
||||
group_by_sorter_cursor_id: usize,
|
||||
cursor_index: usize,
|
||||
agg: &Aggregate,
|
||||
target_register: usize,
|
||||
resolver: &Resolver,
|
||||
) -> Result<usize> {
|
||||
let emit_column = |program: &mut ProgramBuilder, expr_reg: usize| {
|
||||
program.emit_insn(Insn::Column {
|
||||
cursor_id: group_by_sorter_cursor_id,
|
||||
column: cursor_index,
|
||||
dest: expr_reg,
|
||||
});
|
||||
};
|
||||
let dest = match agg.func {
|
||||
AggFunc::Avg => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("avg bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Avg,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Count => {
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Count,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::GroupConcat => {
|
||||
if agg.args.len() != 1 && agg.args.len() != 2 {
|
||||
crate::bail_parse_error!("group_concat bad number of arguments");
|
||||
}
|
||||
|
||||
let expr_reg = program.alloc_register();
|
||||
let delimiter_reg = program.alloc_register();
|
||||
|
||||
let delimiter_expr: ast::Expr;
|
||||
|
||||
if agg.args.len() == 2 {
|
||||
match &agg.args[1] {
|
||||
ast::Expr::Column { .. } => {
|
||||
delimiter_expr = agg.args[1].clone();
|
||||
}
|
||||
ast::Expr::Literal(ast::Literal::String(s)) => {
|
||||
delimiter_expr = ast::Expr::Literal(ast::Literal::String(s.to_string()));
|
||||
}
|
||||
_ => crate::bail_parse_error!("Incorrect delimiter parameter"),
|
||||
};
|
||||
} else {
|
||||
delimiter_expr = ast::Expr::Literal(ast::Literal::String(String::from("\",\"")));
|
||||
}
|
||||
|
||||
emit_column(program, expr_reg);
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
&delimiter_expr,
|
||||
delimiter_reg,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: delimiter_reg,
|
||||
func: AggFunc::GroupConcat,
|
||||
});
|
||||
|
||||
target_register
|
||||
}
|
||||
AggFunc::Max => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("max bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Max,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Min => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("min bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Min,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::StringAgg => {
|
||||
if agg.args.len() != 2 {
|
||||
crate::bail_parse_error!("string_agg bad number of arguments");
|
||||
}
|
||||
|
||||
let expr_reg = program.alloc_register();
|
||||
let delimiter_reg = program.alloc_register();
|
||||
|
||||
let delimiter_expr = match &agg.args[1] {
|
||||
ast::Expr::Column { .. } => agg.args[1].clone(),
|
||||
ast::Expr::Literal(ast::Literal::String(s)) => {
|
||||
ast::Expr::Literal(ast::Literal::String(s.to_string()))
|
||||
}
|
||||
_ => crate::bail_parse_error!("Incorrect delimiter parameter"),
|
||||
};
|
||||
|
||||
emit_column(program, expr_reg);
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
&delimiter_expr,
|
||||
delimiter_reg,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: delimiter_reg,
|
||||
func: AggFunc::StringAgg,
|
||||
});
|
||||
|
||||
target_register
|
||||
}
|
||||
AggFunc::Sum => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("sum bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Sum,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Total => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("total bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Total,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
};
|
||||
Ok(dest)
|
||||
}
|
||||
|
||||
/// Get an appropriate name for an expression.
|
||||
/// If the query provides an alias (e.g. `SELECT a AS b FROM t`), use that (e.g. `b`).
|
||||
/// If the expression is a column from a table, use the column name (e.g. `a`).
|
||||
|
||||
643
core/translate/group_by.rs
Normal file
643
core/translate/group_by.rs
Normal file
@@ -0,0 +1,643 @@
|
||||
use std::rc::Rc;
|
||||
|
||||
use sqlite3_parser::ast;
|
||||
|
||||
use crate::{
|
||||
function::AggFunc,
|
||||
schema::{Column, PseudoTable, Table},
|
||||
types::{OwnedRecord, OwnedValue},
|
||||
vdbe::{builder::ProgramBuilder, insn::Insn, BranchOffset},
|
||||
Result,
|
||||
};
|
||||
|
||||
use super::{
|
||||
emitter::{Resolver, TranslateCtx},
|
||||
expr::{translate_condition_expr, translate_expr, ConditionMetadata},
|
||||
order_by::order_by_sorter_insert,
|
||||
plan::{Aggregate, GroupBy, SelectPlan, TableReference},
|
||||
result_row::emit_select_result,
|
||||
};
|
||||
|
||||
// Metadata for handling GROUP BY operations
|
||||
#[derive(Debug)]
|
||||
pub struct GroupByMetadata {
|
||||
// Cursor ID for the Sorter table where the grouped rows are stored
|
||||
pub sort_cursor: usize,
|
||||
// Label for the subroutine that clears the accumulator registers (temporary storage for per-group aggregate calculations)
|
||||
pub label_subrtn_acc_clear: BranchOffset,
|
||||
// Label for the instruction that sets the accumulator indicator to true (indicating data exists in the accumulator for the current group)
|
||||
pub label_acc_indicator_set_flag_true: BranchOffset,
|
||||
// Register holding the return offset for the accumulator clear subroutine
|
||||
pub reg_subrtn_acc_clear_return_offset: usize,
|
||||
// Register holding the key used for sorting in the Sorter
|
||||
pub reg_sorter_key: usize,
|
||||
// Register holding a flag to abort the grouping process if necessary
|
||||
pub reg_abort_flag: usize,
|
||||
// Register holding the start of the accumulator group registers (i.e. the groups, not the aggregates)
|
||||
pub reg_group_exprs_acc: usize,
|
||||
// Starting index of the register(s) that hold the comparison result between the current row and the previous row
|
||||
// The comparison result is used to determine if the current row belongs to the same group as the previous row
|
||||
// Each group by expression has a corresponding register
|
||||
pub reg_group_exprs_cmp: usize,
|
||||
}
|
||||
|
||||
/// Initialize resources needed for GROUP BY processing
|
||||
pub fn init_group_by(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
group_by: &GroupBy,
|
||||
aggregates: &[Aggregate],
|
||||
) -> Result<()> {
|
||||
let num_aggs = aggregates.len();
|
||||
|
||||
let sort_cursor = program.alloc_cursor_id(None, None);
|
||||
|
||||
let reg_abort_flag = program.alloc_register();
|
||||
let reg_group_exprs_cmp = program.alloc_registers(group_by.exprs.len());
|
||||
let reg_group_exprs_acc = program.alloc_registers(group_by.exprs.len());
|
||||
let reg_agg_exprs_start = program.alloc_registers(num_aggs);
|
||||
let reg_sorter_key = program.alloc_register();
|
||||
|
||||
let label_subrtn_acc_clear = program.allocate_label();
|
||||
|
||||
let mut order = Vec::new();
|
||||
const ASCENDING: i64 = 0;
|
||||
for _ in group_by.exprs.iter() {
|
||||
order.push(OwnedValue::Integer(ASCENDING));
|
||||
}
|
||||
program.emit_insn(Insn::SorterOpen {
|
||||
cursor_id: sort_cursor,
|
||||
columns: aggregates.len() + group_by.exprs.len(),
|
||||
order: OwnedRecord::new(order),
|
||||
});
|
||||
|
||||
program.add_comment(program.offset(), "clear group by abort flag");
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: 0,
|
||||
dest: reg_abort_flag,
|
||||
});
|
||||
|
||||
program.add_comment(
|
||||
program.offset(),
|
||||
"initialize group by comparison registers to NULL",
|
||||
);
|
||||
program.emit_insn(Insn::Null {
|
||||
dest: reg_group_exprs_cmp,
|
||||
dest_end: if group_by.exprs.len() > 1 {
|
||||
Some(reg_group_exprs_cmp + group_by.exprs.len() - 1)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
});
|
||||
|
||||
program.add_comment(program.offset(), "go to clear accumulator subroutine");
|
||||
|
||||
let reg_subrtn_acc_clear_return_offset = program.alloc_register();
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::Gosub {
|
||||
target_pc: label_subrtn_acc_clear,
|
||||
return_reg: reg_subrtn_acc_clear_return_offset,
|
||||
},
|
||||
label_subrtn_acc_clear,
|
||||
);
|
||||
|
||||
t_ctx.reg_agg_start = Some(reg_agg_exprs_start);
|
||||
|
||||
t_ctx.meta_group_by = Some(GroupByMetadata {
|
||||
sort_cursor,
|
||||
label_subrtn_acc_clear,
|
||||
label_acc_indicator_set_flag_true: program.allocate_label(),
|
||||
reg_subrtn_acc_clear_return_offset,
|
||||
reg_abort_flag,
|
||||
reg_group_exprs_acc,
|
||||
reg_group_exprs_cmp,
|
||||
reg_sorter_key,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for processing a GROUP BY clause.
|
||||
/// This is called when the main query execution loop has finished processing,
|
||||
/// and we now have data in the GROUP BY sorter.
|
||||
pub fn emit_group_by<'a>(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx<'a>,
|
||||
plan: &'a SelectPlan,
|
||||
) -> Result<()> {
|
||||
// Label for the first instruction of the grouping loop.
|
||||
// This is the start of the loop that reads the sorted data and groups&aggregates it.
|
||||
let label_grouping_loop_start = program.allocate_label();
|
||||
// Label for the instruction immediately after the grouping loop.
|
||||
let label_grouping_loop_end = program.allocate_label();
|
||||
// Label for the instruction where a row for a finished group is output.
|
||||
// Distinct from subroutine_accumulator_output_label, which is the start of the subroutine, but may still skip emitting a row.
|
||||
let label_agg_final = program.allocate_label();
|
||||
// Label for the instruction immediately after the entire group by phase.
|
||||
let label_group_by_end = program.allocate_label();
|
||||
// Label for the beginning of the subroutine that potentially outputs a row for a finished group.
|
||||
let label_subrtn_acc_output = program.allocate_label();
|
||||
// Register holding the return offset of the subroutine that potentially outputs a row for a finished group.
|
||||
let reg_subrtn_acc_output_return_offset = program.alloc_register();
|
||||
// Register holding a boolean indicating whether there's data in the accumulator (used for aggregation)
|
||||
let reg_data_in_acc_flag = program.alloc_register();
|
||||
|
||||
let GroupByMetadata {
|
||||
sort_cursor,
|
||||
reg_group_exprs_cmp,
|
||||
reg_subrtn_acc_clear_return_offset,
|
||||
reg_group_exprs_acc,
|
||||
reg_abort_flag,
|
||||
reg_sorter_key,
|
||||
label_subrtn_acc_clear,
|
||||
label_acc_indicator_set_flag_true,
|
||||
..
|
||||
} = *t_ctx.meta_group_by.as_mut().unwrap();
|
||||
|
||||
let group_by = plan.group_by.as_ref().unwrap();
|
||||
|
||||
// all group by columns and all arguments of agg functions are in the sorter.
|
||||
// the sort keys are the group by columns (the aggregation within groups is done based on how long the sort keys remain the same)
|
||||
let sorter_column_count = group_by.exprs.len()
|
||||
+ plan
|
||||
.aggregates
|
||||
.iter()
|
||||
.map(|agg| agg.args.len())
|
||||
.sum::<usize>();
|
||||
// sorter column names do not matter
|
||||
let pseudo_columns = (0..sorter_column_count)
|
||||
.map(|i| Column {
|
||||
name: i.to_string(),
|
||||
primary_key: false,
|
||||
ty: crate::schema::Type::Null,
|
||||
is_rowid_alias: false,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// A pseudo table is a "fake" table to which we read one row at a time from the sorter
|
||||
let pseudo_table = Rc::new(PseudoTable {
|
||||
columns: pseudo_columns,
|
||||
});
|
||||
|
||||
let pseudo_cursor = program.alloc_cursor_id(None, Some(Table::Pseudo(pseudo_table.clone())));
|
||||
|
||||
program.emit_insn(Insn::OpenPseudo {
|
||||
cursor_id: pseudo_cursor,
|
||||
content_reg: reg_sorter_key,
|
||||
num_fields: sorter_column_count,
|
||||
});
|
||||
|
||||
// Sort the sorter based on the group by columns
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::SorterSort {
|
||||
cursor_id: sort_cursor,
|
||||
pc_if_empty: label_grouping_loop_end,
|
||||
},
|
||||
label_grouping_loop_end,
|
||||
);
|
||||
|
||||
program.defer_label_resolution(label_grouping_loop_start, program.offset() as usize);
|
||||
// Read a row from the sorted data in the sorter into the pseudo cursor
|
||||
program.emit_insn(Insn::SorterData {
|
||||
cursor_id: sort_cursor,
|
||||
dest_reg: reg_sorter_key,
|
||||
pseudo_cursor,
|
||||
});
|
||||
|
||||
// Read the group by columns from the pseudo cursor
|
||||
let groups_start_reg = program.alloc_registers(group_by.exprs.len());
|
||||
for i in 0..group_by.exprs.len() {
|
||||
let sorter_column_index = i;
|
||||
let group_reg = groups_start_reg + i;
|
||||
program.emit_insn(Insn::Column {
|
||||
cursor_id: pseudo_cursor,
|
||||
column: sorter_column_index,
|
||||
dest: group_reg,
|
||||
});
|
||||
}
|
||||
|
||||
// Compare the group by columns to the previous group by columns to see if we are at a new group or not
|
||||
program.emit_insn(Insn::Compare {
|
||||
start_reg_a: reg_group_exprs_cmp,
|
||||
start_reg_b: groups_start_reg,
|
||||
count: group_by.exprs.len(),
|
||||
});
|
||||
|
||||
let agg_step_label = program.allocate_label();
|
||||
|
||||
program.add_comment(
|
||||
program.offset(),
|
||||
"start new group if comparison is not equal",
|
||||
);
|
||||
// If we are at a new group, continue. If we are at the same group, jump to the aggregation step (i.e. accumulate more values into the aggregations)
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::Jump {
|
||||
target_pc_lt: program.offset() + 1,
|
||||
target_pc_eq: agg_step_label,
|
||||
target_pc_gt: program.offset() + 1,
|
||||
},
|
||||
agg_step_label,
|
||||
);
|
||||
|
||||
// New group, move current group by columns into the comparison register
|
||||
program.emit_insn(Insn::Move {
|
||||
source_reg: groups_start_reg,
|
||||
dest_reg: reg_group_exprs_cmp,
|
||||
count: group_by.exprs.len(),
|
||||
});
|
||||
|
||||
program.add_comment(
|
||||
program.offset(),
|
||||
"check if ended group had data, and output if so",
|
||||
);
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::Gosub {
|
||||
target_pc: label_subrtn_acc_output,
|
||||
return_reg: reg_subrtn_acc_output_return_offset,
|
||||
},
|
||||
label_subrtn_acc_output,
|
||||
);
|
||||
|
||||
program.add_comment(program.offset(), "check abort flag");
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::IfPos {
|
||||
reg: reg_abort_flag,
|
||||
target_pc: label_group_by_end,
|
||||
decrement_by: 0,
|
||||
},
|
||||
label_group_by_end,
|
||||
);
|
||||
|
||||
program.add_comment(program.offset(), "goto clear accumulator subroutine");
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::Gosub {
|
||||
target_pc: label_subrtn_acc_clear,
|
||||
return_reg: reg_subrtn_acc_clear_return_offset,
|
||||
},
|
||||
label_subrtn_acc_clear,
|
||||
);
|
||||
|
||||
// Accumulate the values into the aggregations
|
||||
program.resolve_label(agg_step_label, program.offset());
|
||||
let start_reg = t_ctx.reg_agg_start.unwrap();
|
||||
let mut cursor_index = group_by.exprs.len();
|
||||
for (i, agg) in plan.aggregates.iter().enumerate() {
|
||||
let agg_result_reg = start_reg + i;
|
||||
translate_aggregation_step_groupby(
|
||||
program,
|
||||
&plan.referenced_tables,
|
||||
pseudo_cursor,
|
||||
cursor_index,
|
||||
agg,
|
||||
agg_result_reg,
|
||||
&t_ctx.resolver,
|
||||
)?;
|
||||
cursor_index += agg.args.len();
|
||||
}
|
||||
|
||||
// We only emit the group by columns if we are going to start a new group (i.e. the prev group will not accumulate any more values into the aggregations)
|
||||
program.add_comment(
|
||||
program.offset(),
|
||||
"don't emit group columns if continuing existing group",
|
||||
);
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::If {
|
||||
target_pc: label_acc_indicator_set_flag_true,
|
||||
reg: reg_data_in_acc_flag,
|
||||
null_reg: 0, // unused in this case
|
||||
},
|
||||
label_acc_indicator_set_flag_true,
|
||||
);
|
||||
|
||||
// Read the group by columns for a finished group
|
||||
for i in 0..group_by.exprs.len() {
|
||||
let key_reg = reg_group_exprs_acc + i;
|
||||
let sorter_column_index = i;
|
||||
program.emit_insn(Insn::Column {
|
||||
cursor_id: pseudo_cursor,
|
||||
column: sorter_column_index,
|
||||
dest: key_reg,
|
||||
});
|
||||
}
|
||||
|
||||
program.resolve_label(label_acc_indicator_set_flag_true, program.offset());
|
||||
program.add_comment(program.offset(), "indicate data in accumulator");
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: 1,
|
||||
dest: reg_data_in_acc_flag,
|
||||
});
|
||||
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::SorterNext {
|
||||
cursor_id: sort_cursor,
|
||||
pc_if_next: label_grouping_loop_start,
|
||||
},
|
||||
label_grouping_loop_start,
|
||||
);
|
||||
|
||||
program.resolve_label(label_grouping_loop_end, program.offset());
|
||||
|
||||
program.add_comment(program.offset(), "emit row for final group");
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::Gosub {
|
||||
target_pc: label_subrtn_acc_output,
|
||||
return_reg: reg_subrtn_acc_output_return_offset,
|
||||
},
|
||||
label_subrtn_acc_output,
|
||||
);
|
||||
|
||||
program.add_comment(program.offset(), "group by finished");
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::Goto {
|
||||
target_pc: label_group_by_end,
|
||||
},
|
||||
label_group_by_end,
|
||||
);
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: 1,
|
||||
dest: reg_abort_flag,
|
||||
});
|
||||
program.emit_insn(Insn::Return {
|
||||
return_reg: reg_subrtn_acc_output_return_offset,
|
||||
});
|
||||
|
||||
program.resolve_label(label_subrtn_acc_output, program.offset());
|
||||
|
||||
program.add_comment(program.offset(), "output group by row subroutine start");
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::IfPos {
|
||||
reg: reg_data_in_acc_flag,
|
||||
target_pc: label_agg_final,
|
||||
decrement_by: 0,
|
||||
},
|
||||
label_agg_final,
|
||||
);
|
||||
let group_by_end_without_emitting_row_label = program.allocate_label();
|
||||
program.defer_label_resolution(
|
||||
group_by_end_without_emitting_row_label,
|
||||
program.offset() as usize,
|
||||
);
|
||||
program.emit_insn(Insn::Return {
|
||||
return_reg: reg_subrtn_acc_output_return_offset,
|
||||
});
|
||||
|
||||
let agg_start_reg = t_ctx.reg_agg_start.unwrap();
|
||||
// Resolve the label for the start of the group by output row subroutine
|
||||
program.resolve_label(label_agg_final, program.offset());
|
||||
for (i, agg) in plan.aggregates.iter().enumerate() {
|
||||
let agg_result_reg = agg_start_reg + i;
|
||||
program.emit_insn(Insn::AggFinal {
|
||||
register: agg_result_reg,
|
||||
func: agg.func.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
// we now have the group by columns in registers (group_exprs_start_register..group_exprs_start_register + group_by.len() - 1)
|
||||
// and the agg results in (agg_start_reg..agg_start_reg + aggregates.len() - 1)
|
||||
// we need to call translate_expr on each result column, but replace the expr with a register copy in case any part of the
|
||||
// result column expression matches a) a group by column or b) an aggregation result.
|
||||
for (i, expr) in group_by.exprs.iter().enumerate() {
|
||||
t_ctx
|
||||
.resolver
|
||||
.expr_to_reg_cache
|
||||
.push((expr, reg_group_exprs_acc + i));
|
||||
}
|
||||
for (i, agg) in plan.aggregates.iter().enumerate() {
|
||||
t_ctx
|
||||
.resolver
|
||||
.expr_to_reg_cache
|
||||
.push((&agg.original_expr, agg_start_reg + i));
|
||||
}
|
||||
|
||||
if let Some(having) = &group_by.having {
|
||||
for expr in having.iter() {
|
||||
translate_condition_expr(
|
||||
program,
|
||||
&plan.referenced_tables,
|
||||
expr,
|
||||
ConditionMetadata {
|
||||
jump_if_condition_is_true: false,
|
||||
jump_target_when_false: group_by_end_without_emitting_row_label,
|
||||
jump_target_when_true: i64::MAX, // unused
|
||||
},
|
||||
&t_ctx.resolver,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
match &plan.order_by {
|
||||
None => {
|
||||
emit_select_result(program, t_ctx, plan, Some(label_group_by_end))?;
|
||||
}
|
||||
Some(_) => {
|
||||
order_by_sorter_insert(program, t_ctx, plan)?;
|
||||
}
|
||||
}
|
||||
|
||||
program.emit_insn(Insn::Return {
|
||||
return_reg: reg_subrtn_acc_output_return_offset,
|
||||
});
|
||||
|
||||
program.add_comment(program.offset(), "clear accumulator subroutine start");
|
||||
program.resolve_label(label_subrtn_acc_clear, program.offset());
|
||||
let start_reg = reg_group_exprs_acc;
|
||||
program.emit_insn(Insn::Null {
|
||||
dest: start_reg,
|
||||
dest_end: Some(start_reg + group_by.exprs.len() + plan.aggregates.len() - 1),
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: 0,
|
||||
dest: reg_data_in_acc_flag,
|
||||
});
|
||||
program.emit_insn(Insn::Return {
|
||||
return_reg: reg_subrtn_acc_clear_return_offset,
|
||||
});
|
||||
|
||||
program.resolve_label(label_group_by_end, program.offset());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for processing an aggregate step within a GROUP BY clause.
|
||||
/// Eg. in `SELECT product_category, SUM(price) FROM t GROUP BY line_item`, 'price' is evaluated for every row
|
||||
/// where the 'product_category' is the same, and the result is added to the accumulator for that category.
|
||||
///
|
||||
/// This is distinct from the final step, which is called after a single group has been entirely accumulated,
|
||||
/// and the actual result value of the aggregation is materialized.
|
||||
pub fn translate_aggregation_step_groupby(
|
||||
program: &mut ProgramBuilder,
|
||||
referenced_tables: &[TableReference],
|
||||
group_by_sorter_cursor_id: usize,
|
||||
cursor_index: usize,
|
||||
agg: &Aggregate,
|
||||
target_register: usize,
|
||||
resolver: &Resolver,
|
||||
) -> Result<usize> {
|
||||
let emit_column = |program: &mut ProgramBuilder, expr_reg: usize| {
|
||||
program.emit_insn(Insn::Column {
|
||||
cursor_id: group_by_sorter_cursor_id,
|
||||
column: cursor_index,
|
||||
dest: expr_reg,
|
||||
});
|
||||
};
|
||||
let dest = match agg.func {
|
||||
AggFunc::Avg => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("avg bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Avg,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Count => {
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Count,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::GroupConcat => {
|
||||
if agg.args.len() != 1 && agg.args.len() != 2 {
|
||||
crate::bail_parse_error!("group_concat bad number of arguments");
|
||||
}
|
||||
|
||||
let expr_reg = program.alloc_register();
|
||||
let delimiter_reg = program.alloc_register();
|
||||
|
||||
let delimiter_expr: ast::Expr;
|
||||
|
||||
if agg.args.len() == 2 {
|
||||
match &agg.args[1] {
|
||||
ast::Expr::Column { .. } => {
|
||||
delimiter_expr = agg.args[1].clone();
|
||||
}
|
||||
ast::Expr::Literal(ast::Literal::String(s)) => {
|
||||
delimiter_expr = ast::Expr::Literal(ast::Literal::String(s.to_string()));
|
||||
}
|
||||
_ => crate::bail_parse_error!("Incorrect delimiter parameter"),
|
||||
};
|
||||
} else {
|
||||
delimiter_expr = ast::Expr::Literal(ast::Literal::String(String::from("\",\"")));
|
||||
}
|
||||
|
||||
emit_column(program, expr_reg);
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
&delimiter_expr,
|
||||
delimiter_reg,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: delimiter_reg,
|
||||
func: AggFunc::GroupConcat,
|
||||
});
|
||||
|
||||
target_register
|
||||
}
|
||||
AggFunc::Max => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("max bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Max,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Min => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("min bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Min,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::StringAgg => {
|
||||
if agg.args.len() != 2 {
|
||||
crate::bail_parse_error!("string_agg bad number of arguments");
|
||||
}
|
||||
|
||||
let expr_reg = program.alloc_register();
|
||||
let delimiter_reg = program.alloc_register();
|
||||
|
||||
let delimiter_expr = match &agg.args[1] {
|
||||
ast::Expr::Column { .. } => agg.args[1].clone(),
|
||||
ast::Expr::Literal(ast::Literal::String(s)) => {
|
||||
ast::Expr::Literal(ast::Literal::String(s.to_string()))
|
||||
}
|
||||
_ => crate::bail_parse_error!("Incorrect delimiter parameter"),
|
||||
};
|
||||
|
||||
emit_column(program, expr_reg);
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
&delimiter_expr,
|
||||
delimiter_reg,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: delimiter_reg,
|
||||
func: AggFunc::StringAgg,
|
||||
});
|
||||
|
||||
target_register
|
||||
}
|
||||
AggFunc::Sum => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("sum bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Sum,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
AggFunc::Total => {
|
||||
if agg.args.len() != 1 {
|
||||
crate::bail_parse_error!("total bad number of arguments");
|
||||
}
|
||||
let expr_reg = program.alloc_register();
|
||||
emit_column(program, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
delimiter: 0,
|
||||
func: AggFunc::Total,
|
||||
});
|
||||
target_register
|
||||
}
|
||||
};
|
||||
Ok(dest)
|
||||
}
|
||||
882
core/translate/main_loop.rs
Normal file
882
core/translate/main_loop.rs
Normal file
@@ -0,0 +1,882 @@
|
||||
use sqlite3_parser::ast;
|
||||
|
||||
use crate::{
|
||||
schema::Table,
|
||||
translate::result_row::emit_select_result,
|
||||
vdbe::{builder::ProgramBuilder, insn::Insn, BranchOffset},
|
||||
Result,
|
||||
};
|
||||
|
||||
use super::{
|
||||
aggregation::translate_aggregation_step,
|
||||
emitter::{OperationMode, TranslateCtx},
|
||||
expr::{translate_condition_expr, translate_expr, ConditionMetadata},
|
||||
order_by::{order_by_sorter_insert, sorter_insert},
|
||||
plan::{
|
||||
IterationDirection, Search, SelectPlan, SelectQueryType, SourceOperator, TableReference,
|
||||
},
|
||||
};
|
||||
|
||||
/// Metadata for handling LEFT JOIN operations.
///
/// A LEFT JOIN must emit a NULL-padded row for a left-side row that matched
/// nothing on the right; the flag register and labels below implement that.
#[derive(Debug)]
pub struct LeftJoinMetadata {
    // integer register that holds a flag that is set to true if the current row has a match for the left join
    pub reg_match_flag: usize,
    // label for the instruction that sets the match flag to true
    pub label_match_flag_set_true: BranchOffset,
    // label for the instruction that checks if the match flag is true
    pub label_match_flag_check_value: BranchOffset,
}
|
||||
|
||||
/// Jump labels for each loop in the query's main execution loop.
///
/// One set of labels is allocated per source operator (scan, search, join side,
/// subquery) and stored in `TranslateCtx::labels_main_loop` keyed by operator id.
#[derive(Debug, Clone, Copy)]
pub struct LoopLabels {
    /// jump to the start of the loop body
    loop_start: BranchOffset,
    /// jump to the NextAsync instruction (or equivalent)
    next: BranchOffset,
    /// jump to the end of the loop, exiting it
    loop_end: BranchOffset,
}
|
||||
|
||||
/// Initialize resources needed for the source operators (tables, joins, etc)
|
||||
pub fn init_loop(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
source: &SourceOperator,
|
||||
mode: &OperationMode,
|
||||
) -> Result<()> {
|
||||
let operator_id = source.id();
|
||||
let loop_labels = LoopLabels {
|
||||
next: program.allocate_label(),
|
||||
loop_start: program.allocate_label(),
|
||||
loop_end: program.allocate_label(),
|
||||
};
|
||||
t_ctx.labels_main_loop.insert(operator_id, loop_labels);
|
||||
|
||||
match source {
|
||||
SourceOperator::Subquery { .. } => Ok(()),
|
||||
SourceOperator::Join {
|
||||
id,
|
||||
left,
|
||||
right,
|
||||
outer,
|
||||
..
|
||||
} => {
|
||||
if *outer {
|
||||
let lj_metadata = LeftJoinMetadata {
|
||||
reg_match_flag: program.alloc_register(),
|
||||
label_match_flag_set_true: program.allocate_label(),
|
||||
label_match_flag_check_value: program.allocate_label(),
|
||||
};
|
||||
t_ctx.meta_left_joins.insert(*id, lj_metadata);
|
||||
}
|
||||
init_loop(program, t_ctx, left, mode)?;
|
||||
init_loop(program, t_ctx, right, mode)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
SourceOperator::Scan {
|
||||
table_reference, ..
|
||||
} => {
|
||||
let cursor_id = program.alloc_cursor_id(
|
||||
Some(table_reference.table_identifier.clone()),
|
||||
Some(table_reference.table.clone()),
|
||||
);
|
||||
let root_page = table_reference.table.get_root_page();
|
||||
|
||||
match mode {
|
||||
OperationMode::SELECT => {
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait {});
|
||||
}
|
||||
OperationMode::DELETE => {
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
_ => {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
SourceOperator::Search {
|
||||
table_reference,
|
||||
search,
|
||||
..
|
||||
} => {
|
||||
let table_cursor_id = program.alloc_cursor_id(
|
||||
Some(table_reference.table_identifier.clone()),
|
||||
Some(table_reference.table.clone()),
|
||||
);
|
||||
|
||||
match mode {
|
||||
OperationMode::SELECT => {
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id: table_cursor_id,
|
||||
root_page: table_reference.table.get_root_page(),
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait {});
|
||||
}
|
||||
OperationMode::DELETE => {
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id: table_cursor_id,
|
||||
root_page: table_reference.table.get_root_page(),
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
_ => {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
if let Search::IndexSearch { index, .. } = search {
|
||||
let index_cursor_id = program
|
||||
.alloc_cursor_id(Some(index.name.clone()), Some(Table::Index(index.clone())));
|
||||
|
||||
match mode {
|
||||
OperationMode::SELECT => {
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id: index_cursor_id,
|
||||
root_page: index.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait);
|
||||
}
|
||||
OperationMode::DELETE => {
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id: index_cursor_id,
|
||||
root_page: index.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
_ => {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
SourceOperator::Nothing { .. } => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set up the main query execution loop
/// For example in the case of a nested table scan, this means emitting the RewindAsync instruction
/// for all tables involved, outermost first.
///
/// For each operator this emits: cursor positioning (Rewind/Last/Seek), the
/// per-row WHERE/join condition checks, and (for LEFT JOINs) the match-flag
/// bookkeeping. The loop-closing instructions (NextAsync etc.) are emitted elsewhere.
pub fn open_loop(
    program: &mut ProgramBuilder,
    t_ctx: &mut TranslateCtx,
    source: &mut SourceOperator,
    referenced_tables: &[TableReference],
) -> Result<()> {
    match source {
        SourceOperator::Subquery {
            id,
            predicates,
            plan,
            ..
        } => {
            let (yield_reg, coroutine_implementation_start) = match &plan.query_type {
                SelectQueryType::Subquery {
                    yield_reg,
                    coroutine_implementation_start,
                } => (*yield_reg, *coroutine_implementation_start),
                _ => unreachable!("Subquery operator with non-subquery query type"),
            };
            // In case the subquery is an inner loop, it needs to be reinitialized on each iteration of the outer loop.
            program.emit_insn(Insn::InitCoroutine {
                yield_reg,
                jump_on_definition: 0,
                start_offset: coroutine_implementation_start,
            });
            let LoopLabels {
                loop_start,
                loop_end,
                next,
            } = *t_ctx
                .labels_main_loop
                .get(id)
                .expect("subquery has no loop labels");
            program.defer_label_resolution(loop_start, program.offset() as usize);
            // A subquery within the main loop of a parent query has no cursor, so instead of advancing the cursor,
            // it emits a Yield which jumps back to the main loop of the subquery itself to retrieve the next row.
            // When the subquery coroutine completes, this instruction jumps to the label at the top of the termination_label_stack,
            // which in this case is the end of the Yield-Goto loop in the parent query.
            program.emit_insn_with_label_dependency(
                Insn::Yield {
                    yield_reg,
                    end_offset: loop_end,
                },
                loop_end,
            );

            // These are predicates evaluated outside of the subquery,
            // so they are translated here.
            // E.g. SELECT foo FROM (SELECT bar as foo FROM t1) sub WHERE sub.foo > 10
            if let Some(preds) = predicates {
                for expr in preds {
                    let jump_target_when_true = program.allocate_label();
                    let condition_metadata = ConditionMetadata {
                        jump_if_condition_is_true: false,
                        jump_target_when_true,
                        jump_target_when_false: next,
                    };
                    translate_condition_expr(
                        program,
                        referenced_tables,
                        expr,
                        condition_metadata,
                        &t_ctx.resolver,
                    )?;
                    program.resolve_label(jump_target_when_true, program.offset());
                }
            }

            Ok(())
        }
        SourceOperator::Join {
            id,
            left,
            right,
            predicates,
            outer,
            ..
        } => {
            // Outer loop first: the right side is re-scanned for every left row.
            open_loop(program, t_ctx, left, referenced_tables)?;

            let LoopLabels { next, .. } = *t_ctx
                .labels_main_loop
                .get(&right.id())
                .expect("right side of join has no loop labels");

            // For an inner join, a failed join condition simply advances the right cursor.
            let mut jump_target_when_false = next;

            if *outer {
                // LEFT JOIN: clear the match flag before scanning the right side, and
                // route failed conditions to the match-flag check so a NULL-padded row
                // can still be produced for the left row.
                let lj_meta = t_ctx.meta_left_joins.get(id).unwrap();
                program.emit_insn(Insn::Integer {
                    value: 0,
                    dest: lj_meta.reg_match_flag,
                });
                jump_target_when_false = lj_meta.label_match_flag_check_value;
            }

            open_loop(program, t_ctx, right, referenced_tables)?;

            if let Some(predicates) = predicates {
                let jump_target_when_true = program.allocate_label();
                let condition_metadata = ConditionMetadata {
                    jump_if_condition_is_true: false,
                    jump_target_when_true,
                    jump_target_when_false,
                };
                for predicate in predicates.iter() {
                    translate_condition_expr(
                        program,
                        referenced_tables,
                        predicate,
                        condition_metadata,
                        &t_ctx.resolver,
                    )?;
                }
                program.resolve_label(jump_target_when_true, program.offset());
            }

            if *outer {
                // All join conditions passed for this right row: set the match flag.
                let lj_meta = t_ctx.meta_left_joins.get(id).unwrap();
                program.defer_label_resolution(
                    lj_meta.label_match_flag_set_true,
                    program.offset() as usize,
                );
                program.emit_insn(Insn::Integer {
                    value: 1,
                    dest: lj_meta.reg_match_flag,
                });
            }

            Ok(())
        }
        SourceOperator::Scan {
            id,
            table_reference,
            predicates,
            iter_dir,
        } => {
            let cursor_id = program.resolve_cursor_id(&table_reference.table_identifier);
            // Position the cursor at the first row in iteration order.
            if iter_dir
                .as_ref()
                .is_some_and(|dir| *dir == IterationDirection::Backwards)
            {
                program.emit_insn(Insn::LastAsync { cursor_id });
            } else {
                program.emit_insn(Insn::RewindAsync { cursor_id });
            }
            let LoopLabels {
                loop_start,
                loop_end,
                next,
            } = *t_ctx
                .labels_main_loop
                .get(id)
                .expect("scan has no loop labels");
            // An empty table exits the loop immediately.
            program.emit_insn_with_label_dependency(
                if iter_dir
                    .as_ref()
                    .is_some_and(|dir| *dir == IterationDirection::Backwards)
                {
                    Insn::LastAwait {
                        cursor_id,
                        pc_if_empty: loop_end,
                    }
                } else {
                    Insn::RewindAwait {
                        cursor_id,
                        pc_if_empty: loop_end,
                    }
                },
                loop_end,
            );
            program.defer_label_resolution(loop_start, program.offset() as usize);

            // Per-row WHERE predicates: a false condition jumps to Next.
            if let Some(preds) = predicates {
                for expr in preds {
                    let jump_target_when_true = program.allocate_label();
                    let condition_metadata = ConditionMetadata {
                        jump_if_condition_is_true: false,
                        jump_target_when_true,
                        jump_target_when_false: next,
                    };
                    translate_condition_expr(
                        program,
                        referenced_tables,
                        expr,
                        condition_metadata,
                        &t_ctx.resolver,
                    )?;
                    program.resolve_label(jump_target_when_true, program.offset());
                }
            }

            Ok(())
        }
        SourceOperator::Search {
            id,
            table_reference,
            search,
            predicates,
            ..
        } => {
            let table_cursor_id = program.resolve_cursor_id(&table_reference.table_identifier);
            let LoopLabels {
                loop_start,
                loop_end,
                next,
            } = *t_ctx
                .labels_main_loop
                .get(id)
                .expect("search has no loop labels");
            // Open the loop for the index search.
            // Rowid equality point lookups are handled with a SeekRowid instruction which does not loop, since it is a single row lookup.
            if !matches!(search, Search::RowidEq { .. }) {
                let index_cursor_id = if let Search::IndexSearch { index, .. } = search {
                    Some(program.resolve_cursor_id(&index.name))
                } else {
                    None
                };
                let cmp_reg = program.alloc_register();
                let (cmp_expr, cmp_op) = match search {
                    Search::IndexSearch {
                        cmp_expr, cmp_op, ..
                    } => (cmp_expr, cmp_op),
                    Search::RowidSearch { cmp_expr, cmp_op } => (cmp_expr, cmp_op),
                    Search::RowidEq { .. } => unreachable!(),
                };
                // TODO this only handles ascending indexes
                match cmp_op {
                    ast::Operator::Equals
                    | ast::Operator::Greater
                    | ast::Operator::GreaterEquals => {
                        // Seek key is the comparison value itself.
                        translate_expr(
                            program,
                            Some(referenced_tables),
                            cmp_expr,
                            cmp_reg,
                            &t_ctx.resolver,
                        )?;
                    }
                    ast::Operator::Less | ast::Operator::LessEquals => {
                        // For < / <= we seek to the beginning (NULL sorts first) and
                        // scan forward; the upper bound is checked per row below.
                        program.emit_insn(Insn::Null {
                            dest: cmp_reg,
                            dest_end: None,
                        });
                    }
                    _ => unreachable!(),
                }
                // If we try to seek to a key that is not present in the table/index, we exit the loop entirely.
                program.emit_insn_with_label_dependency(
                    match cmp_op {
                        ast::Operator::Equals | ast::Operator::GreaterEquals => Insn::SeekGE {
                            is_index: index_cursor_id.is_some(),
                            cursor_id: index_cursor_id.unwrap_or(table_cursor_id),
                            start_reg: cmp_reg,
                            num_regs: 1,
                            target_pc: loop_end,
                        },
                        ast::Operator::Greater
                        | ast::Operator::Less
                        | ast::Operator::LessEquals => Insn::SeekGT {
                            is_index: index_cursor_id.is_some(),
                            cursor_id: index_cursor_id.unwrap_or(table_cursor_id),
                            start_reg: cmp_reg,
                            num_regs: 1,
                            target_pc: loop_end,
                        },
                        _ => unreachable!(),
                    },
                    loop_end,
                );
                // For < / <= the comparison value is evaluated only now, after the
                // seek, so cmp_reg can serve as the per-row upper bound below.
                if *cmp_op == ast::Operator::Less || *cmp_op == ast::Operator::LessEquals {
                    translate_expr(
                        program,
                        Some(referenced_tables),
                        cmp_expr,
                        cmp_reg,
                        &t_ctx.resolver,
                    )?;
                }

                program.defer_label_resolution(loop_start, program.offset() as usize);
                // TODO: We are currently only handling ascending indexes.
                // For conditions like index_key > 10, we have already seeked to the first key greater than 10, and can just scan forward.
                // For conditions like index_key < 10, we are at the beginning of the index, and will scan forward and emit IdxGE(10) with a conditional jump to the end.
                // For conditions like index_key = 10, we have already seeked to the first key greater than or equal to 10, and can just scan forward and emit IdxGT(10) with a conditional jump to the end.
                // For conditions like index_key >= 10, we have already seeked to the first key greater than or equal to 10, and can just scan forward.
                // For conditions like index_key <= 10, we are at the beginning of the index, and will scan forward and emit IdxGT(10) with a conditional jump to the end.
                // For conditions like index_key != 10, TODO. probably the optimal way is not to use an index at all.
                //
                // For primary key searches we emit RowId and then compare it to the seek value.

                match cmp_op {
                    ast::Operator::Equals | ast::Operator::LessEquals => {
                        if let Some(index_cursor_id) = index_cursor_id {
                            program.emit_insn_with_label_dependency(
                                Insn::IdxGT {
                                    cursor_id: index_cursor_id,
                                    start_reg: cmp_reg,
                                    num_regs: 1,
                                    target_pc: loop_end,
                                },
                                loop_end,
                            );
                        } else {
                            let rowid_reg = program.alloc_register();
                            program.emit_insn(Insn::RowId {
                                cursor_id: table_cursor_id,
                                dest: rowid_reg,
                            });
                            program.emit_insn_with_label_dependency(
                                Insn::Gt {
                                    lhs: rowid_reg,
                                    rhs: cmp_reg,
                                    target_pc: loop_end,
                                },
                                loop_end,
                            );
                        }
                    }
                    ast::Operator::Less => {
                        if let Some(index_cursor_id) = index_cursor_id {
                            program.emit_insn_with_label_dependency(
                                Insn::IdxGE {
                                    cursor_id: index_cursor_id,
                                    start_reg: cmp_reg,
                                    num_regs: 1,
                                    target_pc: loop_end,
                                },
                                loop_end,
                            );
                        } else {
                            let rowid_reg = program.alloc_register();
                            program.emit_insn(Insn::RowId {
                                cursor_id: table_cursor_id,
                                dest: rowid_reg,
                            });
                            program.emit_insn_with_label_dependency(
                                Insn::Ge {
                                    lhs: rowid_reg,
                                    rhs: cmp_reg,
                                    target_pc: loop_end,
                                },
                                loop_end,
                            );
                        }
                    }
                    _ => {}
                }

                if let Some(index_cursor_id) = index_cursor_id {
                    // The table row is fetched lazily, only if a column that is not
                    // covered by the index is actually needed.
                    program.emit_insn(Insn::DeferredSeek {
                        index_cursor_id,
                        table_cursor_id,
                    });
                }
            }

            if let Search::RowidEq { cmp_expr } = search {
                // Point lookup: a missing rowid skips straight to Next.
                let src_reg = program.alloc_register();
                translate_expr(
                    program,
                    Some(referenced_tables),
                    cmp_expr,
                    src_reg,
                    &t_ctx.resolver,
                )?;
                program.emit_insn_with_label_dependency(
                    Insn::SeekRowid {
                        cursor_id: table_cursor_id,
                        src_reg,
                        target_pc: next,
                    },
                    next,
                );
            }
            // Remaining WHERE predicates not consumed by the seek itself.
            if let Some(predicates) = predicates {
                for predicate in predicates.iter() {
                    let jump_target_when_true = program.allocate_label();
                    let condition_metadata = ConditionMetadata {
                        jump_if_condition_is_true: false,
                        jump_target_when_true,
                        jump_target_when_false: next,
                    };
                    translate_condition_expr(
                        program,
                        referenced_tables,
                        predicate,
                        condition_metadata,
                        &t_ctx.resolver,
                    )?;
                    program.resolve_label(jump_target_when_true, program.offset());
                }
            }

            Ok(())
        }
        SourceOperator::Nothing { .. } => Ok(()),
    }
}
|
||||
|
||||
/// SQLite (and so Limbo) processes joins as a nested loop.
/// The loop may emit rows to various destinations depending on the query:
/// - a GROUP BY sorter (grouping is done by sorting based on the GROUP BY keys and aggregating while the GROUP BY keys match)
/// - an ORDER BY sorter (when there is no GROUP BY, but there is an ORDER BY)
/// - an AggStep (the columns are collected for aggregation, which is finished later)
/// - a QueryResult (there is none of the above, so the loop either emits a ResultRow, or if it's a subquery, yields to the parent query)
enum LoopEmitTarget {
    /// Insert the row (group keys + aggregate arguments) into the GROUP BY sorter.
    GroupBySorter,
    /// Insert the row into the ORDER BY sorter.
    OrderBySorter,
    /// Accumulate the row directly into ungrouped aggregates via AggStep.
    AggStep,
    /// Emit a ResultRow, or yield to the parent query if this is a subquery.
    QueryResult,
}
|
||||
|
||||
/// Emits the bytecode for the inner loop of a query.
|
||||
/// At this point the cursors for all tables have been opened and rewound.
|
||||
pub fn emit_loop(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
plan: &mut SelectPlan,
|
||||
) -> Result<()> {
|
||||
// if we have a group by, we emit a record into the group by sorter.
|
||||
if plan.group_by.is_some() {
|
||||
return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::GroupBySorter);
|
||||
}
|
||||
// if we DONT have a group by, but we have aggregates, we emit without ResultRow.
|
||||
// we also do not need to sort because we are emitting a single row.
|
||||
if !plan.aggregates.is_empty() {
|
||||
return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::AggStep);
|
||||
}
|
||||
// if we DONT have a group by, but we have an order by, we emit a record into the order by sorter.
|
||||
if plan.order_by.is_some() {
|
||||
return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::OrderBySorter);
|
||||
}
|
||||
// if we have neither, we emit a ResultRow. In that case, if we have a Limit, we handle that with DecrJumpZero.
|
||||
emit_loop_source(program, t_ctx, plan, LoopEmitTarget::QueryResult)
|
||||
}
|
||||
|
||||
/// This is a helper function for emit_loop,
/// which does a different thing depending on the emit target.
/// See the LoopEmitTarget enum for more details.
fn emit_loop_source(
    program: &mut ProgramBuilder,
    t_ctx: &mut TranslateCtx,
    plan: &SelectPlan,
    emit_target: LoopEmitTarget,
) -> Result<()> {
    match emit_target {
        LoopEmitTarget::GroupBySorter => {
            // The GROUP BY sorter row layout is: [sort keys (= group by exprs), aggregate arguments].
            let group_by = plan.group_by.as_ref().unwrap();
            let aggregates = &plan.aggregates;
            let sort_keys_count = group_by.exprs.len();
            let aggregate_arguments_count = plan
                .aggregates
                .iter()
                .map(|agg| agg.args.len())
                .sum::<usize>();
            let column_count = sort_keys_count + aggregate_arguments_count;
            // Allocate one contiguous run of registers so the row can later be
            // serialized with a single MakeRecord in sorter_insert().
            let start_reg = program.alloc_registers(column_count);
            let mut cur_reg = start_reg;

            // The group by sorter rows will contain the grouping keys first. They are also the sort keys.
            for expr in group_by.exprs.iter() {
                let key_reg = cur_reg;
                cur_reg += 1;
                translate_expr(
                    program,
                    Some(&plan.referenced_tables),
                    expr,
                    key_reg,
                    &t_ctx.resolver,
                )?;
            }
            // Then we have the aggregate arguments.
            for agg in aggregates.iter() {
                // Here we are collecting scalars for the group by sorter, which will include
                // both the group by expressions and the aggregate arguments.
                // e.g. in `select u.first_name, sum(u.age) from users group by u.first_name`
                // the sorter will have two scalars: u.first_name and u.age.
                // these are then sorted by u.first_name, and for each u.first_name, we sum the u.age.
                // the actual aggregation is done later.
                for expr in agg.args.iter() {
                    let agg_reg = cur_reg;
                    cur_reg += 1;
                    translate_expr(
                        program,
                        Some(&plan.referenced_tables),
                        expr,
                        agg_reg,
                        &t_ctx.resolver,
                    )?;
                }
            }

            // TODO: although it's less often useful, SQLite does allow for expressions in the SELECT that are not part of a GROUP BY or aggregate.
            // We currently ignore those and only emit the GROUP BY keys and aggregate arguments. This should be fixed.

            let group_by_metadata = t_ctx.meta_group_by.as_ref().unwrap();

            sorter_insert(
                program,
                start_reg,
                column_count,
                group_by_metadata.sort_cursor,
                group_by_metadata.reg_sorter_key,
            );

            Ok(())
        }
        LoopEmitTarget::OrderBySorter => order_by_sorter_insert(program, t_ctx, plan),
        LoopEmitTarget::AggStep => {
            let num_aggs = plan.aggregates.len();
            let start_reg = program.alloc_registers(num_aggs);
            // Remember where the accumulators live so emit_ungrouped_aggregation()
            // can finalize them after the loop ends.
            t_ctx.reg_agg_start = Some(start_reg);

            // In planner.rs, we have collected all aggregates from the SELECT clause, including ones where the aggregate is embedded inside
            // a more complex expression. Some examples: length(sum(x)), sum(x) + avg(y), sum(x) + 1, etc.
            // The result of those more complex expressions depends on the final result of the aggregate, so we don't translate the complete expressions here.
            // Instead, we accumulate the intermediate results of all aggregates, and evaluate any expressions that do not contain aggregates.
            for (i, agg) in plan.aggregates.iter().enumerate() {
                let reg = start_reg + i;
                translate_aggregation_step(
                    program,
                    &plan.referenced_tables,
                    agg,
                    reg,
                    &t_ctx.resolver,
                )?;
            }
            for (i, rc) in plan.result_columns.iter().enumerate() {
                if rc.contains_aggregates {
                    // Do nothing, aggregates are computed above
                    // if this result column is e.g. something like sum(x) + 1 or length(sum(x)), we do not want to translate that (+1) or length() yet,
                    // it will be computed after the aggregations are finalized.
                    continue;
                }
                // NOTE(review): `reg` indexes past the `num_aggs` registers allocated
                // above; presumably registers for non-aggregate result columns are
                // allocated elsewhere — verify against how reg_agg_start is consumed
                // after the loop.
                let reg = start_reg + num_aggs + i;
                translate_expr(
                    program,
                    Some(&plan.referenced_tables),
                    &rc.expr,
                    reg,
                    &t_ctx.resolver,
                )?;
            }
            Ok(())
        }
        LoopEmitTarget::QueryResult => {
            assert!(
                plan.aggregates.is_empty(),
                "We should not get here with aggregates"
            );
            emit_select_result(program, t_ctx, plan, t_ctx.label_main_loop_end)?;

            Ok(())
        }
    }
}
|
||||
|
||||
/// Closes the loop for a given source operator.
|
||||
/// For example in the case of a nested table scan, this means emitting the NextAsync instruction
|
||||
/// for all tables involved, innermost first.
|
||||
pub fn close_loop(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
source: &SourceOperator,
|
||||
) -> Result<()> {
|
||||
let loop_labels = *t_ctx
|
||||
.labels_main_loop
|
||||
.get(&source.id())
|
||||
.expect("source has no loop labels");
|
||||
match source {
|
||||
SourceOperator::Subquery { .. } => {
|
||||
program.resolve_label(loop_labels.next, program.offset());
|
||||
// A subquery has no cursor to call NextAsync on, so it just emits a Goto
|
||||
// to the Yield instruction, which in turn jumps back to the main loop of the subquery,
|
||||
// so that the next row from the subquery can be read.
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::Goto {
|
||||
target_pc: loop_labels.loop_start,
|
||||
},
|
||||
loop_labels.loop_start,
|
||||
);
|
||||
}
|
||||
SourceOperator::Join {
|
||||
id,
|
||||
left,
|
||||
right,
|
||||
outer,
|
||||
..
|
||||
} => {
|
||||
close_loop(program, t_ctx, right)?;
|
||||
|
||||
if *outer {
|
||||
let lj_meta = t_ctx.meta_left_joins.get(id).unwrap();
|
||||
// The left join match flag is set to 1 when there is any match on the right table
|
||||
// (e.g. SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a).
|
||||
// If the left join match flag has been set to 1, we jump to the next row on the outer table,
|
||||
// i.e. continue to the next row of t1 in our example.
|
||||
program.resolve_label(lj_meta.label_match_flag_check_value, program.offset());
|
||||
let jump_offset = program.offset() + 3;
|
||||
program.emit_insn(Insn::IfPos {
|
||||
reg: lj_meta.reg_match_flag,
|
||||
target_pc: jump_offset,
|
||||
decrement_by: 0,
|
||||
});
|
||||
// If the left join match flag is still 0, it means there was no match on the right table,
|
||||
// but since it's a LEFT JOIN, we still need to emit a row with NULLs for the right table.
|
||||
// In that case, we now enter the routine that does exactly that.
|
||||
// First we set the right table cursor's "pseudo null bit" on, which means any Insn::Column will return NULL
|
||||
let right_cursor_id = match right.as_ref() {
|
||||
SourceOperator::Scan {
|
||||
table_reference, ..
|
||||
} => program.resolve_cursor_id(&table_reference.table_identifier),
|
||||
SourceOperator::Search {
|
||||
table_reference, ..
|
||||
} => program.resolve_cursor_id(&table_reference.table_identifier),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
program.emit_insn(Insn::NullRow {
|
||||
cursor_id: right_cursor_id,
|
||||
});
|
||||
// Then we jump to setting the left join match flag to 1 again,
|
||||
// but this time the right table cursor will set everything to null.
|
||||
// This leads to emitting a row with cols from the left + nulls from the right,
|
||||
// and we will end up back in the IfPos instruction above, which will then
|
||||
// check the match flag again, and since it is now 1, we will jump to the
|
||||
// next row in the left table.
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::Goto {
|
||||
target_pc: lj_meta.label_match_flag_set_true,
|
||||
},
|
||||
lj_meta.label_match_flag_set_true,
|
||||
);
|
||||
|
||||
assert!(program.offset() == jump_offset);
|
||||
}
|
||||
|
||||
close_loop(program, t_ctx, left)?;
|
||||
}
|
||||
SourceOperator::Scan {
|
||||
table_reference,
|
||||
iter_dir,
|
||||
..
|
||||
} => {
|
||||
program.resolve_label(loop_labels.next, program.offset());
|
||||
let cursor_id = program.resolve_cursor_id(&table_reference.table_identifier);
|
||||
if iter_dir
|
||||
.as_ref()
|
||||
.is_some_and(|dir| *dir == IterationDirection::Backwards)
|
||||
{
|
||||
program.emit_insn(Insn::PrevAsync { cursor_id });
|
||||
} else {
|
||||
program.emit_insn(Insn::NextAsync { cursor_id });
|
||||
}
|
||||
if iter_dir
|
||||
.as_ref()
|
||||
.is_some_and(|dir| *dir == IterationDirection::Backwards)
|
||||
{
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::PrevAwait {
|
||||
cursor_id,
|
||||
pc_if_next: loop_labels.loop_start,
|
||||
},
|
||||
loop_labels.loop_start,
|
||||
);
|
||||
} else {
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::NextAwait {
|
||||
cursor_id,
|
||||
pc_if_next: loop_labels.loop_start,
|
||||
},
|
||||
loop_labels.loop_start,
|
||||
);
|
||||
}
|
||||
}
|
||||
SourceOperator::Search {
|
||||
table_reference,
|
||||
search,
|
||||
..
|
||||
} => {
|
||||
program.resolve_label(loop_labels.next, program.offset());
|
||||
if matches!(search, Search::RowidEq { .. }) {
|
||||
// Rowid equality point lookups are handled with a SeekRowid instruction which does not loop, so there is no need to emit a NextAsync instruction.
|
||||
return Ok(());
|
||||
}
|
||||
let cursor_id = match search {
|
||||
Search::IndexSearch { index, .. } => program.resolve_cursor_id(&index.name),
|
||||
Search::RowidSearch { .. } => {
|
||||
program.resolve_cursor_id(&table_reference.table_identifier)
|
||||
}
|
||||
Search::RowidEq { .. } => unreachable!(),
|
||||
};
|
||||
|
||||
program.emit_insn(Insn::NextAsync { cursor_id });
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::NextAwait {
|
||||
cursor_id,
|
||||
pc_if_next: loop_labels.loop_start,
|
||||
},
|
||||
loop_labels.loop_start,
|
||||
);
|
||||
}
|
||||
SourceOperator::Nothing { .. } => {}
|
||||
};
|
||||
|
||||
program.resolve_label(loop_labels.loop_end, program.offset());
|
||||
Ok(())
|
||||
}
|
||||
@@ -7,14 +7,20 @@
|
||||
//! a SELECT statement will be translated into a sequence of instructions that
|
||||
//! will read rows from the database and filter them according to a WHERE clause.
|
||||
|
||||
pub(crate) mod aggregation;
|
||||
pub(crate) mod delete;
|
||||
pub(crate) mod emitter;
|
||||
pub(crate) mod expr;
|
||||
pub(crate) mod group_by;
|
||||
pub(crate) mod insert;
|
||||
pub(crate) mod main_loop;
|
||||
pub(crate) mod optimizer;
|
||||
pub(crate) mod order_by;
|
||||
pub(crate) mod plan;
|
||||
pub(crate) mod planner;
|
||||
pub(crate) mod result_row;
|
||||
pub(crate) mod select;
|
||||
pub(crate) mod subquery;
|
||||
|
||||
use crate::schema::Schema;
|
||||
use crate::storage::pager::Pager;
|
||||
|
||||
273
core/translate/order_by.rs
Normal file
273
core/translate/order_by.rs
Normal file
@@ -0,0 +1,273 @@
|
||||
use std::rc::Rc;
|
||||
|
||||
use sqlite3_parser::ast;
|
||||
|
||||
use crate::{
|
||||
schema::{Column, PseudoTable, Table},
|
||||
types::{OwnedRecord, OwnedValue},
|
||||
util::exprs_are_equivalent,
|
||||
vdbe::{builder::ProgramBuilder, insn::Insn},
|
||||
Result,
|
||||
};
|
||||
|
||||
use super::{
|
||||
emitter::TranslateCtx,
|
||||
expr::translate_expr,
|
||||
plan::{Direction, ResultSetColumn, SelectPlan},
|
||||
result_row::emit_result_row_and_limit,
|
||||
};
|
||||
|
||||
/// Metadata for handling ORDER BY operations
#[derive(Debug)]
pub struct SortMetadata {
    /// Cursor id for the Sorter table where the sorted rows are stored
    pub sort_cursor: usize,
    /// Register where the sorter data is inserted and later retrieved from
    pub reg_sorter_data: usize,
}
|
||||
|
||||
/// Initialize resources needed for ORDER BY processing
|
||||
pub fn init_order_by(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
order_by: &[(ast::Expr, Direction)],
|
||||
) -> Result<()> {
|
||||
let sort_cursor = program.alloc_cursor_id(None, None);
|
||||
t_ctx.meta_sort = Some(SortMetadata {
|
||||
sort_cursor,
|
||||
reg_sorter_data: program.alloc_register(),
|
||||
});
|
||||
let mut order = Vec::new();
|
||||
for (_, direction) in order_by.iter() {
|
||||
order.push(OwnedValue::Integer(*direction as i64));
|
||||
}
|
||||
program.emit_insn(Insn::SorterOpen {
|
||||
cursor_id: sort_cursor,
|
||||
columns: order_by.len(),
|
||||
order: OwnedRecord::new(order),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for outputting rows from an ORDER BY sorter.
/// This is called when the main query execution loop has finished processing,
/// and we can now emit rows from the ORDER BY sorter.
pub fn emit_order_by(
    program: &mut ProgramBuilder,
    t_ctx: &mut TranslateCtx,
    plan: &SelectPlan,
) -> Result<()> {
    let order_by = plan.order_by.as_ref().unwrap();
    let result_columns = &plan.result_columns;
    let sort_loop_start_label = program.allocate_label();
    let sort_loop_end_label = program.allocate_label();
    // Build the column list for a pseudo table mirroring the sorter's row
    // layout: sort keys first, then the non-deduplicated result columns.
    let mut pseudo_columns = vec![];
    for (i, _) in order_by.iter().enumerate() {
        pseudo_columns.push(Column {
            // Names don't matter. We are tracking which result column is in which position in the ORDER BY clause in m.result_column_indexes_in_orderby_sorter.
            name: format!("sort_key_{}", i),
            primary_key: false,
            ty: crate::schema::Type::Null,
            is_rowid_alias: false,
        });
    }
    for (i, rc) in result_columns.iter().enumerate() {
        // If any result columns are not in the ORDER BY sorter, it's because they are equal to a sort key and were already added to the pseudo columns above.
        if let Some(ref v) = t_ctx.result_columns_to_skip_in_orderby_sorter {
            if v.contains(&i) {
                continue;
            }
        }
        pseudo_columns.push(Column {
            name: rc.expr.to_string(),
            primary_key: false,
            ty: crate::schema::Type::Null,
            is_rowid_alias: false,
        });
    }

    // Total sorter row width: every sort key plus every result column that
    // was not deduplicated away as an exact copy of a sort key.
    let num_columns_in_sorter = order_by.len() + result_columns.len()
        - t_ctx
            .result_columns_to_skip_in_orderby_sorter
            .as_ref()
            .map(|v| v.len())
            .unwrap_or(0);

    let pseudo_cursor = program.alloc_cursor_id(
        None,
        Some(Table::Pseudo(Rc::new(PseudoTable {
            columns: pseudo_columns,
        }))),
    );
    let SortMetadata {
        sort_cursor,
        reg_sorter_data,
    } = *t_ctx.meta_sort.as_mut().unwrap();

    // The pseudo cursor reads column values out of the record register that
    // SorterData fills on each iteration.
    program.emit_insn(Insn::OpenPseudo {
        cursor_id: pseudo_cursor,
        content_reg: reg_sorter_data,
        num_fields: num_columns_in_sorter,
    });

    // Sort; if the sorter is empty, skip the output loop entirely.
    program.emit_insn_with_label_dependency(
        Insn::SorterSort {
            cursor_id: sort_cursor,
            pc_if_empty: sort_loop_end_label,
        },
        sort_loop_end_label,
    );

    // Loop body starts here: fetch the current sorted row into reg_sorter_data.
    program.defer_label_resolution(sort_loop_start_label, program.offset() as usize);
    program.emit_insn(Insn::SorterData {
        cursor_id: sort_cursor,
        dest_reg: reg_sorter_data,
        pseudo_cursor,
    });

    // We emit the columns in SELECT order, not sorter order (sorter always has the sort keys first).
    // This is tracked in m.result_column_indexes_in_orderby_sorter.
    let cursor_id = pseudo_cursor;
    let start_reg = t_ctx.reg_result_cols_start.unwrap();
    for i in 0..result_columns.len() {
        let reg = start_reg + i;
        program.emit_insn(Insn::Column {
            cursor_id,
            column: t_ctx.result_column_indexes_in_orderby_sorter[&i],
            dest: reg,
        });
    }

    // Emit the row (ResultRow or subquery Yield) and, on LIMIT exhaustion,
    // jump out of the sort loop.
    emit_result_row_and_limit(program, t_ctx, plan, start_reg, Some(sort_loop_end_label))?;

    program.emit_insn_with_label_dependency(
        Insn::SorterNext {
            cursor_id: sort_cursor,
            pc_if_next: sort_loop_start_label,
        },
        sort_loop_start_label,
    );

    program.resolve_label(sort_loop_end_label, program.offset());

    Ok(())
}
|
||||
|
||||
/// Emits the bytecode for inserting a row into an ORDER BY sorter.
///
/// Sorter row layout: [sort keys (ORDER BY exprs), result columns that are
/// not exact duplicates of a sort key]. Also records, for every SELECT
/// column, its position inside the sorter row so emit_order_by() can later
/// read columns back in SELECT order.
pub fn order_by_sorter_insert(
    program: &mut ProgramBuilder,
    t_ctx: &mut TranslateCtx,
    plan: &SelectPlan,
) -> Result<()> {
    let order_by = plan.order_by.as_ref().unwrap();
    let order_by_len = order_by.len();
    let result_columns = &plan.result_columns;
    // If any result columns can be skipped due to being an exact duplicate of a sort key, we need to know which ones and their new index in the ORDER BY sorter.
    let result_columns_to_skip = order_by_deduplicate_result_columns(order_by, result_columns);
    let result_columns_to_skip_len = result_columns_to_skip
        .as_ref()
        .map(|v| v.len())
        .unwrap_or(0);

    // The ORDER BY sorter has the sort keys first, then the result columns.
    let orderby_sorter_column_count =
        order_by_len + result_columns.len() - result_columns_to_skip_len;
    let start_reg = program.alloc_registers(orderby_sorter_column_count);
    // Translate each sort key into the first `order_by_len` registers.
    for (i, (expr, _)) in order_by.iter().enumerate() {
        let key_reg = start_reg + i;
        translate_expr(
            program,
            Some(&plan.referenced_tables),
            expr,
            key_reg,
            &t_ctx.resolver,
        )?;
    }
    // Then translate the result columns, skipping duplicates of sort keys.
    let mut cur_reg = start_reg + order_by_len;
    let mut cur_idx_in_orderby_sorter = order_by_len;
    for (i, rc) in result_columns.iter().enumerate() {
        if let Some(ref v) = result_columns_to_skip {
            let found = v.iter().find(|(skipped_idx, _)| *skipped_idx == i);
            // If the result column is in the list of columns to skip, we need to know its new index in the ORDER BY sorter.
            if let Some((_, result_column_idx)) = found {
                t_ctx
                    .result_column_indexes_in_orderby_sorter
                    .insert(i, *result_column_idx);
                continue;
            }
        }
        translate_expr(
            program,
            Some(&plan.referenced_tables),
            &rc.expr,
            cur_reg,
            &t_ctx.resolver,
        )?;
        t_ctx
            .result_column_indexes_in_orderby_sorter
            .insert(i, cur_idx_in_orderby_sorter);
        cur_idx_in_orderby_sorter += 1;
        cur_reg += 1;
    }

    let SortMetadata {
        sort_cursor,
        reg_sorter_data,
    } = *t_ctx.meta_sort.as_mut().unwrap();

    sorter_insert(
        program,
        start_reg,
        orderby_sorter_column_count,
        sort_cursor,
        reg_sorter_data,
    );
    Ok(())
}
|
||||
|
||||
/// Emits the bytecode for inserting a row into a sorter.
|
||||
/// This can be either a GROUP BY sorter or an ORDER BY sorter.
|
||||
pub fn sorter_insert(
|
||||
program: &mut ProgramBuilder,
|
||||
start_reg: usize,
|
||||
column_count: usize,
|
||||
cursor_id: usize,
|
||||
record_reg: usize,
|
||||
) {
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg,
|
||||
count: column_count,
|
||||
dest_reg: record_reg,
|
||||
});
|
||||
program.emit_insn(Insn::SorterInsert {
|
||||
cursor_id,
|
||||
record_reg,
|
||||
});
|
||||
}
|
||||
|
||||
/// In case any of the ORDER BY sort keys are exactly equal to a result column, we can skip emitting that result column.
|
||||
/// If we skip a result column, we need to keep track what index in the ORDER BY sorter the result columns have,
|
||||
/// because the result columns should be emitted in the SELECT clause order, not the ORDER BY clause order.
|
||||
///
|
||||
/// If any result columns can be skipped, this returns list of 2-tuples of (SkippedResultColumnIndex: usize, ResultColumnIndexInOrderBySorter: usize)
|
||||
pub fn order_by_deduplicate_result_columns(
|
||||
order_by: &[(ast::Expr, Direction)],
|
||||
result_columns: &[ResultSetColumn],
|
||||
) -> Option<Vec<(usize, usize)>> {
|
||||
let mut result_column_remapping: Option<Vec<(usize, usize)>> = None;
|
||||
for (i, rc) in result_columns.iter().enumerate() {
|
||||
let found = order_by
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, (expr, _))| exprs_are_equivalent(expr, &rc.expr));
|
||||
if let Some((j, _)) = found {
|
||||
if let Some(ref mut v) = result_column_remapping {
|
||||
v.push((i, j));
|
||||
} else {
|
||||
result_column_remapping = Some(vec![(i, j)]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result_column_remapping
|
||||
}
|
||||
83
core/translate/result_row.rs
Normal file
83
core/translate/result_row.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use crate::{
|
||||
vdbe::{builder::ProgramBuilder, insn::Insn, BranchOffset},
|
||||
Result,
|
||||
};
|
||||
|
||||
use super::{
|
||||
emitter::TranslateCtx,
|
||||
expr::translate_expr,
|
||||
plan::{SelectPlan, SelectQueryType},
|
||||
};
|
||||
|
||||
/// Emits the bytecode for:
|
||||
/// - all result columns
|
||||
/// - result row (or if a subquery, yields to the parent query)
|
||||
/// - limit
|
||||
pub fn emit_select_result(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
plan: &SelectPlan,
|
||||
label_on_limit_reached: Option<BranchOffset>,
|
||||
) -> Result<()> {
|
||||
let start_reg = t_ctx.reg_result_cols_start.unwrap();
|
||||
for (i, rc) in plan.result_columns.iter().enumerate() {
|
||||
let reg = start_reg + i;
|
||||
translate_expr(
|
||||
program,
|
||||
Some(&plan.referenced_tables),
|
||||
&rc.expr,
|
||||
reg,
|
||||
&t_ctx.resolver,
|
||||
)?;
|
||||
}
|
||||
emit_result_row_and_limit(program, t_ctx, plan, start_reg, label_on_limit_reached)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for:
|
||||
/// - result row (or if a subquery, yields to the parent query)
|
||||
/// - limit
|
||||
pub fn emit_result_row_and_limit(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
plan: &SelectPlan,
|
||||
result_columns_start_reg: usize,
|
||||
label_on_limit_reached: Option<BranchOffset>,
|
||||
) -> Result<()> {
|
||||
match &plan.query_type {
|
||||
SelectQueryType::TopLevel => {
|
||||
program.emit_insn(Insn::ResultRow {
|
||||
start_reg: result_columns_start_reg,
|
||||
count: plan.result_columns.len(),
|
||||
});
|
||||
}
|
||||
SelectQueryType::Subquery { yield_reg, .. } => {
|
||||
program.emit_insn(Insn::Yield {
|
||||
yield_reg: *yield_reg,
|
||||
end_offset: 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(limit) = plan.limit {
|
||||
if label_on_limit_reached.is_none() {
|
||||
// There are cases where LIMIT is ignored, e.g. aggregation without a GROUP BY clause.
|
||||
// We already early return on LIMIT 0, so we can just return here since the n of rows
|
||||
// is always 1 here.
|
||||
return Ok(());
|
||||
}
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: limit as i64,
|
||||
dest: t_ctx.reg_limit.unwrap(),
|
||||
});
|
||||
program.mark_last_insn_constant();
|
||||
program.emit_insn_with_label_dependency(
|
||||
Insn::DecrJumpZero {
|
||||
reg: t_ctx.reg_limit.unwrap(),
|
||||
target_pc: label_on_limit_reached.unwrap(),
|
||||
},
|
||||
label_on_limit_reached.unwrap(),
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
125
core/translate/subquery.rs
Normal file
125
core/translate/subquery.rs
Normal file
@@ -0,0 +1,125 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{
|
||||
vdbe::{builder::ProgramBuilder, insn::Insn},
|
||||
Result,
|
||||
};
|
||||
|
||||
use super::{
|
||||
emitter::{emit_query, Resolver, TranslateCtx},
|
||||
plan::{SelectPlan, SelectQueryType, SourceOperator, TableReference, TableReferenceType},
|
||||
};
|
||||
|
||||
/// Emit the subqueries contained in the FROM clause.
|
||||
/// This is done first so the results can be read in the main query loop.
|
||||
pub fn emit_subqueries<'a>(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx<'a>,
|
||||
referenced_tables: &mut [TableReference],
|
||||
source: &mut SourceOperator,
|
||||
) -> Result<()> {
|
||||
match source {
|
||||
SourceOperator::Subquery {
|
||||
table_reference,
|
||||
plan,
|
||||
..
|
||||
} => {
|
||||
// Emit the subquery and get the start register of the result columns.
|
||||
let result_columns_start = emit_subquery(program, plan, t_ctx)?;
|
||||
// Set the result_columns_start_reg in the TableReference object.
|
||||
// This is done so that translate_expr() can read the result columns of the subquery,
|
||||
// as if it were reading from a regular table.
|
||||
let table_ref = referenced_tables
|
||||
.iter_mut()
|
||||
.find(|t| t.table_identifier == table_reference.table_identifier)
|
||||
.unwrap();
|
||||
if let TableReferenceType::Subquery {
|
||||
result_columns_start_reg,
|
||||
..
|
||||
} = &mut table_ref.reference_type
|
||||
{
|
||||
*result_columns_start_reg = result_columns_start;
|
||||
} else {
|
||||
unreachable!("emit_subqueries called on non-subquery");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
SourceOperator::Join { left, right, .. } => {
|
||||
emit_subqueries(program, t_ctx, referenced_tables, left)?;
|
||||
emit_subqueries(program, t_ctx, referenced_tables, right)?;
|
||||
Ok(())
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Emit a subquery and return the start register of the result columns.
/// This is done by emitting a coroutine that stores the result columns in sequential registers.
/// Each subquery in a FROM clause has its own separate SelectPlan which is wrapped in a coroutine.
///
/// The resulting bytecode from a subquery is mostly exactly the same as a regular query, except:
/// - it ends in an EndCoroutine instead of a Halt.
/// - instead of emitting ResultRows, the coroutine yields to the main query loop.
/// - the first register of the result columns is returned to the parent query,
///   so that translate_expr() can read the result columns of the subquery,
///   as if it were reading from a regular table.
///
/// Since a subquery has its own SelectPlan, it can contain nested subqueries,
/// which can contain even more nested subqueries, etc.
pub fn emit_subquery<'a>(
    program: &mut ProgramBuilder,
    plan: &mut SelectPlan,
    t_ctx: &mut TranslateCtx<'a>,
) -> Result<usize> {
    let yield_reg = program.alloc_register();
    // The coroutine body starts right after the InitCoroutine instruction emitted below.
    let coroutine_implementation_start_offset = program.offset() + 1;
    match &mut plan.query_type {
        SelectQueryType::Subquery {
            yield_reg: y,
            coroutine_implementation_start,
        } => {
            // The parent query will use this register to jump to/from the subquery.
            *y = yield_reg;
            // The parent query will use this offset to reinitialize the coroutine when it needs to run multiple times.
            *coroutine_implementation_start = coroutine_implementation_start_offset;
        }
        _ => unreachable!("emit_subquery called on non-subquery"),
    }
    let end_coroutine_label = program.allocate_label();
    // The subquery gets a fresh translation context; only the symbol table is
    // shared with the parent (via the Resolver).
    let mut metadata = TranslateCtx {
        labels_main_loop: HashMap::new(),
        label_main_loop_end: None,
        meta_group_by: None,
        meta_left_joins: HashMap::new(),
        meta_sort: None,
        reg_agg_start: None,
        reg_result_cols_start: None,
        result_column_indexes_in_orderby_sorter: HashMap::new(),
        result_columns_to_skip_in_orderby_sorter: None,
        reg_limit: plan.limit.map(|_| program.alloc_register()),
        resolver: Resolver::new(t_ctx.resolver.symbol_table),
    };
    let subquery_body_end_label = program.allocate_label();
    // InitCoroutine jumps over the coroutine body at definition time; the body
    // itself only runs when the parent Yields into it.
    program.emit_insn_with_label_dependency(
        Insn::InitCoroutine {
            yield_reg,
            jump_on_definition: subquery_body_end_label,
            start_offset: coroutine_implementation_start_offset,
        },
        subquery_body_end_label,
    );
    // Normally we mark each LIMIT value as a constant insn that is emitted only once, but in the case of a subquery,
    // we need to initialize it every time the subquery is run; otherwise subsequent runs of the subquery will already
    // have the LIMIT counter at 0, and will never return rows.
    if let Some(limit) = plan.limit {
        program.emit_insn(Insn::Integer {
            value: limit as i64,
            dest: metadata.reg_limit.unwrap(),
        });
    }
    let result_column_start_reg = emit_query(program, plan, &mut metadata)?;
    program.resolve_label(end_coroutine_label, program.offset());
    program.emit_insn(Insn::EndCoroutine { yield_reg });
    program.resolve_label(subquery_body_end_label, program.offset());
    Ok(result_column_start_reg)
}
|
||||
Reference in New Issue
Block a user