core/mvcc: Wrap StreamingLogicalLogReader::buffer with RwLock

This commit is contained in:
Pekka Enberg
2025-09-26 14:17:51 +03:00
parent 96accef06c
commit 1402e9841e

View File

@@ -11,7 +11,7 @@ use crate::{
types::{IOCompletions, ImmutableRecord},
Buffer, Completion, CompletionError, LimboError, Pager, Result,
};
use std::{cell::RefCell, sync::Arc};
use std::sync::{Arc, RwLock};
use crate::{types::IOResult, File};
@@ -237,7 +237,7 @@ struct StreamingLogicalLogReader {
/// Log Header
header: Option<Arc<LogHeader>>,
/// Cached buffer after io read
buffer: Arc<RefCell<Vec<u8>>>,
buffer: Arc<RwLock<Vec<u8>>>,
/// Position to read from loaded buffer
buffer_offset: usize,
file_size: usize,
@@ -251,7 +251,7 @@ impl StreamingLogicalLogReader {
file,
offset: 0,
header: None,
buffer: Arc::new(RefCell::new(Vec::with_capacity(4096))),
buffer: Arc::new(RwLock::new(Vec::with_capacity(4096))),
buffer_offset: 0,
file_size,
state: StreamingState::NeedTransactionStart,
@@ -366,7 +366,7 @@ impl StreamingLogicalLogReader {
fn consume_u8(&mut self, io: &Arc<dyn crate::IO>) -> Result<u8> {
self.read_more_data(io, 1)?;
let r = self.buffer.borrow()[self.buffer_offset];
let r = self.buffer.read().unwrap()[self.buffer_offset];
self.buffer_offset += 1;
Ok(r)
}
@@ -374,7 +374,7 @@ impl StreamingLogicalLogReader {
fn consume_u64(&mut self, io: &Arc<dyn crate::IO>) -> Result<u64> {
self.read_more_data(io, 8)?;
let r = u64::from_be_bytes(
self.buffer.borrow()[self.buffer_offset..self.buffer_offset + 8]
self.buffer.read().unwrap()[self.buffer_offset..self.buffer_offset + 8]
.try_into()
.unwrap(),
);
@@ -384,7 +384,8 @@ impl StreamingLogicalLogReader {
fn consume_varint(&mut self, io: &Arc<dyn crate::IO>) -> Result<(u64, usize)> {
self.read_more_data(io, 9)?;
let buffer = &self.buffer.borrow()[self.buffer_offset..];
let buffer_guard = self.buffer.read().unwrap();
let buffer = &buffer_guard[self.buffer_offset..];
let (v, n) = read_varint(buffer)?;
self.buffer_offset += n;
Ok((v, n))
@@ -392,13 +393,14 @@ impl StreamingLogicalLogReader {
fn consume_buffer(&mut self, io: &Arc<dyn crate::IO>, amount: usize) -> Result<Vec<u8>> {
self.read_more_data(io, amount)?;
let buffer = self.buffer.borrow()[self.buffer_offset..self.buffer_offset + amount].to_vec();
let buffer =
self.buffer.read().unwrap()[self.buffer_offset..self.buffer_offset + amount].to_vec();
self.buffer_offset += amount;
Ok(buffer)
}
fn get_buffer(&self) -> std::cell::Ref<'_, Vec<u8>> {
self.buffer.borrow()
fn get_buffer(&self) -> std::sync::RwLockReadGuard<'_, Vec<u8>> {
self.buffer.read().unwrap()
}
pub fn read_more_data(&mut self, io: &Arc<dyn crate::IO>, need: usize) -> Result<()> {
@@ -412,7 +414,7 @@ impl StreamingLogicalLogReader {
let buffer = self.buffer.clone();
let completion: Box<ReadComplete> = Box::new(move |res| {
let buffer = buffer.clone();
let mut buffer = buffer.borrow_mut();
let mut buffer = buffer.write().unwrap();
let Ok((buf, _bytes_read)) = res else {
tracing::trace!("couldn't ready log err={:?}", res,);
return;
@@ -426,13 +428,13 @@ impl StreamingLogicalLogReader {
self.offset += to_read;
// cleanup consumed bytes
// this could be better for sure
let _ = self.buffer.borrow_mut().drain(0..self.buffer_offset);
let _ = self.buffer.write().unwrap().drain(0..self.buffer_offset);
self.buffer_offset = 0;
Ok(())
}
fn bytes_can_read(&self) -> usize {
self.buffer.borrow().len() - self.buffer_offset
self.buffer.read().unwrap().len() - self.buffer_offset
}
}