From d38cd6360a19bc6cdf519a4ab8f4c1c032bc46f9 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 11:23:05 -0400 Subject: [PATCH 01/29] Create new arena backed buffer pool --- core/storage/buffer_pool.rs | 547 ++++++++++++++++++++++++++++++++++-- 1 file changed, 518 insertions(+), 29 deletions(-) diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 4d7beae2b..4d1ab7e5c 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -1,45 +1,534 @@ +use crate::fast_lock::SpinLock; use crate::io::BufferData; +use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE; +use crate::{turso_assert, Buffer, LimboError, IO}; use parking_lot::Mutex; -use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::cell::{RefCell, UnsafeCell}; +use std::ptr::NonNull; +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; +use std::sync::{Arc, OnceLock}; -use super::sqlite3_ondisk::PageSize; +pub static BUFFER_POOL: OnceLock> = OnceLock::new(); -pub struct BufferPool { - pub free_buffers: Mutex>, - page_size: AtomicUsize, +// Packed metadata structure +// Bits 0-23: logical length (16MB max) +// Bits 26-40: arena_id (15 bits) +// Bits 41-55: page_idx (15 bits) +#[derive(Clone, Copy)] +#[repr(transparent)] +struct Metadata(u64); +impl Metadata { + const fn new(logical_len: usize, arena_id: u32, page_idx: u32) -> Self { + let mut meta = logical_len as u64 & 0xFFFFFF; + meta |= ((arena_id as u64) & 0x7FFF) << 26; + meta |= ((page_idx as u64) & 0x7FFF) << 41; + Self(meta) + } + const fn logical_len(&self) -> usize { + (self.0 & 0xFFFFFF) as usize + } + + const fn arena_id(&self) -> u32 { + ((self.0 >> 26) & 0x7FFF) as u32 + } + + const fn page_idx(&self) -> u32 { + ((self.0 >> 41) & 0x7FFF) as u32 + } } -impl BufferPool { - pub fn new(page_size: Option) -> Self { - Self { - free_buffers: Mutex::new(Vec::new()), - page_size: AtomicUsize::new(page_size.unwrap_or(PageSize::DEFAULT as usize)), +#[derive(Debug)] +/// A buffer allocated from an arena, or a clone of one. +/// NOTE: we currently only deep clone a buffer in the btree when defragmenting. +/// we can remove all the plumbing when that is merged. +pub struct ArenaBuffer { + ptr: NonNull, + meta: Metadata, +} +const REGISTERED_ID: u32 = 0; + +impl std::fmt::Debug for Metadata { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BufferMetadata") + .field("logical_len", &self.logical_len()) + .field("arena_id", &self.arena_id()) + .finish() + } +} + +impl ArenaBuffer { + const fn new(ptr: NonNull, meta: Metadata) -> Self { + ArenaBuffer { ptr, meta } + } + + #[inline(always)] + /// Returns the `id` of the underlying arena if it is registered with `io_uring` + pub fn fixed_id(&self) -> Option { + let id = self.meta.arena_id(); + if id == REGISTERED_ID { + Some(id) + } else { + None } } - pub fn set_page_size(&self, page_size: usize) { - self.page_size.store(page_size, Ordering::Relaxed); + /// The requested size of the allocation. The actual size is always rounded up to + /// the next multiple of ARENA_PAGE_SIZE. 
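+    /// For example, with the default 4 KiB page size each arena page is sized for a
+    /// page plus a WAL frame header, so a 4096-byte request consumes one whole arena
+    /// page while `logical_len()` still reports the 4096 bytes that were asked for.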
+ pub const fn logical_len(&self) -> usize { + self.meta.logical_len() } - - pub fn get(&self) -> BufferData { - let buffer = self.free_buffers.lock().pop(); - buffer.unwrap_or_else(|| Pin::new(vec![0; self.page_size.load(Ordering::Relaxed)])) + pub fn as_slice(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.logical_len()) } } - - pub fn put(&self, buffer: BufferData) { - self.free_buffers.lock().push(buffer); + pub fn as_mut_slice(&mut self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.logical_len()) } } } -#[cfg(test)] -mod tests { - use super::*; - - fn is_send_sync_static() {} - - #[test] - fn test_send_sync() { - is_send_sync_static::(); +impl Drop for ArenaBuffer { + fn drop(&mut self) { + let pool = BUFFER_POOL + .get() + .expect("BufferPool not initialized, cannot free ArenaBuffer"); + pool.free( + self.meta.arena_id(), + self.meta.page_idx(), + self.logical_len(), + ); + } +} + +impl std::ops::Deref for ArenaBuffer { + type Target = [u8]; + fn deref(&self) -> &Self::Target { + self.as_slice() + } +} + +impl std::ops::DerefMut for ArenaBuffer { + fn deref_mut(&mut self) -> &mut Self::Target { + self.as_mut_slice() + } +} + +/// Static Buffer pool managing multiple arenas +pub struct BufferPool { + inner: UnsafeCell, +} +unsafe impl Sync for BufferPool {} +unsafe impl Send for BufferPool {} + +struct PoolInner { + io: Option>, + arena: Option, + init_lock: Mutex<()>, + arena_size: AtomicUsize, + db_page_size: AtomicUsize, +} + +unsafe impl Sync for PoolInner {} +unsafe impl Send for PoolInner {} + +impl Default for BufferPool { + fn default() -> Self { + Self::new(Self::DEFAULT_ARENA_SIZE) + } +} + +/// Helper for initializing the pool properly. We need buffers for IO to read the +/// header to discover the page_size, so we don't want to initialize a whole arena +/// just to drop it when we find out the page size is different. So we have this +/// builder that is returned which can either be called `finalize` with the page size, +/// or it will simply finalize with the default page size when it goes out of scope. +pub struct BufferPoolBuilder; +impl BufferPoolBuilder { + pub fn finalize_page_size(&self, page_size: usize) -> crate::Result<()> { + let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); + let inner = pool.inner_mut(); + if inner.arena.is_some() { + return Ok(()); + } + tracing::trace!("finalize page size called with size {page_size}"); + if page_size != BufferPool::DEFAULT_PAGE_SIZE { + // so far we have handed out some temporary buffers, since the page size is not + // default, we need to clear the cache so they aren't reused for other operations. + TEMP_BUFFER_CACHE.with(|cache| { + cache.borrow_mut().reinit_cache(page_size); + }); + } + inner.db_page_size.store(page_size, Ordering::Relaxed); + inner.init_arena() + } +} + +/// If `begin_init` is called without `finalize`, we will just finalize +/// the pool with the default page size. 
+impl Drop for BufferPoolBuilder { + fn drop(&mut self) { + // finalize_page_size is idempotent, and will not do anything if an arena already exists + let _ = self.finalize_page_size(BufferPool::DEFAULT_PAGE_SIZE); + } +} + +impl BufferPool { + pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; // 4MB arena + pub const DEFAULT_PAGE_SIZE: usize = 4096; // 4KB default page size + const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; // 32MB max arena + + pub fn new(arena_size: usize) -> Self { + turso_assert!( + arena_size < Self::MAX_ARENA_SIZE, + "Arena size cannot exceed {} bytes", + Self::MAX_ARENA_SIZE + ); + Self { + inner: UnsafeCell::new(PoolInner { + arena: None, + arena_size: arena_size.into(), + db_page_size: Self::DEFAULT_PAGE_SIZE.into(), + init_lock: Mutex::new(()), + io: None, + }), + } + } + + pub fn allocate(len: usize) -> Buffer { + let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); + pool.inner().allocate(len) + } + + pub fn get_page() -> Buffer { + let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); + let inner = pool.inner(); + inner.allocate(inner.db_page_size.load(Ordering::Relaxed)) + } + + fn inner(&self) -> &PoolInner { + unsafe { &*self.inner.get() } + } + + #[allow(clippy::mut_from_ref)] + fn inner_mut(&self) -> &mut PoolInner { + unsafe { &mut *self.inner.get() } + } + + /// Initialize the pool to the default page size, WITHOUT creating an arena + /// Arena will be created when set_page_size is called + pub fn begin_init(io: &Arc, arena_size: usize) -> BufferPoolBuilder { + let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size))); + let inner = pool.inner_mut(); + // Just store the IO handle, don't create arena yet + if inner.io.is_none() { + inner.io = Some(Arc::clone(io)); + } + BufferPoolBuilder {} + } + + #[inline] + pub fn free(&self, arena_id: u32, page_id: u32, size: usize) { + self.inner_mut().free(arena_id, page_id, size); + } +} + +impl PoolInner { + pub fn allocate(&self, len: usize) -> Buffer { + turso_assert!(len > 0, "Cannot allocate zero-length buffer"); + let Some(arena) = self.arena.as_ref() else { + // pool isn't fully initialized, return temporary buffer + return Buffer::new_temporary(len); + }; + if let Some(FreeEntry { ptr, first_idx }) = arena.try_alloc(len) { + tracing::trace!( + "Allocated buffer of length {} from arena {} at index {}", + len, + arena.id, + first_idx + ); + return Buffer::new_pooled(ArenaBuffer::new( + ptr, + Metadata::new(len, arena.id, first_idx), + )); + } + Buffer::new_temporary(len) + } + + /// Allocate a new arena for the pool to use + fn init_arena(&mut self) -> crate::Result<()> { + // Prevent concurrent growth + let Some(_guard) = self.init_lock.try_lock() else { + tracing::debug!("Buffer pool is already growing, skipping initialization"); + return Ok(()); // Already in progress + }; + let arena_size = self.arena_size.load(Ordering::Relaxed); + let io = self.io.as_ref().expect("Pool not initialized").clone(); + match Arena::new( + self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE, + arena_size, + &io, + ) { + Ok(arena) => { + tracing::trace!( + "Growing buffer pool: added arena {} with size {} MB", + arena.id, + arena_size / (1024 * 1024) + ); + self.arena = Some(arena); + Ok(()) + } + Err(e) => { + tracing::error!("Failed to create new arena: {:?}", e); + Err(LimboError::InternalError(format!( + "Failed to create new arena: {e}", + ))) + } + } + } + + pub fn free(&mut self, arena_id: u32, page_idx: u32, size: usize) { + let arena = self + 
.arena + .as_mut() + .expect("pool arena not initialized, cannot free buffer"); + let pages = size.div_ceil(arena.page_size); + tracing::trace!("Freeing {} pages from arena {}", pages, arena_id); + turso_assert!( + arena_id == arena.id, + "should not free from different arena. {arena_id} != {}", + arena.id + ); + arena.free(page_idx, pages); + } +} + +/// A single memory arena +struct Arena { + /// Identifier to tie allocations back to the arena + id: u32, + /// base pointer to the arena returned by `mmap` + base: NonNull, + allocated_pages: AtomicUsize, + free_pages: SpinLock>, + arena_size: usize, + page_size: usize, +} + +impl Drop for Arena { + fn drop(&mut self) { + unsafe { arena::dealloc(self.base.as_ptr(), self.arena_size) }; + } +} + +struct FreeEntry { + ptr: NonNull, + first_idx: u32, +} + +const UNREGISTERED: u32 = 1; + +/// For an arena which isn't registered with `io_uring` +/// registered arena will always have id = 0 +static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED); + +impl Arena { + /// Create a new arena with the given size and page size. + fn new(page_size: usize, arena_size: usize, io: &Arc) -> Result { + let ptr = unsafe { arena::alloc(arena_size) }; + let base = NonNull::new(ptr).ok_or("Failed to allocate arena")?; + let total_pages = arena_size / page_size; + let id = if io.register_fixed_buffer(base, arena_size).is_ok() { + REGISTERED_ID + } else { + NEXT_ID.fetch_add(1, Ordering::Relaxed) + }; + + Ok(Self { + id, + base, + free_pages: SpinLock::new((0..total_pages as u32).collect()), + allocated_pages: AtomicUsize::new(0), + page_size, + arena_size, + }) + } + + pub fn try_alloc(&self, size: usize) -> Option { + let pages = size.div_ceil(self.page_size); + let mut free = self.free_pages.lock(); + if free.len() < pages { + return None; + } + if pages == 1 { + // fast path: most allocations are single pages + if let Some(page_idx) = free.pop() { + self.allocated_pages.fetch_add(pages, Ordering::Relaxed); + let offset = page_idx as usize * self.page_size; + let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) }; + tracing::trace!( + "Allocated single page at index {} from arena {}", + page_idx, + self.id + ); + return Some(FreeEntry { + ptr, + first_idx: page_idx, + }); + } + } else { + return self.try_alloc_many(pages); + } + None + } + + /// Free pages back to this arena + pub fn free(&self, page_idx: u32, count: usize) { + let mut free = self.free_pages.lock(); + // Add pages back to freelist + for i in 0..count { + free.push(page_idx + i as u32); + } + self.allocated_pages.fetch_sub(count, Ordering::Relaxed); + } + + fn try_alloc_many(&self, pages: usize) -> Option { + // TODO: for now, we don't request buffers > len of a wal frame. 
+ // if we eventually do, we can optimize this further + let mut free = self.free_pages.lock(); + if pages <= 3 && free.len() >= pages { + let start = free.len() - pages; + let first_idx = free[start]; + + let mut consecutive = true; + for j in 1..pages { + if free[start + j] != first_idx + j as u32 { + consecutive = false; + break; + } + } + if consecutive { + free.truncate(start); + self.allocated_pages.fetch_add(pages, Ordering::Relaxed); + let offset = first_idx as usize * self.page_size; + let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) }; + return Some(FreeEntry { ptr, first_idx }); + } + } + // Fall back to searching from the beginning + for i in 0..free.len().saturating_sub(pages - 1) { + let first_idx = free[i]; + let mut consecutive = true; + for j in 1..pages { + if free[i + j] != first_idx + j as u32 { + consecutive = false; + break; + } + } + + if consecutive { + free.drain(i..i + pages); + self.allocated_pages.fetch_add(pages, Ordering::Relaxed); + let offset = first_idx as usize * self.page_size; + let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) }; + return Some(FreeEntry { ptr, first_idx }); + } + } + None + } +} + +#[cfg(unix)] +mod arena { + #[cfg(target_os = "macos")] + use libc::MAP_ANON as MAP_ANONYMOUS; + #[cfg(target_os = "linux")] + use libc::MAP_ANONYMOUS; + use libc::{mmap, munmap, MAP_PRIVATE, PROT_READ, PROT_WRITE}; + use std::ffi::c_void; + + pub unsafe fn alloc(len: usize) -> *mut u8 { + let ptr = mmap( + std::ptr::null_mut(), + len, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS, + -1, + 0, + ); + if ptr == libc::MAP_FAILED { + panic!("mmap failed: {}", std::io::Error::last_os_error()); + } + #[cfg(target_os = "linux")] + { + libc::madvise(ptr, len, libc::MADV_HUGEPAGE); + } + ptr as *mut u8 + } + + pub unsafe fn dealloc(ptr: *mut u8, len: usize) { + let result = munmap(ptr as *mut c_void, len); + if result != 0 { + panic!("munmap failed: {}", std::io::Error::last_os_error()); + } + } +} + +#[cfg(not(unix))] +mod arena { + use crate::bufferpool::DEFAULT_PAGE_SIZE; + pub fn alloc(len: usize) -> *mut u8 { + let layout = std::alloc::Layout::from_size_align(len, std::mem::size_of::()).unwrap(); + unsafe { std::alloc::alloc(layout) } + } + pub fn dealloc(ptr: *mut u8, len: usize) { + let layout = std::alloc::Layout::from_size_align(len, std::mem::size_of::()).unwrap(); + unsafe { std::alloc::dealloc(ptr, layout) }; + } +} + +thread_local! 
{ + pub static TEMP_BUFFER_CACHE: RefCell = RefCell::new(TempBufferCache::new()); +} + +pub(crate) struct TempBufferCache { + // Buffers indexed by size, we only cache common sizes + page_size: usize, + page_buffers: Vec, + wal_frame_buffers: Vec, + max_cached: usize, +} + +impl TempBufferCache { + fn new() -> Self { + Self { + page_size: BufferPool::DEFAULT_PAGE_SIZE, + page_buffers: Vec::with_capacity(8), + wal_frame_buffers: Vec::with_capacity(8), + max_cached: 512, + } + } + + fn reinit_cache(&mut self, page_size: usize) { + self.page_buffers.clear(); + self.wal_frame_buffers.clear(); + self.page_size = page_size; + } + + pub fn get_buffer(&mut self, size: usize) -> Option { + match size { + sz if sz == self.page_size => self.page_buffers.pop(), + sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(), + _ => None, + } + } + + pub fn return_buffer(&mut self, buff: BufferData, len: usize) { + let sz = self.page_size; + let cache = match len { + n if n.eq(&sz) => &mut self.page_buffers, + n if n.eq(&(sz + WAL_FRAME_HEADER_SIZE)) => &mut self.wal_frame_buffers, + _ => return, + }; + if self.max_cached > cache.len() { + cache.push(buff); + } } } From 39bccc2357c1ad7d4fe91716ae18cde371e6d54c Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 11:26:48 -0400 Subject: [PATCH 02/29] Update Buffer type in io module to adjust to new pool --- core/io/mod.rs | 108 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 81 insertions(+), 27 deletions(-) diff --git a/core/io/mod.rs b/core/io/mod.rs index 52b9be2e2..c32209d67 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -1,10 +1,11 @@ +use crate::storage::buffer_pool::{ArenaBuffer, TEMP_BUFFER_CACHE}; use crate::Result; use bitflags::bitflags; use cfg_block::cfg_block; use std::fmt; use std::ptr::NonNull; use std::sync::Arc; -use std::{cell::Cell, fmt::Debug, mem::ManuallyDrop, pin::Pin, rc::Rc}; +use std::{cell::Cell, fmt::Debug, pin::Pin}; pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; @@ -290,62 +291,115 @@ impl TruncateCompletion { } } -pub type BufferData = Pin>; +pub type BufferData = Pin>; -pub type BufferDropFn = Rc; - -pub struct Buffer { - data: ManuallyDrop, - drop: BufferDropFn, +pub enum Buffer { + Heap(BufferData), + Pooled(ArenaBuffer), } impl Debug for Buffer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.data) + match self { + Self::Pooled(p) => write!(f, "{p:?}"), + Self::Heap(buf) => write!(f, "{buf:?}: {}", buf.len()), + } + } +} + +impl Clone for Buffer { + fn clone(&self) -> Self { + match self { + Self::Heap(buf) => { + let len = buf.len(); + Self::Heap(Vec::from(&buf[..len]).into_boxed_slice().into()) + } + Self::Pooled(buf) => { + // Clone pooled buffers as heap buffers + let data = Vec::from(buf.as_slice()); + Self::Heap(Pin::new(data.into_boxed_slice())) + } + } } } impl Drop for Buffer { fn drop(&mut self) { - let data = unsafe { ManuallyDrop::take(&mut self.data) }; - (self.drop)(data); + let len = self.len(); + if let Self::Heap(buf) = self { + crate::storage::buffer_pool::TEMP_BUFFER_CACHE.with(|cache| { + let buffer = std::mem::replace(buf, Pin::new(vec![].into_boxed_slice())); + cache.borrow_mut().return_buffer(buffer, len); + }); + } } } impl Buffer { - pub fn allocate(size: usize, drop: BufferDropFn) -> Self { - let data = ManuallyDrop::new(Pin::new(vec![0; size])); - Self { data, drop } + pub fn new(data: Vec) -> Self { + tracing::trace!("buffer::new({:?})", data); + 
Self::Heap(Pin::new(data.into_boxed_slice())) + } + pub fn fixed_id(&self) -> Option { + match self { + Self::Heap { .. } => None, + Self::Pooled(buf) => buf.fixed_id(), + } } - pub fn new(data: BufferData, drop: BufferDropFn) -> Self { - let data = ManuallyDrop::new(data); - Self { data, drop } + pub fn new_pooled(buf: ArenaBuffer) -> Self { + tracing::trace!("new_pooled({:?})", buf); + Self::Pooled(buf) + } + + pub fn new_temporary(size: usize) -> Self { + TEMP_BUFFER_CACHE.with(|cache| { + if let Some(buffer) = cache.borrow_mut().get_buffer(size) { + Self::Heap(buffer) + } else { + Self::Heap(Pin::new(vec![0; size].into_boxed_slice())) + } + }) } pub fn len(&self) -> usize { - self.data.len() + match self { + Self::Heap(buf) => buf.len(), + Self::Pooled(buf) => buf.logical_len(), + } } pub fn is_empty(&self) -> bool { - self.data.is_empty() + self.len() == 0 } pub fn as_slice(&self) -> &[u8] { - &self.data + match self { + Self::Heap(buf) => { + // SAFETY: The buffer is guaranteed to be valid for the lifetime of the slice + unsafe { std::slice::from_raw_parts(buf.as_ptr(), buf.len()) } + } + Self::Pooled(buf) => buf, + } } #[allow(clippy::mut_from_ref)] pub fn as_mut_slice(&self) -> &mut [u8] { - unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.data.len()) } + match self { + Self::Heap(buf) => { + // SAFETY: The buffer is guaranteed to be valid for the lifetime of the slice + unsafe { std::slice::from_raw_parts_mut(buf.as_ptr() as *mut u8, buf.len()) } + } + Self::Pooled(buf) => unsafe { + std::slice::from_raw_parts_mut(buf.as_ptr() as *mut u8, buf.len()) + }, + } } - - pub fn as_ptr(&self) -> *const u8 { - self.data.as_ptr() - } - - pub fn as_mut_ptr(&self) -> *mut u8 { - self.data.as_ptr() as *mut u8 + pub fn as_mut_ptr(&mut self) -> *mut u8 { + match self { + Self::Heap(buf) => buf.as_mut_ptr(), + Self::Pooled(buf) => buf.as_mut_ptr(), + } } } From a02f527c0624e63916b5dbb122498bc84e9eed81 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 11:27:13 -0400 Subject: [PATCH 03/29] Add fast path for pwritev on other IO backends --- core/io/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/io/mod.rs b/core/io/mod.rs index c32209d67..e78b9d519 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -19,6 +19,9 @@ pub trait File: Send + Sync { c.complete(0); return Ok(c); } + if buffers.len() == 1 { + return self.pwrite(pos, buffers[0].clone(), c); + } // naive default implementation can be overridden on backends where it makes sense to let mut pos = pos; let outstanding = Arc::new(AtomicUsize::new(buffers.len())); From 27113885a9dd2c41b515a5a029248e42548eb11a Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 11:29:17 -0400 Subject: [PATCH 04/29] Update sorter to use new buffer api --- core/vdbe/sorter.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 3ee943555..f27b57f54 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -9,7 +9,7 @@ use tempfile; use crate::{ error::LimboError, - io::{Buffer, BufferData, Completion, File, OpenFlags, IO}, + io::{Buffer, Completion, File, OpenFlags, IO}, storage::sqlite3_ondisk::{read_varint, varint_len, write_varint}, translate::collate::CollationSeq, turso_assert, @@ -364,8 +364,7 @@ impl SortedChunk { let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get(); let read_buffer_size = read_buffer_size.min(self.chunk_size - self.total_bytes_read.get()); - let drop_fn = Rc::new(|_buffer: BufferData| {}); - let 
read_buffer = Buffer::allocate(read_buffer_size, drop_fn); + let read_buffer = Buffer::new_temporary(read_buffer_size); let read_buffer_ref = Arc::new(read_buffer); let chunk_io_state_copy = self.io_state.clone(); @@ -414,8 +413,7 @@ impl SortedChunk { self.chunk_size += size_len + record_size; } - let drop_fn = Rc::new(|_buffer: BufferData| {}); - let buffer = Buffer::allocate(self.chunk_size, drop_fn); + let buffer = Buffer::new_temporary(self.chunk_size); let mut buf_pos = 0; let buf = buffer.as_mut_slice(); From 4ffb273b53a59467b58276c97c5a74c77f5d94dc Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 14:05:42 -0400 Subject: [PATCH 05/29] Adjust IO to use new buffer pool and buffer API --- core/storage/btree.rs | 29 +++++--------- core/storage/buffer_pool.rs | 69 +++++++++++++--------------------- core/storage/page_cache.rs | 5 +-- core/storage/pager.rs | 31 ++++++++------- core/storage/sqlite3_ondisk.rs | 36 ++++++------------ core/storage/wal.rs | 15 ++------ 6 files changed, 69 insertions(+), 116 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 4fc0b6245..2d432fe89 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -7215,7 +7215,6 @@ mod tests { use tempfile::TempDir; use crate::{ - io::BufferData, storage::{ btree::{compute_free_space, fill_cell_payload, payload_overflow_threshold_max}, sqlite3_ondisk::{BTreeCell, PageContent, PageType}, @@ -7230,11 +7229,7 @@ mod tests { fn get_page(id: usize) -> BTreePage { let page = Arc::new(Page::new(id)); - let drop_fn = Rc::new(|_| {}); - let inner = PageContent::new( - 0, - Arc::new(Buffer::new(BufferData::new(vec![0; 4096]), drop_fn)), - ); + let inner = PageContent::new(0, Arc::new(Buffer::new_temporary(4096))); page.get().contents.replace(inner); let page = Arc::new(BTreePageInner { page: RefCell::new(page), @@ -8455,21 +8450,16 @@ mod tests { fn setup_test_env(database_size: u32) -> Rc { let page_size = 512; - let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize))); - - // Initialize buffer pool with correctly sized buffers - for _ in 0..10 { - let vec = vec![0; page_size as usize]; // Initialize with correct length, not just capacity - buffer_pool.put(Pin::new(vec)); - } - let io: Arc = Arc::new(MemoryIO::new()); + let buffer_pool = + BufferPool::begin_init(&io, page_size * 128).finalize_page_size(page_size); + let db_file = Arc::new(DatabaseFile::new( io.open_file(":memory:", OpenFlags::Create, false).unwrap(), )); let wal_file = io.open_file("test.wal", OpenFlags::Create, false).unwrap(); - let wal_shared = WalFileShared::new_shared(page_size, &io, wal_file).unwrap(); + let wal_shared = WalFileShared::new_shared(page_size as u32, &io, wal_file).unwrap(); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), wal_shared, @@ -8522,16 +8512,17 @@ mod tests { // Setup overflow pages (2, 3, 4) with linking let mut current_page = 2u32; while current_page <= 4 { - let drop_fn = Rc::new(|_buf| {}); #[allow(clippy::arc_with_non_send_sync)] - let buf = Arc::new(Buffer::allocate( + let buf = Arc::new(Buffer::new_temporary( pager .io .block(|| pager.with_header(|header| header.page_size))? 
.get() as usize, - drop_fn, )); - let c = Completion::new_write(|_| {}); + let _buf = buf.clone(); + let c = Completion::new_write(|_| { + let _ = _buf.clone(); + }); let _c = pager .db_file .write_page(current_page as usize, buf.clone(), c)?; diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 4d1ab7e5c..f2de613a0 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -135,41 +135,6 @@ impl Default for BufferPool { } } -/// Helper for initializing the pool properly. We need buffers for IO to read the -/// header to discover the page_size, so we don't want to initialize a whole arena -/// just to drop it when we find out the page size is different. So we have this -/// builder that is returned which can either be called `finalize` with the page size, -/// or it will simply finalize with the default page size when it goes out of scope. -pub struct BufferPoolBuilder; -impl BufferPoolBuilder { - pub fn finalize_page_size(&self, page_size: usize) -> crate::Result<()> { - let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); - let inner = pool.inner_mut(); - if inner.arena.is_some() { - return Ok(()); - } - tracing::trace!("finalize page size called with size {page_size}"); - if page_size != BufferPool::DEFAULT_PAGE_SIZE { - // so far we have handed out some temporary buffers, since the page size is not - // default, we need to clear the cache so they aren't reused for other operations. - TEMP_BUFFER_CACHE.with(|cache| { - cache.borrow_mut().reinit_cache(page_size); - }); - } - inner.db_page_size.store(page_size, Ordering::Relaxed); - inner.init_arena() - } -} - -/// If `begin_init` is called without `finalize`, we will just finalize -/// the pool with the default page size. -impl Drop for BufferPoolBuilder { - fn drop(&mut self) { - // finalize_page_size is idempotent, and will not do anything if an arena already exists - let _ = self.finalize_page_size(BufferPool::DEFAULT_PAGE_SIZE); - } -} - impl BufferPool { pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; // 4MB arena pub const DEFAULT_PAGE_SIZE: usize = 4096; // 4KB default page size @@ -197,9 +162,8 @@ impl BufferPool { pool.inner().allocate(len) } - pub fn get_page() -> Buffer { - let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); - let inner = pool.inner(); + pub fn get_page(&self) -> Buffer { + let inner = self.inner(); inner.allocate(inner.db_page_size.load(Ordering::Relaxed)) } @@ -212,16 +176,37 @@ impl BufferPool { unsafe { &mut *self.inner.get() } } - /// Initialize the pool to the default page size, WITHOUT creating an arena - /// Arena will be created when set_page_size is called - pub fn begin_init(io: &Arc, arena_size: usize) -> BufferPoolBuilder { + /// Initialize the pool to the default page size, **without** creating an arena + /// Arena will be created when finalize_page_size is called, + /// until then the pool will return temporary buffers to prevent reallocation of the + /// arena if the page size is set to something other than the default value. 
+ pub fn begin_init(io: &Arc, arena_size: usize) -> Arc { let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size))); let inner = pool.inner_mut(); // Just store the IO handle, don't create arena yet if inner.io.is_none() { inner.io = Some(Arc::clone(io)); } - BufferPoolBuilder {} + pool.clone() + } + + pub fn finalize_page_size(&self, page_size: usize) -> crate::Result> { + let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); + let inner = pool.inner_mut(); + if inner.arena.is_some() { + return Ok(pool.clone()); + } + tracing::trace!("finalize page size called with size {page_size}"); + if page_size != BufferPool::DEFAULT_PAGE_SIZE { + // so far we have handed out some temporary buffers, since the page size is not + // default, we need to clear the cache so they aren't reused for other operations. + TEMP_BUFFER_CACHE.with(|cache| { + cache.borrow_mut().reinit_cache(page_size); + }); + } + inner.db_page_size.store(page_size, Ordering::Relaxed); + inner.init_arena()?; + Ok(pool.clone()) } #[inline] diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 2a2dcc04f..2297ab322 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -621,10 +621,10 @@ impl PageHashMap { #[cfg(test)] mod tests { use super::*; - use crate::io::{Buffer, BufferData}; use crate::storage::page_cache::CacheError; use crate::storage::pager::{Page, PageRef}; use crate::storage::sqlite3_ondisk::PageContent; + use crate::BufferPool; use std::ptr::NonNull; use std::{num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc}; @@ -642,8 +642,7 @@ mod tests { pub fn page_with_content(page_id: usize) -> PageRef { let page = Arc::new(Page::new(page_id)); { - let buffer_drop_fn = Rc::new(|_data: BufferData| {}); - let buffer = Buffer::new(Pin::new(vec![0; 4096]), buffer_drop_fn); + let buffer = BufferPool::allocate(4096); let page_content = PageContent { offset: 0, buffer: Arc::new(buffer), diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 11e5999b1..d685712d1 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1,17 +1,20 @@ use crate::result::LimboResult; -use crate::storage::btree::BTreePageInner; -use crate::storage::buffer_pool::BufferPool; -use crate::storage::database::DatabaseStorage; -use crate::storage::sqlite3_ondisk::{ - self, parse_wal_frame_header, DatabaseHeader, PageContent, PageSize, PageType, +use crate::storage::{ + btree::BTreePageInner, + buffer_pool::BufferPool, + database::DatabaseStorage, + sqlite3_ondisk::{ + self, parse_wal_frame_header, DatabaseHeader, PageContent, PageSize, PageType, + }, + wal::{CheckpointResult, Wal}, }; -use crate::storage::wal::{CheckpointResult, Wal}; -use crate::types::{IOResult, WalFrameInfo}; use crate::util::IOExt as _; -use crate::{return_if_io, Completion, TransactionState}; -use crate::{turso_assert, Buffer, Connection, LimboError, Result}; +use crate::{ + return_if_io, turso_assert, Completion, Connection, IOResult, LimboError, Result, + TransactionState, WalFrameInfo, +}; use parking_lot::RwLock; -use std::cell::{Cell, OnceCell, RefCell, UnsafeCell}; +use std::cell::{Cell, OnceCell, UnsafeCell}; use std::collections::HashSet; use std::hash; use std::rc::Rc; @@ -2151,12 +2154,8 @@ impl Pager { pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc, offset: usize) -> PageRef { let page = Arc::new(Page::new(page_id)); { - let buffer = buffer_pool.get(); - let bp = buffer_pool.clone(); - let drop_fn = Rc::new(move |buf| { - bp.put(buf); - }); - let buffer = 
Arc::new(Buffer::new(buffer, drop_fn)); + let buffer = buffer_pool.get_page(); + let buffer = Arc::new(buffer); page.set_loaded(); page.get().contents = Some(PageContent::new(offset, buffer)); } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 8b2a39b4f..0fee652d7 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -852,13 +852,9 @@ pub fn begin_read_page( allow_empty_read: bool, ) -> Result { tracing::trace!("begin_read_btree_page(page_idx = {})", page_idx); - let buf = buffer_pool.get(); - let drop_fn = Rc::new(move |buf| { - let buffer_pool = buffer_pool.clone(); - buffer_pool.put(buf); - }); + let buf = buffer_pool.get_page(); #[allow(clippy::arc_with_non_send_sync)] - let buf = Arc::new(Buffer::new(buf, drop_fn)); + let buf = Arc::new(buf); let complete = Box::new(move |mut buf: Arc, bytes_read: i32| { let buf_len = buf.len(); turso_assert!( @@ -867,7 +863,7 @@ pub fn begin_read_page( ); let page = page.clone(); if bytes_read == 0 { - buf = Arc::new(Buffer::allocate(0, Rc::new(|_| {}))); + buf = Arc::new(Buffer::new_temporary(0)); } if finish_read_page(page_idx, buf, page.clone()).is_err() { page.set_error(); @@ -1557,10 +1553,9 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec) { /// We need to read the WAL file on open to reconstruct the WAL frame cache. pub fn read_entire_wal_dumb(file: &Arc) -> Result>> { - let drop_fn = Rc::new(|_buf| {}); let size = file.size()?; #[allow(clippy::arc_with_non_send_sync)] - let buf_for_pread = Arc::new(Buffer::allocate(size as usize, drop_fn)); + let buf_for_pread = Arc::new(Buffer::new_temporary(size as usize)); let header = Arc::new(SpinLock::new(WalHeader::default())); #[allow(clippy::arc_with_non_send_sync)] let wal_file_shared_ret = Arc::new(UnsafeCell::new(WalFileShared { @@ -1768,13 +1763,11 @@ pub fn begin_read_wal_frame_raw( io: &Arc, offset: usize, page_size: u32, - complete: Box, i32)>, + complete: Box, ) -> Result { tracing::trace!("begin_read_wal_frame_raw(offset={})", offset); - let drop_fn = Rc::new(|_buf| {}); - let buf = Arc::new(Buffer::allocate( + let buf = Arc::new(BufferPool::allocate( page_size as usize + WAL_FRAME_HEADER_SIZE, - drop_fn, )); #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_read(buf, complete); @@ -1786,15 +1779,11 @@ pub fn begin_read_wal_frame( io: &Arc, offset: usize, buffer_pool: Arc, - complete: Box, i32)>, + complete: Box, ) -> Result { tracing::trace!("begin_read_wal_frame(offset={})", offset); - let buf = buffer_pool.get(); - let drop_fn = Rc::new(move |buf| { - let buffer_pool = buffer_pool.clone(); - buffer_pool.put(buf); - }); - let buf = Arc::new(Buffer::new(buf, drop_fn)); + let buf = buffer_pool.get_page(); + let buf = Arc::new(buf); #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_read(buf, complete); let c = io.pread(offset, c)?; @@ -1830,8 +1819,7 @@ pub fn prepare_wal_frame( ) -> ((u32, u32), Arc) { tracing::trace!(page_number); - let drop_fn = Rc::new(|_buf| {}); - let buffer = Buffer::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE, drop_fn); + let buffer = BufferPool::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE); let frame = buffer.as_mut_slice(); frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page); @@ -1858,9 +1846,7 @@ pub fn prepare_wal_frame( pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result { tracing::trace!("begin_write_wal_header"); let buffer = { - let drop_fn = Rc::new(|_buf| {}); - - let buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn); 
+ let buffer = BufferPool::new_temporary(WAL_HEADER_SIZE); let buf = buffer.as_mut_slice(); buf[0..4].copy_from_slice(&header.magic.to_be_bytes()); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 86e6a02a2..1f7f8e0c2 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -336,10 +336,8 @@ impl Batch { self.items.insert(id, buf_clone); // Re-initialize scratch with a fresh buffer - let raw = pool.get(); - let pool_clone = pool.clone(); - let drop_fn = Rc::new(move |b| pool_clone.put(b)); - let new_buf = Arc::new(Buffer::new(raw, drop_fn)); + let raw = pool.get_page(); + let new_buf = Arc::new(raw); unsafe { let inner = &mut *scratch.inner.get(); @@ -1218,14 +1216,9 @@ impl WalFile { buffer_pool: Arc, ) -> Self { let checkpoint_page = Arc::new(Page::new(0)); - let buffer = buffer_pool.get(); + let buffer = buffer_pool.get_page(); { - let buffer_pool = buffer_pool.clone(); - let drop_fn = Rc::new(move |buf| { - buffer_pool.put(buf); - }); - checkpoint_page.get().contents = - Some(PageContent::new(0, Arc::new(Buffer::new(buffer, drop_fn)))); + checkpoint_page.get().contents = Some(PageContent::new(0, Arc::new(buffer))); } let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() }; From 5750b1229cb63056117d76b2c7bfdaf600e9d743 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 14:06:09 -0400 Subject: [PATCH 06/29] Setup and initialize pool properly --- core/lib.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 648037c6a..4736c6cb4 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -387,7 +387,8 @@ impl Database { None => unsafe { (*shared_wal.get()).page_size() as usize }, Some(size) => size, }; - let buffer_pool = Arc::new(BufferPool::new(Some(size))); + let buffer_pool = BufferPool::begin_init(&self.io, BufferPool::DEFAULT_ARENA_SIZE) + .finalize_page_size(size)?; let db_state = self.db_state.clone(); let wal = Rc::new(RefCell::new(WalFile::new( @@ -407,7 +408,8 @@ impl Database { return Ok(pager); } - let buffer_pool = Arc::new(BufferPool::new(page_size)); + let buffer_pool = BufferPool::begin_init(&self.io, BufferPool::DEFAULT_ARENA_SIZE); + // No existing WAL; create one. 
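+        // Only the IO handle is stored at this point; the arena itself is allocated
+        // further down, once finalize_page_size runs with the actual page size.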
let db_state = self.db_state.clone(); let mut pager = Pager::new( @@ -428,11 +430,11 @@ impl Database { .block(|| pager.with_header(|header| header.page_size)) .unwrap_or_default() .get(); - buffer_pool.set_page_size(size as usize); size } }; + buffer_pool.finalize_page_size(size as usize); let wal_path = format!("{}-wal", self.path); let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?; From 0884fec799296faaaa2f8ec6c880729c3fa78b7c Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 14:06:38 -0400 Subject: [PATCH 07/29] Use parent buffer pool for ephemeral pager and wal --- core/vdbe/execute.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 10415eb64..f87253f4e 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3,6 +3,7 @@ use crate::function::AlterTableFunc; use crate::numeric::{NullableInteger, Numeric}; use crate::schema::Table; use crate::storage::btree::{integrity_check, IntegrityCheckError, IntegrityCheckState}; +use crate::storage::buffer_pool::BUFFER_POOL; use crate::storage::database::DatabaseFile; use crate::storage::page_cache::DumbLruPageCache; use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState}; @@ -62,9 +63,7 @@ use crate::{ vector::{vector32, vector64, vector_distance_cos, vector_distance_l2, vector_extract}, }; -use crate::{ - info, turso_assert, BufferPool, OpenFlags, RefValue, Row, StepResult, TransactionState, -}; +use crate::{info, turso_assert, OpenFlags, RefValue, Row, StepResult, TransactionState}; use super::{ insn::{Cookie, RegisterOrLiteral}, @@ -6460,7 +6459,7 @@ pub fn op_open_ephemeral( .block(|| pager.with_header(|header| header.page_size))? 
.get(); - let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize))); + let buffer_pool = BUFFER_POOL.get().expect("Buffer pool not initialized"); let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default())); let pager = Rc::new(Pager::new( @@ -6478,7 +6477,6 @@ pub fn op_open_ephemeral( .block(|| pager.with_header(|header| header.page_size)) .unwrap_or_default() .get() as usize; - buffer_pool.set_page_size(page_size); state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager }; } From 036ae596c4a6f9bec0dcc1ebbfb93a2813b33ca9 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 14:07:23 -0400 Subject: [PATCH 08/29] Fix test with old buffer api --- tests/integration/query_processing/test_btree.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index e98b762ed..7ad4ac828 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -428,10 +428,13 @@ impl BTreeGenerator<'_> { } fn write_at(io: &impl IO, file: Arc, offset: usize, data: &[u8]) { - let completion = Completion::new_write(|_| {}); - let drop_fn = Rc::new(move |_| {}); #[allow(clippy::arc_with_non_send_sync)] - let buffer = Arc::new(Buffer::new(Pin::new(data.to_vec()), drop_fn)); + let buffer = Arc::new(Buffer::new(data.to_vec())); + let _buf = buffer.clone(); + let completion = Completion::new_write(|_| { + // reference the buffer to keep alive for async io + let _buf = _buf.clone(); + }); let result = file.pwrite(offset, buffer, completion).unwrap(); while !result.is_completed() { io.run_once().unwrap(); From 7ea52a3f892df6bb2a5ab550857d11cedec5c3c1 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 17:31:01 -0400 Subject: [PATCH 09/29] Fix changing page size and initialization for buffer pool --- core/lib.rs | 20 ++++++++++++-------- core/storage/buffer_pool.rs | 9 +++++---- core/storage/pager.rs | 9 +++++---- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/core/lib.rs b/core/lib.rs index 4736c6cb4..f79c9358d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -380,6 +380,11 @@ impl Database { } fn init_pager(&self, page_size: Option) -> Result { + let arena_size = if std::env::var("CI").is_ok_and(|v| v.eq_ignore_ascii_case("true")) { + BufferPool::TEST_AREA_SIZE + } else { + BufferPool::DEFAULT_ARENA_SIZE + }; // Open existing WAL file if present let mut maybe_shared_wal = self.maybe_shared_wal.write(); if let Some(shared_wal) = maybe_shared_wal.clone() { @@ -387,8 +392,8 @@ impl Database { None => unsafe { (*shared_wal.get()).page_size() as usize }, Some(size) => size, }; - let buffer_pool = BufferPool::begin_init(&self.io, BufferPool::DEFAULT_ARENA_SIZE) - .finalize_page_size(size)?; + let buffer_pool = + BufferPool::begin_init(&self.io, arena_size).finalize_page_size(size)?; let db_state = self.db_state.clone(); let wal = Rc::new(RefCell::new(WalFile::new( @@ -407,8 +412,7 @@ impl Database { )?; return Ok(pager); } - - let buffer_pool = BufferPool::begin_init(&self.io, BufferPool::DEFAULT_ARENA_SIZE); + let buffer_pool = BufferPool::begin_init(&self.io, arena_size); // No existing WAL; create one. 
let db_state = self.db_state.clone(); @@ -425,16 +429,15 @@ impl Database { let size = match page_size { Some(size) => size as u32, None => { - let size = pager + pager // if None is passed in, we know that we already initialized so we can safely call `with_header` here .io .block(|| pager.with_header(|header| header.page_size)) .unwrap_or_default() - .get(); - size + .get() } }; - buffer_pool.finalize_page_size(size as usize); + pager.page_size.set(Some(size)); let wal_path = format!("{}-wal", self.path); let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?; @@ -1375,6 +1378,7 @@ impl Connection { } *self._db.maybe_shared_wal.write() = None; + self.pager.borrow_mut().clear_page_cache(); let pager = self._db.init_pager(Some(size as usize))?; self.pager.replace(Rc::new(pager)); self.pager.borrow().set_initial_page_size(size); diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index f2de613a0..b0d2b79c0 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -137,6 +137,7 @@ impl Default for BufferPool { impl BufferPool { pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; // 4MB arena + pub const TEST_AREA_SIZE: usize = 512 * 1024; // 512KB arena for testing pub const DEFAULT_PAGE_SIZE: usize = 4096; // 4KB default page size const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; // 32MB max arena @@ -193,9 +194,6 @@ impl BufferPool { pub fn finalize_page_size(&self, page_size: usize) -> crate::Result> { let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); let inner = pool.inner_mut(); - if inner.arena.is_some() { - return Ok(pool.clone()); - } tracing::trace!("finalize page size called with size {page_size}"); if page_size != BufferPool::DEFAULT_PAGE_SIZE { // so far we have handed out some temporary buffers, since the page size is not @@ -204,6 +202,9 @@ impl BufferPool { cache.borrow_mut().reinit_cache(page_size); }); } + if inner.arena.is_some() { + return Ok(pool.clone()); + } inner.db_page_size.store(page_size, Ordering::Relaxed); inner.init_arena()?; Ok(pool.clone()) @@ -253,7 +254,7 @@ impl PoolInner { ) { Ok(arena) => { tracing::trace!( - "Growing buffer pool: added arena {} with size {} MB", + "added arena {} with size {} MB", arena.id, arena_size / (1024 * 1024) ); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index d685712d1..fc2562ba5 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1766,6 +1766,8 @@ impl Pager { if let Some(size) = self.page_size.get() { default_header.page_size = PageSize::new(size).expect("page size"); } + self.buffer_pool + .finalize_page_size(default_header.page_size.get() as usize)?; let page = allocate_new_page(1, &self.buffer_pool, 0); let contents = page.get_contents(); @@ -2431,10 +2433,9 @@ mod ptrmap_tests { )); // Construct interfaces for the pager - let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize))); - let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new( - (initial_db_pages + 10) as usize, - ))); + let sz = initial_db_pages + 10; + let buffer_pool = BufferPool::begin_init(&io, (sz * page_size) as usize); + let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(sz as usize))); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), From 2e072cadb0fbb72f66f5a583f548d964e8d67e79 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 17:31:32 -0400 Subject: [PATCH 10/29] Add CI=true env var so the tests dont each allocate 4mb by default --- 
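This pairs with the previous commit: Database::init_pager now picks the arena size
from the CI environment variable, so exporting CI=true in the test wrapper keeps each
test process on the 512 KB `BufferPool::TEST_AREA_SIZE` arena instead of the 4 MB
`DEFAULT_ARENA_SIZE`. The gate this relies on, from core/lib.rs in the previous commit:

    let arena_size = if std::env::var("CI").is_ok_and(|v| v.eq_ignore_ascii_case("true")) {
        BufferPool::TEST_AREA_SIZE
    } else {
        BufferPool::DEFAULT_ARENA_SIZE
    };
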
scripts/limbo-sqlite3 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/limbo-sqlite3 b/scripts/limbo-sqlite3 index 1e8e63290..b59651446 100755 --- a/scripts/limbo-sqlite3 +++ b/scripts/limbo-sqlite3 @@ -11,7 +11,7 @@ EXPERIMENTAL_FLAGS="" # if RUST_LOG is non-empty, enable tracing output if [ -n "$RUST_LOG" ]; then - "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" + CI=true "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" else - "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" + CI=true "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" fi From dffa47b0488f4ad601295a9707cde749889b5b2b Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 17:32:32 -0400 Subject: [PATCH 11/29] Use temp buffer for wal header --- core/storage/sqlite3_ondisk.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 0fee652d7..f6cf88881 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1846,7 +1846,7 @@ pub fn prepare_wal_frame( pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result { tracing::trace!("begin_write_wal_header"); let buffer = { - let buffer = BufferPool::new_temporary(WAL_HEADER_SIZE); + let buffer = Buffer::new_temporary(WAL_HEADER_SIZE); let buf = buffer.as_mut_slice(); buf[0..4].copy_from_slice(&header.magic.to_be_bytes()); From 9d1ca1c8ca30602d80245182de66da1ac77edbb1 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 18:27:00 -0400 Subject: [PATCH 12/29] Add ReadFixed/WriteFixed opcodes for buffers from registered arena --- core/io/io_uring.rs | 47 ++++++++++++++++++++++++++++++------- core/lib.rs | 2 +- core/storage/btree.rs | 2 +- core/storage/buffer_pool.rs | 10 ++++---- core/storage/pager.rs | 2 +- 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 61ba4badf..c39bff69a 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -584,17 +584,30 @@ impl File for UringFile { fn pread(&self, pos: usize, c: Completion) -> Result { let r = c.as_read(); - trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let mut io = self.io.borrow_mut(); let read_e = { let buf = r.buf(); let len = buf.len(); - let buf = buf.as_mut_ptr(); with_fd!(self, |fd| { - io_uring::opcode::Read::new(fd, buf, len as u32) - .offset(pos as u64) - .build() - .user_data(get_key(c.clone())) + if let Some(idx) = buf.fixed_id() { + trace!( + "pread_fixed(pos = {}, length = {}, idx = {})", + pos, + len, + idx + ); + io_uring::opcode::ReadFixed::new(fd, buf.as_mut_ptr(), len as u32, idx as u16) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } else { + trace!("pread(pos = {}, length = {})", pos, len); + // Use Read opcode if fixed buffer is not available + io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } }) }; io.ring.submit_entry(&read_e); @@ -604,12 +617,30 @@ impl File for UringFile { fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let write = { - trace!("pwrite(pos = {}, length = {})", pos, buffer.len()); with_fd!(self, |fd| { - io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32) + if let Some(idx) = buf.fixed_id() { + trace!( + "pwrite_fixed(pos = {}, length = {}, idx= {})", + pos, + buffer.len(), + idx + ); + io_uring::opcode::WriteFixed::new( + fd, + buffer.as_ptr(), + 
buffer.len() as u32, + idx as u16, + ) .offset(pos as u64) .build() .user_data(get_key(c.clone())) + } else { + trace!("pwrite(pos = {}, length = {})", pos, buffer.len()); + io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } }) }; io.ring.submit_entry(&write); diff --git a/core/lib.rs b/core/lib.rs index f79c9358d..adc9849fb 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -393,7 +393,7 @@ impl Database { Some(size) => size, }; let buffer_pool = - BufferPool::begin_init(&self.io, arena_size).finalize_page_size(size)?; + BufferPool::begin_init(&self.io, arena_size).finalize_with_page_size(size)?; let db_state = self.db_state.clone(); let wal = Rc::new(RefCell::new(WalFile::new( diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 2d432fe89..b292357a0 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -8452,7 +8452,7 @@ mod tests { let io: Arc = Arc::new(MemoryIO::new()); let buffer_pool = - BufferPool::begin_init(&io, page_size * 128).finalize_page_size(page_size); + BufferPool::begin_init(&io, page_size * 128).finalize_with_page_size(page_size); let db_file = Arc::new(DatabaseFile::new( io.open_file(":memory:", OpenFlags::Create, false).unwrap(), diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index b0d2b79c0..dbe9e0f92 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -177,9 +177,9 @@ impl BufferPool { unsafe { &mut *self.inner.get() } } - /// Initialize the pool to the default page size, **without** creating an arena - /// Arena will be created when finalize_page_size is called, - /// until then the pool will return temporary buffers to prevent reallocation of the + /// Create a static `BufferPool` initialize the pool to the default page size, **without** + /// creating an arena. Arena will be created when `[BufferPool::finalize_page_size]` is called. + /// Until then the pool will return temporary buffers to prevent reallocation of the /// arena if the page size is set to something other than the default value. pub fn begin_init(io: &Arc, arena_size: usize) -> Arc { let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size))); @@ -191,7 +191,9 @@ impl BufferPool { pool.clone() } - pub fn finalize_page_size(&self, page_size: usize) -> crate::Result> { + /// Call when `[Database::db_state]` is initialized, providing the `page_size` to allocate + /// an arena for the pool. 
Before this call, the pool will use temporary buffers + pub fn finalize_with_page_size(&self, page_size: usize) -> crate::Result> { let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); let inner = pool.inner_mut(); tracing::trace!("finalize page size called with size {page_size}"); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index fc2562ba5..74cfa1594 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1767,7 +1767,7 @@ impl Pager { default_header.page_size = PageSize::new(size).expect("page size"); } self.buffer_pool - .finalize_page_size(default_header.page_size.get() as usize)?; + .finalize_with_page_size(default_header.page_size.get() as usize)?; let page = allocate_new_page(1, &self.buffer_pool, 0); let contents = page.get_contents(); From cc75bc448ec8b2f3bd0166612f713911d1ed7fd0 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 21:47:32 -0400 Subject: [PATCH 13/29] Move TLC buffer cache to io/mod --- core/io/mod.rs | 61 +++++++++++++++- core/storage/buffer_pool.rs | 141 +++++++----------------------------- 2 files changed, 85 insertions(+), 117 deletions(-) diff --git a/core/io/mod.rs b/core/io/mod.rs index e78b9d519..0bded8c49 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -1,5 +1,6 @@ -use crate::storage::buffer_pool::{ArenaBuffer, TEMP_BUFFER_CACHE}; -use crate::Result; +use crate::storage::buffer_pool::ArenaBuffer; +use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE; +use crate::{BufferPool, Result}; use bitflags::bitflags; use cfg_block::cfg_block; use std::fmt; @@ -330,9 +331,11 @@ impl Drop for Buffer { fn drop(&mut self) { let len = self.len(); if let Self::Heap(buf) = self { - crate::storage::buffer_pool::TEMP_BUFFER_CACHE.with(|cache| { + TEMP_BUFFER_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + // take ownership of the buffer by swapping it with a dummy let buffer = std::mem::replace(buf, Pin::new(vec![].into_boxed_slice())); - cache.borrow_mut().return_buffer(buffer, len); + cache.return_buffer(buffer, len); }); } } @@ -406,6 +409,56 @@ impl Buffer { } } +thread_local! { + /// thread local cache to re-use temporary buffers to prevent churn when pool overflows + pub static TEMP_BUFFER_CACHE: RefCell = RefCell::new(TempBufferCache::new()); +} + +pub(crate) struct TempBufferCache { + // Buffers indexed by size, we only cache common sizes + page_size: usize, + page_buffers: Vec, + wal_frame_buffers: Vec, + max_cached: usize, +} + +impl TempBufferCache { + fn new() -> Self { + Self { + page_size: BufferPool::DEFAULT_PAGE_SIZE, + page_buffers: Vec::with_capacity(8), + wal_frame_buffers: Vec::with_capacity(8), + max_cached: 512, + } + } + + pub fn reinit_cache(&mut self, page_size: usize) { + self.page_buffers.clear(); + self.wal_frame_buffers.clear(); + self.page_size = page_size; + } + + fn get_buffer(&mut self, size: usize) -> Option { + match size { + sz if sz == self.page_size => self.page_buffers.pop(), + sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(), + _ => None, + } + } + + fn return_buffer(&mut self, buff: BufferData, len: usize) { + let sz = self.page_size; + let cache = match len { + n if n.eq(&sz) => &mut self.page_buffers, + n if n.eq(&(sz + WAL_FRAME_HEADER_SIZE)) => &mut self.wal_frame_buffers, + _ => return, + }; + if self.max_cached > cache.len() { + cache.push(buff); + } + } +} + cfg_block! 
{ #[cfg(all(target_os = "linux", feature = "io_uring"))] { mod io_uring; diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index dbe9e0f92..5f824b9cb 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -1,72 +1,41 @@ use crate::fast_lock::SpinLock; -use crate::io::BufferData; +use crate::io::TEMP_BUFFER_CACHE; use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE; use crate::{turso_assert, Buffer, LimboError, IO}; use parking_lot::Mutex; -use std::cell::{RefCell, UnsafeCell}; +use std::cell::UnsafeCell; use std::ptr::NonNull; use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; use std::sync::{Arc, OnceLock}; pub static BUFFER_POOL: OnceLock> = OnceLock::new(); -// Packed metadata structure -// Bits 0-23: logical length (16MB max) -// Bits 26-40: arena_id (15 bits) -// Bits 41-55: page_idx (15 bits) -#[derive(Clone, Copy)] -#[repr(transparent)] -struct Metadata(u64); -impl Metadata { - const fn new(logical_len: usize, arena_id: u32, page_idx: u32) -> Self { - let mut meta = logical_len as u64 & 0xFFFFFF; - meta |= ((arena_id as u64) & 0x7FFF) << 26; - meta |= ((page_idx as u64) & 0x7FFF) << 41; - Self(meta) - } - const fn logical_len(&self) -> usize { - (self.0 & 0xFFFFFF) as usize - } - - const fn arena_id(&self) -> u32 { - ((self.0 >> 26) & 0x7FFF) as u32 - } - - const fn page_idx(&self) -> u32 { - ((self.0 >> 41) & 0x7FFF) as u32 - } -} - #[derive(Debug)] -/// A buffer allocated from an arena, or a clone of one. -/// NOTE: we currently only deep clone a buffer in the btree when defragmenting. -/// we can remove all the plumbing when that is merged. +/// A buffer allocated from an arena from `[BufferPool]` pub struct ArenaBuffer { ptr: NonNull, - meta: Metadata, + arena_id: u32, + page_idx: u32, + len: usize, } + const REGISTERED_ID: u32 = 0; -impl std::fmt::Debug for Metadata { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BufferMetadata") - .field("logical_len", &self.logical_len()) - .field("arena_id", &self.arena_id()) - .finish() - } -} - impl ArenaBuffer { - const fn new(ptr: NonNull, meta: Metadata) -> Self { - ArenaBuffer { ptr, meta } + const fn new(ptr: NonNull, len: usize, arena_id: u32, page_idx: u32) -> Self { + ArenaBuffer { + ptr, + arena_id, + page_idx, + len, + } } #[inline(always)] /// Returns the `id` of the underlying arena if it is registered with `io_uring` pub fn fixed_id(&self) -> Option { - let id = self.meta.arena_id(); - if id == REGISTERED_ID { - Some(id) + if self.arena_id == REGISTERED_ID { + Some(REGISTERED_ID) } else { None } @@ -75,7 +44,7 @@ impl ArenaBuffer { /// The requested size of the allocation. The actual size is always rounded up to /// the next multiple of ARENA_PAGE_SIZE. 
pub const fn logical_len(&self) -> usize { - self.meta.logical_len() + self.len } pub fn as_slice(&self) -> &[u8] { unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.logical_len()) } @@ -90,11 +59,7 @@ impl Drop for ArenaBuffer { let pool = BUFFER_POOL .get() .expect("BufferPool not initialized, cannot free ArenaBuffer"); - pool.free( - self.meta.arena_id(), - self.meta.page_idx(), - self.logical_len(), - ); + pool.free(self.logical_len(), self.arena_id, self.page_idx); } } @@ -213,12 +178,14 @@ impl BufferPool { } #[inline] - pub fn free(&self, arena_id: u32, page_id: u32, size: usize) { - self.inner_mut().free(arena_id, page_id, size); + pub fn free(&self, size: usize, arena_id: u32, page_id: u32) { + self.inner_mut().free(size, arena_id, page_id); } } impl PoolInner { + /// Allocate a buffer of the given length from the pool, falling back to + /// temporary thread local buffers if the pool is not initialized or full pub fn allocate(&self, len: usize) -> Buffer { turso_assert!(len > 0, "Cannot allocate zero-length buffer"); let Some(arena) = self.arena.as_ref() else { @@ -232,10 +199,7 @@ impl PoolInner { arena.id, first_idx ); - return Buffer::new_pooled(ArenaBuffer::new( - ptr, - Metadata::new(len, arena.id, first_idx), - )); + return Buffer::new_pooled(ArenaBuffer::new(ptr, len, arena.id, first_idx)); } Buffer::new_temporary(len) } @@ -272,7 +236,7 @@ impl PoolInner { } } - pub fn free(&mut self, arena_id: u32, page_idx: u32, size: usize) { + pub fn free(&mut self, size: usize, arena_id: u32, page_idx: u32) { let arena = self .arena .as_mut() @@ -346,7 +310,7 @@ impl Arena { return None; } if pages == 1 { - // fast path: most allocations are single pages + // fast path: for now, most/all allocations are single pages if let Some(page_idx) = free.pop() { self.allocated_pages.fetch_add(pages, Ordering::Relaxed); let offset = page_idx as usize * self.page_size; @@ -377,9 +341,10 @@ impl Arena { self.allocated_pages.fetch_sub(count, Ordering::Relaxed); } + #[cold] fn try_alloc_many(&self, pages: usize) -> Option { - // TODO: for now, we don't request buffers > len of a wal frame. - // if we eventually do, we can optimize this further + // TODO (preston): we can optimize this further when we start allocating larger + // contiguous blocks for coalescing. this is 'unused' for now let mut free = self.free_pages.lock(); if pages <= 3 && free.len() >= pages { let start = free.len() - pages; @@ -461,7 +426,6 @@ mod arena { #[cfg(not(unix))] mod arena { - use crate::bufferpool::DEFAULT_PAGE_SIZE; pub fn alloc(len: usize) -> *mut u8 { let layout = std::alloc::Layout::from_size_align(len, std::mem::size_of::()).unwrap(); unsafe { std::alloc::alloc(layout) } @@ -471,52 +435,3 @@ mod arena { unsafe { std::alloc::dealloc(ptr, layout) }; } } - -thread_local! 
{ - pub static TEMP_BUFFER_CACHE: RefCell = RefCell::new(TempBufferCache::new()); -} - -pub(crate) struct TempBufferCache { - // Buffers indexed by size, we only cache common sizes - page_size: usize, - page_buffers: Vec, - wal_frame_buffers: Vec, - max_cached: usize, -} - -impl TempBufferCache { - fn new() -> Self { - Self { - page_size: BufferPool::DEFAULT_PAGE_SIZE, - page_buffers: Vec::with_capacity(8), - wal_frame_buffers: Vec::with_capacity(8), - max_cached: 512, - } - } - - fn reinit_cache(&mut self, page_size: usize) { - self.page_buffers.clear(); - self.wal_frame_buffers.clear(); - self.page_size = page_size; - } - - pub fn get_buffer(&mut self, size: usize) -> Option { - match size { - sz if sz == self.page_size => self.page_buffers.pop(), - sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(), - _ => None, - } - } - - pub fn return_buffer(&mut self, buff: BufferData, len: usize) { - let sz = self.page_size; - let cache = match len { - n if n.eq(&sz) => &mut self.page_buffers, - n if n.eq(&(sz + WAL_FRAME_HEADER_SIZE)) => &mut self.wal_frame_buffers, - _ => return, - }; - if self.max_cached > cache.len() { - cache.push(buff); - } - } -} From fd09fe1237811adff3a08195eb5c1546486f918c Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sun, 3 Aug 2025 15:03:37 -0400 Subject: [PATCH 14/29] Adjust io_uring to register two arenas, one for frames and the other for db pages --- core/io/io_uring.rs | 54 +++++++++++++++++++++++++++---------- core/storage/buffer_pool.rs | 2 +- 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index c39bff69a..183fdda1e 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -32,7 +32,7 @@ const FILES: u32 = 8; const IOVEC_POOL_SIZE: usize = 64; /// Maximum number of iovec entries per writev operation. 
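The thread-local cache moved into io/mod.rs above recycles heap buffers on Drop so that overflow allocations of the two common sizes do not churn the allocator. A rough standalone sketch of that pattern, with simplified types (this is not the crate's code):

use std::cell::RefCell;

const PAGE: usize = 4096;
const FRAME: usize = PAGE + 24; // page + WAL frame header

thread_local! {
    // Two small per-thread free lists, one per cached size.
    static CACHE: RefCell<(Vec<Box<[u8]>>, Vec<Box<[u8]>>)> = RefCell::new((Vec::new(), Vec::new()));
}

fn get(len: usize) -> Box<[u8]> {
    CACHE.with(|c| {
        let mut c = c.borrow_mut();
        let list = match len {
            PAGE => &mut c.0,
            FRAME => &mut c.1,
            _ => return vec![0u8; len].into_boxed_slice(), // uncommon size: fresh allocation
        };
        list.pop().unwrap_or_else(|| vec![0u8; len].into_boxed_slice())
    })
}

fn put(buf: Box<[u8]>) {
    // In the real code this happens in Drop for the heap-backed Buffer variant.
    CACHE.with(|c| {
        let mut c = c.borrow_mut();
        match buf.len() {
            PAGE if c.0.len() < 256 => c.0.push(buf),
            FRAME if c.1.len() < 256 => c.1.push(buf),
            _ => {} // uncommon size or cache full: just drop it
        }
    })
}

fn main() {
    let a = get(PAGE);
    put(a);            // handed back to the per-thread cache
    let b = get(PAGE); // reuses the cached allocation instead of allocating again
    assert_eq!(b.len(), PAGE);
}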
-/// IOV_MAX is typically 1024, but we limit it to a smaller number +/// IOV_MAX is typically 1024 const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES; /// Maximum number of I/O operations to wait for in a single run, @@ -57,6 +57,7 @@ struct WrappedIOUring { struct InnerUringIO { ring: WrappedIOUring, free_files: VecDeque, + free_arenas: [u32; 2], } /// preallocated vec of iovec arrays to avoid allocations during writev operations @@ -117,6 +118,7 @@ impl UringIO { iov_pool: IovecPool::new(), }, free_files: (0..FILES).collect(), + free_arenas: [u32::MAX; 2], }; debug!("Using IO backend 'io-uring'"); Ok(Self { @@ -310,6 +312,13 @@ impl WrappedIOUring { }); let mut iov_count = 0; let mut last_end: Option<(*const u8, usize)> = None; + for (idx, buffer) in st + .bufs + .iter() + .enumerate() + .skip(st.current_buffer_idx) + .take(MAX_IOVEC_ENTRIES) + { for buffer in st.bufs.iter().skip(st.current_buffer_idx) { let ptr = buffer.as_ptr(); let len = buffer.len(); @@ -333,21 +342,34 @@ impl WrappedIOUring { break; } } + // If we have coalesced everything into a single iovec, submit as a single`pwrite` if iov_count == 1 { - // If we have coalesced everything into a single iovec, submit as a single`pwrite` let entry = with_fd!(st.file_id, |fd| { - io_uring::opcode::Write::new( - fd, - iov_allocation[0].iov_base as *const u8, - iov_allocation[0].iov_len as u32, - ) - .offset(st.file_pos as u64) - .build() - .user_data(key) + if let Some(id) = st.bufs[st.current_buffer_idx].borrow().fixed_id() { + io_uring::opcode::WriteFixed::new( + fd, + iov_allocation[0].iov_base as *const u8, + iov_allocation[0].iov_len as u32, + id as u16, + ) + .offset(st.file_pos as u64) + .build() + .user_data(key) + } else { + io_uring::opcode::Write::new( + fd, + iov_allocation[0].iov_base as *const u8, + iov_allocation[0].iov_len as u32, + ) + .offset(st.file_pos as u64) + .build() + .user_data(key) + } }); self.submit_entry(&entry); return; } + // Store the pointers and get the pointer to the iovec array that we pass // to the writev operation, and keep the array itself alive let ptr = iov_allocation.as_ptr() as *mut libc::iovec; @@ -477,16 +499,20 @@ impl IO for UringIO { Arc::new(MemoryIO::new()) } - fn register_fixed_buffer(&self, ptr: std::ptr::NonNull, len: usize) -> Result<()> { + fn register_fixed_buffer(&self, ptr: std::ptr::NonNull, len: usize) -> Result { turso_assert!( len % 512 == 0, "fixed buffer length must be logical block aligned" ); let inner = self.inner.borrow_mut(); - let default_id = 0; + let Some(id) = inner.free_arenas.iter().position(|&x| x == u32::MAX) else { + return Err(LimboError::UringIOError( + "no free fixed buffer slots available".to_string(), + )); + }; unsafe { inner.ring.ring.submitter().register_buffers_update( - default_id, + id as u32, &[libc::iovec { iov_base: ptr.as_ptr() as *mut libc::c_void, iov_len: len, @@ -494,7 +520,7 @@ impl IO for UringIO { None, )? }; - Ok(()) + Ok(id as u32) } } diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 5f824b9cb..c1ede775d 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -42,7 +42,7 @@ impl ArenaBuffer { } /// The requested size of the allocation. The actual size is always rounded up to - /// the next multiple of ARENA_PAGE_SIZE. 
+ /// the next multiple of the arena.page_size pub const fn logical_len(&self) -> usize { self.len } From 3cff47e490efe6d9dc28e6854b167ad454594540 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sun, 3 Aug 2025 15:04:07 -0400 Subject: [PATCH 15/29] Fix btree test to properly initialize pool --- core/storage/btree.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/storage/btree.rs b/core/storage/btree.rs index b292357a0..4530ee478 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -8451,8 +8451,7 @@ mod tests { let page_size = 512; let io: Arc = Arc::new(MemoryIO::new()); - let buffer_pool = - BufferPool::begin_init(&io, page_size * 128).finalize_with_page_size(page_size); + let buffer_pool = BufferPool::begin_init(&io, page_size * 128); let db_file = Arc::new(DatabaseFile::new( io.open_file(":memory:", OpenFlags::Create, false).unwrap(), @@ -8489,7 +8488,9 @@ mod tests { pager .io .block(|| { - pager.with_header_mut(|header| header.page_size = PageSize::new(page_size).unwrap()) + pager.with_header_mut(|header| { + header.page_size = PageSize::new(page_size as u32).unwrap() + }) }) .unwrap(); From 0ffba81216224ec2532e006db28e7dfd77c22e01 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sun, 3 Aug 2025 15:04:49 -0400 Subject: [PATCH 16/29] Make register buffer io trait return the buf index --- core/io/mod.rs | 2 +- core/storage/buffer_pool.rs | 270 ++++++++++++++++++++---------------- 2 files changed, 150 insertions(+), 122 deletions(-) diff --git a/core/io/mod.rs b/core/io/mod.rs index 0bded8c49..cce30f52b 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -124,7 +124,7 @@ pub trait IO: Clock + Send + Sync { fn get_memory_io(&self) -> Arc; - fn register_fixed_buffer(&self, _ptr: NonNull, _len: usize) -> Result<()> { + fn register_fixed_buffer(&self, _ptr: NonNull, _len: usize) -> Result { Err(crate::LimboError::InternalError( "unsupported operation".to_string(), )) diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index c1ede775d..8d0ad2248 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -1,12 +1,13 @@ use crate::fast_lock::SpinLock; use crate::io::TEMP_BUFFER_CACHE; +use crate::storage::page_bitmap::PageBitmap; use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE; use crate::{turso_assert, Buffer, LimboError, IO}; use parking_lot::Mutex; use std::cell::UnsafeCell; use std::ptr::NonNull; use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; -use std::sync::{Arc, OnceLock}; +use std::sync::{Arc, OnceLock, Weak}; pub static BUFFER_POOL: OnceLock> = OnceLock::new(); @@ -19,8 +20,6 @@ pub struct ArenaBuffer { len: usize, } -const REGISTERED_ID: u32 = 0; - impl ArenaBuffer { const fn new(ptr: NonNull, len: usize, arena_id: u32, page_idx: u32) -> Self { ArenaBuffer { @@ -32,17 +31,17 @@ impl ArenaBuffer { } #[inline(always)] - /// Returns the `id` of the underlying arena if it is registered with `io_uring` + /// Returns the `id` of the underlying arena, only if it was registered with `io_uring` pub fn fixed_id(&self) -> Option { - if self.arena_id == REGISTERED_ID { - Some(REGISTERED_ID) + if self.arena_id < UNREGISTERED_START { + Some(self.arena_id) } else { None } } - /// The requested size of the allocation. 
The actual size is always rounded up to - /// the next multiple of the arena.page_size + /// The requested size of the allocation, the actual size of the underlying buffer is rounded up to + /// the next multiple of the arena's page_size pub const fn logical_len(&self) -> usize { self.len } @@ -59,7 +58,8 @@ impl Drop for ArenaBuffer { let pool = BUFFER_POOL .get() .expect("BufferPool not initialized, cannot free ArenaBuffer"); - pool.free(self.logical_len(), self.arena_id, self.page_idx); + let inner = pool.inner_mut(); + inner.free(self.logical_len(), self.arena_id, self.page_idx); } } @@ -85,7 +85,8 @@ unsafe impl Send for BufferPool {} struct PoolInner { io: Option>, - arena: Option, + page_arena: Option, + wal_frame_arena: Option, init_lock: Mutex<()>, arena_size: AtomicUsize, db_page_size: AtomicUsize, @@ -114,7 +115,8 @@ impl BufferPool { ); Self { inner: UnsafeCell::new(PoolInner { - arena: None, + page_arena: None, + wal_frame_arena: None, arena_size: arena_size.into(), db_page_size: Self::DEFAULT_PAGE_SIZE.into(), init_lock: Mutex::new(()), @@ -129,8 +131,8 @@ impl BufferPool { } pub fn get_page(&self) -> Buffer { - let inner = self.inner(); - inner.allocate(inner.db_page_size.load(Ordering::Relaxed)) + let inner = self.inner_mut(); + inner.get_page() } fn inner(&self) -> &PoolInner { @@ -157,7 +159,8 @@ impl BufferPool { } /// Call when `[Database::db_state]` is initialized, providing the `page_size` to allocate - /// an arena for the pool. Before this call, the pool will use temporary buffers + /// an arena for the pool. Before this call, the pool will use temporary buffers which are + /// cached in thread local storage. pub fn finalize_with_page_size(&self, page_size: usize) -> crate::Result> { let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); let inner = pool.inner_mut(); @@ -169,11 +172,11 @@ impl BufferPool { cache.borrow_mut().reinit_cache(page_size); }); } - if inner.arena.is_some() { + if inner.page_arena.is_some() { return Ok(pool.clone()); } inner.db_page_size.store(page_size, Ordering::Relaxed); - inner.init_arena()?; + inner.init_arenas()?; Ok(pool.clone()) } @@ -188,7 +191,27 @@ impl PoolInner { /// temporary thread local buffers if the pool is not initialized or full pub fn allocate(&self, len: usize) -> Buffer { turso_assert!(len > 0, "Cannot allocate zero-length buffer"); - let Some(arena) = self.arena.as_ref() else { + + let db_page_size = self.db_page_size.load(Ordering::Relaxed); + let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE; + + // Check if this is exactly a WAL frame size allocation + if len == wal_frame_size { + let Some(wal_arena) = self.wal_frame_arena.as_ref() else { + return Buffer::new_temporary(len); + }; + if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) { + tracing::trace!( + "Allocated WAL frame buffer of length {} from arena {} at index {}", + len, + wal_arena.id, + first_idx + ); + return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx)); + } + } + // For all other sizes, use regular arena + let Some(arena) = self.page_arena.as_ref() else { // pool isn't fully initialized, return temporary buffer return Buffer::new_temporary(len); }; @@ -204,42 +227,91 @@ impl PoolInner { Buffer::new_temporary(len) } + fn get_page(&mut self) -> Buffer { + let db_page_size = self.db_page_size.load(Ordering::Relaxed); + let Some(arena) = self.page_arena.as_ref() else { + return Buffer::new_temporary(db_page_size); + }; + if let Some(FreeEntry { ptr, first_idx }) = 
arena.try_alloc(db_page_size) { + tracing::trace!( + "Allocated page buffer of size {} from arena {} at index {}", + db_page_size, + arena.id, + first_idx + ); + return Buffer::new_pooled(ArenaBuffer::new(ptr, db_page_size, arena.id, first_idx)); + } + Buffer::new_temporary(db_page_size) + } + /// Allocate a new arena for the pool to use - fn init_arena(&mut self) -> crate::Result<()> { + fn init_arenas(&mut self) -> crate::Result<()> { // Prevent concurrent growth let Some(_guard) = self.init_lock.try_lock() else { tracing::debug!("Buffer pool is already growing, skipping initialization"); return Ok(()); // Already in progress }; let arena_size = self.arena_size.load(Ordering::Relaxed); + let db_page_size = self.db_page_size.load(Ordering::Relaxed); let io = self.io.as_ref().expect("Pool not initialized").clone(); - match Arena::new( - self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE, - arena_size, - &io, - ) { + + // Create regular page arena + match Arena::new(db_page_size, arena_size, &io) { Ok(arena) => { tracing::trace!( - "added arena {} with size {} MB", + "added arena {} with size {} MB and page size {}", arena.id, - arena_size / (1024 * 1024) + arena_size / (1024 * 1024), + db_page_size ); - self.arena = Some(arena); - Ok(()) + self.page_arena = Some(arena); } Err(e) => { - tracing::error!("Failed to create new arena: {:?}", e); - Err(LimboError::InternalError(format!( - "Failed to create new arena: {e}", - ))) + tracing::error!("Failed to create arena: {:?}", e); + return Err(LimboError::InternalError(format!( + "Failed to create arena: {e}", + ))); } } + + // Create WAL frame arena + let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE; + match Arena::new(wal_frame_size, arena_size, &io) { + Ok(arena) => { + tracing::trace!( + "added WAL frame arena {} with size {} MB and page size {}", + arena.id, + arena_size / (1024 * 1024), + wal_frame_size + ); + self.wal_frame_arena = Some(arena); + } + Err(e) => { + tracing::error!("Failed to create WAL frame arena: {:?}", e); + return Err(LimboError::InternalError(format!( + "Failed to create WAL frame arena: {e}", + ))); + } + } + + Ok(()) } - pub fn free(&mut self, size: usize, arena_id: u32, page_idx: u32) { + pub fn free(&self, size: usize, arena_id: u32, page_idx: u32) { + // Check WAL frame arena first + if let Some(wal_arena) = self.wal_frame_arena.as_ref() { + if arena_id == wal_arena.id { + let pages = size.div_ceil(wal_arena.page_size); + tracing::trace!("Freeing {} pages from WAL frame arena {}", pages, arena_id); + wal_arena.free(page_idx, pages); + return; + } + } + + // Otherwise use regular arena let arena = self - .arena - .as_mut() + .page_arena + .as_ref() .expect("pool arena not initialized, cannot free buffer"); let pages = size.div_ceil(arena.page_size); tracing::trace!("Freeing {} pages from arena {}", pages, arena_id); @@ -252,15 +324,22 @@ impl PoolInner { } } -/// A single memory arena +/// Preallocated block of memory used by the pool to distribute Buffers struct Arena { /// Identifier to tie allocations back to the arena id: u32, /// base pointer to the arena returned by `mmap` base: NonNull, + /// Total number of pages currently allocated from this arena allocated_pages: AtomicUsize, - free_pages: SpinLock>, + /// Currently free pages + free_pages: SpinLock, + /// Total size of the arena in bytes arena_size: usize, + /// Page size the total arena is divided into. 
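A condensed sketch of the routing the two-arena split implies: an exact WAL-frame-sized request prefers the frame arena, everything else is served from the page arena in whole pages, and a temporary buffer is the last resort. The booleans stand in for `try_alloc` succeeding; this is an illustration, not the crate's allocate().

#[derive(Debug, PartialEq)]
enum Source {
    PageArena,
    WalFrameArena,
    Temporary,
}

fn route(len: usize, page_size: usize, page_arena_has_room: bool, wal_arena_has_room: bool) -> Source {
    const WAL_FRAME_HEADER_SIZE: usize = 24;
    // Exact WAL frame size prefers the dedicated frame arena.
    if len == page_size + WAL_FRAME_HEADER_SIZE && wal_arena_has_room {
        return Source::WalFrameArena;
    }
    // Everything else (including an overflowing frame arena) is served from the
    // page arena, rounded up to whole pages; the fallback is a temporary buffer.
    if page_arena_has_room {
        Source::PageArena
    } else {
        Source::Temporary
    }
}

fn main() {
    assert_eq!(route(4120, 4096, true, true), Source::WalFrameArena);
    assert_eq!(route(4096, 4096, true, true), Source::PageArena);
    assert_eq!(route(4120, 4096, true, false), Source::PageArena); // frame arena full: two pages from the page arena
    assert_eq!(route(4096, 4096, false, false), Source::Temporary);
}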
+ /// Because most allocations are of size `[Database::page_size]`, with an + /// additional 24 byte wal frame header, we treat this as the `page_size` to reduce + /// fragmentation to 24 bytes for regular pages page_size: usize, } @@ -275,11 +354,11 @@ struct FreeEntry { first_idx: u32, } -const UNREGISTERED: u32 = 1; +const UNREGISTERED_START: u32 = 2; /// For an arena which isn't registered with `io_uring` -/// registered arena will always have id = 0 -static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED); +/// registered arena will always have id = 0..=1 +static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START); impl Arena { /// Create a new arena with the given size and page size. @@ -287,16 +366,19 @@ impl Arena { let ptr = unsafe { arena::alloc(arena_size) }; let base = NonNull::new(ptr).ok_or("Failed to allocate arena")?; let total_pages = arena_size / page_size; - let id = if io.register_fixed_buffer(base, arena_size).is_ok() { - REGISTERED_ID - } else { - NEXT_ID.fetch_add(1, Ordering::Relaxed) - }; + let id = io + .register_fixed_buffer(base, arena_size) + .unwrap_or_else(|_| { + // Register with io_uring if possible, otherwise use next available ID + let next_id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + tracing::trace!("Allocating arena with id {}", next_id); + next_id + }); Ok(Self { id, base, - free_pages: SpinLock::new((0..total_pages as u32).collect()), + free_pages: SpinLock::new(PageBitmap::new(total_pages as u32)), allocated_pages: AtomicUsize::new(0), page_size, arena_size, @@ -304,88 +386,34 @@ impl Arena { } pub fn try_alloc(&self, size: usize) -> Option { - let pages = size.div_ceil(self.page_size); - let mut free = self.free_pages.lock(); - if free.len() < pages { - return None; - } - if pages == 1 { - // fast path: for now, most/all allocations are single pages - if let Some(page_idx) = free.pop() { - self.allocated_pages.fetch_add(pages, Ordering::Relaxed); - let offset = page_idx as usize * self.page_size; - let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) }; - tracing::trace!( - "Allocated single page at index {} from arena {}", - page_idx, - self.id - ); - return Some(FreeEntry { - ptr, - first_idx: page_idx, - }); - } + let pages = size.div_ceil(self.page_size) as u32; + let mut freemap = self.free_pages.lock(); + + let first_idx = if pages == 1 { + freemap.alloc_one()? } else { - return self.try_alloc_many(pages); - } - None + freemap.alloc_run(pages)? + }; + + self.allocated_pages + .fetch_add(pages as usize, Ordering::Relaxed); + let offset = first_idx as usize * self.page_size; + let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) }; + Some(FreeEntry { ptr, first_idx }) + } + + /// Allocate a contiguous run of `pages`. Returns the first page index. + #[inline] + pub fn alloc_run(&self, pages: u32) -> Option { + let mut bm = self.free_pages.lock(); + bm.alloc_run(pages) } - /// Free pages back to this arena pub fn free(&self, page_idx: u32, count: usize) { - let mut free = self.free_pages.lock(); - // Add pages back to freelist - for i in 0..count { - free.push(page_idx + i as u32); - } + let mut bm = self.free_pages.lock(); + bm.free_run(page_idx, count as u32); self.allocated_pages.fetch_sub(count, Ordering::Relaxed); } - - #[cold] - fn try_alloc_many(&self, pages: usize) -> Option { - // TODO (preston): we can optimize this further when we start allocating larger - // contiguous blocks for coalescing. 
this is 'unused' for now - let mut free = self.free_pages.lock(); - if pages <= 3 && free.len() >= pages { - let start = free.len() - pages; - let first_idx = free[start]; - - let mut consecutive = true; - for j in 1..pages { - if free[start + j] != first_idx + j as u32 { - consecutive = false; - break; - } - } - if consecutive { - free.truncate(start); - self.allocated_pages.fetch_add(pages, Ordering::Relaxed); - let offset = first_idx as usize * self.page_size; - let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) }; - return Some(FreeEntry { ptr, first_idx }); - } - } - // Fall back to searching from the beginning - for i in 0..free.len().saturating_sub(pages - 1) { - let first_idx = free[i]; - let mut consecutive = true; - for j in 1..pages { - if free[i + j] != first_idx + j as u32 { - consecutive = false; - break; - } - } - - if consecutive { - free.drain(i..i + pages); - self.allocated_pages.fetch_add(pages, Ordering::Relaxed); - let offset = first_idx as usize * self.page_size; - let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) }; - return Some(FreeEntry { ptr, first_idx }); - } - } - None - } } #[cfg(unix)] From 39d230a8997df5cd972e6a67494f2817f2671ec6 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sun, 3 Aug 2025 15:05:19 -0400 Subject: [PATCH 17/29] Add bitmap for tracking pages in arena --- core/io/io_uring.rs | 7 ------- core/storage/buffer_pool.rs | 12 ++---------- core/storage/pager.rs | 4 ++-- 3 files changed, 4 insertions(+), 19 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 183fdda1e..a45f558f1 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -312,13 +312,6 @@ impl WrappedIOUring { }); let mut iov_count = 0; let mut last_end: Option<(*const u8, usize)> = None; - for (idx, buffer) in st - .bufs - .iter() - .enumerate() - .skip(st.current_buffer_idx) - .take(MAX_IOVEC_ENTRIES) - { for buffer in st.bufs.iter().skip(st.current_buffer_idx) { let ptr = buffer.as_ptr(); let len = buffer.len(); diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 8d0ad2248..09d18a7db 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -7,7 +7,7 @@ use parking_lot::Mutex; use std::cell::UnsafeCell; use std::ptr::NonNull; use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; -use std::sync::{Arc, OnceLock, Weak}; +use std::sync::{Arc, OnceLock}; pub static BUFFER_POOL: OnceLock> = OnceLock::new(); @@ -298,7 +298,7 @@ impl PoolInner { } pub fn free(&self, size: usize, arena_id: u32, page_idx: u32) { - // Check WAL frame arena first + // common case: check WAL frame arena first if let Some(wal_arena) = self.wal_frame_arena.as_ref() { if arena_id == wal_arena.id { let pages = size.div_ceil(wal_arena.page_size); @@ -307,7 +307,6 @@ impl PoolInner { return; } } - // Otherwise use regular arena let arena = self .page_arena @@ -402,13 +401,6 @@ impl Arena { Some(FreeEntry { ptr, first_idx }) } - /// Allocate a contiguous run of `pages`. Returns the first page index. 
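The `PageBitmap` referenced here lives in core/storage/page_bitmap.rs and is not part of this hunk; a toy bitmap along the following lines shows the kind of work `alloc_one` and `free_run` replace the old linear free-list scan with. Illustration only, with made-up semantics (bit set = page free):

/// Toy bitmap: bit set = page free, bit clear = page allocated.
struct Bitmap {
    words: Vec<u64>,
    pages: u32,
}

impl Bitmap {
    fn new(pages: u32) -> Self {
        let mut words = vec![u64::MAX; pages.div_ceil(64) as usize];
        // Clear the bits past the end so they can never be handed out.
        let tail = pages % 64;
        if tail != 0 {
            *words.last_mut().unwrap() = (1u64 << tail) - 1;
        }
        Bitmap { words, pages }
    }

    /// Allocate a single page: find the first set bit and clear it.
    fn alloc_one(&mut self) -> Option<u32> {
        for (w, word) in self.words.iter_mut().enumerate() {
            if *word != 0 {
                let bit = word.trailing_zeros();
                *word &= !(1u64 << bit);
                return Some(w as u32 * 64 + bit);
            }
        }
        None
    }

    /// Free `count` pages starting at `start` by setting their bits again.
    fn free_run(&mut self, start: u32, count: u32) {
        for idx in start..start + count {
            assert!(idx < self.pages, "free past end of arena");
            self.words[(idx / 64) as usize] |= 1u64 << (idx % 64);
        }
    }
}

fn main() {
    let mut bm = Bitmap::new(128);
    let a = bm.alloc_one().unwrap();
    let b = bm.alloc_one().unwrap();
    assert_eq!((a, b), (0, 1));
    bm.free_run(a, 1);
    assert_eq!(bm.alloc_one(), Some(0)); // the freed page is reused first
}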
- #[inline] - pub fn alloc_run(&self, pages: u32) -> Option { - let mut bm = self.free_pages.lock(); - bm.alloc_run(pages) - } - pub fn free(&self, page_idx: u32, count: usize) { let mut bm = self.free_pages.lock(); bm.free_run(page_idx, count as u32); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 74cfa1594..a736a1805 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -10,8 +10,8 @@ use crate::storage::{ }; use crate::util::IOExt as _; use crate::{ - return_if_io, turso_assert, Completion, Connection, IOResult, LimboError, Result, - TransactionState, WalFrameInfo, + return_if_io, turso_assert, types::WalFrameInfo, Completion, Connection, IOResult, LimboError, + Result, TransactionState, }; use parking_lot::RwLock; use std::cell::{Cell, OnceCell, UnsafeCell}; From f9df267f42ff74e758ae44abc8dfa891f1061204 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 4 Aug 2025 21:41:56 -0400 Subject: [PATCH 18/29] Increase test arena size and fix import --- core/storage/buffer_pool.rs | 2 +- tests/integration/query_processing/test_btree.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 09d18a7db..db29134ec 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -103,7 +103,7 @@ impl Default for BufferPool { impl BufferPool { pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; // 4MB arena - pub const TEST_AREA_SIZE: usize = 512 * 1024; // 512KB arena for testing + pub const TEST_AREA_SIZE: usize = 1 * 1024 * 1024; // 1MB arena for testing pub const DEFAULT_PAGE_SIZE: usize = 4096; // 4KB default page size const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; // 32MB max arena diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index 7ad4ac828..0e6571698 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -1,7 +1,6 @@ use std::{ collections::{HashMap, HashSet}, path::Path, - pin::Pin, rc::Rc, sync::Arc, }; From d94e252ef9bc27fabad43aed0bb4c59b91ef5ca7 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Mon, 4 Aug 2025 22:17:52 -0400 Subject: [PATCH 19/29] Actually pre-register two arenas with the ring --- core/io/io_uring.rs | 27 ++++++++++++++++----------- core/storage/buffer_pool.rs | 2 +- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index a45f558f1..ffd1da137 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -5,6 +5,7 @@ use crate::io::clock::{Clock, Instant}; use crate::storage::wal::CKPT_BATCH_PAGES; use crate::{turso_assert, LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; +use std::ptr::NonNull; use std::{ cell::RefCell, collections::{HashMap, VecDeque}, @@ -40,6 +41,9 @@ const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES; /// make, but can increase single operation latency. 
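A small sketch of the bookkeeping behind the two registration slots: remember the (base, len) of each registered arena, hand out the slot index as the buffer's fixed id, and in debug builds verify that a "fixed" I/O pointer actually falls inside the registered range. This mirrors the intent of `free_arenas` and `debug_check_fixed` with simplified types; a real backend would also call register_buffers_update for the chosen slot.

use std::ptr::NonNull;

const ARENAS: usize = 2; // one slot for DB pages, one for WAL frames

struct Registry {
    slots: [Option<(NonNull<u8>, usize)>; ARENAS],
}

impl Registry {
    fn new() -> Self {
        Registry { slots: [None; ARENAS] }
    }

    /// Claim the first free slot and remember the registered range.
    fn register(&mut self, base: NonNull<u8>, len: usize) -> Option<u32> {
        let slot = self.slots.iter().position(|s| s.is_none())?;
        self.slots[slot] = Some((base, len));
        Some(slot as u32)
    }

    /// Debug-only sanity check: a fixed read/write must stay inside its arena.
    fn check_fixed(&self, idx: u32, ptr: *const u8, len: usize) -> bool {
        let Some((base, blen)) = self.slots[idx as usize] else { return false };
        let start = base.as_ptr() as usize;
        let p = ptr as usize;
        p >= start && p + len <= start + blen
    }
}

fn main() {
    let mut arena = vec![0u8; 8192];
    let base = NonNull::new(arena.as_mut_ptr()).unwrap();
    let mut reg = Registry::new();
    let id = reg.register(base, arena.len()).unwrap();
    assert_eq!(id, 0);
    assert!(reg.check_fixed(id, unsafe { base.as_ptr().add(4096) }, 4096));
    assert!(!reg.check_fixed(id, unsafe { base.as_ptr().add(4096) }, 8192)); // overruns the arena
}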
const MAX_WAIT: usize = 4; +/// One memory arena for DB pages and another for WAL frames +const ARENAS: usize = 2; + pub struct UringIO { inner: Rc>, } @@ -57,7 +61,7 @@ struct WrappedIOUring { struct InnerUringIO { ring: WrappedIOUring, free_files: VecDeque, - free_arenas: [u32; 2], + free_arenas: [Option<(NonNull, usize)>; 2], } /// preallocated vec of iovec arrays to avoid allocations during writev operations @@ -109,7 +113,7 @@ impl UringIO { // RL_MEMLOCK cap is typically 8MB, the current design is to have one large arena // registered at startup and therefore we can simply use the zero index, falling back // to similar logic as the existing buffer pool for cases where it is over capacity. - ring.submitter().register_buffers_sparse(1)?; + ring.submitter().register_buffers_sparse(ARENAS as u32)?; let inner = InnerUringIO { ring: WrappedIOUring { ring, @@ -118,7 +122,7 @@ impl UringIO { iov_pool: IovecPool::new(), }, free_files: (0..FILES).collect(), - free_arenas: [u32::MAX; 2], + free_arenas: [const { None }; ARENAS], }; debug!("Using IO backend 'io-uring'"); Ok(Self { @@ -497,15 +501,15 @@ impl IO for UringIO { len % 512 == 0, "fixed buffer length must be logical block aligned" ); - let inner = self.inner.borrow_mut(); - let Some(id) = inner.free_arenas.iter().position(|&x| x == u32::MAX) else { - return Err(LimboError::UringIOError( - "no free fixed buffer slots available".to_string(), - )); - }; + let mut inner = self.inner.borrow_mut(); + let slot = inner + .free_arenas + .iter() + .position(|e| e.is_none()) + .ok_or_else(|| LimboError::UringIOError("no free fixed buffer slots".into()))?; unsafe { inner.ring.ring.submitter().register_buffers_update( - id as u32, + slot as u32, &[libc::iovec { iov_base: ptr.as_ptr() as *mut libc::c_void, iov_len: len, @@ -513,7 +517,8 @@ impl IO for UringIO { None, )? 
}; - Ok(id as u32) + inner.free_arenas[slot] = Some((ptr, len)); + Ok(slot as u32) } } diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index db29134ec..05529505f 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -103,7 +103,7 @@ impl Default for BufferPool { impl BufferPool { pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; // 4MB arena - pub const TEST_AREA_SIZE: usize = 1 * 1024 * 1024; // 1MB arena for testing + pub const TEST_AREA_SIZE: usize = 1024 * 1024; // 1MB arena for testing pub const DEFAULT_PAGE_SIZE: usize = 4096; // 4KB default page size const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; // 32MB max arena From 66964fd8d2dccf075d32e053d48fde587eb935e3 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 5 Aug 2025 12:50:47 -0400 Subject: [PATCH 20/29] Add documentation and comments to new buffer pool, add get_frame api --- core/io/io_uring.rs | 36 ++++++---- core/storage/buffer_pool.rs | 124 ++++++++++++++++++++++++--------- core/storage/sqlite3_ondisk.rs | 9 ++- core/storage/wal.rs | 4 +- 4 files changed, 122 insertions(+), 51 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index ffd1da137..e9a483159 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -269,6 +269,18 @@ impl InnerUringIO { self.free_files.push_back(id); Ok(()) } + + #[cfg(debug_assertions)] + fn debug_check_fixed(&self, idx: u32, ptr: *const u8, len: usize) { + let (base, blen) = self.free_arenas[idx as usize].expect("slot not registered"); + let start = base.as_ptr() as usize; + let end = start + blen; + let p = ptr as usize; + assert!( + p >= start && p + len <= end, + "Fixed operation, pointer out of registered range" + ); + } } impl WrappedIOUring { @@ -564,7 +576,6 @@ impl UringFile { self.id } } - unsafe impl Send for UringFile {} unsafe impl Sync for UringFile {} @@ -641,26 +652,27 @@ impl File for UringFile { fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let write = { + let ptr = buf.as_ptr(); + let len = buf.len(); with_fd!(self, |fd| { if let Some(idx) = buf.fixed_id() { trace!( "pwrite_fixed(pos = {}, length = {}, idx= {})", pos, - buffer.len(), + len, idx ); - io_uring::opcode::WriteFixed::new( - fd, - buffer.as_ptr(), - buffer.len() as u32, - idx as u16, - ) - .offset(pos as u64) - .build() - .user_data(get_key(c.clone())) + #[cfg(debug_assertions)] + { + io.debug_check_fixed(idx, ptr, len); + } + io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) } else { trace!("pwrite(pos = {}, length = {})", pos, buffer.len()); - io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32) + io_uring::opcode::Write::new(fd, ptr, len as u32) .offset(pos as u64) .build() .user_data(get_key(c.clone())) diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 05529505f..f2802febe 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -14,9 +14,16 @@ pub static BUFFER_POOL: OnceLock> = OnceLock::new(); #[derive(Debug)] /// A buffer allocated from an arena from `[BufferPool]` pub struct ArenaBuffer { + /// Pointer to the start of the buffer ptr: NonNull, + /// Identifier for the `[Arena]` the buffer came from arena_id: u32, + /// The index of the first page making up the buffer page_idx: u32, + /// The requested length of the allocation. 
+ /// The actual size of what is allocated for the + /// buffer is `len` rounded up to the next multiple of + /// `[Arena::page_size]` len: usize, } @@ -32,7 +39,8 @@ impl ArenaBuffer { #[inline(always)] /// Returns the `id` of the underlying arena, only if it was registered with `io_uring` - pub fn fixed_id(&self) -> Option { + pub const fn fixed_id(&self) -> Option { + // Arenas which are not registered will have `id`s <= UNREGISTERED_START if self.arena_id < UNREGISTERED_START { Some(self.arena_id) } else { @@ -76,7 +84,8 @@ impl std::ops::DerefMut for ArenaBuffer { } } -/// Static Buffer pool managing multiple arenas +/// Static Buffer pool managing multiple memory arenas +/// of which `[ArenaBuffer]`s are returned for requested allocations pub struct BufferPool { inner: UnsafeCell, } @@ -84,11 +93,21 @@ unsafe impl Sync for BufferPool {} unsafe impl Send for BufferPool {} struct PoolInner { + /// An instance of the program's IO, used for registering + /// Arena's with io_uring. io: Option>, + /// An Arena which returns `ArenaBuffer`s of size `db_page_size`. page_arena: Option, + /// An Arena which returns `ArenaBuffer`s of size `db_page_size` + /// plus 24 byte `WAL_FRAME_HEADER_SIZE`, preventing the fragmentation + /// or complex book-keeping needed to use the same arena for both sizes. wal_frame_arena: Option, + /// A lock preventing concurrent initialization. init_lock: Mutex<()>, + /// The size of each `Arena`, in bytes. arena_size: AtomicUsize, + /// The `[Database::page_size]`, which the `page_arena` will use to + /// return buffers from `Self::get_page`. db_page_size: AtomicUsize, } @@ -102,12 +121,16 @@ impl Default for BufferPool { } impl BufferPool { - pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; // 4MB arena - pub const TEST_AREA_SIZE: usize = 1024 * 1024; // 1MB arena for testing - pub const DEFAULT_PAGE_SIZE: usize = 4096; // 4KB default page size - const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; // 32MB max arena + /// 4MB Default size for each `Arena` + pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; + /// 1MB size For testing/CI + pub const TEST_AREA_SIZE: usize = 1024 * 1024; + /// 4KB default page_size + pub const DEFAULT_PAGE_SIZE: usize = 4096; + /// Maximum size for each Arena (64MB total) + const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; - pub fn new(arena_size: usize) -> Self { + fn new(arena_size: usize) -> Self { turso_assert!( arena_size < Self::MAX_ARENA_SIZE, "Arena size cannot exceed {} bytes", @@ -125,16 +148,26 @@ impl BufferPool { } } + /// Request a `Buffer` of size `len` pub fn allocate(len: usize) -> Buffer { let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); pool.inner().allocate(len) } + /// Request a `Buffer` the size of the `db_page_size` the `BufferPool` + /// was initialized with. pub fn get_page(&self) -> Buffer { let inner = self.inner_mut(); inner.get_page() } + /// Request a `Buffer` for use with a WAL frame, + /// `[Database::page_size] + `WAL_FRAME_HEADER_SIZE` + pub fn get_wal_frame(&self) -> Buffer { + let inner = self.inner_mut(); + inner.get_frame() + } + fn inner(&self) -> &PoolInner { unsafe { &*self.inner.get() } } @@ -145,8 +178,8 @@ impl BufferPool { } /// Create a static `BufferPool` initialize the pool to the default page size, **without** - /// creating an arena. Arena will be created when `[BufferPool::finalize_page_size]` is called. - /// Until then the pool will return temporary buffers to prevent reallocation of the + /// poulating the Arenas. 
Arenas will not be created until `[BufferPool::finalize_page_size]`, + /// and the pool will temporarily return temporary buffers to prevent reallocation of the /// arena if the page size is set to something other than the default value. pub fn begin_init(io: &Arc, arena_size: usize) -> Arc { let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size))); @@ -181,6 +214,8 @@ impl BufferPool { } #[inline] + /// Marks the underlying pages for an `ArenaBuffer` on `Drop` as free and + /// available for use by another allocation. pub fn free(&self, size: usize, arena_id: u32, page_id: u32) { self.inner_mut().free(size, arena_id, page_id); } @@ -188,7 +223,7 @@ impl BufferPool { impl PoolInner { /// Allocate a buffer of the given length from the pool, falling back to - /// temporary thread local buffers if the pool is not initialized or full + /// temporary thread local buffers if the pool is not initialized or full. pub fn allocate(&self, len: usize) -> Buffer { turso_assert!(len > 0, "Cannot allocate zero-length buffer"); @@ -244,6 +279,23 @@ impl PoolInner { Buffer::new_temporary(db_page_size) } + fn get_frame(&mut self) -> Buffer { + let len = self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE; + let Some(wal_arena) = self.wal_frame_arena.as_ref() else { + return Buffer::new_temporary(len); + }; + if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) { + tracing::trace!( + "Allocated WAL frame buffer of length {} from arena {} at index {}", + len, + wal_arena.id, + first_idx + ); + return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx)); + } + Buffer::new_temporary(len) + } + /// Allocate a new arena for the pool to use fn init_arenas(&mut self) -> crate::Result<()> { // Prevent concurrent growth @@ -297,8 +349,17 @@ impl PoolInner { Ok(()) } - pub fn free(&self, size: usize, arena_id: u32, page_idx: u32) { - // common case: check WAL frame arena first + fn free(&self, size: usize, arena_id: u32, page_idx: u32) { + // allocations of size `page_size` are more common, so we check that arena first. + if let Some(arena) = self.page_arena.as_ref() { + if arena_id == arena.id { + let pages = size.div_ceil(arena.page_size); + tracing::trace!("Freeing {} pages from arena {}", pages, arena_id); + arena.free(page_idx, pages); + return; + } + } + // check WAL frame arena if let Some(wal_arena) = self.wal_frame_arena.as_ref() { if arena_id == wal_arena.id { let pages = size.div_ceil(wal_arena.page_size); @@ -307,38 +368,28 @@ impl PoolInner { return; } } - // Otherwise use regular arena - let arena = self - .page_arena - .as_ref() - .expect("pool arena not initialized, cannot free buffer"); - let pages = size.div_ceil(arena.page_size); - tracing::trace!("Freeing {} pages from arena {}", pages, arena_id); - turso_assert!( - arena_id == arena.id, - "should not free from different arena. {arena_id} != {}", - arena.id - ); - arena.free(page_idx, pages); + panic!("ArenaBuffer freed with no available parent Arena"); } } -/// Preallocated block of memory used by the pool to distribute Buffers +/// Preallocated block of memory used by the pool to distribute `ArenaBuffer`s struct Arena { - /// Identifier to tie allocations back to the arena + /// Identifier to tie allocations back to the arena. If the arena is registerd + /// with `io_uring`, then the ID represents the index of the arena into the ring's + /// sparse registered buffer array created on the ring's initialization. 
id: u32, - /// base pointer to the arena returned by `mmap` + /// Base pointer to the arena returned by `mmap` base: NonNull, - /// Total number of pages currently allocated from this arena + /// Total number of pages currently allocated/in use. allocated_pages: AtomicUsize, - /// Currently free pages + /// Currently free pages. free_pages: SpinLock, /// Total size of the arena in bytes arena_size: usize, /// Page size the total arena is divided into. /// Because most allocations are of size `[Database::page_size]`, with an /// additional 24 byte wal frame header, we treat this as the `page_size` to reduce - /// fragmentation to 24 bytes for regular pages + /// fragmentation to 24 bytes for regular pages. page_size: usize, } @@ -353,9 +404,11 @@ struct FreeEntry { first_idx: u32, } +/// Slots 0 and 1 will be reserved for Arenas which are registered buffers +/// with io_uring. const UNREGISTERED_START: u32 = 2; -/// For an arena which isn't registered with `io_uring` +/// ID's for an Arena which is not registered with `io_uring` /// registered arena will always have id = 0..=1 static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START); @@ -384,16 +437,20 @@ impl Arena { }) } + /// Attempt to mark N pages as `allocated` and return a pointer to and + /// index of the first page in the allocation. pub fn try_alloc(&self, size: usize) -> Option { let pages = size.div_ceil(self.page_size) as u32; let mut freemap = self.free_pages.lock(); let first_idx = if pages == 1 { + // use the optimized method for individual pages which attempts + // to leave large contiguous areas free of fragmentation for + // larger `runs` freemap.alloc_one()? } else { freemap.alloc_run(pages)? }; - self.allocated_pages .fetch_add(pages as usize, Ordering::Relaxed); let offset = first_idx as usize * self.page_size; @@ -401,6 +458,7 @@ impl Arena { Some(FreeEntry { ptr, first_idx }) } + /// Mark `count` pages starting at `page_idx` as free. 
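A short worked check of the arithmetic `try_alloc` and `free` rely on, assuming a 4096-byte DB page: requests are rounded up to whole arena pages with div_ceil, and a page index maps back to a byte offset from the arena base.

fn main() {
    const WAL_FRAME_HEADER_SIZE: usize = 24;
    let db_page_size = 4096usize;

    // The page arena is carved into db_page_size chunks,
    // the WAL arena into (db_page_size + 24)-byte chunks.
    let page_arena_page = db_page_size;
    let wal_arena_page = db_page_size + WAL_FRAME_HEADER_SIZE;

    // A whole WAL frame fits in exactly one WAL-arena page...
    assert_eq!(wal_arena_page.div_ceil(wal_arena_page), 1);
    // ...but would need two pages (and waste almost 4 KiB) in the page arena.
    assert_eq!(wal_arena_page.div_ceil(page_arena_page), 2);

    // Page index -> byte offset from the arena base pointer.
    let first_idx = 5usize;
    assert_eq!(first_idx * wal_arena_page, 20_600);

    // Freeing reverses the same computation: size -> number of pages to mark free.
    assert_eq!(db_page_size.div_ceil(page_arena_page), 1);
}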
pub fn free(&self, page_idx: u32, count: usize) { let mut bm = self.free_pages.lock(); bm.free_run(page_idx, count as u32); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index f6cf88881..d2c24770a 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -1760,15 +1760,13 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result, io: &Arc, offset: usize, - page_size: u32, complete: Box, ) -> Result { tracing::trace!("begin_read_wal_frame_raw(offset={})", offset); - let buf = Arc::new(BufferPool::allocate( - page_size as usize + WAL_FRAME_HEADER_SIZE, - )); + let buf = Arc::new(buffer_pool.get_wal_frame()); #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_read(buf, complete); let c = io.pread(offset, c)?; @@ -1810,6 +1808,7 @@ pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) { } pub fn prepare_wal_frame( + buffer_pool: &Arc, wal_header: &WalHeader, prev_checksums: (u32, u32), page_size: u32, @@ -1819,7 +1818,7 @@ pub fn prepare_wal_frame( ) -> ((u32, u32), Arc) { tracing::trace!(page_number); - let buffer = BufferPool::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE); + let buffer = buffer_pool.get_wal_frame(); let frame = buffer.as_mut_slice(); frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 1f7f8e0c2..ccc92677f 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -933,7 +933,7 @@ impl Wal for WalFile { } }); let c = - begin_read_wal_frame_raw(&self.get_shared().file, offset, self.page_size(), complete)?; + begin_read_wal_frame_raw(&self.buffer_pool, &self.get_shared().file, offset, complete)?; Ok(c) } @@ -1003,6 +1003,7 @@ impl Wal for WalFile { let header = header.lock(); let checksums = self.last_checksum; let (checksums, frame_bytes) = prepare_wal_frame( + &self.buffer_pool, &header, checksums, header.page_size, @@ -1044,6 +1045,7 @@ impl Wal for WalFile { let page_content = page.get_contents(); let page_buf = page_content.as_ptr(); let (frame_checksums, frame_bytes) = prepare_wal_frame( + &self.buffer_pool, &header, checksums, header.page_size, From ff7d9fe9618c26b270d498e3d04c1a34b3bb0271 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 5 Aug 2025 12:51:17 -0400 Subject: [PATCH 21/29] Add basic test for buffer pool initialization and basic use --- core/storage/buffer_pool.rs | 83 +++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index f2802febe..b08074d19 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -513,3 +513,86 @@ mod arena { unsafe { std::alloc::dealloc(ptr, layout) }; } } + +#[cfg(test)] +pub mod tests { + use super::*; + const TEST_SIZE: usize = 4096 * 50; + + #[test] + fn test_init_pool() { + let io: Arc = Arc::new(crate::MemoryIO::new()); + let pool = BufferPool::begin_init(&io, TEST_SIZE); + pool.finalize_with_page_size(BufferPool::DEFAULT_PAGE_SIZE) + .expect("pool to initialize"); + assert_eq!(pool.get_page().len(), BufferPool::DEFAULT_PAGE_SIZE); + assert_eq!( + pool.get_wal_frame().len(), + BufferPool::DEFAULT_ARENA_SIZE + WAL_FRAME_HEADER_SIZE, + "get_wal_frame should return page_size + WAL_FRAME_HEADER_SIZE" + ); + assert!( + pool.inner_mut().page_arena.is_some(), + "page arena should be initialized" + ); + assert!( + pool.inner_mut().wal_frame_arena.is_some(), + "wal frame arena should be initialized" + ); + let inner = pool.inner_mut(); + assert!( + inner + .page_arena + 
.as_ref() + .is_some_and(|a| a.id >= UNREGISTERED_START), + "page arena should not have ID in registered range with MemoryIO" + ); + assert!( + inner + .wal_frame_arena + .as_ref() + .is_some_and(|a| a.id >= UNREGISTERED_START), + "wal frame arena should not have ID in registered range with MemoryIO" + ); + let page = inner.get_page(); + assert_eq!( + inner + .page_arena + .as_ref() + .unwrap() + .allocated_pages + .load(Ordering::Relaxed), + 1 + ); + drop(page); + let frame = inner.get_frame(); + assert_eq!( + inner + .wal_frame_arena + .as_ref() + .unwrap() + .allocated_pages + .load(Ordering::Relaxed), + 1 + ); + drop(frame); + assert_eq!( + inner + .wal_frame_arena + .as_ref() + .unwrap() + .allocated_pages + .load(Ordering::Relaxed), + 0 + ); + assert_eq!( + inner + .page_arena + .as_ref() + .unwrap() + .allocated_pages + .load(Ordering::Relaxed), + 0 + ); + } +} From d7e4ba21f862057f748a17ab2ce7d39e505d95f6 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 5 Aug 2025 13:23:56 -0400 Subject: [PATCH 22/29] Add explanation for using 3mb limit --- core/io/io_uring.rs | 9 +++++++-- core/io/mod.rs | 7 ++++--- core/storage/btree.rs | 2 +- core/storage/buffer_pool.rs | 6 ++++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index e9a483159..29fc31ade 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -276,7 +276,7 @@ impl InnerUringIO { let start = base.as_ptr() as usize; let end = start + blen; let p = ptr as usize; - assert!( + turso_assert!( p >= start && p + len <= end, "Fixed operation, pointer out of registered range" ); @@ -622,6 +622,7 @@ impl File for UringFile { let mut io = self.io.borrow_mut(); let read_e = { let buf = r.buf(); + let ptr = buf.as_mut_ptr(); let len = buf.len(); with_fd!(self, |fd| { if let Some(idx) = buf.fixed_id() { @@ -631,7 +632,11 @@ impl File for UringFile { len, idx ); - io_uring::opcode::ReadFixed::new(fd, buf.as_mut_ptr(), len as u32, idx as u16) + #[cfg(debug_assertions)] + { + io.debug_check_fixed(idx, ptr, len); + } + io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16) .offset(pos as u64) .build() .user_data(get_key(c.clone())) diff --git a/core/io/mod.rs b/core/io/mod.rs index cce30f52b..7c445baa2 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -3,6 +3,7 @@ use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE; use crate::{BufferPool, Result}; use bitflags::bitflags; use cfg_block::cfg_block; +use std::cell::RefCell; use std::fmt; use std::ptr::NonNull; use std::sync::Arc; @@ -401,10 +402,10 @@ impl Buffer { }, } } - pub fn as_mut_ptr(&mut self) -> *mut u8 { + pub fn as_mut_ptr(&self) -> *mut u8 { match self { - Self::Heap(buf) => buf.as_mut_ptr(), - Self::Pooled(buf) => buf.as_mut_ptr(), + Self::Heap(buf) => buf.as_ptr() as *mut u8, + Self::Pooled(buf) => buf.as_ptr() as *mut u8, } } } diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 4530ee478..33788b211 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -8521,7 +8521,7 @@ mod tests { .get() as usize, )); let _buf = buf.clone(); - let c = Completion::new_write(|_| { + let c = Completion::new_write(move |_| { let _ = _buf.clone(); }); let _c = pager diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index b08074d19..23a8ce9af 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -121,8 +121,10 @@ impl Default for BufferPool { } impl BufferPool { - /// 4MB Default size for each `Arena` - pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 
1024; + /// 3MB Default size for each `Arena`. Any higher and + /// it will fail to register the second arena with io_uring due + /// to `RL_MEMLOCK` limit for un-privileged processes being 8MB total. + pub const DEFAULT_ARENA_SIZE: usize = 3 * 1024 * 1024; /// 1MB size For testing/CI pub const TEST_AREA_SIZE: usize = 1024 * 1024; /// 4KB default page_size From d41377454cb3012882b0943b1a0013cba543f9d2 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 5 Aug 2025 14:29:16 -0400 Subject: [PATCH 23/29] Fix test asserting for arena size instead of page size --- core/storage/buffer_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 23a8ce9af..70696827a 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -530,7 +530,7 @@ pub mod tests { assert_eq!(pool.get_page().len(), BufferPool::DEFAULT_PAGE_SIZE); assert_eq!( pool.get_wal_frame().len(), - BufferPool::DEFAULT_ARENA_SIZE + WAL_FRAME_HEADER_SIZE, + BufferPool::DEFAULT_PAGE_SIZE + WAL_FRAME_HEADER_SIZE, "get_wal_frame should return page_size + WAL_FRAME_HEADER_SIZE" ); assert!( From 34d90d5acbd1e946a2483306d71b7ff1b90d6c7a Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 5 Aug 2025 14:32:43 -0400 Subject: [PATCH 24/29] Remove Clone impl for Buffer and PageContent to make any copying of page data explicit --- core/io/mod.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/core/io/mod.rs b/core/io/mod.rs index 7c445baa2..f637747e7 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -312,22 +312,6 @@ impl Debug for Buffer { } } -impl Clone for Buffer { - fn clone(&self) -> Self { - match self { - Self::Heap(buf) => { - let len = buf.len(); - Self::Heap(Vec::from(&buf[..len]).into_boxed_slice().into()) - } - Self::Pooled(buf) => { - // Clone pooled buffers as heap buffers - let data = Vec::from(buf.as_slice()); - Self::Heap(Pin::new(data.into_boxed_slice())) - } - } - } -} - impl Drop for Buffer { fn drop(&mut self) { let len = self.len(); From faf248df03fb574fd4dc5d3fd6d7042ff008a295 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Tue, 5 Aug 2025 14:45:46 -0400 Subject: [PATCH 25/29] Add more docs and comments for TempBufferCache --- core/io/io_uring.rs | 8 ++--- core/io/mod.rs | 31 +++++++++++++------ core/storage/page_cache.rs | 2 +- core/storage/pager.rs | 4 +-- core/storage/wal.rs | 2 +- .../query_processing/test_btree.rs | 2 +- 6 files changed, 31 insertions(+), 18 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 29fc31ade..64204b309 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -354,7 +354,7 @@ impl WrappedIOUring { // If we have coalesced everything into a single iovec, submit as a single`pwrite` if iov_count == 1 { let entry = with_fd!(st.file_id, |fd| { - if let Some(id) = st.bufs[st.current_buffer_idx].borrow().fixed_id() { + if let Some(id) = st.bufs[st.current_buffer_idx].fixed_id() { io_uring::opcode::WriteFixed::new( fd, iov_allocation[0].iov_base as *const u8, @@ -657,10 +657,10 @@ impl File for UringFile { fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let write = { - let ptr = buf.as_ptr(); - let len = buf.len(); + let ptr = buffer.as_ptr(); + let len = buffer.len(); with_fd!(self, |fd| { - if let Some(idx) = buf.fixed_id() { + if let Some(idx) = buffer.fixed_id() { trace!( "pwrite_fixed(pos = {}, length = {}, idx= {})", pos, diff --git a/core/io/mod.rs b/core/io/mod.rs index f637747e7..66dbae949 100644 
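The 3 MB default exists because both arenas are registered with io_uring and registered buffers count against RLIMIT_MEMLOCK, which is commonly 8 MB for unprivileged processes. A hedged sketch of how that budget could be checked up front with libc's getrlimit; this helper is not part of the patch and assumes a Linux target with the libc crate available.

// Sketch only: query RLIMIT_MEMLOCK and verify that two arenas of
// `arena_size` bytes each would fit under the current soft limit.
fn fits_memlock_budget(arena_size: usize) -> bool {
    let mut lim = libc::rlimit { rlim_cur: 0, rlim_max: 0 };
    let rc = unsafe { libc::getrlimit(libc::RLIMIT_MEMLOCK, &mut lim) };
    if rc != 0 {
        return false; // could not query the limit; be conservative
    }
    if lim.rlim_cur == libc::RLIM_INFINITY {
        return true;
    }
    // Two registered arenas: one for DB pages, one for WAL frames.
    (2 * arena_size) as u64 <= lim.rlim_cur as u64
}

fn main() {
    let default_arena = 3 * 1024 * 1024;
    println!("two 3 MB arenas fit under RLIMIT_MEMLOCK: {}", fits_memlock_budget(default_arena));
}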
--- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -331,6 +331,9 @@ impl Buffer { tracing::trace!("buffer::new({:?})", data); Self::Heap(Pin::new(data.into_boxed_slice())) } + + /// Returns the index of the underlying `Arena` if it was registered with + /// io_uring. Only for use with `UringIO` backend. pub fn fixed_id(&self) -> Option { match self { Self::Heap { .. } => None, @@ -376,16 +379,16 @@ impl Buffer { #[allow(clippy::mut_from_ref)] pub fn as_mut_slice(&self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len()) } + } + #[inline] + pub fn as_ptr(&self) -> *const u8 { match self { - Self::Heap(buf) => { - // SAFETY: The buffer is guaranteed to be valid for the lifetime of the slice - unsafe { std::slice::from_raw_parts_mut(buf.as_ptr() as *mut u8, buf.len()) } - } - Self::Pooled(buf) => unsafe { - std::slice::from_raw_parts_mut(buf.as_ptr() as *mut u8, buf.len()) - }, + Self::Heap(buf) => buf.as_ptr(), + Self::Pooled(buf) => buf.as_ptr(), } } + #[inline] pub fn as_mut_ptr(&self) -> *mut u8 { match self { Self::Heap(buf) => buf.as_ptr() as *mut u8, @@ -399,24 +402,34 @@ thread_local! { pub static TEMP_BUFFER_CACHE: RefCell = RefCell::new(TempBufferCache::new()); } +/// A cache for temporary or any additional `Buffer` allocations beyond +/// what the `BufferPool` has room for, or for use before the pool is +/// fully initialized. pub(crate) struct TempBufferCache { - // Buffers indexed by size, we only cache common sizes + /// The `[Database::page_size]` at the time the cache is initiated. page_size: usize, + /// Cache of buffers of size `self.page_size`. page_buffers: Vec, + /// Cache of buffers of size `self.page_size` + WAL_FRAME_HEADER_SIZE. wal_frame_buffers: Vec, + /// Maximum number of buffers that will live in each cache. max_cached: usize, } impl TempBufferCache { + const DEFAULT_MAX_CACHE_SIZE: usize = 256; + fn new() -> Self { Self { page_size: BufferPool::DEFAULT_PAGE_SIZE, page_buffers: Vec::with_capacity(8), wal_frame_buffers: Vec::with_capacity(8), - max_cached: 512, + max_cached: Self::DEFAULT_MAX_CACHE_SIZE, } } + /// If the `[Database::page_size]` is set, any temporary buffers that might + /// exist prior need to be cleared and new `page_size` needs to be saved. 
pub fn reinit_cache(&mut self, page_size: usize) { self.page_buffers.clear(); self.wal_frame_buffers.clear(); diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 2297ab322..d1d6499b6 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -626,7 +626,7 @@ mod tests { use crate::storage::sqlite3_ondisk::PageContent; use crate::BufferPool; use std::ptr::NonNull; - use std::{num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc}; + use std::{num::NonZeroUsize, sync::Arc}; use lru::LruCache; use rand_chacha::{ diff --git a/core/storage/pager.rs b/core/storage/pager.rs index a736a1805..d44369632 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -14,7 +14,7 @@ use crate::{ Result, TransactionState, }; use parking_lot::RwLock; -use std::cell::{Cell, OnceCell, UnsafeCell}; +use std::cell::{Cell, OnceCell, RefCell, UnsafeCell}; use std::collections::HashSet; use std::hash; use std::rc::Rc; @@ -790,7 +790,7 @@ impl Pager { )?; turso_assert!( - ptrmap_page.get().id == ptrmap_pg_no as usize, + ptrmap_page.get().id == ptrmap_pg_no, "ptrmap page has unexpected number" ); self.add_dirty(&ptrmap_page); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index ccc92677f..f69700545 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -927,7 +927,7 @@ impl Wal for WalFile { bytes_read == buf_len as i32, "read({bytes_read}) != expected({buf_len})" ); - let buf_ptr = buf.as_ptr(); + let buf_ptr = buf.as_mut_ptr(); unsafe { std::ptr::copy_nonoverlapping(buf_ptr, frame_ptr, frame_len); } diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index 0e6571698..0d174e50c 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -430,7 +430,7 @@ fn write_at(io: &impl IO, file: Arc, offset: usize, data: &[u8]) { #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(Buffer::new(data.to_vec())); let _buf = buffer.clone(); - let completion = Completion::new_write(|_| { + let completion = Completion::new_write(move |_| { // reference the buffer to keep alive for async io let _buf = _buf.clone(); }); From d182b836ef6044478907cb2e618bde0c41094b9b Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Wed, 6 Aug 2025 22:22:45 -0400 Subject: [PATCH 26/29] Remove basic test for buffer pool to avoid race conditions in CI --- core/storage/buffer_pool.rs | 83 ------------------------------------- 1 file changed, 83 deletions(-) diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 70696827a..8c67ee0dd 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -515,86 +515,3 @@ mod arena { unsafe { std::alloc::dealloc(ptr, layout) }; } } - -#[cfg(test)] -pub mod tests { - use super::*; - const TEST_SIZE: usize = 4096 * 50; - - #[test] - fn test_init_pool() { - let io: Arc = Arc::new(crate::MemoryIO::new()); - let pool = BufferPool::begin_init(&io, TEST_SIZE); - pool.finalize_with_page_size(BufferPool::DEFAULT_PAGE_SIZE) - .expect("pool to initialize"); - assert_eq!(pool.get_page().len(), BufferPool::DEFAULT_PAGE_SIZE); - assert_eq!( - pool.get_wal_frame().len(), - BufferPool::DEFAULT_PAGE_SIZE + WAL_FRAME_HEADER_SIZE, - "get_wal_frame should return page_size + WAL_FRAME_HEADER_SIZE" - ); - assert!( - pool.inner_mut().page_arena.is_some(), - "page arena should be initialized" - ); - assert!( - pool.inner_mut().wal_frame_arena.is_some(), - "wal frame arena should be initialized" - ); - let inner = 
pool.inner_mut(); - assert!( - inner - .page_arena - .as_ref() - .is_some_and(|a| a.id >= UNREGISTERED_START), - "page arena should not have ID in registered range with MemoryIO" - ); - assert!( - inner - .wal_frame_arena - .as_ref() - .is_some_and(|a| a.id >= UNREGISTERED_START), - "wal frame arena should not have ID in registered range with MemoryIO" - ); - let page = inner.get_page(); - assert_eq!( - inner - .page_arena - .as_ref() - .unwrap() - .allocated_pages - .load(Ordering::Relaxed), - 1 - ); - drop(page); - let frame = inner.get_frame(); - assert_eq!( - inner - .wal_frame_arena - .as_ref() - .unwrap() - .allocated_pages - .load(Ordering::Relaxed), - 1 - ); - drop(frame); - assert_eq!( - inner - .wal_frame_arena - .as_ref() - .unwrap() - .allocated_pages - .load(Ordering::Relaxed), - 0 - ); - assert_eq!( - inner - .page_arena - .as_ref() - .unwrap() - .allocated_pages - .load(Ordering::Relaxed), - 0 - ); - } -} From 84ffed709a72c622667f64fb6c262443741bec0d Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Thu, 7 Aug 2025 15:17:39 -0400 Subject: [PATCH 27/29] Round up allocation for wal frame arena to next page multiple of 64 --- core/storage/buffer_pool.rs | 35 ++++++++++++++----- core/storage/page_bitmap.rs | 2 +- core/storage/pager.rs | 3 +- .../src/database_sync_operations.rs | 4 +-- 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 8c67ee0dd..eb44b1122 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -131,11 +131,13 @@ impl BufferPool { pub const DEFAULT_PAGE_SIZE: usize = 4096; /// Maximum size for each Arena (64MB total) const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; - + /// 64kb Minimum arena size + const MIN_ARENA_SIZE: usize = 1024 * 64; fn new(arena_size: usize) -> Self { turso_assert!( - arena_size < Self::MAX_ARENA_SIZE, - "Arena size cannot exceed {} bytes", + (Self::MIN_ARENA_SIZE..Self::MAX_ARENA_SIZE).contains(&arena_size), + "Arena size needs to be between {}..{} bytes", + Self::MIN_ARENA_SIZE, Self::MAX_ARENA_SIZE ); Self { @@ -416,26 +418,37 @@ static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START); impl Arena { /// Create a new arena with the given size and page size. 
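+    /// The requested size is rounded up to a whole number of pages and then to a
+    /// multiple of 64 pages (see the rounding arithmetic below). Illustrative
+    /// example with an assumed `page_size` of 4096: a 1 MiB request is exactly
+    /// 256 pages, already a multiple of 64, so it is left unchanged; a request of
+    /// 1 MiB + 1 byte becomes 257 pages and rounds up to 320 pages (1.25 MiB).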
+ /// NOTE: Minimum arena size is page_size * 64 fn new(page_size: usize, arena_size: usize, io: &Arc) -> Result { - let ptr = unsafe { arena::alloc(arena_size) }; + let min_pages = arena_size.div_ceil(page_size); + let rounded_pages = (min_pages.max(64) + 63) & !63; + let rounded_bytes = rounded_pages * page_size; + // Guard against the global cap + if rounded_bytes > BufferPool::MAX_ARENA_SIZE { + return Err(format!( + "arena size {} B exceeds hard limit of {} B", + rounded_bytes, + BufferPool::MAX_ARENA_SIZE + )); + } + let ptr = unsafe { arena::alloc(rounded_bytes) }; let base = NonNull::new(ptr).ok_or("Failed to allocate arena")?; - let total_pages = arena_size / page_size; let id = io - .register_fixed_buffer(base, arena_size) + .register_fixed_buffer(base, rounded_bytes) .unwrap_or_else(|_| { // Register with io_uring if possible, otherwise use next available ID let next_id = NEXT_ID.fetch_add(1, Ordering::Relaxed); tracing::trace!("Allocating arena with id {}", next_id); next_id }); - + let map = PageBitmap::new(rounded_pages as u32); Ok(Self { id, base, - free_pages: SpinLock::new(PageBitmap::new(total_pages as u32)), + free_pages: SpinLock::new(map), allocated_pages: AtomicUsize::new(0), page_size, - arena_size, + arena_size: rounded_bytes, }) } @@ -463,6 +476,10 @@ impl Arena { /// Mark `count` pages starting at `page_idx` as free. pub fn free(&self, page_idx: u32, count: usize) { let mut bm = self.free_pages.lock(); + turso_assert!( + !bm.check_run_free(page_idx, count as u32), + "must not already be marked free" + ); bm.free_run(page_idx, count as u32); self.allocated_pages.fetch_sub(count, Ordering::Relaxed); } diff --git a/core/storage/page_bitmap.rs b/core/storage/page_bitmap.rs index 51942b1e1..5bb5465e1 100644 --- a/core/storage/page_bitmap.rs +++ b/core/storage/page_bitmap.rs @@ -314,7 +314,7 @@ impl PageBitmap { /// /// Word 1: ...11111111_11110000 (bits 60-63 must be checked) /// Word 2: 00000000_01111111... 
(bits 0-6 must be checked) - fn check_run_free(&self, start: u32, len: u32) -> bool { + pub(super) fn check_run_free(&self, start: u32, len: u32) -> bool { if start + len > self.n_pages { return false; } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index d44369632..73c95532b 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -2433,7 +2433,8 @@ mod ptrmap_tests { )); // Construct interfaces for the pager - let sz = initial_db_pages + 10; + let pages = initial_db_pages + 10; + let sz = std::cmp::max(std::cmp::min(pages, 64), pages); let buffer_pool = BufferPool::begin_init(&io, (sz * page_size) as usize); let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(sz as usize))); diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index f53d0cd8b..0e8d145a6 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -1,4 +1,4 @@ -use std::{rc::Rc, sync::Arc}; +use std::sync::Arc; use turso_core::{types::Text, Buffer, Completion, LimboError, Value}; @@ -48,7 +48,7 @@ pub async fn db_bootstrap( let content_len = chunk.len(); // todo(sivukhin): optimize allocations here #[allow(clippy::arc_with_non_send_sync)] - let buffer = Arc::new(Buffer::allocate(chunk.len(), Rc::new(|_| {}))); + let buffer = Arc::new(Buffer::new_temporary(chunk.len())); buffer.as_mut_slice().copy_from_slice(chunk); let mut completions = Vec::with_capacity(dbs.len()); for db in dbs { From 213d589dd184227c1b3aec8fcd14f21e3c8aceff Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 8 Aug 2025 10:54:33 -0400 Subject: [PATCH 28/29] Apply review suggestions, remove FreeEntry --- core/io/io_uring.rs | 9 ++-- core/io/mod.rs | 2 +- core/lib.rs | 4 +- core/storage/buffer_pool.rs | 100 ++++++++++++------------------------ scripts/limbo-sqlite3 | 4 +- 5 files changed, 43 insertions(+), 76 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 64204b309..a5b32c661 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -42,7 +42,7 @@ const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES; const MAX_WAIT: usize = 4; /// One memory arena for DB pages and another for WAL frames -const ARENAS: usize = 2; +const ARENA_COUNT: usize = 2; pub struct UringIO { inner: Rc>, @@ -61,7 +61,7 @@ struct WrappedIOUring { struct InnerUringIO { ring: WrappedIOUring, free_files: VecDeque, - free_arenas: [Option<(NonNull, usize)>; 2], + free_arenas: [Option<(NonNull, usize)>; ARENA_COUNT], } /// preallocated vec of iovec arrays to avoid allocations during writev operations @@ -113,7 +113,8 @@ impl UringIO { // RL_MEMLOCK cap is typically 8MB, the current design is to have one large arena // registered at startup and therefore we can simply use the zero index, falling back // to similar logic as the existing buffer pool for cases where it is over capacity. 
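+        // Summary of the scheme in this patch series: the sparse registration below
+        // reserves ARENA_COUNT (2) fixed-buffer slots up front, one for the DB-page
+        // arena and one for the WAL-frame arena. Arenas later registered through
+        // register_fixed_buffer() take ids below UNREGISTERED_START (0 and 1); an
+        // arena that cannot be registered (e.g. over the RL_MEMLOCK cap) gets an id
+        // >= UNREGISTERED_START and its buffers are presumably submitted as ordinary
+        // (non-fixed) I/O.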
- ring.submitter().register_buffers_sparse(ARENAS as u32)?; + ring.submitter() + .register_buffers_sparse(ARENA_COUNT as u32)?; let inner = InnerUringIO { ring: WrappedIOUring { ring, @@ -122,7 +123,7 @@ impl UringIO { iov_pool: IovecPool::new(), }, free_files: (0..FILES).collect(), - free_arenas: [const { None }; ARENAS], + free_arenas: [const { None }; ARENA_COUNT], }; debug!("Using IO backend 'io-uring'"); Ok(Self { diff --git a/core/io/mod.rs b/core/io/mod.rs index 66dbae949..74c71b378 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -74,7 +74,7 @@ pub trait File: Send + Sync { while pos < file_size { let chunk_size = (file_size - pos).min(BUFFER_SIZE); // Read from source - let read_buffer = Arc::new(Buffer::allocate(chunk_size, Rc::new(|_| {}))); + let read_buffer = Arc::new(Buffer::new_temporary(chunk_size)); let read_completion = self.pread( pos, Completion::new_read(read_buffer.clone(), move |_, _| {}), diff --git a/core/lib.rs b/core/lib.rs index adc9849fb..d5b21bd3d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -380,8 +380,8 @@ impl Database { } fn init_pager(&self, page_size: Option) -> Result { - let arena_size = if std::env::var("CI").is_ok_and(|v| v.eq_ignore_ascii_case("true")) { - BufferPool::TEST_AREA_SIZE + let arena_size = if std::env::var("TESTING").is_ok_and(|v| v.eq_ignore_ascii_case("true")) { + BufferPool::TEST_ARENA_SIZE } else { BufferPool::DEFAULT_ARENA_SIZE }; diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index eb44b1122..2f89d4878 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -126,7 +126,7 @@ impl BufferPool { /// to `RL_MEMLOCK` limit for un-privileged processes being 8MB total. pub const DEFAULT_ARENA_SIZE: usize = 3 * 1024 * 1024; /// 1MB size For testing/CI - pub const TEST_AREA_SIZE: usize = 1024 * 1024; + pub const TEST_ARENA_SIZE: usize = 1024 * 1024; /// 4KB default page_size pub const DEFAULT_PAGE_SIZE: usize = 4096; /// Maximum size for each Arena (64MB total) @@ -153,13 +153,14 @@ impl BufferPool { } /// Request a `Buffer` of size `len` + #[inline] pub fn allocate(len: usize) -> Buffer { let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); pool.inner().allocate(len) } - /// Request a `Buffer` the size of the `db_page_size` the `BufferPool` - /// was initialized with. + /// Request a `Buffer` the size of the `db_page_size` the `BufferPool` was initialized with. + #[inline] pub fn get_page(&self) -> Buffer { let inner = self.inner_mut(); inner.get_page() @@ -167,22 +168,25 @@ impl BufferPool { /// Request a `Buffer` for use with a WAL frame, /// `[Database::page_size] + `WAL_FRAME_HEADER_SIZE` + #[inline] pub fn get_wal_frame(&self) -> Buffer { let inner = self.inner_mut(); inner.get_frame() } + #[inline] fn inner(&self) -> &PoolInner { unsafe { &*self.inner.get() } } + #[inline] #[allow(clippy::mut_from_ref)] fn inner_mut(&self) -> &mut PoolInner { unsafe { &mut *self.inner.get() } } /// Create a static `BufferPool` initialize the pool to the default page size, **without** - /// poulating the Arenas. Arenas will not be created until `[BufferPool::finalize_page_size]`, + /// populating the Arenas. Arenas will not be created until `[BufferPool::finalize_page_size]`, /// and the pool will temporarily return temporary buffers to prevent reallocation of the /// arena if the page size is set to something other than the default value. 
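+    /// Illustrative call sequence (names assembled from this patch series, not a
+    /// doctest; the finalize step appears as `finalize_with_page_size` in the
+    /// removed pool test):
+    /// ```ignore
+    /// let pool = BufferPool::begin_init(&io, BufferPool::DEFAULT_ARENA_SIZE);
+    /// // ... read the database header to discover the real page size ...
+    /// pool.finalize_with_page_size(page_size)?;
+    /// ```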
pub fn begin_init(io: &Arc, arena_size: usize) -> Arc { @@ -227,7 +231,7 @@ impl BufferPool { impl PoolInner { /// Allocate a buffer of the given length from the pool, falling back to - /// temporary thread local buffers if the pool is not initialized or full. + /// temporary thread local buffers if the pool is not initialized or is full. pub fn allocate(&self, len: usize) -> Buffer { turso_assert!(len > 0, "Cannot allocate zero-length buffer"); @@ -236,68 +240,33 @@ impl PoolInner { // Check if this is exactly a WAL frame size allocation if len == wal_frame_size { - let Some(wal_arena) = self.wal_frame_arena.as_ref() else { - return Buffer::new_temporary(len); - }; - if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) { - tracing::trace!( - "Allocated WAL frame buffer of length {} from arena {} at index {}", - len, - wal_arena.id, - first_idx - ); - return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx)); - } + return self + .wal_frame_arena + .as_ref() + .and_then(|wal_arena| wal_arena.try_alloc(len)) + .unwrap_or(Buffer::new_temporary(len)); } // For all other sizes, use regular arena - let Some(arena) = self.page_arena.as_ref() else { - // pool isn't fully initialized, return temporary buffer - return Buffer::new_temporary(len); - }; - if let Some(FreeEntry { ptr, first_idx }) = arena.try_alloc(len) { - tracing::trace!( - "Allocated buffer of length {} from arena {} at index {}", - len, - arena.id, - first_idx - ); - return Buffer::new_pooled(ArenaBuffer::new(ptr, len, arena.id, first_idx)); - } - Buffer::new_temporary(len) + self.page_arena + .as_ref() + .and_then(|arena| arena.try_alloc(len)) + .unwrap_or(Buffer::new_temporary(len)) } fn get_page(&mut self) -> Buffer { let db_page_size = self.db_page_size.load(Ordering::Relaxed); - let Some(arena) = self.page_arena.as_ref() else { - return Buffer::new_temporary(db_page_size); - }; - if let Some(FreeEntry { ptr, first_idx }) = arena.try_alloc(db_page_size) { - tracing::trace!( - "Allocated page buffer of size {} from arena {} at index {}", - db_page_size, - arena.id, - first_idx - ); - return Buffer::new_pooled(ArenaBuffer::new(ptr, db_page_size, arena.id, first_idx)); - } - Buffer::new_temporary(db_page_size) + self.page_arena + .as_ref() + .and_then(|arena| arena.try_alloc(db_page_size)) + .unwrap_or(Buffer::new_temporary(db_page_size)) } fn get_frame(&mut self) -> Buffer { let len = self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE; - let Some(wal_arena) = self.wal_frame_arena.as_ref() else { - return Buffer::new_temporary(len); - }; - if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) { - tracing::trace!( - "Allocated WAL frame buffer of length {} from arena {} at index {}", - len, - wal_arena.id, - first_idx - ); - return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx)); - } - Buffer::new_temporary(len) + self.wal_frame_arena + .as_ref() + .and_then(|wal_arena| wal_arena.try_alloc(len)) + .unwrap_or(Buffer::new_temporary(len)) } /// Allocate a new arena for the pool to use @@ -403,11 +372,6 @@ impl Drop for Arena { } } -struct FreeEntry { - ptr: NonNull, - first_idx: u32, -} - /// Slots 0 and 1 will be reserved for Arenas which are registered buffers /// with io_uring. const UNREGISTERED_START: u32 = 2; @@ -452,16 +416,16 @@ impl Arena { }) } - /// Attempt to mark N pages as `allocated` and return a pointer to and - /// index of the first page in the allocation. 
- pub fn try_alloc(&self, size: usize) -> Option { + /// Allocate a `Buffer` large enough for logical length `size`. + /// May span multiple pages + pub fn try_alloc(&self, size: usize) -> Option { let pages = size.div_ceil(self.page_size) as u32; let mut freemap = self.free_pages.lock(); let first_idx = if pages == 1 { // use the optimized method for individual pages which attempts // to leave large contiguous areas free of fragmentation for - // larger `runs` + // larger `runs`. freemap.alloc_one()? } else { freemap.alloc_run(pages)? @@ -470,7 +434,9 @@ impl Arena { .fetch_add(pages as usize, Ordering::Relaxed); let offset = first_idx as usize * self.page_size; let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) }; - Some(FreeEntry { ptr, first_idx }) + Some(Buffer::new_pooled(ArenaBuffer::new( + ptr, size, self.id, first_idx, + ))) } /// Mark `count` pages starting at `page_idx` as free. diff --git a/scripts/limbo-sqlite3 b/scripts/limbo-sqlite3 index b59651446..ce742f795 100755 --- a/scripts/limbo-sqlite3 +++ b/scripts/limbo-sqlite3 @@ -11,7 +11,7 @@ EXPERIMENTAL_FLAGS="" # if RUST_LOG is non-empty, enable tracing output if [ -n "$RUST_LOG" ]; then - CI=true "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" + TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" else - CI=true "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" + TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" fi From 718114f5da76a64b5d2a09131eeffdddb61cf01e Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Fri, 8 Aug 2025 11:50:16 -0400 Subject: [PATCH 29/29] Keep `free` api consistent with `try_alloc`, size instead of `pages` --- core/storage/buffer_pool.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 2f89d4878..75ba9a634 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -326,18 +326,14 @@ impl PoolInner { // allocations of size `page_size` are more common, so we check that arena first. if let Some(arena) = self.page_arena.as_ref() { if arena_id == arena.id { - let pages = size.div_ceil(arena.page_size); - tracing::trace!("Freeing {} pages from arena {}", pages, arena_id); - arena.free(page_idx, pages); + arena.free(page_idx, size); return; } } // check WAL frame arena if let Some(wal_arena) = self.wal_frame_arena.as_ref() { if arena_id == wal_arena.id { - let pages = size.div_ceil(wal_arena.page_size); - tracing::trace!("Freeing {} pages from WAL frame arena {}", pages, arena_id); - wal_arena.free(page_idx, pages); + wal_arena.free(page_idx, size); return; } } @@ -439,9 +435,10 @@ impl Arena { ))) } - /// Mark `count` pages starting at `page_idx` as free. - pub fn free(&self, page_idx: u32, count: usize) { + /// Mark all relevant pages that include `size` starting at `page_idx` as free. + pub fn free(&self, page_idx: u32, size: usize) { let mut bm = self.free_pages.lock(); + let count = size.div_ceil(self.page_size); turso_assert!( !bm.check_run_free(page_idx, count as u32), "must not already be marked free"