mqtt ping back working

This commit is contained in:
Evan Feenstra
2022-05-24 21:34:43 -07:00
parent e592394230
commit dda11135be
5 changed files with 46 additions and 94 deletions

View File

@@ -1,13 +1,12 @@
use crate::core::events::Message;
use embedded_svc::event_bus::Postbox;
use embedded_svc::event_bus::EventBus;
use embedded_svc::event_bus::Postbox;
use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS};
use embedded_svc::mqtt::client::utils::Connection as MqttConnection;
use esp_idf_svc::mqtt::client::*;
use anyhow::Result;
use esp_idf_svc::eventloop::*;
use esp_idf_svc::eventloop::EspBackgroundEventLoop;
use log::*;
use std::thread;
use esp_idf_sys::{self};
@@ -15,9 +14,16 @@ use esp_idf_sys::EspError;
use esp_idf_hal::mutex::Condvar;
use std::sync::{Arc, Mutex};
pub fn make_client(broker: &str) -> Result<(EspMqttClient<ConnState<MessageImpl, EspError>>, MqttConnection<Condvar, MessageImpl, EspError>)> {
pub const TOPIC: &str = "sphinx";
pub const RETURN_TOPIC: &str = "sphinx-return";
pub const CLIENT_ID: &str = "sphinx-1";
pub fn make_client(broker: &str) -> Result<(
EspMqttClient<ConnState<MessageImpl, EspError>>,
MqttConnection<Condvar, MessageImpl, EspError>
)> {
let conf = MqttClientConfiguration {
client_id: Some("rust-esp32-std-demo-1"),
client_id: Some(CLIENT_ID),
// FIXME - mqtts
// crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
..Default::default()
@@ -33,11 +39,13 @@ pub fn make_client(broker: &str) -> Result<(EspMqttClient<ConnState<MessageImpl,
Ok(cc)
}
pub fn start_listening(mqtt: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>, mut connection: MqttConnection<Condvar, MessageImpl, EspError>, mut eventloop: EspBackgroundEventLoop) -> Result<()> {
// Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work
// Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to
// spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide.
// Yet, you still need to efficiently process each message in the callback without blocking for too long.
pub fn start_listening(
mqtt: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>,
mut connection: MqttConnection<Condvar, MessageImpl, EspError>,
mut eventloop: EspBackgroundEventLoop
) -> Result<()> {
// must start pumping before subscribe or publish will work
thread::spawn(move || {
info!("MQTT Listening for messages");
@@ -45,86 +53,26 @@ pub fn start_listening(mqtt: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspE
match msg {
Err(e) => info!("MQTT Message ERROR: {}", e),
Ok(msg) => {
eventloop.post(&Message::new([0; 256]), None).unwrap();
if let Err(e) = eventloop.post(&Message::new([0; 256]), None) {
warn!("failed to post to eventloop {:?}", e);
}
info!("MQTT Message: {:?}", msg);
},
}
}
info!("MQTT connection loop exit");
});
let mut client = mqtt.lock().unwrap();
client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?;
info!("Subscribed to all topics (rust-esp32-std-demo)");
client.subscribe(TOPIC, QoS::AtMostOnce)?;
client.publish(
"rust-esp32-std-demo",
TOPIC,
QoS::AtMostOnce,
false,
"Hello from rust-esp32-std-demo!".as_bytes(),
format!("Hello from {}!", CLIENT_ID).as_bytes(),
)?;
info!("Published a hello message to topic \"rust-esp32-std-demo\"");
Ok(())
}
// this one is both together
pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> {
info!("About to start MQTT client");
let conf = MqttClientConfiguration {
client_id: Some("rust-esp32-std-demo-1"),
// FIXME - mqtts
// crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
..Default::default()
};
let b = format!("mqtt://{}", broker);
println!("===> CONNECT TO {}", b);
let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?;
info!("MQTT client started");
// let subscription = eventloop.subscribe(|message: &Message| {
// log::info!("!!! Got message from the event loop"); //: {:?}", message.0);
// })?;
// Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work
// Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to
// spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide.
// Yet, you still need to efficiently process each message in the callback without blocking for too long.
thread::spawn(move || {
info!("MQTT Listening for messages");
while let Some(msg) = connection.next() {
match msg {
Err(e) => info!("MQTT Message ERROR: {}", e),
Ok(msg) => {
eventloop.post(&Message::new([0; 256]), None).unwrap();
info!("MQTT Message: {:?}", msg);
},
}
}
info!("MQTT connection loop exit");
});
client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?;
info!("Subscribed to all topics (rust-esp32-std-demo)");
client.publish(
"rust-esp32-std-demo",
QoS::AtMostOnce,
false,
"Hello from rust-esp32-std-demo!".as_bytes(),
)?;
info!("Published a hello message to topic \"rust-esp32-std-demo\"");
Ok(client)
}

View File

@@ -16,7 +16,6 @@ use std::time::Duration;
use std::sync::Arc;
use std::thread;
#[allow(dead_code)]
pub fn start_client(
default_nvs: Arc<EspDefaultNvs>,
config: &Config,
@@ -73,8 +72,7 @@ pub fn start_client(
Ok(wifi)
}
#[allow(dead_code)]
pub fn start_server(
pub fn start_access_point(
default_nvs: Arc<EspDefaultNvs>,
) -> Result<Box<EspWifi>> {
let netif_stack = Arc::new(EspNetifStack::new()?);

View File

@@ -24,7 +24,7 @@ arp -a
http://192.168.71.1/?broker=52.91.253.115%3A1883
*/
pub fn start_client(default_nvs: Arc<EspDefaultNvs>, config: &Config) -> Result<Box<EspWifi>> {
pub fn start_wifi_client(default_nvs: Arc<EspDefaultNvs>, config: &Config) -> Result<Box<EspWifi>> {
let wifi = conn::wifi::start_client(
default_nvs,
config
@@ -33,13 +33,13 @@ pub fn start_client(default_nvs: Arc<EspDefaultNvs>, config: &Config) -> Result
Ok(wifi)
}
pub fn start_server_and_wait(default_nvs: Arc<EspDefaultNvs>) -> Result<(Box<EspWifi>, Config)> {
pub fn start_config_server_and_wait(default_nvs: Arc<EspDefaultNvs>) -> Result<(Box<EspWifi>, Config)> {
let mutex = Arc::new((Mutex::new(None), Condvar::new()));
#[allow(clippy::redundant_clone)]
#[allow(unused_mut)]
let mut wifi = conn::wifi::start_server(
let mut wifi = conn::wifi::start_access_point(
default_nvs.clone(),
)?;

View File

@@ -1,15 +1,16 @@
use crate::conn::mqtt::RETURN_TOPIC;
use esp_idf_svc::eventloop::*;
use embedded_svc::httpd::Result;
use esp_idf_sys::{self, c_types};
use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS};
use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS};
use esp_idf_svc::mqtt::client::*;
use esp_idf_sys::EspError;
use std::sync::{Arc, Mutex};
use log::*;
const MSG_SIZE: usize = 256;
pub const MSG_SIZE: usize = 256;
#[derive(Copy, Clone, Debug)]
pub struct Message([u8; MSG_SIZE]);
@@ -53,13 +54,18 @@ pub fn make_eventloop(client: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, Esp
info!("About to subscribe to the background event loop");
let subscription = eventloop.subscribe(move |message: &Message| {
info!("!!! Got message from the event loop"); //: {:?}", message.0);
let mut mqtt_ = client.lock().unwrap();
let _ = mqtt_.publish(
"rust-esp32-std-demo-return",
QoS::AtMostOnce,
false,
"Hello from rust-esp32-std-demo!".as_bytes(),
);
match client.lock() {
Ok(mut m_) => if let Err(err) = m_.publish(
RETURN_TOPIC,
QoS::AtMostOnce,
false,
"The processed message: ***".as_bytes(),
) {
log::warn!("failed to mqtt publish! {:?}", err);
},
Err(_) => log::warn!("failed to lock Mutex<Client>")
};
})?;
// let subscription = eventloop.subscribe(cb)?;

View File

@@ -33,7 +33,7 @@ fn main() -> Result<()> {
if let Some(exist) = existing {
println!("=============> START CLIENT NOW <============== {:?}", exist);
// store.remove("config").expect("couldnt remove config");
let wifi = start_client(default_nvs.clone(), &exist)?;
let wifi = start_wifi_client(default_nvs.clone(), &exist)?;
let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?;
@@ -41,7 +41,7 @@ fn main() -> Result<()> {
// if the subscription goes out of scope its dropped
// the sub needs to publish back to mqtt???
let (eventloop, _sub) = make_eventloop(mqtt.clone())?;
let mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?;
let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?;
println!("{:?}", wifi.get_status());
for s in 0..60 {
@@ -51,7 +51,7 @@ fn main() -> Result<()> {
drop(wifi);
} else {
println!("=============> START SERVER NOW AND WAIT <==============");
if let Ok((wifi, config)) = start_server_and_wait(default_nvs.clone()) {
if let Ok((wifi, config)) = start_config_server_and_wait(default_nvs.clone()) {
store.put("config", &config).expect("could not store config");
println!("CONFIG SAVED");
drop(wifi);