From 98c13041ff9d56236c7a131a990e1fa613005f6c Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Tue, 6 Jun 2023 10:55:22 -0700 Subject: [PATCH] mqtt chunk handling --- sphinx-key/src/conn/mqtt.rs | 44 ++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index aa00a44..9199e41 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -2,6 +2,7 @@ use crate::core::events::Event as CoreEvent; use sphinx_signer::sphinx_glyph::topics; use anyhow::Result; +use embedded_svc::mqtt::client::Details; use embedded_svc::mqtt::client::{Connection, Event, Message as MqttMessage, MessageImpl, QoS}; use embedded_svc::utils::mqtt::client::ConnState; // use embedded_svc::utils::mqtt::client::Connection as MqttConnection; @@ -50,6 +51,8 @@ pub fn make_client( thread::spawn(move || { info!("MQTT Listening for messages"); + let mut inflight = Vec::new(); + let mut inflight_topic = "".to_string(); loop { match connection.next() { Some(msg) => match msg { @@ -75,25 +78,50 @@ pub fn make_client( Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"), Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"), Event::Received(msg) => { - let topic_opt = msg.topic(); - log::info!("received msg details {:?}", msg.details()); - if let Some(topic) = topic_opt { + let incoming_message: Option<(String, Vec)> = match msg.details() { + Details::Complete => { + if let Some(topic) = msg.topic() { + Some((topic.to_string(), msg.data().to_vec())) + } else { + None + } + } + Details::InitialChunk(chunk_info) => { + if let Some(topic) = msg.topic() { + inflight_topic = topic.to_string(); + inflight.extend(msg.data().iter()); + None + } else { + None + } + } + Details::SubsequentChunk(chunk_data) => { + inflight.extend(msg.data().iter()); + if inflight.len() == chunk_data.total_data_size { + let ret = Some((inflight_topic, inflight)); + inflight_topic = "".to_string(); + inflight = Vec::new(); + ret + } else { + None + } + } + }; + + if let Some((topic, data)) = incoming_message { if topic.ends_with(topics::VLS) { - tx.send(CoreEvent::VlsMessage(msg.data().to_vec())) + tx.send(CoreEvent::VlsMessage(data)) .expect("couldnt send Event::VlsMessage"); } else if topic.ends_with(topics::LSS_MSG) { - let data = msg.data().to_vec(); log::info!("received data len {}", data.len()); tx.send(CoreEvent::LssMessage(data)) .expect("couldnt send Event::LssMessage"); } else if topic.ends_with(topics::CONTROL) { - tx.send(CoreEvent::Control(msg.data().to_vec())) + tx.send(CoreEvent::Control(data)) .expect("couldnt send Event::Control"); } else { log::warn!("unrecognized topic {}", topic); } - } else { - log::warn!("empty topic in msg!!!"); } } Event::Deleted(_mes_id) => info!("RECEIVED Deleted MESSAGE"),