mirror of
https://github.com/aljazceru/turso.git
synced 2026-01-07 02:04:21 +01:00
Merge 'Fix LIMIT handling' from Jussi Saurio
Currently we have some usages of LIMIT where the actual limit counter is initialized next to the DecrJumpZero instruction, and then `program.mark_last_insn_constant()` is used to hoist the counter initialization to the beginning of the program. This is very fragile, and already FROM clause subquery handling works around this with a hack (removed in this PR), and (upcoming) WHERE clause subqueries would also run into problems because of this, because the LIMIT might need to be initialized once for every iteration of the subquery. This PR removes those usages for LIMIT, and LIMIT processing is now more intuitive: - limit counter is now initialized at the start of the query processing - a function init_limit() is extracted to do this for select/update/delete Closes #1591
This commit is contained in:
@@ -54,7 +54,6 @@ pub fn emit_ungrouped_aggregation<'a>(
|
||||
t_ctx.reg_offset,
|
||||
t_ctx.reg_result_cols_start.unwrap(),
|
||||
t_ctx.limit_ctx,
|
||||
t_ctx.reg_limit_offset_sum,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -519,21 +519,7 @@ pub fn emit_query<'a>(
|
||||
// Emit subqueries first so the results can be read in the main query loop.
|
||||
emit_subqueries(program, t_ctx, &mut plan.table_references)?;
|
||||
|
||||
if t_ctx.limit_ctx.is_none() {
|
||||
t_ctx.limit_ctx = plan.limit.map(|_| LimitCtx::new(program));
|
||||
}
|
||||
|
||||
if t_ctx.reg_offset.is_none() {
|
||||
t_ctx.reg_offset = t_ctx
|
||||
.reg_offset
|
||||
.or_else(|| plan.offset.map(|_| program.alloc_register()));
|
||||
}
|
||||
|
||||
if t_ctx.reg_limit_offset_sum.is_none() {
|
||||
t_ctx.reg_limit_offset_sum = t_ctx
|
||||
.reg_limit_offset_sum
|
||||
.or_else(|| plan.offset.map(|_| program.alloc_register()));
|
||||
}
|
||||
init_limit(program, t_ctx, plan.limit, plan.offset);
|
||||
|
||||
// No rows will be read from source table loops if there is a constant false condition eg. WHERE 0
|
||||
// however an aggregation might still happen,
|
||||
@@ -676,6 +662,8 @@ fn emit_program_for_delete(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
init_limit(program, &mut t_ctx, plan.limit, None);
|
||||
|
||||
// No rows will be read from source table loops if there is a constant false condition eg. WHERE 0
|
||||
let after_main_loop_label = program.allocate_label();
|
||||
t_ctx.label_main_loop_end = Some(after_main_loop_label);
|
||||
@@ -703,13 +691,7 @@ fn emit_program_for_delete(
|
||||
&[JoinOrderMember::default()],
|
||||
&mut plan.where_clause,
|
||||
)?;
|
||||
emit_delete_insns(
|
||||
program,
|
||||
&mut t_ctx,
|
||||
&plan.table_references,
|
||||
&plan.indexes,
|
||||
&plan.limit,
|
||||
)?;
|
||||
emit_delete_insns(program, &mut t_ctx, &plan.table_references, &plan.indexes)?;
|
||||
|
||||
// Clean up and close the main execution loop
|
||||
close_loop(
|
||||
@@ -732,7 +714,6 @@ fn emit_delete_insns(
|
||||
t_ctx: &mut TranslateCtx,
|
||||
table_references: &[TableReference],
|
||||
index_references: &[Arc<Index>],
|
||||
limit: &Option<isize>,
|
||||
) -> Result<()> {
|
||||
let table_reference = table_references.first().unwrap();
|
||||
let cursor_id = match &table_reference.op {
|
||||
@@ -810,15 +791,9 @@ fn emit_delete_insns(
|
||||
cursor_id: main_table_cursor_id,
|
||||
});
|
||||
}
|
||||
if let Some(limit) = limit {
|
||||
let limit_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: *limit as i64,
|
||||
dest: limit_reg,
|
||||
});
|
||||
program.mark_last_insn_constant();
|
||||
if let Some(limit_ctx) = t_ctx.limit_ctx {
|
||||
program.emit_insn(Insn::DecrJumpZero {
|
||||
reg: limit_reg,
|
||||
reg: limit_ctx.reg_limit,
|
||||
target_pc: t_ctx.label_main_loop_end.unwrap(),
|
||||
})
|
||||
}
|
||||
@@ -847,30 +822,8 @@ fn emit_program_for_update(
|
||||
program.table_references = plan.table_references;
|
||||
return Ok(());
|
||||
}
|
||||
if t_ctx.limit_ctx.is_none() && plan.limit.is_some() {
|
||||
t_ctx.limit_ctx = Some(LimitCtx::new(program));
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: plan.limit.unwrap() as i64,
|
||||
dest: t_ctx.limit_ctx.unwrap().reg_limit,
|
||||
});
|
||||
program.mark_last_insn_constant();
|
||||
if t_ctx.reg_offset.is_none() && plan.offset.is_some_and(|n| n.ne(&0)) {
|
||||
let reg = program.alloc_register();
|
||||
t_ctx.reg_offset = Some(reg);
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: plan.offset.unwrap() as i64,
|
||||
dest: reg,
|
||||
});
|
||||
program.mark_last_insn_constant();
|
||||
let combined_reg = program.alloc_register();
|
||||
t_ctx.reg_limit_offset_sum = Some(combined_reg);
|
||||
program.emit_insn(Insn::OffsetLimit {
|
||||
limit_reg: t_ctx.limit_ctx.unwrap().reg_limit,
|
||||
offset_reg: reg,
|
||||
combined_reg,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
init_limit(program, &mut t_ctx, plan.limit, plan.offset);
|
||||
let after_main_loop_label = program.allocate_label();
|
||||
t_ctx.label_main_loop_end = Some(after_main_loop_label);
|
||||
if plan.contains_constant_false_condition {
|
||||
@@ -1357,3 +1310,41 @@ fn emit_update_insns(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialize the limit/offset counters and registers.
|
||||
/// In case of compound SELECTs, the limit counter is initialized only once,
|
||||
/// hence [LimitCtx::initialize_counter] being false in those cases.
|
||||
fn init_limit(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
limit: Option<isize>,
|
||||
offset: Option<isize>,
|
||||
) {
|
||||
if t_ctx.limit_ctx.is_none() {
|
||||
t_ctx.limit_ctx = limit.map(|_| LimitCtx::new(program));
|
||||
}
|
||||
let Some(limit_ctx) = t_ctx.limit_ctx else {
|
||||
return;
|
||||
};
|
||||
if limit_ctx.initialize_counter {
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: limit.expect("limit must be Some if limit_ctx is Some") as i64,
|
||||
dest: limit_ctx.reg_limit,
|
||||
});
|
||||
}
|
||||
if t_ctx.reg_offset.is_none() && offset.is_some_and(|n| n.ne(&0)) {
|
||||
let reg = program.alloc_register();
|
||||
t_ctx.reg_offset = Some(reg);
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: offset.unwrap() as i64,
|
||||
dest: reg,
|
||||
});
|
||||
let combined_reg = program.alloc_register();
|
||||
t_ctx.reg_limit_offset_sum = Some(combined_reg);
|
||||
program.emit_insn(Insn::OffsetLimit {
|
||||
limit_reg: t_ctx.limit_ctx.unwrap().reg_limit,
|
||||
offset_reg: reg,
|
||||
combined_reg,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -899,7 +899,6 @@ pub fn group_by_emit_row_phase<'a>(
|
||||
t_ctx.reg_offset,
|
||||
t_ctx.reg_result_cols_start.unwrap(),
|
||||
t_ctx.limit_ctx,
|
||||
t_ctx.reg_limit_offset_sum,
|
||||
)?;
|
||||
}
|
||||
Some(_) => {
|
||||
|
||||
@@ -890,7 +890,6 @@ fn emit_loop_source<'a>(
|
||||
t_ctx.reg_offset,
|
||||
t_ctx.reg_result_cols_start.unwrap(),
|
||||
t_ctx.limit_ctx,
|
||||
t_ctx.reg_limit_offset_sum,
|
||||
)?;
|
||||
|
||||
if let Distinctness::Distinct { ctx } = &plan.distinctness {
|
||||
|
||||
@@ -181,8 +181,6 @@ pub fn emit_order_by(
|
||||
plan,
|
||||
start_reg,
|
||||
t_ctx.limit_ctx,
|
||||
t_ctx.reg_offset,
|
||||
t_ctx.reg_limit_offset_sum,
|
||||
Some(sort_loop_end_label),
|
||||
)?;
|
||||
|
||||
|
||||
@@ -27,7 +27,6 @@ pub fn emit_select_result(
|
||||
reg_offset: Option<usize>,
|
||||
reg_result_cols_start: usize,
|
||||
limit_ctx: Option<LimitCtx>,
|
||||
reg_limit_offset_sum: Option<usize>,
|
||||
) -> Result<()> {
|
||||
if let (Some(jump_to), Some(_)) = (offset_jump_to, label_on_limit_reached) {
|
||||
emit_offset(program, plan, jump_to, reg_offset)?;
|
||||
@@ -61,15 +60,7 @@ pub fn emit_select_result(
|
||||
distinct_ctx.emit_deduplication_insns(program, num_regs, start_reg);
|
||||
}
|
||||
|
||||
emit_result_row_and_limit(
|
||||
program,
|
||||
plan,
|
||||
start_reg,
|
||||
limit_ctx,
|
||||
reg_offset,
|
||||
reg_limit_offset_sum,
|
||||
label_on_limit_reached,
|
||||
)?;
|
||||
emit_result_row_and_limit(program, plan, start_reg, limit_ctx, label_on_limit_reached)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -81,8 +72,6 @@ pub fn emit_result_row_and_limit(
|
||||
plan: &SelectPlan,
|
||||
result_columns_start_reg: usize,
|
||||
limit_ctx: Option<LimitCtx>,
|
||||
reg_offset: Option<usize>,
|
||||
reg_limit_offset_sum: Option<usize>,
|
||||
label_on_limit_reached: Option<BranchOffset>,
|
||||
) -> Result<()> {
|
||||
match &plan.query_destination {
|
||||
@@ -119,7 +108,7 @@ pub fn emit_result_row_and_limit(
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(limit) = plan.limit {
|
||||
if plan.limit.is_some() {
|
||||
if label_on_limit_reached.is_none() {
|
||||
// There are cases where LIMIT is ignored, e.g. aggregation without a GROUP BY clause.
|
||||
// We already early return on LIMIT 0, so we can just return here since the n of rows
|
||||
@@ -127,28 +116,6 @@ pub fn emit_result_row_and_limit(
|
||||
return Ok(());
|
||||
}
|
||||
let limit_ctx = limit_ctx.expect("limit_ctx must be Some if plan.limit is Some");
|
||||
if limit_ctx.initialize_counter {
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: limit as i64,
|
||||
dest: limit_ctx.reg_limit,
|
||||
});
|
||||
program.mark_last_insn_constant();
|
||||
}
|
||||
|
||||
if let Some(offset) = plan.offset {
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: offset as i64,
|
||||
dest: reg_offset.expect("reg_offset must be Some"),
|
||||
});
|
||||
program.mark_last_insn_constant();
|
||||
|
||||
program.emit_insn(Insn::OffsetLimit {
|
||||
limit_reg: limit_ctx.reg_limit,
|
||||
combined_reg: reg_limit_offset_sum.expect("reg_limit_offset_sum must be Some"),
|
||||
offset_reg: reg_offset.expect("reg_offset must be Some"),
|
||||
});
|
||||
program.mark_last_insn_constant();
|
||||
}
|
||||
|
||||
program.emit_insn(Insn::DecrJumpZero {
|
||||
reg: limit_ctx.reg_limit,
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
emitter::{emit_query, LimitCtx, Resolver, TranslateCtx},
|
||||
emitter::{emit_query, Resolver, TranslateCtx},
|
||||
main_loop::LoopLabels,
|
||||
plan::{QueryDestination, SelectPlan, TableReference},
|
||||
};
|
||||
@@ -77,9 +77,9 @@ pub fn emit_subquery<'a>(
|
||||
reg_result_cols_start: None,
|
||||
result_column_indexes_in_orderby_sorter: (0..plan.result_columns.len()).collect(),
|
||||
result_columns_to_skip_in_orderby_sorter: None,
|
||||
limit_ctx: plan.limit.map(|_| LimitCtx::new(program)),
|
||||
reg_offset: plan.offset.map(|_| program.alloc_register()),
|
||||
reg_limit_offset_sum: plan.offset.map(|_| program.alloc_register()),
|
||||
limit_ctx: None,
|
||||
reg_offset: None,
|
||||
reg_limit_offset_sum: None,
|
||||
resolver: Resolver::new(t_ctx.resolver.schema, t_ctx.resolver.symbol_table),
|
||||
};
|
||||
let subquery_body_end_label = program.allocate_label();
|
||||
@@ -89,15 +89,6 @@ pub fn emit_subquery<'a>(
|
||||
start_offset: coroutine_implementation_start_offset,
|
||||
});
|
||||
program.preassign_label_to_next_insn(coroutine_implementation_start_offset);
|
||||
// Normally we mark each LIMIT value as a constant insn that is emitted only once, but in the case of a subquery,
|
||||
// we need to initialize it every time the subquery is run; otherwise subsequent runs of the subquery will already
|
||||
// have the LIMIT counter at 0, and will never return rows.
|
||||
if let Some(limit) = plan.limit {
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: limit as i64,
|
||||
dest: metadata.limit_ctx.unwrap().reg_limit,
|
||||
});
|
||||
}
|
||||
let result_column_start_reg = emit_query(program, plan, &mut metadata)?;
|
||||
program.resolve_label(end_coroutine_label, program.offset());
|
||||
program.emit_insn(Insn::EndCoroutine { yield_reg });
|
||||
|
||||
Reference in New Issue
Block a user