diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index c8834cf..0f2fd23 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -12,7 +12,9 @@ use log::*; use std::sync::mpsc; use std::thread; -pub const TOPIC: &str = "sphinx"; +pub const VLS_TOPIC: &str = "sphinx"; +pub const CONTROL_TOPIC: &str = "sphinx-control"; +pub const OTA_TOPIC: &str = "sphinx-ota"; pub const RETURN_TOPIC: &str = "sphinx-return"; pub const USERNAME: &str = "sphinx-key"; pub const PASSWORD: &str = "sphinx-key-pass"; @@ -78,9 +80,25 @@ pub fn start_listening( Event::Subscribed(_mes_id) => info!("RECEIVED Subscribed MESSAGE"), Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"), Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"), - Event::Received(msg) => tx - .send(CoreEvent::Message(msg.data().to_vec())) - .expect("couldnt send Event::Message"), + Event::Received(msg) => { + let topic_opt = msg.topic(); + if let Some(topic) = topic_opt { + match topic { + VLS_TOPIC => tx + .send(CoreEvent::VlsMessage(msg.data().to_vec())) + .expect("couldnt send Event::VlsMessage"), + CONTROL_TOPIC => tx + .send(CoreEvent::Control(msg.data().to_vec())) + .expect("couldnt send Event::Control"), + OTA_TOPIC => tx + .send(CoreEvent::Ota(msg.data().to_vec())) + .expect("couldnt send Event::Ota"), + _ => log::warn!("unrecognized topic {}", topic), + }; + } else { + log::warn!("empty topic in msg!!!"); + } + } Event::Deleted(_mes_id) => info!("RECEIVED Deleted MESSAGE"), }, }, diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index be8b5ab..70c9569 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,4 +1,4 @@ -use crate::conn::mqtt::{QOS, RETURN_TOPIC, TOPIC}; +use crate::conn::mqtt::{CONTROL_TOPIC, OTA_TOPIC, QOS, RETURN_TOPIC, VLS_TOPIC}; use crate::core::config::Config; use crate::core::init::make_init_msg; @@ -6,7 +6,6 @@ use sphinx_key_signer::lightning_signer::bitcoin::Network; use sphinx_key_signer::vls_protocol::model::PubKey; use sphinx_key_signer::{self, InitResponse}; use std::sync::mpsc; -use std::time::SystemTime; use embedded_svc::httpd::Result; use embedded_svc::mqtt::client::utils::ConnState; @@ -19,7 +18,9 @@ use esp_idf_sys::EspError; pub enum Event { Connected, Disconnected, - Message(Vec), + VlsMessage(Vec), + Ota(Vec), + Control(Vec), } #[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] @@ -47,25 +48,20 @@ pub fn make_event_loop( ) -> Result<()> { while let Ok(event) = rx.recv() { log::info!("BROKER IP AND PORT: {}", config.broker); + // wait for a Connection first. match event { Event::Connected => { - log::info!("SUBSCRIBE to {}", TOPIC); - mqtt.subscribe(TOPIC, QOS) + log::info!("SUBSCRIBE to {}", VLS_TOPIC); + mqtt.subscribe(VLS_TOPIC, QOS) + .expect("could not MQTT subscribe"); + mqtt.subscribe(CONTROL_TOPIC, QOS) + .expect("could not MQTT subscribe"); + mqtt.subscribe(OTA_TOPIC, QOS) .expect("could not MQTT subscribe"); led_tx.send(Status::Connected).unwrap(); break; } - Event::Message(ref _msg_bytes) => { - panic!("should not be a message before connection"); - } - Event::Disconnected => { - led_tx.send(Status::ConnectingToMqtt).unwrap(); - log::info!("GOT an early Event::Disconnected msg!"); - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - log::info!("Tracking the time: {}", now.as_secs()); - } + _ => (), } } @@ -80,12 +76,20 @@ pub fn make_event_loop( while let Ok(event) = rx.recv() { match event { Event::Connected => { - log::info!("SUBSCRIBE TO {}", TOPIC); - mqtt.subscribe(TOPIC, QOS) + log::info!("SUBSCRIBE TO {}", VLS_TOPIC); + mqtt.subscribe(VLS_TOPIC, QOS) + .expect("could not MQTT subscribe"); + mqtt.subscribe(CONTROL_TOPIC, QOS) + .expect("could not MQTT subscribe"); + mqtt.subscribe(OTA_TOPIC, QOS) .expect("could not MQTT subscribe"); led_tx.send(Status::Connected).unwrap(); } - Event::Message(ref msg_bytes) => { + Event::Disconnected => { + led_tx.send(Status::ConnectingToMqtt).unwrap(); + log::info!("GOT A Event::Disconnected msg!"); + } + Event::VlsMessage(ref msg_bytes) => { led_tx.send(Status::Signing).unwrap(); let _ret = match sphinx_key_signer::handle( &root_handler, @@ -103,10 +107,8 @@ pub fn make_event_loop( } }; } - Event::Disconnected => { - led_tx.send(Status::ConnectingToMqtt).unwrap(); - log::info!("GOT A Event::Disconnected msg!"); - } + Event::Control(_) => (), + Event::Ota(_) => (), } } @@ -131,7 +133,7 @@ pub fn make_event_loop( mqtt.subscribe(TOPIC, QOS) .expect("could not MQTT subscribe"); } - Event::Message(msg_bytes) => { + Event::VlsMessage(msg_bytes) => { led_tx.send(Status::Signing).unwrap(); let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); if do_log { @@ -144,6 +146,8 @@ pub fn make_event_loop( led_tx.send(Status::ConnectingToMqtt).unwrap(); log::info!("GOT A Event::Disconnected msg!"); } + Event::Control(_) => (), + Event::Ota(_) => (), } }