mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 17:05:36 +01:00
move the project operator to its own file.
The code is becoming impossible to reason about with everything in operator.rs
This commit is contained in:
@@ -6,4 +6,5 @@ pub mod filter_operator;
|
||||
pub mod input_operator;
|
||||
pub mod operator;
|
||||
pub mod persistence;
|
||||
pub mod project_operator;
|
||||
pub mod view;
|
||||
|
||||
@@ -4,22 +4,18 @@
|
||||
|
||||
pub use crate::incremental::filter_operator::{FilterOperator, FilterPredicate};
|
||||
pub use crate::incremental::input_operator::InputOperator;
|
||||
pub use crate::incremental::project_operator::{ProjectColumn, ProjectOperator};
|
||||
|
||||
use crate::function::{AggFunc, Func};
|
||||
use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow};
|
||||
use crate::incremental::expr_compiler::CompiledExpression;
|
||||
use crate::incremental::persistence::{MinMaxPersistState, ReadRecord, RecomputeMinMax, WriteRow};
|
||||
use crate::schema::{Index, IndexColumn};
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult, Text};
|
||||
use crate::{
|
||||
return_and_restore_if_io, return_if_io, Connection, Database, Result, SymbolTable, Value,
|
||||
};
|
||||
use crate::types::{IOResult, ImmutableRecord, SeekKey, SeekOp, SeekResult};
|
||||
use crate::{return_and_restore_if_io, return_if_io, Result, Value};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::{self, Debug, Display};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use turso_macros::match_ignore_ascii_case;
|
||||
use turso_parser::ast::{As, Expr, Literal, Name, OneSelect, Operator, ResultColumn};
|
||||
|
||||
/// Struct to hold both table and index cursors for DBSP state operations
|
||||
pub struct DbspStateCursors {
|
||||
@@ -795,16 +791,6 @@ pub enum QueryOperator {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProjectColumn {
|
||||
/// The original SQL expression (for debugging/fallback)
|
||||
pub expr: turso_parser::ast::Expr,
|
||||
/// Optional alias for the column
|
||||
pub alias: Option<String>,
|
||||
/// Compiled expression (handles both trivial columns and complex expressions)
|
||||
pub compiled: CompiledExpression,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum JoinType {
|
||||
Inner,
|
||||
@@ -897,410 +883,6 @@ pub trait IncrementalOperator: Debug {
|
||||
fn set_tracker(&mut self, tracker: Arc<Mutex<ComputationTracker>>);
|
||||
}
|
||||
|
||||
/// Project operator - selects/transforms columns
|
||||
#[derive(Clone)]
|
||||
pub struct ProjectOperator {
|
||||
columns: Vec<ProjectColumn>,
|
||||
input_column_names: Vec<String>,
|
||||
output_column_names: Vec<String>,
|
||||
tracker: Option<Arc<Mutex<ComputationTracker>>>,
|
||||
// Internal in-memory connection for expression evaluation
|
||||
// Programs are very dependent on having a connection, so give it one.
|
||||
//
|
||||
// We could in theory pass the current connection, but there are a host of problems with that.
|
||||
// For example: during a write transaction, where views are usually updated, we have autocommit
|
||||
// on. When the program we are executing calls Halt, it will try to commit the current
|
||||
// transaction, which is absolutely incorrect.
|
||||
//
|
||||
// There are other ways to solve this, but a read-only connection to an empty in-memory
|
||||
// database gives us the closest environment we need to execute expressions.
|
||||
internal_conn: Arc<Connection>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ProjectOperator {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ProjectOperator")
|
||||
.field("columns", &self.columns)
|
||||
.field("input_column_names", &self.input_column_names)
|
||||
.field("output_column_names", &self.output_column_names)
|
||||
.field("tracker", &self.tracker)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl ProjectOperator {
|
||||
/// Create a new ProjectOperator from a SELECT statement, extracting projection columns
|
||||
pub fn from_select(
|
||||
select: &turso_parser::ast::Select,
|
||||
input_column_names: Vec<String>,
|
||||
schema: &crate::schema::Schema,
|
||||
) -> crate::Result<Self> {
|
||||
// Set up internal connection for expression evaluation
|
||||
let io = Arc::new(crate::MemoryIO::new());
|
||||
let db = Database::open_file(
|
||||
io, ":memory:", false, // no MVCC needed for expression evaluation
|
||||
false, // no indexes needed
|
||||
)?;
|
||||
let internal_conn = db.connect()?;
|
||||
// Set to read-only mode and disable auto-commit since we're only evaluating expressions
|
||||
internal_conn.query_only.set(true);
|
||||
internal_conn.auto_commit.set(false);
|
||||
|
||||
let temp_syms = SymbolTable::new();
|
||||
|
||||
// Extract columns from SELECT statement
|
||||
let columns = if let OneSelect::Select {
|
||||
columns: ref select_columns,
|
||||
..
|
||||
} = &select.body.select
|
||||
{
|
||||
let mut columns = Vec::new();
|
||||
for result_col in select_columns {
|
||||
match result_col {
|
||||
ResultColumn::Expr(expr, alias) => {
|
||||
let alias_str = if let Some(As::As(alias_name)) = alias {
|
||||
Some(alias_name.as_str().to_string())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Try to compile the expression (handles both columns and complex expressions)
|
||||
let compiled = CompiledExpression::compile(
|
||||
expr,
|
||||
&input_column_names,
|
||||
schema,
|
||||
&temp_syms,
|
||||
internal_conn.clone(),
|
||||
)?;
|
||||
columns.push(ProjectColumn {
|
||||
expr: (**expr).clone(),
|
||||
alias: alias_str,
|
||||
compiled,
|
||||
});
|
||||
}
|
||||
ResultColumn::Star => {
|
||||
// Select all columns - create trivial column references
|
||||
for name in &input_column_names {
|
||||
// Create an Id expression for the column
|
||||
let expr = Expr::Id(Name::Ident(name.clone()));
|
||||
let compiled = CompiledExpression::compile(
|
||||
&expr,
|
||||
&input_column_names,
|
||||
schema,
|
||||
&temp_syms,
|
||||
internal_conn.clone(),
|
||||
)?;
|
||||
columns.push(ProjectColumn {
|
||||
expr,
|
||||
alias: None,
|
||||
compiled,
|
||||
});
|
||||
}
|
||||
}
|
||||
x => {
|
||||
return Err(crate::LimboError::ParseError(format!(
|
||||
"Unsupported {x:?} clause when compiling project operator",
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if columns.is_empty() {
|
||||
return Err(crate::LimboError::ParseError(
|
||||
"No columns found when compiling project operator".to_string(),
|
||||
));
|
||||
}
|
||||
columns
|
||||
} else {
|
||||
return Err(crate::LimboError::ParseError(
|
||||
"Expression is not a valid SELECT expression".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
// Generate output column names based on aliases or expressions
|
||||
let output_column_names = columns
|
||||
.iter()
|
||||
.map(|c| {
|
||||
c.alias.clone().unwrap_or_else(|| match &c.expr {
|
||||
Expr::Id(name) => name.as_str().to_string(),
|
||||
Expr::Qualified(table, column) => {
|
||||
format!("{}.{}", table.as_str(), column.as_str())
|
||||
}
|
||||
Expr::DoublyQualified(db, table, column) => {
|
||||
format!("{}.{}.{}", db.as_str(), table.as_str(), column.as_str())
|
||||
}
|
||||
_ => c.expr.to_string(),
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Self {
|
||||
columns,
|
||||
input_column_names,
|
||||
output_column_names,
|
||||
tracker: None,
|
||||
internal_conn,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a ProjectOperator from pre-compiled expressions
|
||||
pub fn from_compiled(
|
||||
compiled_exprs: Vec<CompiledExpression>,
|
||||
aliases: Vec<Option<String>>,
|
||||
input_column_names: Vec<String>,
|
||||
output_column_names: Vec<String>,
|
||||
) -> crate::Result<Self> {
|
||||
// Set up internal connection for expression evaluation
|
||||
let io = Arc::new(crate::MemoryIO::new());
|
||||
let db = Database::open_file(
|
||||
io, ":memory:", false, // no MVCC needed for expression evaluation
|
||||
false, // no indexes needed
|
||||
)?;
|
||||
let internal_conn = db.connect()?;
|
||||
// Set to read-only mode and disable auto-commit since we're only evaluating expressions
|
||||
internal_conn.query_only.set(true);
|
||||
internal_conn.auto_commit.set(false);
|
||||
|
||||
// Create ProjectColumn structs from compiled expressions
|
||||
let columns: Vec<ProjectColumn> = compiled_exprs
|
||||
.into_iter()
|
||||
.zip(aliases)
|
||||
.map(|(compiled, alias)| ProjectColumn {
|
||||
// Create a placeholder AST expression since we already have the compiled version
|
||||
expr: turso_parser::ast::Expr::Literal(turso_parser::ast::Literal::Null),
|
||||
alias,
|
||||
compiled,
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Self {
|
||||
columns,
|
||||
input_column_names,
|
||||
output_column_names,
|
||||
tracker: None,
|
||||
internal_conn,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the columns for this projection
|
||||
pub fn columns(&self) -> &[ProjectColumn] {
|
||||
&self.columns
|
||||
}
|
||||
|
||||
fn project_values(&self, values: &[Value]) -> Vec<Value> {
|
||||
let mut output = Vec::new();
|
||||
|
||||
for col in &self.columns {
|
||||
// Use the internal connection's pager for expression evaluation
|
||||
let internal_pager = self.internal_conn.pager.borrow().clone();
|
||||
|
||||
// Execute the compiled expression (handles both columns and complex expressions)
|
||||
let result = col
|
||||
.compiled
|
||||
.execute(values, internal_pager)
|
||||
.expect("Failed to execute compiled expression for the Project operator");
|
||||
output.push(result);
|
||||
}
|
||||
|
||||
output
|
||||
}
|
||||
|
||||
fn evaluate_expression(&self, expr: &turso_parser::ast::Expr, values: &[Value]) -> Value {
|
||||
match expr {
|
||||
Expr::Id(name) => {
|
||||
if let Some(idx) = self
|
||||
.input_column_names
|
||||
.iter()
|
||||
.position(|c| c == name.as_str())
|
||||
{
|
||||
if let Some(v) = values.get(idx) {
|
||||
return v.clone();
|
||||
}
|
||||
}
|
||||
Value::Null
|
||||
}
|
||||
Expr::Literal(lit) => {
|
||||
match lit {
|
||||
Literal::Numeric(n) => {
|
||||
if let Ok(i) = n.parse::<i64>() {
|
||||
Value::Integer(i)
|
||||
} else if let Ok(f) = n.parse::<f64>() {
|
||||
Value::Float(f)
|
||||
} else {
|
||||
Value::Null
|
||||
}
|
||||
}
|
||||
Literal::String(s) => {
|
||||
let cleaned = s.trim_matches('\'').trim_matches('"');
|
||||
Value::Text(Text::new(cleaned))
|
||||
}
|
||||
Literal::Null => Value::Null,
|
||||
Literal::Blob(_)
|
||||
| Literal::Keyword(_)
|
||||
| Literal::CurrentDate
|
||||
| Literal::CurrentTime
|
||||
| Literal::CurrentTimestamp => Value::Null, // Not supported yet
|
||||
}
|
||||
}
|
||||
Expr::Binary(left, op, right) => {
|
||||
let left_val = self.evaluate_expression(left, values);
|
||||
let right_val = self.evaluate_expression(right, values);
|
||||
|
||||
match op {
|
||||
Operator::Add => match (&left_val, &right_val) {
|
||||
(Value::Integer(a), Value::Integer(b)) => Value::Integer(a + b),
|
||||
(Value::Float(a), Value::Float(b)) => Value::Float(a + b),
|
||||
(Value::Integer(a), Value::Float(b)) => Value::Float(*a as f64 + b),
|
||||
(Value::Float(a), Value::Integer(b)) => Value::Float(a + *b as f64),
|
||||
_ => Value::Null,
|
||||
},
|
||||
Operator::Subtract => match (&left_val, &right_val) {
|
||||
(Value::Integer(a), Value::Integer(b)) => Value::Integer(a - b),
|
||||
(Value::Float(a), Value::Float(b)) => Value::Float(a - b),
|
||||
(Value::Integer(a), Value::Float(b)) => Value::Float(*a as f64 - b),
|
||||
(Value::Float(a), Value::Integer(b)) => Value::Float(a - *b as f64),
|
||||
_ => Value::Null,
|
||||
},
|
||||
Operator::Multiply => match (&left_val, &right_val) {
|
||||
(Value::Integer(a), Value::Integer(b)) => Value::Integer(a * b),
|
||||
(Value::Float(a), Value::Float(b)) => Value::Float(a * b),
|
||||
(Value::Integer(a), Value::Float(b)) => Value::Float(*a as f64 * b),
|
||||
(Value::Float(a), Value::Integer(b)) => Value::Float(a * *b as f64),
|
||||
_ => Value::Null,
|
||||
},
|
||||
Operator::Divide => match (&left_val, &right_val) {
|
||||
(Value::Integer(a), Value::Integer(b)) => {
|
||||
if *b != 0 {
|
||||
Value::Integer(a / b)
|
||||
} else {
|
||||
Value::Null
|
||||
}
|
||||
}
|
||||
(Value::Float(a), Value::Float(b)) => {
|
||||
if *b != 0.0 {
|
||||
Value::Float(a / b)
|
||||
} else {
|
||||
Value::Null
|
||||
}
|
||||
}
|
||||
(Value::Integer(a), Value::Float(b)) => {
|
||||
if *b != 0.0 {
|
||||
Value::Float(*a as f64 / b)
|
||||
} else {
|
||||
Value::Null
|
||||
}
|
||||
}
|
||||
(Value::Float(a), Value::Integer(b)) => {
|
||||
if *b != 0 {
|
||||
Value::Float(a / *b as f64)
|
||||
} else {
|
||||
Value::Null
|
||||
}
|
||||
}
|
||||
_ => Value::Null,
|
||||
},
|
||||
_ => Value::Null, // Other operators not supported yet
|
||||
}
|
||||
}
|
||||
Expr::FunctionCall { name, args, .. } => {
|
||||
let name_bytes = name.as_str().as_bytes();
|
||||
match_ignore_ascii_case!(match name_bytes {
|
||||
b"hex" => {
|
||||
if args.len() == 1 {
|
||||
let arg_val = self.evaluate_expression(&args[0], values);
|
||||
match arg_val {
|
||||
Value::Integer(i) => Value::Text(Text::new(&format!("{i:X}"))),
|
||||
_ => Value::Null,
|
||||
}
|
||||
} else {
|
||||
Value::Null
|
||||
}
|
||||
}
|
||||
_ => Value::Null, // Other functions not supported yet
|
||||
})
|
||||
}
|
||||
Expr::Parenthesized(inner) => {
|
||||
assert!(
|
||||
inner.len() <= 1,
|
||||
"Parenthesized expressions with multiple elements are not supported"
|
||||
);
|
||||
if !inner.is_empty() {
|
||||
self.evaluate_expression(&inner[0], values)
|
||||
} else {
|
||||
Value::Null
|
||||
}
|
||||
}
|
||||
_ => Value::Null, // Other expression types not supported yet
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IncrementalOperator for ProjectOperator {
|
||||
fn eval(
|
||||
&mut self,
|
||||
state: &mut EvalState,
|
||||
_cursors: &mut DbspStateCursors,
|
||||
) -> Result<IOResult<Delta>> {
|
||||
let delta = match state {
|
||||
EvalState::Init { deltas } => {
|
||||
// Project operators only use left_delta, right_delta must be empty
|
||||
assert!(
|
||||
deltas.right.is_empty(),
|
||||
"ProjectOperator expects right_delta to be empty"
|
||||
);
|
||||
std::mem::take(&mut deltas.left)
|
||||
}
|
||||
_ => unreachable!(
|
||||
"ProjectOperator doesn't execute the state machine. Should be in Init state"
|
||||
),
|
||||
};
|
||||
|
||||
let mut output_delta = Delta::new();
|
||||
|
||||
for (row, weight) in delta.changes {
|
||||
if let Some(tracker) = &self.tracker {
|
||||
tracker.lock().unwrap().record_project();
|
||||
}
|
||||
|
||||
let projected = self.project_values(&row.values);
|
||||
let projected_row = HashableRow::new(row.rowid, projected);
|
||||
output_delta.changes.push((projected_row, weight));
|
||||
}
|
||||
|
||||
*state = EvalState::Done;
|
||||
Ok(IOResult::Done(output_delta))
|
||||
}
|
||||
|
||||
fn commit(
|
||||
&mut self,
|
||||
deltas: DeltaPair,
|
||||
_cursors: &mut DbspStateCursors,
|
||||
) -> Result<IOResult<Delta>> {
|
||||
// Project operator only uses left delta, right must be empty
|
||||
assert!(
|
||||
deltas.right.is_empty(),
|
||||
"ProjectOperator expects right delta to be empty in commit"
|
||||
);
|
||||
|
||||
let mut output_delta = Delta::new();
|
||||
|
||||
// Commit the delta to our internal state and build output
|
||||
for (row, weight) in &deltas.left.changes {
|
||||
if let Some(tracker) = &self.tracker {
|
||||
tracker.lock().unwrap().record_project();
|
||||
}
|
||||
let projected = self.project_values(&row.values);
|
||||
let projected_row = HashableRow::new(row.rowid, projected);
|
||||
output_delta.changes.push((projected_row, *weight));
|
||||
}
|
||||
|
||||
Ok(crate::types::IOResult::Done(output_delta))
|
||||
}
|
||||
|
||||
fn set_tracker(&mut self, tracker: Arc<Mutex<ComputationTracker>>) {
|
||||
self.tracker = Some(tracker);
|
||||
}
|
||||
}
|
||||
|
||||
/// Aggregate operator - performs incremental aggregation with GROUP BY
|
||||
/// Maintains running totals/counts that are updated incrementally
|
||||
///
|
||||
|
||||
168
core/incremental/project_operator.rs
Normal file
168
core/incremental/project_operator.rs
Normal file
@@ -0,0 +1,168 @@
|
||||
// Project operator for DBSP-style incremental computation
|
||||
// This operator projects/transforms columns in a relational stream
|
||||
|
||||
use crate::incremental::dbsp::{Delta, DeltaPair, HashableRow};
|
||||
use crate::incremental::expr_compiler::CompiledExpression;
|
||||
use crate::incremental::operator::{
|
||||
ComputationTracker, DbspStateCursors, EvalState, IncrementalOperator,
|
||||
};
|
||||
use crate::types::IOResult;
|
||||
use crate::{Connection, Database, Result, Value};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProjectColumn {
|
||||
/// Compiled expression (handles both trivial columns and complex expressions)
|
||||
pub compiled: CompiledExpression,
|
||||
}
|
||||
|
||||
/// Project operator - selects/transforms columns
|
||||
#[derive(Clone)]
|
||||
pub struct ProjectOperator {
|
||||
columns: Vec<ProjectColumn>,
|
||||
input_column_names: Vec<String>,
|
||||
output_column_names: Vec<String>,
|
||||
tracker: Option<Arc<Mutex<ComputationTracker>>>,
|
||||
// Internal in-memory connection for expression evaluation
|
||||
// Programs are very dependent on having a connection, so give it one.
|
||||
//
|
||||
// We could in theory pass the current connection, but there are a host of problems with that.
|
||||
// For example: during a write transaction, where views are usually updated, we have autocommit
|
||||
// on. When the program we are executing calls Halt, it will try to commit the current
|
||||
// transaction, which is absolutely incorrect.
|
||||
//
|
||||
// There are other ways to solve this, but a read-only connection to an empty in-memory
|
||||
// database gives us the closest environment we need to execute expressions.
|
||||
internal_conn: Arc<Connection>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ProjectOperator {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ProjectOperator")
|
||||
.field("columns", &self.columns)
|
||||
.field("input_column_names", &self.input_column_names)
|
||||
.field("output_column_names", &self.output_column_names)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl ProjectOperator {
|
||||
/// Create a ProjectOperator from pre-compiled expressions
|
||||
pub fn from_compiled(
|
||||
compiled_exprs: Vec<CompiledExpression>,
|
||||
aliases: Vec<Option<String>>,
|
||||
input_column_names: Vec<String>,
|
||||
output_column_names: Vec<String>,
|
||||
) -> crate::Result<Self> {
|
||||
// Set up internal connection for expression evaluation
|
||||
let io = Arc::new(crate::MemoryIO::new());
|
||||
let db = Database::open_file(
|
||||
io, ":memory:", false, // no MVCC needed for expression evaluation
|
||||
false, // no indexes needed
|
||||
)?;
|
||||
let internal_conn = db.connect()?;
|
||||
// Set to read-only mode and disable auto-commit since we're only evaluating expressions
|
||||
internal_conn.query_only.set(true);
|
||||
internal_conn.auto_commit.set(false);
|
||||
|
||||
// Create ProjectColumn structs from compiled expressions
|
||||
let columns: Vec<ProjectColumn> = compiled_exprs
|
||||
.into_iter()
|
||||
.zip(aliases)
|
||||
.map(|(compiled, _alias)| ProjectColumn { compiled })
|
||||
.collect();
|
||||
|
||||
Ok(Self {
|
||||
columns,
|
||||
input_column_names,
|
||||
output_column_names,
|
||||
tracker: None,
|
||||
internal_conn,
|
||||
})
|
||||
}
|
||||
|
||||
fn project_values(&self, values: &[Value]) -> Vec<Value> {
|
||||
let mut output = Vec::new();
|
||||
|
||||
for col in &self.columns {
|
||||
// Use the internal connection's pager for expression evaluation
|
||||
let internal_pager = self.internal_conn.pager.borrow().clone();
|
||||
|
||||
// Execute the compiled expression (handles both columns and complex expressions)
|
||||
let result = col
|
||||
.compiled
|
||||
.execute(values, internal_pager)
|
||||
.expect("Failed to execute compiled expression for the Project operator");
|
||||
output.push(result);
|
||||
}
|
||||
|
||||
output
|
||||
}
|
||||
}
|
||||
|
||||
impl IncrementalOperator for ProjectOperator {
|
||||
fn eval(
|
||||
&mut self,
|
||||
state: &mut EvalState,
|
||||
_cursors: &mut DbspStateCursors,
|
||||
) -> Result<IOResult<Delta>> {
|
||||
let delta = match state {
|
||||
EvalState::Init { deltas } => {
|
||||
// Project operators only use left_delta, right_delta must be empty
|
||||
assert!(
|
||||
deltas.right.is_empty(),
|
||||
"ProjectOperator expects right_delta to be empty"
|
||||
);
|
||||
std::mem::take(&mut deltas.left)
|
||||
}
|
||||
_ => unreachable!(
|
||||
"ProjectOperator doesn't execute the state machine. Should be in Init state"
|
||||
),
|
||||
};
|
||||
|
||||
let mut output_delta = Delta::new();
|
||||
|
||||
for (row, weight) in delta.changes {
|
||||
if let Some(tracker) = &self.tracker {
|
||||
tracker.lock().unwrap().record_project();
|
||||
}
|
||||
|
||||
let projected = self.project_values(&row.values);
|
||||
let projected_row = HashableRow::new(row.rowid, projected);
|
||||
output_delta.changes.push((projected_row, weight));
|
||||
}
|
||||
|
||||
*state = EvalState::Done;
|
||||
Ok(IOResult::Done(output_delta))
|
||||
}
|
||||
|
||||
fn commit(
|
||||
&mut self,
|
||||
deltas: DeltaPair,
|
||||
_cursors: &mut DbspStateCursors,
|
||||
) -> Result<IOResult<Delta>> {
|
||||
// Project operator only uses left delta, right must be empty
|
||||
assert!(
|
||||
deltas.right.is_empty(),
|
||||
"ProjectOperator expects right delta to be empty in commit"
|
||||
);
|
||||
|
||||
let mut output_delta = Delta::new();
|
||||
|
||||
// Commit the delta to our internal state and build output
|
||||
for (row, weight) in &deltas.left.changes {
|
||||
if let Some(tracker) = &self.tracker {
|
||||
tracker.lock().unwrap().record_project();
|
||||
}
|
||||
let projected = self.project_values(&row.values);
|
||||
let projected_row = HashableRow::new(row.rowid, projected);
|
||||
output_delta.changes.push((projected_row, *weight));
|
||||
}
|
||||
|
||||
Ok(crate::types::IOResult::Done(output_delta))
|
||||
}
|
||||
|
||||
fn set_tracker(&mut self, tracker: Arc<Mutex<ComputationTracker>>) {
|
||||
self.tracker = Some(tracker);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user