handle errors in broker

This commit is contained in:
Evan Feenstra
2022-09-15 12:00:03 -07:00
parent fad091c413
commit cbb262829c
5 changed files with 46 additions and 15 deletions

View File

@@ -40,3 +40,6 @@ COPY --from=build /sphinx-key-broker/target/release/sphinx-key-broker /usr/src/s
# rocket # rocket
ENV ROCKET_ADDRESS=0.0.0.0 ENV ROCKET_ADDRESS=0.0.0.0
EXPOSE 8000 EXPOSE 8000
# Run the binary
# CMD ["/usr/src/sphinx-key-broker"]

View File

@@ -13,7 +13,7 @@ use crate::util::read_broker_config;
use clap::{App, AppSettings, Arg}; use clap::{App, AppSettings, Arg};
use rocket::tokio::{ use rocket::tokio::{
self, self,
sync::{mpsc, oneshot}, sync::{mpsc, oneshot, broadcast},
}; };
use std::env; use std::env;
use std::sync::Arc; use std::sync::Arc;
@@ -95,13 +95,14 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let (tx, rx) = mpsc::channel(1000); let (tx, rx) = mpsc::channel(1000);
let (status_tx, mut status_rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000);
let (error_tx, _) = broadcast::channel(1000);
log::info!("=> start broker on network: {}", settings.network); log::info!("=> start broker on network: {}", settings.network);
start_broker(rx, status_tx, "sphinx-1", &settings).await; start_broker(rx, status_tx, error_tx.clone(), "sphinx-1", &settings).await;
log::info!("=> wait for connected status"); log::info!("=> wait for connected status");
// wait for connection = true // wait for connection = true
let status = status_rx.recv().await.expect("couldnt receive"); let status = status_rx.recv().await.expect("couldnt receive");
log::info!("=> connection status: {}", status); log::info!("=> connection status: {}", status);
assert_eq!(status, true, "expected connected = true"); // assert_eq!(status, true, "expected connected = true");
if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") {
let signer_port = MqttSignerPort::new(tx.clone()); let signer_port = MqttSignerPort::new(tx.clone());
@@ -125,5 +126,5 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
signer_loop.start(Some(&settings)); signer_loop.start(Some(&settings));
}); });
routes::launch_rocket(tx) routes::launch_rocket(tx, error_tx)
} }

View File

