mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-20 23:45:18 +01:00
perf/btree: optimize op_column
This commit is contained in:
@@ -100,6 +100,7 @@ pub trait Extendable<T> {
|
||||
}
|
||||
|
||||
impl<T: AnyText> Extendable<T> for Text {
|
||||
#[inline(always)]
|
||||
fn do_extend(&mut self, other: &T) {
|
||||
self.value.clear();
|
||||
self.value.extend_from_slice(other.as_ref().as_bytes());
|
||||
@@ -108,6 +109,7 @@ impl<T: AnyText> Extendable<T> for Text {
|
||||
}
|
||||
|
||||
impl<T: AnyBlob> Extendable<T> for Vec<u8> {
|
||||
#[inline(always)]
|
||||
fn do_extend(&mut self, other: &T) {
|
||||
self.clear();
|
||||
self.extend_from_slice(other.as_slice());
|
||||
@@ -136,6 +138,12 @@ impl AnyText for TextRef {
|
||||
}
|
||||
}
|
||||
|
||||
impl AnyText for &str {
|
||||
fn subtype(&self) -> TextSubtype {
|
||||
TextSubtype::Text
|
||||
}
|
||||
}
|
||||
|
||||
/// Read-only view over a value's raw bytes.
pub trait AnyBlob {
    /// Returns the underlying bytes as a borrowed slice.
    fn as_slice(&self) -> &[u8];
}
|
||||
@@ -152,6 +160,12 @@ impl AnyBlob for Vec<u8> {
|
||||
}
|
||||
}
|
||||
|
||||
impl AnyBlob for &[u8] {
|
||||
fn as_slice(&self) -> &[u8] {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<str> for Text {
|
||||
fn as_ref(&self) -> &str {
|
||||
self.as_str()
|
||||
|
||||
@@ -10,8 +10,7 @@ use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState};
|
||||
use crate::storage::sqlite3_ondisk::read_varint;
|
||||
use crate::translate::collate::CollationSeq;
|
||||
use crate::types::{
|
||||
compare_immutable, compare_records_generic, Extendable, ImmutableRecord, RawSlice, SeekResult,
|
||||
Text, TextRef, TextSubtype,
|
||||
compare_immutable, compare_records_generic, Extendable, ImmutableRecord, SeekResult, Text,
|
||||
};
|
||||
use crate::util::{normalize_ident, IOExt as _};
|
||||
use crate::vdbe::insn::InsertFlags;
|
||||
@@ -1361,60 +1360,6 @@ fn read_varint_fast(buf: &[u8]) -> Result<(u64, usize)> {
|
||||
read_varint(buf)
|
||||
}
|
||||
|
||||
/// This function directly interprets bytes as big-endian signed integers with proper
/// sign extension. It's used when the caller already knows the value is an integer
/// from parsing the record header.
///
/// `len` is the number of bytes to decode from the start of `buf`. Only the widths
/// 1, 2, 3, 4, 6 and 8 are decoded; any other `len` yields `0`.
///
/// # How OP_Column Uses This
///
/// In `op_column()`, the record header is parsed incrementally to extract serial types.
/// When a serial type indicates an integer (values 1-6), OP_Column can skip the generic
/// `read_value()` path and call this function directly:
///
/// ```ignore
/// match serial_type {
///     1..=6 => {
///         let expected_len = match serial_type {
///             1 => 1, 2 => 2, 3 => 3, 4 => 4, 5 => 6, 6 => 8, _ => 0,
///         };
///         Value::Integer(read_integer_fast(data_slice, expected_len))
///     }
///     // ... other types use generic path
/// }
/// ```
///
/// This avoids the general case path: `SerialType::try_from() → match kind() → read_integer()`.
///
/// # Panics
///
/// Panics if `buf` holds fewer than `len` bytes for a supported width: the
/// `debug_assert!` fires in debug builds, and the slice indexing panics otherwise.
#[inline(always)]
fn read_integer_fast(buf: &[u8], len: usize) -> i64 {
    debug_assert!(buf.len() >= len, "Buffer too short for requested length");
    match len {
        1 => buf[0] as i8 as i64,
        2 => i16::from_be_bytes([buf[0], buf[1]]) as i64,
        3 => {
            // 24-bit value: widen to 32 bits by replicating the sign bit into the top byte.
            let sign_extension = if buf[0] <= 0x7F { 0 } else { 0xFF };
            i32::from_be_bytes([sign_extension, buf[0], buf[1], buf[2]]) as i64
        }
        4 => i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as i64,
        6 => {
            // 48-bit value: widen to 64 bits by replicating the sign bit into the top two bytes.
            let sign_extension = if buf[0] <= 0x7F { 0 } else { 0xFF };
            i64::from_be_bytes([
                sign_extension,
                sign_extension,
                buf[0],
                buf[1],
                buf[2],
                buf[3],
                buf[4],
                buf[5],
            ])
        }
        8 => i64::from_be_bytes([
            buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
        ]),
        // Unsupported widths decode to zero rather than failing.
        _ => 0,
    }
}
|
||||
|
||||
pub fn op_column(
|
||||
program: &Program,
|
||||
state: &mut ProgramState,
|
||||
@@ -1462,39 +1407,33 @@ pub fn op_column(
|
||||
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
|
||||
match cursor_type {
|
||||
CursorType::BTreeTable(_) | CursorType::BTreeIndex(_) => {
|
||||
let value = 'value: {
|
||||
let mut cursor =
|
||||
'ifnull: {
|
||||
let mut cursor_ref =
|
||||
must_be_btree_cursor!(*cursor_id, program.cursor_ref, state, "Column");
|
||||
let cursor = cursor.as_btree_mut();
|
||||
let cursor = cursor_ref.as_btree_mut();
|
||||
|
||||
if cursor.get_null_flag() {
|
||||
break 'value Some(RefValue::Null);
|
||||
drop(cursor_ref);
|
||||
state.registers[*dest] = Register::Value(Value::Null);
|
||||
state.pc += 1;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
}
|
||||
|
||||
let record_result = return_if_io!(cursor.record());
|
||||
let Some(record) = record_result.as_ref() else {
|
||||
break 'value None;
|
||||
let Some(payload) = record_result.as_ref().map(|r| r.get_payload()) else {
|
||||
break 'ifnull;
|
||||
};
|
||||
|
||||
let payload = record.get_payload();
|
||||
|
||||
if payload.is_empty() {
|
||||
break 'value None;
|
||||
}
|
||||
|
||||
let mut record_cursor = cursor.record_cursor.borrow_mut();
|
||||
|
||||
if record_cursor.serial_types.is_empty() && record_cursor.offsets.is_empty() {
|
||||
if record_cursor.offsets.is_empty() {
|
||||
let (header_size, header_len_bytes) = read_varint_fast(payload)?;
|
||||
let header_size = header_size as usize;
|
||||
|
||||
if header_size > payload.len() || header_size > 98307 {
|
||||
return Err(LimboError::Corrupt("Header size exceeds bounds".into()));
|
||||
}
|
||||
debug_assert!(header_size <= payload.len() && header_size <= 98307, "header_size: {header_size}, header_len_bytes: {header_len_bytes}, payload.len(): {}", payload.len());
|
||||
|
||||
record_cursor.header_size = header_size;
|
||||
record_cursor.header_offset = header_len_bytes;
|
||||
|
||||
record_cursor.offsets.push(header_size);
|
||||
}
|
||||
|
||||
@@ -1504,175 +1443,195 @@ pub fn op_column(
|
||||
.offsets
|
||||
.last()
|
||||
.copied()
|
||||
.unwrap_or(record_cursor.header_size);
|
||||
|
||||
// Adjust data_offset if we already have some columns parsed
|
||||
if !record_cursor.serial_types.is_empty() && !record_cursor.offsets.is_empty() {
|
||||
data_offset = *record_cursor.offsets.last().unwrap();
|
||||
}
|
||||
.expect("header_offset must be set");
|
||||
|
||||
// Parse the header for serial types incrementally until we have the target column
|
||||
while record_cursor.serial_types.len() <= target_column
|
||||
&& parse_pos < record_cursor.header_size
|
||||
&& parse_pos < payload.len()
|
||||
{
|
||||
let (serial_type, varint_len) = read_varint_fast(&payload[parse_pos..])?;
|
||||
|
||||
record_cursor.serial_types.push(serial_type);
|
||||
parse_pos += varint_len;
|
||||
let data_size = match serial_type {
|
||||
// NULL
|
||||
0 => 0,
|
||||
// I8
|
||||
1 => 1,
|
||||
// I16
|
||||
2 => 2,
|
||||
// I24
|
||||
3 => 3,
|
||||
// I32
|
||||
4 => 4,
|
||||
// I48
|
||||
5 => 6,
|
||||
// I64
|
||||
6 => 8,
|
||||
// F64
|
||||
7 => 8,
|
||||
// CONST_INT0
|
||||
8 => 0,
|
||||
// CONST_INT1
|
||||
9 => 0,
|
||||
n if n >= 12 && n % 2 == 0 => (n - 12) / 2,
|
||||
n if n >= 13 && n % 2 == 1 => (n - 13) / 2,
|
||||
// BLOB
|
||||
n if n >= 12 && n & 1 == 0 => (n - 12) >> 1,
|
||||
// TEXT
|
||||
n if n >= 13 && n & 1 == 1 => (n - 13) >> 1,
|
||||
// Reserved
|
||||
10 | 11 => {
|
||||
return Err(LimboError::Corrupt(format!(
|
||||
"Reserved serial type: {serial_type}"
|
||||
)))
|
||||
}
|
||||
_ => {
|
||||
return Err(LimboError::Corrupt(format!(
|
||||
"Invalid serial type: {serial_type}"
|
||||
)))
|
||||
}
|
||||
_ => unreachable!("Invalid serial type: {serial_type}"),
|
||||
} as usize;
|
||||
data_offset += data_size;
|
||||
record_cursor.offsets.push(data_offset);
|
||||
}
|
||||
|
||||
debug_assert!(
|
||||
parse_pos <= record_cursor.header_size,
|
||||
"parse_pos: {parse_pos}, header_size: {}",
|
||||
record_cursor.header_size
|
||||
);
|
||||
record_cursor.header_offset = parse_pos;
|
||||
|
||||
if parse_pos > record_cursor.header_size || data_offset > payload.len() {
|
||||
record_cursor.serial_types.clear();
|
||||
record_cursor.offsets.clear();
|
||||
record_cursor.header_offset = 0;
|
||||
record_cursor.header_size = 0;
|
||||
break 'value None;
|
||||
}
|
||||
|
||||
if target_column >= record_cursor.serial_types.len() {
|
||||
break 'value None;
|
||||
}
|
||||
|
||||
let serial_type = record_cursor.serial_types[target_column];
|
||||
|
||||
// Fast path for common constant cases
|
||||
match serial_type {
|
||||
0 => break 'value Some(RefValue::Null),
|
||||
8 => break 'value Some(RefValue::Integer(0)),
|
||||
9 => break 'value Some(RefValue::Integer(1)),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if target_column + 1 >= record_cursor.offsets.len() {
|
||||
break 'value None;
|
||||
break 'ifnull;
|
||||
}
|
||||
|
||||
let start_offset = record_cursor.offsets[target_column];
|
||||
let end_offset = record_cursor.offsets[target_column + 1];
|
||||
|
||||
let data_slice = &payload[start_offset..end_offset];
|
||||
let data_len = end_offset - start_offset;
|
||||
// SAFETY: We know that the payload is valid until the next row is processed.
|
||||
let buf = unsafe {
|
||||
std::mem::transmute::<&[u8], &'static [u8]>(&payload[start_offset..end_offset])
|
||||
};
|
||||
let serial_type = record_cursor.serial_types[target_column];
|
||||
drop(record_result);
|
||||
drop(record_cursor);
|
||||
drop(cursor_ref);
|
||||
|
||||
match serial_type {
|
||||
1..=6 => {
|
||||
let expected_len = match serial_type {
|
||||
1 => 1,
|
||||
2 => 2,
|
||||
3 => 3,
|
||||
4 => 4,
|
||||
5 => 6,
|
||||
6 => 8,
|
||||
_ => 0,
|
||||
};
|
||||
|
||||
if data_len >= expected_len {
|
||||
Some(RefValue::Integer(read_integer_fast(
|
||||
data_slice,
|
||||
expected_len,
|
||||
)))
|
||||
} else {
|
||||
return Err(LimboError::Corrupt(format!(
|
||||
"Insufficient data for integer type {serial_type}: expected {expected_len}, got {data_len}"
|
||||
)));
|
||||
}
|
||||
// NULL
|
||||
0 => break 'ifnull,
|
||||
// I8
|
||||
1 => {
|
||||
state.registers[*dest] =
|
||||
Register::Value(Value::Integer(buf[0] as i8 as i64));
|
||||
}
|
||||
// I16
|
||||
2 => {
|
||||
state.registers[*dest] = Register::Value(Value::Integer(
|
||||
i16::from_be_bytes([buf[0], buf[1]]) as i64,
|
||||
));
|
||||
}
|
||||
// I24
|
||||
3 => {
|
||||
let sign_extension = (buf[0] > 0x7F) as u8 * 0xFF;
|
||||
let value = Value::Integer(i32::from_be_bytes([
|
||||
sign_extension,
|
||||
buf[0],
|
||||
buf[1],
|
||||
buf[2],
|
||||
]) as i64);
|
||||
state.registers[*dest] = Register::Value(value);
|
||||
}
|
||||
// I32
|
||||
4 => {
|
||||
let value =
|
||||
Value::Integer(
|
||||
i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as i64
|
||||
);
|
||||
state.registers[*dest] = Register::Value(value);
|
||||
}
|
||||
// I48
|
||||
5 => {
|
||||
let sign_extension = (buf[0] > 0x7F) as u8 * 0xFF;
|
||||
let value = Value::Integer(i64::from_be_bytes([
|
||||
sign_extension,
|
||||
sign_extension,
|
||||
buf[0],
|
||||
buf[1],
|
||||
buf[2],
|
||||
buf[3],
|
||||
buf[4],
|
||||
buf[5],
|
||||
]));
|
||||
state.registers[*dest] = Register::Value(value);
|
||||
}
|
||||
// I64
|
||||
6 => {
|
||||
let value =
|
||||
Value::Integer(i64::from_be_bytes(buf[..8].try_into().unwrap()));
|
||||
state.registers[*dest] = Register::Value(value);
|
||||
}
|
||||
// F64
|
||||
7 => {
|
||||
if data_len >= 8 {
|
||||
let bytes = [
|
||||
data_slice[0],
|
||||
data_slice[1],
|
||||
data_slice[2],
|
||||
data_slice[3],
|
||||
data_slice[4],
|
||||
data_slice[5],
|
||||
data_slice[6],
|
||||
data_slice[7],
|
||||
];
|
||||
Some(RefValue::Float(f64::from_be_bytes(bytes)))
|
||||
} else {
|
||||
None
|
||||
let value = Value::Float(f64::from_be_bytes(buf[..8].try_into().unwrap()));
|
||||
state.registers[*dest] = Register::Value(value);
|
||||
}
|
||||
// CONST_INT0
|
||||
8 => {
|
||||
state.registers[*dest] = Register::Value(Value::Integer(0));
|
||||
}
|
||||
// CONST_INT1
|
||||
9 => {
|
||||
state.registers[*dest] = Register::Value(Value::Integer(1));
|
||||
}
|
||||
// BLOB
|
||||
n if n >= 12 && n & 1 == 0 => {
|
||||
// Try to reuse the registers when allocation is not needed.
|
||||
match state.registers[*dest] {
|
||||
Register::Value(Value::Blob(ref mut existing_blob)) => {
|
||||
existing_blob.do_extend(&buf);
|
||||
}
|
||||
_ => {
|
||||
state.registers[*dest] = Register::Value(Value::Blob(buf.to_vec()));
|
||||
}
|
||||
}
|
||||
}
|
||||
n if n >= 12 && n % 2 == 0 => {
|
||||
Some(RefValue::Blob(RawSlice::create_from(data_slice)))
|
||||
// TEXT
|
||||
n if n >= 13 && n & 1 == 1 => {
|
||||
// Try to reuse the registers when allocation is not needed.
|
||||
match state.registers[*dest] {
|
||||
Register::Value(Value::Text(ref mut existing_text)) => {
|
||||
// SAFETY: We know the text is valid UTF-8 because we only accept valid UTF-8 and the serial type is TEXT.
|
||||
let text = unsafe { std::str::from_utf8_unchecked(buf) };
|
||||
existing_text.do_extend(&text);
|
||||
}
|
||||
_ => {
|
||||
// SAFETY: We know the text is valid UTF-8 because we only accept valid UTF-8 and the serial type is TEXT.
|
||||
let text = unsafe { std::str::from_utf8_unchecked(buf) };
|
||||
state.registers[*dest] =
|
||||
Register::Value(Value::Text(Text::new(text)));
|
||||
}
|
||||
}
|
||||
}
|
||||
n if n >= 13 && n % 2 == 1 => Some(RefValue::Text(TextRef::create_from(
|
||||
data_slice,
|
||||
TextSubtype::Text,
|
||||
))),
|
||||
_ => None,
|
||||
_ => panic!("Invalid serial type: {serial_type}"),
|
||||
}
|
||||
};
|
||||
|
||||
let Some(value) = value else {
|
||||
// DEFAULT handling. Try to reuse the registers when allocation is not needed.
|
||||
let Some(ref default) = default else {
|
||||
state.registers[*dest] = Register::Value(Value::Null);
|
||||
state.pc += 1;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
};
|
||||
match (default, &mut state.registers[*dest]) {
|
||||
(Value::Text(new_text), Register::Value(Value::Text(existing_text))) => {
|
||||
existing_text.do_extend(new_text);
|
||||
}
|
||||
(Value::Blob(new_blob), Register::Value(Value::Blob(existing_blob))) => {
|
||||
existing_blob.do_extend(new_blob);
|
||||
}
|
||||
_ => {
|
||||
state.registers[*dest] = Register::Value(default.clone());
|
||||
}
|
||||
}
|
||||
state.pc += 1;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
};
|
||||
|
||||
// Try to reuse the registers when allocation is not needed.
|
||||
match (&value, &mut state.registers[*dest]) {
|
||||
(RefValue::Text(new_text), Register::Value(Value::Text(existing_text))) => {
|
||||
// DEFAULT handling. Try to reuse the registers when allocation is not needed.
|
||||
let Some(ref default) = default else {
|
||||
state.registers[*dest] = Register::Value(Value::Null);
|
||||
state.pc += 1;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
};
|
||||
match (default, &mut state.registers[*dest]) {
|
||||
(Value::Text(new_text), Register::Value(Value::Text(existing_text))) => {
|
||||
existing_text.do_extend(new_text);
|
||||
}
|
||||
(RefValue::Blob(new_blob), Register::Value(Value::Blob(existing_blob))) => {
|
||||
(Value::Blob(new_blob), Register::Value(Value::Blob(existing_blob))) => {
|
||||
existing_blob.do_extend(new_blob);
|
||||
}
|
||||
_ => {
|
||||
state.registers[*dest] = Register::Value(match value {
|
||||
RefValue::Integer(i) => Value::Integer(i),
|
||||
RefValue::Float(f) => Value::Float(f),
|
||||
RefValue::Text(t) => Value::Text(Text::new(t.as_str())),
|
||||
RefValue::Blob(b) => Value::Blob(b.to_slice().to_vec()),
|
||||
RefValue::Null => Value::Null,
|
||||
});
|
||||
state.registers[*dest] = Register::Value(default.clone());
|
||||
}
|
||||
}
|
||||
state.pc += 1;
|
||||
return Ok(InsnFunctionStepResult::Step);
|
||||
}
|
||||
CursorType::Sorter => {
|
||||
let record = {
|
||||
|
||||
Reference in New Issue
Block a user