Merge 'BufferPool: add arena backed pool to support fixed opcodes and coalescing' from Preston Thorpe

This PR introduces a BufferPool which allocates (usually) all of the
memory we will need up front as two large arenas (one for WAL frames and
one for DB pages) and hands out buffers from those two arena allocations.
Each arena is 3 MB by default, because RLIMIT_MEMLOCK is typically 8 MB;
without raising that limit (or granting `cap_ipc_lock` via `setcap`), we
would not be able to register larger arenas with io_uring.
Any additional memory needed falls back to the previous style of buffer
pool, now backed by a simple thread-local cache, which should be slightly
faster for most cases.

Closes #2419
Preston Thorpe
2025-08-08 14:38:21 -04:00
committed by GitHub
15 changed files with 803 additions and 194 deletions
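
The two-phase pool setup described above can be illustrated with a short sketch. This is not code from the PR itself; it only strings together the API that appears in the diffs below (`BufferPool::begin_init`, `finalize_with_page_size`, `get_page`, `get_wal_frame`), and the `io` handle and the 4096-byte page size are placeholders.

```rust
use std::sync::Arc;

// Hypothetical usage sketch of the two-phase initialization added by this PR.
// `IO`, `BufferPool`, and `crate::Result` are the crate's own items.
fn open_pool(io: &Arc<dyn IO>) -> crate::Result<()> {
    // Phase 1: create the global pool and remember the IO handle; no arena is
    // mmap'd or registered yet because the page size may still be unknown.
    let pool = BufferPool::begin_init(io, BufferPool::DEFAULT_ARENA_SIZE);

    // Phase 2: once the page size is known (e.g. read from the DB header),
    // allocate both arenas (DB pages and WAL frames) and register them with
    // io_uring when the backend supports it.
    let pool = pool.finalize_with_page_size(4096)?;

    // Buffers now come out of the arenas; anything the arenas cannot serve
    // falls back to the thread-local temporary-buffer cache.
    let page = pool.get_page();       // page_size bytes
    let frame = pool.get_wal_frame(); // page_size + 24-byte WAL frame header
    drop((page, frame));              // dropping returns the pages to the arena
    Ok(())
}
```

With two 3 MB arenas the pool locks roughly 6 MB in total, which stays under the typical 8 MB RLIMIT_MEMLOCK budget mentioned above.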

View File

@@ -5,6 +5,7 @@ use crate::io::clock::{Clock, Instant};
use crate::storage::wal::CKPT_BATCH_PAGES;
use crate::{turso_assert, LimboError, MemoryIO, Result};
use rustix::fs::{self, FlockOperation, OFlags};
use std::ptr::NonNull;
use std::{
cell::RefCell,
collections::{HashMap, VecDeque},
@@ -32,7 +33,7 @@ const FILES: u32 = 8;
const IOVEC_POOL_SIZE: usize = 64;
/// Maximum number of iovec entries per writev operation.
/// IOV_MAX is typically 1024, but we limit it to a smaller number
/// IOV_MAX is typically 1024
const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES;
/// Maximum number of I/O operations to wait for in a single run,
@@ -40,6 +41,9 @@ const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES;
/// make, but can increase single operation latency.
const MAX_WAIT: usize = 4;
/// One memory arena for DB pages and another for WAL frames
const ARENA_COUNT: usize = 2;
pub struct UringIO {
inner: Rc<RefCell<InnerUringIO>>,
}
@@ -57,6 +61,7 @@ struct WrappedIOUring {
struct InnerUringIO {
ring: WrappedIOUring,
free_files: VecDeque<u32>,
free_arenas: [Option<(NonNull<u8>, usize)>; ARENA_COUNT],
}
/// preallocated vec of iovec arrays to avoid allocations during writev operations
@@ -108,7 +113,8 @@ impl UringIO {
// RL_MEMLOCK cap is typically 8MB, the current design is to have one large arena
// registered at startup and therefore we can simply use the zero index, falling back
// to similar logic as the existing buffer pool for cases where it is over capacity.
ring.submitter().register_buffers_sparse(1)?;
ring.submitter()
.register_buffers_sparse(ARENA_COUNT as u32)?;
let inner = InnerUringIO {
ring: WrappedIOUring {
ring,
@@ -117,6 +123,7 @@ impl UringIO {
iov_pool: IovecPool::new(),
},
free_files: (0..FILES).collect(),
free_arenas: [const { None }; ARENA_COUNT],
};
debug!("Using IO backend 'io-uring'");
Ok(Self {
@@ -263,6 +270,18 @@ impl InnerUringIO {
self.free_files.push_back(id);
Ok(())
}
#[cfg(debug_assertions)]
fn debug_check_fixed(&self, idx: u32, ptr: *const u8, len: usize) {
let (base, blen) = self.free_arenas[idx as usize].expect("slot not registered");
let start = base.as_ptr() as usize;
let end = start + blen;
let p = ptr as usize;
turso_assert!(
p >= start && p + len <= end,
"Fixed operation, pointer out of registered range"
);
}
}
impl WrappedIOUring {
@@ -333,21 +352,34 @@ impl WrappedIOUring {
break;
}
}
// If we have coalesced everything into a single iovec, submit as a single `pwrite`
if iov_count == 1 {
// If we have coalesced everything into a single iovec, submit as a single `pwrite`
let entry = with_fd!(st.file_id, |fd| {
io_uring::opcode::Write::new(
fd,
iov_allocation[0].iov_base as *const u8,
iov_allocation[0].iov_len as u32,
)
.offset(st.file_pos as u64)
.build()
.user_data(key)
if let Some(id) = st.bufs[st.current_buffer_idx].fixed_id() {
io_uring::opcode::WriteFixed::new(
fd,
iov_allocation[0].iov_base as *const u8,
iov_allocation[0].iov_len as u32,
id as u16,
)
.offset(st.file_pos as u64)
.build()
.user_data(key)
} else {
io_uring::opcode::Write::new(
fd,
iov_allocation[0].iov_base as *const u8,
iov_allocation[0].iov_len as u32,
)
.offset(st.file_pos as u64)
.build()
.user_data(key)
}
});
self.submit_entry(&entry);
return;
}
// Store the pointers and get the pointer to the iovec array that we pass
// to the writev operation, and keep the array itself alive
let ptr = iov_allocation.as_ptr() as *mut libc::iovec;
@@ -477,16 +509,20 @@ impl IO for UringIO {
Arc::new(MemoryIO::new())
}
fn register_fixed_buffer(&self, ptr: std::ptr::NonNull<u8>, len: usize) -> Result<()> {
fn register_fixed_buffer(&self, ptr: std::ptr::NonNull<u8>, len: usize) -> Result<u32> {
turso_assert!(
len % 512 == 0,
"fixed buffer length must be logical block aligned"
);
let inner = self.inner.borrow_mut();
let default_id = 0;
let mut inner = self.inner.borrow_mut();
let slot = inner
.free_arenas
.iter()
.position(|e| e.is_none())
.ok_or_else(|| LimboError::UringIOError("no free fixed buffer slots".into()))?;
unsafe {
inner.ring.ring.submitter().register_buffers_update(
default_id,
slot as u32,
&[libc::iovec {
iov_base: ptr.as_ptr() as *mut libc::c_void,
iov_len: len,
@@ -494,7 +530,8 @@ impl IO for UringIO {
None,
)?
};
Ok(())
inner.free_arenas[slot] = Some((ptr, len));
Ok(slot as u32)
}
}
@@ -540,7 +577,6 @@ impl UringFile {
self.id
}
}
unsafe impl Send for UringFile {}
unsafe impl Sync for UringFile {}
@@ -584,17 +620,35 @@ impl File for UringFile {
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
let r = c.as_read();
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
let mut io = self.io.borrow_mut();
let read_e = {
let buf = r.buf();
let ptr = buf.as_mut_ptr();
let len = buf.len();
let buf = buf.as_mut_ptr();
with_fd!(self, |fd| {
io_uring::opcode::Read::new(fd, buf, len as u32)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
if let Some(idx) = buf.fixed_id() {
trace!(
"pread_fixed(pos = {}, length = {}, idx = {})",
pos,
len,
idx
);
#[cfg(debug_assertions)]
{
io.debug_check_fixed(idx, ptr, len);
}
io_uring::opcode::ReadFixed::new(fd, ptr, len as u32, idx as u16)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
} else {
trace!("pread(pos = {}, length = {})", pos, len);
// Use Read opcode if fixed buffer is not available
io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
}
})
};
io.ring.submit_entry(&read_e);
@@ -604,12 +658,31 @@ impl File for UringFile {
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
let mut io = self.io.borrow_mut();
let write = {
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
let ptr = buffer.as_ptr();
let len = buffer.len();
with_fd!(self, |fd| {
io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
if let Some(idx) = buffer.fixed_id() {
trace!(
"pwrite_fixed(pos = {}, length = {}, idx= {})",
pos,
len,
idx
);
#[cfg(debug_assertions)]
{
io.debug_check_fixed(idx, ptr, len);
}
io_uring::opcode::WriteFixed::new(fd, ptr, len as u32, idx as u16)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
} else {
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
io_uring::opcode::Write::new(fd, ptr, len as u32)
.offset(pos as u64)
.build()
.user_data(get_key(c.clone()))
}
})
};
io.ring.submit_entry(&write);
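
Distilled from the `pread`/`pwrite` hunks above, the opcode selection comes down to one question: does the buffer report a `fixed_id`, i.e. does it live inside an arena that was registered with the ring? A minimal standalone sketch, assuming the `io-uring` crate used in this file and placeholder fd/pointer/offset values:

```rust
use io_uring::{opcode, squeue, types};

// If the buffer belongs to a registered arena, use the *Fixed opcode and pass
// the arena's slot index as buf_index so the kernel can skip per-op page
// pinning; otherwise fall back to the plain opcode.
fn build_read(fd: types::Fd, ptr: *mut u8, len: u32, pos: u64, fixed_id: Option<u32>) -> squeue::Entry {
    match fixed_id {
        Some(idx) => opcode::ReadFixed::new(fd, ptr, len, idx as u16)
            .offset(pos)
            .build(),
        None => opcode::Read::new(fd, ptr, len)
            .offset(pos)
            .build(),
    }
}
```

The real code additionally tags the entry with `user_data(get_key(c))` before submission and, in debug builds, asserts that the pointer range lies inside the registered arena via `debug_check_fixed`.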

View File

@@ -1,10 +1,13 @@
use crate::Result;
use crate::storage::buffer_pool::ArenaBuffer;
use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
use crate::{BufferPool, Result};
use bitflags::bitflags;
use cfg_block::cfg_block;
use std::cell::RefCell;
use std::fmt;
use std::ptr::NonNull;
use std::sync::Arc;
use std::{cell::Cell, fmt::Debug, mem::ManuallyDrop, pin::Pin, rc::Rc};
use std::{cell::Cell, fmt::Debug, pin::Pin};
pub trait File: Send + Sync {
fn lock_file(&self, exclusive: bool) -> Result<()>;
@@ -18,6 +21,9 @@ pub trait File: Send + Sync {
c.complete(0);
return Ok(c);
}
if buffers.len() == 1 {
return self.pwrite(pos, buffers[0].clone(), c);
}
// naive default implementation can be overridden on backends where it makes sense to
let mut pos = pos;
let outstanding = Arc::new(AtomicUsize::new(buffers.len()));
@@ -68,7 +74,7 @@ pub trait File: Send + Sync {
while pos < file_size {
let chunk_size = (file_size - pos).min(BUFFER_SIZE);
// Read from source
let read_buffer = Arc::new(Buffer::allocate(chunk_size, Rc::new(|_| {})));
let read_buffer = Arc::new(Buffer::new_temporary(chunk_size));
let read_completion = self.pread(
pos,
Completion::new_read(read_buffer.clone(), move |_, _| {}),
@@ -119,7 +125,7 @@ pub trait IO: Clock + Send + Sync {
fn get_memory_io(&self) -> Arc<MemoryIO>;
fn register_fixed_buffer(&self, _ptr: NonNull<u8>, _len: usize) -> Result<()> {
fn register_fixed_buffer(&self, _ptr: NonNull<u8>, _len: usize) -> Result<u32> {
Err(crate::LimboError::InternalError(
"unsupported operation".to_string(),
))
@@ -290,62 +296,164 @@ impl TruncateCompletion {
}
}
pub type BufferData = Pin<Vec<u8>>;
pub type BufferData = Pin<Box<[u8]>>;
pub type BufferDropFn = Rc<dyn Fn(BufferData)>;
pub struct Buffer {
data: ManuallyDrop<BufferData>,
drop: BufferDropFn,
pub enum Buffer {
Heap(BufferData),
Pooled(ArenaBuffer),
}
impl Debug for Buffer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.data)
match self {
Self::Pooled(p) => write!(f, "{p:?}"),
Self::Heap(buf) => write!(f, "{buf:?}: {}", buf.len()),
}
}
}
impl Drop for Buffer {
fn drop(&mut self) {
let data = unsafe { ManuallyDrop::take(&mut self.data) };
(self.drop)(data);
let len = self.len();
if let Self::Heap(buf) = self {
TEMP_BUFFER_CACHE.with(|cache| {
let mut cache = cache.borrow_mut();
// take ownership of the buffer by swapping it with a dummy
let buffer = std::mem::replace(buf, Pin::new(vec![].into_boxed_slice()));
cache.return_buffer(buffer, len);
});
}
}
}
impl Buffer {
pub fn allocate(size: usize, drop: BufferDropFn) -> Self {
let data = ManuallyDrop::new(Pin::new(vec![0; size]));
Self { data, drop }
pub fn new(data: Vec<u8>) -> Self {
tracing::trace!("buffer::new({:?})", data);
Self::Heap(Pin::new(data.into_boxed_slice()))
}
pub fn new(data: BufferData, drop: BufferDropFn) -> Self {
let data = ManuallyDrop::new(data);
Self { data, drop }
/// Returns the index of the underlying `Arena` if it was registered with
/// io_uring. Only for use with `UringIO` backend.
pub fn fixed_id(&self) -> Option<u32> {
match self {
Self::Heap { .. } => None,
Self::Pooled(buf) => buf.fixed_id(),
}
}
pub fn new_pooled(buf: ArenaBuffer) -> Self {
tracing::trace!("new_pooled({:?})", buf);
Self::Pooled(buf)
}
pub fn new_temporary(size: usize) -> Self {
TEMP_BUFFER_CACHE.with(|cache| {
if let Some(buffer) = cache.borrow_mut().get_buffer(size) {
Self::Heap(buffer)
} else {
Self::Heap(Pin::new(vec![0; size].into_boxed_slice()))
}
})
}
pub fn len(&self) -> usize {
self.data.len()
match self {
Self::Heap(buf) => buf.len(),
Self::Pooled(buf) => buf.logical_len(),
}
}
pub fn is_empty(&self) -> bool {
self.data.is_empty()
self.len() == 0
}
pub fn as_slice(&self) -> &[u8] {
&self.data
match self {
Self::Heap(buf) => {
// SAFETY: The buffer is guaranteed to be valid for the lifetime of the slice
unsafe { std::slice::from_raw_parts(buf.as_ptr(), buf.len()) }
}
Self::Pooled(buf) => buf,
}
}
#[allow(clippy::mut_from_ref)]
pub fn as_mut_slice(&self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.data.len()) }
unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len()) }
}
#[inline]
pub fn as_ptr(&self) -> *const u8 {
self.data.as_ptr()
match self {
Self::Heap(buf) => buf.as_ptr(),
Self::Pooled(buf) => buf.as_ptr(),
}
}
#[inline]
pub fn as_mut_ptr(&self) -> *mut u8 {
match self {
Self::Heap(buf) => buf.as_ptr() as *mut u8,
Self::Pooled(buf) => buf.as_ptr() as *mut u8,
}
}
}
thread_local! {
/// thread local cache to re-use temporary buffers to prevent churn when pool overflows
pub static TEMP_BUFFER_CACHE: RefCell<TempBufferCache> = RefCell::new(TempBufferCache::new());
}
/// A cache for temporary or any additional `Buffer` allocations beyond
/// what the `BufferPool` has room for, or for use before the pool is
/// fully initialized.
pub(crate) struct TempBufferCache {
/// The `[Database::page_size]` at the time the cache is initiated.
page_size: usize,
/// Cache of buffers of size `self.page_size`.
page_buffers: Vec<BufferData>,
/// Cache of buffers of size `self.page_size` + WAL_FRAME_HEADER_SIZE.
wal_frame_buffers: Vec<BufferData>,
/// Maximum number of buffers that will live in each cache.
max_cached: usize,
}
impl TempBufferCache {
const DEFAULT_MAX_CACHE_SIZE: usize = 256;
fn new() -> Self {
Self {
page_size: BufferPool::DEFAULT_PAGE_SIZE,
page_buffers: Vec::with_capacity(8),
wal_frame_buffers: Vec::with_capacity(8),
max_cached: Self::DEFAULT_MAX_CACHE_SIZE,
}
}
pub fn as_mut_ptr(&self) -> *mut u8 {
self.data.as_ptr() as *mut u8
/// If the `[Database::page_size]` is set, any temporary buffers that might
/// exist prior need to be cleared and new `page_size` needs to be saved.
pub fn reinit_cache(&mut self, page_size: usize) {
self.page_buffers.clear();
self.wal_frame_buffers.clear();
self.page_size = page_size;
}
fn get_buffer(&mut self, size: usize) -> Option<BufferData> {
match size {
sz if sz == self.page_size => self.page_buffers.pop(),
sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(),
_ => None,
}
}
fn return_buffer(&mut self, buff: BufferData, len: usize) {
let sz = self.page_size;
let cache = match len {
n if n.eq(&sz) => &mut self.page_buffers,
n if n.eq(&(sz + WAL_FRAME_HEADER_SIZE)) => &mut self.wal_frame_buffers,
_ => return,
};
if self.max_cached > cache.len() {
cache.push(buff);
}
}
}
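
The `Drop`/`new_temporary` pair above gives heap buffers a simple recycling path. Below is an illustrative sketch of the round trip, assuming the default 4096-byte page size; the pointer reuse is an implementation detail of the thread-local cache, not a guarantee.

```rust
// Page-sized (and WAL-frame-sized) temporary buffers are parked in the
// thread-local TEMP_BUFFER_CACHE on drop and handed back by the next
// new_temporary() of the same size; other sizes bypass the cache entirely.
fn temp_buffer_round_trip() {
    let a = Buffer::new_temporary(4096);
    let a_addr = a.as_ptr() as usize;
    drop(a); // returned to this thread's cache, not freed

    let b = Buffer::new_temporary(4096);
    // Very likely the same backing allocation as `a` on this thread.
    let _reused = (b.as_ptr() as usize) == a_addr;

    let c = Buffer::new_temporary(100); // not page-sized: allocated fresh, dropped for real
    drop((b, c));
}
```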

View File

@@ -380,6 +380,11 @@ impl Database {
}
fn init_pager(&self, page_size: Option<usize>) -> Result<Pager> {
let arena_size = if std::env::var("TESTING").is_ok_and(|v| v.eq_ignore_ascii_case("true")) {
BufferPool::TEST_ARENA_SIZE
} else {
BufferPool::DEFAULT_ARENA_SIZE
};
// Open existing WAL file if present
let mut maybe_shared_wal = self.maybe_shared_wal.write();
if let Some(shared_wal) = maybe_shared_wal.clone() {
@@ -387,7 +392,8 @@ impl Database {
None => unsafe { (*shared_wal.get()).page_size() as usize },
Some(size) => size,
};
let buffer_pool = Arc::new(BufferPool::new(Some(size)));
let buffer_pool =
BufferPool::begin_init(&self.io, arena_size).finalize_with_page_size(size)?;
let db_state = self.db_state.clone();
let wal = Rc::new(RefCell::new(WalFile::new(
@@ -406,8 +412,8 @@ impl Database {
)?;
return Ok(pager);
}
let buffer_pool = BufferPool::begin_init(&self.io, arena_size);
let buffer_pool = Arc::new(BufferPool::new(page_size));
// No existing WAL; create one.
let db_state = self.db_state.clone();
let mut pager = Pager::new(
@@ -423,16 +429,15 @@ impl Database {
let size = match page_size {
Some(size) => size as u32,
None => {
let size = pager
pager // if None is passed in, we know that we already initialized so we can safely call `with_header` here
.io
.block(|| pager.with_header(|header| header.page_size))
.unwrap_or_default()
.get();
buffer_pool.set_page_size(size as usize);
size
.get()
}
};
pager.page_size.set(Some(size));
let wal_path = format!("{}-wal", self.path);
let file = self.io.open_file(&wal_path, OpenFlags::Create, false)?;
let real_shared_wal = WalFileShared::new_shared(size, &self.io, file)?;
@@ -1373,6 +1378,7 @@ impl Connection {
}
*self._db.maybe_shared_wal.write() = None;
self.pager.borrow_mut().clear_page_cache();
let pager = self._db.init_pager(Some(size as usize))?;
self.pager.replace(Rc::new(pager));
self.pager.borrow().set_initial_page_size(size);

View File

@@ -7215,7 +7215,6 @@ mod tests {
use tempfile::TempDir;
use crate::{
io::BufferData,
storage::{
btree::{compute_free_space, fill_cell_payload, payload_overflow_threshold_max},
sqlite3_ondisk::{BTreeCell, PageContent, PageType},
@@ -7230,11 +7229,7 @@ mod tests {
fn get_page(id: usize) -> BTreePage {
let page = Arc::new(Page::new(id));
let drop_fn = Rc::new(|_| {});
let inner = PageContent::new(
0,
Arc::new(Buffer::new(BufferData::new(vec![0; 4096]), drop_fn)),
);
let inner = PageContent::new(0, Arc::new(Buffer::new_temporary(4096)));
page.get().contents.replace(inner);
let page = Arc::new(BTreePageInner {
page: RefCell::new(page),
@@ -8455,21 +8450,15 @@ mod tests {
fn setup_test_env(database_size: u32) -> Rc<Pager> {
let page_size = 512;
let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize)));
// Initialize buffer pool with correctly sized buffers
for _ in 0..10 {
let vec = vec![0; page_size as usize]; // Initialize with correct length, not just capacity
buffer_pool.put(Pin::new(vec));
}
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
let buffer_pool = BufferPool::begin_init(&io, page_size * 128);
let db_file = Arc::new(DatabaseFile::new(
io.open_file(":memory:", OpenFlags::Create, false).unwrap(),
));
let wal_file = io.open_file("test.wal", OpenFlags::Create, false).unwrap();
let wal_shared = WalFileShared::new_shared(page_size, &io, wal_file).unwrap();
let wal_shared = WalFileShared::new_shared(page_size as u32, &io, wal_file).unwrap();
let wal = Rc::new(RefCell::new(WalFile::new(
io.clone(),
wal_shared,
@@ -8499,7 +8488,9 @@ mod tests {
pager
.io
.block(|| {
pager.with_header_mut(|header| header.page_size = PageSize::new(page_size).unwrap())
pager.with_header_mut(|header| {
header.page_size = PageSize::new(page_size as u32).unwrap()
})
})
.unwrap();
@@ -8522,16 +8513,17 @@ mod tests {
// Setup overflow pages (2, 3, 4) with linking
let mut current_page = 2u32;
while current_page <= 4 {
let drop_fn = Rc::new(|_buf| {});
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(Buffer::allocate(
let buf = Arc::new(Buffer::new_temporary(
pager
.io
.block(|| pager.with_header(|header| header.page_size))?
.get() as usize,
drop_fn,
));
let c = Completion::new_write(|_| {});
let _buf = buf.clone();
let c = Completion::new_write(move |_| {
let _ = _buf.clone();
});
let _c = pager
.db_file
.write_page(current_page as usize, buf.clone(), c)?;

View File

@@ -1,45 +1,497 @@
use crate::io::BufferData;
use crate::fast_lock::SpinLock;
use crate::io::TEMP_BUFFER_CACHE;
use crate::storage::page_bitmap::PageBitmap;
use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
use crate::{turso_assert, Buffer, LimboError, IO};
use parking_lot::Mutex;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::cell::UnsafeCell;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use super::sqlite3_ondisk::PageSize;
pub static BUFFER_POOL: OnceLock<Arc<BufferPool>> = OnceLock::new();
pub struct BufferPool {
pub free_buffers: Mutex<Vec<BufferData>>,
page_size: AtomicUsize,
#[derive(Debug)]
/// A buffer allocated from an arena from `[BufferPool]`
pub struct ArenaBuffer {
/// Pointer to the start of the buffer
ptr: NonNull<u8>,
/// Identifier for the `[Arena]` the buffer came from
arena_id: u32,
/// The index of the first page making up the buffer
page_idx: u32,
/// The requested length of the allocation.
/// The actual size of what is allocated for the
/// buffer is `len` rounded up to the next multiple of
/// `[Arena::page_size]`
len: usize,
}
impl BufferPool {
pub fn new(page_size: Option<usize>) -> Self {
Self {
free_buffers: Mutex::new(Vec::new()),
page_size: AtomicUsize::new(page_size.unwrap_or(PageSize::DEFAULT as usize)),
impl ArenaBuffer {
const fn new(ptr: NonNull<u8>, len: usize, arena_id: u32, page_idx: u32) -> Self {
ArenaBuffer {
ptr,
arena_id,
page_idx,
len,
}
}
pub fn set_page_size(&self, page_size: usize) {
self.page_size.store(page_size, Ordering::Relaxed);
#[inline(always)]
/// Returns the `id` of the underlying arena, only if it was registered with `io_uring`
pub const fn fixed_id(&self) -> Option<u32> {
// Arenas which are not registered will have `id`s >= UNREGISTERED_START
if self.arena_id < UNREGISTERED_START {
Some(self.arena_id)
} else {
None
}
}
pub fn get(&self) -> BufferData {
let buffer = self.free_buffers.lock().pop();
buffer.unwrap_or_else(|| Pin::new(vec![0; self.page_size.load(Ordering::Relaxed)]))
/// The requested size of the allocation, the actual size of the underlying buffer is rounded up to
/// the next multiple of the arena's page_size
pub const fn logical_len(&self) -> usize {
self.len
}
pub fn put(&self, buffer: BufferData) {
self.free_buffers.lock().push(buffer);
pub fn as_slice(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.logical_len()) }
}
pub fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.logical_len()) }
}
}
#[cfg(test)]
mod tests {
use super::*;
fn is_send_sync_static<T: Send + Sync + 'static>() {}
#[test]
fn test_send_sync() {
is_send_sync_static::<BufferPool>();
impl Drop for ArenaBuffer {
fn drop(&mut self) {
let pool = BUFFER_POOL
.get()
.expect("BufferPool not initialized, cannot free ArenaBuffer");
let inner = pool.inner_mut();
inner.free(self.logical_len(), self.arena_id, self.page_idx);
}
}
impl std::ops::Deref for ArenaBuffer {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
impl std::ops::DerefMut for ArenaBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
self.as_mut_slice()
}
}
/// Static Buffer pool managing multiple memory arenas
/// of which `[ArenaBuffer]`s are returned for requested allocations
pub struct BufferPool {
inner: UnsafeCell<PoolInner>,
}
unsafe impl Sync for BufferPool {}
unsafe impl Send for BufferPool {}
struct PoolInner {
/// An instance of the program's IO, used for registering
/// Arena's with io_uring.
io: Option<Arc<dyn IO>>,
/// An Arena which returns `ArenaBuffer`s of size `db_page_size`.
page_arena: Option<Arena>,
/// An Arena which returns `ArenaBuffer`s of size `db_page_size`
/// plus 24 byte `WAL_FRAME_HEADER_SIZE`, preventing the fragmentation
/// or complex book-keeping needed to use the same arena for both sizes.
wal_frame_arena: Option<Arena>,
/// A lock preventing concurrent initialization.
init_lock: Mutex<()>,
/// The size of each `Arena`, in bytes.
arena_size: AtomicUsize,
/// The `[Database::page_size]`, which the `page_arena` will use to
/// return buffers from `Self::get_page`.
db_page_size: AtomicUsize,
}
unsafe impl Sync for PoolInner {}
unsafe impl Send for PoolInner {}
impl Default for BufferPool {
fn default() -> Self {
Self::new(Self::DEFAULT_ARENA_SIZE)
}
}
impl BufferPool {
/// 3 MB default size for each `Arena`. Any higher and registering the
/// second arena with io_uring will fail, because the `RLIMIT_MEMLOCK` limit
/// for unprivileged processes is typically 8 MB total.
pub const DEFAULT_ARENA_SIZE: usize = 3 * 1024 * 1024;
/// 1 MB arena size for testing/CI
pub const TEST_ARENA_SIZE: usize = 1024 * 1024;
/// 4 KB default page size
pub const DEFAULT_PAGE_SIZE: usize = 4096;
/// 32 MB maximum size for each Arena (64 MB total across both arenas)
const MAX_ARENA_SIZE: usize = 32 * 1024 * 1024;
/// 64 KB minimum arena size
const MIN_ARENA_SIZE: usize = 1024 * 64;
fn new(arena_size: usize) -> Self {
turso_assert!(
(Self::MIN_ARENA_SIZE..Self::MAX_ARENA_SIZE).contains(&arena_size),
"Arena size needs to be between {}..{} bytes",
Self::MIN_ARENA_SIZE,
Self::MAX_ARENA_SIZE
);
Self {
inner: UnsafeCell::new(PoolInner {
page_arena: None,
wal_frame_arena: None,
arena_size: arena_size.into(),
db_page_size: Self::DEFAULT_PAGE_SIZE.into(),
init_lock: Mutex::new(()),
io: None,
}),
}
}
/// Request a `Buffer` of size `len`
#[inline]
pub fn allocate(len: usize) -> Buffer {
let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
pool.inner().allocate(len)
}
/// Request a `Buffer` the size of the `db_page_size` the `BufferPool` was initialized with.
#[inline]
pub fn get_page(&self) -> Buffer {
let inner = self.inner_mut();
inner.get_page()
}
/// Request a `Buffer` for use with a WAL frame,
/// `[Database::page_size] + `WAL_FRAME_HEADER_SIZE`
#[inline]
pub fn get_wal_frame(&self) -> Buffer {
let inner = self.inner_mut();
inner.get_frame()
}
#[inline]
fn inner(&self) -> &PoolInner {
unsafe { &*self.inner.get() }
}
#[inline]
#[allow(clippy::mut_from_ref)]
fn inner_mut(&self) -> &mut PoolInner {
unsafe { &mut *self.inner.get() }
}
/// Create a static `BufferPool` and initialize it to the default page size, **without**
/// populating the Arenas. Arenas will not be created until `[BufferPool::finalize_with_page_size]`,
/// and until then the pool hands out temporary buffers, which avoids reallocating the
/// arenas if the page size is later set to something other than the default value.
pub fn begin_init(io: &Arc<dyn IO>, arena_size: usize) -> Arc<Self> {
let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size)));
let inner = pool.inner_mut();
// Just store the IO handle, don't create arena yet
if inner.io.is_none() {
inner.io = Some(Arc::clone(io));
}
pool.clone()
}
/// Call when `[Database::db_state]` is initialized, providing the `page_size` to allocate
/// an arena for the pool. Before this call, the pool will use temporary buffers which are
/// cached in thread local storage.
pub fn finalize_with_page_size(&self, page_size: usize) -> crate::Result<Arc<Self>> {
let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
let inner = pool.inner_mut();
tracing::trace!("finalize page size called with size {page_size}");
if page_size != BufferPool::DEFAULT_PAGE_SIZE {
// so far we have handed out some temporary buffers, since the page size is not
// default, we need to clear the cache so they aren't reused for other operations.
TEMP_BUFFER_CACHE.with(|cache| {
cache.borrow_mut().reinit_cache(page_size);
});
}
if inner.page_arena.is_some() {
return Ok(pool.clone());
}
inner.db_page_size.store(page_size, Ordering::Relaxed);
inner.init_arenas()?;
Ok(pool.clone())
}
#[inline]
/// Marks the underlying pages for an `ArenaBuffer` on `Drop` as free and
/// available for use by another allocation.
pub fn free(&self, size: usize, arena_id: u32, page_id: u32) {
self.inner_mut().free(size, arena_id, page_id);
}
}
impl PoolInner {
/// Allocate a buffer of the given length from the pool, falling back to
/// temporary thread local buffers if the pool is not initialized or is full.
pub fn allocate(&self, len: usize) -> Buffer {
turso_assert!(len > 0, "Cannot allocate zero-length buffer");
let db_page_size = self.db_page_size.load(Ordering::Relaxed);
let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE;
// Check if this is exactly a WAL frame size allocation
if len == wal_frame_size {
return self
.wal_frame_arena
.as_ref()
.and_then(|wal_arena| wal_arena.try_alloc(len))
.unwrap_or(Buffer::new_temporary(len));
}
// For all other sizes, use regular arena
self.page_arena
.as_ref()
.and_then(|arena| arena.try_alloc(len))
.unwrap_or(Buffer::new_temporary(len))
}
fn get_page(&mut self) -> Buffer {
let db_page_size = self.db_page_size.load(Ordering::Relaxed);
self.page_arena
.as_ref()
.and_then(|arena| arena.try_alloc(db_page_size))
.unwrap_or(Buffer::new_temporary(db_page_size))
}
fn get_frame(&mut self) -> Buffer {
let len = self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE;
self.wal_frame_arena
.as_ref()
.and_then(|wal_arena| wal_arena.try_alloc(len))
.unwrap_or(Buffer::new_temporary(len))
}
/// Allocate the page and WAL-frame arenas for the pool to use
fn init_arenas(&mut self) -> crate::Result<()> {
// Prevent concurrent growth
let Some(_guard) = self.init_lock.try_lock() else {
tracing::debug!("Buffer pool is already growing, skipping initialization");
return Ok(()); // Already in progress
};
let arena_size = self.arena_size.load(Ordering::Relaxed);
let db_page_size = self.db_page_size.load(Ordering::Relaxed);
let io = self.io.as_ref().expect("Pool not initialized").clone();
// Create regular page arena
match Arena::new(db_page_size, arena_size, &io) {
Ok(arena) => {
tracing::trace!(
"added arena {} with size {} MB and page size {}",
arena.id,
arena_size / (1024 * 1024),
db_page_size
);
self.page_arena = Some(arena);
}
Err(e) => {
tracing::error!("Failed to create arena: {:?}", e);
return Err(LimboError::InternalError(format!(
"Failed to create arena: {e}",
)));
}
}
// Create WAL frame arena
let wal_frame_size = db_page_size + WAL_FRAME_HEADER_SIZE;
match Arena::new(wal_frame_size, arena_size, &io) {
Ok(arena) => {
tracing::trace!(
"added WAL frame arena {} with size {} MB and page size {}",
arena.id,
arena_size / (1024 * 1024),
wal_frame_size
);
self.wal_frame_arena = Some(arena);
}
Err(e) => {
tracing::error!("Failed to create WAL frame arena: {:?}", e);
return Err(LimboError::InternalError(format!(
"Failed to create WAL frame arena: {e}",
)));
}
}
Ok(())
}
fn free(&self, size: usize, arena_id: u32, page_idx: u32) {
// allocations of size `page_size` are more common, so we check that arena first.
if let Some(arena) = self.page_arena.as_ref() {
if arena_id == arena.id {
arena.free(page_idx, size);
return;
}
}
// check WAL frame arena
if let Some(wal_arena) = self.wal_frame_arena.as_ref() {
if arena_id == wal_arena.id {
wal_arena.free(page_idx, size);
return;
}
}
panic!("ArenaBuffer freed with no available parent Arena");
}
}
/// Preallocated block of memory used by the pool to distribute `ArenaBuffer`s
struct Arena {
/// Identifier to tie allocations back to the arena. If the arena is registered
/// with `io_uring`, then the ID represents the index of the arena into the ring's
/// sparse registered buffer array created on the ring's initialization.
id: u32,
/// Base pointer to the arena returned by `mmap`
base: NonNull<u8>,
/// Total number of pages currently allocated/in use.
allocated_pages: AtomicUsize,
/// Currently free pages.
free_pages: SpinLock<PageBitmap>,
/// Total size of the arena in bytes
arena_size: usize,
/// Page size the total arena is divided into.
/// Because most allocations are of size `[Database::page_size]`, with an
/// additional 24 byte wal frame header, we treat this as the `page_size` to reduce
/// fragmentation to 24 bytes for regular pages.
page_size: usize,
}
impl Drop for Arena {
fn drop(&mut self) {
unsafe { arena::dealloc(self.base.as_ptr(), self.arena_size) };
}
}
/// Slots 0 and 1 will be reserved for Arenas which are registered buffers
/// with io_uring.
const UNREGISTERED_START: u32 = 2;
/// IDs for Arenas which are not registered with `io_uring`;
/// a registered arena will always have id 0 or 1
static NEXT_ID: AtomicU32 = AtomicU32::new(UNREGISTERED_START);
impl Arena {
/// Create a new arena with the given size and page size.
/// NOTE: Minimum arena size is page_size * 64
fn new(page_size: usize, arena_size: usize, io: &Arc<dyn IO>) -> Result<Self, String> {
let min_pages = arena_size.div_ceil(page_size);
let rounded_pages = (min_pages.max(64) + 63) & !63;
let rounded_bytes = rounded_pages * page_size;
// Guard against the global cap
if rounded_bytes > BufferPool::MAX_ARENA_SIZE {
return Err(format!(
"arena size {} B exceeds hard limit of {} B",
rounded_bytes,
BufferPool::MAX_ARENA_SIZE
));
}
let ptr = unsafe { arena::alloc(rounded_bytes) };
let base = NonNull::new(ptr).ok_or("Failed to allocate arena")?;
let id = io
.register_fixed_buffer(base, rounded_bytes)
.unwrap_or_else(|_| {
// Register with io_uring if possible, otherwise use next available ID
let next_id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
tracing::trace!("Allocating arena with id {}", next_id);
next_id
});
let map = PageBitmap::new(rounded_pages as u32);
Ok(Self {
id,
base,
free_pages: SpinLock::new(map),
allocated_pages: AtomicUsize::new(0),
page_size,
arena_size: rounded_bytes,
})
}
/// Allocate a `Buffer` large enough for logical length `size`.
/// May span multiple pages
pub fn try_alloc(&self, size: usize) -> Option<Buffer> {
let pages = size.div_ceil(self.page_size) as u32;
let mut freemap = self.free_pages.lock();
let first_idx = if pages == 1 {
// use the optimized method for individual pages which attempts
// to leave large contiguous areas free of fragmentation for
// larger `runs`.
freemap.alloc_one()?
} else {
freemap.alloc_run(pages)?
};
self.allocated_pages
.fetch_add(pages as usize, Ordering::Relaxed);
let offset = first_idx as usize * self.page_size;
let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
Some(Buffer::new_pooled(ArenaBuffer::new(
ptr, size, self.id, first_idx,
)))
}
/// Mark all relevant pages that include `size` starting at `page_idx` as free.
pub fn free(&self, page_idx: u32, size: usize) {
let mut bm = self.free_pages.lock();
let count = size.div_ceil(self.page_size);
turso_assert!(
!bm.check_run_free(page_idx, count as u32),
"must not already be marked free"
);
bm.free_run(page_idx, count as u32);
self.allocated_pages.fetch_sub(count, Ordering::Relaxed);
}
}
#[cfg(unix)]
mod arena {
#[cfg(target_os = "macos")]
use libc::MAP_ANON as MAP_ANONYMOUS;
#[cfg(target_os = "linux")]
use libc::MAP_ANONYMOUS;
use libc::{mmap, munmap, MAP_PRIVATE, PROT_READ, PROT_WRITE};
use std::ffi::c_void;
pub unsafe fn alloc(len: usize) -> *mut u8 {
let ptr = mmap(
std::ptr::null_mut(),
len,
PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS,
-1,
0,
);
if ptr == libc::MAP_FAILED {
panic!("mmap failed: {}", std::io::Error::last_os_error());
}
#[cfg(target_os = "linux")]
{
libc::madvise(ptr, len, libc::MADV_HUGEPAGE);
}
ptr as *mut u8
}
pub unsafe fn dealloc(ptr: *mut u8, len: usize) {
let result = munmap(ptr as *mut c_void, len);
if result != 0 {
panic!("munmap failed: {}", std::io::Error::last_os_error());
}
}
}
#[cfg(not(unix))]
mod arena {
pub fn alloc(len: usize) -> *mut u8 {
let layout = std::alloc::Layout::from_size_align(len, std::mem::size_of::<u8>()).unwrap();
unsafe { std::alloc::alloc(layout) }
}
pub fn dealloc(ptr: *mut u8, len: usize) {
let layout = std::alloc::Layout::from_size_align(len, std::mem::size_of::<u8>()).unwrap();
unsafe { std::alloc::dealloc(ptr, layout) };
}
}
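
The rounding in `Arena::new` is easiest to see with the defaults from this PR (3 MiB arenas, 4096-byte DB pages, 4096 + 24 = 4120-byte WAL frames). A small, self-contained reproduction of that arithmetic, not taken from the PR itself:

```rust
// Mirrors the page-count rounding in Arena::new: at least 64 pages, rounded
// up to a multiple of 64 so the free-page bitmap works on whole 64-bit words.
fn rounded(arena_size: usize, page_size: usize) -> (usize, usize) {
    let min_pages = arena_size.div_ceil(page_size);
    let rounded_pages = (min_pages.max(64) + 63) & !63;
    (rounded_pages, rounded_pages * page_size)
}

fn main() {
    // DB-page arena: 3 MiB / 4096 B = 768 pages exactly, already a multiple of 64.
    assert_eq!(rounded(3 * 1024 * 1024, 4096), (768, 3 * 1024 * 1024));
    // WAL-frame arena: ceil(3 MiB / 4120 B) = 764 pages, rounded up to 768,
    // so it maps 768 * 4120 = 3_164_160 bytes (~3.02 MiB).
    assert_eq!(rounded(3 * 1024 * 1024, 4120), (768, 3_164_160));
}
```

Together the two arenas pin about 6.0 MiB, comfortably below the 8 MiB RLIMIT_MEMLOCK ceiling the commit message refers to.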

View File

@@ -314,7 +314,7 @@ impl PageBitmap {
///
/// Word 1: ...11111111_11110000 (bits 60-63 must be checked)
/// Word 2: 00000000_01111111... (bits 0-6 must be checked)
fn check_run_free(&self, start: u32, len: u32) -> bool {
pub(super) fn check_run_free(&self, start: u32, len: u32) -> bool {
if start + len > self.n_pages {
return false;
}
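
For context, the assertion in `Arena::free` relies on this predicate being the inverse of "allocated". A tiny sketch of that contract, using the `PageBitmap` calls that appear in `try_alloc`/`free` above and assuming they behave as their call sites suggest:

```rust
// check_run_free(start, len) reports whether the whole run is currently free,
// so Arena::free asserts !check_run_free(...) before calling free_run(...).
fn bitmap_contract() {
    let mut bm = PageBitmap::new(128);
    let start = bm.alloc_run(3).expect("three contiguous pages");
    assert!(!bm.check_run_free(start, 3)); // run is allocated
    bm.free_run(start, 3);
    assert!(bm.check_run_free(start, 3)); // run is free again
}
```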

View File

@@ -621,12 +621,12 @@ impl PageHashMap {
#[cfg(test)]
mod tests {
use super::*;
use crate::io::{Buffer, BufferData};
use crate::storage::page_cache::CacheError;
use crate::storage::pager::{Page, PageRef};
use crate::storage::sqlite3_ondisk::PageContent;
use crate::BufferPool;
use std::ptr::NonNull;
use std::{num::NonZeroUsize, pin::Pin, rc::Rc, sync::Arc};
use std::{num::NonZeroUsize, sync::Arc};
use lru::LruCache;
use rand_chacha::{
@@ -642,8 +642,7 @@ mod tests {
pub fn page_with_content(page_id: usize) -> PageRef {
let page = Arc::new(Page::new(page_id));
{
let buffer_drop_fn = Rc::new(|_data: BufferData| {});
let buffer = Buffer::new(Pin::new(vec![0; 4096]), buffer_drop_fn);
let buffer = BufferPool::allocate(4096);
let page_content = PageContent {
offset: 0,
buffer: Arc::new(buffer),

View File

@@ -1,15 +1,18 @@
use crate::result::LimboResult;
use crate::storage::btree::BTreePageInner;
use crate::storage::buffer_pool::BufferPool;
use crate::storage::database::DatabaseStorage;
use crate::storage::sqlite3_ondisk::{
self, parse_wal_frame_header, DatabaseHeader, PageContent, PageSize, PageType,
use crate::storage::{
btree::BTreePageInner,
buffer_pool::BufferPool,
database::DatabaseStorage,
sqlite3_ondisk::{
self, parse_wal_frame_header, DatabaseHeader, PageContent, PageSize, PageType,
},
wal::{CheckpointResult, Wal},
};
use crate::storage::wal::{CheckpointResult, Wal};
use crate::types::{IOResult, WalFrameInfo};
use crate::util::IOExt as _;
use crate::{return_if_io, Completion, TransactionState};
use crate::{turso_assert, Buffer, Connection, LimboError, Result};
use crate::{
return_if_io, turso_assert, types::WalFrameInfo, Completion, Connection, IOResult, LimboError,
Result, TransactionState,
};
use parking_lot::RwLock;
use std::cell::{Cell, OnceCell, RefCell, UnsafeCell};
use std::collections::HashSet;
@@ -787,7 +790,7 @@ impl Pager {
)?;
turso_assert!(
ptrmap_page.get().id == ptrmap_pg_no as usize,
ptrmap_page.get().id == ptrmap_pg_no,
"ptrmap page has unexpected number"
);
self.add_dirty(&ptrmap_page);
@@ -1763,6 +1766,8 @@ impl Pager {
if let Some(size) = self.page_size.get() {
default_header.page_size = PageSize::new(size).expect("page size");
}
self.buffer_pool
.finalize_with_page_size(default_header.page_size.get() as usize)?;
let page = allocate_new_page(1, &self.buffer_pool, 0);
let contents = page.get_contents();
@@ -2151,12 +2156,8 @@ impl Pager {
pub fn allocate_new_page(page_id: usize, buffer_pool: &Arc<BufferPool>, offset: usize) -> PageRef {
let page = Arc::new(Page::new(page_id));
{
let buffer = buffer_pool.get();
let bp = buffer_pool.clone();
let drop_fn = Rc::new(move |buf| {
bp.put(buf);
});
let buffer = Arc::new(Buffer::new(buffer, drop_fn));
let buffer = buffer_pool.get_page();
let buffer = Arc::new(buffer);
page.set_loaded();
page.get().contents = Some(PageContent::new(offset, buffer));
}
@@ -2432,10 +2433,10 @@ mod ptrmap_tests {
));
// Construct interfaces for the pager
let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize)));
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(
(initial_db_pages + 10) as usize,
)));
let pages = initial_db_pages + 10;
let sz = std::cmp::max(std::cmp::min(pages, 64), pages);
let buffer_pool = BufferPool::begin_init(&io, (sz * page_size) as usize);
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::new(sz as usize)));
let wal = Rc::new(RefCell::new(WalFile::new(
io.clone(),

View File

@@ -852,13 +852,9 @@ pub fn begin_read_page(
allow_empty_read: bool,
) -> Result<Completion> {
tracing::trace!("begin_read_btree_page(page_idx = {})", page_idx);
let buf = buffer_pool.get();
let drop_fn = Rc::new(move |buf| {
let buffer_pool = buffer_pool.clone();
buffer_pool.put(buf);
});
let buf = buffer_pool.get_page();
#[allow(clippy::arc_with_non_send_sync)]
let buf = Arc::new(Buffer::new(buf, drop_fn));
let buf = Arc::new(buf);
let complete = Box::new(move |mut buf: Arc<Buffer>, bytes_read: i32| {
let buf_len = buf.len();
turso_assert!(
@@ -867,7 +863,7 @@ pub fn begin_read_page(
);
let page = page.clone();
if bytes_read == 0 {
buf = Arc::new(Buffer::allocate(0, Rc::new(|_| {})));
buf = Arc::new(Buffer::new_temporary(0));
}
if finish_read_page(page_idx, buf, page.clone()).is_err() {
page.set_error();
@@ -1557,10 +1553,9 @@ pub fn write_varint_to_vec(value: u64, payload: &mut Vec<u8>) {
/// We need to read the WAL file on open to reconstruct the WAL frame cache.
pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFileShared>>> {
let drop_fn = Rc::new(|_buf| {});
let size = file.size()?;
#[allow(clippy::arc_with_non_send_sync)]
let buf_for_pread = Arc::new(Buffer::allocate(size as usize, drop_fn));
let buf_for_pread = Arc::new(Buffer::new_temporary(size as usize));
let header = Arc::new(SpinLock::new(WalHeader::default()));
#[allow(clippy::arc_with_non_send_sync)]
let wal_file_shared_ret = Arc::new(UnsafeCell::new(WalFileShared {
@@ -1765,17 +1760,13 @@ pub fn read_entire_wal_dumb(file: &Arc<dyn File>) -> Result<Arc<UnsafeCell<WalFi
}
pub fn begin_read_wal_frame_raw(
buffer_pool: &Arc<BufferPool>,
io: &Arc<dyn File>,
offset: usize,
page_size: u32,
complete: Box<dyn Fn(Arc<Buffer>, i32)>,
complete: Box<Complete>,
) -> Result<Completion> {
tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
let drop_fn = Rc::new(|_buf| {});
let buf = Arc::new(Buffer::allocate(
page_size as usize + WAL_FRAME_HEADER_SIZE,
drop_fn,
));
let buf = Arc::new(buffer_pool.get_wal_frame());
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new_read(buf, complete);
let c = io.pread(offset, c)?;
@@ -1786,15 +1777,11 @@ pub fn begin_read_wal_frame(
io: &Arc<dyn File>,
offset: usize,
buffer_pool: Arc<BufferPool>,
complete: Box<dyn Fn(Arc<Buffer>, i32)>,
complete: Box<Complete>,
) -> Result<Completion> {
tracing::trace!("begin_read_wal_frame(offset={})", offset);
let buf = buffer_pool.get();
let drop_fn = Rc::new(move |buf| {
let buffer_pool = buffer_pool.clone();
buffer_pool.put(buf);
});
let buf = Arc::new(Buffer::new(buf, drop_fn));
let buf = buffer_pool.get_page();
let buf = Arc::new(buf);
#[allow(clippy::arc_with_non_send_sync)]
let c = Completion::new_read(buf, complete);
let c = io.pread(offset, c)?;
@@ -1821,6 +1808,7 @@ pub fn parse_wal_frame_header(frame: &[u8]) -> (WalFrameHeader, &[u8]) {
}
pub fn prepare_wal_frame(
buffer_pool: &Arc<BufferPool>,
wal_header: &WalHeader,
prev_checksums: (u32, u32),
page_size: u32,
@@ -1830,8 +1818,7 @@ pub fn prepare_wal_frame(
) -> ((u32, u32), Arc<Buffer>) {
tracing::trace!(page_number);
let drop_fn = Rc::new(|_buf| {});
let buffer = Buffer::allocate(page_size as usize + WAL_FRAME_HEADER_SIZE, drop_fn);
let buffer = buffer_pool.get_wal_frame();
let frame = buffer.as_mut_slice();
frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page);
@@ -1858,9 +1845,7 @@ pub fn prepare_wal_frame(
pub fn begin_write_wal_header(io: &Arc<dyn File>, header: &WalHeader) -> Result<Completion> {
tracing::trace!("begin_write_wal_header");
let buffer = {
let drop_fn = Rc::new(|_buf| {});
let buffer = Buffer::allocate(WAL_HEADER_SIZE, drop_fn);
let buffer = Buffer::new_temporary(WAL_HEADER_SIZE);
let buf = buffer.as_mut_slice();
buf[0..4].copy_from_slice(&header.magic.to_be_bytes());
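
A note on the frame buffers handed out by `get_wal_frame`: `prepare_wal_frame` above copies the page image in after the 24-byte frame header and fills the header afterwards. A minimal sketch of that layout; the helper below is illustrative only, and the local constant stands in for the crate's `WAL_FRAME_HEADER_SIZE`.

```rust
// Pooled WAL-frame buffers are exactly WAL_FRAME_HEADER_SIZE + page_size bytes:
// [ 24-byte frame header | page image ]
fn lay_out_frame(frame_buf: &Buffer, page: &[u8]) {
    const WAL_FRAME_HEADER_SIZE: usize = 24;
    let frame = frame_buf.as_mut_slice();
    assert_eq!(frame.len(), WAL_FRAME_HEADER_SIZE + page.len());
    // Page bytes go after the header...
    frame[WAL_FRAME_HEADER_SIZE..].copy_from_slice(page);
    // ...and the header (page number, db size on commit, salts/checksums)
    // is written into frame[..WAL_FRAME_HEADER_SIZE] afterwards.
}
```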

View File

@@ -336,10 +336,8 @@ impl Batch {
self.items.insert(id, buf_clone);
// Re-initialize scratch with a fresh buffer
let raw = pool.get();
let pool_clone = pool.clone();
let drop_fn = Rc::new(move |b| pool_clone.put(b));
let new_buf = Arc::new(Buffer::new(raw, drop_fn));
let raw = pool.get_page();
let new_buf = Arc::new(raw);
unsafe {
let inner = &mut *scratch.inner.get();
@@ -929,13 +927,13 @@ impl Wal for WalFile {
bytes_read == buf_len as i32,
"read({bytes_read}) != expected({buf_len})"
);
let buf_ptr = buf.as_ptr();
let buf_ptr = buf.as_mut_ptr();
unsafe {
std::ptr::copy_nonoverlapping(buf_ptr, frame_ptr, frame_len);
}
});
let c =
begin_read_wal_frame_raw(&self.get_shared().file, offset, self.page_size(), complete)?;
begin_read_wal_frame_raw(&self.buffer_pool, &self.get_shared().file, offset, complete)?;
Ok(c)
}
@@ -1005,6 +1003,7 @@ impl Wal for WalFile {
let header = header.lock();
let checksums = self.last_checksum;
let (checksums, frame_bytes) = prepare_wal_frame(
&self.buffer_pool,
&header,
checksums,
header.page_size,
@@ -1046,6 +1045,7 @@ impl Wal for WalFile {
let page_content = page.get_contents();
let page_buf = page_content.as_ptr();
let (frame_checksums, frame_bytes) = prepare_wal_frame(
&self.buffer_pool,
&header,
checksums,
header.page_size,
@@ -1218,14 +1218,9 @@ impl WalFile {
buffer_pool: Arc<BufferPool>,
) -> Self {
let checkpoint_page = Arc::new(Page::new(0));
let buffer = buffer_pool.get();
let buffer = buffer_pool.get_page();
{
let buffer_pool = buffer_pool.clone();
let drop_fn = Rc::new(move |buf| {
buffer_pool.put(buf);
});
checkpoint_page.get().contents =
Some(PageContent::new(0, Arc::new(Buffer::new(buffer, drop_fn))));
checkpoint_page.get().contents = Some(PageContent::new(0, Arc::new(buffer)));
}
let header = unsafe { shared.get().as_mut().unwrap().wal_header.lock() };

View File

@@ -3,6 +3,7 @@ use crate::function::AlterTableFunc;
use crate::numeric::{NullableInteger, Numeric};
use crate::schema::Table;
use crate::storage::btree::{integrity_check, IntegrityCheckError, IntegrityCheckState};
use crate::storage::buffer_pool::BUFFER_POOL;
use crate::storage::database::DatabaseFile;
use crate::storage::page_cache::DumbLruPageCache;
use crate::storage::pager::{AtomicDbState, CreateBTreeFlags, DbState};
@@ -62,9 +63,7 @@ use crate::{
vector::{vector32, vector64, vector_distance_cos, vector_distance_l2, vector_extract},
};
use crate::{
info, turso_assert, BufferPool, OpenFlags, RefValue, Row, StepResult, TransactionState,
};
use crate::{info, turso_assert, OpenFlags, RefValue, Row, StepResult, TransactionState};
use super::{
insn::{Cookie, RegisterOrLiteral},
@@ -6460,7 +6459,7 @@ pub fn op_open_ephemeral(
.block(|| pager.with_header(|header| header.page_size))?
.get();
let buffer_pool = Arc::new(BufferPool::new(Some(page_size as usize)));
let buffer_pool = BUFFER_POOL.get().expect("Buffer pool not initialized");
let page_cache = Arc::new(RwLock::new(DumbLruPageCache::default()));
let pager = Rc::new(Pager::new(
@@ -6478,7 +6477,6 @@ pub fn op_open_ephemeral(
.block(|| pager.with_header(|header| header.page_size))
.unwrap_or_default()
.get() as usize;
buffer_pool.set_page_size(page_size);
state.op_open_ephemeral_state = OpOpenEphemeralState::StartingTxn { pager };
}

View File

@@ -9,7 +9,7 @@ use tempfile;
use crate::{
error::LimboError,
io::{Buffer, BufferData, Completion, File, OpenFlags, IO},
io::{Buffer, Completion, File, OpenFlags, IO},
storage::sqlite3_ondisk::{read_varint, varint_len, write_varint},
translate::collate::CollationSeq,
turso_assert,
@@ -364,8 +364,7 @@ impl SortedChunk {
let read_buffer_size = self.buffer.borrow().len() - self.buffer_len.get();
let read_buffer_size = read_buffer_size.min(self.chunk_size - self.total_bytes_read.get());
let drop_fn = Rc::new(|_buffer: BufferData| {});
let read_buffer = Buffer::allocate(read_buffer_size, drop_fn);
let read_buffer = Buffer::new_temporary(read_buffer_size);
let read_buffer_ref = Arc::new(read_buffer);
let chunk_io_state_copy = self.io_state.clone();
@@ -414,8 +413,7 @@ impl SortedChunk {
self.chunk_size += size_len + record_size;
}
let drop_fn = Rc::new(|_buffer: BufferData| {});
let buffer = Buffer::allocate(self.chunk_size, drop_fn);
let buffer = Buffer::new_temporary(self.chunk_size);
let mut buf_pos = 0;
let buf = buffer.as_mut_slice();

View File

@@ -1,4 +1,4 @@
use std::{rc::Rc, sync::Arc};
use std::sync::Arc;
use turso_core::{types::Text, Buffer, Completion, LimboError, Value};
@@ -48,7 +48,7 @@ pub async fn db_bootstrap<C: ProtocolIO>(
let content_len = chunk.len();
// todo(sivukhin): optimize allocations here
#[allow(clippy::arc_with_non_send_sync)]
let buffer = Arc::new(Buffer::allocate(chunk.len(), Rc::new(|_| {})));
let buffer = Arc::new(Buffer::new_temporary(chunk.len()));
buffer.as_mut_slice().copy_from_slice(chunk);
let mut completions = Vec::with_capacity(dbs.len());
for db in dbs {

View File

@@ -11,7 +11,7 @@ EXPERIMENTAL_FLAGS=""
# if RUST_LOG is non-empty, enable tracing output
if [ -n "$RUST_LOG" ]; then
"$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@"
TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@"
else
"$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@"
TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@"
fi

View File

@@ -1,7 +1,6 @@
use std::{
collections::{HashMap, HashSet},
path::Path,
pin::Pin,
rc::Rc,
sync::Arc,
};
@@ -428,10 +427,13 @@ impl BTreeGenerator<'_> {
}
fn write_at(io: &impl IO, file: Arc<dyn File>, offset: usize, data: &[u8]) {
let completion = Completion::new_write(|_| {});
let drop_fn = Rc::new(move |_| {});
#[allow(clippy::arc_with_non_send_sync)]
let buffer = Arc::new(Buffer::new(Pin::new(data.to_vec()), drop_fn));
let buffer = Arc::new(Buffer::new(data.to_vec()));
let _buf = buffer.clone();
let completion = Completion::new_write(move |_| {
// reference the buffer to keep alive for async io
let _buf = _buf.clone();
});
let result = file.pwrite(offset, buffer, completion).unwrap();
while !result.is_completed() {
io.run_once().unwrap();