diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index b188413..0e3f51a 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -172,20 +172,43 @@ fn pub_and_wait( std::thread::sleep(Duration::from_secs(1)); None } else { - let current = current.unwrap(); + let current_cid = current.clone().unwrap(); // Try the current connection - let mut rep = pub_timeout(¤t, &msg.topic, &msg.message, &msg_rx, link_tx); + let mut rep = pub_timeout(¤t_cid, &msg.topic, &msg.message, &msg_rx, link_tx); + + // We restart the loop in case a new signer connects while pinging for connections + // as this could mean that the LSS state advanced broker side only, and signer side + // it's one step behind. Here and also in the for loop. + // We do it as soon as we know. + let cs = conns_.lock().unwrap(); + let new_current = cs.current.clone(); + drop(cs); + if new_current != current { + log::warn!("Client list changed, starting over!"); + counter = 0; + continue; + } + // If that failed, try looking for some other signer if rep.is_none() { - for cid in client_list.into_keys().filter(|k| k != ¤t) { + for cid in client_list.into_keys().filter(|k| k != ¤t_cid) { rep = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx); + let mut cs = conns_.lock().unwrap(); + log::debug!("got the list lock!"); + let new_current = cs.current.clone(); + if new_current != current { + log::info!("Client list changed, starting over!"); + drop(cs); + counter = 0; + rep = None; + break; + } if rep.is_some() { - let mut cs = conns_.lock().unwrap(); - log::debug!("got the list lock!"); cs.set_current(cid.to_string()); drop(cs); break; } + drop(cs); } } rep