diff --git a/core/io/mod.rs b/core/io/mod.rs index ab299ef64..f7766ef84 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -24,21 +24,36 @@ pub trait File: Send + Sync { buffers: Vec>>, c: Completion, ) -> Result { - // FIXME: for now, stupid default so i dont have to impl for all backends - let counter = Rc::new(Cell::new(0)); - let len = buffers.len(); + use std::sync::atomic::{AtomicUsize, Ordering}; + if buffers.is_empty() { + c.complete(0); + return Ok(c); + } + // naive default implementation can be overridden on backends where it makes sense to let mut pos = pos; + let outstanding = Arc::new(AtomicUsize::new(buffers.len())); + let total_written = Arc::new(AtomicUsize::new(0)); + for buf in buffers { - let _counter = counter.clone(); - let _c = c.clone(); - let default_c = Completion::new_write(move |_| { - _counter.set(_counter.get() + 1); - if _counter.get() == len { - _c.complete(len as i32); // complete the original completion - } - }); let len = buf.borrow().len(); - self.pwrite(pos, buf, default_c)?; + let child_c = { + let c_main = c.clone(); + let outstanding = outstanding.clone(); + let total_written = total_written.clone(); + Completion::new_write(move |n| { + // accumulate bytes actually reported by the backend + total_written.fetch_add(n as usize, Ordering::Relaxed); + if outstanding.fetch_sub(1, Ordering::AcqRel) == 1 { + // last one finished + c_main.complete(total_written.load(Ordering::Acquire) as i32); + } + }) + }; + if let Err(e) = self.pwrite(pos, buf.clone(), child_c) { + // best-effort: mark as done so caller won't wait forever + c.complete(-1); + return Err(e); + } pos += len; } Ok(c)