Update Buffer type in io module to adjust to new pool

This commit is contained in:
PThorpe92
2025-08-02 11:26:48 -04:00
parent d38cd6360a
commit 39bccc2357

View File

@@ -1,10 +1,11 @@
use crate::storage::buffer_pool::{ArenaBuffer, TEMP_BUFFER_CACHE};
use crate::Result;
use bitflags::bitflags;
use cfg_block::cfg_block;
use std::fmt;
use std::ptr::NonNull;
use std::sync::Arc;
use std::{cell::Cell, fmt::Debug, mem::ManuallyDrop, pin::Pin, rc::Rc};
use std::{cell::Cell, fmt::Debug, pin::Pin};
pub trait File: Send + Sync {
fn lock_file(&self, exclusive: bool) -> Result<()>;
@@ -290,62 +291,115 @@ impl TruncateCompletion {
}
}
pub type BufferData = Pin<Vec<u8>>;
pub type BufferData = Pin<Box<[u8]>>;
pub type BufferDropFn = Rc<dyn Fn(BufferData)>;
pub struct Buffer {
data: ManuallyDrop<BufferData>,
drop: BufferDropFn,
pub enum Buffer {
Heap(BufferData),
Pooled(ArenaBuffer),
}
impl Debug for Buffer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.data)
match self {
Self::Pooled(p) => write!(f, "{p:?}"),
Self::Heap(buf) => write!(f, "{buf:?}: {}", buf.len()),
}
}
}
impl Clone for Buffer {
fn clone(&self) -> Self {
match self {
Self::Heap(buf) => {
let len = buf.len();
Self::Heap(Vec::from(&buf[..len]).into_boxed_slice().into())
}
Self::Pooled(buf) => {
// Clone pooled buffers as heap buffers
let data = Vec::from(buf.as_slice());
Self::Heap(Pin::new(data.into_boxed_slice()))
}
}
}
}
impl Drop for Buffer {
fn drop(&mut self) {
let data = unsafe { ManuallyDrop::take(&mut self.data) };
(self.drop)(data);
let len = self.len();
if let Self::Heap(buf) = self {
crate::storage::buffer_pool::TEMP_BUFFER_CACHE.with(|cache| {
let buffer = std::mem::replace(buf, Pin::new(vec![].into_boxed_slice()));
cache.borrow_mut().return_buffer(buffer, len);
});
}
}
}
impl Buffer {
pub fn allocate(size: usize, drop: BufferDropFn) -> Self {
let data = ManuallyDrop::new(Pin::new(vec![0; size]));
Self { data, drop }
pub fn new(data: Vec<u8>) -> Self {
tracing::trace!("buffer::new({:?})", data);
Self::Heap(Pin::new(data.into_boxed_slice()))
}
pub fn fixed_id(&self) -> Option<u32> {
match self {
Self::Heap { .. } => None,
Self::Pooled(buf) => buf.fixed_id(),
}
}
pub fn new(data: BufferData, drop: BufferDropFn) -> Self {
let data = ManuallyDrop::new(data);
Self { data, drop }
pub fn new_pooled(buf: ArenaBuffer) -> Self {
tracing::trace!("new_pooled({:?})", buf);
Self::Pooled(buf)
}
pub fn new_temporary(size: usize) -> Self {
TEMP_BUFFER_CACHE.with(|cache| {
if let Some(buffer) = cache.borrow_mut().get_buffer(size) {
Self::Heap(buffer)
} else {
Self::Heap(Pin::new(vec![0; size].into_boxed_slice()))
}
})
}
pub fn len(&self) -> usize {
self.data.len()
match self {
Self::Heap(buf) => buf.len(),
Self::Pooled(buf) => buf.logical_len(),
}
}
pub fn is_empty(&self) -> bool {
self.data.is_empty()
self.len() == 0
}
pub fn as_slice(&self) -> &[u8] {
&self.data
match self {
Self::Heap(buf) => {
// SAFETY: The buffer is guaranteed to be valid for the lifetime of the slice
unsafe { std::slice::from_raw_parts(buf.as_ptr(), buf.len()) }
}
Self::Pooled(buf) => buf,
}
}
#[allow(clippy::mut_from_ref)]
pub fn as_mut_slice(&self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.data.len()) }
match self {
Self::Heap(buf) => {
// SAFETY: The buffer is guaranteed to be valid for the lifetime of the slice
unsafe { std::slice::from_raw_parts_mut(buf.as_ptr() as *mut u8, buf.len()) }
}
Self::Pooled(buf) => unsafe {
std::slice::from_raw_parts_mut(buf.as_ptr() as *mut u8, buf.len())
},
}
}
pub fn as_ptr(&self) -> *const u8 {
self.data.as_ptr()
}
pub fn as_mut_ptr(&self) -> *mut u8 {
self.data.as_ptr() as *mut u8
pub fn as_mut_ptr(&mut self) -> *mut u8 {
match self {
Self::Heap(buf) => buf.as_mut_ptr(),
Self::Pooled(buf) => buf.as_mut_ptr(),
}
}
}