length-value encoding for Message impl, new_from_slice

This commit is contained in:
Evan Feenstra
2022-05-25 10:46:17 -07:00
parent a3f06fb982
commit cb3897ccfc
3 changed files with 28 additions and 17 deletions

View File

@@ -1,4 +1,4 @@
use crate::core::events::{Message, MSG_SIZE};
use crate::core::events::Message;
use embedded_svc::event_bus::Postbox;
use embedded_svc::mqtt::client::utils::ConnState;
@@ -39,15 +39,6 @@ pub fn make_client(broker: &str) -> Result<(
Ok(cc)
}
fn slice_to_arr(v: &[u8]) -> [u8; MSG_SIZE] {
let mut buf = [0; MSG_SIZE];
let l = if v.len() < MSG_SIZE { v.len() } else { MSG_SIZE };
for i in 0..l {
buf[i] = v[i]
}
buf
}
pub fn start_listening(
mqtt: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>,
mut connection: MqttConnection<Condvar, MessageImpl, EspError>,
@@ -63,9 +54,10 @@ pub fn start_listening(
Err(e) => info!("MQTT Message ERROR: {}", e),
Ok(msg) => {
if let Event::Received(msg) = msg {
let d = slice_to_arr(msg.data().as_ref());
if let Err(e) = eventloop.post(&Message::new(d), None) {
warn!("failed to post to eventloop {:?}", e);
if let Ok(m) = Message::new_from_slice(&msg.data()) {
if let Err(e) = eventloop.post(&m, None) {
warn!("failed to post to eventloop {:?}", e);
}
}
info!("MQTT Message: {:?}", msg);
}

View File

@@ -9,6 +9,7 @@ use esp_idf_svc::mqtt::client::*;
use esp_idf_sys::EspError;
use std::sync::{Arc, Mutex};
use log::*;
use std::cmp::min;
pub const MSG_SIZE: usize = 256;
@@ -16,14 +17,30 @@ pub const MSG_SIZE: usize = 256;
pub struct Message([u8; MSG_SIZE]);
impl Message {
pub fn new(bytes: [u8; MSG_SIZE]) -> Self {
pub fn _new(bytes: [u8; MSG_SIZE]) -> Self {
Self(bytes)
}
// the first byte is the length of the message
pub fn new_from_slice(src: &[u8]) -> std::result::Result<Self, anyhow::Error> {
if src.len() > MSG_SIZE - 1 {
return Err(anyhow::anyhow!("message too long"));
}
let mut dest = [0; MSG_SIZE];
dest[0] = src.len() as u8;
for i in 0..min(src.len(), MSG_SIZE) {
dest[i+1] = src[i];
}
Ok(Self(dest))
}
pub fn read_bytes(&self) -> Vec<u8> {
let l = self.0[0] as usize;
self.0[1..l+1].to_vec()
}
}
impl EspTypedEventSource for Message {
fn source() -> *const c_types::c_char {
b"DEMO-SERVICE\0".as_ptr() as *const _
b"SPHINX\0".as_ptr() as *const _
}
}
@@ -54,12 +71,14 @@ pub fn make_eventloop(client: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, Esp
info!("About to subscribe to the background event loop");
let subscription = eventloop.subscribe(move |message: &Message| {
info!("!!! Got message from the event loop"); //: {:?}", message.0);
let msg = message.read_bytes();
let msg_str = String::from_utf8_lossy(&msg[..]);
match client.lock() {
Ok(mut m_) => if let Err(err) = m_.publish(
RETURN_TOPIC,
QoS::AtMostOnce,
false,
"The processed message: ***".as_bytes(),
format!("The processed message: {}", msg_str).as_bytes(),
) {
log::warn!("failed to mqtt publish! {:?}", err);
},

View File

@@ -1,2 +1,2 @@
pub mod events;
pub mod config;
pub mod config;