Merge 'Fix automatic indexes' from Jussi Saurio

Closes #2993
## Background
When a `CREATE TABLE` statement specifies constraints like `col UNIQUE`,
`col PRIMARY KEY`, `UNIQUE (col1, col2)`, `PRIMARY KEY(col3, col4)`,
SQLite creates indexes for these constraints automatically with the
naming scheme `sqlite_autoindex_<table_name>_<increasing_number>`.
## Problem
SQLite expects these indexes to be created in table definition order.
For example:
```sql
CREATE TABLE t(x UNIQUE, y PRIMARY KEY, c, d, UNIQUE(c,d));
```
Should result in:
```sql
sqlite_autoindex_t_1 -- x UNIQUE
sqlite_autoindex_t_2 -- y PRIMARY KEY
sqlite_autoindex_t_3 -- UNIQUE(c,d)
```
However, `tursodb` currently doesn't uphold this invariant — for
example, the PRIMARY KEY index is always constructed first. SQLite flags
this as a corruption error (see #2993).
## Solution
- Process "unique sets" in table definition order. "Unique sets" are
groups of 1-n columns that are part of either a UNIQUE or a PRIMARY KEY
constraint.
- Deduplicate unique sets properly: a PRIMARY KEY of a rowid alias
(INTEGER PRIMARY KEY) is not a unique set. `UNIQUE (a desc, b)` and
`PRIMARY KEY(a, b)` are a single unique set, not two.
- Unify logic for creating automatic indexes and parsing them - remove
separate logic in `check_automatic_pk_index_required()` and use the
existing `create_table()` utility in both index creation and
deserialization.
- Deserialize a single automatic index per unique set, and assert that
`unique_sets.len() == autoindexes.len()`.
- Verify consistent behavior by adding a fuzz test that creates 1000
databases with 1 table each and runs `PRAGMA integrity_check` on all of
them with SQLite.
## Trivia
Apart from fixing the exact issue #2993, this PR also fixes other bugs
related to autoindex construction - namely cases where too many indexes
were created due to improper deduplication of unique sets.

Reviewed-by: Preston Thorpe <preston@turso.tech>

Closes #3018
This commit is contained in:
Preston Thorpe
2025-09-11 17:04:53 -04:00
committed by GitHub
10 changed files with 446 additions and 548 deletions

View File

@@ -1309,7 +1309,7 @@ mod tests {
],
has_rowid: true,
is_strict: false,
unique_sets: None,
unique_sets: vec![],
};
schema.add_btree_table(Arc::new(users_table));
let sales_table = BTreeTable {
@@ -1344,7 +1344,7 @@ mod tests {
],
has_rowid: true,
is_strict: false,
unique_sets: None,
unique_sets: vec![],
};
schema.add_btree_table(Arc::new(sales_table));

View File

