From a440102b9ba91a02fbde0c60902b72a635277646 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 29 Oct 2025 18:10:48 +0400 Subject: [PATCH 01/15] add components stats table --- core/index_method/toy_vector_sparse_ivf.rs | 522 +++++++++++++++++---- 1 file changed, 420 insertions(+), 102 deletions(-) diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index 51b53dc03..a83c56f45 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -20,7 +20,7 @@ use crate::{ operations, vector_types::{Vector, VectorType}, }, - Connection, LimboError, Result, Statement, Value, ValueRef, + Connection, LimboError, Result, Value, ValueRef, }; /// Simple inverted index for sparse vectors @@ -38,27 +38,45 @@ pub struct VectorSparseInvertedIndexMethodAttachment { patterns: Vec, } -pub enum VectorSparseInvertedIndexCreateState { - Init, - Run { stmt: Box }, -} - #[derive(Debug)] pub enum VectorSparseInvertedIndexInsertState { Init, Prepare { - positions: Option>, + vector: Option>, + sum: f64, rowid: i64, idx: usize, }, - Seek { - positions: Option>, + SeekScratch { + vector: Option>, + sum: f64, key: Option, rowid: i64, idx: usize, }, - Insert { - positions: Option>, + InsertScratch { + vector: Option>, + sum: f64, + key: Option, + rowid: i64, + idx: usize, + }, + SeekStats { + vector: Option>, + sum: f64, + key: Option, + rowid: i64, + idx: usize, + }, + ReadStats { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + UpdateStats { + vector: Option>, + sum: f64, key: Option, rowid: i64, idx: usize, @@ -69,18 +87,41 @@ pub enum VectorSparseInvertedIndexInsertState { pub enum VectorSparseInvertedIndexDeleteState { Init, Prepare { - positions: Option>, + vector: Option>, + sum: f64, rowid: i64, idx: usize, }, - Seek { - positions: Option>, + SeekScratch { + vector: Option>, + sum: f64, key: Option, rowid: i64, idx: usize, }, - Insert { - positions: Option>, + DeleteScratch { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + SeekStats { + vector: Option>, + sum: f64, + key: Option, + rowid: i64, + idx: usize, + }, + ReadStats { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + UpdateStats { + vector: Option>, + sum: f64, + key: Option, rowid: i64, idx: usize, }, @@ -145,10 +186,12 @@ enum VectorSparseInvertedIndexSearchState { pub struct VectorSparseInvertedIndexMethodCursor { configuration: IndexMethodConfiguration, + delta: f64, scratch_btree: String, scratch_cursor: Option, + stats_btree: String, + stats_cursor: Option, main_btree: Option, - create_state: VectorSparseInvertedIndexCreateState, insert_state: VectorSparseInvertedIndexInsertState, delete_state: VectorSparseInvertedIndexDeleteState, search_state: VectorSparseInvertedIndexSearchState, @@ -194,13 +237,20 @@ impl IndexMethodAttachment for VectorSparseInvertedIndexMethodAttachment { impl VectorSparseInvertedIndexMethodCursor { pub fn new(configuration: IndexMethodConfiguration) -> Self { let scratch_btree = format!("{}_scratch", configuration.index_name); + let stats_btree = format!("{}_stats", configuration.index_name); + let delta = match configuration.parameters.get("delta") { + Some(&Value::Float(delta)) => delta, + None => 0.0, + }; Self { configuration, + delta, scratch_btree, scratch_cursor: None, + stats_btree, + stats_cursor: None, main_btree: None, search_result: VecDeque::new(), - create_state: VectorSparseInvertedIndexCreateState::Init, insert_state: VectorSparseInvertedIndexInsertState::Init, delete_state: VectorSparseInvertedIndexDeleteState::Init, search_state: VectorSparseInvertedIndexSearchState::Init, @@ -208,57 +258,72 @@ impl VectorSparseInvertedIndexMethodCursor { } } +fn key_info() -> KeyInfo { + KeyInfo { + collation: CollationSeq::Binary, + sort_order: SortOrder::Asc, + } +} + impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { fn create(&mut self, connection: &Arc) -> Result> { - loop { - match &mut self.create_state { - VectorSparseInvertedIndexCreateState::Init => { - let columns = &self.configuration.columns; - let columns = columns.iter().map(|x| x.name.as_str()).collect::>(); - let sql = format!( - "CREATE INDEX {} ON {} USING {} ({})", - self.scratch_btree, - self.configuration.table_name, - BACKING_BTREE_INDEX_METHOD_NAME, - columns.join(", ") - ); - let stmt = connection.prepare(&sql)?; - connection.start_nested(); - self.create_state = VectorSparseInvertedIndexCreateState::Run { - stmt: Box::new(stmt), - }; - } - VectorSparseInvertedIndexCreateState::Run { stmt } => { - // we need to properly track subprograms and propagate result to the root program to make this execution async - let result = stmt.run_ignore_rows(); - connection.end_nested(); - result?; - return Ok(IOResult::Done(())); - } - } + // we need to properly track subprograms and propagate result to the root program to make this execution async + + let columns = &self.configuration.columns; + let columns = columns.iter().map(|x| x.name.as_str()).collect::>(); + let scratch_index_create = format!( + "CREATE INDEX {} ON {} USING {} ({})", + self.scratch_btree, + self.configuration.table_name, + BACKING_BTREE_INDEX_METHOD_NAME, + columns.join(", ") + ); + let stats_index_create = format!( + "CREATE INDEX {} ON {} USING {} ({})", + self.stats_btree, + self.configuration.table_name, + BACKING_BTREE_INDEX_METHOD_NAME, + columns.join(", ") + ); + for sql in [scratch_index_create, stats_index_create] { + let mut stmt = connection.prepare(&sql)?; + connection.start_nested(); + let result = stmt.run_ignore_rows(); + connection.end_nested(); + result?; } + + Ok(IOResult::Done(())) } fn destroy(&mut self, connection: &Arc) -> Result> { - let sql = format!("DROP INDEX {}", self.scratch_btree); - let mut stmt = connection.prepare(&sql)?; - connection.start_nested(); - let result = stmt.run_ignore_rows(); - connection.end_nested(); - result?; + let scratch_index_drop = format!("DROP INDEX {}", self.scratch_btree); + let stats_index_drop = format!("DROP INDEX {}", self.stats_btree); + for sql in [scratch_index_drop, stats_index_drop] { + let mut stmt = connection.prepare(&sql)?; + connection.start_nested(); + let result = stmt.run_ignore_rows(); + connection.end_nested(); + result?; + } + Ok(IOResult::Done(())) } fn open_read(&mut self, connection: &Arc) -> Result> { - let key_info = KeyInfo { - collation: CollationSeq::Binary, - sort_order: SortOrder::Asc, - }; self.scratch_cursor = Some(open_index_cursor( connection, &self.configuration.table_name, &self.scratch_btree, - vec![key_info, key_info], + // component, length, rowid + vec![key_info(), key_info(), key_info()], + )?); + self.stats_cursor = Some(open_index_cursor( + connection, + &self.configuration.table_name, + &self.stats_btree, + // component, count, non-zero-min, non-zero-max + vec![key_info(), key_info(), key_info(), key_info()], )?); self.main_btree = Some(open_table_cursor( connection, @@ -268,23 +333,32 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn open_write(&mut self, connection: &Arc) -> Result> { - let key_info = KeyInfo { - collation: CollationSeq::Binary, - sort_order: SortOrder::Asc, - }; self.scratch_cursor = Some(open_index_cursor( connection, &self.configuration.table_name, &self.scratch_btree, - vec![key_info, key_info], + // component, length, rowid + vec![key_info(), key_info(), key_info()], + )?); + self.stats_cursor = Some(open_index_cursor( + connection, + &self.configuration.table_name, + &self.stats_btree, + // component + vec![key_info()], )?); Ok(IOResult::Done(())) } fn insert(&mut self, values: &[Register]) -> Result> { - let Some(cursor) = &mut self.scratch_cursor else { + let Some(scratch_cursor) = &mut self.scratch_cursor else { return Err(LimboError::InternalError( - "cursor must be opened".to_string(), + "scratch cursor must be opened".to_string(), + )); + }; + let Some(stats_cursor) = &mut self.stats_cursor else { + return Err(LimboError::InternalError( + "stats cursor must be opened".to_string(), )); }; loop { @@ -296,7 +370,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "first value must be sparse vector".to_string(), )); }; - let vector = Vector::from_slice(vector)?; + let vector = Vector::from_vec(vector.to_vec())?; if !matches!(vector.vector_type, VectorType::Float32Sparse) { return Err(LimboError::InternalError( "first value must be sparse vector".to_string(), @@ -307,64 +381,196 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "second value must be i64 rowid".to_string(), )); }; - let sparse = vector.as_f32_sparse(); + let sum = vector.as_f32_sparse().values.iter().sum::() as f64; self.insert_state = VectorSparseInvertedIndexInsertState::Prepare { - positions: Some(sparse.idx.to_vec()), + vector: Some(vector), + sum, rowid, idx: 0, } } VectorSparseInvertedIndexInsertState::Prepare { - positions, + vector, + sum, rowid, idx, } => { - let p = positions.as_ref().unwrap(); - if *idx == p.len() { + let v = vector.as_ref().unwrap(); + if *idx == v.as_f32_sparse().idx.len() { self.insert_state = VectorSparseInvertedIndexInsertState::Init; return Ok(IOResult::Done(())); } - let position = p[*idx]; + let position = v.as_f32_sparse().idx[*idx]; let key = ImmutableRecord::from_values( - &[Value::Integer(position as i64), Value::Integer(*rowid)], - 2, + &[ + Value::Integer(position as i64), + Value::Float(*sum), + Value::Integer(*rowid), + ], + 3, ); - self.insert_state = VectorSparseInvertedIndexInsertState::Seek { + self.insert_state = VectorSparseInvertedIndexInsertState::SeekScratch { + vector: vector.take(), + sum: *sum, idx: *idx, rowid: *rowid, - positions: positions.take(), key: Some(key), }; } - VectorSparseInvertedIndexInsertState::Seek { - positions, + VectorSparseInvertedIndexInsertState::SeekScratch { + vector, + sum, rowid, idx, key, } => { let k = key.as_ref().unwrap(); - let _ = return_if_io!( - cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) - ); - self.insert_state = VectorSparseInvertedIndexInsertState::Insert { + let _ = + return_if_io!(scratch_cursor + .seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false })); + self.insert_state = VectorSparseInvertedIndexInsertState::InsertScratch { + vector: vector.take(), + sum: *sum, idx: *idx, rowid: *rowid, - positions: positions.take(), key: key.take(), }; } - VectorSparseInvertedIndexInsertState::Insert { - positions, + VectorSparseInvertedIndexInsertState::InsertScratch { + vector, + sum, rowid, idx, key, } => { let k = key.as_ref().unwrap(); - return_if_io!(cursor.insert(&BTreeKey::IndexKey(k))); + return_if_io!(scratch_cursor.insert(&BTreeKey::IndexKey(k))); + + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1); + self.insert_state = VectorSparseInvertedIndexInsertState::SeekStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexInsertState::SeekStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + let result = return_if_io!( + stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) + ); + match result { + SeekResult::Found => { + self.insert_state = VectorSparseInvertedIndexInsertState::ReadStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::NotFound | SeekResult::TryAdvance => { + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let value = v.as_f32_sparse().values[*idx] as f64; + tracing::debug!( + "update stats(insert): {} (cnt={}, min={}, max={})", + position, + 1, + value, + value, + ); + let key = ImmutableRecord::from_values( + &[ + Value::Integer(position as i64), + Value::Integer(1 as i64), + Value::Float(value), + Value::Float(value), + ], + 4, + ); + self.insert_state = VectorSparseInvertedIndexInsertState::UpdateStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + } + } + VectorSparseInvertedIndexInsertState::ReadStats { + vector, + sum, + rowid, + idx, + } => { + let record = return_if_io!(stats_cursor.record()).unwrap(); + let ValueRef::Integer(cnt) = record.get_value(1)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected integer" + ))); + }; + let ValueRef::Float(min) = record.get_value(2)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + let ValueRef::Float(max) = record.get_value(3)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let value = v.as_f32_sparse().values[*idx] as f64; + tracing::debug!( + "update stats(insert): {} (cnt={}, min={}, max={})", + position, + cnt + 1, + value.min(min), + value.max(max), + ); + let key = ImmutableRecord::from_values( + &[ + Value::Integer(position as i64), + Value::Integer(cnt + 1), + Value::Float(value.min(min)), + Value::Float(value.max(max)), + ], + 4, + ); + self.insert_state = VectorSparseInvertedIndexInsertState::UpdateStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexInsertState::UpdateStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + return_if_io!(stats_cursor.insert(&BTreeKey::IndexKey(k))); + self.insert_state = VectorSparseInvertedIndexInsertState::Prepare { + vector: vector.take(), + sum: *sum, idx: *idx + 1, rowid: *rowid, - positions: positions.take(), }; } } @@ -377,6 +583,11 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "cursor must be opened".to_string(), )); }; + let Some(stats_cursor) = &mut self.stats_cursor else { + return Err(LimboError::InternalError( + "stats cursor must be opened".to_string(), + )); + }; loop { tracing::debug!("delete_state: {:?}", self.delete_state); match &mut self.delete_state { @@ -386,7 +597,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "first value must be sparse vector".to_string(), )); }; - let vector = Vector::from_slice(vector)?; + let vector = Vector::from_vec(vector.to_vec())?; if !matches!(vector.vector_type, VectorType::Float32Sparse) { return Err(LimboError::InternalError( "first value must be sparse vector".to_string(), @@ -397,37 +608,45 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "second value must be i64 rowid".to_string(), )); }; - let sparse = vector.as_f32_sparse(); + let sum = vector.as_f32_sparse().values.iter().sum::() as f64; self.delete_state = VectorSparseInvertedIndexDeleteState::Prepare { - positions: Some(sparse.idx.to_vec()), + vector: Some(vector), + sum, rowid, idx: 0, } } VectorSparseInvertedIndexDeleteState::Prepare { - positions, + vector, + sum, rowid, idx, } => { - let p = positions.as_ref().unwrap(); - if *idx == p.len() { + let v = vector.as_ref().unwrap(); + if *idx == v.as_f32_sparse().idx.len() { self.delete_state = VectorSparseInvertedIndexDeleteState::Init; return Ok(IOResult::Done(())); } - let position = p[*idx]; + let position = v.as_f32_sparse().idx[*idx]; let key = ImmutableRecord::from_values( - &[Value::Integer(position as i64), Value::Integer(*rowid)], - 2, + &[ + Value::Integer(position as i64), + Value::Float(*sum), + Value::Integer(*rowid), + ], + 3, ); - self.delete_state = VectorSparseInvertedIndexDeleteState::Seek { + self.delete_state = VectorSparseInvertedIndexDeleteState::SeekScratch { + vector: vector.take(), idx: *idx, + sum: *sum, rowid: *rowid, - positions: positions.take(), key: Some(key), }; } - VectorSparseInvertedIndexDeleteState::Seek { - positions, + VectorSparseInvertedIndexDeleteState::SeekScratch { + vector, + sum, rowid, idx, key, @@ -439,22 +658,121 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { if !matches!(result, SeekResult::Found) { return Err(LimboError::Corrupt("inverted index corrupted".to_string())); } - self.delete_state = VectorSparseInvertedIndexDeleteState::Insert { + self.delete_state = VectorSparseInvertedIndexDeleteState::DeleteScratch { + vector: vector.take(), + sum: *sum, idx: *idx, rowid: *rowid, - positions: positions.take(), }; } - VectorSparseInvertedIndexDeleteState::Insert { - positions, + VectorSparseInvertedIndexDeleteState::DeleteScratch { + vector, + sum, rowid, idx, } => { return_if_io!(cursor.delete()); + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1); + self.delete_state = VectorSparseInvertedIndexDeleteState::SeekStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexDeleteState::SeekStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + let result = return_if_io!( + stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) + ); + match result { + SeekResult::Found => { + self.delete_state = VectorSparseInvertedIndexDeleteState::ReadStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::NotFound | SeekResult::TryAdvance => { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: can't find component row" + ))) + } + } + } + VectorSparseInvertedIndexDeleteState::ReadStats { + vector, + sum, + rowid, + idx, + } => { + let record = return_if_io!(stats_cursor.record()).unwrap(); + let ValueRef::Integer(cnt) = record.get_value(1)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected integer" + ))); + }; + let ValueRef::Float(min) = record.get_value(2)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + let ValueRef::Float(max) = record.get_value(3)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + tracing::debug!( + "update stats(delete): {} (cnt={}, min={}, max={})", + position, + cnt - 1, + min, + max, + ); + let key = ImmutableRecord::from_values( + &[ + Value::Integer(position as i64), + Value::Integer(cnt - 1), + Value::Float(min), + Value::Float(max), + ], + 4, + ); + self.delete_state = VectorSparseInvertedIndexDeleteState::UpdateStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexDeleteState::UpdateStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + return_if_io!(stats_cursor.insert(&BTreeKey::IndexKey(k))); + self.delete_state = VectorSparseInvertedIndexDeleteState::Prepare { + vector: vector.take(), + sum: *sum, idx: *idx + 1, rowid: *rowid, - positions: positions.take(), }; } } From a69dc51b3cc496649c75808872aa3e3b3d28928b Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 29 Oct 2025 18:11:06 +0400 Subject: [PATCH 02/15] adjust assertions in core in order to store auxilary data in the index row --- core/types.rs | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/core/types.rs b/core/types.rs index 617d68a2e..cec21c92f 100644 --- a/core/types.rs +++ b/core/types.rs @@ -16,7 +16,7 @@ use crate::translate::plan::IterationDirection; use crate::vdbe::sorter::Sorter; use crate::vdbe::Register; use crate::vtab::VirtualTableCursor; -use crate::{turso_assert, Completion, CompletionError, Result, IO}; +use crate::{Completion, CompletionError, Result, IO}; use std::fmt::{Debug, Display}; use std::task::Waker; @@ -1594,9 +1594,21 @@ pub fn compare_immutable( r: &[ValueRef], column_info: &[KeyInfo], ) -> std::cmp::Ordering { - assert_eq!(l.len(), r.len()); - turso_assert!(column_info.len() >= l.len(), "column_info.len() < l.len()"); - for (i, (l, r)) in l.iter().zip(r).enumerate() { + assert!( + l.len() >= column_info.len(), + "{} < {}", + l.len(), + column_info.len() + ); + assert!( + r.len() >= column_info.len(), + "{} < {}", + r.len(), + column_info.len() + ); + let l_values = l.iter().take(column_info.len()); + let r_values = r.iter().take(column_info.len()); + for (i, (l, r)) in l_values.zip(r_values).enumerate() { let column_order = column_info[i].sort_order; let collation = column_info[i].collation; let cmp = match (l, r) { @@ -1720,10 +1732,6 @@ fn compare_records_int( index_info: &IndexInfo, tie_breaker: std::cmp::Ordering, ) -> Result { - turso_assert!( - index_info.key_info.len() >= unpacked.len(), - "index_info.key_info.len() < unpacked.len()" - ); let payload = serialized.get_payload(); if payload.len() < 2 { return compare_records_generic(serialized, unpacked, index_info, 0, tie_breaker); @@ -1813,10 +1821,6 @@ fn compare_records_string( index_info: &IndexInfo, tie_breaker: std::cmp::Ordering, ) -> Result { - turso_assert!( - index_info.key_info.len() >= unpacked.len(), - "index_info.key_info.len() < unpacked.len()" - ); let payload = serialized.get_payload(); if payload.len() < 2 { return compare_records_generic(serialized, unpacked, index_info, 0, tie_breaker); @@ -1926,10 +1930,6 @@ pub fn compare_records_generic( skip: usize, tie_breaker: std::cmp::Ordering, ) -> Result { - turso_assert!( - index_info.key_info.len() >= unpacked.len(), - "index_info.key_info.len() < unpacked.len()" - ); let payload = serialized.get_payload(); if payload.is_empty() { return Ok(std::cmp::Ordering::Less); @@ -1960,7 +1960,8 @@ pub fn compare_records_generic( } let mut field_idx = skip; - while field_idx < unpacked.len() && header_pos < header_end { + let field_limit = unpacked.len().min(index_info.key_info.len()); + while field_idx < field_limit && header_pos < header_end { let (serial_type_raw, bytes_read) = read_varint(&payload[header_pos..])?; header_pos += bytes_read; From c1875954924179d8f76402347a47443bde5deefe Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 29 Oct 2025 22:15:09 +0400 Subject: [PATCH 03/15] finalize improvements for toy index --- core/index_method/toy_vector_sparse_ivf.rs | 517 +++++++++++++++------ 1 file changed, 366 insertions(+), 151 deletions(-) diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index a83c56f45..50a7a6eb8 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -142,45 +142,145 @@ impl Ord for FloatOrd { } } +#[derive(Debug)] +struct ComponentStat { + position: u32, + cnt: i64, + min: f64, + max: f64, +} + +fn parse_stat_row(record: Option<&ImmutableRecord>) -> Result { + let Some(record) = record else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected row" + ))); + }; + let ValueRef::Integer(position) = record.get_value(0)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected integer" + ))); + }; + let ValueRef::Integer(cnt) = record.get_value(1)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected integer" + ))); + }; + let ValueRef::Float(min) = record.get_value(2)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + let ValueRef::Float(max) = record.get_value(3)? else { + return Err(LimboError::Corrupt(format!( + "stats index corrupted: expected float" + ))); + }; + Ok(ComponentStat { + position: position as u32, + cnt, + min, + max, + }) +} +#[derive(Debug)] +struct ComponentRow { + position: u32, + sum: f64, + rowid: i64, +} + +fn parse_scratch_row(record: Option<&ImmutableRecord>) -> Result { + let Some(record) = record else { + return Err(LimboError::Corrupt(format!( + "scratch index corrupted: expected row" + ))); + }; + let ValueRef::Integer(position) = record.get_value(0)? else { + return Err(LimboError::Corrupt(format!( + "scratch index corrupted: expected integer" + ))); + }; + let ValueRef::Float(sum) = record.get_value(1)? else { + return Err(LimboError::Corrupt(format!( + "scratch index corrupted: expected float" + ))); + }; + let ValueRef::Integer(rowid) = record.get_value(2)? else { + return Err(LimboError::Corrupt(format!( + "scratch index corrupted: expected integer" + ))); + }; + Ok(ComponentRow { + position: position as u32, + sum, + rowid, + }) +} + #[derive(Debug)] enum VectorSparseInvertedIndexSearchState { Init, - Prepare { - collected: Option>, - positions: Option>, - idx: usize, + CollectComponentsSeek { + sum: f64, + positions: Option>, + components: Option>, + limit: i64, + key: Option, + }, + CollectComponentsRead { + sum: f64, + positions: Option>, + components: Option>, limit: i64, }, Seek { + sum: f64, + components: Option>, collected: Option>, - positions: Option>, - key: Option, - idx: usize, + distances: Option>, limit: i64, + key: Option, + sum_threshold: Option, + component: Option, }, Read { + sum: f64, + components: Option>, collected: Option>, - positions: Option>, - key: Option, - idx: usize, + distances: Option>, limit: i64, + sum_threshold: Option, + component: u32, + current: Option>, }, Next { + sum: f64, + components: Option>, collected: Option>, - positions: Option>, - key: Option, - idx: usize, + distances: Option>, limit: i64, + sum_threshold: Option, + component: u32, + current: Option>, }, EvaluateSeek { - rowids: Option>, + sum: f64, + components: Option>, + collected: Option>, distances: Option>, limit: i64, + current: Option>, + rowid: Option, }, EvaluateRead { - rowids: Option>, + sum: f64, + components: Option>, + collected: Option>, distances: Option>, limit: i64, + current: Option>, + rowid: i64, }, } @@ -240,7 +340,7 @@ impl VectorSparseInvertedIndexMethodCursor { let stats_btree = format!("{}_stats", configuration.index_name); let delta = match configuration.parameters.get("delta") { Some(&Value::Float(delta)) => delta, - None => 0.0, + _ => 0.0, }; Self { configuration, @@ -322,8 +422,8 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { connection, &self.configuration.table_name, &self.stats_btree, - // component, count, non-zero-min, non-zero-max - vec![key_info(), key_info(), key_info(), key_info()], + // component + vec![key_info()], )?); self.main_btree = Some(open_table_cursor( connection, @@ -513,38 +613,24 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { rowid, idx, } => { - let record = return_if_io!(stats_cursor.record()).unwrap(); - let ValueRef::Integer(cnt) = record.get_value(1)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected integer" - ))); - }; - let ValueRef::Float(min) = record.get_value(2)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected float" - ))); - }; - let ValueRef::Float(max) = record.get_value(3)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected float" - ))); - }; + let record = return_if_io!(stats_cursor.record()); + let component = parse_stat_row(record.as_deref())?; let v = vector.as_ref().unwrap(); let position = v.as_f32_sparse().idx[*idx]; let value = v.as_f32_sparse().values[*idx] as f64; tracing::debug!( "update stats(insert): {} (cnt={}, min={}, max={})", position, - cnt + 1, - value.min(min), - value.max(max), + component.cnt + 1, + value.min(component.min), + value.max(component.max), ); let key = ImmutableRecord::from_values( &[ Value::Integer(position as i64), - Value::Integer(cnt + 1), - Value::Float(value.min(min)), - Value::Float(value.max(max)), + Value::Integer(component.cnt + 1), + Value::Float(value.min(component.min)), + Value::Float(value.max(component.max)), ], 4, ); @@ -716,37 +802,23 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { rowid, idx, } => { - let record = return_if_io!(stats_cursor.record()).unwrap(); - let ValueRef::Integer(cnt) = record.get_value(1)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected integer" - ))); - }; - let ValueRef::Float(min) = record.get_value(2)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected float" - ))); - }; - let ValueRef::Float(max) = record.get_value(3)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected float" - ))); - }; + let record = return_if_io!(stats_cursor.record()); + let component = parse_stat_row(record.as_deref())?; let v = vector.as_ref().unwrap(); let position = v.as_f32_sparse().idx[*idx]; tracing::debug!( "update stats(delete): {} (cnt={}, min={}, max={})", position, - cnt - 1, - min, - max, + component.cnt - 1, + component.min, + component.max, ); let key = ImmutableRecord::from_values( &[ Value::Integer(position as i64), - Value::Integer(cnt - 1), - Value::Float(min), - Value::Float(max), + Value::Integer(component.cnt - 1), + Value::Float(component.min), + Value::Float(component.max), ], 4, ); @@ -785,6 +857,11 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "cursor must be opened".to_string(), )); }; + let Some(stats) = &mut self.stats_cursor else { + return Err(LimboError::InternalError( + "cursor must be opened".to_string(), + )); + }; let Some(main) = &mut self.main_btree else { return Err(LimboError::InternalError( "cursor must be opened".to_string(), @@ -811,52 +888,139 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { )); } let sparse = vector.as_f32_sparse(); - self.search_state = VectorSparseInvertedIndexSearchState::Prepare { - collected: Some(HashSet::new()), - positions: Some(sparse.idx.to_vec()), - idx: 0, - limit, - }; + let sum = sparse.values.iter().sum::() as f64; + self.search_state = + VectorSparseInvertedIndexSearchState::CollectComponentsSeek { + sum, + positions: Some(sparse.idx.to_vec().into()), + components: Some(Vec::new()), + key: None, + limit, + }; } - VectorSparseInvertedIndexSearchState::Prepare { - collected, + VectorSparseInvertedIndexSearchState::CollectComponentsSeek { + sum, positions, - idx, + components, limit, + key, } => { let p = positions.as_ref().unwrap(); - if *idx == p.len() { - let mut rowids = collected - .take() - .unwrap() - .iter() - .cloned() - .collect::>(); - rowids.sort(); - self.search_state = VectorSparseInvertedIndexSearchState::EvaluateSeek { - rowids: Some(rowids), + if p.len() == 0 && key.is_none() { + let mut components = components.take().unwrap(); + // order by cnt ASC in order to check low-cardinality components first + components.sort_by_key(|c| c.cnt); + + tracing::debug!( + "query_start: components: {:?}, delta: {}", + components, + self.delta + ); + self.search_state = VectorSparseInvertedIndexSearchState::Seek { + sum: *sum, + components: Some(components.into()), + collected: Some(HashSet::new()), distances: Some(BTreeSet::new()), limit: *limit, + key: None, + component: None, + sum_threshold: None, }; continue; } - let position = p[*idx]; - let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1); - self.search_state = VectorSparseInvertedIndexSearchState::Seek { - collected: collected.take(), - positions: positions.take(), - key: Some(key), - idx: *idx, - limit: *limit, - }; + if key.is_none() { + let position = positions.as_mut().unwrap().pop_front().unwrap(); + *key = Some(ImmutableRecord::from_values( + &[Value::Integer(position as i64)], + 1, + )); + } + let k = key.as_ref().unwrap(); + let result = return_if_io!( + stats.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) + ); + match result { + SeekResult::Found => { + self.search_state = + VectorSparseInvertedIndexSearchState::CollectComponentsRead { + sum: *sum, + positions: positions.take(), + components: components.take(), + limit: *limit, + }; + } + SeekResult::NotFound | SeekResult::TryAdvance => { + self.search_state = + VectorSparseInvertedIndexSearchState::CollectComponentsSeek { + sum: *sum, + components: components.take(), + positions: positions.take(), + limit: *limit, + key: None, + }; + } + } } - VectorSparseInvertedIndexSearchState::Seek { - collected, + VectorSparseInvertedIndexSearchState::CollectComponentsRead { + sum, positions, - key, - idx, + components, limit, } => { + let record = return_if_io!(stats.record()); + let component = parse_stat_row(record.as_deref())?; + components.as_mut().unwrap().push(component); + self.search_state = + VectorSparseInvertedIndexSearchState::CollectComponentsSeek { + sum: *sum, + components: components.take(), + positions: positions.take(), + limit: *limit, + key: None, + }; + } + VectorSparseInvertedIndexSearchState::Seek { + sum, + components, + collected, + distances, + limit, + key, + component, + sum_threshold, + } => { + let c = components.as_ref().unwrap(); + if c.len() == 0 && key.is_none() { + let distances = distances.take().unwrap(); + self.search_result = distances.iter().map(|(d, i)| (*i, d.0)).collect(); + return Ok(IOResult::Done(!self.search_result.is_empty())); + } + if key.is_none() { + let remained_sum = c.iter().map(|c| c.max).sum::(); + if distances.as_ref().unwrap().len() >= *limit as usize { + if let Some((max_threshold, _)) = distances.as_ref().unwrap().last() { + let max_threshold = (1.0 - max_threshold.0) + self.delta; + if max_threshold > 0.0 { + *sum_threshold = + Some(remained_sum / max_threshold + remained_sum - *sum); + tracing::info!( + "sum_threshold={:?}, max_threshold={}, remained_sum={}, sum={}, components={:?}", + sum_threshold, + max_threshold, + remained_sum, + sum, + c + ); + } + } + } + let c = components.as_mut().unwrap().pop_front().unwrap(); + *key = Some(ImmutableRecord::from_values( + &[Value::Integer(c.position as i64)], + 1, + )); + *component = Some(c.position); + } let k = key.as_ref().unwrap(); let result = return_if_io!( scratch.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) @@ -864,101 +1028,141 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { match result { SeekResult::Found => { self.search_state = VectorSparseInvertedIndexSearchState::Read { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - key: key.take(), - idx: *idx, + distances: distances.take(), + current: Some(Vec::new()), limit: *limit, + sum_threshold: sum_threshold.take(), + component: component.unwrap(), }; } SeekResult::TryAdvance | SeekResult::NotFound => { self.search_state = VectorSparseInvertedIndexSearchState::Next { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - key: key.take(), - idx: *idx, + distances: distances.take(), + current: Some(Vec::new()), limit: *limit, + sum_threshold: sum_threshold.take(), + component: component.unwrap(), }; } } } VectorSparseInvertedIndexSearchState::Read { + sum, + components, collected, - positions, - key, - idx, + distances, limit, + sum_threshold, + component, + current, } => { let record = return_if_io!(scratch.record()); - if let Some(record) = record { - let ValueRef::Integer(position) = record.get_value(0)? else { - return Err(LimboError::InternalError( - "first value of index record must be int".to_string(), - )); + let row = parse_scratch_row(record.as_deref())?; + if row.position != *component + || (sum_threshold.is_some() && row.sum > sum_threshold.unwrap()) + { + let mut current = current.take().unwrap(); + current.sort(); + + self.search_state = VectorSparseInvertedIndexSearchState::EvaluateSeek { + sum: *sum, + components: components.take(), + collected: collected.take(), + distances: distances.take(), + limit: *limit, + current: Some(current.into()), + rowid: None, }; - let ValueRef::Integer(rowid) = record.get_value(1)? else { - return Err(LimboError::InternalError( - "second value of index record must be int".to_string(), - )); - }; - tracing::debug!("position/rowid: {}/{}", position, rowid); - if position == positions.as_ref().unwrap()[*idx] as i64 { - collected.as_mut().unwrap().insert(rowid); - self.search_state = VectorSparseInvertedIndexSearchState::Next { - collected: collected.take(), - positions: positions.take(), - key: key.take(), - idx: *idx, - limit: *limit, - }; - continue; - } + continue; } - self.search_state = VectorSparseInvertedIndexSearchState::Prepare { + if collected.as_mut().unwrap().insert(row.rowid) { + current.as_mut().unwrap().push(row.rowid); + } + + self.search_state = VectorSparseInvertedIndexSearchState::Next { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - idx: *idx + 1, + distances: distances.take(), limit: *limit, + sum_threshold: *sum_threshold, + component: *component, + current: current.take(), }; } VectorSparseInvertedIndexSearchState::Next { + sum, + components, collected, - positions, - key, - idx, + distances, limit, + sum_threshold, + component, + current, } => { let result = return_if_io!(scratch.next()); if !result { - self.search_state = VectorSparseInvertedIndexSearchState::Prepare { + let mut current = current.take().unwrap(); + current.sort(); + + self.search_state = VectorSparseInvertedIndexSearchState::EvaluateSeek { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - idx: *idx + 1, + distances: distances.take(), limit: *limit, + current: Some(current.into()), + rowid: None, }; } else { self.search_state = VectorSparseInvertedIndexSearchState::Read { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - key: key.take(), - idx: *idx, + distances: distances.take(), limit: *limit, + sum_threshold: *sum_threshold, + component: *component, + current: current.take(), }; } } VectorSparseInvertedIndexSearchState::EvaluateSeek { - rowids, + sum, + components, + collected, distances, limit, + current, + rowid, } => { - let Some(rowid) = rowids.as_ref().unwrap().last() else { - let distances = distances.take().unwrap(); - self.search_result = distances.iter().map(|(d, i)| (*i, d.0)).collect(); - return Ok(IOResult::Done(!self.search_result.is_empty())); - }; - let result = return_if_io!( - main.seek(SeekKey::TableRowId(*rowid), SeekOp::GE { eq_only: true }) - ); + let c = current.as_ref().unwrap(); + if c.len() == 0 && rowid.is_none() { + self.search_state = VectorSparseInvertedIndexSearchState::Seek { + sum: *sum, + components: components.take(), + collected: collected.take(), + distances: distances.take(), + limit: *limit, + component: None, + key: None, + sum_threshold: None, + }; + continue; + } + if rowid.is_none() { + *rowid = Some(current.as_mut().unwrap().pop_front().unwrap()); + } + + let rowid = rowid.as_ref().unwrap().clone(); + let k = SeekKey::TableRowId(rowid); + let result = return_if_io!(main.seek(k, SeekOp::GE { eq_only: true })); if !matches!(result, SeekResult::Found) { return Err(LimboError::Corrupt( "vector_sparse_ivf corrupted: unable to find rowid in main table" @@ -966,18 +1170,25 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { )); }; self.search_state = VectorSparseInvertedIndexSearchState::EvaluateRead { - rowids: rowids.take(), + sum: *sum, + components: components.take(), + collected: collected.take(), distances: distances.take(), limit: *limit, + current: current.take(), + rowid, }; } VectorSparseInvertedIndexSearchState::EvaluateRead { - rowids, + sum, + components, + collected, distances, limit, + current, + rowid, } => { let record = return_if_io!(main.record()); - let rowid = rowids.as_mut().unwrap().pop().unwrap(); if let Some(record) = record { let column_idx = self.configuration.columns[0].pos_in_table; let ValueRef::Blob(data) = record.get_value(column_idx)? else { @@ -1009,16 +1220,20 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { ); let distance = operations::jaccard::vector_distance_jaccard(&data, &arg)?; let distances = distances.as_mut().unwrap(); - distances.insert((FloatOrd(distance), rowid)); + distances.insert((FloatOrd(distance), *rowid)); if distances.len() > *limit as usize { let _ = distances.pop_last(); } } self.search_state = VectorSparseInvertedIndexSearchState::EvaluateSeek { - rowids: rowids.take(), + sum: *sum, + components: components.take(), + collected: collected.take(), distances: distances.take(), limit: *limit, + current: current.take(), + rowid: None, }; } } From 476de1805a2beb304c877f5ffd9bba1c9491c7f2 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 29 Oct 2025 22:49:17 +0400 Subject: [PATCH 04/15] adjust bounds --- core/index_method/toy_vector_sparse_ivf.rs | 43 ++++++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index 50a7a6eb8..4a358f1c4 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -996,21 +996,40 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { return Ok(IOResult::Done(!self.search_result.is_empty())); } if key.is_none() { - let remained_sum = c.iter().map(|c| c.max).sum::(); + // we estimate jaccard distance with the following approach: + // J = min(L, M1 + M2 + ... + Mr) / (Q + N - min(L, M1 + M2 + ... + Mr)) + // so we want J > best + delta; define M1 + M2 + ... + Mr = M + // J = min(L, M) / (Q + L - min(L, M)) > best + delta + // we need to consider two cases: + // 1. L < M: J = L / (Q + L - L) > best + delta => L > (best + delta) * Q + // 2. L > M: J = M / (Q + L - M) > best + delta => M > (best + delta) * (Q + L - M) => L < M / (best + delta) - (Q - M) + // so we have two intervals: [(best + delta) * Q .. M] and [M .. M / (best + delta) - (Q - M)] + // to simplify code for now we will pick upper bound from second range if it is not degenerate, otherwise check first range + let m = c.iter().map(|c| c.max).sum::().min(*sum); if distances.as_ref().unwrap().len() >= *limit as usize { if let Some((max_threshold, _)) = distances.as_ref().unwrap().last() { - let max_threshold = (1.0 - max_threshold.0) + self.delta; - if max_threshold > 0.0 { - *sum_threshold = - Some(remained_sum / max_threshold + remained_sum - *sum); + let best = 1.0 - max_threshold.0; + let delta = self.delta; + let q = *sum; + + if best > 0.0 { + let first_range_l = (best + delta) * q; + let second_range_r = m / (best + delta) - (q - m); + if m <= second_range_r { + *sum_threshold = Some(second_range_r); + } else if first_range_l <= m { + *sum_threshold = Some(m); + } else { + *sum_threshold = Some(-1.0); + } tracing::info!( - "sum_threshold={:?}, max_threshold={}, remained_sum={}, sum={}, components={:?}", - sum_threshold, - max_threshold, - remained_sum, - sum, - c - ); + "sum_threshold={:?}, max_threshold={}, remained_sum={}, sum={}, components={:?}", + sum_threshold, + best, + m, + sum, + c + ); } } } From 29101a4b17609b6935d792d9a84a3ff13c33cd94 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 29 Oct 2025 22:49:29 +0400 Subject: [PATCH 05/15] fix test --- tests/integration/index_method/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/index_method/mod.rs b/tests/integration/index_method/mod.rs index dd5e32634..126ffb500 100644 --- a/tests/integration/index_method/mod.rs +++ b/tests/integration/index_method/mod.rs @@ -75,7 +75,7 @@ fn test_vector_sparse_ivf_create_destroy() { run(&tmp_db, || cursor.create(&conn)).unwrap(); } conn.wal_insert_end(true).unwrap(); - assert_eq!(schema_rows(), vec!["t", "t_idx_scratch"]); + assert_eq!(schema_rows(), vec!["t", "t_idx_scratch", "t_idx_stats"]); conn.wal_insert_begin().unwrap(); { From 49c1cf63b936b17c5073c5613ac94e5e74d0922d Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 31 Oct 2025 14:22:55 +0400 Subject: [PATCH 06/15] slightly adjust test --- tests/integration/index_method/mod.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/tests/integration/index_method/mod.rs b/tests/integration/index_method/mod.rs index 126ffb500..1303b6fd6 100644 --- a/tests/integration/index_method/mod.rs +++ b/tests/integration/index_method/mod.rs @@ -264,8 +264,8 @@ fn test_vector_sparse_ivf_fuzz() { const MOD: u32 = 5; let (mut rng, _) = rng_from_time_or_env(); - let mut keys = Vec::new(); - for _ in 0..10 { + let mut operation = 0; + for attempt in 0..10 { let seed = rng.next_u64(); tracing::info!("======== seed: {} ========", seed); @@ -274,11 +274,19 @@ fn test_vector_sparse_ivf_fuzz() { TempDatabase::new_with_rusqlite("CREATE TABLE t(key TEXT PRIMARY KEY, embedding)"); let index_db = TempDatabase::new_with_rusqlite("CREATE TABLE t(key TEXT PRIMARY KEY, embedding)"); + tracing::info!( + "simple_db: {:?}, index_db: {:?}", + simple_db.path, + index_db.path, + ); let simple_conn = simple_db.connect_limbo(); let index_conn = index_db.connect_limbo(); + simple_conn.wal_auto_checkpoint_disable(); + index_conn.wal_auto_checkpoint_disable(); index_conn .execute("CREATE INDEX t_idx ON t USING toy_vector_sparse_ivf (embedding)") .unwrap(); + let vector = |rng: &mut ChaCha8Rng| { let mut values = Vec::with_capacity(DIMS); for _ in 0..DIMS { @@ -291,13 +299,15 @@ fn test_vector_sparse_ivf_fuzz() { format!("[{}]", values.join(", ")) }; + let mut keys = Vec::new(); for _ in 0..200 { let choice = rng.next_u32() % 4; + operation += 1; if choice == 0 { let key = rng.next_u64().to_string(); let v = vector(&mut rng); let sql = format!("INSERT INTO t VALUES ('{key}', vector32_sparse('{v}'))"); - tracing::info!("{}", sql); + tracing::info!("({}) {}", operation, sql); simple_conn.execute(&sql).unwrap(); index_conn.execute(sql).unwrap(); keys.push(key); @@ -307,14 +317,14 @@ fn test_vector_sparse_ivf_fuzz() { let v = vector(&mut rng); let sql = format!("UPDATE t SET embedding = vector32_sparse('{v}') WHERE key = '{key}'",); - tracing::info!("{}", sql); + tracing::info!("({}) {}", operation, sql); simple_conn.execute(&sql).unwrap(); index_conn.execute(&sql).unwrap(); } else if choice == 2 && !keys.is_empty() { let idx = rng.next_u32() as usize % keys.len(); let key = &keys[idx]; let sql = format!("DELETE FROM t WHERE key = '{key}'"); - tracing::info!("{}", sql); + tracing::info!("({}) {}", operation, sql); simple_conn.execute(&sql).unwrap(); index_conn.execute(&sql).unwrap(); keys.remove(idx); @@ -322,7 +332,7 @@ fn test_vector_sparse_ivf_fuzz() { let v = vector(&mut rng); let k = rng.next_u32() % 20 + 1; let sql = format!("SELECT key, vector_distance_jaccard(embedding, vector32_sparse('{v}')) as d FROM t ORDER BY d LIMIT {k}"); - tracing::info!("{}", sql); + tracing::info!("({}) {}", operation, sql); let simple_rows = limbo_exec_rows(&simple_db, &simple_conn, &sql); let index_rows = limbo_exec_rows(&index_db, &index_conn, &sql); assert!(index_rows.len() <= simple_rows.len()); From c2e8f77ea16733d24279ace67d4dc4152e12f56a Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 31 Oct 2025 14:23:56 +0400 Subject: [PATCH 07/15] fix bug in the toy vector index in case when seek target located in the internal node of index btree --- core/index_method/toy_vector_sparse_ivf.rs | 65 ++++++++++++++++++++-- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index 4a358f1c4..674ffc45c 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -99,6 +99,12 @@ pub enum VectorSparseInvertedIndexDeleteState { rowid: i64, idx: usize, }, + NextScratch { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, DeleteScratch { vector: Option>, sum: f64, @@ -509,6 +515,12 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { ], 3, ); + tracing::debug!( + "insert_state: seek: component={}, sum={}, rowid={}", + vector.as_ref().unwrap().as_f32_sparse().idx[*idx], + *sum, + *rowid, + ); self.insert_state = VectorSparseInvertedIndexInsertState::SeekScratch { vector: vector.take(), sum: *sum, @@ -525,9 +537,10 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { key, } => { let k = key.as_ref().unwrap(); - let _ = - return_if_io!(scratch_cursor - .seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false })); + let result = return_if_io!( + scratch_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) + ); + tracing::debug!("insert_state: seek: result={:?}", result); self.insert_state = VectorSparseInvertedIndexInsertState::InsertScratch { vector: vector.take(), sum: *sum, @@ -566,7 +579,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } => { let k = key.as_ref().unwrap(); let result = return_if_io!( - stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) + stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) ); match result { SeekResult::Found => { @@ -737,11 +750,51 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { idx, key, } => { + tracing::debug!( + "delete_state: seek: component={}, sum={}, rowid={}", + vector.as_ref().unwrap().as_f32_sparse().idx[*idx], + *sum, + *rowid, + ); let k = key.as_ref().unwrap(); let result = return_if_io!( cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) ); - if !matches!(result, SeekResult::Found) { + let record = match cursor.record().unwrap() { + IOResult::Done(record) => record, + IOResult::IO(iocompletions) => unreachable!(), + }; + tracing::debug!("delete_state: seek: result={:?}", result); + match result { + SeekResult::Found => { + self.delete_state = + VectorSparseInvertedIndexDeleteState::DeleteScratch { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::TryAdvance => { + self.delete_state = VectorSparseInvertedIndexDeleteState::NextScratch { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::NotFound => { + return Err(LimboError::Corrupt("inverted index corrupted".to_string())) + } + } + } + VectorSparseInvertedIndexDeleteState::NextScratch { + vector, + sum, + rowid, + idx, + } => { + if !return_if_io!(cursor.next()) { return Err(LimboError::Corrupt("inverted index corrupted".to_string())); } self.delete_state = VectorSparseInvertedIndexDeleteState::DeleteScratch { @@ -1022,7 +1075,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } else { *sum_threshold = Some(-1.0); } - tracing::info!( + tracing::debug!( "sum_threshold={:?}, max_threshold={}, remained_sum={}, sum={}, components={:?}", sum_threshold, best, From 20ba6990a91b6668921f1d6c3fd64c06c64b6c8e Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 31 Oct 2025 14:25:07 +0400 Subject: [PATCH 08/15] ignore "virtual" index entries corresponding to the index_methods from integrity check --- core/translate/integrity_check.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/translate/integrity_check.rs b/core/translate/integrity_check.rs index a2bafe3f0..c4ee30289 100644 --- a/core/translate/integrity_check.rs +++ b/core/translate/integrity_check.rs @@ -18,7 +18,9 @@ pub fn translate_integrity_check( root_pages.push(table.root_page); if let Some(indexes) = schema.indexes.get(table.name.as_str()) { for index in indexes.iter() { - root_pages.push(index.root_page); + if index.root_page > 0 { + root_pages.push(index.root_page); + } } } }; From 20919212f97119ada991ad92c20b0f3294297958 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 31 Oct 2025 14:30:07 +0400 Subject: [PATCH 09/15] fix clippy --- core/index_method/toy_vector_sparse_ivf.rs | 75 ++++++++++------------ tests/integration/index_method/mod.rs | 2 +- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index 674ffc45c..12dca96c1 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -158,29 +158,29 @@ struct ComponentStat { fn parse_stat_row(record: Option<&ImmutableRecord>) -> Result { let Some(record) = record else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected row" - ))); + return Err(LimboError::Corrupt( + "stats index corrupted: expected row".to_string(), + )); }; let ValueRef::Integer(position) = record.get_value(0)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected integer" - ))); + return Err(LimboError::Corrupt( + "stats index corrupted: expected integer".to_string(), + )); }; let ValueRef::Integer(cnt) = record.get_value(1)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected integer" - ))); + return Err(LimboError::Corrupt( + "stats index corrupted: expected integer".to_string(), + )); }; let ValueRef::Float(min) = record.get_value(2)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected float" - ))); + return Err(LimboError::Corrupt( + "stats index corrupted: expected float".to_string(), + )); }; let ValueRef::Float(max) = record.get_value(3)? else { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: expected float" - ))); + return Err(LimboError::Corrupt( + "stats index corrupted: expected float".to_string(), + )); }; Ok(ComponentStat { position: position as u32, @@ -198,24 +198,24 @@ struct ComponentRow { fn parse_scratch_row(record: Option<&ImmutableRecord>) -> Result { let Some(record) = record else { - return Err(LimboError::Corrupt(format!( - "scratch index corrupted: expected row" - ))); + return Err(LimboError::Corrupt( + "scratch index corrupted: expected row".to_string(), + )); }; let ValueRef::Integer(position) = record.get_value(0)? else { - return Err(LimboError::Corrupt(format!( - "scratch index corrupted: expected integer" - ))); + return Err(LimboError::Corrupt( + "scratch index corrupted: expected integer".to_string(), + )); }; let ValueRef::Float(sum) = record.get_value(1)? else { - return Err(LimboError::Corrupt(format!( - "scratch index corrupted: expected float" - ))); + return Err(LimboError::Corrupt( + "scratch index corrupted: expected float".to_string(), + )); }; let ValueRef::Integer(rowid) = record.get_value(2)? else { - return Err(LimboError::Corrupt(format!( - "scratch index corrupted: expected integer" - ))); + return Err(LimboError::Corrupt( + "scratch index corrupted: expected integer".to_string(), + )); }; Ok(ComponentRow { position: position as u32, @@ -604,7 +604,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { let key = ImmutableRecord::from_values( &[ Value::Integer(position as i64), - Value::Integer(1 as i64), + Value::Integer(1), Value::Float(value), Value::Float(value), ], @@ -760,11 +760,6 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { let result = return_if_io!( cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) ); - let record = match cursor.record().unwrap() { - IOResult::Done(record) => record, - IOResult::IO(iocompletions) => unreachable!(), - }; - tracing::debug!("delete_state: seek: result={:?}", result); match result { SeekResult::Found => { self.delete_state = @@ -843,9 +838,9 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { }; } SeekResult::NotFound | SeekResult::TryAdvance => { - return Err(LimboError::Corrupt(format!( - "stats index corrupted: can't find component row" - ))) + return Err(LimboError::Corrupt( + "stats index corrupted: can't find component row".to_string(), + )) } } } @@ -959,7 +954,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { key, } => { let p = positions.as_ref().unwrap(); - if p.len() == 0 && key.is_none() { + if p.is_empty() && key.is_none() { let mut components = components.take().unwrap(); // order by cnt ASC in order to check low-cardinality components first components.sort_by_key(|c| c.cnt); @@ -1043,7 +1038,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { sum_threshold, } => { let c = components.as_ref().unwrap(); - if c.len() == 0 && key.is_none() { + if c.is_empty() && key.is_none() { let distances = distances.take().unwrap(); self.search_result = distances.iter().map(|(d, i)| (*i, d.0)).collect(); return Ok(IOResult::Done(!self.search_result.is_empty())); @@ -1215,7 +1210,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { rowid, } => { let c = current.as_ref().unwrap(); - if c.len() == 0 && rowid.is_none() { + if c.is_empty() && rowid.is_none() { self.search_state = VectorSparseInvertedIndexSearchState::Seek { sum: *sum, components: components.take(), @@ -1232,7 +1227,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { *rowid = Some(current.as_mut().unwrap().pop_front().unwrap()); } - let rowid = rowid.as_ref().unwrap().clone(); + let rowid = *rowid.as_ref().unwrap(); let k = SeekKey::TableRowId(rowid); let result = return_if_io!(main.seek(k, SeekOp::GE { eq_only: true })); if !matches!(result, SeekResult::Found) { diff --git a/tests/integration/index_method/mod.rs b/tests/integration/index_method/mod.rs index 1303b6fd6..ae2f73e02 100644 --- a/tests/integration/index_method/mod.rs +++ b/tests/integration/index_method/mod.rs @@ -265,7 +265,7 @@ fn test_vector_sparse_ivf_fuzz() { let (mut rng, _) = rng_from_time_or_env(); let mut operation = 0; - for attempt in 0..10 { + for _ in 0..10 { let seed = rng.next_u64(); tracing::info!("======== seed: {} ========", seed); From 714400b299eb58be72818afa06c50dc10b29fef4 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 31 Oct 2025 14:50:16 +0400 Subject: [PATCH 10/15] add approximate search --- tests/integration/index_method/mod.rs | 32 ++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/tests/integration/index_method/mod.rs b/tests/integration/index_method/mod.rs index ae2f73e02..34f49df2e 100644 --- a/tests/integration/index_method/mod.rs +++ b/tests/integration/index_method/mod.rs @@ -265,7 +265,7 @@ fn test_vector_sparse_ivf_fuzz() { let (mut rng, _) = rng_from_time_or_env(); let mut operation = 0; - for _ in 0..10 { + for delta in [0.0, 0.01, 0.05, 0.1, 0.5] { let seed = rng.next_u64(); tracing::info!("======== seed: {} ========", seed); @@ -284,7 +284,7 @@ fn test_vector_sparse_ivf_fuzz() { simple_conn.wal_auto_checkpoint_disable(); index_conn.wal_auto_checkpoint_disable(); index_conn - .execute("CREATE INDEX t_idx ON t USING toy_vector_sparse_ivf (embedding)") + .execute(&format!("CREATE INDEX t_idx ON t USING toy_vector_sparse_ivf (embedding) WITH (delta = {delta})")) .unwrap(); let vector = |rng: &mut ChaCha8Rng| { @@ -335,17 +335,39 @@ fn test_vector_sparse_ivf_fuzz() { tracing::info!("({}) {}", operation, sql); let simple_rows = limbo_exec_rows(&simple_db, &simple_conn, &sql); let index_rows = limbo_exec_rows(&index_db, &index_conn, &sql); + tracing::info!("simple: {:?}, index_rows: {:?}", simple_rows, index_rows); assert!(index_rows.len() <= simple_rows.len()); for (a, b) in index_rows.iter().zip(simple_rows.iter()) { - assert_eq!(a, b); + if delta == 0.0 { + assert_eq!(a, b); + } else { + match (&a[1], &b[1]) { + (rusqlite::types::Value::Real(a), rusqlite::types::Value::Real(b)) => { + assert!( + *a >= *b || (*a - *b).abs() < 1e-5, + "a={}, b={}, delta={}", + *a, + *b, + delta + ); + assert!( + *a - delta <= *b || (*a - delta - *b).abs() < 1e-5, + "a={}, b={}, delta={}", + *a, + *b, + delta + ); + } + _ => panic!("unexpected column values"), + } + } } for row in simple_rows.iter().skip(index_rows.len()) { match row[1] { - rusqlite::types::Value::Real(r) => assert_eq!(r, 1.0), + rusqlite::types::Value::Real(r) => assert!((1.0 - r) < 1e-5), _ => panic!("unexpected simple row value"), } } - tracing::info!("simple: {:?}, index_rows: {:?}", simple_rows, index_rows); } } } From 47b4e7e376de01bda7ba94d0d24f9f39af0d4c53 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Fri, 31 Oct 2025 15:02:37 +0400 Subject: [PATCH 11/15] fix clippy --- tests/integration/index_method/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/index_method/mod.rs b/tests/integration/index_method/mod.rs index 34f49df2e..9c14d12f1 100644 --- a/tests/integration/index_method/mod.rs +++ b/tests/integration/index_method/mod.rs @@ -284,7 +284,7 @@ fn test_vector_sparse_ivf_fuzz() { simple_conn.wal_auto_checkpoint_disable(); index_conn.wal_auto_checkpoint_disable(); index_conn - .execute(&format!("CREATE INDEX t_idx ON t USING toy_vector_sparse_ivf (embedding) WITH (delta = {delta})")) + .execute(format!("CREATE INDEX t_idx ON t USING toy_vector_sparse_ivf (embedding) WITH (delta = {delta})")) .unwrap(); let vector = |rng: &mut ChaCha8Rng| { From fb63a5a3ff3d81bcfbb0caa9d50ef65c5bb21db3 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 5 Nov 2025 20:44:11 +0400 Subject: [PATCH 12/15] final adjustment to align implementation with blog post --- core/index_method/toy_vector_sparse_ivf.rs | 85 +++++++++++++++++----- 1 file changed, 67 insertions(+), 18 deletions(-) diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index 12dca96c1..a3dc20ff2 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -229,15 +229,17 @@ enum VectorSparseInvertedIndexSearchState { Init, CollectComponentsSeek { sum: f64, - positions: Option>, - components: Option>, + vector: Option>, + idx: usize, + components: Option>, limit: i64, key: Option, }, CollectComponentsRead { sum: f64, - positions: Option>, - components: Option>, + vector: Option>, + idx: usize, + components: Option>, limit: i64, }, Seek { @@ -290,9 +292,17 @@ enum VectorSparseInvertedIndexSearchState { }, } +#[derive(Debug, PartialEq)] +pub enum ScanOrder { + DatasetFrequencyAsc, + QueryWeightDesc, +} + pub struct VectorSparseInvertedIndexMethodCursor { configuration: IndexMethodConfiguration, delta: f64, + scan_portion: f64, + scan_order: ScanOrder, scratch_btree: String, scratch_cursor: Option, stats_btree: String, @@ -348,9 +358,24 @@ impl VectorSparseInvertedIndexMethodCursor { Some(&Value::Float(delta)) => delta, _ => 0.0, }; + let scan_portion = match configuration.parameters.get("scan_portion") { + Some(&Value::Float(scan_portion)) => scan_portion, + _ => 1.0, + }; + let scan_order = match configuration.parameters.get("scan_order") { + Some(Value::Text(scan_order)) if scan_order.as_str() == "dataset_frequency_asc" => { + ScanOrder::DatasetFrequencyAsc + } + Some(Value::Text(scan_order)) if scan_order.as_str() == "query_weight_desc" => { + ScanOrder::QueryWeightDesc + } + _ => ScanOrder::QueryWeightDesc, + }; Self { configuration, delta, + scan_portion, + scan_order, scratch_btree, scratch_cursor: None, stats_btree, @@ -929,7 +954,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "second value must be i64 limit parameter".to_string(), )); }; - let vector = Vector::from_slice(vector)?; + let vector = Vector::from_vec(vector.to_vec())?; if !matches!(vector.vector_type, VectorType::Float32Sparse) { return Err(LimboError::InternalError( "first value must be sparse vector".to_string(), @@ -940,7 +965,8 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { self.search_state = VectorSparseInvertedIndexSearchState::CollectComponentsSeek { sum, - positions: Some(sparse.idx.to_vec().into()), + vector: Some(vector), + idx: 0, components: Some(Vec::new()), key: None, limit, @@ -948,21 +974,39 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } VectorSparseInvertedIndexSearchState::CollectComponentsSeek { sum, - positions, + vector, + idx, components, limit, key, } => { - let p = positions.as_ref().unwrap(); + let p = &vector.as_ref().unwrap().as_f32_sparse().idx[*idx..]; if p.is_empty() && key.is_none() { let mut components = components.take().unwrap(); - // order by cnt ASC in order to check low-cardinality components first - components.sort_by_key(|c| c.cnt); + match self.scan_order { + ScanOrder::DatasetFrequencyAsc => { + // order by cnt ASC in order to check low-cardinality components first + components.sort_by_key(|(c, _)| c.cnt); + } + ScanOrder::QueryWeightDesc => { + // order by weight DESC in order to check high-impact components first + components + .sort_by_key(|(_, w)| std::cmp::Reverse(FloatOrd(*w as f64))); + } + } + let take = (components.len() as f64 * self.scan_portion).ceil() as usize; + let components = components + .into_iter() + .take(take) + .map(|(c, _)| c) + .collect::>(); tracing::debug!( - "query_start: components: {:?}, delta: {}", + "query_start: components: {:?}, delta: {}, scan_portion: {}, scan_order: {:?}", components, - self.delta + self.delta, + self.scan_portion, + self.scan_order, ); self.search_state = VectorSparseInvertedIndexSearchState::Seek { sum: *sum, @@ -977,7 +1021,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { continue; } if key.is_none() { - let position = positions.as_mut().unwrap().pop_front().unwrap(); + let position = vector.as_ref().unwrap().as_f32_sparse().idx[*idx]; *key = Some(ImmutableRecord::from_values( &[Value::Integer(position as i64)], 1, @@ -992,7 +1036,8 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { self.search_state = VectorSparseInvertedIndexSearchState::CollectComponentsRead { sum: *sum, - positions: positions.take(), + vector: vector.take(), + idx: *idx, components: components.take(), limit: *limit, }; @@ -1002,7 +1047,8 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { VectorSparseInvertedIndexSearchState::CollectComponentsSeek { sum: *sum, components: components.take(), - positions: positions.take(), + vector: vector.take(), + idx: *idx + 1, limit: *limit, key: None, }; @@ -1011,18 +1057,21 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } VectorSparseInvertedIndexSearchState::CollectComponentsRead { sum, - positions, + vector, + idx, components, limit, } => { let record = return_if_io!(stats.record()); + let v = vector.as_ref().unwrap().as_f32_sparse().values[*idx]; let component = parse_stat_row(record.as_deref())?; - components.as_mut().unwrap().push(component); + components.as_mut().unwrap().push((component, v)); self.search_state = VectorSparseInvertedIndexSearchState::CollectComponentsSeek { sum: *sum, components: components.take(), - positions: positions.take(), + vector: vector.take(), + idx: *idx + 1, limit: *limit, key: None, }; From fe974dd41484bd996fd27e63d6bdea4e2f02cd9a Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 5 Nov 2025 20:44:43 +0400 Subject: [PATCH 13/15] fix slice operation implementation --- core/vector/operations/slice.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/vector/operations/slice.rs b/core/vector/operations/slice.rs index a1f6b99d8..9b6de84a2 100644 --- a/core/vector/operations/slice.rs +++ b/core/vector/operations/slice.rs @@ -37,7 +37,7 @@ pub fn vector_slice(vector: &Vector, start: usize, end: usize) -> Result Date: Wed, 5 Nov 2025 20:47:58 +0400 Subject: [PATCH 14/15] rename scratch -> inverted index --- core/index_method/toy_vector_sparse_ivf.rs | 94 +++++++++++----------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index a3dc20ff2..03ed37e45 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -47,14 +47,14 @@ pub enum VectorSparseInvertedIndexInsertState { rowid: i64, idx: usize, }, - SeekScratch { + SeekInverted { vector: Option>, sum: f64, key: Option, rowid: i64, idx: usize, }, - InsertScratch { + InsertInverted { vector: Option>, sum: f64, key: Option, @@ -92,20 +92,20 @@ pub enum VectorSparseInvertedIndexDeleteState { rowid: i64, idx: usize, }, - SeekScratch { + SeekInverted { vector: Option>, sum: f64, key: Option, rowid: i64, idx: usize, }, - NextScratch { + NextInverted { vector: Option>, sum: f64, rowid: i64, idx: usize, }, - DeleteScratch { + DeleteInverted { vector: Option>, sum: f64, rowid: i64, @@ -196,25 +196,25 @@ struct ComponentRow { rowid: i64, } -fn parse_scratch_row(record: Option<&ImmutableRecord>) -> Result { +fn parse_inverted_index_row(record: Option<&ImmutableRecord>) -> Result { let Some(record) = record else { return Err(LimboError::Corrupt( - "scratch index corrupted: expected row".to_string(), + "inverted index corrupted: expected row".to_string(), )); }; let ValueRef::Integer(position) = record.get_value(0)? else { return Err(LimboError::Corrupt( - "scratch index corrupted: expected integer".to_string(), + "inverted index corrupted: expected integer".to_string(), )); }; let ValueRef::Float(sum) = record.get_value(1)? else { return Err(LimboError::Corrupt( - "scratch index corrupted: expected float".to_string(), + "inverted index corrupted: expected float".to_string(), )); }; let ValueRef::Integer(rowid) = record.get_value(2)? else { return Err(LimboError::Corrupt( - "scratch index corrupted: expected integer".to_string(), + "inverted index corrupted: expected integer".to_string(), )); }; Ok(ComponentRow { @@ -303,8 +303,8 @@ pub struct VectorSparseInvertedIndexMethodCursor { delta: f64, scan_portion: f64, scan_order: ScanOrder, - scratch_btree: String, - scratch_cursor: Option, + inverted_index_btree: String, + inverted_index_cursor: Option, stats_btree: String, stats_cursor: Option, main_btree: Option, @@ -352,7 +352,7 @@ impl IndexMethodAttachment for VectorSparseInvertedIndexMethodAttachment { impl VectorSparseInvertedIndexMethodCursor { pub fn new(configuration: IndexMethodConfiguration) -> Self { - let scratch_btree = format!("{}_scratch", configuration.index_name); + let inverted_index_btree = format!("{}_inverted_index", configuration.index_name); let stats_btree = format!("{}_stats", configuration.index_name); let delta = match configuration.parameters.get("delta") { Some(&Value::Float(delta)) => delta, @@ -376,8 +376,8 @@ impl VectorSparseInvertedIndexMethodCursor { delta, scan_portion, scan_order, - scratch_btree, - scratch_cursor: None, + inverted_index_btree, + inverted_index_cursor: None, stats_btree, stats_cursor: None, main_btree: None, @@ -402,9 +402,9 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { let columns = &self.configuration.columns; let columns = columns.iter().map(|x| x.name.as_str()).collect::>(); - let scratch_index_create = format!( + let inverted_index_create = format!( "CREATE INDEX {} ON {} USING {} ({})", - self.scratch_btree, + self.inverted_index_btree, self.configuration.table_name, BACKING_BTREE_INDEX_METHOD_NAME, columns.join(", ") @@ -416,7 +416,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { BACKING_BTREE_INDEX_METHOD_NAME, columns.join(", ") ); - for sql in [scratch_index_create, stats_index_create] { + for sql in [inverted_index_create, stats_index_create] { let mut stmt = connection.prepare(&sql)?; connection.start_nested(); let result = stmt.run_ignore_rows(); @@ -428,9 +428,9 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn destroy(&mut self, connection: &Arc) -> Result> { - let scratch_index_drop = format!("DROP INDEX {}", self.scratch_btree); + let inverted_index_drop = format!("DROP INDEX {}", self.inverted_index_btree); let stats_index_drop = format!("DROP INDEX {}", self.stats_btree); - for sql in [scratch_index_drop, stats_index_drop] { + for sql in [inverted_index_drop, stats_index_drop] { let mut stmt = connection.prepare(&sql)?; connection.start_nested(); let result = stmt.run_ignore_rows(); @@ -442,10 +442,10 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn open_read(&mut self, connection: &Arc) -> Result> { - self.scratch_cursor = Some(open_index_cursor( + self.inverted_index_cursor = Some(open_index_cursor( connection, &self.configuration.table_name, - &self.scratch_btree, + &self.inverted_index_btree, // component, length, rowid vec![key_info(), key_info(), key_info()], )?); @@ -464,10 +464,10 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn open_write(&mut self, connection: &Arc) -> Result> { - self.scratch_cursor = Some(open_index_cursor( + self.inverted_index_cursor = Some(open_index_cursor( connection, &self.configuration.table_name, - &self.scratch_btree, + &self.inverted_index_btree, // component, length, rowid vec![key_info(), key_info(), key_info()], )?); @@ -482,9 +482,9 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn insert(&mut self, values: &[Register]) -> Result> { - let Some(scratch_cursor) = &mut self.scratch_cursor else { + let Some(inverted_cursor) = &mut self.inverted_index_cursor else { return Err(LimboError::InternalError( - "scratch cursor must be opened".to_string(), + "inverted cursor must be opened".to_string(), )); }; let Some(stats_cursor) = &mut self.stats_cursor else { @@ -546,7 +546,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { *sum, *rowid, ); - self.insert_state = VectorSparseInvertedIndexInsertState::SeekScratch { + self.insert_state = VectorSparseInvertedIndexInsertState::SeekInverted { vector: vector.take(), sum: *sum, idx: *idx, @@ -554,7 +554,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { key: Some(key), }; } - VectorSparseInvertedIndexInsertState::SeekScratch { + VectorSparseInvertedIndexInsertState::SeekInverted { vector, sum, rowid, @@ -562,11 +562,11 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { key, } => { let k = key.as_ref().unwrap(); - let result = return_if_io!( - scratch_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) - ); + let result = + return_if_io!(inverted_cursor + .seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true })); tracing::debug!("insert_state: seek: result={:?}", result); - self.insert_state = VectorSparseInvertedIndexInsertState::InsertScratch { + self.insert_state = VectorSparseInvertedIndexInsertState::InsertInverted { vector: vector.take(), sum: *sum, idx: *idx, @@ -574,7 +574,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { key: key.take(), }; } - VectorSparseInvertedIndexInsertState::InsertScratch { + VectorSparseInvertedIndexInsertState::InsertInverted { vector, sum, rowid, @@ -582,7 +582,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { key, } => { let k = key.as_ref().unwrap(); - return_if_io!(scratch_cursor.insert(&BTreeKey::IndexKey(k))); + return_if_io!(inverted_cursor.insert(&BTreeKey::IndexKey(k))); let v = vector.as_ref().unwrap(); let position = v.as_f32_sparse().idx[*idx]; @@ -702,7 +702,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn delete(&mut self, values: &[Register]) -> Result> { - let Some(cursor) = &mut self.scratch_cursor else { + let Some(cursor) = &mut self.inverted_index_cursor else { return Err(LimboError::InternalError( "cursor must be opened".to_string(), )); @@ -760,7 +760,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { ], 3, ); - self.delete_state = VectorSparseInvertedIndexDeleteState::SeekScratch { + self.delete_state = VectorSparseInvertedIndexDeleteState::SeekInverted { vector: vector.take(), idx: *idx, sum: *sum, @@ -768,7 +768,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { key: Some(key), }; } - VectorSparseInvertedIndexDeleteState::SeekScratch { + VectorSparseInvertedIndexDeleteState::SeekInverted { vector, sum, rowid, @@ -788,7 +788,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { match result { SeekResult::Found => { self.delete_state = - VectorSparseInvertedIndexDeleteState::DeleteScratch { + VectorSparseInvertedIndexDeleteState::DeleteInverted { vector: vector.take(), sum: *sum, idx: *idx, @@ -796,7 +796,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { }; } SeekResult::TryAdvance => { - self.delete_state = VectorSparseInvertedIndexDeleteState::NextScratch { + self.delete_state = VectorSparseInvertedIndexDeleteState::NextInverted { vector: vector.take(), sum: *sum, idx: *idx, @@ -808,7 +808,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } } } - VectorSparseInvertedIndexDeleteState::NextScratch { + VectorSparseInvertedIndexDeleteState::NextInverted { vector, sum, rowid, @@ -817,14 +817,14 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { if !return_if_io!(cursor.next()) { return Err(LimboError::Corrupt("inverted index corrupted".to_string())); } - self.delete_state = VectorSparseInvertedIndexDeleteState::DeleteScratch { + self.delete_state = VectorSparseInvertedIndexDeleteState::DeleteInverted { vector: vector.take(), sum: *sum, idx: *idx, rowid: *rowid, }; } - VectorSparseInvertedIndexDeleteState::DeleteScratch { + VectorSparseInvertedIndexDeleteState::DeleteInverted { vector, sum, rowid, @@ -925,7 +925,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn query_start(&mut self, values: &[Register]) -> Result> { - let Some(scratch) = &mut self.scratch_cursor else { + let Some(inverted) = &mut self.inverted_index_cursor else { return Err(LimboError::InternalError( "cursor must be opened".to_string(), )); @@ -1139,7 +1139,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } let k = key.as_ref().unwrap(); let result = return_if_io!( - scratch.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) + inverted.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) ); match result { SeekResult::Found => { @@ -1178,8 +1178,8 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { component, current, } => { - let record = return_if_io!(scratch.record()); - let row = parse_scratch_row(record.as_deref())?; + let record = return_if_io!(inverted.record()); + let row = parse_inverted_index_row(record.as_deref())?; if row.position != *component || (sum_threshold.is_some() && row.sum > sum_threshold.unwrap()) { @@ -1222,7 +1222,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { component, current, } => { - let result = return_if_io!(scratch.next()); + let result = return_if_io!(inverted.next()); if !result { let mut current = current.take().unwrap(); current.sort(); From 68a4c904466e06d6b7e3068c03e52400c97bca97 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 5 Nov 2025 20:53:34 +0400 Subject: [PATCH 15/15] fix fmt and test --- core/index_method/toy_vector_sparse_ivf.rs | 13 +++++++------ tests/integration/index_method/mod.rs | 5 ++++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index 03ed37e45..d4b85c973 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -796,12 +796,13 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { }; } SeekResult::TryAdvance => { - self.delete_state = VectorSparseInvertedIndexDeleteState::NextInverted { - vector: vector.take(), - sum: *sum, - idx: *idx, - rowid: *rowid, - }; + self.delete_state = + VectorSparseInvertedIndexDeleteState::NextInverted { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; } SeekResult::NotFound => { return Err(LimboError::Corrupt("inverted index corrupted".to_string())) diff --git a/tests/integration/index_method/mod.rs b/tests/integration/index_method/mod.rs index 9c14d12f1..8285a258f 100644 --- a/tests/integration/index_method/mod.rs +++ b/tests/integration/index_method/mod.rs @@ -75,7 +75,10 @@ fn test_vector_sparse_ivf_create_destroy() { run(&tmp_db, || cursor.create(&conn)).unwrap(); } conn.wal_insert_end(true).unwrap(); - assert_eq!(schema_rows(), vec!["t", "t_idx_scratch", "t_idx_stats"]); + assert_eq!( + schema_rows(), + vec!["t", "t_idx_inverted_index", "t_idx_stats"] + ); conn.wal_insert_begin().unwrap(); {