diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index cc2c3de..2bb78ba 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -4,6 +4,7 @@ use embedded_svc::event_bus::Postbox; use embedded_svc::event_bus::EventBus; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; +use embedded_svc::mqtt::client::utils::Connection as MqttConnection; use esp_idf_svc::mqtt::client::*; use anyhow::Result; use esp_idf_svc::eventloop::*; @@ -11,8 +12,10 @@ use log::*; use std::thread; use esp_idf_sys::{self}; use esp_idf_sys::EspError; +use esp_idf_hal::mutex::Condvar; +use std::sync::{Arc, Mutex}; -pub fn make_client(broker: &str) -> Result>> { +pub fn make_client(broker: &str) -> Result<(EspMqttClient>, MqttConnection)> { let conf = MqttClientConfiguration { client_id: Some("rust-esp32-std-demo-1"), // FIXME - mqtts @@ -22,14 +25,15 @@ pub fn make_client(broker: &str) -> Result CONNECT TO {}", b); - let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; - + // let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; + let cc = EspMqttClient::new_with_conn(b, &conf)?; +// info!("MQTT client started"); - Ok(client) + Ok(cc) } -pub fn start_listening(client: &EspMqttClient>) -> Result<()> { +pub fn start_listening(mqtt: Arc>>>, mut connection: MqttConnection, mut eventloop: EspBackgroundEventLoop) -> Result<()> { // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. @@ -50,6 +54,8 @@ pub fn start_listening(client: &EspMqttClient>) info!("MQTT connection loop exit"); }); + let mut client = mqtt.lock().unwrap(); + client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; info!("Subscribed to all topics (rust-esp32-std-demo)"); diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 03b9d8c..e6d0b81 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -6,7 +6,7 @@ use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; use esp_idf_svc::mqtt::client::*; use esp_idf_sys::EspError; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use log::*; const MSG_SIZE: usize = 256; @@ -44,16 +44,17 @@ impl EspTypedEventDeserializer for Message { } } -pub fn make_eventloop(client: &EspMqttClient>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { +pub fn make_eventloop(client: Arc>>>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { use embedded_svc::event_bus::EventBus; info!("About to start a background event loop"); let mut eventloop = EspBackgroundEventLoop::new(&Default::default())?; info!("About to subscribe to the background event loop"); - let subscription = eventloop.subscribe(|message: &Message| { + let subscription = eventloop.subscribe(move |message: &Message| { info!("!!! Got message from the event loop"); //: {:?}", message.0); - let _ = client.publish( + let mut mqtt_ = client.lock().unwrap(); + let _ = mqtt_.publish( "rust-esp32-std-demo-return", QoS::AtMostOnce, false, diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index 9176e7d..f1f9102 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -7,7 +7,7 @@ use crate::core::{events::*, config::*}; use sphinx_key_signer; use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use std::thread; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::Result; @@ -35,11 +35,13 @@ fn main() -> Result<()> { // store.remove("config").expect("couldnt remove config"); let wifi = start_client(default_nvs.clone(), &exist)?; - let mqtt = conn::mqtt::make_client(&exist.broker)?; + let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?; + + let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0)); // if the subscription goes out of scope its dropped // the sub needs to publish back to mqtt??? - let (eventloop, _sub) = make_eventloop(&mqtt)?; - let mqtt_client = conn::mqtt::mqtt_client(&exist.broker, eventloop)?; + let (eventloop, _sub) = make_eventloop(mqtt.clone())?; + let mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?; println!("{:?}", wifi.get_status()); for s in 0..60 {