From 7c919314a99778f11f842454ee70bb3cd29e82ef Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Tue, 14 Oct 2025 12:27:57 +0400 Subject: [PATCH] use heap-sort style algorithm for order by ... limit k queries --- core/translate/emitter.rs | 1 + core/translate/order_by.rs | 246 +++++++++++++++++++++++++++---------- 2 files changed, 184 insertions(+), 63 deletions(-) diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 2dabd4b82..5ae59fc57 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -304,6 +304,7 @@ pub fn emit_query<'a>( &plan.order_by, &plan.table_references, plan.group_by.is_some(), + plan.distinctness != Distinctness::NonDistinct, &plan.aggregates, )?; } diff --git a/core/translate/order_by.rs b/core/translate/order_by.rs index aafb32a21..09de9c3d3 100644 --- a/core/translate/order_by.rs +++ b/core/translate/order_by.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use turso_parser::ast::{self, SortOrder}; use crate::{ emit_explain, - schema::PseudoCursorType, + schema::{Index, IndexColumn, PseudoCursorType}, translate::{ collate::{get_collseq_from_expr, CollationSeq}, group_by::is_orderby_agg_or_const, @@ -11,7 +13,7 @@ use crate::{ util::exprs_are_equivalent, vdbe::{ builder::{CursorType, ProgramBuilder}, - insn::Insn, + insn::{IdxInsertFlags, Insn}, }, QueryMode, Result, }; @@ -39,6 +41,8 @@ pub struct SortMetadata { /// aggregates/constants, so that rows that tie on ORDER BY terms are output in /// the same relative order the underlying row stream produced them. pub has_sequence: bool, + /// Whether to use heap-sort with BTreeIndex instead of full-collection sort through Sorter + pub use_heap_sort: bool, } /// Initialize resources needed for ORDER BY processing @@ -49,54 +53,106 @@ pub fn init_order_by( order_by: &[(Box, SortOrder)], referenced_tables: &TableReferences, has_group_by: bool, + has_distinct: bool, aggregates: &[Aggregate], ) -> Result<()> { - let sort_cursor = program.alloc_cursor_id(CursorType::Sorter); let only_aggs = order_by .iter() .all(|(e, _)| is_orderby_agg_or_const(&t_ctx.resolver, e, aggregates)); // only emit sequence column if we have GROUP BY and ORDER BY is not only aggregates or constants let has_sequence = has_group_by && !only_aggs; + + let use_heap_sort = !has_distinct && !has_group_by && t_ctx.limit_ctx.is_some(); + if use_heap_sort { + assert!(!has_sequence); + } + let remappings = order_by_deduplicate_result_columns(order_by, result_columns, has_sequence); + let sort_cursor = if use_heap_sort { + let index_name = format!("heap_sort_{}", program.offset().as_offset_int()); // we don't really care about the name that much, just enough that we don't get name collisions + let mut index_columns = Vec::with_capacity(order_by.len() + result_columns.len()); + for (column, order) in order_by { + let collation = get_collseq_from_expr(column, referenced_tables)?; + let pos_in_table = index_columns.len(); + index_columns.push(IndexColumn { + name: pos_in_table.to_string(), + order: *order, + pos_in_table, + collation, + default: None, + }) + } + for _ in remappings.iter().filter(|r| !r.deduplicated) { + let pos_in_table = index_columns.len(); + index_columns.push(IndexColumn { + name: pos_in_table.to_string(), + order: SortOrder::Asc, + pos_in_table, + collation: None, + default: None, + }) + } + let index = Arc::new(Index { + name: index_name.clone(), + table_name: String::new(), + ephemeral: true, + root_page: 0, + columns: index_columns, + unique: false, + has_rowid: false, + where_clause: None, + }); + program.alloc_cursor_id(CursorType::BTreeIndex(index)) + } else { + program.alloc_cursor_id(CursorType::Sorter) + }; t_ctx.meta_sort = Some(SortMetadata { sort_cursor, reg_sorter_data: program.alloc_register(), - remappings: order_by_deduplicate_result_columns(order_by, result_columns, has_sequence), + remappings, has_sequence, + use_heap_sort, }); - /* - * Terms of the ORDER BY clause that is part of a SELECT statement may be assigned a collating sequence using the COLLATE operator, - * in which case the specified collating function is used for sorting. - * Otherwise, if the expression sorted by an ORDER BY clause is a column, - * then the collating sequence of the column is used to determine sort order. - * If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used. - */ - let mut collations = order_by - .iter() - .map(|(expr, _)| get_collseq_from_expr(expr, referenced_tables)) - .collect::>>()?; + if use_heap_sort { + program.emit_insn(Insn::OpenEphemeral { + cursor_id: sort_cursor, + is_table: false, + }); + } else { + /* + * Terms of the ORDER BY clause that is part of a SELECT statement may be assigned a collating sequence using the COLLATE operator, + * in which case the specified collating function is used for sorting. + * Otherwise, if the expression sorted by an ORDER BY clause is a column, + * then the collating sequence of the column is used to determine sort order. + * If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used. + */ + let mut collations = order_by + .iter() + .map(|(expr, _)| get_collseq_from_expr(expr, referenced_tables)) + .collect::>>()?; - if has_sequence { - // sequence column uses BINARY collation - collations.push(Some(CollationSeq::default())); + if has_sequence { + // sequence column uses BINARY collation + collations.push(Some(CollationSeq::default())); + } + + let key_len = order_by.len() + if has_sequence { 1 } else { 0 }; + + program.emit_insn(Insn::SorterOpen { + cursor_id: sort_cursor, + columns: key_len, + order: { + let mut ord: Vec = order_by.iter().map(|(_, d)| *d).collect(); + if has_sequence { + // sequence is ascending tiebreaker + ord.push(SortOrder::Asc); + } + ord + }, + collations, + }); } - - let key_len = order_by.len() + if has_sequence { 1 } else { 0 }; - - program.emit_insn(Insn::SorterOpen { - cursor_id: sort_cursor, - columns: key_len, - order: { - let mut ord: Vec = order_by.iter().map(|(_, d)| *d).collect(); - if has_sequence { - // sequence is ascending tiebreaker - ord.push(SortOrder::Asc); - } - ord - }, - collations, - }); Ok(()) } @@ -118,6 +174,7 @@ pub fn emit_order_by( reg_sorter_data, ref remappings, has_sequence, + use_heap_sort, } = *t_ctx.meta_sort.as_ref().unwrap(); let sorter_column_count = order_by.len() @@ -128,33 +185,44 @@ pub fn emit_order_by( // to emit correct explain output. emit_explain!(program, false, "USE TEMP B-TREE FOR ORDER BY".to_owned()); - let pseudo_cursor = program.alloc_cursor_id(CursorType::Pseudo(PseudoCursorType { - column_count: sorter_column_count, - })); + let cursor_id = if !use_heap_sort { + let pseudo_cursor = program.alloc_cursor_id(CursorType::Pseudo(PseudoCursorType { + column_count: sorter_column_count, + })); - program.emit_insn(Insn::OpenPseudo { - cursor_id: pseudo_cursor, - content_reg: reg_sorter_data, - num_fields: sorter_column_count, - }); + program.emit_insn(Insn::OpenPseudo { + cursor_id: pseudo_cursor, + content_reg: reg_sorter_data, + num_fields: sorter_column_count, + }); + + program.emit_insn(Insn::SorterSort { + cursor_id: sort_cursor, + pc_if_empty: sort_loop_end_label, + }); + pseudo_cursor + } else { + program.emit_insn(Insn::Rewind { + cursor_id: sort_cursor, + pc_if_empty: sort_loop_end_label, + }); + sort_cursor + }; - program.emit_insn(Insn::SorterSort { - cursor_id: sort_cursor, - pc_if_empty: sort_loop_end_label, - }); program.preassign_label_to_next_insn(sort_loop_start_label); emit_offset(program, sort_loop_next_label, t_ctx.reg_offset); - program.emit_insn(Insn::SorterData { - cursor_id: sort_cursor, - dest_reg: reg_sorter_data, - pseudo_cursor, - }); + if !use_heap_sort { + program.emit_insn(Insn::SorterData { + cursor_id: sort_cursor, + dest_reg: reg_sorter_data, + pseudo_cursor: cursor_id, + }); + } // We emit the columns in SELECT order, not sorter order (sorter always has the sort keys first). // This is tracked in sort_metadata.remappings. - let cursor_id = pseudo_cursor; let start_reg = t_ctx.reg_result_cols_start.unwrap(); for i in 0..result_columns.len() { let reg = start_reg + i; @@ -175,10 +243,17 @@ pub fn emit_order_by( )?; program.resolve_label(sort_loop_next_label, program.offset()); - program.emit_insn(Insn::SorterNext { - cursor_id: sort_cursor, - pc_if_next: sort_loop_start_label, - }); + if !use_heap_sort { + program.emit_insn(Insn::SorterNext { + cursor_id: sort_cursor, + pc_if_next: sort_loop_start_label, + }); + } else { + program.emit_insn(Insn::Next { + cursor_id: sort_cursor, + pc_if_next: sort_loop_start_label, + }); + } program.preassign_label_to_next_insn(sort_loop_end_label); Ok(()) @@ -333,16 +408,61 @@ pub fn order_by_sorter_insert( let SortMetadata { sort_cursor, reg_sorter_data, + use_heap_sort, .. } = sort_metadata; - sorter_insert( - program, - start_reg, - orderby_sorter_column_count, - *sort_cursor, - *reg_sorter_data, - ); + if *use_heap_sort { + // maintain top-k records in the index instead of materializing the whole sequence + let insert_label = program.allocate_label(); + let skip_label = program.allocate_label(); + let limit = t_ctx.limit_ctx.as_ref().expect("limit must be set"); + program.emit_insn(Insn::IfPos { + reg: limit.reg_limit, + target_pc: insert_label, + decrement_by: 1, + }); + program.emit_insn(Insn::Last { + cursor_id: *sort_cursor, + pc_if_empty: insert_label, + }); + program.emit_insn(Insn::IdxLE { + cursor_id: *sort_cursor, + start_reg: start_reg, + num_regs: orderby_sorter_column_count, + target_pc: skip_label, + }); + + program.emit_insn(Insn::Delete { + cursor_id: *sort_cursor, + table_name: "".to_string(), + }); + + program.preassign_label_to_next_insn(insert_label); + program.emit_insn(Insn::MakeRecord { + start_reg, + count: orderby_sorter_column_count, + dest_reg: *reg_sorter_data, + index_name: None, + affinity_str: None, + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: *sort_cursor, + record_reg: *reg_sorter_data, + unpacked_start: None, + unpacked_count: None, + flags: IdxInsertFlags::new(), + }); + program.preassign_label_to_next_insn(skip_label); + } else { + sorter_insert( + program, + start_reg, + orderby_sorter_column_count, + *sort_cursor, + *reg_sorter_data, + ); + } Ok(()) }