diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 2bb78ba..d351cb9 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -1,13 +1,12 @@ use crate::core::events::Message; -use embedded_svc::event_bus::Postbox; -use embedded_svc::event_bus::EventBus; +use embedded_svc::event_bus::Postbox; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; use embedded_svc::mqtt::client::utils::Connection as MqttConnection; use esp_idf_svc::mqtt::client::*; use anyhow::Result; -use esp_idf_svc::eventloop::*; +use esp_idf_svc::eventloop::EspBackgroundEventLoop; use log::*; use std::thread; use esp_idf_sys::{self}; @@ -15,9 +14,16 @@ use esp_idf_sys::EspError; use esp_idf_hal::mutex::Condvar; use std::sync::{Arc, Mutex}; -pub fn make_client(broker: &str) -> Result<(EspMqttClient>, MqttConnection)> { +pub const TOPIC: &str = "sphinx"; +pub const RETURN_TOPIC: &str = "sphinx-return"; +pub const CLIENT_ID: &str = "sphinx-1"; + +pub fn make_client(broker: &str) -> Result<( + EspMqttClient>, + MqttConnection +)> { let conf = MqttClientConfiguration { - client_id: Some("rust-esp32-std-demo-1"), + client_id: Some(CLIENT_ID), // FIXME - mqtts // crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), ..Default::default() @@ -33,11 +39,13 @@ pub fn make_client(broker: &str) -> Result<(EspMqttClient>>>, mut connection: MqttConnection, mut eventloop: EspBackgroundEventLoop) -> Result<()> { - // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work - // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to - // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. - // Yet, you still need to efficiently process each message in the callback without blocking for too long. +pub fn start_listening( + mqtt: Arc>>>, + mut connection: MqttConnection, + mut eventloop: EspBackgroundEventLoop +) -> Result<()> { + + // must start pumping before subscribe or publish will work thread::spawn(move || { info!("MQTT Listening for messages"); @@ -45,86 +53,26 @@ pub fn start_listening(mqtt: Arc info!("MQTT Message ERROR: {}", e), Ok(msg) => { - eventloop.post(&Message::new([0; 256]), None).unwrap(); + if let Err(e) = eventloop.post(&Message::new([0; 256]), None) { + warn!("failed to post to eventloop {:?}", e); + } info!("MQTT Message: {:?}", msg); }, } } - info!("MQTT connection loop exit"); }); let mut client = mqtt.lock().unwrap(); - client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; - - info!("Subscribed to all topics (rust-esp32-std-demo)"); + client.subscribe(TOPIC, QoS::AtMostOnce)?; client.publish( - "rust-esp32-std-demo", + TOPIC, QoS::AtMostOnce, false, - "Hello from rust-esp32-std-demo!".as_bytes(), + format!("Hello from {}!", CLIENT_ID).as_bytes(), )?; - info!("Published a hello message to topic \"rust-esp32-std-demo\""); - Ok(()) } - -// this one is both together -pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Result>> { - info!("About to start MQTT client"); - - let conf = MqttClientConfiguration { - client_id: Some("rust-esp32-std-demo-1"), - // FIXME - mqtts - // crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), - ..Default::default() - }; - - let b = format!("mqtt://{}", broker); - println!("===> CONNECT TO {}", b); - let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; - - info!("MQTT client started"); - - // let subscription = eventloop.subscribe(|message: &Message| { - // log::info!("!!! Got message from the event loop"); //: {:?}", message.0); - // })?; - - // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work - // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to - // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. - // Yet, you still need to efficiently process each message in the callback without blocking for too long. - thread::spawn(move || { - info!("MQTT Listening for messages"); - - while let Some(msg) = connection.next() { - match msg { - Err(e) => info!("MQTT Message ERROR: {}", e), - Ok(msg) => { - eventloop.post(&Message::new([0; 256]), None).unwrap(); - info!("MQTT Message: {:?}", msg); - }, - } - } - - info!("MQTT connection loop exit"); - }); - - client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; - - info!("Subscribed to all topics (rust-esp32-std-demo)"); - - client.publish( - "rust-esp32-std-demo", - QoS::AtMostOnce, - false, - "Hello from rust-esp32-std-demo!".as_bytes(), - )?; - - info!("Published a hello message to topic \"rust-esp32-std-demo\""); - - Ok(client) -} \ No newline at end of file diff --git a/sphinx-key/src/conn/wifi.rs b/sphinx-key/src/conn/wifi.rs index c6ef6fb..8000f21 100644 --- a/sphinx-key/src/conn/wifi.rs +++ b/sphinx-key/src/conn/wifi.rs @@ -16,7 +16,6 @@ use std::time::Duration; use std::sync::Arc; use std::thread; -#[allow(dead_code)] pub fn start_client( default_nvs: Arc, config: &Config, @@ -73,8 +72,7 @@ pub fn start_client( Ok(wifi) } -#[allow(dead_code)] -pub fn start_server( +pub fn start_access_point( default_nvs: Arc, ) -> Result> { let netif_stack = Arc::new(EspNetifStack::new()?); diff --git a/sphinx-key/src/core/config.rs b/sphinx-key/src/core/config.rs index 8750e83..58f7b0f 100644 --- a/sphinx-key/src/core/config.rs +++ b/sphinx-key/src/core/config.rs @@ -24,7 +24,7 @@ arp -a http://192.168.71.1/?broker=52.91.253.115%3A1883 */ -pub fn start_client(default_nvs: Arc, config: &Config) -> Result> { +pub fn start_wifi_client(default_nvs: Arc, config: &Config) -> Result> { let wifi = conn::wifi::start_client( default_nvs, config @@ -33,13 +33,13 @@ pub fn start_client(default_nvs: Arc, config: &Config) -> Result Ok(wifi) } -pub fn start_server_and_wait(default_nvs: Arc) -> Result<(Box, Config)> { +pub fn start_config_server_and_wait(default_nvs: Arc) -> Result<(Box, Config)> { let mutex = Arc::new((Mutex::new(None), Condvar::new())); #[allow(clippy::redundant_clone)] #[allow(unused_mut)] - let mut wifi = conn::wifi::start_server( + let mut wifi = conn::wifi::start_access_point( default_nvs.clone(), )?; diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index e6d0b81..72acc0a 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,15 +1,16 @@ +use crate::conn::mqtt::RETURN_TOPIC; use esp_idf_svc::eventloop::*; use embedded_svc::httpd::Result; use esp_idf_sys::{self, c_types}; use embedded_svc::mqtt::client::utils::ConnState; -use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; +use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS}; use esp_idf_svc::mqtt::client::*; use esp_idf_sys::EspError; use std::sync::{Arc, Mutex}; use log::*; -const MSG_SIZE: usize = 256; +pub const MSG_SIZE: usize = 256; #[derive(Copy, Clone, Debug)] pub struct Message([u8; MSG_SIZE]); @@ -53,13 +54,18 @@ pub fn make_eventloop(client: Arc if let Err(err) = m_.publish( + RETURN_TOPIC, + QoS::AtMostOnce, + false, + "The processed message: ***".as_bytes(), + ) { + log::warn!("failed to mqtt publish! {:?}", err); + }, + Err(_) => log::warn!("failed to lock Mutex") + }; + })?; // let subscription = eventloop.subscribe(cb)?; diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index f1f9102..d327938 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -33,7 +33,7 @@ fn main() -> Result<()> { if let Some(exist) = existing { println!("=============> START CLIENT NOW <============== {:?}", exist); // store.remove("config").expect("couldnt remove config"); - let wifi = start_client(default_nvs.clone(), &exist)?; + let wifi = start_wifi_client(default_nvs.clone(), &exist)?; let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?; @@ -41,7 +41,7 @@ fn main() -> Result<()> { // if the subscription goes out of scope its dropped // the sub needs to publish back to mqtt??? let (eventloop, _sub) = make_eventloop(mqtt.clone())?; - let mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?; + let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?; println!("{:?}", wifi.get_status()); for s in 0..60 { @@ -51,7 +51,7 @@ fn main() -> Result<()> { drop(wifi); } else { println!("=============> START SERVER NOW AND WAIT <=============="); - if let Ok((wifi, config)) = start_server_and_wait(default_nvs.clone()) { + if let Ok((wifi, config)) = start_config_server_and_wait(default_nvs.clone()) { store.put("config", &config).expect("could not store config"); println!("CONFIG SAVED"); drop(wifi);