refactor to channel-based auth, main Connections struct track connected clients

This commit is contained in:
Evan Feenstra
2023-03-20 16:44:06 -07:00
parent 793727704e
commit 700e8f4ef3
7 changed files with 137 additions and 262 deletions

261
broker/Cargo.lock generated
View File

@@ -17,12 +17,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "adler32"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
[[package]] [[package]]
name = "aead" name = "aead"
version = "0.5.1" version = "0.5.1"
@@ -84,21 +78,6 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
[[package]]
name = "alloc-stdlib"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
dependencies = [
"alloc-no-stdlib",
]
[[package]] [[package]]
name = "android_system_properties" name = "android_system_properties"
version = "0.1.5" version = "0.1.5"
@@ -129,12 +108,6 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3419eecc9f5967e6f0f29a0c3fefe22bda6ea34b15798f3c452cb81f2c3fa7" checksum = "8c3419eecc9f5967e6f0f29a0c3fefe22bda6ea34b15798f3c452cb81f2c3fa7"
[[package]]
name = "ascii"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16"
[[package]] [[package]]
name = "async-channel" name = "async-channel"
version = "1.8.0" version = "1.8.0"
@@ -226,7 +199,11 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
"rustversion", "rustversion",
"serde", "serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper", "sync_wrapper",
"tokio",
"tower", "tower",
"tower-http", "tower-http",
"tower-layer", "tower-layer",
@@ -377,37 +354,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "brotli"
version = "3.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor",
]
[[package]]
name = "brotli-decompressor"
version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b6561fd3f895a11e8f72af2cb7d22e08366bebc2b6b57f7744c4bda27034744"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
]
[[package]]
name = "buf_redux"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f"
dependencies = [
"memchr",
"safemem",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.11.1" version = "3.11.1"
@@ -463,12 +409,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "chunked_transfer"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cca491388666e04d7248af3f60f0c40cfb0991c72205595d7c396e3510207d1a"
[[package]] [[package]]
name = "cipher" name = "cipher"
version = "0.4.3" version = "0.4.3"
@@ -803,16 +743,6 @@ version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb" checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb"
[[package]]
name = "deflate"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c86f7e25f518f4b81808a2cf1c50996a61f5c2eb394b2393bd87f2a4780a432f"
dependencies = [
"adler32",
"gzip-header",
]
[[package]] [[package]]
name = "delegate" name = "delegate"
version = "0.6.2" version = "0.6.2"
@@ -1030,18 +960,6 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "filetime"
version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a3de6e8d11b22ff9edc6d916f890800597d60f8b2da1caf2955c274638d6412"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall",
"windows-sys 0.45.0",
]
[[package]] [[package]]
name = "fixedbitset" name = "fixedbitset"
version = "0.4.2" version = "0.4.2"
@@ -1259,15 +1177,6 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "gzip-header"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95cc527b92e6029a62960ad99aa8a6660faa4555fe5f731aab13aa6a921795a2"
dependencies = [
"crc32fast",
]
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.15" version = "0.3.15"
@@ -1752,6 +1661,7 @@ checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"generator", "generator",
"pin-utils",
"scoped-tls", "scoped-tls",
"serde", "serde",
"serde_json", "serde_json",
@@ -1874,16 +1784,6 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]] [[package]]
name = "minimal-lexical" name = "minimal-lexical"
version = "0.2.1" version = "0.2.1"
@@ -1937,24 +1837,6 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "multipart"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00dec633863867f29cb39df64a397cdf4a6354708ddd7759f70c7fb51c5f9182"
dependencies = [
"buf_redux",
"httparse",
"log",
"mime",
"mime_guess",
"quick-error",
"rand",
"safemem",
"tempfile",
"twoway",
]
[[package]] [[package]]
name = "nanorand" name = "nanorand"
version = "0.7.0" version = "0.7.0"
@@ -2051,15 +1933,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "object" name = "object"
version = "0.29.0" version = "0.29.0"
@@ -2084,6 +1957,15 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
[[package]]
name = "oneshot"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc22d22931513428ea6cc089e942d38600e3d00976eef8c86de6b8a3aadec6eb"
dependencies = [
"loom",
]
[[package]] [[package]]
name = "opaque-debug" name = "opaque-debug"
version = "0.3.0" version = "0.3.0"
@@ -2764,31 +2646,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "rouille"
version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f86e4c51a773f953f02bbab5fd049f004bfd384341d62da2a079aff812ab176"
dependencies = [
"base64",
"brotli",
"chrono",
"deflate",
"filetime",
"multipart",
"num_cpus",
"percent-encoding",
"rand",
"serde",
"serde_derive",
"serde_json",
"sha1",
"threadpool",
"time 0.3.17",
"tiny_http",
"url",
]
[[package]] [[package]]
name = "rumqttc" name = "rumqttc"
version = "0.12.0" version = "0.12.0"
@@ -2808,22 +2665,22 @@ dependencies = [
[[package]] [[package]]
name = "rumqttd" name = "rumqttd"
version = "0.12.5" version = "0.12.6"
source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx#c9e2174c9385ef99b95698c04d5911c991789b3c" source = "git+https://github.com/Evanfeenstra/rumqtt?branch=sphinx-2#8811257886364ce0d1f09773c3aa5596e457d8c0"
dependencies = [ dependencies = [
"axum",
"bytes", "bytes",
"clap 4.1.4", "clap 4.1.4",
"config", "config",
"flume", "flume",
"metrics", "metrics",
"metrics-exporter-prometheus", "metrics-exporter-prometheus",
"oneshot",
"parking_lot 0.11.2", "parking_lot 0.11.2",
"rouille",
"rustls-pemfile 1.0.1", "rustls-pemfile 1.0.1",
"serde", "serde",
"serde_json", "serde_json",
"slab", "slab",
"sphinx-auther 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
@@ -2913,12 +2770,6 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
[[package]]
name = "safemem"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072"
[[package]] [[package]]
name = "scoped-tls" name = "scoped-tls"
version = "1.0.1" version = "1.0.1"
@@ -3020,6 +2871,15 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serde_path_to_error"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db0969fff533976baadd92e08b1d102c5a3d8a8049eadfd69d4d1e3c5b2ed189"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "serde_urlencoded" name = "serde_urlencoded"
version = "0.7.1" version = "0.7.1"
@@ -3072,17 +2932,6 @@ dependencies = [
"unsafe-libyaml", "unsafe-libyaml",
] ]
[[package]]
name = "sha1"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
dependencies = [
"cfg-if 1.0.0",
"cpufeatures",
"digest",
]
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.10.6" version = "0.10.6"
@@ -3168,19 +3017,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "sphinx-auther"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33bd24149ede6f4ec091326eacf550cfa3fc00492d4e627a045c1bd690255362"
dependencies = [
"anyhow",
"base64",
"hex",
"log",
"secp256k1",
]
[[package]] [[package]]
name = "sphinx-auther" name = "sphinx-auther"
version = "0.1.12" version = "0.1.12"
@@ -3203,7 +3039,7 @@ dependencies = [
"rmp-serde", "rmp-serde",
"serde", "serde",
"serde_json", "serde_json",
"sphinx-auther 0.1.12 (git+https://github.com/stakwork/sphinx-rs)", "sphinx-auther",
] ]
[[package]] [[package]]
@@ -3386,15 +3222,6 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.45" version = "0.1.45"
@@ -3413,8 +3240,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376"
dependencies = [ dependencies = [
"itoa", "itoa",
"libc",
"num_threads",
"serde", "serde",
"time-core", "time-core",
"time-macros", "time-macros",
@@ -3435,18 +3260,6 @@ dependencies = [
"time-core", "time-core",
] ]
[[package]]
name = "tiny_http"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82"
dependencies = [
"ascii",
"chunked_transfer",
"httpdate",
"log",
]
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.26.0" version = "1.26.0"
@@ -3716,15 +3529,6 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "twoway"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "txoo" name = "txoo"
version = "0.2.0" version = "0.2.0"
@@ -3787,15 +3591,6 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]] [[package]]
name = "unicode-bidi" name = "unicode-bidi"
version = "0.3.8" version = "0.3.8"

