Merge 'index_method: fully integrate into query planner' from Nikita Sivukhin

This PR completely integrate custom indices to the query planner.
In order to do that new `Cursor::IndexMethod` is introduced with few
correlated changes in the VM implementation:
1. Added special `IndexMethod{Create,Destroy,Query}` opcodes to handle
index method creation, deletion and query
2. `Next` , `IdxRowid` , `IdxInsert`, `IdxDelete` opcodes updated to
properly handle new cursor case

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #3827
This commit is contained in:
Jussi Saurio
2025-10-29 09:42:37 +02:00
committed by GitHub
20 changed files with 1105 additions and 148 deletions

View File

@@ -865,7 +865,7 @@ impl Limbo {
) -> usize {
let indent_count = match prev_insn {
"Rewind" | "Last" | "SorterSort" | "SeekGE" | "SeekGT" | "SeekLE"
| "SeekLT" | "BeginSubrtn" => indent_count + 1,
| "SeekLT" | "BeginSubrtn" | "IndexMethodQuery" => indent_count + 1,
_ => indent_count,
};

View File

@@ -87,7 +87,9 @@ pub trait IndexMethodCursor {
/// For example, for 2 patterns ["SELECT * FROM {table} LIMIT ?", "SELECT * FROM {table} WHERE x = ?"], query_start(...) call can have following arguments:
/// - [Integer(0), Integer(10)] - pattern "SELECT * FROM {table} LIMIT ?" was chosen with LIMIT parameter equals to 10
/// - [Integer(1), Text("turso")] - pattern "SELECT * FROM {table} WHERE x = ?" was chosen with equality comparison equals to "turso"
fn query_start(&mut self, values: &[Register]) -> Result<IOResult<()>>;
///
/// Returns false if query will produce no rows (similar to VFilter/Rewind op codes)
fn query_start(&mut self, values: &[Register]) -> Result<IOResult<bool>>;
/// Moves cursor to the next response row
/// Returns false if query exhausted all rows

View File

@@ -43,6 +43,7 @@ pub enum VectorSparseInvertedIndexCreateState {
Run { stmt: Box<Statement> },
}
#[derive(Debug)]
pub enum VectorSparseInvertedIndexInsertState {
Init,
Prepare {
@@ -64,6 +65,7 @@ pub enum VectorSparseInvertedIndexInsertState {
},
}
#[derive(Debug)]
pub enum VectorSparseInvertedIndexDeleteState {
Init,
Prepare {
@@ -151,7 +153,6 @@ pub struct VectorSparseInvertedIndexMethodCursor {
delete_state: VectorSparseInvertedIndexDeleteState,
search_state: VectorSparseInvertedIndexSearchState,
search_result: VecDeque<(i64, f64)>,
search_row: Option<(i64, f64)>,
}
impl IndexMethod for VectorSparseInvertedIndexMethod {
@@ -199,7 +200,6 @@ impl VectorSparseInvertedIndexMethodCursor {
scratch_cursor: None,
main_btree: None,
search_result: VecDeque::new(),
search_row: None,
create_state: VectorSparseInvertedIndexCreateState::Init,
insert_state: VectorSparseInvertedIndexInsertState::Init,
delete_state: VectorSparseInvertedIndexDeleteState::Init,
@@ -288,6 +288,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
));
};
loop {
tracing::debug!("insert_state: {:?}", self.insert_state);
match &mut self.insert_state {
VectorSparseInvertedIndexInsertState::Init => {
let Some(vector) = values[0].get_value().to_blob() else {
@@ -377,6 +378,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
));
};
loop {
tracing::debug!("delete_state: {:?}", self.delete_state);
match &mut self.delete_state {
VectorSparseInvertedIndexDeleteState::Init => {
let Some(vector) = values[0].get_value().to_blob() else {
@@ -459,7 +461,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
}
}
fn query_start(&mut self, values: &[Register]) -> Result<IOResult<()>> {
fn query_start(&mut self, values: &[Register]) -> Result<IOResult<bool>> {
let Some(scratch) = &mut self.scratch_cursor else {
return Err(LimboError::InternalError(
"cursor must be opened".to_string(),
@@ -471,7 +473,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
));
};
loop {
tracing::debug!("state: {:?}", self.search_state);
tracing::debug!("query_state: {:?}", self.search_state);
match &mut self.search_state {
VectorSparseInvertedIndexSearchState::Init => {
let Some(vector) = values[1].get_value().to_blob() else {
@@ -521,7 +523,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
continue;
}
let position = p[*idx];
let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 2);
let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1);
self.search_state = VectorSparseInvertedIndexSearchState::Seek {
collected: collected.take(),
positions: positions.take(),
@@ -551,7 +553,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
limit: *limit,
};
}
SeekResult::TryAdvance => {
SeekResult::TryAdvance | SeekResult::NotFound => {
self.search_state = VectorSparseInvertedIndexSearchState::Next {
collected: collected.take(),
positions: positions.take(),
@@ -560,9 +562,6 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
limit: *limit,
};
}
SeekResult::NotFound => {
return Err(LimboError::Corrupt("inverted index corrupted".to_string()))
}
}
}
VectorSparseInvertedIndexSearchState::Read {
@@ -637,7 +636,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
let Some(rowid) = rowids.as_ref().unwrap().last() else {
let distances = distances.take().unwrap();
self.search_result = distances.iter().map(|(d, i)| (*i, d.0)).collect();
return Ok(IOResult::Done(()));
return Ok(IOResult::Done(!self.search_result.is_empty()));
};
let result = return_if_io!(
main.seek(SeekKey::TableRowId(*rowid), SeekOp::GE { eq_only: true })
@@ -709,17 +708,17 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
}
fn query_rowid(&mut self) -> Result<IOResult<Option<i64>>> {
let result = self.search_row.as_ref().unwrap();
let result = self.search_result.front().unwrap();
Ok(IOResult::Done(Some(result.0)))
}
fn query_column(&mut self, _: usize) -> Result<IOResult<Value>> {
let result = self.search_row.as_ref().unwrap();
let result = self.search_result.front().unwrap();
Ok(IOResult::Done(Value::Float(result.1)))
}
fn query_next(&mut self) -> Result<IOResult<bool>> {
self.search_row = self.search_result.pop_front();
Ok(IOResult::Done(self.search_row.is_some()))
let _ = self.search_result.pop_front();
Ok(IOResult::Done(!self.search_result.is_empty()))
}
}

View File

@@ -119,6 +119,15 @@ impl Display for SelectPlan {
)?;
}
},
Operation::IndexMethodQuery(query) => {
let index_method = query.index.index_method.as_ref().unwrap();
writeln!(
f,
"{}QUERY INDEX METHOD {}",
indent,
index_method.definition().method_name
)?;
}
}
}
Ok(())
@@ -161,6 +170,15 @@ impl Display for DeletePlan {
)?;
}
},
Operation::IndexMethodQuery(query) => {
let module = query.index.index_method.as_ref().unwrap();
writeln!(
f,
"{}QUERY MODULE {}",
indent,
module.definition().method_name
)?;
}
}
}
Ok(())
@@ -215,6 +233,15 @@ impl fmt::Display for UpdatePlan {
)?;
}
},
Operation::IndexMethodQuery(query) => {
let module = query.index.index_method.as_ref().unwrap();
writeln!(
f,
"{}QUERY MODULE {}",
indent,
module.definition().method_name
)?;
}
}
}
if !self.order_by.is_empty() {

View File

@@ -670,6 +670,9 @@ fn emit_delete_insns(
index: Some(index), ..
} => program.resolve_cursor_id(&CursorKey::index(internal_id, index.clone())),
},
Operation::IndexMethodQuery(_) => {
panic!("access through IndexMethod is not supported for delete statements")
}
};
let main_table_cursor_id = program.resolve_cursor_id(&CursorKey::table(internal_id));
@@ -980,7 +983,7 @@ fn emit_program_for_update(
)) {
cursor
} else {
let cursor = program.alloc_cursor_id(CursorType::BTreeIndex(index.clone()));
let cursor = program.alloc_cursor_index(None, index)?;
program.emit_insn(Insn::OpenWrite {
cursor_id: cursor,
root_page: RegisterOrLiteral::Literal(index.root_page),
@@ -1087,6 +1090,9 @@ fn emit_update_insns(
false,
),
},
Operation::IndexMethodQuery(_) => {
panic!("access through IndexMethod is not supported for update operations")
}
};
let beg = program.alloc_registers(

View File

@@ -12,7 +12,7 @@ use crate::function::{Func, FuncCtx, MathFuncArity, ScalarFunc, VectorFunc};
use crate::functions::datetime;
use crate::schema::{affinity, Affinity, Table, Type};
use crate::translate::optimizer::TakeOwnership;
use crate::translate::plan::ResultSetColumn;
use crate::translate::plan::{Operation, ResultSetColumn};
use crate::translate::planner::parse_row_id;
use crate::util::{exprs_are_equivalent, normalize_ident, parse_numeric_literal};
use crate::vdbe::builder::CursorKey;
@@ -2098,19 +2098,25 @@ pub fn translate_expr(
column,
is_rowid_alias,
} => {
let (index, use_covering_index) = {
let (index, index_method, use_covering_index) = {
if let Some(table_reference) = referenced_tables
.unwrap()
.find_joined_table_by_internal_id(*table_ref_id)
{
(
table_reference.op.index(),
if let Operation::IndexMethodQuery(index_method) = &table_reference.op {
Some(index_method)
} else {
None
},
table_reference.utilizes_covering_index(),
)
} else {
(None, false)
(None, None, false)
}
};
let use_index_method = index_method.and_then(|m| m.covered_columns.get(column));
let (is_from_outer_query_scope, table) = referenced_tables
.unwrap()
@@ -2122,11 +2128,13 @@ pub fn translate_expr(
)
});
let Some(table_column) = table.get_column_at(*column) else {
crate::bail_parse_error!("column index out of bounds");
};
// Counter intuitive but a column always needs to have a collation
program.set_collation(Some((table_column.collation.unwrap_or_default(), false)));
if use_index_method.is_none() {
let Some(table_column) = table.get_column_at(*column) else {
crate::bail_parse_error!("column index out of bounds");
};
// Counter intuitive but a column always needs to have a collation
program.set_collation(Some((table_column.collation.unwrap_or_default(), false)));
}
// If we are reading a column from a table, we find the cursor that corresponds to
// the table and read the column from the cursor.
@@ -2149,7 +2157,7 @@ pub fn translate_expr(
)
}
} else {
let table_cursor_id = if use_covering_index {
let table_cursor_id = if use_covering_index || use_index_method.is_some() {
None
} else {
Some(program.resolve_cursor_id(&CursorKey::table(*table_ref_id)))
@@ -2161,53 +2169,61 @@ pub fn translate_expr(
(table_cursor_id, index_cursor_id)
};
if *is_rowid_alias {
if let Some(index_cursor_id) = index_cursor_id {
program.emit_insn(Insn::IdxRowId {
cursor_id: index_cursor_id,
dest: target_register,
});
} else if let Some(table_cursor_id) = table_cursor_id {
program.emit_insn(Insn::RowId {
cursor_id: table_cursor_id,
dest: target_register,
});
} else {
unreachable!("Either index or table cursor must be opened");
}
if let Some(custom_module_column) = use_index_method {
program.emit_column_or_rowid(
index_cursor_id.unwrap(),
*custom_module_column,
target_register,
);
} else {
let read_from_index = if is_from_outer_query_scope {
index_cursor_id.is_some()
if *is_rowid_alias {
if let Some(index_cursor_id) = index_cursor_id {
program.emit_insn(Insn::IdxRowId {
cursor_id: index_cursor_id,
dest: target_register,
});
} else if let Some(table_cursor_id) = table_cursor_id {
program.emit_insn(Insn::RowId {
cursor_id: table_cursor_id,
dest: target_register,
});
} else {
unreachable!("Either index or table cursor must be opened");
}
} else {
use_covering_index
};
let read_cursor = if read_from_index {
index_cursor_id.expect("index cursor should be opened")
} else {
table_cursor_id.expect("table cursor should be opened")
};
let column = if read_from_index {
let index = program.resolve_index_for_cursor_id(
index_cursor_id.expect("index cursor should be opened"),
);
index
.column_table_pos_to_index_pos(*column)
.unwrap_or_else(|| {
panic!(
let read_from_index = if is_from_outer_query_scope {
index_cursor_id.is_some()
} else {
use_covering_index
};
let read_cursor = if read_from_index {
index_cursor_id.expect("index cursor should be opened")
} else {
table_cursor_id.expect("table cursor should be opened")
};
let column = if read_from_index {
let index = program.resolve_index_for_cursor_id(
index_cursor_id.expect("index cursor should be opened"),
);
index
.column_table_pos_to_index_pos(*column)
.unwrap_or_else(|| {
panic!(
"index {} does not contain column number {} of table {}",
index.name, column, table_ref_id
)
})
} else {
*column
};
})
} else {
*column
};
program.emit_column_or_rowid(read_cursor, column, target_register);
program.emit_column_or_rowid(read_cursor, column, target_register);
}
let Some(column) = table.get_column_at(*column) else {
crate::bail_parse_error!("column index out of bounds");
};
maybe_apply_affinity(column.ty, target_register, program);
}
let Some(column) = table.get_column_at(*column) else {
crate::bail_parse_error!("column index out of bounds");
};
maybe_apply_affinity(column.ty, target_register, program);
Ok(target_register)
}
Table::FromClauseSubquery(from_clause_subquery) => {
@@ -3596,7 +3612,7 @@ pub fn bind_and_rewrite_expr<'a>(
connection: &'a Arc<crate::Connection>,
param_state: &mut ParamState,
binding_behavior: BindingBehavior,
) -> Result<WalkControl> {
) -> Result<()> {
walk_expr_mut(
top_level_expr,
&mut |expr: &mut ast::Expr| -> Result<WalkControl> {
@@ -3894,7 +3910,8 @@ pub fn bind_and_rewrite_expr<'a>(
}
Ok(WalkControl::Continue)
},
)
)?;
Ok(())
}
/// Recursively walks a mutable expression, applying a function to each sub-expression.

View File

@@ -172,7 +172,7 @@ pub fn translate_create_index(
let sqlite_schema_cursor_id =
program.alloc_cursor_id(CursorType::BTreeTable(sqlite_table.clone()));
let table_ref = program.table_reference_counter.next();
let index_cursor_id = program.alloc_cursor_id(CursorType::BTreeIndex(idx.clone()));
let index_cursor_id = program.alloc_cursor_index(None, &idx)?;
let table_cursor_id = program.alloc_cursor_id_keyed(
CursorKey::table(table_ref),
CursorType::BTreeTable(tbl.clone()),
@@ -201,11 +201,20 @@ pub fn translate_create_index(
// Create a new B-Tree and store the root page index in a register
let root_page_reg = program.alloc_register();
program.emit_insn(Insn::CreateBtree {
db: 0,
root: root_page_reg,
flags: CreateBTreeFlags::new_index(),
});
if idx.index_method.is_some() && !idx.is_backing_btree_index() {
program.emit_insn(Insn::IndexMethodCreate {
db: 0,
cursor_id: index_cursor_id,
});
// index method sqlite_schema row always has root_page equals to zero in the schema (same as virtual tables)
program.emit_int(0, root_page_reg);
} else {
program.emit_insn(Insn::CreateBtree {
db: 0,
root: root_page_reg,
flags: CreateBTreeFlags::new_index(),
});
}
// open the sqlite schema table for writing and create a new entry for the index
program.emit_insn(Insn::OpenWrite {
@@ -226,7 +235,92 @@ pub fn translate_create_index(
Some(sql),
)?;
if index_method.is_none() {
if index_method
.as_ref()
.is_some_and(|m| !m.definition().backing_btree)
{
// open the table we are creating the index on for reading
program.emit_insn(Insn::OpenRead {
cursor_id: table_cursor_id,
root_page: tbl.root_page,
db: 0,
});
// Open the index btree we created for writing to insert the
// newly sorted index records.
program.emit_insn(Insn::OpenWrite {
cursor_id: index_cursor_id,
root_page: RegisterOrLiteral::Register(root_page_reg),
db: 0,
});
let loop_start_label = program.allocate_label();
let loop_end_label = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: table_cursor_id,
pc_if_empty: loop_end_label,
});
program.preassign_label_to_next_insn(loop_start_label);
// Loop start:
// Collect index values into start_reg..rowid_reg
// emit MakeRecord (index key + rowid) into record_reg.
//
// Then insert the record into the sorter
let mut skip_row_label = None;
if let Some(where_clause) = where_clause {
let label = program.allocate_label();
translate_condition_expr(
&mut program,
&table_references,
&where_clause,
ConditionMetadata {
jump_if_condition_is_true: false,
jump_target_when_false: label,
jump_target_when_true: BranchOffset::Placeholder,
jump_target_when_null: label,
},
resolver,
)?;
skip_row_label = Some(label);
}
let start_reg = program.alloc_registers(columns.len() + 1);
for (i, col) in columns.iter().enumerate() {
program.emit_column_or_rowid(table_cursor_id, col.pos_in_table, start_reg + i);
}
let rowid_reg = start_reg + columns.len();
program.emit_insn(Insn::RowId {
cursor_id: table_cursor_id,
dest: rowid_reg,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg,
count: columns.len() + 1,
dest_reg: record_reg,
index_name: Some(idx_name.clone()),
affinity_str: None,
});
// insert new index record
program.emit_insn(Insn::IdxInsert {
cursor_id: index_cursor_id,
record_reg,
unpacked_start: Some(start_reg),
unpacked_count: Some((columns.len() + 1) as u16),
flags: IdxInsertFlags::new().use_seek(false),
});
if let Some(skip_row_label) = skip_row_label {
program.resolve_label(skip_row_label, program.offset());
}
program.emit_insn(Insn::Next {
cursor_id: table_cursor_id,
pc_if_next: loop_start_label,
});
program.preassign_label_to_next_insn(loop_end_label);
} else if index_method.is_none() {
// determine the order of the columns in the index for the sorter
let order = idx.columns.iter().map(|c| c.order).collect();
// open the sorter and the pseudo table
@@ -642,20 +736,24 @@ pub fn translate_drop_index(
p5: 0,
});
// Destroy index btree
program.emit_insn(Insn::Destroy {
root: maybe_index.unwrap().root_page,
former_root_reg: 0,
is_temp: 0,
});
// Remove from the Schema any mention of the index
if let Some(idx) = maybe_index {
program.emit_insn(Insn::DropIndex {
index: idx.clone(),
db: 0,
let index = maybe_index.unwrap();
if index.index_method.is_some() && !index.is_backing_btree_index() {
let cursor_id = program.alloc_cursor_index(None, index)?;
program.emit_insn(Insn::IndexMethodDestroy { db: 0, cursor_id });
} else {
// Destroy index btree
program.emit_insn(Insn::Destroy {
root: index.root_page,
former_root_reg: 0,
is_temp: 0,
});
}
// Remove from the Schema any mention of the index
program.emit_insn(Insn::DropIndex {
index: index.clone(),
db: 0,
});
Ok(program)
}

View File

@@ -139,26 +139,24 @@ impl<'a> InsertEmitCtx<'a> {
cdc_table: Option<(usize, Arc<BTreeTable>)>,
num_values: usize,
temp_table_ctx: Option<TempTableCtx>,
) -> Self {
) -> Result<Self> {
// allocate cursor id's for each btree index cursor we'll need to populate the indexes
let idx_cursors = resolver
.schema
.get_indices(table.name.as_str())
.map(|idx| {
(
&idx.name,
idx.root_page,
program.alloc_cursor_id(CursorType::BTreeIndex(idx.clone())),
)
})
.collect::<Vec<(&String, i64, usize)>>();
let indices = resolver.schema.get_indices(table.name.as_str());
let mut idx_cursors = Vec::new();
for idx in indices {
idx_cursors.push((
&idx.name,
idx.root_page,
program.alloc_cursor_index(None, idx)?,
));
}
let halt_label = program.allocate_label();
let loop_start_label = program.allocate_label();
let row_done_label = program.allocate_label();
let stmt_epilogue = program.allocate_label();
let key_ready_for_uniqueness_check_label = program.allocate_label();
let key_generation_label = program.allocate_label();
Self {
Ok(Self {
table,
idx_cursors,
temp_table_ctx,
@@ -176,7 +174,7 @@ impl<'a> InsertEmitCtx<'a> {
key_ready_for_uniqueness_check_label,
key_generation_label,
autoincrement_meta: None,
}
})
}
}
@@ -263,7 +261,7 @@ pub fn translate_insert(
cdc_table,
values.len(),
None,
);
)?;
program = init_source_emission(
program,

View File

@@ -257,10 +257,10 @@ pub fn init_loop(
{
continue;
}
let cursor_id = program.alloc_cursor_id_keyed(
CursorKey::index(table.internal_id, index.clone()),
CursorType::BTreeIndex(index.clone()),
);
let cursor_id = program.alloc_cursor_index(
Some(CursorKey::index(table.internal_id, index.clone())),
index,
)?;
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: index.root_page.into(),
@@ -350,10 +350,10 @@ pub fn init_loop(
{
continue;
}
let cursor_id = program.alloc_cursor_id_keyed(
CursorKey::index(table.internal_id, index.clone()),
CursorType::BTreeIndex(index.clone()),
);
let cursor_id = program.alloc_cursor_index(
Some(CursorKey::index(table.internal_id, index.clone())),
index,
)?;
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: index.root_page.into(),
@@ -397,6 +397,24 @@ pub fn init_loop(
}
}
}
Operation::IndexMethodQuery(_) => match mode {
OperationMode::SELECT => {
if let Some(table_cursor_id) = table_cursor_id {
program.emit_insn(Insn::OpenRead {
cursor_id: table_cursor_id,
root_page: table.table.get_root_page(),
db: table.database_id,
});
}
let index_cursor_id = index_cursor_id.unwrap();
program.emit_insn(Insn::OpenRead {
cursor_id: index_cursor_id,
root_page: table.op.index().unwrap().root_page,
db: table.database_id,
});
}
_ => panic!("only SELECT is supported for index method"),
},
}
}
@@ -685,6 +703,35 @@ pub fn open_loop(
}
}
}
Operation::IndexMethodQuery(query) => {
let start_reg = program.alloc_registers(query.arguments.len() + 1);
program.emit_int(query.pattern_idx as i64, start_reg);
for i in 0..query.arguments.len() {
translate_expr(
program,
Some(table_references),
&query.arguments[i],
start_reg + 1 + i,
&t_ctx.resolver,
)?;
}
program.emit_insn(Insn::IndexMethodQuery {
db: 0,
cursor_id: index_cursor_id.expect("IndexMethod requires a index cursor"),
start_reg,
count_reg: query.arguments.len() + 1,
pc_if_empty: loop_end,
});
program.preassign_label_to_next_insn(loop_start);
if let Some(table_cursor_id) = table_cursor_id {
if let Some(index_cursor_id) = index_cursor_id {
program.emit_insn(Insn::DeferredSeek {
index_cursor_id,
table_cursor_id,
});
}
}
}
}
for subquery in subqueries.iter_mut().filter(|s| !s.has_been_evaluated()) {
@@ -1169,6 +1216,14 @@ pub fn close_loop(
}
program.preassign_label_to_next_insn(loop_labels.loop_end);
}
Operation::IndexMethodQuery(_) => {
program.resolve_label(loop_labels.next, program.offset());
program.emit_insn(Insn::Next {
cursor_id: index_cursor_id.unwrap(),
pc_if_next: loop_labels.loop_start,
});
program.preassign_label_to_next_insn(loop_labels.loop_end);
}
}
// Handle OUTER JOIN logic. The reason this comes after the "loop end" mark is that we may need to still jump back

View File

@@ -23,11 +23,14 @@ use crate::{
constraints::{RangeConstraintRef, SeekRangeConstraint, TableConstraints},
},
plan::{
ColumnUsedMask, NonFromClauseSubquery, OuterQueryReference, QueryDestination,
ResultSetColumn, Scan, SeekKeyComponent,
ColumnUsedMask, IndexMethodQuery, NonFromClauseSubquery, OuterQueryReference,
QueryDestination, ResultSetColumn, Scan, SeekKeyComponent,
},
},
types::SeekOp,
util::{
exprs_are_equivalent, simple_bind_expr, try_capture_parameters, try_substitute_parameters,
},
vdbe::builder::{CursorKey, CursorType, ProgramBuilder},
LimboError, Result,
};
@@ -84,12 +87,15 @@ pub fn optimize_select_plan(plan: &mut SelectPlan, schema: &Schema) -> Result<()
let best_join_order = optimize_table_access(
schema,
&mut plan.result_columns,
&mut plan.table_references,
&schema.indexes,
&mut plan.where_clause,
&mut plan.order_by,
&mut plan.group_by,
&plan.non_from_clause_subqueries,
&mut plan.limit,
&mut plan.offset,
)?;
if let Some(best_join_order) = best_join_order {
@@ -110,12 +116,15 @@ fn optimize_delete_plan(plan: &mut DeletePlan, schema: &Schema) -> Result<()> {
let _ = optimize_table_access(
schema,
&mut plan.result_columns,
&mut plan.table_references,
&schema.indexes,
&mut plan.where_clause,
&mut plan.order_by,
&mut None,
&[],
&mut plan.limit,
&mut plan.offset,
)?;
Ok(())
@@ -135,12 +144,15 @@ fn optimize_update_plan(
}
let _ = optimize_table_access(
schema,
&mut [],
&mut plan.table_references,
&schema.indexes,
&mut plan.where_clause,
&mut plan.order_by,
&mut None,
&[],
&mut plan.limit,
&mut plan.offset,
)?;
let table_ref = &mut plan.table_references.joined_tables_mut()[0];
@@ -322,6 +334,187 @@ fn optimize_subqueries(plan: &mut SelectPlan, schema: &Schema) -> Result<()> {
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn optimize_table_access_with_custom_modules(
schema: &Schema,
result_columns: &mut [ResultSetColumn],
table_references: &mut TableReferences,
available_indexes: &HashMap<String, VecDeque<Arc<Index>>>,
where_query: &mut [WhereTerm],
order_by: &mut Vec<(Box<ast::Expr>, SortOrder)>,
group_by: &mut Option<GroupBy>,
limit: &mut Option<Box<Expr>>,
offset: &mut Option<Box<Expr>>,
) -> Result<bool> {
let tables = table_references.joined_tables_mut();
assert_eq!(tables.len(), 1);
// group by is not supported for now
if group_by.is_some() {
return Ok(false);
}
let table = &mut tables[0];
let Some(indexes) = available_indexes.get(table.table.get_name()) else {
return Ok(false);
};
for index in indexes {
let Some(module) = &index.index_method else {
continue;
};
if index.is_backing_btree_index() {
continue;
}
let definition = module.definition();
'pattern: for (pattern_idx, pattern) in definition.patterns.iter().enumerate() {
let mut pattern = pattern.clone();
assert!(pattern.with.is_none());
assert!(pattern.body.compounds.is_empty());
let ast::OneSelect::Select {
columns,
from: Some(ast::FromClause { select, joins }),
distinctness: None,
ref mut where_clause,
group_by: None,
window_clause,
} = &mut pattern.body.select
else {
panic!("unexpected select pattern body");
};
assert!(window_clause.is_empty());
assert!(joins.is_empty());
let ast::SelectTable::Table(name, _, _) = select.as_ref() else {
panic!("unexpected from clause");
};
for column in columns.iter_mut() {
if let ast::ResultColumn::Expr(e, _) = column {
simple_bind_expr(schema, table, &[], e)?;
}
}
for column in pattern.order_by.iter_mut() {
simple_bind_expr(schema, table, columns, &mut column.expr)?;
}
if let Some(pattern_where) = where_clause {
simple_bind_expr(schema, table, columns, pattern_where)?;
}
if name.name.as_str() != table.table.get_name() {
continue;
}
if order_by.len() != pattern.order_by.len() {
continue;
}
let mut where_query_covered = None;
let mut parameters = HashMap::new();
for (pattern_column, (query_column, query_order)) in
pattern.order_by.iter().zip(order_by.iter())
{
if *query_order != pattern_column.order.unwrap_or(SortOrder::Asc) {
continue 'pattern;
}
let Some(captured) = try_capture_parameters(&pattern_column.expr, query_column)
else {
continue 'pattern;
};
parameters.extend(captured);
}
match (pattern.limit.as_ref().map(|x| &x.expr), &limit) {
(None, Some(_)) | (Some(_), None) => continue,
(Some(pattern_limit), Some(query_limit)) => {
let Some(captured) = try_capture_parameters(pattern_limit, query_limit) else {
continue 'pattern;
};
parameters.extend(captured);
}
(None, None) => {}
}
match (
pattern.limit.as_ref().and_then(|x| x.offset.as_ref()),
&offset,
) {
(None, Some(_)) | (Some(_), None) => continue,
(Some(pattern_off), Some(query_off)) => {
let Some(captured) = try_capture_parameters(pattern_off, query_off) else {
continue 'pattern;
};
parameters.extend(captured);
}
(None, None) => {}
}
if let Some(pattern_where) = where_clause {
for (i, query_where) in where_query.iter().enumerate() {
let captured = try_capture_parameters(pattern_where, &query_where.expr);
let Some(captured) = captured else {
continue;
};
parameters.extend(captured);
where_query_covered = Some(i);
break;
}
}
if where_clause.is_some() && where_query_covered.is_none() {
continue;
}
let where_covered_completely =
where_query.is_empty() || where_query_covered.is_some() && where_query.len() == 1;
if !where_covered_completely
&& (!order_by.is_empty() || limit.is_some() || offset.is_some())
{
continue;
}
if let Some(where_covered) = where_query_covered {
where_query[where_covered].consumed = true;
}
// todo: fix this
let mut covered_column_id = 1_000_000;
let mut covered_columns = HashMap::new();
for (patter_column_id, pattern_column) in columns.iter().enumerate() {
let ast::ResultColumn::Expr(pattern, _) = pattern_column else {
continue;
};
let Some(pattern) = try_substitute_parameters(pattern, &parameters) else {
continue;
};
for query_column in result_columns.iter_mut() {
if !exprs_are_equivalent(&query_column.expr, &pattern) {
continue;
}
query_column.expr = ast::Expr::Column {
database: None,
table: table.internal_id,
column: covered_column_id,
is_rowid_alias: false,
};
covered_columns.insert(covered_column_id, patter_column_id);
covered_column_id += 1;
}
}
let _ = order_by.drain(..);
let _ = limit.take();
let _ = offset.take();
let mut arguments = parameters.iter().collect::<Vec<_>>();
arguments.sort_by_key(|(&i, _)| i);
table.op = Operation::IndexMethodQuery(IndexMethodQuery {
index: index.clone(),
pattern_idx,
covered_columns,
arguments: arguments.iter().map(|(_, e)| (*e).clone()).collect(),
});
return Ok(true);
}
}
Ok(false)
}
/// Optimize the join order and index selection for a query.
///
/// This function does the following:
@@ -333,14 +526,18 @@ fn optimize_subqueries(plan: &mut SelectPlan, schema: &Schema) -> Result<()> {
/// - Removes sorting operations if the selected join order and access methods satisfy the [crate::translate::optimizer::order::OrderTarget].
///
/// Returns the join order if it was optimized, or None if the default join order was considered best.
#[allow(clippy::too_many_arguments)]
fn optimize_table_access(
schema: &Schema,
result_columns: &mut [ResultSetColumn],
table_references: &mut TableReferences,
available_indexes: &HashMap<String, VecDeque<Arc<Index>>>,
where_clause: &mut [WhereTerm],
order_by: &mut Vec<(Box<ast::Expr>, SortOrder)>,
group_by: &mut Option<GroupBy>,
subqueries: &[NonFromClauseSubquery],
limit: &mut Option<Box<Expr>>,
offset: &mut Option<Box<Expr>>,
) -> Result<Option<Vec<JoinOrderMember>>> {
if table_references.joined_tables().len() > TableReferences::MAX_JOINED_TABLES {
crate::bail_parse_error!(
@@ -348,6 +545,24 @@ fn optimize_table_access(
TableReferences::MAX_JOINED_TABLES
);
}
if table_references.joined_tables().len() == 1 {
let optimized = optimize_table_access_with_custom_modules(
schema,
result_columns,
table_references,
available_indexes,
where_clause,
order_by,
group_by,
limit,
offset,
)?;
if optimized {
return Ok(None);
}
}
let access_methods_arena = RefCell::new(Vec::new());
let maybe_order_target = compute_order_target(order_by, group_by.as_mut());
let constraints_per_table = constraints_from_where_clause(

View File

@@ -1,4 +1,4 @@
use std::{cmp::Ordering, sync::Arc};
use std::{cmp::Ordering, collections::HashMap, sync::Arc};
use turso_parser::ast::{
self, FrameBound, FrameClause, FrameExclude, FrameMode, SortOrder, SubqueryType,
};
@@ -38,7 +38,13 @@ impl ResultSetColumn {
}
match &self.expr {
ast::Expr::Column { table, column, .. } => {
let (_, table_ref) = tables.find_table_by_internal_id(*table).unwrap();
let joined_table_ref = tables.find_joined_table_by_internal_id(*table).unwrap();
if let Operation::IndexMethodQuery(module) = &joined_table_ref.op {
if module.covered_columns.contains_key(column) {
return None;
}
}
let table_ref = &joined_table_ref.table;
table_ref.get_column_at(*column).unwrap().name.as_deref()
}
ast::Expr::RowId { table, .. } => {
@@ -851,6 +857,8 @@ pub enum Operation {
// This operation is used to search for a row in a table using an index
// (i.e. a primary key or a secondary index)
Search(Search),
// Access through custom index method query
IndexMethodQuery(IndexMethodQuery),
}
impl Operation {
@@ -872,9 +880,10 @@ impl Operation {
pub fn index(&self) -> Option<&Arc<Index>> {
match self {
Operation::Scan(Scan::BTreeTable { index, .. }) => index.as_ref(),
Operation::Search(Search::Seek { index, .. }) => index.as_ref(),
Operation::IndexMethodQuery(IndexMethodQuery { index, .. }) => Some(index),
Operation::Scan(_) => None,
Operation::Search(Search::RowidEq { .. }) => None,
Operation::Search(Search::Seek { index, .. }) => index.as_ref(),
}
}
}
@@ -1000,12 +1009,14 @@ impl JoinedTable {
)
};
let index_cursor_id = index.map(|index| {
program.alloc_cursor_id_keyed(
CursorKey::index(self.internal_id, index.clone()),
CursorType::BTreeIndex(index.clone()),
)
});
let index_cursor_id = index
.map(|index| {
program.alloc_cursor_index(
Some(CursorKey::index(self.internal_id, index.clone())),
index,
)
})
.transpose()?;
Ok((table_cursor_id, index_cursor_id))
}
Table::Virtual(virtual_table) => {
@@ -1221,6 +1232,19 @@ pub enum Search {
},
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub struct IndexMethodQuery {
/// index method to use
pub index: Arc<Index>,
/// idx of the pattern from [crate::index_method::IndexMethodAttachment::definition] which planner chose to use for the access
pub pattern_idx: usize,
/// captured arguments for the pattern chosen by the planner
pub arguments: Vec<Expr>,
/// mapping from index of [ast::Expr::Column] to the column index of IndexMethod response
pub covered_columns: HashMap<usize, usize>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct Aggregate {
pub func: AggFunc,

View File

@@ -645,6 +645,7 @@ fn count_plan_required_cursors(plan: &SelectPlan) -> usize {
Search::RowidEq { .. } => 1,
Search::Seek { index, .. } => 1 + index.is_some() as usize,
}
Operation::IndexMethodQuery(_) => 1,
} + if let Table::FromClauseSubquery(from_clause_subquery) = &t.table {
count_plan_required_cursors(&from_clause_subquery.plan)
} else {
@@ -664,6 +665,7 @@ fn estimate_num_instructions(select: &SelectPlan) -> usize {
.map(|t| match &t.op {
Operation::Scan { .. } => 10,
Operation::Search(_) => 15,
Operation::IndexMethodQuery(_) => 15,
} + if let Table::FromClauseSubquery(from_clause_subquery) = &t.table {
10 + estimate_num_instructions(&from_clause_subquery.plan)
} else {
@@ -687,6 +689,7 @@ fn estimate_num_labels(select: &SelectPlan) -> usize {
.map(|t| match &t.op {
Operation::Scan { .. } => 3,
Operation::Search(_) => 3,
Operation::IndexMethodQuery(_) => 3,
} + if let Table::FromClauseSubquery(from_clause_subquery) = &t.table {
3 + estimate_num_labels(&from_clause_subquery.plan)
} else {

View File

@@ -390,6 +390,13 @@ pub fn emit_from_clause_subqueries(
)
}
},
Operation::IndexMethodQuery(query) => {
let index_method = query.index.index_method.as_ref().unwrap();
format!(
"QUERY INDEX METHOD {}",
index_method.definition().method_name
)
}
}
);

View File

@@ -5,6 +5,7 @@ use turso_parser::ast::SortOrder;
use crate::error::LimboError;
use crate::ext::{ExtValue, ExtValueType};
use crate::index_method::IndexMethodCursor;
use crate::numeric::format_float;
use crate::pseudo::PseudoCursor;
use crate::schema::Index;
@@ -2271,6 +2272,7 @@ impl Record {
pub enum Cursor {
BTree(Box<dyn CursorTrait>),
IndexMethod(Box<dyn IndexMethodCursor>),
Pseudo(PseudoCursor),
Sorter(Sorter),
Virtual(VirtualTableCursor),
@@ -2281,6 +2283,7 @@ impl Debug for Cursor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::BTree(..) => f.debug_tuple("BTree").finish(),
Self::IndexMethod(..) => f.debug_tuple("IndexMethod").finish(),
Self::Pseudo(..) => f.debug_tuple("Pseudo").finish(),
Self::Sorter(..) => f.debug_tuple("Sorter").finish(),
Self::Virtual(..) => f.debug_tuple("Virtual").finish(),
@@ -2344,6 +2347,13 @@ impl Cursor {
_ => panic!("Cursor is not a materialized view cursor"),
}
}
pub fn as_index_method_mut(&mut self) -> &mut dyn IndexMethodCursor {
match self {
Self::IndexMethod(cursor) => cursor.as_mut(),
_ => panic!("Cursor is not an IndexMethod cursor"),
}
}
}
#[derive(Debug)]

View File

@@ -2,7 +2,9 @@
use crate::incremental::view::IncrementalView;
use crate::numeric::StrToF64;
use crate::translate::emitter::TransactionMode;
use crate::translate::expr::WalkControl;
use crate::translate::expr::{walk_expr_mut, WalkControl};
use crate::translate::plan::JoinedTable;
use crate::translate::planner::parse_row_id;
use crate::types::IOResult;
use crate::{
schema::{self, BTreeTable, Column, Schema, Table, Type, DBSP_TABLE_PREFIX},
@@ -318,6 +320,154 @@ pub fn check_literal_equivalency(lhs: &Literal, rhs: &Literal) -> bool {
}
}
/// bind AST identifiers to either Column or Rowid if possible
pub fn simple_bind_expr(
schema: &Schema,
joined_table: &JoinedTable,
result_columns: &[ast::ResultColumn],
expr: &mut ast::Expr,
) -> Result<()> {
let internal_id = joined_table.internal_id;
walk_expr_mut(expr, &mut |expr: &mut ast::Expr| -> Result<WalkControl> {
#[allow(clippy::single_match)]
match expr {
Expr::Id(id) => {
let normalized_id = normalize_ident(id.as_str());
for result_column in result_columns.iter() {
if let ast::ResultColumn::Expr(result, Some(ast::As::As(alias))) = result_column
{
if alias.as_str().eq_ignore_ascii_case(&normalized_id) {
*expr = *result.clone();
return Ok(WalkControl::Continue);
}
}
}
let col_idx = joined_table.columns().iter().position(|c| {
c.name
.as_ref()
.is_some_and(|name| name.eq_ignore_ascii_case(&normalized_id))
});
if let Some(col_idx) = col_idx {
let col = joined_table.table.columns().get(col_idx).unwrap();
*expr = ast::Expr::Column {
database: None,
table: internal_id,
column: col_idx,
is_rowid_alias: col.is_rowid_alias,
};
} else {
// only if we haven't found a match, check for explicit rowid reference
let is_btree_table = matches!(joined_table.table, Table::BTree(_));
if is_btree_table {
if let Some(rowid) = parse_row_id(&normalized_id, internal_id, || false)? {
*expr = rowid;
}
}
}
}
_ => {}
}
Ok(WalkControl::Continue)
});
Ok(())
}
pub fn try_substitute_parameters(
pattern: &Expr,
parameters: &HashMap<i32, Expr>,
) -> Option<Box<Expr>> {
match pattern {
Expr::FunctionCall {
name,
distinctness,
args,
order_by,
filter_over,
} => {
let mut substituted = Vec::new();
for arg in args {
substituted.push(try_substitute_parameters(arg, parameters)?);
}
Some(Box::new(Expr::FunctionCall {
args: substituted,
distinctness: *distinctness,
name: name.clone(),
order_by: order_by.clone(),
filter_over: filter_over.clone(),
}))
}
Expr::Variable(var) => {
let Ok(var) = var.parse::<i32>() else {
return None;
};
Some(Box::new(parameters.get(&var)?.clone()))
}
_ => Some(Box::new(pattern.clone())),
}
}
pub fn try_capture_parameters(pattern: &Expr, query: &Expr) -> Option<HashMap<i32, Expr>> {
let mut captured = HashMap::new();
match (pattern, query) {
(
Expr::FunctionCall {
name: name1,
distinctness: distinct1,
args: args1,
order_by: order1,
filter_over: filter1,
},
Expr::FunctionCall {
name: name2,
distinctness: distinct2,
args: args2,
order_by: order2,
filter_over: filter2,
},
) => {
if !name1.as_str().eq_ignore_ascii_case(name2.as_str()) {
return None;
}
if distinct1.is_some() || distinct2.is_some() {
return None;
}
if !order1.is_empty() || !order2.is_empty() {
return None;
}
if filter1.filter_clause.is_some() || filter1.over_clause.is_some() {
return None;
}
if filter2.filter_clause.is_some() || filter2.over_clause.is_some() {
return None;
}
for (arg1, arg2) in args1.iter().zip(args2.iter()) {
let result = try_capture_parameters(arg1, arg2)?;
captured.extend(result);
}
Some(captured)
}
(Expr::Variable(var), expr) => {
let Ok(var) = var.parse::<i32>() else {
return None;
};
captured.insert(var, expr.clone());
Some(captured)
}
(
Expr::Id(_) | Expr::Name(_) | Expr::Column { .. },
Expr::Id(_) | Expr::Name(_) | Expr::Column { .. },
) => {
if pattern == query {
Some(captured)
} else {
None
}
}
(_, _) => None,
}
}
/// This function is used to determine whether two expressions are logically
/// equivalent in the context of queries, even if their representations
/// differ. e.g.: `SUM(x)` and `sum(x)`, `x + y` and `y + x`

View File

@@ -7,6 +7,7 @@ use tracing::{instrument, Level};
use turso_parser::ast::{self, TableInternalId};
use crate::{
index_method::IndexMethodAttachment,
numeric::Numeric,
parameters::Parameters,
schema::{BTreeTable, Index, PseudoCursorType, Schema, Table},
@@ -134,6 +135,7 @@ pub struct ProgramBuilder {
pub enum CursorType {
BTreeTable(Arc<BTreeTable>),
BTreeIndex(Arc<Index>),
IndexMethod(Arc<dyn IndexMethodAttachment>),
Pseudo(PseudoCursorType),
Sorter,
VirtualTable(Arc<VirtualTable>),
@@ -332,6 +334,21 @@ impl ProgramBuilder {
}
}
/// allocate proper cursor for the given index (either [CursorType::BTreeIndex] or [CursorType::IndexMethod])
pub fn alloc_cursor_index(
&mut self,
key: Option<CursorKey>,
index: &Arc<Index>,
) -> crate::Result<usize> {
tracing::debug!("alloc cursor: {:?} {:?}", key, index.index_method.is_some());
let module = index.index_method.as_ref();
if module.is_some_and(|m| !m.definition().backing_btree) {
let module = module.unwrap().clone();
return Ok(self._alloc_cursor_id(key, CursorType::IndexMethod(module)));
}
Ok(self._alloc_cursor_id(key, CursorType::BTreeIndex(index.clone())))
}
pub fn alloc_cursor_id(&mut self, cursor_type: CursorType) -> usize {
self._alloc_cursor_id(None, cursor_type)
}
@@ -803,6 +820,9 @@ impl ProgramBuilder {
Insn::IdxLT { target_pc, .. } => {
resolve(target_pc, "IdxLT");
}
Insn::IndexMethodQuery { pc_if_empty, .. } => {
resolve(pc_if_empty, "IndexMethodQuery");
}
Insn::IsNull { reg: _, target_pc } => {
resolve(target_pc, "IsNull");
}

View File

@@ -1045,6 +1045,20 @@ pub fn op_open_read(
let pager = program.get_pager_from_database_index(db);
if let (_, CursorType::IndexMethod(module)) = &program.cursor_ref[*cursor_id] {
if state.cursors[*cursor_id].is_none() {
let cursor = module.init()?;
let cursor_ref = &mut state.cursors[*cursor_id];
*cursor_ref = Some(Cursor::IndexMethod(cursor));
}
let cursor = state.cursors[*cursor_id].as_mut().unwrap();
let cursor = cursor.as_index_method_mut();
return_if_io!(cursor.open_read(&program.connection));
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
if program.connection.get_mv_tx_id().is_none() {
assert!(*root_page >= 0, "");
@@ -1137,6 +1151,9 @@ pub fn op_open_read(
CursorType::Sorter => {
panic!("OpenRead on sorter cursor");
}
CursorType::IndexMethod(..) => {
unreachable!("IndexMethod handled above")
}
CursorType::VirtualTable(_) => {
panic!("OpenRead on virtual table cursor, use Insn:VOpen instead");
}
@@ -1536,8 +1553,11 @@ pub fn op_column(
} => {
let Some(rowid) = ({
let index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
return_if_io!(index_cursor.rowid())
match index_cursor {
Cursor::BTree(cursor) => return_if_io!(cursor.rowid()),
Cursor::IndexMethod(cursor) => return_if_io!(cursor.query_rowid()),
_ => panic!("unexpected cursor type"),
}
}) else {
state.registers[*dest] = Register::Value(Value::Null);
break 'outer;
@@ -1858,6 +1878,12 @@ pub fn op_column(
};
state.registers[*dest] = Register::Value(value);
}
CursorType::IndexMethod(..) => {
let cursor = state.cursors[*cursor_id].as_mut().unwrap();
let cursor = cursor.as_index_method_mut();
let value = return_if_io!(cursor.query_column(*column));
state.registers[*dest] = Register::Value(value);
}
CursorType::VirtualTable(_) => {
panic!("Insn:Column on virtual table cursor, use Insn:VColumn instead");
}
@@ -2038,6 +2064,11 @@ pub fn op_next(
let has_more = return_if_io!(mv_cursor.next());
!has_more
}
Cursor::IndexMethod(_) => {
let cursor = cursor.as_index_method_mut();
let has_more = return_if_io!(cursor.query_next());
!has_more
}
_ => panic!("Next on non-btree/materialized-view cursor"),
}
};
@@ -2773,15 +2804,22 @@ pub fn op_row_id(
} => {
let rowid = {
let index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
let record = return_if_io!(index_cursor.record());
let record = record.as_ref().unwrap();
let mut record_cursor_ref = index_cursor.record_cursor_mut();
let record_cursor = record_cursor_ref.deref_mut();
let rowid = record.last_value(record_cursor).unwrap();
match rowid {
Ok(ValueRef::Integer(rowid)) => rowid,
_ => unreachable!(),
match index_cursor {
Cursor::BTree(index_cursor) => {
let record = return_if_io!(index_cursor.record());
let record = record.as_ref().unwrap();
let mut record_cursor_ref = index_cursor.record_cursor_mut();
let record_cursor = record_cursor_ref.deref_mut();
let rowid = record.last_value(record_cursor).unwrap();
match rowid {
Ok(ValueRef::Integer(rowid)) => rowid,
_ => unreachable!(),
}
}
Cursor::IndexMethod(index_cursor) => {
return_if_io!(index_cursor.query_rowid()).unwrap()
}
_ => panic!("unexpected cursor type"),
}
};
state.op_row_id_state = OpRowIdState::Seek {
@@ -2827,6 +2865,14 @@ pub fn op_row_id(
} else {
state.registers[*dest] = Register::Value(Value::Null);
}
} else if let Some(Cursor::IndexMethod(cursor)) =
cursors.get_mut(*cursor_id).unwrap()
{
if let Some(rowid) = return_if_io!(cursor.query_rowid()) {
state.registers[*dest] = Register::Value(Value::Integer(rowid));
} else {
state.registers[*dest] = Register::Value(Value::Null);
}
} else {
return Err(LimboError::InternalError(
"RowId: cursor is not a table, virtual, or materialized view cursor"
@@ -2853,8 +2899,12 @@ pub fn op_idx_row_id(
load_insn!(IdxRowId { cursor_id, dest }, insn);
let cursors = &mut state.cursors;
let cursor = cursors.get_mut(*cursor_id).unwrap().as_mut().unwrap();
let cursor = cursor.as_btree_mut();
let rowid = return_if_io!(cursor.rowid());
let rowid = match cursor {
Cursor::BTree(cursor) => return_if_io!(cursor.rowid()),
Cursor::IndexMethod(cursor) => return_if_io!(cursor.query_rowid()),
_ => panic!("unexpected cursor type"),
};
state.registers[*dest] = match rowid {
Some(rowid) => Register::Value(Value::Integer(rowid)),
None => Register::Value(Value::Null),
@@ -6314,6 +6364,13 @@ pub fn op_idx_delete(
insn
);
tracing::info!("idx_delete cursor: {:?}", program.cursor_ref[*cursor_id]);
if let Some(Cursor::IndexMethod(cursor)) = &mut state.cursors[*cursor_id] {
return_if_io!(cursor.delete(&state.registers[*start_reg..*start_reg + *num_regs]));
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
loop {
#[cfg(debug_assertions)]
tracing::debug!(
@@ -6420,11 +6477,29 @@ pub fn op_idx_insert(
cursor_id,
record_reg,
flags,
unpacked_start,
unpacked_count,
..
},
*insn
);
if let Some(Cursor::IndexMethod(cursor)) = &mut state.cursors[cursor_id] {
let Some(start) = unpacked_start else {
return Err(LimboError::InternalError(
"IndexMethod must receive unpacked values".to_string(),
));
};
let Some(count) = unpacked_count else {
return Err(LimboError::InternalError(
"IndexMethod must receive unpacked values".to_string(),
));
};
return_if_io!(cursor.insert(&state.registers[start..start + count as usize]));
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
let record_to_insert = match &state.registers[record_reg] {
Register::Record(ref r) => r,
o => {
@@ -6929,6 +7004,20 @@ pub fn op_open_write(
}
let pager = program.get_pager_from_database_index(db);
if let (_, CursorType::IndexMethod(module)) = &program.cursor_ref[*cursor_id] {
if state.cursors[*cursor_id].is_none() {
let cursor = module.init()?;
let cursor_ref = &mut state.cursors[*cursor_id];
*cursor_ref = Some(Cursor::IndexMethod(cursor));
}
let cursor = state.cursors[*cursor_id].as_mut().unwrap();
let cursor = cursor.as_index_method_mut();
return_if_io!(cursor.open_write(&program.connection));
state.pc += 1;
return Ok(InsnFunctionStepResult::Step);
}
let root_page = match root_page {
RegisterOrLiteral::Literal(lit) => *lit,
RegisterOrLiteral::Register(reg) => match &state.registers[*reg].get_value() {
@@ -7087,6 +7176,102 @@ pub fn op_create_btree(
Ok(InsnFunctionStepResult::Step)
}
pub fn op_index_method_create(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(IndexMethodCreate { db, cursor_id }, insn);
assert_eq!(*db, 0);
if program.connection.is_readonly(*db) {
return Err(LimboError::ReadOnly);
}
if let Some(mv_store) = mv_store {
todo!("MVCC is not supported yet");
}
if let (_, CursorType::IndexMethod(module)) = &program.cursor_ref[*cursor_id] {
if state.cursors[*cursor_id].is_none() {
let cursor = module.init()?;
let cursor_ref = &mut state.cursors[*cursor_id];
*cursor_ref = Some(Cursor::IndexMethod(cursor));
}
}
let cursor = state.cursors[*cursor_id].as_mut().unwrap();
let cursor = cursor.as_index_method_mut();
return_if_io!(cursor.create(&program.connection));
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
pub fn op_index_method_destroy(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(IndexMethodDestroy { db, cursor_id }, insn);
assert_eq!(*db, 0);
if program.connection.is_readonly(*db) {
return Err(LimboError::ReadOnly);
}
if let Some(mv_store) = mv_store {
todo!("MVCC is not supported yet");
}
if let (_, CursorType::IndexMethod(module)) = &program.cursor_ref[*cursor_id] {
if state.cursors[*cursor_id].is_none() {
let cursor = module.init()?;
let cursor_ref = &mut state.cursors[*cursor_id];
*cursor_ref = Some(Cursor::IndexMethod(cursor));
}
}
let cursor = state.cursors[*cursor_id].as_mut().unwrap();
let cursor = cursor.as_index_method_mut();
return_if_io!(cursor.destroy(&program.connection));
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
pub fn op_index_method_query(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(
IndexMethodQuery {
db,
cursor_id,
start_reg,
count_reg,
pc_if_empty,
},
insn
);
assert_eq!(*db, 0);
if program.connection.is_readonly(*db) {
return Err(LimboError::ReadOnly);
}
if let Some(mv_store) = mv_store {
todo!("MVCC is not supported yet");
}
let cursor = state.cursors[*cursor_id].as_mut().unwrap();
let cursor = cursor.as_index_method_mut();
let has_rows =
return_if_io!(cursor.query_start(&state.registers[*start_reg..*start_reg + *count_reg]));
if !has_rows {
state.pc = pc_if_empty.as_offset_int();
} else {
state.pc += 1;
}
Ok(InsnFunctionStepResult::Step)
}
pub enum OpDestroyState {
CreateCursor,
DestroyBtree(Arc<RwLock<BTreeCursor>>),
@@ -7830,6 +8015,9 @@ pub fn op_open_ephemeral(
CursorType::VirtualTable(_) => {
panic!("OpenEphemeral on virtual table cursor, use Insn::VOpen instead");
}
CursorType::IndexMethod(..) => {
panic!("OpenEphemeral on index method cursor")
}
CursorType::MaterializedView(_, _) => {
panic!("OpenEphemeral on materialized view cursor");
}

View File

@@ -19,11 +19,12 @@ pub fn insn_to_row(
let get_table_or_index_name = |cursor_id: usize| {
let cursor_type = &program.cursor_ref[cursor_id].1;
match cursor_type {
CursorType::BTreeTable(table) => &table.name,
CursorType::BTreeIndex(index) => &index.name,
CursorType::BTreeTable(table) => table.name.as_str(),
CursorType::BTreeIndex(index) => index.name.as_str(),
CursorType::IndexMethod(descriptor) => descriptor.definition().index_name,
CursorType::Pseudo(_) => "pseudo",
CursorType::VirtualTable(virtual_table) => &virtual_table.name,
CursorType::MaterializedView(table, _) => &table.name,
CursorType::VirtualTable(virtual_table) => virtual_table.name.as_str(),
CursorType::MaterializedView(table, _) => table.name.as_str(),
CursorType::Sorter => "sorter",
}
};
@@ -551,6 +552,7 @@ pub fn insn_to_row(
}
CursorType::Pseudo(_) => None,
CursorType::Sorter => None,
CursorType::IndexMethod(..) => None,
CursorType::VirtualTable(v) => v.columns.get(*column).unwrap().name.as_ref(),
};
(
@@ -1283,6 +1285,33 @@ pub fn insn_to_row(
0,
format!("r[{}]=root iDb={} flags={}", root, db, flags.get_flags()),
),
Insn::IndexMethodCreate { db, cursor_id } => (
"IndexMethodCreate",
*db as i32,
*cursor_id as i32,
0,
Value::build_text(""),
0,
"".to_string()
),
Insn::IndexMethodDestroy { db, cursor_id } => (
"IndexMethodDestroy",
*db as i32,
*cursor_id as i32,
0,
Value::build_text(""),
0,
"".to_string()
),
Insn::IndexMethodQuery { db, cursor_id, start_reg, .. } => (
"IndexMethodQuery",
*db as i32,
*cursor_id as i32,
*start_reg as i32,
Value::build_text(""),
0,
"".to_string()
),
Insn::Destroy {
root,
former_root_reg,

View File

@@ -879,6 +879,25 @@ pub enum Insn {
flags: CreateBTreeFlags,
},
/// Create custom index method (calls [crate::index_method::IndexMethodCursor::create] under the hood)
IndexMethodCreate {
db: usize,
cursor_id: CursorID,
},
/// Destroy custom index method (calls [crate::index_method::IndexMethodCursor::destroy] under the hood)
IndexMethodDestroy {
db: usize,
cursor_id: CursorID,
},
/// Query custom index method (call [crate::index_method::IndexMethodCursor::query_start] under the hood)
IndexMethodQuery {
db: usize,
cursor_id: CursorID,
start_reg: usize,
count_reg: usize,
pc_if_empty: BranchOffset,
},
/// Deletes an entire database table or index whose root page in the database file is given by P1.
Destroy {
/// The root page of the table/index to destroy
@@ -1318,6 +1337,9 @@ impl InsnVariants {
InsnVariants::OpenWrite => execute::op_open_write,
InsnVariants::Copy => execute::op_copy,
InsnVariants::CreateBtree => execute::op_create_btree,
InsnVariants::IndexMethodCreate => execute::op_index_method_create,
InsnVariants::IndexMethodDestroy => execute::op_index_method_destroy,
InsnVariants::IndexMethodQuery => execute::op_index_method_query,
InsnVariants::Destroy => execute::op_destroy,
InsnVariants::ResetSorter => execute::op_reset_sorter,
InsnVariants::DropTable => execute::op_drop_table,

View File

@@ -1,5 +1,8 @@
use std::collections::HashMap;
use core_tester::common::rng_from_time_or_env;
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha8Rng;
use turso_core::{
index_method::{
toy_vector_sparse_ivf::VectorSparseInvertedIndexMethod, IndexMethod,
@@ -154,10 +157,9 @@ fn test_vector_sparse_ivf_insert_query() {
Register::Value(sparse_vector(vector)),
Register::Value(Value::Integer(5)),
];
run(&tmp_db, || cursor.query_start(&values)).unwrap();
assert!(run(&tmp_db, || cursor.query_start(&values)).unwrap());
for (rowid, dist) in results {
assert!(run(&tmp_db, || cursor.query_next()).unwrap());
for (i, (rowid, dist)) in results.iter().enumerate() {
assert_eq!(
*rowid,
run(&tmp_db, || cursor.query_rowid()).unwrap().unwrap()
@@ -166,9 +168,11 @@ fn test_vector_sparse_ivf_insert_query() {
*dist,
run(&tmp_db, || cursor.query_column(0)).unwrap().as_float()
);
assert_eq!(
i + 1 < results.len(),
run(&tmp_db, || cursor.query_next()).unwrap()
);
}
assert!(!run(&tmp_db, || cursor.query_next()).unwrap());
}
}
@@ -231,8 +235,7 @@ fn test_vector_sparse_ivf_update() {
let mut reader = attached.init().unwrap();
run(&tmp_db, || reader.open_read(&conn)).unwrap();
run(&tmp_db, || reader.query_start(&query_values)).unwrap();
assert!(!run(&tmp_db, || reader.query_next()).unwrap());
assert!(!run(&tmp_db, || reader.query_start(&query_values)).unwrap());
limbo_exec_rows(
&tmp_db,
@@ -244,8 +247,7 @@ fn test_vector_sparse_ivf_update() {
let mut reader = attached.init().unwrap();
run(&tmp_db, || reader.open_read(&conn)).unwrap();
run(&tmp_db, || reader.query_start(&query_values)).unwrap();
assert!(run(&tmp_db, || reader.query_next()).unwrap());
assert!(run(&tmp_db, || reader.query_start(&query_values)).unwrap());
assert_eq!(1, run(&tmp_db, || reader.query_rowid()).unwrap().unwrap());
assert_eq!(
0.0,
@@ -253,3 +255,88 @@ fn test_vector_sparse_ivf_update() {
);
assert!(!run(&tmp_db, || reader.query_next()).unwrap());
}
#[test]
fn test_vector_sparse_ivf_fuzz() {
let _ = env_logger::try_init();
const DIMS: usize = 40;
const MOD: u32 = 5;
let (mut rng, _) = rng_from_time_or_env();
let mut keys = Vec::new();
for _ in 0..10 {
let seed = rng.next_u64();
tracing::info!("======== seed: {} ========", seed);
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let simple_db =
TempDatabase::new_with_rusqlite("CREATE TABLE t(key TEXT PRIMARY KEY, embedding)");
let index_db =
TempDatabase::new_with_rusqlite("CREATE TABLE t(key TEXT PRIMARY KEY, embedding)");
let simple_conn = simple_db.connect_limbo();
let index_conn = index_db.connect_limbo();
index_conn
.execute("CREATE INDEX t_idx ON t USING toy_vector_sparse_ivf (embedding)")
.unwrap();
let vector = |rng: &mut ChaCha8Rng| {
let mut values = Vec::with_capacity(DIMS);
for _ in 0..DIMS {
if rng.next_u32() % MOD == 0 {
values.push((rng.next_u32() as f32 / (u32::MAX as f32)).to_string());
} else {
values.push("0".to_string())
}
}
format!("[{}]", values.join(", "))
};
for _ in 0..200 {
let choice = rng.next_u32() % 4;
if choice == 0 {
let key = rng.next_u64().to_string();
let v = vector(&mut rng);
let sql = format!("INSERT INTO t VALUES ('{key}', vector32_sparse('{v}'))");
tracing::info!("{}", sql);
simple_conn.execute(&sql).unwrap();
index_conn.execute(sql).unwrap();
keys.push(key);
} else if choice == 1 && !keys.is_empty() {
let idx = rng.next_u32() as usize % keys.len();
let key = &keys[idx];
let v = vector(&mut rng);
let sql =
format!("UPDATE t SET embedding = vector32_sparse('{v}') WHERE key = '{key}'",);
tracing::info!("{}", sql);
simple_conn.execute(&sql).unwrap();
index_conn.execute(&sql).unwrap();
} else if choice == 2 && !keys.is_empty() {
let idx = rng.next_u32() as usize % keys.len();
let key = &keys[idx];
let sql = format!("DELETE FROM t WHERE key = '{key}'");
tracing::info!("{}", sql);
simple_conn.execute(&sql).unwrap();
index_conn.execute(&sql).unwrap();
keys.remove(idx);
} else {
let v = vector(&mut rng);
let k = rng.next_u32() % 20 + 1;
let sql = format!("SELECT key, vector_distance_jaccard(embedding, vector32_sparse('{v}')) as d FROM t ORDER BY d LIMIT {k}");
tracing::info!("{}", sql);
let simple_rows = limbo_exec_rows(&simple_db, &simple_conn, &sql);
let index_rows = limbo_exec_rows(&index_db, &index_conn, &sql);
assert!(index_rows.len() <= simple_rows.len());
for (a, b) in index_rows.iter().zip(simple_rows.iter()) {
assert_eq!(a, b);
}
for row in simple_rows.iter().skip(index_rows.len()) {
match row[1] {
rusqlite::types::Value::Real(r) => assert_eq!(r, 1.0),
_ => panic!("unexpected simple row value"),
}
}
tracing::info!("simple: {:?}, index_rows: {:?}", simple_rows, index_rows);
}
}
}
}