round trip ControlMessage auth test

This commit is contained in:
Evan Feenstra
2022-09-08 14:47:58 -07:00
parent 21ed9f97ea
commit de68957cdf
11 changed files with 65 additions and 33 deletions

View File

@@ -1,4 +1,4 @@
use crate::{ChannelReply, ChannelRequest}; use crate::{mqtt::PUB_TOPIC, ChannelReply, ChannelRequest};
use async_trait::async_trait; use async_trait::async_trait;
use rocket::tokio::sync::{mpsc, oneshot}; use rocket::tokio::sync::{mpsc, oneshot};
use vls_protocol::{Error, Result}; use vls_protocol::{Error, Result};
@@ -28,8 +28,7 @@ impl MqttSignerPort {
} }
async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> { async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
let (reply_tx, reply_rx) = oneshot::channel(); let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message);
let request = ChannelRequest { message, reply_tx };
self.sender.send(request).await.map_err(|_| Error::Eof)?; self.sender.send(request).await.map_err(|_| Error::Eof)?;
Ok(reply_rx) Ok(reply_rx)
} }

View File

@@ -31,9 +31,21 @@ pub struct Channel {
/// Responses are received on the oneshot sender /// Responses are received on the oneshot sender
#[derive(Debug)] #[derive(Debug)]
pub struct ChannelRequest { pub struct ChannelRequest {
pub topic: String,
pub message: Vec<u8>, pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<ChannelReply>, pub reply_tx: oneshot::Sender<ChannelReply>,
} }
impl ChannelRequest {
pub fn new(topic: &str, message: Vec<u8>) -> (Self, oneshot::Receiver<ChannelReply>) {
let (reply_tx, reply_rx) = oneshot::channel();
let cr = ChannelRequest {
topic: topic.to_string(),
message,
reply_tx,
};
(cr, reply_rx)
}
}
// mpsc reply // mpsc reply
#[derive(Debug)] #[derive(Debug)]

View File

@@ -12,8 +12,10 @@ use std::sync::Arc;
use std::sync::{LazyLock, Mutex}; use std::sync::{LazyLock, Mutex};
use std::time::Duration; use std::time::Duration;
pub const PUB_TOPIC: &str = "sphinx";
pub const CONTROL_TOPIC: &str = "sphinx-control";
const SUB_TOPIC: &str = "sphinx-return"; const SUB_TOPIC: &str = "sphinx-return";
const PUB_TOPIC: &str = "sphinx"; const CONTROL_SUB_TOPIC: &str = "sphinx-control-return";
const USERNAME: &str = "sphinx-key"; const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass"; const PASSWORD: &str = "sphinx-key-pass";
// must get a reply within this time, or disconnects // must get a reply within this time, or disconnects
@@ -49,7 +51,10 @@ pub async fn start_broker(
let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = let (msg_tx, mut msg_rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) =
mpsc::channel(1000); mpsc::channel(1000);
let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap(); let (mut link_tx, mut link_rx) = builder.clone().connect("localclient", 200).await.unwrap();
link_tx.subscribe([SUB_TOPIC]).await.unwrap(); link_tx
.subscribe([SUB_TOPIC, CONTROL_SUB_TOPIC])
.await
.unwrap();
let router_tx = builder.router_tx(); let router_tx = builder.router_tx();
let status_sender_ = status_sender.clone(); let status_sender_ = status_sender.clone();
@@ -84,7 +89,7 @@ pub async fn start_broker(
let relay_task = tokio::spawn(async move { let relay_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await { while let Some(msg) = receiver.recv().await {
link_tx link_tx
.publish(PUB_TOPIC, false, msg.message) .publish(&msg.topic, false, msg.message)
.await .await
.expect("could not mqtt pub"); .expect("could not mqtt pub");
match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await { match timeout(Duration::from_millis(REPLY_TIMEOUT_MS), msg_rx.recv()).await {

View File

@@ -1,7 +1,7 @@
use crate::ChannelRequest; use crate::{mqtt::CONTROL_TOPIC, ChannelRequest};
use rocket::fairing::{Fairing, Info, Kind}; use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header; use rocket::http::Header;
use rocket::tokio::sync::{mpsc::Sender, oneshot}; use rocket::tokio::sync::mpsc::Sender;
use rocket::*; use rocket::*;
use rocket::{Request, Response}; use rocket::{Request, Response};
@@ -14,8 +14,7 @@ pub async fn yo(sender: &State<Sender<ChannelRequest>>, msg: &str) -> Result<Str
if message.len() < 65 { if message.len() < 65 {
return Err(Error::Fail); return Err(Error::Fail);
} }
let (reply_tx, reply_rx) = oneshot::channel(); let (request, reply_rx) = ChannelRequest::new(CONTROL_TOPIC, message);
let request = ChannelRequest { message, reply_tx };
// send to ESP // send to ESP
let _ = sender.send(request).await.map_err(|_| Error::Fail)?; let _ = sender.send(request).await.map_err(|_| Error::Fail)?;
// wait for reply // wait for reply

View File

@@ -1,11 +1,8 @@
use crate::mqtt::start_broker; use crate::mqtt::{start_broker, PUB_TOPIC};
use crate::routes::launch_rocket; use crate::routes::launch_rocket;
use crate::util::Settings; use crate::util::Settings;
use crate::ChannelRequest; use crate::ChannelRequest;
use rocket::tokio::{ use rocket::tokio::{self, sync::mpsc};
self,
sync::{mpsc, oneshot},
};
use sphinx_key_parser as parser; use sphinx_key_parser as parser;
use vls_protocol::serde_bolt::WireString; use vls_protocol::serde_bolt::WireString;
use vls_protocol::{msgs, msgs::Message}; use vls_protocol::{msgs, msgs::Message};
@@ -60,6 +57,7 @@ pub async fn iteration(
tx: mpsc::Sender<ChannelRequest>, tx: mpsc::Sender<ChannelRequest>,
connected: bool, connected: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
return Ok(());
if !connected { if !connected {
return Ok(()); return Ok(());
} }
@@ -69,12 +67,8 @@ pub async fn iteration(
message: WireString("ping".as_bytes().to_vec()), message: WireString("ping".as_bytes().to_vec()),
}; };
let ping_bytes = parser::request_from_msg(ping, sequence, 0)?; let ping_bytes = parser::request_from_msg(ping, sequence, 0)?;
let (reply_tx, reply_rx) = oneshot::channel();
// Send a request to the MQTT handler to send to signer // Send a request to the MQTT handler to send to signer
let request = ChannelRequest { let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, ping_bytes);
message: ping_bytes,
reply_tx,
};
tx.send(request).await?; tx.send(request).await?;
println!("tx.send(request)"); println!("tx.send(request)");
let res = reply_rx.await?; let res = reply_rx.await?;

View File

@@ -1,3 +1,4 @@
use crate::mqtt::PUB_TOPIC;
use crate::util::Settings; use crate::util::Settings;
use crate::{Channel, ChannelReply, ChannelRequest}; use crate::{Channel, ChannelReply, ChannelRequest};
use bitcoin::blockdata::constants::ChainHash; use bitcoin::blockdata::constants::ChainHash;
@@ -125,10 +126,8 @@ impl<C: 'static + Client> SignerLoop<C> {
} }
fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> { fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
// Create a one-shot channel to receive the reply
let (reply_tx, reply_rx) = oneshot::channel();
// Send a request to the MQTT handler to send to signer // Send a request to the MQTT handler to send to signer
let request = ChannelRequest { message, reply_tx }; let (request, reply_rx) = ChannelRequest::new(PUB_TOPIC, message);
// This can fail if MQTT shuts down // This can fail if MQTT shuts down
self.chan self.chan
.sender .sender

View File

@@ -38,8 +38,8 @@ impl Controller {
} }
pub fn build_msg(&mut self, msg: ControlMessage) -> anyhow::Result<Vec<u8>> { pub fn build_msg(&mut self, msg: ControlMessage) -> anyhow::Result<Vec<u8>> {
let data = rmp_serde::to_vec(&msg)?; let data = rmp_serde::to_vec(&msg)?;
let ret = nonce::build_msg(&data, &self.0, self.2)?;
self.2 = self.2 + 1; self.2 = self.2 + 1;
let ret = nonce::build_msg(&data, &self.0, self.2)?;
Ok(ret) Ok(ret)
} }
pub fn build_response(&self, msg: ControlResponse) -> anyhow::Result<Vec<u8>> { pub fn build_response(&self, msg: ControlResponse) -> anyhow::Result<Vec<u8>> {
@@ -54,4 +54,12 @@ impl Controller {
pub fn parse_response(&self, input: &[u8]) -> anyhow::Result<ControlResponse> { pub fn parse_response(&self, input: &[u8]) -> anyhow::Result<ControlResponse> {
Ok(rmp_serde::from_slice(input)?) Ok(rmp_serde::from_slice(input)?)
} }
pub fn handle(&mut self, input: &[u8]) -> anyhow::Result<Vec<u8>> {
let msg = self.parse_msg(input)?;
let res = match msg {
ControlMessage::Nonce => ControlResponse::Nonce(self.2),
_ => ControlResponse::Nonce(self.2),
};
Ok(self.build_response(res)?)
}
} }

View File

@@ -15,6 +15,7 @@ use std::thread;
pub const VLS_TOPIC: &str = "sphinx"; pub const VLS_TOPIC: &str = "sphinx";
pub const CONTROL_TOPIC: &str = "sphinx-control"; pub const CONTROL_TOPIC: &str = "sphinx-control";
pub const RETURN_TOPIC: &str = "sphinx-return"; pub const RETURN_TOPIC: &str = "sphinx-return";
pub const CONTROL_RETURN_TOPIC: &str = "sphinx-control-return";
pub const USERNAME: &str = "sphinx-key"; pub const USERNAME: &str = "sphinx-key";
pub const PASSWORD: &str = "sphinx-key-pass"; pub const PASSWORD: &str = "sphinx-key-pass";
pub const QOS: QoS = QoS::AtMostOnce; pub const QOS: QoS = QoS::AtMostOnce;

View File

@@ -2,7 +2,7 @@
#### test control messages #### test control messages
cargo run --bin sphinx-key-tester -- --test cargo run --bin sphinx-key-tester -- --test --log
cd broker cd broker
cargo run -- --test cargo run -- --test

View File

@@ -1,6 +1,6 @@
use dotenv::dotenv; use dotenv::dotenv;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sphinx_key_parser::control::{ControlMessage, Controller}; use sphinx_key_parser::control::{ControlMessage, ControlResponse, Controller};
use sphinx_key_signer::lightning_signer::bitcoin::Network; use sphinx_key_signer::lightning_signer::bitcoin::Network;
use std::env; use std::env;
use std::time::Duration; use std::time::Duration;
@@ -35,7 +35,10 @@ async fn main() -> anyhow::Result<()> {
.await?; .await?;
let response: String = res.text().await?; let response: String = res.text().await?;
println!("res {:?}", response); let res_bytes = hex::decode(response).expect("couldnt decode response");
let resp = ctrl.parse_response(&res_bytes).expect("nope");
println!("RESponse from the ESP!!! {:?}", resp);
Ok(()) Ok(())
} }

View File

@@ -2,6 +2,7 @@ use sphinx_key_parser as parser;
use sphinx_key_signer::lightning_signer::bitcoin::Network; use sphinx_key_signer::lightning_signer::bitcoin::Network;
use clap::{App, AppSettings, Arg}; use clap::{App, AppSettings, Arg};
use dotenv::dotenv;
use rumqttc::{self, AsyncClient, Event, MqttOptions, Packet, QoS}; use rumqttc::{self, AsyncClient, Event, MqttOptions, Packet, QoS};
use sphinx_key_signer::control::Controller; use sphinx_key_signer::control::Controller;
use sphinx_key_signer::vls_protocol::{model::PubKey, msgs}; use sphinx_key_signer::vls_protocol::{model::PubKey, msgs};
@@ -13,15 +14,17 @@ use std::time::Duration;
const SUB_TOPIC: &str = "sphinx"; const SUB_TOPIC: &str = "sphinx";
const CONTROL_TOPIC: &str = "sphinx-control"; const CONTROL_TOPIC: &str = "sphinx-control";
const CONTROL_PUB_TOPIC: &str = "sphinx-control-return";
const PUB_TOPIC: &str = "sphinx-return"; const PUB_TOPIC: &str = "sphinx-return";
const USERNAME: &str = "sphinx-key"; const USERNAME: &str = "sphinx-key";
const PASSWORD: &str = "sphinx-key-pass"; const PASSWORD: &str = "sphinx-key-pass";
const DEV_SEED: [u8; 32] = [0; 32];
#[tokio::main(worker_threads = 1)] #[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
setup_logging("sphinx-key-tester ", "info"); setup_logging("sphinx-key-tester ", "info");
dotenv().ok();
let app = App::new("tester") let app = App::new("tester")
.setting(AppSettings::NoAutoVersion) .setting(AppSettings::NoAutoVersion)
.about("CLN:mqtt-tester - MQTT client signer") .about("CLN:mqtt-tester - MQTT client signer")
@@ -65,8 +68,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
.await .await
.expect("could not mqtt subscribe"); .expect("could not mqtt subscribe");
let seed_string: String = env::var("SEED").expect("no seed");
let seed = hex::decode(seed_string).expect("couldnt decode seed");
// make the controller to validate Control messages // make the controller to validate Control messages
let mut ctrlr = controller_from_seed(&Network::Regtest, &DEV_SEED[..]); let mut ctrlr = controller_from_seed(&Network::Regtest, &seed);
if is_test { if is_test {
// test handler loop // test handler loop
@@ -97,10 +102,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
.expect("could not mqtt publish"); .expect("could not mqtt publish");
} }
CONTROL_TOPIC => { CONTROL_TOPIC => {
match ctrlr.parse_msg(&msg_bytes) { match ctrlr.handle(&msg_bytes) {
Ok(msg) => { Ok(response) => {
log::info!("CONTROL MSG {:?}", msg); client
// create a response and mqtt pub here .publish(
CONTROL_PUB_TOPIC,
QoS::AtMostOnce,
false,
response,
)
.await
.expect("could not mqtt publish");
} }
Err(e) => log::warn!("error parsing ctrl msg {:?}", e), Err(e) => log::warn!("error parsing ctrl msg {:?}", e),
}; };