diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 539f18c..c2e56f0 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -8,6 +8,8 @@ default-run = "sphinx-key-broker" [dependencies] vls-protocol = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } vls-proxy = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } +vls-frontend = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } +vls-protocol-client = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } # vls-protocol = { path = "../../../evanf/validating-lightning-signer/vls-protocol" } # vls-proxy = { path = "../../../evanf/validating-lightning-signer/vls-proxy" } rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" } @@ -25,6 +27,8 @@ clap_derive = "3.2.6" chrono = "0.4" once_cell = "1.12.0" bitcoin = "0.28.1" +async-trait = "0.1" +url = { version = "2.2" } [features] default = ["std"] diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs new file mode 100644 index 0000000..06cd663 --- /dev/null +++ b/broker/src/chain_tracker.rs @@ -0,0 +1,41 @@ +use crate::{ChannelReply, ChannelRequest}; +use async_trait::async_trait; +use tokio::sync::{mpsc, oneshot}; +use vls_protocol::{Error, Result}; +use vls_protocol_client::SignerPort; + +pub struct MqttSignerPort { + sender: mpsc::Sender, +} + +#[async_trait] +impl SignerPort for MqttSignerPort { + async fn handle_message(&self, message: Vec) -> Result> { + let reply_rx = self.send_request(message).await?; + self.get_reply(reply_rx).await + } + + fn clone(&self) -> Box { + Box::new(Self { + sender: self.sender.clone(), + }) + } +} + +impl MqttSignerPort { + pub fn new(sender: mpsc::Sender) -> Self { + Self { sender } + } + + async fn send_request(&self, message: Vec) -> Result> { + let (reply_tx, reply_rx) = oneshot::channel(); + let request = ChannelRequest { message, reply_tx }; + self.sender.send(request).await.map_err(|_| Error::Eof)?; + Ok(reply_rx) + } + + async fn get_reply(&self, reply_rx: oneshot::Receiver) -> Result> { + let reply = reply_rx.await.map_err(|_| Error::Eof)?; + Ok(reply.reply) + } +} diff --git a/broker/src/main.rs b/broker/src/main.rs index 3d84339..4830fff 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,18 +1,24 @@ #![feature(once_cell)] +mod chain_tracker; mod init; mod mqtt; mod run_test; mod unix_fd; mod util; +use crate::chain_tracker::MqttSignerPort; use crate::mqtt::start_broker; use crate::unix_fd::SignerLoop; -use clap::{App, AppSettings, Arg, arg}; +use bitcoin::Network; +use clap::{arg, App, AppSettings, Arg}; use std::env; +use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; +use url::Url; +use vls_frontend::Frontend; use vls_proxy::client::UnixClient; use vls_proxy::connection::{open_parent_fd, UnixConnection}; -use bitcoin::Network; +use vls_proxy::portfront::SignerPortFront; pub struct Channel { pub sequence: u16, @@ -53,9 +59,8 @@ fn main() -> anyhow::Result<()> { .help("bitcoin network") .long("network") .value_parser(["regtest", "signet", "testnet", "mainnet", "bitcoin"]) - .default_value("regtest") + .default_value("regtest"), ); - let matches = app.get_matches(); @@ -80,28 +85,41 @@ fn main() -> anyhow::Result<()> { log::info!("NETWORK: {}", network.to_string()); if matches.is_present("test") { run_test::run_test(); - } else { - let (tx, rx) = mpsc::channel(1000); - let (status_tx, mut status_rx) = mpsc::channel(1000); - log::info!("=> start broker"); - let _runtime = start_broker(rx, status_tx, "sphinx-1"); - log::info!("=> wait for connected status"); - // wait for connection = true - let status = status_rx.blocking_recv().expect("couldnt receive"); - log::info!("=> connection status: {}", status); - assert_eq!(status, true, "expected connected = true"); - // runtime.block_on(async { - init::blocking_connect(tx.clone(), network); - log::info!("=====> sent seed!"); - - // listen to reqs from CLN - let conn = UnixConnection::new(parent_fd); - let client = UnixClient::new(conn); - // TODO pass status_rx into SignerLoop - let mut signer_loop = SignerLoop::new(client, tx); - signer_loop.start(); - // }) + return Ok(()); } + let (tx, rx) = mpsc::channel(1000); + let (status_tx, mut status_rx) = mpsc::channel(1000); + log::info!("=> start broker"); + let runtime = start_broker(rx, status_tx, "sphinx-1"); + log::info!("=> wait for connected status"); + // wait for connection = true + let status = status_rx.blocking_recv().expect("couldnt receive"); + log::info!("=> connection status: {}", status); + assert_eq!(status, true, "expected connected = true"); + // runtime.block_on(async { + init::blocking_connect(tx.clone(), network); + log::info!("=====> sent seed!"); + + if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { + let signer_port = MqttSignerPort::new(tx.clone()); + let frontend = Frontend::new( + Arc::new(SignerPortFront { + signer_port: Box::new(signer_port), + }), + Url::parse(&btc_url).expect("malformed btc rpc url"), + ); + runtime.block_on(async { + frontend.start(); + }); + } + // listen to reqs from CLN + let conn = UnixConnection::new(parent_fd); + let client = UnixClient::new(conn); + // TODO pass status_rx into SignerLoop + let mut signer_loop = SignerLoop::new(client, tx); + signer_loop.start(); + // }) + Ok(()) } diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index 31fb507..4161fa5 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -1,10 +1,9 @@ +use crate::{Channel, ChannelReply, ChannelRequest}; use log::*; use secp256k1::PublicKey; use sphinx_key_parser as parser; use std::thread; use tokio::sync::{mpsc, oneshot}; -// use tokio::task::spawn_blocking; -use crate::{Channel, ChannelReply, ChannelRequest}; use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_proxy::client::Client; diff --git a/tester/Cargo.toml b/tester/Cargo.toml index eb9ab26..7b767e0 100644 --- a/tester/Cargo.toml +++ b/tester/Cargo.toml @@ -9,6 +9,8 @@ sphinx-key-signer = { path = "../signer" } sphinx-key-parser = { path = "../parser" } vls-protocol = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std" } vls-protocol-signer = { git = "https://gitlab.com/Evanfeenstra/validating-lightning-signer", branch = "partial-std", default-features = false, features = ["std", "secp-lowmemory"] } +# vls-protocol = { path = "../../../evanf/validating-lightning-signer/vls-protocol" } +# vls-protocol-signer = { path = "../../../evanf/validating-lightning-signer/vls-protocol-signer", default-features = false, features = ["std", "secp-lowmemory"] } anyhow = {version = "1", features = ["backtrace"]} log = "0.4" rumqttc = "0.12.0"