GROUP BY: refactor logic to support cases where no sorting is needed

This commit is contained in:
Jussi Saurio
2025-05-03 12:47:32 +03:00
parent ae2561dbca
commit 37097e01ae
8 changed files with 785 additions and 347 deletions

View File

@@ -41,7 +41,18 @@ pub fn emit_ungrouped_aggregation<'a>(
// This always emits a ResultRow because currently it can only be used for a single row result
// Limit is None because we early exit on limit 0 and the max rows here is 1
emit_select_result(program, t_ctx, plan, None, None)?;
emit_select_result(
program,
&t_ctx.resolver,
plan,
None,
None,
t_ctx.reg_nonagg_emit_once_flag,
t_ctx.reg_offset,
t_ctx.reg_result_cols_start.unwrap(),
t_ctx.reg_limit,
t_ctx.reg_limit_offset_sum,
)?;
Ok(())
}

View File

@@ -17,7 +17,9 @@ use crate::{Result, SymbolTable};
use super::aggregation::emit_ungrouped_aggregation;
use super::expr::{translate_condition_expr, translate_expr, ConditionMetadata};
use super::group_by::{emit_group_by, init_group_by, GroupByMetadata};
use super::group_by::{
group_by_agg_phase, group_by_emit_row_phase, init_group_by, GroupByMetadata, GroupByRowSource,
};
use super::main_loop::{close_loop, emit_loop, init_loop, open_loop, LeftJoinMetadata, LoopLabels};
use super::order_by::{emit_order_by, init_order_by, SortMetadata};
use super::plan::{JoinOrderMember, Operation, SelectPlan, TableReference, UpdatePlan};
@@ -324,9 +326,18 @@ pub fn emit_query<'a>(
let mut order_by_necessary = plan.order_by.is_some() && !plan.contains_constant_false_condition;
let order_by = plan.order_by.as_ref();
// Handle GROUP BY and aggregation processing
if plan.group_by.is_some() {
emit_group_by(program, t_ctx, plan)?;
let row_source = &t_ctx
.meta_group_by
.as_ref()
.expect("group by metadata not found")
.row_source;
if matches!(row_source, GroupByRowSource::Sorter { .. }) {
group_by_agg_phase(program, t_ctx, plan)?;
}
group_by_emit_row_phase(program, t_ctx, plan)?;
} else if !plan.aggregates.is_empty() {
// Handle aggregation without GROUP BY
emit_ungrouped_aggregation(program, t_ctx, plan)?;

File diff suppressed because it is too large Load Diff

View File

@@ -22,7 +22,7 @@ use super::{
translate_condition_expr, translate_expr, translate_expr_no_constant_opt,
ConditionMetadata, NoConstantOptReason,
},
group_by::is_column_in_group_by,
group_by::{group_by_agg_phase, GroupByMetadata, GroupByRowSource},
optimizer::Optimizable,
order_by::{order_by_sorter_insert, sorter_insert},
plan::{
@@ -562,11 +562,12 @@ pub fn open_loop(
/// SQLite (and so Limbo) processes joins as a nested loop.
/// The loop may emit rows to various destinations depending on the query:
/// - a GROUP BY sorter (grouping is done by sorting based on the GROUP BY keys and aggregating while the GROUP BY keys match)
/// - a GROUP BY phase with no sorting (when the rows are already in the order required by the GROUP BY keys)
/// - an ORDER BY sorter (when there is no GROUP BY, but there is an ORDER BY)
/// - an AggStep (the columns are collected for aggregation, which is finished later)
/// - a QueryResult (there is none of the above, so the loop either emits a ResultRow, or if it's a subquery, yields to the parent query)
enum LoopEmitTarget {
GroupBySorter,
GroupBy,
OrderBySorter,
AggStep,
QueryResult,
@@ -574,14 +575,15 @@ enum LoopEmitTarget {
/// Emits the bytecode for the inner loop of a query.
/// At this point the cursors for all tables have been opened and rewound.
pub fn emit_loop(
pub fn emit_loop<'a>(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
plan: &mut SelectPlan,
t_ctx: &mut TranslateCtx<'a>,
plan: &'a SelectPlan,
) -> Result<()> {
// if we have a group by, we emit a record into the group by sorter.
// if we have a group by, we emit a record into the group by sorter,
// or if the rows are already sorted, we do the group by aggregation phase directly.
if plan.group_by.is_some() {
return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::GroupBySorter);
return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::GroupBy);
}
// if we DONT have a group by, but we have aggregates, we emit without ResultRow.
// we also do not need to sort because we are emitting a single row.
@@ -599,50 +601,31 @@ pub fn emit_loop(
/// This is a helper function for inner_loop_emit,
/// which does a different thing depending on the emit target.
/// See the InnerLoopEmitTarget enum for more details.
fn emit_loop_source(
fn emit_loop_source<'a>(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
plan: &SelectPlan,
t_ctx: &mut TranslateCtx<'a>,
plan: &'a SelectPlan,
emit_target: LoopEmitTarget,
) -> Result<()> {
match emit_target {
LoopEmitTarget::GroupBySorter => {
// This function creates a sorter for GROUP BY operations by allocating registers and
// translating expressions for three types of columns:
LoopEmitTarget::GroupBy => {
// This function either:
// - creates a sorter for GROUP BY operations by allocating registers and translating expressions for three types of columns:
// 1) GROUP BY columns (used as sorting keys)
// 2) non-aggregate, non-GROUP BY columns
// 3) aggregate function arguments
// - or if the rows produced by the loop are already sorted in the order required by the GROUP BY keys,
// the group by comparisons are done directly inside the main loop.
let group_by = plan.group_by.as_ref().unwrap();
let aggregates = &plan.aggregates;
// Identify columns in the result set that are neither in GROUP BY nor contain aggregates
let non_group_by_non_agg_expr = plan
.result_columns
.iter()
.filter(|rc| {
!rc.contains_aggregates && !is_column_in_group_by(&rc.expr, &group_by.exprs)
})
.map(|rc| &rc.expr);
let non_agg_count = non_group_by_non_agg_expr.clone().count();
// Store the count of non-GROUP BY, non-aggregate columns in the metadata
// This will be used later during aggregation processing
t_ctx.meta_group_by.as_mut().map(|meta| {
meta.non_group_by_non_agg_column_count = Some(non_agg_count);
meta
});
let GroupByMetadata {
row_source,
registers,
..
} = t_ctx.meta_group_by.as_ref().unwrap();
// Calculate the total number of arguments used across all aggregate functions
let aggregate_arguments_count = plan
.aggregates
.iter()
.map(|agg| agg.args.len())
.sum::<usize>();
// Calculate total number of registers needed for all columns in the sorter
let column_count = group_by.exprs.len() + aggregate_arguments_count + non_agg_count;
// Allocate a contiguous block of registers for all columns
let start_reg = program.alloc_registers(column_count);
let start_reg = registers.reg_group_by_source_cols_start;
let mut cur_reg = start_reg;
// Step 1: Process GROUP BY columns first
@@ -662,7 +645,7 @@ fn emit_loop_source(
// Step 2: Process columns that aren't part of GROUP BY and don't contain aggregates
// Example: SELECT col1, col2, SUM(col3) FROM table GROUP BY col1
// Here col2 would be processed in this loop if it's in the result set
for expr in non_group_by_non_agg_expr {
for expr in plan.non_group_by_non_agg_columns() {
let key_reg = cur_reg;
cur_reg += 1;
translate_expr(
@@ -693,19 +676,36 @@ fn emit_loop_source(
}
}
let group_by_metadata = t_ctx.meta_group_by.as_ref().unwrap();
sorter_insert(
program,
start_reg,
column_count,
group_by_metadata.sort_cursor,
group_by_metadata.reg_sorter_key,
);
match row_source {
GroupByRowSource::Sorter {
sort_cursor,
sorter_column_count,
reg_sorter_key,
..
} => {
sorter_insert(
program,
start_reg,
*sorter_column_count,
*sort_cursor,
*reg_sorter_key,
);
}
GroupByRowSource::MainLoop { .. } => group_by_agg_phase(program, t_ctx, plan)?,
}
Ok(())
}
LoopEmitTarget::OrderBySorter => order_by_sorter_insert(program, t_ctx, plan),
LoopEmitTarget::OrderBySorter => order_by_sorter_insert(
program,
&t_ctx.resolver,
t_ctx
.meta_sort
.as_ref()
.expect("sort metadata must exist for ORDER BY"),
&mut t_ctx.result_column_indexes_in_orderby_sorter,
plan,
),
LoopEmitTarget::AggStep => {
let num_aggs = plan.aggregates.len();
let start_reg = program.alloc_registers(num_aggs);
@@ -778,10 +778,15 @@ fn emit_loop_source(
.or(t_ctx.label_main_loop_end);
emit_select_result(
program,
t_ctx,
&t_ctx.resolver,
plan,
t_ctx.label_main_loop_end,
offset_jump_to,
t_ctx.reg_nonagg_emit_once_flag,
t_ctx.reg_offset,
t_ctx.reg_result_cols_start.unwrap(),
t_ctx.reg_limit,
t_ctx.reg_limit_offset_sum,
)?;
Ok(())

View File

@@ -13,7 +13,7 @@ use crate::{
};
use super::{
emitter::TranslateCtx,
emitter::{Resolver, TranslateCtx},
expr::translate_expr,
plan::{ResultSetColumn, SelectPlan},
result_row::{emit_offset, emit_result_row_and_limit},
@@ -121,7 +121,7 @@ pub fn emit_order_by(
});
program.preassign_label_to_next_insn(sort_loop_start_label);
emit_offset(program, t_ctx, plan, sort_loop_next_label)?;
emit_offset(program, plan, sort_loop_next_label, t_ctx.reg_offset)?;
program.emit_insn(Insn::SorterData {
cursor_id: sort_cursor,
@@ -142,7 +142,15 @@ pub fn emit_order_by(
});
}
emit_result_row_and_limit(program, t_ctx, plan, start_reg, Some(sort_loop_end_label))?;
emit_result_row_and_limit(
program,
plan,
start_reg,
t_ctx.reg_limit,
t_ctx.reg_offset,
t_ctx.reg_limit_offset_sum,
Some(sort_loop_end_label),
)?;
program.resolve_label(sort_loop_next_label, program.offset());
program.emit_insn(Insn::SorterNext {
@@ -157,7 +165,9 @@ pub fn emit_order_by(
/// Emits the bytecode for inserting a row into an ORDER BY sorter.
pub fn order_by_sorter_insert(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
resolver: &Resolver,
sort_metadata: &SortMetadata,
res_col_indexes_in_orderby_sorter: &mut Vec<usize>,
plan: &SelectPlan,
) -> Result<()> {
let order_by = plan.order_by.as_ref().unwrap();
@@ -181,7 +191,7 @@ pub fn order_by_sorter_insert(
Some(&plan.table_references),
expr,
key_reg,
&t_ctx.resolver,
resolver,
)?;
}
let mut cur_reg = start_reg + order_by_len;
@@ -191,9 +201,7 @@ pub fn order_by_sorter_insert(
let found = v.iter().find(|(skipped_idx, _)| *skipped_idx == i);
// If the result column is in the list of columns to skip, we need to know its new index in the ORDER BY sorter.
if let Some((_, result_column_idx)) = found {
t_ctx
.result_column_indexes_in_orderby_sorter
.insert(i, *result_column_idx);
res_col_indexes_in_orderby_sorter.insert(i, *result_column_idx);
continue;
}
}
@@ -202,11 +210,9 @@ pub fn order_by_sorter_insert(
Some(&plan.table_references),
&rc.expr,
cur_reg,
&t_ctx.resolver,
resolver,
)?;
t_ctx
.result_column_indexes_in_orderby_sorter
.insert(i, cur_idx_in_orderby_sorter);
res_col_indexes_in_orderby_sorter.insert(i, cur_idx_in_orderby_sorter);
cur_idx_in_orderby_sorter += 1;
cur_reg += 1;
}
@@ -214,14 +220,14 @@ pub fn order_by_sorter_insert(
let SortMetadata {
sort_cursor,
reg_sorter_data,
} = *t_ctx.meta_sort.as_mut().unwrap();
} = sort_metadata;
sorter_insert(
program,
start_reg,
orderby_sorter_column_count,
sort_cursor,
reg_sorter_data,
*sort_cursor,
*reg_sorter_data,
);
Ok(())
}

View File

@@ -11,6 +11,7 @@ use std::{
use crate::{
function::AggFunc,
schema::{BTreeTable, Column, Index, Table},
util::exprs_are_equivalent,
vdbe::{
builder::{CursorType, ProgramBuilder},
BranchOffset, CursorID,
@@ -65,6 +66,7 @@ pub struct GroupBy {
pub exprs: Vec<ast::Expr>,
/// having clause split into a vec at 'AND' boundaries.
pub having: Option<Vec<ast::Expr>>,
pub sort_order: Option<Vec<SortOrder>>,
}
/// In a query plan, WHERE clause conditions and JOIN conditions are all folded into a vector of WhereTerm.
@@ -308,6 +310,41 @@ pub struct SelectPlan {
pub query_type: SelectQueryType,
}
impl SelectPlan {
    /// Total number of arguments across all aggregate functions in the plan,
    /// e.g. `SELECT sum(a), max(b)` -> 2.
    pub fn agg_args_count(&self) -> usize {
        self.aggregates.iter().map(|agg| agg.args.len()).sum()
    }

    /// Number of GROUP BY key expressions; 0 when the plan has no GROUP BY.
    pub fn group_by_col_count(&self) -> usize {
        self.group_by
            .as_ref()
            .map_or(0, |group_by| group_by.exprs.len())
    }

    /// Iterator over result-column expressions that neither contain aggregates
    /// nor are equivalent to one of the GROUP BY key expressions
    /// (equivalence decided by `exprs_are_equivalent`, not plain `==`).
    /// Example: in `SELECT col1, col2, SUM(col3) ... GROUP BY col1`,
    /// this yields `col2` only.
    pub fn non_group_by_non_agg_columns(&self) -> impl Iterator<Item = &ast::Expr> {
        self.result_columns
            .iter()
            .filter(|c| {
                // Keep a column only if it has no aggregate anywhere in it
                // and it is not one of the GROUP BY keys. With no GROUP BY
                // clause, map_or(false, ..) keeps every non-aggregate column.
                !c.contains_aggregates
                    && !self.group_by.as_ref().map_or(false, |group_by| {
                        group_by
                            .exprs
                            .iter()
                            .any(|expr| exprs_are_equivalent(&c.expr, expr))
                    })
            })
            .map(|c| &c.expr)
    }

    /// Count of the expressions yielded by [`Self::non_group_by_non_agg_columns`].
    pub fn non_group_by_non_agg_column_count(&self) -> usize {
        self.non_group_by_non_agg_columns().count()
    }

    /// Total number of columns a GROUP BY sorter row holds:
    /// GROUP BY keys + aggregate function arguments + non-aggregate,
    /// non-GROUP-BY result columns.
    pub fn group_by_sorter_column_count(&self) -> usize {
        self.agg_args_count() + self.group_by_col_count() + self.non_group_by_non_agg_column_count()
    }
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct DeletePlan {

View File

@@ -4,7 +4,7 @@ use crate::{
};
use super::{
emitter::TranslateCtx,
emitter::Resolver,
expr::translate_expr,
plan::{SelectPlan, SelectQueryType},
};
@@ -15,16 +15,21 @@ use super::{
/// - limit
pub fn emit_select_result(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
resolver: &Resolver,
plan: &SelectPlan,
label_on_limit_reached: Option<BranchOffset>,
offset_jump_to: Option<BranchOffset>,
reg_nonagg_emit_once_flag: Option<usize>,
reg_offset: Option<usize>,
reg_result_cols_start: usize,
reg_limit: Option<usize>,
reg_limit_offset_sum: Option<usize>,
) -> Result<()> {
if let (Some(jump_to), Some(_)) = (offset_jump_to, label_on_limit_reached) {
emit_offset(program, t_ctx, plan, jump_to)?;
emit_offset(program, plan, jump_to, reg_offset)?;
}
let start_reg = t_ctx.reg_result_cols_start.unwrap();
let start_reg = reg_result_cols_start;
for (i, rc) in plan.result_columns.iter().enumerate().filter(|(_, rc)| {
// For aggregate queries, we handle columns differently; example: select id, first_name, sum(age) from users limit 1;
// 1. Columns with aggregates (e.g., sum(age)) are computed in each iteration of aggregation
@@ -32,8 +37,8 @@ pub fn emit_select_result(
// This filter ensures we only emit expressions for non aggregate columns once,
// preserving previously calculated values while updating aggregate results
// For all other queries where reg_nonagg_emit_once_flag is none we do nothing.
t_ctx.reg_nonagg_emit_once_flag.is_some() && rc.contains_aggregates
|| t_ctx.reg_nonagg_emit_once_flag.is_none()
reg_nonagg_emit_once_flag.is_some() && rc.contains_aggregates
|| reg_nonagg_emit_once_flag.is_none()
}) {
let reg = start_reg + i;
translate_expr(
@@ -41,10 +46,18 @@ pub fn emit_select_result(
Some(&plan.table_references),
&rc.expr,
reg,
&t_ctx.resolver,
resolver,
)?;
}
emit_result_row_and_limit(program, t_ctx, plan, start_reg, label_on_limit_reached)?;
emit_result_row_and_limit(
program,
plan,
start_reg,
reg_limit,
reg_offset,
reg_limit_offset_sum,
label_on_limit_reached,
)?;
Ok(())
}
@@ -53,9 +66,11 @@ pub fn emit_select_result(
/// - limit
pub fn emit_result_row_and_limit(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
plan: &SelectPlan,
result_columns_start_reg: usize,
reg_limit: Option<usize>,
reg_offset: Option<usize>,
reg_limit_offset_sum: Option<usize>,
label_on_limit_reached: Option<BranchOffset>,
) -> Result<()> {
match &plan.query_type {
@@ -82,27 +97,27 @@ pub fn emit_result_row_and_limit(
}
program.emit_insn(Insn::Integer {
value: limit as i64,
dest: t_ctx.reg_limit.unwrap(),
dest: reg_limit.expect("reg_limit must be Some"),
});
program.mark_last_insn_constant();
if let Some(offset) = plan.offset {
program.emit_insn(Insn::Integer {
value: offset as i64,
dest: t_ctx.reg_offset.unwrap(),
dest: reg_offset.expect("reg_offset must be Some"),
});
program.mark_last_insn_constant();
program.emit_insn(Insn::OffsetLimit {
limit_reg: t_ctx.reg_limit.unwrap(),
combined_reg: t_ctx.reg_limit_offset_sum.unwrap(),
offset_reg: t_ctx.reg_offset.unwrap(),
limit_reg: reg_limit.expect("reg_limit must be Some"),
combined_reg: reg_limit_offset_sum.expect("reg_limit_offset_sum must be Some"),
offset_reg: reg_offset.expect("reg_offset must be Some"),
});
program.mark_last_insn_constant();
}
program.emit_insn(Insn::DecrJumpZero {
reg: t_ctx.reg_limit.unwrap(),
reg: reg_limit.expect("reg_limit must be Some"),
target_pc: label_on_limit_reached.unwrap(),
});
}
@@ -111,15 +126,15 @@ pub fn emit_result_row_and_limit(
pub fn emit_offset(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
plan: &SelectPlan,
jump_to: BranchOffset,
reg_offset: Option<usize>,
) -> Result<()> {
match plan.offset {
Some(offset) if offset > 0 => {
program.add_comment(program.offset(), "OFFSET");
program.emit_insn(Insn::IfPos {
reg: t_ctx.reg_offset.unwrap(),
reg: reg_offset.expect("reg_offset must be Some"),
target_pc: jump_to,
decrement_by: 1,
});

View File

@@ -12,7 +12,7 @@ use crate::util::normalize_ident;
use crate::vdbe::builder::{ProgramBuilderOpts, QueryMode};
use crate::SymbolTable;
use crate::{schema::Schema, vdbe::builder::ProgramBuilder, Result};
use limbo_sqlite3_parser::ast::{self};
use limbo_sqlite3_parser::ast::{self, SortOrder};
use limbo_sqlite3_parser::ast::{ResultColumn, SelectInner};
pub fn translate_select(
@@ -328,6 +328,7 @@ pub fn prepare_select_plan<'a>(
}
plan.group_by = Some(GroupBy {
sort_order: Some((0..group_by.exprs.len()).map(|_| SortOrder::Asc).collect()),
exprs: group_by.exprs,
having: if let Some(having) = group_by.having {
let mut predicates = vec![];