diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 246b431..b4d461e 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -13,7 +13,6 @@ vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-li rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "metrics" } pretty_env_logger = "0.4.0" confy = "0.4.0" -tokio = { version = "1.4.0", features = ["rt", "rt-multi-thread", "macros"] } sphinx-key-parser = { path = "../parser" } secp256k1 = { version = "0.24.0", features = ["rand-std", "bitcoin_hashes"] } anyhow = {version = "1", features = ["backtrace"]} @@ -28,6 +27,8 @@ bitcoin = "0.29.0" async-trait = "0.1" url = { version = "2.2" } toml = "0.5.9" +rocket = {version = "0.5.0-rc.2", features = ["json"]} +thiserror = "1.0.31" [features] default = ["std"] diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs index 06cd663..c4ee919 100644 --- a/broker/src/chain_tracker.rs +++ b/broker/src/chain_tracker.rs @@ -1,6 +1,6 @@ use crate::{ChannelReply, ChannelRequest}; use async_trait::async_trait; -use tokio::sync::{mpsc, oneshot}; +use rocket::tokio::sync::{mpsc, oneshot}; use vls_protocol::{Error, Result}; use vls_protocol_client::SignerPort; diff --git a/broker/src/main.rs b/broker/src/main.rs index 0242b6c..118e61d 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,6 +1,7 @@ #![feature(once_cell)] mod chain_tracker; mod mqtt; +mod routes; mod run_test; mod unix_fd; mod util; @@ -12,7 +13,7 @@ use crate::util::read_broker_config; use clap::{App, AppSettings, Arg}; use std::env; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use rocket::tokio::{self, sync::{mpsc, oneshot}}; use url::Url; use vls_frontend::Frontend; use vls_proxy::client::UnixClient; @@ -39,7 +40,8 @@ pub struct ChannelReply { const BROKER_CONFIG_PATH: &str = "../broker.conf"; -fn main() -> anyhow::Result<()> { +#[rocket::launch] +async fn rocket() -> _ { let parent_fd = open_parent_fd(); util::setup_logging("hsmd ", "info"); @@ -63,26 +65,28 @@ fn main() -> anyhow::Result<()> { let version = env::var("GREENLIGHT_VERSION").expect("set GREENLIGHT_VERSION to match c-lightning"); println!("{}", version); - return Ok(()); - } - - if matches.is_present("test") { - run_test::run_test(); - return Ok(()); + panic!("end") + } else { + if matches.is_present("test") { + run_test::run_test().await + } else { + run_main(parent_fd).await + } } +} +async fn run_main(parent_fd: i32) -> rocket::Rocket { let settings = read_broker_config(BROKER_CONFIG_PATH); let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); log::info!("=> start broker on network: {}", settings.network); - let runtime = start_broker(rx, status_tx, "sphinx-1", &settings); + start_broker(rx, status_tx, "sphinx-1", &settings).await; log::info!("=> wait for connected status"); // wait for connection = true - let status = status_rx.blocking_recv().expect("couldnt receive"); + let status = status_rx.recv().await.expect("couldnt receive"); log::info!("=> connection status: {}", status); assert_eq!(status, true, "expected connected = true"); - // runtime.block_on(async { if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") { let signer_port = MqttSignerPort::new(tx.clone()); @@ -93,7 +97,7 @@ fn main() -> anyhow::Result<()> { }), Url::parse(&btc_url).expect("malformed btc rpc url"), ); - runtime.block_on(async { + tokio::spawn(async move { frontend.start(); }); } @@ -101,9 +105,10 @@ fn main() -> anyhow::Result<()> { let conn = UnixConnection::new(parent_fd); let client = UnixClient::new(conn); // TODO pass status_rx into SignerLoop - let mut signer_loop = SignerLoop::new(client, tx); - signer_loop.start(Some(&settings)); - // }) + let mut signer_loop = SignerLoop::new(client, tx.clone()); + std::thread::spawn(move || { + signer_loop.start(Some(&settings)); + }); - Ok(()) + routes::launch_rocket(tx) } diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index 09a7760..f2fbd5d 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -6,12 +6,11 @@ use librumqttd::{ rumqttlog::router::ConnectionMetrics, Config, }; +use rocket::tokio::time::timeout; +use rocket::tokio::{self, sync::mpsc}; use std::sync::Arc; use std::sync::{LazyLock, Mutex}; -use std::thread; use std::time::Duration; -use tokio::sync::mpsc; -use tokio::time::timeout; const SUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx"; @@ -28,100 +27,92 @@ fn get_connected() -> bool { *CONNECTED.lock().unwrap() } -pub fn start_broker( +pub async fn start_broker( mut receiver: mpsc::Receiver, status_sender: mpsc::Sender, expected_client_id: &str, settings: &Settings, -) -> tokio::runtime::Runtime { +) { let config = config(settings); let client_id = expected_client_id.to_string(); let (mut router, servers, builder) = async_locallink::construct(config.clone()); - thread::spawn(move || { + tokio::spawn(async move { router.start().expect("could not start router"); }); - let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); - rt_builder.enable_all(); - let rt = rt_builder.build().unwrap(); - rt.block_on(async { - tokio::spawn(async move { - let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = - mpsc::channel(1000); - let (mut link_tx, mut link_rx) = - builder.clone().connect("localclient", 200).await.unwrap(); - link_tx.subscribe([SUB_TOPIC]).await.unwrap(); + tokio::spawn(async move { + let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = + mpsc::channel(1000); + let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap(); + link_tx.subscribe([SUB_TOPIC]).await.unwrap(); - let router_tx = builder.router_tx(); - let status_sender_ = status_sender.clone(); - tokio::spawn(async move { - let config = config.clone().into(); - let router_tx = router_tx.clone(); - let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); - loop { - let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); - if let Some(c) = metrics_to_status(metrics, get_connected()) { - set_connected(c); - log::info!("connection status changed to: {}", c); - status_sender_ - .send(c) + let router_tx = builder.router_tx(); + let status_sender_ = status_sender.clone(); + tokio::spawn(async move { + let config = config.clone().into(); + let router_tx = router_tx.clone(); + let console: Arc = Arc::new(ConsoleLink::new(config, router_tx)); + loop { + let metrics = consolelink::request_metrics(console.clone(), client_id.clone()); + if let Some(c) = metrics_to_status(metrics, get_connected()) { + set_connected(c); + log::info!("connection status changed to: {}", c); + status_sender_ + .send(c) + .await + .expect("couldnt send connection status"); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + }); + + let sub_task = tokio::spawn(async move { + while let Ok(message) = link_rx.recv().await { + for payload in message.payload { + if let Err(e) = msg_tx.send(payload.to_vec()).await { + log::warn!("pub err {:?}", e); + } + } + } + println!("BOOM LINK_TX CLOSED!"); + }); + + let relay_task = tokio::spawn(async move { + while let Some(msg) = receiver.recv().await { + link_tx + .publish(PUB_TOPIC, false, msg.message) + .await + .expect("could not mqtt pub"); + match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await { + Ok(reply) => { + if let Err(_) = msg.reply_tx.send(ChannelReply { + reply: reply.unwrap(), + }) { + log::warn!("could not send on reply_tx"); + } + } + Err(e) => { + log::warn!("reply_tx timed out {:?}", e); + set_connected(false); + status_sender + .send(false) .await .expect("couldnt send connection status"); } - tokio::time::sleep(Duration::from_millis(500)).await; } - }); - - let sub_task = tokio::spawn(async move { - while let Ok(message) = link_rx.recv().await { - for payload in message.payload { - if let Err(e) = msg_tx.send(payload.to_vec()).await { - log::warn!("pub err {:?}", e); - } - } - } - println!("BOOM LINK_TX CLOSED!"); - }); - - let relay_task = tokio::spawn(async move { - while let Some(msg) = receiver.recv().await { - link_tx - .publish(PUB_TOPIC, false, msg.message) - .await - .expect("could not mqtt pub"); - match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await { - Ok(reply) => { - if let Err(_) = msg.reply_tx.send(ChannelReply { - reply: reply.unwrap(), - }) { - log::warn!("could not send on reply_tx"); - } - } - Err(e) => { - log::warn!("reply_tx timed out {:?}", e); - set_connected(false); - status_sender - .send(false) - .await - .expect("couldnt send connection status"); - } - } - } - println!("BOOM RECEIVER CLOSED!"); - }); - - servers.await; - sub_task.await.unwrap(); - relay_task.await.unwrap(); + } + println!("BOOM RECEIVER CLOSED!"); }); + + servers.await; + sub_task.await.unwrap(); + relay_task.await.unwrap(); }); // give one second for router to spawn listeners - std::thread::sleep(std::time::Duration::from_secs(1)); - - rt + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option { diff --git a/broker/src/routes.rs b/broker/src/routes.rs new file mode 100644 index 0000000..c57ca17 --- /dev/null +++ b/broker/src/routes.rs @@ -0,0 +1,62 @@ +use crate::ChannelRequest; +use rocket::fairing::{Fairing, Info, Kind}; +use rocket::http::Header; +use rocket::serde::json::json; +use rocket::tokio::sync::mpsc::Sender; +use rocket::*; +use rocket::{Request, Response}; + +pub fn launch_rocket(tx: Sender) -> Rocket { + rocket::build() + .mount("/api/", routes![yo]) + .attach(CORS) + .manage(tx) +} + +pub type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("io error: {0}")] + Io(#[from] std::io::Error), +} + +#[get("/yo")] +pub async fn yo() -> Result { + Ok(json!(1).to_string()) +} + +pub struct CORS; + +#[rocket::async_trait] +impl Fairing for CORS { + fn info(&self) -> Info { + Info { + name: "Add CORS headers to responses", + kind: Kind::Response, + } + } + + async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) { + response.set_header(Header::new("Access-Control-Allow-Origin", "*")); + response.set_header(Header::new( + "Access-Control-Allow-Methods", + "POST, GET, PATCH, OPTIONS", + )); + response.set_header(Header::new("Access-Control-Allow-Headers", "*")); + response.set_header(Header::new("Access-Control-Allow-Credentials", "true")); + } +} + +use rocket::http::Status; +use rocket::response::{self, Responder}; +impl<'r, 'o: 'r> Responder<'r, 'o> for Error { + fn respond_to(self, req: &'r rocket::Request<'_>) -> response::Result<'o> { + // log `self` to your favored error tracker, e.g. + // sentry::capture_error(&self); + match self { + // in our simplistic example, we're happy to respond with the default 500 responder in all cases + _ => Status::InternalServerError.respond_to(req), + } + } +} diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 9b354b7..5d8a4c4 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -1,14 +1,18 @@ use crate::mqtt::start_broker; +use crate::routes::launch_rocket; use crate::util::Settings; use crate::ChannelRequest; +use rocket::tokio::{ + self, + sync::{mpsc, oneshot}, +}; use sphinx_key_parser as parser; -use tokio::sync::{mpsc, oneshot}; use vls_protocol::serde_bolt::WireString; use vls_protocol::{msgs, msgs::Message}; const CLIENT_ID: &str = "test-1"; -pub fn run_test() { +pub async fn run_test() -> rocket::Rocket { log::info!("TEST..."); let mut id = 0u16; @@ -18,9 +22,10 @@ pub fn run_test() { let (tx, rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000); - let runtime = start_broker(rx, status_tx, CLIENT_ID, &settings); - runtime.block_on(async { - let mut connected = false; + start_broker(rx, status_tx, CLIENT_ID, &settings).await; + let mut connected = false; + let tx_ = tx.clone(); + tokio::spawn(async move { loop { tokio::select! { status = status_rx.recv() => { @@ -31,7 +36,7 @@ pub fn run_test() { log::info!("========> CONNECTED! {}", connection_status); } } - res = iteration(id, sequence, tx.clone(), connected) => { + res = iteration(id, sequence, tx_.clone(), connected) => { if let Err(e) = res { log::warn!("===> iteration failed {:?}", e); // connected = false; @@ -46,6 +51,7 @@ pub fn run_test() { }; } }); + launch_rocket(tx) } pub async fn iteration( diff --git a/broker/src/unix_fd.rs b/broker/src/unix_fd.rs index 9adcd8f..8c42c45 100644 --- a/broker/src/unix_fd.rs +++ b/broker/src/unix_fd.rs @@ -2,10 +2,10 @@ use crate::util::Settings; use crate::{Channel, ChannelReply, ChannelRequest}; use bitcoin::blockdata::constants::ChainHash; use log::*; +use rocket::tokio::sync::{mpsc, oneshot}; use secp256k1::PublicKey; use sphinx_key_parser as parser; use std::thread; -use tokio::sync::{mpsc, oneshot}; use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_proxy::client::Client; diff --git a/broker/src/util.rs b/broker/src/util.rs index 9fd57e2..301b271 100644 --- a/broker/src/util.rs +++ b/broker/src/util.rs @@ -99,7 +99,8 @@ fn read_port_setting(table: &Value) -> Option { if temp <= 1023 { panic!("The port number is not an integer greater than 1023") } - if temp > u16::MAX.into() { + let max: i64 = u16::MAX.into(); + if temp > max { panic!("The port number is way too big!") } log::info!("Read broker port setting: {}", temp);