Merge 'translate/emitter: Implement partial indexes' from Preston Thorpe

This PR adds support for partial indexes, i.e. `CREATE INDEX` statements
that carry a `WHERE` predicate
```sql
CREATE UNIQUE INDEX idx_expensive ON products(sku) where price > 100;
```
This PR does not yet teach the query optimizer to use partial indexes
when planning reads; it only covers index creation and maintenance.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3228
This commit is contained in:
Preston Thorpe
2025-09-22 09:09:54 -04:00
committed by GitHub
24 changed files with 2523 additions and 546 deletions

View File

@@ -164,3 +164,4 @@ impl From<turso_ext::ResultCode> for LimboError {
pub const SQLITE_CONSTRAINT: usize = 19;
pub const SQLITE_CONSTRAINT_PRIMARYKEY: usize = SQLITE_CONSTRAINT | (6 << 8);
pub const SQLITE_CONSTRAINT_NOTNULL: usize = SQLITE_CONSTRAINT | (5 << 8);
pub const SQLITE_CONSTRAINT_UNIQUE: usize = 2067;

View File

@@ -69,6 +69,7 @@ pub fn create_dbsp_state_index(root_page: usize) -> Index {
unique: true,
ephemeral: false,
has_rowid: true,
where_clause: None,
}
}

View File

@@ -1,4 +1,6 @@
use crate::function::Func;
use crate::incremental::view::IncrementalView;
use crate::translate::expr::{bind_and_rewrite_expr, walk_expr, ParamState, WalkControl};
use parking_lot::RwLock;
/// Simple view structure for non-materialized views
@@ -15,24 +17,24 @@ pub type ViewsMap = HashMap<String, View>;
use crate::storage::btree::BTreeCursor;
use crate::translate::collate::CollationSeq;
use crate::translate::plan::SelectPlan;
use crate::translate::plan::{SelectPlan, TableReferences};
use crate::util::{
module_args_from_sql, module_name_from_sql, type_from_name, IOExt, UnparsedFromSqlIndex,
};
use crate::{
contains_ignore_ascii_case, eq_ignore_ascii_case, match_ignore_ascii_case, LimboError,
MvCursor, MvStore, Pager, RefValue, SymbolTable, VirtualTable,
contains_ignore_ascii_case, eq_ignore_ascii_case, match_ignore_ascii_case, Connection,
LimboError, MvCursor, MvStore, Pager, RefValue, SymbolTable, VirtualTable,
};
use crate::{util::normalize_ident, Result};
use core::fmt;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Mutex;
use tracing::trace;
use turso_parser::ast::{self, ColumnDefinition, Expr, Literal, SortOrder, TableOptions};
use turso_parser::{
ast::{Cmd, CreateTableBody, ResultColumn, Stmt},
ast::{Cmd, CreateTableBody, Name, ResultColumn, Stmt},
parser::Parser,
};
@@ -62,7 +64,7 @@ pub struct Schema {
pub views: ViewsMap,
/// table_name to list of indexes for the table
pub indexes: HashMap<String, Vec<Arc<Index>>>,
pub indexes: HashMap<String, VecDeque<Arc<Index>>>,
pub has_indexes: std::collections::HashSet<String>,
pub indexes_enabled: bool,
pub schema_version: u32,
@@ -75,7 +77,7 @@ impl Schema {
pub fn new(indexes_enabled: bool) -> Self {
let mut tables: HashMap<String, Arc<Table>> = HashMap::new();
let has_indexes = std::collections::HashSet::new();
let indexes: HashMap<String, Vec<Arc<Index>>> = HashMap::new();
let indexes: HashMap<String, VecDeque<Arc<Index>>> = HashMap::new();
#[allow(clippy::arc_with_non_send_sync)]
tables.insert(
SCHEMA_TABLE_NAME.to_string(),
@@ -242,17 +244,23 @@ impl Schema {
pub fn add_index(&mut self, index: Arc<Index>) {
let table_name = normalize_ident(&index.table_name);
// We must add the new index to the front of the deque, because SQLite stores index definitions as a linked list
// where the newest parsed index entry is at the head of list. If we would add it to the back of a regular Vec for example,
// then we would evaluate ON CONFLICT DO UPDATE clauses in the wrong index iteration order and UPDATE the wrong row. One might
// argue that this is an implementation detail and we should not care about this, but it makes e.g. the fuzz test 'partial_index_mutation_and_upsert_fuzz'
// fail, so let's just be compatible.
self.indexes
.entry(table_name)
.or_default()
.push(index.clone())
.push_front(index.clone())
}
pub fn get_indices(&self, table_name: &str) -> &[Arc<Index>] {
pub fn get_indices(&self, table_name: &str) -> impl Iterator<Item = &Arc<Index>> {
let name = normalize_ident(table_name);
self.indexes
.get(&name)
.map_or_else(|| &[] as &[Arc<Index>], |v| v.as_slice())
.map(|v| v.iter())
.unwrap_or_default()
}
pub fn get_index(&self, table_name: &str, index_name: &str) -> Option<&Arc<Index>> {
@@ -447,11 +455,13 @@ impl Schema {
)?));
} else {
// Add single column unique index
self.add_index(Arc::new(Index::automatic_from_unique(
table.as_ref(),
automatic_indexes.pop().unwrap(),
vec![(pos_in_table, unique_set.columns.first().unwrap().1)],
)?));
if let Some(autoidx) = automatic_indexes.pop() {
self.add_index(Arc::new(Index::automatic_from_unique(
table.as_ref(),
autoidx,
vec![(pos_in_table, unique_set.columns.first().unwrap().1)],
)?));
}
}
}
for unique_set in table.unique_sets.iter().filter(|us| us.columns.len() > 1) {
@@ -1593,6 +1603,7 @@ pub struct Index {
/// For example, WITHOUT ROWID tables (not supported in Limbo yet),
/// and SELECT DISTINCT ephemeral indexes will not have a rowid.
pub has_rowid: bool,
pub where_clause: Option<Box<Expr>>,
}
#[allow(dead_code)]
@@ -1620,6 +1631,7 @@ impl Index {
tbl_name,
columns,
unique,
where_clause,
..
})) => {
let index_name = normalize_ident(idx_name.name.as_str());
@@ -1649,6 +1661,7 @@ impl Index {
unique,
ephemeral: false,
has_rowid: table.has_rowid,
where_clause,
})
}
_ => todo!("Expected create index statement"),
@@ -1693,6 +1706,7 @@ impl Index {
unique: true,
ephemeral: false,
has_rowid: table.has_rowid,
where_clause: None,
})
}
@@ -1729,6 +1743,7 @@ impl Index {
unique: true,
ephemeral: false,
has_rowid: table.has_rowid,
where_clause: None,
})
}
@@ -1743,6 +1758,102 @@ impl Index {
.iter()
.position(|c| c.pos_in_table == table_pos)
}
/// Walk the where_clause Expr of a partial index and validate that it doesn't reference any other
/// tables or use any disallowed constructs.
///
/// Returns `true` when the predicate is acceptable for a partial index:
/// it may only reference columns of the indexed table (or ROWID), and may
/// only call deterministic, non-windowed functions. Subqueries, RAISE,
/// and bound parameters are rejected. An index without a `where_clause`
/// is trivially valid.
pub fn validate_where_expr(&self, table: &Table) -> bool {
    // No predicate means nothing to validate.
    let Some(where_clause) = &self.where_clause else {
        return true;
    };
    let tbl_norm = normalize_ident(self.table_name.as_str());
    // True when `name` (after identifier normalization) is a column of the
    // indexed table.
    let has_col = |name: &str| {
        let n = normalize_ident(name);
        table
            .columns()
            .iter()
            .any(|c| c.name.as_ref().is_some_and(|cn| normalize_ident(cn) == n))
    };
    // True when a qualifier names this index's own table.
    let is_tbl = |ns: &str| normalize_ident(ns).eq_ignore_ascii_case(&tbl_norm);
    // True when `name` resolves (at arity `argc`) to a known deterministic
    // function; unresolvable or non-deterministic functions are disallowed.
    let is_deterministic_fn = |name: &str, argc: usize| {
        let n = normalize_ident(name);
        Func::resolve_function(&n, argc).is_ok_and(|f| f.is_deterministic())
    };
    // `ok` is flipped to false on the first offending node; once false, the
    // walker short-circuits by skipping all remaining children.
    let mut ok = true;
    let _ = walk_expr(where_clause.as_ref(), &mut |e: &Expr| -> crate::Result<
        WalkControl,
    > {
        if !ok {
            return Ok(WalkControl::SkipChildren);
        }
        match e {
            Expr::Literal(_) | Expr::RowId { .. } => {}
            // Unqualified identifier: must be a column of the target table or ROWID
            Expr::Id(Name::Ident(n)) | Expr::Id(Name::Quoted(n)) => {
                let n = n.as_str();
                if !n.eq_ignore_ascii_case("rowid") && !has_col(n) {
                    ok = false;
                }
            }
            // Qualified: qualifier must match this index's table; column must exist
            Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => {
                if !is_tbl(ns.as_str()) || !has_col(col.as_str()) {
                    ok = false;
                }
            }
            Expr::FunctionCall {
                name, filter_over, ..
            }
            | Expr::FunctionCallStar {
                name, filter_over, ..
            } => {
                // reject windowed
                if filter_over.over_clause.is_some() {
                    ok = false;
                } else {
                    // Recover the arity from the concrete variant so the
                    // function lookup resolves the right overload.
                    let argc = match e {
                        Expr::FunctionCall { args, .. } => args.len(),
                        Expr::FunctionCallStar { .. } => 0,
                        // Outer match already restricted `e` to the two
                        // function-call variants.
                        _ => unreachable!(),
                    };
                    if !is_deterministic_fn(name.as_str(), argc) {
                        ok = false;
                    }
                }
            }
            // Explicitly disallowed constructs
            Expr::Exists(_)
            | Expr::InSelect { .. }
            | Expr::Subquery(_)
            | Expr::Raise { .. }
            | Expr::Variable(_) => {
                ok = false;
            }
            _ => {}
        }
        // Keep descending only while no violation has been found.
        Ok(if ok {
            WalkControl::Continue
        } else {
            WalkControl::SkipChildren
        })
    });
    ok
}
/// Produce a bound copy of this index's partial-index predicate, if any.
///
/// Clones the stored `where_clause`, rewrites its column references against
/// `table_refs` via `bind_and_rewrite_expr` (parameters are disallowed),
/// and returns the rewritten expression. Returns `None` when the index has
/// no predicate or when binding fails.
pub fn bind_where_expr(
    &self,
    table_refs: Option<&mut TableReferences>,
    connection: &Arc<Connection>,
) -> Option<ast::Expr> {
    // Work on a clone so the index's stored predicate stays untouched.
    let mut bound = self.where_clause.as_ref()?.clone();
    let mut param_state = ParamState::disallow();
    match bind_and_rewrite_expr(&mut bound, table_refs, None, connection, &mut param_state) {
        Ok(_) => Some(*bound),
        Err(_) => None,
    }
}
}
#[cfg(test)]

View File

