broker: switch rumqttd branch to asdf

This commit is contained in:
irriden
2023-11-25 05:27:07 +00:00
parent 1bce903224
commit af7e5eca08
4 changed files with 10 additions and 11 deletions

2
broker/Cargo.lock generated
View File

@@ -2826,7 +2826,7 @@ dependencies = [
[[package]] [[package]]
name = "rumqttd" name = "rumqttd"
version = "0.15.0" version = "0.15.0"
source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx-3#c087e39bf297c157d96de986f85733bfa0bb9f39" source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx-asdf#3dd8727c13be54ee48f9e5efb664fe7f94a5cb25"
dependencies = [ dependencies = [
"async-tungstenite", "async-tungstenite",
"axum", "axum",

View File

@@ -22,7 +22,7 @@ once_cell = "1.12.0"
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
rocket = { version = "0.5.0-rc.2", features = ["json"] } rocket = { version = "0.5.0-rc.2", features = ["json"] }
rumqttc = "0.12.0" rumqttc = "0.12.0"
rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "sphinx-3", features = ["websocket"] } rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "sphinx-asdf", features = ["websocket"] }
secp256k1 = { version = "0.24.0", features = ["rand-std", "bitcoin_hashes"] } secp256k1 = { version = "0.24.0", features = ["rand-std", "bitcoin_hashes"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"

View File

@@ -1,4 +1,3 @@
// #![feature(once_cell)]
mod chain_tracker; mod chain_tracker;
mod conn; mod conn;
mod error_log; mod error_log;
@@ -20,7 +19,7 @@ use rocket::tokio::{
self, self,
sync::{broadcast, mpsc}, sync::{broadcast, mpsc},
}; };
use rumqttd::{oneshot as std_oneshot, AuthMsg}; use rumqttd::{oneshot as std_oneshot, AuthMsg, AuthType};
use std::env; use std::env;
use std::sync::Arc; use std::sync::Arc;
use url::Url; use url::Url;
@@ -133,11 +132,14 @@ pub fn broker_setup(
std::thread::spawn(move || { std::thread::spawn(move || {
while let Ok(am) = auth_rx.recv() { while let Ok(am) = auth_rx.recv() {
let pubkey = current_pubkey(); let pubkey = current_pubkey();
let (ok, new_pubkey) = check_auth(&am.username, &am.password, &pubkey); let (ok, new_pubkey) = match am.msg {
AuthType::Login(login) => check_auth(&login.username, &login.password, &pubkey),
_ => (true, None),
};
if let Some(np) = new_pubkey { if let Some(np) = new_pubkey {
conns_set_pubkey(np); conns_set_pubkey(np);
} }
let _ = am.reply.send(ok); let _ = am.tx.send(ok);
} }
}); });

View File

@@ -20,18 +20,15 @@ pub fn start_broker(
let conf = config(settings); let conf = config(settings);
// println!("CONF {:?}", conf); // println!("CONF {:?}", conf);
let mut broker = Broker::new(conf); let mut broker = Broker::new(conf, Some(auth_sender));
let (mut link_tx, mut link_rx) = broker.link("localclient")?; let (mut link_tx, mut link_rx) = broker.link("localclient")?;
let _ = link_tx.subscribe(format!("+/{}", topics::HELLO)); let _ = link_tx.subscribe(format!("+/{}", topics::HELLO));
let _ = link_tx.subscribe(format!("+/{}", topics::BYE)); let _ = link_tx.subscribe(format!("+/{}", topics::BYE));
let auth_sender_ = auth_sender;
std::thread::spawn(move || { std::thread::spawn(move || {
broker broker.start().expect("could not start broker");
.start(Some(auth_sender_))
.expect("could not start broker");
}); });
// connected/disconnected status alerts // connected/disconnected status alerts