cleanup lss code

This commit is contained in:
Evan Feenstra
2023-06-01 17:21:21 -07:00
parent 67d988a76f
commit bedf7062e1
3 changed files with 104 additions and 74 deletions

View File

@@ -1,5 +1,6 @@
use rocket::tokio::sync::{mpsc, oneshot};
use serde::{Deserialize, Serialize};
use anyhow::Result;
#[derive(Debug, Serialize, Deserialize)]
pub struct Connections {
@@ -63,6 +64,30 @@ impl ChannelRequest {
};
(cr, reply_rx)
}
pub async fn send(topic: &str, message: Vec<u8>, sender: &mpsc::Sender<ChannelRequest>) -> Result<Vec<u8>> {
let (reply_tx, reply_rx) = oneshot::channel();
let req = ChannelRequest {
topic: topic.to_string(),
message,
reply_tx,
cid: None,
};
let _ = sender.send(req).await;
let reply = reply_rx.await?;
Ok(reply.reply)
}
pub async fn send_for(cid: &str, topic: &str, message: Vec<u8>, sender: &mpsc::Sender<ChannelRequest>) -> Result<Vec<u8>> {
let (reply_tx, reply_rx) = oneshot::channel();
let req = ChannelRequest {
topic: topic.to_string(),
message,
reply_tx,
cid: Some(cid.to_string()),
};
let _ = sender.send(req).await;
let reply = reply_rx.await?;
Ok(reply.reply)
}
pub fn for_cid(&mut self, cid: &str) {
self.cid = Some(cid.to_string())
}

66
broker/src/lss.rs Normal file
View File

@@ -0,0 +1,66 @@
use anyhow::Result;
use rocket::tokio::{
self,
sync::{mpsc},
};
use crate::conn::{ChannelRequest, LssReq};
use lss_connector::{LssBroker, Response};
use sphinx_signer::sphinx_glyph::topics;
pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender<ChannelRequest>) -> Result<LssBroker> {
// LSS required
let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?;
let reply = ChannelRequest::send(topics::LSS_MSG, msg_bytes, &mqtt_tx).await?;
let ir = Response::from_slice(&reply)?.as_init()?;
let (lss_conn, msg_bytes2) = LssBroker::new(uri, ir, spk).await?;
let reply2 = ChannelRequest::send(topics::LSS_MSG, msg_bytes2, &mqtt_tx).await?;
let cr = Response::from_slice(&reply2)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?;
Ok(lss_conn)
}
pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>, mut reconn_rx: mpsc::Receiver<(String, bool)>, mqtt_tx: mpsc::Sender<ChannelRequest>) {
// msg handler (from CLN looper)
let lss_conn_ = lss_conn.clone();
tokio::task::spawn(async move{
while let Some(req) = lss_rx.recv().await {
match lss_conn_.handle_bytes(&req.message).await {
Ok(msg) => {
log::info!("payload to send {:?}", &msg);
let _ = req.reply_tx.send(msg);
},
Err(e) => {
log::error!("failed lss_handle {:?}", e);
}
}
}
});
// reconnect handler (when a client reconnects)
let lss_conn_ = lss_conn.clone();
let mqtt_tx_ = mqtt_tx.clone();
tokio::task::spawn(async move{
while let Some((cid, connected)) = reconn_rx.recv().await {
if connected {
log::info!("CLIENT {} reconnected!", cid);
if let Err(e) = reconnect_dance(&cid, &lss_conn_, &mqtt_tx_).await {
log::error!("reconnect dance failed {:?}", e);
}
}
}
});
}
async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender<ChannelRequest>) -> Result<()> {
let init_bytes = lss_conn.make_init_msg().await?;
let reply = ChannelRequest::send_for(cid, topics::LSS_MSG, init_bytes, mqtt_tx).await?;
let state_bytes = lss_conn.get_initial_state_msg(&reply).await?;
let reply2 = ChannelRequest::send_for(cid, topics::LSS_MSG, state_bytes, mqtt_tx).await?;
let cr = Response::from_slice(&reply2)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?;
Ok(())
}

View File

