async get_reply in chain tracker, split into its own module

This commit is contained in:
Evan Feenstra
2022-06-28 08:13:34 -07:00
parent b30b8fe361
commit 7b463d5553
3 changed files with 44 additions and 43 deletions

View File

@@ -0,0 +1,41 @@
use crate::{ChannelReply, ChannelRequest};
use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot};
use vls_protocol::{Error, Result};
use vls_protocol_client::SignerPort;
pub struct MqttSignerPort {
sender: mpsc::Sender<ChannelRequest>,
}
#[async_trait]
impl SignerPort for MqttSignerPort {
async fn handle_message(&self, message: Vec<u8>) -> Result<Vec<u8>> {
let reply_rx = self.send_request(message).await?;
self.get_reply(reply_rx).await
}
fn clone(&self) -> Box<dyn SignerPort> {
Box::new(Self {
sender: self.sender.clone(),
})
}
}
impl MqttSignerPort {
pub fn new(sender: mpsc::Sender<ChannelRequest>) -> Self {
Self { sender }
}
async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
let (reply_tx, reply_rx) = oneshot::channel();
let request = ChannelRequest { message, reply_tx };
self.sender.send(request).await.map_err(|_| Error::Eof)?;
Ok(reply_rx)
}
async fn get_reply(&self, reply_rx: oneshot::Receiver<ChannelReply>) -> Result<Vec<u8>> {
let reply = reply_rx.await.map_err(|_| Error::Eof)?;
Ok(reply.reply)
}
}

View File

@@ -1,12 +1,14 @@
#![feature(once_cell)] #![feature(once_cell)]
mod chain_tracker;
mod init; mod init;
mod mqtt; mod mqtt;
mod run_test; mod run_test;
mod unix_fd; mod unix_fd;
mod util; mod util;
use crate::chain_tracker::MqttSignerPort;
use crate::mqtt::start_broker; use crate::mqtt::start_broker;
use crate::unix_fd::{MqttSignerPort, SignerLoop}; use crate::unix_fd::SignerLoop;
use bitcoin::Network; use bitcoin::Network;
use clap::{arg, App, AppSettings, Arg}; use clap::{arg, App, AppSettings, Arg};
use std::env; use std::env;

View File

@@ -1,13 +1,10 @@
use crate::{Channel, ChannelReply, ChannelRequest}; use crate::{Channel, ChannelReply, ChannelRequest};
use async_trait::async_trait;
use log::*; use log::*;
use secp256k1::PublicKey; use secp256k1::PublicKey;
use sphinx_key_parser as parser; use sphinx_key_parser as parser;
use std::thread; use std::thread;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn_blocking;
use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_protocol::{msgs, msgs::Message, Error, Result};
use vls_protocol_client::SignerPort;
use vls_proxy::client::Client; use vls_proxy::client::Client;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -134,42 +131,3 @@ impl<C: 'static + Client> SignerLoop<C> {
Ok(reply.reply) Ok(reply.reply)
} }
} }
pub struct MqttSignerPort {
sender: mpsc::Sender<ChannelRequest>,
}
#[async_trait]
impl SignerPort for MqttSignerPort {
async fn handle_message(&self, message: Vec<u8>) -> Result<Vec<u8>> {
let reply_rx = self.send_request(message).await?;
self.get_reply(reply_rx).await
}
fn clone(&self) -> Box<dyn SignerPort> {
Box::new(Self {
sender: self.sender.clone(),
})
}
}
impl MqttSignerPort {
pub fn new(sender: mpsc::Sender<ChannelRequest>) -> Self {
Self { sender }
}
async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
let (reply_tx, reply_rx) = oneshot::channel();
let request = ChannelRequest { message, reply_tx };
self.sender.send(request).await.map_err(|_| Error::Eof)?;
Ok(reply_rx)
}
async fn get_reply(&self, reply_rx: oneshot::Receiver<ChannelReply>) -> Result<Vec<u8>> {
let reply = spawn_blocking(move || reply_rx.blocking_recv())
.await
.map_err(|_| Error::Eof)?
.map_err(|_| Error::Eof)?;
Ok(reply.reply)
}
}