try mqtt pub from background eventlopp

This commit is contained in:
Evan Feenstra
2022-05-24 09:15:28 -07:00
parent bcf936344c
commit 42d7c2f857
3 changed files with 70 additions and 3 deletions

View File

@@ -12,6 +12,61 @@ use std::thread;
use esp_idf_sys::{self};
use esp_idf_sys::EspError;
pub fn make_client(broker: &str) -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> {
let conf = MqttClientConfiguration {
client_id: Some("rust-esp32-std-demo-1"),
// FIXME - mqtts
// crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
..Default::default()
};
let b = format!("mqtt://{}", broker);
println!("===> CONNECT TO {}", b);
let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?;
info!("MQTT client started");
Ok(client)
}
pub fn start_listening(client: &EspMqttClient<ConnState<MessageImpl, EspError>>) -> Result<()> {
// Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work
// Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to
// spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide.
// Yet, you still need to efficiently process each message in the callback without blocking for too long.
thread::spawn(move || {
info!("MQTT Listening for messages");
while let Some(msg) = connection.next() {
match msg {
Err(e) => info!("MQTT Message ERROR: {}", e),
Ok(msg) => {
eventloop.post(&Message::new([0; 256]), None).unwrap();
info!("MQTT Message: {:?}", msg);
},
}
}
info!("MQTT connection loop exit");
});
client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?;
info!("Subscribed to all topics (rust-esp32-std-demo)");
client.publish(
"rust-esp32-std-demo",
QoS::AtMostOnce,
false,
"Hello from rust-esp32-std-demo!".as_bytes(),
)?;
info!("Published a hello message to topic \"rust-esp32-std-demo\"");
Ok(())
}
// this one is both together
pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> {
info!("About to start MQTT client");

View File

@@ -2,6 +2,11 @@
use esp_idf_svc::eventloop::*;
use embedded_svc::httpd::Result;
use esp_idf_sys::{self, c_types};
use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS};
use esp_idf_svc::mqtt::client::*;
use esp_idf_sys::EspError;
use std::sync::Arc;
use log::*;
const MSG_SIZE: usize = 256;
@@ -39,7 +44,7 @@ impl EspTypedEventDeserializer<Message> for Message {
}
}
pub fn make_eventloop() -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> {
pub fn make_eventloop(client: &EspMqttClient<ConnState<MessageImpl, EspError>>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> {
use embedded_svc::event_bus::EventBus;
info!("About to start a background event loop");
@@ -48,6 +53,12 @@ pub fn make_eventloop() -> Result<(EspBackgroundEventLoop, EspBackgroundSubscrip
info!("About to subscribe to the background event loop");
let subscription = eventloop.subscribe(|message: &Message| {
info!("!!! Got message from the event loop"); //: {:?}", message.0);
let _ = client.publish(
"rust-esp32-std-demo-return",
QoS::AtMostOnce,
false,
"Hello from rust-esp32-std-demo!".as_bytes(),
);
})?;
// let subscription = eventloop.subscribe(cb)?;

View File

@@ -15,7 +15,6 @@ use esp_idf_svc::nvs::*;
use esp_idf_svc::nvs_storage::EspNvsStorage;
use embedded_svc::storage::Storage;
use embedded_svc::wifi::Wifi;
use embedded_svc::event_bus::EventBus;
fn main() -> Result<()> {
// Temporary. Will disappear once ESP-IDF 4.4 is released, but for now it is necessary to call this function once,
@@ -35,9 +34,11 @@ fn main() -> Result<()> {
println!("=============> START CLIENT NOW <============== {:?}", exist);
// store.remove("config").expect("couldnt remove config");
let wifi = start_client(default_nvs.clone(), &exist)?;
let mqtt = conn::mqtt::make_client(&exist.broker)?;
// if the subscription goes out of scope its dropped
// the sub needs to publish back to mqtt???
let (eventloop, _sub) = make_eventloop()?;
let (eventloop, _sub) = make_eventloop(&mqtt)?;
let mqtt_client = conn::mqtt::mqtt_client(&exist.broker, eventloop)?;
println!("{:?}", wifi.get_status());