@@ -26,7 +26,7 @@ use crate::{
use crate::{util::normalize_ident, Result};
use core::fmt;
use std::cell::RefCell;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;
use std::sync::Arc;
@@ -34,7 +34,7 @@ use std::sync::Mutex;
use tracing::trace;
use turso_parser::ast::{self, ColumnDefinition, Expr, Literal, SortOrder, TableOptions};
use turso_parser::{
ast::{Cmd, CreateTableBody, QualifiedName, ResultColumn, Stmt},
ast::{Cmd, CreateTableBody, ResultColumn, Stmt},
parser::Parser,
};
@@ -396,16 +396,88 @@ impl Schema {
for automatic_index in automatic_indices {
if !self.indexes_enabled() {
self.table_set_has_index(&automatic_index.0);
} else {
let table = self.get_btree_table(&automatic_index.0).unwrap();
let ret_index = Index::automatic_from_primary_key_and_unique(
table.as_ref(),
automatic_index.1,
)?;
for index in ret_index {
self.add_index(Arc::new(index));
continue;
}
// Autoindexes must be parsed in definition order.
// The SQL statement parser enforces that the column definitions come first, and compounds are defined after that,
// e.g. CREATE TABLE t (a, b, UNIQUE(a, b)), and you can't do something like CREATE TABLE t (a, b, UNIQUE(a, b), c);
// Hence, we can process the singles first (unique_set.columns.len() == 1), and then the compounds (unique_set.columns.len() > 1).
let table = self.get_btree_table(&automatic_index.0).unwrap();
let mut automatic_indexes = automatic_index.1;
automatic_indexes.reverse(); // reverse so we can pop() without shifting array elements, while still processing in left-to-right order
let mut pk_index_added = false;
for unique_set in table.unique_sets.iter().filter(|us| us.columns.len() == 1) {
let col_name = &unique_set.columns.first().unwrap().0;
let Some((pos_in_table, column)) = table.get_column(col_name) else {
return Err(LimboError::ParseError(format!(
"Column {col_name} not found in table {}",
table.name
)));
};
if column.primary_key && unique_set.is_primary_key {
if column.is_rowid_alias {
// rowid alias, no index needed
continue;
}
assert!(table.primary_key_columns.first().unwrap().0 == *col_name, "trying to add a primary key index for column that is not the first column in the primary key: {} != {}", table.primary_key_columns.first().unwrap().0, col_name);
// Add single column primary key index
assert!(
!pk_index_added,
"trying to add a second primary key index for table {}",
table.name
);
pk_index_added = true;
self.add_index(Arc::new(Index::automatic_from_primary_key(
table.as_ref(),
automatic_indexes.pop().unwrap(),
1,
)?));
} else {
// Add single column unique index
self.add_index(Arc::new(Index::automatic_from_unique(
table.as_ref(),
automatic_indexes.pop().unwrap(),
vec![(pos_in_table, unique_set.columns.first().unwrap().1)],
)?));
}
}
for unique_set in table.unique_sets.iter().filter(|us| us.columns.len() > 1) {
if unique_set.is_primary_key {
assert!(table.primary_key_columns.len() == unique_set.columns.len(), "trying to add a {}-column primary key index for table {}, but the table has {} primary key columns", unique_set.columns.len(), table.name, table.primary_key_columns.len());
// Add composite primary key index
assert!(
!pk_index_added,
"trying to add a second primary key index for table {}",
table.name
);
pk_index_added = true;
self.add_index(Arc::new(Index::automatic_from_primary_key(
table.as_ref(),
automatic_indexes.pop().unwrap(),
unique_set.columns.len(),
)?));
} else {
// Add composite unique index
let mut column_indices_and_sort_orders =
Vec::with_capacity(unique_set.columns.len());
for (col_name, sort_order) in unique_set.columns.iter() {
let Some((pos_in_table, _)) = table.get_column(col_name) else {
return Err(crate::LimboError::ParseError(format!(
"Column {} not found in table {}",
col_name, table.name
)));
};
column_indices_and_sort_orders.push((pos_in_table, *sort_order));
}
self.add_index(Arc::new(Index::automatic_from_unique(
table.as_ref(),
automatic_indexes.pop().unwrap(),
column_indices_and_sort_orders,
)?));
}
}
assert!(automatic_indexes.is_empty(), "all automatic indexes parsed from sqlite_schema should have been consumed, but {} remain", automatic_indexes.len());
}
Ok(())
}
@@ -437,7 +509,7 @@ impl Schema {
primary_key_columns: Vec::new(),
has_rowid: true,
is_strict: false,
unique_sets: None,
unique_sets: vec![],
})));
self.add_materialized_view(incremental_view, table, sql);
@@ -739,6 +811,12 @@ impl PartialEq for Table {
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct UniqueSet {
pub columns: Vec<(String, SortOrder)>,
pub is_primary_key: bool,
}
#[derive(Clone, Debug)]
pub struct BTreeTable {
pub root_page: usize,
@@ -747,7 +825,7 @@ pub struct BTreeTable {
pub columns: Vec<Column>,
pub has_rowid: bool,
pub is_strict: bool,
pub unique_sets: Option<Vec<Vec<(String, SortOrder)>>>,
pub unique_sets: Vec<UniqueSet>,
}
impl BTreeTable {
@@ -783,7 +861,7 @@ impl BTreeTable {
let cmd = parser.next_cmd()?;
match cmd {
Some(Cmd::Stmt(Stmt::CreateTable { tbl_name, body, .. })) => {
create_table(tbl_name, body, root_page)
create_table(tbl_name.name.as_str(), &body, root_page)
}
_ => unreachable!("Expected CREATE TABLE statement"),
}
@@ -873,43 +951,18 @@ pub struct FromClauseSubquery {
pub result_columns_start_reg: Option<usize>,
}
#[derive(Debug, Eq)]
struct UniqueColumnProps {
column_name: String,
order: SortOrder,
}
impl PartialEq for UniqueColumnProps {
fn eq(&self, other: &Self) -> bool {
self.column_name.eq(&other.column_name)
}
}
impl PartialOrd for UniqueColumnProps {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for UniqueColumnProps {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.column_name.cmp(&other.column_name)
}
}
fn create_table(
tbl_name: QualifiedName,
body: CreateTableBody,
pub fn create_table(
tbl_name: &str,
body: &CreateTableBody,
root_page: usize,
) -> Result<BTreeTable> {
let table_name = normalize_ident(tbl_name.name.as_str());
let table_name = normalize_ident(tbl_name);
trace!("Creating table {}", table_name);
let mut has_rowid = true;
let mut primary_key_columns = vec![];
let mut cols = vec![];
let is_strict: bool;
// BtreeSet here to preserve order of inserted keys
let mut unique_sets: Vec<BTreeSet<UniqueColumnProps>> = vec![];
let mut unique_sets: Vec<UniqueSet> = vec![];
match body {
CreateTableBody::ColumnsAndConstraints {
columns,
@@ -918,7 +971,7 @@ fn create_table(
} => {
is_strict = options.contains(TableOptions::STRICT);
for c in constraints {
if let ast::TableConstraint::PrimaryKey { columns, .. } = c.constraint {
if let ast::TableConstraint::PrimaryKey { columns, .. } = &c.constraint {
for column in columns {
let col_name = match column.expr.as_ref() {
Expr::Id(id) => normalize_ident(id.as_str()),
@@ -932,29 +985,30 @@ fn create_table(
primary_key_columns
.push((col_name, column.order.unwrap_or(SortOrder::Asc)));
}
unique_sets.push(UniqueSet {
columns: primary_key_columns.clone(),
is_primary_key: true,
});
} else if let ast::TableConstraint::Unique {
columns,
conflict_clause,
} = c.constraint
} = &c.constraint
{
if conflict_clause.is_some() {
unimplemented!("ON CONFLICT not implemented");
}
let unique_set = columns
.into_iter()
.map(|column| {
let column_name = match column.expr.as_ref() {
Expr::Id(id) => normalize_ident(id.as_str()),
_ => {
todo!("Unsupported unique expression");
}
};
UniqueColumnProps {
column_name,
order: column.order.unwrap_or(SortOrder::Asc),
}
})
.collect();
let unique_set = UniqueSet {
columns: columns
.iter()
.map(|column| {
(
column.expr.as_ref().to_string(),
column.order.unwrap_or(SortOrder::Asc),
)
})
.collect(),
is_primary_key: false,
};
unique_sets.push(unique_set);
}
}
@@ -962,7 +1016,7 @@ fn create_table(
col_name,
col_type,
constraints,
} in &columns
} in columns
{
let name = col_name.as_str().to_string();
// Regular sqlite tables have an integer rowid that uniquely identifies a row.
@@ -1002,6 +1056,10 @@ fn create_table(
if let Some(o) = o {
order = o;
}
unique_sets.push(UniqueSet {
columns: vec![(name.clone(), order)],
is_primary_key: true,
});
}
ast::ColumnConstraint::NotNull { nullable, .. } => {
notnull = !nullable;
@@ -1013,6 +1071,10 @@ fn create_table(
unimplemented!("ON CONFLICT not implemented");
}
unique = true;
unique_sets.push(UniqueSet {
columns: vec![(name.clone(), order)],
is_primary_key: false,
});
}
ast::ColumnConstraint::Collate { ref collation_name } => {
collation = Some(CollationSeq::new(collation_name.as_str())?);
@@ -1049,13 +1111,30 @@ fn create_table(
}
CreateTableBody::AsSelect(_) => todo!(),
};
// flip is_rowid_alias back to false if the table has multiple primary keys
// flip is_rowid_alias back to false if the table has multiple primary key columns
// or if the table has no rowid
if !has_rowid || primary_key_columns.len() > 1 {
for col in cols.iter_mut() {
col.is_rowid_alias = false;
}
}
for col in cols.iter() {
if col.is_rowid_alias {
// Unique sets are used for creating automatic indexes. An index is not created for a rowid alias PRIMARY KEY.
// However, an index IS created for a rowid alias UNIQUE, e.g. CREATE TABLE t(x INTEGER PRIMARY KEY, UNIQUE(x))
let unique_set_w_only_rowid_alias = unique_sets.iter().position(|us| {
us.is_primary_key
&& us.columns.len() == 1
&& &us.columns.first().unwrap().0 == col.name.as_ref().unwrap()
});
if let Some(u) = unique_set_w_only_rowid_alias {
unique_sets.remove(u);
}
}
}
Ok(BTreeTable {
root_page,
name: table_name,
@@ -1063,21 +1142,34 @@ fn create_table(
primary_key_columns,
columns: cols,
is_strict,
unique_sets: if unique_sets.is_empty() {
None
} else {
// Sort first so that dedup operation removes all duplicates
unique_sets.dedup();
Some(
unique_sets
.into_iter()
.map(|set| {
set.into_iter()
.map(|UniqueColumnProps { column_name, order }| (column_name, order))
.collect()
})
.collect(),
)
unique_sets: {
// If there are any unique sets that have identical column names in the same order (even if they are PRIMARY KEY and UNIQUE and have different sort orders), remove the duplicates.
// Examples:
// PRIMARY KEY (a, b) and UNIQUE (a desc, b) are the same
// PRIMARY KEY (a, b) and UNIQUE (b, a) are not the same
// Using a n^2 monkey algorithm here because n is small, CPUs are fast, life is short, and most importantly:
// we want to preserve the order of the sets -- automatic index names in sqlite_schema must be in definition order.
let mut i = 0;
while i < unique_sets.len() {
let mut j = i + 1;
while j < unique_sets.len() {
let lengths_equal =
unique_sets[i].columns.len() == unique_sets[j].columns.len();
if lengths_equal
&& unique_sets[i]
.columns
.iter()
.zip(unique_sets[j].columns.iter())
.all(|((a_name, _), (b_name, _))| a_name == b_name)
{
unique_sets.remove(j);
} else {
j += 1;
}
}
i += 1;
}
unique_sets
},
})
}
@@ -1438,7 +1530,7 @@ pub fn sqlite_schema_table() -> BTreeTable {
hidden: false,
},
],
unique_sets: None,
unique_sets: vec![],
}
}
@@ -1519,184 +1611,81 @@ impl Index {
}
}
/// The order of index returned should be kept the same
///
/// If the order of the index returned changes, this is a breaking change
///
/// In the future when we support Alter Column, we should revisit a way to make this less dependent on ordering
pub fn automatic_from_primary_key_and_unique(
pub fn automatic_from_primary_key(
table: &BTreeTable,
auto_indices: Vec<(String, usize)>,
) -> Result<Vec<Index>> {
assert!(!auto_indices.is_empty());
let mut indices = Vec::with_capacity(auto_indices.len());
// The number of auto_indices in create table should match in the number of indices we calculate in this function
let mut auto_indices = auto_indices.into_iter();
// TODO: see a better way to please Rust type system with iterators here
// I wanted to just chain the iterator above but Rust type system get's messy with Iterators.
// It would not allow me chain them even by using a core::iter::empty()
// To circumvent this, I'm having to allocate a second Vec, and extend the other from it.
auto_index: (String, usize), // name, root_page
column_count: usize,
) -> Result<Index> {
let has_primary_key_index =
table.get_rowid_alias_column().is_none() && !table.primary_key_columns.is_empty();
if has_primary_key_index {
let (index_name, root_page) = auto_indices.next().expect(
"number of auto_indices in schema should be same number of indices calculated",
);
assert!(has_primary_key_index);
let (index_name, root_page) = auto_index;
let primary_keys = table
.primary_key_columns
.iter()
.map(|(col_name, order)| {
// Verify that each primary key column exists in the table
let Some((pos_in_table, _)) = table.get_column(col_name) else {
// This is clearly an invariant that should be maintained, so a panic seems more correct here
panic!(
"Column {} is in index {} but not found in table {}",
col_name, index_name, table.name
);
};
let (_, column) = table.get_column(col_name).unwrap();
IndexColumn {
name: normalize_ident(col_name),
order: *order,
pos_in_table,
collation: column.collation,
default: column.default.clone(),
}
})
.collect::<Vec<_>>();
indices.push(Index {
name: normalize_ident(index_name.as_str()),
table_name: table.name.clone(),
root_page,
columns: primary_keys,
unique: true,
ephemeral: false,
has_rowid: table.has_rowid,
let mut primary_keys = Vec::with_capacity(column_count);
for (col_name, order) in table.primary_key_columns.iter() {
let Some((pos_in_table, _)) = table.get_column(col_name) else {
return Err(crate::LimboError::ParseError(format!(
"Column {} not found in table {}",
col_name, table.name
)));
};
let (_, column) = table.get_column(col_name).unwrap();
primary_keys.push(IndexColumn {
name: normalize_ident(col_name),
order: *order,
pos_in_table,
collation: column.collation,
default: column.default.clone(),
});
}
// Each unique col needs its own index
let unique_indices = table
assert!(primary_keys.len() == column_count);
Ok(Index {
name: normalize_ident(index_name.as_str()),
table_name: table.name.clone(),
root_page,
columns: primary_keys,
unique: true,
ephemeral: false,
has_rowid: table.has_rowid,
})
}
pub fn automatic_from_unique(
table: &BTreeTable,
auto_index: (String, usize), // name, root_page
column_indices_and_sort_orders: Vec<(usize, SortOrder)>,
) -> Result<Index> {
let (index_name, root_page) = auto_index;
let unique_cols = table
.columns
.iter()
.enumerate()
.filter_map(|(pos_in_table, col)| {
if col.unique {
// Unique columns in Table should always be named
let col_name = col.name.as_ref().unwrap();
if has_primary_key_index
&& table.primary_key_columns.len() == 1
&& &table.primary_key_columns.first().as_ref().unwrap().0 == col_name {
// skip unique columns that are satisfied with pk constraint
return None;
}
let (index_name, root_page) = auto_indices.next().expect("number of auto_indices in schema should be same number of indices calculated");
let (_, column) = table.get_column(col_name).unwrap();
Some(Index {
name: normalize_ident(index_name.as_str()),
table_name: table.name.clone(),
root_page,
columns: vec![IndexColumn {
name: normalize_ident(col_name),
order: SortOrder::Asc, // Default Sort Order
pos_in_table,
collation: column.collation,
default: column.default.clone(),
}],
unique: true,
ephemeral: false,
has_rowid: table.has_rowid,
})
} else {
None
}
});
indices.extend(unique_indices);
if table.primary_key_columns.is_empty() && indices.is_empty() && table.unique_sets.is_none()
{
return Err(crate::LimboError::InternalError(
"Cannot create automatic index for table without primary key or unique constraint"
.to_string(),
));
}
// Invariant: We should not create an automatic index on table with a single column as rowid_alias
// and no Unique columns.
// e.g CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);
// If this happens, the caller incorrectly called this function
if table.get_rowid_alias_column().is_some()
&& indices.is_empty()
&& table.unique_sets.is_none()
{
panic!("should not create an automatic index on table with a single column as rowid_alias and no UNIQUE columns");
}
if let Some(unique_sets) = table.unique_sets.as_ref() {
let unique_set_indices = unique_sets
.iter()
.filter(|set| {
if has_primary_key_index
&& table.primary_key_columns.len() == set.len()
&& table
.primary_key_columns
.iter()
.all(|col| set.contains(col))
{
// skip unique columns that are satisfied with pk constraint
false
} else {
true
}
let (pos_in_table, sort_order) = column_indices_and_sort_orders
.iter()
.find(|(pos, _)| *pos == pos_in_table)?;
Some(IndexColumn {
name: normalize_ident(col.name.as_ref().unwrap()),
order: *sort_order,
pos_in_table: *pos_in_table,
collation: col.collation,
default: col.default.clone(),
})
.map(|set| {
let (index_name, root_page) = auto_indices.next().expect(
"number of auto_indices in schema should be same number of indices calculated",
);
})
.collect::<Vec<_>>();
let index_cols = set.iter().map(|(col_name, order)| {
let Some((pos_in_table, _)) = table.get_column(col_name) else {
// This is clearly an invariant that should be maintained, so a panic seems more correct here
panic!(
"Column {} is in index {} but not found in table {}",
col_name, index_name, table.name
);
};
let (_, column) = table.get_column(col_name).unwrap();
IndexColumn {
name: normalize_ident(col_name),
order: *order,
pos_in_table,
collation: column.collation,
default: column.default.clone(),
}
});
Index {
name: normalize_ident(index_name.as_str()),
table_name: table.name.clone(),
root_page,
columns: index_cols.collect(),
unique: true,
ephemeral: false,
has_rowid: table.has_rowid,
}
});
indices.extend(unique_set_indices);
}
if auto_indices.next().is_some() {
panic!("number of auto_indices in schema should be same number of indices calculated");
}
Ok(indices)
Ok(Index {
name: normalize_ident(index_name.as_str()),
table_name: table.name.clone(),
root_page,
columns: unique_cols,
unique: true,
ephemeral: false,
has_rowid: table.has_rowid,
})
}
/// Given a column position in the table, return the position in the index.
@@ -1715,7 +1704,6 @@ impl Index {
#[cfg(test)]
mod tests {
use super::*;
use crate::LimboError;
#[test]
pub fn test_has_rowid_true() -> Result<()> {
@@ -2001,24 +1989,18 @@ mod tests {
// Without composite primary keys, we should not have an automatic index on a primary key that is a rowid alias
let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
let table = BTreeTable::from_sql(sql, 0).unwrap();
let _index = Index::automatic_from_primary_key_and_unique(
&table,
vec![("sqlite_autoindex_t1_1".to_string(), 2)],
)
.unwrap();
let _index =
Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)
.unwrap();
}
#[test]
fn test_automatic_index_composite_key() -> Result<()> {
let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#;
let table = BTreeTable::from_sql(sql, 0)?;
let mut index = Index::automatic_from_primary_key_and_unique(
&table,
vec![("sqlite_autoindex_t1_1".to_string(), 2)],
)?;
let index =
Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 2)?;
assert!(index.len() == 1);
let index = index.pop().unwrap();
assert_eq!(index.name, "sqlite_autoindex_t1_1");
assert_eq!(index.table_name, "t1");
assert_eq!(index.root_page, 2);
@@ -2032,24 +2014,15 @@ mod tests {
}
#[test]
fn test_automatic_index_no_primary_key() -> Result<()> {
#[should_panic]
fn test_automatic_index_no_primary_key() {
let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT);"#;
let table = BTreeTable::from_sql(sql, 0)?;
let result = Index::automatic_from_primary_key_and_unique(
&table,
vec![("sqlite_autoindex_t1_1".to_string(), 2)],
);
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
LimboError::InternalError(msg) if msg.contains("without primary key")
));
Ok(())
let table = BTreeTable::from_sql(sql, 0).unwrap();
Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)
.unwrap();
}
#[test]
#[should_panic]
fn test_automatic_index_nonexistent_column() {
// Create a table with a primary key column that doesn't exist in the table
let table = BTreeTable {
@@ -2070,27 +2043,24 @@ mod tests {
collation: None,
hidden: false,
}],
unique_sets: None,
unique_sets: vec![],
};
let _result = Index::automatic_from_primary_key_and_unique(
&table,
vec![("sqlite_autoindex_t1_1".to_string(), 2)],
);
let result =
Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1);
assert!(result.is_err());
}
#[test]
fn test_automatic_index_unique_column() -> Result<()> {
let sql = r#"CREATE table t1 (x INTEGER, y INTEGER UNIQUE);"#;
let table = BTreeTable::from_sql(sql, 0)?;
let mut index = Index::automatic_from_primary_key_and_unique(
let index = Index::automatic_from_unique(
&table,
vec![("sqlite_autoindex_t1_1".to_string(), 2)],
("sqlite_autoindex_t1_1".to_string(), 2),
vec![(1, SortOrder::Asc)],
)?;
assert!(index.len() == 1);
let index = index.pop().unwrap();
assert_eq!(index.name, "sqlite_autoindex_t1_1");
assert_eq!(index.table_name, "t1");
assert_eq!(index.root_page, 2);
@@ -2105,29 +2075,30 @@ mod tests {
fn test_automatic_index_pkey_unique_column() -> Result<()> {
let sql = r#"CREATE TABLE t1 (x PRIMARY KEY, y UNIQUE);"#;
let table = BTreeTable::from_sql(sql, 0)?;
let auto_indices = vec![
("sqlite_autoindex_t1_1".to_string(), 2),
("sqlite_autoindex_t1_2".to_string(), 3),
let indices = [
Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)?,
Index::automatic_from_unique(
&table,
("sqlite_autoindex_t1_2".to_string(), 3),
vec![(1, SortOrder::Asc)],
)?,
];
let indices = Index::automatic_from_primary_key_and_unique(&table, auto_indices.clone())?;
assert!(indices.len() == auto_indices.len());
assert_eq!(indices[0].name, "sqlite_autoindex_t1_1");
assert_eq!(indices[0].table_name, "t1");
assert_eq!(indices[0].root_page, 2);
assert!(indices[0].unique);
assert_eq!(indices[0].columns.len(), 1);
assert_eq!(indices[0].columns[0].name, "x");
assert!(matches!(indices[0].columns[0].order, SortOrder::Asc));
for (pos, index) in indices.iter().enumerate() {
let (index_name, root_page) = &auto_indices[pos];
assert_eq!(index.name, *index_name);
assert_eq!(index.table_name, "t1");
assert_eq!(index.root_page, *root_page);
assert!(index.unique);
assert_eq!(index.columns.len(), 1);
if pos == 0 {
assert_eq!(index.columns[0].name, "x");
} else if pos == 1 {
assert_eq!(index.columns[0].name, "y");
}
assert!(matches!(index.columns[0].order, SortOrder::Asc));
}
assert_eq!(indices[1].name, "sqlite_autoindex_t1_2");
assert_eq!(indices[1].table_name, "t1");
assert_eq!(indices[1].root_page, 3);
assert!(indices[1].unique);
assert_eq!(indices[1].columns.len(), 1);
assert_eq!(indices[1].columns[0].name, "y");
assert!(matches!(indices[1].columns[0].order, SortOrder::Asc));
Ok(())
}
@@ -2136,12 +2107,24 @@ mod tests {
fn test_automatic_index_pkey_many_unique_columns() -> Result<()> {
let sql = r#"CREATE TABLE t1 (a PRIMARY KEY, b UNIQUE, c, d, UNIQUE(c, d));"#;
let table = BTreeTable::from_sql(sql, 0)?;
let auto_indices = vec![
let auto_indices = [
("sqlite_autoindex_t1_1".to_string(), 2),
("sqlite_autoindex_t1_2".to_string(), 3),
("sqlite_autoindex_t1_2".to_string(), 4),
("sqlite_autoindex_t1_3".to_string(), 4),
];
let indices = vec![
Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)?,
Index::automatic_from_unique(
&table,
("sqlite_autoindex_t1_2".to_string(), 3),
vec![(1, SortOrder::Asc)],
)?,
Index::automatic_from_unique(
&table,
("sqlite_autoindex_t1_3".to_string(), 4),
vec![(2, SortOrder::Asc), (3, SortOrder::Asc)],
)?,
];
let indices = Index::automatic_from_primary_key_and_unique(&table, auto_indices.clone())?;
assert!(indices.len() == auto_indices.len());
@@ -2174,14 +2157,12 @@ mod tests {
fn test_automatic_index_unique_set_dedup() -> Result<()> {
let sql = r#"CREATE TABLE t1 (a, b, UNIQUE(a, b), UNIQUE(a, b));"#;
let table = BTreeTable::from_sql(sql, 0)?;
let mut index = Index::automatic_from_primary_key_and_unique(
let index = Index::automatic_from_unique(
&table,
vec![("sqlite_autoindex_t1_1".to_string(), 2)],
("sqlite_autoindex_t1_1".to_string(), 2),
vec![(0, SortOrder::Asc), (1, SortOrder::Asc)],
)?;
assert!(index.len() == 1);
let index = index.pop().unwrap();
assert_eq!(index.name, "sqlite_autoindex_t1_1");
assert_eq!(index.table_name, "t1");
assert_eq!(index.root_page, 2);
@@ -2199,13 +2180,8 @@ mod tests {
fn test_automatic_index_primary_key_is_unique() -> Result<()> {
let sql = r#"CREATE TABLE t1 (a primary key unique);"#;
let table = BTreeTable::from_sql(sql, 0)?;
let mut index = Index::automatic_from_primary_key_and_unique(
&table,
vec![("sqlite_autoindex_t1_1".to_string(), 2)],
)?;
assert!(index.len() == 1);
let index = index.pop().unwrap();
let index =
Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)?;
assert_eq!(index.name, "sqlite_autoindex_t1_1");
assert_eq!(index.table_name, "t1");
@@ -2222,13 +2198,8 @@ mod tests {
fn test_automatic_index_primary_key_is_unique_and_composite() -> Result<()> {
let sql = r#"CREATE TABLE t1 (a, b, PRIMARY KEY(a, b), UNIQUE(a, b));"#;
let table = BTreeTable::from_sql(sql, 0)?;
let mut index = Index::automatic_from_primary_key_and_unique(
&table,
vec![("sqlite_autoindex_t1_1".to_string(), 2)],
)?;
assert!(index.len() == 1);
let index = index.pop().unwrap();
let index =
Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 2)?;
assert_eq!(index.name, "sqlite_autoindex_t1_1");
assert_eq!(index.table_name, "t1");
@@ -2246,13 +2217,14 @@ mod tests {
fn test_automatic_index_unique_and_a_pk() -> Result<()> {
let sql = r#"CREATE TABLE t1 (a NUMERIC UNIQUE UNIQUE, b TEXT PRIMARY KEY)"#;
let table = BTreeTable::from_sql(sql, 0)?;
let mut indexes = Index::automatic_from_primary_key_and_unique(
&table,
vec![
let mut indexes = vec![
Index::automatic_from_unique(
&table,
("sqlite_autoindex_t1_1".to_string(), 2),
("sqlite_autoindex_t1_2".to_string(), 3),
],
)?;
vec![(0, SortOrder::Asc)],
)?,
Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_2".to_string(), 3), 1)?,
];
assert!(indexes.len() == 2);
let index = indexes.pop().unwrap();
@@ -2261,7 +2233,7 @@ mod tests {
assert_eq!(index.root_page, 3);
assert!(index.unique);
assert_eq!(index.columns.len(), 1);
assert_eq!(index.columns[0].name, "a");
assert_eq!(index.columns[0].name, "b");
assert!(matches!(index.columns[0].order, SortOrder::Asc));
let index = indexes.pop().unwrap();
@@ -2270,7 +2242,7 @@ mod tests {
assert_eq!(index.root_page, 2);
assert!(index.unique);
assert_eq!(index.columns.len(), 1);
assert_eq!(index.columns[0].name, "b");
assert_eq!(index.columns[0].name, "a");
assert!(matches!(index.columns[0].order, SortOrder::Asc));
Ok(())

View File

@@ -74,11 +74,10 @@ pub fn translate_alter_table(
}
if column.unique
|| btree.unique_sets.as_ref().is_some_and(|set| {
set.iter().any(|set| {
set.iter()
.any(|(name, _)| name == &normalize_ident(column_name))
})
|| btree.unique_sets.iter().any(|set| {
set.columns
.iter()
.any(|(name, _)| name == &normalize_ident(column_name))
})
{
return Err(LimboError::ParseError(format!(

View File

@@ -1903,7 +1903,7 @@ mod tests {
],
has_rowid: true,
is_strict: false,
unique_sets: None,
unique_sets: vec![],
};
schema.add_btree_table(Arc::new(users_table));
@@ -1964,7 +1964,7 @@ mod tests {
],
has_rowid: true,
is_strict: false,
unique_sets: None,
unique_sets: vec![],
};
schema.add_btree_table(Arc::new(orders_table));

View File

@@ -1654,7 +1654,7 @@ mod tests {
columns,
has_rowid: true,
is_strict: false,
unique_sets: None,
unique_sets: vec![],
})
}

View File

@@ -549,7 +549,7 @@ fn parse_table(
primary_key_columns: Vec::new(),
has_rowid: true,
is_strict: false,
unique_sets: None,
unique_sets: vec![],
});
drop(view_guard);

View File

@@ -1,10 +1,10 @@
use std::collections::HashSet;
use std::ops::Range;
use std::rc::Rc;
use std::sync::Arc;
use crate::ast;
use crate::ext::VTabImpl;
use crate::schema::create_table;
use crate::schema::BTreeTable;
use crate::schema::Column;
use crate::schema::Schema;
@@ -23,7 +23,6 @@ use crate::util::PRIMARY_KEY_AUTOMATIC_INDEX_NAME_PREFIX;
use crate::vdbe::builder::CursorType;
use crate::vdbe::insn::Cookie;
use crate::vdbe::insn::{CmpInsFlags, InsertFlags, Insn};
use crate::LimboError;
use crate::SymbolTable;
use crate::{bail_parse_error, Result};
@@ -289,12 +288,6 @@ pub fn emit_schema_entry(
Ok(())
}
#[derive(Debug)]
struct PrimaryKeyColumnInfo<'a> {
name: &'a str,
is_descending: bool,
}
/// Check if an automatic PRIMARY KEY index is required for the table.
/// If so, create a register for the index root page and return it.
///
// NOTE(review): the span below is an unresolved diff rendering. The old
// implementation (deleted in this commit), the removed PrimaryKeyDefinitionType
// enum, and the new create_table()-based implementation are interleaved.
// The hunk header carries the `fn check_automatic_pk_index_required(` line,
// so the first parameter (presumably `body: &ast::CreateTableBody` — TODO
// confirm against the repository) is not visible here.
@@ -309,202 +302,12 @@ fn check_automatic_pk_index_required(
    program: &mut ProgramBuilder,
    tbl_name: &str,
) -> Result<Option<Range<usize>>> {
    // NOTE(review): OLD implementation begins here (removed by this commit).
    match body {
        ast::CreateTableBody::ColumnsAndConstraints {
            columns,
            constraints,
            options,
        } => {
            let mut primary_key_definition = None;
            // Used to dedup named unique constraints
            let mut unique_sets = vec![];
            // Check table constraints for PRIMARY KEY
            for constraint in constraints {
                if let ast::TableConstraint::PrimaryKey {
                    columns: pk_cols, ..
                } = &constraint.constraint
                {
                    // SQLite allows at most one PRIMARY KEY per table.
                    if primary_key_definition.is_some() {
                        bail_parse_error!("table {} has more than one primary key", tbl_name);
                    }
                    let primary_key_column_results = pk_cols
                        .iter()
                        .map(|col| match col.expr.as_ref() {
                            ast::Expr::Id(name) => {
                                // Every PK column must name a declared table column.
                                if !columns.iter().any(
                                    |ast::ColumnDefinition { col_name, .. }| {
                                        col_name.as_str() == name.as_str()
                                    },
                                ) {
                                    bail_parse_error!("No such column: {}", name.as_str());
                                }
                                Ok(PrimaryKeyColumnInfo {
                                    name: name.as_str(),
                                    is_descending: matches!(col.order, Some(ast::SortOrder::Desc)),
                                })
                            }
                            _ => Err(LimboError::ParseError(
                                "expressions prohibited in PRIMARY KEY and UNIQUE constraints"
                                    .to_string(),
                            )),
                        })
                        .collect::<Result<Vec<_>>>()?;
                    for pk_info in primary_key_column_results {
                        let column_name = pk_info.name;
                        let column_def = columns
                            .iter()
                            .find(|ast::ColumnDefinition { col_name, .. }| {
                                col_name.as_str() == column_name
                            })
                            .expect("primary key column should be in Create Body columns");
                        match &mut primary_key_definition {
                            // Second PK column seen: upgrade Simple -> Composite.
                            Some(PrimaryKeyDefinitionType::Simple { column, .. }) => {
                                let mut columns = HashSet::new();
                                columns.insert(std::mem::take(column));
                                // Have to also insert the current column_name we are iterating over in primary_key_column_results
                                columns.insert(column_name.to_string());
                                primary_key_definition =
                                    Some(PrimaryKeyDefinitionType::Composite { columns });
                            }
                            Some(PrimaryKeyDefinitionType::Composite { columns }) => {
                                columns.insert(column_name.to_string());
                            }
                            None => {
                                let typename =
                                    column_def.col_type.as_ref().map(|t| t.name.as_str());
                                let is_descending = pk_info.is_descending;
                                primary_key_definition = Some(PrimaryKeyDefinitionType::Simple {
                                    typename,
                                    is_descending,
                                    column: column_name.to_string(),
                                });
                            }
                        }
                    }
                } else if let ast::TableConstraint::Unique {
                    columns: unique_columns,
                    conflict_clause,
                } = &constraint.constraint
                {
                    if conflict_clause.is_some() {
                        unimplemented!("ON CONFLICT not implemented");
                    }
                    let col_names = unique_columns
                        .iter()
                        .map(|column| match column.expr.as_ref() {
                            turso_parser::ast::Expr::Id(id) => {
                                if !columns.iter().any(
                                    |ast::ColumnDefinition { col_name, .. }| {
                                        col_name.as_str() == id.as_str()
                                    },
                                ) {
                                    bail_parse_error!("No such column: {}", id.as_str());
                                }
                                Ok(crate::util::normalize_ident(id.as_str()))
                            }
                            _ => {
                                todo!("Unsupported unique expression");
                            }
                        })
                        .collect::<Result<HashSet<String>>>()?;
                    unique_sets.push(col_names);
                }
            }
            // Check column constraints for PRIMARY KEY and UNIQUE
            for ast::ColumnDefinition {
                col_name,
                col_type,
                constraints,
                ..
            } in columns.iter()
            {
                for constraint in constraints {
                    if matches!(
                        constraint.constraint,
                        ast::ColumnConstraint::PrimaryKey { .. }
                    ) {
                        if primary_key_definition.is_some() {
                            bail_parse_error!("table {} has more than one primary key", tbl_name);
                        }
                        let typename = col_type.as_ref().map(|t| t.name.as_str());
                        primary_key_definition = Some(PrimaryKeyDefinitionType::Simple {
                            typename,
                            is_descending: false,
                            column: col_name.as_str().to_string(),
                        });
                    } else if matches!(constraint.constraint, ast::ColumnConstraint::Unique(..)) {
                        let mut single_set = HashSet::new();
                        single_set.insert(col_name.as_str().to_string());
                        unique_sets.push(single_set);
                    }
                }
            }
            // Check if table has rowid
            if options.contains(ast::TableOptions::WITHOUT_ROWID) {
                bail_parse_error!("WITHOUT ROWID tables are not supported yet");
            }
            // NOTE(review): Vec::dedup only removes *adjacent* duplicates, so
            // non-adjacent duplicate unique sets survived — one of the bugs
            // this commit fixes by moving dedup logic into create_table().
            unique_sets.dedup();
            // Check if we need an automatic index
            let mut pk_is_unique = false;
            let auto_index_pk = if let Some(primary_key_definition) = &primary_key_definition {
                match primary_key_definition {
                    PrimaryKeyDefinitionType::Simple {
                        typename,
                        is_descending,
                        column,
                    } => {
                        pk_is_unique = unique_sets
                            .iter()
                            .any(|set| set.len() == 1 && set.contains(column));
                        let is_integer =
                            typename.is_some() && typename.unwrap().eq_ignore_ascii_case("INTEGER"); // Should match on any case of INTEGER
                        // INTEGER PRIMARY KEY ASC is a rowid alias -> no index needed.
                        !is_integer || *is_descending
                    }
                    PrimaryKeyDefinitionType::Composite { columns } => {
                        pk_is_unique = unique_sets.iter().any(|set| set == columns);
                        true
                    }
                }
            } else {
                false
            };
            let mut total_indices = unique_sets.len();
            // If the pk needs an index but the primary key is already covered by a
            // UNIQUE constraint, a single index suffices (pk index == unique index).
            if auto_index_pk && !pk_is_unique {
                total_indices += 1;
            }
            if total_indices > 0 {
                let index_start_reg = program.alloc_registers(total_indices);
                Ok(Some(index_start_reg..index_start_reg + total_indices))
            } else {
                Ok(None)
            }
        }
        ast::CreateTableBody::AsSelect(_) => {
            bail_parse_error!("CREATE TABLE AS SELECT not supported yet")
        }
    // NOTE(review): NEW implementation (post-commit) — unique-set discovery and
    // deduplication are delegated to create_table() so index creation and
    // schema deserialization share one code path.
    let table = create_table(tbl_name, body, 0)?; // abusing create_table() to avoid code duplication; we don't care about the root page here
    if table.unique_sets.is_empty() {
        return Ok(None);
    }
    }
// NOTE(review): the enum below is the OLD helper type removed by this commit;
// its closing brace is lost in the diff rendering.
#[derive(Debug)]
enum PrimaryKeyDefinitionType<'a> {
    Simple {
        column: String,
        typename: Option<&'a str>,
        is_descending: bool,
    },
    Composite {
        columns: HashSet<String>,
    },
    // NOTE(review): tail of the NEW implementation — one register per unique
    // set; each register will receive an automatic-index root page.
    let start_reg = program.alloc_registers(table.unique_sets.len());
    Ok(Some(start_reg..start_reg + table.unique_sets.len()))
}
fn create_table_body_to_str(tbl_name: &ast::QualifiedName, body: &ast::CreateTableBody) -> String {
@@ -886,7 +689,7 @@ pub fn translate_drop_table(
hidden: false,
}],
is_strict: false,
unique_sets: None,
unique_sets: vec![],
});
// cursor id 2
let ephemeral_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(simple_table_rc));

View File

@@ -286,7 +286,7 @@ pub fn prepare_update_plan(
hidden: false,
}],
is_strict: false,
unique_sets: None,
unique_sets: vec![],
});
let temp_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));

View File

@@ -75,7 +75,7 @@ pub fn translate_create_materialized_view(
primary_key_columns: vec![], // Materialized views use implicit rowid
has_rowid: true,
is_strict: false,
unique_sets: None,
unique_sets: vec![],
});
// Allocate a cursor for writing to the view's btree during population

View File

@@ -2,7 +2,7 @@ pub mod grammar_generator;
#[cfg(test)]
mod tests {
use rand::seq::IndexedRandom;
use rand::seq::{IndexedRandom, SliceRandom};
use std::collections::HashSet;
use rand::{Rng, SeedableRng};
@@ -11,7 +11,7 @@ mod tests {
use crate::{
common::{
limbo_exec_rows, limbo_exec_rows_fallible, rng_from_time, sqlite_exec_rows,
do_flush, limbo_exec_rows, limbo_exec_rows_fallible, rng_from_time, sqlite_exec_rows,
TempDatabase,
},
fuzz::grammar_generator::{const_str, rand_int, rand_str, GrammarGenerator},
@@ -792,6 +792,130 @@ mod tests {
}
}
/// Fuzz test for DDL compatibility with SQLite (see issue #2993): creates many
/// single-table databases with randomized column-level and table-level
/// PRIMARY KEY / UNIQUE constraints, then checks with real SQLite (rusqlite)
/// that `PRAGMA integrity_check` reports "ok" — i.e. the automatic indexes we
/// wrote match what SQLite expects — and that the stored schema SQL
/// round-trips byte-for-byte.
#[test]
pub fn ddl_compatibility_fuzz() {
    let _ = env_logger::try_init();
    // Seed is time-derived but included in every failure message so runs
    // are reproducible.
    let (mut rng, seed) = rng_from_time();
    const ITERATIONS: usize = 1000;
    for i in 0..ITERATIONS {
        let db = TempDatabase::new_empty(true);
        let conn = db.connect_limbo();
        let num_cols = rng.random_range(1..=5);
        let col_names: Vec<String> = (0..num_cols).map(|c| format!("c{c}")).collect();
        // Decide whether to use a table-level PRIMARY KEY (possibly compound)
        let use_table_pk = num_cols >= 1 && rng.random_bool(0.6);
        let pk_len = if use_table_pk {
            if num_cols == 1 {
                1
            } else {
                rng.random_range(1..=num_cols.min(3))
            }
        } else {
            0
        };
        let pk_cols: Vec<String> = if use_table_pk {
            // Shuffle and take a prefix to get pk_len distinct columns
            // in random order.
            let mut col_names_shuffled = col_names.clone();
            col_names_shuffled.shuffle(&mut rng);
            col_names_shuffled.iter().take(pk_len).cloned().collect()
        } else {
            Vec::new()
        };
        let mut has_primary_key = false;
        // Column definitions with optional types and column-level constraints
        let mut column_defs: Vec<String> = Vec::new();
        for name in col_names.iter() {
            let mut parts = vec![name.clone()];
            if rng.random_bool(0.7) {
                let types = ["INTEGER", "TEXT", "REAL", "BLOB", "NUMERIC"];
                let t = types[rng.random_range(0..types.len())];
                parts.push(t.to_string());
            }
            // At most one column-level PRIMARY KEY, and only when no
            // table-level PK was chosen; otherwise maybe a column UNIQUE.
            if !use_table_pk && !has_primary_key && rng.random_bool(0.3) {
                has_primary_key = true;
                parts.push("PRIMARY KEY".to_string());
            } else if rng.random_bool(0.2) {
                parts.push("UNIQUE".to_string());
            }
            column_defs.push(parts.join(" "));
        }
        // Table-level constraints: PRIMARY KEY and some UNIQUE constraints (including compound)
        let mut table_constraints: Vec<String> = Vec::new();
        if use_table_pk {
            let mut spec_parts: Vec<String> = Vec::new();
            for col in pk_cols.iter() {
                // Randomly attach an explicit sort order to each PK column.
                if rng.random_bool(0.5) {
                    let dir = if rng.random_bool(0.5) { "DESC" } else { "ASC" };
                    spec_parts.push(format!("{col} {dir}"));
                } else {
                    spec_parts.push(col.clone());
                }
            }
            table_constraints.push(format!("PRIMARY KEY ({})", spec_parts.join(", ")));
        }
        let num_uniques = if num_cols >= 2 {
            rng.random_range(0..=2)
        } else {
            rng.random_range(0..=1)
        };
        for _ in 0..num_uniques {
            let len = if num_cols == 1 {
                1
            } else {
                rng.random_range(1..=num_cols.min(3))
            };
            // A wrapping run of columns — may coincide with the PK or a
            // prior UNIQUE set, deliberately exercising deduplication.
            let start = rng.random_range(0..num_cols);
            let mut uniq_cols: Vec<String> = Vec::new();
            for k in 0..len {
                let idx = (start + k) % num_cols;
                uniq_cols.push(col_names[idx].clone());
            }
            table_constraints.push(format!("UNIQUE ({})", uniq_cols.join(", ")));
        }
        let mut elements = column_defs;
        elements.extend(table_constraints);
        let table_name = format!("t{i}");
        let create_sql = format!("CREATE TABLE {table_name} ({})", elements.join(", "));
        println!("{create_sql}");
        limbo_exec_rows(&db, &conn, &create_sql);
        // Flush so the on-disk file is complete before SQLite opens it.
        do_flush(&conn, &db).unwrap();
        // Open with rusqlite and verify integrity_check returns OK
        let sqlite_conn = rusqlite::Connection::open(db.path.clone()).unwrap();
        let rows = sqlite_exec_rows(&sqlite_conn, "PRAGMA integrity_check");
        assert!(
            !rows.is_empty(),
            "integrity_check returned no rows (seed: {seed})"
        );
        match &rows[0][0] {
            Value::Text(s) => assert!(
                s.eq_ignore_ascii_case("ok"),
                "integrity_check failed (seed: {seed}): {rows:?}",
            ),
            other => panic!("unexpected integrity_check result (seed: {seed}): {other:?}",),
        }
        // Verify the stored SQL matches the create table statement
        let conn = db.connect_limbo();
        let verify_sql = format!(
            "SELECT sql FROM sqlite_schema WHERE name = '{table_name}' and type = 'table'"
        );
        let res = limbo_exec_rows(&db, &conn, &verify_sql);
        assert!(res.len() == 1, "Expected 1 row, got {res:?}");
        let Value::Text(s) = &res[0][0] else {
            panic!("sql should be TEXT");
        };
        assert_eq!(s.as_str(), create_sql);
    }
}
#[test]
pub fn arithmetic_expression_fuzz() {
let _ = env_logger::try_init();