diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index 51b53dc03..a83c56f45 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -20,7 +20,7 @@ use crate::{ operations, vector_types::{Vector, VectorType}, }, - Connection, LimboError, Result, Statement, Value, ValueRef, + Connection, LimboError, Result, Value, ValueRef, }; /// Simple inverted index for sparse vectors @@ -38,27 +38,45 @@ pub struct VectorSparseInvertedIndexMethodAttachment { patterns: Vec, } -pub enum VectorSparseInvertedIndexCreateState { - Init, - Run { stmt: Box }, -} - #[derive(Debug)] pub enum VectorSparseInvertedIndexInsertState { Init, Prepare { - positions: Option>, + vector: Option>, + sum: f64, rowid: i64, idx: usize, }, - Seek { - positions: Option>, + SeekScratch { + vector: Option>, + sum: f64, key: Option, rowid: i64, idx: usize, }, - Insert { - positions: Option>, + InsertScratch { + vector: Option>, + sum: f64, + key: Option, + rowid: i64, + idx: usize, + }, + SeekStats { + vector: Option>, + sum: f64, + key: Option, + rowid: i64, + idx: usize, + }, + ReadStats { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + UpdateStats { + vector: Option>, + sum: f64, key: Option, rowid: i64, idx: usize, @@ -69,18 +87,41 @@ pub enum VectorSparseInvertedIndexInsertState { pub enum VectorSparseInvertedIndexDeleteState { Init, Prepare { - positions: Option>, + vector: Option>, + sum: f64, rowid: i64, idx: usize, }, - Seek { - positions: Option>, + SeekScratch { + vector: Option>, + sum: f64, key: Option, rowid: i64, idx: usize, }, - Insert { - positions: Option>, + DeleteScratch { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + SeekStats { + vector: Option>, + sum: f64, + key: Option, + rowid: i64, + idx: usize, + }, + ReadStats { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + UpdateStats { + vector: Option>, + sum: f64, + key: Option, rowid: i64, idx: usize, }, @@ -145,10 +186,12 @@ enum VectorSparseInvertedIndexSearchState { pub struct VectorSparseInvertedIndexMethodCursor { configuration: IndexMethodConfiguration, + delta: f64, scratch_btree: String, scratch_cursor: Option, + stats_btree: String, + stats_cursor: Option, main_btree: Option, - create_state: VectorSparseInvertedIndexCreateState, insert_state: VectorSparseInvertedIndexInsertState, delete_state: VectorSparseInvertedIndexDeleteState, search_state: VectorSparseInvertedIndexSearchState, @@ -194,13 +237,20 @@ impl IndexMethodAttachment for VectorSparseInvertedIndexMethodAttachment { impl VectorSparseInvertedIndexMethodCursor { pub fn new(configuration: IndexMethodConfiguration) -> Self { let scratch_btree = format!("{}_scratch", configuration.index_name); + let stats_btree = format!("{}_stats", configuration.index_name); + let delta = match configuration.parameters.get("delta") { + Some(&Value::Float(delta)) => delta, + None => 0.0, + }; Self { configuration, + delta, scratch_btree, scratch_cursor: None, + stats_btree, + stats_cursor: None, main_btree: None, search_result: VecDeque::new(), - create_state: VectorSparseInvertedIndexCreateState::Init, insert_state: VectorSparseInvertedIndexInsertState::Init, delete_state: VectorSparseInvertedIndexDeleteState::Init, search_state: VectorSparseInvertedIndexSearchState::Init, @@ -208,57 +258,72 @@ impl VectorSparseInvertedIndexMethodCursor { } } +fn key_info() -> KeyInfo { + KeyInfo { + collation: CollationSeq::Binary, + sort_order: SortOrder::Asc, + } +} + impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { fn create(&mut self, connection: &Arc) -> Result> { - loop { - match &mut self.create_state { - VectorSparseInvertedIndexCreateState::Init => { - let columns = &self.configuration.columns; - let columns = columns.iter().map(|x| x.name.as_str()).collect::>(); - let sql = format!( - "CREATE INDEX {} ON {} USING {} ({})", - self.scratch_btree, - self.configuration.table_name, - BACKING_BTREE_INDEX_METHOD_NAME, - columns.join(", ") - ); - let stmt = connection.prepare(&sql)?; - connection.start_nested(); - self.create_state = VectorSparseInvertedIndexCreateState::Run { - stmt: Box::new(stmt), - }; - } - VectorSparseInvertedIndexCreateState::Run { stmt } => { - // we need to properly track subprograms and propagate result to the root program to make this execution async - let result = stmt.run_ignore_rows(); - connection.end_nested(); - result?; - return Ok(IOResult::Done(())); - } - } + // we need to properly track subprograms and propagate result to the root program to make this execution async + + let columns = &self.configuration.columns; + let columns = columns.iter().map(|x| x.name.as_str()).collect::>(); + let scratch_index_create = format!( + "CREATE INDEX {} ON {} USING {} ({})", + self.scratch_btree, + self.configuration.table_name, + BACKING_BTREE_INDEX_METHOD_NAME, + columns.join(", ") + ); + let stats_index_create = format!( + "CREATE INDEX {} ON {} USING {} ({})", + self.stats_btree, + self.configuration.table_name, + BACKING_BTREE_INDEX_METHOD_NAME, + columns.join(", ") + ); + for sql in [scratch_index_create, stats_index_create] { + let mut stmt = connection.prepare(&sql)?; + connection.start_nested(); + let result = stmt.run_ignore_rows(); + connection.end_nested(); + result?; } + + Ok(IOResult::Done(())) } fn destroy(&mut self, connection: &Arc) -> Result> { - let sql = format!("DROP INDEX {}", self.scratch_btree); - let mut stmt = connection.prepare(&sql)?; - connection.start_nested(); - let result = stmt.run_ignore_rows(); - connection.end_nested(); - result?; + let scratch_index_drop = format!("DROP INDEX {}", self.scratch_btree); + let stats_index_drop = format!("DROP INDEX {}", self.stats_btree); + for sql in [scratch_index_drop, stats_index_drop] { + let mut stmt = connection.prepare(&sql)?; + connection.start_nested(); + let result = stmt.run_ignore_rows(); + connection.end_nested(); + result?; + } + Ok(IOResult::Done(())) } fn open_read(&mut self, connection: &Arc) -> Result> { - let key_info = KeyInfo { - collation: CollationSeq::Binary, - sort_order: SortOrder::Asc, - }; self.scratch_cursor = Some(open_index_cursor( connection, &self.configuration.table_name, &self.scratch_btree, - vec![key_info, key_info], + // component, length, rowid + vec![key_info(), key_info(), key_info()], + )?); + self.stats_cursor = Some(open_index_cursor( + connection, + &self.configuration.table_name, + &self.stats_btree, + // component, count, non-zero-min, non-zero-max + vec![key_info(), key_info(), key_info(), key_info()], )?); self.main_btree = Some(open_table_cursor( connection, @@ -268,23 +333,32 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn open_write(&mut self, connection: &Arc) -> Result> { - let key_info = KeyInfo { - collation: CollationSeq::Binary, - sort_order: SortOrder::Asc, - }; self.scratch_cursor = Some(open_index_cursor( connection, &self.configuration.table_name, &self.scratch_btree, - vec![key_info, key_info], + // component, length, rowid + vec![key_info(), key_info(), key_info()], + )?); + self.stats_cursor = Some(open_index_cursor( + connection, + &self.configuration.table_name, + &self.stats_btree, + // component + vec![key_info()], )?); Ok(IOResult::Done(())) } fn insert(&mut self, values: &[Register]) -> Result> { - let Some(cursor) = &mut self.scratch_cursor else { + let Some(scratch_cursor) = &mut self.scratch_cursor else { return Err(LimboError::InternalError( - "cursor must be opened".to_string(), + "scratch cursor must be opened".to_string(), + )); + }; + let Some(stats_cursor) = &mut self.stats_cursor else { + return Err(LimboError::InternalError( + "stats cursor must be opened".to_string(), )); }; loop { @@ -296,7 +370,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "first value must be sparse vector".to_string(), )); }; - let vector = Vector::from_slice(vector)?; + let vector = Vector::from_vec(vector.to_vec())?; if !matches!(vector.vector_type, VectorType::Float32Sparse) { return Err(LimboError::InternalError( "first value must be sparse vector".to_string(), @@ -307,64 +381,196 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "second value must be i64 rowid".to_string(), )); }; - let sparse = vector.as_f32_sparse(); + let sum = vector.as_f32_sparse().values.iter().sum::() as f64; self.insert_state = VectorSparseInvertedIndexInsertState::Prepare { - positions: Some(sparse.idx.to_vec()), + vector: Some(vector), + sum, rowid, idx: 0, } } VectorSparseInvertedIndexInsertState::Prepare { - positions, + vector, + sum, rowid, idx, } => { - let p = positions.as_ref().unwrap(); - if *idx == p.len() { + let v = vector.as_ref().unwrap(); + if *idx == v.as_f32_sparse().idx.len() { self.insert_state = VectorSparseInvertedIndexInsertState::Init; return Ok(IOResult::Done(())); } - let position = p[*idx]; + let position = v.as_f32_sparse().idx[*idx]; let key = ImmutableRecord::from_values( - &[Value::Integer(position as i64), Value::Integer(*rowid)], - 2, + &[ + Value::Integer(position as i64), + Value::Float(*sum), + Value::Integer(*rowid), + ], + 3, ); - self.insert_state = VectorSparseInvertedIndexInsertState::Seek { + self.insert_state = VectorSparseInvertedIndexInsertState::SeekScratch { + vector: vector.take(), + sum: *sum, idx: *idx, rowid: *rowid, - positions: positions.take(), key: Some(key), }; } - VectorSparseInvertedIndexInsertState::Seek { - positions, + VectorSparseInvertedIndexInsertState::SeekScratch { + vector, + sum, rowid, idx, key, } => { let k = key.as_ref().unwrap(); - let _ = return_if_io!( - cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) - ); - self.insert_state = VectorSparseInvertedIndexInsertState::Insert { + let _ = + return_if_io!(scratch_cursor + .seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false })); + self.insert_state = VectorSparseInvertedIndexInsertState::InsertScratch { + vector: vector.take(), + sum: *sum, idx: *idx, rowid: *rowid, - positions: positions.take(), key: key.take(), }; } - VectorSparseInvertedIndexInsertState::Insert { - positions, + VectorSparseInvertedIndexInsertState::InsertScratch { + vector, + sum, rowid, idx, key, } => { let k = key.as_ref().unwrap(); - return_if_io!(cursor.insert(&BTreeKey::IndexKey(k))); + return_if_io!(scratch_cursor.insert(&BTreeKey::IndexKey(k))); + + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1); + self.insert_state = VectorSparseInvertedIndexInsertState::SeekStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexInsertState::SeekStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + let result = return_if_io!( + stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) + ); + match result { + SeekResult::Found => { + self.insert_state = VectorSparseInvertedIndexInsertState::ReadStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::NotFound | SeekResult::TryAdvance => { + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let value = v.as_f32_sparse().values[*idx] as f64; + tracing::debug!( + "update stats(insert): {} (cnt={}, min={}, max={})", + position, + 1, + value, + value, + ); + let key = ImmutableRecord::from_values( + &[ + Value::Integer(position as i64), + Value::Integer(1 as i64), + Value::Float(value), + Value::Float(value), + ], + 4, + ); + self.insert_state = VectorSparseInvertedIndexInsertState::UpdateStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + } + } + VectorSparseInvertedIndexInsertState::ReadStats { + vector, + sum, + rowid, + idx, + } => { + let record = return_if_io!(stats_cursor.record()).unwrap(); + let ValueRef::Integer(cnt) = record.get_value(1)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected integer" + ))); + }; + let ValueRef::Float(min) = record.get_value(2)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + let ValueRef::Float(max) = record.get_value(3)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let value = v.as_f32_sparse().values[*idx] as f64; + tracing::debug!( + "update stats(insert): {} (cnt={}, min={}, max={})", + position, + cnt + 1, + value.min(min), + value.max(max), + ); + let key = ImmutableRecord::from_values( + &[ + Value::Integer(position as i64), + Value::Integer(cnt + 1), + Value::Float(value.min(min)), + Value::Float(value.max(max)), + ], + 4, + ); + self.insert_state = VectorSparseInvertedIndexInsertState::UpdateStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexInsertState::UpdateStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + return_if_io!(stats_cursor.insert(&BTreeKey::IndexKey(k))); + self.insert_state = VectorSparseInvertedIndexInsertState::Prepare { + vector: vector.take(), + sum: *sum, idx: *idx + 1, rowid: *rowid, - positions: positions.take(), }; } } @@ -377,6 +583,11 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "cursor must be opened".to_string(), )); }; + let Some(stats_cursor) = &mut self.stats_cursor else { + return Err(LimboError::InternalError( + "stats cursor must be opened".to_string(), + )); + }; loop { tracing::debug!("delete_state: {:?}", self.delete_state); match &mut self.delete_state { @@ -386,7 +597,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "first value must be sparse vector".to_string(), )); }; - let vector = Vector::from_slice(vector)?; + let vector = Vector::from_vec(vector.to_vec())?; if !matches!(vector.vector_type, VectorType::Float32Sparse) { return Err(LimboError::InternalError( "first value must be sparse vector".to_string(), @@ -397,37 +608,45 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "second value must be i64 rowid".to_string(), )); }; - let sparse = vector.as_f32_sparse(); + let sum = vector.as_f32_sparse().values.iter().sum::() as f64; self.delete_state = VectorSparseInvertedIndexDeleteState::Prepare { - positions: Some(sparse.idx.to_vec()), + vector: Some(vector), + sum, rowid, idx: 0, } } VectorSparseInvertedIndexDeleteState::Prepare { - positions, + vector, + sum, rowid, idx, } => { - let p = positions.as_ref().unwrap(); - if *idx == p.len() { + let v = vector.as_ref().unwrap(); + if *idx == v.as_f32_sparse().idx.len() { self.delete_state = VectorSparseInvertedIndexDeleteState::Init; return Ok(IOResult::Done(())); } - let position = p[*idx]; + let position = v.as_f32_sparse().idx[*idx]; let key = ImmutableRecord::from_values( - &[Value::Integer(position as i64), Value::Integer(*rowid)], - 2, + &[ + Value::Integer(position as i64), + Value::Float(*sum), + Value::Integer(*rowid), + ], + 3, ); - self.delete_state = VectorSparseInvertedIndexDeleteState::Seek { + self.delete_state = VectorSparseInvertedIndexDeleteState::SeekScratch { + vector: vector.take(), idx: *idx, + sum: *sum, rowid: *rowid, - positions: positions.take(), key: Some(key), }; } - VectorSparseInvertedIndexDeleteState::Seek { - positions, + VectorSparseInvertedIndexDeleteState::SeekScratch { + vector, + sum, rowid, idx, key, @@ -439,22 +658,121 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { if !matches!(result, SeekResult::Found) { return Err(LimboError::Corrupt("inverted index corrupted".to_string())); } - self.delete_state = VectorSparseInvertedIndexDeleteState::Insert { + self.delete_state = VectorSparseInvertedIndexDeleteState::DeleteScratch { + vector: vector.take(), + sum: *sum, idx: *idx, rowid: *rowid, - positions: positions.take(), }; } - VectorSparseInvertedIndexDeleteState::Insert { - positions, + VectorSparseInvertedIndexDeleteState::DeleteScratch { + vector, + sum, rowid, idx, } => { return_if_io!(cursor.delete()); + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1); + self.delete_state = VectorSparseInvertedIndexDeleteState::SeekStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexDeleteState::SeekStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + let result = return_if_io!( + stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) + ); + match result { + SeekResult::Found => { + self.delete_state = VectorSparseInvertedIndexDeleteState::ReadStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::NotFound | SeekResult::TryAdvance => { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: can't find component row" + ))) + } + } + } + VectorSparseInvertedIndexDeleteState::ReadStats { + vector, + sum, + rowid, + idx, + } => { + let record = return_if_io!(stats_cursor.record()).unwrap(); + let ValueRef::Integer(cnt) = record.get_value(1)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected integer" + ))); + }; + let ValueRef::Float(min) = record.get_value(2)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + let ValueRef::Float(max) = record.get_value(3)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + tracing::debug!( + "update stats(delete): {} (cnt={}, min={}, max={})", + position, + cnt - 1, + min, + max, + ); + let key = ImmutableRecord::from_values( + &[ + Value::Integer(position as i64), + Value::Integer(cnt - 1), + Value::Float(min), + Value::Float(max), + ], + 4, + ); + self.delete_state = VectorSparseInvertedIndexDeleteState::UpdateStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexDeleteState::UpdateStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + return_if_io!(stats_cursor.insert(&BTreeKey::IndexKey(k))); + self.delete_state = VectorSparseInvertedIndexDeleteState::Prepare { + vector: vector.take(), + sum: *sum, idx: *idx + 1, rowid: *rowid, - positions: positions.take(), }; } }