From 0ffba81216224ec2532e006db28e7dfd77c22e01 Mon Sep 17 00:00:00 2001
From: PThorpe92
Date: Sun, 3 Aug 2025 15:04:49 -0400
Subject: [PATCH] Make register buffer io trait return the buf index

---
 core/io/mod.rs              |   2 +-
 core/storage/buffer_pool.rs | 270 ++++++++++++++++++++------------
 2 files changed, 150 insertions(+), 122 deletions(-)

diff --git a/core/io/mod.rs b/core/io/mod.rs
index 0bded8c49..cce30f52b 100644
--- a/core/io/mod.rs
+++ b/core/io/mod.rs
@@ -124,7 +124,7 @@ pub trait IO: Clock + Send + Sync {
     fn get_memory_io(&self) -> Arc<MemoryIO>;
 
-    fn register_fixed_buffer(&self, _ptr: NonNull<u8>, _len: usize) -> Result<()> {
+    fn register_fixed_buffer(&self, _ptr: NonNull<u8>, _len: usize) -> Result<u32> {
         Err(crate::LimboError::InternalError(
             "unsupported operation".to_string(),
         ))
     }
diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs
index c1ede775d..8d0ad2248 100644
--- a/core/storage/buffer_pool.rs
+++ b/core/storage/buffer_pool.rs
@@ -1,12 +1,13 @@
 use crate::fast_lock::SpinLock;
 use crate::io::TEMP_BUFFER_CACHE;
+use crate::storage::page_bitmap::PageBitmap;
 use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
 use crate::{turso_assert, Buffer, LimboError, IO};
 use parking_lot::Mutex;
 use std::cell::UnsafeCell;
 use std::ptr::NonNull;
 use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, OnceLock, Weak};
 
 pub static BUFFER_POOL: OnceLock<Arc<BufferPool>> = OnceLock::new();
 
@@ -19,8 +20,6 @@ pub struct ArenaBuffer {
     len: usize,
 }
 
-const REGISTERED_ID: u32 = 0;
-
 impl ArenaBuffer {
     const fn new(ptr: NonNull<u8>, len: usize, arena_id: u32, page_idx: u32) -> Self {
         ArenaBuffer {
@@ -32,17 +31,17 @@ impl ArenaBuffer {
     }
 
     #[inline(always)]
-    /// Returns the `id` of the underlying arena if it is registered with `io_uring`
+    /// Returns the `id` of the underlying arena, only if it was registered with `io_uring`
     pub fn fixed_id(&self) -> Option<u32> {
-        if self.arena_id == REGISTERED_ID {
-            Some(REGISTERED_ID)
+        if self.arena_id < UNREGISTERED_START {
+            Some(self.arena_id)
         } else {
             None
         }
     }
 
-    /// The requested size of the allocation. The actual size is always rounded up to
-    /// the next multiple of the arena.page_size
+    /// The requested size of the allocation, the actual size of the underlying buffer is rounded up to
+    /// the next multiple of the arena's page_size
     pub const fn logical_len(&self) -> usize {
         self.len
     }
@@ -59,7 +58,8 @@ impl Drop for ArenaBuffer {
         let pool = BUFFER_POOL
             .get()
             .expect("BufferPool not initialized, cannot free ArenaBuffer");
-        pool.free(self.logical_len(), self.arena_id, self.page_idx);
+        let inner = pool.inner_mut();
+        inner.free(self.logical_len(), self.arena_id, self.page_idx);
     }
 }
 
@@ -85,7 +85,8 @@ unsafe impl Send for BufferPool {}
 
 struct PoolInner {
     io: Option<Arc<dyn IO>>,
-    arena: Option<Arena>,
+    page_arena: Option<Arena>,
+    wal_frame_arena: Option<Arena>,
     init_lock: Mutex<()>,
     arena_size: AtomicUsize,
     db_page_size: AtomicUsize,
@@ -114,7 +115,8 @@ impl BufferPool {
         );
         Self {
             inner: UnsafeCell::new(PoolInner {
-                arena: None,
+                page_arena: None,
+                wal_frame_arena: None,
                 arena_size: arena_size.into(),
                 db_page_size: Self::DEFAULT_PAGE_SIZE.into(),
                 init_lock: Mutex::new(()),
@@ -129,8 +131,8 @@ impl BufferPool {
     }
 
     pub fn get_page(&self) -> Buffer {
-        let inner = self.inner();
-        inner.allocate(inner.db_page_size.load(Ordering::Relaxed))
+        let inner = self.inner_mut();
+        inner.get_page()
     }
 
     fn inner(&self) -> &PoolInner {
@@ -157,7 +159,8 @@ impl BufferPool {
     }
 
     /// Call when `[Database::db_state]` is initialized, providing the `page_size` to allocate
-    /// an arena for the pool. Before this call, the pool will use temporary buffers
+    /// an arena for the pool. Before this call, the pool will use temporary buffers which are
+    /// cached in thread local storage.
     pub fn finalize_with_page_size(&self, page_size: usize) -> crate::Result<Arc<BufferPool>> {
         let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
         let inner = pool.inner_mut();
@@ -169,11 +172,11 @@ impl BufferPool {
                 cache.borrow_mut().reinit_cache(page_size);
             });
         }
-        if inner.arena.is_some() {
+        if inner.page_arena.is_some() {
             return Ok(pool.clone());
         }
         inner.db_page_size.store(page_size, Ordering::Relaxed);
-        inner.init_arena()?;
+        inner.init_arenas()?;
         Ok(pool.clone())
     }
 
@@ -188,7 +191,27 @@ impl PoolInner {
     /// temporary thread local buffers if the pool is not initialized or full
     pub fn allocate(&self, len: usize) -> Buffer {
         turso_assert!(len > 0, "Cannot allocate zero-length buffer");
-        let Some(arena) = self.arena.as_ref() else {
+
+        let db_page_size = self.db_page_size.load(Ordering::Relaxed);
+        let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE;
+
+        // Check if this is exactly a WAL frame size allocation
+        if len == wal_frame_size {
+            let Some(wal_arena) = self.wal_frame_arena.as_ref() else {
+                return Buffer::new_temporary(len);
+            };
+            if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) {
+                tracing::trace!(
+                    "Allocated WAL frame buffer of length {} from arena {} at index {}",
+                    len,
+                    wal_arena.id,
+                    first_idx
+                );
+                return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx));
+            }
+        }
+        // For all other sizes, use regular arena
+        let Some(arena) = self.page_arena.as_ref() else {
             // pool isn't fully initialized, return temporary buffer
             return Buffer::new_temporary(len);
         };
@@ -204,42 +227,91 @@ impl PoolInner {
         Buffer::new_temporary(len)
     }
 
+    fn get_page(&mut self) -> Buffer {
+        let db_page_size = self.db_page_size.load(Ordering::Relaxed);
+        let Some(arena) = self.page_arena.as_ref() else {
+            return Buffer::new_temporary(db_page_size);
+        };
+        if let Some(FreeEntry { ptr, first_idx }) = arena.try_alloc(db_page_size) {
+            tracing::trace!(
+                "Allocated page buffer of size {} from arena {} at index {}",
+                db_page_size,
+                arena.id,
+                first_idx
+            );
+            return Buffer::new_pooled(ArenaBuffer::new(ptr, db_page_size, arena.id, first_idx));
+        }
+        Buffer::new_temporary(db_page_size)
+    }
+
     /// Allocate a new arena for the pool to use
-    fn init_arena(&mut self) -> crate::Result<()> {
+    fn init_arenas(&mut self) -> crate::Result<()> {
         // Prevent concurrent growth
         let Some(_guard) = self.init_lock.try_lock() else {
             tracing::debug!("Buffer pool is already growing, skipping initialization");
             return Ok(()); // Already in progress
         };
         let arena_size = self.arena_size.load(Ordering::Relaxed);
+        let db_page_size = self.db_page_size.load(Ordering::Relaxed);
         let io = self.io.as_ref().expect("Pool not initialized").clone();
-        match Arena::new(
-            self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE,
-            arena_size,
-            &io,
-        ) {
+
+        // Create regular page arena
+        match Arena::new(db_page_size, arena_size, &io) {
             Ok(arena) => {
                 tracing::trace!(
-                    "added arena {} with size {} MB",
+                    "added arena {} with size {} MB and page size {}",
                     arena.id,
-                    arena_size / (1024 * 1024)
+                    arena_size / (1024 * 1024),
+                    db_page_size
                 );
-                self.arena = Some(arena);
-                Ok(())
+                self.page_arena = Some(arena);
             }
             Err(e) => {
-                tracing::error!("Failed to create new arena: {:?}", e);
-                Err(LimboError::InternalError(format!(
-                    "Failed to create new arena: {e}",
-                )))
+                tracing::error!("Failed to create arena: {:?}", e);
+                return Err(LimboError::InternalError(format!(
+                    "Failed to create arena: {e}",
+                )));
             }
         }
+
+        // Create WAL frame arena
+        let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE;
+        match Arena::new(wal_frame_size, arena_size, &io) {
+            Ok(arena) => {
+                tracing::trace!(
+                    "added WAL frame arena {} with size {} MB and page size {}",
+                    arena.id,
+                    arena_size / (1024 * 1024),
+                    wal_frame_size
+                );
+                self.wal_frame_arena = Some(arena);
+            }
+            Err(e) => {
+                tracing::error!("Failed to create WAL frame arena: {:?}", e);
+                return Err(LimboError::InternalError(format!(
+                    "Failed to create WAL frame arena: {e}",
+                )));
+            }
+        }
+
+        Ok(())
     }
 
-    pub fn free(&mut self, size: usize, arena_id: u32, page_idx: u32) {
+    pub fn free(&self, size: usize, arena_id: u32, page_idx: u32) {
+        // Check WAL frame arena first
+        if let Some(wal_arena) = self.wal_frame_arena.as_ref() {
+            if arena_id == wal_arena.id {
+                let pages = size.div_ceil(wal_arena.page_size);
+                tracing::trace!("Freeing {} pages from WAL frame arena {}", pages, arena_id);
+                wal_arena.free(page_idx, pages);
+                return;
+            }
+        }
+
+        // Otherwise use regular arena
         let arena = self
-            .arena
-            .as_mut()
+            .page_arena
+            .as_ref()
             .expect("pool arena not initialized, cannot free buffer");
         let pages = size.div_ceil(arena.page_size);
         tracing::trace!("Freeing {} pages from arena {}", pages, arena_id);
@@ -252,15 +324,22 @@
     }
 }
 
-/// A single memory arena
+/// Preallocated block of memory used by the pool to distribute Buffers
 struct Arena {
     /// Identifier to tie allocations back to the arena
     id: u32,
     /// base pointer to the arena returned by `mmap`
     base: NonNull<u8>,
+    /// Total number of pages currently allocated from this arena
     allocated_pages: AtomicUsize,
-    free_pages: SpinLock<Vec<u32>>,
+    /// Currently free pages
+    free_pages: SpinLock<PageBitmap>,
+    /// Total size of the arena in bytes
     arena_size: usize,
+    /// Page size the total arena is divided into.
+    /// Because most allocations are of size `[Database::page_size]`, with an
+    /// additional 24 byte wal frame header, we treat this as the `page_size` to reduce
+    /// fragmentation to 24 bytes for regular pages
     page_size: usize,
 }
 
@@ -275,11 +354,11 @@ struct FreeEntry {
     ptr: NonNull<u8>,
     first_idx: u32,
 }
 
-const UNREGISTERED: u32 = 1;
+const UNREGISTERED_START: u32 = 2;
 /// For an arena which isn't registered with `io_uring`
-/// registered arena will always have id = 0
-static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED);
+/// registered arena will always have id = 0..=1
+static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START);
 
 impl Arena {
     /// Create a new arena with the given size and page size.
@@ -287,16 +366,19 @@ impl Arena {
         let ptr = unsafe { arena::alloc(arena_size) };
         let base = NonNull::new(ptr).ok_or("Failed to allocate arena")?;
         let total_pages = arena_size / page_size;
-        let id = if io.register_fixed_buffer(base, arena_size).is_ok() {
-            REGISTERED_ID
-        } else {
-            NEXT_ID.fetch_add(1, Ordering::Relaxed)
-        };
+        let id = io
+            .register_fixed_buffer(base, arena_size)
+            .unwrap_or_else(|_| {
+                // Register with io_uring if possible, otherwise use next available ID
+                let next_id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
+                tracing::trace!("Allocating arena with id {}", next_id);
+                next_id
+            });
         Ok(Self {
             id,
             base,
-            free_pages: SpinLock::new((0..total_pages as u32).collect()),
+            free_pages: SpinLock::new(PageBitmap::new(total_pages as u32)),
             allocated_pages: AtomicUsize::new(0),
             page_size,
             arena_size,
         })
@@ -304,88 +386,34 @@ impl Arena {
     }
 
     pub fn try_alloc(&self, size: usize) -> Option<FreeEntry> {
-        let pages = size.div_ceil(self.page_size);
-        let mut free = self.free_pages.lock();
-        if free.len() < pages {
-            return None;
-        }
-        if pages == 1 {
-            // fast path: for now, most/all allocations are single pages
-            if let Some(page_idx) = free.pop() {
-                self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
-                let offset = page_idx as usize * self.page_size;
-                let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
-                tracing::trace!(
-                    "Allocated single page at index {} from arena {}",
-                    page_idx,
-                    self.id
-                );
-                return Some(FreeEntry {
-                    ptr,
-                    first_idx: page_idx,
-                });
-            }
+        let pages = size.div_ceil(self.page_size) as u32;
+        let mut freemap = self.free_pages.lock();
+
+        let first_idx = if pages == 1 {
+            freemap.alloc_one()?
         } else {
-            return self.try_alloc_many(pages);
-        }
-        None
+            freemap.alloc_run(pages)?
+        };
+
+        self.allocated_pages
+            .fetch_add(pages as usize, Ordering::Relaxed);
+        let offset = first_idx as usize * self.page_size;
+        let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
+        Some(FreeEntry { ptr, first_idx })
+    }
+
+    /// Allocate a contiguous run of `pages`. Returns the first page index.
+    #[inline]
+    pub fn alloc_run(&self, pages: u32) -> Option<u32> {
+        let mut bm = self.free_pages.lock();
+        bm.alloc_run(pages)
     }
 
-    /// Free pages back to this arena
     pub fn free(&self, page_idx: u32, count: usize) {
-        let mut free = self.free_pages.lock();
-        // Add pages back to freelist
-        for i in 0..count {
-            free.push(page_idx + i as u32);
-        }
+        let mut bm = self.free_pages.lock();
+        bm.free_run(page_idx, count as u32);
         self.allocated_pages.fetch_sub(count, Ordering::Relaxed);
     }
-
-    #[cold]
-    fn try_alloc_many(&self, pages: usize) -> Option<FreeEntry> {
-        // TODO (preston): we can optimize this further when we start allocating larger
-        // contiguous blocks for coalescing. this is 'unused' for now
-        let mut free = self.free_pages.lock();
-        if pages <= 3 && free.len() >= pages {
-            let start = free.len() - pages;
-            let first_idx = free[start];
-
-            let mut consecutive = true;
-            for j in 1..pages {
-                if free[start + j] != first_idx + j as u32 {
-                    consecutive = false;
-                    break;
-                }
-            }
-            if consecutive {
-                free.truncate(start);
-                self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
-                let offset = first_idx as usize * self.page_size;
-                let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
-                return Some(FreeEntry { ptr, first_idx });
-            }
-        }
-        // Fall back to searching from the beginning
-        for i in 0..free.len().saturating_sub(pages - 1) {
-            let first_idx = free[i];
-            let mut consecutive = true;
-            for j in 1..pages {
-                if free[i + j] != first_idx + j as u32 {
-                    consecutive = false;
-                    break;
-                }
-            }
-
-            if consecutive {
-                free.drain(i..i + pages);
-                self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
-                let offset = first_idx as usize * self.page_size;
-                let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
-                return Some(FreeEntry { ptr, first_idx });
-            }
-        }
-        None
-    }
 }
 
 #[cfg(unix)]
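
Note on the new dependency: the free-list changes above call into core/storage/page_bitmap.rs, which is not part of this patch, so the PageBitmap interface has to be read off its call sites (PageBitmap::new, alloc_one, alloc_run, free_run). What follows is only a minimal sketch of a bitmap page allocator that satisfies those calls, written here for illustration; it is not the implementation in the tree, and it uses a plain Vec<bool> instead of packed bit words for clarity.

// Hypothetical stand-in for the PageBitmap used above, showing only the calls
// the buffer pool relies on: new(), alloc_one(), alloc_run(), free_run().
pub struct PageBitmap {
    /// true = page is free, false = page is allocated
    free: Vec<bool>,
}

impl PageBitmap {
    /// Create a bitmap tracking `total_pages` pages, all initially free.
    pub fn new(total_pages: u32) -> Self {
        Self {
            free: vec![true; total_pages as usize],
        }
    }

    /// Allocate a single page and return its index, or None if the arena is full.
    pub fn alloc_one(&mut self) -> Option<u32> {
        let idx = self.free.iter().position(|&is_free| is_free)?;
        self.free[idx] = false;
        Some(idx as u32)
    }

    /// Allocate `pages` contiguous pages and return the index of the first one.
    pub fn alloc_run(&mut self, pages: u32) -> Option<u32> {
        let pages = pages as usize;
        if pages == 0 || pages > self.free.len() {
            return None;
        }
        let mut start = 0;
        while start + pages <= self.free.len() {
            // Look for an allocated page inside the candidate window.
            match self.free[start..start + pages].iter().position(|&f| !f) {
                // Window is blocked: restart the search just past the used page.
                Some(used) => start += used + 1,
                // Whole window is free: claim it.
                None => {
                    for slot in &mut self.free[start..start + pages] {
                        *slot = false;
                    }
                    return Some(start as u32);
                }
            }
        }
        None
    }

    /// Return a run of `count` pages starting at `first_idx` to the free set.
    pub fn free_run(&mut self, first_idx: u32, count: u32) {
        for idx in first_idx..first_idx + count {
            self.free[idx as usize] = true;
        }
    }
}

With an interface of this shape, Arena::try_alloc in the diff reduces to one lock on the bitmap plus the first_idx * page_size pointer arithmetic, and the contiguous-run search that try_alloc_many used to do over a sorted Vec<u32> moves behind alloc_run, where a real implementation can scan packed words instead of booleans without changing the call sites.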