broker: add client id to client list only after it has completed the reconnect dance

closes #97
This commit is contained in:
irriden
2023-07-22 17:56:03 +00:00
parent 9d7e8b751f
commit 1491546f40
3 changed files with 37 additions and 19 deletions

View File

@@ -2,6 +2,7 @@ use crate::conn::{ChannelRequest, LssReq};
use anyhow::Result;
use lss_connector::{InitResponse, LssBroker, Response, SignerMutations};
use rocket::tokio;
use rumqttd::oneshot;
use sphinx_signer::sphinx_glyph::topics;
use std::time::Duration;
use tokio::sync::mpsc;
@@ -52,7 +53,7 @@ async fn send_created(
pub fn lss_tasks(
lss_conn: LssBroker,
mut lss_rx: mpsc::Receiver<LssReq>,
mut reconn_rx: mpsc::Receiver<(String, bool)>,
mut reconn_rx: mpsc::Receiver<(String, bool, oneshot::Sender<bool>)>,
init_tx: mpsc::Sender<ChannelRequest>,
) {
// msg handler (from CLN looper)
@@ -74,12 +75,17 @@ pub fn lss_tasks(
let lss_conn_ = lss_conn.clone();
let init_tx_ = init_tx.clone();
tokio::task::spawn(async move {
while let Some((cid, connected)) = reconn_rx.recv().await {
while let Some((cid, connected, oneshot_send_tx)) = reconn_rx.recv().await {
if connected {
log::info!("CLIENT {} reconnected!", cid);
if let Err(e) = reconnect_dance(&cid, &lss_conn_, &init_tx_).await {
log::error!("reconnect dance failed {:?}", e);
let _ = oneshot_send_tx.send(false);
} else {
let _ = oneshot_send_tx.send(true);
}
} else {
let _ = oneshot_send_tx.send(false);
}
}
});
@@ -90,18 +96,21 @@ async fn reconnect_dance(
lss_conn: &LssBroker,
mqtt_tx: &mpsc::Sender<ChannelRequest>,
) -> Result<()> {
log::debug!("Reconnect dance started, proceeding with step 1");
let ir = loop {
if let Ok(ir) = dance_step_1(cid, lss_conn, mqtt_tx).await {
break ir;
}
sleep(2).await;
};
log::debug!("Step 1 finished, now onto step 2");
loop {
if let Ok(_) = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await {
break;
}
sleep(2).await;
}
log::debug!("Reconnect dance finished!");
Ok(())
}
@@ -111,7 +120,9 @@ async fn dance_step_1(
mqtt_tx: &mpsc::Sender<ChannelRequest>,
) -> Result<InitResponse> {
let init_bytes = lss_conn.make_init_msg().await?;
log::debug!("starting dance_step_1 send for {}", cid);
let reply = ChannelRequest::send_for(cid, topics::INIT_1_MSG, init_bytes, mqtt_tx).await?;
log::debug!("send for completed");
let ir = Response::from_slice(&reply)?.into_init()?;
Ok(ir)
}
@@ -123,7 +134,9 @@ async fn dance_step_2(
ir: &InitResponse,
) -> Result<()> {
let state_bytes = lss_conn.get_created_state_msg(ir).await?;
log::debug!("starting dance_step_2 send for {}", cid);
let reply2 = ChannelRequest::send_for(cid, topics::INIT_2_MSG, state_bytes, mqtt_tx).await?;
log::debug!("send for completed");
let cr = Response::from_slice(&reply2)?.into_created()?;
lss_conn.handle(Response::Created(cr)).await?;
Ok(())

View File

@@ -74,7 +74,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let (error_tx, error_rx) = broadcast::channel(10000);
error_log::log_errors(error_rx);
let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool)>(10000);
let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool, std_oneshot::Sender<bool>)>(10000);
// waits until first connection
let conns = broker_setup(
@@ -146,7 +146,7 @@ pub async fn broker_setup(
settings: Settings,
mqtt_rx: mpsc::Receiver<ChannelRequest>,
init_rx: mpsc::Receiver<ChannelRequest>,
reconn_tx: mpsc::Sender<(String, bool)>,
reconn_tx: mpsc::Sender<(String, bool, std_oneshot::Sender<bool>)>,
error_tx: broadcast::Sender<Vec<u8>>,
) -> Arc<Mutex<Connections>> {
let (auth_tx, auth_rx) = std::sync::mpsc::channel::<AuthMsg>();
@@ -193,10 +193,16 @@ pub async fn broker_setup(
let _ = startup_tx.send(cid.to_string());
while let Ok((cid, connected)) = status_rx.recv() {
log::info!("=> reconnected: {}: {}", cid, connected);
let (dance_complete_tx, dance_complete_rx) = std_oneshot::channel::<bool>();
let _ = reconn_tx_.blocking_send((cid.clone(), connected, dance_complete_tx));
let dance_complete = dance_complete_rx.recv().unwrap_or_else(|e| {
log::info!("dance_complete channel died before receiving response: {}", e);
false
});
let mut cs = conns_.lock().unwrap();
cs.client_action(&cid, connected);
cs.client_action(&cid, dance_complete);
log::debug!("List: {:?}, action: {}", cs, dance_complete);
drop(cs);
let _ = reconn_tx_.blocking_send((cid, connected));
}
});
let _first_client_id = startup_rx.recv();

View File

@@ -145,25 +145,24 @@ fn pub_and_wait(
link_tx: &mut LinkTx,
) {
loop {
let cs = conns_.lock().unwrap();
let client_list = cs.clients.clone();
drop(cs);
log::debug!("looping in pub_and_wait");
let reply = if let Some(cid) = msg.cid.clone() {
if !client_list.contains(&cid) {
// for a specific client
log::debug!("publishing to a specific client");
let res_opt = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx);
log::debug!("client responded!");
// control topic should be able to fail early without retrying
if res_opt.is_none() && msg.topic == topics::CONTROL {
Some(ChannelReply::empty())
} else {
// for a specific client
log::debug!("publishing to a specific client");
let res_opt = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx);
// control topic should be able to fail early without retrying
if res_opt.is_none() && msg.topic == topics::CONTROL {
Some(ChannelReply::empty())
} else {
res_opt
}
res_opt
}
} else {
log::debug!("publishing to all clients");
let cs = conns_.lock().unwrap();
let client_list = cs.clients.clone();
log::debug!("got the list lock!");
drop(cs);
// send to each client in turn
if client_list.len() == 0 {
// wait a second if there are no clients