From cc75bc448ec8b2f3bd0166612f713911d1ed7fd0 Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Sat, 2 Aug 2025 21:47:32 -0400
Subject: [PATCH] Move TLC buffer cache to io/mod

---
 core/io/mod.rs              |  61 +++++++++++++++-
 core/storage/buffer_pool.rs | 141 +++++++-----------------------------
 2 files changed, 85 insertions(+), 117 deletions(-)

diff --git a/core/io/mod.rs b/core/io/mod.rs
index e78b9d519..0bded8c49 100644
--- a/core/io/mod.rs
+++ b/core/io/mod.rs
@@ -1,5 +1,6 @@
-use crate::storage::buffer_pool::{ArenaBuffer, TEMP_BUFFER_CACHE};
-use crate::Result;
+use crate::storage::buffer_pool::ArenaBuffer;
+use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
+use crate::{BufferPool, Result};
 use bitflags::bitflags;
 use cfg_block::cfg_block;
 use std::fmt;
@@ -330,9 +331,11 @@ impl Drop for Buffer {
     fn drop(&mut self) {
         let len = self.len();
         if let Self::Heap(buf) = self {
-            crate::storage::buffer_pool::TEMP_BUFFER_CACHE.with(|cache| {
+            TEMP_BUFFER_CACHE.with(|cache| {
+                let mut cache = cache.borrow_mut();
+                // take ownership of the buffer by swapping it with a dummy
                 let buffer = std::mem::replace(buf, Pin::new(vec![].into_boxed_slice()));
-                cache.borrow_mut().return_buffer(buffer, len);
+                cache.return_buffer(buffer, len);
             });
         }
     }
@@ -406,6 +409,56 @@ impl Buffer {
     }
 }
 
+thread_local! {
+    /// Thread-local cache that reuses temporary buffers to prevent churn when the pool overflows
+    pub static TEMP_BUFFER_CACHE: RefCell<TempBufferCache> = RefCell::new(TempBufferCache::new());
+}
+
+pub(crate) struct TempBufferCache {
+    // Buffers indexed by size; only the common sizes are cached
+    page_size: usize,
+    page_buffers: Vec<BufferData>,
+    wal_frame_buffers: Vec<BufferData>,
+    max_cached: usize,
+}
+
+impl TempBufferCache {
+    fn new() -> Self {
+        Self {
+            page_size: BufferPool::DEFAULT_PAGE_SIZE,
+            page_buffers: Vec::with_capacity(8),
+            wal_frame_buffers: Vec::with_capacity(8),
+            max_cached: 512,
+        }
+    }
+
+    pub fn reinit_cache(&mut self, page_size: usize) {
+        self.page_buffers.clear();
+        self.wal_frame_buffers.clear();
+        self.page_size = page_size;
+    }
+
+    fn get_buffer(&mut self, size: usize) -> Option<BufferData> {
+        match size {
+            sz if sz == self.page_size => self.page_buffers.pop(),
+            sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(),
+            _ => None,
+        }
+    }
+
+    fn return_buffer(&mut self, buff: BufferData, len: usize) {
+        let sz = self.page_size;
+        let cache = match len {
+            n if n == sz => &mut self.page_buffers,
+            n if n == sz + WAL_FRAME_HEADER_SIZE => &mut self.wal_frame_buffers,
+            _ => return,
+        };
+        if self.max_cached > cache.len() {
+            cache.push(buff);
+        }
+    }
+}
+
 cfg_block! {
     #[cfg(all(target_os = "linux", feature = "io_uring"))] {
         mod io_uring;
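
The moved cache implements a simple round trip: `Buffer::drop` (above) swaps the heap allocation out and hands it back to the thread-local cache, and the allocation side can pop a same-sized buffer back out through `get_buffer` instead of hitting the allocator. A minimal standalone sketch of that policy, with `Box<[u8]>` standing in for `BufferData` and the constants assumed rather than taken from the patch:

    use std::cell::RefCell;

    const PAGE_SIZE: usize = 4096; // assumed default; the real code uses BufferPool::DEFAULT_PAGE_SIZE
    const MAX_CACHED: usize = 512; // mirrors `max_cached` above

    thread_local! {
        // One free list per thread, so the hot path takes no locks.
        static PAGE_CACHE: RefCell<Vec<Box<[u8]>>> = RefCell::new(Vec::new());
    }

    /// Pop a cached page-sized buffer, or allocate a fresh one.
    fn get_page_buffer() -> Box<[u8]> {
        PAGE_CACHE
            .with(|c| c.borrow_mut().pop())
            .unwrap_or_else(|| vec![0u8; PAGE_SIZE].into_boxed_slice())
    }

    /// Return a buffer on drop; only exact-size buffers are kept, up to the cap.
    fn return_page_buffer(buf: Box<[u8]>) {
        if buf.len() != PAGE_SIZE {
            return; // uncommon sizes are simply freed
        }
        PAGE_CACHE.with(|c| {
            let mut c = c.borrow_mut();
            if c.len() < MAX_CACHED {
                c.push(buf);
            }
        });
    }

    fn main() {
        let a = get_page_buffer();
        let ptr = a.as_ptr();
        return_page_buffer(a);
        // The next same-size request reuses the cached allocation.
        let b = get_page_buffer();
        assert_eq!(ptr, b.as_ptr());
    }
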
diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs
index dbe9e0f92..5f824b9cb 100644
--- a/core/storage/buffer_pool.rs
+++ b/core/storage/buffer_pool.rs
@@ -1,72 +1,41 @@
 use crate::fast_lock::SpinLock;
-use crate::io::BufferData;
+use crate::io::TEMP_BUFFER_CACHE;
 use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
 use crate::{turso_assert, Buffer, LimboError, IO};
 use parking_lot::Mutex;
-use std::cell::{RefCell, UnsafeCell};
+use std::cell::UnsafeCell;
 use std::ptr::NonNull;
 use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
 use std::sync::{Arc, OnceLock};
 
 pub static BUFFER_POOL: OnceLock<Arc<BufferPool>> = OnceLock::new();
 
-// Packed metadata structure
-// Bits 0-23: logical length (16MB max)
-// Bits 26-40: arena_id (15 bits)
-// Bits 41-55: page_idx (15 bits)
-#[derive(Clone, Copy)]
-#[repr(transparent)]
-struct Metadata(u64);
-impl Metadata {
-    const fn new(logical_len: usize, arena_id: u32, page_idx: u32) -> Self {
-        let mut meta = logical_len as u64 & 0xFFFFFF;
-        meta |= ((arena_id as u64) & 0x7FFF) << 26;
-        meta |= ((page_idx as u64) & 0x7FFF) << 41;
-        Self(meta)
-    }
-    const fn logical_len(&self) -> usize {
-        (self.0 & 0xFFFFFF) as usize
-    }
-
-    const fn arena_id(&self) -> u32 {
-        ((self.0 >> 26) & 0x7FFF) as u32
-    }
-
-    const fn page_idx(&self) -> u32 {
-        ((self.0 >> 41) & 0x7FFF) as u32
-    }
-}
-
 #[derive(Debug)]
-/// A buffer allocated from an arena, or a clone of one.
-/// NOTE: we currently only deep clone a buffer in the btree when defragmenting.
-/// we can remove all the plumbing when that is merged.
+/// A buffer allocated from an arena owned by [`BufferPool`]
 pub struct ArenaBuffer {
     ptr: NonNull<u8>,
-    meta: Metadata,
+    arena_id: u32,
+    page_idx: u32,
+    len: usize,
 }
+
 const REGISTERED_ID: u32 = 0;
 
-impl std::fmt::Debug for Metadata {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("BufferMetadata")
-            .field("logical_len", &self.logical_len())
-            .field("arena_id", &self.arena_id())
-            .finish()
-    }
-}
-
 impl ArenaBuffer {
-    const fn new(ptr: NonNull<u8>, meta: Metadata) -> Self {
-        ArenaBuffer { ptr, meta }
+    const fn new(ptr: NonNull<u8>, len: usize, arena_id: u32, page_idx: u32) -> Self {
+        ArenaBuffer {
+            ptr,
+            arena_id,
+            page_idx,
+            len,
+        }
     }
 
     #[inline(always)]
     /// Returns the `id` of the underlying arena if it is registered with `io_uring`
     pub fn fixed_id(&self) -> Option<u32> {
-        let id = self.meta.arena_id();
-        if id == REGISTERED_ID {
-            Some(id)
+        if self.arena_id == REGISTERED_ID {
+            Some(REGISTERED_ID)
         } else {
             None
         }
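
The deleted `Metadata` packed the logical length, arena id, and page index into a single `u64` with shifts and masks; this patch stores them as plain fields instead. The trade is one extra word per `ArenaBuffer` in exchange for dropping the bit-twiddling and its width limits (24-bit length, 15-bit ids). A quick sketch of the size cost, assuming the two layouts are exactly the fields shown in the hunks above and below:

    use std::ptr::NonNull;

    // Old shape: pointer plus a packed u64 (length | arena_id | page_idx).
    #[allow(dead_code)]
    struct Packed {
        ptr: NonNull<u8>,
        meta: u64,
    }

    // New shape: pointer plus unpacked fields, as in this patch.
    #[allow(dead_code)]
    struct Plain {
        ptr: NonNull<u8>,
        arena_id: u32,
        page_idx: u32,
        len: usize,
    }

    fn main() {
        // On a 64-bit target the unpacked form costs one extra word.
        assert_eq!(std::mem::size_of::<Packed>(), 16);
        assert_eq!(std::mem::size_of::<Plain>(), 24);
    }
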
@@ -75,7 +44,7 @@ impl ArenaBuffer {
     /// The requested size of the allocation. The actual size is always rounded up to
     /// the next multiple of ARENA_PAGE_SIZE.
     pub const fn logical_len(&self) -> usize {
-        self.meta.logical_len()
+        self.len
     }
     pub fn as_slice(&self) -> &[u8] {
         unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.logical_len()) }
@@ -90,11 +59,7 @@ impl Drop for ArenaBuffer {
         let pool = BUFFER_POOL
             .get()
             .expect("BufferPool not initialized, cannot free ArenaBuffer");
-        pool.free(
-            self.meta.arena_id(),
-            self.meta.page_idx(),
-            self.logical_len(),
-        );
+        pool.free(self.logical_len(), self.arena_id, self.page_idx);
     }
 }
 
@@ -213,12 +178,14 @@ impl BufferPool {
     }
 
     #[inline]
-    pub fn free(&self, arena_id: u32, page_id: u32, size: usize) {
-        self.inner_mut().free(arena_id, page_id, size);
+    pub fn free(&self, size: usize, arena_id: u32, page_id: u32) {
+        self.inner_mut().free(size, arena_id, page_id);
     }
 }
 
 impl PoolInner {
+    /// Allocate a buffer of the given length from the pool, falling back to
+    /// temporary thread-local buffers if the pool is not initialized or full
     pub fn allocate(&self, len: usize) -> Buffer {
         turso_assert!(len > 0, "Cannot allocate zero-length buffer");
         let Some(arena) = self.arena.as_ref() else {
@@ -232,10 +199,7 @@ impl PoolInner {
                 arena.id,
                 first_idx
             );
-            return Buffer::new_pooled(ArenaBuffer::new(
-                ptr,
-                Metadata::new(len, arena.id, first_idx),
-            ));
+            return Buffer::new_pooled(ArenaBuffer::new(ptr, len, arena.id, first_idx));
         }
         Buffer::new_temporary(len)
     }
@@ -272,7 +236,7 @@ impl PoolInner {
         }
     }
 
-    pub fn free(&mut self, arena_id: u32, page_idx: u32, size: usize) {
+    pub fn free(&mut self, size: usize, arena_id: u32, page_idx: u32) {
         let arena = self
             .arena
             .as_mut()
@@ -346,7 +310,7 @@ impl Arena {
             return None;
         }
         if pages == 1 {
-            // fast path: most allocations are single pages
+            // fast path: for now, nearly all allocations are single pages
            if let Some(page_idx) = free.pop() {
                 self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
                 let offset = page_idx as usize * self.page_size;
@@ -377,9 +341,10 @@ impl Arena {
         self.allocated_pages.fetch_sub(count, Ordering::Relaxed);
     }
 
+    #[cold]
     fn try_alloc_many(&self, pages: usize) -> Option<u32> {
-        // TODO: for now, we don't request buffers > len of a wal frame.
-        // if we eventually do, we can optimize this further
+        // TODO (preston): we can optimize this further once we start allocating
+        // larger contiguous blocks for coalescing; it is unused for now
        let mut free = self.free_pages.lock();
         if pages <= 3 && free.len() >= pages {
             let start = free.len() - pages;
@@ -461,7 +426,6 @@ mod arena {
 #[cfg(not(unix))]
 mod arena {
-    use crate::bufferpool::DEFAULT_PAGE_SIZE;
     pub fn alloc(len: usize) -> *mut u8 {
         let layout = std::alloc::Layout::from_size_align(len, std::mem::size_of::<usize>()).unwrap();
         unsafe { std::alloc::alloc(layout) }
     }
@@ -471,52 +435,3 @@ mod arena {
         unsafe { std::alloc::dealloc(ptr, layout) };
     }
 }
-
-thread_local! {
-    pub static TEMP_BUFFER_CACHE: RefCell<TempBufferCache> = RefCell::new(TempBufferCache::new());
-}
-
-pub(crate) struct TempBufferCache {
-    // Buffers indexed by size, we only cache common sizes
-    page_size: usize,
-    page_buffers: Vec<BufferData>,
-    wal_frame_buffers: Vec<BufferData>,
-    max_cached: usize,
-}
-
-impl TempBufferCache {
-    fn new() -> Self {
-        Self {
-            page_size: BufferPool::DEFAULT_PAGE_SIZE,
-            page_buffers: Vec::with_capacity(8),
-            wal_frame_buffers: Vec::with_capacity(8),
-            max_cached: 512,
-        }
-    }
-
-    fn reinit_cache(&mut self, page_size: usize) {
-        self.page_buffers.clear();
-        self.wal_frame_buffers.clear();
-        self.page_size = page_size;
-    }
-
-    pub fn get_buffer(&mut self, size: usize) -> Option<BufferData> {
-        match size {
-            sz if sz == self.page_size => self.page_buffers.pop(),
-            sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(),
-            _ => None,
-        }
-    }
-
-    pub fn return_buffer(&mut self, buff: BufferData, len: usize) {
-        let sz = self.page_size;
-        let cache = match len {
-            n if n.eq(&sz) => &mut self.page_buffers,
-            n if n.eq(&(sz + WAL_FRAME_HEADER_SIZE)) => &mut self.wal_frame_buffers,
-            _ => return,
-        };
-        if self.max_cached > cache.len() {
-            cache.push(buff);
-        }
-    }
-}
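
Note the visibility flip in the moved code: `reinit_cache` becomes `pub` while `get_buffer` and `return_buffer` go private, so code outside `io` can flush the cache when the page size changes, but only `Buffer` itself checks buffers in and out. A hedged sketch of the kind of call site this enables; the function and its trigger are illustrative, not part of this patch:

    // Hypothetical call site, e.g. when a new page size takes effect before
    // the first write. Assumes the `crate::io::TEMP_BUFFER_CACHE` added above.
    fn on_page_size_change(new_page_size: usize) {
        crate::io::TEMP_BUFFER_CACHE.with(|cache| {
            // Drop every cached buffer of the old size; from here on the cache
            // hands out and accepts buffers of the new size only.
            cache.borrow_mut().reinit_cache(new_page_size);
        });
    }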