diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs index c4ee919..4f924de 100644 --- a/broker/src/chain_tracker.rs +++ b/broker/src/chain_tracker.rs @@ -1,4 +1,4 @@ -use crate::{ChannelReply, ChannelRequest}; +use crate::{mqtt::PUB_TOPIC, ChannelReply, ChannelRequest}; use async_trait::async_trait; use rocket::tokio::sync::{mpsc, oneshot}; use vls_protocol::{Error, Result}; @@ -28,8 +28,7 @@ impl MqttSignerPort { } async fn send_request(&self, message: Vec) -> Result> { - let (reply_tx, reply_rx) = oneshot::channel(); - let request = ChannelRequest { message, reply_tx }; + let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message); self.sender.send(request).await.map_err(|_| Error::Eof)?; Ok(reply_rx) } diff --git a/broker/src/main.rs b/broker/src/main.rs index 23a1f49..3a939b2 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -31,9 +31,21 @@ pub struct Channel { /// Responses are received on the oneshot sender #[derive(Debug)] pub struct ChannelRequest { + pub topic: String, pub message: Vec, pub reply_tx: oneshot::Sender, } +impl ChannelRequest { + pub fn new(topic: &str, message: Vec) -> (Self, oneshot::Receiver) { + let (reply_tx, reply_rx) = oneshot::channel(); + let cr = ChannelRequest { + topic: topic.to_string(), + message, + reply_tx, + }; + (cr, reply_rx) + } +} // mpsc reply #[derive(Debug)] diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 72a1e0b..41acae9 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -12,8 +12,10 @@ use std::sync::Arc; use std::sync::{LazyLock, Mutex}; use std::time::Duration; +pub const PUB_TOPIC: &str = "sphinx"; +pub const CONTROL_TOPIC: &str = "sphinx-control"; const SUB_TOPIC: &str = "sphinx-return"; -const PUB_TOPIC: &str = "sphinx"; +const CONTROL_SUB_TOPIC: &str = "sphinx-control-return"; const USERNAME: &str = "sphinx-key"; const PASSWORD: &str = "sphinx-key-pass"; // must get a reply within this time, or disconnects @@ -49,7 +51,10 @@ pub async fn start_broker( let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = mpsc::channel(1000); let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap(); - link_tx.subscribe([SUB_TOPIC]).await.unwrap(); + link_tx + .subscribe([SUB_TOPIC, CONTROL_SUB_TOPIC]) + .await + .unwrap(); let router_tx = builder.router_tx(); let status_sender_ = status_sender.clone(); @@ -84,7 +89,7 @@ pub async fn start_broker( let relay_task = tokio::spawn(async move { while let Some(msg) = receiver.recv().await { link_tx - .publish(PUB_TOPIC, false, msg.message) + .publish(&msg.topic, false, msg.message) .await .expect("could not mqtt pub"); match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await { diff --git a/broker/src/routes.rs b/broker/src/routes.rs index b90f2a1..3566098 100644 --- a/broker/src/routes.rs +++ b/broker/src/routes.rs @@ -1,7 +1,7 @@ -use crate::ChannelRequest; +use crate::{mqtt::CONTROL_TOPIC, ChannelRequest}; use rocket::fairing::{Fairing, Info, Kind}; use rocket::http::Header; -use rocket::tokio::sync::{mpsc::Sender, oneshot}; +use rocket::tokio::sync::mpsc::Sender; use rocket::*; use rocket::{Request, Response}; @@ -14,8 +14,7 @@ pub async fn yo(sender: &State>, msg: &str) -> Result, connected: bool, ) -> anyhow::Result<()> { + return Ok(()); if !connected { return Ok(()); } @@ -69,12 +67,8 @@ pub async fn iteration( message: WireString("ping".as_bytes().to_vec()), }; let ping_bytes = parser::request_from_msg(ping, sequence, 0)?; - let (reply_tx, reply_rx) = oneshot::channel(); // Send a request to the MQTT handler to send to signer - let request = ChannelRequest { - message: ping_bytes, - reply_tx, - }; + let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, ping_bytes); tx.send(request).await?; println!("tx.send(request)"); let res = reply_rx.await?; diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index 8c42c45..da96a0b 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -1,3 +1,4 @@ +use crate::mqtt::PUB_TOPIC; use crate::util::Settings; use crate::{Channel, ChannelReply, ChannelRequest}; use bitcoin::blockdata::constants::ChainHash; @@ -125,10 +126,8 @@ impl SignerLoop { } fn send_request(&mut self, message: Vec) -> Result> { - // Create a one-shot channel to receive the reply - let (reply_tx, reply_rx) = oneshot::channel(); // Send a request to the MQTT handler to send to signer - let request = ChannelRequest { message, reply_tx }; + let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message); // This can fail if MQTT shuts down self.chan .sender diff --git a/parser/src/control.rs b/parser/src/control.rs index e7fa63c..6d0449f 100644 --- a/parser/src/control.rs +++ b/parser/src/control.rs @@ -38,8 +38,8 @@ impl Controller { } pub fn build_msg(&mut self, msg: ControlMessage) -> anyhow::Result> { let data = rmp_serde::to_vec(&msg)?; - let ret = nonce::build_msg(&data, &self.0, self.2)?; self.2 = self.2 + 1; + let ret = nonce::build_msg(&data, &self.0, self.2)?; Ok(ret) } pub fn build_response(&self, msg: ControlResponse) -> anyhow::Result> { @@ -54,4 +54,12 @@ impl Controller { pub fn parse_response(&self, input: &[u8]) -> anyhow::Result { Ok(rmp_serde::from_slice(input)?) } + pub fn handle(&mut self, input: &[u8]) -> anyhow::Result> { + let msg = self.parse_msg(input)?; + let res = match msg { + ControlMessage::Nonce => ControlResponse::Nonce(self.2), + _ => ControlResponse::Nonce(self.2), + }; + Ok(self.build_response(res)?) + } } diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index a176058..3ba8115 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -15,6 +15,7 @@ use std::thread; pub const VLS_TOPIC: &str = "sphinx"; pub const CONTROL_TOPIC: &str = "sphinx-control"; pub const RETURN_TOPIC: &str = "sphinx-return"; +pub const CONTROL_RETURN_TOPIC: &str = "sphinx-control-return"; pub const USERNAME: &str = "sphinx-key"; pub const PASSWORD: &str = "sphinx-key-pass"; pub const QOS: QoS = QoS::AtMostOnce; diff --git a/tester/README.md b/tester/README.md index 77e041d..82cf5a5 100644 --- a/tester/README.md +++ b/tester/README.md @@ -2,7 +2,7 @@ #### test control messages -cargo run --bin sphinx-key-tester -- --test +cargo run --bin sphinx-key-tester -- --test --log cd broker cargo run -- --test diff --git a/tester/src/ctrl.rs b/tester/src/ctrl.rs index 6303539..6bbc9a6 100644 --- a/tester/src/ctrl.rs +++ b/tester/src/ctrl.rs @@ -1,6 +1,6 @@ use dotenv::dotenv; use serde::{Deserialize, Serialize}; -use sphinx_key_parser::control::{ControlMessage, Controller}; +use sphinx_key_parser::control::{ControlMessage, ControlResponse, Controller}; use sphinx_key_signer::lightning_signer::bitcoin::Network; use std::env; use std::time::Duration; @@ -35,7 +35,10 @@ async fn main() -> anyhow::Result<()> { .await?; let response: String = res.text().await?; - println!("res {:?}", response); + let res_bytes = hex::decode(response).expect("couldnt decode response"); + + let resp = ctrl.parse_response(&res_bytes).expect("nope"); + println!("RESponse from the ESP!!! {:?}", resp); Ok(()) } diff --git a/tester/src/main.rs b/tester/src/main.rs index cd105ed..7d2630b 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -2,6 +2,7 @@ use sphinx_key_parser as parser; use sphinx_key_signer::lightning_signer::bitcoin::Network; use clap::{App, AppSettings, Arg}; +use dotenv::dotenv; use rumqttc::{self, AsyncClient, Event, MqttOptions, Packet, QoS}; use sphinx_key_signer::control::Controller; use sphinx_key_signer::vls_protocol::{model::PubKey, msgs}; @@ -13,15 +14,17 @@ use std::time::Duration; const SUB_TOPIC: &str = "sphinx"; const CONTROL_TOPIC: &str = "sphinx-control"; +const CONTROL_PUB_TOPIC: &str = "sphinx-control-return"; const PUB_TOPIC: &str = "sphinx-return"; const USERNAME: &str = "sphinx-key"; const PASSWORD: &str = "sphinx-key-pass"; -const DEV_SEED: [u8; 32] = [0; 32]; #[tokio::main(worker_threads = 1)] async fn main() -> Result<(), Box> { setup_logging("sphinx-key-tester ", "info"); + dotenv().ok(); + let app = App::new("tester") .setting(AppSettings::NoAutoVersion) .about("CLN:mqtt-tester - MQTT client signer") @@ -65,8 +68,10 @@ async fn main() -> Result<(), Box> { .await .expect("could not mqtt subscribe"); + let seed_string: String = env::var("SEED").expect("no seed"); + let seed = hex::decode(seed_string).expect("couldnt decode seed"); // make the controller to validate Control messages - let mut ctrlr = controller_from_seed(&Network::Regtest, &DEV_SEED[..]); + let mut ctrlr = controller_from_seed(&Network::Regtest, &seed); if is_test { // test handler loop @@ -97,10 +102,17 @@ async fn main() -> Result<(), Box> { .expect("could not mqtt publish"); } CONTROL_TOPIC => { - match ctrlr.parse_msg(&msg_bytes) { - Ok(msg) => { - log::info!("CONTROL MSG {:?}", msg); - // create a response and mqtt pub here + match ctrlr.handle(&msg_bytes) { + Ok(response) => { + client + .publish( + CONTROL_PUB_TOPIC, + QoS::AtMostOnce, + false, + response, + ) + .await + .expect("could not mqtt publish"); } Err(e) => log::warn!("error parsing ctrl msg {:?}", e), };