diff --git a/broker/src/main.rs b/broker/src/main.rs index 2752780..401833f 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -101,7 +101,9 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { error_log::log_errors(error_rx); log::info!("=> start broker on network: {}", settings.network); - start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings).await; + start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings) + .await + .expect("BROKER FAILED TO START"); log::info!("=> wait for connected status"); // wait for connection = true let status = status_rx.recv().await.expect("couldnt receive"); diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index c3f2d09..4333cf9 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -1,6 +1,5 @@ use crate::util::Settings; use crate::{ChannelReply, ChannelRequest}; -use rocket::tokio::task; use rocket::tokio::time::timeout; use rocket::tokio::{self, sync::broadcast, sync::mpsc}; use rumqttd::{Alert, AlertEvent, Broker, Config, Notification}; @@ -31,7 +30,7 @@ pub async fn start_broker( // connected/disconnected status alerts let status_sender_ = status_sender.clone(); - let alerts_handle = tokio::spawn(async move { + let _alerts_handle = tokio::spawn(async move { loop { let alert = alerts.poll(); println!("Alert: {alert:?}"); @@ -60,8 +59,9 @@ pub async fn start_broker( link_tx.subscribe(topics::VLS_RETURN)?; link_tx.subscribe(topics::CONTROL_RETURN)?; link_tx.subscribe(topics::ERROR)?; - let sub_task = tokio::spawn(async move { + let _sub_task = tokio::spawn(async move { while let Ok(message) = link_rx.recv() { + println!("MESG RECEIVED!!!!!! {:?}", message); if let Some(n) = message { match n { Notification::Forward(f) => { @@ -76,9 +76,9 @@ pub async fn start_broker( } } }); - let relay_task = tokio::spawn(async move { + let _relay_task = tokio::spawn(async move { while let Some(msg) = receiver.recv().await { - link_tx.publish(msg.topic, msg.message); + let _ = link_tx.publish(msg.topic, msg.message); match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await { Ok(reply) => { if let Err(_) = msg.reply_tx.send(ChannelReply { @@ -95,9 +95,12 @@ pub async fn start_broker( } }); - alerts_handle.await?; - sub_task.await?; - relay_task.await?; + println!("wait..."); + tokio::time::sleep(Duration::from_secs(2)).await; + println!("done waiting"); + // alerts_handle.await?; + // sub_task.await?; + // relay_task.await?; Ok(()) } diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index add9211..de1fc21 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -3,9 +3,9 @@ use crate::routes::launch_rocket; use crate::util::Settings; use crate::ChannelRequest; use rocket::tokio::{self, sync::broadcast, sync::mpsc}; +use sphinx_signer::{parser, sphinx_glyph::topics}; use vls_protocol::serde_bolt::WireString; use vls_protocol::{msgs, msgs::Message}; -use sphinx_signer::{parser, sphinx_glyph::topics}; const CLIENT_ID: &str = "test-1"; @@ -22,7 +22,10 @@ pub async fn run_test() -> rocket::Rocket { let (error_tx, error_rx) = broadcast::channel(1000); crate::error_log::log_errors(error_rx); - start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings).await; + start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings) + .await + .expect("FAILED TO START BROKER"); + log::info!("BROKER started!"); let mut connected = false; let tx_ = tx.clone(); tokio::spawn(async move { diff --git a/broker/src/util.rs b/broker/src/util.rs index 670d3e7..8b15b1f 100644 --- a/broker/src/util.rs +++ b/broker/src/util.rs @@ -83,11 +83,8 @@ pub fn setup_logging(who: &str, level_arg: &str) { .level(log::LevelFilter::from_str(&level).expect("level")) .level_for("h2", log::LevelFilter::Info) .level_for("sled", log::LevelFilter::Info) - .level_for("librumqttd::async_locallink", log::LevelFilter::Error) - .level_for( - "librumqttd::rumqttlog::router::router", - log::LevelFilter::Warn, - ) + .level_for("rumqttd", log::LevelFilter::Warn) + .level_for("rocket", log::LevelFilter::Warn) .chain(std::io::stdout()) // .chain(fern::log_file("/tmp/output.log")?) .apply()