mqtt chunk handling

This commit is contained in:
Evan Feenstra
2023-06-06 10:55:22 -07:00
parent 7947853cfd
commit 98c13041ff

View File

@@ -2,6 +2,7 @@ use crate::core::events::Event as CoreEvent;
use sphinx_signer::sphinx_glyph::topics;
use anyhow::Result;
use embedded_svc::mqtt::client::Details;
use embedded_svc::mqtt::client::{Connection, Event, Message as MqttMessage, MessageImpl, QoS};
use embedded_svc::utils::mqtt::client::ConnState;
// use embedded_svc::utils::mqtt::client::Connection as MqttConnection;
@@ -50,6 +51,8 @@ pub fn make_client(
thread::spawn(move || {
info!("MQTT Listening for messages");
let mut inflight = Vec::new();
let mut inflight_topic = "".to_string();
loop {
match connection.next() {
Some(msg) => match msg {
@@ -75,25 +78,50 @@ pub fn make_client(
Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"),
Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"),
Event::Received(msg) => {
let topic_opt = msg.topic();
log::info!("received msg details {:?}", msg.details());
if let Some(topic) = topic_opt {
let incoming_message: Option<(String, Vec<u8>)> = match msg.details() {
Details::Complete => {
if let Some(topic) = msg.topic() {
Some((topic.to_string(), msg.data().to_vec()))
} else {
None
}
}
Details::InitialChunk(chunk_info) => {
if let Some(topic) = msg.topic() {
inflight_topic = topic.to_string();
inflight.extend(msg.data().iter());
None
} else {
None
}
}
Details::SubsequentChunk(chunk_data) => {
inflight.extend(msg.data().iter());
if inflight.len() == chunk_data.total_data_size {
let ret = Some((inflight_topic, inflight));
inflight_topic = "".to_string();
inflight = Vec::new();
ret
} else {
None
}
}
};
if let Some((topic, data)) = incoming_message {
if topic.ends_with(topics::VLS) {
tx.send(CoreEvent::VlsMessage(msg.data().to_vec()))
tx.send(CoreEvent::VlsMessage(data))
.expect("couldnt send Event::VlsMessage");
} else if topic.ends_with(topics::LSS_MSG) {
let data = msg.data().to_vec();
log::info!("received data len {}", data.len());
tx.send(CoreEvent::LssMessage(data))
.expect("couldnt send Event::LssMessage");
} else if topic.ends_with(topics::CONTROL) {
tx.send(CoreEvent::Control(msg.data().to_vec()))
tx.send(CoreEvent::Control(data))
.expect("couldnt send Event::Control");
} else {
log::warn!("unrecognized topic {}", topic);
}
} else {
log::warn!("empty topic in msg!!!");
}
}
Event::Deleted(_mes_id) => info!("RECEIVED Deleted MESSAGE"),