mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-09 10:14:21 +01:00
Add ReadFixed/WriteFixed opcodes for buffers from registered arena
This commit is contained in:
@@ -584,17 +584,30 @@ impl File for UringFile {
|
||||
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
let r = c.as_read();
|
||||
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
|
||||
let mut io = self.io.borrow_mut();
|
||||
let read_e = {
|
||||
let buf = r.buf();
|
||||
let len = buf.len();
|
||||
let buf = buf.as_mut_ptr();
|
||||
with_fd!(self, |fd| {
|
||||
io_uring::opcode::Read::new(fd, buf, len as u32)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
if let Some(idx) = buf.fixed_id() {
|
||||
trace!(
|
||||
"pread_fixed(pos = {}, length = {}, idx = {})",
|
||||
pos,
|
||||
len,
|
||||
idx
|
||||
);
|
||||
io_uring::opcode::ReadFixed::new(fd, buf.as_mut_ptr(), len as u32, idx as u16)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
} else {
|
||||
trace!("pread(pos = {}, length = {})", pos, len);
|
||||
// Use Read opcode if fixed buffer is not available
|
||||
io_uring::opcode::Read::new(fd, buf.as_mut_ptr(), len as u32)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
}
|
||||
})
|
||||
};
|
||||
io.ring.submit_entry(&read_e);
|
||||
@@ -604,12 +617,30 @@ impl File for UringFile {
|
||||
fn pwrite(&self, pos: usize, buffer: Arc<crate::Buffer>, c: Completion) -> Result<Completion> {
|
||||
let mut io = self.io.borrow_mut();
|
||||
let write = {
|
||||
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
|
||||
with_fd!(self, |fd| {
|
||||
io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32)
|
||||
if let Some(idx) = buf.fixed_id() {
|
||||
trace!(
|
||||
"pwrite_fixed(pos = {}, length = {}, idx= {})",
|
||||
pos,
|
||||
buffer.len(),
|
||||
idx
|
||||
);
|
||||
io_uring::opcode::WriteFixed::new(
|
||||
fd,
|
||||
buffer.as_ptr(),
|
||||
buffer.len() as u32,
|
||||
idx as u16,
|
||||
)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
} else {
|
||||
trace!("pwrite(pos = {}, length = {})", pos, buffer.len());
|
||||
io_uring::opcode::Write::new(fd, buffer.as_ptr(), buffer.len() as u32)
|
||||
.offset(pos as u64)
|
||||
.build()
|
||||
.user_data(get_key(c.clone()))
|
||||
}
|
||||
})
|
||||
};
|
||||
io.ring.submit_entry(&write);
|
||||
|
||||
@@ -393,7 +393,7 @@ impl Database {
|
||||
Some(size) => size,
|
||||
};
|
||||
let buffer_pool =
|
||||
BufferPool::begin_init(&self.io, arena_size).finalize_page_size(size)?;
|
||||
BufferPool::begin_init(&self.io, arena_size).finalize_with_page_size(size)?;
|
||||
|
||||
let db_state = self.db_state.clone();
|
||||
let wal = Rc::new(RefCell::new(WalFile::new(
|
||||
|
||||
@@ -8452,7 +8452,7 @@ mod tests {
|
||||
|
||||
let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
|
||||
let buffer_pool =
|
||||
BufferPool::begin_init(&io, page_size * 128).finalize_page_size(page_size);
|
||||
BufferPool::begin_init(&io, page_size * 128).finalize_with_page_size(page_size);
|
||||
|
||||
let db_file = Arc::new(DatabaseFile::new(
|
||||
io.open_file(":memory:", OpenFlags::Create, false).unwrap(),
|
||||
|
||||
@@ -177,9 +177,9 @@ impl BufferPool {
|
||||
unsafe { &mut *self.inner.get() }
|
||||
}
|
||||
|
||||
/// Initialize the pool to the default page size, **without** creating an arena
|
||||
/// Arena will be created when finalize_page_size is called,
|
||||
/// until then the pool will return temporary buffers to prevent reallocation of the
|
||||
/// Create a static `BufferPool` initialize the pool to the default page size, **without**
|
||||
/// creating an arena. Arena will be created when `[BufferPool::finalize_page_size]` is called.
|
||||
/// Until then the pool will return temporary buffers to prevent reallocation of the
|
||||
/// arena if the page size is set to something other than the default value.
|
||||
pub fn begin_init(io: &Arc<dyn IO>, arena_size: usize) -> Arc<Self> {
|
||||
let pool = BUFFER_POOL.get_or_init(|| Arc::new(BufferPool::new(arena_size)));
|
||||
@@ -191,7 +191,9 @@ impl BufferPool {
|
||||
pool.clone()
|
||||
}
|
||||
|
||||
pub fn finalize_page_size(&self, page_size: usize) -> crate::Result<Arc<BufferPool>> {
|
||||
/// Call when `[Database::db_state]` is initialized, providing the `page_size` to allocate
|
||||
/// an arena for the pool. Before this call, the pool will use temporary buffers
|
||||
pub fn finalize_with_page_size(&self, page_size: usize) -> crate::Result<Arc<Self>> {
|
||||
let pool = BUFFER_POOL.get().expect("BufferPool must be initialized");
|
||||
let inner = pool.inner_mut();
|
||||
tracing::trace!("finalize page size called with size {page_size}");
|
||||
|
||||
@@ -1767,7 +1767,7 @@ impl Pager {
|
||||
default_header.page_size = PageSize::new(size).expect("page size");
|
||||
}
|
||||
self.buffer_pool
|
||||
.finalize_page_size(default_header.page_size.get() as usize)?;
|
||||
.finalize_with_page_size(default_header.page_size.get() as usize)?;
|
||||
let page = allocate_new_page(1, &self.buffer_pool, 0);
|
||||
|
||||
let contents = page.get_contents();
|
||||
|
||||
Reference in New Issue
Block a user