refactor broker to use tokio rather than std::thread

This commit is contained in:
Evan Feenstra
2022-09-07 12:05:54 -07:00
parent 440cbf2e59
commit 3992dfe6fe
8 changed files with 168 additions and 102 deletions

View File

@@ -6,12 +6,11 @@ use librumqttd::{
rumqttlog::router::ConnectionMetrics,
Config,
};
use rocket::tokio::time::timeout;
use rocket::tokio::{self, sync::mpsc};
use std::sync::Arc;
use std::sync::{LazyLock, Mutex};
use std::thread;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
const SUB_TOPIC: &str = "sphinx-return";
const PUB_TOPIC: &str = "sphinx";
@@ -28,100 +27,92 @@ fn get_connected() -> bool {
*CONNECTED.lock().unwrap()
}
pub fn start_broker(
pub async fn start_broker(
mut receiver: mpsc::Receiver<ChannelRequest>,
status_sender: mpsc::Sender<bool>,
expected_client_id: &str,
settings: &Settings,
) -> tokio::runtime::Runtime {
) {
let config = config(settings);
let client_id = expected_client_id.to_string();
let (mut router, servers, builder) = async_locallink::construct(config.clone());
thread::spawn(move || {
tokio::spawn(async move {
router.start().expect("could not start router");
});
let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
rt_builder.enable_all();
let rt = rt_builder.build().unwrap();
rt.block_on(async {
tokio::spawn(async move {
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000);
let (mut link_tx, mut link_rx) =
builder.clone().connect("localclient", 200).await.unwrap();
link_tx.subscribe([SUB_TOPIC]).await.unwrap();
tokio::spawn(async move {
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000);
let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap();
link_tx.subscribe([SUB_TOPIC]).await.unwrap();
let router_tx = builder.router_tx();
let status_sender_ = status_sender.clone();
tokio::spawn(async move {
let config = config.clone().into();
let router_tx = router_tx.clone();
let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx));
loop {
let metrics = consolelink::request_metrics(console.clone(), client_id.clone());
if let Some(c) = metrics_to_status(metrics, get_connected()) {
set_connected(c);
log::info!("connection status changed to: {}", c);
status_sender_
.send(c)
let router_tx = builder.router_tx();
let status_sender_ = status_sender.clone();
tokio::spawn(async move {
let config = config.clone().into();
let router_tx = router_tx.clone();
let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx));
loop {
let metrics = consolelink::request_metrics(console.clone(), client_id.clone());
if let Some(c) = metrics_to_status(metrics, get_connected()) {
set_connected(c);
log::info!("connection status changed to: {}", c);
status_sender_
.send(c)
.await
.expect("couldnt send connection status");
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
});
let sub_task = tokio::spawn(async move {
while let Ok(message) = link_rx.recv().await {
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()).await {
log::warn!("pub err {:?}", e);
}
}
}
println!("BOOM LINK_TX CLOSED!");
});
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
link_tx
.publish(PUB_TOPIC, false, msg.message)
.await
.expect("could not mqtt pub");
match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {
Ok(reply) => {
if let Err(_) = msg.reply_tx.send(ChannelReply {
reply: reply.unwrap(),
}) {
log::warn!("could not send on reply_tx");
}
}
Err(e) => {
log::warn!("reply_tx timed out {:?}", e);
set_connected(false);
status_sender
.send(false)
.await
.expect("couldnt send connection status");
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
});
let sub_task = tokio::spawn(async move {
while let Ok(message) = link_rx.recv().await {
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()).await {
log::warn!("pub err {:?}", e);
}
}
}
println!("BOOM LINK_TX CLOSED!");
});
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
link_tx
.publish(PUB_TOPIC, false, msg.message)
.await
.expect("could not mqtt pub");
match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {
Ok(reply) => {
if let Err(_) = msg.reply_tx.send(ChannelReply {
reply: reply.unwrap(),
}) {
log::warn!("could not send on reply_tx");
}
}
Err(e) => {
log::warn!("reply_tx timed out {:?}", e);
set_connected(false);
status_sender
.send(false)
.await
.expect("couldnt send connection status");
}
}
}
println!("BOOM RECEIVER CLOSED!");
});
servers.await;
sub_task.await.unwrap();
relay_task.await.unwrap();
}
println!("BOOM RECEIVER CLOSED!");
});
servers.await;
sub_task.await.unwrap();
relay_task.await.unwrap();
});
// give one second for router to spawn listeners
std::thread::sleep(std::time::Duration::from_secs(1));
rt
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option<bool> {