Merge 'Refactor INSERT translation to a modular setup with emitter context' from Preston Thorpe

This PR contains NO semantic changes at all, this simply refactors
existing INSERT code to be easier to reason about.
Very sorry I know I've been working on `INSERT OR IGNORE|REPLACE|etc..`
for days now but the insert translation was literally unbearable and I
got it working but I barely could wrap my head around that whole
`translate_insert` function, so I spent a bunch of time refactoring the
whole INSERT handling out into different "Plans"...  which turned into a
whole different clusterf***... So I just went back and made the existing
insert emission more modular and created some context that can make it
easier to reason about.
This should be able to just be merged quickly

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3731
This commit is contained in:
Preston Thorpe
2025-10-14 16:10:43 -04:00
committed by GitHub
3 changed files with 974 additions and 750 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -284,17 +284,21 @@ pub fn translate_inner(
columns,
body,
returning,
} => translate_insert(
with,
resolver,
or_conflict,
tbl_name,
columns,
body,
returning,
program,
connection,
)?,
} => {
if with.is_some() {
crate::bail_parse_error!("WITH clause is not supported");
}
translate_insert(
resolver,
or_conflict,
tbl_name,
columns,
body,
returning,
program,
connection,
)?
}
};
// Indicate write operations so that in the epilogue we can emit the correct type of transaction

View File

