Merge 'Change views to use DBSP circuits' from Glauber Costa
Instead of using static elements, use a dynamically generated DBSP circuit to keep views up to date. The DBSP circuit is generated from the logical plan, which for now supports only what is needed to build the circuit. The state of the view is still kept inside the IncrementalView rather than materialized at the operator level; as a consequence, this still depends on always populating the view at startup. Fixing that is the next step. Closes #2815
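For orientation, a minimal sketch of the flow this change introduces, condensed from try_compile_circuit and merge_delta in the diff below (compile_view and apply_delta are illustrative helper names, not part of the patch): the view's SELECT is built into a logical plan, compiled into a DbspCircuit, and each per-table Delta is then executed through and committed to that circuit.

// Illustrative sketch only; types (DbspCircuit, DbspCompiler, DeltaSet, Delta,
// LogicalPlanBuilder) come from core/incremental and core/translate/logical.
fn compile_view(select: &ast::Select, schema: &Schema) -> Result<DbspCircuit> {
    // SELECT -> logical plan -> DBSP circuit
    let mut builder = LogicalPlanBuilder::new(schema);
    let plan = builder.build_statement(&ast::Stmt::Select(select.clone()))?;
    DbspCompiler::new().compile(&plan)
}

fn apply_delta(circuit: &mut DbspCircuit, table: &str, delta: Delta) -> Result<Delta> {
    // Push the per-table delta through the circuit, then commit it so the
    // circuit's internal operator state advances with the change.
    let mut input = std::collections::HashMap::new();
    input.insert(table.to_string(), delta);
    let output = circuit.execute(input.clone(), DeltaSet::empty())?;
    circuit.commit(input)?;
    Ok(output) // the IncrementalView merges this output into its stored records
}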
core/incremental/compiler.rs (new file, 2922 lines; diff suppressed because it is too large)
@@ -1,3 +1,4 @@
pub mod compiler;
pub mod dbsp;
pub mod expr_compiler;
pub mod hashable_row;

File diff suppressed because it is too large
@@ -1,13 +1,12 @@
use super::compiler::{DbspCircuit, DbspCompiler, DeltaSet};
use super::dbsp::{RowKeyStream, RowKeyZSet};
use super::operator::{
AggregateFunction, AggregateOperator, ComputationTracker, Delta, FilterOperator,
FilterPredicate, IncrementalOperator, ProjectOperator,
};
use super::operator::{ComputationTracker, Delta, FilterPredicate};
use crate::schema::{BTreeTable, Column, Schema};
use crate::translate::logical::LogicalPlanBuilder;
use crate::types::{IOCompletions, IOResult, Value};
use crate::util::{extract_column_name_from_expr, extract_view_columns};
use crate::util::extract_view_columns;
use crate::{io_yield_one, Completion, LimboError, Result, Statement};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::{Arc, Mutex};
use turso_parser::ast;
@@ -60,8 +59,7 @@ pub struct ViewTransactionState {
/// for large aggregations, because then we don't have to re-compute when opening the database
/// again.
///
/// Right now we are supporting the simplest views by keeping the operators in the view and
/// applying them in a sane order. But the general solution would turn this into a DBSP circuit.
/// Uses DBSP circuits for incremental computation.
#[derive(Debug)]
pub struct IncrementalView {
// Stream of row keys for this view
@@ -75,12 +73,11 @@ pub struct IncrementalView {
// The SELECT statement that defines how to transform input data
pub select_stmt: ast::Select,

// Internal filter operator for predicate evaluation
filter_operator: Option<FilterOperator>,
// Internal project operator for value transformation
project_operator: Option<ProjectOperator>,
// Internal aggregate operator for GROUP BY and aggregations
aggregate_operator: Option<AggregateOperator>,
// DBSP circuit that encapsulates the computation
circuit: DbspCircuit,
// Track whether circuit has been initialized with data
circuit_initialized: bool,

// Tables referenced by this view (extracted from FROM clause and JOINs)
base_table: Arc<BTreeTable>,
// The view's output columns with their types
@@ -108,6 +105,25 @@ impl IncrementalView {
Ok(())
}

/// Try to compile the SELECT statement into a DBSP circuit
fn try_compile_circuit(
select: &ast::Select,
schema: &Schema,
_base_table: &Arc<BTreeTable>,
) -> Result<DbspCircuit> {
// Build the logical plan from the SELECT statement
let mut builder = LogicalPlanBuilder::new(schema);
// Convert Select to a Stmt for the builder
let stmt = ast::Stmt::Select(select.clone());
let logical_plan = builder.build_statement(&stmt)?;

// Compile the logical plan to a DBSP circuit
let compiler = DbspCompiler::new();
let circuit = compiler.compile(&logical_plan)?;

Ok(circuit)
}

/// Get an iterator over column names, using enumerated naming for unnamed columns
pub fn column_names(&self) -> impl Iterator<Item = String> + '_ {
self.columns.iter().enumerate().map(|(i, col)| {
@@ -136,14 +152,6 @@ impl IncrementalView {
false
}

/// Apply filter operator to check if values pass the view's WHERE clause
fn apply_filter(&self, values: &[Value]) -> bool {
if let Some(ref filter_op) = self.filter_operator {
filter_op.evaluate_predicate(values)
} else {
true
}
}
pub fn from_sql(sql: &str, schema: &Schema) -> Result<Self> {
let mut parser = Parser::new(sql.as_bytes());
let cmd = parser.next_cmd()?;
@@ -173,10 +181,6 @@ impl IncrementalView {
// Extract output columns using the shared function
let view_columns = extract_view_columns(&select, schema);

// Extract GROUP BY columns and aggregate functions
let (group_by_columns, aggregate_functions, _old_output_names) =
Self::extract_aggregation_info(&select);

let (join_tables, join_condition) = Self::extract_join_info(&select);
if join_tables.is_some() || join_condition.is_some() {
return Err(LimboError::ParseError(
@@ -199,105 +203,43 @@ impl IncrementalView {
));
};

let base_table_column_names = base_table
.columns
.iter()
.enumerate()
.map(|(i, col)| col.name.clone().unwrap_or_else(|| format!("column_{i}")))
.collect();

Self::new(
name,
Vec::new(), // Empty initial data
where_predicate,
select.clone(),
base_table,
base_table_column_names,
view_columns,
group_by_columns,
aggregate_functions,
schema,
)
}

#[allow(clippy::too_many_arguments)]
pub fn new(
name: String,
initial_data: Vec<(i64, Vec<Value>)>,
where_predicate: FilterPredicate,
select_stmt: ast::Select,
base_table: Arc<BTreeTable>,
base_table_column_names: Vec<String>,
columns: Vec<Column>,
group_by_columns: Vec<String>,
aggregate_functions: Vec<AggregateFunction>,
schema: &Schema,
) -> Result<Self> {
let mut records = BTreeMap::new();

for (row_key, values) in initial_data {
records.insert(row_key, values);
}

// Create initial stream with row keys
let mut zset = RowKeyZSet::new();
for (row_key, values) in &records {
use crate::incremental::hashable_row::HashableRow;
let row = HashableRow::new(*row_key, values.clone());
zset.insert(row, 1);
}
let records = BTreeMap::new();

// Create the tracker that will be shared by all operators
let tracker = Arc::new(Mutex::new(ComputationTracker::new()));

// Create filter operator if we have a predicate
let filter_operator = if !matches!(where_predicate, FilterPredicate::None) {
let mut filter_op =
FilterOperator::new(where_predicate.clone(), base_table_column_names.clone());
filter_op.set_tracker(tracker.clone());
Some(filter_op)
} else {
None
};
// Compile the SELECT statement into a DBSP circuit
let circuit = Self::try_compile_circuit(&select_stmt, schema, &base_table)?;

// Check if this is an aggregated view
let is_aggregated = !group_by_columns.is_empty() || !aggregate_functions.is_empty();

// Create aggregate operator if needed
let aggregate_operator = if is_aggregated {
let mut agg_op = AggregateOperator::new(
group_by_columns,
aggregate_functions,
base_table_column_names.clone(),
);
agg_op.set_tracker(tracker.clone());
Some(agg_op)
} else {
None
};

// Only create project operator for non-aggregated views
let project_operator = if !is_aggregated {
let mut proj_op = ProjectOperator::from_select(
&select_stmt,
base_table_column_names.clone(),
schema,
)?;
proj_op.set_tracker(tracker.clone());
Some(proj_op)
} else {
None
};
// Circuit will be initialized when we first call merge_delta
let circuit_initialized = false;

Ok(Self {
stream: RowKeyStream::from_zset(zset),
stream: RowKeyStream::from_zset(RowKeyZSet::new()),
name,
records,
where_predicate,
select_stmt,
filter_operator,
project_operator,
aggregate_operator,
circuit,
circuit_initialized,
base_table,
columns,
populate_state: PopulateState::Start,
@@ -338,46 +280,28 @@ impl IncrementalView {
// Get the base table from referenced tables
let table = &self.base_table;

// Build column list for SELECT clause
let select_columns = if let Some(ref project_op) = self.project_operator {
// Get the columns used by the projection operator
let mut columns = Vec::new();
for col in project_op.columns() {
// Check if it's a simple column reference
if let turso_parser::ast::Expr::Id(name) = &col.expr {
columns.push(name.as_str().to_string());
} else {
// For expressions, we need all columns (for now)
columns.clear();
columns.push("*".to_string());
break;
}
}
if columns.is_empty() || columns.contains(&"*".to_string()) {
"*".to_string()
} else {
// Add the columns and always include rowid
columns.join(", ").to_string()
}
} else {
// No projection, use all columns
// Check if the table has a rowid alias (INTEGER PRIMARY KEY column)
let has_rowid_alias = table.columns.iter().any(|col| col.is_rowid_alias);

// For now, select all columns since we don't have the static operators
// The circuit will handle filtering and projection
// If there's a rowid alias, we don't need to select rowid separately
let select_clause = if has_rowid_alias {
"*".to_string()
} else {
"*, rowid".to_string()
};

// Build WHERE clause from filter operator
let where_clause = if let Some(ref filter_op) = self.filter_operator {
self.build_where_clause(filter_op.predicate())?
} else {
String::new()
};
// Build WHERE clause from the where_predicate
let where_clause = self.build_where_clause(&self.where_predicate)?;

// Construct the final query
let query = if where_clause.is_empty() {
format!("SELECT {}, rowid FROM {}", select_columns, table.name)
format!("SELECT {} FROM {}", select_clause, table.name)
} else {
format!(
"SELECT {}, rowid FROM {} WHERE {}",
select_columns, table.name, where_clause
"SELECT {} FROM {} WHERE {}",
select_clause, table.name, where_clause
)
};
Ok(query)
@@ -494,20 +418,40 @@ impl IncrementalView {
let all_values: Vec<crate::types::Value> =
row.get_values().cloned().collect();

// The last value should be the rowid
let rowid = match all_values.last() {
Some(crate::types::Value::Integer(id)) => *id,
_ => {
// This shouldn't happen - rowid must be an integer
*rows_processed += 1;
batch_count += 1;
continue;
}
// Determine how to extract the rowid
// If there's a rowid alias (INTEGER PRIMARY KEY), the rowid is one of the columns
// Otherwise, it's the last value we explicitly selected
let (rowid, values) = if let Some((idx, _)) =
self.base_table.get_rowid_alias_column()
{
// The rowid is the value at the rowid alias column index
let rowid = match all_values.get(idx) {
Some(crate::types::Value::Integer(id)) => *id,
_ => {
// This shouldn't happen - rowid alias must be an integer
*rows_processed += 1;
batch_count += 1;
continue;
}
};
// All values are table columns (no separate rowid was selected)
(rowid, all_values)
} else {
// The last value is the explicitly selected rowid
let rowid = match all_values.last() {
Some(crate::types::Value::Integer(id)) => *id,
_ => {
// This shouldn't happen - rowid must be an integer
*rows_processed += 1;
batch_count += 1;
continue;
}
};
// Get all values except the rowid
let values = all_values[..all_values.len() - 1].to_vec();
(rowid, values)
};

// Get all values except the rowid
let values = all_values[..all_values.len() - 1].to_vec();

// Add to batch delta - let merge_delta handle filtering and aggregation
batch_delta.insert(rowid, values);
@@ -542,120 +486,6 @@ impl IncrementalView {
}
}

/// Extract GROUP BY columns and aggregate functions from SELECT statement
fn extract_aggregation_info(
select: &ast::Select,
) -> (Vec<String>, Vec<AggregateFunction>, Vec<String>) {
use turso_parser::ast::*;

let mut group_by_columns = Vec::new();
let mut aggregate_functions = Vec::new();
let mut output_column_names = Vec::new();

if let OneSelect::Select {
ref group_by,
ref columns,
..
} = select.body.select
{
// Extract GROUP BY columns
if let Some(group_by) = group_by {
for expr in &group_by.exprs {
if let Some(col_name) = extract_column_name_from_expr(expr) {
group_by_columns.push(col_name);
}
}
}

// Extract aggregate functions and column names/aliases from SELECT list
for result_col in columns {
match result_col {
ResultColumn::Expr(expr, alias) => {
// Extract aggregate functions
let mut found_aggregates = Vec::new();
Self::extract_aggregates_from_expr(expr, &mut found_aggregates);

// Determine the output column name
let col_name = if let Some(As::As(alias_name)) = alias {
// Use the provided alias
alias_name.as_str().to_string()
} else if !found_aggregates.is_empty() {
// Use the default name from the aggregate function
found_aggregates[0].default_output_name()
} else if let Some(name) = extract_column_name_from_expr(expr) {
// Use the column name
name
} else {
// Fallback to a generic name
format!("column{}", output_column_names.len() + 1)
};

output_column_names.push(col_name);
aggregate_functions.extend(found_aggregates);
}
ResultColumn::Star => {
// For SELECT *, we'd need to know the base table columns
// This is handled elsewhere
}
ResultColumn::TableStar(_) => {
// Similar to Star, but for a specific table
}
}
}
}

(group_by_columns, aggregate_functions, output_column_names)
}

/// Recursively extract aggregate functions from an expression
fn extract_aggregates_from_expr(
expr: &ast::Expr,
aggregate_functions: &mut Vec<AggregateFunction>,
) {
use crate::function::Func;
use turso_parser::ast::*;

match expr {
// Handle COUNT(*) and similar aggregate functions with *
Expr::FunctionCallStar { name, .. } => {
// FunctionCallStar is typically COUNT(*), which has 0 args
if let Ok(func) = Func::resolve_function(name.as_str(), 0) {
// Use the centralized mapping from operator.rs
// For COUNT(*), we pass None as the input column
if let Some(agg_func) = AggregateFunction::from_sql_function(&func, None) {
aggregate_functions.push(agg_func);
}
}
}
Expr::FunctionCall { name, args, .. } => {
// Regular function calls with arguments
let arg_count = args.len();

if let Ok(func) = Func::resolve_function(name.as_str(), arg_count) {
// Extract the input column if there's an argument
let input_column = if arg_count > 0 {
args.first().and_then(extract_column_name_from_expr)
} else {
None
};

// Use the centralized mapping from operator.rs
if let Some(agg_func) =
AggregateFunction::from_sql_function(&func, input_column)
{
aggregate_functions.push(agg_func);
}
}
}
// Recursively check binary expressions, etc.
Expr::Binary(left, _, right) => {
Self::extract_aggregates_from_expr(left, aggregate_functions);
Self::extract_aggregates_from_expr(right, aggregate_functions);
}
_ => {}
}
}

/// Extract JOIN information from SELECT statement
#[allow(clippy::type_complexity)]
pub fn extract_join_info(
@@ -743,48 +573,36 @@ impl IncrementalView {
/// Get current data merged with transaction state
pub fn current_data(&self, tx_state: Option<&ViewTransactionState>) -> Vec<(i64, Vec<Value>)> {
// Start with committed records

if let Some(tx_state) = tx_state {
// processed_delta = input delta for now. Need to apply operations
let processed_delta = &tx_state.delta;
// Use circuit to process uncommitted changes
let mut uncommitted = DeltaSet::new();
uncommitted.insert(self.base_table.name.clone(), tx_state.delta.clone());

// For non-aggregation views, merge the processed delta with committed records
let mut result_map: BTreeMap<i64, Vec<Value>> = self.records.clone();

for (row, weight) in &processed_delta.changes {
if *weight > 0 && self.apply_filter(&row.values) {
result_map.insert(row.rowid, row.values.clone());
} else if *weight < 0 {
result_map.remove(&row.rowid);
// Execute with uncommitted changes (won't affect circuit state)
match self.circuit.execute(HashMap::new(), uncommitted) {
Ok(processed_delta) => {
// Merge processed delta with committed records
let mut result_map: BTreeMap<i64, Vec<Value>> = self.records.clone();
for (row, weight) in &processed_delta.changes {
if *weight > 0 {
result_map.insert(row.rowid, row.values.clone());
} else if *weight < 0 {
result_map.remove(&row.rowid);
}
}
result_map.into_iter().collect()
}
Err(e) => {
// Return error or panic - no fallback
panic!("Failed to execute circuit with uncommitted data: {e:?}");
}
}

result_map.into_iter().collect()
} else {
// No transaction state: return committed records
self.records.clone().into_iter().collect()
}
}

/// Apply filter operator to a delta if present
fn apply_filter_to_delta(&mut self, delta: Delta) -> Delta {
if let Some(ref mut filter_op) = self.filter_operator {
filter_op.process_delta(delta)
} else {
delta
}
}

/// Apply aggregation operator to a delta if this is an aggregated view
fn apply_aggregation_to_delta(&mut self, delta: Delta) -> Delta {
if let Some(ref mut agg_op) = self.aggregate_operator {
agg_op.process_delta(delta)
} else {
delta
}
}

/// Merge a delta of changes into the view's current state
pub fn merge_delta(&mut self, delta: &Delta) {
// Early return if delta is empty
@@ -792,16 +610,33 @@ impl IncrementalView {
return;
}

// Apply operators in pipeline
let mut current_delta = delta.clone();
current_delta = self.apply_filter_to_delta(current_delta);
// Use the circuit to process the delta
let mut input_data = HashMap::new();
input_data.insert(self.base_table.name.clone(), delta.clone());

// Apply projection operator if present (for non-aggregated views)
if let Some(ref mut project_op) = self.project_operator {
current_delta = project_op.process_delta(current_delta);
// If circuit hasn't been initialized yet, initialize it first
// This happens during populate_from_table
if !self.circuit_initialized {
// Initialize the circuit with empty state
self.circuit
.initialize(HashMap::new())
.expect("Failed to initialize circuit");
self.circuit_initialized = true;
}

current_delta = self.apply_aggregation_to_delta(current_delta);
// Execute the circuit to process the delta
let current_delta = match self.circuit.execute(input_data.clone(), DeltaSet::empty()) {
Ok(output) => {
// Commit the changes to the circuit's internal state
self.circuit
.commit(input_data)
.expect("Failed to commit to circuit");
output
}
Err(e) => {
panic!("Failed to execute circuit: {e:?}");
}
};

// Update records and stream with the processed delta
let mut zset_delta = RowKeyZSet::new();
@@ -819,441 +654,3 @@ impl IncrementalView {
self.stream.apply_delta(&zset_delta);
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::incremental::operator::{Delta, IncrementalOperator};
use crate::schema::{BTreeTable, Column, Schema, Type};
use crate::types::Value;
use std::sync::Arc;
fn create_test_schema() -> Schema {
let mut schema = Schema::new(false);
let table = BTreeTable {
root_page: 1,
name: "t".to_string(),
columns: vec![
Column {
name: Some("a".to_string()),
ty: Type::Integer,
ty_str: "INTEGER".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
},
Column {
name: Some("b".to_string()),
ty: Type::Integer,
ty_str: "INTEGER".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
},
Column {
name: Some("c".to_string()),
ty: Type::Integer,
ty_str: "INTEGER".to_string(),
primary_key: false,
is_rowid_alias: false,
notnull: false,
default: None,
unique: false,
collation: None,
hidden: false,
},
],
primary_key_columns: vec![],
has_rowid: true,
is_strict: false,
unique_sets: None,
};
schema.add_btree_table(Arc::new(table));
schema
}

#[test]
fn test_projection_simple_columns() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, b FROM t";

let view = IncrementalView::from_sql(sql, &schema).unwrap();

assert!(view.project_operator.is_some());
let project_op = view.project_operator.as_ref().unwrap();

let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(10), Value::Integer(20), Value::Integer(30)],
);

let mut temp_project = project_op.clone();
temp_project.initialize(delta);
let result = temp_project.get_current_state();

let (output, _weight) = result.changes.first().unwrap();
assert_eq!(output.values, vec![Value::Integer(10), Value::Integer(20)]);
}

#[test]
fn test_projection_arithmetic_expression() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a * 2 as doubled FROM t";

let view = IncrementalView::from_sql(sql, &schema).unwrap();

assert!(view.project_operator.is_some());
let project_op = view.project_operator.as_ref().unwrap();

let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(4), Value::Integer(2), Value::Integer(0)],
);

let mut temp_project = project_op.clone();
temp_project.initialize(delta);
let result = temp_project.get_current_state();

let (output, _weight) = result.changes.first().unwrap();
assert_eq!(output.values, vec![Value::Integer(8)]);
}

#[test]
fn test_projection_multiple_expressions() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a + b as sum, a - b as diff, c FROM t";

let view = IncrementalView::from_sql(sql, &schema).unwrap();

assert!(view.project_operator.is_some());
let project_op = view.project_operator.as_ref().unwrap();

let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(10), Value::Integer(3), Value::Integer(7)],
);

let mut temp_project = project_op.clone();
temp_project.initialize(delta);
let result = temp_project.get_current_state();

let (output, _weight) = result.changes.first().unwrap();
assert_eq!(
output.values,
vec![Value::Integer(13), Value::Integer(7), Value::Integer(7),]
);
}

#[test]
fn test_projection_function_call() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT abs(a - 300) as abs_diff, b FROM t";

let view = IncrementalView::from_sql(sql, &schema).unwrap();

assert!(view.project_operator.is_some());
let project_op = view.project_operator.as_ref().unwrap();

let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(255), Value::Integer(20), Value::Integer(30)],
);

let mut temp_project = project_op.clone();
temp_project.initialize(delta);
let result = temp_project.get_current_state();

let (output, _weight) = result.changes.first().unwrap();
// abs(255 - 300) = abs(-45) = 45
assert_eq!(output.values, vec![Value::Integer(45), Value::Integer(20),]);
}

#[test]
fn test_projection_mixed_columns_and_expressions() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, b * 2 as doubled, c, a + b + c as total FROM t";

let view = IncrementalView::from_sql(sql, &schema).unwrap();

assert!(view.project_operator.is_some());
let project_op = view.project_operator.as_ref().unwrap();

let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(1), Value::Integer(5), Value::Integer(3)],
);

let mut temp_project = project_op.clone();
temp_project.initialize(delta);
let result = temp_project.get_current_state();

let (output, _weight) = result.changes.first().unwrap();
assert_eq!(
output.values,
vec![
Value::Integer(1),
Value::Integer(10),
Value::Integer(3),
Value::Integer(9),
]
);
}

#[test]
fn test_projection_complex_expression() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT (a * 2) + (b * 3) as weighted, c / 2 as half FROM t";

let view = IncrementalView::from_sql(sql, &schema).unwrap();

assert!(view.project_operator.is_some());
let project_op = view.project_operator.as_ref().unwrap();

let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(5), Value::Integer(2), Value::Integer(10)],
);

let mut temp_project = project_op.clone();
temp_project.initialize(delta);
let result = temp_project.get_current_state();

let (output, _weight) = result.changes.first().unwrap();
assert_eq!(output.values, vec![Value::Integer(16), Value::Integer(5),]);
}

#[test]
fn test_projection_with_where_clause() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, a * 2 as doubled FROM t WHERE b > 2";

let view = IncrementalView::from_sql(sql, &schema).unwrap();

assert!(view.project_operator.is_some());
assert!(view.filter_operator.is_some());

let project_op = view.project_operator.as_ref().unwrap();

let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(4), Value::Integer(3), Value::Integer(0)],
);

let mut temp_project = project_op.clone();
temp_project.initialize(delta);
let result = temp_project.get_current_state();

let (output, _weight) = result.changes.first().unwrap();
assert_eq!(output.values, vec![Value::Integer(4), Value::Integer(8),]);
}

#[test]
fn test_projection_more_output_columns_than_input() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, b, a * 2 as doubled_a, b * 3 as tripled_b, a + b as sum, hex(c) as hex_c FROM t";

let view = IncrementalView::from_sql(sql, &schema).unwrap();

assert!(view.project_operator.is_some());
let project_op = view.project_operator.as_ref().unwrap();

let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(5), Value::Integer(2), Value::Integer(15)],
);

let mut temp_project = project_op.clone();
temp_project.initialize(delta);
let result = temp_project.get_current_state();

let (output, _weight) = result.changes.first().unwrap();
// 3 input columns -> 6 output columns
assert_eq!(
output.values,
vec![
Value::Integer(5), // a
Value::Integer(2), // b
Value::Integer(10), // a * 2
Value::Integer(6), // b * 3
Value::Integer(7), // a + b
Value::Text("3135".into()), // hex(15) - SQLite converts to string "15" then hex encodes
]
);
}

#[test]
fn test_aggregation_count_with_group_by() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, COUNT(*) FROM t GROUP BY a";

let mut view = IncrementalView::from_sql(sql, &schema).unwrap();

// Verify the view has an aggregate operator
assert!(view.aggregate_operator.is_some());

// Insert some test data
let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(1), Value::Integer(10), Value::Integer(100)],
);
delta.insert(
2,
vec![Value::Integer(2), Value::Integer(20), Value::Integer(200)],
);
delta.insert(
3,
vec![Value::Integer(1), Value::Integer(30), Value::Integer(300)],
);

// Process the delta
view.merge_delta(&delta);

// Verify we only processed the 3 rows we inserted
assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 3);

// Check the aggregated results
let results = view.current_data(None);

// Should have 2 groups: a=1 with count=2, a=2 with count=1
assert_eq!(results.len(), 2);

// Find the group with a=1
let group1 = results
.iter()
.find(|(_, vals)| vals[0] == Value::Integer(1))
.unwrap();
assert_eq!(group1.1[0], Value::Integer(1)); // a=1
assert_eq!(group1.1[1], Value::Integer(2)); // COUNT(*)=2

// Find the group with a=2
let group2 = results
.iter()
.find(|(_, vals)| vals[0] == Value::Integer(2))
.unwrap();
assert_eq!(group2.1[0], Value::Integer(2)); // a=2
assert_eq!(group2.1[1], Value::Integer(1)); // COUNT(*)=1
}

#[test]
fn test_aggregation_sum_with_filter() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT SUM(b) FROM t WHERE a > 1";

let mut view = IncrementalView::from_sql(sql, &schema).unwrap();

assert!(view.aggregate_operator.is_some());
assert!(view.filter_operator.is_some());

let mut delta = Delta::new();
delta.insert(
1,
vec![Value::Integer(1), Value::Integer(10), Value::Integer(100)],
);
delta.insert(
2,
vec![Value::Integer(2), Value::Integer(20), Value::Integer(200)],
);
delta.insert(
3,
vec![Value::Integer(3), Value::Integer(30), Value::Integer(300)],
);

view.merge_delta(&delta);

// Should filter all 3 rows
assert_eq!(view.tracker.lock().unwrap().filter_evaluations, 3);
// But only aggregate the 2 that passed the filter (a > 1)
assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 2);

let results = view.current_data(None);

// Should have 1 row with sum of b where a > 1
assert_eq!(results.len(), 1);
assert_eq!(results[0].1[0], Value::Integer(50)); // SUM(b) = 20 + 30
}

#[test]
fn test_aggregation_incremental_updates() {
let schema = create_test_schema();
let sql = "CREATE MATERIALIZED VIEW v AS SELECT a, COUNT(*), SUM(b) FROM t GROUP BY a";

let mut view = IncrementalView::from_sql(sql, &schema).unwrap();

// Initial insert
let mut delta1 = Delta::new();
delta1.insert(
1,
vec![Value::Integer(1), Value::Integer(10), Value::Integer(100)],
);
delta1.insert(
2,
vec![Value::Integer(1), Value::Integer(20), Value::Integer(200)],
);

view.merge_delta(&delta1);

// Verify we processed exactly 2 rows for the first batch
assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 2);

// Check initial state
let results1 = view.current_data(None);
assert_eq!(results1.len(), 1);
assert_eq!(results1[0].1[1], Value::Integer(2)); // COUNT(*)=2
assert_eq!(results1[0].1[2], Value::Integer(30)); // SUM(b)=30

// Reset counter to track second batch separately
view.tracker.lock().unwrap().aggregation_updates = 0;

// Add more data
let mut delta2 = Delta::new();
delta2.insert(
3,
vec![Value::Integer(1), Value::Integer(5), Value::Integer(300)],
);
delta2.insert(
4,
vec![Value::Integer(2), Value::Integer(15), Value::Integer(400)],
);

view.merge_delta(&delta2);

// Should only process the 2 new rows, not recompute everything
assert_eq!(view.tracker.lock().unwrap().aggregation_updates, 2);

// Check updated state
let results2 = view.current_data(None);
assert_eq!(results2.len(), 2);

// Group a=1
let group1 = results2
.iter()
.find(|(_, vals)| vals[0] == Value::Integer(1))
.unwrap();
assert_eq!(group1.1[1], Value::Integer(3)); // COUNT(*)=3
assert_eq!(group1.1[2], Value::Integer(35)); // SUM(b)=35

// Group a=2
let group2 = results2
.iter()
.find(|(_, vals)| vals[0] == Value::Integer(2))
.unwrap();
assert_eq!(group2.1[1], Value::Integer(1)); // COUNT(*)=1
assert_eq!(group2.1[2], Value::Integer(15)); // SUM(b)=15
}
}
core/translate/logical.rs (new file, 3076 lines; diff suppressed because it is too large)
@@ -21,6 +21,7 @@ pub(crate) mod group_by;
pub(crate) mod index;
pub(crate) mod insert;
pub(crate) mod integrity_check;
pub(crate) mod logical;
pub(crate) mod main_loop;
pub(crate) mod optimizer;
pub(crate) mod order_by;
@@ -385,3 +385,174 @@ do_execsql_test_on_specific_db {:memory:} matview-projections {
SELECT * from v;
} {4|3|7|22|3
3|4|7|22|3}

do_execsql_test_on_specific_db {:memory:} matview-rollback-insert {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t VALUES (1, 10), (2, 20), (3, 30);

CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 15;

SELECT * FROM v ORDER BY a;

BEGIN;
INSERT INTO t VALUES (4, 40), (5, 50);
SELECT * FROM v ORDER BY a;
ROLLBACK;

SELECT * FROM v ORDER BY a;
} {2|20
3|30
2|20
3|30
4|40
5|50
2|20
3|30}

do_execsql_test_on_specific_db {:memory:} matview-rollback-delete {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t VALUES (1, 10), (2, 20), (3, 30), (4, 40);

CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 15;

SELECT * FROM v ORDER BY a;

BEGIN;
DELETE FROM t WHERE a IN (2, 3);
SELECT * FROM v ORDER BY a;
ROLLBACK;

SELECT * FROM v ORDER BY a;
} {2|20
3|30
4|40
4|40
2|20
3|30
4|40}

do_execsql_test_on_specific_db {:memory:} matview-rollback-update {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t VALUES (1, 10), (2, 20), (3, 30);

CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 15;

SELECT * FROM v ORDER BY a;

BEGIN;
UPDATE t SET b = 5 WHERE a = 2;
UPDATE t SET b = 35 WHERE a = 1;
SELECT * FROM v ORDER BY a;
ROLLBACK;

SELECT * FROM v ORDER BY a;
} {2|20
3|30
1|35
3|30
2|20
3|30}

do_execsql_test_on_specific_db {:memory:} matview-rollback-aggregation {
CREATE TABLE sales(product_id INTEGER, amount INTEGER);
INSERT INTO sales VALUES (1, 100), (1, 200), (2, 150), (2, 250);

CREATE MATERIALIZED VIEW product_totals AS
SELECT product_id, SUM(amount) as total, COUNT(*) as cnt
FROM sales
GROUP BY product_id;

SELECT * FROM product_totals ORDER BY product_id;

BEGIN;
INSERT INTO sales VALUES (1, 50), (3, 300);
SELECT * FROM product_totals ORDER BY product_id;
ROLLBACK;

SELECT * FROM product_totals ORDER BY product_id;
} {1|300|2
2|400|2
1|350|3
2|400|2
3|300|1
1|300|2
2|400|2}

do_execsql_test_on_specific_db {:memory:} matview-rollback-mixed-operations {
CREATE TABLE orders(id INTEGER PRIMARY KEY, customer INTEGER, amount INTEGER);
INSERT INTO orders VALUES (1, 100, 50), (2, 200, 75), (3, 100, 25);

CREATE MATERIALIZED VIEW customer_totals AS
SELECT customer, SUM(amount) as total, COUNT(*) as cnt
FROM orders
GROUP BY customer;

SELECT * FROM customer_totals ORDER BY customer;

BEGIN;
INSERT INTO orders VALUES (4, 100, 100);
UPDATE orders SET amount = 150 WHERE id = 2;
DELETE FROM orders WHERE id = 3;
SELECT * FROM customer_totals ORDER BY customer;
ROLLBACK;

SELECT * FROM customer_totals ORDER BY customer;
} {100|75|2
200|75|1
100|150|2
200|150|1
100|75|2
200|75|1}

do_execsql_test_on_specific_db {:memory:} matview-rollback-filtered-aggregation {
CREATE TABLE transactions(id INTEGER, account INTEGER, amount INTEGER, type TEXT);
INSERT INTO transactions VALUES
(1, 100, 50, 'deposit'),
(2, 100, 30, 'withdraw'),
(3, 200, 100, 'deposit'),
(4, 200, 40, 'withdraw');

CREATE MATERIALIZED VIEW deposits AS
SELECT account, SUM(amount) as total_deposits, COUNT(*) as cnt
FROM transactions
WHERE type = 'deposit'
GROUP BY account;

SELECT * FROM deposits ORDER BY account;

BEGIN;
INSERT INTO transactions VALUES (5, 100, 75, 'deposit');
UPDATE transactions SET amount = 60 WHERE id = 1;
DELETE FROM transactions WHERE id = 3;
SELECT * FROM deposits ORDER BY account;
ROLLBACK;

SELECT * FROM deposits ORDER BY account;
} {100|50|1
200|100|1
100|135|2
100|50|1
200|100|1}

do_execsql_test_on_specific_db {:memory:} matview-rollback-empty-view {
CREATE TABLE t(a INTEGER, b INTEGER);
INSERT INTO t VALUES (1, 5), (2, 8);

CREATE MATERIALIZED VIEW v AS
SELECT * FROM t WHERE b > 10;

SELECT COUNT(*) FROM v;

BEGIN;
INSERT INTO t VALUES (3, 15), (4, 20);
SELECT * FROM v ORDER BY a;
ROLLBACK;

SELECT COUNT(*) FROM v;
} {0
3|15
4|20
0}