retries for client_id-specific sends

This commit is contained in:
Evan Feenstra
2023-06-05 19:45:31 -07:00
parent c4cff01520
commit c19ae08155

View File

@@ -81,50 +81,35 @@ pub fn start_broker(
let conns_ = connections.clone(); let conns_ = connections.clone();
let _relay_task = std::thread::spawn(move || { let _relay_task = std::thread::spawn(move || {
while let Some(msg) = receiver.blocking_recv() { while let Some(msg) = receiver.blocking_recv() {
if let Some(cid) = msg.cid { loop {
// for a specific client let reply = if let Some(cid) = msg.cid.clone() {
let pub_topic = format!("{}/{}", cid, msg.topic); // for a specific client
if let Err(e) = link_tx.publish(pub_topic, msg.message.clone()) { pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx)
log::error!("failed to pub to link_tx! {} {:?}", cid, e); } else {
} // send to each client in turn
if let Ok((cid, topic_end, reply)) = msg_rx.recv() {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic_end }) {
log::warn!("could not send on reply_tx {}", cid);
}
}
} else {
// send to each client in turn
'retry_loop: loop {
// get the current list of connected clients
let cs = conns_.lock().unwrap(); let cs = conns_.lock().unwrap();
let client_list = cs.clients.clone(); let client_list = cs.clients.clone();
drop(cs); drop(cs);
// wait a second if there are no clients // wait a second if there are no clients
if client_list.len() == 0 { if client_list.len() == 0 {
std::thread::sleep(Duration::from_secs(1)); std::thread::sleep(Duration::from_secs(1));
} None
for client in client_list.iter() { } else {
let pub_topic = format!("{}/{}", client, msg.topic); let mut rep = None;
log::info!("SENDING TO {} on topic {}", client, msg.topic); for cid in client_list.iter() {
if let Err(e) = link_tx.publish(pub_topic, msg.message.clone()) { rep = pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx);
log::error!("failed to pub to link_tx! {:?}", e); if let Some(_) = &rep {
} break;
// and receive from the correct client (or timeout to next)
let dur = Duration::from_secs(9);
if let Ok((cid, topic_end, reply)) = msg_rx.recv_timeout(dur) {
if &cid == client {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply, topic_end })
{
log::warn!("could not send on reply_tx");
}
break 'retry_loop;
} else {
log::warn!("Mismatched client id!");
// wait a second before trying again
std::thread::sleep(Duration::from_secs(1));
} }
} }
rep
} }
};
if let Some(reply) = reply {
if let Err(_) = msg.reply_tx.send(reply) {
log::warn!("could not send on reply_tx");
}
break;
} }
} }
} }
@@ -173,6 +158,34 @@ pub fn start_broker(
Ok(()) Ok(())
} }
// publish to signer and wait for response
// if timeout is exceed, try next signer
fn pub_wait(
client_id: &str,
topic: &str,
payload: &[u8],
msg_rx: &std::sync::mpsc::Receiver<(String, String, Vec<u8>)>,
link_tx: &mut LinkTx,
) -> Option<ChannelReply> {
let pub_topic = format!("{}/{}", client_id, topic);
log::info!("SENDING TO {} on topic {}", client_id, topic);
if let Err(e) = link_tx.publish(pub_topic, payload.to_vec()) {
log::error!("failed to pub to link_tx! {:?}", e);
}
// and receive from the correct client (or timeout to next)
let dur = Duration::from_secs(9);
if let Ok((cid, topic_end, reply)) = msg_rx.recv_timeout(dur) {
if &cid == client_id {
return Some(ChannelReply { reply, topic_end });
} else {
log::warn!("Mismatched client id!");
// wait a second before trying again
std::thread::sleep(Duration::from_secs(1));
}
}
None
}
fn subs(cid: &str, mut ltx: LinkTx) { fn subs(cid: &str, mut ltx: LinkTx) {
ltx.subscribe(format!("{}/{}", cid, topics::VLS_RETURN)) ltx.subscribe(format!("{}/{}", cid, topics::VLS_RETURN))
.unwrap(); .unwrap();