diff --git a/broker/Cargo.toml b/broker/Cargo.toml index a48e65a..deb5652 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -6,7 +6,8 @@ default-run = "sphinx-key-broker" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rumqttd = "0.11.0" +# rumqttd = "0.11.0" +rumqttd = { path = "../../rumqtt/rumqttd" } pretty_env_logger = "0.4.0" confy = "0.4.0" tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } @@ -20,6 +21,7 @@ fern = { version = "0.6", features = ["colored"] } rumqttc = "0.12.0" clap = "=3.0.0-beta.2" clap_derive = "=3.0.0-beta.5" +chrono = "0.4" [features] default = ["std"] diff --git a/broker/src/main.rs b/broker/src/main.rs index 8c83ac8..ad392cb 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,15 +1,15 @@ mod mqtt; mod run_test; mod unix_fd; +mod util; +use crate::mqtt::start_broker; use crate::unix_fd::SignerLoop; use clap::{App, AppSettings, Arg}; -use crate::mqtt::start_broker; use std::env; use tokio::sync::{mpsc, oneshot}; use vls_proxy::client::UnixClient; use vls_proxy::connection::{open_parent_fd, UnixConnection}; -use vls_proxy::util::setup_logging; pub struct Channel { pub sequence: u16, @@ -30,7 +30,12 @@ pub struct ChannelReply { fn main() -> anyhow::Result<()> { let parent_fd = open_parent_fd(); - setup_logging("hsmd ", "info"); + /* + simple_logger::SimpleLogger::new() + .with_utc_timestamps() + .with_module_level("async_io", log::LevelFilter::Off) + */ + util::setup_logging("hsmd ", "info"); let app = App::new("signer") .setting(AppSettings::NoAutoVersion) .about("CLN:mqtt - connects to an embedded VLS over a MQTT connection") @@ -56,7 +61,8 @@ fn main() -> anyhow::Result<()> { run_test::run_test(); } else { let (tx, rx) = mpsc::channel(1000); - let _runtime = start_broker(true, rx); + let (status_tx, status_rx) = mpsc::channel(1000); + let _runtime = start_broker(rx, status_tx, "sphinx-1"); // listen to reqs from CLN let conn = UnixConnection::new(parent_fd); let client = UnixClient::new(conn); diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 8ad5fce..712aa74 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,63 +1,118 @@ use crate::{ChannelReply, ChannelRequest}; -use librumqttd::{async_locallink::construct_broker, Config}; +use librumqttd::{ + async_locallink::construct_broker, + consolelink::{self, ConsoleLink}, + Config, +}; +use std::sync::Arc; use std::thread; -use tokio::sync::{mpsc, oneshot}; +use std::time::Duration; +use tokio::sync::mpsc; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; pub fn start_broker( - wait_for_ready_message: bool, mut receiver: mpsc::Receiver, + status_sender: mpsc::Sender, + expected_client_id: &str, ) -> tokio::runtime::Runtime { let config: Config = confy::load_path("config/rumqttd.conf").unwrap(); + let client_id = expected_client_id.to_string(); - let (mut router, console, servers, builder) = construct_broker(config); + let (mut router, servers, builder) = construct_broker(config.clone()); thread::spawn(move || { router.start().expect("could not start router"); }); + let mut client_connected = false; + + // let (status_tx, mut status_rx): (mpsc::Sender, mpsc::Receiver) = + // mpsc::channel(1000); + let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); rt_builder.enable_all(); let rt = rt_builder.build().unwrap(); rt.block_on(async { - // channel to block until READY received - let (ready_tx, ready_rx) = oneshot::channel(); tokio::spawn(async move { let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = mpsc::channel(1000); - let (mut tx, mut rx) = builder.connect("localclient", 200).await.unwrap(); + let (mut tx, mut rx) = builder.clone().connect("localclient", 200).await.unwrap(); tx.subscribe([SUB_TOPIC]).await.unwrap(); - let console_task = tokio::spawn(console); + let router_tx = builder.router_tx(); + tokio::spawn(async move { + let config = config.clone().into(); + let router_tx = router_tx.clone(); + let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); + loop { + let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); + match metrics.tracker() { + Some(t) => { + // wait for subscription to be sure + if t.concrete_subscriptions_len() > 0 { + if !client_connected { + println!("CLIENT CONNECTED!"); + client_connected = true; + status_sender + .send(true) + .await + .expect("couldnt send true statu"); + } + } + } + None => { + if client_connected { + println!("CLIENT DIsCONNECTED!"); + client_connected = false; + status_sender + .send(false) + .await + .expect("couldnt send false status"); + } + } + } + tokio::time::sleep(Duration::from_millis(850)).await; + } + }); let sub_task = tokio::spawn(async move { // ready message loop // let ready_tx_ = ready_tx.clone(); - if wait_for_ready_message { + loop { + // wait for CONNECTED + // loop { + // let status = status_rx.recv().await.unwrap(); + // if status { + // break; + // } + // } + // now wait for READY + // loop { + // let message = rx.recv().await.unwrap(); + // if let Some(payload) = message.payload.get(0) { + // let content = String::from_utf8_lossy(&payload[..]); + // log::info!("received message content: {}", content); + // if content == "READY" { + // // ready_tx.send(true).expect("could not send ready"); + // break; + // } + // } + // } + // now start parsing... or break for DISCONNECT + println!("OK START PARSING!"); loop { let message = rx.recv().await.unwrap(); - if let Some(payload) = message.payload.get(0) { - let content = String::from_utf8_lossy(&payload[..]); - log::info!("received message content: {}", content); - if content == "READY" { - ready_tx.send(true).expect("could not send ready"); - break; + println!("T = {}, P = {:?}", message.topic, message.payload.len()); + // println!("count {}", message.payload.len()); + for payload in message.payload { + if let Err(e) = msg_tx.send(payload.to_vec()).await { + println!("pub err {:?}", e); } } } } - loop { - let message = rx.recv().await.unwrap(); - // println!("T = {}, P = {:?}", message.topic, message.payload.len()); - // println!("count {}", message.payload.len()); - for payload in message.payload { - if let Err(e) = msg_tx.send(payload.to_vec()).await { - println!("pub err {:?}", e); - } - } - } }); let relay_task = tokio::spawn(async move { @@ -75,12 +130,7 @@ pub fn start_broker( servers.await; sub_task.await.unwrap(); relay_task.await.unwrap(); - console_task.await.unwrap(); }); - if wait_for_ready_message { - log::info!("waiting for READY..."); - ready_rx.await.expect("Could not receive from channel."); - } }); rt diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index f856ac9..865d570 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -12,16 +12,30 @@ pub fn run_test() { let mut sequence = 1; let (tx, rx) = mpsc::channel(1000); - let runtime = start_broker(true, rx); + let (status_tx, mut status_rx) = mpsc::channel(1000); + let runtime = start_broker(rx, status_tx, "test-1"); log::info!("======> READY received! start now"); runtime.block_on(async { + let mut connected = false; loop { - if let Err(e) = iteration(id, sequence, tx.clone()).await { - panic!("iteration failed {:?}", e); - } - sequence = sequence.wrapping_add(1); - id += 1; - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + tokio::select! { + status = status_rx.recv() => { + // println!("got a status"); + if let Some(connection_status) = status { + connected = connected; + println!("========> CONNETED! {}", connection_status); + } + } + res = iteration(id, sequence, tx.clone(), connected) => { + println!("iteration! connected: {}", connected); + if let Err(e) = res { + panic!("iteration failed {:?}", e); + } + sequence = sequence.wrapping_add(1); + id += 1; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + }; } }); } @@ -30,7 +44,12 @@ pub async fn iteration( id: u16, sequence: u16, tx: mpsc::Sender, + connected: bool, ) -> anyhow::Result<()> { + if !connected { + return Ok(()); + } + println!("do a ping!"); let ping = msgs::Ping { id, message: WireString("ping".as_bytes().to_vec()), diff --git a/broker/src/util.rs b/broker/src/util.rs new file mode 100644 index 0000000..a2ae7d7 --- /dev/null +++ b/broker/src/util.rs @@ -0,0 +1,34 @@ +use std::env; +use std::str::FromStr; + +pub fn setup_logging(who: &str, level_arg: &str) { + use fern::colors::{Color, ColoredLevelConfig}; + let colors = ColoredLevelConfig::new() + .info(Color::Green) + .error(Color::Red) + .warn(Color::Yellow); + let level = env::var("RUST_LOG").unwrap_or(level_arg.to_string()); + let who = who.to_string(); + fern::Dispatch::new() + .format(move |out, message, record| { + out.finish(format_args!( + "[{} {}/{} {}] {}", + chrono::Local::now().format("%Y-%m-%dT%H:%M:%S%.3f"), + who, + record.target(), + colors.color(record.level()), + message + )) + }) + .level(log::LevelFilter::from_str(&level).expect("level")) + .level_for("h2", log::LevelFilter::Info) + .level_for("sled", log::LevelFilter::Info) + .level_for( + "librumqttd::rumqttlog::router::router", + log::LevelFilter::Warn, + ) + .chain(std::io::stdout()) + // .chain(fern::log_file("/tmp/output.log")?) + .apply() + .expect("log config"); +} diff --git a/tester/src/main.rs b/tester/src/main.rs index 43ebe9c..df9fe5b 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -47,15 +47,15 @@ async fn main() -> Result<(), Box> { .await .expect("could not mqtt subscribe"); - client - .publish( - PUB_TOPIC, - QoS::AtMostOnce, - false, - "READY".as_bytes().to_vec(), - ) - .await - .expect("could not pub"); + // client + // .publish( + // PUB_TOPIC, + // QoS::AtMostOnce, + // false, + // "READY".as_bytes().to_vec(), + // ) + // .await + // .expect("could not pub"); let matches = app.get_matches(); if matches.is_present("test") {