mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 08:55:40 +01:00
rename some structures to mention materialized views
A lot of the structures we have - like the ones under Schema, are specific for materialized views. In preparation to adding normal views, rename them, so things are less confusing.
This commit is contained in:
10
core/lib.rs
10
core/lib.rs
@@ -373,7 +373,7 @@ impl Database {
|
||||
let pager = conn.pager.borrow().clone();
|
||||
pager
|
||||
.io
|
||||
.block(|| conn.schema.borrow().populate_views(&conn))?;
|
||||
.block(|| conn.schema.borrow().populate_materialized_views(&conn))?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
@@ -898,7 +898,7 @@ impl Connection {
|
||||
|
||||
// Preserve existing views to avoid expensive repopulation.
|
||||
// TODO: We may not need to do this if we materialize our views.
|
||||
let existing_views = self.schema.borrow().views.clone();
|
||||
let existing_views = self.schema.borrow().materialized_views.clone();
|
||||
|
||||
// TODO: this is hack to avoid a cyclical problem with schema reprepare
|
||||
// The problem here is that we prepare a statement here, but when the statement tries
|
||||
@@ -920,7 +920,9 @@ impl Connection {
|
||||
|
||||
{
|
||||
let schema = self.schema.borrow();
|
||||
pager.io.block(|| schema.populate_views(self))?;
|
||||
pager
|
||||
.io
|
||||
.block(|| schema.populate_materialized_views(self))?;
|
||||
}
|
||||
Result::Ok(())
|
||||
};
|
||||
@@ -1526,7 +1528,7 @@ impl Connection {
|
||||
.expect("query must be parsed to statement");
|
||||
let syms = self.syms.borrow();
|
||||
self.with_schema_mut(|schema| {
|
||||
let existing_views = schema.views.clone();
|
||||
let existing_views = schema.materialized_views.clone();
|
||||
if let Err(LimboError::ExtensionError(e)) =
|
||||
parse_schema_rows(rows, schema, &syms, None, existing_views)
|
||||
{
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use crate::incremental::view::IncrementalView;
|
||||
use crate::types::IOResult;
|
||||
|
||||
/// Type alias for the views collection
|
||||
pub type ViewsMap = HashMap<String, Arc<Mutex<IncrementalView>>>;
|
||||
/// Type alias for the materialized views collection
|
||||
pub type MaterializedViewsMap = HashMap<String, Arc<Mutex<IncrementalView>>>;
|
||||
use crate::result::LimboResult;
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::translate::collate::CollationSeq;
|
||||
@@ -32,7 +32,7 @@ const SCHEMA_TABLE_NAME_ALT: &str = "sqlite_master";
|
||||
#[derive(Debug)]
|
||||
pub struct Schema {
|
||||
pub tables: HashMap<String, Arc<Table>>,
|
||||
pub views: ViewsMap,
|
||||
pub materialized_views: MaterializedViewsMap,
|
||||
|
||||
/// table_name to list of indexes for the table
|
||||
pub indexes: HashMap<String, Vec<Arc<Index>>>,
|
||||
@@ -40,8 +40,8 @@ pub struct Schema {
|
||||
pub indexes_enabled: bool,
|
||||
pub schema_version: u32,
|
||||
|
||||
/// Mapping from table names to the views that depend on them
|
||||
pub table_to_views: HashMap<String, Vec<String>>,
|
||||
/// Mapping from table names to the materialized views that depend on them
|
||||
pub table_to_materialized_views: HashMap<String, Vec<String>>,
|
||||
}
|
||||
|
||||
impl Schema {
|
||||
@@ -60,16 +60,16 @@ impl Schema {
|
||||
Arc::new(Table::Virtual(Arc::new((*function).clone()))),
|
||||
);
|
||||
}
|
||||
let views: ViewsMap = HashMap::new();
|
||||
let table_to_views: HashMap<String, Vec<String>> = HashMap::new();
|
||||
let materialized_views: MaterializedViewsMap = HashMap::new();
|
||||
let table_to_materialized_views: HashMap<String, Vec<String>> = HashMap::new();
|
||||
Self {
|
||||
tables,
|
||||
views,
|
||||
materialized_views,
|
||||
indexes,
|
||||
has_indexes,
|
||||
indexes_enabled,
|
||||
schema_version: 0,
|
||||
table_to_views,
|
||||
table_to_materialized_views,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,52 +79,56 @@ impl Schema {
|
||||
.iter()
|
||||
.any(|idx| idx.1.iter().any(|i| i.name == name))
|
||||
}
|
||||
pub fn add_view(&mut self, view: IncrementalView) {
|
||||
pub fn add_materialized_view(&mut self, view: IncrementalView) {
|
||||
let name = normalize_ident(view.name());
|
||||
self.views.insert(name, Arc::new(Mutex::new(view)));
|
||||
self.materialized_views
|
||||
.insert(name, Arc::new(Mutex::new(view)));
|
||||
}
|
||||
|
||||
pub fn get_view(&self, name: &str) -> Option<Arc<Mutex<IncrementalView>>> {
|
||||
pub fn get_materialized_view(&self, name: &str) -> Option<Arc<Mutex<IncrementalView>>> {
|
||||
let name = normalize_ident(name);
|
||||
self.views.get(&name).cloned()
|
||||
self.materialized_views.get(&name).cloned()
|
||||
}
|
||||
|
||||
pub fn remove_view(&mut self, name: &str) -> Option<Arc<Mutex<IncrementalView>>> {
|
||||
let name = normalize_ident(name);
|
||||
|
||||
// Remove from table_to_views dependencies
|
||||
for views in self.table_to_views.values_mut() {
|
||||
// Remove from table_to_materialized_views dependencies
|
||||
for views in self.table_to_materialized_views.values_mut() {
|
||||
views.retain(|v| v != &name);
|
||||
}
|
||||
|
||||
// Remove the view itself
|
||||
self.views.remove(&name)
|
||||
// Remove the materialized view itself
|
||||
self.materialized_views.remove(&name)
|
||||
}
|
||||
|
||||
/// Register that a view depends on a table
|
||||
pub fn add_view_dependency(&mut self, table_name: &str, view_name: &str) {
|
||||
/// Register that a materialized view depends on a table
|
||||
pub fn add_materialized_view_dependency(&mut self, table_name: &str, view_name: &str) {
|
||||
let table_name = normalize_ident(table_name);
|
||||
let view_name = normalize_ident(view_name);
|
||||
|
||||
self.table_to_views
|
||||
self.table_to_materialized_views
|
||||
.entry(table_name)
|
||||
.or_default()
|
||||
.push(view_name);
|
||||
}
|
||||
|
||||
/// Get all views that depend on a given table
|
||||
pub fn get_dependent_views(&self, table_name: &str) -> Vec<String> {
|
||||
/// Get all materialized views that depend on a given table
|
||||
pub fn get_dependent_materialized_views(&self, table_name: &str) -> Vec<String> {
|
||||
let table_name = normalize_ident(table_name);
|
||||
self.table_to_views
|
||||
self.table_to_materialized_views
|
||||
.get(&table_name)
|
||||
.cloned()
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Populate all views by scanning their source tables
|
||||
/// Populate all materialized views by scanning their source tables
|
||||
/// Returns IOResult to support async execution
|
||||
pub fn populate_views(&self, conn: &Arc<crate::Connection>) -> Result<IOResult<()>> {
|
||||
for view in self.views.values() {
|
||||
pub fn populate_materialized_views(
|
||||
&self,
|
||||
conn: &Arc<crate::Connection>,
|
||||
) -> Result<IOResult<()>> {
|
||||
for view in self.materialized_views.values() {
|
||||
let mut view = view
|
||||
.lock()
|
||||
.map_err(|_| LimboError::InternalError("Failed to lock view".to_string()))?;
|
||||
@@ -362,7 +366,7 @@ impl Schema {
|
||||
let view_name = name.to_string();
|
||||
|
||||
// Add to schema (moves incremental_view)
|
||||
self.add_view(incremental_view);
|
||||
self.add_materialized_view(incremental_view);
|
||||
|
||||
// Store for second pass processing
|
||||
views_to_process.push((view_name, referenced_tables));
|
||||
@@ -385,7 +389,7 @@ impl Schema {
|
||||
for (view_name, referenced_tables) in views_to_process {
|
||||
// Register this view as dependent on each referenced table
|
||||
for table_name in referenced_tables {
|
||||
self.add_view_dependency(&table_name, &view_name);
|
||||
self.add_materialized_view_dependency(&table_name, &view_name);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -465,19 +469,19 @@ impl Clone for Schema {
|
||||
(name.clone(), indexes)
|
||||
})
|
||||
.collect();
|
||||
let views = self
|
||||
.views
|
||||
let materialized_views = self
|
||||
.materialized_views
|
||||
.iter()
|
||||
.map(|(name, view)| (name.clone(), view.clone()))
|
||||
.collect();
|
||||
Self {
|
||||
tables,
|
||||
views,
|
||||
materialized_views,
|
||||
indexes,
|
||||
has_indexes: self.has_indexes.clone(),
|
||||
indexes_enabled: self.indexes_enabled,
|
||||
schema_version: self.schema_version,
|
||||
table_to_views: self.table_to_views.clone(),
|
||||
table_to_materialized_views: self.table_to_materialized_views.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -470,7 +470,9 @@ fn parse_table(
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let view = connection.with_schema(database_id, |schema| schema.get_view(table_name.as_str()));
|
||||
let view = connection.with_schema(database_id, |schema| {
|
||||
schema.get_materialized_view(table_name.as_str())
|
||||
});
|
||||
if let Some(view) = view {
|
||||
// Create a virtual table wrapper for the view
|
||||
// We'll use the view's columns from the schema
|
||||
|
||||
@@ -448,7 +448,7 @@ fn query_pragma(
|
||||
if let Some(name) = name {
|
||||
if let Some(table) = schema.get_table(&name) {
|
||||
emit_columns_for_table_info(&mut program, table.columns(), base_reg);
|
||||
} else if let Some(view_mutex) = schema.get_view(&name) {
|
||||
} else if let Some(view_mutex) = schema.get_materialized_view(&name) {
|
||||
let view = view_mutex.lock().unwrap();
|
||||
emit_columns_for_table_info(&mut program, &view.columns, base_reg);
|
||||
}
|
||||
|
||||
@@ -27,7 +27,10 @@ pub fn translate_create_materialized_view(
|
||||
let normalized_view_name = normalize_ident(view_name);
|
||||
|
||||
// Check if view already exists
|
||||
if schema.get_view(&normalized_view_name).is_some() {
|
||||
if schema
|
||||
.get_materialized_view(&normalized_view_name)
|
||||
.is_some()
|
||||
{
|
||||
return Err(crate::LimboError::ParseError(format!(
|
||||
"View {normalized_view_name} already exists"
|
||||
)));
|
||||
@@ -71,8 +74,8 @@ pub fn translate_create_materialized_view(
|
||||
where_clause: Some(format!("name = '{normalized_view_name}'")),
|
||||
});
|
||||
|
||||
// Populate the new view
|
||||
program.emit_insn(Insn::PopulateViews);
|
||||
// Populate the new materialized view
|
||||
program.emit_insn(Insn::PopulateMaterializedViews);
|
||||
|
||||
program.epilogue(schema);
|
||||
Ok(program)
|
||||
@@ -104,7 +107,9 @@ pub fn translate_drop_view(
|
||||
let normalized_view_name = normalize_ident(view_name);
|
||||
|
||||
// Check if view exists
|
||||
let view_exists = schema.get_view(&normalized_view_name).is_some();
|
||||
let view_exists = schema
|
||||
.get_materialized_view(&normalized_view_name)
|
||||
.is_some();
|
||||
|
||||
if !view_exists && !if_exists {
|
||||
return Err(crate::LimboError::ParseError(format!(
|
||||
|
||||
12
core/util.rs
12
core/util.rs
@@ -2,7 +2,7 @@
|
||||
use crate::translate::expr::WalkControl;
|
||||
use crate::types::IOResult;
|
||||
use crate::{
|
||||
schema::{self, Column, Schema, Type, ViewsMap},
|
||||
schema::{self, Column, MaterializedViewsMap, Schema, Type},
|
||||
translate::{collate::CollationSeq, expr::walk_expr, plan::JoinOrderMember},
|
||||
types::{Value, ValueType},
|
||||
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable,
|
||||
@@ -77,7 +77,7 @@ pub fn parse_schema_rows(
|
||||
schema: &mut Schema,
|
||||
syms: &SymbolTable,
|
||||
mv_tx_id: Option<u64>,
|
||||
mut existing_views: ViewsMap,
|
||||
mut existing_views: MaterializedViewsMap,
|
||||
) -> Result<()> {
|
||||
rows.set_mv_tx_id(mv_tx_id);
|
||||
// TODO: if we IO, this unparsed indexes is lost. Will probably need some state between
|
||||
@@ -179,7 +179,9 @@ pub fn parse_schema_rows(
|
||||
};
|
||||
|
||||
// Add the existing view to the new schema
|
||||
schema.views.insert(view_name.clone(), existing_view);
|
||||
schema
|
||||
.materialized_views
|
||||
.insert(view_name.clone(), existing_view);
|
||||
|
||||
// Store for second pass processing
|
||||
views_to_process.push((view_name.clone(), referenced_tables));
|
||||
@@ -197,7 +199,7 @@ pub fn parse_schema_rows(
|
||||
Ok(incremental_view) => {
|
||||
let referenced_tables =
|
||||
incremental_view.get_referenced_table_names();
|
||||
schema.add_view(incremental_view);
|
||||
schema.add_materialized_view(incremental_view);
|
||||
views_to_process.push((view_name, referenced_tables));
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -255,7 +257,7 @@ pub fn parse_schema_rows(
|
||||
for (view_name, referenced_tables) in views_to_process {
|
||||
// Register this view as dependent on each referenced table
|
||||
for table_name in referenced_tables {
|
||||
schema.add_view_dependency(&table_name, &view_name);
|
||||
schema.add_materialized_view_dependency(&table_name, &view_name);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5177,7 +5177,7 @@ pub fn op_insert(
|
||||
match &state.op_insert_state.sub_state {
|
||||
OpInsertSubState::MaybeCaptureRecord => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
// If there are no dependent views, we don't need to capture the old record.
|
||||
// We also don't need to do it if the rowid of the UPDATEd row was changed, because that means
|
||||
// we deleted it earlier and `op_delete` already captured the change.
|
||||
@@ -5265,7 +5265,7 @@ pub fn op_insert(
|
||||
state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid;
|
||||
} else {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
if !dependent_views.is_empty() {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
|
||||
} else {
|
||||
@@ -5286,7 +5286,7 @@ pub fn op_insert(
|
||||
program.n_change.set(prev_changes + 1);
|
||||
}
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
if !dependent_views.is_empty() {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
|
||||
continue;
|
||||
@@ -5295,7 +5295,7 @@ pub fn op_insert(
|
||||
}
|
||||
OpInsertSubState::ApplyViewChange => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
assert!(!dependent_views.is_empty());
|
||||
|
||||
let (key, values) = {
|
||||
@@ -5418,7 +5418,7 @@ pub fn op_delete(
|
||||
match &state.op_delete_state.sub_state {
|
||||
OpDeleteSubState::MaybeCaptureRecord => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
if dependent_views.is_empty() {
|
||||
state.op_delete_state.sub_state = OpDeleteSubState::Delete;
|
||||
continue;
|
||||
@@ -5465,7 +5465,7 @@ pub fn op_delete(
|
||||
return_if_io!(cursor.delete());
|
||||
}
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
if dependent_views.is_empty() {
|
||||
break;
|
||||
}
|
||||
@@ -5474,7 +5474,7 @@ pub fn op_delete(
|
||||
}
|
||||
OpDeleteSubState::ApplyViewChange => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views = schema.get_dependent_views(table_name);
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
assert!(!dependent_views.is_empty());
|
||||
let maybe_deleted_record = state.op_delete_state.deleted_record.take();
|
||||
if let Some((key, values)) = maybe_deleted_record {
|
||||
@@ -6421,7 +6421,7 @@ pub fn op_parse_schema(
|
||||
|
||||
conn.with_schema_mut(|schema| {
|
||||
// TODO: This function below is synchronous, make it async
|
||||
let existing_views = schema.views.clone();
|
||||
let existing_views = schema.materialized_views.clone();
|
||||
parse_schema_rows(
|
||||
stmt,
|
||||
schema,
|
||||
@@ -6435,7 +6435,7 @@ pub fn op_parse_schema(
|
||||
|
||||
conn.with_schema_mut(|schema| {
|
||||
// TODO: This function below is synchronous, make it async
|
||||
let existing_views = schema.views.clone();
|
||||
let existing_views = schema.materialized_views.clone();
|
||||
parse_schema_rows(
|
||||
stmt,
|
||||
schema,
|
||||
@@ -6450,7 +6450,7 @@ pub fn op_parse_schema(
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
}
|
||||
|
||||
pub fn op_populate_views(
|
||||
pub fn op_populate_materialized_views(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
_insn: &Insn,
|
||||
@@ -6460,7 +6460,7 @@ pub fn op_populate_views(
|
||||
let conn = program.connection.clone();
|
||||
let schema = conn.schema.borrow();
|
||||
|
||||
return_if_io!(schema.populate_views(&conn));
|
||||
return_if_io!(schema.populate_materialized_views(&conn));
|
||||
// All views populated, advance to next instruction
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
|
||||
@@ -1337,8 +1337,8 @@ pub fn insn_to_str(
|
||||
0,
|
||||
where_clause.clone().unwrap_or("NULL".to_string()),
|
||||
),
|
||||
Insn::PopulateViews => (
|
||||
"PopulateViews",
|
||||
Insn::PopulateMaterializedViews => (
|
||||
"PopulateMaterializedViews",
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
|
||||
@@ -897,8 +897,8 @@ pub enum Insn {
|
||||
where_clause: Option<String>,
|
||||
},
|
||||
|
||||
/// Populate all views after schema parsing
|
||||
PopulateViews,
|
||||
/// Populate all materialized views after schema parsing
|
||||
PopulateMaterializedViews,
|
||||
|
||||
/// Place the result of lhs >> rhs in dest register.
|
||||
ShiftRight {
|
||||
@@ -1185,7 +1185,7 @@ impl Insn {
|
||||
Insn::IsNull { .. } => execute::op_is_null,
|
||||
Insn::CollSeq { .. } => execute::op_coll_seq,
|
||||
Insn::ParseSchema { .. } => execute::op_parse_schema,
|
||||
Insn::PopulateViews => execute::op_populate_views,
|
||||
Insn::PopulateMaterializedViews => execute::op_populate_materialized_views,
|
||||
Insn::ShiftRight { .. } => execute::op_shift_right,
|
||||
Insn::ShiftLeft { .. } => execute::op_shift_left,
|
||||
Insn::AddImm { .. } => execute::op_add_imm,
|
||||
|
||||
@@ -476,7 +476,7 @@ impl Program {
|
||||
let schema = self.connection.schema.borrow();
|
||||
|
||||
for (view_name, tx_state) in tx_states.iter() {
|
||||
if let Some(view_mutex) = schema.get_view(view_name) {
|
||||
if let Some(view_mutex) = schema.get_materialized_view(view_name) {
|
||||
let mut view = view_mutex.lock().unwrap();
|
||||
view.merge_delta(&tx_state.delta);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user