diff --git a/Cargo.lock b/Cargo.lock index aba2a0e..facaa9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -224,7 +224,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bolt-derive" version = "0.1.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "proc-macro2", "quote", @@ -1189,7 +1189,7 @@ checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" [[package]] name = "lss-connector" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "log", @@ -1757,7 +1757,7 @@ dependencies = [ [[package]] name = "sphinx-auther" version = "0.1.12" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "base64", @@ -1769,7 +1769,7 @@ dependencies = [ [[package]] name = "sphinx-crypter" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "chacha20poly1305", @@ -1780,7 +1780,7 @@ dependencies = [ [[package]] name = "sphinx-glyph" version = "0.1.2" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "hex", @@ -1832,7 +1832,7 @@ dependencies = [ [[package]] name = "sphinx-signer" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "bip39", @@ -2112,8 +2112,8 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "vls-core" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "anyhow", "bitcoin", @@ -2134,8 +2134,8 @@ dependencies = [ [[package]] name = "vls-persist" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "hex", "log", @@ -2147,8 +2147,8 @@ dependencies = [ [[package]] name = "vls-protocol" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "as-any", "bolt-derive", @@ -2161,8 +2161,8 @@ dependencies = [ [[package]] name = "vls-protocol-signer" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "bit-vec", "log", diff --git a/broker/Cargo.lock b/broker/Cargo.lock index 5b007d1..457fe70 100644 --- a/broker/Cargo.lock +++ b/broker/Cargo.lock @@ -381,7 +381,7 @@ dependencies = [ [[package]] name = "bolt-derive" version = "0.1.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "proc-macro2", "quote", @@ -1582,7 +1582,7 @@ dependencies = [ [[package]] name = "lightning-storage-server" version = "0.3.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "anyhow", "async-trait", @@ -1658,7 +1658,7 @@ dependencies = [ [[package]] name = "lss-connector" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs#d3f22ea08e8da221375b0086c3187479c3a01b13" +source = "git+https://github.com/stakwork/sphinx-rs#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "lightning-storage-server", @@ -3190,7 +3190,7 @@ dependencies = [ [[package]] name = "sphinx-auther" version = "0.1.12" -source = "git+https://github.com/stakwork/sphinx-rs#d3f22ea08e8da221375b0086c3187479c3a01b13" +source = "git+https://github.com/stakwork/sphinx-rs#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "base64 0.13.1", @@ -3202,7 +3202,7 @@ dependencies = [ [[package]] name = "sphinx-glyph" version = "0.1.2" -source = "git+https://github.com/stakwork/sphinx-rs#d3f22ea08e8da221375b0086c3187479c3a01b13" +source = "git+https://github.com/stakwork/sphinx-rs#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "hex", @@ -3248,7 +3248,7 @@ dependencies = [ [[package]] name = "sphinx-signer" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs#d3f22ea08e8da221375b0086c3187479c3a01b13" +source = "git+https://github.com/stakwork/sphinx-rs#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "bip39", @@ -3907,8 +3907,8 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "vls-core" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "anyhow", "backtrace", @@ -3931,8 +3931,8 @@ dependencies = [ [[package]] name = "vls-frontend" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "async-trait", "lightning-storage-server", @@ -3945,8 +3945,8 @@ dependencies = [ [[package]] name = "vls-persist" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "hex", "kv", @@ -3959,8 +3959,8 @@ dependencies = [ [[package]] name = "vls-protocol" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "as-any", "bolt-derive", @@ -3973,8 +3973,8 @@ dependencies = [ [[package]] name = "vls-protocol-client" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "anyhow", "async-trait", @@ -3988,8 +3988,8 @@ dependencies = [ [[package]] name = "vls-protocol-signer" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "bit-vec", "log", @@ -4000,8 +4000,8 @@ dependencies = [ [[package]] name = "vls-proxy" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "anyhow", "as-any", diff --git a/broker/Cargo.toml b/broker/Cargo.toml index eb453a5..abbfee6 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -30,10 +30,10 @@ thiserror = "1.0.31" toml = "0.5.9" url = { version = "2.2" } -vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "b355d2235b4ff0a223820e16889ba9cf939c0752" } -vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "b355d2235b4ff0a223820e16889ba9cf939c0752" } -vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "b355d2235b4ff0a223820e16889ba9cf939c0752" } -vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "b355d2235b4ff0a223820e16889ba9cf939c0752" } +vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "1228aba0c418009c926d78c21501fef0ecc96015" } +vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "1228aba0c418009c926d78c21501fef0ecc96015" } +vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "1228aba0c418009c926d78c21501fef0ecc96015" } +vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "1228aba0c418009c926d78c21501fef0ecc96015" } lss-connector = { git = "https://github.com/stakwork/sphinx-rs" } sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs" } diff --git a/broker/src/looper.rs b/broker/src/looper.rs index 84175b6..d12e3e0 100644 --- a/broker/src/looper.rs +++ b/broker/src/looper.rs @@ -160,16 +160,16 @@ impl SignerLoop { let md = parser::raw_request_from_bytes(message, self.chan.sequence, peer_id, dbid)?; // send to signer log::info!("SEND ON {}", topics::VLS); - let (res_topic, res) = self.send_request_and_get_reply(topics::VLS, md)?; + let (res_topic, res) = self.send_request_wait(topics::VLS, md)?; log::info!("GOT ON {}", res_topic); let mut the_res = res.clone(); if res_topic == topics::LSS_RES { // send reply to LSS to store muts - let lss_reply = self.send_lss_and_get_reply(res)?; + let lss_reply = self.send_lss(res)?; log::info!("LSS REPLY LEN {}", &lss_reply.len()); // send to signer for HMAC validation, and get final reply log::info!("SEND ON {}", topics::LSS_MSG); - let (res_topic2, res2) = self.send_request_and_get_reply(topics::LSS_MSG, lss_reply)?; + let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply)?; log::info!("GOT ON {}, send to CLN", res_topic2); if res_topic2 != topics::VLS_RETURN { log::warn!("got a topic NOT on {}", topics::VLS_RETURN); @@ -206,11 +206,7 @@ impl SignerLoop { // returns (topic, payload) // might halt if signer is offline - fn send_request_and_get_reply( - &mut self, - topic: &str, - message: Vec, - ) -> Result<(String, Vec)> { + fn send_request_wait(&mut self, topic: &str, message: Vec) -> Result<(String, Vec)> { // Send a request to the MQTT handler to send to signer let (request, reply_rx) = ChannelRequest::new(topic, message); // This can fail if MQTT shuts down @@ -223,7 +219,7 @@ impl SignerLoop { Ok((reply.topic_end, reply.reply)) } - fn send_lss_and_get_reply(&mut self, message: Vec) -> Result> { + fn send_lss(&mut self, message: Vec) -> Result> { // Send a request to the MQTT handler to send to signer let (request, reply_rx) = LssReq::new(message); // This can fail if MQTT shuts down diff --git a/broker/src/lss.rs b/broker/src/lss.rs index e487054..fe32e3e 100644 --- a/broker/src/lss.rs +++ b/broker/src/lss.rs @@ -12,14 +12,14 @@ pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender) -> Resu // LSS required let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?; - let reply = ChannelRequest::send(topics::LSS_MSG, msg_bytes, &mqtt_tx).await?; + let reply = ChannelRequest::send(topics::INIT_MSG, msg_bytes, &mqtt_tx).await?; let ir = Response::from_slice(&reply)?.as_init()?; let lss_conn = LssBroker::new(uri, ir.clone(), spk).await?; // this only returns the initial state if it was requested by signer let msg_bytes2 = lss_conn.get_created_state_msg(&ir).await?; - let reply2 = ChannelRequest::send(topics::LSS_MSG, msg_bytes2, &mqtt_tx).await?; + let reply2 = ChannelRequest::send(topics::INIT_MSG, msg_bytes2, &mqtt_tx).await?; let cr = Response::from_slice(&reply2)?.as_created()?; lss_conn.handle(Response::Created(cr)).await?; @@ -27,7 +27,7 @@ pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender) -> Resu Ok(lss_conn) } -pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver, mut reconn_rx: mpsc::Receiver<(String, bool)>, mqtt_tx: mpsc::Sender) { +pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver, mut reconn_rx: mpsc::Receiver<(String, bool)>, init_tx: mpsc::Sender) { // msg handler (from CLN looper) let lss_conn_ = lss_conn.clone(); tokio::task::spawn(async move{ @@ -45,12 +45,12 @@ pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver, mut re // reconnect handler (when a client reconnects) let lss_conn_ = lss_conn.clone(); - let mqtt_tx_ = mqtt_tx.clone(); + let init_tx_ = init_tx.clone(); tokio::task::spawn(async move{ while let Some((cid, connected)) = reconn_rx.recv().await { if connected { log::info!("CLIENT {} reconnected!", cid); - if let Err(e) = reconnect_dance(&cid, &lss_conn_, &mqtt_tx_).await { + if let Err(e) = reconnect_dance(&cid, &lss_conn_, &init_tx_).await { log::error!("reconnect dance failed {:?}", e); } } @@ -62,10 +62,10 @@ async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender // sleep 3 seconds to make sure ESP32 subscription is active tokio::time::sleep(Duration::from_secs(3)).await; let init_bytes = lss_conn.make_init_msg().await?; - let reply = ChannelRequest::send_for(cid, topics::LSS_MSG, init_bytes, mqtt_tx).await?; + let reply = ChannelRequest::send_for(cid, topics::INIT_MSG, init_bytes, mqtt_tx).await?; let ir = Response::from_slice(&reply)?.as_init()?; let state_bytes = lss_conn.get_created_state_msg(&ir).await?; - let reply2 = ChannelRequest::send_for(cid, topics::LSS_MSG, state_bytes, mqtt_tx).await?; + let reply2 = ChannelRequest::send_for(cid, topics::INIT_MSG, state_bytes, mqtt_tx).await?; let cr = Response::from_slice(&reply2)?.as_created()?; lss_conn.handle(Response::Created(cr)).await?; Ok(()) diff --git a/broker/src/main.rs b/broker/src/main.rs index fe848d5..bd800b6 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -70,13 +70,14 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let settings = read_broker_config(); let (mqtt_tx, mqtt_rx) = mpsc::channel(10000); + let (init_tx, init_rx) = mpsc::channel(10000); let (error_tx, error_rx) = broadcast::channel(10000); error_log::log_errors(error_rx); let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool)>(10000); // waits until first connection - let conns = broker_setup(settings, mqtt_rx, reconn_tx.clone(), error_tx.clone()).await; + let conns = broker_setup(settings, mqtt_rx, init_rx, reconn_tx.clone(), error_tx.clone()).await; tokio::time::sleep(Duration::from_secs(1)).await; @@ -84,7 +85,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") { // waits until LSS confirmation from signer let lss_broker = loop { - match lss::lss_setup(&lss_uri, mqtt_tx.clone()).await{ + match lss::lss_setup(&lss_uri, init_tx.clone()).await{ Ok(l) => { break l; }, @@ -95,7 +96,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { } } }; - lss::lss_tasks(lss_broker.clone(), lss_rx, reconn_rx, mqtt_tx.clone()); + lss::lss_tasks(lss_broker.clone(), lss_rx, reconn_rx, init_tx); log::info!("=> lss broker connection created!"); Some(lss_broker) } else { @@ -137,6 +138,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { pub async fn broker_setup( settings: Settings, mqtt_rx: mpsc::Receiver, + init_rx: mpsc::Receiver, reconn_tx: mpsc::Sender<(String, bool)>, error_tx: broadcast::Sender>, ) -> Arc> { @@ -161,6 +163,7 @@ pub async fn broker_setup( start_broker( settings, mqtt_rx, + init_rx, status_tx, error_tx.clone(), auth_tx, @@ -169,7 +172,7 @@ pub async fn broker_setup( .expect("BROKER FAILED TO START"); // client connections state - let (startup_tx, startup_rx) = std_oneshot::channel(); + let (startup_tx, startup_rx) = std_oneshot::channel::(); let conns_ = conns.clone(); let reconn_tx_ = reconn_tx.clone(); std::thread::spawn(move || { @@ -180,7 +183,7 @@ pub async fn broker_setup( cs.client_action(&cid, connected); drop(cs); log::info!("=> connected: {}: {}", cid, connected); - let _ = startup_tx.send(true); + let _ = startup_tx.send(cid.to_string()); while let Ok((cid, connected)) = status_rx.recv() { log::info!("=> reconnected: {}: {}", cid, connected); let mut cs = conns_.lock().unwrap(); @@ -189,7 +192,7 @@ pub async fn broker_setup( let _ = reconn_tx_.blocking_send((cid, connected)); } }); - let _ = startup_rx.recv(); + let _first_client_id = startup_rx.recv(); conns } diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 8a922e8..b938a41 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -14,6 +14,7 @@ use std::time::Duration; pub fn start_broker( settings: Settings, mut receiver: mpsc::Receiver, + mut init_receiver: mpsc::Receiver, status_sender: std::sync::mpsc::Sender<(String, bool)>, error_sender: broadcast::Sender>, auth_sender: std::sync::mpsc::Sender, @@ -74,44 +75,25 @@ pub fn start_broker( } }); - // String is the client id + // (client_id, topic_end, payload). topic_end is always topics::LSS_RES + let (init_tx, init_rx) = std::sync::mpsc::channel::<(String, String, Vec)>(); + + let mut link_tx_ = link_tx.clone(); + let conns_ = connections.clone(); + // receive replies from LSS initialization + let _init_task = std::thread::spawn(move || { + while let Some(msg) = init_receiver.blocking_recv() { + pub_and_wait(msg, &conns_, &init_rx, &mut link_tx_); + } + }); + + // (client_id, topic_end, payload) let (msg_tx, msg_rx) = std::sync::mpsc::channel::<(String, String, Vec)>(); // receive from CLN, Frontend, Controller, or LSS - let conns_ = connections.clone(); let _relay_task = std::thread::spawn(move || { while let Some(msg) = receiver.blocking_recv() { - loop { - let reply = if let Some(cid) = msg.cid.clone() { - // for a specific client - pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx) - } else { - // send to each client in turn - let cs = conns_.lock().unwrap(); - let client_list = cs.clients.clone(); - drop(cs); - // wait a second if there are no clients - if client_list.len() == 0 { - std::thread::sleep(Duration::from_secs(1)); - None - } else { - let mut rep = None; - for cid in client_list.iter() { - rep = pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx); - if let Some(_) = &rep { - break; - } - } - rep - } - }; - if let Some(reply) = reply { - if let Err(_) = msg.reply_tx.send(reply) { - log::warn!("could not send on reply_tx"); - } - break; - } - } + pub_and_wait(msg, &connections, &msg_rx, &mut link_tx); } }); @@ -136,10 +118,15 @@ pub fn start_broker( } let cid = ts[0].to_string(); let topic_end = ts[1].to_string(); - if let Err(e) = - msg_tx.send((cid, topic_end, f.publish.payload.to_vec())) - { - log::error!("failed to pub to msg_tx! {:?}", e); + let pld = f.publish.payload.to_vec(); + if topic_end == topics::INIT_RES { + if let Err(e) = init_tx.send((cid, topic_end, pld)) { + log::error!("failed to pub to init_tx! {:?}", e); + } + } else { + if let Err(e) = msg_tx.send((cid, topic_end, pld)) { + log::error!("failed to pub to msg_tx! {:?}", e); + } } } } @@ -149,6 +136,7 @@ pub fn start_broker( } }); + // _init_task.await.unwrap(); // _relay_task.await.unwrap(); // _sub_task.await.unwrap(); // _alerts_handle.await?; @@ -158,9 +146,48 @@ pub fn start_broker( Ok(()) } +// waits forever until the reply is returned +fn pub_and_wait( + msg: ChannelRequest, + conns_: &Arc>, + msg_rx: &std::sync::mpsc::Receiver<(String, String, Vec)>, + link_tx: &mut LinkTx, +) { + loop { + let reply = if let Some(cid) = msg.cid.clone() { + // for a specific client + pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx) + } else { + // send to each client in turn + let cs = conns_.lock().unwrap(); + let client_list = cs.clients.clone(); + drop(cs); + // wait a second if there are no clients + if client_list.len() == 0 { + std::thread::sleep(Duration::from_secs(1)); + None + } else { + let mut rep = None; + for cid in client_list.iter() { + rep = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx); + if let Some(_) = &rep { + break; + } + } + rep + } + }; + if let Some(reply) = reply { + if let Err(_) = msg.reply_tx.send(reply) { + log::warn!("could not send on reply_tx"); + } + break; + } + } +} + // publish to signer and wait for response -// if timeout is exceed, try next signer -fn pub_wait( +fn pub_timeout( client_id: &str, topic: &str, payload: &[u8], @@ -194,6 +221,8 @@ fn subs(cid: &str, mut ltx: LinkTx) { ltx.subscribe(format!("{}/{}", cid, topics::ERROR)).unwrap(); ltx.subscribe(format!("{}/{}", cid, topics::LSS_RES)) .unwrap(); + ltx.subscribe(format!("{}/{}", cid, topics::INIT_RES)) + .unwrap(); } fn unsubs(_cid: &str, mut _ltx: LinkTx) { diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 2786256..34827b1 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -16,13 +16,14 @@ pub async fn run_test() -> rocket::Rocket { let settings = Settings::default(); let (mqtt_tx, mqtt_rx) = mpsc::channel(10000); + let (_init_tx, init_rx) = mpsc::channel(10000); let (error_tx, error_rx) = broadcast::channel(10000); let (conn_tx, _conn_rx) = mpsc::channel(10000); crate::error_log::log_errors(error_rx); // block until connection - let conns = crate::broker_setup(settings, mqtt_rx, conn_tx.clone(), error_tx.clone()).await; + let conns = crate::broker_setup(settings, mqtt_rx, init_rx, conn_tx.clone(), error_tx.clone()).await; log::info!("=> off to the races!"); let tx_ = mqtt_tx.clone(); diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index d0f9dcd..a1dd5bb 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -112,7 +112,9 @@ pub fn make_client( if topic.ends_with(topics::VLS) { tx.send(CoreEvent::VlsMessage(data)) .expect("couldnt send Event::VlsMessage"); - } else if topic.ends_with(topics::LSS_MSG) { + } else if topic.ends_with(topics::LSS_MSG) + || topic.ends_with(topics::INIT_MSG) + { log::debug!("received data len {}", data.len()); tx.send(CoreEvent::LssMessage(data)) .expect("couldnt send Event::LssMessage"); diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 6090c77..8a69e54 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -35,7 +35,12 @@ pub enum Event { pub const ROOT_STORE: &str = "/sdcard/store"; -pub const SUB_TOPICS: &[&str] = &[topics::VLS, topics::LSS_MSG, topics::CONTROL]; +pub const SUB_TOPICS: &[&str] = &[ + topics::INIT_MSG, + topics::LSS_MSG, + topics::VLS, + topics::CONTROL, +]; fn mqtt_sub( mqtt: &mut EspMqttClient>, diff --git a/sphinx-key/src/core/lss.rs b/sphinx-key/src/core/lss.rs index 252a597..01baa24 100644 --- a/sphinx-key/src/core/lss.rs +++ b/sphinx-key/src/core/lss.rs @@ -26,7 +26,7 @@ pub fn init_lss( let server_pubkey = PublicKey::from_slice(&init.server_pubkey)?; let (lss_signer, res1) = LssSigner::new(&handler_builder, &server_pubkey); - let lss_res_topic = format!("{}/{}", client_id, topics::LSS_RES); + let lss_res_topic = format!("{}/{}", client_id, topics::INIT_RES); mqtt.publish(&lss_res_topic, QOS, false, &res1) .expect("could not publish LSS response"); diff --git a/tester/Cargo.lock b/tester/Cargo.lock index 217404f..5f2178f 100644 --- a/tester/Cargo.lock +++ b/tester/Cargo.lock @@ -242,7 +242,7 @@ checksum = "6dbe3c979c178231552ecba20214a8272df4e09f232a87aef4320cf06539aded" [[package]] name = "bolt-derive" version = "0.1.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "proc-macro2", "quote", @@ -1136,7 +1136,7 @@ dependencies = [ [[package]] name = "lss-connector" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "log", @@ -2081,7 +2081,7 @@ dependencies = [ [[package]] name = "sphinx-auther" version = "0.1.12" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "base64 0.13.1", @@ -2093,7 +2093,7 @@ dependencies = [ [[package]] name = "sphinx-crypter" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "chacha20poly1305", @@ -2104,7 +2104,7 @@ dependencies = [ [[package]] name = "sphinx-glyph" version = "0.1.2" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "hex", @@ -2143,7 +2143,7 @@ dependencies = [ [[package]] name = "sphinx-signer" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c" +source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632" dependencies = [ "anyhow", "bip39", @@ -2649,8 +2649,8 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "vls-core" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "anyhow", "bitcoin", @@ -2671,8 +2671,8 @@ dependencies = [ [[package]] name = "vls-persist" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "hex", "log", @@ -2684,8 +2684,8 @@ dependencies = [ [[package]] name = "vls-protocol" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "as-any", "bolt-derive", @@ -2698,8 +2698,8 @@ dependencies = [ [[package]] name = "vls-protocol-signer" -version = "0.9.0" -source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752" +version = "0.9.1" +source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015" dependencies = [ "bit-vec", "log",