From 05df548b104e77c624f8af15aa8995536ab2a18f Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Thu, 22 May 2025 09:43:28 +0300 Subject: [PATCH] core/io: Add wait_for_completion() to I/O dispatcher --- bindings/wasm/lib.rs | 7 +++++++ core/io/generic.rs | 7 +++++++ core/io/io_uring.rs | 15 +++++++++++---- core/io/memory.rs | 4 ++++ core/io/mod.rs | 32 +++++++++++++++++++++++++++++--- core/io/unix.rs | 7 +++++++ core/io/vfs.rs | 4 ++++ core/io/windows.rs | 13 ++++++++++--- core/storage/sqlite3_ondisk.rs | 1 + core/storage/wal.rs | 1 + simulator/runner/io.rs | 7 +++++++ 11 files changed, 88 insertions(+), 10 deletions(-) diff --git a/bindings/wasm/lib.rs b/bindings/wasm/lib.rs index 4277f1f3f..3bf8e14ff 100644 --- a/bindings/wasm/lib.rs +++ b/bindings/wasm/lib.rs @@ -283,6 +283,13 @@ impl limbo_core::IO for PlatformIO { })) } + fn wait_for_completion(&self, c: Arc) -> Result<()> { + while !c.is_completed() { + self.run_once()?; + } + Ok(()) + } + fn run_once(&self) -> Result<()> { Ok(()) } diff --git a/core/io/generic.rs b/core/io/generic.rs index 60e11f119..11dfe9971 100644 --- a/core/io/generic.rs +++ b/core/io/generic.rs @@ -35,6 +35,13 @@ impl IO for GenericIO { })) } + fn wait_for_completion(&self, c: Arc) -> Result<()> { + while !c.is_completed() { + self.run_once()?; + } + Ok(()) + } + fn run_once(&self) -> Result<()> { Ok(()) } diff --git a/core/io/io_uring.rs b/core/io/io_uring.rs index 44d4c5696..5f0045888 100644 --- a/core/io/io_uring.rs +++ b/core/io/io_uring.rs @@ -45,7 +45,7 @@ unsafe impl Sync for UringIO {} struct WrappedIOUring { ring: io_uring::IoUring, pending_ops: usize, - pub pending: [Option; MAX_IOVECS as usize + 1], + pub pending: [Option>; MAX_IOVECS as usize + 1], key: u64, } @@ -169,6 +169,13 @@ impl IO for UringIO { Ok(uring_file) } + fn wait_for_completion(&self, c: Arc) -> Result<()> { + while !c.is_completed() { + self.run_once()?; + } + Ok(()) + } + fn run_once(&self) -> Result<()> { trace!("run_once()"); let mut inner = self.inner.borrow_mut(); @@ -298,16 +305,16 @@ impl File for UringFile { }; io.ring.submit_entry( &write, - Completion::Write(WriteCompletion::new(Box::new(move |result| { + Arc::new(Completion::Write(WriteCompletion::new(Box::new(move |result| { c.complete(result); // NOTE: Explicitly reference buffer to ensure it lives until here let _ = buffer.borrow(); - }))), + })))), ); Ok(()) } - fn sync(&self, c: Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { let fd = io_uring::types::Fd(self.file.as_raw_fd()); let mut io = self.io.borrow_mut(); trace!("sync()"); diff --git a/core/io/memory.rs b/core/io/memory.rs index cb20c6e63..415da8ca3 100644 --- a/core/io/memory.rs +++ b/core/io/memory.rs @@ -53,6 +53,10 @@ impl IO for MemoryIO { Ok(()) } + fn wait_for_completion(&self, _c: Arc) -> Result<()> { + todo!(); + } + fn generate_random_number(&self) -> i64 { let mut buf = [0u8; 8]; getrandom::getrandom(&mut buf).unwrap(); diff --git a/core/io/mod.rs b/core/io/mod.rs index 4cbe785c5..9c5f6d9c2 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -42,6 +42,8 @@ pub trait IO: Clock + Send + Sync { fn run_once(&self) -> Result<()>; + fn wait_for_completion(&self, c: Arc) -> Result<()>; + fn generate_random_number(&self) -> i64; fn get_memory_io(&self) -> Arc; @@ -60,9 +62,18 @@ pub enum Completion { pub struct ReadCompletion { pub buf: Arc>, pub complete: Box, + pub is_completed: RefCell, } impl Completion { + pub fn is_completed(&self) -> bool { + match self { + Self::Read(r) => *r.is_completed.borrow(), + Self::Write(w) => *w.is_completed.borrow(), + Self::Sync(s) => *s.is_completed.borrow(), + } + } + pub fn complete(&self, result: i32) { match self { Self::Read(r) => r.complete(), @@ -83,15 +94,21 @@ impl Completion { pub struct WriteCompletion { pub complete: Box, + pub is_completed: RefCell, } pub struct SyncCompletion { pub complete: Box, + pub is_completed: RefCell, } impl ReadCompletion { pub fn new(buf: Arc>, complete: Box) -> Self { - Self { buf, complete } + Self { + buf, + complete, + is_completed: RefCell::new(false), + } } pub fn buf(&self) -> Ref<'_, Buffer> { @@ -104,26 +121,35 @@ impl ReadCompletion { pub fn complete(&self) { (self.complete)(self.buf.clone()); + *self.is_completed.borrow_mut() = true; } } impl WriteCompletion { pub fn new(complete: Box) -> Self { - Self { complete } + Self { + complete, + is_completed: RefCell::new(false), + } } pub fn complete(&self, bytes_written: i32) { (self.complete)(bytes_written); + *self.is_completed.borrow_mut() = true; } } impl SyncCompletion { pub fn new(complete: Box) -> Self { - Self { complete } + Self { + complete, + is_completed: RefCell::new(false), + } } pub fn complete(&self, res: i32) { (self.complete)(res); + *self.is_completed.borrow_mut() = true; } } diff --git a/core/io/unix.rs b/core/io/unix.rs index 8c2f121e3..6772d6e45 100644 --- a/core/io/unix.rs +++ b/core/io/unix.rs @@ -256,6 +256,13 @@ impl IO for UnixIO { Ok(()) } + fn wait_for_completion(&self, c: Arc) -> Result<()> { + while !c.is_completed() { + self.run_once()?; + } + Ok(()) + } + fn generate_random_number(&self) -> i64 { let mut buf = [0u8; 8]; getrandom::getrandom(&mut buf).unwrap(); diff --git a/core/io/vfs.rs b/core/io/vfs.rs index 7e953ebd0..7aae691e6 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -43,6 +43,10 @@ impl IO for VfsMod { Ok(()) } + fn wait_for_completion(&self, _c: Arc) -> Result<()> { + todo!(); + } + fn generate_random_number(&self) -> i64 { if self.ctx.is_null() { return -1; diff --git a/core/io/windows.rs b/core/io/windows.rs index a329abc14..dfa4dc241 100644 --- a/core/io/windows.rs +++ b/core/io/windows.rs @@ -33,6 +33,13 @@ impl IO for WindowsIO { })) } + fn wait_for_completion(&self, c: Arc) -> Result<()> { + while !c.is_completed() { + self.run_once()?; + } + Ok(()) + } + fn run_once(&self) -> Result<()> { Ok(()) } @@ -74,7 +81,7 @@ impl File for WindowsFile { unimplemented!() } - fn pread(&self, pos: usize, c: Completion) -> Result<()> { + fn pread(&self, pos: usize, c: Arc) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; { @@ -87,7 +94,7 @@ impl File for WindowsFile { Ok(()) } - fn pwrite(&self, pos: usize, buffer: Arc>, c: Completion) -> Result<()> { + fn pwrite(&self, pos: usize, buffer: Arc>, c: Arc) -> Result<()> { let mut file = self.file.borrow_mut(); file.seek(std::io::SeekFrom::Start(pos as u64))?; let buf = buffer.borrow(); @@ -97,7 +104,7 @@ impl File for WindowsFile { Ok(()) } - fn sync(&self, c: Completion) -> Result<()> { + fn sync(&self, c: Arc) -> Result<()> { let file = self.file.borrow_mut(); file.sync_all().map_err(LimboError::IOError)?; c.complete(0); diff --git a/core/storage/sqlite3_ondisk.rs b/core/storage/sqlite3_ondisk.rs index dcbed64d0..e7bb68b41 100644 --- a/core/storage/sqlite3_ondisk.rs +++ b/core/storage/sqlite3_ondisk.rs @@ -892,6 +892,7 @@ pub fn begin_sync(db_file: Arc, syncing: Rc>) complete: Box::new(move |_| { *syncing.borrow_mut() = false; }), + is_completed: RefCell::new(false), }); #[allow(clippy::arc_with_non_send_sync)] db_file.sync(Arc::new(completion))?; diff --git a/core/storage/wal.rs b/core/storage/wal.rs index 4eab23e44..c929d69e3 100644 --- a/core/storage/wal.rs +++ b/core/storage/wal.rs @@ -751,6 +751,7 @@ impl Wal for WalFile { debug!("wal_sync finish"); *syncing.borrow_mut() = false; }), + is_completed: RefCell::new(false), }); shared.file.sync(Arc::new(completion))?; } diff --git a/simulator/runner/io.rs b/simulator/runner/io.rs index c775b3f9e..634873453 100644 --- a/simulator/runner/io.rs +++ b/simulator/runner/io.rs @@ -83,6 +83,13 @@ impl IO for SimulatorIO { Ok(file) } + fn wait_for_completion(&self, c: Arc) -> Result<()> { + while !c.is_completed() { + self.run_once()?; + } + Ok(()) + } + fn run_once(&self) -> Result<()> { if *self.fault.borrow() { *self.nr_run_once_faults.borrow_mut() += 1;