Files
turso/core/schema.rs

3394 lines
126 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use crate::function::Func;
use crate::incremental::view::IncrementalView;
use crate::index_method::{IndexMethodAttachment, IndexMethodConfiguration};
use crate::translate::expr::{bind_and_rewrite_expr, walk_expr, BindingBehavior, WalkControl};
use crate::translate::index::{resolve_index_method_parameters, resolve_sorted_columns};
use crate::translate::planner::ROWID_STRS;
use crate::vdbe::affinity::Affinity;
use parking_lot::RwLock;
use turso_macros::AtomicEnum;
/// Cycle-detection state for a [`View`]: tracks whether the view is
/// currently being expanded so circular view definitions can be detected.
#[derive(Debug, Clone, AtomicEnum)]
pub enum ViewState {
    // Not currently being expanded; safe to process.
    Ready,
    // Expansion has started; re-entering means the view references itself.
    InProgress,
}
/// Simple view structure for non-materialized views
#[derive(Debug)]
pub struct View {
    /// View name as stored in the schema.
    pub name: String,
    /// Original `CREATE VIEW` SQL text.
    pub sql: String,
    /// Parsed SELECT body of the view.
    pub select_stmt: ast::Select,
    /// Result columns the view exposes to queries.
    pub columns: Vec<Column>,
    /// Cycle-detection state; see [`ViewState`].
    pub state: AtomicViewState,
}
impl View {
    /// Build a view from its parsed parts, starting in the `Ready` state.
    fn new(name: String, sql: String, select_stmt: ast::Select, columns: Vec<Column>) -> Self {
        let state = AtomicViewState::new(ViewState::Ready);
        Self {
            name,
            sql,
            select_stmt,
            columns,
            state,
        }
    }

    /// Mark this view as being expanded.
    ///
    /// Errors with a parse error if the view is already being expanded,
    /// which indicates a circular view definition.
    pub fn process(&self) -> Result<()> {
        if matches!(self.state.get(), ViewState::InProgress) {
            bail_parse_error!("view {} is circularly defined", self.name)
        }
        self.state.set(ViewState::InProgress);
        Ok(())
    }

