From 4761ce04f87dc7b1f03e2cbd780a87d388113300 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Fri, 21 Apr 2023 14:59:22 +0200 Subject: [PATCH] mqtt topic fixes --- sphinx-key/src/conn/mqtt.rs | 18 +++++++++--------- sphinx-key/src/core/events.rs | 36 +++++++++++++++++++++++------------ sphinx-key/src/main.rs | 1 + 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 1dd9c50..4268f15 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -69,15 +69,15 @@ pub fn make_client( Event::Received(msg) => { let topic_opt = msg.topic(); if let Some(topic) = topic_opt { - match topic { - topics::VLS => tx - .send(CoreEvent::VlsMessage(msg.data().to_vec())) - .expect("couldnt send Event::VlsMessage"), - topics::CONTROL => tx - .send(CoreEvent::Control(msg.data().to_vec())) - .expect("couldnt send Event::Control"), - _ => log::warn!("unrecognized topic {}", topic), - }; + if topic.ends_with(topics::VLS) { + tx.send(CoreEvent::VlsMessage(msg.data().to_vec())) + .expect("couldnt send Event::VlsMessage"); + } else if topic.ends_with(topics::CONTROL) { + tx.send(CoreEvent::Control(msg.data().to_vec())) + .expect("couldnt send Event::Control"); + } else { + log::warn!("unrecognized topic {}", topic); + } } else { log::warn!("empty topic in msg!!!"); } diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index f162176..8670a1a 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -57,16 +57,19 @@ pub fn make_event_loop( seed: [u8; 32], policy: &Policy, mut ctrlr: Controller, + client_id: &str, ) -> Result<()> { while let Ok(event) = rx.recv() { log::info!("BROKER IP AND PORT: {}", config.broker); // wait for a Connection first. match event { Event::Connected => { - log::info!("SUBSCRIBE to {}", topics::VLS); - mqtt.subscribe(topics::VLS, QOS) + let vls_topic = format!("{}/{}", client_id, topics::VLS); + log::info!("SUBSCRIBE to {}", vls_topic); + mqtt.subscribe(&vls_topic, QOS) .expect("could not MQTT subscribe"); - mqtt.subscribe(topics::CONTROL, QOS) + let control_topic = format!("{}/{}", client_id, topics::CONTROL); + mqtt.subscribe(&control_topic, QOS) .expect("could not MQTT subscribe"); led_tx.send(Status::Connected).unwrap(); break; @@ -84,13 +87,16 @@ pub fn make_event_loop( sphinx_signer::root::init(seed, network, policy, persister).expect("failed to init signer"); // signing loop + log::info!("=> starting the main signing loop..."); while let Ok(event) = rx.recv() { match event { Event::Connected => { - log::info!("SUBSCRIBE TO {}", topics::VLS); - mqtt.subscribe(topics::VLS, QOS) + let vls_topic = format!("{}/{}", client_id, topics::VLS); + mqtt.subscribe(&vls_topic, QOS) .expect("could not MQTT subscribe"); - mqtt.subscribe(topics::CONTROL, QOS) + log::info!("SUBSCRIBE TO {}", vls_topic); + let control_topic = format!("{}/{}", client_id, topics::CONTROL); + mqtt.subscribe(&control_topic, QOS) .expect("could not MQTT subscribe"); led_tx.send(Status::Connected).unwrap(); } @@ -103,13 +109,15 @@ pub fn make_event_loop( let _ret = match sphinx_signer::root::handle(&root_handler, msg_bytes.clone(), do_log) { Ok(b) => { - mqtt.publish(topics::VLS_RETURN, QOS, false, &b) + let vls_return_topic = format!("{}/{}", client_id, topics::VLS_RETURN); + mqtt.publish(&vls_return_topic, QOS, false, &b) .expect("could not publish VLS response"); } Err(e) => { let err_msg = GlyphError::new(1, &e.to_string()); log::error!("HANDLE FAILED {:?}", e); - mqtt.publish(topics::ERROR, QOS, false, &err_msg.to_vec()[..]) + let error_topic = format!("{}/{}", client_id, topics::ERROR); + mqtt.publish(&error_topic, QOS, false, &err_msg.to_vec()[..]) .expect("could not publish VLS error"); // panic!("HANDLE FAILED {:?}", e); } @@ -123,7 +131,8 @@ pub fn make_event_loop( { let res_data = rmp_serde::to_vec_named(&res).expect("could not publish control response"); - mqtt.publish(topics::CONTROL_RETURN, QOS, false, &res_data) + let control_return_topic = format!("{}/{}", client_id, topics::CONTROL_RETURN); + mqtt.publish(&control_return_topic, QOS, false, &res_data) .expect("could not publish control response"); } } @@ -210,14 +219,16 @@ pub fn make_event_loop( _seed: [u8; 32], _policy: &Policy, mut _ctrlr: Controller, + client_id: &str, ) -> Result<()> { log::info!("About to subscribe to the mpsc channel"); while let Ok(event) = rx.recv() { match event { Event::Connected => { led_tx.send(Status::ConnectedToMqtt).unwrap(); - log::info!("SUBSCRIBE TO {}", topics::VLS); - mqtt.subscribe(topics::VLS, QOS) + let vls_topic = format!("{}/{}", client_id, topics::VLS); + log::info!("SUBSCRIBE TO {}", vls_topic); + mqtt.subscribe(&vls_topic, QOS) .expect("could not MQTT subscribe"); } Event::VlsMessage(msg_bytes) => { @@ -226,7 +237,8 @@ pub fn make_event_loop( if do_log { log::info!("GOT A PING MESSAGE! returning pong now..."); } - mqtt.publish(topics::VLS_RETURN, QOS, false, b) + let vls_return_topic = format!("{}/{}", client_id, topics::VLS_RETURN); + mqtt.publish(&vls_return_topic, QOS, false, b) .expect("could not publish ping response"); } Event::Disconnected => { diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index 51a8123..8ce3c02 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -164,6 +164,7 @@ fn make_and_launch_client( seed, policy, ctrlr, + CLIENT_ID, )?; Ok(()) }