Merge 'impl order by desc' from lemonwx

Hi all,
I attempted to implement `order by desc`, which mainly consists of
the following two parts:
1. for cases where the primary key can be utilized, I implemented
`push_scan_direction` to push the scan direction down to the `Scan`.
2. for cases where the primary key cannot be utilized, I re-implemented
the sorting in `Sorter::rewind` based on whether it's desc or asc.
There is also some related work still to be done, such as sorting using a
secondary index, which I will attempt next.
Please help review this and let me know how I can improve it. Thanks in
advance!

Reviewed-by: Pere Diaz Bou <pere-altea@hotmail.com>

Closes #376
This commit is contained in:
jussisaurio
2024-11-21 16:37:38 +02:00
12 changed files with 431 additions and 34 deletions

View File

@@ -91,4 +91,12 @@ impl Cursor for PseudoCursor {
fn btree_create(&mut self, _flags: usize) -> u32 {
unreachable!("Please don't.")
}
// Positioning on the last row is not implemented for this cursor yet;
// calling it panics via `todo!()`.
fn last(&mut self) -> Result<CursorResult<()>> {
todo!()
}
// Stepping backwards is not implemented for this cursor yet;
// calling it panics via `todo!()`.
fn prev(&mut self) -> Result<CursorResult<()>> {
todo!()
}
}

View File

