From 1fd345f382521fca1fc16b0d4dd2e4720ecd7f7d Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Mon, 8 Sep 2025 05:37:41 -0500 Subject: [PATCH] unify code used for persistence. We have code written for BTree (ZSet) persistence in both compiler.rs and operator.rs, because there are minor differences between them. With joins coming, it is time to unify this code. --- core/incremental/compiler.rs | 140 ++++-------------------- core/incremental/mod.rs | 1 + core/incremental/operator.rs | 186 +++++--------------------------- core/incremental/persistence.rs | 176 ++++++++++++++++++++++++++++++ 4 files changed, 219 insertions(+), 284 deletions(-) create mode 100644 core/incremental/persistence.rs diff --git a/core/incremental/compiler.rs b/core/incremental/compiler.rs index 6b9996a85..e15fc6dd2 100644 --- a/core/incremental/compiler.rs +++ b/core/incremental/compiler.rs @@ -7,16 +7,16 @@ use crate::incremental::dbsp::Delta; use crate::incremental::expr_compiler::CompiledExpression; -use crate::incremental::hashable_row::HashableRow; use crate::incremental::operator::{ EvalState, FilterOperator, FilterPredicate, IncrementalOperator, InputOperator, ProjectOperator, }; -use crate::storage::btree::{BTreeCursor, BTreeKey}; +use crate::incremental::persistence::WriteRow; +use crate::storage::btree::BTreeCursor; // Note: logical module must be made pub(crate) in translate/mod.rs use crate::translate::logical::{ BinaryOperator, LogicalExpr, LogicalPlan, LogicalSchema, SchemaRef, }; -use crate::types::{IOResult, SeekKey, SeekOp, SeekResult, Value}; +use crate::types::{IOResult, SeekKey, Value}; use crate::Pager; use crate::{return_and_restore_if_io, return_if_io, LimboError, Result}; use std::collections::HashMap; @@ -27,119 +27,6 @@ use std::sync::Arc; // The state table is always a key-value store with 3 columns: key, state, and weight. const OPERATOR_COLUMNS: usize = 3; -/// State machine for writing a row to the materialized view -#[derive(Debug)] -pub enum WriteViewRow { - /// Initial empty state - Empty, - - /// Reading existing record to get current weight - GetRecord, - - /// Deleting the row (when final weight <= 0) - Delete, - - /// Inserting/updating the row with new weight - Insert { - /// The final weight to write - final_weight: isize, - }, - - /// Completed processing this row - Done, -} - -impl WriteViewRow { - fn new() -> Self { - Self::Empty - } - fn write_row( - &mut self, - cursor: &mut BTreeCursor, - row: HashableRow, - weight: isize, - ) -> Result> { - loop { - match self { - WriteViewRow::Empty => { - let key = SeekKey::TableRowId(row.rowid); - let res = return_if_io!(cursor.seek(key, SeekOp::GE { eq_only: true })); - match res { - SeekResult::Found => *self = WriteViewRow::GetRecord, - _ => { - *self = WriteViewRow::Insert { - final_weight: weight, - } - } - } - } - WriteViewRow::GetRecord => { - let existing_record = return_if_io!(cursor.record()); - let r = existing_record.ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Found rowid {} in storage but could not read record", - row.rowid - )) - })?; - let values = r.get_values(); - - // last value should contain the weight - let existing_weight = match values.last() { - Some(ref_val) => match ref_val.to_owned() { - Value::Integer(w) => w as isize, - _ => { - return Err(crate::LimboError::InternalError(format!( - "Invalid weight value in storage for rowid {}", - row.rowid - ))) - } - }, - None => { - return Err(crate::LimboError::InternalError(format!( - "No weight value found in storage for rowid {}", - row.rowid - ))) - } - }; - let final_weight = existing_weight + weight; - if final_weight <= 0 { - *self = WriteViewRow::Delete - } else { - *self = WriteViewRow::Insert { final_weight } - } - } - WriteViewRow::Delete => { - // Delete the row. Important: when delete returns I/O, the btree operation - // has already completed in memory, so mark as Done to avoid retry - *self = WriteViewRow::Done; - return_if_io!(cursor.delete()); - } - WriteViewRow::Insert { final_weight } => { - let key = SeekKey::TableRowId(row.rowid); - return_if_io!(cursor.seek(key, SeekOp::GE { eq_only: true })); - - // Create the record values: row values + weight - let mut values = row.values.clone(); - values.push(Value::Integer(*final_weight as i64)); - - // Create an ImmutableRecord from the values - let immutable_record = - crate::types::ImmutableRecord::from_values(&values, values.len()); - let btree_key = BTreeKey::new_table_rowid(row.rowid, Some(&immutable_record)); - // Insert the row. Important: when insert returns I/O, the btree operation - // has already completed in memory, so mark as Done to avoid retry - *self = WriteViewRow::Done; - return_if_io!(cursor.insert(&btree_key)); - } - WriteViewRow::Done => { - break; - } - } - } - Ok(IOResult::Done(())) - } -} - /// State machine for commit operations pub enum CommitState { /// Initial state - ready to start commit @@ -160,7 +47,7 @@ pub enum CommitState { /// Current index in delta.changes being processed current_index: usize, /// State for writing individual rows - write_row_state: WriteViewRow, + write_row_state: WriteRow, /// Cursor for view data btree - created fresh for each row view_cursor: Box, }, @@ -537,7 +424,7 @@ impl DbspCircuit { self.commit_state = CommitState::UpdateView { delta, current_index: 0, - write_row_state: WriteViewRow::new(), + write_row_state: WriteRow::new(), view_cursor, }; } @@ -554,9 +441,9 @@ impl DbspCircuit { } else { let (row, weight) = delta.changes[*current_index].clone(); - // If we're starting a new row (Empty state), we need a fresh cursor + // If we're starting a new row (GetRecord state), we need a fresh cursor // due to btree cursor state machine limitations - if matches!(write_row_state, WriteViewRow::Empty) { + if matches!(write_row_state, WriteRow::GetRecord) { *view_cursor = Box::new(BTreeCursor::new_table( None, pager.clone(), @@ -565,10 +452,19 @@ impl DbspCircuit { )); } + // Build the view row format: row values + weight + let key = SeekKey::TableRowId(row.rowid); + let row_values = row.values.clone(); + let build_fn = move |final_weight: isize| -> Vec { + let mut values = row_values.clone(); + values.push(Value::Integer(final_weight as i64)); + values + }; + return_and_restore_if_io!( &mut self.commit_state, state, - write_row_state.write_row(view_cursor, row, weight) + write_row_state.write_row(view_cursor, key, build_fn, weight) ); // Move to next row @@ -587,7 +483,7 @@ impl DbspCircuit { self.commit_state = CommitState::UpdateView { delta, current_index: *current_index + 1, - write_row_state: WriteViewRow::new(), + write_row_state: WriteRow::new(), view_cursor, }; } diff --git a/core/incremental/mod.rs b/core/incremental/mod.rs index 755a27351..285c70978 100644 --- a/core/incremental/mod.rs +++ b/core/incremental/mod.rs @@ -4,4 +4,5 @@ pub mod dbsp; pub mod expr_compiler; pub mod hashable_row; pub mod operator; +pub mod persistence; pub mod view; diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 46a933b87..8d4490826 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -6,8 +6,9 @@ use crate::function::{AggFunc, Func}; use crate::incremental::dbsp::Delta; use crate::incremental::expr_compiler::CompiledExpression; use crate::incremental::hashable_row::HashableRow; -use crate::storage::btree::{BTreeCursor, BTreeKey}; -use crate::types::{IOResult, SeekKey, SeekOp, SeekResult, Text}; +use crate::incremental::persistence::{ReadRecord, WriteRow}; +use crate::storage::btree::BTreeCursor; +use crate::types::{IOResult, SeekKey, Text}; use crate::{ return_and_restore_if_io, return_if_io, Connection, Database, Result, SymbolTable, Value, }; @@ -17,160 +18,6 @@ use std::sync::{Arc, Mutex}; use turso_macros::match_ignore_ascii_case; use turso_parser::ast::{As, Expr, Literal, Name, OneSelect, Operator, ResultColumn}; -#[derive(Debug)] -pub enum ReadRecord { - GetRecord, - Done { state: Option }, -} - -impl ReadRecord { - fn new() -> Self { - ReadRecord::GetRecord - } - - fn read_record( - &mut self, - key: SeekKey, - aggregates: &[AggregateFunction], - cursor: &mut BTreeCursor, - ) -> Result>> { - loop { - match self { - ReadRecord::GetRecord => { - let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - if !matches!(res, SeekResult::Found) { - *self = ReadRecord::Done { state: None }; - } else { - let record = return_if_io!(cursor.record()); - let r = record.ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Found key {key:?} in aggregate storage but could not read record" - )) - })?; - let values = r.get_values(); - let blob = values[1].to_owned(); - - let (state, _group_key) = match blob { - Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates) - .ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Cannot deserialize aggregate state {blob:?}", - )) - }), - _ => Err(crate::LimboError::ParseError( - "Value in aggregator not blob".to_string(), - )), - }?; - *self = ReadRecord::Done { state: Some(state) } - } - } - ReadRecord::Done { state } => return Ok(IOResult::Done(state.clone())), - } - } - } -} - -#[derive(Debug)] -pub(crate) enum WriteRecord { - GetRecord, - Delete { final_weight: isize }, - Insert { final_weight: isize }, - Done, -} -impl WriteRecord { - fn new() -> Self { - WriteRecord::GetRecord - } - - fn write_record( - &mut self, - key: SeekKey, - record: HashableRow, - weight: isize, - cursor: &mut BTreeCursor, - ) -> Result> { - loop { - match self { - WriteRecord::GetRecord => { - let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - if !matches!(res, SeekResult::Found) { - *self = WriteRecord::Insert { - final_weight: weight, - }; - } else { - let existing_record = return_if_io!(cursor.record()); - let r = existing_record.ok_or_else(|| { - crate::LimboError::InternalError(format!( - "Found key {key:?} in aggregate storage but could not read record" - )) - })?; - let values = r.get_values(); - // values[2] should contain the weight - let existing_weight = match values[2].to_owned() { - Value::Integer(w) => w as isize, - _ => { - return Err(crate::LimboError::InternalError(format!( - "Invalid weight value in aggregate storage for key {key:?}" - ))) - } - }; - let final_weight = existing_weight + weight; - if final_weight <= 0 { - *self = WriteRecord::Delete { final_weight } - } else { - *self = WriteRecord::Insert { final_weight } - } - } - } - WriteRecord::Delete { final_weight: _ } => { - let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - if !matches!(res, SeekResult::Found) { - return Err(crate::LimboError::InternalError(format!( - "record not found for {key:?}, but we had just GetRecord! Should not be possible" - ))); - } - // Done - row was deleted and weights cancel out. - // If we iniated the delete we will complete, so Done has to be set - // before so we don't come back here. - *self = WriteRecord::Done; - return_if_io!(cursor.delete()); - } - WriteRecord::Insert { final_weight } => { - return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); - // Build the key and insert the record - let key_i64 = match key { - SeekKey::TableRowId(id) => id, - _ => { - return Err(crate::LimboError::InternalError( - "Expected TableRowId for aggregate storage".to_string(), - )) - } - }; - // Create the record values: key, blob, weight - let record_values = vec![ - Value::Integer(key_i64), - record.values[0].clone(), // The blob with serialized state - Value::Integer(*final_weight as i64), - ]; - - // Create an ImmutableRecord from the values - let immutable_record = crate::types::ImmutableRecord::from_values( - &record_values, - record_values.len(), - ); - let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record)); - - *self = WriteRecord::Done; - return_if_io!(cursor.insert(&btree_key)); - } - WriteRecord::Done => { - return Ok(IOResult::Done(())); - } - } - } - } -} - type ComputedStates = HashMap, AggregateState)>; // group_key_str -> (group_key, state) #[derive(Debug)] enum AggregateCommitState { @@ -182,7 +29,7 @@ enum AggregateCommitState { delta: Delta, computed_states: ComputedStates, current_idx: usize, - write_record: WriteRecord, + write_row: WriteRow, }, Done { delta: Delta, @@ -1384,7 +1231,7 @@ impl AggregateState { /// Deserialize aggregate state from a binary blob /// Returns the aggregate state and the group key values - fn from_blob(blob: &[u8], aggregates: &[AggregateFunction]) -> Option<(Self, Vec)> { + pub fn from_blob(blob: &[u8], aggregates: &[AggregateFunction]) -> Option<(Self, Vec)> { let mut cursor = 0; // Check version byte @@ -1768,14 +1615,14 @@ impl IncrementalOperator for AggregateOperator { delta: output_delta, computed_states, current_idx: 0, - write_record: WriteRecord::new(), + write_row: WriteRow::new(), }; } AggregateCommitState::PersistDelta { delta, computed_states, current_idx, - write_record, + write_row, } => { let states_vec: Vec<_> = computed_states.iter().collect(); @@ -1795,10 +1642,25 @@ impl IncrementalOperator for AggregateOperator { let state_blob = agg_state.to_blob(&self.aggregates, group_key); let blob_row = HashableRow::new(0, vec![Value::Blob(state_blob)]); + // Build the aggregate storage format: [key, blob, weight] + let seek_key_clone = seek_key.clone(); + let blob_value = blob_row.values[0].clone(); + let build_fn = move |final_weight: isize| -> Vec { + let key_i64 = match seek_key_clone.clone() { + SeekKey::TableRowId(id) => id, + _ => panic!("Expected TableRowId"), + }; + vec![ + Value::Integer(key_i64), + blob_value.clone(), // The blob with serialized state + Value::Integer(final_weight as i64), + ] + }; + return_and_restore_if_io!( &mut self.commit_state, state, - write_record.write_record(seek_key, blob_row, weight, cursor) + write_row.write_row(cursor, seek_key, build_fn, weight) ); let delta = std::mem::take(delta); @@ -1808,7 +1670,7 @@ impl IncrementalOperator for AggregateOperator { delta, computed_states, current_idx: *current_idx + 1, - write_record: WriteRecord::new(), // Reset for next write + write_row: WriteRow::new(), // Reset for next write }; } } diff --git a/core/incremental/persistence.rs b/core/incremental/persistence.rs new file mode 100644 index 000000000..381b406aa --- /dev/null +++ b/core/incremental/persistence.rs @@ -0,0 +1,176 @@ +use crate::incremental::operator::{AggregateFunction, AggregateState}; +use crate::storage::btree::{BTreeCursor, BTreeKey}; +use crate::types::{IOResult, SeekKey, SeekOp, SeekResult}; +use crate::{return_if_io, Result, Value}; + +#[derive(Debug, Default)] +pub enum ReadRecord { + #[default] + GetRecord, + Done { + state: Option, + }, +} + +impl ReadRecord { + pub fn new() -> Self { + Self::default() + } + + pub fn read_record( + &mut self, + key: SeekKey, + aggregates: &[AggregateFunction], + cursor: &mut BTreeCursor, + ) -> Result>> { + loop { + match self { + ReadRecord::GetRecord => { + let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + if !matches!(res, SeekResult::Found) { + *self = ReadRecord::Done { state: None }; + } else { + let record = return_if_io!(cursor.record()); + let r = record.ok_or_else(|| { + crate::LimboError::InternalError(format!( + "Found key {key:?} in aggregate storage but could not read record" + )) + })?; + let values = r.get_values(); + let blob = values[1].to_owned(); + + let (state, _group_key) = match blob { + Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates) + .ok_or_else(|| { + crate::LimboError::InternalError(format!( + "Cannot deserialize aggregate state {blob:?}", + )) + }), + _ => Err(crate::LimboError::ParseError( + "Value in aggregator not blob".to_string(), + )), + }?; + *self = ReadRecord::Done { state: Some(state) } + } + } + ReadRecord::Done { state } => return Ok(IOResult::Done(state.clone())), + } + } + } +} + +#[derive(Debug, Default)] +pub enum WriteRow { + #[default] + GetRecord, + Delete, + Insert { + final_weight: isize, + }, + Done, +} + +impl WriteRow { + pub fn new() -> Self { + Self::default() + } + + /// Write a row with weight management. + /// + /// # Arguments + /// * `cursor` - BTree cursor for the storage + /// * `key` - The key to seek (TableRowId) + /// * `build_record` - Function that builds the record values to insert. + /// Takes the final_weight and returns the complete record values. + /// * `weight` - The weight delta to apply + pub fn write_row( + &mut self, + cursor: &mut BTreeCursor, + key: SeekKey, + build_record: F, + weight: isize, + ) -> Result> + where + F: Fn(isize) -> Vec, + { + loop { + match self { + WriteRow::GetRecord => { + let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + if !matches!(res, SeekResult::Found) { + *self = WriteRow::Insert { + final_weight: weight, + }; + } else { + let existing_record = return_if_io!(cursor.record()); + let r = existing_record.ok_or_else(|| { + crate::LimboError::InternalError(format!( + "Found key {key:?} in storage but could not read record" + )) + })?; + let values = r.get_values(); + + // Weight is always the last value + let existing_weight = match values.last() { + Some(val) => match val.to_owned() { + Value::Integer(w) => w as isize, + _ => { + return Err(crate::LimboError::InternalError(format!( + "Invalid weight value in storage for key {key:?}" + ))) + } + }, + None => { + return Err(crate::LimboError::InternalError(format!( + "No weight value found in storage for key {key:?}" + ))) + } + }; + + let final_weight = existing_weight + weight; + if final_weight <= 0 { + *self = WriteRow::Delete + } else { + *self = WriteRow::Insert { final_weight } + } + } + } + WriteRow::Delete => { + // Mark as Done before delete to avoid retry on I/O + *self = WriteRow::Done; + return_if_io!(cursor.delete()); + } + WriteRow::Insert { final_weight } => { + return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true })); + + // Extract the row ID from the key + let key_i64 = match key { + SeekKey::TableRowId(id) => id, + _ => { + return Err(crate::LimboError::InternalError( + "Expected TableRowId for storage".to_string(), + )) + } + }; + + // Build the record values using the provided function + let record_values = build_record(*final_weight); + + // Create an ImmutableRecord from the values + let immutable_record = crate::types::ImmutableRecord::from_values( + &record_values, + record_values.len(), + ); + let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record)); + + // Mark as Done before insert to avoid retry on I/O + *self = WriteRow::Done; + return_if_io!(cursor.insert(&btree_key)); + } + WriteRow::Done => { + return Ok(IOResult::Done(())); + } + } + } + } +}