Merge 'Implement Once and OpenAutoindex opcodes' from Jussi Saurio

Groundwork for autoindexing, i.e. creating a transient in-memory index
when no persistent index is found. Autoindexing is used to avoid nested
full table scans.
@pereman2 I added some functionality to your `SmallVec`, so I would
appreciate a review of those parts.

Reviewed-by: Pere Diaz Bou <pere-altea@homail.com>

Closes #1340
This commit is contained in:
Jussi Saurio
2025-04-14 11:45:41 +03:00
6 changed files with 156 additions and 18 deletions

View File

@@ -501,9 +501,9 @@ Modifiers:
| NotNull | Yes | |
| Null | Yes | |
| NullRow | Yes | |
| Once | No | |
| OpenAutoindex | No | |
| OpenEphemeral | No | |
| Once | Yes | |
| OpenAutoindex | Yes | |
| OpenEphemeral | Yes | |
| OpenPseudo | Yes | |
| OpenRead | Yes | |
| OpenWrite | Yes | |

View File

@@ -1057,13 +1057,16 @@ pub fn validate_serial_type(value: u64) -> Result<SerialType> {
}
}
struct SmallVec<T> {
pub data: [std::mem::MaybeUninit<T>; 64],
/// A vector of `T` that keeps its first `N` elements in an inline array and any
/// further elements in an optional heap-allocated `Vec`.
///
/// `N` defaults to 64 when it is not specified.
pub struct SmallVec<T, const N: usize = 64> {
/// Inline storage for the elements at indices `0..N`. The slots are `MaybeUninit`,
/// so only slots below `len` should be treated as initialized.
pub data: [std::mem::MaybeUninit<T>; N],
/// Length of the vector, accounting for both stack and heap allocated data
pub len: usize,
/// Overflow storage, if any: the element at index `N + k` of the vector lives at
/// index `k` of this `Vec`.
pub extra_data: Option<Vec<T>>,
}
impl<T: Default + Copy> SmallVec<T> {
impl<T: Default + Copy, const N: usize> SmallVec<T, N> {
pub fn new() -> Self {
Self {
data: unsafe { std::mem::MaybeUninit::uninit().assume_init() },
@@ -1084,6 +1087,50 @@ impl<T: Default + Copy> SmallVec<T> {
self.len += 1;
}
}
/// Returns a copy of the element at vector index `index` from the heap overflow storage.
///
/// `index` is an index into the whole vector; it is translated to an index into
/// `extra_data` by subtracting the inline capacity (`self.data.len()`).
///
/// # Panics
///
/// Panics if no heap storage exists, if `index` refers to the inline storage, or if
/// the translated index is past the end of the heap storage.
fn get_from_heap(&self, index: usize) -> T {
    // Borrow the heap storage once instead of checking `is_some()` and unwrapping later.
    let extra_data = self
        .extra_data
        .as_ref()
        .expect("SmallVec::get_from_heap called without heap storage");
    // `checked_sub` yields `None` exactly when `index < self.data.len()`.
    let extra_data_index = index
        .checked_sub(self.data.len())
        .expect("SmallVec::get_from_heap called with an index that lives in inline storage");
    assert!(
        extra_data_index < extra_data.len(),
        "SmallVec heap index {} out of bounds (heap len {})",
        extra_data_index,
        extra_data.len()
    );
    extra_data[extra_data_index]
}
/// Returns a copy of the element at `index`, or `None` if `index` is not below `len`.
///
/// Indices below the inline capacity are read from `data`; all others are read
/// from the heap storage via `get_from_heap`.
pub fn get(&self, index: usize) -> Option<T> {
    if index >= self.len {
        return None;
    }
    let value = match self.data.get(index) {
        // SAFETY: `index < self.len` was checked above, and slots below `len` are
        // the ones that have been written, so this slot is initialized.
        Some(slot) => unsafe { (*slot).assume_init() },
        // `index` is at or beyond the inline capacity.
        None => self.get_from_heap(index),
    };
    Some(value)
}
}
impl<T: Default + Copy, const N: usize> SmallVec<T, N> {
    /// Returns an iterator over this vector that starts at index 0.
    pub fn iter(&self) -> SmallVecIter<'_, T, N> {
        let start = 0;
        SmallVecIter {
            pos: start,
            vec: self,
        }
    }
}
/// Iterator over a borrowed [`SmallVec`], tracking the index of the next element to read.
pub struct SmallVecIter<'a, T, const N: usize> {
/// The vector being iterated.
vec: &'a SmallVec<T, N>,
/// Index of the next element to yield.
pos: usize,
}
impl<'a, T: Default + Copy, const N: usize> Iterator for SmallVecIter<'a, T, N> {
    type Item = T;

