cargo fmt

This commit is contained in:
Nikita Sivukhin
2025-08-08 02:09:53 +04:00
parent 54879a36ba
commit 50588c5edf
4 changed files with 282 additions and 279 deletions

View File

@@ -1,3 +1,3 @@
fn main() {
napi_build::setup();
napi_build::setup();
}

View File

@@ -7,39 +7,39 @@ pub const GENERATOR_RESUME_IO: u32 = 0;
pub const GENERATOR_RESUME_DONE: u32 = 1;
pub trait Generator {
fn resume(&mut self, result: Option<String>) -> napi::Result<u32>;
fn resume(&mut self, result: Option<String>) -> napi::Result<u32>;
}
impl<F: Future<Output = turso_sync_engine::Result<()>>> Generator
for genawaiter::sync::Gen<ProtocolCommand, turso_sync_engine::Result<()>, F>
for genawaiter::sync::Gen<ProtocolCommand, turso_sync_engine::Result<()>, F>
{
fn resume(&mut self, error: Option<String>) -> napi::Result<u32> {
let result = match error {
Some(err) => Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
format!("JsProtocolIo error: {err}"),
)),
None => Ok(()),
};
match self.resume_with(result) {
genawaiter::GeneratorState::Yielded(ProtocolCommand::IO) => Ok(GENERATOR_RESUME_IO),
genawaiter::GeneratorState::Complete(Ok(())) => Ok(GENERATOR_RESUME_DONE),
genawaiter::GeneratorState::Complete(Err(err)) => Err(napi::Error::new(
napi::Status::GenericFailure,
format!("sync engine operation failed: {err}"),
)),
fn resume(&mut self, error: Option<String>) -> napi::Result<u32> {
let result = match error {
Some(err) => Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
format!("JsProtocolIo error: {err}"),
)),
None => Ok(()),
};
match self.resume_with(result) {
genawaiter::GeneratorState::Yielded(ProtocolCommand::IO) => Ok(GENERATOR_RESUME_IO),
genawaiter::GeneratorState::Complete(Ok(())) => Ok(GENERATOR_RESUME_DONE),
genawaiter::GeneratorState::Complete(Err(err)) => Err(napi::Error::new(
napi::Status::GenericFailure,
format!("sync engine operation failed: {err}"),
)),
}
}
}
}
#[napi]
pub struct GeneratorHolder {
pub(crate) inner: Box<Mutex<dyn Generator>>,
pub(crate) inner: Box<Mutex<dyn Generator>>,
}
#[napi]
impl GeneratorHolder {
#[napi]
pub fn resume(&self, error: Option<String>) -> napi::Result<u32> {
self.inner.lock().unwrap().resume(error)
}
#[napi]
pub fn resume(&self, error: Option<String>) -> napi::Result<u32> {
self.inner.lock().unwrap().resume(error)
}
}

View File

