diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index ed1d795..d1f882c 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -81,50 +81,35 @@ pub fn start_broker( let conns_ = connections.clone(); let _relay_task = std::thread::spawn(move || { while let Some(msg) = receiver.blocking_recv() { - if let Some(cid) = msg.cid { - // for a specific client - let pub_topic = format!("{}/{}", cid, msg.topic); - if let Err(e) = link_tx.publish(pub_topic, msg.message.clone()) { - log::error!("failed to pub to link_tx! {} {:?}", cid, e); - } - if let Ok((cid, topic_end, reply)) = msg_rx.recv() { - if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic_end }) { - log::warn!("could not send on reply_tx {}", cid); - } - } - } else { - // send to each client in turn - 'retry_loop: loop { - // get the current list of connected clients + loop { + let reply = if let Some(cid) = msg.cid.clone() { + // for a specific client + pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx) + } else { + // send to each client in turn let cs = conns_.lock().unwrap(); let client_list = cs.clients.clone(); drop(cs); // wait a second if there are no clients if client_list.len() == 0 { std::thread::sleep(Duration::from_secs(1)); - } - for client in client_list.iter() { - let pub_topic = format!("{}/{}", client, msg.topic); - log::info!("SENDING TO {} on topic {}", client, msg.topic); - if let Err(e) = link_tx.publish(pub_topic, msg.message.clone()) { - log::error!("failed to pub to link_tx! {:?}", e); - } - // and receive from the correct client (or timeout to next) - let dur = Duration::from_secs(9); - if let Ok((cid, topic_end, reply)) = msg_rx.recv_timeout(dur) { - if &cid == client { - if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic_end }) - { - log::warn!("could not send on reply_tx"); - } - break 'retry_loop; - } else { - log::warn!("Mismatched client id!"); - // wait a second before trying again - std::thread::sleep(Duration::from_secs(1)); + None + } else { + let mut rep = None; + for cid in client_list.iter() { + rep = pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx); + if let Some(_) = &rep { + break; } } + rep } + }; + if let Some(reply) = reply { + if let Err(_) = msg.reply_tx.send(reply) { + log::warn!("could not send on reply_tx"); + } + break; } } } @@ -173,6 +158,34 @@ pub fn start_broker( Ok(()) } +// publish to signer and wait for response +// if timeout is exceed, try next signer +fn pub_wait( + client_id: &str, + topic: &str, + payload: &[u8], + msg_rx: &std::sync::mpsc::Receiver<(String, String, Vec)>, + link_tx: &mut LinkTx, +) -> Option { + let pub_topic = format!("{}/{}", client_id, topic); + log::info!("SENDING TO {} on topic {}", client_id, topic); + if let Err(e) = link_tx.publish(pub_topic, payload.to_vec()) { + log::error!("failed to pub to link_tx! {:?}", e); + } + // and receive from the correct client (or timeout to next) + let dur = Duration::from_secs(9); + if let Ok((cid, topic_end, reply)) = msg_rx.recv_timeout(dur) { + if &cid == client_id { + return Some(ChannelReply { reply, topic_end }); + } else { + log::warn!("Mismatched client id!"); + // wait a second before trying again + std::thread::sleep(Duration::from_secs(1)); + } + } + None +} + fn subs(cid: &str, mut ltx: LinkTx) { ltx.subscribe(format!("{}/{}", cid, topics::VLS_RETURN)) .unwrap();