From 7b6fc0f3b6725731670e5cca268283ca4c073e09 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Sat, 27 Sep 2025 14:29:48 +0300 Subject: [PATCH] core/vdbe: Wrap SortedChunk::total_bytes_read with AtomicUsize --- core/vdbe/sorter.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index e7ef60503..105d8095d 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -1,6 +1,6 @@ use turso_parser::ast::SortOrder; -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd, Reverse}; use std::collections::BinaryHeap; use std::rc::Rc; @@ -408,7 +408,7 @@ struct SortedChunk { /// The current IO state of the chunk. io_state: Arc>, /// The total number of bytes read from the chunk file. - total_bytes_read: Rc>, + total_bytes_read: Arc, /// State machine for [SortedChunk::next] next_state: NextState, } @@ -423,7 +423,7 @@ impl SortedChunk { buffer_len: Arc::new(atomic::AtomicUsize::new(0)), records: Vec::new(), io_state: Arc::new(RwLock::new(SortedChunkIOState::None)), - total_bytes_read: Rc::new(Cell::new(0)), + total_bytes_read: Arc::new(atomic::AtomicUsize::new(0)), next_state: NextState::Start, } } @@ -500,7 +500,9 @@ impl SortedChunk { && *self.io_state.read().unwrap() != SortedChunkIOState::ReadEOF { // We've consumed the last record. Read more payload into the buffer. - if self.chunk_size - self.total_bytes_read.get() == 0 { + if self.chunk_size - self.total_bytes_read.load(atomic::Ordering::SeqCst) + == 0 + { *self.io_state.write().unwrap() = SortedChunkIOState::ReadEOF; } else { let c = self.read()?; @@ -520,7 +522,8 @@ impl SortedChunk { *self.io_state.write().unwrap() = SortedChunkIOState::WaitingForRead; let read_buffer_size = self.buffer.read().unwrap().len() - self.buffer_len(); - let read_buffer_size = read_buffer_size.min(self.chunk_size - self.total_bytes_read.get()); + let read_buffer_size = read_buffer_size + .min(self.chunk_size - self.total_bytes_read.load(atomic::Ordering::SeqCst)); let read_buffer = Buffer::new_temporary(read_buffer_size); let read_buffer_ref = Arc::new(read_buffer); @@ -552,13 +555,14 @@ impl SortedChunk { stored_buf_len += bytes_read; stored_buffer_len_copy.store(stored_buf_len, atomic::Ordering::SeqCst); - total_bytes_read_copy.set(total_bytes_read_copy.get() + bytes_read); + total_bytes_read_copy.fetch_add(bytes_read, atomic::Ordering::SeqCst); }); let c = Completion::new_read(read_buffer_ref, read_complete); - let c = self - .file - .pread(self.start_offset + self.total_bytes_read.get() as u64, c)?; + let c = self.file.pread( + self.start_offset + self.total_bytes_read.load(atomic::Ordering::SeqCst) as u64, + c, + )?; Ok(c) }