mqtt topic fixes

This commit is contained in:
Evan Feenstra
2023-04-21 14:59:22 +02:00
parent 8d22e12267
commit 4761ce04f8
3 changed files with 34 additions and 21 deletions

View File

@@ -69,15 +69,15 @@ pub fn make_client(
Event::Received(msg) => {
let topic_opt = msg.topic();
if let Some(topic) = topic_opt {
match topic {
topics::VLS => tx
.send(CoreEvent::VlsMessage(msg.data().to_vec()))
.expect("couldnt send Event::VlsMessage"),
topics::CONTROL => tx
.send(CoreEvent::Control(msg.data().to_vec()))
.expect("couldnt send Event::Control"),
_ => log::warn!("unrecognized topic {}", topic),
};
if topic.ends_with(topics::VLS) {
tx.send(CoreEvent::VlsMessage(msg.data().to_vec()))
.expect("couldnt send Event::VlsMessage");
} else if topic.ends_with(topics::CONTROL) {
tx.send(CoreEvent::Control(msg.data().to_vec()))
.expect("couldnt send Event::Control");
} else {
log::warn!("unrecognized topic {}", topic);
}
} else {
log::warn!("empty topic in msg!!!");
}

View File

@@ -57,16 +57,19 @@ pub fn make_event_loop(
seed: [u8; 32],
policy: &Policy,
mut ctrlr: Controller,
client_id: &str,
) -> Result<()> {
while let Ok(event) = rx.recv() {
log::info!("BROKER IP AND PORT: {}", config.broker);
// wait for a Connection first.
match event {
Event::Connected => {
log::info!("SUBSCRIBE to {}", topics::VLS);
mqtt.subscribe(topics::VLS, QOS)
let vls_topic = format!("{}/{}", client_id, topics::VLS);
log::info!("SUBSCRIBE to {}", vls_topic);
mqtt.subscribe(&vls_topic, QOS)
.expect("could not MQTT subscribe");
mqtt.subscribe(topics::CONTROL, QOS)
let control_topic = format!("{}/{}", client_id, topics::CONTROL);
mqtt.subscribe(&control_topic, QOS)
.expect("could not MQTT subscribe");
led_tx.send(Status::Connected).unwrap();
break;
@@ -84,13 +87,16 @@ pub fn make_event_loop(
sphinx_signer::root::init(seed, network, policy, persister).expect("failed to init signer");
// signing loop
log::info!("=> starting the main signing loop...");
while let Ok(event) = rx.recv() {
match event {
Event::Connected => {
log::info!("SUBSCRIBE TO {}", topics::VLS);
mqtt.subscribe(topics::VLS, QOS)
let vls_topic = format!("{}/{}", client_id, topics::VLS);
mqtt.subscribe(&vls_topic, QOS)
.expect("could not MQTT subscribe");
mqtt.subscribe(topics::CONTROL, QOS)
log::info!("SUBSCRIBE TO {}", vls_topic);
let control_topic = format!("{}/{}", client_id, topics::CONTROL);
mqtt.subscribe(&control_topic, QOS)
.expect("could not MQTT subscribe");
led_tx.send(Status::Connected).unwrap();
}
@@ -103,13 +109,15 @@ pub fn make_event_loop(
let _ret =
match sphinx_signer::root::handle(&root_handler, msg_bytes.clone(), do_log) {
Ok(b) => {
mqtt.publish(topics::VLS_RETURN, QOS, false, &b)
let vls_return_topic = format!("{}/{}", client_id, topics::VLS_RETURN);
mqtt.publish(&vls_return_topic, QOS, false, &b)
.expect("could not publish VLS response");
}
Err(e) => {
let err_msg = GlyphError::new(1, &e.to_string());
log::error!("HANDLE FAILED {:?}", e);
mqtt.publish(topics::ERROR, QOS, false, &err_msg.to_vec()[..])
let error_topic = format!("{}/{}", client_id, topics::ERROR);
mqtt.publish(&error_topic, QOS, false, &err_msg.to_vec()[..])
.expect("could not publish VLS error");
// panic!("HANDLE FAILED {:?}", e);
}
@@ -123,7 +131,8 @@ pub fn make_event_loop(
{
let res_data =
rmp_serde::to_vec_named(&res).expect("could not publish control response");
mqtt.publish(topics::CONTROL_RETURN, QOS, false, &res_data)
let control_return_topic = format!("{}/{}", client_id, topics::CONTROL_RETURN);
mqtt.publish(&control_return_topic, QOS, false, &res_data)
.expect("could not publish control response");
}
}
@@ -210,14 +219,16 @@ pub fn make_event_loop(
_seed: [u8; 32],
_policy: &Policy,
mut _ctrlr: Controller,
client_id: &str,
) -> Result<()> {
log::info!("About to subscribe to the mpsc channel");
while let Ok(event) = rx.recv() {
match event {
Event::Connected => {
led_tx.send(Status::ConnectedToMqtt).unwrap();
log::info!("SUBSCRIBE TO {}", topics::VLS);
mqtt.subscribe(topics::VLS, QOS)
let vls_topic = format!("{}/{}", client_id, topics::VLS);
log::info!("SUBSCRIBE TO {}", vls_topic);
mqtt.subscribe(&vls_topic, QOS)
.expect("could not MQTT subscribe");
}
Event::VlsMessage(msg_bytes) => {
@@ -226,7 +237,8 @@ pub fn make_event_loop(
if do_log {
log::info!("GOT A PING MESSAGE! returning pong now...");
}
mqtt.publish(topics::VLS_RETURN, QOS, false, b)
let vls_return_topic = format!("{}/{}", client_id, topics::VLS_RETURN);
mqtt.publish(&vls_return_topic, QOS, false, b)
.expect("could not publish ping response");
}
Event::Disconnected => {

View File

@@ -164,6 +164,7 @@ fn make_and_launch_client(
seed,
policy,
ctrlr,
CLIENT_ID,
)?;
Ok(())
}