Merge 'Revive async io extension PR' from Preston Thorpe

bringing #1127 back to life, except better because this doesn't add
Tokio as a dependency for extension lib just for tests.

Reviewed-by: Jussi Saurio <jussi.saurio@gmail.com>

Closes #2418
This commit is contained in:
Preston Thorpe
2025-08-14 16:10:09 -04:00
committed by GitHub
5 changed files with 298 additions and 86 deletions

View File

@@ -1,10 +1,12 @@
use super::{Buffer, Completion, File, MemoryIO, OpenFlags, IO};
use crate::ext::VfsMod;
use crate::io::clock::{Clock, Instant};
use crate::io::CompletionInner;
use crate::{LimboError, Result};
use std::ffi::{c_void, CString};
use std::ptr::NonNull;
use std::sync::Arc;
use turso_ext::{VfsFileImpl, VfsImpl};
use turso_ext::{BufferRef, IOCallback, SendPtr, VfsFileImpl, VfsImpl};
impl Clock for VfsMod {
fn now(&self) -> Instant {
@@ -75,6 +77,24 @@ impl VfsMod {
}
}
// #Safety:
// the callback wrapper in the extension library is FnOnce, so we know
/// # Safety
/// the callback wrapper in the extension library is FnOnce, so we know
/// that the into_raw/from_raw contract will hold
unsafe extern "C" fn callback_fn(result: i32, ctx: SendPtr) {
let completion = Completion {
inner: (Arc::from_raw(ctx.inner().as_ptr() as *mut CompletionInner)),
};
completion.complete(result);
}
fn to_callback(c: Completion) -> IOCallback {
IOCallback::new(callback_fn, unsafe {
NonNull::new_unchecked(Arc::into_raw(c.inner) as *mut c_void)
})
}
impl File for VfsFileImpl {
fn lock_file(&self, exclusive: bool) -> Result<()> {
let vfs = unsafe { &*self.vfs };
@@ -98,47 +118,64 @@ impl File for VfsFileImpl {
}
fn pread(&self, pos: usize, c: Completion) -> Result<Completion> {
let r = c.as_read();
let result = {
let buf = r.buf();
let count = buf.len();
let vfs = unsafe { &*self.vfs };
unsafe { (vfs.read)(self.file, buf.as_mut_ptr(), count, pos as i64) }
};
if result < 0 {
Err(LimboError::ExtensionError("pread failed".to_string()))
} else {
c.complete(result);
Ok(c)
if self.vfs.is_null() {
c.complete(-1);
return Err(LimboError::ExtensionError("VFS is null".to_string()));
}
let r = c.as_read();
let buf = r.buf();
let len = buf.len();
let cb = to_callback(c.clone());
let vfs = unsafe { &*self.vfs };
let res = unsafe {
(vfs.read)(
self.file,
BufferRef::new(buf.as_mut_ptr(), len),
pos as i64,
cb,
)
};
if res.is_error() {
return Err(LimboError::ExtensionError("pread failed".to_string()));
}
Ok(c)
}
fn pwrite(&self, pos: usize, buffer: Arc<Buffer>, c: Completion) -> Result<Completion> {
let count = buffer.as_slice().len();
if self.vfs.is_null() {
c.complete(-1);
return Err(LimboError::ExtensionError("VFS is null".to_string()));
}
let vfs = unsafe { &*self.vfs };
let result =
unsafe { (vfs.write)(self.file, buffer.as_ptr() as *mut u8, count, pos as i64) };
if result < 0 {
Err(LimboError::ExtensionError("pwrite failed".to_string()))
} else {
c.complete(result);
Ok(c)
let res = unsafe {
let buf = buffer.clone();
let len = buf.len();
let cb = to_callback(c.clone());
(vfs.write)(
self.file,
BufferRef::new(buf.as_ptr() as *mut u8, len),
pos as i64,
cb,
)
};
if res.is_error() {
return Err(LimboError::ExtensionError("pwrite failed".to_string()));
}
Ok(c)
}
fn sync(&self, c: Completion) -> Result<Completion> {
let vfs = unsafe { &*self.vfs };
let result = unsafe { (vfs.sync)(self.file) };
if result < 0 {
Err(LimboError::ExtensionError("sync failed".to_string()))
} else {
c.complete(0);
Ok(c)
if self.vfs.is_null() {
c.complete(-1);
return Err(LimboError::ExtensionError("VFS is null".to_string()));
}
let vfs = unsafe { &*self.vfs };
let cb = to_callback(c.clone());
let res = unsafe { (vfs.sync)(self.file, cb) };
if res.is_error() {
return Err(LimboError::ExtensionError("sync failed".to_string()));
}
Ok(c)
}
fn size(&self) -> Result<u64> {
@@ -153,16 +190,16 @@ impl File for VfsFileImpl {
fn truncate(&self, len: usize, c: Completion) -> Result<Completion> {
if self.vfs.is_null() {
c.complete(-1);
return Err(LimboError::ExtensionError("VFS is null".to_string()));
}
let vfs = unsafe { &*self.vfs };
let result = unsafe { (vfs.truncate)(self.file, len as i64) };
if result.is_error() {
Err(LimboError::ExtensionError("truncate failed".to_string()))
} else {
c.complete(0);
Ok(c)
let cb = to_callback(c.clone());
let res = unsafe { (vfs.truncate)(self.file, len as i64, cb) };
if res.is_error() {
return Err(LimboError::ExtensionError("truncate failed".to_string()));
}
Ok(c)
}
}

View File

@@ -13,7 +13,10 @@ pub use turso_macros::VfsDerive;
pub use turso_macros::{register_extension, scalar, AggregateDerive, VTabModuleDerive};
pub use types::{ResultCode, StepResult, Value, ValueType};
#[cfg(feature = "vfs")]
pub use vfs_modules::{RegisterVfsFn, VfsExtension, VfsFile, VfsFileImpl, VfsImpl, VfsInterface};
pub use vfs_modules::{
BufferRef, Callback, IOCallback, RegisterVfsFn, SendPtr, VfsExtension, VfsFile, VfsFileImpl,
VfsImpl, VfsInterface,
};
use vtabs::RegisterModuleFn;
pub use vtabs::{
Conn, Connection, ConstraintInfo, ConstraintOp, ConstraintUsage, ExtIndexInfo, IndexInfo,

View File

@@ -1,5 +1,9 @@
use crate::{ExtResult, ExtensionApi, ResultCode};
use std::ffi::{c_char, c_void};
use std::{
ffi::{c_char, c_void},
ops::{Deref, DerefMut},
ptr::NonNull,
};
/// Field for ExtensionApi to interface with VFS extensions,
/// separated to more easily feature flag out for WASM builds.
@@ -38,10 +42,10 @@ pub trait VfsFile: Send + Sync {
fn unlock(&self) -> ExtResult<()> {
Ok(())
}
fn read(&mut self, buf: &mut [u8], count: usize, offset: i64) -> ExtResult<i32>;
fn write(&mut self, buf: &[u8], count: usize, offset: i64) -> ExtResult<i32>;
fn sync(&self) -> ExtResult<()>;
fn truncate(&self, len: i64) -> ExtResult<()>;
fn read(&mut self, buf: BufferRef, offset: i64, cb: Callback) -> ExtResult<()>;
fn write(&mut self, buf: BufferRef, offset: i64, cb: Callback) -> ExtResult<()>;
fn sync(&self, cb: Callback) -> ExtResult<()>;
fn truncate(&self, len: i64, cb: Callback) -> ExtResult<()>;
fn size(&self) -> i64;
}
@@ -63,6 +67,93 @@ pub struct VfsImpl {
pub truncate: VfsTruncate,
}
/// a wrapper around the raw `*mut u8` buffer for extensions.
#[derive(Debug)]
#[repr(C)]
pub struct BufferRef {
_ptr: NonNull<u8>,
len: usize,
}
unsafe impl Send for BufferRef {}
impl BufferRef {
/// create a new `BufferRef` from a raw pointer
///
/// # Safety
/// The caller must ensure that the pointer is valid and the buffer is not deallocated.
/// This function should only be called with a pointer to a buffer allocated by the `Buffer` type defined in the `core` module.
pub unsafe fn new(ptr: *mut u8, len: usize) -> Self {
Self {
_ptr: NonNull::new(ptr).expect("Received null buffer pointer"),
len,
}
}
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
/// Get a safe slice reference to the buffer
pub fn as_slice(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self._ptr.as_ptr(), self.len) }
}
/// Get a safe mutable slice reference to the buffer
pub fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self._ptr.as_ptr(), self.len) }
}
}
impl Deref for BufferRef {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.as_slice()
}
}
impl DerefMut for BufferRef {
fn deref_mut(&mut self) -> &mut Self::Target {
self.as_mut_slice()
}
}
pub type Callback = Box<dyn FnOnce(i32) + Send>;
#[repr(C)]
pub struct IOCallback {
pub callback: CallbackFn,
pub ctx: SendPtr,
}
unsafe impl Send for IOCallback {}
#[repr(transparent)]
/// Wrapper type to support creating Box<dyn FnOnce()+Send> obj
/// that needs to call a C function with an opaque pointer.
pub struct SendPtr(NonNull<c_void>);
unsafe impl Send for SendPtr {}
impl SendPtr {
pub fn inner(&self) -> NonNull<c_void> {
self.0
}
}
impl IOCallback {
pub fn new(cb: CallbackFn, ctx: NonNull<c_void>) -> Self {
Self {
callback: cb,
ctx: SendPtr(ctx),
}
}
}
pub type CallbackFn = unsafe extern "C" fn(res: i32, user_data: SendPtr);
pub type RegisterVfsFn =
unsafe extern "C" fn(name: *const c_char, vfs: *const VfsImpl) -> ResultCode;
@@ -75,15 +166,24 @@ pub type VfsOpen = unsafe extern "C" fn(
pub type VfsClose = unsafe extern "C" fn(file: *const c_void) -> ResultCode;
pub type VfsRead =
unsafe extern "C" fn(file: *const c_void, buf: *mut u8, count: usize, offset: i64) -> i32;
pub type VfsRead = unsafe extern "C" fn(
file: *const c_void,
buf: BufferRef,
offset: i64,
cb: IOCallback,
) -> ResultCode;
pub type VfsWrite =
unsafe extern "C" fn(file: *const c_void, buf: *const u8, count: usize, offset: i64) -> i32;
pub type VfsWrite = unsafe extern "C" fn(
file: *const c_void,
buf: BufferRef,
offset: i64,
cb: IOCallback,
) -> ResultCode;
pub type VfsSync = unsafe extern "C" fn(file: *const c_void) -> i32;
pub type VfsSync = unsafe extern "C" fn(file: *const c_void, cb: IOCallback) -> ResultCode;
pub type VfsTruncate = unsafe extern "C" fn(file: *const c_void, len: i64) -> ResultCode;
pub type VfsTruncate =
unsafe extern "C" fn(file: *const c_void, len: i64, cb: IOCallback) -> ResultCode;
pub type VfsLock = unsafe extern "C" fn(file: *const c_void, exclusive: bool) -> ResultCode;

View File

@@ -1,17 +1,17 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, VecDeque};
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use turso_ext::{
register_extension, scalar, Connection, ConstraintInfo, ConstraintOp, ConstraintUsage,
ExtResult, IndexInfo, OrderByInfo, ResultCode, StepResult, VTabCursor, VTabKind, VTabModule,
VTabModuleDerive, VTable, Value,
};
#[cfg(not(target_family = "wasm"))]
use turso_ext::{VfsDerive, VfsExtension, VfsFile};
use turso_ext::{BufferRef, Callback, VfsDerive, VfsExtension, VfsFile};
register_extension! {
vtabs: { KVStoreVTabModule, TableStatsVtabModule },
@@ -265,7 +265,35 @@ impl VTable for KVStoreTable {
}
}
#[derive(Default, Clone)]
pub struct CallbackQueue {
queue: Arc<Mutex<VecDeque<(Callback, i32)>>>,
}
impl CallbackQueue {
pub fn new() -> Self {
Self {
queue: Arc::new(Mutex::new(VecDeque::new())),
}
}
// Store a callback with its result for later execution
pub fn enqueue(&self, callback: Callback, result: i32) {
let mut queue = self.queue.lock().unwrap();
queue.push_back((callback, result));
}
// Process all pending callbacks
pub fn process_all(&self) {
let mut queue = self.queue.lock().unwrap();
while let Some((callback, result)) = queue.pop_front() {
callback(result);
}
}
}
pub struct TestFile {
io: CallbackQueue,
file: File,
}
@@ -274,7 +302,9 @@ pub struct TestFS;
#[cfg(not(target_family = "wasm"))]
#[derive(VfsDerive, Default)]
pub struct TestFS;
pub struct TestFS {
callbacks: CallbackQueue,
}
// Test that we can have additional extension types in the same file
// and still register the vfs at comptime if linking staticly
@@ -287,6 +317,12 @@ fn test_scalar(_args: turso_ext::Value) -> turso_ext::Value {
impl VfsExtension for TestFS {
const NAME: &'static str = "testvfs";
type File = TestFile;
fn run_once(&self) -> ExtResult<()> {
log::debug!("running once with testing VFS");
self.callbacks.process_all();
Ok(())
}
fn open_file(&self, path: &str, flags: i32, _direct: bool) -> ExtResult<Self::File> {
let _ = env_logger::try_init();
log::debug!("opening file with testing VFS: {path} flags: {flags}");
@@ -296,42 +332,67 @@ impl VfsExtension for TestFS {
.create(flags & 1 != 0)
.open(path)
.map_err(|_| ResultCode::Error)?;
Ok(TestFile { file })
Ok(TestFile {
file,
io: self.callbacks.clone(),
})
}
}
#[cfg(not(target_family = "wasm"))]
impl VfsFile for TestFile {
fn read(&mut self, buf: &mut [u8], count: usize, offset: i64) -> ExtResult<i32> {
log::debug!("reading file with testing VFS: bytes: {count} offset: {offset}");
fn read(&mut self, mut buf: BufferRef, offset: i64, cb: Callback) -> ExtResult<()> {
log::debug!(
"reading file with testing VFS: bytes: {} offset: {}",
buf.len(),
offset
);
if self.file.seek(SeekFrom::Start(offset as u64)).is_err() {
return Err(ResultCode::Error);
}
self.file
.read(&mut buf[..count])
let len = buf.len();
let buf = buf.as_mut_slice();
let res = self
.file
.read(&mut buf[..len])
.map_err(|_| ResultCode::Error)
.map(|n| n as i32)
.map(|n| n as i32)?;
self.io.enqueue(cb, res);
Ok(())
}
fn write(&mut self, buf: &[u8], count: usize, offset: i64) -> ExtResult<i32> {
log::debug!("writing to file with testing VFS: bytes: {count} offset: {offset}");
fn write(&mut self, buf: turso_ext::BufferRef, offset: i64, cb: Callback) -> ExtResult<()> {
log::debug!(
"writing to file with testing VFS: bytes: {} offset: {offset}",
buf.len()
);
if self.file.seek(SeekFrom::Start(offset as u64)).is_err() {
return Err(ResultCode::Error);
}
self.file
.write(&buf[..count])
let len = buf.len();
let n = self
.file
.write(&buf[..len])
.map_err(|_| ResultCode::Error)
.map(|n| n as i32)
.map(|n| n as i32)?;
self.io.enqueue(cb, n);
Ok(())
}
fn sync(&self) -> ExtResult<()> {
fn sync(&self, cb: Callback) -> ExtResult<()> {
log::debug!("syncing file with testing VFS");
self.file.sync_all().map_err(|_| ResultCode::Error)
self.file.sync_all().map_err(|_| ResultCode::Error)?;
self.io.enqueue(cb, 0);
Ok(())
}
fn truncate(&self, len: i64) -> ExtResult<()> {
fn truncate(&self, len: i64, cb: Callback) -> ExtResult<()> {
log::debug!("truncating file with testing VFS to length: {len}");
self.file.set_len(len as u64).map_err(|_| ResultCode::Error)
self.file
.set_len(len as u64)
.map_err(|_| ResultCode::Error)?;
self.io.enqueue(cb, 0);
Ok(())
}
fn size(&self) -> i64 {

View File

@@ -70,6 +70,13 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream {
(api.vfs_interface.register_vfs)(name, vfsimpl)
}
fn __get_cb(cb: ::turso_ext::IOCallback) -> ::std::boxed::Box<dyn FnOnce(i32) + Send> {
let callback: ::std::boxed::Box<dyn FnOnce(i32) + Send> = ::std::boxed::Box::new(move | res: i32| {
unsafe { (cb.callback)(res, cb.ctx) };
});
callback
}
#[no_mangle]
pub unsafe extern "C" fn #open_fn_name(
ctx: *const ::std::ffi::c_void,
@@ -109,18 +116,19 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream {
::turso_ext::ResultCode::OK
}
#[no_mangle]
pub unsafe extern "C" fn #read_fn_name(file_ptr: *const ::std::ffi::c_void, buf: *mut u8, count: usize, offset: i64) -> i32 {
#[no_mangle]
pub unsafe extern "C" fn #read_fn_name(file_ptr: *const ::std::ffi::c_void, buf: ::turso_ext::BufferRef, offset: i64, cb: ::turso_ext::IOCallback) -> ::turso_ext::ResultCode {
if file_ptr.is_null() {
return -1;
return ::turso_ext::ResultCode::Error;
}
let callback = __get_cb(cb);
let vfs_file: &mut ::turso_ext::VfsFileImpl = &mut *(file_ptr as *mut ::turso_ext::VfsFileImpl);
let file: &mut <#struct_name as ::turso_ext::VfsExtension>::File =
&mut *(vfs_file.file as *mut <#struct_name as ::turso_ext::VfsExtension>::File);
match <#struct_name as ::turso_ext::VfsExtension>::File::read(file, ::std::slice::from_raw_parts_mut(buf, count), count, offset) {
Ok(n) => n,
Err(_) => -1,
if <#struct_name as ::turso_ext::VfsExtension>::File::read(file, buf, offset, callback).is_err() {
return ::turso_ext::ResultCode::Error;
}
::turso_ext::ResultCode::OK
}
#[no_mangle]
@@ -136,17 +144,18 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream {
}
#[no_mangle]
pub unsafe extern "C" fn #write_fn_name(file_ptr: *const ::std::ffi::c_void, buf: *const u8, count: usize, offset: i64) -> i32 {
pub unsafe extern "C" fn #write_fn_name(file_ptr: *const ::std::ffi::c_void, buf: ::turso_ext::BufferRef, offset: i64, cb: ::turso_ext::IOCallback) -> ::turso_ext::ResultCode {
if file_ptr.is_null() {
return -1;
return ::turso_ext::ResultCode::Error;
}
let callback = __get_cb(cb);
let vfs_file: &mut ::turso_ext::VfsFileImpl = &mut *(file_ptr as *mut ::turso_ext::VfsFileImpl);
let file: &mut <#struct_name as ::turso_ext::VfsExtension>::File =
&mut *(vfs_file.file as *mut <#struct_name as ::turso_ext::VfsExtension>::File);
match <#struct_name as ::turso_ext::VfsExtension>::File::write(file, ::std::slice::from_raw_parts(buf, count), count, offset) {
Ok(n) => n,
Err(_) => -1,
if <#struct_name as ::turso_ext::VfsExtension>::File::write(file, buf, offset, callback).is_err() {
return ::turso_ext::ResultCode::Error;
}
::turso_ext::ResultCode::OK
}
#[no_mangle]
@@ -178,29 +187,31 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream {
}
#[no_mangle]
pub unsafe extern "C" fn #sync_fn_name(file_ptr: *const ::std::ffi::c_void) -> i32 {
pub unsafe extern "C" fn #sync_fn_name(file_ptr: *const ::std::ffi::c_void, cb: ::turso_ext::IOCallback) -> ::turso_ext::ResultCode {
if file_ptr.is_null() {
return -1;
return ::turso_ext::ResultCode::Error;
}
let callback = __get_cb(cb);
let vfs_file: &mut ::turso_ext::VfsFileImpl = &mut *(file_ptr as *mut ::turso_ext::VfsFileImpl);
let file: &mut <#struct_name as ::turso_ext::VfsExtension>::File =
&mut *(vfs_file.file as *mut <#struct_name as ::turso_ext::VfsExtension>::File);
if <#struct_name as ::turso_ext::VfsExtension>::File::sync(file).is_err() {
return -1;
if <#struct_name as ::turso_ext::VfsExtension>::File::sync(file, callback).is_err() {
return ::turso_ext::ResultCode::Error;
}
0
::turso_ext::ResultCode::OK
}
#[no_mangle]
pub unsafe extern "C" fn #trunc_fn_name(file_ptr: *const ::std::ffi::c_void, len: i64) -> ::turso_ext::ResultCode {
pub unsafe extern "C" fn #trunc_fn_name(file_ptr: *const ::std::ffi::c_void, len: i64, cb: ::turso_ext::IOCallback) -> ::turso_ext::ResultCode {
if file_ptr.is_null() {
return ::turso_ext::ResultCode::Error;
return turso_ext::ResultCode::Error;
}
let callback = __get_cb(cb);
let vfs_file: &mut ::turso_ext::VfsFileImpl = &mut *(file_ptr as *mut ::turso_ext::VfsFileImpl);
let file: &mut <#struct_name as ::turso_ext::VfsExtension>::File =
&mut *(vfs_file.file as *mut <#struct_name as ::turso_ext::VfsExtension>::File);
if <#struct_name as ::turso_ext::VfsExtension>::File::truncate(file, len).is_err() {
return ::turso_ext::ResultCode::Error;
if <#struct_name as ::turso_ext::VfsExtension>::File::truncate(file, len, callback).is_err() {
return turso_ext::ResultCode::Error;
}
::turso_ext::ResultCode::OK
}