core/vdbe: Replace get_btree_{table,index}_cursor() calls with get_cursor()

This commit is contained in:
Pekka Enberg
2025-03-04 14:35:57 +02:00
parent cdcaebb878
commit f3ee86d784

View File

@@ -288,28 +288,6 @@ impl ProgramState {
self.parameters.clear();
}
pub fn get_btree_table_cursor<'a>(&'a self, cursor_id: CursorID) -> &'a mut BTreeCursor {
let mut cursors = self.cursors.borrow_mut();
let cursor = cursors
.get_mut(cursor_id)
.expect("cursor id out of bounds")
.as_mut()
.expect("cursor not allocated")
.as_btree_mut();
unsafe { std::mem::transmute(cursor) }
}
pub fn get_btree_index_cursor<'a>(&'a self, cursor_id: CursorID) -> &'a mut BTreeCursor {
let mut cursors = self.cursors.borrow_mut();
let cursor = cursors
.get_mut(cursor_id)
.expect("cursor id out of bounds")
.as_mut()
.expect("cursor not allocated")
.as_btree_mut();
unsafe { std::mem::transmute(cursor) }
}
pub fn get_cursor<'a>(&'a self, cursor_id: CursorID) -> std::cell::RefMut<'a, Cursor> {
let cursors = self.cursors.borrow_mut();
std::cell::RefMut::map(cursors, |c| {
@@ -325,8 +303,8 @@ macro_rules! must_be_btree_cursor {
($cursor_id:expr, $cursor_ref:expr, $state:expr, $insn_name:expr) => {{
let (_, cursor_type) = $cursor_ref.get($cursor_id).unwrap();
let cursor = match cursor_type {
CursorType::BTreeTable(_) => $state.get_btree_table_cursor($cursor_id),
CursorType::BTreeIndex(_) => $state.get_btree_index_cursor($cursor_id),
CursorType::BTreeTable(_) => $state.get_cursor($cursor_id),
CursorType::BTreeIndex(_) => $state.get_cursor($cursor_id),
CursorType::Pseudo(_) => panic!("{} on pseudo cursor", $insn_name),
CursorType::Sorter => panic!("{} on sorter cursor", $insn_name),
CursorType::VirtualTable(_) => panic!("{} on virtual table cursor", $insn_name),
@@ -456,9 +434,12 @@ impl Program {
state.pc += 1;
}
Insn::NullRow { cursor_id } => {
let cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "NullRow");
cursor.set_null_flag(true);
{
let mut cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "NullRow");
let cursor = cursor.as_btree_mut();
cursor.set_null_flag(true);
}
state.pc += 1;
}
Insn::Compare {
@@ -968,24 +949,36 @@ impl Program {
content_reg: _,
num_fields: _,
} => {
let mut cursors = state.cursors.borrow_mut();
let cursor = PseudoCursor::new();
cursors
.get_mut(*cursor_id)
.unwrap()
.replace(Cursor::new_pseudo(cursor));
{
let mut cursors = state.cursors.borrow_mut();
let cursor = PseudoCursor::new();
cursors
.get_mut(*cursor_id)
.unwrap()
.replace(Cursor::new_pseudo(cursor));
}
state.pc += 1;
}
Insn::RewindAsync { cursor_id } => {
let cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "RewindAsync");
return_if_io!(cursor.rewind());
{
let mut cursor = must_be_btree_cursor!(
*cursor_id,
self.cursor_ref,
state,
"RewindAsync"
);
let cursor = cursor.as_btree_mut();
return_if_io!(cursor.rewind());
}
state.pc += 1;
}
Insn::LastAsync { cursor_id } => {
let cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "LastAsync");
return_if_io!(cursor.last());
{
let mut cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "LastAsync");
let cursor = cursor.as_btree_mut();
return_if_io!(cursor.last());
}
state.pc += 1;
}
Insn::LastAwait {
@@ -993,10 +986,14 @@ impl Program {
pc_if_empty,
} => {
assert!(pc_if_empty.is_offset());
let cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "LastAwait");
cursor.wait_for_completion()?;
if cursor.is_empty() {
let is_empty = {
let mut cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "LastAwait");
let cursor = cursor.as_btree_mut();
cursor.wait_for_completion()?;
cursor.is_empty()
};
if is_empty {
state.pc = pc_if_empty.to_offset_int();
} else {
state.pc += 1;
@@ -1007,10 +1004,18 @@ impl Program {
pc_if_empty,
} => {
assert!(pc_if_empty.is_offset());
let cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "RewindAwait");
cursor.wait_for_completion()?;
if cursor.is_empty() {
let is_empty = {
let mut cursor = must_be_btree_cursor!(
*cursor_id,
self.cursor_ref,
state,
"RewindAwait"
);
let cursor = cursor.as_btree_mut();
cursor.wait_for_completion()?;
cursor.is_empty()
};
if is_empty {
state.pc = pc_if_empty.to_offset_int();
} else {
state.pc += 1;
@@ -1022,27 +1027,38 @@ impl Program {
dest,
} => {
if let Some((index_cursor_id, table_cursor_id)) = state.deferred_seek.take() {
let index_cursor = state.get_btree_index_cursor(index_cursor_id);
let rowid = index_cursor.rowid()?;
let table_cursor = state.get_btree_table_cursor(table_cursor_id);
match table_cursor.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)? {
CursorResult::Ok(_) => {}
CursorResult::IO => {
state.deferred_seek = Some((index_cursor_id, table_cursor_id));
return Ok(StepResult::IO);
let deferred_seek = {
let rowid = {
let mut index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
let rowid = index_cursor.rowid()?;
rowid
};
let mut table_cursor = state.get_cursor(table_cursor_id);
let table_cursor = table_cursor.as_btree_mut();
match table_cursor
.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)?
{
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
}
};
if let Some(deferred_seek) = deferred_seek {
state.deferred_seek = Some(deferred_seek);
return Ok(StepResult::IO);
}
}
let (_, cursor_type) = self.cursor_ref.get(*cursor_id).unwrap();
match cursor_type {
CursorType::BTreeTable(_) | CursorType::BTreeIndex(_) => {
let value = {
let cursor = must_be_btree_cursor!(
let mut cursor = must_be_btree_cursor!(
*cursor_id,
self.cursor_ref,
state,
"Column"
);
let cursor = cursor.as_btree_mut();
let record = cursor.record();
if let Some(record) = record.as_ref() {
if cursor.get_null_flag() {
@@ -1105,17 +1121,23 @@ impl Program {
return Ok(StepResult::Row);
}
Insn::NextAsync { cursor_id } => {
let cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "NextAsync");
cursor.set_null_flag(false);
return_if_io!(cursor.next());
{
let mut cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "NextAsync");
let cursor = cursor.as_btree_mut();
cursor.set_null_flag(false);
return_if_io!(cursor.next());
}
state.pc += 1;
}
Insn::PrevAsync { cursor_id } => {
let cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "PrevAsync");
cursor.set_null_flag(false);
return_if_io!(cursor.prev());
{
let mut cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "PrevAsync");
let cursor = cursor.as_btree_mut();
cursor.set_null_flag(false);
return_if_io!(cursor.prev());
}
state.pc += 1;
}
Insn::PrevAwait {
@@ -1123,10 +1145,14 @@ impl Program {
pc_if_next,
} => {
assert!(pc_if_next.is_offset());
let cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "PrevAwait");
cursor.wait_for_completion()?;
if !cursor.is_empty() {
let is_empty = {
let mut cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "PrevAwait");
let cursor = cursor.as_btree_mut();
cursor.wait_for_completion()?;
cursor.is_empty()
};
if !is_empty {
state.pc = pc_if_next.to_offset_int();
} else {
state.pc += 1;
@@ -1137,10 +1163,14 @@ impl Program {
pc_if_next,
} => {
assert!(pc_if_next.is_offset());
let cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "NextAwait");
cursor.wait_for_completion()?;
if !cursor.is_empty() {
let is_empty = {
let mut cursor =
must_be_btree_cursor!(*cursor_id, self.cursor_ref, state, "NextAwait");
let cursor = cursor.as_btree_mut();
cursor.wait_for_completion()?;
cursor.is_empty()
};
if !is_empty {
state.pc = pc_if_next.to_offset_int();
} else {
state.pc += 1;
@@ -1273,15 +1303,26 @@ impl Program {
}
Insn::RowId { cursor_id, dest } => {
if let Some((index_cursor_id, table_cursor_id)) = state.deferred_seek.take() {
let index_cursor = state.get_btree_index_cursor(index_cursor_id);
let rowid = index_cursor.rowid()?;
let table_cursor = state.get_btree_table_cursor(table_cursor_id);
match table_cursor.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)? {
CursorResult::Ok(_) => {}
CursorResult::IO => {
state.deferred_seek = Some((index_cursor_id, table_cursor_id));
return Ok(StepResult::IO);
}
let deferred_seek = {
let rowid = {
let mut index_cursor = state.get_cursor(index_cursor_id);
let index_cursor = index_cursor.as_btree_mut();
let rowid = index_cursor.rowid()?;
rowid
};
let mut table_cursor = state.get_cursor(table_cursor_id);
let table_cursor = table_cursor.as_btree_mut();
let deferred_seek = match table_cursor
.seek(SeekKey::TableRowId(rowid.unwrap()), SeekOp::EQ)?
{
CursorResult::Ok(_) => None,
CursorResult::IO => Some((index_cursor_id, table_cursor_id)),
};
deferred_seek
};
if let Some(deferred_seek) = deferred_seek {
state.deferred_seek = Some(deferred_seek);
return Ok(StepResult::IO);
}
}
let mut cursors = state.cursors.borrow_mut();
@@ -1318,25 +1359,33 @@ impl Program {
target_pc,
} => {
assert!(target_pc.is_offset());
let cursor = state.get_btree_table_cursor(*cursor_id);
let rowid = match &state.registers[*src_reg] {
OwnedValue::Integer(rowid) => *rowid as u64,
OwnedValue::Null => {
state.pc = target_pc.to_offset_int();
continue;
}
other => {
return Err(LimboError::InternalError(
format!("SeekRowid: the value in the register is not an integer or NULL: {}", other)
));
let pc = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let rowid = match &state.registers[*src_reg] {
OwnedValue::Integer(rowid) => Some(*rowid as u64),
OwnedValue::Null => None,
other => {
return Err(LimboError::InternalError(
format!("SeekRowid: the value in the register is not an integer or NULL: {}", other)
));
}
};
match rowid {
Some(rowid) => {
let found = return_if_io!(
cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ)
);
if !found {
target_pc.to_offset_int()
} else {
state.pc + 1
}
}
None => target_pc.to_offset_int(),
}
};
let found = return_if_io!(cursor.seek(SeekKey::TableRowId(rowid), SeekOp::EQ));
if !found {
state.pc = target_pc.to_offset_int();
} else {
state.pc += 1;
}
state.pc = pc;
}
Insn::DeferredSeek {
index_cursor_id,
@@ -1354,40 +1403,54 @@ impl Program {
} => {
assert!(target_pc.is_offset());
if *is_index {
let cursor = state.get_btree_index_cursor(*cursor_id);
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
let found = return_if_io!(
cursor.seek(SeekKey::IndexKey(&record_from_regs), SeekOp::GE)
);
let found = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
let found = return_if_io!(
cursor.seek(SeekKey::IndexKey(&record_from_regs), SeekOp::GE)
);
found
};
if !found {
state.pc = target_pc.to_offset_int();
} else {
state.pc += 1;
}
} else {
let cursor = state.get_btree_table_cursor(*cursor_id);
let rowid = match &state.registers[*start_reg] {
OwnedValue::Null => {
// All integer values are greater than null so we just rewind the cursor
return_if_io!(cursor.rewind());
state.pc += 1;
continue;
}
OwnedValue::Integer(rowid) => *rowid as u64,
_ => {
return Err(LimboError::InternalError(
"SeekGE: the value in the register is not an integer".into(),
));
let pc = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let rowid = match &state.registers[*start_reg] {
OwnedValue::Null => {
// All integer values are greater than null so we just rewind the cursor
return_if_io!(cursor.rewind());
None
}
OwnedValue::Integer(rowid) => Some(*rowid as u64),
_ => {
return Err(LimboError::InternalError(
"SeekGE: the value in the register is not an integer"
.into(),
));
}
};
match rowid {
Some(rowid) => {
let found = return_if_io!(
cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE)
);
if !found {
target_pc.to_offset_int()
} else {
state.pc + 1
}
}
None => state.pc + 1,
}
};
let found =
return_if_io!(cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GE));
if !found {
state.pc = target_pc.to_offset_int();
} else {
state.pc += 1;
}
state.pc = pc;
}
}
Insn::SeekGT {
@@ -1399,40 +1462,55 @@ impl Program {
} => {
assert!(target_pc.is_offset());
if *is_index {
let cursor = state.get_btree_index_cursor(*cursor_id);
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
let found = return_if_io!(
cursor.seek(SeekKey::IndexKey(&record_from_regs), SeekOp::GT)
);
let found = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
let found = return_if_io!(
cursor.seek(SeekKey::IndexKey(&record_from_regs), SeekOp::GT)
);
found
};
if !found {
state.pc = target_pc.to_offset_int();
} else {
state.pc += 1;
}
} else {
let cursor = state.get_btree_table_cursor(*cursor_id);
let rowid = match &state.registers[*start_reg] {
OwnedValue::Null => {
// All integer values are greater than null so we just rewind the cursor
return_if_io!(cursor.rewind());
state.pc += 1;
continue;
}
OwnedValue::Integer(rowid) => *rowid as u64,
_ => {
return Err(LimboError::InternalError(
"SeekGT: the value in the register is not an integer".into(),
));
}
let pc = {
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let rowid = match &state.registers[*start_reg] {
OwnedValue::Null => {
// All integer values are greater than null so we just rewind the cursor
return_if_io!(cursor.rewind());
None
}
OwnedValue::Integer(rowid) => Some(*rowid as u64),
_ => {
return Err(LimboError::InternalError(
"SeekGT: the value in the register is not an integer"
.into(),
));
}
};
let found = match rowid {
Some(rowid) => {
let found = return_if_io!(
cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GT)
);
if !found {
target_pc.to_offset_int()
} else {
state.pc + 1
}
}
None => state.pc + 1,
};
found
};
let found =
return_if_io!(cursor.seek(SeekKey::TableRowId(rowid), SeekOp::GT));
if !found {
state.pc = target_pc.to_offset_int();
} else {
state.pc += 1;
}
state.pc = pc;
}
}
Insn::IdxGE {
@@ -1443,10 +1521,11 @@ impl Program {
} => {
assert!(target_pc.is_offset());
let pc = {
let cursor = state.get_btree_index_cursor(*cursor_id);
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
if let Some(ref idx_record) = *cursor.record() {
let pc = if let Some(ref idx_record) = *cursor.record() {
// Compare against the same number of values
if idx_record.get_values()[..record_from_regs.len()]
>= record_from_regs.get_values()[..]
@@ -1457,7 +1536,8 @@ impl Program {
}
} else {
target_pc.to_offset_int()
}
};
pc
};
state.pc = pc;
}
@@ -1469,10 +1549,11 @@ impl Program {
} => {
assert!(target_pc.is_offset());
let pc = {
let cursor = state.get_btree_index_cursor(*cursor_id);
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
if let Some(ref idx_record) = *cursor.record() {
let pc = if let Some(ref idx_record) = *cursor.record() {
// Compare against the same number of values
if idx_record.get_values()[..record_from_regs.len()]
<= record_from_regs.get_values()[..]
@@ -1483,7 +1564,8 @@ impl Program {
}
} else {
target_pc.to_offset_int()
}
};
pc
};
state.pc = pc;
}
@@ -1495,10 +1577,11 @@ impl Program {
} => {
assert!(target_pc.is_offset());
let pc = {
let cursor = state.get_btree_index_cursor(*cursor_id);
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
if let Some(ref idx_record) = *cursor.record() {
let pc = if let Some(ref idx_record) = *cursor.record() {
// Compare against the same number of values
if idx_record.get_values()[..record_from_regs.len()]
> record_from_regs.get_values()[..]
@@ -1509,7 +1592,8 @@ impl Program {
}
} else {
target_pc.to_offset_int()
}
};
pc
};
state.pc = pc;
}
@@ -1521,10 +1605,11 @@ impl Program {
} => {
assert!(target_pc.is_offset());
let pc = {
let cursor = state.get_btree_index_cursor(*cursor_id);
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
let record_from_regs: Record =
make_owned_record(&state.registers, start_reg, num_regs);
if let Some(ref idx_record) = *cursor.record() {
let pc = if let Some(ref idx_record) = *cursor.record() {
// Compare against the same number of values
if idx_record.get_values()[..record_from_regs.len()]
< record_from_regs.get_values()[..]
@@ -1535,7 +1620,8 @@ impl Program {
}
} else {
target_pc.to_offset_int()
}
};
pc
};
state.pc = pc;
}
@@ -2684,41 +2770,53 @@ impl Program {
record_reg,
flag: _,
} => {
let cursor = &mut state.get_btree_table_cursor(*cursor);
let record = match &state.registers[*record_reg] {
OwnedValue::Record(r) => r,
_ => unreachable!("Not a record! Cannot insert a non record value."),
};
let key = &state.registers[*key_reg];
// NOTE(pere): Sending moved_before == true is okay because we moved before but
// if we were to set to false after starting a balance procedure, it might
// leave undefined state.
return_if_io!(cursor.insert(key, record, true));
{
let mut cursor = state.get_cursor(*cursor);
let cursor = cursor.as_btree_mut();
let record = match &state.registers[*record_reg] {
OwnedValue::Record(r) => r,
_ => unreachable!("Not a record! Cannot insert a non record value."),
};
let key = &state.registers[*key_reg];
// NOTE(pere): Sending moved_before == true is okay because we moved before but
// if we were to set to false after starting a balance procedure, it might
// leave undefined state.
return_if_io!(cursor.insert(key, record, true));
}
state.pc += 1;
}
Insn::InsertAwait { cursor_id } => {
let cursor = &mut state.get_btree_table_cursor(*cursor_id);
cursor.wait_for_completion()?;
// Only update last_insert_rowid for regular table inserts, not schema modifications
if cursor.root_page() != 1 {
if let Some(rowid) = cursor.rowid()? {
if let Some(conn) = self.connection.upgrade() {
conn.update_last_rowid(rowid);
{
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
cursor.wait_for_completion()?;
// Only update last_insert_rowid for regular table inserts, not schema modifications
if cursor.root_page() != 1 {
if let Some(rowid) = cursor.rowid()? {
if let Some(conn) = self.connection.upgrade() {
conn.update_last_rowid(rowid);
}
let prev_changes = self.n_change.get();
self.n_change.set(prev_changes + 1);
}
let prev_changes = self.n_change.get();
self.n_change.set(prev_changes + 1);
}
}
state.pc += 1;
}
Insn::DeleteAsync { cursor_id } => {
let cursor = &mut state.get_btree_table_cursor(*cursor_id);
return_if_io!(cursor.delete());
{
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
return_if_io!(cursor.delete());
}
state.pc += 1;
}
Insn::DeleteAwait { cursor_id } => {
let cursor = &mut state.get_btree_table_cursor(*cursor_id);
cursor.wait_for_completion()?;
{
let mut cursor = state.get_cursor(*cursor_id);
let cursor = cursor.as_btree_mut();
cursor.wait_for_completion()?;
}
let prev_changes = self.n_change.get();
self.n_change.set(prev_changes + 1);
state.pc += 1;
@@ -2726,9 +2824,13 @@ impl Program {
Insn::NewRowid {
cursor, rowid_reg, ..
} => {
let cursor = &mut state.get_btree_table_cursor(*cursor);
// TODO: make io handle rng
let rowid = return_if_io!(get_new_rowid(cursor, thread_rng()));
let rowid = {
let mut cursor = state.get_cursor(*cursor);
let cursor = cursor.as_btree_mut();
// TODO: make io handle rng
let rowid = return_if_io!(get_new_rowid(cursor, thread_rng()));
rowid
};
state.registers[*rowid_reg] = OwnedValue::Integer(rowid);
state.pc += 1;
}
@@ -2771,9 +2873,13 @@ impl Program {
rowid_reg,
target_pc,
} => {
let cursor =
must_be_btree_cursor!(*cursor, self.cursor_ref, state, "NotExists");
let exists = return_if_io!(cursor.exists(&state.registers[*rowid_reg]));
let exists = {
let mut cursor =
must_be_btree_cursor!(*cursor, self.cursor_ref, state, "NotExists");
let cursor = cursor.as_btree_mut();
let exists = return_if_io!(cursor.exists(&state.registers[*rowid_reg]));
exists
};
if exists {
state.pc += 1;
} else {