From 2f92844e629b1b6bbda908af0694f6ec6355b732 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Mon, 27 Jun 2022 17:02:11 -0700 Subject: [PATCH 1/4] chain tracker froontend in mqtt broker --- broker/Cargo.toml | 4 ++++ broker/src/main.rs | 27 +++++++++++++++++++++------ broker/src/unix_fd.rs | 38 ++++++++++++++++++++++++++++++++++++++ tester/Cargo.toml | 2 ++ 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 539f18c..c2e56f0 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -8,6 +8,8 @@ default-run = "sphinx-key-broker" [dependencies] vls-protocol = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } vls-proxy = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } +vls-frontend = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } +vls-protocol-client = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } # vls-protocol = { path = "../../../evanf/validating-lightning-signer/vls-protocol" } # vls-proxy = { path = "../../../evanf/validating-lightning-signer/vls-proxy" } rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" } @@ -25,6 +27,8 @@ clap_derive = "3.2.6" chrono = "0.4" once_cell = "1.12.0" bitcoin = "0.28.1" +async-trait = "0.1" +url = { version = "2.2" } [features] default = ["std"] diff --git a/broker/src/main.rs b/broker/src/main.rs index 3d84339..52470b0 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -6,13 +6,17 @@ mod unix_fd; mod util; use crate::mqtt::start_broker; -use crate::unix_fd::SignerLoop; -use clap::{App, AppSettings, Arg, arg}; +use crate::unix_fd::{MqttSignerPort, SignerLoop}; +use bitcoin::Network; +use clap::{arg, App, AppSettings, Arg}; use std::env; +use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; +use url::Url; +use vls_frontend::Frontend; use vls_proxy::client::UnixClient; use vls_proxy::connection::{open_parent_fd, UnixConnection}; -use bitcoin::Network; +use vls_proxy::portfront::SignerPortFront; pub struct Channel { pub sequence: u16, @@ -53,9 +57,8 @@ fn main() -> anyhow::Result<()> { .help("bitcoin network") .long("network") .value_parser(["regtest", "signet", "testnet", "mainnet", "bitcoin"]) - .default_value("regtest") + .default_value("regtest"), ); - let matches = app.get_matches(); @@ -84,7 +87,7 @@ fn main() -> anyhow::Result<()> { let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); log::info!("=> start broker"); - let _runtime = start_broker(rx, status_tx, "sphinx-1"); + let runtime = start_broker(rx, status_tx, "sphinx-1"); log::info!("=> wait for connected status"); // wait for connection = true let status = status_rx.blocking_recv().expect("couldnt receive"); @@ -94,6 +97,18 @@ fn main() -> anyhow::Result<()> { init::blocking_connect(tx.clone(), network); log::info!("=====> sent seed!"); + if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { + let signer_port = MqttSignerPort::new(tx.clone()); + let frontend = Frontend::new( + Arc::new(SignerPortFront { + signer_port: Box::new(signer_port), + }), + Url::parse(&btc_url).expect("malformed btc rpc url"), + ); + runtime.block_on(async { + frontend.start(); + }); + } // listen to reqs from CLN let conn = UnixConnection::new(parent_fd); let client = UnixClient::new(conn); diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index 31fb507..d3b9503 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -5,7 +5,9 @@ use std::thread; use tokio::sync::{mpsc, oneshot}; // use tokio::task::spawn_blocking; use crate::{Channel, ChannelReply, ChannelRequest}; +use async_trait::async_trait; use vls_protocol::{msgs, msgs::Message, Error, Result}; +use vls_protocol_client::SignerPort; use vls_proxy::client::Client; #[derive(Clone, Debug)] @@ -132,3 +134,39 @@ impl SignerLoop { Ok(reply.reply) } } + +pub struct MqttSignerPort { + sender: mpsc::Sender, +} + +#[async_trait] +impl SignerPort for MqttSignerPort { + async fn handle_message(&self, message: Vec) -> Result> { + let reply_rx = self.send_request(message).await?; + self.get_reply(reply_rx).await + } + + fn clone(&self) -> Box { + Box::new(Self { + sender: self.sender.clone(), + }) + } +} + +impl MqttSignerPort { + pub fn new(sender: mpsc::Sender) -> Self { + Self { sender } + } + + async fn send_request(&self, message: Vec) -> Result> { + let (reply_tx, reply_rx) = oneshot::channel(); + let request = ChannelRequest { message, reply_tx }; + self.sender.send(request).await.map_err(|_| Error::Eof)?; + Ok(reply_rx) + } + + async fn get_reply(&self, reply_rx: oneshot::Receiver) -> Result> { + let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?; + Ok(reply.reply) + } +} diff --git a/tester/Cargo.toml b/tester/Cargo.toml index eb9ab26..7b767e0 100644 --- a/tester/Cargo.toml +++ b/tester/Cargo.toml @@ -9,6 +9,8 @@ sphinx-key-signer = { path = "../signer" } sphinx-key-parser = { path = "../parser" } vls-protocol = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } vls-protocol-signer = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std", default-features = false, features = ["std", "secp-lowmemory"] } +# vls-protocol = { path = "../../../evanf/validating-lightning-signer/vls-protocol" } +# vls-protocol-signer = { path = "../../../evanf/validating-lightning-signer/vls-protocol-signer", default-features = false, features = ["std", "secp-lowmemory"] } anyhow = {version = "1", features = ["backtrace"]} log = "0.4" rumqttc = "0.12.0" From e936d6784aa9a38e4f2e5a96db35b135a0a224a3 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Mon, 27 Jun 2022 17:15:26 -0700 Subject: [PATCH 2/4] copy SignerPort get_reply from vls --- broker/src/unix_fd.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index d3b9503..e81fa55 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -1,11 +1,11 @@ +use crate::{Channel, ChannelReply, ChannelRequest}; +use async_trait::async_trait; use log::*; use secp256k1::PublicKey; use sphinx_key_parser as parser; use std::thread; use tokio::sync::{mpsc, oneshot}; -// use tokio::task::spawn_blocking; -use crate::{Channel, ChannelReply, ChannelRequest}; -use async_trait::async_trait; +use tokio::task::spawn_blocking; use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_protocol_client::SignerPort; use vls_proxy::client::Client; @@ -166,7 +166,10 @@ impl MqttSignerPort { } async fn get_reply(&self, reply_rx: oneshot::Receiver) -> Result> { - let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?; + let reply = spawn_blocking(move || reply_rx.blocking_recv()) + .await + .map_err(|_| Error::Eof)? + .map_err(|_| Error::Eof)?; Ok(reply.reply) } } From b30b8fe361f77e3e1633887109d927bc4b5f1a34 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Mon, 27 Jun 2022 17:17:19 -0700 Subject: [PATCH 3/4] cleanup main func --- broker/src/main.rs | 67 +++++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/broker/src/main.rs b/broker/src/main.rs index 52470b0..1cd2f7c 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -83,40 +83,41 @@ fn main() -> anyhow::Result<()> { log::info!("NETWORK: {}", network.to_string()); if matches.is_present("test") { run_test::run_test(); - } else { - let (tx, rx) = mpsc::channel(1000); - let (status_tx, mut status_rx) = mpsc::channel(1000); - log::info!("=> start broker"); - let runtime = start_broker(rx, status_tx, "sphinx-1"); - log::info!("=> wait for connected status"); - // wait for connection = true - let status = status_rx.blocking_recv().expect("couldnt receive"); - log::info!("=> connection status: {}", status); - assert_eq!(status, true, "expected connected = true"); - // runtime.block_on(async { - init::blocking_connect(tx.clone(), network); - log::info!("=====> sent seed!"); - - if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { - let signer_port = MqttSignerPort::new(tx.clone()); - let frontend = Frontend::new( - Arc::new(SignerPortFront { - signer_port: Box::new(signer_port), - }), - Url::parse(&btc_url).expect("malformed btc rpc url"), - ); - runtime.block_on(async { - frontend.start(); - }); - } - // listen to reqs from CLN - let conn = UnixConnection::new(parent_fd); - let client = UnixClient::new(conn); - // TODO pass status_rx into SignerLoop - let mut signer_loop = SignerLoop::new(client, tx); - signer_loop.start(); - // }) + return Ok(()); } + let (tx, rx) = mpsc::channel(1000); + let (status_tx, mut status_rx) = mpsc::channel(1000); + log::info!("=> start broker"); + let runtime = start_broker(rx, status_tx, "sphinx-1"); + log::info!("=> wait for connected status"); + // wait for connection = true + let status = status_rx.blocking_recv().expect("couldnt receive"); + log::info!("=> connection status: {}", status); + assert_eq!(status, true, "expected connected = true"); + // runtime.block_on(async { + init::blocking_connect(tx.clone(), network); + log::info!("=====> sent seed!"); + + if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { + let signer_port = MqttSignerPort::new(tx.clone()); + let frontend = Frontend::new( + Arc::new(SignerPortFront { + signer_port: Box::new(signer_port), + }), + Url::parse(&btc_url).expect("malformed btc rpc url"), + ); + runtime.block_on(async { + frontend.start(); + }); + } + // listen to reqs from CLN + let conn = UnixConnection::new(parent_fd); + let client = UnixClient::new(conn); + // TODO pass status_rx into SignerLoop + let mut signer_loop = SignerLoop::new(client, tx); + signer_loop.start(); + // }) + Ok(()) } From 7b463d5553a2710c75596cc4bb3280d65ace4844 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Tue, 28 Jun 2022 08:13:34 -0700 Subject: [PATCH 4/4] async get_reply in chain tracker, split into its own module --- broker/src/chain_tracker.rs | 41 ++++++++++++++++++++++++++++++++++++ broker/src/main.rs | 4 +++- broker/src/unix_fd.rs | 42 ------------------------------------- 3 files changed, 44 insertions(+), 43 deletions(-) create mode 100644 broker/src/chain_tracker.rs diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs new file mode 100644 index 0000000..06cd663 --- /dev/null +++ b/broker/src/chain_tracker.rs @@ -0,0 +1,41 @@ +use crate::{ChannelReply, ChannelRequest}; +use async_trait::async_trait; +use tokio::sync::{mpsc, oneshot}; +use vls_protocol::{Error, Result}; +use vls_protocol_client::SignerPort; + +pub struct MqttSignerPort { + sender: mpsc::Sender, +} + +#[async_trait] +impl SignerPort for MqttSignerPort { + async fn handle_message(&self, message: Vec) -> Result> { + let reply_rx = self.send_request(message).await?; + self.get_reply(reply_rx).await + } + + fn clone(&self) -> Box { + Box::new(Self { + sender: self.sender.clone(), + }) + } +} + +impl MqttSignerPort { + pub fn new(sender: mpsc::Sender) -> Self { + Self { sender } + } + + async fn send_request(&self, message: Vec) -> Result> { + let (reply_tx, reply_rx) = oneshot::channel(); + let request = ChannelRequest { message, reply_tx }; + self.sender.send(request).await.map_err(|_| Error::Eof)?; + Ok(reply_rx) + } + + async fn get_reply(&self, reply_rx: oneshot::Receiver) -> Result> { + let reply = reply_rx.await.map_err(|_| Error::Eof)?; + Ok(reply.reply) + } +} diff --git a/broker/src/main.rs b/broker/src/main.rs index 1cd2f7c..4830fff 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,12 +1,14 @@ #![feature(once_cell)] +mod chain_tracker; mod init; mod mqtt; mod run_test; mod unix_fd; mod util; +use crate::chain_tracker::MqttSignerPort; use crate::mqtt::start_broker; -use crate::unix_fd::{MqttSignerPort, SignerLoop}; +use crate::unix_fd::SignerLoop; use bitcoin::Network; use clap::{arg, App, AppSettings, Arg}; use std::env; diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index e81fa55..4161fa5 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -1,13 +1,10 @@ use crate::{Channel, ChannelReply, ChannelRequest}; -use async_trait::async_trait; use log::*; use secp256k1::PublicKey; use sphinx_key_parser as parser; use std::thread; use tokio::sync::{mpsc, oneshot}; -use tokio::task::spawn_blocking; use vls_protocol::{msgs, msgs::Message, Error, Result}; -use vls_protocol_client::SignerPort; use vls_proxy::client::Client; #[derive(Clone, Debug)] @@ -134,42 +131,3 @@ impl SignerLoop { Ok(reply.reply) } } - -pub struct MqttSignerPort { - sender: mpsc::Sender, -} - -#[async_trait] -impl SignerPort for MqttSignerPort { - async fn handle_message(&self, message: Vec) -> Result> { - let reply_rx = self.send_request(message).await?; - self.get_reply(reply_rx).await - } - - fn clone(&self) -> Box { - Box::new(Self { - sender: self.sender.clone(), - }) - } -} - -impl MqttSignerPort { - pub fn new(sender: mpsc::Sender) -> Self { - Self { sender } - } - - async fn send_request(&self, message: Vec) -> Result> { - let (reply_tx, reply_rx) = oneshot::channel(); - let request = ChannelRequest { message, reply_tx }; - self.sender.send(request).await.map_err(|_| Error::Eof)?; - Ok(reply_rx) - } - - async fn get_reply(&self, reply_rx: oneshot::Receiver) -> Result> { - let reply = spawn_blocking(move || reply_rx.blocking_recv()) - .await - .map_err(|_| Error::Eof)? - .map_err(|_| Error::Eof)?; - Ok(reply.reply) - } -}