use blocking_recv to cross rumqttd sync channel and tokio channle

This commit is contained in:
Evan Feenstra
2023-02-20 15:16:10 -08:00
parent f96280a222
commit 90342391da
4 changed files with 42 additions and 55 deletions

View File

@@ -1,15 +1,12 @@
use crate::util::Settings;
use crate::{ChannelReply, ChannelRequest};
use rocket::tokio::time::timeout;
use rocket::tokio::{self, sync::broadcast, sync::mpsc};
use rocket::tokio::{sync::broadcast, sync::mpsc};
use rumqttd::{Alert, AlertEvent, Broker, Config, Notification};
use sphinx_signer::sphinx_glyph::topics;
use std::time::Duration;
// pub const INTERNAL_CONTROL: &str = "INTERNAL_CONTROL";
// must get a reply within this time, or disconnects
const REPLY_TIMEOUT_MS: u64 = 10000;
// const REPLY_TIMEOUT_MS: u64 = 10000;
pub fn start_broker(
mut receiver: mpsc::Receiver<ChannelRequest>,
@@ -35,50 +32,42 @@ pub fn start_broker(
// connected/disconnected status alerts
let status_sender_ = status_sender.clone();
let _alerts_handle = tokio::spawn(async move {
loop {
let alert = alerts.poll();
println!("Alert: {:?}", alert);
match alert.1 {
Alert::Event(cid, event) => {
if cid == client_id {
if let Some(status) = match event {
AlertEvent::Connect => Some(true),
AlertEvent::Disconnect => Some(false),
_ => None,
} {
let _ = status_sender_.send(status).await;
}
let _alerts_handle = std::thread::spawn(move || loop {
let alert = alerts.poll();
println!("Alert: {:?}", alert);
match alert.1 {
Alert::Event(cid, event) => {
if cid == client_id {
if let Some(status) = match event {
AlertEvent::Connect => Some(true),
AlertEvent::Disconnect => Some(false),
_ => None,
} {
let _ = status_sender_.blocking_send(status);
}
}
_ => (),
}
_ => (),
}
});
// msg forwarding
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000);
// link_tx.subscribe(INTERNAL_CONTROL)?;
link_tx.subscribe(topics::VLS_RETURN)?;
link_tx.subscribe(topics::CONTROL_RETURN)?;
link_tx.subscribe(topics::ERROR)?;
mpsc::channel(10000);
link_tx.subscribe(topics::VLS_RETURN).unwrap();
link_tx.subscribe(topics::CONTROL_RETURN).unwrap();
link_tx.subscribe(topics::ERROR).unwrap();
let _sub_task = tokio::spawn(async move {
println!("ummm....");
let _sub_task = std::thread::spawn(move || {
while let Ok(message) = link_rx.recv() {
if let Some(n) = message {
match n {
Notification::Forward(f) => {
println!("GOT A FORWARDED MSG! FORWARD! {:?}", f.publish.topic);
if f.publish.topic == topics::ERROR {
let _ = error_sender.send(f.publish.topic.to_vec());
} else {
println!("send now on msg_tx {:?}", f.publish.payload.to_vec());
if let Err(e) = msg_tx.send(f.publish.payload.to_vec()).await {
if let Err(e) = msg_tx.blocking_send(f.publish.payload.to_vec()) {
log::error!("failed to pub to msg_tx! {:?}", e);
}
println!("sent on msg_tx");
}
}
_ => (),
@@ -87,36 +76,26 @@ pub fn start_broker(
}
});
let _relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
let _relay_task = std::thread::spawn(move || {
while let Some(msg) = receiver.blocking_recv() {
if let Err(e) = link_tx.publish(msg.topic, msg.message) {
log::error!("failed to pub to link_tx! {:?}", e);
}
println!("PUBBED TO LINKTX....");
// let rep = msg_rx.recv().await;
// println!("REPPPP {:?}", rep);
match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {
Ok(rep) => {
println!("GOT A REPLY {:?}", rep);
if let Some(reply) = rep {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
log::warn!("could not send on reply_tx");
}
}
}
Err(e) => {
log::warn!("reply_tx timed out {:?}", e);
let _ = status_sender.send(false).await;
let rep = msg_rx.blocking_recv();
if let Some(reply) = rep {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
log::warn!("could not send on reply_tx");
}
}
}
});
// _sub_task.await.unwrap();
// _relay_task.await.unwrap();
// alerts_handle.await?;
std::thread::sleep(Duration::from_secs(1));
// alerts_handle.await?;
// sub_task.await?;
// relay_task.await?;
Ok(())
}