Add Sequence and SequenceTest opcodes to vdbe and sorter

This commit is contained in:
PThorpe92
2025-09-23 22:34:13 -04:00
parent 7dccff0bee
commit 3c8216caab
3 changed files with 79 additions and 1 deletions

View File

@@ -5345,6 +5345,49 @@ pub fn op_function(
Ok(InsnFunctionStepResult::Step)
}
pub fn op_sequence(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(
Sequence {
cursor_id,
target_reg
},
insn
);
let cursor = state.get_cursor(*cursor_id).as_sorter_mut();
let seq_num = cursor.next_sequence();
state.registers[*target_reg] = Register::Value(Value::Integer(seq_num));
state.pc += 1;
Ok(InsnFunctionStepResult::Step)
}
pub fn op_sequence_test(
program: &Program,
state: &mut ProgramState,
insn: &Insn,
pager: &Arc<Pager>,
mv_store: Option<&Arc<MvStore>>,
) -> Result<InsnFunctionStepResult> {
load_insn!(
SequenceTest {
cursor_id,
target_pc,
value_reg
},
insn
);
let cursor = state.get_cursor(*cursor_id).as_sorter_mut();
if cursor.seq_beginning() {
state.pc = target_pc.as_offset_int();
}
Ok(InsnFunctionStepResult::Step)
}
pub fn op_init_coroutine(
program: &Program,
state: &mut ProgramState,

View File

@@ -1126,6 +1126,20 @@ pub enum Insn {
target_pc: BranchOffset,
},
/// Find the next available sequence number for cursor P1. Write the sequence number into register P2.
/// The sequence number on the cursor is incremented after this instruction.
Sequence {
cursor_id: CursorID,
target_reg: usize,
},
/// P1 is a sorter cursor. If the sequence counter is currently zero, jump to P2. Regardless of whether or not the jump is taken, increment the the sequence value.
SequenceTest {
cursor_id: CursorID,
target_pc: BranchOffset,
value_reg: usize,
},
// OP_Explain
Explain {
p1: usize, // P1: address of instruction
@@ -1295,6 +1309,8 @@ impl InsnVariants {
InsnVariants::IfNeg => execute::op_if_neg,
InsnVariants::Explain => execute::op_noop,
InsnVariants::OpenDup => execute::op_open_dup,
InsnVariants::Sequence => execute::op_sequence,
InsnVariants::SequenceTest => execute::op_sequence_test,
}
}
}

View File

@@ -88,6 +88,7 @@ pub struct Sorter {
insert_state: InsertState,
/// State machine for [Sorter::init_chunk_heap]
init_chunk_heap_state: InitChunkHeapState,
seq_count: i64,
}
impl Sorter {
@@ -125,6 +126,7 @@ impl Sorter {
sort_state: SortState::Start,
insert_state: InsertState::Start,
init_chunk_heap_state: InitChunkHeapState::Start,
seq_count: 0,
}
}
@@ -136,6 +138,21 @@ impl Sorter {
self.current.is_some()
}
/// Get current sequence count and increment it
pub fn next_sequence(&mut self) -> i64 {
let current = self.seq_count;
self.seq_count += 1;
current
}
/// Test if at beginning of sequence (count == 0) and increment
/// Returns true if this was the first call (seq_count was 0)
pub fn seq_beginning(&mut self) -> bool {
let was_zero = self.seq_count == 0;
self.seq_count += 1;
was_zero
}
// We do the sorting here since this is what is called by the SorterSort instruction
pub fn sort(&mut self) -> Result<IOResult<()>> {
loop {
@@ -578,6 +595,7 @@ struct SortableImmutableRecord {
record: ImmutableRecord,
cursor: RecordCursor,
key_values: RefCell<Vec<RefValue>>,
key_len: usize,
index_key_info: Rc<Vec<KeyInfo>>,
/// The key deserialization error, if any.
deserialization_error: RefCell<Option<LimboError>>,
@@ -601,6 +619,7 @@ impl SortableImmutableRecord {
key_values: RefCell::new(Vec::with_capacity(key_len)),
index_key_info,
deserialization_error: RefCell::new(None),
key_len,
})
}
@@ -638,7 +657,7 @@ impl Ord for SortableImmutableRecord {
let this_key_values_len = self.key_values.borrow().len();
let other_key_values_len = other.key_values.borrow().len();
for i in 0..self.cursor.serial_types.len() {
for i in 0..self.key_len {
// Lazily deserialize the key values if they haven't been deserialized already.
if i >= this_key_values_len {
self.try_deserialize_key(i);