From 37097e01ae993a5193bc3e9f90f735df8ae0c1fa Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Sat, 3 May 2025 12:47:32 +0300 Subject: [PATCH] GROUP BY: refactor logic to support cases where no sorting is needed --- core/translate/aggregation.rs | 13 +- core/translate/emitter.rs | 15 +- core/translate/group_by.rs | 870 ++++++++++++++++++++++++---------- core/translate/main_loop.rs | 109 +++-- core/translate/order_by.rs | 36 +- core/translate/plan.rs | 37 ++ core/translate/result_row.rs | 49 +- core/translate/select.rs | 3 +- 8 files changed, 785 insertions(+), 347 deletions(-) diff --git a/core/translate/aggregation.rs b/core/translate/aggregation.rs index 7e61d1305..4ac82bf7c 100644 --- a/core/translate/aggregation.rs +++ b/core/translate/aggregation.rs @@ -41,7 +41,18 @@ pub fn emit_ungrouped_aggregation<'a>( // This always emits a ResultRow because currently it can only be used for a single row result // Limit is None because we early exit on limit 0 and the max rows here is 1 - emit_select_result(program, t_ctx, plan, None, None)?; + emit_select_result( + program, + &t_ctx.resolver, + plan, + None, + None, + t_ctx.reg_nonagg_emit_once_flag, + t_ctx.reg_offset, + t_ctx.reg_result_cols_start.unwrap(), + t_ctx.reg_limit, + t_ctx.reg_limit_offset_sum, + )?; Ok(()) } diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index fe6567fe9..b74b20d78 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -17,7 +17,9 @@ use crate::{Result, SymbolTable}; use super::aggregation::emit_ungrouped_aggregation; use super::expr::{translate_condition_expr, translate_expr, ConditionMetadata}; -use super::group_by::{emit_group_by, init_group_by, GroupByMetadata}; +use super::group_by::{ + group_by_agg_phase, group_by_emit_row_phase, init_group_by, GroupByMetadata, GroupByRowSource, +}; use super::main_loop::{close_loop, emit_loop, init_loop, open_loop, LeftJoinMetadata, LoopLabels}; use super::order_by::{emit_order_by, init_order_by, SortMetadata}; use super::plan::{JoinOrderMember, Operation, SelectPlan, TableReference, UpdatePlan}; @@ -324,9 +326,18 @@ pub fn emit_query<'a>( let mut order_by_necessary = plan.order_by.is_some() && !plan.contains_constant_false_condition; let order_by = plan.order_by.as_ref(); + // Handle GROUP BY and aggregation processing if plan.group_by.is_some() { - emit_group_by(program, t_ctx, plan)?; + let row_source = &t_ctx + .meta_group_by + .as_ref() + .expect("group by metadata not found") + .row_source; + if matches!(row_source, GroupByRowSource::Sorter { .. }) { + group_by_agg_phase(program, t_ctx, plan)?; + } + group_by_emit_row_phase(program, t_ctx, plan)?; } else if !plan.aggregates.is_empty() { // Handle aggregation without GROUP BY emit_ungrouped_aggregation(program, t_ctx, plan)?; diff --git a/core/translate/group_by.rs b/core/translate/group_by.rs index 70b33dee1..0034bc172 100644 --- a/core/translate/group_by.rs +++ b/core/translate/group_by.rs @@ -1,6 +1,6 @@ use std::rc::Rc; -use limbo_sqlite3_parser::ast::{self, SortOrder}; +use limbo_sqlite3_parser::ast; use crate::{ function::AggFunc, @@ -22,30 +22,61 @@ use super::{ result_row::emit_select_result, }; +/// Labels needed for various jumps in GROUP BY handling. 
+#[derive(Debug)]
+pub struct GroupByLabels {
+    /// Label for the subroutine that clears the accumulator registers (temporary storage for per-group aggregate calculations)
+    pub label_subrtn_acc_clear: BranchOffset,
+    /// Label for the subroutine that outputs the current group's data
+    pub label_subrtn_acc_output: BranchOffset,
+    /// Label for the instruction that sets the accumulator indicator to true (indicating data exists in the accumulator for the current group)
+    pub label_acc_indicator_set_flag_true: BranchOffset,
+    /// Label for the instruction that jumps to the end of the grouping process without emitting a row
+    pub label_group_by_end_without_emitting_row: BranchOffset,
+    /// Label for the instruction that finalizes the aggregates and outputs the row for a finished group
+    pub label_agg_final: BranchOffset,
+    /// Label for the instruction that jumps to the end of the grouping process
+    pub label_group_by_end: BranchOffset,
+    /// Label for the instruction that jumps to the start of the loop that processes sorted data for GROUP BY.
+    /// Not relevant for cases where the data is already sorted.
+    pub label_sort_loop_start: BranchOffset,
+    /// Label for the instruction that jumps to the end of the loop that processes sorted data for GROUP BY.
+    /// Not relevant for cases where the data is already sorted.
+    pub label_sort_loop_end: BranchOffset,
+    /// Label for the instruction that jumps to the start of the aggregation step
+    pub label_grouping_agg_step: BranchOffset,
+}
+
+/// Registers allocated for GROUP BY operations.
+#[derive(Debug)]
+pub struct GroupByRegisters {
+    pub reg_group_by_source_cols_start: usize,
+    /// Register holding the return offset for the accumulator clear subroutine
+    pub reg_subrtn_acc_clear_return_offset: usize,
+    /// Register holding a flag to abort the grouping process if necessary
+    pub reg_abort_flag: usize,
+    /// Register holding the start of the non-aggregate columns (all columns except aggregate arguments)
+    pub reg_non_aggregate_exprs_acc: usize,
+    /// Register holding the return offset for the accumulator output subroutine
+    pub reg_subrtn_acc_output_return_offset: usize,
+    /// Register holding a flag to indicate if data exists in the accumulator for the current group
+    pub reg_data_in_acc_flag: usize,
+    /// Starting index of the register(s) that hold the comparison result between the current row and the previous row
+    /// The comparison result is used to determine if the current row belongs to the same group as the previous row
+    /// Each GROUP BY expression has a corresponding register
+    pub reg_group_exprs_cmp: usize,
+}
+
 // Metadata for handling GROUP BY operations
 #[derive(Debug)]
 pub struct GroupByMetadata {
-    // Cursor ID for the Sorter table where the grouped rows are stored
-    pub sort_cursor: usize,
-    // Label for the subroutine that clears the accumulator registers (temporary storage for per-group aggregate calculations)
-    pub label_subrtn_acc_clear: BranchOffset,
-    // Label for the instruction that sets the accumulator indicator to true (indicating data exists in the accumulator for the current group)
-    pub label_acc_indicator_set_flag_true: BranchOffset,
-    // Register holding the return offset for the accumulator clear subroutine
-    pub reg_subrtn_acc_clear_return_offset: usize,
-    // Register holding the key used for sorting in the Sorter
-    pub reg_sorter_key: usize,
-    // Register holding a flag to abort the grouping process if necessary
-    pub reg_abort_flag: usize,
-    // Register holding the start of the non aggregate query members (all columns except 
aggregate arguments)
-    pub reg_non_aggregate_exprs_acc: usize,
-    // Starting index of the register(s) that hold the comparison result between the current row and the previous row
-    // The comparison result is used to determine if the current row belongs to the same group as the previous row
-    // Each group by expression has a corresponding register
-    pub reg_group_exprs_cmp: usize,
+    // Source of rows for the GROUP BY operation - either a sorter, or the main loop itself in case the rows already arrive in the order required by the GROUP BY keys
+    pub row_source: GroupByRowSource,
+    pub labels: GroupByLabels,
+    pub registers: GroupByRegisters,
     // Columns that not part of GROUP BY clause and not arguments of Aggregation function.
     // Heavy calculation and needed in different functions, so it is reasonable to do it once and save.
-    pub non_group_by_non_agg_column_count: Option<usize>,
+    pub non_group_by_non_agg_column_count: usize,
 }
 
 /// Initialize resources needed for GROUP BY processing
