mirror of
https://github.com/aljazceru/turso.git
synced 2026-02-08 17:54:22 +01:00
Fix default io:;File::pwritev impl
This commit is contained in:
@@ -24,21 +24,36 @@ pub trait File: Send + Sync {
|
||||
buffers: Vec<Arc<RefCell<Buffer>>>,
|
||||
c: Completion,
|
||||
) -> Result<Completion> {
|
||||
// FIXME: for now, stupid default so i dont have to impl for all backends
|
||||
let counter = Rc::new(Cell::new(0));
|
||||
let len = buffers.len();
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
if buffers.is_empty() {
|
||||
c.complete(0);
|
||||
return Ok(c);
|
||||
}
|
||||
// naive default implementation can be overridden on backends where it makes sense to
|
||||
let mut pos = pos;
|
||||
let outstanding = Arc::new(AtomicUsize::new(buffers.len()));
|
||||
let total_written = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
for buf in buffers {
|
||||
let _counter = counter.clone();
|
||||
let _c = c.clone();
|
||||
let default_c = Completion::new_write(move |_| {
|
||||
_counter.set(_counter.get() + 1);
|
||||
if _counter.get() == len {
|
||||
_c.complete(len as i32); // complete the original completion
|
||||
}
|
||||
});
|
||||
let len = buf.borrow().len();
|
||||
self.pwrite(pos, buf, default_c)?;
|
||||
let child_c = {
|
||||
let c_main = c.clone();
|
||||
let outstanding = outstanding.clone();
|
||||
let total_written = total_written.clone();
|
||||
Completion::new_write(move |n| {
|
||||
// accumulate bytes actually reported by the backend
|
||||
total_written.fetch_add(n as usize, Ordering::Relaxed);
|
||||
if outstanding.fetch_sub(1, Ordering::AcqRel) == 1 {
|
||||
// last one finished
|
||||
c_main.complete(total_written.load(Ordering::Acquire) as i32);
|
||||
}
|
||||
})
|
||||
};
|
||||
if let Err(e) = self.pwrite(pos, buf.clone(), child_c) {
|
||||
// best-effort: mark as done so caller won't wait forever
|
||||
c.complete(-1);
|
||||
return Err(e);
|
||||
}
|
||||
pos += len;
|
||||
}
|
||||
Ok(c)
|
||||
|
||||
Reference in New Issue
Block a user