diff --git a/.gitignore b/.gitignore index 0e8fc16..6e68ecb 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ Cargo.lock notes.md test-flash .env +teststore \ No newline at end of file diff --git a/broker/src/main.rs b/broker/src/main.rs index 15a925f..333eab6 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -53,6 +53,7 @@ pub struct ChannelReply { pub reply: Vec, } +const CLIENT_ID: &str = "sphinx-1"; const BROKER_CONFIG_PATH: &str = "../broker.conf"; #[rocket::launch] @@ -97,7 +98,7 @@ async fn run_main(parent_fd: i32) -> rocket::Rocket { let (status_tx, mut status_rx) = mpsc::channel(1000); let (error_tx, _) = broadcast::channel(1000); log::info!("=> start broker on network: {}", settings.network); - start_broker(rx, status_tx, error_tx.clone(), "sphinx-1", &settings).await; + start_broker(rx, status_tx, error_tx.clone(),CLIENT_ID, &settings).await; log::info!("=> wait for connected status"); // wait for connection = true let status = status_rx.recv().await.expect("couldnt receive"); diff --git a/signer/src/lib.rs b/signer/src/lib.rs index fff6438..bee79d6 100644 --- a/signer/src/lib.rs +++ b/signer/src/lib.rs @@ -27,12 +27,13 @@ pub struct InitResponse { pub init_reply: Vec, } -pub const ROOT_STORE: &str = "/sdcard/store"; +// pub const ROOT_STORE: &str = "/sdcard/store"; pub fn init( bytes: Vec, network: Network, po: &control::Policy, + root_store_path: &str, ) -> anyhow::Result { // let persister: Arc = Arc::new(DummyPersister); let mut md = MsgDriver::new(bytes); @@ -52,7 +53,7 @@ pub fn init( let policy = make_policy(network, po); let validator_factory = Arc::new(SimpleValidatorFactory::new_with_policy(policy)); let random_time_factory = RandomStartingTimeFactory::new(); - let persister: Arc = Arc::new(FsPersister::new(ROOT_STORE)); + let persister: Arc = Arc::new(FsPersister::new(root_store_path)); let clock = Arc::new(StandardClock()); let services = NodeServices { validator_factory, diff --git a/sphinx-key/src/core/events.rs b/sphinx-key/src/core/events.rs index ddc8d3a..568fb45 100644 --- a/sphinx-key/src/core/events.rs +++ b/sphinx-key/src/core/events.rs @@ -34,6 +34,8 @@ pub enum Status { Signing, } +pub const ROOT_STORE: &str = "/sdcard/store"; + // the main event loop #[cfg(not(feature = "pingpong"))] pub fn make_event_loop( @@ -69,7 +71,8 @@ pub fn make_event_loop( let InitResponse { root_handler, init_reply: _, - } = sphinx_key_signer::init(init_msg, network, policy).expect("failed to init signer"); + } = sphinx_key_signer::init(init_msg, network, policy, ROOT_STORE) + .expect("failed to init signer"); // signing loop let dummy_peer = PubKey([0; 33]); diff --git a/tester/src/main.rs b/tester/src/main.rs index 918efc9..9588134 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -4,7 +4,7 @@ use sphinx_key_signer::lightning_signer::bitcoin::Network; use clap::{App, AppSettings, Arg}; use dotenv::dotenv; -use rumqttc::{self, AsyncClient, Event, MqttOptions, Packet, QoS}; +use rumqttc::{self, AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS}; use sphinx_key_signer::control::Controller; use sphinx_key_signer::vls_protocol::{model::PubKey, msgs}; use sphinx_key_signer::{self, InitResponse}; @@ -14,6 +14,8 @@ use std::error::Error; use std::str::FromStr; use std::time::Duration; +pub const ROOT_STORE: &str = "teststore"; + #[tokio::main(worker_threads = 1)] async fn main() -> Result<(), Box> { setup_logging("sphinx-key-tester ", "info"); @@ -38,12 +40,17 @@ async fn main() -> Result<(), Box> { let seed_string: String = env::var("SEED").expect("no seed"); let seed = hex::decode(seed_string).expect("couldnt decode seed"); // make the controller to validate Control messages - let mut ctrlr = controller_from_seed(&network, &seed); + let ctrlr = controller_from_seed(&network, &seed); let pubkey = hex::encode(&ctrlr.pubkey().serialize()); let token = ctrlr.make_auth_token()?; - let (client, mut eventloop) = loop { - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let client_id = if is_test { + "test-1" + } else { + "sphinx-1" + }; + let (client, eventloop) = loop { + let mut mqttoptions = MqttOptions::new(client_id, "localhost", 1883); mqttoptions.set_credentials(pubkey.clone(), token.clone()); mqttoptions.set_keep_alive(Duration::from_secs(5)); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); @@ -72,117 +79,125 @@ async fn main() -> Result<(), Box> { .expect("could not mqtt subscribe"); if is_test { - // test handler loop - loop { - match eventloop.poll().await { - Ok(event) => { - // println!("{:?}", event); - if let Some((topic, msg_bytes)) = incoming_bytes(event) { - match topic.as_str() { - topics::VLS => { - let (ping, sequence, dbid): (msgs::Ping, u16, u64) = - parser::request_from_bytes(msg_bytes) - .expect("read ping header"); - if is_log { - println!("sequence {}", sequence); - println!("dbid {}", dbid); - println!("INCOMING: {:?}", ping); - } - let pong = msgs::Pong { - id: ping.id, - message: ping.message, - }; - let bytes = parser::raw_response_from_msg(pong, sequence) - .expect("couldnt parse raw response"); + run_test(eventloop, &client, ctrlr, is_log).await; + } else { + run_main(eventloop, &client, ctrlr, is_log, &seed, network).await; + } + } +} + +async fn run_main(mut eventloop: EventLoop, client: &AsyncClient, mut ctrlr: Controller, is_log: bool, seed: &[u8], network: Network) { + let seed32: [u8; 32] = seed.try_into().expect("wrong seed"); + let init_msg = + sphinx_key_signer::make_init_msg(network, seed32).expect("failed to make init msg"); + let InitResponse { + root_handler, + init_reply: _, + } = sphinx_key_signer::init(init_msg, network, &Default::default(), ROOT_STORE) + .expect("failed to init signer"); + // the actual handler loop + loop { + match eventloop.poll().await { + Ok(event) => { + let dummy_peer = PubKey([0; 33]); + if let Some((topic, msg_bytes)) = incoming_bytes(event) { + match topic.as_str() { + topics::VLS => { + match sphinx_key_signer::handle( + &root_handler, + msg_bytes, + dummy_peer.clone(), + is_log, + ) { + Ok(b) => client + .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, b) + .await + .expect("could not publish init response"), + Err(e) => panic!("HANDLE FAILED {:?}", e), + }; + } + topics::CONTROL => { + match ctrlr.handle(&msg_bytes) { + Ok((response, _new_policy)) => { client - .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, bytes) + .publish( + topics::CONTROL_RETURN, + QoS::AtMostOnce, + false, + response, + ) .await .expect("could not mqtt publish"); } - topics::CONTROL => { - match ctrlr.handle(&msg_bytes) { - Ok((response, _new_policy)) => { - client - .publish( - topics::CONTROL_RETURN, - QoS::AtMostOnce, - false, - response, - ) - .await - .expect("could not mqtt publish"); - } - Err(e) => log::warn!("error parsing ctrl msg {:?}", e), - }; - } - _ => log::info!("invalid topic"), - } + Err(e) => log::warn!("error parsing ctrl msg {:?}", e), + }; } - } - Err(e) => { - log::warn!("diconnected {:?}", e); - tokio::time::sleep(Duration::from_secs(1)).await; - break; // break out of this loop to reconnect + _ => log::info!("invalid topic"), } } } - } else { - let seed32: [u8; 32] = seed.try_into().expect("wrong seed"); - let init_msg = - sphinx_key_signer::make_init_msg(network, seed32).expect("failed to make init msg"); - let InitResponse { - root_handler, - init_reply: _, - } = sphinx_key_signer::init(init_msg, network, &Default::default()) - .expect("failed to init signer"); - // the actual handler loop - loop { - match eventloop.poll().await { - Ok(event) => { - let dummy_peer = PubKey([0; 33]); - if let Some((topic, msg_bytes)) = incoming_bytes(event) { - match topic.as_str() { - topics::VLS => { - match sphinx_key_signer::handle( - &root_handler, - msg_bytes, - dummy_peer.clone(), - is_log, - ) { - Ok(b) => client - .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, b) - .await - .expect("could not publish init response"), - Err(e) => panic!("HANDLE FAILED {:?}", e), - }; - } - topics::CONTROL => { - match ctrlr.handle(&msg_bytes) { - Ok((response, _new_policy)) => { - client - .publish( - topics::CONTROL_RETURN, - QoS::AtMostOnce, - false, - response, - ) - .await - .expect("could not mqtt publish"); - } - Err(e) => log::warn!("error parsing ctrl msg {:?}", e), - }; - } - _ => log::info!("invalid topic"), + Err(e) => { + log::warn!("diconnected {:?}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + break; // break out of this loop to reconnect + } + } + } +} + +async fn run_test(mut eventloop: EventLoop, client: &AsyncClient, mut ctrlr: Controller, is_log: bool) { + // test handler loop + loop { + match eventloop.poll().await { + Ok(event) => { + // println!("{:?}", event); + if let Some((topic, msg_bytes)) = incoming_bytes(event) { + match topic.as_str() { + topics::VLS => { + let (ping, sequence, dbid): (msgs::Ping, u16, u64) = + parser::request_from_bytes(msg_bytes) + .expect("read ping header"); + if is_log { + println!("sequence {}", sequence); + println!("dbid {}", dbid); + println!("INCOMING: {:?}", ping); } + let pong = msgs::Pong { + id: ping.id, + message: ping.message, + }; + let bytes = parser::raw_response_from_msg(pong, sequence) + .expect("couldnt parse raw response"); + client + .publish(topics::VLS_RETURN, QoS::AtMostOnce, false, bytes) + .await + .expect("could not mqtt publish"); } - } - Err(e) => { - log::warn!("diconnected {:?}", e); - tokio::time::sleep(Duration::from_secs(1)).await; - break; // break out of this loop to reconnect + topics::CONTROL => { + match ctrlr.handle(&msg_bytes) { + Ok((response, _new_policy)) => { + client + .publish( + topics::CONTROL_RETURN, + QoS::AtMostOnce, + false, + response, + ) + .await + .expect("could not mqtt publish"); + } + Err(e) => log::warn!("error parsing ctrl msg {:?}", e), + }; + } + _ => log::info!("invalid topic"), } } } + Err(e) => { + log::warn!("diconnected {:?}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + break; // break out of this loop to reconnect + } } } }