From 1491546f404ec34aa44c50de984da1cd46dbe5b8 Mon Sep 17 00:00:00 2001 From: irriden Date: Sat, 22 Jul 2023 17:56:03 +0000 Subject: [PATCH] broker: add client id to client list only after it has completed the reconnect dance closes #97 --- broker/src/lss.rs | 17 +++++++++++++++-- broker/src/main.rs | 14 ++++++++++---- broker/src/mqtt.rs | 25 ++++++++++++------------- 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/broker/src/lss.rs b/broker/src/lss.rs index e43f0d2..7002f65 100644 --- a/broker/src/lss.rs +++ b/broker/src/lss.rs @@ -2,6 +2,7 @@ use crate::conn::{ChannelRequest, LssReq}; use anyhow::Result; use lss_connector::{InitResponse, LssBroker, Response, SignerMutations}; use rocket::tokio; +use rumqttd::oneshot; use sphinx_signer::sphinx_glyph::topics; use std::time::Duration; use tokio::sync::mpsc; @@ -52,7 +53,7 @@ async fn send_created( pub fn lss_tasks( lss_conn: LssBroker, mut lss_rx: mpsc::Receiver, - mut reconn_rx: mpsc::Receiver<(String, bool)>, + mut reconn_rx: mpsc::Receiver<(String, bool, oneshot::Sender)>, init_tx: mpsc::Sender, ) { // msg handler (from CLN looper) @@ -74,12 +75,17 @@ pub fn lss_tasks( let lss_conn_ = lss_conn.clone(); let init_tx_ = init_tx.clone(); tokio::task::spawn(async move { - while let Some((cid, connected)) = reconn_rx.recv().await { + while let Some((cid, connected, oneshot_send_tx)) = reconn_rx.recv().await { if connected { log::info!("CLIENT {} reconnected!", cid); if let Err(e) = reconnect_dance(&cid, &lss_conn_, &init_tx_).await { log::error!("reconnect dance failed {:?}", e); + let _ = oneshot_send_tx.send(false); + } else { + let _ = oneshot_send_tx.send(true); } + } else { + let _ = oneshot_send_tx.send(false); } } }); @@ -90,18 +96,21 @@ async fn reconnect_dance( lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender, ) -> Result<()> { + log::debug!("Reconnect dance started, proceeding with step 1"); let ir = loop { if let Ok(ir) = dance_step_1(cid, lss_conn, mqtt_tx).await { break ir; } sleep(2).await; }; + log::debug!("Step 1 finished, now onto step 2"); loop { if let Ok(_) = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await { break; } sleep(2).await; } + log::debug!("Reconnect dance finished!"); Ok(()) } @@ -111,7 +120,9 @@ async fn dance_step_1( mqtt_tx: &mpsc::Sender, ) -> Result { let init_bytes = lss_conn.make_init_msg().await?; + log::debug!("starting dance_step_1 send for {}", cid); let reply = ChannelRequest::send_for(cid, topics::INIT_1_MSG, init_bytes, mqtt_tx).await?; + log::debug!("send for completed"); let ir = Response::from_slice(&reply)?.into_init()?; Ok(ir) } @@ -123,7 +134,9 @@ async fn dance_step_2( ir: &InitResponse, ) -> Result<()> { let state_bytes = lss_conn.get_created_state_msg(ir).await?; + log::debug!("starting dance_step_2 send for {}", cid); let reply2 = ChannelRequest::send_for(cid, topics::INIT_2_MSG, state_bytes, mqtt_tx).await?; + log::debug!("send for completed"); let cr = Response::from_slice(&reply2)?.into_created()?; lss_conn.handle(Response::Created(cr)).await?; Ok(()) diff --git a/broker/src/main.rs b/broker/src/main.rs index 19030f3..0499c44 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -74,7 +74,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let (error_tx, error_rx) = broadcast::channel(10000); error_log::log_errors(error_rx); - let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool)>(10000); + let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool, std_oneshot::Sender)>(10000); // waits until first connection let conns = broker_setup( @@ -146,7 +146,7 @@ pub async fn broker_setup( settings: Settings, mqtt_rx: mpsc::Receiver, init_rx: mpsc::Receiver, - reconn_tx: mpsc::Sender<(String, bool)>, + reconn_tx: mpsc::Sender<(String, bool, std_oneshot::Sender)>, error_tx: broadcast::Sender>, ) -> Arc> { let (auth_tx, auth_rx) = std::sync::mpsc::channel::(); @@ -193,10 +193,16 @@ pub async fn broker_setup( let _ = startup_tx.send(cid.to_string()); while let Ok((cid, connected)) = status_rx.recv() { log::info!("=> reconnected: {}: {}", cid, connected); + let (dance_complete_tx, dance_complete_rx) = std_oneshot::channel::(); + let _ = reconn_tx_.blocking_send((cid.clone(), connected, dance_complete_tx)); + let dance_complete = dance_complete_rx.recv().unwrap_or_else(|e| { + log::info!("dance_complete channel died before receiving response: {}", e); + false + }); let mut cs = conns_.lock().unwrap(); - cs.client_action(&cid, connected); + cs.client_action(&cid, dance_complete); + log::debug!("List: {:?}, action: {}", cs, dance_complete); drop(cs); - let _ = reconn_tx_.blocking_send((cid, connected)); } }); let _first_client_id = startup_rx.recv(); diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 9d08aaa..3fdf997 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -145,25 +145,24 @@ fn pub_and_wait( link_tx: &mut LinkTx, ) { loop { - let cs = conns_.lock().unwrap(); - let client_list = cs.clients.clone(); - drop(cs); + log::debug!("looping in pub_and_wait"); let reply = if let Some(cid) = msg.cid.clone() { - if !client_list.contains(&cid) { + // for a specific client + log::debug!("publishing to a specific client"); + let res_opt = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx); + log::debug!("client responded!"); + // control topic should be able to fail early without retrying + if res_opt.is_none() && msg.topic == topics::CONTROL { Some(ChannelReply::empty()) } else { - // for a specific client - log::debug!("publishing to a specific client"); - let res_opt = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx); - // control topic should be able to fail early without retrying - if res_opt.is_none() && msg.topic == topics::CONTROL { - Some(ChannelReply::empty()) - } else { - res_opt - } + res_opt } } else { log::debug!("publishing to all clients"); + let cs = conns_.lock().unwrap(); + let client_list = cs.clients.clone(); + log::debug!("got the list lock!"); + drop(cs); // send to each client in turn if client_list.len() == 0 { // wait a second if there are no clients