support using a INSERT SELECT that references the same table in both statements

This commit is contained in:
pedrocarlo
2025-05-25 00:02:12 -03:00
parent 90e3c8483d
commit e3fd1e589e
3 changed files with 251 additions and 57 deletions

View File

@@ -9,6 +9,7 @@ use crate::schema::{IndexColumn, Table};
use crate::util::normalize_ident;
use crate::vdbe::builder::{ProgramBuilderOpts, QueryMode};
use crate::vdbe::insn::{IdxInsertFlags, RegisterOrLiteral};
use crate::vdbe::BranchOffset;
use crate::{
schema::{Column, Schema},
vdbe::{
@@ -24,6 +25,12 @@ use super::optimizer::rewrite_expr;
use super::plan::QueryDestination;
use super::select::translate_select;
struct TempTableCtx {
cursor_id: usize,
loop_start_label: BranchOffset,
loop_end_label: BranchOffset,
}
#[allow(clippy::too_many_arguments)]
pub fn translate_insert(
query_mode: QueryMode,
@@ -79,26 +86,6 @@ pub fn translate_insert(
crate::bail_parse_error!("INSERT into WITHOUT ROWID table is not supported");
}
let cursor_id = program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeTable(btree_table.clone()),
);
// allocate cursor id's for each btree index cursor we'll need to populate the indexes
// (idx name, root_page, idx cursor id)
let idx_cursors = schema
.get_indices(&table_name.0)
.iter()
.map(|idx| {
(
&idx.name,
idx.root_page,
program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeIndex(idx.clone()),
),
)
})
.collect::<Vec<(&String, usize, usize)>>();
let root_page = btree_table.root_page;
let mut values: Option<Vec<Expr>> = None;
@@ -125,13 +112,20 @@ pub fn translate_insert(
let loop_start_label = program.allocate_label();
let mut yield_reg_opt = None;
let num_values = match body {
let mut temp_table_ctx = None;
let (num_values, cursor_id) = match body {
// TODO: upsert
InsertBody::Select(select, _) => {
// Simple Common case of INSERT INTO <table> VALUES (...)
if matches!(select.body.select.as_ref(), OneSelect::Values(values) if values.len() <= 1)
{
values.as_ref().unwrap().len()
(
values.as_ref().unwrap().len(),
program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeTable(btree_table.clone()),
),
)
} else {
// Multiple rows - use coroutine for value population
let yield_reg = program.alloc_register();
@@ -164,28 +158,132 @@ pub fn translate_insert(
program.emit_insn(Insn::EndCoroutine { yield_reg });
program.preassign_label_to_next_insn(jump_on_definition_label);
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(root_page),
name: table_name.0.clone(),
});
// Have to allocate the cursor here to avoid having `init_loop` inside `translate_select` selecting the incorrect
// cursor_id
let cursor_id = program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeTable(btree_table.clone()),
);
// Main loop
// FIXME: rollback is not implemented. E.g. if you insert 2 rows and one fails to unique constraint violation,
// the other row will still be inserted.
program.resolve_label(loop_start_label, program.offset());
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: halt_label,
});
// From SQLite
/* Set useTempTable to TRUE if the result of the SELECT statement
** should be written into a temporary table (template 4). Set to
** FALSE if each output row of the SELECT can be written directly into
** the destination table (template 3).
**
** A temp table must be used if the table being updated is also one
** of the tables being read by the SELECT statement. Also use a
** temp table in the case of row triggers.
*/
if program.is_table_open(&table, schema) {
let temp_cursor_id = program.alloc_cursor_id(
Some("temp table".to_string()),
CursorType::BTreeTable(btree_table.clone()),
);
temp_table_ctx = Some(TempTableCtx {
cursor_id: temp_cursor_id,
loop_start_label: program.allocate_label(),
loop_end_label: program.allocate_label(),
});
program.emit_insn(Insn::OpenEphemeral {
cursor_id: temp_cursor_id,
is_table: true,
});
// Main loop
// FIXME: rollback is not implemented. E.g. if you insert 2 rows and one fails to unique constraint violation,
// the other row will still be inserted.
program.preassign_label_to_next_insn(loop_start_label);
let yield_label = program.allocate_label();
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: yield_label,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: yield_reg + 1,
count: result.num_result_cols,
dest_reg: record_reg,
index_name: None,
});
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: temp_cursor_id,
rowid_reg,
prev_largest_reg: 0,
});
program.emit_insn(Insn::Insert {
cursor: temp_cursor_id,
key_reg: rowid_reg,
record_reg,
flag: 0,
table_name: "".to_string(),
});
// loop back
program.emit_insn(Insn::Goto {
target_pc: loop_start_label,
});
program.preassign_label_to_next_insn(yield_label);
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(root_page),
name: table_name.0.clone(),
});
} else {
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(root_page),
name: table_name.0.clone(),
});
// Main loop
// FIXME: rollback is not implemented. E.g. if you insert 2 rows and one fails to unique constraint violation,
// the other row will still be inserted.
program.preassign_label_to_next_insn(loop_start_label);
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: halt_label,
});
}
yield_reg_opt = Some(yield_reg);
result.num_result_cols
(result.num_result_cols, cursor_id)
}
}
InsertBody::DefaultValues => 0,
InsertBody::DefaultValues => (
0,
program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeTable(btree_table.clone()),
),
),
};
// allocate cursor id's for each btree index cursor we'll need to populate the indexes
// (idx name, root_page, idx cursor id)
let idx_cursors = schema
.get_indices(&table_name.0)
.iter()
.map(|idx| {
(
&idx.name,
idx.root_page,
program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeIndex(idx.clone()),
),
)
})
.collect::<Vec<(&String, usize, usize)>>();
let column_mappings = resolve_columns_for_insert(&table, &columns, num_values)?;
// Check if rowid was provided (through INTEGER PRIMARY KEY as a rowid alias)
let rowid_alias_index = btree_table.columns.iter().position(|c| c.is_rowid_alias);
@@ -214,12 +312,21 @@ pub fn translate_insert(
let record_register = program.alloc_register();
if inserting_multiple_rows {
if let Some(ref temp_table_ctx) = temp_table_ctx {
// Rewind loop to read from ephemeral table
program.emit_insn(Insn::Rewind {
cursor_id: temp_table_ctx.cursor_id,
pc_if_empty: temp_table_ctx.loop_end_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_start_label);
}
populate_columns_multiple_rows(
&mut program,
&column_mappings,
column_registers_start,
yield_reg_opt.unwrap() + 1,
&resolver,
&temp_table_ctx,
)?;
} else {
// Single row - populate registers directly
@@ -433,10 +540,22 @@ pub fn translate_insert(
});
if inserting_multiple_rows {
// For multiple rows, loop back
program.emit_insn(Insn::Goto {
target_pc: loop_start_label,
});
if let Some(temp_table_ctx) = temp_table_ctx {
program.emit_insn(Insn::Next {
cursor_id: temp_table_ctx.cursor_id,
pc_if_next: temp_table_ctx.loop_start_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_end_label);
program.emit_insn(Insn::Close {
cursor_id: temp_table_ctx.cursor_id,
});
} else {
// For multiple rows which not require a temp table, loop back
program.emit_insn(Insn::Goto {
target_pc: loop_start_label,
});
}
}
program.resolve_label(halt_label, program.offset());
@@ -605,24 +724,33 @@ fn populate_columns_multiple_rows(
column_registers_start: usize,
yield_reg: usize,
resolver: &Resolver,
temp_table_ctx: &Option<TempTableCtx>,
) -> Result<()> {
let mut value_index_seen = 0;
let mut other_values_seen = 0;
for (i, mapping) in column_mappings.iter().enumerate() {
let target_reg = column_registers_start + i;
if let Some(value_index) = mapping.value_index {
program.emit_insn(Insn::Copy {
src_reg: yield_reg + value_index_seen,
dst_reg: column_registers_start + value_index + other_values_seen,
amount: 0,
});
value_index_seen += 1;
continue;
}
other_values_seen += 1;
if mapping.column.is_rowid_alias {
if let Some(value_index) = mapping.value_index {
// Decrement as we have now seen a value index instead
other_values_seen -= 1;
if let Some(temp_table_ctx) = temp_table_ctx {
program.emit_insn(Insn::Column {
cursor_id: temp_table_ctx.cursor_id,
column: value_index_seen,
dest: column_registers_start + value_index_seen,
});
} else {
program.emit_insn(Insn::Copy {
src_reg: yield_reg + value_index_seen,
dst_reg: column_registers_start + value_index + other_values_seen,
amount: 0,
});
}
value_index_seen += 1;
} else if mapping.column.is_rowid_alias {
program.emit_insn(Insn::SoftNull { reg: target_reg });
} else if let Some(default_expr) = mapping.default_value {
translate_expr(program, None, default_expr, target_reg, resolver)?;
@@ -662,8 +790,7 @@ fn populate_column_registers(
// Column has a value in the VALUES tuple
if let Some(value_index) = mapping.value_index {
// When inserting a single row, SQLite writes the value provided for the rowid alias column (INTEGER PRIMARY KEY)
// directly into the rowid register and writes a NULL into the rowid alias column. Not sure why this only happens
// in the single row case, but let's copy it.
// directly into the rowid register and writes a NULL into the rowid alias column.
let write_directly_to_rowid_reg = mapping.column.is_rowid_alias;
let reg = if write_directly_to_rowid_reg {
rowid_reg

View File

@@ -10,7 +10,7 @@ use limbo_sqlite3_parser::ast::{self, TableInternalId};
use crate::{
fast_lock::SpinLock,
parameters::Parameters,
schema::{BTreeTable, Index, PseudoTable},
schema::{BTreeTable, Index, PseudoTable, Schema, Table},
storage::sqlite3_ondisk::DatabaseHeader,
translate::{
collate::CollationSeq,
@@ -37,7 +37,10 @@ impl TableRefIdCounter {
}
}
use super::{BranchOffset, CursorID, Insn, InsnFunction, InsnReference, JumpTarget, Program};
use super::{
insn::RegisterOrLiteral, BranchOffset, CursorID, Insn, InsnFunction, InsnReference, JumpTarget,
Program,
};
#[allow(dead_code)]
pub struct ProgramBuilder {
pub table_reference_counter: TableRefIdCounter,
@@ -682,6 +685,66 @@ impl ProgramBuilder {
}
}
/// Checks whether `table` or any of its indices has been opened in the program
pub fn is_table_open(&self, table: &Table, schema: &Schema) -> bool {
let btree = table.btree();
let vtab = table.virtual_table();
for (insn, ..) in self.insns.iter() {
match insn {
Insn::OpenRead {
cursor_id,
root_page,
..
} => {
if let Some(btree) = &btree {
if btree.root_page == *root_page {
return true;
}
}
let name = self.cursor_ref[*cursor_id].0.as_ref();
if name.is_none() {
continue;
}
let name = name.unwrap();
let indices = schema.get_indices(name);
for index in indices {
if index.root_page == *root_page {
return true;
}
}
}
Insn::OpenWrite {
root_page, name, ..
} => {
let RegisterOrLiteral::Literal(root_page) = root_page else {
unreachable!("root page can only be a literal");
};
if let Some(btree) = &btree {
if btree.root_page == *root_page {
return true;
}
}
let indices = schema.get_indices(name);
for index in indices {
if index.root_page == *root_page {
return true;
}
}
}
Insn::VOpen { cursor_id, .. } => {
if let Some(vtab) = &vtab {
let name = self.cursor_ref[*cursor_id].0.as_ref().unwrap();
if vtab.name == *name {
return true;
}
}
}
_ => {}
}
}
false
}
pub fn build(
mut self,
database_header: Arc<SpinLock<DatabaseHeader>>,

View File

@@ -370,7 +370,11 @@ pub fn insn_to_str(
0,
Value::build_text(""),
0,
"".to_string(),
program.cursor_ref[*cursor_id]
.0
.as_ref()
.unwrap()
.to_string(),
),
Insn::VCreate {
table_name,