@@ -9,6 +9,7 @@ use crate::types::{Cursor, CursorResult, OwnedRecord, OwnedValue, SeekKey, SeekO
use crate::Result;
use std::cell::{Ref, RefCell};
use std::i32;
use std::pin::Pin;
use std::rc::Rc;
@@ -87,7 +88,10 @@ struct PageStack {
/// cell_indices[current_page] is the current cell index being consumed. Similarly
/// cell_indices[current_page-1] is the cell index of the parent of the current page
/// that we save in case of going back up.
cell_indices: RefCell<[usize; BTCURSOR_MAX_DEPTH + 1]>,
/// Two special values need attention:
/// - cell_indices[current_page] = -1 means that iteration has reached the start of the current_page.
/// - cell_indices[current_page] = `cell_count` means that iteration has reached the end of the current_page.
cell_indices: RefCell<[i32; BTCURSOR_MAX_DEPTH + 1]>,
}
impl BTreeCursor {
@@ -130,13 +134,94 @@ impl BTreeCursor {
Ok(CursorResult::Ok(cell_count == 0))
}
/// Steps the cursor one record backwards and returns what it lands on.
///
/// Returns:
/// - `CursorResult::Ok((Some(rowid), Some(record)))` for the table leaf cell reached,
/// - `CursorResult::Ok((None, None))` once the page stack has been unwound past the
///   first cell of the topmost page (i.e. the beginning of the btree),
/// - `CursorResult::IO` when the current page is locked, or is not loaded yet (in which
///   case a load is requested from the pager first).
///
/// Index btree cells are not supported yet and hit `todo!()`.
fn get_prev_record(&mut self) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
    loop {
        let mem_page_rc = self.stack.top();
        let cell_idx = self.stack.current_index();

        // moved to current page begin
        // todo: find a better way to flag moved to end or begin of page
        if self.stack.curr_idx_out_of_begin() {
            loop {
                if self.stack.current_index() > 0 {
                    self.stack.retreat();
                    break;
                }
                if self.stack.has_parent() {
                    self.stack.pop();
                } else {
                    // moved to begin of btree
                    return Ok(CursorResult::Ok((None, None)));
                }
            }
            // continue to next loop to get record from the new page
            continue;
        }

        // Not negative here (checked above), so the cast cannot wrap.
        let cell_idx = cell_idx as usize;
        debug!(
            "get_prev_record current id={} cell={}",
            mem_page_rc.borrow().id,
            cell_idx
        );
        if mem_page_rc.borrow().is_locked() {
            return Ok(CursorResult::IO);
        }
        if !mem_page_rc.borrow().is_loaded() {
            self.pager.load_page(mem_page_rc.clone())?;
            return Ok(CursorResult::IO);
        }
        let mem_page = mem_page_rc.borrow();
        let contents = mem_page.contents.as_ref().unwrap();

        let cell_count = contents.cell_count();
        if cell_count == 0 {
            // An empty page has no cell to step back onto. Computing `cell_count - 1`
            // would underflow, so mark the page as exhausted instead and let the
            // "before first cell" branch above pop to the parent or report the
            // beginning of the btree.
            self.stack.set_cell_index(-1);
            continue;
        }
        // An index at or past `cell_count` (e.g. the `i32::MAX` marker set below)
        // means "start from the last cell of this page".
        let cell_idx = if cell_idx >= cell_count {
            self.stack.set_cell_index(cell_count as i32 - 1);
            cell_count - 1
        } else {
            cell_idx
        };

        let cell = contents.cell_get(
            cell_idx,
            self.pager.clone(),
            self.max_local(contents.page_type()),
            self.min_local(contents.page_type()),
            self.usable_space(),
        )?;

        match cell {
            BTreeCell::TableInteriorCell(TableInteriorCell {
                _left_child_page,
                _rowid,
            }) => {
                let mem_page = self.pager.read_page(_left_child_page as usize)?;
                self.stack.push(mem_page);
                // use cell_index = i32::MAX to tell next loop to go to the end of the current page
                self.stack.set_cell_index(i32::MAX);
                continue;
            }
            BTreeCell::TableLeafCell(TableLeafCell {
                _rowid, _payload, ..
            }) => {
                // Move the index first so the next call continues with the previous cell.
                self.stack.retreat();
                let record: OwnedRecord =
                    crate::storage::sqlite3_ondisk::read_record(&_payload)?;
                return Ok(CursorResult::Ok((Some(_rowid), Some(record))));
            }
            BTreeCell::IndexInteriorCell(_) => todo!(),
            BTreeCell::IndexLeafCell(_) => todo!(),
        }
    }
}
fn get_next_record(
&mut self,
predicate: Option<(SeekKey<'_>, SeekOp)>,
) -> Result<CursorResult<(Option<u64>, Option<OwnedRecord>)>> {
loop {
let mem_page_rc = self.stack.top();
let cell_idx = self.stack.current_index();
let cell_idx = self.stack.current_index() as usize;
debug!("current id={} cell={}", mem_page_rc.borrow().id, cell_idx);
if mem_page_rc.borrow().is_locked() {
@@ -406,14 +491,14 @@ impl BTreeCursor {
let contents = page.contents.as_ref().unwrap();
if contents.is_leaf() {
if contents.cell_count() > 0 {
self.stack.set_cell_index(contents.cell_count() - 1);
self.stack.set_cell_index(contents.cell_count() as i32 - 1);
}
return Ok(CursorResult::Ok(()));
}
match contents.rightmost_pointer() {
Some(right_most_pointer) => {
self.stack.set_cell_index(contents.cell_count() + 1);
self.stack.set_cell_index(contents.cell_count() as i32 + 1);
let mem_page = self.pager.read_page(right_most_pointer as usize).unwrap();
self.stack.push(mem_page);
continue;
@@ -1544,20 +1629,30 @@ impl PageStack {
}
/// Cell index of the current page
fn current_index(&self) -> usize {
fn current_index(&self) -> i32 {
let current = self.current();
self.cell_indices.borrow()[current]
}
/// Reports whether the current page's cell index is negative, i.e. iteration
/// has moved before the first cell of that page.
fn curr_idx_out_of_begin(&self) -> bool {
    self.current_index() < 0
}
/// Bumps the cell index stored for the current page by one, so that it refers
/// to the following cell.
fn advance(&self) {
    let page_slot = self.current();
    let mut indices = self.cell_indices.borrow_mut();
    indices[page_slot] += 1;
}
fn set_cell_index(&self, idx: usize) {
fn retreat(&self) {
let current = self.current();
self.cell_indices.borrow_mut()[current] = idx;
self.cell_indices.borrow_mut()[current] -= 1;
}
fn set_cell_index(&self, idx: i32) {
let current = self.current();
self.cell_indices.borrow_mut()[current] = idx
}
fn has_parent(&self) -> bool {
@@ -1633,6 +1728,13 @@ impl Cursor for BTreeCursor {
}
}
/// Moves to the rightmost position of the btree and then steps back once via
/// `prev()`. Yields `CursorResult::IO` if the rightmost move is still waiting
/// on I/O.
fn last(&mut self) -> Result<CursorResult<()>> {
    if let CursorResult::IO = self.move_to_rightmost()? {
        return Ok(CursorResult::IO);
    }
    self.prev()
}
fn next(&mut self) -> Result<CursorResult<()>> {
match self.get_next_record(None)? {
CursorResult::Ok((rowid, next)) => {
@@ -1644,6 +1746,17 @@ impl Cursor for BTreeCursor {
}
}
/// Fetches the previous record and stores its rowid and record on the cursor.
/// Yields `CursorResult::IO` unchanged when the fetch is still waiting on I/O.
fn prev(&mut self) -> Result<CursorResult<()>> {
    let (rowid, record) = match self.get_prev_record()? {
        CursorResult::IO => return Ok(CursorResult::IO),
        CursorResult::Ok(found) => found,
    };
    self.rowid.replace(rowid);
    self.record.replace(record);
    Ok(CursorResult::Ok(()))
}
fn wait_for_completion(&mut self) -> Result<()> {
// TODO: Wait for pager I/O to complete
Ok(())

View File

@@ -8,7 +8,7 @@ use sqlite3_parser::ast;
use crate::schema::{Column, PseudoTable, Table};
use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::translate::expr::resolve_ident_pseudo_table;
use crate::translate::plan::Search;
use crate::translate::plan::{IterationDirection, Search};
use crate::types::{OwnedRecord, OwnedValue};
use crate::vdbe::builder::ProgramBuilder;
use crate::vdbe::{BranchOffset, Insn, Program};
@@ -173,12 +173,15 @@ impl Emitter for Operator {
id,
step,
predicates,
..
iter_dir,
} => {
*step += 1;
const SCAN_OPEN_READ: usize = 1;
const SCAN_BODY: usize = 2;
const SCAN_NEXT: usize = 3;
let reverse = iter_dir
.as_ref()
.is_some_and(|iter_dir| *iter_dir == IterationDirection::Backwards);
match *step {
SCAN_OPEN_READ => {
let cursor_id = program.alloc_cursor_id(
@@ -199,13 +202,24 @@ impl Emitter for Operator {
SCAN_BODY => {
let cursor_id =
program.resolve_cursor_id(&table_reference.table_identifier, None);
program.emit_insn(Insn::RewindAsync { cursor_id });
if reverse {
program.emit_insn(Insn::LastAsync { cursor_id });
} else {
program.emit_insn(Insn::RewindAsync { cursor_id });
}
let scan_loop_body_label = program.allocate_label();
let halt_label = m.termination_label_stack.last().unwrap();
program.emit_insn_with_label_dependency(
Insn::RewindAwait {
cursor_id,
pc_if_empty: *halt_label,
if reverse {
Insn::LastAwait {
cursor_id,
pc_if_empty: *halt_label,
}
} else {
Insn::RewindAwait {
cursor_id,
pc_if_empty: *halt_label,
}
},
*halt_label,
);
@@ -242,15 +256,30 @@ impl Emitter for Operator {
program.resolve_cursor_id(&table_reference.table_identifier, None);
program
.resolve_label(*m.next_row_labels.get(id).unwrap(), program.offset());
program.emit_insn(Insn::NextAsync { cursor_id });
if reverse {
program.emit_insn(Insn::PrevAsync { cursor_id });
} else {
program.emit_insn(Insn::NextAsync { cursor_id });
}
let jump_label = m.scan_loop_body_labels.pop().unwrap();
program.emit_insn_with_label_dependency(
Insn::NextAwait {
cursor_id,
pc_if_next: jump_label,
},
jump_label,
);
if reverse {
program.emit_insn_with_label_dependency(
Insn::PrevAwait {
cursor_id,
pc_if_next: jump_label,
},
jump_label,
);
} else {
program.emit_insn_with_label_dependency(
Insn::NextAwait {
cursor_id,
pc_if_next: jump_label,
},
jump_label,
);
}
Ok(OpStepResult::Done)
}
_ => Ok(OpStepResult::Done),

View File

@@ -6,7 +6,7 @@ use crate::{schema::Index, util::normalize_ident, Result};
use super::plan::{
get_table_ref_bitmask_for_ast_expr, get_table_ref_bitmask_for_operator, BTreeTableReference,
Direction, Operator, Plan, ProjectionColumn, Search,
Direction, IterationDirection, Operator, Plan, ProjectionColumn, Search,
};
/**
@@ -88,16 +88,18 @@ fn eliminate_unnecessary_orderby(
) -> Result<()> {
match operator {
Operator::Order { source, key, .. } => {
if key.len() != 1 || key.first().unwrap().1 != Direction::Ascending {
// TODO: handle multiple order by keys and descending order
if key.len() != 1 {
// TODO: handle multiple order by keys
return Ok(());
}
let already_ordered = _operator_is_already_ordered_by(
source,
&mut key.first_mut().unwrap().0,
available_indexes,
)?;
let (key, direction) = key.first_mut().unwrap();
let already_ordered = _operator_is_already_ordered_by(source, key, available_indexes)?;
if already_ordered {
push_scan_direction(source, direction);
*operator = source.take_ownership();
}
Ok(())
@@ -572,6 +574,21 @@ fn push_predicate(
}
}
/// Pushes an ORDER BY direction down to the `Scan` underneath `operator`.
///
/// `Projection` nodes are looked through recursively. A `Scan` takes the
/// direction only if its `iter_dir` has not been assigned yet; an existing
/// assignment is left untouched. Any other operator is not supported yet and
/// hits `todo!()`.
fn push_scan_direction(operator: &mut Operator, direction: &Direction) {
    match operator {
        Operator::Scan { iter_dir, .. } => {
            // Only the first assignment wins.
            if iter_dir.is_some() {
                return;
            }
            *iter_dir = Some(match direction {
                Direction::Ascending => IterationDirection::Forwards,
                Direction::Descending => IterationDirection::Backwards,
            });
        }
        Operator::Projection { source, .. } => push_scan_direction(source, direction),
        _ => todo!(),
    }
}
#[derive(Debug)]
pub struct ExpressionResultCache {
resultmap: HashMap<usize, CachedResult>,

View File

@@ -26,6 +26,12 @@ impl Display for Plan {
}
}
/// Direction in which a scan walks its cursor.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IterationDirection {
    /// Iterate from the first row towards the last.
    Forwards,
    /// Iterate from the last row towards the first.
    Backwards,
}
/**
An Operator is a Node in the query plan.
Operators form a tree structure, with each having zero or more children.
@@ -109,11 +115,16 @@ pub enum Operator {
// It takes a table to scan and an optional list of predicates to evaluate.
// The predicates are used to filter rows from the table.
// e.g. SELECT * FROM t1 WHERE t1.foo = 5
// iter_dir is used to indicate the direction of iteration.
// The use of Option for iter_dir is aimed at implementing a conservative optimization strategy: it only pushes
// iter_dir down to Scan when iter_dir is None, to prevent potential result set errors caused by multiple
// assignments. For more detailed discussion, please refer to https://github.com/penberg/limbo/pull/376
Scan {
id: usize,
table_reference: BTreeTableReference,
predicates: Option<Vec<ast::Expr>>,
step: usize,
iter_dir: Option<IterationDirection>,
},
// Search operator
// This operator is used to search for a row in a table using an index

View File

@@ -319,6 +319,7 @@ fn parse_from(
predicates: None,
id: operator_id_counter.get_next_id(),
step: 0,
iter_dir: None,
};
let mut tables = vec![first_table];
@@ -398,6 +399,7 @@ fn parse_join(
predicates: None,
id: operator_id_counter.get_next_id(),
step: 0,
iter_dir: None,
},
outer,
predicates,

View File

@@ -426,7 +426,9 @@ pub enum SeekKey<'a> {
pub trait Cursor {
fn is_empty(&self) -> bool;
fn rewind(&mut self) -> Result<CursorResult<()>>;
fn last(&mut self) -> Result<CursorResult<()>>;
fn next(&mut self) -> Result<CursorResult<()>>;
fn prev(&mut self) -> Result<CursorResult<()>>;
fn wait_for_completion(&mut self) -> Result<()>;
fn rowid(&self) -> Result<Option<u64>>;
fn seek(&mut self, key: SeekKey, op: SeekOp) -> Result<CursorResult<bool>>;

View File

@@ -232,6 +232,13 @@ impl ProgramBuilder {
assert!(*pc_if_empty < 0);
*pc_if_empty = to_offset;
}
Insn::LastAwait {
cursor_id: _cursor_id,
pc_if_empty,
} => {
assert!(*pc_if_empty < 0);
*pc_if_empty = to_offset;
}
Insn::Goto { target_pc } => {
assert!(*target_pc < 0);
*target_pc = to_offset;
@@ -269,6 +276,10 @@ impl ProgramBuilder {
assert!(*pc_if_next < 0);
*pc_if_next = to_offset;
}
Insn::PrevAwait { pc_if_next, .. } => {
assert!(*pc_if_next < 0);
*pc_if_next = to_offset;
}
Insn::InitCoroutine {
yield_reg: _,
jump_on_definition,

View File

@@ -893,6 +893,15 @@ pub fn insn_to_str(
0,
"".to_string(),
),
Insn::LastAsync { .. } => (
"LastAsync",
0,
0,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
"".to_string(),
),
Insn::IsNull { src, target_pc } => (
"IsNull",
*src as i32,
@@ -911,6 +920,33 @@ pub fn insn_to_str(
0,
where_clause.clone(),
),
Insn::LastAwait { .. } => (
"LastAwait",
0,
0,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
"".to_string(),
),
Insn::PrevAsync { .. } => (
"PrevAsync",
0,
0,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
"".to_string(),
),
Insn::PrevAwait { .. } => (
"PrevAwait",
0,
0,
0,
OwnedValue::Text(Rc::new("".to_string())),
0,
"".to_string(),
),
};
format!(
"{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}",

View File

@@ -213,6 +213,15 @@ pub enum Insn {
pc_if_empty: BranchOffset,
},
LastAsync {
cursor_id: CursorID,
},
LastAwait {
cursor_id: CursorID,
pc_if_empty: BranchOffset,
},
// Read a column from the current row of the cursor.
Column {
cursor_id: CursorID,
@@ -244,6 +253,15 @@ pub enum Insn {
pc_if_next: BranchOffset,
},
PrevAsync {
cursor_id: CursorID,
},
PrevAwait {
cursor_id: CursorID,
pc_if_next: BranchOffset,
},
// Halt the program.
Halt {
err_code: usize,
@@ -1093,6 +1111,29 @@ impl Program {
}
state.pc += 1;
}
Insn::LastAsync { cursor_id } => {
let cursor = cursors.get_mut(cursor_id).unwrap();
match cursor.last()? {
CursorResult::Ok(()) => {}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
state.pc += 1;
}
Insn::LastAwait {
cursor_id,
pc_if_empty,
} => {
let cursor = cursors.get_mut(cursor_id).unwrap();
cursor.wait_for_completion()?;
if cursor.is_empty() {
state.pc = *pc_if_empty;
} else {
state.pc += 1;
}
}
Insn::RewindAwait {
cursor_id,
pc_if_empty,
@@ -1162,6 +1203,31 @@ impl Program {
}
state.pc += 1;
}
Insn::PrevAsync { cursor_id } => {
let cursor = cursors.get_mut(cursor_id).unwrap();
cursor.set_null_flag(false);
match cursor.prev()? {
CursorResult::Ok(_) => {}
CursorResult::IO => {
// If there is I/O, the instruction is restarted.
return Ok(StepResult::IO);
}
}
state.pc += 1;
}
Insn::PrevAwait {
cursor_id,
pc_if_next,
} => {
assert!(*pc_if_next >= 0);
let cursor = cursors.get_mut(cursor_id).unwrap();
cursor.wait_for_completion()?;
if !cursor.is_empty() {
state.pc = *pc_if_next;
} else {
state.pc += 1;
}
}
Insn::NextAwait {
cursor_id,
pc_if_next,
@@ -2313,6 +2379,7 @@ fn get_indent_count(indent_count: usize, curr_insn: &Insn, prev_insn: Option<&In
let indent_count = if let Some(insn) = prev_insn {
match insn {
Insn::RewindAwait { .. }
| Insn::LastAwait { .. }
| Insn::SorterSort { .. }
| Insn::SeekGE { .. }
| Insn::SeekGT { .. } => indent_count + 1,
@@ -2323,7 +2390,9 @@ fn get_indent_count(indent_count: usize, curr_insn: &Insn, prev_insn: Option<&In
};
match curr_insn {
Insn::NextAsync { .. } | Insn::SorterNext { .. } => indent_count - 1,
Insn::NextAsync { .. } | Insn::SorterNext { .. } | Insn::PrevAsync { .. } => {
indent_count - 1
}
_ => indent_count,
}
}
@@ -3089,6 +3158,14 @@ mod tests {
fn btree_create(&mut self, _flags: usize) -> u32 {
unimplemented!()
}
// Not implemented for this test cursor; panics via `todo!()` if called.
fn last(&mut self) -> Result<CursorResult<()>> {
todo!()
}
// Not implemented for this test cursor; panics via `todo!()` if called.
fn prev(&mut self) -> Result<CursorResult<()>> {
todo!()
}
}
#[test]

View File

@@ -3,6 +3,7 @@ use crate::{
Result,
};
use std::cell::{Ref, RefCell};
use std::cmp::Ordering;
pub struct Sorter {
records: Vec<OwnedRecord>,
@@ -27,11 +28,27 @@ impl Cursor for Sorter {
// We do the sorting here since this is what is called by the SorterSort instruction
fn rewind(&mut self) -> Result<CursorResult<()>> {
let key_fields = self.order.len();
self.records
.sort_by_cached_key(|record| OwnedRecord::new(record.values[0..key_fields].to_vec()));
self.records.reverse();
self.records.sort_by(|a, b| {
let cmp_by_idx = |idx: usize, ascending: bool| {
let a = &a.values[idx];
let b = &b.values[idx];
if ascending {
a.cmp(b)
} else {
b.cmp(a)
}
};
let mut cmp_ret = Ordering::Equal;
for (idx, &is_asc) in self.order.iter().enumerate() {
cmp_ret = cmp_by_idx(idx, is_asc);
if cmp_ret != Ordering::Equal {
break;
}
}
cmp_ret
});
self.records.reverse();
self.next()
}
@@ -91,4 +108,12 @@ impl Cursor for Sorter {
fn btree_create(&mut self, _flags: usize) -> u32 {
unreachable!("Why did you try to build a new tree with a sorter??? Stand up, open the door and take a walk for 30 min to come back with a better plan.");
}
// Positioning on the last record is not implemented for the sorter yet;
// calling it panics via `todo!()`.
fn last(&mut self) -> Result<CursorResult<()>> {
todo!()
}
// Stepping backwards is not implemented for the sorter yet;
// calling it panics via `todo!()`.
fn prev(&mut self) -> Result<CursorResult<()>> {
todo!()
}
}

View File

@@ -17,6 +17,20 @@ do_execsql_test basic-order-by {
2|cap|82.0
8|sneakers|82.0}
do_execsql_test basic-order-by {
select * from products order by price desc;
} {2|cap|82.0
8|sneakers|82.0
11|accessories|81.0
1|hat|79.0
7|jeans|78.0
5|sweatshirt|74.0
6|shorts|70.0
10|coat|33.0
4|sweater|25.0
3|shirt|18.0
9|boots|1.0}
do_execsql_test basic-order-by-and-limit {
select * from products order by name limit 5;
} {11|accessories|81.0
@@ -25,6 +39,14 @@ do_execsql_test basic-order-by-and-limit {
10|coat|33.0
1|hat|79.0}
do_execsql_test basic-order-by-and-limit {
select * from products order by name desc limit 5;
} {5|sweatshirt|74.0
4|sweater|25.0
8|sneakers|82.0
6|shorts|70.0
3|shirt|18.0}
do_execsql_test basic-order-by-and-limit-2 {
select id, name from products order by name limit 5;
} {11|accessories
@@ -33,6 +55,14 @@ do_execsql_test basic-order-by-and-limit-2 {
10|coat
1|hat}
do_execsql_test basic-order-by-and-limit-2 {
select id, name from products order by name desc limit 5;
} {5|sweatshirt
4|sweater
8|sneakers
6|shorts
3|shirt}
do_execsql_test basic-order-by-and-limit-3 {
select price, name from products where price > 70 order by name;
} {81.0|accessories
@@ -42,11 +72,47 @@ do_execsql_test basic-order-by-and-limit-3 {
82.0|sneakers
74.0|sweatshirt}
do_execsql_test basic-order-by-and-limit-3 {
select price, name from products where price > 70 order by name desc;
} {74.0|sweatshirt
82.0|sneakers
78.0|jeans
79.0|hat
82.0|cap
81.0|accessories}
do_execsql_test order-by-qualified {
select u.first_name from users u order by u.first_name limit 1;
} {Aaron}
do_execsql_test order-by-qualified {
select u.first_name from users u order by u.first_name desc limit 1;
} {Zoe}
do_execsql_test order-by-column-number {
select first_name, last_name, age from users order by 3,2 limit 2;
} {Teresa|Allen|1
David|Baker|1}
do_execsql_test order-by-column-number {
select first_name, last_name, age from users order by 3 desc, 2 asc limit 2;
} {Connor|Adkins|100
John|Bell|100}
do_execsql_test order-by-column-number {
select first_name, last_name, age from users order by 3 asc, 2 desc limit 2;
} {Kyle|Wolf|1
Jason|Williams|1}
do_execsql_test order-by-column-number {
select first_name, last_name, age from users order by 3 asc, 2 desc limit 10;
} {Kyle|Wolf|1
Jason|Williams|1
Tracey|Williams|1
Jessica|Werner|1
Jasmine|Warren|1
Dennis|Ward|1
Whitney|Walker|1
Robert|Villanueva|1
Cynthia|Thomas|1
Brandon|Tate|1}