Merge pull request #88 from stakwork/fix/frontend-header

Fix/frontend header
This commit is contained in:
Evan Feenstra
2023-06-27 13:31:42 -07:00
committed by GitHub
4 changed files with 68 additions and 34 deletions

View File

@@ -1,19 +1,23 @@
use crate::conn::{ChannelReply, ChannelRequest};
use crate::conn::{ChannelRequest, LssReq};
use crate::looper::{done_being_busy, try_to_get_busy};
use anyhow::Result;
use async_trait::async_trait;
use rocket::tokio::sync::{mpsc, oneshot};
use sphinx_signer::sphinx_glyph::topics;
use vls_protocol::{Error, Result};
use rocket::tokio;
use sphinx_signer::{parser, sphinx_glyph::topics};
use std::time::Duration;
use tokio::sync::mpsc;
use vls_protocol::Error;
use vls_protocol_client::{ClientResult, SignerPort};
pub struct MqttSignerPort {
sender: mpsc::Sender<ChannelRequest>,
lss_tx: mpsc::Sender<LssReq>,
}
#[async_trait]
impl SignerPort for MqttSignerPort {
async fn handle_message(&self, message: Vec<u8>) -> ClientResult<Vec<u8>> {
let reply_rx = self.send_request(message).await?;
self.get_reply(reply_rx).await
Ok(self.send_and_wait(message).await.map_err(|_| Error::Eof)?)
}
fn is_ready(&self) -> bool {
@@ -22,18 +26,48 @@ impl SignerPort for MqttSignerPort {
}
impl MqttSignerPort {
pub fn new(sender: mpsc::Sender<ChannelRequest>) -> Self {
Self { sender }
pub fn new(sender: mpsc::Sender<ChannelRequest>, lss_tx: mpsc::Sender<LssReq>) -> Self {
Self { sender, lss_tx }
}
async fn send_request(&self, message: Vec<u8>) -> Result<oneshot::Receiver<ChannelReply>> {
let (request, reply_rx) = ChannelRequest::new(topics::VLS, message);
self.sender.send(request).await.map_err(|_| Error::Eof)?;
Ok(reply_rx)
async fn send_and_wait(&self, message: Vec<u8>) -> Result<Vec<u8>> {
// wait until not busy
loop {
match try_to_get_busy() {
Ok(_) => break,
Err(_) => tokio::time::sleep(Duration::from_millis(5)).await,
};
}
// add the serial request header
let m = parser::raw_request_from_bytes(message, 0, [0; 33], 0)?;
let (res_topic, res) = self.send_request_wait(topics::VLS, m).await?;
let mut the_res = res.clone();
if res_topic == topics::LSS_RES {
// send LSS instead
let lss_reply = self.send_lss(res).await?;
let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply).await?;
if res_topic2 != topics::VLS_RETURN {
log::warn!("ChainTracker got a topic NOT on {}", topics::VLS_RETURN);
}
the_res = res2;
}
// remove the serial request header
let r = parser::raw_response_from_bytes(the_res, 0)?;
done_being_busy();
Ok(r)
}
async fn get_reply(&self, reply_rx: oneshot::Receiver<ChannelReply>) -> ClientResult<Vec<u8>> {
let reply = reply_rx.await.map_err(|_| Error::Eof)?;
Ok(reply.reply)
async fn send_request_wait(&self, topic: &str, message: Vec<u8>) -> Result<(String, Vec<u8>)> {
let (request, reply_rx) = ChannelRequest::new(topic, message);
self.sender.send(request).await?;
let reply = reply_rx.await?;
Ok((reply.topic_end, reply.reply))
}
async fn send_lss(&self, message: Vec<u8>) -> Result<Vec<u8>> {
let (request, reply_rx) = LssReq::new(message);
self.lss_tx.send(request).await?;
let res = reply_rx.await?;
Ok(res)
}
}

View File

@@ -5,15 +5,24 @@ use log::*;
use rocket::tokio::sync::mpsc;
use secp256k1::PublicKey;
use sphinx_signer::{parser, sphinx_glyph::topics};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
use vls_protocol::{msgs, msgs::Message, Error, Result};
use vls_proxy::client::Client;
pub static BUSY: AtomicBool = AtomicBool::new(false);
// set BUSY to true if its false
pub fn try_to_get_busy() -> std::result::Result<bool, bool> {
BUSY.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
}
// set BUSY back to false
pub fn done_being_busy() {
BUSY.store(false, Ordering::Relaxed);
}
#[derive(Clone, Debug)]
pub struct ClientId {
pub peer_id: PublicKey,
@@ -37,7 +46,6 @@ pub struct SignerLoop<C: 'static + Client> {
chan: Channel,
client_id: Option<ClientId>,
lss_tx: mpsc::Sender<LssReq>,
busy: Arc<AtomicBool>,
}
impl<C: 'static + Client> SignerLoop<C> {
@@ -54,7 +62,6 @@ impl<C: 'static + Client> SignerLoop<C> {
chan: Channel::new(sender),
client_id: None,
lss_tx,
busy: Arc::new(AtomicBool::new(false)),
}
}
@@ -64,7 +71,6 @@ impl<C: 'static + Client> SignerLoop<C> {
lss_tx: mpsc::Sender<LssReq>,
sender: mpsc::Sender<ChannelRequest>,
client_id: ClientId,
busy: Arc<AtomicBool>,
) -> Self {
let log_prefix = format!("{}/{}", std::process::id(), client.id());
Self {
@@ -73,7 +79,6 @@ impl<C: 'static + Client> SignerLoop<C> {
chan: Channel::new(sender),
client_id: Some(client_id),
lss_tx,
busy,
}
}
@@ -108,7 +113,6 @@ impl<C: 'static + Client> SignerLoop<C> {
self.lss_tx.clone(),
self.chan.sender.clone(),
client_id,
self.busy.clone(),
);
thread::spawn(move || new_loop.start(None));
}
@@ -142,10 +146,7 @@ impl<C: 'static + Client> SignerLoop<C> {
fn handle_message(&mut self, message: Vec<u8>, catch_init: bool) -> Result<Vec<u8>> {
// wait until not busy
loop {
match self
.busy
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
{
match try_to_get_busy() {
Ok(_) => break,
Err(_) => thread::sleep(Duration::from_millis(5)),
};
@@ -185,7 +186,7 @@ impl<C: 'static + Client> SignerLoop<C> {
let _ = self.set_channel_pubkey(reply.clone());
}
// unlock
self.busy.store(false, Ordering::Relaxed);
done_being_busy();
Ok(reply)
}
@@ -220,9 +221,8 @@ impl<C: 'static + Client> SignerLoop<C> {
}
fn send_lss(&mut self, message: Vec<u8>) -> Result<Vec<u8>> {
// Send a request to the MQTT handler to send to signer
// Send a request to the LSS server
let (request, reply_rx) = LssReq::new(message);
// This can fail if MQTT shuts down
self.lss_tx.blocking_send(request).map_err(|_| Error::Eof)?;
let res = reply_rx.blocking_recv().map_err(|_| Error::Eof)?;
Ok(res)

View File

@@ -105,7 +105,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
};
if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") {
let signer_port = MqttSignerPort::new(mqtt_tx.clone());
let signer_port = MqttSignerPort::new(mqtt_tx.clone(), lss_tx.clone());
let port_front = SignerPortFront::new(Arc::new(signer_port), settings.network);
let source_factory = Arc::new(SourceFactory::new(".", settings.network));
let frontend = Frontend::new(

View File

@@ -284,8 +284,8 @@ fn config(settings: Settings) -> Config {
connections: ConnectionSettings {
connection_timeout_ms: 5000,
throttle_delay_ms: 0,
max_payload_size: 20480,
max_inflight_count: 200,
max_payload_size: 262144,
max_inflight_count: 256,
max_inflight_size: 1024,
auth: None,
dynamic_filters: true,