Merge 'Refactor parseschema' from Jussi Saurio

Extracts out duplicated logic from `Schema::make_from_btree()` and
`parse_schema_rows()`

Closes #3015
This commit is contained in:
Jussi Saurio
2025-09-11 13:35:36 +03:00
committed by GitHub
2 changed files with 215 additions and 390 deletions

View File

@@ -19,11 +19,13 @@ use crate::translate::plan::SelectPlan;
use crate::util::{
module_args_from_sql, module_name_from_sql, type_from_name, IOExt, UnparsedFromSqlIndex,
};
use crate::{
contains_ignore_ascii_case, eq_ignore_ascii_case, match_ignore_ascii_case, LimboError,
MvCursor, Pager, RefValue, SymbolTable, VirtualTable,
};
use crate::{util::normalize_ident, Result};
use crate::{LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable};
use core::fmt;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;
@@ -314,184 +316,44 @@ impl Schema {
};
let mut record_cursor = cursor.record_cursor.borrow_mut();
// sqlite schema table has 5 columns: type, name, tbl_name, rootpage, sql
let ty_value = record_cursor.get_value(&row, 0)?;
let RefValue::Text(ty) = ty_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
match ty.as_str() {
"table" => {
let root_page_value = record_cursor.get_value(&row, 3)?;
let RefValue::Integer(root_page) = root_page_value else {
return Err(LimboError::ConversionError("Expected integer value".into()));
};
let sql_value = record_cursor.get_value(&row, 4)?;
let RefValue::Text(sql_text) = sql_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let sql = sql_text.as_str();
let create_virtual = "create virtual";
if root_page == 0
&& sql[0..create_virtual.len()].eq_ignore_ascii_case(create_virtual)
{
let name_value = record_cursor.get_value(&row, 1)?;
let RefValue::Text(name_text) = name_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let name = name_text.as_str();
// a virtual table is found in the sqlite_schema, but it's no
// longer in the in-memory schema. We need to recreate it if
// the module is loaded in the symbol table.
let vtab = if let Some(vtab) = syms.vtabs.get(name) {
Arc::new((**vtab).clone())
} else {
let mod_name = module_name_from_sql(sql)?;
let vtab_rc = crate::VirtualTable::table(
Some(name),
mod_name,
module_args_from_sql(sql)?,
syms,
)?;
Arc::new((*vtab_rc).clone())
};
self.add_virtual_table(vtab);
continue;
}
let table = BTreeTable::from_sql(sql, root_page as usize)?;
// Check if this is a DBSP state table
if table.name.starts_with(DBSP_TABLE_PREFIX) {
// Extract the view name from _dbsp_state_<viewname>
let view_name = table
.name
.strip_prefix(DBSP_TABLE_PREFIX)
.unwrap()
.to_string();
dbsp_state_roots.insert(view_name, root_page as usize);
}
self.add_btree_table(Arc::new(table));
}
"index" => {
let root_page_value = record_cursor.get_value(&row, 3)?;
let RefValue::Integer(root_page) = root_page_value else {
return Err(LimboError::ConversionError("Expected integer value".into()));
};
match record_cursor.get_value(&row, 4) {
Ok(RefValue::Text(sql_text)) => {
let table_name_value = record_cursor.get_value(&row, 2)?;
let RefValue::Text(table_name_text) = table_name_value else {
return Err(LimboError::ConversionError(
"Expected text value".into(),
));
};
from_sql_indexes.push(UnparsedFromSqlIndex {
table_name: table_name_text.as_str().to_string(),
root_page: root_page as usize,
sql: sql_text.as_str().to_string(),
});
}
_ => {
let index_name_value = record_cursor.get_value(&row, 1)?;
let RefValue::Text(index_name_text) = index_name_value else {
return Err(LimboError::ConversionError(
"Expected text value".into(),
));
};
let table_name_value = record_cursor.get_value(&row, 2)?;
let RefValue::Text(table_name_text) = table_name_value else {
return Err(LimboError::ConversionError(
"Expected text value".into(),
));
};
match automatic_indices.entry(table_name_text.as_str().to_string()) {
Entry::Vacant(e) => {
e.insert(vec![(
index_name_text.as_str().to_string(),
root_page as usize,
)]);
}
Entry::Occupied(mut e) => {
e.get_mut().push((
index_name_text.as_str().to_string(),
root_page as usize,
));
}
}
}
}
}
"view" => {
let name_value = record_cursor.get_value(&row, 1)?;
let RefValue::Text(name_text) = name_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let name = name_text.as_str();
// Get the root page (column 3) to determine if this is a materialized view
// Regular views have rootpage = 0, materialized views have rootpage != 0
let root_page_value = record_cursor.get_value(&row, 3)?;
let RefValue::Integer(root_page_int) = root_page_value else {
return Err(LimboError::ConversionError("Expected integer value".into()));
};
let root_page = root_page_int as usize;
let sql_value = record_cursor.get_value(&row, 4)?;
let RefValue::Text(sql_text) = sql_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let sql = sql_text.as_str();
// Parse the SQL to determine if it's a regular or materialized view
let mut parser = Parser::new(sql.as_bytes());
if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
match stmt {
Stmt::CreateMaterializedView { .. } => {
// Store materialized view info for later creation
// We'll create the actual IncrementalView in a later pass
// when we have both the main root page and DBSP state root
let view_name = name.to_string();
materialized_view_info
.insert(view_name, (sql.to_string(), root_page));
}
Stmt::CreateView {
view_name: _,
columns: column_names,
select,
..
} => {
// Extract actual columns from the SELECT statement
let view_columns = crate::util::extract_view_columns(&select, self);
// If column names were provided in CREATE VIEW (col1, col2, ...),
// use them to rename the columns
let mut final_columns = view_columns;
for (i, indexed_col) in column_names.iter().enumerate() {
if let Some(col) = final_columns.get_mut(i) {
col.name = Some(indexed_col.col_name.to_string());
}
}
// Create regular view
let view = View {
name: name.to_string(),
sql: sql.to_string(),
select_stmt: select,
columns: final_columns,
};
self.add_view(view);
}
_ => {}
}
}
}
_ => {}
let ty = ty.as_str();
let RefValue::Text(name) = record_cursor.get_value(&row, 1)? else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let name = name.as_str();
let table_name_value = record_cursor.get_value(&row, 2)?;
let RefValue::Text(table_name) = table_name_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let table_name = table_name.as_str();
let root_page_value = record_cursor.get_value(&row, 3)?;
let RefValue::Integer(root_page) = root_page_value else {
return Err(LimboError::ConversionError("Expected integer value".into()));
};
let sql_value = record_cursor.get_value(&row, 4)?;
let sql_textref = match sql_value {
RefValue::Text(sql) => Some(sql),
_ => None,
};
let sql = sql_textref.as_ref().map(|s| s.as_str());
self.handle_schema_row(
ty,
name,
table_name,
root_page,
sql,
syms,
&mut from_sql_indexes,
&mut automatic_indices,
&mut dbsp_state_roots,
&mut materialized_view_info,
)?;
drop(record_cursor);
drop(row);
@@ -500,6 +362,21 @@ impl Schema {
pager.end_read_tx()?;
self.populate_indices(from_sql_indexes, automatic_indices)?;
self.populate_materialized_views(materialized_view_info, dbsp_state_roots)?;
Ok(())
}
/// Populate indices parsed from the schema.
/// from_sql_indexes: indices explicitly created with CREATE INDEX
/// automatic_indices: indices created automatically for primary key and unique constraints
pub fn populate_indices(
&mut self,
from_sql_indexes: Vec<UnparsedFromSqlIndex>,
automatic_indices: std::collections::HashMap<String, Vec<(String, usize)>>,
) -> Result<()> {
for unparsed_sql_from_index in from_sql_indexes {
if !self.indexes_enabled() {
self.table_set_has_index(&unparsed_sql_from_index.table_name);
@@ -530,8 +407,15 @@ impl Schema {
}
}
}
Ok(())
}
// Third pass: Create materialized views now that we have both root pages
/// Populate materialized views parsed from the schema.
pub fn populate_materialized_views(
&mut self,
materialized_view_info: std::collections::HashMap<String, (String, usize)>,
dbsp_state_roots: std::collections::HashMap<String, usize>,
) -> Result<()> {
for (view_name, (sql, main_root)) in materialized_view_info {
// Look up the DBSP state root for this view - must exist for materialized views
let dbsp_state_root = dbsp_state_roots.get(&view_name).ok_or_else(|| {
@@ -563,6 +447,143 @@ impl Schema {
self.add_materialized_view_dependency(&table_name, &view_name);
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub fn handle_schema_row(
&mut self,
ty: &str,
name: &str,
table_name: &str,
root_page: i64,
maybe_sql: Option<&str>,
syms: &SymbolTable,
from_sql_indexes: &mut Vec<UnparsedFromSqlIndex>,
automatic_indices: &mut std::collections::HashMap<String, Vec<(String, usize)>>,
dbsp_state_roots: &mut std::collections::HashMap<String, usize>,
materialized_view_info: &mut std::collections::HashMap<String, (String, usize)>,
) -> Result<()> {
match ty {
"table" => {
let sql = maybe_sql.expect("sql should be present for table");
let sql_bytes = sql.as_bytes();
if root_page == 0 && contains_ignore_ascii_case!(sql_bytes, b"create virtual") {
// a virtual table is found in the sqlite_schema, but it's no
// longer in the in-memory schema. We need to recreate it if
// the module is loaded in the symbol table.
let vtab = if let Some(vtab) = syms.vtabs.get(name) {
vtab.clone()
} else {
let mod_name = module_name_from_sql(sql)?;
crate::VirtualTable::table(
Some(name),
mod_name,
module_args_from_sql(sql)?,
syms,
)?
};
self.add_virtual_table(vtab);
} else {
let table = BTreeTable::from_sql(sql, root_page as usize)?;
// Check if this is a DBSP state table
if table.name.starts_with(DBSP_TABLE_PREFIX) {
// Extract the view name from __turso_internal_dbsp_state_<viewname>
let view_name = table
.name
.strip_prefix(DBSP_TABLE_PREFIX)
.unwrap()
.to_string();
dbsp_state_roots.insert(view_name, root_page as usize);
}
self.add_btree_table(Arc::new(table));
}
}
"index" => {
match maybe_sql {
Some(sql) => {
from_sql_indexes.push(UnparsedFromSqlIndex {
table_name: table_name.to_string(),
root_page: root_page as usize,
sql: sql.to_string(),
});
}
None => {
// Automatic index on primary key and/or unique constraint, e.g.
// table|foo|foo|2|CREATE TABLE foo (a text PRIMARY KEY, b)
// index|sqlite_autoindex_foo_1|foo|3|
let index_name = name.to_string();
let table_name = table_name.to_string();
match automatic_indices.entry(table_name) {
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(vec![(index_name, root_page as usize)]);
}
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().push((index_name, root_page as usize));
}
}
}
}
}
"view" => {
use crate::schema::View;
use turso_parser::ast::{Cmd, Stmt};
use turso_parser::parser::Parser;
let sql = maybe_sql.expect("sql should be present for view");
let view_name = name.to_string();
// Parse the SQL to determine if it's a regular or materialized view
let mut parser = Parser::new(sql.as_bytes());
if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
match stmt {
Stmt::CreateMaterializedView { .. } => {
// Store materialized view info for later creation
// We'll handle reuse logic and create the actual IncrementalView
// in a later pass when we have both the main root page and DBSP state root
materialized_view_info
.insert(view_name.clone(), (sql.to_string(), root_page as usize));
// Mark the existing view for potential reuse
if self.incremental_views.contains_key(&view_name) {
// We'll check for reuse in the third pass
}
}
Stmt::CreateView {
view_name: _,
columns: column_names,
select,
..
} => {
// Extract actual columns from the SELECT statement
let view_columns = crate::util::extract_view_columns(&select, self);
// If column names were provided in CREATE VIEW (col1, col2, ...),
// use them to rename the columns
let mut final_columns = view_columns;
for (i, indexed_col) in column_names.iter().enumerate() {
if let Some(col) = final_columns.get_mut(i) {
col.name = Some(indexed_col.col_name.to_string());
}
}
// Create regular view
let view = View {
name: name.to_string(),
sql: sql.to_string(),
select_stmt: select,
columns: final_columns,
};
self.add_view(view);
}
_ => {}
}
}
}
_ => {}
};
Ok(())
}

View File

@@ -170,139 +170,22 @@ pub fn parse_schema_rows(
StepResult::Row => {
let row = rows.row().unwrap();
let ty = row.get::<&str>(0)?;
match ty {
"table" => {
let root_page: i64 = row.get::<i64>(3)?;
let sql: &str = row.get::<&str>(4)?;
let sql_bytes = sql.as_bytes();
if root_page == 0
&& contains_ignore_ascii_case!(sql_bytes, b"create virtual")
{
let name: &str = row.get::<&str>(1)?;
// a virtual table is found in the sqlite_schema, but it's no
// longer in the in-memory schema. We need to recreate it if
// the module is loaded in the symbol table.
let vtab = if let Some(vtab) = syms.vtabs.get(name) {
vtab.clone()
} else {
let mod_name = module_name_from_sql(sql)?;
crate::VirtualTable::table(
Some(name),
mod_name,
module_args_from_sql(sql)?,
syms,
)?
};
schema.add_virtual_table(vtab);
} else {
let table = schema::BTreeTable::from_sql(sql, root_page as usize)?;
// Check if this is a DBSP state table
if table.name.starts_with(DBSP_TABLE_PREFIX) {
// Extract the view name from __turso_internal_dbsp_state_<viewname>
let view_name = table
.name
.strip_prefix(DBSP_TABLE_PREFIX)
.unwrap()
.to_string();
dbsp_state_roots.insert(view_name, root_page as usize);
}
schema.add_btree_table(Arc::new(table));
}
}
"index" => {
let root_page: i64 = row.get::<i64>(3)?;
match row.get::<&str>(4) {
Ok(sql) => {
from_sql_indexes.push(UnparsedFromSqlIndex {
table_name: row.get::<&str>(2)?.to_string(),
root_page: root_page as usize,
sql: sql.to_string(),
});
}
_ => {
// Automatic index on primary key and/or unique constraint, e.g.
// table|foo|foo|2|CREATE TABLE foo (a text PRIMARY KEY, b)
// index|sqlite_autoindex_foo_1|foo|3|
let index_name = row.get::<&str>(1)?.to_string();
let table_name = row.get::<&str>(2)?.to_string();
let root_page = row.get::<i64>(3)?;
match automatic_indices.entry(table_name) {
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(vec![(index_name, root_page as usize)]);
}
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().push((index_name, root_page as usize));
}
}
}
}
}
"view" => {
use crate::incremental::view::IncrementalView;
use crate::schema::View;
use fallible_iterator::FallibleIterator;
use turso_parser::ast::{Cmd, Stmt};
use turso_parser::parser::Parser;
let name: &str = row.get::<&str>(1)?;
let root_page = row.get::<i64>(3)?;
let sql: &str = row.get::<&str>(4)?;
let view_name = name.to_string();
// Parse the SQL to determine if it's a regular or materialized view
let mut parser = Parser::new(sql.as_bytes());
if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
match stmt {
Stmt::CreateMaterializedView { .. } => {
// Store materialized view info for later creation
// We'll handle reuse logic and create the actual IncrementalView
// in a later pass when we have both the main root page and DBSP state root
materialized_view_info.insert(
view_name.clone(),
(sql.to_string(), root_page as usize),
);
// Mark the existing view for potential reuse
if existing_views.contains_key(&view_name) {
// We'll check for reuse in the third pass
}
}
Stmt::CreateView {
view_name: _,
columns: column_names,
select,
..
} => {
// Extract actual columns from the SELECT statement
let view_columns =
crate::util::extract_view_columns(&select, schema);
// If column names were provided in CREATE VIEW (col1, col2, ...),
// use them to rename the columns
let mut final_columns = view_columns;
for (i, indexed_col) in column_names.iter().enumerate() {
if let Some(col) = final_columns.get_mut(i) {
col.name = Some(indexed_col.col_name.to_string());
}
}
// Create regular view
let view = View {
name: name.to_string(),
sql: sql.to_string(),
select_stmt: select,
columns: final_columns,
};
schema.add_view(view);
}
_ => {}
}
}
}
_ => continue,
}
let name = row.get::<&str>(1)?;
let table_name = row.get::<&str>(2)?;
let root_page = row.get::<i64>(3)?;
let sql = row.get::<&str>(4).ok();
schema.handle_schema_row(
ty,
name,
table_name,
root_page,
sql,
syms,
&mut from_sql_indexes,
&mut automatic_indices,
&mut dbsp_state_roots,
&mut materialized_view_info,
)?;
}
StepResult::IO => {
// TODO: How do we ensure that the I/O we submitted to
@@ -314,88 +197,9 @@ pub fn parse_schema_rows(
StepResult::Busy => break,
}
}
for unparsed_sql_from_index in from_sql_indexes {
if !schema.indexes_enabled() {
schema.table_set_has_index(&unparsed_sql_from_index.table_name);
} else {
let table = schema
.get_btree_table(&unparsed_sql_from_index.table_name)
.unwrap();
let index = schema::Index::from_sql(
&unparsed_sql_from_index.sql,
unparsed_sql_from_index.root_page,
table.as_ref(),
)?;
schema.add_index(Arc::new(index));
}
}
for automatic_index in automatic_indices {
if !schema.indexes_enabled() {
schema.table_set_has_index(&automatic_index.0);
} else {
let table = schema.get_btree_table(&automatic_index.0).unwrap();
let ret_index = schema::Index::automatic_from_primary_key_and_unique(
table.as_ref(),
automatic_index.1,
)?;
for index in ret_index {
schema.add_index(Arc::new(index));
}
}
}
// Third pass: Create materialized views now that we have both root pages
for (view_name, (sql, main_root)) in materialized_view_info {
// Look up the DBSP state root for this view - must exist for materialized views
let dbsp_state_root = dbsp_state_roots.get(&view_name).ok_or_else(|| {
LimboError::InternalError(format!(
"Materialized view {view_name} is missing its DBSP state table"
))
})?;
// Check if we can reuse the existing view
let mut reuse_view = false;
if let Some(existing_view_mutex) = schema.get_materialized_view(&view_name) {
let existing_view = existing_view_mutex.lock().unwrap();
if let Some(existing_sql) = schema.materialized_view_sql.get(&view_name) {
if existing_sql == &sql {
reuse_view = true;
}
}
}
if reuse_view {
// View already exists with same SQL, just update dependencies
let existing_view_mutex = schema.get_materialized_view(&view_name).unwrap();
let existing_view = existing_view_mutex.lock().unwrap();
let referenced_tables = existing_view.get_referenced_table_names();
drop(existing_view); // Release lock before modifying schema
for table_name in referenced_tables {
schema.add_materialized_view_dependency(&table_name, &view_name);
}
} else {
// Create new IncrementalView with both root pages
let incremental_view =
IncrementalView::from_sql(&sql, schema, main_root, *dbsp_state_root)?;
let referenced_tables = incremental_view.get_referenced_table_names();
// Create a Table for the materialized view
let table = Arc::new(schema::Table::BTree(Arc::new(schema::BTreeTable {
root_page: main_root,
name: view_name.clone(),
columns: incremental_view.columns.clone(), // Use the view's columns, not the base table's
primary_key_columns: vec![],
has_rowid: true,
is_strict: false,
unique_sets: None,
})));
schema.add_materialized_view(incremental_view, table, sql.clone());
for table_name in referenced_tables {
schema.add_materialized_view_dependency(&table_name, &view_name);
}
}
}
schema.populate_indices(from_sql_indexes, automatic_indices)?;
schema.populate_materialized_views(materialized_view_info, dbsp_state_roots)?;
Ok(())
}