refactor broker metrics conn, single threaded virtual esp, hardcoded config, start auther module

This commit is contained in:
Evan Feenstra
2022-06-10 10:08:11 -07:00
parent 7a5d4d7042
commit 7ee7ac071c
8 changed files with 283 additions and 190 deletions

View File

@@ -1,9 +1,11 @@
use crate::{ChannelReply, ChannelRequest};
use librumqttd::{
async_locallink::construct_broker,
async_locallink,
consolelink::{self, ConsoleLink},
rumqttlog::router::ConnectionMetrics,
Config,
};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
@@ -12,16 +14,18 @@ use tokio::time::timeout;
const SUB_TOPIC: &str = "sphinx-return";
const PUB_TOPIC: &str = "sphinx";
const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass";
pub fn start_broker(
mut receiver: mpsc::Receiver<ChannelRequest>,
status_sender: mpsc::Sender<bool>,
expected_client_id: &str,
) -> tokio::runtime::Runtime {
let config: Config = confy::load_path("config/rumqttd.conf").unwrap();
let config = config();
let client_id = expected_client_id.to_string();
let (mut router, servers, builder) = construct_broker(config.clone());
let (mut router, servers, builder) = async_locallink::construct(config.clone());
thread::spawn(move || {
router.start().expect("could not start router");
@@ -29,9 +33,6 @@ pub fn start_broker(
let mut client_connected = false;
// let (status_tx, mut status_rx): (mpsc::Sender<bool>, mpsc::Receiver<bool>) =
// mpsc::channel(1000);
let mut rt_builder = tokio::runtime::Builder::new_multi_thread();
rt_builder.enable_all();
let rt = rt_builder.build().unwrap();
@@ -39,8 +40,9 @@ pub fn start_broker(
tokio::spawn(async move {
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000);
let (mut tx, mut rx) = builder.clone().connect("localclient", 200).await.unwrap();
tx.subscribe([SUB_TOPIC]).await.unwrap();
let (mut link_tx, mut link_rx) =
builder.clone().connect("localclient", 200).await.unwrap();
link_tx.subscribe([SUB_TOPIC]).await.unwrap();
let router_tx = builder.router_tx();
tokio::spawn(async move {
@@ -49,40 +51,19 @@ pub fn start_broker(
let console: Arc<ConsoleLink> = Arc::new(ConsoleLink::new(config, router_tx));
loop {
let metrics = consolelink::request_metrics(console.clone(), client_id.clone());
let changed: Option<bool> = match metrics.tracker() {
Some(t) => {
// wait for subscription to be sure
if t.concrete_subscriptions_len() > 0 {
if !client_connected {
Some(true) // changed to true
} else {
None
}
} else {
None
}
}
None => {
if client_connected {
Some(false)
} else {
None
}
}
};
if let Some(c) = changed {
if let Some(c) = metrics_to_status(metrics, client_connected) {
client_connected = c;
status_sender
.send(c)
.await
.expect("couldnt send connection status");
}
tokio::time::sleep(Duration::from_millis(800)).await;
tokio::time::sleep(Duration::from_millis(500)).await;
}
});
let sub_task = tokio::spawn(async move {
while let Ok(message) = rx.recv().await {
while let Ok(message) = link_rx.recv().await {
for payload in message.payload {
if let Err(e) = msg_tx.send(payload.to_vec()).await {
log::warn!("pub err {:?}", e);
@@ -93,7 +74,8 @@ pub fn start_broker(
let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
tx.publish(PUB_TOPIC, false, msg.message)
link_tx
.publish(PUB_TOPIC, false, msg.message)
.await
.expect("could not mqtt pub");
if let Ok(reply) = timeout(Duration::from_millis(1000), msg_rx.recv()).await {
@@ -114,3 +96,76 @@ pub fn start_broker(
rt
}
fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option<bool> {
match metrics.tracker() {
Some(t) => {
// wait for subscription to be sure
if t.concrete_subscriptions_count() > 0 {
if !client_connected {
Some(true) // changed to true
} else {
None
}
} else {
None
}
}
None => {
if client_connected {
Some(false)
} else {
None
}
}
}
}
fn config() -> Config {
use librumqttd::rumqttlog::Config as RouterConfig;
use librumqttd::{
ConnectionLoginCredentials, ConnectionSettings, ConsoleSettings, ServerSettings,
};
use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::path::PathBuf;
let id = 0;
let router = RouterConfig {
id,
dir: PathBuf::from("/tmp/rumqttd"),
max_segment_size: 10240,
max_segment_count: 10,
max_connections: 10001,
};
let mut servers = HashMap::new();
servers.insert(
"0".to_string(),
ServerSettings {
cert: None,
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 1883).into(),
next_connection_delay_ms: 1,
connections: ConnectionSettings {
connection_timeout_ms: 5000,
max_client_id_len: 256,
throttle_delay_ms: 0,
max_payload_size: 5120,
max_inflight_count: 200,
max_inflight_size: 1024,
login_credentials: Some(vec![ConnectionLoginCredentials {
username: USERNAME.to_string(),
password: PASSWORD.to_string(),
}]),
},
},
);
Config {
id,
servers,
router,
console: ConsoleSettings {
listen: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 3030).into(),
},
cluster: None,
replicator: None,
}
}