diff --git a/ark-lib/src/oor.rs b/ark-lib/src/oor.rs index 3b1ad5a..c27a9e0 100644 --- a/ark-lib/src/oor.rs +++ b/ark-lib/src/oor.rs @@ -2,7 +2,10 @@ use std::io; -use bitcoin::{Amount, FeeRate, OutPoint, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Weight, Witness}; +use bitcoin::{ + Amount, FeeRate, OutPoint, ScriptBuf, Sequence, Transaction, Txid, TxIn, TxOut, Weight, + Witness, +}; use bitcoin::hashes::Hash; use bitcoin::secp256k1::{schnorr, KeyPair, PublicKey}; use bitcoin::sighash::{self, SighashCache, TapSighash, TapSighashType}; @@ -52,6 +55,10 @@ impl OorPayment { } } + pub fn txid(&self) -> Txid { + self.unsigned_transaction().txid() + } + pub fn sighashes(&self) -> Vec { let tx = self.unsigned_transaction(); let prevs = self.inputs.iter().map(|i| i.txout()).collect::>(); diff --git a/arkd-rpc-client/src/arkd.rs b/arkd-rpc-client/src/arkd.rs index e4929ff..3b867dc 100644 --- a/arkd-rpc-client/src/arkd.rs +++ b/arkd-rpc-client/src/arkd.rs @@ -687,6 +687,28 @@ pub mod admin_service_client { .insert(GrpcMethod::new("arkd.AdminService", "WalletStatus")); self.inner.unary(req, path, codec).await } + pub async fn trigger_round( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/arkd.AdminService/TriggerRound", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("arkd.AdminService", "TriggerRound")); + self.inner.unary(req, path, codec).await + } pub async fn stop( &mut self, request: impl tonic::IntoRequest, diff --git a/arkd/rpc-protos/arkd.proto b/arkd/rpc-protos/arkd.proto index f36c156..caa07f5 100644 --- a/arkd/rpc-protos/arkd.proto +++ b/arkd/rpc-protos/arkd.proto @@ -170,6 +170,7 @@ message VtxoSignaturesRequest { /// Administration service for arkd. service AdminService { rpc WalletStatus(Empty) returns (WalletStatusResponse) {} + rpc TriggerRound(Empty) returns (Empty) {} rpc Stop(Empty) returns (Empty) {} } diff --git a/arkd/src/lib.rs b/arkd/src/lib.rs index 9ddfc16..a83346c 100644 --- a/arkd/src/lib.rs +++ b/arkd/src/lib.rs @@ -92,6 +92,7 @@ pub struct App { round_busy: Arc>, round_event_tx: tokio::sync::broadcast::Sender, round_input_tx: tokio::sync::mpsc::UnboundedSender, + round_trigger_tx: tokio::sync::mpsc::Sender<()>, } impl App { @@ -168,6 +169,7 @@ impl App { let (round_event_tx, _rx) = tokio::sync::broadcast::channel(8); let (round_input_tx, round_input_rx) = tokio::sync::mpsc::unbounded_channel(); + let (round_trigger_tx, round_trigger_rx) = tokio::sync::mpsc::channel(1); let ret = Arc::new(App { config: config, @@ -180,6 +182,7 @@ impl App { round_busy: Arc::new(RwLock::new(())), round_event_tx: round_event_tx, round_input_tx: round_input_tx, + round_trigger_tx: round_trigger_tx, }); let app = ret.clone(); @@ -193,7 +196,7 @@ impl App { ret = rpcserver::run_admin_rpc_server(app.clone()) => { ret.context("error from admin gRPC server") }, - ret = round::run_round_scheduler(app.clone(), round_input_rx) => { + ret = round::run_round_scheduler(app.clone(), round_input_rx, round_trigger_rx) => { ret.context("error from round scheduler") }, } @@ -202,7 +205,7 @@ impl App { ret = rpcserver::run_public_rpc_server(app.clone()) => { ret.context("error from public gRPC server") }, - ret = round::run_round_scheduler(app.clone(), round_input_rx) => { + ret = round::run_round_scheduler(app.clone(), round_input_rx, round_trigger_rx) => { ret.context("error from round scheduler") }, } @@ -271,7 +274,7 @@ impl App { if let Some(dup) = self.db.atomic_check_mark_oors_cosigned(ids.iter().copied())? { bail!("attempted to double sign OOR for vtxo {}", dup) } else { - info!("Cosigning OOR tx with inputs: {:?}", ids); + info!("Cosigning OOR tx {} with inputs: {:?}", payment.txid(), ids); let (nonces, sigs) = payment.sign_asp(&self.master_key, &user_nonces); Ok((nonces, sigs)) } diff --git a/arkd/src/main.rs b/arkd/src/main.rs index 04eaaa4..63484a8 100644 --- a/arkd/src/main.rs +++ b/arkd/src/main.rs @@ -46,10 +46,10 @@ enum Command { DropOorConflicts, #[command()] Rpc { - #[command(subcommand)] - cmd: RpcCommand, #[arg(long, default_value = RPC_ADDR)] addr: String, + #[command(subcommand)] + cmd: RpcCommand, }, } @@ -59,6 +59,8 @@ enum RpcCommand { Balance, #[command()] GetAddress, + #[command()] + TriggerRound, /// Stop arkd. #[command()] Stop, @@ -141,8 +143,12 @@ async fn inner_main() -> anyhow::Result<()> { } async fn run_rpc(addr: &str, cmd: RpcCommand) -> anyhow::Result<()> { - let asp_endpoint = tonic::transport::Uri::from_str(&format!("http://{}", addr)) - .context("invalid asp addr")?; + let addr = if addr.starts_with("http") { + addr.to_owned() + } else { + format!("http://{}", addr) + }; + let asp_endpoint = tonic::transport::Uri::from_str(&addr).context("invalid asp addr")?; let mut asp = rpc::AdminServiceClient::connect(asp_endpoint) .await.context("failed to connect to asp")?; @@ -155,6 +161,9 @@ async fn run_rpc(addr: &str, cmd: RpcCommand) -> anyhow::Result<()> { let res = asp.wallet_status(rpc::Empty {}).await?.into_inner(); println!("{}", res.address); }, + RpcCommand::TriggerRound => { + asp.trigger_round(rpc::Empty {}).await?.into_inner(); + } RpcCommand::Stop => unimplemented!(), } Ok(()) diff --git a/arkd/src/round/mod.rs b/arkd/src/round/mod.rs index e6b8231..f190d01 100644 --- a/arkd/src/round/mod.rs +++ b/arkd/src/round/mod.rs @@ -150,6 +150,7 @@ fn validate_forfeit_sigs( pub async fn run_round_scheduler( app: Arc, mut round_input_rx: tokio::sync::mpsc::UnboundedReceiver, + mut round_trigger_rx: tokio::sync::mpsc::Receiver<()>, ) -> anyhow::Result<()> { let cfg = &app.config; @@ -165,6 +166,10 @@ pub async fn run_round_scheduler( 'sleep: loop { tokio::select! { () = &mut timeout => break 'sleep, + Some(()) = round_trigger_rx.recv() => { + info!("Starting round based on admin RPC trigger"); + break 'sleep; + }, _ = round_input_rx.recv() => {}, } } diff --git a/arkd/src/rpc/arkd.rs b/arkd/src/rpc/arkd.rs index 888a2a0..77a7b6a 100644 --- a/arkd/src/rpc/arkd.rs +++ b/arkd/src/rpc/arkd.rs @@ -949,6 +949,10 @@ pub mod admin_service_server { tonic::Response, tonic::Status, >; + async fn trigger_round( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; async fn stop( &self, request: tonic::Request, @@ -1078,6 +1082,50 @@ pub mod admin_service_server { }; Box::pin(fut) } + "/arkd.AdminService/TriggerRound" => { + #[allow(non_camel_case_types)] + struct TriggerRoundSvc(pub Arc); + impl tonic::server::UnaryService + for TriggerRoundSvc { + type Response = super::Empty; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::trigger_round(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = TriggerRoundSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/arkd.AdminService/Stop" => { #[allow(non_camel_case_types)] struct StopSvc(pub Arc); diff --git a/arkd/src/rpcserver.rs b/arkd/src/rpcserver.rs index 9db31d1..2a60909 100644 --- a/arkd/src/rpcserver.rs +++ b/arkd/src/rpcserver.rs @@ -332,6 +332,18 @@ impl rpc::AdminService for Arc { })) } + async fn trigger_round( + &self, + _req: tonic::Request, + ) -> Result, tonic::Status> { + match self.round_trigger_tx.try_send(()) { + Err(tokio::sync::mpsc::error::TrySendError::Closed(())) => { + Err(internal!("round scheduler closed")) + }, + _ => Ok(tonic::Response::new(rpc::Empty {})), + } + } + async fn stop( &self, _req: tonic::Request, diff --git a/noah/src/exit.rs b/noah/src/exit.rs index 3358b1f..a2cc47f 100644 --- a/noah/src/exit.rs +++ b/noah/src/exit.rs @@ -116,11 +116,10 @@ impl Wallet { // Broadcast exit txs. for tx in &exit.broadcast { + trace!("Broadcasting tx {}: {}", tx.txid(), bitcoin::consensus::encode::serialize_hex(tx)); if let Err(e) = self.onchain.broadcast_tx(tx).await { error!("Error broadcasting exit tx {}: {}", tx.txid(), e); - trace!("Error broadcasting exit tx {}: {}", - bitcoin::consensus::encode::serialize_hex(tx), e, - ); + error!("Tx {}: {}", tx.txid(), bitcoin::consensus::encode::serialize_hex(tx)); } } @@ -222,7 +221,6 @@ impl Wallet { let sighash = shc.taproot_script_spend_signature_hash( i, &sighash::Prevouts::All(&prevouts), leaf_hash, sighash::TapSighashType::Default, ).expect("all prevouts provided"); - trace!("sighash: {}", sighash); assert_eq!(vtxo_key.public_key(), claim.spec.user_pubkey); let sig = SECP.sign_schnorr(&sighash.into(), &vtxo_key);