From 06043fa60d13b24d1e33e83ec23ea8f18d2a6ce9 Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Fri, 10 Feb 2023 13:40:25 -0800 Subject: [PATCH] cmd testing --- broker/src/main.rs | 1 - broker/src/mqtt.rs | 36 ++++++++++++++---------- broker/src/routes.rs | 1 + broker/src/run_test.rs | 63 +++++++++++++++++++++--------------------- broker/src/util.rs | 2 ++ tester/cmd.json | 6 +--- tester/src/main.rs | 2 +- 7 files changed, 58 insertions(+), 53 deletions(-) diff --git a/broker/src/main.rs b/broker/src/main.rs index 401833f..7e736e0 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -102,7 +102,6 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { log::info!("=> start broker on network: {}", settings.network); start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings) - .await .expect("BROKER FAILED TO START"); log::info!("=> wait for connected status"); // wait for connection = true diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 4333cf9..50d069c 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -9,7 +9,7 @@ use std::time::Duration; // must get a reply within this time, or disconnects const REPLY_TIMEOUT_MS: u64 = 10000; -pub async fn start_broker( +pub fn start_broker( mut receiver: mpsc::Receiver, status_sender: mpsc::Sender, error_sender: broadcast::Sender>, @@ -19,21 +19,24 @@ pub async fn start_broker( let conf = config(settings); let client_id = expected_client_id.to_string(); - let broker = Broker::new(conf); + let mut broker = Broker::new(conf); let mut alerts = broker .alerts(vec![ // "/alerts/error/+".to_string(), "/alerts/event/connect/+".to_string(), "/alerts/event/disconnect/+".to_string(), - ]) - .unwrap(); + ])?; + let (mut link_tx, mut link_rx) = broker.link("localclient")?; + std::thread::spawn(move || { + broker.start().expect("could not start broker"); + }); // connected/disconnected status alerts let status_sender_ = status_sender.clone(); let _alerts_handle = tokio::spawn(async move { loop { let alert = alerts.poll(); - println!("Alert: {alert:?}"); + println!("Alert: {:?}", alert); match alert.1 { Alert::Event(cid, event) => { if cid == client_id { @@ -48,12 +51,11 @@ pub async fn start_broker( } _ => (), } - tokio::time::sleep(Duration::from_millis(333)).await; + tokio::time::sleep(Duration::from_millis(40)).await; } }); // msg forwarding - let (mut link_tx, mut link_rx) = broker.link("localclient")?; let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = mpsc::channel(1000); link_tx.subscribe(topics::VLS_RETURN)?; @@ -61,14 +63,17 @@ pub async fn start_broker( link_tx.subscribe(topics::ERROR)?; let _sub_task = tokio::spawn(async move { while let Ok(message) = link_rx.recv() { - println!("MESG RECEIVED!!!!!! {:?}", message); + println!("GOT A MSG ON LINK RX"); if let Some(n) = message { match n { Notification::Forward(f) => { + println!("GOT A FORWARDED MSG! FORWARD!"); if f.publish.topic == topics::ERROR { let _ = error_sender.send(f.publish.topic.to_vec()); } else { - let _ = msg_tx.send(f.publish.payload.to_vec()).await; + if let Err(e) = msg_tx.send(f.publish.payload.to_vec()).await { + log::error!("failed to pub to msg_tx! {:?}", e); + } } } _ => (), @@ -78,9 +83,13 @@ pub async fn start_broker( }); let _relay_task = tokio::spawn(async move { while let Some(msg) = receiver.recv().await { - let _ = link_tx.publish(msg.topic, msg.message); + println!("YO YO YO got a receiver msg! {:?}", msg); + if let Err(e) = link_tx.publish(msg.topic, msg.message) { + log::error!("failed to pub to link_tx! {:?}", e); + } match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await { Ok(reply) => { + println!("send on channelreply!"); if let Err(_) = msg.reply_tx.send(ChannelReply { reply: reply.unwrap(), }) { @@ -95,9 +104,8 @@ pub async fn start_broker( } }); - println!("wait..."); - tokio::time::sleep(Duration::from_secs(2)).await; - println!("done waiting"); + std::thread::sleep(Duration::from_secs(1)); + // alerts_handle.await?; // sub_task.await?; // relay_task.await?; @@ -130,7 +138,7 @@ fn config(settings: Settings) -> Config { max_inflight_size: 1024, auth: None, sphinx_auth: Some(SphinxLoginCredentials { within: None }), - dynamic_filters: false, + dynamic_filters: true, }, tls: None, }, diff --git a/broker/src/routes.rs b/broker/src/routes.rs index 71dce30..01a5920 100644 --- a/broker/src/routes.rs +++ b/broker/src/routes.rs @@ -22,6 +22,7 @@ pub async fn control(sender: &State>, msg: &str) -> Resul if message.len() < 65 { return Err(Error::Fail); } + println!("/control? got hit!"); let (request, reply_rx) = ChannelRequest::new(topics::CONTROL, message); // send to ESP let _ = sender.send(request).await.map_err(|_| Error::Fail)?; diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index de1fc21..e3c19a4 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -12,48 +12,47 @@ const CLIENT_ID: &str = "test-1"; pub async fn run_test() -> rocket::Rocket { log::info!("TEST..."); - let mut id = 0u16; - let mut sequence = 1; + // let mut id = 0u16; + // let mut sequence = 1; let settings = Settings::default(); let (tx, rx) = mpsc::channel(1000); - let (status_tx, mut status_rx) = mpsc::channel(1000); + let (status_tx, _status_rx) = mpsc::channel(1000); let (error_tx, error_rx) = broadcast::channel(1000); crate::error_log::log_errors(error_rx); start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings) - .await .expect("FAILED TO START BROKER"); log::info!("BROKER started!"); - let mut connected = false; - let tx_ = tx.clone(); - tokio::spawn(async move { - loop { - tokio::select! { - status = status_rx.recv() => { - if let Some(connection_status) = status { - connected = connection_status; - id = 0; - sequence = 1; - log::info!("========> CONNECTED! {}", connection_status); - } - } - res = iteration(id, sequence, tx_.clone(), connected) => { - if let Err(e) = res { - log::warn!("===> iteration failed {:?}", e); - // connected = false; - // id = 0; - // sequence = 1; - } else if connected { - sequence = sequence.wrapping_add(1); - id += 1; - } - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - }; - } - }); + // let mut connected = false; + // let tx_ = tx.clone(); + // tokio::spawn(async move { + // loop { + // tokio::select! { + // status = status_rx.recv() => { + // if let Some(connection_status) = status { + // connected = connection_status; + // id = 0; + // sequence = 1; + // log::info!("========> CONNECTED! {}", connection_status); + // } + // } + // res = iteration(id, sequence, tx_.clone(), connected) => { + // if let Err(e) = res { + // log::warn!("===> iteration failed {:?}", e); + // // connected = false; + // // id = 0; + // // sequence = 1; + // } else if connected { + // sequence = sequence.wrapping_add(1); + // id += 1; + // } + // tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + // } + // }; + // } + // }); launch_rocket(tx, error_tx, settings) } diff --git a/broker/src/util.rs b/broker/src/util.rs index 8b15b1f..c756f3b 100644 --- a/broker/src/util.rs +++ b/broker/src/util.rs @@ -85,6 +85,8 @@ pub fn setup_logging(who: &str, level_arg: &str) { .level_for("sled", log::LevelFilter::Info) .level_for("rumqttd", log::LevelFilter::Warn) .level_for("rocket", log::LevelFilter::Warn) + .level_for("tracing", log::LevelFilter::Warn) + .level_for("_", log::LevelFilter::Warn) .chain(std::io::stdout()) // .chain(fern::log_file("/tmp/output.log")?) .apply() diff --git a/tester/cmd.json b/tester/cmd.json index 8a99e7d..9aa1ecf 100644 --- a/tester/cmd.json +++ b/tester/cmd.json @@ -1,7 +1,3 @@ { - "type": "Ota", - "content": { - "url": "http://192.168.1.10/sphinx-update-", - "version": 0 - } + "type": "GetNonce" } diff --git a/tester/src/main.rs b/tester/src/main.rs index 64bf516..16a42c0 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -174,7 +174,7 @@ async fn run_test( loop { match eventloop.poll().await { Ok(event) => { - // println!("{:?}", event); + println!("{:?}", event); if let Some((topic, msg_bytes)) = incoming_bytes(event) { match topic.as_str() { topics::VLS => {