diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs new file mode 100644 index 0000000..06cd663 --- /dev/null +++ b/broker/src/chain_tracker.rs @@ -0,0 +1,41 @@ +use crate::{ChannelReply, ChannelRequest}; +use async_trait::async_trait; +use tokio::sync::{mpsc, oneshot}; +use vls_protocol::{Error, Result}; +use vls_protocol_client::SignerPort; + +pub struct MqttSignerPort { + sender: mpsc::Sender, +} + +#[async_trait] +impl SignerPort for MqttSignerPort { + async fn handle_message(&self, message: Vec) -> Result> { + let reply_rx = self.send_request(message).await?; + self.get_reply(reply_rx).await + } + + fn clone(&self) -> Box { + Box::new(Self { + sender: self.sender.clone(), + }) + } +} + +impl MqttSignerPort { + pub fn new(sender: mpsc::Sender) -> Self { + Self { sender } + } + + async fn send_request(&self, message: Vec) -> Result> { + let (reply_tx, reply_rx) = oneshot::channel(); + let request = ChannelRequest { message, reply_tx }; + self.sender.send(request).await.map_err(|_| Error::Eof)?; + Ok(reply_rx) + } + + async fn get_reply(&self, reply_rx: oneshot::Receiver) -> Result> { + let reply = reply_rx.await.map_err(|_| Error::Eof)?; + Ok(reply.reply) + } +} diff --git a/broker/src/main.rs b/broker/src/main.rs index 1cd2f7c..4830fff 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,12 +1,14 @@ #![feature(once_cell)] +mod chain_tracker; mod init; mod mqtt; mod run_test; mod unix_fd; mod util; +use crate::chain_tracker::MqttSignerPort; use crate::mqtt::start_broker; -use crate::unix_fd::{MqttSignerPort, SignerLoop}; +use crate::unix_fd::SignerLoop; use bitcoin::Network; use clap::{arg, App, AppSettings, Arg}; use std::env; diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index e81fa55..4161fa5 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -1,13 +1,10 @@ use crate::{Channel, ChannelReply, ChannelRequest}; -use async_trait::async_trait; use log::*; use secp256k1::PublicKey; use sphinx_key_parser as parser; use std::thread; use tokio::sync::{mpsc, oneshot}; -use tokio::task::spawn_blocking; use vls_protocol::{msgs, msgs::Message, Error, Result}; -use vls_protocol_client::SignerPort; use vls_proxy::client::Client; #[derive(Clone, Debug)] @@ -134,42 +131,3 @@ impl SignerLoop { Ok(reply.reply) } } - -pub struct MqttSignerPort { - sender: mpsc::Sender, -} - -#[async_trait] -impl SignerPort for MqttSignerPort { - async fn handle_message(&self, message: Vec) -> Result> { - let reply_rx = self.send_request(message).await?; - self.get_reply(reply_rx).await - } - - fn clone(&self) -> Box { - Box::new(Self { - sender: self.sender.clone(), - }) - } -} - -impl MqttSignerPort { - pub fn new(sender: mpsc::Sender) -> Self { - Self { sender } - } - - async fn send_request(&self, message: Vec) -> Result> { - let (reply_tx, reply_rx) = oneshot::channel(); - let request = ChannelRequest { message, reply_tx }; - self.sender.send(request).await.map_err(|_| Error::Eof)?; - Ok(reply_rx) - } - - async fn get_reply(&self, reply_rx: oneshot::Receiver) -> Result> { - let reply = spawn_blocking(move || reply_rx.blocking_recv()) - .await - .map_err(|_| Error::Eof)? - .map_err(|_| Error::Eof)?; - Ok(reply.reply) - } -}