diff --git a/broker/src/lss.rs b/broker/src/lss.rs index 7002f65..1db34ed 100644 --- a/broker/src/lss.rs +++ b/broker/src/lss.rs @@ -1,5 +1,5 @@ use crate::conn::{ChannelRequest, LssReq}; -use anyhow::Result; +use anyhow::{anyhow, Result}; use lss_connector::{InitResponse, LssBroker, Response, SignerMutations}; use rocket::tokio; use rumqttd::oneshot; @@ -75,17 +75,17 @@ pub fn lss_tasks( let lss_conn_ = lss_conn.clone(); let init_tx_ = init_tx.clone(); tokio::task::spawn(async move { - while let Some((cid, connected, oneshot_send_tx)) = reconn_rx.recv().await { + while let Some((cid, connected, dance_complete_tx)) = reconn_rx.recv().await { if connected { log::info!("CLIENT {} reconnected!", cid); if let Err(e) = reconnect_dance(&cid, &lss_conn_, &init_tx_).await { log::error!("reconnect dance failed {:?}", e); - let _ = oneshot_send_tx.send(false); + let _ = dance_complete_tx.send(false); } else { - let _ = oneshot_send_tx.send(true); + let _ = dance_complete_tx.send(true); } } else { - let _ = oneshot_send_tx.send(false); + let _ = dance_complete_tx.send(false); } } }); @@ -97,19 +97,9 @@ async fn reconnect_dance( mqtt_tx: &mpsc::Sender, ) -> Result<()> { log::debug!("Reconnect dance started, proceeding with step 1"); - let ir = loop { - if let Ok(ir) = dance_step_1(cid, lss_conn, mqtt_tx).await { - break ir; - } - sleep(2).await; - }; + let ir = dance_step_1(cid, lss_conn, mqtt_tx).await?; log::debug!("Step 1 finished, now onto step 2"); - loop { - if let Ok(_) = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await { - break; - } - sleep(2).await; - } + let _ = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await?; log::debug!("Reconnect dance finished!"); Ok(()) } @@ -122,7 +112,10 @@ async fn dance_step_1( let init_bytes = lss_conn.make_init_msg().await?; log::debug!("starting dance_step_1 send for {}", cid); let reply = ChannelRequest::send_for(cid, topics::INIT_1_MSG, init_bytes, mqtt_tx).await?; - log::debug!("send for completed"); + log::debug!("dance_step_1 send for completed"); + if reply.is_empty() { + return Err(anyhow!("dance step 1 did not complete")); + } let ir = Response::from_slice(&reply)?.into_init()?; Ok(ir) } @@ -136,7 +129,10 @@ async fn dance_step_2( let state_bytes = lss_conn.get_created_state_msg(ir).await?; log::debug!("starting dance_step_2 send for {}", cid); let reply2 = ChannelRequest::send_for(cid, topics::INIT_2_MSG, state_bytes, mqtt_tx).await?; - log::debug!("send for completed"); + log::debug!("dance_step_2 send for completed"); + if reply2.is_empty() { + return Err(anyhow!("dance step 2 did not complete")); + } let cr = Response::from_slice(&reply2)?.into_created()?; lss_conn.handle(Response::Created(cr)).await?; Ok(()) diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 3fdf997..a0ebc2d 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -63,7 +63,8 @@ pub fn start_broker( // receive replies from LSS initialization let _init_task = std::thread::spawn(move || { while let Some(msg) = init_receiver.blocking_recv() { - pub_and_wait(msg, &conns_, &init_rx, &mut link_tx_); + // Retry three times + pub_and_wait(msg, &conns_, &init_rx, &mut link_tx_, Some(3)); } }); @@ -74,7 +75,14 @@ pub fn start_broker( let _relay_task = std::thread::spawn(move || { while let Some(msg) = receiver.blocking_recv() { log::debug!("Received message here: {:?}", msg); - pub_and_wait(msg, &connections, &msg_rx, &mut link_tx); + let retries = if msg.topic == topics::CONTROL { + // Don't retry + Some(0) + } else { + // Retry indefinitely + None + }; + pub_and_wait(msg, &connections, &msg_rx, &mut link_tx, retries); } }); @@ -143,7 +151,9 @@ fn pub_and_wait( conns_: &Arc>, msg_rx: &std::sync::mpsc::Receiver<(String, String, Vec)>, link_tx: &mut LinkTx, + retries: Option, ) { + let mut counter = 0u8; loop { log::debug!("looping in pub_and_wait"); let reply = if let Some(cid) = msg.cid.clone() { @@ -152,11 +162,7 @@ fn pub_and_wait( let res_opt = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx); log::debug!("client responded!"); // control topic should be able to fail early without retrying - if res_opt.is_none() && msg.topic == topics::CONTROL { - Some(ChannelReply::empty()) - } else { - res_opt - } + res_opt } else { log::debug!("publishing to all clients"); let cs = conns_.lock().unwrap(); @@ -186,6 +192,15 @@ fn pub_and_wait( } break; } + if let Some(attempt) = retries { + if counter == attempt { + if let Err(_) = msg.reply_tx.send(ChannelReply::empty()) { + log::warn!("could not send on reply_tx"); + } + break; + } + } + counter = counter + 1; } }