diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 61ba4badf..a5b32c661 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -5,6 +5,7 @@ use crate::io::clock::{Clock, Instant}; use crate::storage::wal::CKPT_BATCH_PAGES; use crate::{turso_assert, LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; +use std::ptr::NonNull; use std::{ cell::RefCell, collections::{HashMap, VecDeque}, @@ -32,7 +33,7 @@ const FILES: u32 = 8; const IOVEC_POOL_SIZE: usize = 64; /// Maximum number of iovec entries per writev operation. -/// IOV_MAX is typically 1024, but we limit it to a smaller number +/// IOV_MAX is typically 1024 const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES; /// Maximum number of I/O operations to wait for in a single run, @@ -40,6 +41,9 @@ const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES; /// make, but can increase single operation latency. const MAX_WAIT: usize = 4; +/// One memory arena for DB pages and another for WAL frames +const ARENA_COUNT: usize = 2; + pub struct UringIO { inner: Rc>, } @@ -57,6 +61,7 @@ struct WrappedIOUring { struct InnerUringIO { ring: WrappedIOUring, free_files: VecDeque, + free_arenas: [Option<(NonNull, usize)>; ARENA_COUNT], } /// preallocated vec of iovec arrays to avoid allocations during writev operations @@ -108,7 +113,8 @@ impl UringIO { // RL_MEMLOCK cap is typically 8MB, the current design is to have one large arena // registered at startup and therefore we can simply use the zero index, falling back // to similar logic as the existing buffer pool for cases where it is over capacity. - ring.submitter().register_buffers_sparse(1)?; + ring.submitter() + .register_buffers_sparse(ARENA_COUNT as u32)?; let inner = InnerUringIO { ring: WrappedIOUring { ring, @@ -117,6 +123,7 @@ impl UringIO { iov_pool: IovecPool::new(), }, free_files: (0..FILES).collect(), + free_arenas: [const { None }; ARENA_COUNT], }; debug!("Using IO backend 'io-uring'"); Ok(Self { @@ -263,6 +270,18 @@ impl InnerUringIO { self.free_files.push_back(id); Ok(()) } + + #[cfg(debug_assertions)] + fn debug_check_fixed(&self, idx: u32, ptr: *const u8, len: usize) { + let (base, blen) = self.free_arenas[idx as usize].expect("slot not registered"); + let start = base.as_ptr() as usize; + let end = start + blen; + let p = ptr as usize; + turso_assert!( + p >= start && p + len <= end, + "Fixed operation, pointer out of registered range" + ); + } } impl WrappedIOUring { @@ -333,21 +352,34 @@ impl WrappedIOUring { break; } } + // If we have coalesced everything into a single iovec, submit as a single`pwrite` if iov_count == 1 { - // If we have coalesced everything into a single iovec, submit as a single`pwrite` let entry = with_fd!(st.file_id, |fd| { - io_uring::opcode::Write::new( - fd, - iov_allocation[0].iov_base as *const u8, - iov_allocation[0].iov_len as u32, - ) - .offset(st.file_pos as u64) - .build() - .user_data(key) + if let Some(id) = st.bufs[st.current_buffer_idx].fixed_id() { + io_uring::opcode::WriteFixed::new( + fd, + iov_allocation[0].iov_base as *const u8, + iov_allocation[0].iov_len as u32, + id as u16, + ) + .offset(st.file_pos as u64) + .build() + .user_data(key) + } else { + io_uring::opcode::Write::new( + fd, + iov_allocation[0].iov_base as *const u8, + iov_allocation[0].iov_len as u32, + ) + .offset(st.file_pos as u64) + .build() + .user_data(key) + } }); self.submit_entry(&entry); return; } + // Store the pointers and get the pointer to the iovec array that we pass // to the writev operation, and keep the array itself alive let 
ptr = iov_allocation.as_ptr() as *mut libc::iovec; @@ -477,16 +509,20 @@ impl IO for UringIO { Arc::new(MemoryIO::new()) } - fn register_fixed_buffer(&self, ptr: std::ptr::NonNull, len: usize) -> Result<()> { + fn register_fixed_buffer(&self, ptr: std::ptr::NonNull, len: usize) -> Result { turso_assert!( len % 512 == 0, "fixed buffer length must be logical block aligned" ); - let inner = self.inner.borrow_mut(); - let default_id = 0; + let mut inner = self.inner.borrow_mut(); + let slot = inner + .free_arenas + .iter() + .position(|e| e.is_none()) + .ok_or_else(|| LimboError::UringIOError("no free fixed buffer slots".into()))?; unsafe { inner.ring.ring.submitter().register_buffers_update( - default_id, + slot as u32, &[libc::iovec { iov_base: ptr.as_ptr() as *mut libc::c_void, iov_len: len, @@ -494,7 +530,8 @@ impl IO for UringIO { None, )? }; - Ok(()) + inner.free_arenas[slot] = Some((ptr, len)); + Ok(slot as u32) } } @@ -540,7 +577,6 @@ impl UringFile { self.id } } - unsafe impl Send for UringFile {} unsafe impl Sync for UringFile {} @@ -584,17 +620,35 @@ impl File for UringFile { fn pread(&self, pos: usize, c: Completion) -> Result { let r = c.as_read(); - trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let mut io = self.io.borrow_mut(); let read_e = { let buf = r.buf(); + let ptr = buf.as_mut_ptr(); let len = buf.len(); - let buf = buf.as_mut_ptr(); with_fd!(self, |fd| { - io_uring::opcode::Read::new(fd, buf, len as u32) - .offset(pos as u64) - .build() - .user_data(get_key(c.clone())) + if let Some(idx) = buf.fixed_id() { + trace!( + "pread_fixed(pos = {}, length = {}, idx = {})", + pos, + len, + idx + ); + #[cfg(debug_assertions)] + { + io.debug_check_fixed(idx, ptr, len); + } + io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } else { + trace!("pread(pos = {}, length = {})", pos, len); + // Use Read opcode if fixed buffer is not available + io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } }) }; io.ring.submit_entry(&read_e); @@ -604,12 +658,31 @@ impl File for UringFile { fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let write = { - trace!("pwrite(pos = {}, length = {})", pos, buffer.len()); + let ptr = buffer.as_ptr(); + let len = buffer.len(); with_fd!(self, |fd| { - io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32) - .offset(pos as u64) - .build() - .user_data(get_key(c.clone())) + if let Some(idx) = buffer.fixed_id() { + trace!( + "pwrite_fixed(pos = {}, length = {}, idx= {})", + pos, + len, + idx + ); + #[cfg(debug_assertions)] + { + io.debug_check_fixed(idx, ptr, len); + } + io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } else { + trace!("pwrite(pos = {}, length = {})", pos, buffer.len()); + io_uring::opcode::Write::new(fd, ptr, len as u32) + .offset(pos as u64) + .build() + .user_data(get_key(c.clone())) + } }) }; io.ring.submit_entry(&write); diff --git a/core/io/mod.rs b/core/io/mod.rs index 52b9be2e2..74c71b378 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -1,10 +1,13 @@ -use crate::Result; +use crate::storage::buffer_pool::ArenaBuffer; +use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE; +use crate::{BufferPool, Result}; use bitflags::bitflags; use cfg_block::cfg_block; +use std::cell::RefCell; use std::fmt; use 
std::ptr::NonNull; use std::sync::Arc; -use std::{cell::Cell, fmt::Debug, mem::ManuallyDrop, pin::Pin, rc::Rc}; +use std::{cell::Cell, fmt::Debug, pin::Pin}; pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; @@ -18,6 +21,9 @@ c.complete(0); return Ok(c); } + if buffers.len() == 1 { + return self.pwrite(pos, buffers[0].clone(), c); + } // naive default implementation can be overridden on backends where it makes sense to let mut pos = pos; let outstanding = Arc::new(AtomicUsize::new(buffers.len())); @@ -68,7 +74,7 @@ while pos < file_size { let chunk_size = (file_size - pos).min(BUFFER_SIZE); // Read from source - let read_buffer = Arc::new(Buffer::allocate(chunk_size, Rc::new(|_| {}))); + let read_buffer = Arc::new(Buffer::new_temporary(chunk_size)); let read_completion = self.pread( pos, Completion::new_read(read_buffer.clone(), move |_, _| {}), ); @@ -119,7 +125,7 @@ fn get_memory_io(&self) -> Arc<MemoryIO>; - fn register_fixed_buffer(&self, _ptr: NonNull<u8>, _len: usize) -> Result<()> { + fn register_fixed_buffer(&self, _ptr: NonNull<u8>, _len: usize) -> Result<u32> { Err(crate::LimboError::InternalError( "unsupported operation".to_string(), )) @@ -290,62 +296,164 @@ impl TruncateCompletion { } } -pub type BufferData = Pin<Vec<u8>>; +pub type BufferData = Pin<Box<[u8]>>; -pub type BufferDropFn = Rc<dyn Fn(BufferData)>; - -pub struct Buffer { - data: ManuallyDrop<BufferData>, - drop: BufferDropFn, +pub enum Buffer { + Heap(BufferData), + Pooled(ArenaBuffer), } impl Debug for Buffer { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.data) + match self { + Self::Pooled(p) => write!(f, "{p:?}"), + Self::Heap(buf) => write!(f, "{buf:?}: {}", buf.len()), + } } } impl Drop for Buffer { fn drop(&mut self) { - let data = unsafe { ManuallyDrop::take(&mut self.data) }; - (self.drop)(data); + let len = self.len(); + if let Self::Heap(buf) = self { + TEMP_BUFFER_CACHE.with(|cache| { + let mut cache = cache.borrow_mut(); + // take ownership of the buffer by swapping it with a dummy + let buffer = std::mem::replace(buf, Pin::new(vec![].into_boxed_slice())); + cache.return_buffer(buffer, len); + }); + } } } impl Buffer { - pub fn allocate(size: usize, drop: BufferDropFn) -> Self { - let data = ManuallyDrop::new(Pin::new(vec![0; size])); - Self { data, drop } + pub fn new(data: Vec<u8>) -> Self { + tracing::trace!("buffer::new({:?})", data); + Self::Heap(Pin::new(data.into_boxed_slice())) } - pub fn new(data: BufferData, drop: BufferDropFn) -> Self { - let data = ManuallyDrop::new(data); - Self { data, drop } + /// Returns the index of the underlying `Arena` if it was registered with + /// io_uring. Only for use with `UringIO` backend. + pub fn fixed_id(&self) -> Option<u32> { + match self { + Self::Heap { ..
} => None, + Self::Pooled(buf) => buf.fixed_id(), + } + } + + pub fn new_pooled(buf: ArenaBuffer) -> Self { + tracing::trace!("new_pooled({:?})", buf); + Self::Pooled(buf) + } + + pub fn new_temporary(size: usize) -> Self { + TEMP_BUFFER_CACHE.with(|cache| { + if let Some(buffer) = cache.borrow_mut().get_buffer(size) { + Self::Heap(buffer) + } else { + Self::Heap(Pin::new(vec![0; size].into_boxed_slice())) + } + }) } pub fn len(&self) -> usize { - self.data.len() + match self { + Self::Heap(buf) => buf.len(), + Self::Pooled(buf) => buf.logical_len(), + } } pub fn is_empty(&self) -> bool { - self.data.is_empty() + self.len() == 0 } pub fn as_slice(&self) -> &[u8] { - &self.data + match self { + Self::Heap(buf) => { + // SAFETY: The buffer is guaranteed to be valid for the lifetime of the slice + unsafe { std::slice::from_raw_parts(buf.as_ptr(), buf.len()) } + } + Self::Pooled(buf) => buf, + } } #[allow(clippy::mut_from_ref)] pub fn as_mut_slice(&self) -> &mut [u8] { - unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.data.len()) } + unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len()) } } - + #[inline] pub fn as_ptr(&self) -> *const u8 { - self.data.as_ptr() + match self { + Self::Heap(buf) => buf.as_ptr(), + Self::Pooled(buf) => buf.as_ptr(), + } + } + #[inline] + pub fn as_mut_ptr(&self) -> *mut u8 { + match self { + Self::Heap(buf) => buf.as_ptr() as *mut u8, + Self::Pooled(buf) => buf.as_ptr() as *mut u8, + } + } +} + +thread_local! { + /// Thread-local cache that re-uses temporary buffers to prevent allocation churn when the pool overflows + pub static TEMP_BUFFER_CACHE: RefCell<TempBufferCache> = RefCell::new(TempBufferCache::new()); +} + +/// A cache for temporary or any additional `Buffer` allocations beyond +/// what the `BufferPool` has room for, or for use before the pool is +/// fully initialized. +pub(crate) struct TempBufferCache { + /// The `[Database::page_size]` at the time the cache is initialized. + page_size: usize, + /// Cache of buffers of size `self.page_size`. + page_buffers: Vec<BufferData>, + /// Cache of buffers of size `self.page_size` + WAL_FRAME_HEADER_SIZE. + wal_frame_buffers: Vec<BufferData>, + /// Maximum number of buffers that will live in each cache. + max_cached: usize, +} + +impl TempBufferCache { + const DEFAULT_MAX_CACHE_SIZE: usize = 256; + + fn new() -> Self { + Self { + page_size: BufferPool::DEFAULT_PAGE_SIZE, + page_buffers: Vec::with_capacity(8), + wal_frame_buffers: Vec::with_capacity(8), + max_cached: Self::DEFAULT_MAX_CACHE_SIZE, + } } - pub fn as_mut_ptr(&self) -> *mut u8 { - self.data.as_ptr() as *mut u8 + /// If the `[Database::page_size]` is set, any temporary buffers cached + /// earlier need to be cleared and the new `page_size` saved.
+ pub fn reinit_cache(&mut self, page_size: usize) { + self.page_buffers.clear(); + self.wal_frame_buffers.clear(); + self.page_size = page_size; + } + + fn get_buffer(&mut self, size: usize) -> Option { + match size { + sz if sz == self.page_size => self.page_buffers.pop(), + sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(), + _ => None, + } + } + + fn return_buffer(&mut self, buff: BufferData, len: usize) { + let sz = self.page_size; + let cache = match len { + n if n.eq(&sz) => &mut self.page_buffers, + n if n.eq(&(sz + WAL_FRAME_HEADER_SIZE)) => &mut self.wal_frame_buffers, + _ => return, + }; + if self.max_cached > cache.len() { + cache.push(buff); + } } } diff --git a/core/lib.rs b/core/lib.rs index 648037c6a..d5b21bd3d 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -380,6 +380,11 @@ impl Database { } fn init_pager(&self, page_size: Option) -> Result { + let arena_size = if std::env::var("TESTING").is_ok_and(|v| v.eq_ignore_ascii_case("true")) { + BufferPool::TEST_ARENA_SIZE + } else { + BufferPool::DEFAULT_ARENA_SIZE + }; // Open existing WAL file if present let mut maybe_shared_wal = self.maybe_shared_wal.write(); if let Some(shared_wal) = maybe_shared_wal.clone() { @@ -387,7 +392,8 @@ impl Database { None => unsafe { (*shared_wal.get()).page_size() as usize }, Some(size) => size, }; - let buffer_pool = Arc::new(BufferPool::new(Some(size))); + let buffer_pool = + BufferPool::begin_init(&self.io, arena_size).finalize_with_page_size(size)?; let db_state = self.db_state.clone(); let wal = Rc::new(RefCell::new(WalFile::new( @@ -406,8 +412,8 @@ impl Database { )?; return Ok(pager); } + let buffer_pool = BufferPool::begin_init(&self.io, arena_size); - let buffer_pool = Arc::new(BufferPool::new(page_size)); // No existing WAL; create one. 
let db_state = self.db_state.clone(); let mut pager = Pager::new( @@ -423,16 +429,15 @@ impl Database { let size = match page_size { Some(size) => size as u32, None => { - let size = pager + pager // if None is passed in, we know that we already initialized so we can safely call `with_header` here .io .block(|| pager.with_header(|header| header.page_size)) .unwrap_or_default() - .get(); - buffer_pool.set_page_size(size as usize); - size + .get() } }; + pager.page_size.set(Some(size)); let wal_path = format!("{}-wal", self.path); let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?; let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?; @@ -1373,6 +1378,7 @@ impl Connection { } *self._db.maybe_shared_wal.write() = None; + self.pager.borrow_mut().clear_page_cache(); let pager = self._db.init_pager(Some(size as usize))?; self.pager.replace(Rc::new(pager)); self.pager.borrow().set_initial_page_size(size); diff --git a/core/storage/btree.rs b/core/storage/btree.rs index 4fc0b6245..33788b211 100644 --- a/core/storage/btree.rs +++ b/core/storage/btree.rs @@ -7215,7 +7215,6 @@ mod tests { use tempfile::TempDir; use crate::{ - io::BufferData, storage::{ btree::{compute_free_space, fill_cell_payload, payload_overflow_threshold_max}, sqlite3_ondisk::{BTreeCell, PageContent, PageType}, @@ -7230,11 +7229,7 @@ mod tests { fn get_page(id: usize) -> BTreePage { let page = Arc::new(Page::new(id)); - let drop_fn = Rc::new(|_| {}); - let inner = PageContent::new( - 0, - Arc::new(Buffer::new(BufferData::new(vec![0; 4096]), drop_fn)), - ); + let inner = PageContent::new(0, Arc::new(Buffer::new_temporary(4096))); page.get().contents.replace(inner); let page = Arc::new(BTreePageInner { page: RefCell::new(page), @@ -8455,21 +8450,15 @@ mod tests { fn setup_test_env(database_size: u32) -> Rc { let page_size = 512; - let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize))); - - // Initialize buffer pool with correctly sized buffers - for _ in 0..10 { - let vec = vec![0; page_size as usize]; // Initialize with correct length, not just capacity - buffer_pool.put(Pin::new(vec)); - } - let io: Arc = Arc::new(MemoryIO::new()); + let buffer_pool = BufferPool::begin_init(&io, page_size * 128); + let db_file = Arc::new(DatabaseFile::new( io.open_file(":memory:", OpenFlags::Create, false).unwrap(), )); let wal_file = io.open_file("test.wal", OpenFlags::Create, false).unwrap(); - let wal_shared = WalFileShared::new_shared(page_size, &io, wal_file).unwrap(); + let wal_shared = WalFileShared::new_shared(page_size as u32, &io, wal_file).unwrap(); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), wal_shared, @@ -8499,7 +8488,9 @@ mod tests { pager .io .block(|| { - pager.with_header_mut(|header| header.page_size = PageSize::new(page_size).unwrap()) + pager.with_header_mut(|header| { + header.page_size = PageSize::new(page_size as u32).unwrap() + }) }) .unwrap(); @@ -8522,16 +8513,17 @@ mod tests { // Setup overflow pages (2, 3, 4) with linking let mut current_page = 2u32; while current_page <= 4 { - let drop_fn = Rc::new(|_buf| {}); #[allow(clippy::arc_with_non_send_sync)] - let buf = Arc::new(Buffer::allocate( + let buf = Arc::new(Buffer::new_temporary( pager .io .block(|| pager.with_header(|header| header.page_size))? 
.get() as usize, - drop_fn, )); - let c = Completion::new_write(|_| {}); + let _buf = buf.clone(); + let c = Completion::new_write(move |_| { + let _ = _buf.clone(); + }); let _c = pager .db_file .write_page(current_page as usize, buf.clone(), c)?; diff --git a/core/storage/buffer_pool.rs b/core/storage/buffer_pool.rs index 4d7beae2b..75ba9a634 100644 --- a/core/storage/buffer_pool.rs +++ b/core/storage/buffer_pool.rs @@ -1,45 +1,497 @@ -use crate::io::BufferData; +use crate::fast_lock::SpinLock; +use crate::io::TEMP_BUFFER_CACHE; +use crate::storage::page_bitmap::PageBitmap; +use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE; +use crate::{turso_assert, Buffer, LimboError, IO}; use parking_lot::Mutex; -use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::cell::UnsafeCell; +use std::ptr::NonNull; +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; +use std::sync::{Arc, OnceLock}; -use super::sqlite3_ondisk::PageSize; +pub static BUFFER_POOL: OnceLock> = OnceLock::new(); -pub struct BufferPool { - pub free_buffers: Mutex>, - page_size: AtomicUsize, +#[derive(Debug)] +/// A buffer allocated from an arena from `[BufferPool]` +pub struct ArenaBuffer { + /// Pointer to the start of the buffer + ptr: NonNull, + /// Identifier for the `[Arena]` the buffer came from + arena_id: u32, + /// The index of the first page making up the buffer + page_idx: u32, + /// The requested length of the allocation. + /// The actual size of what is allocated for the + /// buffer is `len` rounded up to the next multiple of + /// `[Arena::page_size]` + len: usize, } -impl BufferPool { - pub fn new(page_size: Option) -> Self { - Self { - free_buffers: Mutex::new(Vec::new()), - page_size: AtomicUsize::new(page_size.unwrap_or(PageSize::DEFAULT as usize)), +impl ArenaBuffer { + const fn new(ptr: NonNull, len: usize, arena_id: u32, page_idx: u32) -> Self { + ArenaBuffer { + ptr, + arena_id, + page_idx, + len, } } - pub fn set_page_size(&self, page_size: usize) { - self.page_size.store(page_size, Ordering::Relaxed); + #[inline(always)] + /// Returns the `id` of the underlying arena, only if it was registered with `io_uring` + pub const fn fixed_id(&self) -> Option { + // Arenas which are not registered will have `id`s <= UNREGISTERED_START + if self.arena_id < UNREGISTERED_START { + Some(self.arena_id) + } else { + None + } } - pub fn get(&self) -> BufferData { - let buffer = self.free_buffers.lock().pop(); - buffer.unwrap_or_else(|| Pin::new(vec![0; self.page_size.load(Ordering::Relaxed)])) + /// The requested size of the allocation, the actual size of the underlying buffer is rounded up to + /// the next multiple of the arena's page_size + pub const fn logical_len(&self) -> usize { + self.len } - - pub fn put(&self, buffer: BufferData) { - self.free_buffers.lock().push(buffer); + pub fn as_slice(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.logical_len()) } + } + pub fn as_mut_slice(&mut self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.logical_len()) } } } -#[cfg(test)] -mod tests { - use super::*; - - fn is_send_sync_static() {} - - #[test] - fn test_send_sync() { - is_send_sync_static::(); +impl Drop for ArenaBuffer { + fn drop(&mut self) { + let pool = BUFFER_POOL + .get() + .expect("BufferPool not initialized, cannot free ArenaBuffer"); + let inner = pool.inner_mut(); + inner.free(self.logical_len(), self.arena_id, self.page_idx); + } +} + +impl std::ops::Deref for ArenaBuffer { + type Target = [u8]; + fn 
deref(&self) -> &Self::Target { + self.as_slice() + } +} + +impl std::ops::DerefMut for ArenaBuffer { + fn deref_mut(&mut self) -> &mut Self::Target { + self.as_mut_slice() + } +} + +/// Static Buffer pool managing multiple memory arenas +/// of which `[ArenaBuffer]`s are returned for requested allocations +pub struct BufferPool { + inner: UnsafeCell, +} +unsafe impl Sync for BufferPool {} +unsafe impl Send for BufferPool {} + +struct PoolInner { + /// An instance of the program's IO, used for registering + /// Arena's with io_uring. + io: Option>, + /// An Arena which returns `ArenaBuffer`s of size `db_page_size`. + page_arena: Option, + /// An Arena which returns `ArenaBuffer`s of size `db_page_size` + /// plus 24 byte `WAL_FRAME_HEADER_SIZE`, preventing the fragmentation + /// or complex book-keeping needed to use the same arena for both sizes. + wal_frame_arena: Option, + /// A lock preventing concurrent initialization. + init_lock: Mutex<()>, + /// The size of each `Arena`, in bytes. + arena_size: AtomicUsize, + /// The `[Database::page_size]`, which the `page_arena` will use to + /// return buffers from `Self::get_page`. + db_page_size: AtomicUsize, +} + +unsafe impl Sync for PoolInner {} +unsafe impl Send for PoolInner {} + +impl Default for BufferPool { + fn default() -> Self { + Self::new(Self::DEFAULT_ARENA_SIZE) + } +} + +impl BufferPool { + /// 3MB Default size for each `Arena`. Any higher and + /// it will fail to register the second arena with io_uring due + /// to `RL_MEMLOCK` limit for un-privileged processes being 8MB total. + pub const DEFAULT_ARENA_SIZE: usize = 3 * 1024 * 1024; + /// 1MB size For testing/CI + pub const TEST_ARENA_SIZE: usize = 1024 * 1024; + /// 4KB default page_size + pub const DEFAULT_PAGE_SIZE: usize = 4096; + /// Maximum size for each Arena (64MB total) + const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024; + /// 64kb Minimum arena size + const MIN_ARENA_SIZE: usize = 1024 * 64; + fn new(arena_size: usize) -> Self { + turso_assert!( + (Self::MIN_ARENA_SIZE..Self::MAX_ARENA_SIZE).contains(&arena_size), + "Arena size needs to be between {}..{} bytes", + Self::MIN_ARENA_SIZE, + Self::MAX_ARENA_SIZE + ); + Self { + inner: UnsafeCell::new(PoolInner { + page_arena: None, + wal_frame_arena: None, + arena_size: arena_size.into(), + db_page_size: Self::DEFAULT_PAGE_SIZE.into(), + init_lock: Mutex::new(()), + io: None, + }), + } + } + + /// Request a `Buffer` of size `len` + #[inline] + pub fn allocate(len: usize) -> Buffer { + let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); + pool.inner().allocate(len) + } + + /// Request a `Buffer` the size of the `db_page_size` the `BufferPool` was initialized with. + #[inline] + pub fn get_page(&self) -> Buffer { + let inner = self.inner_mut(); + inner.get_page() + } + + /// Request a `Buffer` for use with a WAL frame, + /// `[Database::page_size] + `WAL_FRAME_HEADER_SIZE` + #[inline] + pub fn get_wal_frame(&self) -> Buffer { + let inner = self.inner_mut(); + inner.get_frame() + } + + #[inline] + fn inner(&self) -> &PoolInner { + unsafe { &*self.inner.get() } + } + + #[inline] + #[allow(clippy::mut_from_ref)] + fn inner_mut(&self) -> &mut PoolInner { + unsafe { &mut *self.inner.get() } + } + + /// Create a static `BufferPool` initialize the pool to the default page size, **without** + /// populating the Arenas. 
Arenas are not created until `[BufferPool::finalize_with_page_size]` is called; + until then the pool hands out temporary buffers so the arena does not have to be + reallocated if the page size turns out to differ from the default. + pub fn begin_init(io: &Arc<dyn IO>, arena_size: usize) -> Arc<BufferPool> { + let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size))); + let inner = pool.inner_mut(); + // Just store the IO handle, don't create the arenas yet + if inner.io.is_none() { + inner.io = Some(Arc::clone(io)); + } + pool.clone() + } + + /// Call when `[Database::db_state]` is initialized, providing the `page_size` to allocate + /// an arena for the pool. Before this call, the pool will use temporary buffers which are + /// cached in thread local storage. + pub fn finalize_with_page_size(&self, page_size: usize) -> crate::Result<Arc<BufferPool>> { + let pool = BUFFER_POOL.get().expect("BufferPool must be initialized"); + let inner = pool.inner_mut(); + tracing::trace!("finalize page size called with size {page_size}"); + if page_size != BufferPool::DEFAULT_PAGE_SIZE { + // We have already handed out temporary buffers at the default size; since the page size + // is not the default, clear the cache so they aren't reused for other operations. + TEMP_BUFFER_CACHE.with(|cache| { + cache.borrow_mut().reinit_cache(page_size); + }); + } + if inner.page_arena.is_some() { + return Ok(pool.clone()); + } + inner.db_page_size.store(page_size, Ordering::Relaxed); + inner.init_arenas()?; + Ok(pool.clone()) + } + + #[inline] + /// Marks the underlying pages of an `ArenaBuffer` as free on `Drop` and + /// available for use by another allocation. + pub fn free(&self, size: usize, arena_id: u32, page_id: u32) { + self.inner_mut().free(size, arena_id, page_id); + } +} + +impl PoolInner { + /// Allocate a buffer of the given length from the pool, falling back to + /// temporary thread local buffers if the pool is not initialized or is full.
+ pub fn allocate(&self, len: usize) -> Buffer { + turso_assert!(len > 0, "Cannot allocate zero-length buffer"); + + let db_page_size = self.db_page_size.load(Ordering::Relaxed); + let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE; + + // Check if this is exactly a WAL frame size allocation + if len == wal_frame_size { + return self + .wal_frame_arena + .as_ref() + .and_then(|wal_arena| wal_arena.try_alloc(len)) + .unwrap_or(Buffer::new_temporary(len)); + } + // For all other sizes, use regular arena + self.page_arena + .as_ref() + .and_then(|arena| arena.try_alloc(len)) + .unwrap_or(Buffer::new_temporary(len)) + } + + fn get_page(&mut self) -> Buffer { + let db_page_size = self.db_page_size.load(Ordering::Relaxed); + self.page_arena + .as_ref() + .and_then(|arena| arena.try_alloc(db_page_size)) + .unwrap_or(Buffer::new_temporary(db_page_size)) + } + + fn get_frame(&mut self) -> Buffer { + let len = self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE; + self.wal_frame_arena + .as_ref() + .and_then(|wal_arena| wal_arena.try_alloc(len)) + .unwrap_or(Buffer::new_temporary(len)) + } + + /// Allocate a new arena for the pool to use + fn init_arenas(&mut self) -> crate::Result<()> { + // Prevent concurrent growth + let Some(_guard) = self.init_lock.try_lock() else { + tracing::debug!("Buffer pool is already growing, skipping initialization"); + return Ok(()); // Already in progress + }; + let arena_size = self.arena_size.load(Ordering::Relaxed); + let db_page_size = self.db_page_size.load(Ordering::Relaxed); + let io = self.io.as_ref().expect("Pool not initialized").clone(); + + // Create regular page arena + match Arena::new(db_page_size, arena_size, &io) { + Ok(arena) => { + tracing::trace!( + "added arena {} with size {} MB and page size {}", + arena.id, + arena_size / (1024 * 1024), + db_page_size + ); + self.page_arena = Some(arena); + } + Err(e) => { + tracing::error!("Failed to create arena: {:?}", e); + return Err(LimboError::InternalError(format!( + "Failed to create arena: {e}", + ))); + } + } + + // Create WAL frame arena + let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE; + match Arena::new(wal_frame_size, arena_size, &io) { + Ok(arena) => { + tracing::trace!( + "added WAL frame arena {} with size {} MB and page size {}", + arena.id, + arena_size / (1024 * 1024), + wal_frame_size + ); + self.wal_frame_arena = Some(arena); + } + Err(e) => { + tracing::error!("Failed to create WAL frame arena: {:?}", e); + return Err(LimboError::InternalError(format!( + "Failed to create WAL frame arena: {e}", + ))); + } + } + + Ok(()) + } + + fn free(&self, size: usize, arena_id: u32, page_idx: u32) { + // allocations of size `page_size` are more common, so we check that arena first. + if let Some(arena) = self.page_arena.as_ref() { + if arena_id == arena.id { + arena.free(page_idx, size); + return; + } + } + // check WAL frame arena + if let Some(wal_arena) = self.wal_frame_arena.as_ref() { + if arena_id == wal_arena.id { + wal_arena.free(page_idx, size); + return; + } + } + panic!("ArenaBuffer freed with no available parent Arena"); + } +} + +/// Preallocated block of memory used by the pool to distribute `ArenaBuffer`s +struct Arena { + /// Identifier to tie allocations back to the arena. If the arena is registerd + /// with `io_uring`, then the ID represents the index of the arena into the ring's + /// sparse registered buffer array created on the ring's initialization. 
+ id: u32, + /// Base pointer to the arena returned by `mmap` + base: NonNull, + /// Total number of pages currently allocated/in use. + allocated_pages: AtomicUsize, + /// Currently free pages. + free_pages: SpinLock, + /// Total size of the arena in bytes + arena_size: usize, + /// Page size the total arena is divided into. + /// Because most allocations are of size `[Database::page_size]`, with an + /// additional 24 byte wal frame header, we treat this as the `page_size` to reduce + /// fragmentation to 24 bytes for regular pages. + page_size: usize, +} + +impl Drop for Arena { + fn drop(&mut self) { + unsafe { arena::dealloc(self.base.as_ptr(), self.arena_size) }; + } +} + +/// Slots 0 and 1 will be reserved for Arenas which are registered buffers +/// with io_uring. +const UNREGISTERED_START: u32 = 2; + +/// ID's for an Arena which is not registered with `io_uring` +/// registered arena will always have id = 0..=1 +static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START); + +impl Arena { + /// Create a new arena with the given size and page size. + /// NOTE: Minimum arena size is page_size * 64 + fn new(page_size: usize, arena_size: usize, io: &Arc) -> Result { + let min_pages = arena_size.div_ceil(page_size); + let rounded_pages = (min_pages.max(64) + 63) & !63; + let rounded_bytes = rounded_pages * page_size; + // Guard against the global cap + if rounded_bytes > BufferPool::MAX_ARENA_SIZE { + return Err(format!( + "arena size {} B exceeds hard limit of {} B", + rounded_bytes, + BufferPool::MAX_ARENA_SIZE + )); + } + let ptr = unsafe { arena::alloc(rounded_bytes) }; + let base = NonNull::new(ptr).ok_or("Failed to allocate arena")?; + let id = io + .register_fixed_buffer(base, rounded_bytes) + .unwrap_or_else(|_| { + // Register with io_uring if possible, otherwise use next available ID + let next_id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + tracing::trace!("Allocating arena with id {}", next_id); + next_id + }); + let map = PageBitmap::new(rounded_pages as u32); + Ok(Self { + id, + base, + free_pages: SpinLock::new(map), + allocated_pages: AtomicUsize::new(0), + page_size, + arena_size: rounded_bytes, + }) + } + + /// Allocate a `Buffer` large enough for logical length `size`. + /// May span multiple pages + pub fn try_alloc(&self, size: usize) -> Option { + let pages = size.div_ceil(self.page_size) as u32; + let mut freemap = self.free_pages.lock(); + + let first_idx = if pages == 1 { + // use the optimized method for individual pages which attempts + // to leave large contiguous areas free of fragmentation for + // larger `runs`. + freemap.alloc_one()? + } else { + freemap.alloc_run(pages)? + }; + self.allocated_pages + .fetch_add(pages as usize, Ordering::Relaxed); + let offset = first_idx as usize * self.page_size; + let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) }; + Some(Buffer::new_pooled(ArenaBuffer::new( + ptr, size, self.id, first_idx, + ))) + } + + /// Mark all relevant pages that include `size` starting at `page_idx` as free. 
+ pub fn free(&self, page_idx: u32, size: usize) { + let mut bm = self.free_pages.lock(); + let count = size.div_ceil(self.page_size); + turso_assert!( + !bm.check_run_free(page_idx, count as u32), + "must not already be marked free" + ); + bm.free_run(page_idx, count as u32); + self.allocated_pages.fetch_sub(count, Ordering::Relaxed); + } +} + +#[cfg(unix)] +mod arena { + #[cfg(target_os = "macos")] + use libc::MAP_ANON as MAP_ANONYMOUS; + #[cfg(target_os = "linux")] + use libc::MAP_ANONYMOUS; + use libc::{mmap, munmap, MAP_PRIVATE, PROT_READ, PROT_WRITE}; + use std::ffi::c_void; + + pub unsafe fn alloc(len: usize) -> *mut u8 { + let ptr = mmap( + std::ptr::null_mut(), + len, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS, + -1, + 0, + ); + if ptr == libc::MAP_FAILED { + panic!("mmap failed: {}", std::io::Error::last_os_error()); + } + #[cfg(target_os = "linux")] + { + libc::madvise(ptr, len, libc::MADV_HUGEPAGE); + } + ptr as *mut u8 + } + + pub unsafe fn dealloc(ptr: *mut u8, len: usize) { + let result = munmap(ptr as *mut c_void, len); + if result != 0 { + panic!("munmap failed: {}", std::io::Error::last_os_error()); + } + } +} + +#[cfg(not(unix))] +mod arena { + pub fn alloc(len: usize) -> *mut u8 { + let layout = std::alloc::Layout::from_size_align(len, std::mem::size_of::()).unwrap(); + unsafe { std::alloc::alloc(layout) } + } + pub fn dealloc(ptr: *mut u8, len: usize) { + let layout = std::alloc::Layout::from_size_align(len, std::mem::size_of::()).unwrap(); + unsafe { std::alloc::dealloc(ptr, layout) }; } } diff --git a/core/storage/page_bitmap.rs b/core/storage/page_bitmap.rs index 51942b1e1..5bb5465e1 100644 --- a/core/storage/page_bitmap.rs +++ b/core/storage/page_bitmap.rs @@ -314,7 +314,7 @@ impl PageBitmap { /// /// Word 1: ...11111111_11110000 (bits 60-63 must be checked) /// Word 2: 00000000_01111111... 
(bits 0-6 must be checked) - fn check_run_free(&self, start: u32, len: u32) -> bool { + pub(super) fn check_run_free(&self, start: u32, len: u32) -> bool { if start + len > self.n_pages { return false; } diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 2a2dcc04f..d1d6499b6 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -621,12 +621,12 @@ impl PageHashMap { #[cfg(test)] mod tests { use super::*; - use crate::io::{Buffer, BufferData}; use crate::storage::page_cache::CacheError; use crate::storage::pager::{Page, PageRef}; use crate::storage::sqlite3_ondisk::PageContent; + use crate::BufferPool; use std::ptr::NonNull; - use std::{num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc}; + use std::{num::NonZeroUsize, sync::Arc}; use lru::LruCache; use rand_chacha::{ @@ -642,8 +642,7 @@ mod tests { pub fn page_with_content(page_id: usize) -> PageRef { let page = Arc::new(Page::new(page_id)); { - let buffer_drop_fn = Rc::new(|_data: BufferData| {}); - let buffer = Buffer::new(Pin::new(vec![0; 4096]), buffer_drop_fn); + let buffer = BufferPool::allocate(4096); let page_content = PageContent { offset: 0, buffer: Arc::new(buffer), diff --git a/core/storage/pager.rs b/core/storage/pager.rs index 11e5999b1..73c95532b 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1,15 +1,18 @@ use crate::result::LimboResult; -use crate::storage::btree::BTreePageInner; -use crate::storage::buffer_pool::BufferPool; -use crate::storage::database::DatabaseStorage; -use crate::storage::sqlite3_ondisk::{ - self, parse_wal_frame_header, DatabaseHeader, PageContent, PageSize, PageType, +use crate::storage::{ + btree::BTreePageInner, + buffer_pool::BufferPool, + database::DatabaseStorage, + sqlite3_ondisk::{ + self, parse_wal_frame_header, DatabaseHeader, PageContent, PageSize, PageType, + }, + wal::{CheckpointResult, Wal}, }; -use crate::storage::wal::{CheckpointResult, Wal}; -use crate::types::{IOResult, WalFrameInfo}; use crate::util::IOExt as _; -use crate::{return_if_io, Completion, TransactionState}; -use crate::{turso_assert, Buffer, Connection, LimboError, Result}; +use crate::{ + return_if_io, turso_assert, types::WalFrameInfo, Completion, Connection, IOResult, LimboError, + Result, TransactionState, +}; use parking_lot::RwLock; use std::cell::{Cell, OnceCell, RefCell, UnsafeCell}; use std::collections::HashSet; @@ -787,7 +790,7 @@ impl Pager { )?; turso_assert!( - ptrmap_page.get().id == ptrmap_pg_no as usize, + ptrmap_page.get().id == ptrmap_pg_no, "ptrmap page has unexpected number" ); self.add_dirty(&ptrmap_page); @@ -1763,6 +1766,8 @@ impl Pager { if let Some(size) = self.page_size.get() { default_header.page_size = PageSize::new(size).expect("page size"); } + self.buffer_pool + .finalize_with_page_size(default_header.page_size.get() as usize)?; let page = allocate_new_page(1, &self.buffer_pool, 0); let contents = page.get_contents(); @@ -2151,12 +2156,8 @@ impl Pager { pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc, offset: usize) -> PageRef { let page = Arc::new(Page::new(page_id)); { - let buffer = buffer_pool.get(); - let bp = buffer_pool.clone(); - let drop_fn = Rc::new(move |buf| { - bp.put(buf); - }); - let buffer = Arc::new(Buffer::new(buffer, drop_fn)); + let buffer = buffer_pool.get_page(); + let buffer = Arc::new(buffer); page.set_loaded(); page.get().contents = Some(PageContent::new(offset, buffer)); } @@ -2432,10 +2433,10 @@ mod ptrmap_tests { )); // Construct interfaces for the pager - let buffer_pool = 
Arc::new(BufferPool::new(Some(page_size as usize))); - let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new( - (initial_db_pages + 10) as usize, - ))); + let pages = initial_db_pages + 10; + let sz = std::cmp::max(std::cmp::min(pages, 64), pages); + let buffer_pool = BufferPool::begin_init(&io, (sz * page_size) as usize); + let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(sz as usize))); let wal = Rc::new(RefCell::new(WalFile::new( io.clone(), diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 8b2a39b4f..d2c24770a 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -852,13 +852,9 @@ pub fn begin_read_page( allow_empty_read: bool, ) -> Result { tracing::trace!("begin_read_btree_page(page_idx = {})", page_idx); - let buf = buffer_pool.get(); - let drop_fn = Rc::new(move |buf| { - let buffer_pool = buffer_pool.clone(); - buffer_pool.put(buf); - }); + let buf = buffer_pool.get_page(); #[allow(clippy::arc_with_non_send_sync)] - let buf = Arc::new(Buffer::new(buf, drop_fn)); + let buf = Arc::new(buf); let complete = Box::new(move |mut buf: Arc, bytes_read: i32| { let buf_len = buf.len(); turso_assert!( @@ -867,7 +863,7 @@ pub fn begin_read_page( ); let page = page.clone(); if bytes_read == 0 { - buf = Arc::new(Buffer::allocate(0, Rc::new(|_| {}))); + buf = Arc::new(Buffer::new_temporary(0)); } if finish_read_page(page_idx, buf, page.clone()).is_err() { page.set_error(); @@ -1557,10 +1553,9 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec) { /// We need to read the WAL file on open to reconstruct the WAL frame cache. pub fn read_entire_wal_dumb(file: &Arc) -> Result>> { - let drop_fn = Rc::new(|_buf| {}); let size = file.size()?; #[allow(clippy::arc_with_non_send_sync)] - let buf_for_pread = Arc::new(Buffer::allocate(size as usize, drop_fn)); + let buf_for_pread = Arc::new(Buffer::new_temporary(size as usize)); let header = Arc::new(SpinLock::new(WalHeader::default())); #[allow(clippy::arc_with_non_send_sync)] let wal_file_shared_ret = Arc::new(UnsafeCell::new(WalFileShared { @@ -1765,17 +1760,13 @@ pub fn read_entire_wal_dumb(file: &Arc) -> Result, io: &Arc, offset: usize, - page_size: u32, - complete: Box, i32)>, + complete: Box, ) -> Result { tracing::trace!("begin_read_wal_frame_raw(offset={})", offset); - let drop_fn = Rc::new(|_buf| {}); - let buf = Arc::new(Buffer::allocate( - page_size as usize + WAL_FRAME_HEADER_SIZE, - drop_fn, - )); + let buf = Arc::new(buffer_pool.get_wal_frame()); #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_read(buf, complete); let c = io.pread(offset, c)?; @@ -1786,15 +1777,11 @@ pub fn begin_read_wal_frame( io: &Arc, offset: usize, buffer_pool: Arc, - complete: Box, i32)>, + complete: Box, ) -> Result { tracing::trace!("begin_read_wal_frame(offset={})", offset); - let buf = buffer_pool.get(); - let drop_fn = Rc::new(move |buf| { - let buffer_pool = buffer_pool.clone(); - buffer_pool.put(buf); - }); - let buf = Arc::new(Buffer::new(buf, drop_fn)); + let buf = buffer_pool.get_page(); + let buf = Arc::new(buf); #[allow(clippy::arc_with_non_send_sync)] let c = Completion::new_read(buf, complete); let c = io.pread(offset, c)?; @@ -1821,6 +1808,7 @@ pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) { } pub fn prepare_wal_frame( + buffer_pool: &Arc, wal_header: &WalHeader, prev_checksums: (u32, u32), page_size: u32, @@ -1830,8 +1818,7 @@ pub fn prepare_wal_frame( ) -> ((u32, u32), Arc) { tracing::trace!(page_number); - let drop_fn = 
Rc::new(|_buf| {}); - let buffer = Buffer::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE, drop_fn); + let buffer = buffer_pool.get_wal_frame(); let frame = buffer.as_mut_slice(); frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page); @@ -1858,9 +1845,7 @@ pub fn prepare_wal_frame( pub fn begin_write_wal_header(io: &Arc, header: &WalHeader) -> Result { tracing::trace!("begin_write_wal_header"); let buffer = { - let drop_fn = Rc::new(|_buf| {}); - - let buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn); + let buffer = Buffer::new_temporary(WAL_HEADER_SIZE); let buf = buffer.as_mut_slice(); buf[0..4].copy_from_slice(&header.magic.to_be_bytes()); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 86e6a02a2..f69700545 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -336,10 +336,8 @@ impl Batch { self.items.insert(id, buf_clone); // Re-initialize scratch with a fresh buffer - let raw = pool.get(); - let pool_clone = pool.clone(); - let drop_fn = Rc::new(move |b| pool_clone.put(b)); - let new_buf = Arc::new(Buffer::new(raw, drop_fn)); + let raw = pool.get_page(); + let new_buf = Arc::new(raw); unsafe { let inner = &mut *scratch.inner.get(); @@ -929,13 +927,13 @@ impl Wal for WalFile { bytes_read == buf_len as i32, "read({bytes_read}) != expected({buf_len})" ); - let buf_ptr = buf.as_ptr(); + let buf_ptr = buf.as_mut_ptr(); unsafe { std::ptr::copy_nonoverlapping(buf_ptr, frame_ptr, frame_len); } }); let c = - begin_read_wal_frame_raw(&self.get_shared().file, offset, self.page_size(), complete)?; + begin_read_wal_frame_raw(&self.buffer_pool, &self.get_shared().file, offset, complete)?; Ok(c) } @@ -1005,6 +1003,7 @@ impl Wal for WalFile { let header = header.lock(); let checksums = self.last_checksum; let (checksums, frame_bytes) = prepare_wal_frame( + &self.buffer_pool, &header, checksums, header.page_size, @@ -1046,6 +1045,7 @@ impl Wal for WalFile { let page_content = page.get_contents(); let page_buf = page_content.as_ptr(); let (frame_checksums, frame_bytes) = prepare_wal_frame( + &self.buffer_pool, &header, checksums, header.page_size, @@ -1218,14 +1218,9 @@ impl WalFile { buffer_pool: Arc, ) -> Self { let checkpoint_page = Arc::new(Page::new(0)); - let buffer = buffer_pool.get(); + let buffer = buffer_pool.get_page(); { - let buffer_pool = buffer_pool.clone(); - let drop_fn = Rc::new(move |buf| { - buffer_pool.put(buf); - }); - checkpoint_page.get().contents = - Some(PageContent::new(0, Arc::new(Buffer::new(buffer, drop_fn)))); + checkpoint_page.get().contents = Some(PageContent::new(0, Arc::new(buffer))); } let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() }; diff --git a/core/vdbe/execute.rs b/core/vdbe/execute.rs index 10415eb64..f87253f4e 100644 --- a/core/vdbe/execute.rs +++ b/core/vdbe/execute.rs @@ -3,6 +3,7 @@ use crate::function::AlterTableFunc; use crate::numeric::{NullableInteger, Numeric}; use crate::schema::Table; use crate::storage::btree::{integrity_check, IntegrityCheckError, IntegrityCheckState}; +use crate::storage::buffer_pool::BUFFER_POOL; use crate::storage::database::DatabaseFile; use crate::storage::page_cache::DumbLruPageCache; use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState}; @@ -62,9 +63,7 @@ use crate::{ vector::{vector32, vector64, vector_distance_cos, vector_distance_l2, vector_extract}, }; -use crate::{ - info, turso_assert, BufferPool, OpenFlags, RefValue, Row, StepResult, TransactionState, -}; +use crate::{info, turso_assert, OpenFlags, RefValue, Row, StepResult, TransactionState}; use 
super::{ insn::{Cookie, RegisterOrLiteral}, @@ -6460,7 +6459,7 @@ pub fn op_open_ephemeral( .block(|| pager.with_header(|header| header.page_size))? .get(); - let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize))); + let buffer_pool = BUFFER_POOL.get().expect("Buffer pool not initialized"); let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default())); let pager = Rc::new(Pager::new( @@ -6478,7 +6477,6 @@ pub fn op_open_ephemeral( .block(|| pager.with_header(|header| header.page_size)) .unwrap_or_default() .get() as usize; - buffer_pool.set_page_size(page_size); state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager }; } diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 3ee943555..f27b57f54 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -9,7 +9,7 @@ use tempfile; use crate::{ error::LimboError, - io::{Buffer, BufferData, Completion, File, OpenFlags, IO}, + io::{Buffer, Completion, File, OpenFlags, IO}, storage::sqlite3_ondisk::{read_varint, varint_len, write_varint}, translate::collate::CollationSeq, turso_assert, @@ -364,8 +364,7 @@ impl SortedChunk { let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get(); let read_buffer_size = read_buffer_size.min(self.chunk_size - self.total_bytes_read.get()); - let drop_fn = Rc::new(|_buffer: BufferData| {}); - let read_buffer = Buffer::allocate(read_buffer_size, drop_fn); + let read_buffer = Buffer::new_temporary(read_buffer_size); let read_buffer_ref = Arc::new(read_buffer); let chunk_io_state_copy = self.io_state.clone(); @@ -414,8 +413,7 @@ impl SortedChunk { self.chunk_size += size_len + record_size; } - let drop_fn = Rc::new(|_buffer: BufferData| {}); - let buffer = Buffer::allocate(self.chunk_size, drop_fn); + let buffer = Buffer::new_temporary(self.chunk_size); let mut buf_pos = 0; let buf = buffer.as_mut_slice(); diff --git a/packages/turso-sync-engine/src/database_sync_operations.rs b/packages/turso-sync-engine/src/database_sync_operations.rs index f53d0cd8b..0e8d145a6 100644 --- a/packages/turso-sync-engine/src/database_sync_operations.rs +++ b/packages/turso-sync-engine/src/database_sync_operations.rs @@ -1,4 +1,4 @@ -use std::{rc::Rc, sync::Arc}; +use std::sync::Arc; use turso_core::{types::Text, Buffer, Completion, LimboError, Value}; @@ -48,7 +48,7 @@ pub async fn db_bootstrap( let content_len = chunk.len(); // todo(sivukhin): optimize allocations here #[allow(clippy::arc_with_non_send_sync)] - let buffer = Arc::new(Buffer::allocate(chunk.len(), Rc::new(|_| {}))); + let buffer = Arc::new(Buffer::new_temporary(chunk.len())); buffer.as_mut_slice().copy_from_slice(chunk); let mut completions = Vec::with_capacity(dbs.len()); for db in dbs { diff --git a/scripts/limbo-sqlite3 b/scripts/limbo-sqlite3 index 1e8e63290..ce742f795 100755 --- a/scripts/limbo-sqlite3 +++ b/scripts/limbo-sqlite3 @@ -11,7 +11,7 @@ EXPERIMENTAL_FLAGS="" # if RUST_LOG is non-empty, enable tracing output if [ -n "$RUST_LOG" ]; then - "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" + TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@" else - "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" + TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@" fi diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index e98b762ed..0d174e50c 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -1,7 +1,6 @@ use std::{ 
collections::{HashMap, HashSet}, path::Path, - pin::Pin, rc::Rc, sync::Arc, }; @@ -428,10 +427,13 @@ impl BTreeGenerator<'_> { } fn write_at(io: &impl IO, file: Arc, offset: usize, data: &[u8]) { - let completion = Completion::new_write(|_| {}); - let drop_fn = Rc::new(move |_| {}); #[allow(clippy::arc_with_non_send_sync)] - let buffer = Arc::new(Buffer::new(Pin::new(data.to_vec()), drop_fn)); + let buffer = Arc::new(Buffer::new(data.to_vec())); + let _buf = buffer.clone(); + let completion = Completion::new_write(move |_| { + // reference the buffer to keep alive for async io + let _buf = _buf.clone(); + }); let result = file.pwrite(offset, buffer, completion).unwrap(); while !result.is_completed() { io.run_once().unwrap();
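For context on the temporary-buffer path that this patch relies on throughout (`Buffer::new_temporary` backed by the thread-local `TempBufferCache`), below is a minimal standalone sketch of the same recycle-on-drop idea. The names `TempCache`, `get_or_alloc` and `recycle` are illustrative only; the patch's cache additionally keys buffers by page size versus WAL-frame size and hooks the recycle step into `Drop for Buffer`.

use std::cell::RefCell;
use std::pin::Pin;

// Pinned, boxed byte buffer, mirroring the new `BufferData` alias in core/io/mod.rs.
type BufferData = Pin<Box<[u8]>>;

struct TempCache {
    size: usize,              // only buffers of exactly this size are recycled
    buffers: Vec<BufferData>, // returned buffers waiting to be handed out again
    max_cached: usize,        // cap so the cache cannot grow without bound
}

thread_local! {
    // One cache per thread avoids locking on the temporary-buffer fast path.
    static TEMP_CACHE: RefCell<TempCache> = RefCell::new(TempCache {
        size: 4096,
        buffers: Vec::new(),
        max_cached: 256,
    });
}

// Hand out a cached buffer when one of the requested size is available,
// otherwise fall back to a fresh zeroed allocation.
fn get_or_alloc(len: usize) -> BufferData {
    TEMP_CACHE.with(|c| {
        let mut c = c.borrow_mut();
        if len == c.size {
            if let Some(buf) = c.buffers.pop() {
                return buf;
            }
        }
        Pin::new(vec![0u8; len].into_boxed_slice())
    })
}

// In the patch this happens in `Drop for Buffer`: keep the allocation for reuse
// when it matches the cached size and the cache is not yet full.
fn recycle(buf: BufferData) {
    TEMP_CACHE.with(|c| {
        let mut c = c.borrow_mut();
        if buf.len() == c.size && c.buffers.len() < c.max_cached {
            c.buffers.push(buf);
        }
    });
}

fn main() {
    let first = get_or_alloc(4096);
    recycle(first);
    let second = get_or_alloc(4096); // re-uses the allocation returned above
    assert_eq!(second.len(), 4096);
}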