From 04af0e7c41ad43069fa46a3c927be49fb12ce71a Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Sun, 2 Jul 2023 17:44:58 -0700 Subject: [PATCH] do not allow ctrl msgs to fake client id --- broker/src/conn.rs | 30 +++++++++++++++++++++++++++--- broker/src/main.rs | 28 +++++++++++++++++----------- broker/src/mqtt.rs | 20 ++++++++++++-------- broker/src/routes.rs | 6 ++++-- broker/src/run_test.rs | 13 ++++++++++--- 5 files changed, 70 insertions(+), 27 deletions(-) diff --git a/broker/src/conn.rs b/broker/src/conn.rs index 4ee2d4b..a7421d4 100644 --- a/broker/src/conn.rs +++ b/broker/src/conn.rs @@ -1,6 +1,6 @@ +use anyhow::Result; use rocket::tokio::sync::{mpsc, oneshot}; use serde::{Deserialize, Serialize}; -use anyhow::Result; #[derive(Debug, Serialize, Deserialize)] pub struct Connections { @@ -65,7 +65,11 @@ impl ChannelRequest { }; (cr, reply_rx) } - pub async fn send(topic: &str, message: Vec, sender: &mpsc::Sender) -> Result> { + pub async fn send( + topic: &str, + message: Vec, + sender: &mpsc::Sender, + ) -> Result> { let (reply_tx, reply_rx) = oneshot::channel(); let req = ChannelRequest { topic: topic.to_string(), @@ -77,7 +81,12 @@ impl ChannelRequest { let reply = reply_rx.await?; Ok(reply.reply) } - pub async fn send_for(cid: &str, topic: &str, message: Vec, sender: &mpsc::Sender) -> Result> { + pub async fn send_for( + cid: &str, + topic: &str, + message: Vec, + sender: &mpsc::Sender, + ) -> Result> { let (reply_tx, reply_rx) = oneshot::channel(); let req = ChannelRequest { topic: topic.to_string(), @@ -110,6 +119,21 @@ pub struct ChannelReply { pub topic_end: String, pub reply: Vec, } +impl ChannelReply { + pub fn new(topic_end: String, reply: Vec) -> Self { + Self { topic_end, reply } + } + pub fn empty() -> Self { + Self { + topic_end: "".to_string(), + reply: Vec::new(), + } + } + // failed channel request + pub fn is_empty(&self) -> bool { + self.topic_end.len() == 0 && self.reply.len() == 0 + } +} /// Responses are received on the oneshot sender #[derive(Debug)] diff --git a/broker/src/main.rs b/broker/src/main.rs index 8a14393..19030f3 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -1,18 +1,18 @@ // #![feature(once_cell)] mod chain_tracker; +mod conn; mod error_log; +mod looper; +mod lss; mod mqtt; mod routes; mod run_test; -mod looper; mod util; -mod conn; -mod lss; -use crate::conn::{Connections, ChannelRequest, LssReq}; use crate::chain_tracker::MqttSignerPort; -use crate::mqtt::{check_auth, start_broker}; +use crate::conn::{ChannelRequest, Connections, LssReq}; use crate::looper::SignerLoop; +use crate::mqtt::{check_auth, start_broker}; use crate::util::{read_broker_config, Settings}; use clap::{arg, App}; use rocket::tokio::{ @@ -20,8 +20,8 @@ use rocket::tokio::{ sync::{broadcast, mpsc}, }; use rumqttd::{oneshot as std_oneshot, AuthMsg}; -use std::{env, time::Duration}; use std::sync::{Arc, Mutex}; +use std::{env, time::Duration}; use url::Url; use vls_frontend::{frontend::SourceFactory, Frontend}; use vls_proxy::client::UnixClient; @@ -77,7 +77,14 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool)>(10000); // waits until first connection - let conns = broker_setup(settings, mqtt_rx, init_rx, reconn_tx.clone(), error_tx.clone()).await; + let conns = broker_setup( + settings, + mqtt_rx, + init_rx, + reconn_tx.clone(), + error_tx.clone(), + ) + .await; tokio::time::sleep(Duration::from_secs(1)).await; @@ -85,10 +92,10 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") { // waits until LSS confirmation from signer let lss_broker = loop { - match lss::lss_setup(&lss_uri, init_tx.clone()).await{ + match lss::lss_setup(&lss_uri, init_tx.clone()).await { Ok(l) => { break l; - }, + } Err(e) => { let _ = error_tx.send(e.to_string().as_bytes().to_vec()); log::error!("failed LSS setup, trying again..."); @@ -157,7 +164,7 @@ pub async fn broker_setup( let _ = am.reply.send(ok); } }); - + // broker log::info!("=> start broker on network: {}", settings.network); start_broker( @@ -196,4 +203,3 @@ pub async fn broker_setup( conns } - diff --git a/broker/src/mqtt.rs b/broker/src/mqtt.rs index d8cc48d..b58077a 100644 --- a/broker/src/mqtt.rs +++ b/broker/src/mqtt.rs @@ -155,18 +155,22 @@ fn pub_and_wait( link_tx: &mut LinkTx, ) { loop { + let cs = conns_.lock().unwrap(); + let client_list = cs.clients.clone(); + drop(cs); let reply = if let Some(cid) = msg.cid.clone() { - // for a specific client - log::debug!("publishing to a specific client"); - pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx) + if !client_list.contains(&cid) { + Some(ChannelReply::empty()) + } else { + // for a specific client + log::debug!("publishing to a specific client"); + pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx) + } } else { log::debug!("publishing to all clients"); // send to each client in turn - let cs = conns_.lock().unwrap(); - let client_list = cs.clients.clone(); - drop(cs); - // wait a second if there are no clients if client_list.len() == 0 { + // wait a second if there are no clients std::thread::sleep(Duration::from_secs(1)); None } else { @@ -207,7 +211,7 @@ fn pub_timeout( let dur = Duration::from_secs(9); if let Ok((cid, topic_end, reply)) = msg_rx.recv_timeout(dur) { if &cid == client_id { - return Some(ChannelReply { reply, topic_end }); + return Some(ChannelReply::new(topic_end, reply)); } else { log::warn!("Mismatched client id!"); // wait a second before trying again diff --git a/broker/src/routes.rs b/broker/src/routes.rs index 886352e..a92e723 100644 --- a/broker/src/routes.rs +++ b/broker/src/routes.rs @@ -1,6 +1,6 @@ -use crate::util::Settings; use crate::conn::ChannelRequest; use crate::conn::Connections; +use crate::util::Settings; use rocket::fairing::{Fairing, Info, Kind}; use rocket::http::Header; use rocket::response::stream::{Event, EventStream}; @@ -40,7 +40,9 @@ pub async fn control( let _ = sender.send(request).await.map_err(|_| Error::Fail)?; // wait for reply let reply = reply_rx.await.map_err(|_| Error::Fail)?; - + if reply.is_empty() { + return Err(Error::Fail); + } Ok(hex::encode(reply.reply).to_string()) } diff --git a/broker/src/run_test.rs b/broker/src/run_test.rs index 34827b1..2f3e48c 100644 --- a/broker/src/run_test.rs +++ b/broker/src/run_test.rs @@ -1,10 +1,10 @@ +use crate::conn::ChannelRequest; use crate::routes::launch_rocket; use crate::util::Settings; -use crate::conn::ChannelRequest; use rocket::tokio::{self, sync::broadcast, sync::mpsc}; +use sphinx_signer::vls_protocol::{msgs, msgs::Message}; use sphinx_signer::{parser, sphinx_glyph::topics}; use vls_protocol::serde_bolt::WireString; -use sphinx_signer::vls_protocol::{msgs, msgs::Message}; // const CLIENT_ID: &str = "test-1"; @@ -23,7 +23,14 @@ pub async fn run_test() -> rocket::Rocket { crate::error_log::log_errors(error_rx); // block until connection - let conns = crate::broker_setup(settings, mqtt_rx, init_rx, conn_tx.clone(), error_tx.clone()).await; + let conns = crate::broker_setup( + settings, + mqtt_rx, + init_rx, + conn_tx.clone(), + error_tx.clone(), + ) + .await; log::info!("=> off to the races!"); let tx_ = mqtt_tx.clone();