Files
turso/core/vdbe/mod.rs
Jussi Saurio 5632a6046e Merge 'Fix: allow DeferredSeek on more than one cursor per program' from Jussi Saurio
Found while fuzzing nested subqueries. Since subqueries result in nested
plans, it quickly revealed that there can be multiple `DeferredSeek`
instructions issued for different cursors, but our `ProgramState` only
supported one at a time.

Closes #1610
2025-05-30 09:39:23 +03:00

703 lines
23 KiB
Rust

//! The virtual database engine (VDBE).
//!
//! The VDBE is a register-based virtual machine that execute bytecode
//! instructions that represent SQL statements. When an application prepares
//! an SQL statement, the statement is compiled into a sequence of bytecode
//! instructions that perform the needed operations, such as reading or
//! writing to a b-tree, sorting, or aggregating data.
//!
//! The instruction set of the VDBE is similar to SQLite's instruction set,
//! but with the exception that bytecodes that perform I/O operations are
//! return execution back to the caller instead of blocking. This is because
//! Limbo is designed for applications that need high concurrency such as
//! serverless runtimes. In addition, asynchronous I/O makes storage
//! disaggregation easier.
//!
//! You can find a full list of SQLite opcodes at:
//!
//! https://www.sqlite.org/opcode.html
pub mod builder;
pub mod execute;
pub mod explain;
pub mod insn;
pub mod likeop;
pub mod sorter;
use crate::{
error::LimboError,
fast_lock::SpinLock,
function::{AggFunc, FuncCtx},
storage::{pager::PagerCacheflushStatus, sqlite3_ondisk::SmallVec},
translate::plan::TableReferences,
};
use crate::{
storage::{btree::BTreeCursor, pager::Pager, sqlite3_ondisk::DatabaseHeader},
translate::plan::ResultSetColumn,
types::{AggContext, Cursor, CursorResult, ImmutableRecord, SeekKey, SeekOp, Value},
vdbe::{builder::CursorType, insn::Insn},
};
#[cfg(feature = "json")]
use crate::json::JsonCacheCell;
use crate::{Connection, MvStore, Result, TransactionState};
use builder::CursorKey;
use execute::{InsnFunction, InsnFunctionStepResult, OpIdxDeleteState};
use rand::{
distributions::{Distribution, Uniform},
Rng,
};
use regex::Regex;
use std::{
cell::{Cell, RefCell},
collections::HashMap,
ffi::c_void,
num::NonZero,
ops::Deref,
rc::{Rc, Weak},
sync::Arc,
};
/// We use labels to indicate that we want to jump to whatever the instruction offset
/// will be at runtime, because the offset cannot always be determined when the jump
/// instruction is created.
///
/// In some cases, we want to jump to EXACTLY a specific instruction.
/// - Example: a condition is not met, so we want to jump to wherever Halt is.
/// In other cases, we don't care what the exact instruction is, but we know that we
/// want to jump to whatever comes AFTER a certain instruction.
/// - Example: a Next instruction will want to jump to "whatever the start of the loop is",
/// but it doesn't care what instruction that is.
///
/// The reason this distinction is important is that we might reorder instructions that are
/// constant at compile time, and when we do that, we need to change the offsets of any impacted
/// jump instructions, so the instruction that comes immediately after "next Insn" might have changed during the reordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JumpTarget {
ExactlyThisInsn,
AfterThisInsn,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Represents a target for a jump instruction.
/// Stores 32-bit ints to keep the enum word-sized.
pub enum BranchOffset {
/// A label is a named location in the program.
/// If there are references to it, it must always be resolved to an Offset
/// via program.resolve_label().
Label(u32),
/// An offset is a direct index into the instruction list.
Offset(InsnReference),
/// A placeholder is a temporary value to satisfy the compiler.
/// It must be set later.
Placeholder,
}
impl BranchOffset {
/// Returns true if the branch offset is a label.
pub fn is_label(&self) -> bool {
matches!(self, BranchOffset::Label(_))
}
/// Returns true if the branch offset is an offset.
pub fn is_offset(&self) -> bool {
matches!(self, BranchOffset::Offset(_))
}
/// Returns the offset value. Panics if the branch offset is a label or placeholder.
pub fn to_offset_int(&self) -> InsnReference {
match self {
BranchOffset::Label(v) => unreachable!("Unresolved label: {}", v),
BranchOffset::Offset(v) => *v,
BranchOffset::Placeholder => unreachable!("Unresolved placeholder"),
}
}
/// Returns the branch offset as a signed integer.
/// Used in explain output, where we don't want to panic in case we have an unresolved
/// label or placeholder.
pub fn to_debug_int(&self) -> i32 {
match self {
BranchOffset::Label(v) => *v as i32,
BranchOffset::Offset(v) => *v as i32,
BranchOffset::Placeholder => i32::MAX,
}
}
/// Adds an integer value to the branch offset.
/// Returns a new branch offset.
/// Panics if the branch offset is a label or placeholder.
pub fn add<N: Into<u32>>(self, n: N) -> BranchOffset {
BranchOffset::Offset(self.to_offset_int() + n.into())
}
pub fn sub<N: Into<u32>>(self, n: N) -> BranchOffset {
BranchOffset::Offset(self.to_offset_int() - n.into())
}
}
pub type CursorID = usize;
pub type PageIdx = usize;
// Index of insn in list of insns
type InsnReference = u32;
#[derive(Debug)]
pub enum StepResult {
Done,
IO,
Row,
Interrupt,
Busy,
}
/// If there is I/O, the instruction is restarted.
/// Evaluate a Result<CursorResult<T>>, if IO return Ok(StepResult::IO).
#[macro_export]
macro_rules! return_if_io {
($expr:expr) => {
match $expr? {
CursorResult::Ok(v) => v,
CursorResult::IO => return Ok(StepResult::IO),
}
};
}
struct RegexCache {
like: HashMap<String, Regex>,
glob: HashMap<String, Regex>,
}
impl RegexCache {
fn new() -> Self {
Self {
like: HashMap::new(),
glob: HashMap::new(),
}
}
}
struct Bitfield<const N: usize>([u64; N]);
impl<const N: usize> Bitfield<N> {
fn new() -> Self {
Self([0; N])
}
fn set(&mut self, bit: usize) {
assert!(bit < N * 64, "bit out of bounds");
self.0[bit / 64] |= 1 << (bit % 64);
}
fn unset(&mut self, bit: usize) {
assert!(bit < N * 64, "bit out of bounds");
self.0[bit / 64] &= !(1 << (bit % 64));
}
fn get(&self, bit: usize) -> bool {
assert!(bit < N * 64, "bit out of bounds");
(self.0[bit / 64] & (1 << (bit % 64))) != 0
}
}
type VTabOpaqueCursorCloseFn = unsafe extern "C" fn(*const c_void) -> limbo_ext::ResultCode;
pub struct VTabOpaqueCursor {
cursor: *const c_void,
close: VTabOpaqueCursorCloseFn,
}
impl VTabOpaqueCursor {
pub fn new(cursor: *const c_void, close: VTabOpaqueCursorCloseFn) -> Result<Self> {
if cursor.is_null() {
return Err(LimboError::InternalError(
"VTabOpaqueCursor: cursor is null".into(),
));
}
Ok(Self { cursor, close })
}
pub fn as_ptr(&self) -> *const c_void {
self.cursor
}
}
impl Drop for VTabOpaqueCursor {
fn drop(&mut self) {
let result = unsafe { (self.close)(self.cursor) };
if !result.is_ok() {
tracing::error!("Failed to close virtual table cursor");
}
}
}
#[derive(Copy, Clone, PartialEq, Eq)]
/// The commit state of the program.
/// There are two states:
/// - Ready: The program is ready to run the next instruction, or has shut down after
/// the last instruction.
/// - Committing: The program is committing a write transaction. It is waiting for the pager to finish flushing the cache to disk,
/// primarily to the WAL, but also possibly checkpointing the WAL to the database file.
enum CommitState {
Ready,
Committing,
}
#[derive(Debug, Clone)]
pub enum Register {
Value(Value),
Aggregate(AggContext),
Record(ImmutableRecord),
}
/// A row is a the list of registers that hold the values for a filtered row. This row is a pointer, therefore
/// after stepping again, row will be invalidated to be sure it doesn't point to somewhere unexpected.
#[derive(Debug)]
pub struct Row {
values: *const Register,
count: usize,
}
/// The program state describes the environment in which the program executes.
pub struct ProgramState {
pub pc: InsnReference,
cursors: RefCell<Vec<Option<Cursor>>>,
registers: Vec<Register>,
pub(crate) result_row: Option<Row>,
last_compare: Option<std::cmp::Ordering>,
deferred_seeks: Vec<Option<(CursorID, CursorID)>>,
ended_coroutine: Bitfield<4>, // flag to indicate that a coroutine has ended (key is the yield register. currently we assume that the yield register is always between 0-255, YOLO)
/// Indicate whether an [Insn::Once] instruction at a given program counter position has already been executed, well, once.
once: SmallVec<u32, 4>,
regex_cache: RegexCache,
pub(crate) mv_tx_id: Option<crate::mvcc::database::TxID>,
interrupted: bool,
parameters: HashMap<NonZero<usize>, Value>,
commit_state: CommitState,
#[cfg(feature = "json")]
json_cache: JsonCacheCell,
op_idx_delete_state: Option<OpIdxDeleteState>,
}
impl ProgramState {
pub fn new(max_registers: usize, max_cursors: usize) -> Self {
let cursors: RefCell<Vec<Option<Cursor>>> =
RefCell::new((0..max_cursors).map(|_| None).collect());
let registers = vec![Register::Value(Value::Null); max_registers];
Self {
pc: 0,
cursors,
registers,
result_row: None,
last_compare: None,
deferred_seeks: vec![None; max_cursors],
ended_coroutine: Bitfield::new(),
once: SmallVec::<u32, 4>::new(),
regex_cache: RegexCache::new(),
mv_tx_id: None,
interrupted: false,
parameters: HashMap::new(),
commit_state: CommitState::Ready,
#[cfg(feature = "json")]
json_cache: JsonCacheCell::new(),
op_idx_delete_state: None,
}
}
pub fn column_count(&self) -> usize {
self.registers.len()
}
pub fn column(&self, i: usize) -> Option<String> {
Some(format!("{:?}", self.registers[i]))
}
pub fn interrupt(&mut self) {
self.interrupted = true;
}
pub fn is_interrupted(&self) -> bool {
self.interrupted
}
pub fn bind_at(&mut self, index: NonZero<usize>, value: Value) {
self.parameters.insert(index, value);
}
pub fn get_parameter(&self, index: NonZero<usize>) -> Value {
self.parameters.get(&index).cloned().unwrap_or(Value::Null)
}
pub fn reset(&mut self) {
self.pc = 0;
self.cursors.borrow_mut().iter_mut().for_each(|c| *c = None);
self.registers
.iter_mut()
.for_each(|r| *r = Register::Value(Value::Null));
self.last_compare = None;
self.deferred_seeks.iter_mut().for_each(|s| *s = None);
self.ended_coroutine.0 = [0; 4];
self.regex_cache.like.clear();
self.interrupted = false;
self.parameters.clear();
#[cfg(feature = "json")]
self.json_cache.clear()
}
pub fn get_cursor<'a>(&'a self, cursor_id: CursorID) -> std::cell::RefMut<'a, Cursor> {
let cursors = self.cursors.borrow_mut();
std::cell::RefMut::map(cursors, |c| {
c.get_mut(cursor_id)
.unwrap_or_else(|| panic!("cursor id {} out of bounds", cursor_id))
.as_mut()
.unwrap_or_else(|| panic!("cursor id {} is None", cursor_id))
})
}
}
impl Register {
pub fn get_owned_value(&self) -> &Value {
match self {
Register::Value(v) => v,
_ => unreachable!(),
}
}
}
#[macro_export]
macro_rules! must_be_btree_cursor {
($cursor_id:expr, $cursor_ref:expr, $state:expr, $insn_name:expr) => {{
let (_, cursor_type) = $cursor_ref.get($cursor_id).unwrap();
let cursor = match cursor_type {
CursorType::BTreeTable(_) => $state.get_cursor($cursor_id),
CursorType::BTreeIndex(_) => $state.get_cursor($cursor_id),
CursorType::Pseudo(_) => panic!("{} on pseudo cursor", $insn_name),
CursorType::Sorter => panic!("{} on sorter cursor", $insn_name),
CursorType::VirtualTable(_) => panic!("{} on virtual table cursor", $insn_name),
};
cursor
}};
}
#[derive(Debug)]
pub struct Program {
pub max_registers: usize,
pub insns: Vec<(Insn, InsnFunction)>,
pub cursor_ref: Vec<(Option<CursorKey>, CursorType)>,
pub database_header: Arc<SpinLock<DatabaseHeader>>,
pub comments: Option<Vec<(InsnReference, &'static str)>>,
pub parameters: crate::parameters::Parameters,
pub connection: Weak<Connection>,
pub n_change: Cell<i64>,
pub change_cnt_on: bool,
pub result_columns: Vec<ResultSetColumn>,
pub table_references: TableReferences,
}
impl Program {
pub fn step(
&self,
state: &mut ProgramState,
mv_store: Option<Rc<MvStore>>,
pager: Rc<Pager>,
) -> Result<StepResult> {
loop {
if state.is_interrupted() {
return Ok(StepResult::Interrupt);
}
// invalidate row
let _ = state.result_row.take();
let (insn, insn_function) = &self.insns[state.pc as usize];
trace_insn(self, state.pc as InsnReference, insn);
let res = insn_function(self, state, insn, &pager, mv_store.as_ref())?;
match res {
InsnFunctionStepResult::Step => {}
InsnFunctionStepResult::Done => return Ok(StepResult::Done),
InsnFunctionStepResult::IO => return Ok(StepResult::IO),
InsnFunctionStepResult::Row => return Ok(StepResult::Row),
InsnFunctionStepResult::Interrupt => return Ok(StepResult::Interrupt),
InsnFunctionStepResult::Busy => return Ok(StepResult::Busy),
}
}
}
pub fn commit_txn(
&self,
pager: Rc<Pager>,
program_state: &mut ProgramState,
mv_store: Option<&Rc<MvStore>>,
) -> Result<StepResult> {
if let Some(mv_store) = mv_store {
let conn = self.connection.upgrade().unwrap();
let auto_commit = conn.auto_commit.get();
if auto_commit {
let mut mv_transactions = conn.mv_transactions.borrow_mut();
for tx_id in mv_transactions.iter() {
mv_store.commit_tx(*tx_id).unwrap();
}
mv_transactions.clear();
}
Ok(StepResult::Done)
} else {
let connection = self
.connection
.upgrade()
.expect("only weak ref to connection?");
let auto_commit = connection.auto_commit.get();
tracing::trace!("Halt auto_commit {}", auto_commit);
if program_state.commit_state == CommitState::Committing {
self.step_end_write_txn(&pager, &mut program_state.commit_state, connection.deref())
} else if auto_commit {
let current_state = connection.transaction_state.get();
match current_state {
TransactionState::Write => self.step_end_write_txn(
&pager,
&mut program_state.commit_state,
connection.deref(),
),
TransactionState::Read => {
connection.transaction_state.replace(TransactionState::None);
pager.end_read_tx()?;
Ok(StepResult::Done)
}
TransactionState::None => Ok(StepResult::Done),
}
} else {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
}
}
Ok(StepResult::Done)
}
}
}
fn step_end_write_txn(
&self,
pager: &Rc<Pager>,
commit_state: &mut CommitState,
connection: &Connection,
) -> Result<StepResult> {
let cacheflush_status = pager.end_tx()?;
match cacheflush_status {
PagerCacheflushStatus::Done(_) => {
if self.change_cnt_on {
if let Some(conn) = self.connection.upgrade() {
conn.set_changes(self.n_change.get());
}
}
connection.transaction_state.replace(TransactionState::None);
*commit_state = CommitState::Ready;
}
PagerCacheflushStatus::IO => {
tracing::trace!("Cacheflush IO");
*commit_state = CommitState::Committing;
return Ok(StepResult::IO);
}
}
Ok(StepResult::Done)
}
#[rustfmt::skip]
pub fn explain(&self) -> String {
let mut buff = String::with_capacity(1024);
buff.push_str("addr opcode p1 p2 p3 p4 p5 comment\n");
buff.push_str("---- ----------------- ---- ---- ---- ------------- -- -------\n");
let mut indent_count: usize = 0;
let indent = " ";
let mut prev_insn: Option<&Insn> = None;
for (addr, (insn, _)) in self.insns.iter().enumerate() {
indent_count = get_indent_count(indent_count, insn, prev_insn);
print_insn(
self,
addr as InsnReference,
insn,
indent.repeat(indent_count),
&mut buff,
);
buff.push('\n');
prev_insn = Some(insn);
}
buff
}
}
fn get_new_rowid<R: Rng>(cursor: &mut BTreeCursor, mut rng: R) -> Result<CursorResult<i64>> {
match cursor.seek_to_last()? {
CursorResult::Ok(()) => {}
CursorResult::IO => return Ok(CursorResult::IO),
}
let mut rowid = cursor
.rowid()?
.unwrap_or(0) // if BTree is empty - use 0 as initial value for rowid
.checked_add(1) // add 1 but be careful with overflows
.unwrap_or(u64::MAX); // in case of overflow - use u64::MAX
if rowid > i64::MAX.try_into().unwrap() {
let distribution = Uniform::from(1..=i64::MAX);
let max_attempts = 100;
for count in 0..max_attempts {
rowid = distribution.sample(&mut rng).try_into().unwrap();
match cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ)? {
CursorResult::Ok(false) => break, // Found a non-existing rowid
CursorResult::Ok(true) => {
if count == max_attempts - 1 {
return Err(LimboError::InternalError(
"Failed to generate a new rowid".to_string(),
));
} else {
continue; // Try next random rowid
}
}
CursorResult::IO => return Ok(CursorResult::IO),
}
}
}
Ok(CursorResult::Ok(rowid.try_into().unwrap()))
}
fn make_record(registers: &[Register], start_reg: &usize, count: &usize) -> ImmutableRecord {
ImmutableRecord::from_registers(&registers[*start_reg..*start_reg + *count])
}
fn trace_insn(program: &Program, addr: InsnReference, insn: &Insn) {
if !tracing::enabled!(tracing::Level::TRACE) {
return;
}
tracing::trace!(
"{}",
explain::insn_to_str(
program,
addr,
insn,
String::new(),
program.comments.as_ref().and_then(|comments| comments
.iter()
.find(|(offset, _)| *offset == addr)
.map(|(_, comment)| comment)
.copied())
)
);
}
fn print_insn(program: &Program, addr: InsnReference, insn: &Insn, indent: String, w: &mut String) {
let s = explain::insn_to_str(
program,
addr,
insn,
indent,
program.comments.as_ref().and_then(|comments| {
comments
.iter()
.find(|(offset, _)| *offset == addr)
.map(|(_, comment)| comment)
.copied()
}),
);
w.push_str(&s);
}
fn get_indent_count(indent_count: usize, curr_insn: &Insn, prev_insn: Option<&Insn>) -> usize {
let indent_count = if let Some(insn) = prev_insn {
match insn {
Insn::Rewind { .. }
| Insn::Last { .. }
| Insn::SorterSort { .. }
| Insn::SeekGE { .. }
| Insn::SeekGT { .. }
| Insn::SeekLE { .. }
| Insn::SeekLT { .. } => indent_count + 1,
_ => indent_count,
}
} else {
indent_count
};
match curr_insn {
Insn::Next { .. } | Insn::SorterNext { .. } | Insn::Prev { .. } => indent_count - 1,
_ => indent_count,
}
}
pub trait FromValueRow<'a> {
fn from_value(value: &'a Value) -> Result<Self>
where
Self: Sized + 'a;
}
impl<'a> FromValueRow<'a> for i64 {
fn from_value(value: &'a Value) -> Result<Self> {
match value {
Value::Integer(i) => Ok(*i),
_ => Err(LimboError::ConversionError("Expected integer value".into())),
}
}
}
impl<'a> FromValueRow<'a> for f64 {
fn from_value(value: &'a Value) -> Result<Self> {
match value {
Value::Float(f) => Ok(*f),
_ => Err(LimboError::ConversionError("Expected integer value".into())),
}
}
}
impl<'a> FromValueRow<'a> for String {
fn from_value(value: &'a Value) -> Result<Self> {
match value {
Value::Text(s) => Ok(s.as_str().to_string()),
_ => Err(LimboError::ConversionError("Expected text value".into())),
}
}
}
impl<'a> FromValueRow<'a> for &'a str {
fn from_value(value: &'a Value) -> Result<Self> {
match value {
Value::Text(s) => Ok(s.as_str()),
_ => Err(LimboError::ConversionError("Expected text value".into())),
}
}
}
impl<'a> FromValueRow<'a> for &'a Value {
fn from_value(value: &'a Value) -> Result<Self> {
Ok(value)
}
}
impl Row {
pub fn get<'a, T: FromValueRow<'a> + 'a>(&'a self, idx: usize) -> Result<T> {
let value = unsafe { self.values.add(idx).as_ref().unwrap() };
let value = match value {
Register::Value(owned_value) => owned_value,
_ => unreachable!("a row should be formed of values only"),
};
T::from_value(value)
}
pub fn get_value<'a>(&'a self, idx: usize) -> &'a Value {
let value = unsafe { self.values.add(idx).as_ref().unwrap() };
match value {
Register::Value(owned_value) => owned_value,
_ => unreachable!("a row should be formed of values only"),
}
}
pub fn get_values(&self) -> impl Iterator<Item = &Value> {
let values = unsafe { std::slice::from_raw_parts(self.values, self.count) };
// This should be ownedvalues
// TODO: add check for this
values.iter().map(|v| v.get_owned_value())
}
pub fn len(&self) -> usize {
self.count
}
}