Merge 'Sorter IO Completions' from Pedro Muniz

Convert Sorter code to use state machines and stop ignoring IO
completions. This also simplifies some logic that seemed redundant to me.
I was additionally getting IO errors because we were opening one file per
chunk, so I fixed this by using only one file per sorter and giving each
chunk its own offset range within that file.
Builds on top of #2520

Closes #2473
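
For context, the resumable shape this change gives the Sorter methods looks
roughly like the sketch below. `IOResult` and `return_if_io!` mirror the
crate's own types as they are used in this diff; the `Flusher` type, its
`Phase` states, `write_pending`, and `run_to_done` are illustrative
placeholders only, not code from this commit:

use std::io;

/// Mirrors the crate's IOResult: either the value is ready, or an IO
/// completion is still outstanding and the caller must re-enter later.
enum IOResult<T> {
    Done(T),
    IO,
}

/// Mirrors the crate's return_if_io!: unwrap a Done value, or propagate
/// the pending-IO signal up to the caller.
macro_rules! return_if_io {
    ($e:expr) => {
        match $e? {
            IOResult::Done(v) => v,
            IOResult::IO => return Ok(IOResult::IO),
        }
    };
}

/// Illustrative two-phase operation that may suspend on IO.
#[derive(Clone, Copy)]
enum Phase {
    Start,
    Finish,
}

struct Flusher {
    phase: Phase,
    write_pending: bool, // stands in for an unfired write completion
}

impl Flusher {
    fn step(&mut self) -> io::Result<IOResult<()>> {
        loop {
            match self.phase {
                Phase::Start => {
                    // Advance the phase *before* suspending so that a
                    // re-entrant call resumes after the IO instead of
                    // reissuing it.
                    self.phase = Phase::Finish;
                    if self.write_pending {
                        return Ok(IOResult::IO);
                    }
                }
                Phase::Finish => {
                    // Reset so the machine is reusable, then report done.
                    self.phase = Phase::Start;
                    return Ok(IOResult::Done(()));
                }
            }
        }
    }
}

/// Callers compose suspendable steps with return_if_io!, the same way
/// op_sorter_insert wraps cursor.insert in this diff.
fn run_to_done(f: &mut Flusher) -> io::Result<IOResult<()>> {
    return_if_io!(f.step());
    Ok(IOResult::Done(()))
}

fn main() -> io::Result<()> {
    let mut f = Flusher { phase: Phase::Start, write_pending: true };
    // First call suspends on the pending write completion.
    assert!(matches!(run_to_done(&mut f)?, IOResult::IO));
    // The event loop would run completions here; emulate that:
    f.write_pending = false;
    // The re-entrant call resumes at Phase::Finish and completes.
    assert!(matches!(run_to_done(&mut f)?, IOResult::Done(())));
    Ok(())
}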
Jussi Saurio
2025-08-11 15:42:58 +03:00
committed by GitHub
2 changed files with 300 additions and 191 deletions


@@ -3727,7 +3727,7 @@ pub fn op_sorter_insert(
Register::Record(record) => record,
_ => unreachable!("SorterInsert on non-record register"),
};
cursor.insert(record)?;
return_if_io!(cursor.insert(record));
}
state.pc += 1;
Ok(InsnFunctionStepResult::Step)


@@ -7,6 +7,8 @@ use std::rc::Rc;
use std::sync::Arc;
use tempfile;
use crate::return_if_io;
use crate::util::IOExt;
use crate::{
error::LimboError,
io::{Buffer, Completion, File, OpenFlags, IO},
@@ -17,6 +19,40 @@ use crate::{
Result,
};
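/// States for the [Sorter::sort] state machine.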
#[derive(Debug, Clone, Copy)]
enum SortState {
Start,
Flush,
InitHeap,
Next,
}
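/// States for the [Sorter::insert] state machine.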
#[derive(Debug, Clone, Copy)]
enum InsertState {
Start,
Insert,
}
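/// States for the [Sorter::init_chunk_heap] state machine.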
#[derive(Debug, Clone, Copy)]
enum InitChunkHeapState {
Start,
PushChunk,
}
struct TempFile {
// When `temp_dir` is dropped, the directory is deleted.
_temp_dir: tempfile::TempDir,
file: Arc<dyn File>,
}
impl core::ops::Deref for TempFile {
type Target = Arc<dyn File>;
fn deref(&self) -> &Self::Target {
&self.file
}
}
pub struct Sorter {
/// The records in the in-memory buffer.
records: Vec<SortableImmutableRecord>,
@@ -41,10 +77,16 @@ pub struct Sorter {
max_payload_size_in_buffer: usize,
/// The IO object.
io: Arc<dyn IO>,
/// The indices of the chunks for which the read is not complete.
wait_for_read_complete: Vec<usize>,
/// The temporary directory for chunk files.
temp_dir: Option<tempfile::TempDir>,
/// The temporary file for chunks.
temp_file: Option<TempFile>,
/// Offset where the next chunk will be placed in the `temp_file`.
next_chunk_offset: usize,
/// State machine for [Sorter::sort].
sort_state: SortState,
/// State machine for [Sorter::insert].
insert_state: InsertState,
/// State machine for [Sorter::init_chunk_heap].
init_chunk_heap_state: InitChunkHeapState,
}
impl Sorter {
@@ -77,8 +119,11 @@ impl Sorter {
min_chunk_read_buffer_size: min_chunk_read_buffer_size_bytes,
max_payload_size_in_buffer: 0,
io,
wait_for_read_complete: Vec::new(),
temp_dir: None,
temp_file: None,
next_chunk_offset: 0,
sort_state: SortState::Start,
insert_state: InsertState::Start,
init_chunk_heap_state: InitChunkHeapState::Start,
}
}
@@ -92,16 +137,39 @@ impl Sorter {
// We do the sorting here since this is what is called by the SorterSort instruction
pub fn sort(&mut self) -> Result<IOResult<()>> {
if self.chunks.is_empty() {
self.records.sort();
self.records.reverse();
} else {
self.flush()?;
if let IOResult::IO = self.init_chunk_heap()? {
return Ok(IOResult::IO);
loop {
match self.sort_state {
SortState::Start => {
if self.chunks.is_empty() {
self.records.sort();
self.records.reverse();
self.sort_state = SortState::Next;
} else {
self.sort_state = SortState::Flush;
}
}
SortState::Flush => {
self.sort_state = SortState::InitHeap;
if let Some(_c) = self.flush()? {
return Ok(IOResult::IO);
}
}
SortState::InitHeap => {
if self.chunks.iter().any(|chunk| {
matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
}) {
return Ok(IOResult::IO);
}
return_if_io!(self.init_chunk_heap());
self.sort_state = SortState::Next;
}
SortState::Next => {
return_if_io!(self.next());
self.sort_state = SortState::Start;
return Ok(IOResult::Done(()));
}
}
}
self.next()
}
pub fn next(&mut self) -> Result<IOResult<()>> {
@@ -132,82 +200,95 @@ impl Sorter {
self.current.as_ref()
}
pub fn insert(&mut self, record: &ImmutableRecord) -> Result<()> {
pub fn insert(&mut self, record: &ImmutableRecord) -> Result<IOResult<()>> {
let payload_size = record.get_payload().len();
if self.current_buffer_size + payload_size > self.max_buffer_size {
self.flush()?;
loop {
match self.insert_state {
InsertState::Start => {
self.insert_state = InsertState::Insert;
if self.current_buffer_size + payload_size > self.max_buffer_size {
if let Some(_c) = self.flush()? {
return Ok(IOResult::IO);
}
}
}
InsertState::Insert => {
if self.chunks.iter().any(|chunk| {
matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite)
}) {
return Ok(IOResult::IO);
}
self.records.push(SortableImmutableRecord::new(
record.clone(),
self.key_len,
self.index_key_info.clone(),
)?);
self.current_buffer_size += payload_size;
self.max_payload_size_in_buffer =
self.max_payload_size_in_buffer.max(payload_size);
self.insert_state = InsertState::Start;
return Ok(IOResult::Done(()));
}
}
}
self.records.push(SortableImmutableRecord::new(
record.clone(),
self.key_len,
self.index_key_info.clone(),
)?);
self.current_buffer_size += payload_size;
self.max_payload_size_in_buffer = self.max_payload_size_in_buffer.max(payload_size);
Ok(())
}
fn init_chunk_heap(&mut self) -> Result<IOResult<()>> {
let mut all_read_complete = true;
// Make sure all chunks read at least one record into their buffer.
for chunk in self.chunks.iter_mut() {
match chunk.io_state.get() {
SortedChunkIOState::WriteComplete => {
all_read_complete = false;
// Write complete, we can now read from the chunk.
chunk.read()?;
match self.init_chunk_heap_state {
InitChunkHeapState::Start => {
let mut completions = Vec::with_capacity(self.chunks.len());
for chunk in self.chunks.iter_mut() {
let c = chunk.read()?;
completions.push(c);
}
SortedChunkIOState::WaitingForWrite | SortedChunkIOState::WaitingForRead => {
all_read_complete = false;
self.init_chunk_heap_state = InitChunkHeapState::PushChunk;
Ok(IOResult::IO)
}
InitChunkHeapState::PushChunk => {
// Make sure all chunks read at least one record into their buffer.
if self
.chunks
.iter()
.any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead))
{
return Ok(IOResult::IO);
}
SortedChunkIOState::ReadEOF | SortedChunkIOState::ReadComplete => {}
_ => {
unreachable!("Unexpected chunk IO state: {:?}", chunk.io_state.get())
self.chunk_heap.reserve(self.chunks.len());
// TODO: blocking will be unnecessary here with IO completions
let io = self.io.clone();
for chunk_idx in 0..self.chunks.len() {
io.block(|| self.push_to_chunk_heap(chunk_idx))?;
}
self.init_chunk_heap_state = InitChunkHeapState::Start;
Ok(IOResult::Done(()))
}
}
if !all_read_complete {
return Ok(IOResult::IO);
}
self.chunk_heap.reserve(self.chunks.len());
for chunk_idx in 0..self.chunks.len() {
self.push_to_chunk_heap(chunk_idx)?;
}
Ok(IOResult::Done(()))
}
fn next_from_chunk_heap(&mut self) -> Result<IOResult<Option<SortableImmutableRecord>>> {
let mut all_read_complete = true;
for chunk_idx in self.wait_for_read_complete.iter() {
let chunk_io_state = self.chunks[*chunk_idx].io_state.get();
match chunk_io_state {
SortedChunkIOState::ReadComplete | SortedChunkIOState::ReadEOF => {}
SortedChunkIOState::WaitingForRead => {
all_read_complete = false;
}
_ => {
unreachable!("Unexpected chunk IO state: {:?}", chunk_io_state)
}
}
}
if !all_read_complete {
// Make sure all chunks read at least one record into their buffer.
if self
.chunks
.iter()
.any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead))
{
return Ok(IOResult::IO);
}
self.wait_for_read_complete.clear();
if let Some((next_record, next_chunk_idx)) = self.chunk_heap.pop() {
self.push_to_chunk_heap(next_chunk_idx)?;
// TODO: blocking will be unnecessary here with IO completions
let io = self.io.clone();
io.block(|| self.push_to_chunk_heap(next_chunk_idx))?;
Ok(IOResult::Done(Some(next_record.0)))
} else {
Ok(IOResult::Done(None))
}
}
fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result<()> {
fn push_to_chunk_heap(&mut self, chunk_idx: usize) -> Result<IOResult<()>> {
let chunk = &mut self.chunks[chunk_idx];
if chunk.has_more() {
let record = chunk.next()?.unwrap();
if let Some(record) = return_if_io!(chunk.next()) {
self.chunk_heap.push((
Reverse(SortableImmutableRecord::new(
record,
@@ -216,52 +297,76 @@ impl Sorter {
)?),
chunk_idx,
));
if let SortedChunkIOState::WaitingForRead = chunk.io_state.get() {
self.wait_for_read_complete.push(chunk_idx);
}
}
Ok(())
Ok(IOResult::Done(()))
}
fn flush(&mut self) -> Result<()> {
fn flush(&mut self) -> Result<Option<Completion>> {
if self.records.is_empty() {
return Ok(());
// Nothing was buffered, so there is no write and no completion to wait for.
return Ok(None);
}
self.records.sort();
if self.temp_dir.is_none() {
self.temp_dir = Some(tempfile::tempdir().map_err(LimboError::IOError)?);
}
let chunk_file_path = self
.temp_dir
.as_ref()
.unwrap()
.path()
.join(format!("chunk_{}", self.chunks.len()));
let chunk_file =
self.io
.open_file(chunk_file_path.to_str().unwrap(), OpenFlags::Create, false)?;
let chunk_file = match &self.temp_file {
Some(temp_file) => temp_file.file.clone(),
None => {
let temp_dir = tempfile::tempdir().map_err(LimboError::IOError)?;
let chunk_file_path = temp_dir.as_ref().join("chunk_file");
let chunk_file = self.io.open_file(
chunk_file_path.to_str().unwrap(),
OpenFlags::Create,
false,
)?;
self.temp_file = Some(TempFile {
_temp_dir: temp_dir,
file: chunk_file.clone(),
});
chunk_file
}
};
// Make sure the chunk buffer size can fit the largest record and its size varint.
let chunk_buffer_size = self
.min_chunk_read_buffer_size
.max(self.max_payload_size_in_buffer + 9);
let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size);
chunk.write(&mut self.records)?;
let mut chunk_size = 0;
// Pre-compute varint lengths for record sizes to determine the total buffer size.
let mut record_size_lengths = Vec::with_capacity(self.records.len());
for record in self.records.iter() {
let record_size = record.record.get_payload().len();
let size_len = varint_len(record_size as u64);
record_size_lengths.push(size_len);
chunk_size += size_len + record_size;
}
let mut chunk = SortedChunk::new(chunk_file, self.next_chunk_offset, chunk_buffer_size);
let c = chunk.write(&mut self.records, record_size_lengths, chunk_size)?;
self.chunks.push(chunk);
self.current_buffer_size = 0;
self.max_payload_size_in_buffer = 0;
// Advance the offset at which the next chunk will start.
self.next_chunk_offset += chunk_size;
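// Layout sketch (illustrative numbers): if two flushes produce chunks of
// 100 and 250 bytes, chunk 0 occupies file bytes [0, 100) and chunk 1
// occupies [100, 350), leaving next_chunk_offset at 350. Within a chunk,
// each record is a size varint followed by its payload, which is why
// chunk_size sums varint_len(record_size) + record_size per record.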
Ok(())
Ok(Some(c))
}
}
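/// States for the [SortedChunk::next] state machine.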
#[derive(Debug, Clone, Copy)]
enum NextState {
Start,
Finish,
}
struct SortedChunk {
/// The chunk file (shared by all chunks of the sorter).
file: Arc<dyn File>,
/// Offset of the start of this chunk within the file.
start_offset: usize,
/// The size of this chunk in bytes.
chunk_size: usize,
/// The read buffer.
@@ -274,91 +379,104 @@ struct SortedChunk {
io_state: Rc<Cell<SortedChunkIOState>>,
/// The total number of bytes read from this chunk.
total_bytes_read: Rc<Cell<usize>>,
/// State machine for [SortedChunk::next].
next_state: NextState,
}
impl SortedChunk {
fn new(file: Arc<dyn File>, buffer_size: usize) -> Self {
fn new(file: Arc<dyn File>, start_offset: usize, buffer_size: usize) -> Self {
Self {
file,
start_offset,
chunk_size: 0,
buffer: Rc::new(RefCell::new(vec![0; buffer_size])),
buffer_len: Rc::new(Cell::new(0)),
records: Vec::new(),
io_state: Rc::new(Cell::new(SortedChunkIOState::None)),
total_bytes_read: Rc::new(Cell::new(0)),
next_state: NextState::Start,
}
}
fn has_more(&self) -> bool {
!self.records.is_empty() || self.io_state.get() != SortedChunkIOState::ReadEOF
}
fn next(&mut self) -> Result<Option<ImmutableRecord>> {
let mut buffer_len = self.buffer_len.get();
if self.records.is_empty() && buffer_len == 0 {
return Ok(None);
}
if self.records.is_empty() {
let mut buffer_ref = self.buffer.borrow_mut();
let buffer = buffer_ref.as_mut_slice();
let mut buffer_offset = 0;
while buffer_offset < buffer_len {
// Extract records from the buffer until we run out of the buffer or we hit an incomplete record.
let (record_size, bytes_read) =
match read_varint(&buffer[buffer_offset..buffer_len]) {
Ok((record_size, bytes_read)) => (record_size as usize, bytes_read),
Err(LimboError::Corrupt(_))
if self.io_state.get() != SortedChunkIOState::ReadEOF =>
{
// Failed to decode a partial varint.
break;
}
Err(e) => {
return Err(e);
}
};
if record_size > buffer_len - (buffer_offset + bytes_read) {
if self.io_state.get() == SortedChunkIOState::ReadEOF {
crate::bail_corrupt_error!("Incomplete record");
fn next(&mut self) -> Result<IOResult<Option<ImmutableRecord>>> {
loop {
match self.next_state {
NextState::Start => {
let mut buffer_len = self.buffer_len.get();
if self.records.is_empty() && buffer_len == 0 {
return Ok(IOResult::Done(None));
}
if self.records.is_empty() {
let mut buffer_ref = self.buffer.borrow_mut();
let buffer = buffer_ref.as_mut_slice();
let mut buffer_offset = 0;
while buffer_offset < buffer_len {
// Extract records from the buffer until we run out of the buffer or we hit an incomplete record.
let (record_size, bytes_read) =
match read_varint(&buffer[buffer_offset..buffer_len]) {
Ok((record_size, bytes_read)) => {
(record_size as usize, bytes_read)
}
Err(LimboError::Corrupt(_))
if self.io_state.get() != SortedChunkIOState::ReadEOF =>
{
// Failed to decode a partial varint.
break;
}
Err(e) => {
return Err(e);
}
};
if record_size > buffer_len - (buffer_offset + bytes_read) {
if self.io_state.get() == SortedChunkIOState::ReadEOF {
crate::bail_corrupt_error!("Incomplete record");
}
break;
}
buffer_offset += bytes_read;
let mut record = ImmutableRecord::new(record_size);
record.start_serialization(
&buffer[buffer_offset..buffer_offset + record_size],
);
buffer_offset += record_size;
self.records.push(record);
}
if buffer_offset < buffer_len {
buffer.copy_within(buffer_offset..buffer_len, 0);
buffer_len -= buffer_offset;
} else {
buffer_len = 0;
}
self.buffer_len.set(buffer_len);
self.records.reverse();
}
self.next_state = NextState::Finish;
// Check whether we need to read more from the chunk before popping the record.
if self.records.len() == 1 && self.io_state.get() != SortedChunkIOState::ReadEOF
{
// We've consumed the last record. Read more payload into the buffer.
if self.chunk_size - self.total_bytes_read.get() == 0 {
self.io_state.set(SortedChunkIOState::ReadEOF);
} else {
let _c = self.read()?;
return Ok(IOResult::IO);
}
}
break;
}
buffer_offset += bytes_read;
let mut record = ImmutableRecord::new(record_size);
record.start_serialization(&buffer[buffer_offset..buffer_offset + record_size]);
buffer_offset += record_size;
self.records.push(record);
NextState::Finish => {
self.next_state = NextState::Start;
return Ok(IOResult::Done(self.records.pop()));
}
}
if buffer_offset < buffer_len {
buffer.copy_within(buffer_offset..buffer_len, 0);
buffer_len -= buffer_offset;
} else {
buffer_len = 0;
}
self.buffer_len.set(buffer_len);
self.records.reverse();
}
let record = self.records.pop();
if self.records.is_empty() && self.io_state.get() != SortedChunkIOState::ReadEOF {
// We've consumed the last record. Read more payload into the buffer.
self.read()?;
}
Ok(record)
}
fn read(&mut self) -> Result<()> {
if self.io_state.get() == SortedChunkIOState::ReadEOF {
return Ok(());
}
if self.chunk_size - self.total_bytes_read.get() == 0 {
self.io_state.set(SortedChunkIOState::ReadEOF);
return Ok(());
}
fn read(&mut self) -> Result<Completion> {
self.io_state.set(SortedChunkIOState::WaitingForRead);
let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get();
@@ -395,23 +513,21 @@ impl SortedChunk {
});
let c = Completion::new_read(read_buffer_ref, read_complete);
let _c = self.file.pread(self.total_bytes_read.get(), c)?;
Ok(())
let c = self
.file
.pread(self.start_offset + self.total_bytes_read.get(), c)?;
Ok(c)
}
fn write(&mut self, records: &mut Vec<SortableImmutableRecord>) -> Result<()> {
fn write(
&mut self,
records: &mut Vec<SortableImmutableRecord>,
record_size_lengths: Vec<usize>,
chunk_size: usize,
) -> Result<Completion> {
assert!(self.io_state.get() == SortedChunkIOState::None);
self.io_state.set(SortedChunkIOState::WaitingForWrite);
self.chunk_size = 0;
// Pre-compute varint lengths for record sizes to determine the total buffer size.
let mut record_size_lengths = Vec::with_capacity(records.len());
for record in records.iter() {
let record_size = record.record.get_payload().len();
let size_len = varint_len(record_size as u64);
record_size_lengths.push(size_len);
self.chunk_size += size_len + record_size;
}
self.chunk_size = chunk_size;
let buffer = Buffer::new_temporary(self.chunk_size);
@@ -440,8 +556,8 @@ impl SortedChunk {
});
let c = Completion::new_write(write_complete);
let _c = self.file.pwrite(0, buffer_ref, c)?;
Ok(())
let c = self.file.pwrite(self.start_offset, buffer_ref, c)?;
Ok(c)
}
}
@@ -564,9 +680,9 @@ impl Eq for SortableImmutableRecord {}
enum SortedChunkIOState {
WaitingForRead,
ReadComplete,
ReadEOF,
WaitingForWrite,
WriteComplete,
ReadEOF,
None,
}
@@ -575,6 +691,7 @@ mod tests {
use super::*;
use crate::translate::collate::CollationSeq;
use crate::types::{ImmutableRecord, RefValue, Value, ValueType};
use crate::util::IOExt;
use crate::PlatformIO;
use rand_chacha::{
rand_core::{RngCore, SeedableRng},
@@ -600,16 +717,17 @@ mod tests {
let mut rng = ChaCha8Rng::seed_from_u64(seed);
let io = Arc::new(PlatformIO::new().unwrap());
let mut sorter = Sorter::new(
&[SortOrder::Asc],
vec![CollationSeq::Binary],
256,
64,
io.clone(),
);
let attempts = 8;
for _ in 0..attempts {
let mut sorter = Sorter::new(
&[SortOrder::Asc],
vec![CollationSeq::Binary],
256,
64,
io.clone(),
);
let num_records = 1000 + rng.next_u64() % 2000;
let num_records = num_records as i64;
@@ -622,17 +740,13 @@ mod tests {
values.append(&mut generate_values(&mut rng, &value_types));
let record = ImmutableRecord::from_values(&values, values.len());
sorter.insert(&record).expect("Failed to insert the record");
io.block(|| sorter.insert(&record))
.expect("Failed to insert the record");
initial_records.push(record);
}
loop {
if let IOResult::IO = sorter.sort().expect("Failed to sort the records") {
io.run_once().expect("Failed to run the IO");
continue;
}
break;
}
io.block(|| sorter.sort())
.expect("Failed to sort the records");
assert!(!sorter.is_empty());
assert!(!sorter.chunks.is_empty());
@@ -644,13 +758,8 @@ mod tests {
// Check that the record remained unchanged after sorting.
assert_eq!(record, &initial_records[(num_records - i - 1) as usize]);
loop {
if let IOResult::IO = sorter.next().expect("Failed to get the next record") {
io.run_once().expect("Failed to run the IO");
continue;
}
break;
}
io.block(|| sorter.next())
.expect("Failed to get the next record");
}
assert!(!sorter.has_more());
}