Merge pull request #20 from stakwork/mpsc-loop

Mpsc loop
This commit is contained in:
Evan Feenstra
2022-06-06 14:26:23 -07:00
committed by GitHub
7 changed files with 127 additions and 142 deletions

View File

@@ -83,3 +83,5 @@ then in the sphinx-key dir, with the CC variable set as above:
`cargo build`
and flash using the instructions further above

View File

@@ -1,13 +1,15 @@
use crate::{ChannelRequest,ChannelReply};
use crate::{ChannelReply, ChannelRequest};
use librumqttd::{async_locallink::construct_broker, Config};
use std::thread;
use tokio::sync::{oneshot, mpsc};
use tokio::sync::{mpsc, oneshot};
const SUB_TOPIC: &str = "sphinx-return";
const PUB_TOPIC: &str = "sphinx";
pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver<ChannelRequest>) -> tokio::runtime::Runtime {
pub fn start_broker(
wait_for_ready_message: bool,
mut receiver: mpsc::Receiver<ChannelRequest>,
) -> tokio::runtime::Runtime {
let config: Config = confy::load_path("config/rumqttd.conf").unwrap();
let (mut router, console, servers, builder) = construct_broker(config);
@@ -23,7 +25,8 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver<C
// channel to block until READY received
let (ready_tx, ready_rx) = oneshot::channel();
tokio::spawn(async move {
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = mpsc::channel(1000);
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000);
let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap();
tx.subscribe([SUB_TOPIC]).await.unwrap();
@@ -37,6 +40,7 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver<C
let message = rx.recv().await.unwrap();
if let Some(payload) = message.payload.get(0) {
let content = String::from_utf8_lossy(&payload[..]);
log::info!("received message content: {}", content);
if content == "READY" {
ready_tx.send(true).expect("could not send ready");
break;
@@ -58,7 +62,9 @@ pub fn start_broker(wait_for_ready_message: bool, mut receiver: mpsc::Receiver<C
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
tx.publish(PUB_TOPIC, false, msg.message).await.expect("could not mqtt pub");
tx.publish(PUB_TOPIC, false, msg.message)
.await
.expect("could not mqtt pub");
let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()");
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
log::warn!("could not send on reply_tx");

View File

@@ -1,23 +1,70 @@
use sphinx_key_parser::MsgDriver;
use lightning_signer::persist::{DummyPersister, Persist};
use lightning_signer::Arc;
use vls_protocol::model::PubKey;
use vls_protocol::msgs;
use sphinx_key_parser::MsgDriver;
use vls_protocol::msgs::{self, read_serial_request_header, write_serial_response_header, Message};
use vls_protocol::serde_bolt::WireString;
use vls_protocol_signer::handler::{Handler, RootHandler};
use vls_protocol_signer::lightning_signer;
use vls_protocol_signer::vls_protocol;
pub fn parse_ping(msg_bytes: Vec<u8>) -> msgs::Ping {
let mut m = MsgDriver::new(msg_bytes);
let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header");
let ping: msgs::Ping =
msgs::read_message(&mut m).expect("failed to read ping message");
ping
pub use vls_protocol_signer::handler::{Handler, RootHandler};
pub struct InitResponse {
pub root_handler: RootHandler,
pub init_reply: Vec<u8>,
}
pub fn say_hi() {
pub fn init(bytes: Vec<u8>) -> anyhow::Result<InitResponse> {
let persister: Arc<dyn Persist> = Arc::new(DummyPersister);
println!("Hello, world!");
let mut md = MsgDriver::new(bytes);
let (sequence, dbid) = read_serial_request_header(&mut md).expect("read init header");
assert_eq!(dbid, 0);
assert_eq!(sequence, 0);
let init: msgs::HsmdInit2 = msgs::read_message(&mut md).expect("failed to read init message");
log::info!("init {:?}", init);
let allowlist = init
.dev_allowlist
.iter()
.map(|s| from_wire_string(s))
.collect::<Vec<_>>();
let seed = init.dev_seed.as_ref().map(|s| s.0).expect("no seed");
let root_handler = RootHandler::new(0, Some(seed), persister, allowlist);
let init_reply = root_handler
.handle(Message::HsmdInit2(init))
.expect("handle init");
let mut reply = MsgDriver::new_empty();
write_serial_response_header(&mut reply, sequence).expect("write init header");
msgs::write_vec(&mut reply, init_reply.as_vec()).expect("write init reply");
Ok(InitResponse {
root_handler,
init_reply: reply.bytes(),
})
}
pub fn handle(_root_handler: &RootHandler, _bytes: Vec<u8>) -> anyhow::Result<Vec<u8>> {
Ok(Vec::new())
}
pub fn parse_ping_and_form_response(msg_bytes: Vec<u8>) -> Vec<u8> {
let mut m = MsgDriver::new(msg_bytes);
let (sequence, _dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header");
let ping: msgs::Ping = msgs::read_message(&mut m).expect("failed to read ping message");
let mut md = MsgDriver::new_empty();
msgs::write_serial_response_header(&mut md, sequence)
.expect("failed to write_serial_request_header");
let pong = msgs::Pong {
id: ping.id,
message: ping.message,
};
msgs::write(&mut md, pong).expect("failed to serial write");
md.bytes()
}
// pub fn say_hi() {
// let persister: Arc<dyn Persist> = Arc::new(DummyPersister);
// println!("Hello, world!");
// }
fn from_wire_string(s: &WireString) -> String {
String::from_utf8(s.0.to_vec()).expect("malformed string")
}

View File

@@ -1,18 +1,14 @@
use crate::core::events::Message;
use embedded_svc::event_bus::Postbox;
use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{Client, Connection, MessageImpl, Publish, QoS, Event, Message as MqttMessage};
use embedded_svc::mqtt::client::utils::Connection as MqttConnection;
use esp_idf_svc::mqtt::client::*;
use anyhow::Result;
use esp_idf_svc::eventloop::EspBackgroundEventLoop;
use log::*;
use std::thread;
use esp_idf_sys::{self};
use esp_idf_sys::EspError;
use esp_idf_hal::mutex::Condvar;
use std::sync::{Arc, Mutex};
use std::sync::{mpsc};
pub const TOPIC: &str = "sphinx";
pub const RETURN_TOPIC: &str = "sphinx-return";
@@ -46,10 +42,10 @@ pub fn make_client(broker: &str) -> Result<(
}
pub fn start_listening(
mqtt: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>,
mut client: EspMqttClient<ConnState<MessageImpl, EspError>>,
mut connection: MqttConnection<Condvar, MessageImpl, EspError>,
mut eventloop: EspBackgroundEventLoop
) -> Result<()> {
tx: mpsc::Sender<Vec<u8>>,
) -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> {
// must start pumping before subscribe or publish will work
thread::spawn(move || {
@@ -60,12 +56,7 @@ pub fn start_listening(
Err(e) => info!("MQTT Message ERROR: {}", e),
Ok(msg) => {
if let Event::Received(msg) = msg {
info!("MQTT MESSAGE RECEIVED!");
if let Ok(m) = Message::new_from_slice(&msg.data()) {
if let Err(e) = eventloop.post(&m, None) {
warn!("failed to post to eventloop {:?}", e);
}
}
tx.send(msg.data().to_vec()).expect("could send to TX");
}
},
}
@@ -73,16 +64,19 @@ pub fn start_listening(
info!("MQTT connection loop exit");
});
let mut client = mqtt.lock().unwrap();
// log::info!("lock mqtt mutex guard");
// let mut client = mqtt.lock().unwrap();
log::info!("SUBSCRIBE TO {}", TOPIC);
client.subscribe(TOPIC, QoS::AtMostOnce)?;
log::info!("PUBLISH {} to {}", "READY", RETURN_TOPIC);
client.publish(
RETURN_TOPIC,
QoS::AtMostOnce,
false,
format!("Hello from {}!", CLIENT_ID).as_bytes(),
format!("READY").as_bytes(),
)?;
Ok(())
Ok(client)
}

View File

@@ -22,6 +22,9 @@ pub struct Config {
arp -a
http://192.168.71.1/?broker=52.91.253.115%3A1883
http://192.168.71.1/?broker=192.168.86.222%3A1883
*/
pub fn start_wifi_client(default_nvs: Arc<EspDefaultNvs>, config: &Config) -> Result<Box<EspWifi>> {

View File

@@ -1,107 +1,42 @@
use crate::conn::mqtt::RETURN_TOPIC;
use crate::periph::led::Led;
use sphinx_key_signer::parse_ping;
use sphinx_key_signer::{self, InitResponse, PubKey};
use std::sync::{mpsc};
use esp_idf_svc::eventloop::*;
use esp_idf_sys;
use embedded_svc::httpd::Result;
use esp_idf_sys::{self, c_types};
use embedded_svc::mqtt::client::utils::ConnState;
use embedded_svc::mqtt::client::{MessageImpl, Publish, QoS};
use esp_idf_svc::mqtt::client::*;
use esp_idf_sys::EspError;
use std::sync::{Arc, Mutex};
use log::*;
use std::cmp::min;
pub const MSG_SIZE: usize = 256;
pub fn make_event_loop(mut mqtt: EspMqttClient<ConnState<MessageImpl, EspError>>, rx: mpsc::Receiver<Vec<u8>>) -> Result<()> {
#[derive(Copy, Clone, Debug)]
pub struct Message([u8; MSG_SIZE]);
// initialize the RootHandler
let init_msg_bytes = rx.recv().expect("NO INIT MSG");
let InitResponse { root_handler, init_reply } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer");
mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, init_reply).expect("could not publish init response");
impl Message {
pub fn _new(bytes: [u8; MSG_SIZE]) -> Self {
Self(bytes)
}
// the first byte is the length of the message
pub fn new_from_slice(src: &[u8]) -> Result<Self> {
if src.len() > MSG_SIZE - 1 {
return Err(anyhow::anyhow!("message too long"));
}
let mut dest = [0; MSG_SIZE];
dest[0] = src.len() as u8; // this would crash if MSG_SIZE>256
for i in 0..min(src.len(), MSG_SIZE) {
dest[i+1] = src[i];
}
Ok(Self(dest))
}
pub fn read_bytes(&self) -> Vec<u8> {
let l = self.0[0] as usize;
self.0[1..l+1].to_vec()
}
pub fn read_string(&self) -> String {
String::from_utf8_lossy(&self.0).to_string()
}
}
impl EspTypedEventSource for Message {
fn source() -> *const c_types::c_char {
b"SPHINX\0".as_ptr() as *const _
}
}
impl EspTypedEventSerializer<Message> for Message {
fn serialize<R>(
event: &Message,
f: impl for<'a> FnOnce(&'a EspEventPostData) -> R,
) -> R {
f(&unsafe { EspEventPostData::new(Self::source(), Self::event_id(), event) })
}
}
impl EspTypedEventDeserializer<Message> for Message {
fn deserialize<R>(
data: &EspEventFetchData,
f: &mut impl for<'a> FnMut(&'a Message) -> R,
) -> R {
f(unsafe { data.as_payload() })
}
}
pub fn make_eventloop(client: Arc<Mutex<EspMqttClient<ConnState<MessageImpl, EspError>>>>) -> Result<(EspBackgroundEventLoop, EspBackgroundSubscription)> {
use embedded_svc::event_bus::EventBus;
info!("About to start a background event loop");
let mut eventloop = EspBackgroundEventLoop::new(
&BackgroundLoopConfiguration {
task_stack_size: 8192,
.. Default::default()
},
)?;
let mut green = Led::new(0x000100, 10);
info!("About to subscribe to the background event loop");
let subscription = eventloop.subscribe(move |message: &Message| {
info!("!!! Got message from the event loop"); //: {:?}", message.0);
green.blink();
let msg_bytes = message.read_bytes();
// let msg_str = String::from_utf8_lossy(&msg[..]);
match client.lock() {
Ok(mut m_) => {
let ping = parse_ping(msg_bytes);
if let Err(err) = m_.publish(
RETURN_TOPIC,
QoS::AtMostOnce,
false,
format!("Got and parsed the ping!!!").as_bytes(),
) {
log::warn!("failed to mqtt publish! {:?}", err);
};
},
Err(_) => log::warn!("failed to lock Mutex<Client>")
// signing loop
let dummy_peer = PubKey([0; 33]);
while let Ok(msg_bytes) = rx.recv() {
let _ret = match sphinx_key_signer::handle(&root_handler, msg_bytes, dummy_peer.clone()) {
Ok(b) => mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response"),
Err(e) => panic!("HANDLE FAILED {:?}", e),
};
})?;
// let subscription = eventloop.subscribe(cb)?;
}
Ok((eventloop, subscription))
Ok(())
}
pub fn make_test_event_loop(mut mqtt: EspMqttClient<ConnState<MessageImpl, EspError>>, rx: mpsc::Receiver<Vec<u8>>) -> Result<()> {
info!("About to subscribe to the mpsc channel");
while let Ok(msg_bytes) = rx.recv() {
let b = sphinx_key_signer::parse_ping_and_form_response(msg_bytes);
log::info!("GOT A PING MESSAGE! returning pong now...");
mqtt.publish(RETURN_TOPIC, QoS::AtMostOnce, false, b).expect("could not publish init response");
}
Ok(())
}

View File

@@ -6,10 +6,9 @@ mod periph;
use crate::core::{events::*, config::*};
use crate::periph::led::Led;
use sphinx_key_signer;
use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported
use std::thread;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, mpsc};
use std::time::Duration;
use anyhow::Result;
@@ -25,8 +24,6 @@ fn main() -> Result<()> {
esp_idf_svc::log::EspLogger::initialize_default();
sphinx_key_signer::say_hi();
thread::sleep(Duration::from_secs(1));
let default_nvs = Arc::new(EspDefaultNvs::new()?);
@@ -37,22 +34,23 @@ fn main() -> Result<()> {
// store.remove("config").expect("couldnt remove config");
let wifi = start_wifi_client(default_nvs.clone(), &exist)?;
let mqtt_and_conn = conn::mqtt::make_client(&exist.broker)?;
let mqtt = Arc::new(Mutex::new(mqtt_and_conn.0));
// if the subscription goes out of scope its dropped
// the sub needs to publish back to mqtt???
let (eventloop, _sub) = make_eventloop(mqtt.clone())?;
let _mqtt_client = conn::mqtt::start_listening(mqtt, mqtt_and_conn.1, eventloop)?;
let (tx, rx) = mpsc::channel();
// _conn needs to stay in scope or its dropped
let (mqtt, connection) = conn::mqtt::make_client(&exist.broker)?;
let mqtt_client = conn::mqtt::start_listening(mqtt, connection, tx)?;
// this blocks forever... the "main thread"
log::info!(">>>>>>>>>>> blocking forever...");
make_test_event_loop(mqtt_client, rx)?;
let mut blue = Led::new(0x000001, 100);
println!("{:?}", wifi.get_status());
loop {
log::info!("Listening...");
blue.blink();
thread::sleep(Duration::from_secs(1));
}
drop(wifi);
// drop(wifi);
} else {
println!("=============> START SERVER NOW AND WAIT <==============");
if let Ok((wifi, config)) = start_config_server_and_wait(default_nvs.clone()) {