View File

@@ -14,7 +14,7 @@ vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning
vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" } vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" }
vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" } vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" }
vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" } vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "dcd8628893b5504b3ac2d3eb8cc5ed36f36d7625" }
rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "sphinx" } rumqttd = { git = "https://github.com/Evanfeenstra/rumqtt", branch = "sphinx-2" }
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
confy = "0.4.0" confy = "0.4.0"
secp256k1 = { version = "0.24.0", features = ["rand-std", "bitcoin_hashes"] } secp256k1 = { version = "0.24.0", features = ["rand-std", "bitcoin_hashes"] }

View File

@@ -8,7 +8,7 @@ mod unix_fd;
mod util; mod util;
use crate::chain_tracker::MqttSignerPort; use crate::chain_tracker::MqttSignerPort;
use crate::mqtt::start_broker; use crate::mqtt::{check_auth, start_broker};
use crate::unix_fd::SignerLoop; use crate::unix_fd::SignerLoop;
use crate::util::read_broker_config; use crate::util::read_broker_config;
use clap::{arg, App}; use clap::{arg, App};
@@ -16,6 +16,7 @@ use rocket::tokio::{
self, self,
sync::{broadcast, mpsc, oneshot}, sync::{broadcast, mpsc, oneshot},
}; };
use rumqttd::AuthMsg;
use std::env; use std::env;
use std::sync::Arc; use std::sync::Arc;
use url::Url; use url::Url;
@@ -25,9 +26,27 @@ use vls_proxy::connection::{open_parent_fd, UnixConnection};
use vls_proxy::portfront::SignerPortFront; use vls_proxy::portfront::SignerPortFront;
use vls_proxy::util::{add_hsmd_args, handle_hsmd_version}; use vls_proxy::util::{add_hsmd_args, handle_hsmd_version};
pub struct Connections {
pub pubkey: Option<String>,
pub clients: Vec<String>,
}
impl Connections {
pub fn new() -> Self {
Self {
pubkey: None,
clients: Vec::new(),
}
}
pub fn set_pubkey(&mut self, pk: &str) {
self.pubkey = Some(pk.to_string())
}
}
pub struct Channel { pub struct Channel {
pub sequence: u16, pub sequence: u16,
pub sender: mpsc::Sender<ChannelRequest>, pub sender: mpsc::Sender<ChannelRequest>,
pub pubkey: [u8; 33],
} }
/// Responses are received on the oneshot sender /// Responses are received on the oneshot sender
@@ -55,7 +74,7 @@ pub struct ChannelReply {
pub reply: Vec<u8>, pub reply: Vec<u8>,
} }
const CLIENT_ID: &str = "sphinx-1"; // const CLIENT_ID: &str = "sphinx-1";
const BROKER_CONFIG_PATH: &str = "../broker.conf"; const BROKER_CONFIG_PATH: &str = "../broker.conf";
#[rocket::launch] #[rocket::launch]
@@ -99,19 +118,28 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let settings = read_broker_config(BROKER_CONFIG_PATH); let settings = read_broker_config(BROKER_CONFIG_PATH);
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000); let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
let (auth_tx, auth_rx) = std::sync::mpsc::channel::<AuthMsg>();
// let (unix_tx, mut unix_rx) = mpsc::channel(10000); // let (unix_tx, mut unix_rx) = mpsc::channel(10000);
let (status_tx, mut status_rx) = mpsc::channel(10000); let (status_tx, mut status_rx) = mpsc::channel(10000);
let (error_tx, error_rx) = broadcast::channel(10000); let (error_tx, error_rx) = broadcast::channel(10000);
error_log::log_errors(error_rx); error_log::log_errors(error_rx);
let mut conns = Connections::new();
std::thread::spawn(move || {
while let Ok(am) = auth_rx.recv() {
let ok = check_auth(&am.username, &am.password, &mut conns);
let _ = am.reply.send(ok);
}
});
log::info!("=> start broker on network: {}", settings.network); log::info!("=> start broker on network: {}", settings.network);
start_broker(mqtt_rx, status_tx, error_tx.clone(), CLIENT_ID, settings) start_broker(mqtt_rx, status_tx, error_tx.clone(), settings, auth_tx)
.expect("BROKER FAILED TO START"); .expect("BROKER FAILED TO START");
log::info!("=> wait for connected status"); log::info!("=> wait for connected status");
// wait for connection = true // wait for connection = true
let status = status_rx.recv().await.expect("couldnt receive"); let status = status_rx.recv().await.expect("couldnt receive");
log::info!("=> connection status: {}", status); log::info!("=> connected: {}: {}", status.0, status.1);
// assert_eq!(status, true, "expected connected = true");
// let mqtt_tx_ = mqtt_tx.clone(); // let mqtt_tx_ = mqtt_tx.clone();
// tokio::spawn(async move { // tokio::spawn(async move {

