From d30c7d54c8a28a36e8b7b121f9719978087e888f Mon Sep 17 00:00:00 2001 From: pedrocarlo Date: Mon, 28 Jul 2025 13:29:42 -0300 Subject: [PATCH] change all Arc to Completion --- bindings/javascript/src/lib.rs | 6 +++--- bindings/wasm/lib.rs | 20 ++++++++------------ core/io/generic.rs | 15 ++++++--------- core/io/io_uring.rs | 23 ++++++++++++----------- core/io/memory.rs | 10 +++++----- core/io/mod.rs | 14 +++++--------- core/io/unix.rs | 14 +++++++------- core/io/vfs.rs | 10 +++++----- core/io/windows.rs | 10 +++++----- core/storage/database.rs | 18 +++++++++--------- core/storage/pager.rs | 2 +- core/storage/sqlite3_ondisk.rs | 8 ++++---- core/storage/wal.rs | 8 ++++---- core/vdbe/sorter.rs | 4 ++-- simulator/runner/file.rs | 16 ++++++---------- simulator/runner/io.rs | 2 +- 16 files changed, 83 insertions(+), 97 deletions(-) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index 7abdfd521..ecb5d84ff 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -694,7 +694,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { &self, page_idx: usize, c: turso_core::Completion, - ) -> turso_core::Result> { + ) -> turso_core::Result { let r = c.as_read(); let size = r.buf().len(); assert!(page_idx > 0); @@ -710,13 +710,13 @@ impl turso_core::DatabaseStorage for DatabaseFile { page_idx: usize, buffer: Arc>, c: turso_core::Completion, - ) -> turso_core::Result> { + ) -> turso_core::Result { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; self.file.pwrite(pos, buffer, c.into()) } - fn sync(&self, c: turso_core::Completion) -> turso_core::Result> { + fn sync(&self, c: turso_core::Completion) -> turso_core::Result { self.file.sync(c.into()) } diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index e9b435a57..e244f0c6b 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -213,11 +213,7 @@ impl turso_core::File for File { Ok(()) } - fn pread( - &self, - pos: usize, - c: Arc, - ) -> Result> { + fn pread(&self, pos: usize, c: turso_core::Completion) -> Result { let r = c.as_read(); let nr = { let mut buf = r.buf_mut(); @@ -233,8 +229,8 @@ impl turso_core::File for File { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: turso_core::Completion, + ) -> Result { let w = c.as_write(); let buf = buffer.borrow(); let buf: &[u8] = buf.as_slice(); @@ -244,7 +240,7 @@ impl turso_core::File for File { Ok(c) } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: turso_core::Completion) -> Result { self.vfs.sync(self.fd); c.complete(0); #[allow(clippy::arc_with_non_send_sync)] @@ -288,7 +284,7 @@ impl turso_core::IO for PlatformIO { })) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: turso_core::Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -342,7 +338,7 @@ impl turso_core::DatabaseStorage for DatabaseFile { &self, page_idx: usize, c: turso_core::Completion, - ) -> Result> { + ) -> Result { let r = c.as_read(); let size = r.buf().len(); assert!(page_idx > 0); @@ -358,13 +354,13 @@ impl turso_core::DatabaseStorage for DatabaseFile { page_idx: usize, buffer: Arc>, c: turso_core::Completion, - ) -> Result> { + ) -> Result { let size = buffer.borrow().len(); let pos = (page_idx - 1) * size; self.file.pwrite(pos, buffer, c.into()) } - fn sync(&self, c: turso_core::Completion) -> Result> { + fn sync(&self, c: turso_core::Completion) -> Result { self.file.sync(c.into()) } diff --git a/core/io/generic.rs b/core/io/generic.rs index ebbd67d17..15ce52564 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -35,7 +35,7 @@ impl IO for GenericIO { })) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -86,14 +86,11 @@ impl File for GenericFile { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { - let r = match c.completion_type { - CompletionType::Read(ref r) => r, - _ => unreachable!(), - }; + let r = c.as_read(); let mut buf = r.buf_mut(); let buf = buf.as_mut_slice(); file.read_exact(buf)?; @@ -106,8 +103,8 @@ impl File for GenericFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); @@ -117,7 +114,7 @@ impl File for GenericFile { Ok(c) } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let mut file = self.file.borrow_mut(); file.sync_all().map_err(|err| LimboError::IOError(err))?; c.complete(0); diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index aa0f21f3c..2ccb3e30a 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -1,6 +1,6 @@ #![allow(clippy::arc_with_non_send_sync)] -use super::{common, Completion, File, OpenFlags, IO}; +use super::{common, Completion, CompletionInner, File, OpenFlags, IO}; use crate::io::clock::{Clock, Instant}; use crate::{turso_assert, LimboError, MemoryIO, Result}; use rustix::fs::{self, FlockOperation, OFlags}; @@ -168,7 +168,7 @@ impl IO for UringIO { Ok(uring_file) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -225,14 +225,15 @@ impl Clock for UringIO { #[inline(always)] /// use the callback pointer as the user_data for the operation as is /// common practice for io_uring to prevent more indirection -fn get_key(c: Arc) -> u64 { - Arc::into_raw(c) as u64 +fn get_key(c: Completion) -> u64 { + Arc::into_raw(c.inner) as u64 } #[inline(always)] -/// convert the user_data back to an Arc pointer -fn completion_from_key(key: u64) -> Arc { - unsafe { Arc::from_raw(key as *const Completion) } +/// convert the user_data back to an Completion pointer +fn completion_from_key(key: u64) -> Completion { + let c_inner = unsafe { Arc::from_raw(key as *const CompletionInner) }; + Completion { inner: c_inner } } pub struct UringFile { @@ -297,7 +298,7 @@ impl File for UringFile { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let r = c.as_read(); trace!("pread(pos = {}, length = {})", pos, r.buf().len()); let mut io = self.io.borrow_mut(); @@ -320,8 +321,8 @@ impl File for UringFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let mut io = self.io.borrow_mut(); let write = { let buf = buffer.borrow(); @@ -337,7 +338,7 @@ impl File for UringFile { Ok(c) } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let mut io = self.io.borrow_mut(); trace!("sync()"); let sync = with_fd!(self, |fd| { diff --git a/core/io/memory.rs b/core/io/memory.rs index 597696605..ac81b39bc 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -53,7 +53,7 @@ impl IO for MemoryIO { Ok(()) } - fn wait_for_completion(&self, _c: Arc) -> Result<()> { + fn wait_for_completion(&self, _c: Completion) -> Result<()> { todo!(); } @@ -83,7 +83,7 @@ impl File for MemoryFile { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let r = c.as_read(); let buf_len = r.buf().len(); if buf_len == 0 { @@ -128,8 +128,8 @@ impl File for MemoryFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let buf = buffer.borrow(); let buf_len = buf.len(); if buf_len == 0 { @@ -165,7 +165,7 @@ impl File for MemoryFile { Ok(c) } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { // no-op c.complete(0); Ok(c) diff --git a/core/io/mod.rs b/core/io/mod.rs index a2717db0b..b58c17ab6 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -14,14 +14,10 @@ use std::{ pub trait File: Send + Sync { fn lock_file(&self, exclusive: bool) -> Result<()>; fn unlock_file(&self) -> Result<()>; - fn pread(&self, pos: usize, c: Arc) -> Result>; - fn pwrite( - &self, - pos: usize, - buffer: Arc>, - c: Arc, - ) -> Result>; - fn sync(&self, c: Arc) -> Result>; + fn pread(&self, pos: usize, c: Completion) -> Result; + fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) + -> Result; + fn sync(&self, c: Completion) -> Result; fn size(&self) -> Result; } @@ -47,7 +43,7 @@ pub trait IO: Clock + Send + Sync { fn run_once(&self) -> Result<()>; - fn wait_for_completion(&self, c: Arc) -> Result<()>; + fn wait_for_completion(&self, c: Completion) -> Result<()>; fn generate_random_number(&self) -> i64; diff --git a/core/io/unix.rs b/core/io/unix.rs index 1526e13e4..cfe6d37fe 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -286,7 +286,7 @@ impl IO for UnixIO { Ok(()) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -305,10 +305,10 @@ impl IO for UnixIO { } enum CompletionCallback { - Read(Arc>, Arc, usize), + Read(Arc>, Completion, usize), Write( Arc>, - Arc, + Completion, Arc>, usize, ), @@ -364,7 +364,7 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::TRACE)] - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let file = self.file.lock().unwrap(); let result = { let r = c.as_read(); @@ -401,8 +401,8 @@ impl File for UnixFile<'_> { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let file = self.file.lock().unwrap(); let result = { let buf = buffer.borrow(); @@ -432,7 +432,7 @@ impl File for UnixFile<'_> { } #[instrument(err, skip_all, level = Level::TRACE)] - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let file = self.file.lock().unwrap(); let result = fs::fsync(file.as_fd()); match result { diff --git a/core/io/vfs.rs b/core/io/vfs.rs index bad8a5940..545ce61a1 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -43,7 +43,7 @@ impl IO for VfsMod { Ok(()) } - fn wait_for_completion(&self, _c: Arc) -> Result<()> { + fn wait_for_completion(&self, _c: Completion) -> Result<()> { todo!(); } @@ -97,7 +97,7 @@ impl File for VfsFileImpl { Ok(()) } - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let r = c.as_read(); let result = { let mut buf = r.buf_mut(); @@ -117,8 +117,8 @@ impl File for VfsFileImpl { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let buf = buffer.borrow(); let count = buf.as_slice().len(); if self.vfs.is_null() { @@ -142,7 +142,7 @@ impl File for VfsFileImpl { } } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let vfs = unsafe { &*self.vfs }; let result = unsafe { (vfs.sync)(self.file) }; if result < 0 { diff --git a/core/io/windows.rs b/core/io/windows.rs index 6ffdc005b..701e61269 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -33,7 +33,7 @@ impl IO for WindowsIO { } #[instrument(err, skip_all, level = Level::TRACE)] - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; } @@ -84,7 +84,7 @@ impl File for WindowsFile { } #[instrument(skip(self, c), level = Level::TRACE)] - fn pread(&self, pos: usize, c: Arc) -> Result> { + fn pread(&self, pos: usize, c: Completion) -> Result { let mut file = self.file.write(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let nr = { @@ -103,8 +103,8 @@ impl File for WindowsFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: Completion, + ) -> Result { let mut file = self.file.write(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); @@ -115,7 +115,7 @@ impl File for WindowsFile { } #[instrument(err, skip_all, level = Level::TRACE)] - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: Completion) -> Result { let file = self.file.write(); file.sync_all().map_err(LimboError::IOError)?; c.complete(0); diff --git a/core/storage/database.rs b/core/storage/database.rs index 5d00a09a6..75278dbdb 100644 --- a/core/storage/database.rs +++ b/core/storage/database.rs @@ -9,14 +9,14 @@ use tracing::{instrument, Level}; /// the storage medium. A database can either be a file on disk, like in SQLite, /// or something like a remote page server service. pub trait DatabaseStorage: Send + Sync { - fn read_page(&self, page_idx: usize, c: Completion) -> Result>; + fn read_page(&self, page_idx: usize, c: Completion) -> Result; fn write_page( &self, page_idx: usize, buffer: Arc>, c: Completion, - ) -> Result>; - fn sync(&self, c: Completion) -> Result>; + ) -> Result; + fn sync(&self, c: Completion) -> Result; fn size(&self) -> Result; } @@ -33,7 +33,7 @@ unsafe impl Sync for DatabaseFile {} #[cfg(feature = "fs")] impl DatabaseStorage for DatabaseFile { #[instrument(skip_all, level = Level::DEBUG)] - fn read_page(&self, page_idx: usize, c: Completion) -> Result> { + fn read_page(&self, page_idx: usize, c: Completion) -> Result { let r = c.as_read(); let size = r.buf().len(); assert!(page_idx > 0); @@ -50,7 +50,7 @@ impl DatabaseStorage for DatabaseFile { page_idx: usize, buffer: Arc>, c: Completion, - ) -> Result> { + ) -> Result { let buffer_size = buffer.borrow().len(); assert!(page_idx > 0); assert!(buffer_size >= 512); @@ -61,7 +61,7 @@ impl DatabaseStorage for DatabaseFile { } #[instrument(skip_all, level = Level::DEBUG)] - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Completion) -> Result { self.file.sync(c.into()) } @@ -87,7 +87,7 @@ unsafe impl Sync for FileMemoryStorage {} impl DatabaseStorage for FileMemoryStorage { #[instrument(skip_all, level = Level::DEBUG)] - fn read_page(&self, page_idx: usize, c: Completion) -> Result> { + fn read_page(&self, page_idx: usize, c: Completion) -> Result { let r = c.as_read(); let size = r.buf().len(); assert!(page_idx > 0); @@ -104,7 +104,7 @@ impl DatabaseStorage for FileMemoryStorage { page_idx: usize, buffer: Arc>, c: Completion, - ) -> Result> { + ) -> Result { let buffer_size = buffer.borrow().len(); assert!(buffer_size >= 512); assert!(buffer_size <= 65536); @@ -114,7 +114,7 @@ impl DatabaseStorage for FileMemoryStorage { } #[instrument(skip_all, level = Level::DEBUG)] - fn sync(&self, c: Completion) -> Result> { + fn sync(&self, c: Completion) -> Result { self.file.sync(c.into()) } diff --git a/core/storage/pager.rs b/core/storage/pager.rs index b531f86ef..426b305d0 100644 --- a/core/storage/pager.rs +++ b/core/storage/pager.rs @@ -1167,7 +1167,7 @@ impl Pager { } #[instrument(skip_all, level = Level::DEBUG)] - pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result> { + pub fn wal_get_frame(&self, frame_no: u32, frame: &mut [u8]) -> Result { let wal = self.wal.borrow(); wal.read_frame_raw(frame_no.into(), frame) } diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index 4ec89d300..5a1ab7305 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -762,7 +762,7 @@ pub fn begin_read_page( buffer_pool: Arc, page: PageRef, page_idx: usize, -) -> Result> { +) -> Result { tracing::trace!("begin_read_btree_page(page_idx = {})", page_idx); let buf = buffer_pool.get(); let drop_fn = Rc::new(move |buf| { @@ -813,7 +813,7 @@ pub fn begin_write_btree_page( pager: &Pager, page: &PageRef, write_counter: Rc>, -) -> Result> { +) -> Result { tracing::trace!("begin_write_btree_page(page={})", page.get().id); let page_source = &pager.db_file; let page_finish = page.clone(); @@ -1575,7 +1575,7 @@ pub fn begin_read_wal_frame_raw( offset: usize, page_size: u32, complete: Box>, i32)>, -) -> Result> { +) -> Result { tracing::trace!("begin_read_wal_frame_raw(offset={})", offset); let drop_fn = Rc::new(|_buf| {}); let buf = Arc::new(RefCell::new(Buffer::allocate( @@ -1593,7 +1593,7 @@ pub fn begin_read_wal_frame( offset: usize, buffer_pool: Arc, complete: Box>, i32)>, -) -> Result> { +) -> Result { tracing::trace!("begin_read_wal_frame(offset={})", offset); let buf = buffer_pool.get(); let drop_fn = Rc::new(move |buf| { diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 747bd631c..705484e6d 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -209,7 +209,7 @@ pub trait Wal { fn read_frame(&self, frame_id: u64, page: PageRef, buffer_pool: Arc) -> Result<()>; /// Read a raw frame (header included) from the WAL. - fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result>; + fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result; /// Write a raw frame (header included) from the WAL. /// Note, that turso-db will use page_no and size_after fields from the header, but will overwrite checksum with proper value @@ -284,7 +284,7 @@ impl Wal for DummyWAL { Ok(()) } - fn read_frame_raw(&self, _frame_id: u64, _frame: &mut [u8]) -> Result> { + fn read_frame_raw(&self, _frame_id: u64, _frame: &mut [u8]) -> Result { todo!(); } @@ -636,7 +636,7 @@ impl Wal for WalFile { } #[instrument(skip_all, level = Level::DEBUG)] - fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result> { + fn read_frame_raw(&self, frame_id: u64, frame: &mut [u8]) -> Result { tracing::debug!("read_frame({})", frame_id); let offset = self.frame_offset(frame_id); let (frame_ptr, frame_len) = (frame.as_mut_ptr(), frame.len()); @@ -731,7 +731,7 @@ impl Wal for WalFile { db_size as u32, page, ); - let c = Arc::new(Completion::new_write(|_| {})); + let c = Completion::new_write(|_| {}); let c = shared.file.pwrite(offset, frame_bytes, c)?; self.io.wait_for_completion(c)?; self.complete_append_frame(page_id, frame_id, checksums); diff --git a/core/vdbe/sorter.rs b/core/vdbe/sorter.rs index 021256d2a..dd84fa195 100644 --- a/core/vdbe/sorter.rs +++ b/core/vdbe/sorter.rs @@ -402,7 +402,7 @@ impl SortedChunk { read_buffer_ref, read_complete, ))); - self.file.pread(self.total_bytes_read.get(), Arc::new(c))?; + self.file.pread(self.total_bytes_read.get(), c)?; Ok(()) } @@ -448,7 +448,7 @@ impl SortedChunk { }); let c = Completion::new(CompletionType::Write(WriteCompletion::new(write_complete))); - self.file.pwrite(0, buffer_ref, Arc::new(c))?; + self.file.pwrite(0, buffer_ref, c)?; Ok(()) } } diff --git a/simulator/runner/file.rs b/simulator/runner/file.rs index d1682c353..3905a3ce7 100644 --- a/simulator/runner/file.rs +++ b/simulator/runner/file.rs @@ -38,12 +38,12 @@ pub(crate) struct SimulatorFile { pub latency_probability: usize, - pub sync_completion: RefCell>>, + pub sync_completion: RefCell>, pub queued_io: RefCell>, pub clock: Arc, } -type IoOperation = Box Result>>; +type IoOperation = Box Result>; pub struct DelayedIo { pub time: turso_core::Instant, @@ -149,11 +149,7 @@ impl File for SimulatorFile { self.inner.unlock_file() } - fn pread( - &self, - pos: usize, - c: Arc, - ) -> Result> { + fn pread(&self, pos: usize, c: turso_core::Completion) -> Result { self.nr_pread_calls.set(self.nr_pread_calls.get() + 1); if self.fault.get() { tracing::debug!("pread fault"); @@ -178,8 +174,8 @@ impl File for SimulatorFile { &self, pos: usize, buffer: Arc>, - c: Arc, - ) -> Result> { + c: turso_core::Completion, + ) -> Result { self.nr_pwrite_calls.set(self.nr_pwrite_calls.get() + 1); if self.fault.get() { tracing::debug!("pwrite fault"); @@ -200,7 +196,7 @@ impl File for SimulatorFile { } } - fn sync(&self, c: Arc) -> Result> { + fn sync(&self, c: turso_core::Completion) -> Result { self.nr_sync_calls.set(self.nr_sync_calls.get() + 1); if self.fault.get() { // TODO: Enable this when https://github.com/tursodatabase/turso/issues/2091 is fixed. diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index 7c888cc7f..29973f889 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -104,7 +104,7 @@ impl IO for SimulatorIO { Ok(file) } - fn wait_for_completion(&self, c: Arc) -> Result<()> { + fn wait_for_completion(&self, c: turso_core::Completion) -> Result<()> { while !c.is_completed() { self.run_once()?; }