Make register buffer io trait return the buf index

PThorpe92
2025-08-03 15:04:49 -04:00
parent 3cff47e490
commit 0ffba81216
2 changed files with 150 additions and 122 deletions


@@ -124,7 +124,7 @@ pub trait IO: Clock + Send + Sync {
fn get_memory_io(&self) -> Arc<MemoryIO>;
fn register_fixed_buffer(&self, _ptr: NonNull<u8>, _len: usize) -> Result<()> {
fn register_fixed_buffer(&self, _ptr: NonNull<u8>, _len: usize) -> Result<u32> {
Err(crate::LimboError::InternalError(
"unsupported operation".to_string(),
))
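For context (not part of this diff): with the trait now returning Result&lt;u32&gt;, a backend that supports fixed buffers is expected to register the memory and hand back the slot index it used, while other backends keep the defaulted error above. A rough standalone sketch of what an io_uring-style override could boil down to; the function name, the atomic counter, and the registration bookkeeping are assumptions rather than code from this commit:

use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, Ordering};

fn register_fixed_buffer_uring(
    submitter: &io_uring::Submitter<'_>,
    next_index: &AtomicU32,
    ptr: NonNull<u8>,
    len: usize,
) -> std::io::Result<u32> {
    let iov = libc::iovec {
        iov_base: ptr.as_ptr().cast(),
        iov_len: len,
    };
    // SAFETY: the caller must keep the buffer alive and pinned while it is registered.
    unsafe { submitter.register_buffers(&[iov]) }?;
    // The kernel indexes fixed buffers in registration order; hand that slot back so
    // ArenaBuffer::fixed_id() can later route reads/writes to the fixed-buffer opcodes.
    // (Registering more buffers after the first batch needs the sparse/update
    // registration variants; that bookkeeping is elided here.)
    Ok(next_index.fetch_add(1, Ordering::SeqCst))
}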


@@ -1,12 +1,13 @@
use crate::fast_lock::SpinLock;
use crate::io::TEMP_BUFFER_CACHE;
use crate::storage::page_bitmap::PageBitmap;
use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
use crate::{turso_assert, Buffer, LimboError, IO};
use parking_lot::Mutex;
use std::cell::UnsafeCell;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, OnceLock, Weak};
pub static BUFFER_POOL: OnceLock<Arc<BufferPool>> = OnceLock::new();
@@ -19,8 +20,6 @@ pub struct ArenaBuffer {
len: usize,
}
const REGISTERED_ID: u32 = 0;
impl ArenaBuffer {
const fn new(ptr: NonNull<u8>, len: usize, arena_id: u32, page_idx: u32) -> Self {
ArenaBuffer {
@@ -32,17 +31,17 @@ impl ArenaBuffer {
}
#[inline(always)]
/// Returns the `id` of the underlying arena if it is registered with `io_uring`
/// Returns the `id` of the underlying arena, but only if it was registered with `io_uring`
pub fn fixed_id(&self) -> Option<u32> {
if self.arena_id == REGISTERED_ID {
Some(REGISTERED_ID)
if self.arena_id < UNREGISTERED_START {
Some(self.arena_id)
} else {
None
}
}
/// The requested size of the allocation. The actual size is always rounded up to
/// the next multiple of the arena.page_size
/// The requested size of the allocation; the actual size of the underlying buffer is rounded
/// up to the next multiple of the arena's page_size
pub const fn logical_len(&self) -> usize {
self.len
}
@@ -59,7 +58,8 @@ impl Drop for ArenaBuffer {
let pool = BUFFER_POOL
.get()
.expect("BufferPool not initialized, cannot free ArenaBuffer");
pool.free(self.logical_len(), self.arena_id, self.page_idx);
let inner = pool.inner_mut();
inner.free(self.logical_len(), self.arena_id, self.page_idx);
}
}
@@ -85,7 +85,8 @@ unsafe impl Send for BufferPool {}
struct PoolInner {
io: Option<Arc<dyn IO>>,
arena: Option<Arena>,
page_arena: Option<Arena>,
wal_frame_arena: Option<Arena>,
init_lock: Mutex<()>,
arena_size: AtomicUsize,
db_page_size: AtomicUsize,
@@ -114,7 +115,8 @@ impl BufferPool {
);
Self {
inner: UnsafeCell::new(PoolInner {
arena: None,
page_arena: None,
wal_frame_arena: None,
arena_size: arena_size.into(),
db_page_size: Self::DEFAULT_PAGE_SIZE.into(),
init_lock: Mutex::new(()),
@@ -129,8 +131,8 @@ impl BufferPool {
}
pub fn get_page(&self) -> Buffer {
let inner = self.inner();
inner.allocate(inner.db_page_size.load(Ordering::Relaxed))
let inner = self.inner_mut();
inner.get_page()
}
fn inner(&self) -> &PoolInner {
@@ -157,7 +159,8 @@ impl BufferPool {
}
/// Call when `[Database::db_state]` is initialized, providing the `page_size` to allocate
/// an arena for the pool. Before this call, the pool will use temporary buffers
/// an arena for the pool. Before this call, the pool will use temporary buffers which are
/// cached in thread local storage.
pub fn finalize_with_page_size(&self, page_size: usize) -> crate::Result<Arc<Self>> {
let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
let inner = pool.inner_mut();
@@ -169,11 +172,11 @@ impl BufferPool {
cache.borrow_mut().reinit_cache(page_size);
});
}
if inner.arena.is_some() {
if inner.page_arena.is_some() {
return Ok(pool.clone());
}
inner.db_page_size.store(page_size, Ordering::Relaxed);
inner.init_arena()?;
inner.init_arenas()?;
Ok(pool.clone())
}
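A usage sketch of the flow above, assuming the pool was already created and stored in BUFFER_POOL during database open; the wrapper function name here is hypothetical:

fn finish_pool_init(page_size: usize) -> crate::Result<()> {
    // Before this call, get_page()/allocate() fall back to thread-local temporary buffers.
    let pool = BUFFER_POOL
        .get()
        .expect("BufferPool must be initialized")
        .finalize_with_page_size(page_size)?; // allocates both the page and WAL-frame arenas
    let _page = pool.get_page(); // now served from the page arena whenever it has free slots
    Ok(())
}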
@@ -188,7 +191,27 @@ impl PoolInner {
/// temporary thread local buffers if the pool is not initialized or full
pub fn allocate(&self, len: usize) -> Buffer {
turso_assert!(len > 0, "Cannot allocate zero-length buffer");
let Some(arena) = self.arena.as_ref() else {
let db_page_size = self.db_page_size.load(Ordering::Relaxed);
let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE;
// Check if this is exactly a WAL frame size allocation
if len == wal_frame_size {
let Some(wal_arena) = self.wal_frame_arena.as_ref() else {
return Buffer::new_temporary(len);
};
if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) {
tracing::trace!(
"Allocated WAL frame buffer of length {} from arena {} at index {}",
len,
wal_arena.id,
first_idx
);
return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx));
}
}
// For all other sizes, use regular arena
let Some(arena) = self.page_arena.as_ref() else {
// pool isn't fully initialized, return temporary buffer
return Buffer::new_temporary(len);
};
@@ -204,42 +227,91 @@ impl PoolInner {
Buffer::new_temporary(len)
}
fn get_page(&mut self) -> Buffer {
let db_page_size = self.db_page_size.load(Ordering::Relaxed);
let Some(arena) = self.page_arena.as_ref() else {
return Buffer::new_temporary(db_page_size);
};
if let Some(FreeEntry { ptr, first_idx }) = arena.try_alloc(db_page_size) {
tracing::trace!(
"Allocated page buffer of size {} from arena {} at index {}",
db_page_size,
arena.id,
first_idx
);
return Buffer::new_pooled(ArenaBuffer::new(ptr, db_page_size, arena.id, first_idx));
}
Buffer::new_temporary(db_page_size)
}
/// Allocate a new arena for the pool to use
fn init_arena(&mut self) -> crate::Result<()> {
fn init_arenas(&mut self) -> crate::Result<()> {
// Prevent concurrent growth
let Some(_guard) = self.init_lock.try_lock() else {
tracing::debug!("Buffer pool is already growing, skipping initialization");
return Ok(()); // Already in progress
};
let arena_size = self.arena_size.load(Ordering::Relaxed);
let db_page_size = self.db_page_size.load(Ordering::Relaxed);
let io = self.io.as_ref().expect("Pool not initialized").clone();
match Arena::new(
self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE,
arena_size,
&io,
) {
// Create regular page arena
match Arena::new(db_page_size, arena_size, &io) {
Ok(arena) => {
tracing::trace!(
"added arena {} with size {} MB",
"added arena {} with size {} MB and page size {}",
arena.id,
arena_size / (1024 * 1024)
arena_size / (1024 * 1024),
db_page_size
);
self.arena = Some(arena);
Ok(())
self.page_arena = Some(arena);
}
Err(e) => {
tracing::error!("Failed to create new arena: {:?}", e);
Err(LimboError::InternalError(format!(
"Failed to create new arena: {e}",
)))
tracing::error!("Failed to create arena: {:?}", e);
return Err(LimboError::InternalError(format!(
"Failed to create arena: {e}",
)));
}
}
// Create WAL frame arena
let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE;
match Arena::new(wal_frame_size, arena_size, &io) {
Ok(arena) => {
tracing::trace!(
"added WAL frame arena {} with size {} MB and page size {}",
arena.id,
arena_size / (1024 * 1024),
wal_frame_size
);
self.wal_frame_arena = Some(arena);
}
Err(e) => {
tracing::error!("Failed to create WAL frame arena: {:?}", e);
return Err(LimboError::InternalError(format!(
"Failed to create WAL frame arena: {e}",
)));
}
}
Ok(())
}
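To make the split concrete: assuming the common 4096-byte database page, init_arenas carves the page arena into 4096-byte slots and the WAL-frame arena into 4120-byte slots (the page size plus the 24-byte WAL frame header), so an allocation of exactly db_page_size + WAL_FRAME_HEADER_SIZE lands in its own arena and regular page buffers no longer pay 24 bytes of per-slot fragmentation.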
pub fn free(&mut self, size: usize, arena_id: u32, page_idx: u32) {
pub fn free(&self, size: usize, arena_id: u32, page_idx: u32) {
// Check WAL frame arena first
if let Some(wal_arena) = self.wal_frame_arena.as_ref() {
if arena_id == wal_arena.id {
let pages = size.div_ceil(wal_arena.page_size);
tracing::trace!("Freeing {} pages from WAL frame arena {}", pages, arena_id);
wal_arena.free(page_idx, pages);
return;
}
}
// Otherwise use regular arena
let arena = self
.arena
.as_mut()
.page_arena
.as_ref()
.expect("pool arena not initialized, cannot free buffer");
let pages = size.div_ceil(arena.page_size);
tracing::trace!("Freeing {} pages from arena {}", pages, arena_id);
@@ -252,15 +324,22 @@ impl PoolInner {
}
}
/// A single memory arena
/// Preallocated block of memory used by the pool to distribute Buffers
struct Arena {
/// Identifier to tie allocations back to the arena
id: u32,
/// base pointer to the arena returned by `mmap`
base: NonNull<u8>,
/// Total number of pages currently allocated from this arena
allocated_pages: AtomicUsize,
free_pages: SpinLock<Vec<u32>>,
/// Currently free pages
free_pages: SpinLock<PageBitmap>,
/// Total size of the arena in bytes
arena_size: usize,
/// Page size the total arena is divided into.
/// Because most allocations are of size `[Database::page_size]`, with an
/// additional 24 byte wal frame header, we treat this as the `page_size` to reduce
/// fragmentation to 24 bytes for regular pages
page_size: usize,
}
@@ -275,11 +354,11 @@ struct FreeEntry {
first_idx: u32,
}
const UNREGISTERED: u32 = 1;
const UNREGISTERED_START: u32 = 2;
/// For an arena which isn't registered with `io_uring`
/// registered arena will always have id = 0
static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED);
/// registered arena will always have id = 0..=1
static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START);
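Taken together with fixed_id() above: registered arenas get ids below UNREGISTERED_START (0 or 1, one per arena), while unregistered arenas count up from 2. A small illustrative helper showing how a backend might branch on that; ReadPath and choose_read_path are hypothetical names, not part of this commit:

// Hypothetical routing helper; the actual submission calls live in the io backend.
enum ReadPath {
    /// Use a fixed-buffer opcode with this registered buffer index.
    Fixed { buf_index: u32 },
    /// Fall back to a normal read with an unregistered or temporary buffer.
    Plain,
}

fn choose_read_path(buf: &ArenaBuffer) -> ReadPath {
    match buf.fixed_id() {
        Some(buf_index) => ReadPath::Fixed { buf_index },
        None => ReadPath::Plain,
    }
}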
impl Arena {
/// Create a new arena with the given size and page size.
@@ -287,16 +366,19 @@ impl Arena {
let ptr = unsafe { arena::alloc(arena_size) };
let base = NonNull::new(ptr).ok_or("Failed to allocate arena")?;
let total_pages = arena_size / page_size;
let id = if io.register_fixed_buffer(base, arena_size).is_ok() {
REGISTERED_ID
} else {
NEXT_ID.fetch_add(1, Ordering::Relaxed)
};
let id = io
.register_fixed_buffer(base, arena_size)
.unwrap_or_else(|_| {
// Register with io_uring if possible, otherwise use next available ID
let next_id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
tracing::trace!("Allocating arena with id {}", next_id);
next_id
});
Ok(Self {
id,
base,
free_pages: SpinLock::new((0..total_pages as u32).collect()),
free_pages: SpinLock::new(PageBitmap::new(total_pages as u32)),
allocated_pages: AtomicUsize::new(0),
page_size,
arena_size,
@@ -304,88 +386,34 @@ impl Arena {
}
pub fn try_alloc(&self, size: usize) -> Option<FreeEntry> {
let pages = size.div_ceil(self.page_size);
let mut free = self.free_pages.lock();
if free.len() < pages {
return None;
}
if pages == 1 {
// fast path: for now, most/all allocations are single pages
if let Some(page_idx) = free.pop() {
self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
let offset = page_idx as usize * self.page_size;
let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
tracing::trace!(
"Allocated single page at index {} from arena {}",
page_idx,
self.id
);
return Some(FreeEntry {
ptr,
first_idx: page_idx,
});
}
let pages = size.div_ceil(self.page_size) as u32;
let mut freemap = self.free_pages.lock();
let first_idx = if pages == 1 {
freemap.alloc_one()?
} else {
return self.try_alloc_many(pages);
}
None
freemap.alloc_run(pages)?
};
self.allocated_pages
.fetch_add(pages as usize, Ordering::Relaxed);
let offset = first_idx as usize * self.page_size;
let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
Some(FreeEntry { ptr, first_idx })
}
/// Allocate a contiguous run of `pages`. Returns the first page index.
#[inline]
pub fn alloc_run(&self, pages: u32) -> Option<u32> {
let mut bm = self.free_pages.lock();
bm.alloc_run(pages)
}
/// Free pages back to this arena
pub fn free(&self, page_idx: u32, count: usize) {
let mut free = self.free_pages.lock();
// Add pages back to freelist
for i in 0..count {
free.push(page_idx + i as u32);
}
let mut bm = self.free_pages.lock();
bm.free_run(page_idx, count as u32);
self.allocated_pages.fetch_sub(count, Ordering::Relaxed);
}
#[cold]
fn try_alloc_many(&self, pages: usize) -> Option<FreeEntry> {
// TODO (preston): we can optimize this further when we start allocating larger
// contiguous blocks for coalescing. this is 'unused' for now
let mut free = self.free_pages.lock();
if pages <= 3 && free.len() >= pages {
let start = free.len() - pages;
let first_idx = free[start];
let mut consecutive = true;
for j in 1..pages {
if free[start + j] != first_idx + j as u32 {
consecutive = false;
break;
}
}
if consecutive {
free.truncate(start);
self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
let offset = first_idx as usize * self.page_size;
let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
return Some(FreeEntry { ptr, first_idx });
}
}
// Fall back to searching from the beginning
for i in 0..free.len().saturating_sub(pages - 1) {
let first_idx = free[i];
let mut consecutive = true;
for j in 1..pages {
if free[i + j] != first_idx + j as u32 {
consecutive = false;
break;
}
}
if consecutive {
free.drain(i..i + pages);
self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
let offset = first_idx as usize * self.page_size;
let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
return Some(FreeEntry { ptr, first_idx });
}
}
None
}
}
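PageBitmap itself is not part of this diff; from the call sites above it only needs new, alloc_one, alloc_run, and free_run. A minimal stand-in with that shape, purely to illustrate the contract the arena relies on (the real storage::page_bitmap implementation is presumably bit-packed and faster):

struct PageBitmap {
    used: Vec<bool>, // one flag per page; the real type packs bits into machine words
}

impl PageBitmap {
    fn new(total_pages: u32) -> Self {
        Self { used: vec![false; total_pages as usize] }
    }

    /// Claim any single free page and return its index.
    fn alloc_one(&mut self) -> Option<u32> {
        let idx = self.used.iter().position(|u| !*u)?;
        self.used[idx] = true;
        Some(idx as u32)
    }

    /// Claim `pages` contiguous free pages and return the index of the first one.
    fn alloc_run(&mut self, pages: u32) -> Option<u32> {
        let n = pages as usize;
        let start = (0..=self.used.len().checked_sub(n)?)
            .find(|&s| self.used[s..s + n].iter().all(|u| !*u))?;
        self.used[start..start + n].iter_mut().for_each(|u| *u = true);
        Some(start as u32)
    }

    /// Return a previously allocated run to the free set.
    fn free_run(&mut self, first_idx: u32, count: u32) {
        let (s, n) = (first_idx as usize, count as usize);
        self.used[s..s + n].iter_mut().for_each(|u| *u = false);
    }
}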
#[cfg(unix)]