From 61b3f569976da59a2b8c30f9ce46b95db5289d65 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Sat, 27 Sep 2025 14:25:35 +0300 Subject: [PATCH] core/vdbe: Wrap SortedChunk::io_state with RwLock --- core/vdbe/sorter.rs | 46 ++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 088fba8f9..e7ef60503 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -175,7 +175,10 @@ impl Sorter { SortState::InitHeap => { turso_assert!( !self.chunks.iter().any(|chunk| { - matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) + matches!( + *chunk.io_state.read().unwrap(), + SortedChunkIOState::WaitingForWrite + ) }), "chunks should been written" ); @@ -231,7 +234,10 @@ impl Sorter { InsertState::Insert => { turso_assert!( !self.chunks.iter().any(|chunk| { - matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForWrite) + matches!( + *chunk.io_state.read().unwrap(), + SortedChunkIOState::WaitingForWrite + ) }), "chunks should have written" ); @@ -272,7 +278,7 @@ impl Sorter { // Make sure all chunks read at least one record into their buffer. turso_assert!( !self.chunks.iter().any(|chunk| matches!( - chunk.io_state.get(), + *chunk.io_state.read().unwrap(), SortedChunkIOState::WaitingForRead )), "chunks should have been read" @@ -292,10 +298,10 @@ impl Sorter { fn next_from_chunk_heap(&mut self) -> Result>> { // Make sure all chunks read at least one record into their buffer. turso_assert!( - !self - .chunks - .iter() - .any(|chunk| matches!(chunk.io_state.get(), SortedChunkIOState::WaitingForRead)), + !self.chunks.iter().any(|chunk| matches!( + *chunk.io_state.read().unwrap(), + SortedChunkIOState::WaitingForRead + )), "chunks should have been read" ); @@ -400,7 +406,7 @@ struct SortedChunk { /// The records decoded from the chunk file. records: Vec, /// The current IO state of the chunk. - io_state: Rc>, + io_state: Arc>, /// The total number of bytes read from the chunk file. total_bytes_read: Rc>, /// State machine for [SortedChunk::next] @@ -416,7 +422,7 @@ impl SortedChunk { buffer: Arc::new(RwLock::new(vec![0; buffer_size])), buffer_len: Arc::new(atomic::AtomicUsize::new(0)), records: Vec::new(), - io_state: Rc::new(Cell::new(SortedChunkIOState::None)), + io_state: Arc::new(RwLock::new(SortedChunkIOState::None)), total_bytes_read: Rc::new(Cell::new(0)), next_state: NextState::Start, } @@ -451,7 +457,8 @@ impl SortedChunk { (record_size as usize, bytes_read) } Err(LimboError::Corrupt(_)) - if self.io_state.get() != SortedChunkIOState::ReadEOF => + if *self.io_state.read().unwrap() + != SortedChunkIOState::ReadEOF => { // Failed to decode a partial varint. break; @@ -461,7 +468,7 @@ impl SortedChunk { } }; if record_size > buffer_len - (buffer_offset + bytes_read) { - if self.io_state.get() == SortedChunkIOState::ReadEOF { + if *self.io_state.read().unwrap() == SortedChunkIOState::ReadEOF { crate::bail_corrupt_error!("Incomplete record"); } break; @@ -489,11 +496,12 @@ impl SortedChunk { self.next_state = NextState::Finish; // This check is done to see if we need to read more from the chunk before popping the record - if self.records.len() == 1 && self.io_state.get() != SortedChunkIOState::ReadEOF + if self.records.len() == 1 + && *self.io_state.read().unwrap() != SortedChunkIOState::ReadEOF { // We've consumed the last record. Read more payload into the buffer. if self.chunk_size - self.total_bytes_read.get() == 0 { - self.io_state.set(SortedChunkIOState::ReadEOF); + *self.io_state.write().unwrap() = SortedChunkIOState::ReadEOF; } else { let c = self.read()?; io_yield_one!(c); @@ -509,7 +517,7 @@ impl SortedChunk { } fn read(&mut self) -> Result { - self.io_state.set(SortedChunkIOState::WaitingForRead); + *self.io_state.write().unwrap() = SortedChunkIOState::WaitingForRead; let read_buffer_size = self.buffer.read().unwrap().len() - self.buffer_len(); let read_buffer_size = read_buffer_size.min(self.chunk_size - self.total_bytes_read.get()); @@ -530,10 +538,10 @@ impl SortedChunk { let bytes_read = bytes_read as usize; if bytes_read == 0 { - chunk_io_state_copy.set(SortedChunkIOState::ReadEOF); + *chunk_io_state_copy.write().unwrap() = SortedChunkIOState::ReadEOF; return; } - chunk_io_state_copy.set(SortedChunkIOState::ReadComplete); + *chunk_io_state_copy.write().unwrap() = SortedChunkIOState::ReadComplete; let mut stored_buf_ref = stored_buffer_copy.write().unwrap(); let stored_buf = stored_buf_ref.as_mut_slice(); @@ -560,8 +568,8 @@ impl SortedChunk { record_size_lengths: Vec, chunk_size: usize, ) -> Result { - assert!(self.io_state.get() == SortedChunkIOState::None); - self.io_state.set(SortedChunkIOState::WaitingForWrite); + assert!(*self.io_state.read().unwrap() == SortedChunkIOState::None); + *self.io_state.write().unwrap() = SortedChunkIOState::WaitingForWrite; self.chunk_size = chunk_size; let buffer = Buffer::new_temporary(self.chunk_size); @@ -586,7 +594,7 @@ impl SortedChunk { let Ok(bytes_written) = res else { return; }; - chunk_io_state_copy.set(SortedChunkIOState::WriteComplete); + *chunk_io_state_copy.write().unwrap() = SortedChunkIOState::WriteComplete; let buf_len = buffer_ref_copy.len(); if bytes_written < buf_len as i32 { tracing::error!("wrote({bytes_written}) less than expected({buf_len})");