Merge pull request #58 from stakwork/event-msgs

Event msgs
This commit is contained in:
Evan Feenstra
2022-09-01 09:38:17 -07:00
committed by GitHub
2 changed files with 50 additions and 28 deletions

View File

@@ -12,7 +12,9 @@ use log::*;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
pub const TOPIC: &str = "sphinx"; pub const VLS_TOPIC: &str = "sphinx";
pub const CONTROL_TOPIC: &str = "sphinx-control";
pub const OTA_TOPIC: &str = "sphinx-ota";
pub const RETURN_TOPIC: &str = "sphinx-return"; pub const RETURN_TOPIC: &str = "sphinx-return";
pub const USERNAME: &str = "sphinx-key"; pub const USERNAME: &str = "sphinx-key";
pub const PASSWORD: &str = "sphinx-key-pass"; pub const PASSWORD: &str = "sphinx-key-pass";
@@ -78,9 +80,25 @@ pub fn start_listening(
Event::Subscribed(_mes_id) => info!("RECEIVED Subscribed MESSAGE"), Event::Subscribed(_mes_id) => info!("RECEIVED Subscribed MESSAGE"),
Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"), Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"),
Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"), Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"),
Event::Received(msg) => tx Event::Received(msg) => {
.send(CoreEvent::Message(msg.data().to_vec())) let topic_opt = msg.topic();
.expect("couldnt send Event::Message"), if let Some(topic) = topic_opt {
match topic {
VLS_TOPIC => tx
.send(CoreEvent::VlsMessage(msg.data().to_vec()))
.expect("couldnt send Event::VlsMessage"),
CONTROL_TOPIC => tx
.send(CoreEvent::Control(msg.data().to_vec()))
.expect("couldnt send Event::Control"),
OTA_TOPIC => tx
.send(CoreEvent::Ota(msg.data().to_vec()))
.expect("couldnt send Event::Ota"),
_ => log::warn!("unrecognized topic {}", topic),
};
} else {
log::warn!("empty topic in msg!!!");
}
}
Event::Deleted(_mes_id) => info!("RECEIVED Deleted MESSAGE"), Event::Deleted(_mes_id) => info!("RECEIVED Deleted MESSAGE"),
}, },
}, },

View File

@@ -1,4 +1,4 @@
use crate::conn::mqtt::{QOS, RETURN_TOPIC, TOPIC}; use crate::conn::mqtt::{CONTROL_TOPIC, OTA_TOPIC, QOS, RETURN_TOPIC, VLS_TOPIC};
use crate::core::config::Config; use crate::core::config::Config;
use crate::core::init::make_init_msg; use crate::core::init::make_init_msg;
@@ -6,7 +6,6 @@ use sphinx_key_signer::lightning_signer::bitcoin::Network;
use sphinx_key_signer::vls_protocol::model::PubKey; use sphinx_key_signer::vls_protocol::model::PubKey;
use sphinx_key_signer::{self, InitResponse}; use sphinx_key_signer::{self, InitResponse};
use std::sync::mpsc; use std::sync::mpsc;
use std::time::SystemTime;
use embedded_svc::httpd::Result; use embedded_svc::httpd::Result;
use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::utils::ConnState;
@@ -19,7 +18,9 @@ use esp_idf_sys::EspError;
pub enum Event { pub enum Event {
Connected, Connected,
Disconnected, Disconnected,
Message(Vec<u8>), VlsMessage(Vec<u8>),
Ota(Vec<u8>),
Control(Vec<u8>),
} }
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] #[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
@@ -47,25 +48,20 @@ pub fn make_event_loop(
) -> Result<()> { ) -> Result<()> {
while let Ok(event) = rx.recv() { while let Ok(event) = rx.recv() {
log::info!("BROKER IP AND PORT: {}", config.broker); log::info!("BROKER IP AND PORT: {}", config.broker);
// wait for a Connection first.
match event { match event {
Event::Connected => { Event::Connected => {
log::info!("SUBSCRIBE to {}", TOPIC); log::info!("SUBSCRIBE to {}", VLS_TOPIC);
mqtt.subscribe(TOPIC, QOS) mqtt.subscribe(VLS_TOPIC, QOS)
.expect("could not MQTT subscribe");
mqtt.subscribe(CONTROL_TOPIC, QOS)
.expect("could not MQTT subscribe");
mqtt.subscribe(OTA_TOPIC, QOS)
.expect("could not MQTT subscribe"); .expect("could not MQTT subscribe");
led_tx.send(Status::Connected).unwrap(); led_tx.send(Status::Connected).unwrap();
break; break;
} }
Event::Message(ref _msg_bytes) => { _ => (),
panic!("should not be a message before connection");
}
Event::Disconnected => {
led_tx.send(Status::ConnectingToMqtt).unwrap();
log::info!("GOT an early Event::Disconnected msg!");
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
log::info!("Tracking the time: {}", now.as_secs());
}
} }
} }
@@ -80,12 +76,20 @@ pub fn make_event_loop(
while let Ok(event) = rx.recv() { while let Ok(event) = rx.recv() {
match event { match event {
Event::Connected => { Event::Connected => {
log::info!("SUBSCRIBE TO {}", TOPIC); log::info!("SUBSCRIBE TO {}", VLS_TOPIC);
mqtt.subscribe(TOPIC, QOS) mqtt.subscribe(VLS_TOPIC, QOS)
.expect("could not MQTT subscribe");
mqtt.subscribe(CONTROL_TOPIC, QOS)
.expect("could not MQTT subscribe");
mqtt.subscribe(OTA_TOPIC, QOS)
.expect("could not MQTT subscribe"); .expect("could not MQTT subscribe");
led_tx.send(Status::Connected).unwrap(); led_tx.send(Status::Connected).unwrap();
} }
Event::Message(ref msg_bytes) => { Event::Disconnected => {
led_tx.send(Status::ConnectingToMqtt).unwrap();
log::info!("GOT A Event::Disconnected msg!");
}
Event::VlsMessage(ref msg_bytes) => {
led_tx.send(Status::Signing).unwrap(); led_tx.send(Status::Signing).unwrap();
let _ret = match sphinx_key_signer::handle( let _ret = match sphinx_key_signer::handle(
&root_handler, &root_handler,
@@ -103,10 +107,8 @@ pub fn make_event_loop(
} }
}; };
} }
Event::Disconnected => { Event::Control(_) => (),
led_tx.send(Status::ConnectingToMqtt).unwrap(); Event::Ota(_) => (),
log::info!("GOT A Event::Disconnected msg!");
}
} }
} }
@@ -131,7 +133,7 @@ pub fn make_event_loop(
mqtt.subscribe(TOPIC, QOS) mqtt.subscribe(TOPIC, QOS)
.expect("could not MQTT subscribe"); .expect("could not MQTT subscribe");
} }
Event::Message(msg_bytes) => { Event::VlsMessage(msg_bytes) => {
led_tx.send(Status::Signing).unwrap(); led_tx.send(Status::Signing).unwrap();
let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes);
if do_log { if do_log {
@@ -144,6 +146,8 @@ pub fn make_event_loop(
led_tx.send(Status::ConnectingToMqtt).unwrap(); led_tx.send(Status::ConnectingToMqtt).unwrap();
log::info!("GOT A Event::Disconnected msg!"); log::info!("GOT A Event::Disconnected msg!");
} }
Event::Control(_) => (),
Event::Ota(_) => (),
} }
} }