broker: client list is now a hashmap

the key is the fixed, per-signer id
the value is the session id, generated randomly for each connection
This commit is contained in:
irriden
2023-07-26 00:54:16 +00:00
parent ea593cfea4
commit e408866e73
2 changed files with 34 additions and 20 deletions

View File

@@ -1,38 +1,40 @@
use anyhow::Result;
use rocket::tokio::sync::{mpsc, oneshot};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Serialize, Deserialize)]
pub struct Connections {
pub pubkey: Option<String>,
pub clients: Vec<String>,
pub clients: HashMap<String, String>,
pub current: String,
}
impl Connections {
pub fn new() -> Self {
Self {
pubkey: None,
clients: Vec::new(),
clients: HashMap::new(),
current: String::default(),
}
}
pub fn set_pubkey(&mut self, pk: &str) {
self.pubkey = Some(pk.to_string())
}
pub fn set_current(&mut self, cid: String) {
self.current = cid;
}
pub fn add_client(&mut self, cid: &str) {
let cids = cid.to_string();
if !self.clients.contains(&cids) {
// new client is added to beginning of Vec
self.clients.insert(0, cids);
}
let (value, key) = cid.split_once("_").expect("id is missing _ delimeter");
self.clients.insert(value.to_string(), key.to_string());
}
pub fn remove_client(&mut self, cid: &str) {
let cids = cid.to_string();
if self.clients.contains(&cids) {
self.clients.retain(|x| x != cid)
}
let (value, _key) = cid.split_once("_").expect("id is missing _ delimeter");
self.clients.remove(value);
}
pub fn client_action(&mut self, cid: &str, connected: bool) {
if connected {
self.current = cid.to_string();
self.add_client(cid);
} else {
self.remove_client(cid);

View File

@@ -159,11 +159,11 @@ fn pub_and_wait(
let reply = if let Some(cid) = msg.cid.clone() {
// for a specific client
log::debug!("publishing to a specific client");
let res_opt = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx);
res_opt
pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx)
} else {
log::debug!("publishing to all clients");
let cs = conns_.lock().unwrap();
let current = cs.current.clone();
let client_list = cs.clients.clone();
log::debug!("got the list lock!");
drop(cs);
@@ -173,11 +173,20 @@ fn pub_and_wait(
std::thread::sleep(Duration::from_secs(1));
None
} else {
let mut rep = None;
for cid in client_list.iter() {
rep = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx);
if let Some(_) = &rep {
break;
// Try the current connection
let mut rep = pub_timeout(&current, &msg.topic, &msg.message, &msg_rx, link_tx);
// If that failed, try looking for some other signer
if rep.is_none() {
for (id, session) in client_list.iter() {
let cid = format!("{}_{}", id, session);
rep = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx);
if rep.is_some() {
let mut cs = conns_.lock().unwrap();
log::debug!("got the list lock!");
cs.set_current(cid);
drop(cs);
break;
}
}
}
rep
@@ -192,13 +201,16 @@ fn pub_and_wait(
} else {
log::debug!("couldn't reach any clients...");
}
if let Some(attempt) = retries {
if counter == attempt {
if let Some(max) = retries {
log::debug!("counter: {}, retries: {}", counter, max);
if counter == max {
if let Err(_) = msg.reply_tx.send(ChannelReply::empty()) {
log::warn!("could not send on reply_tx");
}
break;
}
} else {
log::debug!("retrying indefinitely");
}
counter = counter + 1;
}