multi-signer broker refactor

This commit is contained in:
Evan Feenstra
2023-03-21 13:21:39 -07:00
parent 700e8f4ef3
commit 167d2d3c09
7 changed files with 297 additions and 123 deletions

View File

@@ -10,15 +10,16 @@ mod util;
use crate::chain_tracker::MqttSignerPort;
use crate::mqtt::{check_auth, start_broker};
use crate::unix_fd::SignerLoop;
use crate::util::read_broker_config;
use crate::util::{read_broker_config, Settings};
use clap::{arg, App};
use rocket::tokio::{
self,
sync::{broadcast, mpsc, oneshot},
};
use rumqttd::AuthMsg;
use rumqttd::{oneshot as std_oneshot, AuthMsg};
use serde::{Deserialize, Serialize};
use std::env;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use url::Url;
use vls_frontend::{frontend::SourceFactory, Frontend};
use vls_proxy::client::UnixClient;
@@ -26,6 +27,7 @@ use vls_proxy::connection::{open_parent_fd, UnixConnection};
use vls_proxy::portfront::SignerPortFront;
use vls_proxy::util::{add_hsmd_args, handle_hsmd_version};
#[derive(Debug, Serialize, Deserialize)]
pub struct Connections {
pub pubkey: Option<String>,
pub clients: Vec<String>,
@@ -41,6 +43,25 @@ impl Connections {
pub fn set_pubkey(&mut self, pk: &str) {
self.pubkey = Some(pk.to_string())
}
pub fn add_client(&mut self, cid: &str) {
let cids = cid.to_string();
if !self.clients.contains(&cids) {
self.clients.push(cids)
}
}
pub fn remove_client(&mut self, cid: &str) {
let cids = cid.to_string();
if self.clients.contains(&cids) {
self.clients.retain(|x| x != cid)
}
}
pub fn client_action(&mut self, cid: &str, connected: bool) {
if connected {
self.add_client(cid);
} else {
self.remove_client(cid);
}
}
}
pub struct Channel {
@@ -55,6 +76,7 @@ pub struct ChannelRequest {
pub topic: String,
pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<ChannelReply>,
pub cid: Option<String>, // if it exists, only try the one client
}
impl ChannelRequest {
pub fn new(topic: &str, message: Vec<u8>) -> (Self, oneshot::Receiver<ChannelReply>) {
@@ -63,9 +85,22 @@ impl ChannelRequest {
topic: topic.to_string(),
message,
reply_tx,
cid: None,
};
(cr, reply_rx)
}
pub fn for_cid(&mut self, cid: &str) {
self.cid = Some(cid.to_string())
}
pub fn new_for(
cid: &str,
topic: &str,
message: Vec<u8>,
) -> (Self, oneshot::Receiver<ChannelReply>) {
let (mut cr, reply_rx) = ChannelRequest::new(topic, message);
cr.for_cid(cid);
(cr, reply_rx)
}
}
// mpsc reply
@@ -100,9 +135,9 @@ async fn rocket() -> _ {
panic!("end")
} else {
if matches.is_present("test") {
run_test::run_test().await
run_test::run_test()
} else {
run_main(parent_fd).await
run_main(parent_fd)
}
}
}
@@ -114,32 +149,71 @@ fn make_clap_app() -> App<'static> {
add_hsmd_args(app)
}
async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let settings = read_broker_config(BROKER_CONFIG_PATH);
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
// blocks until a connection received
pub fn main_setup(
settings: Settings,
mqtt_rx: mpsc::Receiver<ChannelRequest>,
error_tx: broadcast::Sender<Vec<u8>>,
) -> Arc<Mutex<Connections>> {
let (auth_tx, auth_rx) = std::sync::mpsc::channel::<AuthMsg>();
// let (unix_tx, mut unix_rx) = mpsc::channel(10000);
let (status_tx, mut status_rx) = mpsc::channel(10000);
let (error_tx, error_rx) = broadcast::channel(10000);
error_log::log_errors(error_rx);
let (status_tx, status_rx) = std::sync::mpsc::channel();
let mut conns = Connections::new();
let conns1 = Connections::new();
let conns = Arc::new(Mutex::new(conns1));
// authenticator
let conns_ = conns.clone();
std::thread::spawn(move || {
while let Ok(am) = auth_rx.recv() {
let ok = check_auth(&am.username, &am.password, &mut conns);
let mut cs = conns_.lock().unwrap();
let ok = check_auth(&am.username, &am.password, &mut cs);
let _ = am.reply.send(ok);
}
});
// broker
log::info!("=> start broker on network: {}", settings.network);
start_broker(mqtt_rx, status_tx, error_tx.clone(), settings, auth_tx)
.expect("BROKER FAILED TO START");
log::info!("=> wait for connected status");
// wait for connection = true
let status = status_rx.recv().await.expect("couldnt receive");
log::info!("=> connected: {}: {}", status.0, status.1);
start_broker(
settings,
mqtt_rx,
status_tx,
error_tx.clone(),
auth_tx,
conns.clone(),
)
.expect("BROKER FAILED TO START");
// client connections state
let (startup_tx, startup_rx) = std_oneshot::channel();
let conns_ = conns.clone();
std::thread::spawn(move || {
log::info!("=> wait for connected status");
// wait for connection = true
let (cid, connected) = status_rx.recv().expect("couldnt receive");
let mut cs = conns_.lock().unwrap();
cs.client_action(&cid, connected);
drop(cs);
log::info!("=> connected: {}: {}", cid, connected);
let _ = startup_tx.send(true);
while let Ok((cid, connected)) = status_rx.recv() {
let mut cs = conns_.lock().unwrap();
cs.client_action(&cid, connected);
drop(cs)
}
});
let _ = startup_rx.recv();
conns
}
fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let settings = read_broker_config(BROKER_CONFIG_PATH);
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
let (error_tx, error_rx) = broadcast::channel(10000);
error_log::log_errors(error_rx);
let conns = main_setup(settings, mqtt_rx, error_tx.clone());
// let mqtt_tx_ = mqtt_tx.clone();
// tokio::spawn(async move {
@@ -163,15 +237,16 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
tokio::spawn(async move {
frontend.start();
});
} else {
log::warn!("Running without a frontend")
}
let conn = UnixConnection::new(parent_fd);
let client = UnixClient::new(conn);
// TODO pass status_rx into SignerLoop
let mut signer_loop = SignerLoop::new(client, mqtt_tx.clone());
// spawn CLN listener on a std thread
let cln_client = UnixClient::new(UnixConnection::new(parent_fd));
// TODO pass status_rx into SignerLoop?
let mut signer_loop = SignerLoop::new(cln_client, mqtt_tx.clone());
// spawn CLN listener
std::thread::spawn(move || {
signer_loop.start(Some(settings));
});
routes::launch_rocket(mqtt_tx, error_tx, settings)
routes::launch_rocket(mqtt_tx, error_tx, settings, conns)
}