central topics consts shared between modules

This commit is contained in:
Evan Feenstra
2022-09-12 10:37:04 -07:00
parent 6d566107f6
commit 51659c74ac
11 changed files with 48 additions and 49 deletions

View File

@@ -1,6 +1,7 @@
use crate::{mqtt::PUB_TOPIC, ChannelReply, ChannelRequest}; use crate::{ChannelReply, ChannelRequest};
use async_trait::async_trait; use async_trait::async_trait;
use rocket::tokio::sync::{mpsc, oneshot}; use rocket::tokio::sync::{mpsc, oneshot};
use sphinx_key_parser::topics;
use vls_protocol::{Error, Result}; use vls_protocol::{Error, Result};
use vls_protocol_client::SignerPort; use vls_protocol_client::SignerPort;
@@ -28,7 +29,7 @@ impl MqttSignerPort {
} }
async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> { async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message); let (request, reply_rx) = ChannelRequest::new(topics::VLS, message);
self.sender.send(request).await.map_err(|_| Error::Eof)?; self.sender.send(request).await.map_err(|_| Error::Eof)?;
Ok(reply_rx) Ok(reply_rx)
} }

View File

@@ -8,14 +8,11 @@ use librumqttd::{
}; };
use rocket::tokio::time::timeout; use rocket::tokio::time::timeout;
use rocket::tokio::{self, sync::mpsc}; use rocket::tokio::{self, sync::mpsc};
use sphinx_key_parser::topics;
use std::sync::Arc; use std::sync::Arc;
use std::sync::{LazyLock, Mutex}; use std::sync::{LazyLock, Mutex};
use std::time::Duration; use std::time::Duration;
pub const PUB_TOPIC: &str = "sphinx";
pub const CONTROL_TOPIC: &str = "sphinx-control";
const SUB_TOPIC: &str = "sphinx-return";
const CONTROL_SUB_TOPIC: &str = "sphinx-control-return";
const USERNAME: &str = "sphinx-key"; const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass"; const PASSWORD: &str = "sphinx-key-pass";
// must get a reply within this time, or disconnects // must get a reply within this time, or disconnects
@@ -52,7 +49,7 @@ pub async fn start_broker(
mpsc::channel(1000); mpsc::channel(1000);
let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap(); let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap();
link_tx link_tx
.subscribe([SUB_TOPIC, CONTROL_SUB_TOPIC]) .subscribe([topics::VLS_RETURN, topics::CONTROL_RETURN])
.await .await
.unwrap(); .unwrap();

View File

@@ -1,9 +1,10 @@
use crate::{mqtt::CONTROL_TOPIC, ChannelRequest}; use crate::ChannelRequest;
use rocket::fairing::{Fairing, Info, Kind}; use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header; use rocket::http::Header;
use rocket::tokio::sync::mpsc::Sender; use rocket::tokio::sync::mpsc::Sender;
use rocket::*; use rocket::*;
use rocket::{Request, Response}; use rocket::{Request, Response};
use sphinx_key_parser::topics;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -14,7 +15,7 @@ pub async fn yo(sender: &State<Sender<ChannelRequest>>, msg: &str) -> Result<Str
if message.len() < 65 { if message.len() < 65 {
return Err(Error::Fail); return Err(Error::Fail);
} }
let (request, reply_rx) = ChannelRequest::new(CONTROL_TOPIC, message); let (request, reply_rx) = ChannelRequest::new(topics::CONTROL, message);
// send to ESP // send to ESP
let _ = sender.send(request).await.map_err(|_| Error::Fail)?; let _ = sender.send(request).await.map_err(|_| Error::Fail)?;
// wait for reply // wait for reply

View File

@@ -1,9 +1,10 @@
use crate::mqtt::{start_broker, PUB_TOPIC}; use crate::mqtt::start_broker;
use crate::routes::launch_rocket; use crate::routes::launch_rocket;
use crate::util::Settings; use crate::util::Settings;
use crate::ChannelRequest; use crate::ChannelRequest;
use rocket::tokio::{self, sync::mpsc}; use rocket::tokio::{self, sync::mpsc};
use sphinx_key_parser as parser; use sphinx_key_parser as parser;
use sphinx_key_parser::topics;
use vls_protocol::serde_bolt::WireString; use vls_protocol::serde_bolt::WireString;
use vls_protocol::{msgs, msgs::Message}; use vls_protocol::{msgs, msgs::Message};
@@ -68,7 +69,7 @@ pub async fn iteration(
}; };
let ping_bytes = parser::request_from_msg(ping, sequence, 0)?; let ping_bytes = parser::request_from_msg(ping, sequence, 0)?;
// Send a request to the MQTT handler to send to signer // Send a request to the MQTT handler to send to signer
let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, ping_bytes); let (request, reply_rx) = ChannelRequest::new(topics::VLS, ping_bytes);
tx.send(request).await?; tx.send(request).await?;
println!("tx.send(request)"); println!("tx.send(request)");
let res = reply_rx.await?; let res = reply_rx.await?;

