chain tracker froontend in mqtt broker

This commit is contained in:
Evan Feenstra
2022-06-27 17:02:11 -07:00
parent f31a3becf0
commit 2f92844e62
4 changed files with 65 additions and 6 deletions

View File

@@ -8,6 +8,8 @@ default-run = "sphinx-key-broker"
[dependencies]
vls-protocol = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" }
vls-proxy = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" }
vls-frontend = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" }
vls-protocol-client = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" }
# vls-protocol = { path = "../../../evanf/validating-lightning-signer/vls-protocol" }
# vls-proxy = { path = "../../../evanf/validating-lightning-signer/vls-proxy" }
rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" }
@@ -25,6 +27,8 @@ clap_derive = "3.2.6"
chrono = "0.4"
once_cell = "1.12.0"
bitcoin = "0.28.1"
async-trait = "0.1"
url = { version = "2.2" }
[features]
default = ["std"]

View File

@@ -6,13 +6,17 @@ mod unix_fd;
mod util;
use crate::mqtt::start_broker;
use crate::unix_fd::SignerLoop;
use clap::{App, AppSettings, Arg, arg};
use crate::unix_fd::{MqttSignerPort, SignerLoop};
use bitcoin::Network;
use clap::{arg, App, AppSettings, Arg};
use std::env;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use url::Url;
use vls_frontend::Frontend;
use vls_proxy::client::UnixClient;
use vls_proxy::connection::{open_parent_fd, UnixConnection};
use bitcoin::Network;
use vls_proxy::portfront::SignerPortFront;
pub struct Channel {
pub sequence: u16,
@@ -53,10 +57,9 @@ fn main() -> anyhow::Result<()> {
.help("bitcoin network")
.long("network")
.value_parser(["regtest", "signet", "testnet", "mainnet", "bitcoin"])
.default_value("regtest")
.default_value("regtest"),
);
let matches = app.get_matches();
let network_string: &String = matches.get_one("network").expect("expected a network");
@@ -84,7 +87,7 @@ fn main() -> anyhow::Result<()> {
let (tx, rx) = mpsc::channel(1000);
let (status_tx, mut status_rx) = mpsc::channel(1000);
log::info!("=> start broker");
let _runtime = start_broker(rx, status_tx, "sphinx-1");
let runtime = start_broker(rx, status_tx, "sphinx-1");
log::info!("=> wait for connected status");
// wait for connection = true
let status = status_rx.blocking_recv().expect("couldnt receive");
@@ -94,6 +97,18 @@ fn main() -> anyhow::Result<()> {
init::blocking_connect(tx.clone(), network);
log::info!("=====> sent seed!");
if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") {
let signer_port = MqttSignerPort::new(tx.clone());
let frontend = Frontend::new(
Arc::new(SignerPortFront {
signer_port: Box::new(signer_port),
}),
Url::parse(&btc_url).expect("malformed btc rpc url"),
);
runtime.block_on(async {
frontend.start();
});
}
// listen to reqs from CLN
let conn = UnixConnection::new(parent_fd);
let client = UnixClient::new(conn);

View File

@@ -5,7 +5,9 @@ use std::thread;
use tokio::sync::{mpsc, oneshot};
// use tokio::task::spawn_blocking;
use crate::{Channel, ChannelReply, ChannelRequest};
use async_trait::async_trait;
use vls_protocol::{msgs, msgs::Message, Error, Result};
use vls_protocol_client::SignerPort;
use vls_proxy::client::Client;
#[derive(Clone, Debug)]
@@ -132,3 +134,39 @@ impl<C: 'static + Client> SignerLoop<C> {
Ok(reply.reply)
}
}
pub struct MqttSignerPort {
sender: mpsc::Sender<ChannelRequest>,
}
#[async_trait]
impl SignerPort for MqttSignerPort {
async fn handle_message(&self, message: Vec<u8>) -> Result<Vec<u8>> {
let reply_rx = self.send_request(message).await?;
self.get_reply(reply_rx).await
}
fn clone(&self) -> Box<dyn SignerPort> {
Box::new(Self {
sender: self.sender.clone(),
})
}
}
impl MqttSignerPort {
pub fn new(sender: mpsc::Sender<ChannelRequest>) -> Self {
Self { sender }
}
async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
let (reply_tx, reply_rx) = oneshot::channel();
let request = ChannelRequest { message, reply_tx };
self.sender.send(request).await.map_err(|_| Error::Eof)?;
Ok(reply_rx)
}
async fn get_reply(&self, reply_rx: oneshot::Receiver<ChannelReply>) -> Result<Vec<u8>> {
let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?;
Ok(reply.reply)
}
}

View File

@@ -9,6 +9,8 @@ sphinx-key-signer = { path = "../signer" }
sphinx-key-parser = { path = "../parser" }
vls-protocol = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" }
vls-protocol-signer = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std", default-features = false, features = ["std", "secp-lowmemory"] }
# vls-protocol = { path = "../../../evanf/validating-lightning-signer/vls-protocol" }
# vls-protocol-signer = { path = "../../../evanf/validating-lightning-signer/vls-protocol-signer", default-features = false, features = ["std", "secp-lowmemory"] }
anyhow = {version = "1", features = ["backtrace"]}
log = "0.4"
rumqttc = "0.12.0"