From 9d1ca1c8ca30602d80245182de66da1ac77edbb1 Mon Sep 17 00:00:00 2001 From: PThorpe92 Date: Sat, 2 Aug 2025 18:27:00 -0400 Subject: [PATCH] Add ReadFixed/WriteFixed opcodes for buffers from registered arena --- core/io/io_uring.rs | 47 ++++++++++++++++++++++++++++++------- core/lib.rs | 2 +- core/storage/btree.rs | 2 +- core/storage/buffer_pool.rs | 10 ++++---- core/storage/pager.rs | 2 +- 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 61ba4badf..c39bff69a 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -584,17 +584,30 @@ impl File for UringFile { fn pread(&self, pos: usize, c: Completion) -> Result { let r = c.as_read(); - trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let mut io = self.io.borrow_mut(); let read_e = { let buf = r.buf(); let len = buf.len(); - let buf = buf.as_mut_ptr(); with_fd!(self, |fd| { - io_uring::opcode::Read::new(fd, buf, len as u32) - .offset(pos as u64) - .build() - .user_data(get_key(c.clone())) + if let Some(idx) = buf.fixed_id() { + trace!( + "pread_fixed(pos = {}, length = {}, idx = {})", + pos, + len, + idx + ); + io_uring::opcode::ReadFixed::new(fd, buf.as_mut_ptr(), len as u32, idx as u16) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } else { + trace!("pread(pos = {}, length = {})", pos, len); + // Use Read opcode if fixed buffer is not available + io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } }) }; io.ring.submit_entry(&read_e); @@ -604,12 +617,30 @@ impl File for UringFile { fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let write = { - trace!("pwrite(pos = {}, length = {})", pos, buffer.len()); with_fd!(self, |fd| { - io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32) + if let Some(idx) = buf.fixed_id() { + trace!( + "pwrite_fixed(pos = {}, length = {}, 
idx= {})", + pos, + buffer.len(), + idx + ); + io_uring::opcode::WriteFixed::new( + fd, + buffer.as_ptr(), + buffer.len() as u32, + idx as u16, + ) .offset(pos as u64) .build() .user_data(get_key(c.clone())) + } else { + trace!("pwrite(pos = {}, length = {})", pos, buffer.len()); + io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } }) }; io.ring.submit_entry(&write); diff --git a/core/lib.rs b/core/lib.rs index f79c9358d..adc9849fb 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -393,7 +393,7 @@ impl Database { Some(size) => size, }; let buffer_pool = - BufferPool::begin_init(&self.io, arena_size).finalize_page_size(size)?; + BufferPool::begin_init(&self.io, arena_size).finalize_with_page_size(size)?; let db_state = self.db_state.clone(); let wal = Rc::new(RefCell::new(WalFile::new( diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 2d432fe89..b292357a0 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -8452,7 +8452,7 @@ mod tests { let io: Arc = Arc::new(MemoryIO::new()); let buffer_pool = - BufferPool::begin_init(&io, page_size * 128).finalize_page_size(page_size); + BufferPool::begin_init(&io, page_size * 128).finalize_with_page_size(page_size); let db_file = Arc::new(DatabaseFile::new( io.open_file(":memory:", OpenFlags::Create, false).unwrap(), diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index b0d2b79c0..dbe9e0f92 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -177,9 +177,9 @@ impl BufferPool { unsafe { &mut *self.inner.get() } } - /// Initialize the pool to the default page size, **without** creating an arena - /// Arena will be created when finalize_page_size is called, - /// until then the pool will return temporary buffers to prevent reallocation of the + /// Create a static `BufferPool` and initialize the pool to the default page size, **without** + /// creating an arena. 
Arena will be created when [`BufferPool::finalize_with_page_size`] is called. + /// Until then the pool will return temporary buffers to prevent reallocation of the /// arena if the page size is set to something other than the default value. pub fn begin_init(io: &Arc, arena_size: usize) -> Arc { let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size))); @@ -191,7 +191,9 @@ impl BufferPool { pool.clone() } - pub fn finalize_page_size(&self, page_size: usize) -> crate::Result> { + /// Call when `Database::db_state` is initialized, providing the `page_size` to allocate + /// an arena for the pool. Before this call, the pool will use temporary buffers. + pub fn finalize_with_page_size(&self, page_size: usize) -> crate::Result> { let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); let inner = pool.inner_mut(); tracing::trace!("finalize page size called with size {page_size}"); diff --git a/core/storage/pager.rs b/core/storage/pager.rs index fc2562ba5..74cfa1594 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1767,7 +1767,7 @@ impl Pager { default_header.page_size = PageSize::new(size).expect("page size"); } self.buffer_pool - .finalize_page_size(default_header.page_size.get() as usize)?; + .finalize_with_page_size(default_header.page_size.get() as usize)?; let page = allocate_new_page(1, &self.buffer_pool, 0); let contents = page.get_contents();