View File

@@ -1,4 +1,3 @@
use crate::mqtt::PUB_TOPIC;
use crate::util::Settings; use crate::util::Settings;
use crate::{Channel, ChannelReply, ChannelRequest}; use crate::{Channel, ChannelReply, ChannelRequest};
use bitcoin::blockdata::constants::ChainHash; use bitcoin::blockdata::constants::ChainHash;
@@ -6,6 +5,7 @@ use log::*;
use rocket::tokio::sync::{mpsc, oneshot}; use rocket::tokio::sync::{mpsc, oneshot};
use secp256k1::PublicKey; use secp256k1::PublicKey;
use sphinx_key_parser as parser; use sphinx_key_parser as parser;
use sphinx_key_parser::topics;
use std::thread; use std::thread;
use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_protocol::{msgs, msgs::Message, Error, Result};
use vls_proxy::client::Client; use vls_proxy::client::Client;
@@ -127,7 +127,7 @@ impl<C: 'static + Client> SignerLoop<C> {
fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> { fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
// Send a request to the MQTT handler to send to signer // Send a request to the MQTT handler to send to signer
let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message); let (request, reply_rx) = ChannelRequest::new(topics::VLS, message);
// This can fail if MQTT shuts down // This can fail if MQTT shuts down
self.chan self.chan
.sender .sender

View File

@@ -1,4 +1,5 @@
pub mod control; pub mod control;
pub mod topics;
use serde::ser; use serde::ser;
use std::cmp::min; use std::cmp::min;

6
parser/src/topics.rs Normal file
View File

@@ -0,0 +1,6 @@
pub const VLS: &str = "sphinx";
pub const VLS_RETURN: &str = "sphinx-return";
pub const CONTROL: &str = "sphinx-control";
pub const CONTROL_RETURN: &str = "sphinx-control-return";
pub const PROXY: &str = "sphinx-proxy";
pub const PROXY_RETURN: &str = "sphinx-proxy-return";

View File

