From 7ab755c6706cd54305f6b9e2aba81f3a3403ca4e Mon Sep 17 00:00:00 2001 From: irriden Date: Tue, 13 Feb 2024 16:54:11 +0000 Subject: [PATCH] smooth++ --- broker/src/conn.rs | 2 -- broker/src/looper.rs | 28 +++++++--------------------- broker/src/lss.rs | 42 +++++++++++++++++++++++------------------- broker/src/main.rs | 18 ++++++++++++++---- 4 files changed, 44 insertions(+), 46 deletions(-) diff --git a/broker/src/conn.rs b/broker/src/conn.rs index 7281d16..3c32c3a 100644 --- a/broker/src/conn.rs +++ b/broker/src/conn.rs @@ -7,8 +7,6 @@ use std::sync::Mutex; pub static CONNS: Lazy> = Lazy::new(|| Mutex::new(Connections::new())); -pub static HSMD_INIT: Mutex> = Mutex::new(Vec::new()); - #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Connections { pub pubkey: Option, diff --git a/broker/src/looper.rs b/broker/src/looper.rs index 80a838d..4780333 100644 --- a/broker/src/looper.rs +++ b/broker/src/looper.rs @@ -1,6 +1,4 @@ -use crate::bitcoin::blockdata::constants::ChainHash; -use crate::bitcoin::Network; -use crate::conn::{ChannelRequest, LssReq, HSMD_INIT}; +use crate::conn::{ChannelRequest, LssReq}; use crate::handle::handle_message; use crate::secp256k1::PublicKey; use log::*; @@ -77,16 +75,16 @@ impl SignerLoop { } /// Start the read loop - pub fn start(&mut self, network: Option) { + pub fn start(&mut self) { info!("loop {}: start", self.log_prefix); - match self.do_loop(network) { + match self.do_loop() { Ok(()) => info!("loop {}: done", self.log_prefix), Err(Error::Eof) => info!("loop {}: ending", self.log_prefix), Err(e) => error!("loop {}: error {:?}", self.log_prefix, e), } } - fn do_loop(&mut self, network: Option) -> Result<()> { + fn do_loop(&mut self) -> Result<()> { loop { let raw_msg = self.client.read_raw()?; // debug!("loop {}: got raw", self.log_prefix); @@ -108,7 +106,7 @@ impl SignerLoop { self.vls_tx.clone(), client_id, ); - thread::spawn(move || new_loop.start(None)); + thread::spawn(move || new_loop.start()); } Message::Memleak(_) => { // info!("Memleak"); @@ -116,20 +114,8 @@ impl SignerLoop { self.client.write(reply)?; } msg => { - if let Message::HsmdInit(ref m) = msg { - if let Some(net) = network { - if ChainHash::using_genesis_block(net).as_bytes() - != m.chain_params.as_ref() - { - panic!("The network settings of CLN and broker don't match!"); - } - } else { - log::error!("No Network provided"); - } - let mut hsmd_raw = HSMD_INIT.lock().unwrap(); - *hsmd_raw = raw_msg; - drop(hsmd_raw); - continue; + if let Message::HsmdInit(ref _m) = msg { + panic!("HsmdInit should have been handled already!"); } // check if we got the same preapprove message less than PREAPPROVE_CACHE_TTL seconds ago if let Message::PreapproveInvoice(_) | Message::PreapproveKeysend(_) = msg { diff --git a/broker/src/lss.rs b/broker/src/lss.rs index 0c28117..cb16449 100644 --- a/broker/src/lss.rs +++ b/broker/src/lss.rs @@ -1,4 +1,3 @@ -use crate::conn::HSMD_INIT; use crate::conn::{ChannelRequest, LssReq}; use anyhow::{anyhow, Result}; use lss_connector::{InitResponse, LssBroker, Response, SignerMutations}; @@ -17,12 +16,13 @@ pub fn lss_tasks( mut conn_rx: mpsc::Receiver<(String, oneshot::Sender)>, init_tx: mpsc::Sender, mut cln_client: UnixClient, + mut hsmd_raw: Vec, ) { tokio::task::spawn(async move { // first connection - initializes lssbroker let (lss_conn, hsmd_init_reply) = loop { let (cid, dance_complete_tx) = conn_rx.recv().await.unwrap(); - match try_dance(&cid, &uri, None, &init_tx, dance_complete_tx).await { + match try_dance(&cid, &uri, None, &init_tx, dance_complete_tx, &mut hsmd_raw).await { Some(ret) => break ret, None => log::warn!("broker not initialized, try connecting again..."), } @@ -32,7 +32,15 @@ pub fn lss_tasks( // connect handler for all subsequent connections while let Some((cid, dance_complete_tx)) = conn_rx.recv().await { log::info!("CLIENT {} connected!", cid); - let _ = try_dance(&cid, &uri, Some(&lss_conn), &init_tx, dance_complete_tx).await; + let _ = try_dance( + &cid, + &uri, + Some(&lss_conn), + &init_tx, + dance_complete_tx, + &mut hsmd_raw, + ) + .await; } }); } @@ -58,8 +66,9 @@ async fn try_dance( lss_conn: Option<&LssBroker>, init_tx: &mpsc::Sender, dance_complete_tx: std_oneshot::Sender, + hsmd_raw: &mut Vec, ) -> Option<(LssBroker, Vec)> { - match connect_dance(cid, uri, lss_conn, init_tx).await { + match connect_dance(cid, uri, lss_conn, init_tx, hsmd_raw).await { Ok(ret) => { let _ = dance_complete_tx.send(true); // none if lss_conn is some, some otherwise @@ -78,13 +87,14 @@ async fn connect_dance( uri: &str, lss_conn_opt: Option<&LssBroker>, mqtt_tx: &mpsc::Sender, + hsmd_raw: &mut Vec, ) -> Result)>> { let (new_broker, ir) = dance_step_1(cid, uri, lss_conn_opt, mqtt_tx).await?; let lss_conn = new_broker.as_ref().xor(lss_conn_opt).ok_or(anyhow!( "should never happen, either we use the newly initialized, or the one passed in" ))?; dance_step_2(cid, lss_conn, mqtt_tx, &ir).await?; - let hsmd_init_reply = dance_step_3(cid, mqtt_tx).await?; + let hsmd_init_reply = dance_step_3(cid, mqtt_tx, hsmd_raw).await?; // only some when lss_conn_opt is none Ok(new_broker.map(|broker| (broker, hsmd_init_reply))) } @@ -123,19 +133,15 @@ async fn dance_step_2( Ok(()) } -async fn dance_step_3(cid: &str, mqtt_tx: &mpsc::Sender) -> Result> { - let (hsmd_raw, mut hsmd_init) = loop { - let hsmd_raw = HSMD_INIT.lock().unwrap().clone(); - if hsmd_raw.is_empty() { - continue; - } - if let Message::HsmdInit(hsmd_init) = msgs::from_vec(hsmd_raw.clone()).unwrap() { - break (hsmd_raw, hsmd_init); - } else { - panic!("Not a hsmd init message"); - } +async fn dance_step_3( + cid: &str, + mqtt_tx: &mpsc::Sender, + hsmd_raw: &mut Vec, +) -> Result> { + let Message::HsmdInit(mut hsmd_init) = msgs::from_vec(hsmd_raw.clone()).unwrap() else { + panic!("Expected a hsmd init message here") }; - let hsmd_init_bytes = parser::raw_request_from_bytes(hsmd_raw, 0, [0u8; 33], 0)?; + let hsmd_init_bytes = parser::raw_request_from_bytes(hsmd_raw.clone(), 0, [0u8; 33], 0)?; let reply = ChannelRequest::send(cid, topics::INIT_3_MSG, hsmd_init_bytes, mqtt_tx).await?; if reply.is_empty() { return Err(anyhow!("Hsmd init failed !")); @@ -145,9 +151,7 @@ async fn dance_step_3(cid: &str, mqtt_tx: &mpsc::Sender) -> Resu match msgs::from_vec(hsmd_init_reply.clone()) { Ok(Message::HsmdInitReplyV4(hir)) => { hsmd_init.hsm_wire_max_version = hir.hsm_version; - let mut hsmd_raw = HSMD_INIT.lock().unwrap(); *hsmd_raw = hsmd_init.as_vec(); - drop(hsmd_raw); } _ => panic!("Not a hsmd init reply v4"), }; diff --git a/broker/src/main.rs b/broker/src/main.rs index 42e3757..cac8f35 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -11,6 +11,7 @@ mod util; pub(crate) use sphinx_signer::lightning_signer::bitcoin::{self, secp256k1}; +use crate::bitcoin::blockdata::constants::ChainHash; use crate::chain_tracker::MqttSignerPort; use crate::conn::{conns_set_pubkey, current_pubkey, new_connection, ChannelRequest, LssReq}; use crate::looper::SignerLoop; @@ -26,7 +27,8 @@ use std::env; use std::sync::Arc; use url::Url; use vls_frontend::{frontend::SourceFactory, Frontend}; -use vls_proxy::client::UnixClient; +use vls_protocol::{msgs, msgs::Message}; +use vls_proxy::client::{Client, UnixClient}; use vls_proxy::connection::{open_parent_fd, UnixConnection}; use vls_proxy::portfront::SignerPortFront; use vls_proxy::util::{add_hsmd_args, handle_hsmd_version}; @@ -78,12 +80,20 @@ fn run_main(parent_fd: i32) -> rocket::Rocket { broker_setup(settings, mqtt_rx, init_rx, conn_tx, error_tx.clone()); - let cln_client_a = UnixClient::new(UnixConnection::new(parent_fd)); + let mut cln_client_a = UnixClient::new(UnixConnection::new(parent_fd)); + let hsmd_raw = cln_client_a.read_raw().unwrap(); + let msg = msgs::from_vec(hsmd_raw.clone()).unwrap(); + let Message::HsmdInit(ref m) = msg else { + panic!("Expected a hsmd init message first"); + }; + if ChainHash::using_genesis_block(settings.network).as_bytes() != m.chain_params.as_ref() { + panic!("The network settings of CLN and broker don't match!"); + } let (lss_tx, lss_rx) = mpsc::channel::(10000); // TODO: add a validation here of the uri setting to make sure LSS is running if let Ok(lss_uri) = env::var("VLS_LSS") { log::info!("Spawning lss tasks..."); - lss::lss_tasks(lss_uri, lss_rx, conn_rx, init_tx, cln_client_a); + lss::lss_tasks(lss_uri, lss_rx, conn_rx, init_tx, cln_client_a, hsmd_raw); } else { log::warn!("running without LSS"); } @@ -116,7 +126,7 @@ fn run_main(parent_fd: i32) -> rocket::Rocket { let mut signer_loop = SignerLoop::new(cln_client, mqtt_tx.clone(), lss_tx); // spawn CLN listener std::thread::spawn(move || { - signer_loop.start(Some(settings.network)); + signer_loop.start(); }); routes::launch_rocket(mqtt_tx, error_tx, settings)