Merge pull request #123 from stakwork/ticketlock

Ticketlock
This commit is contained in:
Evan Feenstra
2023-09-06 09:47:11 -07:00
committed by GitHub
2 changed files with 37 additions and 30 deletions

View File

@@ -1,10 +1,11 @@
use crate::conn::{ChannelRequest, LssReq}; use crate::conn::{ChannelRequest, LssReq};
use crate::looper::{done_being_busy, try_to_get_busy}; use crate::looper::{is_my_turn, my_turn_is_done, take_a_ticket};
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use rocket::tokio; use rocket::tokio;
use sphinx_signer::{parser, sphinx_glyph::topics}; use sphinx_signer::{parser, sphinx_glyph::topics};
use std::time::Duration; use std::time::Duration;
use std::thread;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use vls_protocol::Error; use vls_protocol::Error;
use vls_protocol_client::{ClientResult, SignerPort}; use vls_protocol_client::{ClientResult, SignerPort};
@@ -32,12 +33,15 @@ impl MqttSignerPort {
async fn send_and_wait(&self, message: Vec<u8>) -> Result<Vec<u8>> { async fn send_and_wait(&self, message: Vec<u8>) -> Result<Vec<u8>> {
// wait until not busy // wait until not busy
let ticket = take_a_ticket();
loop { loop {
match try_to_get_busy() { if is_my_turn(ticket) {
Ok(_) => break, break;
Err(_) => tokio::time::sleep(Duration::from_millis(5)).await, } else {
}; thread::sleep(Duration::from_millis(5));
} }
}
// add the serial request header // add the serial request header
let m = parser::raw_request_from_bytes(message, 0, [0; 33], 0)?; let m = parser::raw_request_from_bytes(message, 0, [0; 33], 0)?;
let (res_topic, res) = self.send_request_wait(topics::VLS, m).await?; let (res_topic, res) = self.send_request_wait(topics::VLS, m).await?;
@@ -53,7 +57,9 @@ impl MqttSignerPort {
} }
// remove the serial request header // remove the serial request header
let r = parser::raw_response_from_bytes(the_res, 0)?; let r = parser::raw_response_from_bytes(the_res, 0)?;
done_being_busy();
my_turn_is_done();
Ok(r) Ok(r)
} }

View File

@@ -5,23 +5,26 @@ use log::*;
use rocket::tokio::sync::mpsc; use rocket::tokio::sync::mpsc;
use secp256k1::PublicKey; use secp256k1::PublicKey;
use sphinx_signer::{parser, sphinx_glyph::topics}; use sphinx_signer::{parser, sphinx_glyph::topics};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::sync::atomic::{AtomicU16, Ordering};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use vls_protocol::{msgs, msgs::Message, Error, Result}; use vls_protocol::{msgs, msgs::Message, Error, Result};
use vls_proxy::client::Client; use vls_proxy::client::Client;
pub static BUSY: AtomicBool = AtomicBool::new(false); static COUNTER: AtomicU16 = AtomicU16::new(0u16);
pub static COUNTER: AtomicU16 = AtomicU16::new(0u16); static CURRENT: AtomicU16 = AtomicU16::new(0u16);
// set BUSY to true if its false pub fn take_a_ticket() -> u16 {
pub fn try_to_get_busy() -> std::result::Result<bool, bool> { COUNTER.fetch_add(1u16, Ordering::SeqCst)
BUSY.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
} }
// set BUSY back to false pub fn is_my_turn(ticket: u16) -> bool {
pub fn done_being_busy() { let curr = CURRENT.load(Ordering::SeqCst);
BUSY.store(false, Ordering::Release); curr == ticket
}
pub fn my_turn_is_done() {
CURRENT.fetch_add(1u16, Ordering::SeqCst);
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -145,11 +148,13 @@ impl<C: 'static + Client> SignerLoop<C> {
fn handle_message(&mut self, message: Vec<u8>, catch_init: bool) -> Result<Vec<u8>> { fn handle_message(&mut self, message: Vec<u8>, catch_init: bool) -> Result<Vec<u8>> {
// wait until not busy // wait until not busy
let ticket = take_a_ticket();
loop { loop {
match try_to_get_busy() { if is_my_turn(ticket) {
Ok(_) => break, break;
Err(_) => thread::sleep(Duration::from_millis(5)), } else {
}; thread::sleep(Duration::from_millis(5));
}
} }
let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0); let dbid = self.client_id.as_ref().map(|c| c.dbid).unwrap_or(0);
@@ -158,12 +163,7 @@ impl<C: 'static + Client> SignerLoop<C> {
.as_ref() .as_ref()
.map(|c| c.peer_id.serialize()) .map(|c| c.peer_id.serialize())
.unwrap_or([0u8; 33]); .unwrap_or([0u8; 33]);
let md = parser::raw_request_from_bytes( let md = parser::raw_request_from_bytes(message, ticket, peer_id, dbid)?;
message,
COUNTER.load(Ordering::Relaxed),
peer_id,
dbid,
)?;
// send to signer // send to signer
log::info!("SEND ON {}", topics::VLS); log::info!("SEND ON {}", topics::VLS);
let (res_topic, res) = self.send_request_wait(topics::VLS, md)?; let (res_topic, res) = self.send_request_wait(topics::VLS, md)?;
@@ -184,15 +184,16 @@ impl<C: 'static + Client> SignerLoop<C> {
res res
}; };
// create reply bytes for CLN // create reply bytes for CLN
let reply = parser::raw_response_from_bytes(the_res, COUNTER.load(Ordering::Relaxed))?; let reply = parser::raw_response_from_bytes(the_res, ticket)?;
// add to the sequence
COUNTER.fetch_add(1u16, Ordering::Relaxed);
// catch the pubkey if its the first one connection // catch the pubkey if its the first one connection
if catch_init { if catch_init {
let _ = self.set_channel_pubkey(reply.clone()); let _ = self.set_channel_pubkey(reply.clone());
} }
// unlock
done_being_busy(); // next turn
my_turn_is_done();
Ok(reply) Ok(reply)
} }