From 7b29d1688415143f8320920017571dc00f6a7d6f Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Mon, 6 Jun 2022 13:17:24 -0700 Subject: [PATCH 1/3] signer init and handler, replace BackgroundEventLoop with mpsc channel, msg parse on esp --- signer/src/lib.rs | 73 ++++++++++++++---- sphinx-key/src/conn/mqtt.rs | 21 +++--- sphinx-key/src/core/events.rs | 135 +++++++++++----------------------- sphinx-key/src/main.rs | 12 ++- 4 files changed, 117 insertions(+), 124 deletions(-) diff --git a/signer/src/lib.rs b/signer/src/lib.rs index 8fd5672..769e4a0 100644 --- a/signer/src/lib.rs +++ b/signer/src/lib.rs @@ -1,23 +1,70 @@ -use sphinx_key_parser::MsgDriver; use lightning_signer::persist::{DummyPersister, Persist}; use lightning_signer::Arc; -use vls_protocol::model::PubKey; -use vls_protocol::msgs; +use sphinx_key_parser::MsgDriver; +use vls_protocol::msgs::{self, read_serial_request_header, write_serial_response_header, Message}; use vls_protocol::serde_bolt::WireString; -use vls_protocol_signer::handler::{Handler, RootHandler}; use vls_protocol_signer::lightning_signer; use vls_protocol_signer::vls_protocol; -pub fn parse_ping(msg_bytes: Vec) -> msgs::Ping { - let mut m = MsgDriver::new(msg_bytes); - let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); - let ping: msgs::Ping = - msgs::read_message(&mut m).expect("failed to read ping message"); - ping +pub use vls_protocol_signer::handler::{Handler, RootHandler}; + +pub struct InitResponse { + pub root_handler: RootHandler, + pub init_reply: Vec, } -pub fn say_hi() { +pub fn init(bytes: Vec) -> anyhow::Result { let persister: Arc = Arc::new(DummyPersister); - - println!("Hello, world!"); + let mut md = MsgDriver::new(bytes); + let (sequence, dbid) = read_serial_request_header(&mut md).expect("read init header"); + assert_eq!(dbid, 0); + assert_eq!(sequence, 0); + let init: msgs::HsmdInit2 = msgs::read_message(&mut md).expect("failed to read init message"); + log::info!("init {:?}", init); + let allowlist = init + .dev_allowlist + .iter() + .map(|s| from_wire_string(s)) + .collect::>(); + let seed = init.dev_seed.as_ref().map(|s| s.0).expect("no seed"); + let root_handler = RootHandler::new(0, Some(seed), persister, allowlist); + let init_reply = root_handler + .handle(Message::HsmdInit2(init)) + .expect("handle init"); + let mut reply = MsgDriver::new_empty(); + write_serial_response_header(&mut reply, sequence).expect("write init header"); + msgs::write_vec(&mut reply, init_reply.as_vec()).expect("write init reply"); + Ok(InitResponse { + root_handler, + init_reply: reply.bytes(), + }) +} + +pub fn handle(_root_handler: &RootHandler, _bytes: Vec) -> anyhow::Result> { + Ok(Vec::new()) +} + +pub fn parse_ping_and_form_response(msg_bytes: Vec) -> Vec { + let mut m = MsgDriver::new(msg_bytes); + let (sequence, _dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); + let ping: msgs::Ping = msgs::read_message(&mut m).expect("failed to read ping message"); + let mut md = MsgDriver::new_empty(); + msgs::write_serial_response_header(&mut md, sequence) + .expect("failed to write_serial_request_header"); + let pong = msgs::Pong { + id: ping.id, + message: ping.message, + }; + msgs::write(&mut md, pong).expect("failed to serial write"); + md.bytes() +} + +// pub fn say_hi() { +// let persister: Arc = Arc::new(DummyPersister); + +// println!("Hello, world!"); +// } + +fn from_wire_string(s: &WireString) -> String { + String::from_utf8(s.0.to_vec()).expect("malformed string") } diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 0bca42d..75f7f90 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -1,18 +1,14 @@ -use crate::core::events::Message; - -use embedded_svc::event_bus::Postbox; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS, Event, Message as MqttMessage}; use embedded_svc::mqtt::client::utils::Connection as MqttConnection; use esp_idf_svc::mqtt::client::*; use anyhow::Result; -use esp_idf_svc::eventloop::EspBackgroundEventLoop; use log::*; use std::thread; use esp_idf_sys::{self}; use esp_idf_sys::EspError; use esp_idf_hal::mutex::Condvar; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, mpsc}; pub const TOPIC: &str = "sphinx"; pub const RETURN_TOPIC: &str = "sphinx-return"; @@ -48,7 +44,7 @@ pub fn make_client(broker: &str) -> Result<( pub fn start_listening( mqtt: Arc>>>, mut connection: MqttConnection, - mut eventloop: EspBackgroundEventLoop + tx: mpsc::Sender>, ) -> Result<()> { // must start pumping before subscribe or publish will work @@ -61,11 +57,12 @@ pub fn start_listening( Ok(msg) => { if let Event::Received(msg) = msg { info!("MQTT MESSAGE RECEIVED!"); - if let Ok(m) = Message::new_from_slice(&msg.data()) { - if let Err(e) = eventloop.post(&m, None) { - warn!("failed to post to eventloop {:?}", e); - } - } + // if let Ok(m) = Message::new_from_slice(&msg.data()) { + // if let Err(e) = eventloop.post(&m, None) { + // warn!("failed to post to eventloop {:?}", e); + // } + // } + tx.send(msg.data().to_vec()).unwrap(); } }, } @@ -81,7 +78,7 @@ pub fn start_listening( RETURN_TOPIC, QoS::AtMostOnce, false, - format!("Hello from {}!", CLIENT_ID).as_bytes(), + format!("READY").as_bytes(), )?; Ok(()) diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 6e641c4..8fecc39 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,107 +1,58 @@ use crate::conn::mqtt::RETURN_TOPIC; -use crate::periph::led::Led; -use sphinx_key_signer::parse_ping; +use sphinx_key_signer::{self, InitResponse}; +use std::sync::{mpsc}; +use std::thread; -use esp_idf_svc::eventloop::*; use embedded_svc::httpd::Result; -use esp_idf_sys::{self, c_types}; +use esp_idf_sys; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS}; use esp_idf_svc::mqtt::client::*; use esp_idf_sys::EspError; use std::sync::{Arc, Mutex}; use log::*; -use std::cmp::min; -pub const MSG_SIZE: usize = 256; +pub fn make_event_thread(mqtt: Arc>>>, rx: mpsc::Receiver>) -> Result<()> { -#[derive(Copy, Clone, Debug)] -pub struct Message([u8; MSG_SIZE]); + thread::spawn(move||{ + let mut client = mqtt.lock().unwrap(); + info!("About to subscribe to the mpsc channel"); -impl Message { - pub fn _new(bytes: [u8; MSG_SIZE]) -> Self { - Self(bytes) - } - // the first byte is the length of the message - pub fn new_from_slice(src: &[u8]) -> Result { - if src.len() > MSG_SIZE - 1 { - return Err(anyhow::anyhow!("message too long")); + let init_msg_bytes = rx.recv().expect("NO INIT MSG"); + let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); + client.publish( + RETURN_TOPIC, + QoS::AtMostOnce, + false, + init_reply, + ).expect("could not publish init response"); + + while let Ok(msg_bytes) = rx.recv() { + let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes) { + Ok(b) => client.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"), + Err(e) => panic!("HANDLE FAILED {:?}", e), + }; } - let mut dest = [0; MSG_SIZE]; - dest[0] = src.len() as u8; // this would crash if MSG_SIZE>256 - for i in 0..min(src.len(), MSG_SIZE) { - dest[i+1] = src[i]; + + }); + + Ok(()) +} + + +pub fn make_test_event_thread(mqtt: Arc>>>, rx: mpsc::Receiver>) -> Result<()> { + + thread::spawn(move||{ + let mut client = mqtt.lock().unwrap(); + info!("About to subscribe to the mpsc channel"); + + while let Ok(msg_bytes) = rx.recv() { + let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); + log::info!("GOT A PING MESSAGE! returning pong now..."); + client.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"); } - Ok(Self(dest)) - } - pub fn read_bytes(&self) -> Vec { - let l = self.0[0] as usize; - self.0[1..l+1].to_vec() - } - pub fn read_string(&self) -> String { - String::from_utf8_lossy(&self.0).to_string() - } -} - -impl EspTypedEventSource for Message { - fn source() -> *const c_types::c_char { - b"SPHINX\0".as_ptr() as *const _ - } -} - -impl EspTypedEventSerializer for Message { - fn serialize( - event: &Message, - f: impl for<'a> FnOnce(&'a EspEventPostData) -> R, - ) -> R { - f(&unsafe { EspEventPostData::new(Self::source(), Self::event_id(), event) }) - } -} - -impl EspTypedEventDeserializer for Message { - fn deserialize( - data: &EspEventFetchData, - f: &mut impl for<'a> FnMut(&'a Message) -> R, - ) -> R { - f(unsafe { data.as_payload() }) - } -} - -pub fn make_eventloop(client: Arc>>>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { - use embedded_svc::event_bus::EventBus; - - info!("About to start a background event loop"); - let mut eventloop = EspBackgroundEventLoop::new( - &BackgroundLoopConfiguration { - task_stack_size: 8192, - .. Default::default() - }, - )?; - let mut green = Led::new(0x000100, 10); - - info!("About to subscribe to the background event loop"); - let subscription = eventloop.subscribe(move |message: &Message| { - info!("!!! Got message from the event loop"); //: {:?}", message.0); - green.blink(); - let msg_bytes = message.read_bytes(); - // let msg_str = String::from_utf8_lossy(&msg[..]); - match client.lock() { - Ok(mut m_) => { - let ping = parse_ping(msg_bytes); - if let Err(err) = m_.publish( - RETURN_TOPIC, - QoS::AtMostOnce, - false, - format!("Got and parsed the ping!!!").as_bytes(), - ) { - log::warn!("failed to mqtt publish! {:?}", err); - }; - }, - Err(_) => log::warn!("failed to lock Mutex") - }; - - })?; - // let subscription = eventloop.subscribe(cb)?; - - Ok((eventloop, subscription)) + + }); + + Ok(()) } diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index ae7140c..258a171 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -6,10 +6,9 @@ mod periph; use crate::core::{events::*, config::*}; use crate::periph::led::Led; -use sphinx_key_signer; use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use std::thread; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, mpsc}; use std::time::Duration; use anyhow::Result; @@ -25,8 +24,6 @@ fn main() -> Result<()> { esp_idf_svc::log::EspLogger::initialize_default(); - sphinx_key_signer::say_hi(); - thread::sleep(Duration::from_secs(1)); let default_nvs = Arc::new(EspDefaultNvs::new()?); @@ -42,8 +39,9 @@ fn main() -> Result<()> { let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0)); // if the subscription goes out of scope its dropped // the sub needs to publish back to mqtt??? - let (eventloop, _sub) = make_eventloop(mqtt.clone())?; - let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?; + let (tx, rx) = mpsc::channel(); + make_test_event_thread(mqtt.clone(), rx)?; + let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, tx)?; let mut blue = Led::new(0x000001, 100); println!("{:?}", wifi.get_status()); @@ -52,7 +50,7 @@ fn main() -> Result<()> { blue.blink(); thread::sleep(Duration::from_secs(1)); } - drop(wifi); + // drop(wifi); } else { println!("=============> START SERVER NOW AND WAIT <=============="); if let Ok((wifi, config)) = start_config_server_and_wait(default_nvs.clone()) { From 066a44a5b6f0ae609830988f204f700eb0d8febc Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Mon, 6 Jun 2022 14:01:01 -0700 Subject: [PATCH 2/3] VLS ping pong test running on ESP, no Mutex --- README.md | 2 ++ broker/src/mqtt.rs | 18 +++++++---- sphinx-key/src/conn/mqtt.rs | 21 ++++++------- sphinx-key/src/core/config.rs | 3 ++ sphinx-key/src/core/events.rs | 57 ++++++++++++----------------------- sphinx-key/src/main.rs | 18 +++++------ 6 files changed, 55 insertions(+), 64 deletions(-) diff --git a/README.md b/README.md index 924d1bb..792cdf5 100644 --- a/README.md +++ b/README.md @@ -83,3 +83,5 @@ then in the sphinx-key dir, with the CC variable set as above: `cargo build` and flash using the instructions further above + + diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 3762ada..8ad5fce 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,13 +1,15 @@ -use crate::{ChannelRequest,ChannelReply}; +use crate::{ChannelReply, ChannelRequest}; use librumqttd::{async_locallink::construct_broker, Config}; use std::thread; -use tokio::sync::{oneshot, mpsc}; +use tokio::sync::{mpsc, oneshot}; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; -pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver) -> tokio::runtime::Runtime { - +pub fn start_broker( + wait_for_ready_message: bool, + mut receiver: mpsc::Receiver, +) -> tokio::runtime::Runtime { let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); let (mut router, console, servers, builder) = construct_broker(config); @@ -23,7 +25,8 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver>, mpsc::Receiver>) = mpsc::channel(1000); + let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = + mpsc::channel(1000); let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap(); tx.subscribe([SUB_TOPIC]).await.unwrap(); @@ -37,6 +40,7 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver Result<( } pub fn start_listening( - mqtt: Arc>>>, + mut client: EspMqttClient>, mut connection: MqttConnection, tx: mpsc::Sender>, -) -> Result<()> { +) -> Result>> { // must start pumping before subscribe or publish will work thread::spawn(move || { @@ -56,13 +56,7 @@ pub fn start_listening( Err(e) => info!("MQTT Message ERROR: {}", e), Ok(msg) => { if let Event::Received(msg) = msg { - info!("MQTT MESSAGE RECEIVED!"); - // if let Ok(m) = Message::new_from_slice(&msg.data()) { - // if let Err(e) = eventloop.post(&m, None) { - // warn!("failed to post to eventloop {:?}", e); - // } - // } - tx.send(msg.data().to_vec()).unwrap(); + tx.send(msg.data().to_vec()).expect("could send to TX"); } }, } @@ -70,10 +64,13 @@ pub fn start_listening( info!("MQTT connection loop exit"); }); - let mut client = mqtt.lock().unwrap(); + // log::info!("lock mqtt mutex guard"); + // let mut client = mqtt.lock().unwrap(); + log::info!("SUBSCRIBE TO {}", TOPIC); client.subscribe(TOPIC, QoS::AtMostOnce)?; + log::info!("PUBLISH {} to {}", "READY", RETURN_TOPIC); client.publish( RETURN_TOPIC, QoS::AtMostOnce, @@ -81,5 +78,5 @@ pub fn start_listening( format!("READY").as_bytes(), )?; - Ok(()) + Ok(client) } diff --git a/sphinx-key/src/core/config.rs b/sphinx-key/src/core/config.rs index 58f7b0f..d554f45 100644 --- a/sphinx-key/src/core/config.rs +++ b/sphinx-key/src/core/config.rs @@ -22,6 +22,9 @@ pub struct Config { arp -a http://192.168.71.1/?broker=52.91.253.115%3A1883 + +http://192.168.71.1/?broker=192.168.86.222%3A1883 + */ pub fn start_wifi_client(default_nvs: Arc, config: &Config) -> Result> { diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 8fecc39..029dd99 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,58 +1,41 @@ use crate::conn::mqtt::RETURN_TOPIC; use sphinx_key_signer::{self, InitResponse}; use std::sync::{mpsc}; -use std::thread; -use embedded_svc::httpd::Result; use esp_idf_sys; +use embedded_svc::httpd::Result; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS}; use esp_idf_svc::mqtt::client::*; use esp_idf_sys::EspError; -use std::sync::{Arc, Mutex}; use log::*; -pub fn make_event_thread(mqtt: Arc>>>, rx: mpsc::Receiver>) -> Result<()> { +pub fn make_event_loop(mut mqtt: EspMqttClient>, rx: mpsc::Receiver>) -> Result<()> { - thread::spawn(move||{ - let mut client = mqtt.lock().unwrap(); - info!("About to subscribe to the mpsc channel"); + // initialize the RootHandler + let init_msg_bytes = rx.recv().expect("NO INIT MSG"); + let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); + mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, init_reply).expect("could not publish init response"); - let init_msg_bytes = rx.recv().expect("NO INIT MSG"); - let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); - client.publish( - RETURN_TOPIC, - QoS::AtMostOnce, - false, - init_reply, - ).expect("could not publish init response"); - - while let Ok(msg_bytes) = rx.recv() { - let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes) { - Ok(b) => client.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"), - Err(e) => panic!("HANDLE FAILED {:?}", e), - }; - } - - }); + // signing loop + while let Ok(msg_bytes) = rx.recv() { + let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes) { + Ok(b) => mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"), + Err(e) => panic!("HANDLE FAILED {:?}", e), + }; + } Ok(()) } +pub fn make_test_event_loop(mut mqtt: EspMqttClient>, rx: mpsc::Receiver>) -> Result<()> { -pub fn make_test_event_thread(mqtt: Arc>>>, rx: mpsc::Receiver>) -> Result<()> { - - thread::spawn(move||{ - let mut client = mqtt.lock().unwrap(); - info!("About to subscribe to the mpsc channel"); - - while let Ok(msg_bytes) = rx.recv() { - let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); - log::info!("GOT A PING MESSAGE! returning pong now..."); - client.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"); - } - - }); + info!("About to subscribe to the mpsc channel"); + while let Ok(msg_bytes) = rx.recv() { + let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); + log::info!("GOT A PING MESSAGE! returning pong now..."); + mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"); + } Ok(()) } diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index 258a171..b850335 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -8,7 +8,7 @@ use crate::periph::led::Led; use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use std::thread; -use std::sync::{Arc, Mutex, mpsc}; +use std::sync::{Arc, mpsc}; use std::time::Duration; use anyhow::Result; @@ -34,16 +34,16 @@ fn main() -> Result<()> { // store.remove("config").expect("couldnt remove config"); let wifi = start_wifi_client(default_nvs.clone(), &exist)?; - let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?; - - let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0)); - // if the subscription goes out of scope its dropped - // the sub needs to publish back to mqtt??? let (tx, rx) = mpsc::channel(); - make_test_event_thread(mqtt.clone(), rx)?; - let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, tx)?; + // _conn needs to stay in scope or its dropped + let (mqtt, connection) = conn::mqtt::make_client(&exist.broker)?; + let mqtt_client = conn::mqtt::start_listening(mqtt, connection, tx)?; + + // this blocks forever... the "main thread" + log::info!(">>>>>>>>>>> blocking forever..."); + make_test_event_loop(mqtt_client, rx)?; + let mut blue = Led::new(0x000001, 100); - println!("{:?}", wifi.get_status()); loop { log::info!("Listening..."); From da9e955d44e0846a12851245cee5960bea08ead7 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Mon, 6 Jun 2022 14:25:05 -0700 Subject: [PATCH 3/3] real signer handler, fix dummy pear --- sphinx-key/src/core/events.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 029dd99..e08729f 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,5 +1,5 @@ use crate::conn::mqtt::RETURN_TOPIC; -use sphinx_key_signer::{self, InitResponse}; +use sphinx_key_signer::{self, InitResponse, PubKey}; use std::sync::{mpsc}; use esp_idf_sys; @@ -18,8 +18,9 @@ pub fn make_event_loop(mut mqtt: EspMqttClient> mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, init_reply).expect("could not publish init response"); // signing loop + let dummy_peer = PubKey([0; 33]); while let Ok(msg_bytes) = rx.recv() { - let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes) { + let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes, dummy_peer.clone()) { Ok(b) => mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"), Err(e) => panic!("HANDLE FAILED {:?}", e), };