From ddfa77997d08789cd1cd805a52d33e171f81deb4 Mon Sep 17 00:00:00 2001
From: Nikita Sivukhin
Date: Thu, 25 Sep 2025 10:54:50 +0400
Subject: [PATCH] adjust sync package napi-rs code

---
 bindings/javascript/sync/src/generator.rs | 14 ++++-
 bindings/javascript/sync/src/lib.rs       | 74 +++++++++--------------
 2 files changed, 38 insertions(+), 50 deletions(-)

diff --git a/bindings/javascript/sync/src/generator.rs b/bindings/javascript/sync/src/generator.rs
index 00ae9661e..b5208cd15 100644
--- a/bindings/javascript/sync/src/generator.rs
+++ b/bindings/javascript/sync/src/generator.rs
@@ -7,9 +7,6 @@ use std::{
 
 use turso_sync_engine::types::{DbChangesStatus, ProtocolCommand};
 
-pub const GENERATOR_RESUME_IO: u32 = 0;
-pub const GENERATOR_RESUME_DONE: u32 = 1;
-
 pub trait Generator {
     fn resume(&mut self, result: Option) -> napi::Result;
 }
@@ -57,6 +54,17 @@ pub enum GeneratorResponse {
     },
 }
 
+#[napi]
+impl SyncEngineChanges {
+    #[napi]
+    pub fn empty(&self) -> bool {
+        let Some(changes) = self.status.as_ref() else {
+            return true;
+        };
+        changes.file_slot.is_none()
+    }
+}
+
 #[napi]
 #[derive(Clone)]
 pub struct GeneratorHolder {
diff --git a/bindings/javascript/sync/src/lib.rs b/bindings/javascript/sync/src/lib.rs
index f851af991..b0dd3f8b9 100644
--- a/bindings/javascript/sync/src/lib.rs
+++ b/bindings/javascript/sync/src/lib.rs
@@ -6,13 +6,12 @@ pub mod js_protocol_io;
 
 use std::{
     collections::HashMap,
-    sync::{Arc, Mutex, OnceLock, RwLock, RwLockReadGuard},
+    sync::{Arc, Mutex, RwLock, RwLockReadGuard},
 };
 
 use napi::bindgen_prelude::{AsyncTask, Either5, Null};
 use napi_derive::napi;
-use tracing_subscriber::{filter::LevelFilter, fmt::format::FmtSpan};
-use turso_node::IoLoopTask;
+use turso_node::{DatabaseOpts, IoLoopTask};
 use turso_sync_engine::{
     database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts},
     types::{Coro, DatabaseChangeType, DatabaseSyncEngineProtocolVersion},
@@ -23,11 +22,6 @@ use crate::{
     js_protocol_io::{JsProtocolIo, JsProtocolRequestBytes},
 };
 
-#[napi(object)]
-pub struct DatabaseOpts {
-    pub path: String,
-}
-
 #[napi]
 pub struct SyncEngine {
     path: String,
@@ -40,17 +34,17 @@ pub struct SyncEngine {
     io: Option>,
     protocol: Option<Arc<JsProtocolIo>>,
     sync_engine: Arc>>>,
-    opened: Arc<Mutex<Option<turso_node::Database>>>,
+    db: Arc<Mutex<turso_node::Database>>,
 }
 
-#[napi]
+#[napi(string_enum = "lowercase")]
 pub enum DatabaseChangeTypeJs {
     Insert,
     Update,
     Delete,
 }
 
-#[napi]
+#[napi(string_enum = "lowercase")]
 pub enum SyncEngineProtocolVersion {
     Legacy,
     V1,
@@ -131,31 +125,10 @@ pub struct SyncEngineOpts {
     pub protocol_version: Option<SyncEngineProtocolVersion>,
 }
 
-static TRACING_INIT: OnceLock<()> = OnceLock::new();
-pub fn init_tracing(level_filter: LevelFilter) {
-    TRACING_INIT.get_or_init(|| {
-        tracing_subscriber::fmt()
-            .with_ansi(false)
-            .with_thread_ids(true)
-            .with_span_events(FmtSpan::ACTIVE)
-            .with_max_level(level_filter)
-            .init();
-    });
-}
-
 #[napi]
 impl SyncEngine {
     #[napi(constructor)]
     pub fn new(opts: SyncEngineOpts) -> napi::Result<Self> {
-        // helpful for local debugging
-        match opts.tracing.as_deref() {
-            Some("error") => init_tracing(LevelFilter::ERROR),
-            Some("warn") => init_tracing(LevelFilter::WARN),
-            Some("info") => init_tracing(LevelFilter::INFO),
-            Some("debug") => init_tracing(LevelFilter::DEBUG),
-            Some("trace") => init_tracing(LevelFilter::TRACE),
-            _ => {}
-        }
         let is_memory = opts.path == ":memory:";
         let io: Arc = if is_memory {
             Arc::new(turso_core::MemoryIO::new())
         } else {
@@ -174,6 +147,16 @@
                 turso_node::browser::opfs()
             }
         };
+        let db = Arc::new(Mutex::new(turso_node::Database::new_with_io(
+            opts.path.clone(),
+            io.clone(),
+            Some(DatabaseOpts {
+                file_must_exist: None,
+                readonly: None,
+                timeout: None,
+                tracing: opts.tracing.clone(),
+            }),
+        )?));
         Ok(SyncEngine {
             path: opts.path,
             client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
@@ -188,7 +171,7 @@
             io: Some(io),
             protocol: Some(Arc::new(JsProtocolIo::default())),
             #[allow(clippy::arc_with_non_send_sync)]
-            opened: Arc::new(Mutex::new(None)),
+            db,
             protocol_version: match opts.protocol_version {
                 Some(SyncEngineProtocolVersion::Legacy) | None => {
                     DatabaseSyncEngineProtocolVersion::Legacy
@@ -199,7 +182,7 @@
     }
 
     #[napi]
-    pub fn init(&mut self) -> napi::Result<GeneratorHolder> {
+    pub fn connect(&mut self) -> napi::Result<GeneratorHolder> {
         let opts = DatabaseSyncEngineOpts {
             client_name: self.client_name.clone(),
             wal_pull_batch_size: self.wal_pull_batch_size as u64,
@@ -212,17 +195,21 @@
         let io = self.io()?;
         let protocol = self.protocol()?;
         let sync_engine = self.sync_engine.clone();
-        let opened = self.opened.clone();
+        let db = self.db.clone();
         let path = self.path.clone();
         let generator = genawaiter::sync::Gen::new(|coro| async move {
             let coro = Coro::new((), coro);
             let initialized =
                 DatabaseSyncEngine::new(&coro, io.clone(), protocol, &path, opts).await?;
             let connection = initialized.connect_rw(&coro).await?;
-            let db = turso_node::Database::create(None, io.clone(), connection, path);
+            db.lock().unwrap().set_connected(connection).map_err(|e| {
+                turso_sync_engine::errors::Error::DatabaseSyncEngineError(format!(
+                    "failed to connect sync engine: {e}"
+                ))
+            })?;
             *sync_engine.write().unwrap() = Some(initialized);
-            *opened.lock().unwrap() = Some(db);
+
             Ok(())
         });
         Ok(GeneratorHolder {
@@ -314,21 +301,14 @@
     }
 
     #[napi]
-    pub fn open(&self) -> napi::Result<turso_node::Database> {
-        let opened = self.opened.lock().unwrap();
-        let Some(opened) = opened.as_ref() else {
-            return Err(napi::Error::new(
-                napi::Status::GenericFailure,
-                "sync_engine must be initialized".to_string(),
-            ));
-        };
-        Ok(opened.clone())
+    pub fn db(&self) -> napi::Result<turso_node::Database> {
+        Ok(self.db.lock().unwrap().clone())
     }
 
     #[napi]
     pub fn close(&mut self) {
         let _ = self.sync_engine.write().unwrap().take();
-        let _ = self.opened.lock().unwrap().take().unwrap();
+        let _ = self.db.lock().unwrap().close();
         let _ = self.io.take();
         let _ = self.protocol.take();
     }
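
For reviewers, the shape of the change: SyncEngine::new now constructs the turso_node::Database handle eagerly through Database::new_with_io, connect() (formerly init()) merely attaches the live connection to that handle via set_connected, and db() (formerly open()) can always return the handle instead of failing when the engine is not yet initialized. Below is a minimal, self-contained sketch of this "create eagerly, connect later" pattern; Handle, Connection, and Engine are hypothetical stand-ins for illustration only, not the real turso_node or sync-engine API.

use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for turso_node::Database and its connection;
// they illustrate the shape of the change, not the real API.
struct Connection;

#[derive(Default)]
struct Handle {
    connection: Option<Connection>,
}

impl Handle {
    // Analogous to set_connected in the patch: attach a live connection later.
    fn set_connected(&mut self, conn: Connection) {
        self.connection = Some(conn);
    }

    fn is_connected(&self) -> bool {
        self.connection.is_some()
    }
}

struct Engine {
    // Created eagerly in the constructor, like the new `db` field.
    db: Arc<Mutex<Handle>>,
}

impl Engine {
    fn new() -> Self {
        Self {
            db: Arc::new(Mutex::new(Handle::default())),
        }
    }

    // Like connect(): only wires the already-existing handle to a connection.
    fn connect(&self) {
        let conn = Connection; // stands in for the connection produced by the sync engine
        self.db.lock().unwrap().set_connected(conn);
    }

    // Like db(): hands out the shared handle unconditionally.
    fn db(&self) -> Arc<Mutex<Handle>> {
        self.db.clone()
    }
}

fn main() {
    let engine = Engine::new();
    let handle = engine.db(); // available even before connect()
    assert!(!handle.lock().unwrap().is_connected());
    engine.connect();
    assert!(handle.lock().unwrap().is_connected());
}

Because the handle exists independently of the connection, close() in the patch can simply call close() on the database instead of take()-ing an Option and unwrapping it.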