View File

@@ -1,7 +1,8 @@
use crate::util::Settings; use crate::util::Settings;
use crate::{ChannelReply, ChannelRequest}; use crate::{ChannelReply, ChannelRequest};
use rocket::tokio::{sync::broadcast, sync::mpsc}; use rocket::tokio::{sync::broadcast, sync::mpsc};
use rumqttd::{protocol::QoS, Alert, AlertEvent, Broker, Config, Notification}; use rumqttd::{Alert, AlertEvent, AuthMsg, Broker, Config, Notification};
use sphinx_signer::sphinx_glyph::sphinx_auther::token::Token;
use sphinx_signer::sphinx_glyph::topics; use sphinx_signer::sphinx_glyph::topics;
use std::time::Duration; use std::time::Duration;
@@ -10,13 +11,13 @@ use std::time::Duration;
pub fn start_broker( pub fn start_broker(
mut receiver: mpsc::Receiver<ChannelRequest>, mut receiver: mpsc::Receiver<ChannelRequest>,
status_sender: mpsc::Sender<bool>, status_sender: mpsc::Sender<(String, bool)>,
error_sender: broadcast::Sender<Vec<u8>>, error_sender: broadcast::Sender<Vec<u8>>,
expected_client_id: &str,
settings: Settings, settings: Settings,
auth_sender: std::sync::mpsc::Sender<AuthMsg>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let conf = config(settings); let conf = config(settings);
let client_id = expected_client_id.to_string(); // let client_id = expected_client_id.to_string();
let mut broker = Broker::new(conf); let mut broker = Broker::new(conf);
let mut alerts = broker.alerts(vec![ let mut alerts = broker.alerts(vec![
@@ -26,8 +27,11 @@ pub fn start_broker(
])?; ])?;
let (mut link_tx, mut link_rx) = broker.link("localclient")?; let (mut link_tx, mut link_rx) = broker.link("localclient")?;
let auth_sender_ = auth_sender.clone();
std::thread::spawn(move || { std::thread::spawn(move || {
broker.start().expect("could not start broker"); broker
.start(Some(auth_sender_))
.expect("could not start broker");
}); });
// connected/disconnected status alerts // connected/disconnected status alerts
@@ -37,13 +41,15 @@ pub fn start_broker(
log::info!("Alert: {:?}", alert); log::info!("Alert: {:?}", alert);
match alert.1 { match alert.1 {
Alert::Event(cid, event) => { Alert::Event(cid, event) => {
if cid == client_id { // dont alert for local connections
let locals = vec!["console", "localclient"];
if !locals.contains(&cid.as_str()) {
if let Some(status) = match event { if let Some(status) = match event {
AlertEvent::Connect => Some(true), AlertEvent::Connect => Some(true),
AlertEvent::Disconnect => Some(false), AlertEvent::Disconnect => Some(false),
_ => None, _ => None,
} { } {
let _ = status_sender_.blocking_send(status); let _ = status_sender_.blocking_send((cid, status));
} }
} }
} }
@@ -78,8 +84,7 @@ pub fn start_broker(
let _relay_task = std::thread::spawn(move || { let _relay_task = std::thread::spawn(move || {
while let Some(msg) = receiver.blocking_recv() { while let Some(msg) = receiver.blocking_recv() {
let qos = QoS::AtLeastOnce; if let Err(e) = link_tx.publish(msg.topic, msg.message) {
if let Err(e) = link_tx.publish_qos(msg.topic, msg.message, qos) {
log::error!("failed to pub to link_tx! {:?}", e); log::error!("failed to pub to link_tx! {:?}", e);
} }
let rep = msg_rx.blocking_recv(); let rep = msg_rx.blocking_recv();
@@ -100,8 +105,30 @@ pub fn start_broker(
Ok(()) Ok(())
} }
pub fn check_auth(username: &str, password: &str, conns: &mut crate::Connections) -> bool {
match Token::from_base64(password) {
Ok(t) => match t.recover() {
Ok(pubkey) => {
if &pubkey.to_string() == username {
if let Some(pk) = &conns.pubkey {
// if there is an existing then it must match it
pk == username
} else {
conns.set_pubkey(username);
true
}
} else {
false
}
}
Err(_) => false,
},
Err(_) => false,
}
}
fn config(settings: Settings) -> Config { fn config(settings: Settings) -> Config {
use rumqttd::{ConnectionSettings, ConsoleSettings, ServerSettings, SphinxLoginCredentials}; use rumqttd::{ConnectionSettings, ConsoleSettings, ServerSettings};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddrV4}; use std::net::{Ipv4Addr, SocketAddrV4};
let router = rumqttd::RouterConfig { let router = rumqttd::RouterConfig {
@@ -126,7 +153,6 @@ fn config(settings: Settings) -> Config {
max_inflight_count: 200, max_inflight_count: 200,
max_inflight_size: 1024, max_inflight_size: 1024,
auth: None, auth: None,
sphinx_auth: Some(SphinxLoginCredentials { within: None }),
dynamic_filters: true, dynamic_filters: true,
}, },
tls: None, tls: None,

View File

@@ -1,13 +1,15 @@
use crate::mqtt::start_broker; use crate::mqtt::{check_auth, start_broker};
use crate::routes::launch_rocket; use crate::routes::launch_rocket;
use crate::util::Settings; use crate::util::Settings;
use crate::ChannelRequest; use crate::ChannelRequest;
use crate::Connections;
use rocket::tokio::{self, sync::broadcast, sync::mpsc}; use rocket::tokio::{self, sync::broadcast, sync::mpsc};
use rumqttd::AuthMsg;
use sphinx_signer::{parser, sphinx_glyph::topics}; use sphinx_signer::{parser, sphinx_glyph::topics};
use vls_protocol::serde_bolt::WireString; use vls_protocol::serde_bolt::WireString;
use vls_protocol::{msgs, msgs::Message}; use vls_protocol::{msgs, msgs::Message};
const CLIENT_ID: &str = "test-1"; // const CLIENT_ID: &str = "test-1";
pub async fn run_test() -> rocket::Rocket<rocket::Build> { pub async fn run_test() -> rocket::Rocket<rocket::Build> {
log::info!("TEST..."); log::info!("TEST...");
@@ -18,24 +20,29 @@ pub async fn run_test() -> rocket::Rocket<rocket::Build> {
let settings = Settings::default(); let settings = Settings::default();
let (tx, rx) = mpsc::channel(1000); let (tx, rx) = mpsc::channel(1000);
let (auth_tx, auth_rx) = std::sync::mpsc::channel::<AuthMsg>();
let (status_tx, mut status_rx) = mpsc::channel(1000); let (status_tx, mut status_rx) = mpsc::channel(1000);
let (error_tx, error_rx) = broadcast::channel(1000); let (error_tx, error_rx) = broadcast::channel(1000);
crate::error_log::log_errors(error_rx); crate::error_log::log_errors(error_rx);
start_broker(rx, status_tx, error_tx.clone(), CLIENT_ID, settings) let mut conns = Connections::new();
std::thread::spawn(move || {
while let Ok(am) = auth_rx.recv() {
let ok = check_auth(&am.username, &am.password, &mut conns);
let _ = am.reply.send(ok);
}
});
start_broker(rx, status_tx, error_tx.clone(), settings, auth_tx)
.expect("FAILED TO START BROKER"); .expect("FAILED TO START BROKER");
log::info!("BROKER started!"); log::info!("BROKER started!");
log::info!("=> wait for connected status");
// wait for connection = true
let status = status_rx.recv().await.expect("couldnt receive");
log::info!("=> connection status: {}", status);
// let mut connected = false; // let mut connected = false;
// let tx_ = tx.clone(); // let tx_ = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Some(status) = status_rx.recv().await { while let Some(status) = status_rx.recv().await {
log::info!("========> CONNECTED! {}", status); log::info!("========> CONNECTED! {} {}", status.0, status.1);
} }
}); });
// tokio::spawn(async move { // tokio::spawn(async move {

View File

@@ -20,6 +20,7 @@ impl Channel {
Self { Self {
sender, sender,
sequence: 0, sequence: 0,
pubkey: [0; 33], // init with empty pubkey
} }
} }
} }
@@ -95,7 +96,9 @@ impl<C: 'static + Client> SignerLoop<C> {
self.client.write(reply)?; self.client.write(reply)?;
} }
msg => { msg => {
let mut catch_init = false;
if let Message::HsmdInit(m) = msg { if let Message::HsmdInit(m) = msg {
catch_init = true;
if let Some(set) = settings { if let Some(set) = settings {
if ChainHash::using_genesis_block(set.network).as_bytes() if ChainHash::using_genesis_block(set.network).as_bytes()
!= &m.chain_params.0 != &m.chain_params.0
@@ -106,7 +109,7 @@ impl<C: 'static + Client> SignerLoop<C> {
panic!("Got HsmdInit without settings - likely because HsmdInit was sent after startup"); panic!("Got HsmdInit without settings - likely because HsmdInit was sent after startup");
} }
} }
let reply = self.handle_message(raw_msg)?; let reply = self.handle_message(raw_msg, catch_init)?;
// Write the reply to the node // Write the reply to the node
self.client.write_vec(reply)?; self.client.write_vec(reply)?;
// info!("replied {}", self.log_prefix); // info!("replied {}", self.log_prefix);
@@ -115,7 +118,7 @@ impl<C: 'static + Client> SignerLoop<C> {
} }
} }
fn handle_message(&mut self, message: Vec<u8>) -> Result<Vec<u8>> { fn handle_message(&mut self, message: Vec<u8>, catch_init: bool) -> Result<Vec<u8>> {
let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0); let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0);
let peer_id = self let peer_id = self
.client_id .client_id
@@ -123,13 +126,29 @@ impl<C: 'static + Client> SignerLoop<C> {
.map(|c| c.peer_id.serialize()) .map(|c| c.peer_id.serialize())
.unwrap_or([0u8; 33]); .unwrap_or([0u8; 33]);
let md = parser::raw_request_from_bytes(message, self.chan.sequence, peer_id, dbid)?; let md = parser::raw_request_from_bytes(message, self.chan.sequence, peer_id, dbid)?;
// send to glyph
let reply_rx = self.send_request(md)?; let reply_rx = self.send_request(md)?;
let res = self.get_reply(reply_rx)?; let res = self.get_reply(reply_rx)?;
let reply = parser::raw_response_from_bytes(res, self.chan.sequence)?; let reply = parser::raw_response_from_bytes(res, self.chan.sequence)?;
// add to the sequence
self.chan.sequence = self.chan.sequence.wrapping_add(1); self.chan.sequence = self.chan.sequence.wrapping_add(1);
// catch the pubkey if its the first one connection
if catch_init {
let _ = self.set_channel_pubkey(reply.clone());
}
Ok(reply) Ok(reply)
} }
fn set_channel_pubkey(&mut self, raw_msg: Vec<u8>) -> Result<()> {
let msg = msgs::from_vec(raw_msg.clone())?;
match msg {
Message::HsmdInitReplyV2(r) => self.chan.pubkey = r.node_id.0,
Message::HsmdInit2Reply(r) => self.chan.pubkey = r.node_id.0,
_ => (),
};
Ok(())
}
fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> { fn send_request(&mut self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
// Send a request to the MQTT handler to send to signer // Send a request to the MQTT handler to send to signer
let (request, reply_rx) = ChannelRequest::new(topics::VLS, message); let (request, reply_rx) = ChannelRequest::new(topics::VLS, message);

View File

@@ -83,7 +83,7 @@ pub fn setup_logging(who: &str, level_arg: &str) {
.level(log::LevelFilter::from_str(&level).expect("level")) .level(log::LevelFilter::from_str(&level).expect("level"))
.level_for("h2", log::LevelFilter::Info) .level_for("h2", log::LevelFilter::Info)
.level_for("sled", log::LevelFilter::Info) .level_for("sled", log::LevelFilter::Info)
// .level_for("rumqttd", log::LevelFilter::Warn) .level_for("rumqttd", log::LevelFilter::Warn)
.level_for("rocket", log::LevelFilter::Warn) .level_for("rocket", log::LevelFilter::Warn)
.level_for("tracing", log::LevelFilter::Warn) .level_for("tracing", log::LevelFilter::Warn)
.level_for("_", log::LevelFilter::Warn) .level_for("_", log::LevelFilter::Warn)