diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 434467f..c848600 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -21,6 +21,7 @@ rumqttc = "0.12.0" clap = "=3.0.0-beta.2" clap_derive = "=3.0.0-beta.5" chrono = "0.4" +once_cell = "1.12.0" [features] default = ["std"] diff --git a/broker/rust-toolchain.toml b/broker/rust-toolchain.toml new file mode 100644 index 0000000..fb33ae1 --- /dev/null +++ b/broker/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] + +channel = "nightly" \ No newline at end of file diff --git a/broker/src/main.rs b/broker/src/main.rs index aa1785b..c6a2ac2 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,3 +1,4 @@ +#![feature(once_cell)] mod init; mod mqtt; mod run_test; diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 5b45eec..953a026 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -5,10 +5,10 @@ use librumqttd::{ rumqttlog::router::ConnectionMetrics, Config, }; - use std::sync::Arc; use std::thread; use std::time::Duration; +use std::{lazy::SyncLazy, sync::Mutex}; use tokio::sync::mpsc; use tokio::time::timeout; @@ -16,6 +16,16 @@ const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; const USERNAME: &str = "sphinx-key"; const PASSWORD: &str = "sphinx-key-pass"; +const REPLY_TIMEOUT_MS: u64 = 1000; + +// static CONNECTED: OnceCell = OnceCell::new(); +static CONNECTED: SyncLazy> = SyncLazy::new(|| Mutex::new(false)); +fn set_connected(b: bool) { + *CONNECTED.lock().unwrap() = b; +} +fn get_connected() -> bool { + *CONNECTED.lock().unwrap() +} pub fn start_broker( mut receiver: mpsc::Receiver, @@ -31,7 +41,8 @@ pub fn start_broker( router.start().expect("could not start router"); }); - let mut client_connected = false; + // let mut client_connected = AtomicBool::new(false); + // CONNECTED.set(false).expect("could init CONNECTED"); let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); rt_builder.enable_all(); @@ -45,16 +56,17 @@ pub fn start_broker( link_tx.subscribe([SUB_TOPIC]).await.unwrap(); let router_tx = builder.router_tx(); + let status_sender_ = status_sender.clone(); tokio::spawn(async move { let config = config.clone().into(); let router_tx = router_tx.clone(); let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); loop { let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); - if let Some(c) = metrics_to_status(metrics, client_connected) { - client_connected = c; + if let Some(c) = metrics_to_status(metrics, get_connected()) { + set_connected(c); log::info!("connection status changed to: {}", c); - status_sender + status_sender_ .send(c) .await .expect("couldnt send connection status"); @@ -71,6 +83,7 @@ pub fn start_broker( } } } + println!("BOOM LINK_TX CLOSED!"); }); let relay_task = tokio::spawn(async move { @@ -79,14 +92,25 @@ pub fn start_broker( .publish(PUB_TOPIC, false, msg.message) .await .expect("could not mqtt pub"); - if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await { - if let Err(_) = msg.reply_tx.send(ChannelReply { - reply: reply.unwrap(), - }) { - log::warn!("could not send on reply_tx"); + match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await { + Ok(reply) => { + if let Err(_) = msg.reply_tx.send(ChannelReply { + reply: reply.unwrap(), + }) { + log::warn!("could not send on reply_tx"); + } + } + Err(e) => { + log::warn!("reply_tx timed out {:?}", e); + set_connected(false); + status_sender + .send(false) + .await + .expect("couldnt send connection status"); } } } + println!("BOOM RECEIVER CLOSED!"); }); servers.await; diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 2039f72..8102071 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -31,8 +31,10 @@ pub fn run_test() { res = iteration(id, sequence, tx.clone(), connected) => { if let Err(e) = res { log::warn!("===> iteration failed {:?}", e); - } - if connected { + // connected = false; + // id = 0; + // sequence = 1; + } else if connected { sequence = sequence.wrapping_add(1); id += 1; } @@ -52,7 +54,7 @@ pub async fn iteration( if !connected { return Ok(()); } - // log::info!("do a ping!"); + log::info!("do a ping!"); let ping = msgs::Ping { id, message: WireString("ping".as_bytes().to_vec()), @@ -65,7 +67,9 @@ pub async fn iteration( reply_tx, }; tx.send(request).await?; + println!("tx.send(request)"); let res = reply_rx.await?; + println!("reply_rx.await"); let reply = parser::response_from_bytes(res.reply, sequence)?; match reply { Message::Pong(p) => {