impl internal Event enum, resubscribe if broker restarts

This commit is contained in:
Evan Feenstra
2022-06-12 11:19:06 -07:00
parent a1ecd05526
commit 94f9f8d733
3 changed files with 74 additions and 54 deletions

View File

@@ -16,9 +16,9 @@ const SUB_TOPIC: &str = "sphinx-return";
const PUB_TOPIC: &str = "sphinx"; const PUB_TOPIC: &str = "sphinx";
const USERNAME: &str = "sphinx-key"; const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass"; const PASSWORD: &str = "sphinx-key-pass";
// must get a reply within this time, or disconnects
const REPLY_TIMEOUT_MS: u64 = 1000; const REPLY_TIMEOUT_MS: u64 = 1000;
// static CONNECTED: OnceCell<bool> = OnceCell::new();
static CONNECTED: SyncLazy<Mutex<bool>> = SyncLazy::new(|| Mutex::new(false)); static CONNECTED: SyncLazy<Mutex<bool>> = SyncLazy::new(|| Mutex::new(false));
fn set_connected(b: bool) { fn set_connected(b: bool) {
*CONNECTED.lock().unwrap() = b; *CONNECTED.lock().unwrap() = b;
@@ -41,9 +41,6 @@ pub fn start_broker(
router.start().expect("could not start router"); router.start().expect("could not start router");
}); });
// let mut client_connected = AtomicBool::new(false);
// CONNECTED.set(false).expect("could init CONNECTED");
let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
rt_builder.enable_all(); rt_builder.enable_all();
let rt = rt_builder.build().unwrap(); let rt = rt_builder.build().unwrap();
@@ -167,7 +164,7 @@ fn config() -> Config {
}; };
let mut servers = HashMap::new(); let mut servers = HashMap::new();
servers.insert( servers.insert(
"0".to_string(), id.to_string(),
ServerSettings { ServerSettings {
cert: None, cert: None,
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(), listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(),

View File

@@ -1,10 +1,11 @@
use crate::core::events::Event as CoreEvent;
use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS, Event, Message as MqttMessage}; use embedded_svc::mqtt::client::{Connection, MessageImpl, QoS, Event, Message as MqttMessage};
use embedded_svc::mqtt::client::utils::Connection as MqttConnection; use embedded_svc::mqtt::client::utils::Connection as MqttConnection;
use esp_idf_svc::mqtt::client::*; use esp_idf_svc::mqtt::client::*;
use anyhow::Result; use anyhow::Result;
use log::*; use log::*;
use std::time::Duration;
use std::thread; use std::thread;
use esp_idf_sys::{self}; use esp_idf_sys::{self};
use esp_idf_sys::EspError; use esp_idf_sys::EspError;
@@ -15,6 +16,7 @@ pub const TOPIC: &str = "sphinx";
pub const RETURN_TOPIC: &str = "sphinx-return"; pub const RETURN_TOPIC: &str = "sphinx-return";
pub const USERNAME: &str = "sphinx-key"; pub const USERNAME: &str = "sphinx-key";
pub const PASSWORD: &str = "sphinx-key-pass"; pub const PASSWORD: &str = "sphinx-key-pass";
pub const QOS: QoS = QoS::AtMostOnce;
pub fn make_client(broker: &str, client_id: &str) -> Result<( pub fn make_client(broker: &str, client_id: &str) -> Result<(
EspMqttClient<ConnState<MessageImpl, EspError>>, EspMqttClient<ConnState<MessageImpl, EspError>>,
@@ -33,32 +35,20 @@ pub fn make_client(broker: &str, client_id: &str) -> Result<(
let b = format!("mqtt://{}", broker); let b = format!("mqtt://{}", broker);
// let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?; // let (mut client, mut connection) = EspMqttClient::new_with_conn(b, &conf)?;
let cc = loop { let cc = EspMqttClient::new_with_conn(b, &conf)?;
let broker_url = b.clone();
info!("===> CONNECT TO {}", &broker_url);
match EspMqttClient::new_with_conn(broker_url, &conf) {
Ok(c_c) => {
info!("EspMqttClient::new_with_conn finished");
break c_c
},
Err(_) => {
thread::sleep(Duration::from_secs(1));
}
}
};
//
info!("MQTT client started"); info!("MQTT client started");
Ok(cc) Ok(cc)
} }
pub fn start_listening( pub fn start_listening(
mut client: EspMqttClient<ConnState<MessageImpl, EspError>>, client: EspMqttClient<ConnState<MessageImpl, EspError>>,
mut connection: MqttConnection<Condvar, MessageImpl, EspError>, mut connection: MqttConnection<Condvar, MessageImpl, EspError>,
tx: mpsc::Sender<Vec<u8>>, tx: mpsc::Sender<CoreEvent>,
) -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> { ) -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> {
// must start pumping before subscribe or publish will work // must start pumping before subscribe or publish will not work
thread::spawn(move || { thread::spawn(move || {
info!("MQTT Listening for messages"); info!("MQTT Listening for messages");
loop { loop {
@@ -73,20 +63,20 @@ pub fn start_listening(
}, },
Ok(msg) => { Ok(msg) => {
match msg { match msg {
Event::BeforeConnect => info!("RECEIVED BEFORE CONNECT MESSAGE"), Event::BeforeConnect => info!("RECEIVED BeforeConnect MESSAGE"),
Event::Connected(flag) => { Event::Connected(_flag) => {
if flag { info!("RECEIVED Connected MESSAGE");
info!("RECEIVED CONNECTED = TRUE MESSAGE"); tx.send(CoreEvent::Connected).expect("couldnt send Event::Connected");
} else {
info!("RECEIVED CONNECTED = FALSE MESSAGE");
}
}, },
Event::Disconnected => warn!("RECEIVED DISCONNECTION MESSAGE"), Event::Disconnected => {
Event::Subscribed(_mes_id) => info!("RECEIVED SUBSCRIBED MESSAGE"), warn!("RECEIVED Disconnected MESSAGE");
Event::Unsubscribed(_mes_id) => info!("RECEIVED UNSUBSCRIBED MESSAGE"), tx.send(CoreEvent::Disconnected).expect("couldnt send Event::Disconnected");
Event::Published(_mes_id) => info!("RECEIVED PUBLISHED MESSAGE"), },
Event::Received(msg) => tx.send(msg.data().to_vec()).expect("could send to TX"), Event::Subscribed(_mes_id) => info!("RECEIVED Subscribed MESSAGE"),
Event::Deleted(_mes_id) => info!("RECEIVED DELETED MESSAGE"), Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"),
Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"),
Event::Received(msg) => tx.send(CoreEvent::Message(msg.data().to_vec())).expect("couldnt send Event::Message"),
Event::Deleted(_mes_id) => info!("RECEIVED Deleted MESSAGE"),
} }
}, },
} }
@@ -97,8 +87,8 @@ pub fn start_listening(
//info!("MQTT connection loop exit"); //info!("MQTT connection loop exit");
}); });
log::info!("SUBSCRIBE TO {}", TOPIC); // log::info!("SUBSCRIBE TO {}", TOPIC);
client.subscribe(TOPIC, QoS::AtMostOnce)?; // client.subscribe(TOPIC, QoS::AtMostOnce)?;
Ok(client) Ok(client)
} }

View File

@@ -1,43 +1,76 @@
use crate::conn::mqtt::RETURN_TOPIC; use crate::conn::mqtt::{RETURN_TOPIC, TOPIC, QOS};
use sphinx_key_signer::{self, InitResponse, PubKey}; use sphinx_key_signer::{self, InitResponse, PubKey};
use std::sync::mpsc; use std::sync::mpsc;
use esp_idf_sys; use esp_idf_sys;
use embedded_svc::mqtt::client::Client;
use embedded_svc::httpd::Result; use embedded_svc::httpd::Result;
use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS}; use embedded_svc::mqtt::client::{MessageImpl, Publish};
use esp_idf_svc::mqtt::client::*; use esp_idf_svc::mqtt::client::*;
use esp_idf_sys::EspError; use esp_idf_sys::EspError;
use log::*; use log::*;
pub enum Event {
Connected,
Disconnected,
Message(Vec<u8>)
}
#[cfg(not(feature = "pingpong"))] #[cfg(not(feature = "pingpong"))]
pub fn make_event_loop(mut mqtt: EspMqttClient<ConnState<MessageImpl, EspError>>, rx: mpsc::Receiver<Vec<u8>>) -> Result<()> { pub fn make_event_loop(mut mqtt: EspMqttClient<ConnState<MessageImpl, EspError>>, rx: mpsc::Receiver<Event>) -> Result<()> {
// initialize the RootHandler // initialize the RootHandler
let init_msg_bytes = rx.recv().expect("NO INIT MSG"); let root_handler = while let Ok(event) = rx.recv() {
let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); if let Event::Message(msg_bytes) = event {
mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, init_reply).expect("could not publish init response"); let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(event.clone()).expect("failed to init signer");
mqtt.publish(RETURN_TOPIC, QOS, false, init_reply).expect("could not publish init response");
break root_handler
}
};
// signing loop // signing loop
let dummy_peer = PubKey([0; 33]); let dummy_peer = PubKey([0; 33]);
while let Ok(msg_bytes) = rx.recv() { while let Ok(msg_bytes) = rx.recv() {
let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes, dummy_peer.clone()) { match event {
Ok(b) => mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"), Event::Connected => {
log::info!("SUBSCRIBE TO {}", TOPIC);
mqtt.subscribe(TOPIC, QOS).expect("could not MQTT subscribe");
},
Event::Message(ref msg_bytes) => {
let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes.clone(), dummy_peer.clone()) {
Ok(b) => mqtt.publish(RETURN_TOPIC, QOS, false, b).expect("could not publish init response"),
Err(e) => panic!("HANDLE FAILED {:?}", e), Err(e) => panic!("HANDLE FAILED {:?}", e),
}; };
},
Event::Disconnected => {
log::info!("GOT A Event::Disconnected msg!");
}
}
} }
Ok(()) Ok(())
} }
#[cfg(feature = "pingpong")] #[cfg(feature = "pingpong")]
pub fn make_event_loop(mut mqtt: EspMqttClient<ConnState<MessageImpl, EspError>>, rx: mpsc::Receiver<Vec<u8>>) -> Result<()> { pub fn make_event_loop(mut mqtt: EspMqttClient<ConnState<MessageImpl, EspError>>, rx: mpsc::Receiver<Event>) -> Result<()> {
info!("About to subscribe to the mpsc channel"); info!("About to subscribe to the mpsc channel");
while let Ok(msg_bytes) = rx.recv() { while let Ok(event) = rx.recv() {
match event {
Event::Connected => {
log::info!("SUBSCRIBE TO {}", TOPIC);
mqtt.subscribe(TOPIC, QOS).expect("could not MQTT subscribe");
},
Event::Message(msg_bytes) => {
let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes);
log::info!("GOT A PING MESSAGE! returning pong now..."); log::info!("GOT A PING MESSAGE! returning pong now...");
mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"); mqtt.publish(RETURN_TOPIC, QOS, false, b).expect("could not publish init response");
},
Event::Disconnected => {
log::info!("GOT A Event::Disconnected msg!");
}
}
} }
Ok(()) Ok(())