diff --git a/core/index_method/toy_vector_sparse_ivf.rs b/core/index_method/toy_vector_sparse_ivf.rs index 51b53dc03..d4b85c973 100644 --- a/core/index_method/toy_vector_sparse_ivf.rs +++ b/core/index_method/toy_vector_sparse_ivf.rs @@ -20,7 +20,7 @@ use crate::{ operations, vector_types::{Vector, VectorType}, }, - Connection, LimboError, Result, Statement, Value, ValueRef, + Connection, LimboError, Result, Value, ValueRef, }; /// Simple inverted index for sparse vectors @@ -38,27 +38,45 @@ pub struct VectorSparseInvertedIndexMethodAttachment { patterns: Vec, } -pub enum VectorSparseInvertedIndexCreateState { - Init, - Run { stmt: Box }, -} - #[derive(Debug)] pub enum VectorSparseInvertedIndexInsertState { Init, Prepare { - positions: Option>, + vector: Option>, + sum: f64, rowid: i64, idx: usize, }, - Seek { - positions: Option>, + SeekInverted { + vector: Option>, + sum: f64, key: Option, rowid: i64, idx: usize, }, - Insert { - positions: Option>, + InsertInverted { + vector: Option>, + sum: f64, + key: Option, + rowid: i64, + idx: usize, + }, + SeekStats { + vector: Option>, + sum: f64, + key: Option, + rowid: i64, + idx: usize, + }, + ReadStats { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + UpdateStats { + vector: Option>, + sum: f64, key: Option, rowid: i64, idx: usize, @@ -69,18 +87,47 @@ pub enum VectorSparseInvertedIndexInsertState { pub enum VectorSparseInvertedIndexDeleteState { Init, Prepare { - positions: Option>, + vector: Option>, + sum: f64, rowid: i64, idx: usize, }, - Seek { - positions: Option>, + SeekInverted { + vector: Option>, + sum: f64, key: Option, rowid: i64, idx: usize, }, - Insert { - positions: Option>, + NextInverted { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + DeleteInverted { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + SeekStats { + vector: Option>, + sum: f64, + key: Option, + rowid: i64, + idx: usize, + }, + ReadStats { + vector: Option>, + sum: f64, + rowid: i64, + idx: usize, + }, + UpdateStats { + vector: Option>, + sum: f64, + key: Option, rowid: i64, idx: usize, }, @@ -101,54 +148,166 @@ impl Ord for FloatOrd { } } +#[derive(Debug)] +struct ComponentStat { + position: u32, + cnt: i64, + min: f64, + max: f64, +} + +fn parse_stat_row(record: Option<&ImmutableRecord>) -> Result { + let Some(record) = record else { + return Err(LimboError::Corrupt( + "stats index corrupted: expected row".to_string(), + )); + }; + let ValueRef::Integer(position) = record.get_value(0)? else { + return Err(LimboError::Corrupt( + "stats index corrupted: expected integer".to_string(), + )); + }; + let ValueRef::Integer(cnt) = record.get_value(1)? else { + return Err(LimboError::Corrupt( + "stats index corrupted: expected integer".to_string(), + )); + }; + let ValueRef::Float(min) = record.get_value(2)? else { + return Err(LimboError::Corrupt( + "stats index corrupted: expected float".to_string(), + )); + }; + let ValueRef::Float(max) = record.get_value(3)? else { + return Err(LimboError::Corrupt( + "stats index corrupted: expected float".to_string(), + )); + }; + Ok(ComponentStat { + position: position as u32, + cnt, + min, + max, + }) +} +#[derive(Debug)] +struct ComponentRow { + position: u32, + sum: f64, + rowid: i64, +} + +fn parse_inverted_index_row(record: Option<&ImmutableRecord>) -> Result { + let Some(record) = record else { + return Err(LimboError::Corrupt( + "inverted index corrupted: expected row".to_string(), + )); + }; + let ValueRef::Integer(position) = record.get_value(0)? else { + return Err(LimboError::Corrupt( + "inverted index corrupted: expected integer".to_string(), + )); + }; + let ValueRef::Float(sum) = record.get_value(1)? else { + return Err(LimboError::Corrupt( + "inverted index corrupted: expected float".to_string(), + )); + }; + let ValueRef::Integer(rowid) = record.get_value(2)? else { + return Err(LimboError::Corrupt( + "inverted index corrupted: expected integer".to_string(), + )); + }; + Ok(ComponentRow { + position: position as u32, + sum, + rowid, + }) +} + #[derive(Debug)] enum VectorSparseInvertedIndexSearchState { Init, - Prepare { - collected: Option>, - positions: Option>, + CollectComponentsSeek { + sum: f64, + vector: Option>, idx: usize, + components: Option>, + limit: i64, + key: Option, + }, + CollectComponentsRead { + sum: f64, + vector: Option>, + idx: usize, + components: Option>, limit: i64, }, Seek { + sum: f64, + components: Option>, collected: Option>, - positions: Option>, - key: Option, - idx: usize, + distances: Option>, limit: i64, + key: Option, + sum_threshold: Option, + component: Option, }, Read { + sum: f64, + components: Option>, collected: Option>, - positions: Option>, - key: Option, - idx: usize, + distances: Option>, limit: i64, + sum_threshold: Option, + component: u32, + current: Option>, }, Next { + sum: f64, + components: Option>, collected: Option>, - positions: Option>, - key: Option, - idx: usize, + distances: Option>, limit: i64, + sum_threshold: Option, + component: u32, + current: Option>, }, EvaluateSeek { - rowids: Option>, + sum: f64, + components: Option>, + collected: Option>, distances: Option>, limit: i64, + current: Option>, + rowid: Option, }, EvaluateRead { - rowids: Option>, + sum: f64, + components: Option>, + collected: Option>, distances: Option>, limit: i64, + current: Option>, + rowid: i64, }, } +#[derive(Debug, PartialEq)] +pub enum ScanOrder { + DatasetFrequencyAsc, + QueryWeightDesc, +} + pub struct VectorSparseInvertedIndexMethodCursor { configuration: IndexMethodConfiguration, - scratch_btree: String, - scratch_cursor: Option, + delta: f64, + scan_portion: f64, + scan_order: ScanOrder, + inverted_index_btree: String, + inverted_index_cursor: Option, + stats_btree: String, + stats_cursor: Option, main_btree: Option, - create_state: VectorSparseInvertedIndexCreateState, insert_state: VectorSparseInvertedIndexInsertState, delete_state: VectorSparseInvertedIndexDeleteState, search_state: VectorSparseInvertedIndexSearchState, @@ -193,14 +352,36 @@ impl IndexMethodAttachment for VectorSparseInvertedIndexMethodAttachment { impl VectorSparseInvertedIndexMethodCursor { pub fn new(configuration: IndexMethodConfiguration) -> Self { - let scratch_btree = format!("{}_scratch", configuration.index_name); + let inverted_index_btree = format!("{}_inverted_index", configuration.index_name); + let stats_btree = format!("{}_stats", configuration.index_name); + let delta = match configuration.parameters.get("delta") { + Some(&Value::Float(delta)) => delta, + _ => 0.0, + }; + let scan_portion = match configuration.parameters.get("scan_portion") { + Some(&Value::Float(scan_portion)) => scan_portion, + _ => 1.0, + }; + let scan_order = match configuration.parameters.get("scan_order") { + Some(Value::Text(scan_order)) if scan_order.as_str() == "dataset_frequency_asc" => { + ScanOrder::DatasetFrequencyAsc + } + Some(Value::Text(scan_order)) if scan_order.as_str() == "query_weight_desc" => { + ScanOrder::QueryWeightDesc + } + _ => ScanOrder::QueryWeightDesc, + }; Self { configuration, - scratch_btree, - scratch_cursor: None, + delta, + scan_portion, + scan_order, + inverted_index_btree, + inverted_index_cursor: None, + stats_btree, + stats_cursor: None, main_btree: None, search_result: VecDeque::new(), - create_state: VectorSparseInvertedIndexCreateState::Init, insert_state: VectorSparseInvertedIndexInsertState::Init, delete_state: VectorSparseInvertedIndexDeleteState::Init, search_state: VectorSparseInvertedIndexSearchState::Init, @@ -208,57 +389,72 @@ impl VectorSparseInvertedIndexMethodCursor { } } +fn key_info() -> KeyInfo { + KeyInfo { + collation: CollationSeq::Binary, + sort_order: SortOrder::Asc, + } +} + impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { fn create(&mut self, connection: &Arc) -> Result> { - loop { - match &mut self.create_state { - VectorSparseInvertedIndexCreateState::Init => { - let columns = &self.configuration.columns; - let columns = columns.iter().map(|x| x.name.as_str()).collect::>(); - let sql = format!( - "CREATE INDEX {} ON {} USING {} ({})", - self.scratch_btree, - self.configuration.table_name, - BACKING_BTREE_INDEX_METHOD_NAME, - columns.join(", ") - ); - let stmt = connection.prepare(&sql)?; - connection.start_nested(); - self.create_state = VectorSparseInvertedIndexCreateState::Run { - stmt: Box::new(stmt), - }; - } - VectorSparseInvertedIndexCreateState::Run { stmt } => { - // we need to properly track subprograms and propagate result to the root program to make this execution async - let result = stmt.run_ignore_rows(); - connection.end_nested(); - result?; - return Ok(IOResult::Done(())); - } - } + // we need to properly track subprograms and propagate result to the root program to make this execution async + + let columns = &self.configuration.columns; + let columns = columns.iter().map(|x| x.name.as_str()).collect::>(); + let inverted_index_create = format!( + "CREATE INDEX {} ON {} USING {} ({})", + self.inverted_index_btree, + self.configuration.table_name, + BACKING_BTREE_INDEX_METHOD_NAME, + columns.join(", ") + ); + let stats_index_create = format!( + "CREATE INDEX {} ON {} USING {} ({})", + self.stats_btree, + self.configuration.table_name, + BACKING_BTREE_INDEX_METHOD_NAME, + columns.join(", ") + ); + for sql in [inverted_index_create, stats_index_create] { + let mut stmt = connection.prepare(&sql)?; + connection.start_nested(); + let result = stmt.run_ignore_rows(); + connection.end_nested(); + result?; } + + Ok(IOResult::Done(())) } fn destroy(&mut self, connection: &Arc) -> Result> { - let sql = format!("DROP INDEX {}", self.scratch_btree); - let mut stmt = connection.prepare(&sql)?; - connection.start_nested(); - let result = stmt.run_ignore_rows(); - connection.end_nested(); - result?; + let inverted_index_drop = format!("DROP INDEX {}", self.inverted_index_btree); + let stats_index_drop = format!("DROP INDEX {}", self.stats_btree); + for sql in [inverted_index_drop, stats_index_drop] { + let mut stmt = connection.prepare(&sql)?; + connection.start_nested(); + let result = stmt.run_ignore_rows(); + connection.end_nested(); + result?; + } + Ok(IOResult::Done(())) } fn open_read(&mut self, connection: &Arc) -> Result> { - let key_info = KeyInfo { - collation: CollationSeq::Binary, - sort_order: SortOrder::Asc, - }; - self.scratch_cursor = Some(open_index_cursor( + self.inverted_index_cursor = Some(open_index_cursor( connection, &self.configuration.table_name, - &self.scratch_btree, - vec![key_info, key_info], + &self.inverted_index_btree, + // component, length, rowid + vec![key_info(), key_info(), key_info()], + )?); + self.stats_cursor = Some(open_index_cursor( + connection, + &self.configuration.table_name, + &self.stats_btree, + // component + vec![key_info()], )?); self.main_btree = Some(open_table_cursor( connection, @@ -268,23 +464,32 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn open_write(&mut self, connection: &Arc) -> Result> { - let key_info = KeyInfo { - collation: CollationSeq::Binary, - sort_order: SortOrder::Asc, - }; - self.scratch_cursor = Some(open_index_cursor( + self.inverted_index_cursor = Some(open_index_cursor( connection, &self.configuration.table_name, - &self.scratch_btree, - vec![key_info, key_info], + &self.inverted_index_btree, + // component, length, rowid + vec![key_info(), key_info(), key_info()], + )?); + self.stats_cursor = Some(open_index_cursor( + connection, + &self.configuration.table_name, + &self.stats_btree, + // component + vec![key_info()], )?); Ok(IOResult::Done(())) } fn insert(&mut self, values: &[Register]) -> Result> { - let Some(cursor) = &mut self.scratch_cursor else { + let Some(inverted_cursor) = &mut self.inverted_index_cursor else { return Err(LimboError::InternalError( - "cursor must be opened".to_string(), + "inverted cursor must be opened".to_string(), + )); + }; + let Some(stats_cursor) = &mut self.stats_cursor else { + return Err(LimboError::InternalError( + "stats cursor must be opened".to_string(), )); }; loop { @@ -296,7 +501,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "first value must be sparse vector".to_string(), )); }; - let vector = Vector::from_slice(vector)?; + let vector = Vector::from_vec(vector.to_vec())?; if !matches!(vector.vector_type, VectorType::Float32Sparse) { return Err(LimboError::InternalError( "first value must be sparse vector".to_string(), @@ -307,64 +512,189 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "second value must be i64 rowid".to_string(), )); }; - let sparse = vector.as_f32_sparse(); + let sum = vector.as_f32_sparse().values.iter().sum::() as f64; self.insert_state = VectorSparseInvertedIndexInsertState::Prepare { - positions: Some(sparse.idx.to_vec()), + vector: Some(vector), + sum, rowid, idx: 0, } } VectorSparseInvertedIndexInsertState::Prepare { - positions, + vector, + sum, rowid, idx, } => { - let p = positions.as_ref().unwrap(); - if *idx == p.len() { + let v = vector.as_ref().unwrap(); + if *idx == v.as_f32_sparse().idx.len() { self.insert_state = VectorSparseInvertedIndexInsertState::Init; return Ok(IOResult::Done(())); } - let position = p[*idx]; + let position = v.as_f32_sparse().idx[*idx]; let key = ImmutableRecord::from_values( - &[Value::Integer(position as i64), Value::Integer(*rowid)], - 2, + &[ + Value::Integer(position as i64), + Value::Float(*sum), + Value::Integer(*rowid), + ], + 3, ); - self.insert_state = VectorSparseInvertedIndexInsertState::Seek { + tracing::debug!( + "insert_state: seek: component={}, sum={}, rowid={}", + vector.as_ref().unwrap().as_f32_sparse().idx[*idx], + *sum, + *rowid, + ); + self.insert_state = VectorSparseInvertedIndexInsertState::SeekInverted { + vector: vector.take(), + sum: *sum, idx: *idx, rowid: *rowid, - positions: positions.take(), key: Some(key), }; } - VectorSparseInvertedIndexInsertState::Seek { - positions, + VectorSparseInvertedIndexInsertState::SeekInverted { + vector, + sum, rowid, idx, key, } => { let k = key.as_ref().unwrap(); - let _ = return_if_io!( - cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) - ); - self.insert_state = VectorSparseInvertedIndexInsertState::Insert { + let result = + return_if_io!(inverted_cursor + .seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true })); + tracing::debug!("insert_state: seek: result={:?}", result); + self.insert_state = VectorSparseInvertedIndexInsertState::InsertInverted { + vector: vector.take(), + sum: *sum, idx: *idx, rowid: *rowid, - positions: positions.take(), key: key.take(), }; } - VectorSparseInvertedIndexInsertState::Insert { - positions, + VectorSparseInvertedIndexInsertState::InsertInverted { + vector, + sum, rowid, idx, key, } => { let k = key.as_ref().unwrap(); - return_if_io!(cursor.insert(&BTreeKey::IndexKey(k))); + return_if_io!(inverted_cursor.insert(&BTreeKey::IndexKey(k))); + + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1); + self.insert_state = VectorSparseInvertedIndexInsertState::SeekStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexInsertState::SeekStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + let result = return_if_io!( + stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) + ); + match result { + SeekResult::Found => { + self.insert_state = VectorSparseInvertedIndexInsertState::ReadStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::NotFound | SeekResult::TryAdvance => { + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let value = v.as_f32_sparse().values[*idx] as f64; + tracing::debug!( + "update stats(insert): {} (cnt={}, min={}, max={})", + position, + 1, + value, + value, + ); + let key = ImmutableRecord::from_values( + &[ + Value::Integer(position as i64), + Value::Integer(1), + Value::Float(value), + Value::Float(value), + ], + 4, + ); + self.insert_state = VectorSparseInvertedIndexInsertState::UpdateStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + } + } + VectorSparseInvertedIndexInsertState::ReadStats { + vector, + sum, + rowid, + idx, + } => { + let record = return_if_io!(stats_cursor.record()); + let component = parse_stat_row(record.as_deref())?; + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let value = v.as_f32_sparse().values[*idx] as f64; + tracing::debug!( + "update stats(insert): {} (cnt={}, min={}, max={})", + position, + component.cnt + 1, + value.min(component.min), + value.max(component.max), + ); + let key = ImmutableRecord::from_values( + &[ + Value::Integer(position as i64), + Value::Integer(component.cnt + 1), + Value::Float(value.min(component.min)), + Value::Float(value.max(component.max)), + ], + 4, + ); + self.insert_state = VectorSparseInvertedIndexInsertState::UpdateStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexInsertState::UpdateStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + return_if_io!(stats_cursor.insert(&BTreeKey::IndexKey(k))); + self.insert_state = VectorSparseInvertedIndexInsertState::Prepare { + vector: vector.take(), + sum: *sum, idx: *idx + 1, rowid: *rowid, - positions: positions.take(), }; } } @@ -372,11 +702,16 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn delete(&mut self, values: &[Register]) -> Result> { - let Some(cursor) = &mut self.scratch_cursor else { + let Some(cursor) = &mut self.inverted_index_cursor else { return Err(LimboError::InternalError( "cursor must be opened".to_string(), )); }; + let Some(stats_cursor) = &mut self.stats_cursor else { + return Err(LimboError::InternalError( + "stats cursor must be opened".to_string(), + )); + }; loop { tracing::debug!("delete_state: {:?}", self.delete_state); match &mut self.delete_state { @@ -386,7 +721,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "first value must be sparse vector".to_string(), )); }; - let vector = Vector::from_slice(vector)?; + let vector = Vector::from_vec(vector.to_vec())?; if !matches!(vector.vector_type, VectorType::Float32Sparse) { return Err(LimboError::InternalError( "first value must be sparse vector".to_string(), @@ -397,64 +732,193 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "second value must be i64 rowid".to_string(), )); }; - let sparse = vector.as_f32_sparse(); + let sum = vector.as_f32_sparse().values.iter().sum::() as f64; self.delete_state = VectorSparseInvertedIndexDeleteState::Prepare { - positions: Some(sparse.idx.to_vec()), + vector: Some(vector), + sum, rowid, idx: 0, } } VectorSparseInvertedIndexDeleteState::Prepare { - positions, + vector, + sum, rowid, idx, } => { - let p = positions.as_ref().unwrap(); - if *idx == p.len() { + let v = vector.as_ref().unwrap(); + if *idx == v.as_f32_sparse().idx.len() { self.delete_state = VectorSparseInvertedIndexDeleteState::Init; return Ok(IOResult::Done(())); } - let position = p[*idx]; + let position = v.as_f32_sparse().idx[*idx]; let key = ImmutableRecord::from_values( - &[Value::Integer(position as i64), Value::Integer(*rowid)], - 2, + &[ + Value::Integer(position as i64), + Value::Float(*sum), + Value::Integer(*rowid), + ], + 3, ); - self.delete_state = VectorSparseInvertedIndexDeleteState::Seek { + self.delete_state = VectorSparseInvertedIndexDeleteState::SeekInverted { + vector: vector.take(), idx: *idx, + sum: *sum, rowid: *rowid, - positions: positions.take(), key: Some(key), }; } - VectorSparseInvertedIndexDeleteState::Seek { - positions, + VectorSparseInvertedIndexDeleteState::SeekInverted { + vector, + sum, rowid, idx, key, } => { + tracing::debug!( + "delete_state: seek: component={}, sum={}, rowid={}", + vector.as_ref().unwrap().as_f32_sparse().idx[*idx], + *sum, + *rowid, + ); let k = key.as_ref().unwrap(); let result = return_if_io!( cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) ); - if !matches!(result, SeekResult::Found) { + match result { + SeekResult::Found => { + self.delete_state = + VectorSparseInvertedIndexDeleteState::DeleteInverted { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::TryAdvance => { + self.delete_state = + VectorSparseInvertedIndexDeleteState::NextInverted { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::NotFound => { + return Err(LimboError::Corrupt("inverted index corrupted".to_string())) + } + } + } + VectorSparseInvertedIndexDeleteState::NextInverted { + vector, + sum, + rowid, + idx, + } => { + if !return_if_io!(cursor.next()) { return Err(LimboError::Corrupt("inverted index corrupted".to_string())); } - self.delete_state = VectorSparseInvertedIndexDeleteState::Insert { + self.delete_state = VectorSparseInvertedIndexDeleteState::DeleteInverted { + vector: vector.take(), + sum: *sum, idx: *idx, rowid: *rowid, - positions: positions.take(), }; } - VectorSparseInvertedIndexDeleteState::Insert { - positions, + VectorSparseInvertedIndexDeleteState::DeleteInverted { + vector, + sum, rowid, idx, } => { return_if_io!(cursor.delete()); + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1); + self.delete_state = VectorSparseInvertedIndexDeleteState::SeekStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexDeleteState::SeekStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + let result = return_if_io!( + stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) + ); + match result { + SeekResult::Found => { + self.delete_state = VectorSparseInvertedIndexDeleteState::ReadStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + }; + } + SeekResult::NotFound | SeekResult::TryAdvance => { + return Err(LimboError::Corrupt( + "stats index corrupted: can't find component row".to_string(), + )) + } + } + } + VectorSparseInvertedIndexDeleteState::ReadStats { + vector, + sum, + rowid, + idx, + } => { + let record = return_if_io!(stats_cursor.record()); + let component = parse_stat_row(record.as_deref())?; + let v = vector.as_ref().unwrap(); + let position = v.as_f32_sparse().idx[*idx]; + tracing::debug!( + "update stats(delete): {} (cnt={}, min={}, max={})", + position, + component.cnt - 1, + component.min, + component.max, + ); + let key = ImmutableRecord::from_values( + &[ + Value::Integer(position as i64), + Value::Integer(component.cnt - 1), + Value::Float(component.min), + Value::Float(component.max), + ], + 4, + ); + self.delete_state = VectorSparseInvertedIndexDeleteState::UpdateStats { + vector: vector.take(), + sum: *sum, + idx: *idx, + rowid: *rowid, + key: Some(key), + }; + } + VectorSparseInvertedIndexDeleteState::UpdateStats { + vector, + sum, + key, + rowid, + idx, + } => { + let k = key.as_ref().unwrap(); + return_if_io!(stats_cursor.insert(&BTreeKey::IndexKey(k))); + self.delete_state = VectorSparseInvertedIndexDeleteState::Prepare { + vector: vector.take(), + sum: *sum, idx: *idx + 1, rowid: *rowid, - positions: positions.take(), }; } } @@ -462,7 +926,12 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { } fn query_start(&mut self, values: &[Register]) -> Result> { - let Some(scratch) = &mut self.scratch_cursor else { + let Some(inverted) = &mut self.inverted_index_cursor else { + return Err(LimboError::InternalError( + "cursor must be opened".to_string(), + )); + }; + let Some(stats) = &mut self.stats_cursor else { return Err(LimboError::InternalError( "cursor must be opened".to_string(), )); @@ -486,161 +955,331 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { "second value must be i64 limit parameter".to_string(), )); }; - let vector = Vector::from_slice(vector)?; + let vector = Vector::from_vec(vector.to_vec())?; if !matches!(vector.vector_type, VectorType::Float32Sparse) { return Err(LimboError::InternalError( "first value must be sparse vector".to_string(), )); } let sparse = vector.as_f32_sparse(); - self.search_state = VectorSparseInvertedIndexSearchState::Prepare { - collected: Some(HashSet::new()), - positions: Some(sparse.idx.to_vec()), - idx: 0, - limit, - }; + let sum = sparse.values.iter().sum::() as f64; + self.search_state = + VectorSparseInvertedIndexSearchState::CollectComponentsSeek { + sum, + vector: Some(vector), + idx: 0, + components: Some(Vec::new()), + key: None, + limit, + }; } - VectorSparseInvertedIndexSearchState::Prepare { - collected, - positions, + VectorSparseInvertedIndexSearchState::CollectComponentsSeek { + sum, + vector, idx, + components, limit, + key, } => { - let p = positions.as_ref().unwrap(); - if *idx == p.len() { - let mut rowids = collected - .take() - .unwrap() - .iter() - .cloned() + let p = &vector.as_ref().unwrap().as_f32_sparse().idx[*idx..]; + if p.is_empty() && key.is_none() { + let mut components = components.take().unwrap(); + match self.scan_order { + ScanOrder::DatasetFrequencyAsc => { + // order by cnt ASC in order to check low-cardinality components first + components.sort_by_key(|(c, _)| c.cnt); + } + ScanOrder::QueryWeightDesc => { + // order by weight DESC in order to check high-impact components first + components + .sort_by_key(|(_, w)| std::cmp::Reverse(FloatOrd(*w as f64))); + } + } + let take = (components.len() as f64 * self.scan_portion).ceil() as usize; + let components = components + .into_iter() + .take(take) + .map(|(c, _)| c) .collect::>(); - rowids.sort(); - self.search_state = VectorSparseInvertedIndexSearchState::EvaluateSeek { - rowids: Some(rowids), + + tracing::debug!( + "query_start: components: {:?}, delta: {}, scan_portion: {}, scan_order: {:?}", + components, + self.delta, + self.scan_portion, + self.scan_order, + ); + self.search_state = VectorSparseInvertedIndexSearchState::Seek { + sum: *sum, + components: Some(components.into()), + collected: Some(HashSet::new()), distances: Some(BTreeSet::new()), limit: *limit, + key: None, + component: None, + sum_threshold: None, }; continue; } - let position = p[*idx]; - let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1); - self.search_state = VectorSparseInvertedIndexSearchState::Seek { - collected: collected.take(), - positions: positions.take(), - key: Some(key), - idx: *idx, - limit: *limit, - }; - } - VectorSparseInvertedIndexSearchState::Seek { - collected, - positions, - key, - idx, - limit, - } => { + if key.is_none() { + let position = vector.as_ref().unwrap().as_f32_sparse().idx[*idx]; + *key = Some(ImmutableRecord::from_values( + &[Value::Integer(position as i64)], + 1, + )); + } let k = key.as_ref().unwrap(); let result = return_if_io!( - scratch.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) + stats.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true }) + ); + match result { + SeekResult::Found => { + self.search_state = + VectorSparseInvertedIndexSearchState::CollectComponentsRead { + sum: *sum, + vector: vector.take(), + idx: *idx, + components: components.take(), + limit: *limit, + }; + } + SeekResult::NotFound | SeekResult::TryAdvance => { + self.search_state = + VectorSparseInvertedIndexSearchState::CollectComponentsSeek { + sum: *sum, + components: components.take(), + vector: vector.take(), + idx: *idx + 1, + limit: *limit, + key: None, + }; + } + } + } + VectorSparseInvertedIndexSearchState::CollectComponentsRead { + sum, + vector, + idx, + components, + limit, + } => { + let record = return_if_io!(stats.record()); + let v = vector.as_ref().unwrap().as_f32_sparse().values[*idx]; + let component = parse_stat_row(record.as_deref())?; + components.as_mut().unwrap().push((component, v)); + self.search_state = + VectorSparseInvertedIndexSearchState::CollectComponentsSeek { + sum: *sum, + components: components.take(), + vector: vector.take(), + idx: *idx + 1, + limit: *limit, + key: None, + }; + } + VectorSparseInvertedIndexSearchState::Seek { + sum, + components, + collected, + distances, + limit, + key, + component, + sum_threshold, + } => { + let c = components.as_ref().unwrap(); + if c.is_empty() && key.is_none() { + let distances = distances.take().unwrap(); + self.search_result = distances.iter().map(|(d, i)| (*i, d.0)).collect(); + return Ok(IOResult::Done(!self.search_result.is_empty())); + } + if key.is_none() { + // we estimate jaccard distance with the following approach: + // J = min(L, M1 + M2 + ... + Mr) / (Q + N - min(L, M1 + M2 + ... + Mr)) + // so we want J > best + delta; define M1 + M2 + ... + Mr = M + // J = min(L, M) / (Q + L - min(L, M)) > best + delta + // we need to consider two cases: + // 1. L < M: J = L / (Q + L - L) > best + delta => L > (best + delta) * Q + // 2. L > M: J = M / (Q + L - M) > best + delta => M > (best + delta) * (Q + L - M) => L < M / (best + delta) - (Q - M) + // so we have two intervals: [(best + delta) * Q .. M] and [M .. M / (best + delta) - (Q - M)] + // to simplify code for now we will pick upper bound from second range if it is not degenerate, otherwise check first range + let m = c.iter().map(|c| c.max).sum::().min(*sum); + if distances.as_ref().unwrap().len() >= *limit as usize { + if let Some((max_threshold, _)) = distances.as_ref().unwrap().last() { + let best = 1.0 - max_threshold.0; + let delta = self.delta; + let q = *sum; + + if best > 0.0 { + let first_range_l = (best + delta) * q; + let second_range_r = m / (best + delta) - (q - m); + if m <= second_range_r { + *sum_threshold = Some(second_range_r); + } else if first_range_l <= m { + *sum_threshold = Some(m); + } else { + *sum_threshold = Some(-1.0); + } + tracing::debug!( + "sum_threshold={:?}, max_threshold={}, remained_sum={}, sum={}, components={:?}", + sum_threshold, + best, + m, + sum, + c + ); + } + } + } + let c = components.as_mut().unwrap().pop_front().unwrap(); + *key = Some(ImmutableRecord::from_values( + &[Value::Integer(c.position as i64)], + 1, + )); + *component = Some(c.position); + } + let k = key.as_ref().unwrap(); + let result = return_if_io!( + inverted.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }) ); match result { SeekResult::Found => { self.search_state = VectorSparseInvertedIndexSearchState::Read { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - key: key.take(), - idx: *idx, + distances: distances.take(), + current: Some(Vec::new()), limit: *limit, + sum_threshold: sum_threshold.take(), + component: component.unwrap(), }; } SeekResult::TryAdvance | SeekResult::NotFound => { self.search_state = VectorSparseInvertedIndexSearchState::Next { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - key: key.take(), - idx: *idx, + distances: distances.take(), + current: Some(Vec::new()), limit: *limit, + sum_threshold: sum_threshold.take(), + component: component.unwrap(), }; } } } VectorSparseInvertedIndexSearchState::Read { + sum, + components, collected, - positions, - key, - idx, + distances, limit, + sum_threshold, + component, + current, } => { - let record = return_if_io!(scratch.record()); - if let Some(record) = record { - let ValueRef::Integer(position) = record.get_value(0)? else { - return Err(LimboError::InternalError( - "first value of index record must be int".to_string(), - )); + let record = return_if_io!(inverted.record()); + let row = parse_inverted_index_row(record.as_deref())?; + if row.position != *component + || (sum_threshold.is_some() && row.sum > sum_threshold.unwrap()) + { + let mut current = current.take().unwrap(); + current.sort(); + + self.search_state = VectorSparseInvertedIndexSearchState::EvaluateSeek { + sum: *sum, + components: components.take(), + collected: collected.take(), + distances: distances.take(), + limit: *limit, + current: Some(current.into()), + rowid: None, }; - let ValueRef::Integer(rowid) = record.get_value(1)? else { - return Err(LimboError::InternalError( - "second value of index record must be int".to_string(), - )); - }; - tracing::debug!("position/rowid: {}/{}", position, rowid); - if position == positions.as_ref().unwrap()[*idx] as i64 { - collected.as_mut().unwrap().insert(rowid); - self.search_state = VectorSparseInvertedIndexSearchState::Next { - collected: collected.take(), - positions: positions.take(), - key: key.take(), - idx: *idx, - limit: *limit, - }; - continue; - } + continue; } - self.search_state = VectorSparseInvertedIndexSearchState::Prepare { + if collected.as_mut().unwrap().insert(row.rowid) { + current.as_mut().unwrap().push(row.rowid); + } + + self.search_state = VectorSparseInvertedIndexSearchState::Next { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - idx: *idx + 1, + distances: distances.take(), limit: *limit, + sum_threshold: *sum_threshold, + component: *component, + current: current.take(), }; } VectorSparseInvertedIndexSearchState::Next { + sum, + components, collected, - positions, - key, - idx, + distances, limit, + sum_threshold, + component, + current, } => { - let result = return_if_io!(scratch.next()); + let result = return_if_io!(inverted.next()); if !result { - self.search_state = VectorSparseInvertedIndexSearchState::Prepare { + let mut current = current.take().unwrap(); + current.sort(); + + self.search_state = VectorSparseInvertedIndexSearchState::EvaluateSeek { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - idx: *idx + 1, + distances: distances.take(), limit: *limit, + current: Some(current.into()), + rowid: None, }; } else { self.search_state = VectorSparseInvertedIndexSearchState::Read { + sum: *sum, + components: components.take(), collected: collected.take(), - positions: positions.take(), - key: key.take(), - idx: *idx, + distances: distances.take(), limit: *limit, + sum_threshold: *sum_threshold, + component: *component, + current: current.take(), }; } } VectorSparseInvertedIndexSearchState::EvaluateSeek { - rowids, + sum, + components, + collected, distances, limit, + current, + rowid, } => { - let Some(rowid) = rowids.as_ref().unwrap().last() else { - let distances = distances.take().unwrap(); - self.search_result = distances.iter().map(|(d, i)| (*i, d.0)).collect(); - return Ok(IOResult::Done(!self.search_result.is_empty())); - }; - let result = return_if_io!( - main.seek(SeekKey::TableRowId(*rowid), SeekOp::GE { eq_only: true }) - ); + let c = current.as_ref().unwrap(); + if c.is_empty() && rowid.is_none() { + self.search_state = VectorSparseInvertedIndexSearchState::Seek { + sum: *sum, + components: components.take(), + collected: collected.take(), + distances: distances.take(), + limit: *limit, + component: None, + key: None, + sum_threshold: None, + }; + continue; + } + if rowid.is_none() { + *rowid = Some(current.as_mut().unwrap().pop_front().unwrap()); + } + + let rowid = *rowid.as_ref().unwrap(); + let k = SeekKey::TableRowId(rowid); + let result = return_if_io!(main.seek(k, SeekOp::GE { eq_only: true })); if !matches!(result, SeekResult::Found) { return Err(LimboError::Corrupt( "vector_sparse_ivf corrupted: unable to find rowid in main table" @@ -648,18 +1287,25 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { )); }; self.search_state = VectorSparseInvertedIndexSearchState::EvaluateRead { - rowids: rowids.take(), + sum: *sum, + components: components.take(), + collected: collected.take(), distances: distances.take(), limit: *limit, + current: current.take(), + rowid, }; } VectorSparseInvertedIndexSearchState::EvaluateRead { - rowids, + sum, + components, + collected, distances, limit, + current, + rowid, } => { let record = return_if_io!(main.record()); - let rowid = rowids.as_mut().unwrap().pop().unwrap(); if let Some(record) = record { let column_idx = self.configuration.columns[0].pos_in_table; let ValueRef::Blob(data) = record.get_value(column_idx)? else { @@ -691,16 +1337,20 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor { ); let distance = operations::jaccard::vector_distance_jaccard(&data, &arg)?; let distances = distances.as_mut().unwrap(); - distances.insert((FloatOrd(distance), rowid)); + distances.insert((FloatOrd(distance), *rowid)); if distances.len() > *limit as usize { let _ = distances.pop_last(); } } self.search_state = VectorSparseInvertedIndexSearchState::EvaluateSeek { - rowids: rowids.take(), + sum: *sum, + components: components.take(), + collected: collected.take(), distances: distances.take(), limit: *limit, + current: current.take(), + rowid: None, }; } } diff --git a/core/translate/integrity_check.rs b/core/translate/integrity_check.rs index a2bafe3f0..c4ee30289 100644 --- a/core/translate/integrity_check.rs +++ b/core/translate/integrity_check.rs @@ -18,7 +18,9 @@ pub fn translate_integrity_check( root_pages.push(table.root_page); if let Some(indexes) = schema.indexes.get(table.name.as_str()) { for index in indexes.iter() { - root_pages.push(index.root_page); + if index.root_page > 0 { + root_pages.push(index.root_page); + } } } }; diff --git a/core/types.rs b/core/types.rs index 617d68a2e..cec21c92f 100644 --- a/core/types.rs +++ b/core/types.rs @@ -16,7 +16,7 @@ use crate::translate::plan::IterationDirection; use crate::vdbe::sorter::Sorter; use crate::vdbe::Register; use crate::vtab::VirtualTableCursor; -use crate::{turso_assert, Completion, CompletionError, Result, IO}; +use crate::{Completion, CompletionError, Result, IO}; use std::fmt::{Debug, Display}; use std::task::Waker; @@ -1594,9 +1594,21 @@ pub fn compare_immutable( r: &[ValueRef], column_info: &[KeyInfo], ) -> std::cmp::Ordering { - assert_eq!(l.len(), r.len()); - turso_assert!(column_info.len() >= l.len(), "column_info.len() < l.len()"); - for (i, (l, r)) in l.iter().zip(r).enumerate() { + assert!( + l.len() >= column_info.len(), + "{} < {}", + l.len(), + column_info.len() + ); + assert!( + r.len() >= column_info.len(), + "{} < {}", + r.len(), + column_info.len() + ); + let l_values = l.iter().take(column_info.len()); + let r_values = r.iter().take(column_info.len()); + for (i, (l, r)) in l_values.zip(r_values).enumerate() { let column_order = column_info[i].sort_order; let collation = column_info[i].collation; let cmp = match (l, r) { @@ -1720,10 +1732,6 @@ fn compare_records_int( index_info: &IndexInfo, tie_breaker: std::cmp::Ordering, ) -> Result { - turso_assert!( - index_info.key_info.len() >= unpacked.len(), - "index_info.key_info.len() < unpacked.len()" - ); let payload = serialized.get_payload(); if payload.len() < 2 { return compare_records_generic(serialized, unpacked, index_info, 0, tie_breaker); @@ -1813,10 +1821,6 @@ fn compare_records_string( index_info: &IndexInfo, tie_breaker: std::cmp::Ordering, ) -> Result { - turso_assert!( - index_info.key_info.len() >= unpacked.len(), - "index_info.key_info.len() < unpacked.len()" - ); let payload = serialized.get_payload(); if payload.len() < 2 { return compare_records_generic(serialized, unpacked, index_info, 0, tie_breaker); @@ -1926,10 +1930,6 @@ pub fn compare_records_generic( skip: usize, tie_breaker: std::cmp::Ordering, ) -> Result { - turso_assert!( - index_info.key_info.len() >= unpacked.len(), - "index_info.key_info.len() < unpacked.len()" - ); let payload = serialized.get_payload(); if payload.is_empty() { return Ok(std::cmp::Ordering::Less); @@ -1960,7 +1960,8 @@ pub fn compare_records_generic( } let mut field_idx = skip; - while field_idx < unpacked.len() && header_pos < header_end { + let field_limit = unpacked.len().min(index_info.key_info.len()); + while field_idx < field_limit && header_pos < header_end { let (serial_type_raw, bytes_read) = read_varint(&payload[header_pos..])?; header_pos += bytes_read; diff --git a/core/vector/operations/slice.rs b/core/vector/operations/slice.rs index a1f6b99d8..9b6de84a2 100644 --- a/core/vector/operations/slice.rs +++ b/core/vector/operations/slice.rs @@ -37,7 +37,7 @@ pub fn vector_slice(vector: &Vector, start: usize, end: usize) -> Result { + assert!( + *a >= *b || (*a - *b).abs() < 1e-5, + "a={}, b={}, delta={}", + *a, + *b, + delta + ); + assert!( + *a - delta <= *b || (*a - delta - *b).abs() < 1e-5, + "a={}, b={}, delta={}", + *a, + *b, + delta + ); + } + _ => panic!("unexpected column values"), + } + } } for row in simple_rows.iter().skip(index_rows.len()) { match row[1] { - rusqlite::types::Value::Real(r) => assert_eq!(r, 1.0), + rusqlite::types::Value::Real(r) => assert!((1.0 - r) < 1e-5), _ => panic!("unexpected simple row value"), } } - tracing::info!("simple: {:?}, index_rows: {:?}", simple_rows, index_rows); } } }