some more broker updates

This commit is contained in:
Evan Feenstra
2023-03-15 14:29:34 -07:00
parent a152e663ba
commit c68512ea35
5 changed files with 29 additions and 18 deletions

2
broker/Cargo.lock generated
View File

@@ -2809,7 +2809,7 @@ dependencies = [
[[package]]
name = "rumqttd"
version = "0.12.5"
source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx#66f52f3f2fe739073a4dcc90530e6095915b15a4"
source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx#c9e2174c9385ef99b95698c04d5911c991789b3c"
dependencies = [
"bytes",
"clap 4.1.4",

View File

@@ -98,13 +98,14 @@ fn make_clap_app() -> App<'static> {
async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let settings = read_broker_config(BROKER_CONFIG_PATH);
let (tx, rx) = mpsc::channel(10000);
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
// let (unix_tx, mut unix_rx) = mpsc::channel(10000);
let (status_tx, mut status_rx) = mpsc::channel(10000);
let (error_tx, error_rx) = broadcast::channel(10000);
error_log::log_errors(error_rx);
log::info!("=> start broker on network: {}", settings.network);
start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings)
start_broker(mqtt_rx, status_tx, error_tx.clone(), CLIENT_ID, settings)
.expect("BROKER FAILED TO START");
log::info!("=> wait for connected status");
// wait for connection = true
@@ -112,14 +113,22 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
log::info!("=> connection status: {}", status);
// assert_eq!(status, true, "expected connected = true");
// let mqtt_tx_ = mqtt_tx.clone();
// tokio::spawn(async move {
// while let Some(msg) = unix_rx.recv().await {
// // update LSS here?
// if let Err(e) = mqtt_tx_.send(msg).await {
// log::error!("failed to send on mqtt_tx {:?}", e);
// }
// }
// });
if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") {
let signer_port = MqttSignerPort::new(tx.clone());
let signer_port = Box::new(MqttSignerPort::new(mqtt_tx.clone()));
let port_front = SignerPortFront::new(signer_port, settings.network);
let source_factory = Arc::new(SourceFactory::new(".", settings.network));
let frontend = Frontend::new(
Arc::new(SignerPortFront::new(
Box::new(signer_port),
settings.network,
)),
Arc::new(port_front),
source_factory,
Url::parse(&btc_url).expect("malformed btc rpc url"),
);
@@ -130,11 +139,11 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let conn = UnixConnection::new(parent_fd);
let client = UnixClient::new(conn);
// TODO pass status_rx into SignerLoop
let mut signer_loop = SignerLoop::new(client, tx.clone());
let mut signer_loop = SignerLoop::new(client, mqtt_tx.clone());
// spawn CLN listener on a std thread
std::thread::spawn(move || {
signer_loop.start(Some(settings));
});
routes::launch_rocket(tx, error_tx, settings)
routes::launch_rocket(mqtt_tx, error_tx, settings)
}

View File

@@ -1,7 +1,7 @@
use crate::util::Settings;
use crate::{ChannelReply, ChannelRequest};
use rocket::tokio::{sync::broadcast, sync::mpsc};
use rumqttd::{Alert, AlertEvent, Broker, Config, Notification};
use rumqttd::{protocol::QoS, Alert, AlertEvent, Broker, Config, Notification};
use sphinx_signer::sphinx_glyph::topics;
use std::time::Duration;
@@ -34,7 +34,7 @@ pub fn start_broker(
let status_sender_ = status_sender.clone();
let _alerts_handle = std::thread::spawn(move || loop {
let alert = alerts.poll();
println!("Alert: {:?}", alert);
log::info!("Alert: {:?}", alert);
match alert.1 {
Alert::Event(cid, event) => {
if cid == client_id {
@@ -78,7 +78,8 @@ pub fn start_broker(
let _relay_task = std::thread::spawn(move || {
while let Some(msg) = receiver.blocking_recv() {
if let Err(e) = link_tx.publish(msg.topic, msg.message) {
let qos = QoS::AtLeastOnce;
if let Err(e) = link_tx.publish_qos(msg.topic, msg.message, qos) {
log::error!("failed to pub to link_tx! {:?}", e);
}
let rep = msg_rx.blocking_recv();
@@ -92,7 +93,7 @@ pub fn start_broker(
// _sub_task.await.unwrap();
// _relay_task.await.unwrap();
// alerts_handle.await?;
// _alerts_handle.await?;
std::thread::sleep(Duration::from_secs(1));

View File

@@ -72,9 +72,9 @@ impl<C: 'static + Client> SignerLoop<C> {
fn do_loop(&mut self, settings: Option<Settings>) -> Result<()> {
loop {
let raw_msg = self.client.read_raw()?;
debug!("loop {}: got raw", self.log_prefix);
// debug!("loop {}: got raw", self.log_prefix);
let msg = msgs::from_vec(raw_msg.clone())?;
info!("loop {}: got {:x?}", self.log_prefix, msg);
// info!("loop {}: got {:x?}", self.log_prefix, msg);
match msg {
Message::ClientHsmFd(m) => {
self.client.write(msgs::ClientHsmFdReply {}).unwrap();
@@ -90,6 +90,7 @@ impl<C: 'static + Client> SignerLoop<C> {
thread::spawn(move || new_loop.start(None));
}
Message::Memleak(_) => {
// info!("Memleak");
let reply = msgs::MemleakReply { result: false };
self.client.write(reply)?;
}
@@ -108,7 +109,7 @@ impl<C: 'static + Client> SignerLoop<C> {
let reply = self.handle_message(raw_msg)?;
// Write the reply to the node
self.client.write_vec(reply)?;
info!("replied {}", self.log_prefix);
// info!("replied {}", self.log_prefix);
}
}
}

View File

@@ -83,7 +83,7 @@ pub fn setup_logging(who: &str, level_arg: &str) {
.level(log::LevelFilter::from_str(&level).expect("level"))
.level_for("h2", log::LevelFilter::Info)
.level_for("sled", log::LevelFilter::Info)
.level_for("rumqttd", log::LevelFilter::Warn)
// .level_for("rumqttd", log::LevelFilter::Warn)
.level_for("rocket", log::LevelFilter::Warn)
.level_for("tracing", log::LevelFilter::Warn)
.level_for("_", log::LevelFilter::Warn)