diff --git a/broker/Cargo.lock b/broker/Cargo.lock index e5a08cc..4a04550 100644 --- a/broker/Cargo.lock +++ b/broker/Cargo.lock @@ -1713,7 +1713,7 @@ dependencies = [ [[package]] name = "lss-connector" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs#607a8d64f5c1f87d7d5dad61806f9d8d12bd87d6" +source = "git+https://github.com/stakwork/sphinx-rs#07be5d75f102ac329c210604f58ea0ca78f053a2" dependencies = [ "anyhow", "lightning-storage-server", @@ -3173,7 +3173,7 @@ dependencies = [ [[package]] name = "sphinx-auther" version = "0.1.12" -source = "git+https://github.com/stakwork/sphinx-rs#607a8d64f5c1f87d7d5dad61806f9d8d12bd87d6" +source = "git+https://github.com/stakwork/sphinx-rs#07be5d75f102ac329c210604f58ea0ca78f053a2" dependencies = [ "anyhow", "base64 0.13.1", @@ -3185,7 +3185,7 @@ dependencies = [ [[package]] name = "sphinx-glyph" version = "0.1.2" -source = "git+https://github.com/stakwork/sphinx-rs#607a8d64f5c1f87d7d5dad61806f9d8d12bd87d6" +source = "git+https://github.com/stakwork/sphinx-rs#07be5d75f102ac329c210604f58ea0ca78f053a2" dependencies = [ "anyhow", "hex", @@ -3231,7 +3231,7 @@ dependencies = [ [[package]] name = "sphinx-signer" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs#607a8d64f5c1f87d7d5dad61806f9d8d12bd87d6" +source = "git+https://github.com/stakwork/sphinx-rs#07be5d75f102ac329c210604f58ea0ca78f053a2" dependencies = [ "anyhow", "bip39", diff --git a/broker/src/main.rs b/broker/src/main.rs index fc066e5..1547abb 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -28,7 +28,7 @@ use vls_proxy::client::UnixClient; use vls_proxy::connection::{open_parent_fd, UnixConnection}; use vls_proxy::portfront::SignerPortFront; use vls_proxy::util::{add_hsmd_args, handle_hsmd_version}; -use lss_connector::{LssBroker, Response, lss_handle}; +use lss_connector::{LssBroker, Response}; use sphinx_signer::sphinx_glyph::topics; @@ -76,13 +76,15 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let (error_tx, error_rx) = broadcast::channel(10000); error_log::log_errors(error_rx); + let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool)>(10000); + // waits until first connection - let conns = broker_setup(settings, mqtt_rx, error_tx.clone()).await; + let conns = broker_setup(settings, mqtt_rx, reconn_tx.clone(), error_tx.clone()).await; let (lss_tx, lss_rx) = mpsc::channel(10000); let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") { // waits until LSS confirmation from signer - let lss_broker = match lss_setup(&lss_uri, lss_rx, mqtt_tx.clone()).await{ + let lss_broker = match lss_setup(&lss_uri, lss_rx, reconn_rx, mqtt_tx.clone()).await{ Ok(l) => l, Err(e) => { let _ = error_tx.send(e.to_string().as_bytes().to_vec()); @@ -126,14 +128,13 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { routes::launch_rocket(mqtt_tx, error_tx, settings, conns) } -pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver, mqtt_tx: mpsc::Sender) -> Result { +pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver, mut reconn_rx: mpsc::Receiver<(String, bool)>, mqtt_tx: mpsc::Sender) -> Result { // LSS required let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?; let (req1, reply_rx) = ChannelRequest::new(topics::LSS_MSG, msg_bytes); let _ = mqtt_tx.send(req1).await; let first_lss_response = reply_rx.await?; - let ir = Response::from_slice(&first_lss_response.reply)?.as_init()?; let (lss_conn, msg_bytes2) = LssBroker::new(uri, ir, spk).await?; @@ -144,10 +145,11 @@ pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver, mqtt_tx: m lss_conn.handle(Response::Created(cr)).await?; - let persister = lss_conn.persister(); + // msg handler (from CLN looper) + let lss_conn_ = lss_conn.clone(); tokio::task::spawn(async move{ while let Some(req) = lss_rx.recv().await { - match lss_handle(&persister, &req.message).await { + match lss_conn_.handle_bytes(&req.message).await { Ok(msg) => { log::info!("payload to send {:?}", &msg); let _ = req.reply_tx.send(msg); @@ -159,13 +161,42 @@ pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver, mqtt_tx: m } }); + // reconnect handler (when a client reconnects) + let lss_conn_ = lss_conn.clone(); + let mqtt_tx_ = mqtt_tx.clone(); + tokio::task::spawn(async move{ + while let Some((cid, connected)) = reconn_rx.recv().await { + if connected { + log::info!("CLIENT {} reconnected!", cid); + if let Err(e) = reconnect_dance(&cid, &lss_conn_, &mqtt_tx_).await { + log::error!("reconnect dance failed {:?}", e); + } + } + } + }); + Ok(lss_conn) } +async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender) -> Result<()> { + let init_bytes = lss_conn.make_init_msg().await?; + let (req, reply_rx) = ChannelRequest::new_for(cid, topics::LSS_MSG, init_bytes); + let _ = mqtt_tx.send(req).await; + let first_lss_response = reply_rx.await?; + let state_bytes = lss_conn.get_initial_state_msg(&first_lss_response.reply).await?; + let (req2, reply_rx2) = ChannelRequest::new_for(cid, topics::LSS_MSG, state_bytes); + let _ = mqtt_tx.send(req2).await; + let created_res = reply_rx2.await?; + let cr = Response::from_slice(&created_res.reply)?.as_created()?; + lss_conn.handle(Response::Created(cr)).await?; + Ok(()) +} + // blocks until a connection received pub async fn broker_setup( settings: Settings, mqtt_rx: mpsc::Receiver, + reconn_tx: mpsc::Sender<(String, bool)>, error_tx: broadcast::Sender>, ) -> Arc> { let (auth_tx, auth_rx) = std::sync::mpsc::channel::(); @@ -199,6 +230,7 @@ pub async fn broker_setup( // client connections state let (startup_tx, startup_rx) = std_oneshot::channel(); let conns_ = conns.clone(); + let reconn_tx_ = reconn_tx.clone(); std::thread::spawn(move || { log::info!("=> wait for connected status"); // wait for connection = true @@ -211,6 +243,7 @@ pub async fn broker_setup( while let Ok((cid, connected)) = status_rx.recv() { let mut cs = conns_.lock().unwrap(); cs.client_action(&cid, connected); + let _ = reconn_tx_.blocking_send((cid, connected)); drop(cs) } }); diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index c5fb203..370146c 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -17,11 +17,12 @@ pub async fn run_test() -> rocket::Rocket { let settings = Settings::default(); let (mqtt_tx, mqtt_rx) = mpsc::channel(10000); let (error_tx, error_rx) = broadcast::channel(10000); + let (conn_tx, _conn_rx) = mpsc::channel(10000); crate::error_log::log_errors(error_rx); // block until connection - let conns = crate::broker_setup(settings, mqtt_rx, error_tx.clone()).await; + let conns = crate::broker_setup(settings, mqtt_rx, conn_tx.clone(), error_tx.clone()).await; log::info!("=> off to the races!"); let tx_ = mqtt_tx.clone();