diff --git a/Cargo.toml b/Cargo.toml index c6908db..3a1c696 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ members = [ "signer", "broker", "parser", - "tester" + "auther", + "tester", ] exclude = [ diff --git a/auther/Cargo.toml b/auther/Cargo.toml new file mode 100644 index 0000000..6deb5cd --- /dev/null +++ b/auther/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "sphinx-key-auther" +version = "0.1.0" +authors = ["Evan Feenstra "] +edition = "2018" + +[dependencies] +secp256k1 = { version = "0.22.0", default-features = false, features = ["std", "rand-std", "bitcoin_hashes"] } +anyhow = {version = "1", features = ["backtrace"]} +log = "0.4" + +[features] +default = [ "no-std", "secp-recovery", "secp-lowmemory" ] +no-std = ["secp256k1/alloc"] +secp-lowmemory = ["secp256k1/lowmemory"] +secp-recovery = ["secp256k1/recovery"] +rand = ["secp256k1/rand-std"] \ No newline at end of file diff --git a/auther/src/lib.rs b/auther/src/lib.rs new file mode 100644 index 0000000..e2c4695 --- /dev/null +++ b/auther/src/lib.rs @@ -0,0 +1,64 @@ +use secp256k1::ecdsa::Signature; +use secp256k1::hashes::sha256d::Hash as Sha256dHash; +use secp256k1::hashes::Hash; +use secp256k1::{Message, Secp256k1, SecretKey}; + +pub struct Token(u64); + +impl Token { + pub fn new() -> Self { + Self(0) + } + /// Sign a Lightning message + pub fn sign_message( + &self, + message: &Vec, + secret_key: &SecretKey, + ) -> anyhow::Result> { + let mut buffer = String::from("Lightning Signed Message:").into_bytes(); + buffer.extend(message); + let secp_ctx = Secp256k1::signing_only(); + let hash = Sha256dHash::hash(&buffer); + let encmsg = secp256k1::Message::from_slice(&hash[..])?; + let sig = secp_ctx.sign_ecdsa_recoverable(&encmsg, &secret_key); + let (rid, sig) = sig.serialize_compact(); + let mut res = sig.to_vec(); + res.push(rid.to_i32() as u8); + Ok(res) + } +} + +pub fn sign( + secp: &Secp256k1, + input: Vec, + secret_key: &SecretKey, +) -> Signature { + let message = hash_message(input); + secp.sign_ecdsa(&message, &secret_key) +} + +pub fn hash_message(input: Vec) -> Message { + let hash = Sha256dHash::hash(&input); + Message::from_slice(&hash[..]).expect("encmsg failed") +} + +#[cfg(test)] +mod tests { + use crate::*; + use secp256k1::{PublicKey, Secp256k1, SecretKey}; + + fn secret_key() -> SecretKey { + SecretKey::from_slice(&[0xcd; 32]).expect("32 bytes, within curve order") + } + + #[test] + fn test_sign() { + let secp = Secp256k1::new(); + let sk = secret_key(); + let public_key = PublicKey::from_secret_key(&secp, &sk); + let input = vec![1, 2, 3]; + let message = hash_message(input); + let sig = sign(&secp, vec![1, 2, 3], &sk); + assert!(secp.verify_ecdsa(&message, &sig, &public_key).is_ok()); + } +} diff --git a/broker/Cargo.toml b/broker/Cargo.toml index a48e65a..434467f 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -6,7 +6,7 @@ default-run = "sphinx-key-broker" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rumqttd = "0.11.0" +rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" } pretty_env_logger = "0.4.0" confy = "0.4.0" tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } @@ -20,12 +20,8 @@ fern = { version = "0.6", features = ["colored"] } rumqttc = "0.12.0" clap = "=3.0.0-beta.2" clap_derive = "=3.0.0-beta.5" +chrono = "0.4" [features] default = ["std"] std = ["vls-protocol/std"] - -[[bin]] -name = "test_client" -path = "src/test_client.rs" -# ./target/debug/test_client \ No newline at end of file diff --git a/broker/src/main.rs b/broker/src/main.rs index b3459d2..91f4db9 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -2,6 +2,7 @@ mod init; mod mqtt; mod run_test; mod unix_fd; +mod util; use crate::mqtt::start_broker; use crate::unix_fd::SignerLoop; @@ -10,7 +11,6 @@ use std::env; use tokio::sync::{mpsc, oneshot}; use vls_proxy::client::UnixClient; use vls_proxy::connection::{open_parent_fd, UnixConnection}; -use vls_proxy::util::setup_logging; pub struct Channel { pub sequence: u16, @@ -18,12 +18,14 @@ pub struct Channel { } /// Responses are received on the oneshot sender +#[derive(Debug)] pub struct ChannelRequest { pub message: Vec, pub reply_tx: oneshot::Sender, } // mpsc reply +#[derive(Debug)] pub struct ChannelReply { pub reply: Vec, } @@ -31,7 +33,7 @@ pub struct ChannelReply { fn main() -> anyhow::Result<()> { let parent_fd = open_parent_fd(); - setup_logging("hsmd ", "info"); + util::setup_logging("hsmd ", "info"); let app = App::new("signer") .setting(AppSettings::NoAutoVersion) .about("CLN:mqtt - connects to an embedded VLS over a MQTT connection") @@ -49,7 +51,7 @@ fn main() -> anyhow::Result<()> { // Pretend to be the right version, given to us by an env var let version = env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning"); - println!("{}", version); + log::info!("{}", version); return Ok(()); } @@ -57,12 +59,14 @@ fn main() -> anyhow::Result<()> { run_test::run_test(); } else { let (tx, rx) = mpsc::channel(1000); - let runtime = start_broker(true, rx); + let (status_tx, _status_rx) = mpsc::channel(1000); + let runtime = start_broker(rx, status_tx, "sphinx-1"); runtime.block_on(async { init::connect(tx.clone()).await; // listen to reqs from CLN let conn = UnixConnection::new(parent_fd); let client = UnixClient::new(conn); + // TODO pass status_rx into SignerLoop let mut signer_loop = SignerLoop::new(client, tx); signer_loop.start(); }) diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 8ad5fce..fd09938 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,60 +1,72 @@ use crate::{ChannelReply, ChannelRequest}; -use librumqttd::{async_locallink::construct_broker, Config}; +use librumqttd::{ + async_locallink, + consolelink::{self, ConsoleLink}, + rumqttlog::router::ConnectionMetrics, + Config, +}; + +use std::sync::Arc; use std::thread; -use tokio::sync::{mpsc, oneshot}; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::time::timeout; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; +const USERNAME: &str = "sphinx-key"; +const PASSWORD: &str = "sphinx-key-pass"; pub fn start_broker( - wait_for_ready_message: bool, mut receiver: mpsc::Receiver, + status_sender: mpsc::Sender, + expected_client_id: &str, ) -> tokio::runtime::Runtime { - let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); + let config = config(); + let client_id = expected_client_id.to_string(); - let (mut router, console, servers, builder) = construct_broker(config); + let (mut router, servers, builder) = async_locallink::construct(config.clone()); thread::spawn(move || { router.start().expect("could not start router"); }); + let mut client_connected = false; + let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); rt_builder.enable_all(); let rt = rt_builder.build().unwrap(); rt.block_on(async { - // channel to block until READY received - let (ready_tx, ready_rx) = oneshot::channel(); tokio::spawn(async move { let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = mpsc::channel(1000); - let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap(); - tx.subscribe([SUB_TOPIC]).await.unwrap(); + let (mut link_tx, mut link_rx) = + builder.clone().connect("localclient", 200).await.unwrap(); + link_tx.subscribe([SUB_TOPIC]).await.unwrap(); - let console_task = tokio::spawn(console); + let router_tx = builder.router_tx(); + tokio::spawn(async move { + let config = config.clone().into(); + let router_tx = router_tx.clone(); + let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); + loop { + let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); + if let Some(c) = metrics_to_status(metrics, client_connected) { + client_connected = c; + status_sender + .send(c) + .await + .expect("couldnt send connection status"); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + }); let sub_task = tokio::spawn(async move { - // ready message loop - // let ready_tx_ = ready_tx.clone(); - if wait_for_ready_message { - loop { - let message = rx.recv().await.unwrap(); - if let Some(payload) = message.payload.get(0) { - let content = String::from_utf8_lossy(&payload[..]); - log::info!("received message content: {}", content); - if content == "READY" { - ready_tx.send(true).expect("could not send ready"); - break; - } - } - } - } - loop { - let message = rx.recv().await.unwrap(); - // println!("T = {}, P = {:?}", message.topic, message.payload.len()); - // println!("count {}", message.payload.len()); + while let Ok(message) = link_rx.recv().await { for payload in message.payload { if let Err(e) = msg_tx.send(payload.to_vec()).await { - println!("pub err {:?}", e); + log::warn!("pub err {:?}", e); } } } @@ -62,12 +74,16 @@ pub fn start_broker( let relay_task = tokio::spawn(async move { while let Some(msg) = receiver.recv().await { - tx.publish(PUB_TOPIC, false, msg.message) + link_tx + .publish(PUB_TOPIC, false, msg.message) .await .expect("could not mqtt pub"); - let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()"); - if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) { - log::warn!("could not send on reply_tx"); + if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await { + if let Err(_) = msg.reply_tx.send(ChannelReply { + reply: reply.unwrap(), + }) { + log::warn!("could not send on reply_tx"); + } } } }); @@ -75,13 +91,81 @@ pub fn start_broker( servers.await; sub_task.await.unwrap(); relay_task.await.unwrap(); - console_task.await.unwrap(); }); - if wait_for_ready_message { - log::info!("waiting for READY..."); - ready_rx.await.expect("Could not receive from channel."); - } }); rt } + +fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option { + match metrics.tracker() { + Some(t) => { + // wait for subscription to be sure + if t.concrete_subscriptions_count() > 0 { + if !client_connected { + Some(true) // changed to true + } else { + None + } + } else { + None + } + } + None => { + if client_connected { + Some(false) + } else { + None + } + } + } +} + +fn config() -> Config { + use librumqttd::rumqttlog::Config as RouterConfig; + use librumqttd::{ + ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings, + }; + use std::collections::HashMap; + use std::net::{Ipv4Addr, SocketAddrV4}; + use std::path::PathBuf; + let id = 0; + let router = RouterConfig { + id, + dir: PathBuf::from("/tmp/rumqttd"), + max_segment_size: 10240, + max_segment_count: 10, + max_connections: 10001, + }; + let mut servers = HashMap::new(); + servers.insert( + "0".to_string(), + ServerSettings { + cert: None, + listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(), + next_connection_delay_ms: 1, + connections: ConnectionSettings { + connection_timeout_ms: 5000, + max_client_id_len: 256, + throttle_delay_ms: 0, + max_payload_size: 5120, + max_inflight_count: 200, + max_inflight_size: 1024, + login_credentials: Some(vec![ConnectionLoginCredentials { + username: USERNAME.to_string(), + password: PASSWORD.to_string(), + }]), + }, + }, + ); + Config { + id, + servers, + router, + console: ConsoleSettings { + listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030).into(), + }, + cluster: None, + replicator: None, + } +} diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index f856ac9..2039f72 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -5,6 +5,8 @@ use tokio::sync::{mpsc, oneshot}; use vls_protocol::serde_bolt::WireString; use vls_protocol::{msgs, msgs::Message}; +const CLIENT_ID: &str = "test-1"; + pub fn run_test() { log::info!("TEST..."); @@ -12,16 +14,31 @@ pub fn run_test() { let mut sequence = 1; let (tx, rx) = mpsc::channel(1000); - let runtime = start_broker(true, rx); - log::info!("======> READY received! start now"); + let (status_tx, mut status_rx) = mpsc::channel(1000); + let runtime = start_broker(rx, status_tx, CLIENT_ID); runtime.block_on(async { + let mut connected = false; loop { - if let Err(e) = iteration(id, sequence, tx.clone()).await { - panic!("iteration failed {:?}", e); - } - sequence = sequence.wrapping_add(1); - id += 1; - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + tokio::select! { + status = status_rx.recv() => { + if let Some(connection_status) = status { + connected = connection_status; + id = 0; + sequence = 1; + log::info!("========> CONNECTED! {}", connection_status); + } + } + res = iteration(id, sequence, tx.clone(), connected) => { + if let Err(e) = res { + log::warn!("===> iteration failed {:?}", e); + } + if connected { + sequence = sequence.wrapping_add(1); + id += 1; + } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + }; } }); } @@ -30,7 +47,12 @@ pub async fn iteration( id: u16, sequence: u16, tx: mpsc::Sender, + connected: bool, ) -> anyhow::Result<()> { + if !connected { + return Ok(()); + } + // log::info!("do a ping!"); let ping = msgs::Ping { id, message: WireString("ping".as_bytes().to_vec()), @@ -42,7 +64,7 @@ pub async fn iteration( message: ping_bytes, reply_tx, }; - let _ = tx.send(request).await; + tx.send(request).await?; let res = reply_rx.await?; let reply = parser::response_from_bytes(res.reply, sequence)?; match reply { diff --git a/broker/src/test_client.rs b/broker/src/test_client.rs deleted file mode 100644 index 7bf0f80..0000000 --- a/broker/src/test_client.rs +++ /dev/null @@ -1,59 +0,0 @@ - -use sphinx_key_parser::MsgDriver; - -use tokio::{task, time}; -use rumqttc::{self, AsyncClient, MqttOptions, QoS, Event, Packet}; -use std::error::Error; -use std::time::Duration; -use vls_protocol::msgs; - -#[tokio::main(worker_threads = 1)] -async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); - mqttoptions.set_credentials("sphinx-key", "sphinx-key-pass"); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - task::spawn(async move { - requests(client).await; - time::sleep(Duration::from_secs(3)).await; - }); - - loop { - let event = eventloop.poll().await; - // println!("{:?}", event.unwrap()); - if let Event::Incoming(packet) = event.unwrap() { - if let Packet::Publish(p) = packet { - let mut m = MsgDriver::new(p.payload.to_vec()); - let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); - assert_eq!(dbid, 0); - assert_eq!(sequence, 0); - let ping: msgs::Ping = - msgs::read_message(&mut m).expect("failed to read ping message"); - println!("INCOMING: {:?}", ping); - } - } - } -} - -async fn requests(client: AsyncClient) { - - client - .subscribe("sphinx", QoS::AtMostOnce) - .await - .unwrap(); - - for _ in 1..=10 { - client - .publish("trigger", QoS::AtMostOnce, false, vec![1; 1]) - .await - .unwrap(); - - time::sleep(Duration::from_secs(1)).await; - } - - time::sleep(Duration::from_secs(120)).await; -} \ No newline at end of file diff --git a/broker/src/util.rs b/broker/src/util.rs new file mode 100644 index 0000000..a2ae7d7 --- /dev/null +++ b/broker/src/util.rs @@ -0,0 +1,34 @@ +use std::env; +use std::str::FromStr; + +pub fn setup_logging(who: &str, level_arg: &str) { + use fern::colors::{Color, ColoredLevelConfig}; + let colors = ColoredLevelConfig::new() + .info(Color::Green) + .error(Color::Red) + .warn(Color::Yellow); + let level = env::var("RUST_LOG").unwrap_or(level_arg.to_string()); + let who = who.to_string(); + fern::Dispatch::new() + .format(move |out, message, record| { + out.finish(format_args!( + "[{} {}/{} {}] {}", + chrono::Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), + who, + record.target(), + colors.color(record.level()), + message + )) + }) + .level(log::LevelFilter::from_str(&level).expect("level")) + .level_for("h2", log::LevelFilter::Info) + .level_for("sled", log::LevelFilter::Info) + .level_for( + "librumqttd::rumqttlog::router::router", + log::LevelFilter::Warn, + ) + .chain(std::io::stdout()) + // .chain(fern::log_file("/tmp/output.log")?) + .apply() + .expect("log config"); +} diff --git a/tester/src/main.rs b/tester/src/main.rs index 43ebe9c..7e007e6 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -20,97 +20,116 @@ async fn main() -> Result<(), Box> { .setting(AppSettings::NoAutoVersion) .about("CLN:mqtt-tester - MQTT client signer") .arg(Arg::from("--test run a test against the embedded device")); - - let mut try_i = 0; - let (client, mut eventloop) = loop { - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); - mqttoptions.set_credentials(USERNAME, PASSWORD); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - match eventloop.poll().await { - Ok(event) => { - if let Some(_) = incoming_conn_ack(event) { - println!("==========> MQTT connected!"); - break (client, eventloop); + let matches = app.get_matches(); + let is_test = matches.is_present("test"); + // main loop - alternate between "reconnection" and "handler" + loop { + let mut try_i = 0; + let (client, mut eventloop) = loop { + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + mqttoptions.set_credentials(USERNAME, PASSWORD); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + match eventloop.poll().await { + Ok(event) => { + if let Some(_) = incoming_conn_ack(event) { + println!("==========> MQTT connected!"); + break (client, eventloop); + } + } + Err(_) => { + try_i = try_i + 1; + println!("reconnect.... {}", try_i); + tokio::time::sleep(Duration::from_secs(1)).await; } } - Err(_) => { - try_i = try_i + 1; - println!("reconnect.... {}", try_i); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - }; - - client - .subscribe(SUB_TOPIC, QoS::AtMostOnce) - .await - .expect("could not mqtt subscribe"); - - client - .publish( - PUB_TOPIC, - QoS::AtMostOnce, - false, - "READY".as_bytes().to_vec(), - ) - .await - .expect("could not pub"); - - let matches = app.get_matches(); - if matches.is_present("test") { - loop { - let event = eventloop.poll().await.expect("failed to unwrap event"); - // println!("{:?}", event); - if let Some(ping_bytes) = incoming_bytes(event) { - let (ping, sequence, dbid): (msgs::Ping, u16, u64) = - parser::request_from_bytes(ping_bytes).expect("read ping header"); - println!("sequence {}", sequence); - println!("dbid {}", dbid); - println!("INCOMING: {:?}", ping); - let pong = msgs::Pong { - id: ping.id, - message: ping.message, - }; - let bytes = parser::raw_response_from_msg(pong, sequence) - .expect("couldnt parse raw response"); - client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes) - .await - .expect("could not mqtt publish"); - } - } - } else { - // once the init loop is done, the root_handler is returned - let root_handler = loop { - let init_event = eventloop.poll().await.expect("failed to unwrap event"); - // this may be another kind of message like MQTT ConnAck - // loop around again and wait for the init - if let Some(init_msg_bytes) = incoming_bytes(init_event) { - let InitResponse { - root_handler, - init_reply, - } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); - client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, init_reply) - .await - .expect("could not publish init response"); - // return the root_handler and finish the init loop - break root_handler; - } }; - // the actual loop - loop { - let event = eventloop.poll().await.expect("failed to unwrap event"); - let dummy_peer = PubKey([0; 33]); - if let Some(msg_bytes) = incoming_bytes(event) { - match sphinx_key_signer::handle(&root_handler, msg_bytes, dummy_peer.clone()) { - Ok(b) => client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, b) - .await - .expect("could not publish init response"), - Err(e) => panic!("HANDLE FAILED {:?}", e), - }; + + client + .subscribe(SUB_TOPIC, QoS::AtMostOnce) + .await + .expect("could not mqtt subscribe"); + + if is_test { + // test handler loop + loop { + match eventloop.poll().await { + Ok(event) => { + // println!("{:?}", event); + if let Some(ping_bytes) = incoming_bytes(event) { + let (ping, sequence, dbid): (msgs::Ping, u16, u64) = + parser::request_from_bytes(ping_bytes).expect("read ping header"); + println!("sequence {}", sequence); + println!("dbid {}", dbid); + println!("INCOMING: {:?}", ping); + let pong = msgs::Pong { + id: ping.id, + message: ping.message, + }; + let bytes = parser::raw_response_from_msg(pong, sequence) + .expect("couldnt parse raw response"); + client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes) + .await + .expect("could not mqtt publish"); + } + } + Err(e) => { + log::warn!("diconnected {:?}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + break; // break out of this loop to reconnect + } + } + } + } else { + // once the init loop is done, the root_handler is returned + let root_handler = loop { + if let Ok(init_event) = eventloop.poll().await { + // this may be another kind of message like MQTT ConnAck + // loop around again and wait for the init + if let Some(init_msg_bytes) = incoming_bytes(init_event) { + let InitResponse { + root_handler, + init_reply, + } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); + client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, init_reply) + .await + .expect("could not publish init response"); + // return the root_handler and finish the init loop + break Some(root_handler); + } + } else { + tokio::time::sleep(Duration::from_secs(1)).await; + log::warn!("failed to initialize! Lost connection"); + break None; + } + }; + // the actual handler loop + loop { + if let Some(rh) = &root_handler { + match eventloop.poll().await { + Ok(event) => { + let dummy_peer = PubKey([0; 33]); + if let Some(msg_bytes) = incoming_bytes(event) { + match sphinx_key_signer::handle(rh, msg_bytes, dummy_peer.clone()) { + Ok(b) => client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, b) + .await + .expect("could not publish init response"), + Err(e) => panic!("HANDLE FAILED {:?}", e), + }; + } + } + Err(e) => { + log::warn!("diconnected {:?}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + break; // break out of this loop to reconnect + } + } + } else { + break; + } } } }