diff --git a/packages/turso-sync-js/build.rs b/packages/turso-sync-js/build.rs index bbfc9e4b9..0f1b01002 100644 --- a/packages/turso-sync-js/build.rs +++ b/packages/turso-sync-js/build.rs @@ -1,3 +1,3 @@ fn main() { - napi_build::setup(); + napi_build::setup(); } diff --git a/packages/turso-sync-js/src/generator.rs b/packages/turso-sync-js/src/generator.rs index 74f7823d8..b7fc0e487 100644 --- a/packages/turso-sync-js/src/generator.rs +++ b/packages/turso-sync-js/src/generator.rs @@ -7,39 +7,39 @@ pub const GENERATOR_RESUME_IO: u32 = 0; pub const GENERATOR_RESUME_DONE: u32 = 1; pub trait Generator { - fn resume(&mut self, result: Option) -> napi::Result; + fn resume(&mut self, result: Option) -> napi::Result; } impl>> Generator - for genawaiter::sync::Gen, F> + for genawaiter::sync::Gen, F> { - fn resume(&mut self, error: Option) -> napi::Result { - let result = match error { - Some(err) => Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError( - format!("JsProtocolIo error: {err}"), - )), - None => Ok(()), - }; - match self.resume_with(result) { - genawaiter::GeneratorState::Yielded(ProtocolCommand::IO) => Ok(GENERATOR_RESUME_IO), - genawaiter::GeneratorState::Complete(Ok(())) => Ok(GENERATOR_RESUME_DONE), - genawaiter::GeneratorState::Complete(Err(err)) => Err(napi::Error::new( - napi::Status::GenericFailure, - format!("sync engine operation failed: {err}"), - )), + fn resume(&mut self, error: Option) -> napi::Result { + let result = match error { + Some(err) => Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError( + format!("JsProtocolIo error: {err}"), + )), + None => Ok(()), + }; + match self.resume_with(result) { + genawaiter::GeneratorState::Yielded(ProtocolCommand::IO) => Ok(GENERATOR_RESUME_IO), + genawaiter::GeneratorState::Complete(Ok(())) => Ok(GENERATOR_RESUME_DONE), + genawaiter::GeneratorState::Complete(Err(err)) => Err(napi::Error::new( + napi::Status::GenericFailure, + format!("sync engine operation failed: {err}"), + )), + } } - } } #[napi] pub struct GeneratorHolder { - pub(crate) inner: Box>, + pub(crate) inner: Box>, } #[napi] impl GeneratorHolder { - #[napi] - pub fn resume(&self, error: Option) -> napi::Result { - self.inner.lock().unwrap().resume(error) - } + #[napi] + pub fn resume(&self, error: Option) -> napi::Result { + self.inner.lock().unwrap().resume(error) + } } diff --git a/packages/turso-sync-js/src/js_protocol_io.rs b/packages/turso-sync-js/src/js_protocol_io.rs index 5a2827973..02942f01a 100644 --- a/packages/turso-sync-js/src/js_protocol_io.rs +++ b/packages/turso-sync-js/src/js_protocol_io.rs @@ -1,8 +1,8 @@ #![deny(clippy::all)] use std::{ - collections::VecDeque, - sync::{Arc, Mutex, MutexGuard}, + collections::VecDeque, + sync::{Arc, Mutex, MutexGuard}, }; use napi::bindgen_prelude::*; @@ -11,18 +11,18 @@ use turso_sync_engine::protocol_io::{DataCompletion, DataPollResult, ProtocolIO} #[napi] pub enum JsProtocolRequest { - Http { - method: String, - path: String, - body: Option, - }, - FullRead { - path: String, - }, - FullWrite { - path: String, - content: Buffer, - }, + Http { + method: String, + path: String, + body: Option, + }, + FullRead { + path: String, + }, + FullWrite { + path: String, + content: Buffer, + }, } #[derive(Clone)] @@ -30,166 +30,166 @@ pub enum JsProtocolRequest { pub struct JsDataCompletion(Arc>); struct JsDataCompletionInner { - status: Option, - chunks: VecDeque, - finished: bool, - err: Option, + status: Option, + chunks: VecDeque, + finished: bool, + err: Option, } impl JsDataCompletion { - fn inner(&self) -> turso_sync_engine::Result> { - let inner = self.0.lock().unwrap(); - if let Some(err) = &inner.err { - return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError( - err.clone(), - )); + fn inner(&self) -> turso_sync_engine::Result> { + let inner = self.0.lock().unwrap(); + if let Some(err) = &inner.err { + return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError( + err.clone(), + )); + } + Ok(inner) } - Ok(inner) - } } impl DataCompletion for JsDataCompletion { - type DataPollResult = JsDataPollResult; + type DataPollResult = JsDataPollResult; - fn status(&self) -> turso_sync_engine::Result> { - let inner = self.inner()?; - Ok(inner.status) - } + fn status(&self) -> turso_sync_engine::Result> { + let inner = self.inner()?; + Ok(inner.status) + } - fn poll_data(&self) -> turso_sync_engine::Result> { - let mut inner = self.inner()?; - let chunk = inner.chunks.pop_front(); - Ok(chunk.map(JsDataPollResult)) - } + fn poll_data(&self) -> turso_sync_engine::Result> { + let mut inner = self.inner()?; + let chunk = inner.chunks.pop_front(); + Ok(chunk.map(JsDataPollResult)) + } - fn is_done(&self) -> turso_sync_engine::Result { - let inner = self.inner()?; - Ok(inner.finished) - } + fn is_done(&self) -> turso_sync_engine::Result { + let inner = self.inner()?; + Ok(inner.finished) + } } #[napi] impl JsDataCompletion { - #[napi] - pub fn poison(&self, err: String) { - let mut completion = self.0.lock().unwrap(); - completion.err = Some(err); - } + #[napi] + pub fn poison(&self, err: String) { + let mut completion = self.0.lock().unwrap(); + completion.err = Some(err); + } - #[napi] - pub fn status(&self, value: u32) { - let mut completion = self.0.lock().unwrap(); - completion.status = Some(value as u16); - } + #[napi] + pub fn status(&self, value: u32) { + let mut completion = self.0.lock().unwrap(); + completion.status = Some(value as u16); + } - #[napi] - pub fn push(&self, value: Buffer) { - let mut completion = self.0.lock().unwrap(); - completion.chunks.push_back(value); - } + #[napi] + pub fn push(&self, value: Buffer) { + let mut completion = self.0.lock().unwrap(); + completion.chunks.push_back(value); + } - #[napi] - pub fn done(&self) { - let mut completion = self.0.lock().unwrap(); - completion.finished = true; - } + #[napi] + pub fn done(&self) { + let mut completion = self.0.lock().unwrap(); + completion.finished = true; + } } #[napi] pub struct JsDataPollResult(Buffer); impl DataPollResult for JsDataPollResult { - fn data(&self) -> &[u8] { - &self.0 - } + fn data(&self) -> &[u8] { + &self.0 + } } #[napi] pub struct JsProtocolRequestData { - request: Arc>>, - completion: JsDataCompletion, + request: Arc>>, + completion: JsDataCompletion, } #[napi] impl JsProtocolRequestData { - #[napi] - pub fn request(&self) -> JsProtocolRequest { - let mut request = self.request.lock().unwrap(); - request.take().unwrap() - } - #[napi] - pub fn completion(&self) -> JsDataCompletion { - self.completion.clone() - } + #[napi] + pub fn request(&self) -> JsProtocolRequest { + let mut request = self.request.lock().unwrap(); + request.take().unwrap() + } + #[napi] + pub fn completion(&self) -> JsDataCompletion { + self.completion.clone() + } } impl ProtocolIO for JsProtocolIo { - type DataCompletion = JsDataCompletion; - fn http( - &self, - method: &str, - path: &str, - body: Option>, - ) -> turso_sync_engine::Result { - Ok(self.add_request(JsProtocolRequest::Http { - method: method.to_string(), - path: path.to_string(), - body: body.map(Buffer::from), - })) - } + type DataCompletion = JsDataCompletion; + fn http( + &self, + method: &str, + path: &str, + body: Option>, + ) -> turso_sync_engine::Result { + Ok(self.add_request(JsProtocolRequest::Http { + method: method.to_string(), + path: path.to_string(), + body: body.map(Buffer::from), + })) + } - fn full_read(&self, path: &str) -> turso_sync_engine::Result { - Ok(self.add_request(JsProtocolRequest::FullRead { - path: path.to_string(), - })) - } + fn full_read(&self, path: &str) -> turso_sync_engine::Result { + Ok(self.add_request(JsProtocolRequest::FullRead { + path: path.to_string(), + })) + } - fn full_write( - &self, - path: &str, - content: Vec, - ) -> turso_sync_engine::Result { - Ok(self.add_request(JsProtocolRequest::FullWrite { - path: path.to_string(), - content: Buffer::from(content), - })) - } + fn full_write( + &self, + path: &str, + content: Vec, + ) -> turso_sync_engine::Result { + Ok(self.add_request(JsProtocolRequest::FullWrite { + path: path.to_string(), + content: Buffer::from(content), + })) + } } #[napi] pub struct JsProtocolIo { - requests: Mutex>, + requests: Mutex>, } impl Default for JsProtocolIo { - fn default() -> Self { - Self { - requests: Mutex::new(Vec::new()), + fn default() -> Self { + Self { + requests: Mutex::new(Vec::new()), + } } - } } #[napi] impl JsProtocolIo { - #[napi] - pub fn take_request(&self) -> Option { - self.requests.lock().unwrap().pop() - } + #[napi] + pub fn take_request(&self) -> Option { + self.requests.lock().unwrap().pop() + } - fn add_request(&self, request: JsProtocolRequest) -> JsDataCompletion { - let completion = JsDataCompletionInner { - chunks: VecDeque::new(), - finished: false, - err: None, - status: None, - }; - let completion = JsDataCompletion(Arc::new(Mutex::new(completion))); + fn add_request(&self, request: JsProtocolRequest) -> JsDataCompletion { + let completion = JsDataCompletionInner { + chunks: VecDeque::new(), + finished: false, + err: None, + status: None, + }; + let completion = JsDataCompletion(Arc::new(Mutex::new(completion))); - let mut requests = self.requests.lock().unwrap(); - requests.push(JsProtocolRequestData { - request: Arc::new(Mutex::new(Some(request))), - completion: completion.clone(), - }); - completion - } + let mut requests = self.requests.lock().unwrap(); + requests.push(JsProtocolRequestData { + request: Arc::new(Mutex::new(Some(request))), + completion: completion.clone(), + }); + completion + } } diff --git a/packages/turso-sync-js/src/lib.rs b/packages/turso-sync-js/src/lib.rs index 3ec64a7cb..6384c5d8a 100644 --- a/packages/turso-sync-js/src/lib.rs +++ b/packages/turso-sync-js/src/lib.rs @@ -9,158 +9,161 @@ use napi::bindgen_prelude::AsyncTask; use napi_derive::napi; use turso_node::IoLoopTask; use turso_sync_engine::{ - database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts}, - types::Coro, + database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts}, + types::Coro, }; use crate::{ - generator::GeneratorHolder, - js_protocol_io::{JsProtocolIo, JsProtocolRequestData}, + generator::GeneratorHolder, + js_protocol_io::{JsProtocolIo, JsProtocolRequestData}, }; #[napi(object)] pub struct DatabaseOpts { - pub path: String, + pub path: String, } #[napi] pub struct SyncEngine { - path: String, - client_name: String, - wal_pull_batch_size: u32, - io: Arc, - protocol: Arc, - sync_engine: Arc>>>, - opened: Arc>>, + path: String, + client_name: String, + wal_pull_batch_size: u32, + io: Arc, + protocol: Arc, + sync_engine: Arc>>>, + opened: Arc>>, } #[napi(object)] pub struct SyncEngineOpts { - pub path: String, - pub client_name: Option, - pub wal_pull_batch_size: Option, + pub path: String, + pub client_name: Option, + pub wal_pull_batch_size: Option, } #[napi] impl SyncEngine { - #[napi(constructor)] - pub fn new(opts: SyncEngineOpts) -> napi::Result { - Ok(SyncEngine { - path: opts.path, - client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()), - wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100), - sync_engine: Arc::new(Mutex::new(None)), - io: Arc::new(turso_core::PlatformIO::new().map_err(|e| { - napi::Error::new( - napi::Status::GenericFailure, - format!("Failed to create IO: {e}"), - ) - })?), - protocol: Arc::new(JsProtocolIo::default()), - #[allow(clippy::arc_with_non_send_sync)] - opened: Arc::new(Mutex::new(None)), - }) - } - - #[napi] - pub fn init(&self) -> GeneratorHolder { - let opts = DatabaseSyncEngineOpts { - client_name: self.client_name.clone(), - wal_pull_batch_size: self.wal_pull_batch_size as u64, - }; - - let protocol = self.protocol.clone(); - let sync_engine = self.sync_engine.clone(); - let io = self.io.clone(); - let opened = self.opened.clone(); - let path = self.path.clone(); - let generator = genawaiter::sync::Gen::new(|coro| async move { - let initialized = DatabaseSyncEngine::new(&coro, io.clone(), protocol, &path, opts).await?; - let connection = initialized.connect(&coro).await?; - let db = turso_node::Database::create(None, io.clone(), connection, false); - - *sync_engine.lock().unwrap() = Some(initialized); - *opened.lock().unwrap() = Some(db); - Ok(()) - }); - GeneratorHolder { - inner: Box::new(Mutex::new(generator)), + #[napi(constructor)] + pub fn new(opts: SyncEngineOpts) -> napi::Result { + Ok(SyncEngine { + path: opts.path, + client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()), + wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100), + sync_engine: Arc::new(Mutex::new(None)), + io: Arc::new(turso_core::PlatformIO::new().map_err(|e| { + napi::Error::new( + napi::Status::GenericFailure, + format!("Failed to create IO: {e}"), + ) + })?), + protocol: Arc::new(JsProtocolIo::default()), + #[allow(clippy::arc_with_non_send_sync)] + opened: Arc::new(Mutex::new(None)), + }) } - } - #[napi] - pub fn io_loop_sync(&self) -> napi::Result<()> { - self - .io - .run_once() - .map_err(|e| napi::Error::new(napi::Status::GenericFailure, format!("IO error: {e}")))?; - Ok(()) - } + #[napi] + pub fn init(&self) -> GeneratorHolder { + let opts = DatabaseSyncEngineOpts { + client_name: self.client_name.clone(), + wal_pull_batch_size: self.wal_pull_batch_size as u64, + }; - /// Runs the I/O loop asynchronously, returning a Promise. - #[napi(ts_return_type = "Promise")] - pub fn io_loop_async(&self) -> AsyncTask { - let io = self.io.clone(); - AsyncTask::new(IoLoopTask { io }) - } + let protocol = self.protocol.clone(); + let sync_engine = self.sync_engine.clone(); + let io = self.io.clone(); + let opened = self.opened.clone(); + let path = self.path.clone(); + let generator = genawaiter::sync::Gen::new(|coro| async move { + let initialized = + DatabaseSyncEngine::new(&coro, io.clone(), protocol, &path, opts).await?; + let connection = initialized.connect(&coro).await?; + let db = turso_node::Database::create(None, io.clone(), connection, false); - #[napi] - pub fn protocol_io(&self) -> Option { - self.protocol.take_request() - } - - #[napi] - pub fn sync(&self) -> GeneratorHolder { - self.run(async move |coro, sync_engine| sync_engine.sync(coro).await) - } - - #[napi] - pub fn push(&self) -> GeneratorHolder { - self.run(async move |coro, sync_engine| sync_engine.push(coro).await) - } - - #[napi] - pub fn pull(&self) -> GeneratorHolder { - self.run(async move |coro, sync_engine| sync_engine.pull(coro).await) - } - - #[napi] - pub fn open(&self) -> napi::Result { - let opened = self.opened.lock().unwrap(); - let Some(opened) = opened.as_ref() else { - return Err(napi::Error::new( - napi::Status::GenericFailure, - "sync_engine must be initialized".to_string(), - )); - }; - Ok(opened.clone()) - } - - fn run( - &self, - f: impl AsyncFnOnce(&Coro, &mut DatabaseSyncEngine) -> turso_sync_engine::Result<()> - + 'static, - ) -> GeneratorHolder { - let sync_engine = self.sync_engine.clone(); - #[allow(clippy::await_holding_lock)] - let generator = genawaiter::sync::Gen::new(|coro| async move { - let Ok(mut sync_engine) = sync_engine.try_lock() else { - let nasty_error = "sync_engine is busy".to_string(); - return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError( - nasty_error, - )); - }; - let Some(sync_engine) = sync_engine.as_mut() else { - let error = "sync_engine must be initialized".to_string(); - return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError( - error, - )); - }; - f(&coro, sync_engine).await?; - Ok(()) - }); - GeneratorHolder { - inner: Box::new(Mutex::new(generator)), + *sync_engine.lock().unwrap() = Some(initialized); + *opened.lock().unwrap() = Some(db); + Ok(()) + }); + GeneratorHolder { + inner: Box::new(Mutex::new(generator)), + } + } + + #[napi] + pub fn io_loop_sync(&self) -> napi::Result<()> { + self.io.run_once().map_err(|e| { + napi::Error::new(napi::Status::GenericFailure, format!("IO error: {e}")) + })?; + Ok(()) + } + + /// Runs the I/O loop asynchronously, returning a Promise. + #[napi(ts_return_type = "Promise")] + pub fn io_loop_async(&self) -> AsyncTask { + let io = self.io.clone(); + AsyncTask::new(IoLoopTask { io }) + } + + #[napi] + pub fn protocol_io(&self) -> Option { + self.protocol.take_request() + } + + #[napi] + pub fn sync(&self) -> GeneratorHolder { + self.run(async move |coro, sync_engine| sync_engine.sync(coro).await) + } + + #[napi] + pub fn push(&self) -> GeneratorHolder { + self.run(async move |coro, sync_engine| sync_engine.push(coro).await) + } + + #[napi] + pub fn pull(&self) -> GeneratorHolder { + self.run(async move |coro, sync_engine| sync_engine.pull(coro).await) + } + + #[napi] + pub fn open(&self) -> napi::Result { + let opened = self.opened.lock().unwrap(); + let Some(opened) = opened.as_ref() else { + return Err(napi::Error::new( + napi::Status::GenericFailure, + "sync_engine must be initialized".to_string(), + )); + }; + Ok(opened.clone()) + } + + fn run( + &self, + f: impl AsyncFnOnce( + &Coro, + &mut DatabaseSyncEngine, + ) -> turso_sync_engine::Result<()> + + 'static, + ) -> GeneratorHolder { + let sync_engine = self.sync_engine.clone(); + #[allow(clippy::await_holding_lock)] + let generator = genawaiter::sync::Gen::new(|coro| async move { + let Ok(mut sync_engine) = sync_engine.try_lock() else { + let nasty_error = "sync_engine is busy".to_string(); + return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError( + nasty_error, + )); + }; + let Some(sync_engine) = sync_engine.as_mut() else { + let error = "sync_engine must be initialized".to_string(); + return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError( + error, + )); + }; + f(&coro, sync_engine).await?; + Ok(()) + }); + GeneratorHolder { + inner: Box::new(Mutex::new(generator)), + } } - } }