broker: add try counter to pub_and_wait

set to none to retry indefinitely until a response is received
closes #96
This commit is contained in:
irriden
2023-07-22 19:26:31 +00:00
parent 1491546f40
commit 2cf6fd7c1f
2 changed files with 37 additions and 26 deletions

View File

@@ -1,5 +1,5 @@
use crate::conn::{ChannelRequest, LssReq};
use anyhow::Result;
use anyhow::{anyhow, Result};
use lss_connector::{InitResponse, LssBroker, Response, SignerMutations};
use rocket::tokio;
use rumqttd::oneshot;
@@ -75,17 +75,17 @@ pub fn lss_tasks(
let lss_conn_ = lss_conn.clone();
let init_tx_ = init_tx.clone();
tokio::task::spawn(async move {
while let Some((cid, connected, oneshot_send_tx)) = reconn_rx.recv().await {
while let Some((cid, connected, dance_complete_tx)) = reconn_rx.recv().await {
if connected {
log::info!("CLIENT {} reconnected!", cid);
if let Err(e) = reconnect_dance(&cid, &lss_conn_, &init_tx_).await {
log::error!("reconnect dance failed {:?}", e);
let _ = oneshot_send_tx.send(false);
let _ = dance_complete_tx.send(false);
} else {
let _ = oneshot_send_tx.send(true);
let _ = dance_complete_tx.send(true);
}
} else {
let _ = oneshot_send_tx.send(false);
let _ = dance_complete_tx.send(false);
}
}
});
@@ -97,19 +97,9 @@ async fn reconnect_dance(
mqtt_tx: &mpsc::Sender<ChannelRequest>,
) -> Result<()> {
log::debug!("Reconnect dance started, proceeding with step 1");
let ir = loop {
if let Ok(ir) = dance_step_1(cid, lss_conn, mqtt_tx).await {
break ir;
}
sleep(2).await;
};
let ir = dance_step_1(cid, lss_conn, mqtt_tx).await?;
log::debug!("Step 1 finished, now onto step 2");
loop {
if let Ok(_) = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await {
break;
}
sleep(2).await;
}
let _ = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await?;
log::debug!("Reconnect dance finished!");
Ok(())
}
@@ -122,7 +112,10 @@ async fn dance_step_1(
let init_bytes = lss_conn.make_init_msg().await?;
log::debug!("starting dance_step_1 send for {}", cid);
let reply = ChannelRequest::send_for(cid, topics::INIT_1_MSG, init_bytes, mqtt_tx).await?;
log::debug!("send for completed");
log::debug!("dance_step_1 send for completed");
if reply.is_empty() {
return Err(anyhow!("dance step 1 did not complete"));
}
let ir = Response::from_slice(&reply)?.into_init()?;
Ok(ir)
}
@@ -136,7 +129,10 @@ async fn dance_step_2(
let state_bytes = lss_conn.get_created_state_msg(ir).await?;
log::debug!("starting dance_step_2 send for {}", cid);
let reply2 = ChannelRequest::send_for(cid, topics::INIT_2_MSG, state_bytes, mqtt_tx).await?;
log::debug!("send for completed");
log::debug!("dance_step_2 send for completed");
if reply2.is_empty() {
return Err(anyhow!("dance step 2 did not complete"));
}
let cr = Response::from_slice(&reply2)?.into_created()?;
lss_conn.handle(Response::Created(cr)).await?;
Ok(())

View File

@@ -63,7 +63,8 @@ pub fn start_broker(
// receive replies from LSS initialization
let _init_task = std::thread::spawn(move || {
while let Some(msg) = init_receiver.blocking_recv() {
pub_and_wait(msg, &conns_, &init_rx, &mut link_tx_);
// Retry three times
pub_and_wait(msg, &conns_, &init_rx, &mut link_tx_, Some(3));
}
});
@@ -74,7 +75,14 @@ pub fn start_broker(
let _relay_task = std::thread::spawn(move || {
while let Some(msg) = receiver.blocking_recv() {
log::debug!("Received message here: {:?}", msg);
pub_and_wait(msg, &connections, &msg_rx, &mut link_tx);
let retries = if msg.topic == topics::CONTROL {
// Don't retry
Some(0)
} else {
// Retry indefinitely
None
};
pub_and_wait(msg, &connections, &msg_rx, &mut link_tx, retries);
}
});
@@ -143,7 +151,9 @@ fn pub_and_wait(
conns_: &Arc<Mutex<Connections>>,
msg_rx: &std::sync::mpsc::Receiver<(String, String, Vec<u8>)>,
link_tx: &mut LinkTx,
retries: Option<u8>,
) {
let mut counter = 0u8;
loop {
log::debug!("looping in pub_and_wait");
let reply = if let Some(cid) = msg.cid.clone() {
@@ -152,11 +162,7 @@ fn pub_and_wait(
let res_opt = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx);
log::debug!("client responded!");
// control topic should be able to fail early without retrying
if res_opt.is_none() && msg.topic == topics::CONTROL {
Some(ChannelReply::empty())
} else {
res_opt
}
res_opt
} else {
log::debug!("publishing to all clients");
let cs = conns_.lock().unwrap();
@@ -186,6 +192,15 @@ fn pub_and_wait(
}
break;
}
if let Some(attempt) = retries {
if counter == attempt {
if let Err(_) = msg.reply_tx.send(ChannelReply::empty()) {
log::warn!("could not send on reply_tx");
}
break;
}
}
counter = counter + 1;
}
}