Unify code used for persistence.

We have near-duplicate BTree (ZSet) persistence code in both compiler.rs
and operator.rs, kept separate only because of minor differences between
them. With joins coming, it is time to unify this code.
This commit is contained in:
Glauber Costa
2025-09-08 05:37:41 -05:00
parent 8997670936
commit 1fd345f382
4 changed files with 219 additions and 284 deletions

View File

@@ -7,16 +7,16 @@
use crate::incremental::dbsp::Delta;
use crate::incremental::expr_compiler::CompiledExpression;
use crate::incremental::hashable_row::HashableRow;
use crate::incremental::operator::{
EvalState, FilterOperator, FilterPredicate, IncrementalOperator, InputOperator, ProjectOperator,
};
use crate::storage::btree::{BTreeCursor, BTreeKey};
use crate::incremental::persistence::WriteRow;
use crate::storage::btree::BTreeCursor;
// Note: logical module must be made pub(crate) in translate/mod.rs
use crate::translate::logical::{
BinaryOperator, LogicalExpr, LogicalPlan, LogicalSchema, SchemaRef,
};
use crate::types::{IOResult, SeekKey, SeekOp, SeekResult, Value};
use crate::types::{IOResult, SeekKey, Value};
use crate::Pager;
use crate::{return_and_restore_if_io, return_if_io, LimboError, Result};
use std::collections::HashMap;
@@ -27,119 +27,6 @@ use std::sync::Arc;
// The state table is always a key-value store with 3 columns: key, state, and weight.
const OPERATOR_COLUMNS: usize = 3;
/// State machine for writing a row to the materialized view.
///
/// Each variant is one resumable step: when an operation suspends on I/O,
/// the current variant is kept in place and `write_row` is re-entered in
/// the saved state.
#[derive(Debug)]
pub enum WriteViewRow {
    /// Initial empty state
    Empty,
    /// Reading existing record to get current weight
    GetRecord,
    /// Deleting the row (when final weight <= 0)
    Delete,
    /// Inserting/updating the row with new weight
    Insert {
        /// The final weight to write
        final_weight: isize,
    },
    /// Completed processing this row
    Done,
}
impl WriteViewRow {
    /// Create a fresh state machine, ready to process one row.
    fn new() -> Self {
        Self::Empty
    }

