From f8257df77b70642ddf1a8dff9d3baa80a3cbfcf9 Mon Sep 17 00:00:00 2001 From: Jussi Saurio Date: Thu, 29 May 2025 18:43:58 +0300 Subject: [PATCH] Fix: aggregate regs must be initialized as NULL at the start --- core/translate/emitter.rs | 4 ++++ core/translate/group_by.rs | 29 +++++++++++++++++++++-------- core/translate/main_loop.rs | 6 +++--- core/vdbe/builder.rs | 13 +++++++++++++ core/vdbe/execute.rs | 29 ++++++++++++++++++++++------- 5 files changed, 63 insertions(+), 18 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index bb6892334..7290b955f 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -553,6 +553,10 @@ pub fn emit_query<'a>( if let Some(ref group_by) = plan.group_by { init_group_by(program, t_ctx, group_by, &plan)?; + } else if !plan.aggregates.is_empty() { + // Aggregate registers need to be NULLed at the start because the same registers might be reused on another invocation of a subquery, + // and if they are not NULLed, the 2nd invocation of the same subquery will have values left over from the first invocation. + t_ctx.reg_agg_start = Some(program.alloc_registers_and_init_w_null(plan.aggregates.len())); } init_distinct(program, plan); diff --git a/core/translate/group_by.rs b/core/translate/group_by.rs index afcffcfdc..a1dacdd7b 100644 --- a/core/translate/group_by.rs +++ b/core/translate/group_by.rs @@ -88,8 +88,6 @@ pub fn init_group_by( group_by: &GroupBy, plan: &SelectPlan, ) -> Result<()> { - let num_aggs = plan.aggregates.len(); - let non_aggregate_count = plan .result_columns .iter() @@ -110,8 +108,18 @@ pub fn init_group_by( let reg_data_in_acc_flag = program.alloc_register(); let reg_abort_flag = program.alloc_register(); let reg_group_exprs_cmp = program.alloc_registers(group_by.exprs.len()); + + // The following two blocks of registers should always be allocated contiguously, + // because they are cleared in a contiguous block in the GROUP BYs clear accumulator subroutine. + // START BLOCK let reg_non_aggregate_exprs_acc = program.alloc_registers(non_aggregate_count); - let reg_agg_exprs_start = program.alloc_registers(num_aggs); + if !plan.aggregates.is_empty() { + // Aggregate registers need to be NULLed at the start because the same registers might be reused on another invocation of a subquery, + // and if they are not NULLed, the 2nd invocation of the same subquery will have values left over from the first invocation. + t_ctx.reg_agg_start = Some(program.alloc_registers_and_init_w_null(plan.aggregates.len())); + } + // END BLOCK + let reg_sorter_key = program.alloc_register(); let column_count = plan.group_by_sorter_column_count(); let reg_group_by_source_cols_start = program.alloc_registers(column_count); @@ -203,8 +211,6 @@ pub fn init_group_by( return_reg: reg_subrtn_acc_clear_return_offset, }); - t_ctx.reg_agg_start = Some(reg_agg_exprs_start); - t_ctx.meta_group_by = Some(GroupByMetadata { row_source, labels: GroupByLabels { @@ -552,10 +558,12 @@ pub fn group_by_process_single_group( // Process each aggregate function for the current row program.resolve_label(labels.label_grouping_agg_step, program.offset()); - let start_reg = t_ctx.reg_agg_start.unwrap(); let cursor_index = *non_group_by_non_agg_column_count + group_by.exprs.len(); // Skipping all columns in sorter that not an aggregation arguments let mut offset = 0; for (i, agg) in plan.aggregates.iter().enumerate() { + let start_reg = t_ctx + .reg_agg_start + .expect("aggregate registers must be initialized"); let agg_result_reg = start_reg + i; let agg_arg_source = match &row_source { GroupByRowSource::Sorter { pseudo_cursor, .. } => { @@ -788,11 +796,13 @@ pub fn group_by_emit_row_phase<'a>( can_fallthrough: false, }); - // Finalize aggregate values for output - let agg_start_reg = t_ctx.reg_agg_start.unwrap(); // Resolve the label for the start of the group by output row subroutine program.resolve_label(labels.label_agg_final, program.offset()); + // Finalize aggregate values for output for (i, agg) in plan.aggregates.iter().enumerate() { + let agg_start_reg = t_ctx + .reg_agg_start + .expect("aggregate registers must be initialized"); let agg_result_reg = agg_start_reg + i; program.emit_insn(Insn::AggFinal { register: agg_result_reg, @@ -865,6 +875,9 @@ pub fn group_by_emit_row_phase<'a>( // Map aggregate expressions to their result registers for (i, agg) in plan.aggregates.iter().enumerate() { + let agg_start_reg = t_ctx + .reg_agg_start + .expect("aggregate registers must be initialized"); t_ctx .resolver .expr_to_reg_cache diff --git a/core/translate/main_loop.rs b/core/translate/main_loop.rs index 1e695730e..3fad6c017 100644 --- a/core/translate/main_loop.rs +++ b/core/translate/main_loop.rs @@ -799,9 +799,9 @@ fn emit_loop_source<'a>( Ok(()) } LoopEmitTarget::AggStep => { - let num_aggs = plan.aggregates.len(); - let start_reg = program.alloc_registers(num_aggs); - t_ctx.reg_agg_start = Some(start_reg); + let start_reg = t_ctx + .reg_agg_start + .expect("aggregate registers must be initialized"); // In planner.rs, we have collected all aggregates from the SELECT clause, including ones where the aggregate is embedded inside // a more complex expression. Some examples: length(sum(x)), sum(x) + avg(y), sum(x) + 1, etc. diff --git a/core/vdbe/builder.rs b/core/vdbe/builder.rs index aa3f9a315..895a07fe4 100644 --- a/core/vdbe/builder.rs +++ b/core/vdbe/builder.rs @@ -248,6 +248,19 @@ impl ProgramBuilder { reg } + pub fn alloc_registers_and_init_w_null(&mut self, amount: usize) -> usize { + let reg = self.alloc_registers(amount); + self.emit_insn(Insn::Null { + dest: reg, + dest_end: if amount == 1 { + None + } else { + Some(reg + amount - 1) + }, + }); + reg + } + pub fn alloc_cursor_id_keyed(&mut self, key: CursorKey, cursor_type: CursorType) -> usize { assert!( !self diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 84b9d7562..d3a49e48f 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -2482,7 +2482,10 @@ pub fn op_agg_step( AggFunc::Avg => { let col = state.registers[*col].clone(); let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else { - unreachable!(); + panic!( + "Unexpected value {:?} in AggStep at register {}", + state.registers[*acc_reg], *acc_reg + ); }; let AggContext::Avg(acc, count) = agg.borrow_mut() else { unreachable!(); @@ -2493,7 +2496,10 @@ pub fn op_agg_step( AggFunc::Sum | AggFunc::Total => { let col = state.registers[*col].clone(); let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else { - unreachable!(); + panic!( + "Unexpected value {:?} at register {:?} in AggStep", + state.registers[*acc_reg], *acc_reg + ); }; let AggContext::Sum(acc) = agg.borrow_mut() else { unreachable!(); @@ -2512,7 +2518,10 @@ pub fn op_agg_step( Register::Aggregate(AggContext::Count(Value::Integer(0))); } let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else { - unreachable!(); + panic!( + "Unexpected value {:?} in AggStep at register {}", + state.registers[*acc_reg], *acc_reg + ); }; let AggContext::Count(count) = agg.borrow_mut() else { unreachable!(); @@ -2525,7 +2534,10 @@ pub fn op_agg_step( AggFunc::Max => { let col = state.registers[*col].clone(); let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else { - unreachable!(); + panic!( + "Unexpected value {:?} in AggStep at register {}", + state.registers[*acc_reg], *acc_reg + ); }; let AggContext::Max(acc) = agg.borrow_mut() else { unreachable!(); @@ -2558,7 +2570,10 @@ pub fn op_agg_step( AggFunc::Min => { let col = state.registers[*col].clone(); let Register::Aggregate(agg) = state.registers[*acc_reg].borrow_mut() else { - unreachable!(); + panic!( + "Unexpected value {:?} in AggStep", + state.registers[*acc_reg] + ); }; let AggContext::Min(acc) = agg.borrow_mut() else { unreachable!(); @@ -2806,8 +2821,8 @@ pub fn op_agg_final( _ => {} } } - _ => { - unreachable!(); + other => { + panic!("Unexpected value {:?} in AggFinal", other); } }; state.pc += 1;