diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index 6b9996a85..e15fc6dd2 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -7,16 +7,16 @@ use crate::incremental::dbsp::Delta; use crate::incremental::expr_compiler::CompiledExpression; -use crate::incremental::hashable_row::HashableRow; use crate::incremental::operator::{ EvalState, FilterOperator, FilterPredicate, IncrementalOperator, InputOperator, ProjectOperator, }; -use crate::storage::btree::{BTreeCursor, BTreeKey}; +use crate::incremental::persistence::WriteRow; +use crate::storage::btree::BTreeCursor; // Note: logical module must be made pub(crate) in translate/mod.rs use crate::translate::logical::{ BinaryOperator, LogicalExpr, LogicalPlan, LogicalSchema, SchemaRef, }; -use crate::types::{IOResult, SeekKey, SeekOp, SeekResult, Value}; +use crate::types::{IOResult, SeekKey, Value}; use crate::Pager; use crate::{return_and_restore_if_io, return_if_io, LimboError, Result}; use std::collections::HashMap; @@ -27,119 +27,6 @@ use std::sync::Arc; // The state table is always a key-value store with 3 columns: key, state, and weight. const OPERATOR_COLUMNS: usize = 3; -/// State machine for writing a row to the materialized view -#[derive(Debug)] -pub enum WriteViewRow { - /// Initial empty state - Empty, - - /// Reading existing record to get current weight - GetRecord, - - /// Deleting the row (when final weight <= 0) - Delete, - - /// Inserting/updating the row with new weight - Insert { - /// The final weight to write - final_weight: isize, - }, - - /// Completed processing this row - Done, -} - -impl WriteViewRow { - fn new() -> Self { - Self::Empty - } - fn write_row( - &mut self, - cursor: &mut BTreeCursor, - row: HashableRow, - weight: isize, - ) -> Result> { - loop { - match self { - WriteViewRow::Empty => { - let key = SeekKey::TableRowId(row.rowid); - let res = return_if_io!(cursor.seek(key, SeekOp::GE { eq_only: true })); - match res { - SeekResult::Found => *self = WriteViewRow::GetRecord, - _ => { - *self = WriteViewRow::Insert { - final_weight: weight, - } - } - } - } - WriteViewRow::GetRecord => { - let existing_record = return_if_io!(cursor.record()); - let r = existing_record.ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Found rowid {} in storage but could not read record", - row.rowid - )) - })?; - let values = r.get_values(); - - // last value should contain the weight - let existing_weight = match values.last() { - Some(ref_val) => match ref_val.to_owned() { - Value::Integer(w) => w as isize, - _ => { - return Err(crate::LimboError::InternalError(format!( - "Invalid weight value in storage for rowid {}", - row.rowid - ))) - } - }, - None => { - return Err(crate::LimboError::InternalError(format!( - "No weight value found in storage for rowid {}", - row.rowid - ))) - } - }; - let final_weight = existing_weight + weight; - if final_weight <= 0 { - *self = WriteViewRow::Delete - } else { - *self = WriteViewRow::Insert { final_weight } - } - } - WriteViewRow::Delete => { - // Delete the row. Important: when delete returns I/O, the btree operation - // has already completed in memory, so mark as Done to avoid retry - *self = WriteViewRow::Done; - return_if_io!(cursor.delete()); - } - WriteViewRow::Insert { final_weight } => { - let key = SeekKey::TableRowId(row.rowid); - return_if_io!(cursor.seek(key, SeekOp::GE { eq_only: true })); - - // Create the record values: row values + weight - let mut values = row.values.clone(); - values.push(Value::Integer(*final_weight as i64)); - - // Create an ImmutableRecord from the values - let immutable_record = - crate::types::ImmutableRecord::from_values(&values, values.len()); - let btree_key = BTreeKey::new_table_rowid(row.rowid, Some(&immutable_record)); - // Insert the row. Important: when insert returns I/O, the btree operation - // has already completed in memory, so mark as Done to avoid retry - *self = WriteViewRow::Done; - return_if_io!(cursor.insert(&btree_key)); - } - WriteViewRow::Done => { - break; - } - } - } - Ok(IOResult::Done(())) - } -} - /// State machine for commit operations pub enum CommitState { /// Initial state - ready to start commit @@ -160,7 +47,7 @@ pub enum CommitState { /// Current index in delta.changes being processed current_index: usize, /// State for writing individual rows - write_row_state: WriteViewRow, + write_row_state: WriteRow, /// Cursor for view data btree - created fresh for each row view_cursor: Box, }, @@ -537,7 +424,7 @@ impl DbspCircuit { self.commit_state = CommitState::UpdateView { delta, current_index: 0, - write_row_state: WriteViewRow::new(), + write_row_state: WriteRow::new(), view_cursor, }; } @@ -554,9 +441,9 @@ impl DbspCircuit { } else { let (row, weight) = delta.changes[*current_index].clone(); - // If we're starting a new row (Empty state), we need a fresh cursor + // If we're starting a new row (GetRecord state), we need a fresh cursor // due to btree cursor state machine limitations - if matches!(write_row_state, WriteViewRow::Empty) { + if matches!(write_row_state, WriteRow::GetRecord) { *view_cursor = Box::new(BTreeCursor::new_table( None, pager.clone(), @@ -565,10 +452,19 @@ impl DbspCircuit { )); } + // Build the view row format: row values + weight + let key = SeekKey::TableRowId(row.rowid); + let row_values = row.values.clone(); + let build_fn = move |final_weight: isize| -> Vec { + let mut values = row_values.clone(); + values.push(Value::Integer(final_weight as i64)); + values + }; + return_and_restore_if_io!( &mut self.commit_state, state, - write_row_state.write_row(view_cursor, row, weight) + write_row_state.write_row(view_cursor, key, build_fn, weight) ); // Move to next row @@ -587,7 +483,7 @@ impl DbspCircuit { self.commit_state = CommitState::UpdateView { delta, current_index: *current_index + 1, - write_row_state: WriteViewRow::new(), + write_row_state: WriteRow::new(), view_cursor, }; } diff --git a/core/incremental/mod.rs b/core/incremental/mod.rs index 755a27351..285c70978 100644 --- a/core/incremental/mod.rs +++ b/core/incremental/mod.rs @@ -4,4 +4,5 @@ pub mod dbsp; pub mod expr_compiler; pub mod hashable_row; pub mod operator; +pub mod persistence; pub mod view; diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 46a933b87..8d4490826 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -6,8 +6,9 @@ use crate::function::{AggFunc, Func}; use crate::incremental::dbsp::Delta; use crate::incremental::expr_compiler::CompiledExpression; use crate::incremental::hashable_row::HashableRow; -use crate::storage::btree::{BTreeCursor, BTreeKey}; -use crate::types::{IOResult, SeekKey, SeekOp, SeekResult, Text}; +use crate::incremental::persistence::{ReadRecord, WriteRow}; +use crate::storage::btree::BTreeCursor; +use crate::types::{IOResult, SeekKey, Text}; use crate::{ return_and_restore_if_io, return_if_io, Connection, Database, Result, SymbolTable, Value, }; @@ -17,160 +18,6 @@ use std::sync::{Arc, Mutex}; use turso_macros::match_ignore_ascii_case; use turso_parser::ast::{As, Expr, Literal, Name, OneSelect, Operator, ResultColumn}; -#[derive(Debug)] -pub enum ReadRecord { - GetRecord, - Done { state: Option }, -} - -impl ReadRecord { - fn new() -> Self { - ReadRecord::GetRecord - } - - fn read_record( - &mut self, - key: SeekKey, - aggregates: &[AggregateFunction], - cursor: &mut BTreeCursor, - ) -> Result>> { - loop { - match self { - ReadRecord::GetRecord => { - let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - if !matches!(res, SeekResult::Found) { - *self = ReadRecord::Done { state: None }; - } else { - let record = return_if_io!(cursor.record()); - let r = record.ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Found key {key:?} in aggregate storage but could not read record" - )) - })?; - let values = r.get_values(); - let blob = values[1].to_owned(); - - let (state, _group_key) = match blob { - Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates) - .ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Cannot deserialize aggregate state {blob:?}", - )) - }), - _ => Err(crate::LimboError::ParseError( - "Value in aggregator not blob".to_string(), - )), - }?; - *self = ReadRecord::Done { state: Some(state) } - } - } - ReadRecord::Done { state } => return Ok(IOResult::Done(state.clone())), - } - } - } -} - -#[derive(Debug)] -pub(crate) enum WriteRecord { - GetRecord, - Delete { final_weight: isize }, - Insert { final_weight: isize }, - Done, -} -impl WriteRecord { - fn new() -> Self { - WriteRecord::GetRecord - } - - fn write_record( - &mut self, - key: SeekKey, - record: HashableRow, - weight: isize, - cursor: &mut BTreeCursor, - ) -> Result> { - loop { - match self { - WriteRecord::GetRecord => { - let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - if !matches!(res, SeekResult::Found) { - *self = WriteRecord::Insert { - final_weight: weight, - }; - } else { - let existing_record = return_if_io!(cursor.record()); - let r = existing_record.ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Found key {key:?} in aggregate storage but could not read record" - )) - })?; - let values = r.get_values(); - // values[2] should contain the weight - let existing_weight = match values[2].to_owned() { - Value::Integer(w) => w as isize, - _ => { - return Err(crate::LimboError::InternalError(format!( - "Invalid weight value in aggregate storage for key {key:?}" - ))) - } - }; - let final_weight = existing_weight + weight; - if final_weight <= 0 { - *self = WriteRecord::Delete { final_weight } - } else { - *self = WriteRecord::Insert { final_weight } - } - } - } - WriteRecord::Delete { final_weight: _ } => { - let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - if !matches!(res, SeekResult::Found) { - return Err(crate::LimboError::InternalError(format!( - "record not found for {key:?}, but we had just GetRecord! Should not be possible" - ))); - } - // Done - row was deleted and weights cancel out. - // If we iniated the delete we will complete, so Done has to be set - // before so we don't come back here. - *self = WriteRecord::Done; - return_if_io!(cursor.delete()); - } - WriteRecord::Insert { final_weight } => { - return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - // Build the key and insert the record - let key_i64 = match key { - SeekKey::TableRowId(id) => id, - _ => { - return Err(crate::LimboError::InternalError( - "Expected TableRowId for aggregate storage".to_string(), - )) - } - }; - // Create the record values: key, blob, weight - let record_values = vec![ - Value::Integer(key_i64), - record.values[0].clone(), // The blob with serialized state - Value::Integer(*final_weight as i64), - ]; - - // Create an ImmutableRecord from the values - let immutable_record = crate::types::ImmutableRecord::from_values( - &record_values, - record_values.len(), - ); - let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record)); - - *self = WriteRecord::Done; - return_if_io!(cursor.insert(&btree_key)); - } - WriteRecord::Done => { - return Ok(IOResult::Done(())); - } - } - } - } -} - type ComputedStates = HashMap, AggregateState)>; // group_key_str -> (group_key, state) #[derive(Debug)] enum AggregateCommitState { @@ -182,7 +29,7 @@ enum AggregateCommitState { delta: Delta, computed_states: ComputedStates, current_idx: usize, - write_record: WriteRecord, + write_row: WriteRow, }, Done { delta: Delta, @@ -1384,7 +1231,7 @@ impl AggregateState { /// Deserialize aggregate state from a binary blob /// Returns the aggregate state and the group key values - fn from_blob(blob: &[u8], aggregates: &[AggregateFunction]) -> Option<(Self, Vec)> { + pub fn from_blob(blob: &[u8], aggregates: &[AggregateFunction]) -> Option<(Self, Vec)> { let mut cursor = 0; // Check version byte @@ -1768,14 +1615,14 @@ impl IncrementalOperator for AggregateOperator { delta: output_delta, computed_states, current_idx: 0, - write_record: WriteRecord::new(), + write_row: WriteRow::new(), }; } AggregateCommitState::PersistDelta { delta, computed_states, current_idx, - write_record, + write_row, } => { let states_vec: Vec<_> = computed_states.iter().collect(); @@ -1795,10 +1642,25 @@ impl IncrementalOperator for AggregateOperator { let state_blob = agg_state.to_blob(&self.aggregates, group_key); let blob_row = HashableRow::new(0, vec![Value::Blob(state_blob)]); + // Build the aggregate storage format: [key, blob, weight] + let seek_key_clone = seek_key.clone(); + let blob_value = blob_row.values[0].clone(); + let build_fn = move |final_weight: isize| -> Vec { + let key_i64 = match seek_key_clone.clone() { + SeekKey::TableRowId(id) => id, + _ => panic!("Expected TableRowId"), + }; + vec![ + Value::Integer(key_i64), + blob_value.clone(), // The blob with serialized state + Value::Integer(final_weight as i64), + ] + }; + return_and_restore_if_io!( &mut self.commit_state, state, - write_record.write_record(seek_key, blob_row, weight, cursor) + write_row.write_row(cursor, seek_key, build_fn, weight) ); let delta = std::mem::take(delta); @@ -1808,7 +1670,7 @@ impl IncrementalOperator for AggregateOperator { delta, computed_states, current_idx: *current_idx + 1, - write_record: WriteRecord::new(), // Reset for next write + write_row: WriteRow::new(), // Reset for next write }; } } diff --git a/core/incremental/persistence.rs b/core/incremental/persistence.rs new file mode 100644 index 000000000..381b406aa --- /dev/null +++ b/core/incremental/persistence.rs @@ -0,0 +1,176 @@ +use crate::incremental::operator::{AggregateFunction, AggregateState}; +use crate::storage::btree::{BTreeCursor, BTreeKey}; +use crate::types::{IOResult, SeekKey, SeekOp, SeekResult}; +use crate::{return_if_io, Result, Value}; + +#[derive(Debug, Default)] +pub enum ReadRecord { + #[default] + GetRecord, + Done { + state: Option, + }, +} + +impl ReadRecord { + pub fn new() -> Self { + Self::default() + } + + pub fn read_record( + &mut self, + key: SeekKey, + aggregates: &[AggregateFunction], + cursor: &mut BTreeCursor, + ) -> Result>> { + loop { + match self { + ReadRecord::GetRecord => { + let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + if !matches!(res, SeekResult::Found) { + *self = ReadRecord::Done { state: None }; + } else { + let record = return_if_io!(cursor.record()); + let r = record.ok_or_else(|| { + crate::LimboError::InternalError(format!( + "Found key {key:?} in aggregate storage but could not read record" + )) + })?; + let values = r.get_values(); + let blob = values[1].to_owned(); + + let (state, _group_key) = match blob { + Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates) + .ok_or_else(|| { + crate::LimboError::InternalError(format!( + "Cannot deserialize aggregate state {blob:?}", + )) + }), + _ => Err(crate::LimboError::ParseError( + "Value in aggregator not blob".to_string(), + )), + }?; + *self = ReadRecord::Done { state: Some(state) } + } + } + ReadRecord::Done { state } => return Ok(IOResult::Done(state.clone())), + } + } + } +} + +#[derive(Debug, Default)] +pub enum WriteRow { + #[default] + GetRecord, + Delete, + Insert { + final_weight: isize, + }, + Done, +} + +impl WriteRow { + pub fn new() -> Self { + Self::default() + } + + /// Write a row with weight management. + /// + /// # Arguments + /// * `cursor` - BTree cursor for the storage + /// * `key` - The key to seek (TableRowId) + /// * `build_record` - Function that builds the record values to insert. + /// Takes the final_weight and returns the complete record values. + /// * `weight` - The weight delta to apply + pub fn write_row( + &mut self, + cursor: &mut BTreeCursor, + key: SeekKey, + build_record: F, + weight: isize, + ) -> Result> + where + F: Fn(isize) -> Vec, + { + loop { + match self { + WriteRow::GetRecord => { + let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + if !matches!(res, SeekResult::Found) { + *self = WriteRow::Insert { + final_weight: weight, + }; + } else { + let existing_record = return_if_io!(cursor.record()); + let r = existing_record.ok_or_else(|| { + crate::LimboError::InternalError(format!( + "Found key {key:?} in storage but could not read record" + )) + })?; + let values = r.get_values(); + + // Weight is always the last value + let existing_weight = match values.last() { + Some(val) => match val.to_owned() { + Value::Integer(w) => w as isize, + _ => { + return Err(crate::LimboError::InternalError(format!( + "Invalid weight value in storage for key {key:?}" + ))) + } + }, + None => { + return Err(crate::LimboError::InternalError(format!( + "No weight value found in storage for key {key:?}" + ))) + } + }; + + let final_weight = existing_weight + weight; + if final_weight <= 0 { + *self = WriteRow::Delete + } else { + *self = WriteRow::Insert { final_weight } + } + } + } + WriteRow::Delete => { + // Mark as Done before delete to avoid retry on I/O + *self = WriteRow::Done; + return_if_io!(cursor.delete()); + } + WriteRow::Insert { final_weight } => { + return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + + // Extract the row ID from the key + let key_i64 = match key { + SeekKey::TableRowId(id) => id, + _ => { + return Err(crate::LimboError::InternalError( + "Expected TableRowId for storage".to_string(), + )) + } + }; + + // Build the record values using the provided function + let record_values = build_record(*final_weight); + + // Create an ImmutableRecord from the values + let immutable_record = crate::types::ImmutableRecord::from_values( + &record_values, + record_values.len(), + ); + let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record)); + + // Mark as Done before insert to avoid retry on I/O + *self = WriteRow::Done; + return_if_io!(cursor.insert(&btree_key)); + } + WriteRow::Done => { + return Ok(IOResult::Done(())); + } + } + } + } +}