use Arc<Mutex<MqttClient>> to pass across the thread bound, first create client, then eventloop, then listen

This commit is contained in:
Evan Feenstra
2022-05-24 17:37:41 -07:00
parent 42d7c2f857
commit e592394230
3 changed files with 22 additions and 13 deletions

View File

@@ -4,6 +4,7 @@ use embedded_svc::event_bus::Postbox;
use embedded_svc::event_bus::EventBus; use embedded_svc::event_bus::EventBus;
use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS};
use embedded_svc::mqtt::client::utils::Connection as MqttConnection;
use esp_idf_svc::mqtt::client::*; use esp_idf_svc::mqtt::client::*;
use anyhow::Result; use anyhow::Result;
use esp_idf_svc::eventloop::*; use esp_idf_svc::eventloop::*;
@@ -11,8 +12,10 @@ use log::*;
use std::thread; use std::thread;
use esp_idf_sys::{self}; use esp_idf_sys::{self};
use esp_idf_sys::EspError; use esp_idf_sys::EspError;
use esp_idf_hal::mutex::Condvar;
use std::sync::{Arc, Mutex};
pub fn make_client(broker: &str) -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> { pub fn make_client(broker: &str) -> Result<(EspMqttClient<ConnState<MessageImpl, EspError>>, MqttConnection<Condvar, MessageImpl, EspError>)> {
let conf = MqttClientConfiguration { let conf = MqttClientConfiguration {
client_id: Some("rust-esp32-std-demo-1"), client_id: Some("rust-esp32-std-demo-1"),
// FIXME - mqtts // FIXME - mqtts
@@ -22,14 +25,15 @@ pub fn make_client(broker: &str) -> Result<EspMqttClient<ConnState<MessageImpl,
let b = format!("mqtt://{}", broker); let b = format!("mqtt://{}", broker);
println!("===> CONNECT TO {}", b); println!("===> CONNECT TO {}", b);
let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; // let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?;
let cc = EspMqttClient::new_with_conn(b, &conf)?;
//
info!("MQTT client started"); info!("MQTT client started");
Ok(client) Ok(cc)
} }
pub fn start_listening(client: &EspMqttClient<ConnState<MessageImpl, EspError>>) -> Result<()> { pub fn start_listening(mqtt: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>, mut connection: MqttConnection<Condvar, MessageImpl, EspError>, mut eventloop: EspBackgroundEventLoop) -> Result<()> {
// Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work
// Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to
// spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide.
@@ -50,6 +54,8 @@ pub fn start_listening(client: &EspMqttClient<ConnState<MessageImpl, EspError>>)
info!("MQTT connection loop exit"); info!("MQTT connection loop exit");
}); });
let mut client = mqtt.lock().unwrap();
client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?;
info!("Subscribed to all topics (rust-esp32-std-demo)"); info!("Subscribed to all topics (rust-esp32-std-demo)");

View File

@@ -6,7 +6,7 @@ use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS}; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS};
use esp_idf_svc::mqtt::client::*; use esp_idf_svc::mqtt::client::*;
use esp_idf_sys::EspError; use esp_idf_sys::EspError;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use log::*; use log::*;
const MSG_SIZE: usize = 256; const MSG_SIZE: usize = 256;
@@ -44,16 +44,17 @@ impl EspTypedEventDeserializer<Message> for Message {
} }
} }
pub fn make_eventloop(client: &EspMqttClient<ConnState<MessageImpl, EspError>>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { pub fn make_eventloop(client: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> {
use embedded_svc::event_bus::EventBus; use embedded_svc::event_bus::EventBus;
info!("About to start a background event loop"); info!("About to start a background event loop");
let mut eventloop = EspBackgroundEventLoop::new(&Default::default())?; let mut eventloop = EspBackgroundEventLoop::new(&Default::default())?;
info!("About to subscribe to the background event loop"); info!("About to subscribe to the background event loop");
let subscription = eventloop.subscribe(|message: &Message| { let subscription = eventloop.subscribe(move |message: &Message| {
info!("!!! Got message from the event loop"); //: {:?}", message.0); info!("!!! Got message from the event loop"); //: {:?}", message.0);
let _ = client.publish( let mut mqtt_ = client.lock().unwrap();
let _ = mqtt_.publish(
"rust-esp32-std-demo-return", "rust-esp32-std-demo-return",
QoS::AtMostOnce, QoS::AtMostOnce,
false, false,

View File

@@ -7,7 +7,7 @@ use crate::core::{events::*, config::*};
use sphinx_key_signer; use sphinx_key_signer;
use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported
use std::thread; use std::thread;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use anyhow::Result; use anyhow::Result;
@@ -35,11 +35,13 @@ fn main() -> Result<()> {
// store.remove("config").expect("couldnt remove config"); // store.remove("config").expect("couldnt remove config");
let wifi = start_client(default_nvs.clone(), &exist)?; let wifi = start_client(default_nvs.clone(), &exist)?;
let mqtt = conn::mqtt::make_client(&exist.broker)?; let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?;
let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0));
// if the subscription goes out of scope its dropped // if the subscription goes out of scope its dropped
// the sub needs to publish back to mqtt??? // the sub needs to publish back to mqtt???
let (eventloop, _sub) = make_eventloop(&mqtt)?; let (eventloop, _sub) = make_eventloop(mqtt.clone())?;
let mqtt_client = conn::mqtt::mqtt_client(&exist.broker, eventloop)?; let mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?;
println!("{:?}", wifi.get_status()); println!("{:?}", wifi.get_status());
for s in 0..60 { for s in 0..60 {