@@ -7,7 +7,7 @@ use librumqttd::{
Config, Config,
}; };
use rocket::tokio::time::timeout; use rocket::tokio::time::timeout;
use rocket::tokio::{self, sync::mpsc}; use rocket::tokio::{self, sync::mpsc, sync::broadcast};
use sphinx_key_parser::topics; use sphinx_key_parser::topics;
use std::sync::Arc; use std::sync::Arc;
use std::sync::{LazyLock, Mutex}; use std::sync::{LazyLock, Mutex};
@@ -29,6 +29,7 @@ fn get_connected() -> bool {
pub async fn start_broker( pub async fn start_broker(
mut receiver: mpsc::Receiver<ChannelRequest>, mut receiver: mpsc::Receiver<ChannelRequest>,
status_sender: mpsc::Sender<bool>, status_sender: mpsc::Sender<bool>,
error_sender: broadcast::Sender<Vec<u8>>,
expected_client_id: &str, expected_client_id: &str,
settings: &Settings, settings: &Settings,
) { ) {
@@ -49,7 +50,7 @@ pub async fn start_broker(
mpsc::channel(1000); mpsc::channel(1000);
let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap(); let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap();
link_tx link_tx
.subscribe([topics::VLS_RETURN, topics::CONTROL_RETURN]) .subscribe([topics::VLS_RETURN, topics::CONTROL_RETURN, topics::ERROR])
.await .await
.unwrap(); .unwrap();
@@ -76,6 +77,9 @@ pub async fn start_broker(
let sub_task = tokio::spawn(async move { let sub_task = tokio::spawn(async move {
while let Ok(message) = link_rx.recv().await { while let Ok(message) = link_rx.recv().await {
for payload in message.payload { for payload in message.payload {
if message.topic == topics::ERROR {
let _ = error_sender.send(payload.to_vec());
}
if let Err(e) = msg_tx.send(payload.to_vec()).await { if let Err(e) = msg_tx.send(payload.to_vec()).await {
log::warn!("pub err {:?}", e); log::warn!("pub err {:?}", e);
} }

View File

@@ -1,15 +1,16 @@
use crate::ChannelRequest; use crate::ChannelRequest;
use rocket::fairing::{Fairing, Info, Kind}; use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header; use rocket::http::Header;
use rocket::tokio::sync::mpsc::Sender; use rocket::tokio::sync::{mpsc::Sender, broadcast::{self, error::RecvError}};
use rocket::response::stream::{EventStream, Event};
use rocket::tokio::select;
use rocket::*; use rocket::*;
use rocket::{Request, Response}; use sphinx_key_parser::{topics, error::Error as ParserError};
use sphinx_key_parser::topics;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
#[post("/control?<msg>")] #[post("/control?<msg>")]
pub async fn yo(sender: &State<Sender<ChannelRequest>>, msg: &str) -> Result<String> { pub async fn control(sender: &State<Sender<ChannelRequest>>, msg: &str) -> Result<String> {
let message = hex::decode(msg)?; let message = hex::decode(msg)?;
// FIXME validate? // FIXME validate?
if message.len() < 65 { if message.len() < 65 {
@@ -23,11 +24,31 @@ pub async fn yo(sender: &State<Sender<ChannelRequest>>, msg: &str) -> Result<Str
Ok(hex::encode(reply.reply).to_string()) Ok(hex::encode(reply.reply).to_string())
} }
pub fn launch_rocket(tx: Sender<ChannelRequest>) -> Rocket<Build> { #[get("/errors")]
async fn errors(error_tx: &State<broadcast::Sender<Vec<u8>>>, mut end: Shutdown) -> EventStream![] {
let mut rx = error_tx.subscribe();
EventStream! {
loop {
let msg = select! {
msg = rx.recv() => match msg {
Ok(msg) => ParserError::from_slice(&msg[..]),
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(_)) => continue,
},
_ = &mut end => break,
};
yield Event::json(&msg);
}
}
}
pub fn launch_rocket(tx: Sender<ChannelRequest>, error_tx: broadcast::Sender<Vec<u8>>) -> Rocket<Build> {
rocket::build() rocket::build()
.mount("/api/", routes![yo]) .mount("/api/", routes![control, errors])
.attach(CORS) .attach(CORS)
.manage(tx) .manage(tx)
.manage(error_tx)
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]

View File

@@ -2,7 +2,7 @@ use crate::mqtt::start_broker;
use crate::routes::launch_rocket; use crate::routes::launch_rocket;
use crate::util::Settings; use crate::util::Settings;
use crate::ChannelRequest; use crate::ChannelRequest;
use rocket::tokio::{self, sync::mpsc}; use rocket::tokio::{self, sync::mpsc, sync::broadcast};
use sphinx_key_parser as parser; use sphinx_key_parser as parser;
use sphinx_key_parser::topics; use sphinx_key_parser::topics;
use vls_protocol::serde_bolt::WireString; use vls_protocol::serde_bolt::WireString;
@@ -20,7 +20,8 @@ pub async fn run_test() -> rocket::Rocket<rocket::Build> {
let (tx, rx) = mpsc::channel(1000); let (tx, rx) = mpsc::channel(1000);
let (status_tx, mut status_rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000);
start_broker(rx, status_tx, CLIENT_ID, &settings).await; let (error_tx, _) = broadcast::channel(1000);
start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, &settings).await;
let mut connected = false; let mut connected = false;
let tx_ = tx.clone(); let tx_ = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -49,10 +50,11 @@ pub async fn run_test() -> rocket::Rocket<rocket::Build> {
}; };
} }
}); });
launch_rocket(tx) launch_rocket(tx, error_tx)
} }
#[allow(dead_code)] #[allow(dead_code)]
#[allow(unused)]
pub async fn iteration( pub async fn iteration(
id: u16, id: u16,
sequence: u16, sequence: u16,