From bcf936344cc85b2affc4848802710fff7ed1aa6a Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Tue, 24 May 2022 08:44:47 -0700 Subject: [PATCH] eventloop background processor, MQTT client --- sphinx-key/hmq.html | 202 ++++++++++++++++++++++++++++++++++ sphinx-key/{go.sh => run.sh} | 0 sphinx-key/src/clear.rs | 7 +- sphinx-key/src/conn/http.rs | 12 +- sphinx-key/src/conn/mod.rs | 21 +--- sphinx-key/src/conn/mqtt.rs | 69 ++++++++++++ sphinx-key/src/conn/wifi.rs | 24 ++-- sphinx-key/src/core/config.rs | 67 +++++++++++ sphinx-key/src/core/events.rs | 55 +++++++++ sphinx-key/src/core/mod.rs | 2 + sphinx-key/src/main.rs | 101 ++++------------- 11 files changed, 434 insertions(+), 126 deletions(-) create mode 100644 sphinx-key/hmq.html rename sphinx-key/{go.sh => run.sh} (100%) create mode 100644 sphinx-key/src/conn/mqtt.rs create mode 100644 sphinx-key/src/core/config.rs create mode 100644 sphinx-key/src/core/events.rs create mode 100644 sphinx-key/src/core/mod.rs diff --git a/sphinx-key/hmq.html b/sphinx-key/hmq.html new file mode 100644 index 0000000..8e534b8 --- /dev/null +++ b/sphinx-key/hmq.html @@ -0,0 +1,202 @@ + + + + + + + HMQ CLIENT + + + + + +
+
+
+

Connection + {{ connection.state }} + ({{ connection.error }}) + +

+
+

+ +

+

+ +

+

+ +

+

+ +

+
+
+
+ +
+
+

Subscriptions

+
+

+ +

+

+ +

+

+ +

+
+
    +
  • +
    +
    + QoS + {{ info.qos }} +
    +
    +
    +
    + Topic + {{ filter }} +
    +
    + +
  • +
+
+
+ +
+
+

Messages

+
+

+ +

+

+ +

+

+ +

+
+
    +
  • + {{ message.topic }}: + {{ message.payload }} +
  • +
