use atomic bool to lock loop erhandler

This commit is contained in:
Evan Feenstra
2023-06-02 13:30:38 -07:00
parent c380865d1f
commit 17e5b9eb9f

View File

@@ -5,7 +5,12 @@ use log::*;
use rocket::tokio::sync::mpsc;
use secp256k1::PublicKey;
use sphinx_signer::{parser, sphinx_glyph::topics};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::thread;
use std::time::Duration;
use vls_protocol::{msgs, msgs::Message, Error, Result};
use vls_proxy::client::Client;
@@ -32,6 +37,7 @@ pub struct SignerLoop<C: 'static + Client> {
chan: Channel,
client_id: Option<ClientId>,
lss_tx: mpsc::Sender<LssReq>,
busy: Arc<AtomicBool>,
}
impl<C: 'static + Client> SignerLoop<C> {
@@ -48,6 +54,7 @@ impl<C: 'static + Client> SignerLoop<C> {
chan: Channel::new(sender),
client_id: None,
lss_tx,
busy: Arc::new(AtomicBool::new(false)),
}
}
@@ -57,6 +64,7 @@ impl<C: 'static + Client> SignerLoop<C> {
lss_tx: mpsc::Sender<LssReq>,
sender: mpsc::Sender<ChannelRequest>,
client_id: ClientId,
busy: Arc<AtomicBool>,
) -> Self {
let log_prefix = format!("{}/{}", std::process::id(), client.id());
Self {
@@ -65,6 +73,7 @@ impl<C: 'static + Client> SignerLoop<C> {
chan: Channel::new(sender),
client_id: Some(client_id),
lss_tx,
busy,
}
}
@@ -99,6 +108,7 @@ impl<C: 'static + Client> SignerLoop<C> {
self.lss_tx.clone(),
self.chan.sender.clone(),
client_id,
self.busy.clone(),
);
thread::spawn(move || new_loop.start(None));
}
@@ -130,6 +140,19 @@ impl<C: 'static + Client> SignerLoop<C> {
}
fn handle_message(&mut self, message: Vec<u8>, catch_init: bool) -> Result<Vec<u8>> {
// let l = self.lock.lock().unwrap();
loop {
let is_busy = self.busy.load(Ordering::Relaxed);
if !is_busy {
// busy now!
self.busy.store(true, Ordering::Relaxed);
break;
} else {
// wait 5 ms
thread::sleep(Duration::from_millis(5));
}
}
let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0);
let peer_id = self
.client_id
@@ -156,6 +179,8 @@ impl<C: 'static + Client> SignerLoop<C> {
if catch_init {
let _ = self.set_channel_pubkey(reply.clone());
}
// unlock
self.busy.store(false, Ordering::Relaxed);
Ok(reply)
}