Properly reparse and revalidate parent and child foreign keys when altering columns

This commit is contained in:
PThorpe92
2025-10-18 13:21:59 -04:00
parent 4dcabf37f1
commit 25aa2b7190

View File

@@ -17,7 +17,9 @@ use crate::types::{
compare_immutable, compare_records_generic, Extendable, IOCompletions, ImmutableRecord,
SeekResult, Text,
};
use crate::util::normalize_ident;
use crate::util::{
normalize_ident, rewrite_column_references_if_needed, rewrite_fk_parent_cols_if_self_ref,
};
use crate::vdbe::insn::InsertFlags;
use crate::vdbe::{registers_to_ref_values, TxnCleanup};
use crate::vector::{vector32_sparse, vector_concat, vector_distance_jaccard, vector_slice};
@@ -73,7 +75,7 @@ use super::{
};
use parking_lot::RwLock;
use rand::{thread_rng, Rng};
use turso_parser::ast::{self, Name, SortOrder};
use turso_parser::ast::{self, ForeignKeyClause, Name, SortOrder};
use turso_parser::parser::Parser;
use super::{
@@ -5463,11 +5465,9 @@ pub fn op_function(
.parse_column_definition(true)
.unwrap();
let new_sql = 'sql: {
if table != tbl_name {
break 'sql None;
}
let rename_to = normalize_ident(column_def.col_name.as_str());
let new_sql = 'sql: {
let Value::Text(sql) = sql else {
break 'sql None;
};
@@ -5521,32 +5521,159 @@ pub fn op_function(
temporary,
if_not_exists,
} => {
if table != normalize_ident(tbl_name.name.as_str()) {
break 'sql None;
}
let ast::CreateTableBody::ColumnsAndConstraints {
mut columns,
constraints,
mut constraints,
options,
} = body
else {
todo!()
};
let column = columns
.iter_mut()
.find(|column| {
column.col_name.as_str() == original_rename_from.as_str()
})
.expect("column being renamed should be present");
let normalized_tbl_name = normalize_ident(tbl_name.name.as_str());
match alter_func {
AlterTableFunc::AlterColumn => *column = column_def,
AlterTableFunc::RenameColumn => {
column.col_name = column_def.col_name
if normalized_tbl_name == table {
// This is the table being altered - update its column
let column = columns
.iter_mut()
.find(|column| {
column.col_name.as_str()
== original_rename_from.as_str()
})
.expect("column being renamed should be present");
match alter_func {
AlterTableFunc::AlterColumn => *column = column_def.clone(),
AlterTableFunc::RenameColumn => {
column.col_name = column_def.col_name.clone()
}
_ => unreachable!(),
}
// Update table-level constraints (PRIMARY KEY, UNIQUE, FOREIGN KEY)
for constraint in &mut constraints {
match &mut constraint.constraint {
ast::TableConstraint::PrimaryKey {
columns: pk_cols,
..
} => {
for col in pk_cols {
let (ast::Expr::Name(ref name)
| ast::Expr::Id(ref name)) = *col.expr
else {
return Err(LimboError::ParseError("Unexpected expression in PRIMARY KEY constraint".to_string()));
};
if normalize_ident(name.as_str()) == rename_from
{
*col.expr = ast::Expr::Name(Name::exact(
column_def.col_name.as_str().to_owned(),
));
}
}
}
ast::TableConstraint::Unique {
columns: uniq_cols,
..
} => {
for col in uniq_cols {
let (ast::Expr::Name(ref name)
| ast::Expr::Id(ref name)) = *col.expr
else {
return Err(LimboError::ParseError("Unexpected expression in PRIMARY KEY constraint".to_string()));
};
if normalize_ident(name.as_str()) == rename_from
{
*col.expr = ast::Expr::Name(Name::exact(
column_def.col_name.as_str().to_owned(),
));
}
}
}
ast::TableConstraint::ForeignKey {
columns: child_cols,
clause,
..
} => {
// Update child columns in this table's FK definitions
for child_col in child_cols {
if normalize_ident(child_col.col_name.as_str())
== rename_from
{
child_col.col_name = Name::exact(
column_def.col_name.as_str().to_owned(),
);
}
}
rewrite_fk_parent_cols_if_self_ref(
clause,
&normalized_tbl_name,
&rename_from,
column_def.col_name.as_str(),
);
}
_ => {}
}
for col in &mut columns {
rewrite_column_references_if_needed(
col,
&normalized_tbl_name,
&rename_from,
column_def.col_name.as_str(),
);
}
}
} else {
// This is a different table, check if it has FKs referencing the renamed column
let mut fk_updated = false;
for constraint in &mut constraints {
if let ast::TableConstraint::ForeignKey {
columns: _,
clause:
ForeignKeyClause {
tbl_name,
columns: parent_cols,
..
},
..
} = &mut constraint.constraint
{
// Check if this FK references the table being altered
if normalize_ident(tbl_name.as_str()) == table {
// Update parent column references if they match the renamed column
for parent_col in parent_cols {
if normalize_ident(parent_col.col_name.as_str())
== rename_from
{
parent_col.col_name = Name::exact(
column_def.col_name.as_str().to_owned(),
);
fk_updated = true;
}
}
}
}
}
for col in &mut columns {
let before = fk_updated;
let mut local_col = col.clone();
rewrite_column_references_if_needed(
&mut local_col,
&table,
&rename_from,
column_def.col_name.as_str(),
);
if local_col != *col {
*col = local_col;
fk_updated = true;
}
}
// Only return updated SQL if we actually changed something
if !fk_updated {
break 'sql None;
}
_ => unreachable!(),
}
Some(
@@ -5563,7 +5690,7 @@ pub fn op_function(
.to_string(),
)
}
_ => todo!(),
_ => None,
}
};
@@ -8238,43 +8365,94 @@ pub fn op_alter_column(
.clone()
};
let new_column = crate::schema::Column::from(definition);
let new_name = definition.col_name.as_str().to_owned();
conn.with_schema_mut(|schema| {
let table = schema
let table_arc = schema
.tables
.get_mut(&normalized_table_name)
.expect("table being renamed should be in schema");
.expect("table being ALTERed should be in schema");
let table = Arc::make_mut(table_arc);
let table = Arc::make_mut(table);
let Table::BTree(btree) = table else {
panic!("only btree tables can be renamed");
let Table::BTree(ref mut btree_arc) = table else {
panic!("only btree tables can be altered");
};
let btree = Arc::make_mut(btree);
let column = btree
let btree = Arc::make_mut(btree_arc);
let col = btree
.columns
.get_mut(*column_index)
.expect("renamed column should be in schema");
.expect("column being ALTERed should be in schema");
if let Some(indexes) = schema.indexes.get_mut(&normalized_table_name) {
for index in indexes {
let index = Arc::make_mut(index);
for index_column in &mut index.columns {
if index_column.name
== *column.name.as_ref().expect("btree column should be named")
{
index_column.name = definition.col_name.as_str().to_owned();
// Update indexes on THIS table that name the old column (you already had this)
if let Some(idxs) = schema.indexes.get_mut(&normalized_table_name) {
for idx in idxs {
let idx = Arc::make_mut(idx);
for ic in &mut idx.columns {
if ic.name.eq_ignore_ascii_case(
col.name.as_ref().expect("btree column should be named"),
) {
ic.name = new_name.clone();
}
}
}
}
if *rename {
col.name = Some(new_name.clone());
} else {
*col = new_column.clone();
}
// Keep primary_key_columns consistent (names may change on rename)
for (pk_name, _ord) in &mut btree.primary_key_columns {
if pk_name.eq_ignore_ascii_case(&old_column_name) {
*pk_name = new_name.clone();
}
}
// Maintain rowid-alias bit after change/rename (INTEGER PRIMARY KEY)
if !*rename {
// recompute alias from `new_column`
btree.columns[*column_index].is_rowid_alias = new_column.is_rowid_alias;
}
// Update this tables OWN foreign keys
for fk_arc in &mut btree.foreign_keys {
let fk = Arc::make_mut(fk_arc);
// child side: rename child column if it matches
for cc in &mut fk.child_columns {
if cc.eq_ignore_ascii_case(&old_column_name) {
*cc = new_name.clone();
}
}
// parent side: if self-referencing, rename parent column too
if normalize_ident(&fk.parent_table) == normalized_table_name {
for pc in &mut fk.parent_columns {
if pc.eq_ignore_ascii_case(&old_column_name) {
*pc = new_name.clone();
}
}
}
}
if *rename {
column.name = new_column.name;
} else {
*column = new_column;
// fix OTHER tables that reference this table as parent
for (tname, t_arc) in schema.tables.iter_mut() {
if normalize_ident(tname) == normalized_table_name {
continue;
}
if let Table::BTree(ref mut child_btree_arc) = Arc::make_mut(t_arc) {
let child_btree = Arc::make_mut(child_btree_arc);
for fk_arc in &mut child_btree.foreign_keys {
if normalize_ident(&fk_arc.parent_table) != normalized_table_name {
continue;
}
let fk = Arc::make_mut(fk_arc);
for pc in &mut fk.parent_columns {
if pc.eq_ignore_ascii_case(&old_column_name) {
*pc = new_name.clone();
}
}
}
}
}
});