mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-18 14:35:14 +01:00
Merge 'Refactor compound select' from meteorgan
Implement recursive processing for compound SELECT statements by dividing them into two components: 1. left: a vector containing a SELECT core and its associated compound operator 2. right_most: the rightmost SELECT core in the compound statement Based on the compound operator, we can determine how to process the rightmost part and recursively handle the left component. Following this refactor, we can seamlessly integrate support for `INTERSECT` and `EXCEPT` operations. Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com> Closes #1718
This commit is contained in:
310
core/translate/compound_select.rs
Normal file
310
core/translate/compound_select.rs
Normal file
@@ -0,0 +1,310 @@
|
||||
use crate::schema::{Index, IndexColumn, Schema};
|
||||
use crate::translate::emitter::{emit_query, LimitCtx, TransactionMode, TranslateCtx};
|
||||
use crate::translate::plan::{Plan, QueryDestination, SelectPlan};
|
||||
use crate::vdbe::builder::{CursorType, ProgramBuilder};
|
||||
use crate::vdbe::insn::Insn;
|
||||
use crate::vdbe::BranchOffset;
|
||||
use crate::SymbolTable;
|
||||
use limbo_sqlite3_parser::ast::{CompoundOperator, SortOrder};
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
|
||||
use tracing::Level;
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn emit_program_for_compound_select(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
schema: &Schema,
|
||||
syms: &SymbolTable,
|
||||
) -> crate::Result<()> {
|
||||
let Plan::CompoundSelect {
|
||||
left: _left,
|
||||
right_most,
|
||||
limit,
|
||||
..
|
||||
} = &plan
|
||||
else {
|
||||
crate::bail_parse_error!("expected compound select plan");
|
||||
};
|
||||
|
||||
let right_plan = right_most.clone();
|
||||
// Trivial exit on LIMIT 0
|
||||
if let Some(limit) = limit {
|
||||
if *limit == 0 {
|
||||
program.epilogue(TransactionMode::Read);
|
||||
program.result_columns = right_plan.result_columns;
|
||||
program.table_references.extend(right_plan.table_references);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Each subselect shares the same limit_ctx, because the LIMIT applies to the entire compound select,
|
||||
// not just a single subselect.
|
||||
let limit_ctx = limit.map(|limit| {
|
||||
let reg = program.alloc_register();
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: limit as i64,
|
||||
dest: reg,
|
||||
});
|
||||
LimitCtx::new_shared(reg)
|
||||
});
|
||||
|
||||
// When a compound SELECT is part of a query that yields results to a coroutine (e.g. within an INSERT clause),
|
||||
// we must allocate registers for the result columns to be yielded. Each subselect will then yield to
|
||||
// the coroutine using the same set of registers.
|
||||
let (yield_reg, reg_result_cols_start) = match right_most.query_destination {
|
||||
QueryDestination::CoroutineYield { yield_reg, .. } => {
|
||||
let start_reg = program.alloc_registers(right_most.result_columns.len());
|
||||
(Some(yield_reg), Some(start_reg))
|
||||
}
|
||||
_ => (None, None),
|
||||
};
|
||||
|
||||
emit_compound_select(
|
||||
program,
|
||||
plan,
|
||||
schema,
|
||||
syms,
|
||||
limit_ctx,
|
||||
yield_reg,
|
||||
reg_result_cols_start,
|
||||
)?;
|
||||
|
||||
program.epilogue(TransactionMode::Read);
|
||||
program.result_columns = right_plan.result_columns;
|
||||
program.table_references.extend(right_plan.table_references);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Emits bytecode for a compound SELECT statement. This function processes the rightmost part of
|
||||
// the compound SELECT and handles the left parts recursively based on the compound operator type.
|
||||
fn emit_compound_select(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
schema: &Schema,
|
||||
syms: &SymbolTable,
|
||||
limit_ctx: Option<LimitCtx>,
|
||||
yield_reg: Option<usize>,
|
||||
reg_result_cols_start: Option<usize>,
|
||||
) -> crate::Result<()> {
|
||||
let Plan::CompoundSelect {
|
||||
mut left,
|
||||
mut right_most,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
} = plan
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
let mut right_most_ctx = TranslateCtx::new(
|
||||
program,
|
||||
schema,
|
||||
syms,
|
||||
right_most.table_references.joined_tables().len(),
|
||||
right_most.result_columns.len(),
|
||||
);
|
||||
right_most_ctx.reg_result_cols_start = reg_result_cols_start;
|
||||
match left.pop() {
|
||||
Some((mut plan, operator)) => match operator {
|
||||
CompoundOperator::UnionAll => {
|
||||
if matches!(
|
||||
right_most.query_destination,
|
||||
QueryDestination::EphemeralIndex { .. }
|
||||
) {
|
||||
plan.query_destination = right_most.query_destination.clone();
|
||||
}
|
||||
let compound_select = Plan::CompoundSelect {
|
||||
left,
|
||||
right_most: plan,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
};
|
||||
emit_compound_select(
|
||||
program,
|
||||
compound_select,
|
||||
schema,
|
||||
syms,
|
||||
limit_ctx,
|
||||
yield_reg,
|
||||
reg_result_cols_start,
|
||||
)?;
|
||||
|
||||
let label_next_select = program.allocate_label();
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
program.emit_insn(Insn::IfNot {
|
||||
reg: limit_ctx.reg_limit,
|
||||
target_pc: label_next_select,
|
||||
jump_if_null: true,
|
||||
});
|
||||
right_most.limit = limit;
|
||||
right_most_ctx.limit_ctx = Some(limit_ctx);
|
||||
}
|
||||
emit_query(program, &mut right_most, &mut right_most_ctx)?;
|
||||
program.preassign_label_to_next_insn(label_next_select);
|
||||
}
|
||||
CompoundOperator::Union => {
|
||||
let mut new_dedupe_index = false;
|
||||
let dedupe_index = match right_most.query_destination {
|
||||
QueryDestination::EphemeralIndex { cursor_id, index } => {
|
||||
(cursor_id, index.clone())
|
||||
}
|
||||
_ => {
|
||||
new_dedupe_index = true;
|
||||
create_union_dedupe_index(program, &right_most)
|
||||
}
|
||||
};
|
||||
plan.query_destination = QueryDestination::EphemeralIndex {
|
||||
cursor_id: dedupe_index.0,
|
||||
index: dedupe_index.1.clone(),
|
||||
};
|
||||
let compound_select = Plan::CompoundSelect {
|
||||
left,
|
||||
right_most: plan,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
};
|
||||
emit_compound_select(
|
||||
program,
|
||||
compound_select,
|
||||
schema,
|
||||
syms,
|
||||
None,
|
||||
yield_reg,
|
||||
reg_result_cols_start,
|
||||
)?;
|
||||
|
||||
right_most.query_destination = QueryDestination::EphemeralIndex {
|
||||
cursor_id: dedupe_index.0,
|
||||
index: dedupe_index.1.clone(),
|
||||
};
|
||||
emit_query(program, &mut right_most, &mut right_most_ctx)?;
|
||||
|
||||
if new_dedupe_index {
|
||||
let label_jump_over_dedupe = program.allocate_label();
|
||||
read_deduplicated_union_rows(
|
||||
program,
|
||||
dedupe_index.0,
|
||||
dedupe_index.1.as_ref(),
|
||||
limit_ctx,
|
||||
label_jump_over_dedupe,
|
||||
yield_reg,
|
||||
);
|
||||
program.preassign_label_to_next_insn(label_jump_over_dedupe);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
crate::bail_parse_error!("unimplemented compound select operator: {:?}", operator);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
right_most_ctx.limit_ctx = Some(limit_ctx);
|
||||
right_most.limit = limit;
|
||||
}
|
||||
emit_query(program, &mut right_most, &mut right_most_ctx)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates an ephemeral index that will be used to deduplicate the results of any sub-selects
|
||||
/// that appear before the last UNION operator.
|
||||
fn create_union_dedupe_index(
|
||||
program: &mut ProgramBuilder,
|
||||
first_select_in_compound: &SelectPlan,
|
||||
) -> (usize, Arc<Index>) {
|
||||
let dedupe_index = Arc::new(Index {
|
||||
columns: first_select_in_compound
|
||||
.result_columns
|
||||
.iter()
|
||||
.map(|c| IndexColumn {
|
||||
name: c
|
||||
.name(&first_select_in_compound.table_references)
|
||||
.map(|n| n.to_string())
|
||||
.unwrap_or_default(),
|
||||
order: SortOrder::Asc,
|
||||
pos_in_table: 0,
|
||||
default: None,
|
||||
collation: None, // FIXME: this should be inferred
|
||||
})
|
||||
.collect(),
|
||||
name: "union_dedupe".to_string(),
|
||||
root_page: 0,
|
||||
ephemeral: true,
|
||||
table_name: String::new(),
|
||||
unique: true,
|
||||
has_rowid: false,
|
||||
});
|
||||
let cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(dedupe_index.clone()));
|
||||
program.emit_insn(Insn::OpenEphemeral {
|
||||
cursor_id,
|
||||
is_table: false,
|
||||
});
|
||||
(cursor_id, dedupe_index.clone())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for reading deduplicated rows from the ephemeral index created for UNION operators.
|
||||
fn read_deduplicated_union_rows(
|
||||
program: &mut ProgramBuilder,
|
||||
dedupe_cursor_id: usize,
|
||||
dedupe_index: &Index,
|
||||
limit_ctx: Option<LimitCtx>,
|
||||
label_limit_reached: BranchOffset,
|
||||
yield_reg: Option<usize>,
|
||||
) {
|
||||
let label_dedupe_next = program.allocate_label();
|
||||
let label_dedupe_loop_start = program.allocate_label();
|
||||
let dedupe_cols_start_reg = program.alloc_registers(dedupe_index.columns.len());
|
||||
program.emit_insn(Insn::Rewind {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
pc_if_empty: label_dedupe_next,
|
||||
});
|
||||
program.preassign_label_to_next_insn(label_dedupe_loop_start);
|
||||
for col_idx in 0..dedupe_index.columns.len() {
|
||||
let start_reg = if let Some(yield_reg) = yield_reg {
|
||||
// Need to reuse the yield_reg for the column being emitted
|
||||
yield_reg + 1
|
||||
} else {
|
||||
dedupe_cols_start_reg
|
||||
};
|
||||
program.emit_insn(Insn::Column {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
column: col_idx,
|
||||
dest: start_reg + col_idx,
|
||||
default: None,
|
||||
});
|
||||
}
|
||||
if let Some(yield_reg) = yield_reg {
|
||||
program.emit_insn(Insn::Yield {
|
||||
yield_reg,
|
||||
end_offset: BranchOffset::Offset(0),
|
||||
});
|
||||
} else {
|
||||
program.emit_insn(Insn::ResultRow {
|
||||
start_reg: dedupe_cols_start_reg,
|
||||
count: dedupe_index.columns.len(),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
program.emit_insn(Insn::DecrJumpZero {
|
||||
reg: limit_ctx.reg_limit,
|
||||
target_pc: label_limit_reached,
|
||||
})
|
||||
}
|
||||
program.preassign_label_to_next_insn(label_dedupe_next);
|
||||
program.emit_insn(Insn::Next {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
pc_if_next: label_dedupe_loop_start,
|
||||
});
|
||||
program.emit_insn(Insn::Close {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
});
|
||||
}
|
||||
@@ -30,17 +30,17 @@ impl Display for Plan {
|
||||
match self {
|
||||
Self::Select(select_plan) => select_plan.fmt(f),
|
||||
Self::CompoundSelect {
|
||||
first,
|
||||
rest,
|
||||
left,
|
||||
right_most,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
} => {
|
||||
first.fmt(f)?;
|
||||
for (plan, operator) in rest {
|
||||
writeln!(f, "{}", operator)?;
|
||||
for (plan, operator) in left {
|
||||
plan.fmt(f)?;
|
||||
writeln!(f, "{}", operator)?;
|
||||
}
|
||||
right_most.fmt(f)?;
|
||||
if let Some(limit) = limit {
|
||||
writeln!(f, "LIMIT: {}", limit)?;
|
||||
}
|
||||
@@ -268,28 +268,24 @@ impl ToSqlString for Plan {
|
||||
match self {
|
||||
Self::Select(select) => select.to_sql_string(&PlanContext(&[&select.table_references])),
|
||||
Self::CompoundSelect {
|
||||
first,
|
||||
rest,
|
||||
left,
|
||||
right_most,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
} => {
|
||||
let all_refs = std::iter::once(&first.table_references)
|
||||
.chain(
|
||||
rest.iter()
|
||||
.flat_map(|(plan, _)| std::iter::once(&plan.table_references)),
|
||||
)
|
||||
let all_refs = left
|
||||
.iter()
|
||||
.flat_map(|(plan, _)| std::iter::once(&plan.table_references))
|
||||
.chain(std::iter::once(&right_most.table_references))
|
||||
.collect::<Vec<_>>();
|
||||
let context = &PlanContext(all_refs.as_slice());
|
||||
|
||||
let mut ret = vec![first.to_sql_string(context)];
|
||||
for (other_plan, operator) in rest {
|
||||
ret.push(format!(
|
||||
"{} {}",
|
||||
operator,
|
||||
other_plan.to_sql_string(context),
|
||||
));
|
||||
let mut ret = Vec::new();
|
||||
for (plan, operator) in left {
|
||||
ret.push(format!("{} {}", plan.to_sql_string(context), operator));
|
||||
}
|
||||
ret.push(right_most.to_sql_string(context));
|
||||
if let Some(order_by) = &order_by {
|
||||
ret.push(format!(
|
||||
"ORDER BY {}",
|
||||
|
||||
@@ -2,9 +2,8 @@
|
||||
// It handles translating high-level SQL operations into low-level bytecode that can be executed by the virtual machine.
|
||||
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use limbo_sqlite3_parser::ast::{self, SortOrder};
|
||||
use limbo_sqlite3_parser::ast::{self};
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
use super::aggregation::emit_ungrouped_aggregation;
|
||||
@@ -16,14 +15,13 @@ use super::main_loop::{
|
||||
close_loop, emit_loop, init_distinct, init_loop, open_loop, LeftJoinMetadata, LoopLabels,
|
||||
};
|
||||
use super::order_by::{emit_order_by, init_order_by, SortMetadata};
|
||||
use super::plan::{
|
||||
JoinOrderMember, Operation, QueryDestination, SelectPlan, TableReferences, UpdatePlan,
|
||||
};
|
||||
use super::plan::{JoinOrderMember, Operation, SelectPlan, TableReferences, UpdatePlan};
|
||||
use super::select::emit_simple_count;
|
||||
use super::subquery::emit_subqueries;
|
||||
use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
|
||||
use crate::function::Func;
|
||||
use crate::schema::{Index, IndexColumn, Schema};
|
||||
use crate::schema::Schema;
|
||||
use crate::translate::compound_select::emit_program_for_compound_select;
|
||||
use crate::translate::plan::{DeletePlan, Plan, Search};
|
||||
use crate::translate::values::emit_values;
|
||||
use crate::util::exprs_are_equivalent;
|
||||
@@ -193,278 +191,6 @@ pub fn emit_program(
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_compound_select(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
schema: &Schema,
|
||||
syms: &SymbolTable,
|
||||
) -> Result<()> {
|
||||
let Plan::CompoundSelect {
|
||||
mut first,
|
||||
mut rest,
|
||||
limit,
|
||||
..
|
||||
} = plan
|
||||
else {
|
||||
crate::bail_parse_error!("expected compound select plan");
|
||||
};
|
||||
|
||||
// Trivial exit on LIMIT 0
|
||||
if let Some(limit) = limit {
|
||||
if limit == 0 {
|
||||
program.epilogue(TransactionMode::Read);
|
||||
program.result_columns = first.result_columns;
|
||||
program.table_references.extend(first.table_references);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Each subselect gets their own TranslateCtx, but they share the same limit_ctx
|
||||
// because the LIMIT applies to the entire compound select, not just a single subselect.
|
||||
// The way LIMIT works with compound selects is:
|
||||
// - If a given subselect appears BEFORE any UNION, then do NOT count those rows towards the LIMIT,
|
||||
// because the rows from those subselects need to be deduplicated before they start being counted.
|
||||
// - If a given subselect appears AFTER the last UNION, then count those rows towards the LIMIT immediately.
|
||||
let limit_ctx = limit.map(|limit| {
|
||||
let reg = program.alloc_register();
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: limit as i64,
|
||||
dest: reg,
|
||||
});
|
||||
LimitCtx::new_shared(reg)
|
||||
});
|
||||
|
||||
// Each subselect gets their own TranslateCtx.
|
||||
let mut t_ctx_list = Vec::with_capacity(rest.len() + 1);
|
||||
t_ctx_list.push(TranslateCtx::new(
|
||||
program,
|
||||
schema,
|
||||
syms,
|
||||
first.table_references.joined_tables().len(),
|
||||
first.result_columns.len(),
|
||||
));
|
||||
rest.iter().for_each(|(select, _)| {
|
||||
let t_ctx = TranslateCtx::new(
|
||||
program,
|
||||
schema,
|
||||
syms,
|
||||
select.table_references.joined_tables().len(),
|
||||
select.result_columns.len(),
|
||||
);
|
||||
t_ctx_list.push(t_ctx);
|
||||
});
|
||||
|
||||
// Compound select operators have the same precedence and are left-associative.
|
||||
// If there is any remaining UNION operator on the right side of a given sub-SELECT,
|
||||
// all of the rows from the preceding UNION arms need to be deduplicated.
|
||||
// This is done by creating an ephemeral index and inserting all the rows from the left side of
|
||||
// the last UNION arm into it.
|
||||
// Then, as soon as there are no more UNION operators left, all the deduplicated rows from the
|
||||
// ephemeral index are emitted, and lastly the rows from the remaining sub-SELECTS are emitted
|
||||
// as is, as they don't require deduplication.
|
||||
let mut first_t_ctx = t_ctx_list.remove(0);
|
||||
let requires_union_deduplication = rest
|
||||
.iter()
|
||||
.any(|(_, operator)| operator == &ast::CompoundOperator::Union);
|
||||
if requires_union_deduplication {
|
||||
// appears BEFORE a UNION operator, so do not count those rows towards the LIMIT.
|
||||
first.limit = None;
|
||||
} else {
|
||||
// appears AFTER the last UNION operator, so count those rows towards the LIMIT.
|
||||
first_t_ctx.limit_ctx = limit_ctx;
|
||||
}
|
||||
|
||||
let mut registers_subqery = None;
|
||||
let yield_reg = match first.query_destination {
|
||||
QueryDestination::CoroutineYield { yield_reg, .. } => {
|
||||
registers_subqery = Some(program.alloc_registers(first.result_columns.len()));
|
||||
first_t_ctx.reg_result_cols_start = registers_subqery.clone();
|
||||
Some(yield_reg)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let mut union_dedupe_index = if requires_union_deduplication {
|
||||
let dedupe_index = get_union_dedupe_index(program, &first);
|
||||
first.query_destination = QueryDestination::EphemeralIndex {
|
||||
cursor_id: dedupe_index.0,
|
||||
index: dedupe_index.1.clone(),
|
||||
};
|
||||
Some(dedupe_index)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Emit the first SELECT
|
||||
emit_query(program, &mut first, &mut first_t_ctx)?;
|
||||
|
||||
// Emit the remaining SELECTs. Any selects on the left side of a UNION must deduplicate their
|
||||
// results with the ephemeral index created above.
|
||||
while !t_ctx_list.is_empty() {
|
||||
let label_next_select = program.allocate_label();
|
||||
// If the LIMIT is reached in any subselect, jump to either:
|
||||
// a) the IfNot of the next subselect, or
|
||||
// b) the end of the program
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
program.emit_insn(Insn::IfNot {
|
||||
reg: limit_ctx.reg_limit,
|
||||
target_pc: label_next_select,
|
||||
jump_if_null: true,
|
||||
});
|
||||
}
|
||||
let mut t_ctx = t_ctx_list.remove(0);
|
||||
let requires_union_deduplication = rest
|
||||
.iter()
|
||||
.any(|(_, operator)| operator == &ast::CompoundOperator::Union);
|
||||
let (mut select, operator) = rest.remove(0);
|
||||
if operator != ast::CompoundOperator::UnionAll && operator != ast::CompoundOperator::Union {
|
||||
crate::bail_parse_error!("unimplemented compound select operator: {:?}", operator);
|
||||
}
|
||||
|
||||
if requires_union_deduplication {
|
||||
// Again: appears BEFORE a UNION operator, so do not count those rows towards the LIMIT.
|
||||
select.limit = None;
|
||||
} else {
|
||||
// appears AFTER the last UNION operator, so count those rows towards the LIMIT.
|
||||
t_ctx.limit_ctx = limit_ctx;
|
||||
}
|
||||
|
||||
if requires_union_deduplication {
|
||||
select.query_destination = QueryDestination::EphemeralIndex {
|
||||
cursor_id: union_dedupe_index.as_ref().unwrap().0,
|
||||
index: union_dedupe_index.as_ref().unwrap().1.clone(),
|
||||
};
|
||||
} else if let Some((dedupe_cursor_id, dedupe_index)) = union_dedupe_index.take() {
|
||||
// When there are no more UNION operators left, all the deduplicated rows from the preceding union arms need to be emitted
|
||||
// as result rows.
|
||||
read_deduplicated_union_rows(
|
||||
program,
|
||||
dedupe_cursor_id,
|
||||
dedupe_index.as_ref(),
|
||||
limit_ctx,
|
||||
label_next_select,
|
||||
yield_reg.clone(),
|
||||
);
|
||||
}
|
||||
if matches!(
|
||||
select.query_destination,
|
||||
crate::translate::plan::QueryDestination::CoroutineYield { .. }
|
||||
) {
|
||||
// Need to reuse the same registers when you are yielding
|
||||
t_ctx.reg_result_cols_start = registers_subqery.clone();
|
||||
}
|
||||
emit_query(program, &mut select, &mut t_ctx)?;
|
||||
program.preassign_label_to_next_insn(label_next_select);
|
||||
}
|
||||
|
||||
if let Some((dedupe_cursor_id, dedupe_index)) = union_dedupe_index {
|
||||
let label_jump_over_dedupe = program.allocate_label();
|
||||
read_deduplicated_union_rows(
|
||||
program,
|
||||
dedupe_cursor_id,
|
||||
dedupe_index.as_ref(),
|
||||
limit_ctx,
|
||||
label_jump_over_dedupe,
|
||||
yield_reg,
|
||||
);
|
||||
program.preassign_label_to_next_insn(label_jump_over_dedupe);
|
||||
}
|
||||
|
||||
program.epilogue(TransactionMode::Read);
|
||||
program.result_columns = first.result_columns;
|
||||
program.table_references.extend(first.table_references);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates an ephemeral index that will be used to deduplicate the results of any sub-selects
|
||||
/// that appear before the last UNION operator.
|
||||
fn get_union_dedupe_index(
|
||||
program: &mut ProgramBuilder,
|
||||
first_select_in_compound: &SelectPlan,
|
||||
) -> (usize, Arc<Index>) {
|
||||
let dedupe_index = Arc::new(Index {
|
||||
columns: first_select_in_compound
|
||||
.result_columns
|
||||
.iter()
|
||||
.map(|c| IndexColumn {
|
||||
name: c
|
||||
.name(&first_select_in_compound.table_references)
|
||||
.map(|n| n.to_string())
|
||||
.unwrap_or_default(),
|
||||
order: SortOrder::Asc,
|
||||
pos_in_table: 0,
|
||||
collation: None, // FIXME: this should be inferred
|
||||
default: None,
|
||||
})
|
||||
.collect(),
|
||||
name: "union_dedupe".to_string(),
|
||||
root_page: 0,
|
||||
ephemeral: true,
|
||||
table_name: String::new(),
|
||||
unique: true,
|
||||
has_rowid: false,
|
||||
});
|
||||
let cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(dedupe_index.clone()));
|
||||
program.emit_insn(Insn::OpenEphemeral {
|
||||
cursor_id,
|
||||
is_table: false,
|
||||
});
|
||||
(cursor_id, dedupe_index.clone())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for reading deduplicated rows from the ephemeral index created for UNION operators.
|
||||
fn read_deduplicated_union_rows(
|
||||
program: &mut ProgramBuilder,
|
||||
dedupe_cursor_id: usize,
|
||||
dedupe_index: &Index,
|
||||
limit_ctx: Option<LimitCtx>,
|
||||
label_limit_reached: BranchOffset,
|
||||
yield_reg: Option<usize>,
|
||||
) {
|
||||
let label_dedupe_next = program.allocate_label();
|
||||
let label_dedupe_loop_start = program.allocate_label();
|
||||
let dedupe_cols_start_reg = program.alloc_registers(dedupe_index.columns.len());
|
||||
program.emit_insn(Insn::Rewind {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
pc_if_empty: label_dedupe_next,
|
||||
});
|
||||
program.preassign_label_to_next_insn(label_dedupe_loop_start);
|
||||
for col_idx in 0..dedupe_index.columns.len() {
|
||||
let start_reg = if let Some(yield_reg) = yield_reg {
|
||||
// Need to reuse the yield_reg for the column being emitted
|
||||
yield_reg + 1
|
||||
} else {
|
||||
dedupe_cols_start_reg
|
||||
};
|
||||
program.emit_column(dedupe_cursor_id, col_idx, start_reg + col_idx);
|
||||
}
|
||||
if let Some(yield_reg) = yield_reg {
|
||||
program.emit_insn(Insn::Yield {
|
||||
yield_reg,
|
||||
end_offset: BranchOffset::Offset(0),
|
||||
});
|
||||
} else {
|
||||
program.emit_insn(Insn::ResultRow {
|
||||
start_reg: dedupe_cols_start_reg,
|
||||
count: dedupe_index.columns.len(),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
program.emit_insn(Insn::DecrJumpZero {
|
||||
reg: limit_ctx.reg_limit,
|
||||
target_pc: label_limit_reached,
|
||||
})
|
||||
}
|
||||
program.preassign_label_to_next_insn(label_dedupe_next);
|
||||
program.emit_insn(Insn::Next {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
pc_if_next: label_dedupe_loop_start,
|
||||
});
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_select(
|
||||
program: &mut ProgramBuilder,
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
pub(crate) mod aggregation;
|
||||
pub(crate) mod alter;
|
||||
pub(crate) mod collate;
|
||||
mod compound_select;
|
||||
pub(crate) mod delete;
|
||||
pub(crate) mod display;
|
||||
pub(crate) mod emitter;
|
||||
|
||||
@@ -41,9 +41,11 @@ pub fn optimize_plan(plan: &mut Plan, schema: &Schema) -> Result<()> {
|
||||
Plan::Select(plan) => optimize_select_plan(plan, schema)?,
|
||||
Plan::Delete(plan) => optimize_delete_plan(plan, schema)?,
|
||||
Plan::Update(plan) => optimize_update_plan(plan, schema)?,
|
||||
Plan::CompoundSelect { first, rest, .. } => {
|
||||
optimize_select_plan(first, schema)?;
|
||||
for (plan, _) in rest {
|
||||
Plan::CompoundSelect {
|
||||
left, right_most, ..
|
||||
} => {
|
||||
optimize_select_plan(right_most, schema)?;
|
||||
for (plan, _) in left {
|
||||
optimize_select_plan(plan, schema)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,8 +292,8 @@ impl Ord for EvalAt {
|
||||
pub enum Plan {
|
||||
Select(SelectPlan),
|
||||
CompoundSelect {
|
||||
first: SelectPlan,
|
||||
rest: Vec<(SelectPlan, ast::CompoundOperator)>,
|
||||
left: Vec<(SelectPlan, ast::CompoundOperator)>,
|
||||
right_most: SelectPlan,
|
||||
limit: Option<isize>,
|
||||
offset: Option<isize>,
|
||||
order_by: Option<Vec<(ast::Expr, SortOrder)>>,
|
||||
|
||||
@@ -52,24 +52,26 @@ pub fn translate_select(
|
||||
approx_num_labels: estimate_num_labels(select),
|
||||
}
|
||||
}
|
||||
Plan::CompoundSelect { first, rest, .. } => {
|
||||
Plan::CompoundSelect {
|
||||
left, right_most, ..
|
||||
} => {
|
||||
// Compound Selects must return the same number of columns
|
||||
num_result_cols = first.result_columns.len();
|
||||
num_result_cols = right_most.result_columns.len();
|
||||
|
||||
ProgramBuilderOpts {
|
||||
query_mode,
|
||||
num_cursors: count_plan_required_cursors(first)
|
||||
+ rest
|
||||
num_cursors: count_plan_required_cursors(right_most)
|
||||
+ left
|
||||
.iter()
|
||||
.map(|(plan, _)| count_plan_required_cursors(plan))
|
||||
.sum::<usize>(),
|
||||
approx_num_insns: estimate_num_instructions(first)
|
||||
+ rest
|
||||
approx_num_insns: estimate_num_instructions(right_most)
|
||||
+ left
|
||||
.iter()
|
||||
.map(|(plan, _)| estimate_num_instructions(plan))
|
||||
.sum::<usize>(),
|
||||
approx_num_labels: estimate_num_labels(first)
|
||||
+ rest
|
||||
approx_num_labels: estimate_num_labels(right_most)
|
||||
+ left
|
||||
.iter()
|
||||
.map(|(plan, _)| estimate_num_labels(plan))
|
||||
.sum::<usize>(),
|
||||
@@ -111,7 +113,7 @@ pub fn prepare_select_plan<'a>(
|
||||
)?))
|
||||
}
|
||||
Some(compounds) => {
|
||||
let mut first = prepare_one_select_plan(
|
||||
let mut last = prepare_one_select_plan(
|
||||
schema,
|
||||
*select.body.select,
|
||||
None,
|
||||
@@ -122,7 +124,8 @@ pub fn prepare_select_plan<'a>(
|
||||
table_ref_counter,
|
||||
query_destination.clone(),
|
||||
)?;
|
||||
let mut rest = Vec::with_capacity(compounds.len());
|
||||
|
||||
let mut left = Vec::with_capacity(compounds.len());
|
||||
for CompoundSelect { select, operator } in compounds {
|
||||
// TODO: add support for EXCEPT and INTERSECT
|
||||
if operator != ast::CompoundOperator::UnionAll
|
||||
@@ -132,7 +135,8 @@ pub fn prepare_select_plan<'a>(
|
||||
"only UNION ALL and UNION are supported for compound SELECTs"
|
||||
);
|
||||
}
|
||||
let plan = prepare_one_select_plan(
|
||||
left.push((last, operator));
|
||||
last = prepare_one_select_plan(
|
||||
schema,
|
||||
*select,
|
||||
None,
|
||||
@@ -143,22 +147,17 @@ pub fn prepare_select_plan<'a>(
|
||||
table_ref_counter,
|
||||
query_destination.clone(),
|
||||
)?;
|
||||
rest.push((plan, operator));
|
||||
}
|
||||
// Ensure all subplans have same number of result columns
|
||||
let first_num_result_columns = first.result_columns.len();
|
||||
for (plan, operator) in rest.iter() {
|
||||
if plan.result_columns.len() != first_num_result_columns {
|
||||
|
||||
// Ensure all subplans have the same number of result columns
|
||||
let right_most_num_result_columns = last.result_columns.len();
|
||||
for (plan, operator) in left.iter() {
|
||||
if plan.result_columns.len() != right_most_num_result_columns {
|
||||
crate::bail_parse_error!("SELECTs to the left and right of {} do not have the same number of result columns", operator);
|
||||
}
|
||||
}
|
||||
let (limit, offset) = select.limit.map_or(Ok((None, None)), |l| parse_limit(&l))?;
|
||||
|
||||
first.limit = limit.clone();
|
||||
for (plan, _) in rest.iter_mut() {
|
||||
plan.limit = limit.clone();
|
||||
}
|
||||
|
||||
// FIXME: handle OFFSET for compound selects
|
||||
if offset.map_or(false, |o| o > 0) {
|
||||
crate::bail_parse_error!("OFFSET is not supported for compound SELECTs yet");
|
||||
@@ -172,8 +171,8 @@ pub fn prepare_select_plan<'a>(
|
||||
crate::bail_parse_error!("WITH is not supported for compound SELECTs yet");
|
||||
}
|
||||
Ok(Plan::CompoundSelect {
|
||||
first,
|
||||
rest,
|
||||
left,
|
||||
right_most: last,
|
||||
limit,
|
||||
offset,
|
||||
order_by: None,
|
||||
|
||||
Reference in New Issue
Block a user