mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-21 07:55:18 +01:00
Fix: aggregate regs must be initialized as NULL at the start
This commit is contained in:
@@ -553,6 +553,10 @@ pub fn emit_query<'a>(
|
||||
|
||||
if let Some(ref group_by) = plan.group_by {
|
||||
init_group_by(program, t_ctx, group_by, &plan)?;
|
||||
} else if !plan.aggregates.is_empty() {
|
||||
// Aggregate registers need to be NULLed at the start because the same registers might be reused on another invocation of a subquery,
|
||||
// and if they are not NULLed, the 2nd invocation of the same subquery will have values left over from the first invocation.
|
||||
t_ctx.reg_agg_start = Some(program.alloc_registers_and_init_w_null(plan.aggregates.len()));
|
||||
}
|
||||
|
||||
init_distinct(program, plan);
|
||||
|
||||
@@ -88,8 +88,6 @@ pub fn init_group_by(
|
||||
group_by: &GroupBy,
|
||||
plan: &SelectPlan,
|
||||
) -> Result<()> {
|
||||
let num_aggs = plan.aggregates.len();
|
||||
|
||||
let non_aggregate_count = plan
|
||||
.result_columns
|
||||
.iter()
|
||||
@@ -110,8 +108,18 @@ pub fn init_group_by(
|
||||
let reg_data_in_acc_flag = program.alloc_register();
|
||||
let reg_abort_flag = program.alloc_register();
|
||||
let reg_group_exprs_cmp = program.alloc_registers(group_by.exprs.len());
|
||||
|
||||
// The following two blocks of registers should always be allocated contiguously,
|
||||
// because they are cleared in a contiguous block in the GROUP BYs clear accumulator subroutine.
|
||||
// START BLOCK
|
||||
let reg_non_aggregate_exprs_acc = program.alloc_registers(non_aggregate_count);
|
||||
let reg_agg_exprs_start = program.alloc_registers(num_aggs);
|
||||
if !plan.aggregates.is_empty() {
|
||||
// Aggregate registers need to be NULLed at the start because the same registers might be reused on another invocation of a subquery,
|
||||
// and if they are not NULLed, the 2nd invocation of the same subquery will have values left over from the first invocation.
|
||||
t_ctx.reg_agg_start = Some(program.alloc_registers_and_init_w_null(plan.aggregates.len()));
|
||||
}
|
||||
// END BLOCK
|
||||
|
||||
let reg_sorter_key = program.alloc_register();
|
||||
let column_count = plan.group_by_sorter_column_count();
|
||||
let reg_group_by_source_cols_start = program.alloc_registers(column_count);
|
||||
@@ -203,8 +211,6 @@ pub fn init_group_by(
|
||||
return_reg: reg_subrtn_acc_clear_return_offset,
|
||||
});
|
||||
|
||||
t_ctx.reg_agg_start = Some(reg_agg_exprs_start);
|
||||
|
||||
t_ctx.meta_group_by = Some(GroupByMetadata {
|
||||
row_source,
|
||||
labels: GroupByLabels {
|
||||
@@ -552,10 +558,12 @@ pub fn group_by_process_single_group(
|
||||
|
||||
// Process each aggregate function for the current row
|
||||
program.resolve_label(labels.label_grouping_agg_step, program.offset());
|
||||
let start_reg = t_ctx.reg_agg_start.unwrap();
|
||||
let cursor_index = *non_group_by_non_agg_column_count + group_by.exprs.len(); // Skipping all columns in sorter that not an aggregation arguments
|
||||
let mut offset = 0;
|
||||
for (i, agg) in plan.aggregates.iter().enumerate() {
|
||||
let start_reg = t_ctx
|
||||
.reg_agg_start
|
||||
.expect("aggregate registers must be initialized");
|
||||
let agg_result_reg = start_reg + i;
|
||||
let agg_arg_source = match &row_source {
|
||||
GroupByRowSource::Sorter { pseudo_cursor, .. } => {
|
||||
@@ -788,11 +796,13 @@ pub fn group_by_emit_row_phase<'a>(
|
||||
can_fallthrough: false,
|
||||
});
|
||||
|
||||
// Finalize aggregate values for output
|
||||
let agg_start_reg = t_ctx.reg_agg_start.unwrap();
|
||||
// Resolve the label for the start of the group by output row subroutine
|
||||
program.resolve_label(labels.label_agg_final, program.offset());
|
||||
// Finalize aggregate values for output
|
||||
for (i, agg) in plan.aggregates.iter().enumerate() {
|
||||
let agg_start_reg = t_ctx
|
||||
.reg_agg_start
|
||||
.expect("aggregate registers must be initialized");
|
||||
let agg_result_reg = agg_start_reg + i;
|
||||
program.emit_insn(Insn::AggFinal {
|
||||
register: agg_result_reg,
|
||||
@@ -865,6 +875,9 @@ pub fn group_by_emit_row_phase<'a>(
|
||||
|
||||
// Map aggregate expressions to their result registers
|
||||
for (i, agg) in plan.aggregates.iter().enumerate() {
|
||||
let agg_start_reg = t_ctx
|
||||
.reg_agg_start
|
||||
.expect("aggregate registers must be initialized");
|
||||
t_ctx
|
||||
.resolver
|
||||
.expr_to_reg_cache
|
||||
|
||||
@@ -799,9 +799,9 @@ fn emit_loop_source<'a>(
|
||||
Ok(())
|
||||
}
|
||||
LoopEmitTarget::AggStep => {
|
||||
let num_aggs = plan.aggregates.len();
|
||||
let start_reg = program.alloc_registers(num_aggs);
|
||||
t_ctx.reg_agg_start = Some(start_reg);
|
||||
let start_reg = t_ctx
|
||||
.reg_agg_start
|
||||
.expect("aggregate registers must be initialized");
|
||||
|
||||
// In planner.rs, we have collected all aggregates from the SELECT clause, including ones where the aggregate is embedded inside
|
||||
// a more complex expression. Some examples: length(sum(x)), sum(x) + avg(y), sum(x) + 1, etc.
|
||||
|
||||
@@ -248,6 +248,19 @@ impl ProgramBuilder {
|
||||
reg
|
||||
}
|
||||
|
||||
pub fn alloc_registers_and_init_w_null(&mut self, amount: usize) -> usize {
|
||||
let reg = self.alloc_registers(amount);
|
||||
self.emit_insn(Insn::Null {
|
||||
dest: reg,
|
||||
dest_end: if amount == 1 {
|
||||
None
|
||||
} else {
|
||||
Some(reg + amount - 1)
|
||||
},
|
||||
});
|
||||
reg
|
||||
}
|
||||
|
||||
pub fn alloc_cursor_id_keyed(&mut self, key: CursorKey, cursor_type: CursorType) -> usize {
|
||||
assert!(
|
||||
!self
|
||||
|
||||
@@ -2482,7 +2482,10 @@ pub fn op_agg_step(
|
||||
AggFunc::Avg => {
|
||||
let col = state.registers[*col].clone();
|
||||
let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else {
|
||||
unreachable!();
|
||||
panic!(
|
||||
"Unexpected value {:?} in AggStep at register {}",
|
||||
state.registers[*acc_reg], *acc_reg
|
||||
);
|
||||
};
|
||||
let AggContext::Avg(acc, count) = agg.borrow_mut() else {
|
||||
unreachable!();
|
||||
@@ -2493,7 +2496,10 @@ pub fn op_agg_step(
|
||||
AggFunc::Sum | AggFunc::Total => {
|
||||
let col = state.registers[*col].clone();
|
||||
let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else {
|
||||
unreachable!();
|
||||
panic!(
|
||||
"Unexpected value {:?} at register {:?} in AggStep",
|
||||
state.registers[*acc_reg], *acc_reg
|
||||
);
|
||||
};
|
||||
let AggContext::Sum(acc) = agg.borrow_mut() else {
|
||||
unreachable!();
|
||||
@@ -2512,7 +2518,10 @@ pub fn op_agg_step(
|
||||
Register::Aggregate(AggContext::Count(Value::Integer(0)));
|
||||
}
|
||||
let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else {
|
||||
unreachable!();
|
||||
panic!(
|
||||
"Unexpected value {:?} in AggStep at register {}",
|
||||
state.registers[*acc_reg], *acc_reg
|
||||
);
|
||||
};
|
||||
let AggContext::Count(count) = agg.borrow_mut() else {
|
||||
unreachable!();
|
||||
@@ -2525,7 +2534,10 @@ pub fn op_agg_step(
|
||||
AggFunc::Max => {
|
||||
let col = state.registers[*col].clone();
|
||||
let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else {
|
||||
unreachable!();
|
||||
panic!(
|
||||
"Unexpected value {:?} in AggStep at register {}",
|
||||
state.registers[*acc_reg], *acc_reg
|
||||
);
|
||||
};
|
||||
let AggContext::Max(acc) = agg.borrow_mut() else {
|
||||
unreachable!();
|
||||
@@ -2558,7 +2570,10 @@ pub fn op_agg_step(
|
||||
AggFunc::Min => {
|
||||
let col = state.registers[*col].clone();
|
||||
let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else {
|
||||
unreachable!();
|
||||
panic!(
|
||||
"Unexpected value {:?} in AggStep",
|
||||
state.registers[*acc_reg]
|
||||
);
|
||||
};
|
||||
let AggContext::Min(acc) = agg.borrow_mut() else {
|
||||
unreachable!();
|
||||
@@ -2806,8 +2821,8 @@ pub fn op_agg_final(
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
unreachable!();
|
||||
other => {
|
||||
panic!("Unexpected value {:?} in AggFinal", other);
|
||||
}
|
||||
};
|
||||
state.pc += 1;
|
||||
|
||||
Reference in New Issue
Block a user