From 42d7c2f857994be97149ebaf97bb266f3a3f3c74 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Tue, 24 May 2022 09:15:28 -0700 Subject: [PATCH 1/3] try mqtt pub from background eventlopp --- sphinx-key/src/conn/mqtt.rs | 55 +++++++++++++++++++++++++++++++++++ sphinx-key/src/core/events.rs | 13 ++++++++- sphinx-key/src/main.rs | 5 ++-- 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 1da6621..cc2c3de 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -12,6 +12,61 @@ use std::thread; use esp_idf_sys::{self}; use esp_idf_sys::EspError; +pub fn make_client(broker: &str) -> Result>> { + let conf = MqttClientConfiguration { + client_id: Some("rust-esp32-std-demo-1"), + // FIXME - mqtts + // crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), + ..Default::default() + }; + + let b = format!("mqtt://{}", broker); + println!("===> CONNECT TO {}", b); + let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; + + info!("MQTT client started"); + + Ok(client) +} + +pub fn start_listening(client: &EspMqttClient>) -> Result<()> { + // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work + // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to + // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. + // Yet, you still need to efficiently process each message in the callback without blocking for too long. + thread::spawn(move || { + info!("MQTT Listening for messages"); + + while let Some(msg) = connection.next() { + match msg { + Err(e) => info!("MQTT Message ERROR: {}", e), + Ok(msg) => { + eventloop.post(&Message::new([0; 256]), None).unwrap(); + info!("MQTT Message: {:?}", msg); + }, + } + } + + info!("MQTT connection loop exit"); + }); + + client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; + + info!("Subscribed to all topics (rust-esp32-std-demo)"); + + client.publish( + "rust-esp32-std-demo", + QoS::AtMostOnce, + false, + "Hello from rust-esp32-std-demo!".as_bytes(), + )?; + + info!("Published a hello message to topic \"rust-esp32-std-demo\""); + + Ok(()) +} + +// this one is both together pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Result>> { info!("About to start MQTT client"); diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 7c136db..03b9d8c 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -2,6 +2,11 @@ use esp_idf_svc::eventloop::*; use embedded_svc::httpd::Result; use esp_idf_sys::{self, c_types}; +use embedded_svc::mqtt::client::utils::ConnState; +use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; +use esp_idf_svc::mqtt::client::*; +use esp_idf_sys::EspError; +use std::sync::Arc; use log::*; const MSG_SIZE: usize = 256; @@ -39,7 +44,7 @@ impl EspTypedEventDeserializer for Message { } } -pub fn make_eventloop() -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { +pub fn make_eventloop(client: &EspMqttClient>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { use embedded_svc::event_bus::EventBus; info!("About to start a background event loop"); @@ -48,6 +53,12 @@ pub fn make_eventloop() -> Result<(EspBackgroundEventLoop, EspBackgroundSubscrip info!("About to subscribe to the background event loop"); let subscription = eventloop.subscribe(|message: &Message| { info!("!!! Got message from the event loop"); //: {:?}", message.0); + let _ = client.publish( + "rust-esp32-std-demo-return", + QoS::AtMostOnce, + false, + "Hello from rust-esp32-std-demo!".as_bytes(), + ); })?; // let subscription = eventloop.subscribe(cb)?; diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index cf563a2..9176e7d 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -15,7 +15,6 @@ use esp_idf_svc::nvs::*; use esp_idf_svc::nvs_storage::EspNvsStorage; use embedded_svc::storage::Storage; use embedded_svc::wifi::Wifi; -use embedded_svc::event_bus::EventBus; fn main() -> Result<()> { // Temporary. Will disappear once ESP-IDF 4.4 is released, but for now it is necessary to call this function once, @@ -35,9 +34,11 @@ fn main() -> Result<()> { println!("=============> START CLIENT NOW <============== {:?}", exist); // store.remove("config").expect("couldnt remove config"); let wifi = start_client(default_nvs.clone(), &exist)?; + + let mqtt = conn::mqtt::make_client(&exist.broker)?; // if the subscription goes out of scope its dropped // the sub needs to publish back to mqtt??? - let (eventloop, _sub) = make_eventloop()?; + let (eventloop, _sub) = make_eventloop(&mqtt)?; let mqtt_client = conn::mqtt::mqtt_client(&exist.broker, eventloop)?; println!("{:?}", wifi.get_status()); From e5923942309ae0806b43a16f5959b73fd254f184 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Tue, 24 May 2022 17:37:41 -0700 Subject: [PATCH 2/3] use Arc> to pass across the thread bound, first create client, then eventloop, then listen --- sphinx-key/src/conn/mqtt.rs | 16 +++++++++++----- sphinx-key/src/core/events.rs | 9 +++++---- sphinx-key/src/main.rs | 10 ++++++---- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index cc2c3de..2bb78ba 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -4,6 +4,7 @@ use embedded_svc::event_bus::Postbox; use embedded_svc::event_bus::EventBus; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; +use embedded_svc::mqtt::client::utils::Connection as MqttConnection; use esp_idf_svc::mqtt::client::*; use anyhow::Result; use esp_idf_svc::eventloop::*; @@ -11,8 +12,10 @@ use log::*; use std::thread; use esp_idf_sys::{self}; use esp_idf_sys::EspError; +use esp_idf_hal::mutex::Condvar; +use std::sync::{Arc, Mutex}; -pub fn make_client(broker: &str) -> Result>> { +pub fn make_client(broker: &str) -> Result<(EspMqttClient>, MqttConnection)> { let conf = MqttClientConfiguration { client_id: Some("rust-esp32-std-demo-1"), // FIXME - mqtts @@ -22,14 +25,15 @@ pub fn make_client(broker: &str) -> Result CONNECT TO {}", b); - let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; - + // let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; + let cc = EspMqttClient::new_with_conn(b, &conf)?; +// info!("MQTT client started"); - Ok(client) + Ok(cc) } -pub fn start_listening(client: &EspMqttClient>) -> Result<()> { +pub fn start_listening(mqtt: Arc>>>, mut connection: MqttConnection, mut eventloop: EspBackgroundEventLoop) -> Result<()> { // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. @@ -50,6 +54,8 @@ pub fn start_listening(client: &EspMqttClient>) info!("MQTT connection loop exit"); }); + let mut client = mqtt.lock().unwrap(); + client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; info!("Subscribed to all topics (rust-esp32-std-demo)"); diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 03b9d8c..e6d0b81 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -6,7 +6,7 @@ use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; use esp_idf_svc::mqtt::client::*; use esp_idf_sys::EspError; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use log::*; const MSG_SIZE: usize = 256; @@ -44,16 +44,17 @@ impl EspTypedEventDeserializer for Message { } } -pub fn make_eventloop(client: &EspMqttClient>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { +pub fn make_eventloop(client: Arc>>>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { use embedded_svc::event_bus::EventBus; info!("About to start a background event loop"); let mut eventloop = EspBackgroundEventLoop::new(&Default::default())?; info!("About to subscribe to the background event loop"); - let subscription = eventloop.subscribe(|message: &Message| { + let subscription = eventloop.subscribe(move |message: &Message| { info!("!!! Got message from the event loop"); //: {:?}", message.0); - let _ = client.publish( + let mut mqtt_ = client.lock().unwrap(); + let _ = mqtt_.publish( "rust-esp32-std-demo-return", QoS::AtMostOnce, false, diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index 9176e7d..f1f9102 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -7,7 +7,7 @@ use crate::core::{events::*, config::*}; use sphinx_key_signer; use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use std::thread; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::Result; @@ -35,11 +35,13 @@ fn main() -> Result<()> { // store.remove("config").expect("couldnt remove config"); let wifi = start_client(default_nvs.clone(), &exist)?; - let mqtt = conn::mqtt::make_client(&exist.broker)?; + let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?; + + let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0)); // if the subscription goes out of scope its dropped // the sub needs to publish back to mqtt??? - let (eventloop, _sub) = make_eventloop(&mqtt)?; - let mqtt_client = conn::mqtt::mqtt_client(&exist.broker, eventloop)?; + let (eventloop, _sub) = make_eventloop(mqtt.clone())?; + let mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?; println!("{:?}", wifi.get_status()); for s in 0..60 { From dda11135be7149f1cc491c0ddb47f753e39a03a2 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Tue, 24 May 2022 21:34:43 -0700 Subject: [PATCH 3/3] mqtt ping back working --- sphinx-key/src/conn/mqtt.rs | 100 ++++++++-------------------------- sphinx-key/src/conn/wifi.rs | 4 +- sphinx-key/src/core/config.rs | 6 +- sphinx-key/src/core/events.rs | 24 +++++--- sphinx-key/src/main.rs | 6 +- 5 files changed, 46 insertions(+), 94 deletions(-) diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 2bb78ba..d351cb9 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -1,13 +1,12 @@ use crate::core::events::Message; -use embedded_svc::event_bus::Postbox; -use embedded_svc::event_bus::EventBus; +use embedded_svc::event_bus::Postbox; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; use embedded_svc::mqtt::client::utils::Connection as MqttConnection; use esp_idf_svc::mqtt::client::*; use anyhow::Result; -use esp_idf_svc::eventloop::*; +use esp_idf_svc::eventloop::EspBackgroundEventLoop; use log::*; use std::thread; use esp_idf_sys::{self}; @@ -15,9 +14,16 @@ use esp_idf_sys::EspError; use esp_idf_hal::mutex::Condvar; use std::sync::{Arc, Mutex}; -pub fn make_client(broker: &str) -> Result<(EspMqttClient>, MqttConnection)> { +pub const TOPIC: &str = "sphinx"; +pub const RETURN_TOPIC: &str = "sphinx-return"; +pub const CLIENT_ID: &str = "sphinx-1"; + +pub fn make_client(broker: &str) -> Result<( + EspMqttClient>, + MqttConnection +)> { let conf = MqttClientConfiguration { - client_id: Some("rust-esp32-std-demo-1"), + client_id: Some(CLIENT_ID), // FIXME - mqtts // crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), ..Default::default() @@ -33,11 +39,13 @@ pub fn make_client(broker: &str) -> Result<(EspMqttClient>>>, mut connection: MqttConnection, mut eventloop: EspBackgroundEventLoop) -> Result<()> { - // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work - // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to - // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. - // Yet, you still need to efficiently process each message in the callback without blocking for too long. +pub fn start_listening( + mqtt: Arc>>>, + mut connection: MqttConnection, + mut eventloop: EspBackgroundEventLoop +) -> Result<()> { + + // must start pumping before subscribe or publish will work thread::spawn(move || { info!("MQTT Listening for messages"); @@ -45,86 +53,26 @@ pub fn start_listening(mqtt: Arc info!("MQTT Message ERROR: {}", e), Ok(msg) => { - eventloop.post(&Message::new([0; 256]), None).unwrap(); + if let Err(e) = eventloop.post(&Message::new([0; 256]), None) { + warn!("failed to post to eventloop {:?}", e); + } info!("MQTT Message: {:?}", msg); }, } } - info!("MQTT connection loop exit"); }); let mut client = mqtt.lock().unwrap(); - client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; - - info!("Subscribed to all topics (rust-esp32-std-demo)"); + client.subscribe(TOPIC, QoS::AtMostOnce)?; client.publish( - "rust-esp32-std-demo", + TOPIC, QoS::AtMostOnce, false, - "Hello from rust-esp32-std-demo!".as_bytes(), + format!("Hello from {}!", CLIENT_ID).as_bytes(), )?; - info!("Published a hello message to topic \"rust-esp32-std-demo\""); - Ok(()) } - -// this one is both together -pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Result>> { - info!("About to start MQTT client"); - - let conf = MqttClientConfiguration { - client_id: Some("rust-esp32-std-demo-1"), - // FIXME - mqtts - // crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), - ..Default::default() - }; - - let b = format!("mqtt://{}", broker); - println!("===> CONNECT TO {}", b); - let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; - - info!("MQTT client started"); - - // let subscription = eventloop.subscribe(|message: &Message| { - // log::info!("!!! Got message from the event loop"); //: {:?}", message.0); - // })?; - - // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work - // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to - // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. - // Yet, you still need to efficiently process each message in the callback without blocking for too long. - thread::spawn(move || { - info!("MQTT Listening for messages"); - - while let Some(msg) = connection.next() { - match msg { - Err(e) => info!("MQTT Message ERROR: {}", e), - Ok(msg) => { - eventloop.post(&Message::new([0; 256]), None).unwrap(); - info!("MQTT Message: {:?}", msg); - }, - } - } - - info!("MQTT connection loop exit"); - }); - - client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; - - info!("Subscribed to all topics (rust-esp32-std-demo)"); - - client.publish( - "rust-esp32-std-demo", - QoS::AtMostOnce, - false, - "Hello from rust-esp32-std-demo!".as_bytes(), - )?; - - info!("Published a hello message to topic \"rust-esp32-std-demo\""); - - Ok(client) -} \ No newline at end of file diff --git a/sphinx-key/src/conn/wifi.rs b/sphinx-key/src/conn/wifi.rs index c6ef6fb..8000f21 100644 --- a/sphinx-key/src/conn/wifi.rs +++ b/sphinx-key/src/conn/wifi.rs @@ -16,7 +16,6 @@ use std::time::Duration; use std::sync::Arc; use std::thread; -#[allow(dead_code)] pub fn start_client( default_nvs: Arc, config: &Config, @@ -73,8 +72,7 @@ pub fn start_client( Ok(wifi) } -#[allow(dead_code)] -pub fn start_server( +pub fn start_access_point( default_nvs: Arc, ) -> Result> { let netif_stack = Arc::new(EspNetifStack::new()?); diff --git a/sphinx-key/src/core/config.rs b/sphinx-key/src/core/config.rs index 8750e83..58f7b0f 100644 --- a/sphinx-key/src/core/config.rs +++ b/sphinx-key/src/core/config.rs @@ -24,7 +24,7 @@ arp -a http://192.168.71.1/?broker=52.91.253.115%3A1883 */ -pub fn start_client(default_nvs: Arc, config: &Config) -> Result> { +pub fn start_wifi_client(default_nvs: Arc, config: &Config) -> Result> { let wifi = conn::wifi::start_client( default_nvs, config @@ -33,13 +33,13 @@ pub fn start_client(default_nvs: Arc, config: &Config) -> Result Ok(wifi) } -pub fn start_server_and_wait(default_nvs: Arc) -> Result<(Box, Config)> { +pub fn start_config_server_and_wait(default_nvs: Arc) -> Result<(Box, Config)> { let mutex = Arc::new((Mutex::new(None), Condvar::new())); #[allow(clippy::redundant_clone)] #[allow(unused_mut)] - let mut wifi = conn::wifi::start_server( + let mut wifi = conn::wifi::start_access_point( default_nvs.clone(), )?; diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index e6d0b81..72acc0a 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,15 +1,16 @@ +use crate::conn::mqtt::RETURN_TOPIC; use esp_idf_svc::eventloop::*; use embedded_svc::httpd::Result; use esp_idf_sys::{self, c_types}; use embedded_svc::mqtt::client::utils::ConnState; -use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; +use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS}; use esp_idf_svc::mqtt::client::*; use esp_idf_sys::EspError; use std::sync::{Arc, Mutex}; use log::*; -const MSG_SIZE: usize = 256; +pub const MSG_SIZE: usize = 256; #[derive(Copy, Clone, Debug)] pub struct Message([u8; MSG_SIZE]); @@ -53,13 +54,18 @@ pub fn make_eventloop(client: Arc if let Err(err) = m_.publish( + RETURN_TOPIC, + QoS::AtMostOnce, + false, + "The processed message: ***".as_bytes(), + ) { + log::warn!("failed to mqtt publish! {:?}", err); + }, + Err(_) => log::warn!("failed to lock Mutex") + }; + })?; // let subscription = eventloop.subscribe(cb)?; diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index f1f9102..d327938 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -33,7 +33,7 @@ fn main() -> Result<()> { if let Some(exist) = existing { println!("=============> START CLIENT NOW <============== {:?}", exist); // store.remove("config").expect("couldnt remove config"); - let wifi = start_client(default_nvs.clone(), &exist)?; + let wifi = start_wifi_client(default_nvs.clone(), &exist)?; let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?; @@ -41,7 +41,7 @@ fn main() -> Result<()> { // if the subscription goes out of scope its dropped // the sub needs to publish back to mqtt??? let (eventloop, _sub) = make_eventloop(mqtt.clone())?; - let mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?; + let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?; println!("{:?}", wifi.get_status()); for s in 0..60 { @@ -51,7 +51,7 @@ fn main() -> Result<()> { drop(wifi); } else { println!("=============> START SERVER NOW AND WAIT <=============="); - if let Ok((wifi, config)) = start_server_and_wait(default_nvs.clone()) { + if let Ok((wifi, config)) = start_config_server_and_wait(default_nvs.clone()) { store.put("config", &config).expect("could not store config"); println!("CONFIG SAVED"); drop(wifi);