mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-07 17:24:24 +01:00
change all Arc<Completion> to Completion
This commit is contained in:
@@ -694,7 +694,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
||||
&self,
|
||||
page_idx: usize,
|
||||
c: turso_core::Completion,
|
||||
) -> turso_core::Result<Arc<turso_core::Completion>> {
|
||||
) -> turso_core::Result<turso_core::Completion> {
|
||||
let r = c.as_read();
|
||||
let size = r.buf().len();
|
||||
assert!(page_idx > 0);
|
||||
@@ -710,13 +710,13 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
||||
page_idx: usize,
|
||||
buffer: Arc<std::cell::RefCell<turso_core::Buffer>>,
|
||||
c: turso_core::Completion,
|
||||
) -> turso_core::Result<Arc<turso_core::Completion>> {
|
||||
) -> turso_core::Result<turso_core::Completion> {
|
||||
let size = buffer.borrow().len();
|
||||
let pos = (page_idx - 1) * size;
|
||||
self.file.pwrite(pos, buffer, c.into())
|
||||
}
|
||||
|
||||
fn sync(&self, c: turso_core::Completion) -> turso_core::Result<Arc<turso_core::Completion>> {
|
||||
fn sync(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
|
||||
self.file.sync(c.into())
|
||||
}
|
||||
|
||||
|
||||
@@ -213,11 +213,7 @@ impl turso_core::File for File {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pread(
|
||||
&self,
|
||||
pos: usize,
|
||||
c: Arc<turso_core::Completion>,
|
||||
) -> Result<Arc<turso_core::Completion>> {
|
||||
fn pread(&self, pos: usize, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||
let r = c.as_read();
|
||||
let nr = {
|
||||
let mut buf = r.buf_mut();
|
||||
@@ -233,8 +229,8 @@ impl turso_core::File for File {
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<std::cell::RefCell<turso_core::Buffer>>,
|
||||
c: Arc<turso_core::Completion>,
|
||||
) -> Result<Arc<turso_core::Completion>> {
|
||||
c: turso_core::Completion,
|
||||
) -> Result<turso_core::Completion> {
|
||||
let w = c.as_write();
|
||||
let buf = buffer.borrow();
|
||||
let buf: &[u8] = buf.as_slice();
|
||||
@@ -244,7 +240,7 @@ impl turso_core::File for File {
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn sync(&self, c: Arc<turso_core::Completion>) -> Result<Arc<turso_core::Completion>> {
|
||||
fn sync(&self, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||
self.vfs.sync(self.fd);
|
||||
c.complete(0);
|
||||
#[allow(clippy::arc_with_non_send_sync)]
|
||||
@@ -288,7 +284,7 @@ impl turso_core::IO for PlatformIO {
|
||||
}))
|
||||
}
|
||||
|
||||
fn wait_for_completion(&self, c: Arc<turso_core::Completion>) -> Result<()> {
|
||||
fn wait_for_completion(&self, c: turso_core::Completion) -> Result<()> {
|
||||
while !c.is_completed() {
|
||||
self.run_once()?;
|
||||
}
|
||||
@@ -342,7 +338,7 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
||||
&self,
|
||||
page_idx: usize,
|
||||
c: turso_core::Completion,
|
||||
) -> Result<Arc<turso_core::Completion>> {
|
||||
) -> Result<turso_core::Completion> {
|
||||
let r = c.as_read();
|
||||
let size = r.buf().len();
|
||||
assert!(page_idx > 0);
|
||||
@@ -358,13 +354,13 @@ impl turso_core::DatabaseStorage for DatabaseFile {
|
||||
page_idx: usize,
|
||||
buffer: Arc<std::cell::RefCell<turso_core::Buffer>>,
|
||||
c: turso_core::Completion,
|
||||
) -> Result<Arc<turso_core::Completion>> {
|
||||
) -> Result<turso_core::Completion> {
|
||||
let size = buffer.borrow().len();
|
||||
let pos = (page_idx - 1) * size;
|
||||
self.file.pwrite(pos, buffer, c.into())
|
||||
}
|
||||
|
||||
fn sync(&self, c: turso_core::Completion) -> Result<Arc<turso_core::Completion>> {
|
||||
fn sync(&self, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||
self.file.sync(c.into())
|
||||
}
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ impl IO for GenericIO {
|
||||
}))
|
||||
}
|
||||
|
||||
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
|
||||
fn wait_for_completion(&self, c: Completion) -> Result<()> {
|
||||
while !c.is_completed() {
|
||||
self.run_once()?;
|
||||
}
|
||||
@@ -86,14 +86,11 @@ impl File for GenericFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
let mut file = self.file.borrow_mut();
|
||||
file.seek(std::io::SeekFrom::Start(pos as u64))?;
|
||||
{
|
||||
let r = match c.completion_type {
|
||||
CompletionType::Read(ref r) => r,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let r = c.as_read();
|
||||
let mut buf = r.buf_mut();
|
||||
let buf = buf.as_mut_slice();
|
||||
file.read_exact(buf)?;
|
||||
@@ -106,8 +103,8 @@ impl File for GenericFile {
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<RefCell<crate::Buffer>>,
|
||||
c: Arc<Completion>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
c: Completion,
|
||||
) -> Result<Completion> {
|
||||
let mut file = self.file.borrow_mut();
|
||||
file.seek(std::io::SeekFrom::Start(pos as u64))?;
|
||||
let buf = buffer.borrow();
|
||||
@@ -117,7 +114,7 @@ impl File for GenericFile {
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
let mut file = self.file.borrow_mut();
|
||||
file.sync_all().map_err(|err| LimboError::IOError(err))?;
|
||||
c.complete(0);
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#![allow(clippy::arc_with_non_send_sync)]
|
||||
|
||||
use super::{common, Completion, File, OpenFlags, IO};
|
||||
use super::{common, Completion, CompletionInner, File, OpenFlags, IO};
|
||||
use crate::io::clock::{Clock, Instant};
|
||||
use crate::{turso_assert, LimboError, MemoryIO, Result};
|
||||
use rustix::fs::{self, FlockOperation, OFlags};
|
||||
@@ -168,7 +168,7 @@ impl IO for UringIO {
|
||||
Ok(uring_file)
|
||||
}
|
||||
|
||||
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
|
||||
fn wait_for_completion(&self, c: Completion) -> Result<()> {
|
||||
while !c.is_completed() {
|
||||
self.run_once()?;
|
||||
}
|
||||
@@ -225,14 +225,15 @@ impl Clock for UringIO {
|
||||
#[inline(always)]
|
||||
/// use the callback pointer as the user_data for the operation as is
|
||||
/// common practice for io_uring to prevent more indirection
|
||||
fn get_key(c: Arc<Completion>) -> u64 {
|
||||
Arc::into_raw(c) as u64
|
||||
fn get_key(c: Completion) -> u64 {
|
||||
Arc::into_raw(c.inner) as u64
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
/// convert the user_data back to an Arc<Completion> pointer
|
||||
fn completion_from_key(key: u64) -> Arc<Completion> {
|
||||
unsafe { Arc::from_raw(key as *const Completion) }
|
||||
/// convert the user_data back to an Completion pointer
|
||||
fn completion_from_key(key: u64) -> Completion {
|
||||
let c_inner = unsafe { Arc::from_raw(key as *const CompletionInner) };
|
||||
Completion { inner: c_inner }
|
||||
}
|
||||
|
||||
pub struct UringFile {
|
||||
@@ -297,7 +298,7 @@ impl File for UringFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
let r = c.as_read();
|
||||
trace!("pread(pos = {}, length = {})", pos, r.buf().len());
|
||||
let mut io = self.io.borrow_mut();
|
||||
@@ -320,8 +321,8 @@ impl File for UringFile {
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<RefCell<crate::Buffer>>,
|
||||
c: Arc<Completion>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
c: Completion,
|
||||
) -> Result<Completion> {
|
||||
let mut io = self.io.borrow_mut();
|
||||
let write = {
|
||||
let buf = buffer.borrow();
|
||||
@@ -337,7 +338,7 @@ impl File for UringFile {
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
let mut io = self.io.borrow_mut();
|
||||
trace!("sync()");
|
||||
let sync = with_fd!(self, |fd| {
|
||||
|
||||
@@ -53,7 +53,7 @@ impl IO for MemoryIO {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn wait_for_completion(&self, _c: Arc<Completion>) -> Result<()> {
|
||||
fn wait_for_completion(&self, _c: Completion) -> Result<()> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ impl File for MemoryFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
let r = c.as_read();
|
||||
let buf_len = r.buf().len();
|
||||
if buf_len == 0 {
|
||||
@@ -128,8 +128,8 @@ impl File for MemoryFile {
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<RefCell<Buffer>>,
|
||||
c: Arc<Completion>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
c: Completion,
|
||||
) -> Result<Completion> {
|
||||
let buf = buffer.borrow();
|
||||
let buf_len = buf.len();
|
||||
if buf_len == 0 {
|
||||
@@ -165,7 +165,7 @@ impl File for MemoryFile {
|
||||
Ok(c)
|
||||
}
|
||||
|
||||
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
// no-op
|
||||
c.complete(0);
|
||||
Ok(c)
|
||||
|
||||
@@ -14,14 +14,10 @@ use std::{
|
||||
pub trait File: Send + Sync {
|
||||
fn lock_file(&self, exclusive: bool) -> Result<()>;
|
||||
fn unlock_file(&self) -> Result<()>;
|
||||
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>>;
|
||||
fn pwrite(
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<RefCell<Buffer>>,
|
||||
c: Arc<Completion>,
|
||||
) -> Result<Arc<Completion>>;
|
||||
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>>;
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion>;
|
||||
fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Completion)
|
||||
-> Result<Completion>;
|
||||
fn sync(&self, c: Completion) -> Result<Completion>;
|
||||
fn size(&self) -> Result<u64>;
|
||||
}
|
||||
|
||||
@@ -47,7 +43,7 @@ pub trait IO: Clock + Send + Sync {
|
||||
|
||||
fn run_once(&self) -> Result<()>;
|
||||
|
||||
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()>;
|
||||
fn wait_for_completion(&self, c: Completion) -> Result<()>;
|
||||
|
||||
fn generate_random_number(&self) -> i64;
|
||||
|
||||
|
||||
@@ -286,7 +286,7 @@ impl IO for UnixIO {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
|
||||
fn wait_for_completion(&self, c: Completion) -> Result<()> {
|
||||
while !c.is_completed() {
|
||||
self.run_once()?;
|
||||
}
|
||||
@@ -305,10 +305,10 @@ impl IO for UnixIO {
|
||||
}
|
||||
|
||||
enum CompletionCallback {
|
||||
Read(Arc<Mutex<std::fs::File>>, Arc<Completion>, usize),
|
||||
Read(Arc<Mutex<std::fs::File>>, Completion, usize),
|
||||
Write(
|
||||
Arc<Mutex<std::fs::File>>,
|
||||
Arc<Completion>,
|
||||
Completion,
|
||||
Arc<RefCell<crate::Buffer>>,
|
||||
usize,
|
||||
),
|
||||
@@ -364,7 +364,7 @@ impl File for UnixFile<'_> {
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.lock().unwrap();
|
||||
let result = {
|
||||
let r = c.as_read();
|
||||
@@ -401,8 +401,8 @@ impl File for UnixFile<'_> {
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<RefCell<crate::Buffer>>,
|
||||
c: Arc<Completion>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
c: Completion,
|
||||
) -> Result<Completion> {
|
||||
let file = self.file.lock().unwrap();
|
||||
let result = {
|
||||
let buf = buffer.borrow();
|
||||
@@ -432,7 +432,7 @@ impl File for UnixFile<'_> {
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.lock().unwrap();
|
||||
let result = fs::fsync(file.as_fd());
|
||||
match result {
|
||||
|
||||
@@ -43,7 +43,7 @@ impl IO for VfsMod {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn wait_for_completion(&self, _c: Arc<Completion>) -> Result<()> {
|
||||
fn wait_for_completion(&self, _c: Completion) -> Result<()> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ impl File for VfsFileImpl {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
let r = c.as_read();
|
||||
let result = {
|
||||
let mut buf = r.buf_mut();
|
||||
@@ -117,8 +117,8 @@ impl File for VfsFileImpl {
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<RefCell<Buffer>>,
|
||||
c: Arc<Completion>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
c: Completion,
|
||||
) -> Result<Completion> {
|
||||
let buf = buffer.borrow();
|
||||
let count = buf.as_slice().len();
|
||||
if self.vfs.is_null() {
|
||||
@@ -142,7 +142,7 @@ impl File for VfsFileImpl {
|
||||
}
|
||||
}
|
||||
|
||||
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
let vfs = unsafe { &*self.vfs };
|
||||
let result = unsafe { (vfs.sync)(self.file) };
|
||||
if result < 0 {
|
||||
|
||||
@@ -33,7 +33,7 @@ impl IO for WindowsIO {
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
|
||||
fn wait_for_completion(&self, c: Completion) -> Result<()> {
|
||||
while !c.is_completed() {
|
||||
self.run_once()?;
|
||||
}
|
||||
@@ -84,7 +84,7 @@ impl File for WindowsFile {
|
||||
}
|
||||
|
||||
#[instrument(skip(self, c), level = Level::TRACE)]
|
||||
fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
|
||||
let mut file = self.file.write();
|
||||
file.seek(std::io::SeekFrom::Start(pos as u64))?;
|
||||
let nr = {
|
||||
@@ -103,8 +103,8 @@ impl File for WindowsFile {
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<RefCell<crate::Buffer>>,
|
||||
c: Arc<Completion>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
c: Completion,
|
||||
) -> Result<Completion> {
|
||||
let mut file = self.file.write();
|
||||
file.seek(std::io::SeekFrom::Start(pos as u64))?;
|
||||
let buf = buffer.borrow();
|
||||
@@ -115,7 +115,7 @@ impl File for WindowsFile {
|
||||
}
|
||||
|
||||
#[instrument(err, skip_all, level = Level::TRACE)]
|
||||
fn sync(&self, c: Arc<Completion>) -> Result<Arc<Completion>> {
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
let file = self.file.write();
|
||||
file.sync_all().map_err(LimboError::IOError)?;
|
||||
c.complete(0);
|
||||
|
||||
@@ -9,14 +9,14 @@ use tracing::{instrument, Level};
|
||||
/// the storage medium. A database can either be a file on disk, like in SQLite,
|
||||
/// or something like a remote page server service.
|
||||
pub trait DatabaseStorage: Send + Sync {
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<Arc<Completion>>;
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<Completion>;
|
||||
fn write_page(
|
||||
&self,
|
||||
page_idx: usize,
|
||||
buffer: Arc<RefCell<Buffer>>,
|
||||
c: Completion,
|
||||
) -> Result<Arc<Completion>>;
|
||||
fn sync(&self, c: Completion) -> Result<Arc<Completion>>;
|
||||
) -> Result<Completion>;
|
||||
fn sync(&self, c: Completion) -> Result<Completion>;
|
||||
fn size(&self) -> Result<u64>;
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ unsafe impl Sync for DatabaseFile {}
|
||||
#[cfg(feature = "fs")]
|
||||
impl DatabaseStorage for DatabaseFile {
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<Arc<Completion>> {
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<Completion> {
|
||||
let r = c.as_read();
|
||||
let size = r.buf().len();
|
||||
assert!(page_idx > 0);
|
||||
@@ -50,7 +50,7 @@ impl DatabaseStorage for DatabaseFile {
|
||||
page_idx: usize,
|
||||
buffer: Arc<RefCell<Buffer>>,
|
||||
c: Completion,
|
||||
) -> Result<Arc<Completion>> {
|
||||
) -> Result<Completion> {
|
||||
let buffer_size = buffer.borrow().len();
|
||||
assert!(page_idx > 0);
|
||||
assert!(buffer_size >= 512);
|
||||
@@ -61,7 +61,7 @@ impl DatabaseStorage for DatabaseFile {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
self.file.sync(c.into())
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ unsafe impl Sync for FileMemoryStorage {}
|
||||
|
||||
impl DatabaseStorage for FileMemoryStorage {
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<Arc<Completion>> {
|
||||
fn read_page(&self, page_idx: usize, c: Completion) -> Result<Completion> {
|
||||
let r = c.as_read();
|
||||
let size = r.buf().len();
|
||||
assert!(page_idx > 0);
|
||||
@@ -104,7 +104,7 @@ impl DatabaseStorage for FileMemoryStorage {
|
||||
page_idx: usize,
|
||||
buffer: Arc<RefCell<Buffer>>,
|
||||
c: Completion,
|
||||
) -> Result<Arc<Completion>> {
|
||||
) -> Result<Completion> {
|
||||
let buffer_size = buffer.borrow().len();
|
||||
assert!(buffer_size >= 512);
|
||||
assert!(buffer_size <= 65536);
|
||||
@@ -114,7 +114,7 @@ impl DatabaseStorage for FileMemoryStorage {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn sync(&self, c: Completion) -> Result<Arc<Completion>> {
|
||||
fn sync(&self, c: Completion) -> Result<Completion> {
|
||||
self.file.sync(c.into())
|
||||
}
|
||||
|
||||
|
||||
@@ -1167,7 +1167,7 @@ impl Pager {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<Arc<Completion>> {
|
||||
pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result<Completion> {
|
||||
let wal = self.wal.borrow();
|
||||
wal.read_frame_raw(frame_no.into(), frame)
|
||||
}
|
||||
|
||||
@@ -762,7 +762,7 @@ pub fn begin_read_page(
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
page: PageRef,
|
||||
page_idx: usize,
|
||||
) -> Result<Arc<Completion>> {
|
||||
) -> Result<Completion> {
|
||||
tracing::trace!("begin_read_btree_page(page_idx = {})", page_idx);
|
||||
let buf = buffer_pool.get();
|
||||
let drop_fn = Rc::new(move |buf| {
|
||||
@@ -813,7 +813,7 @@ pub fn begin_write_btree_page(
|
||||
pager: &Pager,
|
||||
page: &PageRef,
|
||||
write_counter: Rc<RefCell<usize>>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
) -> Result<Completion> {
|
||||
tracing::trace!("begin_write_btree_page(page={})", page.get().id);
|
||||
let page_source = &pager.db_file;
|
||||
let page_finish = page.clone();
|
||||
@@ -1575,7 +1575,7 @@ pub fn begin_read_wal_frame_raw(
|
||||
offset: usize,
|
||||
page_size: u32,
|
||||
complete: Box<dyn Fn(Arc<RefCell<Buffer>>, i32)>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
) -> Result<Completion> {
|
||||
tracing::trace!("begin_read_wal_frame_raw(offset={})", offset);
|
||||
let drop_fn = Rc::new(|_buf| {});
|
||||
let buf = Arc::new(RefCell::new(Buffer::allocate(
|
||||
@@ -1593,7 +1593,7 @@ pub fn begin_read_wal_frame(
|
||||
offset: usize,
|
||||
buffer_pool: Arc<BufferPool>,
|
||||
complete: Box<dyn Fn(Arc<RefCell<Buffer>>, i32)>,
|
||||
) -> Result<Arc<Completion>> {
|
||||
) -> Result<Completion> {
|
||||
tracing::trace!("begin_read_wal_frame(offset={})", offset);
|
||||
let buf = buffer_pool.get();
|
||||
let drop_fn = Rc::new(move |buf| {
|
||||
|
||||
@@ -209,7 +209,7 @@ pub trait Wal {
|
||||
fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc<BufferPool>) -> Result<()>;
|
||||
|
||||
/// Read a raw frame (header included) from the WAL.
|
||||
fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result<Arc<Completion>>;
|
||||
fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result<Completion>;
|
||||
|
||||
/// Write a raw frame (header included) from the WAL.
|
||||
/// Note, that turso-db will use page_no and size_after fields from the header, but will overwrite checksum with proper value
|
||||
@@ -284,7 +284,7 @@ impl Wal for DummyWAL {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_frame_raw(&self, _frame_id: u64, _frame: &mut [u8]) -> Result<Arc<Completion>> {
|
||||
fn read_frame_raw(&self, _frame_id: u64, _frame: &mut [u8]) -> Result<Completion> {
|
||||
todo!();
|
||||
}
|
||||
|
||||
@@ -636,7 +636,7 @@ impl Wal for WalFile {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = Level::DEBUG)]
|
||||
fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result<Arc<Completion>> {
|
||||
fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result<Completion> {
|
||||
tracing::debug!("read_frame({})", frame_id);
|
||||
let offset = self.frame_offset(frame_id);
|
||||
let (frame_ptr, frame_len) = (frame.as_mut_ptr(), frame.len());
|
||||
@@ -731,7 +731,7 @@ impl Wal for WalFile {
|
||||
db_size as u32,
|
||||
page,
|
||||
);
|
||||
let c = Arc::new(Completion::new_write(|_| {}));
|
||||
let c = Completion::new_write(|_| {});
|
||||
let c = shared.file.pwrite(offset, frame_bytes, c)?;
|
||||
self.io.wait_for_completion(c)?;
|
||||
self.complete_append_frame(page_id, frame_id, checksums);
|
||||
|
||||
@@ -402,7 +402,7 @@ impl SortedChunk {
|
||||
read_buffer_ref,
|
||||
read_complete,
|
||||
)));
|
||||
self.file.pread(self.total_bytes_read.get(), Arc::new(c))?;
|
||||
self.file.pread(self.total_bytes_read.get(), c)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -448,7 +448,7 @@ impl SortedChunk {
|
||||
});
|
||||
|
||||
let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete)));
|
||||
self.file.pwrite(0, buffer_ref, Arc::new(c))?;
|
||||
self.file.pwrite(0, buffer_ref, c)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,12 +38,12 @@ pub(crate) struct SimulatorFile {
|
||||
|
||||
pub latency_probability: usize,
|
||||
|
||||
pub sync_completion: RefCell<Option<Arc<turso_core::Completion>>>,
|
||||
pub sync_completion: RefCell<Option<turso_core::Completion>>,
|
||||
pub queued_io: RefCell<Vec<DelayedIo>>,
|
||||
pub clock: Arc<SimulatorClock>,
|
||||
}
|
||||
|
||||
type IoOperation = Box<dyn FnOnce(&SimulatorFile) -> Result<Arc<turso_core::Completion>>>;
|
||||
type IoOperation = Box<dyn FnOnce(&SimulatorFile) -> Result<turso_core::Completion>>;
|
||||
|
||||
pub struct DelayedIo {
|
||||
pub time: turso_core::Instant,
|
||||
@@ -149,11 +149,7 @@ impl File for SimulatorFile {
|
||||
self.inner.unlock_file()
|
||||
}
|
||||
|
||||
fn pread(
|
||||
&self,
|
||||
pos: usize,
|
||||
c: Arc<turso_core::Completion>,
|
||||
) -> Result<Arc<turso_core::Completion>> {
|
||||
fn pread(&self, pos: usize, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||
self.nr_pread_calls.set(self.nr_pread_calls.get() + 1);
|
||||
if self.fault.get() {
|
||||
tracing::debug!("pread fault");
|
||||
@@ -178,8 +174,8 @@ impl File for SimulatorFile {
|
||||
&self,
|
||||
pos: usize,
|
||||
buffer: Arc<RefCell<turso_core::Buffer>>,
|
||||
c: Arc<turso_core::Completion>,
|
||||
) -> Result<Arc<turso_core::Completion>> {
|
||||
c: turso_core::Completion,
|
||||
) -> Result<turso_core::Completion> {
|
||||
self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1);
|
||||
if self.fault.get() {
|
||||
tracing::debug!("pwrite fault");
|
||||
@@ -200,7 +196,7 @@ impl File for SimulatorFile {
|
||||
}
|
||||
}
|
||||
|
||||
fn sync(&self, c: Arc<turso_core::Completion>) -> Result<Arc<turso_core::Completion>> {
|
||||
fn sync(&self, c: turso_core::Completion) -> Result<turso_core::Completion> {
|
||||
self.nr_sync_calls.set(self.nr_sync_calls.get() + 1);
|
||||
if self.fault.get() {
|
||||
// TODO: Enable this when https://github.com/tursodatabase/turso/issues/2091 is fixed.
|
||||
|
||||
@@ -104,7 +104,7 @@ impl IO for SimulatorIO {
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
fn wait_for_completion(&self, c: Arc<turso_core::Completion>) -> Result<()> {
|
||||
fn wait_for_completion(&self, c: turso_core::Completion) -> Result<()> {
|
||||
while !c.is_completed() {
|
||||
self.run_once()?;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user