state machine for insert

This commit is contained in:
pedrocarlo
2025-08-06 14:07:11 -03:00
parent 2ec58b0264
commit c02936eb30
2 changed files with 37 additions and 13 deletions

View File

@@ -3727,7 +3727,7 @@ pub fn op_sorter_insert(
Register::Record(record) => record,
_ => unreachable!("SorterInsert on non-record register"),
};
cursor.insert(record)?;
return_if_io!(cursor.insert(record));
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)

View File

@@ -26,6 +26,12 @@ enum SortState {
Next,
}
#[derive(Debug, Clone, Copy)]
enum InsertState {
Start,
Insert,
}
pub struct Sorter {
/// The records in the in-memory buffer.
records: Vec<SortableImmutableRecord>,
@@ -56,6 +62,8 @@ pub struct Sorter {
temp_dir: Option<tempfile::TempDir>,
/// State machine for [Sorter::sort]
sort_state: SortState,
/// State machine for [Sorter::insert]
insert_state: InsertState,
}
impl Sorter {
@@ -91,6 +99,7 @@ impl Sorter {
wait_for_read_complete: Vec::new(),
temp_dir: None,
sort_state: SortState::Start,
insert_state: InsertState::Start,
}
}
@@ -162,19 +171,32 @@ impl Sorter {
self.current.as_ref()
}
pub fn insert(&mut self, record: &ImmutableRecord) -> Result<()> {
pub fn insert(&mut self, record: &ImmutableRecord) -> Result<IOResult<()>> {
let payload_size = record.get_payload().len();
if self.current_buffer_size + payload_size > self.max_buffer_size {
let _c = self.flush()?;
loop {
match self.insert_state {
InsertState::Start => {
self.insert_state = InsertState::Insert;
if self.current_buffer_size + payload_size > self.max_buffer_size {
if let Some(_c) = self.flush()? {
return Ok(IOResult::IO);
}
}
}
InsertState::Insert => {
self.records.push(SortableImmutableRecord::new(
record.clone(),
self.key_len,
self.index_key_info.clone(),
)?);
self.current_buffer_size += payload_size;
self.max_payload_size_in_buffer =
self.max_payload_size_in_buffer.max(payload_size);
self.insert_state = InsertState::Start;
return Ok(IOResult::Done(()));
}
}
}
self.records.push(SortableImmutableRecord::new(
record.clone(),
self.key_len,
self.index_key_info.clone(),
)?);
self.current_buffer_size += payload_size;
self.max_payload_size_in_buffer = self.max_payload_size_in_buffer.max(payload_size);
Ok(())
}
fn init_chunk_heap(&mut self) -> Result<IOResult<()>> {
@@ -619,6 +641,7 @@ mod tests {
use super::*;
use crate::translate::collate::CollationSeq;
use crate::types::{ImmutableRecord, RefValue, Value, ValueType};
use crate::util::IOExt;
use crate::PlatformIO;
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
@@ -666,7 +689,8 @@ mod tests {
values.append(&mut generate_values(&mut rng, &value_types));
let record = ImmutableRecord::from_values(&values, values.len());
sorter.insert(&record).expect("Failed to insert the record");
io.block(|| sorter.insert(&record))
.expect("Failed to insert the record");
initial_records.push(record);
}