From 51659c74acf6abade95f79d2ea9f52bec522ef91 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Mon, 12 Sep 2022 10:37:04 -0700 Subject: [PATCH] central topics consts shared between modules --- broker/src/chain_tracker.rs | 5 +++-- broker/src/mqtt.rs | 7 ++----- broker/src/routes.rs | 5 +++-- broker/src/run_test.rs | 5 +++-- broker/src/unix_fd.rs | 4 ++-- parser/src/lib.rs | 1 + parser/src/topics.rs | 6 ++++++ signer/src/lib.rs | 2 +- sphinx-key/src/conn/mqtt.rs | 12 +++--------- sphinx-key/src/core/events.rs | 25 +++++++++++++------------ tester/src/main.rs | 25 +++++++++++-------------- 11 files changed, 48 insertions(+), 49 deletions(-) create mode 100644 parser/src/topics.rs diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs index 4f924de..7ed7b28 100644 --- a/broker/src/chain_tracker.rs +++ b/broker/src/chain_tracker.rs @@ -1,6 +1,7 @@ -use crate::{mqtt::PUB_TOPIC, ChannelReply, ChannelRequest}; +use crate::{ChannelReply, ChannelRequest}; use async_trait::async_trait; use rocket::tokio::sync::{mpsc, oneshot}; +use sphinx_key_parser::topics; use vls_protocol::{Error, Result}; use vls_protocol_client::SignerPort; @@ -28,7 +29,7 @@ impl MqttSignerPort { } async fn send_request(&self, message: Vec) -> Result> { - let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message); + let (request, reply_rx) = ChannelRequest::new(topics::VLS, message); self.sender.send(request).await.map_err(|_| Error::Eof)?; Ok(reply_rx) } diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 41acae9..05f9b36 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -8,14 +8,11 @@ use librumqttd::{ }; use rocket::tokio::time::timeout; use rocket::tokio::{self, sync::mpsc}; +use sphinx_key_parser::topics; use std::sync::Arc; use std::sync::{LazyLock, Mutex}; use std::time::Duration; -pub const PUB_TOPIC: &str = "sphinx"; -pub const CONTROL_TOPIC: &str = "sphinx-control"; -const SUB_TOPIC: &str = "sphinx-return"; -const CONTROL_SUB_TOPIC: &str = "sphinx-control-return"; const USERNAME: &str = "sphinx-key"; const PASSWORD: &str = "sphinx-key-pass"; // must get a reply within this time, or disconnects @@ -52,7 +49,7 @@ pub async fn start_broker( mpsc::channel(1000); let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap(); link_tx - .subscribe([SUB_TOPIC, CONTROL_SUB_TOPIC]) + .subscribe([topics::VLS_RETURN, topics::CONTROL_RETURN]) .await .unwrap(); diff --git a/broker/src/routes.rs b/broker/src/routes.rs index 3566098..d177857 100644 --- a/broker/src/routes.rs +++ b/broker/src/routes.rs @@ -1,9 +1,10 @@ -use crate::{mqtt::CONTROL_TOPIC, ChannelRequest}; +use crate::ChannelRequest; use rocket::fairing::{Fairing, Info, Kind}; use rocket::http::Header; use rocket::tokio::sync::mpsc::Sender; use rocket::*; use rocket::{Request, Response}; +use sphinx_key_parser::topics; pub type Result = std::result::Result; @@ -14,7 +15,7 @@ pub async fn yo(sender: &State>, msg: &str) -> Result SignerLoop { fn send_request(&mut self, message: Vec) -> Result> { // Send a request to the MQTT handler to send to signer - let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message); + let (request, reply_rx) = ChannelRequest::new(topics::VLS, message); // This can fail if MQTT shuts down self.chan .sender diff --git a/parser/src/lib.rs b/parser/src/lib.rs index f90df23..8f3d9fc 100644 --- a/parser/src/lib.rs +++ b/parser/src/lib.rs @@ -1,4 +1,5 @@ pub mod control; +pub mod topics; use serde::ser; use std::cmp::min; diff --git a/parser/src/topics.rs b/parser/src/topics.rs new file mode 100644 index 0000000..5e0a0bb --- /dev/null +++ b/parser/src/topics.rs @@ -0,0 +1,6 @@ +pub const VLS: &str = "sphinx"; +pub const VLS_RETURN: &str = "sphinx-return"; +pub const CONTROL: &str = "sphinx-control"; +pub const CONTROL_RETURN: &str = "sphinx-control-return"; +pub const PROXY: &str = "sphinx-proxy"; +pub const PROXY_RETURN: &str = "sphinx-proxy-return"; diff --git a/signer/src/lib.rs b/signer/src/lib.rs index a0d1d56..3cf1e81 100644 --- a/signer/src/lib.rs +++ b/signer/src/lib.rs @@ -20,7 +20,7 @@ use vls_protocol_signer::lightning_signer::wallet::Wallet; pub use vls_protocol_signer::vls_protocol; pub use derive::node_keys as derive_node_keys; -pub use sphinx_key_parser::{control, MsgDriver}; +pub use sphinx_key_parser::{control, topics, MsgDriver}; pub use sphinx_key_persister::FsPersister; pub struct InitResponse { pub root_handler: RootHandler, diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 3ba8115..aefae26 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -1,4 +1,5 @@ use crate::core::events::Event as CoreEvent; +use sphinx_key_signer::topics; use anyhow::Result; use embedded_svc::mqtt::client::utils::ConnState; @@ -12,10 +13,6 @@ use log::*; use std::sync::mpsc; use std::thread; -pub const VLS_TOPIC: &str = "sphinx"; -pub const CONTROL_TOPIC: &str = "sphinx-control"; -pub const RETURN_TOPIC: &str = "sphinx-return"; -pub const CONTROL_RETURN_TOPIC: &str = "sphinx-control-return"; pub const USERNAME: &str = "sphinx-key"; pub const PASSWORD: &str = "sphinx-key-pass"; pub const QOS: QoS = QoS::AtMostOnce; @@ -84,10 +81,10 @@ pub fn start_listening( let topic_opt = msg.topic(); if let Some(topic) = topic_opt { match topic { - VLS_TOPIC => tx + topics::VLS => tx .send(CoreEvent::VlsMessage(msg.data().to_vec())) .expect("couldnt send Event::VlsMessage"), - CONTROL_TOPIC => tx + topics::CONTROL => tx .send(CoreEvent::Control(msg.data().to_vec())) .expect("couldnt send Event::Control"), _ => log::warn!("unrecognized topic {}", topic), @@ -105,8 +102,5 @@ pub fn start_listening( //info!("MQTT connection loop exit"); }); - // log::info!("SUBSCRIBE TO {}", TOPIC); - // client.subscribe(TOPIC, QoS::AtMostOnce)?; - Ok(client) } diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index abffd02..af31462 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,8 +1,9 @@ -use crate::conn::mqtt::{CONTROL_RETURN_TOPIC, CONTROL_TOPIC, QOS, RETURN_TOPIC, VLS_TOPIC}; +use crate::conn::mqtt::QOS; use crate::core::control::{controller_from_seed, FlashPersister}; use sphinx_key_signer::control::Config; use sphinx_key_signer::lightning_signer::bitcoin::Network; +use sphinx_key_signer::topics; use sphinx_key_signer::vls_protocol::model::PubKey; use sphinx_key_signer::{self, make_init_msg, InitResponse}; use std::sync::{mpsc, Arc, Mutex}; @@ -52,10 +53,10 @@ pub fn make_event_loop( // wait for a Connection first. match event { Event::Connected => { - log::info!("SUBSCRIBE to {}", VLS_TOPIC); - mqtt.subscribe(VLS_TOPIC, QOS) + log::info!("SUBSCRIBE to {}", topics::VLS); + mqtt.subscribe(topics::VLS, QOS) .expect("could not MQTT subscribe"); - mqtt.subscribe(CONTROL_TOPIC, QOS) + mqtt.subscribe(topics::CONTROL, QOS) .expect("could not MQTT subscribe"); led_tx.send(Status::Connected).unwrap(); break; @@ -79,10 +80,10 @@ pub fn make_event_loop( while let Ok(event) = rx.recv() { match event { Event::Connected => { - log::info!("SUBSCRIBE TO {}", VLS_TOPIC); - mqtt.subscribe(VLS_TOPIC, QOS) + log::info!("SUBSCRIBE TO {}", topics::VLS); + mqtt.subscribe(topics::VLS, QOS) .expect("could not MQTT subscribe"); - mqtt.subscribe(CONTROL_TOPIC, QOS) + mqtt.subscribe(topics::CONTROL, QOS) .expect("could not MQTT subscribe"); led_tx.send(Status::Connected).unwrap(); } @@ -99,7 +100,7 @@ pub fn make_event_loop( do_log, ) { Ok(b) => { - mqtt.publish(RETURN_TOPIC, QOS, false, &b) + mqtt.publish(topics::VLS_RETURN, QOS, false, &b) .expect("could not publish VLS response"); } Err(e) => { @@ -113,7 +114,7 @@ pub fn make_event_loop( match ctrlr.handle(msg_bytes) { Ok((response, _new_policy)) => { // log::info!("CONTROL MSG {:?}", response); - mqtt.publish(CONTROL_RETURN_TOPIC, QOS, false, &response) + mqtt.publish(topics::CONTROL_RETURN, QOS, false, &response) .expect("could not publish control response"); } Err(e) => log::warn!("error parsing ctrl msg {:?}", e), @@ -139,8 +140,8 @@ pub fn make_event_loop( match event { Event::Connected => { led_tx.send(Status::ConnectedToMqtt).unwrap(); - log::info!("SUBSCRIBE TO {}", TOPIC); - mqtt.subscribe(TOPIC, QOS) + log::info!("SUBSCRIBE TO {}", topics::VLS); + mqtt.subscribe(topics::VLS, QOS) .expect("could not MQTT subscribe"); } Event::VlsMessage(msg_bytes) => { @@ -149,7 +150,7 @@ pub fn make_event_loop( if do_log { log::info!("GOT A PING MESSAGE! returning pong now..."); } - mqtt.publish(RETURN_TOPIC, QOS, false, b) + mqtt.publish(topics::VLS_RETURN, QOS, false, b) .expect("could not publish ping response"); } Event::Disconnected => { diff --git a/tester/src/main.rs b/tester/src/main.rs index 4f3f76e..07aae3b 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -1,3 +1,4 @@ +use parser::topics; use sphinx_key_parser as parser; use sphinx_key_signer::lightning_signer::bitcoin::Network; @@ -13,10 +14,6 @@ use std::error::Error; use std::str::FromStr; use std::time::Duration; -const SUB_TOPIC: &str = "sphinx"; -const CONTROL_TOPIC: &str = "sphinx-control"; -const CONTROL_PUB_TOPIC: &str = "sphinx-control-return"; -const PUB_TOPIC: &str = "sphinx-return"; const USERNAME: &str = "sphinx-key"; const PASSWORD: &str = "sphinx-key-pass"; @@ -61,11 +58,11 @@ async fn main() -> Result<(), Box> { }; client - .subscribe(SUB_TOPIC, QoS::AtMostOnce) + .subscribe(topics::VLS, QoS::AtMostOnce) .await .expect("could not mqtt subscribe"); client - .subscribe(CONTROL_TOPIC, QoS::AtMostOnce) + .subscribe(topics::CONTROL, QoS::AtMostOnce) .await .expect("could not mqtt subscribe"); @@ -83,7 +80,7 @@ async fn main() -> Result<(), Box> { // println!("{:?}", event); if let Some((topic, msg_bytes)) = incoming_bytes(event) { match topic.as_str() { - SUB_TOPIC => { + topics::VLS => { let (ping, sequence, dbid): (msgs::Ping, u16, u64) = parser::request_from_bytes(msg_bytes) .expect("read ping header"); @@ -99,16 +96,16 @@ async fn main() -> Result<(), Box> { let bytes = parser::raw_response_from_msg(pong, sequence) .expect("couldnt parse raw response"); client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes) + .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, bytes) .await .expect("could not mqtt publish"); } - CONTROL_TOPIC => { + topics::CONTROL => { match ctrlr.handle(&msg_bytes) { Ok((response, _new_policy)) => { client .publish( - CONTROL_PUB_TOPIC, + topics::CONTROL_RETURN, QoS::AtMostOnce, false, response, @@ -145,7 +142,7 @@ async fn main() -> Result<(), Box> { let dummy_peer = PubKey([0; 33]); if let Some((topic, msg_bytes)) = incoming_bytes(event) { match topic.as_str() { - SUB_TOPIC => { + topics::VLS => { match sphinx_key_signer::handle( &root_handler, msg_bytes, @@ -153,18 +150,18 @@ async fn main() -> Result<(), Box> { is_log, ) { Ok(b) => client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, b) + .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, b) .await .expect("could not publish init response"), Err(e) => panic!("HANDLE FAILED {:?}", e), }; } - CONTROL_TOPIC => { + topics::CONTROL => { match ctrlr.handle(&msg_bytes) { Ok((response, _new_policy)) => { client .publish( - CONTROL_PUB_TOPIC, + topics::CONTROL_RETURN, QoS::AtMostOnce, false, response,