diff --git a/broker/Cargo.lock b/broker/Cargo.lock index ec1354a..8064979 100644 --- a/broker/Cargo.lock +++ b/broker/Cargo.lock @@ -17,12 +17,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "adler32" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" - [[package]] name = "aead" version = "0.5.1" @@ -84,21 +78,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "alloc-no-stdlib" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" - -[[package]] -name = "alloc-stdlib" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" -dependencies = [ - "alloc-no-stdlib", -] - [[package]] name = "android_system_properties" version = "0.1.5" @@ -129,12 +108,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3419eecc9f5967e6f0f29a0c3fefe22bda6ea34b15798f3c452cb81f2c3fa7" -[[package]] -name = "ascii" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" - [[package]] name = "async-channel" version = "1.8.0" @@ -226,7 +199,11 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", + "tokio", "tower", "tower-http", "tower-layer", @@ -377,37 +354,6 @@ dependencies = [ "syn", ] -[[package]] -name = "brotli" -version = "3.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", - "brotli-decompressor", -] - -[[package]] -name = "brotli-decompressor" -version = "2.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b6561fd3f895a11e8f72af2cb7d22e08366bebc2b6b57f7744c4bda27034744" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", -] - -[[package]] -name = "buf_redux" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f" -dependencies = [ - "memchr", - "safemem", -] - [[package]] name = "bumpalo" version = "3.11.1" @@ -463,12 +409,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "chunked_transfer" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cca491388666e04d7248af3f60f0c40cfb0991c72205595d7c396e3510207d1a" - [[package]] name = "cipher" version = "0.4.3" @@ -803,16 +743,6 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb" -[[package]] -name = "deflate" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c86f7e25f518f4b81808a2cf1c50996a61f5c2eb394b2393bd87f2a4780a432f" -dependencies = [ - "adler32", - "gzip-header", -] - [[package]] name = "delegate" version = "0.6.2" @@ -1030,18 +960,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "filetime" -version = "0.2.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a3de6e8d11b22ff9edc6d916f890800597d60f8b2da1caf2955c274638d6412" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "redox_syscall", - "windows-sys 0.45.0", -] - [[package]] name = "fixedbitset" version = "0.4.2" @@ -1259,15 +1177,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" -[[package]] -name = "gzip-header" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95cc527b92e6029a62960ad99aa8a6660faa4555fe5f731aab13aa6a921795a2" -dependencies = [ - "crc32fast", -] - [[package]] name = "h2" version = "0.3.15" @@ -1752,6 +1661,7 @@ checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" dependencies = [ "cfg-if 1.0.0", "generator", + "pin-utils", "scoped-tls", "serde", "serde_json", @@ -1874,16 +1784,6 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" -[[package]] -name = "mime_guess" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1937,24 +1837,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "multipart" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00dec633863867f29cb39df64a397cdf4a6354708ddd7759f70c7fb51c5f9182" -dependencies = [ - "buf_redux", - "httparse", - "log", - "mime", - "mime_guess", - "quick-error", - "rand", - "safemem", - "tempfile", - "twoway", -] - [[package]] name = "nanorand" version = "0.7.0" @@ -2051,15 +1933,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_threads" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" -dependencies = [ - "libc", -] - [[package]] name = "object" version = "0.29.0" @@ -2084,6 +1957,15 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" +[[package]] +name = "oneshot" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc22d22931513428ea6cc089e942d38600e3d00976eef8c86de6b8a3aadec6eb" +dependencies = [ + "loom", +] + [[package]] name = "opaque-debug" version = "0.3.0" @@ -2764,31 +2646,6 @@ dependencies = [ "serde", ] -[[package]] -name = "rouille" -version = "3.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f86e4c51a773f953f02bbab5fd049f004bfd384341d62da2a079aff812ab176" -dependencies = [ - "base64", - "brotli", - "chrono", - "deflate", - "filetime", - "multipart", - "num_cpus", - "percent-encoding", - "rand", - "serde", - "serde_derive", - "serde_json", - "sha1", - "threadpool", - "time 0.3.17", - "tiny_http", - "url", -] - [[package]] name = "rumqttc" version = "0.12.0" @@ -2808,22 +2665,22 @@ dependencies = [ [[package]] name = "rumqttd" -version = "0.12.5" -source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx#c9e2174c9385ef99b95698c04d5911c991789b3c" +version = "0.12.6" +source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx-2#8811257886364ce0d1f09773c3aa5596e457d8c0" dependencies = [ + "axum", "bytes", "clap 4.1.4", "config", "flume", "metrics", "metrics-exporter-prometheus", + "oneshot", "parking_lot 0.11.2", - "rouille", "rustls-pemfile 1.0.1", "serde", "serde_json", "slab", - "sphinx-auther 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "thiserror", "tokio", "tokio-rustls", @@ -2913,12 +2770,6 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" -[[package]] -name = "safemem" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" - [[package]] name = "scoped-tls" version = "1.0.1" @@ -3020,6 +2871,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db0969fff533976baadd92e08b1d102c5a3d8a8049eadfd69d4d1e3c5b2ed189" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3072,17 +2932,6 @@ dependencies = [ "unsafe-libyaml", ] -[[package]] -name = "sha1" -version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" -dependencies = [ - "cfg-if 1.0.0", - "cpufeatures", - "digest", -] - [[package]] name = "sha2" version = "0.10.6" @@ -3168,19 +3017,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "sphinx-auther" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33bd24149ede6f4ec091326eacf550cfa3fc00492d4e627a045c1bd690255362" -dependencies = [ - "anyhow", - "base64", - "hex", - "log", - "secp256k1", -] - [[package]] name = "sphinx-auther" version = "0.1.12" @@ -3203,7 +3039,7 @@ dependencies = [ "rmp-serde", "serde", "serde_json", - "sphinx-auther 0.1.12 (git+https://github.com/stakwork/sphinx-rs)", + "sphinx-auther", ] [[package]] @@ -3386,15 +3222,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - [[package]] name = "time" version = "0.1.45" @@ -3413,8 +3240,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" dependencies = [ "itoa", - "libc", - "num_threads", "serde", "time-core", "time-macros", @@ -3435,18 +3260,6 @@ dependencies = [ "time-core", ] -[[package]] -name = "tiny_http" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" -dependencies = [ - "ascii", - "chunked_transfer", - "httpdate", - "log", -] - [[package]] name = "tokio" version = "1.26.0" @@ -3716,15 +3529,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" -[[package]] -name = "twoway" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1" -dependencies = [ - "memchr", -] - [[package]] name = "txoo" version = "0.2.0" @@ -3787,15 +3591,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "unicase" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.8" diff --git a/broker/Cargo.toml b/broker/Cargo.toml index b721c87..7cb1091 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -14,7 +14,7 @@ vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" } vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" } vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" } -rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "sphinx" } +rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "sphinx-2" } pretty_env_logger = "0.4.0" confy = "0.4.0" secp256k1 = { version = "0.24.0", features = ["rand-std", "bitcoin_hashes"] } diff --git a/broker/src/main.rs b/broker/src/main.rs index da007d3..7ff4200 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -8,7 +8,7 @@ mod unix_fd; mod util; use crate::chain_tracker::MqttSignerPort; -use crate::mqtt::start_broker; +use crate::mqtt::{check_auth, start_broker}; use crate::unix_fd::SignerLoop; use crate::util::read_broker_config; use clap::{arg, App}; @@ -16,6 +16,7 @@ use rocket::tokio::{ self, sync::{broadcast, mpsc, oneshot}, }; +use rumqttd::AuthMsg; use std::env; use std::sync::Arc; use url::Url; @@ -25,9 +26,27 @@ use vls_proxy::connection::{open_parent_fd, UnixConnection}; use vls_proxy::portfront::SignerPortFront; use vls_proxy::util::{add_hsmd_args, handle_hsmd_version}; +pub struct Connections { + pub pubkey: Option, + pub clients: Vec, +} + +impl Connections { + pub fn new() -> Self { + Self { + pubkey: None, + clients: Vec::new(), + } + } + pub fn set_pubkey(&mut self, pk: &str) { + self.pubkey = Some(pk.to_string()) + } +} + pub struct Channel { pub sequence: u16, pub sender: mpsc::Sender, + pub pubkey: [u8; 33], } /// Responses are received on the oneshot sender @@ -55,7 +74,7 @@ pub struct ChannelReply { pub reply: Vec, } -const CLIENT_ID: &str = "sphinx-1"; +// const CLIENT_ID: &str = "sphinx-1"; const BROKER_CONFIG_PATH: &str = "../broker.conf"; #[rocket::launch] @@ -99,19 +118,28 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let settings = read_broker_config(BROKER_CONFIG_PATH); let (mqtt_tx, mqtt_rx) = mpsc::channel(10000); + let (auth_tx, auth_rx) = std::sync::mpsc::channel::(); // let (unix_tx, mut unix_rx) = mpsc::channel(10000); let (status_tx, mut status_rx) = mpsc::channel(10000); let (error_tx, error_rx) = broadcast::channel(10000); error_log::log_errors(error_rx); + let mut conns = Connections::new(); + + std::thread::spawn(move || { + while let Ok(am) = auth_rx.recv() { + let ok = check_auth(&am.username, &am.password, &mut conns); + let _ = am.reply.send(ok); + } + }); + log::info!("=> start broker on network: {}", settings.network); - start_broker(mqtt_rx, status_tx, error_tx.clone(), CLIENT_ID, settings) + start_broker(mqtt_rx, status_tx, error_tx.clone(), settings, auth_tx) .expect("BROKER FAILED TO START"); log::info!("=> wait for connected status"); // wait for connection = true let status = status_rx.recv().await.expect("couldnt receive"); - log::info!("=> connection status: {}", status); - // assert_eq!(status, true, "expected connected = true"); + log::info!("=> connected: {}: {}", status.0, status.1); // let mqtt_tx_ = mqtt_tx.clone(); // tokio::spawn(async move { diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index fb9ca74..2c40f47 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,7 +1,8 @@ use crate::util::Settings; use crate::{ChannelReply, ChannelRequest}; use rocket::tokio::{sync::broadcast, sync::mpsc}; -use rumqttd::{protocol::QoS, Alert, AlertEvent, Broker, Config, Notification}; +use rumqttd::{Alert, AlertEvent, AuthMsg, Broker, Config, Notification}; +use sphinx_signer::sphinx_glyph::sphinx_auther::token::Token; use sphinx_signer::sphinx_glyph::topics; use std::time::Duration; @@ -10,13 +11,13 @@ use std::time::Duration; pub fn start_broker( mut receiver: mpsc::Receiver, - status_sender: mpsc::Sender, + status_sender: mpsc::Sender<(String, bool)>, error_sender: broadcast::Sender>, - expected_client_id: &str, settings: Settings, + auth_sender: std::sync::mpsc::Sender, ) -> anyhow::Result<()> { let conf = config(settings); - let client_id = expected_client_id.to_string(); + // let client_id = expected_client_id.to_string(); let mut broker = Broker::new(conf); let mut alerts = broker.alerts(vec![ @@ -26,8 +27,11 @@ pub fn start_broker( ])?; let (mut link_tx, mut link_rx) = broker.link("localclient")?; + let auth_sender_ = auth_sender.clone(); std::thread::spawn(move || { - broker.start().expect("could not start broker"); + broker + .start(Some(auth_sender_)) + .expect("could not start broker"); }); // connected/disconnected status alerts @@ -37,13 +41,15 @@ pub fn start_broker( log::info!("Alert: {:?}", alert); match alert.1 { Alert::Event(cid, event) => { - if cid == client_id { + // dont alert for local connections + let locals = vec!["console", "localclient"]; + if !locals.contains(&cid.as_str()) { if let Some(status) = match event { AlertEvent::Connect => Some(true), AlertEvent::Disconnect => Some(false), _ => None, } { - let _ = status_sender_.blocking_send(status); + let _ = status_sender_.blocking_send((cid, status)); } } } @@ -78,8 +84,7 @@ pub fn start_broker( let _relay_task = std::thread::spawn(move || { while let Some(msg) = receiver.blocking_recv() { - let qos = QoS::AtLeastOnce; - if let Err(e) = link_tx.publish_qos(msg.topic, msg.message, qos) { + if let Err(e) = link_tx.publish(msg.topic, msg.message) { log::error!("failed to pub to link_tx! {:?}", e); } let rep = msg_rx.blocking_recv(); @@ -100,8 +105,30 @@ pub fn start_broker( Ok(()) } +pub fn check_auth(username: &str, password: &str, conns: &mut crate::Connections) -> bool { + match Token::from_base64(password) { + Ok(t) => match t.recover() { + Ok(pubkey) => { + if &pubkey.to_string() == username { + if let Some(pk) = &conns.pubkey { + // if there is an existing then it must match it + pk == username + } else { + conns.set_pubkey(username); + true + } + } else { + false + } + } + Err(_) => false, + }, + Err(_) => false, + } +} + fn config(settings: Settings) -> Config { - use rumqttd::{ConnectionSettings, ConsoleSettings, ServerSettings, SphinxLoginCredentials}; + use rumqttd::{ConnectionSettings, ConsoleSettings, ServerSettings}; use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddrV4}; let router = rumqttd::RouterConfig { @@ -126,7 +153,6 @@ fn config(settings: Settings) -> Config { max_inflight_count: 200, max_inflight_size: 1024, auth: None, - sphinx_auth: Some(SphinxLoginCredentials { within: None }), dynamic_filters: true, }, tls: None, diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index ea6e409..1c15050 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -1,13 +1,15 @@ -use crate::mqtt::start_broker; +use crate::mqtt::{check_auth, start_broker}; use crate::routes::launch_rocket; use crate::util::Settings; use crate::ChannelRequest; +use crate::Connections; use rocket::tokio::{self, sync::broadcast, sync::mpsc}; +use rumqttd::AuthMsg; use sphinx_signer::{parser, sphinx_glyph::topics}; use vls_protocol::serde_bolt::WireString; use vls_protocol::{msgs, msgs::Message}; -const CLIENT_ID: &str = "test-1"; +// const CLIENT_ID: &str = "test-1"; pub async fn run_test() -> rocket::Rocket { log::info!("TEST..."); @@ -18,24 +20,29 @@ pub async fn run_test() -> rocket::Rocket { let settings = Settings::default(); let (tx, rx) = mpsc::channel(1000); + let (auth_tx, auth_rx) = std::sync::mpsc::channel::(); let (status_tx, mut status_rx) = mpsc::channel(1000); let (error_tx, error_rx) = broadcast::channel(1000); crate::error_log::log_errors(error_rx); - start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings) + let mut conns = Connections::new(); + + std::thread::spawn(move || { + while let Ok(am) = auth_rx.recv() { + let ok = check_auth(&am.username, &am.password, &mut conns); + let _ = am.reply.send(ok); + } + }); + + start_broker(rx, status_tx, error_tx.clone(), settings, auth_tx) .expect("FAILED TO START BROKER"); log::info!("BROKER started!"); - log::info!("=> wait for connected status"); - // wait for connection = true - let status = status_rx.recv().await.expect("couldnt receive"); - log::info!("=> connection status: {}", status); - // let mut connected = false; // let tx_ = tx.clone(); tokio::spawn(async move { while let Some(status) = status_rx.recv().await { - log::info!("========> CONNECTED! {}", status); + log::info!("========> CONNECTED! {} {}", status.0, status.1); } }); // tokio::spawn(async move { diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index fd8c75f..b539f3e 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -20,6 +20,7 @@ impl Channel { Self { sender, sequence: 0, + pubkey: [0; 33], // init with empty pubkey } } } @@ -95,7 +96,9 @@ impl SignerLoop { self.client.write(reply)?; } msg => { + let mut catch_init = false; if let Message::HsmdInit(m) = msg { + catch_init = true; if let Some(set) = settings { if ChainHash::using_genesis_block(set.network).as_bytes() != &m.chain_params.0 @@ -106,7 +109,7 @@ impl SignerLoop { panic!("Got HsmdInit without settings - likely because HsmdInit was sent after startup"); } } - let reply = self.handle_message(raw_msg)?; + let reply = self.handle_message(raw_msg, catch_init)?; // Write the reply to the node self.client.write_vec(reply)?; // info!("replied {}", self.log_prefix); @@ -115,7 +118,7 @@ impl SignerLoop { } } - fn handle_message(&mut self, message: Vec) -> Result> { + fn handle_message(&mut self, message: Vec, catch_init: bool) -> Result> { let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0); let peer_id = self .client_id @@ -123,13 +126,29 @@ impl SignerLoop { .map(|c| c.peer_id.serialize()) .unwrap_or([0u8; 33]); let md = parser::raw_request_from_bytes(message, self.chan.sequence, peer_id, dbid)?; + // send to glyph let reply_rx = self.send_request(md)?; let res = self.get_reply(reply_rx)?; let reply = parser::raw_response_from_bytes(res, self.chan.sequence)?; + // add to the sequence self.chan.sequence = self.chan.sequence.wrapping_add(1); + // catch the pubkey if its the first one connection + if catch_init { + let _ = self.set_channel_pubkey(reply.clone()); + } Ok(reply) } + fn set_channel_pubkey(&mut self, raw_msg: Vec) -> Result<()> { + let msg = msgs::from_vec(raw_msg.clone())?; + match msg { + Message::HsmdInitReplyV2(r) => self.chan.pubkey = r.node_id.0, + Message::HsmdInit2Reply(r) => self.chan.pubkey = r.node_id.0, + _ => (), + }; + Ok(()) + } + fn send_request(&mut self, message: Vec) -> Result> { // Send a request to the MQTT handler to send to signer let (request, reply_rx) = ChannelRequest::new(topics::VLS, message); diff --git a/broker/src/util.rs b/broker/src/util.rs index 924c625..c756f3b 100644 --- a/broker/src/util.rs +++ b/broker/src/util.rs @@ -83,7 +83,7 @@ pub fn setup_logging(who: &str, level_arg: &str) { .level(log::LevelFilter::from_str(&level).expect("level")) .level_for("h2", log::LevelFilter::Info) .level_for("sled", log::LevelFilter::Info) - // .level_for("rumqttd", log::LevelFilter::Warn) + .level_for("rumqttd", log::LevelFilter::Warn) .level_for("rocket", log::LevelFilter::Warn) .level_for("tracing", log::LevelFilter::Warn) .level_for("_", log::LevelFilter::Warn)