Merge pull request #60 from stakwork/broker-ctrl

refactor broker to use tokio rather than std::thread
This commit is contained in:
Evan Feenstra
2022-09-09 11:48:07 -07:00
committed by GitHub
24 changed files with 489 additions and 223 deletions

View File

@@ -13,7 +13,6 @@ vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-li
rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" } rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" }
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
confy = "0.4.0" confy = "0.4.0"
tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] }
sphinx-key-parser = { path = "../parser" } sphinx-key-parser = { path = "../parser" }
secp256k1 = { version = "0.24.0", features = ["rand-std", "bitcoin_hashes"] } secp256k1 = { version = "0.24.0", features = ["rand-std", "bitcoin_hashes"] }
anyhow = {version = "1", features = ["backtrace"]} anyhow = {version = "1", features = ["backtrace"]}
@@ -28,6 +27,9 @@ bitcoin = "0.29.0"
async-trait = "0.1" async-trait = "0.1"
url = { version = "2.2" } url = { version = "2.2" }
toml = "0.5.9" toml = "0.5.9"
rocket = {version = "0.5.0-rc.2", features = ["json"]}
thiserror = "1.0.31"
hex = "0.4.3"
[features] [features]
default = ["std"] default = ["std"]

3
broker/rocket.toml Normal file
View File

@@ -0,0 +1,3 @@
[default]
address = "0.0.0.0"

View File

@@ -1,6 +1,6 @@
use crate::{ChannelReply, ChannelRequest}; use crate::{mqtt::PUB_TOPIC, ChannelReply, ChannelRequest};
use async_trait::async_trait; use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot}; use rocket::tokio::sync::{mpsc, oneshot};
use vls_protocol::{Error, Result}; use vls_protocol::{Error, Result};
use vls_protocol_client::SignerPort; use vls_protocol_client::SignerPort;
@@ -28,8 +28,7 @@ impl MqttSignerPort {
} }
async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> { async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
let (reply_tx, reply_rx) = oneshot::channel(); let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message);
let request = ChannelRequest { message, reply_tx };
self.sender.send(request).await.map_err(|_| Error::Eof)?; self.sender.send(request).await.map_err(|_| Error::Eof)?;
Ok(reply_rx) Ok(reply_rx)
} }

View File

@@ -1,6 +1,7 @@
#![feature(once_cell)] #![feature(once_cell)]
mod chain_tracker; mod chain_tracker;
mod mqtt; mod mqtt;
mod routes;
mod run_test; mod run_test;
mod unix_fd; mod unix_fd;
mod util; mod util;
@@ -10,9 +11,12 @@ use crate::mqtt::start_broker;
use crate::unix_fd::SignerLoop; use crate::unix_fd::SignerLoop;
use crate::util::read_broker_config; use crate::util::read_broker_config;
use clap::{App, AppSettings, Arg}; use clap::{App, AppSettings, Arg};
use rocket::tokio::{
self,
sync::{mpsc, oneshot},
};
use std::env; use std::env;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use url::Url; use url::Url;
use vls_frontend::Frontend; use vls_frontend::Frontend;
use vls_proxy::client::UnixClient; use vls_proxy::client::UnixClient;
@@ -27,9 +31,21 @@ pub struct Channel {
/// Responses are received on the oneshot sender /// Responses are received on the oneshot sender
#[derive(Debug)] #[derive(Debug)]
pub struct ChannelRequest { pub struct ChannelRequest {
pub topic: String,
pub message: Vec<u8>, pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<ChannelReply>, pub reply_tx: oneshot::Sender<ChannelReply>,
} }
impl ChannelRequest {
pub fn new(topic: &str, message: Vec<u8>) -> (Self, oneshot::Receiver<ChannelReply>) {
let (reply_tx, reply_rx) = oneshot::channel();
let cr = ChannelRequest {
topic: topic.to_string(),
message,
reply_tx,
};
(cr, reply_rx)
}
}
// mpsc reply // mpsc reply
#[derive(Debug)] #[derive(Debug)]
@@ -39,7 +55,8 @@ pub struct ChannelReply {
const BROKER_CONFIG_PATH: &str = "../broker.conf"; const BROKER_CONFIG_PATH: &str = "../broker.conf";
fn main() -> anyhow::Result<()> { #[rocket::launch]
async fn rocket() -> _ {
let parent_fd = open_parent_fd(); let parent_fd = open_parent_fd();
util::setup_logging("hsmd ", "info"); util::setup_logging("hsmd ", "info");
@@ -63,26 +80,28 @@ fn main() -> anyhow::Result<()> {
let version = let version =
env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning"); env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning");
println!("{}", version); println!("{}", version);
return Ok(()); panic!("end")
} } else {
if matches.is_present("test") {
if matches.is_present("test") { run_test::run_test().await
run_test::run_test(); } else {
return Ok(()); run_main(parent_fd).await
}
} }
}
async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let settings = read_broker_config(BROKER_CONFIG_PATH); let settings = read_broker_config(BROKER_CONFIG_PATH);
let (tx, rx) = mpsc::channel(1000); let (tx, rx) = mpsc::channel(1000);
let (status_tx, mut status_rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000);
log::info!("=> start broker on network: {}", settings.network); log::info!("=> start broker on network: {}", settings.network);
let runtime = start_broker(rx, status_tx, "sphinx-1", &settings); start_broker(rx, status_tx, "sphinx-1", &settings).await;
log::info!("=> wait for connected status"); log::info!("=> wait for connected status");
// wait for connection = true // wait for connection = true
let status = status_rx.blocking_recv().expect("couldnt receive"); let status = status_rx.recv().await.expect("couldnt receive");
log::info!("=> connection status: {}", status); log::info!("=> connection status: {}", status);
assert_eq!(status, true, "expected connected = true"); assert_eq!(status, true, "expected connected = true");
// runtime.block_on(async {
if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") {
let signer_port = MqttSignerPort::new(tx.clone()); let signer_port = MqttSignerPort::new(tx.clone());
@@ -93,17 +112,18 @@ fn main() -> anyhow::Result<()> {
}), }),
Url::parse(&btc_url).expect("malformed btc rpc url"), Url::parse(&btc_url).expect("malformed btc rpc url"),
); );
runtime.block_on(async { tokio::spawn(async move {
frontend.start(); frontend.start();
}); });
} }
// listen to reqs from CLN
let conn = UnixConnection::new(parent_fd); let conn = UnixConnection::new(parent_fd);
let client = UnixClient::new(conn); let client = UnixClient::new(conn);
// TODO pass status_rx into SignerLoop // TODO pass status_rx into SignerLoop
let mut signer_loop = SignerLoop::new(client, tx); let mut signer_loop = SignerLoop::new(client, tx.clone());
signer_loop.start(Some(&settings)); // spawn CLN listener on a std thread
// }) std::thread::spawn(move || {
signer_loop.start(Some(&settings));
});
Ok(()) routes::launch_rocket(tx)
} }

