diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs
index 4d7beae2b..4d1ab7e5c 100644
--- a/core/storage/buffer_pool.rs
+++ b/core/storage/buffer_pool.rs
@@ -1,45 +1,534 @@
+use crate::fast_lock::SpinLock;
 use crate::io::BufferData;
+use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
+use crate::{turso_assert, Buffer, LimboError, IO};
 use parking_lot::Mutex;
-use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::cell::{RefCell, UnsafeCell};
+use std::ptr::NonNull;
+use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
+use std::sync::{Arc, OnceLock};
 
-use super::sqlite3_ondisk::PageSize;
+pub static BUFFER_POOL: OnceLock<Arc<BufferPool>> = OnceLock::new();
 
-pub struct BufferPool {
-    pub free_buffers: Mutex<Vec<BufferData>>,
-    page_size: AtomicUsize,
+// Packed metadata structure:
+//   Bits 0-23:  logical length (16 MB max)
+//   Bits 26-40: arena_id (15 bits)
+//   Bits 41-55: page_idx (15 bits)
+#[derive(Clone, Copy)]
+#[repr(transparent)]
+struct Metadata(u64);
+impl Metadata {
+    const fn new(logical_len: usize, arena_id: u32, page_idx: u32) -> Self {
+        let mut meta = logical_len as u64 & 0xFFFFFF;
+        meta |= ((arena_id as u64) & 0x7FFF) << 26;
+        meta |= ((page_idx as u64) & 0x7FFF) << 41;
+        Self(meta)
+    }
+    const fn logical_len(&self) -> usize {
+        (self.0 & 0xFFFFFF) as usize
+    }
+
+    const fn arena_id(&self) -> u32 {
+        ((self.0 >> 26) & 0x7FFF) as u32
+    }
+
+    const fn page_idx(&self) -> u32 {
+        ((self.0 >> 41) & 0x7FFF) as u32
+    }
 }
 
-impl BufferPool {
-    pub fn new(page_size: Option<usize>) -> Self {
-        Self {
-            free_buffers: Mutex::new(Vec::new()),
-            page_size: AtomicUsize::new(page_size.unwrap_or(PageSize::DEFAULT as usize)),
+#[derive(Debug)]
+/// A buffer allocated from an arena, or a clone of one.
+/// NOTE: we currently only deep clone a buffer in the btree when defragmenting;
+/// we can remove all the plumbing when that is merged.
+pub struct ArenaBuffer {
+    ptr: NonNull<u8>,
+    meta: Metadata,
+}
+const REGISTERED_ID: u32 = 0;
+
+impl std::fmt::Debug for Metadata {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BufferMetadata")
+            .field("logical_len", &self.logical_len())
+            .field("arena_id", &self.arena_id())
+            .finish()
+    }
+}
+
+impl ArenaBuffer {
+    const fn new(ptr: NonNull<u8>, meta: Metadata) -> Self {
+        ArenaBuffer { ptr, meta }
+    }
+
+    #[inline(always)]
+    /// Returns the `id` of the underlying arena if it is registered with `io_uring`.
+    pub fn fixed_id(&self) -> Option<u32> {
+        let id = self.meta.arena_id();
+        if id == REGISTERED_ID {
+            Some(id)
+        } else {
+            None
         }
     }
 
-    pub fn set_page_size(&self, page_size: usize) {
-        self.page_size.store(page_size, Ordering::Relaxed);
+    /// The requested size of the allocation. The actual size is always rounded up to
+    /// the next multiple of the arena's page size.
+    pub const fn logical_len(&self) -> usize {
+        self.meta.logical_len()
     }
-
-    pub fn get(&self) -> BufferData {
-        let buffer = self.free_buffers.lock().pop();
-        buffer.unwrap_or_else(|| Pin::new(vec![0; self.page_size.load(Ordering::Relaxed)]))
+    pub fn as_slice(&self) -> &[u8] {
+        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.logical_len()) }
     }
-
-    pub fn put(&self, buffer: BufferData) {
-        self.free_buffers.lock().push(buffer);
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.logical_len()) }
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    fn is_send_sync_static<T: Send + Sync + 'static>() {}
-
-    #[test]
-    fn test_send_sync() {
-        is_send_sync_static::<BufferPool>();
+impl Drop for ArenaBuffer {
+    fn drop(&mut self) {
+        let pool = BUFFER_POOL
+            .get()
+            .expect("BufferPool not initialized, cannot free ArenaBuffer");
+        pool.free(
+            self.meta.arena_id(),
+            self.meta.page_idx(),
+            self.logical_len(),
+        );
+    }
+}
+
+impl std::ops::Deref for ArenaBuffer {
+    type Target = [u8];
+    fn deref(&self) -> &Self::Target {
+        self.as_slice()
+    }
+}
+
+impl std::ops::DerefMut for ArenaBuffer {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.as_mut_slice()
+    }
+}
+
+/// Static buffer pool managing arena-backed buffer allocations.
+pub struct BufferPool {
+    inner: UnsafeCell<PoolInner>,
+}
+unsafe impl Sync for BufferPool {}
+unsafe impl Send for BufferPool {}
+
+struct PoolInner {
+    io: Option<Arc<dyn IO>>,
+    arena: Option<Arena>,
+    init_lock: Mutex<()>,
+    arena_size: AtomicUsize,
+    db_page_size: AtomicUsize,
+}
+
+unsafe impl Sync for PoolInner {}
+unsafe impl Send for PoolInner {}
+
+impl Default for BufferPool {
+    fn default() -> Self {
+        Self::new(Self::DEFAULT_ARENA_SIZE)
+    }
+}
+
+/// Helper for initializing the pool properly. We need buffers for IO to read the
+/// header and discover the page size, but we don't want to initialize a whole arena
+/// only to drop it when we find out the page size is different. So `begin_init`
+/// returns this builder: either call `finalize_page_size` with the discovered page
+/// size, or it finalizes with the default page size when it goes out of scope.
+pub struct BufferPoolBuilder;
+impl BufferPoolBuilder {
+    pub fn finalize_page_size(&self, page_size: usize) -> crate::Result<()> {
+        let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
+        let inner = pool.inner_mut();
+        if inner.arena.is_some() {
+            return Ok(());
+        }
+        tracing::trace!("finalize page size called with size {page_size}");
+        if page_size != BufferPool::DEFAULT_PAGE_SIZE {
+            // So far we have handed out some temporary buffers; since the page size is
+            // not the default, clear the cache so they aren't reused for other operations.
+            TEMP_BUFFER_CACHE.with(|cache| {
+                cache.borrow_mut().reinit_cache(page_size);
+            });
+        }
+        inner.db_page_size.store(page_size, Ordering::Relaxed);
+        inner.init_arena()
+    }
+}
+
+/// If `begin_init` is called without `finalize_page_size`, we simply finalize
+/// the pool with the default page size when the builder is dropped.
+impl Drop for BufferPoolBuilder {
+    fn drop(&mut self) {
+        // finalize_page_size is idempotent and does nothing if an arena already exists
+        let _ = self.finalize_page_size(BufferPool::DEFAULT_PAGE_SIZE);
+    }
+}
+
+impl BufferPool {
+    pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; // 4MB arena
+    pub const DEFAULT_PAGE_SIZE: usize = 4096; // 4KB default page size
+    const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; // 32MB max arena
+
+    pub fn new(arena_size: usize) -> Self {
+        turso_assert!(
+            arena_size < Self::MAX_ARENA_SIZE,
+            "Arena size cannot exceed {} bytes",
+            Self::MAX_ARENA_SIZE
+        );
+        Self {
+            inner: UnsafeCell::new(PoolInner {
+                arena: None,
+                arena_size: arena_size.into(),
+                db_page_size: Self::DEFAULT_PAGE_SIZE.into(),
+                init_lock: Mutex::new(()),
+                io: None,
+            }),
+        }
+    }
+
+    pub fn allocate(len: usize) -> Buffer {
+        let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
+        pool.inner().allocate(len)
+    }
+
+    pub fn get_page() -> Buffer {
+        let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
+        let inner = pool.inner();
+        inner.allocate(inner.db_page_size.load(Ordering::Relaxed))
+    }
+
+    fn inner(&self) -> &PoolInner {
+        unsafe { &*self.inner.get() }
+    }
+
+    #[allow(clippy::mut_from_ref)]
+    fn inner_mut(&self) -> &mut PoolInner {
+        unsafe { &mut *self.inner.get() }
+    }
+
+    /// Initialize the pool with the default page size, WITHOUT creating an arena.
+    /// The arena is created when `finalize_page_size` is called on the returned
+    /// builder, or when the builder is dropped.
+    pub fn begin_init(io: &Arc<dyn IO>, arena_size: usize) -> BufferPoolBuilder {
+        let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size)));
+        let inner = pool.inner_mut();
+        // Just store the IO handle, don't create the arena yet
+        if inner.io.is_none() {
+            inner.io = Some(Arc::clone(io));
+        }
+        BufferPoolBuilder {}
+    }
+
+    #[inline]
+    pub fn free(&self, arena_id: u32, page_id: u32, size: usize) {
+        self.inner_mut().free(arena_id, page_id, size);
+    }
+}
+
+impl PoolInner {
+    pub fn allocate(&self, len: usize) -> Buffer {
+        turso_assert!(len > 0, "Cannot allocate zero-length buffer");
+        let Some(arena) = self.arena.as_ref() else {
+            // pool isn't fully initialized, return a temporary buffer
+            return Buffer::new_temporary(len);
+        };
+        if let Some(FreeEntry { ptr, first_idx }) = arena.try_alloc(len) {
+            tracing::trace!(
+                "Allocated buffer of length {} from arena {} at index {}",
+                len,
+                arena.id,
+                first_idx
+            );
+            return Buffer::new_pooled(ArenaBuffer::new(
+                ptr,
+                Metadata::new(len, arena.id, first_idx),
+            ));
+        }
+        Buffer::new_temporary(len)
+    }
+
+    /// Allocate a new arena for the pool to use
+    fn init_arena(&mut self) -> crate::Result<()> {
+        // Prevent concurrent growth
+        let Some(_guard) = self.init_lock.try_lock() else {
+            tracing::debug!("Buffer pool is already growing, skipping initialization");
+            return Ok(()); // Already in progress
+        };
+        let arena_size = self.arena_size.load(Ordering::Relaxed);
+        let io = self.io.as_ref().expect("Pool not initialized").clone();
+        match Arena::new(
+            self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE,
+            arena_size,
+            &io,
+        ) {
+            Ok(arena) => {
+                tracing::trace!(
+                    "Growing buffer pool: added arena {} with size {} MB",
+                    arena.id,
+                    arena_size / (1024 * 1024)
+                );
+                self.arena = Some(arena);
+                Ok(())
+            }
+            Err(e) => {
+                tracing::error!("Failed to create new arena: {:?}", e);
+                Err(LimboError::InternalError(format!(
+                    "Failed to create new arena: {e}",
+                )))
+            }
+        }
+    }
+
+    pub fn free(&mut self, arena_id: u32, page_idx: u32, size: usize) {
+        let arena = self
+            .arena
+            .as_mut()
+            .expect("pool arena not initialized, cannot free buffer");
+        let pages = size.div_ceil(arena.page_size);
+        tracing::trace!("Freeing {} pages from arena {}", pages, arena_id);
+        turso_assert!(
+            arena_id == arena.id,
+            "should not free from different arena. {arena_id} != {}",
+            arena.id
+        );
+        arena.free(page_idx, pages);
+    }
+}
+
+/// A single memory arena
+struct Arena {
+    /// Identifier tying allocations back to the arena
+    id: u32,
+    /// Base pointer to the arena returned by `mmap`
+    base: NonNull<u8>,
+    allocated_pages: AtomicUsize,
+    free_pages: SpinLock<Vec<u32>>,
+    arena_size: usize,
+    page_size: usize,
+}
+
+impl Drop for Arena {
+    fn drop(&mut self) {
+        unsafe { arena::dealloc(self.base.as_ptr(), self.arena_size) };
+    }
+}
+
+struct FreeEntry {
+    ptr: NonNull<u8>,
+    first_idx: u32,
+}
+
+const UNREGISTERED: u32 = 1;
+
+/// Next id handed out to an arena which isn't registered with `io_uring`;
+/// a registered arena will always have id = 0.
+static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED);
+
+impl Arena {
+    /// Create a new arena with the given size and page size.
+    fn new(page_size: usize, arena_size: usize, io: &Arc<dyn IO>) -> Result<Self, String> {
+        let ptr = unsafe { arena::alloc(arena_size) };
+        let base = NonNull::new(ptr).ok_or("Failed to allocate arena")?;
+        let total_pages = arena_size / page_size;
+        let id = if io.register_fixed_buffer(base, arena_size).is_ok() {
+            REGISTERED_ID
+        } else {
+            NEXT_ID.fetch_add(1, Ordering::Relaxed)
+        };
+
+        Ok(Self {
+            id,
+            base,
+            free_pages: SpinLock::new((0..total_pages as u32).collect()),
+            allocated_pages: AtomicUsize::new(0),
+            page_size,
+            arena_size,
+        })
+    }
+
+    pub fn try_alloc(&self, size: usize) -> Option<FreeEntry> {
+        let pages = size.div_ceil(self.page_size);
+        let mut free = self.free_pages.lock();
+        if free.len() < pages {
+            return None;
+        }
+        if pages == 1 {
+            // fast path: most allocations are single pages
+            if let Some(page_idx) = free.pop() {
+                self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
+                let offset = page_idx as usize * self.page_size;
+                let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
+                tracing::trace!(
+                    "Allocated single page at index {} from arena {}",
+                    page_idx,
+                    self.id
+                );
+                return Some(FreeEntry {
+                    ptr,
+                    first_idx: page_idx,
+                });
+            }
+        } else {
+            // release the freelist lock first; try_alloc_many re-acquires it
+            drop(free);
+            return self.try_alloc_many(pages);
+        }
+        None
+    }
+
+    /// Free pages back to this arena
+    pub fn free(&self, page_idx: u32, count: usize) {
+        let mut free = self.free_pages.lock();
+        // Add pages back to the freelist
+        for i in 0..count {
+            free.push(page_idx + i as u32);
+        }
+        self.allocated_pages.fetch_sub(count, Ordering::Relaxed);
+    }
+
+    fn try_alloc_many(&self, pages: usize) -> Option<FreeEntry> {
+        // TODO: for now, we don't request buffers > len of a wal frame;
+        // if we eventually do, we can optimize this further
+        let mut free = self.free_pages.lock();
+        if pages <= 3 && free.len() >= pages {
+            let start = free.len() - pages;
+            let first_idx = free[start];
+
+            let mut consecutive = true;
+            for j in 1..pages {
+                if free[start + j] != first_idx + j as u32 {
+                    consecutive = false;
+                    break;
+                }
+            }
+            if consecutive {
+                free.truncate(start);
+                self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
+                let offset = first_idx as usize * self.page_size;
+                let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
+                return Some(FreeEntry { ptr, first_idx });
+            }
+        }
+        // Fall back to searching from the beginning
+        for i in 0..free.len().saturating_sub(pages - 1) {
+            let first_idx = free[i];
+            let mut consecutive = true;
+            for j in 1..pages {
+                if free[i + j] != first_idx + j as u32 {
+                    consecutive = false;
+                    break;
+                }
+            }
+
+            if consecutive {
+                free.drain(i..i + pages);
+                self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
+                let offset = first_idx as usize * self.page_size;
+                let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
+                return Some(FreeEntry { ptr, first_idx });
+            }
+        }
+        None
+    }
+}
+
+#[cfg(unix)]
+mod arena {
+    #[cfg(target_os = "macos")]
+    use libc::MAP_ANON as MAP_ANONYMOUS;
+    #[cfg(target_os = "linux")]
+    use libc::MAP_ANONYMOUS;
+    use libc::{mmap, munmap, MAP_PRIVATE, PROT_READ, PROT_WRITE};
+    use std::ffi::c_void;
+
+    pub unsafe fn alloc(len: usize) -> *mut u8 {
+        let ptr = mmap(
+            std::ptr::null_mut(),
+            len,
+            PROT_READ | PROT_WRITE,
+            MAP_PRIVATE | MAP_ANONYMOUS,
+            -1,
+            0,
+        );
+        if ptr == libc::MAP_FAILED {
+            panic!("mmap failed: {}", std::io::Error::last_os_error());
+        }
+        #[cfg(target_os = "linux")]
+        {
+            libc::madvise(ptr, len, libc::MADV_HUGEPAGE);
+        }
+        ptr as *mut u8
+    }
+
+    pub unsafe fn dealloc(ptr: *mut u8, len: usize) {
+        let result = munmap(ptr as *mut c_void, len);
+        if result != 0 {
+            panic!("munmap failed: {}", std::io::Error::last_os_error());
+        }
+    }
+}
+
+#[cfg(not(unix))]
+mod arena {
+    use super::BufferPool;
+
+    pub unsafe fn alloc(len: usize) -> *mut u8 {
+        // Fallback: use the global allocator, aligned to the default page size.
+        let layout =
+            std::alloc::Layout::from_size_align(len, BufferPool::DEFAULT_PAGE_SIZE).unwrap();
+        std::alloc::alloc(layout)
+    }
+
+    pub unsafe fn dealloc(ptr: *mut u8, len: usize) {
+        let layout =
+            std::alloc::Layout::from_size_align(len, BufferPool::DEFAULT_PAGE_SIZE).unwrap();
+        std::alloc::dealloc(ptr, layout);
+    }
+}
+
+thread_local! {
+    pub static TEMP_BUFFER_CACHE: RefCell<TempBufferCache> = RefCell::new(TempBufferCache::new());
+}
+
+pub(crate) struct TempBufferCache {
+    // Buffers indexed by size; we only cache common sizes
+    page_size: usize,
+    page_buffers: Vec<BufferData>,
+    wal_frame_buffers: Vec<BufferData>,
+    max_cached: usize,
+}
+
+impl TempBufferCache {
+    fn new() -> Self {
+        Self {
+            page_size: BufferPool::DEFAULT_PAGE_SIZE,
+            page_buffers: Vec::with_capacity(8),
+            wal_frame_buffers: Vec::with_capacity(8),
+            max_cached: 512,
+        }
+    }
+
+    fn reinit_cache(&mut self, page_size: usize) {
+        self.page_buffers.clear();
+        self.wal_frame_buffers.clear();
+        self.page_size = page_size;
+    }
+
+    pub fn get_buffer(&mut self, size: usize) -> Option<BufferData> {
+        match size {
+            sz if sz == self.page_size => self.page_buffers.pop(),
+            sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(),
+            _ => None,
+        }
+    }
+
+    pub fn return_buffer(&mut self, buff: BufferData, len: usize) {
+        let sz = self.page_size;
+        let cache = match len {
+            n if n == sz => &mut self.page_buffers,
+            n if n == sz + WAL_FRAME_HEADER_SIZE => &mut self.wal_frame_buffers,
+            _ => return,
+        };
+        if self.max_cached > cache.len() {
+            cache.push(buff);
+        }
     }
 }
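+
+// Sketch of a test module for the new pool types (assumes only the items defined
+// above in this file): it re-asserts the Send + Sync bound the removed test module
+// checked, and round-trips the packed `Metadata` layout described at the top.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn is_send_sync_static<T: Send + Sync + 'static>() {}
+
+    #[test]
+    fn test_send_sync() {
+        is_send_sync_static::<BufferPool>();
+    }
+
+    #[test]
+    fn test_metadata_roundtrip() {
+        // 24 bits for logical_len, 15 bits each for arena_id and page_idx
+        let meta = Metadata::new(4096 + WAL_FRAME_HEADER_SIZE, 3, 0x7ABC);
+        assert_eq!(meta.logical_len(), 4096 + WAL_FRAME_HEADER_SIZE);
+        assert_eq!(meta.arena_id(), 3);
+        assert_eq!(meta.page_idx(), 0x7ABC);
+    }
+}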