mirror of
https://github.com/stakwork/sphinx-key.git
synced 2025-12-19 00:04:25 +01:00
VLS ping pong test running on ESP, no Mutex
This commit is contained in:
@@ -83,3 +83,5 @@ then in the sphinx-key dir, with the CC variable set as above:
|
|||||||
`cargo build`
|
`cargo build`
|
||||||
|
|
||||||
and flash using the instructions further above
|
and flash using the instructions further above
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,13 +1,15 @@
|
|||||||
use crate::{ChannelRequest,ChannelReply};
|
use crate::{ChannelReply, ChannelRequest};
|
||||||
use librumqttd::{async_locallink::construct_broker, Config};
|
use librumqttd::{async_locallink::construct_broker, Config};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use tokio::sync::{oneshot, mpsc};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
const SUB_TOPIC: &str = "sphinx-return";
|
const SUB_TOPIC: &str = "sphinx-return";
|
||||||
const PUB_TOPIC: &str = "sphinx";
|
const PUB_TOPIC: &str = "sphinx";
|
||||||
|
|
||||||
pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver<ChannelRequest>) -> tokio::runtime::Runtime {
|
pub fn start_broker(
|
||||||
|
wait_for_ready_message: bool,
|
||||||
|
mut receiver: mpsc::Receiver<ChannelRequest>,
|
||||||
|
) -> tokio::runtime::Runtime {
|
||||||
let config: Config = confy::load_path("config/rumqttd.conf").unwrap();
|
let config: Config = confy::load_path("config/rumqttd.conf").unwrap();
|
||||||
|
|
||||||
let (mut router, console, servers, builder) = construct_broker(config);
|
let (mut router, console, servers, builder) = construct_broker(config);
|
||||||
@@ -23,7 +25,8 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver<C
|
|||||||
// channel to block until READY received
|
// channel to block until READY received
|
||||||
let (ready_tx, ready_rx) = oneshot::channel();
|
let (ready_tx, ready_rx) = oneshot::channel();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = mpsc::channel(1000);
|
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
|
||||||
|
mpsc::channel(1000);
|
||||||
let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap();
|
let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap();
|
||||||
tx.subscribe([SUB_TOPIC]).await.unwrap();
|
tx.subscribe([SUB_TOPIC]).await.unwrap();
|
||||||
|
|
||||||
@@ -37,6 +40,7 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver<C
|
|||||||
let message = rx.recv().await.unwrap();
|
let message = rx.recv().await.unwrap();
|
||||||
if let Some(payload) = message.payload.get(0) {
|
if let Some(payload) = message.payload.get(0) {
|
||||||
let content = String::from_utf8_lossy(&payload[..]);
|
let content = String::from_utf8_lossy(&payload[..]);
|
||||||
|
log::info!("received message content: {}", content);
|
||||||
if content == "READY" {
|
if content == "READY" {
|
||||||
ready_tx.send(true).expect("could not send ready");
|
ready_tx.send(true).expect("could not send ready");
|
||||||
break;
|
break;
|
||||||
@@ -58,7 +62,9 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver<C
|
|||||||
|
|
||||||
let relay_task = tokio::spawn(async move {
|
let relay_task = tokio::spawn(async move {
|
||||||
while let Some(msg) = receiver.recv().await {
|
while let Some(msg) = receiver.recv().await {
|
||||||
tx.publish(PUB_TOPIC, false, msg.message).await.expect("could not mqtt pub");
|
tx.publish(PUB_TOPIC, false, msg.message)
|
||||||
|
.await
|
||||||
|
.expect("could not mqtt pub");
|
||||||
let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()");
|
let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()");
|
||||||
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
|
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
|
||||||
log::warn!("could not send on reply_tx");
|
log::warn!("could not send on reply_tx");
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use std::thread;
|
|||||||
use esp_idf_sys::{self};
|
use esp_idf_sys::{self};
|
||||||
use esp_idf_sys::EspError;
|
use esp_idf_sys::EspError;
|
||||||
use esp_idf_hal::mutex::Condvar;
|
use esp_idf_hal::mutex::Condvar;
|
||||||
use std::sync::{Arc, Mutex, mpsc};
|
use std::sync::{mpsc};
|
||||||
|
|
||||||
pub const TOPIC: &str = "sphinx";
|
pub const TOPIC: &str = "sphinx";
|
||||||
pub const RETURN_TOPIC: &str = "sphinx-return";
|
pub const RETURN_TOPIC: &str = "sphinx-return";
|
||||||
@@ -42,10 +42,10 @@ pub fn make_client(broker: &str) -> Result<(
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_listening(
|
pub fn start_listening(
|
||||||
mqtt: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>,
|
mut client: EspMqttClient<ConnState<MessageImpl, EspError>>,
|
||||||
mut connection: MqttConnection<Condvar, MessageImpl, EspError>,
|
mut connection: MqttConnection<Condvar, MessageImpl, EspError>,
|
||||||
tx: mpsc::Sender<Vec<u8>>,
|
tx: mpsc::Sender<Vec<u8>>,
|
||||||
) -> Result<()> {
|
) -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> {
|
||||||
|
|
||||||
// must start pumping before subscribe or publish will work
|
// must start pumping before subscribe or publish will work
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
@@ -56,13 +56,7 @@ pub fn start_listening(
|
|||||||
Err(e) => info!("MQTT Message ERROR: {}", e),
|
Err(e) => info!("MQTT Message ERROR: {}", e),
|
||||||
Ok(msg) => {
|
Ok(msg) => {
|
||||||
if let Event::Received(msg) = msg {
|
if let Event::Received(msg) = msg {
|
||||||
info!("MQTT MESSAGE RECEIVED!");
|
tx.send(msg.data().to_vec()).expect("could send to TX");
|
||||||
// if let Ok(m) = Message::new_from_slice(&msg.data()) {
|
|
||||||
// if let Err(e) = eventloop.post(&m, None) {
|
|
||||||
// warn!("failed to post to eventloop {:?}", e);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
tx.send(msg.data().to_vec()).unwrap();
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -70,10 +64,13 @@ pub fn start_listening(
|
|||||||
info!("MQTT connection loop exit");
|
info!("MQTT connection loop exit");
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut client = mqtt.lock().unwrap();
|
// log::info!("lock mqtt mutex guard");
|
||||||
|
// let mut client = mqtt.lock().unwrap();
|
||||||
|
|
||||||
|
log::info!("SUBSCRIBE TO {}", TOPIC);
|
||||||
client.subscribe(TOPIC, QoS::AtMostOnce)?;
|
client.subscribe(TOPIC, QoS::AtMostOnce)?;
|
||||||
|
|
||||||
|
log::info!("PUBLISH {} to {}", "READY", RETURN_TOPIC);
|
||||||
client.publish(
|
client.publish(
|
||||||
RETURN_TOPIC,
|
RETURN_TOPIC,
|
||||||
QoS::AtMostOnce,
|
QoS::AtMostOnce,
|
||||||
@@ -81,5 +78,5 @@ pub fn start_listening(
|
|||||||
format!("READY").as_bytes(),
|
format!("READY").as_bytes(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(client)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,9 @@ pub struct Config {
|
|||||||
arp -a
|
arp -a
|
||||||
|
|
||||||
http://192.168.71.1/?broker=52.91.253.115%3A1883
|
http://192.168.71.1/?broker=52.91.253.115%3A1883
|
||||||
|
|
||||||
|
http://192.168.71.1/?broker=192.168.86.222%3A1883
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
pub fn start_wifi_client(default_nvs: Arc<EspDefaultNvs>, config: &Config) -> Result<Box<EspWifi>> {
|
pub fn start_wifi_client(default_nvs: Arc<EspDefaultNvs>, config: &Config) -> Result<Box<EspWifi>> {
|
||||||
|
|||||||
@@ -1,58 +1,41 @@
|
|||||||
use crate::conn::mqtt::RETURN_TOPIC;
|
use crate::conn::mqtt::RETURN_TOPIC;
|
||||||
use sphinx_key_signer::{self, InitResponse};
|
use sphinx_key_signer::{self, InitResponse};
|
||||||
use std::sync::{mpsc};
|
use std::sync::{mpsc};
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
use embedded_svc::httpd::Result;
|
|
||||||
use esp_idf_sys;
|
use esp_idf_sys;
|
||||||
|
use embedded_svc::httpd::Result;
|
||||||
use embedded_svc::mqtt::client::utils::ConnState;
|
use embedded_svc::mqtt::client::utils::ConnState;
|
||||||
use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS};
|
use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS};
|
||||||
use esp_idf_svc::mqtt::client::*;
|
use esp_idf_svc::mqtt::client::*;
|
||||||
use esp_idf_sys::EspError;
|
use esp_idf_sys::EspError;
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use log::*;
|
use log::*;
|
||||||
|
|
||||||
pub fn make_event_thread(mqtt: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>, rx: mpsc::Receiver<Vec<u8>>) -> Result<()> {
|
pub fn make_event_loop(mut mqtt: EspMqttClient<ConnState<MessageImpl, EspError>>, rx: mpsc::Receiver<Vec<u8>>) -> Result<()> {
|
||||||
|
|
||||||
thread::spawn(move||{
|
// initialize the RootHandler
|
||||||
let mut client = mqtt.lock().unwrap();
|
let init_msg_bytes = rx.recv().expect("NO INIT MSG");
|
||||||
info!("About to subscribe to the mpsc channel");
|
let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer");
|
||||||
|
mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, init_reply).expect("could not publish init response");
|
||||||
|
|
||||||
let init_msg_bytes = rx.recv().expect("NO INIT MSG");
|
// signing loop
|
||||||
let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer");
|
while let Ok(msg_bytes) = rx.recv() {
|
||||||
client.publish(
|
let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes) {
|
||||||
RETURN_TOPIC,
|
Ok(b) => mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"),
|
||||||
QoS::AtMostOnce,
|
Err(e) => panic!("HANDLE FAILED {:?}", e),
|
||||||
false,
|
};
|
||||||
init_reply,
|
}
|
||||||
).expect("could not publish init response");
|
|
||||||
|
|
||||||
while let Ok(msg_bytes) = rx.recv() {
|
|
||||||
let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes) {
|
|
||||||
Ok(b) => client.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"),
|
|
||||||
Err(e) => panic!("HANDLE FAILED {:?}", e),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn make_test_event_loop(mut mqtt: EspMqttClient<ConnState<MessageImpl, EspError>>, rx: mpsc::Receiver<Vec<u8>>) -> Result<()> {
|
||||||
|
|
||||||
pub fn make_test_event_thread(mqtt: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>, rx: mpsc::Receiver<Vec<u8>>) -> Result<()> {
|
info!("About to subscribe to the mpsc channel");
|
||||||
|
while let Ok(msg_bytes) = rx.recv() {
|
||||||
thread::spawn(move||{
|
let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes);
|
||||||
let mut client = mqtt.lock().unwrap();
|
log::info!("GOT A PING MESSAGE! returning pong now...");
|
||||||
info!("About to subscribe to the mpsc channel");
|
mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response");
|
||||||
|
}
|
||||||
while let Ok(msg_bytes) = rx.recv() {
|
|
||||||
let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes);
|
|
||||||
log::info!("GOT A PING MESSAGE! returning pong now...");
|
|
||||||
client.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response");
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use crate::periph::led::Led;
|
|||||||
|
|
||||||
use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported
|
use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::sync::{Arc, Mutex, mpsc};
|
use std::sync::{Arc, mpsc};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
|
||||||
@@ -34,16 +34,16 @@ fn main() -> Result<()> {
|
|||||||
// store.remove("config").expect("couldnt remove config");
|
// store.remove("config").expect("couldnt remove config");
|
||||||
let wifi = start_wifi_client(default_nvs.clone(), &exist)?;
|
let wifi = start_wifi_client(default_nvs.clone(), &exist)?;
|
||||||
|
|
||||||
let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?;
|
|
||||||
|
|
||||||
let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0));
|
|
||||||
// if the subscription goes out of scope its dropped
|
|
||||||
// the sub needs to publish back to mqtt???
|
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
make_test_event_thread(mqtt.clone(), rx)?;
|
// _conn needs to stay in scope or its dropped
|
||||||
let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, tx)?;
|
let (mqtt, connection) = conn::mqtt::make_client(&exist.broker)?;
|
||||||
|
let mqtt_client = conn::mqtt::start_listening(mqtt, connection, tx)?;
|
||||||
|
|
||||||
|
// this blocks forever... the "main thread"
|
||||||
|
log::info!(">>>>>>>>>>> blocking forever...");
|
||||||
|
make_test_event_loop(mqtt_client, rx)?;
|
||||||
|
|
||||||
let mut blue = Led::new(0x000001, 100);
|
let mut blue = Led::new(0x000001, 100);
|
||||||
|
|
||||||
println!("{:?}", wifi.get_status());
|
println!("{:?}", wifi.get_status());
|
||||||
loop {
|
loop {
|
||||||
log::info!("Listening...");
|
log::info!("Listening...");
|
||||||
|
|||||||
Reference in New Issue
Block a user