Merge pull request #86 from stakwork/feat/init-topic

Feat/init topic
This commit is contained in:
Evan Feenstra
2023-06-22 13:11:45 -07:00
committed by GitHub
12 changed files with 153 additions and 117 deletions

28
Cargo.lock generated
View File

@@ -224,7 +224,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bolt-derive"
version = "0.1.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"proc-macro2",
"quote",
@@ -1189,7 +1189,7 @@ checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "lss-connector"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"log",
@@ -1757,7 +1757,7 @@ dependencies = [
[[package]]
name = "sphinx-auther"
version = "0.1.12"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"base64",
@@ -1769,7 +1769,7 @@ dependencies = [
[[package]]
name = "sphinx-crypter"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"chacha20poly1305",
@@ -1780,7 +1780,7 @@ dependencies = [
[[package]]
name = "sphinx-glyph"
version = "0.1.2"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"hex",
@@ -1832,7 +1832,7 @@ dependencies = [
[[package]]
name = "sphinx-signer"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"bip39",
@@ -2112,8 +2112,8 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "vls-core"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"anyhow",
"bitcoin",
@@ -2134,8 +2134,8 @@ dependencies = [
[[package]]
name = "vls-persist"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"hex",
"log",
@@ -2147,8 +2147,8 @@ dependencies = [
[[package]]
name = "vls-protocol"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"as-any",
"bolt-derive",
@@ -2161,8 +2161,8 @@ dependencies = [
[[package]]
name = "vls-protocol-signer"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"bit-vec",
"log",

40
broker/Cargo.lock generated
View File

@@ -381,7 +381,7 @@ dependencies = [
[[package]]
name = "bolt-derive"
version = "0.1.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"proc-macro2",
"quote",
@@ -1582,7 +1582,7 @@ dependencies = [
[[package]]
name = "lightning-storage-server"
version = "0.3.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"anyhow",
"async-trait",
@@ -1658,7 +1658,7 @@ dependencies = [
[[package]]
name = "lss-connector"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs#d3f22ea08e8da221375b0086c3187479c3a01b13"
source = "git+https://github.com/stakwork/sphinx-rs#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"lightning-storage-server",
@@ -3190,7 +3190,7 @@ dependencies = [
[[package]]
name = "sphinx-auther"
version = "0.1.12"
source = "git+https://github.com/stakwork/sphinx-rs#d3f22ea08e8da221375b0086c3187479c3a01b13"
source = "git+https://github.com/stakwork/sphinx-rs#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"base64 0.13.1",
@@ -3202,7 +3202,7 @@ dependencies = [
[[package]]
name = "sphinx-glyph"
version = "0.1.2"
source = "git+https://github.com/stakwork/sphinx-rs#d3f22ea08e8da221375b0086c3187479c3a01b13"
source = "git+https://github.com/stakwork/sphinx-rs#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"hex",
@@ -3248,7 +3248,7 @@ dependencies = [
[[package]]
name = "sphinx-signer"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs#d3f22ea08e8da221375b0086c3187479c3a01b13"
source = "git+https://github.com/stakwork/sphinx-rs#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"bip39",
@@ -3907,8 +3907,8 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "vls-core"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"anyhow",
"backtrace",
@@ -3931,8 +3931,8 @@ dependencies = [
[[package]]
name = "vls-frontend"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"async-trait",
"lightning-storage-server",
@@ -3945,8 +3945,8 @@ dependencies = [
[[package]]
name = "vls-persist"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"hex",
"kv",
@@ -3959,8 +3959,8 @@ dependencies = [
[[package]]
name = "vls-protocol"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"as-any",
"bolt-derive",
@@ -3973,8 +3973,8 @@ dependencies = [
[[package]]
name = "vls-protocol-client"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"anyhow",
"async-trait",
@@ -3988,8 +3988,8 @@ dependencies = [
[[package]]
name = "vls-protocol-signer"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"bit-vec",
"log",
@@ -4000,8 +4000,8 @@ dependencies = [
[[package]]
name = "vls-proxy"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"anyhow",
"as-any",

View File

@@ -30,10 +30,10 @@ thiserror = "1.0.31"
toml = "0.5.9"
url = { version = "2.2" }
vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "b355d2235b4ff0a223820e16889ba9cf939c0752" }
vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "b355d2235b4ff0a223820e16889ba9cf939c0752" }
vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "b355d2235b4ff0a223820e16889ba9cf939c0752" }
vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "b355d2235b4ff0a223820e16889ba9cf939c0752" }
vls-frontend = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "1228aba0c418009c926d78c21501fef0ecc96015" }
vls-protocol = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "1228aba0c418009c926d78c21501fef0ecc96015" }
vls-protocol-client = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "1228aba0c418009c926d78c21501fef0ecc96015" }
vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-lightning-signer.git", rev = "1228aba0c418009c926d78c21501fef0ecc96015" }
lss-connector = { git = "https://github.com/stakwork/sphinx-rs" }
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs" }

View File

@@ -160,16 +160,16 @@ impl<C: 'static + Client> SignerLoop<C> {
let md = parser::raw_request_from_bytes(message, self.chan.sequence, peer_id, dbid)?;
// send to signer
log::info!("SEND ON {}", topics::VLS);
let (res_topic, res) = self.send_request_and_get_reply(topics::VLS, md)?;
let (res_topic, res) = self.send_request_wait(topics::VLS, md)?;
log::info!("GOT ON {}", res_topic);
let mut the_res = res.clone();
if res_topic == topics::LSS_RES {
// send reply to LSS to store muts
let lss_reply = self.send_lss_and_get_reply(res)?;
let lss_reply = self.send_lss(res)?;
log::info!("LSS REPLY LEN {}", &lss_reply.len());
// send to signer for HMAC validation, and get final reply
log::info!("SEND ON {}", topics::LSS_MSG);
let (res_topic2, res2) = self.send_request_and_get_reply(topics::LSS_MSG, lss_reply)?;
let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply)?;
log::info!("GOT ON {}, send to CLN", res_topic2);
if res_topic2 != topics::VLS_RETURN {
log::warn!("got a topic NOT on {}", topics::VLS_RETURN);
@@ -206,11 +206,7 @@ impl<C: 'static + Client> SignerLoop<C> {
// returns (topic, payload)
// might halt if signer is offline
fn send_request_and_get_reply(
&mut self,
topic: &str,
message: Vec<u8>,
) -> Result<(String, Vec<u8>)> {
fn send_request_wait(&mut self, topic: &str, message: Vec<u8>) -> Result<(String, Vec<u8>)> {
// Send a request to the MQTT handler to send to signer
let (request, reply_rx) = ChannelRequest::new(topic, message);
// This can fail if MQTT shuts down
@@ -223,7 +219,7 @@ impl<C: 'static + Client> SignerLoop<C> {
Ok((reply.topic_end, reply.reply))
}
fn send_lss_and_get_reply(&mut self, message: Vec<u8>) -> Result<Vec<u8>> {
fn send_lss(&mut self, message: Vec<u8>) -> Result<Vec<u8>> {
// Send a request to the MQTT handler to send to signer
let (request, reply_rx) = LssReq::new(message);
// This can fail if MQTT shuts down

View File

@@ -12,14 +12,14 @@ pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender<ChannelRequest>) -> Resu
// LSS required
let (spk, msg_bytes) = LssBroker::get_server_pubkey(uri).await?;
let reply = ChannelRequest::send(topics::LSS_MSG, msg_bytes, &mqtt_tx).await?;
let reply = ChannelRequest::send(topics::INIT_MSG, msg_bytes, &mqtt_tx).await?;
let ir = Response::from_slice(&reply)?.as_init()?;
let lss_conn = LssBroker::new(uri, ir.clone(), spk).await?;
// this only returns the initial state if it was requested by signer
let msg_bytes2 = lss_conn.get_created_state_msg(&ir).await?;
let reply2 = ChannelRequest::send(topics::LSS_MSG, msg_bytes2, &mqtt_tx).await?;
let reply2 = ChannelRequest::send(topics::INIT_MSG, msg_bytes2, &mqtt_tx).await?;
let cr = Response::from_slice(&reply2)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?;
@@ -27,7 +27,7 @@ pub async fn lss_setup(uri: &str, mqtt_tx: mpsc::Sender<ChannelRequest>) -> Resu
Ok(lss_conn)
}
pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>, mut reconn_rx: mpsc::Receiver<(String, bool)>, mqtt_tx: mpsc::Sender<ChannelRequest>) {
pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>, mut reconn_rx: mpsc::Receiver<(String, bool)>, init_tx: mpsc::Sender<ChannelRequest>) {
// msg handler (from CLN looper)
let lss_conn_ = lss_conn.clone();
tokio::task::spawn(async move{
@@ -45,12 +45,12 @@ pub fn lss_tasks(lss_conn: LssBroker, mut lss_rx: mpsc::Receiver<LssReq>, mut re
// reconnect handler (when a client reconnects)
let lss_conn_ = lss_conn.clone();
let mqtt_tx_ = mqtt_tx.clone();
let init_tx_ = init_tx.clone();
tokio::task::spawn(async move{
while let Some((cid, connected)) = reconn_rx.recv().await {
if connected {
log::info!("CLIENT {} reconnected!", cid);
if let Err(e) = reconnect_dance(&cid, &lss_conn_, &mqtt_tx_).await {
if let Err(e) = reconnect_dance(&cid, &lss_conn_, &init_tx_).await {
log::error!("reconnect dance failed {:?}", e);
}
}
@@ -62,10 +62,10 @@ async fn reconnect_dance(cid: &str, lss_conn: &LssBroker, mqtt_tx: &mpsc::Sender
// sleep 3 seconds to make sure ESP32 subscription is active
tokio::time::sleep(Duration::from_secs(3)).await;
let init_bytes = lss_conn.make_init_msg().await?;
let reply = ChannelRequest::send_for(cid, topics::LSS_MSG, init_bytes, mqtt_tx).await?;
let reply = ChannelRequest::send_for(cid, topics::INIT_MSG, init_bytes, mqtt_tx).await?;
let ir = Response::from_slice(&reply)?.as_init()?;
let state_bytes = lss_conn.get_created_state_msg(&ir).await?;
let reply2 = ChannelRequest::send_for(cid, topics::LSS_MSG, state_bytes, mqtt_tx).await?;
let reply2 = ChannelRequest::send_for(cid, topics::INIT_MSG, state_bytes, mqtt_tx).await?;
let cr = Response::from_slice(&reply2)?.as_created()?;
lss_conn.handle(Response::Created(cr)).await?;
Ok(())

View File

@@ -70,13 +70,14 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let settings = read_broker_config();
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
let (init_tx, init_rx) = mpsc::channel(10000);
let (error_tx, error_rx) = broadcast::channel(10000);
error_log::log_errors(error_rx);
let (reconn_tx, reconn_rx) = mpsc::channel::<(String, bool)>(10000);
// waits until first connection
let conns = broker_setup(settings, mqtt_rx, reconn_tx.clone(), error_tx.clone()).await;
let conns = broker_setup(settings, mqtt_rx, init_rx, reconn_tx.clone(), error_tx.clone()).await;
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -84,7 +85,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
let _lss_broker = if let Ok(lss_uri) = env::var("VLS_LSS") {
// waits until LSS confirmation from signer
let lss_broker = loop {
match lss::lss_setup(&lss_uri, mqtt_tx.clone()).await{
match lss::lss_setup(&lss_uri, init_tx.clone()).await{
Ok(l) => {
break l;
},
@@ -95,7 +96,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
}
}
};
lss::lss_tasks(lss_broker.clone(), lss_rx, reconn_rx, mqtt_tx.clone());
lss::lss_tasks(lss_broker.clone(), lss_rx, reconn_rx, init_tx);
log::info!("=> lss broker connection created!");
Some(lss_broker)
} else {
@@ -137,6 +138,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket<rocket::Build> {
pub async fn broker_setup(
settings: Settings,
mqtt_rx: mpsc::Receiver<ChannelRequest>,
init_rx: mpsc::Receiver<ChannelRequest>,
reconn_tx: mpsc::Sender<(String, bool)>,
error_tx: broadcast::Sender<Vec<u8>>,
) -> Arc<Mutex<Connections>> {
@@ -161,6 +163,7 @@ pub async fn broker_setup(
start_broker(
settings,
mqtt_rx,
init_rx,
status_tx,
error_tx.clone(),
auth_tx,
@@ -169,7 +172,7 @@ pub async fn broker_setup(
.expect("BROKER FAILED TO START");
// client connections state
let (startup_tx, startup_rx) = std_oneshot::channel();
let (startup_tx, startup_rx) = std_oneshot::channel::<String>();
let conns_ = conns.clone();
let reconn_tx_ = reconn_tx.clone();
std::thread::spawn(move || {
@@ -180,7 +183,7 @@ pub async fn broker_setup(
cs.client_action(&cid, connected);
drop(cs);
log::info!("=> connected: {}: {}", cid, connected);
let _ = startup_tx.send(true);
let _ = startup_tx.send(cid.to_string());
while let Ok((cid, connected)) = status_rx.recv() {
log::info!("=> reconnected: {}: {}", cid, connected);
let mut cs = conns_.lock().unwrap();
@@ -189,7 +192,7 @@ pub async fn broker_setup(
let _ = reconn_tx_.blocking_send((cid, connected));
}
});
let _ = startup_rx.recv();
let _first_client_id = startup_rx.recv();
conns
}

View File

@@ -14,6 +14,7 @@ use std::time::Duration;
pub fn start_broker(
settings: Settings,
mut receiver: mpsc::Receiver<ChannelRequest>,
mut init_receiver: mpsc::Receiver<ChannelRequest>,
status_sender: std::sync::mpsc::Sender<(String, bool)>,
error_sender: broadcast::Sender<Vec<u8>>,
auth_sender: std::sync::mpsc::Sender<AuthMsg>,
@@ -74,44 +75,25 @@ pub fn start_broker(
}
});
// String is the client id
// (client_id, topic_end, payload). topic_end is always topics::LSS_RES
let (init_tx, init_rx) = std::sync::mpsc::channel::<(String, String, Vec<u8>)>();
let mut link_tx_ = link_tx.clone();
let conns_ = connections.clone();
// receive replies from LSS initialization
let _init_task = std::thread::spawn(move || {
while let Some(msg) = init_receiver.blocking_recv() {
pub_and_wait(msg, &conns_, &init_rx, &mut link_tx_);
}
});
// (client_id, topic_end, payload)
let (msg_tx, msg_rx) = std::sync::mpsc::channel::<(String, String, Vec<u8>)>();
// receive from CLN, Frontend, Controller, or LSS
let conns_ = connections.clone();
let _relay_task = std::thread::spawn(move || {
while let Some(msg) = receiver.blocking_recv() {
loop {
let reply = if let Some(cid) = msg.cid.clone() {
// for a specific client
pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx)
} else {
// send to each client in turn
let cs = conns_.lock().unwrap();
let client_list = cs.clients.clone();
drop(cs);
// wait a second if there are no clients
if client_list.len() == 0 {
std::thread::sleep(Duration::from_secs(1));
None
} else {
let mut rep = None;
for cid in client_list.iter() {
rep = pub_wait(&cid, &msg.topic, &msg.message, &msg_rx, &mut link_tx);
if let Some(_) = &rep {
break;
}
}
rep
}
};
if let Some(reply) = reply {
if let Err(_) = msg.reply_tx.send(reply) {
log::warn!("could not send on reply_tx");
}
break;
}
}
pub_and_wait(msg, &connections, &msg_rx, &mut link_tx);
}
});
@@ -136,10 +118,15 @@ pub fn start_broker(
}
let cid = ts[0].to_string();
let topic_end = ts[1].to_string();
if let Err(e) =
msg_tx.send((cid, topic_end, f.publish.payload.to_vec()))
{
log::error!("failed to pub to msg_tx! {:?}", e);
let pld = f.publish.payload.to_vec();
if topic_end == topics::INIT_RES {
if let Err(e) = init_tx.send((cid, topic_end, pld)) {
log::error!("failed to pub to init_tx! {:?}", e);
}
} else {
if let Err(e) = msg_tx.send((cid, topic_end, pld)) {
log::error!("failed to pub to msg_tx! {:?}", e);
}
}
}
}
@@ -149,6 +136,7 @@ pub fn start_broker(
}
});
// _init_task.await.unwrap();
// _relay_task.await.unwrap();
// _sub_task.await.unwrap();
// _alerts_handle.await?;
@@ -158,9 +146,48 @@ pub fn start_broker(
Ok(())
}
// waits forever until the reply is returned
fn pub_and_wait(
msg: ChannelRequest,
conns_: &Arc<Mutex<Connections>>,
msg_rx: &std::sync::mpsc::Receiver<(String, String, Vec<u8>)>,
link_tx: &mut LinkTx,
) {
loop {
let reply = if let Some(cid) = msg.cid.clone() {
// for a specific client
pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx)
} else {
// send to each client in turn
let cs = conns_.lock().unwrap();
let client_list = cs.clients.clone();
drop(cs);
// wait a second if there are no clients
if client_list.len() == 0 {
std::thread::sleep(Duration::from_secs(1));
None
} else {
let mut rep = None;
for cid in client_list.iter() {
rep = pub_timeout(&cid, &msg.topic, &msg.message, &msg_rx, link_tx);
if let Some(_) = &rep {
break;
}
}
rep
}
};
if let Some(reply) = reply {
if let Err(_) = msg.reply_tx.send(reply) {
log::warn!("could not send on reply_tx");
}
break;
}
}
}
// publish to signer and wait for response
// if timeout is exceed, try next signer
fn pub_wait(
fn pub_timeout(
client_id: &str,
topic: &str,
payload: &[u8],
@@ -194,6 +221,8 @@ fn subs(cid: &str, mut ltx: LinkTx) {
ltx.subscribe(format!("{}/{}", cid, topics::ERROR)).unwrap();
ltx.subscribe(format!("{}/{}", cid, topics::LSS_RES))
.unwrap();
ltx.subscribe(format!("{}/{}", cid, topics::INIT_RES))
.unwrap();
}
fn unsubs(_cid: &str, mut _ltx: LinkTx) {

View File

@@ -16,13 +16,14 @@ pub async fn run_test() -> rocket::Rocket<rocket::Build> {
let settings = Settings::default();
let (mqtt_tx, mqtt_rx) = mpsc::channel(10000);
let (_init_tx, init_rx) = mpsc::channel(10000);
let (error_tx, error_rx) = broadcast::channel(10000);
let (conn_tx, _conn_rx) = mpsc::channel(10000);
crate::error_log::log_errors(error_rx);
// block until connection
let conns = crate::broker_setup(settings, mqtt_rx, conn_tx.clone(), error_tx.clone()).await;
let conns = crate::broker_setup(settings, mqtt_rx, init_rx, conn_tx.clone(), error_tx.clone()).await;
log::info!("=> off to the races!");
let tx_ = mqtt_tx.clone();

View File

@@ -112,7 +112,9 @@ pub fn make_client(
if topic.ends_with(topics::VLS) {
tx.send(CoreEvent::VlsMessage(data))
.expect("couldnt send Event::VlsMessage");
} else if topic.ends_with(topics::LSS_MSG) {
} else if topic.ends_with(topics::LSS_MSG)
|| topic.ends_with(topics::INIT_MSG)
{
log::debug!("received data len {}", data.len());
tx.send(CoreEvent::LssMessage(data))
.expect("couldnt send Event::LssMessage");

View File

@@ -35,7 +35,12 @@ pub enum Event {
pub const ROOT_STORE: &str = "/sdcard/store";
pub const SUB_TOPICS: &[&str] = &[topics::VLS, topics::LSS_MSG, topics::CONTROL];
pub const SUB_TOPICS: &[&str] = &[
topics::INIT_MSG,
topics::LSS_MSG,
topics::VLS,
topics::CONTROL,
];
fn mqtt_sub(
mqtt: &mut EspMqttClient<ConnState<MessageImpl, EspError>>,

View File

@@ -26,7 +26,7 @@ pub fn init_lss(
let server_pubkey = PublicKey::from_slice(&init.server_pubkey)?;
let (lss_signer, res1) = LssSigner::new(&handler_builder, &server_pubkey);
let lss_res_topic = format!("{}/{}", client_id, topics::LSS_RES);
let lss_res_topic = format!("{}/{}", client_id, topics::INIT_RES);
mqtt.publish(&lss_res_topic, QOS, false, &res1)
.expect("could not publish LSS response");

28
tester/Cargo.lock generated
View File

@@ -242,7 +242,7 @@ checksum = "6dbe3c979c178231552ecba20214a8272df4e09f232a87aef4320cf06539aded"
[[package]]
name = "bolt-derive"
version = "0.1.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"proc-macro2",
"quote",
@@ -1136,7 +1136,7 @@ dependencies = [
[[package]]
name = "lss-connector"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"log",
@@ -2081,7 +2081,7 @@ dependencies = [
[[package]]
name = "sphinx-auther"
version = "0.1.12"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"base64 0.13.1",
@@ -2093,7 +2093,7 @@ dependencies = [
[[package]]
name = "sphinx-crypter"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"chacha20poly1305",
@@ -2104,7 +2104,7 @@ dependencies = [
[[package]]
name = "sphinx-glyph"
version = "0.1.2"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"hex",
@@ -2143,7 +2143,7 @@ dependencies = [
[[package]]
name = "sphinx-signer"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs.git#057476d30a233d82d85456a83a10401b72533a8c"
source = "git+https://github.com/stakwork/sphinx-rs.git#b84522dd596606aea9899969fe0d3f68d4eb6632"
dependencies = [
"anyhow",
"bip39",
@@ -2649,8 +2649,8 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "vls-core"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"anyhow",
"bitcoin",
@@ -2671,8 +2671,8 @@ dependencies = [
[[package]]
name = "vls-persist"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"hex",
"log",
@@ -2684,8 +2684,8 @@ dependencies = [
[[package]]
name = "vls-protocol"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"as-any",
"bolt-derive",
@@ -2698,8 +2698,8 @@ dependencies = [
[[package]]
name = "vls-protocol-signer"
version = "0.9.0"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=b355d2235b4ff0a223820e16889ba9cf939c0752#b355d2235b4ff0a223820e16889ba9cf939c0752"
version = "0.9.1"
source = "git+https://gitlab.com/lightning-signer/validating-lightning-signer.git?rev=1228aba0c418009c926d78c21501fef0ecc96015#1228aba0c418009c926d78c21501fef0ecc96015"
dependencies = [
"bit-vec",
"log",