diff --git a/README.md b/README.md index 924d1bb..792cdf5 100644 --- a/README.md +++ b/README.md @@ -83,3 +83,5 @@ then in the sphinx-key dir, with the CC variable set as above: `cargo build` and flash using the instructions further above + + diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 3762ada..8ad5fce 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,13 +1,15 @@ -use crate::{ChannelRequest,ChannelReply}; +use crate::{ChannelReply, ChannelRequest}; use librumqttd::{async_locallink::construct_broker, Config}; use std::thread; -use tokio::sync::{oneshot, mpsc}; +use tokio::sync::{mpsc, oneshot}; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; -pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver) -> tokio::runtime::Runtime { - +pub fn start_broker( + wait_for_ready_message: bool, + mut receiver: mpsc::Receiver, +) -> tokio::runtime::Runtime { let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); let (mut router, console, servers, builder) = construct_broker(config); @@ -23,7 +25,8 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver>, mpsc::Receiver>) = mpsc::channel(1000); + let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = + mpsc::channel(1000); let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap(); tx.subscribe([SUB_TOPIC]).await.unwrap(); @@ -37,6 +40,7 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver Result<( } pub fn start_listening( - mqtt: Arc>>>, + mut client: EspMqttClient>, mut connection: MqttConnection, tx: mpsc::Sender>, -) -> Result<()> { +) -> Result>> { // must start pumping before subscribe or publish will work thread::spawn(move || { @@ -56,13 +56,7 @@ pub fn start_listening( Err(e) => info!("MQTT Message ERROR: {}", e), Ok(msg) => { if let Event::Received(msg) = msg { - info!("MQTT MESSAGE RECEIVED!"); - // if let Ok(m) = Message::new_from_slice(&msg.data()) { - // if let Err(e) = eventloop.post(&m, None) { - // warn!("failed to post to eventloop {:?}", e); - // } - // } - tx.send(msg.data().to_vec()).unwrap(); + tx.send(msg.data().to_vec()).expect("could send to TX"); } }, } @@ -70,10 +64,13 @@ pub fn start_listening( info!("MQTT connection loop exit"); }); - let mut client = mqtt.lock().unwrap(); + // log::info!("lock mqtt mutex guard"); + // let mut client = mqtt.lock().unwrap(); + log::info!("SUBSCRIBE TO {}", TOPIC); client.subscribe(TOPIC, QoS::AtMostOnce)?; + log::info!("PUBLISH {} to {}", "READY", RETURN_TOPIC); client.publish( RETURN_TOPIC, QoS::AtMostOnce, @@ -81,5 +78,5 @@ pub fn start_listening( format!("READY").as_bytes(), )?; - Ok(()) + Ok(client) } diff --git a/sphinx-key/src/core/config.rs b/sphinx-key/src/core/config.rs index 58f7b0f..d554f45 100644 --- a/sphinx-key/src/core/config.rs +++ b/sphinx-key/src/core/config.rs @@ -22,6 +22,9 @@ pub struct Config { arp -a http://192.168.71.1/?broker=52.91.253.115%3A1883 + +http://192.168.71.1/?broker=192.168.86.222%3A1883 + */ pub fn start_wifi_client(default_nvs: Arc, config: &Config) -> Result> { diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 8fecc39..029dd99 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,58 +1,41 @@ use crate::conn::mqtt::RETURN_TOPIC; use sphinx_key_signer::{self, InitResponse}; use std::sync::{mpsc}; -use std::thread; -use embedded_svc::httpd::Result; use esp_idf_sys; +use embedded_svc::httpd::Result; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS}; use esp_idf_svc::mqtt::client::*; use esp_idf_sys::EspError; -use std::sync::{Arc, Mutex}; use log::*; -pub fn make_event_thread(mqtt: Arc>>>, rx: mpsc::Receiver>) -> Result<()> { +pub fn make_event_loop(mut mqtt: EspMqttClient>, rx: mpsc::Receiver>) -> Result<()> { - thread::spawn(move||{ - let mut client = mqtt.lock().unwrap(); - info!("About to subscribe to the mpsc channel"); + // initialize the RootHandler + let init_msg_bytes = rx.recv().expect("NO INIT MSG"); + let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); + mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, init_reply).expect("could not publish init response"); - let init_msg_bytes = rx.recv().expect("NO INIT MSG"); - let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); - client.publish( - RETURN_TOPIC, - QoS::AtMostOnce, - false, - init_reply, - ).expect("could not publish init response"); - - while let Ok(msg_bytes) = rx.recv() { - let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes) { - Ok(b) => client.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"), - Err(e) => panic!("HANDLE FAILED {:?}", e), - }; - } - - }); + // signing loop + while let Ok(msg_bytes) = rx.recv() { + let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes) { + Ok(b) => mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"), + Err(e) => panic!("HANDLE FAILED {:?}", e), + }; + } Ok(()) } +pub fn make_test_event_loop(mut mqtt: EspMqttClient>, rx: mpsc::Receiver>) -> Result<()> { -pub fn make_test_event_thread(mqtt: Arc>>>, rx: mpsc::Receiver>) -> Result<()> { - - thread::spawn(move||{ - let mut client = mqtt.lock().unwrap(); - info!("About to subscribe to the mpsc channel"); - - while let Ok(msg_bytes) = rx.recv() { - let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); - log::info!("GOT A PING MESSAGE! returning pong now..."); - client.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"); - } - - }); + info!("About to subscribe to the mpsc channel"); + while let Ok(msg_bytes) = rx.recv() { + let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); + log::info!("GOT A PING MESSAGE! returning pong now..."); + mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"); + } Ok(()) } diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index 258a171..b850335 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -8,7 +8,7 @@ use crate::periph::led::Led; use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use std::thread; -use std::sync::{Arc, Mutex, mpsc}; +use std::sync::{Arc, mpsc}; use std::time::Duration; use anyhow::Result; @@ -34,16 +34,16 @@ fn main() -> Result<()> { // store.remove("config").expect("couldnt remove config"); let wifi = start_wifi_client(default_nvs.clone(), &exist)?; - let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?; - - let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0)); - // if the subscription goes out of scope its dropped - // the sub needs to publish back to mqtt??? let (tx, rx) = mpsc::channel(); - make_test_event_thread(mqtt.clone(), rx)?; - let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, tx)?; + // _conn needs to stay in scope or its dropped + let (mqtt, connection) = conn::mqtt::make_client(&exist.broker)?; + let mqtt_client = conn::mqtt::start_listening(mqtt, connection, tx)?; + + // this blocks forever... the "main thread" + log::info!(">>>>>>>>>>> blocking forever..."); + make_test_event_loop(mqtt_client, rx)?; + let mut blue = Led::new(0x000001, 100); - println!("{:?}", wifi.get_status()); loop { log::info!("Listening...");