@@ -1,8 +1,8 @@
#![deny(clippy::all)]
use std::{
collections::VecDeque,
sync::{Arc, Mutex, MutexGuard},
collections::VecDeque,
sync::{Arc, Mutex, MutexGuard},
};
use napi::bindgen_prelude::*;
@@ -11,18 +11,18 @@ use turso_sync_engine::protocol_io::{DataCompletion, DataPollResult, ProtocolIO}
#[napi]
pub enum JsProtocolRequest {
Http {
method: String,
path: String,
body: Option<Buffer>,
},
FullRead {
path: String,
},
FullWrite {
path: String,
content: Buffer,
},
Http {
method: String,
path: String,
body: Option<Buffer>,
},
FullRead {
path: String,
},
FullWrite {
path: String,
content: Buffer,
},
}
#[derive(Clone)]
@@ -30,166 +30,166 @@ pub enum JsProtocolRequest {
pub struct JsDataCompletion(Arc<Mutex<JsDataCompletionInner>>);
struct JsDataCompletionInner {
status: Option<u16>,
chunks: VecDeque<Buffer>,
finished: bool,
err: Option<String>,
status: Option<u16>,
chunks: VecDeque<Buffer>,
finished: bool,
err: Option<String>,
}
impl JsDataCompletion {
fn inner(&self) -> turso_sync_engine::Result<MutexGuard<JsDataCompletionInner>> {
let inner = self.0.lock().unwrap();
if let Some(err) = &inner.err {
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
err.clone(),
));
fn inner(&self) -> turso_sync_engine::Result<MutexGuard<JsDataCompletionInner>> {
let inner = self.0.lock().unwrap();
if let Some(err) = &inner.err {
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
err.clone(),
));
}
Ok(inner)
}
Ok(inner)
}
}
impl DataCompletion for JsDataCompletion {
type DataPollResult = JsDataPollResult;
type DataPollResult = JsDataPollResult;
fn status(&self) -> turso_sync_engine::Result<Option<u16>> {
let inner = self.inner()?;
Ok(inner.status)
}
fn status(&self) -> turso_sync_engine::Result<Option<u16>> {
let inner = self.inner()?;
Ok(inner.status)
}
fn poll_data(&self) -> turso_sync_engine::Result<Option<Self::DataPollResult>> {
let mut inner = self.inner()?;
let chunk = inner.chunks.pop_front();
Ok(chunk.map(JsDataPollResult))
}
fn poll_data(&self) -> turso_sync_engine::Result<Option<Self::DataPollResult>> {
let mut inner = self.inner()?;
let chunk = inner.chunks.pop_front();
Ok(chunk.map(JsDataPollResult))
}
fn is_done(&self) -> turso_sync_engine::Result<bool> {
let inner = self.inner()?;
Ok(inner.finished)
}
fn is_done(&self) -> turso_sync_engine::Result<bool> {
let inner = self.inner()?;
Ok(inner.finished)
}
}
#[napi]
impl JsDataCompletion {
#[napi]
pub fn poison(&self, err: String) {
let mut completion = self.0.lock().unwrap();
completion.err = Some(err);
}
#[napi]
pub fn poison(&self, err: String) {
let mut completion = self.0.lock().unwrap();
completion.err = Some(err);
}
#[napi]
pub fn status(&self, value: u32) {
let mut completion = self.0.lock().unwrap();
completion.status = Some(value as u16);
}
#[napi]
pub fn status(&self, value: u32) {
let mut completion = self.0.lock().unwrap();
completion.status = Some(value as u16);
}
#[napi]
pub fn push(&self, value: Buffer) {
let mut completion = self.0.lock().unwrap();
completion.chunks.push_back(value);
}
#[napi]
pub fn push(&self, value: Buffer) {
let mut completion = self.0.lock().unwrap();
completion.chunks.push_back(value);
}
#[napi]
pub fn done(&self) {
let mut completion = self.0.lock().unwrap();
completion.finished = true;
}
#[napi]
pub fn done(&self) {
let mut completion = self.0.lock().unwrap();
completion.finished = true;
}
}
#[napi]
pub struct JsDataPollResult(Buffer);
impl DataPollResult for JsDataPollResult {
fn data(&self) -> &[u8] {
&self.0
}
fn data(&self) -> &[u8] {
&self.0
}
}
#[napi]
pub struct JsProtocolRequestData {
request: Arc<Mutex<Option<JsProtocolRequest>>>,
completion: JsDataCompletion,
request: Arc<Mutex<Option<JsProtocolRequest>>>,
completion: JsDataCompletion,
}
#[napi]
impl JsProtocolRequestData {
#[napi]
pub fn request(&self) -> JsProtocolRequest {
let mut request = self.request.lock().unwrap();
request.take().unwrap()
}
#[napi]
pub fn completion(&self) -> JsDataCompletion {
self.completion.clone()
}
#[napi]
pub fn request(&self) -> JsProtocolRequest {
let mut request = self.request.lock().unwrap();
request.take().unwrap()
}
#[napi]
pub fn completion(&self) -> JsDataCompletion {
self.completion.clone()
}
}
impl ProtocolIO for JsProtocolIo {
type DataCompletion = JsDataCompletion;
fn http(
&self,
method: &str,
path: &str,
body: Option<Vec<u8>>,
) -> turso_sync_engine::Result<JsDataCompletion> {
Ok(self.add_request(JsProtocolRequest::Http {
method: method.to_string(),
path: path.to_string(),
body: body.map(Buffer::from),
}))
}
type DataCompletion = JsDataCompletion;
fn http(
&self,
method: &str,
path: &str,
body: Option<Vec<u8>>,
) -> turso_sync_engine::Result<JsDataCompletion> {
Ok(self.add_request(JsProtocolRequest::Http {
method: method.to_string(),
path: path.to_string(),
body: body.map(Buffer::from),
}))
}
fn full_read(&self, path: &str) -> turso_sync_engine::Result<Self::DataCompletion> {
Ok(self.add_request(JsProtocolRequest::FullRead {
path: path.to_string(),
}))
}
fn full_read(&self, path: &str) -> turso_sync_engine::Result<Self::DataCompletion> {
Ok(self.add_request(JsProtocolRequest::FullRead {
path: path.to_string(),
}))
}
fn full_write(
&self,
path: &str,
content: Vec<u8>,
) -> turso_sync_engine::Result<Self::DataCompletion> {
Ok(self.add_request(JsProtocolRequest::FullWrite {
path: path.to_string(),
content: Buffer::from(content),
}))
}
fn full_write(
&self,
path: &str,
content: Vec<u8>,
) -> turso_sync_engine::Result<Self::DataCompletion> {
Ok(self.add_request(JsProtocolRequest::FullWrite {
path: path.to_string(),
content: Buffer::from(content),
}))
}
}
#[napi]
pub struct JsProtocolIo {
requests: Mutex<Vec<JsProtocolRequestData>>,
requests: Mutex<Vec<JsProtocolRequestData>>,
}
impl Default for JsProtocolIo {
fn default() -> Self {
Self {
requests: Mutex::new(Vec::new()),
fn default() -> Self {
Self {
requests: Mutex::new(Vec::new()),
}
}
}
}
#[napi]
impl JsProtocolIo {
#[napi]
pub fn take_request(&self) -> Option<JsProtocolRequestData> {
self.requests.lock().unwrap().pop()
}
#[napi]
pub fn take_request(&self) -> Option<JsProtocolRequestData> {
self.requests.lock().unwrap().pop()
}
fn add_request(&self, request: JsProtocolRequest) -> JsDataCompletion {
let completion = JsDataCompletionInner {
chunks: VecDeque::new(),
finished: false,
err: None,
status: None,
};
let completion = JsDataCompletion(Arc::new(Mutex::new(completion)));
fn add_request(&self, request: JsProtocolRequest) -> JsDataCompletion {
let completion = JsDataCompletionInner {
chunks: VecDeque::new(),
finished: false,
err: None,
status: None,
};
let completion = JsDataCompletion(Arc::new(Mutex::new(completion)));
let mut requests = self.requests.lock().unwrap();
requests.push(JsProtocolRequestData {
request: Arc::new(Mutex::new(Some(request))),
completion: completion.clone(),
});
completion
}
let mut requests = self.requests.lock().unwrap();
requests.push(JsProtocolRequestData {
request: Arc::new(Mutex::new(Some(request))),
completion: completion.clone(),
});
completion
}
}

View File

@@ -9,158 +9,161 @@ use napi::bindgen_prelude::AsyncTask;
use napi_derive::napi;
use turso_node::IoLoopTask;
use turso_sync_engine::{
database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts},
types::Coro,
database_sync_engine::{DatabaseSyncEngine, DatabaseSyncEngineOpts},
types::Coro,
};
use crate::{
generator::GeneratorHolder,
js_protocol_io::{JsProtocolIo, JsProtocolRequestData},
generator::GeneratorHolder,
js_protocol_io::{JsProtocolIo, JsProtocolRequestData},
};
#[napi(object)]
pub struct DatabaseOpts {
pub path: String,
pub path: String,
}
#[napi]
pub struct SyncEngine {
path: String,
client_name: String,
wal_pull_batch_size: u32,
io: Arc<dyn turso_core::IO>,
protocol: Arc<JsProtocolIo>,
sync_engine: Arc<Mutex<Option<DatabaseSyncEngine<JsProtocolIo>>>>,
opened: Arc<Mutex<Option<turso_node::Database>>>,
path: String,
client_name: String,
wal_pull_batch_size: u32,
io: Arc<dyn turso_core::IO>,
protocol: Arc<JsProtocolIo>,
sync_engine: Arc<Mutex<Option<DatabaseSyncEngine<JsProtocolIo>>>>,
opened: Arc<Mutex<Option<turso_node::Database>>>,
}
#[napi(object)]
pub struct SyncEngineOpts {
pub path: String,
pub client_name: Option<String>,
pub wal_pull_batch_size: Option<u32>,
pub path: String,
pub client_name: Option<String>,
pub wal_pull_batch_size: Option<u32>,
}
#[napi]
impl SyncEngine {
#[napi(constructor)]
pub fn new(opts: SyncEngineOpts) -> napi::Result<Self> {
Ok(SyncEngine {
path: opts.path,
client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100),
sync_engine: Arc::new(Mutex::new(None)),
io: Arc::new(turso_core::PlatformIO::new().map_err(|e| {
napi::Error::new(
napi::Status::GenericFailure,
format!("Failed to create IO: {e}"),
)
})?),
protocol: Arc::new(JsProtocolIo::default()),
#[allow(clippy::arc_with_non_send_sync)]
opened: Arc::new(Mutex::new(None)),
})
}
#[napi]
pub fn init(&self) -> GeneratorHolder {
let opts = DatabaseSyncEngineOpts {
client_name: self.client_name.clone(),
wal_pull_batch_size: self.wal_pull_batch_size as u64,
};
let protocol = self.protocol.clone();
let sync_engine = self.sync_engine.clone();
let io = self.io.clone();
let opened = self.opened.clone();
let path = self.path.clone();
let generator = genawaiter::sync::Gen::new(|coro| async move {
let initialized = DatabaseSyncEngine::new(&coro, io.clone(), protocol, &path, opts).await?;
let connection = initialized.connect(&coro).await?;
let db = turso_node::Database::create(None, io.clone(), connection, false);
*sync_engine.lock().unwrap() = Some(initialized);
*opened.lock().unwrap() = Some(db);
Ok(())
});
GeneratorHolder {
inner: Box::new(Mutex::new(generator)),
#[napi(constructor)]
pub fn new(opts: SyncEngineOpts) -> napi::Result<Self> {
Ok(SyncEngine {
path: opts.path,
client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100),
sync_engine: Arc::new(Mutex::new(None)),
io: Arc::new(turso_core::PlatformIO::new().map_err(|e| {
napi::Error::new(
napi::Status::GenericFailure,
format!("Failed to create IO: {e}"),
)
})?),
protocol: Arc::new(JsProtocolIo::default()),
#[allow(clippy::arc_with_non_send_sync)]
opened: Arc::new(Mutex::new(None)),
})
}
}
#[napi]
pub fn io_loop_sync(&self) -> napi::Result<()> {
self
.io
.run_once()
.map_err(|e| napi::Error::new(napi::Status::GenericFailure, format!("IO error: {e}")))?;
Ok(())
}
#[napi]
pub fn init(&self) -> GeneratorHolder {
let opts = DatabaseSyncEngineOpts {
client_name: self.client_name.clone(),
wal_pull_batch_size: self.wal_pull_batch_size as u64,
};
/// Runs the I/O loop asynchronously, returning a Promise.
#[napi(ts_return_type = "Promise<void>")]
pub fn io_loop_async(&self) -> AsyncTask<IoLoopTask> {
let io = self.io.clone();
AsyncTask::new(IoLoopTask { io })
}
let protocol = self.protocol.clone();
let sync_engine = self.sync_engine.clone();
let io = self.io.clone();
let opened = self.opened.clone();
let path = self.path.clone();
let generator = genawaiter::sync::Gen::new(|coro| async move {
let initialized =
DatabaseSyncEngine::new(&coro, io.clone(), protocol, &path, opts).await?;
let connection = initialized.connect(&coro).await?;
let db = turso_node::Database::create(None, io.clone(), connection, false);
#[napi]
pub fn protocol_io(&self) -> Option<JsProtocolRequestData> {
self.protocol.take_request()
}
#[napi]
pub fn sync(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| sync_engine.sync(coro).await)
}
#[napi]
pub fn push(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| sync_engine.push(coro).await)
}
#[napi]
pub fn pull(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| sync_engine.pull(coro).await)
}
#[napi]
pub fn open(&self) -> napi::Result<turso_node::Database> {
let opened = self.opened.lock().unwrap();
let Some(opened) = opened.as_ref() else {
return Err(napi::Error::new(
napi::Status::GenericFailure,
"sync_engine must be initialized".to_string(),
));
};
Ok(opened.clone())
}
fn run(
&self,
f: impl AsyncFnOnce(&Coro, &mut DatabaseSyncEngine<JsProtocolIo>) -> turso_sync_engine::Result<()>
+ 'static,
) -> GeneratorHolder {
let sync_engine = self.sync_engine.clone();
#[allow(clippy::await_holding_lock)]
let generator = genawaiter::sync::Gen::new(|coro| async move {
let Ok(mut sync_engine) = sync_engine.try_lock() else {
let nasty_error = "sync_engine is busy".to_string();
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
nasty_error,
));
};
let Some(sync_engine) = sync_engine.as_mut() else {
let error = "sync_engine must be initialized".to_string();
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
error,
));
};
f(&coro, sync_engine).await?;
Ok(())
});
GeneratorHolder {
inner: Box::new(Mutex::new(generator)),
*sync_engine.lock().unwrap() = Some(initialized);
*opened.lock().unwrap() = Some(db);
Ok(())
});
GeneratorHolder {
inner: Box::new(Mutex::new(generator)),
}
}
#[napi]
pub fn io_loop_sync(&self) -> napi::Result<()> {
self.io.run_once().map_err(|e| {
napi::Error::new(napi::Status::GenericFailure, format!("IO error: {e}"))
})?;
Ok(())
}
/// Runs the I/O loop asynchronously, returning a Promise.
#[napi(ts_return_type = "Promise<void>")]
pub fn io_loop_async(&self) -> AsyncTask<IoLoopTask> {
let io = self.io.clone();
AsyncTask::new(IoLoopTask { io })
}
#[napi]
pub fn protocol_io(&self) -> Option<JsProtocolRequestData> {
self.protocol.take_request()
}
#[napi]
pub fn sync(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| sync_engine.sync(coro).await)
}
#[napi]
pub fn push(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| sync_engine.push(coro).await)
}
#[napi]
pub fn pull(&self) -> GeneratorHolder {
self.run(async move |coro, sync_engine| sync_engine.pull(coro).await)
}
#[napi]
pub fn open(&self) -> napi::Result<turso_node::Database> {
let opened = self.opened.lock().unwrap();
let Some(opened) = opened.as_ref() else {
return Err(napi::Error::new(
napi::Status::GenericFailure,
"sync_engine must be initialized".to_string(),
));
};
Ok(opened.clone())
}
fn run(
&self,
f: impl AsyncFnOnce(
&Coro,
&mut DatabaseSyncEngine<JsProtocolIo>,
) -> turso_sync_engine::Result<()>
+ 'static,
) -> GeneratorHolder {
let sync_engine = self.sync_engine.clone();
#[allow(clippy::await_holding_lock)]
let generator = genawaiter::sync::Gen::new(|coro| async move {
let Ok(mut sync_engine) = sync_engine.try_lock() else {
let nasty_error = "sync_engine is busy".to_string();
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
nasty_error,
));
};
let Some(sync_engine) = sync_engine.as_mut() else {
let error = "sync_engine must be initialized".to_string();
return Err(turso_sync_engine::errors::Error::DatabaseSyncEngineError(
error,
));
};
f(&coro, sync_engine).await?;
Ok(())
});
GeneratorHolder {
inner: Box::new(Mutex::new(generator)),
}
}
}
}