core/vdbe: Wrap SortedChunk::total_bytes_read with AtomicUsize

This commit is contained in:
Pekka Enberg
2025-09-27 14:29:48 +03:00
parent 61b3f56997
commit 7b6fc0f3b6

View File

@@ -1,6 +1,6 @@
use turso_parser::ast::SortOrder;
use std::cell::{Cell, RefCell};
use std::cell::RefCell;
use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd, Reverse};
use std::collections::BinaryHeap;
use std::rc::Rc;
@@ -408,7 +408,7 @@ struct SortedChunk {
/// The current IO state of the chunk.
io_state: Arc<RwLock<SortedChunkIOState>>,
/// The total number of bytes read from the chunk file.
total_bytes_read: Rc<Cell<usize>>,
total_bytes_read: Arc<atomic::AtomicUsize>,
/// State machine for [SortedChunk::next]
next_state: NextState,
}
@@ -423,7 +423,7 @@ impl SortedChunk {
buffer_len: Arc::new(atomic::AtomicUsize::new(0)),
records: Vec::new(),
io_state: Arc::new(RwLock::new(SortedChunkIOState::None)),
total_bytes_read: Rc::new(Cell::new(0)),
total_bytes_read: Arc::new(atomic::AtomicUsize::new(0)),
next_state: NextState::Start,
}
}
@@ -500,7 +500,9 @@ impl SortedChunk {
&& *self.io_state.read().unwrap() != SortedChunkIOState::ReadEOF
{
// We've consumed the last record. Read more payload into the buffer.
if self.chunk_size - self.total_bytes_read.get() == 0 {
if self.chunk_size - self.total_bytes_read.load(atomic::Ordering::SeqCst)
== 0
{
*self.io_state.write().unwrap() = SortedChunkIOState::ReadEOF;
} else {
let c = self.read()?;
@@ -520,7 +522,8 @@ impl SortedChunk {
*self.io_state.write().unwrap() = SortedChunkIOState::WaitingForRead;
let read_buffer_size = self.buffer.read().unwrap().len() - self.buffer_len();
let read_buffer_size = read_buffer_size.min(self.chunk_size - self.total_bytes_read.get());
let read_buffer_size = read_buffer_size
.min(self.chunk_size - self.total_bytes_read.load(atomic::Ordering::SeqCst));
let read_buffer = Buffer::new_temporary(read_buffer_size);
let read_buffer_ref = Arc::new(read_buffer);
@@ -552,13 +555,14 @@ impl SortedChunk {
stored_buf_len += bytes_read;
stored_buffer_len_copy.store(stored_buf_len, atomic::Ordering::SeqCst);
total_bytes_read_copy.set(total_bytes_read_copy.get() + bytes_read);
total_bytes_read_copy.fetch_add(bytes_read, atomic::Ordering::SeqCst);
});
let c = Completion::new_read(read_buffer_ref, read_complete);
let c = self
.file
.pread(self.start_offset + self.total_bytes_read.get() as u64, c)?;
let c = self.file.pread(
self.start_offset + self.total_bytes_read.load(atomic::Ordering::SeqCst) as u64,
c,
)?;
Ok(c)
}