diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs index b018c86..11a8bd6 100644 --- a/broker/src/chain_tracker.rs +++ b/broker/src/chain_tracker.rs @@ -1,10 +1,11 @@ use crate::conn::{ChannelRequest, LssReq}; -use crate::looper::{done_being_busy, try_to_get_busy}; +use crate::looper::{is_my_turn, my_turn_is_done, take_a_ticket}; use anyhow::Result; use async_trait::async_trait; use rocket::tokio; use sphinx_signer::{parser, sphinx_glyph::topics}; use std::time::Duration; +use std::thread; use tokio::sync::mpsc; use vls_protocol::Error; use vls_protocol_client::{ClientResult, SignerPort}; @@ -32,12 +33,15 @@ impl MqttSignerPort { async fn send_and_wait(&self, message: Vec) -> Result> { // wait until not busy + let ticket = take_a_ticket(); loop { - match try_to_get_busy() { - Ok(_) => break, - Err(_) => tokio::time::sleep(Duration::from_millis(5)).await, - }; + if is_my_turn(ticket) { + break; + } else { + thread::sleep(Duration::from_millis(5)); + } } + // add the serial request header let m = parser::raw_request_from_bytes(message, 0, [0; 33], 0)?; let (res_topic, res) = self.send_request_wait(topics::VLS, m).await?; @@ -53,7 +57,9 @@ impl MqttSignerPort { } // remove the serial request header let r = parser::raw_response_from_bytes(the_res, 0)?; - done_being_busy(); + + my_turn_is_done(); + Ok(r) } diff --git a/broker/src/looper.rs b/broker/src/looper.rs index 670fa4d..87298d4 100644 --- a/broker/src/looper.rs +++ b/broker/src/looper.rs @@ -5,24 +5,17 @@ use log::*; use rocket::tokio::sync::mpsc; use secp256k1::PublicKey; use sphinx_signer::{parser, sphinx_glyph::topics}; -use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; +use std::sync::atomic::{AtomicU16, Ordering}; use std::thread; use std::time::Duration; use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_proxy::client::Client; -pub static BUSY: AtomicBool = AtomicBool::new(false); -pub static COUNTER: AtomicU16 = AtomicU16::new(0u16); -pub static CURRENT: AtomicU16 = AtomicU16::new(0u16); +static COUNTER: AtomicU16 = AtomicU16::new(0u16); +static CURRENT: AtomicU16 = AtomicU16::new(0u16); -// set BUSY to true if its false -pub fn try_to_get_busy() -> std::result::Result { - BUSY.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) -} - -// set BUSY back to false -pub fn done_being_busy() { - BUSY.store(false, Ordering::Release); +pub fn take_a_ticket() -> u16 { + COUNTER.fetch_add(1u16, Ordering::Relaxed) } pub fn is_my_turn(ticket: u16) -> bool { @@ -30,6 +23,10 @@ pub fn is_my_turn(ticket: u16) -> bool { curr == ticket } +pub fn my_turn_is_done() { + CURRENT.fetch_add(1u16, Ordering::Relaxed); +} + #[derive(Clone, Debug)] pub struct ClientId { pub peer_id: PublicKey, @@ -151,7 +148,7 @@ impl SignerLoop { fn handle_message(&mut self, message: Vec, catch_init: bool) -> Result> { // wait until not busy - let ticket = COUNTER.fetch_add(1u16, Ordering::Relaxed); + let ticket = take_a_ticket(); loop { if is_my_turn(ticket) { break; @@ -195,7 +192,7 @@ impl SignerLoop { } // next turn - CURRENT.fetch_add(1u16, Ordering::Relaxed); + my_turn_is_done(); Ok(reply) }