@@ -8565,6 +8565,7 @@ mod tests {
.unwrap() as usize;
let index_def = Index {
name: "testindex".to_string(),
where_clause: None,
columns: (0..10)
.map(|i| IndexColumn {
name: format!("test{i}"),
@@ -8726,6 +8727,7 @@ mod tests {
.unwrap() as usize;
let index_def = Index {
name: "testindex".to_string(),
where_clause: None,
columns: vec![IndexColumn {
name: "testcol".to_string(),
order: SortOrder::Asc,

View File

@@ -401,6 +401,7 @@ fn create_dedupe_index(
table_name: String::new(),
unique: false,
has_rowid: false,
where_clause: None,
});
let cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(dedupe_index.clone()));
program.emit_insn(Insn::OpenEphemeral {

View File

@@ -66,7 +66,7 @@ pub fn translate_delete(
approx_num_labels: 0,
};
program.extend(&opts);
emit_program(&mut program, delete_plan, schema, syms, |_| {})?;
emit_program(connection, &mut program, delete_plan, schema, syms, |_| {})?;
Ok(program)
}
@@ -96,7 +96,7 @@ pub fn prepare_delete_plan(
} else {
crate::bail_parse_error!("Table is neither a virtual table nor a btree table");
};
let indexes = schema.get_indices(table.get_name()).to_vec();
let indexes = schema.get_indices(table.get_name()).cloned().collect();
let joined_tables = vec![JoinedTable {
op: Operation::default_scan_for(&table),
table,

View File

@@ -1,6 +1,7 @@
// This module contains code for emitting bytecode instructions for SQL query execution.
// It handles translating high-level SQL operations into low-level bytecode that can be executed by the virtual machine.
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::{instrument, Level};
@@ -24,16 +25,20 @@ use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::function::Func;
use crate::schema::{BTreeTable, Column, Schema, Table};
use crate::translate::compound_select::emit_program_for_compound_select;
use crate::translate::expr::{emit_returning_results, ReturningValueRegisters};
use crate::translate::plan::{DeletePlan, Plan, QueryDestination, Search};
use crate::translate::expr::{
emit_returning_results, translate_expr_no_constant_opt, walk_expr_mut, NoConstantOptReason,
ReturningValueRegisters, WalkControl,
};
use crate::translate::plan::{DeletePlan, JoinedTable, Plan, QueryDestination, Search};
use crate::translate::result_row::try_fold_expr_to_i64;
use crate::translate::values::emit_values;
use crate::translate::window::{emit_window_results, init_window, WindowMetadata};
use crate::util::exprs_are_equivalent;
use crate::util::{exprs_are_equivalent, normalize_ident};
use crate::vdbe::builder::{CursorKey, CursorType, ProgramBuilder};
use crate::vdbe::insn::{CmpInsFlags, IdxInsertFlags, InsertFlags, RegisterOrLiteral};
use crate::vdbe::CursorID;
use crate::vdbe::{insn::Insn, BranchOffset};
use crate::Connection;
use crate::{bail_parse_error, Result, SymbolTable};
pub struct Resolver<'a> {
@@ -201,6 +206,7 @@ pub enum TransactionMode {
/// Takes a query plan and generates the corresponding bytecode program
#[instrument(skip_all, level = Level::DEBUG)]
pub fn emit_program(
connection: &Arc<Connection>,
program: &mut ProgramBuilder,
plan: Plan,
schema: &Schema,
@@ -209,8 +215,10 @@ pub fn emit_program(
) -> Result<()> {
match plan {
Plan::Select(plan) => emit_program_for_select(program, plan, schema, syms),
Plan::Delete(plan) => emit_program_for_delete(program, plan, schema, syms),
Plan::Update(plan) => emit_program_for_update(program, plan, schema, syms, after),
Plan::Delete(plan) => emit_program_for_delete(connection, program, plan, schema, syms),
Plan::Update(plan) => {
emit_program_for_update(connection, program, plan, schema, syms, after)
}
Plan::CompoundSelect { .. } => {
emit_program_for_compound_select(program, plan, schema, syms)
}
@@ -407,8 +415,9 @@ pub fn emit_query<'a>(
#[instrument(skip_all, level = Level::DEBUG)]
fn emit_program_for_delete(
connection: &Arc<Connection>,
program: &mut ProgramBuilder,
plan: DeletePlan,
mut plan: DeletePlan,
schema: &Schema,
syms: &SymbolTable,
) -> Result<()> {
@@ -461,9 +470,10 @@ fn emit_program_for_delete(
)?;
emit_delete_insns(
connection,
program,
&mut t_ctx,
&plan.table_references,
&mut plan.table_references,
&plan.result_columns,
)?;
@@ -484,37 +494,35 @@ fn emit_program_for_delete(
}
fn emit_delete_insns(
connection: &Arc<Connection>,
program: &mut ProgramBuilder,
t_ctx: &mut TranslateCtx,
table_references: &TableReferences,
table_references: &mut TableReferences,
result_columns: &[super::plan::ResultSetColumn],
) -> Result<()> {
let table_reference = table_references.joined_tables().first().unwrap();
if table_reference
// we can either use this obviously safe raw pointer or we can clone it
let table_reference: *const JoinedTable = table_references.joined_tables().first().unwrap();
if unsafe { &*table_reference }
.virtual_table()
.is_some_and(|t| t.readonly())
{
return Err(crate::LimboError::ReadOnly);
}
let internal_id = unsafe { (*table_reference).internal_id };
let cursor_id = match &table_reference.op {
Operation::Scan { .. } => {
program.resolve_cursor_id(&CursorKey::table(table_reference.internal_id))
}
let table_name = unsafe { &*table_reference }.table.get_name();
let cursor_id = match unsafe { &(*table_reference).op } {
Operation::Scan { .. } => program.resolve_cursor_id(&CursorKey::table(internal_id)),
Operation::Search(search) => match search {
Search::RowidEq { .. } | Search::Seek { index: None, .. } => {
program.resolve_cursor_id(&CursorKey::table(table_reference.internal_id))
program.resolve_cursor_id(&CursorKey::table(internal_id))
}
Search::Seek {
index: Some(index), ..
} => program.resolve_cursor_id(&CursorKey::index(
table_reference.internal_id,
index.clone(),
)),
} => program.resolve_cursor_id(&CursorKey::index(internal_id, index.clone())),
},
};
let main_table_cursor_id =
program.resolve_cursor_id(&CursorKey::table(table_reference.internal_id));
let main_table_cursor_id = program.resolve_cursor_id(&CursorKey::table(internal_id));
// Emit the instructions to delete the row
let key_reg = program.alloc_register();
@@ -523,7 +531,7 @@ fn emit_delete_insns(
dest: key_reg,
});
if table_reference.virtual_table().is_some() {
if unsafe { &*table_reference }.virtual_table().is_some() {
let conflict_action = 0u16;
let start_reg = key_reg;
@@ -540,14 +548,10 @@ fn emit_delete_insns(
});
} else {
// Delete from all indexes before deleting from the main table.
let indexes = t_ctx
.resolver
.schema
.indexes
.get(table_reference.table.get_name());
let indexes = t_ctx.resolver.schema.indexes.get(table_name);
// Get the index that is being used to iterate the deletion loop, if there is one.
let iteration_index = table_reference.op.index();
let iteration_index = unsafe { &*table_reference }.op.index();
// Get all indexes that are not the iteration index.
let other_indexes = indexes
.map(|indexes| {
@@ -561,10 +565,8 @@ fn emit_delete_insns(
.map(|index| {
(
index.clone(),
program.resolve_cursor_id(&CursorKey::index(
table_reference.internal_id,
index.clone(),
)),
program
.resolve_cursor_id(&CursorKey::index(internal_id, index.clone())),
)
})
.collect::<Vec<_>>()
@@ -572,6 +574,29 @@ fn emit_delete_insns(
.unwrap_or_default();
for (index, index_cursor_id) in other_indexes {
let skip_delete_label = if index.where_clause.is_some() {
let where_copy = index
.bind_where_expr(Some(table_references), connection)
.expect("where clause to exist");
let skip_label = program.allocate_label();
let reg = program.alloc_register();
translate_expr_no_constant_opt(
program,
Some(table_references),
&where_copy,
reg,
&t_ctx.resolver,
NoConstantOptReason::RegisterReuse,
)?;
program.emit_insn(Insn::IfNot {
reg,
jump_if_null: true,
target_pc: skip_label,
});
Some(skip_label)
} else {
None
};
let num_regs = index.columns.len() + 1;
let start_reg = program.alloc_registers(num_regs);
// Emit columns that are part of the index
@@ -594,8 +619,11 @@ fn emit_delete_insns(
start_reg,
num_regs,
cursor_id: index_cursor_id,
raise_error_if_no_matching_entry: true,
raise_error_if_no_matching_entry: index.where_clause.is_none(),
});
if let Some(label) = skip_delete_label {
program.resolve_label(label, program.offset());
}
}
// Emit update in the CDC table if necessary (before DELETE updated the table)
@@ -609,7 +637,7 @@ fn emit_delete_insns(
let before_record_reg = if cdc_has_before {
Some(emit_cdc_full_record(
program,
table_reference.table.columns(),
unsafe { &*table_reference }.table.columns(),
main_table_cursor_id,
rowid_reg,
))
@@ -625,7 +653,7 @@ fn emit_delete_insns(
before_record_reg,
None,
None,
table_reference.table.get_name(),
table_name,
)?;
}
@@ -637,12 +665,13 @@ fn emit_delete_insns(
cursor_id: main_table_cursor_id,
dest: rowid_reg,
});
let cols_len = unsafe { &*table_reference }.columns().len();
// Allocate registers for column values
let columns_start_reg = program.alloc_registers(table_reference.columns().len());
let columns_start_reg = program.alloc_registers(cols_len);
// Read all column values from the row to be deleted
for (i, _column) in table_reference.columns().iter().enumerate() {
for (i, _column) in unsafe { &*table_reference }.columns().iter().enumerate() {
program.emit_column_or_rowid(main_table_cursor_id, i, columns_start_reg + i);
}
@@ -650,7 +679,7 @@ fn emit_delete_insns(
let value_registers = ReturningValueRegisters {
rowid_register: rowid_reg,
columns_start_register: columns_start_reg,
num_columns: table_reference.columns().len(),
num_columns: cols_len,
};
emit_returning_results(program, result_columns, &value_registers)?;
@@ -658,14 +687,12 @@ fn emit_delete_insns(
program.emit_insn(Insn::Delete {
cursor_id: main_table_cursor_id,
table_name: table_reference.table.get_name().to_string(),
table_name: table_name.to_string(),
});
if let Some(index) = iteration_index {
let iteration_index_cursor = program.resolve_cursor_id(&CursorKey::index(
table_reference.internal_id,
index.clone(),
));
let iteration_index_cursor =
program.resolve_cursor_id(&CursorKey::index(internal_id, index.clone()));
program.emit_insn(Insn::Delete {
cursor_id: iteration_index_cursor,
table_name: index.name.clone(),
@@ -684,6 +711,7 @@ fn emit_delete_insns(
#[instrument(skip_all, level = Level::DEBUG)]
fn emit_program_for_update(
connection: &Arc<Connection>,
program: &mut ProgramBuilder,
mut plan: UpdatePlan,
schema: &Schema,
@@ -779,7 +807,14 @@ fn emit_program_for_update(
)?;
// Emit update instructions
emit_update_insns(&plan, &t_ctx, program, index_cursors, temp_cursor_id)?;
emit_update_insns(
connection,
&mut plan,
&t_ctx,
program,
index_cursors,
temp_cursor_id,
)?;
// Close the main loop
close_loop(
@@ -801,27 +836,29 @@ fn emit_program_for_update(
#[instrument(skip_all, level = Level::DEBUG)]
fn emit_update_insns(
plan: &UpdatePlan,
connection: &Arc<Connection>,
plan: &mut UpdatePlan,
t_ctx: &TranslateCtx,
program: &mut ProgramBuilder,
index_cursors: Vec<(usize, usize)>,
temp_cursor_id: Option<CursorID>,
) -> crate::Result<()> {
let table_ref = plan.table_references.joined_tables().first().unwrap();
// we can either use this obviously safe raw pointer or we can clone it
let table_ref: *const JoinedTable = plan.table_references.joined_tables().first().unwrap();
let internal_id = unsafe { (*table_ref).internal_id };
let loop_labels = t_ctx.labels_main_loop.first().unwrap();
let cursor_id = program.resolve_cursor_id(&CursorKey::table(table_ref.internal_id));
let (index, is_virtual) = match &table_ref.op {
let cursor_id = program.resolve_cursor_id(&CursorKey::table(internal_id));
let (index, is_virtual) = match &unsafe { &*table_ref }.op {
Operation::Scan(Scan::BTreeTable { index, .. }) => (
index.as_ref().map(|index| {
(
index.clone(),
program
.resolve_cursor_id(&CursorKey::index(table_ref.internal_id, index.clone())),
program.resolve_cursor_id(&CursorKey::index(internal_id, index.clone())),
)
}),
false,
),
Operation::Scan(_) => (None, table_ref.virtual_table().is_some()),
Operation::Scan(_) => (None, unsafe { &*table_ref }.virtual_table().is_some()),
Operation::Search(search) => match search {
&Search::RowidEq { .. } | Search::Seek { index: None, .. } => (None, false),
Search::Seek {
@@ -829,8 +866,7 @@ fn emit_update_insns(
} => (
Some((
index.clone(),
program
.resolve_cursor_id(&CursorKey::index(table_ref.internal_id, index.clone())),
program.resolve_cursor_id(&CursorKey::index(internal_id, index.clone())),
)),
false,
),
@@ -838,7 +874,7 @@ fn emit_update_insns(
};
let beg = program.alloc_registers(
table_ref.table.columns().len()
unsafe { &*table_ref }.table.columns().len()
+ if is_virtual {
2 // two args before the relevant columns for VUpdate
} else {
@@ -851,7 +887,10 @@ fn emit_update_insns(
});
// Check if rowid was provided (through INTEGER PRIMARY KEY as a rowid alias)
let rowid_alias_index = table_ref.columns().iter().position(|c| c.is_rowid_alias);
let rowid_alias_index = unsafe { &*table_ref }
.columns()
.iter()
.position(|c| c.is_rowid_alias);
let has_user_provided_rowid = if let Some(index) = rowid_alias_index {
plan.set_clauses.iter().position(|(idx, _)| *idx == index)
@@ -901,6 +940,7 @@ fn emit_update_insns(
decrement_by: 1,
});
}
let col_len = unsafe { &*table_ref }.columns().len();
// we scan a column at a time, loading either the column's values, or the new value
// from the Set expression, into registers so we can emit a MakeRecord and update the row.
@@ -908,13 +948,14 @@ fn emit_update_insns(
// we allocate 2C registers for "updates" as the structure of this column for CDC table is following:
// [C boolean values where true set for changed columns] [C values with updates where NULL is set for not-changed columns]
let cdc_updates_register = if program.capture_data_changes_mode().has_updates() {
Some(program.alloc_registers(2 * table_ref.columns().len()))
Some(program.alloc_registers(2 * col_len))
} else {
None
};
let table_name = unsafe { &*table_ref }.table.get_name();
let start = if is_virtual { beg + 2 } else { beg + 1 };
for (idx, table_column) in table_ref.columns().iter().enumerate() {
for (idx, table_column) in unsafe { &*table_ref }.columns().iter().enumerate() {
let target_reg = start + idx;
if let Some((_, expr)) = plan.set_clauses.iter().find(|(i, _)| *i == idx) {
if has_user_provided_rowid
@@ -950,7 +991,7 @@ fn emit_update_insns(
err_code: SQLITE_CONSTRAINT_NOTNULL,
description: format!(
"{}.{}",
table_ref.table.get_name(),
table_name,
table_column
.name
.as_ref()
@@ -962,7 +1003,7 @@ fn emit_update_insns(
if let Some(cdc_updates_register) = cdc_updates_register {
let change_reg = cdc_updates_register + idx;
let value_reg = cdc_updates_register + table_ref.columns().len() + idx;
let value_reg = cdc_updates_register + col_len + idx;
program.emit_bool(true, change_reg);
program.mark_last_insn_constant();
let mut updated = false;
@@ -1017,7 +1058,7 @@ fn emit_update_insns(
if let Some(cdc_updates_register) = cdc_updates_register {
let change_bit_reg = cdc_updates_register + idx;
let value_reg = cdc_updates_register + table_ref.columns().len() + idx;
let value_reg = cdc_updates_register + col_len + idx;
program.emit_bool(false, change_bit_reg);
program.mark_last_insn_constant();
program.emit_null(value_reg, None);
@@ -1027,34 +1068,125 @@ fn emit_update_insns(
}
for (index, (idx_cursor_id, record_reg)) in plan.indexes_to_update.iter().zip(&index_cursors) {
let num_cols = index.columns.len();
// allocate scratch registers for the index columns plus rowid
let idx_start_reg = program.alloc_registers(num_cols + 1);
// We need to know whether or not the OLD values satisfied the predicate on the
// partial index, so we can know whether or not to delete the old index entry,
// as well as whether or not the NEW values satisfy the predicate, to determine whether
// or not to insert a new index entry for a partial index
let (old_satisfies_where, new_satisfies_where) = if index.where_clause.is_some() {
// This means that we need to bind the column references to a copy of the index Expr,
// so we can emit Insn::Column instructions and refer to the old values.
let where_clause = index
.bind_where_expr(Some(&mut plan.table_references), connection)
.expect("where clause to exist");
let old_satisfied_reg = program.alloc_register();
translate_expr_no_constant_opt(
program,
Some(&plan.table_references),
&where_clause,
old_satisfied_reg,
&t_ctx.resolver,
NoConstantOptReason::RegisterReuse,
)?;
// Use the new rowid value (if the UPDATE statement sets the rowid alias),
// otherwise keep using the original rowid. This guarantees that any
// newly inserted/updated index entries point at the correct row after
// the primary key change.
let rowid_reg = if has_user_provided_rowid {
// Safe to unwrap because `has_user_provided_rowid` implies the register was allocated.
rowid_set_clause_reg.expect("rowid register must be set when updating rowid alias")
// grab a new copy of the original where clause from the index
let mut new_where = index
.where_clause
.as_ref()
.expect("checked where clause to exist")
.clone();
// Now we need to rewrite the Expr::Id and Expr::Qualified/Expr::RowID (from a copy of the original, un-bound `where` expr),
// to refer to the new values, which are already loaded into registers starting at `start`.
rewrite_where_for_update_registers(
&mut new_where,
unsafe { &*table_ref }.columns(),
start,
rowid_set_clause_reg.unwrap_or(beg),
)?;
let new_satisfied_reg = program.alloc_register();
translate_expr_no_constant_opt(
program,
None,
&new_where,
new_satisfied_reg,
&t_ctx.resolver,
NoConstantOptReason::RegisterReuse,
)?;
// now we have two registers that tell us whether or not the old and new values satisfy
// the partial index predicate, and we can use those to decide whether or not to
// delete/insert a new index entry for this partial index.
(Some(old_satisfied_reg), Some(new_satisfied_reg))
} else {
beg
(None, None)
};
let idx_cols_start_reg = beg + 1;
// copy each index column from the table's column registers into these scratch regs
let mut skip_delete_label = None;
let mut skip_insert_label = None;
// Handle deletion for partial indexes
if let Some(old_satisfied) = old_satisfies_where {
skip_delete_label = Some(program.allocate_label());
// If the old values don't satisfy the WHERE clause, skip the delete
program.emit_insn(Insn::IfNot {
reg: old_satisfied,
target_pc: skip_delete_label.unwrap(),
jump_if_null: true,
});
}
// Delete old index entry
let num_regs = index.columns.len() + 1;
let delete_start_reg = program.alloc_registers(num_regs);
for (reg_offset, column_index) in index.columns.iter().enumerate() {
program.emit_column_or_rowid(
cursor_id,
column_index.pos_in_table,
delete_start_reg + reg_offset,
);
}
program.emit_insn(Insn::RowId {
cursor_id,
dest: delete_start_reg + num_regs - 1,
});
program.emit_insn(Insn::IdxDelete {
start_reg: delete_start_reg,
num_regs,
cursor_id: *idx_cursor_id,
raise_error_if_no_matching_entry: true,
});
// Resolve delete skip label if it exists
if let Some(label) = skip_delete_label {
program.resolve_label(label, program.offset());
}
// Check if we should insert into partial index
if let Some(new_satisfied) = new_satisfies_where {
skip_insert_label = Some(program.allocate_label());
// If the new values don't satisfy the WHERE clause, skip the idx insert
program.emit_insn(Insn::IfNot {
reg: new_satisfied,
target_pc: skip_insert_label.unwrap(),
jump_if_null: true,
});
}
// Build new index entry
let num_cols = index.columns.len();
let idx_start_reg = program.alloc_registers(num_cols + 1);
let rowid_reg = rowid_set_clause_reg.unwrap_or(beg);
for (i, col) in index.columns.iter().enumerate() {
let col_in_table = table_ref
let col_in_table = unsafe { &*table_ref }
.columns()
.get(col.pos_in_table)
.expect("column index out of bounds");
// copy from the table's column register over to the index's scratch register
program.emit_insn(Insn::Copy {
src_reg: if col_in_table.is_rowid_alias {
rowid_reg
} else {
idx_cols_start_reg + col.pos_in_table
start + col.pos_in_table
},
dst_reg: idx_start_reg + i,
extra_amount: 0,
@@ -1067,7 +1199,6 @@ fn emit_update_insns(
extra_amount: 0,
});
// this record will be inserted into the index later
program.emit_insn(Insn::MakeRecord {
start_reg: idx_start_reg,
count: num_cols + 1,
@@ -1076,62 +1207,87 @@ fn emit_update_insns(
affinity_str: None,
});
if !index.unique {
continue;
// Handle unique constraint
if index.unique {
let aff = index
.columns
.iter()
.map(|ic| {
unsafe { &*table_ref }.columns()[ic.pos_in_table]
.affinity()
.aff_mask()
})
.collect::<String>();
program.emit_insn(Insn::Affinity {
start_reg: idx_start_reg,
count: NonZeroUsize::new(num_cols).expect("nonzero col count"),
affinities: aff,
});
let constraint_check = program.allocate_label();
// check if the record already exists in the index for unique indexes and abort if so
program.emit_insn(Insn::NoConflict {
cursor_id: *idx_cursor_id,
target_pc: constraint_check,
record_reg: idx_start_reg,
num_regs: num_cols,
});
let idx_rowid_reg = program.alloc_register();
program.emit_insn(Insn::IdxRowId {
cursor_id: *idx_cursor_id,
dest: idx_rowid_reg,
});
// Skip over the UNIQUE constraint failure if the existing row is the one that we are currently changing
program.emit_insn(Insn::Eq {
lhs: beg,
rhs: idx_rowid_reg,
target_pc: constraint_check,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
let column_names = index.columns.iter().enumerate().fold(
String::with_capacity(50),
|mut accum, (idx, col)| {
if idx > 0 {
accum.push_str(", ");
}
accum.push_str(table_name);
accum.push('.');
accum.push_str(&col.name);
accum
},
);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: column_names,
});
program.preassign_label_to_next_insn(constraint_check);
}
// check if the record already exists in the index for unique indexes and abort if so
let constraint_check = program.allocate_label();
program.emit_insn(Insn::NoConflict {
// Insert the index entry
program.emit_insn(Insn::IdxInsert {
cursor_id: *idx_cursor_id,
target_pc: constraint_check,
record_reg: idx_start_reg,
num_regs: num_cols,
record_reg: *record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
let column_names = index.columns.iter().enumerate().fold(
String::with_capacity(50),
|mut accum, (idx, col)| {
if idx > 0 {
accum.push_str(", ");
}
accum.push_str(table_ref.table.get_name());
accum.push('.');
accum.push_str(&col.name);
accum
},
);
let idx_rowid_reg = program.alloc_register();
program.emit_insn(Insn::IdxRowId {
cursor_id: *idx_cursor_id,
dest: idx_rowid_reg,
});
// Skip over the UNIQUE constraint failure if the existing row is the one that we are currently changing
let original_rowid_reg = beg;
program.emit_insn(Insn::Eq {
lhs: original_rowid_reg,
rhs: idx_rowid_reg,
target_pc: constraint_check,
flags: CmpInsFlags::default(), // TODO: not sure what type of comparison flag is needed
collation: program.curr_collation(),
});
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY, // TODO: distinct between primary key and unique index for error code
description: column_names,
});
program.preassign_label_to_next_insn(constraint_check);
// Resolve insert skip label if it exists
if let Some(label) = skip_insert_label {
program.resolve_label(label, program.offset());
}
}
if let Some(btree_table) = table_ref.btree() {
if let Some(btree_table) = unsafe { &*table_ref }.btree() {
if btree_table.is_strict {
program.emit_insn(Insn::TypeCheck {
start_reg: start,
count: table_ref.columns().len(),
count: col_len,
check_generated: true,
table_reference: Arc::clone(&btree_table),
});
@@ -1159,8 +1315,8 @@ fn emit_update_insns(
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: format!(
"{}.{}",
table_ref.table.get_name(),
&table_ref
table_name,
unsafe { &*table_ref }
.columns()
.get(idx)
.unwrap()
@@ -1175,7 +1331,7 @@ fn emit_update_insns(
let record_reg = program.alloc_register();
let affinity_str = table_ref
let affinity_str = unsafe { &*table_ref }
.columns()
.iter()
.map(|col| col.affinity().aff_mask())
@@ -1183,7 +1339,7 @@ fn emit_update_insns(
program.emit_insn(Insn::MakeRecord {
start_reg: start,
count: table_ref.columns().len(),
count: col_len,
dest_reg: record_reg,
index_name: None,
affinity_str: Some(affinity_str),
@@ -1197,47 +1353,6 @@ fn emit_update_insns(
});
}
// For each index -> insert
for (index, (idx_cursor_id, record_reg)) in plan.indexes_to_update.iter().zip(index_cursors)
{
let num_regs = index.columns.len() + 1;
let start_reg = program.alloc_registers(num_regs);
// Delete existing index key
index
.columns
.iter()
.enumerate()
.for_each(|(reg_offset, column_index)| {
program.emit_column_or_rowid(
cursor_id,
column_index.pos_in_table,
start_reg + reg_offset,
);
});
program.emit_insn(Insn::RowId {
cursor_id,
dest: start_reg + num_regs - 1,
});
program.emit_insn(Insn::IdxDelete {
start_reg,
num_regs,
cursor_id: idx_cursor_id,
raise_error_if_no_matching_entry: true,
});
// Insert new index key (filled further above with values from set_clauses)
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(start),
unpacked_count: Some((index.columns.len() + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
}
// create alias for CDC rowid after the change (will differ from cdc_rowid_before_reg only in case of UPDATE with change in rowid alias)
let cdc_rowid_after_reg = rowid_set_clause_reg.unwrap_or(beg);
@@ -1261,7 +1376,7 @@ fn emit_update_insns(
let cdc_before_reg = if program.capture_data_changes_mode().has_before() {
Some(emit_cdc_full_record(
program,
table_ref.table.columns(),
unsafe { &*table_ref }.table.columns(),
cursor_id,
cdc_rowid_before_reg.expect("cdc_rowid_before_reg must be set"),
))
@@ -1275,7 +1390,7 @@ fn emit_update_insns(
if has_user_provided_rowid {
program.emit_insn(Insn::Delete {
cursor_id,
table_name: table_ref.table.get_name().to_string(),
table_name: table_name.to_string(),
});
}
@@ -1290,7 +1405,7 @@ fn emit_update_insns(
} else {
InsertFlags::new()
},
table_name: table_ref.identifier.clone(),
table_name: unsafe { &*table_ref }.identifier.clone(),
});
// Emit RETURNING results if specified
@@ -1299,7 +1414,7 @@ fn emit_update_insns(
let value_registers = ReturningValueRegisters {
rowid_register: rowid_set_clause_reg.unwrap_or(beg),
columns_start_register: start,
num_columns: table_ref.columns().len(),
num_columns: col_len,
};
emit_returning_results(program, returning_columns, &value_registers)?;
@@ -1310,7 +1425,7 @@ fn emit_update_insns(
let cdc_after_reg = if program.capture_data_changes_mode().has_after() {
Some(emit_cdc_patch_record(
program,
&table_ref.table,
&unsafe { &*table_ref }.table,
start,
record_reg,
cdc_rowid_after_reg,
@@ -1323,7 +1438,7 @@ fn emit_update_insns(
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: cdc_updates_register,
count: 2 * table_ref.columns().len(),
count: 2 * col_len,
dest_reg: record_reg,
index_name: None,
affinity_str: None,
@@ -1347,7 +1462,7 @@ fn emit_update_insns(
cdc_before_reg,
None,
None,
table_ref.table.get_name(),
table_name,
)?;
emit_cdc_insns(
program,
@@ -1358,7 +1473,7 @@ fn emit_update_insns(
cdc_after_reg,
None,
None,
table_ref.table.get_name(),
table_name,
)?;
} else {
emit_cdc_insns(
@@ -1370,12 +1485,12 @@ fn emit_update_insns(
cdc_before_reg,
cdc_after_reg,
cdc_updates_record,
table_ref.table.get_name(),
table_name,
)?;
}
}
} else if table_ref.virtual_table().is_some() {
let arg_count = table_ref.columns().len() + 2;
} else if unsafe { &*table_ref }.virtual_table().is_some() {
let arg_count = col_len + 2;
program.emit_insn(Insn::VUpdate {
cursor_id,
arg_count,
@@ -1671,3 +1786,54 @@ fn init_limit(
}
}
}
/// We have `Expr`s which have *not* had column references bound to them,
/// so they are in the state of Expr::Id/Expr::Qualified, etc, and instead of binding Expr::Column
/// we need to bind Expr::Register, as we have already loaded the *new* column values from the
/// UPDATE statement into registers starting at `columns_start_reg`, which we want to reference.
fn rewrite_where_for_update_registers(
    expr: &mut Expr,
    columns: &[Column],
    columns_start_reg: usize,
    rowid_reg: usize,
) -> Result<WalkControl> {
    // Map a (possibly qualified) column name to the register that holds its new value.
    // The special name "rowid" and rowid-alias columns both resolve to `rowid_reg`;
    // this mirrors the `col_reg` lookup used by `rewrite_partial_index_where`, so
    // qualified references like `t.rowid` are rewritten the same way bare `rowid` is.
    let resolve = |name: &str| -> Option<usize> {
        let normalized = normalize_ident(name);
        if normalized.eq_ignore_ascii_case("rowid") {
            return Some(rowid_reg);
        }
        columns
            .iter()
            .enumerate()
            .find(|(_, c)| {
                c.name
                    .as_ref()
                    .is_some_and(|n| n.eq_ignore_ascii_case(&normalized))
            })
            .map(|(idx, c)| {
                if c.is_rowid_alias {
                    rowid_reg
                } else {
                    columns_start_reg + idx
                }
            })
    };
    walk_expr_mut(expr, &mut |e: &mut Expr| -> Result<WalkControl> {
        // Compute the target register first so the borrow of `e` taken by the
        // match ends before we overwrite `*e`.
        let reg = match e {
            Expr::Qualified(_, col) | Expr::DoublyQualified(_, _, col) => resolve(col.as_str()),
            Expr::Id(ast::Name::Ident(name)) | Expr::Id(ast::Name::Quoted(name)) => {
                resolve(name.as_str())
            }
            Expr::RowId { .. } => Some(rowid_reg),
            _ => None,
        };
        if let Some(reg) = reg {
            *e = Expr::Register(reg);
        }
        Ok(WalkControl::Continue)
    })
}

View File

@@ -3264,6 +3264,14 @@ impl Default for ParamState {
Self { next_param_idx: 1 }
}
}
impl ParamState {
    /// Build a state in which binding anonymous parameters is forbidden
    /// (e.g. inside a `CREATE INDEX ... WHERE` predicate).
    pub fn disallow() -> Self {
        Self { next_param_idx: 0 }
    }

    /// Returns `true` when anonymous parameters may still be bound in this
    /// context; a zero index marks the disallowed state.
    pub fn is_valid(&self) -> bool {
        self.next_param_idx > 0
    }
}
/// Rewrite ast::Expr in place, binding Column references/rewriting Expr::Id -> Expr::Column
/// using the provided TableReferences, and replacing anonymous parameters with internal named
@@ -3287,6 +3295,9 @@ pub fn bind_and_rewrite_expr<'a>(
}
// Rewrite anonymous variables in encounter order.
ast::Expr::Variable(var) if var.is_empty() => {
if !param_state.is_valid() {
crate::bail_parse_error!("Parameters are not allowed in this context");
}
*expr = ast::Expr::Variable(format!(
"{}{}",
PARAM_PREFIX, param_state.next_param_idx

View File

@@ -1,9 +1,16 @@
use std::sync::Arc;
use crate::schema::Table;
use crate::translate::emitter::{
emit_cdc_full_record, emit_cdc_insns, prepare_cdc_if_necessary, OperationMode, Resolver,
};
use crate::translate::expr::{translate_condition_expr, ConditionMetadata};
use crate::translate::plan::{
ColumnUsedMask, IterationDirection, JoinedTable, Operation, Scan, TableReferences,
};
use crate::vdbe::builder::CursorKey;
use crate::vdbe::insn::{CmpInsFlags, Cookie};
use crate::vdbe::BranchOffset;
use crate::SymbolTable;
use crate::{
schema::{BTreeTable, Column, Index, IndexColumn, PseudoCursorType, Schema},
@@ -18,6 +25,7 @@ use turso_parser::ast::{self, Expr, SortOrder, SortedColumn};
use super::schema::{emit_schema_entry, SchemaEntryType, SQLITE_TABLEID};
#[allow(clippy::too_many_arguments)]
pub fn translate_create_index(
unique_if_not_exists: (bool, bool),
idx_name: &str,
@@ -26,6 +34,8 @@ pub fn translate_create_index(
schema: &Schema,
syms: &SymbolTable,
mut program: ProgramBuilder,
connection: &Arc<crate::Connection>,
where_clause: Option<Box<Expr>>,
) -> crate::Result<ProgramBuilder> {
if !schema.indexes_enabled() {
crate::bail_parse_error!(
@@ -50,10 +60,10 @@ pub fn translate_create_index(
}
crate::bail_parse_error!("Error: index with name '{idx_name}' already exists.");
}
let Some(tbl) = schema.tables.get(&tbl_name) else {
let Some(table) = schema.tables.get(&tbl_name) else {
crate::bail_parse_error!("Error: table '{tbl_name}' does not exist.");
};
let Some(tbl) = tbl.btree() else {
let Some(tbl) = table.btree() else {
crate::bail_parse_error!("Error: table '{tbl_name}' is not a b-tree table.");
};
let columns = resolve_sorted_columns(&tbl, columns)?;
@@ -75,8 +85,20 @@ pub fn translate_create_index(
unique: unique_if_not_exists.0,
ephemeral: false,
has_rowid: tbl.has_rowid,
// store the *original* where clause, because we need to rewrite it
// before translating, and it cannot reference a table alias
where_clause: where_clause.clone(),
});
if !idx.validate_where_expr(table) {
crate::bail_parse_error!(
"Error: cannot use aggregate, window functions or reference other tables in WHERE clause of CREATE INDEX:\n {}",
where_clause
.expect("where expr has to exist in order to fail")
.to_string()
);
}
// Allocate the necessary cursors:
//
// 1. sqlite_schema_cursor_id - sqlite_schema table
@@ -87,13 +109,34 @@ pub fn translate_create_index(
let sqlite_table = schema.get_btree_table(SQLITE_TABLEID).unwrap();
let sqlite_schema_cursor_id =
program.alloc_cursor_id(CursorType::BTreeTable(sqlite_table.clone()));
let table_ref = program.table_reference_counter.next();
let btree_cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(idx.clone()));
let table_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(tbl.clone()));
let table_cursor_id = program.alloc_cursor_id_keyed(
CursorKey::table(table_ref),
CursorType::BTreeTable(tbl.clone()),
);
let sorter_cursor_id = program.alloc_cursor_id(CursorType::Sorter);
let pseudo_cursor_id = program.alloc_cursor_id(CursorType::Pseudo(PseudoCursorType {
column_count: tbl.columns.len(),
}));
let mut table_references = TableReferences::new(
vec![JoinedTable {
op: Operation::Scan(Scan::BTreeTable {
iter_dir: IterationDirection::Forwards,
index: None,
}),
table: Table::BTree(tbl.clone()),
identifier: tbl_name.clone(),
internal_id: table_ref,
join_info: None,
col_used_mask: ColumnUsedMask::default(),
database_id: 0,
}],
vec![],
);
let where_clause = idx.bind_where_expr(Some(&mut table_references), connection);
// Create a new B-Tree and store the root page index in a register
let root_page_reg = program.alloc_register();
program.emit_insn(Insn::CreateBtree {
@@ -108,7 +151,13 @@ pub fn translate_create_index(
root_page: RegisterOrLiteral::Literal(sqlite_table.root_page),
db: 0,
});
let sql = create_idx_stmt_to_sql(&tbl_name, &idx_name, unique_if_not_exists, &columns);
let sql = create_idx_stmt_to_sql(
&tbl_name,
&idx_name,
unique_if_not_exists,
&columns,
&idx.where_clause.clone(),
);
let resolver = Resolver::new(schema, syms);
let cdc_table = prepare_cdc_if_necessary(&mut program, schema, SQLITE_TABLEID)?;
emit_schema_entry(
@@ -159,6 +208,23 @@ pub fn translate_create_index(
// emit MakeRecord (index key + rowid) into record_reg.
//
// Then insert the record into the sorter
let mut skip_row_label = None;
if let Some(where_clause) = where_clause {
let label = program.allocate_label();
translate_condition_expr(
&mut program,
&table_references,
&where_clause,
ConditionMetadata {
jump_if_condition_is_true: false,
jump_target_when_false: label,
jump_target_when_true: BranchOffset::Placeholder,
},
&resolver,
)?;
skip_row_label = Some(label);
}
let start_reg = program.alloc_registers(columns.len() + 1);
for (i, (col, _)) in columns.iter().enumerate() {
program.emit_column_or_rowid(table_cursor_id, col.0, start_reg + i);
@@ -181,6 +247,9 @@ pub fn translate_create_index(
record_reg,
});
if let Some(skip_row_label) = skip_row_label {
program.resolve_label(skip_row_label, program.offset());
}
program.emit_insn(Insn::Next {
cursor_id: table_cursor_id,
pc_if_next: loop_start_label,
@@ -285,6 +354,7 @@ fn create_idx_stmt_to_sql(
idx_name: &str,
unique_if_not_exists: (bool, bool),
cols: &[((usize, &Column), SortOrder)],
where_clause: &Option<Box<Expr>>,
) -> String {
let mut sql = String::with_capacity(128);
sql.push_str("CREATE ");
@@ -309,6 +379,10 @@ fn create_idx_stmt_to_sql(
}
}
sql.push(')');
if let Some(where_clause) = where_clause {
sql.push_str(" WHERE ");
sql.push_str(&where_clause.to_string());
}
sql
}

View File

@@ -1,21 +1,25 @@
use std::num::NonZeroUsize;
use std::sync::Arc;
use turso_parser::ast::{
self, Expr, InsertBody, OneSelect, QualifiedName, ResolveType, ResultColumn, Upsert, UpsertDo,
With,
};
use crate::error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY};
use crate::schema::{self, Table};
use crate::error::{
SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY, SQLITE_CONSTRAINT_UNIQUE,
};
use crate::schema::{self, Index, Table};
use crate::translate::emitter::{
emit_cdc_insns, emit_cdc_patch_record, prepare_cdc_if_necessary, OperationMode,
};
use crate::translate::expr::{
bind_and_rewrite_expr, emit_returning_results, process_returning_clause, ParamState,
ReturningValueRegisters,
bind_and_rewrite_expr, emit_returning_results, process_returning_clause, walk_expr_mut,
ParamState, ReturningValueRegisters, WalkControl,
};
use crate::translate::plan::TableReferences;
use crate::translate::planner::ROWID;
use crate::translate::upsert::{
collect_set_clauses_for_upsert, emit_upsert, upsert_matches_index, upsert_matches_pk,
collect_set_clauses_for_upsert, emit_upsert, resolve_upsert_target, ResolvedUpsertTarget,
};
use crate::util::normalize_ident;
use crate::vdbe::builder::ProgramBuilderOpts;
@@ -166,6 +170,12 @@ pub fn translate_insert(
}
upsert_opt = upsert.as_deref().cloned();
}
// resolve the constrained target for UPSERT if specified
let resolved_upsert = if let Some(upsert) = &upsert_opt {
Some(resolve_upsert_target(schema, &table, upsert)?)
} else {
None
};
let halt_label = program.allocate_label();
let loop_start_label = program.allocate_label();
@@ -343,6 +353,7 @@ pub fn translate_insert(
program.alloc_cursor_id(CursorType::BTreeTable(btree_table.clone())),
),
};
let has_upsert = upsert_opt.is_some();
// Set up the program to return result columns if RETURNING is specified
if !result_columns.is_empty() {
@@ -353,7 +364,6 @@ pub fn translate_insert(
// (idx name, root_page, idx cursor id)
let idx_cursors = schema
.get_indices(table_name.as_str())
.iter()
.map(|idx| {
(
&idx.name,
@@ -365,6 +375,9 @@ pub fn translate_insert(
let insertion = build_insertion(&mut program, &table, &columns, num_values)?;
let upsert_entry = program.allocate_label();
let conflict_rowid_reg = program.alloc_register();
if inserting_multiple_rows {
translate_rows_multiple(
&mut program,
@@ -422,17 +435,6 @@ pub fn translate_insert(
});
}
let emit_halt_with_constraint = |program: &mut ProgramBuilder, col_name: &str| {
let mut description = String::with_capacity(table_name.as_str().len() + col_name.len() + 2);
description.push_str(table_name.as_str());
description.push('.');
description.push_str(col_name);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
});
};
// Check uniqueness constraint for rowid if it was provided by user.
// When the DB allocates it there are no need for separate uniqueness checks.
if has_user_provided_rowid {
@@ -447,41 +449,32 @@ pub fn translate_insert(
// Conflict on rowid: attempt to route through UPSERT if it targets the PK, otherwise raise constraint.
// emit Halt for every case *except* when upsert handles the conflict
'emit_halt: {
if let Some(ref mut upsert) = upsert_opt.as_mut() {
if upsert_matches_pk(upsert, &table) {
match upsert.do_clause {
UpsertDo::Nothing => {
program.emit_insn(Insn::Goto {
target_pc: row_done_label,
});
}
UpsertDo::Set {
ref mut sets,
ref mut where_clause,
} => {
let mut rewritten_sets = collect_set_clauses_for_upsert(&table, sets)?;
emit_upsert(
&mut program,
schema,
&table,
&insertion,
cursor_id,
insertion.key_register(),
&mut rewritten_sets,
where_clause,
&resolver,
&idx_cursors,
&mut result_columns,
cdc_table.as_ref().map(|c| c.0),
row_done_label,
)?;
}
}
if let (Some(_), Some(ref target)) = (upsert_opt.as_mut(), resolved_upsert.as_ref()) {
if matches!(
target,
ResolvedUpsertTarget::CatchAll | ResolvedUpsertTarget::PrimaryKey
) {
// PK conflict: the conflicting rowid is exactly the attempted key
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: conflict_rowid_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Goto {
target_pc: upsert_entry,
});
break 'emit_halt;
}
}
emit_halt_with_constraint(&mut program, rowid_column_name);
let mut description =
String::with_capacity(table_name.as_str().len() + rowid_column_name.len() + 2);
description.push_str(table_name.as_str());
description.push('.');
description.push_str(rowid_column_name);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
});
}
program.preassign_label_to_next_insn(make_record_label);
}
@@ -498,6 +491,18 @@ pub fn translate_insert(
_ => (),
}
// We need to separate index handling and insertion into a `preflight` and a
// `commit` phase, because in UPSERT mode we might need to skip the actual insertion, as we can
// have a naked ON CONFLICT DO NOTHING, so if we eagerly insert any indexes, we could insert
// invalid index entries before we hit a conflict down the line.
//
// Preflight phase: evaluate each applicable UNIQUE constraint and probe with NoConflict.
// If any probe hits:
// DO NOTHING -> jump to row_done_label.
//
// DO UPDATE (matching target) -> fetch conflicting rowid and jump to `upsert_entry`.
//
// otherwise, raise SQLITE_CONSTRAINT_UNIQUE
for index in schema.get_indices(table_name.as_str()) {
let column_mappings = index
.columns
@@ -510,10 +515,34 @@ pub fn translate_insert(
.map(|(_, _, c_id)| *c_id)
.expect("no cursor found for index");
let maybe_skip_probe_label = if let Some(where_clause) = &index.where_clause {
let mut where_for_eval = where_clause.as_ref().clone();
rewrite_partial_index_where(&mut where_for_eval, &insertion)?;
let reg = program.alloc_register();
translate_expr_no_constant_opt(
&mut program,
Some(&TableReferences::new_empty()),
&where_for_eval,
reg,
&resolver,
NoConstantOptReason::RegisterReuse,
)?;
let lbl = program.allocate_label();
program.emit_insn(Insn::IfNot {
reg,
target_pc: lbl,
jump_if_null: true,
});
Some(lbl)
} else {
None
};
let num_cols = index.columns.len();
// allocate scratch registers for the index columns plus rowid
let idx_start_reg = program.alloc_registers(num_cols + 1);
// build unpacked key [idx_start_reg .. idx_start_reg+num_cols-1], and rowid in last reg,
// copy each index column from the table's column registers into these scratch regs
for (i, column_mapping) in column_mappings.clone().enumerate() {
// copy from the table's column register over to the index's scratch register
@@ -535,96 +564,131 @@ pub fn translate_insert(
extra_amount: 0,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: idx_start_reg,
count: num_cols + 1,
dest_reg: record_reg,
index_name: Some(index.name.clone()),
affinity_str: None,
});
if index.unique {
let label_idx_insert = program.allocate_label();
program.emit_insn(Insn::NoConflict {
cursor_id: idx_cursor_id,
target_pc: label_idx_insert,
record_reg: idx_start_reg,
num_regs: num_cols,
let aff = index
.columns
.iter()
.map(|ic| table.columns()[ic.pos_in_table].affinity().aff_mask())
.collect::<String>();
program.emit_insn(Insn::Affinity {
start_reg: idx_start_reg,
count: NonZeroUsize::new(num_cols).expect("nonzero col count"),
affinities: aff,
});
let column_names = index.columns.iter().enumerate().fold(
String::with_capacity(50),
|mut accum, (idx, column)| {
if idx > 0 {
accum.push_str(", ");
}
accum.push_str(&index.name);
accum.push('.');
accum.push_str(&column.name);
accum
},
);
// again, emit halt for every case *except* when upsert handles the conflict
'emit_halt: {
if let Some(ref mut upsert) = upsert_opt.as_mut() {
if upsert_matches_index(upsert, index, &table) {
match upsert.do_clause {
UpsertDo::Nothing => {
program.emit_insn(Insn::Goto {
target_pc: row_done_label,
});
}
UpsertDo::Set {
ref mut sets,
ref mut where_clause,
} => {
let mut rewritten_sets =
collect_set_clauses_for_upsert(&table, sets)?;
let conflict_rowid_reg = program.alloc_register();
program.emit_insn(Insn::IdxRowId {
cursor_id: idx_cursor_id,
dest: conflict_rowid_reg,
});
emit_upsert(
&mut program,
schema,
&table,
&insertion,
cursor_id,
conflict_rowid_reg,
&mut rewritten_sets,
where_clause,
&resolver,
&idx_cursors,
&mut result_columns,
cdc_table.as_ref().map(|c| c.0),
row_done_label,
)?;
}
}
break 'emit_halt;
if has_upsert {
let next_check = program.allocate_label();
program.emit_insn(Insn::NoConflict {
cursor_id: idx_cursor_id,
target_pc: next_check,
record_reg: idx_start_reg,
num_regs: num_cols,
});
// Conflict detected, figure out if this UPSERT handles the conflict
let upsert_matches_this_index = if let (Some(_u), Some(ref target)) =
(upsert_opt.as_ref(), resolved_upsert.as_ref())
{
match target {
ResolvedUpsertTarget::CatchAll => true,
ResolvedUpsertTarget::Index(tgt) => Arc::ptr_eq(tgt, index),
// note: PK handled earlier by rowid path; this is a secondary index
ResolvedUpsertTarget::PrimaryKey => false,
}
} else {
false
};
if upsert_matches_this_index {
// Distinguish DO NOTHING vs DO UPDATE
match upsert_opt.as_ref().unwrap().do_clause {
UpsertDo::Nothing => {
// Bail out without writing anything
program.emit_insn(Insn::Goto {
target_pc: row_done_label,
});
}
UpsertDo::Set { .. } => {
// Route to DO UPDATE: capture conflicting rowid then jump
program.emit_insn(Insn::IdxRowId {
cursor_id: idx_cursor_id,
dest: conflict_rowid_reg,
});
program.emit_insn(Insn::Goto {
target_pc: upsert_entry,
});
}
}
} else {
// No matching UPSERT handler so we emit constraint error
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_UNIQUE,
description: format_unique_violation_desc(table_name.as_str(), index),
});
}
// No matching UPSERT rule: unique constraint violation.
// continue preflight with next constraint
program.preassign_label_to_next_insn(next_check);
} else {
// No UPSERT fast-path: probe and immediately insert
let ok = program.allocate_label();
program.emit_insn(Insn::NoConflict {
cursor_id: idx_cursor_id,
target_pc: ok,
record_reg: idx_start_reg,
num_regs: num_cols,
});
// Unique violation without ON CONFLICT clause -> error
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: column_names,
err_code: SQLITE_CONSTRAINT_UNIQUE,
description: format_unique_violation_desc(table_name.as_str(), index),
});
program.preassign_label_to_next_insn(ok);
// In the non-UPSERT case, we insert the index
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: idx_start_reg,
count: num_cols + 1,
dest_reg: record_reg,
index_name: Some(index.name.clone()),
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
}
} else {
// Non-unique index: in UPSERT mode we postpone writes to commit phase.
if !has_upsert {
// eager insert for non-unique, no UPSERT
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: idx_start_reg,
count: num_cols + 1,
dest_reg: record_reg,
index_name: Some(index.name.clone()),
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
}
program.resolve_label(label_idx_insert, program.offset());
}
// now do the actual index insertion using the unpacked registers
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg), // TODO: enable optimization
unpacked_count: Some((num_cols + 1) as u16),
// TODO: figure out how to determine whether or not we need to seek prior to insert.
flags: IdxInsertFlags::new().nchange(true),
});
}
// Close the partial-index skip (preflight)
if let Some(lbl) = maybe_skip_probe_label {
program.resolve_label(lbl, program.offset());
}
}
for column_mapping in insertion
.col_mappings
.iter()
@@ -661,6 +725,7 @@ pub fn translate_insert(
},
});
}
// Create and insert the record
let affinity_str = insertion
.col_mappings
@@ -675,6 +740,87 @@ pub fn translate_insert(
index_name: None,
affinity_str: Some(affinity_str),
});
if has_upsert {
// COMMIT PHASE: no preflight jumps happened; emit the actual index writes now
// We re-check partial-index predicates against the NEW image, produce packed records,
// and insert into all applicable indexes, we do not re-probe uniqueness here, as preflight
// already guaranteed non-conflict.
for index in schema.get_indices(table_name.as_str()) {
let idx_cursor_id = idx_cursors
.iter()
.find(|(name, _, _)| *name == &index.name)
.map(|(_, _, c_id)| *c_id)
.expect("no cursor found for index");
// Re-evaluate partial predicate on the would-be inserted image
let commit_skip_label = if let Some(where_clause) = &index.where_clause {
let mut where_for_eval = where_clause.as_ref().clone();
rewrite_partial_index_where(&mut where_for_eval, &insertion)?;
let reg = program.alloc_register();
translate_expr_no_constant_opt(
&mut program,
Some(&TableReferences::new_empty()),
&where_for_eval,
reg,
&resolver,
NoConstantOptReason::RegisterReuse,
)?;
let lbl = program.allocate_label();
program.emit_insn(Insn::IfNot {
reg,
target_pc: lbl,
jump_if_null: true,
});
Some(lbl)
} else {
None
};
let num_cols = index.columns.len();
let idx_start_reg = program.alloc_registers(num_cols + 1);
// Build [key cols..., rowid] from insertion registers
for (i, idx_col) in index.columns.iter().enumerate() {
let Some(cm) = insertion.get_col_mapping_by_name(&idx_col.name) else {
return Err(crate::LimboError::PlanningError(
"Column not found in INSERT (commit phase)".to_string(),
));
};
program.emit_insn(Insn::Copy {
src_reg: cm.register,
dst_reg: idx_start_reg + i,
extra_amount: 0,
});
}
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: idx_start_reg + num_cols,
extra_amount: 0,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: idx_start_reg,
count: num_cols + 1,
dest_reg: record_reg,
index_name: Some(index.name.clone()),
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
if let Some(lbl) = commit_skip_label {
program.resolve_label(lbl, program.offset());
}
}
}
program.emit_insn(Insn::Insert {
cursor: cursor_id,
key_reg: insertion.key_register(),
@@ -720,6 +866,45 @@ pub fn translate_insert(
emit_returning_results(&mut program, &result_columns, &value_registers)?;
}
program.emit_insn(Insn::Goto {
target_pc: row_done_label,
});
// Normal INSERT path is done above
// Any conflict routed to UPSERT jumps past all that to here:
program.preassign_label_to_next_insn(upsert_entry);
if let (Some(mut upsert), Some(_)) = (upsert_opt.take(), resolved_upsert.clone()) {
// Only DO UPDATE (SET ...); DO NOTHING should have already jumped to row_done_label earlier.
if let UpsertDo::Set {
ref mut sets,
ref mut where_clause,
} = upsert.do_clause
{
// Normalize SET pairs once
let mut rewritten_sets = collect_set_clauses_for_upsert(&table, sets)?;
emit_upsert(
&mut program,
schema,
&table,
&insertion,
cursor_id,
conflict_rowid_reg,
&mut rewritten_sets,
where_clause,
&resolver,
&idx_cursors,
&mut result_columns,
cdc_table.as_ref().map(|c| c.0),
row_done_label,
)?;
} else {
// UpsertDo::Nothing case
program.emit_insn(Insn::Goto {
target_pc: row_done_label,
});
}
}
if inserting_multiple_rows {
if let Some(temp_table_ctx) = temp_table_ctx {
@@ -1186,3 +1371,73 @@ fn translate_virtual_table_insert(
Ok(program)
}
/// Build the UNIQUE constraint error description to match sqlite
/// single column: `t.c1`
/// multi-column: `t.(k, c1)`
#[inline]
pub fn format_unique_violation_desc(table_name: &str, index: &Index) -> String {
    match index.columns.as_slice() {
        // Exactly one indexed column: `table.column`
        [only] => format!("{table_name}.{}", only.name),
        // Zero or several columns: `table.(a, b, ...)`
        cols => {
            let joined = cols
                .iter()
                .map(|c| c.name.as_str())
                .collect::<Vec<_>>()
                .join(", ");
            format!("{table_name}.({joined})")
        }
    }
}
/// Rewrite WHERE clause for partial index to reference insertion registers
pub fn rewrite_partial_index_where(
    expr: &mut ast::Expr,
    insertion: &Insertion,
) -> crate::Result<WalkControl> {
    // Resolve a normalized column name to the register holding its inserted value.
    // "rowid" and rowid-alias columns map to the key register; unknown names
    // resolve to None and are left untouched.
    let col_reg = |name: &str| -> Option<usize> {
        if name.eq_ignore_ascii_case("rowid") {
            return Some(insertion.key_register());
        }
        insertion.get_col_mapping_by_name(name).map(|c| {
            if c.column.is_rowid_alias {
                insertion.key_register()
            } else {
                c.register
            }
        })
    };
    walk_expr_mut(
        expr,
        &mut |e: &mut ast::Expr| -> crate::Result<WalkControl> {
            // NOTE: should not have ANY Expr::Columns bound to the expr.
            // Resolve the register first so the match's borrow of `e` ends
            // before the rewrite below.
            let target = match e {
                Expr::Id(ast::Name::Ident(name)) | Expr::Id(ast::Name::Quoted(name)) => {
                    col_reg(&normalize_ident(name.as_str()))
                }
                Expr::Qualified(_, col) | Expr::DoublyQualified(_, _, col) => {
                    col_reg(&normalize_ident(col.as_str()))
                }
                _ => None,
            };
            if let Some(reg) = target {
                *e = Expr::Register(reg);
            }
            Ok(WalkControl::Continue)
        },
    )
}

View File

@@ -93,6 +93,7 @@ pub fn init_distinct(program: &mut ProgramBuilder, plan: &SelectPlan) -> Distinc
.collect(),
unique: false,
has_rowid: false,
where_clause: None,
});
let cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(index.clone()));
let ctx = DistinctCtx {
@@ -166,6 +167,7 @@ pub fn init_loop(
}],
has_rowid: false,
unique: false,
where_clause: None,
});
let cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(index.clone()));
if group_by.is_none() {

View File

@@ -163,20 +163,17 @@ pub fn translate_inner(
tbl_name,
columns,
where_clause,
} => {
if where_clause.is_some() {
bail_parse_error!("Partial indexes are not supported");
}
translate_create_index(
(unique, if_not_exists),
idx_name.name.as_str(),
tbl_name.as_str(),
&columns,
schema,
syms,
program,
)?
}
} => translate_create_index(
(unique, if_not_exists),
idx_name.name.as_str(),
tbl_name.as_str(),
&columns,
schema,
syms,
program,
connection,
where_clause,
)?,
ast::Stmt::CreateTable {
temporary,
if_not_exists,

View File

@@ -1,4 +1,8 @@
use std::{cmp::Ordering, collections::HashMap, sync::Arc};
use std::{
cmp::Ordering,
collections::{HashMap, VecDeque},
sync::Arc,
};
use crate::{
schema::{Column, Index},
@@ -7,6 +11,7 @@ use crate::{
plan::{JoinOrderMember, TableReferences, WhereTerm},
planner::{table_mask_from_expr, TableMask},
},
util::exprs_are_equivalent,
Result,
};
use turso_ext::{ConstraintInfo, ConstraintOp};
@@ -174,7 +179,7 @@ fn estimate_selectivity(column: &Column, op: ast::Operator) -> f64 {
pub fn constraints_from_where_clause(
where_clause: &[WhereTerm],
table_references: &TableReferences,
available_indexes: &HashMap<String, Vec<Arc<Index>>>,
available_indexes: &HashMap<String, VecDeque<Arc<Index>>>,
) -> Result<Vec<TableConstraints>> {
let mut constraints = Vec::new();
@@ -314,31 +319,26 @@ pub fn constraints_from_where_clause(
}
for index in available_indexes
.get(table_reference.table.get_name())
.unwrap_or(&Vec::new())
.unwrap_or(&VecDeque::new())
{
if let Some(position_in_index) =
index.column_table_pos_to_index_pos(constraint.table_col_pos)
{
let index_candidate = cs
.candidates
.iter_mut()
.find_map(|candidate| {
if candidate
.index
.as_ref()
.is_some_and(|i| Arc::ptr_eq(index, i))
{
Some(candidate)
} else {
None
}
})
.unwrap();
index_candidate.refs.push(ConstraintRef {
constraint_vec_pos: i,
index_col_pos: position_in_index,
sort_order: index.columns[position_in_index].order,
});
if let Some(index_candidate) = cs.candidates.iter_mut().find_map(|candidate| {
if candidate.index.as_ref().is_some_and(|i| {
Arc::ptr_eq(index, i) && can_use_partial_index(index, where_clause)
}) {
Some(candidate)
} else {
None
}
}) {
index_candidate.refs.push(ConstraintRef {
constraint_vec_pos: i,
index_col_pos: position_in_index,
sort_order: index.columns[position_in_index].order,
});
}
}
}
}
@@ -365,6 +365,15 @@ pub fn constraints_from_where_clause(
candidate.refs.truncate(first_inequality + 1);
}
}
cs.candidates.retain(|c| {
if let Some(idx) = &c.index {
if idx.where_clause.is_some() && c.refs.is_empty() {
// prevent a partial index from even being considered as a scan driver.
return false;
}
}
true
});
constraints.push(cs);
}
@@ -403,6 +412,21 @@ pub fn usable_constraints_for_join_order<'a>(
&refs[..usable_until]
}
/// Decide whether `index` may be used to answer a query with the given
/// WHERE terms.
///
/// A full (non-partial) index is always usable. A partial index is only
/// usable when the query's WHERE clause contains a term that is exactly
/// equivalent to the index's predicate, guaranteeing every qualifying row
/// is present in the index.
fn can_use_partial_index(index: &Index, query_where_clause: &[WhereTerm]) -> bool {
    match &index.where_clause {
        // No predicate: the index covers every row.
        None => true,
        // TODO: do better to determine if we should use partial index
        Some(index_where) => query_where_clause
            .iter()
            .any(|term| exprs_are_equivalent(&term.expr, index_where.as_ref())),
    }
}
pub fn convert_to_vtab_constraint(
constraints: &[Constraint],
join_order: &[JoinOrderMember],

View File

@@ -501,7 +501,7 @@ fn generate_join_bitmasks(table_number_max_exclusive: usize, how_many: usize) ->
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};
use turso_parser::ast::{self, Expr, Operator, SortOrder, TableInternalId};
@@ -664,6 +664,7 @@ mod tests {
let index = Arc::new(Index {
name: "sqlite_autoindex_test_table_1".to_string(),
table_name: "test_table".to_string(),
where_clause: None,
columns: vec![IndexColumn {
name: "id".to_string(),
order: SortOrder::Asc,
@@ -676,7 +677,7 @@ mod tests {
root_page: 1,
has_rowid: true,
});
available_indexes.insert("test_table".to_string(), vec![index]);
available_indexes.insert("test_table".to_string(), VecDeque::from([index]));
let table_constraints =
constraints_from_where_clause(&where_clause, &table_references, &available_indexes)
@@ -733,6 +734,7 @@ mod tests {
let index1 = Arc::new(Index {
name: "index1".to_string(),
table_name: "table1".to_string(),
where_clause: None,
columns: vec![IndexColumn {
name: "id".to_string(),
order: SortOrder::Asc,
@@ -745,7 +747,7 @@ mod tests {
root_page: 1,
has_rowid: true,
});
available_indexes.insert("table1".to_string(), vec![index1]);
available_indexes.insert("table1".to_string(), VecDeque::from([index1]));
// SELECT * FROM table1 JOIN table2 WHERE table1.id = table2.id
// expecting table2 to be chosen first due to the index on table1.id
@@ -849,6 +851,7 @@ mod tests {
let index_name = format!("sqlite_autoindex_{table_name}_1");
let index = Arc::new(Index {
name: index_name,
where_clause: None,
table_name: table_name.to_string(),
columns: vec![IndexColumn {
name: "id".to_string(),
@@ -862,11 +865,12 @@ mod tests {
root_page: 1,
has_rowid: true,
});
available_indexes.insert(table_name.to_string(), vec![index]);
available_indexes.insert(table_name.to_string(), VecDeque::from([index]));
});
let customer_id_idx = Arc::new(Index {
name: "orders_customer_id_idx".to_string(),
table_name: "orders".to_string(),
where_clause: None,
columns: vec![IndexColumn {
name: "customer_id".to_string(),
order: SortOrder::Asc,
@@ -882,6 +886,7 @@ mod tests {
let order_id_idx = Arc::new(Index {
name: "order_items_order_id_idx".to_string(),
table_name: "order_items".to_string(),
where_clause: None,
columns: vec![IndexColumn {
name: "order_id".to_string(),
order: SortOrder::Asc,
@@ -897,10 +902,10 @@ mod tests {
available_indexes
.entry("orders".to_string())
.and_modify(|v| v.push(customer_id_idx));
.and_modify(|v| v.push_front(customer_id_idx));
available_indexes
.entry("order_items".to_string())
.and_modify(|v| v.push(order_id_idx));
.and_modify(|v| v.push_front(order_id_idx));
// SELECT * FROM orders JOIN customers JOIN order_items
// WHERE orders.customer_id = customers.id AND orders.id = order_items.order_id AND customers.id = 42
@@ -1295,6 +1300,7 @@ mod tests {
let index = Arc::new(Index {
name: "idx_xy".to_string(),
table_name: "t1".to_string(),
where_clause: None,
columns: vec![
IndexColumn {
name: "x".to_string(),
@@ -1318,7 +1324,7 @@ mod tests {
});
let mut available_indexes = HashMap::new();
available_indexes.insert("t1".to_string(), vec![index]);
available_indexes.insert("t1".to_string(), VecDeque::from([index]));
let table = Table::BTree(table);
joined_tables.push(JoinedTable {
@@ -1381,6 +1387,7 @@ mod tests {
let index = Arc::new(Index {
name: "idx1".to_string(),
table_name: "t1".to_string(),
where_clause: None,
columns: vec![
IndexColumn {
name: "c1".to_string(),
@@ -1409,7 +1416,7 @@ mod tests {
ephemeral: false,
has_rowid: true,
});
available_indexes.insert("t1".to_string(), vec![index]);
available_indexes.insert("t1".to_string(), VecDeque::from([index]));
let table = Table::BTree(table);
joined_tables.push(JoinedTable {
@@ -1492,6 +1499,7 @@ mod tests {
let index = Arc::new(Index {
name: "idx1".to_string(),
table_name: "t1".to_string(),
where_clause: None,
columns: vec![
IndexColumn {
name: "c1".to_string(),
@@ -1520,7 +1528,7 @@ mod tests {
has_rowid: true,
unique: false,
});
available_indexes.insert("t1".to_string(), vec![index]);
available_indexes.insert("t1".to_string(), VecDeque::from([index]));
let table = Table::BTree(table);
joined_tables.push(JoinedTable {

View File

@@ -1,4 +1,9 @@
use std::{cell::RefCell, cmp::Ordering, collections::HashMap, sync::Arc};
use std::{
cell::RefCell,
cmp::Ordering,
collections::{HashMap, VecDeque},
sync::Arc,
};
use constraints::{
constraints_from_where_clause, usable_constraints_for_join_order, Constraint, ConstraintRef,
@@ -178,7 +183,7 @@ fn optimize_subqueries(plan: &mut SelectPlan, schema: &Schema) -> Result<()> {
fn optimize_table_access(
schema: &Schema,
table_references: &mut TableReferences,
available_indexes: &HashMap<String, Vec<Arc<Index>>>,
available_indexes: &HashMap<String, VecDeque<Arc<Index>>>,
where_clause: &mut [WhereTerm],
order_by: &mut Vec<(Box<ast::Expr>, SortOrder)>,
group_by: &mut Option<GroupBy>,
@@ -899,6 +904,7 @@ fn ephemeral_index_build(
ephemeral: true,
table_name: table_reference.table.get_name().to_string(),
root_page: 0,
where_clause: None,
has_rowid: table_reference
.table
.btree()

View File

@@ -577,6 +577,12 @@ impl TableReferences {
outer_query_refs,
}
}
/// Create an empty `TableReferences`: no joined tables and no
/// outer-query references.
pub fn new_empty() -> Self {
    Self {
        joined_tables: vec![],
        outer_query_refs: vec![],
    }
}
pub fn is_empty(&self) -> bool {
self.joined_tables.is_empty() && self.outer_query_refs.is_empty()

View File

@@ -82,7 +82,7 @@ pub fn translate_select(
};
program.extend(&opts);
emit_program(&mut program, select_plan, schema, syms, |_| {})?;
emit_program(connection, &mut program, select_plan, schema, syms, |_| {})?;
Ok(TranslateSelectResult {
program,
num_result_cols,

View File

@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::schema::{BTreeTable, Column, Type};
use crate::translate::expr::{bind_and_rewrite_expr, ParamState};
use crate::translate::expr::{bind_and_rewrite_expr, walk_expr, ParamState, WalkControl};
use crate::translate::optimizer::optimize_select_plan;
use crate::translate::plan::{Operation, QueryDestination, Scan, Search, SelectPlan};
use crate::translate::planner::parse_limit;
@@ -62,14 +62,13 @@ pub fn translate_update(
) -> crate::Result<ProgramBuilder> {
let mut plan = prepare_update_plan(&mut program, schema, body, connection, false)?;
optimize_plan(&mut plan, schema)?;
// TODO: freestyling these numbers
let opts = ProgramBuilderOpts {
num_cursors: 1,
approx_num_insns: 20,
approx_num_labels: 4,
};
program.extend(&opts);
emit_program(&mut program, plan, schema, syms, |_| {})?;
emit_program(connection, &mut program, plan, schema, syms, |_| {})?;
Ok(program)
}
@@ -97,7 +96,7 @@ pub fn translate_update_for_schema_change(
approx_num_labels: 4,
};
program.extend(&opts);
emit_program(&mut program, plan, schema, syms, after)?;
emit_program(connection, &mut program, plan, schema, syms, after)?;
Ok(program)
}
@@ -372,24 +371,49 @@ pub fn prepare_update_plan(
// Check what indexes will need to be updated by checking set_clauses and see
// if a column is contained in an index.
let indexes = schema.get_indices(table_name);
let updated_cols: HashSet<usize> = set_clauses.iter().map(|(i, _)| *i).collect();
let rowid_alias_used = set_clauses
.iter()
.any(|(idx, _)| columns[*idx].is_rowid_alias);
let indexes_to_update = if rowid_alias_used {
// If the rowid alias is used in the SET clause, we need to update all indexes
indexes.to_vec()
indexes.cloned().collect()
} else {
// otherwise we need to update the indexes whose columns are set in the SET clause
// otherwise we need to update the indexes whose columns are set in the SET clause,
// or if the columns used in the partial index WHERE clause are being updated
indexes
.iter()
.filter(|index| {
index.columns.iter().any(|index_column| {
set_clauses
.iter()
.any(|(set_index_column, _)| index_column.pos_in_table == *set_index_column)
})
.filter_map(|idx| {
let mut needs = idx
.columns
.iter()
.any(|c| updated_cols.contains(&c.pos_in_table));
if !needs {
if let Some(w) = &idx.where_clause {
let mut where_copy = w.as_ref().clone();
let mut param = ParamState::disallow();
let mut tr =
TableReferences::new(table_references.joined_tables().to_vec(), vec![]);
bind_and_rewrite_expr(
&mut where_copy,
Some(&mut tr),
None,
connection,
&mut param,
)
.ok()?;
let cols_used = collect_cols_used_in_expr(&where_copy);
// if any of the columns used in the partial index WHERE clause is being
// updated, we need to update this index
needs = cols_used.iter().any(|c| updated_cols.contains(c));
}
}
if needs {
Some(idx.clone())
} else {
None
}
})
.cloned()
.collect()
};
@@ -422,3 +446,17 @@ fn build_scan_op(table: &Table, iter_dir: IterationDirection) -> Operation {
_ => unreachable!(),
}
}
/// Returns the set of column indices referenced by `expr`.
/// *Must* be called on an `Expr` already processed by
/// `bind_and_rewrite_expr`, so that column references appear as
/// `Expr::Column` nodes.
fn collect_cols_used_in_expr(expr: &Expr) -> HashSet<usize> {
    let mut cols = HashSet::new();
    // The traversal callback never fails, so the walk result is ignored.
    let _ = walk_expr(expr, &mut |e| {
        if let Expr::Column { column, .. } = e {
            cols.insert(*column);
        }
        Ok(WalkControl::Continue)
    });
    cols
}

View File

@@ -1,8 +1,13 @@
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::{collections::HashMap, sync::Arc};
use turso_parser::ast::{self, Upsert};
use crate::translate::expr::WalkControl;
use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::translate::expr::{walk_expr, WalkControl};
use crate::translate::insert::format_unique_violation_desc;
use crate::vdbe::insn::CmpInsFlags;
use crate::{
bail_parse_error,
error::SQLITE_CONSTRAINT_NOTNULL,
@@ -26,6 +31,38 @@ use crate::{
},
};
// The following comment is copied directly from SQLite source and should be used as a guiding light
// whenever we encounter compatibility bugs related to conflict clause handling:
/* UNIQUE and PRIMARY KEY constraints should be handled in the following
** order:
**
** (1) OE_Update
** (2) OE_Abort, OE_Fail, OE_Rollback, OE_Ignore
** (3) OE_Replace
**
** OE_Fail and OE_Ignore must happen before any changes are made.
** OE_Update guarantees that only a single row will change, so it
** must happen before OE_Replace. Technically, OE_Abort and OE_Rollback
** could happen in any order, but they are grouped up front for
** convenience.
**
** 2018-08-14: Ticket https://www.sqlite.org/src/info/908f001483982c43
** The order of constraints used to have OE_Update as (2) and OE_Abort
** and so forth as (1). But apparently PostgreSQL checks the OE_Update
** constraint before any others, so it had to be moved.
**
** Constraint checking code is generated in this order:
** (A) The rowid constraint
** (B) Unique index constraints that do not have OE_Replace as their
** default conflict resolution strategy
** (C) Unique index that do use OE_Replace by default.
**
** The ordering of (2) and (3) is accomplished by making sure the linked
** list of indexes attached to a table puts all OE_Replace indexes last
** in the list. See sqlite3CreateIndex() for where that happens.
*/
/// A ConflictTarget is extracted from each ON CONFLICT target,
// e.g. INSERT INTO x(a) ON CONFLICT *(a COLLATE nocase)*
#[derive(Debug, Clone)]
@@ -107,82 +144,175 @@ pub fn upsert_matches_pk(upsert: &Upsert, table: &Table) -> bool {
})
}
#[derive(Hash, Debug, Eq, PartialEq, Clone)]
/// A hashable descriptor of a single index key term used when
/// matching an `ON CONFLICT` target against a UNIQUE index.
/// captures only the attributes (name and effective collation) that
/// determine whether two key terms are equivalent for conflict detection.
pub struct KeySig {
/// column name, normalized to lowercase
name: String,
/// defaults to "binary" if not specified on the target or col
coll: String,
/// Returns the set of changed (non-rowid-alias) column indices from the
/// SET pairs, and whether the rowid (via a rowid-alias column) was changed.
fn collect_changed_cols(
    table: &Table,
    set_pairs: &[(usize, Box<ast::Expr>)],
) -> (HashSet<usize>, bool) {
    let mut changed_cols = HashSet::with_capacity(table.columns().len());
    let mut rowid_changed = false;
    for &(col_idx, _) in set_pairs {
        // Out-of-range indices are silently skipped, matching the lookup.
        let Some(col) = table.columns().get(col_idx) else {
            continue;
        };
        if col.is_rowid_alias {
            rowid_changed = true;
        } else {
            changed_cols.insert(col_idx);
        }
    }
    (changed_cols, rowid_changed)
}
/// Decide whether `idx` must be maintained for this UPSERT's UPDATE arm:
/// true when the rowid changed (every index stores the rowid), or when any
/// column used by the index key or by its partial-index WHERE predicate
/// was assigned in the SET clause.
#[inline]
fn upsert_index_is_affected(
    table: &Table,
    idx: &Index,
    changed_cols: &HashSet<usize>,
    rowid_changed: bool,
) -> bool {
    if rowid_changed {
        return true;
    }
    // Key columns and partial-predicate columns are checked the same way.
    index_keys(idx)
        .iter()
        .chain(partial_index_cols(idx, table).iter())
        .any(|c| changed_cols.contains(c))
}
/// Table positions of the columns that form the index key.
#[inline]
fn index_keys(idx: &Index) -> Vec<usize> {
    let mut positions = Vec::with_capacity(idx.columns.len());
    for ic in &idx.columns {
        positions.push(ic.pos_in_table);
    }
    positions
}
/// Columns of `table` referenced by the index's partial WHERE predicate
/// (empty when the index is not partial).
fn partial_index_cols(idx: &Index, table: &Table) -> HashSet<usize> {
    use ast::{Expr, Name};
    let mut cols = HashSet::new();
    let Some(expr) = &idx.where_clause else {
        return cols;
    };
    // Hoisted out of the walk: the table name never changes per node.
    let table_name = normalize_ident(table.get_name());
    let _ = walk_expr(expr, &mut |e: &ast::Expr| -> crate::Result<WalkControl> {
        match e {
            // Unqualified column reference: resolve against this table.
            Expr::Id(Name::Ident(n) | Name::Quoted(n)) => {
                if let Some((pos, _)) = table.get_column_by_name(&normalize_ident(n.as_str())) {
                    cols.insert(pos);
                }
            }
            // Qualified reference: only count columns whose namespace is
            // this table's name.
            Expr::Qualified(ns, Name::Ident(c) | Name::Quoted(c))
            | Expr::DoublyQualified(_, ns, Name::Ident(c) | Name::Quoted(c)) => {
                if normalize_ident(ns.as_str()).eq_ignore_ascii_case(&table_name) {
                    if let Some((pos, _)) =
                        table.get_column_by_name(&normalize_ident(c.as_str()))
                    {
                        cols.insert(pos);
                    }
                }
            }
            _ => {}
        }
        Ok(WalkControl::Continue)
    });
    cols
}
/// Match ON CONFLICT target to a UNIQUE index, *ignoring order* but requiring
/// exact coverage (same column multiset). If the target specifies a COLLATED
/// column, the collation must match the index column's effective collation.
/// If the target omits collation, any index collation is accepted.
/// Partial (WHERE) indexes never match.
pub fn upsert_matches_index(upsert: &Upsert, index: &Index, table: &Table) -> bool {
let Some(target) = upsert.index.as_ref() else {
// catch-all
return true;
};
// if not unique or column count differs, no match
if !index.unique || target.targets.len() != index.columns.len() {
// must be a non-partial UNIQUE index with identical arity
if !index.unique || index.where_clause.is_some() || target.targets.len() != index.columns.len()
{
return false;
}
let mut need: HashMap<KeySig, usize> = HashMap::new();
for ic in &index.columns {
let sig = KeySig {
name: normalize_ident(&ic.name).to_string(),
coll: effective_collation_for_index_col(ic, table),
};
*need.entry(sig).or_insert(0) += 1;
}
// Consume from the multiset using target entries, order-insensitive
// Build a multiset of index columns: (normalized name, effective collation)
// effective collation = index collation if set, else table column default, else "binary"
let mut idx_cols: Vec<(String, String)> = index
.columns
.iter()
.map(|ic| {
(
normalize_ident(&ic.name),
effective_collation_for_index_col(ic, table),
)
})
.collect();
// For each target key, locate a matching index column (name equal ignoring case,
// and collation equal iff the target specifies one). Consume each match once.
for te in &target.targets {
let tk = match extract_target_key(&te.expr) {
Some(x) => x,
None => return false, // not a simple column ref
let Some(tk) = extract_target_key(&te.expr) else {
return false;
};
let tname = tk.col_name;
let mut found = None;
// Candidate signatures for this target:
// If target specifies COLLATE, require exact match on (name, coll).
// Otherwise, accept any collation currently present for that name.
let mut matched = false;
if let Some(ref coll) = tk.collate {
let sig = KeySig {
name: tk.col_name.to_string(),
coll: coll.clone(),
};
if let Some(cnt) = need.get_mut(&sig) {
*cnt -= 1;
if *cnt == 0 {
need.remove(&sig);
for (i, (iname, icoll)) in idx_cols.iter().enumerate() {
if tname.eq_ignore_ascii_case(iname)
&& match tk.collate.as_ref() {
Some(c) => c.eq_ignore_ascii_case(icoll),
None => true, // unspecified collation -> accept any
}
matched = true;
}
} else {
// Try any available collation for this column name
if let Some((sig, cnt)) = need
.iter_mut()
.find(|(k, _)| k.name.eq_ignore_ascii_case(&tk.col_name))
{
*cnt -= 1;
if *cnt == 0 {
let key = sig.clone();
need.remove(&key);
}
matched = true;
found = Some(i);
break;
}
}
if !matched {
if let Some(i) = found {
// consume this index column once (multiset match)
idx_cols.swap_remove(i);
} else {
return false;
}
}
// All targets matched exactly.
need.is_empty()
// All target columns matched exactly once
idx_cols.is_empty()
}
/// The uniqueness constraint that an UPSERT's `ON CONFLICT` clause
/// resolved to (see `resolve_upsert_target`).
#[derive(Clone)]
pub enum ResolvedUpsertTarget {
    /// `ON CONFLICT DO ...` — no explicit target; handles any conflict.
    CatchAll,
    /// `ON CONFLICT(<pk columns>) DO ...` — targets the PRIMARY KEY.
    PrimaryKey,
    /// Matched this specific non-partial UNIQUE index.
    Index(Arc<Index>),
}
/// Resolve which uniqueness constraint an `ON CONFLICT` clause targets.
///
/// An omitted target is a catch-all. An explicit target must match either
/// the table's PRIMARY KEY or a non-partial UNIQUE index; otherwise the
/// statement is rejected with SQLite's error message.
///
/// # Errors
/// Returns a parse error when an explicit target matches no constraint.
pub fn resolve_upsert_target(
    schema: &Schema,
    table: &Table,
    upsert: &Upsert,
) -> crate::Result<ResolvedUpsertTarget> {
    if upsert.index.is_none() {
        // No explicit conflict target: applies to any conflict.
        return Ok(ResolvedUpsertTarget::CatchAll);
    }
    // Explicit target: try the PRIMARY KEY first, then UNIQUE indexes.
    if upsert_matches_pk(upsert, table) {
        return Ok(ResolvedUpsertTarget::PrimaryKey);
    }
    for idx in schema.get_indices(table.get_name()) {
        let is_match = idx.unique && upsert_matches_index(upsert, idx, table);
        if is_match {
            return Ok(ResolvedUpsertTarget::Index(Arc::clone(idx)));
        }
    }
    // Mirror SQLite's error text exactly.
    crate::bail_parse_error!(
        "ON CONFLICT clause does not match any PRIMARY KEY or UNIQUE constraint"
    );
}
#[allow(clippy::too_many_arguments)]
@@ -223,7 +353,7 @@ pub fn emit_upsert(
cdc_cursor_id: Option<usize>,
row_done_label: BranchOffset,
) -> crate::Result<()> {
// Seek and snapshot current row
// Seek & snapshot CURRENT
program.emit_insn(Insn::SeekRowid {
cursor_id: tbl_cursor_id,
src_reg: conflict_rowid_reg,
@@ -247,7 +377,7 @@ pub fn emit_upsert(
}
}
// Keep BEFORE snapshot if needed
// BEFORE for index maintenance / CDC
let before_start = if cdc_cursor_id.is_some() || !idx_cursors.is_empty() {
let s = program.alloc_registers(num_cols);
program.emit_insn(Insn::Copy {
@@ -260,8 +390,7 @@ pub fn emit_upsert(
None
};
// NEW snapshot starts as a copy of CURRENT, then SET expressions overwrite
// the assigned columns. matching SQLite semantics of UPDATE reading the old row.
// NEW = CURRENT, then apply SET
let new_start = program.alloc_registers(num_cols);
program.emit_insn(Insn::Copy {
src_reg: current_start,
@@ -269,15 +398,16 @@ pub fn emit_upsert(
extra_amount: num_cols - 1,
});
// WHERE predicate on the target row. If false or NULL, skip the UPDATE.
// WHERE on target row
if let Some(pred) = where_clause.as_mut() {
rewrite_upsert_expr_in_place(
rewrite_expr_to_registers(
pred,
table,
table.get_name(),
current_start,
conflict_rowid_reg,
insertion,
Some(table.get_name()),
Some(insertion),
true,
)?;
let pr = program.alloc_register();
translate_expr(program, None, pred, pr, resolver)?;
@@ -288,15 +418,17 @@ pub fn emit_upsert(
});
}
// Evaluate each SET expression into the NEW row img
// Apply SET; capture rowid change if any
let mut new_rowid_reg: Option<usize> = None;
for (col_idx, expr) in set_pairs.iter_mut() {
rewrite_upsert_expr_in_place(
rewrite_expr_to_registers(
expr,
table,
table.get_name(),
current_start,
conflict_rowid_reg,
insertion,
Some(table.get_name()),
Some(insertion),
true,
)?;
translate_expr_no_constant_opt(
program,
@@ -311,12 +443,22 @@ pub fn emit_upsert(
program.emit_insn(Insn::HaltIfNull {
target_reg: new_start + *col_idx,
err_code: SQLITE_CONSTRAINT_NOTNULL,
description: format!("{}.{}", table.get_name(), col.name.as_ref().unwrap()),
description: String::from(table.get_name()) + col.name.as_ref().unwrap(),
});
}
if col.is_rowid_alias {
// Must be integer; remember the NEW rowid value
let r = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: new_start + *col_idx,
dst_reg: r,
extra_amount: 0,
});
program.emit_insn(Insn::MustBeInt { reg: r });
new_rowid_reg = Some(r);
}
}
// If STRICT, perform type checks on the NEW image
if let Some(bt) = table.btree() {
if bt.is_strict {
program.emit_insn(Insn::TypeCheck {
@@ -328,14 +470,45 @@ pub fn emit_upsert(
}
}
// Rebuild indexes: remove keys corresponding to BEFORE and insert keys for NEW.
// Index rebuild (DELETE old, INSERT new), honoring partial-index WHEREs
if let Some(before) = before_start {
let (changed_cols, rowid_changed) = collect_changed_cols(table, set_pairs);
for (idx_name, _root, idx_cid) in idx_cursors {
let idx_meta = schema
.get_index(table.get_name(), idx_name)
.expect("index exists");
if !upsert_index_is_affected(table, idx_meta, &changed_cols, rowid_changed) {
continue; // skip untouched index completely
}
let k = idx_meta.columns.len();
let before_pred_reg = eval_partial_pred_for_row_image(
program,
table,
idx_meta,
before,
conflict_rowid_reg,
resolver,
);
let new_rowid = new_rowid_reg.unwrap_or(conflict_rowid_reg);
let new_pred_reg = eval_partial_pred_for_row_image(
program, table, idx_meta, new_start, new_rowid, resolver,
);
// Skip delete if BEFORE predicate false/NULL
let maybe_skip_del = before_pred_reg.map(|r| {
let lbl = program.allocate_label();
program.emit_insn(Insn::IfNot {
reg: r,
target_pc: lbl,
jump_if_null: true,
});
lbl
});
// DELETE old key
let del = program.alloc_registers(k + 1);
for (i, ic) in idx_meta.columns.iter().enumerate() {
let (ci, _) = table.get_column_by_name(&ic.name).unwrap();
@@ -356,7 +529,22 @@ pub fn emit_upsert(
cursor_id: *idx_cid,
raise_error_if_no_matching_entry: false,
});
if let Some(label) = maybe_skip_del {
program.resolve_label(label, program.offset());
}
// Skip insert if NEW predicate false/NULL
let maybe_skip_ins = new_pred_reg.map(|r| {
let lbl = program.allocate_label();
program.emit_insn(Insn::IfNot {
reg: r,
target_pc: lbl,
jump_if_null: true,
});
lbl
});
// INSERT new key (use NEW rowid if present)
let ins = program.alloc_registers(k + 1);
for (i, ic) in idx_meta.columns.iter().enumerate() {
let (ci, _) = table.get_column_by_name(&ic.name).unwrap();
@@ -367,7 +555,7 @@ pub fn emit_upsert(
});
}
program.emit_insn(Insn::Copy {
src_reg: conflict_rowid_reg,
src_reg: new_rowid,
dst_reg: ins + k,
extra_amount: 0,
});
@@ -380,6 +568,52 @@ pub fn emit_upsert(
index_name: Some((*idx_name).clone()),
affinity_str: None,
});
if idx_meta.unique {
// Affinity on the key columns for the NoConflict probe
let ok = program.allocate_label();
let aff: String = idx_meta
.columns
.iter()
.map(|c| {
table
.get_column_by_name(&c.name)
.map(|(_, col)| col.affinity().aff_mask())
.unwrap_or('B')
})
.collect();
program.emit_insn(Insn::Affinity {
start_reg: ins,
count: NonZeroUsize::new(k).unwrap(),
affinities: aff,
});
program.emit_insn(Insn::NoConflict {
cursor_id: *idx_cid,
target_pc: ok,
record_reg: ins,
num_regs: k,
});
let hit = program.alloc_register();
program.emit_insn(Insn::IdxRowId {
cursor_id: *idx_cid,
dest: hit,
});
program.emit_insn(Insn::Eq {
lhs: new_rowid,
rhs: hit,
target_pc: ok,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
let description = format_unique_violation_desc(table.get_name(), idx_meta);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
});
program.preassign_label_to_next_insn(ok);
}
program.emit_insn(Insn::IdxInsert {
cursor_id: *idx_cid,
record_reg: rec,
@@ -387,18 +621,20 @@ pub fn emit_upsert(
unpacked_count: Some((k + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
if let Some(lbl) = maybe_skip_ins {
program.resolve_label(lbl, program.offset());
}
}
}
// Write table row (same rowid, new payload)
// Build NEW table payload
let rec = program.alloc_register();
let affinity_str = table
.columns()
.iter()
.map(|col| col.affinity().aff_mask())
.map(|c| c.affinity().aff_mask())
.collect::<String>();
program.emit_insn(Insn::MakeRecord {
start_reg: new_start,
count: num_cols,
@@ -406,59 +642,155 @@ pub fn emit_upsert(
index_name: None,
affinity_str: Some(affinity_str),
});
program.emit_insn(Insn::Insert {
cursor: tbl_cursor_id,
key_reg: conflict_rowid_reg,
record_reg: rec,
flag: InsertFlags::new(),
table_name: table.get_name().to_string(),
});
if let Some(cdc_id) = cdc_cursor_id {
let after_rec = if program.capture_data_changes_mode().has_after() {
Some(emit_cdc_patch_record(
program,
table,
new_start,
rec,
conflict_rowid_reg,
))
} else {
None
};
// Build BEFORE if needed
let before_rec = if program.capture_data_changes_mode().has_before() {
Some(emit_cdc_full_record(
program,
table.columns(),
tbl_cursor_id,
conflict_rowid_reg,
))
} else {
None
};
emit_cdc_insns(
program,
resolver,
OperationMode::UPDATE,
cdc_id,
conflict_rowid_reg,
before_rec,
after_rec,
None,
table.get_name(),
)?;
// If rowid changed, first ensure no other row owns it, then delete+insert
if let Some(rnew) = new_rowid_reg {
let ok = program.allocate_label();
// If equal to old rowid, skip uniqueness probe
program.emit_insn(Insn::Eq {
lhs: rnew,
rhs: conflict_rowid_reg,
target_pc: ok,
flags: CmpInsFlags::default(),
collation: program.curr_collation(),
});
// If another row already has rnew -> constraint
program.emit_insn(Insn::NotExists {
cursor: tbl_cursor_id,
rowid_reg: rnew,
target_pc: ok,
});
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description: format!(
"{}.{}",
table.get_name(),
table
.columns()
.iter()
.find(|c| c.is_rowid_alias)
.and_then(|c| c.name.as_ref())
.unwrap_or(&"rowid".to_string())
),
});
program.preassign_label_to_next_insn(ok);
// Now replace the row
program.emit_insn(Insn::Delete {
cursor_id: tbl_cursor_id,
table_name: table.get_name().to_string(),
});
program.emit_insn(Insn::Insert {
cursor: tbl_cursor_id,
key_reg: rnew,
record_reg: rec,
flag: InsertFlags::new().require_seek().update_rowid_change(),
table_name: table.get_name().to_string(),
});
} else {
program.emit_insn(Insn::Insert {
cursor: tbl_cursor_id,
key_reg: conflict_rowid_reg,
record_reg: rec,
flag: InsertFlags::new(),
table_name: table.get_name().to_string(),
});
}
// emit CDC instructions
if let Some(cdc_id) = cdc_cursor_id {
let new_rowid = new_rowid_reg.unwrap_or(conflict_rowid_reg);
if new_rowid_reg.is_some() {
// DELETE (before)
let before_rec = if program.capture_data_changes_mode().has_before() {
Some(emit_cdc_full_record(
program,
table.columns(),
tbl_cursor_id,
conflict_rowid_reg,
))
} else {
None
};
emit_cdc_insns(
program,
resolver,
OperationMode::DELETE,
cdc_id,
conflict_rowid_reg,
before_rec,
None,
None,
table.get_name(),
)?;
// INSERT (after)
let after_rec = if program.capture_data_changes_mode().has_after() {
Some(emit_cdc_patch_record(
program, table, new_start, rec, new_rowid,
))
} else {
None
};
emit_cdc_insns(
program,
resolver,
OperationMode::INSERT,
cdc_id,
new_rowid,
None,
after_rec,
None,
table.get_name(),
)?;
} else {
let after_rec = if program.capture_data_changes_mode().has_after() {
Some(emit_cdc_patch_record(
program,
table,
new_start,
rec,
conflict_rowid_reg,
))
} else {
None
};
let before_rec = if program.capture_data_changes_mode().has_before() {
Some(emit_cdc_full_record(
program,
table.columns(),
tbl_cursor_id,
conflict_rowid_reg,
))
} else {
None
};
emit_cdc_insns(
program,
resolver,
OperationMode::UPDATE,
cdc_id,
conflict_rowid_reg,
before_rec,
after_rec,
None,
table.get_name(),
)?;
}
}
// RETURNING from NEW image + final rowid
if !returning.is_empty() {
let regs = ReturningValueRegisters {
rowid_register: conflict_rowid_reg,
rowid_register: new_rowid_reg.unwrap_or(conflict_rowid_reg),
columns_start_register: new_start,
num_columns: num_cols,
};
emit_returning_results(program, returning, &regs)?;
}
program.emit_insn(Insn::Goto {
target_pc: row_done_label,
});
@@ -469,7 +801,6 @@ pub fn emit_upsert(
///
/// Supports multi-target row-value SETs: `SET (a, b) = (expr1, expr2)`.
/// Enforces same number of column names and RHS values.
/// Rewrites `EXCLUDED.*` references to direct `Register` reads from the insertion registers
/// If the same column is assigned multiple times, the last assignment wins.
pub fn collect_set_clauses_for_upsert(
table: &Table,
@@ -510,59 +841,109 @@ pub fn collect_set_clauses_for_upsert(
Ok(out)
}
/// Rewrite an UPSERT expression so that:
/// EXCLUDED.x -> Register(insertion.x)
/// t.x / x -> Register(CURRENT.x) when t == target table or unqualified
/// rowid -> Register(conflict_rowid_reg)
/// Evaluate a partial index's WHERE predicate against a row image laid
/// out in registers, emitting the evaluation code into `prg`.
///
/// Returns `Some(reg)` where `reg` holds the predicate result when `idx`
/// is partial; returns `None` when the index has no predicate.
/// NOTE(review): rewrite/translation failures are swallowed via `.ok()?`,
/// which also yields `None` — the caller then maintains the index as if it
/// were non-partial; confirm that fallback is intended.
fn eval_partial_pred_for_row_image(
    prg: &mut ProgramBuilder,
    table: &Table,
    idx: &Index,
    row_start: usize, // base register of the CURRENT or NEW row image
    rowid_reg: usize, // register holding the rowid for that image
    resolver: &Resolver,
) -> Option<usize> {
    let Some(where_expr) = &idx.where_clause else {
        // Not a partial index: nothing to evaluate.
        return None;
    };
    // Work on a copy so the schema's stored predicate stays untouched.
    let mut e = where_expr.as_ref().clone();
    rewrite_expr_to_registers(
        &mut e, table, row_start, rowid_reg, None, // table_name
        None, // insertion
        false, // don't allow EXCLUDED
    )
    .ok()?;
    let r = prg.alloc_register();
    translate_expr_no_constant_opt(
        prg,
        None,
        &e,
        r,
        resolver,
        NoConstantOptReason::RegisterReuse,
    )
    .ok()?;
    Some(r)
}
/// Generic rewriter that maps column references to registers for a given row image.
///
/// Only rewrites names in the current expression scope, does not enter subqueries.
fn rewrite_upsert_expr_in_place(
/// - Id/Qualified refs to the *target table* (when `table_name` is provided) resolve
/// to the CURRENT/NEW row image starting at `base_start`, with `rowid` (or the
/// rowid-alias) mapped to `rowid_reg`.
/// - If `allow_excluded` and `insertion` are provided, `EXCLUDED.x` resolves to the
/// insertion registers (and `EXCLUDED.rowid` resolves to `insertion.key_register()`).
/// - If `table_name` is `None`, qualified refs never match
/// - Leaves names from other tables/namespaces untouched.
fn rewrite_expr_to_registers(
e: &mut ast::Expr,
table: &Table,
table_name: &str,
current_start: usize,
conflict_rowid_reg: usize,
insertion: &Insertion,
base_start: usize,
rowid_reg: usize,
table_name: Option<&str>,
insertion: Option<&Insertion>,
allow_excluded: bool,
) -> crate::Result<WalkControl> {
use ast::{Expr, Name};
let table_name_norm = table_name.map(normalize_ident);
let col_reg = |name: &str| -> Option<usize> {
// Map a column name to a register within the row image at `base_start`.
let col_reg_from_row_image = |name: &str| -> Option<usize> {
if name.eq_ignore_ascii_case("rowid") {
return Some(conflict_rowid_reg);
return Some(rowid_reg);
}
let (idx, c) = table.get_column_by_name(name)?;
if c.is_rowid_alias {
Some(rowid_reg)
} else {
Some(base_start + idx)
}
let (idx, _) = table.get_column_by_name(name)?;
Some(current_start + idx)
};
walk_expr_mut(
e,
&mut |expr: &mut ast::Expr| -> crate::Result<WalkControl> {
match expr {
// EXCLUDED.x or t.x (t may be quoted)
Expr::Qualified(ns, Name::Ident(c) | Name::Quoted(c))
| Expr::DoublyQualified(_, ns, Name::Ident(c) | Name::Quoted(c)) => {
let ns = normalize_ident(ns.as_str());
let c = normalize_ident(c.as_str());
if ns.eq_ignore_ascii_case("excluded") {
let Some(reg) = insertion.get_col_mapping_by_name(&c) else {
bail_parse_error!("no such column in EXCLUDED: {}", c);
};
*expr = Expr::Register(reg.register);
} else if ns.eq_ignore_ascii_case(table_name) {
if let Some(reg) = col_reg(c.as_str()) {
*expr = Expr::Register(reg);
// Handle EXCLUDED.* if enabled
if allow_excluded && ns.eq_ignore_ascii_case("excluded") {
if let Some(ins) = insertion {
if c.eq_ignore_ascii_case("rowid") {
*expr = Expr::Register(ins.key_register());
} else if let Some(cm) = ins.get_col_mapping_by_name(&c) {
*expr = Expr::Register(cm.register);
} else {
bail_parse_error!("no such column in EXCLUDED: {}", c);
}
}
// If insertion is None, leave EXCLUDED.* untouched.
return Ok(WalkControl::Continue);
}
// Match the target table namespace if provided
if let Some(ref tn) = table_name_norm {
if ns.eq_ignore_ascii_case(tn) {
if let Some(r) = col_reg_from_row_image(&c) {
*expr = Expr::Register(r);
}
}
}
}
// Unqualified column id -> CURRENT
// Unqualified id -> row image (CURRENT/NEW depending on caller)
Expr::Id(Name::Ident(name)) | Expr::Id(Name::Quoted(name)) => {
if let Some(reg) = col_reg(&normalize_ident(name.as_str())) {
*expr = Expr::Register(reg);
if let Some(r) = col_reg_from_row_image(&normalize_ident(name.as_str())) {
*expr = Expr::Register(r);
}
}
Expr::RowId { .. } => {
*expr = Expr::Register(conflict_rowid_reg);
}
_ => {}
}
Ok(WalkControl::Continue)

View File

@@ -333,8 +333,6 @@ pub fn check_literal_equivalency(lhs: &Literal, rhs: &Literal) -> bool {
/// This function is used to determine whether two expressions are logically
/// equivalent in the context of queries, even if their representations
/// differ. e.g.: `SUM(x)` and `sum(x)`, `x + y` and `y + x`
///
/// *Note*: doesn't attempt to evaluate/compute "constexpr" results
pub fn exprs_are_equivalent(expr1: &Expr, expr2: &Expr) -> bool {
match (expr1, expr2) {
(

View File

@@ -1,4 +1,5 @@
#![allow(unused_variables)]
use crate::error::SQLITE_CONSTRAINT_UNIQUE;
use crate::function::AlterTableFunc;
use crate::numeric::{NullableInteger, Numeric};
use crate::schema::Table;
@@ -2094,6 +2095,11 @@ pub fn halt(
"NOT NULL constraint failed: {description} (19)"
)));
}
SQLITE_CONSTRAINT_UNIQUE => {
return Err(LimboError::Constraint(format!(
"UNIQUE constraint failed: {description} (19)"
)));
}
_ => {
return Err(LimboError::Constraint(format!(
"undocumented halt error code {description}"

View File

@@ -43,3 +43,4 @@ source $testdir/views.test
source $testdir/vtab.test
source $testdir/upsert.test
source $testdir/window.test
source $testdir/partial_idx.test

572
testing/partial_idx.test Executable file
View File

@@ -0,0 +1,572 @@
#!/usr/bin/env tclsh
# Regression tests for partial indexes (CREATE UNIQUE INDEX ... WHERE <pred>).
# A partial unique index enforces uniqueness ONLY for rows that satisfy the
# index predicate; rows outside the predicate may freely duplicate the key.
set testdir [file dirname $argv0]
source $testdir/tester.tcl
source $testdir/sqlite3/tester.tcl
# Duplicate emails are allowed as long as at most one row is 'active'.
do_execsql_test_on_specific_db {:memory:} partial-index-unique-basic {
CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, status TEXT);
CREATE UNIQUE INDEX idx_active_email ON users(email) WHERE status = 'active';
INSERT INTO users VALUES (1, 'user@test.com', 'active');
INSERT INTO users VALUES (2, 'user@test.com', 'inactive');
INSERT INTO users VALUES (3, 'user@test.com', 'deleted');
SELECT id, email, status FROM users ORDER BY id;
} {1|user@test.com|active
2|user@test.com|inactive
3|user@test.com|deleted}
# A second 'active' row with the same email must raise the UNIQUE constraint.
do_execsql_test_in_memory_error_content partial-index-unique-violation {
CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, status TEXT);
CREATE UNIQUE INDEX idx_active_email ON users(email) WHERE status = 'active';
INSERT INTO users VALUES (1, 'user@test.com', 'active');
INSERT INTO users VALUES (2, 'user@test.com', 'inactive');
INSERT INTO users VALUES (3, 'user@test.com', 'deleted');
INSERT INTO users VALUES (4, 'user@test.com', 'active');
} {UNIQUE constraint failed: users.email (19)}
# Expression predicate (price > 100): only one high-price row exists per sku,
# so duplicate skus below the threshold are fine.
do_execsql_test_on_specific_db {:memory:} partial-index-expression-where {
CREATE TABLE products (id INTEGER PRIMARY KEY, sku TEXT, price INTEGER);
CREATE UNIQUE INDEX idx_expensive ON products(sku) WHERE price > 100;
INSERT INTO products VALUES (1, 'ABC123', 50);
INSERT INTO products VALUES (2, 'ABC123', 150);
INSERT INTO products VALUES (3, 'XYZ789', 200);
INSERT INTO products VALUES (4, 'ABC123', 75);
SELECT id, sku, price FROM products ORDER BY id;
} {1|ABC123|50
2|ABC123|150
3|XYZ789|200
4|ABC123|75}
# A second high-price row with a duplicate sku must fail.
do_execsql_test_in_memory_error_content partial-index-expensive-violation {
CREATE TABLE products (id INTEGER PRIMARY KEY, sku TEXT, price INTEGER);
CREATE UNIQUE INDEX idx_expensive ON products(sku) WHERE price > 100;
INSERT INTO products VALUES (1, 'ABC123', 50);
INSERT INTO products VALUES (2, 'ABC123', 150);
INSERT INTO products VALUES (3, 'XYZ789', 200);
INSERT INTO products VALUES (4, 'ABC123', 75);
INSERT INTO products VALUES (5, 'ABC123', 250);
-- should fail with unique sku where price > 100
} {UNIQUE constraint failed: products.sku (19)}
# UPDATE that moves an existing row INTO the predicate must also be rejected
# when it would collide with a key already in the partial index.
do_execsql_test_in_memory_error_content partial-index-expensive-violation-update {
CREATE TABLE products (id INTEGER PRIMARY KEY, sku TEXT, price INTEGER);
CREATE UNIQUE INDEX idx_expensive ON products(sku) WHERE price > 100;
INSERT INTO products VALUES (1, 'ABC123', 50);
INSERT INTO products VALUES (2, 'ABC123', 150);
INSERT INTO products VALUES (3, 'XYZ789', 200);
INSERT INTO products VALUES (4, 'ABC123', 75);
UPDATE products SET price = 300 WHERE id = 1;
-- should fail with unique sku where price > 100
} {UNIQUE constraint failed: products.sku (19)}
# IS NOT NULL predicate: NULL-category rows are outside the index, so
# duplicate codes among them are allowed.
do_execsql_test_on_specific_db {:memory:} partial-index-null-where {
CREATE TABLE items (id INTEGER PRIMARY KEY, code TEXT, category TEXT);
CREATE UNIQUE INDEX idx_categorized ON items(code) WHERE category IS NOT NULL;
INSERT INTO items VALUES (1, 'ITEM1', 'electronics');
INSERT INTO items VALUES (2, 'ITEM1', NULL);
INSERT INTO items VALUES (3, 'ITEM1', NULL);
INSERT INTO items VALUES (4, 'ITEM2', 'books');
SELECT id, code, category FROM items ORDER BY id;
} {1|ITEM1|electronics
2|ITEM1|
3|ITEM1|
4|ITEM2|books}
# Function call in the predicate (LOWER): two all-lowercase titles collide.
do_execsql_test_in_memory_error_content partial-index-function-where {
CREATE TABLE docs (id INTEGER PRIMARY KEY, title TEXT);
CREATE UNIQUE INDEX idx_lower_title ON docs(title) WHERE LOWER(title) = title;
INSERT INTO docs VALUES (1, 'lowercase');
INSERT INTO docs VALUES (2, 'UPPERCASE');
INSERT INTO docs VALUES (3, 'lowercase');
} {UNIQUE constraint failed: docs.title (19)}
# Two partial indexes on the same key column with disjoint predicates:
# each row may satisfy at most one of them without conflicting.
do_execsql_test_on_specific_db {:memory:} partial-index-multiple {
CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT, priority INTEGER, status TEXT);
CREATE UNIQUE INDEX idx_urgent ON tasks(name) WHERE priority = 1;
CREATE UNIQUE INDEX idx_completed ON tasks(name) WHERE status = 'done';
INSERT INTO tasks VALUES (1, 'task1', 1, 'open');
INSERT INTO tasks VALUES (2, 'task1', 2, 'open');
INSERT INTO tasks VALUES (3, 'task1', 3, 'done');
INSERT INTO tasks VALUES (4, 'task2', 1, 'done');
SELECT id, name, priority, status FROM tasks ORDER BY id;
} {1|task1|1|open
2|task1|2|open
3|task1|3|done
4|task2|1|done}
# NOTE(review): this test name duplicates partial-index-function-where above;
# consider renaming (the next one is already suffixed "-2").
# Violation of the first of the two partial indexes (priority = 1).
do_execsql_test_in_memory_error_content partial-index-function-where {
CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT, priority INTEGER, status TEXT);
CREATE UNIQUE INDEX idx_urgent ON tasks(name) WHERE priority = 1;
CREATE UNIQUE INDEX idx_completed ON tasks(name) WHERE status = 'done';
INSERT INTO tasks VALUES (1, 'task1', 1, 'open');
INSERT INTO tasks VALUES (2, 'task1', 2, 'open');
INSERT INTO tasks VALUES (3, 'task1', 3, 'done');
INSERT INTO tasks VALUES (4, 'task2', 1, 'done');
INSERT INTO tasks VALUES (5, 'task1', 1, 'pending');
-- should fail for unique name where priority = 1
} {UNIQUE constraint failed: tasks.name (19)}
# Violation of the second of the two partial indexes (status = 'done').
do_execsql_test_in_memory_error_content partial-index-function-where-2 {
CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT, priority INTEGER, status TEXT);
CREATE UNIQUE INDEX idx_urgent ON tasks(name) WHERE priority = 1;
CREATE UNIQUE INDEX idx_completed ON tasks(name) WHERE status = 'done';
INSERT INTO tasks VALUES (1, 'task1', 1, 'open');
INSERT INTO tasks VALUES (2, 'task1', 2, 'open');
INSERT INTO tasks VALUES (3, 'task1', 3, 'done');
INSERT INTO tasks VALUES (4, 'task2', 1, 'done');
INSERT INTO tasks VALUES (6, 'task1', 2, 'done');
-- should fail for unique name where status = 'done'
} {UNIQUE constraint failed: tasks.name (19)}
# Changing the rowid (INTEGER PRIMARY KEY alias) of an in-predicate row must
# not self-conflict: the index entry follows the row to its new rowid.
do_execsql_test_on_specific_db {:memory:} partial-index-update-rowid {
CREATE TABLE rowid_test (id INTEGER PRIMARY KEY, val TEXT, flag INTEGER);
CREATE UNIQUE INDEX idx_flagged ON rowid_test(val) WHERE flag = 1;
INSERT INTO rowid_test VALUES (1, 'test', 1);
INSERT INTO rowid_test VALUES (2, 'test', 0);
UPDATE rowid_test SET id = 10 WHERE id = 1;
SELECT id, val, flag FROM rowid_test ORDER BY id;
} {2|test|0
10|test|1}
# Compound (AND-ed) predicate: only rows satisfying BOTH conditions conflict.
do_execsql_test_in_memory_error_content partial-index-update-complex {
CREATE TABLE complex (id INTEGER PRIMARY KEY, a TEXT, b INTEGER, c TEXT);
CREATE UNIQUE INDEX idx_complex ON complex(a) WHERE b > 10 AND c = 'active';
INSERT INTO complex VALUES (1, 'dup', 5, 'active');
INSERT INTO complex VALUES (2, 'dup', 15, 'inactive');
INSERT INTO complex VALUES (3, 'dup', 15, 'active');
INSERT INTO complex VALUES (4, 'dup', 20, 'active');
} {UNIQUE constraint failed: complex.a (19)}
# DELETE must remove partial-index entries so the keys become reusable.
do_execsql_test_on_specific_db {:memory:} partial-index-delete {
CREATE TABLE products (id INTEGER PRIMARY KEY, sku TEXT, price INTEGER);
CREATE UNIQUE INDEX idx_expensive ON products(sku) WHERE price > 100;
INSERT INTO products VALUES (1, 'ABC123', 50);
INSERT INTO products VALUES (2, 'ABC123', 150);
INSERT INTO products VALUES (3, 'XYZ789', 200);
INSERT INTO products VALUES (4, 'ABC123', 75);
DELETE FROM products WHERE price > 100;
INSERT INTO products VALUES (5, 'ABC123', 500);
INSERT INTO products VALUES (6, 'XYZ789', 600);
SELECT id, sku, price FROM products WHERE price > 100 ORDER BY id;
} {5|ABC123|500
6|XYZ789|600}
# DELETE with a function-based predicate must also clear index entries.
do_execsql_test_on_specific_db {:memory:} partial-index-delete-function-where {
CREATE TABLE func_del (id INTEGER PRIMARY KEY, name TEXT);
CREATE UNIQUE INDEX idx_lower ON func_del(name) WHERE LOWER(name) = name;
INSERT INTO func_del VALUES (1, 'lowercase');
INSERT INTO func_del VALUES (2, 'UPPERCASE');
INSERT INTO func_del VALUES (3, 'MixedCase');
DELETE FROM func_del WHERE LOWER(name) = name;
-- Should be able to insert lowercase now
INSERT INTO func_del VALUES (4, 'lowercase');
INSERT INTO func_del VALUES (5, 'another');
SELECT id, name FROM func_del ORDER BY id;
} {2|UPPERCASE
3|MixedCase
4|lowercase
5|another}
# After DELETE FROM <table> (delete-all), the first re-insert of an
# in-predicate key succeeds; the second duplicate must still fail.
do_execsql_test_in_memory_error_content partial-index-delete-all {
CREATE TABLE del_all (id INTEGER PRIMARY KEY, val TEXT, flag INTEGER);
CREATE UNIQUE INDEX idx_all ON del_all(val) WHERE flag = 1;
INSERT INTO del_all VALUES (1, 'test', 1), (2, 'test', 0), (3, 'other', 1);
DELETE FROM del_all;
-- Should be able to insert anything now
INSERT INTO del_all VALUES (4, 'test', 1);
INSERT INTO del_all VALUES (5, 'test', 1);
} {UNIQUE constraint failed: del_all.val (19)}
# Deleting a subset of rows (cascade-like pattern) frees their index keys.
do_execsql_test_on_specific_db {:memory:} partial-index-delete-cascade-scenario {
CREATE TABLE parent_del (id INTEGER PRIMARY KEY, status TEXT);
CREATE TABLE child_del (id INTEGER PRIMARY KEY, parent_id INTEGER, name TEXT, active INTEGER);
CREATE UNIQUE INDEX idx_active_child ON child_del(name) WHERE active = 1;
INSERT INTO parent_del VALUES (1, 'active'), (2, 'inactive');
INSERT INTO child_del VALUES (1, 1, 'child1', 1);
INSERT INTO child_del VALUES (2, 1, 'child2', 1);
INSERT INTO child_del VALUES (3, 2, 'child1', 0);
-- Simulate cascade by deleting children of parent 1
DELETE FROM child_del WHERE parent_id = 1;
-- Should now allow these since active children are gone
INSERT INTO child_del VALUES (4, 2, 'child1', 1);
INSERT INTO child_del VALUES (5, 2, 'child2', 1);
SELECT COUNT(*) FROM child_del WHERE active = 1;
} {2}
# Deleting the only in-predicate row (IS NOT NULL) frees the key again.
do_execsql_test_on_specific_db {:memory:} partial-index-delete-null-where {
CREATE TABLE null_del (id INTEGER PRIMARY KEY, code TEXT, category TEXT);
CREATE UNIQUE INDEX idx_with_category ON null_del(code) WHERE category IS NOT NULL;
INSERT INTO null_del VALUES (1, 'CODE1', 'cat1');
INSERT INTO null_del VALUES (2, 'CODE1', NULL);
INSERT INTO null_del VALUES (3, 'CODE2', 'cat2');
INSERT INTO null_del VALUES (4, 'CODE1', NULL);
-- Delete the one with category
DELETE FROM null_del WHERE code = 'CODE1' AND category IS NOT NULL;
-- Should allow this now
INSERT INTO null_del VALUES (5, 'CODE1', 'cat3');
SELECT id, code, category FROM null_del WHERE code = 'CODE1' ORDER BY id;
} {2|CODE1|
4|CODE1|
5|CODE1|cat3}
# Compound predicate: deleting the single row satisfying both conditions
# frees the key for a new in-predicate insert.
do_execsql_test_on_specific_db {:memory:} partial-index-delete-complex-where {
CREATE TABLE complex_del (id INTEGER PRIMARY KEY, a INTEGER, b INTEGER, c TEXT);
CREATE UNIQUE INDEX idx_complex ON complex_del(c) WHERE a > 10 AND b < 20;
INSERT INTO complex_del VALUES (1, 15, 10, 'dup');
INSERT INTO complex_del VALUES (2, 5, 15, 'dup');
INSERT INTO complex_del VALUES (3, 15, 25, 'dup');
INSERT INTO complex_del VALUES (4, 20, 10, 'unique');
-- Delete the one entry that's actually in the partial index
DELETE FROM complex_del WHERE a > 10 AND b < 20;
-- Should now allow this since we deleted the conflicting entry
INSERT INTO complex_del VALUES (5, 12, 18, 'dup');
SELECT COUNT(*) FROM complex_del WHERE c = 'dup';
} {3}
# Entering predicate via UPDATE should conflict with an existing in-predicate key
do_execsql_test_in_memory_error_content partial-index-update-enter-conflict-1 {
CREATE TABLE products (id INTEGER PRIMARY KEY, sku TEXT, price INTEGER);
CREATE UNIQUE INDEX idx_expensive ON products(sku) WHERE price > 100;
INSERT INTO products VALUES (1, 'ABC123', 50);
INSERT INTO products VALUES (2, 'ABC123', 150);
UPDATE products SET price = 200 WHERE id = 1;
} {UNIQUE constraint failed: products.sku (19)}
# Staying in predicate but changing key to a conflicting key should fail
do_execsql_test_in_memory_error_content partial-index-update-change-key-conflict {
CREATE TABLE products (id INTEGER PRIMARY KEY, sku TEXT, price INTEGER);
CREATE UNIQUE INDEX idx_expensive ON products(sku) WHERE price > 100;
INSERT INTO products VALUES (1, 'ABC123', 150);
INSERT INTO products VALUES (2, 'XYZ789', 200);
UPDATE products SET sku = 'XYZ789' WHERE id = 1;
} {UNIQUE constraint failed: products.sku (19)}
# Exiting predicate via UPDATE should remove index entry; then re-entering later may fail
do_execsql_test_in_memory_error_content partial-index-update-exit-then-reenter {
CREATE TABLE products (id INTEGER PRIMARY KEY, sku TEXT, price INTEGER);
CREATE UNIQUE INDEX idx_expensive ON products(sku) WHERE price > 100;
INSERT INTO products VALUES (1, 'ABC123', 150);
UPDATE products SET price = 50 WHERE id = 1;
INSERT INTO products VALUES (2, 'ABC123', 200);
UPDATE products SET price = 300 WHERE id = 1;
} {UNIQUE constraint failed: products.sku (19)}
# Multi-row UPDATE causing multiple rows to enter predicate together should conflict
do_execsql_test_in_memory_error_content partial-index-update-multirow-conflict {
CREATE TABLE products (id INTEGER PRIMARY KEY, sku TEXT, price INTEGER);
CREATE UNIQUE INDEX idx_expensive ON products(sku) WHERE price > 100;
INSERT INTO products VALUES (1, 'ABC123', 50);
INSERT INTO products VALUES (2, 'ABC123', 150);
INSERT INTO products VALUES (3, 'ABC123', 75);
UPDATE products SET price = 150 WHERE sku = 'ABC123';
} {UNIQUE constraint failed: products.sku (19)}
# Update of unrelated columns should not affect partial index membership
do_execsql_test_on_specific_db {:memory:} partial-index-update-unrelated-column {
CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, status TEXT, note TEXT);
CREATE UNIQUE INDEX idx_active_email ON users(email) WHERE status = 'active';
INSERT INTO users VALUES (1, 'u@test.com', 'active', 'n1');
INSERT INTO users VALUES (2, 'u@test.com', 'inactive','n2');
UPDATE users SET note = 'changed' WHERE id = 2;
SELECT id,email,status,note FROM users ORDER BY id;
} {1|u@test.com|active|n1
2|u@test.com|inactive|changed}
# NULL -> NOT NULL transition enters predicate and may conflict
do_execsql_test_in_memory_error_content partial-index-update-null-enters-conflict {
CREATE TABLE items (id INTEGER PRIMARY KEY, code TEXT, category TEXT);
CREATE UNIQUE INDEX idx_categorized ON items(code) WHERE category IS NOT NULL;
INSERT INTO items VALUES (1,'CODE1','electronics');
INSERT INTO items VALUES (2,'CODE1',NULL);
UPDATE items SET category = 'x' WHERE id = 2;
} {UNIQUE constraint failed: items.code (19)}
# Function predicate: UPDATE causes entry into predicate -> conflict
do_execsql_test_in_memory_error_content partial-index-update-function-enters {
CREATE TABLE docs (id INTEGER PRIMARY KEY, title TEXT);
CREATE UNIQUE INDEX idx_lower_title ON docs(title) WHERE LOWER(title) = title;
INSERT INTO docs VALUES (1, 'lowercase');
INSERT INTO docs VALUES (2, 'UPPERCASE');
UPDATE docs SET title = 'lowercase' WHERE id = 2;
} {UNIQUE constraint failed: docs.title (19)}
# Multi-column unique key with partial predicate: conflict on UPDATE entering predicate
do_execsql_test_in_memory_error_content partial-index-update-multicol-enter-conflict {
CREATE TABLE inv (id INTEGER PRIMARY KEY, sku TEXT, region TEXT, price INT);
CREATE UNIQUE INDEX idx_sr ON inv(sku,region) WHERE price > 100;
INSERT INTO inv VALUES (1,'A','US', 50);
INSERT INTO inv VALUES (2,'A','US',150);
INSERT INTO inv VALUES (3,'A','EU',150);
UPDATE inv SET price = 200 WHERE id = 1;
} {UNIQUE constraint failed: inv.sku, inv.region (19)}
# Staying in predicate but changing second key part to collide should fail
do_execsql_test_in_memory_error_content partial-index-update-multicol-change-second {
CREATE TABLE inv2 (id INTEGER PRIMARY KEY, sku TEXT, region TEXT, price INT);
CREATE UNIQUE INDEX idx_sr2 ON inv2(sku,region) WHERE price > 100;
INSERT INTO inv2 VALUES (1,'A','US',150);
INSERT INTO inv2 VALUES (2,'A','EU',150);
UPDATE inv2 SET region = 'US' WHERE id = 2;
} {UNIQUE constraint failed: inv2.sku, inv2.region (19)}
# UPDATE that leaves predicate and then changes key should be allowed, then re-entering may fail
do_execsql_test_in_memory_error_content partial-index-update-exit-change-key-reenter {
CREATE TABLE t (id INTEGER PRIMARY KEY, a TEXT, b INT);
CREATE UNIQUE INDEX idx_a ON t(a) WHERE b > 0;
INSERT INTO t VALUES (1,'K', 10);
INSERT INTO t VALUES (2,'X', 10);
UPDATE t SET b = 0 WHERE id = 1;
UPDATE t SET a = 'X' WHERE id = 1;
UPDATE t SET b = 5 WHERE id = 1;
} {UNIQUE constraint failed: t.a (19)}
# Rowid (INTEGER PRIMARY KEY) change while in predicate should not self-conflict
do_execsql_test_on_specific_db {:memory:} partial-index-update-rowid-no-self-conflict {
CREATE TABLE rowid_test (id INTEGER PRIMARY KEY, val TEXT, flag INT);
CREATE UNIQUE INDEX idx_flagged ON rowid_test(val) WHERE flag = 1;
INSERT INTO rowid_test VALUES (1,'v',1);
UPDATE rowid_test SET id = 9 WHERE id = 1;
SELECT id,val,flag FROM rowid_test ORDER BY id;
} {9|v|1}
# Batch UPDATE that toggles predicate truth for multiple rows; ensure net uniqueness is enforced
do_execsql_test_in_memory_error_content partial-index-update-batch-crossing {
CREATE TABLE p (id INTEGER PRIMARY KEY, k TEXT, x INT);
CREATE UNIQUE INDEX idx_k ON p(k) WHERE x > 0;
INSERT INTO p VALUES (1,'A', 1);
INSERT INTO p VALUES (2,'A', 0);
INSERT INTO p VALUES (3,'A', 0);
UPDATE p SET x = CASE id WHEN 1 THEN 0 ELSE 1 END;
} {UNIQUE constraint failed: p.k (19)}
# UPDATE with WHERE predicate true, but changing to a unique new key while staying in predicate
do_execsql_test_on_specific_db {:memory:} partial-index-update-stay-in-predicate-change-to-unique {
CREATE TABLE q (id INTEGER PRIMARY KEY, k TEXT, x INT);
CREATE UNIQUE INDEX idx_kx ON q(k) WHERE x > 0;
INSERT INTO q VALUES (1,'A',1);
INSERT INTO q VALUES (2,'B',1);
UPDATE q SET k='C' WHERE id=1; -- stays in predicate, key now unique
SELECT id,k,x FROM q ORDER BY id;
} {1|C|1
2|B|1}
# UPDATE touching only the predicate column can still cause a conflict
# (row enters the predicate with an already-taken key).
do_execsql_test_in_memory_error_content partial-index-update-only-predicate-col-error {
CREATE TABLE r2 (id INTEGER PRIMARY KEY, k TEXT, x INT);
CREATE UNIQUE INDEX idx_k ON r2(k) WHERE x > 0;
INSERT INTO r2 VALUES (1,'A',0);
INSERT INTO r2 VALUES (2,'A',1);
UPDATE r2 SET x = 1 WHERE id = 1;
} {UNIQUE constraint failed: r2.k (19)}
# Predicate referencing the rowid alias (id): inserts only start conflicting
# once the auto-assigned rowid exceeds 10.
do_execsql_test_on_specific_db {:memory:} partial-index-multi-predicate-references {
CREATE TABLE r2 (id INTEGER PRIMARY KEY, k TEXT, x INT);
CREATE UNIQUE INDEX idx_k ON r2(k) WHERE x < 10 AND id > 10;
INSERT INTO r2 (k,x) VALUES ('A',1), ('A',2), ('A',3), ('A',4), ('A',5), ('A',6), ('A',7), ('A',8), ('A', 9), ('A', 10), ('A', 10);
-- now `id` will be greater than 10, so anything added with k='A' and x<10 should conflict
INSERT INTO r2 (k,x) VALUES ('A',11);
INSERT INTO r2 (k,x) VALUES ('A',12);
SELECT id FROM r2 ORDER BY id DESC LIMIT 1;
} {13}
# NOTE(review): test name typo "alais" -> "alias".
do_execsql_test_in_memory_error_content partial-index-multi-predicate-references-rowid-alais {
CREATE TABLE r2 (id INTEGER PRIMARY KEY, k TEXT, x INT);
CREATE UNIQUE INDEX idx_k ON r2(k) WHERE x < 10 AND id > 10;
INSERT INTO r2 (k,x) VALUES ('A',1), ('A',2), ('A',3), ('A',4), ('A',5), ('A',6), ('A',7), ('A',8), ('A', 9), ('A', 10), ('A', 10);
-- now `id` will be greater than 10, so anything added with k='A' and x<10 should conflict
INSERT INTO r2 (k,x) VALUES ('A',11);
INSERT INTO r2 (k,x) VALUES ('A',12);
INSERT INTO r2 (k,x) VALUES ('A', 3);
INSERT INTO r2 (k,x) VALUES ('A', 9);
-- should fail now
} {UNIQUE constraint failed: r2.k (19)}
# UPSERT interaction with partial indexes.
# An explicit conflict target (e.g. ON CONFLICT(email)) without a matching
# WHERE clause does not match a partial unique index, so this is an error.
do_execsql_test_in_memory_any_error upsert-partial-donothing-basic {
CREATE TABLE u1(id INTEGER PRIMARY KEY, email TEXT, status TEXT, note TEXT);
CREATE UNIQUE INDEX idx_active_email ON u1(email) WHERE status='active';
INSERT INTO u1(email,status,note)
VALUES('a@test','active','n3')
ON CONFLICT(email) DO NOTHING;
}
# Catch-all ON CONFLICT DO UPDATE fires on a partial-index conflict.
do_execsql_test_on_specific_db {:memory:} upsert-partial-doupdate-basic {
CREATE TABLE u2(id INTEGER PRIMARY KEY, email TEXT, status TEXT, note TEXT);
CREATE UNIQUE INDEX idx_active_email ON u2(email) WHERE status='active';
INSERT INTO u2 VALUES (1,'a@test','active','n1');
INSERT INTO u2(email,status,note)
VALUES('a@test','active','nNEW')
ON CONFLICT DO UPDATE SET note=excluded.note;
SELECT id,email,status,note FROM u2;
} {1|a@test|active|nNEW}
# DO UPDATE that moves the conflicting row OUT of the predicate frees the key.
do_execsql_test_on_specific_db {:memory:} upsert-partial-doupdate-leave-predicate {
CREATE TABLE u3(id INTEGER PRIMARY KEY, email TEXT, status TEXT);
CREATE UNIQUE INDEX idx_active_email ON u3(email) WHERE status='active';
INSERT INTO u3 VALUES (1,'a@test','active');
INSERT INTO u3(email,status)
VALUES('a@test','active')
ON CONFLICT DO UPDATE SET status='inactive';
-- After update, the conflicting row no longer participates in idx predicate.
-- Insert should now succeed for active variant.
INSERT INTO u3 VALUES (2,'a@test','active');
SELECT id,email,status FROM u3 ORDER BY id;
} {1|a@test|inactive 2|a@test|active}
# DO UPDATE ... WHERE that evaluates false: the UPSERT becomes a no-op.
do_execsql_test_on_specific_db {:memory:} upsert-partial-doupdate-where-skip {
CREATE TABLE u4(id INTEGER PRIMARY KEY, email TEXT, status TEXT, hits INT DEFAULT 0);
CREATE UNIQUE INDEX idx_active_email ON u4(email) WHERE status='active';
INSERT INTO u4 VALUES(1,'a@test','active',5);
INSERT INTO u4(email,status)
VALUES('a@test','active')
ON CONFLICT DO UPDATE SET hits=hits+1 WHERE excluded.status='inactive';
-- filter false => no UPDATE; constraint remains => INSERT must be suppressed,
-- SQLite semantics: when WHERE is false, the UPSERT does nothing (no row added).
SELECT id,email,status,hits FROM u4 ORDER BY id;
} {1|a@test|active|5}
# Omitted conflict target still matches the partial index; EXCLUDED.n is
# rewritten to the would-be-inserted value.
do_execsql_test_on_specific_db {:memory:} upsert-partial-omitted-target-matches {
CREATE TABLE u6(id INTEGER PRIMARY KEY, email TEXT, status TEXT, n INT);
CREATE UNIQUE INDEX idx_active_email ON u6(email) WHERE status='active';
INSERT INTO u6 VALUES (1,'a@test','active',0);
INSERT INTO u6(email,status,n)
VALUES('a@test','active',10)
ON CONFLICT DO UPDATE SET n = excluded.n;
SELECT id,email,status,n FROM u6;
} {1|a@test|active|10}
# Multi-column partial index: DO UPDATE leaving the predicate frees the key.
do_execsql_test_on_specific_db {:memory:} upsert-partial-multicol-leave-predicate {
CREATE TABLE m2(id INTEGER PRIMARY KEY, sku TEXT, region TEXT, price INT);
CREATE UNIQUE INDEX idx_sr ON m2(sku,region) WHERE price > 100;
INSERT INTO m2 VALUES(1,'A','US',150);
INSERT INTO m2(sku,region,price)
VALUES('A','US',150)
ON CONFLICT DO UPDATE SET price=50;
-- Now predicate false; insert another high-price duplicate should succeed
INSERT INTO m2 VALUES(2,'A','US',200);
SELECT id,sku,region,price FROM m2 ORDER BY id;
} {1|A|US|50 2|A|US|200}
# Function-based predicate with catch-all UPSERT.
do_execsql_test_on_specific_db {:memory:} upsert-partial-func-predicate {
CREATE TABLE d1(id INTEGER PRIMARY KEY, title TEXT, n INT DEFAULT 0);
CREATE UNIQUE INDEX idx_lower_title ON d1(title) WHERE LOWER(title)=title;
INSERT INTO d1 VALUES(1,'lower',0);
INSERT INTO d1(title)
VALUES('lower')
ON CONFLICT DO UPDATE SET n = n+1;
SELECT id,title,n FROM d1;
} {1|lower|1}
# Predicate referencing the rowid alias: conflict detection must evaluate
# the predicate with the would-be rowid of the new row.
do_execsql_test_on_specific_db {:memory:} upsert-partial-rowid-predicate {
CREATE TABLE r1(id INTEGER PRIMARY KEY, k TEXT, x INT, hits INT DEFAULT 0);
CREATE UNIQUE INDEX idx_k ON r1(k) WHERE x < 10 AND id > 10;
-- create ids 1..12, with ('A', >=10) rows to push rowid>10
INSERT INTO r1(k,x) VALUES('A',10),('A',10),('A',10),('A',10),('A',10),
('A',10),('A',10),('A',10),('A',10),('A',10),('A',11),('A',12);
-- Now conflict for ('A', 5) is against partial index (id>10 & x<10)
INSERT INTO r1(k,x,hits)
VALUES('A',5,1)
ON CONFLICT DO UPDATE SET hits = hits + excluded.hits;
SELECT k, SUM(hits) FROM r1 GROUP BY k;
} {A|1}
# EXCLUDED usage inside DO UPDATE stays within predicate and changes key
do_execsql_test_on_specific_db {:memory:} upsert-partial-excluded-rewrite {
CREATE TABLE ex1(id INTEGER PRIMARY KEY, a TEXT, b INT, c TEXT);
CREATE UNIQUE INDEX idx_a ON ex1(a) WHERE b>0;
INSERT INTO ex1 VALUES(1,'X',1,'old');
INSERT INTO ex1(a,b,c)
VALUES('X',1,'new')
ON CONFLICT DO UPDATE SET c = excluded.c, b = excluded.b;
SELECT id,a,b,c FROM ex1;
} {1|X|1|new}
# DO UPDATE changing the key while staying in the predicate is allowed when
# the new key is unique.
do_execsql_test_on_specific_db {:memory:} upsert-partial-stay-change-to-unique {
CREATE TABLE s1(id INTEGER PRIMARY KEY, a TEXT, flag INT);
CREATE UNIQUE INDEX idx_a ON s1(a) WHERE flag=1;
INSERT INTO s1 VALUES(1,'K',1);
INSERT INTO s1(a,flag)
VALUES('K',1)
ON CONFLICT DO UPDATE SET a='K2';
SELECT id,a,flag FROM s1;
} {1|K2|1}
# DO UPDATE toggling the predicate column removes the index entry.
do_execsql_test_on_specific_db {:memory:} upsert-partial-toggle-predicate {
CREATE TABLE tgl(id INTEGER PRIMARY KEY, k TEXT, x INT);
CREATE UNIQUE INDEX idx_k ON tgl(k) WHERE x>0;
INSERT INTO tgl VALUES(1,'A',1);
-- Conflicts on 'A', flips x to 0 (leaves predicate)
INSERT INTO tgl(k,x)
VALUES('A',1)
ON CONFLICT DO UPDATE SET x=0;
-- Now another 'A' with x>0 should insert
INSERT INTO tgl VALUES(2,'A',5);
SELECT id,k,x FROM tgl ORDER BY id;
} {1|A|0 2|A|5}
# Conflict target naming only the PK: a conflict on the partial index is NOT
# handled by the DO UPDATE, so the constraint error surfaces.
do_execsql_test_in_memory_error_content upsert-partial-target-pk-only {
CREATE TABLE pko(id INTEGER PRIMARY KEY, k TEXT, x INT);
CREATE UNIQUE INDEX idx_k ON pko(k) WHERE x>0;
INSERT INTO pko VALUES(1,'A',1);
-- Target PK only; conflict is on idx_k, so DO UPDATE must NOT fire and error is raised
INSERT INTO pko(id,k,x)
VALUES(2,'A',1)
ON CONFLICT(id) DO UPDATE SET x=99;
} {UNIQUE constraint failed: pko.k (19)}
# Row outside the predicate never conflicts: plain insert proceeds.
do_execsql_test_on_specific_db {:memory:} upsert-partial-omitted-no-conflict {
CREATE TABLE insfree(id INTEGER PRIMARY KEY, k TEXT, x INT);
CREATE UNIQUE INDEX idx_k ON insfree(k) WHERE x>0;
INSERT INTO insfree VALUES(1,'A',1);
-- x=0 => not in predicate, so no conflict; row must be inserted
INSERT INTO insfree(k,x)
VALUES('A',0)
ON CONFLICT DO NOTHING;
SELECT COUNT(*) FROM insfree WHERE k='A';
} {2}

View File

@@ -656,6 +656,322 @@ mod tests {
}
}
#[test]
pub fn partial_index_mutation_and_upsert_fuzz() {
let _ = env_logger::try_init();
const OUTER_ITERS: usize = 5;
const INNER_ITERS: usize = 500;
let (mut rng, seed) = if std::env::var("SEED").is_ok() {
let seed = std::env::var("SEED").unwrap().parse::<u64>().unwrap();
(ChaCha8Rng::seed_from_u64(seed), seed)
} else {
rng_from_time()
};
println!("partial_index_mutation_and_upsert_fuzz seed: {seed}");
// we want to hit unique constraints fairly often so limit the insert values
const K_POOL: [&str; 35] = [
"a", "aa", "abc", "A", "B", "zzz", "foo", "bar", "baz", "fizz", "buzz", "bb", "cc",
"dd", "ee", "ff", "gg", "hh", "jj", "kk", "ll", "mm", "nn", "oo", "pp", "qq", "rr",
"ss", "tt", "uu", "vv", "ww", "xx", "yy", "zz",
];
for outer in 0..OUTER_ITERS {
println!(" ");
println!(
"partial_index_mutation_and_upsert_fuzz iteration {}/{}",
outer + 1,
OUTER_ITERS
);
// Columns: id (rowid PK), plus a few data columns we can reference in predicates/keys.
let limbo_db = TempDatabase::new_empty(true);
let sqlite_db = TempDatabase::new_empty(true);
let limbo_conn = limbo_db.connect_limbo();
let sqlite = rusqlite::Connection::open(sqlite_db.path.clone()).unwrap();
let num_cols = rng.random_range(2..=4);
// We'll always include a TEXT "k" and a couple INT columns to give predicates variety.
// Build: id INTEGER PRIMARY KEY, k TEXT, c0 INT, c1 INT, ...
let mut cols: Vec<String> = vec!["id INTEGER PRIMARY KEY".into(), "k TEXT".into()];
for i in 0..(num_cols - 1) {
cols.push(format!("c{i} INT"));
}
let create = format!("CREATE TABLE t ({})", cols.join(", "));
println!("{create};");
limbo_exec_rows(&limbo_db, &limbo_conn, &create);
sqlite.execute(&create, rusqlite::params![]).unwrap();
// Helper to list usable columns for keys/predicates
let int_cols: Vec<String> = (0..(num_cols - 1)).map(|i| format!("c{i}")).collect();
let functions = ["lower", "upper", "length"];
let num_pidx = rng.random_range(0..=3);
let mut idx_ddls: Vec<String> = Vec::new();
for i in 0..num_pidx {
// Pick 1 or 2 key columns; always include "k" sometimes to get frequent conflicts.
let mut key_cols = Vec::new();
if rng.random_bool(0.7) {
key_cols.push("k".to_string());
}
if key_cols.is_empty() || rng.random_bool(0.5) {
// Add one INT col to make compound keys common
if !int_cols.is_empty() {
let c = int_cols[rng.random_range(0..int_cols.len())].clone();
if !key_cols.contains(&c) {
key_cols.push(c);
}
}
}
// Ensure at least one key column
if key_cols.is_empty() {
key_cols.push("k".to_string());
}
// Build a simple deterministic partial predicate:
// Examples:
// c0 > 10 AND c1 < 50
// c0 IS NOT NULL
// id > 5 AND c0 >= 0
// lower(k) = k
let pred = {
// parts we can AND/OR (well only AND for stability)
let mut parts: Vec<String> = Vec::new();
// Maybe include rowid (id) bound
if rng.random_bool(0.4) {
let n = rng.random_range(0..20);
                    let op = *["<", "<=", ">", ">="].choose(&mut rng).unwrap();
                    parts.push(format!("id {op} {n}"));
                }
                // Maybe include int column comparison
                if !int_cols.is_empty() && rng.random_bool(0.8) {
                    let c = &int_cols[rng.random_range(0..int_cols.len())];
                    match rng.random_range(0..3) {
                        // Plain NULL-ness test on the chosen int column.
                        0 => parts.push(format!("{c} IS NOT NULL")),
                        // Integer comparison with a random operator and constant.
                        1 => {
                            let n = rng.random_range(-10..=20);
                            let op = *["<", "<=", "=", ">=", ">"].choose(&mut rng).unwrap();
                            parts.push(format!("{c} {op} {n}"));
                        }
                        // Coin-flip between IS NULL and IS NOT NULL.
                        _ => {
                            let n = rng.random_range(0..=1);
                            parts.push(format!(
                                "{c} IS {}",
                                if n == 0 { "NULL" } else { "NOT NULL" }
                            ));
                        }
                    }
                }
                // Occasionally add a function-based predicate over k
                // (drawn from the `functions` pool defined above).
                if rng.random_bool(0.2) {
                    parts.push(format!("{}(k) = k", functions.choose(&mut rng).unwrap()));
                }
                // Guarantee at least one part
                if parts.is_empty() {
                    parts.push("1".to_string());
                }
                parts.join(" AND ")
            };
            // Assemble the partial UNIQUE index DDL from the chosen key columns
            // and the randomly generated WHERE predicate.
            let ddl = format!(
                "CREATE UNIQUE INDEX idx_p{}_{} ON t({}) WHERE {}",
                outer,
                i,
                key_cols.join(","),
                pred
            );
            idx_ddls.push(ddl.clone());
            // Create in both engines
            println!("{ddl};");
            limbo_exec_rows(&limbo_db, &limbo_conn, &ddl);
            sqlite.execute(&ddl, rusqlite::params![]).unwrap();
        }
        // Seed both databases with the same random rows so the differential
        // loop below starts from identical state.
        let seed_rows = rng.random_range(10..=80);
        for _ in 0..seed_rows {
            let k = *K_POOL.choose(&mut rng).unwrap();
            let mut vals: Vec<String> = vec!["NULL".into(), format!("'{k}'")]; // id NULL -> auto
            for _ in 0..(num_cols - 1) {
                // bias a bit toward small ints & NULL to make predicate flipping common
                let v = match rng.random_range(0..6) {
                    0 => "NULL".into(),
                    _ => rng.random_range(-5..=15).to_string(),
                };
                vals.push(v);
            }
            let ins = format!("INSERT INTO t VALUES ({})", vals.join(", "));
            // Execute on both; ignore errors due to partial unique conflicts (keep seeding going)
            let _ = sqlite.execute(&ins, rusqlite::params![]);
            let _ = limbo_exec_rows_fallible(&limbo_db, &limbo_conn, &ins);
        }
        // Main differential loop: generate one random DML statement per
        // iteration, run it against both SQLite and turso, and demand an
        // identical success/failure outcome plus identical table contents.
        for _ in 0..INNER_ITERS {
            let action = rng.random_range(0..4); // 0: INSERT, 1: UPDATE, 2: DELETE, 3: UPSERT (catch-all)
            let stmt = match action {
                // INSERT
                0 => {
                    let k = *K_POOL.choose(&mut rng).unwrap();
                    let mut cols_list = vec!["k".to_string()];
                    let mut vals_list = vec![format!("'{k}'")];
                    // Each non-key column is included with 80% probability;
                    // omitted columns take their default (NULL).
                    for i in 0..(num_cols - 1) {
                        if rng.random_bool(0.8) {
                            cols_list.push(format!("c{i}"));
                            vals_list.push(if rng.random_bool(0.15) {
                                "NULL".into()
                            } else {
                                rng.random_range(-5..=15).to_string()
                            });
                        }
                    }
                    format!(
                        "INSERT INTO t({}) VALUES({})",
                        cols_list.join(","),
                        vals_list.join(",")
                    )
                }
                // UPDATE (randomly touch either key or predicate column)
                1 => {
                    // choose a column
                    let col_pick = if rng.random_bool(0.5) {
                        "k".to_string()
                    } else {
                        format!("c{}", rng.random_range(0..(num_cols - 1)))
                    };
                    let new_val = if col_pick == "k" {
                        format!("'{}'", K_POOL.choose(&mut rng).unwrap())
                    } else if rng.random_bool(0.2) {
                        "NULL".into()
                    } else {
                        rng.random_range(-5..=15).to_string()
                    };
                    // predicate to affect some rows
                    let wc = if rng.random_bool(0.6) {
                        let pred_col = format!("c{}", rng.random_range(0..(num_cols - 1)));
                        let op = *["<", "<=", "=", ">=", ">"].choose(&mut rng).unwrap();
                        let n = rng.random_range(-5..=15);
                        format!("WHERE {pred_col} {op} {n}")
                    } else {
                        // toggle rows by id parity
                        "WHERE (id % 2) = 0".into()
                    };
                    format!("UPDATE t SET {col_pick} = {new_val} {wc}")
                }
                // DELETE
                2 => {
                    let wc = if rng.random_bool(0.5) {
                        // delete rows inside partial predicate zones
                        match int_cols.len() {
                            0 => "WHERE lower(k) = k".to_string(),
                            _ => {
                                let c = &int_cols[rng.random_range(0..int_cols.len())];
                                let n = rng.random_range(-5..=15);
                                let op = *["<", "<=", "=", ">=", ">"].choose(&mut rng).unwrap();
                                format!("WHERE {c} {op} {n}")
                            }
                        }
                    } else {
                        "WHERE id % 3 = 1".to_string()
                    };
                    format!("DELETE FROM t {wc}")
                }
                // UPSERT catch-all is allowed even if only partial unique constraints exist
                3 => {
                    let k = *K_POOL.choose(&mut rng).unwrap();
                    let mut cols_list = vec!["k".to_string()];
                    let mut vals_list = vec![format!("'{k}'")];
                    for i in 0..(num_cols - 1) {
                        if rng.random_bool(0.8) {
                            cols_list.push(format!("c{i}"));
                            vals_list.push(if rng.random_bool(0.2) {
                                "NULL".into()
                            } else {
                                rng.random_range(-5..=15).to_string()
                            });
                        }
                    }
                    if rng.random_bool(0.3) {
                        // 30% chance ON CONFLICT DO UPDATE SET ...
                        let mut set_list = Vec::new();
                        // Pick a random non-empty subset of the inserted columns
                        // to overwrite on conflict.
                        let num_set = rng.random_range(1..=cols_list.len());
                        let set_cols = cols_list
                            .choose_multiple(&mut rng, num_set)
                            .cloned()
                            .collect::<Vec<_>>();
                        for c in set_cols.iter() {
                            let v = if c == "k" {
                                format!("'{}'", K_POOL.choose(&mut rng).unwrap())
                            } else if rng.random_bool(0.2) {
                                "NULL".into()
                            } else {
                                rng.random_range(-5..=15).to_string()
                            };
                            set_list.push(format!("{c} = {v}"));
                        }
                        format!(
                            "INSERT INTO t({}) VALUES({}) ON CONFLICT DO UPDATE SET {}",
                            cols_list.join(","),
                            vals_list.join(","),
                            set_list.join(", ")
                        )
                    } else {
                        format!(
                            "INSERT INTO t({}) VALUES({}) ON CONFLICT DO NOTHING",
                            cols_list.join(","),
                            vals_list.join(",")
                        )
                    }
                }
                _ => unreachable!(),
            };
            // Execute on SQLite first; capture success/error, then run on turso and demand same outcome.
            let sqlite_res = sqlite.execute(&stmt, rusqlite::params![]);
            let limbo_res = limbo_exec_rows_fallible(&limbo_db, &limbo_conn, &stmt);
            match (sqlite_res, limbo_res) {
                // Both engines accepted the statement: full table contents must
                // now be identical between the two databases.
                (Ok(_), Ok(_)) => {
                    println!("{stmt};");
                    // Compare canonical table state
                    let verify = format!(
                        "SELECT id, k{} FROM t ORDER BY id, k{}",
                        (0..(num_cols - 1))
                            .map(|i| format!(", c{i}"))
                            .collect::<String>(),
                        (0..(num_cols - 1))
                            .map(|i| format!(", c{i}"))
                            .collect::<String>(),
                    );
                    let s = sqlite_exec_rows(&sqlite, &verify);
                    let l = limbo_exec_rows(&limbo_db, &limbo_conn, &verify);
                    assert_eq!(
                        l, s,
                        "stmt: {stmt}, seed: {seed}, create: {create}, idx: {idx_ddls:?}"
                    );
                }
                // Both errored (e.g. both rejected a partial-unique violation):
                // acceptable; move on without comparing state.
                (Err(_), Err(_)) => {
                    // Both errored
                    continue;
                }
                // Mismatch: dump context
                (ok_sqlite, ok_turso) => {
                    println!("{stmt};");
                    eprintln!("Schema: {create};");
                    for d in idx_ddls.iter() {
                        eprintln!("{d};");
                    }
                    panic!(
                        "DML outcome mismatch (sqlite: {ok_sqlite:?}, turso ok: {ok_turso:?}) \n
                        stmt: {stmt}, seed: {seed}"
                    );
                }
            }
        }
    }
}
#[test]
pub fn compound_select_fuzz() {
let _ = env_logger::try_init();