diff --git a/core/io/vfs.rs b/core/io/vfs.rs index e7446550d..15e5a3853 100644 --- a/core/io/vfs.rs +++ b/core/io/vfs.rs @@ -1,10 +1,12 @@ use super::{Buffer, Completion, File, MemoryIO, OpenFlags, IO}; use crate::ext::VfsMod; use crate::io::clock::{Clock, Instant}; +use crate::io::CompletionInner; use crate::{LimboError, Result}; use std::ffi::{c_void, CString}; +use std::ptr::NonNull; use std::sync::Arc; -use turso_ext::{VfsFileImpl, VfsImpl}; +use turso_ext::{BufferRef, IOCallback, SendPtr, VfsFileImpl, VfsImpl}; impl Clock for VfsMod { fn now(&self) -> Instant { @@ -75,6 +77,24 @@ impl VfsMod { } } +// #Safety: +// the callback wrapper in the extension library is FnOnce, so we know +/// # Safety +/// the callback wrapper in the extension library is FnOnce, so we know +/// that the into_raw/from_raw contract will hold +unsafe extern "C" fn callback_fn(result: i32, ctx: SendPtr) { + let completion = Completion { + inner: (Arc::from_raw(ctx.inner().as_ptr() as *mut CompletionInner)), + }; + completion.complete(result); +} + +fn to_callback(c: Completion) -> IOCallback { + IOCallback::new(callback_fn, unsafe { + NonNull::new_unchecked(Arc::into_raw(c.inner) as *mut c_void) + }) +} + impl File for VfsFileImpl { fn lock_file(&self, exclusive: bool) -> Result<()> { let vfs = unsafe { &*self.vfs }; @@ -98,47 +118,64 @@ impl File for VfsFileImpl { } fn pread(&self, pos: usize, c: Completion) -> Result { - let r = c.as_read(); - let result = { - let buf = r.buf(); - let count = buf.len(); - let vfs = unsafe { &*self.vfs }; - unsafe { (vfs.read)(self.file, buf.as_mut_ptr(), count, pos as i64) } - }; - if result < 0 { - Err(LimboError::ExtensionError("pread failed".to_string())) - } else { - c.complete(result); - Ok(c) + if self.vfs.is_null() { + c.complete(-1); + return Err(LimboError::ExtensionError("VFS is null".to_string())); } + let r = c.as_read(); + let buf = r.buf(); + let len = buf.len(); + let cb = to_callback(c.clone()); + let vfs = unsafe { &*self.vfs }; + let res = unsafe { + (vfs.read)( + self.file, + BufferRef::new(buf.as_mut_ptr(), len), + pos as i64, + cb, + ) + }; + if res.is_error() { + return Err(LimboError::ExtensionError("pread failed".to_string())); + } + Ok(c) } fn pwrite(&self, pos: usize, buffer: Arc, c: Completion) -> Result { - let count = buffer.as_slice().len(); if self.vfs.is_null() { + c.complete(-1); return Err(LimboError::ExtensionError("VFS is null".to_string())); } let vfs = unsafe { &*self.vfs }; - let result = - unsafe { (vfs.write)(self.file, buffer.as_ptr() as *mut u8, count, pos as i64) }; - - if result < 0 { - Err(LimboError::ExtensionError("pwrite failed".to_string())) - } else { - c.complete(result); - Ok(c) + let res = unsafe { + let buf = buffer.clone(); + let len = buf.len(); + let cb = to_callback(c.clone()); + (vfs.write)( + self.file, + BufferRef::new(buf.as_ptr() as *mut u8, len), + pos as i64, + cb, + ) + }; + if res.is_error() { + return Err(LimboError::ExtensionError("pwrite failed".to_string())); } + Ok(c) } fn sync(&self, c: Completion) -> Result { - let vfs = unsafe { &*self.vfs }; - let result = unsafe { (vfs.sync)(self.file) }; - if result < 0 { - Err(LimboError::ExtensionError("sync failed".to_string())) - } else { - c.complete(0); - Ok(c) + if self.vfs.is_null() { + c.complete(-1); + return Err(LimboError::ExtensionError("VFS is null".to_string())); } + let vfs = unsafe { &*self.vfs }; + let cb = to_callback(c.clone()); + let res = unsafe { (vfs.sync)(self.file, cb) }; + if res.is_error() { + return Err(LimboError::ExtensionError("sync failed".to_string())); + } + Ok(c) } fn size(&self) -> Result { @@ -153,16 +190,16 @@ impl File for VfsFileImpl { fn truncate(&self, len: usize, c: Completion) -> Result { if self.vfs.is_null() { + c.complete(-1); return Err(LimboError::ExtensionError("VFS is null".to_string())); } let vfs = unsafe { &*self.vfs }; - let result = unsafe { (vfs.truncate)(self.file, len as i64) }; - if result.is_error() { - Err(LimboError::ExtensionError("truncate failed".to_string())) - } else { - c.complete(0); - Ok(c) + let cb = to_callback(c.clone()); + let res = unsafe { (vfs.truncate)(self.file, len as i64, cb) }; + if res.is_error() { + return Err(LimboError::ExtensionError("truncate failed".to_string())); } + Ok(c) } } diff --git a/extensions/core/src/lib.rs b/extensions/core/src/lib.rs index 29a5244ef..da2f03ff2 100644 --- a/extensions/core/src/lib.rs +++ b/extensions/core/src/lib.rs @@ -13,7 +13,10 @@ pub use turso_macros::VfsDerive; pub use turso_macros::{register_extension, scalar, AggregateDerive, VTabModuleDerive}; pub use types::{ResultCode, StepResult, Value, ValueType}; #[cfg(feature = "vfs")] -pub use vfs_modules::{RegisterVfsFn, VfsExtension, VfsFile, VfsFileImpl, VfsImpl, VfsInterface}; +pub use vfs_modules::{ + BufferRef, Callback, IOCallback, RegisterVfsFn, SendPtr, VfsExtension, VfsFile, VfsFileImpl, + VfsImpl, VfsInterface, +}; use vtabs::RegisterModuleFn; pub use vtabs::{ Conn, Connection, ConstraintInfo, ConstraintOp, ConstraintUsage, ExtIndexInfo, IndexInfo, diff --git a/extensions/core/src/vfs_modules.rs b/extensions/core/src/vfs_modules.rs index 1a5e48b25..8e7915e01 100644 --- a/extensions/core/src/vfs_modules.rs +++ b/extensions/core/src/vfs_modules.rs @@ -1,5 +1,9 @@ use crate::{ExtResult, ExtensionApi, ResultCode}; -use std::ffi::{c_char, c_void}; +use std::{ + ffi::{c_char, c_void}, + ops::{Deref, DerefMut}, + ptr::NonNull, +}; /// Field for ExtensionApi to interface with VFS extensions, /// separated to more easily feature flag out for WASM builds. @@ -38,10 +42,10 @@ pub trait VfsFile: Send + Sync { fn unlock(&self) -> ExtResult<()> { Ok(()) } - fn read(&mut self, buf: &mut [u8], count: usize, offset: i64) -> ExtResult; - fn write(&mut self, buf: &[u8], count: usize, offset: i64) -> ExtResult; - fn sync(&self) -> ExtResult<()>; - fn truncate(&self, len: i64) -> ExtResult<()>; + fn read(&mut self, buf: BufferRef, offset: i64, cb: Callback) -> ExtResult<()>; + fn write(&mut self, buf: BufferRef, offset: i64, cb: Callback) -> ExtResult<()>; + fn sync(&self, cb: Callback) -> ExtResult<()>; + fn truncate(&self, len: i64, cb: Callback) -> ExtResult<()>; fn size(&self) -> i64; } @@ -63,6 +67,93 @@ pub struct VfsImpl { pub truncate: VfsTruncate, } +/// a wrapper around the raw `*mut u8` buffer for extensions. +#[derive(Debug)] +#[repr(C)] +pub struct BufferRef { + _ptr: NonNull, + len: usize, +} + +unsafe impl Send for BufferRef {} +impl BufferRef { + /// create a new `BufferRef` from a raw pointer + /// + /// # Safety + /// The caller must ensure that the pointer is valid and the buffer is not deallocated. + /// This function should only be called with a pointer to a buffer allocated by the `Buffer` type defined in the `core` module. + pub unsafe fn new(ptr: *mut u8, len: usize) -> Self { + Self { + _ptr: NonNull::new(ptr).expect("Received null buffer pointer"), + len, + } + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Get a safe slice reference to the buffer + pub fn as_slice(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self._ptr.as_ptr(), self.len) } + } + + /// Get a safe mutable slice reference to the buffer + pub fn as_mut_slice(&mut self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self._ptr.as_ptr(), self.len) } + } +} + +impl Deref for BufferRef { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.as_slice() + } +} + +impl DerefMut for BufferRef { + fn deref_mut(&mut self) -> &mut Self::Target { + self.as_mut_slice() + } +} +pub type Callback = Box; + +#[repr(C)] +pub struct IOCallback { + pub callback: CallbackFn, + pub ctx: SendPtr, +} + +unsafe impl Send for IOCallback {} + +#[repr(transparent)] +/// Wrapper type to support creating Box obj +/// that needs to call a C function with an opaque pointer. +pub struct SendPtr(NonNull); +unsafe impl Send for SendPtr {} + +impl SendPtr { + pub fn inner(&self) -> NonNull { + self.0 + } +} + +impl IOCallback { + pub fn new(cb: CallbackFn, ctx: NonNull) -> Self { + Self { + callback: cb, + ctx: SendPtr(ctx), + } + } +} + +pub type CallbackFn = unsafe extern "C" fn(res: i32, user_data: SendPtr); + pub type RegisterVfsFn = unsafe extern "C" fn(name: *const c_char, vfs: *const VfsImpl) -> ResultCode; @@ -75,15 +166,24 @@ pub type VfsOpen = unsafe extern "C" fn( pub type VfsClose = unsafe extern "C" fn(file: *const c_void) -> ResultCode; -pub type VfsRead = - unsafe extern "C" fn(file: *const c_void, buf: *mut u8, count: usize, offset: i64) -> i32; +pub type VfsRead = unsafe extern "C" fn( + file: *const c_void, + buf: BufferRef, + offset: i64, + cb: IOCallback, +) -> ResultCode; -pub type VfsWrite = - unsafe extern "C" fn(file: *const c_void, buf: *const u8, count: usize, offset: i64) -> i32; +pub type VfsWrite = unsafe extern "C" fn( + file: *const c_void, + buf: BufferRef, + offset: i64, + cb: IOCallback, +) -> ResultCode; -pub type VfsSync = unsafe extern "C" fn(file: *const c_void) -> i32; +pub type VfsSync = unsafe extern "C" fn(file: *const c_void, cb: IOCallback) -> ResultCode; -pub type VfsTruncate = unsafe extern "C" fn(file: *const c_void, len: i64) -> ResultCode; +pub type VfsTruncate = + unsafe extern "C" fn(file: *const c_void, len: i64, cb: IOCallback) -> ResultCode; pub type VfsLock = unsafe extern "C" fn(file: *const c_void, exclusive: bool) -> ResultCode; diff --git a/extensions/tests/src/lib.rs b/extensions/tests/src/lib.rs index ffa2d03c3..a4cb366ec 100644 --- a/extensions/tests/src/lib.rs +++ b/extensions/tests/src/lib.rs @@ -1,17 +1,17 @@ use std::cell::RefCell; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::fs::{File, OpenOptions}; use std::io::{Read, Seek, SeekFrom, Write}; use std::num::NonZeroUsize; use std::rc::Rc; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use turso_ext::{ register_extension, scalar, Connection, ConstraintInfo, ConstraintOp, ConstraintUsage, ExtResult, IndexInfo, OrderByInfo, ResultCode, StepResult, VTabCursor, VTabKind, VTabModule, VTabModuleDerive, VTable, Value, }; #[cfg(not(target_family = "wasm"))] -use turso_ext::{VfsDerive, VfsExtension, VfsFile}; +use turso_ext::{BufferRef, Callback, VfsDerive, VfsExtension, VfsFile}; register_extension! { vtabs: { KVStoreVTabModule, TableStatsVtabModule }, @@ -265,7 +265,35 @@ impl VTable for KVStoreTable { } } +#[derive(Default, Clone)] +pub struct CallbackQueue { + queue: Arc>>, +} + +impl CallbackQueue { + pub fn new() -> Self { + Self { + queue: Arc::new(Mutex::new(VecDeque::new())), + } + } + + // Store a callback with its result for later execution + pub fn enqueue(&self, callback: Callback, result: i32) { + let mut queue = self.queue.lock().unwrap(); + queue.push_back((callback, result)); + } + + // Process all pending callbacks + pub fn process_all(&self) { + let mut queue = self.queue.lock().unwrap(); + while let Some((callback, result)) = queue.pop_front() { + callback(result); + } + } +} + pub struct TestFile { + io: CallbackQueue, file: File, } @@ -274,7 +302,9 @@ pub struct TestFS; #[cfg(not(target_family = "wasm"))] #[derive(VfsDerive, Default)] -pub struct TestFS; +pub struct TestFS { + callbacks: CallbackQueue, +} // Test that we can have additional extension types in the same file // and still register the vfs at comptime if linking staticly @@ -287,6 +317,12 @@ fn test_scalar(_args: turso_ext::Value) -> turso_ext::Value { impl VfsExtension for TestFS { const NAME: &'static str = "testvfs"; type File = TestFile; + fn run_once(&self) -> ExtResult<()> { + log::debug!("running once with testing VFS"); + self.callbacks.process_all(); + Ok(()) + } + fn open_file(&self, path: &str, flags: i32, _direct: bool) -> ExtResult { let _ = env_logger::try_init(); log::debug!("opening file with testing VFS: {path} flags: {flags}"); @@ -296,42 +332,67 @@ impl VfsExtension for TestFS { .create(flags & 1 != 0) .open(path) .map_err(|_| ResultCode::Error)?; - Ok(TestFile { file }) + Ok(TestFile { + file, + io: self.callbacks.clone(), + }) } } #[cfg(not(target_family = "wasm"))] impl VfsFile for TestFile { - fn read(&mut self, buf: &mut [u8], count: usize, offset: i64) -> ExtResult { - log::debug!("reading file with testing VFS: bytes: {count} offset: {offset}"); + fn read(&mut self, mut buf: BufferRef, offset: i64, cb: Callback) -> ExtResult<()> { + log::debug!( + "reading file with testing VFS: bytes: {} offset: {}", + buf.len(), + offset + ); if self.file.seek(SeekFrom::Start(offset as u64)).is_err() { return Err(ResultCode::Error); } - self.file - .read(&mut buf[..count]) + let len = buf.len(); + let buf = buf.as_mut_slice(); + let res = self + .file + .read(&mut buf[..len]) .map_err(|_| ResultCode::Error) - .map(|n| n as i32) + .map(|n| n as i32)?; + self.io.enqueue(cb, res); + Ok(()) } - fn write(&mut self, buf: &[u8], count: usize, offset: i64) -> ExtResult { - log::debug!("writing to file with testing VFS: bytes: {count} offset: {offset}"); + fn write(&mut self, buf: turso_ext::BufferRef, offset: i64, cb: Callback) -> ExtResult<()> { + log::debug!( + "writing to file with testing VFS: bytes: {} offset: {offset}", + buf.len() + ); if self.file.seek(SeekFrom::Start(offset as u64)).is_err() { return Err(ResultCode::Error); } - self.file - .write(&buf[..count]) + let len = buf.len(); + let n = self + .file + .write(&buf[..len]) .map_err(|_| ResultCode::Error) - .map(|n| n as i32) + .map(|n| n as i32)?; + self.io.enqueue(cb, n); + Ok(()) } - fn sync(&self) -> ExtResult<()> { + fn sync(&self, cb: Callback) -> ExtResult<()> { log::debug!("syncing file with testing VFS"); - self.file.sync_all().map_err(|_| ResultCode::Error) + self.file.sync_all().map_err(|_| ResultCode::Error)?; + self.io.enqueue(cb, 0); + Ok(()) } - fn truncate(&self, len: i64) -> ExtResult<()> { + fn truncate(&self, len: i64, cb: Callback) -> ExtResult<()> { log::debug!("truncating file with testing VFS to length: {len}"); - self.file.set_len(len as u64).map_err(|_| ResultCode::Error) + self.file + .set_len(len as u64) + .map_err(|_| ResultCode::Error)?; + self.io.enqueue(cb, 0); + Ok(()) } fn size(&self) -> i64 { diff --git a/macros/src/ext/vfs_derive.rs b/macros/src/ext/vfs_derive.rs index 5e813410a..2f3befa90 100644 --- a/macros/src/ext/vfs_derive.rs +++ b/macros/src/ext/vfs_derive.rs @@ -70,6 +70,13 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream { (api.vfs_interface.register_vfs)(name, vfsimpl) } + fn __get_cb(cb: ::turso_ext::IOCallback) -> ::std::boxed::Box { + let callback: ::std::boxed::Box = ::std::boxed::Box::new(move | res: i32| { + unsafe { (cb.callback)(res, cb.ctx) }; + }); + callback + } + #[no_mangle] pub unsafe extern "C" fn #open_fn_name( ctx: *const ::std::ffi::c_void, @@ -109,18 +116,19 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream { ::turso_ext::ResultCode::OK } - #[no_mangle] - pub unsafe extern "C" fn #read_fn_name(file_ptr: *const ::std::ffi::c_void, buf: *mut u8, count: usize, offset: i64) -> i32 { + #[no_mangle] + pub unsafe extern "C" fn #read_fn_name(file_ptr: *const ::std::ffi::c_void, buf: ::turso_ext::BufferRef, offset: i64, cb: ::turso_ext::IOCallback) -> ::turso_ext::ResultCode { if file_ptr.is_null() { - return -1; + return ::turso_ext::ResultCode::Error; } + let callback = __get_cb(cb); let vfs_file: &mut ::turso_ext::VfsFileImpl = &mut *(file_ptr as *mut ::turso_ext::VfsFileImpl); let file: &mut <#struct_name as ::turso_ext::VfsExtension>::File = &mut *(vfs_file.file as *mut <#struct_name as ::turso_ext::VfsExtension>::File); - match <#struct_name as ::turso_ext::VfsExtension>::File::read(file, ::std::slice::from_raw_parts_mut(buf, count), count, offset) { - Ok(n) => n, - Err(_) => -1, + if <#struct_name as ::turso_ext::VfsExtension>::File::read(file, buf, offset, callback).is_err() { + return ::turso_ext::ResultCode::Error; } + ::turso_ext::ResultCode::OK } #[no_mangle] @@ -136,17 +144,18 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream { } #[no_mangle] - pub unsafe extern "C" fn #write_fn_name(file_ptr: *const ::std::ffi::c_void, buf: *const u8, count: usize, offset: i64) -> i32 { + pub unsafe extern "C" fn #write_fn_name(file_ptr: *const ::std::ffi::c_void, buf: ::turso_ext::BufferRef, offset: i64, cb: ::turso_ext::IOCallback) -> ::turso_ext::ResultCode { if file_ptr.is_null() { - return -1; + return ::turso_ext::ResultCode::Error; } + let callback = __get_cb(cb); let vfs_file: &mut ::turso_ext::VfsFileImpl = &mut *(file_ptr as *mut ::turso_ext::VfsFileImpl); let file: &mut <#struct_name as ::turso_ext::VfsExtension>::File = &mut *(vfs_file.file as *mut <#struct_name as ::turso_ext::VfsExtension>::File); - match <#struct_name as ::turso_ext::VfsExtension>::File::write(file, ::std::slice::from_raw_parts(buf, count), count, offset) { - Ok(n) => n, - Err(_) => -1, + if <#struct_name as ::turso_ext::VfsExtension>::File::write(file, buf, offset, callback).is_err() { + return ::turso_ext::ResultCode::Error; } + ::turso_ext::ResultCode::OK } #[no_mangle] @@ -178,29 +187,31 @@ pub fn derive_vfs_module(input: TokenStream) -> TokenStream { } #[no_mangle] - pub unsafe extern "C" fn #sync_fn_name(file_ptr: *const ::std::ffi::c_void) -> i32 { + pub unsafe extern "C" fn #sync_fn_name(file_ptr: *const ::std::ffi::c_void, cb: ::turso_ext::IOCallback) -> ::turso_ext::ResultCode { if file_ptr.is_null() { - return -1; + return ::turso_ext::ResultCode::Error; } + let callback = __get_cb(cb); let vfs_file: &mut ::turso_ext::VfsFileImpl = &mut *(file_ptr as *mut ::turso_ext::VfsFileImpl); let file: &mut <#struct_name as ::turso_ext::VfsExtension>::File = &mut *(vfs_file.file as *mut <#struct_name as ::turso_ext::VfsExtension>::File); - if <#struct_name as ::turso_ext::VfsExtension>::File::sync(file).is_err() { - return -1; + if <#struct_name as ::turso_ext::VfsExtension>::File::sync(file, callback).is_err() { + return ::turso_ext::ResultCode::Error; } - 0 + ::turso_ext::ResultCode::OK } #[no_mangle] - pub unsafe extern "C" fn #trunc_fn_name(file_ptr: *const ::std::ffi::c_void, len: i64) -> ::turso_ext::ResultCode { + pub unsafe extern "C" fn #trunc_fn_name(file_ptr: *const ::std::ffi::c_void, len: i64, cb: ::turso_ext::IOCallback) -> ::turso_ext::ResultCode { if file_ptr.is_null() { - return ::turso_ext::ResultCode::Error; + return turso_ext::ResultCode::Error; } + let callback = __get_cb(cb); let vfs_file: &mut ::turso_ext::VfsFileImpl = &mut *(file_ptr as *mut ::turso_ext::VfsFileImpl); let file: &mut <#struct_name as ::turso_ext::VfsExtension>::File = &mut *(vfs_file.file as *mut <#struct_name as ::turso_ext::VfsExtension>::File); - if <#struct_name as ::turso_ext::VfsExtension>::File::truncate(file, len).is_err() { - return ::turso_ext::ResultCode::Error; + if <#struct_name as ::turso_ext::VfsExtension>::File::truncate(file, len, callback).is_err() { + return turso_ext::ResultCode::Error; } ::turso_ext::ResultCode::OK }