mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-26 12:34:22 +01:00
Merge 'translate/insert: more refactoring and support INSERT OR IGNORE' from Preston Thorpe
`INSERT OR IGNORE INTO t VALUES (...)` can trivially be rewritten to `INSERT INTO t VALUES (..) ON CONFLICT DO NOTHING` This PR does this rewriting, as well as finishes a large refactor on INSERT translation in general.. I just need a break from the rest of this feature tbh.. just was getting under my skin and I have been in `translate` land for too long. Closes #3742
This commit is contained in:
@@ -222,13 +222,18 @@ pub fn translate_insert(
|
||||
crate::bail_parse_error!("no such table: {}", table_name);
|
||||
};
|
||||
|
||||
let root_page = btree_table.root_page;
|
||||
|
||||
let BoundInsertResult {
|
||||
mut values,
|
||||
mut upsert_actions,
|
||||
inserting_multiple_rows,
|
||||
} = bind_insert(&mut program, resolver, &table, &mut body, connection)?;
|
||||
} = bind_insert(
|
||||
&mut program,
|
||||
resolver,
|
||||
&table,
|
||||
&mut body,
|
||||
connection,
|
||||
on_conflict.unwrap_or(ResolveType::Rollback),
|
||||
)?;
|
||||
|
||||
if inserting_multiple_rows && btree_table.has_autoincrement {
|
||||
ensure_sequence_initialized(&mut program, resolver.schema, &btree_table)?;
|
||||
@@ -278,109 +283,19 @@ pub fn translate_insert(
|
||||
}
|
||||
let insertion = build_insertion(&mut program, &table, &columns, ctx.num_values)?;
|
||||
|
||||
if inserting_multiple_rows {
|
||||
let select_result_start_reg = program
|
||||
.reg_result_cols_start
|
||||
.unwrap_or(ctx.yield_reg_opt.unwrap() + 1);
|
||||
translate_rows_multiple(
|
||||
&mut program,
|
||||
&insertion,
|
||||
select_result_start_reg,
|
||||
resolver,
|
||||
&ctx.temp_table_ctx,
|
||||
)?;
|
||||
} else {
|
||||
// Single row - populate registers directly
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: ctx.cursor_id,
|
||||
root_page: RegisterOrLiteral::Literal(root_page),
|
||||
db: 0,
|
||||
});
|
||||
|
||||
translate_rows_single(&mut program, &values, &insertion, resolver)?;
|
||||
}
|
||||
|
||||
// Open all the index btrees for writing
|
||||
for idx_cursor in ctx.idx_cursors.iter() {
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: idx_cursor.2,
|
||||
root_page: idx_cursor.1.into(),
|
||||
db: 0,
|
||||
});
|
||||
}
|
||||
translate_rows_and_open_tables(
|
||||
&mut program,
|
||||
resolver,
|
||||
&insertion,
|
||||
&ctx,
|
||||
&values,
|
||||
inserting_multiple_rows,
|
||||
)?;
|
||||
|
||||
let has_user_provided_rowid = insertion.key.is_provided_by_user();
|
||||
if btree_table.has_autoincrement {
|
||||
let seq_table = resolver
|
||||
.schema
|
||||
.get_btree_table("sqlite_sequence")
|
||||
.ok_or_else(|| {
|
||||
crate::error::LimboError::InternalError(
|
||||
"sqlite_sequence table not found".to_string(),
|
||||
)
|
||||
})?;
|
||||
let seq_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(seq_table.clone()));
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: seq_cursor_id,
|
||||
root_page: seq_table.root_page.into(),
|
||||
db: 0,
|
||||
});
|
||||
|
||||
let table_name_reg = program.emit_string8_new_reg(btree_table.name.clone());
|
||||
let r_seq = program.alloc_register();
|
||||
let r_seq_rowid = program.alloc_register();
|
||||
|
||||
ctx.autoincrement_meta = Some(AutoincMeta {
|
||||
seq_cursor_id,
|
||||
r_seq,
|
||||
r_seq_rowid,
|
||||
table_name_reg,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Integer {
|
||||
dest: r_seq,
|
||||
value: 0,
|
||||
});
|
||||
program.emit_insn(Insn::Null {
|
||||
dest: r_seq_rowid,
|
||||
dest_end: None,
|
||||
});
|
||||
|
||||
let loop_start_label = program.allocate_label();
|
||||
let loop_end_label = program.allocate_label();
|
||||
let found_label = program.allocate_label();
|
||||
|
||||
program.emit_insn(Insn::Rewind {
|
||||
cursor_id: seq_cursor_id,
|
||||
pc_if_empty: loop_end_label,
|
||||
});
|
||||
program.preassign_label_to_next_insn(loop_start_label);
|
||||
|
||||
let name_col_reg = program.alloc_register();
|
||||
program.emit_column_or_rowid(seq_cursor_id, 0, name_col_reg);
|
||||
program.emit_insn(Insn::Ne {
|
||||
lhs: table_name_reg,
|
||||
rhs: name_col_reg,
|
||||
target_pc: found_label,
|
||||
flags: Default::default(),
|
||||
collation: None,
|
||||
});
|
||||
|
||||
program.emit_column_or_rowid(seq_cursor_id, 1, r_seq);
|
||||
program.emit_insn(Insn::RowId {
|
||||
cursor_id: seq_cursor_id,
|
||||
dest: r_seq_rowid,
|
||||
});
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: loop_end_label,
|
||||
});
|
||||
|
||||
program.preassign_label_to_next_insn(found_label);
|
||||
program.emit_insn(Insn::Next {
|
||||
cursor_id: seq_cursor_id,
|
||||
pc_if_next: loop_start_label,
|
||||
});
|
||||
program.preassign_label_to_next_insn(loop_end_label);
|
||||
if ctx.table.has_autoincrement {
|
||||
init_autoincrement(&mut program, &mut ctx, resolver)?;
|
||||
}
|
||||
|
||||
if has_user_provided_rowid {
|
||||
@@ -406,76 +321,8 @@ pub fn translate_insert(
|
||||
}
|
||||
|
||||
program.preassign_label_to_next_insn(ctx.key_generation_label);
|
||||
if let Some(AutoincMeta { r_seq, .. }) = ctx.autoincrement_meta {
|
||||
let r_max = program.alloc_register();
|
||||
|
||||
let dummy_reg = program.alloc_register();
|
||||
|
||||
program.emit_insn(Insn::NewRowid {
|
||||
cursor: ctx.cursor_id,
|
||||
rowid_reg: dummy_reg,
|
||||
prev_largest_reg: r_max,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: r_seq,
|
||||
dst_reg: insertion.key_register(),
|
||||
extra_amount: 0,
|
||||
});
|
||||
program.emit_insn(Insn::MemMax {
|
||||
dest_reg: insertion.key_register(),
|
||||
src_reg: r_max,
|
||||
});
|
||||
|
||||
let no_overflow_label = program.allocate_label();
|
||||
let max_i64_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::Integer {
|
||||
dest: max_i64_reg,
|
||||
value: i64::MAX,
|
||||
});
|
||||
program.emit_insn(Insn::Ne {
|
||||
lhs: insertion.key_register(),
|
||||
rhs: max_i64_reg,
|
||||
target_pc: no_overflow_label,
|
||||
flags: Default::default(),
|
||||
collation: None,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: crate::error::SQLITE_FULL,
|
||||
description: "database or disk is full".to_string(),
|
||||
});
|
||||
|
||||
program.preassign_label_to_next_insn(no_overflow_label);
|
||||
|
||||
program.emit_insn(Insn::AddImm {
|
||||
register: insertion.key_register(),
|
||||
value: 1,
|
||||
});
|
||||
|
||||
if let Some(AutoincMeta {
|
||||
seq_cursor_id,
|
||||
r_seq_rowid,
|
||||
table_name_reg,
|
||||
..
|
||||
}) = ctx.autoincrement_meta
|
||||
{
|
||||
emit_update_sqlite_sequence(
|
||||
&mut program,
|
||||
resolver.schema,
|
||||
seq_cursor_id,
|
||||
r_seq_rowid,
|
||||
table_name_reg,
|
||||
insertion.key_register(),
|
||||
)?;
|
||||
}
|
||||
} else {
|
||||
program.emit_insn(Insn::NewRowid {
|
||||
cursor: ctx.cursor_id,
|
||||
rowid_reg: insertion.key_register(),
|
||||
prev_largest_reg: 0,
|
||||
});
|
||||
}
|
||||
emit_rowid_generation(&mut program, resolver, &ctx, &insertion)?;
|
||||
|
||||
program.preassign_label_to_next_insn(ctx.key_ready_for_uniqueness_check_label);
|
||||
|
||||
@@ -488,7 +335,9 @@ pub fn translate_insert(
|
||||
});
|
||||
}
|
||||
|
||||
let (constraints_to_check, upsert_catch_all_position) = build_constraints_to_check(
|
||||
// Build a list of upsert constraints/indexes we need to run preflight
|
||||
// checks against, in the proper order of evaluation,
|
||||
let constraints = build_constraints_to_check(
|
||||
resolver,
|
||||
table_name.as_str(),
|
||||
&upsert_actions,
|
||||
@@ -505,8 +354,7 @@ pub fn translate_insert(
|
||||
resolver,
|
||||
&insertion,
|
||||
&upsert_actions,
|
||||
&constraints_to_check,
|
||||
upsert_catch_all_position,
|
||||
&constraints,
|
||||
)?;
|
||||
|
||||
emit_notnulls(&mut program, &ctx, &insertion);
|
||||
@@ -526,85 +374,9 @@ pub fn translate_insert(
|
||||
});
|
||||
|
||||
if has_upsert {
|
||||
// COMMIT PHASE: no preflight jumps happened; emit the actual index writes now
|
||||
// We re-check partial-index predicates against the NEW image, produce packed records,
|
||||
// and insert into all applicable indexes, we do not re-probe uniqueness here, as preflight
|
||||
// already guaranteed non-conflict.
|
||||
for index in resolver.schema.get_indices(table_name.as_str()) {
|
||||
let idx_cursor_id = ctx
|
||||
.idx_cursors
|
||||
.iter()
|
||||
.find(|(name, _, _)| *name == &index.name)
|
||||
.map(|(_, _, c_id)| *c_id)
|
||||
.expect("no cursor found for index");
|
||||
|
||||
// Re-evaluate partial predicate on the would-be inserted image
|
||||
let commit_skip_label = if let Some(where_clause) = &index.where_clause {
|
||||
let mut where_for_eval = where_clause.as_ref().clone();
|
||||
rewrite_partial_index_where(&mut where_for_eval, &insertion)?;
|
||||
let reg = program.alloc_register();
|
||||
translate_expr_no_constant_opt(
|
||||
&mut program,
|
||||
Some(&TableReferences::new_empty()),
|
||||
&where_for_eval,
|
||||
reg,
|
||||
resolver,
|
||||
NoConstantOptReason::RegisterReuse,
|
||||
)?;
|
||||
let lbl = program.allocate_label();
|
||||
program.emit_insn(Insn::IfNot {
|
||||
reg,
|
||||
target_pc: lbl,
|
||||
jump_if_null: true,
|
||||
});
|
||||
Some(lbl)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let num_cols = index.columns.len();
|
||||
let idx_start_reg = program.alloc_registers(num_cols + 1);
|
||||
|
||||
// Build [key cols..., rowid] from insertion registers
|
||||
for (i, idx_col) in index.columns.iter().enumerate() {
|
||||
let Some(cm) = insertion.get_col_mapping_by_name(&idx_col.name) else {
|
||||
return Err(crate::LimboError::PlanningError(
|
||||
"Column not found in INSERT (commit phase)".to_string(),
|
||||
));
|
||||
};
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: cm.register,
|
||||
dst_reg: idx_start_reg + i,
|
||||
extra_amount: 0,
|
||||
});
|
||||
}
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: insertion.key_register(),
|
||||
dst_reg: idx_start_reg + num_cols,
|
||||
extra_amount: 0,
|
||||
});
|
||||
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: idx_start_reg,
|
||||
count: num_cols + 1,
|
||||
dest_reg: record_reg,
|
||||
index_name: Some(index.name.clone()),
|
||||
affinity_str: None,
|
||||
});
|
||||
program.emit_insn(Insn::IdxInsert {
|
||||
cursor_id: idx_cursor_id,
|
||||
record_reg,
|
||||
unpacked_start: Some(idx_start_reg),
|
||||
unpacked_count: Some((num_cols + 1) as u16),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
|
||||
if let Some(lbl) = commit_skip_label {
|
||||
program.resolve_label(lbl, program.offset());
|
||||
}
|
||||
}
|
||||
emit_commit_phase(&mut program, resolver, &insertion, &ctx)?;
|
||||
}
|
||||
|
||||
if has_fks {
|
||||
// Child-side check must run before Insert (may HALT or increment deferred counter)
|
||||
emit_fk_child_insert_checks(
|
||||
@@ -712,8 +484,13 @@ pub fn translate_insert(
|
||||
connection,
|
||||
)?;
|
||||
|
||||
emit_epilogue(&mut program, &ctx, inserting_multiple_rows);
|
||||
Ok(program)
|
||||
}
|
||||
|
||||
fn emit_epilogue(program: &mut ProgramBuilder, ctx: &InsertEmitCtx, inserting_multiple_rows: bool) {
|
||||
if inserting_multiple_rows {
|
||||
if let Some(temp_table_ctx) = ctx.temp_table_ctx {
|
||||
if let Some(temp_table_ctx) = &ctx.temp_table_ctx {
|
||||
program.resolve_label(ctx.row_done_label, program.offset());
|
||||
|
||||
program.emit_insn(Insn::Next {
|
||||
@@ -748,11 +525,214 @@ pub fn translate_insert(
|
||||
target_pc: ctx.stmt_epilogue,
|
||||
});
|
||||
}
|
||||
|
||||
program.preassign_label_to_next_insn(ctx.stmt_epilogue);
|
||||
program.resolve_label(ctx.halt_label, program.offset());
|
||||
}
|
||||
|
||||
Ok(program)
|
||||
// COMMIT PHASE: no preflight jumps happened; emit the actual index writes now
|
||||
// We re-check partial-index predicates against the NEW image, produce packed records,
|
||||
// and insert into all applicable indexes, we do not re-probe uniqueness here, as preflight
|
||||
// already guaranteed non-conflict.
|
||||
fn emit_commit_phase(
|
||||
program: &mut ProgramBuilder,
|
||||
resolver: &Resolver,
|
||||
insertion: &Insertion,
|
||||
ctx: &InsertEmitCtx,
|
||||
) -> Result<()> {
|
||||
for index in resolver.schema.get_indices(ctx.table.name.as_str()) {
|
||||
let idx_cursor_id = ctx
|
||||
.idx_cursors
|
||||
.iter()
|
||||
.find(|(name, _, _)| *name == &index.name)
|
||||
.map(|(_, _, c_id)| *c_id)
|
||||
.expect("no cursor found for index");
|
||||
|
||||
// Re-evaluate partial predicate on the would-be inserted image
|
||||
let commit_skip_label = if let Some(where_clause) = &index.where_clause {
|
||||
let mut where_for_eval = where_clause.as_ref().clone();
|
||||
rewrite_partial_index_where(&mut where_for_eval, insertion)?;
|
||||
let reg = program.alloc_register();
|
||||
translate_expr_no_constant_opt(
|
||||
program,
|
||||
Some(&TableReferences::new_empty()),
|
||||
&where_for_eval,
|
||||
reg,
|
||||
resolver,
|
||||
NoConstantOptReason::RegisterReuse,
|
||||
)?;
|
||||
let lbl = program.allocate_label();
|
||||
program.emit_insn(Insn::IfNot {
|
||||
reg,
|
||||
target_pc: lbl,
|
||||
jump_if_null: true,
|
||||
});
|
||||
Some(lbl)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let num_cols = index.columns.len();
|
||||
let idx_start_reg = program.alloc_registers(num_cols + 1);
|
||||
|
||||
// Build [key cols..., rowid] from insertion registers
|
||||
for (i, idx_col) in index.columns.iter().enumerate() {
|
||||
let Some(cm) = insertion.get_col_mapping_by_name(&idx_col.name) else {
|
||||
return Err(crate::LimboError::PlanningError(
|
||||
"Column not found in INSERT (commit phase)".to_string(),
|
||||
));
|
||||
};
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: cm.register,
|
||||
dst_reg: idx_start_reg + i,
|
||||
extra_amount: 0,
|
||||
});
|
||||
}
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: insertion.key_register(),
|
||||
dst_reg: idx_start_reg + num_cols,
|
||||
extra_amount: 0,
|
||||
});
|
||||
|
||||
let record_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::MakeRecord {
|
||||
start_reg: idx_start_reg,
|
||||
count: num_cols + 1,
|
||||
dest_reg: record_reg,
|
||||
index_name: Some(index.name.clone()),
|
||||
affinity_str: None,
|
||||
});
|
||||
program.emit_insn(Insn::IdxInsert {
|
||||
cursor_id: idx_cursor_id,
|
||||
record_reg,
|
||||
unpacked_start: Some(idx_start_reg),
|
||||
unpacked_count: Some((num_cols + 1) as u16),
|
||||
flags: IdxInsertFlags::new().nchange(true),
|
||||
});
|
||||
|
||||
if let Some(lbl) = commit_skip_label {
|
||||
program.resolve_label(lbl, program.offset());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn translate_rows_and_open_tables(
|
||||
program: &mut ProgramBuilder,
|
||||
resolver: &Resolver,
|
||||
insertion: &Insertion,
|
||||
ctx: &InsertEmitCtx,
|
||||
values: &[Box<Expr>],
|
||||
inserting_multiple_rows: bool,
|
||||
) -> Result<()> {
|
||||
if inserting_multiple_rows {
|
||||
let select_result_start_reg = program
|
||||
.reg_result_cols_start
|
||||
.unwrap_or(ctx.yield_reg_opt.unwrap() + 1);
|
||||
translate_rows_multiple(
|
||||
program,
|
||||
insertion,
|
||||
select_result_start_reg,
|
||||
resolver,
|
||||
&ctx.temp_table_ctx,
|
||||
)?;
|
||||
} else {
|
||||
// Single row - populate registers directly
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: ctx.cursor_id,
|
||||
root_page: RegisterOrLiteral::Literal(ctx.table.root_page),
|
||||
db: 0,
|
||||
});
|
||||
|
||||
translate_rows_single(program, values, insertion, resolver)?;
|
||||
}
|
||||
|
||||
// Open all the index btrees for writing
|
||||
for idx_cursor in ctx.idx_cursors.iter() {
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: idx_cursor.2,
|
||||
root_page: idx_cursor.1.into(),
|
||||
db: 0,
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn emit_rowid_generation(
|
||||
program: &mut ProgramBuilder,
|
||||
resolver: &Resolver,
|
||||
ctx: &InsertEmitCtx,
|
||||
insertion: &Insertion,
|
||||
) -> Result<()> {
|
||||
if let Some(AutoincMeta {
|
||||
r_seq,
|
||||
seq_cursor_id,
|
||||
r_seq_rowid,
|
||||
table_name_reg,
|
||||
..
|
||||
}) = ctx.autoincrement_meta
|
||||
{
|
||||
let r_max = program.alloc_register();
|
||||
|
||||
let dummy_reg = program.alloc_register();
|
||||
|
||||
program.emit_insn(Insn::NewRowid {
|
||||
cursor: ctx.cursor_id,
|
||||
rowid_reg: dummy_reg,
|
||||
prev_largest_reg: r_max,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: r_seq,
|
||||
dst_reg: insertion.key_register(),
|
||||
extra_amount: 0,
|
||||
});
|
||||
program.emit_insn(Insn::MemMax {
|
||||
dest_reg: insertion.key_register(),
|
||||
src_reg: r_max,
|
||||
});
|
||||
|
||||
let no_overflow_label = program.allocate_label();
|
||||
let max_i64_reg = program.alloc_register();
|
||||
program.emit_insn(Insn::Integer {
|
||||
dest: max_i64_reg,
|
||||
value: i64::MAX,
|
||||
});
|
||||
program.emit_insn(Insn::Ne {
|
||||
lhs: insertion.key_register(),
|
||||
rhs: max_i64_reg,
|
||||
target_pc: no_overflow_label,
|
||||
flags: Default::default(),
|
||||
collation: None,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Halt {
|
||||
err_code: crate::error::SQLITE_FULL,
|
||||
description: "database or disk is full".to_string(),
|
||||
});
|
||||
|
||||
program.preassign_label_to_next_insn(no_overflow_label);
|
||||
|
||||
program.emit_insn(Insn::AddImm {
|
||||
register: insertion.key_register(),
|
||||
value: 1,
|
||||
});
|
||||
|
||||
emit_update_sqlite_sequence(
|
||||
program,
|
||||
resolver.schema,
|
||||
seq_cursor_id,
|
||||
r_seq_rowid,
|
||||
table_name_reg,
|
||||
insertion.key_register(),
|
||||
)?;
|
||||
} else {
|
||||
program.emit_insn(Insn::NewRowid {
|
||||
cursor: ctx.cursor_id,
|
||||
rowid_reg: insertion.key_register(),
|
||||
prev_largest_reg: 0,
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -798,6 +778,82 @@ fn resolve_upserts(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn init_autoincrement(
|
||||
program: &mut ProgramBuilder,
|
||||
ctx: &mut InsertEmitCtx,
|
||||
resolver: &Resolver,
|
||||
) -> Result<()> {
|
||||
let seq_table = resolver
|
||||
.schema
|
||||
.get_btree_table("sqlite_sequence")
|
||||
.ok_or_else(|| {
|
||||
crate::error::LimboError::InternalError("sqlite_sequence table not found".to_string())
|
||||
})?;
|
||||
let seq_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(seq_table.clone()));
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: seq_cursor_id,
|
||||
root_page: seq_table.root_page.into(),
|
||||
db: 0,
|
||||
});
|
||||
|
||||
let table_name_reg = program.emit_string8_new_reg(ctx.table.name.clone());
|
||||
let r_seq = program.alloc_register();
|
||||
let r_seq_rowid = program.alloc_register();
|
||||
|
||||
ctx.autoincrement_meta = Some(AutoincMeta {
|
||||
seq_cursor_id,
|
||||
r_seq,
|
||||
r_seq_rowid,
|
||||
table_name_reg,
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Integer {
|
||||
dest: r_seq,
|
||||
value: 0,
|
||||
});
|
||||
program.emit_insn(Insn::Null {
|
||||
dest: r_seq_rowid,
|
||||
dest_end: None,
|
||||
});
|
||||
|
||||
let loop_start_label = program.allocate_label();
|
||||
let loop_end_label = program.allocate_label();
|
||||
let found_label = program.allocate_label();
|
||||
|
||||
program.emit_insn(Insn::Rewind {
|
||||
cursor_id: seq_cursor_id,
|
||||
pc_if_empty: loop_end_label,
|
||||
});
|
||||
program.preassign_label_to_next_insn(loop_start_label);
|
||||
|
||||
let name_col_reg = program.alloc_register();
|
||||
program.emit_column_or_rowid(seq_cursor_id, 0, name_col_reg);
|
||||
program.emit_insn(Insn::Ne {
|
||||
lhs: table_name_reg,
|
||||
rhs: name_col_reg,
|
||||
target_pc: found_label,
|
||||
flags: Default::default(),
|
||||
collation: None,
|
||||
});
|
||||
|
||||
program.emit_column_or_rowid(seq_cursor_id, 1, r_seq);
|
||||
program.emit_insn(Insn::RowId {
|
||||
cursor_id: seq_cursor_id,
|
||||
dest: r_seq_rowid,
|
||||
});
|
||||
program.emit_insn(Insn::Goto {
|
||||
target_pc: loop_end_label,
|
||||
});
|
||||
|
||||
program.preassign_label_to_next_insn(found_label);
|
||||
program.emit_insn(Insn::Next {
|
||||
cursor_id: seq_cursor_id,
|
||||
pc_if_next: loop_start_label,
|
||||
});
|
||||
program.preassign_label_to_next_insn(loop_end_label);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn emit_notnulls(program: &mut ProgramBuilder, ctx: &InsertEmitCtx, insertion: &Insertion) {
|
||||
for column_mapping in insertion
|
||||
.col_mappings
|
||||
@@ -850,8 +906,10 @@ fn bind_insert(
|
||||
table: &Table,
|
||||
body: &mut InsertBody,
|
||||
connection: &Arc<Connection>,
|
||||
on_conflict: ResolveType,
|
||||
) -> Result<BoundInsertResult> {
|
||||
let mut values: Vec<Box<Expr>> = vec![];
|
||||
let mut upsert: Option<Box<Upsert>> = None;
|
||||
let mut upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)> = Vec::new();
|
||||
let mut inserting_multiple_rows = false;
|
||||
match body {
|
||||
@@ -907,43 +965,63 @@ fn bind_insert(
|
||||
}
|
||||
_ => inserting_multiple_rows = true,
|
||||
}
|
||||
while let Some(mut upsert) = upsert_opt.take() {
|
||||
if let UpsertDo::Set {
|
||||
ref mut sets,
|
||||
ref mut where_clause,
|
||||
} = &mut upsert.do_clause
|
||||
{
|
||||
for set in sets.iter_mut() {
|
||||
bind_and_rewrite_expr(
|
||||
&mut set.expr,
|
||||
None,
|
||||
None,
|
||||
connection,
|
||||
&mut program.param_ctx,
|
||||
BindingBehavior::AllowUnboundIdentifiers,
|
||||
)?;
|
||||
}
|
||||
if let Some(ref mut where_expr) = where_clause {
|
||||
bind_and_rewrite_expr(
|
||||
where_expr,
|
||||
None,
|
||||
None,
|
||||
connection,
|
||||
&mut program.param_ctx,
|
||||
BindingBehavior::AllowUnboundIdentifiers,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
let next = upsert.next.take();
|
||||
upsert_actions.push((
|
||||
// resolve the constrained target for UPSERT in the chain
|
||||
resolve_upsert_target(resolver.schema, table, &upsert)?,
|
||||
program.allocate_label(),
|
||||
upsert,
|
||||
));
|
||||
*upsert_opt = next;
|
||||
upsert = upsert_opt.take();
|
||||
}
|
||||
}
|
||||
match on_conflict {
|
||||
ResolveType::Ignore => {
|
||||
upsert.replace(Box::new(ast::Upsert {
|
||||
do_clause: UpsertDo::Nothing,
|
||||
index: None,
|
||||
next: None,
|
||||
}));
|
||||
}
|
||||
ResolveType::Rollback => {
|
||||
// This is the current default behavior for INSERT in tursodb - the transaction will be rolled back if the insert fails.
|
||||
// In SQLite, the default is ABORT and we should use that one once we support subtransactions.
|
||||
}
|
||||
_ => {
|
||||
crate::bail_parse_error!(
|
||||
"INSERT OR {} is only supported with UPSERT",
|
||||
on_conflict.to_string()
|
||||
);
|
||||
}
|
||||
}
|
||||
while let Some(mut upsert_opt) = upsert.take() {
|
||||
if let UpsertDo::Set {
|
||||
ref mut sets,
|
||||
ref mut where_clause,
|
||||
} = &mut upsert_opt.do_clause
|
||||
{
|
||||
for set in sets.iter_mut() {
|
||||
bind_and_rewrite_expr(
|
||||
&mut set.expr,
|
||||
None,
|
||||
None,
|
||||
connection,
|
||||
&mut program.param_ctx,
|
||||
BindingBehavior::AllowUnboundIdentifiers,
|
||||
)?;
|
||||
}
|
||||
if let Some(ref mut where_expr) = where_clause {
|
||||
bind_and_rewrite_expr(
|
||||
where_expr,
|
||||
None,
|
||||
None,
|
||||
connection,
|
||||
&mut program.param_ctx,
|
||||
BindingBehavior::AllowUnboundIdentifiers,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
let next = upsert_opt.next.take();
|
||||
upsert_actions.push((
|
||||
// resolve the constrained target for UPSERT in the chain
|
||||
resolve_upsert_target(resolver.schema, table, &upsert_opt)?,
|
||||
program.allocate_label(),
|
||||
upsert_opt,
|
||||
));
|
||||
upsert = next;
|
||||
}
|
||||
Ok(BoundInsertResult {
|
||||
values,
|
||||
@@ -1552,10 +1630,9 @@ fn emit_preflight_constraint_checks(
|
||||
resolver: &Resolver,
|
||||
insertion: &Insertion,
|
||||
upsert_actions: &[(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)],
|
||||
constraints_to_check: &[(ResolvedUpsertTarget, Option<usize>)],
|
||||
upsert_catch_all_position: Option<usize>,
|
||||
constraints: &ConstraintsToCheck,
|
||||
) -> Result<()> {
|
||||
for (constraint, position) in constraints_to_check {
|
||||
for (constraint, position) in &constraints.constraints_to_check {
|
||||
match constraint {
|
||||
ResolvedUpsertTarget::PrimaryKey => {
|
||||
let make_record_label = program.allocate_label();
|
||||
@@ -1569,7 +1646,7 @@ fn emit_preflight_constraint_checks(
|
||||
// Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint.
|
||||
// emit Halt for every case *except* when upsert handles the conflict
|
||||
'emit_halt: {
|
||||
if let Some(position) = position.or(upsert_catch_all_position) {
|
||||
if let Some(position) = position.or(constraints.upsert_catch_all_position) {
|
||||
// PK conflict: the conflicting rowid is exactly the attempted key
|
||||
program.emit_insn(Insn::Copy {
|
||||
src_reg: insertion.key_register(),
|
||||
@@ -1677,7 +1754,7 @@ fn emit_preflight_constraint_checks(
|
||||
});
|
||||
|
||||
// Conflict detected, figure out if this UPSERT handles the conflict
|
||||
if let Some(position) = position.or(upsert_catch_all_position) {
|
||||
if let Some(position) = position.or(constraints.upsert_catch_all_position) {
|
||||
match &upsert_actions[position].2.do_clause {
|
||||
UpsertDo::Nothing => {
|
||||
// Bail out without writing anything
|
||||
@@ -2008,12 +2085,17 @@ pub fn rewrite_partial_index_where(
|
||||
)
|
||||
}
|
||||
|
||||
struct ConstraintsToCheck {
|
||||
constraints_to_check: Vec<(ResolvedUpsertTarget, Option<usize>)>,
|
||||
upsert_catch_all_position: Option<usize>,
|
||||
}
|
||||
|
||||
fn build_constraints_to_check(
|
||||
resolver: &Resolver,
|
||||
table_name: &str,
|
||||
upsert_actions: &[(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)],
|
||||
has_user_provided_rowid: bool,
|
||||
) -> (Vec<(ResolvedUpsertTarget, Option<usize>)>, Option<usize>) {
|
||||
) -> ConstraintsToCheck {
|
||||
let mut constraints_to_check = Vec::new();
|
||||
if has_user_provided_rowid {
|
||||
// Check uniqueness constraint for rowid if it was provided by user.
|
||||
@@ -2043,7 +2125,10 @@ fn build_constraints_to_check(
|
||||
} else {
|
||||
None
|
||||
};
|
||||
(constraints_to_check, upsert_catch_all_position)
|
||||
ConstraintsToCheck {
|
||||
constraints_to_check,
|
||||
upsert_catch_all_position,
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_update_sqlite_sequence(
|
||||
|
||||
@@ -701,4 +701,77 @@ do_execsql_test_on_specific_db {:memory:} insert-rowid-backwards-compability-2 {
|
||||
CREATE TABLE t(a INTEGER, PRIMARY KEY (a DESC));
|
||||
INSERT INTO t(a) VALUES (123);
|
||||
SELECT rowid, * FROM t;
|
||||
} {123|123}
|
||||
} {123|123}
|
||||
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} ignore-pk-conflict {
|
||||
CREATE TABLE t(a INTEGER PRIMARY KEY);
|
||||
INSERT INTO t VALUES (1),(2),(3);
|
||||
INSERT OR IGNORE INTO t VALUES (2);
|
||||
SELECT a FROM t ORDER BY a;
|
||||
} {1
|
||||
2
|
||||
3}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} ignore-unique-conflict {
|
||||
CREATE TABLE t(a INTEGER, b TEXT UNIQUE);
|
||||
INSERT INTO t VALUES (1,'x'),(2,'y');
|
||||
INSERT OR IGNORE INTO t VALUES (3,'y');
|
||||
SELECT a,b FROM t ORDER BY a;
|
||||
} {1|x
|
||||
2|y}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} ignore-multi-unique-conflict {
|
||||
CREATE TABLE t(a UNIQUE, b UNIQUE, c);
|
||||
INSERT INTO t VALUES (1,10,100),(2,20,200);
|
||||
INSERT OR IGNORE INTO t VALUES (1,30,300); -- conflicts on a
|
||||
INSERT OR IGNORE INTO t VALUES (3,20,300); -- conflicts on b
|
||||
INSERT OR IGNORE INTO t VALUES (1,20,300); -- conflicts on both
|
||||
SELECT a,b,c FROM t ORDER BY a;
|
||||
} {1|10|100
|
||||
2|20|200}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} ignore-some-conflicts-multirow {
|
||||
CREATE TABLE t(a INTEGER UNIQUE);
|
||||
INSERT INTO t VALUES (2),(4);
|
||||
INSERT OR IGNORE INTO t VALUES (1),(2),(3),(4),(5);
|
||||
SELECT a FROM t ORDER BY a;
|
||||
} {1
|
||||
2
|
||||
3
|
||||
4
|
||||
5}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} ignore-from-select {
|
||||
CREATE TABLE src(x);
|
||||
INSERT INTO src VALUES (1),(2),(2),(3);
|
||||
CREATE TABLE dst(a INTEGER UNIQUE);
|
||||
INSERT INTO dst VALUES (2);
|
||||
INSERT OR IGNORE INTO dst SELECT x FROM src;
|
||||
SELECT a FROM dst ORDER BY a;
|
||||
} {1
|
||||
2
|
||||
3}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} ignore-null-in-unique {
|
||||
CREATE TABLE t(a INTEGER UNIQUE);
|
||||
INSERT INTO t VALUES (1),(NULL),(NULL);
|
||||
INSERT OR IGNORE INTO t VALUES (1),(NULL);
|
||||
SELECT COUNT(*) FROM t WHERE a IS NULL;
|
||||
} {3}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} ignore-preserves-rowid {
|
||||
CREATE TABLE t(data TEXT UNIQUE);
|
||||
INSERT INTO t VALUES ('x'),('y'),('z');
|
||||
SELECT rowid, data FROM t WHERE data='y';
|
||||
INSERT OR IGNORE INTO t VALUES ('y');
|
||||
SELECT rowid, data FROM t WHERE data='y';
|
||||
} {2|y
|
||||
2|y}
|
||||
|
||||
do_execsql_test_on_specific_db {:memory:} ignore-intra-statement-dups {
|
||||
CREATE TABLE t(a INTEGER PRIMARY KEY, b TEXT);
|
||||
INSERT OR IGNORE INTO t VALUES (5,'first'),(6,'x'),(5,'second'),(5,'third');
|
||||
SELECT a,b FROM t ORDER BY a;
|
||||
} {5|first
|
||||
6|x}
|
||||
|
||||
Reference in New Issue
Block a user