mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-23 00:45:37 +01:00
Apply review suggestions, remove FreeEntry
This commit is contained in:
@@ -42,7 +42,7 @@ const MAX_IOVEC_ENTRIES: usize = CKPT_BATCH_PAGES;
|
||||
const MAX_WAIT: usize = 4;
|
||||
|
||||
/// One memory arena for DB pages and another for WAL frames
|
||||
const ARENAS: usize = 2;
|
||||
const ARENA_COUNT: usize = 2;
|
||||
|
||||
pub struct UringIO {
|
||||
inner: Rc<RefCell<InnerUringIO>>,
|
||||
@@ -61,7 +61,7 @@ struct WrappedIOUring {
|
||||
struct InnerUringIO {
|
||||
ring: WrappedIOUring,
|
||||
free_files: VecDeque<u32>,
|
||||
free_arenas: [Option<(NonNull<u8>, usize)>; 2],
|
||||
free_arenas: [Option<(NonNull<u8>, usize)>; ARENA_COUNT],
|
||||
}
|
||||
|
||||
/// preallocated vec of iovec arrays to avoid allocations during writev operations
|
||||
@@ -113,7 +113,8 @@ impl UringIO {
|
||||
// RL_MEMLOCK cap is typically 8MB, the current design is to have one large arena
|
||||
// registered at startup and therefore we can simply use the zero index, falling back
|
||||
// to similar logic as the existing buffer pool for cases where it is over capacity.
|
||||
ring.submitter().register_buffers_sparse(ARENAS as u32)?;
|
||||
ring.submitter()
|
||||
.register_buffers_sparse(ARENA_COUNT as u32)?;
|
||||
let inner = InnerUringIO {
|
||||
ring: WrappedIOUring {
|
||||
ring,
|
||||
@@ -122,7 +123,7 @@ impl UringIO {
|
||||
iov_pool: IovecPool::new(),
|
||||
},
|
||||
free_files: (0..FILES).collect(),
|
||||
free_arenas: [const { None }; ARENAS],
|
||||
free_arenas: [const { None }; ARENA_COUNT],
|
||||
};
|
||||
debug!("Using IO backend 'io-uring'");
|
||||
Ok(Self {
|
||||
|
||||
@@ -74,7 +74,7 @@ pub trait File: Send + Sync {
|
||||
while pos < file_size {
|
||||
let chunk_size = (file_size - pos).min(BUFFER_SIZE);
|
||||
// Read from source
|
||||
let read_buffer = Arc::new(Buffer::allocate(chunk_size, Rc::new(|_| {})));
|
||||
let read_buffer = Arc::new(Buffer::new_temporary(chunk_size));
|
||||
let read_completion = self.pread(
|
||||
pos,
|
||||
Completion::new_read(read_buffer.clone(), move |_, _| {}),
|
||||
|
||||
@@ -380,8 +380,8 @@ impl Database {
|
||||
}
|
||||
|
||||
fn init_pager(&self, page_size: Option<usize>) -> Result<Pager> {
|
||||
let arena_size = if std::env::var("CI").is_ok_and(|v| v.eq_ignore_ascii_case("true")) {
|
||||
BufferPool::TEST_AREA_SIZE
|
||||
let arena_size = if std::env::var("TESTING").is_ok_and(|v| v.eq_ignore_ascii_case("true")) {
|
||||
BufferPool::TEST_ARENA_SIZE
|
||||
} else {
|
||||
BufferPool::DEFAULT_ARENA_SIZE
|
||||
};
|
||||
|
||||
@@ -126,7 +126,7 @@ impl BufferPool {
|
||||
/// to `RL_MEMLOCK` limit for un-privileged processes being 8MB total.
|
||||
pub const DEFAULT_ARENA_SIZE: usize = 3 * 1024 * 1024;
|
||||
/// 1MB size For testing/CI
|
||||
pub const TEST_AREA_SIZE: usize = 1024 * 1024;
|
||||
pub const TEST_ARENA_SIZE: usize = 1024 * 1024;
|
||||
/// 4KB default page_size
|
||||
pub const DEFAULT_PAGE_SIZE: usize = 4096;
|
||||
/// Maximum size for each Arena (64MB total)
|
||||
@@ -153,13 +153,14 @@ impl BufferPool {
|
||||
}
|
||||
|
||||
/// Request a `Buffer` of size `len`
|
||||
#[inline]
|
||||
pub fn allocate(len: usize) -> Buffer {
|
||||
let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
|
||||
pool.inner().allocate(len)
|
||||
}
|
||||
|
||||
/// Request a `Buffer` the size of the `db_page_size` the `BufferPool`
|
||||
/// was initialized with.
|
||||
/// Request a `Buffer` the size of the `db_page_size` the `BufferPool` was initialized with.
|
||||
#[inline]
|
||||
pub fn get_page(&self) -> Buffer {
|
||||
let inner = self.inner_mut();
|
||||
inner.get_page()
|
||||
@@ -167,22 +168,25 @@ impl BufferPool {
|
||||
|
||||
/// Request a `Buffer` for use with a WAL frame,
|
||||
/// `[Database::page_size] + `WAL_FRAME_HEADER_SIZE`
|
||||
#[inline]
|
||||
pub fn get_wal_frame(&self) -> Buffer {
|
||||
let inner = self.inner_mut();
|
||||
inner.get_frame()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn inner(&self) -> &PoolInner {
|
||||
unsafe { &*self.inner.get() }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(clippy::mut_from_ref)]
|
||||
fn inner_mut(&self) -> &mut PoolInner {
|
||||
unsafe { &mut *self.inner.get() }
|
||||
}
|
||||
|
||||
/// Create a static `BufferPool` initialize the pool to the default page size, **without**
|
||||
/// poulating the Arenas. Arenas will not be created until `[BufferPool::finalize_page_size]`,
|
||||
/// populating the Arenas. Arenas will not be created until `[BufferPool::finalize_page_size]`,
|
||||
/// and the pool will temporarily return temporary buffers to prevent reallocation of the
|
||||
/// arena if the page size is set to something other than the default value.
|
||||
pub fn begin_init(io: &Arc<dyn IO>, arena_size: usize) -> Arc<Self> {
|
||||
@@ -227,7 +231,7 @@ impl BufferPool {
|
||||
|
||||
impl PoolInner {
|
||||
/// Allocate a buffer of the given length from the pool, falling back to
|
||||
/// temporary thread local buffers if the pool is not initialized or full.
|
||||
/// temporary thread local buffers if the pool is not initialized or is full.
|
||||
pub fn allocate(&self, len: usize) -> Buffer {
|
||||
turso_assert!(len > 0, "Cannot allocate zero-length buffer");
|
||||
|
||||
@@ -236,68 +240,33 @@ impl PoolInner {
|
||||
|
||||
// Check if this is exactly a WAL frame size allocation
|
||||
if len == wal_frame_size {
|
||||
let Some(wal_arena) = self.wal_frame_arena.as_ref() else {
|
||||
return Buffer::new_temporary(len);
|
||||
};
|
||||
if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) {
|
||||
tracing::trace!(
|
||||
"Allocated WAL frame buffer of length {} from arena {} at index {}",
|
||||
len,
|
||||
wal_arena.id,
|
||||
first_idx
|
||||
);
|
||||
return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx));
|
||||
}
|
||||
return self
|
||||
.wal_frame_arena
|
||||
.as_ref()
|
||||
.and_then(|wal_arena| wal_arena.try_alloc(len))
|
||||
.unwrap_or(Buffer::new_temporary(len));
|
||||
}
|
||||
// For all other sizes, use regular arena
|
||||
let Some(arena) = self.page_arena.as_ref() else {
|
||||
// pool isn't fully initialized, return temporary buffer
|
||||
return Buffer::new_temporary(len);
|
||||
};
|
||||
if let Some(FreeEntry { ptr, first_idx }) = arena.try_alloc(len) {
|
||||
tracing::trace!(
|
||||
"Allocated buffer of length {} from arena {} at index {}",
|
||||
len,
|
||||
arena.id,
|
||||
first_idx
|
||||
);
|
||||
return Buffer::new_pooled(ArenaBuffer::new(ptr, len, arena.id, first_idx));
|
||||
}
|
||||
Buffer::new_temporary(len)
|
||||
self.page_arena
|
||||
.as_ref()
|
||||
.and_then(|arena| arena.try_alloc(len))
|
||||
.unwrap_or(Buffer::new_temporary(len))
|
||||
}
|
||||
|
||||
fn get_page(&mut self) -> Buffer {
|
||||
let db_page_size = self.db_page_size.load(Ordering::Relaxed);
|
||||
let Some(arena) = self.page_arena.as_ref() else {
|
||||
return Buffer::new_temporary(db_page_size);
|
||||
};
|
||||
if let Some(FreeEntry { ptr, first_idx }) = arena.try_alloc(db_page_size) {
|
||||
tracing::trace!(
|
||||
"Allocated page buffer of size {} from arena {} at index {}",
|
||||
db_page_size,
|
||||
arena.id,
|
||||
first_idx
|
||||
);
|
||||
return Buffer::new_pooled(ArenaBuffer::new(ptr, db_page_size, arena.id, first_idx));
|
||||
}
|
||||
Buffer::new_temporary(db_page_size)
|
||||
self.page_arena
|
||||
.as_ref()
|
||||
.and_then(|arena| arena.try_alloc(db_page_size))
|
||||
.unwrap_or(Buffer::new_temporary(db_page_size))
|
||||
}
|
||||
|
||||
fn get_frame(&mut self) -> Buffer {
|
||||
let len = self.db_page_size.load(Ordering::Relaxed) + WAL_FRAME_HEADER_SIZE;
|
||||
let Some(wal_arena) = self.wal_frame_arena.as_ref() else {
|
||||
return Buffer::new_temporary(len);
|
||||
};
|
||||
if let Some(FreeEntry { ptr, first_idx }) = wal_arena.try_alloc(len) {
|
||||
tracing::trace!(
|
||||
"Allocated WAL frame buffer of length {} from arena {} at index {}",
|
||||
len,
|
||||
wal_arena.id,
|
||||
first_idx
|
||||
);
|
||||
return Buffer::new_pooled(ArenaBuffer::new(ptr, len, wal_arena.id, first_idx));
|
||||
}
|
||||
Buffer::new_temporary(len)
|
||||
self.wal_frame_arena
|
||||
.as_ref()
|
||||
.and_then(|wal_arena| wal_arena.try_alloc(len))
|
||||
.unwrap_or(Buffer::new_temporary(len))
|
||||
}
|
||||
|
||||
/// Allocate a new arena for the pool to use
|
||||
@@ -403,11 +372,6 @@ impl Drop for Arena {
|
||||
}
|
||||
}
|
||||
|
||||
struct FreeEntry {
|
||||
ptr: NonNull<u8>,
|
||||
first_idx: u32,
|
||||
}
|
||||
|
||||
/// Slots 0 and 1 will be reserved for Arenas which are registered buffers
|
||||
/// with io_uring.
|
||||
const UNREGISTERED_START: u32 = 2;
|
||||
@@ -452,16 +416,16 @@ impl Arena {
|
||||
})
|
||||
}
|
||||
|
||||
/// Attempt to mark N pages as `allocated` and return a pointer to and
|
||||
/// index of the first page in the allocation.
|
||||
pub fn try_alloc(&self, size: usize) -> Option<FreeEntry> {
|
||||
/// Allocate a `Buffer` large enough for logical length `size`.
|
||||
/// May span multiple pages
|
||||
pub fn try_alloc(&self, size: usize) -> Option<Buffer> {
|
||||
let pages = size.div_ceil(self.page_size) as u32;
|
||||
let mut freemap = self.free_pages.lock();
|
||||
|
||||
let first_idx = if pages == 1 {
|
||||
// use the optimized method for individual pages which attempts
|
||||
// to leave large contiguous areas free of fragmentation for
|
||||
// larger `runs`
|
||||
// larger `runs`.
|
||||
freemap.alloc_one()?
|
||||
} else {
|
||||
freemap.alloc_run(pages)?
|
||||
@@ -470,7 +434,9 @@ impl Arena {
|
||||
.fetch_add(pages as usize, Ordering::Relaxed);
|
||||
let offset = first_idx as usize * self.page_size;
|
||||
let ptr = unsafe { NonNull::new_unchecked(self.base.as_ptr().add(offset)) };
|
||||
Some(FreeEntry { ptr, first_idx })
|
||||
Some(Buffer::new_pooled(ArenaBuffer::new(
|
||||
ptr, size, self.id, first_idx,
|
||||
)))
|
||||
}
|
||||
|
||||
/// Mark `count` pages starting at `page_idx` as free.
|
||||
|
||||
@@ -11,7 +11,7 @@ EXPERIMENTAL_FLAGS=""
|
||||
|
||||
# if RUST_LOG is non-empty, enable tracing output
|
||||
if [ -n "$RUST_LOG" ]; then
|
||||
CI=true "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@"
|
||||
TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS -t testing/test.log "$@"
|
||||
else
|
||||
CI=true "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@"
|
||||
TESTING="true" "$TURSODB" -m list -q $EXPERIMENTAL_FLAGS "$@"
|
||||
fi
|
||||
|
||||
Reference in New Issue
Block a user