Merge 'Auto-create index in CREATE TABLE when necessary' from Jussi Saurio

Closes #448
Adds support for:
- Automatically creating index on the PRIMARY KEY if the pk is not a
rowid alias
- Parsing the automatically created index into memory, for use in
queries
    * `testing/testing_norowidalias.db` now uses the PK indexes and some
tests were failing -- looks like taking the index into use revealed some
bugs in our codegen :) I fixed those in later commits.
Does not add support for:
- Inserting to the index during writes to the table

Closes #588
This commit is contained in:
Pekka Enberg
2025-01-04 14:10:44 +02:00
6 changed files with 438 additions and 63 deletions

View File

@@ -508,11 +508,50 @@ impl Index {
_ => todo!("Expected create index statement"),
}
}
pub fn automatic_from_primary_key(
table: &BTreeTable,
index_name: &str,
root_page: usize,
) -> Result<Index> {
if table.primary_key_column_names.is_empty() {
return Err(crate::LimboError::InternalError(
"Cannot create automatic index for table without primary key".to_string(),
));
}
let index_columns = table
.primary_key_column_names
.iter()
.map(|col_name| {
// Verify that each primary key column exists in the table
if table.get_column(col_name).is_none() {
return Err(crate::LimboError::InternalError(format!(
"Primary key column {} not found in table {}",
col_name, table.name
)));
}
Ok(IndexColumn {
name: normalize_ident(col_name),
order: Order::Ascending, // Primary key indexes are always ascending
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Index {
name: normalize_ident(index_name),
table_name: table.name.clone(),
root_page,
columns: index_columns,
unique: true, // Primary key indexes are always unique
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::LimboError;
#[test]
pub fn test_has_rowid_true() -> Result<()> {
@@ -735,4 +774,78 @@ mod tests {
let actual = sqlite_schema_table().to_sql();
assert_eq!(expected, actual);
}
#[test]
fn test_automatic_index_single_column() -> Result<()> {
let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
let table = BTreeTable::from_sql(sql, 0)?;
let index = Index::automatic_from_primary_key(&table, "sqlite_autoindex_t1_1", 2)?;
assert_eq!(index.name, "sqlite_autoindex_t1_1");
assert_eq!(index.table_name, "t1");
assert_eq!(index.root_page, 2);
assert!(index.unique);
assert_eq!(index.columns.len(), 1);
assert_eq!(index.columns[0].name, "a");
assert!(matches!(index.columns[0].order, Order::Ascending));
Ok(())
}
#[test]
fn test_automatic_index_composite_key() -> Result<()> {
let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#;
let table = BTreeTable::from_sql(sql, 0)?;
let index = Index::automatic_from_primary_key(&table, "sqlite_autoindex_t1_1", 2)?;
assert_eq!(index.name, "sqlite_autoindex_t1_1");
assert_eq!(index.table_name, "t1");
assert_eq!(index.root_page, 2);
assert!(index.unique);
assert_eq!(index.columns.len(), 2);
assert_eq!(index.columns[0].name, "a");
assert_eq!(index.columns[1].name, "b");
assert!(matches!(index.columns[0].order, Order::Ascending));
assert!(matches!(index.columns[1].order, Order::Ascending));
Ok(())
}
#[test]
fn test_automatic_index_no_primary_key() -> Result<()> {
let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT);"#;
let table = BTreeTable::from_sql(sql, 0)?;
let result = Index::automatic_from_primary_key(&table, "sqlite_autoindex_t1_1", 2);
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
LimboError::InternalError(msg) if msg.contains("without primary key")
));
Ok(())
}
#[test]
fn test_automatic_index_nonexistent_column() -> Result<()> {
// Create a table with a primary key column that doesn't exist in the table
let table = BTreeTable {
root_page: 0,
name: "t1".to_string(),
has_rowid: true,
primary_key_column_names: vec!["nonexistent".to_string()],
columns: vec![Column {
name: "a".to_string(),
ty: Type::Integer,
primary_key: false,
is_rowid_alias: false,
}],
};
let result = Index::automatic_from_primary_key(&table, "sqlite_autoindex_t1_1", 2);
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
LimboError::InternalError(msg) if msg.contains("not found in table")
));
Ok(())
}
}

View File

@@ -1360,8 +1360,8 @@ fn close_loop(
iter_dir,
..
} => {
let cursor_id = program.resolve_cursor_id(&table_reference.table_identifier);
program.resolve_label(loop_labels.next, program.offset());
let cursor_id = program.resolve_cursor_id(&table_reference.table_identifier);
if iter_dir
.as_ref()
.is_some_and(|dir| *dir == IterationDirection::Backwards)

View File

@@ -20,8 +20,9 @@ use crate::schema::Schema;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE};
use crate::translate::delete::translate_delete;
use crate::util::PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX;
use crate::vdbe::{builder::ProgramBuilder, insn::Insn, Program};
use crate::{bail_parse_error, Connection, Result, SymbolTable};
use crate::{bail_parse_error, Connection, LimboError, Result, SymbolTable};
use insert::translate_insert;
use select::translate_select;
use sqlite3_parser::ast::fmt::ToTokens;
@@ -162,6 +163,213 @@ addr opcode p1 p2 p3 p4 p5 comment
31 Goto 0 1 0 0
*/
#[derive(Debug)]
enum SchemaEntryType {
Table,
Index,
}
impl SchemaEntryType {
fn as_str(&self) -> &'static str {
match self {
SchemaEntryType::Table => "table",
SchemaEntryType::Index => "index",
}
}
}
fn emit_schema_entry(
program: &mut ProgramBuilder,
sqlite_schema_cursor_id: usize,
entry_type: SchemaEntryType,
name: &str,
tbl_name: &str,
root_page_reg: usize,
sql: Option<String>,
) {
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: sqlite_schema_cursor_id,
rowid_reg,
prev_largest_reg: 0,
});
let type_reg = program.alloc_register();
program.emit_insn(Insn::String8 {
value: entry_type.as_str().to_string(),
dest: type_reg,
});
let name_reg = program.alloc_register();
program.emit_insn(Insn::String8 {
value: name.to_string(),
dest: name_reg,
});
let tbl_name_reg = program.alloc_register();
program.emit_insn(Insn::String8 {
value: tbl_name.to_string(),
dest: tbl_name_reg,
});
let rootpage_reg = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: root_page_reg,
dst_reg: rootpage_reg,
amount: 1,
});
let sql_reg = program.alloc_register();
if let Some(sql) = sql {
program.emit_insn(Insn::String8 {
value: sql,
dest: sql_reg,
});
} else {
program.emit_insn(Insn::Null {
dest: sql_reg,
dest_end: None,
});
}
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: type_reg,
count: 5,
dest_reg: record_reg,
});
program.emit_insn(Insn::InsertAsync {
cursor: sqlite_schema_cursor_id,
key_reg: rowid_reg,
record_reg,
flag: 0,
});
program.emit_insn(Insn::InsertAwait {
cursor_id: sqlite_schema_cursor_id,
});
}
/// Check if an automatic PRIMARY KEY index is required for the table.
/// If so, create a register for the index root page and return it.
///
/// An automatic PRIMARY KEY index is not required if:
/// - The table has no PRIMARY KEY
/// - The table has a single-column PRIMARY KEY whose typename is _exactly_ "INTEGER" e.g. not "INT".
/// In this case, the PRIMARY KEY column becomes an alias for the rowid.
///
/// Otherwise, an automatic PRIMARY KEY index is required.
fn check_automatic_pk_index_required(
body: &ast::CreateTableBody,
program: &mut ProgramBuilder,
tbl_name: &str,
) -> Result<Option<usize>> {
match body {
ast::CreateTableBody::ColumnsAndConstraints {
columns,
constraints,
options,
} => {
let mut primary_key_definition = None;
// Check table constraints for PRIMARY KEY
if let Some(constraints) = constraints {
for constraint in constraints {
if let ast::TableConstraint::PrimaryKey {
columns: pk_cols, ..
} = &constraint.constraint
{
let primary_key_column_results: Vec<Result<&String>> = pk_cols
.iter()
.map(|col| match &col.expr {
ast::Expr::Id(name) => Ok(&name.0),
_ => Err(LimboError::ParseError(
"expressions prohibited in PRIMARY KEY and UNIQUE constraints"
.to_string(),
)),
})
.collect();
for result in primary_key_column_results {
if let Err(e) = result {
crate::bail_parse_error!("{}", e);
}
let column_name = result.unwrap();
let column_def = columns.get(&ast::Name(column_name.clone()));
if column_def.is_none() {
crate::bail_parse_error!("No such column: {}", column_name);
}
if matches!(
primary_key_definition,
Some(PrimaryKeyDefinitionType::Simple { .. })
) {
primary_key_definition = Some(PrimaryKeyDefinitionType::Composite);
continue;
}
if primary_key_definition.is_none() {
let column_def = column_def.unwrap();
let typename =
column_def.col_type.as_ref().map(|t| t.name.as_str());
primary_key_definition =
Some(PrimaryKeyDefinitionType::Simple { typename });
}
}
}
}
}
// Check column constraints for PRIMARY KEY
for (_, col_def) in columns.iter() {
for constraint in &col_def.constraints {
if matches!(
constraint.constraint,
ast::ColumnConstraint::PrimaryKey { .. }
) {
if primary_key_definition.is_some() {
crate::bail_parse_error!(
"table {} has more than one primary key",
tbl_name
);
}
let typename = col_def.col_type.as_ref().map(|t| t.name.as_str());
primary_key_definition =
Some(PrimaryKeyDefinitionType::Simple { typename });
}
}
}
// Check if table has rowid
if options.contains(ast::TableOptions::WITHOUT_ROWID) {
crate::bail_parse_error!("WITHOUT ROWID tables are not supported yet");
}
// Check if we need an automatic index
let needs_auto_index = if let Some(primary_key_definition) = &primary_key_definition {
match primary_key_definition {
PrimaryKeyDefinitionType::Simple { typename } => {
let is_integer = typename.is_some() && typename.unwrap() == "INTEGER";
!is_integer
}
PrimaryKeyDefinitionType::Composite => true,
}
} else {
false
};
if needs_auto_index {
let index_root_reg = program.alloc_register();
Ok(Some(index_root_reg))
} else {
Ok(None)
}
}
ast::CreateTableBody::AsSelect(_) => {
crate::bail_parse_error!("CREATE TABLE AS SELECT not supported yet")
}
}
}
fn translate_create_table(
tbl_name: ast::QualifiedName,
body: ast::CreateTableBody,
@@ -211,12 +419,48 @@ fn translate_create_table(
// TODO: If
// TODO: SetCookie
// TODO: SetCookie
let root_reg = program.alloc_register();
// Create the table B-tree
let table_root_reg = program.alloc_register();
program.emit_insn(Insn::CreateBtree {
db: 0,
root: root_reg,
flags: 1,
root: table_root_reg,
flags: 1, // Table leaf page
});
// Create an automatic index B-tree if needed
//
// NOTE: we are deviating from SQLite bytecode here. For some reason, SQLite first creates a placeholder entry
// for the table in sqlite_schema, then writes the index to sqlite_schema, then UPDATEs the table placeholder entry
// in sqlite_schema with actual data.
//
// What we do instead is:
// 1. Create the table B-tree
// 2. Create the index B-tree
// 3. Add the table entry to sqlite_schema
// 4. Add the index entry to sqlite_schema
//
// I.e. we skip the weird song and dance with the placeholder entry. Unclear why sqlite does this.
// The sqlite code has this comment:
//
// "This just creates a place-holder record in the sqlite_schema table.
// The record created does not contain anything yet. It will be replaced
// by the real entry in code generated at sqlite3EndTable()."
//
// References:
// https://github.com/sqlite/sqlite/blob/95f6df5b8d55e67d1e34d2bff217305a2f21b1fb/src/build.c#L1355
// https://github.com/sqlite/sqlite/blob/95f6df5b8d55e67d1e34d2bff217305a2f21b1fb/src/build.c#L2856-L2871
// https://github.com/sqlite/sqlite/blob/95f6df5b8d55e67d1e34d2bff217305a2f21b1fb/src/build.c#L1334C5-L1336C65
let index_root_reg = check_automatic_pk_index_required(&body, &mut program, &tbl_name.name.0)?;
if let Some(index_root_reg) = index_root_reg {
program.emit_insn(Insn::CreateBtree {
db: 0,
root: index_root_reg,
flags: 2, // Index leaf page
});
}
let table_id = "sqlite_schema".to_string();
let table = schema.get_table(&table_id).unwrap();
let table = crate::schema::Table::BTree(table.clone());
@@ -227,60 +471,35 @@ fn translate_create_table(
root_page: 1,
});
program.emit_insn(Insn::OpenWriteAwait {});
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: sqlite_schema_cursor_id,
rowid_reg,
prev_largest_reg: 0,
});
let null_reg_1 = program.alloc_register();
let null_reg_2 = program.alloc_register();
program.emit_insn(Insn::Null {
dest: null_reg_1,
dest_end: Some(null_reg_2),
});
let type_reg = program.alloc_register();
program.emit_insn(Insn::String8 {
value: "table".to_string(),
dest: type_reg,
});
let name_reg = program.alloc_register();
program.emit_insn(Insn::String8 {
value: tbl_name.name.0.to_string(),
dest: name_reg,
});
let tbl_name_reg = program.alloc_register();
program.emit_insn(Insn::String8 {
value: tbl_name.name.0.to_string(),
dest: tbl_name_reg,
});
let rootpage_reg = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: root_reg,
dst_reg: rootpage_reg,
amount: 1,
});
let sql_reg = program.alloc_register();
program.emit_insn(Insn::String8 {
value: sql.to_string(),
dest: sql_reg,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: type_reg,
count: 5,
dest_reg: record_reg,
});
// TODO: Delete
program.emit_insn(Insn::InsertAsync {
cursor: sqlite_schema_cursor_id,
key_reg: rowid_reg,
record_reg,
flag: 0,
});
program.emit_insn(Insn::InsertAwait {
cursor_id: sqlite_schema_cursor_id,
});
// Add the table entry to sqlite_schema
emit_schema_entry(
&mut program,
sqlite_schema_cursor_id,
SchemaEntryType::Table,
&tbl_name.name.0,
&tbl_name.name.0,
table_root_reg,
Some(sql),
);
// If we need an automatic index, add its entry to sqlite_schema
if let Some(index_root_reg) = index_root_reg {
let index_name = format!(
"{}{}_1",
PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX, tbl_name.name.0
);
emit_schema_entry(
&mut program,
sqlite_schema_cursor_id,
SchemaEntryType::Index,
&index_name,
&tbl_name.name.0,
index_root_reg,
None,
);
}
program.resolve_label(parse_schema_label, program.offset());
// TODO: SetCookie
//
@@ -305,6 +524,11 @@ fn translate_create_table(
Ok(program.build(database_header, connection))
}
enum PrimaryKeyDefinitionType<'a> {
Simple { typename: Option<&'a str> },
Composite,
}
fn translate_pragma(
name: &ast::QualifiedName,
body: Option<ast::PragmaBody>,

View File

@@ -709,6 +709,23 @@ impl Optimizable for ast::Expr {
Ok(None)
}
Self::Binary(lhs, op, rhs) => {
// Only consider index scans for binary ops that are comparisons.
// e.g. "t1.id = t2.id" is a valid index scan, but "t1.id + 1" is not.
//
// TODO/optimization: consider detecting index scan on e.g. table t1 in
// "WHERE t1.id + 1 = t2.id"
// here the Expr could be rewritten to "t1.id = t2.id - 1"
// and then t1.id could be used as an index key.
if !matches!(
*op,
ast::Operator::Equals
| ast::Operator::Greater
| ast::Operator::GreaterEquals
| ast::Operator::Less
| ast::Operator::LessEquals
) {
return Ok(None);
}
let lhs_index =
lhs.check_index_scan(table_index, referenced_tables, available_indexes)?;
if lhs_index.is_some() {

View File

@@ -23,8 +23,11 @@ pub fn normalize_ident(identifier: &str) -> String {
.to_lowercase()
}
pub const PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX: &str = "sqlite_autoindex_";
pub fn parse_schema_rows(rows: Option<Rows>, schema: &mut Schema, io: Arc<dyn IO>) -> Result<()> {
if let Some(mut rows) = rows {
let mut automatic_indexes = Vec::new();
loop {
match rows.next_row()? {
StepResult::Row(row) => {
@@ -46,8 +49,19 @@ pub fn parse_schema_rows(rows: Option<Rows>, schema: &mut Schema, io: Arc<dyn IO
let index = schema::Index::from_sql(sql, root_page as usize)?;
schema.add_index(Rc::new(index));
}
_ => continue,
// TODO parse auto index structures
_ => {
// Automatic index on primary key, e.g.
// table|foo|foo|2|CREATE TABLE foo (a text PRIMARY KEY, b)
// index|sqlite_autoindex_foo_1|foo|3|
let index_name = row.get::<&str>(1)?;
let table_name = row.get::<&str>(2)?;
let root_page = row.get::<i64>(3)?;
automatic_indexes.push((
index_name.to_string(),
table_name.to_string(),
root_page,
));
}
}
}
_ => continue,
@@ -63,6 +77,13 @@ pub fn parse_schema_rows(rows: Option<Rows>, schema: &mut Schema, io: Arc<dyn IO
StepResult::Busy => break,
}
}
for (index_name, table_name, root_page) in automatic_indexes {
// We need to process these after all tables are loaded into memory due to the schema.get_table() call
let table = schema.get_table(&table_name).unwrap();
let index =
schema::Index::automatic_from_primary_key(&table, &index_name, root_page as usize)?;
schema.add_index(Rc::new(index));
}
}
Ok(())
}

View File

@@ -1843,7 +1843,7 @@ impl Program {
}
state.pc += 1;
}
Insn::CreateBtree { db, root, flags: _ } => {
Insn::CreateBtree { db, root, flags } => {
if *db > 0 {
// TODO: implement temp datbases
todo!("temp databases not implemented yet");
@@ -1854,7 +1854,7 @@ impl Program {
self.database_header.clone(),
));
let root_page = cursor.btree_create(1);
let root_page = cursor.btree_create(*flags);
state.registers[*root] = OwnedValue::Integer(root_page as i64);
state.pc += 1;
}