Merge 'Support JOINs in DBSP materialized views' from Glauber Costa

This PR introduces the final major operator: the JOIN operator.
Several things had to be fixed before JOINs could be properly
supported, and this PR handles them. In particular, JOINs always
generate qualified column references, which the operators were not
handling correctly at all. That is not a problem for linear circuits,
but it is fatal for JOINs.
The operator.rs file also became incredibly complex, with not one but
two stateful operators, so it is now broken apart.

Closes #3207
Pekka Enberg
2025-09-19 13:40:21 +03:00
committed by GitHub
20 changed files with 8992 additions and 3461 deletions
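
The join itself follows the standard DBSP bilinear rule: δ(R ⋈ S) = (δR ⋈ S) + (R ⋈ δS) + (δR ⋈ δS). A minimal self-contained sketch of that rule over weighted rows (the types below are simplified stand-ins for the crate's Delta and HashableRow, not the actual API):

// Z-set sketch: rows with integer weights (insert = +1, delete = -1).
type Row = Vec<i64>;
type ZSet = Vec<(Row, isize)>;

// Keyed join of two Z-sets; matching rows are concatenated and weights multiply.
fn join(a: &ZSet, b: &ZSet, key_a: usize, key_b: usize) -> ZSet {
    let mut out = ZSet::new();
    for (ra, wa) in a {
        for (rb, wb) in b {
            if ra[key_a] == rb[key_b] {
                let mut combined = ra.clone();
                combined.extend_from_slice(rb);
                out.push((combined, wa * wb));
            }
        }
    }
    out
}

// The bilinear rule: only the delta-dependent terms are evaluated; the full
// R ⋈ S is never recomputed.
fn join_delta(r: &ZSet, dr: &ZSet, s: &ZSet, ds: &ZSet, ka: usize, kb: usize) -> ZSet {
    let mut out = join(dr, s, ka, kb); // δR ⋈ S
    out.extend(join(r, ds, ka, kb)); // R ⋈ δS
    out.extend(join(dr, ds, ka, kb)); // δR ⋈ δS
    out
}

In the actual operator below, the δR ⋈ δS term is computed in memory while the other two terms are answered from BTree-backed state that persists R and S between commits.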

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -355,7 +355,7 @@ mod tests {
"View not materialized".to_string(),
));
}
-        let num_columns = view.columns.len();
+        let num_columns = view.column_schema.columns.len();
drop(view);
// Create a btree cursor

View File

@@ -75,6 +75,10 @@ impl HashableRow {
hasher.finish()
}
pub fn cached_hash(&self) -> u64 {
self.cached_hash
}
}
impl Hash for HashableRow {
@@ -168,7 +172,7 @@ impl Delta {
}
/// A pair of deltas for operators that process two inputs
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
pub struct DeltaPair {
pub left: Delta,
pub right: Delta,
@@ -400,4 +404,57 @@ mod tests {
let weight = zset.iter().find(|(k, _)| **k == 1).map(|(_, w)| w);
assert_eq!(weight, Some(1));
}
#[test]
fn test_hashable_row_delta_operations() {
let mut delta = Delta::new();
// Test INSERT
delta.insert(1, vec![Value::Integer(1), Value::Integer(100)]);
assert_eq!(delta.len(), 1);
// Test UPDATE (DELETE + INSERT) - order matters!
delta.delete(1, vec![Value::Integer(1), Value::Integer(100)]);
delta.insert(1, vec![Value::Integer(1), Value::Integer(200)]);
assert_eq!(delta.len(), 3); // Should have 3 operations before consolidation
// Verify order is preserved
let ops: Vec<_> = delta.changes.iter().collect();
assert_eq!(ops[0].1, 1); // First insert
assert_eq!(ops[1].1, -1); // Delete
assert_eq!(ops[2].1, 1); // Second insert
// Test consolidation
delta.consolidate();
// After consolidation, the first insert and delete should cancel out
// leaving only the second insert
assert_eq!(delta.len(), 1);
let final_row = &delta.changes[0];
assert_eq!(final_row.0.rowid, 1);
assert_eq!(
final_row.0.values,
vec![Value::Integer(1), Value::Integer(200)]
);
assert_eq!(final_row.1, 1);
}
#[test]
fn test_duplicate_row_consolidation() {
let mut delta = Delta::new();
// Insert same row twice
delta.insert(2, vec![Value::Integer(2), Value::Integer(300)]);
delta.insert(2, vec![Value::Integer(2), Value::Integer(300)]);
assert_eq!(delta.len(), 2);
delta.consolidate();
assert_eq!(delta.len(), 1);
// Weight should be 2 (sum of both inserts)
let final_row = &delta.changes[0];
assert_eq!(final_row.0.rowid, 2);
assert_eq!(final_row.1, 2);
}
}
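
The two tests above rely on the usual Z-set consolidation: weights of identical rows are summed, and rows whose weights cancel to zero disappear; only the pre-consolidation order is guaranteed. A stand-alone sketch of that step, with an integer key standing in for HashableRow:

use std::collections::HashMap;

// Sums weights per distinct row and drops entries that cancel out.
// Inserts carry weight +1 and deletes weight -1.
fn consolidate(changes: Vec<(i64, isize)>) -> Vec<(i64, isize)> {
    let mut weights: HashMap<i64, isize> = HashMap::new();
    for (row, w) in changes {
        *weights.entry(row).or_insert(0) += w;
    }
    weights.into_iter().filter(|(_, w)| *w != 0).collect()
}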

View File

@@ -0,0 +1,295 @@
#![allow(dead_code)]
// Filter operator for DBSP-style incremental computation
// This operator filters rows based on predicates
use crate::incremental::dbsp::{Delta, DeltaPair};
use crate::incremental::operator::{
ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
};
use crate::types::IOResult;
use crate::{Result, Value};
use std::sync::{Arc, Mutex};
/// Filter predicate for filtering rows
#[derive(Debug, Clone)]
pub enum FilterPredicate {
/// Column = value (using column index)
Equals { column_idx: usize, value: Value },
/// Column != value (using column index)
NotEquals { column_idx: usize, value: Value },
/// Column > value (using column index)
GreaterThan { column_idx: usize, value: Value },
/// Column >= value (using column index)
GreaterThanOrEqual { column_idx: usize, value: Value },
/// Column < value (using column index)
LessThan { column_idx: usize, value: Value },
/// Column <= value (using column index)
LessThanOrEqual { column_idx: usize, value: Value },
/// Column = Column comparisons
ColumnEquals { left_idx: usize, right_idx: usize },
/// Column != Column comparisons
ColumnNotEquals { left_idx: usize, right_idx: usize },
/// Column > Column comparisons
ColumnGreaterThan { left_idx: usize, right_idx: usize },
/// Column >= Column comparisons
ColumnGreaterThanOrEqual { left_idx: usize, right_idx: usize },
/// Column < Column comparisons
ColumnLessThan { left_idx: usize, right_idx: usize },
/// Column <= Column comparisons
ColumnLessThanOrEqual { left_idx: usize, right_idx: usize },
/// Logical AND of two predicates
And(Box<FilterPredicate>, Box<FilterPredicate>),
/// Logical OR of two predicates
Or(Box<FilterPredicate>, Box<FilterPredicate>),
/// No predicate (accept all rows)
None,
}
/// Filter operator - filters rows based on predicate
#[derive(Debug)]
pub struct FilterOperator {
predicate: FilterPredicate,
tracker: Option<Arc<Mutex<ComputationTracker>>>,
}
impl FilterOperator {
pub fn new(predicate: FilterPredicate) -> Self {
Self {
predicate,
tracker: None,
}
}
/// Get the predicate for this filter
pub fn predicate(&self) -> &FilterPredicate {
&self.predicate
}
pub fn evaluate_predicate(&self, values: &[Value]) -> bool {
match &self.predicate {
FilterPredicate::None => true,
FilterPredicate::Equals { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
return v == value;
}
false
}
FilterPredicate::NotEquals { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
return v != value;
}
false
}
FilterPredicate::GreaterThan { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
// Compare based on value types
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a > b,
(Value::Float(a), Value::Float(b)) => return a > b,
(Value::Text(a), Value::Text(b)) => return a.as_str() > b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::GreaterThanOrEqual { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a >= b,
(Value::Float(a), Value::Float(b)) => return a >= b,
(Value::Text(a), Value::Text(b)) => return a.as_str() >= b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::LessThan { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a < b,
(Value::Float(a), Value::Float(b)) => return a < b,
(Value::Text(a), Value::Text(b)) => return a.as_str() < b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::LessThanOrEqual { column_idx, value } => {
if let Some(v) = values.get(*column_idx) {
match (v, value) {
(Value::Integer(a), Value::Integer(b)) => return a <= b,
(Value::Float(a), Value::Float(b)) => return a <= b,
(Value::Text(a), Value::Text(b)) => return a.as_str() <= b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::And(left, right) => {
// Temporarily create sub-filters to evaluate
let left_filter = FilterOperator::new((**left).clone());
let right_filter = FilterOperator::new((**right).clone());
left_filter.evaluate_predicate(values) && right_filter.evaluate_predicate(values)
}
FilterPredicate::Or(left, right) => {
let left_filter = FilterOperator::new((**left).clone());
let right_filter = FilterOperator::new((**right).clone());
left_filter.evaluate_predicate(values) || right_filter.evaluate_predicate(values)
}
// Column-to-column comparisons
FilterPredicate::ColumnEquals {
left_idx,
right_idx,
} => {
if let (Some(left), Some(right)) = (values.get(*left_idx), values.get(*right_idx)) {
return left == right;
}
false
}
FilterPredicate::ColumnNotEquals {
left_idx,
right_idx,
} => {
if let (Some(left), Some(right)) = (values.get(*left_idx), values.get(*right_idx)) {
return left != right;
}
false
}
FilterPredicate::ColumnGreaterThan {
left_idx,
right_idx,
} => {
if let (Some(left), Some(right)) = (values.get(*left_idx), values.get(*right_idx)) {
match (left, right) {
(Value::Integer(a), Value::Integer(b)) => return a > b,
(Value::Float(a), Value::Float(b)) => return a > b,
(Value::Text(a), Value::Text(b)) => return a.as_str() > b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::ColumnGreaterThanOrEqual {
left_idx,
right_idx,
} => {
if let (Some(left), Some(right)) = (values.get(*left_idx), values.get(*right_idx)) {
match (left, right) {
(Value::Integer(a), Value::Integer(b)) => return a >= b,
(Value::Float(a), Value::Float(b)) => return a >= b,
(Value::Text(a), Value::Text(b)) => return a.as_str() >= b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::ColumnLessThan {
left_idx,
right_idx,
} => {
if let (Some(left), Some(right)) = (values.get(*left_idx), values.get(*right_idx)) {
match (left, right) {
(Value::Integer(a), Value::Integer(b)) => return a < b,
(Value::Float(a), Value::Float(b)) => return a < b,
(Value::Text(a), Value::Text(b)) => return a.as_str() < b.as_str(),
_ => {}
}
}
false
}
FilterPredicate::ColumnLessThanOrEqual {
left_idx,
right_idx,
} => {
if let (Some(left), Some(right)) = (values.get(*left_idx), values.get(*right_idx)) {
match (left, right) {
(Value::Integer(a), Value::Integer(b)) => return a <= b,
(Value::Float(a), Value::Float(b)) => return a <= b,
(Value::Text(a), Value::Text(b)) => return a.as_str() <= b.as_str(),
_ => {}
}
}
false
}
}
}
}
impl IncrementalOperator for FilterOperator {
fn eval(
&mut self,
state: &mut EvalState,
_cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
let delta = match state {
EvalState::Init { deltas } => {
// Filter operators only use left_delta, right_delta must be empty
assert!(
deltas.right.is_empty(),
"FilterOperator expects right_delta to be empty"
);
std::mem::take(&mut deltas.left)
}
_ => unreachable!(
"FilterOperator doesn't execute the state machine. Should be in Init state"
),
};
let mut output_delta = Delta::new();
// Process the delta through the filter
for (row, weight) in delta.changes {
if let Some(tracker) = &self.tracker {
tracker.lock().unwrap().record_filter();
}
// Only pass through rows that satisfy the filter predicate
// For deletes (weight < 0), we only pass them if the row values
// would have passed the filter (meaning it was in the view)
if self.evaluate_predicate(&row.values) {
output_delta.changes.push((row, weight));
}
}
*state = EvalState::Done;
Ok(IOResult::Done(output_delta))
}
fn commit(
&mut self,
deltas: DeltaPair,
_cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
// Filter operator only uses left delta, right must be empty
assert!(
deltas.right.is_empty(),
"FilterOperator expects right delta to be empty in commit"
);
let mut output_delta = Delta::new();
// Commit the delta to our internal state
// Only pass through and track rows that satisfy the filter predicate
for (row, weight) in deltas.left.changes {
if let Some(tracker) = &self.tracker {
tracker.lock().unwrap().record_filter();
}
// Only track and output rows that pass the filter
// For deletes, this means the row was in the view (its values pass the filter)
// For inserts, this means the row should be in the view
if self.evaluate_predicate(&row.values) {
output_delta.changes.push((row, weight));
}
}
Ok(IOResult::Done(output_delta))
}
fn set_tracker(&mut self, tracker: Arc<Mutex<ComputationTracker>>) {
self.tracker = Some(tracker);
}
}
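
A hypothetical use of this operator, for something like WHERE a > 10 AND a <> b with column a at index 0 and b at index 1 (the column names are illustrative; only the API defined above is used):

fn example_filter() {
    // col0 > 10 AND col0 != col1
    let predicate = FilterPredicate::And(
        Box::new(FilterPredicate::GreaterThan {
            column_idx: 0,
            value: Value::Integer(10),
        }),
        Box::new(FilterPredicate::ColumnNotEquals {
            left_idx: 0,
            right_idx: 1,
        }),
    );
    let filter = FilterOperator::new(predicate);
    assert!(filter.evaluate_predicate(&[Value::Integer(20), Value::Integer(3)]));
    assert!(!filter.evaluate_predicate(&[Value::Integer(5), Value::Integer(3)]));
}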

View File

@@ -0,0 +1,66 @@
// Input operator for DBSP-style incremental computation
// This operator serves as the entry point for data into the incremental computation pipeline
use crate::incremental::dbsp::{Delta, DeltaPair};
use crate::incremental::operator::{
ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
};
use crate::types::IOResult;
use crate::Result;
use std::sync::{Arc, Mutex};
/// Input operator - source of data for the circuit
/// Represents base relations/tables that receive external updates
#[derive(Debug)]
pub struct InputOperator {
#[allow(dead_code)]
name: String,
}
impl InputOperator {
pub fn new(name: String) -> Self {
Self { name }
}
}
impl IncrementalOperator for InputOperator {
fn eval(
&mut self,
state: &mut EvalState,
_cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
match state {
EvalState::Init { deltas } => {
// Input operators only use left_delta, right_delta must be empty
assert!(
deltas.right.is_empty(),
"InputOperator expects right_delta to be empty"
);
let output = std::mem::take(&mut deltas.left);
*state = EvalState::Done;
Ok(IOResult::Done(output))
}
_ => unreachable!(
"InputOperator doesn't execute the state machine. Should be in Init state"
),
}
}
fn commit(
&mut self,
deltas: DeltaPair,
_cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
// Input operator only uses left delta, right must be empty
assert!(
deltas.right.is_empty(),
"InputOperator expects right delta to be empty in commit"
);
// Input operator passes through the delta unchanged during commit
Ok(IOResult::Done(deltas.left))
}
fn set_tracker(&mut self, _tracker: Arc<Mutex<ComputationTracker>>) {
// Input operator doesn't need tracking
}
}

View File

@@ -0,0 +1,787 @@
#![allow(dead_code)]
use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow};
use crate::incremental::operator::{
generate_storage_id, ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
};
use crate::incremental::persistence::WriteRow;
use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult};
use crate::{return_and_restore_if_io, return_if_io, Result, Value};
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone, PartialEq)]
pub enum JoinType {
Inner,
Left,
Right,
Full,
Cross,
}
// Helper function to read the next row from the BTree for joins
fn read_next_join_row(
storage_id: i64,
join_key: &HashableRow,
last_element_id: i64,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Option<(i64, HashableRow, isize)>>> {
// Build the index key: (storage_id, zset_id, element_id)
// zset_id is the hash of the join key
let zset_id = join_key.cached_hash() as i64;
let index_key_values = vec![
Value::Integer(storage_id),
Value::Integer(zset_id),
Value::Integer(last_element_id),
];
let index_record = ImmutableRecord::from_values(&index_key_values, index_key_values.len());
let seek_result = return_if_io!(cursors
.index_cursor
.seek(SeekKey::IndexKey(&index_record), SeekOp::GT));
if !matches!(seek_result, SeekResult::Found) {
return Ok(IOResult::Done(None));
}
// Check if we're still in the same (storage_id, zset_id) range
let current_record = return_if_io!(cursors.index_cursor.record());
// Extract all needed values from the record before dropping it
let (found_storage_id, found_zset_id, element_id) = if let Some(rec) = current_record {
let values = rec.get_values();
// Index has 4 values: storage_id, zset_id, element_id, rowid (appended by WriteRow)
if values.len() >= 3 {
let found_storage_id = match &values[0].to_owned() {
Value::Integer(id) => *id,
_ => return Ok(IOResult::Done(None)),
};
let found_zset_id = match &values[1].to_owned() {
Value::Integer(id) => *id,
_ => return Ok(IOResult::Done(None)),
};
let element_id = match &values[2].to_owned() {
Value::Integer(id) => *id,
_ => {
return Ok(IOResult::Done(None));
}
};
(found_storage_id, found_zset_id, element_id)
} else {
return Ok(IOResult::Done(None));
}
} else {
return Ok(IOResult::Done(None));
};
// Now we can safely check if we're in the right range
// If we've moved to a different storage_id or zset_id, we're done
if found_storage_id != storage_id || found_zset_id != zset_id {
return Ok(IOResult::Done(None));
}
// Now get the actual row from the table using the rowid from the index
let rowid = return_if_io!(cursors.index_cursor.rowid());
if let Some(rowid) = rowid {
return_if_io!(cursors
.table_cursor
.seek(SeekKey::TableRowId(rowid), SeekOp::GE { eq_only: true }));
let table_record = return_if_io!(cursors.table_cursor.record());
if let Some(rec) = table_record {
let table_values = rec.get_values();
// Table format: [storage_id, zset_id, element_id, value_blob, weight]
if table_values.len() >= 5 {
// Deserialize the row from the blob
let value_at_3 = table_values[3].to_owned();
let blob = match value_at_3 {
Value::Blob(ref b) => b,
_ => return Ok(IOResult::Done(None)),
};
// The blob contains the serialized HashableRow
// For now, let's deserialize it simply
let row = deserialize_hashable_row(blob)?;
let weight = match &table_values[4].to_owned() {
Value::Integer(w) => *w as isize,
_ => return Ok(IOResult::Done(None)),
};
return Ok(IOResult::Done(Some((element_id, row, weight))));
}
}
}
Ok(IOResult::Done(None))
}
// Join-specific eval states
#[derive(Debug)]
pub enum JoinEvalState {
ProcessDeltaJoin {
deltas: DeltaPair,
output: Delta,
},
ProcessLeftJoin {
deltas: DeltaPair,
output: Delta,
current_idx: usize,
last_row_scanned: i64,
},
ProcessRightJoin {
deltas: DeltaPair,
output: Delta,
current_idx: usize,
last_row_scanned: i64,
},
Done {
output: Delta,
},
}
impl JoinEvalState {
fn combine_rows(
left_row: &HashableRow,
left_weight: i64,
right_row: &HashableRow,
right_weight: i64,
output: &mut Delta,
) {
// Combine the rows
let mut combined_values = left_row.values.clone();
combined_values.extend(right_row.values.clone());
// Use hash of the combined values as rowid to ensure uniqueness
let temp_row = HashableRow::new(0, combined_values.clone());
let joined_rowid = temp_row.cached_hash() as i64;
let joined_row = HashableRow::new(joined_rowid, combined_values);
// Add to output with combined weight
let combined_weight = left_weight * right_weight;
output.changes.push((joined_row, combined_weight as isize));
}
fn process_join_state(
&mut self,
cursors: &mut DbspStateCursors,
left_key_indices: &[usize],
right_key_indices: &[usize],
left_storage_id: i64,
right_storage_id: i64,
) -> Result<IOResult<Delta>> {
loop {
match self {
JoinEvalState::ProcessDeltaJoin { deltas, output } => {
// Move to ProcessLeftJoin
*self = JoinEvalState::ProcessLeftJoin {
deltas: std::mem::take(deltas),
output: std::mem::take(output),
current_idx: 0,
last_row_scanned: i64::MIN,
};
}
JoinEvalState::ProcessLeftJoin {
deltas,
output,
current_idx,
last_row_scanned,
} => {
if *current_idx >= deltas.left.changes.len() {
*self = JoinEvalState::ProcessRightJoin {
deltas: std::mem::take(deltas),
output: std::mem::take(output),
current_idx: 0,
last_row_scanned: i64::MIN,
};
} else {
let (left_row, left_weight) = &deltas.left.changes[*current_idx];
// Extract join key using provided indices
let key_values: Vec<Value> = left_key_indices
.iter()
.map(|&idx| left_row.values.get(idx).cloned().unwrap_or(Value::Null))
.collect();
let left_key = HashableRow::new(0, key_values);
let next_row = return_if_io!(read_next_join_row(
right_storage_id,
&left_key,
*last_row_scanned,
cursors
));
match next_row {
Some((element_id, right_row, right_weight)) => {
Self::combine_rows(
left_row,
(*left_weight) as i64,
&right_row,
right_weight as i64,
output,
);
// Continue scanning with this left row
*self = JoinEvalState::ProcessLeftJoin {
deltas: std::mem::take(deltas),
output: std::mem::take(output),
current_idx: *current_idx,
last_row_scanned: element_id,
};
}
None => {
// No more matches for this left row, move to next
*self = JoinEvalState::ProcessLeftJoin {
deltas: std::mem::take(deltas),
output: std::mem::take(output),
current_idx: *current_idx + 1,
last_row_scanned: i64::MIN,
};
}
}
}
}
JoinEvalState::ProcessRightJoin {
deltas,
output,
current_idx,
last_row_scanned,
} => {
if *current_idx >= deltas.right.changes.len() {
*self = JoinEvalState::Done {
output: std::mem::take(output),
};
} else {
let (right_row, right_weight) = &deltas.right.changes[*current_idx];
// Extract join key using provided indices
let key_values: Vec<Value> = right_key_indices
.iter()
.map(|&idx| right_row.values.get(idx).cloned().unwrap_or(Value::Null))
.collect();
let right_key = HashableRow::new(0, key_values);
let next_row = return_if_io!(read_next_join_row(
left_storage_id,
&right_key,
*last_row_scanned,
cursors
));
match next_row {
Some((element_id, left_row, left_weight)) => {
Self::combine_rows(
&left_row,
left_weight as i64,
right_row,
(*right_weight) as i64,
output,
);
// Continue scanning with this right row
*self = JoinEvalState::ProcessRightJoin {
deltas: std::mem::take(deltas),
output: std::mem::take(output),
current_idx: *current_idx,
last_row_scanned: element_id,
};
}
None => {
// No more matches for this right row, move to next
*self = JoinEvalState::ProcessRightJoin {
deltas: std::mem::take(deltas),
output: std::mem::take(output),
current_idx: *current_idx + 1,
last_row_scanned: i64::MIN,
};
}
}
}
}
JoinEvalState::Done { output } => {
return Ok(IOResult::Done(std::mem::take(output)));
}
}
}
}
}
#[derive(Debug)]
enum JoinCommitState {
Idle,
Eval {
eval_state: EvalState,
},
CommitLeftDelta {
deltas: DeltaPair,
output: Delta,
current_idx: usize,
write_row: WriteRow,
},
CommitRightDelta {
deltas: DeltaPair,
output: Delta,
current_idx: usize,
write_row: WriteRow,
},
Invalid,
}
/// Join operator - performs incremental join between two relations
/// Implements the DBSP formula: δ(R ⋈ S) = (δR ⋈ S) + (R ⋈ δS) + (δR ⋈ δS)
#[derive(Debug)]
pub struct JoinOperator {
/// Unique operator ID for indexing in persistent storage
operator_id: usize,
/// Type of join to perform
join_type: JoinType,
/// Column indices for extracting join keys from left input
left_key_indices: Vec<usize>,
/// Column indices for extracting join keys from right input
right_key_indices: Vec<usize>,
/// Column names from left input
left_columns: Vec<String>,
/// Column names from right input
right_columns: Vec<String>,
/// Tracker for computation statistics
tracker: Option<Arc<Mutex<ComputationTracker>>>,
commit_state: JoinCommitState,
}
impl JoinOperator {
pub fn new(
operator_id: usize,
join_type: JoinType,
left_key_indices: Vec<usize>,
right_key_indices: Vec<usize>,
left_columns: Vec<String>,
right_columns: Vec<String>,
) -> Result<Self> {
// Check for unsupported join types
match join_type {
JoinType::Left => {
return Err(crate::LimboError::ParseError(
"LEFT OUTER JOIN is not yet supported in incremental views".to_string(),
))
}
JoinType::Right => {
return Err(crate::LimboError::ParseError(
"RIGHT OUTER JOIN is not yet supported in incremental views".to_string(),
))
}
JoinType::Full => {
return Err(crate::LimboError::ParseError(
"FULL OUTER JOIN is not yet supported in incremental views".to_string(),
))
}
JoinType::Cross => {
return Err(crate::LimboError::ParseError(
"CROSS JOIN is not yet supported in incremental views".to_string(),
))
}
JoinType::Inner => {} // Inner join is supported
}
Ok(Self {
operator_id,
join_type,
left_key_indices,
right_key_indices,
left_columns,
right_columns,
tracker: None,
commit_state: JoinCommitState::Idle,
})
}
/// Extract join key from row values using the specified indices
fn extract_join_key(&self, values: &[Value], indices: &[usize]) -> HashableRow {
let key_values: Vec<Value> = indices
.iter()
.map(|&idx| values.get(idx).cloned().unwrap_or(Value::Null))
.collect();
// Use 0 as a dummy rowid for join keys. They don't come from a table,
// so they don't need a rowid. Their key will be the hash of the row values.
HashableRow::new(0, key_values)
}
/// Generate storage ID for left table
fn left_storage_id(&self) -> i64 {
// Use column_index=0 for left side
generate_storage_id(self.operator_id, 0, 0)
}
/// Generate storage ID for right table
fn right_storage_id(&self) -> i64 {
// Use column_index=1 for right side
generate_storage_id(self.operator_id, 1, 0)
}
/// SQL-compliant comparison for join keys
/// Returns true if keys match according to SQL semantics (NULL != NULL)
fn sql_keys_equal(left_key: &HashableRow, right_key: &HashableRow) -> bool {
if left_key.values.len() != right_key.values.len() {
return false;
}
for (left_val, right_val) in left_key.values.iter().zip(right_key.values.iter()) {
// In SQL, NULL never equals NULL
if matches!(left_val, Value::Null) || matches!(right_val, Value::Null) {
return false;
}
// For non-NULL values, use regular comparison
if left_val != right_val {
return false;
}
}
true
}
fn process_join_state(
&mut self,
state: &mut EvalState,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
// Get the join state out of the enum
match state {
EvalState::Join(js) => js.process_join_state(
cursors,
&self.left_key_indices,
&self.right_key_indices,
self.left_storage_id(),
self.right_storage_id(),
),
_ => panic!("process_join_state called with non-join state"),
}
}
fn eval_internal(
&mut self,
state: &mut EvalState,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
loop {
let loop_state = std::mem::replace(state, EvalState::Uninitialized);
match loop_state {
EvalState::Uninitialized => {
panic!("Cannot eval JoinOperator with Uninitialized state");
}
EvalState::Init { deltas } => {
let mut output = Delta::new();
// Component 3: δR ⋈ δS (left delta join right delta)
for (left_row, left_weight) in &deltas.left.changes {
let left_key =
self.extract_join_key(&left_row.values, &self.left_key_indices);
for (right_row, right_weight) in &deltas.right.changes {
let right_key =
self.extract_join_key(&right_row.values, &self.right_key_indices);
if Self::sql_keys_equal(&left_key, &right_key) {
if let Some(tracker) = &self.tracker {
tracker.lock().unwrap().record_join_lookup();
}
// Combine the rows
let mut combined_values = left_row.values.clone();
combined_values.extend(right_row.values.clone());
// Create the joined row with a unique rowid
// Use hash of the combined values to ensure uniqueness
let temp_row = HashableRow::new(0, combined_values.clone());
let joined_rowid = temp_row.cached_hash() as i64;
let joined_row =
HashableRow::new(joined_rowid, combined_values.clone());
// Add to output with combined weight
let combined_weight = left_weight * right_weight;
output.changes.push((joined_row, combined_weight));
}
}
}
*state = EvalState::Join(Box::new(JoinEvalState::ProcessDeltaJoin {
deltas,
output,
}));
}
EvalState::Join(join_state) => {
*state = EvalState::Join(join_state);
let output = return_if_io!(self.process_join_state(state, cursors));
return Ok(IOResult::Done(output));
}
EvalState::Done => {
return Ok(IOResult::Done(Delta::new()));
}
EvalState::Aggregate(_) => {
panic!("Aggregate state should not appear in join operator");
}
}
}
}
}
// Helper to deserialize a HashableRow from a blob
fn deserialize_hashable_row(blob: &[u8]) -> Result<HashableRow> {
// Simple deserialization - this needs to match how we serialize in commit
// Format: [rowid:8 bytes][num_values:4 bytes][values...]
if blob.len() < 12 {
return Err(crate::LimboError::InternalError(
"Invalid blob size".to_string(),
));
}
let rowid = i64::from_le_bytes(blob[0..8].try_into().unwrap());
let num_values = u32::from_le_bytes(blob[8..12].try_into().unwrap()) as usize;
let mut values = Vec::new();
let mut offset = 12;
for _ in 0..num_values {
if offset >= blob.len() {
break;
}
let type_tag = blob[offset];
offset += 1;
match type_tag {
0 => values.push(Value::Null),
1 => {
if offset + 8 <= blob.len() {
let i = i64::from_le_bytes(blob[offset..offset + 8].try_into().unwrap());
values.push(Value::Integer(i));
offset += 8;
}
}
2 => {
if offset + 8 <= blob.len() {
let f = f64::from_le_bytes(blob[offset..offset + 8].try_into().unwrap());
values.push(Value::Float(f));
offset += 8;
}
}
3 => {
if offset + 4 <= blob.len() {
let len =
u32::from_le_bytes(blob[offset..offset + 4].try_into().unwrap()) as usize;
offset += 4;
if offset + len < blob.len() {
let text_bytes = blob[offset..offset + len].to_vec();
offset += len;
let subtype = match blob[offset] {
0 => crate::types::TextSubtype::Text,
1 => crate::types::TextSubtype::Json,
_ => crate::types::TextSubtype::Text,
};
offset += 1;
values.push(Value::Text(crate::types::Text {
value: text_bytes,
subtype,
}));
}
}
}
4 => {
if offset + 4 <= blob.len() {
let len =
u32::from_le_bytes(blob[offset..offset + 4].try_into().unwrap()) as usize;
offset += 4;
if offset + len <= blob.len() {
let blob_data = blob[offset..offset + len].to_vec();
values.push(Value::Blob(blob_data));
offset += len;
}
}
}
_ => break, // Unknown type tag
}
}
Ok(HashableRow::new(rowid, values))
}
// Helper to serialize a HashableRow to a blob
fn serialize_hashable_row(row: &HashableRow) -> Vec<u8> {
let mut blob = Vec::new();
// Write rowid
blob.extend_from_slice(&row.rowid.to_le_bytes());
// Write number of values
blob.extend_from_slice(&(row.values.len() as u32).to_le_bytes());
// Write each value directly with type tags (like AggregateState does)
for value in &row.values {
match value {
Value::Null => blob.push(0u8),
Value::Integer(i) => {
blob.push(1u8);
blob.extend_from_slice(&i.to_le_bytes());
}
Value::Float(f) => {
blob.push(2u8);
blob.extend_from_slice(&f.to_le_bytes());
}
Value::Text(s) => {
blob.push(3u8);
let bytes = &s.value;
blob.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
blob.extend_from_slice(bytes);
blob.push(s.subtype as u8);
}
Value::Blob(b) => {
blob.push(4u8);
blob.extend_from_slice(&(b.len() as u32).to_le_bytes());
blob.extend_from_slice(b);
}
}
}
blob
}
impl IncrementalOperator for JoinOperator {
fn eval(
&mut self,
state: &mut EvalState,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
let delta = return_if_io!(self.eval_internal(state, cursors));
Ok(IOResult::Done(delta))
}
fn commit(
&mut self,
deltas: DeltaPair,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
loop {
let mut state = std::mem::replace(&mut self.commit_state, JoinCommitState::Invalid);
match &mut state {
JoinCommitState::Idle => {
self.commit_state = JoinCommitState::Eval {
eval_state: deltas.clone().into(),
}
}
JoinCommitState::Eval { ref mut eval_state } => {
let output = return_and_restore_if_io!(
&mut self.commit_state,
state,
self.eval(eval_state, cursors)
);
self.commit_state = JoinCommitState::CommitLeftDelta {
deltas: deltas.clone(),
output,
current_idx: 0,
write_row: WriteRow::new(),
};
}
JoinCommitState::CommitLeftDelta {
deltas,
output,
current_idx,
ref mut write_row,
} => {
if *current_idx >= deltas.left.changes.len() {
self.commit_state = JoinCommitState::CommitRightDelta {
deltas: std::mem::take(deltas),
output: std::mem::take(output),
current_idx: 0,
write_row: WriteRow::new(),
};
continue;
}
let (row, weight) = &deltas.left.changes[*current_idx];
// Extract join key from the left row
let join_key = self.extract_join_key(&row.values, &self.left_key_indices);
// The index key: (storage_id, zset_id, element_id)
// zset_id is the hash of the join key, element_id is hash of the row
let storage_id = self.left_storage_id();
let zset_id = join_key.cached_hash() as i64;
let element_id = row.cached_hash() as i64;
let index_key = vec![
Value::Integer(storage_id),
Value::Integer(zset_id),
Value::Integer(element_id),
];
// The record values: we'll store the serialized row as a blob
let row_blob = serialize_hashable_row(row);
let record_values = vec![
Value::Integer(self.left_storage_id()),
Value::Integer(join_key.cached_hash() as i64),
Value::Integer(row.cached_hash() as i64),
Value::Blob(row_blob),
];
// Use return_and_restore_if_io to handle I/O properly
return_and_restore_if_io!(
&mut self.commit_state,
state,
write_row.write_row(cursors, index_key, record_values, *weight)
);
self.commit_state = JoinCommitState::CommitLeftDelta {
deltas: deltas.clone(),
output: output.clone(),
current_idx: *current_idx + 1,
write_row: WriteRow::new(),
};
}
JoinCommitState::CommitRightDelta {
deltas,
output,
current_idx,
ref mut write_row,
} => {
if *current_idx >= deltas.right.changes.len() {
// Reset to Idle state for next commit
self.commit_state = JoinCommitState::Idle;
return Ok(IOResult::Done(output.clone()));
}
let (row, weight) = &deltas.right.changes[*current_idx];
// Extract join key from the right row
let join_key = self.extract_join_key(&row.values, &self.right_key_indices);
// The index key: (storage_id, zset_id, element_id)
let index_key = vec![
Value::Integer(self.right_storage_id()),
Value::Integer(join_key.cached_hash() as i64),
Value::Integer(row.cached_hash() as i64),
];
// The record values: we'll store the serialized row as a blob
let row_blob = serialize_hashable_row(row);
let record_values = vec![
Value::Integer(self.right_storage_id()),
Value::Integer(join_key.cached_hash() as i64),
Value::Integer(row.cached_hash() as i64),
Value::Blob(row_blob),
];
// Use return_and_restore_if_io to handle I/O properly
return_and_restore_if_io!(
&mut self.commit_state,
state,
write_row.write_row(cursors, index_key, record_values, *weight)
);
self.commit_state = JoinCommitState::CommitRightDelta {
deltas: std::mem::take(deltas),
output: std::mem::take(output),
current_idx: *current_idx + 1,
write_row: WriteRow::new(),
};
}
JoinCommitState::Invalid => {
panic!("Invalid join commit state");
}
}
}
}
fn set_tracker(&mut self, tracker: Arc<Mutex<ComputationTracker>>) {
self.tracker = Some(tracker);
}
}
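
Two properties of this file are cheap to sanity-check: the blob format round-trips through serialize_hashable_row/deserialize_hashable_row, and sql_keys_equal never matches NULL keys, per SQL semantics. A hypothetical test module (not part of this PR) might look like:

#[cfg(test)]
mod sketch_tests {
    use super::*;

    // Round-trips the format documented above:
    // [rowid:8 bytes][num_values:4 bytes][type-tagged values...]
    #[test]
    fn hashable_row_blob_round_trip() {
        let row = HashableRow::new(42, vec![Value::Integer(7), Value::Null]);
        let blob = serialize_hashable_row(&row);
        let back = deserialize_hashable_row(&blob).unwrap();
        assert_eq!(back.rowid, 42);
        assert_eq!(back.values, row.values);
    }

    // SQL semantics: a NULL join key matches nothing, not even another NULL.
    #[test]
    fn null_join_keys_never_match() {
        let k = HashableRow::new(0, vec![Value::Null]);
        assert!(!JoinOperator::sql_keys_equal(&k, &k));
    }
}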

View File

@@ -1,7 +1,12 @@
pub mod aggregate_operator;
pub mod compiler;
pub mod cursor;
pub mod dbsp;
pub mod expr_compiler;
pub mod filter_operator;
pub mod input_operator;
pub mod join_operator;
pub mod operator;
pub mod persistence;
pub mod project_operator;
pub mod view;

File diff suppressed because it is too large

View File

@@ -1,12 +1,7 @@
use crate::incremental::dbsp::HashableRow;
-use crate::incremental::operator::{
-    generate_storage_id, AggColumnInfo, AggregateFunction, AggregateOperator, AggregateState,
-    DbspStateCursors, MinMaxDeltas, AGG_TYPE_MINMAX,
-};
+use crate::incremental::operator::{AggregateFunction, AggregateState, DbspStateCursors};
use crate::storage::btree::{BTreeCursor, BTreeKey};
-use crate::types::{IOResult, ImmutableRecord, RefValue, SeekKey, SeekOp, SeekResult};
+use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult};
use crate::{return_if_io, LimboError, Result, Value};
use std::collections::{HashMap, HashSet};
#[derive(Debug, Default)]
pub enum ReadRecord {
@@ -290,672 +285,3 @@ impl WriteRow {
}
}
}
/// State machine for recomputing MIN/MAX values after deletion
#[derive(Debug)]
pub enum RecomputeMinMax {
ProcessElements {
/// Current column being processed
current_column_idx: usize,
/// Columns to process (combined MIN and MAX)
columns_to_process: Vec<(String, String, bool)>, // (group_key, column_name, is_min)
/// MIN/MAX deltas for checking values and weights
min_max_deltas: MinMaxDeltas,
},
Scan {
/// Columns still to process
columns_to_process: Vec<(String, String, bool)>,
/// Current index in columns_to_process (will resume from here)
current_column_idx: usize,
/// MIN/MAX deltas for checking values and weights
min_max_deltas: MinMaxDeltas,
/// Current group key being processed
group_key: String,
/// Current column name being processed
column_name: String,
/// Whether we're looking for MIN (true) or MAX (false)
is_min: bool,
/// The scan state machine for finding the new MIN/MAX
scan_state: Box<ScanState>,
},
Done,
}
impl RecomputeMinMax {
pub fn new(
min_max_deltas: MinMaxDeltas,
existing_groups: &HashMap<String, AggregateState>,
operator: &AggregateOperator,
) -> Self {
let mut groups_to_check: HashSet<(String, String, bool)> = HashSet::new();
// Remember the min_max_deltas are essentially just the only column that is affected by
// this min/max, in delta (actually ZSet - consolidated delta) format. This makes it easier
// for us to consume it in here.
//
// The most challenging case is the case where there is a retraction, since we need to go
// back to the index.
for (group_key_str, values) in &min_max_deltas {
for ((col_name, hashable_row), weight) in values {
let col_info = operator.column_min_max.get(col_name);
let value = &hashable_row.values[0];
if *weight < 0 {
// Deletion detected - check if it's the current MIN/MAX
if let Some(state) = existing_groups.get(group_key_str) {
// Check for MIN
if let Some(current_min) = state.mins.get(col_name) {
if current_min == value {
groups_to_check.insert((
group_key_str.clone(),
col_name.clone(),
true,
));
}
}
// Check for MAX
if let Some(current_max) = state.maxs.get(col_name) {
if current_max == value {
groups_to_check.insert((
group_key_str.clone(),
col_name.clone(),
false,
));
}
}
}
} else if *weight > 0 {
// If it is not found in the existing groups, then we only need to care
// about this if this is a new record being inserted
if let Some(info) = col_info {
if info.has_min {
groups_to_check.insert((group_key_str.clone(), col_name.clone(), true));
}
if info.has_max {
groups_to_check.insert((
group_key_str.clone(),
col_name.clone(),
false,
));
}
}
}
}
}
if groups_to_check.is_empty() {
// No recomputation or initialization needed
Self::Done
} else {
// Convert HashSet to Vec for indexed processing
let groups_to_check_vec: Vec<_> = groups_to_check.into_iter().collect();
Self::ProcessElements {
current_column_idx: 0,
columns_to_process: groups_to_check_vec,
min_max_deltas,
}
}
}
pub fn process(
&mut self,
existing_groups: &mut HashMap<String, AggregateState>,
operator: &AggregateOperator,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<()>> {
loop {
match self {
RecomputeMinMax::ProcessElements {
current_column_idx,
columns_to_process,
min_max_deltas,
} => {
if *current_column_idx >= columns_to_process.len() {
*self = RecomputeMinMax::Done;
return Ok(IOResult::Done(()));
}
let (group_key, column_name, is_min) =
columns_to_process[*current_column_idx].clone();
// Get column index from pre-computed info
let column_index = operator
.column_min_max
.get(&column_name)
.map(|info| info.index)
.unwrap(); // Should always exist since we're processing known columns
// Get current value from existing state
let current_value = existing_groups.get(&group_key).and_then(|state| {
if is_min {
state.mins.get(&column_name).cloned()
} else {
state.maxs.get(&column_name).cloned()
}
});
// Create storage keys for index lookup
let storage_id =
generate_storage_id(operator.operator_id, column_index, AGG_TYPE_MINMAX);
let zset_id = operator.generate_group_rowid(&group_key);
// Get the values for this group from min_max_deltas
let group_values = min_max_deltas.get(&group_key).cloned().unwrap_or_default();
let columns_to_process = std::mem::take(columns_to_process);
let min_max_deltas = std::mem::take(min_max_deltas);
let scan_state = if is_min {
Box::new(ScanState::new_for_min(
current_value,
group_key.clone(),
column_name.clone(),
storage_id,
zset_id,
group_values,
))
} else {
Box::new(ScanState::new_for_max(
current_value,
group_key.clone(),
column_name.clone(),
storage_id,
zset_id,
group_values,
))
};
*self = RecomputeMinMax::Scan {
columns_to_process,
current_column_idx: *current_column_idx,
min_max_deltas,
group_key,
column_name,
is_min,
scan_state,
};
}
RecomputeMinMax::Scan {
columns_to_process,
current_column_idx,
min_max_deltas,
group_key,
column_name,
is_min,
scan_state,
} => {
// Find new value using the scan state machine
let new_value = return_if_io!(scan_state.find_new_value(cursors));
// Update the state with new value (create if doesn't exist)
let state = existing_groups.entry(group_key.clone()).or_default();
if *is_min {
if let Some(min_val) = new_value {
state.mins.insert(column_name.clone(), min_val);
} else {
state.mins.remove(column_name);
}
} else if let Some(max_val) = new_value {
state.maxs.insert(column_name.clone(), max_val);
} else {
state.maxs.remove(column_name);
}
// Move to next column
let min_max_deltas = std::mem::take(min_max_deltas);
let columns_to_process = std::mem::take(columns_to_process);
*self = RecomputeMinMax::ProcessElements {
current_column_idx: *current_column_idx + 1,
columns_to_process,
min_max_deltas,
};
}
RecomputeMinMax::Done => {
return Ok(IOResult::Done(()));
}
}
}
}
}
/// State machine for scanning through the index to find new MIN/MAX values
#[derive(Debug)]
pub enum ScanState {
CheckCandidate {
/// Current candidate value for MIN/MAX
candidate: Option<Value>,
/// Group key being processed
group_key: String,
/// Column name being processed
column_name: String,
/// Storage ID for the index seek
storage_id: i64,
/// ZSet ID for the group
zset_id: i64,
/// Group values from MinMaxDeltas: (column_name, HashableRow) -> weight
group_values: HashMap<(String, HashableRow), isize>,
/// Whether we're looking for MIN (true) or MAX (false)
is_min: bool,
},
FetchNextCandidate {
/// Current candidate to seek past
current_candidate: Value,
/// Group key being processed
group_key: String,
/// Column name being processed
column_name: String,
/// Storage ID for the index seek
storage_id: i64,
/// ZSet ID for the group
zset_id: i64,
/// Group values from MinMaxDeltas: (column_name, HashableRow) -> weight
group_values: HashMap<(String, HashableRow), isize>,
/// Whether we're looking for MIN (true) or MAX (false)
is_min: bool,
},
Done {
/// The final MIN/MAX value found
result: Option<Value>,
},
}
impl ScanState {
pub fn new_for_min(
current_min: Option<Value>,
group_key: String,
column_name: String,
storage_id: i64,
zset_id: i64,
group_values: HashMap<(String, HashableRow), isize>,
) -> Self {
Self::CheckCandidate {
candidate: current_min,
group_key,
column_name,
storage_id,
zset_id,
group_values,
is_min: true,
}
}
// Extract a new candidate from the index. It is possible that, when searching,
// we end up going into a different operator altogether. That means we have
// exhausted this operator (or group) entirely, and no good candidate was found
fn extract_new_candidate(
cursors: &mut DbspStateCursors,
index_record: &ImmutableRecord,
seek_op: SeekOp,
storage_id: i64,
zset_id: i64,
) -> Result<IOResult<Option<Value>>> {
let seek_result = return_if_io!(cursors
.index_cursor
.seek(SeekKey::IndexKey(index_record), seek_op));
if !matches!(seek_result, SeekResult::Found) {
return Ok(IOResult::Done(None));
}
let record = return_if_io!(cursors.index_cursor.record()).ok_or_else(|| {
LimboError::InternalError(
"Record found on the cursor, but could not be read".to_string(),
)
})?;
let values = record.get_values();
if values.len() < 3 {
return Ok(IOResult::Done(None));
}
let Some(rec_storage_id) = values.first() else {
return Ok(IOResult::Done(None));
};
let Some(rec_zset_id) = values.get(1) else {
return Ok(IOResult::Done(None));
};
// Check if we're still in the same group
if let (RefValue::Integer(rec_sid), RefValue::Integer(rec_zid)) =
(rec_storage_id, rec_zset_id)
{
if *rec_sid != storage_id || *rec_zid != zset_id {
return Ok(IOResult::Done(None));
}
} else {
return Ok(IOResult::Done(None));
}
// Get the value (3rd element)
Ok(IOResult::Done(values.get(2).map(|v| v.to_owned())))
}
pub fn new_for_max(
current_max: Option<Value>,
group_key: String,
column_name: String,
storage_id: i64,
zset_id: i64,
group_values: HashMap<(String, HashableRow), isize>,
) -> Self {
Self::CheckCandidate {
candidate: current_max,
group_key,
column_name,
storage_id,
zset_id,
group_values,
is_min: false,
}
}
pub fn find_new_value(
&mut self,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Option<Value>>> {
loop {
match self {
ScanState::CheckCandidate {
candidate,
group_key,
column_name,
storage_id,
zset_id,
group_values,
is_min,
} => {
// First, check if we have a candidate
if let Some(cand_val) = candidate {
// Check if the candidate is retracted (weight <= 0)
// Create a HashableRow to look up the weight
let hashable_cand = HashableRow::new(0, vec![cand_val.clone()]);
let key = (column_name.clone(), hashable_cand);
let is_retracted =
group_values.get(&key).is_some_and(|weight| *weight <= 0);
if is_retracted {
// Candidate is retracted, need to fetch next from index
*self = ScanState::FetchNextCandidate {
current_candidate: cand_val.clone(),
group_key: std::mem::take(group_key),
column_name: std::mem::take(column_name),
storage_id: *storage_id,
zset_id: *zset_id,
group_values: std::mem::take(group_values),
is_min: *is_min,
};
continue;
}
}
// Candidate is valid or we have no candidate
// Now find the best value from insertions in group_values
let mut best_from_zset = None;
for ((col, hashable_val), weight) in group_values.iter() {
if col == column_name && *weight > 0 {
let value = &hashable_val.values[0];
// Skip NULL values - they don't participate in MIN/MAX
if value == &Value::Null {
continue;
}
// This is an insertion for our column
if let Some(ref current_best) = best_from_zset {
if *is_min {
if value.cmp(current_best) == std::cmp::Ordering::Less {
best_from_zset = Some(value.clone());
}
} else if value.cmp(current_best) == std::cmp::Ordering::Greater {
best_from_zset = Some(value.clone());
}
} else {
best_from_zset = Some(value.clone());
}
}
}
// Compare candidate with best from ZSet, filtering out NULLs
let result = match (&candidate, &best_from_zset) {
(Some(cand), Some(zset_val)) if cand != &Value::Null => {
if *is_min {
if zset_val.cmp(cand) == std::cmp::Ordering::Less {
Some(zset_val.clone())
} else {
Some(cand.clone())
}
} else if zset_val.cmp(cand) == std::cmp::Ordering::Greater {
Some(zset_val.clone())
} else {
Some(cand.clone())
}
}
(Some(cand), None) if cand != &Value::Null => Some(cand.clone()),
(None, Some(zset_val)) => Some(zset_val.clone()),
(Some(cand), Some(_)) if cand == &Value::Null => best_from_zset,
_ => None,
};
*self = ScanState::Done { result };
}
ScanState::FetchNextCandidate {
current_candidate,
group_key,
column_name,
storage_id,
zset_id,
group_values,
is_min,
} => {
// Seek to the next value in the index
let index_key = vec![
Value::Integer(*storage_id),
Value::Integer(*zset_id),
current_candidate.clone(),
];
let index_record = ImmutableRecord::from_values(&index_key, index_key.len());
let seek_op = if *is_min {
SeekOp::GT // For MIN, seek greater than current
} else {
SeekOp::LT // For MAX, seek less than current
};
let new_candidate = return_if_io!(Self::extract_new_candidate(
cursors,
&index_record,
seek_op,
*storage_id,
*zset_id
));
*self = ScanState::CheckCandidate {
candidate: new_candidate,
group_key: std::mem::take(group_key),
column_name: std::mem::take(column_name),
storage_id: *storage_id,
zset_id: *zset_id,
group_values: std::mem::take(group_values),
is_min: *is_min,
};
}
ScanState::Done { result } => {
return Ok(IOResult::Done(result.clone()));
}
}
}
}
}
/// State machine for persisting Min/Max values to storage
#[derive(Debug)]
pub enum MinMaxPersistState {
Init {
min_max_deltas: MinMaxDeltas,
group_keys: Vec<String>,
},
ProcessGroup {
min_max_deltas: MinMaxDeltas,
group_keys: Vec<String>,
group_idx: usize,
value_idx: usize,
},
WriteValue {
min_max_deltas: MinMaxDeltas,
group_keys: Vec<String>,
group_idx: usize,
value_idx: usize,
value: Value,
column_name: String,
weight: isize,
write_row: WriteRow,
},
Done,
}
impl MinMaxPersistState {
pub fn new(min_max_deltas: MinMaxDeltas) -> Self {
let group_keys: Vec<String> = min_max_deltas.keys().cloned().collect();
Self::Init {
min_max_deltas,
group_keys,
}
}
pub fn persist_min_max(
&mut self,
operator_id: usize,
column_min_max: &HashMap<String, AggColumnInfo>,
cursors: &mut DbspStateCursors,
generate_group_rowid: impl Fn(&str) -> i64,
) -> Result<IOResult<()>> {
loop {
match self {
MinMaxPersistState::Init {
min_max_deltas,
group_keys,
} => {
let min_max_deltas = std::mem::take(min_max_deltas);
let group_keys = std::mem::take(group_keys);
*self = MinMaxPersistState::ProcessGroup {
min_max_deltas,
group_keys,
group_idx: 0,
value_idx: 0,
};
}
MinMaxPersistState::ProcessGroup {
min_max_deltas,
group_keys,
group_idx,
value_idx,
} => {
// Check if we're past all groups
if *group_idx >= group_keys.len() {
*self = MinMaxPersistState::Done;
continue;
}
let group_key_str = &group_keys[*group_idx];
let values = &min_max_deltas[group_key_str]; // This should always exist
// Convert HashMap to Vec for indexed access
let values_vec: Vec<_> = values.iter().collect();
// Check if we have more values in current group
if *value_idx >= values_vec.len() {
*group_idx += 1;
*value_idx = 0;
// Continue to check if we're past all groups now
continue;
}
// Process current value and extract what we need before taking ownership
let ((column_name, hashable_row), weight) = values_vec[*value_idx];
let column_name = column_name.clone();
let value = hashable_row.values[0].clone(); // Extract the Value from HashableRow
let weight = *weight;
let min_max_deltas = std::mem::take(min_max_deltas);
let group_keys = std::mem::take(group_keys);
*self = MinMaxPersistState::WriteValue {
min_max_deltas,
group_keys,
group_idx: *group_idx,
value_idx: *value_idx,
column_name,
value,
weight,
write_row: WriteRow::new(),
};
}
MinMaxPersistState::WriteValue {
min_max_deltas,
group_keys,
group_idx,
value_idx,
value,
column_name,
weight,
write_row,
} => {
// Should have exited in the previous state
assert!(*group_idx < group_keys.len());
let group_key_str = &group_keys[*group_idx];
// Get the column index from the pre-computed map
let column_info = column_min_max
.get(&*column_name)
.expect("Column should exist in column_min_max map");
let column_index = column_info.index;
// Build the key components for MinMax storage using new encoding
let storage_id =
generate_storage_id(operator_id, column_index, AGG_TYPE_MINMAX);
let zset_id = generate_group_rowid(group_key_str);
// element_id is the actual value for Min/Max
let element_id_val = value.clone();
// Create index key
let index_key = vec![
Value::Integer(storage_id),
Value::Integer(zset_id),
element_id_val.clone(),
];
// Record values (storage_id, zset_id, element_id, unused placeholder)
// For MIN/MAX, the element_id IS the value, so we use NULL for the 4th column
let record_values = vec![
Value::Integer(storage_id),
Value::Integer(zset_id),
element_id_val.clone(),
Value::Null, // Placeholder - not used for MIN/MAX
];
return_if_io!(write_row.write_row(
cursors,
index_key.clone(),
record_values,
*weight
));
// Move to next value
let min_max_deltas = std::mem::take(min_max_deltas);
let group_keys = std::mem::take(group_keys);
*self = MinMaxPersistState::ProcessGroup {
min_max_deltas,
group_keys,
group_idx: *group_idx,
value_idx: *value_idx + 1,
};
}
MinMaxPersistState::Done => {
return Ok(IOResult::Done(()));
}
}
}
}
}
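
The retraction handling above reduces to a simple policy: a deletion forces an index rescan only when it retracts the current MIN (symmetrically for MAX); insertions never touch the index and merely compete with the surviving candidate. A stand-alone sketch of that policy for MIN over integers (a simplification, not the crate's ScanState API):

// `rescan` stands in for the ScanState index walk; it returns the smallest
// value still stored for the group, if any.
fn new_min(
    current: Option<i64>,
    deleted: Option<i64>,
    inserted: &[i64],
    rescan: impl Fn() -> Option<i64>,
) -> Option<i64> {
    let mut candidate = match (current, deleted) {
        // The deletion retracted the current MIN: must consult the index.
        (Some(m), Some(d)) if m == d => rescan(),
        (m, _) => m,
    };
    // Insertions just compete with whatever candidate survived.
    for &v in inserted {
        candidate = Some(candidate.map_or(v, |c| c.min(v)));
    }
    candidate
}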

View File

@@ -0,0 +1,168 @@
// Project operator for DBSP-style incremental computation
// This operator projects/transforms columns in a relational stream
use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow};
use crate::incremental::expr_compiler::CompiledExpression;
use crate::incremental::operator::{
ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
};
use crate::types::IOResult;
use crate::{Connection, Database, Result, Value};
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone)]
pub struct ProjectColumn {
/// Compiled expression (handles both trivial columns and complex expressions)
pub compiled: CompiledExpression,
}
/// Project operator - selects/transforms columns
#[derive(Clone)]
pub struct ProjectOperator {
columns: Vec<ProjectColumn>,
input_column_names: Vec<String>,
output_column_names: Vec<String>,
tracker: Option<Arc<Mutex<ComputationTracker>>>,
// Internal in-memory connection for expression evaluation
// Programs are very dependent on having a connection, so give it one.
//
// We could in theory pass the current connection, but there are a host of problems with that.
// For example: during a write transaction, where views are usually updated, we have autocommit
// on. When the program we are executing calls Halt, it will try to commit the current
// transaction, which is absolutely incorrect.
//
// There are other ways to solve this, but a read-only connection to an empty in-memory
// database gives us the closest environment we need to execute expressions.
internal_conn: Arc<Connection>,
}
impl std::fmt::Debug for ProjectOperator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProjectOperator")
.field("columns", &self.columns)
.field("input_column_names", &self.input_column_names)
.field("output_column_names", &self.output_column_names)
.finish()
}
}
impl ProjectOperator {
/// Create a ProjectOperator from pre-compiled expressions
pub fn from_compiled(
compiled_exprs: Vec<CompiledExpression>,
aliases: Vec<Option<String>>,
input_column_names: Vec<String>,
output_column_names: Vec<String>,
) -> crate::Result<Self> {
// Set up internal connection for expression evaluation
let io = Arc::new(crate::MemoryIO::new());
let db = Database::open_file(
io, ":memory:", false, // no MVCC needed for expression evaluation
false, // no indexes needed
)?;
let internal_conn = db.connect()?;
// Set to read-only mode and disable auto-commit since we're only evaluating expressions
internal_conn.query_only.set(true);
internal_conn.auto_commit.set(false);
// Create ProjectColumn structs from compiled expressions
let columns: Vec<ProjectColumn> = compiled_exprs
.into_iter()
.zip(aliases)
.map(|(compiled, _alias)| ProjectColumn { compiled })
.collect();
Ok(Self {
columns,
input_column_names,
output_column_names,
tracker: None,
internal_conn,
})
}
fn project_values(&self, values: &[Value]) -> Vec<Value> {
let mut output = Vec::new();
for col in &self.columns {
// Use the internal connection's pager for expression evaluation
let internal_pager = self.internal_conn.pager.borrow().clone();
// Execute the compiled expression (handles both columns and complex expressions)
let result = col
.compiled
.execute(values, internal_pager)
.expect("Failed to execute compiled expression for the Project operator");
output.push(result);
}
output
}
}
impl IncrementalOperator for ProjectOperator {
fn eval(
&mut self,
state: &mut EvalState,
_cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
let delta = match state {
EvalState::Init { deltas } => {
// Project operators only use left_delta, right_delta must be empty
assert!(
deltas.right.is_empty(),
"ProjectOperator expects right_delta to be empty"
);
std::mem::take(&mut deltas.left)
}
_ => unreachable!(
"ProjectOperator doesn't execute the state machine. Should be in Init state"
),
};
let mut output_delta = Delta::new();
for (row, weight) in delta.changes {
if let Some(tracker) = &self.tracker {
tracker.lock().unwrap().record_project();
}
let projected = self.project_values(&row.values);
let projected_row = HashableRow::new(row.rowid, projected);
output_delta.changes.push((projected_row, weight));
}
*state = EvalState::Done;
Ok(IOResult::Done(output_delta))
}
fn commit(
&mut self,
deltas: DeltaPair,
_cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>> {
// Project operator only uses left delta, right must be empty
assert!(
deltas.right.is_empty(),
"ProjectOperator expects right delta to be empty in commit"
);
let mut output_delta = Delta::new();
// Commit the delta to our internal state and build output
for (row, weight) in &deltas.left.changes {
if let Some(tracker) = &self.tracker {
tracker.lock().unwrap().record_project();
}
let projected = self.project_values(&row.values);
let projected_row = HashableRow::new(row.rowid, projected);
output_delta.changes.push((projected_row, *weight));
}
Ok(crate::types::IOResult::Done(output_delta))
}
fn set_tracker(&mut self, tracker: Arc<Mutex<ComputationTracker>>) {
self.tracker = Some(tracker);
}
}
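
Both eval and commit perform the same mapping: every (row, weight) pair is passed through the compiled expressions, while the rowid and the weight flow through unchanged. Conceptually, with a plain closure standing in for CompiledExpression::execute:

// Each change is (rowid, values, weight); only the values are transformed.
fn project_delta(
    changes: Vec<(i64, Vec<Value>, isize)>,
    project: impl Fn(&[Value]) -> Vec<Value>,
) -> Vec<(i64, Vec<Value>, isize)> {
    changes
        .into_iter()
        .map(|(rowid, values, weight)| (rowid, project(&values), weight))
        .collect()
}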

File diff suppressed because it is too large

View File

@@ -527,7 +527,7 @@ impl Schema {
let table = Arc::new(Table::BTree(Arc::new(BTreeTable {
name: view_name.clone(),
root_page: main_root,
-            columns: incremental_view.columns.clone(),
+            columns: incremental_view.column_schema.flat_columns(),
primary_key_columns: Vec::new(),
has_rowid: true,
is_strict: false,
@@ -673,11 +673,12 @@ impl Schema {
..
} => {
// Extract actual columns from the SELECT statement
-                let view_columns = crate::util::extract_view_columns(&select, self);
+                let view_column_schema =
+                    crate::util::extract_view_columns(&select, self)?;
                // If column names were provided in CREATE VIEW (col1, col2, ...),
                // use them to rename the columns
-                let mut final_columns = view_columns;
+                let mut final_columns = view_column_schema.flat_columns();
for (i, indexed_col) in column_names.iter().enumerate() {
if let Some(col) = final_columns.get_mut(i) {
col.name = Some(indexed_col.col_name.to_string());

File diff suppressed because it is too large

View File

@@ -663,7 +663,7 @@ fn parse_table(
let btree_table = Arc::new(crate::schema::BTreeTable {
name: view_guard.name().to_string(),
root_page,
-        columns: view_guard.columns.clone(),
+        columns: view_guard.column_schema.flat_columns(),
primary_key_columns: Vec::new(),
has_rowid: true,
is_strict: false,

View File

@@ -508,7 +508,8 @@ fn query_pragma(
emit_columns_for_table_info(&mut program, table.columns(), base_reg);
} else if let Some(view_mutex) = schema.get_materialized_view(&name) {
let view = view_mutex.lock().unwrap();
-        emit_columns_for_table_info(&mut program, &view.columns, base_reg);
+        let flat_columns = view.column_schema.flat_columns();
+        emit_columns_for_table_info(&mut program, &flat_columns, base_reg);
} else if let Some(view) = schema.get_view(&name) {
emit_columns_for_table_info(&mut program, &view.columns, base_reg);
}

View File

@@ -42,7 +42,8 @@ pub fn translate_create_materialized_view(
// storing invalid view definitions
use crate::incremental::view::IncrementalView;
use crate::schema::BTreeTable;
-    let view_columns = IncrementalView::validate_and_extract_columns(select_stmt, schema)?;
+    let view_column_schema = IncrementalView::validate_and_extract_columns(select_stmt, schema)?;
+    let view_columns = view_column_schema.flat_columns();
// Reconstruct the SQL string for storage
let sql = create_materialized_view_to_str(view_name, select_stmt);

@@ -1066,9 +1066,59 @@ pub fn extract_column_name_from_expr(expr: impl AsRef<ast::Expr>) -> Option<Stri
}
}
/// Information about a table referenced in a view
#[derive(Debug, Clone)]
pub struct ViewTable {
/// The full table name (potentially including database qualifier like "main.customers")
pub name: String,
/// Optional alias (e.g., "c" in "FROM customers c")
pub alias: Option<String>,
}
/// Information about a column in the view's output
#[derive(Debug, Clone)]
pub struct ViewColumn {
/// Index into ViewColumnSchema.tables indicating which table this column comes from
/// For computed columns or constants, this will be usize::MAX
pub table_index: usize,
/// The actual column definition
pub column: Column,
}
/// Schema information for a view, tracking which columns come from which tables
#[derive(Debug, Clone)]
pub struct ViewColumnSchema {
/// All tables referenced by the view (in order of appearance)
pub tables: Vec<ViewTable>,
/// The view's output columns with their table associations
pub columns: Vec<ViewColumn>,
}
impl ViewColumnSchema {
/// Get all columns as a flat vector (without table association info)
pub fn flat_columns(&self) -> Vec<Column> {
self.columns.iter().map(|vc| vc.column.clone()).collect()
}
/// Get columns that belong to a specific table
pub fn table_columns(&self, table_index: usize) -> Vec<Column> {
self.columns
.iter()
.filter(|vc| vc.table_index == table_index)
.map(|vc| vc.column.clone())
.collect()
}
}
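To make table_index concrete: flat_columns() drops the table association, while table_columns() filters by it; computed columns carry usize::MAX and so match no table. A small self-contained sketch with stand-in types (only the fields used here; the real Column lives in crate::schema):

// Stand-in types carrying only the fields this sketch needs.
#[derive(Debug, Clone, PartialEq)]
struct Column { name: Option<String> }
struct ViewTable { name: String, alias: Option<String> }
struct ViewColumn { table_index: usize, column: Column }
struct ViewColumnSchema { tables: Vec<ViewTable>, columns: Vec<ViewColumn> }

impl ViewColumnSchema {
    fn table_columns(&self, table_index: usize) -> Vec<Column> {
        self.columns
            .iter()
            .filter(|vc| vc.table_index == table_index)
            .map(|vc| vc.column.clone())
            .collect()
    }
}

fn main() {
    // SELECT u.name, o.amount FROM users u JOIN orders o ON u.id = o.user_id
    let schema = ViewColumnSchema {
        tables: vec![
            ViewTable { name: "users".into(), alias: Some("u".into()) },
            ViewTable { name: "orders".into(), alias: Some("o".into()) },
        ],
        columns: vec![
            ViewColumn { table_index: 0, column: Column { name: Some("name".into()) } },
            ViewColumn { table_index: 1, column: Column { name: Some("amount".into()) } },
        ],
    };
    // Only o.amount is attributed to table 1 ("orders").
    assert_eq!(
        schema.table_columns(1),
        vec![Column { name: Some("amount".into()) }]
    );
}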
/// Extract column information from a SELECT statement for view creation
pub fn extract_view_columns(select_stmt: &ast::Select, schema: &Schema) -> Vec<Column> {
pub fn extract_view_columns(
select_stmt: &ast::Select,
schema: &Schema,
) -> Result<ViewColumnSchema> {
let mut tables = Vec::new();
let mut columns = Vec::new();
let mut column_name_counts: HashMap<String, usize> = HashMap::new();
// Navigate to the first SELECT in the statement
if let ast::OneSelect::Select {
ref from,
@@ -1076,23 +1126,85 @@ pub fn extract_view_columns(select_stmt: &ast::Select, schema: &Schema) -> Vec<C
..
} = &select_stmt.body.select
{
// First, we need to figure out which table(s) are being selected from
let table_name = if let Some(from) = from {
if let ast::SelectTable::Table(qualified_name, _, _) = from.select.as_ref() {
Some(normalize_ident(qualified_name.name.as_str()))
} else {
None
// First, extract all tables (from FROM clause and JOINs)
if let Some(from) = from {
// Add the main table from FROM clause
match from.select.as_ref() {
ast::SelectTable::Table(qualified_name, alias, _) => {
let table_name = if qualified_name.db_name.is_some() {
// Include database qualifier if present
qualified_name.to_string()
} else {
normalize_ident(qualified_name.name.as_str())
};
tables.push(ViewTable {
name: table_name.clone(),
alias: alias.as_ref().map(|a| match a {
ast::As::As(name) => normalize_ident(name.as_str()),
ast::As::Elided(name) => normalize_ident(name.as_str()),
}),
});
}
_ => {
// Handle other types like subqueries if needed
}
}
} else {
None
// Add tables from JOINs
for join in &from.joins {
match join.table.as_ref() {
ast::SelectTable::Table(qualified_name, alias, _) => {
let table_name = if qualified_name.db_name.is_some() {
// Include database qualifier if present
qualified_name.to_string()
} else {
normalize_ident(qualified_name.name.as_str())
};
tables.push(ViewTable {
name: table_name.clone(),
alias: alias.as_ref().map(|a| match a {
ast::As::As(name) => normalize_ident(name.as_str()),
ast::As::Elided(name) => normalize_ident(name.as_str()),
}),
});
}
_ => {
// Handle other types like subqueries if needed
}
}
}
}
// Helper function to find table index by name or alias
let find_table_index = |name: &str| -> Option<usize> {
tables
.iter()
.position(|t| t.name == name || t.alias.as_ref().is_some_and(|a| a == name))
};
// Get the table for column resolution
let _table = table_name.as_ref().and_then(|name| schema.get_table(name));
// Process each column in the SELECT list
for (i, result_col) in select_columns.iter().enumerate() {
for result_col in select_columns.iter() {
match result_col {
ast::ResultColumn::Expr(expr, alias) => {
let name = alias
// Figure out which table this expression comes from
let table_index = match expr.as_ref() {
ast::Expr::Qualified(table_ref, _col_name) => {
// Column qualified with table name
find_table_index(table_ref.as_str())
}
ast::Expr::Id(_col_name) => {
// Unqualified column - would need to resolve based on schema
// For now, assume it's from the first table if there is one
if !tables.is_empty() {
Some(0)
} else {
None
}
}
_ => None, // Expression, literal, etc.
};
let col_name = alias
.as_ref()
.map(|a| match a {
ast::As::Elided(name) => name.as_str().to_string(),
@@ -1103,41 +1215,65 @@ pub fn extract_view_columns(select_stmt: &ast::Select, schema: &Schema) -> Vec<C
// If we can't extract a simple column name, use the expression itself
expr.to_string()
});
columns.push(Column {
name: Some(name),
ty: Type::Text, // Default to TEXT, could be refined with type analysis
ty_str: "TEXT".to_string(),
primary_key: false, // Views don't have primary keys
is_rowid_alias: false,
notnull: false, // Views typically don't enforce NOT NULL
default: None, // Views don't have default values
unique: false,
collation: None,
hidden: false,
columns.push(ViewColumn {
table_index: table_index.unwrap_or(usize::MAX),
column: Column {
name: Some(col_name),
ty: Type::Text, // Default to TEXT, could be refined with type analysis
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
},
});
}
ast::ResultColumn::Star => {
// For SELECT *, expand to all columns from the table
if let Some(ref table_name) = table_name {
if let Some(table) = schema.get_table(table_name) {
// Copy all columns from the table, but adjust for view constraints
for table_column in table.columns() {
columns.push(Column {
name: table_column.name.clone(),
ty: table_column.ty,
ty_str: table_column.ty_str.clone(),
primary_key: false, // Views don't have primary keys
is_rowid_alias: false,
notnull: false, // Views typically don't enforce NOT NULL
default: None, // Views don't have default values
unique: false,
collation: table_column.collation,
hidden: false,
// For SELECT *, expand to all columns from all tables
for (table_idx, table) in tables.iter().enumerate() {
if let Some(table_obj) = schema.get_table(&table.name) {
for table_column in table_obj.columns() {
let col_name =
table_column.name.clone().unwrap_or_else(|| "?".to_string());
// Handle duplicate column names by adding suffix
let final_name =
if let Some(count) = column_name_counts.get_mut(&col_name) {
*count += 1;
format!("{}:{}", col_name, *count - 1)
} else {
column_name_counts.insert(col_name.clone(), 1);
col_name.clone()
};
columns.push(ViewColumn {
table_index: table_idx,
column: Column {
name: Some(final_name),
ty: table_column.ty,
ty_str: table_column.ty_str.clone(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: table_column.collation,
hidden: false,
},
});
}
} else {
// Table not found, create placeholder
columns.push(Column {
}
}
// If no tables, create a placeholder
if tables.is_empty() {
columns.push(ViewColumn {
table_index: usize::MAX,
column: Column {
name: Some("*".to_string()),
ty: Type::Text,
ty_str: "TEXT".to_string(),
@@ -1148,63 +1284,70 @@ pub fn extract_view_columns(select_stmt: &ast::Select, schema: &Schema) -> Vec<C
unique: false,
collation: None,
hidden: false,
});
}
} else {
// No FROM clause or couldn't determine table, create placeholder
columns.push(Column {
name: Some("*".to_string()),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
},
});
}
}
ast::ResultColumn::TableStar(table_name) => {
ast::ResultColumn::TableStar(table_ref) => {
// For table.*, expand to all columns from the specified table
let table_name_str = normalize_ident(table_name.as_str());
if let Some(table) = schema.get_table(&table_name_str) {
// Copy all columns from the table, but adjust for view constraints
for table_column in table.columns() {
columns.push(Column {
name: table_column.name.clone(),
ty: table_column.ty,
ty_str: table_column.ty_str.clone(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: table_column.collation,
hidden: false,
let table_name_str = normalize_ident(table_ref.as_str());
if let Some(table_idx) = find_table_index(&table_name_str) {
if let Some(table) = schema.get_table(&tables[table_idx].name) {
for table_column in table.columns() {
let col_name =
table_column.name.clone().unwrap_or_else(|| "?".to_string());
// Handle duplicate column names by adding suffix
let final_name =
if let Some(count) = column_name_counts.get_mut(&col_name) {
*count += 1;
format!("{}:{}", col_name, *count - 1)
} else {
column_name_counts.insert(col_name.clone(), 1);
col_name.clone()
};
columns.push(ViewColumn {
table_index: table_idx,
column: Column {
name: Some(final_name),
ty: table_column.ty,
ty_str: table_column.ty_str.clone(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: table_column.collation,
hidden: false,
},
});
}
} else {
// Table not found, create placeholder
columns.push(ViewColumn {
table_index: usize::MAX,
column: Column {
name: Some(format!("{table_name_str}.*")),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
},
});
}
} else {
// Table not found, create placeholder
columns.push(Column {
name: Some(format!("{table_name_str}.*")),
ty: Type::Text,
ty_str: "TEXT".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
});
}
}
}
}
}
columns
Ok(ViewColumnSchema { tables, columns })
}
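The duplicate-name handling above gives the first occurrence its bare name and suffixes later ones with ":1", ":2", and so on. A stand-alone sketch of just that scheme (dedup_name is a hypothetical helper, not part of the crate):

use std::collections::HashMap;

// First occurrence keeps its name; occurrence n (counting from 0) gets ":<n>".
fn dedup_name(counts: &mut HashMap<String, usize>, col_name: &str) -> String {
    if let Some(count) = counts.get_mut(col_name) {
        *count += 1;
        format!("{}:{}", col_name, *count - 1)
    } else {
        counts.insert(col_name.to_string(), 1);
        col_name.to_string()
    }
}

fn main() {
    let mut counts = HashMap::new();
    // SELECT * over users(id, name) JOIN orders(id, amount):
    let names = ["id", "name", "id", "amount"].map(|n| dedup_name(&mut counts, n));
    assert_eq!(names, ["id", "name", "id:1", "amount"]);
}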
#[cfg(test)]

@@ -749,7 +749,7 @@ impl<'a> Parser<'a> {
fn parse_create_materialized_view(&mut self) -> Result<Stmt> {
eat_assert!(self, TK_MATERIALIZED);
eat_assert!(self, TK_VIEW);
eat_expect!(self, TK_VIEW);
let if_not_exists = self.parse_if_not_exists()?;
let view_name = self.parse_fullname(false)?;
let columns = self.parse_eid_list(false)?;

@@ -44,13 +44,13 @@ do_execsql_test_on_specific_db {:memory:} matview-aggregation-population {
do_execsql_test_on_specific_db {:memory:} matview-filter-with-groupby {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t(a,b) VALUES (2,2), (3,3), (6,6), (7,7);
CREATE MATERIALIZED VIEW v AS
SELECT b as yourb, SUM(a) as mysum, COUNT(a) as mycount
FROM t
WHERE b > 2
GROUP BY b;
SELECT * FROM v ORDER BY yourb;
} {3|3|1
6|6|1
@@ -63,13 +63,13 @@ do_execsql_test_on_specific_db {:memory:} matview-insert-maintenance {
FROM t
WHERE b > 2
GROUP BY b;
INSERT INTO t VALUES (3,3), (6,6);
SELECT * FROM v ORDER BY b;
INSERT INTO t VALUES (4,3), (5,6);
SELECT * FROM v ORDER BY b;
INSERT INTO t VALUES (1,1), (2,2);
SELECT * FROM v ORDER BY b;
} {3|3|1
@@ -87,17 +87,17 @@ do_execsql_test_on_specific_db {:memory:} matview-delete-maintenance {
(3, 'A', 30),
(4, 'B', 40),
(5, 'A', 50);
CREATE MATERIALIZED VIEW category_sums AS
SELECT category, SUM(amount) as total, COUNT(*) as cnt
FROM items
GROUP BY category;
SELECT * FROM category_sums ORDER BY category;
DELETE FROM items WHERE id = 3;
SELECT * FROM category_sums ORDER BY category;
DELETE FROM items WHERE category = 'B';
SELECT * FROM category_sums ORDER BY category;
} {A|90|3
@@ -113,17 +113,17 @@ do_execsql_test_on_specific_db {:memory:} matview-update-maintenance {
(2, 200, 2),
(3, 300, 1),
(4, 400, 2);
CREATE MATERIALIZED VIEW status_totals AS
SELECT status, SUM(value) as total, COUNT(*) as cnt
FROM records
GROUP BY status;
SELECT * FROM status_totals ORDER BY status;
UPDATE records SET value = 150 WHERE id = 1;
SELECT * FROM status_totals ORDER BY status;
UPDATE records SET status = 2 WHERE id = 3;
SELECT * FROM status_totals ORDER BY status;
} {1|400|2
@@ -136,10 +136,10 @@ do_execsql_test_on_specific_db {:memory:} matview-update-maintenance {
do_execsql_test_on_specific_db {:memory:} matview-integer-primary-key-basic {
CREATE TABLE t(a INTEGER PRIMARY KEY, b INTEGER);
INSERT INTO t(a,b) VALUES (2,2), (3,3), (6,6), (7,7);
CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 2;
SELECT * FROM v ORDER BY a;
} {3|3
6|6
@@ -148,15 +148,15 @@ do_execsql_test_on_specific_db {:memory:} matview-integer-primary-key-basic {
do_execsql_test_on_specific_db {:memory:} matview-integer-primary-key-update-rowid {
CREATE TABLE t(a INTEGER PRIMARY KEY, b INTEGER);
INSERT INTO t(a,b) VALUES (2,2), (3,3), (6,6), (7,7);
CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 2;
SELECT * FROM v ORDER BY a;
UPDATE t SET a = 1 WHERE b = 3;
SELECT * FROM v ORDER BY a;
UPDATE t SET a = 10 WHERE a = 6;
SELECT * FROM v ORDER BY a;
} {3|3
@@ -172,15 +172,15 @@ do_execsql_test_on_specific_db {:memory:} matview-integer-primary-key-update-row
do_execsql_test_on_specific_db {:memory:} matview-integer-primary-key-update-value {
CREATE TABLE t(a INTEGER PRIMARY KEY, b INTEGER);
INSERT INTO t(a,b) VALUES (2,2), (3,3), (6,6), (7,7);
CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 2;
SELECT * FROM v ORDER BY a;
UPDATE t SET b = 1 WHERE a = 6;
SELECT * FROM v ORDER BY a;
UPDATE t SET b = 5 WHERE a = 2;
SELECT * FROM v ORDER BY a;
} {3|3
@@ -200,18 +200,18 @@ do_execsql_test_on_specific_db {:memory:} matview-integer-primary-key-with-aggre
(3, 20, 300),
(4, 20, 400),
(5, 10, 500);
CREATE MATERIALIZED VIEW v AS
SELECT b, SUM(c) as total, COUNT(*) as cnt
FROM t
WHERE a > 2
GROUP BY b;
SELECT * FROM v ORDER BY b;
UPDATE t SET a = 6 WHERE a = 1;
SELECT * FROM v ORDER BY b;
DELETE FROM t WHERE a = 3;
SELECT * FROM v ORDER BY b;
} {10|500|1
@@ -228,7 +228,7 @@ do_execsql_test_on_specific_db {:memory:} matview-complex-filter-aggregation {
amount INTEGER,
type INTEGER
);
INSERT INTO transactions VALUES
(1, 100, 50, 1),
(2, 100, 30, 2),
@@ -236,21 +236,21 @@ do_execsql_test_on_specific_db {:memory:} matview-complex-filter-aggregation {
(4, 100, 20, 1),
(5, 200, 40, 2),
(6, 300, 60, 1);
CREATE MATERIALIZED VIEW account_deposits AS
SELECT account, SUM(amount) as total_deposits, COUNT(*) as deposit_count
FROM transactions
WHERE type = 1
GROUP BY account;
SELECT * FROM account_deposits ORDER BY account;
INSERT INTO transactions VALUES (7, 100, 25, 1);
SELECT * FROM account_deposits ORDER BY account;
UPDATE transactions SET amount = 80 WHERE id = 1;
SELECT * FROM account_deposits ORDER BY account;
DELETE FROM transactions WHERE id = 3;
SELECT * FROM account_deposits ORDER BY account;
} {100|70|2
@@ -273,19 +273,19 @@ do_execsql_test_on_specific_db {:memory:} matview-sum-count-only {
(3, 30, 2),
(4, 40, 2),
(5, 50, 1);
CREATE MATERIALIZED VIEW category_stats AS
SELECT category,
SUM(value) as sum_val,
COUNT(*) as cnt
FROM data
GROUP BY category;
SELECT * FROM category_stats ORDER BY category;
INSERT INTO data VALUES (6, 5, 1);
SELECT * FROM category_stats ORDER BY category;
UPDATE data SET value = 35 WHERE id = 3;
SELECT * FROM category_stats ORDER BY category;
} {1|80|3
@@ -302,9 +302,9 @@ do_execsql_test_on_specific_db {:memory:} matview-empty-table-population {
FROM t
WHERE b > 5
GROUP BY b;
SELECT COUNT(*) FROM v;
INSERT INTO t VALUES (1, 3), (2, 7), (3, 9);
SELECT * FROM v ORDER BY b;
} {0
@@ -314,15 +314,15 @@ do_execsql_test_on_specific_db {:memory:} matview-empty-table-population {
do_execsql_test_on_specific_db {:memory:} matview-all-rows-filtered {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t VALUES (1, 1), (2, 2), (3, 3);
CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 10;
SELECT COUNT(*) FROM v;
INSERT INTO t VALUES (11, 11);
SELECT * FROM v;
UPDATE t SET b = 1 WHERE a = 11;
SELECT COUNT(*) FROM v;
} {0
@@ -335,26 +335,26 @@ do_execsql_test_on_specific_db {:memory:} matview-mixed-operations-sequence {
customer_id INTEGER,
amount INTEGER
);
INSERT INTO orders VALUES (1, 100, 50);
INSERT INTO orders VALUES (2, 200, 75);
CREATE MATERIALIZED VIEW customer_totals AS
SELECT customer_id, SUM(amount) as total, COUNT(*) as order_count
FROM orders
GROUP BY customer_id;
SELECT * FROM customer_totals ORDER BY customer_id;
INSERT INTO orders VALUES (3, 100, 25);
SELECT * FROM customer_totals ORDER BY customer_id;
UPDATE orders SET amount = 100 WHERE order_id = 2;
SELECT * FROM customer_totals ORDER BY customer_id;
DELETE FROM orders WHERE order_id = 1;
SELECT * FROM customer_totals ORDER BY customer_id;
INSERT INTO orders VALUES (4, 300, 150);
SELECT * FROM customer_totals ORDER BY customer_id;
} {100|50|1
@@ -389,17 +389,17 @@ do_execsql_test_on_specific_db {:memory:} matview-projections {
do_execsql_test_on_specific_db {:memory:} matview-rollback-insert {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t VALUES (1, 10), (2, 20), (3, 30);
CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 15;
SELECT * FROM v ORDER BY a;
BEGIN;
INSERT INTO t VALUES (4, 40), (5, 50);
SELECT * FROM v ORDER BY a;
ROLLBACK;
SELECT * FROM v ORDER BY a;
} {2|20
3|30
@@ -413,17 +413,17 @@ do_execsql_test_on_specific_db {:memory:} matview-rollback-insert {
do_execsql_test_on_specific_db {:memory:} matview-rollback-delete {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t VALUES (1, 10), (2, 20), (3, 30), (4, 40);
CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 15;
SELECT * FROM v ORDER BY a;
BEGIN;
DELETE FROM t WHERE a IN (2, 3);
SELECT * FROM v ORDER BY a;
ROLLBACK;
SELECT * FROM v ORDER BY a;
} {2|20
3|30
@@ -436,18 +436,18 @@ do_execsql_test_on_specific_db {:memory:} matview-rollback-delete {
do_execsql_test_on_specific_db {:memory:} matview-rollback-update {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t VALUES (1, 10), (2, 20), (3, 30);
CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 15;
SELECT * FROM v ORDER BY a;
BEGIN;
UPDATE t SET b = 5 WHERE a = 2;
UPDATE t SET b = 35 WHERE a = 1;
SELECT * FROM v ORDER BY a;
ROLLBACK;
SELECT * FROM v ORDER BY a;
} {2|20
3|30
@@ -459,19 +459,19 @@ do_execsql_test_on_specific_db {:memory:} matview-rollback-update {
do_execsql_test_on_specific_db {:memory:} matview-rollback-aggregation {
CREATE TABLE sales(product_id INTEGER, amount INTEGER);
INSERT INTO sales VALUES (1, 100), (1, 200), (2, 150), (2, 250);
CREATE MATERIALIZED VIEW product_totals AS
SELECT product_id, SUM(amount) as total, COUNT(*) as cnt
FROM sales
GROUP BY product_id;
SELECT * FROM product_totals ORDER BY product_id;
BEGIN;
INSERT INTO sales VALUES (1, 50), (3, 300);
SELECT * FROM product_totals ORDER BY product_id;
ROLLBACK;
SELECT * FROM product_totals ORDER BY product_id;
} {1|300|2
2|400|2
@@ -484,21 +484,21 @@ do_execsql_test_on_specific_db {:memory:} matview-rollback-aggregation {
do_execsql_test_on_specific_db {:memory:} matview-rollback-mixed-operations {
CREATE TABLE orders(id INTEGER PRIMARY KEY, customer INTEGER, amount INTEGER);
INSERT INTO orders VALUES (1, 100, 50), (2, 200, 75), (3, 100, 25);
CREATE MATERIALIZED VIEW customer_totals AS
SELECT customer, SUM(amount) as total, COUNT(*) as cnt
FROM orders
GROUP BY customer;
SELECT * FROM customer_totals ORDER BY customer;
BEGIN;
INSERT INTO orders VALUES (4, 100, 100);
UPDATE orders SET amount = 150 WHERE id = 2;
DELETE FROM orders WHERE id = 3;
SELECT * FROM customer_totals ORDER BY customer;
ROLLBACK;
SELECT * FROM customer_totals ORDER BY customer;
} {100|75|2
200|75|1
@@ -514,22 +514,22 @@ do_execsql_test_on_specific_db {:memory:} matview-rollback-filtered-aggregation
(2, 100, 30, 'withdraw'),
(3, 200, 100, 'deposit'),
(4, 200, 40, 'withdraw');
CREATE MATERIALIZED VIEW deposits AS
SELECT account, SUM(amount) as total_deposits, COUNT(*) as cnt
FROM transactions
WHERE type = 'deposit'
GROUP BY account;
SELECT * FROM deposits ORDER BY account;
BEGIN;
INSERT INTO transactions VALUES (5, 100, 75, 'deposit');
UPDATE transactions SET amount = 60 WHERE id = 1;
DELETE FROM transactions WHERE id = 3;
SELECT * FROM deposits ORDER BY account;
ROLLBACK;
SELECT * FROM deposits ORDER BY account;
} {100|50|1
200|100|1
@@ -540,12 +540,12 @@ do_execsql_test_on_specific_db {:memory:} matview-rollback-filtered-aggregation
do_execsql_test_on_specific_db {:memory:} matview-rollback-empty-view {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t VALUES (1, 5), (2, 8);
CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 10;
SELECT COUNT(*) FROM v;
BEGIN;
INSERT INTO t VALUES (3, 15), (4, 20);
SELECT * FROM v ORDER BY a;
@@ -556,3 +556,538 @@ do_execsql_test_on_specific_db {:memory:} matview-rollback-empty-view {
3|15
4|20
0}
# Join tests for materialized views
do_execsql_test_on_specific_db {:memory:} matview-simple-join {
CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT, age INTEGER);
CREATE TABLE orders(order_id INTEGER PRIMARY KEY, user_id INTEGER, product_id INTEGER, quantity INTEGER);
INSERT INTO users VALUES (1, 'Alice', 25), (2, 'Bob', 30), (3, 'Charlie', 35);
INSERT INTO orders VALUES (1, 1, 100, 5), (2, 1, 101, 3), (3, 2, 100, 7);
CREATE MATERIALIZED VIEW user_orders AS
SELECT u.name, o.quantity
FROM users u
JOIN orders o ON u.id = o.user_id;
SELECT * FROM user_orders ORDER BY name, quantity;
} {Alice|3
Alice|5
Bob|7}
do_execsql_test_on_specific_db {:memory:} matview-join-with-aggregation {
CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders(order_id INTEGER PRIMARY KEY, user_id INTEGER, amount INTEGER);
INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob');
INSERT INTO orders VALUES (1, 1, 100), (2, 1, 150), (3, 2, 200), (4, 2, 50);
CREATE MATERIALIZED VIEW user_totals AS
SELECT u.name, SUM(o.amount) as total_amount
FROM users u
JOIN orders o ON u.id = o.user_id
GROUP BY u.name;
SELECT * FROM user_totals ORDER BY name;
} {Alice|250
Bob|250}
do_execsql_test_on_specific_db {:memory:} matview-three-way-join {
CREATE TABLE customers(id INTEGER PRIMARY KEY, name TEXT, city TEXT);
CREATE TABLE orders(id INTEGER PRIMARY KEY, customer_id INTEGER, product_id INTEGER, quantity INTEGER);
CREATE TABLE products(id INTEGER PRIMARY KEY, name TEXT, price INTEGER);
INSERT INTO customers VALUES (1, 'Alice', 'NYC'), (2, 'Bob', 'LA');
INSERT INTO products VALUES (1, 'Widget', 10), (2, 'Gadget', 20);
INSERT INTO orders VALUES (1, 1, 1, 5), (2, 1, 2, 3), (3, 2, 1, 2);
CREATE MATERIALIZED VIEW sales_summary AS
SELECT c.name as customer_name, p.name as product_name, o.quantity
FROM customers c
JOIN orders o ON c.id = o.customer_id
JOIN products p ON o.product_id = p.id;
SELECT * FROM sales_summary ORDER BY customer_name, product_name;
} {Alice|Gadget|3
Alice|Widget|5
Bob|Widget|2}
do_execsql_test_on_specific_db {:memory:} matview-three-way-join-with-aggregation {
CREATE TABLE customers(id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders(id INTEGER PRIMARY KEY, customer_id INTEGER, product_id INTEGER, quantity INTEGER);
CREATE TABLE products(id INTEGER PRIMARY KEY, name TEXT, price INTEGER);
INSERT INTO customers VALUES (1, 'Alice'), (2, 'Bob');
INSERT INTO products VALUES (1, 'Widget', 10), (2, 'Gadget', 20);
INSERT INTO orders VALUES (1, 1, 1, 5), (2, 1, 2, 3), (3, 2, 1, 2), (4, 1, 1, 4);
CREATE MATERIALIZED VIEW sales_totals AS
SELECT c.name as customer_name, p.name as product_name,
SUM(o.quantity) as total_quantity,
SUM(o.quantity * p.price) as total_value
FROM customers c
JOIN orders o ON c.id = o.customer_id
JOIN products p ON o.product_id = p.id
GROUP BY c.name, p.name;
SELECT * FROM sales_totals ORDER BY customer_name, product_name;
} {Alice|Gadget|3|60
Alice|Widget|9|90
Bob|Widget|2|20}
do_execsql_test_on_specific_db {:memory:} matview-join-incremental-insert {
CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders(order_id INTEGER PRIMARY KEY, user_id INTEGER, amount INTEGER);
INSERT INTO users VALUES (1, 'Alice');
INSERT INTO orders VALUES (1, 1, 100);
CREATE MATERIALIZED VIEW user_orders AS
SELECT u.name, o.amount
FROM users u
JOIN orders o ON u.id = o.user_id;
SELECT COUNT(*) FROM user_orders;
INSERT INTO orders VALUES (2, 1, 150);
SELECT COUNT(*) FROM user_orders;
INSERT INTO users VALUES (2, 'Bob');
INSERT INTO orders VALUES (3, 2, 200);
SELECT COUNT(*) FROM user_orders;
} {1
2
3}
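The incremental maintenance these tests exercise follows the standard DBSP rule for joins, which are bilinear: delta(A JOIN B) = dA JOIN B + A JOIN dB + dA JOIN dB. A minimal stand-alone sketch with toy Z-sets keyed on a single column (not the crate's Delta/HashableRow types); it mirrors the INSERT INTO orders VALUES (2, 1, 150) step above, where only the A JOIN dB term produces output:

use std::collections::HashMap;

// Toy Z-set: (join key, payload, weight); weight +1 = insert, -1 = delete.
type ZSet = Vec<(i64, String, i64)>;

// Plain hash join; weights multiply because join is bilinear.
fn join(a: &ZSet, b: &ZSet) -> ZSet {
    let mut index: HashMap<i64, Vec<(String, i64)>> = HashMap::new();
    for (k, v, w) in b {
        index.entry(*k).or_default().push((v.clone(), *w));
    }
    let mut out = ZSet::new();
    for (k, va, wa) in a {
        for (vb, wb) in index.get(k).into_iter().flatten() {
            out.push((*k, format!("{va}|{vb}"), wa * wb));
        }
    }
    out
}

// Incremental rule: delta(A JOIN B) = dA*B + A*dB + dA*dB.
fn join_delta(a: &ZSet, da: &ZSet, b: &ZSet, db: &ZSet) -> ZSet {
    let mut out = join(da, b);
    out.extend(join(a, db));
    out.extend(join(da, db));
    out
}

fn main() {
    // users keyed by id, orders keyed by user_id (mirrors the test above).
    let users: ZSet = vec![(1, "Alice".into(), 1)];
    let orders: ZSet = vec![(1, "100".into(), 1)];
    let no_user_changes: ZSet = Vec::new();
    // INSERT INTO orders VALUES (2, 1, 150): only the new pair is derived.
    let d_orders: ZSet = vec![(1, "150".into(), 1)];
    let delta = join_delta(&users, &no_user_changes, &orders, &d_orders);
    assert_eq!(delta, vec![(1, "Alice|150".to_string(), 1)]);
}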
do_execsql_test_on_specific_db {:memory:} matview-join-incremental-delete {
CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders(order_id INTEGER PRIMARY KEY, user_id INTEGER, amount INTEGER);
INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob');
INSERT INTO orders VALUES (1, 1, 100), (2, 1, 150), (3, 2, 200);
CREATE MATERIALIZED VIEW user_orders AS
SELECT u.name, o.amount
FROM users u
JOIN orders o ON u.id = o.user_id;
SELECT COUNT(*) FROM user_orders;
DELETE FROM orders WHERE order_id = 2;
SELECT COUNT(*) FROM user_orders;
DELETE FROM users WHERE id = 2;
SELECT COUNT(*) FROM user_orders;
} {3
2
1}
do_execsql_test_on_specific_db {:memory:} matview-join-incremental-update {
CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders(order_id INTEGER PRIMARY KEY, user_id INTEGER, amount INTEGER);
INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob');
INSERT INTO orders VALUES (1, 1, 100), (2, 2, 200);
CREATE MATERIALIZED VIEW user_orders AS
SELECT u.name, o.amount
FROM users u
JOIN orders o ON u.id = o.user_id;
SELECT * FROM user_orders ORDER BY name;
UPDATE orders SET amount = 150 WHERE order_id = 1;
SELECT * FROM user_orders ORDER BY name;
UPDATE users SET name = 'Robert' WHERE id = 2;
SELECT * FROM user_orders ORDER BY name;
} {Alice|100
Bob|200
Alice|150
Bob|200
Alice|150
Robert|200}
do_execsql_test_on_specific_db {:memory:} matview-join-with-filter {
CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT, age INTEGER);
CREATE TABLE orders(order_id INTEGER PRIMARY KEY, user_id INTEGER, amount INTEGER);
INSERT INTO users VALUES (1, 'Alice', 25), (2, 'Bob', 35), (3, 'Charlie', 20);
INSERT INTO orders VALUES (1, 1, 100), (2, 2, 200), (3, 3, 150);
CREATE MATERIALIZED VIEW adult_orders AS
SELECT u.name, o.amount
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.age > 21;
SELECT * FROM adult_orders ORDER BY name;
} {Alice|100
Bob|200}
do_execsql_test_on_specific_db {:memory:} matview-join-rollback {
CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE orders(order_id INTEGER PRIMARY KEY, user_id INTEGER, amount INTEGER);
INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob');
INSERT INTO orders VALUES (1, 1, 100), (2, 2, 200);
CREATE MATERIALIZED VIEW user_orders AS
SELECT u.name, o.amount
FROM users u
JOIN orders o ON u.id = o.user_id;
SELECT COUNT(*) FROM user_orders;
BEGIN;
INSERT INTO users VALUES (3, 'Charlie');
INSERT INTO orders VALUES (3, 3, 300);
SELECT COUNT(*) FROM user_orders;
ROLLBACK;
SELECT COUNT(*) FROM user_orders;
} {2
3
2}
# ===== COMPREHENSIVE JOIN TESTS =====
# Test 1: Join with filter BEFORE the join (on base tables)
do_execsql_test_on_specific_db {:memory:} matview-join-with-pre-filter {
CREATE TABLE employees(id INTEGER PRIMARY KEY, name TEXT, department TEXT, salary INTEGER);
CREATE TABLE departments(id INTEGER PRIMARY KEY, dept_name TEXT, budget INTEGER);
INSERT INTO employees VALUES
(1, 'Alice', 'Engineering', 80000),
(2, 'Bob', 'Engineering', 90000),
(3, 'Charlie', 'Sales', 60000),
(4, 'David', 'Sales', 65000),
(5, 'Eve', 'HR', 70000);
INSERT INTO departments VALUES
(1, 'Engineering', 500000),
(2, 'Sales', 300000),
(3, 'HR', 200000);
-- View: Join only high-salary employees with their departments
CREATE MATERIALIZED VIEW high_earners_by_dept AS
SELECT e.name, e.salary, d.dept_name, d.budget
FROM employees e
JOIN departments d ON e.department = d.dept_name
WHERE e.salary > 70000;
SELECT * FROM high_earners_by_dept ORDER BY salary DESC;
} {Bob|90000|Engineering|500000
Alice|80000|Engineering|500000}
# Test 2: Join with filter AFTER the join
do_execsql_test_on_specific_db {:memory:} matview-join-with-post-filter {
CREATE TABLE products(id INTEGER PRIMARY KEY, name TEXT, category_id INTEGER, price INTEGER);
CREATE TABLE categories(id INTEGER PRIMARY KEY, name TEXT, min_price INTEGER);
INSERT INTO products VALUES
(1, 'Laptop', 1, 1200),
(2, 'Mouse', 1, 25),
(3, 'Shirt', 2, 50),
(4, 'Shoes', 2, 120);
INSERT INTO categories VALUES
(1, 'Electronics', 100),
(2, 'Clothing', 30);
-- View: Products that meet or exceed their category's minimum price
CREATE MATERIALIZED VIEW premium_products AS
SELECT p.name as product, c.name as category, p.price, c.min_price
FROM products p
JOIN categories c ON p.category_id = c.id
WHERE p.price >= c.min_price;
SELECT * FROM premium_products ORDER BY price DESC;
} {Laptop|Electronics|1200|100
Shoes|Clothing|120|30
Shirt|Clothing|50|30}
# Test 3: Join with aggregation BEFORE the join
do_execsql_test_on_specific_db {:memory:} matview-aggregation-before-join {
CREATE TABLE orders(id INTEGER PRIMARY KEY, customer_id INTEGER, product_id INTEGER, quantity INTEGER, order_date INTEGER);
CREATE TABLE customers(id INTEGER PRIMARY KEY, name TEXT, tier TEXT);
INSERT INTO orders VALUES
(1, 1, 101, 2, 1),
(2, 1, 102, 1, 1),
(3, 2, 101, 5, 1),
(4, 1, 101, 3, 2),
(5, 2, 103, 2, 2),
(6, 3, 102, 1, 2);
INSERT INTO customers VALUES
(1, 'Alice', 'Gold'),
(2, 'Bob', 'Silver'),
(3, 'Charlie', 'Bronze');
-- View: Customer order counts joined with customer details
-- Note: Simplified to avoid subquery issues with the DBSP compiler
CREATE MATERIALIZED VIEW customer_order_summary AS
SELECT c.name, c.tier, COUNT(o.id) as order_count, SUM(o.quantity) as total_quantity
FROM customers c
JOIN orders o ON c.id = o.customer_id
GROUP BY c.id, c.name, c.tier;
SELECT * FROM customer_order_summary ORDER BY total_quantity DESC;
} {Bob|Silver|2|7
Alice|Gold|3|6
Charlie|Bronze|1|1}
# Test 4: Join with aggregation AFTER the join
do_execsql_test_on_specific_db {:memory:} matview-aggregation-after-join {
CREATE TABLE sales(id INTEGER PRIMARY KEY, product_id INTEGER, store_id INTEGER, units_sold INTEGER, revenue INTEGER);
CREATE TABLE stores(id INTEGER PRIMARY KEY, name TEXT, region TEXT);
INSERT INTO sales VALUES
(1, 1, 1, 10, 1000),
(2, 1, 2, 15, 1500),
(3, 2, 1, 5, 250),
(4, 2, 2, 8, 400),
(5, 1, 3, 12, 1200),
(6, 2, 3, 6, 300);
INSERT INTO stores VALUES
(1, 'StoreA', 'North'),
(2, 'StoreB', 'North'),
(3, 'StoreC', 'South');
-- View: Regional sales summary (aggregate after joining)
CREATE MATERIALIZED VIEW regional_sales AS
SELECT st.region, SUM(s.units_sold) as total_units, SUM(s.revenue) as total_revenue
FROM sales s
JOIN stores st ON s.store_id = st.id
GROUP BY st.region;
SELECT * FROM regional_sales ORDER BY total_revenue DESC;
} {North|38|3150
South|18|1500}
# Test 5: Modifying both tables in same transaction
do_execsql_test_on_specific_db {:memory:} matview-join-both-tables-modified {
CREATE TABLE authors(id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE books(id INTEGER PRIMARY KEY, title TEXT, author_id INTEGER, year INTEGER);
INSERT INTO authors VALUES (1, 'Orwell'), (2, 'Asimov');
INSERT INTO books VALUES (1, '1984', 1, 1949), (2, 'Foundation', 2, 1951);
CREATE MATERIALIZED VIEW author_books AS
SELECT a.name, b.title, b.year
FROM authors a
JOIN books b ON a.id = b.author_id;
SELECT COUNT(*) FROM author_books;
BEGIN;
INSERT INTO authors VALUES (3, 'Herbert');
INSERT INTO books VALUES (3, 'Dune', 3, 1965);
SELECT COUNT(*) FROM author_books;
COMMIT;
SELECT * FROM author_books ORDER BY year;
} {2
3
Orwell|1984|1949
Asimov|Foundation|1951
Herbert|Dune|1965}
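This test is precisely the case where the dA JOIN dB cross-term matters: neither Herbert nor Dune existed before the transaction, so the new Herbert|Dune|1965 row can only arise from joining the two deltas against each other.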
# Test 6: Modifying only one table in transaction
do_execsql_test_on_specific_db {:memory:} matview-join-single-table-modified {
CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT, active INTEGER);
CREATE TABLE posts(id INTEGER PRIMARY KEY, user_id INTEGER, content TEXT);
INSERT INTO users VALUES (1, 'Alice', 1), (2, 'Bob', 1), (3, 'Charlie', 0);
INSERT INTO posts VALUES (1, 1, 'Hello'), (2, 1, 'World'), (3, 2, 'Test');
CREATE MATERIALIZED VIEW active_user_posts AS
SELECT u.name, p.content
FROM users u
JOIN posts p ON u.id = p.user_id
WHERE u.active = 1;
SELECT COUNT(*) FROM active_user_posts;
-- Add posts for existing user (modify only posts table)
BEGIN;
INSERT INTO posts VALUES (4, 1, 'NewPost'), (5, 2, 'Another');
SELECT COUNT(*) FROM active_user_posts;
COMMIT;
SELECT * FROM active_user_posts ORDER BY name, content;
} {3
5
Alice|Hello
Alice|NewPost
Alice|World
Bob|Another
Bob|Test}
do_execsql_test_on_specific_db {:memory:} matview-three-way-incremental {
CREATE TABLE students(id INTEGER PRIMARY KEY, name TEXT, major TEXT);
CREATE TABLE courses(id INTEGER PRIMARY KEY, name TEXT, department TEXT, credits INTEGER);
CREATE TABLE enrollments(student_id INTEGER, course_id INTEGER, grade TEXT, PRIMARY KEY(student_id, course_id));
INSERT INTO students VALUES (1, 'Alice', 'CS'), (2, 'Bob', 'Math');
INSERT INTO courses VALUES (1, 'DatabaseSystems', 'CS', 3), (2, 'Calculus', 'Math', 4);
INSERT INTO enrollments VALUES (1, 1, 'A'), (2, 2, 'B');
CREATE MATERIALIZED VIEW student_transcripts AS
SELECT s.name as student, c.name as course, c.credits, e.grade
FROM students s
JOIN enrollments e ON s.id = e.student_id
JOIN courses c ON e.course_id = c.id;
SELECT COUNT(*) FROM student_transcripts;
-- Add new student
INSERT INTO students VALUES (3, 'Charlie', 'CS');
SELECT COUNT(*) FROM student_transcripts;
-- Enroll new student
INSERT INTO enrollments VALUES (3, 1, 'A'), (3, 2, 'A');
SELECT COUNT(*) FROM student_transcripts;
-- Add new course
INSERT INTO courses VALUES (3, 'Algorithms', 'CS', 3);
SELECT COUNT(*) FROM student_transcripts;
-- Enroll existing students in new course
INSERT INTO enrollments VALUES (1, 3, 'B'), (3, 3, 'A');
SELECT COUNT(*) FROM student_transcripts;
SELECT * FROM student_transcripts ORDER BY student, course;
} {2
2
4
4
6
Alice|Algorithms|3|B
Alice|DatabaseSystems|3|A
Bob|Calculus|4|B
Charlie|Algorithms|3|A
Charlie|Calculus|4|A
Charlie|DatabaseSystems|3|A}
do_execsql_test_on_specific_db {:memory:} matview-self-join {
CREATE TABLE employees(id INTEGER PRIMARY KEY, name TEXT, manager_id INTEGER, salary INTEGER);
INSERT INTO employees VALUES
(1, 'CEO', NULL, 150000),
(2, 'VPSales', 1, 120000),
(3, 'VPEngineering', 1, 130000),
(4, 'Engineer1', 3, 90000),
(5, 'Engineer2', 3, 85000),
(6, 'SalesRep', 2, 70000);
CREATE MATERIALIZED VIEW org_chart AS
SELECT e.name as employee, m.name as manager, e.salary
FROM employees e
JOIN employees m ON e.manager_id = m.id;
SELECT * FROM org_chart ORDER BY salary DESC;
} {VPEngineering|CEO|130000
VPSales|CEO|120000
Engineer1|VPEngineering|90000
Engineer2|VPEngineering|85000
SalesRep|VPSales|70000}
do_execsql_test_on_specific_db {:memory:} matview-join-cascade-update {
CREATE TABLE categories(id INTEGER PRIMARY KEY, name TEXT, discount_rate INTEGER);
CREATE TABLE products(id INTEGER PRIMARY KEY, name TEXT, category_id INTEGER, base_price INTEGER);
INSERT INTO categories VALUES (1, 'Electronics', 10), (2, 'Books', 5);
INSERT INTO products VALUES
(1, 'Laptop', 1, 1000),
(2, 'Phone', 1, 500),
(3, 'Novel', 2, 20),
(4, 'Textbook', 2, 80);
CREATE MATERIALIZED VIEW discounted_prices AS
SELECT p.name as product, c.name as category,
p.base_price, c.discount_rate,
(p.base_price * (100 - c.discount_rate) / 100) as final_price
FROM products p
JOIN categories c ON p.category_id = c.id;
SELECT * FROM discounted_prices ORDER BY final_price DESC;
-- Update discount rate for Electronics
UPDATE categories SET discount_rate = 20 WHERE id = 1;
SELECT * FROM discounted_prices ORDER BY final_price DESC;
} {Laptop|Electronics|1000|10|900
Phone|Electronics|500|10|450
Textbook|Books|80|5|76
Novel|Books|20|5|19
Laptop|Electronics|1000|20|800
Phone|Electronics|500|20|400
Textbook|Books|80|5|76
Novel|Books|20|5|19}
do_execsql_test_on_specific_db {:memory:} matview-join-delete-cascade {
CREATE TABLE users(id INTEGER PRIMARY KEY, name TEXT, active INTEGER);
CREATE TABLE sessions(id INTEGER PRIMARY KEY, user_id INTEGER, duration INTEGER);
INSERT INTO users VALUES (1, 'Alice', 1), (2, 'Bob', 1), (3, 'Charlie', 0);
INSERT INTO sessions VALUES
(1, 1, 30),
(2, 1, 45),
(3, 2, 60),
(4, 3, 15),
(5, 2, 90);
CREATE MATERIALIZED VIEW active_sessions AS
SELECT u.name, s.duration
FROM users u
JOIN sessions s ON u.id = s.user_id
WHERE u.active = 1;
SELECT COUNT(*) FROM active_sessions;
-- Delete Bob's sessions
DELETE FROM sessions WHERE user_id = 2;
SELECT COUNT(*) FROM active_sessions;
SELECT * FROM active_sessions ORDER BY name, duration;
} {4
2
Alice|30
Alice|45}
do_execsql_test_on_specific_db {:memory:} matview-join-complex-where {
CREATE TABLE orders(id INTEGER PRIMARY KEY, customer_id INTEGER, product_id INTEGER, quantity INTEGER, price INTEGER, order_date INTEGER);
CREATE TABLE customers(id INTEGER PRIMARY KEY, name TEXT, tier TEXT, country TEXT);
INSERT INTO customers VALUES
(1, 'Alice', 'Gold', 'USA'),
(2, 'Bob', 'Silver', 'Canada'),
(3, 'Charlie', 'Gold', 'USA'),
(4, 'David', 'Bronze', 'UK');
INSERT INTO orders VALUES
(1, 1, 1, 5, 100, 20240101),
(2, 2, 2, 3, 50, 20240102),
(3, 3, 1, 10, 100, 20240103),
(4, 4, 3, 2, 75, 20240104),
(5, 1, 2, 4, 50, 20240105),
(6, 3, 3, 6, 75, 20240106);
-- View: Gold tier USA customers with high-value orders
CREATE MATERIALIZED VIEW premium_usa_orders AS
SELECT c.name, o.quantity, o.price, (o.quantity * o.price) as total
FROM customers c
JOIN orders o ON c.id = o.customer_id
WHERE c.tier = 'Gold'
AND c.country = 'USA'
AND (o.quantity * o.price) >= 400;
SELECT * FROM premium_usa_orders ORDER BY total DESC;
} {Charlie|10|100|1000
Alice|5|100|500
Charlie|6|75|450}