diff --git a/extensions/tests/src/lib.rs b/extensions/tests/src/lib.rs index 0e60765b9..a4cb366ec 100644 --- a/extensions/tests/src/lib.rs +++ b/extensions/tests/src/lib.rs @@ -1,10 +1,10 @@ use std::cell::RefCell; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::fs::{File, OpenOptions}; use std::io::{Read, Seek, SeekFrom, Write}; use std::num::NonZeroUsize; use std::rc::Rc; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use turso_ext::{ register_extension, scalar, Connection, ConstraintInfo, ConstraintOp, ConstraintUsage, ExtResult, IndexInfo, OrderByInfo, ResultCode, StepResult, VTabCursor, VTabKind, VTabModule, @@ -265,7 +265,35 @@ impl VTable for KVStoreTable { } } +#[derive(Default, Clone)] +pub struct CallbackQueue { + queue: Arc>>, +} + +impl CallbackQueue { + pub fn new() -> Self { + Self { + queue: Arc::new(Mutex::new(VecDeque::new())), + } + } + + // Store a callback with its result for later execution + pub fn enqueue(&self, callback: Callback, result: i32) { + let mut queue = self.queue.lock().unwrap(); + queue.push_back((callback, result)); + } + + // Process all pending callbacks + pub fn process_all(&self) { + let mut queue = self.queue.lock().unwrap(); + while let Some((callback, result)) = queue.pop_front() { + callback(result); + } + } +} + pub struct TestFile { + io: CallbackQueue, file: File, } @@ -274,7 +302,9 @@ pub struct TestFS; #[cfg(not(target_family = "wasm"))] #[derive(VfsDerive, Default)] -pub struct TestFS; +pub struct TestFS { + callbacks: CallbackQueue, +} // Test that we can have additional extension types in the same file // and still register the vfs at comptime if linking staticly @@ -287,6 +317,12 @@ fn test_scalar(_args: turso_ext::Value) -> turso_ext::Value { impl VfsExtension for TestFS { const NAME: &'static str = "testvfs"; type File = TestFile; + fn run_once(&self) -> ExtResult<()> { + log::debug!("running once with testing VFS"); + self.callbacks.process_all(); + Ok(()) + } + fn open_file(&self, path: &str, flags: i32, _direct: bool) -> ExtResult { let _ = env_logger::try_init(); log::debug!("opening file with testing VFS: {path} flags: {flags}"); @@ -296,7 +332,10 @@ impl VfsExtension for TestFS { .create(flags & 1 != 0) .open(path) .map_err(|_| ResultCode::Error)?; - Ok(TestFile { file }) + Ok(TestFile { + file, + io: self.callbacks.clone(), + }) } } @@ -318,7 +357,7 @@ impl VfsFile for TestFile { .read(&mut buf[..len]) .map_err(|_| ResultCode::Error) .map(|n| n as i32)?; - cb(res as i32); + self.io.enqueue(cb, res); Ok(()) } @@ -336,14 +375,14 @@ impl VfsFile for TestFile { .write(&buf[..len]) .map_err(|_| ResultCode::Error) .map(|n| n as i32)?; - cb(n as i32); + self.io.enqueue(cb, n); Ok(()) } fn sync(&self, cb: Callback) -> ExtResult<()> { log::debug!("syncing file with testing VFS"); self.file.sync_all().map_err(|_| ResultCode::Error)?; - cb(0); + self.io.enqueue(cb, 0); Ok(()) } @@ -352,7 +391,7 @@ impl VfsFile for TestFile { self.file .set_len(len as u64) .map_err(|_| ResultCode::Error)?; - cb(0); + self.io.enqueue(cb, 0); Ok(()) }