mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-22 08:25:29 +01:00
Add incremental and on-demand parsing of ImmutableRecord.
First step at resolving the currently wasteful eager parsing.
This commit is contained in:
458
core/types.rs
458
core/types.rs
@@ -8,7 +8,7 @@ use crate::ext::{ExtValue, ExtValueType};
|
||||
use crate::pseudo::PseudoCursor;
|
||||
use crate::schema::Index;
|
||||
use crate::storage::btree::BTreeCursor;
|
||||
use crate::storage::sqlite3_ondisk::write_varint;
|
||||
use crate::storage::sqlite3_ondisk::{read_varint, write_varint};
|
||||
use crate::translate::collate::CollationSeq;
|
||||
use crate::translate::plan::IterationDirection;
|
||||
use crate::vdbe::sorter::Sorter;
|
||||
@@ -753,7 +753,7 @@ impl<'a> TryFrom<&'a RefValue> for &'a str {
|
||||
/// A value in a record that has already been serialized can stay serialized and what this struct offsers
|
||||
/// is easy acces to each value which point to the payload.
|
||||
/// The name might be contradictory as it is immutable in the sense that you cannot modify the values without modifying the payload.
|
||||
#[derive(Eq, Ord, PartialEq, PartialOrd)]
|
||||
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
|
||||
pub struct ImmutableRecord {
|
||||
// We have to be super careful with this buffer since we make values point to the payload we need to take care reallocations
|
||||
// happen in a controlled manner. If we realocate with values that should be correct, they will now point to undefined data.
|
||||
@@ -761,18 +761,20 @@ pub struct ImmutableRecord {
|
||||
//
|
||||
// payload is the Vec<u8> but in order to use Register which holds ImmutableRecord as a Value - we store Vec<u8> as Value::Blob
|
||||
payload: Value,
|
||||
pub values: Vec<RefValue>,
|
||||
recreating: bool,
|
||||
header_offsets: Vec<u32>,
|
||||
serial_types: Vec<u32>,
|
||||
header_parsed_count: usize,
|
||||
header_offset: usize,
|
||||
}
|
||||
|
||||
impl Debug for ImmutableRecord {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ImmutableRecord")
|
||||
.field("values", &self.values)
|
||||
.field("recreating", &self.recreating)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
// impl Debug for ImmutableRecord {
|
||||
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// f.debug_struct("ImmutableRecord")
|
||||
// .field("values", &self.values)
|
||||
// .field("recreating", &self.recreating)
|
||||
// .finish()
|
||||
// }
|
||||
// }
|
||||
|
||||
#[derive(PartialEq)]
|
||||
pub enum ParseRecordState {
|
||||
@@ -851,44 +853,284 @@ impl ImmutableRecord {
|
||||
pub fn new(payload_capacity: usize, value_capacity: usize) -> Self {
|
||||
Self {
|
||||
payload: Value::Blob(Vec::with_capacity(payload_capacity)),
|
||||
values: Vec::with_capacity(value_capacity),
|
||||
recreating: false,
|
||||
header_offsets: Vec::with_capacity(value_capacity + 1),
|
||||
serial_types: Vec::with_capacity(value_capacity),
|
||||
header_parsed_count: 0,
|
||||
header_offset: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get<'a, T: TryFrom<&'a RefValue, Error = LimboError> + 'a>(
|
||||
&'a self,
|
||||
idx: usize,
|
||||
) -> Result<T> {
|
||||
let value = self
|
||||
.values
|
||||
.get(idx)
|
||||
.ok_or(LimboError::InternalError("Index out of bounds".into()))?;
|
||||
pub fn get<T: TryFrom<RefValue, Error = LimboError>>(&mut self, idx: usize) -> Result<T> {
|
||||
let value = self.get_value(idx)?;
|
||||
T::try_from(value)
|
||||
}
|
||||
|
||||
pub fn count(&self) -> usize {
|
||||
self.values.len()
|
||||
fn parse_full_header(&mut self) -> Result<()> {
|
||||
if self.payload.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.header_parsed_count == 0 && self.header_offsets.is_empty() {
|
||||
let (header_size, bytes_read) = read_varint(&self.payload)?;
|
||||
self.header_offset = bytes_read;
|
||||
self.header_offsets.push(header_size as u32);
|
||||
}
|
||||
|
||||
if !self.header_offsets.is_empty() {
|
||||
let header_size = self.header_offsets[0] as usize;
|
||||
|
||||
while self.header_offset < header_size && self.header_offset < self.payload.len() {
|
||||
let (serial_type_u64, bytes_read) =
|
||||
read_varint(&self.payload[self.header_offset..])?;
|
||||
let serial_type = serial_type_u64 as u32;
|
||||
self.header_offset += bytes_read;
|
||||
self.serial_types.push(serial_type);
|
||||
|
||||
let serial_type_obj = SerialType::try_from(serial_type_u64)?;
|
||||
let column_data_size = serial_type_obj.size();
|
||||
|
||||
let current_offset = self.header_offsets.last().copied().unwrap_or(0);
|
||||
self.header_offsets
|
||||
.push(current_offset + column_data_size as u32);
|
||||
self.header_parsed_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn last_value(&self) -> Option<&RefValue> {
|
||||
self.values.last()
|
||||
fn deserialize_column(&self, idx: usize) -> Result<RefValue> {
|
||||
if idx >= self.serial_types.len() || idx + 2 >= self.header_offsets.len() {
|
||||
return Ok(RefValue::Null);
|
||||
}
|
||||
|
||||
let serial_type = self.serial_types[idx];
|
||||
let start_offset = self.header_offsets[idx + 1] as usize;
|
||||
let end_offset = self.header_offsets[idx + 2] as usize;
|
||||
|
||||
if start_offset > self.payload.len() || end_offset > self.payload.len() {
|
||||
return Err(LimboError::Corrupt("Column data out of bounds".into()));
|
||||
}
|
||||
|
||||
let column_data = &self.payload[start_offset..end_offset];
|
||||
self.deserialize_column_data(serial_type, column_data)
|
||||
}
|
||||
|
||||
pub fn get_values(&self) -> &Vec<RefValue> {
|
||||
&self.values
|
||||
fn deserialize_column_data(&self, serial_type: u32, data: &[u8]) -> Result<RefValue> {
|
||||
let serial_type_obj = SerialType::try_from(serial_type as u64)?;
|
||||
|
||||
match serial_type_obj.kind() {
|
||||
SerialTypeKind::Null => Ok(RefValue::Null),
|
||||
SerialTypeKind::ConstInt0 => Ok(RefValue::Integer(0)),
|
||||
SerialTypeKind::ConstInt1 => Ok(RefValue::Integer(1)),
|
||||
SerialTypeKind::I8 => {
|
||||
if data.len() >= 1 {
|
||||
Ok(RefValue::Integer(data[0] as i8 as i64))
|
||||
} else {
|
||||
Err(LimboError::Corrupt("Invalid I8 data".into()))
|
||||
}
|
||||
}
|
||||
SerialTypeKind::I16 => {
|
||||
if data.len() >= 2 {
|
||||
let value = i16::from_be_bytes([data[0], data[1]]) as i64;
|
||||
Ok(RefValue::Integer(value))
|
||||
} else {
|
||||
Err(LimboError::Corrupt("Invalid I16 data".into()))
|
||||
}
|
||||
}
|
||||
SerialTypeKind::I24 => {
|
||||
if data.len() >= 3 {
|
||||
// Sign extend the 24-bit value
|
||||
let mut bytes = [0u8; 4];
|
||||
bytes[1..4].copy_from_slice(data);
|
||||
let value = i32::from_be_bytes(bytes);
|
||||
// Sign extend from 24-bit to 32-bit
|
||||
let value = if (value & 0x00800000) != 0 {
|
||||
value | 0xFF000000u32 as i32
|
||||
} else {
|
||||
value
|
||||
};
|
||||
Ok(RefValue::Integer(value as i64))
|
||||
} else {
|
||||
Err(LimboError::Corrupt("Invalid I24 data".into()))
|
||||
}
|
||||
}
|
||||
SerialTypeKind::I32 => {
|
||||
if data.len() >= 4 {
|
||||
let value = i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as i64;
|
||||
Ok(RefValue::Integer(value))
|
||||
} else {
|
||||
Err(LimboError::Corrupt("Invalid I32 data".into()))
|
||||
}
|
||||
}
|
||||
SerialTypeKind::I48 => {
|
||||
if data.len() >= 6 {
|
||||
let mut bytes = [0u8; 8];
|
||||
bytes[2..8].copy_from_slice(data);
|
||||
let value = i64::from_be_bytes(bytes);
|
||||
let value = if (value & 0x0000800000000000) != 0 {
|
||||
value | 0xFFFF000000000000u64 as i64
|
||||
} else {
|
||||
value
|
||||
};
|
||||
Ok(RefValue::Integer(value))
|
||||
} else {
|
||||
Err(LimboError::Corrupt("Invalid I48 data".into()))
|
||||
}
|
||||
}
|
||||
SerialTypeKind::I64 => {
|
||||
if data.len() >= 8 {
|
||||
let value = i64::from_be_bytes([
|
||||
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
|
||||
]);
|
||||
Ok(RefValue::Integer(value))
|
||||
} else {
|
||||
Err(LimboError::Corrupt("Invalid I64 data".into()))
|
||||
}
|
||||
}
|
||||
SerialTypeKind::F64 => {
|
||||
if data.len() >= 8 {
|
||||
let bits = u64::from_be_bytes([
|
||||
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
|
||||
]);
|
||||
let value = f64::from_bits(bits);
|
||||
Ok(RefValue::Float(value))
|
||||
} else {
|
||||
Err(LimboError::Corrupt("Invalid F64 data".into()))
|
||||
}
|
||||
}
|
||||
SerialTypeKind::Text => {
|
||||
let raw_slice = RawSlice::new(data.as_ptr(), data.len());
|
||||
Ok(RefValue::Text(TextRef {
|
||||
value: raw_slice,
|
||||
subtype: TextSubtype::Text,
|
||||
}))
|
||||
}
|
||||
SerialTypeKind::Blob => {
|
||||
// Create RawSlice pointing to the data
|
||||
let raw_slice = RawSlice::new(data.as_ptr(), data.len());
|
||||
Ok(RefValue::Blob(raw_slice))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_value(&self, idx: usize) -> &RefValue {
|
||||
&self.values[idx]
|
||||
fn ensure_parsed_upto(&mut self, target_idx: usize) -> Result<()> {
|
||||
if self.payload.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.header_parsed_count == 0 && self.header_offsets.is_empty() {
|
||||
let (header_size, bytes_read) = read_varint(&self.payload)?;
|
||||
self.header_offset = bytes_read;
|
||||
self.header_offsets.push(header_size as u32);
|
||||
|
||||
self.header_offsets.push(header_size as u32);
|
||||
}
|
||||
|
||||
if !self.header_offsets.is_empty() {
|
||||
let header_size = self.header_offsets[0] as usize;
|
||||
|
||||
while self.header_parsed_count <= target_idx
|
||||
&& self.header_offset < header_size
|
||||
&& self.header_offset < self.payload.len()
|
||||
{
|
||||
let (serial_type_u64, bytes_read) =
|
||||
match read_varint(&self.payload[self.header_offset..]) {
|
||||
Ok(result) => result,
|
||||
Err(_) => break,
|
||||
};
|
||||
let serial_type = serial_type_u64 as u32;
|
||||
self.header_offset += bytes_read;
|
||||
|
||||
self.serial_types.push(serial_type);
|
||||
|
||||
let serial_type_obj = match SerialType::try_from(serial_type_u64) {
|
||||
Ok(st) => st,
|
||||
Err(_) => break,
|
||||
};
|
||||
let data_size = serial_type_obj.size();
|
||||
|
||||
let current_offset = self.header_offsets.last().copied().unwrap_or(0);
|
||||
self.header_offsets.push(current_offset + data_size as u32);
|
||||
|
||||
self.header_parsed_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_value_opt(&self, idx: usize) -> Option<&RefValue> {
|
||||
self.values.get(idx)
|
||||
pub fn count(&mut self) -> usize {
|
||||
if self.is_invalidated() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if let Ok(()) = self.parse_full_header() {
|
||||
self.serial_types.len()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.values.len()
|
||||
pub fn last_value(&mut self) -> Option<Result<RefValue>> {
|
||||
let count = self.count();
|
||||
if count > 0 {
|
||||
Some(self.get_value_opt(count - 1)?)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_values(&mut self) -> Result<Vec<RefValue>> {
|
||||
if self.is_invalidated() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
self.parse_full_header()?;
|
||||
|
||||
let mut values = Vec::with_capacity(self.serial_types.len());
|
||||
for i in 0..self.serial_types.len() {
|
||||
values.push(self.get_value(i)?);
|
||||
}
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
pub fn get_value(&mut self, idx: usize) -> Result<RefValue> {
|
||||
if self.is_invalidated() {
|
||||
return Err(LimboError::InternalError("Record not initialized".into()));
|
||||
}
|
||||
|
||||
self.ensure_parsed_upto(idx)?;
|
||||
|
||||
if idx >= self.serial_types.len() {
|
||||
return Ok(RefValue::Null);
|
||||
}
|
||||
|
||||
self.deserialize_column(idx)
|
||||
}
|
||||
|
||||
pub fn get_value_opt(&mut self, idx: usize) -> Option<Result<RefValue>> {
|
||||
if self.is_invalidated() {
|
||||
return None;
|
||||
}
|
||||
|
||||
match self.ensure_parsed_upto(idx) {
|
||||
Ok(()) => {
|
||||
if idx >= self.serial_types.len() {
|
||||
return None;
|
||||
}
|
||||
Some(self.get_value(idx))
|
||||
}
|
||||
Err(e) => Some(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&mut self) -> usize {
|
||||
self.count()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.is_invalidated() || self.payload.is_empty()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
@@ -917,9 +1159,11 @@ impl ImmutableRecord {
|
||||
size_header += n;
|
||||
size_values += value_size;
|
||||
}
|
||||
|
||||
let mut header_size = size_header;
|
||||
const MIN_HEADER_SIZE: usize = 126;
|
||||
if header_size <= MIN_HEADER_SIZE {
|
||||
assert!(header_size <= 126);
|
||||
// common case
|
||||
// This case means the header size can be contained by a single byte, therefore
|
||||
// header_size == size of serial types + 1 byte from the header size
|
||||
@@ -927,16 +1171,22 @@ impl ImmutableRecord {
|
||||
// header size here will be 126 == (2^7 - 1)
|
||||
header_size += 1;
|
||||
} else {
|
||||
todo!("calculate big header size extra bytes");
|
||||
// get header varint len
|
||||
// header_size += n;
|
||||
// if( nVarint<sqlite3VarintLen(nHdr) ) nHdr++;
|
||||
// Rare case of a really large header
|
||||
let mut temp_buf = [0u8; 9];
|
||||
let n_varint = write_varint(&mut temp_buf, header_size as u64); // or however you get varint length
|
||||
header_size += n_varint;
|
||||
|
||||
// Check if adding the varint bytes changes the varint length
|
||||
let new_n_varint = write_varint(&mut temp_buf, header_size as u64);
|
||||
if n_varint < new_n_varint {
|
||||
header_size += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// 1. write header size
|
||||
let mut buf = Vec::new();
|
||||
buf.reserve_exact(header_size + size_values);
|
||||
assert_eq!(buf.capacity(), header_size + size_values);
|
||||
assert!(header_size <= 126);
|
||||
let n = write_varint(&mut serial_type_buf, header_size as u64);
|
||||
|
||||
buf.resize(buf.capacity(), 0);
|
||||
@@ -1000,32 +1250,35 @@ impl ImmutableRecord {
|
||||
writer.assert_finish_capacity();
|
||||
Self {
|
||||
payload: Value::Blob(buf),
|
||||
values,
|
||||
recreating: false,
|
||||
header_offsets: Vec::new(),
|
||||
serial_types: Vec::new(),
|
||||
header_parsed_count: 0,
|
||||
header_offset: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_serialization(&mut self, payload: &[u8]) {
|
||||
self.recreating = true;
|
||||
self.payload.as_blob_mut().extend_from_slice(payload);
|
||||
}
|
||||
pub fn end_serialization(&mut self) {
|
||||
assert!(self.recreating);
|
||||
self.recreating = false;
|
||||
self.header_offsets.clear();
|
||||
self.serial_types.clear();
|
||||
self.header_parsed_count = 0;
|
||||
self.header_offset = 0;
|
||||
}
|
||||
|
||||
pub fn add_value(&mut self, value: RefValue) {
|
||||
assert!(self.recreating);
|
||||
self.values.push(value);
|
||||
}
|
||||
pub fn end_serialization(&mut self) {}
|
||||
|
||||
pub fn add_value(&mut self, value: RefValue) {}
|
||||
|
||||
pub fn invalidate(&mut self) {
|
||||
self.payload.as_blob_mut().clear();
|
||||
self.values.clear();
|
||||
self.header_offsets.clear();
|
||||
self.serial_types.clear();
|
||||
self.header_parsed_count = 0;
|
||||
self.header_offset = 0;
|
||||
}
|
||||
|
||||
pub fn is_invalidated(&self) -> bool {
|
||||
self.payload.as_blob().is_empty()
|
||||
self.payload.as_blob().is_empty() && self.header_offsets.is_empty()
|
||||
}
|
||||
|
||||
pub fn get_payload(&self) -> &[u8] {
|
||||
@@ -1035,65 +1288,58 @@ impl ImmutableRecord {
|
||||
pub fn as_blob_value(&self) -> &Value {
|
||||
&self.payload
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ImmutableRecord {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
for value in &self.values {
|
||||
match value {
|
||||
RefValue::Null => write!(f, "NULL")?,
|
||||
RefValue::Integer(i) => write!(f, "Integer({})", *i)?,
|
||||
RefValue::Float(flo) => write!(f, "Float({})", *flo)?,
|
||||
RefValue::Text(text_ref) => write!(f, "Text({})", text_ref.as_str())?,
|
||||
RefValue::Blob(raw_slice) => {
|
||||
write!(f, "Blob({})", String::from_utf8_lossy(raw_slice.to_slice()))?
|
||||
}
|
||||
}
|
||||
if value != self.values.last().unwrap() {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
pub fn debug_string(&mut self) -> String {
|
||||
if self.is_invalidated() {
|
||||
return "ImmutableRecord { invalidated }".to_string();
|
||||
}
|
||||
|
||||
match self.get_values() {
|
||||
Ok(values) => values
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(_i, v)| match v {
|
||||
RefValue::Null => "NULL".to_string(),
|
||||
RefValue::Integer(i) => format!("Integer({})", i),
|
||||
RefValue::Float(f) => format!("Float({})", f),
|
||||
RefValue::Text(t) => format!("Text({})", t.as_str()),
|
||||
RefValue::Blob(b) => format!("Blob({})", String::from_utf8_lossy(b.to_slice())),
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join(", "),
|
||||
Err(e) => format!("ImmutableRecord {{ error: {} }}", e),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for ImmutableRecord {
|
||||
fn clone(&self) -> Self {
|
||||
let mut new_values = Vec::new();
|
||||
let new_payload = self.payload.clone();
|
||||
for value in &self.values {
|
||||
let value = match value {
|
||||
RefValue::Null => RefValue::Null,
|
||||
RefValue::Integer(i) => RefValue::Integer(*i),
|
||||
RefValue::Float(f) => RefValue::Float(*f),
|
||||
RefValue::Text(text_ref) => {
|
||||
// let's update pointer
|
||||
let ptr_start = self.payload.as_blob().as_ptr() as usize;
|
||||
let ptr_end = text_ref.value.data as usize;
|
||||
let len = ptr_end - ptr_start;
|
||||
let new_ptr = unsafe { new_payload.as_blob().as_ptr().add(len) };
|
||||
RefValue::Text(TextRef {
|
||||
value: RawSlice::new(new_ptr, text_ref.value.len),
|
||||
subtype: text_ref.subtype.clone(),
|
||||
})
|
||||
}
|
||||
RefValue::Blob(raw_slice) => {
|
||||
let ptr_start = self.payload.as_blob().as_ptr() as usize;
|
||||
let ptr_end = raw_slice.data as usize;
|
||||
let len = ptr_end - ptr_start;
|
||||
let new_ptr = unsafe { new_payload.as_blob().as_ptr().add(len) };
|
||||
RefValue::Blob(RawSlice::new(new_ptr, raw_slice.len))
|
||||
}
|
||||
};
|
||||
new_values.push(value);
|
||||
}
|
||||
Self {
|
||||
payload: new_payload,
|
||||
values: new_values,
|
||||
recreating: self.recreating,
|
||||
}
|
||||
}
|
||||
}
|
||||
// &mut requirement for ImmutableRecord is problematic for this.
|
||||
// for now we use debug_string()
|
||||
// impl Display for ImmutableRecord {
|
||||
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
// let values = match self.get_values() {
|
||||
// Ok(values) => values,
|
||||
// Err(e) => {
|
||||
// eprintln!("Failed to get values for display: {}", e); // or use a logger
|
||||
// return Err(std::fmt::Error);
|
||||
// }
|
||||
// };
|
||||
// for value in values {
|
||||
// match value {
|
||||
// RefValue::Null => write!(f, "NULL")?,
|
||||
// RefValue::Integer(i) => write!(f, "Integer({})", i)?,
|
||||
// RefValue::Float(flo) => write!(f, "Float({})", flo)?,
|
||||
// RefValue::Text(text_ref) => write!(f, "Text({})", text_ref.as_str())?,
|
||||
// RefValue::Blob(raw_slice) => {
|
||||
// write!(f, "Blob({})", String::from_utf8_lossy(raw_slice.to_slice()))?
|
||||
// }
|
||||
// }
|
||||
// if value != *values.last().unwrap() {
|
||||
// write!(f, ", ")?;
|
||||
// }
|
||||
// }
|
||||
// Ok(())
|
||||
// }
|
||||
// }
|
||||
|
||||
impl RefValue {
|
||||
pub fn to_ffi(&self) -> ExtValue {
|
||||
|
||||
Reference in New Issue
Block a user