    /// Apply `weight` to `row` in the view btree, merging it with any weight
    /// already stored under the same rowid; a non-positive merged weight
    /// deletes the row instead of writing it.
    ///
    /// Resumable: on I/O the current variant is saved in `self` and the
    /// caller re-invokes with the same arguments until `IOResult::Done`.
    fn write_row(
        &mut self,
        cursor: &mut BTreeCursor,
        row: HashableRow,
        weight: isize,
    ) -> Result<IOResult<()>> {
        loop {
            match self {
                WriteViewRow::Empty => {
                    // Look for an existing row with this rowid.
                    let key = SeekKey::TableRowId(row.rowid);
                    let res = return_if_io!(cursor.seek(key, SeekOp::GE { eq_only: true }));
                    match res {
                        SeekResult::Found => *self = WriteViewRow::GetRecord,
                        _ => {
                            // No prior row: insert with the incoming weight as-is.
                            *self = WriteViewRow::Insert {
                                final_weight: weight,
                            }
                        }
                    }
                }
                WriteViewRow::GetRecord => {
                    let existing_record = return_if_io!(cursor.record());
                    let r = existing_record.ok_or_else(|| {
                        crate::LimboError::InternalError(format!(
                            "Found rowid {} in storage but could not read record",
                            row.rowid
                        ))
                    })?;
                    let values = r.get_values();
                    // last value should contain the weight
                    let existing_weight = match values.last() {
                        Some(ref_val) => match ref_val.to_owned() {
                            Value::Integer(w) => w as isize,
                            _ => {
                                return Err(crate::LimboError::InternalError(format!(
                                    "Invalid weight value in storage for rowid {}",
                                    row.rowid
                                )))
                            }
                        },
                        None => {
                            return Err(crate::LimboError::InternalError(format!(
                                "No weight value found in storage for rowid {}",
                                row.rowid
                            )))
                        }
                    };
                    // Merge weights; a non-positive result means the row nets out
                    // to "deleted" in ZSet terms.
                    let final_weight = existing_weight + weight;
                    if final_weight <= 0 {
                        *self = WriteViewRow::Delete
                    } else {
                        *self = WriteViewRow::Insert { final_weight }
                    }
                }
                WriteViewRow::Delete => {
                    // Delete the row. Important: when delete returns I/O, the btree operation
                    // has already completed in memory, so mark as Done to avoid retry
                    *self = WriteViewRow::Done;
                    return_if_io!(cursor.delete());
                }
                WriteViewRow::Insert { final_weight } => {
                    // Re-seek so the cursor is positioned for the insert.
                    let key = SeekKey::TableRowId(row.rowid);
                    return_if_io!(cursor.seek(key, SeekOp::GE { eq_only: true }));
                    // Create the record values: row values + weight
                    let mut values = row.values.clone();
                    values.push(Value::Integer(*final_weight as i64));
                    // Create an ImmutableRecord from the values
                    let immutable_record =
                        crate::types::ImmutableRecord::from_values(&values, values.len());
                    let btree_key = BTreeKey::new_table_rowid(row.rowid, Some(&immutable_record));
                    // Insert the row. Important: when insert returns I/O, the btree operation
                    // has already completed in memory, so mark as Done to avoid retry
                    *self = WriteViewRow::Done;
                    return_if_io!(cursor.insert(&btree_key));
                }
                WriteViewRow::Done => {
                    break;
                }
            }
        }
        Ok(IOResult::Done(()))
    }
}
/// State machine for commit operations
pub enum CommitState {
/// Initial state - ready to start commit
@@ -160,7 +47,7 @@ pub enum CommitState {
/// Current index in delta.changes being processed
current_index: usize,
/// State for writing individual rows
write_row_state: WriteViewRow,
write_row_state: WriteRow,
/// Cursor for view data btree - created fresh for each row
view_cursor: Box<BTreeCursor>,
},
@@ -537,7 +424,7 @@ impl DbspCircuit {
self.commit_state = CommitState::UpdateView {
delta,
current_index: 0,
write_row_state: WriteViewRow::new(),
write_row_state: WriteRow::new(),
view_cursor,
};
}
@@ -554,9 +441,9 @@ impl DbspCircuit {
} else {
let (row, weight) = delta.changes[*current_index].clone();
// If we're starting a new row (Empty state), we need a fresh cursor
// If we're starting a new row (GetRecord state), we need a fresh cursor
// due to btree cursor state machine limitations
if matches!(write_row_state, WriteViewRow::Empty) {
if matches!(write_row_state, WriteRow::GetRecord) {
*view_cursor = Box::new(BTreeCursor::new_table(
None,
pager.clone(),
@@ -565,10 +452,19 @@ impl DbspCircuit {
));
}
// Build the view row format: row values + weight
let key = SeekKey::TableRowId(row.rowid);
let row_values = row.values.clone();
let build_fn = move |final_weight: isize| -> Vec<Value> {
let mut values = row_values.clone();
values.push(Value::Integer(final_weight as i64));
values
};
return_and_restore_if_io!(
&mut self.commit_state,
state,
write_row_state.write_row(view_cursor, row, weight)
write_row_state.write_row(view_cursor, key, build_fn, weight)
);
// Move to next row
@@ -587,7 +483,7 @@ impl DbspCircuit {
self.commit_state = CommitState::UpdateView {
delta,
current_index: *current_index + 1,
write_row_state: WriteViewRow::new(),
write_row_state: WriteRow::new(),
view_cursor,
};
}

View File

@@ -4,4 +4,5 @@ pub mod dbsp;
pub mod expr_compiler;
pub mod hashable_row;
pub mod operator;
pub mod persistence;
pub mod view;

View File

@@ -6,8 +6,9 @@ use crate::function::{AggFunc, Func};
use crate::incremental::dbsp::Delta;
use crate::incremental::expr_compiler::CompiledExpression;
use crate::incremental::hashable_row::HashableRow;
use crate::storage::btree::{BTreeCursor, BTreeKey};
use crate::types::{IOResult, SeekKey, SeekOp, SeekResult, Text};
use crate::incremental::persistence::{ReadRecord, WriteRow};
use crate::storage::btree::BTreeCursor;
use crate::types::{IOResult, SeekKey, Text};
use crate::{
return_and_restore_if_io, return_if_io, Connection, Database, Result, SymbolTable, Value,
};
@@ -17,160 +18,6 @@ use std::sync::{Arc, Mutex};
use turso_macros::match_ignore_ascii_case;
use turso_parser::ast::{As, Expr, Literal, Name, OneSelect, Operator, ResultColumn};
/// State machine for reading back a persisted aggregate state record.
#[derive(Debug)]
pub enum ReadRecord {
    /// Initial state: seek the key and decode the stored blob.
    GetRecord,
    /// Finished; `state` is `None` when the key was not present.
    Done { state: Option<AggregateState> },
}
impl ReadRecord {
    /// Create a fresh state machine, ready to read one record.
    fn new() -> Self {
        ReadRecord::GetRecord
    }

    /// Look up `key` and deserialize the stored `AggregateState` blob.
    ///
    /// Resumable: on I/O the current variant is kept in `self` and the caller
    /// re-invokes with the same arguments until `IOResult::Done`. Yields
    /// `None` when the key is not present in the btree.
    fn read_record(
        &mut self,
        key: SeekKey,
        aggregates: &[AggregateFunction],
        cursor: &mut BTreeCursor,
    ) -> Result<IOResult<Option<AggregateState>>> {
        loop {
            match self {
                ReadRecord::GetRecord => {
                    let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
                    if !matches!(res, SeekResult::Found) {
                        *self = ReadRecord::Done { state: None };
                    } else {
                        let record = return_if_io!(cursor.record());
                        let r = record.ok_or_else(|| {
                            crate::LimboError::InternalError(format!(
                                "Found key {key:?} in aggregate storage but could not read record"
                            ))
                        })?;
                        let values = r.get_values();
                        // Stored layout is [key, blob, weight]; the serialized
                        // state is at index 1 (see the matching write path).
                        let blob = values[1].to_owned();
                        let (state, _group_key) = match blob {
                            Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates)
                                .ok_or_else(|| {
                                    crate::LimboError::InternalError(format!(
                                        "Cannot deserialize aggregate state {blob:?}",
                                    ))
                                }),
                            _ => Err(crate::LimboError::ParseError(
                                "Value in aggregator not blob".to_string(),
                            )),
                        }?;
                        *self = ReadRecord::Done { state: Some(state) }
                    }
                }
                ReadRecord::Done { state } => return Ok(IOResult::Done(state.clone())),
            }
        }
    }
}
/// State machine for persisting one aggregate record with weight merging.
#[derive(Debug)]
pub(crate) enum WriteRecord {
    /// Initial state: seek the key and read any existing weight.
    GetRecord,
    /// Merged weight netted to <= 0: remove the row.
    /// NOTE(review): `final_weight` is carried here but never read by the
    /// Delete arm (matched as `final_weight: _`).
    Delete { final_weight: isize },
    /// Write the row with the merged weight.
    Insert { final_weight: isize },
    /// Finished processing this record.
    Done,
}
impl WriteRecord {
    /// Create a fresh state machine, ready to write one record.
    fn new() -> Self {
        WriteRecord::GetRecord
    }

    /// Persist `record` under `key`, merging `weight` with any weight already
    /// stored; a non-positive merged weight deletes the row instead.
    ///
    /// Resumable: on I/O the current variant is kept in `self` and the caller
    /// re-invokes with the same arguments until `IOResult::Done`.
    fn write_record(
        &mut self,
        key: SeekKey,
        record: HashableRow,
        weight: isize,
        cursor: &mut BTreeCursor,
    ) -> Result<IOResult<()>> {
        loop {
            match self {
                WriteRecord::GetRecord => {
                    let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
                    if !matches!(res, SeekResult::Found) {
                        // No prior record: insert with the incoming weight as-is.
                        *self = WriteRecord::Insert {
                            final_weight: weight,
                        };
                    } else {
                        let existing_record = return_if_io!(cursor.record());
                        let r = existing_record.ok_or_else(|| {
                            crate::LimboError::InternalError(format!(
                                "Found key {key:?} in aggregate storage but could not read record"
                            ))
                        })?;
                        let values = r.get_values();
                        // values[2] should contain the weight
                        let existing_weight = match values[2].to_owned() {
                            Value::Integer(w) => w as isize,
                            _ => {
                                return Err(crate::LimboError::InternalError(format!(
                                    "Invalid weight value in aggregate storage for key {key:?}"
                                )))
                            }
                        };
                        let final_weight = existing_weight + weight;
                        if final_weight <= 0 {
                            *self = WriteRecord::Delete { final_weight }
                        } else {
                            *self = WriteRecord::Insert { final_weight }
                        }
                    }
                }
                WriteRecord::Delete { final_weight: _ } => {
                    // Re-seek to position the cursor on the row we are about to delete.
                    let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
                    if !matches!(res, SeekResult::Found) {
                        return Err(crate::LimboError::InternalError(format!(
                            "record not found for {key:?}, but we had just GetRecord! Should not be possible"
                        )));
                    }
                    // Done - row was deleted and weights cancel out.
                    // If we initiated the delete we will complete, so Done has to be set
                    // before so we don't come back here.
                    *self = WriteRecord::Done;
                    return_if_io!(cursor.delete());
                }
                WriteRecord::Insert { final_weight } => {
                    return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
                    // Build the key and insert the record
                    let key_i64 = match key {
                        SeekKey::TableRowId(id) => id,
                        _ => {
                            return Err(crate::LimboError::InternalError(
                                "Expected TableRowId for aggregate storage".to_string(),
                            ))
                        }
                    };
                    // Create the record values: key, blob, weight
                    let record_values = vec![
                        Value::Integer(key_i64),
                        record.values[0].clone(), // The blob with serialized state
                        Value::Integer(*final_weight as i64),
                    ];
                    // Create an ImmutableRecord from the values
                    let immutable_record = crate::types::ImmutableRecord::from_values(
                        &record_values,
                        record_values.len(),
                    );
                    let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record));
                    // Set Done before insert: if insert returns I/O, the btree operation
                    // has already happened in memory, so it must not be retried.
                    *self = WriteRecord::Done;
                    return_if_io!(cursor.insert(&btree_key));
                }
                WriteRecord::Done => {
                    return Ok(IOResult::Done(()));
                }
            }
        }
    }
}
type ComputedStates = HashMap<String, (Vec<Value>, AggregateState)>; // group_key_str -> (group_key, state)
#[derive(Debug)]
enum AggregateCommitState {
@@ -182,7 +29,7 @@ enum AggregateCommitState {
delta: Delta,
computed_states: ComputedStates,
current_idx: usize,
write_record: WriteRecord,
write_row: WriteRow,
},
Done {
delta: Delta,
@@ -1384,7 +1231,7 @@ impl AggregateState {
/// Deserialize aggregate state from a binary blob
/// Returns the aggregate state and the group key values
fn from_blob(blob: &[u8], aggregates: &[AggregateFunction]) -> Option<(Self, Vec<Value>)> {
pub fn from_blob(blob: &[u8], aggregates: &[AggregateFunction]) -> Option<(Self, Vec<Value>)> {
let mut cursor = 0;
// Check version byte
@@ -1768,14 +1615,14 @@ impl IncrementalOperator for AggregateOperator {
delta: output_delta,
computed_states,
current_idx: 0,
write_record: WriteRecord::new(),
write_row: WriteRow::new(),
};
}
AggregateCommitState::PersistDelta {
delta,
computed_states,
current_idx,
write_record,
write_row,
} => {
let states_vec: Vec<_> = computed_states.iter().collect();
@@ -1795,10 +1642,25 @@ impl IncrementalOperator for AggregateOperator {
let state_blob = agg_state.to_blob(&self.aggregates, group_key);
let blob_row = HashableRow::new(0, vec![Value::Blob(state_blob)]);
// Build the aggregate storage format: [key, blob, weight]
let seek_key_clone = seek_key.clone();
let blob_value = blob_row.values[0].clone();
let build_fn = move |final_weight: isize| -> Vec<Value> {
let key_i64 = match seek_key_clone.clone() {
SeekKey::TableRowId(id) => id,
_ => panic!("Expected TableRowId"),
};
vec![
Value::Integer(key_i64),
blob_value.clone(), // The blob with serialized state
Value::Integer(final_weight as i64),
]
};
return_and_restore_if_io!(
&mut self.commit_state,
state,
write_record.write_record(seek_key, blob_row, weight, cursor)
write_row.write_row(cursor, seek_key, build_fn, weight)
);
let delta = std::mem::take(delta);
@@ -1808,7 +1670,7 @@ impl IncrementalOperator for AggregateOperator {
delta,
computed_states,
current_idx: *current_idx + 1,
write_record: WriteRecord::new(), // Reset for next write
write_row: WriteRow::new(), // Reset for next write
};
}
}

View File

@@ -0,0 +1,176 @@
use crate::incremental::operator::{AggregateFunction, AggregateState};
use crate::storage::btree::{BTreeCursor, BTreeKey};
use crate::types::{IOResult, SeekKey, SeekOp, SeekResult};
use crate::{return_if_io, Result, Value};
/// State machine for reading a persisted aggregate state record.
#[derive(Debug, Default)]
pub enum ReadRecord {
    /// Initial state: seek the key and decode the stored blob.
    #[default]
    GetRecord,
    /// Finished; `state` is `None` when the key was not found.
    Done {
        state: Option<AggregateState>,
    },
}
impl ReadRecord {
    /// Create a fresh state machine (starts in `GetRecord`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Look up `key` and deserialize the stored `AggregateState` blob.
    ///
    /// Resumable: on I/O the current variant is kept in `self` and the caller
    /// re-invokes with the same arguments until `IOResult::Done`. Yields
    /// `None` when the key is not present in the btree.
    pub fn read_record(
        &mut self,
        key: SeekKey,
        aggregates: &[AggregateFunction],
        cursor: &mut BTreeCursor,
    ) -> Result<IOResult<Option<AggregateState>>> {
        loop {
            match self {
                ReadRecord::GetRecord => {
                    let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
                    if !matches!(res, SeekResult::Found) {
                        *self = ReadRecord::Done { state: None };
                    } else {
                        let record = return_if_io!(cursor.record());
                        let r = record.ok_or_else(|| {
                            crate::LimboError::InternalError(format!(
                                "Found key {key:?} in aggregate storage but could not read record"
                            ))
                        })?;
                        let values = r.get_values();
                        // The serialized aggregate state blob is at index 1;
                        // the write path stores [key, blob, weight].
                        let blob = values[1].to_owned();
                        let (state, _group_key) = match blob {
                            Value::Blob(blob) => AggregateState::from_blob(&blob, aggregates)
                                .ok_or_else(|| {
                                    crate::LimboError::InternalError(format!(
                                        "Cannot deserialize aggregate state {blob:?}",
                                    ))
                                }),
                            _ => Err(crate::LimboError::ParseError(
                                "Value in aggregator not blob".to_string(),
                            )),
                        }?;
                        *self = ReadRecord::Done { state: Some(state) }
                    }
                }
                ReadRecord::Done { state } => return Ok(IOResult::Done(state.clone())),
            }
        }
    }
}
/// State machine for writing one weighted row into a btree, unifying the
/// view and aggregate persistence paths.
#[derive(Debug, Default)]
pub enum WriteRow {
    /// Initial state: seek the key and read any existing weight.
    #[default]
    GetRecord,
    /// Merged weight netted to <= 0: remove the row.
    Delete,
    /// Write the row with the merged weight.
    Insert {
        final_weight: isize,
    },
    /// Finished processing this row.
    Done,
}
impl WriteRow {
    /// Create a fresh state machine (starts in `GetRecord`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Write a row with weight management.
    ///
    /// Seeks `key`, merges `weight` with any weight already stored (the
    /// weight is always the last column of the record), then inserts the
    /// record rebuilt by `build_record` — or deletes the row when the merged
    /// weight is <= 0.
    ///
    /// Resumable: on I/O the current variant is kept in `self` and the caller
    /// re-invokes with the same arguments until `IOResult::Done`.
    ///
    /// # Arguments
    /// * `cursor` - BTree cursor for the storage
    /// * `key` - The key to seek (TableRowId)
    /// * `build_record` - Function that builds the record values to insert.
    ///   Takes the final_weight and returns the complete record values.
    /// * `weight` - The weight delta to apply
    pub fn write_row<F>(
        &mut self,
        cursor: &mut BTreeCursor,
        key: SeekKey,
        build_record: F,
        weight: isize,
    ) -> Result<IOResult<()>>
    where
        F: Fn(isize) -> Vec<Value>,
    {
        loop {
            match self {
                WriteRow::GetRecord => {
                    let res = return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
                    if !matches!(res, SeekResult::Found) {
                        // No prior row: insert with the incoming weight as-is.
                        *self = WriteRow::Insert {
                            final_weight: weight,
                        };
                    } else {
                        let existing_record = return_if_io!(cursor.record());
                        let r = existing_record.ok_or_else(|| {
                            crate::LimboError::InternalError(format!(
                                "Found key {key:?} in storage but could not read record"
                            ))
                        })?;
                        let values = r.get_values();
                        // Weight is always the last value
                        let existing_weight = match values.last() {
                            Some(val) => match val.to_owned() {
                                Value::Integer(w) => w as isize,
                                _ => {
                                    return Err(crate::LimboError::InternalError(format!(
                                        "Invalid weight value in storage for key {key:?}"
                                    )))
                                }
                            },
                            None => {
                                return Err(crate::LimboError::InternalError(format!(
                                    "No weight value found in storage for key {key:?}"
                                )))
                            }
                        };
                        let final_weight = existing_weight + weight;
                        if final_weight <= 0 {
                            *self = WriteRow::Delete
                        } else {
                            *self = WriteRow::Insert { final_weight }
                        }
                    }
                }
                WriteRow::Delete => {
                    // NOTE(review): unlike the Insert arm, there is no re-seek here;
                    // this relies on the cursor still being positioned on the target
                    // row from the GetRecord seek/record — confirm that holds.
                    // Mark as Done before delete to avoid retry on I/O
                    *self = WriteRow::Done;
                    return_if_io!(cursor.delete());
                }
                WriteRow::Insert { final_weight } => {
                    return_if_io!(cursor.seek(key.clone(), SeekOp::GE { eq_only: true }));
                    // Extract the row ID from the key
                    let key_i64 = match key {
                        SeekKey::TableRowId(id) => id,
                        _ => {
                            return Err(crate::LimboError::InternalError(
                                "Expected TableRowId for storage".to_string(),
                            ))
                        }
                    };
                    // Build the record values using the provided function
                    let record_values = build_record(*final_weight);
                    // Create an ImmutableRecord from the values
                    let immutable_record = crate::types::ImmutableRecord::from_values(
                        &record_values,
                        record_values.len(),
                    );
                    let btree_key = BTreeKey::new_table_rowid(key_i64, Some(&immutable_record));
                    // Mark as Done before insert to avoid retry on I/O
                    *self = WriteRow::Done;
                    return_if_io!(cursor.insert(&btree_key));
                }
                WriteRow::Done => {
                    return Ok(IOResult::Done(()));
                }
            }
        }
    }
}