    /// Mark expansion of this view as finished; a no-op if it was not
    /// in progress.
    pub fn done(&self) {
        if matches!(self.state.get(), ViewState::InProgress) {
            self.state.set(ViewState::Ready);
        }
    }
}
impl Clone for View {
    /// Clone the view's definition but NOT its cycle-detection state:
    /// a fresh clone is never "in progress", so it always starts `Ready`.
    fn clone(&self) -> Self {
        Self {
            name: self.name.clone(),
            sql: self.sql.clone(),
            select_stmt: self.select_stmt.clone(),
            columns: self.columns.clone(),
            // Deliberately reset rather than copied.
            state: AtomicViewState::new(ViewState::Ready),
        }
    }
}
/// Type alias for regular views collection
pub type ViewsMap = HashMap<String, Arc<View>>;
/// Trigger structure
#[derive(Debug, Clone)]
pub struct Trigger {
    /// Trigger name as stored in the schema.
    pub name: String,
    /// Original `CREATE TRIGGER` SQL text.
    pub sql: String,
    /// Table the trigger is attached to.
    pub table_name: String,
    /// BEFORE / AFTER / INSTEAD OF.
    pub time: turso_parser::ast::TriggerTime,
    /// INSERT / UPDATE / DELETE event that fires the trigger.
    pub event: turso_parser::ast::TriggerEvent,
    pub for_each_row: bool,
    /// Optional WHEN predicate guarding execution.
    pub when_clause: Option<turso_parser::ast::Expr>,
    /// Statements executed in the trigger body.
    pub commands: Vec<turso_parser::ast::TriggerCmd>,
    pub temporary: bool,
}
impl Trigger {
    /// Assemble a trigger definition from its parsed components.
    ///
    /// A missing trigger time defaults to BEFORE, matching SQLite.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        name: String,
        sql: String,
        table_name: String,
        time: Option<turso_parser::ast::TriggerTime>,
        event: turso_parser::ast::TriggerEvent,
        for_each_row: bool,
        when_clause: Option<turso_parser::ast::Expr>,
        commands: Vec<turso_parser::ast::TriggerCmd>,
        temporary: bool,
    ) -> Self {
        let time = time.unwrap_or(turso_parser::ast::TriggerTime::Before);
        Self {
            name,
            sql,
            table_name,
            time,
            event,
            for_each_row,
            when_clause,
            commands,
            temporary,
        }
    }
}
use crate::storage::btree::{BTreeCursor, CursorTrait};
use crate::translate::collate::CollationSeq;
use crate::translate::plan::{SelectPlan, TableReferences};
use crate::util::{
module_args_from_sql, module_name_from_sql, type_from_name, IOExt, UnparsedFromSqlIndex,
};
use crate::{
bail_parse_error, contains_ignore_ascii_case, eq_ignore_ascii_case, match_ignore_ascii_case,
Connection, LimboError, MvCursor, MvStore, Pager, SymbolTable, ValueRef, VirtualTable,
};
use crate::{util::normalize_ident, Result};
use core::fmt;
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Mutex;
use tracing::trace;
use turso_parser::ast::{
self, ColumnDefinition, Expr, InitDeferredPred, Literal, RefAct, SortOrder, TableOptions,
};
use turso_parser::{
ast::{Cmd, CreateTableBody, ResultColumn, Stmt},
parser::Parser,
};
const SCHEMA_TABLE_NAME: &str = "sqlite_schema";
const SCHEMA_TABLE_NAME_ALT: &str = "sqlite_master";
pub const DBSP_TABLE_PREFIX: &str = "__turso_internal_dbsp_state_v";
/// Used to refer to the implicit rowid column in tables without an alias during UPDATE
pub const ROWID_SENTINEL: usize = usize::MAX;
/// Internal table prefixes that should be protected from CREATE/DROP
pub const RESERVED_TABLE_PREFIXES: [&str; 2] = ["sqlite_", "__turso_internal_"];
/// Check if a table name refers to a system table that should be protected from direct writes.
///
/// Matches `sqlite_schema` / `sqlite_master` case-insensitively (SQLite
/// identifier comparison is ASCII case-insensitive), and any table carrying
/// the internal DBSP state prefix (checked case-sensitively, as those names
/// are generated internally with a fixed case).
pub fn is_system_table(table_name: &str) -> bool {
    // eq_ignore_ascii_case avoids the per-call String allocation that
    // to_lowercase() incurred, and matches SQLite's ASCII-only folding.
    table_name.eq_ignore_ascii_case(SCHEMA_TABLE_NAME)
        || table_name.eq_ignore_ascii_case(SCHEMA_TABLE_NAME_ALT)
        || table_name.starts_with(DBSP_TABLE_PREFIX)
}
/// In-memory representation of the database schema: tables, views,
/// triggers, indexes, and materialized-view bookkeeping.
#[derive(Debug)]
pub struct Schema {
    /// All tables keyed by normalized name; includes btree tables, virtual
    /// tables, and the backing tables of materialized views.
    pub tables: HashMap<String, Arc<Table>>,
    /// Track which tables are actually materialized views
    pub materialized_view_names: HashSet<String>,
    /// Store original SQL for materialized views (for .schema command)
    pub materialized_view_sql: HashMap<String, String>,
    /// The incremental view objects (DBSP circuits)
    pub incremental_views: HashMap<String, Arc<Mutex<IncrementalView>>>,
    /// Regular (non-materialized) views keyed by normalized name.
    pub views: ViewsMap,
    /// table_name to list of triggers
    pub triggers: HashMap<String, VecDeque<Arc<Trigger>>>,
    /// table_name to list of indexes for the table
    pub indexes: HashMap<String, VecDeque<Arc<Index>>>,
    /// Tables known to have indexes, tracked even when index support is
    /// disabled (so existence checks still work).
    pub has_indexes: std::collections::HashSet<String>,
    /// Whether secondary-index support is enabled for this connection.
    pub indexes_enabled: bool,
    pub schema_version: u32,
    /// Mapping from table names to the materialized views that depend on them
    pub table_to_materialized_views: HashMap<String, Vec<String>>,
    /// Track views that exist but have incompatible versions
    pub incompatible_views: HashSet<String>,
}
impl Schema {
pub fn new(indexes_enabled: bool) -> Self {
let mut tables: HashMap<String, Arc<Table>> = HashMap::new();
let has_indexes = std::collections::HashSet::new();
let indexes: HashMap<String, VecDeque<Arc<Index>>> = HashMap::new();
#[allow(clippy::arc_with_non_send_sync)]
tables.insert(
SCHEMA_TABLE_NAME.to_string(),
Arc::new(Table::BTree(sqlite_schema_table().into())),
);
for function in VirtualTable::builtin_functions() {
tables.insert(
function.name.to_owned(),
Arc::new(Table::Virtual(Arc::new((*function).clone()))),
);
}
let materialized_view_names = HashSet::new();
let materialized_view_sql = HashMap::new();
let incremental_views = HashMap::new();
let views: ViewsMap = HashMap::new();
let triggers = HashMap::new();
let table_to_materialized_views: HashMap<String, Vec<String>> = HashMap::new();
let incompatible_views = HashSet::new();
Self {
tables,
materialized_view_names,
materialized_view_sql,
incremental_views,
views,
triggers,
indexes,
has_indexes,
indexes_enabled,
schema_version: 0,
table_to_materialized_views,
incompatible_views,
}
}
/// Returns true if no index anywhere in the schema is named `name`.
pub fn is_unique_idx_name(&self, name: &str) -> bool {
    // `.values()` instead of `.iter()` + tuple field access: we only care
    // about the per-table index lists, not the table-name keys.
    !self
        .indexes
        .values()
        .any(|indexes| indexes.iter().any(|idx| idx.name == name))
}
/// Register a materialized view: its backing table, its original SQL, and
/// the DBSP circuit that keeps it up to date.
pub fn add_materialized_view(&mut self, view: IncrementalView, table: Arc<Table>, sql: String) {
    let name = normalize_ident(view.name());
    // Materialized views appear as ordinary tables to the rest of the engine.
    self.materialized_view_sql.insert(name.clone(), sql);
    self.materialized_view_names.insert(name.clone());
    self.tables.insert(name.clone(), table);
    // The DBSP circuit itself is keyed under the same normalized name.
    self.incremental_views.insert(name, Arc::new(Mutex::new(view)));
}
/// Look up the DBSP circuit backing a materialized view, if any.
pub fn get_materialized_view(&self, name: &str) -> Option<Arc<Mutex<IncrementalView>>> {
    self.incremental_views.get(&normalize_ident(name)).cloned()
}
/// Check if DBSP state exists for `view_name` at the *current* circuit
/// version; the version is baked into the state table's name.
pub fn has_compatible_dbsp_state_table(&self, view_name: &str) -> bool {
    use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
    let expected = format!(
        "{}{}_{}",
        DBSP_TABLE_PREFIX,
        DBSP_CIRCUIT_VERSION,
        normalize_ident(view_name)
    );
    self.tables.contains_key(&expected)
}
/// True if `name` refers to a materialized view (rather than a plain table
/// or regular view).
pub fn is_materialized_view(&self, name: &str) -> bool {
    self.materialized_view_names.contains(&normalize_ident(name))
}
/// Return the names of materialized views that depend on `table_name` and
/// are marked incompatible (e.g. created with an older DBSP circuit version).
///
/// NOTE: despite the `has_` prefix this returns the offending view names,
/// not a bool, so callers can name them in error messages.
pub fn has_incompatible_dependent_views(&self, table_name: &str) -> Vec<String> {
    let table_name = normalize_ident(table_name);
    // Filter before cloning: the old code cloned the whole dependency list
    // (every String) and then discarded the compatible entries.
    self.table_to_materialized_views
        .get(&table_name)
        .map(|views| {
            views
                .iter()
                .filter(|view_name| self.incompatible_views.contains(*view_name))
                .cloned()
                .collect()
        })
        .unwrap_or_default()
}
/// Drop a view — regular or materialized — from the in-memory schema.
///
/// For a materialized view this also removes its backing table, its
/// versioned DBSP state table (and that table's indexes), all of its
/// tracking metadata, and any dependency edges pointing at it.
/// Errors with a parse error if no view by that name exists.
pub fn remove_view(&mut self, name: &str) -> Result<()> {
    let name = normalize_ident(name);
    if self.views.contains_key(&name) {
        self.views.remove(&name);
        Ok(())
    } else if self.materialized_view_names.contains(&name) {
        // Remove from tables
        self.tables.remove(&name);
        // Remove DBSP state table and its indexes from in-memory schema
        use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
        let dbsp_table_name = format!("{DBSP_TABLE_PREFIX}{DBSP_CIRCUIT_VERSION}_{name}");
        self.tables.remove(&dbsp_table_name);
        self.remove_indices_for_table(&dbsp_table_name);
        // Remove from materialized view tracking
        self.materialized_view_names.remove(&name);
        self.materialized_view_sql.remove(&name);
        self.incremental_views.remove(&name);
        // Remove from table_to_materialized_views dependencies
        for views in self.table_to_materialized_views.values_mut() {
            views.retain(|v| v != &name);
        }
        Ok(())
    } else {
        Err(crate::LimboError::ParseError(format!(
            "no such view: {name}"
        )))
    }
}
/// Register that a materialized view depends on a table; used to find
/// affected views when the table changes.
pub fn add_materialized_view_dependency(&mut self, table_name: &str, view_name: &str) {
    let key = normalize_ident(table_name);
    let dependents = self.table_to_materialized_views.entry(key).or_default();
    dependents.push(normalize_ident(view_name));
}
/// Get all materialized views that depend on a given table.
pub fn get_dependent_materialized_views(&self, table_name: &str) -> Vec<String> {
    // Fast path: skip identifier normalization when no views exist at all.
    if self.table_to_materialized_views.is_empty() {
        return Vec::new();
    }
    self.table_to_materialized_views
        .get(&normalize_ident(table_name))
        .cloned()
        .unwrap_or_default()
}
/// Add a regular (non-materialized) view, rejecting name collisions with
/// other schema objects.
pub fn add_view(&mut self, view: View) -> Result<()> {
    self.check_object_name_conflict(&view.name)?;
    let key = normalize_ident(&view.name);
    self.views.insert(key, Arc::new(view));
    Ok(())
}
/// Get a regular view by (normalized) name.
pub fn get_view(&self, name: &str) -> Option<Arc<View>> {
    self.views.get(&normalize_ident(name)).cloned()
}
/// Attach a trigger to `table_name`, rejecting name collisions.
///
/// The newest trigger goes at the head of the list; see [Schema::add_index]
/// for why definition order is tracked this way.
pub fn add_trigger(&mut self, trigger: Trigger, table_name: &str) -> Result<()> {
    self.check_object_name_conflict(&trigger.name)?;
    let list = self.triggers.entry(normalize_ident(table_name)).or_default();
    list.push_front(Arc::new(trigger));
    Ok(())
}
/// Drop the trigger named `name` (compared after identifier normalization)
/// from whichever table it is attached to.
///
/// Errors with a parse error if no such trigger exists anywhere.
pub fn remove_trigger(&mut self, name: &str) -> Result<()> {
    let name = normalize_ident(name);
    for triggers_list in self.triggers.values_mut() {
        // position() + early return replaces the old manual index loop with
        // its `removed` flag and double break.
        if let Some(pos) = triggers_list
            .iter()
            .position(|t| normalize_ident(&t.name) == name)
        {
            triggers_list.remove(pos);
            return Ok(());
        }
    }
    Err(crate::LimboError::ParseError(format!(
        "no such trigger: {name}"
    )))
}
/// Find a trigger by name among the triggers attached to `table_name`.
pub fn get_trigger_for_table(&self, table_name: &str, name: &str) -> Option<Arc<Trigger>> {
    let table_name = normalize_ident(table_name);
    let name = normalize_ident(name);
    let triggers = self.triggers.get(&table_name)?;
    // NOTE(review): the stored trigger name is compared un-normalized here
    // (unlike remove_trigger) — presumably names are stored already
    // normalized; confirm against the schema-loading path.
    triggers.iter().find(|t| t.name == name).cloned()
}
/// Iterate over all triggers attached to `table_name` (empty iterator if
/// the table has none).
pub fn get_triggers_for_table(
    &self,
    table_name: &str,
) -> impl Iterator<Item = &Arc<Trigger>> + Clone {
    let key = normalize_ident(table_name);
    match self.triggers.get(&key) {
        Some(triggers) => triggers.iter(),
        None => Default::default(),
    }
}
pub fn get_trigger(&self, name: &str) -> Option<Arc<Trigger>> {
let name = normalize_ident(name);
self.triggers
.values()
.flatten()
.find(|t| t.name == name)
.cloned()
}
pub fn add_btree_table(&mut self, table: Arc<BTreeTable>) -> Result<()> {
self.check_object_name_conflict(&table.name)?;
let name = normalize_ident(&table.name);
self.tables.insert(name, Table::BTree(table).into());
Ok(())
}
pub fn add_virtual_table(&mut self, table: Arc<VirtualTable>) -> Result<()> {
self.check_object_name_conflict(&table.name)?;
let name = normalize_ident(&table.name);
self.tables.insert(name, Table::Virtual(table).into());
Ok(())
}
/// Look up a table by name; `sqlite_master` is accepted as an alias for
/// `sqlite_schema`.
pub fn get_table(&self, name: &str) -> Option<Arc<Table>> {
    let normalized = normalize_ident(name);
    if normalized.eq_ignore_ascii_case(SCHEMA_TABLE_NAME_ALT) {
        return self.tables.get(SCHEMA_TABLE_NAME).cloned();
    }
    self.tables.get(&normalized).cloned()
}
/// Remove a table from the schema; if it was actually a materialized view,
/// drop its view metadata as well.
pub fn remove_table(&mut self, table_name: &str) {
    let name = normalize_ident(table_name);
    self.tables.remove(&name);
    let was_materialized = self.materialized_view_names.remove(&name);
    if was_materialized {
        self.incremental_views.remove(&name);
        self.materialized_view_sql.remove(&name);
    }
}
/// Look up a table by name and return it only if it is btree-backed.
pub fn get_btree_table(&self, name: &str) -> Option<Arc<BTreeTable>> {
    let key = normalize_ident(name);
    self.tables.get(&key).and_then(|table| table.btree())
}
/// Register an index for its table, rejecting name collisions.
///
/// SQLite stores index definitions as a linked list with the most recently
/// parsed index at the head, and that iteration order is observable: e.g.
/// evaluating ON CONFLICT DO UPDATE clauses in a different order can UPDATE
/// the wrong row (see the 'partial_index_mutation_and_upsert_fuzz' test).
/// Pushing to the front of the deque reproduces that ordering.
pub fn add_index(&mut self, index: Arc<Index>) -> Result<()> {
    self.check_object_name_conflict(&index.name)?;
    let table_name = normalize_ident(&index.table_name);
    self.indexes
        .entry(table_name)
        .or_default()
        // `index` is owned and unused afterwards — the old `index.clone()`
        // was a pointless extra Arc refcount bump.
        .push_front(index);
    Ok(())
}
/// Iterate over the user-visible indexes of `table_name`, hiding internal
/// backing-btree indexes.
pub fn get_indices(&self, table_name: &str) -> impl Iterator<Item = &Arc<Index>> {
    let key = normalize_ident(table_name);
    let iter = match self.indexes.get(&key) {
        Some(indexes) => indexes.iter(),
        None => Default::default(),
    };
    iter.filter(|idx| !idx.is_backing_btree_index())
}
/// Find a specific index of `table_name` by its exact stored name.
pub fn get_index(&self, table_name: &str, index_name: &str) -> Option<&Arc<Index>> {
    let key = normalize_ident(table_name);
    let indexes = self.indexes.get(&key)?;
    indexes.iter().find(|idx| idx.name == index_name)
}
/// Drop every index registered for `table_name`.
pub fn remove_indices_for_table(&mut self, table_name: &str) {
    self.indexes.remove(&normalize_ident(table_name));
}
/// Remove `idx` from its table's index list.
///
/// Panics if the table has no registered indexes at all — callers are
/// expected to only remove indexes they previously looked up.
pub fn remove_index(&mut self, idx: &Index) {
    let name = normalize_ident(&idx.table_name);
    self.indexes
        .get_mut(&name)
        .expect("Must have the index")
        // The predicate never mutates, so plain `retain` (not `retain_mut`)
        // states the intent; clippy flags the stronger form.
        .retain(|other_idx| other_idx.name != idx.name);
}
/// True if `table_name` is known to have at least one index (tracked even
/// when index support is disabled).
pub fn table_has_indexes(&self, table_name: &str) -> bool {
    self.has_indexes.contains(&normalize_ident(table_name))
}
pub fn table_set_has_index(&mut self, table_name: &str) {
self.has_indexes.insert(table_name.to_string());
}
/// Whether secondary-index support is enabled for this schema.
pub fn indexes_enabled(&self) -> bool {
    self.indexes_enabled
}
/// Update [Schema] by scanning the first root page (sqlite_schema)
///
/// Walks every row of the schema table, dispatching each to
/// `handle_schema_row`, then materializes indexes and materialized views in
/// two follow-up passes (they need information from multiple rows).
///
/// NOTE(review): error returns inside the scan loop (`?`) skip
/// `pager.end_read_tx()` — confirm the read transaction is cleaned up
/// elsewhere on the error path.
pub fn make_from_btree(
    &mut self,
    mv_cursor: Option<Arc<RwLock<MvCursor>>>,
    pager: Arc<Pager>,
    syms: &SymbolTable,
) -> Result<()> {
    assert!(
        mv_cursor.is_none(),
        "mvcc not yet supported for make_from_btree"
    );
    // Root page 1 always holds sqlite_schema.
    let mut cursor = BTreeCursor::new_table(Arc::clone(&pager), 1, 10);
    let mut from_sql_indexes = Vec::with_capacity(10);
    let mut automatic_indices: HashMap<String, Vec<(String, i64)>> = HashMap::with_capacity(10);
    // Store DBSP state table root pages: view_name -> dbsp_state_root_page
    let mut dbsp_state_roots: HashMap<String, i64> = HashMap::new();
    // Store DBSP state table index root pages: view_name -> dbsp_state_index_root_page
    let mut dbsp_state_index_roots: HashMap<String, i64> = HashMap::new();
    // Store materialized view info (SQL and root page) for later creation
    let mut materialized_view_info: HashMap<String, (String, i64)> = HashMap::new();
    pager.begin_read_tx()?;
    pager.io.block(|| cursor.rewind())?;
    loop {
        let Some(row) = pager.io.block(|| cursor.record())? else {
            break;
        };
        let mut record_cursor = cursor.record_cursor.borrow_mut();
        // sqlite schema table has 5 columns: type, name, tbl_name, rootpage, sql
        let ty_value = record_cursor.get_value(&row, 0)?;
        let ValueRef::Text(ty) = ty_value else {
            return Err(LimboError::ConversionError("Expected text value".into()));
        };
        let ValueRef::Text(name) = record_cursor.get_value(&row, 1)? else {
            return Err(LimboError::ConversionError("Expected text value".into()));
        };
        let table_name_value = record_cursor.get_value(&row, 2)?;
        let ValueRef::Text(table_name) = table_name_value else {
            return Err(LimboError::ConversionError("Expected text value".into()));
        };
        let root_page_value = record_cursor.get_value(&row, 3)?;
        let ValueRef::Integer(root_page) = root_page_value else {
            return Err(LimboError::ConversionError("Expected integer value".into()));
        };
        // sql is NULL for automatic indexes, so a non-text value is allowed.
        let sql_value = record_cursor.get_value(&row, 4)?;
        let sql_textref = match sql_value {
            ValueRef::Text(sql) => Some(sql),
            _ => None,
        };
        let sql = sql_textref.map(|s| s.as_str());
        self.handle_schema_row(
            &ty,
            &name,
            &table_name,
            root_page,
            sql,
            syms,
            &mut from_sql_indexes,
            &mut automatic_indices,
            &mut dbsp_state_roots,
            &mut dbsp_state_index_roots,
            &mut materialized_view_info,
            None,
        )?;
        // Release the borrows on the cursor before advancing it.
        drop(record_cursor);
        drop(row);
        pager.io.block(|| cursor.next())?;
    }
    pager.end_read_tx();
    // Second pass: indexes need their tables fully registered first.
    self.populate_indices(syms, from_sql_indexes, automatic_indices)?;
    // Third pass: materialized views need both their main root page and the
    // DBSP state root collected above.
    self.populate_materialized_views(
        materialized_view_info,
        dbsp_state_roots,
        dbsp_state_index_roots,
    )?;
    Ok(())
}
/// Populate indices parsed from the schema.
/// from_sql_indexes: indices explicitly created with CREATE INDEX
/// automatic_indices: indices created automatically for primary key and unique constraints
pub fn populate_indices(
    &mut self,
    syms: &SymbolTable,
    from_sql_indexes: Vec<UnparsedFromSqlIndex>,
    automatic_indices: std::collections::HashMap<String, Vec<(String, i64)>>,
) -> Result<()> {
    for unparsed_sql_from_index in from_sql_indexes {
        if !self.indexes_enabled() {
            // Index support disabled: only remember that the table has one.
            self.table_set_has_index(&unparsed_sql_from_index.table_name);
        } else {
            let table = self
                .get_btree_table(&unparsed_sql_from_index.table_name)
                .unwrap();
            let index = Index::from_sql(
                syms,
                &unparsed_sql_from_index.sql,
                unparsed_sql_from_index.root_page,
                table.as_ref(),
            )?;
            self.add_index(Arc::new(index))?;
        }
    }
    for automatic_index in automatic_indices {
        if !self.indexes_enabled() {
            self.table_set_has_index(&automatic_index.0);
            continue;
        }
        // Autoindexes must be parsed in definition order.
        // The SQL statement parser enforces that the column definitions come first, and compounds are defined after that,
        // e.g. CREATE TABLE t (a, b, UNIQUE(a, b)), and you can't do something like CREATE TABLE t (a, b, UNIQUE(a, b), c);
        // Hence, we can process the singles first (unique_set.columns.len() == 1), and then the compounds (unique_set.columns.len() > 1).
        let table = self.get_btree_table(&automatic_index.0).unwrap();
        let mut automatic_indexes = automatic_index.1;
        automatic_indexes.reverse(); // reverse so we can pop() without shifting array elements, while still processing in left-to-right order
        let mut pk_index_added = false;
        // Pass 1: single-column unique sets (including a single-column PK).
        for unique_set in table.unique_sets.iter().filter(|us| us.columns.len() == 1) {
            let col_name = &unique_set.columns.first().unwrap().0;
            let Some((pos_in_table, column)) = table.get_column(col_name) else {
                return Err(LimboError::ParseError(format!(
                    "Column {col_name} not found in table {}",
                    table.name
                )));
            };
            if column.primary_key() && unique_set.is_primary_key {
                if column.is_rowid_alias() {
                    // rowid alias, no index needed
                    continue;
                }
                assert!(table.primary_key_columns.first().unwrap().0 == *col_name, "trying to add a primary key index for column that is not the first column in the primary key: {} != {}", table.primary_key_columns.first().unwrap().0, col_name);
                // Add single column primary key index
                assert!(
                    !pk_index_added,
                    "trying to add a second primary key index for table {}",
                    table.name
                );
                pk_index_added = true;
                self.add_index(Arc::new(Index::automatic_from_primary_key(
                    table.as_ref(),
                    automatic_indexes.pop().unwrap(),
                    1,
                )?))?;
            } else {
                // Add single column unique index
                if let Some(autoidx) = automatic_indexes.pop() {
                    self.add_index(Arc::new(Index::automatic_from_unique(
                        table.as_ref(),
                        autoidx,
                        vec![(pos_in_table, unique_set.columns.first().unwrap().1)],
                    )?))?;
                }
            }
        }
        // Pass 2: compound unique sets / compound primary key.
        for unique_set in table.unique_sets.iter().filter(|us| us.columns.len() > 1) {
            if unique_set.is_primary_key {
                assert!(table.primary_key_columns.len() == unique_set.columns.len(), "trying to add a {}-column primary key index for table {}, but the table has {} primary key columns", unique_set.columns.len(), table.name, table.primary_key_columns.len());
                // Add composite primary key index
                assert!(
                    !pk_index_added,
                    "trying to add a second primary key index for table {}",
                    table.name
                );
                pk_index_added = true;
                self.add_index(Arc::new(Index::automatic_from_primary_key(
                    table.as_ref(),
                    automatic_indexes.pop().unwrap(),
                    unique_set.columns.len(),
                )?))?;
            } else {
                // Add composite unique index
                let mut column_indices_and_sort_orders =
                    Vec::with_capacity(unique_set.columns.len());
                for (col_name, sort_order) in unique_set.columns.iter() {
                    let Some((pos_in_table, _)) = table.get_column(col_name) else {
                        return Err(crate::LimboError::ParseError(format!(
                            "Column {} not found in table {}",
                            col_name, table.name
                        )));
                    };
                    column_indices_and_sort_orders.push((pos_in_table, *sort_order));
                }
                self.add_index(Arc::new(Index::automatic_from_unique(
                    table.as_ref(),
                    automatic_indexes.pop().unwrap(),
                    column_indices_and_sort_orders,
                )?))?;
            }
        }
        assert!(automatic_indexes.is_empty(), "all automatic indexes parsed from sqlite_schema should have been consumed, but {} remain", automatic_indexes.len());
    }
    Ok(())
}
/// Populate materialized views parsed from the schema.
///
/// For each view collected during the schema scan, pairs its CREATE SQL and
/// main root page with the DBSP state roots found earlier. A view whose
/// state table is missing or has the wrong circuit version is recorded in
/// `incompatible_views` and not added to the schema, but its table
/// dependencies are still registered so DROP/ALTER can report it.
pub fn populate_materialized_views(
    &mut self,
    materialized_view_info: std::collections::HashMap<String, (String, i64)>,
    dbsp_state_roots: std::collections::HashMap<String, i64>,
    dbsp_state_index_roots: std::collections::HashMap<String, i64>,
) -> Result<()> {
    for (view_name, (sql, main_root)) in materialized_view_info {
        // Look up the DBSP state root for this view
        // If missing, it means version mismatch - skip this view
        // Check if we have a compatible DBSP state root
        let dbsp_state_root = if let Some(&root) = dbsp_state_roots.get(&view_name) {
            root
        } else {
            tracing::warn!(
                "Materialized view '{}' has incompatible version or missing DBSP state table",
                view_name
            );
            // Track this as an incompatible view
            self.incompatible_views.insert(view_name.clone());
            // Use a dummy root page - the view won't be usable anyway
            0
        };
        // Look up the DBSP state index root (may not exist for older schemas)
        let dbsp_state_index_root =
            dbsp_state_index_roots.get(&view_name).copied().unwrap_or(0);
        // Create the IncrementalView with all root pages
        let incremental_view = IncrementalView::from_sql(
            &sql,
            self,
            main_root,
            dbsp_state_root,
            dbsp_state_index_root,
        )?;
        let referenced_tables = incremental_view.get_referenced_table_names();
        // Create a BTreeTable for the materialized view
        let table = Arc::new(Table::BTree(Arc::new(BTreeTable {
            name: view_name.clone(),
            root_page: main_root,
            columns: incremental_view.column_schema.flat_columns(),
            primary_key_columns: Vec::new(),
            has_rowid: true,
            is_strict: false,
            has_autoincrement: false,
            foreign_keys: vec![],
            unique_sets: vec![],
        })));
        // Only add to schema if compatible
        if !self.incompatible_views.contains(&view_name) {
            self.add_materialized_view(incremental_view, table, sql);
        }
        // Register dependencies regardless of compatibility
        for table_name in referenced_tables {
            self.add_materialized_view_dependency(&table_name, &view_name);
        }
    }
    Ok(())
}
/// Process a single row of `sqlite_schema` / `sqlite_master`.
///
/// Tables and triggers are added to the schema immediately; indexes and
/// materialized views are only *collected* into the out-parameters
/// (`from_sql_indexes`, `automatic_indices`, `dbsp_state_roots`,
/// `dbsp_state_index_roots`, `materialized_view_info`) because they require
/// information from other rows — the caller runs `populate_indices` and
/// `populate_materialized_views` afterwards. Rows of unknown type are
/// silently ignored.
#[allow(clippy::too_many_arguments)]
pub fn handle_schema_row(
    &mut self,
    ty: &str,
    name: &str,
    table_name: &str,
    root_page: i64,
    maybe_sql: Option<&str>,
    syms: &SymbolTable,
    from_sql_indexes: &mut Vec<UnparsedFromSqlIndex>,
    automatic_indices: &mut std::collections::HashMap<String, Vec<(String, i64)>>,
    dbsp_state_roots: &mut std::collections::HashMap<String, i64>,
    dbsp_state_index_roots: &mut std::collections::HashMap<String, i64>,
    materialized_view_info: &mut std::collections::HashMap<String, (String, i64)>,
    mv_store: Option<&Arc<MvStore>>,
) -> Result<()> {
    match ty {
        "table" => {
            let sql = maybe_sql.expect("sql should be present for table");
            let sql_bytes = sql.as_bytes();
            // Virtual tables have root_page == 0 in sqlite_schema.
            if root_page == 0 && contains_ignore_ascii_case!(sql_bytes, b"create virtual") {
                // a virtual table is found in the sqlite_schema, but it's no
                // longer in the in-memory schema. We need to recreate it if
                // the module is loaded in the symbol table.
                let vtab = if let Some(vtab) = syms.vtabs.get(name) {
                    vtab.clone()
                } else {
                    let mod_name = module_name_from_sql(sql)?;
                    crate::VirtualTable::table(
                        Some(name),
                        mod_name,
                        module_args_from_sql(sql)?,
                        syms,
                    )?
                };
                self.add_virtual_table(vtab)?;
            } else {
                let table = BTreeTable::from_sql(sql, root_page)?;
                // Check if this is a DBSP state table
                if table.name.starts_with(DBSP_TABLE_PREFIX) {
                    // Extract version and view name from __turso_internal_dbsp_state_v<version>_<viewname>
                    let suffix = table.name.strip_prefix(DBSP_TABLE_PREFIX).unwrap();
                    // Parse version and view name (format: "<version>_<viewname>")
                    if let Some(underscore_pos) = suffix.find('_') {
                        let version_str = &suffix[..underscore_pos];
                        let view_name = &suffix[underscore_pos + 1..];
                        // Check version compatibility
                        if let Ok(stored_version) = version_str.parse::<u32>() {
                            use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
                            if stored_version == DBSP_CIRCUIT_VERSION {
                                // Version matches, store the root page
                                dbsp_state_roots.insert(view_name.to_string(), root_page);
                            } else {
                                // Version mismatch - DO NOT insert into dbsp_state_roots
                                // This will cause populate_materialized_views to skip this view
                                tracing::warn!(
                                    "Skipping materialized view '{}' - has version {} but current version is {}. DROP and recreate the view to use it.",
                                    view_name, stored_version, DBSP_CIRCUIT_VERSION
                                );
                                // We can't track incompatible views here since we're in handle_schema_row
                                // which doesn't have mutable access to self
                            }
                        }
                    }
                }
                self.add_btree_table(Arc::new(table))?;
            }
        }
        "index" => {
            assert!(mv_store.is_none(), "indexes not yet supported for mvcc");
            match maybe_sql {
                Some(sql) => {
                    // Explicit CREATE INDEX: defer parsing until all tables exist.
                    from_sql_indexes.push(UnparsedFromSqlIndex {
                        table_name: table_name.to_string(),
                        root_page,
                        sql: sql.to_string(),
                    });
                }
                None => {
                    // Automatic index on primary key and/or unique constraint, e.g.
                    // table|foo|foo|2|CREATE TABLE foo (a text PRIMARY KEY, b)
                    // index|sqlite_autoindex_foo_1|foo|3|
                    let index_name = name.to_string();
                    let table_name = table_name.to_string();
                    // Check if this is an index for a DBSP state table
                    if table_name.starts_with(DBSP_TABLE_PREFIX) {
                        // Extract version and view name from __turso_internal_dbsp_state_v<version>_<viewname>
                        let suffix = table_name.strip_prefix(DBSP_TABLE_PREFIX).unwrap();
                        // Parse version and view name (format: "<version>_<viewname>")
                        if let Some(underscore_pos) = suffix.find('_') {
                            let version_str = &suffix[..underscore_pos];
                            let view_name = &suffix[underscore_pos + 1..];
                            // Only store index root if version matches
                            if let Ok(stored_version) = version_str.parse::<u32>() {
                                use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
                                if stored_version == DBSP_CIRCUIT_VERSION {
                                    dbsp_state_index_roots
                                        .insert(view_name.to_string(), root_page);
                                }
                            }
                        }
                    } else {
                        match automatic_indices.entry(table_name) {
                            std::collections::hash_map::Entry::Vacant(e) => {
                                e.insert(vec![(index_name, root_page)]);
                            }
                            std::collections::hash_map::Entry::Occupied(mut e) => {
                                e.get_mut().push((index_name, root_page));
                            }
                        }
                    }
                }
            }
        }
        "view" => {
            use crate::schema::View;
            use turso_parser::ast::{Cmd, Stmt};
            use turso_parser::parser::Parser;
            let sql = maybe_sql.expect("sql should be present for view");
            let view_name = name.to_string();
            assert!(mv_store.is_none(), "views not yet supported for mvcc");
            // Parse the SQL to determine if it's a regular or materialized view
            let mut parser = Parser::new(sql.as_bytes());
            if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
                match stmt {
                    Stmt::CreateMaterializedView { .. } => {
                        // Store materialized view info for later creation
                        // We'll handle reuse logic and create the actual IncrementalView
                        // in a later pass when we have both the main root page and DBSP state root
                        materialized_view_info
                            .insert(view_name.clone(), (sql.to_string(), root_page));
                        // Mark the existing view for potential reuse
                        if self.incremental_views.contains_key(&view_name) {
                            // We'll check for reuse in the third pass
                        }
                    }
                    Stmt::CreateView {
                        view_name: _,
                        columns: column_names,
                        select,
                        ..
                    } => {
                        // Extract actual columns from the SELECT statement
                        let view_column_schema =
                            crate::util::extract_view_columns(&select, self)?;
                        // If column names were provided in CREATE VIEW (col1, col2, ...),
                        // use them to rename the columns
                        let mut final_columns = view_column_schema.flat_columns();
                        for (i, indexed_col) in column_names.iter().enumerate() {
                            if let Some(col) = final_columns.get_mut(i) {
                                col.name = Some(indexed_col.col_name.to_string());
                            }
                        }
                        // Create regular view
                        let view =
                            View::new(name.to_string(), sql.to_string(), select, final_columns);
                        self.add_view(view)?;
                    }
                    _ => {}
                }
            }
        }
        "trigger" => {
            use turso_parser::ast::{Cmd, Stmt};
            use turso_parser::parser::Parser;
            let sql = maybe_sql.expect("sql should be present for trigger");
            let trigger_name = name.to_string();
            let mut parser = Parser::new(sql.as_bytes());
            let Ok(Some(Cmd::Stmt(Stmt::CreateTrigger {
                temporary,
                if_not_exists: _,
                trigger_name: _,
                time,
                event,
                tbl_name,
                for_each_row,
                when_clause,
                commands,
            }))) = parser.next_cmd()
            else {
                return Err(crate::LimboError::ParseError(format!(
                    "invalid trigger sql: {sql}"
                )));
            };
            self.add_trigger(
                Trigger::new(
                    trigger_name.clone(),
                    sql.to_string(),
                    tbl_name.name.to_string(),
                    time,
                    event,
                    for_each_row,
                    when_clause.map(|e| *e),
                    commands,
                    temporary,
                ),
                tbl_name.name.as_str(),
            )?;
        }
        _ => {}
    };
    Ok(())
}
/// Compute all resolved FKs *referencing* `table_name` (arg: `table_name` is the parent).
/// Each item contains the child table, normalized columns/positions, and the parent lookup
/// strategy (rowid vs. UNIQUE index or PK).
pub fn resolved_fks_referencing(&self, table_name: &str) -> Result<Vec<ResolvedFkRef>> {
    // SQLite-compatible error for any unresolvable FK shape.
    let fk_mismatch_err = |child: &str, parent: &str| -> crate::LimboError {
        crate::LimboError::Constraint(format!(
            "foreign key mismatch - \"{child}\" referencing \"{parent}\""
        ))
    };
    let target = normalize_ident(table_name);
    let mut out = Vec::with_capacity(4); // arbitrary estimate
    let parent_tbl = self
        .get_btree_table(&target)
        .ok_or_else(|| fk_mismatch_err("<unknown>", &target))?;
    // Precompute helper to find parent unique index, if it's not the rowid
    let find_parent_unique = |cols: &Vec<String>| -> Option<Arc<Index>> {
        self.get_indices(&parent_tbl.name)
            .find(|idx| {
                idx.unique
                    && idx.columns.len() == cols.len()
                    && idx
                        .columns
                        .iter()
                        .zip(cols.iter())
                        .all(|(ic, pc)| ic.name.eq_ignore_ascii_case(pc))
            })
            .cloned()
    };
    // Scan every btree table for FKs whose parent is `target`.
    for t in self.tables.values() {
        let Some(child) = t.btree() else {
            continue;
        };
        for fk in &child.foreign_keys {
            if !fk.parent_table.eq_ignore_ascii_case(&target) {
                continue;
            }
            if fk.child_columns.is_empty() {
                // SQLite requires an explicit child column list unless the table has a single-column PK that
                return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
            }
            let child_cols: Vec<String> = fk.child_columns.clone();
            // Map child column names to their positions in the child table.
            let mut child_pos = Vec::with_capacity(child_cols.len());
            for cname in &child_cols {
                let (i, _) = child
                    .get_column(cname)
                    .ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?;
                child_pos.push(i);
            }
            // Absent parent columns default to the parent's primary key.
            let parent_cols: Vec<String> = if fk.parent_columns.is_empty() {
                if !parent_tbl.primary_key_columns.is_empty() {
                    parent_tbl
                        .primary_key_columns
                        .iter()
                        .map(|(col, _)| col)
                        .cloned()
                        .collect()
                } else {
                    return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                }
            } else {
                fk.parent_columns.clone()
            };
            // Same length required
            if parent_cols.len() != child_cols.len() {
                return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
            }
            // Map parent columns to positions; "rowid"-style names resolve to 0.
            let mut parent_pos = Vec::with_capacity(parent_cols.len());
            for pc in &parent_cols {
                let pos = parent_tbl.get_column(pc).map(|(i, _)| i).or_else(|| {
                    ROWID_STRS
                        .iter()
                        .any(|s| pc.eq_ignore_ascii_case(s))
                        .then_some(0)
                });
                let Some(p) = pos else {
                    return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                };
                parent_pos.push(p);
            }
            // Determine if parent key is ROWID/alias
            // NOTE(review): the inner `if len == 1` is redundant — the outer
            // `.len().eq(&1)` already guarantees it.
            let parent_uses_rowid = parent_tbl.primary_key_columns.len().eq(&1) && {
                if parent_tbl.primary_key_columns.len() == 1 {
                    let pk_name = &parent_tbl.primary_key_columns[0].0;
                    // rowid or alias INTEGER PRIMARY KEY; either is ok implicitly
                    parent_tbl.columns.iter().any(|c| {
                        c.is_rowid_alias()
                            && c.name
                                .as_deref()
                                .is_some_and(|n| n.eq_ignore_ascii_case(pk_name))
                    }) || ROWID_STRS.iter().any(|&r| r.eq_ignore_ascii_case(pk_name))
                } else {
                    false
                }
            };
            // If not rowid, there must be a non-partial UNIQUE exactly on parent_cols
            let parent_unique_index = if parent_uses_rowid {
                None
            } else {
                find_parent_unique(&parent_cols)
            };
            fk.validate()?;
            out.push(ResolvedFkRef {
                child_table: Arc::clone(&child),
                fk: Arc::clone(fk),
                child_cols,
                child_pos,
                parent_pos,
                parent_uses_rowid,
                parent_unique_index,
            });
        }
    }
    Ok(out)
}
    /// Compute all resolved FKs *declared by* `child_table`.
    ///
    /// For every FOREIGN KEY on the child table this resolves column names to
    /// positions in both tables and determines how the parent key is looked up
    /// (rowid/rowid-alias vs. a non-partial UNIQUE index). Unlike
    /// `resolved_fks_referencing`, a missing parent UNIQUE index here is a hard
    /// error ("foreign key mismatch"), matching SQLite's behavior when the
    /// child table's constraints are checked.
    pub fn resolved_fks_for_child(&self, child_table: &str) -> crate::Result<Vec<ResolvedFkRef>> {
        let fk_mismatch_err = |child: &str, parent: &str| -> crate::LimboError {
            crate::LimboError::Constraint(format!(
                "foreign key mismatch - \"{child}\" referencing \"{parent}\""
            ))
        };
        let child_name = normalize_ident(child_table);
        let child = self
            .get_btree_table(&child_name)
            .ok_or_else(|| fk_mismatch_err(&child_name, "<unknown>"))?;
        let mut out = Vec::with_capacity(child.foreign_keys.len());
        for fk in &child.foreign_keys {
            let parent_name = normalize_ident(&fk.parent_table);
            let parent_tbl = self
                .get_btree_table(&parent_name)
                .ok_or_else(|| fk_mismatch_err(&child.name, &parent_name))?;
            let child_cols: Vec<String> = fk.child_columns.clone();
            if child_cols.is_empty() {
                return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
            }
            // Child positions exist
            let mut child_pos = Vec::with_capacity(child_cols.len());
            for cname in &child_cols {
                let (i, _) = child
                    .get_column(cname)
                    .ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?;
                child_pos.push(i);
            }
            // No explicit parent column list means the FK references the parent's PK.
            let parent_cols: Vec<String> = if fk.parent_columns.is_empty() {
                if !parent_tbl.primary_key_columns.is_empty() {
                    parent_tbl
                        .primary_key_columns
                        .iter()
                        .map(|(col, _)| col)
                        .cloned()
                        .collect()
                } else {
                    return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                }
            } else {
                fk.parent_columns.clone()
            };
            // Child and parent key must have the same arity.
            if parent_cols.len() != child_cols.len() {
                return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
            }
            // Parent positions exist, or rowid sentinel
            let mut parent_pos = Vec::with_capacity(parent_cols.len());
            for pc in &parent_cols {
                // Position 0 doubles as a sentinel for "rowid"/"oid"/"_rowid_" spellings.
                let pos = parent_tbl.get_column(pc).map(|(i, _)| i).or_else(|| {
                    ROWID_STRS
                        .iter()
                        .any(|&r| r.eq_ignore_ascii_case(pc))
                        .then_some(0)
                });
                let Some(p) = pos else {
                    return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                };
                parent_pos.push(p);
            }
            // Single-column key naming the rowid (or a rowid-alias column) needs no index.
            let parent_uses_rowid = parent_cols.len().eq(&1) && {
                let c = parent_cols[0].as_str();
                ROWID_STRS.iter().any(|&r| r.eq_ignore_ascii_case(c))
                    || parent_tbl.columns.iter().any(|col| {
                        col.is_rowid_alias()
                            && col
                                .name
                                .as_deref()
                                .is_some_and(|n| n.eq_ignore_ascii_case(c))
                    })
            };
            // Must be PK or a non-partial UNIQUE on exactly those columns.
            let parent_unique_index = if parent_uses_rowid {
                None
            } else {
                self.get_indices(&parent_tbl.name)
                    .find(|idx| {
                        idx.unique
                            && idx.where_clause.is_none()
                            && idx.columns.len() == parent_cols.len()
                            && idx
                                .columns
                                .iter()
                                .zip(parent_cols.iter())
                                .all(|(ic, pc)| ic.name.eq_ignore_ascii_case(pc))
                    })
                    .cloned()
                    .ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?
                    .into()
            };
            fk.validate()?;
            out.push(ResolvedFkRef {
                child_table: Arc::clone(&child),
                fk: Arc::clone(fk),
                child_cols,
                child_pos,
                parent_pos,
                parent_uses_rowid,
                parent_unique_index,
            });
        }
        Ok(out)
    }
