mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-25 10:54:28 +01:00
use heap-sort style algorithm for order by ... limit k queries
This commit is contained in:
@@ -304,6 +304,7 @@ pub fn emit_query<'a>(
|
||||
&plan.order_by,
|
||||
&plan.table_references,
|
||||
plan.group_by.is_some(),
|
||||
plan.distinctness != Distinctness::NonDistinct,
|
||||
&plan.aggregates,
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use turso_parser::ast::{self, SortOrder};
|
||||
|
||||
use crate::{
|
||||
emit_explain,
|
||||
schema::PseudoCursorType,
|
||||
schema::{Index, IndexColumn, PseudoCursorType},
|
||||
translate::{
|
||||
collate::{get_collseq_from_expr, CollationSeq},
|
||||
group_by::is_orderby_agg_or_const,
|
||||
@@ -11,7 +13,7 @@ use crate::{
|
||||
util::exprs_are_equivalent,
|
||||
vdbe::{
|
||||
builder::{CursorType, ProgramBuilder},
|
||||
insn::Insn,
|
||||
insn::{IdxInsertFlags, Insn},
|
||||
},
|
||||
QueryMode, Result,
|
||||
};
|
||||
@@ -39,6 +41,8 @@ pub struct SortMetadata {
|
||||
/// aggregates/constants, so that rows that tie on ORDER BY terms are output in
|
||||
/// the same relative order the underlying row stream produced them.
|
||||
pub has_sequence: bool,
|
||||
/// Whether to use heap-sort with BTreeIndex instead of full-collection sort through Sorter
|
||||
pub use_heap_sort: bool,
|
||||
}
|
||||
|
||||
/// Initialize resources needed for ORDER BY processing
|
||||
@@ -49,54 +53,106 @@ pub fn init_order_by(
|
||||
order_by: &[(Box<ast::Expr>, SortOrder)],
|
||||
referenced_tables: &TableReferences,
|
||||
has_group_by: bool,
|
||||
has_distinct: bool,
|
||||
aggregates: &[Aggregate],
|
||||
) -> Result<()> {
|
||||
let sort_cursor = program.alloc_cursor_id(CursorType::Sorter);
|
||||
let only_aggs = order_by
|
||||
.iter()
|
||||
.all(|(e, _)| is_orderby_agg_or_const(&t_ctx.resolver, e, aggregates));
|
||||
|
||||
// only emit sequence column if we have GROUP BY and ORDER BY is not only aggregates or constants
|
||||
let has_sequence = has_group_by && !only_aggs;
|
||||
|
||||
let use_heap_sort = !has_distinct && !has_group_by && t_ctx.limit_ctx.is_some();
|
||||
if use_heap_sort {
|
||||
assert!(!has_sequence);
|
||||
}
|
||||
let remappings = order_by_deduplicate_result_columns(order_by, result_columns, has_sequence);
|
||||
let sort_cursor = if use_heap_sort {
|
||||
let index_name = format!("heap_sort_{}", program.offset().as_offset_int()); // we don't really care about the name that much, just enough that we don't get name collisions
|
||||
let mut index_columns = Vec::with_capacity(order_by.len() + result_columns.len());
|
||||
for (column, order) in order_by {
|
||||
let collation = get_collseq_from_expr(column, referenced_tables)?;
|
||||
let pos_in_table = index_columns.len();
|
||||
index_columns.push(IndexColumn {
|
||||
name: pos_in_table.to_string(),
|
||||
order: *order,
|
||||
pos_in_table,
|
||||
collation,
|
||||
default: None,
|
||||
})
|
||||
}
|
||||
for _ in remappings.iter().filter(|r| !r.deduplicated) {
|
||||
let pos_in_table = index_columns.len();
|
||||
index_columns.push(IndexColumn {
|
||||
name: pos_in_table.to_string(),
|
||||
order: SortOrder::Asc,
|
||||
pos_in_table,
|
||||
collation: None,
|
||||
default: None,
|
||||
})
|
||||
}
|
||||
let index = Arc::new(Index {
|
||||
name: index_name.clone(),
|
||||
table_name: String::new(),
|
||||
ephemeral: true,
|
||||
root_page: 0,
|
||||
columns: index_columns,
|
||||
unique: false,
|
||||
has_rowid: false,
|
||||
where_clause: None,
|
||||
});
|
||||
program.alloc_cursor_id(CursorType::BTreeIndex(index))
|
||||
} else {
|
||||
program.alloc_cursor_id(CursorType::Sorter)
|
||||
};
|
||||
t_ctx.meta_sort = Some(SortMetadata {
|
||||
sort_cursor,
|
||||
reg_sorter_data: program.alloc_register(),
|
||||
remappings: order_by_deduplicate_result_columns(order_by, result_columns, has_sequence),
|
||||
remappings,
|
||||
has_sequence,
|
||||
use_heap_sort,
|
||||
});
|
||||
|
||||
/*
|
||||
* Terms of the ORDER BY clause that is part of a SELECT statement may be assigned a collating sequence using the COLLATE operator,
|
||||
* in which case the specified collating function is used for sorting.
|
||||
* Otherwise, if the expression sorted by an ORDER BY clause is a column,
|
||||
* then the collating sequence of the column is used to determine sort order.
|
||||
* If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used.
|
||||
*/
|
||||
let mut collations = order_by
|
||||
.iter()
|
||||
.map(|(expr, _)| get_collseq_from_expr(expr, referenced_tables))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
if use_heap_sort {
|
||||
program.emit_insn(Insn::OpenEphemeral {
|
||||
cursor_id: sort_cursor,
|
||||
is_table: false,
|
||||
});
|
||||
} else {
|
||||
/*
|
||||
* Terms of the ORDER BY clause that is part of a SELECT statement may be assigned a collating sequence using the COLLATE operator,
|
||||
* in which case the specified collating function is used for sorting.
|
||||
* Otherwise, if the expression sorted by an ORDER BY clause is a column,
|
||||
* then the collating sequence of the column is used to determine sort order.
|
||||
* If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used.
|
||||
*/
|
||||
let mut collations = order_by
|
||||
.iter()
|
||||
.map(|(expr, _)| get_collseq_from_expr(expr, referenced_tables))
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
if has_sequence {
|
||||
// sequence column uses BINARY collation
|
||||
collations.push(Some(CollationSeq::default()));
|
||||
if has_sequence {
|
||||
// sequence column uses BINARY collation
|
||||
collations.push(Some(CollationSeq::default()));
|
||||
}
|
||||
|
||||
let key_len = order_by.len() + if has_sequence { 1 } else { 0 };
|
||||
|
||||
program.emit_insn(Insn::SorterOpen {
|
||||
cursor_id: sort_cursor,
|
||||
columns: key_len,
|
||||
order: {
|
||||
let mut ord: Vec<SortOrder> = order_by.iter().map(|(_, d)| *d).collect();
|
||||
if has_sequence {
|
||||
// sequence is ascending tiebreaker
|
||||
ord.push(SortOrder::Asc);
|
||||
}
|
||||
ord
|
||||
},
|
||||
collations,
|
||||
});
|
||||
}
|
||||
|
||||
let key_len = order_by.len() + if has_sequence { 1 } else { 0 };
|
||||
|
||||
program.emit_insn(Insn::SorterOpen {
|
||||
cursor_id: sort_cursor,
|
||||
columns: key_len,
|
||||
order: {
|
||||
let mut ord: Vec<SortOrder> = order_by.iter().map(|(_, d)| *d).collect();
|
||||
if has_sequence {
|
||||
// sequence is ascending tiebreaker
|
||||
ord.push(SortOrder::Asc);
|
||||
}
|
||||
ord
|
||||
},
|
||||
collations,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -118,6 +174,7 @@ pub fn emit_order_by(
|
||||
reg_sorter_data,
|
||||
ref remappings,
|
||||
has_sequence,
|
||||
use_heap_sort,
|
||||
} = *t_ctx.meta_sort.as_ref().unwrap();
|
||||
|
||||
let sorter_column_count = order_by.len()
|
||||
@@ -128,33 +185,44 @@ pub fn emit_order_by(
|
||||
// to emit correct explain output.
|
||||
emit_explain!(program, false, "USE TEMP B-TREE FOR ORDER BY".to_owned());
|
||||
|
||||
let pseudo_cursor = program.alloc_cursor_id(CursorType::Pseudo(PseudoCursorType {
|
||||
column_count: sorter_column_count,
|
||||
}));
|
||||
let cursor_id = if !use_heap_sort {
|
||||
let pseudo_cursor = program.alloc_cursor_id(CursorType::Pseudo(PseudoCursorType {
|
||||
column_count: sorter_column_count,
|
||||
}));
|
||||
|
||||
program.emit_insn(Insn::OpenPseudo {
|
||||
cursor_id: pseudo_cursor,
|
||||
content_reg: reg_sorter_data,
|
||||
num_fields: sorter_column_count,
|
||||
});
|
||||
program.emit_insn(Insn::OpenPseudo {
|
||||
cursor_id: pseudo_cursor,
|
||||
content_reg: reg_sorter_data,
|
||||
num_fields: sorter_column_count,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::SorterSort {
|
||||
cursor_id: sort_cursor,
|
||||
pc_if_empty: sort_loop_end_label,
|
||||
});
|
||||
pseudo_cursor
|
||||
} else {
|
||||
program.emit_insn(Insn::Rewind {
|
||||
cursor_id: sort_cursor,
|
||||
pc_if_empty: sort_loop_end_label,
|
||||
});
|
||||
sort_cursor
|
||||
};
|
||||
|
||||
program.emit_insn(Insn::SorterSort {
|
||||
cursor_id: sort_cursor,
|
||||
pc_if_empty: sort_loop_end_label,
|
||||
});
|
||||
program.preassign_label_to_next_insn(sort_loop_start_label);
|
||||
|
||||
emit_offset(program, sort_loop_next_label, t_ctx.reg_offset);
|
||||
|
||||
program.emit_insn(Insn::SorterData {
|
||||
cursor_id: sort_cursor,
|
||||
dest_reg: reg_sorter_data,
|
||||
pseudo_cursor,
|
||||
});
|
||||
if !use_heap_sort {
|
||||
program.emit_insn(Insn::SorterData {
|
||||
cursor_id: sort_cursor,
|
||||
dest_reg: reg_sorter_data,
|
||||
pseudo_cursor: cursor_id,
|
||||
});
|
||||
}
|
||||
|
||||
// We emit the columns in SELECT order, not sorter order (sorter always has the sort keys first).
|
||||
// This is tracked in sort_metadata.remappings.
|
||||
let cursor_id = pseudo_cursor;
|
||||
let start_reg = t_ctx.reg_result_cols_start.unwrap();
|
||||
for i in 0..result_columns.len() {
|
||||
let reg = start_reg + i;
|
||||
@@ -175,10 +243,17 @@ pub fn emit_order_by(
|
||||
)?;
|
||||
|
||||
program.resolve_label(sort_loop_next_label, program.offset());
|
||||
program.emit_insn(Insn::SorterNext {
|
||||
cursor_id: sort_cursor,
|
||||
pc_if_next: sort_loop_start_label,
|
||||
});
|
||||
if !use_heap_sort {
|
||||
program.emit_insn(Insn::SorterNext {
|
||||
cursor_id: sort_cursor,
|
||||
pc_if_next: sort_loop_start_label,
|
||||
});
|
||||
} else {
|
||||
program.emit_insn(Insn::Next {
|
||||
cursor_id: sort_cursor,
|
||||
pc_if_next: sort_loop_start_label,
|
||||
});
|
||||
}
|
||||
program.preassign_label_to_next_insn(sort_loop_end_label);
|
||||
|
||||
Ok(())
|
||||
@@ -333,16 +408,61 @@ pub fn order_by_sorter_insert(
|
||||
let SortMetadata {
|
||||
sort_cursor,
|
||||
reg_sorter_data,
|
||||
use_heap_sort,
|
||||
..
|
||||
} = sort_metadata;
|
||||
|
||||
sorter_insert(
|
||||
program,
|
||||
start_reg,
|
||||
orderby_sorter_column_count,
|
||||
*sort_cursor,
|
||||
*reg_sorter_data,
|
||||
);
|
||||
if *use_heap_sort {
|
||||
// maintain top-k records in the index instead of materializing the whole sequence
|
||||
let insert_label = program.allocate_label();
|
||||
let skip_label = program.allocate_label();
|
||||
let limit = t_ctx.limit_ctx.as_ref().expect("limit must be set");
|
||||
program.emit_insn(Insn::IfPos {
|
||||
reg: limit.reg_limit,
|
||||
target_pc: insert_label,
|
||||
decrement_by: 1,
|
||||
});
|
||||
program.emit_insn(Insn::Last {
|
||||
cursor_id: *sort_cursor,
|
||||
pc_if_empty: insert_label,
|
||||
});
|
||||
program.emit_insn(Insn::IdxLE {
|
||||
cursor_id: *sort_cursor,
|
||||
start_reg: start_reg,
|
||||
num_regs: orderby_sorter_column_count,
|
||||
target_pc: skip_label,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Delete {
|
||||
cursor_id: *sort_cursor,
|
||||
table_name: "".to_string(),
|
||||
});
|
||||
|
||||
program.preassign_label_to_next_insn(insert_label);
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg,
|
||||
count: orderby_sorter_column_count,
|
||||
dest_reg: *reg_sorter_data,
|
||||
index_name: None,
|
||||
affinity_str: None,
|
||||
});
|
||||
program.emit_insn(Insn::IdxInsert {
|
||||
cursor_id: *sort_cursor,
|
||||
record_reg: *reg_sorter_data,
|
||||
unpacked_start: None,
|
||||
unpacked_count: None,
|
||||
flags: IdxInsertFlags::new(),
|
||||
});
|
||||
program.preassign_label_to_next_insn(skip_label);
|
||||
} else {
|
||||
sorter_insert(
|
||||
program,
|
||||
start_reg,
|
||||
orderby_sorter_column_count,
|
||||
*sort_cursor,
|
||||
*reg_sorter_data,
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user