sleep before lss reconnect dance, cleaner mqtt sub topics

This commit is contained in:
Evan Feenstra
2023-06-05 21:36:49 -07:00
parent 391354959d
commit 11a5bff5f1
3 changed files with 12 additions and 8 deletions

View File

@@ -3,6 +3,7 @@ use rocket::tokio::{
self, self,
sync::{mpsc}, sync::{mpsc},
}; };
use std::time::Duration;
use crate::conn::{ChannelRequest, LssReq}; use crate::conn::{ChannelRequest, LssReq};
use lss_connector::{LssBroker, Response}; use lss_connector::{LssBroker, Response};
use sphinx_signer::sphinx_glyph::topics; use sphinx_signer::sphinx_glyph::topics;
@@ -58,6 +59,8 @@ pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>, mut re
} }
async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender<ChannelRequest>) -> Result<()> { async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender<ChannelRequest>) -> Result<()> {
// sleep 3 seconds to make sure ESP32 subscription is active
tokio::time::sleep(Duration::from_secs(3)).await;
let init_bytes = lss_conn.make_init_msg().await?; let init_bytes = lss_conn.make_init_msg().await?;
let reply = ChannelRequest::send_for(cid, topics::LSS_MSG, init_bytes, mqtt_tx).await?; let reply = ChannelRequest::send_for(cid, topics::LSS_MSG, init_bytes, mqtt_tx).await?;
let ir = Response::from_slice(&reply)?.as_init()?; let ir = Response::from_slice(&reply)?.as_init()?;

View File

@@ -20,7 +20,7 @@ use rocket::tokio::{
sync::{broadcast, mpsc}, sync::{broadcast, mpsc},
}; };
use rumqttd::{oneshot as std_oneshot, AuthMsg}; use rumqttd::{oneshot as std_oneshot, AuthMsg};
use std::env; use std::{env, time::Duration};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use url::Url; use url::Url;
use vls_frontend::{frontend::SourceFactory, Frontend}; use vls_frontend::{frontend::SourceFactory, Frontend};
@@ -78,6 +78,8 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
// waits until first connection // waits until first connection
let conns = broker_setup(settings, mqtt_rx, reconn_tx.clone(), error_tx.clone()).await; let conns = broker_setup(settings, mqtt_rx, reconn_tx.clone(), error_tx.clone()).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let (lss_tx, lss_rx) = mpsc::channel::<LssReq>(10000); let (lss_tx, lss_rx) = mpsc::channel::<LssReq>(10000);
let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") { let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") {
// waits until LSS confirmation from signer // waits until LSS confirmation from signer
@@ -89,7 +91,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
Err(e) => { Err(e) => {
let _ = error_tx.send(e.to_string().as_bytes().to_vec()); let _ = error_tx.send(e.to_string().as_bytes().to_vec());
log::error!("failed LSS setup, trying again..."); log::error!("failed LSS setup, trying again...");
tokio::time::sleep(std::time::Duration::from_secs(3)).await; tokio::time::sleep(Duration::from_secs(3)).await;
} }
} }
}; };

View File

@@ -50,6 +50,8 @@ pub enum Status {
pub const ROOT_STORE: &str = "/sdcard/store"; pub const ROOT_STORE: &str = "/sdcard/store";
pub const SUB_TOPICS: &[&str] = &[topics::VLS, topics::LSS_MSG, topics::CONTROL];
fn mqtt_sub( fn mqtt_sub(
mqtt: &mut EspMqttClient<ConnState<MessageImpl, EspError>>, mqtt: &mut EspMqttClient<ConnState<MessageImpl, EspError>>,
client_id: &str, client_id: &str,
@@ -93,8 +95,7 @@ pub fn make_event_loop(
// wait for a Connection first. // wait for a Connection first.
match event { match event {
Event::Connected => { Event::Connected => {
let ts = &[topics::VLS, topics::LSS_MSG, topics::CONTROL]; mqtt_sub(&mut mqtt, client_id, SUB_TOPICS);
mqtt_sub(&mut mqtt, client_id, ts);
break; break;
} }
_ => (), _ => (),
@@ -127,8 +128,7 @@ pub fn make_event_loop(
while let Ok(event) = rx.recv() { while let Ok(event) = rx.recv() {
match event { match event {
Event::Connected => { Event::Connected => {
let ts = &[topics::VLS, topics::LSS_MSG, topics::CONTROL]; mqtt_sub(&mut mqtt, client_id, SUB_TOPICS);
mqtt_sub(&mut mqtt, client_id, ts);
led_tx.send(Status::Connected).unwrap(); led_tx.send(Status::Connected).unwrap();
} }
Event::Disconnected => { Event::Disconnected => {
@@ -274,8 +274,7 @@ pub fn make_event_loop(
match event { match event {
Event::Connected => { Event::Connected => {
led_tx.send(Status::ConnectedToMqtt).unwrap(); led_tx.send(Status::ConnectedToMqtt).unwrap();
let ts = &[topics::VLS]; mqtt_sub(&mut mqtt, client_id, &[topics::VLS]);
mqtt_sub(&mut mqtt, client_id, ts);
} }
Event::VlsMessage(msg_bytes) => { Event::VlsMessage(msg_bytes) => {
led_tx.send(Status::Signing).unwrap(); led_tx.send(Status::Signing).unwrap();