cleanup napi-rs bindings Rust code

This commit is contained in:
Nikita Sivukhin
2025-09-24 17:06:52 +04:00
parent 2ccd50ad69
commit cd9cf71568
2 changed files with 221 additions and 248 deletions

View File

@@ -2,9 +2,7 @@ use std::{cell::RefCell, collections::HashMap, sync::Arc};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use turso_core::{Clock, Completion, File, Instant, MemoryIO, IO};
use crate::{is_memory, Database, DatabaseOpts};
use turso_core::{Clock, Completion, File, Instant, IO};
pub struct NoopTask;
@@ -26,32 +24,6 @@ pub fn init_thread_pool() -> napi::Result<AsyncTask<NoopTask>> {
Ok(AsyncTask::new(NoopTask))
}
pub struct ConnectTask {
path: String,
io: Arc<dyn turso_core::IO>,
opts: Option<DatabaseOpts>,
}
pub struct ConnectResult {
db: Database,
}
unsafe impl Send for ConnectResult {}
impl Task for ConnectTask {
type Output = ConnectResult;
type JsValue = Database;
fn compute(&mut self) -> Result<Self::Output> {
let db = Database::new_io(self.path.clone(), self.io.clone(), self.opts.clone())?;
Ok(ConnectResult { db })
}
fn resolve(&mut self, _: Env, result: Self::Output) -> Result<Self::JsValue> {
Ok(result.db)
}
}
#[napi]
#[derive(Clone)]
pub struct Opfs {
@@ -77,23 +49,6 @@ struct OpfsFile {
unsafe impl Send for Opfs {}
unsafe impl Sync for Opfs {}
#[napi]
// we offload connect to the web-worker because
// turso-db use blocking IO [io.wait_for_completion(c)] in few places during initialization path
pub fn connect_db_async(
path: String,
opts: Option<DatabaseOpts>,
) -> Result<AsyncTask<ConnectTask>> {
let io: Arc<dyn turso_core::IO> = if is_memory(&path) {
Arc::new(MemoryIO::new())
} else {
// we must create OPFS IO on the main thread
opfs()
};
let task = ConnectTask { path, io, opts };
Ok(AsyncTask::new(task))
}
#[napi]
pub fn complete_opfs(completion_no: u32, result: i32) {
OPFS.with(|opfs| opfs.complete(completion_no, result))

View File

@@ -10,7 +10,7 @@
//! - Iterating through query results
//! - Managing the I/O event loop
#[cfg(feature = "browser")]
// #[cfg(feature = "browser")]
pub mod browser;
use napi::bindgen_prelude::*;
@@ -24,6 +24,7 @@ use std::{
};
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::fmt::format::FmtSpan;
use turso_core::storage::database::DatabaseFile;
/// Step result constants
const STEP_ROW: u32 = 1;
@@ -42,20 +43,27 @@ enum PresentationMode {
#[napi]
#[derive(Clone)]
pub struct Database {
_db: Option<Arc<turso_core::Database>>,
io: Arc<dyn turso_core::IO>,
conn: Option<Arc<turso_core::Connection>>,
path: String,
is_open: Cell<bool>,
inner: Option<Arc<DatabaseInner>>,
default_safe_integers: Cell<bool>,
}
/// database inner is Send to the worker for initial connection
/// that's why we use OnceLock here - in order to make DatabaseInner Send and Sync
pub struct DatabaseInner {
path: String,
opts: Option<DatabaseOpts>,
io: Arc<dyn turso_core::IO>,
db: OnceLock<Option<Arc<turso_core::Database>>>,
conn: OnceLock<Option<Arc<turso_core::Connection>>>,
is_connected: OnceLock<bool>,
}
pub(crate) fn is_memory(path: &str) -> bool {
path == ":memory:"
}
static TRACING_INIT: OnceLock<()> = OnceLock::new();
pub(crate) fn init_tracing(level_filter: Option<String>) {
pub(crate) fn init_tracing(level_filter: &Option<String>) {
let Some(level_filter) = level_filter else {
return;
};
@@ -77,21 +85,23 @@ pub(crate) fn init_tracing(level_filter: Option<String>) {
});
}
pub enum DbTask {
Step {
stmt: Arc<RefCell<Option<turso_core::Statement>>>,
},
}
// for now we make DbTask unsound as turso_core::Database and turso_core::Connection are not fully thread-safe
unsafe impl Send for DbTask {}
pub enum DbTask {
Connect { db: Arc<DatabaseInner> },
}
impl Task for DbTask {
type Output = u32;
type JsValue = u32;
fn compute(&mut self) -> Result<Self::Output> {
match self {
DbTask::Step { stmt } => step_sync(stmt),
DbTask::Connect { db } => {
connect_sync(db)?;
Ok(0)
}
}
}
@@ -100,9 +110,14 @@ impl Task for DbTask {
}
}
/// Most of the options are aligned with better-sqlite API
/// (see https://github.com/WiseLibs/better-sqlite3/blob/master/docs/api.md#new-databasepath-options)
#[napi(object)]
#[derive(Clone)]
pub struct DatabaseOpts {
pub readonly: Option<bool>,
pub timeout: Option<u32>,
pub file_must_exist: Option<bool>,
pub tracing: Option<String>,
}
@@ -110,26 +125,95 @@ fn step_sync(stmt: &Arc<RefCell<Option<turso_core::Statement>>>) -> napi::Result
let mut stmt_ref = stmt.borrow_mut();
let stmt = stmt_ref
.as_mut()
.ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
.ok_or_else(|| create_generic_error("statement has been finalized"))?;
match stmt.step() {
Ok(turso_core::StepResult::Row) => Ok(STEP_ROW),
Ok(turso_core::StepResult::IO) => Ok(STEP_IO),
Ok(turso_core::StepResult::Done) => Ok(STEP_DONE),
Ok(turso_core::StepResult::Interrupt) => Err(Error::new(
Status::GenericFailure,
"Statement was interrupted",
)),
Ok(turso_core::StepResult::Busy) => {
Err(Error::new(Status::GenericFailure, "Database is busy"))
Ok(turso_core::StepResult::Interrupt) => {
Err(create_generic_error("statement was interrupted"))
}
Err(e) => Err(Error::new(
Status::GenericFailure,
format!("Step failed: {e}"),
)),
Ok(turso_core::StepResult::Busy) => Err(create_generic_error("database is busy")),
Err(e) => Err(to_generic_error("step failed", e)),
}
}
fn to_generic_error<E: std::error::Error>(message: &str, e: E) -> napi::Error {
Error::new(Status::GenericFailure, format!("{message}: {e}"))
}
fn to_error<E: std::error::Error>(status: napi::Status, message: &str, e: E) -> napi::Error {
Error::new(status, format!("{message}: {e}"))
}
fn create_generic_error(message: &str) -> napi::Error {
Error::new(Status::GenericFailure, message)
}
fn create_error(status: napi::Status, message: &str) -> napi::Error {
Error::new(status, message)
}
fn connect_sync(db: &DatabaseInner) -> napi::Result<()> {
if db.is_connected.get() == Some(&true) {
return Ok(());
}
let mut flags = turso_core::OpenFlags::Create;
let mut busy_timeout = None;
if let Some(opts) = &db.opts {
init_tracing(&opts.tracing);
if opts.readonly == Some(true) {
flags.set(turso_core::OpenFlags::ReadOnly, true);
flags.set(turso_core::OpenFlags::Create, false);
}
if opts.file_must_exist == Some(true) {
flags.set(turso_core::OpenFlags::Create, false);
}
if let Some(timeout) = opts.timeout {
busy_timeout = Some(std::time::Duration::from_millis(timeout as u64));
}
}
tracing::info!("flags: {:?}", flags);
let io = &db.io;
let file = io
.open_file(&db.path, flags, false)
.map_err(|e| to_generic_error("failed to open file", e))?;
let db_file = Arc::new(DatabaseFile::new(file));
let db_core = turso_core::Database::open_with_flags(
io.clone(),
&db.path,
db_file,
flags,
turso_core::DatabaseOpts::new()
.with_mvcc(false)
.with_indexes(true),
None,
)
.map_err(|e| to_generic_error("failed to open database", e))?;
let conn = db_core
.connect()
.map_err(|e| to_generic_error("failed to connect", e))?;
if let Some(busy_timeout) = busy_timeout {
conn.set_busy_timeout(busy_timeout);
}
db.is_connected
.set(true)
.map_err(|_| create_generic_error("db already connected, API misuse"))?;
db.db
.set(Some(db_core))
.map_err(|_| create_generic_error("db already connected, API misuse"))?;
db.conn
.set(Some(conn))
.map_err(|_| create_generic_error("db already connected, API misuse"))?;
Ok(())
}
#[napi]
impl Database {
/// Creates a new database instance.
@@ -137,102 +221,128 @@ impl Database {
/// # Arguments
/// * `path` - The path to the database file.
#[napi(constructor)]
pub fn new_napi(path: String, opts: Option<DatabaseOpts>) -> Result<Self> {
pub fn new_napi(path: String, opts: Option<DatabaseOpts>) -> napi::Result<Self> {
Self::new(path, opts)
}
pub fn new(path: String, opts: Option<DatabaseOpts>) -> Result<Self> {
pub fn new(path: String, opts: Option<DatabaseOpts>) -> napi::Result<Self> {
let io: Arc<dyn turso_core::IO> = if is_memory(&path) {
Arc::new(turso_core::MemoryIO::new())
} else {
#[cfg(not(feature = "browser"))]
{
Arc::new(turso_core::PlatformIO::new().map_err(|e| {
Error::new(Status::GenericFailure, format!("Failed to create IO: {e}"))
})?)
Arc::new(
turso_core::PlatformIO::new()
.map_err(|e| to_generic_error("failed to create IO", e))?,
)
}
#[cfg(feature = "browser")]
{
return Err(napi::Error::new(
napi::Status::GenericFailure,
"FS-backed db must be initialized through connectDbAsync function in the browser",
));
browser::opfs()
}
};
Self::new_io(path, io, opts)
Self::new_with_io(path, io, opts)
}
pub fn new_io(
pub fn new_with_io(
path: String,
io: Arc<dyn turso_core::IO>,
opts: Option<DatabaseOpts>,
) -> Result<Self> {
if let Some(opts) = opts {
init_tracing(opts.tracing);
}
let file = io
.open_file(&path, turso_core::OpenFlags::Create, false)
.map_err(|e| Error::new(Status::GenericFailure, format!("Failed to open file: {e}")))?;
let db_file = Arc::new(DatabaseFile::new(file));
let db =
turso_core::Database::open(io.clone(), &path, db_file, false, true).map_err(|e| {
Error::new(
Status::GenericFailure,
format!("Failed to open database: {e}"),
)
})?;
let conn = db
.connect()
.map_err(|e| Error::new(Status::GenericFailure, format!("Failed to connect: {e}")))?;
Ok(Self::create(Some(db), io, conn, path))
) -> napi::Result<Self> {
Ok(Self {
inner: Some(Arc::new(DatabaseInner {
path,
opts,
io,
db: OnceLock::new(),
conn: OnceLock::new(),
is_connected: OnceLock::new(),
})),
default_safe_integers: Cell::new(false),
})
}
pub fn create(
db: Option<Arc<turso_core::Database>>,
pub fn new_connected(
path: String,
io: Arc<dyn turso_core::IO>,
conn: Arc<turso_core::Connection>,
path: String,
) -> Self {
let db_once = OnceLock::new();
db_once.set(None).unwrap();
let conn_once = OnceLock::new();
conn_once.set(Some(conn)).ok().unwrap();
let is_connected_once = OnceLock::new();
is_connected_once.set(true).unwrap();
Database {
_db: db,
io,
conn: Some(conn),
path,
is_open: Cell::new(true),
inner: Some(Arc::new(DatabaseInner {
path,
io,
opts: None,
db: db_once,
conn: conn_once,
is_connected: is_connected_once,
})),
default_safe_integers: Cell::new(false),
}
}
fn inner(&self) -> napi::Result<&Arc<DatabaseInner>> {
let Some(inner) = &self.inner else {
return Err(create_generic_error("database must be connected"));
};
Ok(inner)
}
/// Connect the database synchronously
/// This method is idempotent and can be called multiple times safely until the database will be closed
#[napi]
pub fn connect_sync(&self) -> napi::Result<()> {
connect_sync(self.inner()?)
}
/// Connect the database asynchronously
/// This method is idempotent and can be called multiple times safely until the database will be closed
#[napi(ts_return_type = "Promise<void>")]
pub fn connect_async(&self) -> napi::Result<AsyncTask<DbTask>> {
Ok(AsyncTask::new(DbTask::Connect {
db: self.inner()?.clone(),
}))
}
fn conn(&self) -> Result<Arc<turso_core::Connection>> {
let Some(conn) = self.conn.as_ref() else {
return Err(napi::Error::new(
napi::Status::GenericFailure,
"connection is not set",
));
let Some(Some(conn)) = self.inner()?.conn.get() else {
return Err(create_generic_error("database must be connected"));
};
Ok(conn.clone())
}
/// Returns whether the database is in memory-only mode.
/// Returns whether the database is in readonly-only mode.
#[napi(getter)]
pub fn memory(&self) -> bool {
is_memory(&self.path)
pub fn readonly(&self) -> napi::Result<bool> {
Ok(self.conn()?.is_readonly(0))
}
/// Returns whether the database is in memory-only mode.
#[napi(getter)]
pub fn path(&self) -> String {
self.path.clone()
pub fn memory(&self) -> napi::Result<bool> {
Ok(is_memory(&self.inner()?.path))
}
/// Returns whether the database is in memory-only mode.
#[napi(getter)]
pub fn path(&self) -> napi::Result<String> {
Ok(self.inner()?.path.clone())
}
/// Returns whether the database connection is open.
#[napi(getter)]
pub fn open(&self) -> bool {
self.is_open.get()
pub fn open(&self) -> napi::Result<bool> {
if self.inner.is_none() {
return Ok(false);
}
Ok(self.inner()?.is_connected.get() == Some(&true))
}
/// Prepares a statement for execution.
@@ -245,11 +355,11 @@ impl Database {
///
/// A `Statement` instance.
#[napi]
pub fn prepare(&self, sql: String) -> Result<Statement> {
pub fn prepare(&self, sql: String) -> napi::Result<Statement> {
let stmt = self
.conn()?
.prepare(&sql)
.map_err(|e| Error::new(Status::GenericFailure, format!("{e}")))?;
.map_err(|e| to_generic_error("prepare failed", e))?;
let column_names: Vec<std::ffi::CString> = (0..stmt.num_columns())
.map(|i| std::ffi::CString::new(stmt.get_column_name(i).to_string()).unwrap())
.collect();
@@ -268,7 +378,7 @@ impl Database {
///
/// The rowid of the last row inserted.
#[napi]
pub fn last_insert_rowid(&self) -> Result<i64> {
pub fn last_insert_rowid(&self) -> napi::Result<i64> {
Ok(self.conn()?.last_insert_rowid())
}
@@ -278,7 +388,7 @@ impl Database {
///
/// The number of changes made by the last statement.
#[napi]
pub fn changes(&self) -> Result<i64> {
pub fn changes(&self) -> napi::Result<i64> {
Ok(self.conn()?.changes())
}
@@ -288,7 +398,7 @@ impl Database {
///
/// The total number of changes made by all statements.
#[napi]
pub fn total_changes(&self) -> Result<i64> {
pub fn total_changes(&self) -> napi::Result<i64> {
Ok(self.conn()?.total_changes())
}
@@ -298,10 +408,8 @@ impl Database {
///
/// `Ok(())` if the database is closed successfully.
#[napi]
pub fn close(&mut self) -> Result<()> {
self.is_open.set(false);
let _ = self.conn.take().unwrap();
let _ = self._db.take();
pub fn close(&mut self) -> napi::Result<()> {
let _ = self.inner.take();
Ok(())
}
@@ -317,18 +425,17 @@ impl Database {
/// Runs the I/O loop synchronously.
#[napi]
pub fn io_loop_sync(&self) -> Result<()> {
self.io
.step()
.map_err(|e| Error::new(Status::GenericFailure, format!("IO error: {e}")))?;
pub fn io_loop_sync(&self) -> napi::Result<()> {
let io = &self.inner()?.io;
io.step().map_err(|e| to_generic_error("IO error", e))?;
Ok(())
}
/// Runs the I/O loop asynchronously, returning a Promise.
#[napi(ts_return_type = "Promise<void>")]
pub fn io_loop_async(&self) -> AsyncTask<IoLoopTask> {
let io = self.io.clone();
AsyncTask::new(IoLoopTask { io })
pub fn io_loop_async(&self) -> napi::Result<AsyncTask<IoLoopTask>> {
let io = self.inner()?.io.clone();
Ok(AsyncTask::new(IoLoopTask { io }))
}
}
@@ -348,7 +455,7 @@ impl Statement {
let mut stmt = self.stmt.borrow_mut();
let stmt = stmt
.as_mut()
.ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
.ok_or_else(|| create_generic_error("statement has been finalized"))?;
stmt.reset();
Ok(())
}
@@ -359,7 +466,7 @@ impl Statement {
let stmt = self.stmt.borrow();
let stmt = stmt
.as_ref()
.ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
.ok_or_else(|| create_generic_error("statement has been finalized"))?;
Ok(stmt.parameters_count() as u32)
}
@@ -373,10 +480,10 @@ impl Statement {
let stmt = self.stmt.borrow();
let stmt = stmt
.as_ref()
.ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
.ok_or_else(|| create_generic_error("statement has been finalized"))?;
let non_zero_idx = NonZeroUsize::new(index as usize).ok_or_else(|| {
Error::new(Status::InvalidArg, "Parameter index must be greater than 0")
create_error(Status::InvalidArg, "parameter index must be greater than 0")
})?;
Ok(stmt.parameters().name(non_zero_idx).map(|s| s.to_string()))
@@ -394,10 +501,10 @@ impl Statement {
let mut stmt = self.stmt.borrow_mut();
let stmt = stmt
.as_mut()
.ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
.ok_or_else(|| create_generic_error("statement has been finalized"))?;
let non_zero_idx = NonZeroUsize::new(index as usize).ok_or_else(|| {
Error::new(Status::InvalidArg, "Parameter index must be greater than 0")
create_error(Status::InvalidArg, "parameter index must be greater than 0")
})?;
let value_type = value.get_type()?;
let turso_value = match value_type {
@@ -412,12 +519,9 @@ impl Statement {
}
ValueType::BigInt => {
let bigint_str = value.coerce_to_string()?.into_utf8()?.as_str()?.to_owned();
let bigint_value = bigint_str.parse::<i64>().map_err(|e| {
Error::new(
Status::NumberExpected,
format!("Failed to parse BigInt: {e}"),
)
})?;
let bigint_value = bigint_str
.parse::<i64>()
.map_err(|e| to_error(Status::NumberExpected, "failed to parse BigInt", e))?;
turso_core::Value::Integer(bigint_value)
}
ValueType::String => {
@@ -461,26 +565,17 @@ impl Statement {
step_sync(&self.stmt)
}
/// Step the statement and return result code (executed on the background thread):
/// 1 = Row available, 2 = Done, 3 = I/O needed
#[napi(ts_return_type = "Promise<number>")]
pub fn step_async(&self) -> Result<AsyncTask<DbTask>> {
Ok(AsyncTask::new(DbTask::Step {
stmt: self.stmt.clone(),
}))
}
/// Get the current row data according to the presentation mode
#[napi]
pub fn row<'env>(&self, env: &'env Env) -> Result<Unknown<'env>> {
let stmt_ref = self.stmt.borrow();
let stmt = stmt_ref
.as_ref()
.ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
.ok_or_else(|| create_generic_error("statement has been finalized"))?;
let row_data = stmt
.row()
.ok_or_else(|| Error::new(Status::GenericFailure, "No row data available"))?;
.ok_or_else(|| create_generic_error("no row data available"))?;
let mode = self.mode.borrow();
let safe_integers = self.safe_integers.get();
@@ -499,9 +594,8 @@ impl Statement {
.get_values()
.enumerate()
.next()
.ok_or(napi::Error::new(
napi::Status::GenericFailure,
"Pluck mode requires at least one column in the result",
.ok_or(create_generic_error(
"pluck mode requires at least one column in the result",
))?;
to_js_value(env, value, safe_integers)?
}
@@ -563,7 +657,7 @@ impl Statement {
let stmt_ref = self.stmt.borrow();
let stmt = stmt_ref
.as_ref()
.ok_or_else(|| Error::new(Status::GenericFailure, "Statement has been finalized"))?;
.ok_or_else(|| create_generic_error("statement has been finalized"))?;
let column_count = stmt.num_columns();
let mut js_array = env.create_array(column_count as u32)?;
@@ -608,7 +702,7 @@ impl Statement {
/// Async task for running the I/O loop.
pub struct IoLoopTask {
// this field is set in the turso-sync-engine package
// this field is public because it is also set in the sync package
pub io: Arc<dyn turso_core::IO>,
}
@@ -617,9 +711,9 @@ impl Task for IoLoopTask {
type JsValue = ();
fn compute(&mut self) -> napi::Result<Self::Output> {
self.io.step().map_err(|e| {
napi::Error::new(napi::Status::GenericFailure, format!("IO error: {e}"))
})?;
self.io
.step()
.map_err(|e| to_generic_error("IO error", e))?;
Ok(())
}
@@ -661,79 +755,3 @@ fn to_js_value<'a>(
}
}
}
struct DatabaseFile {
file: Arc<dyn turso_core::File>,
}
unsafe impl Send for DatabaseFile {}
unsafe impl Sync for DatabaseFile {}
impl DatabaseFile {
pub fn new(file: Arc<dyn turso_core::File>) -> Self {
Self { file }
}
}
impl turso_core::DatabaseStorage for DatabaseFile {
fn read_header(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
self.file.pread(0, c)
}
fn read_page(
&self,
page_idx: usize,
_io_ctx: &turso_core::IOContext,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let r = c.as_read();
let size = r.buf().len();
assert!(page_idx > 0);
if !(512..=65536).contains(&size) || size & (size - 1) != 0 {
return Err(turso_core::LimboError::NotADB);
}
let pos = (page_idx as u64 - 1) * size as u64;
self.file.pread(pos, c)
}
fn write_page(
&self,
page_idx: usize,
buffer: Arc<turso_core::Buffer>,
_io_ctx: &turso_core::IOContext,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let size = buffer.len();
let pos = (page_idx as u64 - 1) * size as u64;
self.file.pwrite(pos, buffer, c)
}
fn write_pages(
&self,
first_page_idx: usize,
page_size: usize,
buffers: Vec<Arc<turso_core::Buffer>>,
_io_ctx: &turso_core::IOContext,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let pos = first_page_idx.saturating_sub(1) as u64 * page_size as u64;
let c = self.file.pwritev(pos, buffers, c)?;
Ok(c)
}
fn sync(&self, c: turso_core::Completion) -> turso_core::Result<turso_core::Completion> {
self.file.sync(c)
}
fn size(&self) -> turso_core::Result<u64> {
self.file.size()
}
fn truncate(
&self,
len: usize,
c: turso_core::Completion,
) -> turso_core::Result<turso_core::Completion> {
let c = self.file.truncate(len as u64, c)?;
Ok(c)
}
}