View File

@@ -6,15 +6,16 @@ use librumqttd::{
rumqttlog::router::ConnectionMetrics, rumqttlog::router::ConnectionMetrics,
Config, Config,
}; };
use rocket::tokio::time::timeout;
use rocket::tokio::{self, sync::mpsc};
use std::sync::Arc; use std::sync::Arc;
use std::sync::{LazyLock, Mutex}; use std::sync::{LazyLock, Mutex};
use std::thread;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
pub const PUB_TOPIC: &str = "sphinx";
pub const CONTROL_TOPIC: &str = "sphinx-control";
const SUB_TOPIC: &str = "sphinx-return"; const SUB_TOPIC: &str = "sphinx-return";
const PUB_TOPIC: &str = "sphinx"; const CONTROL_SUB_TOPIC: &str = "sphinx-control-return";
const USERNAME: &str = "sphinx-key"; const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass"; const PASSWORD: &str = "sphinx-key-pass";
// must get a reply within this time, or disconnects // must get a reply within this time, or disconnects
@@ -28,100 +29,96 @@ fn get_connected() -> bool {
*CONNECTED.lock().unwrap() *CONNECTED.lock().unwrap()
} }
pub fn start_broker( pub async fn start_broker(
mut receiver: mpsc::Receiver<ChannelRequest>, mut receiver: mpsc::Receiver<ChannelRequest>,
status_sender: mpsc::Sender<bool>, status_sender: mpsc::Sender<bool>,
expected_client_id: &str, expected_client_id: &str,
settings: &Settings, settings: &Settings,
) -> tokio::runtime::Runtime { ) {
let config = config(settings); let config = config(settings);
let client_id = expected_client_id.to_string(); let client_id = expected_client_id.to_string();
let (mut router, servers, builder) = async_locallink::construct(config.clone()); let (mut router, servers, builder) = async_locallink::construct(config.clone());
thread::spawn(move || { // std thread for the router
std::thread::spawn(move || {
log::info!("start mqtt router");
router.start().expect("could not start router"); router.start().expect("could not start router");
}); });
let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); tokio::spawn(async move {
rt_builder.enable_all(); log::info!("start mqtt relayer and localclient");
let rt = rt_builder.build().unwrap(); let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
rt.block_on(async { mpsc::channel(1000);
tokio::spawn(async move { let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap();
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = link_tx
mpsc::channel(1000); .subscribe([SUB_TOPIC, CONTROL_SUB_TOPIC])
let (mut link_tx, mut link_rx) = .await
builder.clone().connect("localclient", 200).await.unwrap(); .unwrap();
link_tx.subscribe([SUB_TOPIC]).await.unwrap();
let router_tx = builder.router_tx(); let router_tx = builder.router_tx();
let status_sender_ = status_sender.clone(); let status_sender_ = status_sender.clone();
tokio::spawn(async move { tokio::spawn(async move {
let config = config.clone().into(); let config = config.clone().into();
let router_tx = router_tx.clone(); let router_tx = router_tx.clone();
let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx)); let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx));
loop { loop {
let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); let metrics = consolelink::request_metrics(console.clone(), client_id.clone());
if let Some(c) = metrics_to_status(metrics, get_connected()) { if let Some(c) = metrics_to_status(metrics, get_connected()) {
set_connected(c); set_connected(c);
log::info!("connection status changed to: {}", c); log::info!("connection status changed to: {}", c);
status_sender_ status_sender_
.send(c) .send(c)
.await
.expect("couldnt send connection status");
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
});
let sub_task = tokio::spawn(async move {
while let Ok(message) = link_rx.recv().await {
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()).await {
log::warn!("pub err {:?}", e);
}
}
}
});
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
link_tx
.publish(&msg.topic, false, msg.message)
.await
.expect("could not mqtt pub");
match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {
Ok(reply) => {
if let Err(_) = msg.reply_tx.send(ChannelReply {
reply: reply.unwrap(),
}) {
log::warn!("could not send on reply_tx");
}
}
Err(e) => {
log::warn!("reply_tx timed out {:?}", e);
set_connected(false);
status_sender
.send(false)
.await .await
.expect("couldnt send connection status"); .expect("couldnt send connection status");
} }
tokio::time::sleep(Duration::from_millis(500)).await;
} }
}); }
let sub_task = tokio::spawn(async move {
while let Ok(message) = link_rx.recv().await {
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()).await {
log::warn!("pub err {:?}", e);
}
}
}
println!("BOOM LINK_TX CLOSED!");
});
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
link_tx
.publish(PUB_TOPIC, false, msg.message)
.await
.expect("could not mqtt pub");
match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {
Ok(reply) => {
if let Err(_) = msg.reply_tx.send(ChannelReply {
reply: reply.unwrap(),
}) {
log::warn!("could not send on reply_tx");
}
}
Err(e) => {
log::warn!("reply_tx timed out {:?}", e);
set_connected(false);
status_sender
.send(false)
.await
.expect("couldnt send connection status");
}
}
}
println!("BOOM RECEIVER CLOSED!");
});
servers.await;
sub_task.await.unwrap();
relay_task.await.unwrap();
}); });
servers.await;
sub_task.await.unwrap();
relay_task.await.unwrap();
}); });
// give one second for router to spawn listeners // give one second for router to spawn listeners
std::thread::sleep(std::time::Duration::from_secs(1)); tokio::time::sleep(std::time::Duration::from_secs(10)).await;
rt
} }
fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option<bool> { fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option<bool> {

75
broker/src/routes.rs Normal file
View File

@@ -0,0 +1,75 @@
use crate::{mqtt::CONTROL_TOPIC, ChannelRequest};
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
use rocket::tokio::sync::mpsc::Sender;
use rocket::*;
use rocket::{Request, Response};
pub type Result<T> = std::result::Result<T, Error>;
#[post("/control?<msg>")]
pub async fn yo(sender: &State<Sender<ChannelRequest>>, msg: &str) -> Result<String> {
let message = hex::decode(msg)?;
// FIXME validate?
if message.len() < 65 {
return Err(Error::Fail);
}
let (request, reply_rx) = ChannelRequest::new(CONTROL_TOPIC, message);
// send to ESP
let _ = sender.send(request).await.map_err(|_| Error::Fail)?;
// wait for reply
let reply = reply_rx.await.map_err(|_| Error::Fail)?;
Ok(hex::encode(reply.reply).to_string())
}
pub fn launch_rocket(tx: Sender<ChannelRequest>) -> Rocket<Build> {
rocket::build()
.mount("/api/", routes![yo])
.attach(CORS)
.manage(tx)
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed")]
Fail,
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("hex error: {0}")]
Hex(#[from] hex::FromHexError),
}
use rocket::http::Status;
use rocket::response::{self, Responder};
impl<'r, 'o: 'r> Responder<'r, 'o> for Error {
fn respond_to(self, req: &'r rocket::Request<'_>) -> response::Result<'o> {
// log `self` to your favored error tracker, e.g.
// sentry::capture_error(&self);
match self {
// in our simplistic example, we're happy to respond with the default 500 responder in all cases
_ => Status::InternalServerError.respond_to(req),
}
}
}
pub struct CORS;
#[rocket::async_trait]
impl Fairing for CORS {
fn info(&self) -> Info {
Info {
name: "Add CORS headers to responses",
kind: Kind::Response,
}
}
async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) {
response.set_header(Header::new("Access-Control-Allow-Origin", "*"));
response.set_header(Header::new(
"Access-Control-Allow-Methods",
"POST, GET, PATCH, OPTIONS",
));
response.set_header(Header::new("Access-Control-Allow-Headers", "*"));
response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
}
}

