mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 08:55:40 +01:00
Implement INSERT OR REPLACE translation/emission
This commit is contained in:
@@ -9,15 +9,16 @@ use crate::error::{
|
||||
};
|
||||
use crate::schema::{self, BTreeTable, ColDef, Index, ResolvedFkRef, Table};
|
||||
use crate::translate::emitter::{
|
||||
emit_cdc_insns, emit_cdc_patch_record, prepare_cdc_if_necessary, OperationMode,
|
||||
emit_cdc_full_record, emit_cdc_insns, emit_cdc_patch_record, emit_fk_child_decrement_on_delete,
|
||||
prepare_cdc_if_necessary, OperationMode,
|
||||
};
|
||||
use crate::translate::expr::{
|
||||
bind_and_rewrite_expr, emit_returning_results, process_returning_clause, walk_expr_mut,
|
||||
BindingBehavior, WalkControl,
|
||||
};
|
||||
use crate::translate::fkeys::{
|
||||
build_index_affinity_string, emit_fk_violation, emit_guarded_fk_decrement, index_probe,
|
||||
open_read_index, open_read_table,
|
||||
build_index_affinity_string, emit_fk_delete_parent_existence_checks, emit_fk_violation,
|
||||
emit_guarded_fk_decrement, index_probe, open_read_index, open_read_table,
|
||||
};
|
||||
use crate::translate::plan::{
|
||||
ColumnUsedMask, JoinedTable, Operation, ResultSetColumn, TableReferences,
|
||||
@@ -126,7 +127,6 @@ pub struct InsertEmitCtx<'a> {
|
||||
pub key_generation_label: BranchOffset,
|
||||
/// Jump here when the insert value SELECT source has been fully exhausted
|
||||
pub select_exhausted_label: Option<BranchOffset>,
|
||||
|
||||
/// CDC table info
|
||||
pub cdc_table: Option<(usize, Arc<BTreeTable>)>,
|
||||
/// Autoincrement sequence table info
|
||||
@@ -368,9 +368,10 @@ pub fn translate_insert(
|
||||
&insertion,
|
||||
&upsert_actions,
|
||||
&constraints,
|
||||
connection,
|
||||
)?;
|
||||
|
||||
emit_notnulls(&mut program, &ctx, &insertion);
|
||||
emit_notnulls(&mut program, &ctx, &insertion, resolver)?;
|
||||
|
||||
// Create and insert the record
|
||||
let affinity_str = insertion
|
||||
@@ -401,11 +402,15 @@ pub fn translate_insert(
|
||||
)?;
|
||||
}
|
||||
|
||||
let mut insert_flags = InsertFlags::new();
|
||||
if matches!(ctx.on_conflict, ResolveType::Replace) {
|
||||
insert_flags = insert_flags.require_seek();
|
||||
}
|
||||
program.emit_insn(Insn::Insert {
|
||||
cursor: ctx.cursor_id,
|
||||
key_reg: insertion.key_register(),
|
||||
record_reg: insertion.record_register(),
|
||||
flag: InsertFlags::new(),
|
||||
flag: insert_flags,
|
||||
table_name: table_name.to_string(),
|
||||
});
|
||||
|
||||
@@ -876,7 +881,13 @@ fn init_autoincrement(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn emit_notnulls(program: &mut ProgramBuilder, ctx: &InsertEmitCtx, insertion: &Insertion) {
|
||||
fn emit_notnulls(
|
||||
program: &mut ProgramBuilder,
|
||||
ctx: &InsertEmitCtx,
|
||||
insertion: &Insertion,
|
||||
resolver: &Resolver,
|
||||
) -> Result<()> {
|
||||
let on_replace = matches!(ctx.on_conflict, ResolveType::Replace);
|
||||
for column_mapping in insertion
|
||||
.col_mappings
|
||||
.iter()
|
||||
@@ -886,6 +897,32 @@ fn emit_notnulls(program: &mut ProgramBuilder, ctx: &InsertEmitCtx, insertion: &
|
||||
if column_mapping.column.is_rowid_alias() {
|
||||
continue;
|
||||
}
|
||||
if on_replace {
|
||||
if let Some(default_expr) = column_mapping.column.default.as_ref() {
|
||||
// OR REPLACE + NOT NULL + DEFAULT:
|
||||
// if reg IS NOT NULL -> skip
|
||||
// if reg IS NULL -> evaluate DEFAULT into this register
|
||||
let ok = program.allocate_label();
|
||||
|
||||
program.emit_insn(Insn::NotNull {
|
||||
reg: column_mapping.register,
|
||||
target_pc: ok,
|
||||
});
|
||||
|
||||
// At this point, value is NULL: overwrite with default expression
|
||||
translate_expr(
|
||||
program,
|
||||
None,
|
||||
default_expr,
|
||||
column_mapping.register,
|
||||
resolver,
|
||||
)?;
|
||||
|
||||
program.preassign_label_to_next_insn(ok);
|
||||
continue;
|
||||
}
|
||||
// OR REPLACE but no DEFAULT: fall through to ABORT behavior below
|
||||
}
|
||||
program.emit_insn(Insn::HaltIfNull {
|
||||
target_reg: column_mapping.register,
|
||||
err_code: SQLITE_CONSTRAINT_NOTNULL,
|
||||
@@ -913,6 +950,7 @@ fn emit_notnulls(program: &mut ProgramBuilder, ctx: &InsertEmitCtx, insertion: &
|
||||
},
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct BoundInsertResult {
|
||||
@@ -1005,6 +1043,7 @@ fn bind_insert(
|
||||
ResolveType::Abort => {
|
||||
// This is the default conflict resolution strategy for INSERT in SQLite.
|
||||
}
|
||||
ResolveType::Replace => {}
|
||||
_ => {
|
||||
crate::bail_parse_error!(
|
||||
"INSERT OR {} is only supported with UPSERT",
|
||||
@@ -1683,7 +1722,9 @@ fn emit_preflight_constraint_checks(
|
||||
insertion: &Insertion,
|
||||
upsert_actions: &[(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)],
|
||||
constraints: &ConstraintsToCheck,
|
||||
connection: &Arc<Connection>,
|
||||
) -> Result<()> {
|
||||
let on_replace = matches!(ctx.on_conflict, ResolveType::Replace) && upsert_actions.is_empty();
|
||||
for (constraint, position) in &constraints.constraints_to_check {
|
||||
match constraint {
|
||||
ResolvedUpsertTarget::PrimaryKey => {
|
||||
@@ -1698,6 +1739,24 @@ fn emit_preflight_constraint_checks(
|
||||
// Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint.
|
||||
// emit Halt for every case *except* when upsert handles the conflict
|
||||
'emit_halt: {
|
||||
if on_replace {
|
||||
// copy the conflicting rowid into the key register and delete the existing row inline
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: insertion.key_register(),
|
||||
dst_reg: ctx.conflict_rowid_reg,
|
||||
extra_amount: 0,
|
||||
});
|
||||
emit_replace_delete_conflicting_row(
|
||||
program,
|
||||
resolver,
|
||||
connection,
|
||||
ctx,
|
||||
)?;
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: make_record_label,
|
||||
});
|
||||
break 'emit_halt;
|
||||
}
|
||||
if let Some(position) = position.or(constraints.upsert_catch_all_position) {
|
||||
// PK conflict: the conflicting rowid is exactly the attempted key
|
||||
program.emit_insn(Insn::Copy {
|
||||
@@ -1804,7 +1863,6 @@ fn emit_preflight_constraint_checks(
|
||||
record_reg: idx_start_reg,
|
||||
num_regs: num_cols,
|
||||
});
|
||||
|
||||
// Conflict detected, figure out if this UPSERT handles the conflict
|
||||
if let Some(position) = position.or(constraints.upsert_catch_all_position) {
|
||||
match &upsert_actions[position].2.do_clause {
|
||||
@@ -1847,14 +1905,28 @@ fn emit_preflight_constraint_checks(
|
||||
record_reg: idx_start_reg,
|
||||
num_regs: num_cols,
|
||||
});
|
||||
// Unique violation without ON CONFLICT clause -> error
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: SQLITE_CONSTRAINT_UNIQUE,
|
||||
description: format_unique_violation_desc(
|
||||
ctx.table.name.as_str(),
|
||||
index,
|
||||
),
|
||||
});
|
||||
if on_replace {
|
||||
program.emit_insn(Insn::IdxRowId {
|
||||
cursor_id: idx_cursor_id,
|
||||
dest: ctx.conflict_rowid_reg,
|
||||
});
|
||||
emit_replace_delete_conflicting_row(
|
||||
program,
|
||||
resolver,
|
||||
connection,
|
||||
ctx,
|
||||
)?;
|
||||
program.emit_insn(Insn::Goto { target_pc: ok });
|
||||
} else {
|
||||
// Unique violation without ON CONFLICT clause -> error
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: SQLITE_CONSTRAINT_UNIQUE,
|
||||
description: format_unique_violation_desc(
|
||||
ctx.table.name.as_str(),
|
||||
index,
|
||||
),
|
||||
});
|
||||
}
|
||||
program.preassign_label_to_next_insn(ok);
|
||||
|
||||
// In the non-UPSERT case, we insert the index
|
||||
@@ -2258,6 +2330,150 @@ fn emit_update_sqlite_sequence(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn emit_replace_delete_conflicting_row(
|
||||
program: &mut ProgramBuilder,
|
||||
resolver: &Resolver,
|
||||
connection: &Arc<Connection>,
|
||||
ctx: &InsertEmitCtx,
|
||||
) -> Result<()> {
|
||||
program.emit_insn(Insn::SeekRowid {
|
||||
cursor_id: ctx.cursor_id,
|
||||
src_reg: ctx.conflict_rowid_reg,
|
||||
target_pc: ctx.halt_label,
|
||||
});
|
||||
|
||||
emit_delete_single_row(
|
||||
connection,
|
||||
program,
|
||||
ctx,
|
||||
resolver,
|
||||
ctx.conflict_rowid_reg,
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
fn emit_delete_single_row(
|
||||
connection: &Arc<Connection>,
|
||||
program: &mut ProgramBuilder,
|
||||
ctx: &InsertEmitCtx,
|
||||
resolver: &Resolver,
|
||||
rowid_reg: usize,
|
||||
is_part_of_update: bool,
|
||||
) -> Result<()> {
|
||||
let table = &ctx.table;
|
||||
let table_name = table.name.as_str();
|
||||
let main_cursor_id = ctx.cursor_id;
|
||||
|
||||
if connection.foreign_keys_enabled() {
|
||||
if resolver.schema.any_resolved_fks_referencing(table_name) {
|
||||
emit_fk_delete_parent_existence_checks(
|
||||
program,
|
||||
resolver,
|
||||
table_name,
|
||||
main_cursor_id,
|
||||
rowid_reg,
|
||||
)?;
|
||||
}
|
||||
if resolver.schema.has_child_fks(table_name) {
|
||||
emit_fk_child_decrement_on_delete(
|
||||
program,
|
||||
resolver,
|
||||
table,
|
||||
table_name,
|
||||
main_cursor_id,
|
||||
rowid_reg,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
for (name, _, index_cursor_id) in ctx.idx_cursors.iter() {
|
||||
let index = resolver
|
||||
.schema
|
||||
.get_index(table_name, name)
|
||||
.expect("index to exist");
|
||||
let skip_delete_label = if index.where_clause.is_some() {
|
||||
let where_copy = index
|
||||
.bind_where_expr(None, connection)
|
||||
.expect("where clause to exist");
|
||||
let skip_label = program.allocate_label();
|
||||
let reg = program.alloc_register();
|
||||
translate_expr_no_constant_opt(
|
||||
program,
|
||||
None,
|
||||
&where_copy,
|
||||
reg,
|
||||
resolver,
|
||||
NoConstantOptReason::RegisterReuse,
|
||||
)?;
|
||||
program.emit_insn(Insn::IfNot {
|
||||
reg,
|
||||
jump_if_null: true,
|
||||
target_pc: skip_label,
|
||||
});
|
||||
Some(skip_label)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let num_regs = index.columns.len() + 1;
|
||||
let start_reg = program.alloc_registers(num_regs);
|
||||
|
||||
for (reg_offset, column_index) in index.columns.iter().enumerate() {
|
||||
program.emit_column_or_rowid(
|
||||
main_cursor_id,
|
||||
column_index.pos_in_table,
|
||||
start_reg + reg_offset,
|
||||
);
|
||||
}
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: rowid_reg,
|
||||
dst_reg: start_reg + num_regs - 1,
|
||||
extra_amount: 0,
|
||||
});
|
||||
program.emit_insn(Insn::IdxDelete {
|
||||
start_reg,
|
||||
num_regs,
|
||||
cursor_id: *index_cursor_id,
|
||||
raise_error_if_no_matching_entry: index.where_clause.is_none(),
|
||||
});
|
||||
|
||||
if let Some(label) = skip_delete_label {
|
||||
program.resolve_label(label, program.offset());
|
||||
}
|
||||
}
|
||||
|
||||
// CDC BEFORE, using rowid_reg
|
||||
if let Some(cdc_cursor_id) = ctx.cdc_table.as_ref().map(|(id, _tbl)| *id) {
|
||||
let cdc_has_before = program.capture_data_changes_mode().has_before();
|
||||
let before_record_reg = if cdc_has_before {
|
||||
Some(emit_cdc_full_record(
|
||||
program,
|
||||
&table.columns,
|
||||
main_cursor_id,
|
||||
rowid_reg,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
emit_cdc_insns(
|
||||
program,
|
||||
resolver,
|
||||
OperationMode::DELETE,
|
||||
cdc_cursor_id,
|
||||
rowid_reg,
|
||||
before_record_reg,
|
||||
None,
|
||||
None,
|
||||
table_name,
|
||||
)?;
|
||||
}
|
||||
program.emit_insn(Insn::Delete {
|
||||
cursor_id: main_cursor_id,
|
||||
table_name: table_name.to_string(),
|
||||
is_part_of_update,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Child-side FK checks for INSERT of a single row:
|
||||
/// For each outgoing FK on `child_tbl`, if the NEW tuple's FK columns are all non-NULL,
|
||||
/// verify that the referenced parent key exists.
|
||||
|
||||
@@ -871,4 +871,97 @@ do_execsql_test_on_specific_db {:memory:} insert-999999999 {
|
||||
create table t(a);
|
||||
insert into t(a, a) values (2, 3);
|
||||
select * from t;
|
||||
} {2}
|
||||
} {2}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} insert-on-conflict-do-nothing-single-row {
|
||||
CREATE TABLE t(a unique, b text);
|
||||
INSERT INTO t VALUES (2, 'foo'),(3, 'bar');
|
||||
INSERT OR IGNORE INTO t values (2, 'baz');
|
||||
SELECT * FROM t;
|
||||
} {2|foo
|
||||
3|bar}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} insert-on-conflict-replace-single-row {
|
||||
CREATE TABLE t(a unique, b text);
|
||||
INSERT INTO t VALUES (2, 'foo'),(3, 'bar');
|
||||
INSERT OR REPLACE INTO t values (2, 'baz');
|
||||
SELECT * FROM t ORDER BY a;
|
||||
} {2|baz
|
||||
3|bar}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} insert-on-conflict-do-nothing-multiple-rows {
|
||||
CREATE TABLE t(a unique);
|
||||
INSERT INTO t VALUES (2),(3);
|
||||
INSERT OR IGNORE INTO t values (1),(2),(3);
|
||||
SELECT * FROM t;
|
||||
} {1
|
||||
2
|
||||
3}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} onconflict-ignore-existing {
|
||||
CREATE TABLE t(a INTEGER UNIQUE, b TEXT);
|
||||
INSERT INTO t VALUES (1,'x'),(2,'y');
|
||||
INSERT OR IGNORE INTO t VALUES (2,'yy'),(3,'z'),(2,'zzz');
|
||||
SELECT a, b FROM t ORDER BY a;
|
||||
} {1|x
|
||||
2|y
|
||||
3|z}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} onconflict-ignore-selfdup-values {
|
||||
CREATE TABLE t(a INTEGER UNIQUE, b TEXT);
|
||||
INSERT OR IGNORE INTO t VALUES (1,'one'),(1,'two'),(1,'three');
|
||||
SELECT a, b FROM t ORDER BY a;
|
||||
} {1|one}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} onconflict-ignore-selfdup-values-targeted {
|
||||
CREATE TABLE t(a INTEGER PRIMARY KEY, b INTEGER UNIQUE);
|
||||
INSERT INTO t(a,b) VALUES (100,1);
|
||||
INSERT INTO t(a,b) VALUES (200,1) ON CONFLICT(b) DO NOTHING;
|
||||
SELECT a,b FROM t ORDER BY a;
|
||||
} {100|1}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} onconflict-ignore-selfdup-multirow-targeted {
|
||||
CREATE TABLE t(a INTEGER PRIMARY KEY, b INTEGER UNIQUE);
|
||||
INSERT INTO t(a,b) VALUES (1,10),(2,20);
|
||||
INSERT INTO t(a,b) VALUES (3,30),(4,30) ON CONFLICT(b) DO NOTHING;
|
||||
SELECT a,b FROM t ORDER BY a;
|
||||
} {1|10
|
||||
2|20
|
||||
3|30}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} onconflict-replace-existing {
|
||||
CREATE TABLE t(a INTEGER UNIQUE, b TEXT);
|
||||
INSERT INTO t VALUES (1,'x'),(2,'y');
|
||||
INSERT OR REPLACE INTO t VALUES (2,'yy'),(3,'z');
|
||||
SELECT a,b FROM t ORDER BY a;
|
||||
} {1|x
|
||||
2|yy
|
||||
3|z}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} onconflict-replace-selfdup-values-lastwins {
|
||||
CREATE TABLE t(a INTEGER UNIQUE, b TEXT);
|
||||
INSERT OR REPLACE INTO t VALUES (5,'one'),(5,'two'),(5,'three');
|
||||
SELECT a,b FROM t;
|
||||
} {5|three}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} onconflict-replace-existing-then-selfdup-lastwins {
|
||||
CREATE TABLE t(a INTEGER UNIQUE, b TEXT);
|
||||
INSERT INTO t VALUES (5,'orig');
|
||||
INSERT OR REPLACE INTO t VALUES (5,'first'),(5,'second');
|
||||
SELECT a,b FROM t;
|
||||
} {5|second}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} onconflict-replace-pk-rowid-alias {
|
||||
CREATE TABLE t(a INTEGER PRIMARY KEY, b TEXT);
|
||||
INSERT OR REPLACE INTO t(a,b) VALUES (1,'foo'),(1,'bar');
|
||||
SELECT a,b FROM t;
|
||||
} {1|bar}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} onconflict-replace-from-select-selfdup {
|
||||
CREATE TABLE src(x);
|
||||
INSERT INTO src VALUES (1),(1),(2);
|
||||
CREATE TABLE t(a INTEGER UNIQUE);
|
||||
INSERT OR REPLACE INTO t SELECT x FROM src;
|
||||
SELECT a FROM t ORDER BY a;
|
||||
} {1
|
||||
2}
|
||||
|
||||
Reference in New Issue
Block a user