mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 00:45:37 +01:00
Move TLC buffer cache to io/mod
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use crate::storage::buffer_pool::{ArenaBuffer, TEMP_BUFFER_CACHE};
|
||||
use crate::Result;
|
||||
use crate::storage::buffer_pool::ArenaBuffer;
|
||||
use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
|
||||
use crate::{BufferPool, Result};
|
||||
use bitflags::bitflags;
|
||||
use cfg_block::cfg_block;
|
||||
use std::fmt;
|
||||
@@ -330,9 +331,11 @@ impl Drop for Buffer {
|
||||
fn drop(&mut self) {
|
||||
let len = self.len();
|
||||
if let Self::Heap(buf) = self {
|
||||
crate::storage::buffer_pool::TEMP_BUFFER_CACHE.with(|cache| {
|
||||
TEMP_BUFFER_CACHE.with(|cache| {
|
||||
let mut cache = cache.borrow_mut();
|
||||
// take ownership of the buffer by swapping it with a dummy
|
||||
let buffer = std::mem::replace(buf, Pin::new(vec![].into_boxed_slice()));
|
||||
cache.borrow_mut().return_buffer(buffer, len);
|
||||
cache.return_buffer(buffer, len);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -406,6 +409,56 @@ impl Buffer {
|
||||
}
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
/// thread local cache to re-use temporary buffers to prevent churn when pool overflows
|
||||
pub static TEMP_BUFFER_CACHE: RefCell<TempBufferCache> = RefCell::new(TempBufferCache::new());
|
||||
}
|
||||
|
||||
pub(crate) struct TempBufferCache {
|
||||
// Buffers indexed by size, we only cache common sizes
|
||||
page_size: usize,
|
||||
page_buffers: Vec<BufferData>,
|
||||
wal_frame_buffers: Vec<BufferData>,
|
||||
max_cached: usize,
|
||||
}
|
||||
|
||||
impl TempBufferCache {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
page_size: BufferPool::DEFAULT_PAGE_SIZE,
|
||||
page_buffers: Vec::with_capacity(8),
|
||||
wal_frame_buffers: Vec::with_capacity(8),
|
||||
max_cached: 512,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reinit_cache(&mut self, page_size: usize) {
|
||||
self.page_buffers.clear();
|
||||
self.wal_frame_buffers.clear();
|
||||
self.page_size = page_size;
|
||||
}
|
||||
|
||||
fn get_buffer(&mut self, size: usize) -> Option<BufferData> {
|
||||
match size {
|
||||
sz if sz == self.page_size => self.page_buffers.pop(),
|
||||
sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn return_buffer(&mut self, buff: BufferData, len: usize) {
|
||||
let sz = self.page_size;
|
||||
let cache = match len {
|
||||
n if n.eq(&sz) => &mut self.page_buffers,
|
||||
n if n.eq(&(sz + WAL_FRAME_HEADER_SIZE)) => &mut self.wal_frame_buffers,
|
||||
_ => return,
|
||||
};
|
||||
if self.max_cached > cache.len() {
|
||||
cache.push(buff);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cfg_block! {
|
||||
#[cfg(all(target_os = "linux", feature = "io_uring"))] {
|
||||
mod io_uring;
|
||||
|
||||
@@ -1,72 +1,41 @@
|
||||
use crate::fast_lock::SpinLock;
|
||||
use crate::io::BufferData;
|
||||
use crate::io::TEMP_BUFFER_CACHE;
|
||||
use crate::storage::sqlite3_ondisk::WAL_FRAME_HEADER_SIZE;
|
||||
use crate::{turso_assert, Buffer, LimboError, IO};
|
||||
use parking_lot::Mutex;
|
||||
use std::cell::{RefCell, UnsafeCell};
|
||||
use std::cell::UnsafeCell;
|
||||
use std::ptr::NonNull;
|
||||
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
pub static BUFFER_POOL: OnceLock<Arc<BufferPool>> = OnceLock::new();
|
||||
|
||||
// Packed metadata structure
|
||||
// Bits 0-23: logical length (16MB max)
|
||||
// Bits 26-40: arena_id (15 bits)
|
||||
// Bits 41-55: page_idx (15 bits)
|
||||
#[derive(Clone, Copy)]
|
||||
#[repr(transparent)]
|
||||
struct Metadata(u64);
|
||||
impl Metadata {
|
||||
const fn new(logical_len: usize, arena_id: u32, page_idx: u32) -> Self {
|
||||
let mut meta = logical_len as u64 & 0xFFFFFF;
|
||||
meta |= ((arena_id as u64) & 0x7FFF) << 26;
|
||||
meta |= ((page_idx as u64) & 0x7FFF) << 41;
|
||||
Self(meta)
|
||||
}
|
||||
const fn logical_len(&self) -> usize {
|
||||
(self.0 & 0xFFFFFF) as usize
|
||||
}
|
||||
|
||||
const fn arena_id(&self) -> u32 {
|
||||
((self.0 >> 26) & 0x7FFF) as u32
|
||||
}
|
||||
|
||||
const fn page_idx(&self) -> u32 {
|
||||
((self.0 >> 41) & 0x7FFF) as u32
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// A buffer allocated from an arena, or a clone of one.
|
||||
/// NOTE: we currently only deep clone a buffer in the btree when defragmenting.
|
||||
/// we can remove all the plumbing when that is merged.
|
||||
/// A buffer allocated from an arena from `[BufferPool]`
|
||||
pub struct ArenaBuffer {
|
||||
ptr: NonNull<u8>,
|
||||
meta: Metadata,
|
||||
arena_id: u32,
|
||||
page_idx: u32,
|
||||
len: usize,
|
||||
}
|
||||
|
||||
const REGISTERED_ID: u32 = 0;
|
||||
|
||||
impl std::fmt::Debug for Metadata {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("BufferMetadata")
|
||||
.field("logical_len", &self.logical_len())
|
||||
.field("arena_id", &self.arena_id())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl ArenaBuffer {
|
||||
const fn new(ptr: NonNull<u8>, meta: Metadata) -> Self {
|
||||
ArenaBuffer { ptr, meta }
|
||||
const fn new(ptr: NonNull<u8>, len: usize, arena_id: u32, page_idx: u32) -> Self {
|
||||
ArenaBuffer {
|
||||
ptr,
|
||||
arena_id,
|
||||
page_idx,
|
||||
len,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
/// Returns the `id` of the underlying arena if it is registered with `io_uring`
|
||||
pub fn fixed_id(&self) -> Option<u32> {
|
||||
let id = self.meta.arena_id();
|
||||
if id == REGISTERED_ID {
|
||||
Some(id)
|
||||
if self.arena_id == REGISTERED_ID {
|
||||
Some(REGISTERED_ID)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -75,7 +44,7 @@ impl ArenaBuffer {
|
||||
/// The requested size of the allocation. The actual size is always rounded up to
|
||||
/// the next multiple of ARENA_PAGE_SIZE.
|
||||
pub const fn logical_len(&self) -> usize {
|
||||
self.meta.logical_len()
|
||||
self.len
|
||||
}
|
||||
pub fn as_slice(&self) -> &[u8] {
|
||||
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.logical_len()) }
|
||||
@@ -90,11 +59,7 @@ impl Drop for ArenaBuffer {
|
||||
let pool = BUFFER_POOL
|
||||
.get()
|
||||
.expect("BufferPool not initialized, cannot free ArenaBuffer");
|
||||
pool.free(
|
||||
self.meta.arena_id(),
|
||||
self.meta.page_idx(),
|
||||
self.logical_len(),
|
||||
);
|
||||
pool.free(self.logical_len(), self.arena_id, self.page_idx);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -213,12 +178,14 @@ impl BufferPool {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn free(&self, arena_id: u32, page_id: u32, size: usize) {
|
||||
self.inner_mut().free(arena_id, page_id, size);
|
||||
pub fn free(&self, size: usize, arena_id: u32, page_id: u32) {
|
||||
self.inner_mut().free(size, arena_id, page_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl PoolInner {
|
||||
/// Allocate a buffer of the given length from the pool, falling back to
|
||||
/// temporary thread local buffers if the pool is not initialized or full
|
||||
pub fn allocate(&self, len: usize) -> Buffer {
|
||||
turso_assert!(len > 0, "Cannot allocate zero-length buffer");
|
||||
let Some(arena) = self.arena.as_ref() else {
|
||||
@@ -232,10 +199,7 @@ impl PoolInner {
|
||||
arena.id,
|
||||
first_idx
|
||||
);
|
||||
return Buffer::new_pooled(ArenaBuffer::new(
|
||||
ptr,
|
||||
Metadata::new(len, arena.id, first_idx),
|
||||
));
|
||||
return Buffer::new_pooled(ArenaBuffer::new(ptr, len, arena.id, first_idx));
|
||||
}
|
||||
Buffer::new_temporary(len)
|
||||
}
|
||||
@@ -272,7 +236,7 @@ impl PoolInner {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn free(&mut self, arena_id: u32, page_idx: u32, size: usize) {
|
||||
pub fn free(&mut self, size: usize, arena_id: u32, page_idx: u32) {
|
||||
let arena = self
|
||||
.arena
|
||||
.as_mut()
|
||||
@@ -346,7 +310,7 @@ impl Arena {
|
||||
return None;
|
||||
}
|
||||
if pages == 1 {
|
||||
// fast path: most allocations are single pages
|
||||
// fast path: for now, most/all allocations are single pages
|
||||
if let Some(page_idx) = free.pop() {
|
||||
self.allocated_pages.fetch_add(pages, Ordering::Relaxed);
|
||||
let offset = page_idx as usize * self.page_size;
|
||||
@@ -377,9 +341,10 @@ impl Arena {
|
||||
self.allocated_pages.fetch_sub(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
#[cold]
|
||||
fn try_alloc_many(&self, pages: usize) -> Option<FreeEntry> {
|
||||
// TODO: for now, we don't request buffers > len of a wal frame.
|
||||
// if we eventually do, we can optimize this further
|
||||
// TODO (preston): we can optimize this further when we start allocating larger
|
||||
// contiguous blocks for coalescing. this is 'unused' for now
|
||||
let mut free = self.free_pages.lock();
|
||||
if pages <= 3 && free.len() >= pages {
|
||||
let start = free.len() - pages;
|
||||
@@ -461,7 +426,6 @@ mod arena {
|
||||
|
||||
#[cfg(not(unix))]
|
||||
mod arena {
|
||||
use crate::bufferpool::DEFAULT_PAGE_SIZE;
|
||||
pub fn alloc(len: usize) -> *mut u8 {
|
||||
let layout = std::alloc::Layout::from_size_align(len, std::mem::size_of::<u8>()).unwrap();
|
||||
unsafe { std::alloc::alloc(layout) }
|
||||
@@ -471,52 +435,3 @@ mod arena {
|
||||
unsafe { std::alloc::dealloc(ptr, layout) };
|
||||
}
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
pub static TEMP_BUFFER_CACHE: RefCell<TempBufferCache> = RefCell::new(TempBufferCache::new());
|
||||
}
|
||||
|
||||
pub(crate) struct TempBufferCache {
|
||||
// Buffers indexed by size, we only cache common sizes
|
||||
page_size: usize,
|
||||
page_buffers: Vec<BufferData>,
|
||||
wal_frame_buffers: Vec<BufferData>,
|
||||
max_cached: usize,
|
||||
}
|
||||
|
||||
impl TempBufferCache {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
page_size: BufferPool::DEFAULT_PAGE_SIZE,
|
||||
page_buffers: Vec::with_capacity(8),
|
||||
wal_frame_buffers: Vec::with_capacity(8),
|
||||
max_cached: 512,
|
||||
}
|
||||
}
|
||||
|
||||
fn reinit_cache(&mut self, page_size: usize) {
|
||||
self.page_buffers.clear();
|
||||
self.wal_frame_buffers.clear();
|
||||
self.page_size = page_size;
|
||||
}
|
||||
|
||||
pub fn get_buffer(&mut self, size: usize) -> Option<BufferData> {
|
||||
match size {
|
||||
sz if sz == self.page_size => self.page_buffers.pop(),
|
||||
sz if sz == (self.page_size + WAL_FRAME_HEADER_SIZE) => self.wal_frame_buffers.pop(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn return_buffer(&mut self, buff: BufferData, len: usize) {
|
||||
let sz = self.page_size;
|
||||
let cache = match len {
|
||||
n if n.eq(&sz) => &mut self.page_buffers,
|
||||
n if n.eq(&(sz + WAL_FRAME_HEADER_SIZE)) => &mut self.wal_frame_buffers,
|
||||
_ => return,
|
||||
};
|
||||
if self.max_cached > cache.len() {
|
||||
cache.push(buff);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user