mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-22 16:35:30 +01:00
refactor compound select
This commit is contained in:
309
core/translate/compound_select.rs
Normal file
309
core/translate/compound_select.rs
Normal file
@@ -0,0 +1,309 @@
|
||||
use crate::schema::{Index, IndexColumn, Schema};
|
||||
use crate::translate::emitter::{emit_query, LimitCtx, TransactionMode, TranslateCtx};
|
||||
use crate::translate::plan::{Plan, QueryDestination, SelectPlan};
|
||||
use crate::vdbe::builder::{CursorType, ProgramBuilder};
|
||||
use crate::vdbe::insn::Insn;
|
||||
use crate::vdbe::BranchOffset;
|
||||
use crate::SymbolTable;
|
||||
use limbo_sqlite3_parser::ast::{CompoundOperator, SortOrder};
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
|
||||
use tracing::Level;
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
pub fn emit_program_for_compound_select(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
schema: &Schema,
|
||||
syms: &SymbolTable,
|
||||
) -> crate::Result<()> {
|
||||
let Plan::CompoundSelect {
|
||||
left: _left,
|
||||
right_most,
|
||||
limit,
|
||||
..
|
||||
} = &plan
|
||||
else {
|
||||
crate::bail_parse_error!("expected compound select plan");
|
||||
};
|
||||
|
||||
let right_plan = right_most.clone();
|
||||
// Trivial exit on LIMIT 0
|
||||
if let Some(limit) = limit {
|
||||
if *limit == 0 {
|
||||
program.epilogue(TransactionMode::Read);
|
||||
program.result_columns = right_plan.result_columns;
|
||||
program.table_references.extend(right_plan.table_references);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Each subselect gets their own TranslateCtx, but they share the same limit_ctx
|
||||
// because the LIMIT applies to the entire compound select, not just a single subselect.
|
||||
// The way LIMIT works with compound selects is:
|
||||
// - If a given subselect appears BEFORE any UNION, then do NOT count those rows towards the LIMIT,
|
||||
// because the rows from those subselects need to be deduplicated before they start being counted.
|
||||
// - If a given subselect appears AFTER the last UNION, then count those rows towards the LIMIT immediately.
|
||||
let limit_ctx = limit.map(|limit| {
|
||||
let reg = program.alloc_register();
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: limit as i64,
|
||||
dest: reg,
|
||||
});
|
||||
LimitCtx::new_shared(reg)
|
||||
});
|
||||
|
||||
let (yield_reg, reg_result_cols_start) = match right_most.query_destination {
|
||||
QueryDestination::CoroutineYield { yield_reg, .. } => {
|
||||
let start_reg = program.alloc_registers(right_most.result_columns.len());
|
||||
(Some(yield_reg), Some(start_reg))
|
||||
}
|
||||
_ => (None, None),
|
||||
};
|
||||
|
||||
emit_compound_select(
|
||||
program,
|
||||
plan,
|
||||
schema,
|
||||
syms,
|
||||
limit_ctx,
|
||||
yield_reg,
|
||||
reg_result_cols_start,
|
||||
)?;
|
||||
|
||||
program.epilogue(TransactionMode::Read);
|
||||
program.result_columns = right_plan.result_columns;
|
||||
program.table_references.extend(right_plan.table_references);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn emit_compound_select(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
schema: &Schema,
|
||||
syms: &SymbolTable,
|
||||
limit_ctx: Option<LimitCtx>,
|
||||
yield_reg: Option<usize>,
|
||||
reg_result_cols_start: Option<usize>,
|
||||
) -> crate::Result<()> {
|
||||
let Plan::CompoundSelect {
|
||||
mut left,
|
||||
mut right_most,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
} = plan
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
let mut right_most_ctx = TranslateCtx::new(
|
||||
program,
|
||||
schema,
|
||||
syms,
|
||||
right_most.table_references.joined_tables().len(),
|
||||
right_most.result_columns.len(),
|
||||
);
|
||||
right_most_ctx.reg_result_cols_start = reg_result_cols_start;
|
||||
match left.pop() {
|
||||
Some((mut plan, operator)) => match operator {
|
||||
CompoundOperator::UnionAll => {
|
||||
if matches!(
|
||||
right_most.query_destination,
|
||||
QueryDestination::EphemeralIndex { .. }
|
||||
) {
|
||||
plan.query_destination = right_most.query_destination.clone();
|
||||
}
|
||||
let compound_select = Plan::CompoundSelect {
|
||||
left,
|
||||
right_most: plan,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
};
|
||||
emit_compound_select(
|
||||
program,
|
||||
compound_select,
|
||||
schema,
|
||||
syms,
|
||||
limit_ctx,
|
||||
yield_reg,
|
||||
reg_result_cols_start,
|
||||
)?;
|
||||
|
||||
let label_next_select = program.allocate_label();
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
program.emit_insn(Insn::IfNot {
|
||||
reg: limit_ctx.reg_limit,
|
||||
target_pc: label_next_select,
|
||||
jump_if_null: true,
|
||||
});
|
||||
right_most.limit = limit;
|
||||
right_most_ctx.limit_ctx = Some(limit_ctx);
|
||||
}
|
||||
emit_query(program, &mut right_most, &mut right_most_ctx)?;
|
||||
program.preassign_label_to_next_insn(label_next_select);
|
||||
}
|
||||
CompoundOperator::Union => {
|
||||
let mut new_dedupe_index = false;
|
||||
let dedupe_index = match right_most.query_destination {
|
||||
QueryDestination::EphemeralIndex { cursor_id, index } => {
|
||||
(cursor_id, index.clone())
|
||||
}
|
||||
_ => {
|
||||
new_dedupe_index = true;
|
||||
create_union_dedupe_index(program, &right_most)
|
||||
}
|
||||
};
|
||||
plan.query_destination = QueryDestination::EphemeralIndex {
|
||||
cursor_id: dedupe_index.0,
|
||||
index: dedupe_index.1.clone(),
|
||||
};
|
||||
let compound_select = Plan::CompoundSelect {
|
||||
left,
|
||||
right_most: plan,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
};
|
||||
emit_compound_select(
|
||||
program,
|
||||
compound_select,
|
||||
schema,
|
||||
syms,
|
||||
None,
|
||||
yield_reg,
|
||||
reg_result_cols_start,
|
||||
)?;
|
||||
|
||||
right_most.query_destination = QueryDestination::EphemeralIndex {
|
||||
cursor_id: dedupe_index.0,
|
||||
index: dedupe_index.1.clone(),
|
||||
};
|
||||
emit_query(program, &mut right_most, &mut right_most_ctx)?;
|
||||
|
||||
if new_dedupe_index {
|
||||
let label_jump_over_dedupe = program.allocate_label();
|
||||
read_deduplicated_union_rows(
|
||||
program,
|
||||
dedupe_index.0,
|
||||
dedupe_index.1.as_ref(),
|
||||
limit_ctx,
|
||||
label_jump_over_dedupe,
|
||||
yield_reg,
|
||||
);
|
||||
program.preassign_label_to_next_insn(label_jump_over_dedupe);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
crate::bail_parse_error!("unimplemented compound select operator: {:?}", operator);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
right_most_ctx.limit_ctx = Some(limit_ctx);
|
||||
right_most.limit = limit;
|
||||
}
|
||||
emit_query(program, &mut right_most, &mut right_most_ctx)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates an ephemeral index that will be used to deduplicate the results of any sub-selects
|
||||
/// that appear before the last UNION operator.
|
||||
fn create_union_dedupe_index(
|
||||
program: &mut ProgramBuilder,
|
||||
first_select_in_compound: &SelectPlan,
|
||||
) -> (usize, Arc<Index>) {
|
||||
let dedupe_index = Arc::new(Index {
|
||||
columns: first_select_in_compound
|
||||
.result_columns
|
||||
.iter()
|
||||
.map(|c| IndexColumn {
|
||||
name: c
|
||||
.name(&first_select_in_compound.table_references)
|
||||
.map(|n| n.to_string())
|
||||
.unwrap_or_default(),
|
||||
order: SortOrder::Asc,
|
||||
pos_in_table: 0,
|
||||
default: None,
|
||||
collation: None, // FIXME: this should be inferred
|
||||
})
|
||||
.collect(),
|
||||
name: "union_dedupe".to_string(),
|
||||
root_page: 0,
|
||||
ephemeral: true,
|
||||
table_name: String::new(),
|
||||
unique: true,
|
||||
has_rowid: false,
|
||||
});
|
||||
let cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(dedupe_index.clone()));
|
||||
program.emit_insn(Insn::OpenEphemeral {
|
||||
cursor_id,
|
||||
is_table: false,
|
||||
});
|
||||
(cursor_id, dedupe_index.clone())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for reading deduplicated rows from the ephemeral index created for UNION operators.
|
||||
fn read_deduplicated_union_rows(
|
||||
program: &mut ProgramBuilder,
|
||||
dedupe_cursor_id: usize,
|
||||
dedupe_index: &Index,
|
||||
limit_ctx: Option<LimitCtx>,
|
||||
label_limit_reached: BranchOffset,
|
||||
yield_reg: Option<usize>,
|
||||
) {
|
||||
let label_dedupe_next = program.allocate_label();
|
||||
let label_dedupe_loop_start = program.allocate_label();
|
||||
let dedupe_cols_start_reg = program.alloc_registers(dedupe_index.columns.len());
|
||||
program.emit_insn(Insn::Rewind {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
pc_if_empty: label_dedupe_next,
|
||||
});
|
||||
program.preassign_label_to_next_insn(label_dedupe_loop_start);
|
||||
for col_idx in 0..dedupe_index.columns.len() {
|
||||
let start_reg = if let Some(yield_reg) = yield_reg {
|
||||
// Need to reuse the yield_reg for the column being emitted
|
||||
yield_reg + 1
|
||||
} else {
|
||||
dedupe_cols_start_reg
|
||||
};
|
||||
program.emit_insn(Insn::Column {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
column: col_idx,
|
||||
dest: start_reg + col_idx,
|
||||
default: None,
|
||||
});
|
||||
}
|
||||
if let Some(yield_reg) = yield_reg {
|
||||
program.emit_insn(Insn::Yield {
|
||||
yield_reg,
|
||||
end_offset: BranchOffset::Offset(0),
|
||||
});
|
||||
} else {
|
||||
program.emit_insn(Insn::ResultRow {
|
||||
start_reg: dedupe_cols_start_reg,
|
||||
count: dedupe_index.columns.len(),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
program.emit_insn(Insn::DecrJumpZero {
|
||||
reg: limit_ctx.reg_limit,
|
||||
target_pc: label_limit_reached,
|
||||
})
|
||||
}
|
||||
program.preassign_label_to_next_insn(label_dedupe_next);
|
||||
program.emit_insn(Insn::Next {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
pc_if_next: label_dedupe_loop_start,
|
||||
});
|
||||
program.emit_insn(Insn::Close {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
});
|
||||
}
|
||||
@@ -30,17 +30,17 @@ impl Display for Plan {
|
||||
match self {
|
||||
Self::Select(select_plan) => select_plan.fmt(f),
|
||||
Self::CompoundSelect {
|
||||
first,
|
||||
rest,
|
||||
left,
|
||||
right_most,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
} => {
|
||||
first.fmt(f)?;
|
||||
for (plan, operator) in rest {
|
||||
for (plan, operator) in left {
|
||||
writeln!(f, "{}", operator)?;
|
||||
plan.fmt(f)?;
|
||||
}
|
||||
right_most.fmt(f)?;
|
||||
if let Some(limit) = limit {
|
||||
writeln!(f, "LIMIT: {}", limit)?;
|
||||
}
|
||||
@@ -268,28 +268,24 @@ impl ToSqlString for Plan {
|
||||
match self {
|
||||
Self::Select(select) => select.to_sql_string(&PlanContext(&[&select.table_references])),
|
||||
Self::CompoundSelect {
|
||||
first,
|
||||
rest,
|
||||
left,
|
||||
right_most,
|
||||
limit,
|
||||
offset,
|
||||
order_by,
|
||||
} => {
|
||||
let all_refs = std::iter::once(&first.table_references)
|
||||
.chain(
|
||||
rest.iter()
|
||||
.flat_map(|(plan, _)| std::iter::once(&plan.table_references)),
|
||||
)
|
||||
let all_refs = left
|
||||
.iter()
|
||||
.flat_map(|(plan, _)| std::iter::once(&plan.table_references))
|
||||
.chain(std::iter::once(&right_most.table_references))
|
||||
.collect::<Vec<_>>();
|
||||
let context = &PlanContext(all_refs.as_slice());
|
||||
|
||||
let mut ret = vec![first.to_sql_string(context)];
|
||||
for (other_plan, operator) in rest {
|
||||
ret.push(format!(
|
||||
"{} {}",
|
||||
operator,
|
||||
other_plan.to_sql_string(context),
|
||||
));
|
||||
let mut ret = Vec::new();
|
||||
for (plan, operator) in left {
|
||||
ret.push(format!("{} {}", operator, plan.to_sql_string(context)));
|
||||
}
|
||||
ret.push(right_most.to_sql_string(context));
|
||||
if let Some(order_by) = &order_by {
|
||||
ret.push(format!(
|
||||
"ORDER BY {}",
|
||||
|
||||
@@ -2,9 +2,8 @@
|
||||
// It handles translating high-level SQL operations into low-level bytecode that can be executed by the virtual machine.
|
||||
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use limbo_sqlite3_parser::ast::{self, SortOrder};
|
||||
use limbo_sqlite3_parser::ast::{self};
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
use super::aggregation::emit_ungrouped_aggregation;
|
||||
@@ -16,14 +15,13 @@ use super::main_loop::{
|
||||
close_loop, emit_loop, init_distinct, init_loop, open_loop, LeftJoinMetadata, LoopLabels,
|
||||
};
|
||||
use super::order_by::{emit_order_by, init_order_by, SortMetadata};
|
||||
use super::plan::{
|
||||
JoinOrderMember, Operation, QueryDestination, SelectPlan, TableReferences, UpdatePlan,
|
||||
};
|
||||
use super::plan::{JoinOrderMember, Operation, SelectPlan, TableReferences, UpdatePlan};
|
||||
use super::select::emit_simple_count;
|
||||
use super::subquery::emit_subqueries;
|
||||
use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
|
||||
use crate::function::Func;
|
||||
use crate::schema::{Index, IndexColumn, Schema};
|
||||
use crate::schema::Schema;
|
||||
use crate::translate::compound_select::emit_program_for_compound_select;
|
||||
use crate::translate::plan::{DeletePlan, Plan, Search};
|
||||
use crate::translate::values::emit_values;
|
||||
use crate::util::exprs_are_equivalent;
|
||||
@@ -193,278 +191,6 @@ pub fn emit_program(
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_compound_select(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: Plan,
|
||||
schema: &Schema,
|
||||
syms: &SymbolTable,
|
||||
) -> Result<()> {
|
||||
let Plan::CompoundSelect {
|
||||
mut first,
|
||||
mut rest,
|
||||
limit,
|
||||
..
|
||||
} = plan
|
||||
else {
|
||||
crate::bail_parse_error!("expected compound select plan");
|
||||
};
|
||||
|
||||
// Trivial exit on LIMIT 0
|
||||
if let Some(limit) = limit {
|
||||
if limit == 0 {
|
||||
program.epilogue(TransactionMode::Read);
|
||||
program.result_columns = first.result_columns;
|
||||
program.table_references.extend(first.table_references);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Each subselect gets their own TranslateCtx, but they share the same limit_ctx
|
||||
// because the LIMIT applies to the entire compound select, not just a single subselect.
|
||||
// The way LIMIT works with compound selects is:
|
||||
// - If a given subselect appears BEFORE any UNION, then do NOT count those rows towards the LIMIT,
|
||||
// because the rows from those subselects need to be deduplicated before they start being counted.
|
||||
// - If a given subselect appears AFTER the last UNION, then count those rows towards the LIMIT immediately.
|
||||
let limit_ctx = limit.map(|limit| {
|
||||
let reg = program.alloc_register();
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: limit as i64,
|
||||
dest: reg,
|
||||
});
|
||||
LimitCtx::new_shared(reg)
|
||||
});
|
||||
|
||||
// Each subselect gets their own TranslateCtx.
|
||||
let mut t_ctx_list = Vec::with_capacity(rest.len() + 1);
|
||||
t_ctx_list.push(TranslateCtx::new(
|
||||
program,
|
||||
schema,
|
||||
syms,
|
||||
first.table_references.joined_tables().len(),
|
||||
first.result_columns.len(),
|
||||
));
|
||||
rest.iter().for_each(|(select, _)| {
|
||||
let t_ctx = TranslateCtx::new(
|
||||
program,
|
||||
schema,
|
||||
syms,
|
||||
select.table_references.joined_tables().len(),
|
||||
select.result_columns.len(),
|
||||
);
|
||||
t_ctx_list.push(t_ctx);
|
||||
});
|
||||
|
||||
// Compound select operators have the same precedence and are left-associative.
|
||||
// If there is any remaining UNION operator on the right side of a given sub-SELECT,
|
||||
// all of the rows from the preceding UNION arms need to be deduplicated.
|
||||
// This is done by creating an ephemeral index and inserting all the rows from the left side of
|
||||
// the last UNION arm into it.
|
||||
// Then, as soon as there are no more UNION operators left, all the deduplicated rows from the
|
||||
// ephemeral index are emitted, and lastly the rows from the remaining sub-SELECTS are emitted
|
||||
// as is, as they don't require deduplication.
|
||||
let mut first_t_ctx = t_ctx_list.remove(0);
|
||||
let requires_union_deduplication = rest
|
||||
.iter()
|
||||
.any(|(_, operator)| operator == &ast::CompoundOperator::Union);
|
||||
if requires_union_deduplication {
|
||||
// appears BEFORE a UNION operator, so do not count those rows towards the LIMIT.
|
||||
first.limit = None;
|
||||
} else {
|
||||
// appears AFTER the last UNION operator, so count those rows towards the LIMIT.
|
||||
first_t_ctx.limit_ctx = limit_ctx;
|
||||
}
|
||||
|
||||
let mut registers_subqery = None;
|
||||
let yield_reg = match first.query_destination {
|
||||
QueryDestination::CoroutineYield { yield_reg, .. } => {
|
||||
registers_subqery = Some(program.alloc_registers(first.result_columns.len()));
|
||||
first_t_ctx.reg_result_cols_start = registers_subqery.clone();
|
||||
Some(yield_reg)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let mut union_dedupe_index = if requires_union_deduplication {
|
||||
let dedupe_index = get_union_dedupe_index(program, &first);
|
||||
first.query_destination = QueryDestination::EphemeralIndex {
|
||||
cursor_id: dedupe_index.0,
|
||||
index: dedupe_index.1.clone(),
|
||||
};
|
||||
Some(dedupe_index)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Emit the first SELECT
|
||||
emit_query(program, &mut first, &mut first_t_ctx)?;
|
||||
|
||||
// Emit the remaining SELECTs. Any selects on the left side of a UNION must deduplicate their
|
||||
// results with the ephemeral index created above.
|
||||
while !t_ctx_list.is_empty() {
|
||||
let label_next_select = program.allocate_label();
|
||||
// If the LIMIT is reached in any subselect, jump to either:
|
||||
// a) the IfNot of the next subselect, or
|
||||
// b) the end of the program
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
program.emit_insn(Insn::IfNot {
|
||||
reg: limit_ctx.reg_limit,
|
||||
target_pc: label_next_select,
|
||||
jump_if_null: true,
|
||||
});
|
||||
}
|
||||
let mut t_ctx = t_ctx_list.remove(0);
|
||||
let requires_union_deduplication = rest
|
||||
.iter()
|
||||
.any(|(_, operator)| operator == &ast::CompoundOperator::Union);
|
||||
let (mut select, operator) = rest.remove(0);
|
||||
if operator != ast::CompoundOperator::UnionAll && operator != ast::CompoundOperator::Union {
|
||||
crate::bail_parse_error!("unimplemented compound select operator: {:?}", operator);
|
||||
}
|
||||
|
||||
if requires_union_deduplication {
|
||||
// Again: appears BEFORE a UNION operator, so do not count those rows towards the LIMIT.
|
||||
select.limit = None;
|
||||
} else {
|
||||
// appears AFTER the last UNION operator, so count those rows towards the LIMIT.
|
||||
t_ctx.limit_ctx = limit_ctx;
|
||||
}
|
||||
|
||||
if requires_union_deduplication {
|
||||
select.query_destination = QueryDestination::EphemeralIndex {
|
||||
cursor_id: union_dedupe_index.as_ref().unwrap().0,
|
||||
index: union_dedupe_index.as_ref().unwrap().1.clone(),
|
||||
};
|
||||
} else if let Some((dedupe_cursor_id, dedupe_index)) = union_dedupe_index.take() {
|
||||
// When there are no more UNION operators left, all the deduplicated rows from the preceding union arms need to be emitted
|
||||
// as result rows.
|
||||
read_deduplicated_union_rows(
|
||||
program,
|
||||
dedupe_cursor_id,
|
||||
dedupe_index.as_ref(),
|
||||
limit_ctx,
|
||||
label_next_select,
|
||||
yield_reg.clone(),
|
||||
);
|
||||
}
|
||||
if matches!(
|
||||
select.query_destination,
|
||||
crate::translate::plan::QueryDestination::CoroutineYield { .. }
|
||||
) {
|
||||
// Need to reuse the same registers when you are yielding
|
||||
t_ctx.reg_result_cols_start = registers_subqery.clone();
|
||||
}
|
||||
emit_query(program, &mut select, &mut t_ctx)?;
|
||||
program.preassign_label_to_next_insn(label_next_select);
|
||||
}
|
||||
|
||||
if let Some((dedupe_cursor_id, dedupe_index)) = union_dedupe_index {
|
||||
let label_jump_over_dedupe = program.allocate_label();
|
||||
read_deduplicated_union_rows(
|
||||
program,
|
||||
dedupe_cursor_id,
|
||||
dedupe_index.as_ref(),
|
||||
limit_ctx,
|
||||
label_jump_over_dedupe,
|
||||
yield_reg,
|
||||
);
|
||||
program.preassign_label_to_next_insn(label_jump_over_dedupe);
|
||||
}
|
||||
|
||||
program.epilogue(TransactionMode::Read);
|
||||
program.result_columns = first.result_columns;
|
||||
program.table_references.extend(first.table_references);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates an ephemeral index that will be used to deduplicate the results of any sub-selects
|
||||
/// that appear before the last UNION operator.
|
||||
fn get_union_dedupe_index(
|
||||
program: &mut ProgramBuilder,
|
||||
first_select_in_compound: &SelectPlan,
|
||||
) -> (usize, Arc<Index>) {
|
||||
let dedupe_index = Arc::new(Index {
|
||||
columns: first_select_in_compound
|
||||
.result_columns
|
||||
.iter()
|
||||
.map(|c| IndexColumn {
|
||||
name: c
|
||||
.name(&first_select_in_compound.table_references)
|
||||
.map(|n| n.to_string())
|
||||
.unwrap_or_default(),
|
||||
order: SortOrder::Asc,
|
||||
pos_in_table: 0,
|
||||
collation: None, // FIXME: this should be inferred
|
||||
default: None,
|
||||
})
|
||||
.collect(),
|
||||
name: "union_dedupe".to_string(),
|
||||
root_page: 0,
|
||||
ephemeral: true,
|
||||
table_name: String::new(),
|
||||
unique: true,
|
||||
has_rowid: false,
|
||||
});
|
||||
let cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(dedupe_index.clone()));
|
||||
program.emit_insn(Insn::OpenEphemeral {
|
||||
cursor_id,
|
||||
is_table: false,
|
||||
});
|
||||
(cursor_id, dedupe_index.clone())
|
||||
}
|
||||
|
||||
/// Emits the bytecode for reading deduplicated rows from the ephemeral index created for UNION operators.
|
||||
fn read_deduplicated_union_rows(
|
||||
program: &mut ProgramBuilder,
|
||||
dedupe_cursor_id: usize,
|
||||
dedupe_index: &Index,
|
||||
limit_ctx: Option<LimitCtx>,
|
||||
label_limit_reached: BranchOffset,
|
||||
yield_reg: Option<usize>,
|
||||
) {
|
||||
let label_dedupe_next = program.allocate_label();
|
||||
let label_dedupe_loop_start = program.allocate_label();
|
||||
let dedupe_cols_start_reg = program.alloc_registers(dedupe_index.columns.len());
|
||||
program.emit_insn(Insn::Rewind {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
pc_if_empty: label_dedupe_next,
|
||||
});
|
||||
program.preassign_label_to_next_insn(label_dedupe_loop_start);
|
||||
for col_idx in 0..dedupe_index.columns.len() {
|
||||
let start_reg = if let Some(yield_reg) = yield_reg {
|
||||
// Need to reuse the yield_reg for the column being emitted
|
||||
yield_reg + 1
|
||||
} else {
|
||||
dedupe_cols_start_reg
|
||||
};
|
||||
program.emit_column(dedupe_cursor_id, col_idx, start_reg + col_idx);
|
||||
}
|
||||
if let Some(yield_reg) = yield_reg {
|
||||
program.emit_insn(Insn::Yield {
|
||||
yield_reg,
|
||||
end_offset: BranchOffset::Offset(0),
|
||||
});
|
||||
} else {
|
||||
program.emit_insn(Insn::ResultRow {
|
||||
start_reg: dedupe_cols_start_reg,
|
||||
count: dedupe_index.columns.len(),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(limit_ctx) = limit_ctx {
|
||||
program.emit_insn(Insn::DecrJumpZero {
|
||||
reg: limit_ctx.reg_limit,
|
||||
target_pc: label_limit_reached,
|
||||
})
|
||||
}
|
||||
program.preassign_label_to_next_insn(label_dedupe_next);
|
||||
program.emit_insn(Insn::Next {
|
||||
cursor_id: dedupe_cursor_id,
|
||||
pc_if_next: label_dedupe_loop_start,
|
||||
});
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::TRACE)]
|
||||
fn emit_program_for_select(
|
||||
program: &mut ProgramBuilder,
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
pub(crate) mod aggregation;
|
||||
pub(crate) mod alter;
|
||||
pub(crate) mod collate;
|
||||
mod compound_select;
|
||||
pub(crate) mod delete;
|
||||
pub(crate) mod display;
|
||||
pub(crate) mod emitter;
|
||||
|
||||
@@ -41,9 +41,11 @@ pub fn optimize_plan(plan: &mut Plan, schema: &Schema) -> Result<()> {
|
||||
Plan::Select(plan) => optimize_select_plan(plan, schema)?,
|
||||
Plan::Delete(plan) => optimize_delete_plan(plan, schema)?,
|
||||
Plan::Update(plan) => optimize_update_plan(plan, schema)?,
|
||||
Plan::CompoundSelect { first, rest, .. } => {
|
||||
optimize_select_plan(first, schema)?;
|
||||
for (plan, _) in rest {
|
||||
Plan::CompoundSelect {
|
||||
left, right_most, ..
|
||||
} => {
|
||||
optimize_select_plan(right_most, schema)?;
|
||||
for (plan, _) in left {
|
||||
optimize_select_plan(plan, schema)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,8 +292,8 @@ impl Ord for EvalAt {
|
||||
pub enum Plan {
|
||||
Select(SelectPlan),
|
||||
CompoundSelect {
|
||||
first: SelectPlan,
|
||||
rest: Vec<(SelectPlan, ast::CompoundOperator)>,
|
||||
left: Vec<(SelectPlan, ast::CompoundOperator)>,
|
||||
right_most: SelectPlan,
|
||||
limit: Option<isize>,
|
||||
offset: Option<isize>,
|
||||
order_by: Option<Vec<(ast::Expr, SortOrder)>>,
|
||||
|
||||
@@ -52,24 +52,26 @@ pub fn translate_select(
|
||||
approx_num_labels: estimate_num_labels(select),
|
||||
}
|
||||
}
|
||||
Plan::CompoundSelect { first, rest, .. } => {
|
||||
Plan::CompoundSelect {
|
||||
left, right_most, ..
|
||||
} => {
|
||||
// Compound Selects must return the same number of columns
|
||||
num_result_cols = first.result_columns.len();
|
||||
num_result_cols = right_most.result_columns.len();
|
||||
|
||||
ProgramBuilderOpts {
|
||||
query_mode,
|
||||
num_cursors: count_plan_required_cursors(first)
|
||||
+ rest
|
||||
num_cursors: count_plan_required_cursors(right_most)
|
||||
+ left
|
||||
.iter()
|
||||
.map(|(plan, _)| count_plan_required_cursors(plan))
|
||||
.sum::<usize>(),
|
||||
approx_num_insns: estimate_num_instructions(first)
|
||||
+ rest
|
||||
approx_num_insns: estimate_num_instructions(right_most)
|
||||
+ left
|
||||
.iter()
|
||||
.map(|(plan, _)| estimate_num_instructions(plan))
|
||||
.sum::<usize>(),
|
||||
approx_num_labels: estimate_num_labels(first)
|
||||
+ rest
|
||||
approx_num_labels: estimate_num_labels(right_most)
|
||||
+ left
|
||||
.iter()
|
||||
.map(|(plan, _)| estimate_num_labels(plan))
|
||||
.sum::<usize>(),
|
||||
@@ -111,7 +113,7 @@ pub fn prepare_select_plan<'a>(
|
||||
)?))
|
||||
}
|
||||
Some(compounds) => {
|
||||
let mut first = prepare_one_select_plan(
|
||||
let mut last = prepare_one_select_plan(
|
||||
schema,
|
||||
*select.body.select,
|
||||
None,
|
||||
@@ -122,7 +124,8 @@ pub fn prepare_select_plan<'a>(
|
||||
table_ref_counter,
|
||||
query_destination.clone(),
|
||||
)?;
|
||||
let mut rest = Vec::with_capacity(compounds.len());
|
||||
|
||||
let mut left = Vec::with_capacity(compounds.len());
|
||||
for CompoundSelect { select, operator } in compounds {
|
||||
// TODO: add support for EXCEPT and INTERSECT
|
||||
if operator != ast::CompoundOperator::UnionAll
|
||||
@@ -132,7 +135,8 @@ pub fn prepare_select_plan<'a>(
|
||||
"only UNION ALL and UNION are supported for compound SELECTs"
|
||||
);
|
||||
}
|
||||
let plan = prepare_one_select_plan(
|
||||
left.push((last, operator));
|
||||
last = prepare_one_select_plan(
|
||||
schema,
|
||||
*select,
|
||||
None,
|
||||
@@ -143,22 +147,17 @@ pub fn prepare_select_plan<'a>(
|
||||
table_ref_counter,
|
||||
query_destination.clone(),
|
||||
)?;
|
||||
rest.push((plan, operator));
|
||||
}
|
||||
|
||||
// Ensure all subplans have same number of result columns
|
||||
let first_num_result_columns = first.result_columns.len();
|
||||
for (plan, operator) in rest.iter() {
|
||||
let first_num_result_columns = last.result_columns.len();
|
||||
for (plan, operator) in left.iter() {
|
||||
if plan.result_columns.len() != first_num_result_columns {
|
||||
crate::bail_parse_error!("SELECTs to the left and right of {} do not have the same number of result columns", operator);
|
||||
}
|
||||
}
|
||||
let (limit, offset) = select.limit.map_or(Ok((None, None)), |l| parse_limit(&l))?;
|
||||
|
||||
first.limit = limit.clone();
|
||||
for (plan, _) in rest.iter_mut() {
|
||||
plan.limit = limit.clone();
|
||||
}
|
||||
|
||||
// FIXME: handle OFFSET for compound selects
|
||||
if offset.map_or(false, |o| o > 0) {
|
||||
crate::bail_parse_error!("OFFSET is not supported for compound SELECTs yet");
|
||||
@@ -172,8 +171,8 @@ pub fn prepare_select_plan<'a>(
|
||||
crate::bail_parse_error!("WITH is not supported for compound SELECTs yet");
|
||||
}
|
||||
Ok(Plan::CompoundSelect {
|
||||
first,
|
||||
rest,
|
||||
left,
|
||||
right_most: last,
|
||||
limit,
|
||||
offset,
|
||||
order_by: None,
|
||||
|
||||
Binary file not shown.
Reference in New Issue
Block a user