View File

@@ -1,14 +1,15 @@
use crate::mqtt::start_broker; use crate::mqtt::{start_broker, PUB_TOPIC};
use crate::routes::launch_rocket;
use crate::util::Settings; use crate::util::Settings;
use crate::ChannelRequest; use crate::ChannelRequest;
use rocket::tokio::{self, sync::mpsc};
use sphinx_key_parser as parser; use sphinx_key_parser as parser;
use tokio::sync::{mpsc, oneshot};
use vls_protocol::serde_bolt::WireString; use vls_protocol::serde_bolt::WireString;
use vls_protocol::{msgs, msgs::Message}; use vls_protocol::{msgs, msgs::Message};
const CLIENT_ID: &str = "test-1"; const CLIENT_ID: &str = "test-1";
pub fn run_test() { pub async fn run_test() -> rocket::Rocket<rocket::Build> {
log::info!("TEST..."); log::info!("TEST...");
let mut id = 0u16; let mut id = 0u16;
@@ -18,9 +19,10 @@ pub fn run_test() {
let (tx, rx) = mpsc::channel(1000); let (tx, rx) = mpsc::channel(1000);
let (status_tx, mut status_rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000);
let runtime = start_broker(rx, status_tx, CLIENT_ID, &settings); start_broker(rx, status_tx, CLIENT_ID, &settings).await;
runtime.block_on(async { let mut connected = false;
let mut connected = false; let tx_ = tx.clone();
tokio::spawn(async move {
loop { loop {
tokio::select! { tokio::select! {
status = status_rx.recv() => { status = status_rx.recv() => {
@@ -31,7 +33,7 @@ pub fn run_test() {
log::info!("========> CONNECTED! {}", connection_status); log::info!("========> CONNECTED! {}", connection_status);
} }
} }
res = iteration(id, sequence, tx.clone(), connected) => { res = iteration(id, sequence, tx_.clone(), connected) => {
if let Err(e) = res { if let Err(e) = res {
log::warn!("===> iteration failed {:?}", e); log::warn!("===> iteration failed {:?}", e);
// connected = false; // connected = false;
@@ -46,6 +48,7 @@ pub fn run_test() {
}; };
} }
}); });
launch_rocket(tx)
} }
pub async fn iteration( pub async fn iteration(
@@ -54,6 +57,7 @@ pub async fn iteration(
tx: mpsc::Sender<ChannelRequest>, tx: mpsc::Sender<ChannelRequest>,
connected: bool, connected: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
return Ok(());
if !connected { if !connected {
return Ok(()); return Ok(());
} }
@@ -63,12 +67,8 @@ pub async fn iteration(
message: WireString("ping".as_bytes().to_vec()), message: WireString("ping".as_bytes().to_vec()),
}; };
let ping_bytes = parser::request_from_msg(ping, sequence, 0)?; let ping_bytes = parser::request_from_msg(ping, sequence, 0)?;
let (reply_tx, reply_rx) = oneshot::channel();
// Send a request to the MQTT handler to send to signer // Send a request to the MQTT handler to send to signer
let request = ChannelRequest { let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, ping_bytes);
message: ping_bytes,
reply_tx,
};
tx.send(request).await?; tx.send(request).await?;
println!("tx.send(request)"); println!("tx.send(request)");
let res = reply_rx.await?; let res = reply_rx.await?;

View File

@@ -1,11 +1,12 @@
use crate::mqtt::PUB_TOPIC;
use crate::util::Settings; use crate::util::Settings;
use crate::{Channel, ChannelReply, ChannelRequest}; use crate::{Channel, ChannelReply, ChannelRequest};
use bitcoin::blockdata::constants::ChainHash; use bitcoin::blockdata::constants::ChainHash;
use log::*; use log::*;
use rocket::tokio::sync::{mpsc, oneshot};
use secp256k1::PublicKey; use secp256k1::PublicKey;
use sphinx_key_parser as parser; use sphinx_key_parser as parser;
use std::thread; use std::thread;
use tokio::sync::{mpsc, oneshot};
use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_protocol::{msgs, msgs::Message, Error, Result};
use vls_proxy::client::Client; use vls_proxy::client::Client;
@@ -125,10 +126,8 @@ impl<C: 'static + Client> SignerLoop<C> {
} }
fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> { fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
// Create a one-shot channel to receive the reply
let (reply_tx, reply_rx) = oneshot::channel();
// Send a request to the MQTT handler to send to signer // Send a request to the MQTT handler to send to signer
let request = ChannelRequest { message, reply_tx }; let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message);
// This can fail if MQTT shuts down // This can fail if MQTT shuts down
self.chan self.chan
.sender .sender

View File

@@ -99,7 +99,8 @@ fn read_port_setting(table: &Value) -> Option<u16> {
if temp <= 1023 { if temp <= 1023 {
panic!("The port number is not an integer greater than 1023") panic!("The port number is not an integer greater than 1023")
} }
if temp > u16::MAX.into() { let max: i64 = u16::MAX.into();
if temp > max {
panic!("The port number is way too big!") panic!("The port number is way too big!")
} }
log::info!("Read broker port setting: {}", temp); log::info!("Read broker port setting: {}", temp);

View File

@@ -8,7 +8,7 @@ vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning
serde = { version = "1.0", default-features = false } serde = { version = "1.0", default-features = false }
rmp-serde = "1.1.0" rmp-serde = "1.1.0"
serde_bolt = { version = "0.2", default-features = false } serde_bolt = { version = "0.2", default-features = false }
sphinx-auther = "0.1.9" sphinx-auther = "0.1.10"
anyhow = "1" anyhow = "1"
[features] [features]

View File

@@ -1,10 +1,12 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sphinx_auther::nonce; use sphinx_auther::nonce;
use sphinx_auther::secp256k1::{PublicKey, SecretKey}; use sphinx_auther::secp256k1::{PublicKey, SecretKey};
use std::sync::{Arc, Mutex};
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ControlMessage { pub enum ControlMessage {
Nonce, Nonce,
ResetWifi,
QueryPolicy, QueryPolicy,
UpdatePolicy(Policy), UpdatePolicy(Policy),
Ota(OtaParams), Ota(OtaParams),
@@ -13,6 +15,7 @@ pub enum ControlMessage {
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ControlResponse { pub enum ControlResponse {
Nonce(u64), Nonce(u64),
ResetWifi,
PolicyCurrent(Policy), PolicyCurrent(Policy),
PolicyUpdated(Policy), PolicyUpdated(Policy),
OtaConfirm(OtaParams), OtaConfirm(OtaParams),
@@ -29,17 +32,31 @@ pub struct OtaParams {
pub url: String, pub url: String,
} }
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WifiParams {
pub ssid: String,
pub password: String,
}
// u64 is the nonce. Each signature must have a higher nonce // u64 is the nonce. Each signature must have a higher nonce
pub struct Controller(SecretKey, PublicKey, u64); pub struct Controller(SecretKey, PublicKey, u64, Arc<Mutex<dyn ControlPersist>>);
impl Controller { impl Controller {
pub fn new(sk: SecretKey, pk: PublicKey, nonce: u64) -> Self { pub fn new(sk: SecretKey, pk: PublicKey, nonce: u64) -> Self {
Self(sk, pk, nonce) Self(sk, pk, nonce, Arc::new(Mutex::new(DummyPersister)))
}
pub fn new_with_persister(
sk: SecretKey,
pk: PublicKey,
nonce: u64,
per: Arc<Mutex<dyn ControlPersist>>,
) -> Self {
Self(sk, pk, nonce, per)
} }
pub fn build_msg(&mut self, msg: ControlMessage) -> anyhow::Result<Vec<u8>> { pub fn build_msg(&mut self, msg: ControlMessage) -> anyhow::Result<Vec<u8>> {
let data = rmp_serde::to_vec(&msg)?; let data = rmp_serde::to_vec(&msg)?;
let ret = nonce::build_msg(&data, &self.0, self.2)?;
self.2 = self.2 + 1; self.2 = self.2 + 1;
let ret = nonce::build_msg(&data, &self.0, self.2)?;
Ok(ret) Ok(ret)
} }
pub fn build_response(&self, msg: ControlResponse) -> anyhow::Result<Vec<u8>> { pub fn build_response(&self, msg: ControlResponse) -> anyhow::Result<Vec<u8>> {
@@ -51,7 +68,51 @@ impl Controller {
self.2 = self.2 + 1; self.2 = self.2 + 1;
Ok(ret) Ok(ret)
} }
pub fn parse_msg_no_nonce(&mut self, input: &[u8]) -> anyhow::Result<ControlMessage> {
let (msg, _nonce) = nonce::parse_msg_no_nonce(input, &self.1)?;
let ret = rmp_serde::from_slice(&msg)?;
Ok(ret)
}
pub fn parse_response(&self, input: &[u8]) -> anyhow::Result<ControlResponse> { pub fn parse_response(&self, input: &[u8]) -> anyhow::Result<ControlResponse> {
Ok(rmp_serde::from_slice(input)?) Ok(rmp_serde::from_slice(input)?)
} }
pub fn handle(&mut self, input: &[u8]) -> anyhow::Result<(Vec<u8>, Option<Policy>)> {
let msg = self.parse_msg_no_nonce(input)?;
// increment the nonce EXCEPT for Nonce requests
let mut store = self.3.lock().unwrap();
match msg {
ControlMessage::Nonce => (),
_ => {
self.2 = self.2 + 1;
store.set_nonce(self.2);
}
}
let mut new_policy = None;
let res = match msg {
ControlMessage::Nonce => ControlResponse::Nonce(self.2),
ControlMessage::ResetWifi => {
store.reset();
ControlResponse::ResetWifi
}
ControlMessage::UpdatePolicy(np) => {
new_policy = Some(np.clone());
ControlResponse::PolicyUpdated(np)
}
_ => ControlResponse::Nonce(self.2),
};
let response = self.build_response(res)?;
Ok((response, new_policy))
}
}
pub trait ControlPersist: Sync + Send {
fn reset(&mut self);
fn set_nonce(&mut self, nonce: u64);
}
pub struct DummyPersister;
impl ControlPersist for DummyPersister {
fn reset(&mut self) {}
fn set_nonce(&mut self, _nonce: u64) {}
} }

View File

@@ -10,7 +10,7 @@ use lightning_signer::util::clock::StandardClock;
use lightning_signer::util::velocity::{VelocityControlIntervalType, VelocityControlSpec}; use lightning_signer::util::velocity::{VelocityControlIntervalType, VelocityControlSpec};
use randomstartingtime::RandomStartingTimeFactory; use randomstartingtime::RandomStartingTimeFactory;
use std::sync::Arc; use std::sync::Arc;
use vls_protocol::model::PubKey; use vls_protocol::model::{PubKey, Secret};
use vls_protocol::msgs::{self, read_serial_request_header, write_serial_response_header, Message}; use vls_protocol::msgs::{self, read_serial_request_header, write_serial_response_header, Message};
use vls_protocol::serde_bolt::WireString; use vls_protocol::serde_bolt::WireString;
use vls_protocol_signer::handler::{Handler, RootHandler}; use vls_protocol_signer::handler::{Handler, RootHandler};
@@ -141,6 +141,22 @@ pub fn handle(
Ok(out_md.bytes()) Ok(out_md.bytes())
} }
pub fn make_init_msg(network: Network, seed: [u8; 32]) -> anyhow::Result<Vec<u8>> {
let allowlist = Vec::new();
log::info!("allowlist {:?} seed {:?}", allowlist, seed);
let init = msgs::HsmdInit2 {
derivation_style: 0,
network_name: WireString(network.to_string().as_bytes().to_vec()),
dev_seed: Some(Secret(seed)),
dev_allowlist: allowlist,
};
let sequence = 0;
let mut md = MsgDriver::new_empty();
msgs::write_serial_request_header(&mut md, sequence, 0)?;
msgs::write(&mut md, init)?;
Ok(md.bytes())
}
pub fn parse_ping_and_form_response(msg_bytes: Vec<u8>) -> Vec<u8> { pub fn parse_ping_and_form_response(msg_bytes: Vec<u8>) -> Vec<u8> {
let mut m = MsgDriver::new(msg_bytes); let mut m = MsgDriver::new(msg_bytes);
let (sequence, _dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); let (sequence, _dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header");

4
sphinx-key/Cargo.lock generated
View File

@@ -1971,9 +1971,9 @@ dependencies = [
[[package]] [[package]]
name = "sphinx-auther" name = "sphinx-auther"
version = "0.1.9" version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07ba95c8bd0600a9853ed6320701423362bfeac8d69034ed9585cb289d849701" checksum = "452ac3986f03e8d403a21f81883d0f5058152af4ae006a26ee00e3a31af20302"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@@ -79,7 +79,7 @@ espflash --monitor $FLASHPORT target/riscv32imc-esp-espidf/debug/tiny-esp32
- `git clone https://github.com/stakwork/sphinx-key.git` - `git clone https://github.com/stakwork/sphinx-key.git`
- `cd sphinx-key/sphinx-key` - `cd sphinx-key/sphinx-key`
- `export CFLAGS=-fno-pic` - `export CFLAGS=-fno-pic`
- `export CC=$HOME/tiny-esp32/.embuild/espressif/tools/riscv32-esp-elf/*/riscv32-esp-elf/bin/riscv32-esp-elf-gcc` - `export CC=$HOME/tiny-esp32/.embuild/espressif/tools/riscv32-esp-elf/esp-2021r2-patch3-8.4.0/riscv32-esp-elf/bin/riscv32-esp-elf-gcc`
- `cargo build`. You are now building the sphinx-key signer! - `cargo build`. You are now building the sphinx-key signer!
- `virtualenv venv` - `virtualenv venv`
- `source venv/bin/activate` - `source venv/bin/activate`
@@ -119,7 +119,7 @@ Password: password of the wifi from the previous step
- Run `export FLASHPORT=[full file path noted in the previous step]`. In my case: `export FLASHPORT=/dev/tty.usbserial-1420` - Run `export FLASHPORT=[full file path noted in the previous step]`. In my case: `export FLASHPORT=/dev/tty.usbserial-1420`
- `cd ~/sphinx-key/sphinx-key` - `cd ~/sphinx-key/sphinx-key`
- `export CFLAGS=-fno-pic` - `export CFLAGS=-fno-pic`
- `export CC=$HOME/tiny-esp32/.embuild/espressif/tools/riscv32-esp-elf/*/riscv32-esp-elf/bin/riscv32-esp-elf-gcc` - `export CC=$HOME/tiny-esp32/.embuild/espressif/tools/riscv32-esp-elf/esp-2021r2-patch3-8.4.0/riscv32-esp-elf/bin/riscv32-esp-elf-gcc`
- `cargo build`. You are now building the sphinx-key signer! - `cargo build`. You are now building the sphinx-key signer!
- `source venv/bin/activate` - `source venv/bin/activate`
- `esptool.py --chip esp32-c3 elf2image target/riscv32imc-esp-espidf/debug/sphinx-key` - `esptool.py --chip esp32-c3 elf2image target/riscv32imc-esp-espidf/debug/sphinx-key`
@@ -140,7 +140,7 @@ espmonitor $FLASHPORT
- Run `export FLASHPORT=[full file path noted in the previous step]`. In my case: `export FLASHPORT=/dev/tty.usbserial-1420` - Run `export FLASHPORT=[full file path noted in the previous step]`. In my case: `export FLASHPORT=/dev/tty.usbserial-1420`
- `cd ~/sphinx-key/sphinx-key` - `cd ~/sphinx-key/sphinx-key`
- `export CFLAGS=-fno-pic` - `export CFLAGS=-fno-pic`
- `export CC=$HOME/tiny-esp32/.embuild/espressif/tools/riscv32-esp-elf/*/riscv32-esp-elf/bin/riscv32-esp-elf-gcc` - `export CC=$HOME/tiny-esp32/.embuild/espressif/tools/riscv32-esp-elf/esp-2021r2-patch3-8.4.0/riscv32-esp-elf/bin/riscv32-esp-elf-gcc`
- `cargo build`. - `cargo build`.
- `espflash --monitor $FLASHPORT target/riscv32imc-esp-espidf/debug/clear`. - `espflash --monitor $FLASHPORT target/riscv32imc-esp-espidf/debug/clear`.
- In the logs, wait until you see the message `NVS cleared!`. Congratulations, you have now cleared all the persistent data on the ESP32. - In the logs, wait until you see the message `NVS cleared!`. Congratulations, you have now cleared all the persistent data on the ESP32.

View File

@@ -15,6 +15,7 @@ use std::thread;
pub const VLS_TOPIC: &str = "sphinx"; pub const VLS_TOPIC: &str = "sphinx";
pub const CONTROL_TOPIC: &str = "sphinx-control"; pub const CONTROL_TOPIC: &str = "sphinx-control";
pub const RETURN_TOPIC: &str = "sphinx-return"; pub const RETURN_TOPIC: &str = "sphinx-return";
pub const CONTROL_RETURN_TOPIC: &str = "sphinx-control-return";
pub const USERNAME: &str = "sphinx-key"; pub const USERNAME: &str = "sphinx-key";
pub const PASSWORD: &str = "sphinx-key-pass"; pub const PASSWORD: &str = "sphinx-key-pass";
pub const QOS: QoS = QoS::AtMostOnce; pub const QOS: QoS = QoS::AtMostOnce;

View File

@@ -0,0 +1,26 @@
use embedded_svc::storage::StorageBase;
use esp_idf_svc::nvs_storage::EspNvsStorage;
use sphinx_key_signer::control::{ControlPersist, Controller};
use sphinx_key_signer::lightning_signer::bitcoin::Network;
use std::sync::{Arc, Mutex};
// the controller validates Control messages
pub fn controller_from_seed(
network: &Network,
seed: &[u8],
flash: Arc<Mutex<FlashPersister>>,
) -> Controller {
let (pk, sk) = sphinx_key_signer::derive_node_keys(network, seed);
Controller::new_with_persister(sk, pk, 0, flash)
}
pub struct FlashPersister(pub EspNvsStorage);
impl ControlPersist for FlashPersister {
fn reset(&mut self) {
self.0.remove("config").expect("couldnt remove config 1");
}
fn set_nonce(&mut self, nonce: u64) {
// self.0.remove("config").expect("couldnt remove config 1");
}
}

View File

@@ -1,12 +1,12 @@
use crate::conn::mqtt::{CONTROL_TOPIC, QOS, RETURN_TOPIC, VLS_TOPIC}; use crate::conn::mqtt::{CONTROL_RETURN_TOPIC, CONTROL_TOPIC, QOS, RETURN_TOPIC, VLS_TOPIC};
use crate::core::config::Config; use crate::core::config::Config;
use crate::core::init::make_init_msg; use crate::core::control::{controller_from_seed, FlashPersister};
use sphinx_key_signer::control::Controller;
use sphinx_key_signer::lightning_signer::bitcoin::Network; use sphinx_key_signer::lightning_signer::bitcoin::Network;
use sphinx_key_signer::make_init_msg;
use sphinx_key_signer::vls_protocol::model::PubKey; use sphinx_key_signer::vls_protocol::model::PubKey;
use sphinx_key_signer::{self, InitResponse}; use sphinx_key_signer::{self, InitResponse};
use std::sync::mpsc; use std::sync::{mpsc, Arc, Mutex};
use embedded_svc::httpd::Result; use embedded_svc::httpd::Result;
use embedded_svc::mqtt::client::utils::ConnState; use embedded_svc::mqtt::client::utils::ConnState;
@@ -36,12 +36,6 @@ pub enum Status {
Signing, Signing,
} }
// the controller validates Control messages
pub fn controller_from_seed(network: &Network, seed: &[u8]) -> Controller {
let (pk, sk) = sphinx_key_signer::derive_node_keys(network, seed);
Controller::new(sk, pk, 0)
}
// the main event loop // the main event loop
#[cfg(not(feature = "pingpong"))] #[cfg(not(feature = "pingpong"))]
pub fn make_event_loop( pub fn make_event_loop(
@@ -51,6 +45,7 @@ pub fn make_event_loop(
do_log: bool, do_log: bool,
led_tx: mpsc::Sender<Status>, led_tx: mpsc::Sender<Status>,
config: Config, config: Config,
flash: Arc<Mutex<FlashPersister>>,
) -> Result<()> { ) -> Result<()> {
while let Ok(event) = rx.recv() { while let Ok(event) = rx.recv() {
log::info!("BROKER IP AND PORT: {}", config.broker); log::info!("BROKER IP AND PORT: {}", config.broker);
@@ -77,7 +72,7 @@ pub fn make_event_loop(
} = sphinx_key_signer::init(init_msg, network).expect("failed to init signer"); } = sphinx_key_signer::init(init_msg, network).expect("failed to init signer");
// make the controller to validate Control messages // make the controller to validate Control messages
let mut ctrlr = controller_from_seed(&network, &config.seed[..]); let mut ctrlr = controller_from_seed(&network, &config.seed[..], flash);
// signing loop // signing loop
let dummy_peer = PubKey([0; 33]); let dummy_peer = PubKey([0; 33]);
@@ -105,7 +100,7 @@ pub fn make_event_loop(
) { ) {
Ok(b) => { Ok(b) => {
mqtt.publish(RETURN_TOPIC, QOS, false, &b) mqtt.publish(RETURN_TOPIC, QOS, false, &b)
.expect("could not publish init response"); .expect("could not publish VLS response");
} }
Err(e) => { Err(e) => {
log::error!("HANDLE FAILED {:?}", e); log::error!("HANDLE FAILED {:?}", e);
@@ -114,10 +109,12 @@ pub fn make_event_loop(
}; };
} }
Event::Control(ref msg_bytes) => { Event::Control(ref msg_bytes) => {
match ctrlr.parse_msg(msg_bytes) { log::info!("GOT A CONTROL MSG");
Ok(msg) => { match ctrlr.handle(msg_bytes) {
log::info!("CONTROL MSG {:?}", msg); Ok((response, _new_policy)) => {
// create a response and mqtt pub here // log::info!("CONTROL MSG {:?}", response);
mqtt.publish(CONTROL_RETURN_TOPIC, QOS, false, &response)
.expect("could not publish control response");
} }
Err(e) => log::warn!("error parsing ctrl msg {:?}", e), Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
}; };

View File

@@ -1,20 +0,0 @@
use sphinx_key_signer::lightning_signer::bitcoin::Network;
use sphinx_key_signer::vls_protocol::model::Secret;
use sphinx_key_signer::vls_protocol::{msgs, serde_bolt::WireString};
use sphinx_key_signer::MsgDriver;
pub fn make_init_msg(network: Network, seed: [u8; 32]) -> anyhow::Result<Vec<u8>> {
let allowlist = Vec::new();
log::info!("allowlist {:?} seed {:?}", allowlist, seed);
let init = msgs::HsmdInit2 {
derivation_style: 0,
network_name: WireString(network.to_string().as_bytes().to_vec()),
dev_seed: Some(Secret(seed)),
dev_allowlist: allowlist,
};
let sequence = 0;
let mut md = MsgDriver::new_empty();
msgs::write_serial_request_header(&mut md, sequence, 0)?;
msgs::write(&mut md, init)?;
Ok(md.bytes())
}

View File

@@ -1,3 +1,3 @@
pub mod config; pub mod config;
pub mod control;
pub mod events; pub mod events;
pub mod init;

View File

@@ -3,6 +3,7 @@ mod conn;
mod core; mod core;
mod periph; mod periph;
use crate::core::control::FlashPersister;
use crate::core::{config::*, events::*}; use crate::core::{config::*, events::*};
use crate::periph::led::led_control_loop; use crate::periph::led::led_control_loop;
#[allow(unused_imports)] #[allow(unused_imports)]
@@ -10,7 +11,7 @@ use crate::periph::sd::{mount_sd_card, simple_fs_test};
use anyhow::Result; use anyhow::Result;
use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported use esp_idf_sys as _; // If using the `binstart` feature of `esp-idf-sys`, always keep this module imported
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc, Mutex};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use std::time::SystemTime; use std::time::SystemTime;
@@ -88,8 +89,9 @@ fn main() -> Result<()> {
led_tx.send(Status::ConnectingToMqtt).unwrap(); led_tx.send(Status::ConnectingToMqtt).unwrap();
// _conn needs to stay in scope or its dropped // _conn needs to stay in scope or its dropped
let flash = Arc::new(Mutex::new(FlashPersister(store)));
loop { loop {
if let Ok(()) = make_and_launch_client(exist.clone(), led_tx.clone()) { if let Ok(()) = make_and_launch_client(exist.clone(), led_tx.clone(), flash.clone()) {
println!("Exited out of the event loop, trying again in 5 seconds..."); println!("Exited out of the event loop, trying again in 5 seconds...");
thread::sleep(Duration::from_secs(5)); thread::sleep(Duration::from_secs(5));
} else { } else {
@@ -113,7 +115,11 @@ fn main() -> Result<()> {
Ok(()) Ok(())
} }
fn make_and_launch_client(config: Config, led_tx: mpsc::Sender<Status>) -> anyhow::Result<()> { fn make_and_launch_client(
config: Config,
led_tx: mpsc::Sender<Status>,
flash: Arc<Mutex<FlashPersister>>,
) -> anyhow::Result<()> {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let (mqtt, connection) = conn::mqtt::make_client(&config.broker, CLIENT_ID)?; let (mqtt, connection) = conn::mqtt::make_client(&config.broker, CLIENT_ID)?;
let mqtt_client = conn::mqtt::start_listening(mqtt, connection, tx)?; let mqtt_client = conn::mqtt::start_listening(mqtt, connection, tx)?;
@@ -131,6 +137,6 @@ fn make_and_launch_client(config: Config, led_tx: mpsc::Sender<Status>) -> anyho
log::info!("Network set to {:?}", network); log::info!("Network set to {:?}", network);
log::info!(">>>>>>>>>>> blocking forever..."); log::info!(">>>>>>>>>>> blocking forever...");
log::info!("{:?}", config); log::info!("{:?}", config);
make_event_loop(mqtt_client, rx, network, do_log, led_tx, config)?; make_event_loop(mqtt_client, rx, network, do_log, led_tx, config, flash)?;
Ok(()) Ok(())
} }

View File

@@ -33,3 +33,7 @@ path = "src/config.rs"
[[bin]] [[bin]]
name = "config-server" name = "config-server"
path = "src/server.rs" path = "src/server.rs"
[[bin]]
name = "ctrl"
path = "src/ctrl.rs"

11
tester/README.md Normal file
View File

@@ -0,0 +1,11 @@
#### test control messages
cargo run --bin sphinx-key-tester -- --test --log
cd broker
cargo run -- --test
cargo run --bin ctrl

54
tester/src/ctrl.rs Normal file
View File

@@ -0,0 +1,54 @@
use dotenv::dotenv;
use serde::{Deserialize, Serialize};
use sphinx_key_parser::control::{ControlMessage, Controller};
use sphinx_key_signer::lightning_signer::bitcoin::Network;
use std::env;
use std::time::Duration;
const DEFAULT_URL: &str = "http://localhost:8000/api";
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EcdhBody {
pub pubkey: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv().ok();
let nonce_string: String = env::var("NONCE").unwrap_or("0".to_string());
let nonce: u64 = nonce_string.parse::<u64>().expect("failed to parse nonce");
let broker_url: String = env::var("BROKER_URL").unwrap_or(DEFAULT_URL.to_string());
let seed_string: String = env::var("SEED").expect("no seed");
let seed = hex::decode(seed_string).expect("yo");
let mut ctrl = controller_from_seed(&Network::Regtest, &seed, nonce);
let msg = ctrl.build_msg(ControlMessage::Nonce)?;
let msg_hex = hex::encode(&msg);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("couldnt build reqwest client");
let res = client
.post(format!("{}/control?msg={}", broker_url, msg_hex))
.header("Content-Type", "application/json")
.send()
.await?;
let response: String = res.text().await?;
let res_bytes = hex::decode(response).expect("couldnt decode response");
let resp = ctrl.parse_response(&res_bytes).expect("nope");
println!("RESponse from the ESP!!! {:?}", resp);
Ok(())
}
pub fn controller_from_seed(network: &Network, seed: &[u8], nonce: u64) -> Controller {
let (pk, sk) = sphinx_key_signer::derive_node_keys(network, seed);
Controller::new(sk, pk, nonce)
}

View File

@@ -2,10 +2,12 @@ use sphinx_key_parser as parser;
use sphinx_key_signer::lightning_signer::bitcoin::Network; use sphinx_key_signer::lightning_signer::bitcoin::Network;
use clap::{App, AppSettings, Arg}; use clap::{App, AppSettings, Arg};
use dotenv::dotenv;
use rumqttc::{self, AsyncClient, Event, MqttOptions, Packet, QoS}; use rumqttc::{self, AsyncClient, Event, MqttOptions, Packet, QoS};
use sphinx_key_signer::control::Controller; use sphinx_key_signer::control::Controller;
use sphinx_key_signer::vls_protocol::{model::PubKey, msgs}; use sphinx_key_signer::vls_protocol::{model::PubKey, msgs};
use sphinx_key_signer::{self, InitResponse}; use sphinx_key_signer::{self, InitResponse};
use std::convert::TryInto;
use std::env; use std::env;
use std::error::Error; use std::error::Error;
use std::str::FromStr; use std::str::FromStr;
@@ -13,15 +15,17 @@ use std::time::Duration;
const SUB_TOPIC: &str = "sphinx"; const SUB_TOPIC: &str = "sphinx";
const CONTROL_TOPIC: &str = "sphinx-control"; const CONTROL_TOPIC: &str = "sphinx-control";
const CONTROL_PUB_TOPIC: &str = "sphinx-control-return";
const PUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx-return";
const USERNAME: &str = "sphinx-key"; const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass"; const PASSWORD: &str = "sphinx-key-pass";
const DEV_SEED: [u8; 32] = [0; 32];
#[tokio::main(worker_threads = 1)] #[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
setup_logging("sphinx-key-tester ", "info"); setup_logging("sphinx-key-tester ", "info");
dotenv().ok();
let app = App::new("tester") let app = App::new("tester")
.setting(AppSettings::NoAutoVersion) .setting(AppSettings::NoAutoVersion)
.about("CLN:mqtt-tester - MQTT client signer") .about("CLN:mqtt-tester - MQTT client signer")
@@ -48,9 +52,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
break (client, eventloop); break (client, eventloop);
} }
} }
Err(_) => { Err(e) => {
try_i = try_i + 1; try_i = try_i + 1;
println!("reconnect.... {}", try_i); println!("reconnect.... {} {:?}", try_i, e);
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
} }
} }
@@ -65,8 +69,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
.await .await
.expect("could not mqtt subscribe"); .expect("could not mqtt subscribe");
let network = Network::Regtest;
let seed_string: String = env::var("SEED").expect("no seed");
let seed = hex::decode(seed_string).expect("couldnt decode seed");
// make the controller to validate Control messages // make the controller to validate Control messages
let mut ctrlr = controller_from_seed(&Network::Regtest, &DEV_SEED[..]); let mut ctrlr = controller_from_seed(&network, &seed);
if is_test { if is_test {
// test handler loop // test handler loop
@@ -97,10 +104,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
.expect("could not mqtt publish"); .expect("could not mqtt publish");
} }
CONTROL_TOPIC => { CONTROL_TOPIC => {
match ctrlr.parse_msg(&msg_bytes) { match ctrlr.handle(&msg_bytes) {
Ok(msg) => { Ok((response, _new_policy)) => {
log::info!("CONTROL MSG {:?}", msg); client
// create a response and mqtt pub here .publish(
CONTROL_PUB_TOPIC,
QoS::AtMostOnce,
false,
response,
)
.await
.expect("could not mqtt publish");
} }
Err(e) => log::warn!("error parsing ctrl msg {:?}", e), Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
}; };
@@ -117,59 +131,59 @@ async fn main() -> Result<(), Box<dyn Error>> {
} }
} }
} else { } else {
// once the init loop is done, the root_handler is returned let seed32: [u8; 32] = seed.try_into().expect("wrong seed");
let root_handler = loop { let init_msg =
if let Ok(init_event) = eventloop.poll().await { sphinx_key_signer::make_init_msg(network, seed32).expect("failed to make init msg");
// this may be another kind of message like MQTT ConnAck let InitResponse {
// loop around again and wait for the init root_handler,
if let Some((_topic, init_msg_bytes)) = incoming_bytes(init_event) { init_reply: _,
let InitResponse { } = sphinx_key_signer::init(init_msg, network).expect("failed to init signer");
root_handler,
init_reply,
} = sphinx_key_signer::init(init_msg_bytes, Network::Regtest)
.expect("failed to init signer");
client
.publish(PUB_TOPIC, QoS::AtMostOnce, false, init_reply)
.await
.expect("could not publish init response");
// return the root_handler and finish the init loop
break Some(root_handler);
}
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
log::warn!("failed to initialize! Lost connection");
break None;
}
};
// the actual handler loop // the actual handler loop
loop { loop {
if let Some(rh) = &root_handler { match eventloop.poll().await {
match eventloop.poll().await { Ok(event) => {
Ok(event) => { let dummy_peer = PubKey([0; 33]);
let dummy_peer = PubKey([0; 33]); if let Some((topic, msg_bytes)) = incoming_bytes(event) {
if let Some((_topic, msg_bytes)) = incoming_bytes(event) { match topic.as_str() {
match sphinx_key_signer::handle( SUB_TOPIC => {
rh, match sphinx_key_signer::handle(
msg_bytes, &root_handler,
dummy_peer.clone(), msg_bytes,
is_log, dummy_peer.clone(),
) { is_log,
Ok(b) => client ) {
.publish(PUB_TOPIC, QoS::AtMostOnce, false, b) Ok(b) => client
.await .publish(PUB_TOPIC, QoS::AtMostOnce, false, b)
.expect("could not publish init response"), .await
Err(e) => panic!("HANDLE FAILED {:?}", e), .expect("could not publish init response"),
}; Err(e) => panic!("HANDLE FAILED {:?}", e),
};
}
CONTROL_TOPIC => {
match ctrlr.handle(&msg_bytes) {
Ok((response, _new_policy)) => {
client
.publish(
CONTROL_PUB_TOPIC,
QoS::AtMostOnce,
false,
response,
)
.await
.expect("could not mqtt publish");
}
Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
};
}
_ => log::info!("invalid topic"),
} }
} }
Err(e) => {
log::warn!("diconnected {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
break; // break out of this loop to reconnect
}
} }
} else { Err(e) => {
break; log::warn!("diconnected {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
break; // break out of this loop to reconnect
}
} }
} }
} }