Support proper index handling when doing insertions

This commit is contained in:
PThorpe92
2025-04-05 16:12:59 -04:00
parent f7de575873
commit 1f29307fe8
4 changed files with 166 additions and 6 deletions

View File

@@ -85,6 +85,19 @@ impl Schema {
let name = normalize_ident(table_name);
self.indexes.remove(&name);
}
pub fn get_index_for_column(&self, table_name: &str, column_name: &str) -> Option<Arc<Index>> {
if let Some(indexes) = self.indexes.get(table_name) {
for index in indexes {
for column in &index.columns {
if column.name.eq_ignore_ascii_case(column_name) {
return Some(index.clone());
}
}
}
}
None
}
}
#[derive(Clone, Debug)]

View File

@@ -172,6 +172,11 @@ pub fn translate_create_index(
cursor_id: table_cursor_id,
dest: rowid_reg,
});
// if the rowid is null, skip the insert
program.emit_insn(Insn::IsNull {
reg: rowid_reg,
target_pc: loop_end_label,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg,

View File

@@ -6,10 +6,10 @@ use limbo_sqlite3_parser::ast::{
};
use crate::error::SQLITE_CONSTRAINT_PRIMARYKEY;
use crate::schema::Table;
use crate::schema::{IndexColumn, Table};
use crate::util::normalize_ident;
use crate::vdbe::builder::{ProgramBuilderOpts, QueryMode};
use crate::vdbe::insn::RegisterOrLiteral;
use crate::vdbe::insn::{IdxInsertFlags, RegisterOrLiteral};
use crate::vdbe::BranchOffset;
use crate::{
schema::{Column, Schema},
@@ -83,6 +83,22 @@ pub fn translate_insert(
Some(table_name.0.clone()),
CursorType::BTreeTable(btree_table.clone()),
);
// allocate cursor id's for each btree index cursor we'll need to populate the indexes
// (idx name, root_page, idx cursor id)
let idx_cursors = schema
.get_indices(&table_name.0)
.iter()
.map(|idx| {
(
&idx.name,
idx.root_page,
program.alloc_cursor_id(
Some(table_name.0.clone()),
CursorType::BTreeIndex(idx.clone()),
),
)
})
.collect::<Vec<(&String, usize, usize)>>();
let root_page = btree_table.root_page;
let values = match body {
InsertBody::Select(select, _) => match &select.body.select.deref() {
@@ -93,6 +109,7 @@ pub fn translate_insert(
};
let column_mappings = resolve_columns_for_insert(&table, columns, values)?;
let index_col_mappings = resolve_indicies_for_insert(schema, table.as_ref(), &column_mappings)?;
// Check if rowid was provided (through INTEGER PRIMARY KEY as a rowid alias)
let rowid_alias_index = btree_table.columns.iter().position(|c| c.is_rowid_alias);
let has_user_provided_rowid = {
@@ -183,7 +200,14 @@ pub fn translate_insert(
&resolver,
)?;
}
// Open all the index btrees for writing
for idx_cursor in idx_cursors.iter() {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: idx_cursor.2,
root_page: idx_cursor.1.into(),
});
program.emit_insn(Insn::OpenWriteAwait {});
}
// Common record insertion logic for both single and multiple rows
let check_rowid_is_integer_label = rowid_alias_reg.and(Some(program.allocate_label()));
if let Some(reg) = rowid_alias_reg {
@@ -265,7 +289,54 @@ pub fn translate_insert(
flag: 0,
});
program.emit_insn(Insn::InsertAwait { cursor_id });
for index_col_mapping in index_col_mappings.iter() {
// find which cursor we opened earlier for this index
let idx_cursor_id = idx_cursors
.iter()
.find(|(name, _, _)| *name == &index_col_mapping.idx_name)
.map(|(_, _, c_id)| *c_id)
.expect("no cursor found for index");
let num_cols = index_col_mapping.columns.len();
// allocate scratch registers for the index columns plus rowid
let idx_start_reg = program.alloc_registers(num_cols + 1);
// copy each index column from the table's column registers into these scratch regs
for (i, col) in index_col_mapping.columns.iter().enumerate() {
// copy from the table's column register over to the index's scratch register
program.emit_insn(Insn::Copy {
src_reg: column_registers_start + col.0,
dst_reg: idx_start_reg + i,
amount: 0,
});
}
// last register is the rowid
program.emit_insn(Insn::Copy {
src_reg: rowid_reg,
dst_reg: idx_start_reg + num_cols,
amount: 0,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: idx_start_reg,
count: num_cols + 1,
dest_reg: record_reg,
});
// now do the actual index insertion using the unpacked registers
program.emit_insn(Insn::IdxInsertAsync {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg), // TODO: enable optimization
unpacked_count: Some((num_cols + 1) as u16),
// TODO: figure out how to determine whether or not we need to seek prior to insert.
flags: IdxInsertFlags::new(),
});
program.emit_insn(Insn::IdxInsertAwait {
cursor_id: idx_cursor_id,
});
}
if inserting_multiple_rows {
// For multiple rows, loop back
program.emit_insn(Insn::Goto {
@@ -393,6 +464,78 @@ fn resolve_columns_for_insert<'a>(
Ok(mappings)
}
/// Represents how a column in an index should be populated during an INSERT.
/// Similar to ColumnMapping above but includes the index name, as well as multiple
/// possible value indices for each.
#[derive(Default)]
struct IndexColMapping {
idx_name: String,
columns: Vec<(usize, IndexColumn)>,
value_indicies: Vec<Option<usize>>,
}
impl IndexColMapping {
fn new(name: String) -> Self {
IndexColMapping {
idx_name: name,
..Default::default()
}
}
}
/// Example:
/// Table 'test': (a, b, c);
/// Index 'idx': test(a, b);
///________________________________
/// Insert (a, c): (2, 3)
/// Record: (2, NULL, 3)
/// IndexColMapping: (a, b) = (2, NULL)
fn resolve_indicies_for_insert<'a>(
schema: &Schema,
table: &Table,
columns: &[ColumnMapping],
) -> Result<Vec<IndexColMapping>> {
let mut index_col_mappings = Vec::new();
for col in columns {
// check if any of the inserted columns are part of an index
if let Some(index) =
schema.get_index_for_column(table.get_name(), col.column.name.as_ref().unwrap())
{
// check if the index is already in the list
if index_col_mappings
.iter()
.any(|i: &IndexColMapping| i.idx_name.eq_ignore_ascii_case(&index.name))
{
continue;
}
let mut idx_col_map = IndexColMapping::new(index.name.clone()); //todo: rm clone -_-
for column in &index.columns {
let column_name = normalize_ident(column.name.as_str());
// find the other columns in the index that are not part of the insert
if let Some((i, index_column)) = columns.iter().enumerate().find(|(_, c)| {
c.column
.name
.as_ref()
.is_some_and(|c| c.eq_ignore_ascii_case(&column_name))
}) {
// the column is also part of the insert
idx_col_map.columns.push((i, column.clone()));
// store the value index (which may be null if not part of the insert)
idx_col_map.value_indicies.push(index_column.value_index);
} else {
// column not found, meaning the ColumnMapping failed, thus we bail
return Err(crate::LimboError::ParseError(format!(
"Column {} not found in index {}",
column_name, index.name
)));
}
}
index_col_mappings.push(idx_col_map);
}
}
Ok(index_col_mappings)
}
/// Populates the column registers with values for a single row
fn populate_column_registers(
program: &mut ProgramBuilder,

View File

@@ -627,11 +627,10 @@ impl Row {
pub fn get_value<'a>(&'a self, idx: usize) -> &'a OwnedValue {
let value = unsafe { self.values.add(idx).as_ref().unwrap() };
let value = match value {
match value {
Register::OwnedValue(owned_value) => owned_value,
_ => unreachable!("a row should be formed of values only"),
};
value
}
}
pub fn get_values(&self) -> impl Iterator<Item = &OwnedValue> {