From c0882f3c1e7a792d25033dd621ba3c6a8aa53afc Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Tue, 6 Sep 2022 13:33:58 -0700 Subject: [PATCH] try to integrate rocket --- broker/Cargo.toml | 2 ++ broker/rocket.toml | 3 ++ broker/src/init.rs | 8 ++--- broker/src/main.rs | 1 + broker/src/mqtt.rs | 8 +++-- broker/src/run_test.rs | 5 +++ broker/src/server.rs | 70 ++++++++++++++++++++++++++++++++++++++++++ tester/Cargo.toml | 4 +++ tester/readme.md | 1 + 9 files changed, 95 insertions(+), 7 deletions(-) create mode 100644 broker/rocket.toml create mode 100644 broker/src/server.rs create mode 100644 tester/readme.md diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 2f8618d..ded9fae 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -27,6 +27,8 @@ once_cell = "1.12.0" bitcoin = "0.29.0" async-trait = "0.1" url = { version = "2.2" } +rocket = "0.5.0-rc.2" +thiserror = "1.0.31" [features] default = ["std"] diff --git a/broker/rocket.toml b/broker/rocket.toml new file mode 100644 index 0000000..d8f7169 --- /dev/null +++ b/broker/rocket.toml @@ -0,0 +1,3 @@ + +[default] +address = "0.0.0.0" \ No newline at end of file diff --git a/broker/src/init.rs b/broker/src/init.rs index f03c68b..c96f55f 100644 --- a/broker/src/init.rs +++ b/broker/src/init.rs @@ -7,8 +7,8 @@ use vls_protocol::model::Secret; use vls_protocol::{msgs, serde_bolt::WireString}; use vls_proxy::util::{read_allowlist, read_integration_test_seed}; -pub fn blocking_connect(tx: mpsc::Sender, network: Network) { - let init_msg_2 = crate::init::make_init_msg(network).expect("couldnt make init msg"); +pub fn _blocking_connect(tx: mpsc::Sender, network: Network) { + let init_msg_2 = crate::init::_make_init_msg(network).expect("couldnt make init msg"); let (reply_tx, reply_rx) = oneshot::channel(); // Send a request to the MQTT handler to send to signer let request = ChannelRequest { @@ -22,7 +22,7 @@ pub fn blocking_connect(tx: mpsc::Sender, network: Network) { } pub async fn _connect(tx: mpsc::Sender, network: Network) { - let init_msg_2 = crate::init::make_init_msg(network).expect("could make init msg"); + let init_msg_2 = crate::init::_make_init_msg(network).expect("could make init msg"); let (reply_tx, reply_rx) = oneshot::channel(); // Send a request to the MQTT handler to send to signer let request = ChannelRequest { @@ -35,7 +35,7 @@ pub async fn _connect(tx: mpsc::Sender, network: Network) { println!("REPLY {:?}", reply); } -pub fn make_init_msg(network: Network) -> anyhow::Result> { +pub fn _make_init_msg(network: Network) -> anyhow::Result> { let allowlist = read_allowlist() .into_iter() .map(|s| WireString(s.as_bytes().to_vec())) diff --git a/broker/src/main.rs b/broker/src/main.rs index 374f7db..57c4060 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -3,6 +3,7 @@ mod chain_tracker; mod init; mod mqtt; mod run_test; +mod server; mod unix_fd; mod util; diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index e3ac1ca..938d40b 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -41,9 +41,12 @@ pub fn start_broker( router.start().expect("could not start router"); }); - let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); + let mut rt_builder = rocket::tokio::runtime::Builder::new_multi_thread(); + // for graceful shutdown of rocket + rt_builder.worker_threads(rocket::Config::from(rocket::Config::figment()).workers); + rt_builder.thread_name("rocket-worker-thread"); rt_builder.enable_all(); - let rt = rt_builder.build().unwrap(); + let rt = rt_builder.build().expect("failed to build runtime"); rt.block_on(async { tokio::spawn(async move { let (msg_tx, mut msg_rx): (mpsc::Sender>, mpsc::Receiver>) = @@ -107,7 +110,6 @@ pub fn start_broker( } } } - println!("BOOM RECEIVER CLOSED!"); }); servers.await; diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 8102071..cf9c1bb 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -1,4 +1,5 @@ use crate::mqtt::start_broker; +use crate::server::launch_rocket; use crate::ChannelRequest; use sphinx_key_parser as parser; use tokio::sync::{mpsc, oneshot}; @@ -17,6 +18,10 @@ pub fn run_test() { let (status_tx, mut status_rx) = mpsc::channel(1000); let runtime = start_broker(rx, status_tx, CLIENT_ID); runtime.block_on(async { + let _r = launch_rocket(tx.clone()) + .await + .expect("couldnt launch rocket"); + println!("ROCKET!!!"); let mut connected = false; loop { tokio::select! { diff --git a/broker/src/server.rs b/broker/src/server.rs new file mode 100644 index 0000000..84e57c7 --- /dev/null +++ b/broker/src/server.rs @@ -0,0 +1,70 @@ +use crate::ChannelRequest; + +use rocket::fairing::{Fairing, Info, Kind}; +use rocket::http::Header; +use rocket::*; +use tokio::sync::mpsc; + +pub async fn launch_rocket( + tx: mpsc::Sender, +) -> std::result::Result, rocket::Error> { + let rz = routes![tester]; + println!("LAUNCH ROCKET!"); + rocket::build() + .mount("/api/", rz) + .attach(CORS) + .manage(tx) + .launch() + .await +} + +#[get("/tester")] +pub async fn tester() -> Result { + // + Ok("hi".to_string()) +} + +pub struct CORS; + +#[rocket::async_trait] +impl Fairing for CORS { + fn info(&self) -> Info { + Info { + name: "Add CORS headers to responses", + kind: Kind::Response, + } + } + + async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) { + response.set_header(Header::new("Access-Control-Allow-Origin", "*")); + response.set_header(Header::new( + "Access-Control-Allow-Methods", + "POST, GET, PATCH, OPTIONS", + )); + response.set_header(Header::new("Access-Control-Allow-Headers", "*")); + response.set_header(Header::new("Access-Control-Allow-Credentials", "true")); + } +} + +pub type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("anyhow error: {0}")] + Anyhow(#[from] anyhow::Error), + #[error("not found")] + NotFound, +} + +use rocket::http::Status; +use rocket::response::{self, Responder}; +impl<'r, 'o: 'r> Responder<'r, 'o> for Error { + fn respond_to(self, req: &'r rocket::Request<'_>) -> response::Result<'o> { + // log `self` to your favored error tracker, e.g. + // sentry::capture_error(&self); + match self { + // in our simplistic example, we're happy to respond with the default 500 responder in all cases + _ => Status::InternalServerError.respond_to(req), + } + } +} diff --git a/tester/Cargo.toml b/tester/Cargo.toml index be125f6..aa6919b 100644 --- a/tester/Cargo.toml +++ b/tester/Cargo.toml @@ -26,6 +26,10 @@ urlencoding = "2.1.0" dotenv = "0.15.0" rocket = "0.5.0-rc.2" +[[bin]] +name = "tester" +path = "src/main.rs" + [[bin]] name = "config" path = "src/config.rs" diff --git a/tester/readme.md b/tester/readme.md new file mode 100644 index 0000000..98daa39 --- /dev/null +++ b/tester/readme.md @@ -0,0 +1 @@ +cargo run --bin tester -- --test --log \ No newline at end of file