mirror of
https://github.com/stakwork/sphinx-key.git
synced 2025-12-19 08:14:28 +01:00
Merge pull request #27 from stakwork/feat/broker-reconnect
Feat/broker reconnect
This commit is contained in:
@@ -4,7 +4,8 @@ members = [
|
|||||||
"signer",
|
"signer",
|
||||||
"broker",
|
"broker",
|
||||||
"parser",
|
"parser",
|
||||||
"tester"
|
"auther",
|
||||||
|
"tester",
|
||||||
]
|
]
|
||||||
|
|
||||||
exclude = [
|
exclude = [
|
||||||
|
|||||||
17
auther/Cargo.toml
Normal file
17
auther/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
[package]
|
||||||
|
name = "sphinx-key-auther"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Evan Feenstra <evanfeenstra@gmail.com>"]
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
secp256k1 = { version = "0.22.0", default-features = false, features = ["std", "rand-std", "bitcoin_hashes"] }
|
||||||
|
anyhow = {version = "1", features = ["backtrace"]}
|
||||||
|
log = "0.4"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = [ "no-std", "secp-recovery", "secp-lowmemory" ]
|
||||||
|
no-std = ["secp256k1/alloc"]
|
||||||
|
secp-lowmemory = ["secp256k1/lowmemory"]
|
||||||
|
secp-recovery = ["secp256k1/recovery"]
|
||||||
|
rand = ["secp256k1/rand-std"]
|
||||||
64
auther/src/lib.rs
Normal file
64
auther/src/lib.rs
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
use secp256k1::ecdsa::Signature;
|
||||||
|
use secp256k1::hashes::sha256d::Hash as Sha256dHash;
|
||||||
|
use secp256k1::hashes::Hash;
|
||||||
|
use secp256k1::{Message, Secp256k1, SecretKey};
|
||||||
|
|
||||||
|
pub struct Token(u64);
|
||||||
|
|
||||||
|
impl Token {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self(0)
|
||||||
|
}
|
||||||
|
/// Sign a Lightning message
|
||||||
|
pub fn sign_message(
|
||||||
|
&self,
|
||||||
|
message: &Vec<u8>,
|
||||||
|
secret_key: &SecretKey,
|
||||||
|
) -> anyhow::Result<Vec<u8>> {
|
||||||
|
let mut buffer = String::from("Lightning Signed Message:").into_bytes();
|
||||||
|
buffer.extend(message);
|
||||||
|
let secp_ctx = Secp256k1::signing_only();
|
||||||
|
let hash = Sha256dHash::hash(&buffer);
|
||||||
|
let encmsg = secp256k1::Message::from_slice(&hash[..])?;
|
||||||
|
let sig = secp_ctx.sign_ecdsa_recoverable(&encmsg, &secret_key);
|
||||||
|
let (rid, sig) = sig.serialize_compact();
|
||||||
|
let mut res = sig.to_vec();
|
||||||
|
res.push(rid.to_i32() as u8);
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sign<T: secp256k1::Signing>(
|
||||||
|
secp: &Secp256k1<T>,
|
||||||
|
input: Vec<u8>,
|
||||||
|
secret_key: &SecretKey,
|
||||||
|
) -> Signature {
|
||||||
|
let message = hash_message(input);
|
||||||
|
secp.sign_ecdsa(&message, &secret_key)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn hash_message(input: Vec<u8>) -> Message {
|
||||||
|
let hash = Sha256dHash::hash(&input);
|
||||||
|
Message::from_slice(&hash[..]).expect("encmsg failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::*;
|
||||||
|
use secp256k1::{PublicKey, Secp256k1, SecretKey};
|
||||||
|
|
||||||
|
fn secret_key() -> SecretKey {
|
||||||
|
SecretKey::from_slice(&[0xcd; 32]).expect("32 bytes, within curve order")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sign() {
|
||||||
|
let secp = Secp256k1::new();
|
||||||
|
let sk = secret_key();
|
||||||
|
let public_key = PublicKey::from_secret_key(&secp, &sk);
|
||||||
|
let input = vec![1, 2, 3];
|
||||||
|
let message = hash_message(input);
|
||||||
|
let sig = sign(&secp, vec![1, 2, 3], &sk);
|
||||||
|
assert!(secp.verify_ecdsa(&message, &sig, &public_key).is_ok());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,7 +6,7 @@ default-run = "sphinx-key-broker"
|
|||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
rumqttd = "0.11.0"
|
rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" }
|
||||||
pretty_env_logger = "0.4.0"
|
pretty_env_logger = "0.4.0"
|
||||||
confy = "0.4.0"
|
confy = "0.4.0"
|
||||||
tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] }
|
tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] }
|
||||||
@@ -20,12 +20,8 @@ fern = { version = "0.6", features = ["colored"] }
|
|||||||
rumqttc = "0.12.0"
|
rumqttc = "0.12.0"
|
||||||
clap = "=3.0.0-beta.2"
|
clap = "=3.0.0-beta.2"
|
||||||
clap_derive = "=3.0.0-beta.5"
|
clap_derive = "=3.0.0-beta.5"
|
||||||
|
chrono = "0.4"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["std"]
|
default = ["std"]
|
||||||
std = ["vls-protocol/std"]
|
std = ["vls-protocol/std"]
|
||||||
|
|
||||||
[[bin]]
|
|
||||||
name = "test_client"
|
|
||||||
path = "src/test_client.rs"
|
|
||||||
# ./target/debug/test_client
|
|
||||||
@@ -2,6 +2,7 @@ mod init;
|
|||||||
mod mqtt;
|
mod mqtt;
|
||||||
mod run_test;
|
mod run_test;
|
||||||
mod unix_fd;
|
mod unix_fd;
|
||||||
|
mod util;
|
||||||
|
|
||||||
use crate::mqtt::start_broker;
|
use crate::mqtt::start_broker;
|
||||||
use crate::unix_fd::SignerLoop;
|
use crate::unix_fd::SignerLoop;
|
||||||
@@ -10,7 +11,6 @@ use std::env;
|
|||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use vls_proxy::client::UnixClient;
|
use vls_proxy::client::UnixClient;
|
||||||
use vls_proxy::connection::{open_parent_fd, UnixConnection};
|
use vls_proxy::connection::{open_parent_fd, UnixConnection};
|
||||||
use vls_proxy::util::setup_logging;
|
|
||||||
|
|
||||||
pub struct Channel {
|
pub struct Channel {
|
||||||
pub sequence: u16,
|
pub sequence: u16,
|
||||||
@@ -18,12 +18,14 @@ pub struct Channel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Responses are received on the oneshot sender
|
/// Responses are received on the oneshot sender
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct ChannelRequest {
|
pub struct ChannelRequest {
|
||||||
pub message: Vec<u8>,
|
pub message: Vec<u8>,
|
||||||
pub reply_tx: oneshot::Sender<ChannelReply>,
|
pub reply_tx: oneshot::Sender<ChannelReply>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// mpsc reply
|
// mpsc reply
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct ChannelReply {
|
pub struct ChannelReply {
|
||||||
pub reply: Vec<u8>,
|
pub reply: Vec<u8>,
|
||||||
}
|
}
|
||||||
@@ -31,7 +33,7 @@ pub struct ChannelReply {
|
|||||||
fn main() -> anyhow::Result<()> {
|
fn main() -> anyhow::Result<()> {
|
||||||
let parent_fd = open_parent_fd();
|
let parent_fd = open_parent_fd();
|
||||||
|
|
||||||
setup_logging("hsmd ", "info");
|
util::setup_logging("hsmd ", "info");
|
||||||
let app = App::new("signer")
|
let app = App::new("signer")
|
||||||
.setting(AppSettings::NoAutoVersion)
|
.setting(AppSettings::NoAutoVersion)
|
||||||
.about("CLN:mqtt - connects to an embedded VLS over a MQTT connection")
|
.about("CLN:mqtt - connects to an embedded VLS over a MQTT connection")
|
||||||
@@ -49,7 +51,7 @@ fn main() -> anyhow::Result<()> {
|
|||||||
// Pretend to be the right version, given to us by an env var
|
// Pretend to be the right version, given to us by an env var
|
||||||
let version =
|
let version =
|
||||||
env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning");
|
env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning");
|
||||||
println!("{}", version);
|
log::info!("{}", version);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,12 +59,14 @@ fn main() -> anyhow::Result<()> {
|
|||||||
run_test::run_test();
|
run_test::run_test();
|
||||||
} else {
|
} else {
|
||||||
let (tx, rx) = mpsc::channel(1000);
|
let (tx, rx) = mpsc::channel(1000);
|
||||||
let runtime = start_broker(true, rx);
|
let (status_tx, _status_rx) = mpsc::channel(1000);
|
||||||
|
let runtime = start_broker(rx, status_tx, "sphinx-1");
|
||||||
runtime.block_on(async {
|
runtime.block_on(async {
|
||||||
init::connect(tx.clone()).await;
|
init::connect(tx.clone()).await;
|
||||||
// listen to reqs from CLN
|
// listen to reqs from CLN
|
||||||
let conn = UnixConnection::new(parent_fd);
|
let conn = UnixConnection::new(parent_fd);
|
||||||
let client = UnixClient::new(conn);
|
let client = UnixClient::new(conn);
|
||||||
|
// TODO pass status_rx into SignerLoop
|
||||||
let mut signer_loop = SignerLoop::new(client, tx);
|
let mut signer_loop = SignerLoop::new(client, tx);
|
||||||
signer_loop.start();
|
signer_loop.start();
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -1,60 +1,72 @@
|
|||||||
use crate::{ChannelReply, ChannelRequest};
|
use crate::{ChannelReply, ChannelRequest};
|
||||||
use librumqttd::{async_locallink::construct_broker, Config};
|
use librumqttd::{
|
||||||
|
async_locallink,
|
||||||
|
consolelink::{self, ConsoleLink},
|
||||||
|
rumqttlog::router::ConnectionMetrics,
|
||||||
|
Config,
|
||||||
|
};
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use std::time::Duration;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
|
||||||
const SUB_TOPIC: &str = "sphinx-return";
|
const SUB_TOPIC: &str = "sphinx-return";
|
||||||
const PUB_TOPIC: &str = "sphinx";
|
const PUB_TOPIC: &str = "sphinx";
|
||||||
|
const USERNAME: &str = "sphinx-key";
|
||||||
|
const PASSWORD: &str = "sphinx-key-pass";
|
||||||
|
|
||||||
pub fn start_broker(
|
pub fn start_broker(
|
||||||
wait_for_ready_message: bool,
|
|
||||||
mut receiver: mpsc::Receiver<ChannelRequest>,
|
mut receiver: mpsc::Receiver<ChannelRequest>,
|
||||||
|
status_sender: mpsc::Sender<bool>,
|
||||||
|
expected_client_id: &str,
|
||||||
) -> tokio::runtime::Runtime {
|
) -> tokio::runtime::Runtime {
|
||||||
let config: Config = confy::load_path("config/rumqttd.conf").unwrap();
|
let config = config();
|
||||||
|
let client_id = expected_client_id.to_string();
|
||||||
|
|
||||||
let (mut router, console, servers, builder) = construct_broker(config);
|
let (mut router, servers, builder) = async_locallink::construct(config.clone());
|
||||||
|
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
router.start().expect("could not start router");
|
router.start().expect("could not start router");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let mut client_connected = false;
|
||||||
|
|
||||||
let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
|
let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
|
||||||
rt_builder.enable_all();
|
rt_builder.enable_all();
|
||||||
let rt = rt_builder.build().unwrap();
|
let rt = rt_builder.build().unwrap();
|
||||||
rt.block_on(async {
|
rt.block_on(async {
|
||||||
// channel to block until READY received
|
|
||||||
let (ready_tx, ready_rx) = oneshot::channel();
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
|
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
|
||||||
mpsc::channel(1000);
|
mpsc::channel(1000);
|
||||||
let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap();
|
let (mut link_tx, mut link_rx) =
|
||||||
tx.subscribe([SUB_TOPIC]).await.unwrap();
|
builder.clone().connect("localclient", 200).await.unwrap();
|
||||||
|
link_tx.subscribe([SUB_TOPIC]).await.unwrap();
|
||||||
|
|
||||||
let console_task = tokio::spawn(console);
|
let router_tx = builder.router_tx();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let config = config.clone().into();
|
||||||
|
let router_tx = router_tx.clone();
|
||||||
|
let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx));
|
||||||
|
loop {
|
||||||
|
let metrics = consolelink::request_metrics(console.clone(), client_id.clone());
|
||||||
|
if let Some(c) = metrics_to_status(metrics, client_connected) {
|
||||||
|
client_connected = c;
|
||||||
|
status_sender
|
||||||
|
.send(c)
|
||||||
|
.await
|
||||||
|
.expect("couldnt send connection status");
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let sub_task = tokio::spawn(async move {
|
let sub_task = tokio::spawn(async move {
|
||||||
// ready message loop
|
while let Ok(message) = link_rx.recv().await {
|
||||||
// let ready_tx_ = ready_tx.clone();
|
|
||||||
if wait_for_ready_message {
|
|
||||||
loop {
|
|
||||||
let message = rx.recv().await.unwrap();
|
|
||||||
if let Some(payload) = message.payload.get(0) {
|
|
||||||
let content = String::from_utf8_lossy(&payload[..]);
|
|
||||||
log::info!("received message content: {}", content);
|
|
||||||
if content == "READY" {
|
|
||||||
ready_tx.send(true).expect("could not send ready");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
loop {
|
|
||||||
let message = rx.recv().await.unwrap();
|
|
||||||
// println!("T = {}, P = {:?}", message.topic, message.payload.len());
|
|
||||||
// println!("count {}", message.payload.len());
|
|
||||||
for payload in message.payload {
|
for payload in message.payload {
|
||||||
if let Err(e) = msg_tx.send(payload.to_vec()).await {
|
if let Err(e) = msg_tx.send(payload.to_vec()).await {
|
||||||
println!("pub err {:?}", e);
|
log::warn!("pub err {:?}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -62,12 +74,16 @@ pub fn start_broker(
|
|||||||
|
|
||||||
let relay_task = tokio::spawn(async move {
|
let relay_task = tokio::spawn(async move {
|
||||||
while let Some(msg) = receiver.recv().await {
|
while let Some(msg) = receiver.recv().await {
|
||||||
tx.publish(PUB_TOPIC, false, msg.message)
|
link_tx
|
||||||
|
.publish(PUB_TOPIC, false, msg.message)
|
||||||
.await
|
.await
|
||||||
.expect("could not mqtt pub");
|
.expect("could not mqtt pub");
|
||||||
let reply = msg_rx.recv().await.expect("could not unwrap msg_rx.recv()");
|
if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await {
|
||||||
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
|
if let Err(_) = msg.reply_tx.send(ChannelReply {
|
||||||
log::warn!("could not send on reply_tx");
|
reply: reply.unwrap(),
|
||||||
|
}) {
|
||||||
|
log::warn!("could not send on reply_tx");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -75,13 +91,81 @@ pub fn start_broker(
|
|||||||
servers.await;
|
servers.await;
|
||||||
sub_task.await.unwrap();
|
sub_task.await.unwrap();
|
||||||
relay_task.await.unwrap();
|
relay_task.await.unwrap();
|
||||||
console_task.await.unwrap();
|
|
||||||
});
|
});
|
||||||
if wait_for_ready_message {
|
|
||||||
log::info!("waiting for READY...");
|
|
||||||
ready_rx.await.expect("Could not receive from channel.");
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
rt
|
rt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option<bool> {
|
||||||
|
match metrics.tracker() {
|
||||||
|
Some(t) => {
|
||||||
|
// wait for subscription to be sure
|
||||||
|
if t.concrete_subscriptions_count() > 0 {
|
||||||
|
if !client_connected {
|
||||||
|
Some(true) // changed to true
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
if client_connected {
|
||||||
|
Some(false)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn config() -> Config {
|
||||||
|
use librumqttd::rumqttlog::Config as RouterConfig;
|
||||||
|
use librumqttd::{
|
||||||
|
ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings,
|
||||||
|
};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::net::{Ipv4Addr, SocketAddrV4};
|
||||||
|
use std::path::PathBuf;
|
||||||
|
let id = 0;
|
||||||
|
let router = RouterConfig {
|
||||||
|
id,
|
||||||
|
dir: PathBuf::from("/tmp/rumqttd"),
|
||||||
|
max_segment_size: 10240,
|
||||||
|
max_segment_count: 10,
|
||||||
|
max_connections: 10001,
|
||||||
|
};
|
||||||
|
let mut servers = HashMap::new();
|
||||||
|
servers.insert(
|
||||||
|
"0".to_string(),
|
||||||
|
ServerSettings {
|
||||||
|
cert: None,
|
||||||
|
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(),
|
||||||
|
next_connection_delay_ms: 1,
|
||||||
|
connections: ConnectionSettings {
|
||||||
|
connection_timeout_ms: 5000,
|
||||||
|
max_client_id_len: 256,
|
||||||
|
throttle_delay_ms: 0,
|
||||||
|
max_payload_size: 5120,
|
||||||
|
max_inflight_count: 200,
|
||||||
|
max_inflight_size: 1024,
|
||||||
|
login_credentials: Some(vec![ConnectionLoginCredentials {
|
||||||
|
username: USERNAME.to_string(),
|
||||||
|
password: PASSWORD.to_string(),
|
||||||
|
}]),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
Config {
|
||||||
|
id,
|
||||||
|
servers,
|
||||||
|
router,
|
||||||
|
console: ConsoleSettings {
|
||||||
|
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030).into(),
|
||||||
|
},
|
||||||
|
cluster: None,
|
||||||
|
replicator: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ use tokio::sync::{mpsc, oneshot};
|
|||||||
use vls_protocol::serde_bolt::WireString;
|
use vls_protocol::serde_bolt::WireString;
|
||||||
use vls_protocol::{msgs, msgs::Message};
|
use vls_protocol::{msgs, msgs::Message};
|
||||||
|
|
||||||
|
const CLIENT_ID: &str = "test-1";
|
||||||
|
|
||||||
pub fn run_test() {
|
pub fn run_test() {
|
||||||
log::info!("TEST...");
|
log::info!("TEST...");
|
||||||
|
|
||||||
@@ -12,16 +14,31 @@ pub fn run_test() {
|
|||||||
let mut sequence = 1;
|
let mut sequence = 1;
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel(1000);
|
let (tx, rx) = mpsc::channel(1000);
|
||||||
let runtime = start_broker(true, rx);
|
let (status_tx, mut status_rx) = mpsc::channel(1000);
|
||||||
log::info!("======> READY received! start now");
|
let runtime = start_broker(rx, status_tx, CLIENT_ID);
|
||||||
runtime.block_on(async {
|
runtime.block_on(async {
|
||||||
|
let mut connected = false;
|
||||||
loop {
|
loop {
|
||||||
if let Err(e) = iteration(id, sequence, tx.clone()).await {
|
tokio::select! {
|
||||||
panic!("iteration failed {:?}", e);
|
status = status_rx.recv() => {
|
||||||
}
|
if let Some(connection_status) = status {
|
||||||
sequence = sequence.wrapping_add(1);
|
connected = connection_status;
|
||||||
id += 1;
|
id = 0;
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
sequence = 1;
|
||||||
|
log::info!("========> CONNECTED! {}", connection_status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res = iteration(id, sequence, tx.clone(), connected) => {
|
||||||
|
if let Err(e) = res {
|
||||||
|
log::warn!("===> iteration failed {:?}", e);
|
||||||
|
}
|
||||||
|
if connected {
|
||||||
|
sequence = sequence.wrapping_add(1);
|
||||||
|
id += 1;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -30,7 +47,12 @@ pub async fn iteration(
|
|||||||
id: u16,
|
id: u16,
|
||||||
sequence: u16,
|
sequence: u16,
|
||||||
tx: mpsc::Sender<ChannelRequest>,
|
tx: mpsc::Sender<ChannelRequest>,
|
||||||
|
connected: bool,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
if !connected {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
// log::info!("do a ping!");
|
||||||
let ping = msgs::Ping {
|
let ping = msgs::Ping {
|
||||||
id,
|
id,
|
||||||
message: WireString("ping".as_bytes().to_vec()),
|
message: WireString("ping".as_bytes().to_vec()),
|
||||||
@@ -42,7 +64,7 @@ pub async fn iteration(
|
|||||||
message: ping_bytes,
|
message: ping_bytes,
|
||||||
reply_tx,
|
reply_tx,
|
||||||
};
|
};
|
||||||
let _ = tx.send(request).await;
|
tx.send(request).await?;
|
||||||
let res = reply_rx.await?;
|
let res = reply_rx.await?;
|
||||||
let reply = parser::response_from_bytes(res.reply, sequence)?;
|
let reply = parser::response_from_bytes(res.reply, sequence)?;
|
||||||
match reply {
|
match reply {
|
||||||
|
|||||||
@@ -1,59 +0,0 @@
|
|||||||
|
|
||||||
use sphinx_key_parser::MsgDriver;
|
|
||||||
|
|
||||||
use tokio::{task, time};
|
|
||||||
use rumqttc::{self, AsyncClient, MqttOptions, QoS, Event, Packet};
|
|
||||||
use std::error::Error;
|
|
||||||
use std::time::Duration;
|
|
||||||
use vls_protocol::msgs;
|
|
||||||
|
|
||||||
#[tokio::main(worker_threads = 1)]
|
|
||||||
async fn main() -> Result<(), Box<dyn Error>> {
|
|
||||||
pretty_env_logger::init();
|
|
||||||
// color_backtrace::install();
|
|
||||||
|
|
||||||
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
|
|
||||||
mqttoptions.set_credentials("sphinx-key", "sphinx-key-pass");
|
|
||||||
mqttoptions.set_keep_alive(Duration::from_secs(5));
|
|
||||||
|
|
||||||
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
|
|
||||||
task::spawn(async move {
|
|
||||||
requests(client).await;
|
|
||||||
time::sleep(Duration::from_secs(3)).await;
|
|
||||||
});
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let event = eventloop.poll().await;
|
|
||||||
// println!("{:?}", event.unwrap());
|
|
||||||
if let Event::Incoming(packet) = event.unwrap() {
|
|
||||||
if let Packet::Publish(p) = packet {
|
|
||||||
let mut m = MsgDriver::new(p.payload.to_vec());
|
|
||||||
let (sequence, dbid) = msgs::read_serial_request_header(&mut m).expect("read ping header");
|
|
||||||
assert_eq!(dbid, 0);
|
|
||||||
assert_eq!(sequence, 0);
|
|
||||||
let ping: msgs::Ping =
|
|
||||||
msgs::read_message(&mut m).expect("failed to read ping message");
|
|
||||||
println!("INCOMING: {:?}", ping);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn requests(client: AsyncClient) {
|
|
||||||
|
|
||||||
client
|
|
||||||
.subscribe("sphinx", QoS::AtMostOnce)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
for _ in 1..=10 {
|
|
||||||
client
|
|
||||||
.publish("trigger", QoS::AtMostOnce, false, vec![1; 1])
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
time::sleep(Duration::from_secs(1)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
time::sleep(Duration::from_secs(120)).await;
|
|
||||||
}
|
|
||||||
34
broker/src/util.rs
Normal file
34
broker/src/util.rs
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
use std::env;
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
pub fn setup_logging(who: &str, level_arg: &str) {
|
||||||
|
use fern::colors::{Color, ColoredLevelConfig};
|
||||||
|
let colors = ColoredLevelConfig::new()
|
||||||
|
.info(Color::Green)
|
||||||
|
.error(Color::Red)
|
||||||
|
.warn(Color::Yellow);
|
||||||
|
let level = env::var("RUST_LOG").unwrap_or(level_arg.to_string());
|
||||||
|
let who = who.to_string();
|
||||||
|
fern::Dispatch::new()
|
||||||
|
.format(move |out, message, record| {
|
||||||
|
out.finish(format_args!(
|
||||||
|
"[{} {}/{} {}] {}",
|
||||||
|
chrono::Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"),
|
||||||
|
who,
|
||||||
|
record.target(),
|
||||||
|
colors.color(record.level()),
|
||||||
|
message
|
||||||
|
))
|
||||||
|
})
|
||||||
|
.level(log::LevelFilter::from_str(&level).expect("level"))
|
||||||
|
.level_for("h2", log::LevelFilter::Info)
|
||||||
|
.level_for("sled", log::LevelFilter::Info)
|
||||||
|
.level_for(
|
||||||
|
"librumqttd::rumqttlog::router::router",
|
||||||
|
log::LevelFilter::Warn,
|
||||||
|
)
|
||||||
|
.chain(std::io::stdout())
|
||||||
|
// .chain(fern::log_file("/tmp/output.log")?)
|
||||||
|
.apply()
|
||||||
|
.expect("log config");
|
||||||
|
}
|
||||||
@@ -20,97 +20,116 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||||||
.setting(AppSettings::NoAutoVersion)
|
.setting(AppSettings::NoAutoVersion)
|
||||||
.about("CLN:mqtt-tester - MQTT client signer")
|
.about("CLN:mqtt-tester - MQTT client signer")
|
||||||
.arg(Arg::from("--test run a test against the embedded device"));
|
.arg(Arg::from("--test run a test against the embedded device"));
|
||||||
|
let matches = app.get_matches();
|
||||||
let mut try_i = 0;
|
let is_test = matches.is_present("test");
|
||||||
let (client, mut eventloop) = loop {
|
// main loop - alternate between "reconnection" and "handler"
|
||||||
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
|
loop {
|
||||||
mqttoptions.set_credentials(USERNAME, PASSWORD);
|
let mut try_i = 0;
|
||||||
mqttoptions.set_keep_alive(Duration::from_secs(5));
|
let (client, mut eventloop) = loop {
|
||||||
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
|
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
|
||||||
match eventloop.poll().await {
|
mqttoptions.set_credentials(USERNAME, PASSWORD);
|
||||||
Ok(event) => {
|
mqttoptions.set_keep_alive(Duration::from_secs(5));
|
||||||
if let Some(_) = incoming_conn_ack(event) {
|
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
|
||||||
println!("==========> MQTT connected!");
|
match eventloop.poll().await {
|
||||||
break (client, eventloop);
|
Ok(event) => {
|
||||||
|
if let Some(_) = incoming_conn_ack(event) {
|
||||||
|
println!("==========> MQTT connected!");
|
||||||
|
break (client, eventloop);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
try_i = try_i + 1;
|
||||||
|
println!("reconnect.... {}", try_i);
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {
|
|
||||||
try_i = try_i + 1;
|
|
||||||
println!("reconnect.... {}", try_i);
|
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
client
|
|
||||||
.subscribe(SUB_TOPIC, QoS::AtMostOnce)
|
|
||||||
.await
|
|
||||||
.expect("could not mqtt subscribe");
|
|
||||||
|
|
||||||
client
|
|
||||||
.publish(
|
|
||||||
PUB_TOPIC,
|
|
||||||
QoS::AtMostOnce,
|
|
||||||
false,
|
|
||||||
"READY".as_bytes().to_vec(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("could not pub");
|
|
||||||
|
|
||||||
let matches = app.get_matches();
|
|
||||||
if matches.is_present("test") {
|
|
||||||
loop {
|
|
||||||
let event = eventloop.poll().await.expect("failed to unwrap event");
|
|
||||||
// println!("{:?}", event);
|
|
||||||
if let Some(ping_bytes) = incoming_bytes(event) {
|
|
||||||
let (ping, sequence, dbid): (msgs::Ping, u16, u64) =
|
|
||||||
parser::request_from_bytes(ping_bytes).expect("read ping header");
|
|
||||||
println!("sequence {}", sequence);
|
|
||||||
println!("dbid {}", dbid);
|
|
||||||
println!("INCOMING: {:?}", ping);
|
|
||||||
let pong = msgs::Pong {
|
|
||||||
id: ping.id,
|
|
||||||
message: ping.message,
|
|
||||||
};
|
|
||||||
let bytes = parser::raw_response_from_msg(pong, sequence)
|
|
||||||
.expect("couldnt parse raw response");
|
|
||||||
client
|
|
||||||
.publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes)
|
|
||||||
.await
|
|
||||||
.expect("could not mqtt publish");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// once the init loop is done, the root_handler is returned
|
|
||||||
let root_handler = loop {
|
|
||||||
let init_event = eventloop.poll().await.expect("failed to unwrap event");
|
|
||||||
// this may be another kind of message like MQTT ConnAck
|
|
||||||
// loop around again and wait for the init
|
|
||||||
if let Some(init_msg_bytes) = incoming_bytes(init_event) {
|
|
||||||
let InitResponse {
|
|
||||||
root_handler,
|
|
||||||
init_reply,
|
|
||||||
} = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer");
|
|
||||||
client
|
|
||||||
.publish(PUB_TOPIC, QoS::AtMostOnce, false, init_reply)
|
|
||||||
.await
|
|
||||||
.expect("could not publish init response");
|
|
||||||
// return the root_handler and finish the init loop
|
|
||||||
break root_handler;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
// the actual loop
|
|
||||||
loop {
|
client
|
||||||
let event = eventloop.poll().await.expect("failed to unwrap event");
|
.subscribe(SUB_TOPIC, QoS::AtMostOnce)
|
||||||
let dummy_peer = PubKey([0; 33]);
|
.await
|
||||||
if let Some(msg_bytes) = incoming_bytes(event) {
|
.expect("could not mqtt subscribe");
|
||||||
match sphinx_key_signer::handle(&root_handler, msg_bytes, dummy_peer.clone()) {
|
|
||||||
Ok(b) => client
|
if is_test {
|
||||||
.publish(PUB_TOPIC, QoS::AtMostOnce, false, b)
|
// test handler loop
|
||||||
.await
|
loop {
|
||||||
.expect("could not publish init response"),
|
match eventloop.poll().await {
|
||||||
Err(e) => panic!("HANDLE FAILED {:?}", e),
|
Ok(event) => {
|
||||||
};
|
// println!("{:?}", event);
|
||||||
|
if let Some(ping_bytes) = incoming_bytes(event) {
|
||||||
|
let (ping, sequence, dbid): (msgs::Ping, u16, u64) =
|
||||||
|
parser::request_from_bytes(ping_bytes).expect("read ping header");
|
||||||
|
println!("sequence {}", sequence);
|
||||||
|
println!("dbid {}", dbid);
|
||||||
|
println!("INCOMING: {:?}", ping);
|
||||||
|
let pong = msgs::Pong {
|
||||||
|
id: ping.id,
|
||||||
|
message: ping.message,
|
||||||
|
};
|
||||||
|
let bytes = parser::raw_response_from_msg(pong, sequence)
|
||||||
|
.expect("couldnt parse raw response");
|
||||||
|
client
|
||||||
|
.publish(PUB_TOPIC, QoS::AtMostOnce, false, bytes)
|
||||||
|
.await
|
||||||
|
.expect("could not mqtt publish");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("diconnected {:?}", e);
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
break; // break out of this loop to reconnect
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// once the init loop is done, the root_handler is returned
|
||||||
|
let root_handler = loop {
|
||||||
|
if let Ok(init_event) = eventloop.poll().await {
|
||||||
|
// this may be another kind of message like MQTT ConnAck
|
||||||
|
// loop around again and wait for the init
|
||||||
|
if let Some(init_msg_bytes) = incoming_bytes(init_event) {
|
||||||
|
let InitResponse {
|
||||||
|
root_handler,
|
||||||
|
init_reply,
|
||||||
|
} = sphinx_key_signer::init(init_msg_bytes).expect("failed to init signer");
|
||||||
|
client
|
||||||
|
.publish(PUB_TOPIC, QoS::AtMostOnce, false, init_reply)
|
||||||
|
.await
|
||||||
|
.expect("could not publish init response");
|
||||||
|
// return the root_handler and finish the init loop
|
||||||
|
break Some(root_handler);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
log::warn!("failed to initialize! Lost connection");
|
||||||
|
break None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// the actual handler loop
|
||||||
|
loop {
|
||||||
|
if let Some(rh) = &root_handler {
|
||||||
|
match eventloop.poll().await {
|
||||||
|
Ok(event) => {
|
||||||
|
let dummy_peer = PubKey([0; 33]);
|
||||||
|
if let Some(msg_bytes) = incoming_bytes(event) {
|
||||||
|
match sphinx_key_signer::handle(rh, msg_bytes, dummy_peer.clone()) {
|
||||||
|
Ok(b) => client
|
||||||
|
.publish(PUB_TOPIC, QoS::AtMostOnce, false, b)
|
||||||
|
.await
|
||||||
|
.expect("could not publish init response"),
|
||||||
|
Err(e) => panic!("HANDLE FAILED {:?}", e),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("diconnected {:?}", e);
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
break; // break out of this loop to reconnect
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user