Add documentation and comments to new buffer pool, add get_frame api

PThorpe92
2025-08-05 12:50:47 -04:00
parent d94e252ef9
commit 66964fd8d2
4 changed files with 122 additions and 51 deletions

View File

@@ -269,6 +269,18 @@ impl InnerUringIO {
self.free_files.push_back(id);
Ok(())
}
+#[cfg(debug_assertions)]
+fn debug_check_fixed(&self, idx: u32, ptr: *const u8, len: usize) {
+let (base, blen) = self.free_arenas[idx as usize].expect("slot not registered");
+let start = base.as_ptr() as usize;
+let end = start + blen;
+let p = ptr as usize;
+assert!(
+p >= start && p + len <= end,
+"Fixed operation, pointer out of registered range"
+);
+}
}
impl WrappedIOUring {
@@ -564,7 +576,6 @@ impl UringFile {
self.id
}
}
unsafe impl Send for UringFile {}
unsafe impl Sync for UringFile {}
@@ -641,26 +652,27 @@ impl File for UringFile {
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut io = self.io.borrow_mut();
let write = {
+let ptr = buffer.as_ptr();
+let len = buffer.len();
with_fd!(self, |fd| {
if let Some(idx) = buffer.fixed_id() {
trace!(
"pwrite_fixed(pos = {}, length = {}, idx= {})",
pos,
-buffer.len(),
+len,
idx
);
-io_uring::opcode::WriteFixed::new(
-fd,
-buffer.as_ptr(),
-buffer.len() as u32,
-idx as u16,
-)
-.offset(pos as u64)
-.build()
-.user_data(get_key(c.clone()))
+#[cfg(debug_assertions)]
+{
+io.debug_check_fixed(idx, ptr, len);
+}
+io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16)
+.offset(pos as u64)
+.build()
+.user_data(get_key(c.clone()))
} else {
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
-io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32)
+io_uring::opcode::Write::new(fd, ptr, len as u32)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
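
Note: the guard wired in above is a plain containment check. Before a fixed opcode is submitted against registered-buffer slot `idx`, debug builds assert that `[ptr, ptr + len)` lies entirely inside the arena registered at that slot. A minimal standalone sketch of the same pattern, assuming a hypothetical `RegisteredArenas` table (that type and its `slots` field are illustrative stand-ins, not names from this codebase):

use std::ptr::NonNull;

/// Illustrative stand-in for the per-ring table of registered arenas.
struct RegisteredArenas {
    /// (base pointer, length) for each registered slot; `None` if unused.
    slots: Vec<Option<(NonNull<u8>, usize)>>,
}

impl RegisteredArenas {
    /// Debug-only guard: panic unless `[ptr, ptr + len)` is fully contained
    /// in the arena registered at `idx`.
    #[cfg(debug_assertions)]
    fn debug_check_fixed(&self, idx: u32, ptr: *const u8, len: usize) {
        let (base, blen) = self.slots[idx as usize].expect("slot not registered");
        let start = base.as_ptr() as usize;
        let end = start + blen;
        let p = ptr as usize;
        assert!(
            p >= start && p + len <= end,
            "fixed operation: pointer out of registered range"
        );
    }
}

A fixed write against a pointer outside the registered range would otherwise surface only as an opaque EFAULT at completion time, so failing fast in debug builds is cheap insurance.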

View File

@@ -14,9 +14,16 @@ pub static BUFFER_POOL: OnceLock<Arc<BufferPool>> = OnceLock::new();
#[derive(Debug)]
/// A buffer allocated from an arena owned by `[BufferPool]`
pub struct ArenaBuffer {
+/// Pointer to the start of the buffer
ptr: NonNull<u8>,
+/// Identifier for the `[Arena]` the buffer came from
arena_id: u32,
+/// The index of the first page making up the buffer
page_idx: u32,
+/// The requested length of the allocation. The actual size
+/// allocated for the buffer is `len` rounded up to the next
+/// multiple of `[Arena::page_size]`.
len: usize,
}
@@ -32,7 +39,8 @@ impl ArenaBuffer {
#[inline(always)]
/// Returns the `id` of the underlying arena, only if it was registered with `io_uring`
-pub fn fixed_id(&self) -> Option<u32> {
+pub const fn fixed_id(&self) -> Option<u32> {
+// Arenas which are not registered will have `id`s >= UNREGISTERED_START
if self.arena_id < UNREGISTERED_START {
Some(self.arena_id)
} else {
@@ -76,7 +84,8 @@ impl std::ops::DerefMut for ArenaBuffer {
}
}
-/// Static Buffer pool managing multiple arenas
+/// Static Buffer pool managing multiple memory arenas
+/// from which `[ArenaBuffer]`s are returned for requested allocations
pub struct BufferPool {
inner: UnsafeCell<PoolInner>,
}
@@ -84,11 +93,21 @@ unsafe impl Sync for BufferPool {}
unsafe impl Send for BufferPool {}
struct PoolInner {
+/// An instance of the program's IO, used for registering
+/// Arenas with io_uring.
io: Option<Arc<dyn IO>>,
+/// An Arena which returns `ArenaBuffer`s of size `db_page_size`.
page_arena: Option<Arena>,
+/// An Arena which returns `ArenaBuffer`s of size `db_page_size`
+/// plus the 24-byte `WAL_FRAME_HEADER_SIZE`, avoiding the fragmentation
+/// and complex book-keeping that sharing a single arena for both sizes would require.
wal_frame_arena: Option<Arena>,
+/// A lock preventing concurrent initialization.
init_lock: Mutex<()>,
+/// The size of each `Arena`, in bytes.
arena_size: AtomicUsize,
+/// The `[Database::page_size]`, which determines the size of the
+/// buffers the `page_arena` returns from `Self::get_page`.
db_page_size: AtomicUsize,
db_page_size: AtomicUsize,
}
@@ -102,12 +121,16 @@ impl Default for BufferPool {
}
impl BufferPool {
-pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; // 4MB arena
-pub const TEST_AREA_SIZE: usize = 1024 * 1024; // 1MB arena for testing
-pub const DEFAULT_PAGE_SIZE: usize = 4096; // 4KB default page size
-const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; // 32MB max arena
+/// 4MB default size for each `Arena`
+pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024;
+/// 1MB size for testing/CI
+pub const TEST_AREA_SIZE: usize = 1024 * 1024;
+/// 4KB default page_size
+pub const DEFAULT_PAGE_SIZE: usize = 4096;
+/// Maximum size for each `Arena` (two arenas, 64MB total)
+const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024;
-pub fn new(arena_size: usize) -> Self {
+fn new(arena_size: usize) -> Self {
turso_assert!(
arena_size < Self::MAX_ARENA_SIZE,
"Arena size cannot exceed {} bytes",
@@ -125,16 +148,26 @@ impl BufferPool {
}
}
+/// Request a `Buffer` of size `len`
pub fn allocate(len: usize) -> Buffer {
let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
pool.inner().allocate(len)
}
+/// Request a `Buffer` the size of the `db_page_size` the `BufferPool`
+/// was initialized with.
pub fn get_page(&self) -> Buffer {
let inner = self.inner_mut();
inner.get_page()
}
+/// Request a `Buffer` for use with a WAL frame, of size
+/// `[Database::page_size]` + `WAL_FRAME_HEADER_SIZE`.
+pub fn get_wal_frame(&self) -> Buffer {
+let inner = self.inner_mut();
+inner.get_frame()
+}
fn inner(&self) -> &PoolInner {
unsafe { &*self.inner.get() }
}
@@ -145,8 +178,8 @@ impl BufferPool {
}
/// Create a static `BufferPool` and initialize it to the default page size, **without**
-/// creating an arena. Arena will be created when `[BufferPool::finalize_page_size]` is called.
-/// Until then the pool will return temporary buffers to prevent reallocation of the
+/// populating the Arenas. Arenas will not be created until `[BufferPool::finalize_page_size]`,
+/// and until then the pool will return temporary buffers to prevent reallocation of the
/// arena if the page size is set to something other than the default value.
pub fn begin_init(io: &Arc<dyn IO>, arena_size: usize) -> Arc<Self> {
let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size)));
@@ -181,6 +214,8 @@ impl BufferPool {
}
#[inline]
+/// Called when an `ArenaBuffer` is dropped: marks its underlying pages as
+/// free and available for use by another allocation.
pub fn free(&self, size: usize, arena_id: u32, page_id: u32) {
self.inner_mut().free(size, arena_id, page_id);
}
@@ -188,7 +223,7 @@ impl BufferPool {
impl PoolInner {
/// Allocate a buffer of the given length from the pool, falling back to
-/// temporary thread local buffers if the pool is not initialized or full
+/// temporary thread local buffers if the pool is not initialized or full.
pub fn allocate(&self, len: usize) -> Buffer {
turso_assert!(len > 0, "Cannot allocate zero-length buffer");
@@ -244,6 +279,23 @@ impl PoolInner {
Buffer::new_temporary(db_page_size)
}
+fn get_frame(&mut self) -> Buffer {
+let len = self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE;
+let Some(wal_arena) = self.wal_frame_arena.as_ref() else {
+return Buffer::new_temporary(len);
+};
+if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) {
+tracing::trace!(
+"Allocated WAL frame buffer of length {} from arena {} at index {}",
+len,
+wal_arena.id,
+first_idx
+);
+return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx));
+}
+Buffer::new_temporary(len)
+}
/// Allocate a new arena for the pool to use
fn init_arenas(&mut self) -> crate::Result<()> {
// Prevent concurrent growth
@@ -297,8 +349,17 @@ impl PoolInner {
Ok(())
}
-pub fn free(&self, size: usize, arena_id: u32, page_idx: u32) {
-// common case: check WAL frame arena first
+fn free(&self, size: usize, arena_id: u32, page_idx: u32) {
+// allocations of size `page_size` are more common, so we check that arena first.
+if let Some(arena) = self.page_arena.as_ref() {
+if arena_id == arena.id {
+let pages = size.div_ceil(arena.page_size);
+tracing::trace!("Freeing {} pages from arena {}", pages, arena_id);
+arena.free(page_idx, pages);
+return;
+}
+}
+// check WAL frame arena
if let Some(wal_arena) = self.wal_frame_arena.as_ref() {
if arena_id == wal_arena.id {
let pages = size.div_ceil(wal_arena.page_size);
@@ -307,38 +368,28 @@ impl PoolInner {
return;
}
}
-// Otherwise use regular arena
-let arena = self
-.page_arena
-.as_ref()
-.expect("pool arena not initialized, cannot free buffer");
-let pages = size.div_ceil(arena.page_size);
-tracing::trace!("Freeing {} pages from arena {}", pages, arena_id);
-turso_assert!(
-arena_id == arena.id,
-"should not free from different arena. {arena_id} != {}",
-arena.id
-);
-arena.free(page_idx, pages);
+panic!("ArenaBuffer freed with no available parent Arena");
}
}
-/// Preallocated block of memory used by the pool to distribute Buffers
+/// Preallocated block of memory used by the pool to distribute `ArenaBuffer`s
struct Arena {
-/// Identifier to tie allocations back to the arena
+/// Identifier to tie allocations back to the arena. If the arena is registered
+/// with `io_uring`, then the ID is the index of the arena in the ring's
+/// sparse registered-buffer array created at ring initialization.
id: u32,
-/// base pointer to the arena returned by `mmap`
+/// Base pointer to the arena returned by `mmap`
base: NonNull<u8>,
-/// Total number of pages currently allocated from this arena
+/// Total number of pages currently allocated/in use.
allocated_pages: AtomicUsize,
-/// Currently free pages
+/// Currently free pages.
free_pages: SpinLock<PageBitmap>,
/// Total size of the arena in bytes
arena_size: usize,
/// Page size the total arena is divided into.
/// Because most allocations are of size `[Database::page_size]`, with an
/// additional 24 byte wal frame header, we treat this as the `page_size` to reduce
-/// fragmentation to 24 bytes for regular pages
+/// fragmentation to 24 bytes for regular pages.
page_size: usize,
}
@@ -353,9 +404,11 @@ struct FreeEntry {
first_idx: u32,
}
+/// Slots 0 and 1 are reserved for Arenas which are registered as buffers
+/// with io_uring.
const UNREGISTERED_START: u32 = 2;
-/// For an arena which isn't registered with `io_uring`
+/// The next ID for an Arena which is not registered with `io_uring`;
+/// a registered arena will always have id 0 or 1.
static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START);
@@ -384,16 +437,20 @@ impl Arena {
})
}
+/// Attempt to mark N pages as `allocated` and return a pointer to, and the
+/// index of, the first page in the allocation.
pub fn try_alloc(&self, size: usize) -> Option<FreeEntry> {
let pages = size.div_ceil(self.page_size) as u32;
let mut freemap = self.free_pages.lock();
let first_idx = if pages == 1 {
+// use the optimized method for individual pages, which attempts
+// to keep large contiguous areas free of fragmentation for
+// larger `runs`
freemap.alloc_one()?
} else {
freemap.alloc_run(pages)?
};
self.allocated_pages
.fetch_add(pages as usize, Ordering::Relaxed);
let offset = first_idx as usize * self.page_size;
@@ -401,6 +458,7 @@ impl Arena {
Some(FreeEntry { ptr, first_idx })
}
+/// Mark `count` pages starting at `page_idx` as free.
pub fn free(&self, page_idx: u32, count: usize) {
let mut bm = self.free_pages.lock();
bm.free_run(page_idx, count as u32);
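
For reference, a usage sketch of the pool surface documented above, assuming the pool was already initialized elsewhere via `begin_init`/`finalize_page_size` (the local variable names here are illustrative):

// Hypothetical call site; BufferPool::begin_init(&io, arena_size) and
// finalize_page_size(...) are assumed to have run during database setup.
let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");

// A buffer of `db_page_size` bytes, served from the page arena.
let page_buf = pool.get_page();

// A buffer of `db_page_size + WAL_FRAME_HEADER_SIZE` bytes, served from the
// dedicated WAL frame arena so the two sizes never fragment each other.
let frame_buf = pool.get_wal_frame();

// Arbitrary-size requests go through the free-form allocator, which falls
// back to temporary buffers when the arenas are uninitialized or full.
let misc = BufferPool::allocate(512);

// Dropping an ArenaBuffer hands its pages back to the owning arena via
// BufferPool::free(size, arena_id, page_idx).
drop(frame_buf);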

View File

@@ -1760,15 +1760,13 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
}
pub fn begin_read_wal_frame_raw(
+buffer_pool: &Arc<BufferPool>,
io: &Arc<dyn File>,
offset: usize,
-page_size: u32,
complete: Box<Complete>,
) -> Result<Completion> {
tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
-let buf = Arc::new(BufferPool::allocate(
-page_size as usize + WAL_FRAME_HEADER_SIZE,
-));
+let buf = Arc::new(buffer_pool.get_wal_frame());
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new_read(buf, complete);
let c = io.pread(offset, c)?;
@@ -1810,6 +1808,7 @@ pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) {
}
pub fn prepare_wal_frame(
+buffer_pool: &Arc<BufferPool>,
wal_header: &WalHeader,
prev_checksums: (u32, u32),
page_size: u32,
@@ -1819,7 +1818,7 @@ pub fn prepare_wal_frame(
) -> ((u32, u32), Arc<Buffer>) {
tracing::trace!(page_number);
-let buffer = BufferPool::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE);
+let buffer = buffer_pool.get_wal_frame();
let frame = buffer.as_mut_slice();
frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page);
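
To make the new allocation path concrete: the buffer returned by `get_wal_frame` is laid out as a 24-byte frame header followed by the page payload, and `prepare_wal_frame` fills the payload before writing the header. A condensed sketch of that layout, using only what the hunk above shows (header field and checksum details elided):

// [ 0 .. WAL_FRAME_HEADER_SIZE )    frame header (24 bytes)
// [ WAL_FRAME_HEADER_SIZE .. len )  page payload (page_size bytes)
let buffer = buffer_pool.get_wal_frame();
let frame = buffer.as_mut_slice();
frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page);
// ...page number, db size, and checksums are then written into
// frame[..WAL_FRAME_HEADER_SIZE] before the frame is submitted for write.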

View File

@@ -933,7 +933,7 @@ impl Wal for WalFile {
}
});
let c =
-begin_read_wal_frame_raw(&self.get_shared().file, offset, self.page_size(), complete)?;
+begin_read_wal_frame_raw(&self.buffer_pool, &self.get_shared().file, offset, complete)?;
Ok(c)
}
@@ -1003,6 +1003,7 @@ impl Wal for WalFile {
let header = header.lock();
let checksums = self.last_checksum;
let (checksums, frame_bytes) = prepare_wal_frame(
+&self.buffer_pool,
&header,
checksums,
header.page_size,
@@ -1044,6 +1045,7 @@ impl Wal for WalFile {
let page_content = page.get_contents();
let page_buf = page_content.as_ptr();
let (frame_checksums, frame_bytes) = prepare_wal_frame(
+&self.buffer_pool,
&header,
checksums,
header.page_size,
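
Tying the four files together: buffers handed out by `get_page`/`get_wal_frame` report `fixed_id() == Some(idx)` when their arena occupies one of the two registered slots, so `UringFile::pwrite` takes the `WriteFixed` fast path; temporary fallback buffers report `None` and take the plain `Write` path. A condensed sketch of that dispatch, mirroring the pwrite hunk above (opcode construction elided):

if let Some(idx) = buffer.fixed_id() {
    // Registered arena: debug builds first verify that ptr/len lie inside
    // the range registered at slot `idx`, then submit a fixed write.
    #[cfg(debug_assertions)]
    {
        io.debug_check_fixed(idx, ptr, len);
    }
    // io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16) ...
} else {
    // Temporary or unregistered buffer: plain write opcode.
    // io_uring::opcode::Write::new(fd, ptr, len as u32) ...
}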