diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 539f18c..c2e56f0 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -8,6 +8,8 @@ default-run = "sphinx-key-broker" [dependencies] vls-protocol = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } vls-proxy = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } +vls-frontend = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } +vls-protocol-client = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } # vls-protocol = { path = "../../../evanf/validating-lightning-signer/vls-protocol" } # vls-proxy = { path = "../../../evanf/validating-lightning-signer/vls-proxy" } rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" } @@ -25,6 +27,8 @@ clap_derive = "3.2.6" chrono = "0.4" once_cell = "1.12.0" bitcoin = "0.28.1" +async-trait = "0.1" +url = { version = "2.2" } [features] default = ["std"] diff --git a/broker/src/main.rs b/broker/src/main.rs index 3d84339..52470b0 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -6,13 +6,17 @@ mod unix_fd; mod util; use crate::mqtt::start_broker; -use crate::unix_fd::SignerLoop; -use clap::{App, AppSettings, Arg, arg}; +use crate::unix_fd::{MqttSignerPort, SignerLoop}; +use bitcoin::Network; +use clap::{arg, App, AppSettings, Arg}; use std::env; +use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; +use url::Url; +use vls_frontend::Frontend; use vls_proxy::client::UnixClient; use vls_proxy::connection::{open_parent_fd, UnixConnection}; -use bitcoin::Network; +use vls_proxy::portfront::SignerPortFront; pub struct Channel { pub sequence: u16, @@ -53,9 +57,8 @@ fn main() -> anyhow::Result<()> { .help("bitcoin network") .long("network") .value_parser(["regtest", "signet", "testnet", "mainnet", "bitcoin"]) - .default_value("regtest") + .default_value("regtest"), ); - let matches = app.get_matches(); @@ -84,7 +87,7 @@ fn main() -> anyhow::Result<()> { let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); log::info!("=> start broker"); - let _runtime = start_broker(rx, status_tx, "sphinx-1"); + let runtime = start_broker(rx, status_tx, "sphinx-1"); log::info!("=> wait for connected status"); // wait for connection = true let status = status_rx.blocking_recv().expect("couldnt receive"); @@ -94,6 +97,18 @@ fn main() -> anyhow::Result<()> { init::blocking_connect(tx.clone(), network); log::info!("=====> sent seed!"); + if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { + let signer_port = MqttSignerPort::new(tx.clone()); + let frontend = Frontend::new( + Arc::new(SignerPortFront { + signer_port: Box::new(signer_port), + }), + Url::parse(&btc_url).expect("malformed btc rpc url"), + ); + runtime.block_on(async { + frontend.start(); + }); + } // listen to reqs from CLN let conn = UnixConnection::new(parent_fd); let client = UnixClient::new(conn); diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index 31fb507..d3b9503 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -5,7 +5,9 @@ use std::thread; use tokio::sync::{mpsc, oneshot}; // use tokio::task::spawn_blocking; use crate::{Channel, ChannelReply, ChannelRequest}; +use async_trait::async_trait; use vls_protocol::{msgs, msgs::Message, Error, Result}; +use vls_protocol_client::SignerPort; use vls_proxy::client::Client; #[derive(Clone, Debug)] @@ -132,3 +134,39 @@ impl SignerLoop { Ok(reply.reply) } } + +pub struct MqttSignerPort { + sender: mpsc::Sender, +} + +#[async_trait] +impl SignerPort for MqttSignerPort { + async fn handle_message(&self, message: Vec) -> Result> { + let reply_rx = self.send_request(message).await?; + self.get_reply(reply_rx).await + } + + fn clone(&self) -> Box { + Box::new(Self { + sender: self.sender.clone(), + }) + } +} + +impl MqttSignerPort { + pub fn new(sender: mpsc::Sender) -> Self { + Self { sender } + } + + async fn send_request(&self, message: Vec) -> Result> { + let (reply_tx, reply_rx) = oneshot::channel(); + let request = ChannelRequest { message, reply_tx }; + self.sender.send(request).await.map_err(|_| Error::Eof)?; + Ok(reply_rx) + } + + async fn get_reply(&self, reply_rx: oneshot::Receiver) -> Result> { + let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?; + Ok(reply.reply) + } +} diff --git a/tester/Cargo.toml b/tester/Cargo.toml index eb9ab26..7b767e0 100644 --- a/tester/Cargo.toml +++ b/tester/Cargo.toml @@ -9,6 +9,8 @@ sphinx-key-signer = { path = "../signer" } sphinx-key-parser = { path = "../parser" } vls-protocol = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } vls-protocol-signer = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std", default-features = false, features = ["std", "secp-lowmemory"] } +# vls-protocol = { path = "../../../evanf/validating-lightning-signer/vls-protocol" } +# vls-protocol-signer = { path = "../../../evanf/validating-lightning-signer/vls-protocol-signer", default-features = false, features = ["std", "secp-lowmemory"] } anyhow = {version = "1", features = ["backtrace"]} log = "0.4" rumqttc = "0.12.0"