Merge pull request #81 from stakwork/feat/lss2-details

Feat/lss2 details
Evan Feenstra
2023-06-06 13:48:16 -07:00
committed by GitHub
21 changed files with 1872 additions and 1163 deletions

broker/Cargo.lock (generated; 1836 lines changed, diff suppressed because it is too large)

@@ -9,11 +9,12 @@ strip = "debuginfo"
[dependencies]
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs" }
# sphinx-key-parser = { path = "../parser" }
vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" }
vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" }
vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" }
vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" }
lss-connector = { git = "https://github.com/stakwork/sphinx-rs" }
# lss-connector = { path = "../../sphinx-rs/lss-connector" }
vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "0c8c4474af62c4a13e2a32d34b569e8092ed414a" }
vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "0c8c4474af62c4a13e2a32d34b569e8092ed414a" }
vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "0c8c4474af62c4a13e2a32d34b569e8092ed414a" }
vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "0c8c4474af62c4a13e2a32d34b569e8092ed414a" }
rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "sphinx-2" }
pretty_env_logger = "0.4.0"
confy = "0.4.0"

broker/lss.md (new file, 68 lines)

@@ -0,0 +1,68 @@
# steps to integrate LSS
### initialization
##### broker
- check that there is an LSS url to use
- LssClient::get_server_pubkey
- send server pubkey to signer
##### signer
- let client_id = keys_manager.get_persistence_pubkey()
- let auth_token = keys_manager.get_persistence_auth_token(&server_pubkey)
- let shared_secret = keys_manager.get_persistence_shared_secret(&server_pubkey)
- create an ExternalPersistHelper locally and init `state`
- helper.new_nonce
- send the client_id, auth_token, and nonce back to the broker
##### broker
- create Auth
- LssClient::new
- get ALL muts from cloud
- let (muts, server_hmac) = client.get("".to_string(), &nonce)
- send the muts and server_hmac to signer
##### signer
- check the server hmac
- insert the muts into local state
- let handler_builder = handler_builder.lss_state(...);
- (what does the above line do if the muts are already inserted???)
- let (handler, muts) = handler_builder.build();
- helper.client_hmac
- send the muts and client_hmac back to broker
##### broker
- store the muts using the LssClient (client.put(muts, &client_hmac))
- send server_hmac back to signer???
- finally, init the Unix FD connection, so the hsmd_init message comes through
##### signer
- need to verify server hmac here???
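
The broker half of this handshake is wired up by `lss_setup` in `broker/src/lss.rs` (added in this PR). A condensed sketch; in `run_main` the broker retries this setup until it succeeds, and error handling is elided here:

```rust
// condensed from lss_setup() in broker/src/lss.rs
// `uri` is the LSS url, `mqtt_tx` is the channel into the MQTT relay task
let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?; // server pubkey for the signer
let reply = ChannelRequest::send(topics::LSS_MSG, msg_bytes, &mqtt_tx).await?; // signer: client_id, auth_token, nonce
let ir = Response::from_slice(&reply)?.as_init()?;
let lss_conn = LssBroker::new(uri, ir.clone(), spk).await?; // create Auth and the LssClient
// this only returns the initial state if the signer requested it
let msg_bytes2 = lss_conn.get_created_state_msg(&ir).await?;
let reply2 = ChannelRequest::send(topics::LSS_MSG, msg_bytes2, &mqtt_tx).await?; // signer checks hmac, builds handler
let cr = Response::from_slice(&reply2)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?; // store the returned muts
```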
### VLS
##### signer
- let (reply, muts) = handler.handle(msg)
- let client_hmac = helper.client_hmac(&muts);
- send muts and hmac to broker
##### broker
- client.put(muts, &client_hmac).await?
- server hmac sent back to signer
##### signer
- verify server hmac
- finally, send the VLS reply back to broker
##### broker
- forward the VLS reply back to CLN
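
On the broker side, this round trip is what `SignerLoop::handle_message` in `broker/src/looper.rs` now does per message. A condensed sketch (the busy lock, sequence bookkeeping, and error handling are elided):

```rust
// condensed from SignerLoop::handle_message in broker/src/looper.rs
let md = parser::raw_request_from_bytes(message, self.chan.sequence, peer_id, dbid)?;
// forward the CLN request to the signer over MQTT
let (res_topic, res) = self.send_request_and_get_reply(topics::VLS, md)?;
let mut the_res = res.clone();
if res_topic == topics::LSS_RES {
    // the signer returned muts: persist them through the LSS client...
    let lss_reply = self.send_lss_and_get_reply(res)?;
    // ...then hand the server_hmac back to the signer and wait for the real VLS reply
    let (_topic, res2) = self.send_request_and_get_reply(topics::LSS_MSG, lss_reply)?;
    the_res = res2;
}
// reply bytes for CLN
let reply = parser::raw_response_from_bytes(the_res, self.chan.sequence)?;
```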


@@ -1,4 +1,4 @@
use crate::{ChannelReply, ChannelRequest};
use crate::conn::{ChannelReply, ChannelRequest};
use async_trait::async_trait;
use rocket::tokio::sync::{mpsc, oneshot};
use sphinx_signer::sphinx_glyph::topics;
@@ -16,10 +16,8 @@ impl SignerPort for MqttSignerPort {
self.get_reply(reply_rx).await
}
fn clone(&self) -> Box<dyn SignerPort> {
Box::new(Self {
sender: self.sender.clone(),
})
fn is_ready(&self) -> bool {
true
}
}

broker/src/conn.rs (new file, 126 lines)

@@ -0,0 +1,126 @@
use rocket::tokio::sync::{mpsc, oneshot};
use serde::{Deserialize, Serialize};
use anyhow::Result;
#[derive(Debug, Serialize, Deserialize)]
pub struct Connections {
pub pubkey: Option<String>,
pub clients: Vec<String>,
}
impl Connections {
pub fn new() -> Self {
Self {
pubkey: None,
clients: Vec::new(),
}
}
pub fn set_pubkey(&mut self, pk: &str) {
self.pubkey = Some(pk.to_string())
}
pub fn add_client(&mut self, cid: &str) {
let cids = cid.to_string();
if !self.clients.contains(&cids) {
// new client is added to beginning of Vec
self.clients.insert(0, cids);
}
}
pub fn remove_client(&mut self, cid: &str) {
let cids = cid.to_string();
if self.clients.contains(&cids) {
self.clients.retain(|x| x != cid)
}
}
pub fn client_action(&mut self, cid: &str, connected: bool) {
if connected {
self.add_client(cid);
} else {
self.remove_client(cid);
}
}
}
pub struct Channel {
pub sequence: u16,
pub sender: mpsc::Sender<ChannelRequest>,
pub pubkey: [u8; 33],
}
/// Responses are received on the oneshot sender
#[derive(Debug)]
pub struct ChannelRequest {
pub topic: String,
pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<ChannelReply>,
pub cid: Option<String>, // if it exists, only try the one client
}
impl ChannelRequest {
pub fn new(topic: &str, message: Vec<u8>) -> (Self, oneshot::Receiver<ChannelReply>) {
let (reply_tx, reply_rx) = oneshot::channel();
let cr = ChannelRequest {
topic: topic.to_string(),
message,
reply_tx,
cid: None,
};
(cr, reply_rx)
}
pub async fn send(topic: &str, message: Vec<u8>, sender: &mpsc::Sender<ChannelRequest>) -> Result<Vec<u8>> {
let (reply_tx, reply_rx) = oneshot::channel();
let req = ChannelRequest {
topic: topic.to_string(),
message,
reply_tx,
cid: None,
};
let _ = sender.send(req).await;
let reply = reply_rx.await?;
Ok(reply.reply)
}
pub async fn send_for(cid: &str, topic: &str, message: Vec<u8>, sender: &mpsc::Sender<ChannelRequest>) -> Result<Vec<u8>> {
let (reply_tx, reply_rx) = oneshot::channel();
let req = ChannelRequest {
topic: topic.to_string(),
message,
reply_tx,
cid: Some(cid.to_string()),
};
let _ = sender.send(req).await;
let reply = reply_rx.await?;
Ok(reply.reply)
}
pub fn for_cid(&mut self, cid: &str) {
self.cid = Some(cid.to_string())
}
pub fn new_for(
cid: &str,
topic: &str,
message: Vec<u8>,
) -> (Self, oneshot::Receiver<ChannelReply>) {
let (mut cr, reply_rx) = ChannelRequest::new(topic, message);
cr.for_cid(cid);
(cr, reply_rx)
}
}
// mpsc reply
#[derive(Debug)]
pub struct ChannelReply {
// the return topic end part (after last "/")
pub topic_end: String,
pub reply: Vec<u8>,
}
/// Responses are received on the oneshot sender
#[derive(Debug)]
pub struct LssReq {
pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<Vec<u8>>,
}
impl LssReq {
pub fn new(message: Vec<u8>) -> (Self, oneshot::Receiver<Vec<u8>>) {
let (reply_tx, reply_rx) = oneshot::channel();
let cr = Self { message, reply_tx };
(cr, reply_rx)
}
}


@@ -1,11 +1,16 @@
use crate::conn::{Channel, ChannelRequest, LssReq};
use crate::util::Settings;
use crate::{Channel, ChannelReply, ChannelRequest};
use bitcoin::blockdata::constants::ChainHash;
use log::*;
use rocket::tokio::sync::{mpsc, oneshot};
use rocket::tokio::sync::mpsc;
use secp256k1::PublicKey;
use sphinx_signer::{parser, sphinx_glyph::topics};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::thread;
use std::time::Duration;
use vls_protocol::{msgs, msgs::Message, Error, Result};
use vls_proxy::client::Client;
@@ -31,25 +36,35 @@ pub struct SignerLoop<C: 'static + Client> {
log_prefix: String,
chan: Channel,
client_id: Option<ClientId>,
lss_tx: mpsc::Sender<LssReq>,
busy: Arc<AtomicBool>,
}
impl<C: 'static + Client> SignerLoop<C> {
/// Create a loop for the root (lightningd) connection, but doesn't start it yet
pub fn new(client: C, sender: mpsc::Sender<ChannelRequest>) -> Self {
pub fn new(
client: C,
lss_tx: mpsc::Sender<LssReq>,
sender: mpsc::Sender<ChannelRequest>,
) -> Self {
let log_prefix = format!("{}/{}", std::process::id(), client.id());
Self {
client,
log_prefix,
chan: Channel::new(sender),
client_id: None,
lss_tx,
busy: Arc::new(AtomicBool::new(false)),
}
}
// Create a loop for a non-root connection
fn new_for_client(
client: C,
lss_tx: mpsc::Sender<LssReq>,
sender: mpsc::Sender<ChannelRequest>,
client_id: ClientId,
busy: Arc<AtomicBool>,
) -> Self {
let log_prefix = format!("{}/{}", std::process::id(), client.id());
Self {
@@ -57,6 +72,8 @@ impl<C: 'static + Client> SignerLoop<C> {
log_prefix,
chan: Channel::new(sender),
client_id: Some(client_id),
lss_tx,
busy,
}
}
@@ -86,8 +103,13 @@ impl<C: 'static + Client> SignerLoop<C> {
peer_id,
dbid: m.dbid,
};
let mut new_loop =
SignerLoop::new_for_client(new_client, self.chan.sender.clone(), client_id);
let mut new_loop = SignerLoop::new_for_client(
new_client,
self.lss_tx.clone(),
self.chan.sender.clone(),
client_id,
self.busy.clone(),
);
thread::spawn(move || new_loop.start(None));
}
Message::Memleak(_) => {
@@ -110,15 +132,25 @@ impl<C: 'static + Client> SignerLoop<C> {
}
}
let reply = self.handle_message(raw_msg, catch_init)?;
// Write the reply to the node
// Write the reply to CLN
self.client.write_vec(reply)?;
// info!("replied {}", self.log_prefix);
}
}
}
}
fn handle_message(&mut self, message: Vec<u8>, catch_init: bool) -> Result<Vec<u8>> {
// wait until not busy
loop {
match self
.busy
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
{
Ok(_) => break,
Err(_) => thread::sleep(Duration::from_millis(5)),
};
}
let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0);
let peer_id = self
.client_id
@@ -126,16 +158,34 @@ impl<C: 'static + Client> SignerLoop<C> {
.map(|c| c.peer_id.serialize())
.unwrap_or([0u8; 33]);
let md = parser::raw_request_from_bytes(message, self.chan.sequence, peer_id, dbid)?;
// send to glyph
let reply_rx = self.send_request(md)?;
let res = self.get_reply(reply_rx)?;
let reply = parser::raw_response_from_bytes(res, self.chan.sequence)?;
// send to signer
log::info!("SEND ON {}", topics::VLS);
let (res_topic, res) = self.send_request_and_get_reply(topics::VLS, md)?;
log::info!("GOT ON {}", res_topic);
let mut the_res = res.clone();
if res_topic == topics::LSS_RES {
// send reply to LSS to store muts
let lss_reply = self.send_lss_and_get_reply(res)?;
log::info!("LSS REPLY LEN {}", &lss_reply.len());
// send to signer for HMAC validation, and get final reply
log::info!("SEND ON {}", topics::LSS_MSG);
let (res_topic2, res2) = self.send_request_and_get_reply(topics::LSS_MSG, lss_reply)?;
log::info!("GOT ON {}, send to CLN", res_topic2);
if res_topic2 != topics::VLS_RETURN {
log::warn!("got a topic NOT on {}", topics::VLS_RETURN);
}
the_res = res2;
}
// create reply bytes for CLN
let reply = parser::raw_response_from_bytes(the_res, self.chan.sequence)?;
// add to the sequence
self.chan.sequence = self.chan.sequence.wrapping_add(1);
// catch the pubkey if it's the first connection
if catch_init {
let _ = self.set_channel_pubkey(reply.clone());
}
// unlock
self.busy.store(false, Ordering::Relaxed);
Ok(reply)
}
@@ -154,21 +204,31 @@ impl<C: 'static + Client> SignerLoop<C> {
Ok(())
}
fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
// returns (topic, payload)
// might halt if signer is offline
fn send_request_and_get_reply(
&mut self,
topic: &str,
message: Vec<u8>,
) -> Result<(String, Vec<u8>)> {
// Send a request to the MQTT handler to send to signer
let (request, reply_rx) = ChannelRequest::new(topics::VLS, message);
let (request, reply_rx) = ChannelRequest::new(topic, message);
// This can fail if MQTT shuts down
self.chan
.sender
.blocking_send(request)
.map_err(|_| Error::Eof)?;
Ok(reply_rx)
let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?;
Ok((reply.topic_end, reply.reply))
}
fn get_reply(&mut self, reply_rx: oneshot::Receiver<ChannelReply>) -> Result<Vec<u8>> {
// Wait for the signer reply
// Can fail if MQTT shuts down
let reply = reply_rx.blocking_recv().map_err(|_| Error::Eof)?;
Ok(reply.reply)
fn send_lss_and_get_reply(&mut self, message: Vec<u8>) -> Result<Vec<u8>> {
// Send a request to the MQTT handler to send to signer
let (request, reply_rx) = LssReq::new(message);
// This can fail if MQTT shuts down
self.lss_tx.blocking_send(request).map_err(|_| Error::Eof)?;
let res = reply_rx.blocking_recv().map_err(|_| Error::Eof)?;
Ok(res)
}
}

broker/src/lss.rs (new file, 72 lines)

@@ -0,0 +1,72 @@
use anyhow::Result;
use rocket::tokio::{
self,
sync::{mpsc},
};
use std::time::Duration;
use crate::conn::{ChannelRequest, LssReq};
use lss_connector::{LssBroker, Response};
use sphinx_signer::sphinx_glyph::topics;
pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender<ChannelRequest>) -> Result<LssBroker> {
// LSS required
let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?;
let reply = ChannelRequest::send(topics::LSS_MSG, msg_bytes, &mqtt_tx).await?;
let ir = Response::from_slice(&reply)?.as_init()?;
let lss_conn = LssBroker::new(uri, ir.clone(), spk).await?;
// this only returns the initial state if it was requested by signer
let msg_bytes2 = lss_conn.get_created_state_msg(&ir).await?;
let reply2 = ChannelRequest::send(topics::LSS_MSG, msg_bytes2, &mqtt_tx).await?;
let cr = Response::from_slice(&reply2)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?;
Ok(lss_conn)
}
pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>, mut reconn_rx: mpsc::Receiver<(String, bool)>, mqtt_tx: mpsc::Sender<ChannelRequest>) {
// msg handler (from CLN looper)
let lss_conn_ = lss_conn.clone();
tokio::task::spawn(async move{
while let Some(req) = lss_rx.recv().await {
match lss_conn_.handle_bytes(&req.message).await {
Ok(msg) => {
let _ = req.reply_tx.send(msg);
},
Err(e) => {
log::error!("failed lss_handle {:?}", e);
}
}
}
});
// reconnect handler (when a client reconnects)
let lss_conn_ = lss_conn.clone();
let mqtt_tx_ = mqtt_tx.clone();
tokio::task::spawn(async move{
while let Some((cid, connected)) = reconn_rx.recv().await {
if connected {
log::info!("CLIENT {} reconnected!", cid);
if let Err(e) = reconnect_dance(&cid, &lss_conn_, &mqtt_tx_).await {
log::error!("reconnect dance failed {:?}", e);
}
}
}
});
}
async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender<ChannelRequest>) -> Result<()> {
// sleep 3 seconds to make sure ESP32 subscription is active
tokio::time::sleep(Duration::from_secs(3)).await;
let init_bytes = lss_conn.make_init_msg().await?;
let reply = ChannelRequest::send_for(cid, topics::LSS_MSG, init_bytes, mqtt_tx).await?;
let ir = Response::from_slice(&reply)?.as_init()?;
let state_bytes = lss_conn.get_created_state_msg(&ir).await?;
let reply2 = ChannelRequest::send_for(cid, topics::LSS_MSG, state_bytes, mqtt_tx).await?;
let cr = Response::from_slice(&reply2)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?;
Ok(())
}


@@ -4,21 +4,23 @@ mod error_log;
mod mqtt;
mod routes;
mod run_test;
mod unix_fd;
mod looper;
mod util;
mod conn;
mod lss;
use crate::conn::{Connections, ChannelRequest, LssReq};
use crate::chain_tracker::MqttSignerPort;
use crate::mqtt::{check_auth, start_broker};
use crate::unix_fd::SignerLoop;
use crate::looper::SignerLoop;
use crate::util::{read_broker_config, Settings};
use clap::{arg, App};
use rocket::tokio::{
self,
sync::{broadcast, mpsc, oneshot},
sync::{broadcast, mpsc},
};
use rumqttd::{oneshot as std_oneshot, AuthMsg};
use serde::{Deserialize, Serialize};
use std::env;
use std::{env, time::Duration};
use std::sync::{Arc, Mutex};
use url::Url;
use vls_frontend::{frontend::SourceFactory, Frontend};
@@ -27,91 +29,6 @@ use vls_proxy::connection::{open_parent_fd, UnixConnection};
use vls_proxy::portfront::SignerPortFront;
use vls_proxy::util::{add_hsmd_args, handle_hsmd_version};
#[derive(Debug, Serialize, Deserialize)]
pub struct Connections {
pub pubkey: Option<String>,
pub clients: Vec<String>,
}
impl Connections {
pub fn new() -> Self {
Self {
pubkey: None,
clients: Vec::new(),
}
}
pub fn set_pubkey(&mut self, pk: &str) {
self.pubkey = Some(pk.to_string())
}
pub fn add_client(&mut self, cid: &str) {
let cids = cid.to_string();
if !self.clients.contains(&cids) {
self.clients.push(cids)
}
}
pub fn remove_client(&mut self, cid: &str) {
let cids = cid.to_string();
if self.clients.contains(&cids) {
self.clients.retain(|x| x != cid)
}
}
pub fn client_action(&mut self, cid: &str, connected: bool) {
if connected {
self.add_client(cid);
} else {
self.remove_client(cid);
}
}
}
pub struct Channel {
pub sequence: u16,
pub sender: mpsc::Sender<ChannelRequest>,
pub pubkey: [u8; 33],
}
/// Responses are received on the oneshot sender
#[derive(Debug)]
pub struct ChannelRequest {
pub topic: String,
pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<ChannelReply>,
pub cid: Option<String>, // if it exists, only try the one client
}
impl ChannelRequest {
pub fn new(topic: &str, message: Vec<u8>) -> (Self, oneshot::Receiver<ChannelReply>) {
let (reply_tx, reply_rx) = oneshot::channel();
let cr = ChannelRequest {
topic: topic.to_string(),
message,
reply_tx,
cid: None,
};
(cr, reply_rx)
}
pub fn for_cid(&mut self, cid: &str) {
self.cid = Some(cid.to_string())
}
pub fn new_for(
cid: &str,
topic: &str,
message: Vec<u8>,
) -> (Self, oneshot::Receiver<ChannelReply>) {
let (mut cr, reply_rx) = ChannelRequest::new(topic, message);
cr.for_cid(cid);
(cr, reply_rx)
}
}
// mpsc reply
#[derive(Debug)]
pub struct ChannelReply {
pub reply: Vec<u8>,
}
// const CLIENT_ID: &str = "sphinx-1";
const BROKER_CONFIG_PATH: &str = "../broker.conf";
#[rocket::launch]
async fn rocket() -> _ {
let parent_fd = open_parent_fd();
@@ -135,9 +52,9 @@ async fn rocket() -> _ {
panic!("end")
} else {
if matches.is_present("test") {
run_test::run_test()
run_test::run_test().await
} else {
run_main(parent_fd)
run_main(parent_fd).await
}
}
}
@@ -149,10 +66,78 @@ fn make_clap_app() -> App<'static> {
add_hsmd_args(app)
}
async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let settings = read_broker_config();
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
let (error_tx, error_rx) = broadcast::channel(10000);
error_log::log_errors(error_rx);
let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool)>(10000);
// waits until first connection
let conns = broker_setup(settings, mqtt_rx, reconn_tx.clone(), error_tx.clone()).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let (lss_tx, lss_rx) = mpsc::channel::<LssReq>(10000);
let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") {
// waits until LSS confirmation from signer
let lss_broker = loop {
match lss::lss_setup(&lss_uri, mqtt_tx.clone()).await{
Ok(l) => {
break l;
},
Err(e) => {
let _ = error_tx.send(e.to_string().as_bytes().to_vec());
log::error!("failed LSS setup, trying again...");
tokio::time::sleep(Duration::from_secs(3)).await;
}
}
};
lss::lss_tasks(lss_broker.clone(), lss_rx, reconn_rx, mqtt_tx.clone());
log::info!("=> lss broker connection created!");
Some(lss_broker)
} else {
log::warn!("running without LSS");
None
};
if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") {
let signer_port = MqttSignerPort::new(mqtt_tx.clone());
let port_front = SignerPortFront::new(Arc::new(signer_port), settings.network);
let source_factory = Arc::new(SourceFactory::new(".", settings.network));
let frontend = Frontend::new(
Arc::new(port_front),
source_factory,
Url::parse(&btc_url).expect("malformed btc rpc url"),
);
tokio::spawn(async move {
frontend.start();
});
} else {
log::warn!("Running without a frontend")
}
// test sleep FIXME
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let cln_client = UnixClient::new(UnixConnection::new(parent_fd));
// TODO pass status_rx into SignerLoop?
let mut signer_loop = SignerLoop::new(cln_client, lss_tx.clone(), mqtt_tx.clone());
// spawn CLN listener
std::thread::spawn(move || {
signer_loop.start(Some(settings));
});
routes::launch_rocket(mqtt_tx, error_tx, settings, conns)
}
// blocks until a connection received
pub fn main_setup(
pub async fn broker_setup(
settings: Settings,
mqtt_rx: mpsc::Receiver<ChannelRequest>,
reconn_tx: mpsc::Sender<(String, bool)>,
error_tx: broadcast::Sender<Vec<u8>>,
) -> Arc<Mutex<Connections>> {
let (auth_tx, auth_rx) = std::sync::mpsc::channel::<AuthMsg>();
@@ -170,7 +155,7 @@ pub fn main_setup(
let _ = am.reply.send(ok);
}
});
// broker
log::info!("=> start broker on network: {}", settings.network);
start_broker(
@@ -186,6 +171,7 @@ pub fn main_setup(
// client connections state
let (startup_tx, startup_rx) = std_oneshot::channel();
let conns_ = conns.clone();
let reconn_tx_ = reconn_tx.clone();
std::thread::spawn(move || {
log::info!("=> wait for connected status");
// wait for connection = true
@@ -196,9 +182,11 @@ pub fn main_setup(
log::info!("=> connected: {}: {}", cid, connected);
let _ = startup_tx.send(true);
while let Ok((cid, connected)) = status_rx.recv() {
log::info!("=> reconnected: {}: {}", cid, connected);
let mut cs = conns_.lock().unwrap();
cs.client_action(&cid, connected);
drop(cs)
drop(cs);
let _ = reconn_tx_.blocking_send((cid, connected));
}
});
let _ = startup_rx.recv();
@@ -206,47 +194,3 @@ pub fn main_setup(
conns
}
fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let settings = read_broker_config(BROKER_CONFIG_PATH);
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
let (error_tx, error_rx) = broadcast::channel(10000);
error_log::log_errors(error_rx);
let conns = main_setup(settings, mqtt_rx, error_tx.clone());
// let mqtt_tx_ = mqtt_tx.clone();
// tokio::spawn(async move {
// while let Some(msg) = unix_rx.recv().await {
// // update LSS here?
// if let Err(e) = mqtt_tx_.send(msg).await {
// log::error!("failed to send on mqtt_tx {:?}", e);
// }
// }
// });
if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") {
let signer_port = Box::new(MqttSignerPort::new(mqtt_tx.clone()));
let port_front = SignerPortFront::new(signer_port, settings.network);
let source_factory = Arc::new(SourceFactory::new(".", settings.network));
let frontend = Frontend::new(
Arc::new(port_front),
source_factory,
Url::parse(&btc_url).expect("malformed btc rpc url"),
);
tokio::spawn(async move {
frontend.start();
});
} else {
log::warn!("Running without a frontend")
}
let cln_client = UnixClient::new(UnixConnection::new(parent_fd));
// TODO pass status_rx into SignerLoop?
let mut signer_loop = SignerLoop::new(cln_client, mqtt_tx.clone());
// spawn CLN listener
std::thread::spawn(move || {
signer_loop.start(Some(settings));
});
routes::launch_rocket(mqtt_tx, error_tx, settings, conns)
}


@@ -1,6 +1,6 @@
use crate::conn::Connections;
use crate::conn::{ChannelReply, ChannelRequest};
use crate::util::Settings;
use crate::Connections;
use crate::{ChannelReply, ChannelRequest};
use rocket::tokio::{sync::broadcast, sync::mpsc};
use rumqttd::{local::LinkTx, Alert, AlertEvent, AuthMsg, Broker, Config, Notification};
use sphinx_signer::sphinx_glyph::sphinx_auther::token::Token;
@@ -75,56 +75,41 @@ pub fn start_broker(
});
// String is the client id
let (msg_tx, msg_rx) = std::sync::mpsc::channel::<(String, Vec<u8>)>();
let (msg_tx, msg_rx) = std::sync::mpsc::channel::<(String, String, Vec<u8>)>();
// receive from CLN, Frontend, or Controller
// receive from CLN, Frontend, Controller, or LSS
let conns_ = connections.clone();
let _relay_task = std::thread::spawn(move || {
while let Some(msg) = receiver.blocking_recv() {
if let Some(cid) = msg.cid {
// for a specific client
let pub_topic = format!("{}/{}", cid, msg.topic);
if let Err(e) = link_tx.publish(pub_topic, msg.message.clone()) {
log::error!("failed to pub to link_tx! {} {:?}", cid, e);
}
let rep = msg_rx.recv();
if let Ok((cid, reply)) = rep {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
log::warn!("could not send on reply_tx {}", cid);
}
}
} else {
// send to each client in turn
'retry_loop: loop {
// get the current list of connected clients
loop {
let reply = if let Some(cid) = msg.cid.clone() {
// for a specific client
pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx)
} else {
// send to each client in turn
let cs = conns_.lock().unwrap();
let client_list = cs.clients.clone();
drop(cs);
// wait a second if there are no clients
if client_list.len() == 0 {
std::thread::sleep(Duration::from_secs(1));
}
for client in client_list.iter() {
let pub_topic = format!("{}/{}", client, msg.topic);
if let Err(e) = link_tx.publish(pub_topic, msg.message.clone()) {
log::error!("failed to pub to link_tx! {:?}", e);
}
// and receive from the correct client (or timeout to next)
let dur = Duration::from_secs(9);
let rep = msg_rx.recv_timeout(dur);
if let Ok((cid, reply)) = rep {
if &cid == client {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
log::warn!("could not send on reply_tx");
}
break 'retry_loop;
} else {
log::warn!("Mismatched client id!");
// wait a second before trying again
std::thread::sleep(Duration::from_secs(1));
None
} else {
let mut rep = None;
for cid in client_list.iter() {
rep = pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx);
if let Some(_) = &rep {
break;
}
}
rep
}
};
if let Some(reply) = reply {
if let Err(_) = msg.reply_tx.send(reply) {
log::warn!("could not send on reply_tx");
}
break;
}
}
}
@@ -144,12 +129,16 @@ pub fn start_broker(
if topic.ends_with(topics::ERROR) {
let _ = error_sender.send(f.publish.payload.to_vec());
} else {
// VLS, CONTROL, LSS
let ts: Vec<&str> = topic.split("/").collect();
if ts.len() != 2 {
continue;
}
let cid = ts[0].to_string();
if let Err(e) = msg_tx.send((cid, f.publish.payload.to_vec())) {
let topic_end = ts[1].to_string();
if let Err(e) =
msg_tx.send((cid, topic_end, f.publish.payload.to_vec()))
{
log::error!("failed to pub to msg_tx! {:?}", e);
}
}
@@ -169,15 +158,45 @@ pub fn start_broker(
Ok(())
}
// publish to signer and wait for response
// if the timeout is exceeded, try the next signer
fn pub_wait(
client_id: &str,
topic: &str,
payload: &[u8],
msg_rx: &std::sync::mpsc::Receiver<(String, String, Vec<u8>)>,
link_tx: &mut LinkTx,
) -> Option<ChannelReply> {
let pub_topic = format!("{}/{}", client_id, topic);
log::info!("SENDING TO {} on topic {}", client_id, topic);
if let Err(e) = link_tx.publish(pub_topic, payload.to_vec()) {
log::error!("failed to pub to link_tx! {:?}", e);
}
// and receive from the correct client (or timeout to next)
let dur = Duration::from_secs(9);
if let Ok((cid, topic_end, reply)) = msg_rx.recv_timeout(dur) {
if &cid == client_id {
return Some(ChannelReply { reply, topic_end });
} else {
log::warn!("Mismatched client id!");
// wait a second before trying again
std::thread::sleep(Duration::from_secs(1));
}
}
None
}
fn subs(cid: &str, mut ltx: LinkTx) {
ltx.subscribe(format!("{}/{}", cid, topics::VLS_RETURN))
.unwrap();
ltx.subscribe(format!("{}/{}", cid, topics::CONTROL_RETURN))
.unwrap();
ltx.subscribe(format!("{}/{}", cid, topics::ERROR)).unwrap();
ltx.subscribe(format!("{}/{}", cid, topics::LSS_RES))
.unwrap();
}
fn unsubs(cid: &str, mut ltx: LinkTx) {
fn unsubs(_cid: &str, mut _ltx: LinkTx) {
// ltx.unsubscribe(format!("{}/{}", cid, topics::VLS_RETURN))
// .unwrap();
// ltx.unsubscribe(format!("{}/{}", cid, topics::CONTROL_RETURN))


@@ -1,6 +1,6 @@
use crate::util::Settings;
use crate::ChannelRequest;
use crate::Connections;
use crate::conn::ChannelRequest;
use crate::conn::Connections;
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
use rocket::response::stream::{Event, EventStream};
@@ -31,7 +31,7 @@ pub async fn control(
cid: &str,
) -> Result<String> {
let message = hex::decode(msg)?;
// FIXME validate?
// FIXME validate? and auth here?
if message.len() < 65 {
return Err(Error::Fail);
}


@@ -1,6 +1,6 @@
use crate::routes::launch_rocket;
use crate::util::Settings;
use crate::ChannelRequest;
use crate::conn::ChannelRequest;
use rocket::tokio::{self, sync::broadcast, sync::mpsc};
use sphinx_signer::{parser, sphinx_glyph::topics};
use vls_protocol::serde_bolt::WireString;
@@ -8,7 +8,7 @@ use vls_protocol::{msgs, msgs::Message};
// const CLIENT_ID: &str = "test-1";
pub fn run_test() -> rocket::Rocket<rocket::Build> {
pub async fn run_test() -> rocket::Rocket<rocket::Build> {
log::info!("TEST...");
// let mut id = 0u16;
@@ -17,10 +17,12 @@ pub fn run_test() -> rocket::Rocket<rocket::Build> {
let settings = Settings::default();
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
let (error_tx, error_rx) = broadcast::channel(10000);
let (conn_tx, _conn_rx) = mpsc::channel(10000);
crate::error_log::log_errors(error_rx);
// block until connection
let conns = crate::main_setup(settings, mqtt_rx, error_tx.clone());
let conns = crate::broker_setup(settings, mqtt_rx, conn_tx.clone(), error_tx.clone()).await;
log::info!("=> off to the races!");
let tx_ = mqtt_tx.clone();


@@ -22,7 +22,10 @@ impl Default for Settings {
}
}
pub fn read_broker_config(config_path: &str) -> Settings {
const BROKER_CONFIG_PATH: &str = "../broker.conf";
pub fn read_broker_config() -> Settings {
let config_path = BROKER_CONFIG_PATH;
let mut settings = Settings::default();
if let Ok(set) = fs::read_to_string(config_path) {
let table = Value::from_str(&set)

sphinx-key/Cargo.lock (generated; 66 lines changed)

@@ -181,8 +181,6 @@ checksum = "0694ea59225b0c5f3cb405ff3f670e4828358ed26aec49dc352f730f0cb1a8a3"
dependencies = [
"bech32",
"bitcoin_hashes",
"core2",
"hashbrown 0.8.2",
"secp256k1",
"serde",
]
@@ -193,7 +191,6 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90064b8dee6815a6470d60bad07bbbaee885c0e12d04177138fa3291a01b7bc4"
dependencies = [
"core2",
"serde",
]
@@ -206,7 +203,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bolt-derive"
version = "0.1.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e13c8cd994b310f598c0b2902741d89ad5472382#e13c8cd994b310f598c0b2902741d89ad5472382"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4#53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4"
dependencies = [
"proc-macro2",
"quote",
@@ -418,15 +415,6 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "core2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "239fa3ae9b63c2dc74bd3fa852d4792b8b305ae64eeede946265b6af62f1fff3"
dependencies = [
"memchr",
]
[[package]]
name = "cpufeatures"
version = "0.2.6"
@@ -1267,17 +1255,18 @@ dependencies = [
[[package]]
name = "lightning"
version = "0.0.114"
source = "git+https://github.com/lightningdevkit/rust-lightning.git?rev=a7600dcd584db0c46fdcd99d71d5b271f3052892#a7600dcd584db0c46fdcd99d71d5b271f3052892"
version = "0.0.115"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e009e1c0c21f66378b491bb40f548682138c63e09db6f3a05af59f8804bb9f4a"
dependencies = [
"bitcoin",
"musig2",
]
[[package]]
name = "lightning-invoice"
version = "0.22.0"
source = "git+https://github.com/lightningdevkit/rust-lightning.git?rev=a7600dcd584db0c46fdcd99d71d5b271f3052892#a7600dcd584db0c46fdcd99d71d5b271f3052892"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4e44b0e2822c8811470137d2339fdfe67a699b3248bb1606d1d02eb6a1e9f0a"
dependencies = [
"bech32",
"bitcoin",
@@ -1324,7 +1313,7 @@ dependencies = [
[[package]]
name = "lss-connector"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git#56d64d2341880063c3668d616548bc8f4abc0b74"
source = "git+https://github.com/stakwork/sphinx-rs.git#24b96a3375989f7f083fabf62b60f1e1e878a390"
dependencies = [
"anyhow",
"log",
@@ -1332,6 +1321,7 @@ dependencies = [
"secp256k1",
"serde",
"serde-big-array",
"sphinx-glyph",
"vls-protocol-signer",
]
@@ -1368,14 +1358,6 @@ dependencies = [
"adler",
]
[[package]]
name = "musig2"
version = "0.1.0"
source = "git+https://github.com/arik-so/rust-musig2?rev=27797d7#27797d78cf64e8974e38d7f31ebb11e455015a9e"
dependencies = [
"bitcoin",
]
[[package]]
name = "nb"
version = "0.1.3"
@@ -1983,7 +1965,7 @@ dependencies = [
[[package]]
name = "sphinx-auther"
version = "0.1.12"
source = "git+https://github.com/stakwork/sphinx-rs.git#56d64d2341880063c3668d616548bc8f4abc0b74"
source = "git+https://github.com/stakwork/sphinx-rs.git#24b96a3375989f7f083fabf62b60f1e1e878a390"
dependencies = [
"anyhow",
"base64",
@@ -1995,7 +1977,7 @@ dependencies = [
[[package]]
name = "sphinx-crypter"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git#56d64d2341880063c3668d616548bc8f4abc0b74"
source = "git+https://github.com/stakwork/sphinx-rs.git#24b96a3375989f7f083fabf62b60f1e1e878a390"
dependencies = [
"anyhow",
"chacha20poly1305",
@@ -2006,7 +1988,7 @@ dependencies = [
[[package]]
name = "sphinx-glyph"
version = "0.1.2"
source = "git+https://github.com/stakwork/sphinx-rs.git#56d64d2341880063c3668d616548bc8f4abc0b74"
source = "git+https://github.com/stakwork/sphinx-rs.git#24b96a3375989f7f083fabf62b60f1e1e878a390"
dependencies = [
"anyhow",
"hex",
@@ -2030,6 +2012,7 @@ dependencies = [
"esp-idf-sys",
"hex",
"log",
"lss-connector",
"rmp-serde",
"serde",
"serde_json",
@@ -2042,7 +2025,7 @@ dependencies = [
[[package]]
name = "sphinx-signer"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git#56d64d2341880063c3668d616548bc8f4abc0b74"
source = "git+https://github.com/stakwork/sphinx-rs.git#24b96a3375989f7f083fabf62b60f1e1e878a390"
dependencies = [
"anyhow",
"bip39",
@@ -2287,12 +2270,11 @@ dependencies = [
[[package]]
name = "txoo"
version = "0.3.0"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04d0beccb482c6106605c4eaf4d4bc4ece62b431f148a3f7c0d53a28c0aed6e7"
checksum = "0d8d7e67ea44d2f4df67df6c91e4c2d4e199b4f4950074ccc5cb141a3be60e01"
dependencies = [
"bitcoin",
"core2",
"log",
"serde",
]
@@ -2402,8 +2384,8 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "vls-core"
version = "0.2.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e13c8cd994b310f598c0b2902741d89ad5472382#e13c8cd994b310f598c0b2902741d89ad5472382"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4#53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4"
dependencies = [
"anyhow",
"bitcoin",
@@ -2424,8 +2406,8 @@ dependencies = [
[[package]]
name = "vls-persist"
version = "0.2.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e13c8cd994b310f598c0b2902741d89ad5472382#e13c8cd994b310f598c0b2902741d89ad5472382"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4#53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4"
dependencies = [
"hex",
"log",
@@ -2437,8 +2419,8 @@ dependencies = [
[[package]]
name = "vls-protocol"
version = "0.2.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e13c8cd994b310f598c0b2902741d89ad5472382#e13c8cd994b310f598c0b2902741d89ad5472382"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4#53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4"
dependencies = [
"as-any",
"bolt-derive",
@@ -2451,8 +2433,8 @@ dependencies = [
[[package]]
name = "vls-protocol-signer"
version = "0.2.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=e13c8cd994b310f598c0b2902741d89ad5472382#e13c8cd994b310f598c0b2902741d89ad5472382"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4#53e6c4beac81f6b450af0bde3d8e1ad6883aa0a4"
dependencies = [
"bit-vec",
"log",


@@ -22,6 +22,7 @@ no_persist = []
[dependencies]
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs.git", optional = true }
sphinx-crypter = { git = "https://github.com/stakwork/sphinx-rs.git" }
lss-connector = { git = "https://github.com/stakwork/sphinx-rs.git", default-features = false }
bitflags = "1.3.2"
esp-idf-sys = { version = "0.32.1", features = ["binstart"] }
embedded-svc = "0.24.0"


@@ -2,7 +2,7 @@ CFLAGS=-fno-pic
CC=$PWD/.embuild/espressif/tools/riscv32-esp-elf/esp-2021r2-patch3-8.4.0/riscv32-esp-elf/bin/riscv32-esp-elf-gcc
cargo +nightly build --release
SSID=sphinx-1 PASS=sphinx-1234 cargo build --release
esptool.py --chip esp32c3 elf2image target/riscv32imc-esp-espidf/release/sphinx-key


@@ -2,6 +2,7 @@ use crate::core::events::Event as CoreEvent;
use sphinx_signer::sphinx_glyph::topics;
use anyhow::Result;
use embedded_svc::mqtt::client::Details;
use embedded_svc::mqtt::client::{Connection, Event, Message as MqttMessage, MessageImpl, QoS};
use embedded_svc::utils::mqtt::client::ConnState;
// use embedded_svc::utils::mqtt::client::Connection as MqttConnection;
@@ -29,7 +30,6 @@ pub fn make_client(
task_stack: 12288,
username: Some(username),
password: Some(password),
// FIXME - mqtts
crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
..Default::default()
};
@@ -51,6 +51,8 @@ pub fn make_client(
thread::spawn(move || {
info!("MQTT Listening for messages");
let mut inflight = Vec::new();
let mut inflight_topic = "".to_string();
loop {
match connection.next() {
Some(msg) => match msg {
@@ -76,19 +78,50 @@ pub fn make_client(
Event::Unsubscribed(_mes_id) => info!("RECEIVED Unsubscribed MESSAGE"),
Event::Published(_mes_id) => info!("RECEIVED Published MESSAGE"),
Event::Received(msg) => {
let topic_opt = msg.topic();
if let Some(topic) = topic_opt {
let incoming_message: Option<(String, Vec<u8>)> = match msg.details() {
Details::Complete => {
if let Some(topic) = msg.topic() {
Some((topic.to_string(), msg.data().to_vec()))
} else {
None
}
}
Details::InitialChunk(chunk_info) => {
if let Some(topic) = msg.topic() {
inflight_topic = topic.to_string();
inflight.extend(msg.data().iter());
None
} else {
None
}
}
Details::SubsequentChunk(chunk_data) => {
inflight.extend(msg.data().iter());
if inflight.len() == chunk_data.total_data_size {
let ret = Some((inflight_topic, inflight));
inflight_topic = "".to_string();
inflight = Vec::new();
ret
} else {
None
}
}
};
if let Some((topic, data)) = incoming_message {
if topic.ends_with(topics::VLS) {
tx.send(CoreEvent::VlsMessage(msg.data().to_vec()))
tx.send(CoreEvent::VlsMessage(data))
.expect("couldnt send Event::VlsMessage");
} else if topic.ends_with(topics::LSS_MSG) {
log::info!("received data len {}", data.len());
tx.send(CoreEvent::LssMessage(data))
.expect("couldnt send Event::LssMessage");
} else if topic.ends_with(topics::CONTROL) {
tx.send(CoreEvent::Control(msg.data().to_vec()))
tx.send(CoreEvent::Control(data))
.expect("couldnt send Event::Control");
} else {
log::warn!("unrecognized topic {}", topic);
}
} else {
log::warn!("empty topic in msg!!!");
}
}
Event::Deleted(_mes_id) => info!("RECEIVED Deleted MESSAGE"),


@@ -1,9 +1,11 @@
use crate::conn::mqtt::QOS;
use crate::core::lss;
use crate::ota::{update_sphinx_key, validate_ota_message};
use lss_connector::secp256k1::PublicKey;
use sphinx_signer::lightning_signer::bitcoin::Network;
use sphinx_signer::lightning_signer::persist::Persist;
use sphinx_signer::persist::FsPersister;
use sphinx_signer::persist::{FsPersister, ThreadMemoPersister};
use sphinx_signer::sphinx_glyph::control::{
Config, ControlMessage, ControlResponse, Controller, Policy,
};
@@ -26,6 +28,7 @@ pub enum Event {
Connected,
Disconnected,
VlsMessage(Vec<u8>),
LssMessage(Vec<u8>),
Control(Vec<u8>),
}
@@ -48,6 +51,32 @@ pub enum Status {
pub const ROOT_STORE: &str = "/sdcard/store";
pub const SUB_TOPICS: &[&str] = &[topics::VLS, topics::LSS_MSG, topics::CONTROL];
fn mqtt_sub(
mqtt: &mut EspMqttClient<ConnState<MessageImpl, EspError>>,
client_id: &str,
topics: &[&str],
) {
for top in topics {
let topic = format!("{}/{}", client_id, top);
log::info!("SUBSCRIBE to {}", topic);
mqtt.subscribe(&topic, QOS)
.expect("could not MQTT subscribe");
}
}
fn mqtt_pub(
mqtt: &mut EspMqttClient<ConnState<MessageImpl, EspError>>,
client_id: &str,
top: &str,
payload: &[u8],
) {
let topic = format!("{}/{}", client_id, top);
mqtt.publish(&topic, QOS, false, payload)
.expect("could not MQTT publish");
}
// the main event loop
#[cfg(not(feature = "pingpong"))]
pub fn make_event_loop(
@@ -61,20 +90,14 @@ pub fn make_event_loop(
policy: &Policy,
mut ctrlr: Controller,
client_id: &str,
node_id: &PublicKey,
) -> Result<()> {
while let Ok(event) = rx.recv() {
log::info!("BROKER IP AND PORT: {}", config.broker);
// wait for a Connection first.
match event {
Event::Connected => {
let vls_topic = format!("{}/{}", client_id, topics::VLS);
log::info!("SUBSCRIBE to {}", vls_topic);
mqtt.subscribe(&vls_topic, QOS)
.expect("could not MQTT subscribe");
let control_topic = format!("{}/{}", client_id, topics::CONTROL);
mqtt.subscribe(&control_topic, QOS)
.expect("could not MQTT subscribe");
led_tx.send(Status::Connected).unwrap();
mqtt_sub(&mut mqtt, client_id, SUB_TOPICS);
break;
}
_ => (),
@@ -83,25 +106,31 @@ pub fn make_event_loop(
// create the fs persister
// 8 character max file names
let persister: Arc<dyn Persist> = Arc::new(FsPersister::new(&ROOT_STORE, Some(8)));
// let persister: Arc<dyn Persist> = Arc::new(FsPersister::new(&ROOT_STORE, Some(8)));
let persister = Arc::new(ThreadMemoPersister {});
// initialize the RootHandler
let handler_builder =
sphinx_signer::root::builder(seed, network, policy, persister).expect("failed to init signer");
let (root_handler, _) = handler_builder.build();
let rhb = sphinx_signer::root::builder(seed, network, policy, persister, node_id)
.expect("failed to init signer");
// FIXME is it right to restart here?
let (root_handler, lss_signer) = match lss::init_lss(client_id, &rx, rhb, &mut mqtt) {
Ok(rl) => rl,
Err(e) => {
log::error!("failed to init lss {:?}", e);
unsafe { esp_idf_sys::esp_restart() };
}
};
// store the previous msgs processed, for LSS last step
let mut msgs: Option<(Vec<u8>, Vec<u8>)> = None;
// signing loop
log::info!("=> starting the main signing loop...");
while let Ok(event) = rx.recv() {
match event {
Event::Connected => {
let vls_topic = format!("{}/{}", client_id, topics::VLS);
mqtt.subscribe(&vls_topic, QOS)
.expect("could not MQTT subscribe");
log::info!("SUBSCRIBE TO {}", vls_topic);
let control_topic = format!("{}/{}", client_id, topics::CONTROL);
mqtt.subscribe(&control_topic, QOS)
.expect("could not MQTT subscribe");
mqtt_sub(&mut mqtt, client_id, SUB_TOPICS);
led_tx.send(Status::Connected).unwrap();
}
Event::Disconnected => {
@@ -110,22 +139,41 @@ pub fn make_event_loop(
}
Event::VlsMessage(ref msg_bytes) => {
led_tx.send(Status::Signing).unwrap();
let _ret =
match sphinx_signer::root::handle(&root_handler, msg_bytes.clone(), do_log) {
Ok(b) => {
let vls_return_topic = format!("{}/{}", client_id, topics::VLS_RETURN);
mqtt.publish(&vls_return_topic, QOS, false, &b)
.expect("could not publish VLS response");
let _ret = match sphinx_signer::root::handle_with_lss(
&root_handler,
&lss_signer,
msg_bytes.clone(),
do_log,
) {
Ok((vls_b, lss_b)) => {
if lss_b.len() == 0 {
// no muts, respond directly back!
mqtt_pub(&mut mqtt, client_id, topics::VLS_RETURN, &vls_b);
} else {
// muts! send LSS first!
msgs = Some((vls_b, lss_b.clone()));
mqtt_pub(&mut mqtt, client_id, topics::LSS_RES, &lss_b);
}
Err(e) => {
let err_msg = GlyphError::new(1, &e.to_string());
log::error!("HANDLE FAILED {:?}", e);
let error_topic = format!("{}/{}", client_id, topics::ERROR);
mqtt.publish(&error_topic, QOS, false, &err_msg.to_vec()[..])
.expect("could not publish VLS error");
// panic!("HANDLE FAILED {:?}", e);
}
};
}
Err(e) => {
let err_msg = GlyphError::new(1, &e.to_string());
log::error!("HANDLE FAILED {:?}", e);
mqtt_pub(&mut mqtt, client_id, topics::ERROR, &err_msg.to_vec()[..]);
}
};
}
Event::LssMessage(ref msg_bytes) => {
match lss::handle_lss_msg(msg_bytes, &msgs, &lss_signer) {
Ok((ret_topic, bytes)) => {
// set msgs back to None
msgs = None;
mqtt_pub(&mut mqtt, client_id, &ret_topic, &bytes);
}
Err(e) => {
let err_msg = GlyphError::new(1, &e.to_string());
mqtt_pub(&mut mqtt, client_id, topics::ERROR, &err_msg.to_vec()[..]);
}
}
}
Event::Control(ref msg_bytes) => {
log::info!("GOT A CONTROL MSG");
@@ -135,9 +183,7 @@ pub fn make_event_loop(
{
let res_data =
rmp_serde::to_vec_named(&res).expect("could not publish control response");
let control_return_topic = format!("{}/{}", client_id, topics::CONTROL_RETURN);
mqtt.publish(&control_return_topic, QOS, false, &res_data)
.expect("could not publish control response");
mqtt_pub(&mut mqtt, client_id, topics::CONTROL_RETURN, &res_data);
}
}
}
@@ -224,16 +270,14 @@ pub fn make_event_loop(
_policy: &Policy,
mut _ctrlr: Controller,
client_id: &str,
_node_id: &PublicKey,
) -> Result<()> {
log::info!("About to subscribe to the mpsc channel");
while let Ok(event) = rx.recv() {
match event {
Event::Connected => {
led_tx.send(Status::ConnectedToMqtt).unwrap();
let vls_topic = format!("{}/{}", client_id, topics::VLS);
log::info!("SUBSCRIBE TO {}", vls_topic);
mqtt.subscribe(&vls_topic, QOS)
.expect("could not MQTT subscribe");
mqtt_sub(&mut mqtt, client_id, &[topics::VLS]);
}
Event::VlsMessage(msg_bytes) => {
led_tx.send(Status::Signing).unwrap();
@@ -241,10 +285,9 @@ pub fn make_event_loop(
if do_log {
log::info!("GOT A PING MESSAGE! returning pong now...");
}
let vls_return_topic = format!("{}/{}", client_id, topics::VLS_RETURN);
mqtt.publish(&vls_return_topic, QOS, false, b)
.expect("could not publish ping response");
mqtt_pub(&mut mqtt, client_id, topics::VLS_RETURN, &b);
}
Event::LssMessage(_) => (),
Event::Disconnected => {
led_tx.send(Status::ConnectingToMqtt).unwrap();
log::info!("GOT A Event::Disconnected msg!");


@@ -0,0 +1,43 @@
use crate::conn::mqtt::QOS;
use crate::core::events::Event;
use anyhow::{anyhow, Result};
use embedded_svc::mqtt::client::MessageImpl;
use embedded_svc::utils::mqtt::client::ConnState;
use esp_idf_svc::mqtt::client::EspMqttClient;
use esp_idf_sys::EspError;
use lss_connector::{secp256k1::PublicKey, LssSigner, Msg as LssMsg};
use sphinx_signer::sphinx_glyph::topics;
use sphinx_signer::{self, RootHandler, RootHandlerBuilder};
use std::sync::mpsc;
pub use lss_connector::handle_lss_msg;
pub fn init_lss(
client_id: &str,
rx: &mpsc::Receiver<Event>,
handler_builder: RootHandlerBuilder,
mqtt: &mut EspMqttClient<ConnState<MessageImpl, EspError>>,
) -> Result<(RootHandler, LssSigner)> {
let first_lss_msg = match rx.recv()? {
Event::LssMessage(b) => b,
_ => return Err(anyhow!("not a lss msg")),
};
let init = LssMsg::from_slice(&first_lss_msg)?.as_init()?;
let server_pubkey = PublicKey::from_slice(&init.server_pubkey)?;
let (lss_signer, res1) = LssSigner::new(&handler_builder, &server_pubkey);
let lss_res_topic = format!("{}/{}", client_id, topics::LSS_RES);
mqtt.publish(&lss_res_topic, QOS, false, &res1)
.expect("could not publish LSS response");
let second_lss_msg = match rx.recv()? {
Event::LssMessage(b) => b,
_ => return Err(anyhow!("not a lss msg")),
};
let created = LssMsg::from_slice(&second_lss_msg)?.as_created()?;
let (root_handler, res2) = lss_signer.build_with_lss(created, handler_builder)?;
mqtt.publish(&lss_res_topic, QOS, false, &res2)
.expect("could not publish LSS response 2");
Ok((root_handler, lss_signer))
}


@@ -1,3 +1,4 @@
pub mod config;
pub mod control;
pub mod events;
pub mod lss;


@@ -147,11 +147,14 @@ fn make_and_launch_client(
// make the controller to validate Control messages
let ctrlr = controller_from_seed(&network, &seed[..], flash);
let pubkey = hex::encode(ctrlr.pubkey().serialize());
let token = ctrlr.make_auth_token().expect("couldnt make auth token");
log::info!("PUBKEY {} TOKEN {}", &pubkey, &token);
let mqtt_client = conn::mqtt::make_client(&config.broker, CLIENT_ID, &pubkey, &token, tx)?;
let pubkey = ctrlr.pubkey();
let pubkey_str = hex::encode(&pubkey.serialize());
let token = ctrlr.make_auth_token().expect("couldnt make auth token");
log::info!("PUBKEY {} TOKEN {}", &pubkey_str, &token);
let client_id = random_word(8);
let mqtt_client = conn::mqtt::make_client(&config.broker, &client_id, &pubkey_str, &token, tx)?;
// let mqtt_client = conn::mqtt::start_listening(mqtt, connection, tx)?;
// this blocks forever... the "main thread"
@@ -170,7 +173,17 @@ fn make_and_launch_client(
seed,
policy,
ctrlr,
CLIENT_ID,
&client_id,
&pubkey,
)?;
Ok(())
}
pub fn random_word(n: usize) -> String {
use sphinx_crypter::secp256k1::rand::{self, distributions::Alphanumeric, Rng};
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(n)
.map(char::from)
.collect()
}


@@ -72,19 +72,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
};
let vls_topic = format!("{}/{}", client_id, topics::VLS);
client
.subscribe(topics::VLS, QoS::AtMostOnce)
.subscribe(vls_topic, QoS::AtMostOnce)
.await
.expect("could not mqtt subscribe");
let ctrl_topic = format!("{}/{}", client_id, topics::CONTROL);
client
.subscribe(topics::CONTROL, QoS::AtMostOnce)
.subscribe(ctrl_topic, QoS::AtMostOnce)
.await
.expect("could not mqtt subscribe");
if is_test {
run_test(eventloop, &client, ctrlr, is_log).await;
run_test(eventloop, &client, ctrlr, is_log, client_id).await;
} else {
run_main(eventloop, &client, ctrlr, is_log, &seed, network).await;
run_main(eventloop, &client, ctrlr, is_log, &seed, network, client_id).await;
}
}
}
@@ -96,14 +98,16 @@ async fn run_main(
is_log: bool,
seed: &[u8],
network: Network,
client_id: &str,
) {
let store_path = env::var("STORE_PATH").unwrap_or(ROOT_STORE.to_string());
let seed32: [u8; 32] = seed.try_into().expect("wrong seed");
let persister: Arc<dyn Persist> = Arc::new(FsPersister::new(&store_path, None));
let policy = types::Policy::default();
let root_handler = sphinx_signer::root::init(seed32, network, &policy, persister)
let handler_builder = sphinx_signer::root::builder(seed32, network, &policy, persister)
.expect("Could not initialize root_handler");
let (root_handler, _muts) = handler_builder.build();
// the actual handler loop
loop {
match eventloop.poll().await {
@@ -111,46 +115,52 @@ async fn run_main(
println!("{:?}", event);
if let Some((topic, msg_bytes)) = incoming_bytes(event) {
println!("MSG BYTES {:}", msg_bytes.len());
match topic.as_str() {
topics::VLS => {
match sphinx_signer::root::handle(&root_handler, msg_bytes, is_log) {
Ok(b) => client
.publish(topics::VLS_RETURN, QoS::AtMostOnce, false, b)
log::info!(">>> {}", topic.as_str());
if topic.as_str().ends_with(topics::VLS) {
match sphinx_signer::root::handle(&root_handler, msg_bytes, is_log) {
Ok(b) => {
let ret_topic = format!("{}/{}", client_id, topics::VLS_RETURN);
client
.publish(ret_topic, QoS::AtMostOnce, false, b)
.await
.expect("could not publish init response"),
Err(e) => client
.expect("could not publish init response");
},
Err(e) => {
let err_topic = format!("{}/{}", client_id, topics::ERROR);
client
.publish(
topics::ERROR,
err_topic,
QoS::AtMostOnce,
false,
e.to_string().as_bytes(),
)
.await
.expect("could not publish error response"),
};
}
topics::CONTROL => {
match ctrlr.handle(&msg_bytes) {
Ok((_msg, res)) => {
let res_data = rmp_serde::to_vec_named(&res)
.expect("could not build control response");
client
.publish(
topics::CONTROL_RETURN,
QoS::AtMostOnce,
false,
res_data,
)
.await
.expect("could not mqtt publish");
}
Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
};
}
_ => log::info!("invalid topic"),
.expect("could not publish error response");
}
};
} else if topic.as_str().ends_with(topics::CONTROL) {
match ctrlr.handle(&msg_bytes) {
Ok((_msg, res)) => {
let res_data = rmp_serde::to_vec_named(&res)
.expect("could not build control response");
let ctrl_ret_topic = format!("{}/{}", client_id, topics::CONTROL_RETURN);
client
.publish(
ctrl_ret_topic,
QoS::AtMostOnce,
false,
res_data,
)
.await
.expect("could not mqtt publish");
}
Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
};
} else {
log::info!("invalid topic");
}
}
}
},
Err(e) => {
log::warn!("diconnected {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -165,6 +175,7 @@ async fn run_test(
client: &AsyncClient,
mut ctrlr: Controller,
is_log: bool,
client_id: &str,
) {
// test handler loop
loop {
@@ -172,44 +183,45 @@ async fn run_test(
Ok(event) => {
println!("{:?}", event);
if let Some((topic, msg_bytes)) = incoming_bytes(event) {
match topic.as_str() {
topics::VLS => {
let (ping, header) =
parser::request_from_bytes::<msgs::Ping>(msg_bytes)
.expect("read ping header");
if is_log {
println!("INCOMING: {:?}", ping);
log::info!(">>> {}", topic.as_str());
if topic.as_str().ends_with(topics::VLS) {
let (ping, header) =
parser::request_from_bytes::<msgs::Ping>(msg_bytes)
.expect("read ping header");
if is_log {
println!("INCOMING: {:?}", ping);
}
let pong = msgs::Pong {
id: ping.id,
message: ping.message,
};
let bytes = parser::raw_response_from_msg(pong, header.sequence)
.expect("couldnt parse raw response");
let ret_topic = format!("{}/{}", client_id, topics::VLS_RETURN);
client
.publish(ret_topic, QoS::AtMostOnce, false, bytes)
.await
.expect("could not mqtt publish");
} else if topic.as_str().ends_with(topics::CONTROL) {
match ctrlr.handle(&msg_bytes) {
Ok((_msg, res)) => {
let res_data = rmp_serde::to_vec_named(&res)
.expect("could not build control response");
let ctrl_ret_topic = format!("{}/{}", client_id, topics::CONTROL_RETURN);
client
.publish(
ctrl_ret_topic,
QoS::AtMostOnce,
false,
res_data,
)
.await
.expect("could not mqtt publish");
}
let pong = msgs::Pong {
id: ping.id,
message: ping.message,
};
let bytes = parser::raw_response_from_msg(pong, header.sequence)
.expect("couldnt parse raw response");
client
.publish(topics::VLS_RETURN, QoS::AtMostOnce, false, bytes)
.await
.expect("could not mqtt publish");
}
topics::CONTROL => {
match ctrlr.handle(&msg_bytes) {
Ok((_msg, res)) => {
let res_data = rmp_serde::to_vec_named(&res)
.expect("could not build control response");
client
.publish(
topics::CONTROL_RETURN,
QoS::AtMostOnce,
false,
res_data,
)
.await
.expect("could not mqtt publish");
}
Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
};
}
_ => log::info!("invalid topic"),
Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
};
} else {
log::info!("invalid topic");
}
}
}