use SyncLazy to track client connected state

This commit is contained in:
Evan Feenstra
2022-06-12 09:52:02 -07:00
parent 5a49be61b3
commit a1ecd05526
5 changed files with 46 additions and 13 deletions

View File

@@ -21,6 +21,7 @@ rumqttc = "0.12.0"
clap = "=3.0.0-beta.2" clap = "=3.0.0-beta.2"
clap_derive = "=3.0.0-beta.5" clap_derive = "=3.0.0-beta.5"
chrono = "0.4" chrono = "0.4"
once_cell = "1.12.0"
[features] [features]
default = ["std"] default = ["std"]

View File

@@ -0,0 +1,3 @@
[toolchain]
channel = "nightly"

View File

@@ -1,3 +1,4 @@
#![feature(once_cell)]
mod init; mod init;
mod mqtt; mod mqtt;
mod run_test; mod run_test;

View File

@@ -5,10 +5,10 @@ use librumqttd::{
rumqttlog::router::ConnectionMetrics, rumqttlog::router::ConnectionMetrics,
Config, Config,
}; };
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use std::{lazy::SyncLazy, sync::Mutex};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time::timeout; use tokio::time::timeout;
@@ -16,6 +16,16 @@ const SUB_TOPIC: &str = "sphinx-return";
const PUB_TOPIC: &str = "sphinx"; const PUB_TOPIC: &str = "sphinx";
const USERNAME: &str = "sphinx-key"; const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass"; const PASSWORD: &str = "sphinx-key-pass";
const REPLY_TIMEOUT_MS: u64 = 1000;
// static CONNECTED: OnceCell<bool> = OnceCell::new();
static CONNECTED: SyncLazy<Mutex<bool>> = SyncLazy::new(|| Mutex::new(false));
fn set_connected(b: bool) {
*CONNECTED.lock().unwrap() = b;
}
fn get_connected() -> bool {
*CONNECTED.lock().unwrap()
}
pub fn start_broker( pub fn start_broker(
mut receiver: mpsc::Receiver<ChannelRequest>, mut receiver: mpsc::Receiver<ChannelRequest>,
@@ -31,7 +41,8 @@ pub fn start_broker(
router.start().expect("could not start router"); router.start().expect("could not start router");
}); });
let mut client_connected = false; // let mut client_connected = AtomicBool::new(false);
// CONNECTED.set(false).expect("could init CONNECTED");
let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
rt_builder.enable_all(); rt_builder.enable_all();
@@ -45,16 +56,17 @@ pub fn start_broker(
link_tx.subscribe([SUB_TOPIC]).await.unwrap(); link_tx.subscribe([SUB_TOPIC]).await.unwrap();
let router_tx = builder.router_tx(); let router_tx = builder.router_tx();
let status_sender_ = status_sender.clone();
tokio::spawn(async move { tokio::spawn(async move {
let config = config.clone().into(); let config = config.clone().into();
let router_tx = router_tx.clone(); let router_tx = router_tx.clone();
let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx)); let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx));
loop { loop {
let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); let metrics = consolelink::request_metrics(console.clone(), client_id.clone());
if let Some(c) = metrics_to_status(metrics, client_connected) { if let Some(c) = metrics_to_status(metrics, get_connected()) {
client_connected = c; set_connected(c);
log::info!("connection status changed to: {}", c); log::info!("connection status changed to: {}", c);
status_sender status_sender_
.send(c) .send(c)
.await .await
.expect("couldnt send connection status"); .expect("couldnt send connection status");
@@ -71,6 +83,7 @@ pub fn start_broker(
} }
} }
} }
println!("BOOM LINK_TX CLOSED!");
}); });
let relay_task = tokio::spawn(async move { let relay_task = tokio::spawn(async move {
@@ -79,14 +92,25 @@ pub fn start_broker(
.publish(PUB_TOPIC, false, msg.message) .publish(PUB_TOPIC, false, msg.message)
.await .await
.expect("could not mqtt pub"); .expect("could not mqtt pub");
if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await { match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {
Ok(reply) => {
if let Err(_) = msg.reply_tx.send(ChannelReply { if let Err(_) = msg.reply_tx.send(ChannelReply {
reply: reply.unwrap(), reply: reply.unwrap(),
}) { }) {
log::warn!("could not send on reply_tx"); log::warn!("could not send on reply_tx");
} }
} }
Err(e) => {
log::warn!("reply_tx timed out {:?}", e);
set_connected(false);
status_sender
.send(false)
.await
.expect("couldnt send connection status");
} }
}
}
println!("BOOM RECEIVER CLOSED!");
}); });
servers.await; servers.await;

View File

@@ -31,8 +31,10 @@ pub fn run_test() {
res = iteration(id, sequence, tx.clone(), connected) => { res = iteration(id, sequence, tx.clone(), connected) => {
if let Err(e) = res { if let Err(e) = res {
log::warn!("===> iteration failed {:?}", e); log::warn!("===> iteration failed {:?}", e);
} // connected = false;
if connected { // id = 0;
// sequence = 1;
} else if connected {
sequence = sequence.wrapping_add(1); sequence = sequence.wrapping_add(1);
id += 1; id += 1;
} }
@@ -52,7 +54,7 @@ pub async fn iteration(
if !connected { if !connected {
return Ok(()); return Ok(());
} }
// log::info!("do a ping!"); log::info!("do a ping!");
let ping = msgs::Ping { let ping = msgs::Ping {
id, id,
message: WireString("ping".as_bytes().to_vec()), message: WireString("ping".as_bytes().to_vec()),
@@ -65,7 +67,9 @@ pub async fn iteration(
reply_tx, reply_tx,
}; };
tx.send(request).await?; tx.send(request).await?;
println!("tx.send(request)");
let res = reply_rx.await?; let res = reply_rx.await?;
println!("reply_rx.await");
let reply = parser::response_from_bytes(res.reply, sequence)?; let reply = parser::response_from_bytes(res.reply, sequence)?;
match reply { match reply {
Message::Pong(p) => { Message::Pong(p) => {