@@ -63,21 +94,52 @@ pub fn init_group_by(
         .filter(|rc| !rc.contains_aggregates)
         .count();
 
-    let sort_cursor = program.alloc_cursor_id(None, CursorType::Sorter);
+    let label_subrtn_acc_output = program.allocate_label();
+    let label_group_by_end_without_emitting_row = program.allocate_label();
+    let label_acc_indicator_set_flag_true = program.allocate_label();
+    let label_agg_final = program.allocate_label();
+    let label_group_by_end = program.allocate_label();
+    let label_subrtn_acc_clear = program.allocate_label();
+    let label_sort_loop_start = program.allocate_label();
+    let label_sort_loop_end = program.allocate_label();
+    let label_grouping_agg_step = program.allocate_label();
+    let reg_subrtn_acc_output_return_offset = program.alloc_register();
+    let reg_data_in_acc_flag = program.alloc_register();
     let reg_abort_flag = program.alloc_register();
     let reg_group_exprs_cmp = program.alloc_registers(group_by.exprs.len());
     let reg_non_aggregate_exprs_acc = program.alloc_registers(non_aggregate_count);
     let reg_agg_exprs_start = program.alloc_registers(num_aggs);
     let reg_sorter_key = program.alloc_register();
+    let column_count = plan.group_by_sorter_column_count();
+    let reg_group_by_source_cols_start = program.alloc_registers(column_count);
 
-    let label_subrtn_acc_clear = program.allocate_label();
-
-    program.emit_insn(Insn::SorterOpen {
-        cursor_id: sort_cursor,
-        columns: non_aggregate_count + plan.aggregates.len(),
-        order: (0..group_by.exprs.len()).map(|_| SortOrder::Asc).collect(),
-    });
+    let row_source = if let Some(sort_order) = group_by.sort_order.as_ref() {
+        let sort_cursor = program.alloc_cursor_id(None, CursorType::Sorter);
+        let sorter_column_count = plan.group_by_sorter_column_count();
+        program.emit_insn(Insn::SorterOpen {
+            cursor_id: sort_cursor,
+            columns: sorter_column_count,
+            order: sort_order.clone(),
+        });
+        let pseudo_cursor = group_by_create_pseudo_table(program, sorter_column_count);
+        GroupByRowSource::Sorter {
+            pseudo_cursor,
+            sort_cursor,
+            reg_sorter_key,
+            column_register_mapping: group_by_create_column_register_mapping(
+                group_by,
+                reg_non_aggregate_exprs_acc,
+                plan,
+            ),
+            sorter_column_count,
+        }
+    } else {
+        GroupByRowSource::MainLoop {
+            start_reg_src: reg_group_by_source_cols_start,
+            start_reg_dest: reg_non_aggregate_exprs_acc,
+        }
+    };
 
     program.add_comment(program.offset(), "clear group by abort flag");
     program.emit_insn(Insn::Integer {
@@ -109,97 +171,39 @@
     t_ctx.reg_agg_start = Some(reg_agg_exprs_start);
 
     t_ctx.meta_group_by = Some(GroupByMetadata {
-        sort_cursor,
-        
label_subrtn_acc_clear, - label_acc_indicator_set_flag_true: program.allocate_label(), - reg_subrtn_acc_clear_return_offset, - reg_abort_flag, - reg_non_aggregate_exprs_acc, - reg_group_exprs_cmp, - reg_sorter_key, - non_group_by_non_agg_column_count: None, + row_source, + labels: GroupByLabels { + label_subrtn_acc_output, + label_group_by_end_without_emitting_row, + label_acc_indicator_set_flag_true, + label_agg_final, + label_group_by_end, + label_subrtn_acc_clear, + label_sort_loop_start, + label_sort_loop_end, + label_grouping_agg_step, + }, + registers: GroupByRegisters { + reg_subrtn_acc_output_return_offset, + reg_data_in_acc_flag, + reg_abort_flag, + reg_non_aggregate_exprs_acc, + reg_group_exprs_cmp, + reg_subrtn_acc_clear_return_offset, + reg_group_by_source_cols_start, + }, + non_group_by_non_agg_column_count: plan.non_group_by_non_agg_column_count(), }); Ok(()) } -/// Emits the bytecode for processing a GROUP BY clause. -/// This is called when the main query execution loop has finished processing, -/// and we now have data in the GROUP BY sorter. -pub fn emit_group_by<'a>( +/// In case sorting is needed for GROUP BY, creates a pseudo table that matches +/// the number of columns in the GROUP BY sorter. Rows are individually read +/// from the sorter into this pseudo table and processed. +pub fn group_by_create_pseudo_table( program: &mut ProgramBuilder, - t_ctx: &mut TranslateCtx<'a>, - plan: &'a SelectPlan, -) -> Result<()> { - // Label for the first instruction of the grouping loop. - // This is the start of the loop that reads the sorted data and groups&aggregates it. - let label_grouping_loop_start = program.allocate_label(); - // Label for the instruction immediately after the grouping loop. - let label_grouping_loop_end = program.allocate_label(); - // Label for the instruction where a row for a finished group is output. - // Distinct from subroutine_accumulator_output_label, which is the start of the subroutine, but may still skip emitting a row. - let label_agg_final = program.allocate_label(); - // Label for the instruction immediately after the entire group by phase. - let label_group_by_end = program.allocate_label(); - // Label for the beginning of the subroutine that potentially outputs a row for a finished group. - let label_subrtn_acc_output = program.allocate_label(); - // Register holding the return offset of the subroutine that potentially outputs a row for a finished group. - let reg_subrtn_acc_output_return_offset = program.alloc_register(); - // Register holding a boolean indicating whether there's data in the accumulator (used for aggregation) - let reg_data_in_acc_flag = program.alloc_register(); - - let GroupByMetadata { - sort_cursor, - reg_group_exprs_cmp, - reg_subrtn_acc_clear_return_offset, - reg_non_aggregate_exprs_acc, - reg_abort_flag, - reg_sorter_key, - label_subrtn_acc_clear, - label_acc_indicator_set_flag_true, - non_group_by_non_agg_column_count, - .. 
- } = *t_ctx.meta_group_by.as_mut().unwrap(); - let group_by = plan.group_by.as_ref().unwrap(); - - let agg_args_count = plan - .aggregates - .iter() - .map(|agg| agg.args.len()) - .sum::(); - let group_by_count = group_by.exprs.len(); - let non_group_by_non_agg_column_count = non_group_by_non_agg_column_count.unwrap(); - - // We have to know which group by expr present in resulting set - let group_by_expr_in_res_cols = group_by.exprs.iter().map(|expr| { - plan.result_columns - .iter() - .any(|e| exprs_are_equivalent(&e.expr, expr)) - }); - - // Create a map from sorter column index to result register - // This helps track where each column from the sorter should be stored - let mut column_register_mapping = - vec![None; group_by_count + non_group_by_non_agg_column_count]; - let mut next_reg = reg_non_aggregate_exprs_acc; - - // Map GROUP BY columns that are in the result set to registers - for (i, is_in_result) in group_by_expr_in_res_cols.clone().enumerate() { - if is_in_result { - column_register_mapping[i] = Some(next_reg); - next_reg += 1; - } - } - - // Handle other non-aggregate columns that aren't part of GROUP BY and not part of Aggregation function - for i in group_by_count..group_by_count + non_group_by_non_agg_column_count { - column_register_mapping[i] = Some(next_reg); - next_reg += 1; - } - - // Calculate total number of columns in the sorter - // The sorter contains all GROUP BY columns, aggregate arguments, and other columns - let sorter_column_count = agg_args_count + group_by_count + non_group_by_non_agg_column_count; - + sorter_column_count: usize, +) -> usize { // Create pseudo-columns for the pseudo-table // (these are placeholders as we only care about structure, not semantics) let ty = crate::schema::Type::Null; @@ -221,49 +225,243 @@ pub fn emit_group_by<'a>( columns: pseudo_columns, }); - let pseudo_cursor = program.alloc_cursor_id(None, CursorType::Pseudo(pseudo_table.clone())); + program.alloc_cursor_id(None, CursorType::Pseudo(pseudo_table.clone())) +} +/// In case sorting is needed for GROUP BY, sorts the rows in the GROUP BY sorter +/// and opens a pseudo table from which the sorted rows are read. +pub fn emit_group_by_sort_loop_start( + program: &mut ProgramBuilder, + row_source: &GroupByRowSource, + label_sort_loop_end: BranchOffset, +) -> Result<()> { + let GroupByRowSource::Sorter { + sort_cursor, + pseudo_cursor, + reg_sorter_key, + sorter_column_count, + .. 
+ } = row_source + else { + crate::bail_parse_error!("sort cursor must be opened for GROUP BY if we got here"); + }; program.emit_insn(Insn::OpenPseudo { - cursor_id: pseudo_cursor, - content_reg: reg_sorter_key, - num_fields: sorter_column_count, + cursor_id: *pseudo_cursor, + content_reg: *reg_sorter_key, + num_fields: *sorter_column_count, }); // Sort the sorter based on the group by columns program.emit_insn(Insn::SorterSort { - cursor_id: sort_cursor, - pc_if_empty: label_grouping_loop_end, + cursor_id: *sort_cursor, + pc_if_empty: label_sort_loop_end, }); - program.resolve_label(label_grouping_loop_start, program.offset()); - // Read a row from the sorted data in the sorter into the pseudo cursor - program.emit_insn(Insn::SorterData { - cursor_id: sort_cursor, - dest_reg: reg_sorter_key, - pseudo_cursor, - }); + Ok(()) +} - // Read the group by columns from the pseudo cursor - let groups_start_reg = program.alloc_registers(group_by.exprs.len()); - for i in 0..group_by.exprs.len() { - let sorter_column_index = i; - let group_reg = groups_start_reg + i; - program.emit_insn(Insn::Column { - cursor_id: pseudo_cursor, - column: sorter_column_index, - dest: group_reg, - }); +/// In case sorting is needed for GROUP BY, advances to the next row +/// in the GROUP BY sorter. +pub fn emit_group_by_sort_loop_end( + program: &mut ProgramBuilder, + sort_cursor: usize, + label_sort_loop_start: BranchOffset, + label_sort_loop_end: BranchOffset, +) { + // Continue to the next row in the sorter + program.emit_insn(Insn::SorterNext { + cursor_id: sort_cursor, + pc_if_next: label_sort_loop_start, + }); + program.preassign_label_to_next_insn(label_sort_loop_end); +} + +/// Enum representing the source for the rows processed during a GROUP BY. +/// In case sorting is needed (which is most of the time), the variant +/// [GroupByRowSource::Sorter] encodes the necessary information about that +/// sorter. +/// +/// In case where the rows are already ordered, for example: +/// "SELECT indexed_col, count(1) FROM t GROUP BY indexed_col" +/// the rows are processed directly in the order they arrive from +/// the main query loop. +#[derive(Debug)] +pub enum GroupByRowSource { + Sorter { + /// Cursor opened for the pseudo table that GROUP BY reads rows from. + pseudo_cursor: usize, + /// The sorter opened for ensuring the rows are in GROUP BY order. + sort_cursor: usize, + /// Register holding the key used for sorting in the Sorter + reg_sorter_key: usize, + /// Number of columns in the GROUP BY sorter + sorter_column_count: usize, + /// In case some result columns of the SELECT query are equivalent to GROUP BY members, + /// this mapping encodes their position. + column_register_mapping: Vec>, + }, + MainLoop { + /// If GROUP BY rows are read directly in the main loop, start_reg is the first register + /// holding the value of a relevant column. + start_reg_src: usize, + /// The grouping columns for a group that is not yet finalized must be placed in new registers, + /// so that they don't get overwritten by the next group's data. + /// This is because the emission of a group that is "done" is made after a comparison between the "current" and "next" grouping + /// columns returns nonequal. If we don't store the "current" group in a separate set of registers, the "next" group's data will + /// overwrite the "current" group's columns and the wrong grouping column values will be emitted. 
+ /// Aggregation results do not require new registers as they are not at risk of being overwritten before a given group + /// is processed. + start_reg_dest: usize, + }, +} + +/// Enum representing the source of the aggregate function arguments +/// emitted for a group by aggregation. +/// In the common case, the aggregate function arguments are first inserted +/// into a sorter in the main loop, and in the group by aggregation phase +/// we read the data from the sorter. +/// +/// In the alternative case, no sorting is required for group by, +/// and the aggregate function arguments are retrieved directly from +/// registers allocated in the main loop. +pub enum GroupByAggArgumentSource<'a> { + /// The aggregate function arguments are retrieved from a pseudo cursor + /// which reads from the GROUP BY sorter. + PseudoCursor { + cursor_id: usize, + col_start: usize, + dest_reg_start: usize, + aggregate: &'a Aggregate, + }, + /// The aggregate function arguments are retrieved from a contiguous block of registers + /// allocated in the main loop for that given aggregate function. + Register { + src_reg_start: usize, + aggregate: &'a Aggregate, + }, +} + +impl<'a> GroupByAggArgumentSource<'a> { + /// Create a new [GroupByAggArgumentSource] that retrieves the values from a GROUP BY sorter. + pub fn new_from_cursor( + program: &mut ProgramBuilder, + cursor_id: usize, + col_start: usize, + aggregate: &'a Aggregate, + ) -> Self { + let dest_reg_start = program.alloc_registers(aggregate.args.len()); + Self::PseudoCursor { + cursor_id, + col_start, + dest_reg_start, + aggregate, + } } + /// Create a new [GroupByAggArgumentSource] that retrieves the values directly from an already + /// populated register or registers. + pub fn new_from_registers(src_reg_start: usize, aggregate: &'a Aggregate) -> Self { + Self::Register { + src_reg_start, + aggregate, + } + } + pub fn agg_func(&self) -> &AggFunc { + match self { + GroupByAggArgumentSource::PseudoCursor { aggregate, .. } => &aggregate.func, + GroupByAggArgumentSource::Register { aggregate, .. } => &aggregate.func, + } + } + pub fn args(&self) -> &[ast::Expr] { + match self { + GroupByAggArgumentSource::PseudoCursor { aggregate, .. } => &aggregate.args, + GroupByAggArgumentSource::Register { aggregate, .. } => &aggregate.args, + } + } + pub fn num_args(&self) -> usize { + match self { + GroupByAggArgumentSource::PseudoCursor { aggregate, .. } => aggregate.args.len(), + GroupByAggArgumentSource::Register { aggregate, .. } => aggregate.args.len(), + } + } + /// Read the value of an aggregate function argument either from sorter data or directly from a register. + pub fn translate(&self, program: &mut ProgramBuilder, arg_idx: usize) -> Result { + match self { + GroupByAggArgumentSource::PseudoCursor { + cursor_id, + col_start, + dest_reg_start, + .. + } => { + program.emit_insn(Insn::Column { + cursor_id: *cursor_id, + column: *col_start, + dest: dest_reg_start + arg_idx, + }); + Ok(dest_reg_start + arg_idx) + } + GroupByAggArgumentSource::Register { + src_reg_start: start_reg, + .. + } => Ok(*start_reg + arg_idx), + } + } +} + +/// Emits bytecode for processing a single GROUP BY group. +pub fn group_by_process_single_group( + program: &mut ProgramBuilder, + group_by: &GroupBy, + plan: &SelectPlan, + t_ctx: &TranslateCtx, +) -> Result<()> { + let GroupByMetadata { + registers, + labels, + row_source, + non_group_by_non_agg_column_count, + .. 
+ } = t_ctx + .meta_group_by + .as_ref() + .expect("group by metadata not found"); + program.preassign_label_to_next_insn(labels.label_sort_loop_start); + let groups_start_reg = match &row_source { + GroupByRowSource::Sorter { + sort_cursor, + pseudo_cursor, + reg_sorter_key, + .. + } => { + // Read a row from the sorted data in the sorter into the pseudo cursor + program.emit_insn(Insn::SorterData { + cursor_id: *sort_cursor, + dest_reg: *reg_sorter_key, + pseudo_cursor: *pseudo_cursor, + }); + // Read the group by columns from the pseudo cursor + let groups_start_reg = program.alloc_registers(group_by.exprs.len()); + for i in 0..group_by.exprs.len() { + let sorter_column_index = i; + let group_reg = groups_start_reg + i; + program.emit_insn(Insn::Column { + cursor_id: *pseudo_cursor, + column: sorter_column_index, + dest: group_reg, + }); + } + groups_start_reg + } + + GroupByRowSource::MainLoop { start_reg_src, .. } => *start_reg_src, + }; // Compare the group by columns to the previous group by columns to see if we are at a new group or not program.emit_insn(Insn::Compare { - start_reg_a: reg_group_exprs_cmp, + start_reg_a: registers.reg_group_exprs_cmp, start_reg_b: groups_start_reg, count: group_by.exprs.len(), }); - let agg_step_label = program.allocate_label(); - program.add_comment( program.offset(), "start new group if comparison is not equal", @@ -272,7 +470,7 @@ pub fn emit_group_by<'a>( let label_jump_after_comparison = program.allocate_label(); program.emit_insn(Insn::Jump { target_pc_lt: label_jump_after_comparison, - target_pc_eq: agg_step_label, + target_pc_eq: labels.label_grouping_agg_step, target_pc_gt: label_jump_after_comparison, }); @@ -282,46 +480,61 @@ pub fn emit_group_by<'a>( ); program.resolve_label(label_jump_after_comparison, program.offset()); program.emit_insn(Insn::Gosub { - target_pc: label_subrtn_acc_output, - return_reg: reg_subrtn_acc_output_return_offset, + target_pc: labels.label_subrtn_acc_output, + return_reg: registers.reg_subrtn_acc_output_return_offset, }); // New group, move current group by columns into the comparison register program.emit_insn(Insn::Move { source_reg: groups_start_reg, - dest_reg: reg_group_exprs_cmp, + dest_reg: registers.reg_group_exprs_cmp, count: group_by.exprs.len(), }); program.add_comment(program.offset(), "check abort flag"); program.emit_insn(Insn::IfPos { - reg: reg_abort_flag, - target_pc: label_group_by_end, + reg: registers.reg_abort_flag, + target_pc: labels.label_group_by_end, decrement_by: 0, }); program.add_comment(program.offset(), "goto clear accumulator subroutine"); program.emit_insn(Insn::Gosub { - target_pc: label_subrtn_acc_clear, - return_reg: reg_subrtn_acc_clear_return_offset, + target_pc: labels.label_subrtn_acc_clear, + return_reg: registers.reg_subrtn_acc_clear_return_offset, }); // Process each aggregate function for the current row - program.resolve_label(agg_step_label, program.offset()); + program.resolve_label(labels.label_grouping_agg_step, program.offset()); let start_reg = t_ctx.reg_agg_start.unwrap(); - let mut cursor_index = group_by_count + non_group_by_non_agg_column_count; // Skipping all columns in sorter that not an aggregation arguments + let cursor_index = *non_group_by_non_agg_column_count + group_by.exprs.len(); // Skipping all columns in sorter that not an aggregation arguments + let mut offset = 0; for (i, agg) in plan.aggregates.iter().enumerate() { let agg_result_reg = start_reg + i; + let agg_arg_source = match &row_source { + GroupByRowSource::Sorter { pseudo_cursor, .. 
} => { + GroupByAggArgumentSource::new_from_cursor( + program, + *pseudo_cursor, + cursor_index + offset, + agg, + ) + } + GroupByRowSource::MainLoop { start_reg_src, .. } => { + // Aggregation arguments are always placed in the registers that follow any scalars. + let start_reg_aggs = + start_reg_src + group_by.exprs.len() + plan.non_group_by_non_agg_column_count(); + GroupByAggArgumentSource::new_from_registers(start_reg_aggs + offset, agg) + } + }; translate_aggregation_step_groupby( program, &plan.table_references, - pseudo_cursor, - cursor_index, - agg, + agg_arg_source, agg_result_reg, &t_ctx.resolver, )?; - cursor_index += agg.args.len(); + offset += agg.args.len(); } // We only need to store non-aggregate columns once per group @@ -331,76 +544,195 @@ pub fn emit_group_by<'a>( "don't emit group columns if continuing existing group", ); program.emit_insn(Insn::If { - target_pc: label_acc_indicator_set_flag_true, - reg: reg_data_in_acc_flag, + target_pc: labels.label_acc_indicator_set_flag_true, + reg: registers.reg_data_in_acc_flag, jump_if_null: false, }); // Read non-aggregate columns from the current row - for (sorter_column_index, dest_reg) in column_register_mapping.iter().enumerate() { - if let Some(dest_reg) = dest_reg { - program.emit_insn(Insn::Column { - cursor_id: pseudo_cursor, - column: sorter_column_index, - dest: *dest_reg, - }); + match row_source { + GroupByRowSource::Sorter { + pseudo_cursor, + column_register_mapping, + .. + } => { + for (sorter_column_index, dest_reg) in column_register_mapping.iter().enumerate() { + if let Some(dest_reg) = dest_reg { + program.emit_insn(Insn::Column { + cursor_id: *pseudo_cursor, + column: sorter_column_index, + dest: *dest_reg, + }); + } + } + } + GroupByRowSource::MainLoop { start_reg_dest, .. } => { + // Re-translate all the non-aggregate expressions into destination registers. We cannot use the same registers as emitted + // in the earlier part of the main loop, because they would be overwritten by the next group before the group results + // are processed. 
+ for (i, rc) in plan + .result_columns + .iter() + .filter(|rc| !rc.contains_aggregates) + .enumerate() + { + let dest_reg = start_reg_dest + i; + let expr = &rc.expr; + translate_expr( + program, + Some(&plan.table_references), + expr, + dest_reg, + &t_ctx.resolver, + )?; + } } } // Mark that we've stored data for this group - program.resolve_label(label_acc_indicator_set_flag_true, program.offset()); + program.resolve_label(labels.label_acc_indicator_set_flag_true, program.offset()); program.add_comment(program.offset(), "indicate data in accumulator"); program.emit_insn(Insn::Integer { value: 1, - dest: reg_data_in_acc_flag, + dest: registers.reg_data_in_acc_flag, }); + Ok(()) +} + +pub fn group_by_create_column_register_mapping( + group_by: &GroupBy, + reg_non_aggregate_exprs_acc: usize, + plan: &SelectPlan, +) -> Vec> { + // We have to know which group by expr present in resulting set + let group_by_expr_in_res_cols = group_by.exprs.iter().map(|expr| { + plan.result_columns + .iter() + .any(|e| exprs_are_equivalent(&e.expr, expr)) + }); + + let group_by_count = group_by.exprs.len(); + let non_group_by_non_agg_column_count = plan.non_group_by_non_agg_column_count(); + + // Create a map from sorter column index to result register + // This helps track where each column from the sorter should be stored + let mut column_register_mapping = + vec![None; group_by_count + non_group_by_non_agg_column_count]; + let mut next_reg = reg_non_aggregate_exprs_acc; + + // Map GROUP BY columns that are in the result set to registers + for (i, is_in_result) in group_by_expr_in_res_cols.clone().enumerate() { + if is_in_result { + column_register_mapping[i] = Some(next_reg); + next_reg += 1; + } + } + + // Handle other non-aggregate columns that aren't part of GROUP BY and not part of Aggregation function + for i in group_by_count..group_by_count + non_group_by_non_agg_column_count { + column_register_mapping[i] = Some(next_reg); + next_reg += 1; + } + + column_register_mapping +} + +/// Emits the bytecode for processing the aggregation phase of a GROUP BY clause. +/// This is called either when: +/// 1. the main query execution loop has finished processing, +/// and we now have data in the GROUP BY sorter. +/// 2. the rows are already sorted in the order that the GROUP BY keys are defined, +/// and we can start aggregating inside the main loop. +pub fn group_by_agg_phase<'a>( + program: &mut ProgramBuilder, + t_ctx: &mut TranslateCtx<'a>, + plan: &'a SelectPlan, +) -> Result<()> { + let GroupByMetadata { + labels, row_source, .. + } = t_ctx.meta_group_by.as_mut().unwrap(); + let group_by = plan.group_by.as_ref().unwrap(); + + let label_sort_loop_start = labels.label_sort_loop_start; + let label_sort_loop_end = labels.label_sort_loop_end; + + if matches!(row_source, GroupByRowSource::Sorter { .. }) { + emit_group_by_sort_loop_start(program, row_source, label_sort_loop_end)?; + } + + group_by_process_single_group(program, group_by, plan, t_ctx)?; + + let row_source = &t_ctx.meta_group_by.as_ref().unwrap().row_source; + // Continue to the next row in the sorter - program.emit_insn(Insn::SorterNext { - cursor_id: sort_cursor, - pc_if_next: label_grouping_loop_start, - }); - program.preassign_label_to_next_insn(label_grouping_loop_end); + if let GroupByRowSource::Sorter { sort_cursor, .. 
} = row_source { + emit_group_by_sort_loop_end( + program, + *sort_cursor, + label_sort_loop_start, + label_sort_loop_end, + ); + } + Ok(()) +} +pub fn group_by_emit_row_phase<'a>( + program: &mut ProgramBuilder, + t_ctx: &mut TranslateCtx<'a>, + plan: &'a SelectPlan, +) -> Result<()> { + let group_by = plan.group_by.as_ref().expect("group by not found"); + let GroupByMetadata { + row_source, + labels, + registers, + .. + } = t_ctx + .meta_group_by + .as_ref() + .expect("group by metadata not found"); program.add_comment(program.offset(), "emit row for final group"); program.emit_insn(Insn::Gosub { - target_pc: label_subrtn_acc_output, - return_reg: reg_subrtn_acc_output_return_offset, + target_pc: labels.label_subrtn_acc_output, + return_reg: registers.reg_subrtn_acc_output_return_offset, }); program.add_comment(program.offset(), "group by finished"); program.emit_insn(Insn::Goto { - target_pc: label_group_by_end, + target_pc: labels.label_group_by_end, }); program.emit_insn(Insn::Integer { value: 1, - dest: reg_abort_flag, + dest: registers.reg_abort_flag, }); program.emit_insn(Insn::Return { - return_reg: reg_subrtn_acc_output_return_offset, + return_reg: registers.reg_subrtn_acc_output_return_offset, }); - program.resolve_label(label_subrtn_acc_output, program.offset()); + program.resolve_label(labels.label_subrtn_acc_output, program.offset()); // Only output a row if there's data in the accumulator program.add_comment(program.offset(), "output group by row subroutine start"); program.emit_insn(Insn::IfPos { - reg: reg_data_in_acc_flag, - target_pc: label_agg_final, + reg: registers.reg_data_in_acc_flag, + target_pc: labels.label_agg_final, decrement_by: 0, }); // If no data, return without outputting a row - let group_by_end_without_emitting_row_label = program.allocate_label(); - program.resolve_label(group_by_end_without_emitting_row_label, program.offset()); + program.resolve_label( + labels.label_group_by_end_without_emitting_row, + program.offset(), + ); program.emit_insn(Insn::Return { - return_reg: reg_subrtn_acc_output_return_offset, + return_reg: registers.reg_subrtn_acc_output_return_offset, }); // Finalize aggregate values for output let agg_start_reg = t_ctx.reg_agg_start.unwrap(); // Resolve the label for the start of the group by output row subroutine - program.resolve_label(label_agg_final, program.offset()); + program.resolve_label(labels.label_agg_final, program.offset()); for (i, agg) in plan.aggregates.iter().enumerate() { let agg_result_reg = agg_start_reg + i; program.emit_insn(Insn::AggFinal { @@ -409,6 +741,13 @@ pub fn emit_group_by<'a>( }); } + // We have to know which group by expr present in resulting set + let group_by_expr_in_res_cols = group_by.exprs.iter().map(|expr| { + plan.result_columns + .iter() + .any(|e| exprs_are_equivalent(&e.expr, expr)) + }); + // Map GROUP BY expressions to their registers in the result set for (i, (expr, is_in_result)) in group_by .exprs @@ -417,8 +756,21 @@ pub fn emit_group_by<'a>( .enumerate() { if is_in_result { - if let Some(reg) = &column_register_mapping.get(i).and_then(|opt| *opt) { - t_ctx.resolver.expr_to_reg_cache.push((expr, *reg)); + match row_source { + GroupByRowSource::Sorter { + column_register_mapping, + .. + } => { + if let Some(reg) = column_register_mapping.get(i).and_then(|opt| *opt) { + t_ctx.resolver.expr_to_reg_cache.push((expr, reg)); + } + } + GroupByRowSource::MainLoop { start_reg_dest, .. 
} => { + t_ctx + .resolver + .expr_to_reg_cache + .push((expr, *start_reg_dest + i)); + } } } } @@ -430,9 +782,25 @@ pub fn emit_group_by<'a>( .filter(|rc| !rc.contains_aggregates && !is_column_in_group_by(&rc.expr, &group_by.exprs)); for (idx, rc) in non_agg_cols.enumerate() { - let sorter_idx = group_by_count + idx; - if let Some(reg) = column_register_mapping.get(sorter_idx).and_then(|opt| *opt) { - t_ctx.resolver.expr_to_reg_cache.push((&rc.expr, reg)); + let column_relative_idx = plan.group_by_col_count() + idx; + match &row_source { + GroupByRowSource::Sorter { + column_register_mapping, + .. + } => { + if let Some(reg) = column_register_mapping + .get(column_relative_idx) + .and_then(|opt| *opt) + { + t_ctx.resolver.expr_to_reg_cache.push((&rc.expr, reg)); + } + } + GroupByRowSource::MainLoop { start_reg_dest, .. } => { + t_ctx + .resolver + .expr_to_reg_cache + .push((&rc.expr, start_reg_dest + column_relative_idx)); + } } } @@ -452,7 +820,7 @@ pub fn emit_group_by<'a>( expr, ConditionMetadata { jump_if_condition_is_true: false, - jump_target_when_false: group_by_end_without_emitting_row_label, + jump_target_when_false: labels.label_group_by_end_without_emitting_row, jump_target_when_true: BranchOffset::Placeholder, // not used. FIXME: this is a bug. HAVING can have e.g. HAVING a OR b. }, &t_ctx.resolver, @@ -464,44 +832,54 @@ pub fn emit_group_by<'a>( None => { emit_select_result( program, - t_ctx, + &t_ctx.resolver, plan, - Some(label_group_by_end), - Some(group_by_end_without_emitting_row_label), + Some(labels.label_group_by_end), + Some(labels.label_group_by_end_without_emitting_row), + t_ctx.reg_nonagg_emit_once_flag, + t_ctx.reg_offset, + t_ctx.reg_result_cols_start.unwrap(), + t_ctx.reg_limit, + t_ctx.reg_limit_offset_sum, )?; } Some(_) => { - order_by_sorter_insert(program, t_ctx, plan)?; + order_by_sorter_insert( + program, + &t_ctx.resolver, + t_ctx + .meta_sort + .as_ref() + .expect("sort metadata must exist for ORDER BY"), + &mut t_ctx.result_column_indexes_in_orderby_sorter, + plan, + )?; } } program.emit_insn(Insn::Return { - return_reg: reg_subrtn_acc_output_return_offset, + return_reg: registers.reg_subrtn_acc_output_return_offset, }); // Subroutine to clear accumulators for a new group program.add_comment(program.offset(), "clear accumulator subroutine start"); - program.resolve_label(label_subrtn_acc_clear, program.offset()); - let start_reg = reg_non_aggregate_exprs_acc; + program.resolve_label(labels.label_subrtn_acc_clear, program.offset()); + let start_reg = registers.reg_non_aggregate_exprs_acc; // Reset all accumulator registers to NULL program.emit_insn(Insn::Null { dest: start_reg, - dest_end: Some( - start_reg + non_group_by_non_agg_column_count + group_by_count + plan.aggregates.len() - - 1, - ), + dest_end: Some(start_reg + plan.group_by_sorter_column_count() - 1), }); program.emit_insn(Insn::Integer { value: 0, - dest: reg_data_in_acc_flag, + dest: registers.reg_data_in_acc_flag, }); program.emit_insn(Insn::Return { - return_reg: reg_subrtn_acc_clear_return_offset, + return_reg: registers.reg_subrtn_acc_clear_return_offset, }); - program.preassign_label_to_next_insn(label_group_by_end); - + program.preassign_label_to_next_insn(labels.label_group_by_end); Ok(()) } @@ -514,26 +892,17 @@ pub fn emit_group_by<'a>( pub fn translate_aggregation_step_groupby( program: &mut ProgramBuilder, referenced_tables: &[TableReference], - group_by_sorter_cursor_id: usize, - cursor_index: usize, - agg: &Aggregate, + agg_arg_source: GroupByAggArgumentSource, 
target_register: usize, resolver: &Resolver, ) -> Result { - let emit_column = |program: &mut ProgramBuilder, expr_reg: usize| { - program.emit_insn(Insn::Column { - cursor_id: group_by_sorter_cursor_id, - column: cursor_index, - dest: expr_reg, - }); - }; - let dest = match agg.func { + let num_args = agg_arg_source.num_args(); + let dest = match agg_arg_source.agg_func() { AggFunc::Avg => { - if agg.args.len() != 1 { + if num_args != 1 { crate::bail_parse_error!("avg bad number of arguments"); } - let expr_reg = program.alloc_register(); - emit_column(program, expr_reg); + let expr_reg = agg_arg_source.translate(program, 0)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -543,13 +912,12 @@ pub fn translate_aggregation_step_groupby( target_register } AggFunc::Count | AggFunc::Count0 => { - let expr_reg = program.alloc_register(); - emit_column(program, expr_reg); + let expr_reg = agg_arg_source.translate(program, 0)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, delimiter: 0, - func: if matches!(agg.func, AggFunc::Count0) { + func: if matches!(agg_arg_source.agg_func(), AggFunc::Count0) { AggFunc::Count0 } else { AggFunc::Count @@ -558,19 +926,19 @@ pub fn translate_aggregation_step_groupby( target_register } AggFunc::GroupConcat => { - if agg.args.len() != 1 && agg.args.len() != 2 { + let num_args = agg_arg_source.num_args(); + if num_args != 1 && num_args != 2 { crate::bail_parse_error!("group_concat bad number of arguments"); } - let expr_reg = program.alloc_register(); let delimiter_reg = program.alloc_register(); let delimiter_expr: ast::Expr; - if agg.args.len() == 2 { - match &agg.args[1] { + if num_args == 2 { + match &agg_arg_source.args()[1] { ast::Expr::Column { .. } => { - delimiter_expr = agg.args[1].clone(); + delimiter_expr = agg_arg_source.args()[1].clone(); } ast::Expr::Literal(ast::Literal::String(s)) => { delimiter_expr = ast::Expr::Literal(ast::Literal::String(s.to_string())); @@ -581,7 +949,7 @@ pub fn translate_aggregation_step_groupby( delimiter_expr = ast::Expr::Literal(ast::Literal::String(String::from("\",\""))); } - emit_column(program, expr_reg); + let expr_reg = agg_arg_source.translate(program, 0)?; translate_expr( program, Some(referenced_tables), @@ -600,11 +968,10 @@ pub fn translate_aggregation_step_groupby( target_register } AggFunc::Max => { - if agg.args.len() != 1 { + if num_args != 1 { crate::bail_parse_error!("max bad number of arguments"); } - let expr_reg = program.alloc_register(); - emit_column(program, expr_reg); + let expr_reg = agg_arg_source.translate(program, 0)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -614,11 +981,10 @@ pub fn translate_aggregation_step_groupby( target_register } AggFunc::Min => { - if agg.args.len() != 1 { + if num_args != 1 { crate::bail_parse_error!("min bad number of arguments"); } - let expr_reg = program.alloc_register(); - emit_column(program, expr_reg); + let expr_reg = agg_arg_source.translate(program, 0)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -629,11 +995,10 @@ pub fn translate_aggregation_step_groupby( } #[cfg(feature = "json")] AggFunc::JsonGroupArray | AggFunc::JsonbGroupArray => { - if agg.args.len() != 1 { + if num_args != 1 { crate::bail_parse_error!("min bad number of arguments"); } - let expr_reg = program.alloc_register(); - emit_column(program, expr_reg); + let expr_reg = agg_arg_source.translate(program, 0)?; program.emit_insn(Insn::AggStep { acc_reg: 
target_register, col: expr_reg, @@ -644,22 +1009,12 @@ pub fn translate_aggregation_step_groupby( } #[cfg(feature = "json")] AggFunc::JsonGroupObject | AggFunc::JsonbGroupObject => { - if agg.args.len() != 2 { + if num_args != 2 { crate::bail_parse_error!("max bad number of arguments"); } - let expr = &agg.args[0]; - let expr_reg = program.alloc_register(); - let value_expr = &agg.args[1]; - let value_reg = program.alloc_register(); - let _ = translate_expr(program, Some(referenced_tables), expr, expr_reg, resolver)?; - let _ = translate_expr( - program, - Some(referenced_tables), - value_expr, - value_reg, - resolver, - )?; + let expr_reg = agg_arg_source.translate(program, 0)?; + let value_reg = agg_arg_source.translate(program, 1)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, @@ -670,22 +1025,21 @@ pub fn translate_aggregation_step_groupby( target_register } AggFunc::StringAgg => { - if agg.args.len() != 2 { + if num_args != 2 { crate::bail_parse_error!("string_agg bad number of arguments"); } - let expr_reg = program.alloc_register(); let delimiter_reg = program.alloc_register(); - let delimiter_expr = match &agg.args[1] { - ast::Expr::Column { .. } => agg.args[1].clone(), + let delimiter_expr = match &agg_arg_source.args()[1] { + ast::Expr::Column { .. } => agg_arg_source.args()[1].clone(), ast::Expr::Literal(ast::Literal::String(s)) => { ast::Expr::Literal(ast::Literal::String(s.to_string())) } _ => crate::bail_parse_error!("Incorrect delimiter parameter"), }; - emit_column(program, expr_reg); + let expr_reg = agg_arg_source.translate(program, 0)?; translate_expr( program, Some(referenced_tables), @@ -704,11 +1058,10 @@ pub fn translate_aggregation_step_groupby( target_register } AggFunc::Sum => { - if agg.args.len() != 1 { + if num_args != 1 { crate::bail_parse_error!("sum bad number of arguments"); } - let expr_reg = program.alloc_register(); - emit_column(program, expr_reg); + let expr_reg = agg_arg_source.translate(program, 0)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, @@ -718,11 +1071,10 @@ pub fn translate_aggregation_step_groupby( target_register } AggFunc::Total => { - if agg.args.len() != 1 { + if num_args != 1 { crate::bail_parse_error!("total bad number of arguments"); } - let expr_reg = program.alloc_register(); - emit_column(program, expr_reg); + let expr_reg = agg_arg_source.translate(program, 0)?; program.emit_insn(Insn::AggStep { acc_reg: target_register, col: expr_reg, diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index a5732b0a1..4b25474e5 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -22,7 +22,7 @@ use super::{ translate_condition_expr, translate_expr, translate_expr_no_constant_opt, ConditionMetadata, NoConstantOptReason, }, - group_by::is_column_in_group_by, + group_by::{group_by_agg_phase, GroupByMetadata, GroupByRowSource}, optimizer::Optimizable, order_by::{order_by_sorter_insert, sorter_insert}, plan::{ @@ -562,11 +562,12 @@ pub fn open_loop( /// SQLite (and so Limbo) processes joins as a nested loop. 
/// The loop may emit rows to various destinations depending on the query: /// - a GROUP BY sorter (grouping is done by sorting based on the GROUP BY keys and aggregating while the GROUP BY keys match) +/// - a GROUP BY phase with no sorting (when the rows are already in the order required by the GROUP BY keys) /// - an ORDER BY sorter (when there is no GROUP BY, but there is an ORDER BY) /// - an AggStep (the columns are collected for aggregation, which is finished later) /// - a QueryResult (there is none of the above, so the loop either emits a ResultRow, or if it's a subquery, yields to the parent query) enum LoopEmitTarget { - GroupBySorter, + GroupBy, OrderBySorter, AggStep, QueryResult, @@ -574,14 +575,15 @@ enum LoopEmitTarget { /// Emits the bytecode for the inner loop of a query. /// At this point the cursors for all tables have been opened and rewound. -pub fn emit_loop( +pub fn emit_loop<'a>( program: &mut ProgramBuilder, - t_ctx: &mut TranslateCtx, - plan: &mut SelectPlan, + t_ctx: &mut TranslateCtx<'a>, + plan: &'a SelectPlan, ) -> Result<()> { - // if we have a group by, we emit a record into the group by sorter. + // if we have a group by, we emit a record into the group by sorter, + // or if the rows are already sorted, we do the group by aggregation phase directly. if plan.group_by.is_some() { - return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::GroupBySorter); + return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::GroupBy); } // if we DONT have a group by, but we have aggregates, we emit without ResultRow. // we also do not need to sort because we are emitting a single row. @@ -599,50 +601,31 @@ pub fn emit_loop( /// This is a helper function for inner_loop_emit, /// which does a different thing depending on the emit target. /// See the InnerLoopEmitTarget enum for more details. -fn emit_loop_source( +fn emit_loop_source<'a>( program: &mut ProgramBuilder, - t_ctx: &mut TranslateCtx, - plan: &SelectPlan, + t_ctx: &mut TranslateCtx<'a>, + plan: &'a SelectPlan, emit_target: LoopEmitTarget, ) -> Result<()> { match emit_target { - LoopEmitTarget::GroupBySorter => { - // This function creates a sorter for GROUP BY operations by allocating registers and - // translating expressions for three types of columns: + LoopEmitTarget::GroupBy => { + // This function either: + // - creates a sorter for GROUP BY operations by allocating registers and translating expressions for three types of columns: // 1) GROUP BY columns (used as sorting keys) // 2) non-aggregate, non-GROUP BY columns // 3) aggregate function arguments + // - or if the rows produced by the loop are already sorted in the order required by the GROUP BY keys, + // the group by comparisons are done directly inside the main loop. let group_by = plan.group_by.as_ref().unwrap(); let aggregates = &plan.aggregates; - // Identify columns in the result set that are neither in GROUP BY nor contain aggregates - let non_group_by_non_agg_expr = plan - .result_columns - .iter() - .filter(|rc| { - !rc.contains_aggregates && !is_column_in_group_by(&rc.expr, &group_by.exprs) - }) - .map(|rc| &rc.expr); - let non_agg_count = non_group_by_non_agg_expr.clone().count(); - // Store the count of non-GROUP BY, non-aggregate columns in the metadata - // This will be used later during aggregation processing - t_ctx.meta_group_by.as_mut().map(|meta| { - meta.non_group_by_non_agg_column_count = Some(non_agg_count); - meta - }); + let GroupByMetadata { + row_source, + registers, + .. 
+ } = t_ctx.meta_group_by.as_ref().unwrap(); - // Calculate the total number of arguments used across all aggregate functions - let aggregate_arguments_count = plan - .aggregates - .iter() - .map(|agg| agg.args.len()) - .sum::(); - - // Calculate total number of registers needed for all columns in the sorter - let column_count = group_by.exprs.len() + aggregate_arguments_count + non_agg_count; - - // Allocate a contiguous block of registers for all columns - let start_reg = program.alloc_registers(column_count); + let start_reg = registers.reg_group_by_source_cols_start; let mut cur_reg = start_reg; // Step 1: Process GROUP BY columns first @@ -662,7 +645,7 @@ fn emit_loop_source( // Step 2: Process columns that aren't part of GROUP BY and don't contain aggregates // Example: SELECT col1, col2, SUM(col3) FROM table GROUP BY col1 // Here col2 would be processed in this loop if it's in the result set - for expr in non_group_by_non_agg_expr { + for expr in plan.non_group_by_non_agg_columns() { let key_reg = cur_reg; cur_reg += 1; translate_expr( @@ -693,19 +676,36 @@ fn emit_loop_source( } } - let group_by_metadata = t_ctx.meta_group_by.as_ref().unwrap(); - - sorter_insert( - program, - start_reg, - column_count, - group_by_metadata.sort_cursor, - group_by_metadata.reg_sorter_key, - ); + match row_source { + GroupByRowSource::Sorter { + sort_cursor, + sorter_column_count, + reg_sorter_key, + .. + } => { + sorter_insert( + program, + start_reg, + *sorter_column_count, + *sort_cursor, + *reg_sorter_key, + ); + } + GroupByRowSource::MainLoop { .. } => group_by_agg_phase(program, t_ctx, plan)?, + } Ok(()) } - LoopEmitTarget::OrderBySorter => order_by_sorter_insert(program, t_ctx, plan), + LoopEmitTarget::OrderBySorter => order_by_sorter_insert( + program, + &t_ctx.resolver, + t_ctx + .meta_sort + .as_ref() + .expect("sort metadata must exist for ORDER BY"), + &mut t_ctx.result_column_indexes_in_orderby_sorter, + plan, + ), LoopEmitTarget::AggStep => { let num_aggs = plan.aggregates.len(); let start_reg = program.alloc_registers(num_aggs); @@ -778,10 +778,15 @@ fn emit_loop_source( .or(t_ctx.label_main_loop_end); emit_select_result( program, - t_ctx, + &t_ctx.resolver, plan, t_ctx.label_main_loop_end, offset_jump_to, + t_ctx.reg_nonagg_emit_once_flag, + t_ctx.reg_offset, + t_ctx.reg_result_cols_start.unwrap(), + t_ctx.reg_limit, + t_ctx.reg_limit_offset_sum, )?; Ok(()) diff --git a/core/translate/order_by.rs b/core/translate/order_by.rs index 0a43e6683..5260e8286 100644 --- a/core/translate/order_by.rs +++ b/core/translate/order_by.rs @@ -13,7 +13,7 @@ use crate::{ }; use super::{ - emitter::TranslateCtx, + emitter::{Resolver, TranslateCtx}, expr::translate_expr, plan::{ResultSetColumn, SelectPlan}, result_row::{emit_offset, emit_result_row_and_limit}, @@ -121,7 +121,7 @@ pub fn emit_order_by( }); program.preassign_label_to_next_insn(sort_loop_start_label); - emit_offset(program, t_ctx, plan, sort_loop_next_label)?; + emit_offset(program, plan, sort_loop_next_label, t_ctx.reg_offset)?; program.emit_insn(Insn::SorterData { cursor_id: sort_cursor, @@ -142,7 +142,15 @@ pub fn emit_order_by( }); } - emit_result_row_and_limit(program, t_ctx, plan, start_reg, Some(sort_loop_end_label))?; + emit_result_row_and_limit( + program, + plan, + start_reg, + t_ctx.reg_limit, + t_ctx.reg_offset, + t_ctx.reg_limit_offset_sum, + Some(sort_loop_end_label), + )?; program.resolve_label(sort_loop_next_label, program.offset()); program.emit_insn(Insn::SorterNext { @@ -157,7 +165,9 @@ pub fn emit_order_by( /// Emits the 
bytecode for inserting a row into an ORDER BY sorter. pub fn order_by_sorter_insert( program: &mut ProgramBuilder, - t_ctx: &mut TranslateCtx, + resolver: &Resolver, + sort_metadata: &SortMetadata, + res_col_indexes_in_orderby_sorter: &mut Vec, plan: &SelectPlan, ) -> Result<()> { let order_by = plan.order_by.as_ref().unwrap(); @@ -181,7 +191,7 @@ pub fn order_by_sorter_insert( Some(&plan.table_references), expr, key_reg, - &t_ctx.resolver, + resolver, )?; } let mut cur_reg = start_reg + order_by_len; @@ -191,9 +201,7 @@ pub fn order_by_sorter_insert( let found = v.iter().find(|(skipped_idx, _)| *skipped_idx == i); // If the result column is in the list of columns to skip, we need to know its new index in the ORDER BY sorter. if let Some((_, result_column_idx)) = found { - t_ctx - .result_column_indexes_in_orderby_sorter - .insert(i, *result_column_idx); + res_col_indexes_in_orderby_sorter.insert(i, *result_column_idx); continue; } } @@ -202,11 +210,9 @@ pub fn order_by_sorter_insert( Some(&plan.table_references), &rc.expr, cur_reg, - &t_ctx.resolver, + resolver, )?; - t_ctx - .result_column_indexes_in_orderby_sorter - .insert(i, cur_idx_in_orderby_sorter); + res_col_indexes_in_orderby_sorter.insert(i, cur_idx_in_orderby_sorter); cur_idx_in_orderby_sorter += 1; cur_reg += 1; } @@ -214,14 +220,14 @@ pub fn order_by_sorter_insert( let SortMetadata { sort_cursor, reg_sorter_data, - } = *t_ctx.meta_sort.as_mut().unwrap(); + } = sort_metadata; sorter_insert( program, start_reg, orderby_sorter_column_count, - sort_cursor, - reg_sorter_data, + *sort_cursor, + *reg_sorter_data, ); Ok(()) } diff --git a/core/translate/plan.rs b/core/translate/plan.rs index 7106a0e23..3c511c8e0 100644 --- a/core/translate/plan.rs +++ b/core/translate/plan.rs @@ -11,6 +11,7 @@ use std::{ use crate::{ function::AggFunc, schema::{BTreeTable, Column, Index, Table}, + util::exprs_are_equivalent, vdbe::{ builder::{CursorType, ProgramBuilder}, BranchOffset, CursorID, @@ -65,6 +66,7 @@ pub struct GroupBy { pub exprs: Vec, /// having clause split into a vec at 'AND' boundaries. pub having: Option>, + pub sort_order: Option>, } /// In a query plan, WHERE clause conditions and JOIN conditions are all folded into a vector of WhereTerm. 
@@ -308,6 +310,41 @@ pub struct SelectPlan { pub query_type: SelectQueryType, } +impl SelectPlan { + pub fn agg_args_count(&self) -> usize { + self.aggregates.iter().map(|agg| agg.args.len()).sum() + } + + pub fn group_by_col_count(&self) -> usize { + self.group_by + .as_ref() + .map_or(0, |group_by| group_by.exprs.len()) + } + + pub fn non_group_by_non_agg_columns(&self) -> impl Iterator { + self.result_columns + .iter() + .filter(|c| { + !c.contains_aggregates + && !self.group_by.as_ref().map_or(false, |group_by| { + group_by + .exprs + .iter() + .any(|expr| exprs_are_equivalent(&c.expr, expr)) + }) + }) + .map(|c| &c.expr) + } + + pub fn non_group_by_non_agg_column_count(&self) -> usize { + self.non_group_by_non_agg_columns().count() + } + + pub fn group_by_sorter_column_count(&self) -> usize { + self.agg_args_count() + self.group_by_col_count() + self.non_group_by_non_agg_column_count() + } +} + #[allow(dead_code)] #[derive(Debug, Clone)] pub struct DeletePlan { diff --git a/core/translate/result_row.rs b/core/translate/result_row.rs index dc24cee67..d674ae0f9 100644 --- a/core/translate/result_row.rs +++ b/core/translate/result_row.rs @@ -4,7 +4,7 @@ use crate::{ }; use super::{ - emitter::TranslateCtx, + emitter::Resolver, expr::translate_expr, plan::{SelectPlan, SelectQueryType}, }; @@ -15,16 +15,21 @@ use super::{ /// - limit pub fn emit_select_result( program: &mut ProgramBuilder, - t_ctx: &mut TranslateCtx, + resolver: &Resolver, plan: &SelectPlan, label_on_limit_reached: Option, offset_jump_to: Option, + reg_nonagg_emit_once_flag: Option, + reg_offset: Option, + reg_result_cols_start: usize, + reg_limit: Option, + reg_limit_offset_sum: Option, ) -> Result<()> { if let (Some(jump_to), Some(_)) = (offset_jump_to, label_on_limit_reached) { - emit_offset(program, t_ctx, plan, jump_to)?; + emit_offset(program, plan, jump_to, reg_offset)?; } - let start_reg = t_ctx.reg_result_cols_start.unwrap(); + let start_reg = reg_result_cols_start; for (i, rc) in plan.result_columns.iter().enumerate().filter(|(_, rc)| { // For aggregate queries, we handle columns differently; example: select id, first_name, sum(age) from users limit 1; // 1. Columns with aggregates (e.g., sum(age)) are computed in each iteration of aggregation @@ -32,8 +37,8 @@ pub fn emit_select_result( // This filter ensures we only emit expressions for non aggregate columns once, // preserving previously calculated values while updating aggregate results // For all other queries where reg_nonagg_emit_once_flag is none we do nothing. 
- t_ctx.reg_nonagg_emit_once_flag.is_some() && rc.contains_aggregates - || t_ctx.reg_nonagg_emit_once_flag.is_none() + reg_nonagg_emit_once_flag.is_some() && rc.contains_aggregates + || reg_nonagg_emit_once_flag.is_none() }) { let reg = start_reg + i; translate_expr( @@ -41,10 +46,18 @@ pub fn emit_select_result( Some(&plan.table_references), &rc.expr, reg, - &t_ctx.resolver, + resolver, )?; } - emit_result_row_and_limit(program, t_ctx, plan, start_reg, label_on_limit_reached)?; + emit_result_row_and_limit( + program, + plan, + start_reg, + reg_limit, + reg_offset, + reg_limit_offset_sum, + label_on_limit_reached, + )?; Ok(()) } @@ -53,9 +66,11 @@ pub fn emit_select_result( /// - limit pub fn emit_result_row_and_limit( program: &mut ProgramBuilder, - t_ctx: &mut TranslateCtx, plan: &SelectPlan, result_columns_start_reg: usize, + reg_limit: Option, + reg_offset: Option, + reg_limit_offset_sum: Option, label_on_limit_reached: Option, ) -> Result<()> { match &plan.query_type { @@ -82,27 +97,27 @@ pub fn emit_result_row_and_limit( } program.emit_insn(Insn::Integer { value: limit as i64, - dest: t_ctx.reg_limit.unwrap(), + dest: reg_limit.expect("reg_limit must be Some"), }); program.mark_last_insn_constant(); if let Some(offset) = plan.offset { program.emit_insn(Insn::Integer { value: offset as i64, - dest: t_ctx.reg_offset.unwrap(), + dest: reg_offset.expect("reg_offset must be Some"), }); program.mark_last_insn_constant(); program.emit_insn(Insn::OffsetLimit { - limit_reg: t_ctx.reg_limit.unwrap(), - combined_reg: t_ctx.reg_limit_offset_sum.unwrap(), - offset_reg: t_ctx.reg_offset.unwrap(), + limit_reg: reg_limit.expect("reg_limit must be Some"), + combined_reg: reg_limit_offset_sum.expect("reg_limit_offset_sum must be Some"), + offset_reg: reg_offset.expect("reg_offset must be Some"), }); program.mark_last_insn_constant(); } program.emit_insn(Insn::DecrJumpZero { - reg: t_ctx.reg_limit.unwrap(), + reg: reg_limit.expect("reg_limit must be Some"), target_pc: label_on_limit_reached.unwrap(), }); } @@ -111,15 +126,15 @@ pub fn emit_result_row_and_limit( pub fn emit_offset( program: &mut ProgramBuilder, - t_ctx: &mut TranslateCtx, plan: &SelectPlan, jump_to: BranchOffset, + reg_offset: Option, ) -> Result<()> { match plan.offset { Some(offset) if offset > 0 => { program.add_comment(program.offset(), "OFFSET"); program.emit_insn(Insn::IfPos { - reg: t_ctx.reg_offset.unwrap(), + reg: reg_offset.expect("reg_offset must be Some"), target_pc: jump_to, decrement_by: 1, }); diff --git a/core/translate/select.rs b/core/translate/select.rs index f9529e92a..1bf6905da 100644 --- a/core/translate/select.rs +++ b/core/translate/select.rs @@ -12,7 +12,7 @@ use crate::util::normalize_ident; use crate::vdbe::builder::{ProgramBuilderOpts, QueryMode}; use crate::SymbolTable; use crate::{schema::Schema, vdbe::builder::ProgramBuilder, Result}; -use limbo_sqlite3_parser::ast::{self}; +use limbo_sqlite3_parser::ast::{self, SortOrder}; use limbo_sqlite3_parser::ast::{ResultColumn, SelectInner}; pub fn translate_select( @@ -328,6 +328,7 @@ pub fn prepare_select_plan<'a>( } plan.group_by = Some(GroupBy { + sort_order: Some((0..group_by.exprs.len()).map(|_| SortOrder::Asc).collect()), exprs: group_by.exprs, having: if let Some(having) = group_by.having { let mut predicates = vec![];