/// Returns if any table declares a FOREIGN KEY whose parent is `table_name`.
pub fn any_resolved_fks_referencing(&self, table_name: &str) -> bool {
self.tables.values().any(|t| {
let Some(bt) = t.btree() else {
return false;
};
bt.foreign_keys
.iter()
.any(|fk| fk.parent_table == table_name)
})
}
/// Returns true if `table_name` declares any FOREIGN KEYs
pub fn has_child_fks(&self, table_name: &str) -> bool {
self.get_table(table_name)
.and_then(|t| t.btree())
.is_some_and(|t| !t.foreign_keys.is_empty())
}
fn check_object_name_conflict(&self, name: &str) -> Result<()> {
let normalized_name = normalize_ident(name);
if self.tables.contains_key(&normalized_name) {
return Err(crate::LimboError::ParseError(
["table \"", name, "\" already exists"].concat().to_string(),
));
}
if self.views.contains_key(&normalized_name) {
return Err(crate::LimboError::ParseError(
["view \"", name, "\" already exists"].concat().to_string(),
));
}
for index_list in self.indexes.values() {
if index_list.iter().any(|i| i.name.eq_ignore_ascii_case(name)) {
return Err(crate::LimboError::ParseError(
["index \"", name, "\" already exists"].concat().to_string(),
));
}
}
Ok(())
}
}
impl Clone for Schema {
/// Cloning a `Schema` requires deep cloning of all internal tables and indexes, even though they are wrapped in `Arc`.
/// Simply copying the `Arc` pointers would result in multiple `Schema` instances sharing the same underlying tables and indexes,
/// which could lead to panics or data races if any instance attempts to modify them.
/// To ensure each `Schema` is independent and safe to modify, we clone the underlying data for all tables and indexes.
fn clone(&self) -> Self {
let tables = self
.tables
.iter()
.map(|(name, table)| match table.deref() {
Table::BTree(table) => {
let table = Arc::deref(table);
(
name.clone(),
Arc::new(Table::BTree(Arc::new(table.clone()))),
)
}
Table::Virtual(table) => {
let table = Arc::deref(table);
(
name.clone(),
Arc::new(Table::Virtual(Arc::new(table.clone()))),
)
}
Table::FromClauseSubquery(from_clause_subquery) => (
name.clone(),
Arc::new(Table::FromClauseSubquery(from_clause_subquery.clone())),
),
})
.collect();
let indexes = self
.indexes
.iter()
.map(|(name, indexes)| {
let indexes = indexes
.iter()
.map(|index| Arc::new((**index).clone()))
.collect();
(name.clone(), indexes)
})
.collect();
let materialized_view_names = self.materialized_view_names.clone();
let materialized_view_sql = self.materialized_view_sql.clone();
let incremental_views = self
.incremental_views
.iter()
.map(|(name, view)| (name.clone(), view.clone()))
.collect();
let views = self
.views
.iter()
.map(|(name, view)| (name.clone(), Arc::new((**view).clone())))
.collect();
let triggers = self
.triggers
.iter()
.map(|(table_name, triggers)| {
(
table_name.clone(),
triggers.iter().map(|t| Arc::new((**t).clone())).collect(),
)
})
.collect();
let incompatible_views = self.incompatible_views.clone();
Self {
tables,
materialized_view_names,
materialized_view_sql,
incremental_views,
views,
triggers,
indexes,
has_indexes: self.has_indexes.clone(),
indexes_enabled: self.indexes_enabled,
schema_version: self.schema_version,
table_to_materialized_views: self.table_to_materialized_views.clone(),
incompatible_views,
}
}
}
/// A table reference usable by the planner: a persistent b-tree table,
/// a virtual table, or a derived table coming from a FROM-clause subquery.
#[derive(Clone, Debug)]
pub enum Table {
    /// On-disk b-tree backed table.
    BTree(Arc<BTreeTable>),
    /// Table implemented by a virtual table module.
    Virtual(Arc<VirtualTable>),
    /// Derived table produced by a subquery in a FROM clause.
    FromClauseSubquery(FromClauseSubquery),
}
impl Table {
    /// Returns the root page of the underlying b-tree.
    /// Panics for virtual tables and FROM-clause subqueries, which have no root page.
    pub fn get_root_page(&self) -> i64 {
        if let Table::BTree(t) = self {
            t.root_page
        } else {
            unimplemented!()
        }
    }
    /// Name of the table (the alias, for FROM-clause subqueries).
    pub fn get_name(&self) -> &str {
        match self {
            Self::BTree(t) => t.name.as_str(),
            Self::Virtual(t) => t.name.as_str(),
            Self::FromClauseSubquery(sq) => sq.name.as_str(),
        }
    }
    /// Column at position `index`, if any.
    pub fn get_column_at(&self, index: usize) -> Option<&Column> {
        self.columns().get(index)
    }
    /// Returns the column position and column for a given column name.
    /// The name is normalized before matching.
    pub fn get_column_by_name(&self, name: &str) -> Option<(usize, &Column)> {
        let needle = normalize_ident(name);
        match self {
            // BTreeTable has its own normalized lookup.
            Self::BTree(t) => t.get_column(needle.as_str()),
            // Virtual tables and subqueries share a linear scan over their columns.
            _ => self
                .columns()
                .iter()
                .enumerate()
                .find(|(_, col)| col.name.as_ref() == Some(&needle)),
        }
    }
    /// The table's column list, regardless of variant.
    pub fn columns(&self) -> &Vec<Column> {
        match self {
            Self::BTree(t) => &t.columns,
            Self::Virtual(t) => &t.columns,
            Self::FromClauseSubquery(sq) => &sq.columns,
        }
    }
    /// The underlying b-tree table, or `None` for other variants.
    pub fn btree(&self) -> Option<Arc<BTreeTable>> {
        if let Self::BTree(t) = self {
            Some(Arc::clone(t))
        } else {
            None
        }
    }
    /// The underlying virtual table, or `None` for other variants.
    pub fn virtual_table(&self) -> Option<Arc<VirtualTable>> {
        if let Self::Virtual(t) = self {
            Some(Arc::clone(t))
        } else {
            None
        }
    }
}
/// Equality is *pointer identity* on the shared `Arc`s, not a structural
/// comparison: two `Table`s are equal only when they refer to the same
/// allocation.
impl PartialEq for Table {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::BTree(a), Self::BTree(b)) => Arc::ptr_eq(a, b),
            (Self::Virtual(a), Self::Virtual(b)) => Arc::ptr_eq(a, b),
            // Different variants — and all FromClauseSubquery values — compare unequal.
            _ => false,
        }
    }
}
/// A uniqueness constraint over an ordered list of columns, used to decide
/// which automatic indexes a table needs.
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct UniqueSet {
    /// Constrained columns with their declared sort orders, in definition order.
    pub columns: Vec<(String, SortOrder)>,
    /// True when this set came from a PRIMARY KEY rather than a UNIQUE constraint.
    pub is_primary_key: bool,
}
/// Schema description of a b-tree backed table, as parsed from its
/// `CREATE TABLE` statement.
#[derive(Clone, Debug)]
pub struct BTreeTable {
    /// Root page of the table's b-tree.
    pub root_page: i64,
    /// Normalized table name.
    pub name: String,
    /// PRIMARY KEY columns with their sort orders, in declaration order.
    pub primary_key_columns: Vec<(String, SortOrder)>,
    pub columns: Vec<Column>,
    /// False only for WITHOUT ROWID tables.
    pub has_rowid: bool,
    /// True for STRICT tables.
    pub is_strict: bool,
    /// True if the PRIMARY KEY was declared AUTOINCREMENT.
    pub has_autoincrement: bool,
    /// De-duplicated uniqueness constraints (PK + UNIQUE), in definition order.
    pub unique_sets: Vec<UniqueSet>,
    /// FOREIGN KEYs declared by this (child) table.
    pub foreign_keys: Vec<Arc<ForeignKey>>,
}
impl BTreeTable {
    /// Returns the (position, column) of the INTEGER PRIMARY KEY rowid alias,
    /// if this table has one. Only a single-column PK can be a rowid alias.
    pub fn get_rowid_alias_column(&self) -> Option<(usize, &Column)> {
        if self.primary_key_columns.len() == 1 {
            let (idx, col) = self.get_column(&self.primary_key_columns[0].0)?;
            if col.is_rowid_alias() {
                return Some((idx, col));
            }
        }
        None
    }
    /// Returns the column position and column for a given column name.
    /// Returns None if the column name is not found.
    /// E.g. if table is CREATE TABLE t (a, b, c)
    /// then get_column("b") returns (1, &Column { .. })
    pub fn get_column(&self, name: &str) -> Option<(usize, &Column)> {
        let name = normalize_ident(name);
        self.columns
            .iter()
            .enumerate()
            .find(|(_, column)| column.name.as_ref() == Some(&name))
    }
    /// Parses a `CREATE TABLE` statement into a `BTreeTable` rooted at `root_page`.
    /// Panics (`unreachable!`) if `sql` is not a CREATE TABLE statement.
    pub fn from_sql(sql: &str, root_page: i64) -> Result<BTreeTable> {
        let mut parser = Parser::new(sql.as_bytes());
        let cmd = parser.next_cmd()?;
        match cmd {
            Some(Cmd::Stmt(Stmt::CreateTable { tbl_name, body, .. })) => {
                create_table(tbl_name.name.as_str(), &body, root_page)
            }
            _ => unreachable!("Expected CREATE TABLE statement"),
        }
    }
    /// Reconstruct the SQL for the table.
    /// FIXME: this makes us incompatible with SQLite since sqlite stores the user-provided SQL as is in
    /// `sqlite_schema.sql`
    /// For example, if a user creates a table like: `CREATE TABLE t (x)`, we store it as
    /// `CREATE TABLE t (x)`, whereas sqlite stores it with the original extra whitespace.
    pub fn to_sql(&self) -> String {
        let mut sql = format!("CREATE TABLE {} (", self.name);
        // A single-column PK is rendered inline on its column; multi-column PKs
        // get a table-level PRIMARY KEY (...) clause below.
        let needs_pk_inline = self.primary_key_columns.len() == 1;
        // Add columns
        for (i, column) in self.columns.iter().enumerate() {
            if i > 0 {
                sql.push_str(", ");
            }
            // we need to wrap the column name in square brackets if it contains special characters
            let column_name = column.name.as_ref().expect("column name is None");
            if identifier_contains_special_chars(column_name) {
                sql.push('[');
                sql.push_str(column_name);
                sql.push(']');
            } else {
                sql.push_str(column_name);
            }
            if !column.ty_str.is_empty() {
                sql.push(' ');
                sql.push_str(&column.ty_str);
            }
            if column.notnull() {
                sql.push_str(" NOT NULL");
            }
            if column.unique() {
                sql.push_str(" UNIQUE");
            }
            if needs_pk_inline && column.primary_key() {
                sql.push_str(" PRIMARY KEY");
            }
            if let Some(default) = &column.default {
                sql.push_str(" DEFAULT ");
                sql.push_str(&default.to_string());
            }
        }
        let has_table_pk = !self.primary_key_columns.is_empty();
        // Add table-level PRIMARY KEY constraint if exists
        if !needs_pk_inline && has_table_pk {
            sql.push_str(", PRIMARY KEY (");
            for (i, col) in self.primary_key_columns.iter().enumerate() {
                if i > 0 {
                    sql.push_str(", ");
                }
                sql.push_str(&col.0);
            }
            sql.push(')');
        }
        // Emit each FOREIGN KEY as a table-level constraint.
        for fk in &self.foreign_keys {
            sql.push_str(", FOREIGN KEY (");
            for (i, col) in fk.child_columns.iter().enumerate() {
                if i > 0 {
                    sql.push_str(", ");
                }
                sql.push_str(col);
            }
            sql.push_str(") REFERENCES ");
            sql.push_str(&fk.parent_table);
            sql.push('(');
            for (i, col) in fk.parent_columns.iter().enumerate() {
                if i > 0 {
                    sql.push_str(", ");
                }
                sql.push_str(col);
            }
            sql.push(')');
            // Add ON DELETE/UPDATE actions, NoAction is default so just make empty in that case
            if fk.on_delete != RefAct::NoAction {
                sql.push_str(" ON DELETE ");
                sql.push_str(match fk.on_delete {
                    RefAct::SetNull => "SET NULL",
                    RefAct::SetDefault => "SET DEFAULT",
                    RefAct::Cascade => "CASCADE",
                    RefAct::Restrict => "RESTRICT",
                    _ => "",
                });
            }
            if fk.on_update != RefAct::NoAction {
                sql.push_str(" ON UPDATE ");
                sql.push_str(match fk.on_update {
                    RefAct::SetNull => "SET NULL",
                    RefAct::SetDefault => "SET DEFAULT",
                    RefAct::Cascade => "CASCADE",
                    RefAct::Restrict => "RESTRICT",
                    _ => "",
                });
            }
            if fk.deferred {
                sql.push_str(" DEFERRABLE INITIALLY DEFERRED");
            }
        }
        sql.push(')');
        sql
    }
    /// Collation sequence of every column, in column order.
    pub fn column_collations(&self) -> Vec<CollationSeq> {
        self.columns
            .iter()
            .map(|column| column.collation())
            .collect()
    }
}
/// Returns true when `name` contains any character outside `[A-Za-z0-9_]`,
/// i.e. when it must be bracket-quoted in generated SQL.
fn identifier_contains_special_chars(name: &str) -> bool {
    !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
}
/// Cursor over an in-memory pseudo-table (e.g. a single materialized row);
/// only the column count is tracked.
#[derive(Debug, Default, Clone, Copy)]
pub struct PseudoCursorType {
    /// Number of columns the pseudo-row carries.
    pub column_count: usize,
}
impl PseudoCursorType {
    /// Creates a pseudo-cursor with zero columns.
    pub fn new() -> Self {
        Self { column_count: 0 }
    }
    /// Creates a pseudo-cursor sized after the given column list.
    pub fn new_with_columns(columns: impl AsRef<[Column]>) -> Self {
        let column_count = columns.as_ref().len();
        Self { column_count }
    }
}
/// A derived table from a FROM clause subquery.
#[derive(Debug, Clone)]
pub struct FromClauseSubquery {
    /// The name of the derived table; uses the alias if available.
    pub name: String,
    /// The query plan for the derived table.
    pub plan: Box<SelectPlan>,
    /// The columns of the derived table.
    pub columns: Vec<Column>,
    /// The start register for the result columns of the derived table;
    /// must be set before data is read from it.
    /// `None` until the translator allocates the registers.
    pub result_columns_start_reg: Option<usize>,
}
pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> Result<BTreeTable> {
let table_name = normalize_ident(tbl_name);
trace!("Creating table {}", table_name);
let mut has_rowid = true;
let mut has_autoincrement = false;
let mut primary_key_columns = vec![];
let mut foreign_keys = vec![];
let mut cols = vec![];
let is_strict: bool;
let mut unique_sets: Vec<UniqueSet> = vec![];
match body {
CreateTableBody::ColumnsAndConstraints {
columns,
constraints,
options,
} => {
is_strict = options.contains(TableOptions::STRICT);
for c in constraints {
if let ast::TableConstraint::PrimaryKey {
columns,
auto_increment,
..
} = &c.constraint
{
if !primary_key_columns.is_empty() {
crate::bail_parse_error!(
"table \"{}\" has more than one primary key",
tbl_name
);
}
if *auto_increment {
has_autoincrement = true;
}
for column in columns {
let col_name = match column.expr.as_ref() {
Expr::Id(id) => normalize_ident(id.as_str()),
Expr::Literal(Literal::String(value)) => {
value.trim_matches('\'').to_owned()
}
expr => {
bail_parse_error!("unsupported primary key expression: {}", expr)
}
};
primary_key_columns
.push((col_name, column.order.unwrap_or(SortOrder::Asc)));
}
unique_sets.push(UniqueSet {
columns: primary_key_columns.clone(),
is_primary_key: true,
});
} else if let ast::TableConstraint::Unique {
columns,
conflict_clause,
} = &c.constraint
{
if conflict_clause.is_some() {
unimplemented!("ON CONFLICT not implemented");
}
let mut unique_columns = Vec::with_capacity(columns.len());
for column in columns {
match column.expr.as_ref() {
Expr::Id(id) => unique_columns.push((
id.as_str().to_string(),
column.order.unwrap_or(SortOrder::Asc),
)),
Expr::Literal(Literal::String(value)) => unique_columns.push((
value.trim_matches('\'').to_owned(),
column.order.unwrap_or(SortOrder::Asc),
)),
expr => {
bail_parse_error!("unsupported unique key expression: {}", expr)
}
}
}
let unique_set = UniqueSet {
columns: unique_columns,
is_primary_key: false,
};
unique_sets.push(unique_set);
} else if let ast::TableConstraint::ForeignKey {
columns,
clause,
defer_clause,
} = &c.constraint
{
let child_columns: Vec<String> = columns
.iter()
.map(|ic| normalize_ident(ic.col_name.as_str()))
.collect();
// derive parent columns: explicit or default to parent PK
let parent_table = normalize_ident(clause.tbl_name.as_str());
let parent_columns: Vec<String> = clause
.columns
.iter()
.map(|ic| normalize_ident(ic.col_name.as_str()))
.collect();
// Only check arity if parent columns were explicitly listed
if !parent_columns.is_empty() && child_columns.len() != parent_columns.len() {
crate::bail_parse_error!(
"foreign key on \"{}\" has {} child column(s) but {} parent column(s)",
tbl_name,
child_columns.len(),
parent_columns.len()
);
}
// deferrable semantics
let deferred = match defer_clause {
Some(d) => {
d.deferrable
&& matches!(
d.init_deferred,
Some(InitDeferredPred::InitiallyDeferred)
)
}
None => false, // NOT DEFERRABLE INITIALLY IMMEDIATE by default
};
let fk = ForeignKey {
parent_table,
parent_columns,
child_columns,
on_delete: clause
.args
.iter()
.find_map(|a| {
if let ast::RefArg::OnDelete(x) = a {
Some(*x)
} else {
None
}
})
.unwrap_or(RefAct::NoAction),
on_update: clause
.args
.iter()
.find_map(|a| {
if let ast::RefArg::OnUpdate(x) = a {
Some(*x)
} else {
None
}
})
.unwrap_or(RefAct::NoAction),
deferred,
};
foreign_keys.push(Arc::new(fk));
}
}
// Due to a bug in SQLite, this check is needed to maintain backwards compatibility with rowid alias
// SQLite docs: https://sqlite.org/lang_createtable.html#rowids_and_the_integer_primary_key
// Issue: https://github.com/tursodatabase/turso/issues/3665
let mut primary_key_desc_columns_constraint = false;
for ast::ColumnDefinition {
col_name,
col_type,
constraints,
} in columns
{
let name = col_name.as_str().to_string();
// Regular sqlite tables have an integer rowid that uniquely identifies a row.
// Even if you create a table with a column e.g. 'id INT PRIMARY KEY', there will still
// be a separate hidden rowid, and the 'id' column will have a separate index built for it.
//
// However:
// A column defined as exactly INTEGER PRIMARY KEY is a rowid alias, meaning that the rowid
// and the value of this column are the same.
// https://www.sqlite.org/lang_createtable.html#rowids_and_the_integer_primary_key
let ty_str = col_type
.as_ref()
.cloned()
.map(|ast::Type { name, .. }| name.clone())
.unwrap_or_default();
let mut typename_exactly_integer = false;
let ty = match col_type {
Some(data_type) => {
let (ty, ei) = type_from_name(&data_type.name);
typename_exactly_integer = ei;
ty
}
None => Type::Null,
};
let mut default = None;
let mut primary_key = false;
let mut notnull = false;
let mut order = SortOrder::Asc;
let mut unique = false;
let mut collation = None;
for c_def in constraints {
match &c_def.constraint {
ast::ColumnConstraint::Check { .. } => {
crate::bail_parse_error!("CHECK constraints are not yet supported");
}
ast::ColumnConstraint::Generated { .. } => {
crate::bail_parse_error!("GENERATED columns are not yet supported");
}
ast::ColumnConstraint::PrimaryKey {
order: o,
auto_increment,
conflict_clause,
..
} => {
if conflict_clause.is_some() {
crate::bail_parse_error!(
"ON CONFLICT not implemented for column definition"
);
}
if !primary_key_columns.is_empty() {
crate::bail_parse_error!(
"table \"{}\" has more than one primary key",
tbl_name
);
}
primary_key = true;
if *auto_increment {
has_autoincrement = true;
}
if let Some(o) = o {
order = *o;
}
unique_sets.push(UniqueSet {
columns: vec![(name.clone(), order)],
is_primary_key: true,
});
}
ast::ColumnConstraint::NotNull {
nullable,
conflict_clause,
..
} => {
if conflict_clause.is_some() {
crate::bail_parse_error!(
"ON CONFLICT not implemented for column definition"
);
}
notnull = !nullable;
}
ast::ColumnConstraint::Default(ref expr) => {
default = Some(
translate_ident_to_string_literal(expr).unwrap_or(expr.clone()),
);
}
// TODO: for now we don't check Resolve type of unique
ast::ColumnConstraint::Unique(conflict) => {
if conflict.is_some() {
crate::bail_parse_error!(
"ON CONFLICT not implemented for column definition"
);
}
unique = true;
unique_sets.push(UniqueSet {
columns: vec![(name.clone(), order)],
is_primary_key: false,
});
}
ast::ColumnConstraint::Collate { ref collation_name } => {
collation = Some(CollationSeq::new(collation_name.as_str())?);
}
ast::ColumnConstraint::ForeignKey {
clause,
defer_clause,
} => {
let fk = ForeignKey {
parent_table: normalize_ident(clause.tbl_name.as_str()),
parent_columns: clause
.columns
.iter()
.map(|c| normalize_ident(c.col_name.as_str()))
.collect(),
on_delete: clause
.args
.iter()
.find_map(|arg| {
if let ast::RefArg::OnDelete(act) = arg {
Some(*act)
} else {
None
}
})
.unwrap_or(RefAct::NoAction),
on_update: clause
.args
.iter()
.find_map(|arg| {
if let ast::RefArg::OnUpdate(act) = arg {
Some(*act)
} else {
None
}
})
.unwrap_or(RefAct::NoAction),
child_columns: vec![name.clone()],
deferred: match defer_clause {
Some(d) => {
d.deferrable
&& matches!(
d.init_deferred,
Some(InitDeferredPred::InitiallyDeferred)
)
}
None => false,
},
};
foreign_keys.push(Arc::new(fk));
}
}
}
if primary_key {
primary_key_columns.push((name.clone(), order));
if order == SortOrder::Desc {
primary_key_desc_columns_constraint = true;
}
} else if primary_key_columns
.iter()
.any(|(col_name, _)| col_name == &name)
{
primary_key = true;
}
cols.push(Column::new(
Some(normalize_ident(&name)),
ty_str,
default,
ty,
collation,
ColDef {
primary_key,
rowid_alias: typename_exactly_integer
&& primary_key
&& !primary_key_desc_columns_constraint,
notnull,
unique,
hidden: false,
},
));
}
if options.contains(TableOptions::WITHOUT_ROWID) {
has_rowid = false;
}
}
CreateTableBody::AsSelect(_) => todo!(),
};
// flip is_rowid_alias back to false if the table has multiple primary key columns
// or if the table has no rowid
if !has_rowid || primary_key_columns.len() > 1 {
for col in cols.iter_mut() {
col.set_rowid_alias(false);
}
}
if has_autoincrement {
// only allow integers
if primary_key_columns.len() != 1 {
crate::bail_parse_error!("AUTOINCREMENT is only allowed on an INTEGER PRIMARY KEY");
}
let pk_col_name = &primary_key_columns[0].0;
let pk_col = cols.iter().find(|c| c.name.as_deref() == Some(pk_col_name));
if let Some(col) = pk_col {
if col.ty() != Type::Integer {
crate::bail_parse_error!("AUTOINCREMENT is only allowed on an INTEGER PRIMARY KEY");
}
}
}
for col in cols.iter() {
if col.is_rowid_alias() {
// Unique sets are used for creating automatic indexes. An index is not created for a rowid alias PRIMARY KEY.
// However, an index IS created for a rowid alias UNIQUE, e.g. CREATE TABLE t(x INTEGER PRIMARY KEY, UNIQUE(x))
let unique_set_w_only_rowid_alias = unique_sets.iter().position(|us| {
us.is_primary_key
&& us.columns.len() == 1
&& &us.columns.first().unwrap().0 == col.name.as_ref().unwrap()
});
if let Some(u) = unique_set_w_only_rowid_alias {
unique_sets.remove(u);
}
}
}
Ok(BTreeTable {
root_page,
name: table_name,
has_rowid,
primary_key_columns,
has_autoincrement,
columns: cols,
is_strict,
foreign_keys,
unique_sets: {
// If there are any unique sets that have identical column names in the same order (even if they are PRIMARY KEY and UNIQUE and have different sort orders), remove the duplicates.
// Examples:
// PRIMARY KEY (a, b) and UNIQUE (a desc, b) are the same
// PRIMARY KEY (a, b) and UNIQUE (b, a) are not the same
// Using a n^2 monkey algorithm here because n is small, CPUs are fast, life is short, and most importantly:
// we want to preserve the order of the sets -- automatic index names in sqlite_schema must be in definition order.
let mut i = 0;
while i < unique_sets.len() {
let mut j = i + 1;
while j < unique_sets.len() {
let lengths_equal =
unique_sets[i].columns.len() == unique_sets[j].columns.len();
if lengths_equal
&& unique_sets[i]
.columns
.iter()
.zip(unique_sets[j].columns.iter())
.all(|((a_name, _), (b_name, _))| a_name == b_name)
{
unique_sets.remove(j);
} else {
j += 1;
}
}
i += 1;
}
unique_sets
},
})
}
/// Converts a bare identifier expression (`Expr::Name`) into an equivalent
/// string-literal expression; returns `None` for any other expression kind.
/// Used so that e.g. `DEFAULT foo` behaves like `DEFAULT 'foo'`.
pub fn translate_ident_to_string_literal(expr: &Expr) -> Option<Box<Expr>> {
    if let Expr::Name(name) = expr {
        let literal = Expr::Literal(Literal::String(name.as_literal()));
        Some(Box::new(literal))
    } else {
        None
    }
}
/// Placeholder for building a pseudo-table description from result columns.
///
/// NOTE(review): `table` is never mutated and every match arm is a `todo!()`,
/// so any non-empty `columns` slice panics and only an empty slice returns
/// (with `column_count == 0`). This looks like an unfinished stub — confirm
/// before relying on it.
pub fn _build_pseudo_table(columns: &[ResultColumn]) -> PseudoCursorType {
    let table = PseudoCursorType::new();
    for column in columns {
        match column {
            ResultColumn::Expr(expr, _as_name) => {
                todo!("unsupported expression {:?}", expr);
            }
            ResultColumn::Star => {
                todo!();
            }
            ResultColumn::TableStar(_) => {
                todo!();
            }
        }
    }
    table
}
/// A FOREIGN KEY constraint as declared on a child table.
#[derive(Debug, Clone)]
pub struct ForeignKey {
    /// Columns in this table (child side)
    pub child_columns: Vec<String>,
    /// Referenced (parent) table
    pub parent_table: String,
    /// Parent-side referenced columns; empty means "the parent's PRIMARY KEY".
    pub parent_columns: Vec<String>,
    /// Referential action for DELETE on the parent (NoAction by default).
    pub on_delete: RefAct,
    /// Referential action for UPDATE on the parent (NoAction by default).
    pub on_update: RefAct,
    /// DEFERRABLE INITIALLY DEFERRED
    pub deferred: bool,
}
impl ForeignKey {
    /// Rejects FK configurations that are not (yet) supported:
    /// any referential action other than NO ACTION, and parent column
    /// lists that explicitly name the rowid.
    fn validate(&self) -> Result<()> {
        // TODO: remove this when actions are implemented
        let actions_are_noops = matches!(self.on_update, RefAct::NoAction)
            && matches!(self.on_delete, RefAct::NoAction);
        if !actions_are_noops {
            crate::bail_parse_error!(
                "foreign key actions other than NO ACTION are not implemented"
            );
        }
        // Explicitly referencing "rowid"/"oid"/"_rowid_" as a parent column is a mismatch.
        let references_rowid = self
            .parent_columns
            .iter()
            .any(|c| ROWID_STRS.iter().any(|&r| r.eq_ignore_ascii_case(c)));
        if references_rowid {
            return Err(crate::LimboError::Constraint(format!(
                "foreign key mismatch referencing \"{}\"",
                self.parent_table
            )));
        }
        Ok(())
    }
}
/// A single resolved foreign key where `parent_table == target`.
#[derive(Clone, Debug)]
pub struct ResolvedFkRef {
    /// Child table that owns the FK.
    pub child_table: Arc<BTreeTable>,
    /// The FK as declared on the child table.
    pub fk: Arc<ForeignKey>,
    /// Resolved, normalized column names.
    pub child_cols: Vec<String>,
    /// Column positions in the child/parent tables (pos_in_table)
    pub child_pos: Vec<usize>,
    /// Parent-side positions; 0 also serves as the rowid sentinel when the
    /// parent key is spelled "rowid"/"oid"/"_rowid_".
    pub parent_pos: Vec<usize>,
    /// If the parent key is rowid or a rowid-alias (single-column only)
    pub parent_uses_rowid: bool,
    /// For non-rowid parents: the UNIQUE index that enforces the parent key.
    /// (None when `parent_uses_rowid == true`.)
    pub parent_unique_index: Option<Arc<Index>>,
}
impl ResolvedFkRef {
    /// Returns if any referenced parent column can change when these column positions are updated.
    pub fn parent_key_may_change(
        &self,
        updated_parent_positions: &HashSet<usize>,
        parent_tbl: &BTreeTable,
    ) -> bool {
        if self.parent_uses_rowid {
            // parent rowid changes if the parent's rowid or alias is updated
            if let Some((idx, _)) = parent_tbl
                .columns
                .iter()
                .enumerate()
                .find(|(_, c)| c.is_rowid_alias())
            {
                return updated_parent_positions.contains(&idx);
            }
            // Without a rowid alias, a direct rowid update is represented separately with ROWID_SENTINEL
            // so we conservatively report that the key may change.
            return true;
        }
        // Non-rowid key: changed iff any referenced parent position is updated.
        self.parent_pos
            .iter()
            .any(|p| updated_parent_positions.contains(p))
    }
    /// Returns if any child column of this FK is in `updated_child_positions`
    pub fn child_key_changed(
        &self,
        updated_child_positions: &HashSet<usize>,
        child_tbl: &BTreeTable,
    ) -> bool {
        if self
            .child_pos
            .iter()
            .any(|p| updated_child_positions.contains(p))
        {
            return true;
        }
        // special case: if FK uses a rowid alias on child, and rowid changed
        // NOTE(review): `get_column` here yields the same position already stored
        // in `child_pos[0]`, so this branch looks unreachable after the check
        // above — confirm whether a distinct rowid-position encoding was intended.
        // The unwrap relies on resolution having validated the column name.
        if self.child_cols.len() == 1 {
            let (i, col) = child_tbl.get_column(&self.child_cols[0]).unwrap();
            if col.is_rowid_alias() && updated_child_positions.contains(&i) {
                return true;
            }
        }
        false
    }
}
/// A table column. Boolean attributes, the declared type, and the collation
/// are bit-packed into `raw` (see the `F_*`/`TYPE_*`/`COLL_*` constants below).
#[derive(Debug, Clone)]
pub struct Column {
    /// Normalized column name; `None` for unnamed (e.g. expression) columns.
    pub name: Option<String>,
    /// The declared type name, verbatim (e.g. "VARCHAR(10)"); empty if untyped.
    pub ty_str: String,
    /// DEFAULT expression, if declared.
    pub default: Option<Box<Expr>>,
    // Packed flags + Type + CollationSeq; accessed via the methods on `impl Column`.
    raw: u16,
}
/// Unpacked column attributes used to construct a [`Column`];
/// each field maps to one bit flag in `Column::raw`.
#[derive(Default)]
pub struct ColDef {
    pub primary_key: bool,
    /// True when the column is exactly `INTEGER PRIMARY KEY` (aliases the rowid).
    pub rowid_alias: bool,
    pub notnull: bool,
    pub unique: bool,
    /// Hidden columns (virtual-table convention) are excluded from `SELECT *`.
    pub hidden: bool,
}
// Bit layout of `Column::raw`:
//   bits 0-4: boolean flags
const F_PRIMARY_KEY: u16 = 1;
const F_ROWID_ALIAS: u16 = 2;
const F_NOTNULL: u16 = 4;
const F_UNIQUE: u16 = 8;
const F_HIDDEN: u16 = 16;
//   bits 5-7: Type (3 bits, values 0..=5)
//   bits 8-9: CollationSeq (2 bits)
const TYPE_SHIFT: u16 = 5;
const TYPE_MASK: u16 = 0b111 << TYPE_SHIFT;
const COLL_SHIFT: u16 = TYPE_SHIFT + 3;
const COLL_MASK: u16 = 0b11 << COLL_SHIFT;
impl Column {
    /// SQLite type affinity derived from the declared type name.
    pub fn affinity(&self) -> Affinity {
        Affinity::affinity(&self.ty_str)
    }
    /// Convenience constructor for a TEXT column with no constraints.
    pub fn new_default_text(
        name: Option<String>,
        ty_str: String,
        default: Option<Box<Expr>>,
    ) -> Self {
        Self::new(name, ty_str, default, Type::Text, None, ColDef::default())
    }
    /// Convenience constructor for an INTEGER column with no constraints.
    pub fn new_default_integer(
        name: Option<String>,
        ty_str: String,
        default: Option<Box<Expr>>,
    ) -> Self {
        Self::new(
            name,
            ty_str,
            default,
            Type::Integer,
            None,
            ColDef::default(),
        )
    }
    /// Builds a column, packing type, collation and flags into `raw`.
    /// `col == None` leaves the collation bits at `CollationSeq::Unset`.
    #[inline]
    pub const fn new(
        name: Option<String>,
        ty_str: String,
        default: Option<Box<Expr>>,
        ty: Type,
        col: Option<CollationSeq>,
        coldef: ColDef,
    ) -> Self {
        let mut raw = 0u16;
        raw |= (ty as u16) << TYPE_SHIFT;
        if let Some(c) = col {
            raw |= (c as u16) << COLL_SHIFT;
        }
        if coldef.primary_key {
            raw |= F_PRIMARY_KEY
        }
        if coldef.rowid_alias {
            raw |= F_ROWID_ALIAS
        }
        if coldef.notnull {
            raw |= F_NOTNULL
        }
        if coldef.unique {
            raw |= F_UNIQUE
        }
        if coldef.hidden {
            raw |= F_HIDDEN
        }
        Self {
            name,
            ty_str,
            default,
            raw,
        }
    }
    /// Declared type, unpacked from `raw`.
    #[inline]
    pub const fn ty(&self) -> Type {
        let v = ((self.raw & TYPE_MASK) >> TYPE_SHIFT) as u8;
        Type::from_bits(v)
    }
    #[inline]
    pub const fn set_ty(&mut self, ty: Type) {
        self.raw = (self.raw & !TYPE_MASK) | (((ty as u16) << TYPE_SHIFT) & TYPE_MASK);
    }
    /// The collation if one was explicitly declared, `None` otherwise.
    #[inline]
    pub const fn collation_opt(&self) -> Option<CollationSeq> {
        if self.has_explicit_collation() {
            Some(self.collation())
        } else {
            None
        }
    }
    /// Effective collation (the unset default when none was declared).
    #[inline]
    pub const fn collation(&self) -> CollationSeq {
        let v = ((self.raw & COLL_MASK) >> COLL_SHIFT) as u8;
        CollationSeq::from_bits(v)
    }
    #[inline]
    pub const fn has_explicit_collation(&self) -> bool {
        let v = ((self.raw & COLL_MASK) >> COLL_SHIFT) as u8;
        v != CollationSeq::Unset as u8
    }
    /// Sets the collation bits; `None` is a no-op (does not clear an existing collation).
    #[inline]
    pub const fn set_collation(&mut self, c: Option<CollationSeq>) {
        if let Some(c) = c {
            self.raw = (self.raw & !COLL_MASK) | (((c as u16) << COLL_SHIFT) & COLL_MASK);
        }
    }
    // `const` added for consistency with the sibling flag accessors below.
    #[inline]
    pub const fn primary_key(&self) -> bool {
        self.raw & F_PRIMARY_KEY != 0
    }
    #[inline]
    pub const fn is_rowid_alias(&self) -> bool {
        self.raw & F_ROWID_ALIAS != 0
    }
    #[inline]
    pub const fn notnull(&self) -> bool {
        self.raw & F_NOTNULL != 0
    }
    #[inline]
    pub const fn unique(&self) -> bool {
        self.raw & F_UNIQUE != 0
    }
    #[inline]
    pub const fn hidden(&self) -> bool {
        self.raw & F_HIDDEN != 0
    }
    #[inline]
    pub const fn set_primary_key(&mut self, v: bool) {
        self.set_flag(F_PRIMARY_KEY, v);
    }
    #[inline]
    pub const fn set_rowid_alias(&mut self, v: bool) {
        self.set_flag(F_ROWID_ALIAS, v);
    }
    #[inline]
    pub const fn set_notnull(&mut self, v: bool) {
        self.set_flag(F_NOTNULL, v);
    }
    #[inline]
    pub const fn set_unique(&mut self, v: bool) {
        self.set_flag(F_UNIQUE, v);
    }
    #[inline]
    pub const fn set_hidden(&mut self, v: bool) {
        self.set_flag(F_HIDDEN, v);
    }
    /// Sets or clears a single flag bit in `raw`.
    #[inline]
    const fn set_flag(&mut self, mask: u16, val: bool) {
        if val {
            self.raw |= mask
        } else {
            self.raw &= !mask
        }
    }
}
// TODO: This might replace some of util::columns_from_create_table_body
impl From<&ColumnDefinition> for Column {
    /// Builds a `Column` from a parsed column definition, reading the inline
    /// constraints (PK / NOT NULL / UNIQUE / DEFAULT / COLLATE). CHECK,
    /// GENERATED and FK constraints are ignored here (`_ => {}`).
    ///
    /// NOTE(review): `rowid_alias` is set for any PK whose type maps to
    /// `Type::Integer`, whereas `create_table` additionally requires the type
    /// name to be *exactly* "INTEGER" (`typename_exactly_integer`) — confirm
    /// this difference is intentional for the callers of this conversion.
    fn from(value: &ColumnDefinition) -> Self {
        let name = value.col_name.as_str();
        let mut default = None;
        let mut notnull = false;
        let mut primary_key = false;
        let mut unique = false;
        let mut collation = None;
        for ast::NamedColumnConstraint { constraint, .. } in &value.constraints {
            match constraint {
                ast::ColumnConstraint::PrimaryKey { .. } => primary_key = true,
                ast::ColumnConstraint::NotNull { .. } => notnull = true,
                ast::ColumnConstraint::Unique(..) => unique = true,
                ast::ColumnConstraint::Default(expr) => {
                    // Bare identifiers in DEFAULT behave like string literals.
                    default
                        .replace(translate_ident_to_string_literal(expr).unwrap_or(expr.clone()));
                }
                ast::ColumnConstraint::Collate { collation_name } => {
                    collation.replace(
                        CollationSeq::new(collation_name.as_str())
                            .expect("collation should have been set correctly in create table"),
                    );
                }
                _ => {}
            };
        }
        let ty = match value.col_type {
            Some(ref data_type) => type_from_name(&data_type.name).0,
            None => Type::Null,
        };
        let ty_str = value
            .col_type
            .as_ref()
            .map(|t| t.name.to_string())
            .unwrap_or_default();
        // Virtual-table convention: a "HIDDEN" marker in the type name hides the column.
        let hidden = ty_str.contains("HIDDEN");
        Column::new(
            Some(normalize_ident(name)),
            ty_str,
            default,
            ty,
            collation,
            ColDef {
                primary_key,
                rowid_alias: primary_key && matches!(ty, Type::Integer),
                notnull,
                unique,
                hidden,
            },
        )
    }
}
/// Declared column type affinity.
///
/// The explicit `u8` discriminants are decoded by `Type::from_bits`, so
/// they must not be renumbered.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Type {
    /// No declared type (renders as the empty string).
    Null = 0,
    Text = 1,
    Numeric = 2,
    Integer = 3,
    Real = 4,
    Blob = 5,
}
impl Type {
    /// Decode a `Type` from its `u8` discriminant.
    ///
    /// Unknown bit patterns fall back to `Type::Null` rather than panicking.
    #[inline]
    const fn from_bits(bits: u8) -> Self {
        match bits {
            1 => Type::Text,
            2 => Type::Numeric,
            3 => Type::Integer,
            4 => Type::Real,
            5 => Type::Blob,
            // 0 is Null; any out-of-range value also decodes to Null.
            _ => Type::Null,
        }
    }
}
impl fmt::Display for Type {
    /// Render the SQL type-name keyword; `Null` renders as the empty string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Null => "",
            Self::Text => "TEXT",
            Self::Numeric => "NUMERIC",
            Self::Integer => "INTEGER",
            Self::Real => "REAL",
            Self::Blob => "BLOB",
        })
    }
}
/// Construct the built-in `sqlite_schema` table definition: a rowid table
/// rooted at page 1 with the five standard schema columns.
pub fn sqlite_schema_table() -> BTreeTable {
    // Four of the five columns are plain TEXT; factor out the constructor.
    let text_col =
        |n: &str| Column::new_default_text(Some(n.to_string()), "TEXT".to_string(), None);
    BTreeTable {
        root_page: 1,
        name: "sqlite_schema".to_string(),
        has_rowid: true,
        is_strict: false,
        has_autoincrement: false,
        primary_key_columns: vec![],
        columns: vec![
            text_col("type"),
            text_col("name"),
            text_col("tbl_name"),
            Column::new_default_integer(Some("rootpage".to_string()), "INT".to_string(), None),
            text_col("sql"),
        ],
        foreign_keys: vec![],
        unique_sets: vec![],
    }
}
/// A btree (or custom-method) index over a table.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct Index {
    /// Normalized index name as stored in the schema.
    pub name: String,
    /// Normalized name of the table this index covers.
    pub table_name: String,
    /// Root page number of the index btree.
    pub root_page: i64,
    /// Indexed columns, in key order.
    pub columns: Vec<IndexColumn>,
    /// True for UNIQUE indexes (including automatic PK/UNIQUE indexes).
    pub unique: bool,
    /// True for transient indexes (e.g. the SELECT DISTINCT ephemeral
    /// indexes mentioned below) rather than persistent schema indexes.
    pub ephemeral: bool,
    /// Does the index have a rowid as the last column?
    /// This is the case for btree indexes (persistent or ephemeral) that
    /// have been created based on a table with a rowid.
    /// For example, WITHOUT ROWID tables (not supported in Limbo yet),
    /// and SELECT DISTINCT ephemeral indexes will not have a rowid.
    pub has_rowid: bool,
    /// Predicate of a partial index (`CREATE INDEX ... WHERE ...`), if any.
    pub where_clause: Option<Box<Expr>>,
    /// Custom index-method attachment when created with `USING <module>`.
    pub index_method: Option<Arc<dyn IndexMethodAttachment>>,
}
/// A single column participating in an [`Index`].
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct IndexColumn {
    /// Normalized column name.
    pub name: String,
    /// Sort direction (ASC/DESC) of this key part.
    pub order: SortOrder,
    /// the position of the column in the source table.
    /// for example:
    /// CREATE TABLE t (a,b,c)
    /// CREATE INDEX idx ON t(b)
    /// b.pos_in_table == 1
    pub pos_in_table: usize,
    /// Collation sequence of the underlying table column, if any.
    pub collation: Option<CollationSeq>,
    /// DEFAULT expression of the underlying table column, if any.
    pub default: Option<Box<Expr>>,
}
impl Index {
pub fn from_sql(
syms: &SymbolTable,
sql: &str,
root_page: i64,
table: &BTreeTable,
) -> Result<Index> {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next_cmd()?;
match cmd {
Some(Cmd::Stmt(Stmt::CreateIndex {
idx_name,
tbl_name,
columns,
unique,
where_clause,
using,
with_clause,
..
})) => {
let index_name = normalize_ident(idx_name.name.as_str());
let index_columns = resolve_sorted_columns(table, &columns)?;
if let Some(using) = using {
if where_clause.is_some() {
bail_parse_error!("custom index module do not support partial indices");
}
if unique {
bail_parse_error!("custom index module do not support UNIQUE indices");
}
let parameters = resolve_index_method_parameters(with_clause)?;
let Some(module) = syms.index_methods.get(using.as_str()) else {
bail_parse_error!("unknown module name: '{}'", using);
};
let configuration = IndexMethodConfiguration {
table_name: table.name.clone(),
index_name: index_name.clone(),
columns: index_columns.clone(),
parameters,
};
let descriptor = module.attach(&configuration)?;
Ok(Index {
name: index_name,
table_name: normalize_ident(tbl_name.as_str()),
root_page,
columns: index_columns,
unique: false,
ephemeral: false,
has_rowid: table.has_rowid,
where_clause: None,
index_method: Some(descriptor),
})
} else {
Ok(Index {
name: index_name,
table_name: normalize_ident(tbl_name.as_str()),
root_page,
columns: index_columns,
unique,
ephemeral: false,
has_rowid: table.has_rowid,
where_clause,
index_method: None,
})
}
}
_ => todo!("Expected create index statement"),
}
}
/// check if this is special backing_btree index created and managed by custom index_method
pub fn is_backing_btree_index(&self) -> bool {
self.index_method
.as_ref()
.is_some_and(|x| x.definition().backing_btree)
}
pub fn automatic_from_primary_key(
table: &BTreeTable,
auto_index: (String, i64), // name, root_page
column_count: usize,
) -> Result<Index> {
let has_primary_key_index =
table.get_rowid_alias_column().is_none() && !table.primary_key_columns.is_empty();
assert!(has_primary_key_index);
let (index_name, root_page) = auto_index;
let mut primary_keys = Vec::with_capacity(column_count);
for (col_name, order) in table.primary_key_columns.iter() {
let Some((pos_in_table, _)) = table.get_column(col_name) else {
return Err(crate::LimboError::ParseError(format!(
"Column {} not found in table {}",
col_name, table.name
)));
};
let (_, column) = table.get_column(col_name).unwrap();
primary_keys.push(IndexColumn {
name: normalize_ident(col_name),
order: *order,
pos_in_table,
collation: column.collation_opt(),
default: column.default.clone(),
});
}
assert!(primary_keys.len() == column_count);
Ok(Index {
name: normalize_ident(index_name.as_str()),
table_name: table.name.clone(),
root_page,
columns: primary_keys,
unique: true,
ephemeral: false,
has_rowid: table.has_rowid,
where_clause: None,
index_method: None,
})
}
pub fn automatic_from_unique(
table: &BTreeTable,
auto_index: (String, i64), // name, root_page
column_indices_and_sort_orders: Vec<(usize, SortOrder)>,
) -> Result<Index> {
let (index_name, root_page) = auto_index;
let unique_cols = table
.columns
.iter()
.enumerate()
.filter_map(|(pos_in_table, col)| {
let (pos_in_table, sort_order) = column_indices_and_sort_orders
.iter()
.find(|(pos, _)| *pos == pos_in_table)?;
Some(IndexColumn {
name: normalize_ident(col.name.as_ref().unwrap()),
order: *sort_order,
pos_in_table: *pos_in_table,
collation: col.collation_opt(),
default: col.default.clone(),
})
})
.collect::<Vec<_>>();
Ok(Index {
name: normalize_ident(index_name.as_str()),
table_name: table.name.clone(),
root_page,
columns: unique_cols,
unique: true,
ephemeral: false,
has_rowid: table.has_rowid,
where_clause: None,
index_method: None,
})
}
/// Given a column position in the table, return the position in the index.
/// Returns None if the column is not found in the index.
/// For example, given:
/// CREATE TABLE t (a, b, c)
/// CREATE INDEX idx ON t(b)
/// then column_table_pos_to_index_pos(1) returns Some(0)
pub fn column_table_pos_to_index_pos(&self, table_pos: usize) -> Option<usize> {
self.columns
.iter()
.position(|c| c.pos_in_table == table_pos)
}
/// Walk the where_clause Expr of a partial index and validate that it doesn't reference any other
/// tables or use any disallowed constructs.
pub fn validate_where_expr(&self, table: &Table) -> bool {
let Some(where_clause) = &self.where_clause else {
return true;
};
let tbl_norm = normalize_ident(self.table_name.as_str());
let has_col = |name: &str| {
let n = normalize_ident(name);
table
.columns()
.iter()
.any(|c| c.name.as_ref().is_some_and(|cn| normalize_ident(cn) == n))
};
let is_tbl = |ns: &str| normalize_ident(ns).eq_ignore_ascii_case(&tbl_norm);
let is_deterministic_fn = |name: &str, argc: usize| {
let n = normalize_ident(name);
Func::resolve_function(&n, argc).is_ok_and(|f| f.is_deterministic())
};
let mut ok = true;
let _ = walk_expr(where_clause.as_ref(), &mut |e: &Expr| -> crate::Result<
WalkControl,
> {
if !ok {
return Ok(WalkControl::SkipChildren);
}
match e {
Expr::Literal(_) | Expr::RowId { .. } => {}
// Unqualified identifier: must be a column of the target table or ROWID
Expr::Id(n) => {
let n = n.as_str();
if !ROWID_STRS.iter().any(|s| s.eq_ignore_ascii_case(n)) && !has_col(n) {
ok = false;
}
}
// Qualified: qualifier must match this index's table; column must exist
Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => {
if !is_tbl(ns.as_str()) || !has_col(col.as_str()) {
ok = false;
}
}
Expr::FunctionCall {
name, filter_over, ..
}
| Expr::FunctionCallStar {
name, filter_over, ..
} => {
// reject windowed
if filter_over.over_clause.is_some() {
ok = false;
} else {
let argc = match e {
Expr::FunctionCall { args, .. } => args.len(),
Expr::FunctionCallStar { .. } => 0,
_ => unreachable!(),
};
if !is_deterministic_fn(name.as_str(), argc) {
ok = false;
}
}
}
// Explicitly disallowed constructs
Expr::Exists(_)
| Expr::InSelect { .. }
| Expr::Subquery(_)
| Expr::Raise { .. }
| Expr::Variable(_) => {
ok = false;
}
_ => {}
}
Ok(if ok {
WalkControl::Continue
} else {
WalkControl::SkipChildren
})
});
ok
}
pub fn bind_where_expr(
&self,
table_refs: Option<&mut TableReferences>,
connection: &Arc<Connection>,
) -> Option<ast::Expr> {
let Some(where_clause) = &self.where_clause else {
return None;
};
let mut expr = where_clause.clone();
bind_and_rewrite_expr(
&mut expr,
table_refs,
None,
connection,
BindingBehavior::ResultColumnsNotAllowed,
)
.ok()?;
Some(*expr)
}
}
// Unit tests for table/index schema parsing. Fixed mojibake (U+00B4 acute
// accents used as closing quotes) and a typo in assertion messages.
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    pub fn test_has_rowid_true() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        assert!(table.has_rowid, "has_rowid should be set to true");
        Ok(())
    }
    #[test]
    pub fn test_has_rowid_false() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT) WITHOUT ROWID;"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        assert!(!table.has_rowid, "has_rowid should be set to false");
        Ok(())
    }
    #[test]
    pub fn test_column_is_rowid_alias_single_text() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a TEXT PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !column.is_rowid_alias(),
            "column 'a' has type different than INTEGER so can't be a rowid alias"
        );
        Ok(())
    }
    #[test]
    pub fn test_column_is_rowid_alias_single_integer() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            column.is_rowid_alias(),
            "column 'a' should be a rowid alias"
        );
        Ok(())
    }
    #[test]
    pub fn test_column_is_rowid_alias_single_integer_separate_primary_key_definition() -> Result<()>
    {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            column.is_rowid_alias(),
            "column 'a' should be a rowid alias"
        );
        Ok(())
    }
    #[test]
    pub fn test_column_is_rowid_alias_single_integer_separate_primary_key_definition_without_rowid(
    ) -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a)) WITHOUT ROWID;"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !column.is_rowid_alias(),
            "column 'a' shouldn't be a rowid alias because table has no rowid"
        );
        Ok(())
    }
    #[test]
    pub fn test_column_is_rowid_alias_single_integer_without_rowid() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT) WITHOUT ROWID;"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !column.is_rowid_alias(),
            "column 'a' shouldn't be a rowid alias because table has no rowid"
        );
        Ok(())
    }
    #[test]
    pub fn test_multiple_pk_forbidden() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT PRIMARY KEY);"#;
        let table = BTreeTable::from_sql(sql, 0);
        let error = table.unwrap_err();
        assert!(
            matches!(error, LimboError::ParseError(e) if e.contains("table \"t1\" has more than one primary key"))
        );
        Ok(())
    }
    #[test]
    pub fn test_column_is_rowid_alias_separate_composite_primary_key_definition() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !column.is_rowid_alias(),
            "column 'a' shouldn't be a rowid alias because table has composite primary key"
        );
        Ok(())
    }
    #[test]
    pub fn test_primary_key_inline_single() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT, c REAL);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'b' shouldn't be a primary key"
        );
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![("a".to_string(), SortOrder::Asc)],
            table.primary_key_columns,
            "primary key column names should be ['a']"
        );
        Ok(())
    }
    #[test]
    pub fn test_primary_key_inline_multiple_forbidden() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT PRIMARY KEY, c REAL);"#;
        let table = BTreeTable::from_sql(sql, 0);
        let error = table.unwrap_err();
        assert!(
            matches!(error, LimboError::ParseError(e) if e.contains("table \"t1\" has more than one primary key"))
        );
        Ok(())
    }
    #[test]
    pub fn test_primary_key_separate_single() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY(a desc));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'b' shouldn't be a primary key"
        );
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![("a".to_string(), SortOrder::Desc)],
            table.primary_key_columns,
            "primary key column names should be ['a']"
        );
        Ok(())
    }
    #[test]
    pub fn test_primary_key_separate_multiple() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY(a, b desc));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(column.primary_key(), "column 'b' should be a primary key");
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![
                ("a".to_string(), SortOrder::Asc),
                ("b".to_string(), SortOrder::Desc)
            ],
            table.primary_key_columns,
            "primary key column names should be ['a', 'b']"
        );
        Ok(())
    }
    #[test]
    pub fn test_primary_key_separate_single_quoted() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY('a'));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'b' shouldn't be a primary key"
        );
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![("a".to_string(), SortOrder::Asc)],
            table.primary_key_columns,
            "primary key column names should be ['a']"
        );
        Ok(())
    }
    #[test]
    pub fn test_primary_key_separate_single_doubly_quoted() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY("a"));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'b' shouldn't be a primary key"
        );
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![("a".to_string(), SortOrder::Asc)],
            table.primary_key_columns,
            "primary key column names should be ['a']"
        );
        Ok(())
    }
    #[test]
    pub fn test_default_value() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER DEFAULT 23);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        let default = column.default.clone().unwrap();
        assert_eq!(default.to_string(), "23");
        Ok(())
    }
    #[test]
    pub fn test_col_notnull() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER NOT NULL);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.notnull());
        Ok(())
    }
    #[test]
    pub fn test_col_notnull_negative() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(!column.notnull());
        Ok(())
    }
    #[test]
    pub fn test_col_type_string_integer() -> Result<()> {
        // The declared type string must preserve the user's original casing.
        let sql = r#"CREATE TABLE t1 (a InTeGeR);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert_eq!(column.ty_str, "InTeGeR");
        Ok(())
    }
    #[test]
    pub fn test_sqlite_schema() {
        let expected = r#"CREATE TABLE sqlite_schema (type TEXT, name TEXT, tbl_name TEXT, rootpage INT, sql TEXT)"#;
        let actual = sqlite_schema_table().to_sql();
        assert_eq!(expected, actual);
    }
    #[test]
    pub fn test_special_column_names() -> Result<()> {
        let tests = [
            ("foobar", "CREATE TABLE t (foobar TEXT)"),
            ("_table_name3", "CREATE TABLE t (_table_name3 TEXT)"),
            ("special name", "CREATE TABLE t ([special name] TEXT)"),
            ("foo&bar", "CREATE TABLE t ([foo&bar] TEXT)"),
            (" name", "CREATE TABLE t ([ name] TEXT)"),
        ];
        for (input_column_name, expected_sql) in tests {
            let sql = format!("CREATE TABLE t ([{input_column_name}] TEXT)");
            let actual = BTreeTable::from_sql(&sql, 0)?.to_sql();
            assert_eq!(expected_sql, actual);
        }
        Ok(())
    }
    #[test]
    #[should_panic]
    fn test_automatic_index_single_column() {
        // Without composite primary keys, we should not have an automatic index on a primary key that is a rowid alias
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0).unwrap();
        let _index =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)
                .unwrap();
    }
    #[test]
    fn test_automatic_index_composite_key() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 2)?;
        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 2);
        assert_eq!(index.columns[0].name, "a");
        assert_eq!(index.columns[1].name, "b");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        assert!(matches!(index.columns[1].order, SortOrder::Asc));
        Ok(())
    }
    #[test]
    #[should_panic]
    fn test_automatic_index_no_primary_key() {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0).unwrap();
        Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)
            .unwrap();
    }
    #[test]
    fn test_automatic_index_nonexistent_column() {
        // Create a table with a primary key column that doesn't exist in the table
        let table = BTreeTable {
            root_page: 0,
            name: "t1".to_string(),
            has_rowid: true,
            is_strict: false,
            has_autoincrement: false,
            primary_key_columns: vec![("nonexistent".to_string(), SortOrder::Asc)],
            columns: vec![Column::new_default_integer(
                Some("a".to_string()),
                "INT".to_string(),
                None,
            )],
            unique_sets: vec![],
            foreign_keys: vec![],
        };
        let result =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1);
        assert!(result.is_err());
    }
    #[test]
    fn test_automatic_index_unique_column() -> Result<()> {
        let sql = r#"CREATE table t1 (x INTEGER, y INTEGER UNIQUE);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index = Index::automatic_from_unique(
            &table,
            ("sqlite_autoindex_t1_1".to_string(), 2),
            vec![(1, SortOrder::Asc)],
        )?;
        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 1);
        assert_eq!(index.columns[0].name, "y");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        Ok(())
    }
    #[test]
    fn test_automatic_index_pkey_unique_column() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (x PRIMARY KEY, y UNIQUE);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let indices = [
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)?,
            Index::automatic_from_unique(
                &table,
                ("sqlite_autoindex_t1_2".to_string(), 3),
                vec![(1, SortOrder::Asc)],
            )?,
        ];
        assert_eq!(indices[0].name, "sqlite_autoindex_t1_1");
        assert_eq!(indices[0].table_name, "t1");
        assert_eq!(indices[0].root_page, 2);
        assert!(indices[0].unique);
        assert_eq!(indices[0].columns.len(), 1);
        assert_eq!(indices[0].columns[0].name, "x");
        assert!(matches!(indices[0].columns[0].order, SortOrder::Asc));
        assert_eq!(indices[1].name, "sqlite_autoindex_t1_2");
        assert_eq!(indices[1].table_name, "t1");
        assert_eq!(indices[1].root_page, 3);
        assert!(indices[1].unique);
        assert_eq!(indices[1].columns.len(), 1);
        assert_eq!(indices[1].columns[0].name, "y");
        assert!(matches!(indices[1].columns[0].order, SortOrder::Asc));
        Ok(())
    }
    #[test]
    fn test_automatic_index_pkey_many_unique_columns() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a PRIMARY KEY, b UNIQUE, c, d, UNIQUE(c, d));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let auto_indices = [
            ("sqlite_autoindex_t1_1".to_string(), 2),
            ("sqlite_autoindex_t1_2".to_string(), 3),
            ("sqlite_autoindex_t1_3".to_string(), 4),
        ];
        let indices = vec![
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)?,
            Index::automatic_from_unique(
                &table,
                ("sqlite_autoindex_t1_2".to_string(), 3),
                vec![(1, SortOrder::Asc)],
            )?,
            Index::automatic_from_unique(
                &table,
                ("sqlite_autoindex_t1_3".to_string(), 4),
                vec![(2, SortOrder::Asc), (3, SortOrder::Asc)],
            )?,
        ];
        assert!(indices.len() == auto_indices.len());
        for (pos, index) in indices.iter().enumerate() {
            let (index_name, root_page) = &auto_indices[pos];
            assert_eq!(index.name, *index_name);
            assert_eq!(index.table_name, "t1");
            assert_eq!(index.root_page, *root_page);
            assert!(index.unique);
            if pos == 0 {
                assert_eq!(index.columns.len(), 1);
                assert_eq!(index.columns[0].name, "a");
            } else if pos == 1 {
                assert_eq!(index.columns.len(), 1);
                assert_eq!(index.columns[0].name, "b");
            } else if pos == 2 {
                assert_eq!(index.columns.len(), 2);
                assert_eq!(index.columns[0].name, "c");
                assert_eq!(index.columns[1].name, "d");
            }
            assert!(matches!(index.columns[0].order, SortOrder::Asc));
        }
        Ok(())
    }
    #[test]
    fn test_automatic_index_unique_set_dedup() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a, b, UNIQUE(a, b), UNIQUE(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index = Index::automatic_from_unique(
            &table,
            ("sqlite_autoindex_t1_1".to_string(), 2),
            vec![(0, SortOrder::Asc), (1, SortOrder::Asc)],
        )?;
        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 2);
        assert_eq!(index.columns[0].name, "a");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        assert_eq!(index.columns[1].name, "b");
        assert!(matches!(index.columns[1].order, SortOrder::Asc));
        Ok(())
    }
    #[test]
    fn test_automatic_index_primary_key_is_unique() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a primary key unique);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)?;
        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 1);
        assert_eq!(index.columns[0].name, "a");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        Ok(())
    }
    #[test]
    fn test_automatic_index_primary_key_is_unique_and_composite() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a, b, PRIMARY KEY(a, b), UNIQUE(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 2)?;
        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 2);
        assert_eq!(index.columns[0].name, "a");
        assert_eq!(index.columns[1].name, "b");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        Ok(())
    }
    #[test]
    fn test_automatic_index_unique_and_a_pk() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a NUMERIC UNIQUE UNIQUE, b TEXT PRIMARY KEY)"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let mut indexes = vec![
            Index::automatic_from_unique(
                &table,
                ("sqlite_autoindex_t1_1".to_string(), 2),
                vec![(0, SortOrder::Asc)],
            )?,
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_2".to_string(), 3), 1)?,
        ];
        assert!(indexes.len() == 2);
        let index = indexes.pop().unwrap();
        assert_eq!(index.name, "sqlite_autoindex_t1_2");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 3);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 1);
        assert_eq!(index.columns[0].name, "b");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        let index = indexes.pop().unwrap();
        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 1);
        assert_eq!(index.columns[0].name, "a");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        Ok(())
    }
}