core/schema: parse schema rows for MVCC transactions

This commit is contained in:
Pere Diaz Bou
2025-09-17 14:22:58 +00:00
parent a0555c254d
commit d53c64e84b
5 changed files with 41 additions and 6 deletions

View File

@@ -1180,7 +1180,14 @@ impl Connection {
let stmt = self.prepare("SELECT * FROM sqlite_schema")?;
// TODO: This function below is synchronous, make it async
parse_schema_rows(stmt, &mut fresh, &self.syms.read(), None, existing_views)?;
parse_schema_rows(
stmt,
&mut fresh,
&self.syms.read(),
None,
existing_views,
self._db.mv_store.as_ref(),
)?;
tracing::debug!(
"reparse_schema: schema_version={}, tables={:?}",
@@ -1791,9 +1798,14 @@ impl Connection {
let syms = self.syms.read();
self.with_schema_mut(|schema| {
let existing_views = schema.incremental_views.clone();
if let Err(LimboError::ExtensionError(e)) =
parse_schema_rows(rows, schema, &syms, None, existing_views)
{
if let Err(LimboError::ExtensionError(e)) = parse_schema_rows(
rows,
schema,
&syms,
None,
existing_views,
self._db.mv_store.as_ref(),
) {
// this means that a vtab exists and we no longer have the module loaded. we print
// a warning to the user to load the module
eprintln!("Warning: {e}");

View File

@@ -1868,6 +1868,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
Ok(())
}
// Mark table as loaded
pub fn mark_table_as_loaded(&self, table_id: u64) {
self.loaded_tables.write().insert(table_id);
}
/// Scans the table and inserts the rows into the database.
///
/// This is initialization step for a table, where we still don't have any rows so we need to insert them if there are.

View File

@@ -21,7 +21,7 @@ use crate::util::{
};
use crate::{
contains_ignore_ascii_case, eq_ignore_ascii_case, match_ignore_ascii_case, LimboError,
MvCursor, Pager, RefValue, SymbolTable, VirtualTable,
MvCursor, MvStore, Pager, RefValue, SymbolTable, VirtualTable,
};
use crate::{util::normalize_ident, Result};
use core::fmt;
@@ -296,6 +296,10 @@ impl Schema {
pager: Arc<Pager>,
syms: &SymbolTable,
) -> Result<()> {
assert!(
mv_cursor.is_none(),
"mvcc not yet supported for make_from_btree"
);
let mut cursor = BTreeCursor::new_table(mv_cursor, Arc::clone(&pager), 1, 10);
let mut from_sql_indexes = Vec::with_capacity(10);
@@ -357,6 +361,7 @@ impl Schema {
&mut dbsp_state_roots,
&mut dbsp_state_index_roots,
&mut materialized_view_info,
None,
)?;
drop(record_cursor);
drop(row);
@@ -553,6 +558,7 @@ impl Schema {
dbsp_state_roots: &mut std::collections::HashMap<String, usize>,
dbsp_state_index_roots: &mut std::collections::HashMap<String, usize>,
materialized_view_info: &mut std::collections::HashMap<String, (String, usize)>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<()> {
match ty {
"table" => {
@@ -574,6 +580,9 @@ impl Schema {
)?
};
self.add_virtual_table(vtab);
if let Some(mv_store) = mv_store {
mv_store.mark_table_as_loaded(root_page as u64);
}
} else {
let table = BTreeTable::from_sql(sql, root_page as usize)?;
@@ -588,10 +597,14 @@ impl Schema {
dbsp_state_roots.insert(view_name, root_page as usize);
}
if let Some(mv_store) = mv_store {
mv_store.mark_table_as_loaded(root_page as u64);
}
self.add_btree_table(Arc::new(table));
}
}
"index" => {
assert!(mv_store.is_none(), "indexes not yet supported for mvcc");
match maybe_sql {
Some(sql) => {
from_sql_indexes.push(UnparsedFromSqlIndex {
@@ -635,6 +648,7 @@ impl Schema {
let sql = maybe_sql.expect("sql should be present for view");
let view_name = name.to_string();
assert!(mv_store.is_none(), "views not yet supported for mvcc");
// Parse the SQL to determine if it's a regular or materialized view
let mut parser = Parser::new(sql.as_bytes());

View File

@@ -10,7 +10,7 @@ use crate::{
types::{Value, ValueType},
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable,
};
use crate::{Connection, IO};
use crate::{Connection, MvStore, IO};
use std::{
collections::HashMap,
rc::Rc,
@@ -153,6 +153,7 @@ pub fn parse_schema_rows(
syms: &SymbolTable,
mv_tx: Option<(u64, TransactionMode)>,
mut existing_views: HashMap<String, Arc<Mutex<IncrementalView>>>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<()> {
rows.set_mv_tx(mv_tx);
// TODO: if we IO, this unparsed indexes is lost. Will probably need some state between
@@ -190,6 +191,7 @@ pub fn parse_schema_rows(
&mut dbsp_state_roots,
&mut dbsp_state_index_roots,
&mut materialized_view_info,
mv_store,
)?
}
StepResult::IO => {

View File

@@ -6797,6 +6797,7 @@ pub fn op_parse_schema(
&conn.syms.read(),
program.connection.mv_tx.get(),
existing_views,
mv_store,
)
})
} else {
@@ -6812,6 +6813,7 @@ pub fn op_parse_schema(
&conn.syms.read(),
program.connection.mv_tx.get(),
existing_views,
mv_store,
)
})
};