From 3891c15907be6f999100ec014bf440c29f27cde6 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Mon, 26 Jun 2023 13:46:50 -0700 Subject: [PATCH] include LSS tx in ChainTracker message handler --- broker/src/chain_tracker.rs | 56 +++++++++++++++++++++++-------------- broker/src/main.rs | 2 +- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs index a3c678e..a03506f 100644 --- a/broker/src/chain_tracker.rs +++ b/broker/src/chain_tracker.rs @@ -1,19 +1,20 @@ -use crate::conn::{ChannelReply, ChannelRequest}; +use crate::conn::{ChannelRequest, LssReq}; +use anyhow::Result; use async_trait::async_trait; -use rocket::tokio::sync::{mpsc, oneshot}; +use rocket::tokio::sync::mpsc; use sphinx_signer::{parser, sphinx_glyph::topics}; -use vls_protocol::{Error, Result}; +use vls_protocol::Error; use vls_protocol_client::{ClientResult, SignerPort}; pub struct MqttSignerPort { sender: mpsc::Sender, + lss_tx: mpsc::Sender, } #[async_trait] impl SignerPort for MqttSignerPort { async fn handle_message(&self, message: Vec) -> ClientResult> { - let reply_rx = self.send_request(message).await?; - self.get_reply(reply_rx).await + Ok(self.send_and_wait(message).await.map_err(|_| Error::Eof)?) } fn is_ready(&self) -> bool { @@ -22,24 +23,37 @@ impl SignerPort for MqttSignerPort { } impl MqttSignerPort { - pub fn new(sender: mpsc::Sender) -> Self { - Self { sender } + pub fn new(sender: mpsc::Sender, lss_tx: mpsc::Sender) -> Self { + Self { sender, lss_tx } } - async fn send_request(&self, message: Vec) -> Result> { - let m = parser::raw_request_from_bytes(message, 0, [0;33], 0)?; - let (request, reply_rx) = ChannelRequest::new(topics::VLS, m); - log::info!("=> MqttSignerPort send request now"); - self.sender.send(request).await.map_err(|_| Error::Eof)?; - log::info!("=> MqttSignerPort sent"); - Ok(reply_rx) - } - - async fn get_reply(&self, reply_rx: oneshot::Receiver) -> ClientResult> { - let reply = reply_rx.await.map_err(|_| Error::Eof)?; - log::info!("=> MqttSignerPort got response {:?}", reply.reply); - let r = parser::raw_response_from_bytes(reply.reply, 0)?; - log::info!("=> MqttSignerPort parsed response {:?}", r); + async fn send_and_wait(&self, message: Vec) -> Result> { + let m = parser::raw_request_from_bytes(message, 0, [0; 33], 0)?; + let (res_topic, res) = self.send_request_wait(topics::VLS, m).await?; + let mut the_res = res.clone(); + if res_topic == topics::LSS_RES { + // send LSS instead + let lss_reply = self.send_lss(res).await?; + let (_res_topic, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply).await?; + the_res = res2; + } + let r = parser::raw_response_from_bytes(the_res, 0)?; Ok(r) } + + async fn send_request_wait(&self, topic: &str, message: Vec) -> Result<(String, Vec)> { + let (request, reply_rx) = ChannelRequest::new(topic, message); + self.sender.send(request).await?; + let reply = reply_rx.await?; + Ok((reply.topic_end, reply.reply)) + } + + async fn send_lss(&self, message: Vec) -> Result> { + // Send a request to the MQTT handler to send to signer + let (request, reply_rx) = LssReq::new(message); + // This can fail if MQTT shuts down + self.lss_tx.send(request).await?; + let res = reply_rx.await?; + Ok(res) + } } diff --git a/broker/src/main.rs b/broker/src/main.rs index bd800b6..8a14393 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -105,7 +105,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { }; if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { - let signer_port = MqttSignerPort::new(mqtt_tx.clone()); + let signer_port = MqttSignerPort::new(mqtt_tx.clone(), lss_tx.clone()); let port_front = SignerPortFront::new(Arc::new(signer_port), settings.network); let source_factory = Arc::new(SourceFactory::new(".", settings.network)); let frontend = Frontend::new(