do not allow ctrl msgs to fake client id

This commit is contained in:
Evan Feenstra
2023-07-02 17:44:58 -07:00
parent dde735e2ce
commit 04af0e7c41
5 changed files with 70 additions and 27 deletions

View File

@@ -1,6 +1,6 @@
use anyhow::Result;
use rocket::tokio::sync::{mpsc, oneshot};
use serde::{Deserialize, Serialize};
use anyhow::Result;
#[derive(Debug, Serialize, Deserialize)]
pub struct Connections {
@@ -65,7 +65,11 @@ impl ChannelRequest {
};
(cr, reply_rx)
}
pub async fn send(topic: &str, message: Vec<u8>, sender: &mpsc::Sender<ChannelRequest>) -> Result<Vec<u8>> {
pub async fn send(
topic: &str,
message: Vec<u8>,
sender: &mpsc::Sender<ChannelRequest>,
) -> Result<Vec<u8>> {
let (reply_tx, reply_rx) = oneshot::channel();
let req = ChannelRequest {
topic: topic.to_string(),
@@ -77,7 +81,12 @@ impl ChannelRequest {
let reply = reply_rx.await?;
Ok(reply.reply)
}
pub async fn send_for(cid: &str, topic: &str, message: Vec<u8>, sender: &mpsc::Sender<ChannelRequest>) -> Result<Vec<u8>> {
pub async fn send_for(
cid: &str,
topic: &str,
message: Vec<u8>,
sender: &mpsc::Sender<ChannelRequest>,
) -> Result<Vec<u8>> {
let (reply_tx, reply_rx) = oneshot::channel();
let req = ChannelRequest {
topic: topic.to_string(),
@@ -110,6 +119,21 @@ pub struct ChannelReply {
pub topic_end: String,
pub reply: Vec<u8>,
}
impl ChannelReply {
pub fn new(topic_end: String, reply: Vec<u8>) -> Self {
Self { topic_end, reply }
}
pub fn empty() -> Self {
Self {
topic_end: "".to_string(),
reply: Vec::new(),
}
}
// failed channel request
pub fn is_empty(&self) -> bool {
self.topic_end.len() == 0 && self.reply.len() == 0
}
}
/// Responses are received on the oneshot sender
#[derive(Debug)]

View File

@@ -1,18 +1,18 @@
// #![feature(once_cell)]
mod chain_tracker;
mod conn;
mod error_log;
mod looper;
mod lss;
mod mqtt;
mod routes;
mod run_test;
mod looper;
mod util;
mod conn;
mod lss;
use crate::conn::{Connections, ChannelRequest, LssReq};
use crate::chain_tracker::MqttSignerPort;
use crate::mqtt::{check_auth, start_broker};
use crate::conn::{ChannelRequest, Connections, LssReq};
use crate::looper::SignerLoop;
use crate::mqtt::{check_auth, start_broker};
use crate::util::{read_broker_config, Settings};
use clap::{arg, App};
use rocket::tokio::{
@@ -20,8 +20,8 @@ use rocket::tokio::{
sync::{broadcast, mpsc},
};
use rumqttd::{oneshot as std_oneshot, AuthMsg};
use std::{env, time::Duration};
use std::sync::{Arc, Mutex};
use std::{env, time::Duration};
use url::Url;
use vls_frontend::{frontend::SourceFactory, Frontend};
use vls_proxy::client::UnixClient;
@@ -77,7 +77,14 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool)>(10000);
// waits until first connection
let conns = broker_setup(settings, mqtt_rx, init_rx, reconn_tx.clone(), error_tx.clone()).await;
let conns = broker_setup(
settings,
mqtt_rx,
init_rx,
reconn_tx.clone(),
error_tx.clone(),
)
.await;
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -85,10 +92,10 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") {
// waits until LSS confirmation from signer
let lss_broker = loop {
match lss::lss_setup(&lss_uri, init_tx.clone()).await{
match lss::lss_setup(&lss_uri, init_tx.clone()).await {
Ok(l) => {
break l;
},
}
Err(e) => {
let _ = error_tx.send(e.to_string().as_bytes().to_vec());
log::error!("failed LSS setup, trying again...");
@@ -157,7 +164,7 @@ pub async fn broker_setup(
let _ = am.reply.send(ok);
}
});
// broker
log::info!("=> start broker on network: {}", settings.network);
start_broker(
@@ -196,4 +203,3 @@ pub async fn broker_setup(
conns
}

View File

@@ -155,18 +155,22 @@ fn pub_and_wait(
link_tx: &mut LinkTx,
) {
loop {
let cs = conns_.lock().unwrap();
let client_list = cs.clients.clone();
drop(cs);
let reply = if let Some(cid) = msg.cid.clone() {
// for a specific client
log::debug!("publishing to a specific client");
pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx)
if !client_list.contains(&cid) {
Some(ChannelReply::empty())
} else {
// for a specific client
log::debug!("publishing to a specific client");
pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx)
}
} else {
log::debug!("publishing to all clients");
// send to each client in turn
let cs = conns_.lock().unwrap();
let client_list = cs.clients.clone();
drop(cs);
// wait a second if there are no clients
if client_list.len() == 0 {
// wait a second if there are no clients
std::thread::sleep(Duration::from_secs(1));
None
} else {
@@ -207,7 +211,7 @@ fn pub_timeout(
let dur = Duration::from_secs(9);
if let Ok((cid, topic_end, reply)) = msg_rx.recv_timeout(dur) {
if &cid == client_id {
return Some(ChannelReply { reply, topic_end });
return Some(ChannelReply::new(topic_end, reply));
} else {
log::warn!("Mismatched client id!");
// wait a second before trying again

View File

@@ -1,6 +1,6 @@
use crate::util::Settings;
use crate::conn::ChannelRequest;
use crate::conn::Connections;
use crate::util::Settings;
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
use rocket::response::stream::{Event, EventStream};
@@ -40,7 +40,9 @@ pub async fn control(
let _ = sender.send(request).await.map_err(|_| Error::Fail)?;
// wait for reply
let reply = reply_rx.await.map_err(|_| Error::Fail)?;
if reply.is_empty() {
return Err(Error::Fail);
}
Ok(hex::encode(reply.reply).to_string())
}

View File

@@ -1,10 +1,10 @@
use crate::conn::ChannelRequest;
use crate::routes::launch_rocket;
use crate::util::Settings;
use crate::conn::ChannelRequest;
use rocket::tokio::{self, sync::broadcast, sync::mpsc};
use sphinx_signer::vls_protocol::{msgs, msgs::Message};
use sphinx_signer::{parser, sphinx_glyph::topics};
use vls_protocol::serde_bolt::WireString;
use sphinx_signer::vls_protocol::{msgs, msgs::Message};
// const CLIENT_ID: &str = "test-1";
@@ -23,7 +23,14 @@ pub async fn run_test() -> rocket::Rocket<rocket::Build> {
crate::error_log::log_errors(error_rx);
// block until connection
let conns = crate::broker_setup(settings, mqtt_rx, init_rx, conn_tx.clone(), error_tx.clone()).await;
let conns = crate::broker_setup(
settings,
mqtt_rx,
init_rx,
conn_tx.clone(),
error_tx.clone(),
)
.await;
log::info!("=> off to the races!");
let tx_ = mqtt_tx.clone();