mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-08 02:34:20 +01:00
Merge 'Add support for DISTINCT aggregate functions' from Jussi Saurio
Reviewable commit by commit. CI failures are not related. Adds support for e.g. `select first_name, sum(distinct age), count(distinct age), avg(distinct age) from users group by 1` Implementation details: - Creates an ephemeral index per distinct aggregate, and jumps over the accumulation step if a duplicate is found Closes #1507
This commit is contained in:
@@ -5441,9 +5441,12 @@ fn debug_validate_cells_core(page: &PageContent, usable_space: u16) {
|
||||
usable_space as usize,
|
||||
);
|
||||
let buf = &page.as_ptr()[offset..offset + size];
|
||||
// E.g. the following table btree cell may just have two bytes:
|
||||
// Payload size 0 (stored as SerialTypeKind::ConstInt0)
|
||||
// Rowid 1 (stored as SerialTypeKind::ConstInt1)
|
||||
assert!(
|
||||
size >= 4,
|
||||
"cell size should be at least 4 bytes idx={}, cell={:?}, offset={}",
|
||||
size >= 2,
|
||||
"cell size should be at least 2 bytes idx={}, cell={:?}, offset={}",
|
||||
i,
|
||||
buf,
|
||||
offset
|
||||
|
||||
@@ -2,14 +2,17 @@ use limbo_sqlite3_parser::ast;
|
||||
|
||||
use crate::{
|
||||
function::AggFunc,
|
||||
vdbe::{builder::ProgramBuilder, insn::Insn},
|
||||
vdbe::{
|
||||
builder::ProgramBuilder,
|
||||
insn::{IdxInsertFlags, Insn},
|
||||
},
|
||||
LimboError, Result,
|
||||
};
|
||||
|
||||
use super::{
|
||||
emitter::{Resolver, TranslateCtx},
|
||||
expr::translate_expr,
|
||||
plan::{Aggregate, SelectPlan, TableReference},
|
||||
plan::{AggDistinctness, Aggregate, SelectPlan, TableReference},
|
||||
result_row::emit_select_result,
|
||||
};
|
||||
|
||||
@@ -57,6 +60,39 @@ pub fn emit_ungrouped_aggregation<'a>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for handling duplicates in a distinct aggregate.
|
||||
/// This is used in both GROUP BY and non-GROUP BY aggregations to jump over
|
||||
/// the AggStep that would otherwise accumulate the same value multiple times.
|
||||
pub fn handle_distinct(program: &mut ProgramBuilder, agg: &Aggregate, agg_arg_reg: usize) {
|
||||
let AggDistinctness::Distinct { ctx } = &agg.distinctness else {
|
||||
return;
|
||||
};
|
||||
let distinct_agg_ctx = ctx
|
||||
.as_ref()
|
||||
.expect("distinct aggregate context not populated");
|
||||
let num_regs = 1;
|
||||
program.emit_insn(Insn::Found {
|
||||
cursor_id: distinct_agg_ctx.cursor_id,
|
||||
target_pc: distinct_agg_ctx.label_on_conflict,
|
||||
record_reg: agg_arg_reg,
|
||||
num_regs,
|
||||
});
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: agg_arg_reg,
|
||||
count: num_regs,
|
||||
dest_reg: record_reg,
|
||||
index_name: Some(distinct_agg_ctx.ephemeral_index_name.to_string()),
|
||||
});
|
||||
program.emit_insn(Insn::IdxInsert {
|
||||
cursor_id: distinct_agg_ctx.cursor_id,
|
||||
record_reg: record_reg,
|
||||
unpacked_start: None,
|
||||
unpacked_count: None,
|
||||
flags: IdxInsertFlags::new(),
|
||||
});
|
||||
}
|
||||
|
||||
/// Emits the bytecode for processing an aggregate step.
|
||||
/// E.g. in `SELECT SUM(price) FROM t`, 'price' is evaluated for every row, and the result is added to the accumulator.
|
||||
///
|
||||
@@ -77,6 +113,7 @@ pub fn translate_aggregation_step(
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
handle_distinct(program, agg, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -94,6 +131,7 @@ pub fn translate_aggregation_step(
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
expr_reg
|
||||
};
|
||||
handle_distinct(program, agg, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -132,6 +170,7 @@ pub fn translate_aggregation_step(
|
||||
}
|
||||
|
||||
translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
handle_distinct(program, agg, expr_reg);
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
@@ -156,6 +195,7 @@ pub fn translate_aggregation_step(
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
handle_distinct(program, agg, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -171,6 +211,7 @@ pub fn translate_aggregation_step(
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
handle_distinct(program, agg, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -190,6 +231,7 @@ pub fn translate_aggregation_step(
|
||||
let value_reg = program.alloc_register();
|
||||
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
handle_distinct(program, agg, expr_reg);
|
||||
let _ = translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
@@ -214,6 +256,7 @@ pub fn translate_aggregation_step(
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
handle_distinct(program, agg, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -264,6 +307,7 @@ pub fn translate_aggregation_step(
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
handle_distinct(program, agg, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -279,6 +323,7 @@ pub fn translate_aggregation_step(
|
||||
let expr = &agg.args[0];
|
||||
let expr_reg = program.alloc_register();
|
||||
let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?;
|
||||
handle_distinct(program, agg, expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -310,6 +355,10 @@ pub fn translate_aggregation_step(
|
||||
expr_reg + i,
|
||||
resolver,
|
||||
)?;
|
||||
// invariant: distinct aggregates are only supported for single-argument functions
|
||||
if argc == 1 {
|
||||
handle_distinct(program, agg, expr_reg + i);
|
||||
}
|
||||
}
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
|
||||
@@ -285,6 +285,8 @@ pub fn emit_query<'a>(
|
||||
program,
|
||||
t_ctx,
|
||||
&plan.table_references,
|
||||
&mut plan.aggregates,
|
||||
plan.group_by.as_ref(),
|
||||
OperationMode::SELECT,
|
||||
)?;
|
||||
|
||||
@@ -394,6 +396,8 @@ fn emit_program_for_delete(
|
||||
program,
|
||||
&mut t_ctx,
|
||||
&plan.table_references,
|
||||
&mut [],
|
||||
None,
|
||||
OperationMode::DELETE,
|
||||
)?;
|
||||
|
||||
@@ -586,6 +590,8 @@ fn emit_program_for_update(
|
||||
program,
|
||||
&mut t_ctx,
|
||||
&plan.table_references,
|
||||
&mut [],
|
||||
None,
|
||||
OperationMode::UPDATE,
|
||||
)?;
|
||||
// Open indexes for update.
|
||||
|
||||
@@ -16,10 +16,11 @@ use crate::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
aggregation::handle_distinct,
|
||||
emitter::{Resolver, TranslateCtx},
|
||||
expr::{translate_condition_expr, translate_expr, ConditionMetadata},
|
||||
order_by::order_by_sorter_insert,
|
||||
plan::{Aggregate, GroupBy, SelectPlan, TableReference},
|
||||
plan::{AggDistinctness, Aggregate, GroupBy, SelectPlan, TableReference},
|
||||
result_row::emit_select_result,
|
||||
};
|
||||
|
||||
@@ -397,6 +398,14 @@ impl<'a> GroupByAggArgumentSource<'a> {
|
||||
aggregate,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn aggregate(&self) -> &Aggregate {
|
||||
match self {
|
||||
GroupByAggArgumentSource::PseudoCursor { aggregate, .. } => aggregate,
|
||||
GroupByAggArgumentSource::Register { aggregate, .. } => aggregate,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn agg_func(&self) -> &AggFunc {
|
||||
match self {
|
||||
GroupByAggArgumentSource::PseudoCursor { aggregate, .. } => &aggregate.func,
|
||||
@@ -567,6 +576,12 @@ pub fn group_by_process_single_group(
|
||||
agg_result_reg,
|
||||
&t_ctx.resolver,
|
||||
)?;
|
||||
if let AggDistinctness::Distinct { ctx } = &agg.distinctness {
|
||||
let ctx = ctx
|
||||
.as_ref()
|
||||
.expect("distinct aggregate context not populated");
|
||||
program.preassign_label_to_next_insn(ctx.label_on_conflict);
|
||||
}
|
||||
offset += agg.args.len();
|
||||
}
|
||||
|
||||
@@ -905,6 +920,26 @@ pub fn group_by_emit_row_phase<'a>(
|
||||
dest_end: Some(start_reg + plan.group_by_sorter_column_count() - 1),
|
||||
});
|
||||
|
||||
// Reopen ephemeral indexes for distinct aggregates (effectively clearing them).
|
||||
plan.aggregates
|
||||
.iter()
|
||||
.filter_map(|agg| {
|
||||
if let AggDistinctness::Distinct { ctx } = &agg.distinctness {
|
||||
Some(ctx)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.for_each(|ctx| {
|
||||
let ctx = ctx
|
||||
.as_ref()
|
||||
.expect("distinct aggregate context not populated");
|
||||
program.emit_insn(Insn::OpenEphemeral {
|
||||
cursor_id: ctx.cursor_id,
|
||||
is_table: false,
|
||||
});
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: 0,
|
||||
dest: registers.reg_data_in_acc_flag,
|
||||
@@ -936,6 +971,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
crate::bail_parse_error!("avg bad number of arguments");
|
||||
}
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -946,6 +982,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
}
|
||||
AggFunc::Count | AggFunc::Count0 => {
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -983,6 +1020,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
}
|
||||
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
@@ -1005,6 +1043,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
crate::bail_parse_error!("max bad number of arguments");
|
||||
}
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -1018,6 +1057,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
crate::bail_parse_error!("min bad number of arguments");
|
||||
}
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -1032,6 +1072,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
crate::bail_parse_error!("min bad number of arguments");
|
||||
}
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -1047,6 +1088,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
}
|
||||
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
let value_reg = agg_arg_source.translate(program, 1)?;
|
||||
|
||||
program.emit_insn(Insn::AggStep {
|
||||
@@ -1073,6 +1115,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
};
|
||||
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
translate_expr(
|
||||
program,
|
||||
Some(referenced_tables),
|
||||
@@ -1095,6 +1138,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
crate::bail_parse_error!("sum bad number of arguments");
|
||||
}
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
@@ -1108,6 +1152,7 @@ pub fn translate_aggregation_step_groupby(
|
||||
crate::bail_parse_error!("total bad number of arguments");
|
||||
}
|
||||
let expr_reg = agg_arg_source.translate(program, 0)?;
|
||||
handle_distinct(program, agg_arg_source.aggregate(), expr_reg);
|
||||
program.emit_insn(Insn::AggStep {
|
||||
acc_reg: target_register,
|
||||
col: expr_reg,
|
||||
|
||||
@@ -1,14 +1,17 @@
|
||||
use limbo_ext::VTabKind;
|
||||
use limbo_sqlite3_parser::ast;
|
||||
use limbo_sqlite3_parser::ast::{self, SortOrder};
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
schema::{Index, Table},
|
||||
translate::result_row::emit_select_result,
|
||||
schema::{Index, IndexColumn, Table},
|
||||
translate::{
|
||||
plan::{AggDistinctness, DistinctAggCtx},
|
||||
result_row::emit_select_result,
|
||||
},
|
||||
types::SeekOp,
|
||||
vdbe::{
|
||||
builder::ProgramBuilder,
|
||||
builder::{CursorType, ProgramBuilder},
|
||||
insn::{CmpInsFlags, IdxInsertFlags, Insn},
|
||||
BranchOffset, CursorID,
|
||||
},
|
||||
@@ -26,8 +29,8 @@ use super::{
|
||||
optimizer::Optimizable,
|
||||
order_by::{order_by_sorter_insert, sorter_insert},
|
||||
plan::{
|
||||
convert_where_to_vtab_constraint, IterationDirection, JoinOrderMember, Operation, Search,
|
||||
SeekDef, SelectPlan, SelectQueryType, TableReference, WhereTerm,
|
||||
convert_where_to_vtab_constraint, Aggregate, GroupBy, IterationDirection, JoinOrderMember,
|
||||
Operation, Search, SeekDef, SelectPlan, SelectQueryType, TableReference, WhereTerm,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -68,12 +71,57 @@ pub fn init_loop(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
tables: &[TableReference],
|
||||
aggregates: &mut [Aggregate],
|
||||
group_by: Option<&GroupBy>,
|
||||
mode: OperationMode,
|
||||
) -> Result<()> {
|
||||
assert!(
|
||||
t_ctx.meta_left_joins.len() == tables.len(),
|
||||
"meta_left_joins length does not match tables length"
|
||||
);
|
||||
// Initialize ephemeral indexes for distinct aggregates
|
||||
for (i, agg) in aggregates
|
||||
.iter_mut()
|
||||
.enumerate()
|
||||
.filter(|(_, agg)| agg.is_distinct())
|
||||
{
|
||||
assert!(
|
||||
agg.args.len() == 1,
|
||||
"DISTINCT aggregate functions must have exactly one argument"
|
||||
);
|
||||
let index_name = format!("distinct_agg_{}_{}", i, agg.args[0]);
|
||||
let index = Arc::new(Index {
|
||||
name: index_name.clone(),
|
||||
table_name: String::new(),
|
||||
ephemeral: true,
|
||||
root_page: 0,
|
||||
columns: vec![IndexColumn {
|
||||
name: agg.args[0].to_string(),
|
||||
order: SortOrder::Asc,
|
||||
pos_in_table: 0,
|
||||
}],
|
||||
unique: false,
|
||||
});
|
||||
let cursor_id = program.alloc_cursor_id(
|
||||
Some(index_name.clone()),
|
||||
CursorType::BTreeIndex(index.clone()),
|
||||
);
|
||||
if group_by.is_none() {
|
||||
// In GROUP BY, the ephemeral index is reinitialized for every group
|
||||
// in the clear accumulator subroutine, so we only do it here if there is no GROUP BY.
|
||||
program.emit_insn(Insn::OpenEphemeral {
|
||||
cursor_id,
|
||||
is_table: false,
|
||||
});
|
||||
}
|
||||
agg.distinctness = AggDistinctness::Distinct {
|
||||
ctx: Some(DistinctAggCtx {
|
||||
cursor_id,
|
||||
ephemeral_index_name: index_name,
|
||||
label_on_conflict: program.allocate_label(),
|
||||
}),
|
||||
};
|
||||
}
|
||||
for (table_index, table) in tables.iter().enumerate() {
|
||||
// Initialize bookkeeping for OUTER JOIN
|
||||
if let Some(join_info) = table.join_info.as_ref() {
|
||||
@@ -721,6 +769,12 @@ fn emit_loop_source<'a>(
|
||||
reg,
|
||||
&t_ctx.resolver,
|
||||
)?;
|
||||
if let AggDistinctness::Distinct { ctx } = &agg.distinctness {
|
||||
let ctx = ctx
|
||||
.as_ref()
|
||||
.expect("distinct aggregate context not populated");
|
||||
program.preassign_label_to_next_insn(ctx.label_on_conflict);
|
||||
}
|
||||
}
|
||||
|
||||
let label_emit_nonagg_only_once = if let Some(flag) = t_ctx.reg_nonagg_emit_once_flag {
|
||||
|
||||
@@ -804,11 +804,53 @@ pub enum Search {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
|
||||
pub enum AggDistinctness {
|
||||
/// The aggregate is not a DISTINCT aggregate.
|
||||
NonDistinct,
|
||||
/// The aggregate is a DISTINCT aggregate.
|
||||
Distinct { ctx: Option<DistinctAggCtx> },
|
||||
}
|
||||
|
||||
impl AggDistinctness {
|
||||
pub fn from_ast(distinctness: Option<&ast::Distinctness>) -> Self {
|
||||
match distinctness {
|
||||
Some(ast::Distinctness::Distinct) => Self::Distinct { ctx: None },
|
||||
Some(ast::Distinctness::All) => Self::NonDistinct,
|
||||
None => Self::NonDistinct,
|
||||
}
|
||||
}
|
||||
pub fn is_distinct(&self) -> bool {
|
||||
matches!(self, AggDistinctness::Distinct { .. })
|
||||
}
|
||||
}
|
||||
|
||||
/// Translation context for handling distinct aggregates.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct DistinctAggCtx {
|
||||
/// The cursor ID for the ephemeral index opened for the distinct aggregate.
|
||||
/// This is used to track the distinct values and avoid duplicates.
|
||||
pub cursor_id: usize,
|
||||
/// The index name for the ephemeral index opened for the distinct aggregate.
|
||||
pub ephemeral_index_name: String,
|
||||
/// The label for the on conflict branch.
|
||||
/// When a duplicate is found, the program will jump to the offset this label points to.
|
||||
pub label_on_conflict: BranchOffset,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct Aggregate {
|
||||
pub func: AggFunc,
|
||||
pub args: Vec<ast::Expr>,
|
||||
pub original_expr: ast::Expr,
|
||||
pub distinctness: AggDistinctness,
|
||||
}
|
||||
|
||||
impl Aggregate {
|
||||
pub fn is_distinct(&self) -> bool {
|
||||
self.distinctness.is_distinct()
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Aggregate {
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use super::{
|
||||
plan::{
|
||||
Aggregate, ColumnUsedMask, EvalAt, IterationDirection, JoinInfo, JoinOrderMember,
|
||||
Operation, Plan, ResultSetColumn, SelectPlan, SelectQueryType, TableReference, WhereTerm,
|
||||
AggDistinctness, Aggregate, ColumnUsedMask, EvalAt, IterationDirection, JoinInfo,
|
||||
JoinOrderMember, Operation, Plan, ResultSetColumn, SelectPlan, SelectQueryType,
|
||||
TableReference, WhereTerm,
|
||||
},
|
||||
select::prepare_select_plan,
|
||||
SymbolTable,
|
||||
@@ -19,15 +20,20 @@ use limbo_sqlite3_parser::ast::{
|
||||
|
||||
pub const ROWID: &str = "rowid";
|
||||
|
||||
pub fn resolve_aggregates(expr: &Expr, aggs: &mut Vec<Aggregate>) -> bool {
|
||||
pub fn resolve_aggregates(expr: &Expr, aggs: &mut Vec<Aggregate>) -> Result<bool> {
|
||||
if aggs
|
||||
.iter()
|
||||
.any(|a| exprs_are_equivalent(&a.original_expr, expr))
|
||||
{
|
||||
return true;
|
||||
return Ok(true);
|
||||
}
|
||||
match expr {
|
||||
Expr::FunctionCall { name, args, .. } => {
|
||||
Expr::FunctionCall {
|
||||
name,
|
||||
args,
|
||||
distinctness,
|
||||
..
|
||||
} => {
|
||||
let args_count = if let Some(args) = &args {
|
||||
args.len()
|
||||
} else {
|
||||
@@ -35,21 +41,29 @@ pub fn resolve_aggregates(expr: &Expr, aggs: &mut Vec<Aggregate>) -> bool {
|
||||
};
|
||||
match Func::resolve_function(normalize_ident(name.0.as_str()).as_str(), args_count) {
|
||||
Ok(Func::Agg(f)) => {
|
||||
let distinctness = AggDistinctness::from_ast(distinctness.as_ref());
|
||||
let num_args = args.as_ref().map_or(0, |args| args.len());
|
||||
if distinctness.is_distinct() && num_args != 1 {
|
||||
crate::bail_parse_error!(
|
||||
"DISTINCT aggregate functions must have exactly one argument"
|
||||
);
|
||||
}
|
||||
aggs.push(Aggregate {
|
||||
func: f,
|
||||
args: args.clone().unwrap_or_default(),
|
||||
original_expr: expr.clone(),
|
||||
distinctness,
|
||||
});
|
||||
true
|
||||
Ok(true)
|
||||
}
|
||||
_ => {
|
||||
let mut contains_aggregates = false;
|
||||
if let Some(args) = args {
|
||||
for arg in args.iter() {
|
||||
contains_aggregates |= resolve_aggregates(arg, aggs);
|
||||
contains_aggregates |= resolve_aggregates(arg, aggs)?;
|
||||
}
|
||||
}
|
||||
contains_aggregates
|
||||
Ok(contains_aggregates)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -61,25 +75,26 @@ pub fn resolve_aggregates(expr: &Expr, aggs: &mut Vec<Aggregate>) -> bool {
|
||||
func: f,
|
||||
args: vec![],
|
||||
original_expr: expr.clone(),
|
||||
distinctness: AggDistinctness::NonDistinct,
|
||||
});
|
||||
true
|
||||
Ok(true)
|
||||
} else {
|
||||
false
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
Expr::Binary(lhs, _, rhs) => {
|
||||
let mut contains_aggregates = false;
|
||||
contains_aggregates |= resolve_aggregates(lhs, aggs);
|
||||
contains_aggregates |= resolve_aggregates(rhs, aggs);
|
||||
contains_aggregates
|
||||
contains_aggregates |= resolve_aggregates(lhs, aggs)?;
|
||||
contains_aggregates |= resolve_aggregates(rhs, aggs)?;
|
||||
Ok(contains_aggregates)
|
||||
}
|
||||
Expr::Unary(_, expr) => {
|
||||
let mut contains_aggregates = false;
|
||||
contains_aggregates |= resolve_aggregates(expr, aggs);
|
||||
contains_aggregates
|
||||
contains_aggregates |= resolve_aggregates(expr, aggs)?;
|
||||
Ok(contains_aggregates)
|
||||
}
|
||||
// TODO: handle other expressions that may contain aggregates
|
||||
_ => false,
|
||||
_ => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use super::emitter::{emit_program, TranslateCtx};
|
||||
use super::plan::{select_star, JoinOrderMember, Operation, Search, SelectQueryType};
|
||||
use super::plan::{
|
||||
select_star, AggDistinctness, JoinOrderMember, Operation, Search, SelectQueryType,
|
||||
};
|
||||
use super::planner::Scope;
|
||||
use crate::function::{AggFunc, ExtFunc, Func};
|
||||
use crate::translate::optimizer::optimize_plan;
|
||||
@@ -159,7 +161,7 @@ pub fn prepare_select_plan<'a>(
|
||||
match expr {
|
||||
ast::Expr::FunctionCall {
|
||||
name,
|
||||
distinctness: _,
|
||||
distinctness,
|
||||
args,
|
||||
filter_over: _,
|
||||
order_by: _,
|
||||
@@ -169,6 +171,10 @@ pub fn prepare_select_plan<'a>(
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let distinctness = AggDistinctness::from_ast(distinctness.as_ref());
|
||||
if distinctness.is_distinct() && args_count != 1 {
|
||||
crate::bail_parse_error!("DISTINCT aggregate functions must have exactly one argument");
|
||||
}
|
||||
match Func::resolve_function(
|
||||
normalize_ident(name.0.as_str()).as_str(),
|
||||
args_count,
|
||||
@@ -192,6 +198,7 @@ pub fn prepare_select_plan<'a>(
|
||||
func: f,
|
||||
args: agg_args.clone(),
|
||||
original_expr: expr.clone(),
|
||||
distinctness,
|
||||
};
|
||||
aggregate_expressions.push(agg.clone());
|
||||
plan.result_columns.push(ResultSetColumn {
|
||||
@@ -205,7 +212,7 @@ pub fn prepare_select_plan<'a>(
|
||||
}
|
||||
Ok(_) => {
|
||||
let contains_aggregates =
|
||||
resolve_aggregates(expr, &mut aggregate_expressions);
|
||||
resolve_aggregates(expr, &mut aggregate_expressions)?;
|
||||
plan.result_columns.push(ResultSetColumn {
|
||||
alias: maybe_alias.as_ref().map(|alias| match alias {
|
||||
ast::As::Elided(alias) => alias.0.clone(),
|
||||
@@ -222,7 +229,7 @@ pub fn prepare_select_plan<'a>(
|
||||
let contains_aggregates = resolve_aggregates(
|
||||
expr,
|
||||
&mut aggregate_expressions,
|
||||
);
|
||||
)?;
|
||||
plan.result_columns.push(ResultSetColumn {
|
||||
alias: maybe_alias.as_ref().map(|alias| {
|
||||
match alias {
|
||||
@@ -240,6 +247,7 @@ pub fn prepare_select_plan<'a>(
|
||||
func: AggFunc::External(f.func.clone().into()),
|
||||
args: args.as_ref().unwrap().clone(),
|
||||
original_expr: expr.clone(),
|
||||
distinctness,
|
||||
};
|
||||
aggregate_expressions.push(agg.clone());
|
||||
plan.result_columns.push(ResultSetColumn {
|
||||
@@ -276,6 +284,7 @@ pub fn prepare_select_plan<'a>(
|
||||
"1".to_string(),
|
||||
))],
|
||||
original_expr: expr.clone(),
|
||||
distinctness: AggDistinctness::NonDistinct,
|
||||
};
|
||||
aggregate_expressions.push(agg.clone());
|
||||
plan.result_columns.push(ResultSetColumn {
|
||||
@@ -295,7 +304,7 @@ pub fn prepare_select_plan<'a>(
|
||||
}
|
||||
expr => {
|
||||
let contains_aggregates =
|
||||
resolve_aggregates(expr, &mut aggregate_expressions);
|
||||
resolve_aggregates(expr, &mut aggregate_expressions)?;
|
||||
plan.result_columns.push(ResultSetColumn {
|
||||
alias: maybe_alias.as_ref().map(|alias| match alias {
|
||||
ast::As::Elided(alias) => alias.0.clone(),
|
||||
@@ -341,7 +350,7 @@ pub fn prepare_select_plan<'a>(
|
||||
Some(&plan.result_columns),
|
||||
)?;
|
||||
let contains_aggregates =
|
||||
resolve_aggregates(expr, &mut aggregate_expressions);
|
||||
resolve_aggregates(expr, &mut aggregate_expressions)?;
|
||||
if !contains_aggregates {
|
||||
// TODO: sqlite allows HAVING clauses with non aggregate expressions like
|
||||
// HAVING id = 5. We should support this too eventually (I guess).
|
||||
@@ -376,7 +385,7 @@ pub fn prepare_select_plan<'a>(
|
||||
&mut plan.table_references,
|
||||
Some(&plan.result_columns),
|
||||
)?;
|
||||
resolve_aggregates(&o.expr, &mut plan.aggregates);
|
||||
resolve_aggregates(&o.expr, &mut plan.aggregates)?;
|
||||
|
||||
key.push((o.expr, o.order.unwrap_or(ast::SortOrder::Asc)));
|
||||
}
|
||||
|
||||
@@ -575,6 +575,12 @@ impl ProgramBuilder {
|
||||
Insn::NoConflict { target_pc, .. } => {
|
||||
resolve(target_pc, "NoConflict");
|
||||
}
|
||||
Insn::Found { target_pc, .. } => {
|
||||
resolve(target_pc, "Found");
|
||||
}
|
||||
Insn::NotFound { target_pc, .. } => {
|
||||
resolve(target_pc, "NotFound");
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4784,23 +4784,31 @@ pub fn op_once(
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_not_found(
|
||||
pub fn op_found(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
insn: &Insn,
|
||||
pager: &Rc<Pager>,
|
||||
mv_store: Option<&Rc<MvStore>>,
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
let Insn::NotFound {
|
||||
cursor_id,
|
||||
target_pc,
|
||||
record_reg,
|
||||
num_regs,
|
||||
} = insn
|
||||
else {
|
||||
unreachable!("unexpected Insn {:?}", insn)
|
||||
let (cursor_id, target_pc, record_reg, num_regs) = match insn {
|
||||
Insn::NotFound {
|
||||
cursor_id,
|
||||
target_pc,
|
||||
record_reg,
|
||||
num_regs,
|
||||
} => (cursor_id, target_pc, record_reg, num_regs),
|
||||
Insn::Found {
|
||||
cursor_id,
|
||||
target_pc,
|
||||
record_reg,
|
||||
num_regs,
|
||||
} => (cursor_id, target_pc, record_reg, num_regs),
|
||||
_ => unreachable!("unexpected Insn {:?}", insn),
|
||||
};
|
||||
|
||||
let not = matches!(insn, Insn::NotFound { .. });
|
||||
|
||||
let found = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
@@ -4822,10 +4830,11 @@ pub fn op_not_found(
|
||||
}
|
||||
};
|
||||
|
||||
if found {
|
||||
state.pc += 1;
|
||||
} else {
|
||||
let do_jump = (!found && not) || (found && !not);
|
||||
if do_jump {
|
||||
state.pc = target_pc.to_offset_int();
|
||||
} else {
|
||||
state.pc += 1;
|
||||
}
|
||||
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
|
||||
@@ -1434,16 +1434,30 @@ pub fn insn_to_str(
|
||||
target_pc,
|
||||
record_reg,
|
||||
..
|
||||
}
|
||||
| Insn::Found {
|
||||
cursor_id,
|
||||
target_pc,
|
||||
record_reg,
|
||||
..
|
||||
} => (
|
||||
"NotFound",
|
||||
if matches!(insn, Insn::NotFound { .. }) {
|
||||
"NotFound"
|
||||
} else {
|
||||
"Found"
|
||||
},
|
||||
*cursor_id as i32,
|
||||
target_pc.to_debug_int(),
|
||||
*record_reg as i32,
|
||||
Value::build_text(""),
|
||||
0,
|
||||
format!(
|
||||
"if (r[{}] != NULL) goto {}",
|
||||
record_reg,
|
||||
"if {}found goto {}",
|
||||
if matches!(insn, Insn::NotFound { .. }) {
|
||||
"not "
|
||||
} else {
|
||||
""
|
||||
},
|
||||
target_pc.to_debug_int()
|
||||
),
|
||||
),
|
||||
|
||||
@@ -858,6 +858,15 @@ pub enum Insn {
|
||||
Once {
|
||||
target_pc_when_reentered: BranchOffset,
|
||||
},
|
||||
/// Search for a record in the index cursor.
|
||||
/// If any entry for which the key is a prefix exists, jump to target_pc.
|
||||
/// Otherwise, continue to the next instruction.
|
||||
Found {
|
||||
cursor_id: CursorID,
|
||||
target_pc: BranchOffset,
|
||||
record_reg: usize,
|
||||
num_regs: usize,
|
||||
},
|
||||
/// Search for record in the index cusor, if any entry for which the key is a prefix exists
|
||||
/// is a no-op, otherwise go to target_pc
|
||||
/// Example =>
|
||||
@@ -1004,7 +1013,7 @@ impl Insn {
|
||||
Insn::ReadCookie { .. } => execute::op_read_cookie,
|
||||
Insn::OpenEphemeral { .. } | Insn::OpenAutoindex { .. } => execute::op_open_ephemeral,
|
||||
Insn::Once { .. } => execute::op_once,
|
||||
Insn::NotFound { .. } => execute::op_not_found,
|
||||
Insn::Found { .. } | Insn::NotFound { .. } => execute::op_found,
|
||||
Insn::Affinity { .. } => execute::op_affinity,
|
||||
Insn::IdxDelete { .. } => execute::op_idx_delete,
|
||||
Insn::Count { .. } => execute::op_count,
|
||||
|
||||
Reference in New Issue
Block a user