SorterChunk read and write should return completions

This commit is contained in:
pedrocarlo
2025-08-06 12:32:52 -03:00
parent 0c9ecbc768
commit 2ffc5ee423

View File

@@ -155,7 +155,7 @@ impl Sorter {
SortedChunkIOState::WriteComplete => {
all_read_complete = false;
// Write complete, we can now read from the chunk.
chunk.read()?;
let _c = chunk.read()?;
}
SortedChunkIOState::WaitingForWrite | SortedChunkIOState::WaitingForRead => {
all_read_complete = false;
@@ -249,7 +249,7 @@ impl Sorter {
.min_chunk_read_buffer_size
.max(self.max_payload_size_in_buffer + 9);
let mut chunk = SortedChunk::new(chunk_file.clone(), chunk_buffer_size);
chunk.write(&mut self.records)?;
let _c = chunk.write(&mut self.records)?;
self.chunks.push(chunk);
self.current_buffer_size = 0;
@@ -346,19 +346,16 @@ impl SortedChunk {
let record = self.records.pop();
if self.records.is_empty() && self.io_state.get() != SortedChunkIOState::ReadEOF {
// We've consumed the last record. Read more payload into the buffer.
self.read()?;
if self.chunk_size - self.total_bytes_read.get() == 0 {
self.io_state.set(SortedChunkIOState::ReadEOF);
} else {
let _c = self.read()?;
}
}
Ok(record)
}
fn read(&mut self) -> Result<()> {
if self.io_state.get() == SortedChunkIOState::ReadEOF {
return Ok(());
}
if self.chunk_size - self.total_bytes_read.get() == 0 {
self.io_state.set(SortedChunkIOState::ReadEOF);
return Ok(());
}
fn read(&mut self) -> Result<Completion> {
self.io_state.set(SortedChunkIOState::WaitingForRead);
let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get();
@@ -395,11 +392,11 @@ impl SortedChunk {
});
let c = Completion::new_read(read_buffer_ref, read_complete);
let _c = self.file.pread(self.total_bytes_read.get(), c)?;
Ok(())
let c = self.file.pread(self.total_bytes_read.get(), c)?;
Ok(c)
}
fn write(&mut self, records: &mut Vec<SortableImmutableRecord>) -> Result<()> {
fn write(&mut self, records: &mut Vec<SortableImmutableRecord>) -> Result<Completion> {
assert!(self.io_state.get() == SortedChunkIOState::None);
self.io_state.set(SortedChunkIOState::WaitingForWrite);
self.chunk_size = 0;
@@ -440,8 +437,8 @@ impl SortedChunk {
});
let c = Completion::new_write(write_complete);
let _c = self.file.pwrite(0, buffer_ref, c)?;
Ok(())
let c = self.file.pwrite(0, buffer_ref, c)?;
Ok(c)
}
}