diff --git a/broker/Cargo.lock b/broker/Cargo.lock index 272ae82..8d03638 100644 --- a/broker/Cargo.lock +++ b/broker/Cargo.lock @@ -1691,7 +1691,7 @@ dependencies = [ [[package]] name = "lss-connector" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947" +source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540" dependencies = [ "anyhow", "lightning-storage-server", @@ -2693,7 +2693,7 @@ dependencies = [ [[package]] name = "rmp-utils" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947" +source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540" dependencies = [ "anyhow", "log", @@ -3268,7 +3268,7 @@ dependencies = [ [[package]] name = "sphinx-auther" version = "0.1.12" -source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947" +source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540" dependencies = [ "anyhow", "base64 0.21.2", @@ -3280,7 +3280,7 @@ dependencies = [ [[package]] name = "sphinx-glyph" version = "0.1.2" -source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947" +source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540" dependencies = [ "anyhow", "hex", @@ -3326,7 +3326,7 @@ dependencies = [ [[package]] name = "sphinx-signer" version = "0.1.0" -source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947" +source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540" dependencies = [ "anyhow", "bip39", diff --git a/broker/Cargo.toml b/broker/Cargo.toml index 6584fee..a7e0f38 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -39,8 +39,8 @@ vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-li # vls-protocol-client = { path = "../../vls/vls-protocol-client" } # vls-proxy = { path = "../../vls/vls-proxy" } -lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "4f3485a687f6e518282c981d9ecf0adfca38e947" } -sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "4f3485a687f6e518282c981d9ecf0adfca38e947" } +lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "b4b0f78f24e91cb632ef3d99a3304435c9dd3540" } +sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "b4b0f78f24e91cb632ef3d99a3304435c9dd3540" } # lss-connector = { path = "../../sphinx-rs/lss-connector" } # sphinx-signer = { path = "../../sphinx-rs/signer" } diff --git a/broker/src/chain_tracker.rs b/broker/src/chain_tracker.rs index c4f5331..be7be56 100644 --- a/broker/src/chain_tracker.rs +++ b/broker/src/chain_tracker.rs @@ -49,7 +49,7 @@ impl MqttSignerPort { if res_topic == topics::LSS_RES { // send LSS instead let lss_reply = self.send_lss(res).await?; - let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply).await?; + let (res_topic2, res2) = self.send_request_wait(&lss_reply.0, lss_reply.1).await?; if res_topic2 != topics::VLS_RES { log::warn!("ChainTracker got a topic NOT on {}", topics::VLS_RES); } @@ -70,8 +70,8 @@ impl MqttSignerPort { Ok((reply.topic_end, reply.reply)) } - async fn send_lss(&self, message: Vec) -> Result> { - let (request, reply_rx) = LssReq::new(message); + async fn send_lss(&self, message: Vec) -> Result<(String, Vec)> { + let (request, reply_rx) = LssReq::new(topics::LSS_MSG.to_string(), message); self.lss_tx.send(request).await?; let res = reply_rx.await?; Ok(res) diff --git a/broker/src/conn.rs b/broker/src/conn.rs index c9ed0e5..5c615aa 100644 --- a/broker/src/conn.rs +++ b/broker/src/conn.rs @@ -143,13 +143,14 @@ impl ChannelReply { /// Responses are received on the oneshot sender #[derive(Debug)] pub struct LssReq { + pub topic: String, pub message: Vec, - pub reply_tx: oneshot::Sender>, + pub reply_tx: oneshot::Sender<(String, Vec)>, } impl LssReq { - pub fn new(message: Vec) -> (Self, oneshot::Receiver>) { + pub fn new(topic: String, message: Vec) -> (Self, oneshot::Receiver<(String, Vec)>) { let (reply_tx, reply_rx) = oneshot::channel(); - let cr = Self { message, reply_tx }; + let cr = Self { topic, message, reply_tx }; (cr, reply_rx) } } diff --git a/broker/src/looper.rs b/broker/src/looper.rs index ffe280a..710461a 100644 --- a/broker/src/looper.rs +++ b/broker/src/looper.rs @@ -175,23 +175,29 @@ impl SignerLoop { .unwrap_or([0u8; 33]); let md = parser::raw_request_from_bytes(message, ticket, peer_id, dbid)?; // send to signer - log::info!("SEND ON {}", topics::VLS); - let (res_topic, res) = self.send_request_wait(topics::VLS, md)?; - log::info!("GOT ON {}", res_topic); - let the_res = if res_topic == topics::LSS_RES { - // send reply to LSS to store muts - let lss_reply = self.send_lss(res)?; - log::info!("LSS REPLY LEN {}", &lss_reply.len()); - // send to signer for HMAC validation, and get final reply - log::info!("SEND ON {}", topics::LSS_MSG); - let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply)?; - log::info!("GOT ON {}, send to CLN", res_topic2); - if res_topic2 != topics::VLS_RES { - log::warn!("got a topic NOT on {}", topics::VLS_RES); - } - res2 - } else { - res + let the_res = loop { + log::info!("SEND ON {}", topics::VLS); + let (res_topic, res) = self.send_request_wait(topics::VLS, md.clone())?; + log::info!("GOT ON {}", res_topic); + if res_topic == topics::LSS_RES { + // send reply to LSS to store muts + let lss_reply = self.send_lss(topics::LSS_MSG.to_string(), res)?; + log::info!("LSS REPLY LEN {}", &lss_reply.1.len()); + // send to signer for HMAC validation, and get final reply + log::info!("SEND ON {}", lss_reply.0); + let (res_topic2, res2) = self.send_request_wait(&lss_reply.0, lss_reply.1)?; + log::info!("GOT ON {}, send to CLN?", res_topic2); + if res_topic2 != topics::VLS_RES { + log::warn!("got a topic NOT on {}", topics::VLS_RES); + } + if res_topic2 == topics::LSS_CONFLICT_RES { + // try again... + continue; + } + break res2; + } else { + break res; + }; }; // create reply bytes for CLN let reply = parser::raw_response_from_bytes(the_res, ticket)?; @@ -237,9 +243,9 @@ impl SignerLoop { Ok((reply.topic_end, reply.reply)) } - fn send_lss(&mut self, message: Vec) -> Result> { + fn send_lss(&mut self, topic: String, message: Vec) -> Result<(String, Vec)> { // Send a request to the LSS server - let (request, reply_rx) = LssReq::new(message); + let (request, reply_rx) = LssReq::new(topic, message); self.lss_tx.blocking_send(request).map_err(|_| Error::Eof)?; let res = reply_rx.blocking_recv().map_err(|_| Error::Eof)?; Ok(res)