Have `RecordCursor::get_values` return an iterator so that deserialization is actually lazy. Unfortunately, we won't see much improvement yet, because we do not store the `RecordCursor` when calling `ImmutableRecord::get_values`.

This commit is contained in:
pedrocarlo
2025-11-11 14:32:29 -03:00
parent e1d36a2221
commit bc06bb0415
4 changed files with 96 additions and 54 deletions

View File

@@ -450,11 +450,11 @@ impl<Clock: LogicalClock> CheckpointStateMachine<Clock> {
let record = ImmutableRecord::from_bin_record(row_version.row.data.clone());
let mut record_cursor = RecordCursor::new();
record_cursor.parse_full_header(&record).unwrap();
let values = record_cursor.get_values(&record)?;
let values = record_cursor.get_values(&record);
let mut values = values
.into_iter()
.map(|value| value.to_owned())
.collect::<Vec<_>>();
.map(|value| value.map(|v| v.to_owned()))
.collect::<Result<Vec<_>>>()?;
values[3] = Value::Integer(root_page as i64);
let record = ImmutableRecord::from_values(&values, values.len());
row_version.row.data = record.get_payload().to_owned();

View File

@@ -1902,12 +1902,10 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let row_data = row.data.clone();
let record = ImmutableRecord::from_bin_record(row_data);
let mut record_cursor = RecordCursor::new();
let record_values = record_cursor.get_values(&record).unwrap();
let ValueRef::Integer(root_page) = record_values[3] else {
panic!(
"Expected integer value for root page, got {:?}",
record_values[3]
);
let mut record_values = record_cursor.get_values(&record);
let val = record_values.nth(3).unwrap()?;
let ValueRef::Integer(root_page) = val else {
panic!("Expected integer value for root page, got {val:?}");
};
if root_page < 0 {
let table_id = self.get_table_id_from_root_page(root_page);

View File

@@ -1618,7 +1618,7 @@ impl BTreeCursor {
.index_info
.as_ref()
.expect("indexbtree_move_to without index_info");
find_compare(&key_values, index_info)
find_compare(key_values.iter().peekable(), index_info)
};
tracing::debug!("Using record comparison strategy: {:?}", record_comparer);
let tie_breaker = get_tie_breaker_from_seek_op(cmp);
@@ -2002,7 +2002,7 @@ impl BTreeCursor {
.index_info
.as_ref()
.expect("indexbtree_seek without index_info");
find_compare(&key_values, index_info)
find_compare(key_values.iter().peekable(), index_info)
};
tracing::debug!(

View File

@@ -20,6 +20,7 @@ use crate::vtab::VirtualTableCursor;
use crate::{Completion, CompletionError, Result, IO};
use std::borrow::{Borrow, Cow};
use std::fmt::{Debug, Display};
use std::iter::Peekable;
use std::ops::Deref;
use std::task::Waker;
@@ -993,9 +994,14 @@ impl ImmutableRecord {
// TODO: inline the complete record parsing code here.
// It's probably more efficient.
// fixme(pedrocarlo): this function is very inefficient and somewhat misleading because
// it always deserializes all of the columns
pub fn get_values<'a>(&'a self) -> Vec<ValueRef<'a>> {
let mut cursor = RecordCursor::new();
cursor.get_values(self).unwrap_or_default()
cursor
.get_values(self)
.collect::<Result<Vec<_>>>()
.unwrap_or_default()
}
pub fn from_registers<'a, I: Iterator<Item = &'a Register> + Clone>(
@@ -1010,7 +1016,7 @@ impl ImmutableRecord {
}
pub fn from_values<'a>(
values: impl IntoIterator<Item = &'a Value> + Clone,
values: impl IntoIterator<Item = impl AsValueRef + 'a> + Clone,
len: usize,
) -> Self {
let mut serials = Vec::with_capacity(len);
@@ -1020,7 +1026,7 @@ impl ImmutableRecord {
let mut serial_type_buf = [0; 9];
// write serial types
for value in values.clone() {
let serial_type = SerialType::from(value);
let serial_type = SerialType::from(value.as_value_ref());
let n = write_varint(&mut serial_type_buf[0..], serial_type.into());
serials.push((serial_type_buf, n));
@@ -1049,28 +1055,29 @@ impl ImmutableRecord {
// write content
for value in values {
let value = value.as_value_ref();
match value {
Value::Null => {}
Value::Integer(i) => {
ValueRef::Null => {}
ValueRef::Integer(i) => {
let serial_type = SerialType::from(value);
match serial_type.kind() {
SerialTypeKind::ConstInt0 | SerialTypeKind::ConstInt1 => {}
SerialTypeKind::I8 => writer.extend_from_slice(&(*i as i8).to_be_bytes()),
SerialTypeKind::I16 => writer.extend_from_slice(&(*i as i16).to_be_bytes()),
SerialTypeKind::I8 => writer.extend_from_slice(&(i as i8).to_be_bytes()),
SerialTypeKind::I16 => writer.extend_from_slice(&(i as i16).to_be_bytes()),
SerialTypeKind::I24 => {
writer.extend_from_slice(&(*i as i32).to_be_bytes()[1..])
writer.extend_from_slice(&(i as i32).to_be_bytes()[1..])
} // remove most significant byte
SerialTypeKind::I32 => writer.extend_from_slice(&(*i as i32).to_be_bytes()),
SerialTypeKind::I32 => writer.extend_from_slice(&(i as i32).to_be_bytes()),
SerialTypeKind::I48 => writer.extend_from_slice(&i.to_be_bytes()[2..]), // remove 2 most significant bytes
SerialTypeKind::I64 => writer.extend_from_slice(&i.to_be_bytes()),
other => panic!("Serial type is not an integer: {other:?}"),
}
}
Value::Float(f) => writer.extend_from_slice(&f.to_be_bytes()),
Value::Text(t) => {
ValueRef::Float(f) => writer.extend_from_slice(&f.to_be_bytes()),
ValueRef::Text(t) => {
writer.extend_from_slice(t.value.as_bytes());
}
Value::Blob(b) => {
ValueRef::Blob(b) => {
writer.extend_from_slice(b);
}
};
@@ -1440,19 +1447,48 @@ impl RecordCursor {
/// * `Ok(Vec<RefValue>)` - All values in column order
/// * `Err(LimboError)` - Parsing or deserialization failed
///
pub fn get_values<'a>(&mut self, record: &'a ImmutableRecord) -> Result<Vec<ValueRef<'a>>> {
if record.is_invalidated() {
return Ok(Vec::new());
pub fn get_values<'a, 'b>(
&'b mut self,
record: &'a ImmutableRecord,
) -> Peekable<impl ExactSizeIterator<Item = Result<ValueRef<'a>>> + use<'a, 'b>> {
struct GetValues<'a, 'b> {
cursor: &'b mut RecordCursor,
record: &'a ImmutableRecord,
idx: usize,
}
self.parse_full_header(record)?;
let mut result = Vec::with_capacity(self.serial_types.len());
impl<'a, 'b> Iterator for GetValues<'a, 'b> {
type Item = Result<ValueRef<'a>>;
for i in 0..self.serial_types.len() {
result.push(self.deserialize_column(record, i)?);
fn next(&mut self) -> Option<Self::Item> {
if self.idx == 0 {
// So that we can have the full length of serial types
if let Err(err) = self.cursor.parse_full_header(self.record) {
return Some(Err(err));
}
}
if !self.record.is_invalidated() && self.idx < self.cursor.serial_types.len() {
let res = self.cursor.deserialize_column(self.record, self.idx);
self.idx += 1;
Some(res)
} else {
None
}
}
}
Ok(result)
impl<'a, 'b> ExactSizeIterator for GetValues<'a, 'b> {
fn len(&self) -> usize {
self.cursor.serial_types.len() - self.idx
}
}
let get_values = GetValues {
cursor: self,
record,
idx: 0,
};
get_values.peekable()
}
}
@@ -1790,9 +1826,16 @@ impl RecordCompare {
}
}
pub fn find_compare(unpacked: &[ValueRef], index_info: &IndexInfo) -> RecordCompare {
if !unpacked.is_empty() && index_info.num_cols <= 13 {
match &unpacked[0] {
pub fn find_compare<I, E, V>(unpacked: I, index_info: &IndexInfo) -> RecordCompare
where
V: AsValueRef,
E: ExactSizeIterator<Item = V>,
I: IntoIterator<IntoIter = Peekable<E>, Item = V>,
{
let mut unpacked = unpacked.into_iter();
if unpacked.len() != 0 && index_info.num_cols <= 13 {
let val = unpacked.peek().unwrap();
match val.as_value_ref() {
ValueRef::Integer(_) => RecordCompare::Int,
ValueRef::Text(_) if index_info.key_info[0].collation == CollationSeq::Binary => {
RecordCompare::String
@@ -2297,23 +2340,24 @@ impl SerialType {
}
}
impl From<&Value> for SerialType {
fn from(value: &Value) -> Self {
impl<T: AsValueRef> From<T> for SerialType {
fn from(value: T) -> Self {
let value = value.as_value_ref();
match value {
Value::Null => SerialType::null(),
Value::Integer(i) => match i {
ValueRef::Null => SerialType::null(),
ValueRef::Integer(i) => match i {
0 => SerialType::const_int0(),
1 => SerialType::const_int1(),
i if *i >= I8_LOW && *i <= I8_HIGH => SerialType::i8(),
i if *i >= I16_LOW && *i <= I16_HIGH => SerialType::i16(),
i if *i >= I24_LOW && *i <= I24_HIGH => SerialType::i24(),
i if *i >= I32_LOW && *i <= I32_HIGH => SerialType::i32(),
i if *i >= I48_LOW && *i <= I48_HIGH => SerialType::i48(),
i if (I8_LOW..=I8_HIGH).contains(&i) => SerialType::i8(),
i if (I16_LOW..=I16_HIGH).contains(&i) => SerialType::i16(),
i if (I24_LOW..=I24_HIGH).contains(&i) => SerialType::i24(),
i if (I32_LOW..=I32_HIGH).contains(&i) => SerialType::i32(),
i if (I48_LOW..=I48_HIGH).contains(&i) => SerialType::i48(),
_ => SerialType::i64(),
},
Value::Float(_) => SerialType::f64(),
Value::Text(t) => SerialType::text(t.value.len() as u64),
Value::Blob(b) => SerialType::blob(b.len() as u64),
ValueRef::Float(_) => SerialType::f64(),
ValueRef::Text(t) => SerialType::text(t.value.len() as u64),
ValueRef::Blob(b) => SerialType::blob(b.len() as u64),
}
}
}
@@ -2785,7 +2829,7 @@ mod tests {
tie_breaker,
);
let comparer = find_compare(&unpacked_values, index_info);
let comparer = find_compare(unpacked_values.iter().peekable(), index_info);
let optimized_result = comparer
.compare(&serialized, &unpacked_values, index_info, 0, tie_breaker)
.unwrap();
@@ -3227,33 +3271,33 @@ mod tests {
);
let index_info_large = create_index_info(15, vec![SortOrder::Asc; 15], collations_large);
let int_values = vec![
let int_values = [
ValueRef::Integer(42),
ValueRef::Text(TextRef::new("hello", TextSubtype::Text)),
];
assert!(matches!(
find_compare(&int_values, &index_info_small),
find_compare(int_values.iter().peekable(), &index_info_small),
RecordCompare::Int
));
let string_values = vec![
let string_values = [
ValueRef::Text(TextRef::new("hello", TextSubtype::Text)),
ValueRef::Integer(42),
];
assert!(matches!(
find_compare(&string_values, &index_info_small),
find_compare(string_values.iter().peekable(), &index_info_small),
RecordCompare::String
));
let large_values: Vec<ValueRef> = (0..15).map(ValueRef::Integer).collect();
assert!(matches!(
find_compare(&large_values, &index_info_large),
find_compare(large_values.iter().peekable(), &index_info_large),
RecordCompare::Generic
));
let blob_values = vec![ValueRef::Blob(&[1, 2, 3])];
let blob_values = [ValueRef::Blob(&[1, 2, 3])];
assert!(matches!(
find_compare(&blob_values, &index_info_small),
find_compare(blob_values.iter().peekable(), &index_info_small),
RecordCompare::Generic
));
}