mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-30 14:34:22 +01:00
Merge 'Persistence for DBSP-based materialized views' from Glauber Costa
This fairly long commit implements persistence for materialized view. It
is hard to split because of all the interdependencies between
components, so it is a one big thing. This commit message will at least
try to go into details about the basic architecture.
Materialized Views as tables
============================
Materialized views are now a normal table - whereas before they were a
virtual table. By making a materialized view a table, we can reuse all
the infrastructure for dealing with tables (cursors, etc).
One of the advantages of doing this is that we can create indexes on
view columns. Later, we should also be able to write those views to
separate files with ATTACH write.
Materialized Views as Zsets
===========================
The contents of the table are a ZSet: rowid, values, weight. Readers
will notice that because of this, the usage of the ZSet data structure
dwindles throughout the codebase. The main difference between our
materialized ZSet and the standard DBSP ZSet, is that obviously ours is
backed by a BTree, not a Hash (since SQLite tables are BTrees)
Aggregator State
================
In DBSP, the aggregator nodes also have state. To store that state,
there is a second table. The table holds all aggregators in the view,
and there is one table per view. That is
__turso_internal_dbsp_state_{view_name}. The format of that table is
similar to a ZSet: rowid, serialized_values, weight. We serialize the
values because there will be many aggregators in the table. We can't
rely on a particular format for the values.
The Materialized View Cursor
============================
Reading from a Materialized View essentially means reading from the
persisted ZSet, and enhancing that with data that exists within the
transaction. Transaction data is ephemeral, so we do not materialize
this anywhere: we have a carefully crafted implementation of seek that
takes care of merging weights and stitching the two sets together.
Closes #2921
This commit is contained in:
File diff suppressed because it is too large
Load Diff
1618
core/incremental/cursor.rs
Normal file
1618
core/incremental/cursor.rs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,19 +1,86 @@
|
||||
// Simplified DBSP integration for incremental view maintenance
|
||||
// For now, we'll use a basic approach and can expand to full DBSP later
|
||||
|
||||
use std::collections::HashMap;
|
||||
use super::hashable_row::HashableRow;
|
||||
use crate::Value;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
type DeltaEntry = (HashableRow, isize);
|
||||
/// A delta represents ordered changes to data
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct Delta {
|
||||
/// Ordered list of changes: (row, weight) where weight is +1 for insert, -1 for delete
|
||||
/// It is crucial that this is ordered. Imagine the case of an update, which becomes a delete +
|
||||
/// insert. If this is not ordered, it would be applied in arbitrary order and break the view.
|
||||
pub changes: Vec<DeltaEntry>,
|
||||
}
|
||||
|
||||
impl Delta {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
changes: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, row_key: i64, values: Vec<Value>) {
|
||||
let row = HashableRow::new(row_key, values);
|
||||
self.changes.push((row, 1));
|
||||
}
|
||||
|
||||
pub fn delete(&mut self, row_key: i64, values: Vec<Value>) {
|
||||
let row = HashableRow::new(row_key, values);
|
||||
self.changes.push((row, -1));
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.changes.is_empty()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.changes.len()
|
||||
}
|
||||
|
||||
/// Merge another delta into this one
|
||||
/// This preserves the order of operations - no consolidation is done
|
||||
/// to maintain the full history of changes
|
||||
pub fn merge(&mut self, other: &Delta) {
|
||||
// Simply append all changes from other, preserving order
|
||||
self.changes.extend(other.changes.iter().cloned());
|
||||
}
|
||||
|
||||
/// Consolidate changes by combining entries with the same HashableRow
|
||||
pub fn consolidate(&mut self) {
|
||||
if self.changes.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Use a HashMap to accumulate weights
|
||||
let mut consolidated: HashMap<HashableRow, isize> = HashMap::new();
|
||||
|
||||
for (row, weight) in self.changes.drain(..) {
|
||||
*consolidated.entry(row).or_insert(0) += weight;
|
||||
}
|
||||
|
||||
// Convert back to vec, filtering out zero weights
|
||||
self.changes = consolidated
|
||||
.into_iter()
|
||||
.filter(|(_, weight)| *weight != 0)
|
||||
.collect();
|
||||
}
|
||||
}
|
||||
|
||||
/// A simplified ZSet for incremental computation
|
||||
/// Each element has a weight: positive for additions, negative for deletions
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct SimpleZSet<T> {
|
||||
data: HashMap<T, isize>,
|
||||
data: BTreeMap<T, isize>,
|
||||
}
|
||||
|
||||
impl<T: std::hash::Hash + Eq + Clone> SimpleZSet<T> {
|
||||
#[allow(dead_code)]
|
||||
impl<T: std::hash::Hash + Eq + Ord + Clone> SimpleZSet<T> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
data: HashMap::new(),
|
||||
data: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,36 +112,121 @@ impl<T: std::hash::Hash + Eq + Clone> SimpleZSet<T> {
|
||||
self.insert(item.clone(), weight);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A simplified stream for incremental computation
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SimpleStream<T> {
|
||||
current: SimpleZSet<T>,
|
||||
}
|
||||
|
||||
impl<T: std::hash::Hash + Eq + Clone> SimpleStream<T> {
|
||||
pub fn from_zset(zset: SimpleZSet<T>) -> Self {
|
||||
Self { current: zset }
|
||||
/// Get the weight for a specific item (0 if not present)
|
||||
pub fn get(&self, item: &T) -> isize {
|
||||
self.data.get(item).copied().unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Apply a delta (change) to the stream
|
||||
pub fn apply_delta(&mut self, delta: &SimpleZSet<T>) {
|
||||
self.current.merge(delta);
|
||||
/// Get the first element (smallest key) in the Z-set
|
||||
pub fn first(&self) -> Option<(&T, isize)> {
|
||||
self.data.iter().next().map(|(k, &v)| (k, v))
|
||||
}
|
||||
|
||||
/// Get the current state as a vector of items (only positive weights)
|
||||
pub fn to_vec(&self) -> Vec<T> {
|
||||
self.current.to_vec()
|
||||
/// Get the last element (largest key) in the Z-set
|
||||
pub fn last(&self) -> Option<(&T, isize)> {
|
||||
self.data.iter().next_back().map(|(k, &v)| (k, v))
|
||||
}
|
||||
|
||||
/// Get a range of elements
|
||||
pub fn range<R>(&self, range: R) -> impl Iterator<Item = (&T, isize)> + '_
|
||||
where
|
||||
R: std::ops::RangeBounds<T>,
|
||||
{
|
||||
self.data.range(range).map(|(k, &v)| (k, v))
|
||||
}
|
||||
|
||||
/// Check if empty
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.data.is_empty()
|
||||
}
|
||||
|
||||
/// Get the number of elements
|
||||
pub fn len(&self) -> usize {
|
||||
self.data.len()
|
||||
}
|
||||
}
|
||||
|
||||
// Type aliases for convenience
|
||||
use super::hashable_row::HashableRow;
|
||||
|
||||
pub type RowKey = HashableRow;
|
||||
pub type RowKeyZSet = SimpleZSet<RowKey>;
|
||||
pub type RowKeyStream = SimpleStream<RowKey>;
|
||||
|
||||
impl RowKeyZSet {
|
||||
/// Create a Z-set from a Delta by consolidating all changes
|
||||
pub fn from_delta(delta: &Delta) -> Self {
|
||||
let mut zset = Self::new();
|
||||
|
||||
// Add all changes from the delta, consolidating as we go
|
||||
for (row, weight) in &delta.changes {
|
||||
zset.insert(row.clone(), *weight);
|
||||
}
|
||||
|
||||
zset
|
||||
}
|
||||
|
||||
/// Seek to find ALL entries for the best matching rowid
|
||||
/// For GT/GE: returns all entries for the smallest rowid that satisfies the condition
|
||||
/// For LT/LE: returns all entries for the largest rowid that satisfies the condition
|
||||
/// Returns empty vec if no match found
|
||||
pub fn seek(&self, target: i64, op: crate::types::SeekOp) -> Vec<(HashableRow, isize)> {
|
||||
use crate::types::SeekOp;
|
||||
|
||||
// First find the best matching rowid
|
||||
let best_rowid = match op {
|
||||
SeekOp::GT => {
|
||||
// Find smallest rowid > target
|
||||
self.data
|
||||
.iter()
|
||||
.filter(|(row, _)| row.rowid > target)
|
||||
.map(|(row, _)| row.rowid)
|
||||
.min()
|
||||
}
|
||||
SeekOp::GE { eq_only: false } => {
|
||||
// Find smallest rowid >= target
|
||||
self.data
|
||||
.iter()
|
||||
.filter(|(row, _)| row.rowid >= target)
|
||||
.map(|(row, _)| row.rowid)
|
||||
.min()
|
||||
}
|
||||
SeekOp::GE { eq_only: true } | SeekOp::LE { eq_only: true } => {
|
||||
// Need exact match
|
||||
if self.data.iter().any(|(row, _)| row.rowid == target) {
|
||||
Some(target)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
SeekOp::LT => {
|
||||
// Find largest rowid < target
|
||||
self.data
|
||||
.iter()
|
||||
.filter(|(row, _)| row.rowid < target)
|
||||
.map(|(row, _)| row.rowid)
|
||||
.max()
|
||||
}
|
||||
SeekOp::LE { eq_only: false } => {
|
||||
// Find largest rowid <= target
|
||||
self.data
|
||||
.iter()
|
||||
.filter(|(row, _)| row.rowid <= target)
|
||||
.map(|(row, _)| row.rowid)
|
||||
.max()
|
||||
}
|
||||
};
|
||||
|
||||
// Now get ALL entries with that rowid
|
||||
match best_rowid {
|
||||
Some(rowid) => self
|
||||
.data
|
||||
.iter()
|
||||
.filter(|(row, _)| row.rowid == rowid)
|
||||
.map(|(k, &v)| (k.clone(), v))
|
||||
.collect(),
|
||||
None => Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -78,3 +78,23 @@ impl Hash for HashableRow {
|
||||
self.cached_hash.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for HashableRow {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for HashableRow {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
// First compare by rowid, then by values if rowids are equal
|
||||
// This ensures Ord is consistent with Eq (which compares all fields)
|
||||
match self.rowid.cmp(&other.rowid) {
|
||||
std::cmp::Ordering::Equal => {
|
||||
// If rowids are equal, compare values to maintain consistency with Eq
|
||||
self.values.cmp(&other.values)
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod compiler;
|
||||
pub mod cursor;
|
||||
pub mod dbsp;
|
||||
pub mod expr_compiler;
|
||||
pub mod hashable_row;
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,13 +1,16 @@
|
||||
use super::compiler::{DbspCircuit, DbspCompiler, DeltaSet};
|
||||
use super::dbsp::{RowKeyStream, RowKeyZSet};
|
||||
use super::operator::{ComputationTracker, Delta, FilterPredicate};
|
||||
use super::dbsp::Delta;
|
||||
use super::operator::{ComputationTracker, FilterPredicate};
|
||||
use crate::schema::{BTreeTable, Column, Schema};
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::translate::logical::LogicalPlanBuilder;
|
||||
use crate::types::{IOCompletions, IOResult, Value};
|
||||
use crate::types::{IOResult, Value};
|
||||
use crate::util::extract_view_columns;
|
||||
use crate::{io_yield_one, Completion, LimboError, Result, Statement};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use crate::{return_if_io, LimboError, Pager, Result, Statement};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::rc::Rc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use turso_parser::ast;
|
||||
use turso_parser::{
|
||||
@@ -23,18 +26,26 @@ pub enum PopulateState {
|
||||
Processing {
|
||||
stmt: Box<Statement>,
|
||||
rows_processed: usize,
|
||||
/// If we're in the middle of processing a row (merge_delta returned I/O)
|
||||
pending_row: Option<(i64, Vec<Value>)>, // (rowid, values)
|
||||
},
|
||||
/// Population complete
|
||||
Done,
|
||||
}
|
||||
|
||||
/// State machine for merge_delta to handle I/O operations
|
||||
impl fmt::Debug for PopulateState {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
PopulateState::Start => write!(f, "Start"),
|
||||
PopulateState::Processing { rows_processed, .. } => f
|
||||
PopulateState::Processing {
|
||||
rows_processed,
|
||||
pending_row,
|
||||
..
|
||||
} => f
|
||||
.debug_struct("Processing")
|
||||
.field("rows_processed", rows_processed)
|
||||
.field("has_pending", &pending_row.is_some())
|
||||
.finish(),
|
||||
PopulateState::Done => write!(f, "Done"),
|
||||
}
|
||||
@@ -45,11 +56,95 @@ impl fmt::Debug for PopulateState {
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct ViewTransactionState {
|
||||
// Per-connection delta for uncommitted changes (contains both weights and values)
|
||||
pub delta: Delta,
|
||||
// Using RefCell for interior mutability
|
||||
delta: RefCell<Delta>,
|
||||
}
|
||||
|
||||
/// Incremental view that maintains a stream of row keys using DBSP-style computation
|
||||
/// The actual row data is stored as transformed Values
|
||||
impl ViewTransactionState {
|
||||
/// Create a new transaction state
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
delta: RefCell::new(Delta::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a row into the delta
|
||||
pub fn insert(&self, key: i64, values: Vec<Value>) {
|
||||
self.delta.borrow_mut().insert(key, values);
|
||||
}
|
||||
|
||||
/// Delete a row from the delta
|
||||
pub fn delete(&self, key: i64, values: Vec<Value>) {
|
||||
self.delta.borrow_mut().delete(key, values);
|
||||
}
|
||||
|
||||
/// Clear all changes in the delta
|
||||
pub fn clear(&self) {
|
||||
self.delta.borrow_mut().changes.clear();
|
||||
}
|
||||
|
||||
/// Get a clone of the current delta
|
||||
pub fn get_delta(&self) -> Delta {
|
||||
self.delta.borrow().clone()
|
||||
}
|
||||
|
||||
/// Check if the delta is empty
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.delta.borrow().is_empty()
|
||||
}
|
||||
|
||||
/// Returns how many elements exist in the delta.
|
||||
pub fn len(&self) -> usize {
|
||||
self.delta.borrow().len()
|
||||
}
|
||||
}
|
||||
|
||||
/// Container for all view transaction states within a connection
|
||||
/// Provides interior mutability for the map of view states
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct AllViewsTxState {
|
||||
states: Rc<RefCell<HashMap<String, Rc<ViewTransactionState>>>>,
|
||||
}
|
||||
|
||||
impl AllViewsTxState {
|
||||
/// Create a new container for view transaction states
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
states: Rc::new(RefCell::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create a transaction state for a view
|
||||
pub fn get_or_create(&self, view_name: &str) -> Rc<ViewTransactionState> {
|
||||
let mut states = self.states.borrow_mut();
|
||||
states
|
||||
.entry(view_name.to_string())
|
||||
.or_insert_with(|| Rc::new(ViewTransactionState::new()))
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Get a transaction state for a view if it exists
|
||||
pub fn get(&self, view_name: &str) -> Option<Rc<ViewTransactionState>> {
|
||||
self.states.borrow().get(view_name).cloned()
|
||||
}
|
||||
|
||||
/// Clear all transaction states
|
||||
pub fn clear(&self) {
|
||||
self.states.borrow_mut().clear();
|
||||
}
|
||||
|
||||
/// Check if there are no transaction states
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.states.borrow().is_empty()
|
||||
}
|
||||
|
||||
/// Get all view names that have transaction states
|
||||
pub fn get_view_names(&self) -> Vec<String> {
|
||||
self.states.borrow().keys().cloned().collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Incremental view that maintains its state through a DBSP circuit
|
||||
///
|
||||
/// This version keeps everything in-memory. This is acceptable for small views, since DBSP
|
||||
/// doesn't have to track the history of changes. Still for very large views (think of the result
|
||||
@@ -62,12 +157,7 @@ pub struct ViewTransactionState {
|
||||
/// Uses DBSP circuits for incremental computation.
|
||||
#[derive(Debug)]
|
||||
pub struct IncrementalView {
|
||||
// Stream of row keys for this view
|
||||
stream: RowKeyStream,
|
||||
name: String,
|
||||
// Store the actual row data as Values, keyed by row_key
|
||||
// Using BTreeMap for ordered iteration
|
||||
pub records: BTreeMap<i64, Vec<Value>>,
|
||||
// WHERE clause predicate for filtering (kept for compatibility)
|
||||
pub where_predicate: FilterPredicate,
|
||||
// The SELECT statement that defines how to transform input data
|
||||
@@ -75,8 +165,6 @@ pub struct IncrementalView {
|
||||
|
||||
// DBSP circuit that encapsulates the computation
|
||||
circuit: DbspCircuit,
|
||||
// Track whether circuit has been initialized with data
|
||||
circuit_initialized: bool,
|
||||
|
||||
// Tables referenced by this view (extracted from FROM clause and JOINs)
|
||||
base_table: Arc<BTreeTable>,
|
||||
@@ -88,6 +176,8 @@ pub struct IncrementalView {
|
||||
// We will use this one day to export rows_read, but for now, will just test that we're doing the expected amount of compute
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub tracker: Arc<Mutex<ComputationTracker>>,
|
||||
// Root page of the btree storing the materialized state (0 for unmaterialized)
|
||||
root_page: usize,
|
||||
}
|
||||
|
||||
impl IncrementalView {
|
||||
@@ -110,6 +200,8 @@ impl IncrementalView {
|
||||
select: &ast::Select,
|
||||
schema: &Schema,
|
||||
_base_table: &Arc<BTreeTable>,
|
||||
main_data_root: usize,
|
||||
internal_state_root: usize,
|
||||
) -> Result<DbspCircuit> {
|
||||
// Build the logical plan from the SELECT statement
|
||||
let mut builder = LogicalPlanBuilder::new(schema);
|
||||
@@ -117,8 +209,8 @@ impl IncrementalView {
|
||||
let stmt = ast::Stmt::Select(select.clone());
|
||||
let logical_plan = builder.build_statement(&stmt)?;
|
||||
|
||||
// Compile the logical plan to a DBSP circuit
|
||||
let compiler = DbspCompiler::new();
|
||||
// Compile the logical plan to a DBSP circuit with the storage roots
|
||||
let compiler = DbspCompiler::new(main_data_root, internal_state_root);
|
||||
let circuit = compiler.compile(&logical_plan)?;
|
||||
|
||||
Ok(circuit)
|
||||
@@ -145,7 +237,37 @@ impl IncrementalView {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn from_sql(sql: &str, schema: &Schema) -> Result<Self> {
|
||||
/// Validate a SELECT statement and extract the columns it would produce
|
||||
/// This is used during CREATE MATERIALIZED VIEW to validate the view before storing it
|
||||
pub fn validate_and_extract_columns(
|
||||
select: &ast::Select,
|
||||
schema: &Schema,
|
||||
) -> Result<Vec<crate::schema::Column>> {
|
||||
// For now, just extract columns from a simple select
|
||||
// This will need to be expanded to handle joins, aggregates, etc.
|
||||
|
||||
// Get the base table name
|
||||
let base_table_name = Self::extract_base_table(select).ok_or_else(|| {
|
||||
LimboError::ParseError("Cannot extract base table from SELECT".to_string())
|
||||
})?;
|
||||
|
||||
// Get the table from schema
|
||||
let table = schema
|
||||
.get_table(&base_table_name)
|
||||
.and_then(|t| t.btree())
|
||||
.ok_or_else(|| LimboError::ParseError(format!("Table {base_table_name} not found")))?;
|
||||
|
||||
// For now, return all columns from the base table
|
||||
// In the future, this should parse the select list and handle projections
|
||||
Ok(table.columns.clone())
|
||||
}
|
||||
|
||||
pub fn from_sql(
|
||||
sql: &str,
|
||||
schema: &Schema,
|
||||
main_data_root: usize,
|
||||
internal_state_root: usize,
|
||||
) -> Result<Self> {
|
||||
let mut parser = Parser::new(sql.as_bytes());
|
||||
let cmd = parser.next_cmd()?;
|
||||
let cmd = cmd.expect("View is an empty statement");
|
||||
@@ -155,7 +277,13 @@ impl IncrementalView {
|
||||
view_name,
|
||||
columns: _,
|
||||
select,
|
||||
}) => IncrementalView::from_stmt(view_name, select, schema),
|
||||
}) => IncrementalView::from_stmt(
|
||||
view_name,
|
||||
select,
|
||||
schema,
|
||||
main_data_root,
|
||||
internal_state_root,
|
||||
),
|
||||
_ => Err(LimboError::ParseError(format!(
|
||||
"View is not a CREATE MATERIALIZED VIEW statement: {sql}"
|
||||
))),
|
||||
@@ -166,6 +294,8 @@ impl IncrementalView {
|
||||
view_name: ast::QualifiedName,
|
||||
select: ast::Select,
|
||||
schema: &Schema,
|
||||
main_data_root: usize,
|
||||
internal_state_root: usize,
|
||||
) -> Result<Self> {
|
||||
let name = view_name.name.as_str().to_string();
|
||||
|
||||
@@ -203,9 +333,12 @@ impl IncrementalView {
|
||||
base_table,
|
||||
view_columns,
|
||||
schema,
|
||||
main_data_root,
|
||||
internal_state_root,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
name: String,
|
||||
where_predicate: FilterPredicate,
|
||||
@@ -213,30 +346,31 @@ impl IncrementalView {
|
||||
base_table: Arc<BTreeTable>,
|
||||
columns: Vec<Column>,
|
||||
schema: &Schema,
|
||||
main_data_root: usize,
|
||||
internal_state_root: usize,
|
||||
) -> Result<Self> {
|
||||
let records = BTreeMap::new();
|
||||
|
||||
// Create the tracker that will be shared by all operators
|
||||
let tracker = Arc::new(Mutex::new(ComputationTracker::new()));
|
||||
|
||||
// Compile the SELECT statement into a DBSP circuit
|
||||
let circuit = Self::try_compile_circuit(&select_stmt, schema, &base_table)?;
|
||||
|
||||
// Circuit will be initialized when we first call merge_delta
|
||||
let circuit_initialized = false;
|
||||
let circuit = Self::try_compile_circuit(
|
||||
&select_stmt,
|
||||
schema,
|
||||
&base_table,
|
||||
main_data_root,
|
||||
internal_state_root,
|
||||
)?;
|
||||
|
||||
Ok(Self {
|
||||
stream: RowKeyStream::from_zset(RowKeyZSet::new()),
|
||||
name,
|
||||
records,
|
||||
where_predicate,
|
||||
select_stmt,
|
||||
circuit,
|
||||
circuit_initialized,
|
||||
base_table,
|
||||
columns,
|
||||
populate_state: PopulateState::Start,
|
||||
tracker,
|
||||
root_page: main_data_root,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -244,6 +378,29 @@ impl IncrementalView {
|
||||
&self.name
|
||||
}
|
||||
|
||||
pub fn base_table(&self) -> &Arc<BTreeTable> {
|
||||
&self.base_table
|
||||
}
|
||||
|
||||
/// Execute the circuit with uncommitted changes to get processed delta
|
||||
pub fn execute_with_uncommitted(
|
||||
&mut self,
|
||||
uncommitted: DeltaSet,
|
||||
pager: Rc<Pager>,
|
||||
execute_state: &mut crate::incremental::compiler::ExecuteState,
|
||||
) -> crate::Result<crate::types::IOResult<Delta>> {
|
||||
// Initialize execute_state with the input data
|
||||
*execute_state = crate::incremental::compiler::ExecuteState::Init {
|
||||
input_data: uncommitted,
|
||||
};
|
||||
self.circuit.execute(pager, execute_state)
|
||||
}
|
||||
|
||||
/// Get the root page for this materialized view's btree
|
||||
pub fn get_root_page(&self) -> usize {
|
||||
self.root_page
|
||||
}
|
||||
|
||||
/// Get all table names referenced by this view
|
||||
pub fn get_referenced_table_names(&self) -> Vec<String> {
|
||||
vec![self.base_table.name.clone()]
|
||||
@@ -348,132 +505,189 @@ impl IncrementalView {
|
||||
|
||||
/// Populate the view by scanning the source table using a state machine
|
||||
/// This can be called multiple times and will resume from where it left off
|
||||
/// This method is only for materialized views and will persist data to the btree
|
||||
pub fn populate_from_table(
|
||||
&mut self,
|
||||
conn: &std::sync::Arc<crate::Connection>,
|
||||
pager: &std::rc::Rc<crate::Pager>,
|
||||
_btree_cursor: &mut BTreeCursor,
|
||||
) -> crate::Result<IOResult<()>> {
|
||||
// If already populated, return immediately
|
||||
if matches!(self.populate_state, PopulateState::Done) {
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
const BATCH_SIZE: usize = 100; // Process 100 rows at a time before yielding
|
||||
// Assert that this is a materialized view with a root page
|
||||
assert!(
|
||||
self.root_page != 0,
|
||||
"populate_from_table should only be called for materialized views with root_page"
|
||||
);
|
||||
|
||||
loop {
|
||||
match &mut self.populate_state {
|
||||
PopulateState::Start => {
|
||||
// Generate the SQL query for populating the view
|
||||
// It is best to use a standard query than a cursor for two reasons:
|
||||
// 1) Using a sql query will allow us to be much more efficient in cases where we only want
|
||||
// some rows, in particular for indexed filters
|
||||
// 2) There are two types of cursors: index and table. In some situations (like for example
|
||||
// if the table has an integer primary key), the key will be exclusively in the index
|
||||
// btree and not in the table btree. Using cursors would force us to be aware of this
|
||||
// distinction (and others), and ultimately lead to reimplementing the whole query
|
||||
// machinery (next step is which index is best to use, etc)
|
||||
let query = self.sql_for_populate()?;
|
||||
// To avoid borrow checker issues, we need to handle state transitions carefully
|
||||
let needs_start = matches!(self.populate_state, PopulateState::Start);
|
||||
|
||||
// Prepare the statement
|
||||
let stmt = conn.prepare(&query)?;
|
||||
if needs_start {
|
||||
// Generate the SQL query for populating the view
|
||||
// It is best to use a standard query than a cursor for two reasons:
|
||||
// 1) Using a sql query will allow us to be much more efficient in cases where we only want
|
||||
// some rows, in particular for indexed filters
|
||||
// 2) There are two types of cursors: index and table. In some situations (like for example
|
||||
// if the table has an integer primary key), the key will be exclusively in the index
|
||||
// btree and not in the table btree. Using cursors would force us to be aware of this
|
||||
// distinction (and others), and ultimately lead to reimplementing the whole query
|
||||
// machinery (next step is which index is best to use, etc)
|
||||
let query = self.sql_for_populate()?;
|
||||
|
||||
self.populate_state = PopulateState::Processing {
|
||||
stmt: Box::new(stmt),
|
||||
rows_processed: 0,
|
||||
};
|
||||
// Continue to next state
|
||||
// Prepare the statement
|
||||
let stmt = conn.prepare(&query)?;
|
||||
|
||||
self.populate_state = PopulateState::Processing {
|
||||
stmt: Box::new(stmt),
|
||||
rows_processed: 0,
|
||||
pending_row: None,
|
||||
};
|
||||
// Continue to next state
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle Done state
|
||||
if matches!(self.populate_state, PopulateState::Done) {
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
// Handle Processing state - extract state to avoid borrow issues
|
||||
let (mut stmt, mut rows_processed, pending_row) =
|
||||
match std::mem::replace(&mut self.populate_state, PopulateState::Done) {
|
||||
PopulateState::Processing {
|
||||
stmt,
|
||||
rows_processed,
|
||||
pending_row,
|
||||
} => (stmt, rows_processed, pending_row),
|
||||
_ => unreachable!("We already handled Start and Done states"),
|
||||
};
|
||||
|
||||
// If we have a pending row from a previous I/O interruption, process it first
|
||||
if let Some((rowid, values)) = pending_row {
|
||||
// Create a single-row delta for the pending row
|
||||
let mut single_row_delta = Delta::new();
|
||||
single_row_delta.insert(rowid, values.clone());
|
||||
|
||||
// Process the pending row with the pager
|
||||
match self.merge_delta(&single_row_delta, pager.clone())? {
|
||||
IOResult::Done(_) => {
|
||||
// Row processed successfully, continue to next row
|
||||
rows_processed += 1;
|
||||
// Continue to fetch next row from statement
|
||||
}
|
||||
IOResult::IO(io) => {
|
||||
// Still not done, save state with pending row
|
||||
self.populate_state = PopulateState::Processing {
|
||||
stmt,
|
||||
rows_processed,
|
||||
pending_row: Some((rowid, values)), // Keep the pending row
|
||||
};
|
||||
return Ok(IOResult::IO(io));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PopulateState::Processing {
|
||||
stmt,
|
||||
rows_processed,
|
||||
} => {
|
||||
// Collect rows into a delta batch
|
||||
let mut batch_delta = Delta::new();
|
||||
let mut batch_count = 0;
|
||||
// Process rows one at a time - no batching
|
||||
loop {
|
||||
// This step() call resumes from where the statement left off
|
||||
match stmt.step()? {
|
||||
crate::vdbe::StepResult::Row => {
|
||||
// Get the row
|
||||
let row = stmt.row().unwrap();
|
||||
|
||||
loop {
|
||||
if batch_count >= BATCH_SIZE {
|
||||
// Process this batch through the standard pipeline
|
||||
self.merge_delta(&batch_delta);
|
||||
// Yield control after processing a batch
|
||||
// TODO: currently this inner statement is the one that is tracking completions
|
||||
// so as a stop gap we can just return a dummy completion here
|
||||
io_yield_one!(Completion::new_dummy());
|
||||
}
|
||||
// Extract values from the row
|
||||
let all_values: Vec<crate::types::Value> =
|
||||
row.get_values().cloned().collect();
|
||||
|
||||
// This step() call resumes from where the statement left off
|
||||
match stmt.step()? {
|
||||
crate::vdbe::StepResult::Row => {
|
||||
// Get the row
|
||||
let row = stmt.row().unwrap();
|
||||
|
||||
// Extract values from the row
|
||||
let all_values: Vec<crate::types::Value> =
|
||||
row.get_values().cloned().collect();
|
||||
|
||||
// Determine how to extract the rowid
|
||||
// If there's a rowid alias (INTEGER PRIMARY KEY), the rowid is one of the columns
|
||||
// Otherwise, it's the last value we explicitly selected
|
||||
let (rowid, values) = if let Some((idx, _)) =
|
||||
self.base_table.get_rowid_alias_column()
|
||||
{
|
||||
// The rowid is the value at the rowid alias column index
|
||||
let rowid = match all_values.get(idx) {
|
||||
Some(crate::types::Value::Integer(id)) => *id,
|
||||
_ => {
|
||||
// This shouldn't happen - rowid alias must be an integer
|
||||
*rows_processed += 1;
|
||||
batch_count += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// All values are table columns (no separate rowid was selected)
|
||||
(rowid, all_values)
|
||||
} else {
|
||||
// The last value is the explicitly selected rowid
|
||||
let rowid = match all_values.last() {
|
||||
Some(crate::types::Value::Integer(id)) => *id,
|
||||
_ => {
|
||||
// This shouldn't happen - rowid must be an integer
|
||||
*rows_processed += 1;
|
||||
batch_count += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// Get all values except the rowid
|
||||
let values = all_values[..all_values.len() - 1].to_vec();
|
||||
(rowid, values)
|
||||
// Determine how to extract the rowid
|
||||
// If there's a rowid alias (INTEGER PRIMARY KEY), the rowid is one of the columns
|
||||
// Otherwise, it's the last value we explicitly selected
|
||||
let (rowid, values) =
|
||||
if let Some((idx, _)) = self.base_table.get_rowid_alias_column() {
|
||||
// The rowid is the value at the rowid alias column index
|
||||
let rowid = match all_values.get(idx) {
|
||||
Some(crate::types::Value::Integer(id)) => *id,
|
||||
_ => {
|
||||
// This shouldn't happen - rowid alias must be an integer
|
||||
rows_processed += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// All values are table columns (no separate rowid was selected)
|
||||
(rowid, all_values)
|
||||
} else {
|
||||
// The last value is the explicitly selected rowid
|
||||
let rowid = match all_values.last() {
|
||||
Some(crate::types::Value::Integer(id)) => *id,
|
||||
_ => {
|
||||
// This shouldn't happen - rowid must be an integer
|
||||
rows_processed += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// Get all values except the rowid
|
||||
let values = all_values[..all_values.len() - 1].to_vec();
|
||||
(rowid, values)
|
||||
};
|
||||
|
||||
// Add to batch delta - let merge_delta handle filtering and aggregation
|
||||
batch_delta.insert(rowid, values);
|
||||
// Create a single-row delta and process it immediately
|
||||
let mut single_row_delta = Delta::new();
|
||||
single_row_delta.insert(rowid, values.clone());
|
||||
|
||||
*rows_processed += 1;
|
||||
batch_count += 1;
|
||||
// Process this single row through merge_delta with the pager
|
||||
match self.merge_delta(&single_row_delta, pager.clone())? {
|
||||
IOResult::Done(_) => {
|
||||
// Row processed successfully, continue to next row
|
||||
rows_processed += 1;
|
||||
}
|
||||
crate::vdbe::StepResult::Done => {
|
||||
// Process any remaining rows in the batch
|
||||
self.merge_delta(&batch_delta);
|
||||
// All rows processed, move to Done state
|
||||
self.populate_state = PopulateState::Done;
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
crate::vdbe::StepResult::Interrupt | crate::vdbe::StepResult::Busy => {
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
crate::vdbe::StepResult::IO => {
|
||||
// Process current batch before yielding
|
||||
self.merge_delta(&batch_delta);
|
||||
// The Statement needs to wait for IO
|
||||
io_yield_one!(Completion::new_dummy());
|
||||
IOResult::IO(io) => {
|
||||
// Save state and return I/O
|
||||
// We'll resume at the SAME row when called again (don't increment rows_processed)
|
||||
// The circuit still has unfinished work for this row
|
||||
self.populate_state = PopulateState::Processing {
|
||||
stmt,
|
||||
rows_processed, // Don't increment - row not done yet!
|
||||
pending_row: Some((rowid, values)), // Save the row for resumption
|
||||
};
|
||||
return Ok(IOResult::IO(io));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
PopulateState::Done => {
|
||||
// Already populated
|
||||
return Ok(IOResult::Done(()));
|
||||
crate::vdbe::StepResult::Done => {
|
||||
// All rows processed, we're done
|
||||
self.populate_state = PopulateState::Done;
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
crate::vdbe::StepResult::Interrupt | crate::vdbe::StepResult::Busy => {
|
||||
// Save state before returning error
|
||||
self.populate_state = PopulateState::Processing {
|
||||
stmt,
|
||||
rows_processed,
|
||||
pending_row: None, // No pending row when interrupted between rows
|
||||
};
|
||||
return Err(LimboError::Busy);
|
||||
}
|
||||
|
||||
crate::vdbe::StepResult::IO => {
|
||||
// Statement needs I/O - save state and return
|
||||
self.populate_state = PopulateState::Processing {
|
||||
stmt,
|
||||
rows_processed,
|
||||
pending_row: None, // No pending row when interrupted between rows
|
||||
};
|
||||
// TODO: Get the actual I/O completion from the statement
|
||||
let completion = crate::io::Completion::new_dummy();
|
||||
return Ok(IOResult::IO(crate::types::IOCompletions::Single(
|
||||
completion,
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -555,95 +769,23 @@ impl IncrementalView {
|
||||
None
|
||||
}
|
||||
|
||||
/// Get the current records as an iterator - for cursor-based access
|
||||
pub fn iter(&self) -> impl Iterator<Item = (i64, Vec<Value>)> + '_ {
|
||||
self.stream.to_vec().into_iter().filter_map(move |row| {
|
||||
self.records
|
||||
.get(&row.rowid)
|
||||
.map(|values| (row.rowid, values.clone()))
|
||||
})
|
||||
}
|
||||
|
||||
/// Get current data merged with transaction state
|
||||
pub fn current_data(&self, tx_state: Option<&ViewTransactionState>) -> Vec<(i64, Vec<Value>)> {
|
||||
if let Some(tx_state) = tx_state {
|
||||
// Use circuit to process uncommitted changes
|
||||
let mut uncommitted = DeltaSet::new();
|
||||
uncommitted.insert(self.base_table.name.clone(), tx_state.delta.clone());
|
||||
|
||||
// Execute with uncommitted changes (won't affect circuit state)
|
||||
match self.circuit.execute(HashMap::new(), uncommitted) {
|
||||
Ok(processed_delta) => {
|
||||
// Merge processed delta with committed records
|
||||
let mut result_map: BTreeMap<i64, Vec<Value>> = self.records.clone();
|
||||
for (row, weight) in &processed_delta.changes {
|
||||
if *weight > 0 {
|
||||
result_map.insert(row.rowid, row.values.clone());
|
||||
} else if *weight < 0 {
|
||||
result_map.remove(&row.rowid);
|
||||
}
|
||||
}
|
||||
result_map.into_iter().collect()
|
||||
}
|
||||
Err(e) => {
|
||||
// Return error or panic - no fallback
|
||||
panic!("Failed to execute circuit with uncommitted data: {e:?}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No transaction state: return committed records
|
||||
self.records.clone().into_iter().collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Merge a delta of changes into the view's current state
|
||||
pub fn merge_delta(&mut self, delta: &Delta) {
|
||||
pub fn merge_delta(
|
||||
&mut self,
|
||||
delta: &Delta,
|
||||
pager: std::rc::Rc<crate::Pager>,
|
||||
) -> crate::Result<IOResult<()>> {
|
||||
// Early return if delta is empty
|
||||
if delta.is_empty() {
|
||||
return;
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
// Use the circuit to process the delta
|
||||
// Use the circuit to process the delta and write to btree
|
||||
let mut input_data = HashMap::new();
|
||||
input_data.insert(self.base_table.name.clone(), delta.clone());
|
||||
|
||||
// If circuit hasn't been initialized yet, initialize it first
|
||||
// This happens during populate_from_table
|
||||
if !self.circuit_initialized {
|
||||
// Initialize the circuit with empty state
|
||||
self.circuit
|
||||
.initialize(HashMap::new())
|
||||
.expect("Failed to initialize circuit");
|
||||
self.circuit_initialized = true;
|
||||
}
|
||||
|
||||
// Execute the circuit to process the delta
|
||||
let current_delta = match self.circuit.execute(input_data.clone(), DeltaSet::empty()) {
|
||||
Ok(output) => {
|
||||
// Commit the changes to the circuit's internal state
|
||||
self.circuit
|
||||
.commit(input_data)
|
||||
.expect("Failed to commit to circuit");
|
||||
output
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Failed to execute circuit: {e:?}");
|
||||
}
|
||||
};
|
||||
|
||||
// Update records and stream with the processed delta
|
||||
let mut zset_delta = RowKeyZSet::new();
|
||||
|
||||
for (row, weight) in ¤t_delta.changes {
|
||||
if *weight > 0 {
|
||||
self.records.insert(row.rowid, row.values.clone());
|
||||
zset_delta.insert(row.clone(), 1);
|
||||
} else if *weight < 0 {
|
||||
self.records.remove(&row.rowid);
|
||||
zset_delta.insert(row.clone(), -1);
|
||||
}
|
||||
}
|
||||
|
||||
self.stream.apply_delta(&zset_delta);
|
||||
// The circuit now handles all btree I/O internally with the provided pager
|
||||
let _delta = return_if_io!(self.circuit.commit(input_data, pager));
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
}
|
||||
|
||||
25
core/lib.rs
25
core/lib.rs
@@ -32,7 +32,6 @@ mod uuid;
|
||||
mod vdbe;
|
||||
mod vector;
|
||||
mod vtab;
|
||||
mod vtab_view;
|
||||
|
||||
#[cfg(feature = "fuzz")]
|
||||
pub mod numeric;
|
||||
@@ -40,7 +39,7 @@ pub mod numeric;
|
||||
#[cfg(not(feature = "fuzz"))]
|
||||
mod numeric;
|
||||
|
||||
use crate::incremental::view::ViewTransactionState;
|
||||
use crate::incremental::view::AllViewsTxState;
|
||||
use crate::storage::encryption::CipherMode;
|
||||
use crate::translate::optimizer::optimize_plan;
|
||||
use crate::translate::pragma::TURSO_CDC_DEFAULT_TABLE_NAME;
|
||||
@@ -441,13 +440,6 @@ impl Database {
|
||||
Ok(())
|
||||
})?;
|
||||
}
|
||||
// FIXME: the correct way to do this is to just materialize the view.
|
||||
// But this will allow us to keep going.
|
||||
let conn = db.connect()?;
|
||||
let pager = conn.pager.borrow().clone();
|
||||
pager
|
||||
.io
|
||||
.block(|| conn.schema.borrow().populate_materialized_views(&conn))?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
@@ -489,7 +481,7 @@ impl Database {
|
||||
attached_databases: RefCell::new(DatabaseCatalog::new()),
|
||||
query_only: Cell::new(false),
|
||||
mv_tx_id: Cell::new(None),
|
||||
view_transaction_states: RefCell::new(HashMap::new()),
|
||||
view_transaction_states: AllViewsTxState::new(),
|
||||
metrics: RefCell::new(ConnectionMetrics::new()),
|
||||
is_nested_stmt: Cell::new(false),
|
||||
encryption_key: RefCell::new(None),
|
||||
@@ -926,7 +918,7 @@ pub struct Connection {
|
||||
|
||||
/// Per-connection view transaction states for uncommitted changes. This represents
|
||||
/// one entry per view that was touched in the transaction.
|
||||
view_transaction_states: RefCell<HashMap<String, ViewTransactionState>>,
|
||||
view_transaction_states: AllViewsTxState,
|
||||
/// Connection-level metrics aggregation
|
||||
pub metrics: RefCell<ConnectionMetrics>,
|
||||
/// Whether the connection is executing a statement initiated by another statement.
|
||||
@@ -1072,7 +1064,7 @@ impl Connection {
|
||||
|
||||
// Preserve existing views to avoid expensive repopulation.
|
||||
// TODO: We may not need to do this if we materialize our views.
|
||||
let existing_views = self.schema.borrow().materialized_views.clone();
|
||||
let existing_views = self.schema.borrow().incremental_views.clone();
|
||||
|
||||
// TODO: this is hack to avoid a cyclical problem with schema reprepare
|
||||
// The problem here is that we prepare a statement here, but when the statement tries
|
||||
@@ -1096,13 +1088,6 @@ impl Connection {
|
||||
self.with_schema_mut(|schema| {
|
||||
*schema = fresh;
|
||||
});
|
||||
|
||||
{
|
||||
let schema = self.schema.borrow();
|
||||
pager
|
||||
.io
|
||||
.block(|| schema.populate_materialized_views(self))?;
|
||||
}
|
||||
Result::Ok(())
|
||||
}
|
||||
|
||||
@@ -1716,7 +1701,7 @@ impl Connection {
|
||||
.expect("query must be parsed to statement");
|
||||
let syms = self.syms.borrow();
|
||||
self.with_schema_mut(|schema| {
|
||||
let existing_views = schema.materialized_views.clone();
|
||||
let existing_views = schema.incremental_views.clone();
|
||||
if let Err(LimboError::ExtensionError(e)) =
|
||||
parse_schema_rows(rows, schema, &syms, None, existing_views)
|
||||
{
|
||||
|
||||
192
core/schema.rs
192
core/schema.rs
@@ -1,8 +1,4 @@
|
||||
use crate::incremental::view::IncrementalView;
|
||||
use crate::types::IOResult;
|
||||
|
||||
/// Type alias for the materialized views collection
|
||||
pub type MaterializedViewsMap = HashMap<String, Arc<Mutex<IncrementalView>>>;
|
||||
|
||||
/// Simple view structure for non-materialized views
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -23,12 +19,12 @@ use crate::translate::plan::SelectPlan;
|
||||
use crate::util::{
|
||||
module_args_from_sql, module_name_from_sql, type_from_name, IOExt, UnparsedFromSqlIndex,
|
||||
};
|
||||
use crate::{return_if_io, LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable};
|
||||
use crate::{util::normalize_ident, Result};
|
||||
use crate::{LimboError, MvCursor, Pager, RefValue, SymbolTable, VirtualTable};
|
||||
use core::fmt;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||
use std::ops::Deref;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
@@ -42,6 +38,7 @@ use turso_parser::{
|
||||
|
||||
const SCHEMA_TABLE_NAME: &str = "sqlite_schema";
|
||||
const SCHEMA_TABLE_NAME_ALT: &str = "sqlite_master";
|
||||
pub const DBSP_TABLE_PREFIX: &str = "__turso_internal_dbsp_state_";
|
||||
|
||||
/// Check if a table name refers to a system table that should be protected from direct writes
|
||||
pub fn is_system_table(table_name: &str) -> bool {
|
||||
@@ -52,7 +49,14 @@ pub fn is_system_table(table_name: &str) -> bool {
|
||||
#[derive(Debug)]
|
||||
pub struct Schema {
|
||||
pub tables: HashMap<String, Arc<Table>>,
|
||||
pub materialized_views: MaterializedViewsMap,
|
||||
|
||||
/// Track which tables are actually materialized views
|
||||
pub materialized_view_names: HashSet<String>,
|
||||
/// Store original SQL for materialized views (for .schema command)
|
||||
pub materialized_view_sql: HashMap<String, String>,
|
||||
/// The incremental view objects (DBSP circuits)
|
||||
pub incremental_views: HashMap<String, Arc<Mutex<IncrementalView>>>,
|
||||
|
||||
pub views: ViewsMap,
|
||||
|
||||
/// table_name to list of indexes for the table
|
||||
@@ -81,12 +85,16 @@ impl Schema {
|
||||
Arc::new(Table::Virtual(Arc::new((*function).clone()))),
|
||||
);
|
||||
}
|
||||
let materialized_views: MaterializedViewsMap = HashMap::new();
|
||||
let materialized_view_names = HashSet::new();
|
||||
let materialized_view_sql = HashMap::new();
|
||||
let incremental_views = HashMap::new();
|
||||
let views: ViewsMap = HashMap::new();
|
||||
let table_to_materialized_views: HashMap<String, Vec<String>> = HashMap::new();
|
||||
Self {
|
||||
tables,
|
||||
materialized_views,
|
||||
materialized_view_names,
|
||||
materialized_view_sql,
|
||||
incremental_views,
|
||||
views,
|
||||
indexes,
|
||||
has_indexes,
|
||||
@@ -102,41 +110,51 @@ impl Schema {
|
||||
.iter()
|
||||
.any(|idx| idx.1.iter().any(|i| i.name == name))
|
||||
}
|
||||
pub fn add_materialized_view(&mut self, view: IncrementalView) {
|
||||
pub fn add_materialized_view(&mut self, view: IncrementalView, table: Arc<Table>, sql: String) {
|
||||
let name = normalize_ident(view.name());
|
||||
self.materialized_views
|
||||
|
||||
// Add to tables (so it appears as a regular table)
|
||||
self.tables.insert(name.clone(), table);
|
||||
|
||||
// Track that this is a materialized view
|
||||
self.materialized_view_names.insert(name.clone());
|
||||
self.materialized_view_sql.insert(name.clone(), sql);
|
||||
|
||||
// Store the incremental view (DBSP circuit)
|
||||
self.incremental_views
|
||||
.insert(name, Arc::new(Mutex::new(view)));
|
||||
}
|
||||
|
||||
pub fn get_materialized_view(&self, name: &str) -> Option<Arc<Mutex<IncrementalView>>> {
|
||||
let name = normalize_ident(name);
|
||||
self.materialized_views.get(&name).cloned()
|
||||
self.incremental_views.get(&name).cloned()
|
||||
}
|
||||
|
||||
pub fn is_materialized_view(&self, name: &str) -> bool {
|
||||
let name = normalize_ident(name);
|
||||
self.materialized_view_names.contains(&name)
|
||||
}
|
||||
|
||||
pub fn remove_view(&mut self, name: &str) -> Result<()> {
|
||||
let name = normalize_ident(name);
|
||||
|
||||
// Check if we have both a regular view and a materialized view with the same name
|
||||
// It should be impossible to have both
|
||||
let has_regular_view = self.views.contains_key(&name);
|
||||
let has_materialized_view = self.materialized_views.contains_key(&name);
|
||||
|
||||
assert!(
|
||||
!(has_regular_view && has_materialized_view),
|
||||
"Found both regular view and materialized view with name: {name}"
|
||||
);
|
||||
|
||||
if has_regular_view {
|
||||
if self.views.contains_key(&name) {
|
||||
self.views.remove(&name);
|
||||
Ok(())
|
||||
} else if has_materialized_view {
|
||||
} else if self.materialized_view_names.contains(&name) {
|
||||
// Remove from tables
|
||||
self.tables.remove(&name);
|
||||
|
||||
// Remove from materialized view tracking
|
||||
self.materialized_view_names.remove(&name);
|
||||
self.materialized_view_sql.remove(&name);
|
||||
self.incremental_views.remove(&name);
|
||||
|
||||
// Remove from table_to_materialized_views dependencies
|
||||
for views in self.table_to_materialized_views.values_mut() {
|
||||
views.retain(|v| v != &name);
|
||||
}
|
||||
|
||||
// Remove the materialized view itself
|
||||
self.materialized_views.remove(&name);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(crate::LimboError::ParseError(format!(
|
||||
@@ -165,30 +183,6 @@ impl Schema {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Get all materialized views that depend on a given table, skip normalizing ident.
|
||||
/// We are basically assuming we already normalized the ident.
|
||||
pub fn get_dependent_materialized_views_unnormalized(
|
||||
&self,
|
||||
table_name: &str,
|
||||
) -> Option<&Vec<String>> {
|
||||
self.table_to_materialized_views.get(table_name)
|
||||
}
|
||||
|
||||
/// Populate all materialized views by scanning their source tables
|
||||
/// Returns IOResult to support async execution
|
||||
pub fn populate_materialized_views(
|
||||
&self,
|
||||
conn: &Arc<crate::Connection>,
|
||||
) -> Result<IOResult<()>> {
|
||||
for view in self.materialized_views.values() {
|
||||
let mut view = view
|
||||
.lock()
|
||||
.map_err(|_| LimboError::InternalError("Failed to lock view".to_string()))?;
|
||||
return_if_io!(view.populate_from_table(conn));
|
||||
}
|
||||
Ok(IOResult::Done(()))
|
||||
}
|
||||
|
||||
/// Add a regular (non-materialized) view
|
||||
pub fn add_view(&mut self, view: View) {
|
||||
let name = normalize_ident(&view.name);
|
||||
@@ -224,6 +218,12 @@ impl Schema {
|
||||
pub fn remove_table(&mut self, table_name: &str) {
|
||||
let name = normalize_ident(table_name);
|
||||
self.tables.remove(&name);
|
||||
|
||||
// If this was a materialized view, also clean up the metadata
|
||||
if self.materialized_view_names.remove(&name) {
|
||||
self.incremental_views.remove(&name);
|
||||
self.materialized_view_sql.remove(&name);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_btree_table(&self, name: &str) -> Option<Arc<BTreeTable>> {
|
||||
@@ -297,8 +297,10 @@ impl Schema {
|
||||
let mut automatic_indices: HashMap<String, Vec<(String, usize)>> =
|
||||
HashMap::with_capacity(10);
|
||||
|
||||
// Collect materialized views for second pass to populate table_to_materialized_views mapping
|
||||
let mut materialized_views_to_process: Vec<(String, Vec<String>)> = Vec::new();
|
||||
// Store DBSP state table root pages: view_name -> dbsp_state_root_page
|
||||
let mut dbsp_state_roots: HashMap<String, usize> = HashMap::new();
|
||||
// Store materialized view info (SQL and root page) for later creation
|
||||
let mut materialized_view_info: HashMap<String, (String, usize)> = HashMap::new();
|
||||
|
||||
if matches!(pager.begin_read_tx()?, LimboResult::Busy) {
|
||||
return Err(LimboError::Busy);
|
||||
@@ -357,6 +359,18 @@ impl Schema {
|
||||
}
|
||||
|
||||
let table = BTreeTable::from_sql(sql, root_page as usize)?;
|
||||
|
||||
// Check if this is a DBSP state table
|
||||
if table.name.starts_with(DBSP_TABLE_PREFIX) {
|
||||
// Extract the view name from _dbsp_state_<viewname>
|
||||
let view_name = table
|
||||
.name
|
||||
.strip_prefix(DBSP_TABLE_PREFIX)
|
||||
.unwrap()
|
||||
.to_string();
|
||||
dbsp_state_roots.insert(view_name, root_page as usize);
|
||||
}
|
||||
|
||||
self.add_btree_table(Arc::new(table));
|
||||
}
|
||||
"index" => {
|
||||
@@ -418,6 +432,14 @@ impl Schema {
|
||||
};
|
||||
let name = name_text.as_str();
|
||||
|
||||
// Get the root page (column 3) to determine if this is a materialized view
|
||||
// Regular views have rootpage = 0, materialized views have rootpage != 0
|
||||
let root_page_value = record_cursor.get_value(&row, 3)?;
|
||||
let RefValue::Integer(root_page_int) = root_page_value else {
|
||||
return Err(LimboError::ConversionError("Expected integer value".into()));
|
||||
};
|
||||
let root_page = root_page_int as usize;
|
||||
|
||||
let sql_value = record_cursor.get_value(&row, 4)?;
|
||||
let RefValue::Text(sql_text) = sql_value else {
|
||||
return Err(LimboError::ConversionError("Expected text value".into()));
|
||||
@@ -429,15 +451,12 @@ impl Schema {
|
||||
if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
|
||||
match stmt {
|
||||
Stmt::CreateMaterializedView { .. } => {
|
||||
// Create IncrementalView for materialized views
|
||||
if let Ok(incremental_view) = IncrementalView::from_sql(sql, self) {
|
||||
let referenced_tables =
|
||||
incremental_view.get_referenced_table_names();
|
||||
let view_name = name.to_string();
|
||||
self.add_materialized_view(incremental_view);
|
||||
materialized_views_to_process
|
||||
.push((view_name, referenced_tables));
|
||||
}
|
||||
// Store materialized view info for later creation
|
||||
// We'll create the actual IncrementalView in a later pass
|
||||
// when we have both the main root page and DBSP state root
|
||||
let view_name = name.to_string();
|
||||
materialized_view_info
|
||||
.insert(view_name, (sql.to_string(), root_page));
|
||||
}
|
||||
Stmt::CreateView {
|
||||
view_name: _,
|
||||
@@ -481,14 +500,6 @@ impl Schema {
|
||||
|
||||
pager.end_read_tx()?;
|
||||
|
||||
// Second pass: populate table_to_materialized_views mapping
|
||||
for (view_name, referenced_tables) in materialized_views_to_process {
|
||||
// Register this view as dependent on each referenced table
|
||||
for table_name in referenced_tables {
|
||||
self.add_materialized_view_dependency(&table_name, &view_name);
|
||||
}
|
||||
}
|
||||
|
||||
for unparsed_sql_from_index in from_sql_indexes {
|
||||
if !self.indexes_enabled() {
|
||||
self.table_set_has_index(&unparsed_sql_from_index.table_name);
|
||||
@@ -520,6 +531,39 @@ impl Schema {
|
||||
}
|
||||
}
|
||||
|
||||
// Third pass: Create materialized views now that we have both root pages
|
||||
for (view_name, (sql, main_root)) in materialized_view_info {
|
||||
// Look up the DBSP state root for this view - must exist for materialized views
|
||||
let dbsp_state_root = dbsp_state_roots.get(&view_name).ok_or_else(|| {
|
||||
LimboError::InternalError(format!(
|
||||
"Materialized view {view_name} is missing its DBSP state table"
|
||||
))
|
||||
})?;
|
||||
|
||||
// Create the IncrementalView with both root pages
|
||||
let incremental_view =
|
||||
IncrementalView::from_sql(&sql, self, main_root, *dbsp_state_root)?;
|
||||
let referenced_tables = incremental_view.get_referenced_table_names();
|
||||
|
||||
// Create a BTreeTable for the materialized view
|
||||
let table = Arc::new(Table::BTree(Arc::new(BTreeTable {
|
||||
name: view_name.clone(),
|
||||
root_page: main_root,
|
||||
columns: incremental_view.columns.clone(),
|
||||
primary_key_columns: Vec::new(),
|
||||
has_rowid: true,
|
||||
is_strict: false,
|
||||
unique_sets: None,
|
||||
})));
|
||||
|
||||
self.add_materialized_view(incremental_view, table, sql);
|
||||
|
||||
// Register dependencies
|
||||
for table_name in referenced_tables {
|
||||
self.add_materialized_view_dependency(&table_name, &view_name);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -565,15 +609,19 @@ impl Clone for Schema {
|
||||
(name.clone(), indexes)
|
||||
})
|
||||
.collect();
|
||||
let materialized_views = self
|
||||
.materialized_views
|
||||
let materialized_view_names = self.materialized_view_names.clone();
|
||||
let materialized_view_sql = self.materialized_view_sql.clone();
|
||||
let incremental_views = self
|
||||
.incremental_views
|
||||
.iter()
|
||||
.map(|(name, view)| (name.clone(), view.clone()))
|
||||
.collect();
|
||||
let views = self.views.clone();
|
||||
Self {
|
||||
tables,
|
||||
materialized_views,
|
||||
materialized_view_names,
|
||||
materialized_view_sql,
|
||||
incremental_views,
|
||||
views,
|
||||
indexes,
|
||||
has_indexes: self.has_indexes.clone(),
|
||||
|
||||
@@ -82,6 +82,12 @@ pub fn prepare_delete_plan(
|
||||
Some(table) => table,
|
||||
None => crate::bail_parse_error!("no such table: {}", tbl_name),
|
||||
};
|
||||
|
||||
// Check if this is a materialized view
|
||||
if schema.is_materialized_view(&tbl_name) {
|
||||
crate::bail_parse_error!("cannot modify materialized view {}", tbl_name);
|
||||
}
|
||||
|
||||
let table = if let Some(table) = table.virtual_table() {
|
||||
Table::Virtual(table.clone())
|
||||
} else if let Some(table) = table.btree() {
|
||||
|
||||
@@ -63,6 +63,7 @@ pub fn translate_insert(
|
||||
if with.is_some() {
|
||||
crate::bail_parse_error!("WITH clause is not supported");
|
||||
}
|
||||
|
||||
if on_conflict.is_some() {
|
||||
crate::bail_parse_error!("ON CONFLICT clause is not supported");
|
||||
}
|
||||
@@ -86,6 +87,11 @@ pub fn translate_insert(
|
||||
None => crate::bail_parse_error!("no such table: {}", table_name),
|
||||
};
|
||||
|
||||
// Check if this is a materialized view
|
||||
if schema.is_materialized_view(table_name.as_str()) {
|
||||
crate::bail_parse_error!("cannot modify materialized view {}", table_name);
|
||||
}
|
||||
|
||||
let resolver = Resolver::new(schema, syms);
|
||||
|
||||
if let Some(virtual_table) = &table.virtual_table() {
|
||||
|
||||
@@ -196,7 +196,8 @@ pub fn init_loop(
|
||||
t_ctx.meta_left_joins[table_index] = Some(lj_metadata);
|
||||
}
|
||||
}
|
||||
let (table_cursor_id, index_cursor_id) = table.open_cursors(program, mode)?;
|
||||
let (table_cursor_id, index_cursor_id) =
|
||||
table.open_cursors(program, mode, t_ctx.resolver.schema)?;
|
||||
match &table.op {
|
||||
Operation::Scan(Scan::BTreeTable { index, .. }) => match (mode, &table.table) {
|
||||
(OperationMode::SELECT, Table::BTree(btree)) => {
|
||||
|
||||
@@ -3,7 +3,7 @@ use turso_parser::ast::{self, SortOrder};
|
||||
|
||||
use crate::{
|
||||
function::AggFunc,
|
||||
schema::{BTreeTable, Column, FromClauseSubquery, Index, Table},
|
||||
schema::{BTreeTable, Column, FromClauseSubquery, Index, Schema, Table},
|
||||
vdbe::{
|
||||
builder::{CursorKey, CursorType, ProgramBuilder},
|
||||
insn::{IdxInsertFlags, Insn},
|
||||
@@ -852,6 +852,7 @@ impl JoinedTable {
|
||||
&self,
|
||||
program: &mut ProgramBuilder,
|
||||
mode: OperationMode,
|
||||
schema: &Schema,
|
||||
) -> Result<(Option<CursorID>, Option<CursorID>)> {
|
||||
let index = self.op.index();
|
||||
match &self.table {
|
||||
@@ -863,10 +864,17 @@ impl JoinedTable {
|
||||
let table_cursor_id = if table_not_required {
|
||||
None
|
||||
} else {
|
||||
Some(program.alloc_cursor_id_keyed(
|
||||
CursorKey::table(self.internal_id),
|
||||
CursorType::BTreeTable(btree.clone()),
|
||||
))
|
||||
// Check if this is a materialized view
|
||||
let cursor_type =
|
||||
if let Some(view_mutex) = schema.get_materialized_view(&btree.name) {
|
||||
CursorType::MaterializedView(btree.clone(), view_mutex)
|
||||
} else {
|
||||
CursorType::BTreeTable(btree.clone())
|
||||
};
|
||||
Some(
|
||||
program
|
||||
.alloc_cursor_id_keyed(CursorKey::table(self.internal_id), cursor_type),
|
||||
)
|
||||
};
|
||||
let index_cursor_id = index.map(|index| {
|
||||
program.alloc_cursor_id_keyed(
|
||||
|
||||
@@ -3,9 +3,9 @@ use std::sync::Arc;
|
||||
use super::{
|
||||
expr::walk_expr,
|
||||
plan::{
|
||||
Aggregate, ColumnUsedMask, Distinctness, EvalAt, JoinInfo, JoinOrderMember, JoinedTable,
|
||||
Operation, OuterQueryReference, Plan, QueryDestination, ResultSetColumn, Scan,
|
||||
TableReferences, WhereTerm,
|
||||
Aggregate, ColumnUsedMask, Distinctness, EvalAt, IterationDirection, JoinInfo,
|
||||
JoinOrderMember, JoinedTable, Operation, OuterQueryReference, Plan, QueryDestination,
|
||||
ResultSetColumn, Scan, TableReferences, WhereTerm,
|
||||
},
|
||||
select::prepare_select_plan,
|
||||
SymbolTable,
|
||||
@@ -529,12 +529,29 @@ fn parse_table(
|
||||
schema.get_materialized_view(table_name.as_str())
|
||||
});
|
||||
if let Some(view) = view {
|
||||
// Create a virtual table wrapper for the view
|
||||
// We'll use the view's columns from the schema
|
||||
let vtab = crate::vtab_view::create_view_virtual_table(
|
||||
normalize_ident(table_name.as_str()).as_str(),
|
||||
view.clone(),
|
||||
)?;
|
||||
// Check if this materialized view has persistent storage
|
||||
let view_guard = view.lock().unwrap();
|
||||
let root_page = view_guard.get_root_page();
|
||||
|
||||
if root_page == 0 {
|
||||
drop(view_guard);
|
||||
return Err(crate::LimboError::InternalError(
|
||||
"Materialized view has no storage allocated".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
// This is a materialized view with storage - treat it as a regular BTree table
|
||||
// Create a BTreeTable from the view's metadata
|
||||
let btree_table = Arc::new(crate::schema::BTreeTable {
|
||||
name: view_guard.name().to_string(),
|
||||
root_page,
|
||||
columns: view_guard.columns.clone(),
|
||||
primary_key_columns: Vec::new(),
|
||||
has_rowid: true,
|
||||
is_strict: false,
|
||||
unique_sets: None,
|
||||
});
|
||||
drop(view_guard);
|
||||
|
||||
let alias = maybe_alias
|
||||
.map(|a| match a {
|
||||
@@ -544,12 +561,11 @@ fn parse_table(
|
||||
.map(|a| normalize_ident(a.as_str()));
|
||||
|
||||
table_references.add_joined_table(JoinedTable {
|
||||
op: Operation::Scan(Scan::VirtualTable {
|
||||
idx_num: -1,
|
||||
idx_str: None,
|
||||
constraints: Vec::new(),
|
||||
op: Operation::Scan(Scan::BTreeTable {
|
||||
iter_dir: IterationDirection::Forwards,
|
||||
index: None,
|
||||
}),
|
||||
table: Table::Virtual(vtab),
|
||||
table: Table::BTree(btree_table),
|
||||
identifier: alias.unwrap_or(normalized_qualified_name),
|
||||
internal_id: table_ref_counter.next(),
|
||||
join_info: None,
|
||||
|
||||
@@ -690,6 +690,14 @@ pub fn translate_drop_table(
|
||||
}
|
||||
|
||||
let table = table.unwrap(); // safe since we just checked for None
|
||||
|
||||
// Check if this is a materialized view - if so, refuse to drop it with DROP TABLE
|
||||
if schema.is_materialized_view(tbl_name.name.as_str()) {
|
||||
bail_parse_error!(
|
||||
"Cannot DROP TABLE on materialized view {}. Use DROP VIEW instead.",
|
||||
tbl_name.name.as_str()
|
||||
);
|
||||
}
|
||||
let cdc_table = prepare_cdc_if_necessary(&mut program, schema, SQLITE_TABLEID)?;
|
||||
|
||||
let null_reg = program.alloc_register(); // r1
|
||||
|
||||
@@ -140,6 +140,12 @@ pub fn prepare_update_plan(
|
||||
Some(table) => table,
|
||||
None => bail_parse_error!("Parse error: no such table: {}", table_name),
|
||||
};
|
||||
|
||||
// Check if this is a materialized view
|
||||
if schema.is_materialized_view(table_name.as_str()) {
|
||||
bail_parse_error!("cannot modify materialized view {}", table_name);
|
||||
}
|
||||
|
||||
let table_name = table.get_name();
|
||||
let iter_dir = body
|
||||
.order_by
|
||||
|
||||
@@ -1,69 +1,14 @@
|
||||
use crate::schema::Schema;
|
||||
use crate::schema::{Schema, DBSP_TABLE_PREFIX};
|
||||
use crate::storage::pager::CreateBTreeFlags;
|
||||
use crate::translate::emitter::Resolver;
|
||||
use crate::translate::schema::{emit_schema_entry, SchemaEntryType, SQLITE_TABLEID};
|
||||
use crate::util::normalize_ident;
|
||||
use crate::vdbe::builder::{CursorType, ProgramBuilder};
|
||||
use crate::vdbe::insn::{CmpInsFlags, Cookie, Insn};
|
||||
use crate::vdbe::insn::{CmpInsFlags, Cookie, Insn, RegisterOrLiteral};
|
||||
use crate::{Connection, Result, SymbolTable};
|
||||
use std::sync::Arc;
|
||||
use turso_parser::ast;
|
||||
|
||||
/// Common logic for creating views (both regular and materialized)
|
||||
fn emit_create_view_program(
|
||||
schema: &Schema,
|
||||
view_name: &str,
|
||||
sql: String,
|
||||
syms: &SymbolTable,
|
||||
program: &mut ProgramBuilder,
|
||||
populate_materialized: bool,
|
||||
) -> Result<()> {
|
||||
let normalized_view_name = normalize_ident(view_name);
|
||||
|
||||
// Open cursor to sqlite_schema table
|
||||
let table = schema.get_btree_table(SQLITE_TABLEID).unwrap();
|
||||
let sqlite_schema_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: sqlite_schema_cursor_id,
|
||||
root_page: 1usize.into(),
|
||||
db: 0,
|
||||
});
|
||||
|
||||
// Add the view entry to sqlite_schema
|
||||
let resolver = Resolver::new(schema, syms);
|
||||
emit_schema_entry(
|
||||
program,
|
||||
&resolver,
|
||||
sqlite_schema_cursor_id,
|
||||
None, // cdc_table_cursor_id, no cdc for views
|
||||
SchemaEntryType::View,
|
||||
&normalized_view_name,
|
||||
&normalized_view_name, // for views, tbl_name is same as name
|
||||
0, // views don't have a root page
|
||||
Some(sql),
|
||||
)?;
|
||||
|
||||
// Parse schema to load the new view
|
||||
program.emit_insn(Insn::ParseSchema {
|
||||
db: sqlite_schema_cursor_id,
|
||||
where_clause: Some(format!("name = '{normalized_view_name}'")),
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::SetCookie {
|
||||
db: 0,
|
||||
cookie: Cookie::SchemaVersion,
|
||||
value: (schema.schema_version + 1) as i32,
|
||||
p5: 0,
|
||||
});
|
||||
|
||||
// Populate materialized views if needed
|
||||
// Note: This must come after SetCookie since it may do I/O operations
|
||||
if populate_materialized {
|
||||
program.emit_insn(Insn::PopulateMaterializedViews);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn translate_create_materialized_view(
|
||||
schema: &Schema,
|
||||
view_name: &str,
|
||||
@@ -92,17 +37,144 @@ pub fn translate_create_materialized_view(
|
||||
)));
|
||||
}
|
||||
|
||||
// Validate that this view can be created as an IncrementalView
|
||||
// Validate the view can be created and extract its columns
|
||||
// This validation happens before updating sqlite_master to prevent
|
||||
// storing invalid view definitions
|
||||
use crate::incremental::view::IncrementalView;
|
||||
IncrementalView::can_create_view(select_stmt)?;
|
||||
use crate::schema::BTreeTable;
|
||||
let view_columns = IncrementalView::validate_and_extract_columns(select_stmt, schema)?;
|
||||
|
||||
// Reconstruct the SQL string
|
||||
// Reconstruct the SQL string for storage
|
||||
let sql = create_materialized_view_to_str(view_name, select_stmt);
|
||||
|
||||
// Use common logic to emit the view creation program
|
||||
emit_create_view_program(schema, view_name, sql, syms, &mut program, true)?;
|
||||
// Create a btree for storing the materialized view state
|
||||
// This btree will hold the materialized rows (row_id -> values)
|
||||
let view_root_reg = program.alloc_register();
|
||||
|
||||
program.emit_insn(Insn::CreateBtree {
|
||||
db: 0,
|
||||
root: view_root_reg,
|
||||
flags: CreateBTreeFlags::new_table(),
|
||||
});
|
||||
|
||||
// Create a second btree for DBSP operator state (e.g., aggregate state)
|
||||
// This is stored as a hidden table: __turso_internal_dbsp_state_<view_name>
|
||||
let dbsp_state_root_reg = program.alloc_register();
|
||||
|
||||
program.emit_insn(Insn::CreateBtree {
|
||||
db: 0,
|
||||
root: dbsp_state_root_reg,
|
||||
flags: CreateBTreeFlags::new_table(),
|
||||
});
|
||||
|
||||
// Create a proper BTreeTable for the cursor with the actual view columns
|
||||
let view_table = Arc::new(BTreeTable {
|
||||
root_page: 0, // Will be set to actual root page after creation
|
||||
name: normalized_view_name.clone(),
|
||||
columns: view_columns.clone(),
|
||||
primary_key_columns: vec![], // Materialized views use implicit rowid
|
||||
has_rowid: true,
|
||||
is_strict: false,
|
||||
unique_sets: None,
|
||||
});
|
||||
|
||||
// Allocate a cursor for writing to the view's btree during population
|
||||
let view_cursor_id = program.alloc_cursor_id(crate::vdbe::builder::CursorType::BTreeTable(
|
||||
view_table.clone(),
|
||||
));
|
||||
|
||||
// Open the cursor to the view's btree
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: view_cursor_id,
|
||||
root_page: RegisterOrLiteral::Register(view_root_reg),
|
||||
db: 0,
|
||||
});
|
||||
|
||||
// Clear any existing data in the btree
|
||||
// This is important because if we're reusing a page that previously held
|
||||
// a materialized view, there might be old data still there
|
||||
// We need to start with a clean slate
|
||||
let clear_loop_label = program.allocate_label();
|
||||
let clear_done_label = program.allocate_label();
|
||||
|
||||
// Rewind to the beginning of the btree
|
||||
program.emit_insn(Insn::Rewind {
|
||||
cursor_id: view_cursor_id,
|
||||
pc_if_empty: clear_done_label,
|
||||
});
|
||||
|
||||
// Loop to delete all rows
|
||||
program.preassign_label_to_next_insn(clear_loop_label);
|
||||
program.emit_insn(Insn::Delete {
|
||||
cursor_id: view_cursor_id,
|
||||
table_name: normalized_view_name.clone(),
|
||||
});
|
||||
program.emit_insn(Insn::Next {
|
||||
cursor_id: view_cursor_id,
|
||||
pc_if_next: clear_loop_label,
|
||||
});
|
||||
|
||||
program.preassign_label_to_next_insn(clear_done_label);
|
||||
|
||||
// Open cursor to sqlite_schema table
|
||||
let table = schema.get_btree_table(SQLITE_TABLEID).unwrap();
|
||||
let sqlite_schema_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: sqlite_schema_cursor_id,
|
||||
root_page: 1usize.into(),
|
||||
db: 0,
|
||||
});
|
||||
|
||||
// Add the materialized view entry to sqlite_schema
|
||||
let resolver = Resolver::new(schema, syms);
|
||||
emit_schema_entry(
|
||||
&mut program,
|
||||
&resolver,
|
||||
sqlite_schema_cursor_id,
|
||||
None, // cdc_table_cursor_id, no cdc for views
|
||||
SchemaEntryType::View,
|
||||
&normalized_view_name,
|
||||
&normalized_view_name,
|
||||
view_root_reg, // btree root for materialized view data
|
||||
Some(sql),
|
||||
)?;
|
||||
|
||||
// Add the DBSP state table to sqlite_master (required for materialized views)
|
||||
let dbsp_table_name = format!("{DBSP_TABLE_PREFIX}{normalized_view_name}");
|
||||
let dbsp_sql = format!("CREATE TABLE {dbsp_table_name} (key INTEGER PRIMARY KEY, state BLOB)");
|
||||
|
||||
emit_schema_entry(
|
||||
&mut program,
|
||||
&resolver,
|
||||
sqlite_schema_cursor_id,
|
||||
None, // cdc_table_cursor_id
|
||||
SchemaEntryType::Table,
|
||||
&dbsp_table_name,
|
||||
&dbsp_table_name,
|
||||
dbsp_state_root_reg, // Root for DBSP state table
|
||||
Some(dbsp_sql),
|
||||
)?;
|
||||
|
||||
// Parse schema to load the new view and DBSP state table
|
||||
program.emit_insn(Insn::ParseSchema {
|
||||
db: sqlite_schema_cursor_id,
|
||||
where_clause: Some(format!(
|
||||
"name = '{normalized_view_name}' OR name = '{dbsp_table_name}'"
|
||||
)),
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::SetCookie {
|
||||
db: 0,
|
||||
cookie: Cookie::SchemaVersion,
|
||||
value: (schema.schema_version + 1) as i32,
|
||||
p5: 0,
|
||||
});
|
||||
|
||||
// Populate the materialized view
|
||||
let cursor_info = vec![(normalized_view_name.clone(), view_cursor_id)];
|
||||
program.emit_insn(Insn::PopulateMaterializedViews {
|
||||
cursors: cursor_info,
|
||||
});
|
||||
|
||||
program.epilogue(schema);
|
||||
Ok(program)
|
||||
@@ -137,8 +209,41 @@ pub fn translate_create_view(
|
||||
// Reconstruct the SQL string
|
||||
let sql = create_view_to_str(view_name, select_stmt);
|
||||
|
||||
// Use common logic to emit the view creation program
|
||||
emit_create_view_program(schema, view_name, sql, syms, &mut program, false)?;
|
||||
// Open cursor to sqlite_schema table
|
||||
let table = schema.get_btree_table(SQLITE_TABLEID).unwrap();
|
||||
let sqlite_schema_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(table.clone()));
|
||||
program.emit_insn(Insn::OpenWrite {
|
||||
cursor_id: sqlite_schema_cursor_id,
|
||||
root_page: 1usize.into(),
|
||||
db: 0,
|
||||
});
|
||||
|
||||
// Add the view entry to sqlite_schema
|
||||
let resolver = Resolver::new(schema, syms);
|
||||
emit_schema_entry(
|
||||
&mut program,
|
||||
&resolver,
|
||||
sqlite_schema_cursor_id,
|
||||
None, // cdc_table_cursor_id, no cdc for views
|
||||
SchemaEntryType::View,
|
||||
&normalized_view_name,
|
||||
&normalized_view_name,
|
||||
0, // Regular views don't have a btree
|
||||
Some(sql),
|
||||
)?;
|
||||
|
||||
// Parse schema to load the new view
|
||||
program.emit_insn(Insn::ParseSchema {
|
||||
db: sqlite_schema_cursor_id,
|
||||
where_clause: Some(format!("name = '{normalized_view_name}'")),
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::SetCookie {
|
||||
db: 0,
|
||||
cookie: Cookie::SchemaVersion,
|
||||
value: (schema.schema_version + 1) as i32,
|
||||
p5: 0,
|
||||
});
|
||||
|
||||
Ok(program)
|
||||
}
|
||||
@@ -156,10 +261,9 @@ pub fn translate_drop_view(
|
||||
let normalized_view_name = normalize_ident(view_name);
|
||||
|
||||
// Check if view exists (either regular or materialized)
|
||||
let view_exists = schema.get_view(&normalized_view_name).is_some()
|
||||
|| schema
|
||||
.get_materialized_view(&normalized_view_name)
|
||||
.is_some();
|
||||
let is_regular_view = schema.get_view(&normalized_view_name).is_some();
|
||||
let is_materialized_view = schema.is_materialized_view(&normalized_view_name);
|
||||
let view_exists = is_regular_view || is_materialized_view;
|
||||
|
||||
if !view_exists && !if_exists {
|
||||
return Err(crate::LimboError::ParseError(format!(
|
||||
@@ -172,6 +276,20 @@ pub fn translate_drop_view(
|
||||
return Ok(program);
|
||||
}
|
||||
|
||||
// If this is a materialized view, we need to destroy its btree as well
|
||||
if is_materialized_view {
|
||||
if let Some(table) = schema.get_table(&normalized_view_name) {
|
||||
if let Some(btree_table) = table.btree() {
|
||||
// Destroy the btree for the materialized view
|
||||
program.emit_insn(Insn::Destroy {
|
||||
root: btree_table.root_page,
|
||||
former_root_reg: 0, // No autovacuum
|
||||
is_temp: 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Open cursor to sqlite_schema table
|
||||
let schema_table = schema.get_btree_table(SQLITE_TABLEID).unwrap();
|
||||
let sqlite_schema_cursor_id =
|
||||
@@ -217,6 +335,8 @@ pub fn translate_drop_view(
|
||||
|
||||
// Check if type == 'view' and name == view_name
|
||||
let skip_delete_label = program.allocate_label();
|
||||
|
||||
// Both regular and materialized views are stored as type='view' in sqlite_schema
|
||||
program.emit_insn(Insn::Ne {
|
||||
lhs: col0_reg,
|
||||
rhs: type_reg,
|
||||
@@ -224,6 +344,7 @@ pub fn translate_drop_view(
|
||||
flags: CmpInsFlags::default(),
|
||||
collation: program.curr_collation(),
|
||||
});
|
||||
|
||||
program.emit_insn(Insn::Ne {
|
||||
lhs: col1_reg,
|
||||
rhs: view_name_reg,
|
||||
|
||||
@@ -2430,6 +2430,7 @@ pub enum Cursor {
|
||||
Pseudo(PseudoCursor),
|
||||
Sorter(Sorter),
|
||||
Virtual(VirtualTableCursor),
|
||||
MaterializedView(Box<crate::incremental::cursor::MaterializedViewCursor>),
|
||||
}
|
||||
|
||||
impl Cursor {
|
||||
@@ -2445,6 +2446,12 @@ impl Cursor {
|
||||
Self::Sorter(cursor)
|
||||
}
|
||||
|
||||
pub fn new_materialized_view(
|
||||
cursor: crate::incremental::cursor::MaterializedViewCursor,
|
||||
) -> Self {
|
||||
Self::MaterializedView(Box::new(cursor))
|
||||
}
|
||||
|
||||
pub fn as_btree_mut(&mut self) -> &mut BTreeCursor {
|
||||
match self {
|
||||
Self::BTree(cursor) => cursor,
|
||||
@@ -2472,6 +2479,15 @@ impl Cursor {
|
||||
_ => panic!("Cursor is not a virtual cursor"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_materialized_view_mut(
|
||||
&mut self,
|
||||
) -> &mut crate::incremental::cursor::MaterializedViewCursor {
|
||||
match self {
|
||||
Self::MaterializedView(cursor) => cursor,
|
||||
_ => panic!("Cursor is not a materialized view cursor"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -2549,6 +2565,23 @@ macro_rules! return_if_io {
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! return_and_restore_if_io {
|
||||
($field:expr, $saved_state:expr, $e:expr) => {
|
||||
match $e {
|
||||
Ok(IOResult::Done(v)) => v,
|
||||
Ok(IOResult::IO(io)) => {
|
||||
let _ = std::mem::replace($field, $saved_state);
|
||||
return Ok(IOResult::IO(io));
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = std::mem::replace($field, $saved_state);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum SeekResult {
|
||||
/// Record matching the [SeekOp] found in the B-tree and cursor was positioned to point onto that record
|
||||
|
||||
137
core/util.rs
137
core/util.rs
@@ -1,14 +1,16 @@
|
||||
#![allow(unused)]
|
||||
use crate::incremental::view::IncrementalView;
|
||||
use crate::translate::expr::WalkControl;
|
||||
use crate::types::IOResult;
|
||||
use crate::{
|
||||
schema::{self, Column, MaterializedViewsMap, Schema, Type},
|
||||
schema::{self, BTreeTable, Column, Schema, Table, Type, DBSP_TABLE_PREFIX},
|
||||
translate::{collate::CollationSeq, expr::walk_expr, plan::JoinOrderMember},
|
||||
types::{Value, ValueType},
|
||||
LimboError, OpenFlags, Result, Statement, StepResult, SymbolTable,
|
||||
};
|
||||
use crate::{Connection, IO};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
rc::Rc,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
@@ -148,7 +150,7 @@ pub fn parse_schema_rows(
|
||||
schema: &mut Schema,
|
||||
syms: &SymbolTable,
|
||||
mv_tx_id: Option<u64>,
|
||||
mut existing_views: MaterializedViewsMap,
|
||||
mut existing_views: HashMap<String, Arc<Mutex<IncrementalView>>>,
|
||||
) -> Result<()> {
|
||||
rows.set_mv_tx_id(mv_tx_id);
|
||||
// TODO: if we IO, this unparsed indexes is lost. Will probably need some state between
|
||||
@@ -156,8 +158,12 @@ pub fn parse_schema_rows(
|
||||
let mut from_sql_indexes = Vec::with_capacity(10);
|
||||
let mut automatic_indices = std::collections::HashMap::with_capacity(10);
|
||||
|
||||
// Collect views for second pass to populate table_to_views mapping
|
||||
let mut views_to_process: Vec<(String, Vec<String>)> = Vec::new();
|
||||
// Store DBSP state table root pages: view_name -> dbsp_state_root_page
|
||||
let mut dbsp_state_roots: std::collections::HashMap<String, usize> =
|
||||
std::collections::HashMap::new();
|
||||
// Store materialized view info (SQL and root page) for later creation
|
||||
let mut materialized_view_info: std::collections::HashMap<String, (String, usize)> =
|
||||
std::collections::HashMap::new();
|
||||
loop {
|
||||
match rows.step()? {
|
||||
StepResult::Row => {
|
||||
@@ -189,6 +195,18 @@ pub fn parse_schema_rows(
|
||||
schema.add_virtual_table(vtab);
|
||||
} else {
|
||||
let table = schema::BTreeTable::from_sql(sql, root_page as usize)?;
|
||||
|
||||
// Check if this is a DBSP state table
|
||||
if table.name.starts_with(DBSP_TABLE_PREFIX) {
|
||||
// Extract the view name from __turso_internal_dbsp_state_<viewname>
|
||||
let view_name = table
|
||||
.name
|
||||
.strip_prefix(DBSP_TABLE_PREFIX)
|
||||
.unwrap()
|
||||
.to_string();
|
||||
dbsp_state_roots.insert(view_name, root_page as usize);
|
||||
}
|
||||
|
||||
schema.add_btree_table(Arc::new(table));
|
||||
}
|
||||
}
|
||||
@@ -228,6 +246,7 @@ pub fn parse_schema_rows(
|
||||
use turso_parser::parser::Parser;
|
||||
|
||||
let name: &str = row.get::<&str>(1)?;
|
||||
let root_page = row.get::<i64>(3)?;
|
||||
let sql: &str = row.get::<&str>(4)?;
|
||||
let view_name = name.to_string();
|
||||
|
||||
@@ -236,52 +255,17 @@ pub fn parse_schema_rows(
|
||||
if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
|
||||
match stmt {
|
||||
Stmt::CreateMaterializedView { .. } => {
|
||||
// Handle materialized view with potential reuse
|
||||
let should_create_new = if let Some(existing_view) =
|
||||
existing_views.remove(&view_name)
|
||||
{
|
||||
// Check if we can reuse this view (same SQL definition)
|
||||
let can_reuse = if let Ok(view_guard) = existing_view.lock()
|
||||
{
|
||||
view_guard.has_same_sql(sql)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
// Store materialized view info for later creation
|
||||
// We'll handle reuse logic and create the actual IncrementalView
|
||||
// in a later pass when we have both the main root page and DBSP state root
|
||||
materialized_view_info.insert(
|
||||
view_name.clone(),
|
||||
(sql.to_string(), root_page as usize),
|
||||
);
|
||||
|
||||
if can_reuse {
|
||||
// Reuse the existing view - it's already populated!
|
||||
let referenced_tables =
|
||||
if let Ok(view_guard) = existing_view.lock() {
|
||||
view_guard.get_referenced_table_names()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
||||
// Add the existing view to the new schema
|
||||
schema
|
||||
.materialized_views
|
||||
.insert(view_name.clone(), existing_view);
|
||||
|
||||
// Store for second pass processing
|
||||
views_to_process
|
||||
.push((view_name.clone(), referenced_tables));
|
||||
false // Don't create new
|
||||
} else {
|
||||
true // SQL changed, need to create new
|
||||
}
|
||||
} else {
|
||||
true // No existing view, need to create new
|
||||
};
|
||||
|
||||
if should_create_new {
|
||||
// Create a new IncrementalView
|
||||
// If this fails, we should propagate the error so the transaction rolls back
|
||||
let incremental_view =
|
||||
IncrementalView::from_sql(sql, schema)?;
|
||||
let referenced_tables =
|
||||
incremental_view.get_referenced_table_names();
|
||||
schema.add_materialized_view(incremental_view);
|
||||
views_to_process.push((view_name, referenced_tables));
|
||||
// Mark the existing view for potential reuse
|
||||
if existing_views.contains_key(&view_name) {
|
||||
// We'll check for reuse in the third pass
|
||||
}
|
||||
}
|
||||
Stmt::CreateView {
|
||||
@@ -359,11 +343,56 @@ pub fn parse_schema_rows(
|
||||
}
|
||||
}
|
||||
|
||||
// Second pass: populate table_to_views mapping
|
||||
for (view_name, referenced_tables) in views_to_process {
|
||||
// Register this view as dependent on each referenced table
|
||||
for table_name in referenced_tables {
|
||||
schema.add_materialized_view_dependency(&table_name, &view_name);
|
||||
// Third pass: Create materialized views now that we have both root pages
|
||||
for (view_name, (sql, main_root)) in materialized_view_info {
|
||||
// Look up the DBSP state root for this view - must exist for materialized views
|
||||
let dbsp_state_root = dbsp_state_roots.get(&view_name).ok_or_else(|| {
|
||||
LimboError::InternalError(format!(
|
||||
"Materialized view {view_name} is missing its DBSP state table"
|
||||
))
|
||||
})?;
|
||||
|
||||
// Check if we can reuse the existing view
|
||||
let mut reuse_view = false;
|
||||
if let Some(existing_view_mutex) = schema.get_materialized_view(&view_name) {
|
||||
let existing_view = existing_view_mutex.lock().unwrap();
|
||||
if let Some(existing_sql) = schema.materialized_view_sql.get(&view_name) {
|
||||
if existing_sql == &sql {
|
||||
reuse_view = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if reuse_view {
|
||||
// View already exists with same SQL, just update dependencies
|
||||
let existing_view_mutex = schema.get_materialized_view(&view_name).unwrap();
|
||||
let existing_view = existing_view_mutex.lock().unwrap();
|
||||
let referenced_tables = existing_view.get_referenced_table_names();
|
||||
drop(existing_view); // Release lock before modifying schema
|
||||
for table_name in referenced_tables {
|
||||
schema.add_materialized_view_dependency(&table_name, &view_name);
|
||||
}
|
||||
} else {
|
||||
// Create new IncrementalView with both root pages
|
||||
let incremental_view =
|
||||
IncrementalView::from_sql(&sql, schema, main_root, *dbsp_state_root)?;
|
||||
let referenced_tables = incremental_view.get_referenced_table_names();
|
||||
|
||||
// Create a Table for the materialized view
|
||||
let table = Arc::new(schema::Table::BTree(Arc::new(schema::BTreeTable {
|
||||
root_page: main_root,
|
||||
name: view_name.clone(),
|
||||
columns: incremental_view.columns.clone(), // Use the view's columns, not the base table's
|
||||
primary_key_columns: vec![],
|
||||
has_rowid: true,
|
||||
is_strict: false,
|
||||
unique_sets: None,
|
||||
})));
|
||||
|
||||
schema.add_materialized_view(incremental_view, table, sql.clone());
|
||||
for table_name in referenced_tables {
|
||||
schema.add_materialized_view_dependency(&table_name, &view_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -123,6 +123,10 @@ pub enum CursorType {
|
||||
Pseudo(PseudoCursorType),
|
||||
Sorter,
|
||||
VirtualTable(Arc<VirtualTable>),
|
||||
MaterializedView(
|
||||
Arc<BTreeTable>,
|
||||
Arc<std::sync::Mutex<crate::incremental::view::IncrementalView>>,
|
||||
),
|
||||
}
|
||||
|
||||
impl CursorType {
|
||||
@@ -865,6 +869,7 @@ impl ProgramBuilder {
|
||||
let default = match cursor_type {
|
||||
CursorType::BTreeTable(btree) => &btree.columns[column].default,
|
||||
CursorType::BTreeIndex(index) => &index.columns[column].default,
|
||||
CursorType::MaterializedView(btree, _) => &btree.columns[column].default,
|
||||
_ => break 'value None,
|
||||
};
|
||||
|
||||
|
||||
@@ -953,11 +953,43 @@ pub fn op_open_read(
|
||||
let num_columns = match cursor_type {
|
||||
CursorType::BTreeTable(table_rc) => table_rc.columns.len(),
|
||||
CursorType::BTreeIndex(index_arc) => index_arc.columns.len(),
|
||||
CursorType::MaterializedView(table_rc, _) => table_rc.columns.len(),
|
||||
_ => unreachable!("This should not have happened"),
|
||||
};
|
||||
|
||||
match cursor_type {
|
||||
CursorType::MaterializedView(_, view_mutex) => {
|
||||
// This is a materialized view with storage
|
||||
// Create btree cursor for reading the persistent data
|
||||
let btree_cursor = Box::new(BTreeCursor::new_table(
|
||||
mv_cursor,
|
||||
pager.clone(),
|
||||
*root_page,
|
||||
num_columns,
|
||||
));
|
||||
|
||||
// Get the view name and look up or create its transaction state
|
||||
let view_name = view_mutex.lock().unwrap().name().to_string();
|
||||
let tx_state = program
|
||||
.connection
|
||||
.view_transaction_states
|
||||
.get_or_create(&view_name);
|
||||
|
||||
// Create materialized view cursor with this view's transaction state
|
||||
let mv_cursor = crate::incremental::cursor::MaterializedViewCursor::new(
|
||||
btree_cursor,
|
||||
view_mutex.clone(),
|
||||
pager.clone(),
|
||||
tx_state,
|
||||
)?;
|
||||
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
.unwrap()
|
||||
.replace(Cursor::new_materialized_view(mv_cursor));
|
||||
}
|
||||
CursorType::BTreeTable(_) => {
|
||||
// Regular table
|
||||
let cursor = BTreeCursor::new_table(mv_cursor, pager.clone(), *root_page, num_columns);
|
||||
cursors
|
||||
.get_mut(*cursor_id)
|
||||
@@ -1282,10 +1314,18 @@ pub fn op_rewind(
|
||||
);
|
||||
assert!(pc_if_empty.is_offset());
|
||||
let is_empty = {
|
||||
let mut cursor = must_be_btree_cursor!(*cursor_id, program.cursor_ref, state, "Rewind");
|
||||
let cursor = cursor.as_btree_mut();
|
||||
return_if_io!(cursor.rewind());
|
||||
cursor.is_empty()
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
match &mut *cursor {
|
||||
Cursor::BTree(btree_cursor) => {
|
||||
return_if_io!(btree_cursor.rewind());
|
||||
btree_cursor.is_empty()
|
||||
}
|
||||
Cursor::MaterializedView(mv_cursor) => {
|
||||
return_if_io!(mv_cursor.rewind());
|
||||
!mv_cursor.is_valid()?
|
||||
}
|
||||
_ => panic!("Rewind on non-btree/materialized-view cursor"),
|
||||
}
|
||||
};
|
||||
if is_empty {
|
||||
state.pc = pc_if_empty.as_offset_int();
|
||||
@@ -1430,17 +1470,43 @@ pub fn op_column(
|
||||
} => {
|
||||
{
|
||||
let mut table_cursor = state.get_cursor(table_cursor_id);
|
||||
let table_cursor = table_cursor.as_btree_mut();
|
||||
return_if_io!(
|
||||
table_cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })
|
||||
);
|
||||
// MaterializedView cursors shouldn't go through deferred seek logic
|
||||
// but if we somehow get here, handle it appropriately
|
||||
match &mut *table_cursor {
|
||||
Cursor::MaterializedView(mv_cursor) => {
|
||||
// Seek to the rowid in the materialized view
|
||||
return_if_io!(mv_cursor
|
||||
.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true }));
|
||||
}
|
||||
_ => {
|
||||
// Regular btree cursor
|
||||
let table_cursor = table_cursor.as_btree_mut();
|
||||
return_if_io!(table_cursor
|
||||
.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true }));
|
||||
}
|
||||
}
|
||||
}
|
||||
state.op_column_state = OpColumnState::GetColumn;
|
||||
}
|
||||
OpColumnState::GetColumn => {
|
||||
// First check if this is a MaterializedViewCursor
|
||||
{
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
if let Cursor::MaterializedView(mv_cursor) = &mut *cursor {
|
||||
// Handle materialized view column access
|
||||
let value = return_if_io!(mv_cursor.column(*column));
|
||||
drop(cursor);
|
||||
state.registers[*dest] = Register::Value(value);
|
||||
break 'outer;
|
||||
}
|
||||
// Fall back to normal handling
|
||||
}
|
||||
|
||||
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
|
||||
match cursor_type {
|
||||
CursorType::BTreeTable(_) | CursorType::BTreeIndex(_) => {
|
||||
CursorType::BTreeTable(_)
|
||||
| CursorType::BTreeIndex(_)
|
||||
| CursorType::MaterializedView(_, _) => {
|
||||
'ifnull: {
|
||||
let mut cursor_ref = must_be_btree_cursor!(
|
||||
*cursor_id,
|
||||
@@ -1843,12 +1909,19 @@ pub fn op_next(
|
||||
);
|
||||
assert!(pc_if_next.is_offset());
|
||||
let is_empty = {
|
||||
let mut cursor = must_be_btree_cursor!(*cursor_id, program.cursor_ref, state, "Next");
|
||||
let cursor = cursor.as_btree_mut();
|
||||
cursor.set_null_flag(false);
|
||||
return_if_io!(cursor.next());
|
||||
|
||||
cursor.is_empty()
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
match &mut *cursor {
|
||||
Cursor::BTree(btree_cursor) => {
|
||||
btree_cursor.set_null_flag(false);
|
||||
return_if_io!(btree_cursor.next());
|
||||
btree_cursor.is_empty()
|
||||
}
|
||||
Cursor::MaterializedView(mv_cursor) => {
|
||||
let has_more = return_if_io!(mv_cursor.next());
|
||||
!has_more
|
||||
}
|
||||
_ => panic!("Next on non-btree/materialized-view cursor"),
|
||||
}
|
||||
};
|
||||
if !is_empty {
|
||||
// Increment metrics for row read
|
||||
@@ -2444,9 +2517,18 @@ pub fn op_row_id(
|
||||
} else {
|
||||
state.registers[*dest] = Register::Value(Value::Null);
|
||||
}
|
||||
} else if let Some(Cursor::MaterializedView(mv_cursor)) =
|
||||
cursors.get_mut(*cursor_id).unwrap()
|
||||
{
|
||||
if let Some(rowid) = return_if_io!(mv_cursor.rowid()) {
|
||||
state.registers[*dest] = Register::Value(Value::Integer(rowid));
|
||||
} else {
|
||||
state.registers[*dest] = Register::Value(Value::Null);
|
||||
}
|
||||
} else {
|
||||
return Err(LimboError::InternalError(
|
||||
"RowId: cursor is not a table or virtual cursor".to_string(),
|
||||
"RowId: cursor is not a table, virtual, or materialized view cursor"
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
break;
|
||||
@@ -2497,40 +2579,67 @@ pub fn op_seek_rowid(
|
||||
assert!(target_pc.is_offset());
|
||||
let (pc, did_seek) = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let rowid = match state.registers[*src_reg].get_value() {
|
||||
Value::Integer(rowid) => Some(*rowid),
|
||||
Value::Null => None,
|
||||
// For non-integer values try to apply affinity and convert them to integer.
|
||||
other => {
|
||||
let mut temp_reg = Register::Value(other.clone());
|
||||
let converted = apply_affinity_char(&mut temp_reg, Affinity::Numeric);
|
||||
if converted {
|
||||
match temp_reg.get_value() {
|
||||
Value::Integer(i) => Some(*i),
|
||||
Value::Float(f) => Some(*f as i64),
|
||||
_ => unreachable!("apply_affinity_char with Numeric should produce an integer if it returns true"),
|
||||
|
||||
// Handle MaterializedView cursor
|
||||
let (pc, did_seek) = match &mut *cursor {
|
||||
Cursor::MaterializedView(mv_cursor) => {
|
||||
let rowid = match state.registers[*src_reg].get_value() {
|
||||
Value::Integer(rowid) => Some(*rowid),
|
||||
Value::Null => None,
|
||||
_ => None,
|
||||
};
|
||||
|
||||
match rowid {
|
||||
Some(rowid) => {
|
||||
let seek_result = return_if_io!(mv_cursor
|
||||
.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true }));
|
||||
let pc = if !matches!(seek_result, SeekResult::Found) {
|
||||
target_pc.as_offset_int()
|
||||
} else {
|
||||
state.pc + 1
|
||||
};
|
||||
(pc, true)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
None => (target_pc.as_offset_int(), false),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match rowid {
|
||||
Some(rowid) => {
|
||||
let seek_result = return_if_io!(
|
||||
cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true })
|
||||
);
|
||||
let pc = if !matches!(seek_result, SeekResult::Found) {
|
||||
target_pc.as_offset_int()
|
||||
} else {
|
||||
state.pc + 1
|
||||
Cursor::BTree(btree_cursor) => {
|
||||
let rowid = match state.registers[*src_reg].get_value() {
|
||||
Value::Integer(rowid) => Some(*rowid),
|
||||
Value::Null => None,
|
||||
// For non-integer values try to apply affinity and convert them to integer.
|
||||
other => {
|
||||
let mut temp_reg = Register::Value(other.clone());
|
||||
let converted = apply_affinity_char(&mut temp_reg, Affinity::Numeric);
|
||||
if converted {
|
||||
match temp_reg.get_value() {
|
||||
Value::Integer(i) => Some(*i),
|
||||
Value::Float(f) => Some(*f as i64),
|
||||
_ => unreachable!("apply_affinity_char with Numeric should produce an integer if it returns true"),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
(pc, true)
|
||||
|
||||
match rowid {
|
||||
Some(rowid) => {
|
||||
let seek_result = return_if_io!(btree_cursor
|
||||
.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true }));
|
||||
let pc = if !matches!(seek_result, SeekResult::Found) {
|
||||
target_pc.as_offset_int()
|
||||
} else {
|
||||
state.pc + 1
|
||||
};
|
||||
(pc, true)
|
||||
}
|
||||
None => (target_pc.as_offset_int(), false),
|
||||
}
|
||||
}
|
||||
None => (target_pc.as_offset_int(), false),
|
||||
}
|
||||
_ => panic!("SeekRowid on non-btree/materialized-view cursor"),
|
||||
};
|
||||
(pc, did_seek)
|
||||
};
|
||||
// Increment btree_seeks metric for SeekRowid operation after cursor is dropped
|
||||
if did_seek {
|
||||
@@ -5192,12 +5301,11 @@ pub fn op_insert(
|
||||
match &state.op_insert_state.sub_state {
|
||||
OpInsertSubState::MaybeCaptureRecord => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views =
|
||||
schema.get_dependent_materialized_views_unnormalized(table_name);
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
// If there are no dependent views, we don't need to capture the old record.
|
||||
// We also don't need to do it if the rowid of the UPDATEd row was changed, because that means
|
||||
// we deleted it earlier and `op_delete` already captured the change.
|
||||
if dependent_views.is_none() || flag.has(InsertFlags::UPDATE_ROWID_CHANGE) {
|
||||
if dependent_views.is_empty() || flag.has(InsertFlags::UPDATE_ROWID_CHANGE) {
|
||||
if flag.has(InsertFlags::REQUIRE_SEEK) {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::Seek;
|
||||
} else {
|
||||
@@ -5303,9 +5411,8 @@ pub fn op_insert(
|
||||
state.op_insert_state.sub_state = OpInsertSubState::UpdateLastRowid;
|
||||
} else {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views =
|
||||
schema.get_dependent_materialized_views_unnormalized(table_name);
|
||||
if dependent_views.is_some() {
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
if !dependent_views.is_empty() {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
|
||||
} else {
|
||||
break;
|
||||
@@ -5325,9 +5432,8 @@ pub fn op_insert(
|
||||
program.n_change.set(prev_changes + 1);
|
||||
}
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views =
|
||||
schema.get_dependent_materialized_views_unnormalized(table_name);
|
||||
if dependent_views.is_some() {
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
if !dependent_views.is_empty() {
|
||||
state.op_insert_state.sub_state = OpInsertSubState::ApplyViewChange;
|
||||
continue;
|
||||
}
|
||||
@@ -5335,10 +5441,8 @@ pub fn op_insert(
|
||||
}
|
||||
OpInsertSubState::ApplyViewChange => {
|
||||
let schema = program.connection.schema.borrow();
|
||||
let dependent_views =
|
||||
schema.get_dependent_materialized_views_unnormalized(table_name);
|
||||
assert!(dependent_views.is_some());
|
||||
let dependent_views = dependent_views.unwrap();
|
||||
let dependent_views = schema.get_dependent_materialized_views(table_name);
|
||||
assert!(!dependent_views.is_empty());
|
||||
|
||||
let (key, values) = {
|
||||
let mut cursor = state.get_cursor(*cursor_id);
|
||||
@@ -5383,17 +5487,22 @@ pub fn op_insert(
|
||||
(key, new_values)
|
||||
};
|
||||
|
||||
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
||||
if let Some((key, values)) = state.op_insert_state.old_record.take() {
|
||||
for view_name in dependent_views.iter() {
|
||||
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
||||
tx_state.delta.delete(key, values.clone());
|
||||
let tx_state = program
|
||||
.connection
|
||||
.view_transaction_states
|
||||
.get_or_create(view_name);
|
||||
tx_state.delete(key, values.clone());
|
||||
}
|
||||
}
|
||||
for view_name in dependent_views.iter() {
|
||||
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
||||
let tx_state = program
|
||||
.connection
|
||||
.view_transaction_states
|
||||
.get_or_create(view_name);
|
||||
|
||||
tx_state.delta.insert(key, values.clone());
|
||||
tx_state.insert(key, values.clone());
|
||||
}
|
||||
|
||||
break;
|
||||
@@ -5522,10 +5631,12 @@ pub fn op_delete(
|
||||
assert!(!dependent_views.is_empty());
|
||||
let maybe_deleted_record = state.op_delete_state.deleted_record.take();
|
||||
if let Some((key, values)) = maybe_deleted_record {
|
||||
let mut tx_states = program.connection.view_transaction_states.borrow_mut();
|
||||
for view_name in dependent_views {
|
||||
let tx_state = tx_states.entry(view_name.clone()).or_default();
|
||||
tx_state.delta.delete(key, values.clone());
|
||||
let tx_state = program
|
||||
.connection
|
||||
.view_transaction_states
|
||||
.get_or_create(&view_name);
|
||||
tx_state.delete(key, values.clone());
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -6232,7 +6343,10 @@ pub fn op_open_write(
|
||||
} else {
|
||||
let num_columns = match cursor_type {
|
||||
CursorType::BTreeTable(table_rc) => table_rc.columns.len(),
|
||||
_ => unreachable!("Expected BTreeTable. This should not have happened."),
|
||||
CursorType::MaterializedView(table_rc, _) => table_rc.columns.len(),
|
||||
_ => unreachable!(
|
||||
"Expected BTreeTable or MaterializedView. This should not have happened."
|
||||
),
|
||||
};
|
||||
|
||||
let cursor =
|
||||
@@ -6453,6 +6567,7 @@ pub fn op_parse_schema(
|
||||
},
|
||||
insn
|
||||
);
|
||||
|
||||
let conn = program.connection.clone();
|
||||
// set auto commit to false in order for parse schema to not commit changes as transaction state is stored in connection,
|
||||
// and we use the same connection for nested query.
|
||||
@@ -6464,7 +6579,7 @@ pub fn op_parse_schema(
|
||||
|
||||
conn.with_schema_mut(|schema| {
|
||||
// TODO: This function below is synchronous, make it async
|
||||
let existing_views = schema.materialized_views.clone();
|
||||
let existing_views = schema.incremental_views.clone();
|
||||
conn.is_nested_stmt.set(true);
|
||||
parse_schema_rows(
|
||||
stmt,
|
||||
@@ -6479,7 +6594,7 @@ pub fn op_parse_schema(
|
||||
|
||||
conn.with_schema_mut(|schema| {
|
||||
// TODO: This function below is synchronous, make it async
|
||||
let existing_views = schema.materialized_views.clone();
|
||||
let existing_views = schema.incremental_views.clone();
|
||||
conn.is_nested_stmt.set(true);
|
||||
parse_schema_rows(
|
||||
stmt,
|
||||
@@ -6500,14 +6615,75 @@ pub fn op_parse_schema(
|
||||
pub fn op_populate_materialized_views(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
_insn: &Insn,
|
||||
_pager: &Rc<Pager>,
|
||||
insn: &Insn,
|
||||
pager: &Rc<Pager>,
|
||||
_mv_store: Option<&Arc<MvStore>>,
|
||||
) -> Result<InsnFunctionStepResult> {
|
||||
let conn = program.connection.clone();
|
||||
let schema = conn.schema.borrow();
|
||||
load_insn!(PopulateMaterializedViews { cursors }, insn);
|
||||
|
||||
let conn = program.connection.clone();
|
||||
|
||||
// For each view, get its cursor and root page
|
||||
let mut view_info = Vec::new();
|
||||
{
|
||||
let cursors_ref = state.cursors.borrow();
|
||||
for (view_name, cursor_id) in cursors {
|
||||
// Get the cursor to find the root page
|
||||
let cursor = cursors_ref
|
||||
.get(*cursor_id)
|
||||
.and_then(|c| c.as_ref())
|
||||
.ok_or_else(|| {
|
||||
LimboError::InternalError(format!("Cursor {cursor_id} not found"))
|
||||
})?;
|
||||
|
||||
let root_page = match cursor {
|
||||
crate::types::Cursor::BTree(btree_cursor) => btree_cursor.root_page(),
|
||||
_ => {
|
||||
return Err(LimboError::InternalError(
|
||||
"Expected BTree cursor for materialized view".into(),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
view_info.push((view_name.clone(), root_page, *cursor_id));
|
||||
}
|
||||
}
|
||||
|
||||
// Now populate the views (after releasing the schema borrow)
|
||||
for (view_name, _root_page, cursor_id) in view_info {
|
||||
let schema = conn.schema.borrow();
|
||||
if let Some(view) = schema.get_materialized_view(&view_name) {
|
||||
let mut view = view.lock().unwrap();
|
||||
// Drop the schema borrow before calling populate_from_table
|
||||
drop(schema);
|
||||
|
||||
// Get the cursor for writing
|
||||
// Get a mutable reference to the cursor
|
||||
let mut cursors_ref = state.cursors.borrow_mut();
|
||||
let cursor = cursors_ref
|
||||
.get_mut(cursor_id)
|
||||
.and_then(|c| c.as_mut())
|
||||
.ok_or_else(|| {
|
||||
LimboError::InternalError(format!(
|
||||
"Cursor {cursor_id} not found for population"
|
||||
))
|
||||
})?;
|
||||
|
||||
// Extract the BTreeCursor
|
||||
let btree_cursor = match cursor {
|
||||
crate::types::Cursor::BTree(btree_cursor) => btree_cursor,
|
||||
_ => {
|
||||
return Err(LimboError::InternalError(
|
||||
"Expected BTree cursor for materialized view population".into(),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// Now populate it with the cursor for writing
|
||||
return_if_io!(view.populate_from_table(&conn, pager, btree_cursor.as_mut()));
|
||||
}
|
||||
}
|
||||
|
||||
return_if_io!(schema.populate_materialized_views(&conn));
|
||||
// All views populated, advance to next instruction
|
||||
state.pc += 1;
|
||||
Ok(InsnFunctionStepResult::Step)
|
||||
@@ -6932,6 +7108,9 @@ pub fn op_open_ephemeral(
|
||||
CursorType::VirtualTable(_) => {
|
||||
panic!("OpenEphemeral on virtual table cursor, use Insn::VOpen instead");
|
||||
}
|
||||
CursorType::MaterializedView(_, _) => {
|
||||
panic!("OpenEphemeral on materialized view cursor");
|
||||
}
|
||||
}
|
||||
|
||||
state.pc += 1;
|
||||
|
||||
@@ -19,6 +19,7 @@ pub fn insn_to_str(
|
||||
CursorType::BTreeIndex(index) => &index.name,
|
||||
CursorType::Pseudo(_) => "pseudo",
|
||||
CursorType::VirtualTable(virtual_table) => &virtual_table.name,
|
||||
CursorType::MaterializedView(table, _) => &table.name,
|
||||
CursorType::Sorter => "sorter",
|
||||
}
|
||||
};
|
||||
@@ -541,6 +542,10 @@ pub fn insn_to_str(
|
||||
let name = &index.columns.get(*column).unwrap().name;
|
||||
Some(name)
|
||||
}
|
||||
CursorType::MaterializedView(table, _) => {
|
||||
let name = table.columns.get(*column).and_then(|v| v.name.as_ref());
|
||||
name
|
||||
}
|
||||
CursorType::Pseudo(_) => None,
|
||||
CursorType::Sorter => None,
|
||||
CursorType::VirtualTable(v) => v.columns.get(*column).unwrap().name.as_ref(),
|
||||
@@ -1337,13 +1342,13 @@ pub fn insn_to_str(
|
||||
0,
|
||||
where_clause.clone().unwrap_or("NULL".to_string()),
|
||||
),
|
||||
Insn::PopulateMaterializedViews => (
|
||||
Insn::PopulateMaterializedViews { cursors } => (
|
||||
"PopulateMaterializedViews",
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
Value::Null,
|
||||
0,
|
||||
cursors.len() as u16,
|
||||
"".to_string(),
|
||||
),
|
||||
Insn::Prev {
|
||||
|
||||
@@ -898,7 +898,12 @@ pub enum Insn {
|
||||
},
|
||||
|
||||
/// Populate all materialized views after schema parsing
|
||||
PopulateMaterializedViews,
|
||||
/// The cursors parameter contains a mapping of view names to cursor IDs that have been
|
||||
/// opened to the view's btree for writing the materialized data
|
||||
PopulateMaterializedViews {
|
||||
/// Mapping of view name to cursor_id for writing to the view's btree
|
||||
cursors: Vec<(String, usize)>,
|
||||
},
|
||||
|
||||
/// Place the result of lhs >> rhs in dest register.
|
||||
ShiftRight {
|
||||
@@ -1190,7 +1195,7 @@ impl Insn {
|
||||
Insn::IsNull { .. } => execute::op_is_null,
|
||||
Insn::CollSeq { .. } => execute::op_coll_seq,
|
||||
Insn::ParseSchema { .. } => execute::op_parse_schema,
|
||||
Insn::PopulateMaterializedViews => execute::op_populate_materialized_views,
|
||||
Insn::PopulateMaterializedViews { .. } => execute::op_populate_materialized_views,
|
||||
Insn::ShiftRight { .. } => execute::op_shift_right,
|
||||
Insn::ShiftLeft { .. } => execute::op_shift_left,
|
||||
Insn::AddImm { .. } => execute::op_add_imm,
|
||||
|
||||
123
core/vdbe/mod.rs
123
core/vdbe/mod.rs
@@ -69,6 +69,17 @@ use std::{
|
||||
};
|
||||
use tracing::{instrument, Level};
|
||||
|
||||
/// State machine for committing view deltas with I/O handling
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ViewDeltaCommitState {
|
||||
NotStarted,
|
||||
Processing {
|
||||
views: Vec<String>, // view names (all materialized views have storage)
|
||||
current_index: usize,
|
||||
},
|
||||
Done,
|
||||
}
|
||||
|
||||
/// We use labels to indicate that we want to jump to whatever the instruction offset
|
||||
/// will be at runtime, because the offset cannot always be determined when the jump
|
||||
/// instruction is created.
|
||||
@@ -284,6 +295,8 @@ pub struct ProgramState {
|
||||
current_collation: Option<CollationSeq>,
|
||||
op_column_state: OpColumnState,
|
||||
op_row_id_state: OpRowIdState,
|
||||
/// State machine for committing view deltas with I/O handling
|
||||
view_delta_state: ViewDeltaCommitState,
|
||||
}
|
||||
|
||||
impl ProgramState {
|
||||
@@ -326,6 +339,7 @@ impl ProgramState {
|
||||
current_collation: None,
|
||||
op_column_state: OpColumnState::Start,
|
||||
op_row_id_state: OpRowIdState::Start,
|
||||
view_delta_state: ViewDeltaCommitState::NotStarted,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -413,6 +427,7 @@ macro_rules! must_be_btree_cursor {
|
||||
let cursor = match cursor_type {
|
||||
CursorType::BTreeTable(_) => $state.get_cursor($cursor_id),
|
||||
CursorType::BTreeIndex(_) => $state.get_cursor($cursor_id),
|
||||
CursorType::MaterializedView(_, _) => $state.get_cursor($cursor_id),
|
||||
CursorType::Pseudo(_) => panic!("{} on pseudo cursor", $insn_name),
|
||||
CursorType::Sorter => panic!("{} on sorter cursor", $insn_name),
|
||||
CursorType::VirtualTable(_) => panic!("{} on virtual table cursor", $insn_name),
|
||||
@@ -518,20 +533,97 @@ impl Program {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn apply_view_deltas(&self, rollback: bool) {
|
||||
if self.connection.view_transaction_states.borrow().is_empty() {
|
||||
return;
|
||||
}
|
||||
fn apply_view_deltas(
|
||||
&self,
|
||||
state: &mut ProgramState,
|
||||
rollback: bool,
|
||||
pager: &Rc<Pager>,
|
||||
) -> Result<IOResult<()>> {
|
||||
use crate::types::IOResult;
|
||||
|
||||
let tx_states = self.connection.view_transaction_states.take();
|
||||
loop {
|
||||
match &state.view_delta_state {
|
||||
ViewDeltaCommitState::NotStarted => {
|
||||
if self.connection.view_transaction_states.is_empty() {
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
if !rollback {
|
||||
let schema = self.connection.schema.borrow();
|
||||
if rollback {
|
||||
// On rollback, just clear and done
|
||||
self.connection.view_transaction_states.clear();
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
for (view_name, tx_state) in tx_states.iter() {
|
||||
if let Some(view_mutex) = schema.get_materialized_view(view_name) {
|
||||
let mut view = view_mutex.lock().unwrap();
|
||||
view.merge_delta(&tx_state.delta);
|
||||
// Not a rollback - proceed with processing
|
||||
let schema = self.connection.schema.borrow();
|
||||
|
||||
// Collect materialized views - they should all have storage
|
||||
let mut views = Vec::new();
|
||||
for view_name in self.connection.view_transaction_states.get_view_names() {
|
||||
if let Some(view_mutex) = schema.get_materialized_view(&view_name) {
|
||||
let view = view_mutex.lock().unwrap();
|
||||
let root_page = view.get_root_page();
|
||||
|
||||
// Materialized views should always have storage (root_page != 0)
|
||||
assert!(
|
||||
root_page != 0,
|
||||
"Materialized view '{view_name}' should have a root page"
|
||||
);
|
||||
|
||||
views.push(view_name);
|
||||
}
|
||||
}
|
||||
|
||||
state.view_delta_state = ViewDeltaCommitState::Processing {
|
||||
views,
|
||||
current_index: 0,
|
||||
};
|
||||
}
|
||||
|
||||
ViewDeltaCommitState::Processing {
|
||||
views,
|
||||
current_index,
|
||||
} => {
|
||||
// At this point we know it's not a rollback
|
||||
if *current_index >= views.len() {
|
||||
// All done, clear the transaction states
|
||||
self.connection.view_transaction_states.clear();
|
||||
state.view_delta_state = ViewDeltaCommitState::Done;
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
|
||||
let view_name = &views[*current_index];
|
||||
|
||||
let delta = self
|
||||
.connection
|
||||
.view_transaction_states
|
||||
.get(view_name)
|
||||
.unwrap()
|
||||
.get_delta();
|
||||
|
||||
let schema = self.connection.schema.borrow();
|
||||
if let Some(view_mutex) = schema.get_materialized_view(view_name) {
|
||||
let mut view = view_mutex.lock().unwrap();
|
||||
|
||||
// Handle I/O from merge_delta - pass pager, circuit will create its own cursor
|
||||
match view.merge_delta(&delta, pager.clone())? {
|
||||
IOResult::Done(_) => {
|
||||
// Move to next view
|
||||
state.view_delta_state = ViewDeltaCommitState::Processing {
|
||||
views: views.clone(),
|
||||
current_index: current_index + 1,
|
||||
};
|
||||
}
|
||||
IOResult::IO(io) => {
|
||||
// Return I/O, will resume at same index
|
||||
return Ok(IOResult::IO(io));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ViewDeltaCommitState::Done => {
|
||||
return Ok(IOResult::Done(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -544,7 +636,14 @@ impl Program {
|
||||
mv_store: Option<&Arc<MvStore>>,
|
||||
rollback: bool,
|
||||
) -> Result<IOResult<()>> {
|
||||
self.apply_view_deltas(rollback);
|
||||
// Apply view deltas with I/O handling
|
||||
match self.apply_view_deltas(program_state, rollback, &pager)? {
|
||||
IOResult::IO(io) => return Ok(IOResult::IO(io)),
|
||||
IOResult::Done(_) => {}
|
||||
}
|
||||
|
||||
// Reset state for next use
|
||||
program_state.view_delta_state = ViewDeltaCommitState::NotStarted;
|
||||
|
||||
if self.connection.transaction_state.get() == TransactionState::None && mv_store.is_none() {
|
||||
// No need to do any work here if not in tx. Current MVCC logic doesn't work with this assumption,
|
||||
|
||||
30
core/vtab.rs
30
core/vtab.rs
@@ -6,7 +6,7 @@ use crate::{Connection, LimboError, SymbolTable, Value};
|
||||
use std::ffi::c_void;
|
||||
use std::ptr::NonNull;
|
||||
use std::rc::Rc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Arc;
|
||||
use turso_ext::{ConstraintInfo, IndexInfo, OrderByInfo, ResultCode, VTabKind, VTabModuleImpl};
|
||||
use turso_parser::{ast, parser::Parser};
|
||||
|
||||
@@ -14,7 +14,6 @@ use turso_parser::{ast, parser::Parser};
|
||||
pub(crate) enum VirtualTableType {
|
||||
Pragma(PragmaVirtualTable),
|
||||
External(ExtVirtualTable),
|
||||
View(crate::vtab_view::ViewVirtualTable),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -30,7 +29,6 @@ impl VirtualTable {
|
||||
match &self.vtab_type {
|
||||
VirtualTableType::Pragma(_) => true,
|
||||
VirtualTableType::External(table) => table.readonly(),
|
||||
VirtualTableType::View(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,21 +86,6 @@ impl VirtualTable {
|
||||
Ok(Arc::new(vtab))
|
||||
}
|
||||
|
||||
/// Create a virtual table for a view
|
||||
pub(crate) fn view(
|
||||
view_name: &str,
|
||||
columns: Vec<Column>,
|
||||
view: Arc<Mutex<crate::incremental::view::IncrementalView>>,
|
||||
) -> crate::Result<Arc<VirtualTable>> {
|
||||
let vtab = VirtualTable {
|
||||
name: view_name.to_owned(),
|
||||
columns,
|
||||
kind: VTabKind::VirtualTable,
|
||||
vtab_type: VirtualTableType::View(crate::vtab_view::ViewVirtualTable { view }),
|
||||
};
|
||||
Ok(Arc::new(vtab))
|
||||
}
|
||||
|
||||
fn resolve_columns(schema: String) -> crate::Result<Vec<Column>> {
|
||||
let mut parser = Parser::new(schema.as_bytes());
|
||||
if let ast::Cmd::Stmt(ast::Stmt::CreateTable { body, .. }) = parser.next_cmd()?.ok_or(
|
||||
@@ -124,9 +107,6 @@ impl VirtualTable {
|
||||
VirtualTableType::External(table) => {
|
||||
Ok(VirtualTableCursor::External(table.open(conn.clone())?))
|
||||
}
|
||||
VirtualTableType::View(table) => {
|
||||
Ok(VirtualTableCursor::View(Box::new(table.open(conn)?)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,7 +114,6 @@ impl VirtualTable {
|
||||
match &self.vtab_type {
|
||||
VirtualTableType::Pragma(_) => Err(LimboError::ReadOnly),
|
||||
VirtualTableType::External(table) => table.update(args),
|
||||
VirtualTableType::View(_) => Err(LimboError::ReadOnly),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,7 +121,6 @@ impl VirtualTable {
|
||||
match &self.vtab_type {
|
||||
VirtualTableType::Pragma(_) => Ok(()),
|
||||
VirtualTableType::External(table) => table.destroy(),
|
||||
VirtualTableType::View(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,7 +132,6 @@ impl VirtualTable {
|
||||
match &self.vtab_type {
|
||||
VirtualTableType::Pragma(table) => table.best_index(constraints),
|
||||
VirtualTableType::External(table) => table.best_index(constraints, order_by),
|
||||
VirtualTableType::View(view) => view.best_index(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -162,7 +139,6 @@ impl VirtualTable {
|
||||
pub enum VirtualTableCursor {
|
||||
Pragma(Box<PragmaVirtualTableCursor>),
|
||||
External(ExtVirtualTableCursor),
|
||||
View(Box<crate::vtab_view::ViewVirtualTableCursor>),
|
||||
}
|
||||
|
||||
impl VirtualTableCursor {
|
||||
@@ -170,7 +146,6 @@ impl VirtualTableCursor {
|
||||
match self {
|
||||
VirtualTableCursor::Pragma(cursor) => cursor.next(),
|
||||
VirtualTableCursor::External(cursor) => cursor.next(),
|
||||
VirtualTableCursor::View(cursor) => cursor.next(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -178,7 +153,6 @@ impl VirtualTableCursor {
|
||||
match self {
|
||||
VirtualTableCursor::Pragma(cursor) => cursor.rowid(),
|
||||
VirtualTableCursor::External(cursor) => cursor.rowid(),
|
||||
VirtualTableCursor::View(cursor) => cursor.rowid(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,7 +160,6 @@ impl VirtualTableCursor {
|
||||
match self {
|
||||
VirtualTableCursor::Pragma(cursor) => cursor.column(column),
|
||||
VirtualTableCursor::External(cursor) => cursor.column(column),
|
||||
VirtualTableCursor::View(cursor) => cursor.column(column),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -202,7 +175,6 @@ impl VirtualTableCursor {
|
||||
VirtualTableCursor::External(cursor) => {
|
||||
cursor.filter(idx_num, idx_str, arg_count, args)
|
||||
}
|
||||
VirtualTableCursor::View(cursor) => cursor.filter(args),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,101 +0,0 @@
|
||||
use crate::incremental::view::IncrementalView;
|
||||
use crate::{Connection, LimboError, Value, VirtualTable};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// Create a virtual table wrapper for a view
|
||||
pub fn create_view_virtual_table(
|
||||
view_name: &str,
|
||||
view: Arc<Mutex<IncrementalView>>,
|
||||
) -> crate::Result<Arc<VirtualTable>> {
|
||||
// Use the VirtualTable::view method we added
|
||||
let view_locked = view.lock().map_err(|_| {
|
||||
LimboError::InternalError("Failed to lock view for virtual table creation".to_string())
|
||||
})?;
|
||||
let columns = view_locked.columns.clone();
|
||||
drop(view_locked); // Release the lock before passing the Arc
|
||||
VirtualTable::view(view_name, columns, view)
|
||||
}
|
||||
|
||||
/// Virtual table wrapper for incremental views
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ViewVirtualTable {
|
||||
pub view: Arc<Mutex<IncrementalView>>,
|
||||
}
|
||||
|
||||
impl ViewVirtualTable {
|
||||
pub fn best_index(&self) -> Result<turso_ext::IndexInfo, turso_ext::ResultCode> {
|
||||
// Views don't use indexes - return a simple index info
|
||||
Ok(turso_ext::IndexInfo {
|
||||
idx_num: 0,
|
||||
idx_str: None,
|
||||
order_by_consumed: false,
|
||||
estimated_cost: 1000000.0,
|
||||
estimated_rows: 1000,
|
||||
constraint_usages: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn open(&self, conn: Arc<Connection>) -> crate::Result<ViewVirtualTableCursor> {
|
||||
// Views are now populated during schema parsing (in parse_schema_rows)
|
||||
// so we just get the current data from the view.
|
||||
|
||||
let view = self.view.lock().map_err(|_| {
|
||||
LimboError::InternalError("Failed to lock view for reading".to_string())
|
||||
})?;
|
||||
|
||||
let tx_states = conn.view_transaction_states.borrow();
|
||||
let tx_state = tx_states.get(view.name());
|
||||
|
||||
let data: Vec<(i64, Vec<Value>)> = view.current_data(tx_state);
|
||||
Ok(ViewVirtualTableCursor {
|
||||
data,
|
||||
current_pos: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Cursor for iterating over view data
|
||||
pub struct ViewVirtualTableCursor {
|
||||
data: Vec<(i64, Vec<Value>)>,
|
||||
current_pos: usize,
|
||||
}
|
||||
|
||||
impl ViewVirtualTableCursor {
|
||||
pub fn next(&mut self) -> crate::Result<bool> {
|
||||
if self.current_pos < self.data.len() {
|
||||
self.current_pos += 1;
|
||||
Ok(self.current_pos < self.data.len())
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rowid(&self) -> i64 {
|
||||
if self.current_pos < self.data.len() {
|
||||
self.data[self.current_pos].0
|
||||
} else {
|
||||
-1
|
||||
}
|
||||
}
|
||||
|
||||
pub fn column(&self, column: usize) -> crate::Result<Value> {
|
||||
if self.current_pos >= self.data.len() {
|
||||
return Ok(Value::Null);
|
||||
}
|
||||
|
||||
let (_row_key, values) = &self.data[self.current_pos];
|
||||
|
||||
// Return the value at the requested column index
|
||||
if let Some(value) = values.get(column) {
|
||||
Ok(value.clone())
|
||||
} else {
|
||||
Ok(Value::Null)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn filter(&mut self, _args: Vec<Value>) -> crate::Result<bool> {
|
||||
// Reset to beginning for new filter
|
||||
self.current_pos = 0;
|
||||
Ok(!self.data.is_empty())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user