diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 1e5827198..190b94479 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1486,6 +1486,44 @@ pub fn read_integer(buf: &[u8], serial_type: u8) -> Result { } } +/// Fast varint reader optimized for the common cases of 1-byte and 2-byte varints. +/// +/// This function is a performance-optimized version of `read_varint()` that handles +/// the most common varint cases inline before falling back to the full implementation. +/// It follows the same varint encoding as SQLite. +/// +/// # Optimized Cases +/// +/// - **Single-byte case**: Values 0-127 (0x00-0x7F) are returned immediately +/// - **Two-byte case**: Values 128-16383 (0x80-0x3FFF) are handled inline +/// - **Multi-byte case**: Larger values fall back to the full `read_varint()` implementation +/// +/// This function is similar to `sqlite3GetVarint32` +#[inline(always)] +pub fn read_varint_fast(buf: &[u8]) -> Result<(u64, usize)> { + // Fast path: Single-byte varint + if let Some(&first_byte) = buf.first() { + if first_byte & 0x80 == 0 { + return Ok((first_byte as u64, 1)); + } + } else { + crate::bail_corrupt_error!("Invalid varint"); + } + + // Fast path: Two-byte varint + if let Some(&second_byte) = buf.get(1) { + if second_byte & 0x80 == 0 { + let v = (((buf[0] & 0x7f) as u64) << 7) + (second_byte as u64); + return Ok((v, 2)); + } + } else { + crate::bail_corrupt_error!("Invalid varint"); + } + + //Fallback: Multi-byte varint + read_varint(buf) +} + #[inline(always)] pub fn read_varint(buf: &[u8]) -> Result<(u64, usize)> { let mut v: u64 = 0; diff --git a/core/translate/emitter.rs b/core/translate/emitter.rs index 6069ac23b..8d75dc63d 100644 --- a/core/translate/emitter.rs +++ b/core/translate/emitter.rs @@ -319,6 +319,7 @@ pub fn emit_query<'a>( &plan.order_by, &plan.table_references, plan.group_by.is_some(), + plan.distinctness != Distinctness::NonDistinct, &plan.aggregates, )?; } diff --git 
a/core/translate/order_by.rs b/core/translate/order_by.rs index aafb32a21..2dd7a3c8a 100644 --- a/core/translate/order_by.rs +++ b/core/translate/order_by.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use turso_parser::ast::{self, SortOrder}; use crate::{ emit_explain, - schema::PseudoCursorType, + schema::{Index, IndexColumn, PseudoCursorType}, translate::{ collate::{get_collseq_from_expr, CollationSeq}, group_by::is_orderby_agg_or_const, @@ -11,7 +13,7 @@ use crate::{ util::exprs_are_equivalent, vdbe::{ builder::{CursorType, ProgramBuilder}, - insn::Insn, + insn::{IdxInsertFlags, Insn}, }, QueryMode, Result, }; @@ -39,9 +41,12 @@ pub struct SortMetadata { /// aggregates/constants, so that rows that tie on ORDER BY terms are output in /// the same relative order the underlying row stream produced them. pub has_sequence: bool, + /// Whether to use heap-sort with BTreeIndex instead of full-collection sort through Sorter + pub use_heap_sort: bool, } /// Initialize resources needed for ORDER BY processing +#[allow(clippy::too_many_arguments)] pub fn init_order_by( program: &mut ProgramBuilder, t_ctx: &mut TranslateCtx, @@ -49,54 +54,113 @@ pub fn init_order_by( order_by: &[(Box, SortOrder)], referenced_tables: &TableReferences, has_group_by: bool, + has_distinct: bool, aggregates: &[Aggregate], ) -> Result<()> { - let sort_cursor = program.alloc_cursor_id(CursorType::Sorter); let only_aggs = order_by .iter() .all(|(e, _)| is_orderby_agg_or_const(&t_ctx.resolver, e, aggregates)); - // only emit sequence column if we have GROUP BY and ORDER BY is not only aggregates or constants - let has_sequence = has_group_by && !only_aggs; + let use_heap_sort = !has_distinct && !has_group_by && t_ctx.limit_ctx.is_some(); + + // only emit sequence column if (we have GROUP BY and ORDER BY is not only aggregates or constants) OR (we decided to use heap-sort) + let has_sequence = (has_group_by && !only_aggs) || use_heap_sort; + + let remappings = 
order_by_deduplicate_result_columns(order_by, result_columns, has_sequence); + let sort_cursor = if use_heap_sort { + let index_name = format!("heap_sort_{}", program.offset().as_offset_int()); // we don't really care about the name that much, just enough that we don't get name collisions + let mut index_columns = Vec::with_capacity(order_by.len() + result_columns.len()); + for (column, order) in order_by { + let collation = get_collseq_from_expr(column, referenced_tables)?; + let pos_in_table = index_columns.len(); + index_columns.push(IndexColumn { + name: pos_in_table.to_string(), + order: *order, + pos_in_table, + collation, + default: None, + }) + } + let pos_in_table = index_columns.len(); + // add sequence number between ORDER BY columns and result column + index_columns.push(IndexColumn { + name: pos_in_table.to_string(), + order: SortOrder::Asc, + pos_in_table, + collation: Some(CollationSeq::Binary), + default: None, + }); + for _ in remappings.iter().filter(|r| !r.deduplicated) { + let pos_in_table = index_columns.len(); + index_columns.push(IndexColumn { + name: pos_in_table.to_string(), + order: SortOrder::Asc, + pos_in_table, + collation: None, + default: None, + }) + } + let index = Arc::new(Index { + name: index_name.clone(), + table_name: String::new(), + ephemeral: true, + root_page: 0, + columns: index_columns, + unique: false, + has_rowid: false, + where_clause: None, + }); + program.alloc_cursor_id(CursorType::BTreeIndex(index)) + } else { + program.alloc_cursor_id(CursorType::Sorter) + }; t_ctx.meta_sort = Some(SortMetadata { sort_cursor, reg_sorter_data: program.alloc_register(), - remappings: order_by_deduplicate_result_columns(order_by, result_columns, has_sequence), + remappings, has_sequence, + use_heap_sort, }); - /* - * Terms of the ORDER BY clause that is part of a SELECT statement may be assigned a collating sequence using the COLLATE operator, - * in which case the specified collating function is used for sorting. 
- * Otherwise, if the expression sorted by an ORDER BY clause is a column, - * then the collating sequence of the column is used to determine sort order. - * If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used. - */ - let mut collations = order_by - .iter() - .map(|(expr, _)| get_collseq_from_expr(expr, referenced_tables)) - .collect::>>()?; + if use_heap_sort { + program.emit_insn(Insn::OpenEphemeral { + cursor_id: sort_cursor, + is_table: false, + }); + } else { + /* + * Terms of the ORDER BY clause that is part of a SELECT statement may be assigned a collating sequence using the COLLATE operator, + * in which case the specified collating function is used for sorting. + * Otherwise, if the expression sorted by an ORDER BY clause is a column, + * then the collating sequence of the column is used to determine sort order. + * If the expression is not a column and has no COLLATE clause, then the BINARY collating sequence is used. + */ + let mut collations = order_by + .iter() + .map(|(expr, _)| get_collseq_from_expr(expr, referenced_tables)) + .collect::>>()?; - if has_sequence { - // sequence column uses BINARY collation - collations.push(Some(CollationSeq::default())); + if has_sequence { + // sequence column uses BINARY collation + collations.push(Some(CollationSeq::default())); + } + + let key_len = order_by.len() + if has_sequence { 1 } else { 0 }; + + program.emit_insn(Insn::SorterOpen { + cursor_id: sort_cursor, + columns: key_len, + order: { + let mut ord: Vec = order_by.iter().map(|(_, d)| *d).collect(); + if has_sequence { + // sequence is ascending tiebreaker + ord.push(SortOrder::Asc); + } + ord + }, + collations, + }); } - - let key_len = order_by.len() + if has_sequence { 1 } else { 0 }; - - program.emit_insn(Insn::SorterOpen { - cursor_id: sort_cursor, - columns: key_len, - order: { - let mut ord: Vec = order_by.iter().map(|(_, d)| *d).collect(); - if has_sequence { - // sequence is ascending 
tiebreaker - ord.push(SortOrder::Asc); - } - ord - }, - collations, - }); Ok(()) } @@ -118,6 +182,7 @@ pub fn emit_order_by( reg_sorter_data, ref remappings, has_sequence, + use_heap_sort, } = *t_ctx.meta_sort.as_ref().unwrap(); let sorter_column_count = order_by.len() @@ -128,33 +193,44 @@ pub fn emit_order_by( // to emit correct explain output. emit_explain!(program, false, "USE TEMP B-TREE FOR ORDER BY".to_owned()); - let pseudo_cursor = program.alloc_cursor_id(CursorType::Pseudo(PseudoCursorType { - column_count: sorter_column_count, - })); + let cursor_id = if !use_heap_sort { + let pseudo_cursor = program.alloc_cursor_id(CursorType::Pseudo(PseudoCursorType { + column_count: sorter_column_count, + })); - program.emit_insn(Insn::OpenPseudo { - cursor_id: pseudo_cursor, - content_reg: reg_sorter_data, - num_fields: sorter_column_count, - }); + program.emit_insn(Insn::OpenPseudo { + cursor_id: pseudo_cursor, + content_reg: reg_sorter_data, + num_fields: sorter_column_count, + }); + + program.emit_insn(Insn::SorterSort { + cursor_id: sort_cursor, + pc_if_empty: sort_loop_end_label, + }); + pseudo_cursor + } else { + program.emit_insn(Insn::Rewind { + cursor_id: sort_cursor, + pc_if_empty: sort_loop_end_label, + }); + sort_cursor + }; - program.emit_insn(Insn::SorterSort { - cursor_id: sort_cursor, - pc_if_empty: sort_loop_end_label, - }); program.preassign_label_to_next_insn(sort_loop_start_label); emit_offset(program, sort_loop_next_label, t_ctx.reg_offset); - program.emit_insn(Insn::SorterData { - cursor_id: sort_cursor, - dest_reg: reg_sorter_data, - pseudo_cursor, - }); + if !use_heap_sort { + program.emit_insn(Insn::SorterData { + cursor_id: sort_cursor, + dest_reg: reg_sorter_data, + pseudo_cursor: cursor_id, + }); + } // We emit the columns in SELECT order, not sorter order (sorter always has the sort keys first). // This is tracked in sort_metadata.remappings. 
- let cursor_id = pseudo_cursor; let start_reg = t_ctx.reg_result_cols_start.unwrap(); for i in 0..result_columns.len() { let reg = start_reg + i; @@ -171,14 +247,25 @@ pub fn emit_order_by( plan, start_reg, t_ctx.limit_ctx, - Some(sort_loop_end_label), + if !use_heap_sort { + Some(sort_loop_end_label) + } else { + None + }, )?; program.resolve_label(sort_loop_next_label, program.offset()); - program.emit_insn(Insn::SorterNext { - cursor_id: sort_cursor, - pc_if_next: sort_loop_start_label, - }); + if !use_heap_sort { + program.emit_insn(Insn::SorterNext { + cursor_id: sort_cursor, + pc_if_next: sort_loop_start_label, + }); + } else { + program.emit_insn(Insn::Next { + cursor_id: sort_cursor, + pc_if_next: sort_loop_start_label, + }); + } program.preassign_label_to_next_insn(sort_loop_end_label); Ok(()) @@ -237,6 +324,46 @@ pub fn order_by_sorter_insert( )?; } } + + let SortMetadata { + sort_cursor, + reg_sorter_data, + use_heap_sort, + .. + } = sort_metadata; + + let skip_label = if *use_heap_sort { + // skip records which greater than current top-k maintained in a separate BTreeIndex + let insert_label = program.allocate_label(); + let skip_label = program.allocate_label(); + let limit = t_ctx.limit_ctx.as_ref().expect("limit must be set"); + let limit_reg = t_ctx.reg_limit_offset_sum.unwrap_or(limit.reg_limit); + program.emit_insn(Insn::IfPos { + reg: limit_reg, + target_pc: insert_label, + decrement_by: 1, + }); + program.emit_insn(Insn::Last { + cursor_id: *sort_cursor, + pc_if_empty: insert_label, + }); + program.emit_insn(Insn::IdxLE { + cursor_id: *sort_cursor, + start_reg, + num_regs: orderby_sorter_column_count, + target_pc: skip_label, + }); + program.emit_insn(Insn::Delete { + cursor_id: *sort_cursor, + table_name: "".to_string(), + is_part_of_update: false, + }); + program.preassign_label_to_next_insn(insert_label); + Some(skip_label) + } else { + None + }; + let mut cur_reg = start_reg + order_by_len; if sort_metadata.has_sequence { 
program.emit_insn(Insn::Sequence { @@ -330,19 +457,31 @@ pub fn order_by_sorter_insert( } } - let SortMetadata { - sort_cursor, - reg_sorter_data, - .. - } = sort_metadata; - - sorter_insert( - program, - start_reg, - orderby_sorter_column_count, - *sort_cursor, - *reg_sorter_data, - ); + if *use_heap_sort { + program.emit_insn(Insn::MakeRecord { + start_reg, + count: orderby_sorter_column_count, + dest_reg: *reg_sorter_data, + index_name: None, + affinity_str: None, + }); + program.emit_insn(Insn::IdxInsert { + cursor_id: *sort_cursor, + record_reg: *reg_sorter_data, + unpacked_start: None, + unpacked_count: None, + flags: IdxInsertFlags::new(), + }); + program.preassign_label_to_next_insn(skip_label.unwrap()); + } else { + sorter_insert( + program, + start_reg, + orderby_sorter_column_count, + *sort_cursor, + *reg_sorter_data, + ); + } Ok(()) } diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index c23782556..30ece2be8 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -11,7 +11,7 @@ use crate::storage::btree::{ use crate::storage::database::DatabaseFile; use crate::storage::page_cache::PageCache; use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState}; -use crate::storage::sqlite3_ondisk::{read_varint, DatabaseHeader, PageSize}; +use crate::storage::sqlite3_ondisk::{read_varint_fast, DatabaseHeader, PageSize}; use crate::translate::collate::CollationSeq; use crate::types::{ compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord, @@ -1486,44 +1486,6 @@ pub fn op_last( Ok(InsnFunctionStepResult::Step) } -/// Fast varint reader optimized for the common cases of 1-byte and 2-byte varints. -/// -/// This function is a performance-optimized version of `read_varint()` that handles -/// the most common varint cases inline before falling back to the full implementation. -/// It follows the same varint encoding as SQLite. 
-/// -/// # Optimized Cases -/// -/// - **Single-byte case**: Values 0-127 (0x00-0x7F) are returned immediately -/// - **Two-byte case**: Values 128-16383 (0x80-0x3FFF) are handled inline -/// - **Multi-byte case**: Larger values fall back to the full `read_varint()` implementation -/// -/// This function is similar to `sqlite3GetVarint32` -#[inline(always)] -fn read_varint_fast(buf: &[u8]) -> Result<(u64, usize)> { - // Fast path: Single-byte varint - if let Some(&first_byte) = buf.first() { - if first_byte & 0x80 == 0 { - return Ok((first_byte as u64, 1)); - } - } else { - crate::bail_corrupt_error!("Invalid varint"); - } - - // Fast path: Two-byte varint - if let Some(&second_byte) = buf.get(1) { - if second_byte & 0x80 == 0 { - let v = (((buf[0] & 0x7f) as u64) << 7) + (second_byte as u64); - return Ok((v, 2)); - } - } else { - crate::bail_corrupt_error!("Invalid varint"); - } - - //Fallback: Multi-byte varint - read_varint(buf) -} - #[derive(Debug, Clone, Copy)] pub enum OpColumnState { Start, @@ -5778,8 +5740,9 @@ pub fn op_sequence( }, insn ); - let cursor = state.get_cursor(*cursor_id).as_sorter_mut(); - let seq_num = cursor.next_sequence(); + let cursor_seq = state.cursor_seqs.get_mut(*cursor_id).unwrap(); + let seq_num = *cursor_seq; + *cursor_seq += 1; state.registers[*target_reg] = Register::Value(Value::Integer(seq_num)); state.pc += 1; Ok(InsnFunctionStepResult::Step) @@ -5800,8 +5763,10 @@ pub fn op_sequence_test( }, insn ); - let cursor = state.get_cursor(*cursor_id).as_sorter_mut(); - state.pc = if cursor.seq_beginning() { + let cursor_seq = state.cursor_seqs.get_mut(*cursor_id).unwrap(); + let was_zero = *cursor_seq == 0; + *cursor_seq += 1; + state.pc = if was_zero { target_pc.as_offset_int() } else { state.pc + 1 diff --git a/core/vdbe/mod.rs b/core/vdbe/mod.rs index 68981dd70..8eec50ba3 100644 --- a/core/vdbe/mod.rs +++ b/core/vdbe/mod.rs @@ -271,6 +271,7 @@ pub struct ProgramState { pub io_completions: Option, pub pc: InsnReference, cursors: 
Vec>, + cursor_seqs: Vec, registers: Vec, pub(crate) result_row: Option, last_compare: Option, @@ -325,11 +326,13 @@ unsafe impl Sync for ProgramState {} impl ProgramState { pub fn new(max_registers: usize, max_cursors: usize) -> Self { let cursors: Vec> = (0..max_cursors).map(|_| None).collect(); + let cursor_seqs = vec![0i64; max_cursors]; let registers = vec![Register::Value(Value::Null); max_registers]; Self { io_completions: None, pc: 0, cursors, + cursor_seqs, registers, result_row: None, last_compare: None, @@ -412,6 +415,7 @@ impl ProgramState { if let Some(max_cursors) = max_cursors { self.cursors.resize_with(max_cursors, || None); + self.cursor_seqs.resize(max_cursors, 0); } if let Some(max_resgisters) = max_registers { self.registers diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 4f1f190f6..81abde222 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -86,7 +86,6 @@ pub struct Sorter { insert_state: InsertState, /// State machine for [Sorter::init_chunk_heap] init_chunk_heap_state: InitChunkHeapState, - seq_count: i64, pending_completions: Vec, } @@ -125,7 +124,6 @@ impl Sorter { sort_state: SortState::Start, insert_state: InsertState::Start, init_chunk_heap_state: InitChunkHeapState::Start, - seq_count: 0, pending_completions: Vec::new(), } } @@ -138,21 +136,6 @@ impl Sorter { self.current.is_some() } - /// Get current sequence count and increment it - pub fn next_sequence(&mut self) -> i64 { - let current = self.seq_count; - self.seq_count += 1; - current - } - - /// Test if at beginning of sequence (count == 0) and increment - /// Returns true if this was the first call (seq_count was 0) - pub fn seq_beginning(&mut self) -> bool { - let was_zero = self.seq_count == 0; - self.seq_count += 1; - was_zero - } - // We do the sorting here since this is what is called by the SorterSort instruction pub fn sort(&mut self) -> Result> { loop { diff --git a/testing/orderby.test b/testing/orderby.test index 0fac87856..3864cf695 100755 --- 
a/testing/orderby.test +++ b/testing/orderby.test @@ -241,6 +241,18 @@ do_execsql_test_on_specific_db {:memory:} orderby_alias_precedence { } {2|100 1|200} +# Check that ORDER BY with heap-sort properly handles multiple rows with the same order key and result values +do_execsql_test_on_specific_db {:memory:} orderby_same_rows { + CREATE TABLE t(x,y,z); + INSERT INTO t VALUES (1,2,3),(1,2,6),(1,2,9),(1,2,10),(1,3,-1),(1,3,-2); + SELECT x, y FROM t ORDER BY x, y LIMIT 10; +} {1|2 +1|2 +1|2 +1|2 +1|3 +1|3} + # https://github.com/tursodatabase/turso/issues/3684 do_execsql_test_on_specific_db {:memory:} orderby_alias_shadows_column { CREATE TABLE t(a, b); @@ -264,4 +276,4 @@ FROM a JOIN b ON a.id = b.id ORDER BY value; - } \ No newline at end of file + }