diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 06c18a0cd..bf7844e3d 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -5441,9 +5441,12 @@ fn debug_validate_cells_core(page: &PageContent, usable_space: u16) { usable_space as usize, ); let buf = &page.as_ptr()[offset..offset + size]; + // E.g. the following table btree cell may just have two bytes: + // Payload size 0 (stored as SerialTypeKind::ConstInt0) + // Rowid 1 (stored as SerialTypeKind::ConstInt1) assert!( - size >= 4, - "cell size should be at least 4 bytes idx={}, cell={:?}, offset={}", + size >= 2, + "cell size should be at least 2 bytes idx={}, cell={:?}, offset={}", i, buf, offset diff --git a/core/translate/aggregation.rs b/core/translate/aggregation.rs index 4ac82bf7c..3e5579d1d 100644 --- a/core/translate/aggregation.rs +++ b/core/translate/aggregation.rs @@ -2,14 +2,17 @@ use limbo_sqlite3_parser::ast; use crate::{ function::AggFunc, - vdbe::{builder::ProgramBuilder, insn::Insn}, + vdbe::{ + builder::ProgramBuilder, + insn::{IdxInsertFlags, Insn}, + }, LimboError, Result, }; use super::{ emitter::{Resolver, TranslateCtx}, expr::translate_expr, - plan::{Aggregate, SelectPlan, TableReference}, + plan::{AggDistinctness, Aggregate, SelectPlan, TableReference}, result_row::emit_select_result, }; @@ -57,6 +60,39 @@ pub fn emit_ungrouped_aggregation<'a>( Ok(()) } +/// Emits the bytecode for handling duplicates in a distinct aggregate. +/// This is used in both GROUP BY and non-GROUP BY aggregations to jump over +/// the AggStep that would otherwise accumulate the same value multiple times. +pub fn handle_distinct(program: &mut ProgramBuilder, agg: &Aggregate, agg_arg_reg: usize) { + let AggDistinctness::Distinct { ctx } = &agg.distinctness else { + return; + }; + let distinct_agg_ctx = ctx + .as_ref() + .expect("distinct aggregate context not populated"); + let num_regs = 1; + program.emit_insn(Insn::Found { + cursor_id: distinct_agg_ctx.cursor_id, + target_pc: distinct_agg_ctx.label_on_conflict, + record_reg: agg_arg_reg, + num_regs, + }); + let record_reg = program.alloc_register(); + program.emit_insn(Insn::MakeRecord { + start_reg: agg_arg_reg, + count: num_regs, + dest_reg: record_reg, + index_name: Some(distinct_agg_ctx.ephemeral_index_name.to_string()), + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: distinct_agg_ctx.cursor_id, + record_reg: record_reg, + unpacked_start: None, + unpacked_count: None, + flags: IdxInsertFlags::new(), + }); +} + /// Emits the bytecode for processing an aggregate step. /// E.g. in `SELECT SUM(price) FROM t`, 'price' is evaluated for every row, and the result is added to the accumulator. /// @@ -77,6 +113,7 @@ pub fn translate_aggregation_step( let expr = &agg.args[0]; let expr_reg = program.alloc_register(); let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; + handle_distinct(program, agg, expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -94,6 +131,7 @@ pub fn translate_aggregation_step( let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; expr_reg }; + handle_distinct(program, agg, expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -132,6 +170,7 @@ pub fn translate_aggregation_step( } translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; + handle_distinct(program, agg, expr_reg); translate_expr( program, Some(referenced_tables), @@ -156,6 +195,7 @@ pub fn translate_aggregation_step( let expr = &agg.args[0]; let expr_reg = program.alloc_register(); let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; + handle_distinct(program, agg, expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -171,6 +211,7 @@ pub fn translate_aggregation_step( let expr = &agg.args[0]; let expr_reg = program.alloc_register(); let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; + handle_distinct(program, agg, expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -190,6 +231,7 @@ pub fn translate_aggregation_step( let value_reg = program.alloc_register(); let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; + handle_distinct(program, agg, expr_reg); let _ = translate_expr( program, Some(referenced_tables), @@ -214,6 +256,7 @@ pub fn translate_aggregation_step( let expr = &agg.args[0]; let expr_reg = program.alloc_register(); let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; + handle_distinct(program, agg, expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -264,6 +307,7 @@ pub fn translate_aggregation_step( let expr = &agg.args[0]; let expr_reg = program.alloc_register(); let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; + handle_distinct(program, agg, expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -279,6 +323,7 @@ pub fn translate_aggregation_step( let expr = &agg.args[0]; let expr_reg = program.alloc_register(); let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; + handle_distinct(program, agg, expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -310,6 +355,10 @@ pub fn translate_aggregation_step( expr_reg + i, resolver, )?; + // invariant: distinct aggregates are only supported for single-argument functions + if argc == 1 { + handle_distinct(program, agg, expr_reg + i); + } } program.emit_insn(Insn::AggStep { acc_reg: target_register, diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 38a2410bb..0ee730c91 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -285,6 +285,8 @@ pub fn emit_query<'a>( program, t_ctx, &plan.table_references, + &mut plan.aggregates, + plan.group_by.as_ref(), OperationMode::SELECT, )?; @@ -394,6 +396,8 @@ fn emit_program_for_delete( program, &mut t_ctx, &plan.table_references, + &mut [], + None, OperationMode::DELETE, )?; @@ -586,6 +590,8 @@ fn emit_program_for_update( program, &mut t_ctx, &plan.table_references, + &mut [], + None, OperationMode::UPDATE, )?; // Open indexes for update. diff --git a/core/translate/group_by.rs b/core/translate/group_by.rs index 25afebd8e..6f099c898 100644 --- a/core/translate/group_by.rs +++ b/core/translate/group_by.rs @@ -16,10 +16,11 @@ use crate::{ }; use super::{ + aggregation::handle_distinct, emitter::{Resolver, TranslateCtx}, expr::{translate_condition_expr, translate_expr, ConditionMetadata}, order_by::order_by_sorter_insert, - plan::{Aggregate, GroupBy, SelectPlan, TableReference}, + plan::{AggDistinctness, Aggregate, GroupBy, SelectPlan, TableReference}, result_row::emit_select_result, }; @@ -397,6 +398,14 @@ impl<'a> GroupByAggArgumentSource<'a> { aggregate, } } + + pub fn aggregate(&self) -> &Aggregate { + match self { + GroupByAggArgumentSource::PseudoCursor { aggregate, .. } => aggregate, + GroupByAggArgumentSource::Register { aggregate, .. } => aggregate, + } + } + pub fn agg_func(&self) -> &AggFunc { match self { GroupByAggArgumentSource::PseudoCursor { aggregate, .. } => &aggregate.func, @@ -567,6 +576,12 @@ pub fn group_by_process_single_group( agg_result_reg, &t_ctx.resolver, )?; + if let AggDistinctness::Distinct { ctx } = &agg.distinctness { + let ctx = ctx + .as_ref() + .expect("distinct aggregate context not populated"); + program.preassign_label_to_next_insn(ctx.label_on_conflict); + } offset += agg.args.len(); } @@ -905,6 +920,26 @@ pub fn group_by_emit_row_phase<'a>( dest_end: Some(start_reg + plan.group_by_sorter_column_count() - 1), }); + // Reopen ephemeral indexes for distinct aggregates (effectively clearing them). + plan.aggregates + .iter() + .filter_map(|agg| { + if let AggDistinctness::Distinct { ctx } = &agg.distinctness { + Some(ctx) + } else { + None + } + }) + .for_each(|ctx| { + let ctx = ctx + .as_ref() + .expect("distinct aggregate context not populated"); + program.emit_insn(Insn::OpenEphemeral { + cursor_id: ctx.cursor_id, + is_table: false, + }); + }); + program.emit_insn(Insn::Integer { value: 0, dest: registers.reg_data_in_acc_flag, @@ -936,6 +971,7 @@ pub fn translate_aggregation_step_groupby( crate::bail_parse_error!("avg bad number of arguments"); } let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -946,6 +982,7 @@ pub fn translate_aggregation_step_groupby( } AggFunc::Count | AggFunc::Count0 => { let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -983,6 +1020,7 @@ pub fn translate_aggregation_step_groupby( } let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); translate_expr( program, Some(referenced_tables), @@ -1005,6 +1043,7 @@ pub fn translate_aggregation_step_groupby( crate::bail_parse_error!("max bad number of arguments"); } let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -1018,6 +1057,7 @@ pub fn translate_aggregation_step_groupby( crate::bail_parse_error!("min bad number of arguments"); } let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -1032,6 +1072,7 @@ pub fn translate_aggregation_step_groupby( crate::bail_parse_error!("min bad number of arguments"); } let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -1047,6 +1088,7 @@ pub fn translate_aggregation_step_groupby( } let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); let value_reg = agg_arg_source.translate(program, 1)?; program.emit_insn(Insn::AggStep { @@ -1073,6 +1115,7 @@ pub fn translate_aggregation_step_groupby( }; let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); translate_expr( program, Some(referenced_tables), @@ -1095,6 +1138,7 @@ pub fn translate_aggregation_step_groupby( crate::bail_parse_error!("sum bad number of arguments"); } let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -1108,6 +1152,7 @@ pub fn translate_aggregation_step_groupby( crate::bail_parse_error!("total bad number of arguments"); } let expr_reg = agg_arg_source.translate(program, 0)?; + handle_distinct(program, agg_arg_source.aggregate(), expr_reg); program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index 59168509e..574af98c0 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -1,14 +1,17 @@ use limbo_ext::VTabKind; -use limbo_sqlite3_parser::ast; +use limbo_sqlite3_parser::ast::{self, SortOrder}; use std::sync::Arc; use crate::{ - schema::{Index, Table}, - translate::result_row::emit_select_result, + schema::{Index, IndexColumn, Table}, + translate::{ + plan::{AggDistinctness, DistinctAggCtx}, + result_row::emit_select_result, + }, types::SeekOp, vdbe::{ - builder::ProgramBuilder, + builder::{CursorType, ProgramBuilder}, insn::{CmpInsFlags, IdxInsertFlags, Insn}, BranchOffset, CursorID, }, @@ -26,8 +29,8 @@ use super::{ optimizer::Optimizable, order_by::{order_by_sorter_insert, sorter_insert}, plan::{ - convert_where_to_vtab_constraint, IterationDirection, JoinOrderMember, Operation, Search, - SeekDef, SelectPlan, SelectQueryType, TableReference, WhereTerm, + convert_where_to_vtab_constraint, Aggregate, GroupBy, IterationDirection, JoinOrderMember, + Operation, Search, SeekDef, SelectPlan, SelectQueryType, TableReference, WhereTerm, }, }; @@ -68,12 +71,57 @@ pub fn init_loop( program: &mut ProgramBuilder, t_ctx: &mut TranslateCtx, tables: &[TableReference], + aggregates: &mut [Aggregate], + group_by: Option<&GroupBy>, mode: OperationMode, ) -> Result<()> { assert!( t_ctx.meta_left_joins.len() == tables.len(), "meta_left_joins length does not match tables length" ); + // Initialize ephemeral indexes for distinct aggregates + for (i, agg) in aggregates + .iter_mut() + .enumerate() + .filter(|(_, agg)| agg.is_distinct()) + { + assert!( + agg.args.len() == 1, + "DISTINCT aggregate functions must have exactly one argument" + ); + let index_name = format!("distinct_agg_{}_{}", i, agg.args[0]); + let index = Arc::new(Index { + name: index_name.clone(), + table_name: String::new(), + ephemeral: true, + root_page: 0, + columns: vec![IndexColumn { + name: agg.args[0].to_string(), + order: SortOrder::Asc, + pos_in_table: 0, + }], + unique: false, + }); + let cursor_id = program.alloc_cursor_id( + Some(index_name.clone()), + CursorType::BTreeIndex(index.clone()), + ); + if group_by.is_none() { + // In GROUP BY, the ephemeral index is reinitialized for every group + // in the clear accumulator subroutine, so we only do it here if there is no GROUP BY. + program.emit_insn(Insn::OpenEphemeral { + cursor_id, + is_table: false, + }); + } + agg.distinctness = AggDistinctness::Distinct { + ctx: Some(DistinctAggCtx { + cursor_id, + ephemeral_index_name: index_name, + label_on_conflict: program.allocate_label(), + }), + }; + } for (table_index, table) in tables.iter().enumerate() { // Initialize bookkeeping for OUTER JOIN if let Some(join_info) = table.join_info.as_ref() { @@ -721,6 +769,12 @@ fn emit_loop_source<'a>( reg, &t_ctx.resolver, )?; + if let AggDistinctness::Distinct { ctx } = &agg.distinctness { + let ctx = ctx + .as_ref() + .expect("distinct aggregate context not populated"); + program.preassign_label_to_next_insn(ctx.label_on_conflict); + } } let label_emit_nonagg_only_once = if let Some(flag) = t_ctx.reg_nonagg_emit_once_flag { diff --git a/core/translate/plan.rs b/core/translate/plan.rs index 48c768f1d..d54b219c4 100644 --- a/core/translate/plan.rs +++ b/core/translate/plan.rs @@ -804,11 +804,53 @@ pub enum Search { }, } +#[derive(Debug, Clone, PartialEq)] + +pub enum AggDistinctness { + /// The aggregate is not a DISTINCT aggregate. + NonDistinct, + /// The aggregate is a DISTINCT aggregate. + Distinct { ctx: Option }, +} + +impl AggDistinctness { + pub fn from_ast(distinctness: Option<&ast::Distinctness>) -> Self { + match distinctness { + Some(ast::Distinctness::Distinct) => Self::Distinct { ctx: None }, + Some(ast::Distinctness::All) => Self::NonDistinct, + None => Self::NonDistinct, + } + } + pub fn is_distinct(&self) -> bool { + matches!(self, AggDistinctness::Distinct { .. }) + } +} + +/// Translation context for handling distinct aggregates. +#[derive(Debug, Clone, PartialEq)] +pub struct DistinctAggCtx { + /// The cursor ID for the ephemeral index opened for the distinct aggregate. + /// This is used to track the distinct values and avoid duplicates. + pub cursor_id: usize, + /// The index name for the ephemeral index opened for the distinct aggregate. + pub ephemeral_index_name: String, + /// The label for the on conflict branch. + /// When a duplicate is found, the program will jump to the offset this label points to. + pub label_on_conflict: BranchOffset, +} + #[derive(Clone, Debug, PartialEq)] pub struct Aggregate { pub func: AggFunc, pub args: Vec, pub original_expr: ast::Expr, + pub distinctness: AggDistinctness, +} + +impl Aggregate { + pub fn is_distinct(&self) -> bool { + self.distinctness.is_distinct() + } } impl Display for Aggregate { diff --git a/core/translate/planner.rs b/core/translate/planner.rs index 1ed38bdb4..2685c5301 100644 --- a/core/translate/planner.rs +++ b/core/translate/planner.rs @@ -1,7 +1,8 @@ use super::{ plan::{ - Aggregate, ColumnUsedMask, EvalAt, IterationDirection, JoinInfo, JoinOrderMember, - Operation, Plan, ResultSetColumn, SelectPlan, SelectQueryType, TableReference, WhereTerm, + AggDistinctness, Aggregate, ColumnUsedMask, EvalAt, IterationDirection, JoinInfo, + JoinOrderMember, Operation, Plan, ResultSetColumn, SelectPlan, SelectQueryType, + TableReference, WhereTerm, }, select::prepare_select_plan, SymbolTable, @@ -19,15 +20,20 @@ use limbo_sqlite3_parser::ast::{ pub const ROWID: &str = "rowid"; -pub fn resolve_aggregates(expr: &Expr, aggs: &mut Vec) -> bool { +pub fn resolve_aggregates(expr: &Expr, aggs: &mut Vec) -> Result { if aggs .iter() .any(|a| exprs_are_equivalent(&a.original_expr, expr)) { - return true; + return Ok(true); } match expr { - Expr::FunctionCall { name, args, .. } => { + Expr::FunctionCall { + name, + args, + distinctness, + .. + } => { let args_count = if let Some(args) = &args { args.len() } else { @@ -35,21 +41,29 @@ pub fn resolve_aggregates(expr: &Expr, aggs: &mut Vec) -> bool { }; match Func::resolve_function(normalize_ident(name.0.as_str()).as_str(), args_count) { Ok(Func::Agg(f)) => { + let distinctness = AggDistinctness::from_ast(distinctness.as_ref()); + let num_args = args.as_ref().map_or(0, |args| args.len()); + if distinctness.is_distinct() && num_args != 1 { + crate::bail_parse_error!( + "DISTINCT aggregate functions must have exactly one argument" + ); + } aggs.push(Aggregate { func: f, args: args.clone().unwrap_or_default(), original_expr: expr.clone(), + distinctness, }); - true + Ok(true) } _ => { let mut contains_aggregates = false; if let Some(args) = args { for arg in args.iter() { - contains_aggregates |= resolve_aggregates(arg, aggs); + contains_aggregates |= resolve_aggregates(arg, aggs)?; } } - contains_aggregates + Ok(contains_aggregates) } } } @@ -61,25 +75,26 @@ pub fn resolve_aggregates(expr: &Expr, aggs: &mut Vec) -> bool { func: f, args: vec![], original_expr: expr.clone(), + distinctness: AggDistinctness::NonDistinct, }); - true + Ok(true) } else { - false + Ok(false) } } Expr::Binary(lhs, _, rhs) => { let mut contains_aggregates = false; - contains_aggregates |= resolve_aggregates(lhs, aggs); - contains_aggregates |= resolve_aggregates(rhs, aggs); - contains_aggregates + contains_aggregates |= resolve_aggregates(lhs, aggs)?; + contains_aggregates |= resolve_aggregates(rhs, aggs)?; + Ok(contains_aggregates) } Expr::Unary(_, expr) => { let mut contains_aggregates = false; - contains_aggregates |= resolve_aggregates(expr, aggs); - contains_aggregates + contains_aggregates |= resolve_aggregates(expr, aggs)?; + Ok(contains_aggregates) } // TODO: handle other expressions that may contain aggregates - _ => false, + _ => Ok(false), } } diff --git a/core/translate/select.rs b/core/translate/select.rs index d6766a20f..f98e6e2a5 100644 --- a/core/translate/select.rs +++ b/core/translate/select.rs @@ -1,5 +1,7 @@ use super::emitter::{emit_program, TranslateCtx}; -use super::plan::{select_star, JoinOrderMember, Operation, Search, SelectQueryType}; +use super::plan::{ + select_star, AggDistinctness, JoinOrderMember, Operation, Search, SelectQueryType, +}; use super::planner::Scope; use crate::function::{AggFunc, ExtFunc, Func}; use crate::translate::optimizer::optimize_plan; @@ -159,7 +161,7 @@ pub fn prepare_select_plan<'a>( match expr { ast::Expr::FunctionCall { name, - distinctness: _, + distinctness, args, filter_over: _, order_by: _, @@ -169,6 +171,10 @@ pub fn prepare_select_plan<'a>( } else { 0 }; + let distinctness = AggDistinctness::from_ast(distinctness.as_ref()); + if distinctness.is_distinct() && args_count != 1 { + crate::bail_parse_error!("DISTINCT aggregate functions must have exactly one argument"); + } match Func::resolve_function( normalize_ident(name.0.as_str()).as_str(), args_count, @@ -192,6 +198,7 @@ pub fn prepare_select_plan<'a>( func: f, args: agg_args.clone(), original_expr: expr.clone(), + distinctness, }; aggregate_expressions.push(agg.clone()); plan.result_columns.push(ResultSetColumn { @@ -205,7 +212,7 @@ pub fn prepare_select_plan<'a>( } Ok(_) => { let contains_aggregates = - resolve_aggregates(expr, &mut aggregate_expressions); + resolve_aggregates(expr, &mut aggregate_expressions)?; plan.result_columns.push(ResultSetColumn { alias: maybe_alias.as_ref().map(|alias| match alias { ast::As::Elided(alias) => alias.0.clone(), @@ -222,7 +229,7 @@ pub fn prepare_select_plan<'a>( let contains_aggregates = resolve_aggregates( expr, &mut aggregate_expressions, - ); + )?; plan.result_columns.push(ResultSetColumn { alias: maybe_alias.as_ref().map(|alias| { match alias { @@ -240,6 +247,7 @@ pub fn prepare_select_plan<'a>( func: AggFunc::External(f.func.clone().into()), args: args.as_ref().unwrap().clone(), original_expr: expr.clone(), + distinctness, }; aggregate_expressions.push(agg.clone()); plan.result_columns.push(ResultSetColumn { @@ -276,6 +284,7 @@ pub fn prepare_select_plan<'a>( "1".to_string(), ))], original_expr: expr.clone(), + distinctness: AggDistinctness::NonDistinct, }; aggregate_expressions.push(agg.clone()); plan.result_columns.push(ResultSetColumn { @@ -295,7 +304,7 @@ pub fn prepare_select_plan<'a>( } expr => { let contains_aggregates = - resolve_aggregates(expr, &mut aggregate_expressions); + resolve_aggregates(expr, &mut aggregate_expressions)?; plan.result_columns.push(ResultSetColumn { alias: maybe_alias.as_ref().map(|alias| match alias { ast::As::Elided(alias) => alias.0.clone(), @@ -341,7 +350,7 @@ pub fn prepare_select_plan<'a>( Some(&plan.result_columns), )?; let contains_aggregates = - resolve_aggregates(expr, &mut aggregate_expressions); + resolve_aggregates(expr, &mut aggregate_expressions)?; if !contains_aggregates { // TODO: sqlite allows HAVING clauses with non aggregate expressions like // HAVING id = 5. We should support this too eventually (I guess). @@ -376,7 +385,7 @@ pub fn prepare_select_plan<'a>( &mut plan.table_references, Some(&plan.result_columns), )?; - resolve_aggregates(&o.expr, &mut plan.aggregates); + resolve_aggregates(&o.expr, &mut plan.aggregates)?; key.push((o.expr, o.order.unwrap_or(ast::SortOrder::Asc))); } diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index 0d8dea8d0..a64aa1f85 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -575,6 +575,12 @@ impl ProgramBuilder { Insn::NoConflict { target_pc, .. } => { resolve(target_pc, "NoConflict"); } + Insn::Found { target_pc, .. } => { + resolve(target_pc, "Found"); + } + Insn::NotFound { target_pc, .. } => { + resolve(target_pc, "NotFound"); + } _ => {} } } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index c3e7f328c..22f0b997d 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -4784,23 +4784,31 @@ pub fn op_once( Ok(InsnFunctionStepResult::Step) } -pub fn op_not_found( +pub fn op_found( program: &Program, state: &mut ProgramState, insn: &Insn, pager: &Rc, mv_store: Option<&Rc>, ) -> Result { - let Insn::NotFound { - cursor_id, - target_pc, - record_reg, - num_regs, - } = insn - else { - unreachable!("unexpected Insn {:?}", insn) + let (cursor_id, target_pc, record_reg, num_regs) = match insn { + Insn::NotFound { + cursor_id, + target_pc, + record_reg, + num_regs, + } => (cursor_id, target_pc, record_reg, num_regs), + Insn::Found { + cursor_id, + target_pc, + record_reg, + num_regs, + } => (cursor_id, target_pc, record_reg, num_regs), + _ => unreachable!("unexpected Insn {:?}", insn), }; + let not = matches!(insn, Insn::NotFound { .. }); + let found = { let mut cursor = state.get_cursor(*cursor_id); let cursor = cursor.as_btree_mut(); @@ -4822,10 +4830,11 @@ pub fn op_not_found( } }; - if found { - state.pc += 1; - } else { + let do_jump = (!found && not) || (found && !not); + if do_jump { state.pc = target_pc.to_offset_int(); + } else { + state.pc += 1; } Ok(InsnFunctionStepResult::Step) diff --git a/core/vdbe/explain.rs b/core/vdbe/explain.rs index e899e5a9b..4de07b2e5 100644 --- a/core/vdbe/explain.rs +++ b/core/vdbe/explain.rs @@ -1434,16 +1434,30 @@ pub fn insn_to_str( target_pc, record_reg, .. + } + | Insn::Found { + cursor_id, + target_pc, + record_reg, + .. } => ( - "NotFound", + if matches!(insn, Insn::NotFound { .. }) { + "NotFound" + } else { + "Found" + }, *cursor_id as i32, target_pc.to_debug_int(), *record_reg as i32, Value::build_text(""), 0, format!( - "if (r[{}] != NULL) goto {}", - record_reg, + "if {}found goto {}", + if matches!(insn, Insn::NotFound { .. }) { + "not " + } else { + "" + }, target_pc.to_debug_int() ), ), diff --git a/core/vdbe/insn.rs b/core/vdbe/insn.rs index 7cac9f202..7063ac050 100644 --- a/core/vdbe/insn.rs +++ b/core/vdbe/insn.rs @@ -858,6 +858,15 @@ pub enum Insn { Once { target_pc_when_reentered: BranchOffset, }, + /// Search for a record in the index cursor. + /// If any entry for which the key is a prefix exists, jump to target_pc. + /// Otherwise, continue to the next instruction. + Found { + cursor_id: CursorID, + target_pc: BranchOffset, + record_reg: usize, + num_regs: usize, + }, /// Search for record in the index cusor, if any entry for which the key is a prefix exists /// is a no-op, otherwise go to target_pc /// Example => @@ -1004,7 +1013,7 @@ impl Insn { Insn::ReadCookie { .. } => execute::op_read_cookie, Insn::OpenEphemeral { .. } | Insn::OpenAutoindex { .. } => execute::op_open_ephemeral, Insn::Once { .. } => execute::op_once, - Insn::NotFound { .. } => execute::op_not_found, + Insn::Found { .. } | Insn::NotFound { .. } => execute::op_found, Insn::Affinity { .. } => execute::op_affinity, Insn::IdxDelete { .. } => execute::op_idx_delete, Insn::Count { .. } => execute::op_count, diff --git a/testing/agg-functions.test b/testing/agg-functions.test index f1a85dde5..c9169906b 100755 --- a/testing/agg-functions.test +++ b/testing/agg-functions.test @@ -127,3 +127,7 @@ do_execsql_test select-agg-json-array { do_execsql_test select-agg-json-array-object { SELECT json_group_array(json_object('name', name)) FROM products; } {[{"name":"hat"},{"name":"cap"},{"name":"shirt"},{"name":"sweater"},{"name":"sweatshirt"},{"name":"shorts"},{"name":"jeans"},{"name":"sneakers"},{"name":"boots"},{"name":"coat"},{"name":"accessories"}]} + +do_execsql_test select-distinct-agg-functions { + SELECT sum(distinct age), count(distinct age), avg(distinct age) FROM users; +} {5050|100|50.5} \ No newline at end of file diff --git a/testing/groupby.test b/testing/groupby.test index 70141be0a..1012ed658 100644 --- a/testing/groupby.test +++ b/testing/groupby.test @@ -197,4 +197,13 @@ do_execsql_test group_by_no_sorting_required { select age, count(1) from users group by age limit 3; } {1|112 2|113 -3|97} \ No newline at end of file +3|97} + +do_execsql_test distinct_agg_functions { + select first_name, sum(distinct age), count(distinct age), avg(distinct age) + from users + group by 1 + limit 3; +} {Aaron|1769|33|53.6060606060606 +Abigail|833|15|55.5333333333333 +Adam|1517|30|50.5666666666667} \ No newline at end of file