+
+
+
+ + + + + + + \ No newline at end of file diff --git a/sphinx-key/go.sh b/sphinx-key/run.sh similarity index 100% rename from sphinx-key/go.sh rename to sphinx-key/run.sh diff --git a/sphinx-key/src/clear.rs b/sphinx-key/src/clear.rs index e979f32..8292bd3 100644 --- a/sphinx-key/src/clear.rs +++ b/sphinx-key/src/clear.rs @@ -1,17 +1,12 @@ -#![allow(unused_imports)] - use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use esp_idf_svc::nvs::*; use esp_idf_svc::nvs_storage::EspNvsStorage; - -use embedded_svc::httpd::*; -use embedded_svc::wifi::*; use embedded_svc::storage::Storage; use std::sync::Arc; -fn main() -> Result<()> { +fn main() -> anyhow::Result<()> { let default_nvs = Arc::new(EspDefaultNvs::new()?); let mut store = EspNvsStorage::new_default(default_nvs.clone(), "sphinx", true).expect("no storage"); store.remove("config").expect("couldnt remove config 1"); diff --git a/sphinx-key/src/conn/http.rs b/sphinx-key/src/conn/http.rs index 989eac3..f21cc1c 100644 --- a/sphinx-key/src/conn/http.rs +++ b/sphinx-key/src/conn/http.rs @@ -1,12 +1,16 @@ -use crate::conn::{Params, Config}; use crate::conn::html; +use crate::core::config::Config; -use url; use embedded_svc::httpd::*; use esp_idf_svc::httpd as idf; use std::sync::{Condvar, Mutex, Arc}; use embedded_svc::httpd::registry::Registry; -use esp_idf_sys::{self}; +use serde::Deserialize; + +#[derive(Clone, Debug, Deserialize)] +pub struct Params { + pub config: String +} #[allow(unused_variables)] pub fn config_server(mutex: Arc<(Mutex>, Condvar)>) -> Result { @@ -15,7 +19,7 @@ pub fn config_server(mutex: Arc<(Mutex>, Condvar)>) -> Result Result>> { + info!("About to start MQTT client"); + + let conf = MqttClientConfiguration { + client_id: Some("rust-esp32-std-demo-1"), + // FIXME - mqtts + // crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), + ..Default::default() + }; + + let b = format!("mqtt://{}", broker); + println!("===> CONNECT TO {}", b); + let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; + + info!("MQTT client started"); + + // let subscription = eventloop.subscribe(|message: &Message| { + // log::info!("!!! Got message from the event loop"); //: {:?}", message.0); + // })?; + + // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work + // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to + // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide. + // Yet, you still need to efficiently process each message in the callback without blocking for too long. + thread::spawn(move || { + info!("MQTT Listening for messages"); + + while let Some(msg) = connection.next() { + match msg { + Err(e) => info!("MQTT Message ERROR: {}", e), + Ok(msg) => { + eventloop.post(&Message::new([0; 256]), None).unwrap(); + info!("MQTT Message: {:?}", msg); + }, + } + } + + info!("MQTT connection loop exit"); + }); + + client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?; + + info!("Subscribed to all topics (rust-esp32-std-demo)"); + + client.publish( + "rust-esp32-std-demo", + QoS::AtMostOnce, + false, + "Hello from rust-esp32-std-demo!".as_bytes(), + )?; + + info!("Published a hello message to topic \"rust-esp32-std-demo\""); + + Ok(client) +} \ No newline at end of file diff --git a/sphinx-key/src/conn/wifi.rs b/sphinx-key/src/conn/wifi.rs index a1b497c..c6ef6fb 100644 --- a/sphinx-key/src/conn/wifi.rs +++ b/sphinx-key/src/conn/wifi.rs @@ -1,4 +1,4 @@ -use crate::conn::Config; +use crate::core::config::Config; use esp_idf_svc::wifi::*; use esp_idf_svc::sysloop::*; @@ -12,18 +12,18 @@ use embedded_svc::ping::Ping; use embedded_svc::ipv4; use log::*; -use anyhow::bail; use std::time::Duration; use std::sync::Arc; use std::thread; #[allow(dead_code)] pub fn start_client( - netif_stack: Arc, - sys_loop_stack: Arc, default_nvs: Arc, config: &Config, ) -> Result> { + let netif_stack = Arc::new(EspNetifStack::new()?); + let sys_loop_stack = Arc::new(EspSysLoopStack::new()?); + let mut wifi = Box::new(EspWifi::new(netif_stack, sys_loop_stack, default_nvs)?); let ap_infos = wifi.scan()?; let ssid = config.ssid.as_str(); @@ -47,16 +47,7 @@ pub fn start_client( } ))?; - // not working - info!("Wifi client configuration set, about to get status"); - match wifi.wait_status_with_timeout(Duration::from_secs(20), |status| !status.is_transitional()) { - Ok(_) => (), - Err(e) => warn!("Unexpected Wifi status: {:?}", e), - }; - let status = wifi.get_status(); - println!("=> wifi STATUS 1 {:?}", status); - - info!("...Wifi client configuration set, AGAIN get status"); + info!("...Wifi client configuration set, get status"); match wifi.wait_status_with_timeout(Duration::from_secs(20), |status| !status.is_transitional()) { Ok(_) => (), Err(e) => warn!("Unexpected Wifi status: {:?}", e), @@ -84,10 +75,11 @@ pub fn start_client( #[allow(dead_code)] pub fn start_server( - netif_stack: Arc, - sys_loop_stack: Arc, default_nvs: Arc, ) -> Result> { + let netif_stack = Arc::new(EspNetifStack::new()?); + let sys_loop_stack = Arc::new(EspSysLoopStack::new()?); + let mut wifi = Box::new(EspWifi::new(netif_stack, sys_loop_stack, default_nvs)?); wifi.set_configuration(&Configuration::AccessPoint( AccessPointConfiguration { diff --git a/sphinx-key/src/core/config.rs b/sphinx-key/src/core/config.rs new file mode 100644 index 0000000..8750e83 --- /dev/null +++ b/sphinx-key/src/core/config.rs @@ -0,0 +1,67 @@ +use crate::conn; + +use anyhow::Result; +use std::sync::{Condvar, Mutex, Arc}; +use std::time::Duration; +use serde::{Serialize, Deserialize}; + +use esp_idf_svc::nvs::*; +use esp_idf_svc::wifi::*; +use embedded_svc::wifi::*; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Config { + pub broker: String, + pub ssid: String, + pub pass: String, +} + +/* +52.91.253.115:1883 + +arp -a + +http://192.168.71.1/?broker=52.91.253.115%3A1883 +*/ + +pub fn start_client(default_nvs: Arc, config: &Config) -> Result> { + let wifi = conn::wifi::start_client( + default_nvs, + config + )?; + println!("CLIENT CONNECTED!!!!!! {:?}", wifi.get_status()); + Ok(wifi) +} + +pub fn start_server_and_wait(default_nvs: Arc) -> Result<(Box, Config)> { + + let mutex = Arc::new((Mutex::new(None), Condvar::new())); + + #[allow(clippy::redundant_clone)] + #[allow(unused_mut)] + let mut wifi = conn::wifi::start_server( + default_nvs.clone(), + )?; + + let httpd = conn::http::config_server(mutex.clone()); + + let mut wait = mutex.0.lock().unwrap(); + + let config: &Config = loop { + if let Some(conf) = &*wait { + break conf; + } else { + wait = mutex + .1 + .wait_timeout(wait, Duration::from_secs(1)) + .unwrap() + .0; + } + }; + + drop(httpd); + // drop(wifi); + // thread::sleep(Duration::from_secs(1)); + println!("===> config! {:?}", config); + Ok((wifi, config.clone())) +} \ No newline at end of file diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs new file mode 100644 index 0000000..7c136db --- /dev/null +++ b/sphinx-key/src/core/events.rs @@ -0,0 +1,55 @@ + +use esp_idf_svc::eventloop::*; +use embedded_svc::httpd::Result; +use esp_idf_sys::{self, c_types}; +use log::*; + +const MSG_SIZE: usize = 256; + +#[derive(Copy, Clone, Debug)] +pub struct Message([u8; MSG_SIZE]); + +impl Message { + pub fn new(bytes: [u8; MSG_SIZE]) -> Self { + Self(bytes) + } +} + +impl EspTypedEventSource for Message { + fn source() -> *const c_types::c_char { + b"DEMO-SERVICE\0".as_ptr() as *const _ + } +} + +impl EspTypedEventSerializer for Message { + fn serialize( + event: &Message, + f: impl for<'a> FnOnce(&'a EspEventPostData) -> R, + ) -> R { + f(&unsafe { EspEventPostData::new(Self::source(), Self::event_id(), event) }) + } +} + +impl EspTypedEventDeserializer for Message { + fn deserialize( + data: &EspEventFetchData, + f: &mut impl for<'a> FnMut(&'a Message) -> R, + ) -> R { + f(unsafe { data.as_payload() }) + } +} + +pub fn make_eventloop() -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { + use embedded_svc::event_bus::EventBus; + + info!("About to start a background event loop"); + let mut eventloop = EspBackgroundEventLoop::new(&Default::default())?; + + info!("About to subscribe to the background event loop"); + let subscription = eventloop.subscribe(|message: &Message| { + info!("!!! Got message from the event loop"); //: {:?}", message.0); + })?; + // let subscription = eventloop.subscribe(cb)?; + + Ok((eventloop, subscription)) +} \ No newline at end of file diff --git a/sphinx-key/src/core/mod.rs b/sphinx-key/src/core/mod.rs new file mode 100644 index 0000000..401876e --- /dev/null +++ b/sphinx-key/src/core/mod.rs @@ -0,0 +1,2 @@ +pub mod events; +pub mod config; \ No newline at end of file diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index f7c0c33..cf563a2 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -1,27 +1,21 @@ -#![allow(unused_imports)] mod conn; +mod core; + +use crate::core::{events::*, config::*}; use sphinx_key_signer; use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use std::thread; -use log::*; - -use std::sync::{Condvar, Mutex, Arc, atomic::*}; -use std::time::*; +use std::sync::Arc; +use std::time::Duration; +use anyhow::Result; use esp_idf_svc::nvs::*; use esp_idf_svc::nvs_storage::EspNvsStorage; -use esp_idf_svc::netif::*; -use esp_idf_svc::eventloop::*; -use esp_idf_svc::sysloop::*; -use esp_idf_svc::wifi::*; - -use embedded_svc::httpd::*; -use embedded_svc::wifi::*; use embedded_svc::storage::Storage; -// use log::*; -// use url; +use embedded_svc::wifi::Wifi; +use embedded_svc::event_bus::EventBus; fn main() -> Result<()> { // Temporary. Will disappear once ESP-IDF 4.4 is released, but for now it is necessary to call this function once, @@ -35,20 +29,26 @@ fn main() -> Result<()> { thread::sleep(Duration::from_secs(1)); let default_nvs = Arc::new(EspDefaultNvs::new()?); - // let storage = Arc::new(Mutex::new(EspNvsStorage::new_default(default_nvs.clone(), "sphinx", true).expect("NVS FAIL"))); let mut store = EspNvsStorage::new_default(default_nvs.clone(), "sphinx", true).expect("no storage"); - // uncomment to clear: - // store.remove("config").expect("couldnt remove config 1"); - let existing: Option = store.get("config").expect("failed"); + let existing: Option = store.get("config").expect("failed"); if let Some(exist) = existing { println!("=============> START CLIENT NOW <============== {:?}", exist); // store.remove("config").expect("couldnt remove config"); - if let Err(e) = start_client(default_nvs.clone(), &exist) { - error!("CLIENT ERROR {:?}", e); + let wifi = start_client(default_nvs.clone(), &exist)?; + // if the subscription goes out of scope its dropped + // the sub needs to publish back to mqtt??? + let (eventloop, _sub) = make_eventloop()?; + let mqtt_client = conn::mqtt::mqtt_client(&exist.broker, eventloop)?; + + println!("{:?}", wifi.get_status()); + for s in 0..60 { + log::info!("Shutting down in {} secs", 60 - s); + thread::sleep(Duration::from_secs(1)); } + drop(wifi); } else { println!("=============> START SERVER NOW AND WAIT <=============="); - if let Ok((mut wifi, config)) = start_server_and_wait(default_nvs.clone()) { + if let Ok((wifi, config)) = start_server_and_wait(default_nvs.clone()) { store.put("config", &config).expect("could not store config"); println!("CONFIG SAVED"); drop(wifi); @@ -58,62 +58,3 @@ fn main() -> Result<()> { Ok(()) } - -fn start_server_and_wait(default_nvs: Arc) -> Result<(Box, conn::Config)> { - let netif_stack = Arc::new(EspNetifStack::new()?); - let sys_loop_stack = Arc::new(EspSysLoopStack::new()?); - - let mutex = Arc::new((Mutex::new(None), Condvar::new())); - - #[allow(clippy::redundant_clone)] - #[allow(unused_mut)] - let mut wifi = conn::wifi::start_server( - netif_stack.clone(), - sys_loop_stack.clone(), - default_nvs.clone(), - )?; - - let httpd = conn::http::config_server(mutex.clone()); - - let mut wait = mutex.0.lock().unwrap(); - - let config: &conn::Config = loop { - if let Some(conf) = &*wait { - break conf; - } else { - wait = mutex - .1 - .wait_timeout(wait, Duration::from_secs(1)) - .unwrap() - .0; - } - }; - - drop(httpd); - // drop(wifi); - // thread::sleep(Duration::from_secs(1)); - println!("===> config! {:?}", config); - Ok((wifi, config.clone())) -} - -fn start_client(default_nvs: Arc, config: &conn::Config) -> Result<()> { - let netif_stack = Arc::new(EspNetifStack::new()?); - let sys_loop_stack = Arc::new(EspSysLoopStack::new()?); - - let wifi = conn::wifi::start_client( - netif_stack.clone(), - sys_loop_stack.clone(), - default_nvs.clone(), - config - )?; - - println!("CLIENT CONNECTED!!!!!! {:?}", wifi.get_status()); - - let mut i = 0; - loop { - thread::sleep(Duration::from_secs(5)); - i = i + 1; - println!("wait forever... {}", i); - } - Ok(()) -} \ No newline at end of file