Merge 'Order by heap sort' from Nikita Sivukhin

This PR implements simple heap-sort approach for query plans like
`SELECT ... FROM t WHERE ... ORDER BY ... LIMIT N` in order to maintain
small set of top N elements in the ephemeral B-tree and avoid sort and
materialization of whole dataset.
I removed all optimizations not related to this particular change in
order to make branch lightweight.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3726
This commit is contained in:
Jussi Saurio
2025-10-23 15:00:42 +03:00
committed by GitHub
7 changed files with 275 additions and 133 deletions

View File

@@ -1486,6 +1486,44 @@ pub fn read_integer(buf: &[u8], serial_type: u8) -> Result<i64> {
}
}
/// Fast varint reader optimized for the common cases of 1-byte and 2-byte varints.
///
/// This function is a performance-optimized version of `read_varint()` that handles
/// the most common varint cases inline before falling back to the full implementation.
/// It follows the same varint encoding as SQLite.
///
/// # Optimized Cases
///
/// - **Single-byte case**: Values 0-127 (0x00-0x7F) are returned immediately
/// - **Two-byte case**: Values 128-16383 (0x80-0x3FFF) are handled inline
/// - **Multi-byte case**: Larger values fall back to the full `read_varint()` implementation
///
/// This function is similar to `sqlite3GetVarint32`
#[inline(always)]
pub fn read_varint_fast(buf: &[u8]) -> Result<(u64, usize)> {
// Fast path: Single-byte varint
if let Some(&first_byte) = buf.first() {
if first_byte & 0x80 == 0 {
return Ok((first_byte as u64, 1));
}
} else {
crate::bail_corrupt_error!("Invalid varint");
}
// Fast path: Two-byte varint
if let Some(&second_byte) = buf.get(1) {
if second_byte & 0x80 == 0 {
let v = (((buf[0] & 0x7f) as u64) << 7) + (second_byte as u64);
return Ok((v, 2));
}
} else {
crate::bail_corrupt_error!("Invalid varint");
}
//Fallback: Multi-byte varint
read_varint(buf)
}
#[inline(always)]
pub fn read_varint(buf: &[u8]) -> Result<(u64, usize)> {
let mut v: u64 = 0;

View File

@@ -319,6 +319,7 @@ pub fn emit_query<'a>(
&plan.order_by,
&plan.table_references,
plan.group_by.is_some(),
plan.distinctness != Distinctness::NonDistinct,
&plan.aggregates,
)?;
}

View File

