diff --git a/core/lib.rs b/core/lib.rs index 7ee46c078..d06bf803b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -1180,7 +1180,14 @@ impl Connection { let stmt = self.prepare("SELECT * FROM sqlite_schema")?; // TODO: This function below is synchronous, make it async - parse_schema_rows(stmt, &mut fresh, &self.syms.read(), None, existing_views)?; + parse_schema_rows( + stmt, + &mut fresh, + &self.syms.read(), + None, + existing_views, + self._db.mv_store.as_ref(), + )?; tracing::debug!( "reparse_schema: schema_version={}, tables={:?}", @@ -1791,9 +1798,14 @@ impl Connection { let syms = self.syms.read(); self.with_schema_mut(|schema| { let existing_views = schema.incremental_views.clone(); - if let Err(LimboError::ExtensionError(e)) = - parse_schema_rows(rows, schema, &syms, None, existing_views) - { + if let Err(LimboError::ExtensionError(e)) = parse_schema_rows( + rows, + schema, + &syms, + None, + existing_views, + self._db.mv_store.as_ref(), + ) { // this means that a vtab exists and we no longer have the module loaded. we print // a warning to the user to load the module eprintln!("Warning: {e}"); diff --git a/core/mvcc/database/mod.rs b/core/mvcc/database/mod.rs index 0fd5de331..d849e9adf 100644 --- a/core/mvcc/database/mod.rs +++ b/core/mvcc/database/mod.rs @@ -1868,6 +1868,11 @@ impl MvStore { Ok(()) } + // Mark table as loaded + pub fn mark_table_as_loaded(&self, table_id: u64) { + self.loaded_tables.write().insert(table_id); + } + /// Scans the table and inserts the rows into the database. /// /// This is initialization step for a table, where we still don't have any rows so we need to insert them if there are. diff --git a/core/schema.rs b/core/schema.rs index 45e8e93e5..6d510b3a3 100644 --- a/core/schema.rs +++ b/core/schema.rs @@ -21,7 +21,7 @@ use crate::util::{ }; use crate::{ contains_ignore_ascii_case, eq_ignore_ascii_case, match_ignore_ascii_case, LimboError, - MvCursor, Pager, RefValue, SymbolTable, VirtualTable, + MvCursor, MvStore, Pager, RefValue, SymbolTable, VirtualTable, }; use crate::{util::normalize_ident, Result}; use core::fmt; @@ -296,6 +296,10 @@ impl Schema { pager: Arc, syms: &SymbolTable, ) -> Result<()> { + assert!( + mv_cursor.is_none(), + "mvcc not yet supported for make_from_btree" + ); let mut cursor = BTreeCursor::new_table(mv_cursor, Arc::clone(&pager), 1, 10); let mut from_sql_indexes = Vec::with_capacity(10); @@ -357,6 +361,7 @@ impl Schema { &mut dbsp_state_roots, &mut dbsp_state_index_roots, &mut materialized_view_info, + None, )?; drop(record_cursor); drop(row); @@ -553,6 +558,7 @@ impl Schema { dbsp_state_roots: &mut std::collections::HashMap, dbsp_state_index_roots: &mut std::collections::HashMap, materialized_view_info: &mut std::collections::HashMap, + mv_store: Option<&Arc>, ) -> Result<()> { match ty { "table" => { @@ -574,6 +580,9 @@ impl Schema { )? }; self.add_virtual_table(vtab); + if let Some(mv_store) = mv_store { + mv_store.mark_table_as_loaded(root_page as u64); + } } else { let table = BTreeTable::from_sql(sql, root_page as usize)?; @@ -588,10 +597,14 @@ impl Schema { dbsp_state_roots.insert(view_name, root_page as usize); } + if let Some(mv_store) = mv_store { + mv_store.mark_table_as_loaded(root_page as u64); + } self.add_btree_table(Arc::new(table)); } } "index" => { + assert!(mv_store.is_none(), "indexes not yet supported for mvcc"); match maybe_sql { Some(sql) => { from_sql_indexes.push(UnparsedFromSqlIndex { @@ -635,6 +648,7 @@ impl Schema { let sql = maybe_sql.expect("sql should be present for view"); let view_name = name.to_string(); + assert!(mv_store.is_none(), "views not yet supported for mvcc"); // Parse the SQL to determine if it's a regular or materialized view let mut parser = Parser::new(sql.as_bytes()); diff --git a/core/util.rs b/core/util.rs index 4512ccb5c..2df49d471 100644 --- a/core/util.rs +++ b/core/util.rs @@ -10,7 +10,7 @@ use crate::{ types::{Value, ValueType}, LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable, }; -use crate::{Connection, IO}; +use crate::{Connection, MvStore, IO}; use std::{ collections::HashMap, rc::Rc, @@ -153,6 +153,7 @@ pub fn parse_schema_rows( syms: &SymbolTable, mv_tx: Option<(u64, TransactionMode)>, mut existing_views: HashMap>>, + mv_store: Option<&Arc>, ) -> Result<()> { rows.set_mv_tx(mv_tx); // TODO: if we IO, this unparsed indexes is lost. Will probably need some state between @@ -190,6 +191,7 @@ pub fn parse_schema_rows( &mut dbsp_state_roots, &mut dbsp_state_index_roots, &mut materialized_view_info, + mv_store, )? } StepResult::IO => { diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index dde1a456b..92113a001 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -6797,6 +6797,7 @@ pub fn op_parse_schema( &conn.syms.read(), program.connection.mv_tx.get(), existing_views, + mv_store, ) }) } else { @@ -6812,6 +6813,7 @@ pub fn op_parse_schema( &conn.syms.read(), program.connection.mv_tx.get(), existing_views, + mv_store, ) }) };