try to integrate rocket

This commit is contained in:
Evan Feenstra
2022-09-06 13:33:58 -07:00
parent b8090d6c12
commit c0882f3c1e
9 changed files with 95 additions and 7 deletions

View File

@@ -27,6 +27,8 @@ once_cell = "1.12.0"
bitcoin = "0.29.0" bitcoin = "0.29.0"
async-trait = "0.1" async-trait = "0.1"
url = { version = "2.2" } url = { version = "2.2" }
rocket = "0.5.0-rc.2"
thiserror = "1.0.31"
[features] [features]
default = ["std"] default = ["std"]

3
broker/rocket.toml Normal file
View File

@@ -0,0 +1,3 @@
[default]
address = "0.0.0.0"

View File

@@ -7,8 +7,8 @@ use vls_protocol::model::Secret;
use vls_protocol::{msgs, serde_bolt::WireString}; use vls_protocol::{msgs, serde_bolt::WireString};
use vls_proxy::util::{read_allowlist, read_integration_test_seed}; use vls_proxy::util::{read_allowlist, read_integration_test_seed};
pub fn blocking_connect(tx: mpsc::Sender<ChannelRequest>, network: Network) { pub fn _blocking_connect(tx: mpsc::Sender<ChannelRequest>, network: Network) {
let init_msg_2 = crate::init::make_init_msg(network).expect("couldnt make init msg"); let init_msg_2 = crate::init::_make_init_msg(network).expect("couldnt make init msg");
let (reply_tx, reply_rx) = oneshot::channel(); let (reply_tx, reply_rx) = oneshot::channel();
// Send a request to the MQTT handler to send to signer // Send a request to the MQTT handler to send to signer
let request = ChannelRequest { let request = ChannelRequest {
@@ -22,7 +22,7 @@ pub fn blocking_connect(tx: mpsc::Sender<ChannelRequest>, network: Network) {
} }
pub async fn _connect(tx: mpsc::Sender<ChannelRequest>, network: Network) { pub async fn _connect(tx: mpsc::Sender<ChannelRequest>, network: Network) {
let init_msg_2 = crate::init::make_init_msg(network).expect("could make init msg"); let init_msg_2 = crate::init::_make_init_msg(network).expect("could make init msg");
let (reply_tx, reply_rx) = oneshot::channel(); let (reply_tx, reply_rx) = oneshot::channel();
// Send a request to the MQTT handler to send to signer // Send a request to the MQTT handler to send to signer
let request = ChannelRequest { let request = ChannelRequest {
@@ -35,7 +35,7 @@ pub async fn _connect(tx: mpsc::Sender<ChannelRequest>, network: Network) {
println!("REPLY {:?}", reply); println!("REPLY {:?}", reply);
} }
pub fn make_init_msg(network: Network) -> anyhow::Result<Vec<u8>> { pub fn _make_init_msg(network: Network) -> anyhow::Result<Vec<u8>> {
let allowlist = read_allowlist() let allowlist = read_allowlist()
.into_iter() .into_iter()
.map(|s| WireString(s.as_bytes().to_vec())) .map(|s| WireString(s.as_bytes().to_vec()))

View File

@@ -3,6 +3,7 @@ mod chain_tracker;
mod init; mod init;
mod mqtt; mod mqtt;
mod run_test; mod run_test;
mod server;
mod unix_fd; mod unix_fd;
mod util; mod util;

View File

@@ -41,9 +41,12 @@ pub fn start_broker(
router.start().expect("could not start router"); router.start().expect("could not start router");
}); });
let mut rt_builder = tokio::runtime::Builder::new_multi_thread(); let mut rt_builder = rocket::tokio::runtime::Builder::new_multi_thread();
// for graceful shutdown of rocket
rt_builder.worker_threads(rocket::Config::from(rocket::Config::figment()).workers);
rt_builder.thread_name("rocket-worker-thread");
rt_builder.enable_all(); rt_builder.enable_all();
let rt = rt_builder.build().unwrap(); let rt = rt_builder.build().expect("failed to build runtime");
rt.block_on(async { rt.block_on(async {
tokio::spawn(async move { tokio::spawn(async move {
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
@@ -107,7 +110,6 @@ pub fn start_broker(
} }
} }
} }
println!("BOOM RECEIVER CLOSED!");
}); });
servers.await; servers.await;

View File

@@ -1,4 +1,5 @@
use crate::mqtt::start_broker; use crate::mqtt::start_broker;
use crate::server::launch_rocket;
use crate::ChannelRequest; use crate::ChannelRequest;
use sphinx_key_parser as parser; use sphinx_key_parser as parser;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
@@ -17,6 +18,10 @@ pub fn run_test() {
let (status_tx, mut status_rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000);
let runtime = start_broker(rx, status_tx, CLIENT_ID); let runtime = start_broker(rx, status_tx, CLIENT_ID);
runtime.block_on(async { runtime.block_on(async {
let _r = launch_rocket(tx.clone())
.await
.expect("couldnt launch rocket");
println!("ROCKET!!!");
let mut connected = false; let mut connected = false;
loop { loop {
tokio::select! { tokio::select! {

70
broker/src/server.rs Normal file
View File

@@ -0,0 +1,70 @@
use crate::ChannelRequest;
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
use rocket::*;
use tokio::sync::mpsc;
pub async fn launch_rocket(
tx: mpsc::Sender<ChannelRequest>,
) -> std::result::Result<Rocket<Ignite>, rocket::Error> {
let rz = routes![tester];
println!("LAUNCH ROCKET!");
rocket::build()
.mount("/api/", rz)
.attach(CORS)
.manage(tx)
.launch()
.await
}
#[get("/tester")]
pub async fn tester() -> Result<String> {
//
Ok("hi".to_string())
}
pub struct CORS;
#[rocket::async_trait]
impl Fairing for CORS {
fn info(&self) -> Info {
Info {
name: "Add CORS headers to responses",
kind: Kind::Response,
}
}
async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) {
response.set_header(Header::new("Access-Control-Allow-Origin", "*"));
response.set_header(Header::new(
"Access-Control-Allow-Methods",
"POST, GET, PATCH, OPTIONS",
));
response.set_header(Header::new("Access-Control-Allow-Headers", "*"));
response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
}
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("anyhow error: {0}")]
Anyhow(#[from] anyhow::Error),
#[error("not found")]
NotFound,
}
use rocket::http::Status;
use rocket::response::{self, Responder};
impl<'r, 'o: 'r> Responder<'r, 'o> for Error {
fn respond_to(self, req: &'r rocket::Request<'_>) -> response::Result<'o> {
// log `self` to your favored error tracker, e.g.
// sentry::capture_error(&self);
match self {
// in our simplistic example, we're happy to respond with the default 500 responder in all cases
_ => Status::InternalServerError.respond_to(req),
}
}
}

View File

@@ -26,6 +26,10 @@ urlencoding = "2.1.0"
dotenv = "0.15.0" dotenv = "0.15.0"
rocket = "0.5.0-rc.2" rocket = "0.5.0-rc.2"
[[bin]]
name = "tester"
path = "src/main.rs"
[[bin]] [[bin]]
name = "config" name = "config"
path = "src/config.rs" path = "src/config.rs"

1
tester/readme.md Normal file
View File

@@ -0,0 +1 @@
cargo run --bin tester -- --test --log