diff --git a/core/types.rs b/core/types.rs index 5ad89fb08..e31dd548f 100644 --- a/core/types.rs +++ b/core/types.rs @@ -761,10 +761,6 @@ pub struct ImmutableRecord { // // payload is the Vec but in order to use Register which holds ImmutableRecord as a Value - we store Vec as Value::Blob payload: Value, - header_offsets: Vec, - serial_types: Vec, - header_parsed_count: usize, - header_offset: usize, } // impl Debug for ImmutableRecord { @@ -850,299 +846,23 @@ impl<'a> AppendWriter<'a> { } impl ImmutableRecord { - pub fn new(payload_capacity: usize, value_capacity: usize) -> Self { + pub fn new(payload_capacity: usize) -> Self { Self { - payload: Value::Blob(Vec::with_capacity(payload_capacity)), - header_offsets: Vec::with_capacity(value_capacity + 1), - serial_types: Vec::with_capacity(value_capacity), - header_parsed_count: 0, - header_offset: 0, + payload: Vec::with_capacity(payload_capacity), } } - pub fn get>(&mut self, idx: usize) -> Result { - let value = self.get_value(idx)?; - T::try_from(value) - } - - fn parse_full_header(&mut self) -> Result<()> { - if self.payload.is_empty() { - return Ok(()); - } - - if self.header_parsed_count == 0 && self.header_offsets.is_empty() { - let (header_size, bytes_read) = read_varint(&self.payload)?; - self.header_offset = bytes_read; - self.header_offsets.push(header_size as u32); - } - - if !self.header_offsets.is_empty() { - let header_size = self.header_offsets[0] as usize; - - while self.header_offset < header_size && self.header_offset < self.payload.len() { - let (serial_type_u64, bytes_read) = - read_varint(&self.payload[self.header_offset..])?; - let serial_type = serial_type_u64 as u32; - self.header_offset += bytes_read; - self.serial_types.push(serial_type); - - let serial_type_obj = SerialType::try_from(serial_type_u64)?; - let column_data_size = serial_type_obj.size(); - - let current_offset = self.header_offsets.last().copied().unwrap_or(0); - self.header_offsets - .push(current_offset + column_data_size as u32); - self.header_parsed_count += 1; - } - } - - Ok(()) - } - - fn deserialize_column(&self, idx: usize) -> Result { - if idx >= self.serial_types.len() || idx + 2 >= self.header_offsets.len() { - return Ok(RefValue::Null); - } - - let serial_type = self.serial_types[idx]; - let start_offset = self.header_offsets[idx + 1] as usize; - let end_offset = self.header_offsets[idx + 2] as usize; - - if start_offset > self.payload.len() || end_offset > self.payload.len() { - return Err(LimboError::Corrupt("Column data out of bounds".into())); - } - - let column_data = &self.payload[start_offset..end_offset]; - self.deserialize_column_data(serial_type, column_data) - } - - fn deserialize_column_data(&self, serial_type: u32, data: &[u8]) -> Result { - let serial_type_obj = SerialType::try_from(serial_type as u64)?; - - match serial_type_obj.kind() { - SerialTypeKind::Null => Ok(RefValue::Null), - SerialTypeKind::ConstInt0 => Ok(RefValue::Integer(0)), - SerialTypeKind::ConstInt1 => Ok(RefValue::Integer(1)), - SerialTypeKind::I8 => { - if data.len() >= 1 { - Ok(RefValue::Integer(data[0] as i8 as i64)) - } else { - Err(LimboError::Corrupt("Invalid I8 data".into())) - } - } - SerialTypeKind::I16 => { - if data.len() >= 2 { - let value = i16::from_be_bytes([data[0], data[1]]) as i64; - Ok(RefValue::Integer(value)) - } else { - Err(LimboError::Corrupt("Invalid I16 data".into())) - } - } - SerialTypeKind::I24 => { - if data.len() >= 3 { - // Sign extend the 24-bit value - let mut bytes = [0u8; 4]; - bytes[1..4].copy_from_slice(data); - let value = i32::from_be_bytes(bytes); - // Sign extend from 24-bit to 32-bit - let value = if (value & 0x00800000) != 0 { - value | 0xFF000000u32 as i32 - } else { - value - }; - Ok(RefValue::Integer(value as i64)) - } else { - Err(LimboError::Corrupt("Invalid I24 data".into())) - } - } - SerialTypeKind::I32 => { - if data.len() >= 4 { - let value = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as i64; - Ok(RefValue::Integer(value)) - } else { - Err(LimboError::Corrupt("Invalid I32 data".into())) - } - } - SerialTypeKind::I48 => { - if data.len() >= 6 { - let mut bytes = [0u8; 8]; - bytes[2..8].copy_from_slice(data); - let value = i64::from_be_bytes(bytes); - let value = if (value & 0x0000800000000000) != 0 { - value | 0xFFFF000000000000u64 as i64 - } else { - value - }; - Ok(RefValue::Integer(value)) - } else { - Err(LimboError::Corrupt("Invalid I48 data".into())) - } - } - SerialTypeKind::I64 => { - if data.len() >= 8 { - let value = i64::from_be_bytes([ - data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], - ]); - Ok(RefValue::Integer(value)) - } else { - Err(LimboError::Corrupt("Invalid I64 data".into())) - } - } - SerialTypeKind::F64 => { - if data.len() >= 8 { - let bits = u64::from_be_bytes([ - data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], - ]); - let value = f64::from_bits(bits); - Ok(RefValue::Float(value)) - } else { - Err(LimboError::Corrupt("Invalid F64 data".into())) - } - } - SerialTypeKind::Text => { - let raw_slice = RawSlice::new(data.as_ptr(), data.len()); - Ok(RefValue::Text(TextRef { - value: raw_slice, - subtype: TextSubtype::Text, - })) - } - SerialTypeKind::Blob => { - // Create RawSlice pointing to the data - let raw_slice = RawSlice::new(data.as_ptr(), data.len()); - Ok(RefValue::Blob(raw_slice)) - } + pub fn get_values(&self) -> Vec { + let mut cursor = RecordCursor::new(); + match cursor.get_values(self) { + Ok(values) => values, + Err(err) => Vec::new(), } } - fn ensure_parsed_upto(&mut self, target_idx: usize) -> Result<()> { - if self.payload.is_empty() { - return Ok(()); - } - - if self.header_parsed_count == 0 && self.header_offsets.is_empty() { - let (header_size, bytes_read) = read_varint(&self.payload)?; - self.header_offset = bytes_read; - self.header_offsets.push(header_size as u32); - - self.header_offsets.push(header_size as u32); - } - - if !self.header_offsets.is_empty() { - let header_size = self.header_offsets[0] as usize; - - while self.header_parsed_count <= target_idx - && self.header_offset < header_size - && self.header_offset < self.payload.len() - { - let (serial_type_u64, bytes_read) = - match read_varint(&self.payload[self.header_offset..]) { - Ok(result) => result, - Err(_) => break, - }; - let serial_type = serial_type_u64 as u32; - self.header_offset += bytes_read; - - self.serial_types.push(serial_type); - - let serial_type_obj = match SerialType::try_from(serial_type_u64) { - Ok(st) => st, - Err(_) => break, - }; - let data_size = serial_type_obj.size(); - - let current_offset = self.header_offsets.last().copied().unwrap_or(0); - self.header_offsets.push(current_offset + data_size as u32); - - self.header_parsed_count += 1; - } - } - - Ok(()) - } - - pub fn count(&mut self) -> usize { - if self.is_invalidated() { - return 0; - } - - if let Ok(()) = self.parse_full_header() { - self.serial_types.len() - } else { - 0 - } - } - - pub fn last_value(&mut self) -> Option> { - let count = self.count(); - if count > 0 { - Some(self.get_value_opt(count - 1)?) - } else { - None - } - } - - pub fn get_values(&mut self) -> Result> { - if self.is_invalidated() { - return Ok(Vec::new()); - } - - self.parse_full_header()?; - - let mut values = Vec::with_capacity(self.serial_types.len()); - for i in 0..self.serial_types.len() { - values.push(self.get_value(i)?); - } - Ok(values) - } - - pub fn get_value(&mut self, idx: usize) -> Result { - if self.is_invalidated() { - return Err(LimboError::InternalError("Record not initialized".into())); - } - - self.ensure_parsed_upto(idx)?; - - if idx >= self.serial_types.len() { - return Ok(RefValue::Null); - } - - self.deserialize_column(idx) - } - - pub fn get_value_opt(&mut self, idx: usize) -> Option> { - if self.is_invalidated() { - return None; - } - - match self.ensure_parsed_upto(idx) { - Ok(()) => { - if idx >= self.serial_types.len() { - return None; - } - Some(self.get_value(idx)) - } - Err(e) => Some(Err(e)), - } - } - - pub fn len(&mut self) -> usize { - self.count() - } - - pub fn is_empty(&self) -> bool { - self.is_invalidated() || self.payload.is_empty() - } - - pub fn is_empty(&self) -> bool { - self.values.is_empty() - } - - pub fn from_registers<'a>( - registers: impl IntoIterator + Copy, - len: usize, - ) -> Self { - let mut values = Vec::with_capacity(len); - let mut serials = Vec::with_capacity(len); + pub fn from_registers(registers: &[Register]) -> Self { + let mut values = Vec::with_capacity(registers.len()); + let mut serials = Vec::with_capacity(registers.len()); let mut size_header = 0; let mut size_values = 0; @@ -1248,21 +968,11 @@ impl ImmutableRecord { } writer.assert_finish_capacity(); - Self { payload: Value::Blob(buf), - header_offsets: Vec::new(), - serial_types: Vec::new(), - header_parsed_count: 0, - header_offset: 0, - } } pub fn start_serialization(&mut self, payload: &[u8]) { self.payload.as_blob_mut().extend_from_slice(payload); - self.header_offsets.clear(); - self.serial_types.clear(); - self.header_parsed_count = 0; - self.header_offset = 0; } pub fn end_serialization(&mut self) {} @@ -1270,23 +980,14 @@ impl ImmutableRecord { pub fn add_value(&mut self, value: RefValue) {} pub fn invalidate(&mut self) { - self.payload.as_blob_mut().clear(); - self.header_offsets.clear(); - self.serial_types.clear(); - self.header_parsed_count = 0; - self.header_offset = 0; - } + self.payload.as_blob_mut().clear(); } pub fn is_invalidated(&self) -> bool { - self.payload.as_blob().is_empty() && self.header_offsets.is_empty() + self.payload.as_blob_mut.is_empty() && self.header_offsets.is_empty() } pub fn get_payload(&self) -> &[u8] { - self.payload.as_blob() - } - - pub fn as_blob_value(&self) -> &Value { - &self.payload + &self.payload.as_blob() } pub fn debug_string(&mut self) -> String { @@ -1294,52 +995,262 @@ impl ImmutableRecord { return "ImmutableRecord { invalidated }".to_string(); } - match self.get_values() { - Ok(values) => values - .iter() - .enumerate() - .map(|(_i, v)| match v { - RefValue::Null => "NULL".to_string(), - RefValue::Integer(i) => format!("Integer({})", i), - RefValue::Float(f) => format!("Float({})", f), - RefValue::Text(t) => format!("Text({})", t.as_str()), - RefValue::Blob(b) => format!("Blob({})", String::from_utf8_lossy(b.to_slice())), + if !self.header_offsets.is_empty() { + let header_size = self.header_offsets[0] as usize; + + while self.header_offset < header_size && self.header_offset < payload.len() { + let (serial_type, bytes_read) = read_varint(&payload[self.header_offset..])?; + self.header_offset += bytes_read; + self.serial_types.push(serial_type as u32); + + let serial_type_obj = SerialType::try_from(serial_type)?; + let column_data_size = serial_type_obj.size(); + + let current_offset = self.header_offsets.last().copied().unwrap_or(0); + self.header_offsets + .push(current_offset + column_data_size as u32); + self.header_parsed_count += 1; + } + } + + Ok(()) + } + + pub fn ensure_parsed_upto( + &mut self, + record: &ImmutableRecord, + target_idx: usize, + ) -> Result<()> { + let payload = record.get_payload(); + if payload.is_empty() { + return Ok(()); + } + + if self.header_parsed_count == 0 && self.header_offsets.is_empty() { + let (header_size, bytes_read) = read_varint(payload)?; + self.header_offset = bytes_read; + self.header_offsets.push(header_size as u32); + self.header_offsets.push(header_size as u32); + } + + if !self.header_offsets.is_empty() { + let header_size = self.header_offsets[0] as usize; + + while self.header_parsed_count <= target_idx + && self.header_offset < header_size + && self.header_offset < payload.len() + { + let (serial_type, bytes_read) = match read_varint(&payload[self.header_offset..]) { + Ok(result) => result, + Err(_) => break, + }; + + self.header_offset += bytes_read; + self.serial_types.push(serial_type as u32); + + let serial_type_obj = match SerialType::try_from(serial_type) { + Ok(st) => st, + Err(_) => break, + }; + let data_size = serial_type_obj.size(); + let current_offset = self.header_offsets.last().copied().unwrap_or(0); + self.header_offsets.push(current_offset + data_size as u32); + + self.header_parsed_count += 1; + } + } + Ok(()) + } + + pub fn deserialize_column(&self, record: &ImmutableRecord, idx: usize) -> Result { + if idx >= self.serial_types.len() || idx + 2 >= self.header_offsets.len() { + return Ok(RefValue::Null); + } + + let serial_type = self.serial_types[idx]; + let start_offset = self.header_offsets[idx + 1] as usize; + let end_offset = self.header_offsets[idx + 2] as usize; + let payload = record.get_payload(); + + if start_offset >= payload.len() || end_offset > payload.len() { + return Err(LimboError::InternalError( + "Column offset out of bounds".into(), + )); + } + + let column_data = &payload[start_offset..end_offset]; + self.deserialize_column_data(serial_type, column_data) + } + + fn deserialize_column_data(&self, serial_type: u32, data: &[u8]) -> Result { + let serial_type_obj = SerialType::try_from(serial_type as u64)?; + let value = match serial_type_obj.kind() { + SerialTypeKind::Null => RefValue::Null, + SerialTypeKind::ConstInt0 => RefValue::Integer(0), + SerialTypeKind::ConstInt1 => RefValue::Integer(1), + SerialTypeKind::I8 => { + if data.is_empty() { + crate::bail_corrupt_error!("Invalid UInt8 value"); + } + let val = data[0] as i8; + RefValue::Integer(val as i64) + } + SerialTypeKind::I16 => { + if data.len() != 2 { + crate::bail_corrupt_error!("Invalid UInt16 value"); + } + let val = i16::from_be_bytes([data[0], data[1]]); + RefValue::Integer(val as i64) + } + SerialTypeKind::I24 => { + if data.len() != 3 { + crate::bail_corrupt_error!("Invalid BEInt24 value"); + } + let sign_extension = if data[0] <= 127 { 0 } else { 255 }; + let val = i32::from_be_bytes([sign_extension, data[0], data[1], data[2]]); + RefValue::Integer(val as i64) + } + SerialTypeKind::I32 => { + if data.len() != 4 { + crate::bail_corrupt_error!("Invalid BEInt32 value"); + } + let val = i32::from_be_bytes([data[0], data[1], data[2], data[3]]); + RefValue::Integer(val as i64) + } + SerialTypeKind::I48 => { + if data.len() != 6 { + crate::bail_corrupt_error!("Invalid BEInt48 value"); + } + let sign_extension = if data[0] <= 127 { 0 } else { 255 }; + let val = i64::from_be_bytes([ + sign_extension, + sign_extension, + data[0], + data[1], + data[2], + data[3], + data[4], + data[5], + ]); + RefValue::Integer(val) + } + SerialTypeKind::I64 => { + if data.len() < 8 { + crate::bail_corrupt_error!("Invalid BEInt64 value"); + } + RefValue::Integer(i64::from_be_bytes([ + data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], + ])) + } + SerialTypeKind::F64 => { + if data.len() < 8 { + crate::bail_corrupt_error!("Invalid BEFloat64 value"); + } + RefValue::Float(f64::from_be_bytes([ + data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], + ])) + } + SerialTypeKind::Blob => { + let content_size = serial_type_obj.size(); + if data.len() < content_size { + crate::bail_corrupt_error!("Invalid Blob value"); + } + if content_size == 0 { + RefValue::Blob(RawSlice::new(std::ptr::null(), 0)) + } else { + let ptr = &data[0] as *const u8; + let slice = RawSlice::new(ptr, content_size); + RefValue::Blob(slice) + } + } + SerialTypeKind::Text => { + let content_size = serial_type_obj.size(); + if data.len() < content_size { + crate::bail_corrupt_error!( + "Invalid String value, length {} < expected length {}", + data.len(), + content_size + ); + } + let slice = if content_size == 0 { + RawSlice::new(std::ptr::null(), 0) + } else { + let ptr = &data[0] as *const u8; + RawSlice::new(ptr, content_size) + }; + RefValue::Text(TextRef { + value: slice, + subtype: TextSubtype::Text, }) - .collect::>() - .join(", "), - Err(e) => format!("ImmutableRecord {{ error: {} }}", e), + } + }; + Ok(value) + } + + pub fn get_value(&mut self, record: &ImmutableRecord, idx: usize) -> Result { + if record.is_invalidated() { + return Err(LimboError::InternalError("Record not initialized".into())); + } + + self.ensure_parsed_upto(record, idx)?; + + if idx >= self.serial_types.len() { + return Ok(RefValue::Null); + } + + self.deserialize_column(record, idx) + } + + pub fn get_value_opt( + &mut self, + record: &ImmutableRecord, + idx: usize, + ) -> Option> { + if record.is_invalidated() { + return None; + } + + match self.ensure_parsed_upto(record, idx) { + Ok(()) => { + if idx >= self.serial_types.len() { + return None; + } + Some(self.get_value(record, idx)) + } + Err(e) => Some(Err(e)), } } -} -// &mut requirement for ImmutableRecord is problematic for this. -// for now we use debug_string() -// impl Display for ImmutableRecord { -// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -// let values = match self.get_values() { -// Ok(values) => values, -// Err(e) => { -// eprintln!("Failed to get values for display: {}", e); // or use a logger -// return Err(std::fmt::Error); -// } -// }; -// for value in values { -// match value { -// RefValue::Null => write!(f, "NULL")?, -// RefValue::Integer(i) => write!(f, "Integer({})", i)?, -// RefValue::Float(flo) => write!(f, "Float({})", flo)?, -// RefValue::Text(text_ref) => write!(f, "Text({})", text_ref.as_str())?, -// RefValue::Blob(raw_slice) => { -// write!(f, "Blob({})", String::from_utf8_lossy(raw_slice.to_slice()))? -// } -// } -// if value != *values.last().unwrap() { -// write!(f, ", ")?; -// } -// } -// Ok(()) -// } -// } + pub fn count(&mut self, record: &ImmutableRecord) -> usize { + if record.is_invalidated() { + return 0; + } + + if let Ok(()) = self.parse_full_header(record) { + self.serial_types.len() + } else { + 0 + } + } + + pub fn get_values(&mut self, record: &ImmutableRecord) -> Result> { + if record.is_invalidated() { + return Ok(Vec::new()); + } + + self.parse_full_header(record)?; + + let mut values = Vec::with_capacity(self.serial_types.len()); + for i in 0..self.serial_types.len() { + values.push(self.get_value(record, i)?); + } + Ok(values) + } + + pub fn len(&mut self, record: &ImmutableRecord) -> usize { + self.count(record) + } +} impl RefValue { pub fn to_ffi(&self) -> ExtValue { @@ -1988,6 +1899,8 @@ mod tests { assert_eq!( buf.len(), header_length // 9 bytes (header size + 8 serial types) + + 0 // ConstInt0: 0 bytes + + 0 // ConstInt1: 0 bytes + size_of::() // I8: 1 byte + size_of::() // I16: 2 bytes + (size_of::() - 1) // I24: 3 bytes