From cd9cf715681d3165fe2265f7a66bfb11f24fa9c4 Mon Sep 17 00:00:00 2001 From: Nikita Sivukhin Date: Wed, 24 Sep 2025 17:06:52 +0400 Subject: [PATCH] cleanup napi-rs bindings Rust code --- bindings/javascript/src/browser.rs | 47 +--- bindings/javascript/src/lib.rs | 422 +++++++++++++++-------------- 2 files changed, 221 insertions(+), 248 deletions(-) diff --git a/bindings/javascript/src/browser.rs b/bindings/javascript/src/browser.rs index 16b071ec8..53d7f636f 100644 --- a/bindings/javascript/src/browser.rs +++ b/bindings/javascript/src/browser.rs @@ -2,9 +2,7 @@ use std::{cell::RefCell, collections::HashMap, sync::Arc}; use napi::bindgen_prelude::*; use napi_derive::napi; -use turso_core::{Clock, Completion, File, Instant, MemoryIO, IO}; - -use crate::{is_memory, Database, DatabaseOpts}; +use turso_core::{Clock, Completion, File, Instant, IO}; pub struct NoopTask; @@ -26,32 +24,6 @@ pub fn init_thread_pool() -> napi::Result> { Ok(AsyncTask::new(NoopTask)) } -pub struct ConnectTask { - path: String, - io: Arc, - opts: Option, -} - -pub struct ConnectResult { - db: Database, -} - -unsafe impl Send for ConnectResult {} - -impl Task for ConnectTask { - type Output = ConnectResult; - type JsValue = Database; - - fn compute(&mut self) -> Result { - let db = Database::new_io(self.path.clone(), self.io.clone(), self.opts.clone())?; - Ok(ConnectResult { db }) - } - - fn resolve(&mut self, _: Env, result: Self::Output) -> Result { - Ok(result.db) - } -} - #[napi] #[derive(Clone)] pub struct Opfs { @@ -77,23 +49,6 @@ struct OpfsFile { unsafe impl Send for Opfs {} unsafe impl Sync for Opfs {} -#[napi] -// we offload connect to the web-worker because -// turso-db use blocking IO [io.wait_for_completion(c)] in few places during initialization path -pub fn connect_db_async( - path: String, - opts: Option, -) -> Result> { - let io: Arc = if is_memory(&path) { - Arc::new(MemoryIO::new()) - } else { - // we must create OPFS IO on the main thread - opfs() - }; - let task = ConnectTask { path, io, opts }; - Ok(AsyncTask::new(task)) -} - #[napi] pub fn complete_opfs(completion_no: u32, result: i32) { OPFS.with(|opfs| opfs.complete(completion_no, result)) diff --git a/bindings/javascript/src/lib.rs b/bindings/javascript/src/lib.rs index df4724e6e..5d202b78b 100644 --- a/bindings/javascript/src/lib.rs +++ b/bindings/javascript/src/lib.rs @@ -10,7 +10,7 @@ //! - Iterating through query results //! - Managing the I/O event loop -#[cfg(feature = "browser")] +// #[cfg(feature = "browser")] pub mod browser; use napi::bindgen_prelude::*; @@ -24,6 +24,7 @@ use std::{ }; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::fmt::format::FmtSpan; +use turso_core::storage::database::DatabaseFile; /// Step result constants const STEP_ROW: u32 = 1; @@ -42,20 +43,27 @@ enum PresentationMode { #[napi] #[derive(Clone)] pub struct Database { - _db: Option>, - io: Arc, - conn: Option>, - path: String, - is_open: Cell, + inner: Option>, default_safe_integers: Cell, } +/// database inner is Send to the worker for initial connection +/// that's why we use OnceLock here - in order to make DatabaseInner Send and Sync +pub struct DatabaseInner { + path: String, + opts: Option, + io: Arc, + db: OnceLock>>, + conn: OnceLock>>, + is_connected: OnceLock, +} + pub(crate) fn is_memory(path: &str) -> bool { path == ":memory:" } static TRACING_INIT: OnceLock<()> = OnceLock::new(); -pub(crate) fn init_tracing(level_filter: Option) { +pub(crate) fn init_tracing(level_filter: &Option) { let Some(level_filter) = level_filter else { return; }; @@ -77,21 +85,23 @@ pub(crate) fn init_tracing(level_filter: Option) { }); } -pub enum DbTask { - Step { - stmt: Arc>>, - }, -} - +// for now we make DbTask unsound as turso_core::Database and turso_core::Connection are not fully thread-safe unsafe impl Send for DbTask {} +pub enum DbTask { + Connect { db: Arc }, +} + impl Task for DbTask { type Output = u32; type JsValue = u32; fn compute(&mut self) -> Result { match self { - DbTask::Step { stmt } => step_sync(stmt), + DbTask::Connect { db } => { + connect_sync(db)?; + Ok(0) + } } } @@ -100,9 +110,14 @@ impl Task for DbTask { } } +/// Most of the options are aligned with better-sqlite API +/// (see https://github.com/WiseLibs/better-sqlite3/blob/master/docs/api.md#new-databasepath-options) #[napi(object)] #[derive(Clone)] pub struct DatabaseOpts { + pub readonly: Option, + pub timeout: Option, + pub file_must_exist: Option, pub tracing: Option, } @@ -110,26 +125,95 @@ fn step_sync(stmt: &Arc>>) -> napi::Result let mut stmt_ref = stmt.borrow_mut(); let stmt = stmt_ref .as_mut() - .ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?; + .ok_or_else(|| create_generic_error("statement has been finalized"))?; match stmt.step() { Ok(turso_core::StepResult::Row) => Ok(STEP_ROW), Ok(turso_core::StepResult::IO) => Ok(STEP_IO), Ok(turso_core::StepResult::Done) => Ok(STEP_DONE), - Ok(turso_core::StepResult::Interrupt) => Err(Error::new( - Status::GenericFailure, - "Statement was interrupted", - )), - Ok(turso_core::StepResult::Busy) => { - Err(Error::new(Status::GenericFailure, "Database is busy")) + Ok(turso_core::StepResult::Interrupt) => { + Err(create_generic_error("statement was interrupted")) } - Err(e) => Err(Error::new( - Status::GenericFailure, - format!("Step failed: {e}"), - )), + Ok(turso_core::StepResult::Busy) => Err(create_generic_error("database is busy")), + Err(e) => Err(to_generic_error("step failed", e)), } } +fn to_generic_error(message: &str, e: E) -> napi::Error { + Error::new(Status::GenericFailure, format!("{message}: {e}")) +} + +fn to_error(status: napi::Status, message: &str, e: E) -> napi::Error { + Error::new(status, format!("{message}: {e}")) +} + +fn create_generic_error(message: &str) -> napi::Error { + Error::new(Status::GenericFailure, message) +} + +fn create_error(status: napi::Status, message: &str) -> napi::Error { + Error::new(status, message) +} + +fn connect_sync(db: &DatabaseInner) -> napi::Result<()> { + if db.is_connected.get() == Some(&true) { + return Ok(()); + } + + let mut flags = turso_core::OpenFlags::Create; + let mut busy_timeout = None; + if let Some(opts) = &db.opts { + init_tracing(&opts.tracing); + if opts.readonly == Some(true) { + flags.set(turso_core::OpenFlags::ReadOnly, true); + flags.set(turso_core::OpenFlags::Create, false); + } + if opts.file_must_exist == Some(true) { + flags.set(turso_core::OpenFlags::Create, false); + } + if let Some(timeout) = opts.timeout { + busy_timeout = Some(std::time::Duration::from_millis(timeout as u64)); + } + } + tracing::info!("flags: {:?}", flags); + let io = &db.io; + let file = io + .open_file(&db.path, flags, false) + .map_err(|e| to_generic_error("failed to open file", e))?; + + let db_file = Arc::new(DatabaseFile::new(file)); + let db_core = turso_core::Database::open_with_flags( + io.clone(), + &db.path, + db_file, + flags, + turso_core::DatabaseOpts::new() + .with_mvcc(false) + .with_indexes(true), + None, + ) + .map_err(|e| to_generic_error("failed to open database", e))?; + + let conn = db_core + .connect() + .map_err(|e| to_generic_error("failed to connect", e))?; + + if let Some(busy_timeout) = busy_timeout { + conn.set_busy_timeout(busy_timeout); + } + + db.is_connected + .set(true) + .map_err(|_| create_generic_error("db already connected, API misuse"))?; + db.db + .set(Some(db_core)) + .map_err(|_| create_generic_error("db already connected, API misuse"))?; + db.conn + .set(Some(conn)) + .map_err(|_| create_generic_error("db already connected, API misuse"))?; + Ok(()) +} + #[napi] impl Database { /// Creates a new database instance. @@ -137,102 +221,128 @@ impl Database { /// # Arguments /// * `path` - The path to the database file. #[napi(constructor)] - pub fn new_napi(path: String, opts: Option) -> Result { + pub fn new_napi(path: String, opts: Option) -> napi::Result { Self::new(path, opts) } - pub fn new(path: String, opts: Option) -> Result { + pub fn new(path: String, opts: Option) -> napi::Result { let io: Arc = if is_memory(&path) { Arc::new(turso_core::MemoryIO::new()) } else { #[cfg(not(feature = "browser"))] { - Arc::new(turso_core::PlatformIO::new().map_err(|e| { - Error::new(Status::GenericFailure, format!("Failed to create IO: {e}")) - })?) + Arc::new( + turso_core::PlatformIO::new() + .map_err(|e| to_generic_error("failed to create IO", e))?, + ) } #[cfg(feature = "browser")] { - return Err(napi::Error::new( - napi::Status::GenericFailure, - "FS-backed db must be initialized through connectDbAsync function in the browser", - )); + browser::opfs() } }; - Self::new_io(path, io, opts) + Self::new_with_io(path, io, opts) } - pub fn new_io( + pub fn new_with_io( path: String, io: Arc, opts: Option, - ) -> Result { - if let Some(opts) = opts { - init_tracing(opts.tracing); - } - - let file = io - .open_file(&path, turso_core::OpenFlags::Create, false) - .map_err(|e| Error::new(Status::GenericFailure, format!("Failed to open file: {e}")))?; - - let db_file = Arc::new(DatabaseFile::new(file)); - let db = - turso_core::Database::open(io.clone(), &path, db_file, false, true).map_err(|e| { - Error::new( - Status::GenericFailure, - format!("Failed to open database: {e}"), - ) - })?; - - let conn = db - .connect() - .map_err(|e| Error::new(Status::GenericFailure, format!("Failed to connect: {e}")))?; - - Ok(Self::create(Some(db), io, conn, path)) + ) -> napi::Result { + Ok(Self { + inner: Some(Arc::new(DatabaseInner { + path, + opts, + io, + db: OnceLock::new(), + conn: OnceLock::new(), + is_connected: OnceLock::new(), + })), + default_safe_integers: Cell::new(false), + }) } - pub fn create( - db: Option>, + pub fn new_connected( + path: String, io: Arc, conn: Arc, - path: String, ) -> Self { + let db_once = OnceLock::new(); + db_once.set(None).unwrap(); + + let conn_once = OnceLock::new(); + conn_once.set(Some(conn)).ok().unwrap(); + + let is_connected_once = OnceLock::new(); + is_connected_once.set(true).unwrap(); Database { - _db: db, - io, - conn: Some(conn), - path, - is_open: Cell::new(true), + inner: Some(Arc::new(DatabaseInner { + path, + io, + opts: None, + db: db_once, + conn: conn_once, + is_connected: is_connected_once, + })), default_safe_integers: Cell::new(false), } } + fn inner(&self) -> napi::Result<&Arc> { + let Some(inner) = &self.inner else { + return Err(create_generic_error("database must be connected")); + }; + Ok(inner) + } + + /// Connect the database synchronously + /// This method is idempotent and can be called multiple times safely until the database will be closed + #[napi] + pub fn connect_sync(&self) -> napi::Result<()> { + connect_sync(self.inner()?) + } + + /// Connect the database asynchronously + /// This method is idempotent and can be called multiple times safely until the database will be closed + #[napi(ts_return_type = "Promise")] + pub fn connect_async(&self) -> napi::Result> { + Ok(AsyncTask::new(DbTask::Connect { + db: self.inner()?.clone(), + })) + } + fn conn(&self) -> Result> { - let Some(conn) = self.conn.as_ref() else { - return Err(napi::Error::new( - napi::Status::GenericFailure, - "connection is not set", - )); + let Some(Some(conn)) = self.inner()?.conn.get() else { + return Err(create_generic_error("database must be connected")); }; Ok(conn.clone()) } - /// Returns whether the database is in memory-only mode. + /// Returns whether the database is in readonly-only mode. #[napi(getter)] - pub fn memory(&self) -> bool { - is_memory(&self.path) + pub fn readonly(&self) -> napi::Result { + Ok(self.conn()?.is_readonly(0)) } /// Returns whether the database is in memory-only mode. #[napi(getter)] - pub fn path(&self) -> String { - self.path.clone() + pub fn memory(&self) -> napi::Result { + Ok(is_memory(&self.inner()?.path)) + } + + /// Returns whether the database is in memory-only mode. + #[napi(getter)] + pub fn path(&self) -> napi::Result { + Ok(self.inner()?.path.clone()) } /// Returns whether the database connection is open. #[napi(getter)] - pub fn open(&self) -> bool { - self.is_open.get() + pub fn open(&self) -> napi::Result { + if self.inner.is_none() { + return Ok(false); + } + Ok(self.inner()?.is_connected.get() == Some(&true)) } /// Prepares a statement for execution. @@ -245,11 +355,11 @@ impl Database { /// /// A `Statement` instance. #[napi] - pub fn prepare(&self, sql: String) -> Result { + pub fn prepare(&self, sql: String) -> napi::Result { let stmt = self .conn()? .prepare(&sql) - .map_err(|e| Error::new(Status::GenericFailure, format!("{e}")))?; + .map_err(|e| to_generic_error("prepare failed", e))?; let column_names: Vec = (0..stmt.num_columns()) .map(|i| std::ffi::CString::new(stmt.get_column_name(i).to_string()).unwrap()) .collect(); @@ -268,7 +378,7 @@ impl Database { /// /// The rowid of the last row inserted. #[napi] - pub fn last_insert_rowid(&self) -> Result { + pub fn last_insert_rowid(&self) -> napi::Result { Ok(self.conn()?.last_insert_rowid()) } @@ -278,7 +388,7 @@ impl Database { /// /// The number of changes made by the last statement. #[napi] - pub fn changes(&self) -> Result { + pub fn changes(&self) -> napi::Result { Ok(self.conn()?.changes()) } @@ -288,7 +398,7 @@ impl Database { /// /// The total number of changes made by all statements. #[napi] - pub fn total_changes(&self) -> Result { + pub fn total_changes(&self) -> napi::Result { Ok(self.conn()?.total_changes()) } @@ -298,10 +408,8 @@ impl Database { /// /// `Ok(())` if the database is closed successfully. #[napi] - pub fn close(&mut self) -> Result<()> { - self.is_open.set(false); - let _ = self.conn.take().unwrap(); - let _ = self._db.take(); + pub fn close(&mut self) -> napi::Result<()> { + let _ = self.inner.take(); Ok(()) } @@ -317,18 +425,17 @@ impl Database { /// Runs the I/O loop synchronously. #[napi] - pub fn io_loop_sync(&self) -> Result<()> { - self.io - .step() - .map_err(|e| Error::new(Status::GenericFailure, format!("IO error: {e}")))?; + pub fn io_loop_sync(&self) -> napi::Result<()> { + let io = &self.inner()?.io; + io.step().map_err(|e| to_generic_error("IO error", e))?; Ok(()) } /// Runs the I/O loop asynchronously, returning a Promise. #[napi(ts_return_type = "Promise")] - pub fn io_loop_async(&self) -> AsyncTask { - let io = self.io.clone(); - AsyncTask::new(IoLoopTask { io }) + pub fn io_loop_async(&self) -> napi::Result> { + let io = self.inner()?.io.clone(); + Ok(AsyncTask::new(IoLoopTask { io })) } } @@ -348,7 +455,7 @@ impl Statement { let mut stmt = self.stmt.borrow_mut(); let stmt = stmt .as_mut() - .ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?; + .ok_or_else(|| create_generic_error("statement has been finalized"))?; stmt.reset(); Ok(()) } @@ -359,7 +466,7 @@ impl Statement { let stmt = self.stmt.borrow(); let stmt = stmt .as_ref() - .ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?; + .ok_or_else(|| create_generic_error("statement has been finalized"))?; Ok(stmt.parameters_count() as u32) } @@ -373,10 +480,10 @@ impl Statement { let stmt = self.stmt.borrow(); let stmt = stmt .as_ref() - .ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?; + .ok_or_else(|| create_generic_error("statement has been finalized"))?; let non_zero_idx = NonZeroUsize::new(index as usize).ok_or_else(|| { - Error::new(Status::InvalidArg, "Parameter index must be greater than 0") + create_error(Status::InvalidArg, "parameter index must be greater than 0") })?; Ok(stmt.parameters().name(non_zero_idx).map(|s| s.to_string())) @@ -394,10 +501,10 @@ impl Statement { let mut stmt = self.stmt.borrow_mut(); let stmt = stmt .as_mut() - .ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?; + .ok_or_else(|| create_generic_error("statement has been finalized"))?; let non_zero_idx = NonZeroUsize::new(index as usize).ok_or_else(|| { - Error::new(Status::InvalidArg, "Parameter index must be greater than 0") + create_error(Status::InvalidArg, "parameter index must be greater than 0") })?; let value_type = value.get_type()?; let turso_value = match value_type { @@ -412,12 +519,9 @@ impl Statement { } ValueType::BigInt => { let bigint_str = value.coerce_to_string()?.into_utf8()?.as_str()?.to_owned(); - let bigint_value = bigint_str.parse::().map_err(|e| { - Error::new( - Status::NumberExpected, - format!("Failed to parse BigInt: {e}"), - ) - })?; + let bigint_value = bigint_str + .parse::() + .map_err(|e| to_error(Status::NumberExpected, "failed to parse BigInt", e))?; turso_core::Value::Integer(bigint_value) } ValueType::String => { @@ -461,26 +565,17 @@ impl Statement { step_sync(&self.stmt) } - /// Step the statement and return result code (executed on the background thread): - /// 1 = Row available, 2 = Done, 3 = I/O needed - #[napi(ts_return_type = "Promise")] - pub fn step_async(&self) -> Result> { - Ok(AsyncTask::new(DbTask::Step { - stmt: self.stmt.clone(), - })) - } - /// Get the current row data according to the presentation mode #[napi] pub fn row<'env>(&self, env: &'env Env) -> Result> { let stmt_ref = self.stmt.borrow(); let stmt = stmt_ref .as_ref() - .ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?; + .ok_or_else(|| create_generic_error("statement has been finalized"))?; let row_data = stmt .row() - .ok_or_else(|| Error::new(Status::GenericFailure, "No row data available"))?; + .ok_or_else(|| create_generic_error("no row data available"))?; let mode = self.mode.borrow(); let safe_integers = self.safe_integers.get(); @@ -499,9 +594,8 @@ impl Statement { .get_values() .enumerate() .next() - .ok_or(napi::Error::new( - napi::Status::GenericFailure, - "Pluck mode requires at least one column in the result", + .ok_or(create_generic_error( + "pluck mode requires at least one column in the result", ))?; to_js_value(env, value, safe_integers)? } @@ -563,7 +657,7 @@ impl Statement { let stmt_ref = self.stmt.borrow(); let stmt = stmt_ref .as_ref() - .ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?; + .ok_or_else(|| create_generic_error("statement has been finalized"))?; let column_count = stmt.num_columns(); let mut js_array = env.create_array(column_count as u32)?; @@ -608,7 +702,7 @@ impl Statement { /// Async task for running the I/O loop. pub struct IoLoopTask { - // this field is set in the turso-sync-engine package + // this field is public because it is also set in the sync package pub io: Arc, } @@ -617,9 +711,9 @@ impl Task for IoLoopTask { type JsValue = (); fn compute(&mut self) -> napi::Result { - self.io.step().map_err(|e| { - napi::Error::new(napi::Status::GenericFailure, format!("IO error: {e}")) - })?; + self.io + .step() + .map_err(|e| to_generic_error("IO error", e))?; Ok(()) } @@ -661,79 +755,3 @@ fn to_js_value<'a>( } } } - -struct DatabaseFile { - file: Arc, -} - -unsafe impl Send for DatabaseFile {} -unsafe impl Sync for DatabaseFile {} - -impl DatabaseFile { - pub fn new(file: Arc) -> Self { - Self { file } - } -} - -impl turso_core::DatabaseStorage for DatabaseFile { - fn read_header(&self, c: turso_core::Completion) -> turso_core::Result { - self.file.pread(0, c) - } - fn read_page( - &self, - page_idx: usize, - _io_ctx: &turso_core::IOContext, - c: turso_core::Completion, - ) -> turso_core::Result { - let r = c.as_read(); - let size = r.buf().len(); - assert!(page_idx > 0); - if !(512..=65536).contains(&size) || size & (size - 1) != 0 { - return Err(turso_core::LimboError::NotADB); - } - let pos = (page_idx as u64 - 1) * size as u64; - self.file.pread(pos, c) - } - - fn write_page( - &self, - page_idx: usize, - buffer: Arc, - _io_ctx: &turso_core::IOContext, - c: turso_core::Completion, - ) -> turso_core::Result { - let size = buffer.len(); - let pos = (page_idx as u64 - 1) * size as u64; - self.file.pwrite(pos, buffer, c) - } - - fn write_pages( - &self, - first_page_idx: usize, - page_size: usize, - buffers: Vec>, - _io_ctx: &turso_core::IOContext, - c: turso_core::Completion, - ) -> turso_core::Result { - let pos = first_page_idx.saturating_sub(1) as u64 * page_size as u64; - let c = self.file.pwritev(pos, buffers, c)?; - Ok(c) - } - - fn sync(&self, c: turso_core::Completion) -> turso_core::Result { - self.file.sync(c) - } - - fn size(&self) -> turso_core::Result { - self.file.size() - } - - fn truncate( - &self, - len: usize, - c: turso_core::Completion, - ) -> turso_core::Result { - let c = self.file.truncate(len as u64, c)?; - Ok(c) - } -}