@@ -20,7 +20,7 @@ use vls_protocol_signer::lightning_signer::wallet::Wallet;
pub use vls_protocol_signer::vls_protocol; pub use vls_protocol_signer::vls_protocol;
pub use derive::node_keys as derive_node_keys; pub use derive::node_keys as derive_node_keys;
pub use sphinx_key_parser::{control, MsgDriver}; pub use sphinx_key_parser::{control, topics, MsgDriver};
pub use sphinx_key_persister::FsPersister; pub use sphinx_key_persister::FsPersister;
pub struct InitResponse { pub struct InitResponse {
pub root_handler: RootHandler, pub root_handler: RootHandler,

View File

@@ -1,4 +1,5 @@
use crate::core::events::Event as CoreEvent; use crate::core::events::Event as CoreEvent;
use sphinx_key_signer::topics;
use anyhow::Result; use anyhow::Result;
use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::utils::ConnState;
@@ -12,10 +13,6 @@ use log::*;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
pub const VLS_TOPIC: &str = "sphinx";
pub const CONTROL_TOPIC: &str = "sphinx-control";
pub const RETURN_TOPIC: &str = "sphinx-return";
pub const CONTROL_RETURN_TOPIC: &str = "sphinx-control-return";
pub const USERNAME: &str = "sphinx-key"; pub const USERNAME: &str = "sphinx-key";
pub const PASSWORD: &str = "sphinx-key-pass"; pub const PASSWORD: &str = "sphinx-key-pass";
pub const QOS: QoS = QoS::AtMostOnce; pub const QOS: QoS = QoS::AtMostOnce;
@@ -84,10 +81,10 @@ pub fn start_listening(
let topic_opt = msg.topic(); let topic_opt = msg.topic();
if let Some(topic) = topic_opt { if let Some(topic) = topic_opt {
match topic { match topic {
VLS_TOPIC => tx topics::VLS => tx
.send(CoreEvent::VlsMessage(msg.data().to_vec())) .send(CoreEvent::VlsMessage(msg.data().to_vec()))
.expect("couldnt send Event::VlsMessage"), .expect("couldnt send Event::VlsMessage"),
CONTROL_TOPIC => tx topics::CONTROL => tx
.send(CoreEvent::Control(msg.data().to_vec())) .send(CoreEvent::Control(msg.data().to_vec()))
.expect("couldnt send Event::Control"), .expect("couldnt send Event::Control"),
_ => log::warn!("unrecognized topic {}", topic), _ => log::warn!("unrecognized topic {}", topic),
@@ -105,8 +102,5 @@ pub fn start_listening(
//info!("MQTT connection loop exit"); //info!("MQTT connection loop exit");
}); });
// log::info!("SUBSCRIBE TO {}", TOPIC);
// client.subscribe(TOPIC, QoS::AtMostOnce)?;
Ok(client) Ok(client)
} }

View File

@@ -1,8 +1,9 @@
use crate::conn::mqtt::{CONTROL_RETURN_TOPIC, CONTROL_TOPIC, QOS, RETURN_TOPIC, VLS_TOPIC}; use crate::conn::mqtt::QOS;
use crate::core::control::{controller_from_seed, FlashPersister}; use crate::core::control::{controller_from_seed, FlashPersister};
use sphinx_key_signer::control::Config; use sphinx_key_signer::control::Config;
use sphinx_key_signer::lightning_signer::bitcoin::Network; use sphinx_key_signer::lightning_signer::bitcoin::Network;
use sphinx_key_signer::topics;
use sphinx_key_signer::vls_protocol::model::PubKey; use sphinx_key_signer::vls_protocol::model::PubKey;
use sphinx_key_signer::{self, make_init_msg, InitResponse}; use sphinx_key_signer::{self, make_init_msg, InitResponse};
use std::sync::{mpsc, Arc, Mutex}; use std::sync::{mpsc, Arc, Mutex};
@@ -52,10 +53,10 @@ pub fn make_event_loop(
// wait for a Connection first. // wait for a Connection first.
match event { match event {
Event::Connected => { Event::Connected => {
log::info!("SUBSCRIBE to {}", VLS_TOPIC); log::info!("SUBSCRIBE to {}", topics::VLS);
mqtt.subscribe(VLS_TOPIC, QOS) mqtt.subscribe(topics::VLS, QOS)
.expect("could not MQTT subscribe"); .expect("could not MQTT subscribe");
mqtt.subscribe(CONTROL_TOPIC, QOS) mqtt.subscribe(topics::CONTROL, QOS)
.expect("could not MQTT subscribe"); .expect("could not MQTT subscribe");
led_tx.send(Status::Connected).unwrap(); led_tx.send(Status::Connected).unwrap();
break; break;
@@ -79,10 +80,10 @@ pub fn make_event_loop(
while let Ok(event) = rx.recv() { while let Ok(event) = rx.recv() {
match event { match event {
Event::Connected => { Event::Connected => {
log::info!("SUBSCRIBE TO {}", VLS_TOPIC); log::info!("SUBSCRIBE TO {}", topics::VLS);
mqtt.subscribe(VLS_TOPIC, QOS) mqtt.subscribe(topics::VLS, QOS)
.expect("could not MQTT subscribe"); .expect("could not MQTT subscribe");
mqtt.subscribe(CONTROL_TOPIC, QOS) mqtt.subscribe(topics::CONTROL, QOS)
.expect("could not MQTT subscribe"); .expect("could not MQTT subscribe");
led_tx.send(Status::Connected).unwrap(); led_tx.send(Status::Connected).unwrap();
} }
@@ -99,7 +100,7 @@ pub fn make_event_loop(
do_log, do_log,
) { ) {
Ok(b) => { Ok(b) => {
mqtt.publish(RETURN_TOPIC, QOS, false, &b) mqtt.publish(topics::VLS_RETURN, QOS, false, &b)
.expect("could not publish VLS response"); .expect("could not publish VLS response");
} }
Err(e) => { Err(e) => {
@@ -113,7 +114,7 @@ pub fn make_event_loop(
match ctrlr.handle(msg_bytes) { match ctrlr.handle(msg_bytes) {
Ok((response, _new_policy)) => { Ok((response, _new_policy)) => {
// log::info!("CONTROL MSG {:?}", response); // log::info!("CONTROL MSG {:?}", response);
mqtt.publish(CONTROL_RETURN_TOPIC, QOS, false, &response) mqtt.publish(topics::CONTROL_RETURN, QOS, false, &response)
.expect("could not publish control response"); .expect("could not publish control response");
} }
Err(e) => log::warn!("error parsing ctrl msg {:?}", e), Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
@@ -139,8 +140,8 @@ pub fn make_event_loop(
match event { match event {
Event::Connected => { Event::Connected => {
led_tx.send(Status::ConnectedToMqtt).unwrap(); led_tx.send(Status::ConnectedToMqtt).unwrap();
log::info!("SUBSCRIBE TO {}", TOPIC); log::info!("SUBSCRIBE TO {}", topics::VLS);
mqtt.subscribe(TOPIC, QOS) mqtt.subscribe(topics::VLS, QOS)
.expect("could not MQTT subscribe"); .expect("could not MQTT subscribe");
} }
Event::VlsMessage(msg_bytes) => { Event::VlsMessage(msg_bytes) => {
@@ -149,7 +150,7 @@ pub fn make_event_loop(
if do_log { if do_log {
log::info!("GOT A PING MESSAGE! returning pong now..."); log::info!("GOT A PING MESSAGE! returning pong now...");
} }
mqtt.publish(RETURN_TOPIC, QOS, false, b) mqtt.publish(topics::VLS_RETURN, QOS, false, b)
.expect("could not publish ping response"); .expect("could not publish ping response");
} }
Event::Disconnected => { Event::Disconnected => {

View File

@@ -1,3 +1,4 @@
use parser::topics;
use sphinx_key_parser as parser; use sphinx_key_parser as parser;
use sphinx_key_signer::lightning_signer::bitcoin::Network; use sphinx_key_signer::lightning_signer::bitcoin::Network;
@@ -13,10 +14,6 @@ use std::error::Error;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
const SUB_TOPIC: &str = "sphinx";
const CONTROL_TOPIC: &str = "sphinx-control";
const CONTROL_PUB_TOPIC: &str = "sphinx-control-return";
const PUB_TOPIC: &str = "sphinx-return";
const USERNAME: &str = "sphinx-key"; const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass"; const PASSWORD: &str = "sphinx-key-pass";
@@ -61,11 +58,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
}; };
client client
.subscribe(SUB_TOPIC, QoS::AtMostOnce) .subscribe(topics::VLS, QoS::AtMostOnce)
.await .await
.expect("could not mqtt subscribe"); .expect("could not mqtt subscribe");
client client
.subscribe(CONTROL_TOPIC, QoS::AtMostOnce) .subscribe(topics::CONTROL, QoS::AtMostOnce)
.await .await
.expect("could not mqtt subscribe"); .expect("could not mqtt subscribe");
@@ -83,7 +80,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// println!("{:?}", event); // println!("{:?}", event);
if let Some((topic, msg_bytes)) = incoming_bytes(event) { if let Some((topic, msg_bytes)) = incoming_bytes(event) {
match topic.as_str() { match topic.as_str() {
SUB_TOPIC => { topics::VLS => {
let (ping, sequence, dbid): (msgs::Ping, u16, u64) = let (ping, sequence, dbid): (msgs::Ping, u16, u64) =
parser::request_from_bytes(msg_bytes) parser::request_from_bytes(msg_bytes)
.expect("read ping header"); .expect("read ping header");
@@ -99,16 +96,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
let bytes = parser::raw_response_from_msg(pong, sequence) let bytes = parser::raw_response_from_msg(pong, sequence)
.expect("couldnt parse raw response"); .expect("couldnt parse raw response");
client client
.publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes) .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, bytes)
.await .await
.expect("could not mqtt publish"); .expect("could not mqtt publish");
} }
CONTROL_TOPIC => { topics::CONTROL => {
match ctrlr.handle(&msg_bytes) { match ctrlr.handle(&msg_bytes) {
Ok((response, _new_policy)) => { Ok((response, _new_policy)) => {
client client
.publish( .publish(
CONTROL_PUB_TOPIC, topics::CONTROL_RETURN,
QoS::AtMostOnce, QoS::AtMostOnce,
false, false,
response, response,
@@ -145,7 +142,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let dummy_peer = PubKey([0; 33]); let dummy_peer = PubKey([0; 33]);
if let Some((topic, msg_bytes)) = incoming_bytes(event) { if let Some((topic, msg_bytes)) = incoming_bytes(event) {
match topic.as_str() { match topic.as_str() {
SUB_TOPIC => { topics::VLS => {
match sphinx_key_signer::handle( match sphinx_key_signer::handle(
&root_handler, &root_handler,
msg_bytes, msg_bytes,
@@ -153,18 +150,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
is_log, is_log,
) { ) {
Ok(b) => client Ok(b) => client
.publish(PUB_TOPIC, QoS::AtMostOnce, false, b) .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, b)
.await .await
.expect("could not publish init response"), .expect("could not publish init response"),
Err(e) => panic!("HANDLE FAILED {:?}", e), Err(e) => panic!("HANDLE FAILED {:?}", e),
}; };
} }
CONTROL_TOPIC => { topics::CONTROL => {
match ctrlr.handle(&msg_bytes) { match ctrlr.handle(&msg_bytes) {
Ok((response, _new_policy)) => { Ok((response, _new_policy)) => {
client client
.publish( .publish(
CONTROL_PUB_TOPIC, topics::CONTROL_RETURN,
QoS::AtMostOnce, QoS::AtMostOnce,
false, false,
response, response,