From 6adf9ef1443fd04d8dacc2092fe3c00cbe8034ca Mon Sep 17 00:00:00 2001 From: irriden Date: Thu, 20 Jul 2023 17:46:56 +0000 Subject: [PATCH 1/3] broker: remove sequence number from individual channel structs for now broker sets sequence number to 0 across the board --- broker/src/conn.rs | 1 - broker/src/looper.rs | 8 +++----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/broker/src/conn.rs b/broker/src/conn.rs index a7421d4..9c02d2a 100644 --- a/broker/src/conn.rs +++ b/broker/src/conn.rs @@ -41,7 +41,6 @@ impl Connections { } pub struct Channel { - pub sequence: u16, pub sender: mpsc::Sender, pub pubkey: [u8; 33], } diff --git a/broker/src/looper.rs b/broker/src/looper.rs index 02f7d26..2b50e5b 100644 --- a/broker/src/looper.rs +++ b/broker/src/looper.rs @@ -33,7 +33,6 @@ impl Channel { pub fn new(sender: mpsc::Sender) -> Self { Self { sender, - sequence: 0, pubkey: [0; 33], // init with empty pubkey } } @@ -158,7 +157,7 @@ impl SignerLoop { .as_ref() .map(|c| c.peer_id.serialize()) .unwrap_or([0u8; 33]); - let md = parser::raw_request_from_bytes(message, self.chan.sequence, peer_id, dbid)?; + let md = parser::raw_request_from_bytes(message, 0u16, peer_id, dbid)?; // send to signer log::info!("SEND ON {}", topics::VLS); let (res_topic, res) = self.send_request_wait(topics::VLS, md)?; @@ -178,9 +177,8 @@ impl SignerLoop { the_res = res2; } // create reply bytes for CLN - let reply = parser::raw_response_from_bytes(the_res, self.chan.sequence)?; - // add to the sequence - self.chan.sequence = self.chan.sequence.wrapping_add(1); + let reply = parser::raw_response_from_bytes(the_res, 0u16)?; + // catch the pubkey if its the first one connection if catch_init { let _ = self.set_channel_pubkey(reply.clone()); From ad0a2f90ad972ab02fd42cb6df36fa833b83ffcd Mon Sep 17 00:00:00 2001 From: irriden Date: Thu, 20 Jul 2023 18:27:51 +0000 Subject: [PATCH 2/3] broker: make done_being_busy ordering::release, not ordering::relaxed we want to make sure that all operations before this call have completed before setting BUSY back to false --- broker/src/looper.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/looper.rs b/broker/src/looper.rs index 2b50e5b..5bda02a 100644 --- a/broker/src/looper.rs +++ b/broker/src/looper.rs @@ -20,7 +20,7 @@ pub fn try_to_get_busy() -> std::result::Result { // set BUSY back to false pub fn done_being_busy() { - BUSY.store(false, Ordering::Relaxed); + BUSY.store(false, Ordering::Release); } #[derive(Clone, Debug)] From 3053a9273a449248bb8c3c56bf7631cf7c13ec69 Mon Sep 17 00:00:00 2001 From: irriden Date: Thu, 20 Jul 2023 19:20:40 +0000 Subject: [PATCH 3/3] broker: create atomicu16, use as global sequence number --- broker/src/looper.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/broker/src/looper.rs b/broker/src/looper.rs index 5bda02a..36ea109 100644 --- a/broker/src/looper.rs +++ b/broker/src/looper.rs @@ -5,13 +5,14 @@ use log::*; use rocket::tokio::sync::mpsc; use secp256k1::PublicKey; use sphinx_signer::{parser, sphinx_glyph::topics}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::thread; use std::time::Duration; use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_proxy::client::Client; pub static BUSY: AtomicBool = AtomicBool::new(false); +pub static COUNTER: AtomicU16 = AtomicU16::new(0u16); // set BUSY to true if its false pub fn try_to_get_busy() -> std::result::Result { @@ -157,7 +158,12 @@ impl SignerLoop { .as_ref() .map(|c| c.peer_id.serialize()) .unwrap_or([0u8; 33]); - let md = parser::raw_request_from_bytes(message, 0u16, peer_id, dbid)?; + let md = parser::raw_request_from_bytes( + message, + COUNTER.load(Ordering::Relaxed), + peer_id, + dbid, + )?; // send to signer log::info!("SEND ON {}", topics::VLS); let (res_topic, res) = self.send_request_wait(topics::VLS, md)?; @@ -177,8 +183,9 @@ impl SignerLoop { the_res = res2; } // create reply bytes for CLN - let reply = parser::raw_response_from_bytes(the_res, 0u16)?; - + let reply = parser::raw_response_from_bytes(the_res, COUNTER.load(Ordering::Relaxed))?; + // add to the sequence + COUNTER.fetch_add(1u16, Ordering::Relaxed); // catch the pubkey if its the first one connection if catch_init { let _ = self.set_channel_pubkey(reply.clone());