From 895f79f61e1727d94a36665bd6ae4222fa4b1fcc Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Thu, 9 Jun 2022 13:07:46 -0700 Subject: [PATCH 1/4] broker reconnection loop using tokio::select macro, both tx and rx --- broker/Cargo.toml | 4 +- broker/src/main.rs | 14 ++++-- broker/src/mqtt.rs | 110 ++++++++++++++++++++++++++++++----------- broker/src/run_test.rs | 33 ++++++++++--- broker/src/util.rs | 34 +++++++++++++ tester/src/main.rs | 18 +++---- 6 files changed, 162 insertions(+), 51 deletions(-) create mode 100644 broker/src/util.rs diff --git a/broker/Cargo.toml b/broker/Cargo.toml index a48e65a..deb5652 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -6,7 +6,8 @@ default-run = "sphinx-key-broker" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rumqttd = "0.11.0" +# rumqttd = "0.11.0" +rumqttd = { path = "../../rumqtt/rumqttd" } pretty_env_logger = "0.4.0" confy = "0.4.0" tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } @@ -20,6 +21,7 @@ fern = { version = "0.6", features = ["colored"] } rumqttc = "0.12.0" clap = "=3.0.0-beta.2" clap_derive = "=3.0.0-beta.5" +chrono = "0.4" [features] default = ["std"] diff --git a/broker/src/main.rs b/broker/src/main.rs index 8c83ac8..ad392cb 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,15 +1,15 @@ mod mqtt; mod run_test; mod unix_fd; +mod util; +use crate::mqtt::start_broker; use crate::unix_fd::SignerLoop; use clap::{App, AppSettings, Arg}; -use crate::mqtt::start_broker; use std::env; use tokio::sync::{mpsc, oneshot}; use vls_proxy::client::UnixClient; use vls_proxy::connection::{open_parent_fd, UnixConnection}; -use vls_proxy::util::setup_logging; pub struct Channel { pub sequence: u16, @@ -30,7 +30,12 @@ pub struct ChannelReply { fn main() -> anyhow::Result<()> { let parent_fd = open_parent_fd(); - setup_logging("hsmd ", "info"); + /* + simple_logger::SimpleLogger::new() + .with_utc_timestamps() + .with_module_level("async_io", log::LevelFilter::Off) + */ + util::setup_logging("hsmd ", "info"); let app = App::new("signer") .setting(AppSettings::NoAutoVersion) .about("CLN:mqtt - connects to an embedded VLS over a MQTT connection") @@ -56,7 +61,8 @@ fn main() -> anyhow::Result<()> { run_test::run_test(); } else { let (tx, rx) = mpsc::channel(1000); - let _runtime = start_broker(true, rx); + let (status_tx, status_rx) = mpsc::channel(1000); + let _runtime = start_broker(rx, status_tx, "sphinx-1"); // listen to reqs from CLN let conn = UnixConnection::new(parent_fd); let client = UnixClient::new(conn); diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 8ad5fce..712aa74 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,63 +1,118 @@ use crate::{ChannelReply, ChannelRequest}; -use librumqttd::{async_locallink::construct_broker, Config}; +use librumqttd::{ + async_locallink::construct_broker, + consolelink::{self, ConsoleLink}, + Config, +}; +use std::sync::Arc; use std::thread; -use tokio::sync::{mpsc, oneshot}; +use std::time::Duration; +use tokio::sync::mpsc; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; pub fn start_broker( - wait_for_ready_message: bool, mut receiver: mpsc::Receiver, + status_sender: mpsc::Sender, + expected_client_id: &str, ) -> tokio::runtime::Runtime { let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); + let client_id = expected_client_id.to_string(); - let (mut router, console, servers, builder) = construct_broker(config); + let (mut router, servers, builder) = construct_broker(config.clone()); thread::spawn(move || { router.start().expect("could not start router"); }); + let mut client_connected = false; + + // let (status_tx, mut status_rx): (mpsc::Sender, mpsc::Receiver) = + // mpsc::channel(1000); + let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); rt_builder.enable_all(); let rt = rt_builder.build().unwrap(); rt.block_on(async { - // channel to block until READY received - let (ready_tx, ready_rx) = oneshot::channel(); tokio::spawn(async move { let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = mpsc::channel(1000); - let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap(); + let (mut tx, mut rx) = builder.clone().connect("localclient", 200).await.unwrap(); tx.subscribe([SUB_TOPIC]).await.unwrap(); - let console_task = tokio::spawn(console); + let router_tx = builder.router_tx(); + tokio::spawn(async move { + let config = config.clone().into(); + let router_tx = router_tx.clone(); + let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); + loop { + let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); + match metrics.tracker() { + Some(t) => { + // wait for subscription to be sure + if t.concrete_subscriptions_len() > 0 { + if !client_connected { + println!("CLIENT CONNECTED!"); + client_connected = true; + status_sender + .send(true) + .await + .expect("couldnt send true statu"); + } + } + } + None => { + if client_connected { + println!("CLIENT DIsCONNECTED!"); + client_connected = false; + status_sender + .send(false) + .await + .expect("couldnt send false status"); + } + } + } + tokio::time::sleep(Duration::from_millis(850)).await; + } + }); let sub_task = tokio::spawn(async move { // ready message loop // let ready_tx_ = ready_tx.clone(); - if wait_for_ready_message { + loop { + // wait for CONNECTED + // loop { + // let status = status_rx.recv().await.unwrap(); + // if status { + // break; + // } + // } + // now wait for READY + // loop { + // let message = rx.recv().await.unwrap(); + // if let Some(payload) = message.payload.get(0) { + // let content = String::from_utf8_lossy(&payload[..]); + // log::info!("received message content: {}", content); + // if content == "READY" { + // // ready_tx.send(true).expect("could not send ready"); + // break; + // } + // } + // } + // now start parsing... or break for DISCONNECT + println!("OK START PARSING!"); loop { let message = rx.recv().await.unwrap(); - if let Some(payload) = message.payload.get(0) { - let content = String::from_utf8_lossy(&payload[..]); - log::info!("received message content: {}", content); - if content == "READY" { - ready_tx.send(true).expect("could not send ready"); - break; + println!("T = {}, P = {:?}", message.topic, message.payload.len()); + // println!("count {}", message.payload.len()); + for payload in message.payload { + if let Err(e) = msg_tx.send(payload.to_vec()).await { + println!("pub err {:?}", e); } } } } - loop { - let message = rx.recv().await.unwrap(); - // println!("T = {}, P = {:?}", message.topic, message.payload.len()); - // println!("count {}", message.payload.len()); - for payload in message.payload { - if let Err(e) = msg_tx.send(payload.to_vec()).await { - println!("pub err {:?}", e); - } - } - } }); let relay_task = tokio::spawn(async move { @@ -75,12 +130,7 @@ pub fn start_broker( servers.await; sub_task.await.unwrap(); relay_task.await.unwrap(); - console_task.await.unwrap(); }); - if wait_for_ready_message { - log::info!("waiting for READY..."); - ready_rx.await.expect("Could not receive from channel."); - } }); rt diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index f856ac9..865d570 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -12,16 +12,30 @@ pub fn run_test() { let mut sequence = 1; let (tx, rx) = mpsc::channel(1000); - let runtime = start_broker(true, rx); + let (status_tx, mut status_rx) = mpsc::channel(1000); + let runtime = start_broker(rx, status_tx, "test-1"); log::info!("======> READY received! start now"); runtime.block_on(async { + let mut connected = false; loop { - if let Err(e) = iteration(id, sequence, tx.clone()).await { - panic!("iteration failed {:?}", e); - } - sequence = sequence.wrapping_add(1); - id += 1; - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + tokio::select! { + status = status_rx.recv() => { + // println!("got a status"); + if let Some(connection_status) = status { + connected = connected; + println!("========> CONNETED! {}", connection_status); + } + } + res = iteration(id, sequence, tx.clone(), connected) => { + println!("iteration! connected: {}", connected); + if let Err(e) = res { + panic!("iteration failed {:?}", e); + } + sequence = sequence.wrapping_add(1); + id += 1; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + }; } }); } @@ -30,7 +44,12 @@ pub async fn iteration( id: u16, sequence: u16, tx: mpsc::Sender, + connected: bool, ) -> anyhow::Result<()> { + if !connected { + return Ok(()); + } + println!("do a ping!"); let ping = msgs::Ping { id, message: WireString("ping".as_bytes().to_vec()), diff --git a/broker/src/util.rs b/broker/src/util.rs new file mode 100644 index 0000000..a2ae7d7 --- /dev/null +++ b/broker/src/util.rs @@ -0,0 +1,34 @@ +use std::env; +use std::str::FromStr; + +pub fn setup_logging(who: &str, level_arg: &str) { + use fern::colors::{Color, ColoredLevelConfig}; + let colors = ColoredLevelConfig::new() + .info(Color::Green) + .error(Color::Red) + .warn(Color::Yellow); + let level = env::var("RUST_LOG").unwrap_or(level_arg.to_string()); + let who = who.to_string(); + fern::Dispatch::new() + .format(move |out, message, record| { + out.finish(format_args!( + "[{} {}/{} {}] {}", + chrono::Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), + who, + record.target(), + colors.color(record.level()), + message + )) + }) + .level(log::LevelFilter::from_str(&level).expect("level")) + .level_for("h2", log::LevelFilter::Info) + .level_for("sled", log::LevelFilter::Info) + .level_for( + "librumqttd::rumqttlog::router::router", + log::LevelFilter::Warn, + ) + .chain(std::io::stdout()) + // .chain(fern::log_file("/tmp/output.log")?) + .apply() + .expect("log config"); +} diff --git a/tester/src/main.rs b/tester/src/main.rs index 43ebe9c..df9fe5b 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -47,15 +47,15 @@ async fn main() -> Result<(), Box> { .await .expect("could not mqtt subscribe"); - client - .publish( - PUB_TOPIC, - QoS::AtMostOnce, - false, - "READY".as_bytes().to_vec(), - ) - .await - .expect("could not pub"); + // client + // .publish( + // PUB_TOPIC, + // QoS::AtMostOnce, + // false, + // "READY".as_bytes().to_vec(), + // ) + // .await + // .expect("could not pub"); let matches = app.get_matches(); if matches.is_present("test") { From 937ff0d8deba40a6142f67024f6d9670fb47c121 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Thu, 9 Jun 2022 13:31:13 -0700 Subject: [PATCH 2/4] tokio select reconnect twice, but not more? --- broker/src/main.rs | 5 ---- broker/src/mqtt.rs | 62 ++++++++++++++++++++++-------------------- broker/src/run_test.rs | 12 +++++--- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/broker/src/main.rs b/broker/src/main.rs index ad392cb..a67d54f 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -30,11 +30,6 @@ pub struct ChannelReply { fn main() -> anyhow::Result<()> { let parent_fd = open_parent_fd(); - /* - simple_logger::SimpleLogger::new() - .with_utc_timestamps() - .with_module_level("async_io", log::LevelFilter::Off) - */ util::setup_logging("hsmd ", "info"); let app = App::new("signer") .setting(AppSettings::NoAutoVersion) diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 712aa74..fe6b61a 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -80,43 +80,44 @@ pub fn start_broker( let sub_task = tokio::spawn(async move { // ready message loop // let ready_tx_ = ready_tx.clone(); + // loop { + // wait for CONNECTED + // loop { + // let status = status_rx.recv().await.unwrap(); + // if status { + // break; + // } + // } + // now wait for READY + // loop { + // let message = rx.recv().await.unwrap(); + // if let Some(payload) = message.payload.get(0) { + // let content = String::from_utf8_lossy(&payload[..]); + // log::info!("received message content: {}", content); + // if content == "READY" { + // // ready_tx.send(true).expect("could not send ready"); + // break; + // } + // } + // } + // now start parsing... or break for DISCONNECT + // println!("OK START PARSING!"); loop { - // wait for CONNECTED - // loop { - // let status = status_rx.recv().await.unwrap(); - // if status { - // break; - // } - // } - // now wait for READY - // loop { - // let message = rx.recv().await.unwrap(); - // if let Some(payload) = message.payload.get(0) { - // let content = String::from_utf8_lossy(&payload[..]); - // log::info!("received message content: {}", content); - // if content == "READY" { - // // ready_tx.send(true).expect("could not send ready"); - // break; - // } - // } - // } - // now start parsing... or break for DISCONNECT - println!("OK START PARSING!"); - loop { - let message = rx.recv().await.unwrap(); - println!("T = {}, P = {:?}", message.topic, message.payload.len()); - // println!("count {}", message.payload.len()); - for payload in message.payload { - if let Err(e) = msg_tx.send(payload.to_vec()).await { - println!("pub err {:?}", e); - } + let message = rx.recv().await.unwrap(); + println!("T = {}, P = {:?}", message.topic, message.payload.len()); + // println!("count {}", message.payload.len()); + for payload in message.payload { + if let Err(e) = msg_tx.send(payload.to_vec()).await { + println!("pub err {:?}", e); } } } + // } }); let relay_task = tokio::spawn(async move { - while let Some(msg) = receiver.recv().await { + loop { + let msg = receiver.recv().await.unwrap(); tx.publish(PUB_TOPIC, false, msg.message) .await .expect("could not mqtt pub"); @@ -125,6 +126,7 @@ pub fn start_broker( log::warn!("could not send on reply_tx"); } } + // println!("ABORT! relay task finished <<<<<<<<<<<<<<<"); }); servers.await; diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 865d570..db86a8f 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -22,17 +22,21 @@ pub fn run_test() { status = status_rx.recv() => { // println!("got a status"); if let Some(connection_status) = status { - connected = connected; + connected = connection_status; + id = 0; + sequence = 1; println!("========> CONNETED! {}", connection_status); } } res = iteration(id, sequence, tx.clone(), connected) => { - println!("iteration! connected: {}", connected); + println!("iteration! {}", connected); if let Err(e) = res { panic!("iteration failed {:?}", e); } - sequence = sequence.wrapping_add(1); - id += 1; + if connected { + sequence = sequence.wrapping_add(1); + id += 1; + } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } }; From f1e8e13dda9d8f1752f0749415aa5631019e69e9 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Thu, 9 Jun 2022 19:23:40 -0700 Subject: [PATCH 3/4] broker reconnect working --- broker/src/main.rs | 4 ++- broker/src/mqtt.rs | 77 +++++++++++++++--------------------------- broker/src/run_test.rs | 11 +++--- 3 files changed, 34 insertions(+), 58 deletions(-) diff --git a/broker/src/main.rs b/broker/src/main.rs index a67d54f..eed32e5 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -17,12 +17,14 @@ pub struct Channel { } /// Responses are received on the oneshot sender +#[derive(Debug)] pub struct ChannelRequest { pub message: Vec, pub reply_tx: oneshot::Sender, } // mpsc reply +#[derive(Debug)] pub struct ChannelReply { pub reply: Vec, } @@ -48,7 +50,7 @@ fn main() -> anyhow::Result<()> { // Pretend to be the right version, given to us by an env var let version = env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning"); - println!("{}", version); + log::info!("{}", version); return Ok(()); } diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index fe6b61a..15340bb 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::thread; use std::time::Duration; use tokio::sync::mpsc; +use tokio::time::timeout; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; @@ -48,85 +49,61 @@ pub fn start_broker( let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); loop { let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); - match metrics.tracker() { + let changed: Option = match metrics.tracker() { Some(t) => { // wait for subscription to be sure if t.concrete_subscriptions_len() > 0 { if !client_connected { - println!("CLIENT CONNECTED!"); - client_connected = true; - status_sender - .send(true) - .await - .expect("couldnt send true statu"); + Some(true) // changed to true + } else { + None } + } else { + None } } None => { if client_connected { - println!("CLIENT DIsCONNECTED!"); - client_connected = false; - status_sender - .send(false) - .await - .expect("couldnt send false status"); + Some(false) + } else { + None } } + }; + if let Some(c) = changed { + client_connected = c; + status_sender + .send(c) + .await + .expect("couldnt send connection status"); } - tokio::time::sleep(Duration::from_millis(850)).await; + tokio::time::sleep(Duration::from_millis(800)).await; } }); let sub_task = tokio::spawn(async move { - // ready message loop - // let ready_tx_ = ready_tx.clone(); - // loop { - // wait for CONNECTED - // loop { - // let status = status_rx.recv().await.unwrap(); - // if status { - // break; - // } - // } - // now wait for READY - // loop { - // let message = rx.recv().await.unwrap(); - // if let Some(payload) = message.payload.get(0) { - // let content = String::from_utf8_lossy(&payload[..]); - // log::info!("received message content: {}", content); - // if content == "READY" { - // // ready_tx.send(true).expect("could not send ready"); - // break; - // } - // } - // } - // now start parsing... or break for DISCONNECT - // println!("OK START PARSING!"); - loop { - let message = rx.recv().await.unwrap(); - println!("T = {}, P = {:?}", message.topic, message.payload.len()); - // println!("count {}", message.payload.len()); + while let Ok(message) = rx.recv().await { for payload in message.payload { if let Err(e) = msg_tx.send(payload.to_vec()).await { - println!("pub err {:?}", e); + log::warn!("pub err {:?}", e); } } } - // } }); let relay_task = tokio::spawn(async move { - loop { - let msg = receiver.recv().await.unwrap(); + while let Some(msg) = receiver.recv().await { tx.publish(PUB_TOPIC, false, msg.message) .await .expect("could not mqtt pub"); - let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()"); - if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) { - log::warn!("could not send on reply_tx"); + if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await { + if let Err(_) = msg.reply_tx.send(ChannelReply { + reply: reply.unwrap(), + }) { + log::warn!("could not send on reply_tx"); + } } } - // println!("ABORT! relay task finished <<<<<<<<<<<<<<<"); }); servers.await; diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index db86a8f..952cc8f 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -14,24 +14,21 @@ pub fn run_test() { let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); let runtime = start_broker(rx, status_tx, "test-1"); - log::info!("======> READY received! start now"); runtime.block_on(async { let mut connected = false; loop { tokio::select! { status = status_rx.recv() => { - // println!("got a status"); if let Some(connection_status) = status { connected = connection_status; id = 0; sequence = 1; - println!("========> CONNETED! {}", connection_status); + log::info!("========> CONNETED! {}", connection_status); } } res = iteration(id, sequence, tx.clone(), connected) => { - println!("iteration! {}", connected); if let Err(e) = res { - panic!("iteration failed {:?}", e); + log::warn!("===> iteration failed {:?}", e); } if connected { sequence = sequence.wrapping_add(1); @@ -53,7 +50,7 @@ pub async fn iteration( if !connected { return Ok(()); } - println!("do a ping!"); + // log::info!("do a ping!"); let ping = msgs::Ping { id, message: WireString("ping".as_bytes().to_vec()), @@ -65,7 +62,7 @@ pub async fn iteration( message: ping_bytes, reply_tx, }; - let _ = tx.send(request).await; + tx.send(request).await?; let res = reply_rx.await?; let reply = parser::response_from_bytes(res.reply, sequence)?; match reply { From 7ee7ac071cba1ee1d1fbad86b5a59f3f009b3fb9 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Fri, 10 Jun 2022 10:08:11 -0700 Subject: [PATCH 4/4] refactor broker metrics conn, single threaded virtual esp, hardcoded config, start auther module --- Cargo.toml | 3 +- auther/Cargo.toml | 17 ++++ auther/src/lib.rs | 64 +++++++++++++ broker/Cargo.toml | 8 +- broker/src/mqtt.rs | 121 ++++++++++++++++------- broker/src/run_test.rs | 6 +- broker/src/test_client.rs | 59 ------------ tester/src/main.rs | 195 +++++++++++++++++++++----------------- 8 files changed, 283 insertions(+), 190 deletions(-) create mode 100644 auther/Cargo.toml create mode 100644 auther/src/lib.rs delete mode 100644 broker/src/test_client.rs diff --git a/Cargo.toml b/Cargo.toml index c6908db..3a1c696 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ members = [ "signer", "broker", "parser", - "tester" + "auther", + "tester", ] exclude = [ diff --git a/auther/Cargo.toml b/auther/Cargo.toml new file mode 100644 index 0000000..6deb5cd --- /dev/null +++ b/auther/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "sphinx-key-auther" +version = "0.1.0" +authors = ["Evan Feenstra "] +edition = "2018" + +[dependencies] +secp256k1 = { version = "0.22.0", default-features = false, features = ["std", "rand-std", "bitcoin_hashes"] } +anyhow = {version = "1", features = ["backtrace"]} +log = "0.4" + +[features] +default = [ "no-std", "secp-recovery", "secp-lowmemory" ] +no-std = ["secp256k1/alloc"] +secp-lowmemory = ["secp256k1/lowmemory"] +secp-recovery = ["secp256k1/recovery"] +rand = ["secp256k1/rand-std"] \ No newline at end of file diff --git a/auther/src/lib.rs b/auther/src/lib.rs new file mode 100644 index 0000000..e2c4695 --- /dev/null +++ b/auther/src/lib.rs @@ -0,0 +1,64 @@ +use secp256k1::ecdsa::Signature; +use secp256k1::hashes::sha256d::Hash as Sha256dHash; +use secp256k1::hashes::Hash; +use secp256k1::{Message, Secp256k1, SecretKey}; + +pub struct Token(u64); + +impl Token { + pub fn new() -> Self { + Self(0) + } + /// Sign a Lightning message + pub fn sign_message( + &self, + message: &Vec, + secret_key: &SecretKey, + ) -> anyhow::Result> { + let mut buffer = String::from("Lightning Signed Message:").into_bytes(); + buffer.extend(message); + let secp_ctx = Secp256k1::signing_only(); + let hash = Sha256dHash::hash(&buffer); + let encmsg = secp256k1::Message::from_slice(&hash[..])?; + let sig = secp_ctx.sign_ecdsa_recoverable(&encmsg, &secret_key); + let (rid, sig) = sig.serialize_compact(); + let mut res = sig.to_vec(); + res.push(rid.to_i32() as u8); + Ok(res) + } +} + +pub fn sign( + secp: &Secp256k1, + input: Vec, + secret_key: &SecretKey, +) -> Signature { + let message = hash_message(input); + secp.sign_ecdsa(&message, &secret_key) +} + +pub fn hash_message(input: Vec) -> Message { + let hash = Sha256dHash::hash(&input); + Message::from_slice(&hash[..]).expect("encmsg failed") +} + +#[cfg(test)] +mod tests { + use crate::*; + use secp256k1::{PublicKey, Secp256k1, SecretKey}; + + fn secret_key() -> SecretKey { + SecretKey::from_slice(&[0xcd; 32]).expect("32 bytes, within curve order") + } + + #[test] + fn test_sign() { + let secp = Secp256k1::new(); + let sk = secret_key(); + let public_key = PublicKey::from_secret_key(&secp, &sk); + let input = vec![1, 2, 3]; + let message = hash_message(input); + let sig = sign(&secp, vec![1, 2, 3], &sk); + assert!(secp.verify_ecdsa(&message, &sig, &public_key).is_ok()); + } +} diff --git a/broker/Cargo.toml b/broker/Cargo.toml index deb5652..434467f 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -6,8 +6,7 @@ default-run = "sphinx-key-broker" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -# rumqttd = "0.11.0" -rumqttd = { path = "../../rumqtt/rumqttd" } +rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" } pretty_env_logger = "0.4.0" confy = "0.4.0" tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } @@ -26,8 +25,3 @@ chrono = "0.4" [features] default = ["std"] std = ["vls-protocol/std"] - -[[bin]] -name = "test_client" -path = "src/test_client.rs" -# ./target/debug/test_client \ No newline at end of file diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 15340bb..fd09938 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,9 +1,11 @@ use crate::{ChannelReply, ChannelRequest}; use librumqttd::{ - async_locallink::construct_broker, + async_locallink, consolelink::{self, ConsoleLink}, + rumqttlog::router::ConnectionMetrics, Config, }; + use std::sync::Arc; use std::thread; use std::time::Duration; @@ -12,16 +14,18 @@ use tokio::time::timeout; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; +const USERNAME: &str = "sphinx-key"; +const PASSWORD: &str = "sphinx-key-pass"; pub fn start_broker( mut receiver: mpsc::Receiver, status_sender: mpsc::Sender, expected_client_id: &str, ) -> tokio::runtime::Runtime { - let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); + let config = config(); let client_id = expected_client_id.to_string(); - let (mut router, servers, builder) = construct_broker(config.clone()); + let (mut router, servers, builder) = async_locallink::construct(config.clone()); thread::spawn(move || { router.start().expect("could not start router"); @@ -29,9 +33,6 @@ pub fn start_broker( let mut client_connected = false; - // let (status_tx, mut status_rx): (mpsc::Sender, mpsc::Receiver) = - // mpsc::channel(1000); - let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); rt_builder.enable_all(); let rt = rt_builder.build().unwrap(); @@ -39,8 +40,9 @@ pub fn start_broker( tokio::spawn(async move { let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = mpsc::channel(1000); - let (mut tx, mut rx) = builder.clone().connect("localclient", 200).await.unwrap(); - tx.subscribe([SUB_TOPIC]).await.unwrap(); + let (mut link_tx, mut link_rx) = + builder.clone().connect("localclient", 200).await.unwrap(); + link_tx.subscribe([SUB_TOPIC]).await.unwrap(); let router_tx = builder.router_tx(); tokio::spawn(async move { @@ -49,40 +51,19 @@ pub fn start_broker( let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); loop { let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); - let changed: Option = match metrics.tracker() { - Some(t) => { - // wait for subscription to be sure - if t.concrete_subscriptions_len() > 0 { - if !client_connected { - Some(true) // changed to true - } else { - None - } - } else { - None - } - } - None => { - if client_connected { - Some(false) - } else { - None - } - } - }; - if let Some(c) = changed { + if let Some(c) = metrics_to_status(metrics, client_connected) { client_connected = c; status_sender .send(c) .await .expect("couldnt send connection status"); } - tokio::time::sleep(Duration::from_millis(800)).await; + tokio::time::sleep(Duration::from_millis(500)).await; } }); let sub_task = tokio::spawn(async move { - while let Ok(message) = rx.recv().await { + while let Ok(message) = link_rx.recv().await { for payload in message.payload { if let Err(e) = msg_tx.send(payload.to_vec()).await { log::warn!("pub err {:?}", e); @@ -93,7 +74,8 @@ pub fn start_broker( let relay_task = tokio::spawn(async move { while let Some(msg) = receiver.recv().await { - tx.publish(PUB_TOPIC, false, msg.message) + link_tx + .publish(PUB_TOPIC, false, msg.message) .await .expect("could not mqtt pub"); if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await { @@ -114,3 +96,76 @@ pub fn start_broker( rt } + +fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option { + match metrics.tracker() { + Some(t) => { + // wait for subscription to be sure + if t.concrete_subscriptions_count() > 0 { + if !client_connected { + Some(true) // changed to true + } else { + None + } + } else { + None + } + } + None => { + if client_connected { + Some(false) + } else { + None + } + } + } +} + +fn config() -> Config { + use librumqttd::rumqttlog::Config as RouterConfig; + use librumqttd::{ + ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings, + }; + use std::collections::HashMap; + use std::net::{Ipv4Addr, SocketAddrV4}; + use std::path::PathBuf; + let id = 0; + let router = RouterConfig { + id, + dir: PathBuf::from("/tmp/rumqttd"), + max_segment_size: 10240, + max_segment_count: 10, + max_connections: 10001, + }; + let mut servers = HashMap::new(); + servers.insert( + "0".to_string(), + ServerSettings { + cert: None, + listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(), + next_connection_delay_ms: 1, + connections: ConnectionSettings { + connection_timeout_ms: 5000, + max_client_id_len: 256, + throttle_delay_ms: 0, + max_payload_size: 5120, + max_inflight_count: 200, + max_inflight_size: 1024, + login_credentials: Some(vec![ConnectionLoginCredentials { + username: USERNAME.to_string(), + password: PASSWORD.to_string(), + }]), + }, + }, + ); + Config { + id, + servers, + router, + console: ConsoleSettings { + listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030).into(), + }, + cluster: None, + replicator: None, + } +} diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 952cc8f..2039f72 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -5,6 +5,8 @@ use tokio::sync::{mpsc, oneshot}; use vls_protocol::serde_bolt::WireString; use vls_protocol::{msgs, msgs::Message}; +const CLIENT_ID: &str = "test-1"; + pub fn run_test() { log::info!("TEST..."); @@ -13,7 +15,7 @@ pub fn run_test() { let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); - let runtime = start_broker(rx, status_tx, "test-1"); + let runtime = start_broker(rx, status_tx, CLIENT_ID); runtime.block_on(async { let mut connected = false; loop { @@ -23,7 +25,7 @@ pub fn run_test() { connected = connection_status; id = 0; sequence = 1; - log::info!("========> CONNETED! {}", connection_status); + log::info!("========> CONNECTED! {}", connection_status); } } res = iteration(id, sequence, tx.clone(), connected) => { diff --git a/broker/src/test_client.rs b/broker/src/test_client.rs deleted file mode 100644 index 7bf0f80..0000000 --- a/broker/src/test_client.rs +++ /dev/null @@ -1,59 +0,0 @@ - -use sphinx_key_parser::MsgDriver; - -use tokio::{task, time}; -use rumqttc::{self, AsyncClient, MqttOptions, QoS, Event, Packet}; -use std::error::Error; -use std::time::Duration; -use vls_protocol::msgs; - -#[tokio::main(worker_threads = 1)] -async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); - mqttoptions.set_credentials("sphinx-key", "sphinx-key-pass"); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - task::spawn(async move { - requests(client).await; - time::sleep(Duration::from_secs(3)).await; - }); - - loop { - let event = eventloop.poll().await; - // println!("{:?}", event.unwrap()); - if let Event::Incoming(packet) = event.unwrap() { - if let Packet::Publish(p) = packet { - let mut m = MsgDriver::new(p.payload.to_vec()); - let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header"); - assert_eq!(dbid, 0); - assert_eq!(sequence, 0); - let ping: msgs::Ping = - msgs::read_message(&mut m).expect("failed to read ping message"); - println!("INCOMING: {:?}", ping); - } - } - } -} - -async fn requests(client: AsyncClient) { - - client - .subscribe("sphinx", QoS::AtMostOnce) - .await - .unwrap(); - - for _ in 1..=10 { - client - .publish("trigger", QoS::AtMostOnce, false, vec![1; 1]) - .await - .unwrap(); - - time::sleep(Duration::from_secs(1)).await; - } - - time::sleep(Duration::from_secs(120)).await; -} \ No newline at end of file diff --git a/tester/src/main.rs b/tester/src/main.rs index df9fe5b..7e007e6 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -20,97 +20,116 @@ async fn main() -> Result<(), Box> { .setting(AppSettings::NoAutoVersion) .about("CLN:mqtt-tester - MQTT client signer") .arg(Arg::from("--test run a test against the embedded device")); - - let mut try_i = 0; - let (client, mut eventloop) = loop { - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); - mqttoptions.set_credentials(USERNAME, PASSWORD); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - match eventloop.poll().await { - Ok(event) => { - if let Some(_) = incoming_conn_ack(event) { - println!("==========> MQTT connected!"); - break (client, eventloop); + let matches = app.get_matches(); + let is_test = matches.is_present("test"); + // main loop - alternate between "reconnection" and "handler" + loop { + let mut try_i = 0; + let (client, mut eventloop) = loop { + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + mqttoptions.set_credentials(USERNAME, PASSWORD); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + match eventloop.poll().await { + Ok(event) => { + if let Some(_) = incoming_conn_ack(event) { + println!("==========> MQTT connected!"); + break (client, eventloop); + } + } + Err(_) => { + try_i = try_i + 1; + println!("reconnect.... {}", try_i); + tokio::time::sleep(Duration::from_secs(1)).await; } } - Err(_) => { - try_i = try_i + 1; - println!("reconnect.... {}", try_i); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - }; - - client - .subscribe(SUB_TOPIC, QoS::AtMostOnce) - .await - .expect("could not mqtt subscribe"); - - // client - // .publish( - // PUB_TOPIC, - // QoS::AtMostOnce, - // false, - // "READY".as_bytes().to_vec(), - // ) - // .await - // .expect("could not pub"); - - let matches = app.get_matches(); - if matches.is_present("test") { - loop { - let event = eventloop.poll().await.expect("failed to unwrap event"); - // println!("{:?}", event); - if let Some(ping_bytes) = incoming_bytes(event) { - let (ping, sequence, dbid): (msgs::Ping, u16, u64) = - parser::request_from_bytes(ping_bytes).expect("read ping header"); - println!("sequence {}", sequence); - println!("dbid {}", dbid); - println!("INCOMING: {:?}", ping); - let pong = msgs::Pong { - id: ping.id, - message: ping.message, - }; - let bytes = parser::raw_response_from_msg(pong, sequence) - .expect("couldnt parse raw response"); - client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes) - .await - .expect("could not mqtt publish"); - } - } - } else { - // once the init loop is done, the root_handler is returned - let root_handler = loop { - let init_event = eventloop.poll().await.expect("failed to unwrap event"); - // this may be another kind of message like MQTT ConnAck - // loop around again and wait for the init - if let Some(init_msg_bytes) = incoming_bytes(init_event) { - let InitResponse { - root_handler, - init_reply, - } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); - client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, init_reply) - .await - .expect("could not publish init response"); - // return the root_handler and finish the init loop - break root_handler; - } }; - // the actual loop - loop { - let event = eventloop.poll().await.expect("failed to unwrap event"); - let dummy_peer = PubKey([0; 33]); - if let Some(msg_bytes) = incoming_bytes(event) { - match sphinx_key_signer::handle(&root_handler, msg_bytes, dummy_peer.clone()) { - Ok(b) => client - .publish(PUB_TOPIC, QoS::AtMostOnce, false, b) - .await - .expect("could not publish init response"), - Err(e) => panic!("HANDLE FAILED {:?}", e), - }; + + client + .subscribe(SUB_TOPIC, QoS::AtMostOnce) + .await + .expect("could not mqtt subscribe"); + + if is_test { + // test handler loop + loop { + match eventloop.poll().await { + Ok(event) => { + // println!("{:?}", event); + if let Some(ping_bytes) = incoming_bytes(event) { + let (ping, sequence, dbid): (msgs::Ping, u16, u64) = + parser::request_from_bytes(ping_bytes).expect("read ping header"); + println!("sequence {}", sequence); + println!("dbid {}", dbid); + println!("INCOMING: {:?}", ping); + let pong = msgs::Pong { + id: ping.id, + message: ping.message, + }; + let bytes = parser::raw_response_from_msg(pong, sequence) + .expect("couldnt parse raw response"); + client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes) + .await + .expect("could not mqtt publish"); + } + } + Err(e) => { + log::warn!("diconnected {:?}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + break; // break out of this loop to reconnect + } + } + } + } else { + // once the init loop is done, the root_handler is returned + let root_handler = loop { + if let Ok(init_event) = eventloop.poll().await { + // this may be another kind of message like MQTT ConnAck + // loop around again and wait for the init + if let Some(init_msg_bytes) = incoming_bytes(init_event) { + let InitResponse { + root_handler, + init_reply, + } = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer"); + client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, init_reply) + .await + .expect("could not publish init response"); + // return the root_handler and finish the init loop + break Some(root_handler); + } + } else { + tokio::time::sleep(Duration::from_secs(1)).await; + log::warn!("failed to initialize! Lost connection"); + break None; + } + }; + // the actual handler loop + loop { + if let Some(rh) = &root_handler { + match eventloop.poll().await { + Ok(event) => { + let dummy_peer = PubKey([0; 33]); + if let Some(msg_bytes) = incoming_bytes(event) { + match sphinx_key_signer::handle(rh, msg_bytes, dummy_peer.clone()) { + Ok(b) => client + .publish(PUB_TOPIC, QoS::AtMostOnce, false, b) + .await + .expect("could not publish init response"), + Err(e) => panic!("HANDLE FAILED {:?}", e), + }; + } + } + Err(e) => { + log::warn!("diconnected {:?}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + break; // break out of this loop to reconnect + } + } + } else { + break; + } } } }