From c7aa3c3d9377e8a97aa4b7306a2dcba5601bd671 Mon Sep 17 00:00:00 2001 From: Krishna Vishal Date: Mon, 23 Jun 2025 19:38:14 +0530 Subject: [PATCH] Fix btree to invalidate RecordCursor Use `read_value` instead of `deserialize_column_data` Add `sqlite_int_float_compare` which takes care of out of range floats --- core/storage/btree.rs | 59 ++++++----- core/types.rs | 237 +++++++++++++++--------------------------- 2 files changed, 116 insertions(+), 180 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 6d245290b..9f7c57542 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -800,10 +800,12 @@ impl BTreeCursor { std::mem::swap(payload, &mut payload_swap); let mut reuse_immutable = self.get_immutable_record_or_create(); - crate::storage::sqlite3_ondisk::read_record( - &payload_swap, - reuse_immutable.as_mut().unwrap(), - )?; + + reuse_immutable + .as_mut() + .unwrap() + .start_serialization(&payload_swap); + self.record_cursor.borrow_mut().invalidate(); let _ = read_overflow_state.take(); Ok(CursorResult::Ok(())) @@ -1563,10 +1565,11 @@ impl BTreeCursor { if let Some(next_page) = first_overflow_page { return_if_io!(self.process_overflow_read(payload, *next_page, *payload_size)) } else { - crate::storage::sqlite3_ondisk::read_record( - payload, - self.get_immutable_record_or_create().as_mut().unwrap(), - )? + self.get_immutable_record_or_create() + .as_mut() + .unwrap() + .start_serialization(payload); + self.record_cursor.borrow_mut().invalidate(); }; let (target_leaf_page_is_in_left_subtree, is_eq) = { let record = self.get_immutable_record(); @@ -1846,10 +1849,12 @@ impl BTreeCursor { if let Some(next_page) = first_overflow_page { return_if_io!(self.process_overflow_read(payload, *next_page, *payload_size)) } else { - crate::storage::sqlite3_ondisk::read_record( - payload, - self.get_immutable_record_or_create().as_mut().unwrap(), - )? + self.get_immutable_record_or_create() + .as_mut() + .unwrap() + .start_serialization(payload); + + self.record_cursor.borrow_mut().invalidate(); }; let (_, found) = self.compare_with_current_record(key, seek_op); moving_up_to_parent.set(false); @@ -1939,10 +1944,12 @@ impl BTreeCursor { if let Some(next_page) = first_overflow_page { return_if_io!(self.process_overflow_read(payload, *next_page, *payload_size)) } else { - crate::storage::sqlite3_ondisk::read_record( - payload, - self.get_immutable_record_or_create().as_mut().unwrap(), - )? + self.get_immutable_record_or_create() + .as_mut() + .unwrap() + .start_serialization(payload); + + self.record_cursor.borrow_mut().invalidate(); }; let (cmp, found) = self.compare_with_current_record(key, seek_op); if found { @@ -2011,10 +2018,12 @@ impl BTreeCursor { if let Some(next_page) = next_page { self.process_overflow_read(payload, next_page, payload_size) } else { - crate::storage::sqlite3_ondisk::read_record( - payload, - self.get_immutable_record_or_create().as_mut().unwrap(), - )?; + self.get_immutable_record_or_create() + .as_mut() + .unwrap() + .start_serialization(payload); + + self.record_cursor.borrow_mut().invalidate(); Ok(CursorResult::Ok(())) } } @@ -4005,6 +4014,7 @@ impl BTreeCursor { .as_mut() .unwrap() .invalidate(); + self.record_cursor.borrow_mut().invalidate(); } #[instrument(skip_all, level = Level::INFO)] @@ -4123,10 +4133,11 @@ impl BTreeCursor { if let Some(next_page) = first_overflow_page { return_if_io!(self.process_overflow_read(payload, next_page, payload_size)) } else { - crate::storage::sqlite3_ondisk::read_record( - payload, - self.get_immutable_record_or_create().as_mut().unwrap(), - )? + self.get_immutable_record_or_create() + .as_mut() + .unwrap() + .start_serialization(payload); + self.record_cursor.borrow_mut().invalidate(); }; *self.parse_record_state.borrow_mut() = ParseRecordState::Init; diff --git a/core/types.rs b/core/types.rs index 55b78d7b6..01cb0b115 100644 --- a/core/types.rs +++ b/core/types.rs @@ -856,7 +856,7 @@ impl ImmutableRecord { let mut cursor = RecordCursor::new(); match cursor.get_values(self) { Ok(values) => values, - Err(err) => Vec::new(), + Err(_) => Vec::new(), } } @@ -1068,12 +1068,10 @@ impl RecordCursor { let (header_size, bytes_read) = read_varint(payload)?; self.header_offset = bytes_read; self.header_offsets.push(header_size as u32); - } - if !self.header_offsets.is_empty() { - let header_size = self.header_offsets[0] as usize; + let mut current_offset = header_size as u32; - while self.header_offset < header_size && self.header_offset < payload.len() { + while self.header_offset < header_size as usize && self.header_offset < payload.len() { let (serial_type, bytes_read) = read_varint(&payload[self.header_offset..])?; self.header_offset += bytes_read; self.serial_types.push(serial_type as u32); @@ -1081,11 +1079,13 @@ impl RecordCursor { let serial_type_obj = SerialType::try_from(serial_type)?; let column_data_size = serial_type_obj.size(); - let current_offset = self.header_offsets.last().copied().unwrap_or(0); - self.header_offsets - .push(current_offset + column_data_size as u32); + self.header_offsets.push(current_offset); + current_offset += column_data_size as u32; + self.header_parsed_count += 1; } + + self.header_offsets.push(current_offset); } Ok(()) @@ -1096,169 +1096,42 @@ impl RecordCursor { record: &ImmutableRecord, target_idx: usize, ) -> Result<()> { - let payload = record.get_payload(); - if payload.is_empty() { - return Ok(()); - } - - if self.header_parsed_count == 0 && self.header_offsets.is_empty() { - let (header_size, bytes_read) = read_varint(payload)?; - self.header_offset = bytes_read; - self.header_offsets.push(header_size as u32); - self.header_offsets.push(header_size as u32); - } - - if !self.header_offsets.is_empty() { - let header_size = self.header_offsets[0] as usize; - - while self.header_parsed_count <= target_idx - && self.header_offset < header_size - && self.header_offset < payload.len() - { - let (serial_type, bytes_read) = match read_varint(&payload[self.header_offset..]) { - Ok(result) => result, - Err(_) => break, - }; - - self.header_offset += bytes_read; - self.serial_types.push(serial_type as u32); - - let serial_type_obj = match SerialType::try_from(serial_type) { - Ok(st) => st, - Err(_) => break, - }; - let data_size = serial_type_obj.size(); - let current_offset = self.header_offsets.last().copied().unwrap_or(0); - self.header_offsets.push(current_offset + data_size as u32); - - self.header_parsed_count += 1; - } - } - Ok(()) + let _ = target_idx; + self.parse_full_header(record) } pub fn deserialize_column(&self, record: &ImmutableRecord, idx: usize) -> Result { - if idx >= self.serial_types.len() || idx + 2 >= self.header_offsets.len() { + // SQLite returns NULL for out-of-bounds column access + if idx >= self.serial_types.len() { return Ok(RefValue::Null); } let serial_type = self.serial_types[idx]; + let serial_type_obj = SerialType::try_from(serial_type as u64)?; + + match serial_type_obj.kind() { + SerialTypeKind::Null => return Ok(RefValue::Null), + SerialTypeKind::ConstInt0 => return Ok(RefValue::Integer(0)), + SerialTypeKind::ConstInt1 => return Ok(RefValue::Integer(1)), + _ => {} // Continue to read from payload + } + + // Check if we have enough header_offsets for non-constant types + // We need header_size (index 0) + column_offsets (index 1..n) + end_offset (index n+1) + if idx + 2 >= self.header_offsets.len() { + return Ok(RefValue::Null); + } + let start_offset = self.header_offsets[idx + 1] as usize; let end_offset = self.header_offsets[idx + 2] as usize; let payload = record.get_payload(); if start_offset >= payload.len() || end_offset > payload.len() { - return Err(LimboError::InternalError( - "Column offset out of bounds".into(), - )); + return Ok(RefValue::Null); } let column_data = &payload[start_offset..end_offset]; - self.deserialize_column_data(serial_type, column_data) - } - - fn deserialize_column_data(&self, serial_type: u32, data: &[u8]) -> Result { - let serial_type_obj = SerialType::try_from(serial_type as u64)?; - let value = match serial_type_obj.kind() { - SerialTypeKind::Null => RefValue::Null, - SerialTypeKind::ConstInt0 => RefValue::Integer(0), - SerialTypeKind::ConstInt1 => RefValue::Integer(1), - SerialTypeKind::I8 => { - if data.is_empty() { - crate::bail_corrupt_error!("Invalid UInt8 value"); - } - let val = data[0] as i8; - RefValue::Integer(val as i64) - } - SerialTypeKind::I16 => { - if data.len() != 2 { - crate::bail_corrupt_error!("Invalid UInt16 value"); - } - let val = i16::from_be_bytes([data[0], data[1]]); - RefValue::Integer(val as i64) - } - SerialTypeKind::I24 => { - if data.len() != 3 { - crate::bail_corrupt_error!("Invalid BEInt24 value"); - } - let sign_extension = if data[0] <= 127 { 0 } else { 255 }; - let val = i32::from_be_bytes([sign_extension, data[0], data[1], data[2]]); - RefValue::Integer(val as i64) - } - SerialTypeKind::I32 => { - if data.len() != 4 { - crate::bail_corrupt_error!("Invalid BEInt32 value"); - } - let val = i32::from_be_bytes([data[0], data[1], data[2], data[3]]); - RefValue::Integer(val as i64) - } - SerialTypeKind::I48 => { - if data.len() != 6 { - crate::bail_corrupt_error!("Invalid BEInt48 value"); - } - let sign_extension = if data[0] <= 127 { 0 } else { 255 }; - let val = i64::from_be_bytes([ - sign_extension, - sign_extension, - data[0], - data[1], - data[2], - data[3], - data[4], - data[5], - ]); - RefValue::Integer(val) - } - SerialTypeKind::I64 => { - if data.len() < 8 { - crate::bail_corrupt_error!("Invalid BEInt64 value"); - } - RefValue::Integer(i64::from_be_bytes([ - data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], - ])) - } - SerialTypeKind::F64 => { - if data.len() < 8 { - crate::bail_corrupt_error!("Invalid BEFloat64 value"); - } - RefValue::Float(f64::from_be_bytes([ - data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], - ])) - } - SerialTypeKind::Blob => { - let content_size = serial_type_obj.size(); - if data.len() < content_size { - crate::bail_corrupt_error!("Invalid Blob value"); - } - if content_size == 0 { - RefValue::Blob(RawSlice::new(std::ptr::null(), 0)) - } else { - let ptr = &data[0] as *const u8; - let slice = RawSlice::new(ptr, content_size); - RefValue::Blob(slice) - } - } - SerialTypeKind::Text => { - let content_size = serial_type_obj.size(); - if data.len() < content_size { - crate::bail_corrupt_error!( - "Invalid String value, length {} < expected length {}", - data.len(), - content_size - ); - } - let slice = if content_size == 0 { - RawSlice::new(std::ptr::null(), 0) - } else { - let ptr = &data[0] as *const u8; - RawSlice::new(ptr, content_size) - }; - RefValue::Text(TextRef { - value: slice, - subtype: TextSubtype::Text, - }) - } - }; + let (value, _) = crate::storage::sqlite3_ondisk::read_value(column_data, serial_type_obj)?; Ok(value) } @@ -1421,6 +1294,30 @@ impl PartialOrd for RefValue { } } +fn sqlite_int_float_compare(int_val: i64, float_val: f64) -> std::cmp::Ordering { + if float_val.is_nan() { + return std::cmp::Ordering::Greater; + } + + if float_val < -9223372036854775808.0 { + return std::cmp::Ordering::Greater; + } + if float_val >= 9223372036854775808.0 { + return std::cmp::Ordering::Less; + } + + let float_as_int = float_val as i64; + match int_val.cmp(&float_as_int) { + std::cmp::Ordering::Equal => { + let int_as_float = int_val as f64; + int_as_float + .partial_cmp(&float_val) + .unwrap_or(std::cmp::Ordering::Equal) + } + other => other, + } +} + /// A bitfield that represents the comparison spec for index keys. /// Since indexed columns can individually specify ASC/DESC, each key must /// be compared differently. @@ -1505,6 +1402,34 @@ pub fn compare_immutable( std::cmp::Ordering::Equal } +#[derive(Debug, Clone, Copy)] +pub enum RecordCompare { + Int, + String, + Generic, +} + +impl RecordCompare { + pub fn compare( + &self, + serialized: &[u8], + unpacked: &[RefValue], + index_info: &IndexKeyInfo, + collations: &[CollationSeq], + skip: usize, + ) -> Result { + Ok(std::cmp::Ordering::Equal) + } +} + +// pub find_compare(unpacked: &[RefValue], index_info: &IndexKeyInfo) -> RecordCompare { +// if unpacked.len() > 0 && index_info.num_cols <= 13 { +// match &unpacked[0] { +// RefValue::Integer(_) if can_use_int_ +// } +// } +// } + const I8_LOW: i64 = -128; const I8_HIGH: i64 = 127; const I16_LOW: i64 = -32768;