put conflict topics

This commit is contained in:
Evan Feenstra
2023-09-15 14:27:49 -07:00
parent c7f44bb856
commit 258ecbe3f8
5 changed files with 39 additions and 32 deletions

10
broker/Cargo.lock generated
View File

@@ -1691,7 +1691,7 @@ dependencies = [
[[package]]
name = "lss-connector"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947"
source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540"
dependencies = [
"anyhow",
"lightning-storage-server",
@@ -2693,7 +2693,7 @@ dependencies = [
[[package]]
name = "rmp-utils"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947"
source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540"
dependencies = [
"anyhow",
"log",
@@ -3268,7 +3268,7 @@ dependencies = [
[[package]]
name = "sphinx-auther"
version = "0.1.12"
source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947"
source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540"
dependencies = [
"anyhow",
"base64 0.21.2",
@@ -3280,7 +3280,7 @@ dependencies = [
[[package]]
name = "sphinx-glyph"
version = "0.1.2"
source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947"
source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540"
dependencies = [
"anyhow",
"hex",
@@ -3326,7 +3326,7 @@ dependencies = [
[[package]]
name = "sphinx-signer"
version = "0.1.0"
source = "git+https://github.com/stakwork/sphinx-rs?rev=4f3485a687f6e518282c981d9ecf0adfca38e947#4f3485a687f6e518282c981d9ecf0adfca38e947"
source = "git+https://github.com/stakwork/sphinx-rs?rev=b4b0f78f24e91cb632ef3d99a3304435c9dd3540#b4b0f78f24e91cb632ef3d99a3304435c9dd3540"
dependencies = [
"anyhow",
"bip39",

View File

@@ -39,8 +39,8 @@ vls-proxy = { git = "https://gitlab.com/lightning-signer/validating-li
# vls-protocol-client = { path = "../../vls/vls-protocol-client" }
# vls-proxy = { path = "../../vls/vls-proxy" }
lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "4f3485a687f6e518282c981d9ecf0adfca38e947" }
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "4f3485a687f6e518282c981d9ecf0adfca38e947" }
lss-connector = { git = "https://github.com/stakwork/sphinx-rs", rev = "b4b0f78f24e91cb632ef3d99a3304435c9dd3540" }
sphinx-signer = { git = "https://github.com/stakwork/sphinx-rs", rev = "b4b0f78f24e91cb632ef3d99a3304435c9dd3540" }
# lss-connector = { path = "../../sphinx-rs/lss-connector" }
# sphinx-signer = { path = "../../sphinx-rs/signer" }

View File

@@ -49,7 +49,7 @@ impl MqttSignerPort {
if res_topic == topics::LSS_RES {
// send LSS instead
let lss_reply = self.send_lss(res).await?;
let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply).await?;
let (res_topic2, res2) = self.send_request_wait(&lss_reply.0, lss_reply.1).await?;
if res_topic2 != topics::VLS_RES {
log::warn!("ChainTracker got a topic NOT on {}", topics::VLS_RES);
}
@@ -70,8 +70,8 @@ impl MqttSignerPort {
Ok((reply.topic_end, reply.reply))
}
async fn send_lss(&self, message: Vec<u8>) -> Result<Vec<u8>> {
let (request, reply_rx) = LssReq::new(message);
async fn send_lss(&self, message: Vec<u8>) -> Result<(String, Vec<u8>)> {
let (request, reply_rx) = LssReq::new(topics::LSS_MSG.to_string(), message);
self.lss_tx.send(request).await?;
let res = reply_rx.await?;
Ok(res)

View File

@@ -143,13 +143,14 @@ impl ChannelReply {
/// Responses are received on the oneshot sender
#[derive(Debug)]
pub struct LssReq {
pub topic: String,
pub message: Vec<u8>,
pub reply_tx: oneshot::Sender<Vec<u8>>,
pub reply_tx: oneshot::Sender<(String, Vec<u8>)>,
}
impl LssReq {
pub fn new(message: Vec<u8>) -> (Self, oneshot::Receiver<Vec<u8>>) {
pub fn new(topic: String, message: Vec<u8>) -> (Self, oneshot::Receiver<(String, Vec<u8>)>) {
let (reply_tx, reply_rx) = oneshot::channel();
let cr = Self { message, reply_tx };
let cr = Self { topic, message, reply_tx };
(cr, reply_rx)
}
}

View File

@@ -175,23 +175,29 @@ impl<C: 'static + Client> SignerLoop<C> {
.unwrap_or([0u8; 33]);
let md = parser::raw_request_from_bytes(message, ticket, peer_id, dbid)?;
// send to signer
let the_res = loop {
log::info!("SEND ON {}", topics::VLS);
let (res_topic, res) = self.send_request_wait(topics::VLS, md)?;
let (res_topic, res) = self.send_request_wait(topics::VLS, md.clone())?;
log::info!("GOT ON {}", res_topic);
let the_res = if res_topic == topics::LSS_RES {
if res_topic == topics::LSS_RES {
// send reply to LSS to store muts
let lss_reply = self.send_lss(res)?;
log::info!("LSS REPLY LEN {}", &lss_reply.len());
let lss_reply = self.send_lss(topics::LSS_MSG.to_string(), res)?;
log::info!("LSS REPLY LEN {}", &lss_reply.1.len());
// send to signer for HMAC validation, and get final reply
log::info!("SEND ON {}", topics::LSS_MSG);
let (res_topic2, res2) = self.send_request_wait(topics::LSS_MSG, lss_reply)?;
log::info!("GOT ON {}, send to CLN", res_topic2);
log::info!("SEND ON {}", lss_reply.0);
let (res_topic2, res2) = self.send_request_wait(&lss_reply.0, lss_reply.1)?;
log::info!("GOT ON {}, send to CLN?", res_topic2);
if res_topic2 != topics::VLS_RES {
log::warn!("got a topic NOT on {}", topics::VLS_RES);
}
res2
if res_topic2 == topics::LSS_CONFLICT_RES {
// try again...
continue;
}
break res2;
} else {
res
break res;
};
};
// create reply bytes for CLN
let reply = parser::raw_response_from_bytes(the_res, ticket)?;
@@ -237,9 +243,9 @@ impl<C: 'static + Client> SignerLoop<C> {
Ok((reply.topic_end, reply.reply))
}
fn send_lss(&mut self, message: Vec<u8>) -> Result<Vec<u8>> {
fn send_lss(&mut self, topic: String, message: Vec<u8>) -> Result<(String, Vec<u8>)> {
// Send a request to the LSS server
let (request, reply_rx) = LssReq::new(message);
let (request, reply_rx) = LssReq::new(topic, message);
self.lss_tx.blocking_send(request).map_err(|_| Error::Eof)?;
let res = reply_rx.blocking_recv().map_err(|_| Error::Eof)?;
Ok(res)