diff --git a/broker/Dockerfile b/broker/Dockerfile index 12c1406..927be50 100644 --- a/broker/Dockerfile +++ b/broker/Dockerfile @@ -40,3 +40,6 @@ COPY --from=build /sphinx-key-broker/target/release/sphinx-key-broker /usr/src/s # rocket ENV ROCKET_ADDRESS=0.0.0.0 EXPOSE 8000 + +# Run the binary +# CMD ["/usr/src/sphinx-key-broker"] \ No newline at end of file diff --git a/broker/src/main.rs b/broker/src/main.rs index 3a939b2..15a925f 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -13,7 +13,7 @@ use crate::util::read_broker_config; use clap::{App, AppSettings, Arg}; use rocket::tokio::{ self, - sync::{mpsc, oneshot}, + sync::{mpsc, oneshot, broadcast}, }; use std::env; use std::sync::Arc; @@ -95,13 +95,14 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); + let (error_tx, _) = broadcast::channel(1000); log::info!("=> start broker on network: {}", settings.network); - start_broker(rx, status_tx, "sphinx-1", &settings).await; + start_broker(rx, status_tx, error_tx.clone(), "sphinx-1", &settings).await; log::info!("=> wait for connected status"); // wait for connection = true let status = status_rx.recv().await.expect("couldnt receive"); log::info!("=> connection status: {}", status); - assert_eq!(status, true, "expected connected = true"); + // assert_eq!(status, true, "expected connected = true"); if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { let signer_port = MqttSignerPort::new(tx.clone()); @@ -125,5 +126,5 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { signer_loop.start(Some(&settings)); }); - routes::launch_rocket(tx) + routes::launch_rocket(tx, error_tx) } diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 05f9b36..7e58ec3 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -7,7 +7,7 @@ use librumqttd::{ Config, }; use rocket::tokio::time::timeout; -use rocket::tokio::{self, sync::mpsc}; +use rocket::tokio::{self, sync::mpsc, sync::broadcast}; use sphinx_key_parser::topics; use std::sync::Arc; use std::sync::{LazyLock, Mutex}; @@ -29,6 +29,7 @@ fn get_connected() -> bool { pub async fn start_broker( mut receiver: mpsc::Receiver, status_sender: mpsc::Sender, + error_sender: broadcast::Sender>, expected_client_id: &str, settings: &Settings, ) { @@ -49,7 +50,7 @@ pub async fn start_broker( mpsc::channel(1000); let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap(); link_tx - .subscribe([topics::VLS_RETURN, topics::CONTROL_RETURN]) + .subscribe([topics::VLS_RETURN, topics::CONTROL_RETURN, topics::ERROR]) .await .unwrap(); @@ -76,6 +77,9 @@ pub async fn start_broker( let sub_task = tokio::spawn(async move { while let Ok(message) = link_rx.recv().await { for payload in message.payload { + if message.topic == topics::ERROR { + let _ = error_sender.send(payload.to_vec()); + } if let Err(e) = msg_tx.send(payload.to_vec()).await { log::warn!("pub err {:?}", e); } diff --git a/broker/src/routes.rs b/broker/src/routes.rs index d177857..fc64d29 100644 --- a/broker/src/routes.rs +++ b/broker/src/routes.rs @@ -1,15 +1,16 @@ use crate::ChannelRequest; use rocket::fairing::{Fairing, Info, Kind}; use rocket::http::Header; -use rocket::tokio::sync::mpsc::Sender; +use rocket::tokio::sync::{mpsc::Sender, broadcast::{self, error::RecvError}}; +use rocket::response::stream::{EventStream, Event}; +use rocket::tokio::select; use rocket::*; -use rocket::{Request, Response}; -use sphinx_key_parser::topics; +use sphinx_key_parser::{topics, error::Error as ParserError}; pub type Result = std::result::Result; #[post("/control?")] -pub async fn yo(sender: &State>, msg: &str) -> Result { +pub async fn control(sender: &State>, msg: &str) -> Result { let message = hex::decode(msg)?; // FIXME validate? if message.len() < 65 { @@ -23,11 +24,31 @@ pub async fn yo(sender: &State>, msg: &str) -> Result) -> Rocket { +#[get("/errors")] +async fn errors(error_tx: &State>>, mut end: Shutdown) -> EventStream![] { + let mut rx = error_tx.subscribe(); + EventStream! { + loop { + let msg = select! { + msg = rx.recv() => match msg { + Ok(msg) => ParserError::from_slice(&msg[..]), + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(_)) => continue, + }, + _ = &mut end => break, + }; + + yield Event::json(&msg); + } + } +} + +pub fn launch_rocket(tx: Sender, error_tx: broadcast::Sender>) -> Rocket { rocket::build() - .mount("/api/", routes![yo]) + .mount("/api/", routes![control, errors]) .attach(CORS) .manage(tx) + .manage(error_tx) } #[derive(Debug, thiserror::Error)] diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 572769a..9f94a5a 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -2,7 +2,7 @@ use crate::mqtt::start_broker; use crate::routes::launch_rocket; use crate::util::Settings; use crate::ChannelRequest; -use rocket::tokio::{self, sync::mpsc}; +use rocket::tokio::{self, sync::mpsc, sync::broadcast}; use sphinx_key_parser as parser; use sphinx_key_parser::topics; use vls_protocol::serde_bolt::WireString; @@ -20,7 +20,8 @@ pub async fn run_test() -> rocket::Rocket { let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); - start_broker(rx, status_tx, CLIENT_ID, &settings).await; + let (error_tx, _) = broadcast::channel(1000); + start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, &settings).await; let mut connected = false; let tx_ = tx.clone(); tokio::spawn(async move { @@ -49,10 +50,11 @@ pub async fn run_test() -> rocket::Rocket { }; } }); - launch_rocket(tx) + launch_rocket(tx, error_tx) } #[allow(dead_code)] +#[allow(unused)] pub async fn iteration( id: u16, sequence: u16,