include LSS tx in ChainTracker message handler

This commit is contained in:
Evan Feenstra
2023-06-26 13:46:50 -07:00
parent 1b54c5358c
commit 3891c15907
2 changed files with 36 additions and 22 deletions

View File

@@ -1,19 +1,20 @@
use crate::conn::{ChannelReply, ChannelRequest};
use crate::conn::{ChannelRequest, LssReq};
use anyhow::Result;
use async_trait::async_trait;
use rocket::tokio::sync::{mpsc, oneshot};
use rocket::tokio::sync::mpsc;
use sphinx_signer::{parser, sphinx_glyph::topics};
use vls_protocol::{Error, Result};
use vls_protocol::Error;
use vls_protocol_client::{ClientResult, SignerPort};
pub struct MqttSignerPort {
sender: mpsc::Sender<ChannelRequest>,
lss_tx: mpsc::Sender<LssReq>,
}
#[async_trait]
impl SignerPort for MqttSignerPort {
async fn handle_message(&self, message: Vec<u8>) -> ClientResult<Vec<u8>> {
let reply_rx = self.send_request(message).await?;
self.get_reply(reply_rx).await
Ok(self.send_and_wait(message).await.map_err(|_| Error::Eof)?)
}
fn is_ready(&self) -> bool {
@@ -22,24 +23,37 @@ impl SignerPort for MqttSignerPort {
}
impl MqttSignerPort {
pub fn new(sender: mpsc::Sender<ChannelRequest>) -> Self {
Self { sender }
pub fn new(sender: mpsc::Sender<ChannelRequest>, lss_tx: mpsc::Sender<LssReq>) -> Self {
Self { sender, lss_tx }
}
async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
let m = parser::raw_request_from_bytes(message, 0, [0;33], 0)?;
let (request, reply_rx) = ChannelRequest::new(topics::VLS, m);
log::info!("=> MqttSignerPort send request now");
self.sender.send(request).await.map_err(|_| Error::Eof)?;
log::info!("=> MqttSignerPort sent");
Ok(reply_rx)
}
async fn get_reply(&self, reply_rx: oneshot::Receiver<ChannelReply>) -> ClientResult<Vec<u8>> {
let reply = reply_rx.await.map_err(|_| Error::Eof)?;
log::info!("=> MqttSignerPort got response {:?}", reply.reply);
let r = parser::raw_response_from_bytes(reply.reply, 0)?;
log::info!("=> MqttSignerPort parsed response {:?}", r);
async fn send_and_wait(&self, message: Vec<u8>) -> Result<Vec<u8>> {
let m = parser::raw_request_from_bytes(message, 0, [0; 33], 0)?;
let (res_topic, res) = self.send_request_wait(topics::VLS, m).await?;
let mut the_res = res.clone();
if res_topic == topics::LSS_RES {
// send LSS instead
let lss_reply = self.send_lss(res).await?;
let (_res_topic, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply).await?;
the_res = res2;
}
let r = parser::raw_response_from_bytes(the_res, 0)?;
Ok(r)
}
async fn send_request_wait(&self, topic: &str, message: Vec<u8>) -> Result<(String, Vec<u8>)> {
let (request, reply_rx) = ChannelRequest::new(topic, message);
self.sender.send(request).await?;
let reply = reply_rx.await?;
Ok((reply.topic_end, reply.reply))
}
async fn send_lss(&self, message: Vec<u8>) -> Result<Vec<u8>> {
// Send a request to the MQTT handler to send to signer
let (request, reply_rx) = LssReq::new(message);
// This can fail if MQTT shuts down
self.lss_tx.send(request).await?;
let res = reply_rx.await?;
Ok(res)
}
}

View File

@@ -105,7 +105,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
};
if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") {
let signer_port = MqttSignerPort::new(mqtt_tx.clone());
let signer_port = MqttSignerPort::new(mqtt_tx.clone(), lss_tx.clone());
let port_front = SignerPortFront::new(Arc::new(signer_port), settings.network);
let source_factory = Arc::new(SourceFactory::new(".", settings.network));
let frontend = Frontend::new(