diff --git a/core/types.rs b/core/types.rs index a66dba430..5ad89fb08 100644 --- a/core/types.rs +++ b/core/types.rs @@ -8,7 +8,7 @@ use crate::ext::{ExtValue, ExtValueType}; use crate::pseudo::PseudoCursor; use crate::schema::Index; use crate::storage::btree::BTreeCursor; -use crate::storage::sqlite3_ondisk::write_varint; +use crate::storage::sqlite3_ondisk::{read_varint, write_varint}; use crate::translate::collate::CollationSeq; use crate::translate::plan::IterationDirection; use crate::vdbe::sorter::Sorter; @@ -753,7 +753,7 @@ impl<'a> TryFrom<&'a RefValue> for &'a str { /// A value in a record that has already been serialized can stay serialized and what this struct offsers /// is easy acces to each value which point to the payload. /// The name might be contradictory as it is immutable in the sense that you cannot modify the values without modifying the payload. -#[derive(Eq, Ord, PartialEq, PartialOrd)] +#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)] pub struct ImmutableRecord { // We have to be super careful with this buffer since we make values point to the payload we need to take care reallocations // happen in a controlled manner. If we realocate with values that should be correct, they will now point to undefined data. @@ -761,18 +761,20 @@ pub struct ImmutableRecord { // // payload is the Vec but in order to use Register which holds ImmutableRecord as a Value - we store Vec as Value::Blob payload: Value, - pub values: Vec, - recreating: bool, + header_offsets: Vec, + serial_types: Vec, + header_parsed_count: usize, + header_offset: usize, } -impl Debug for ImmutableRecord { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ImmutableRecord") - .field("values", &self.values) - .field("recreating", &self.recreating) - .finish() - } -} +// impl Debug for ImmutableRecord { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// f.debug_struct("ImmutableRecord") +// .field("values", &self.values) +// .field("recreating", &self.recreating) +// .finish() +// } +// } #[derive(PartialEq)] pub enum ParseRecordState { @@ -851,44 +853,284 @@ impl ImmutableRecord { pub fn new(payload_capacity: usize, value_capacity: usize) -> Self { Self { payload: Value::Blob(Vec::with_capacity(payload_capacity)), - values: Vec::with_capacity(value_capacity), - recreating: false, + header_offsets: Vec::with_capacity(value_capacity + 1), + serial_types: Vec::with_capacity(value_capacity), + header_parsed_count: 0, + header_offset: 0, } } - pub fn get<'a, T: TryFrom<&'a RefValue, Error = LimboError> + 'a>( - &'a self, - idx: usize, - ) -> Result { - let value = self - .values - .get(idx) - .ok_or(LimboError::InternalError("Index out of bounds".into()))?; + pub fn get>(&mut self, idx: usize) -> Result { + let value = self.get_value(idx)?; T::try_from(value) } - pub fn count(&self) -> usize { - self.values.len() + fn parse_full_header(&mut self) -> Result<()> { + if self.payload.is_empty() { + return Ok(()); + } + + if self.header_parsed_count == 0 && self.header_offsets.is_empty() { + let (header_size, bytes_read) = read_varint(&self.payload)?; + self.header_offset = bytes_read; + self.header_offsets.push(header_size as u32); + } + + if !self.header_offsets.is_empty() { + let header_size = self.header_offsets[0] as usize; + + while self.header_offset < header_size && self.header_offset < self.payload.len() { + let (serial_type_u64, bytes_read) = + read_varint(&self.payload[self.header_offset..])?; + let serial_type = serial_type_u64 as u32; + self.header_offset += bytes_read; + self.serial_types.push(serial_type); + + let serial_type_obj = SerialType::try_from(serial_type_u64)?; + let column_data_size = serial_type_obj.size(); + + let current_offset = self.header_offsets.last().copied().unwrap_or(0); + self.header_offsets + .push(current_offset + column_data_size as u32); + self.header_parsed_count += 1; + } + } + + Ok(()) } - pub fn last_value(&self) -> Option<&RefValue> { - self.values.last() + fn deserialize_column(&self, idx: usize) -> Result { + if idx >= self.serial_types.len() || idx + 2 >= self.header_offsets.len() { + return Ok(RefValue::Null); + } + + let serial_type = self.serial_types[idx]; + let start_offset = self.header_offsets[idx + 1] as usize; + let end_offset = self.header_offsets[idx + 2] as usize; + + if start_offset > self.payload.len() || end_offset > self.payload.len() { + return Err(LimboError::Corrupt("Column data out of bounds".into())); + } + + let column_data = &self.payload[start_offset..end_offset]; + self.deserialize_column_data(serial_type, column_data) } - pub fn get_values(&self) -> &Vec { - &self.values + fn deserialize_column_data(&self, serial_type: u32, data: &[u8]) -> Result { + let serial_type_obj = SerialType::try_from(serial_type as u64)?; + + match serial_type_obj.kind() { + SerialTypeKind::Null => Ok(RefValue::Null), + SerialTypeKind::ConstInt0 => Ok(RefValue::Integer(0)), + SerialTypeKind::ConstInt1 => Ok(RefValue::Integer(1)), + SerialTypeKind::I8 => { + if data.len() >= 1 { + Ok(RefValue::Integer(data[0] as i8 as i64)) + } else { + Err(LimboError::Corrupt("Invalid I8 data".into())) + } + } + SerialTypeKind::I16 => { + if data.len() >= 2 { + let value = i16::from_be_bytes([data[0], data[1]]) as i64; + Ok(RefValue::Integer(value)) + } else { + Err(LimboError::Corrupt("Invalid I16 data".into())) + } + } + SerialTypeKind::I24 => { + if data.len() >= 3 { + // Sign extend the 24-bit value + let mut bytes = [0u8; 4]; + bytes[1..4].copy_from_slice(data); + let value = i32::from_be_bytes(bytes); + // Sign extend from 24-bit to 32-bit + let value = if (value & 0x00800000) != 0 { + value | 0xFF000000u32 as i32 + } else { + value + }; + Ok(RefValue::Integer(value as i64)) + } else { + Err(LimboError::Corrupt("Invalid I24 data".into())) + } + } + SerialTypeKind::I32 => { + if data.len() >= 4 { + let value = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as i64; + Ok(RefValue::Integer(value)) + } else { + Err(LimboError::Corrupt("Invalid I32 data".into())) + } + } + SerialTypeKind::I48 => { + if data.len() >= 6 { + let mut bytes = [0u8; 8]; + bytes[2..8].copy_from_slice(data); + let value = i64::from_be_bytes(bytes); + let value = if (value & 0x0000800000000000) != 0 { + value | 0xFFFF000000000000u64 as i64 + } else { + value + }; + Ok(RefValue::Integer(value)) + } else { + Err(LimboError::Corrupt("Invalid I48 data".into())) + } + } + SerialTypeKind::I64 => { + if data.len() >= 8 { + let value = i64::from_be_bytes([ + data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], + ]); + Ok(RefValue::Integer(value)) + } else { + Err(LimboError::Corrupt("Invalid I64 data".into())) + } + } + SerialTypeKind::F64 => { + if data.len() >= 8 { + let bits = u64::from_be_bytes([ + data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], + ]); + let value = f64::from_bits(bits); + Ok(RefValue::Float(value)) + } else { + Err(LimboError::Corrupt("Invalid F64 data".into())) + } + } + SerialTypeKind::Text => { + let raw_slice = RawSlice::new(data.as_ptr(), data.len()); + Ok(RefValue::Text(TextRef { + value: raw_slice, + subtype: TextSubtype::Text, + })) + } + SerialTypeKind::Blob => { + // Create RawSlice pointing to the data + let raw_slice = RawSlice::new(data.as_ptr(), data.len()); + Ok(RefValue::Blob(raw_slice)) + } + } } - pub fn get_value(&self, idx: usize) -> &RefValue { - &self.values[idx] + fn ensure_parsed_upto(&mut self, target_idx: usize) -> Result<()> { + if self.payload.is_empty() { + return Ok(()); + } + + if self.header_parsed_count == 0 && self.header_offsets.is_empty() { + let (header_size, bytes_read) = read_varint(&self.payload)?; + self.header_offset = bytes_read; + self.header_offsets.push(header_size as u32); + + self.header_offsets.push(header_size as u32); + } + + if !self.header_offsets.is_empty() { + let header_size = self.header_offsets[0] as usize; + + while self.header_parsed_count <= target_idx + && self.header_offset < header_size + && self.header_offset < self.payload.len() + { + let (serial_type_u64, bytes_read) = + match read_varint(&self.payload[self.header_offset..]) { + Ok(result) => result, + Err(_) => break, + }; + let serial_type = serial_type_u64 as u32; + self.header_offset += bytes_read; + + self.serial_types.push(serial_type); + + let serial_type_obj = match SerialType::try_from(serial_type_u64) { + Ok(st) => st, + Err(_) => break, + }; + let data_size = serial_type_obj.size(); + + let current_offset = self.header_offsets.last().copied().unwrap_or(0); + self.header_offsets.push(current_offset + data_size as u32); + + self.header_parsed_count += 1; + } + } + + Ok(()) } - pub fn get_value_opt(&self, idx: usize) -> Option<&RefValue> { - self.values.get(idx) + pub fn count(&mut self) -> usize { + if self.is_invalidated() { + return 0; + } + + if let Ok(()) = self.parse_full_header() { + self.serial_types.len() + } else { + 0 + } } - pub fn len(&self) -> usize { - self.values.len() + pub fn last_value(&mut self) -> Option> { + let count = self.count(); + if count > 0 { + Some(self.get_value_opt(count - 1)?) + } else { + None + } + } + + pub fn get_values(&mut self) -> Result> { + if self.is_invalidated() { + return Ok(Vec::new()); + } + + self.parse_full_header()?; + + let mut values = Vec::with_capacity(self.serial_types.len()); + for i in 0..self.serial_types.len() { + values.push(self.get_value(i)?); + } + Ok(values) + } + + pub fn get_value(&mut self, idx: usize) -> Result { + if self.is_invalidated() { + return Err(LimboError::InternalError("Record not initialized".into())); + } + + self.ensure_parsed_upto(idx)?; + + if idx >= self.serial_types.len() { + return Ok(RefValue::Null); + } + + self.deserialize_column(idx) + } + + pub fn get_value_opt(&mut self, idx: usize) -> Option> { + if self.is_invalidated() { + return None; + } + + match self.ensure_parsed_upto(idx) { + Ok(()) => { + if idx >= self.serial_types.len() { + return None; + } + Some(self.get_value(idx)) + } + Err(e) => Some(Err(e)), + } + } + + pub fn len(&mut self) -> usize { + self.count() + } + + pub fn is_empty(&self) -> bool { + self.is_invalidated() || self.payload.is_empty() } pub fn is_empty(&self) -> bool { @@ -917,9 +1159,11 @@ impl ImmutableRecord { size_header += n; size_values += value_size; } + let mut header_size = size_header; const MIN_HEADER_SIZE: usize = 126; if header_size <= MIN_HEADER_SIZE { + assert!(header_size <= 126); // common case // This case means the header size can be contained by a single byte, therefore // header_size == size of serial types + 1 byte from the header size @@ -927,16 +1171,22 @@ impl ImmutableRecord { // header size here will be 126 == (2^7 - 1) header_size += 1; } else { - todo!("calculate big header size extra bytes"); - // get header varint len - // header_size += n; - // if( nVarint bool { - self.payload.as_blob().is_empty() + self.payload.as_blob().is_empty() && self.header_offsets.is_empty() } pub fn get_payload(&self) -> &[u8] { @@ -1035,65 +1288,58 @@ impl ImmutableRecord { pub fn as_blob_value(&self) -> &Value { &self.payload } -} -impl Display for ImmutableRecord { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for value in &self.values { - match value { - RefValue::Null => write!(f, "NULL")?, - RefValue::Integer(i) => write!(f, "Integer({})", *i)?, - RefValue::Float(flo) => write!(f, "Float({})", *flo)?, - RefValue::Text(text_ref) => write!(f, "Text({})", text_ref.as_str())?, - RefValue::Blob(raw_slice) => { - write!(f, "Blob({})", String::from_utf8_lossy(raw_slice.to_slice()))? - } - } - if value != self.values.last().unwrap() { - write!(f, ", ")?; - } + pub fn debug_string(&mut self) -> String { + if self.is_invalidated() { + return "ImmutableRecord { invalidated }".to_string(); + } + + match self.get_values() { + Ok(values) => values + .iter() + .enumerate() + .map(|(_i, v)| match v { + RefValue::Null => "NULL".to_string(), + RefValue::Integer(i) => format!("Integer({})", i), + RefValue::Float(f) => format!("Float({})", f), + RefValue::Text(t) => format!("Text({})", t.as_str()), + RefValue::Blob(b) => format!("Blob({})", String::from_utf8_lossy(b.to_slice())), + }) + .collect::>() + .join(", "), + Err(e) => format!("ImmutableRecord {{ error: {} }}", e), } - Ok(()) } } -impl Clone for ImmutableRecord { - fn clone(&self) -> Self { - let mut new_values = Vec::new(); - let new_payload = self.payload.clone(); - for value in &self.values { - let value = match value { - RefValue::Null => RefValue::Null, - RefValue::Integer(i) => RefValue::Integer(*i), - RefValue::Float(f) => RefValue::Float(*f), - RefValue::Text(text_ref) => { - // let's update pointer - let ptr_start = self.payload.as_blob().as_ptr() as usize; - let ptr_end = text_ref.value.data as usize; - let len = ptr_end - ptr_start; - let new_ptr = unsafe { new_payload.as_blob().as_ptr().add(len) }; - RefValue::Text(TextRef { - value: RawSlice::new(new_ptr, text_ref.value.len), - subtype: text_ref.subtype.clone(), - }) - } - RefValue::Blob(raw_slice) => { - let ptr_start = self.payload.as_blob().as_ptr() as usize; - let ptr_end = raw_slice.data as usize; - let len = ptr_end - ptr_start; - let new_ptr = unsafe { new_payload.as_blob().as_ptr().add(len) }; - RefValue::Blob(RawSlice::new(new_ptr, raw_slice.len)) - } - }; - new_values.push(value); - } - Self { - payload: new_payload, - values: new_values, - recreating: self.recreating, - } - } -} +// &mut requirement for ImmutableRecord is problematic for this. +// for now we use debug_string() +// impl Display for ImmutableRecord { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// let values = match self.get_values() { +// Ok(values) => values, +// Err(e) => { +// eprintln!("Failed to get values for display: {}", e); // or use a logger +// return Err(std::fmt::Error); +// } +// }; +// for value in values { +// match value { +// RefValue::Null => write!(f, "NULL")?, +// RefValue::Integer(i) => write!(f, "Integer({})", i)?, +// RefValue::Float(flo) => write!(f, "Float({})", flo)?, +// RefValue::Text(text_ref) => write!(f, "Text({})", text_ref.as_str())?, +// RefValue::Blob(raw_slice) => { +// write!(f, "Blob({})", String::from_utf8_lossy(raw_slice.to_slice()))? +// } +// } +// if value != *values.last().unwrap() { +// write!(f, ", ")?; +// } +// } +// Ok(()) +// } +// } impl RefValue { pub fn to_ffi(&self) -> ExtValue {