mirror of
https://github.com/stakwork/sphinx-key.git
synced 2026-02-01 05:44:19 +01:00
@@ -18,6 +18,9 @@ impl Connections {
|
||||
current: None,
|
||||
}
|
||||
}
|
||||
pub fn len(&self) -> usize {
|
||||
self.clients.len()
|
||||
}
|
||||
pub fn set_pubkey(&mut self, pk: &str) {
|
||||
self.pubkey = Some(pk.to_string())
|
||||
}
|
||||
|
||||
@@ -3,64 +3,38 @@ use anyhow::{anyhow, Result};
|
||||
use lss_connector::{InitResponse, LssBroker, Response, SignerMutations};
|
||||
use rocket::tokio;
|
||||
use rumqttd::oneshot;
|
||||
use rumqttd::oneshot as std_oneshot;
|
||||
use sphinx_signer::sphinx_glyph::topics;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender<ChannelRequest>) -> Result<LssBroker> {
|
||||
// LSS required
|
||||
let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?;
|
||||
let ir = loop {
|
||||
if let Ok(ir) = send_init(msg_bytes.clone(), &mqtt_tx).await {
|
||||
break ir;
|
||||
}
|
||||
sleep(2).await;
|
||||
};
|
||||
|
||||
let lss_conn = LssBroker::new(uri, ir.clone(), spk).await?;
|
||||
// this only returns the initial state if it was requested by signer
|
||||
let msg_bytes2 = lss_conn.get_created_state_msg(&ir).await?;
|
||||
let cr = loop {
|
||||
if let Ok(ir) = send_created(msg_bytes2.clone(), &mqtt_tx).await {
|
||||
break ir;
|
||||
}
|
||||
sleep(2).await;
|
||||
};
|
||||
|
||||
lss_conn.handle(Response::Created(cr)).await?;
|
||||
|
||||
Ok(lss_conn)
|
||||
}
|
||||
|
||||
async fn send_init(
|
||||
msg_bytes: Vec<u8>,
|
||||
mqtt_tx: &mpsc::Sender<ChannelRequest>,
|
||||
) -> Result<InitResponse> {
|
||||
let reply = ChannelRequest::send(topics::INIT_1_MSG, msg_bytes, &mqtt_tx).await?;
|
||||
let ir = Response::from_slice(&reply)?.into_init()?;
|
||||
Ok(ir)
|
||||
}
|
||||
|
||||
async fn send_created(
|
||||
msg_bytes: Vec<u8>,
|
||||
mqtt_tx: &mpsc::Sender<ChannelRequest>,
|
||||
) -> Result<SignerMutations> {
|
||||
let reply2 = ChannelRequest::send(topics::INIT_2_MSG, msg_bytes, &mqtt_tx).await?;
|
||||
let cr = Response::from_slice(&reply2)?.into_created()?;
|
||||
Ok(cr)
|
||||
}
|
||||
|
||||
pub fn lss_tasks(
|
||||
lss_conn: LssBroker,
|
||||
mut lss_rx: mpsc::Receiver<LssReq>,
|
||||
mut reconn_rx: mpsc::Receiver<(String, oneshot::Sender<bool>)>,
|
||||
uri: String,
|
||||
lss_rx: mpsc::Receiver<LssReq>,
|
||||
mut conn_rx: mpsc::Receiver<(String, oneshot::Sender<bool>)>,
|
||||
init_tx: mpsc::Sender<ChannelRequest>,
|
||||
) {
|
||||
// msg handler (from CLN looper)
|
||||
let lss_conn_ = lss_conn.clone();
|
||||
tokio::task::spawn(async move {
|
||||
// first connection - initializes lssbroker
|
||||
let lss_conn = loop {
|
||||
let (cid, dance_complete_tx) = conn_rx.recv().await.unwrap();
|
||||
match try_dance(&cid, &uri, None, &init_tx, dance_complete_tx).await {
|
||||
Some(broker) => break broker,
|
||||
None => log::warn!("broker not initialized, try connecting again..."),
|
||||
}
|
||||
};
|
||||
spawn_lss_rx(lss_conn.clone(), lss_rx);
|
||||
// connect handler for all subsequent connections
|
||||
while let Some((cid, dance_complete_tx)) = conn_rx.recv().await {
|
||||
log::info!("CLIENT {} connected!", cid);
|
||||
let _ = try_dance(&cid, &uri, Some(&lss_conn), &init_tx, dance_complete_tx).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_lss_rx(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>) {
|
||||
tokio::task::spawn(async move {
|
||||
while let Some(req) = lss_rx.recv().await {
|
||||
match lss_conn_.handle_bytes(&req.message).await {
|
||||
match lss_conn.handle_bytes(&req.message).await {
|
||||
Ok(msg) => {
|
||||
let _ = req.reply_tx.send(msg);
|
||||
}
|
||||
@@ -70,50 +44,64 @@ pub fn lss_tasks(
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// reconnect handler (when a client reconnects)
|
||||
let lss_conn_ = lss_conn.clone();
|
||||
let init_tx_ = init_tx.clone();
|
||||
tokio::task::spawn(async move {
|
||||
while let Some((cid, dance_complete_tx)) = reconn_rx.recv().await {
|
||||
log::info!("CLIENT {} reconnected!", cid);
|
||||
if let Err(e) = reconnect_dance(&cid, &lss_conn_, &init_tx_).await {
|
||||
log::error!("reconnect dance failed {:?}", e);
|
||||
let _ = dance_complete_tx.send(false);
|
||||
} else {
|
||||
let _ = dance_complete_tx.send(true);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn reconnect_dance(
|
||||
async fn try_dance(
|
||||
cid: &str,
|
||||
lss_conn: &LssBroker,
|
||||
mqtt_tx: &mpsc::Sender<ChannelRequest>,
|
||||
) -> Result<()> {
|
||||
log::debug!("Reconnect dance started, proceeding with step 1");
|
||||
let ir = dance_step_1(cid, lss_conn, mqtt_tx).await?;
|
||||
log::debug!("Step 1 finished, now onto step 2");
|
||||
let _ = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await?;
|
||||
log::debug!("Reconnect dance finished!");
|
||||
Ok(())
|
||||
uri: &str,
|
||||
lss_conn: Option<&LssBroker>,
|
||||
init_tx: &mpsc::Sender<ChannelRequest>,
|
||||
dance_complete_tx: std_oneshot::Sender<bool>,
|
||||
) -> Option<LssBroker> {
|
||||
match connect_dance(cid, uri, lss_conn, init_tx).await {
|
||||
Ok(broker) => {
|
||||
let _ = dance_complete_tx.send(true);
|
||||
// none if lss_conn is some, some otherwise
|
||||
broker
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("connect_dance failed: {:?}", e);
|
||||
let _ = dance_complete_tx.send(false);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_dance(
|
||||
cid: &str,
|
||||
uri: &str,
|
||||
lss_conn: Option<&LssBroker>,
|
||||
mqtt_tx: &mpsc::Sender<ChannelRequest>,
|
||||
) -> Result<Option<LssBroker>> {
|
||||
let (new_broker, ir) = dance_step_1(cid, uri, lss_conn, mqtt_tx).await?;
|
||||
let lss_conn = new_broker.as_ref().xor(lss_conn).ok_or(anyhow!(
|
||||
"should never happen, either we use the newly initialized, or the one passed in"
|
||||
))?;
|
||||
let _ = dance_step_2(cid, lss_conn, mqtt_tx, &ir).await?;
|
||||
// only some when lss_conn is none
|
||||
Ok(new_broker)
|
||||
}
|
||||
|
||||
// initializes a new broker in case lss_conn is none
|
||||
async fn dance_step_1(
|
||||
cid: &str,
|
||||
lss_conn: &LssBroker,
|
||||
uri: &str,
|
||||
lss_conn: Option<&LssBroker>,
|
||||
mqtt_tx: &mpsc::Sender<ChannelRequest>,
|
||||
) -> Result<InitResponse> {
|
||||
let init_bytes = lss_conn.make_init_msg().await?;
|
||||
log::debug!("starting dance_step_1 send for {}", cid);
|
||||
let reply = ChannelRequest::send_for(cid, topics::INIT_1_MSG, init_bytes, mqtt_tx).await?;
|
||||
log::debug!("dance_step_1 send for completed");
|
||||
if reply.is_empty() {
|
||||
return Err(anyhow!("dance step 1 did not complete"));
|
||||
) -> Result<(Option<LssBroker>, InitResponse)> {
|
||||
match lss_conn {
|
||||
Some(lss_conn) => {
|
||||
let init_bytes = lss_conn.make_init_msg().await?;
|
||||
let ir = send_init(cid, init_bytes, mqtt_tx).await?;
|
||||
Ok((None, ir))
|
||||
}
|
||||
None => {
|
||||
let (spk, init_bytes) = LssBroker::get_server_pubkey(uri).await?;
|
||||
let ir = send_init(cid, init_bytes, mqtt_tx).await?;
|
||||
let lss_conn = Some(LssBroker::new(uri, ir.clone(), spk).await?);
|
||||
Ok((lss_conn, ir))
|
||||
}
|
||||
}
|
||||
let ir = Response::from_slice(&reply)?.into_init()?;
|
||||
Ok(ir)
|
||||
}
|
||||
|
||||
async fn dance_step_2(
|
||||
@@ -123,17 +111,33 @@ async fn dance_step_2(
|
||||
ir: &InitResponse,
|
||||
) -> Result<()> {
|
||||
let state_bytes = lss_conn.get_created_state_msg(ir).await?;
|
||||
log::debug!("starting dance_step_2 send for {}", cid);
|
||||
let reply2 = ChannelRequest::send_for(cid, topics::INIT_2_MSG, state_bytes, mqtt_tx).await?;
|
||||
log::debug!("dance_step_2 send for completed");
|
||||
if reply2.is_empty() {
|
||||
return Err(anyhow!("dance step 2 did not complete"));
|
||||
}
|
||||
let cr = Response::from_slice(&reply2)?.into_created()?;
|
||||
let cr = send_created(cid, state_bytes, mqtt_tx).await?;
|
||||
lss_conn.handle(Response::Created(cr)).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sleep(s: u64) {
|
||||
tokio::time::sleep(Duration::from_secs(s)).await;
|
||||
async fn send_init(
|
||||
cid: &str,
|
||||
msg_bytes: Vec<u8>,
|
||||
mqtt_tx: &mpsc::Sender<ChannelRequest>,
|
||||
) -> Result<InitResponse> {
|
||||
let reply = ChannelRequest::send_for(cid, topics::INIT_1_MSG, msg_bytes, mqtt_tx).await?;
|
||||
if reply.is_empty() {
|
||||
return Err(anyhow!("send init did not complete, reply is empty"));
|
||||
}
|
||||
let ir = Response::from_slice(&reply)?.into_init()?;
|
||||
Ok(ir)
|
||||
}
|
||||
|
||||
async fn send_created(
|
||||
cid: &str,
|
||||
msg_bytes: Vec<u8>,
|
||||
mqtt_tx: &mpsc::Sender<ChannelRequest>,
|
||||
) -> Result<SignerMutations> {
|
||||
let reply = ChannelRequest::send_for(cid, topics::INIT_2_MSG, msg_bytes, mqtt_tx).await?;
|
||||
if reply.is_empty() {
|
||||
return Err(anyhow!("send created did not complete, reply is empty"));
|
||||
}
|
||||
let cr = Response::from_slice(&reply)?.into_created()?;
|
||||
Ok(cr)
|
||||
}
|
||||
|
||||
@@ -20,8 +20,8 @@ use rocket::tokio::{
|
||||
sync::{broadcast, mpsc},
|
||||
};
|
||||
use rumqttd::{oneshot as std_oneshot, AuthMsg};
|
||||
use std::env;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{env, time::Duration};
|
||||
use url::Url;
|
||||
use vls_frontend::{frontend::SourceFactory, Frontend};
|
||||
use vls_proxy::client::UnixClient;
|
||||
@@ -74,38 +74,18 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
|
||||
let (error_tx, error_rx) = broadcast::channel(10000);
|
||||
error_log::log_errors(error_rx);
|
||||
|
||||
let (reconn_tx, reconn_rx) = mpsc::channel::<(String, std_oneshot::Sender<bool>)>(10000);
|
||||
let (conn_tx, conn_rx) = mpsc::channel::<(String, std_oneshot::Sender<bool>)>(10000);
|
||||
|
||||
// this does not wait for the first connection
|
||||
let conns = broker_setup(
|
||||
settings,
|
||||
mqtt_rx,
|
||||
init_rx,
|
||||
reconn_tx.clone(),
|
||||
error_tx.clone(),
|
||||
)
|
||||
.await;
|
||||
let conns = broker_setup(settings, mqtt_rx, init_rx, conn_tx, error_tx.clone()).await;
|
||||
|
||||
let (lss_tx, lss_rx) = mpsc::channel::<LssReq>(10000);
|
||||
// TODO: add a validation here of the uri setting to make sure LSS is running
|
||||
if let Ok(lss_uri) = env::var("VLS_LSS") {
|
||||
// waits until LSS confirmation from signer
|
||||
let lss_broker = loop {
|
||||
match lss::lss_setup(&lss_uri, init_tx.clone()).await {
|
||||
Ok(l) => {
|
||||
break l;
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = error_tx.send(e.to_string().as_bytes().to_vec());
|
||||
log::error!("failed LSS setup, trying again...");
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
lss::lss_tasks(lss_broker, lss_rx, reconn_rx, init_tx);
|
||||
log::info!("=> lss broker connection created!");
|
||||
log::info!("Spawning lss tasks...");
|
||||
lss::lss_tasks(lss_uri, lss_rx, conn_rx, init_tx);
|
||||
} else {
|
||||
log::warn!("running without LSS");
|
||||
};
|
||||
}
|
||||
|
||||
if let Ok(btc_url) = env::var("BITCOIND_RPC_URL") {
|
||||
let signer_port = MqttSignerPort::new(mqtt_tx.clone(), lss_tx.clone());
|
||||
@@ -124,7 +104,12 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
|
||||
}
|
||||
|
||||
// test sleep FIXME
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
while conns.lock().unwrap().len() == 0 {
|
||||
log::debug!(
|
||||
"waiting for first connection before proceeding with SignerLoop and Rocket launch"
|
||||
);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
|
||||
let cln_client = UnixClient::new(UnixConnection::new(parent_fd));
|
||||
// TODO pass status_rx into SignerLoop?
|
||||
@@ -142,7 +127,7 @@ pub async fn broker_setup(
|
||||
settings: Settings,
|
||||
mqtt_rx: mpsc::Receiver<ChannelRequest>,
|
||||
init_rx: mpsc::Receiver<ChannelRequest>,
|
||||
reconn_tx: mpsc::Sender<(String, std_oneshot::Sender<bool>)>,
|
||||
conn_tx: mpsc::Sender<(String, std_oneshot::Sender<bool>)>,
|
||||
error_tx: broadcast::Sender<Vec<u8>>,
|
||||
) -> Arc<Mutex<Connections>> {
|
||||
let (auth_tx, auth_rx) = std::sync::mpsc::channel::<AuthMsg>();
|
||||
@@ -177,22 +162,16 @@ pub async fn broker_setup(
|
||||
// client connections state
|
||||
let conns_ = conns.clone();
|
||||
std::thread::spawn(move || {
|
||||
log::info!("=> wait for connected status");
|
||||
// wait for connection = true
|
||||
let (cid, connected) = status_rx.recv().expect("couldnt receive");
|
||||
let mut cs = conns_.lock().unwrap();
|
||||
cs.client_action(&cid, connected);
|
||||
drop(cs);
|
||||
log::info!("=> connected: {}: {}", cid, connected);
|
||||
log::info!("=> waiting first connection...");
|
||||
while let Ok((cid, connected)) = status_rx.recv() {
|
||||
log::info!("=> reconnected: {}: {}", cid, connected);
|
||||
log::info!("=> connection status: {}: {}", cid, connected);
|
||||
let mut cs = conns_.lock().unwrap();
|
||||
// drop it from list until ready
|
||||
cs.client_action(&cid, false);
|
||||
drop(cs);
|
||||
if connected {
|
||||
let (dance_complete_tx, dance_complete_rx) = std_oneshot::channel::<bool>();
|
||||
let _ = reconn_tx.blocking_send((cid.clone(), dance_complete_tx));
|
||||
let _ = conn_tx.blocking_send((cid.clone(), dance_complete_tx));
|
||||
let dance_complete = dance_complete_rx.recv().unwrap_or_else(|e| {
|
||||
log::info!(
|
||||
"dance_complete channel died before receiving response: {}",
|
||||
@@ -200,6 +179,7 @@ pub async fn broker_setup(
|
||||
);
|
||||
false
|
||||
});
|
||||
log::info!("adding client to the list? {}", dance_complete);
|
||||
let mut cs = conns_.lock().unwrap();
|
||||
cs.client_action(&cid, dance_complete);
|
||||
log::debug!("List: {:?}, action: {}", cs, dance_complete);
|
||||
|
||||
Reference in New Issue
Block a user