diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs
index ffd1da137..e9a483159 100644
--- a/core/io/io_uring.rs
+++ b/core/io/io_uring.rs
@@ -269,6 +269,18 @@ impl InnerUringIO {
         self.free_files.push_back(id);
         Ok(())
     }
+
+    #[cfg(debug_assertions)]
+    fn debug_check_fixed(&self, idx: u32, ptr: *const u8, len: usize) {
+        let (base, blen) = self.free_arenas[idx as usize].expect("slot not registered");
+        let start = base.as_ptr() as usize;
+        let end = start + blen;
+        let p = ptr as usize;
+        assert!(
+            p >= start && p + len <= end,
+            "Fixed operation, pointer out of registered range"
+        );
+    }
 }
 
 impl WrappedIOUring {
@@ -564,7 +576,6 @@ impl UringFile {
         self.id
     }
 }
-
 unsafe impl Send for UringFile {}
 unsafe impl Sync for UringFile {}
@@ -641,26 +652,27 @@ impl File for UringFile {
     fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
         let mut io = self.io.borrow_mut();
         let write = {
+            let ptr = buf.as_ptr();
+            let len = buf.len();
             with_fd!(self, |fd| {
                 if let Some(idx) = buf.fixed_id() {
                     trace!(
                         "pwrite_fixed(pos = {}, length = {}, idx= {})",
                         pos,
-                        buffer.len(),
+                        len,
                         idx
                     );
-                    io_uring::opcode::WriteFixed::new(
-                        fd,
-                        buffer.as_ptr(),
-                        buffer.len() as u32,
-                        idx as u16,
-                    )
-                    .offset(pos as u64)
-                    .build()
-                    .user_data(get_key(c.clone()))
+                    #[cfg(debug_assertions)]
+                    {
+                        io.debug_check_fixed(idx, ptr, len);
+                    }
+                    io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16)
+                        .offset(pos as u64)
+                        .build()
+                        .user_data(get_key(c.clone()))
                 } else {
                     trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
-                    io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32)
+                    io_uring::opcode::Write::new(fd, ptr, len as u32)
                         .offset(pos as u64)
                         .build()
                         .user_data(get_key(c.clone()))
diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs
index 05529505f..f2802febe 100644
--- a/core/storage/buffer_pool.rs
+++ b/core/storage/buffer_pool.rs
@@ -14,9 +14,16 @@
 pub static BUFFER_POOL: OnceLock<Arc<BufferPool>> = OnceLock::new();
 
 #[derive(Debug)]
 /// A buffer allocated from an arena from `[BufferPool]`
 pub struct ArenaBuffer {
+    /// Pointer to the start of the buffer
     ptr: NonNull<u8>,
+    /// Identifier for the `[Arena]` the buffer came from
     arena_id: u32,
+    /// The index of the first page making up the buffer
     page_idx: u32,
+    /// The requested length of the allocation.
+    /// The actual size reserved for the buffer is `len`
+    /// rounded up to the next multiple of
+    /// `[Arena::page_size]`.
     len: usize,
 }
@@ -32,7 +39,8 @@ impl ArenaBuffer {
     #[inline(always)]
     /// Returns the `id` of the underlying arena, only if it was registered with `io_uring`
-    pub fn fixed_id(&self) -> Option<u32> {
+    pub const fn fixed_id(&self) -> Option<u32> {
+        // Arenas which are not registered will have `id`s >= UNREGISTERED_START
         if self.arena_id < UNREGISTERED_START {
             Some(self.arena_id)
         } else {
             None
@@ -76,7 +84,8 @@ impl std::ops::DerefMut for ArenaBuffer {
     }
 }
 
-/// Static Buffer pool managing multiple arenas
+/// Static Buffer pool managing multiple memory arenas
+/// from which `[ArenaBuffer]`s are returned for requested allocations
 pub struct BufferPool {
     inner: UnsafeCell<PoolInner>,
 }
@@ -84,11 +93,21 @@
 unsafe impl Sync for BufferPool {}
 unsafe impl Send for BufferPool {}
 
 struct PoolInner {
+    /// An instance of the program's IO, used for registering
+    /// arenas with io_uring.
     io: Option<Arc<dyn IO>>,
+    /// An Arena which returns `ArenaBuffer`s of size `db_page_size`.
     page_arena: Option<Arena>,
+    /// An Arena which returns `ArenaBuffer`s of size `db_page_size`
+    /// plus the 24-byte `WAL_FRAME_HEADER_SIZE`, preventing the fragmentation
+    /// or complex book-keeping needed to use the same arena for both sizes.
     wal_frame_arena: Option<Arena>,
+    /// A lock preventing concurrent initialization.
     init_lock: Mutex<()>,
+    /// The size of each `Arena`, in bytes.
     arena_size: AtomicUsize,
+    /// The `[Database::page_size]`, which the `page_arena` uses to
+    /// return buffers from `Self::get_page`.
     db_page_size: AtomicUsize,
 }
@@ -102,12 +121,16 @@ impl Default for BufferPool {
     }
 }
 
 impl BufferPool {
-    pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024; // 4MB arena
-    pub const TEST_AREA_SIZE: usize = 1024 * 1024; // 1MB arena for testing
-    pub const DEFAULT_PAGE_SIZE: usize = 4096; // 4KB default page size
-    const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; // 32MB max arena
+    /// 4MB default size for each `Arena`
+    pub const DEFAULT_ARENA_SIZE: usize = 4 * 1024 * 1024;
+    /// 1MB size for testing/CI
+    pub const TEST_AREA_SIZE: usize = 1024 * 1024;
+    /// 4KB default page_size
+    pub const DEFAULT_PAGE_SIZE: usize = 4096;
+    /// Maximum size for each Arena (32MB each, 64MB total across the two arenas)
+    const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024;
 
-    pub fn new(arena_size: usize) -> Self {
+    fn new(arena_size: usize) -> Self {
         turso_assert!(
             arena_size < Self::MAX_ARENA_SIZE,
             "Arena size cannot exceed {} bytes",
@@ -125,16 +148,26 @@ impl BufferPool {
         }
     }
 
+    /// Request a `Buffer` of size `len`
     pub fn allocate(len: usize) -> Buffer {
         let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
         pool.inner().allocate(len)
     }
 
+    /// Request a `Buffer` the size of the `db_page_size` the `BufferPool`
+    /// was initialized with.
     pub fn get_page(&self) -> Buffer {
         let inner = self.inner_mut();
         inner.get_page()
     }
 
+    /// Request a `Buffer` for use with a WAL frame, i.e. of size
+    /// `[Database::page_size]` + `WAL_FRAME_HEADER_SIZE`.
+    pub fn get_wal_frame(&self) -> Buffer {
+        let inner = self.inner_mut();
+        inner.get_frame()
+    }
+
     fn inner(&self) -> &PoolInner {
         unsafe { &*self.inner.get() }
     }
@@ -145,8 +178,8 @@ impl BufferPool {
     }
 
     /// Create a static `BufferPool` initialize the pool to the default page size, **without**
-    /// creating an arena. Arena will be created when `[BufferPool::finalize_page_size]` is called.
-    /// Until then the pool will return temporary buffers to prevent reallocation of the
+    /// populating the arenas. Arenas will not be created until `[BufferPool::finalize_page_size]`;
+    /// until then the pool will return temporary buffers to prevent reallocation of the
     /// arena if the page size is set to something other than the default value.
     pub fn begin_init(io: &Arc<dyn IO>, arena_size: usize) -> Arc<BufferPool> {
         let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size)));
@@ -181,6 +214,8 @@
     }
 
     #[inline]
+    /// Marks the underlying pages of a dropped `ArenaBuffer` as free and
+    /// available for use by another allocation.
     pub fn free(&self, size: usize, arena_id: u32, page_id: u32) {
         self.inner_mut().free(size, arena_id, page_id);
     }
@@ -188,7 +223,7 @@
 impl PoolInner {
     /// Allocate a buffer of the given length from the pool, falling back to
-    /// temporary thread local buffers if the pool is not initialized or full
+    /// temporary thread local buffers if the pool is not initialized or full.
     pub fn allocate(&self, len: usize) -> Buffer {
         turso_assert!(len > 0, "Cannot allocate zero-length buffer");
@@ -244,6 +279,23 @@ impl PoolInner {
         Buffer::new_temporary(db_page_size)
     }
 
+    fn get_frame(&mut self) -> Buffer {
+        let len = self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE;
+        let Some(wal_arena) = self.wal_frame_arena.as_ref() else {
+            return Buffer::new_temporary(len);
+        };
+        if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) {
+            tracing::trace!(
+                "Allocated WAL frame buffer of length {} from arena {} at index {}",
+                len,
+                wal_arena.id,
+                first_idx
+            );
+            return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx));
+        }
+        Buffer::new_temporary(len)
+    }
+
     /// Allocate a new arena for the pool to use
     fn init_arenas(&mut self) -> crate::Result<()> {
         // Prevent concurrent growth
@@ -297,8 +349,17 @@ impl PoolInner {
         Ok(())
     }
 
-    pub fn free(&self, size: usize, arena_id: u32, page_idx: u32) {
-        // common case: check WAL frame arena first
+    fn free(&self, size: usize, arena_id: u32, page_idx: u32) {
+        // allocations of size `page_size` are more common, so check that arena first.
+        if let Some(arena) = self.page_arena.as_ref() {
+            if arena_id == arena.id {
+                let pages = size.div_ceil(arena.page_size);
+                tracing::trace!("Freeing {} pages from arena {}", pages, arena_id);
+                arena.free(page_idx, pages);
+                return;
+            }
+        }
+        // check WAL frame arena
         if let Some(wal_arena) = self.wal_frame_arena.as_ref() {
             if arena_id == wal_arena.id {
                 let pages = size.div_ceil(wal_arena.page_size);
@@ -307,38 +368,28 @@ impl PoolInner {
                 return;
             }
         }
-        // Otherwise use regular arena
-        let arena = self
-            .page_arena
-            .as_ref()
-            .expect("pool arena not initialized, cannot free buffer");
-        let pages = size.div_ceil(arena.page_size);
-        tracing::trace!("Freeing {} pages from arena {}", pages, arena_id);
-        turso_assert!(
-            arena_id == arena.id,
-            "should not free from different arena. {arena_id} != {}",
-            arena.id
-        );
-        arena.free(page_idx, pages);
+        panic!("ArenaBuffer freed with no available parent Arena");
     }
 }
 
-/// Preallocated block of memory used by the pool to distribute Buffers
+/// Preallocated block of memory used by the pool to distribute `ArenaBuffer`s
 struct Arena {
-    /// Identifier to tie allocations back to the arena
+    /// Identifier to tie allocations back to the arena. If the arena is registered
+    /// with `io_uring`, the ID is the index of the arena in the ring's sparse
+    /// registered-buffer array created when the ring is initialized.
     id: u32,
-    /// base pointer to the arena returned by `mmap`
+    /// Base pointer to the arena returned by `mmap`
     base: NonNull<u8>,
-    /// Total number of pages currently allocated from this arena
+    /// Total number of pages currently allocated/in use.
     allocated_pages: AtomicUsize,
-    /// Currently free pages
+    /// Currently free pages.
     free_pages: SpinLock,
     /// Total size of the arena in bytes
     arena_size: usize,
     /// Page size the total arena is divided into.
     /// Because most allocations are of size `[Database::page_size]`, with an
     /// additional 24 byte wal frame header, we treat this as the `page_size` to reduce
-    /// fragmentation to 24 bytes for regular pages
+    /// fragmentation to 24 bytes for regular pages.
     page_size: usize,
 }
@@ -353,9 +404,11 @@ struct FreeEntry {
     first_idx: u32,
 }
 
+/// Slots 0 and 1 are reserved for Arenas which are registered as fixed buffers
+/// with io_uring.
 const UNREGISTERED_START: u32 = 2;
 
-/// For an arena which isn't registered with `io_uring`
+/// IDs for an Arena which is not registered with `io_uring`;
 /// registered arena will always have id = 0..=1
 static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START);
@@ -384,16 +437,20 @@ impl Arena {
         })
     }
 
+    /// Attempt to mark N pages as allocated, returning a pointer to, and the
+    /// index of, the first page in the allocation.
     pub fn try_alloc(&self, size: usize) -> Option<FreeEntry> {
         let pages = size.div_ceil(self.page_size) as u32;
         let mut freemap = self.free_pages.lock();
         let first_idx = if pages == 1 {
+            // use the optimized single-page method, which tries to keep large
+            // contiguous areas unfragmented and available for larger runs of
+            // pages
            freemap.alloc_one()?
        } else {
            freemap.alloc_run(pages)?
        };
-
         self.allocated_pages
             .fetch_add(pages as usize, Ordering::Relaxed);
         let offset = first_idx as usize * self.page_size;
@@ -401,6 +458,7 @@
         Some(FreeEntry { ptr, first_idx })
     }
 
+    /// Mark `count` pages starting at `page_idx` as free.
     pub fn free(&self, page_idx: u32, count: usize) {
         let mut bm = self.free_pages.lock();
         bm.free_run(page_idx, count as u32);
diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs
index f6cf88881..d2c24770a 100644
--- a/core/storage/sqlite3_ondisk.rs
+++ b/core/storage/sqlite3_ondisk.rs
@@ -1760,15 +1760,13 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result
 pub fn begin_read_wal_frame_raw(
+    buffer_pool: &Arc<BufferPool>,
     io: &Arc<dyn File>,
     offset: usize,
-    page_size: u32,
     complete: Box,
 ) -> Result<Completion> {
     tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
-    let buf = Arc::new(BufferPool::allocate(
-        page_size as usize + WAL_FRAME_HEADER_SIZE,
-    ));
+    let buf = Arc::new(buffer_pool.get_wal_frame());
     #[allow(clippy::arc_with_non_send_sync)]
     let c = Completion::new_read(buf, complete);
     let c = io.pread(offset, c)?;
@@ -1810,6 +1808,7 @@ pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) {
 }
 
 pub fn prepare_wal_frame(
+    buffer_pool: &Arc<BufferPool>,
     wal_header: &WalHeader,
     prev_checksums: (u32, u32),
     page_size: u32,
@@ -1819,7 +1818,7 @@ pub fn prepare_wal_frame(
 ) -> ((u32, u32), Arc<Buffer>) {
     tracing::trace!(page_number);
 
-    let buffer = BufferPool::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE);
+    let buffer = buffer_pool.get_wal_frame();
     let frame = buffer.as_mut_slice();
 
     frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page);
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index 1f7f8e0c2..ccc92677f 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -933,7 +933,7 @@ impl Wal for WalFile {
             }
         });
         let c =
-            begin_read_wal_frame_raw(&self.get_shared().file, offset, self.page_size(), complete)?;
+            begin_read_wal_frame_raw(&self.buffer_pool, &self.get_shared().file, offset, complete)?;
         Ok(c)
     }
@@ -1003,6 +1003,7 @@
         let header = header.lock();
         let checksums = self.last_checksum;
         let (checksums, frame_bytes) = prepare_wal_frame(
+            &self.buffer_pool,
             &header,
             checksums,
             header.page_size,
@@ -1044,6 +1045,7 @@
         let page_content = page.get_contents();
         let page_buf = page_content.as_ptr();
         let (frame_checksums, frame_bytes) = prepare_wal_frame(
+            &self.buffer_pool,
             &header,
             checksums,
             header.page_size,
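
The two rules that the new debug_check_fixed assertion and ArenaBuffer::fixed_id rely on can be illustrated outside the patch: an arena id below UNREGISTERED_START means the arena was registered with io_uring and may be used with the *Fixed opcodes, and a fixed operation must stay entirely inside the registered [base, base + len) range. The standalone Rust sketch below is illustrative only; MiniArena, new_registered, new_unregistered, and check_fixed are hypothetical stand-ins, not turso_core types.

use std::sync::atomic::{AtomicU32, Ordering};

/// Ids 0..UNREGISTERED_START are reserved for arenas registered with io_uring.
const UNREGISTERED_START: u32 = 2;
/// Unregistered arenas draw their ids from this counter, mirroring NEXT_ID in the patch.
static NEXT_UNREGISTERED_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START);

/// Hypothetical stand-in for an arena: a base address and a length in bytes.
struct MiniArena {
    id: u32,
    base: usize,
    len: usize,
}

impl MiniArena {
    fn new_registered(id: u32, base: usize, len: usize) -> Self {
        assert!(id < UNREGISTERED_START, "registered arenas use the reserved id slots");
        Self { id, base, len }
    }

    fn new_unregistered(base: usize, len: usize) -> Self {
        Self {
            id: NEXT_UNREGISTERED_ID.fetch_add(1, Ordering::Relaxed),
            base,
            len,
        }
    }

    // Mirrors ArenaBuffer::fixed_id: only a registered arena yields an index
    // usable with WriteFixed/ReadFixed.
    fn fixed_id(&self) -> Option<u32> {
        (self.id < UNREGISTERED_START).then_some(self.id)
    }

    // Mirrors the range assertion in InnerUringIO::debug_check_fixed: the whole
    // I/O must land inside the memory registered for this slot.
    fn check_fixed(&self, ptr: usize, len: usize) -> bool {
        ptr >= self.base && ptr + len <= self.base + self.len
    }
}

fn main() {
    let registered = MiniArena::new_registered(0, 0x1000, 4096);
    let unregistered = MiniArena::new_unregistered(0x9000, 4096);

    // Only the registered arena offers a fixed-buffer id for the *Fixed opcodes.
    assert_eq!(registered.fixed_id(), Some(0));
    assert_eq!(unregistered.fixed_id(), None);

    // An in-range fixed write passes; one that runs past the registered end does not.
    assert!(registered.check_fixed(0x1000 + 512, 1024));
    assert!(!registered.check_fixed(0x1000 + 4000, 1024));

    println!("fixed-id and registered-range invariants hold");
}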
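
The page accounting behind the separate wal_frame_arena can be shown the same way. The sketch below is not code from the patch; pages_needed is a hypothetical helper that mirrors the size.div_ceil(page_size) arithmetic used by Arena::try_alloc and Arena::free, and it assumes the 4096-byte DEFAULT_PAGE_SIZE and 24-byte WAL_FRAME_HEADER_SIZE stated in the doc comments.

/// Mirrors the `size.div_ceil(page_size)` accounting used when turning a byte
/// length into whole arena pages.
fn pages_needed(len: usize, page_size: usize) -> usize {
    len.div_ceil(page_size)
}

fn main() {
    const PAGE: usize = 4096; // DEFAULT_PAGE_SIZE from the patch
    const WAL_FRAME_HEADER_SIZE: usize = 24; // per the patch's doc comments

    // A plain database page fits exactly one page of the page arena.
    assert_eq!(pages_needed(PAGE, PAGE), 1);

    // A WAL frame is 24 bytes larger, so in the page arena it would spill into a
    // second page and waste almost all of it...
    assert_eq!(pages_needed(PAGE + WAL_FRAME_HEADER_SIZE, PAGE), 2);

    // ...whereas a dedicated WAL-frame arena whose pages already include the
    // header hands out exactly one page per frame.
    assert_eq!(
        pages_needed(PAGE + WAL_FRAME_HEADER_SIZE, PAGE + WAL_FRAME_HEADER_SIZE),
        1
    );

    println!("page accounting matches the two-arena layout");
}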