eventloop background processor, MQTT client

This commit is contained in:
Evan Feenstra
2022-05-24 08:44:47 -07:00
parent 88439f169a
commit bcf936344c
11 changed files with 434 additions and 126 deletions

202
sphinx-key/hmq.html Normal file
View File

@@ -0,0 +1,202 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>HMQ CLIENT</title>
<link rel="stylesheet" href="https://unpkg.com/bulma@0.6.1/css/bulma.css">
<style>
body{
min-height: 100vh;
background: #232327;
}
.title{
color:#26b1b2;
}
</style>
</head>
<body>
<section id="app">
<section class="section">
<div class="container">
<h1 class="title">Connection
<small v-bind:class="{'has-text-danger': connection.error, 'has-text-grey': !connection.error}">{{ connection.state }}
<span v-if="connection.error">({{ connection.error }})</span>
</small>
</h1>
<div class="field is-grouped">
<p class="control">
<input class="input" v-model="username" placeholder="Username">
</p>
<p class="control">
<input class="input" v-model="password" placeholder="Password">
</p>
<p class="control">
<button class="button is-primary" v-on:click="mqtt.connect()" v-bind:disabled="connected">Connect</button>
</p>
<p class="control">
<button class="button" v-on:click="mqtt.disconnect()" v-bind:disabled="!connected">Disconnect</button>
</p>
</div>
</div>
</section>
<section class="section">
<div class="container">
<h1 class="title">Subscriptions</h1>
<div class="field is-grouped">
<p class="control">
<input class="input" v-model="subscribe.filter" placeholder="Topic">
</p>
<p class="control">
<input class="input" v-model="subscribe.qos" placeholder="QoS">
</p>
<p class="control">
<button class="button" v-on:click="mqtt.subscribe(subscribe.filter, subscribe.qos)" v-bind:disabled="!connected">Subscribe</button>
</p>
</div>
<ul>
<li v-for="(info, filter) in subscriptions" class="field is-grouped">
<div class="control">
<div class="tags has-addons">
<span class="tag">QoS</span>
<span class="tag is-dark">{{ info.qos }}</span>
</div>
</div>
<div class="control">
<div class="tags has-addons">
<span class="tag">Topic</span>
<span class="tag is-primary">{{ filter }}</span>
</div>
</div>
<a class="tag is-delete is-danger" v-on:click="mqtt.unsubscribe(filter)" v-bind:disabled="!connected"></a>
</li>
</ul>
</div>
</section>
<section class="section">
<div class="container">
<h1 class="title">Messages</h1>
<div class="field is-grouped">
<p class="control">
<input class="input" v-model="publish.topic" placeholder="Topic">
</p>
<p class="control">
<input class="input" v-model="publish.payload" placeholder="Payload">
</p>
<p class="control">
<button class="button" v-on:click="mqtt.publish(publish.topic, publish.payload)" v-bind:disabled="!connected">Publish</button>
</p>
</div>
<ul>
<li v-for="message in messages">
<code>{{ message.topic }}</code>:
<code>{{ message.payload }}</code>
</li>
</ul>
</div>
</section>
</section>
<script src="https://unpkg.com/paho-mqtt@1.1.0"></script>
<script src="https://unpkg.com/vue@2.5.9"></script>
<script>
const IP = '52.91.253.115'
const PORT = 1888
var app = new Vue({
el: '#app',
created() {
var mqtt = new Paho.Client(IP, Number(PORT), "/ws", "client1");
mqtt.onConnectionLost = ({ errorCode, errorMessage }) => {
this.connection.state = 'disconnected';
this.connection.error = (errorCode === 0) ? '' : errorMessage;
}
mqtt.onMessageArrived = ({ payloadBytes, destinationName, qos }) => {
this.messages.unshift({
received: new Date(),
payload: String.fromCharCode.apply(String, payloadBytes),
payload_raw: payloadBytes,
topic: destinationName,
qos,
})
this.messages = this.messages.slice(0, 10)
},
this.mqtt = {
connect: () => {
this.connection.state = 'connecting';
this.connection.error = '';
console.log(this.username, this.password)
mqtt.connect({
userName: this.username || '',
password: this.password || '',
onSuccess: () => {
this.connection.state = 'connected';
},
onFailure: ({ errorCode, errorMessage }) => {
this.connection.state = 'connection failed';
this.connection.error = errorMessage;
},
});
},
disconnect: () => {
mqtt.disconnect();
this.subscriptions = {};
},
publish: (topic, payload) => {
console.log('publish',topic,payload)
mqtt.send(topic, payload);
},
subscribe: (filter, qos) => {
qos = parseInt(qos);
Vue.set(this.subscriptions, filter, {
state: 'subscribing',
qos,
})
mqtt.subscribe(filter, {
qos,
onSuccess: () => {
this.subscriptions[filter].state = 'subscribed';
}
});
},
unsubscribe: (filter) => {
if (this.subscriptions[filter]) {
this.subscriptions[filter].state = 'unsubscribing';
mqtt.unsubscribe(filter, {
onSuccess: () => {
Vue.delete(this.subscriptions, filter);
}
});
}
},
}
},
data: {
username: '',
password: '',
connection: {
state: 'idle',
error: '',
},
subscribe: {
filter: '',
qos: 0,
},
publish: {
topic: '',
payload: '',
},
subscriptions: {},
messages: [],
},
computed: {
connected() { return this.connection.state == 'connected'; },
}
});
</script>
</body>
</html>

