From c02936eb3032cebd655c8355270987a4541c12f3 Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Wed, 6 Aug 2025 14:07:11 -0300 Subject: [PATCH] state machine for `insert` --- core/vdbe/execute.rs | 2 +- core/vdbe/sorter.rs | 48 +++++++++++++++++++++++++++++++++----------- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index f87253f4e..871c1a0ad 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3727,7 +3727,7 @@ pub fn op_sorter_insert( Register::Record(record) => record, _ => unreachable!("SorterInsert on non-record register"), }; - cursor.insert(record)?; + return_if_io!(cursor.insert(record)); } state.pc += 1; Ok(InsnFunctionStepResult::Step) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 30a24e629..ecbecf3af 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -26,6 +26,12 @@ enum SortState { Next, } +#[derive(Debug, Clone, Copy)] +enum InsertState { + Start, + Insert, +} + pub struct Sorter { /// The records in the in-memory buffer. records: Vec, @@ -56,6 +62,8 @@ pub struct Sorter { temp_dir: Option, /// State machine for [Sorter::sort] sort_state: SortState, + /// State machine for [Sorter::insert] + insert_state: InsertState, } impl Sorter { @@ -91,6 +99,7 @@ impl Sorter { wait_for_read_complete: Vec::new(), temp_dir: None, sort_state: SortState::Start, + insert_state: InsertState::Start, } } @@ -162,19 +171,32 @@ impl Sorter { self.current.as_ref() } - pub fn insert(&mut self, record: &ImmutableRecord) -> Result<()> { + pub fn insert(&mut self, record: &ImmutableRecord) -> Result> { let payload_size = record.get_payload().len(); - if self.current_buffer_size + payload_size > self.max_buffer_size { - let _c = self.flush()?; + loop { + match self.insert_state { + InsertState::Start => { + self.insert_state = InsertState::Insert; + if self.current_buffer_size + payload_size > self.max_buffer_size { + if let Some(_c) = self.flush()? { + return Ok(IOResult::IO); + } + } + } + InsertState::Insert => { + self.records.push(SortableImmutableRecord::new( + record.clone(), + self.key_len, + self.index_key_info.clone(), + )?); + self.current_buffer_size += payload_size; + self.max_payload_size_in_buffer = + self.max_payload_size_in_buffer.max(payload_size); + self.insert_state = InsertState::Start; + return Ok(IOResult::Done(())); + } + } } - self.records.push(SortableImmutableRecord::new( - record.clone(), - self.key_len, - self.index_key_info.clone(), - )?); - self.current_buffer_size += payload_size; - self.max_payload_size_in_buffer = self.max_payload_size_in_buffer.max(payload_size); - Ok(()) } fn init_chunk_heap(&mut self) -> Result> { @@ -619,6 +641,7 @@ mod tests { use super::*; use crate::translate::collate::CollationSeq; use crate::types::{ImmutableRecord, RefValue, Value, ValueType}; + use crate::util::IOExt; use crate::PlatformIO; use rand_chacha::{ rand_core::{RngCore, SeedableRng}, @@ -666,7 +689,8 @@ mod tests { values.append(&mut generate_values(&mut rng, &value_types)); let record = ImmutableRecord::from_values(&values, values.len()); - sorter.insert(&record).expect("Failed to insert the record"); + io.block(|| sorter.insert(&record)) + .expect("Failed to insert the record"); initial_records.push(record); }