add components stats table

This commit is contained in:
Nikita Sivukhin
2025-10-29 18:10:48 +04:00
parent 96990e1168
commit a440102b9b

View File

@@ -20,7 +20,7 @@ use crate::{
operations,
vector_types::{Vector, VectorType},
},
Connection, LimboError, Result, Statement, Value, ValueRef,
Connection, LimboError, Result, Value, ValueRef,
};
/// Simple inverted index for sparse vectors
@@ -38,27 +38,45 @@ pub struct VectorSparseInvertedIndexMethodAttachment {
patterns: Vec<ast::Select>,
}
/// Progress of the `create` operation, which builds the backing index by
/// executing a nested `CREATE INDEX` statement.
pub enum VectorSparseInvertedIndexCreateState {
/// Nothing has been prepared yet; `create` formats and prepares the
/// `CREATE INDEX` statement from this state and then moves to `Run`.
Init,
/// The prepared statement is ready to be executed; `create` runs it,
/// ends the nested execution and reports completion.
Run { stmt: Box<Statement> },
}
#[derive(Debug)]
pub enum VectorSparseInvertedIndexInsertState {
Init,
Prepare {
positions: Option<Vec<u32>>,
vector: Option<Vector<'static>>,
sum: f64,
rowid: i64,
idx: usize,
},
Seek {
positions: Option<Vec<u32>>,
SeekScratch {
vector: Option<Vector<'static>>,
sum: f64,
key: Option<ImmutableRecord>,
rowid: i64,
idx: usize,
},
Insert {
positions: Option<Vec<u32>>,
InsertScratch {
vector: Option<Vector<'static>>,
sum: f64,
key: Option<ImmutableRecord>,
rowid: i64,
idx: usize,
},
SeekStats {
vector: Option<Vector<'static>>,
sum: f64,
key: Option<ImmutableRecord>,
rowid: i64,
idx: usize,
},
ReadStats {
vector: Option<Vector<'static>>,
sum: f64,
rowid: i64,
idx: usize,
},
UpdateStats {
vector: Option<Vector<'static>>,
sum: f64,
key: Option<ImmutableRecord>,
rowid: i64,
idx: usize,
@@ -69,18 +87,41 @@ pub enum VectorSparseInvertedIndexInsertState {
pub enum VectorSparseInvertedIndexDeleteState {
Init,
Prepare {
positions: Option<Vec<u32>>,
vector: Option<Vector<'static>>,
sum: f64,
rowid: i64,
idx: usize,
},
Seek {
positions: Option<Vec<u32>>,
SeekScratch {
vector: Option<Vector<'static>>,
sum: f64,
key: Option<ImmutableRecord>,
rowid: i64,
idx: usize,
},
Insert {
positions: Option<Vec<u32>>,
DeleteScratch {
vector: Option<Vector<'static>>,
sum: f64,
rowid: i64,
idx: usize,
},
SeekStats {
vector: Option<Vector<'static>>,
sum: f64,
key: Option<ImmutableRecord>,
rowid: i64,
idx: usize,
},
ReadStats {
vector: Option<Vector<'static>>,
sum: f64,
rowid: i64,
idx: usize,
},
UpdateStats {
vector: Option<Vector<'static>>,
sum: f64,
key: Option<ImmutableRecord>,
rowid: i64,
idx: usize,
},
@@ -145,10 +186,12 @@ enum VectorSparseInvertedIndexSearchState {
pub struct VectorSparseInvertedIndexMethodCursor {
configuration: IndexMethodConfiguration,
delta: f64,
scratch_btree: String,
scratch_cursor: Option<BTreeCursor>,
stats_btree: String,
stats_cursor: Option<BTreeCursor>,
main_btree: Option<BTreeCursor>,
create_state: VectorSparseInvertedIndexCreateState,
insert_state: VectorSparseInvertedIndexInsertState,
delete_state: VectorSparseInvertedIndexDeleteState,
search_state: VectorSparseInvertedIndexSearchState,
@@ -194,13 +237,20 @@ impl IndexMethodAttachment for VectorSparseInvertedIndexMethodAttachment {
impl VectorSparseInvertedIndexMethodCursor {
pub fn new(configuration: IndexMethodConfiguration) -> Self {
let scratch_btree = format!("{}_scratch", configuration.index_name);
let stats_btree = format!("{}_stats", configuration.index_name);
let delta = match configuration.parameters.get("delta") {
Some(&Value::Float(delta)) => delta,
None => 0.0,
};
Self {
configuration,
delta,
scratch_btree,
scratch_cursor: None,
stats_btree,
stats_cursor: None,
main_btree: None,
search_result: VecDeque::new(),
create_state: VectorSparseInvertedIndexCreateState::Init,
insert_state: VectorSparseInvertedIndexInsertState::Init,
delete_state: VectorSparseInvertedIndexDeleteState::Init,
search_state: VectorSparseInvertedIndexSearchState::Init,
@@ -208,57 +258,72 @@ impl VectorSparseInvertedIndexMethodCursor {
}
}
/// Builds the [`KeyInfo`] used for every column of the backing b-tree
/// indexes opened by this cursor: binary collation, ascending sort order.
fn key_info() -> KeyInfo {
    let sort_order = SortOrder::Asc;
    let collation = CollationSeq::Binary;
    KeyInfo {
        collation,
        sort_order,
    }
}
impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
fn create(&mut self, connection: &Arc<Connection>) -> Result<IOResult<()>> {
loop {
match &mut self.create_state {
VectorSparseInvertedIndexCreateState::Init => {
let columns = &self.configuration.columns;
let columns = columns.iter().map(|x| x.name.as_str()).collect::<Vec<_>>();
let sql = format!(
"CREATE INDEX {} ON {} USING {} ({})",
self.scratch_btree,
self.configuration.table_name,
BACKING_BTREE_INDEX_METHOD_NAME,
columns.join(", ")
);
let stmt = connection.prepare(&sql)?;
connection.start_nested();
self.create_state = VectorSparseInvertedIndexCreateState::Run {
stmt: Box::new(stmt),
};
}
VectorSparseInvertedIndexCreateState::Run { stmt } => {
// we need to properly track subprograms and propagate result to the root program to make this execution async
let result = stmt.run_ignore_rows();
connection.end_nested();
result?;
return Ok(IOResult::Done(()));
}
}
// we need to properly track subprograms and propagate result to the root program to make this execution async
let columns = &self.configuration.columns;
let columns = columns.iter().map(|x| x.name.as_str()).collect::<Vec<_>>();
let scratch_index_create = format!(
"CREATE INDEX {} ON {} USING {} ({})",
self.scratch_btree,
self.configuration.table_name,
BACKING_BTREE_INDEX_METHOD_NAME,
columns.join(", ")
);
let stats_index_create = format!(
"CREATE INDEX {} ON {} USING {} ({})",
self.stats_btree,
self.configuration.table_name,
BACKING_BTREE_INDEX_METHOD_NAME,
columns.join(", ")
);
for sql in [scratch_index_create, stats_index_create] {
let mut stmt = connection.prepare(&sql)?;
connection.start_nested();
let result = stmt.run_ignore_rows();
connection.end_nested();
result?;
}
Ok(IOResult::Done(()))
}
fn destroy(&mut self, connection: &Arc<Connection>) -> Result<IOResult<()>> {
let sql = format!("DROP INDEX {}", self.scratch_btree);
let mut stmt = connection.prepare(&sql)?;
connection.start_nested();
let result = stmt.run_ignore_rows();
connection.end_nested();
result?;
let scratch_index_drop = format!("DROP INDEX {}", self.scratch_btree);
let stats_index_drop = format!("DROP INDEX {}", self.stats_btree);
for sql in [scratch_index_drop, stats_index_drop] {
let mut stmt = connection.prepare(&sql)?;
connection.start_nested();
let result = stmt.run_ignore_rows();
connection.end_nested();
result?;
}
Ok(IOResult::Done(()))
}
fn open_read(&mut self, connection: &Arc<Connection>) -> Result<IOResult<()>> {
let key_info = KeyInfo {
collation: CollationSeq::Binary,
sort_order: SortOrder::Asc,
};
self.scratch_cursor = Some(open_index_cursor(
connection,
&self.configuration.table_name,
&self.scratch_btree,
vec![key_info, key_info],
// component, length, rowid
vec![key_info(), key_info(), key_info()],
)?);
self.stats_cursor = Some(open_index_cursor(
connection,
&self.configuration.table_name,
&self.stats_btree,
// component, count, non-zero-min, non-zero-max
vec![key_info(), key_info(), key_info(), key_info()],
)?);
self.main_btree = Some(open_table_cursor(
connection,
@@ -268,23 +333,32 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
}
fn open_write(&mut self, connection: &Arc<Connection>) -> Result<IOResult<()>> {
let key_info = KeyInfo {
collation: CollationSeq::Binary,
sort_order: SortOrder::Asc,
};
self.scratch_cursor = Some(open_index_cursor(
connection,
&self.configuration.table_name,
&self.scratch_btree,
vec![key_info, key_info],
// component, length, rowid
vec![key_info(), key_info(), key_info()],
)?);
self.stats_cursor = Some(open_index_cursor(
connection,
&self.configuration.table_name,
&self.stats_btree,
// component
vec![key_info()],
)?);
Ok(IOResult::Done(()))
}
fn insert(&mut self, values: &[Register]) -> Result<IOResult<()>> {
let Some(cursor) = &mut self.scratch_cursor else {
let Some(scratch_cursor) = &mut self.scratch_cursor else {
return Err(LimboError::InternalError(
"cursor must be opened".to_string(),
"scratch cursor must be opened".to_string(),
));
};
let Some(stats_cursor) = &mut self.stats_cursor else {
return Err(LimboError::InternalError(
"stats cursor must be opened".to_string(),
));
};
loop {
@@ -296,7 +370,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
"first value must be sparse vector".to_string(),
));
};
let vector = Vector::from_slice(vector)?;
let vector = Vector::from_vec(vector.to_vec())?;
if !matches!(vector.vector_type, VectorType::Float32Sparse) {
return Err(LimboError::InternalError(
"first value must be sparse vector".to_string(),
@@ -307,64 +381,196 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
"second value must be i64 rowid".to_string(),
));
};
let sparse = vector.as_f32_sparse();
let sum = vector.as_f32_sparse().values.iter().sum::<f32>() as f64;
self.insert_state = VectorSparseInvertedIndexInsertState::Prepare {
positions: Some(sparse.idx.to_vec()),
vector: Some(vector),
sum,
rowid,
idx: 0,
}
}
VectorSparseInvertedIndexInsertState::Prepare {
positions,
vector,
sum,
rowid,
idx,
} => {
let p = positions.as_ref().unwrap();
if *idx == p.len() {
let v = vector.as_ref().unwrap();
if *idx == v.as_f32_sparse().idx.len() {
self.insert_state = VectorSparseInvertedIndexInsertState::Init;
return Ok(IOResult::Done(()));
}
let position = p[*idx];
let position = v.as_f32_sparse().idx[*idx];
let key = ImmutableRecord::from_values(
&[Value::Integer(position as i64), Value::Integer(*rowid)],
2,
&[
Value::Integer(position as i64),
Value::Float(*sum),
Value::Integer(*rowid),
],
3,
);
self.insert_state = VectorSparseInvertedIndexInsertState::Seek {
self.insert_state = VectorSparseInvertedIndexInsertState::SeekScratch {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
positions: positions.take(),
key: Some(key),
};
}
VectorSparseInvertedIndexInsertState::Seek {
positions,
VectorSparseInvertedIndexInsertState::SeekScratch {
vector,
sum,
rowid,
idx,
key,
} => {
let k = key.as_ref().unwrap();
let _ = return_if_io!(
cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false })
);
self.insert_state = VectorSparseInvertedIndexInsertState::Insert {
let _ =
return_if_io!(scratch_cursor
.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false }));
self.insert_state = VectorSparseInvertedIndexInsertState::InsertScratch {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
positions: positions.take(),
key: key.take(),
};
}
VectorSparseInvertedIndexInsertState::Insert {
positions,
VectorSparseInvertedIndexInsertState::InsertScratch {
vector,
sum,
rowid,
idx,
key,
} => {
let k = key.as_ref().unwrap();
return_if_io!(cursor.insert(&BTreeKey::IndexKey(k)));
return_if_io!(scratch_cursor.insert(&BTreeKey::IndexKey(k)));
let v = vector.as_ref().unwrap();
let position = v.as_f32_sparse().idx[*idx];
let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1);
self.insert_state = VectorSparseInvertedIndexInsertState::SeekStats {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
key: Some(key),
};
}
VectorSparseInvertedIndexInsertState::SeekStats {
vector,
sum,
key,
rowid,
idx,
} => {
let k = key.as_ref().unwrap();
let result = return_if_io!(
stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: false })
);
match result {
SeekResult::Found => {
self.insert_state = VectorSparseInvertedIndexInsertState::ReadStats {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
};
}
SeekResult::NotFound | SeekResult::TryAdvance => {
let v = vector.as_ref().unwrap();
let position = v.as_f32_sparse().idx[*idx];
let value = v.as_f32_sparse().values[*idx] as f64;
tracing::debug!(
"update stats(insert): {} (cnt={}, min={}, max={})",
position,
1,
value,
value,
);
let key = ImmutableRecord::from_values(
&[
Value::Integer(position as i64),
Value::Integer(1 as i64),
Value::Float(value),
Value::Float(value),
],
4,
);
self.insert_state = VectorSparseInvertedIndexInsertState::UpdateStats {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
key: Some(key),
};
}
}
}
VectorSparseInvertedIndexInsertState::ReadStats {
vector,
sum,
rowid,
idx,
} => {
let record = return_if_io!(stats_cursor.record()).unwrap();
let ValueRef::Integer(cnt) = record.get_value(1)? else {
return Err(LimboError::Corrupt(format!(
"stats index corrupted: expected integer"
)));
};
let ValueRef::Float(min) = record.get_value(2)? else {
return Err(LimboError::Corrupt(format!(
"stats index corrupted: expected float"
)));
};
let ValueRef::Float(max) = record.get_value(3)? else {
return Err(LimboError::Corrupt(format!(
"stats index corrupted: expected float"
)));
};
let v = vector.as_ref().unwrap();
let position = v.as_f32_sparse().idx[*idx];
let value = v.as_f32_sparse().values[*idx] as f64;
tracing::debug!(
"update stats(insert): {} (cnt={}, min={}, max={})",
position,
cnt + 1,
value.min(min),
value.max(max),
);
let key = ImmutableRecord::from_values(
&[
Value::Integer(position as i64),
Value::Integer(cnt + 1),
Value::Float(value.min(min)),
Value::Float(value.max(max)),
],
4,
);
self.insert_state = VectorSparseInvertedIndexInsertState::UpdateStats {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
key: Some(key),
};
}
VectorSparseInvertedIndexInsertState::UpdateStats {
vector,
sum,
key,
rowid,
idx,
} => {
let k = key.as_ref().unwrap();
return_if_io!(stats_cursor.insert(&BTreeKey::IndexKey(k)));
self.insert_state = VectorSparseInvertedIndexInsertState::Prepare {
vector: vector.take(),
sum: *sum,
idx: *idx + 1,
rowid: *rowid,
positions: positions.take(),
};
}
}
@@ -377,6 +583,11 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
"cursor must be opened".to_string(),
));
};
let Some(stats_cursor) = &mut self.stats_cursor else {
return Err(LimboError::InternalError(
"stats cursor must be opened".to_string(),
));
};
loop {
tracing::debug!("delete_state: {:?}", self.delete_state);
match &mut self.delete_state {
@@ -386,7 +597,7 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
"first value must be sparse vector".to_string(),
));
};
let vector = Vector::from_slice(vector)?;
let vector = Vector::from_vec(vector.to_vec())?;
if !matches!(vector.vector_type, VectorType::Float32Sparse) {
return Err(LimboError::InternalError(
"first value must be sparse vector".to_string(),
@@ -397,37 +608,45 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
"second value must be i64 rowid".to_string(),
));
};
let sparse = vector.as_f32_sparse();
let sum = vector.as_f32_sparse().values.iter().sum::<f32>() as f64;
self.delete_state = VectorSparseInvertedIndexDeleteState::Prepare {
positions: Some(sparse.idx.to_vec()),
vector: Some(vector),
sum,
rowid,
idx: 0,
}
}
VectorSparseInvertedIndexDeleteState::Prepare {
positions,
vector,
sum,
rowid,
idx,
} => {
let p = positions.as_ref().unwrap();
if *idx == p.len() {
let v = vector.as_ref().unwrap();
if *idx == v.as_f32_sparse().idx.len() {
self.delete_state = VectorSparseInvertedIndexDeleteState::Init;
return Ok(IOResult::Done(()));
}
let position = p[*idx];
let position = v.as_f32_sparse().idx[*idx];
let key = ImmutableRecord::from_values(
&[Value::Integer(position as i64), Value::Integer(*rowid)],
2,
&[
Value::Integer(position as i64),
Value::Float(*sum),
Value::Integer(*rowid),
],
3,
);
self.delete_state = VectorSparseInvertedIndexDeleteState::Seek {
self.delete_state = VectorSparseInvertedIndexDeleteState::SeekScratch {
vector: vector.take(),
idx: *idx,
sum: *sum,
rowid: *rowid,
positions: positions.take(),
key: Some(key),
};
}
VectorSparseInvertedIndexDeleteState::Seek {
positions,
VectorSparseInvertedIndexDeleteState::SeekScratch {
vector,
sum,
rowid,
idx,
key,
@@ -439,22 +658,121 @@ impl IndexMethodCursor for VectorSparseInvertedIndexMethodCursor {
if !matches!(result, SeekResult::Found) {
return Err(LimboError::Corrupt("inverted index corrupted".to_string()));
}
self.delete_state = VectorSparseInvertedIndexDeleteState::Insert {
self.delete_state = VectorSparseInvertedIndexDeleteState::DeleteScratch {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
positions: positions.take(),
};
}
VectorSparseInvertedIndexDeleteState::Insert {
positions,
VectorSparseInvertedIndexDeleteState::DeleteScratch {
vector,
sum,
rowid,
idx,
} => {
return_if_io!(cursor.delete());
let v = vector.as_ref().unwrap();
let position = v.as_f32_sparse().idx[*idx];
let key = ImmutableRecord::from_values(&[Value::Integer(position as i64)], 1);
self.delete_state = VectorSparseInvertedIndexDeleteState::SeekStats {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
key: Some(key),
};
}
VectorSparseInvertedIndexDeleteState::SeekStats {
vector,
sum,
key,
rowid,
idx,
} => {
let k = key.as_ref().unwrap();
let result = return_if_io!(
stats_cursor.seek(SeekKey::IndexKey(k), SeekOp::GE { eq_only: true })
);
match result {
SeekResult::Found => {
self.delete_state = VectorSparseInvertedIndexDeleteState::ReadStats {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
};
}
SeekResult::NotFound | SeekResult::TryAdvance => {
return Err(LimboError::Corrupt(format!(
"stats index corrupted: can't find component row"
)))
}
}
}
VectorSparseInvertedIndexDeleteState::ReadStats {
vector,
sum,
rowid,
idx,
} => {
let record = return_if_io!(stats_cursor.record()).unwrap();
let ValueRef::Integer(cnt) = record.get_value(1)? else {
return Err(LimboError::Corrupt(format!(
"stats index corrupted: expected integer"
)));
};
let ValueRef::Float(min) = record.get_value(2)? else {
return Err(LimboError::Corrupt(format!(
"stats index corrupted: expected float"
)));
};
let ValueRef::Float(max) = record.get_value(3)? else {
return Err(LimboError::Corrupt(format!(
"stats index corrupted: expected float"
)));
};
let v = vector.as_ref().unwrap();
let position = v.as_f32_sparse().idx[*idx];
tracing::debug!(
"update stats(delete): {} (cnt={}, min={}, max={})",
position,
cnt - 1,
min,
max,
);
let key = ImmutableRecord::from_values(
&[
Value::Integer(position as i64),
Value::Integer(cnt - 1),
Value::Float(min),
Value::Float(max),
],
4,
);
self.delete_state = VectorSparseInvertedIndexDeleteState::UpdateStats {
vector: vector.take(),
sum: *sum,
idx: *idx,
rowid: *rowid,
key: Some(key),
};
}
VectorSparseInvertedIndexDeleteState::UpdateStats {
vector,
sum,
key,
rowid,
idx,
} => {
let k = key.as_ref().unwrap();
return_if_io!(stats_cursor.insert(&BTreeKey::IndexKey(k)));
self.delete_state = VectorSparseInvertedIndexDeleteState::Prepare {
vector: vector.take(),
sum: *sum,
idx: *idx + 1,
rowid: *rowid,
positions: positions.take(),
};
}
}