@@ -1,8 +1,10 @@
use std::sync::Arc;
use turso_parser::ast::{self, SortOrder};
use crate::{
emit_explain,
schema::PseudoCursorType,
schema::{Index, IndexColumn, PseudoCursorType},
translate::{
collate::{get_collseq_from_expr, CollationSeq},
group_by::is_orderby_agg_or_const,
@@ -11,7 +13,7 @@ use crate::{
util::exprs_are_equivalent,
vdbe::{
builder::{CursorType, ProgramBuilder},
insn::Insn,
insn::{IdxInsertFlags, Insn},
},
QueryMode, Result,
};
@@ -39,9 +41,12 @@ pub struct SortMetadata {
/// aggregates/constants, so that rows that tie on ORDER BY terms are output in
/// the same relative order the underlying row stream produced them.
pub has_sequence: bool,
/// Whether to use heap-sort with BTreeIndex instead of full-collection sort through Sorter
pub use_heap_sort: bool,
}
/// Initialize resources needed for ORDER BY processing
#[allow(clippy::too_many_arguments)]
pub fn init_order_by(
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
@@ -49,54 +54,113 @@ pub fn init_order_by(
order_by: &[(Box<ast::Expr>, SortOrder)],
referenced_tables: &TableReferences,
has_group_by: bool,
has_distinct: bool,
aggregates: &[Aggregate],
) -> Result<()> {
let sort_cursor = program.alloc_cursor_id(CursorType::Sorter);
let only_aggs = order_by
.iter()
.all(|(e, _)| is_orderby_agg_or_const(&t_ctx.resolver, e, aggregates));
// only emit sequence column if we have GROUP BY and ORDER BY is not only aggregates or constants
let has_sequence = has_group_by && !only_aggs;
let use_heap_sort = !has_distinct && !has_group_by && t_ctx.limit_ctx.is_some();
// only emit sequence column if (we have GROUP BY and ORDER BY is not only aggregates or constants) OR (we decided to use heap-sort)
let has_sequence = (has_group_by && !only_aggs) || use_heap_sort;
let remappings = order_by_deduplicate_result_columns(order_by, result_columns, has_sequence);
let sort_cursor = if use_heap_sort {
let index_name = format!("heap_sort_{}", program.offset().as_offset_int()); // we don't really care about the name that much, just enough that we don't get name collisions
let mut index_columns = Vec::with_capacity(order_by.len() + result_columns.len());
for (column, order) in order_by {
let collation = get_collseq_from_expr(column, referenced_tables)?;
let pos_in_table = index_columns.len();
index_columns.push(IndexColumn {
name: pos_in_table.to_string(),
order: *order,
pos_in_table,
collation,
default: None,
})
}
let pos_in_table = index_columns.len();
// add sequence number between ORDER BY columns and result column
index_columns.push(IndexColumn {
name: pos_in_table.to_string(),
order: SortOrder::Asc,
pos_in_table,
collation: Some(CollationSeq::Binary),
default: None,
});
for _ in remappings.iter().filter(|r| !r.deduplicated) {
let pos_in_table = index_columns.len();
index_columns.push(IndexColumn {
name: pos_in_table.to_string(),
order: SortOrder::Asc,
pos_in_table,
collation: None,
default: None,
})
}
let index = Arc::new(Index {
name: index_name.clone(),
table_name: String::new(),
ephemeral: true,
root_page: 0,
columns: index_columns,
unique: false,
has_rowid: false,
where_clause: None,
});
program.alloc_cursor_id(CursorType::BTreeIndex(index))
} else {
program.alloc_cursor_id(CursorType::Sorter)
};
t_ctx.meta_sort = Some(SortMetadata {
sort_cursor,
reg_sorter_data: program.alloc_register(),
remappings: order_by_deduplicate_result_columns(order_by, result_columns, has_sequence),
remappings,
has_sequence,
use_heap_sort,
});
/*
* Terms of the ORDER BY clause that is part of a SELECT statement may be assigned a collating sequence using the COLLATE operator,
* in which case the specified collating function is used for sorting.
* Otherwise, if the expression sorted by an ORDER BY clause is a column,
* then the collating sequence of the column is used to determine sort order.
* If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used.
*/
let mut collations = order_by
.iter()
.map(|(expr, _)| get_collseq_from_expr(expr, referenced_tables))
.collect::<Result<Vec<_>>>()?;
if use_heap_sort {
program.emit_insn(Insn::OpenEphemeral {
cursor_id: sort_cursor,
is_table: false,
});
} else {
/*
* Terms of the ORDER BY clause that is part of a SELECT statement may be assigned a collating sequence using the COLLATE operator,
* in which case the specified collating function is used for sorting.
* Otherwise, if the expression sorted by an ORDER BY clause is a column,
* then the collating sequence of the column is used to determine sort order.
* If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used.
*/
let mut collations = order_by
.iter()
.map(|(expr, _)| get_collseq_from_expr(expr, referenced_tables))
.collect::<Result<Vec<_>>>()?;
if has_sequence {
// sequence column uses BINARY collation
collations.push(Some(CollationSeq::default()));
if has_sequence {
// sequence column uses BINARY collation
collations.push(Some(CollationSeq::default()));
}
let key_len = order_by.len() + if has_sequence { 1 } else { 0 };
program.emit_insn(Insn::SorterOpen {
cursor_id: sort_cursor,
columns: key_len,
order: {
let mut ord: Vec<SortOrder> = order_by.iter().map(|(_, d)| *d).collect();
if has_sequence {
// sequence is ascending tiebreaker
ord.push(SortOrder::Asc);
}
ord
},
collations,
});
}
let key_len = order_by.len() + if has_sequence { 1 } else { 0 };
program.emit_insn(Insn::SorterOpen {
cursor_id: sort_cursor,
columns: key_len,
order: {
let mut ord: Vec<SortOrder> = order_by.iter().map(|(_, d)| *d).collect();
if has_sequence {
// sequence is ascending tiebreaker
ord.push(SortOrder::Asc);
}
ord
},
collations,
});
Ok(())
}
@@ -118,6 +182,7 @@ pub fn emit_order_by(
reg_sorter_data,
ref remappings,
has_sequence,
use_heap_sort,
} = *t_ctx.meta_sort.as_ref().unwrap();
let sorter_column_count = order_by.len()
@@ -128,33 +193,44 @@ pub fn emit_order_by(
// to emit correct explain output.
emit_explain!(program, false, "USE TEMP B-TREE FOR ORDER BY".to_owned());
let pseudo_cursor = program.alloc_cursor_id(CursorType::Pseudo(PseudoCursorType {
column_count: sorter_column_count,
}));
let cursor_id = if !use_heap_sort {
let pseudo_cursor = program.alloc_cursor_id(CursorType::Pseudo(PseudoCursorType {
column_count: sorter_column_count,
}));
program.emit_insn(Insn::OpenPseudo {
cursor_id: pseudo_cursor,
content_reg: reg_sorter_data,
num_fields: sorter_column_count,
});
program.emit_insn(Insn::OpenPseudo {
cursor_id: pseudo_cursor,
content_reg: reg_sorter_data,
num_fields: sorter_column_count,
});
program.emit_insn(Insn::SorterSort {
cursor_id: sort_cursor,
pc_if_empty: sort_loop_end_label,
});
pseudo_cursor
} else {
program.emit_insn(Insn::Rewind {
cursor_id: sort_cursor,
pc_if_empty: sort_loop_end_label,
});
sort_cursor
};
program.emit_insn(Insn::SorterSort {
cursor_id: sort_cursor,
pc_if_empty: sort_loop_end_label,
});
program.preassign_label_to_next_insn(sort_loop_start_label);
emit_offset(program, sort_loop_next_label, t_ctx.reg_offset);
program.emit_insn(Insn::SorterData {
cursor_id: sort_cursor,
dest_reg: reg_sorter_data,
pseudo_cursor,
});
if !use_heap_sort {
program.emit_insn(Insn::SorterData {
cursor_id: sort_cursor,
dest_reg: reg_sorter_data,
pseudo_cursor: cursor_id,
});
}
// We emit the columns in SELECT order, not sorter order (sorter always has the sort keys first).
// This is tracked in sort_metadata.remappings.
let cursor_id = pseudo_cursor;
let start_reg = t_ctx.reg_result_cols_start.unwrap();
for i in 0..result_columns.len() {
let reg = start_reg + i;
@@ -171,14 +247,25 @@ pub fn emit_order_by(
plan,
start_reg,
t_ctx.limit_ctx,
Some(sort_loop_end_label),
if !use_heap_sort {
Some(sort_loop_end_label)
} else {
None
},
)?;
program.resolve_label(sort_loop_next_label, program.offset());
program.emit_insn(Insn::SorterNext {
cursor_id: sort_cursor,
pc_if_next: sort_loop_start_label,
});
if !use_heap_sort {
program.emit_insn(Insn::SorterNext {
cursor_id: sort_cursor,
pc_if_next: sort_loop_start_label,
});
} else {
program.emit_insn(Insn::Next {
cursor_id: sort_cursor,
pc_if_next: sort_loop_start_label,
});
}
program.preassign_label_to_next_insn(sort_loop_end_label);
Ok(())
@@ -237,6 +324,46 @@ pub fn order_by_sorter_insert(
)?;
}
}
let SortMetadata {
sort_cursor,
reg_sorter_data,
use_heap_sort,
..
} = sort_metadata;
let skip_label = if *use_heap_sort {
// skip records which greater than current top-k maintained in a separate BTreeIndex
let insert_label = program.allocate_label();
let skip_label = program.allocate_label();
let limit = t_ctx.limit_ctx.as_ref().expect("limit must be set");
let limit_reg = t_ctx.reg_limit_offset_sum.unwrap_or(limit.reg_limit);
program.emit_insn(Insn::IfPos {
reg: limit_reg,
target_pc: insert_label,
decrement_by: 1,
});
program.emit_insn(Insn::Last {
cursor_id: *sort_cursor,
pc_if_empty: insert_label,
});
program.emit_insn(Insn::IdxLE {
cursor_id: *sort_cursor,
start_reg,
num_regs: orderby_sorter_column_count,
target_pc: skip_label,
});
program.emit_insn(Insn::Delete {
cursor_id: *sort_cursor,
table_name: "".to_string(),
is_part_of_update: false,
});
program.preassign_label_to_next_insn(insert_label);
Some(skip_label)
} else {
None
};
let mut cur_reg = start_reg + order_by_len;
if sort_metadata.has_sequence {
program.emit_insn(Insn::Sequence {
@@ -330,19 +457,31 @@ pub fn order_by_sorter_insert(
}
}
let SortMetadata {
sort_cursor,
reg_sorter_data,
..
} = sort_metadata;
sorter_insert(
program,
start_reg,
orderby_sorter_column_count,
*sort_cursor,
*reg_sorter_data,
);
if *use_heap_sort {
program.emit_insn(Insn::MakeRecord {
start_reg,
count: orderby_sorter_column_count,
dest_reg: *reg_sorter_data,
index_name: None,
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: *sort_cursor,
record_reg: *reg_sorter_data,
unpacked_start: None,
unpacked_count: None,
flags: IdxInsertFlags::new(),
});
program.preassign_label_to_next_insn(skip_label.unwrap());
} else {
sorter_insert(
program,
start_reg,
orderby_sorter_column_count,
*sort_cursor,
*reg_sorter_data,
);
}
Ok(())
}

View File

@@ -11,7 +11,7 @@ use crate::storage::btree::{
use crate::storage::database::DatabaseFile;
use crate::storage::page_cache::PageCache;
use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState};
use crate::storage::sqlite3_ondisk::{read_varint, DatabaseHeader, PageSize};
use crate::storage::sqlite3_ondisk::{read_varint_fast, DatabaseHeader, PageSize};
use crate::translate::collate::CollationSeq;
use crate::types::{
compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord,
@@ -1486,44 +1486,6 @@ pub fn op_last(
Ok(InsnFunctionStepResult::Step)
}
/// Fast varint reader optimized for the common cases of 1-byte and 2-byte varints.
///
/// This function is a performance-optimized version of `read_varint()` that handles
/// the most common varint cases inline before falling back to the full implementation.
/// It follows the same varint encoding as SQLite.
///
/// # Optimized Cases
///
/// - **Single-byte case**: Values 0-127 (0x00-0x7F) are returned immediately
/// - **Two-byte case**: Values 128-16383 (0x80-0x3FFF) are handled inline
/// - **Multi-byte case**: Larger values fall back to the full `read_varint()` implementation
///
/// This function is similar to `sqlite3GetVarint32`
#[inline(always)]
fn read_varint_fast(buf: &[u8]) -> Result<(u64, usize)> {
// Fast path: Single-byte varint
if let Some(&first_byte) = buf.first() {
if first_byte & 0x80 == 0 {
return Ok((first_byte as u64, 1));
}
} else {
crate::bail_corrupt_error!("Invalid varint");
}
// Fast path: Two-byte varint
if let Some(&second_byte) = buf.get(1) {
if second_byte & 0x80 == 0 {
let v = (((buf[0] & 0x7f) as u64) << 7) + (second_byte as u64);
return Ok((v, 2));
}
} else {
crate::bail_corrupt_error!("Invalid varint");
}
//Fallback: Multi-byte varint
read_varint(buf)
}
#[derive(Debug, Clone, Copy)]
pub enum OpColumnState {
Start,
@@ -5778,8 +5740,9 @@ pub fn op_sequence(
},
insn
);
let cursor = state.get_cursor(*cursor_id).as_sorter_mut();
let seq_num = cursor.next_sequence();
let cursor_seq = state.cursor_seqs.get_mut(*cursor_id).unwrap();
let seq_num = *cursor_seq;
*cursor_seq += 1;
state.registers[*target_reg] = Register::Value(Value::Integer(seq_num));
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
@@ -5800,8 +5763,10 @@ pub fn op_sequence_test(
},
insn
);
let cursor = state.get_cursor(*cursor_id).as_sorter_mut();
state.pc = if cursor.seq_beginning() {
let cursor_seq = state.cursor_seqs.get_mut(*cursor_id).unwrap();
let was_zero = *cursor_seq == 0;
*cursor_seq += 1;
state.pc = if was_zero {
target_pc.as_offset_int()
} else {
state.pc + 1

View File

@@ -271,6 +271,7 @@ pub struct ProgramState {
pub io_completions: Option<IOCompletions>,
pub pc: InsnReference,
cursors: Vec<Option<Cursor>>,
cursor_seqs: Vec<i64>,
registers: Vec<Register>,
pub(crate) result_row: Option<Row>,
last_compare: Option<std::cmp::Ordering>,
@@ -325,11 +326,13 @@ unsafe impl Sync for ProgramState {}
impl ProgramState {
pub fn new(max_registers: usize, max_cursors: usize) -> Self {
let cursors: Vec<Option<Cursor>> = (0..max_cursors).map(|_| None).collect();
let cursor_seqs = vec![0i64; max_cursors];
let registers = vec![Register::Value(Value::Null); max_registers];
Self {
io_completions: None,
pc: 0,
cursors,
cursor_seqs,
registers,
result_row: None,
last_compare: None,
@@ -412,6 +415,7 @@ impl ProgramState {
if let Some(max_cursors) = max_cursors {
self.cursors.resize_with(max_cursors, || None);
self.cursor_seqs.resize(max_cursors, 0);
}
if let Some(max_resgisters) = max_registers {
self.registers

View File

@@ -86,7 +86,6 @@ pub struct Sorter {
insert_state: InsertState,
/// State machine for [Sorter::init_chunk_heap]
init_chunk_heap_state: InitChunkHeapState,
seq_count: i64,
pending_completions: Vec<Completion>,
}
@@ -125,7 +124,6 @@ impl Sorter {
sort_state: SortState::Start,
insert_state: InsertState::Start,
init_chunk_heap_state: InitChunkHeapState::Start,
seq_count: 0,
pending_completions: Vec::new(),
}
}
@@ -138,21 +136,6 @@ impl Sorter {
self.current.is_some()
}
/// Get current sequence count and increment it
pub fn next_sequence(&mut self) -> i64 {
let current = self.seq_count;
self.seq_count += 1;
current
}
/// Test if at beginning of sequence (count == 0) and increment
/// Returns true if this was the first call (seq_count was 0)
pub fn seq_beginning(&mut self) -> bool {
let was_zero = self.seq_count == 0;
self.seq_count += 1;
was_zero
}
// We do the sorting here since this is what is called by the SorterSort instruction
pub fn sort(&mut self) -> Result<IOResult<()>> {
loop {

View File

@@ -241,6 +241,18 @@ do_execsql_test_on_specific_db {:memory:} orderby_alias_precedence {
} {2|100
1|200}
# Check that ORDER BY with heap-sort properly handle multiple rows with same order key + result values
do_execsql_test_on_specific_db {:memory:} orderby_same_rows {
CREATE TABLE t(x,y,z);
INSERT INTO t VALUES (1,2,3),(1,2,6),(1,2,9),(1,2,10),(1,3,-1),(1,3,-2);
SELECT x, y FROM t ORDER BY x, y LIMIT 10;
} {1|2
1|2
1|2
1|2
1|3
1|3}
# https://github.com/tursodatabase/turso/issues/3684
do_execsql_test_on_specific_db {:memory:} orderby_alias_shadows_column {
CREATE TABLE t(a, b);
@@ -264,4 +276,4 @@ FROM
a JOIN b ON a.id = b.id
ORDER BY
value;
}
}