support intersect operation for compound select

This commit is contained in:
meteorgan
2025-06-12 23:08:03 +08:00
parent e162b56d01
commit cd36fc26fd

View File

@@ -202,6 +202,52 @@ fn emit_compound_select(
program.preassign_label_to_next_insn(label_jump_over_dedupe);
}
}
CompoundOperator::Intersect => {
let mut target_cursor_id = None;
match right_most.query_destination {
QueryDestination::EphemeralIndex { cursor_id, .. } => {
target_cursor_id = Some(cursor_id);
}
QueryDestination::ResultRows => {}
QueryDestination::CoroutineYield { .. } => {}
}
let (left_cursor_id, left_index) = create_dedupe_index(program, &right_most);
plan.query_destination = QueryDestination::EphemeralIndex {
cursor_id: left_cursor_id,
index: left_index.clone(),
};
let compound_select = Plan::CompoundSelect {
left,
right_most: plan,
limit,
offset,
order_by,
};
emit_compound_select(
program,
compound_select,
schema,
syms,
None,
yield_reg,
reg_result_cols_start,
)?;
let (right_cursor_id, right_index) = create_dedupe_index(program, &right_most);
right_most.query_destination = QueryDestination::EphemeralIndex {
cursor_id: right_cursor_id,
index: right_index,
};
emit_query(program, &mut right_most, &mut right_most_ctx)?;
read_intersect_rows(
program,
left_cursor_id,
&left_index,
right_cursor_id,
target_cursor_id,
);
}
_ => {
crate::bail_parse_error!("unimplemented compound select operator: {:?}", operator);
}
@@ -218,19 +264,15 @@ fn emit_compound_select(
Ok(())
}
/// Creates an ephemeral index that will be used to deduplicate the results of any sub-selects
/// that appear before the last UNION operator.
fn create_union_dedupe_index(
program: &mut ProgramBuilder,
first_select_in_compound: &SelectPlan,
) -> (usize, Arc<Index>) {
// Creates an ephemeral index that will be used to deduplicate the results of any sub-selects
fn create_dedupe_index(program: &mut ProgramBuilder, select: &SelectPlan) -> (usize, Arc<Index>) {
let dedupe_index = Arc::new(Index {
columns: first_select_in_compound
columns: select
.result_columns
.iter()
.map(|c| IndexColumn {
name: c
.name(&first_select_in_compound.table_references)
.name(&select.table_references)
.map(|n| n.to_string())
.unwrap_or_default(),
order: SortOrder::Asc,
@@ -239,7 +281,7 @@ fn create_union_dedupe_index(
collation: None, // FIXME: this should be inferred
})
.collect(),
name: "union_dedupe".to_string(),
name: "compound_dedupe".to_string(),
root_page: 0,
ephemeral: true,
table_name: String::new(),
@@ -312,3 +354,56 @@ fn read_deduplicated_union_rows(
cursor_id: dedupe_cursor_id,
});
}
// Emits the bytecode for Reading rows from the intersection of two cursors.
fn read_intersect_rows(
program: &mut ProgramBuilder,
left_cursor_id: usize,
index: &Index,
right_cursor_id: usize,
target_cursor_id: Option<usize>,
) {
let label_close = program.allocate_label();
let label_loop_start = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: left_cursor_id,
pc_if_empty: label_close,
});
program.preassign_label_to_next_insn(label_loop_start);
let row_content_reg = program.alloc_register();
// we need to emit opcode RowData here
let label_next = program.allocate_label();
program.emit_insn(Insn::NotFound {
cursor_id: right_cursor_id,
target_pc: label_next,
record_reg: row_content_reg,
num_regs: 0,
});
let cols_start_reg = program.alloc_registers(index.columns.len());
for i in 0..index.columns.len() {
program.emit_insn(Insn::Column {
cursor_id: left_cursor_id,
column: i,
dest: cols_start_reg + i,
});
}
program.emit_insn(Insn::ResultRow {
start_reg: cols_start_reg,
count: index.columns.len(),
});
program.preassign_label_to_next_insn(label_next);
program.emit_insn(Insn::Next {
cursor_id: left_cursor_id,
pc_if_next: label_loop_start,
});
program.preassign_label_to_next_insn(label_close);
program.emit_insn(Insn::Close {
cursor_id: right_cursor_id,
});
program.emit_insn(Insn::Close {
cursor_id: left_cursor_id,
});
}