Extract common logic for handling sqlite_schema rows

This commit is contained in:
Jussi Saurio
2025-09-11 09:30:18 +03:00
parent a93fe6be52
commit 07944e23b5
2 changed files with 191 additions and 308 deletions

View File

@@ -19,11 +19,13 @@ use crate::translate::plan::SelectPlan;
use crate::util::{
module_args_from_sql, module_name_from_sql, type_from_name, IOExt, UnparsedFromSqlIndex,
};
use crate::{
contains_ignore_ascii_case, eq_ignore_ascii_case, match_ignore_ascii_case, LimboError,
MvCursor, Pager, RefValue, SymbolTable, VirtualTable,
};
use crate::{util::normalize_ident, Result};
use crate::{LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable};
use core::fmt;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;
@@ -314,184 +316,44 @@ impl Schema {
};
let mut record_cursor = cursor.record_cursor.borrow_mut();
// sqlite schema table has 5 columns: type, name, tbl_name, rootpage, sql
let ty_value = record_cursor.get_value(&row, 0)?;
let RefValue::Text(ty) = ty_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
match ty.as_str() {
"table" => {
let root_page_value = record_cursor.get_value(&row, 3)?;
let RefValue::Integer(root_page) = root_page_value else {
return Err(LimboError::ConversionError("Expected integer value".into()));
};
let sql_value = record_cursor.get_value(&row, 4)?;
let RefValue::Text(sql_text) = sql_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let sql = sql_text.as_str();
let create_virtual = "create virtual";
if root_page == 0
&& sql[0..create_virtual.len()].eq_ignore_ascii_case(create_virtual)
{
let name_value = record_cursor.get_value(&row, 1)?;
let RefValue::Text(name_text) = name_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let name = name_text.as_str();
// a virtual table is found in the sqlite_schema, but it's no
// longer in the in-memory schema. We need to recreate it if
// the module is loaded in the symbol table.
let vtab = if let Some(vtab) = syms.vtabs.get(name) {
Arc::new((**vtab).clone())
} else {
let mod_name = module_name_from_sql(sql)?;
let vtab_rc = crate::VirtualTable::table(
Some(name),
mod_name,
module_args_from_sql(sql)?,
syms,
)?;
Arc::new((*vtab_rc).clone())
};
self.add_virtual_table(vtab);
continue;
}
let table = BTreeTable::from_sql(sql, root_page as usize)?;
// Check if this is a DBSP state table
if table.name.starts_with(DBSP_TABLE_PREFIX) {
// Extract the view name from _dbsp_state_<viewname>
let view_name = table
.name
.strip_prefix(DBSP_TABLE_PREFIX)
.unwrap()
.to_string();
dbsp_state_roots.insert(view_name, root_page as usize);
}
self.add_btree_table(Arc::new(table));
}
"index" => {
let root_page_value = record_cursor.get_value(&row, 3)?;
let RefValue::Integer(root_page) = root_page_value else {
return Err(LimboError::ConversionError("Expected integer value".into()));
};
match record_cursor.get_value(&row, 4) {
Ok(RefValue::Text(sql_text)) => {
let table_name_value = record_cursor.get_value(&row, 2)?;
let RefValue::Text(table_name_text) = table_name_value else {
return Err(LimboError::ConversionError(
"Expected text value".into(),
));
};
from_sql_indexes.push(UnparsedFromSqlIndex {
table_name: table_name_text.as_str().to_string(),
root_page: root_page as usize,
sql: sql_text.as_str().to_string(),
});
}
_ => {
let index_name_value = record_cursor.get_value(&row, 1)?;
let RefValue::Text(index_name_text) = index_name_value else {
return Err(LimboError::ConversionError(
"Expected text value".into(),
));
};
let table_name_value = record_cursor.get_value(&row, 2)?;
let RefValue::Text(table_name_text) = table_name_value else {
return Err(LimboError::ConversionError(
"Expected text value".into(),
));
};
match automatic_indices.entry(table_name_text.as_str().to_string()) {
Entry::Vacant(e) => {
e.insert(vec![(
index_name_text.as_str().to_string(),
root_page as usize,
)]);
}
Entry::Occupied(mut e) => {
e.get_mut().push((
index_name_text.as_str().to_string(),
root_page as usize,
));
}
}
}
}
}
"view" => {
let name_value = record_cursor.get_value(&row, 1)?;
let RefValue::Text(name_text) = name_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let name = name_text.as_str();
// Get the root page (column 3) to determine if this is a materialized view
// Regular views have rootpage = 0, materialized views have rootpage != 0
let root_page_value = record_cursor.get_value(&row, 3)?;
let RefValue::Integer(root_page_int) = root_page_value else {
return Err(LimboError::ConversionError("Expected integer value".into()));
};
let root_page = root_page_int as usize;
let sql_value = record_cursor.get_value(&row, 4)?;
let RefValue::Text(sql_text) = sql_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let sql = sql_text.as_str();
// Parse the SQL to determine if it's a regular or materialized view
let mut parser = Parser::new(sql.as_bytes());
if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
match stmt {
Stmt::CreateMaterializedView { .. } => {
// Store materialized view info for later creation
// We'll create the actual IncrementalView in a later pass
// when we have both the main root page and DBSP state root
let view_name = name.to_string();
materialized_view_info
.insert(view_name, (sql.to_string(), root_page));
}
Stmt::CreateView {
view_name: _,
columns: column_names,
select,
..
} => {
// Extract actual columns from the SELECT statement
let view_columns = crate::util::extract_view_columns(&select, self);
// If column names were provided in CREATE VIEW (col1, col2, ...),
// use them to rename the columns
let mut final_columns = view_columns;
for (i, indexed_col) in column_names.iter().enumerate() {
if let Some(col) = final_columns.get_mut(i) {
col.name = Some(indexed_col.col_name.to_string());
}
}
// Create regular view
let view = View {
name: name.to_string(),
sql: sql.to_string(),
select_stmt: select,
columns: final_columns,
};
self.add_view(view);
}
_ => {}
}
}
}
_ => {}
let ty = ty.as_str();
let RefValue::Text(name) = record_cursor.get_value(&row, 1)? else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let name = name.as_str();
let table_name_value = record_cursor.get_value(&row, 2)?;
let RefValue::Text(table_name) = table_name_value else {
return Err(LimboError::ConversionError("Expected text value".into()));
};
let table_name = table_name.as_str();
let root_page_value = record_cursor.get_value(&row, 3)?;
let RefValue::Integer(root_page) = root_page_value else {
return Err(LimboError::ConversionError("Expected integer value".into()));
};
let sql_value = record_cursor.get_value(&row, 4)?;
let sql_textref = match sql_value {
RefValue::Text(sql) => Some(sql),
_ => None,
};
let sql = sql_textref.as_ref().map(|s| s.as_str());
self.handle_schema_row(
ty,
name,
table_name,
root_page,
sql,
syms,
&mut from_sql_indexes,
&mut automatic_indices,
&mut dbsp_state_roots,
&mut materialized_view_info,
)?;
drop(record_cursor);
drop(row);
@@ -566,6 +428,144 @@ impl Schema {
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub fn handle_schema_row(
&mut self,
ty: &str,
name: &str,
table_name: &str,
root_page: i64,
maybe_sql: Option<&str>,
syms: &SymbolTable,
from_sql_indexes: &mut Vec<UnparsedFromSqlIndex>,
automatic_indices: &mut std::collections::HashMap<String, Vec<(String, usize)>>,
dbsp_state_roots: &mut std::collections::HashMap<String, usize>,
materialized_view_info: &mut std::collections::HashMap<String, (String, usize)>,
) -> Result<()> {
match ty {
"table" => {
let sql = maybe_sql.expect("sql should be present for table");
let sql_bytes = sql.as_bytes();
if root_page == 0 && contains_ignore_ascii_case!(sql_bytes, b"create virtual") {
// a virtual table is found in the sqlite_schema, but it's no
// longer in the in-memory schema. We need to recreate it if
// the module is loaded in the symbol table.
let vtab = if let Some(vtab) = syms.vtabs.get(name) {
vtab.clone()
} else {
let mod_name = module_name_from_sql(sql)?;
crate::VirtualTable::table(
Some(name),
mod_name,
module_args_from_sql(sql)?,
syms,
)?
};
self.add_virtual_table(vtab);
} else {
let table = BTreeTable::from_sql(sql, root_page as usize)?;
// Check if this is a DBSP state table
if table.name.starts_with(DBSP_TABLE_PREFIX) {
// Extract the view name from __turso_internal_dbsp_state_<viewname>
let view_name = table
.name
.strip_prefix(DBSP_TABLE_PREFIX)
.unwrap()
.to_string();
dbsp_state_roots.insert(view_name, root_page as usize);
}
self.add_btree_table(Arc::new(table));
}
}
"index" => {
match maybe_sql {
Some(sql) => {
from_sql_indexes.push(UnparsedFromSqlIndex {
table_name: table_name.to_string(),
root_page: root_page as usize,
sql: sql.to_string(),
});
}
None => {
// Automatic index on primary key and/or unique constraint, e.g.
// table|foo|foo|2|CREATE TABLE foo (a text PRIMARY KEY, b)
// index|sqlite_autoindex_foo_1|foo|3|
let index_name = name.to_string();
let table_name = table_name.to_string();
match automatic_indices.entry(table_name) {
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(vec![(index_name, root_page as usize)]);
}
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().push((index_name, root_page as usize));
}
}
}
}
}
"view" => {
use crate::schema::View;
use turso_parser::ast::{Cmd, Stmt};
use turso_parser::parser::Parser;
let sql = maybe_sql.expect("sql should be present for view");
let view_name = name.to_string();
// Parse the SQL to determine if it's a regular or materialized view
let mut parser = Parser::new(sql.as_bytes());
if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
match stmt {
Stmt::CreateMaterializedView { .. } => {
// Store materialized view info for later creation
// We'll handle reuse logic and create the actual IncrementalView
// in a later pass when we have both the main root page and DBSP state root
materialized_view_info
.insert(view_name.clone(), (sql.to_string(), root_page as usize));
// Mark the existing view for potential reuse
if self.incremental_views.contains_key(&view_name) {
// We'll check for reuse in the third pass
}
}
Stmt::CreateView {
view_name: _,
columns: column_names,
select,
..
} => {
// Extract actual columns from the SELECT statement
let view_columns = crate::util::extract_view_columns(&select, self);
// If column names were provided in CREATE VIEW (col1, col2, ...),
// use them to rename the columns
let mut final_columns = view_columns;
for (i, indexed_col) in column_names.iter().enumerate() {
if let Some(col) = final_columns.get_mut(i) {
col.name = Some(indexed_col.col_name.to_string());
}
}
// Create regular view
let view = View {
name: name.to_string(),
sql: sql.to_string(),
select_stmt: select,
columns: final_columns,
};
self.add_view(view);
}
_ => {}
}
}
}
_ => {}
};
Ok(())
}
}
impl Clone for Schema {

View File

@@ -169,139 +169,22 @@ pub fn parse_schema_rows(
StepResult::Row => {
let row = rows.row().unwrap();
let ty = row.get::<&str>(0)?;
match ty {
"table" => {
let root_page: i64 = row.get::<i64>(3)?;
let sql: &str = row.get::<&str>(4)?;
let sql_bytes = sql.as_bytes();
if root_page == 0
&& contains_ignore_ascii_case!(sql_bytes, b"create virtual")
{
let name: &str = row.get::<&str>(1)?;
// a virtual table is found in the sqlite_schema, but it's no
// longer in the in-memory schema. We need to recreate it if
// the module is loaded in the symbol table.
let vtab = if let Some(vtab) = syms.vtabs.get(name) {
vtab.clone()
} else {
let mod_name = module_name_from_sql(sql)?;
crate::VirtualTable::table(
Some(name),
mod_name,
module_args_from_sql(sql)?,
syms,
)?
};
schema.add_virtual_table(vtab);
} else {
let table = schema::BTreeTable::from_sql(sql, root_page as usize)?;
// Check if this is a DBSP state table
if table.name.starts_with(DBSP_TABLE_PREFIX) {
// Extract the view name from __turso_internal_dbsp_state_<viewname>
let view_name = table
.name
.strip_prefix(DBSP_TABLE_PREFIX)
.unwrap()
.to_string();
dbsp_state_roots.insert(view_name, root_page as usize);
}
schema.add_btree_table(Arc::new(table));
}
}
"index" => {
let root_page: i64 = row.get::<i64>(3)?;
match row.get::<&str>(4) {
Ok(sql) => {
from_sql_indexes.push(UnparsedFromSqlIndex {
table_name: row.get::<&str>(2)?.to_string(),
root_page: root_page as usize,
sql: sql.to_string(),
});
}
_ => {
// Automatic index on primary key and/or unique constraint, e.g.
// table|foo|foo|2|CREATE TABLE foo (a text PRIMARY KEY, b)
// index|sqlite_autoindex_foo_1|foo|3|
let index_name = row.get::<&str>(1)?.to_string();
let table_name = row.get::<&str>(2)?.to_string();
let root_page = row.get::<i64>(3)?;
match automatic_indices.entry(table_name) {
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(vec![(index_name, root_page as usize)]);
}
std::collections::hash_map::Entry::Occupied(mut e) => {
e.get_mut().push((index_name, root_page as usize));
}
}
}
}
}
"view" => {
use crate::incremental::view::IncrementalView;
use crate::schema::View;
use fallible_iterator::FallibleIterator;
use turso_parser::ast::{Cmd, Stmt};
use turso_parser::parser::Parser;
let name: &str = row.get::<&str>(1)?;
let root_page = row.get::<i64>(3)?;
let sql: &str = row.get::<&str>(4)?;
let view_name = name.to_string();
// Parse the SQL to determine if it's a regular or materialized view
let mut parser = Parser::new(sql.as_bytes());
if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
match stmt {
Stmt::CreateMaterializedView { .. } => {
// Store materialized view info for later creation
// We'll handle reuse logic and create the actual IncrementalView
// in a later pass when we have both the main root page and DBSP state root
materialized_view_info.insert(
view_name.clone(),
(sql.to_string(), root_page as usize),
);
// Mark the existing view for potential reuse
if existing_views.contains_key(&view_name) {
// We'll check for reuse in the third pass
}
}
Stmt::CreateView {
view_name: _,
columns: column_names,
select,
..
} => {
// Extract actual columns from the SELECT statement
let view_columns =
crate::util::extract_view_columns(&select, schema);
// If column names were provided in CREATE VIEW (col1, col2, ...),
// use them to rename the columns
let mut final_columns = view_columns;
for (i, indexed_col) in column_names.iter().enumerate() {
if let Some(col) = final_columns.get_mut(i) {
col.name = Some(indexed_col.col_name.to_string());
}
}
// Create regular view
let view = View {
name: name.to_string(),
sql: sql.to_string(),
select_stmt: select,
columns: final_columns,
};
schema.add_view(view);
}
_ => {}
}
}
}
_ => continue,
}
let name = row.get::<&str>(1)?;
let table_name = row.get::<&str>(2)?;
let root_page = row.get::<i64>(3)?;
let sql = row.get::<&str>(4).ok();
schema.handle_schema_row(
ty,
name,
table_name,
root_page,
sql,
syms,
&mut from_sql_indexes,
&mut automatic_indices,
&mut dbsp_state_roots,
&mut materialized_view_info,
)?;
}
StepResult::IO => {
// TODO: How do we ensure that the I/O we submitted to