diff --git a/Cargo.lock b/Cargo.lock
index 8ac5ac00f..3037bf399 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4331,6 +4331,7 @@ dependencies = [
  "napi",
  "napi-build",
  "napi-derive",
+ "tracing",
  "tracing-subscriber",
  "turso_core",
 ]
diff --git a/bindings/javascript/src/browser.rs b/bindings/javascript/src/browser.rs
new file mode 100644
index 000000000..f9c6bffa9
--- /dev/null
+++ b/bindings/javascript/src/browser.rs
@@ -0,0 +1,254 @@
+use std::sync::Arc;
+
+use napi::bindgen_prelude::*;
+use napi_derive::napi;
+use turso_core::{storage::database::DatabaseFile, Clock, File, Instant, IO};
+
+use crate::{init_tracing, is_memory, Database, DatabaseOpts};
+
+pub struct NoopTask;
+
+impl Task for NoopTask {
+    type Output = ();
+    type JsValue = ();
+    fn compute(&mut self) -> Result<Self::Output> {
+        Ok(())
+    }
+    fn resolve(&mut self, _: Env, _: Self::Output) -> Result<Self::JsValue> {
+        Ok(())
+    }
+}
+
+#[napi]
+/// turso-db in the browser requires explicit thread-pool initialization,
+/// so we put a no-op task on the thread pool to force emnapi to allocate its web worker.
+pub fn init_thread_pool() -> napi::Result<AsyncTask<NoopTask>> {
+    Ok(AsyncTask::new(NoopTask))
+}
+
+pub struct ConnectTask {
+    path: String,
+    is_memory: bool,
+    io: Arc<dyn IO>,
+}
+
+pub struct ConnectResult {
+    db: Arc<turso_core::Database>,
+    conn: Arc<turso_core::Connection>,
+}
+
+unsafe impl Send for ConnectResult {}
+
+impl Task for ConnectTask {
+    type Output = ConnectResult;
+    type JsValue = Database;
+
+    fn compute(&mut self) -> Result<Self::Output> {
+        let file = self
+            .io
+            .open_file(&self.path, turso_core::OpenFlags::Create, false)
+            .map_err(|e| Error::new(Status::GenericFailure, format!("Failed to open file: {e}")))?;
+
+        let db_file = Arc::new(DatabaseFile::new(file));
+        let db = turso_core::Database::open(self.io.clone(), &self.path, db_file, false, true)
+            .map_err(|e| {
+                Error::new(
+                    Status::GenericFailure,
+                    format!("Failed to open database: {e}"),
+                )
+            })?;
+
+        let conn = db
+            .connect()
+            .map_err(|e| Error::new(Status::GenericFailure, format!("Failed to connect: {e}")))?;
+
+        Ok(ConnectResult { db, conn })
+    }
+
+    fn resolve(&mut self, _: Env, result: Self::Output) -> Result<Self::JsValue> {
+        Ok(Database::create(
+            Some(result.db),
+            self.io.clone(),
+            result.conn,
+            self.is_memory,
+        ))
+    }
+}
+
+#[napi]
+// we offload connect to the web worker because:
+// 1. the browser main thread does not support Atomics.wait operations
+// 2. turso-db uses blocking IO [io.wait_for_completion(c)] in a few places on the initialization path
+//
+// so, we offload connect to the worker thread
+pub fn connect(path: String, opts: Option<DatabaseOpts>) -> Result<AsyncTask<ConnectTask>> {
+    if let Some(opts) = opts {
+        init_tracing(opts.tracing);
+    }
+    let task = if is_memory(&path) {
+        ConnectTask {
+            io: Arc::new(turso_core::MemoryIO::new()),
+            is_memory: true,
+            path,
+        }
+    } else {
+        let io = Arc::new(Opfs::new()?);
+        ConnectTask {
+            io,
+            is_memory: false,
+            path,
+        }
+    };
+    Ok(AsyncTask::new(task))
+}
+
+#[napi]
+#[derive(Clone)]
+pub struct Opfs;
+
+#[napi]
+#[derive(Clone)]
+struct OpfsFile {
+    handle: i32,
+}
+
+#[napi]
+impl Opfs {
+    #[napi(constructor)]
+    pub fn new() -> napi::Result<Self> {
+        Ok(Self)
+    }
+}
+
+impl Clock for Opfs {
+    fn now(&self) -> Instant {
+        Instant { secs: 0, micros: 0 } // TODO
+    }
+}
+
+#[link(wasm_import_module = "env")]
+extern "C" {
+    fn lookup_file(path: *const u8, path_len: usize) -> i32;
+    fn read(handle: i32, buffer: *mut u8, buffer_len: usize, offset: i32) -> i32;
+    fn write(handle: i32, buffer: *const u8, buffer_len: usize, offset: i32) -> i32;
+    fn sync(handle: i32) -> i32;
+    fn truncate(handle: i32, length: usize) -> i32;
+    fn size(handle: i32) -> i32;
+    fn is_web_worker() -> bool;
+}
+
+fn is_web_worker_safe() -> bool {
+    unsafe { is_web_worker() }
+}
+
+impl IO for Opfs {
+    fn open_file(
+        &self,
+        path: &str,
+        _: turso_core::OpenFlags,
+        _: bool,
+    ) -> turso_core::Result<Arc<dyn File>> {
+        tracing::info!("open_file: {}", path);
+        let result = unsafe { lookup_file(path.as_ptr(), path.len()) };
+        if result >= 0 {
+            Ok(Arc::new(OpfsFile { handle: result }))
+        } else if result == -404 {
+            Err(turso_core::LimboError::InternalError(
+                "files must be created in advance for OPFS IO".to_string(),
+            ))
+        } else {
+            Err(turso_core::LimboError::InternalError(format!(
+                "unexpected file lookup error: {result}"
+            )))
+        }
+    }
+
+    fn remove_file(&self, _: &str) -> turso_core::Result<()> {
+        Ok(())
+    }
+}
+
+impl File for OpfsFile {
+    fn lock_file(&self, _: bool) -> turso_core::Result<()> {
+        Ok(())
+    }
+
+    fn unlock_file(&self) -> turso_core::Result<()> {
+        Ok(())
+    }
+
+    fn pread(
+        &self,
+        pos: u64,
+        c: turso_core::Completion,
+    ) -> turso_core::Result<turso_core::Completion> {
+        assert!(
+            is_web_worker_safe(),
+            "opfs must be used only from a web worker for now"
+        );
+        tracing::debug!("pread({}): pos={}", self.handle, pos);
+        let handle = self.handle;
+        let read_c = c.as_read();
+        let buffer = read_c.buf_arc();
+        let buffer = buffer.as_mut_slice();
+        let result = unsafe { read(handle, buffer.as_mut_ptr(), buffer.len(), pos as i32) };
+        c.complete(result as i32);
+        Ok(c)
+    }
+
+    fn pwrite(
+        &self,
+        pos: u64,
+        buffer: Arc<turso_core::Buffer>,
+        c: turso_core::Completion,
+    ) -> turso_core::Result<turso_core::Completion> {
+        assert!(
+            is_web_worker_safe(),
+            "opfs must be used only from a web worker for now"
+        );
+        tracing::debug!("pwrite({}): pos={}", self.handle, pos);
+        let handle = self.handle;
+        let buffer = buffer.as_slice();
+        let result = unsafe { write(handle, buffer.as_ptr(), buffer.len(), pos as i32) };
+        c.complete(result as i32);
+        Ok(c)
+    }
+
+    fn sync(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
+        assert!(
+            is_web_worker_safe(),
+            "opfs must be used only from a web worker for now"
+        );
+        tracing::debug!("sync({})", self.handle);
+        let handle = self.handle;
+        let result = unsafe { sync(handle) };
+        c.complete(result as i32);
+        Ok(c)
+    }
+
+    fn truncate(
+        &self,
+        len: u64,
+        c: turso_core::Completion,
+    ) -> turso_core::Result<turso_core::Completion> {
+        assert!(
+            is_web_worker_safe(),
+            "opfs must be used only from a web worker for now"
+        );
+        tracing::debug!("truncate({}): len={}", self.handle, len);
+        let handle = self.handle;
+        let result = unsafe { truncate(handle, len as usize) };
+        c.complete(result as i32);
+        Ok(c)
+    }
+
+    fn size(&self) -> turso_core::Result<u64> {
+        assert!(
+            is_web_worker_safe(),
+            "size can be called only from a web worker context"
+        );
+        tracing::debug!("size({})", self.handle);
+        let handle = self.handle;
+        let result = unsafe { size(handle) };
+        Ok(result as u64)
+    }
+}
diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs
index 3b0d8a466..36d99430b 100644
--- a/bindings/javascript/src/lib.rs
+++ b/bindings/javascript/src/lib.rs
@@ -10,14 +10,20 @@
 //! - Iterating through query results
 //! - Managing the I/O event loop
 
+#[cfg(feature = "browser")]
+pub mod browser;
+
 use napi::bindgen_prelude::*;
 use napi::{Env, Task};
 use napi_derive::napi;
+use std::sync::OnceLock;
 use std::{
     cell::{Cell, RefCell},
     num::NonZeroUsize,
     sync::Arc,
 };
+use tracing_subscriber::filter::LevelFilter;
+use tracing_subscriber::fmt::format::FmtSpan;
 
 /// Step result constants
 const STEP_ROW: u32 = 1;
@@ -38,12 +44,109 @@ enum PresentationMode {
 pub struct Database {
     _db: Option<Arc<turso_core::Database>>,
     io: Arc<dyn turso_core::IO>,
-    conn: Arc<turso_core::Connection>,
+    conn: Option<Arc<turso_core::Connection>>,
     is_memory: bool,
     is_open: Cell<bool>,
     default_safe_integers: Cell<bool>,
 }
 
+pub(crate) fn is_memory(path: &str) -> bool {
+    path == ":memory:"
+}
+
+static TRACING_INIT: OnceLock<()> = OnceLock::new();
+pub(crate) fn init_tracing(level_filter: Option<String>) {
+    let Some(level_filter) = level_filter else {
+        return;
+    };
+    let level_filter = match level_filter.as_ref() {
+        "info" => LevelFilter::INFO,
+        "debug" => LevelFilter::DEBUG,
+        "trace" => LevelFilter::TRACE,
+        _ => return,
+    };
+    TRACING_INIT.get_or_init(|| {
+        tracing_subscriber::fmt()
+            .with_ansi(false)
+            .with_thread_ids(true)
+            .with_span_events(FmtSpan::ACTIVE)
+            .with_max_level(level_filter)
+            .init();
+    });
+}
+
+pub enum DbTask {
+    Batch {
+        conn: Arc<turso_core::Connection>,
+        sql: String,
+    },
+    Step {
+        stmt: Arc<RefCell<Option<turso_core::Statement>>>,
+    },
+}
+
+unsafe impl Send for DbTask {}
+
+impl Task for DbTask {
+    type Output = u32;
+    type JsValue = u32;
+
+    fn compute(&mut self) -> Result<Self::Output> {
+        match self {
+            DbTask::Batch { conn, sql } => {
+                batch_sync(conn, sql)?;
+                Ok(0)
+            }
+            DbTask::Step { stmt } => step_sync(stmt),
+        }
+    }
+
+    fn resolve(&mut self, _: Env, output: Self::Output) -> Result<Self::JsValue> {
+        Ok(output)
+    }
+}
+
+#[napi(object)]
+pub struct DatabaseOpts {
+    pub tracing: Option<String>,
+}
+
+fn batch_sync(conn: &Arc<turso_core::Connection>, sql: &str) -> napi::Result<()> {
+    conn.prepare_execute_batch(sql).map_err(|e| {
+        Error::new(
+            Status::GenericFailure,
+            format!("Failed to execute batch: {e}"),
+        )
+    })?;
+    Ok(())
+}
+
+fn step_sync(stmt: &Arc<RefCell<Option<turso_core::Statement>>>) -> napi::Result<u32> {
+    let mut stmt_ref = stmt.borrow_mut();
+    let stmt = stmt_ref
+        .as_mut()
+        .ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
+
+    let final_result = match stmt.step() {
+        Ok(turso_core::StepResult::Row) => return Ok(STEP_ROW),
+        Ok(turso_core::StepResult::IO) => return Ok(STEP_IO),
+        Ok(turso_core::StepResult::Done) => Ok(STEP_DONE),
+        Ok(turso_core::StepResult::Interrupt) => Err(Error::new(
+            Status::GenericFailure,
+            "Statement was interrupted",
+        )),
+        Ok(turso_core::StepResult::Busy) => {
+            Err(Error::new(Status::GenericFailure, "Database is busy"))
+        }
+        Err(e) => Err(Error::new(
+            Status::GenericFailure,
+            format!("Step failed: {e}"),
+        )),
+    };
+    // statement is finalized once it is done, interrupted, busy, or failed
+    let _ = stmt_ref.take();
+    final_result
+}
+
 #[napi]
 impl Database {
     /// Creates a new database instance.
@@ -51,9 +154,11 @@ impl Database {
     /// # Arguments
     /// * `path` - The path to the database file.
     #[napi(constructor)]
-    pub fn new(path: String) -> Result<Self> {
-        let is_memory = path == ":memory:";
-        let io: Arc<dyn turso_core::IO> = if is_memory {
+    pub fn new(path: String, opts: Option<DatabaseOpts>) -> Result<Self> {
+        if let Some(opts) = opts {
+            init_tracing(opts.tracing);
+        }
+        let io: Arc<dyn turso_core::IO> = if is_memory(&path) {
             Arc::new(turso_core::MemoryIO::new())
         } else {
             Arc::new(turso_core::PlatformIO::new().map_err(|e| {
@@ -61,6 +166,11 @@ impl Database {
             })?)
         };
 
+        #[cfg(feature = "browser")]
+        if !is_memory(&path) {
+            return Err(Error::new(Status::GenericFailure, format!("sync constructor is not supported for FS-backed databases in WASM; use the async connect(...) method instead")));
+        }
+
         let file = io
             .open_file(&path, turso_core::OpenFlags::Create, false)
             .map_err(|e| Error::new(Status::GenericFailure, format!("Failed to open file: {e}")))?;
@@ -78,7 +188,7 @@ impl Database {
             .connect()
             .map_err(|e| Error::new(Status::GenericFailure, format!("Failed to connect: {e}")))?;
 
-        Ok(Self::create(Some(db), io, conn, is_memory))
+        Ok(Self::create(Some(db), io, conn, is_memory(&path)))
     }
 
     pub fn create(
@@ -90,13 +200,23 @@ impl Database {
         Database {
             _db: db,
             io,
-            conn,
+            conn: Some(conn),
             is_memory,
             is_open: Cell::new(true),
             default_safe_integers: Cell::new(false),
         }
     }
 
+    fn conn(&self) -> Result<Arc<turso_core::Connection>> {
+        let Some(conn) = self.conn.as_ref() else {
+            return Err(napi::Error::new(
+                napi::Status::GenericFailure,
+                "connection is not set",
+            ));
+        };
+        Ok(conn.clone())
+    }
+
     /// Returns whether the database is in memory-only mode.
     #[napi(getter)]
     pub fn memory(&self) -> bool {
@@ -109,7 +229,7 @@ impl Database {
         self.is_open.get()
     }
 
-    /// Executes a batch of SQL statements.
+    /// Executes a batch of SQL statements on the main thread.
     ///
     /// # Arguments
     ///
@@ -117,14 +237,23 @@
     ///
     /// # Returns
     #[napi]
-    pub fn batch(&self, sql: String) -> Result<()> {
-        self.conn.prepare_execute_batch(&sql).map_err(|e| {
-            Error::new(
-                Status::GenericFailure,
-                format!("Failed to execute batch: {e}"),
-            )
-        })?;
-        Ok(())
+    pub fn batch_sync(&self, sql: String) -> Result<()> {
+        batch_sync(&self.conn()?, &sql)
+    }
+
+    /// Executes a batch of SQL statements outside of the main thread.
+    ///
+    /// # Arguments
+    ///
+    /// * `sql` - The SQL statements to execute.
+    ///
+    /// # Returns
+    #[napi]
+    pub fn batch_async(&self, sql: String) -> Result<AsyncTask<DbTask>> {
+        Ok(AsyncTask::new(DbTask::Batch {
+            conn: self.conn()?,
+            sql,
+        }))
     }
 
     /// Prepares a statement for execution.
@@ -139,14 +268,14 @@
     #[napi]
     pub fn prepare(&self, sql: String) -> Result<Statement> {
         let stmt = self
-            .conn
+            .conn()?
             .prepare(&sql)
             .map_err(|e| Error::new(Status::GenericFailure, format!("{e}")))?;
         let column_names: Vec<std::ffi::CString> = (0..stmt.num_columns())
             .map(|i| std::ffi::CString::new(stmt.get_column_name(i).to_string()).unwrap())
             .collect();
         Ok(Statement {
-            stmt: RefCell::new(Some(stmt)),
+            stmt: Arc::new(RefCell::new(Some(stmt))),
             column_names,
             mode: RefCell::new(PresentationMode::Expanded),
             safe_integers: Cell::new(self.default_safe_integers.get()),
@@ -160,7 +289,7 @@ impl Database {
     /// The rowid of the last row inserted.
     #[napi]
     pub fn last_insert_rowid(&self) -> Result<i64> {
-        Ok(self.conn.last_insert_rowid())
+        Ok(self.conn()?.last_insert_rowid())
     }
 
     /// Returns the number of changes made by the last statement.
@@ -170,7 +299,7 @@ impl Database {
     /// The number of changes made by the last statement.
     #[napi]
     pub fn changes(&self) -> Result<i64> {
-        Ok(self.conn.changes())
+        Ok(self.conn()?.changes())
     }
 
     /// Returns the total number of changes made by all statements.
@@ -180,7 +309,7 @@ impl Database {
     /// The total number of changes made by all statements.
     #[napi]
     pub fn total_changes(&self) -> Result<i64> {
-        Ok(self.conn.total_changes())
+        Ok(self.conn()?.total_changes())
     }
 
     /// Closes the database connection.
@@ -189,9 +318,10 @@ impl Database {
     ///
     /// `Ok(())` if the database is closed successfully.
     #[napi]
-    pub fn close(&self) -> Result<()> {
+    pub fn close(&mut self) -> Result<()> {
         self.is_open.set(false);
-        // Database close is handled automatically when dropped
+        let _ = self._db.take();
+        let _ = self.conn.take();
         Ok(())
     }
 
@@ -225,7 +355,7 @@ impl Database {
 /// A prepared statement.
 #[napi]
 pub struct Statement {
-    stmt: RefCell<Option<turso_core::Statement>>,
+    stmt: Arc<RefCell<Option<turso_core::Statement>>>,
     column_names: Vec<std::ffi::CString>,
     mode: RefCell<PresentationMode>,
     safe_integers: Cell<bool>,
@@ -344,31 +474,20 @@ impl Statement {
         Ok(())
     }
 
-    /// Step the statement and return result code:
+    /// Step the statement and return result code (executed on the main thread):
     /// 1 = Row available, 2 = Done, 3 = I/O needed
     #[napi]
-    pub fn step(&self) -> Result<u32> {
-        let mut stmt_ref = self.stmt.borrow_mut();
-        let stmt = stmt_ref
-            .as_mut()
-            .ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
+    pub fn step_sync(&self) -> Result<u32> {
+        step_sync(&self.stmt)
+    }
 
-        match stmt.step() {
-            Ok(turso_core::StepResult::Row) => Ok(STEP_ROW),
-            Ok(turso_core::StepResult::Done) => Ok(STEP_DONE),
-            Ok(turso_core::StepResult::IO) => Ok(STEP_IO),
-            Ok(turso_core::StepResult::Interrupt) => Err(Error::new(
-                Status::GenericFailure,
-                "Statement was interrupted",
-            )),
-            Ok(turso_core::StepResult::Busy) => {
-                Err(Error::new(Status::GenericFailure, "Database is busy"))
-            }
-            Err(e) => Err(Error::new(
-                Status::GenericFailure,
-                format!("Step failed: {e}"),
-            )),
-        }
+    /// Step the statement and return result code (executed on a background thread):
+    /// 1 = Row available, 2 = Done, 3 = I/O needed
+    #[napi]
+    pub fn step_async(&self) -> Result<AsyncTask<DbTask>> {
+        Ok(AsyncTask::new(DbTask::Step {
+            stmt: self.stmt.clone(),
+        }))
     }
 
     /// Get the current row data according to the presentation mode
@@ -543,8 +662,17 @@ fn to_js_value<'a>(
         turso_core::Value::Float(f) => ToNapiValue::into_unknown(*f, env),
         turso_core::Value::Text(s) => ToNapiValue::into_unknown(s.as_str(), env),
         turso_core::Value::Blob(b) => {
-            let buffer = Buffer::from(b.as_slice());
-            ToNapiValue::into_unknown(buffer, env)
+            #[cfg(not(feature = "browser"))]
+            {
+                let buffer = Buffer::from(b.as_slice());
+                ToNapiValue::into_unknown(buffer, env)
+            }
+            // emnapi does not support Buffer, so return a Uint8Array in the browser build
+            #[cfg(feature = "browser")]
+            {
+                let buffer = Uint8Array::from(b.as_slice());
+                ToNapiValue::into_unknown(buffer, env)
+            }
         }
     }
 }
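For reviewers, a minimal, hypothetical sketch of how the new browser entry points could be driven from JavaScript. It assumes napi-rs's default camelCase renaming of the exported items (`initThreadPool`, `batchAsync`, `stepAsync`), a build artifact named `./index.js`, and an OPFS file that already exists (`Opfs::open_file` does not create files); none of these JS-side names are defined by this patch beyond the Rust symbols above.

```typescript
// Hypothetical usage sketch -- module path and camelCase names are assumptions,
// not part of this patch.
import { initThreadPool, connect, type Database } from "./index.js";

export async function demo(): Promise<void> {
  // Force emnapi to spawn its web worker before anything can block on Atomics.wait.
  await initThreadPool();

  // connect() resolves on a worker thread; the sync constructor is rejected for
  // FS-backed databases in the browser build.
  const db: Database = await connect("demo.db", { tracing: "info" });

  // batchAsync/stepAsync run as AsyncTask on a background thread; the *_sync
  // variants stay on the calling thread.
  await db.batchAsync("CREATE TABLE IF NOT EXISTS t(x); INSERT INTO t VALUES (1);");

  const stmt = db.prepare("SELECT x FROM t");
  // Step codes mirror STEP_ROW = 1, STEP_DONE = 2, STEP_IO = 3; a real wrapper
  // also has to service code 3 by driving the I/O loop.
  while ((await stmt.stepAsync()) === 1) {
    // Read the current row via the Statement row accessor here
    // (its name is outside the hunks shown in this diff).
  }

  db.close();
}
```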