View File

@@ -1,17 +1,12 @@
#![allow(unused_imports)]
use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported
use esp_idf_svc::nvs::*;
use esp_idf_svc::nvs_storage::EspNvsStorage;
use embedded_svc::httpd::*;
use embedded_svc::wifi::*;
use embedded_svc::storage::Storage;
use std::sync::Arc;
fn main() -> Result<()> {
fn main() -> anyhow::Result<()> {
let default_nvs = Arc::new(EspDefaultNvs::new()?);
let mut store = EspNvsStorage::new_default(default_nvs.clone(), "sphinx", true).expect("no storage");
store.remove("config").expect("couldnt remove config 1");

View File

@@ -1,12 +1,16 @@
use crate::conn::{Params, Config};
use crate::conn::html;
use crate::core::config::Config;
use url;
use embedded_svc::httpd::*;
use esp_idf_svc::httpd as idf;
use std::sync::{Condvar, Mutex, Arc};
use embedded_svc::httpd::registry::Registry;
use esp_idf_sys::{self};
use serde::Deserialize;
#[derive(Clone, Debug, Deserialize)]
pub struct Params {
pub config: String
}
#[allow(unused_variables)]
pub fn config_server(mutex: Arc<(Mutex<Option<Config>>, Condvar)>) -> Result<idf::Server> {
@@ -15,7 +19,7 @@ pub fn config_server(mutex: Arc<(Mutex<Option<Config>>, Condvar)>) -> Result<idf
.at("/")
.get(|_| Ok(html::HTML.into()))?
.at("/config")
.post(move |mut request| {
.post(move |request| {
let bod = &request.query_string()
.ok_or(anyhow::anyhow!("failed to parse query string"))?;
println!("bod {:?}", bod);

View File

@@ -1,26 +1,7 @@
pub mod wifi;
pub mod http;
pub mod mqtt;
mod html;
use serde::{Serialize, Deserialize};
#[derive(Clone, Debug, Deserialize)]
pub struct Params {
pub config: String
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config {
pub broker: String,
pub ssid: String,
pub pass: String,
}
/*
52.91.253.115:1883
curl -X POST 192.168.71.1/config?config=%7B%22ssid%22%3A%22apples%26acorns%22%2C%22pass%22%3A%2242flutes%22%2C%22broker%22%3A%2252.91.253.115%3A1883%22%7D
arp -a
http://192.168.71.1/?broker=52.91.253.115%3A1883
*/

View File

@@ -0,0 +1,69 @@
use crate::core::events::Message;
use embedded_svc::event_bus::Postbox;
use embedded_svc::event_bus::EventBus;
use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS};
use esp_idf_svc::mqtt::client::*;
use anyhow::Result;
use esp_idf_svc::eventloop::*;
use log::*;
use std::thread;
use esp_idf_sys::{self};
use esp_idf_sys::EspError;
pub fn mqtt_client(broker: &str, mut eventloop: EspBackgroundEventLoop) -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> {
info!("About to start MQTT client");
let conf = MqttClientConfiguration {
client_id: Some("rust-esp32-std-demo-1"),
// FIXME - mqtts
// crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
..Default::default()
};
let b = format!("mqtt://{}", broker);
println!("===> CONNECT TO {}", b);
let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?;
info!("MQTT client started");
// let subscription = eventloop.subscribe(|message: &Message| {
// log::info!("!!! Got message from the event loop"); //: {:?}", message.0);
// })?;
// Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work
// Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to
// spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide.
// Yet, you still need to efficiently process each message in the callback without blocking for too long.
thread::spawn(move || {
info!("MQTT Listening for messages");
while let Some(msg) = connection.next() {
match msg {
Err(e) => info!("MQTT Message ERROR: {}", e),
Ok(msg) => {
eventloop.post(&Message::new([0; 256]), None).unwrap();
info!("MQTT Message: {:?}", msg);
},
}
}
info!("MQTT connection loop exit");
});
client.subscribe("rust-esp32-std-demo", QoS::AtMostOnce)?;
info!("Subscribed to all topics (rust-esp32-std-demo)");
client.publish(
"rust-esp32-std-demo",
QoS::AtMostOnce,
false,
"Hello from rust-esp32-std-demo!".as_bytes(),
)?;
info!("Published a hello message to topic \"rust-esp32-std-demo\"");
Ok(client)
}

View File

@@ -1,4 +1,4 @@
use crate::conn::Config;
use crate::core::config::Config;
use esp_idf_svc::wifi::*;
use esp_idf_svc::sysloop::*;
@@ -12,18 +12,18 @@ use embedded_svc::ping::Ping;
use embedded_svc::ipv4;
use log::*;
use anyhow::bail;
use std::time::Duration;
use std::sync::Arc;
use std::thread;
#[allow(dead_code)]
pub fn start_client(
netif_stack: Arc<EspNetifStack>,
sys_loop_stack: Arc<EspSysLoopStack>,
default_nvs: Arc<EspDefaultNvs>,
config: &Config,
) -> Result<Box<EspWifi>> {
let netif_stack = Arc::new(EspNetifStack::new()?);
let sys_loop_stack = Arc::new(EspSysLoopStack::new()?);
let mut wifi = Box::new(EspWifi::new(netif_stack, sys_loop_stack, default_nvs)?);
let ap_infos = wifi.scan()?;
let ssid = config.ssid.as_str();
@@ -47,16 +47,7 @@ pub fn start_client(
}
))?;
// not working
info!("Wifi client configuration set, about to get status");
match wifi.wait_status_with_timeout(Duration::from_secs(20), |status| !status.is_transitional()) {
Ok(_) => (),
Err(e) => warn!("Unexpected Wifi status: {:?}", e),
};
let status = wifi.get_status();
println!("=> wifi STATUS 1 {:?}", status);
info!("...Wifi client configuration set, AGAIN get status");
info!("...Wifi client configuration set, get status");
match wifi.wait_status_with_timeout(Duration::from_secs(20), |status| !status.is_transitional()) {
Ok(_) => (),
Err(e) => warn!("Unexpected Wifi status: {:?}", e),
@@ -84,10 +75,11 @@ pub fn start_client(
#[allow(dead_code)]
pub fn start_server(
netif_stack: Arc<EspNetifStack>,
sys_loop_stack: Arc<EspSysLoopStack>,
default_nvs: Arc<EspDefaultNvs>,
) -> Result<Box<EspWifi>> {
let netif_stack = Arc::new(EspNetifStack::new()?);
let sys_loop_stack = Arc::new(EspSysLoopStack::new()?);
let mut wifi = Box::new(EspWifi::new(netif_stack, sys_loop_stack, default_nvs)?);
wifi.set_configuration(&Configuration::AccessPoint(
AccessPointConfiguration {

View File

@@ -0,0 +1,67 @@
use crate::conn;
use anyhow::Result;
use std::sync::{Condvar, Mutex, Arc};
use std::time::Duration;
use serde::{Serialize, Deserialize};
use esp_idf_svc::nvs::*;
use esp_idf_svc::wifi::*;
use embedded_svc::wifi::*;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config {
pub broker: String,
pub ssid: String,
pub pass: String,
}
/*
52.91.253.115:1883
arp -a
http://192.168.71.1/?broker=52.91.253.115%3A1883
*/
pub fn start_client(default_nvs: Arc<EspDefaultNvs>, config: &Config) -> Result<Box<EspWifi>> {
let wifi = conn::wifi::start_client(
default_nvs,
config
)?;
println!("CLIENT CONNECTED!!!!!! {:?}", wifi.get_status());
Ok(wifi)
}
pub fn start_server_and_wait(default_nvs: Arc<EspDefaultNvs>) -> Result<(Box<EspWifi>, Config)> {
let mutex = Arc::new((Mutex::new(None), Condvar::new()));
#[allow(clippy::redundant_clone)]
#[allow(unused_mut)]
let mut wifi = conn::wifi::start_server(
default_nvs.clone(),
)?;
let httpd = conn::http::config_server(mutex.clone());
let mut wait = mutex.0.lock().unwrap();
let config: &Config = loop {
if let Some(conf) = &*wait {
break conf;
} else {
wait = mutex
.1
.wait_timeout(wait, Duration::from_secs(1))
.unwrap()
.0;
}
};
drop(httpd);
// drop(wifi);
// thread::sleep(Duration::from_secs(1));
println!("===> config! {:?}", config);
Ok((wifi, config.clone()))
}

View File

@@ -0,0 +1,55 @@
use esp_idf_svc::eventloop::*;
use embedded_svc::httpd::Result;
use esp_idf_sys::{self, c_types};
use log::*;
const MSG_SIZE: usize = 256;
#[derive(Copy, Clone, Debug)]
pub struct Message([u8; MSG_SIZE]);
impl Message {
pub fn new(bytes: [u8; MSG_SIZE]) -> Self {
Self(bytes)
}
}
impl EspTypedEventSource for Message {
fn source() -> *const c_types::c_char {
b"DEMO-SERVICE\0".as_ptr() as *const _
}
}
impl EspTypedEventSerializer<Message> for Message {
fn serialize<R>(
event: &Message,
f: impl for<'a> FnOnce(&'a EspEventPostData) -> R,
) -> R {
f(&unsafe { EspEventPostData::new(Self::source(), Self::event_id(), event) })
}
}
impl EspTypedEventDeserializer<Message> for Message {
fn deserialize<R>(
data: &EspEventFetchData,
f: &mut impl for<'a> FnMut(&'a Message) -> R,
) -> R {
f(unsafe { data.as_payload() })
}
}
pub fn make_eventloop() -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> {
use embedded_svc::event_bus::EventBus;
info!("About to start a background event loop");
let mut eventloop = EspBackgroundEventLoop::new(&Default::default())?;
info!("About to subscribe to the background event loop");
let subscription = eventloop.subscribe(|message: &Message| {
info!("!!! Got message from the event loop"); //: {:?}", message.0);
})?;
// let subscription = eventloop.subscribe(cb)?;
Ok((eventloop, subscription))
}

View File

@@ -0,0 +1,2 @@
pub mod events;
pub mod config;

View File

@@ -1,27 +1,21 @@
#![allow(unused_imports)]
mod conn;
mod core;
use crate::core::{events::*, config::*};
use sphinx_key_signer;
use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported
use std::thread;
use log::*;
use std::sync::{Condvar, Mutex, Arc, atomic::*};
use std::time::*;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use esp_idf_svc::nvs::*;
use esp_idf_svc::nvs_storage::EspNvsStorage;
use esp_idf_svc::netif::*;
use esp_idf_svc::eventloop::*;
use esp_idf_svc::sysloop::*;
use esp_idf_svc::wifi::*;
use embedded_svc::httpd::*;
use embedded_svc::wifi::*;
use embedded_svc::storage::Storage;
// use log::*;
// use url;
use embedded_svc::wifi::Wifi;
use embedded_svc::event_bus::EventBus;
fn main() -> Result<()> {
// Temporary. Will disappear once ESP-IDF 4.4 is released, but for now it is necessary to call this function once,
@@ -35,20 +29,26 @@ fn main() -> Result<()> {
thread::sleep(Duration::from_secs(1));
let default_nvs = Arc::new(EspDefaultNvs::new()?);
// let storage = Arc::new(Mutex::new(EspNvsStorage::new_default(default_nvs.clone(), "sphinx", true).expect("NVS FAIL")));
let mut store = EspNvsStorage::new_default(default_nvs.clone(), "sphinx", true).expect("no storage");
// uncomment to clear:
// store.remove("config").expect("couldnt remove config 1");
let existing: Option<conn::Config> = store.get("config").expect("failed");
let existing: Option<Config> = store.get("config").expect("failed");
if let Some(exist) = existing {
println!("=============> START CLIENT NOW <============== {:?}", exist);
// store.remove("config").expect("couldnt remove config");
if let Err(e) = start_client(default_nvs.clone(), &exist) {
error!("CLIENT ERROR {:?}", e);
let wifi = start_client(default_nvs.clone(), &exist)?;
// if the subscription goes out of scope its dropped
// the sub needs to publish back to mqtt???
let (eventloop, _sub) = make_eventloop()?;
let mqtt_client = conn::mqtt::mqtt_client(&exist.broker, eventloop)?;
println!("{:?}", wifi.get_status());
for s in 0..60 {
log::info!("Shutting down in {} secs", 60 - s);
thread::sleep(Duration::from_secs(1));
}
drop(wifi);
} else {
println!("=============> START SERVER NOW AND WAIT <==============");
if let Ok((mut wifi, config)) = start_server_and_wait(default_nvs.clone()) {
if let Ok((wifi, config)) = start_server_and_wait(default_nvs.clone()) {
store.put("config", &config).expect("could not store config");
println!("CONFIG SAVED");
drop(wifi);
@@ -58,62 +58,3 @@ fn main() -> Result<()> {
Ok(())
}
fn start_server_and_wait(default_nvs: Arc<EspDefaultNvs>) -> Result<(Box<EspWifi>, conn::Config)> {
let netif_stack = Arc::new(EspNetifStack::new()?);
let sys_loop_stack = Arc::new(EspSysLoopStack::new()?);
let mutex = Arc::new((Mutex::new(None), Condvar::new()));
#[allow(clippy::redundant_clone)]
#[allow(unused_mut)]
let mut wifi = conn::wifi::start_server(
netif_stack.clone(),
sys_loop_stack.clone(),
default_nvs.clone(),
)?;
let httpd = conn::http::config_server(mutex.clone());
let mut wait = mutex.0.lock().unwrap();
let config: &conn::Config = loop {
if let Some(conf) = &*wait {
break conf;
} else {
wait = mutex
.1
.wait_timeout(wait, Duration::from_secs(1))
.unwrap()
.0;
}
};
drop(httpd);
// drop(wifi);
// thread::sleep(Duration::from_secs(1));
println!("===> config! {:?}", config);
Ok((wifi, config.clone()))
}
fn start_client(default_nvs: Arc<EspDefaultNvs>, config: &conn::Config) -> Result<()> {
let netif_stack = Arc::new(EspNetifStack::new()?);
let sys_loop_stack = Arc::new(EspSysLoopStack::new()?);
let wifi = conn::wifi::start_client(
netif_stack.clone(),
sys_loop_stack.clone(),
default_nvs.clone(),
config
)?;
println!("CLIENT CONNECTED!!!!!! {:?}", wifi.get_status());
let mut i = 0;
loop {
thread::sleep(Duration::from_secs(5));
i = i + 1;
println!("wait forever... {}", i);
}
Ok(())
}