diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 1da6621..d351cb9 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -1,22 +1,29 @@ use crate::core::events::Message; -use embedded_svc::event_bus::Postbox; -use embedded_svc::event_bus::EventBus; +use embedded_svc::event_bus::Postbox; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; +use embedded_svc::mqtt::client::utils::Connection as MqttConnection; use esp_idf_svc::mqtt::client::*; use anyhow::Result; -use esp_idf_svc::eventloop::*; +use esp_idf_svc::eventloop::EspBackgroundEventLoop; use log::*; use std::thread; use esp_idf_sys::{self}; use esp_idf_sys::EspError; +use esp_idf_hal::mutex::Condvar; +use std::sync::{Arc, Mutex}; -pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Result>> { - info!("About to start MQTT client"); +pub const TOPIC: &str = "sphinx"; +pub const RETURN_TOPIC: &str = "sphinx-return"; +pub const CLIENT_ID: &str = "sphinx-1"; +pub fn make_client(broker: &str) -> Result<( + EspMqttClient>, + MqttConnection +)> { let conf = MqttClientConfiguration { - client_id: Some("rust-esp32-std-demo-1"), + client_id: Some(CLIENT_ID), // FIXME - mqtts // crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), ..Default::default() @@ -24,18 +31,21 @@ pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Resul let b = format!("mqtt://{}", broker); println!("===> CONNECT TO {}", b); - let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; - + // let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; + let cc = EspMqttClient::new_with_conn(b, &conf)?; +// info!("MQTT client started"); - // let subscription = eventloop.subscribe(|message: &Message| { - // log::info!("!!! Got message from the event loop"); //: {:?}", message.0); - // })?; + Ok(cc) +} - // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work - // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to - // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. - // Yet, you still need to efficiently process each message in the callback without blocking for too long. +pub fn start_listening( + mqtt: Arc>>>, + mut connection: MqttConnection, + mut eventloop: EspBackgroundEventLoop +) -> Result<()> { + + // must start pumping before subscribe or publish will work thread::spawn(move || { info!("MQTT Listening for messages"); @@ -43,27 +53,26 @@ pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Resul match msg { Err(e) => info!("MQTT Message ERROR: {}", e), Ok(msg) => { - eventloop.post(&Message::new([0; 256]), None).unwrap(); + if let Err(e) = eventloop.post(&Message::new([0; 256]), None) { + warn!("failed to post to eventloop {:?}", e); + } info!("MQTT Message: {:?}", msg); }, } } - info!("MQTT connection loop exit"); }); - client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; + let mut client = mqtt.lock().unwrap(); - info!("Subscribed to all topics (rust-esp32-std-demo)"); + client.subscribe(TOPIC, QoS::AtMostOnce)?; client.publish( - "rust-esp32-std-demo", + TOPIC, QoS::AtMostOnce, false, - "Hello from rust-esp32-std-demo!".as_bytes(), + format!("Hello from {}!", CLIENT_ID).as_bytes(), )?; - info!("Published a hello message to topic \"rust-esp32-std-demo\""); - - Ok(client) -} \ No newline at end of file + Ok(()) +} diff --git a/sphinx-key/src/conn/wifi.rs b/sphinx-key/src/conn/wifi.rs index c6ef6fb..8000f21 100644 --- a/sphinx-key/src/conn/wifi.rs +++ b/sphinx-key/src/conn/wifi.rs @@ -16,7 +16,6 @@ use std::time::Duration; use std::sync::Arc; use std::thread; -#[allow(dead_code)] pub fn start_client( default_nvs: Arc, config: &Config, @@ -73,8 +72,7 @@ pub fn start_client( Ok(wifi) } -#[allow(dead_code)] -pub fn start_server( +pub fn start_access_point( default_nvs: Arc, ) -> Result> { let netif_stack = Arc::new(EspNetifStack::new()?); diff --git a/sphinx-key/src/core/config.rs b/sphinx-key/src/core/config.rs index 8750e83..58f7b0f 100644 --- a/sphinx-key/src/core/config.rs +++ b/sphinx-key/src/core/config.rs @@ -24,7 +24,7 @@ arp -a http://192.168.71.1/?broker=52.91.253.115%3A1883 */ -pub fn start_client(default_nvs: Arc, config: &Config) -> Result> { +pub fn start_wifi_client(default_nvs: Arc, config: &Config) -> Result> { let wifi = conn::wifi::start_client( default_nvs, config @@ -33,13 +33,13 @@ pub fn start_client(default_nvs: Arc, config: &Config) -> Result Ok(wifi) } -pub fn start_server_and_wait(default_nvs: Arc) -> Result<(Box, Config)> { +pub fn start_config_server_and_wait(default_nvs: Arc) -> Result<(Box, Config)> { let mutex = Arc::new((Mutex::new(None), Condvar::new())); #[allow(clippy::redundant_clone)] #[allow(unused_mut)] - let mut wifi = conn::wifi::start_server( + let mut wifi = conn::wifi::start_access_point( default_nvs.clone(), )?; diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 7c136db..72acc0a 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,10 +1,16 @@ +use crate::conn::mqtt::RETURN_TOPIC; use esp_idf_svc::eventloop::*; use embedded_svc::httpd::Result; use esp_idf_sys::{self, c_types}; +use embedded_svc::mqtt::client::utils::ConnState; +use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS}; +use esp_idf_svc::mqtt::client::*; +use esp_idf_sys::EspError; +use std::sync::{Arc, Mutex}; use log::*; -const MSG_SIZE: usize = 256; +pub const MSG_SIZE: usize = 256; #[derive(Copy, Clone, Debug)] pub struct Message([u8; MSG_SIZE]); @@ -39,15 +45,27 @@ impl EspTypedEventDeserializer for Message { } } -pub fn make_eventloop() -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { +pub fn make_eventloop(client: Arc>>>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { use embedded_svc::event_bus::EventBus; info!("About to start a background event loop"); let mut eventloop = EspBackgroundEventLoop::new(&Default::default())?; info!("About to subscribe to the background event loop"); - let subscription = eventloop.subscribe(|message: &Message| { + let subscription = eventloop.subscribe(move |message: &Message| { info!("!!! Got message from the event loop"); //: {:?}", message.0); + match client.lock() { + Ok(mut m_) => if let Err(err) = m_.publish( + RETURN_TOPIC, + QoS::AtMostOnce, + false, + "The processed message: ***".as_bytes(), + ) { + log::warn!("failed to mqtt publish! {:?}", err); + }, + Err(_) => log::warn!("failed to lock Mutex") + }; + })?; // let subscription = eventloop.subscribe(cb)?; diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index cf563a2..d327938 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -7,7 +7,7 @@ use crate::core::{events::*, config::*}; use sphinx_key_signer; use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use std::thread; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::Result; @@ -15,7 +15,6 @@ use esp_idf_svc::nvs::*; use esp_idf_svc::nvs_storage::EspNvsStorage; use embedded_svc::storage::Storage; use embedded_svc::wifi::Wifi; -use embedded_svc::event_bus::EventBus; fn main() -> Result<()> { // Temporary. Will disappear once ESP-IDF 4.4 is released, but for now it is necessary to call this function once, @@ -34,11 +33,15 @@ fn main() -> Result<()> { if let Some(exist) = existing { println!("=============> START CLIENT NOW <============== {:?}", exist); // store.remove("config").expect("couldnt remove config"); - let wifi = start_client(default_nvs.clone(), &exist)?; + let wifi = start_wifi_client(default_nvs.clone(), &exist)?; + + let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?; + + let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0)); // if the subscription goes out of scope its dropped // the sub needs to publish back to mqtt??? - let (eventloop, _sub) = make_eventloop()?; - let mqtt_client = conn::mqtt::mqtt_client(&exist.broker, eventloop)?; + let (eventloop, _sub) = make_eventloop(mqtt.clone())?; + let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?; println!("{:?}", wifi.get_status()); for s in 0..60 { @@ -48,7 +51,7 @@ fn main() -> Result<()> { drop(wifi); } else { println!("=============> START SERVER NOW AND WAIT <=============="); - if let Ok((wifi, config)) = start_server_and_wait(default_nvs.clone()) { + if let Ok((wifi, config)) = start_config_server_and_wait(default_nvs.clone()) { store.put("config", &config).expect("could not store config"); println!("CONFIG SAVED"); drop(wifi);