control msg tester, fix mqtt router startup

This commit is contained in:
Evan Feenstra
2022-09-07 17:05:31 -07:00
parent 3992dfe6fe
commit 21ed9f97ea
8 changed files with 108 additions and 28 deletions

View File

@@ -29,6 +29,7 @@ url = { version = "2.2" }
toml = "0.5.9"
rocket = {version = "0.5.0-rc.2", features = ["json"]}
thiserror = "1.0.31"
hex = "0.4.3"
[features]
default = ["std"]

View File

@@ -11,9 +11,12 @@ use crate::mqtt::start_broker;
use crate::unix_fd::SignerLoop;
use crate::util::read_broker_config;
use clap::{App, AppSettings, Arg};
use rocket::tokio::{
self,
sync::{mpsc, oneshot},
};
use std::env;
use std::sync::Arc;
use rocket::tokio::{self, sync::{mpsc, oneshot}};
use url::Url;
use vls_frontend::Frontend;
use vls_proxy::client::UnixClient;
@@ -101,11 +104,11 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
frontend.start();
});
}
// listen to reqs from CLN
let conn = UnixConnection::new(parent_fd);
let client = UnixClient::new(conn);
// TODO pass status_rx into SignerLoop
let mut signer_loop = SignerLoop::new(client, tx.clone());
// spawn CLN listener on a std thread
std::thread::spawn(move || {
signer_loop.start(Some(&settings));
});

View File

@@ -38,11 +38,14 @@ pub async fn start_broker(
let (mut router, servers, builder) = async_locallink::construct(config.clone());
tokio::spawn(async move {
// std thread for the router
std::thread::spawn(move || {
log::info!("start mqtt router");
router.start().expect("could not start router");
});
tokio::spawn(async move {
log::info!("start mqtt relayer and localclient");
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000);
let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap();
@@ -76,7 +79,6 @@ pub async fn start_broker(
}
}
}
println!("BOOM LINK_TX CLOSED!");
});
let relay_task = tokio::spawn(async move {
@@ -103,7 +105,6 @@ pub async fn start_broker(
}
}
}
println!("BOOM RECEIVER CLOSED!");
});
servers.await;
@@ -112,7 +113,7 @@ pub async fn start_broker(
});
// give one second for router to spawn listeners
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
fn metrics_to_status(metrics: ConnectionMetrics, client_connected: bool) -> Option<bool> {

View File

@@ -1,11 +1,28 @@
use crate::ChannelRequest;
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
use rocket::serde::json::json;
use rocket::tokio::sync::mpsc::Sender;
use rocket::tokio::sync::{mpsc::Sender, oneshot};
use rocket::*;
use rocket::{Request, Response};
pub type Result<T> = std::result::Result<T, Error>;
#[post("/control?<msg>")]
pub async fn yo(sender: &State<Sender<ChannelRequest>>, msg: &str) -> Result<String> {
let message = hex::decode(msg)?;
// FIXME validate?
if message.len() < 65 {
return Err(Error::Fail);
}
let (reply_tx, reply_rx) = oneshot::channel();
let request = ChannelRequest { message, reply_tx };
// send to ESP
let _ = sender.send(request).await.map_err(|_| Error::Fail)?;
// wait for reply
let reply = reply_rx.await.map_err(|_| Error::Fail)?;
Ok(hex::encode(reply.reply).to_string())
}
pub fn launch_rocket(tx: Sender<ChannelRequest>) -> Rocket<Build> {
rocket::build()
.mount("/api/", routes![yo])
@@ -13,17 +30,27 @@ pub fn launch_rocket(tx: Sender<ChannelRequest>) -> Rocket<Build> {
.manage(tx)
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed")]
Fail,
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("hex error: {0}")]
Hex(#[from] hex::FromHexError),
}
#[get("/yo")]
pub async fn yo() -> Result<String> {
Ok(json!(1).to_string())
use rocket::http::Status;
use rocket::response::{self, Responder};
impl<'r, 'o: 'r> Responder<'r, 'o> for Error {
fn respond_to(self, req: &'r rocket::Request<'_>) -> response::Result<'o> {
// log `self` to your favored error tracker, e.g.
// sentry::capture_error(&self);
match self {
// in our simplistic example, we're happy to respond with the default 500 responder in all cases
_ => Status::InternalServerError.respond_to(req),
}
}
}
pub struct CORS;
@@ -47,16 +74,3 @@ impl Fairing for CORS {
response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
}
}
use rocket::http::Status;
use rocket::response::{self, Responder};
impl<'r, 'o: 'r> Responder<'r, 'o> for Error {
fn respond_to(self, req: &'r rocket::Request<'_>) -> response::Result<'o> {
// log `self` to your favored error tracker, e.g.
// sentry::capture_error(&self);
match self {
// in our simplistic example, we're happy to respond with the default 500 responder in all cases
_ => Status::InternalServerError.respond_to(req),
}
}
}

View File

@@ -33,3 +33,7 @@ path = "src/config.rs"
[[bin]]
name = "config-server"
path = "src/server.rs"
[[bin]]
name = "ctrl"
path = "src/ctrl.rs"

11
tester/README.md Normal file
View File

@@ -0,0 +1,11 @@
#### test control messages
cargo run --bin sphinx-key-tester -- --test
cd broker
cargo run -- --test
cargo run --bin ctrl

46
tester/src/ctrl.rs Normal file
View File

@@ -0,0 +1,46 @@
use dotenv::dotenv;
use serde::{Deserialize, Serialize};
use sphinx_key_parser::control::{ControlMessage, Controller};
use sphinx_key_signer::lightning_signer::bitcoin::Network;
use std::env;
use std::time::Duration;
const URL: &str = "http://localhost:8000/api";
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EcdhBody {
pub pubkey: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv().ok();
let seed_string: String = env::var("SEED").expect("no seed");
let seed = hex::decode(seed_string).expect("yo");
let mut ctrl = controller_from_seed(&Network::Regtest, &seed);
let msg = ctrl.build_msg(ControlMessage::Nonce)?;
let msg_hex = hex::encode(&msg);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("couldnt build reqwest client");
let res = client
.post(format!("{}/control?msg={}", URL, msg_hex))
.header("Content-Type", "application/json")
.send()
.await?;
let response: String = res.text().await?;
println!("res {:?}", response);
Ok(())
}
pub fn controller_from_seed(network: &Network, seed: &[u8]) -> Controller {
let (pk, sk) = sphinx_key_signer::derive_node_keys(network, seed);
Controller::new(sk, pk, 0)
}

View File

@@ -48,9 +48,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
break (client, eventloop);
}
}
Err(_) => {
Err(e) => {
try_i = try_i + 1;
println!("reconnect.... {}", try_i);
println!("reconnect.... {} {:?}", try_i, e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}