mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-07 01:04:26 +01:00
Add plumbing to add sequence column to stabilize tiebreakers in order+group by
This commit is contained in:
@@ -299,6 +299,8 @@ pub fn emit_query<'a>(
|
||||
&plan.result_columns,
|
||||
&plan.order_by,
|
||||
&plan.table_references,
|
||||
plan.group_by.is_some(),
|
||||
&plan.aggregates,
|
||||
)?;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use turso_parser::ast;
|
||||
use turso_parser::ast::{self, SortOrder};
|
||||
|
||||
use super::{
|
||||
emitter::TranslateCtx,
|
||||
@@ -7,9 +7,12 @@ use super::{
|
||||
plan::{Distinctness, GroupBy, SelectPlan},
|
||||
result_row::emit_select_result,
|
||||
};
|
||||
use crate::translate::aggregation::{translate_aggregation_step, AggArgumentSource};
|
||||
use crate::translate::expr::{walk_expr, WalkControl};
|
||||
use crate::translate::plan::ResultSetColumn;
|
||||
use crate::translate::{
|
||||
aggregation::{translate_aggregation_step, AggArgumentSource},
|
||||
plan::Aggregate,
|
||||
};
|
||||
use crate::{
|
||||
schema::PseudoCursorType,
|
||||
translate::collate::CollationSeq,
|
||||
@@ -231,6 +234,70 @@ pub fn init_group_by<'a>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns whether an ORDER BY expression should be treated as an
|
||||
/// aggregate-position term for the purposes of tie-ordering.
|
||||
///
|
||||
/// We classify an ORDER BY term as "aggregate or constant" when:
|
||||
/// it is syntactically equivalent to one of the finalized aggregate
|
||||
/// expressions for this SELECT (`COUNT(*)`, `SUM(col)`, `MAX(price)`), or
|
||||
/// it is a constant literal
|
||||
///
|
||||
/// Why this matters:
|
||||
/// When ORDER BY consists only of aggregates and/or constants, SQLite relies
|
||||
/// on the stability of the ORDER BY sorter to preserve the traversal order
|
||||
/// of groups established by GROUP BY iteration, and no extra tiebreak
|
||||
/// `Sequence` column is appended
|
||||
pub fn is_orderby_agg_or_const(e: &ast::Expr, aggs: &[Aggregate]) -> bool {
|
||||
if aggs
|
||||
.iter()
|
||||
.any(|agg| exprs_are_equivalent(&agg.original_expr, e))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
matches!(e, ast::Expr::Literal(_))
|
||||
}
|
||||
|
||||
/// Computes the traversal order of GROUP BY keys so that the final
|
||||
/// ORDER BY matches SQLite's tie-breaking semantics.
|
||||
///
|
||||
/// If there are no GROUP BY keys or no ORDER BY terms, all keys default to ascending.
|
||||
///
|
||||
/// If *every* ORDER BY term is an aggregate or a constant then we mirror the
|
||||
/// direction of the first ORDER BY term across all GROUP BY keys.
|
||||
///
|
||||
/// Otherwise (mixed ORDER BY: at least one non-aggregate, non-constant term),
|
||||
/// we try to mirror explicit directions for any GROUP BY expression that
|
||||
/// appears in ORDER BY, and the remaining keys default to `ASC`.
|
||||
pub fn compute_group_by_sort_order(
|
||||
group_by_exprs: &[ast::Expr],
|
||||
order_by: &[(Box<ast::Expr>, SortOrder)],
|
||||
aggs: &[Aggregate],
|
||||
) -> Vec<SortOrder> {
|
||||
let groupby_len = group_by_exprs.len();
|
||||
if groupby_len == 0 || order_by.is_empty() {
|
||||
return vec![SortOrder::Asc; groupby_len];
|
||||
}
|
||||
let only_agg_or_const = order_by
|
||||
.iter()
|
||||
.all(|(e, _)| is_orderby_agg_or_const(e, aggs));
|
||||
|
||||
if only_agg_or_const {
|
||||
let first_direction = order_by[0].1;
|
||||
return vec![first_direction; groupby_len];
|
||||
}
|
||||
|
||||
let mut result = vec![SortOrder::Asc; groupby_len];
|
||||
for (idx, groupby_expr) in group_by_exprs.iter().enumerate() {
|
||||
if let Some((_, direction)) = order_by
|
||||
.iter()
|
||||
.find(|(expr, _)| exprs_are_equivalent(expr, groupby_expr))
|
||||
{
|
||||
result[idx] = *direction;
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
fn collect_non_aggregate_expressions<'a>(
|
||||
non_aggregate_expressions: &mut Vec<(&'a ast::Expr, bool)>,
|
||||
group_by: &'a GroupBy,
|
||||
@@ -736,15 +803,7 @@ pub fn group_by_emit_row_phase<'a>(
|
||||
)?;
|
||||
}
|
||||
false => {
|
||||
order_by_sorter_insert(
|
||||
program,
|
||||
&t_ctx.resolver,
|
||||
t_ctx
|
||||
.meta_sort
|
||||
.as_ref()
|
||||
.expect("sort metadata must exist for ORDER BY"),
|
||||
plan,
|
||||
)?;
|
||||
order_by_sorter_insert(program, t_ctx, plan)?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -858,15 +858,7 @@ fn emit_loop_source(
|
||||
Ok(())
|
||||
}
|
||||
LoopEmitTarget::OrderBySorter => {
|
||||
order_by_sorter_insert(
|
||||
program,
|
||||
&t_ctx.resolver,
|
||||
t_ctx
|
||||
.meta_sort
|
||||
.as_ref()
|
||||
.expect("sort metadata must exist for ORDER BY"),
|
||||
plan,
|
||||
)?;
|
||||
order_by_sorter_insert(program, t_ctx, plan)?;
|
||||
|
||||
if let Distinctness::Distinct { ctx } = &plan.distinctness {
|
||||
let distinct_ctx = ctx.as_ref().expect("distinct context must exist");
|
||||
|
||||
@@ -3,7 +3,7 @@ use turso_parser::ast::{self, SortOrder};
|
||||
use crate::{
|
||||
emit_explain,
|
||||
schema::PseudoCursorType,
|
||||
translate::collate::CollationSeq,
|
||||
translate::{collate::CollationSeq, group_by::is_orderby_agg_or_const, plan::Aggregate},
|
||||
util::exprs_are_equivalent,
|
||||
vdbe::{
|
||||
builder::{CursorType, ProgramBuilder},
|
||||
@@ -13,7 +13,7 @@ use crate::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
emitter::{Resolver, TranslateCtx},
|
||||
emitter::TranslateCtx,
|
||||
expr::translate_expr,
|
||||
plan::{Distinctness, ResultSetColumn, SelectPlan, TableReferences},
|
||||
result_row::{emit_offset, emit_result_row_and_limit},
|
||||
@@ -30,6 +30,11 @@ pub struct SortMetadata {
|
||||
// This vector holds the indexes of the result columns in the ORDER BY sorter.
|
||||
// This vector must be the same length as the result columns.
|
||||
pub remappings: Vec<OrderByRemapping>,
|
||||
/// Whether we append an extra ascending "Sequence" key to the ORDER BY sort keys.
|
||||
/// This is used *only* when a GROUP BY is present *and* ORDER BY is not purely
|
||||
/// aggregates/constants, so that rows that tie on ORDER BY terms are output in
|
||||
/// the same relative order the underlying row stream produced them.
|
||||
pub has_sequence: bool,
|
||||
}
|
||||
|
||||
/// Initialize resources needed for ORDER BY processing
|
||||
@@ -39,12 +44,21 @@ pub fn init_order_by(
|
||||
result_columns: &[ResultSetColumn],
|
||||
order_by: &[(Box<ast::Expr>, SortOrder)],
|
||||
referenced_tables: &TableReferences,
|
||||
has_group_by: bool,
|
||||
aggregates: &[Aggregate],
|
||||
) -> Result<()> {
|
||||
let sort_cursor = program.alloc_cursor_id(CursorType::Sorter);
|
||||
let only_aggs = order_by
|
||||
.iter()
|
||||
.all(|(e, _)| is_orderby_agg_or_const(e, aggregates));
|
||||
|
||||
// only emit sequence column if we have GROUP BY and ORDER BY is not only aggregates or constants
|
||||
let has_sequence = has_group_by && !only_aggs;
|
||||
t_ctx.meta_sort = Some(SortMetadata {
|
||||
sort_cursor,
|
||||
reg_sorter_data: program.alloc_register(),
|
||||
remappings: order_by_deduplicate_result_columns(order_by, result_columns),
|
||||
remappings: order_by_deduplicate_result_columns(order_by, result_columns, has_sequence),
|
||||
has_sequence,
|
||||
});
|
||||
|
||||
/*
|
||||
@@ -54,7 +68,7 @@ pub fn init_order_by(
|
||||
* then the collating sequence of the column is used to determine sort order.
|
||||
* If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used.
|
||||
*/
|
||||
let collations = order_by
|
||||
let mut collations = order_by
|
||||
.iter()
|
||||
.map(|(expr, _)| match expr.as_ref() {
|
||||
ast::Expr::Collate(_, collation_name) => {
|
||||
@@ -72,10 +86,24 @@ pub fn init_order_by(
|
||||
_ => Ok(Some(CollationSeq::default())),
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
if has_sequence {
|
||||
// sequence column uses BINARY collation
|
||||
collations.push(Some(CollationSeq::default()));
|
||||
}
|
||||
let key_len = order_by.len() + if has_sequence { 1 } else { 0 };
|
||||
|
||||
program.emit_insn(Insn::SorterOpen {
|
||||
cursor_id: sort_cursor,
|
||||
columns: order_by.len(),
|
||||
order: order_by.iter().map(|(_, direction)| *direction).collect(),
|
||||
columns: key_len,
|
||||
order: {
|
||||
let mut ord: Vec<SortOrder> = order_by.iter().map(|(_, d)| *d).collect();
|
||||
if has_sequence {
|
||||
// sequence is ascending tiebreaker
|
||||
ord.push(SortOrder::Asc);
|
||||
}
|
||||
ord
|
||||
},
|
||||
collations,
|
||||
});
|
||||
Ok(())
|
||||
@@ -98,9 +126,12 @@ pub fn emit_order_by(
|
||||
sort_cursor,
|
||||
reg_sorter_data,
|
||||
ref remappings,
|
||||
has_sequence,
|
||||
} = *t_ctx.meta_sort.as_ref().unwrap();
|
||||
let sorter_column_count =
|
||||
order_by.len() + remappings.iter().filter(|r| !r.deduplicated).count();
|
||||
|
||||
let sorter_column_count = order_by.len()
|
||||
+ if has_sequence { 1 } else { 0 }
|
||||
+ remappings.iter().filter(|r| !r.deduplicated).count();
|
||||
|
||||
// TODO: we need to know how many indices used for sorting
|
||||
// to emit correct explain output.
|
||||
@@ -142,10 +173,11 @@ pub fn emit_order_by(
|
||||
let start_reg = t_ctx.reg_result_cols_start.unwrap();
|
||||
for i in 0..result_columns.len() {
|
||||
let reg = start_reg + i;
|
||||
let column_idx = remappings
|
||||
let remapping = remappings
|
||||
.get(i)
|
||||
.expect("remapping must exist for all result columns")
|
||||
.orderby_sorter_idx;
|
||||
.expect("remapping must exist for all result columns");
|
||||
|
||||
let column_idx = remapping.orderby_sorter_idx;
|
||||
program.emit_column_or_rowid(cursor_id, column_idx, reg);
|
||||
}
|
||||
|
||||
@@ -170,10 +202,11 @@ pub fn emit_order_by(
|
||||
/// Emits the bytecode for inserting a row into an ORDER BY sorter.
|
||||
pub fn order_by_sorter_insert(
|
||||
program: &mut ProgramBuilder,
|
||||
resolver: &Resolver,
|
||||
sort_metadata: &SortMetadata,
|
||||
t_ctx: &TranslateCtx,
|
||||
plan: &SelectPlan,
|
||||
) -> Result<()> {
|
||||
let resolver = &t_ctx.resolver;
|
||||
let sort_metadata = t_ctx.meta_sort.as_ref().expect("sort metadata must exist");
|
||||
let order_by = &plan.order_by;
|
||||
let order_by_len = order_by.len();
|
||||
let result_columns = &plan.result_columns;
|
||||
@@ -185,19 +218,49 @@ pub fn order_by_sorter_insert(
|
||||
|
||||
// The ORDER BY sorter has the sort keys first, then the result columns.
|
||||
let orderby_sorter_column_count =
|
||||
order_by_len + result_columns.len() - result_columns_to_skip_len;
|
||||
order_by_len + if sort_metadata.has_sequence { 1 } else { 0 } + result_columns.len()
|
||||
- result_columns_to_skip_len;
|
||||
|
||||
let start_reg = program.alloc_registers(orderby_sorter_column_count);
|
||||
for (i, (expr, _)) in order_by.iter().enumerate() {
|
||||
let key_reg = start_reg + i;
|
||||
translate_expr(
|
||||
program,
|
||||
Some(&plan.table_references),
|
||||
expr,
|
||||
key_reg,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
// Check if this ORDER BY expression matches a finalized aggregate
|
||||
if let Some(agg_idx) = plan
|
||||
.aggregates
|
||||
.iter()
|
||||
.position(|agg| exprs_are_equivalent(&agg.original_expr, expr))
|
||||
{
|
||||
// This ORDER BY expression is an aggregate, so copy from register
|
||||
let agg_start_reg = t_ctx
|
||||
.reg_agg_start
|
||||
.expect("aggregate registers must be initialized");
|
||||
let src_reg = agg_start_reg + agg_idx;
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg,
|
||||
dst_reg: key_reg,
|
||||
extra_amount: 0,
|
||||
});
|
||||
} else {
|
||||
// Not an aggregate, translate normally
|
||||
translate_expr(
|
||||
program,
|
||||
Some(&plan.table_references),
|
||||
expr,
|
||||
key_reg,
|
||||
resolver,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
let mut cur_reg = start_reg + order_by_len;
|
||||
if sort_metadata.has_sequence {
|
||||
program.emit_insn(Insn::Sequence {
|
||||
cursor_id: sort_metadata.sort_cursor,
|
||||
target_reg: cur_reg,
|
||||
});
|
||||
cur_reg += 1;
|
||||
}
|
||||
|
||||
for (i, rc) in result_columns.iter().enumerate() {
|
||||
// If the result column is an exact duplicate of a sort key, we skip it.
|
||||
if sort_metadata
|
||||
@@ -251,12 +314,12 @@ pub fn order_by_sorter_insert(
|
||||
let reordered_start_reg = program.alloc_registers(result_columns.len());
|
||||
|
||||
for (select_idx, _rc) in result_columns.iter().enumerate() {
|
||||
let src_reg = sort_metadata
|
||||
let remapping = sort_metadata
|
||||
.remappings
|
||||
.get(select_idx)
|
||||
.map(|r| start_reg + r.orderby_sorter_idx)
|
||||
.expect("remapping must exist for all result columns");
|
||||
|
||||
let src_reg = start_reg + remapping.orderby_sorter_idx;
|
||||
let dst_reg = reordered_start_reg + select_idx;
|
||||
|
||||
program.emit_insn(Insn::Copy {
|
||||
@@ -336,11 +399,13 @@ pub struct OrderByRemapping {
|
||||
pub fn order_by_deduplicate_result_columns(
|
||||
order_by: &[(Box<ast::Expr>, SortOrder)],
|
||||
result_columns: &[ResultSetColumn],
|
||||
has_sequence: bool,
|
||||
) -> Vec<OrderByRemapping> {
|
||||
let mut result_column_remapping: Vec<OrderByRemapping> = Vec::new();
|
||||
let order_by_len = order_by.len();
|
||||
let sequence_offset = if has_sequence { 1 } else { 0 };
|
||||
|
||||
let mut i = 0;
|
||||
let mut non_dedup_count = 0;
|
||||
for rc in result_columns.iter() {
|
||||
let found = order_by
|
||||
.iter()
|
||||
@@ -352,14 +417,11 @@ pub fn order_by_deduplicate_result_columns(
|
||||
deduplicated: true,
|
||||
});
|
||||
} else {
|
||||
// This result column is not a duplicate of any ORDER BY key, so its sorter
|
||||
// index comes after all ORDER BY entries (hence the +order_by_len). The
|
||||
// counter `i` tracks how many such non-duplicate result columns we've seen.
|
||||
result_column_remapping.push(OrderByRemapping {
|
||||
orderby_sorter_idx: i + order_by_len,
|
||||
orderby_sorter_idx: order_by_len + sequence_offset + non_dedup_count,
|
||||
deduplicated: false,
|
||||
});
|
||||
i += 1;
|
||||
non_dedup_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use super::plan::{
|
||||
};
|
||||
use crate::schema::Table;
|
||||
use crate::translate::expr::{bind_and_rewrite_expr, ParamState};
|
||||
use crate::translate::group_by::compute_group_by_sort_order;
|
||||
use crate::translate::optimizer::optimize_plan;
|
||||
use crate::translate::plan::{GroupBy, Plan, ResultSetColumn, SelectPlan};
|
||||
use crate::translate::planner::{
|
||||
@@ -424,7 +425,7 @@ fn prepare_one_select_plan(
|
||||
}
|
||||
|
||||
plan.group_by = Some(GroupBy {
|
||||
sort_order: Some((0..group_by.exprs.len()).map(|_| SortOrder::Asc).collect()),
|
||||
sort_order: None,
|
||||
exprs: group_by.exprs.iter().map(|expr| *expr.clone()).collect(),
|
||||
having: if let Some(having) = group_by.having {
|
||||
let mut predicates = vec![];
|
||||
@@ -487,6 +488,15 @@ fn prepare_one_select_plan(
|
||||
key.push((o.expr, o.order.unwrap_or(ast::SortOrder::Asc)));
|
||||
}
|
||||
plan.order_by = key;
|
||||
if let Some(group_by) = &mut plan.group_by {
|
||||
// now that we have resolved the ORDER BY expressions and aggregates, we can
|
||||
// compute the necessary sort order for the GROUP BY clause
|
||||
group_by.sort_order = Some(compute_group_by_sort_order(
|
||||
&group_by.exprs,
|
||||
&plan.order_by,
|
||||
&plan.aggregates,
|
||||
));
|
||||
}
|
||||
|
||||
// Parse the LIMIT/OFFSET clause
|
||||
(plan.limit, plan.offset) = limit.map_or(Ok((None, None)), |mut l| {
|
||||
|
||||
@@ -1013,15 +1013,7 @@ fn emit_return_buffered_rows(
|
||||
)?;
|
||||
}
|
||||
false => {
|
||||
order_by_sorter_insert(
|
||||
program,
|
||||
&t_ctx.resolver,
|
||||
t_ctx
|
||||
.meta_sort
|
||||
.as_ref()
|
||||
.expect("sort metadata must exist for ORDER BY"),
|
||||
plan,
|
||||
)?;
|
||||
order_by_sorter_insert(program, t_ctx, plan)?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user