more solid reconnect dance

This commit is contained in:
Evan Feenstra
2023-06-28 12:45:51 -07:00
parent 0c0cea7f6f
commit 2c8ba0a26f

View File

@@ -1,33 +1,60 @@
use anyhow::Result;
use rocket::tokio::{
self,
sync::{mpsc},
};
use std::time::Duration;
use crate::conn::{ChannelRequest, LssReq};
use lss_connector::{LssBroker, Response};
use anyhow::Result;
use lss_connector::{InitResponse, LssBroker, Response, SignerMutations};
use rocket::tokio;
use sphinx_signer::sphinx_glyph::topics;
use std::time::Duration;
use tokio::sync::mpsc;
pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender<ChannelRequest>) -> Result<LssBroker> {
// LSS required
let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?;
let reply = ChannelRequest::send(topics::INIT_MSG, msg_bytes, &mqtt_tx).await?;
let ir = Response::from_slice(&reply)?.as_init()?;
let ir = loop {
if let Ok(ir) = send_init(msg_bytes.clone(), &mqtt_tx).await {
break ir;
}
sleep(2).await;
};
let lss_conn = LssBroker::new(uri, ir.clone(), spk).await?;
// this only returns the initial state if it was requested by signer
let msg_bytes2 = lss_conn.get_created_state_msg(&ir).await?;
let reply2 = ChannelRequest::send(topics::INIT_MSG, msg_bytes2, &mqtt_tx).await?;
let cr = Response::from_slice(&reply2)?.as_created()?;
let cr = loop {
if let Ok(ir) = send_created(msg_bytes2.clone(), &mqtt_tx).await {
break ir;
}
sleep(2).await;
};
lss_conn.handle(Response::Created(cr)).await?;
Ok(lss_conn)
}
pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>, mut reconn_rx: mpsc::Receiver<(String, bool)>, init_tx: mpsc::Sender<ChannelRequest>) {
async fn send_init(
msg_bytes: Vec<u8>,
mqtt_tx: &mpsc::Sender<ChannelRequest>,
) -> Result<InitResponse> {
let reply = ChannelRequest::send(topics::INIT_MSG, msg_bytes, &mqtt_tx).await?;
let ir = Response::from_slice(&reply)?.as_init()?;
Ok(ir)
}
async fn send_created(
msg_bytes: Vec<u8>,
mqtt_tx: &mpsc::Sender<ChannelRequest>,
) -> Result<SignerMutations> {
let reply2 = ChannelRequest::send(topics::INIT_MSG, msg_bytes, &mqtt_tx).await?;
let cr = Response::from_slice(&reply2)?.as_created()?;
Ok(cr)
}
pub fn lss_tasks(
lss_conn: LssBroker,
mut lss_rx: mpsc::Receiver<LssReq>,
mut reconn_rx: mpsc::Receiver<(String, bool)>,
init_tx: mpsc::Sender<ChannelRequest>,
) {
// msg handler (from CLN looper)
let lss_conn_ = lss_conn.clone();
tokio::task::spawn(async move {
@@ -35,7 +62,7 @@ pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>, mut re
match lss_conn_.handle_bytes(&req.message).await {
Ok(msg) => {
let _ = req.reply_tx.send(msg);
},
}
Err(e) => {
log::error!("failed lss_handle {:?}", e);
}
@@ -58,15 +85,52 @@ pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>, mut re
});
}
async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender<ChannelRequest>) -> Result<()> {
async fn reconnect_dance(
cid: &str,
lss_conn: &LssBroker,
mqtt_tx: &mpsc::Sender<ChannelRequest>,
) -> Result<()> {
// sleep 3 seconds to make sure ESP32 subscription is active
tokio::time::sleep(Duration::from_secs(3)).await;
sleep(3).await;
let ir = loop {
if let Ok(ir) = dance_step_1(cid, lss_conn, mqtt_tx).await {
break ir;
}
sleep(2).await;
};
loop {
if let Ok(_) = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await {
break;
}
sleep(2).await;
}
Ok(())
}
async fn dance_step_1(
cid: &str,
lss_conn: &LssBroker,
mqtt_tx: &mpsc::Sender<ChannelRequest>,
) -> Result<InitResponse> {
let init_bytes = lss_conn.make_init_msg().await?;
let reply = ChannelRequest::send_for(cid, topics::INIT_MSG, init_bytes, mqtt_tx).await?;
let ir = Response::from_slice(&reply)?.as_init()?;
let state_bytes = lss_conn.get_created_state_msg(&ir).await?;
Ok(ir)
}
async fn dance_step_2(
cid: &str,
lss_conn: &LssBroker,
mqtt_tx: &mpsc::Sender<ChannelRequest>,
ir: &InitResponse,
) -> Result<()> {
let state_bytes = lss_conn.get_created_state_msg(ir).await?;
let reply2 = ChannelRequest::send_for(cid, topics::INIT_MSG, state_bytes, mqtt_tx).await?;
let cr = Response::from_slice(&reply2)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?;
Ok(())
}
async fn sleep(s: u64) {
tokio::time::sleep(Duration::from_secs(s)).await;
}