emitter.rs: use way less arguments to functions

This commit is contained in:
Jussi Saurio
2025-01-04 12:59:30 +02:00
parent d2b73e8492
commit e31317fbb5

View File

@@ -424,43 +424,19 @@ fn emit_query<'a>(
let mut order_by_necessary = plan.order_by.is_some() && !plan.contains_constant_false_condition;
let order_by = plan.order_by.as_ref();
// Handle GROUP BY and aggregation processing
if let Some(ref mut group_by) = plan.group_by {
emit_group_by(
program,
t_ctx,
&plan.result_columns,
&plan.referenced_tables,
group_by,
&plan.aggregates,
order_by,
plan.limit,
&plan.query_type,
)?;
if plan.group_by.is_some() {
emit_group_by(program, t_ctx, plan)?;
} else if !plan.aggregates.is_empty() {
// Handle aggregation without GROUP BY
emit_ungrouped_aggregation(
program,
t_ctx,
&plan.result_columns,
&plan.referenced_tables,
&plan.aggregates,
&plan.query_type,
)?;
emit_ungrouped_aggregation(program, t_ctx, plan)?;
// Single row result for aggregates without GROUP BY, so ORDER BY not needed
order_by_necessary = false;
}
// Process ORDER BY results if needed
if let Some(order_by) = order_by {
if order_by.is_some() {
if order_by_necessary {
emit_order_by(
program,
t_ctx,
order_by,
&plan.result_columns,
plan.limit,
&plan.query_type,
)?;
emit_order_by(program, t_ctx, plan)?;
}
}
@@ -1152,19 +1128,11 @@ fn open_loop(
/// - an ORDER BY sorter (when there is no GROUP BY, but there is an ORDER BY)
/// - an AggStep (the columns are collected for aggregation, which is finished later)
/// - a QueryResult (there is none of the above, so the loop either emits a ResultRow, or if it's a subquery, yields to the parent query)
pub enum LoopEmitTarget<'a> {
GroupBySorter {
group_by: &'a GroupBy,
aggregates: &'a Vec<Aggregate>,
},
OrderBySorter {
order_by: &'a Vec<(ast::Expr, Direction)>,
},
pub enum LoopEmitTarget {
GroupBySorter,
OrderBySorter,
AggStep,
QueryResult {
query_type: &'a SelectQueryType,
limit: Option<usize>,
},
QueryResult,
}
/// Emits the bytecode for the inner loop of a query.
@@ -1175,54 +1143,20 @@ fn emit_loop(
plan: &mut SelectPlan,
) -> Result<()> {
// if we have a group by, we emit a record into the group by sorter.
if let Some(group_by) = &plan.group_by {
return emit_loop_source(
program,
t_ctx,
&plan.result_columns,
&plan.aggregates,
&plan.referenced_tables,
LoopEmitTarget::GroupBySorter {
group_by,
aggregates: &plan.aggregates,
},
);
if plan.group_by.is_some() {
return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::GroupBySorter);
}
// if we DONT have a group by, but we have aggregates, we emit without ResultRow.
// we also do not need to sort because we are emitting a single row.
if !plan.aggregates.is_empty() {
return emit_loop_source(
program,
t_ctx,
&plan.result_columns,
&plan.aggregates,
&plan.referenced_tables,
LoopEmitTarget::AggStep,
);
return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::AggStep);
}
// if we DONT have a group by, but we have an order by, we emit a record into the order by sorter.
if let Some(order_by) = &plan.order_by {
return emit_loop_source(
program,
t_ctx,
&plan.result_columns,
&plan.aggregates,
&plan.referenced_tables,
LoopEmitTarget::OrderBySorter { order_by },
);
if plan.order_by.is_some() {
return emit_loop_source(program, t_ctx, plan, LoopEmitTarget::OrderBySorter);
}
// if we have neither, we emit a ResultRow. In that case, if we have a Limit, we handle that with DecrJumpZero.
emit_loop_source(
program,
t_ctx,
&plan.result_columns,
&plan.aggregates,
&plan.referenced_tables,
LoopEmitTarget::QueryResult {
query_type: &plan.query_type,
limit: plan.limit,
},
)
emit_loop_source(program, t_ctx, plan, LoopEmitTarget::QueryResult)
}
/// This is a helper function for inner_loop_emit,
@@ -1231,19 +1165,19 @@ fn emit_loop(
fn emit_loop_source(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
result_columns: &[ResultSetColumn],
aggregates: &[Aggregate],
referenced_tables: &[TableReference],
plan: &SelectPlan,
emit_target: LoopEmitTarget,
) -> Result<()> {
match emit_target {
LoopEmitTarget::GroupBySorter {
group_by,
aggregates,
} => {
LoopEmitTarget::GroupBySorter => {
let group_by = plan.group_by.as_ref().unwrap();
let aggregates = &plan.aggregates;
let sort_keys_count = group_by.exprs.len();
let aggregate_arguments_count =
aggregates.iter().map(|agg| agg.args.len()).sum::<usize>();
let aggregate_arguments_count = plan
.aggregates
.iter()
.map(|agg| agg.args.len())
.sum::<usize>();
let column_count = sort_keys_count + aggregate_arguments_count;
let start_reg = program.alloc_registers(column_count);
let mut cur_reg = start_reg;
@@ -1254,7 +1188,7 @@ fn emit_loop_source(
cur_reg += 1;
translate_expr(
program,
Some(referenced_tables),
Some(&plan.referenced_tables),
expr,
key_reg,
&t_ctx.resolver,
@@ -1273,7 +1207,7 @@ fn emit_loop_source(
cur_reg += 1;
translate_expr(
program,
Some(referenced_tables),
Some(&plan.referenced_tables),
expr,
agg_reg,
&t_ctx.resolver,
@@ -1296,12 +1230,9 @@ fn emit_loop_source(
Ok(())
}
LoopEmitTarget::OrderBySorter { order_by } => {
order_by_sorter_insert(program, t_ctx, referenced_tables, order_by, result_columns)?;
Ok(())
}
LoopEmitTarget::OrderBySorter => order_by_sorter_insert(program, t_ctx, plan),
LoopEmitTarget::AggStep => {
let num_aggs = aggregates.len();
let num_aggs = plan.aggregates.len();
let start_reg = program.alloc_registers(num_aggs);
t_ctx.reg_agg_start = Some(start_reg);
@@ -1309,11 +1240,11 @@ fn emit_loop_source(
// a more complex expression. Some examples: length(sum(x)), sum(x) + avg(y), sum(x) + 1, etc.
// The result of those more complex expressions depends on the final result of the aggregate, so we don't translate the complete expressions here.
// Instead, we translate the aggregates + any expressions that do not contain aggregates.
for (i, agg) in aggregates.iter().enumerate() {
for (i, agg) in plan.aggregates.iter().enumerate() {
let reg = start_reg + i;
translate_aggregation(program, referenced_tables, agg, reg, &t_ctx.resolver)?;
translate_aggregation(program, &plan.referenced_tables, agg, reg, &t_ctx.resolver)?;
}
for (i, rc) in result_columns.iter().enumerate() {
for (i, rc) in plan.result_columns.iter().enumerate() {
if rc.contains_aggregates {
// Do nothing, aggregates are computed above
// if this result column is e.g. something like sum(x) + 1 or length(sum(x)), we do not want to translate that (+1) or length() yet,
@@ -1323,7 +1254,7 @@ fn emit_loop_source(
let reg = start_reg + num_aggs + i;
translate_expr(
program,
Some(referenced_tables),
Some(&plan.referenced_tables),
&rc.expr,
reg,
&t_ctx.resolver,
@@ -1331,25 +1262,12 @@ fn emit_loop_source(
}
Ok(())
}
LoopEmitTarget::QueryResult { query_type, limit } => {
LoopEmitTarget::QueryResult => {
assert!(
aggregates.is_empty(),
plan.aggregates.is_empty(),
"We should not get here with aggregates"
);
emit_select_result(
program,
t_ctx,
referenced_tables,
result_columns,
limit.map(|l| {
(
l,
t_ctx.reg_limit.unwrap(),
t_ctx.label_main_loop_end.unwrap(),
)
}),
query_type,
)?;
emit_select_result(program, t_ctx, plan, t_ctx.label_main_loop_end)?;
Ok(())
}
@@ -1560,17 +1478,10 @@ fn emit_delete_insns<'a>(
/// Emits the bytecode for processing a GROUP BY clause.
/// This is called when the main query execution loop has finished processing,
/// and we now have data in the GROUP BY sorter.
#[allow(clippy::too_many_arguments)]
fn emit_group_by<'a>(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx<'a>,
result_columns: &'a [ResultSetColumn],
referenced_tables: &'a [TableReference],
group_by: &'a GroupBy,
aggregates: &'a [Aggregate],
order_by: Option<&'a Vec<(ast::Expr, Direction)>>,
limit: Option<usize>,
query_type: &'a SelectQueryType,
plan: &'a SelectPlan,
) -> Result<()> {
// Label for the first instruction of the grouping loop.
// This is the start of the loop that reads the sorted data and groups&aggregates it.
@@ -1601,10 +1512,16 @@ fn emit_group_by<'a>(
..
} = *t_ctx.meta_group_by.as_mut().unwrap();
let group_by = plan.group_by.as_ref().unwrap();
// all group by columns and all arguments of agg functions are in the sorter.
// the sort keys are the group by columns (the aggregation within groups is done based on how long the sort keys remain the same)
let sorter_column_count =
group_by.exprs.len() + aggregates.iter().map(|agg| agg.args.len()).sum::<usize>();
let sorter_column_count = group_by.exprs.len()
+ plan
.aggregates
.iter()
.map(|agg| agg.args.len())
.sum::<usize>();
// sorter column names do not matter
let pseudo_columns = (0..sorter_column_count)
.map(|i| Column {
@@ -1722,11 +1639,11 @@ fn emit_group_by<'a>(
program.resolve_label(agg_step_label, program.offset());
let start_reg = t_ctx.reg_agg_start.unwrap();
let mut cursor_index = group_by.exprs.len();
for (i, agg) in aggregates.iter().enumerate() {
for (i, agg) in plan.aggregates.iter().enumerate() {
let agg_result_reg = start_reg + i;
translate_aggregation_groupby(
program,
referenced_tables,
&plan.referenced_tables,
pseudo_cursor,
cursor_index,
agg,
@@ -1825,7 +1742,7 @@ fn emit_group_by<'a>(
let agg_start_reg = t_ctx.reg_agg_start.unwrap();
// Resolve the label for the start of the group by output row subroutine
program.resolve_label(label_agg_final, program.offset());
for (i, agg) in aggregates.iter().enumerate() {
for (i, agg) in plan.aggregates.iter().enumerate() {
let agg_result_reg = agg_start_reg + i;
program.emit_insn(Insn::AggFinal {
register: agg_result_reg,
@@ -1843,7 +1760,7 @@ fn emit_group_by<'a>(
.expr_to_reg_cache
.push((expr, reg_group_exprs_acc + i));
}
for (i, agg) in aggregates.iter().enumerate() {
for (i, agg) in plan.aggregates.iter().enumerate() {
t_ctx
.resolver
.expr_to_reg_cache
@@ -1854,7 +1771,7 @@ fn emit_group_by<'a>(
for expr in having.iter() {
translate_condition_expr(
program,
referenced_tables,
&plan.referenced_tables,
expr,
ConditionMetadata {
jump_if_condition_is_true: false,
@@ -1866,19 +1783,12 @@ fn emit_group_by<'a>(
}
}
match order_by {
match &plan.order_by {
None => {
emit_select_result(
program,
t_ctx,
referenced_tables,
result_columns,
limit.map(|l| (l, t_ctx.reg_limit.unwrap(), label_group_by_end)),
query_type,
)?;
emit_select_result(program, t_ctx, plan, Some(label_group_by_end))?;
}
Some(order_by) => {
order_by_sorter_insert(program, t_ctx, referenced_tables, order_by, result_columns)?;
Some(_) => {
order_by_sorter_insert(program, t_ctx, plan)?;
}
}
@@ -1891,7 +1801,7 @@ fn emit_group_by<'a>(
let start_reg = reg_group_exprs_acc;
program.emit_insn(Insn::Null {
dest: start_reg,
dest_end: Some(start_reg + group_by.exprs.len() + aggregates.len() - 1),
dest_end: Some(start_reg + group_by.exprs.len() + plan.aggregates.len() - 1),
});
program.emit_insn(Insn::Integer {
@@ -1913,13 +1823,10 @@ fn emit_group_by<'a>(
fn emit_ungrouped_aggregation<'a>(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx<'a>,
result_columns: &'a [ResultSetColumn],
referenced_tables: &'a [TableReference],
aggregates: &'a [Aggregate],
query_type: &SelectQueryType,
plan: &'a SelectPlan,
) -> Result<()> {
let agg_start_reg = t_ctx.reg_agg_start.unwrap();
for (i, agg) in aggregates.iter().enumerate() {
for (i, agg) in plan.aggregates.iter().enumerate() {
let agg_result_reg = agg_start_reg + i;
program.emit_insn(Insn::AggFinal {
register: agg_result_reg,
@@ -1929,7 +1836,7 @@ fn emit_ungrouped_aggregation<'a>(
// we now have the agg results in (agg_start_reg..agg_start_reg + aggregates.len() - 1)
// we need to call translate_expr on each result column, but replace the expr with a register copy in case any part of the
// result column expression matches a) a group by column or b) an aggregation result.
for (i, agg) in aggregates.iter().enumerate() {
for (i, agg) in plan.aggregates.iter().enumerate() {
t_ctx
.resolver
.expr_to_reg_cache
@@ -1938,14 +1845,7 @@ fn emit_ungrouped_aggregation<'a>(
// This always emits a ResultRow because currently it can only be used for a single row result
// Limit is None because we early exit on limit 0 and the max rows here is 1
emit_select_result(
program,
t_ctx,
referenced_tables,
result_columns,
None,
query_type,
)?;
emit_select_result(program, t_ctx, plan, None)?;
Ok(())
}
@@ -1956,11 +1856,10 @@ fn emit_ungrouped_aggregation<'a>(
fn emit_order_by(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
order_by: &[(ast::Expr, Direction)],
result_columns: &[ResultSetColumn],
limit: Option<usize>,
query_type: &SelectQueryType,
plan: &SelectPlan,
) -> Result<()> {
let order_by = plan.order_by.as_ref().unwrap();
let result_columns = &plan.result_columns;
let sort_loop_start_label = program.allocate_label();
let sort_loop_end_label = program.allocate_label();
let mut pseudo_columns = vec![];
@@ -2001,17 +1900,20 @@ fn emit_order_by(
columns: pseudo_columns,
}))),
);
let sort_metadata = t_ctx.meta_sort.as_mut().unwrap();
let SortMetadata {
sort_cursor,
reg_sorter_data,
} = *t_ctx.meta_sort.as_mut().unwrap();
program.emit_insn(Insn::OpenPseudo {
cursor_id: pseudo_cursor,
content_reg: sort_metadata.reg_sorter_data,
content_reg: reg_sorter_data,
num_fields: num_columns_in_sorter,
});
program.emit_insn_with_label_dependency(
Insn::SorterSort {
cursor_id: sort_metadata.sort_cursor,
cursor_id: sort_cursor,
pc_if_empty: sort_loop_end_label,
},
sort_loop_end_label,
@@ -2019,8 +1921,8 @@ fn emit_order_by(
program.defer_label_resolution(sort_loop_start_label, program.offset() as usize);
program.emit_insn(Insn::SorterData {
cursor_id: sort_metadata.sort_cursor,
dest_reg: sort_metadata.reg_sorter_data,
cursor_id: sort_cursor,
dest_reg: reg_sorter_data,
pseudo_cursor,
});
@@ -2037,17 +1939,11 @@ fn emit_order_by(
});
}
emit_result_row_and_limit(
program,
start_reg,
result_columns.len(),
limit.map(|l| (l, t_ctx.reg_limit.unwrap(), sort_loop_end_label)),
query_type,
)?;
emit_result_row_and_limit(program, t_ctx, plan, start_reg, Some(sort_loop_end_label))?;
program.emit_insn_with_label_dependency(
Insn::SorterNext {
cursor_id: sort_metadata.sort_cursor,
cursor_id: sort_cursor,
pc_if_next: sort_loop_start_label,
},
sort_loop_start_label,
@@ -2063,16 +1959,16 @@ fn emit_order_by(
/// - limit
fn emit_result_row_and_limit(
program: &mut ProgramBuilder,
start_reg: usize,
result_columns_len: usize,
limit: Option<(usize, usize, BranchOffset)>,
query_type: &SelectQueryType,
t_ctx: &mut TranslateCtx,
plan: &SelectPlan,
result_columns_start_reg: usize,
label_on_limit_reached: Option<BranchOffset>,
) -> Result<()> {
match query_type {
match &plan.query_type {
SelectQueryType::TopLevel => {
program.emit_insn(Insn::ResultRow {
start_reg,
count: result_columns_len,
start_reg: result_columns_start_reg,
count: plan.result_columns.len(),
});
}
SelectQueryType::Subquery { yield_reg, .. } => {
@@ -2083,18 +1979,24 @@ fn emit_result_row_and_limit(
}
}
if let Some((limit, limit_reg, jump_label_on_limit_reached)) = limit {
if let Some(limit) = plan.limit {
if label_on_limit_reached.is_none() {
// There are cases where LIMIT is ignored, e.g. aggregation without a GROUP BY clause.
// We already early return on LIMIT 0, so we can just return here since the n of rows
// is always 1 here.
return Ok(());
}
program.emit_insn(Insn::Integer {
value: limit as i64,
dest: limit_reg,
dest: t_ctx.reg_limit.unwrap(),
});
program.mark_last_insn_constant();
program.emit_insn_with_label_dependency(
Insn::DecrJumpZero {
reg: limit_reg,
target_pc: jump_label_on_limit_reached,
reg: t_ctx.reg_limit.unwrap(),
target_pc: label_on_limit_reached.unwrap(),
},
jump_label_on_limit_reached,
label_on_limit_reached.unwrap(),
);
}
Ok(())
@@ -2107,23 +2009,21 @@ fn emit_result_row_and_limit(
fn emit_select_result(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
referenced_tables: &[TableReference],
result_columns: &[ResultSetColumn],
limit: Option<(usize, usize, BranchOffset)>,
query_type: &SelectQueryType,
plan: &SelectPlan,
label_on_limit_reached: Option<BranchOffset>,
) -> Result<()> {
let start_reg = t_ctx.reg_result_cols_start.unwrap();
for (i, rc) in result_columns.iter().enumerate() {
for (i, rc) in plan.result_columns.iter().enumerate() {
let reg = start_reg + i;
translate_expr(
program,
Some(referenced_tables),
Some(&plan.referenced_tables),
&rc.expr,
reg,
&t_ctx.resolver,
)?;
}
emit_result_row_and_limit(program, start_reg, result_columns.len(), limit, query_type)?;
emit_result_row_and_limit(program, t_ctx, plan, start_reg, label_on_limit_reached)?;
Ok(())
}
@@ -2151,11 +2051,11 @@ fn sorter_insert(
fn order_by_sorter_insert(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
referenced_tables: &[TableReference],
order_by: &[(ast::Expr, Direction)],
result_columns: &[ResultSetColumn],
plan: &SelectPlan,
) -> Result<()> {
let order_by = plan.order_by.as_ref().unwrap();
let order_by_len = order_by.len();
let result_columns = &plan.result_columns;
// If any result columns can be skipped due to being an exact duplicate of a sort key, we need to know which ones and their new index in the ORDER BY sorter.
let result_columns_to_skip = order_by_deduplicate_result_columns(order_by, result_columns);
let result_columns_to_skip_len = result_columns_to_skip
@@ -2171,7 +2071,7 @@ fn order_by_sorter_insert(
let key_reg = start_reg + i;
translate_expr(
program,
Some(referenced_tables),
Some(&plan.referenced_tables),
expr,
key_reg,
&t_ctx.resolver,
@@ -2192,7 +2092,7 @@ fn order_by_sorter_insert(
}
translate_expr(
program,
Some(referenced_tables),
Some(&plan.referenced_tables),
&rc.expr,
cur_reg,
&t_ctx.resolver,