make LSS optional if no muts are received

This commit is contained in:
Evan Feenstra
2023-06-02 15:40:11 -07:00
parent 2b1933534d
commit ea7e91d1c3
4 changed files with 34 additions and 24 deletions

View File

@@ -87,8 +87,8 @@ pub fn start_broker(
if let Err(e) = link_tx.publish(pub_topic, msg.message.clone()) {
log::error!("failed to pub to link_tx! {} {:?}", cid, e);
}
if let Ok((cid, topic, reply)) = msg_rx.recv() {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic }) {
if let Ok((cid, topic_end, reply)) = msg_rx.recv() {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic_end }) {
log::warn!("could not send on reply_tx {}", cid);
}
}
@@ -111,9 +111,10 @@ pub fn start_broker(
}
// and receive from the correct client (or timeout to next)
let dur = Duration::from_secs(9);
if let Ok((cid, topic, reply)) = msg_rx.recv_timeout(dur) {
if let Ok((cid, topic_end, reply)) = msg_rx.recv_timeout(dur) {
if &cid == client {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic }) {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic_end })
{
log::warn!("could not send on reply_tx");
}
break 'retry_loop;
@@ -149,8 +150,10 @@ pub fn start_broker(
continue;
}
let cid = ts[0].to_string();
let topic = ts[1].to_string();
if let Err(e) = msg_tx.send((cid, topic, f.publish.payload.to_vec())) {
let topic_end = ts[1].to_string();
if let Err(e) =
msg_tx.send((cid, topic_end, f.publish.payload.to_vec()))
{
log::error!("failed to pub to msg_tx! {:?}", e);
}
}