    /// Yields a copy of the element at the current position and advances by one.
    ///
    /// Returns `None`, without advancing, once `get` reports the position as out of bounds.
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.vec.get(self.pos)?;
        self.pos += 1;
        Some(item)
    }
}
pub fn read_record(payload: &[u8], reuse_immutable: &mut ImmutableRecord) -> Result<()> {
@@ -1099,7 +1146,7 @@ pub fn read_record(payload: &[u8], reuse_immutable: &mut ImmutableRecord) -> Res
let mut header_size = (header_size as usize) - nr;
pos += nr;
let mut serial_types = SmallVec::new();
let mut serial_types = SmallVec::<u64, 64>::new();
while header_size > 0 {
let (serial_type, nr) = read_varint(&reuse_immutable.get_payload()[pos..])?;
let serial_type = validate_serial_type(serial_type)?;
@@ -1685,4 +1732,33 @@ mod tests {
let result = validate_serial_type(10);
assert!(result.is_err());
}
/// Iterating a `SmallVec` holding more elements than its inline capacity of 4
/// yields every pushed value in insertion order and then `None`.
#[test]
fn test_smallvec_iter() {
    let mut small_vec = SmallVec::<i32, 4>::new();
    for value in 0..8 {
        small_vec.push(value);
    }

    let mut iter = small_vec.iter();
    for expected in 0..8 {
        assert_eq!(iter.next(), Some(expected));
    }
    assert_eq!(iter.next(), None);
}
/// `get` returns each pushed value at its index, for indices both below and at or
/// above the inline capacity of 4, and `None` one past the end.
#[test]
fn test_smallvec_get() {
    let mut small_vec = SmallVec::<i32, 4>::new();
    for value in 0..8 {
        small_vec.push(value);
    }

    for index in 0..8usize {
        assert_eq!(small_vec.get(index), Some(index as i32));
    }
    assert_eq!(small_vec.get(8), None);
}
}

View File

@@ -4350,12 +4350,13 @@ pub fn op_open_ephemeral(
pager: &Rc<Pager>,
mv_store: Option<&Rc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
let Insn::OpenEphemeral {
cursor_id,
is_table,
} = insn
else {
unreachable!("unexpected Insn {:?}", insn)
let (cursor_id, is_table) = match insn {
Insn::OpenEphemeral {
cursor_id,
is_table,
} => (*cursor_id, *is_table),
Insn::OpenAutoindex { cursor_id } => (*cursor_id, false),
_ => unreachable!("unexpected Insn {:?}", insn),
};
let conn = program.connection.upgrade().unwrap();
@@ -4378,7 +4379,7 @@ pub fn op_open_ephemeral(
buffer_pool,
)?);
let flag = if *is_table {
let flag = if is_table {
&CreateBTreeFlags::new_table()
} else {
&CreateBTreeFlags::new_index()
@@ -4386,7 +4387,7 @@ pub fn op_open_ephemeral(
let root_page = pager.btree_create(flag);
let (_, cursor_type) = program.cursor_ref.get(*cursor_id).unwrap();
let (_, cursor_type) = program.cursor_ref.get(cursor_id).unwrap();
let mv_cursor = match state.mv_tx_id {
Some(tx_id) => {
let table_id = root_page as u64;
@@ -4407,13 +4408,13 @@ pub fn op_open_ephemeral(
match cursor_type {
CursorType::BTreeTable(_) => {
cursors
.get_mut(*cursor_id)
.get_mut(cursor_id)
.unwrap()
.replace(Cursor::new_btree(cursor));
}
CursorType::BTreeIndex(_) => {
cursors
.get_mut(*cursor_id)
.get_mut(cursor_id)
.unwrap()
.replace(Cursor::new_btree(cursor));
}
@@ -4432,6 +4433,34 @@ pub fn op_open_ephemeral(
Ok(InsnFunctionStepResult::Step)
}
/// Execute the [Insn::Once] instruction.
///
/// The first time the instruction at the current program counter runs, its program
/// counter is recorded in `state.once` and execution falls through to the next
/// instruction. If the same instruction runs again, execution jumps to
/// `target_pc_when_reentered` instead.
///
/// `program`, `pager` and `mv_store` are not used by this instruction.
///
/// Panics if `insn` is not [Insn::Once] or if the jump target is not an offset.
pub fn op_once(
    program: &Program,
    state: &mut ProgramState,
    insn: &Insn,
    pager: &Rc<Pager>,
    mv_store: Option<&Rc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
    let target_pc_when_reentered = match insn {
        Insn::Once {
            target_pc_when_reentered,
        } => target_pc_when_reentered,
        _ => unreachable!("unexpected Insn: {:?}", insn),
    };
    assert!(target_pc_when_reentered.is_offset());

    let current_pc = state.pc;
    let already_executed = state.once.iter().any(|seen_pc| seen_pc == current_pc);
    if already_executed {
        // Re-entry: skip the guarded block.
        state.pc = target_pc_when_reentered.to_offset_int();
    } else {
        // First visit: remember this instruction and fall through.
        state.once.push(current_pc);
        state.pc += 1;
    }
    Ok(InsnFunctionStepResult::Step)
}
fn exec_lower(reg: &OwnedValue) -> Option<OwnedValue> {
match reg {
OwnedValue::Text(t) => Some(OwnedValue::build_text(&t.as_str().to_lowercase())),

View File

@@ -1321,6 +1321,26 @@ pub fn insn_to_str(
if *is_table { "true" } else { "false" }
),
),
Insn::OpenAutoindex { cursor_id } => (
"OpenAutoindex",
*cursor_id as i32,
0,
0,
OwnedValue::build_text(""),
0,
format!("cursor={}", cursor_id),
),
Insn::Once {
target_pc_when_reentered,
} => (
"Once",
target_pc_when_reentered.to_debug_int(),
0,
0,
OwnedValue::build_text(""),
0,
format!("goto {}", target_pc_when_reentered.to_debug_int()),
),
};
format!(
"{:<4} {:<17} {:<4} {:<4} {:<4} {:<13} {:<2} {}",

View File

@@ -780,6 +780,14 @@ pub enum Insn {
cursor_id: usize,
is_table: bool,
},
/// Works the same as OpenEphemeral, name just distinguishes its use; used for transient indexes in joins.
OpenAutoindex {
cursor_id: usize,
},
/// Fall through to the next instruction on the first invocation; on every later invocation, jump to `target_pc_when_reentered`.
Once {
target_pc_when_reentered: BranchOffset,
},
}
impl Insn {
@@ -888,7 +896,8 @@ impl Insn {
Insn::Noop => execute::op_noop,
Insn::PageCount { .. } => execute::op_page_count,
Insn::ReadCookie { .. } => execute::op_read_cookie,
Insn::OpenEphemeral { .. } => execute::op_open_ephemeral,
Insn::OpenEphemeral { .. } | Insn::OpenAutoindex { .. } => execute::op_open_ephemeral,
Insn::Once { .. } => execute::op_once,
}
}
}

View File

@@ -28,6 +28,7 @@ use crate::{
error::LimboError,
fast_lock::SpinLock,
function::{AggFunc, FuncCtx},
storage::sqlite3_ondisk::SmallVec,
};
use crate::{
@@ -232,6 +233,8 @@ pub struct ProgramState {
last_compare: Option<std::cmp::Ordering>,
deferred_seek: Option<(CursorID, CursorID)>,
ended_coroutine: Bitfield<4>, // flag to indicate that a coroutine has ended (key is the yield register. currently we assume that the yield register is always between 0-255, YOLO)
/// Program counter positions of the [Insn::Once] instructions that have already been executed once.
once: SmallVec<u32, 4>,
regex_cache: RegexCache,
pub(crate) mv_tx_id: Option<crate::mvcc::database::TxID>,
interrupted: bool,
@@ -254,6 +257,7 @@ impl ProgramState {
last_compare: None,
deferred_seek: None,
ended_coroutine: Bitfield::new(),
once: SmallVec::<u32, 4>::new(),
regex_cache: RegexCache::new(),
mv_tx_id: None,
interrupted: false,