incoming msg queue that can be custom processed

This commit is contained in:
Evan Feenstra
2023-06-14 12:02:32 -07:00
parent 7238ec85da
commit e1287cb597

View File

@@ -5,6 +5,7 @@ use rocket::tokio::{sync::broadcast, sync::mpsc};
use rumqttd::{local::LinkTx, Alert, AlertEvent, AuthMsg, Broker, Config, Notification}; use rumqttd::{local::LinkTx, Alert, AlertEvent, AuthMsg, Broker, Config, Notification};
use sphinx_signer::sphinx_glyph::sphinx_auther::token::Token; use sphinx_signer::sphinx_glyph::sphinx_auther::token::Token;
use sphinx_signer::sphinx_glyph::topics; use sphinx_signer::sphinx_glyph::topics;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
@@ -77,44 +78,68 @@ pub fn start_broker(
// String is the client id // String is the client id
let (msg_tx, msg_rx) = std::sync::mpsc::channel::<(String, String, Vec<u8>)>(); let (msg_tx, msg_rx) = std::sync::mpsc::channel::<(String, String, Vec<u8>)>();
// receive from CLN, Frontend, Controller, or LSS let q = Arc::new(Mutex::new(VecDeque::new()));
let conns_ = connections.clone();
let q_ = q.clone();
let _relay_task = std::thread::spawn(move || { let _relay_task = std::thread::spawn(move || {
while let Some(msg) = receiver.blocking_recv() { while let Some(msg) = receiver.blocking_recv() {
let mut q_ = q_.lock().unwrap();
q_.push_back(msg);
}
});
let _processor_task = std::thread::spawn(move || {
loop { loop {
let reply = if let Some(cid) = msg.cid.clone() { let mut q = q.lock().unwrap();
// for a specific client if q.len() < 1 {
pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx) std::thread::sleep(Duration::from_millis(50));
} else { continue;
// send to each client in turn
let cs = conns_.lock().unwrap();
let client_list = cs.clients.clone();
drop(cs);
// wait a second if there are no clients
if client_list.len() == 0 {
std::thread::sleep(Duration::from_secs(1));
None
} else {
let mut rep = None;
for cid in client_list.iter() {
rep = pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx);
if let Some(_) = &rep {
break;
}
}
rep
}
};
if let Some(reply) = reply {
if let Err(_) = msg.reply_tx.send(reply) {
log::warn!("could not send on reply_tx");
}
break;
} }
let first = q.pop_front().unwrap();
if let Some(cid) = first.cid.clone() {
//
} }
} }
}); });
// receive from CLN, Frontend, Controller, or LSS
// let conns_ = connections.clone();
// let _relay_task = std::thread::spawn(move || {
// while let Some(msg) = receiver.blocking_recv() {
// loop {
// let reply = if let Some(cid) = msg.cid.clone() {
// // for a specific client
// pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx)
// } else {
// // send to each client in turn
// let cs = conns_.lock().unwrap();
// let client_list = cs.clients.clone();
// drop(cs);
// // wait a second if there are no clients
// if client_list.len() == 0 {
// std::thread::sleep(Duration::from_secs(1));
// None
// } else {
// let mut rep = None;
// for cid in client_list.iter() {
// rep = pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx);
// if let Some(_) = &rep {
// break;
// }
// }
// rep
// }
// };
// if let Some(reply) = reply {
// if let Err(_) = msg.reply_tx.send(reply) {
// log::warn!("could not send on reply_tx");
// }
// break;
// }
// }
// }
// });
// receive replies back from glyph // receive replies back from glyph
let _sub_task = std::thread::spawn(move || { let _sub_task = std::thread::spawn(move || {
while let Ok(message) = link_rx.recv() { while let Ok(message) = link_rx.recv() {