From bedf7062e1208f01ff3f457cc3c6a4b310d1bef0 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Thu, 1 Jun 2023 17:21:21 -0700 Subject: [PATCH] cleanup lss code --- broker/src/conn.rs | 25 +++++++++++++ broker/src/lss.rs | 66 +++++++++++++++++++++++++++++++++++ broker/src/main.rs | 87 +++++++--------------------------------------- 3 files changed, 104 insertions(+), 74 deletions(-) create mode 100644 broker/src/lss.rs diff --git a/broker/src/conn.rs b/broker/src/conn.rs index baf2894..61cc729 100644 --- a/broker/src/conn.rs +++ b/broker/src/conn.rs @@ -1,5 +1,6 @@ use rocket::tokio::sync::{mpsc, oneshot}; use serde::{Deserialize, Serialize}; +use anyhow::Result; #[derive(Debug, Serialize, Deserialize)] pub struct Connections { @@ -63,6 +64,30 @@ impl ChannelRequest { }; (cr, reply_rx) } + pub async fn send(topic: &str, message: Vec, sender: &mpsc::Sender) -> Result> { + let (reply_tx, reply_rx) = oneshot::channel(); + let req = ChannelRequest { + topic: topic.to_string(), + message, + reply_tx, + cid: None, + }; + let _ = sender.send(req).await; + let reply = reply_rx.await?; + Ok(reply.reply) + } + pub async fn send_for(cid: &str, topic: &str, message: Vec, sender: &mpsc::Sender) -> Result> { + let (reply_tx, reply_rx) = oneshot::channel(); + let req = ChannelRequest { + topic: topic.to_string(), + message, + reply_tx, + cid: Some(cid.to_string()), + }; + let _ = sender.send(req).await; + let reply = reply_rx.await?; + Ok(reply.reply) + } pub fn for_cid(&mut self, cid: &str) { self.cid = Some(cid.to_string()) } diff --git a/broker/src/lss.rs b/broker/src/lss.rs new file mode 100644 index 0000000..07df782 --- /dev/null +++ b/broker/src/lss.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use rocket::tokio::{ + self, + sync::{mpsc}, +}; +use crate::conn::{ChannelRequest, LssReq}; +use lss_connector::{LssBroker, Response}; +use sphinx_signer::sphinx_glyph::topics; + +pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender) -> Result { + + // LSS required + let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?; + let reply = ChannelRequest::send(topics::LSS_MSG, msg_bytes, &mqtt_tx).await?; + let ir = Response::from_slice(&reply)?.as_init()?; + + let (lss_conn, msg_bytes2) = LssBroker::new(uri, ir, spk).await?; + let reply2 = ChannelRequest::send(topics::LSS_MSG, msg_bytes2, &mqtt_tx).await?; + let cr = Response::from_slice(&reply2)?.as_created()?; + + lss_conn.handle(Response::Created(cr)).await?; + + Ok(lss_conn) +} + +pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver, mut reconn_rx: mpsc::Receiver<(String, bool)>, mqtt_tx: mpsc::Sender) { + // msg handler (from CLN looper) + let lss_conn_ = lss_conn.clone(); + tokio::task::spawn(async move{ + while let Some(req) = lss_rx.recv().await { + match lss_conn_.handle_bytes(&req.message).await { + Ok(msg) => { + log::info!("payload to send {:?}", &msg); + let _ = req.reply_tx.send(msg); + }, + Err(e) => { + log::error!("failed lss_handle {:?}", e); + } + } + } + }); + + // reconnect handler (when a client reconnects) + let lss_conn_ = lss_conn.clone(); + let mqtt_tx_ = mqtt_tx.clone(); + tokio::task::spawn(async move{ + while let Some((cid, connected)) = reconn_rx.recv().await { + if connected { + log::info!("CLIENT {} reconnected!", cid); + if let Err(e) = reconnect_dance(&cid, &lss_conn_, &mqtt_tx_).await { + log::error!("reconnect dance failed {:?}", e); + } + } + } + }); +} + +async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender) -> Result<()> { + let init_bytes = lss_conn.make_init_msg().await?; + let reply = ChannelRequest::send_for(cid, topics::LSS_MSG, init_bytes, mqtt_tx).await?; + let state_bytes = lss_conn.get_initial_state_msg(&reply).await?; + let reply2 = ChannelRequest::send_for(cid, topics::LSS_MSG, state_bytes, mqtt_tx).await?; + let cr = Response::from_slice(&reply2)?.as_created()?; + lss_conn.handle(Response::Created(cr)).await?; + Ok(()) +} diff --git a/broker/src/main.rs b/broker/src/main.rs index d1febe7..187e57c 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -7,13 +7,13 @@ mod run_test; mod looper; mod util; mod conn; +mod lss; use crate::conn::{Connections, ChannelRequest, LssReq}; use crate::chain_tracker::MqttSignerPort; use crate::mqtt::{check_auth, start_broker}; use crate::looper::SignerLoop; use crate::util::{read_broker_config, Settings}; -use anyhow::Result; use clap::{arg, App}; use rocket::tokio::{ self, @@ -28,9 +28,6 @@ use vls_proxy::client::UnixClient; use vls_proxy::connection::{open_parent_fd, UnixConnection}; use vls_proxy::portfront::SignerPortFront; use vls_proxy::util::{add_hsmd_args, handle_hsmd_version}; -use lss_connector::{LssBroker, Response}; -use sphinx_signer::sphinx_glyph::topics; - #[rocket::launch] async fn rocket() -> _ { @@ -81,16 +78,22 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { // waits until first connection let conns = broker_setup(settings, mqtt_rx, reconn_tx.clone(), error_tx.clone()).await; - let (lss_tx, lss_rx) = mpsc::channel(10000); + let (lss_tx, lss_rx) = mpsc::channel::(10000); let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") { // waits until LSS confirmation from signer - let lss_broker = match lss_setup(&lss_uri, lss_rx, reconn_rx, mqtt_tx.clone()).await{ - Ok(l) => l, - Err(e) => { - let _ = error_tx.send(e.to_string().as_bytes().to_vec()); - panic!("{:?}", e); + let lss_broker = loop { + match lss::lss_setup(&lss_uri, mqtt_tx.clone()).await{ + Ok(l) => { + break l; + }, + Err(e) => { + let _ = error_tx.send(e.to_string().as_bytes().to_vec()); + log::error!("failed LSS setup, trying again..."); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + } } }; + lss::lss_tasks(lss_broker.clone(), lss_rx, reconn_rx, mqtt_tx.clone()); log::info!("=> lss broker connection created!"); Some(lss_broker) } else { @@ -128,70 +131,6 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { routes::launch_rocket(mqtt_tx, error_tx, settings, conns) } -pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver, mut reconn_rx: mpsc::Receiver<(String, bool)>, mqtt_tx: mpsc::Sender) -> Result { - - // LSS required - let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?; - let (req1, reply_rx) = ChannelRequest::new(topics::LSS_MSG, msg_bytes); - let _ = mqtt_tx.send(req1).await; - let first_lss_response = reply_rx.await?; - let ir = Response::from_slice(&first_lss_response.reply)?.as_init()?; - - let (lss_conn, msg_bytes2) = LssBroker::new(uri, ir, spk).await?; - let (req2, reply_rx2) = ChannelRequest::new(topics::LSS_MSG, msg_bytes2); - let _ = mqtt_tx.send(req2).await; - let created_res = reply_rx2.await?; - let cr = Response::from_slice(&created_res.reply)?.as_created()?; - - lss_conn.handle(Response::Created(cr)).await?; - - // msg handler (from CLN looper) - let lss_conn_ = lss_conn.clone(); - tokio::task::spawn(async move{ - while let Some(req) = lss_rx.recv().await { - match lss_conn_.handle_bytes(&req.message).await { - Ok(msg) => { - log::info!("payload to send {:?}", &msg); - let _ = req.reply_tx.send(msg); - }, - Err(e) => { - log::error!("failed lss_handle {:?}", e); - } - } - } - }); - - // reconnect handler (when a client reconnects) - let lss_conn_ = lss_conn.clone(); - let mqtt_tx_ = mqtt_tx.clone(); - tokio::task::spawn(async move{ - while let Some((cid, connected)) = reconn_rx.recv().await { - if connected { - log::info!("CLIENT {} reconnected!", cid); - if let Err(e) = reconnect_dance(&cid, &lss_conn_, &mqtt_tx_).await { - log::error!("reconnect dance failed {:?}", e); - } - } - } - }); - - Ok(lss_conn) -} - -async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender) -> Result<()> { - let init_bytes = lss_conn.make_init_msg().await?; - let (req, reply_rx) = ChannelRequest::new_for(cid, topics::LSS_MSG, init_bytes); - let _ = mqtt_tx.send(req).await; - let first_lss_response = reply_rx.await?; - let state_bytes = lss_conn.get_initial_state_msg(&first_lss_response.reply).await?; - let (req2, reply_rx2) = ChannelRequest::new_for(cid, topics::LSS_MSG, state_bytes); - let _ = mqtt_tx.send(req2).await; - let created_res = reply_rx2.await?; - let cr = Response::from_slice(&created_res.reply)?.as_created()?; - lss_conn.handle(Response::Created(cr)).await?; - Ok(()) -} - // blocks until a connection received pub async fn broker_setup( settings: Settings,