@@ -8,7 +8,7 @@ use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::schema::ROWID_SENTINEL;
use crate::translate::expr::{walk_expr, WalkControl};
use crate::translate::fkeys::{emit_fk_child_update_counters, emit_parent_pk_change_checks};
use crate::translate::insert::format_unique_violation_desc;
use crate::translate::insert::{format_unique_violation_desc, InsertEmitCtx};
use crate::translate::planner::ROWID_STRS;
use crate::vdbe::insn::CmpInsFlags;
use crate::Connection;
@@ -31,7 +31,6 @@ use crate::{
vdbe::{
builder::ProgramBuilder,
insn::{IdxInsertFlags, InsertFlags, Insn},
BranchOffset,
},
};
@@ -339,35 +338,31 @@ pub fn resolve_upsert_target(
pub fn emit_upsert(
program: &mut ProgramBuilder,
table: &Table,
ctx: &InsertEmitCtx,
insertion: &Insertion,
tbl_cursor_id: usize,
conflict_rowid_reg: usize,
set_pairs: &mut [(usize, Box<ast::Expr>)],
where_clause: &mut Option<Box<ast::Expr>>,
resolver: &Resolver,
idx_cursors: &[(&String, i64, usize)],
returning: &mut [ResultSetColumn],
cdc_cursor_id: Option<usize>,
row_done_label: BranchOffset,
connection: &Arc<Connection>,
) -> crate::Result<()> {
// Seek & snapshot CURRENT
program.emit_insn(Insn::SeekRowid {
cursor_id: tbl_cursor_id,
src_reg: conflict_rowid_reg,
target_pc: row_done_label,
cursor_id: ctx.cursor_id,
src_reg: ctx.conflict_rowid_reg,
target_pc: ctx.row_done_label,
});
let num_cols = table.columns().len();
let num_cols = ctx.table.columns.len();
let current_start = program.alloc_registers(num_cols);
for (i, col) in table.columns().iter().enumerate() {
for (i, col) in ctx.table.columns.iter().enumerate() {
if col.is_rowid_alias {
program.emit_insn(Insn::RowId {
cursor_id: tbl_cursor_id,
cursor_id: ctx.cursor_id,
dest: current_start + i,
});
} else {
program.emit_insn(Insn::Column {
cursor_id: tbl_cursor_id,
cursor_id: ctx.cursor_id,
column: i,
dest: current_start + i,
default: None,
@@ -376,7 +371,7 @@ pub fn emit_upsert(
}
// BEFORE for index maintenance / CDC
let before_start = if cdc_cursor_id.is_some() || !idx_cursors.is_empty() {
let before_start = if ctx.cdc_table.is_some() || !ctx.idx_cursors.is_empty() {
let s = program.alloc_registers(num_cols);
program.emit_insn(Insn::Copy {
src_reg: current_start,
@@ -402,7 +397,7 @@ pub fn emit_upsert(
pred,
table,
current_start,
conflict_rowid_reg,
ctx.conflict_rowid_reg,
Some(table.get_name()),
Some(insertion),
true,
@@ -411,7 +406,7 @@ pub fn emit_upsert(
translate_expr(program, None, pred, pr, resolver)?;
program.emit_insn(Insn::IfNot {
reg: pr,
target_pc: row_done_label,
target_pc: ctx.row_done_label,
jump_if_null: true,
});
}
@@ -423,7 +418,7 @@ pub fn emit_upsert(
expr,
table,
current_start,
conflict_rowid_reg,
ctx.conflict_rowid_reg,
Some(table.get_name()),
Some(insertion),
true,
@@ -480,13 +475,13 @@ pub fn emit_upsert(
};
let rowid_set_clause_reg = if has_user_provided_rowid {
Some(new_rowid_reg.unwrap_or(conflict_rowid_reg))
Some(new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg))
} else {
None
};
if let Some(bt) = table.btree() {
if connection.foreign_keys_enabled() {
let rowid_new_reg = new_rowid_reg.unwrap_or(conflict_rowid_reg);
let rowid_new_reg = new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg);
// Child-side checks
if resolver.schema.has_child_fks(bt.name.as_str()) {
@@ -495,7 +490,7 @@ pub fn emit_upsert(
resolver,
&bt,
table.get_name(),
tbl_cursor_id,
ctx.cursor_id,
new_start,
rowid_new_reg,
&changed_cols,
@@ -505,10 +500,10 @@ pub fn emit_upsert(
program,
resolver,
&bt,
tbl_cursor_id,
conflict_rowid_reg,
ctx.cursor_id,
ctx.conflict_rowid_reg,
new_start,
new_rowid_reg.unwrap_or(conflict_rowid_reg),
new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg),
rowid_set_clause_reg,
set_pairs,
)?;
@@ -517,7 +512,7 @@ pub fn emit_upsert(
// Index rebuild (DELETE old, INSERT new), honoring partial-index WHEREs
if let Some(before) = before_start {
for (idx_name, _root, idx_cid) in idx_cursors {
for (idx_name, _root, idx_cid) in &ctx.idx_cursors {
let idx_meta = resolver
.schema
.get_index(table.get_name(), idx_name)
@@ -533,10 +528,10 @@ pub fn emit_upsert(
table,
idx_meta,
before,
conflict_rowid_reg,
ctx.conflict_rowid_reg,
resolver,
);
let new_rowid = new_rowid_reg.unwrap_or(conflict_rowid_reg);
let new_rowid = new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg);
let new_pred_reg = eval_partial_pred_for_row_image(
program, table, idx_meta, new_start, new_rowid, resolver,
);
@@ -563,7 +558,7 @@ pub fn emit_upsert(
});
}
program.emit_insn(Insn::Copy {
src_reg: conflict_rowid_reg,
src_reg: ctx.conflict_rowid_reg,
dst_reg: del + k,
extra_amount: 0,
});
@@ -694,7 +689,7 @@ pub fn emit_upsert(
// If equal to old rowid, skip uniqueness probe
program.emit_insn(Insn::Eq {
lhs: rnew,
rhs: conflict_rowid_reg,
rhs: ctx.conflict_rowid_reg,
target_pc: ok,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
@@ -702,7 +697,7 @@ pub fn emit_upsert(
// If another row already has rnew -> constraint
program.emit_insn(Insn::NotExists {
cursor: tbl_cursor_id,
cursor: ctx.cursor_id,
rowid_reg: rnew,
target_pc: ok,
});
@@ -723,11 +718,11 @@ pub fn emit_upsert(
// Now replace the row
program.emit_insn(Insn::Delete {
cursor_id: tbl_cursor_id,
cursor_id: ctx.cursor_id,
table_name: table.get_name().to_string(),
});
program.emit_insn(Insn::Insert {
cursor: tbl_cursor_id,
cursor: ctx.cursor_id,
key_reg: rnew,
record_reg: rec,
flag: InsertFlags::new().require_seek().update_rowid_change(),
@@ -735,8 +730,8 @@ pub fn emit_upsert(
});
} else {
program.emit_insn(Insn::Insert {
cursor: tbl_cursor_id,
key_reg: conflict_rowid_reg,
cursor: ctx.cursor_id,
key_reg: ctx.conflict_rowid_reg,
record_reg: rec,
flag: InsertFlags::new(),
table_name: table.get_name().to_string(),
@@ -744,16 +739,16 @@ pub fn emit_upsert(
}
// emit CDC instructions
if let Some(cdc_id) = cdc_cursor_id {
let new_rowid = new_rowid_reg.unwrap_or(conflict_rowid_reg);
if let Some((cdc_id, _)) = ctx.cdc_table {
let new_rowid = new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg);
if new_rowid_reg.is_some() {
// DELETE (before)
let before_rec = if program.capture_data_changes_mode().has_before() {
Some(emit_cdc_full_record(
program,
table.columns(),
tbl_cursor_id,
conflict_rowid_reg,
ctx.cursor_id,
ctx.conflict_rowid_reg,
))
} else {
None
@@ -763,7 +758,7 @@ pub fn emit_upsert(
resolver,
OperationMode::DELETE,
cdc_id,
conflict_rowid_reg,
ctx.conflict_rowid_reg,
before_rec,
None,
None,
@@ -796,7 +791,7 @@ pub fn emit_upsert(
table,
new_start,
rec,
conflict_rowid_reg,
ctx.conflict_rowid_reg,
))
} else {
None
@@ -805,8 +800,8 @@ pub fn emit_upsert(
Some(emit_cdc_full_record(
program,
table.columns(),
tbl_cursor_id,
conflict_rowid_reg,
ctx.cursor_id,
ctx.conflict_rowid_reg,
))
} else {
None
@@ -816,7 +811,7 @@ pub fn emit_upsert(
resolver,
OperationMode::UPDATE,
cdc_id,
conflict_rowid_reg,
ctx.conflict_rowid_reg,
before_rec,
after_rec,
None,
@@ -828,7 +823,7 @@ pub fn emit_upsert(
// RETURNING from NEW image + final rowid
if !returning.is_empty() {
let regs = ReturningValueRegisters {
rowid_register: new_rowid_reg.unwrap_or(conflict_rowid_reg),
rowid_register: new_rowid_reg.unwrap_or(ctx.conflict_rowid_reg),
columns_start_register: new_start,
num_columns: num_cols,
};
@@ -836,7 +831,7 @@ pub fn emit_upsert(
}
program.emit_insn(Insn::Goto {
target_pc: row_done_label,
target_pc: ctx.row_done_label,
});
Ok(())
}