mirror of
https://github.com/stakwork/sphinx-key.git
synced 2026-01-31 13:24:54 +01:00
broker: add signer types
reads the first byte of the hello message payload to determine the type. if the payload is empty, the signer type is receivesend, otherwise, whatever the byte specifies. signer types are then stored as values in the conns hash table whenever a signer is added to it.
This commit is contained in:
10
broker/Cargo.lock
generated
10
broker/Cargo.lock
generated
@@ -1718,7 +1718,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "lss-connector"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=1abce4dedfc6be8cb261e4faa11d9a753ee323ce#1abce4dedfc6be8cb261e4faa11d9a753ee323ce"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=64d5c8aa166c4ff46b0817bc4011f39a3c949de7#64d5c8aa166c4ff46b0817bc4011f39a3c949de7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"lightning-storage-server",
|
||||
@@ -2720,7 +2720,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "rmp-utils"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=1abce4dedfc6be8cb261e4faa11d9a753ee323ce#1abce4dedfc6be8cb261e4faa11d9a753ee323ce"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=64d5c8aa166c4ff46b0817bc4011f39a3c949de7#64d5c8aa166c4ff46b0817bc4011f39a3c949de7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"log",
|
||||
@@ -3311,7 +3311,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "sphinx-auther"
|
||||
version = "0.1.12"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=1abce4dedfc6be8cb261e4faa11d9a753ee323ce#1abce4dedfc6be8cb261e4faa11d9a753ee323ce"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=64d5c8aa166c4ff46b0817bc4011f39a3c949de7#64d5c8aa166c4ff46b0817bc4011f39a3c949de7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64 0.21.2",
|
||||
@@ -3323,7 +3323,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "sphinx-glyph"
|
||||
version = "0.1.2"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=1abce4dedfc6be8cb261e4faa11d9a753ee323ce#1abce4dedfc6be8cb261e4faa11d9a753ee323ce"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=64d5c8aa166c4ff46b0817bc4011f39a3c949de7#64d5c8aa166c4ff46b0817bc4011f39a3c949de7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"hex",
|
||||
@@ -3369,7 +3369,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "sphinx-signer"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=1abce4dedfc6be8cb261e4faa11d9a753ee323ce#1abce4dedfc6be8cb261e4faa11d9a753ee323ce"
|
||||
source = "git+https://github.com/stakwork/sphinx-rs?rev=64d5c8aa166c4ff46b0817bc4011f39a3c949de7#64d5c8aa166c4ff46b0817bc4011f39a3c949de7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bip39",
|
||||
|
||||
@@ -39,8 +39,8 @@ vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-li
|
||||
# vls-protocol-client = { path = "../../vls/vls-protocol-client" }
|
||||
# vls-proxy = { path = "../../vls/vls-proxy" }
|
||||
|
||||
lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "1abce4dedfc6be8cb261e4faa11d9a753ee323ce" }
|
||||
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "1abce4dedfc6be8cb261e4faa11d9a753ee323ce" }
|
||||
lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "64d5c8aa166c4ff46b0817bc4011f39a3c949de7" }
|
||||
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "64d5c8aa166c4ff46b0817bc4011f39a3c949de7" }
|
||||
# lss-connector = { path = "../../sphinx-rs/lss-connector" }
|
||||
# sphinx-signer = { path = "../../sphinx-rs/signer" }
|
||||
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
use anyhow::Result;
|
||||
use rocket::tokio::sync::{mpsc, oneshot};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sphinx_signer::sphinx_glyph::types::SignerType;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Connections {
|
||||
pub pubkey: Option<String>,
|
||||
pub clients: HashMap<String, bool>,
|
||||
pub clients: HashMap<String, SignerType>,
|
||||
pub current: Option<String>,
|
||||
}
|
||||
|
||||
@@ -27,23 +28,16 @@ impl Connections {
|
||||
pub fn set_current(&mut self, cid: String) {
|
||||
self.current = Some(cid);
|
||||
}
|
||||
fn add_client(&mut self, cid: &str) {
|
||||
self.clients.insert(cid.to_string(), true);
|
||||
pub fn add_client(&mut self, cid: &str, signer_type: SignerType) {
|
||||
self.clients.insert(cid.to_string(), signer_type);
|
||||
self.current = Some(cid.to_string());
|
||||
}
|
||||
fn remove_client(&mut self, cid: &str) {
|
||||
pub fn remove_client(&mut self, cid: &str) {
|
||||
self.clients.remove(cid);
|
||||
if self.current == Some(cid.to_string()) {
|
||||
self.current = None;
|
||||
}
|
||||
}
|
||||
pub fn client_action(&mut self, cid: &str, connected: bool) {
|
||||
if connected {
|
||||
self.add_client(cid);
|
||||
} else {
|
||||
self.remove_client(cid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Channel {
|
||||
|
||||
@@ -163,27 +163,32 @@ pub async fn broker_setup(
|
||||
let conns_ = conns.clone();
|
||||
std::thread::spawn(move || {
|
||||
log::info!("=> waiting first connection...");
|
||||
while let Ok((cid, connected)) = status_rx.recv() {
|
||||
while let Ok((cid, connected, signer_type_opt)) = status_rx.recv() {
|
||||
log::info!("=> connection status: {}: {}", cid, connected);
|
||||
let mut cs = conns_.lock().unwrap();
|
||||
// drop it from list until ready
|
||||
cs.client_action(&cid, false);
|
||||
cs.remove_client(&cid);
|
||||
drop(cs);
|
||||
if connected {
|
||||
// In mqtt.rs, we always send a signer type if connected == true
|
||||
let signer_type = signer_type_opt.unwrap();
|
||||
let (dance_complete_tx, dance_complete_rx) = std_oneshot::channel::<bool>();
|
||||
let _ = conn_tx.blocking_send((cid.clone(), dance_complete_tx));
|
||||
let dance_complete = dance_complete_rx.recv().unwrap_or_else(|e| {
|
||||
log::info!(
|
||||
log::warn!(
|
||||
"dance_complete channel died before receiving response: {}",
|
||||
e
|
||||
);
|
||||
false
|
||||
});
|
||||
log::info!("adding client to the list? {}", dance_complete);
|
||||
let mut cs = conns_.lock().unwrap();
|
||||
cs.client_action(&cid, dance_complete);
|
||||
log::debug!("List: {:?}, action: {}", cs, dance_complete);
|
||||
drop(cs);
|
||||
if dance_complete {
|
||||
log::info!("adding client to the list: {}, type: {:?}", &cid, signer_type);
|
||||
let mut cs = conns_.lock().unwrap();
|
||||
cs.add_client(&cid, signer_type);
|
||||
drop(cs);
|
||||
} else {
|
||||
log::warn!("dance failed, client not added to the list");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::util::Settings;
|
||||
use rocket::tokio::{sync::broadcast, sync::mpsc};
|
||||
use rumqttd::{local::LinkTx, AuthMsg, Broker, Config, Notification};
|
||||
use sphinx_signer::sphinx_glyph::sphinx_auther::token::Token;
|
||||
use sphinx_signer::sphinx_glyph::topics;
|
||||
use sphinx_signer::sphinx_glyph::{topics, types::SignerType};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -15,7 +15,7 @@ pub fn start_broker(
|
||||
settings: Settings,
|
||||
mut receiver: mpsc::Receiver<ChannelRequest>,
|
||||
mut init_receiver: mpsc::Receiver<ChannelRequest>,
|
||||
status_sender: std::sync::mpsc::Sender<(String, bool)>,
|
||||
status_sender: std::sync::mpsc::Sender<(String, bool, Option<SignerType>)>,
|
||||
error_sender: broadcast::Sender<Vec<u8>>,
|
||||
auth_sender: std::sync::mpsc::Sender<AuthMsg>,
|
||||
connections: Arc<Mutex<Connections>>,
|
||||
@@ -39,18 +39,19 @@ pub fn start_broker(
|
||||
});
|
||||
|
||||
// connected/disconnected status alerts
|
||||
let (internal_status_tx, internal_status_rx) = std::sync::mpsc::channel::<(bool, String)>();
|
||||
let (internal_status_tx, internal_status_rx) =
|
||||
std::sync::mpsc::channel::<(bool, String, Option<SignerType>)>();
|
||||
|
||||
// track connections
|
||||
let link_tx_ = link_tx.clone();
|
||||
let _conns_task = std::thread::spawn(move || {
|
||||
while let Ok((is, cid)) = internal_status_rx.recv() {
|
||||
while let Ok((is, cid, signer_type)) = internal_status_rx.recv() {
|
||||
if is {
|
||||
subs(&cid, link_tx_.clone());
|
||||
} else {
|
||||
unsubs(&cid, link_tx_.clone());
|
||||
}
|
||||
let _ = status_sender.send((cid, is));
|
||||
let _ = status_sender.send((cid, is, signer_type));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -112,9 +113,21 @@ pub fn start_broker(
|
||||
let topic_end = ts[1].to_string();
|
||||
|
||||
if topic.ends_with(topics::HELLO) {
|
||||
let _ = internal_status_tx.send((true, cid));
|
||||
let signer_type = match f.publish.payload.get(0) {
|
||||
Some(byte) => match SignerType::from_byte(*byte) {
|
||||
Ok(signer_type) => signer_type,
|
||||
Err(e) => {
|
||||
log::warn!("Could not deserialize signer type: {}", e);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
// This is the ReceiveSend signer type
|
||||
None => SignerType::default(),
|
||||
};
|
||||
log::debug!("caught hello message for id: {}, type: {:?}", cid, signer_type);
|
||||
let _ = internal_status_tx.send((true, cid, Some(signer_type)));
|
||||
} else if topic.ends_with(topics::BYE) {
|
||||
let _ = internal_status_tx.send((false, cid));
|
||||
let _ = internal_status_tx.send((false, cid, None));
|
||||
} else {
|
||||
// VLS, CONTROL, LSS
|
||||
let pld = f.publish.payload.to_vec();
|
||||
|
||||
Reference in New Issue
Block a user