From 7178d8d31c91a82310a746052bbdbe71b355fdd4 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 17 Sep 2025 10:45:12 -0500 Subject: [PATCH] move the project operator to its own file. The code is becoming impossible to reason about with everything in operator.rs --- core/incremental/mod.rs | 1 + core/incremental/operator.rs | 424 +-------------------------- core/incremental/project_operator.rs | 168 +++++++++++ 3 files changed, 172 insertions(+), 421 deletions(-) create mode 100644 core/incremental/project_operator.rs diff --git a/core/incremental/mod.rs b/core/incremental/mod.rs index 8a6722370..0e45b3194 100644 --- a/core/incremental/mod.rs +++ b/core/incremental/mod.rs @@ -6,4 +6,5 @@ pub mod filter_operator; pub mod input_operator; pub mod operator; pub mod persistence; +pub mod project_operator; pub mod view; diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 4374e27e5..43ad8f67c 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -4,22 +4,18 @@ pub use crate::incremental::filter_operator::{FilterOperator, FilterPredicate}; pub use crate::incremental::input_operator::InputOperator; +pub use crate::incremental::project_operator::{ProjectColumn, ProjectOperator}; use crate::function::{AggFunc, Func}; use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow}; -use crate::incremental::expr_compiler::CompiledExpression; use crate::incremental::persistence::{MinMaxPersistState, ReadRecord, RecomputeMinMax, WriteRow}; use crate::schema::{Index, IndexColumn}; use crate::storage::btree::BTreeCursor; -use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult, Text}; -use crate::{ - return_and_restore_if_io, return_if_io, Connection, Database, Result, SymbolTable, Value, -}; +use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult}; +use crate::{return_and_restore_if_io, return_if_io, Result, Value}; use std::collections::{BTreeMap, HashMap}; use std::fmt::{self, Debug, Display}; use std::sync::{Arc, Mutex}; -use turso_macros::match_ignore_ascii_case; -use turso_parser::ast::{As, Expr, Literal, Name, OneSelect, Operator, ResultColumn}; /// Struct to hold both table and index cursors for DBSP state operations pub struct DbspStateCursors { @@ -795,16 +791,6 @@ pub enum QueryOperator { }, } -#[derive(Debug, Clone)] -pub struct ProjectColumn { - /// The original SQL expression (for debugging/fallback) - pub expr: turso_parser::ast::Expr, - /// Optional alias for the column - pub alias: Option, - /// Compiled expression (handles both trivial columns and complex expressions) - pub compiled: CompiledExpression, -} - #[derive(Debug, Clone)] pub enum JoinType { Inner, @@ -897,410 +883,6 @@ pub trait IncrementalOperator: Debug { fn set_tracker(&mut self, tracker: Arc>); } -/// Project operator - selects/transforms columns -#[derive(Clone)] -pub struct ProjectOperator { - columns: Vec, - input_column_names: Vec, - output_column_names: Vec, - tracker: Option>>, - // Internal in-memory connection for expression evaluation - // Programs are very dependent on having a connection, so give it one. - // - // We could in theory pass the current connection, but there are a host of problems with that. - // For example: during a write transaction, where views are usually updated, we have autocommit - // on. When the program we are executing calls Halt, it will try to commit the current - // transaction, which is absolutely incorrect. - // - // There are other ways to solve this, but a read-only connection to an empty in-memory - // database gives us the closest environment we need to execute expressions. - internal_conn: Arc, -} - -impl std::fmt::Debug for ProjectOperator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ProjectOperator") - .field("columns", &self.columns) - .field("input_column_names", &self.input_column_names) - .field("output_column_names", &self.output_column_names) - .field("tracker", &self.tracker) - .finish_non_exhaustive() - } -} - -impl ProjectOperator { - /// Create a new ProjectOperator from a SELECT statement, extracting projection columns - pub fn from_select( - select: &turso_parser::ast::Select, - input_column_names: Vec, - schema: &crate::schema::Schema, - ) -> crate::Result { - // Set up internal connection for expression evaluation - let io = Arc::new(crate::MemoryIO::new()); - let db = Database::open_file( - io, ":memory:", false, // no MVCC needed for expression evaluation - false, // no indexes needed - )?; - let internal_conn = db.connect()?; - // Set to read-only mode and disable auto-commit since we're only evaluating expressions - internal_conn.query_only.set(true); - internal_conn.auto_commit.set(false); - - let temp_syms = SymbolTable::new(); - - // Extract columns from SELECT statement - let columns = if let OneSelect::Select { - columns: ref select_columns, - .. - } = &select.body.select - { - let mut columns = Vec::new(); - for result_col in select_columns { - match result_col { - ResultColumn::Expr(expr, alias) => { - let alias_str = if let Some(As::As(alias_name)) = alias { - Some(alias_name.as_str().to_string()) - } else { - None - }; - // Try to compile the expression (handles both columns and complex expressions) - let compiled = CompiledExpression::compile( - expr, - &input_column_names, - schema, - &temp_syms, - internal_conn.clone(), - )?; - columns.push(ProjectColumn { - expr: (**expr).clone(), - alias: alias_str, - compiled, - }); - } - ResultColumn::Star => { - // Select all columns - create trivial column references - for name in &input_column_names { - // Create an Id expression for the column - let expr = Expr::Id(Name::Ident(name.clone())); - let compiled = CompiledExpression::compile( - &expr, - &input_column_names, - schema, - &temp_syms, - internal_conn.clone(), - )?; - columns.push(ProjectColumn { - expr, - alias: None, - compiled, - }); - } - } - x => { - return Err(crate::LimboError::ParseError(format!( - "Unsupported {x:?} clause when compiling project operator", - ))); - } - } - } - - if columns.is_empty() { - return Err(crate::LimboError::ParseError( - "No columns found when compiling project operator".to_string(), - )); - } - columns - } else { - return Err(crate::LimboError::ParseError( - "Expression is not a valid SELECT expression".to_string(), - )); - }; - - // Generate output column names based on aliases or expressions - let output_column_names = columns - .iter() - .map(|c| { - c.alias.clone().unwrap_or_else(|| match &c.expr { - Expr::Id(name) => name.as_str().to_string(), - Expr::Qualified(table, column) => { - format!("{}.{}", table.as_str(), column.as_str()) - } - Expr::DoublyQualified(db, table, column) => { - format!("{}.{}.{}", db.as_str(), table.as_str(), column.as_str()) - } - _ => c.expr.to_string(), - }) - }) - .collect(); - - Ok(Self { - columns, - input_column_names, - output_column_names, - tracker: None, - internal_conn, - }) - } - - /// Create a ProjectOperator from pre-compiled expressions - pub fn from_compiled( - compiled_exprs: Vec, - aliases: Vec>, - input_column_names: Vec, - output_column_names: Vec, - ) -> crate::Result { - // Set up internal connection for expression evaluation - let io = Arc::new(crate::MemoryIO::new()); - let db = Database::open_file( - io, ":memory:", false, // no MVCC needed for expression evaluation - false, // no indexes needed - )?; - let internal_conn = db.connect()?; - // Set to read-only mode and disable auto-commit since we're only evaluating expressions - internal_conn.query_only.set(true); - internal_conn.auto_commit.set(false); - - // Create ProjectColumn structs from compiled expressions - let columns: Vec = compiled_exprs - .into_iter() - .zip(aliases) - .map(|(compiled, alias)| ProjectColumn { - // Create a placeholder AST expression since we already have the compiled version - expr: turso_parser::ast::Expr::Literal(turso_parser::ast::Literal::Null), - alias, - compiled, - }) - .collect(); - - Ok(Self { - columns, - input_column_names, - output_column_names, - tracker: None, - internal_conn, - }) - } - - /// Get the columns for this projection - pub fn columns(&self) -> &[ProjectColumn] { - &self.columns - } - - fn project_values(&self, values: &[Value]) -> Vec { - let mut output = Vec::new(); - - for col in &self.columns { - // Use the internal connection's pager for expression evaluation - let internal_pager = self.internal_conn.pager.borrow().clone(); - - // Execute the compiled expression (handles both columns and complex expressions) - let result = col - .compiled - .execute(values, internal_pager) - .expect("Failed to execute compiled expression for the Project operator"); - output.push(result); - } - - output - } - - fn evaluate_expression(&self, expr: &turso_parser::ast::Expr, values: &[Value]) -> Value { - match expr { - Expr::Id(name) => { - if let Some(idx) = self - .input_column_names - .iter() - .position(|c| c == name.as_str()) - { - if let Some(v) = values.get(idx) { - return v.clone(); - } - } - Value::Null - } - Expr::Literal(lit) => { - match lit { - Literal::Numeric(n) => { - if let Ok(i) = n.parse::() { - Value::Integer(i) - } else if let Ok(f) = n.parse::() { - Value::Float(f) - } else { - Value::Null - } - } - Literal::String(s) => { - let cleaned = s.trim_matches('\'').trim_matches('"'); - Value::Text(Text::new(cleaned)) - } - Literal::Null => Value::Null, - Literal::Blob(_) - | Literal::Keyword(_) - | Literal::CurrentDate - | Literal::CurrentTime - | Literal::CurrentTimestamp => Value::Null, // Not supported yet - } - } - Expr::Binary(left, op, right) => { - let left_val = self.evaluate_expression(left, values); - let right_val = self.evaluate_expression(right, values); - - match op { - Operator::Add => match (&left_val, &right_val) { - (Value::Integer(a), Value::Integer(b)) => Value::Integer(a + b), - (Value::Float(a), Value::Float(b)) => Value::Float(a + b), - (Value::Integer(a), Value::Float(b)) => Value::Float(*a as f64 + b), - (Value::Float(a), Value::Integer(b)) => Value::Float(a + *b as f64), - _ => Value::Null, - }, - Operator::Subtract => match (&left_val, &right_val) { - (Value::Integer(a), Value::Integer(b)) => Value::Integer(a - b), - (Value::Float(a), Value::Float(b)) => Value::Float(a - b), - (Value::Integer(a), Value::Float(b)) => Value::Float(*a as f64 - b), - (Value::Float(a), Value::Integer(b)) => Value::Float(a - *b as f64), - _ => Value::Null, - }, - Operator::Multiply => match (&left_val, &right_val) { - (Value::Integer(a), Value::Integer(b)) => Value::Integer(a * b), - (Value::Float(a), Value::Float(b)) => Value::Float(a * b), - (Value::Integer(a), Value::Float(b)) => Value::Float(*a as f64 * b), - (Value::Float(a), Value::Integer(b)) => Value::Float(a * *b as f64), - _ => Value::Null, - }, - Operator::Divide => match (&left_val, &right_val) { - (Value::Integer(a), Value::Integer(b)) => { - if *b != 0 { - Value::Integer(a / b) - } else { - Value::Null - } - } - (Value::Float(a), Value::Float(b)) => { - if *b != 0.0 { - Value::Float(a / b) - } else { - Value::Null - } - } - (Value::Integer(a), Value::Float(b)) => { - if *b != 0.0 { - Value::Float(*a as f64 / b) - } else { - Value::Null - } - } - (Value::Float(a), Value::Integer(b)) => { - if *b != 0 { - Value::Float(a / *b as f64) - } else { - Value::Null - } - } - _ => Value::Null, - }, - _ => Value::Null, // Other operators not supported yet - } - } - Expr::FunctionCall { name, args, .. } => { - let name_bytes = name.as_str().as_bytes(); - match_ignore_ascii_case!(match name_bytes { - b"hex" => { - if args.len() == 1 { - let arg_val = self.evaluate_expression(&args[0], values); - match arg_val { - Value::Integer(i) => Value::Text(Text::new(&format!("{i:X}"))), - _ => Value::Null, - } - } else { - Value::Null - } - } - _ => Value::Null, // Other functions not supported yet - }) - } - Expr::Parenthesized(inner) => { - assert!( - inner.len() <= 1, - "Parenthesized expressions with multiple elements are not supported" - ); - if !inner.is_empty() { - self.evaluate_expression(&inner[0], values) - } else { - Value::Null - } - } - _ => Value::Null, // Other expression types not supported yet - } - } -} - -impl IncrementalOperator for ProjectOperator { - fn eval( - &mut self, - state: &mut EvalState, - _cursors: &mut DbspStateCursors, - ) -> Result> { - let delta = match state { - EvalState::Init { deltas } => { - // Project operators only use left_delta, right_delta must be empty - assert!( - deltas.right.is_empty(), - "ProjectOperator expects right_delta to be empty" - ); - std::mem::take(&mut deltas.left) - } - _ => unreachable!( - "ProjectOperator doesn't execute the state machine. Should be in Init state" - ), - }; - - let mut output_delta = Delta::new(); - - for (row, weight) in delta.changes { - if let Some(tracker) = &self.tracker { - tracker.lock().unwrap().record_project(); - } - - let projected = self.project_values(&row.values); - let projected_row = HashableRow::new(row.rowid, projected); - output_delta.changes.push((projected_row, weight)); - } - - *state = EvalState::Done; - Ok(IOResult::Done(output_delta)) - } - - fn commit( - &mut self, - deltas: DeltaPair, - _cursors: &mut DbspStateCursors, - ) -> Result> { - // Project operator only uses left delta, right must be empty - assert!( - deltas.right.is_empty(), - "ProjectOperator expects right delta to be empty in commit" - ); - - let mut output_delta = Delta::new(); - - // Commit the delta to our internal state and build output - for (row, weight) in &deltas.left.changes { - if let Some(tracker) = &self.tracker { - tracker.lock().unwrap().record_project(); - } - let projected = self.project_values(&row.values); - let projected_row = HashableRow::new(row.rowid, projected); - output_delta.changes.push((projected_row, *weight)); - } - - Ok(crate::types::IOResult::Done(output_delta)) - } - - fn set_tracker(&mut self, tracker: Arc>) { - self.tracker = Some(tracker); - } -} - /// Aggregate operator - performs incremental aggregation with GROUP BY /// Maintains running totals/counts that are updated incrementally /// diff --git a/core/incremental/project_operator.rs b/core/incremental/project_operator.rs new file mode 100644 index 000000000..b1d9fc9ed --- /dev/null +++ b/core/incremental/project_operator.rs @@ -0,0 +1,168 @@ +// Project operator for DBSP-style incremental computation +// This operator projects/transforms columns in a relational stream + +use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow}; +use crate::incremental::expr_compiler::CompiledExpression; +use crate::incremental::operator::{ + ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator, +}; +use crate::types::IOResult; +use crate::{Connection, Database, Result, Value}; +use std::sync::{Arc, Mutex}; + +#[derive(Debug, Clone)] +pub struct ProjectColumn { + /// Compiled expression (handles both trivial columns and complex expressions) + pub compiled: CompiledExpression, +} + +/// Project operator - selects/transforms columns +#[derive(Clone)] +pub struct ProjectOperator { + columns: Vec, + input_column_names: Vec, + output_column_names: Vec, + tracker: Option>>, + // Internal in-memory connection for expression evaluation + // Programs are very dependent on having a connection, so give it one. + // + // We could in theory pass the current connection, but there are a host of problems with that. + // For example: during a write transaction, where views are usually updated, we have autocommit + // on. When the program we are executing calls Halt, it will try to commit the current + // transaction, which is absolutely incorrect. + // + // There are other ways to solve this, but a read-only connection to an empty in-memory + // database gives us the closest environment we need to execute expressions. + internal_conn: Arc, +} + +impl std::fmt::Debug for ProjectOperator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ProjectOperator") + .field("columns", &self.columns) + .field("input_column_names", &self.input_column_names) + .field("output_column_names", &self.output_column_names) + .finish() + } +} + +impl ProjectOperator { + /// Create a ProjectOperator from pre-compiled expressions + pub fn from_compiled( + compiled_exprs: Vec, + aliases: Vec>, + input_column_names: Vec, + output_column_names: Vec, + ) -> crate::Result { + // Set up internal connection for expression evaluation + let io = Arc::new(crate::MemoryIO::new()); + let db = Database::open_file( + io, ":memory:", false, // no MVCC needed for expression evaluation + false, // no indexes needed + )?; + let internal_conn = db.connect()?; + // Set to read-only mode and disable auto-commit since we're only evaluating expressions + internal_conn.query_only.set(true); + internal_conn.auto_commit.set(false); + + // Create ProjectColumn structs from compiled expressions + let columns: Vec = compiled_exprs + .into_iter() + .zip(aliases) + .map(|(compiled, _alias)| ProjectColumn { compiled }) + .collect(); + + Ok(Self { + columns, + input_column_names, + output_column_names, + tracker: None, + internal_conn, + }) + } + + fn project_values(&self, values: &[Value]) -> Vec { + let mut output = Vec::new(); + + for col in &self.columns { + // Use the internal connection's pager for expression evaluation + let internal_pager = self.internal_conn.pager.borrow().clone(); + + // Execute the compiled expression (handles both columns and complex expressions) + let result = col + .compiled + .execute(values, internal_pager) + .expect("Failed to execute compiled expression for the Project operator"); + output.push(result); + } + + output + } +} + +impl IncrementalOperator for ProjectOperator { + fn eval( + &mut self, + state: &mut EvalState, + _cursors: &mut DbspStateCursors, + ) -> Result> { + let delta = match state { + EvalState::Init { deltas } => { + // Project operators only use left_delta, right_delta must be empty + assert!( + deltas.right.is_empty(), + "ProjectOperator expects right_delta to be empty" + ); + std::mem::take(&mut deltas.left) + } + _ => unreachable!( + "ProjectOperator doesn't execute the state machine. Should be in Init state" + ), + }; + + let mut output_delta = Delta::new(); + + for (row, weight) in delta.changes { + if let Some(tracker) = &self.tracker { + tracker.lock().unwrap().record_project(); + } + + let projected = self.project_values(&row.values); + let projected_row = HashableRow::new(row.rowid, projected); + output_delta.changes.push((projected_row, weight)); + } + + *state = EvalState::Done; + Ok(IOResult::Done(output_delta)) + } + + fn commit( + &mut self, + deltas: DeltaPair, + _cursors: &mut DbspStateCursors, + ) -> Result> { + // Project operator only uses left delta, right must be empty + assert!( + deltas.right.is_empty(), + "ProjectOperator expects right delta to be empty in commit" + ); + + let mut output_delta = Delta::new(); + + // Commit the delta to our internal state and build output + for (row, weight) in &deltas.left.changes { + if let Some(tracker) = &self.tracker { + tracker.lock().unwrap().record_project(); + } + let projected = self.project_values(&row.values); + let projected_row = HashableRow::new(row.rowid, projected); + output_delta.changes.push((projected_row, *weight)); + } + + Ok(crate::types::IOResult::Done(output_delta)) + } + + fn set_tracker(&mut self, tracker: Arc>) { + self.tracker = Some(tracker); + } +}