mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-20 15:35:29 +01:00
Create plan for Update queries
This commit is contained in:
@@ -11,12 +11,11 @@ use crate::vdbe::{insn::Insn, BranchOffset};
|
||||
use crate::{Result, SymbolTable};
|
||||
|
||||
use super::aggregation::emit_ungrouped_aggregation;
|
||||
use super::expr::{translate_condition_expr, ConditionMetadata};
|
||||
use super::expr::{translate_condition_expr, translate_expr, ConditionMetadata};
|
||||
use super::group_by::{emit_group_by, init_group_by, GroupByMetadata};
|
||||
use super::main_loop::{close_loop, emit_loop, init_loop, open_loop, LeftJoinMetadata, LoopLabels};
|
||||
use super::order_by::{emit_order_by, init_order_by, SortMetadata};
|
||||
use super::plan::Operation;
|
||||
use super::plan::{SelectPlan, TableReference};
|
||||
use super::plan::{Operation, SelectPlan, TableReference, UpdatePlan};
|
||||
use super::subquery::emit_subqueries;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -89,7 +88,7 @@ pub struct TranslateCtx<'a> {
|
||||
|
||||
/// Used to distinguish database operations
|
||||
#[allow(clippy::upper_case_acronyms, dead_code)]
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum OperationMode {
|
||||
SELECT,
|
||||
INSERT,
|
||||
@@ -138,6 +137,7 @@ fn epilogue(
|
||||
program: &mut ProgramBuilder,
|
||||
init_label: BranchOffset,
|
||||
start_offset: BranchOffset,
|
||||
write: bool,
|
||||
) -> Result<()> {
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: 0,
|
||||
@@ -145,7 +145,7 @@ fn epilogue(
|
||||
});
|
||||
|
||||
program.resolve_label(init_label, program.offset());
|
||||
program.emit_insn(Insn::Transaction { write: false });
|
||||
program.emit_insn(Insn::Transaction { write });
|
||||
|
||||
program.emit_constant_insns();
|
||||
program.emit_insn(Insn::Goto {
|
||||
@@ -161,6 +161,7 @@ pub fn emit_program(program: &mut ProgramBuilder, plan: Plan, syms: &SymbolTable
|
||||
match plan {
|
||||
Plan::Select(plan) => emit_program_for_select(program, plan, syms),
|
||||
Plan::Delete(plan) => emit_program_for_delete(program, plan, syms),
|
||||
Plan::Update(plan) => emit_program_for_update(program, plan, syms),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,7 +180,7 @@ fn emit_program_for_select(
|
||||
// Trivial exit on LIMIT 0
|
||||
if let Some(limit) = plan.limit {
|
||||
if limit == 0 {
|
||||
epilogue(program, init_label, start_offset)?;
|
||||
epilogue(program, init_label, start_offset, false)?;
|
||||
program.result_columns = plan.result_columns;
|
||||
program.table_references = plan.table_references;
|
||||
return Ok(());
|
||||
@@ -188,7 +189,7 @@ fn emit_program_for_select(
|
||||
// Emit main parts of query
|
||||
emit_query(program, &mut plan, &mut t_ctx)?;
|
||||
// Finalize program
|
||||
epilogue(program, init_label, start_offset)?;
|
||||
epilogue(program, init_label, start_offset, false)?;
|
||||
program.result_columns = plan.result_columns;
|
||||
program.table_references = plan.table_references;
|
||||
Ok(())
|
||||
@@ -240,7 +241,7 @@ pub fn emit_query<'a>(
|
||||
program,
|
||||
t_ctx,
|
||||
&plan.table_references,
|
||||
&OperationMode::SELECT,
|
||||
OperationMode::SELECT,
|
||||
)?;
|
||||
|
||||
for where_term in plan.where_clause.iter().filter(|wt| wt.is_constant()) {
|
||||
@@ -255,7 +256,7 @@ pub fn emit_query<'a>(
|
||||
&plan.table_references,
|
||||
&where_term.expr,
|
||||
condition_metadata,
|
||||
&mut t_ctx.resolver,
|
||||
&t_ctx.resolver,
|
||||
)?;
|
||||
program.resolve_label(jump_target_when_true, program.offset());
|
||||
}
|
||||
@@ -305,7 +306,7 @@ fn emit_program_for_delete(
|
||||
|
||||
// exit early if LIMIT 0
|
||||
if let Some(0) = plan.limit {
|
||||
epilogue(program, init_label, start_offset)?;
|
||||
epilogue(program, init_label, start_offset, true)?;
|
||||
program.result_columns = plan.result_columns;
|
||||
program.table_references = plan.table_references;
|
||||
return Ok(());
|
||||
@@ -325,7 +326,7 @@ fn emit_program_for_delete(
|
||||
program,
|
||||
&mut t_ctx,
|
||||
&plan.table_references,
|
||||
&OperationMode::DELETE,
|
||||
OperationMode::DELETE,
|
||||
)?;
|
||||
|
||||
// Set up main query execution loop
|
||||
@@ -343,7 +344,7 @@ fn emit_program_for_delete(
|
||||
program.resolve_label(after_main_loop_label, program.offset());
|
||||
|
||||
// Finalize program
|
||||
epilogue(program, init_label, start_offset)?;
|
||||
epilogue(program, init_label, start_offset, true)?;
|
||||
program.result_columns = plan.result_columns;
|
||||
program.table_references = plan.table_references;
|
||||
Ok(())
|
||||
@@ -409,3 +410,161 @@ fn emit_delete_insns(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn emit_program_for_update(
|
||||
program: &mut ProgramBuilder,
|
||||
plan: UpdatePlan,
|
||||
syms: &SymbolTable,
|
||||
) -> Result<()> {
|
||||
let (mut t_ctx, init_label, start_offset) = prologue(
|
||||
program,
|
||||
syms,
|
||||
plan.table_references.len(),
|
||||
plan.returning.as_ref().map_or(0, |r| r.len()),
|
||||
)?;
|
||||
|
||||
let after_main_loop_label = program.allocate_label();
|
||||
t_ctx.label_main_loop_end = Some(after_main_loop_label);
|
||||
if plan.contains_constant_false_condition {
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: after_main_loop_label,
|
||||
});
|
||||
}
|
||||
let skip_label = program.allocate_label();
|
||||
init_loop(
|
||||
program,
|
||||
&mut t_ctx,
|
||||
&plan.table_references,
|
||||
OperationMode::UPDATE,
|
||||
)?;
|
||||
open_loop(
|
||||
program,
|
||||
&mut t_ctx,
|
||||
&plan.table_references,
|
||||
&plan.where_clause,
|
||||
)?;
|
||||
emit_update_insns(&plan, &t_ctx, program)?;
|
||||
program.resolve_label(skip_label, program.offset());
|
||||
close_loop(program, &mut t_ctx, &plan.table_references)?;
|
||||
|
||||
program.resolve_label(after_main_loop_label, program.offset());
|
||||
|
||||
// Finalize program
|
||||
epilogue(program, init_label, start_offset, true)?;
|
||||
program.result_columns = plan.returning.unwrap_or_default();
|
||||
program.table_references = plan.table_references;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn emit_update_insns(
|
||||
plan: &UpdatePlan,
|
||||
t_ctx: &TranslateCtx,
|
||||
program: &mut ProgramBuilder,
|
||||
) -> crate::Result<()> {
|
||||
let table_ref = &plan.table_references.first().unwrap();
|
||||
let (cursor_id, index) = match &table_ref.op {
|
||||
Operation::Scan { .. } => (program.resolve_cursor_id(&table_ref.identifier), None),
|
||||
Operation::Search(search) => match search {
|
||||
&Search::RowidEq { .. } | Search::RowidSearch { .. } => {
|
||||
(program.resolve_cursor_id(&table_ref.identifier), None)
|
||||
}
|
||||
Search::IndexSearch { index, .. } => (
|
||||
program.resolve_cursor_id(&table_ref.identifier),
|
||||
Some((index.clone(), program.resolve_cursor_id(&index.name))),
|
||||
),
|
||||
},
|
||||
_ => return Ok(()),
|
||||
};
|
||||
|
||||
for cond in plan.where_clause.iter().filter(|c| c.is_constant()) {
|
||||
let jump_target = program.allocate_label();
|
||||
let meta = ConditionMetadata {
|
||||
jump_if_condition_is_true: false,
|
||||
jump_target_when_true: jump_target,
|
||||
jump_target_when_false: t_ctx.label_main_loop_end.unwrap(),
|
||||
};
|
||||
translate_condition_expr(
|
||||
program,
|
||||
&plan.table_references,
|
||||
&cond.expr,
|
||||
meta,
|
||||
&t_ctx.resolver,
|
||||
)?;
|
||||
program.resolve_label(jump_target, program.offset());
|
||||
}
|
||||
let first_col_reg = program.alloc_registers(table_ref.table.columns().len());
|
||||
let rowid_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::RowId {
|
||||
cursor_id,
|
||||
dest: rowid_reg,
|
||||
});
|
||||
// if no rowid, we're done
|
||||
program.emit_insn(Insn::IsNull {
|
||||
reg: rowid_reg,
|
||||
target_pc: t_ctx.label_main_loop_end.unwrap(),
|
||||
});
|
||||
|
||||
// we scan a column at a time, loading either the column's values, or the new value
|
||||
// from the Set expression, into registers so we can emit a MakeRecord and update the row.
|
||||
for idx in 0..table_ref.columns().len() {
|
||||
if let Some((idx, expr)) = plan.set_clauses.iter().find(|(i, _)| *i == idx) {
|
||||
let target_reg = first_col_reg + idx;
|
||||
translate_expr(
|
||||
program,
|
||||
Some(&plan.table_references),
|
||||
expr,
|
||||
target_reg,
|
||||
&t_ctx.resolver,
|
||||
)?;
|
||||
} else {
|
||||
let table_column = table_ref.table.columns().get(idx).unwrap();
|
||||
let column_idx_in_index = index.as_ref().and_then(|(idx, _)| {
|
||||
idx.columns
|
||||
.iter()
|
||||
.position(|c| Some(&c.name) == table_column.name.as_ref())
|
||||
});
|
||||
program.emit_insn(Insn::Column {
|
||||
cursor_id: *index
|
||||
.as_ref()
|
||||
.and_then(|(_, id)| {
|
||||
if column_idx_in_index.is_some() {
|
||||
Some(id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap_or(&cursor_id),
|
||||
column: column_idx_in_index.unwrap_or(idx),
|
||||
dest: first_col_reg + idx,
|
||||
});
|
||||
}
|
||||
}
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: first_col_reg,
|
||||
count: table_ref.columns().len(),
|
||||
dest_reg: record_reg,
|
||||
});
|
||||
program.emit_insn(Insn::InsertAsync {
|
||||
cursor: cursor_id,
|
||||
key_reg: rowid_reg,
|
||||
record_reg,
|
||||
flag: 0,
|
||||
});
|
||||
program.emit_insn(Insn::InsertAwait { cursor_id });
|
||||
|
||||
if let Some(limit) = plan.limit {
|
||||
let limit_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::Integer {
|
||||
value: limit as i64,
|
||||
dest: limit_reg,
|
||||
});
|
||||
program.mark_last_insn_constant();
|
||||
program.emit_insn(Insn::DecrJumpZero {
|
||||
reg: limit_reg,
|
||||
target_pc: t_ctx.label_main_loop_end.unwrap(),
|
||||
})
|
||||
}
|
||||
// TODO(pthorpe): handle RETURNING clause
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ pub fn init_loop(
|
||||
program: &mut ProgramBuilder,
|
||||
t_ctx: &mut TranslateCtx,
|
||||
tables: &[TableReference],
|
||||
mode: &OperationMode,
|
||||
mode: OperationMode,
|
||||
) -> Result<()> {
|
||||
assert!(
|
||||
t_ctx.meta_left_joins.len() == tables.len(),
|
||||
@@ -90,16 +90,24 @@ pub fn init_loop(
|
||||
},
|
||||
);
|
||||
match (mode, &table.table) {
|
||||
(OperationMode::SELECT, Table::BTree(_)) => {
|
||||
let root_page = table.btree().unwrap().root_page;
|
||||
(OperationMode::SELECT, Table::BTree(btree)) => {
|
||||
let root_page = btree.root_page;
|
||||
program.emit_insn(Insn::OpenReadAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenReadAwait {});
|
||||
}
|
||||
(OperationMode::DELETE, Table::BTree(_)) => {
|
||||
let root_page = table.btree().unwrap().root_page;
|
||||
(OperationMode::DELETE, Table::BTree(btree)) => {
|
||||
let root_page = btree.root_page;
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
(OperationMode::UPDATE, Table::BTree(btree)) => {
|
||||
let root_page = btree.root_page;
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
@@ -140,6 +148,13 @@ pub fn init_loop(
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
OperationMode::UPDATE => {
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id: table_cursor_id,
|
||||
root_page: table.table.get_root_page(),
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
_ => {
|
||||
unimplemented!()
|
||||
}
|
||||
@@ -166,6 +181,13 @@ pub fn init_loop(
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
OperationMode::UPDATE => {
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id: index_cursor_id,
|
||||
root_page: index.root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
}
|
||||
_ => {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
@@ -9,13 +9,14 @@ use crate::{
|
||||
|
||||
use super::plan::{
|
||||
DeletePlan, Direction, IterationDirection, Operation, Plan, Search, SelectPlan, TableReference,
|
||||
WhereTerm,
|
||||
UpdatePlan, WhereTerm,
|
||||
};
|
||||
|
||||
pub fn optimize_plan(plan: &mut Plan, schema: &Schema) -> Result<()> {
|
||||
match plan {
|
||||
Plan::Select(plan) => optimize_select_plan(plan, schema),
|
||||
Plan::Delete(plan) => optimize_delete_plan(plan, schema),
|
||||
Plan::Update(plan) => optimize_update_plan(plan, schema),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,6 +64,22 @@ fn optimize_delete_plan(plan: &mut DeletePlan, schema: &Schema) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn optimize_update_plan(plan: &mut UpdatePlan, schema: &Schema) -> Result<()> {
|
||||
rewrite_exprs_update(plan)?;
|
||||
if let ConstantConditionEliminationResult::ImpossibleCondition =
|
||||
eliminate_constant_conditions(&mut plan.where_clause)?
|
||||
{
|
||||
plan.contains_constant_false_condition = true;
|
||||
return Ok(());
|
||||
}
|
||||
use_indexes(
|
||||
&mut plan.table_references,
|
||||
&schema.indexes,
|
||||
&mut plan.where_clause,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn optimize_subqueries(plan: &mut SelectPlan, schema: &Schema) -> Result<()> {
|
||||
for table in plan.table_references.iter_mut() {
|
||||
if let Operation::Subquery { plan, .. } = &mut table.op {
|
||||
@@ -89,7 +106,7 @@ fn query_is_already_ordered_by(
|
||||
Search::RowidEq { .. } => Ok(key.is_rowid_alias_of(0)),
|
||||
Search::RowidSearch { .. } => Ok(key.is_rowid_alias_of(0)),
|
||||
Search::IndexSearch { index, .. } => {
|
||||
let index_rc = key.check_index_scan(0, &table_reference, available_indexes)?;
|
||||
let index_rc = key.check_index_scan(0, table_reference, available_indexes)?;
|
||||
let index_is_the_same = index_rc
|
||||
.map(|irc| Arc::ptr_eq(index, &irc))
|
||||
.unwrap_or(false);
|
||||
@@ -104,7 +121,7 @@ fn eliminate_unnecessary_orderby(plan: &mut SelectPlan, schema: &Schema) -> Resu
|
||||
if plan.order_by.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
if plan.table_references.len() == 0 {
|
||||
if plan.table_references.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -154,7 +171,7 @@ fn use_indexes(
|
||||
if let Some(index_search) = try_extract_index_search_expression(
|
||||
cond,
|
||||
table_index,
|
||||
&table_reference,
|
||||
table_reference,
|
||||
available_indexes,
|
||||
)? {
|
||||
where_clause.remove(i);
|
||||
@@ -249,6 +266,26 @@ fn rewrite_exprs_delete(plan: &mut DeletePlan) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rewrite_exprs_update(plan: &mut UpdatePlan) -> Result<()> {
|
||||
if let Some(rc) = plan.returning.as_mut() {
|
||||
for rc in rc.iter_mut() {
|
||||
rewrite_expr(&mut rc.expr)?;
|
||||
}
|
||||
}
|
||||
for (_, expr) in plan.set_clauses.iter_mut() {
|
||||
rewrite_expr(expr)?;
|
||||
}
|
||||
for cond in plan.where_clause.iter_mut() {
|
||||
rewrite_expr(&mut cond.expr)?;
|
||||
}
|
||||
if let Some(order_by) = &mut plan.order_by {
|
||||
for (expr, _) in order_by.iter_mut() {
|
||||
rewrite_expr(expr)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ConstantPredicate {
|
||||
AlwaysTrue,
|
||||
|
||||
@@ -7,16 +7,13 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use crate::schema::{PseudoTable, Type};
|
||||
use crate::{
|
||||
function::AggFunc,
|
||||
schema::{BTreeTable, Column, Index, Table},
|
||||
vdbe::BranchOffset,
|
||||
VirtualTable,
|
||||
};
|
||||
use crate::{
|
||||
schema::{PseudoTable, Type},
|
||||
translate::plan::Plan::{Delete, Select},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ResultSetColumn {
|
||||
@@ -112,6 +109,7 @@ impl Ord for EvalAt {
|
||||
pub enum Plan {
|
||||
Select(SelectPlan),
|
||||
Delete(DeletePlan),
|
||||
Update(UpdatePlan),
|
||||
}
|
||||
|
||||
/// The type of the query, either top level or subquery
|
||||
@@ -170,6 +168,23 @@ pub struct DeletePlan {
|
||||
pub contains_constant_false_condition: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UpdatePlan {
|
||||
// list of table references, table being updated is always first
|
||||
pub table_references: Vec<TableReference>,
|
||||
// which columns are being updated and what they are being set to
|
||||
pub set_clauses: Vec<(usize, ast::Expr)>, // (column_index, expression)
|
||||
// where clause split into a vec at 'AND' boundaries.
|
||||
pub where_clause: Vec<WhereTerm>,
|
||||
pub order_by: Option<Vec<(ast::Expr, Direction)>>,
|
||||
// TODO: support optional LIMIT
|
||||
pub limit: Option<isize>,
|
||||
// optional RETURNING clause
|
||||
pub returning: Option<Vec<ResultSetColumn>>,
|
||||
// whether the WHERE clause is always false
|
||||
pub contains_constant_false_condition: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum IterationDirection {
|
||||
Forwards,
|
||||
@@ -370,8 +385,9 @@ impl Display for Aggregate {
|
||||
impl Display for Plan {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Select(select_plan) => select_plan.fmt(f),
|
||||
Delete(delete_plan) => delete_plan.fmt(f),
|
||||
Self::Select(select_plan) => select_plan.fmt(f),
|
||||
Self::Delete(delete_plan) => delete_plan.fmt(f),
|
||||
Self::Update(update_plan) => update_plan.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -461,3 +477,77 @@ impl Display for DeletePlan {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for UpdatePlan {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
writeln!(f, "QUERY PLAN")?;
|
||||
|
||||
for (i, reference) in self.table_references.iter().enumerate() {
|
||||
let is_last = i == self.table_references.len() - 1;
|
||||
let indent = if i == 0 {
|
||||
if is_last { "`--" } else { "|--" }.to_string()
|
||||
} else {
|
||||
format!(
|
||||
" {}{}",
|
||||
"| ".repeat(i - 1),
|
||||
if is_last { "`--" } else { "|--" }
|
||||
)
|
||||
};
|
||||
|
||||
match &reference.op {
|
||||
Operation::Scan { .. } => {
|
||||
let table_name = if reference.table.get_name() == reference.identifier {
|
||||
reference.identifier.clone()
|
||||
} else {
|
||||
format!("{} AS {}", reference.table.get_name(), reference.identifier)
|
||||
};
|
||||
|
||||
if i == 0 {
|
||||
writeln!(f, "{}UPDATE {}", indent, table_name)?;
|
||||
} else {
|
||||
writeln!(f, "{}SCAN {}", indent, table_name)?;
|
||||
}
|
||||
}
|
||||
Operation::Search(search) => match search {
|
||||
Search::RowidEq { .. } | Search::RowidSearch { .. } => {
|
||||
writeln!(
|
||||
f,
|
||||
"{}SEARCH {} USING INTEGER PRIMARY KEY (rowid=?)",
|
||||
indent, reference.identifier
|
||||
)?;
|
||||
}
|
||||
Search::IndexSearch { index, .. } => {
|
||||
writeln!(
|
||||
f,
|
||||
"{}SEARCH {} USING INDEX {}",
|
||||
indent, reference.identifier, index.name
|
||||
)?;
|
||||
}
|
||||
},
|
||||
Operation::Subquery { plan, .. } => {
|
||||
writeln!(f, "{}SUBQUERY {}", indent, reference.identifier)?;
|
||||
for line in format!("{}", plan).lines() {
|
||||
writeln!(f, "{} {}", indent, line)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(order_by) = &self.order_by {
|
||||
writeln!(f, "ORDER BY:")?;
|
||||
for (expr, dir) in order_by {
|
||||
writeln!(f, " - {} {}", expr, dir)?;
|
||||
}
|
||||
}
|
||||
if let Some(limit) = self.limit {
|
||||
writeln!(f, "LIMIT: {}", limit)?;
|
||||
}
|
||||
if let Some(ret) = &self.returning {
|
||||
writeln!(f, "RETURNING:")?;
|
||||
for col in ret {
|
||||
writeln!(f, " - {}", col.expr)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,20 +3,17 @@ use crate::{
|
||||
bail_parse_error,
|
||||
schema::{Schema, Table},
|
||||
util::normalize_ident,
|
||||
vdbe::{
|
||||
builder::{CursorType, ProgramBuilder, ProgramBuilderOpts, QueryMode},
|
||||
insn::Insn,
|
||||
},
|
||||
vdbe::builder::{ProgramBuilder, ProgramBuilderOpts, QueryMode},
|
||||
SymbolTable,
|
||||
};
|
||||
use limbo_sqlite3_parser::ast::Update;
|
||||
use limbo_sqlite3_parser::ast::{self, Expr, ResultColumn, SortOrder, Update};
|
||||
|
||||
use super::planner::bind_column_references;
|
||||
use super::{
|
||||
emitter::Resolver,
|
||||
expr::{translate_condition_expr, translate_expr, ConditionMetadata},
|
||||
plan::TableReference,
|
||||
use super::emitter::emit_program;
|
||||
use super::optimizer::optimize_plan;
|
||||
use super::plan::{
|
||||
Direction, IterationDirection, Plan, ResultSetColumn, TableReference, UpdatePlan,
|
||||
};
|
||||
use super::planner::{bind_column_references, parse_limit, parse_where};
|
||||
|
||||
/*
|
||||
* Update is simple. By default we scan the table, and for each row, we check the WHERE
|
||||
@@ -53,6 +50,8 @@ pub fn translate_update(
|
||||
body: &mut Update,
|
||||
syms: &SymbolTable,
|
||||
) -> crate::Result<ProgramBuilder> {
|
||||
let mut plan = prepare_update_plan(schema, body)?;
|
||||
optimize_plan(&mut plan, schema)?;
|
||||
// TODO: freestyling these numbers
|
||||
let mut program = ProgramBuilder::new(ProgramBuilderOpts {
|
||||
query_mode,
|
||||
@@ -60,157 +59,114 @@ pub fn translate_update(
|
||||
approx_num_insns: 20,
|
||||
approx_num_labels: 4,
|
||||
});
|
||||
emit_program(&mut program, plan, syms)?;
|
||||
Ok(program)
|
||||
}
|
||||
|
||||
if body.with.is_some() {
|
||||
bail_parse_error!("WITH clause is not supported");
|
||||
}
|
||||
if body.or_conflict.is_some() {
|
||||
bail_parse_error!("ON CONFLICT clause is not supported");
|
||||
}
|
||||
pub fn prepare_update_plan(schema: &Schema, body: &mut Update) -> crate::Result<Plan> {
|
||||
let table_name = &body.tbl_name.name;
|
||||
let table = match schema.get_table(table_name.0.as_str()) {
|
||||
Some(table) => table,
|
||||
None => bail_parse_error!("Parse error: no such table: {}", table_name),
|
||||
};
|
||||
if let Table::Virtual(_) = table.as_ref() {
|
||||
bail_parse_error!("vtable update not yet supported");
|
||||
}
|
||||
let resolver = Resolver::new(syms);
|
||||
|
||||
let init_label = program.allocate_label();
|
||||
program.emit_insn(Insn::Init {
|
||||
target_pc: init_label,
|
||||
});
|
||||
let start_offset = program.offset();
|
||||
let Some(btree_table) = table.btree() else {
|
||||
crate::bail_corrupt_error!("Parse error: no such table: {}", table_name);
|
||||
bail_parse_error!("Parse error: no such table: {}", table_name);
|
||||
};
|
||||
let cursor_id = program.alloc_cursor_id(
|
||||
Some(table_name.0.clone()),
|
||||
CursorType::BTreeTable(btree_table.clone()),
|
||||
);
|
||||
let root_page = btree_table.root_page;
|
||||
program.emit_insn(Insn::OpenWriteAsync {
|
||||
cursor_id,
|
||||
root_page,
|
||||
});
|
||||
program.emit_insn(Insn::OpenWriteAwait {});
|
||||
|
||||
let end_label = program.allocate_label();
|
||||
program.emit_insn(Insn::RewindAsync { cursor_id });
|
||||
program.emit_insn(Insn::RewindAwait {
|
||||
cursor_id,
|
||||
pc_if_empty: end_label,
|
||||
});
|
||||
let first_col_reg = program.alloc_registers(btree_table.columns.len());
|
||||
let referenced_tables = vec![TableReference {
|
||||
let mut iter_dir = None;
|
||||
if let Some(order_by) = body.order_by.as_ref() {
|
||||
if !order_by.is_empty() {
|
||||
if let Some(order) = order_by.first().unwrap().order {
|
||||
iter_dir = Some(match order {
|
||||
SortOrder::Asc => IterationDirection::Forwards,
|
||||
SortOrder::Desc => IterationDirection::Backwards,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
let table_references = vec![TableReference {
|
||||
table: Table::BTree(btree_table.clone()),
|
||||
identifier: table_name.0.clone(),
|
||||
op: Operation::Scan { iter_dir: None },
|
||||
op: Operation::Scan { iter_dir },
|
||||
join_info: None,
|
||||
}];
|
||||
let set_clauses = body
|
||||
.sets
|
||||
.iter_mut()
|
||||
.map(|set| {
|
||||
let ident = normalize_ident(set.col_names[0].0.as_str());
|
||||
let col_index = btree_table
|
||||
.columns
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_, col)| {
|
||||
col.name
|
||||
.as_ref()
|
||||
.unwrap_or(&String::new())
|
||||
.eq_ignore_ascii_case(&ident)
|
||||
})
|
||||
.map(|(i, _)| i)
|
||||
.unwrap();
|
||||
let _ = bind_column_references(&mut set.expr, &table_references, None);
|
||||
(col_index, set.expr.clone())
|
||||
})
|
||||
.collect::<Vec<(usize, Expr)>>();
|
||||
|
||||
// store the (col_index, Expr value) of each 'Set'
|
||||
// if a column declared here isn't found: error
|
||||
let mut update_idxs = Vec::with_capacity(body.sets.len());
|
||||
for s in body.sets.iter_mut() {
|
||||
let ident = normalize_ident(s.col_names[0].0.as_str());
|
||||
if let Some((i, _)) = btree_table.columns.iter().enumerate().find(|(_, col)| {
|
||||
col.name
|
||||
.as_ref()
|
||||
.unwrap_or(&String::new())
|
||||
.eq_ignore_ascii_case(&ident)
|
||||
}) {
|
||||
bind_column_references(&mut s.expr, &referenced_tables, None)?;
|
||||
update_idxs.push((i, &s.expr));
|
||||
} else {
|
||||
bail_parse_error!("column {} not found", ident);
|
||||
let mut where_clause = vec![];
|
||||
let mut result_columns = vec![];
|
||||
if let Some(returning) = &mut body.returning {
|
||||
for rc in returning.iter_mut() {
|
||||
if let ResultColumn::Expr(expr, alias) = rc {
|
||||
bind_column_references(expr, &table_references, None)?;
|
||||
result_columns.push(ResultSetColumn {
|
||||
expr: expr.clone(),
|
||||
alias: alias.as_ref().and_then(|a| {
|
||||
if let ast::As::As(name) = a {
|
||||
Some(name.to_string())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}),
|
||||
contains_aggregates: false,
|
||||
});
|
||||
} else {
|
||||
bail_parse_error!("Only expressions are allowed in RETURNING clause");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let loop_start = program.offset();
|
||||
let skip_label = program.allocate_label();
|
||||
if let Some(where_clause) = body.where_clause.as_mut() {
|
||||
bind_column_references(where_clause, &referenced_tables, None)?;
|
||||
translate_condition_expr(
|
||||
&mut program,
|
||||
&referenced_tables,
|
||||
where_clause,
|
||||
ConditionMetadata {
|
||||
jump_if_condition_is_true: false,
|
||||
jump_target_when_true: crate::vdbe::BranchOffset::Placeholder,
|
||||
jump_target_when_false: skip_label,
|
||||
},
|
||||
&resolver,
|
||||
)?;
|
||||
}
|
||||
let rowid_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::RowId {
|
||||
cursor_id,
|
||||
dest: rowid_reg,
|
||||
let order_by = body.order_by.as_ref().map(|order| {
|
||||
order
|
||||
.iter()
|
||||
.map(|o| {
|
||||
(
|
||||
o.expr.clone(),
|
||||
o.order
|
||||
.map(|s| match s {
|
||||
SortOrder::Asc => Direction::Ascending,
|
||||
SortOrder::Desc => Direction::Descending,
|
||||
})
|
||||
.unwrap_or(Direction::Ascending),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
});
|
||||
// if no rowid, we're done
|
||||
program.emit_insn(Insn::IsNull {
|
||||
reg: rowid_reg,
|
||||
target_pc: end_label,
|
||||
});
|
||||
|
||||
// we scan a column at a time, loading either the column's values, or the new value
|
||||
// from the Set expression, into registers so we can emit a MakeRecord and update the row.
|
||||
for idx in 0..btree_table.columns.len() {
|
||||
if let Some((idx, expr)) = update_idxs.iter().find(|(i, _)| *i == idx) {
|
||||
let target_reg = first_col_reg + idx;
|
||||
translate_expr(
|
||||
&mut program,
|
||||
Some(&referenced_tables),
|
||||
expr,
|
||||
target_reg,
|
||||
&resolver,
|
||||
)?;
|
||||
} else {
|
||||
program.emit_insn(Insn::Column {
|
||||
cursor_id,
|
||||
column: idx,
|
||||
dest: first_col_reg + idx,
|
||||
});
|
||||
}
|
||||
}
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: first_col_reg,
|
||||
count: btree_table.columns.len(),
|
||||
dest_reg: record_reg,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::InsertAsync {
|
||||
cursor: cursor_id,
|
||||
key_reg: rowid_reg,
|
||||
record_reg,
|
||||
flag: 0,
|
||||
});
|
||||
program.emit_insn(Insn::InsertAwait { cursor_id });
|
||||
|
||||
// label for false `WHERE` clause: proceed to next row
|
||||
program.resolve_label(skip_label, program.offset());
|
||||
program.emit_insn(Insn::NextAsync { cursor_id });
|
||||
program.emit_insn(Insn::NextAwait {
|
||||
cursor_id,
|
||||
pc_if_next: loop_start,
|
||||
});
|
||||
|
||||
// cleanup/halt
|
||||
program.resolve_label(end_label, program.offset());
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: 0,
|
||||
description: String::new(),
|
||||
});
|
||||
program.resolve_label(init_label, program.offset());
|
||||
program.emit_insn(Insn::Transaction { write: true });
|
||||
|
||||
program.emit_constant_insns();
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: start_offset,
|
||||
});
|
||||
program.table_references = referenced_tables.clone();
|
||||
Ok(program)
|
||||
parse_where(
|
||||
body.where_clause.as_ref().map(|w| *w.clone()),
|
||||
&table_references,
|
||||
Some(&result_columns),
|
||||
&mut where_clause,
|
||||
)?;
|
||||
let limit = if let Some(Ok((_, limit))) = body.limit.as_ref().map(|l| parse_limit(*l.clone())) {
|
||||
limit
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(Plan::Update(UpdatePlan {
|
||||
table_references,
|
||||
set_clauses,
|
||||
where_clause,
|
||||
returning: Some(result_columns),
|
||||
order_by,
|
||||
limit,
|
||||
contains_constant_false_condition: false,
|
||||
}))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user