From 564a7534ff53e0f32ec0aa7f02a84003b1411285 Mon Sep 17 00:00:00 2001 From: irriden Date: Tue, 8 Aug 2023 19:17:15 +0000 Subject: [PATCH 1/2] broker: rework lss init logic now the logic for the first connection and subsequent connections is nearly the same --- broker/src/lss.rs | 192 +++++++++++++++++++++++---------------------- broker/src/main.rs | 40 +++------- 2 files changed, 109 insertions(+), 123 deletions(-) diff --git a/broker/src/lss.rs b/broker/src/lss.rs index 4f0766a..0ecbf69 100644 --- a/broker/src/lss.rs +++ b/broker/src/lss.rs @@ -3,64 +3,38 @@ use anyhow::{anyhow, Result}; use lss_connector::{InitResponse, LssBroker, Response, SignerMutations}; use rocket::tokio; use rumqttd::oneshot; +use rumqttd::oneshot as std_oneshot; use sphinx_signer::sphinx_glyph::topics; -use std::time::Duration; use tokio::sync::mpsc; -pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender) -> Result { - // LSS required - let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?; - let ir = loop { - if let Ok(ir) = send_init(msg_bytes.clone(), &mqtt_tx).await { - break ir; - } - sleep(2).await; - }; - - let lss_conn = LssBroker::new(uri, ir.clone(), spk).await?; - // this only returns the initial state if it was requested by signer - let msg_bytes2 = lss_conn.get_created_state_msg(&ir).await?; - let cr = loop { - if let Ok(ir) = send_created(msg_bytes2.clone(), &mqtt_tx).await { - break ir; - } - sleep(2).await; - }; - - lss_conn.handle(Response::Created(cr)).await?; - - Ok(lss_conn) -} - -async fn send_init( - msg_bytes: Vec, - mqtt_tx: &mpsc::Sender, -) -> Result { - let reply = ChannelRequest::send(topics::INIT_1_MSG, msg_bytes, &mqtt_tx).await?; - let ir = Response::from_slice(&reply)?.into_init()?; - Ok(ir) -} - -async fn send_created( - msg_bytes: Vec, - mqtt_tx: &mpsc::Sender, -) -> Result { - let reply2 = ChannelRequest::send(topics::INIT_2_MSG, msg_bytes, &mqtt_tx).await?; - let cr = Response::from_slice(&reply2)?.into_created()?; - Ok(cr) -} - pub fn lss_tasks( - lss_conn: LssBroker, - mut lss_rx: mpsc::Receiver, - mut reconn_rx: mpsc::Receiver<(String, oneshot::Sender)>, + uri: String, + lss_rx: mpsc::Receiver, + mut conn_rx: mpsc::Receiver<(String, oneshot::Sender)>, init_tx: mpsc::Sender, ) { - // msg handler (from CLN looper) - let lss_conn_ = lss_conn.clone(); + tokio::task::spawn(async move { + // first connection - initializes lssbroker + let lss_conn = loop { + let (cid, dance_complete_tx) = conn_rx.recv().await.unwrap(); + match try_dance(&cid, &uri, None, &init_tx, dance_complete_tx).await { + Some(broker) => break broker, + None => log::warn!("broker not initialized, try connecting again..."), + } + }; + spawn_lss_rx(lss_conn.clone(), lss_rx); + // connect handler for all subsequent connections + while let Some((cid, dance_complete_tx)) = conn_rx.recv().await { + log::info!("CLIENT {} connected!", cid); + let _ = try_dance(&cid, &uri, Some(&lss_conn), &init_tx, dance_complete_tx).await; + } + }); +} + +fn spawn_lss_rx(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver) { tokio::task::spawn(async move { while let Some(req) = lss_rx.recv().await { - match lss_conn_.handle_bytes(&req.message).await { + match lss_conn.handle_bytes(&req.message).await { Ok(msg) => { let _ = req.reply_tx.send(msg); } @@ -70,50 +44,64 @@ pub fn lss_tasks( } } }); - - // reconnect handler (when a client reconnects) - let lss_conn_ = lss_conn.clone(); - let init_tx_ = init_tx.clone(); - tokio::task::spawn(async move { - while let Some((cid, dance_complete_tx)) = reconn_rx.recv().await { - log::info!("CLIENT {} reconnected!", cid); - if let Err(e) = reconnect_dance(&cid, &lss_conn_, &init_tx_).await { - log::error!("reconnect dance failed {:?}", e); - let _ = dance_complete_tx.send(false); - } else { - let _ = dance_complete_tx.send(true); - } - } - }); } -async fn reconnect_dance( +async fn try_dance( cid: &str, - lss_conn: &LssBroker, - mqtt_tx: &mpsc::Sender, -) -> Result<()> { - log::debug!("Reconnect dance started, proceeding with step 1"); - let ir = dance_step_1(cid, lss_conn, mqtt_tx).await?; - log::debug!("Step 1 finished, now onto step 2"); - let _ = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await?; - log::debug!("Reconnect dance finished!"); - Ok(()) + uri: &str, + lss_conn: Option<&LssBroker>, + init_tx: &mpsc::Sender, + dance_complete_tx: std_oneshot::Sender, +) -> Option { + match connect_dance(cid, uri, lss_conn, init_tx).await { + Ok(broker) => { + let _ = dance_complete_tx.send(true); + // none if lss_conn is some, some otherwise + broker + } + Err(e) => { + log::warn!("connect_dance failed: {:?}", e); + let _ = dance_complete_tx.send(false); + None + } + } } +async fn connect_dance( + cid: &str, + uri: &str, + lss_conn: Option<&LssBroker>, + mqtt_tx: &mpsc::Sender, +) -> Result> { + let (new_broker, ir) = dance_step_1(cid, uri, lss_conn, mqtt_tx).await?; + let lss_conn = new_broker.as_ref().xor(lss_conn).ok_or(anyhow!( + "should never happen, either we use the newly initialized, or the one passed in" + ))?; + let _ = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await?; + // only some when lss_conn is none + Ok(new_broker) +} + +// initializes a new broker in case lss_conn is none async fn dance_step_1( cid: &str, - lss_conn: &LssBroker, + uri: &str, + lss_conn: Option<&LssBroker>, mqtt_tx: &mpsc::Sender, -) -> Result { - let init_bytes = lss_conn.make_init_msg().await?; - log::debug!("starting dance_step_1 send for {}", cid); - let reply = ChannelRequest::send_for(cid, topics::INIT_1_MSG, init_bytes, mqtt_tx).await?; - log::debug!("dance_step_1 send for completed"); - if reply.is_empty() { - return Err(anyhow!("dance step 1 did not complete")); +) -> Result<(Option, InitResponse)> { + match lss_conn { + Some(lss_conn) => { + let init_bytes = lss_conn.make_init_msg().await?; + let ir = send_init(cid, init_bytes, mqtt_tx).await?; + Ok((None, ir)) + } + None => { + let (spk, init_bytes) = LssBroker::get_server_pubkey(uri).await?; + let ir = send_init(cid, init_bytes, mqtt_tx).await?; + let lss_conn = Some(LssBroker::new(uri, ir.clone(), spk).await?); + Ok((lss_conn, ir)) + } } - let ir = Response::from_slice(&reply)?.into_init()?; - Ok(ir) } async fn dance_step_2( @@ -123,17 +111,33 @@ async fn dance_step_2( ir: &InitResponse, ) -> Result<()> { let state_bytes = lss_conn.get_created_state_msg(ir).await?; - log::debug!("starting dance_step_2 send for {}", cid); - let reply2 = ChannelRequest::send_for(cid, topics::INIT_2_MSG, state_bytes, mqtt_tx).await?; - log::debug!("dance_step_2 send for completed"); - if reply2.is_empty() { - return Err(anyhow!("dance step 2 did not complete")); - } - let cr = Response::from_slice(&reply2)?.into_created()?; + let cr = send_created(cid, state_bytes, mqtt_tx).await?; lss_conn.handle(Response::Created(cr)).await?; Ok(()) } -async fn sleep(s: u64) { - tokio::time::sleep(Duration::from_secs(s)).await; +async fn send_init( + cid: &str, + msg_bytes: Vec, + mqtt_tx: &mpsc::Sender, +) -> Result { + let reply = ChannelRequest::send_for(cid, topics::INIT_1_MSG, msg_bytes, mqtt_tx).await?; + if reply.is_empty() { + return Err(anyhow!("send init did not complete, reply is empty")); + } + let ir = Response::from_slice(&reply)?.into_init()?; + Ok(ir) +} + +async fn send_created( + cid: &str, + msg_bytes: Vec, + mqtt_tx: &mpsc::Sender, +) -> Result { + let reply = ChannelRequest::send_for(cid, topics::INIT_2_MSG, msg_bytes, mqtt_tx).await?; + if reply.is_empty() { + return Err(anyhow!("send created did not complete, reply is empty")); + } + let cr = Response::from_slice(&reply)?.into_created()?; + Ok(cr) } diff --git a/broker/src/main.rs b/broker/src/main.rs index 3921f84..707435b 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -20,8 +20,8 @@ use rocket::tokio::{ sync::{broadcast, mpsc}, }; use rumqttd::{oneshot as std_oneshot, AuthMsg}; +use std::env; use std::sync::{Arc, Mutex}; -use std::{env, time::Duration}; use url::Url; use vls_frontend::{frontend::SourceFactory, Frontend}; use vls_proxy::client::UnixClient; @@ -74,38 +74,26 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let (error_tx, error_rx) = broadcast::channel(10000); error_log::log_errors(error_rx); - let (reconn_tx, reconn_rx) = mpsc::channel::<(String, std_oneshot::Sender)>(10000); + let (conn_tx, conn_rx) = mpsc::channel::<(String, std_oneshot::Sender)>(10000); // this does not wait for the first connection let conns = broker_setup( settings, mqtt_rx, init_rx, - reconn_tx.clone(), + conn_tx, error_tx.clone(), ) .await; let (lss_tx, lss_rx) = mpsc::channel::(10000); + // TODO: add a validation here of the uri setting to make sure LSS is running if let Ok(lss_uri) = env::var("VLS_LSS") { - // waits until LSS confirmation from signer - let lss_broker = loop { - match lss::lss_setup(&lss_uri, init_tx.clone()).await { - Ok(l) => { - break l; - } - Err(e) => { - let _ = error_tx.send(e.to_string().as_bytes().to_vec()); - log::error!("failed LSS setup, trying again..."); - tokio::time::sleep(Duration::from_secs(3)).await; - } - } - }; - lss::lss_tasks(lss_broker, lss_rx, reconn_rx, init_tx); - log::info!("=> lss broker connection created!"); + log::info!("Spawning lss tasks..."); + lss::lss_tasks(lss_uri, lss_rx, conn_rx, init_tx); } else { log::warn!("running without LSS"); - }; + } if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { let signer_port = MqttSignerPort::new(mqtt_tx.clone(), lss_tx.clone()); @@ -142,7 +130,7 @@ pub async fn broker_setup( settings: Settings, mqtt_rx: mpsc::Receiver, init_rx: mpsc::Receiver, - reconn_tx: mpsc::Sender<(String, std_oneshot::Sender)>, + conn_tx: mpsc::Sender<(String, std_oneshot::Sender)>, error_tx: broadcast::Sender>, ) -> Arc> { let (auth_tx, auth_rx) = std::sync::mpsc::channel::(); @@ -177,22 +165,16 @@ pub async fn broker_setup( // client connections state let conns_ = conns.clone(); std::thread::spawn(move || { - log::info!("=> wait for connected status"); - // wait for connection = true - let (cid, connected) = status_rx.recv().expect("couldnt receive"); - let mut cs = conns_.lock().unwrap(); - cs.client_action(&cid, connected); - drop(cs); - log::info!("=> connected: {}: {}", cid, connected); + log::info!("=> waiting first connection..."); while let Ok((cid, connected)) = status_rx.recv() { - log::info!("=> reconnected: {}: {}", cid, connected); + log::info!("=> connection status: {}: {}", cid, connected); let mut cs = conns_.lock().unwrap(); // drop it from list until ready cs.client_action(&cid, false); drop(cs); if connected { let (dance_complete_tx, dance_complete_rx) = std_oneshot::channel::(); - let _ = reconn_tx.blocking_send((cid.clone(), dance_complete_tx)); + let _ = conn_tx.blocking_send((cid.clone(), dance_complete_tx)); let dance_complete = dance_complete_rx.recv().unwrap_or_else(|e| { log::info!( "dance_complete channel died before receiving response: {}", From 5a0b064659e42772c137060ee501bd7531a64467 Mon Sep 17 00:00:00 2001 From: irriden Date: Wed, 9 Aug 2023 16:08:41 +0000 Subject: [PATCH 2/2] broker: first connection triggers signerloop start and rocket launch --- broker/src/conn.rs | 3 +++ broker/src/main.rs | 18 ++++++++---------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/broker/src/conn.rs b/broker/src/conn.rs index 12452b3..c9ed0e5 100644 --- a/broker/src/conn.rs +++ b/broker/src/conn.rs @@ -18,6 +18,9 @@ impl Connections { current: None, } } + pub fn len(&self) -> usize { + self.clients.len() + } pub fn set_pubkey(&mut self, pk: &str) { self.pubkey = Some(pk.to_string()) } diff --git a/broker/src/main.rs b/broker/src/main.rs index 707435b..042fb09 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -76,15 +76,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let (conn_tx, conn_rx) = mpsc::channel::<(String, std_oneshot::Sender)>(10000); - // this does not wait for the first connection - let conns = broker_setup( - settings, - mqtt_rx, - init_rx, - conn_tx, - error_tx.clone(), - ) - .await; + let conns = broker_setup(settings, mqtt_rx, init_rx, conn_tx, error_tx.clone()).await; let (lss_tx, lss_rx) = mpsc::channel::(10000); // TODO: add a validation here of the uri setting to make sure LSS is running @@ -112,7 +104,12 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { } // test sleep FIXME - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + while conns.lock().unwrap().len() == 0 { + log::debug!( + "waiting for first connection before proceeding with SignerLoop and Rocket launch" + ); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } let cln_client = UnixClient::new(UnixConnection::new(parent_fd)); // TODO pass status_rx into SignerLoop? @@ -182,6 +179,7 @@ pub async fn broker_setup( ); false }); + log::info!("adding client to the list? {}", dance_complete); let mut cs = conns_.lock().unwrap(); cs.client_action(&cid, dance_complete); log::debug!("List: {:?}, action: {}", cs, dance_complete);