Fix btree to invalidate RecordCursor

Use `read_value` instead of `deserialize_column_data`
Add `sqlite_int_float_compare` which takes care of out of range
floats
This commit is contained in:
Krishna Vishal
2025-06-23 19:38:14 +05:30
parent 515712b7f2
commit c7aa3c3d93
2 changed files with 116 additions and 180 deletions

View File

@@ -800,10 +800,12 @@ impl BTreeCursor {
std::mem::swap(payload, &mut payload_swap);
let mut reuse_immutable = self.get_immutable_record_or_create();
crate::storage::sqlite3_ondisk::read_record(
&payload_swap,
reuse_immutable.as_mut().unwrap(),
)?;
reuse_immutable
.as_mut()
.unwrap()
.start_serialization(&payload_swap);
self.record_cursor.borrow_mut().invalidate();
let _ = read_overflow_state.take();
Ok(CursorResult::Ok(()))
@@ -1563,10 +1565,11 @@ impl BTreeCursor {
if let Some(next_page) = first_overflow_page {
return_if_io!(self.process_overflow_read(payload, *next_page, *payload_size))
} else {
crate::storage::sqlite3_ondisk::read_record(
payload,
self.get_immutable_record_or_create().as_mut().unwrap(),
)?
self.get_immutable_record_or_create()
.as_mut()
.unwrap()
.start_serialization(payload);
self.record_cursor.borrow_mut().invalidate();
};
let (target_leaf_page_is_in_left_subtree, is_eq) = {
let record = self.get_immutable_record();
@@ -1846,10 +1849,12 @@ impl BTreeCursor {
if let Some(next_page) = first_overflow_page {
return_if_io!(self.process_overflow_read(payload, *next_page, *payload_size))
} else {
crate::storage::sqlite3_ondisk::read_record(
payload,
self.get_immutable_record_or_create().as_mut().unwrap(),
)?
self.get_immutable_record_or_create()
.as_mut()
.unwrap()
.start_serialization(payload);
self.record_cursor.borrow_mut().invalidate();
};
let (_, found) = self.compare_with_current_record(key, seek_op);
moving_up_to_parent.set(false);
@@ -1939,10 +1944,12 @@ impl BTreeCursor {
if let Some(next_page) = first_overflow_page {
return_if_io!(self.process_overflow_read(payload, *next_page, *payload_size))
} else {
crate::storage::sqlite3_ondisk::read_record(
payload,
self.get_immutable_record_or_create().as_mut().unwrap(),
)?
self.get_immutable_record_or_create()
.as_mut()
.unwrap()
.start_serialization(payload);
self.record_cursor.borrow_mut().invalidate();
};
let (cmp, found) = self.compare_with_current_record(key, seek_op);
if found {
@@ -2011,10 +2018,12 @@ impl BTreeCursor {
if let Some(next_page) = next_page {
self.process_overflow_read(payload, next_page, payload_size)
} else {
crate::storage::sqlite3_ondisk::read_record(
payload,
self.get_immutable_record_or_create().as_mut().unwrap(),
)?;
self.get_immutable_record_or_create()
.as_mut()
.unwrap()
.start_serialization(payload);
self.record_cursor.borrow_mut().invalidate();
Ok(CursorResult::Ok(()))
}
}
@@ -4005,6 +4014,7 @@ impl BTreeCursor {
.as_mut()
.unwrap()
.invalidate();
self.record_cursor.borrow_mut().invalidate();
}
#[instrument(skip_all, level = Level::INFO)]
@@ -4123,10 +4133,11 @@ impl BTreeCursor {
if let Some(next_page) = first_overflow_page {
return_if_io!(self.process_overflow_read(payload, next_page, payload_size))
} else {
crate::storage::sqlite3_ondisk::read_record(
payload,
self.get_immutable_record_or_create().as_mut().unwrap(),
)?
self.get_immutable_record_or_create()
.as_mut()
.unwrap()
.start_serialization(payload);
self.record_cursor.borrow_mut().invalidate();
};
*self.parse_record_state.borrow_mut() = ParseRecordState::Init;

View File

@@ -856,7 +856,7 @@ impl ImmutableRecord {
let mut cursor = RecordCursor::new();
match cursor.get_values(self) {
Ok(values) => values,
Err(err) => Vec::new(),
Err(_) => Vec::new(),
}
}
@@ -1068,12 +1068,10 @@ impl RecordCursor {
let (header_size, bytes_read) = read_varint(payload)?;
self.header_offset = bytes_read;
self.header_offsets.push(header_size as u32);
}
if !self.header_offsets.is_empty() {
let header_size = self.header_offsets[0] as usize;
let mut current_offset = header_size as u32;
while self.header_offset < header_size && self.header_offset < payload.len() {
while self.header_offset < header_size as usize && self.header_offset < payload.len() {
let (serial_type, bytes_read) = read_varint(&payload[self.header_offset..])?;
self.header_offset += bytes_read;
self.serial_types.push(serial_type as u32);
@@ -1081,11 +1079,13 @@ impl RecordCursor {
let serial_type_obj = SerialType::try_from(serial_type)?;
let column_data_size = serial_type_obj.size();
let current_offset = self.header_offsets.last().copied().unwrap_or(0);
self.header_offsets
.push(current_offset + column_data_size as u32);
self.header_offsets.push(current_offset);
current_offset += column_data_size as u32;
self.header_parsed_count += 1;
}
self.header_offsets.push(current_offset);
}
Ok(())
@@ -1096,169 +1096,42 @@ impl RecordCursor {
record: &ImmutableRecord,
target_idx: usize,
) -> Result<()> {
let payload = record.get_payload();
if payload.is_empty() {
return Ok(());
}
if self.header_parsed_count == 0 && self.header_offsets.is_empty() {
let (header_size, bytes_read) = read_varint(payload)?;
self.header_offset = bytes_read;
self.header_offsets.push(header_size as u32);
self.header_offsets.push(header_size as u32);
}
if !self.header_offsets.is_empty() {
let header_size = self.header_offsets[0] as usize;
while self.header_parsed_count <= target_idx
&& self.header_offset < header_size
&& self.header_offset < payload.len()
{
let (serial_type, bytes_read) = match read_varint(&payload[self.header_offset..]) {
Ok(result) => result,
Err(_) => break,
};
self.header_offset += bytes_read;
self.serial_types.push(serial_type as u32);
let serial_type_obj = match SerialType::try_from(serial_type) {
Ok(st) => st,
Err(_) => break,
};
let data_size = serial_type_obj.size();
let current_offset = self.header_offsets.last().copied().unwrap_or(0);
self.header_offsets.push(current_offset + data_size as u32);
self.header_parsed_count += 1;
}
}
Ok(())
let _ = target_idx;
self.parse_full_header(record)
}
pub fn deserialize_column(&self, record: &ImmutableRecord, idx: usize) -> Result<RefValue> {
if idx >= self.serial_types.len() || idx + 2 >= self.header_offsets.len() {
// SQLite returns NULL for out-of-bounds column access
if idx >= self.serial_types.len() {
return Ok(RefValue::Null);
}
let serial_type = self.serial_types[idx];
let serial_type_obj = SerialType::try_from(serial_type as u64)?;
match serial_type_obj.kind() {
SerialTypeKind::Null => return Ok(RefValue::Null),
SerialTypeKind::ConstInt0 => return Ok(RefValue::Integer(0)),
SerialTypeKind::ConstInt1 => return Ok(RefValue::Integer(1)),
_ => {} // Continue to read from payload
}
// Check if we have enough header_offsets for non-constant types
// We need header_size (index 0) + column_offsets (index 1..n) + end_offset (index n+1)
if idx + 2 >= self.header_offsets.len() {
return Ok(RefValue::Null);
}
let start_offset = self.header_offsets[idx + 1] as usize;
let end_offset = self.header_offsets[idx + 2] as usize;
let payload = record.get_payload();
if start_offset >= payload.len() || end_offset > payload.len() {
return Err(LimboError::InternalError(
"Column offset out of bounds".into(),
));
return Ok(RefValue::Null);
}
let column_data = &payload[start_offset..end_offset];
self.deserialize_column_data(serial_type, column_data)
}
fn deserialize_column_data(&self, serial_type: u32, data: &[u8]) -> Result<RefValue> {
let serial_type_obj = SerialType::try_from(serial_type as u64)?;
let value = match serial_type_obj.kind() {
SerialTypeKind::Null => RefValue::Null,
SerialTypeKind::ConstInt0 => RefValue::Integer(0),
SerialTypeKind::ConstInt1 => RefValue::Integer(1),
SerialTypeKind::I8 => {
if data.is_empty() {
crate::bail_corrupt_error!("Invalid UInt8 value");
}
let val = data[0] as i8;
RefValue::Integer(val as i64)
}
SerialTypeKind::I16 => {
if data.len() != 2 {
crate::bail_corrupt_error!("Invalid UInt16 value");
}
let val = i16::from_be_bytes([data[0], data[1]]);
RefValue::Integer(val as i64)
}
SerialTypeKind::I24 => {
if data.len() != 3 {
crate::bail_corrupt_error!("Invalid BEInt24 value");
}
let sign_extension = if data[0] <= 127 { 0 } else { 255 };
let val = i32::from_be_bytes([sign_extension, data[0], data[1], data[2]]);
RefValue::Integer(val as i64)
}
SerialTypeKind::I32 => {
if data.len() != 4 {
crate::bail_corrupt_error!("Invalid BEInt32 value");
}
let val = i32::from_be_bytes([data[0], data[1], data[2], data[3]]);
RefValue::Integer(val as i64)
}
SerialTypeKind::I48 => {
if data.len() != 6 {
crate::bail_corrupt_error!("Invalid BEInt48 value");
}
let sign_extension = if data[0] <= 127 { 0 } else { 255 };
let val = i64::from_be_bytes([
sign_extension,
sign_extension,
data[0],
data[1],
data[2],
data[3],
data[4],
data[5],
]);
RefValue::Integer(val)
}
SerialTypeKind::I64 => {
if data.len() < 8 {
crate::bail_corrupt_error!("Invalid BEInt64 value");
}
RefValue::Integer(i64::from_be_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
]))
}
SerialTypeKind::F64 => {
if data.len() < 8 {
crate::bail_corrupt_error!("Invalid BEFloat64 value");
}
RefValue::Float(f64::from_be_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
]))
}
SerialTypeKind::Blob => {
let content_size = serial_type_obj.size();
if data.len() < content_size {
crate::bail_corrupt_error!("Invalid Blob value");
}
if content_size == 0 {
RefValue::Blob(RawSlice::new(std::ptr::null(), 0))
} else {
let ptr = &data[0] as *const u8;
let slice = RawSlice::new(ptr, content_size);
RefValue::Blob(slice)
}
}
SerialTypeKind::Text => {
let content_size = serial_type_obj.size();
if data.len() < content_size {
crate::bail_corrupt_error!(
"Invalid String value, length {} < expected length {}",
data.len(),
content_size
);
}
let slice = if content_size == 0 {
RawSlice::new(std::ptr::null(), 0)
} else {
let ptr = &data[0] as *const u8;
RawSlice::new(ptr, content_size)
};
RefValue::Text(TextRef {
value: slice,
subtype: TextSubtype::Text,
})
}
};
let (value, _) = crate::storage::sqlite3_ondisk::read_value(column_data, serial_type_obj)?;
Ok(value)
}
@@ -1421,6 +1294,30 @@ impl PartialOrd<RefValue> for RefValue {
}
}
fn sqlite_int_float_compare(int_val: i64, float_val: f64) -> std::cmp::Ordering {
if float_val.is_nan() {
return std::cmp::Ordering::Greater;
}
if float_val < -9223372036854775808.0 {
return std::cmp::Ordering::Greater;
}
if float_val >= 9223372036854775808.0 {
return std::cmp::Ordering::Less;
}
let float_as_int = float_val as i64;
match int_val.cmp(&float_as_int) {
std::cmp::Ordering::Equal => {
let int_as_float = int_val as f64;
int_as_float
.partial_cmp(&float_val)
.unwrap_or(std::cmp::Ordering::Equal)
}
other => other,
}
}
/// A bitfield that represents the comparison spec for index keys.
/// Since indexed columns can individually specify ASC/DESC, each key must
/// be compared differently.
@@ -1505,6 +1402,34 @@ pub fn compare_immutable(
std::cmp::Ordering::Equal
}
#[derive(Debug, Clone, Copy)]
pub enum RecordCompare {
Int,
String,
Generic,
}
impl RecordCompare {
pub fn compare(
&self,
serialized: &[u8],
unpacked: &[RefValue],
index_info: &IndexKeyInfo,
collations: &[CollationSeq],
skip: usize,
) -> Result<std::cmp::Ordering> {
Ok(std::cmp::Ordering::Equal)
}
}
// pub find_compare(unpacked: &[RefValue], index_info: &IndexKeyInfo) -> RecordCompare {
// if unpacked.len() > 0 && index_info.num_cols <= 13 {
// match &unpacked[0] {
// RefValue::Integer(_) if can_use_int_
// }
// }
// }
const I8_LOW: i64 = -128;
const I8_HIGH: i64 = 127;
const I16_LOW: i64 = -32768;