From 1402e9841e9844db07f129a8dbc0fc373d44d1d1 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Fri, 26 Sep 2025 14:17:51 +0300 Subject: [PATCH] core/mvcc: Wrap StreamingLogicalLogReader::buffer with RwLock --- core/mvcc/persistent_storage/logical_log.rs | 26 +++++++++++---------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/core/mvcc/persistent_storage/logical_log.rs b/core/mvcc/persistent_storage/logical_log.rs index 9c436ecb0..89ac6d053 100644 --- a/core/mvcc/persistent_storage/logical_log.rs +++ b/core/mvcc/persistent_storage/logical_log.rs @@ -11,7 +11,7 @@ use crate::{ types::{IOCompletions, ImmutableRecord}, Buffer, Completion, CompletionError, LimboError, Pager, Result, }; -use std::{cell::RefCell, sync::Arc}; +use std::sync::{Arc, RwLock}; use crate::{types::IOResult, File}; @@ -237,7 +237,7 @@ struct StreamingLogicalLogReader { /// Log Header header: Option>, /// Cached buffer after io read - buffer: Arc>>, + buffer: Arc>>, /// Position to read from loaded buffer buffer_offset: usize, file_size: usize, @@ -251,7 +251,7 @@ impl StreamingLogicalLogReader { file, offset: 0, header: None, - buffer: Arc::new(RefCell::new(Vec::with_capacity(4096))), + buffer: Arc::new(RwLock::new(Vec::with_capacity(4096))), buffer_offset: 0, file_size, state: StreamingState::NeedTransactionStart, @@ -366,7 +366,7 @@ impl StreamingLogicalLogReader { fn consume_u8(&mut self, io: &Arc) -> Result { self.read_more_data(io, 1)?; - let r = self.buffer.borrow()[self.buffer_offset]; + let r = self.buffer.read().unwrap()[self.buffer_offset]; self.buffer_offset += 1; Ok(r) } @@ -374,7 +374,7 @@ impl StreamingLogicalLogReader { fn consume_u64(&mut self, io: &Arc) -> Result { self.read_more_data(io, 8)?; let r = u64::from_be_bytes( - self.buffer.borrow()[self.buffer_offset..self.buffer_offset + 8] + self.buffer.read().unwrap()[self.buffer_offset..self.buffer_offset + 8] .try_into() .unwrap(), ); @@ -384,7 +384,8 @@ impl StreamingLogicalLogReader { fn consume_varint(&mut self, io: &Arc) -> Result<(u64, usize)> { self.read_more_data(io, 9)?; - let buffer = &self.buffer.borrow()[self.buffer_offset..]; + let buffer_guard = self.buffer.read().unwrap(); + let buffer = &buffer_guard[self.buffer_offset..]; let (v, n) = read_varint(buffer)?; self.buffer_offset += n; Ok((v, n)) @@ -392,13 +393,14 @@ impl StreamingLogicalLogReader { fn consume_buffer(&mut self, io: &Arc, amount: usize) -> Result> { self.read_more_data(io, amount)?; - let buffer = self.buffer.borrow()[self.buffer_offset..self.buffer_offset + amount].to_vec(); + let buffer = + self.buffer.read().unwrap()[self.buffer_offset..self.buffer_offset + amount].to_vec(); self.buffer_offset += amount; Ok(buffer) } - fn get_buffer(&self) -> std::cell::Ref<'_, Vec> { - self.buffer.borrow() + fn get_buffer(&self) -> std::sync::RwLockReadGuard<'_, Vec> { + self.buffer.read().unwrap() } pub fn read_more_data(&mut self, io: &Arc, need: usize) -> Result<()> { @@ -412,7 +414,7 @@ impl StreamingLogicalLogReader { let buffer = self.buffer.clone(); let completion: Box = Box::new(move |res| { let buffer = buffer.clone(); - let mut buffer = buffer.borrow_mut(); + let mut buffer = buffer.write().unwrap(); let Ok((buf, _bytes_read)) = res else { tracing::trace!("couldn't ready log err={:?}", res,); return; @@ -426,13 +428,13 @@ impl StreamingLogicalLogReader { self.offset += to_read; // cleanup consumed bytes // this could be better for sure - let _ = self.buffer.borrow_mut().drain(0..self.buffer_offset); + let _ = self.buffer.write().unwrap().drain(0..self.buffer_offset); self.buffer_offset = 0; Ok(()) } fn bytes_can_read(&self) -> usize { - self.buffer.borrow().len() - self.buffer_offset + self.buffer.read().unwrap().len() - self.buffer_offset } }