mirror of
https://github.com/stakwork/sphinx-key.git
synced 2025-12-17 07:14:23 +01:00
broker side reconnection dance
This commit is contained in:
8
broker/Cargo.lock
generated
8
broker/Cargo.lock
generated
@@ -1713,7 +1713,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "lss-connector"
|
name = "lss-connector"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/stakwork/sphinx-rs#607a8d64f5c1f87d7d5dad61806f9d8d12bd87d6"
|
source = "git+https://github.com/stakwork/sphinx-rs#07be5d75f102ac329c210604f58ea0ca78f053a2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"lightning-storage-server",
|
"lightning-storage-server",
|
||||||
@@ -3173,7 +3173,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "sphinx-auther"
|
name = "sphinx-auther"
|
||||||
version = "0.1.12"
|
version = "0.1.12"
|
||||||
source = "git+https://github.com/stakwork/sphinx-rs#607a8d64f5c1f87d7d5dad61806f9d8d12bd87d6"
|
source = "git+https://github.com/stakwork/sphinx-rs#07be5d75f102ac329c210604f58ea0ca78f053a2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"base64 0.13.1",
|
"base64 0.13.1",
|
||||||
@@ -3185,7 +3185,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "sphinx-glyph"
|
name = "sphinx-glyph"
|
||||||
version = "0.1.2"
|
version = "0.1.2"
|
||||||
source = "git+https://github.com/stakwork/sphinx-rs#607a8d64f5c1f87d7d5dad61806f9d8d12bd87d6"
|
source = "git+https://github.com/stakwork/sphinx-rs#07be5d75f102ac329c210604f58ea0ca78f053a2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"hex",
|
"hex",
|
||||||
@@ -3231,7 +3231,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "sphinx-signer"
|
name = "sphinx-signer"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/stakwork/sphinx-rs#607a8d64f5c1f87d7d5dad61806f9d8d12bd87d6"
|
source = "git+https://github.com/stakwork/sphinx-rs#07be5d75f102ac329c210604f58ea0ca78f053a2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"bip39",
|
"bip39",
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ use vls_proxy::client::UnixClient;
|
|||||||
use vls_proxy::connection::{open_parent_fd, UnixConnection};
|
use vls_proxy::connection::{open_parent_fd, UnixConnection};
|
||||||
use vls_proxy::portfront::SignerPortFront;
|
use vls_proxy::portfront::SignerPortFront;
|
||||||
use vls_proxy::util::{add_hsmd_args, handle_hsmd_version};
|
use vls_proxy::util::{add_hsmd_args, handle_hsmd_version};
|
||||||
use lss_connector::{LssBroker, Response, lss_handle};
|
use lss_connector::{LssBroker, Response};
|
||||||
use sphinx_signer::sphinx_glyph::topics;
|
use sphinx_signer::sphinx_glyph::topics;
|
||||||
|
|
||||||
|
|
||||||
@@ -76,13 +76,15 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
|
|||||||
let (error_tx, error_rx) = broadcast::channel(10000);
|
let (error_tx, error_rx) = broadcast::channel(10000);
|
||||||
error_log::log_errors(error_rx);
|
error_log::log_errors(error_rx);
|
||||||
|
|
||||||
|
let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool)>(10000);
|
||||||
|
|
||||||
// waits until first connection
|
// waits until first connection
|
||||||
let conns = broker_setup(settings, mqtt_rx, error_tx.clone()).await;
|
let conns = broker_setup(settings, mqtt_rx, reconn_tx.clone(), error_tx.clone()).await;
|
||||||
|
|
||||||
let (lss_tx, lss_rx) = mpsc::channel(10000);
|
let (lss_tx, lss_rx) = mpsc::channel(10000);
|
||||||
let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") {
|
let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") {
|
||||||
// waits until LSS confirmation from signer
|
// waits until LSS confirmation from signer
|
||||||
let lss_broker = match lss_setup(&lss_uri, lss_rx, mqtt_tx.clone()).await{
|
let lss_broker = match lss_setup(&lss_uri, lss_rx, reconn_rx, mqtt_tx.clone()).await{
|
||||||
Ok(l) => l,
|
Ok(l) => l,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let _ = error_tx.send(e.to_string().as_bytes().to_vec());
|
let _ = error_tx.send(e.to_string().as_bytes().to_vec());
|
||||||
@@ -126,14 +128,13 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
|
|||||||
routes::launch_rocket(mqtt_tx, error_tx, settings, conns)
|
routes::launch_rocket(mqtt_tx, error_tx, settings, conns)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver<LssReq>, mqtt_tx: mpsc::Sender<ChannelRequest>) -> Result<LssBroker> {
|
pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver<LssReq>, mut reconn_rx: mpsc::Receiver<(String, bool)>, mqtt_tx: mpsc::Sender<ChannelRequest>) -> Result<LssBroker> {
|
||||||
|
|
||||||
// LSS required
|
// LSS required
|
||||||
let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?;
|
let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?;
|
||||||
let (req1, reply_rx) = ChannelRequest::new(topics::LSS_MSG, msg_bytes);
|
let (req1, reply_rx) = ChannelRequest::new(topics::LSS_MSG, msg_bytes);
|
||||||
let _ = mqtt_tx.send(req1).await;
|
let _ = mqtt_tx.send(req1).await;
|
||||||
let first_lss_response = reply_rx.await?;
|
let first_lss_response = reply_rx.await?;
|
||||||
|
|
||||||
let ir = Response::from_slice(&first_lss_response.reply)?.as_init()?;
|
let ir = Response::from_slice(&first_lss_response.reply)?.as_init()?;
|
||||||
|
|
||||||
let (lss_conn, msg_bytes2) = LssBroker::new(uri, ir, spk).await?;
|
let (lss_conn, msg_bytes2) = LssBroker::new(uri, ir, spk).await?;
|
||||||
@@ -144,10 +145,11 @@ pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver<LssReq>, mqtt_tx: m
|
|||||||
|
|
||||||
lss_conn.handle(Response::Created(cr)).await?;
|
lss_conn.handle(Response::Created(cr)).await?;
|
||||||
|
|
||||||
let persister = lss_conn.persister();
|
// msg handler (from CLN looper)
|
||||||
|
let lss_conn_ = lss_conn.clone();
|
||||||
tokio::task::spawn(async move{
|
tokio::task::spawn(async move{
|
||||||
while let Some(req) = lss_rx.recv().await {
|
while let Some(req) = lss_rx.recv().await {
|
||||||
match lss_handle(&persister, &req.message).await {
|
match lss_conn_.handle_bytes(&req.message).await {
|
||||||
Ok(msg) => {
|
Ok(msg) => {
|
||||||
log::info!("payload to send {:?}", &msg);
|
log::info!("payload to send {:?}", &msg);
|
||||||
let _ = req.reply_tx.send(msg);
|
let _ = req.reply_tx.send(msg);
|
||||||
@@ -159,13 +161,42 @@ pub async fn lss_setup(uri: &str, mut lss_rx: mpsc::Receiver<LssReq>, mqtt_tx: m
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// reconnect handler (when a client reconnects)
|
||||||
|
let lss_conn_ = lss_conn.clone();
|
||||||
|
let mqtt_tx_ = mqtt_tx.clone();
|
||||||
|
tokio::task::spawn(async move{
|
||||||
|
while let Some((cid, connected)) = reconn_rx.recv().await {
|
||||||
|
if connected {
|
||||||
|
log::info!("CLIENT {} reconnected!", cid);
|
||||||
|
if let Err(e) = reconnect_dance(&cid, &lss_conn_, &mqtt_tx_).await {
|
||||||
|
log::error!("reconnect dance failed {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
Ok(lss_conn)
|
Ok(lss_conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender<ChannelRequest>) -> Result<()> {
|
||||||
|
let init_bytes = lss_conn.make_init_msg().await?;
|
||||||
|
let (req, reply_rx) = ChannelRequest::new_for(cid, topics::LSS_MSG, init_bytes);
|
||||||
|
let _ = mqtt_tx.send(req).await;
|
||||||
|
let first_lss_response = reply_rx.await?;
|
||||||
|
let state_bytes = lss_conn.get_initial_state_msg(&first_lss_response.reply).await?;
|
||||||
|
let (req2, reply_rx2) = ChannelRequest::new_for(cid, topics::LSS_MSG, state_bytes);
|
||||||
|
let _ = mqtt_tx.send(req2).await;
|
||||||
|
let created_res = reply_rx2.await?;
|
||||||
|
let cr = Response::from_slice(&created_res.reply)?.as_created()?;
|
||||||
|
lss_conn.handle(Response::Created(cr)).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// blocks until a connection received
|
// blocks until a connection received
|
||||||
pub async fn broker_setup(
|
pub async fn broker_setup(
|
||||||
settings: Settings,
|
settings: Settings,
|
||||||
mqtt_rx: mpsc::Receiver<ChannelRequest>,
|
mqtt_rx: mpsc::Receiver<ChannelRequest>,
|
||||||
|
reconn_tx: mpsc::Sender<(String, bool)>,
|
||||||
error_tx: broadcast::Sender<Vec<u8>>,
|
error_tx: broadcast::Sender<Vec<u8>>,
|
||||||
) -> Arc<Mutex<Connections>> {
|
) -> Arc<Mutex<Connections>> {
|
||||||
let (auth_tx, auth_rx) = std::sync::mpsc::channel::<AuthMsg>();
|
let (auth_tx, auth_rx) = std::sync::mpsc::channel::<AuthMsg>();
|
||||||
@@ -199,6 +230,7 @@ pub async fn broker_setup(
|
|||||||
// client connections state
|
// client connections state
|
||||||
let (startup_tx, startup_rx) = std_oneshot::channel();
|
let (startup_tx, startup_rx) = std_oneshot::channel();
|
||||||
let conns_ = conns.clone();
|
let conns_ = conns.clone();
|
||||||
|
let reconn_tx_ = reconn_tx.clone();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
log::info!("=> wait for connected status");
|
log::info!("=> wait for connected status");
|
||||||
// wait for connection = true
|
// wait for connection = true
|
||||||
@@ -211,6 +243,7 @@ pub async fn broker_setup(
|
|||||||
while let Ok((cid, connected)) = status_rx.recv() {
|
while let Ok((cid, connected)) = status_rx.recv() {
|
||||||
let mut cs = conns_.lock().unwrap();
|
let mut cs = conns_.lock().unwrap();
|
||||||
cs.client_action(&cid, connected);
|
cs.client_action(&cid, connected);
|
||||||
|
let _ = reconn_tx_.blocking_send((cid, connected));
|
||||||
drop(cs)
|
drop(cs)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -17,11 +17,12 @@ pub async fn run_test() -> rocket::Rocket<rocket::Build> {
|
|||||||
let settings = Settings::default();
|
let settings = Settings::default();
|
||||||
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
|
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
|
||||||
let (error_tx, error_rx) = broadcast::channel(10000);
|
let (error_tx, error_rx) = broadcast::channel(10000);
|
||||||
|
let (conn_tx, _conn_rx) = mpsc::channel(10000);
|
||||||
|
|
||||||
crate::error_log::log_errors(error_rx);
|
crate::error_log::log_errors(error_rx);
|
||||||
|
|
||||||
// block until connection
|
// block until connection
|
||||||
let conns = crate::broker_setup(settings, mqtt_rx, error_tx.clone()).await;
|
let conns = crate::broker_setup(settings, mqtt_rx, conn_tx.clone(), error_tx.clone()).await;
|
||||||
log::info!("=> off to the races!");
|
log::info!("=> off to the races!");
|
||||||
|
|
||||||
let tx_ = mqtt_tx.clone();
|
let tx_ = mqtt_tx.clone();
|
||||||
|
|||||||
Reference in New Issue
Block a user