update broker to new rumqttd

This commit is contained in:
Evan Feenstra
2023-02-10 10:57:20 -08:00
parent 28a9aa8d1b
commit ae2855d793
3 changed files with 850 additions and 355 deletions

View File

@@ -1,183 +1,143 @@
use crate::util::Settings;
use crate::{ChannelReply, ChannelRequest};
use librumqttd::{
async_locallink,
consolelink::{self, ConsoleLink},
rumqttlog::router::ConnectionMetrics,
Config,
};
use rocket::tokio::task;
use rocket::tokio::time::timeout;
use rocket::tokio::{self, sync::broadcast, sync::mpsc};
use rumqttd::{Alert, AlertEvent, Broker, Config, Notification};
use sphinx_signer::sphinx_glyph::topics;
use std::sync::Arc;
use std::sync::{LazyLock, Mutex};
use std::time::Duration;
// must get a reply within this time, or disconnects
const REPLY_TIMEOUT_MS: u64 = 10000;
static CONNECTED: LazyLock<Mutex<bool>> = LazyLock::new(|| Mutex::new(false));
fn set_connected(b: bool) {
*CONNECTED.lock().unwrap() = b;
}
fn get_connected() -> bool {
*CONNECTED.lock().unwrap()
}
pub async fn start_broker(
mut receiver: mpsc::Receiver<ChannelRequest>,
status_sender: mpsc::Sender<bool>,
error_sender: broadcast::Sender<Vec<u8>>,
expected_client_id: &str,
settings: Settings,
) {
let config = config(settings);
) -> anyhow::Result<()> {
let conf = config(settings);
let client_id = expected_client_id.to_string();
let (mut router, servers, builder) = async_locallink::construct(config.clone());
let broker = Broker::new(conf);
let mut alerts = broker
.alerts(vec![
// "/alerts/error/+".to_string(),
"/alerts/event/connect/+".to_string(),
"/alerts/event/disconnect/+".to_string(),
])
.unwrap();
// std thread for the router
std::thread::spawn(move || {
log::info!("start mqtt router");
router.start().expect("could not start router");
});
tokio::spawn(async move {
log::info!("start mqtt relayer and localclient");
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000);
let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap();
link_tx
.subscribe([topics::VLS_RETURN, topics::CONTROL_RETURN, topics::ERROR])
.await
.unwrap();
let router_tx = builder.router_tx();
let status_sender_ = status_sender.clone();
tokio::spawn(async move {
let config = config.clone().into();
let router_tx = router_tx.clone();
let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx));
loop {
let metrics = consolelink::request_metrics(console.clone(), client_id.clone());
if let Some(c) = metrics_to_status(metrics, get_connected()) {
set_connected(c);
log::info!("connection status changed to: {}", c);
let _ = status_sender_.send(c).await;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
});
let sub_task = tokio::spawn(async move {
while let Ok(message) = link_rx.recv().await {
for payload in message.payload {
if message.topic == topics::ERROR {
error_sender.send(payload.to_vec()).expect("asdfasdf");
} else if let Err(e) = msg_tx.send(payload.to_vec()).await {
log::warn!("pub err {:?}", e);
}
}
}
});
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
link_tx
.publish(&msg.topic, false, msg.message)
.await
.expect("could not mqtt pub");
match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {
Ok(reply) => {
if let Err(_) = msg.reply_tx.send(ChannelReply {
reply: reply.unwrap(),
}) {
log::warn!("could not send on reply_tx");
// connected/disconnected status alerts
let status_sender_ = status_sender.clone();
let alerts_handle = tokio::spawn(async move {
loop {
let alert = alerts.poll();
println!("Alert: {alert:?}");
match alert.1 {
Alert::Event(cid, event) => {
if cid == client_id {
if let Some(status) = match event {
AlertEvent::Connect => Some(true),
AlertEvent::Disconnect => Some(false),
_ => None,
} {
let _ = status_sender_.send(status).await;
}
}
Err(e) => {
log::warn!("reply_tx timed out {:?}", e);
set_connected(false);
let _ = status_sender.send(false).await;
}
}
_ => (),
}
});
servers.await;
sub_task.await.unwrap();
relay_task.await.unwrap();
tokio::time::sleep(Duration::from_millis(333)).await;
}
});
// give one second for router to spawn listeners
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option<bool> {
match metrics.tracker() {
Some(t) => {
// wait for subscription to be sure
if t.concrete_subscriptions_count() > 0 {
if !client_connected {
Some(true) // changed to true
} else {
None
// msg forwarding
let (mut link_tx, mut link_rx) = broker.link("localclient")?;
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000);
link_tx.subscribe(topics::VLS_RETURN)?;
link_tx.subscribe(topics::CONTROL_RETURN)?;
link_tx.subscribe(topics::ERROR)?;
let sub_task = tokio::spawn(async move {
while let Ok(message) = link_rx.recv() {
if let Some(n) = message {
match n {
Notification::Forward(f) => {
if f.publish.topic == topics::ERROR {
let _ = error_sender.send(f.publish.topic.to_vec());
} else {
let _ = msg_tx.send(f.publish.payload.to_vec()).await;
}
}
_ => (),
};
}
}
});
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
link_tx.publish(msg.topic, msg.message);
match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {
Ok(reply) => {
if let Err(_) = msg.reply_tx.send(ChannelReply {
reply: reply.unwrap(),
}) {
log::warn!("could not send on reply_tx");
}
}
Err(e) => {
log::warn!("reply_tx timed out {:?}", e);
let _ = status_sender.send(false).await;
}
} else {
None
}
}
None => {
if client_connected {
Some(false)
} else {
None
}
}
}
});
alerts_handle.await?;
sub_task.await?;
relay_task.await?;
Ok(())
}
fn config(settings: Settings) -> Config {
use librumqttd::rumqttlog::Config as RouterConfig;
use librumqttd::{ConnectionSettings, ConsoleSettings, ServerSettings, SphinxLoginCredentials};
use rumqttd::{ConnectionSettings, ConsoleSettings, ServerSettings, SphinxLoginCredentials};
use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::path::PathBuf;
let id = 0;
let router = RouterConfig {
id,
dir: PathBuf::from("/tmp/rumqttd"),
let router = rumqttd::RouterConfig {
instant_ack: true,
max_segment_size: 10240,
max_segment_count: 10,
max_connections: 10001,
..Default::default()
};
let mut servers = HashMap::new();
servers.insert(
id.to_string(),
"sphinx-broker".to_string(),
ServerSettings {
cert: None,
name: "sphinx-broker".to_string(),
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), settings.mqtt_port).into(),
next_connection_delay_ms: 1,
connections: ConnectionSettings {
connection_timeout_ms: 5000,
max_client_id_len: 256,
throttle_delay_ms: 0,
max_payload_size: 5120,
max_inflight_count: 200,
max_inflight_size: 1024,
login_credentials: None,
auth: None,
sphinx_auth: Some(SphinxLoginCredentials { within: None }),
dynamic_filters: false,
},
tls: None,
},
);
Config {
id,
servers,
id: 0,
v4: servers,
router,
console: ConsoleSettings {
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030).into(),
},
console: ConsoleSettings::new("0.0.0.0:3030"),
cluster: None,
replicator: None,
..Default::default()
}
}