broker to new rumqttd (not working)

This commit is contained in:
Evan Feenstra
2023-02-20 11:51:32 -08:00
parent b91ea599fd
commit 80215821ce
7 changed files with 80 additions and 63 deletions

View File

@@ -6,6 +6,8 @@ use rumqttd::{Alert, AlertEvent, Broker, Config, Notification};
use sphinx_signer::sphinx_glyph::topics;
use std::time::Duration;
// pub const INTERNAL_CONTROL: &str = "INTERNAL_CONTROL";
// must get a reply within this time, or disconnects
const REPLY_TIMEOUT_MS: u64 = 10000;
@@ -20,13 +22,13 @@ pub fn start_broker(
let client_id = expected_client_id.to_string();
let mut broker = Broker::new(conf);
let mut alerts = broker
.alerts(vec![
// "/alerts/error/+".to_string(),
"/alerts/event/connect/+".to_string(),
"/alerts/event/disconnect/+".to_string(),
])?;
let mut alerts = broker.alerts(vec![
// "/alerts/error/+".to_string(),
"/alerts/event/connect/+".to_string(),
"/alerts/event/disconnect/+".to_string(),
])?;
let (mut link_tx, mut link_rx) = broker.link("localclient")?;
std::thread::spawn(move || {
broker.start().expect("could not start broker");
});
@@ -51,29 +53,32 @@ pub fn start_broker(
}
_ => (),
}
tokio::time::sleep(Duration::from_millis(40)).await;
}
});
// msg forwarding
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000);
// link_tx.subscribe(INTERNAL_CONTROL)?;
link_tx.subscribe(topics::VLS_RETURN)?;
link_tx.subscribe(topics::CONTROL_RETURN)?;
link_tx.subscribe(topics::ERROR)?;
let _sub_task = tokio::spawn(async move {
println!("ummm....");
while let Ok(message) = link_rx.recv() {
println!("GOT A MSG ON LINK RX");
if let Some(n) = message {
match n {
Notification::Forward(f) => {
println!("GOT A FORWARDED MSG! FORWARD!");
println!("GOT A FORWARDED MSG! FORWARD! {:?}", f.publish.topic);
if f.publish.topic == topics::ERROR {
let _ = error_sender.send(f.publish.topic.to_vec());
} else {
println!("send now on msg_tx {:?}", f.publish.payload.to_vec());
if let Err(e) = msg_tx.send(f.publish.payload.to_vec()).await {
log::error!("failed to pub to msg_tx! {:?}", e);
}
println!("sent on msg_tx");
}
}
_ => (),
@@ -81,19 +86,22 @@ pub fn start_broker(
}
}
});
let _relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
println!("YO YO YO got a receiver msg! {:?}", msg);
if let Err(e) = link_tx.publish(msg.topic, msg.message) {
log::error!("failed to pub to link_tx! {:?}", e);
}
println!("PUBBED TO LINKTX....");
// let rep = msg_rx.recv().await;
// println!("REPPPP {:?}", rep);
match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {
Ok(reply) => {
println!("send on channelreply!");
if let Err(_) = msg.reply_tx.send(ChannelReply {
reply: reply.unwrap(),
}) {
log::warn!("could not send on reply_tx");
Ok(rep) => {
println!("GOT A REPLY {:?}", rep);
if let Some(reply) = rep {
if let Err(_) = msg.reply_tx.send(ChannelReply { reply }) {
log::warn!("could not send on reply_tx");
}
}
}
Err(e) => {
@@ -118,9 +126,10 @@ fn config(settings: Settings) -> Config {
use std::net::{Ipv4Addr, SocketAddrV4};
let router = rumqttd::RouterConfig {
instant_ack: true,
max_segment_size: 10240,
max_segment_size: 104857600,
max_segment_count: 10,
max_connections: 10001,
max_connections: 10010,
max_read_len: 10240,
..Default::default()
};
let mut servers = HashMap::new();