diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 29fc31ade..64204b309 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -354,7 +354,7 @@ impl WrappedIOUring { // If we have coalesced everything into a single iovec, submit as a single`pwrite` if iov_count == 1 { let entry = with_fd!(st.file_id, |fd| { - if let Some(id) = st.bufs[st.current_buffer_idx].borrow().fixed_id() { + if let Some(id) = st.bufs[st.current_buffer_idx].fixed_id() { io_uring::opcode::WriteFixed::new( fd, iov_allocation[0].iov_base as *const u8, @@ -657,10 +657,10 @@ impl File for UringFile { fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { let mut io = self.io.borrow_mut(); let write = { - let ptr = buf.as_ptr(); - let len = buf.len(); + let ptr = buffer.as_ptr(); + let len = buffer.len(); with_fd!(self, |fd| { - if let Some(idx) = buf.fixed_id() { + if let Some(idx) = buffer.fixed_id() { trace!( "pwrite_fixed(pos = {}, length = {}, idx= {})", pos, diff --git a/core/io/mod.rs b/core/io/mod.rs index f637747e7..66dbae949 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -331,6 +331,9 @@ impl Buffer { tracing::trace!("buffer::new({:?})", data); Self::Heap(Pin::new(data.into_boxed_slice())) } + + /// Returns the index of the underlying `Arena` if it was registered with + /// io_uring. Only for use with `UringIO` backend. pub fn fixed_id(&self) -> Option { match self { Self::Heap { .. } => None, @@ -376,16 +379,16 @@ impl Buffer { #[allow(clippy::mut_from_ref)] pub fn as_mut_slice(&self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len()) } + } + #[inline] + pub fn as_ptr(&self) -> *const u8 { match self { - Self::Heap(buf) => { - // SAFETY: The buffer is guaranteed to be valid for the lifetime of the slice - unsafe { std::slice::from_raw_parts_mut(buf.as_ptr() as *mut u8, buf.len()) } - } - Self::Pooled(buf) => unsafe { - std::slice::from_raw_parts_mut(buf.as_ptr() as *mut u8, buf.len()) - }, + Self::Heap(buf) => buf.as_ptr(), + Self::Pooled(buf) => buf.as_ptr(), } } + #[inline] pub fn as_mut_ptr(&self) -> *mut u8 { match self { Self::Heap(buf) => buf.as_ptr() as *mut u8, @@ -399,24 +402,34 @@ thread_local! { pub static TEMP_BUFFER_CACHE: RefCell = RefCell::new(TempBufferCache::new()); } +/// A cache for temporary or any additional `Buffer` allocations beyond +/// what the `BufferPool` has room for, or for use before the pool is +/// fully initialized. pub(crate) struct TempBufferCache { - // Buffers indexed by size, we only cache common sizes + /// The `[Database::page_size]` at the time the cache is initiated. page_size: usize, + /// Cache of buffers of size `self.page_size`. page_buffers: Vec, + /// Cache of buffers of size `self.page_size` + WAL_FRAME_HEADER_SIZE. wal_frame_buffers: Vec, + /// Maximum number of buffers that will live in each cache. max_cached: usize, } impl TempBufferCache { + const DEFAULT_MAX_CACHE_SIZE: usize = 256; + fn new() -> Self { Self { page_size: BufferPool::DEFAULT_PAGE_SIZE, page_buffers: Vec::with_capacity(8), wal_frame_buffers: Vec::with_capacity(8), - max_cached: 512, + max_cached: Self::DEFAULT_MAX_CACHE_SIZE, } } + /// If the `[Database::page_size]` is set, any temporary buffers that might + /// exist prior need to be cleared and new `page_size` needs to be saved. pub fn reinit_cache(&mut self, page_size: usize) { self.page_buffers.clear(); self.wal_frame_buffers.clear(); diff --git a/core/storage/page_cache.rs b/core/storage/page_cache.rs index 2297ab322..d1d6499b6 100644 --- a/core/storage/page_cache.rs +++ b/core/storage/page_cache.rs @@ -626,7 +626,7 @@ mod tests { use crate::storage::sqlite3_ondisk::PageContent; use crate::BufferPool; use std::ptr::NonNull; - use std::{num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc}; + use std::{num::NonZeroUsize, sync::Arc}; use lru::LruCache; use rand_chacha::{ diff --git a/core/storage/pager.rs b/core/storage/pager.rs index a736a1805..d44369632 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -14,7 +14,7 @@ use crate::{ Result, TransactionState, }; use parking_lot::RwLock; -use std::cell::{Cell, OnceCell, UnsafeCell}; +use std::cell::{Cell, OnceCell, RefCell, UnsafeCell}; use std::collections::HashSet; use std::hash; use std::rc::Rc; @@ -790,7 +790,7 @@ impl Pager { )?; turso_assert!( - ptrmap_page.get().id == ptrmap_pg_no as usize, + ptrmap_page.get().id == ptrmap_pg_no, "ptrmap page has unexpected number" ); self.add_dirty(&ptrmap_page); diff --git a/core/storage/wal.rs b/core/storage/wal.rs index ccc92677f..f69700545 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -927,7 +927,7 @@ impl Wal for WalFile { bytes_read == buf_len as i32, "read({bytes_read}) != expected({buf_len})" ); - let buf_ptr = buf.as_ptr(); + let buf_ptr = buf.as_mut_ptr(); unsafe { std::ptr::copy_nonoverlapping(buf_ptr, frame_ptr, frame_len); } diff --git a/tests/integration/query_processing/test_btree.rs b/tests/integration/query_processing/test_btree.rs index 0e6571698..0d174e50c 100644 --- a/tests/integration/query_processing/test_btree.rs +++ b/tests/integration/query_processing/test_btree.rs @@ -430,7 +430,7 @@ fn write_at(io: &impl IO, file: Arc, offset: usize, data: &[u8]) { #[allow(clippy::arc_with_non_send_sync)] let buffer = Arc::new(Buffer::new(data.to_vec())); let _buf = buffer.clone(); - let completion = Completion::new_write(|_| { + let completion = Completion::new_write(move |_| { // reference the buffer to keep alive for async io let _buf = _buf.clone(); });