@@ -7,13 +7,13 @@ mod run_test;
mod looper;
mod util;
mod conn;
mod lss;
use crate::conn::{Connections, ChannelRequest, LssReq};
use crate::chain_tracker::MqttSignerPort;
use crate::mqtt::{check_auth, start_broker};
use crate::looper::SignerLoop;
use crate::util::{read_broker_config, Settings};
use anyhow::Result;
use clap::{arg, App};
use rocket::tokio::{
self,
@@ -28,9 +28,6 @@ use vls_proxy::client::UnixClient;
use vls_proxy::connection::{open_parent_fd, UnixConnection};
use vls_proxy::portfront::SignerPortFront;
use vls_proxy::util::{add_hsmd_args, handle_hsmd_version};
use lss_connector::{LssBroker, Response};
use sphinx_signer::sphinx_glyph::topics;
#[rocket::launch]
async fn rocket() -> _ {
@@ -81,16 +78,22 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
// waits until first connection
let conns = broker_setup(settings, mqtt_rx, reconn_tx.clone(), error_tx.clone()).await;
let (lss_tx, lss_rx) = mpsc::channel(10000);
let (lss_tx, lss_rx) = mpsc::channel::<LssReq>(10000);
let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") {
// waits until LSS confirmation from signer
let lss_broker = match lss_setup(&lss_uri, lss_rx, reconn_rx, mqtt_tx.clone()).await{
Ok(l) => l,
Err(e) => {
let _ = error_tx.send(e.to_string().as_bytes().to_vec());
panic!("{:?}", e);
let lss_broker = loop {
match lss::lss_setup(&lss_uri, mqtt_tx.clone()).await{
Ok(l) => {
break l;
},
Err(e) => {
let _ = error_tx.send(e.to_string().as_bytes().to_vec());
log::error!("failed LSS setup, trying again...");
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
}
}
};
lss::lss_tasks(lss_broker.clone(), lss_rx, reconn_rx, mqtt_tx.clone());
log::info!("=> lss broker connection created!");
Some(lss_broker)
} else {
@@ -128,70 +131,6 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
routes::launch_rocket(mqtt_tx, error_tx, settings, conns)
}
pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver<LssReq>, mut reconn_rx: mpsc::Receiver<(String, bool)>, mqtt_tx: mpsc::Sender<ChannelRequest>) -> Result<LssBroker> {
// LSS required
let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?;
let (req1, reply_rx) = ChannelRequest::new(topics::LSS_MSG, msg_bytes);
let _ = mqtt_tx.send(req1).await;
let first_lss_response = reply_rx.await?;
let ir = Response::from_slice(&first_lss_response.reply)?.as_init()?;
let (lss_conn, msg_bytes2) = LssBroker::new(uri, ir, spk).await?;
let (req2, reply_rx2) = ChannelRequest::new(topics::LSS_MSG, msg_bytes2);
let _ = mqtt_tx.send(req2).await;
let created_res = reply_rx2.await?;
let cr = Response::from_slice(&created_res.reply)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?;
// msg handler (from CLN looper)
let lss_conn_ = lss_conn.clone();
tokio::task::spawn(async move{
while let Some(req) = lss_rx.recv().await {
match lss_conn_.handle_bytes(&req.message).await {
Ok(msg) => {
log::info!("payload to send {:?}", &msg);
let _ = req.reply_tx.send(msg);
},
Err(e) => {
log::error!("failed lss_handle {:?}", e);
}
}
}
});
// reconnect handler (when a client reconnects)
let lss_conn_ = lss_conn.clone();
let mqtt_tx_ = mqtt_tx.clone();
tokio::task::spawn(async move{
while let Some((cid, connected)) = reconn_rx.recv().await {
if connected {
log::info!("CLIENT {} reconnected!", cid);
if let Err(e) = reconnect_dance(&cid, &lss_conn_, &mqtt_tx_).await {
log::error!("reconnect dance failed {:?}", e);
}
}
}
});
Ok(lss_conn)
}
async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender<ChannelRequest>) -> Result<()> {
let init_bytes = lss_conn.make_init_msg().await?;
let (req, reply_rx) = ChannelRequest::new_for(cid, topics::LSS_MSG, init_bytes);
let _ = mqtt_tx.send(req).await;
let first_lss_response = reply_rx.await?;
let state_bytes = lss_conn.get_initial_state_msg(&first_lss_response.reply).await?;
let (req2, reply_rx2) = ChannelRequest::new_for(cid, topics::LSS_MSG, state_bytes);
let _ = mqtt_tx.send(req2).await;
let created_res = reply_rx2.await?;
let cr = Response::from_slice(&created_res.reply)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?;
Ok(())
}
// blocks until a connection received
pub async fn broker_setup(
settings: Settings,