diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 1da6621..cc2c3de 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -12,6 +12,61 @@ use std::thread; use esp_idf_sys::{self}; use esp_idf_sys::EspError; +pub fn make_client(broker: &str) -> Result>> { + let conf = MqttClientConfiguration { + client_id: Some("rust-esp32-std-demo-1"), + // FIXME - mqtts + // crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), + ..Default::default() + }; + + let b = format!("mqtt://{}", broker); + println!("===> CONNECT TO {}", b); + let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; + + info!("MQTT client started"); + + Ok(client) +} + +pub fn start_listening(client: &EspMqttClient>) -> Result<()> { + // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work + // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to + // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. + // Yet, you still need to efficiently process each message in the callback without blocking for too long. + thread::spawn(move || { + info!("MQTT Listening for messages"); + + while let Some(msg) = connection.next() { + match msg { + Err(e) => info!("MQTT Message ERROR: {}", e), + Ok(msg) => { + eventloop.post(&Message::new([0; 256]), None).unwrap(); + info!("MQTT Message: {:?}", msg); + }, + } + } + + info!("MQTT connection loop exit"); + }); + + client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; + + info!("Subscribed to all topics (rust-esp32-std-demo)"); + + client.publish( + "rust-esp32-std-demo", + QoS::AtMostOnce, + false, + "Hello from rust-esp32-std-demo!".as_bytes(), + )?; + + info!("Published a hello message to topic \"rust-esp32-std-demo\""); + + Ok(()) +} + +// this one is both together pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Result>> { info!("About to start MQTT client"); diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 7c136db..03b9d8c 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -2,6 +2,11 @@ use esp_idf_svc::eventloop::*; use embedded_svc::httpd::Result; use esp_idf_sys::{self, c_types}; +use embedded_svc::mqtt::client::utils::ConnState; +use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; +use esp_idf_svc::mqtt::client::*; +use esp_idf_sys::EspError; +use std::sync::Arc; use log::*; const MSG_SIZE: usize = 256; @@ -39,7 +44,7 @@ impl EspTypedEventDeserializer for Message { } } -pub fn make_eventloop() -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { +pub fn make_eventloop(client: &EspMqttClient>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { use embedded_svc::event_bus::EventBus; info!("About to start a background event loop"); @@ -48,6 +53,12 @@ pub fn make_eventloop() -> Result<(EspBackgroundEventLoop, EspBackgroundSubscrip info!("About to subscribe to the background event loop"); let subscription = eventloop.subscribe(|message: &Message| { info!("!!! Got message from the event loop"); //: {:?}", message.0); + let _ = client.publish( + "rust-esp32-std-demo-return", + QoS::AtMostOnce, + false, + "Hello from rust-esp32-std-demo!".as_bytes(), + ); })?; // let subscription = eventloop.subscribe(cb)?; diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index cf563a2..9176e7d 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -15,7 +15,6 @@ use esp_idf_svc::nvs::*; use esp_idf_svc::nvs_storage::EspNvsStorage; use embedded_svc::storage::Storage; use embedded_svc::wifi::Wifi; -use embedded_svc::event_bus::EventBus; fn main() -> Result<()> { // Temporary. Will disappear once ESP-IDF 4.4 is released, but for now it is necessary to call this function once, @@ -35,9 +34,11 @@ fn main() -> Result<()> { println!("=============> START CLIENT NOW <============== {:?}", exist); // store.remove("config").expect("couldnt remove config"); let wifi = start_client(default_nvs.clone(), &exist)?; + + let mqtt = conn::mqtt::make_client(&exist.broker)?; // if the subscription goes out of scope its dropped // the sub needs to publish back to mqtt??? - let (eventloop, _sub) = make_eventloop()?; + let (eventloop, _sub) = make_eventloop(&mqtt)?; let mqtt_client = conn::mqtt::mqtt_client(&exist.broker, eventloop)?; println!("{:?}", wifi.get_status());