support defered sync in the bindings core

This commit is contained in:
Nikita Sivukhin
2025-10-02 15:57:50 +04:00
parent 5ec2d96bc1
commit 64389447e3
2 changed files with 83 additions and 22 deletions

View File

@@ -45,7 +45,7 @@ pub enum GeneratorResponse {
operations: i64,
main_wal: i64,
revert_wal: i64,
last_pull_unix_time: i64,
last_pull_unix_time: Option<i64>,
last_push_unix_time: Option<i64>,
revision: Option<String>,
},

View File

@@ -24,13 +24,7 @@ use crate::{
#[napi]
pub struct SyncEngine {
path: String,
client_name: String,
wal_pull_batch_size: u32,
long_poll_timeout: Option<std::time::Duration>,
protocol_version: DatabaseSyncEngineProtocolVersion,
tables_ignore: Vec<String>,
use_transform: bool,
opts: SyncEngineOptsFilled,
io: Option<Arc<dyn turso_core::IO>>,
protocol: Option<Arc<JsProtocolIo>>,
sync_engine: Arc<RwLock<Option<DatabaseSyncEngine<JsProtocolIo>>>>,
@@ -123,6 +117,49 @@ pub struct SyncEngineOpts {
pub tables_ignore: Option<Vec<String>>,
pub use_transform: bool,
pub protocol_version: Option<SyncEngineProtocolVersion>,
pub bootstrap_if_empty: bool,
pub remote_encryption: Option<String>,
}
struct SyncEngineOptsFilled {
pub path: String,
pub client_name: String,
pub wal_pull_batch_size: u32,
pub long_poll_timeout: Option<std::time::Duration>,
pub tables_ignore: Vec<String>,
pub use_transform: bool,
pub protocol_version: DatabaseSyncEngineProtocolVersion,
pub bootstrap_if_empty: bool,
pub remote_encryption: Option<CipherMode>,
}
#[derive(Debug, Clone, Copy)]
enum CipherMode {
Aes256Gcm = 1,
Aes128Gcm = 2,
ChaCha20Poly1305 = 3,
Aegis256 = 4,
}
impl CipherMode {
/// Returns the nonce size for this cipher mode.
pub fn required_nonce_size(&self) -> usize {
match self {
CipherMode::Aes256Gcm | CipherMode::Aes128Gcm | CipherMode::ChaCha20Poly1305 => 12,
CipherMode::Aegis256 => 32,
}
}
/// Returns the tag size for this cipher mode.
pub fn required_tag_size(&self) -> usize {
// All supported ciphers use 16-byte tags
16
}
/// Returns the total metadata size (nonce + tag) for this cipher mode.
fn required_metadata_size(&self) -> usize {
self.required_nonce_size() + self.required_tag_size()
}
}
#[napi]
@@ -158,7 +195,7 @@ impl SyncEngine {
tracing: opts.tracing.clone(),
}),
)?));
Ok(SyncEngine {
let opts_filled = SyncEngineOptsFilled {
path: opts.path,
client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100),
@@ -167,37 +204,61 @@ impl SyncEngine {
.map(|x| std::time::Duration::from_millis(x as u64)),
tables_ignore: opts.tables_ignore.unwrap_or_default(),
use_transform: opts.use_transform,
#[allow(clippy::arc_with_non_send_sync)]
sync_engine: Arc::new(RwLock::new(None)),
io: Some(io),
protocol: Some(Arc::new(JsProtocolIo::default())),
#[allow(clippy::arc_with_non_send_sync)]
db,
protocol_version: match opts.protocol_version {
Some(SyncEngineProtocolVersion::Legacy) | None => {
DatabaseSyncEngineProtocolVersion::Legacy
}
_ => DatabaseSyncEngineProtocolVersion::V1,
},
bootstrap_if_empty: opts.bootstrap_if_empty,
remote_encryption: match opts.remote_encryption.as_deref() {
Some("aes256gcm") => Some(CipherMode::Aes256Gcm),
Some("aes128gcm") => Some(CipherMode::Aes128Gcm),
Some("chacha20poly1305") => Some(CipherMode::ChaCha20Poly1305),
Some("aegis256") => Some(CipherMode::Aegis256),
None => None,
_ => {
return Err(napi::Error::new(
napi::Status::GenericFailure,
"unsupported remote cipher",
))
}
},
};
Ok(SyncEngine {
opts: opts_filled,
#[allow(clippy::arc_with_non_send_sync)]
sync_engine: Arc::new(RwLock::new(None)),
io: Some(io),
protocol: Some(Arc::new(JsProtocolIo::default())),
#[allow(clippy::arc_with_non_send_sync)]
db,
})
}
#[napi]
pub fn connect(&mut self) -> napi::Result<GeneratorHolder> {
let opts = DatabaseSyncEngineOpts {
client_name: self.client_name.clone(),
wal_pull_batch_size: self.wal_pull_batch_size as u64,
long_poll_timeout: self.long_poll_timeout,
tables_ignore: self.tables_ignore.clone(),
use_transform: self.use_transform,
protocol_version_hint: self.protocol_version,
client_name: self.opts.client_name.clone(),
wal_pull_batch_size: self.opts.wal_pull_batch_size as u64,
long_poll_timeout: self.opts.long_poll_timeout,
tables_ignore: self.opts.tables_ignore.clone(),
use_transform: self.opts.use_transform,
protocol_version_hint: self.opts.protocol_version,
bootstrap_if_empty: self.opts.bootstrap_if_empty,
reserved_bytes: self
.opts
.remote_encryption
.clone()
.map(|x| x.required_metadata_size())
.unwrap_or(0),
};
let io = self.io()?;
let protocol = self.protocol()?;
let sync_engine = self.sync_engine.clone();
let db = self.db.clone();
let path = self.path.clone();
let path = self.opts.path.clone();
let generator = genawaiter::sync::Gen::new(|coro| async move {
let coro = Coro::new((), coro);
let initialized =