diff --git a/README.md b/README.md index 924d1bb..792cdf5 100644 --- a/README.md +++ b/README.md @@ -83,3 +83,5 @@ then in the sphinx-key dir, with the CC variable set as above: `cargo build` and flash using the instructions further above + + diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 3762ada..8ad5fce 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,13 +1,15 @@ -use crate::{ChannelRequest,ChannelReply}; +use crate::{ChannelReply, ChannelRequest}; use librumqttd::{async_locallink::construct_broker, Config}; use std::thread; -use tokio::sync::{oneshot, mpsc}; +use tokio::sync::{mpsc, oneshot}; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; -pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver) -> tokio::runtime::Runtime { - +pub fn start_broker( + wait_for_ready_message: bool, + mut receiver: mpsc::Receiver, +) -> tokio::runtime::Runtime { let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); let (mut router, console, servers, builder) = construct_broker(config); @@ -23,7 +25,8 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver>, mpsc::Receiver>) = mpsc::channel(1000); + let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = + mpsc::channel(1000); let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap(); tx.subscribe([SUB_TOPIC]).await.unwrap(); @@ -37,6 +40,7 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver) -> msgs::Ping { - let mut m = MsgDriver::new(msg_bytes); - let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); - let ping: msgs::Ping = - msgs::read_message(&mut m).expect("failed to read ping message"); - ping +pub use vls_protocol_signer::handler::{Handler, RootHandler}; + +pub struct InitResponse { + pub root_handler: RootHandler, + pub init_reply: Vec, } -pub fn say_hi() { +pub fn init(bytes: Vec) -> anyhow::Result { let persister: Arc = Arc::new(DummyPersister); - - println!("Hello, world!"); + let mut md = MsgDriver::new(bytes); + let (sequence, dbid) = read_serial_request_header(&mut md).expect("read init header"); + assert_eq!(dbid, 0); + assert_eq!(sequence, 0); + let init: msgs::HsmdInit2 = msgs::read_message(&mut md).expect("failed to read init message"); + log::info!("init {:?}", init); + let allowlist = init + .dev_allowlist + .iter() + .map(|s| from_wire_string(s)) + .collect::>(); + let seed = init.dev_seed.as_ref().map(|s| s.0).expect("no seed"); + let root_handler = RootHandler::new(0, Some(seed), persister, allowlist); + let init_reply = root_handler + .handle(Message::HsmdInit2(init)) + .expect("handle init"); + let mut reply = MsgDriver::new_empty(); + write_serial_response_header(&mut reply, sequence).expect("write init header"); + msgs::write_vec(&mut reply, init_reply.as_vec()).expect("write init reply"); + Ok(InitResponse { + root_handler, + init_reply: reply.bytes(), + }) +} + +pub fn handle(_root_handler: &RootHandler, _bytes: Vec) -> anyhow::Result> { + Ok(Vec::new()) +} + +pub fn parse_ping_and_form_response(msg_bytes: Vec) -> Vec { + let mut m = MsgDriver::new(msg_bytes); + let (sequence, _dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); + let ping: msgs::Ping = msgs::read_message(&mut m).expect("failed to read ping message"); + let mut md = MsgDriver::new_empty(); + msgs::write_serial_response_header(&mut md, sequence) + .expect("failed to write_serial_request_header"); + let pong = msgs::Pong { + id: ping.id, + message: ping.message, + }; + msgs::write(&mut md, pong).expect("failed to serial write"); + md.bytes() +} + +// pub fn say_hi() { +// let persister: Arc = Arc::new(DummyPersister); + +// println!("Hello, world!"); +// } + +fn from_wire_string(s: &WireString) -> String { + String::from_utf8(s.0.to_vec()).expect("malformed string") } diff --git a/sphinx-key/src/conn/mqtt.rs b/sphinx-key/src/conn/mqtt.rs index 0bca42d..ecc1405 100644 --- a/sphinx-key/src/conn/mqtt.rs +++ b/sphinx-key/src/conn/mqtt.rs @@ -1,18 +1,14 @@ -use crate::core::events::Message; - -use embedded_svc::event_bus::Postbox; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS, Event, Message as MqttMessage}; use embedded_svc::mqtt::client::utils::Connection as MqttConnection; use esp_idf_svc::mqtt::client::*; use anyhow::Result; -use esp_idf_svc::eventloop::EspBackgroundEventLoop; use log::*; use std::thread; use esp_idf_sys::{self}; use esp_idf_sys::EspError; use esp_idf_hal::mutex::Condvar; -use std::sync::{Arc, Mutex}; +use std::sync::{mpsc}; pub const TOPIC: &str = "sphinx"; pub const RETURN_TOPIC: &str = "sphinx-return"; @@ -46,10 +42,10 @@ pub fn make_client(broker: &str) -> Result<( } pub fn start_listening( - mqtt: Arc>>>, + mut client: EspMqttClient>, mut connection: MqttConnection, - mut eventloop: EspBackgroundEventLoop -) -> Result<()> { + tx: mpsc::Sender>, +) -> Result>> { // must start pumping before subscribe or publish will work thread::spawn(move || { @@ -60,12 +56,7 @@ pub fn start_listening( Err(e) => info!("MQTT Message ERROR: {}", e), Ok(msg) => { if let Event::Received(msg) = msg { - info!("MQTT MESSAGE RECEIVED!"); - if let Ok(m) = Message::new_from_slice(&msg.data()) { - if let Err(e) = eventloop.post(&m, None) { - warn!("failed to post to eventloop {:?}", e); - } - } + tx.send(msg.data().to_vec()).expect("could send to TX"); } }, } @@ -73,16 +64,19 @@ pub fn start_listening( info!("MQTT connection loop exit"); }); - let mut client = mqtt.lock().unwrap(); + // log::info!("lock mqtt mutex guard"); + // let mut client = mqtt.lock().unwrap(); + log::info!("SUBSCRIBE TO {}", TOPIC); client.subscribe(TOPIC, QoS::AtMostOnce)?; + log::info!("PUBLISH {} to {}", "READY", RETURN_TOPIC); client.publish( RETURN_TOPIC, QoS::AtMostOnce, false, - format!("Hello from {}!", CLIENT_ID).as_bytes(), + format!("READY").as_bytes(), )?; - Ok(()) + Ok(client) } diff --git a/sphinx-key/src/core/config.rs b/sphinx-key/src/core/config.rs index 58f7b0f..d554f45 100644 --- a/sphinx-key/src/core/config.rs +++ b/sphinx-key/src/core/config.rs @@ -22,6 +22,9 @@ pub struct Config { arp -a http://192.168.71.1/?broker=52.91.253.115%3A1883 + +http://192.168.71.1/?broker=192.168.86.222%3A1883 + */ pub fn start_wifi_client(default_nvs: Arc, config: &Config) -> Result> { diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index 6e641c4..e08729f 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -1,107 +1,42 @@ use crate::conn::mqtt::RETURN_TOPIC; -use crate::periph::led::Led; -use sphinx_key_signer::parse_ping; +use sphinx_key_signer::{self, InitResponse, PubKey}; +use std::sync::{mpsc}; -use esp_idf_svc::eventloop::*; +use esp_idf_sys; use embedded_svc::httpd::Result; -use esp_idf_sys::{self, c_types}; use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS}; use esp_idf_svc::mqtt::client::*; use esp_idf_sys::EspError; -use std::sync::{Arc, Mutex}; use log::*; -use std::cmp::min; -pub const MSG_SIZE: usize = 256; +pub fn make_event_loop(mut mqtt: EspMqttClient>, rx: mpsc::Receiver>) -> Result<()> { -#[derive(Copy, Clone, Debug)] -pub struct Message([u8; MSG_SIZE]); + // initialize the RootHandler + let init_msg_bytes = rx.recv().expect("NO INIT MSG"); + let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); + mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, init_reply).expect("could not publish init response"); -impl Message { - pub fn _new(bytes: [u8; MSG_SIZE]) -> Self { - Self(bytes) - } - // the first byte is the length of the message - pub fn new_from_slice(src: &[u8]) -> Result { - if src.len() > MSG_SIZE - 1 { - return Err(anyhow::anyhow!("message too long")); - } - let mut dest = [0; MSG_SIZE]; - dest[0] = src.len() as u8; // this would crash if MSG_SIZE>256 - for i in 0..min(src.len(), MSG_SIZE) { - dest[i+1] = src[i]; - } - Ok(Self(dest)) - } - pub fn read_bytes(&self) -> Vec { - let l = self.0[0] as usize; - self.0[1..l+1].to_vec() - } - pub fn read_string(&self) -> String { - String::from_utf8_lossy(&self.0).to_string() - } -} - -impl EspTypedEventSource for Message { - fn source() -> *const c_types::c_char { - b"SPHINX\0".as_ptr() as *const _ - } -} - -impl EspTypedEventSerializer for Message { - fn serialize( - event: &Message, - f: impl for<'a> FnOnce(&'a EspEventPostData) -> R, - ) -> R { - f(&unsafe { EspEventPostData::new(Self::source(), Self::event_id(), event) }) - } -} - -impl EspTypedEventDeserializer for Message { - fn deserialize( - data: &EspEventFetchData, - f: &mut impl for<'a> FnMut(&'a Message) -> R, - ) -> R { - f(unsafe { data.as_payload() }) - } -} - -pub fn make_eventloop(client: Arc>>>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> { - use embedded_svc::event_bus::EventBus; - - info!("About to start a background event loop"); - let mut eventloop = EspBackgroundEventLoop::new( - &BackgroundLoopConfiguration { - task_stack_size: 8192, - .. Default::default() - }, - )?; - let mut green = Led::new(0x000100, 10); - - info!("About to subscribe to the background event loop"); - let subscription = eventloop.subscribe(move |message: &Message| { - info!("!!! Got message from the event loop"); //: {:?}", message.0); - green.blink(); - let msg_bytes = message.read_bytes(); - // let msg_str = String::from_utf8_lossy(&msg[..]); - match client.lock() { - Ok(mut m_) => { - let ping = parse_ping(msg_bytes); - if let Err(err) = m_.publish( - RETURN_TOPIC, - QoS::AtMostOnce, - false, - format!("Got and parsed the ping!!!").as_bytes(), - ) { - log::warn!("failed to mqtt publish! {:?}", err); - }; - }, - Err(_) => log::warn!("failed to lock Mutex") + // signing loop + let dummy_peer = PubKey([0; 33]); + while let Ok(msg_bytes) = rx.recv() { + let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes, dummy_peer.clone()) { + Ok(b) => mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"), + Err(e) => panic!("HANDLE FAILED {:?}", e), }; - - })?; - // let subscription = eventloop.subscribe(cb)?; + } - Ok((eventloop, subscription)) + Ok(()) +} + +pub fn make_test_event_loop(mut mqtt: EspMqttClient>, rx: mpsc::Receiver>) -> Result<()> { + + info!("About to subscribe to the mpsc channel"); + while let Ok(msg_bytes) = rx.recv() { + let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes); + log::info!("GOT A PING MESSAGE! returning pong now..."); + mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"); + } + + Ok(()) } diff --git a/sphinx-key/src/main.rs b/sphinx-key/src/main.rs index ae7140c..b850335 100644 --- a/sphinx-key/src/main.rs +++ b/sphinx-key/src/main.rs @@ -6,10 +6,9 @@ mod periph; use crate::core::{events::*, config::*}; use crate::periph::led::Led; -use sphinx_key_signer; use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use std::thread; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, mpsc}; use std::time::Duration; use anyhow::Result; @@ -25,8 +24,6 @@ fn main() -> Result<()> { esp_idf_svc::log::EspLogger::initialize_default(); - sphinx_key_signer::say_hi(); - thread::sleep(Duration::from_secs(1)); let default_nvs = Arc::new(EspDefaultNvs::new()?); @@ -37,22 +34,23 @@ fn main() -> Result<()> { // store.remove("config").expect("couldnt remove config"); let wifi = start_wifi_client(default_nvs.clone(), &exist)?; - let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?; - - let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0)); - // if the subscription goes out of scope its dropped - // the sub needs to publish back to mqtt??? - let (eventloop, _sub) = make_eventloop(mqtt.clone())?; - let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?; + let (tx, rx) = mpsc::channel(); + // _conn needs to stay in scope or its dropped + let (mqtt, connection) = conn::mqtt::make_client(&exist.broker)?; + let mqtt_client = conn::mqtt::start_listening(mqtt, connection, tx)?; + + // this blocks forever... the "main thread" + log::info!(">>>>>>>>>>> blocking forever..."); + make_test_event_loop(mqtt_client, rx)?; + let mut blue = Led::new(0x000001, 100); - println!("{:?}", wifi.get_status()); loop { log::info!("Listening..."); blue.blink(); thread::sleep(Duration::from_secs(1)); } - drop(wifi); + // drop(wifi); } else { println!("=============> START SERVER NOW AND WAIT <=============="); if let Ok((wifi, config)) = start_config_server_and_wait(default_nvs.clone()) {