Decouple Value parsing and Record loading.

Introduce `RecordCursor`, which lets us parse the record
header incrementally.
This commit is contained in:
Krishna Vishal
2025-06-20 11:48:53 +05:30
parent 180bcc7b60
commit 35fa9b368c

View File

@@ -761,10 +761,6 @@ pub struct ImmutableRecord {
//
// payload is the Vec<u8> but in order to use Register which holds ImmutableRecord as a Value - we store Vec<u8> as Value::Blob
payload: Value,
header_offsets: Vec<u32>,
serial_types: Vec<u32>,
header_parsed_count: usize,
header_offset: usize,
}
// impl Debug for ImmutableRecord {
@@ -850,299 +846,23 @@ impl<'a> AppendWriter<'a> {
}
impl ImmutableRecord {
pub fn new(payload_capacity: usize, value_capacity: usize) -> Self {
pub fn new(payload_capacity: usize) -> Self {
Self {
payload: Value::Blob(Vec::with_capacity(payload_capacity)),
header_offsets: Vec::with_capacity(value_capacity + 1),
serial_types: Vec::with_capacity(value_capacity),
header_parsed_count: 0,
header_offset: 0,
payload: Vec::with_capacity(payload_capacity),
}
}
/// Reads column `idx` and converts it into the requested type `T`.
///
/// # Errors
/// Returns whatever error `get_value` produces for this index, or the
/// `LimboError` raised by `T::try_from` when the conversion is rejected.
pub fn get<T: TryFrom<RefValue, Error = LimboError>>(&mut self, idx: usize) -> Result<T> {
    T::try_from(self.get_value(idx)?)
}
/// Parses every remaining serial type in the record header, extending
/// `serial_types` and `header_offsets` as it goes.
///
/// `header_offsets` uses the layout that `deserialize_column` indexes into:
/// slot `0` holds the header size, and slots `idx + 1` / `idx + 2` hold the
/// start / end byte offsets of column `idx`.
///
/// An empty payload is treated as "nothing to parse" and returns `Ok(())`.
///
/// # Errors
/// Propagates failures from `read_varint` and `SerialType::try_from`. No
/// state is modified for the entry that failed to decode.
fn parse_full_header(&mut self) -> Result<()> {
    if self.payload.is_empty() {
        return Ok(());
    }
    if self.header_parsed_count == 0 && self.header_offsets.is_empty() {
        let (header_size, bytes_read) = read_varint(&self.payload)?;
        self.header_offset = bytes_read;
        // Slot 0: the header size, used as the loop bound below.
        self.header_offsets.push(header_size as u32);
        // Slot 1: start of column 0, which sits right after the header.
        // `ensure_parsed_upto` seeds the same two entries; without this one
        // every column range read by `deserialize_column` is shifted by one.
        self.header_offsets.push(header_size as u32);
    }
    if !self.header_offsets.is_empty() {
        let header_size = self.header_offsets[0] as usize;
        while self.header_offset < header_size && self.header_offset < self.payload.len() {
            let (serial_type_u64, bytes_read) =
                read_varint(&self.payload[self.header_offset..])?;
            // Validate first so a rejected serial type cannot leave
            // `serial_types` one entry ahead of `header_offsets`.
            let column_data_size = SerialType::try_from(serial_type_u64)?.size();
            let current_offset = self.header_offsets.last().copied().unwrap_or(0);
            self.header_offset += bytes_read;
            self.serial_types.push(serial_type_u64 as u32);
            self.header_offsets
                .push(current_offset + column_data_size as u32);
            self.header_parsed_count += 1;
        }
    }
    Ok(())
}
/// Decodes column `idx` from the payload using the serial types and offsets
/// parsed so far.
///
/// Returns `RefValue::Null` when the column has no parsed serial type or no
/// recorded end offset.
///
/// # Errors
/// Returns `LimboError::Corrupt` when either bound of the column's byte range
/// lies past the end of the payload, and forwards any error from
/// `deserialize_column_data`.
fn deserialize_column(&self, idx: usize) -> Result<RefValue> {
    let is_parsed = idx < self.serial_types.len() && idx + 2 < self.header_offsets.len();
    if !is_parsed {
        return Ok(RefValue::Null);
    }
    // Column `idx` occupies `header_offsets[idx + 1]..header_offsets[idx + 2]`.
    let begin = self.header_offsets[idx + 1] as usize;
    let end = self.header_offsets[idx + 2] as usize;
    let payload_len = self.payload.len();
    if payload_len < begin || payload_len < end {
        return Err(LimboError::Corrupt("Column data out of bounds".into()));
    }
    self.deserialize_column_data(self.serial_types[idx], &self.payload[begin..end])
}
fn deserialize_column_data(&self, serial_type: u32, data: &[u8]) -> Result<RefValue> {
let serial_type_obj = SerialType::try_from(serial_type as u64)?;
match serial_type_obj.kind() {
SerialTypeKind::Null => Ok(RefValue::Null),
SerialTypeKind::ConstInt0 => Ok(RefValue::Integer(0)),
SerialTypeKind::ConstInt1 => Ok(RefValue::Integer(1)),
SerialTypeKind::I8 => {
if data.len() >= 1 {
Ok(RefValue::Integer(data[0] as i8 as i64))
} else {
Err(LimboError::Corrupt("Invalid I8 data".into()))
}
}
SerialTypeKind::I16 => {
if data.len() >= 2 {
let value = i16::from_be_bytes([data[0], data[1]]) as i64;
Ok(RefValue::Integer(value))
} else {
Err(LimboError::Corrupt("Invalid I16 data".into()))
}
}
SerialTypeKind::I24 => {
if data.len() >= 3 {
// Sign extend the 24-bit value
let mut bytes = [0u8; 4];
bytes[1..4].copy_from_slice(data);
let value = i32::from_be_bytes(bytes);
// Sign extend from 24-bit to 32-bit
let value = if (value & 0x00800000) != 0 {
value | 0xFF000000u32 as i32
} else {
value
};
Ok(RefValue::Integer(value as i64))
} else {
Err(LimboError::Corrupt("Invalid I24 data".into()))
}
}
SerialTypeKind::I32 => {
if data.len() >= 4 {
let value = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as i64;
Ok(RefValue::Integer(value))
} else {
Err(LimboError::Corrupt("Invalid I32 data".into()))
}
}
SerialTypeKind::I48 => {
if data.len() >= 6 {
let mut bytes = [0u8; 8];
bytes[2..8].copy_from_slice(data);
let value = i64::from_be_bytes(bytes);
let value = if (value & 0x0000800000000000) != 0 {
value | 0xFFFF000000000000u64 as i64
} else {
value
};
Ok(RefValue::Integer(value))
} else {
Err(LimboError::Corrupt("Invalid I48 data".into()))
}
}
SerialTypeKind::I64 => {
if data.len() >= 8 {
let value = i64::from_be_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
]);
Ok(RefValue::Integer(value))
} else {
Err(LimboError::Corrupt("Invalid I64 data".into()))
}
}
SerialTypeKind::F64 => {
if data.len() >= 8 {
let bits = u64::from_be_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
]);
let value = f64::from_bits(bits);
Ok(RefValue::Float(value))
} else {
Err(LimboError::Corrupt("Invalid F64 data".into()))
}
}
SerialTypeKind::Text => {
let raw_slice = RawSlice::new(data.as_ptr(), data.len());
Ok(RefValue::Text(TextRef {
value: raw_slice,
subtype: TextSubtype::Text,
}))
}
SerialTypeKind::Blob => {
// Create RawSlice pointing to the data
let raw_slice = RawSlice::new(data.as_ptr(), data.len());
Ok(RefValue::Blob(raw_slice))
}
/// Decodes every column of this record using a fresh `RecordCursor`.
///
/// This is a best-effort accessor: the return type cannot carry an error, so
/// a record that fails to decode yields an empty `Vec`. Call
/// `RecordCursor::get_values` directly when the failure must be observed.
pub fn get_values(&self) -> Vec<RefValue> {
    RecordCursor::new().get_values(self).unwrap_or_default()
}
/// Lazily parses the record header until column `target_idx` is covered or
/// the header (or payload) is exhausted.
///
/// `header_offsets` layout: slot `0` is the header size, and slots `idx + 1` /
/// `idx + 2` are the start / end byte offsets of column `idx`.
///
/// An empty payload is treated as "nothing to parse" and returns `Ok(())`.
///
/// # Errors
/// Propagates failures from `read_varint` and `SerialType::try_from`. State is
/// only committed once a header entry has decoded successfully, so
/// `serial_types`, `header_offsets` and `header_parsed_count` stay in lockstep
/// and a retry reports the same error instead of reading misaligned columns.
fn ensure_parsed_upto(&mut self, target_idx: usize) -> Result<()> {
    if self.payload.is_empty() {
        return Ok(());
    }
    if self.header_parsed_count == 0 && self.header_offsets.is_empty() {
        let (header_size, bytes_read) = read_varint(&self.payload)?;
        self.header_offset = bytes_read;
        // Slot 0: header size. Slot 1: start of column 0 (right after the header).
        self.header_offsets.push(header_size as u32);
        self.header_offsets.push(header_size as u32);
    }
    if !self.header_offsets.is_empty() {
        let header_size = self.header_offsets[0] as usize;
        while self.header_parsed_count <= target_idx
            && self.header_offset < header_size
            && self.header_offset < self.payload.len()
        {
            let (serial_type_u64, bytes_read) =
                read_varint(&self.payload[self.header_offset..])?;
            // Validate before mutating anything: pushing the serial type
            // first and then bailing out would desynchronise the vectors.
            let data_size = SerialType::try_from(serial_type_u64)?.size();
            let current_offset = self.header_offsets.last().copied().unwrap_or(0);
            self.header_offset += bytes_read;
            self.serial_types.push(serial_type_u64 as u32);
            self.header_offsets.push(current_offset + data_size as u32);
            self.header_parsed_count += 1;
        }
    }
    Ok(())
}
/// Number of columns described by the record header.
///
/// Yields `0` for an invalidated record and also when the header cannot be
/// parsed.
pub fn count(&mut self) -> usize {
    if self.is_invalidated() {
        return 0;
    }
    match self.parse_full_header() {
        Ok(()) => self.serial_types.len(),
        Err(_) => 0,
    }
}
/// Returns the value of the final column.
///
/// Yields `None` when `count()` reports zero columns; otherwise forwards the
/// result of `get_value_opt` for the last index unchanged.
pub fn last_value(&mut self) -> Option<Result<RefValue>> {
    let count = self.count();
    if count == 0 {
        return None;
    }
    // `get_value_opt` already returns the right `Option`, so there is no
    // need to unwrap it with `?` and wrap it in `Some` again.
    self.get_value_opt(count - 1)
}
/// Decodes every column of the record, in column order.
///
/// An invalidated record yields an empty vector.
///
/// # Errors
/// Fails if the header cannot be fully parsed, or on the first column whose
/// `get_value` call fails.
pub fn get_values(&mut self) -> Result<Vec<RefValue>> {
    if self.is_invalidated() {
        return Ok(Vec::new());
    }
    self.parse_full_header()?;
    let column_count = self.serial_types.len();
    // Collecting into `Result` stops at the first failing column.
    (0..column_count)
        .map(|column| self.get_value(column))
        .collect()
}
/// Decodes the value stored in column `idx`, parsing the header as far as
/// needed first.
///
/// An index with no parsed serial type yields `RefValue::Null`.
///
/// # Errors
/// Returns `LimboError::InternalError` for an invalidated record, and forwards
/// errors from `ensure_parsed_upto` and `deserialize_column`.
pub fn get_value(&mut self, idx: usize) -> Result<RefValue> {
    if self.is_invalidated() {
        return Err(LimboError::InternalError("Record not initialized".into()));
    }
    self.ensure_parsed_upto(idx)?;
    if idx < self.serial_types.len() {
        self.deserialize_column(idx)
    } else {
        Ok(RefValue::Null)
    }
}
/// Like `get_value`, but reports a missing column as `None` instead of
/// `RefValue::Null`.
///
/// Returns `None` for an invalidated record or when `idx` has no parsed serial
/// type, `Some(Err(_))` when header parsing fails, and otherwise
/// `Some(get_value(idx))`.
pub fn get_value_opt(&mut self, idx: usize) -> Option<Result<RefValue>> {
    if self.is_invalidated() {
        return None;
    }
    if let Err(e) = self.ensure_parsed_upto(idx) {
        return Some(Err(e));
    }
    if idx < self.serial_types.len() {
        Some(self.get_value(idx))
    } else {
        None
    }
}
/// Returns the number of columns in the record; forwards to `count()`.
pub fn len(&mut self) -> usize {
self.count()
}
/// Returns `true` when the record is invalidated or its payload is empty.
pub fn is_empty(&self) -> bool {
self.is_invalidated() || self.payload.is_empty()
}
/// Returns `true` when `values` holds no elements.
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
pub fn from_registers<'a>(
registers: impl IntoIterator<Item = &'a Register> + Copy,
len: usize,
) -> Self {
let mut values = Vec::with_capacity(len);
let mut serials = Vec::with_capacity(len);
pub fn from_registers(registers: &[Register]) -> Self {
let mut values = Vec::with_capacity(registers.len());
let mut serials = Vec::with_capacity(registers.len());
let mut size_header = 0;
let mut size_values = 0;
@@ -1248,21 +968,11 @@ impl ImmutableRecord {
}
writer.assert_finish_capacity();
Self {
payload: Value::Blob(buf),
header_offsets: Vec::new(),
serial_types: Vec::new(),
header_parsed_count: 0,
header_offset: 0,
}
}
/// Appends `payload` to the record's blob and discards all cached
/// header-parsing state, so the header is parsed again on next access.
pub fn start_serialization(&mut self, payload: &[u8]) {
    // Drop every piece of parse state derived from the previous bytes.
    self.header_parsed_count = 0;
    self.header_offset = 0;
    self.serial_types.clear();
    self.header_offsets.clear();

    self.payload.as_blob_mut().extend_from_slice(payload);
}
/// Currently a no-op: the body is empty.
pub fn end_serialization(&mut self) {}
@@ -1270,23 +980,14 @@ impl ImmutableRecord {
/// Currently a no-op: `value` is ignored and the record is left unchanged.
pub fn add_value(&mut self, value: RefValue) {}
pub fn invalidate(&mut self) {
self.payload.as_blob_mut().clear();
self.header_offsets.clear();
self.serial_types.clear();
self.header_parsed_count = 0;
self.header_offset = 0;
}
self.payload.as_blob_mut().clear(); }
pub fn is_invalidated(&self) -> bool {
self.payload.as_blob().is_empty() && self.header_offsets.is_empty()
self.payload.as_blob_mut.is_empty() && self.header_offsets.is_empty()
}
/// Returns the record's bytes as a slice, obtained from the payload's `as_blob()`.
pub fn get_payload(&self) -> &[u8] {
self.payload.as_blob()
}
pub fn as_blob_value(&self) -> &Value {
&self.payload
&self.payload.as_blob()
}
pub fn debug_string(&mut self) -> String {
@@ -1294,52 +995,262 @@ impl ImmutableRecord {
return "ImmutableRecord { invalidated }".to_string();
}
match self.get_values() {
Ok(values) => values
.iter()
.enumerate()
.map(|(_i, v)| match v {
RefValue::Null => "NULL".to_string(),
RefValue::Integer(i) => format!("Integer({})", i),
RefValue::Float(f) => format!("Float({})", f),
RefValue::Text(t) => format!("Text({})", t.as_str()),
RefValue::Blob(b) => format!("Blob({})", String::from_utf8_lossy(b.to_slice())),
if !self.header_offsets.is_empty() {
let header_size = self.header_offsets[0] as usize;
while self.header_offset < header_size && self.header_offset < payload.len() {
let (serial_type, bytes_read) = read_varint(&payload[self.header_offset..])?;
self.header_offset += bytes_read;
self.serial_types.push(serial_type as u32);
let serial_type_obj = SerialType::try_from(serial_type)?;
let column_data_size = serial_type_obj.size();
let current_offset = self.header_offsets.last().copied().unwrap_or(0);
self.header_offsets
.push(current_offset + column_data_size as u32);
self.header_parsed_count += 1;
}
}
Ok(())
}
/// Lazily parses the header of `record` until column `target_idx` is covered
/// or the header (or payload) is exhausted.
///
/// The cursor caches its progress, so repeated calls only parse the header
/// entries that have not been seen yet. `header_offsets` layout: slot `0` is
/// the header size, and slots `idx + 1` / `idx + 2` are the start / end byte
/// offsets of column `idx`.
///
/// An empty payload is treated as "nothing to parse" and returns `Ok(())`.
///
/// # Errors
/// Propagates failures from `read_varint` and `SerialType::try_from`. State is
/// only committed once a header entry has decoded successfully, so
/// `serial_types`, `header_offsets` and `header_parsed_count` stay in lockstep
/// and a retry reports the same error instead of reading misaligned columns.
pub fn ensure_parsed_upto(
    &mut self,
    record: &ImmutableRecord,
    target_idx: usize,
) -> Result<()> {
    let payload = record.get_payload();
    if payload.is_empty() {
        return Ok(());
    }
    if self.header_parsed_count == 0 && self.header_offsets.is_empty() {
        let (header_size, bytes_read) = read_varint(payload)?;
        self.header_offset = bytes_read;
        // Slot 0: header size. Slot 1: start of column 0 (right after the header).
        self.header_offsets.push(header_size as u32);
        self.header_offsets.push(header_size as u32);
    }
    if !self.header_offsets.is_empty() {
        let header_size = self.header_offsets[0] as usize;
        while self.header_parsed_count <= target_idx
            && self.header_offset < header_size
            && self.header_offset < payload.len()
        {
            let (serial_type, bytes_read) = read_varint(&payload[self.header_offset..])?;
            // Validate before mutating anything: pushing the serial type
            // first and then bailing out would desynchronise the vectors.
            let data_size = SerialType::try_from(serial_type)?.size();
            let current_offset = self.header_offsets.last().copied().unwrap_or(0);
            self.header_offset += bytes_read;
            self.serial_types.push(serial_type as u32);
            self.header_offsets.push(current_offset + data_size as u32);
            self.header_parsed_count += 1;
        }
    }
    Ok(())
}
/// Decodes column `idx` of `record` using the serial types and offsets this
/// cursor has parsed so far.
///
/// Returns `RefValue::Null` when the column has no parsed serial type or no
/// recorded end offset.
///
/// # Errors
/// Returns `LimboError::InternalError` when the column's byte range is
/// inverted or extends past the end of the payload, and forwards any error
/// from `deserialize_column_data`.
pub fn deserialize_column(&self, record: &ImmutableRecord, idx: usize) -> Result<RefValue> {
    if idx >= self.serial_types.len() || idx + 2 >= self.header_offsets.len() {
        return Ok(RefValue::Null);
    }
    let serial_type = self.serial_types[idx];
    let start_offset = self.header_offsets[idx + 1] as usize;
    let end_offset = self.header_offsets[idx + 2] as usize;
    let payload = record.get_payload();
    // `start_offset == payload.len()` is legal: a zero-width column at the
    // very end of the record starts and ends exactly at the payload boundary.
    // Only an inverted range or an end past the payload is out of bounds.
    if start_offset > end_offset || end_offset > payload.len() {
        return Err(LimboError::InternalError(
            "Column offset out of bounds".into(),
        ));
    }
    let column_data = &payload[start_offset..end_offset];
    self.deserialize_column_data(serial_type, column_data)
}
fn deserialize_column_data(&self, serial_type: u32, data: &[u8]) -> Result<RefValue> {
let serial_type_obj = SerialType::try_from(serial_type as u64)?;
let value = match serial_type_obj.kind() {
SerialTypeKind::Null => RefValue::Null,
SerialTypeKind::ConstInt0 => RefValue::Integer(0),
SerialTypeKind::ConstInt1 => RefValue::Integer(1),
SerialTypeKind::I8 => {
if data.is_empty() {
crate::bail_corrupt_error!("Invalid UInt8 value");
}
let val = data[0] as i8;
RefValue::Integer(val as i64)
}
SerialTypeKind::I16 => {
if data.len() != 2 {
crate::bail_corrupt_error!("Invalid UInt16 value");
}
let val = i16::from_be_bytes([data[0], data[1]]);
RefValue::Integer(val as i64)
}
SerialTypeKind::I24 => {
if data.len() != 3 {
crate::bail_corrupt_error!("Invalid BEInt24 value");
}
let sign_extension = if data[0] <= 127 { 0 } else { 255 };
let val = i32::from_be_bytes([sign_extension, data[0], data[1], data[2]]);
RefValue::Integer(val as i64)
}
SerialTypeKind::I32 => {
if data.len() != 4 {
crate::bail_corrupt_error!("Invalid BEInt32 value");
}
let val = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
RefValue::Integer(val as i64)
}
SerialTypeKind::I48 => {
if data.len() != 6 {
crate::bail_corrupt_error!("Invalid BEInt48 value");
}
let sign_extension = if data[0] <= 127 { 0 } else { 255 };
let val = i64::from_be_bytes([
sign_extension,
sign_extension,
data[0],
data[1],
data[2],
data[3],
data[4],
data[5],
]);
RefValue::Integer(val)
}
SerialTypeKind::I64 => {
if data.len() < 8 {
crate::bail_corrupt_error!("Invalid BEInt64 value");
}
RefValue::Integer(i64::from_be_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
]))
}
SerialTypeKind::F64 => {
if data.len() < 8 {
crate::bail_corrupt_error!("Invalid BEFloat64 value");
}
RefValue::Float(f64::from_be_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
]))
}
SerialTypeKind::Blob => {
let content_size = serial_type_obj.size();
if data.len() < content_size {
crate::bail_corrupt_error!("Invalid Blob value");
}
if content_size == 0 {
RefValue::Blob(RawSlice::new(std::ptr::null(), 0))
} else {
let ptr = &data[0] as *const u8;
let slice = RawSlice::new(ptr, content_size);
RefValue::Blob(slice)
}
}
SerialTypeKind::Text => {
let content_size = serial_type_obj.size();
if data.len() < content_size {
crate::bail_corrupt_error!(
"Invalid String value, length {} < expected length {}",
data.len(),
content_size
);
}
let slice = if content_size == 0 {
RawSlice::new(std::ptr::null(), 0)
} else {
let ptr = &data[0] as *const u8;
RawSlice::new(ptr, content_size)
};
RefValue::Text(TextRef {
value: slice,
subtype: TextSubtype::Text,
})
.collect::<Vec<_>>()
.join(", "),
Err(e) => format!("ImmutableRecord {{ error: {} }}", e),
}
};
Ok(value)
}
/// Decodes the value stored in column `idx` of `record`, parsing the header
/// as far as needed first.
///
/// An index with no parsed serial type yields `RefValue::Null`.
///
/// # Errors
/// Returns `LimboError::InternalError` for an invalidated record, and forwards
/// errors from `ensure_parsed_upto` and `deserialize_column`.
pub fn get_value(&mut self, record: &ImmutableRecord, idx: usize) -> Result<RefValue> {
    if record.is_invalidated() {
        return Err(LimboError::InternalError("Record not initialized".into()));
    }
    self.ensure_parsed_upto(record, idx)?;
    if idx < self.serial_types.len() {
        self.deserialize_column(record, idx)
    } else {
        Ok(RefValue::Null)
    }
}
/// Like `get_value`, but reports a missing column as `None` instead of
/// `RefValue::Null`.
///
/// Returns `None` for an invalidated record or when `idx` has no parsed serial
/// type, `Some(Err(_))` when header parsing fails, and otherwise
/// `Some(get_value(record, idx))`.
pub fn get_value_opt(
    &mut self,
    record: &ImmutableRecord,
    idx: usize,
) -> Option<Result<RefValue>> {
    if record.is_invalidated() {
        return None;
    }
    if let Err(e) = self.ensure_parsed_upto(record, idx) {
        return Some(Err(e));
    }
    if idx < self.serial_types.len() {
        Some(self.get_value(record, idx))
    } else {
        None
    }
}
}
// `Display::fmt` only receives `&self`, but `get_values()` requires `&mut self`,
// so this impl cannot be used as written. For now we use `debug_string()` instead.
// impl Display for ImmutableRecord {
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// let values = match self.get_values() {
// Ok(values) => values,
// Err(e) => {
// eprintln!("Failed to get values for display: {}", e); // or use a logger
// return Err(std::fmt::Error);
// }
// };
// for value in values {
// match value {
// RefValue::Null => write!(f, "NULL")?,
// RefValue::Integer(i) => write!(f, "Integer({})", i)?,
// RefValue::Float(flo) => write!(f, "Float({})", flo)?,
// RefValue::Text(text_ref) => write!(f, "Text({})", text_ref.as_str())?,
// RefValue::Blob(raw_slice) => {
// write!(f, "Blob({})", String::from_utf8_lossy(raw_slice.to_slice()))?
// }
// }
// if value != *values.last().unwrap() {
// write!(f, ", ")?;
// }
// }
// Ok(())
// }
// }
/// Number of columns described by the header of `record`.
///
/// Yields `0` for an invalidated record and also when the header cannot be
/// parsed.
pub fn count(&mut self, record: &ImmutableRecord) -> usize {
    if record.is_invalidated() {
        return 0;
    }
    match self.parse_full_header(record) {
        Ok(()) => self.serial_types.len(),
        Err(_) => 0,
    }
}
/// Decodes every column of `record`, in column order.
///
/// An invalidated record yields an empty vector.
///
/// # Errors
/// Fails if the header cannot be fully parsed, or on the first column whose
/// `get_value` call fails.
pub fn get_values(&mut self, record: &ImmutableRecord) -> Result<Vec<RefValue>> {
    if record.is_invalidated() {
        return Ok(Vec::new());
    }
    self.parse_full_header(record)?;
    let column_count = self.serial_types.len();
    // Collecting into `Result` stops at the first failing column.
    (0..column_count)
        .map(|column| self.get_value(record, column))
        .collect()
}
/// Returns the number of columns in `record`; forwards to `count(record)`.
pub fn len(&mut self, record: &ImmutableRecord) -> usize {
self.count(record)
}
}
impl RefValue {
pub fn to_ffi(&self) -> ExtValue {
@@ -1988,6 +1899,8 @@ mod tests {
assert_eq!(
buf.len(),
header_length // 9 bytes (header size + 8 serial types)
+ 0 // ConstInt0: 0 bytes
+ 0 // ConstInt1: 0 bytes
+ size_of::<i8>() // I8: 1 byte
+ size_of::<i16>() // I16: 2 bytes
+ (size_of::<i32>() - 1) // I24: 3 bytes