From 17e5b9eb9fac9ab8c6f9c457ae60f3d297e42f3e Mon Sep 17 00:00:00 2001 From: Evan Feenstra Date: Fri, 2 Jun 2023 13:30:38 -0700 Subject: [PATCH] use atomic bool to lock loop erhandler --- broker/src/looper.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/broker/src/looper.rs b/broker/src/looper.rs index 67c1b73..60b60a1 100644 --- a/broker/src/looper.rs +++ b/broker/src/looper.rs @@ -5,7 +5,12 @@ use log::*; use rocket::tokio::sync::mpsc; use secp256k1::PublicKey; use sphinx_signer::{parser, sphinx_glyph::topics}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use std::thread; +use std::time::Duration; use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_proxy::client::Client; @@ -32,6 +37,7 @@ pub struct SignerLoop { chan: Channel, client_id: Option, lss_tx: mpsc::Sender, + busy: Arc, } impl SignerLoop { @@ -48,6 +54,7 @@ impl SignerLoop { chan: Channel::new(sender), client_id: None, lss_tx, + busy: Arc::new(AtomicBool::new(false)), } } @@ -57,6 +64,7 @@ impl SignerLoop { lss_tx: mpsc::Sender, sender: mpsc::Sender, client_id: ClientId, + busy: Arc, ) -> Self { let log_prefix = format!("{}/{}", std::process::id(), client.id()); Self { @@ -65,6 +73,7 @@ impl SignerLoop { chan: Channel::new(sender), client_id: Some(client_id), lss_tx, + busy, } } @@ -99,6 +108,7 @@ impl SignerLoop { self.lss_tx.clone(), self.chan.sender.clone(), client_id, + self.busy.clone(), ); thread::spawn(move || new_loop.start(None)); } @@ -130,6 +140,19 @@ impl SignerLoop { } fn handle_message(&mut self, message: Vec, catch_init: bool) -> Result> { + // let l = self.lock.lock().unwrap(); + loop { + let is_busy = self.busy.load(Ordering::Relaxed); + if !is_busy { + // busy now! + self.busy.store(true, Ordering::Relaxed); + break; + } else { + // wait 5 ms + thread::sleep(Duration::from_millis(5)); + } + } + let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0); let peer_id = self .client_id @@ -156,6 +179,8 @@ impl SignerLoop { if catch_init { let _ = self.set_channel_pubkey(reply.clone()); } + // unlock + self.busy.store(false, Ordering::Relaxed); Ok(reply) }