From 209dda952a268008655b38fc9d35f16d48ebb061 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Thu, 16 Jun 2022 09:24:11 -0700 Subject: [PATCH] fix broker init on release --- README.md | 2 +- broker/src/main.rs | 10 ++++- signer/Cargo.toml | 4 +- signer/src/lib.rs | 3 ++ sphinx-key/src/conn/mqtt.rs | 83 ++++++++++++++++++----------------- sphinx-key/src/core/events.rs | 74 +++++++++++++++++++++---------- 6 files changed, 109 insertions(+), 67 deletions(-) diff --git a/README.md b/README.md index 06f57fc..e651b3d 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Find the path to your `riscv32-esp-elf-gcc` binary within the `.embuild` dir: ### flash test -`espflash target/riscv32imc-esp-espidf/debug/sphinx-key` +`espflash target/riscv32imc-esp-espidf/debug/sphinx-key --monitor` ### build release diff --git a/broker/src/main.rs b/broker/src/main.rs index c6a2ac2..88a2df1 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -60,10 +60,18 @@ fn main() -> anyhow::Result<()> { run_test::run_test(); } else { let (tx, rx) = mpsc::channel(1000); - let (status_tx, _status_rx) = mpsc::channel(1000); + let (status_tx, mut status_rx) = mpsc::channel(1000); + log::info!("=> start broker"); let _runtime = start_broker(rx, status_tx, "sphinx-1"); + log::info!("=> wait for connected status"); + // wait for connection = true + let status = status_rx.blocking_recv().expect("couldnt receive"); + log::info!("=> connection status: {}", status); + assert_eq!(status, true, "expected connected = true"); // runtime.block_on(async { init::blocking_connect(tx.clone()); + log::info!("=====> sent seed!"); + // listen to reqs from CLN let conn = UnixConnection::new(parent_fd); let client = UnixClient::new(conn); diff --git a/signer/Cargo.toml b/signer/Cargo.toml index 7a74189..357ba93 100644 --- a/signer/Cargo.toml +++ b/signer/Cargo.toml @@ -6,11 +6,9 @@ edition = "2018" [dependencies] sphinx-key-parser = { path = "../parser" } +# vls-protocol-signer = { path = "../../../evanf/validating-lightning-signer/vls-protocol-signer", default-features = false, features = ["secp-lowmemory"] } vls-protocol-signer = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer", default-features = false, features = ["secp-lowmemory"] } anyhow = {version = "1", features = ["backtrace"]} log = "0.4" -[patch.crates-io] -# Low-memory version of secp256k1 with static precomputation -secp256k1 = { git = "https://github.com/devrandom/rust-secp256k1.git", rev = "4e745ebe7e4c9cd0a7e9c8d5c42e989522e52f71" } diff --git a/signer/src/lib.rs b/signer/src/lib.rs index e02af9b..2d3181a 100644 --- a/signer/src/lib.rs +++ b/signer/src/lib.rs @@ -26,8 +26,11 @@ pub fn init(bytes: Vec) -> anyhow::Result { .iter() .map(|s| from_wire_string(s)) .collect::>(); + log::info!("allowlist {:?}", allowlist); let seed = init.dev_seed.as_ref().map(|s| s.0).expect("no seed"); + log::info!("seed {:?}", seed); let root_handler = RootHandler::new(0, Some(seed), persister, allowlist); + log::info!("root_handler created"); let init_reply = root_handler .handle(Message::HsmdInit2(init)) .expect("handle init"); diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 8c7a8bb..0ff559c 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -1,16 +1,16 @@ use crate::core::events::Event as CoreEvent; -use embedded_svc::mqtt::client::utils::ConnState; -use embedded_svc::mqtt::client::{Connection, MessageImpl, QoS, Event, Message as MqttMessage}; -use embedded_svc::mqtt::client::utils::Connection as MqttConnection; -use esp_idf_svc::mqtt::client::*; use anyhow::Result; -use log::*; -use std::thread; -use esp_idf_sys::{self}; -use esp_idf_sys::EspError; +use embedded_svc::mqtt::client::utils::ConnState; +use embedded_svc::mqtt::client::utils::Connection as MqttConnection; +use embedded_svc::mqtt::client::{Connection, Event, Message as MqttMessage, MessageImpl, QoS}; use esp_idf_hal::mutex::Condvar; -use std::sync::{mpsc}; +use esp_idf_svc::mqtt::client::*; +use esp_idf_sys::EspError; +use esp_idf_sys::{self}; +use log::*; +use std::sync::mpsc; +use std::thread; pub const TOPIC: &str = "sphinx"; pub const RETURN_TOPIC: &str = "sphinx-return"; @@ -18,10 +18,14 @@ pub const USERNAME: &str = "sphinx-key"; pub const PASSWORD: &str = "sphinx-key-pass"; pub const QOS: QoS = QoS::AtMostOnce; -pub fn make_client(broker: &str, client_id: &str) -> Result<( - EspMqttClient>, +pub fn make_client( + broker: &str, + client_id: &str, +) -> Result<( + EspMqttClient>, MqttConnection, )> { + log::info!("make_client with id {}", client_id); let conf = MqttClientConfiguration { client_id: Some(client_id), buffer_size: 2048, @@ -44,42 +48,41 @@ pub fn make_client(broker: &str, client_id: &str) -> Result<( pub fn start_listening( client: EspMqttClient>, - mut connection: MqttConnection, + mut connection: MqttConnection, tx: mpsc::Sender, ) -> Result>> { - // must start pumping before subscribe or publish will not work thread::spawn(move || { info!("MQTT Listening for messages"); loop { match connection.next() { - Some(msg) => { - match msg { - Err(e) => match e.to_string().as_ref() { - "ESP_FAIL" => { - error!("ESP_FAIL msg!"); - }, - _ => error!("Unknown error: {}", e), - }, - Ok(msg) => { - match msg { - Event::BeforeConnect => info!("RECEIVED BeforeConnect MESSAGE"), - Event::Connected(_flag) => { - info!("RECEIVED Connected MESSAGE"); - tx.send(CoreEvent::Connected).expect("couldnt send Event::Connected"); - }, - Event::Disconnected => { - warn!("RECEIVED Disconnected MESSAGE"); - tx.send(CoreEvent::Disconnected).expect("couldnt send Event::Disconnected"); - }, - Event::Subscribed(_mes_id) => info!("RECEIVED Subscribed MESSAGE"), - Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"), - Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"), - Event::Received(msg) => tx.send(CoreEvent::Message(msg.data().to_vec())).expect("couldnt send Event::Message"), - Event::Deleted(_mes_id) => info!("RECEIVED Deleted MESSAGE"), - } - }, - } + Some(msg) => match msg { + Err(e) => match e.to_string().as_ref() { + "ESP_FAIL" => { + error!("ESP_FAIL msg!"); + } + _ => error!("Unknown error: {}", e), + }, + Ok(msg) => match msg { + Event::BeforeConnect => info!("RECEIVED BeforeConnect MESSAGE"), + Event::Connected(_flag) => { + info!("RECEIVED Connected MESSAGE"); + tx.send(CoreEvent::Connected) + .expect("couldnt send Event::Connected"); + } + Event::Disconnected => { + warn!("RECEIVED Disconnected MESSAGE"); + tx.send(CoreEvent::Disconnected) + .expect("couldnt send Event::Disconnected"); + } + Event::Subscribed(_mes_id) => info!("RECEIVED Subscribed MESSAGE"), + Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"), + Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"), + Event::Received(msg) => tx + .send(CoreEvent::Message(msg.data().to_vec())) + .expect("couldnt send Event::Message"), + Event::Deleted(_mes_id) => info!("RECEIVED Deleted MESSAGE"), + }, }, None => break, } diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 30f5803..252e1de 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,32 +1,49 @@ -use crate::conn::mqtt::{RETURN_TOPIC, TOPIC, QOS}; -use sphinx_key_signer::{self, InitResponse}; +use crate::conn::mqtt::{QOS, RETURN_TOPIC, TOPIC}; use sphinx_key_signer::vls_protocol::model::PubKey; +use sphinx_key_signer::{self, InitResponse}; use std::sync::mpsc; -use esp_idf_sys; -use embedded_svc::mqtt::client::Client; use embedded_svc::httpd::Result; use embedded_svc::mqtt::client::utils::ConnState; +use embedded_svc::mqtt::client::Client; use embedded_svc::mqtt::client::{MessageImpl, Publish}; use esp_idf_svc::mqtt::client::*; +use esp_idf_sys; use esp_idf_sys::EspError; pub enum Event { Connected, Disconnected, - Message(Vec) + Message(Vec), } #[cfg(not(feature = "pingpong"))] -pub fn make_event_loop(mut mqtt: EspMqttClient>, rx: mpsc::Receiver, do_log: bool) -> Result<()> { - +pub fn make_event_loop( + mut mqtt: EspMqttClient>, + rx: mpsc::Receiver, + do_log: bool, +) -> Result<()> { // initialize the RootHandler let root_handler = loop { if let Ok(event) = rx.recv() { - if let Event::Message(msg_bytes) = event { - let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(msg_bytes).expect("failed to init signer"); - mqtt.publish(RETURN_TOPIC, QOS, false, init_reply).expect("could not publish init response"); - break root_handler + match event { + Event::Connected => { + log::info!("SUBSCRIBE to {}", TOPIC); + mqtt.subscribe(TOPIC, QOS) + .expect("could not MQTT subscribe"); + } + Event::Message(ref msg_bytes) => { + let InitResponse { + root_handler, + init_reply, + } = sphinx_key_signer::init(msg_bytes.clone()).expect("failed to init signer"); + mqtt.publish(RETURN_TOPIC, QOS, false, init_reply) + .expect("could not publish init response"); + break root_handler; + } + Event::Disconnected => { + log::info!("GOT an early Event::Disconnected msg!"); + } } } }; @@ -37,14 +54,22 @@ pub fn make_event_loop(mut mqtt: EspMqttClient> match event { Event::Connected => { log::info!("SUBSCRIBE TO {}", TOPIC); - mqtt.subscribe(TOPIC, QOS).expect("could not MQTT subscribe"); - }, + mqtt.subscribe(TOPIC, QOS) + .expect("could not MQTT subscribe"); + } Event::Message(ref msg_bytes) => { - let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes.clone(), dummy_peer.clone(), do_log) { - Ok(b) => mqtt.publish(RETURN_TOPIC, QOS, false, b).expect("could not publish init response"), + let _ret = match sphinx_key_signer::handle( + &root_handler, + msg_bytes.clone(), + dummy_peer.clone(), + do_log, + ) { + Ok(b) => mqtt + .publish(RETURN_TOPIC, QOS, false, b) + .expect("could not publish init response"), Err(e) => panic!("HANDLE FAILED {:?}", e), }; - }, + } Event::Disconnected => { log::info!("GOT A Event::Disconnected msg!"); } @@ -55,22 +80,27 @@ pub fn make_event_loop(mut mqtt: EspMqttClient> } #[cfg(feature = "pingpong")] -pub fn make_event_loop(mut mqtt: EspMqttClient>, rx: mpsc::Receiver, do_log: bool) -> Result<()> { - +pub fn make_event_loop( + mut mqtt: EspMqttClient>, + rx: mpsc::Receiver, + do_log: bool, +) -> Result<()> { log::info!("About to subscribe to the mpsc channel"); while let Ok(event) = rx.recv() { match event { Event::Connected => { log::info!("SUBSCRIBE TO {}", TOPIC); - mqtt.subscribe(TOPIC, QOS).expect("could not MQTT subscribe"); - }, + mqtt.subscribe(TOPIC, QOS) + .expect("could not MQTT subscribe"); + } Event::Message(msg_bytes) => { let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); if do_log { log::info!("GOT A PING MESSAGE! returning pong now..."); } - mqtt.publish(RETURN_TOPIC, QOS, false, b).expect("could not publish ping response"); - }, + mqtt.publish(RETURN_TOPIC, QOS, false, b) + .expect("could not publish ping response"); + } Event::Disconnected => { log::info!("GOT A Event::Disconnected msg!"); }