diff --git a/crates/cdk-signatory/Cargo.toml b/crates/cdk-signatory/Cargo.toml index 1e5990d5..4fbfa4ea 100644 --- a/crates/cdk-signatory/Cargo.toml +++ b/crates/cdk-signatory/Cargo.toml @@ -33,6 +33,7 @@ home.workspace = true thiserror.workspace = true tracing-subscriber.workspace = true tokio = { workspace = true, features = ["full"] } +tokio-stream.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] tokio = { workspace = true, features = ["rt", "macros", "sync", "time"] } diff --git a/crates/cdk-signatory/src/bin/cli/mod.rs b/crates/cdk-signatory/src/bin/cli/mod.rs index 53feed28..798181be 100644 --- a/crates/cdk-signatory/src/bin/cli/mod.rs +++ b/crates/cdk-signatory/src/bin/cli/mod.rs @@ -15,7 +15,7 @@ use cdk_common::database::MintKeysDatabase; use cdk_common::CurrencyUnit; #[cfg(feature = "redb")] use cdk_redb::MintRedbDatabase; -use cdk_signatory::{db_signatory, grpc_server}; +use cdk_signatory::{db_signatory, start_grpc_server}; #[cfg(feature = "sqlite")] use cdk_sqlite::MintSqliteDatabase; use clap::Parser; @@ -167,7 +167,7 @@ pub async fn cli_main() -> Result<()> { let socket_addr = SocketAddr::from_str(&format!("{}:{}", args.listen_addr, args.listen_port))?; - grpc_server(signatory, socket_addr, certs).await?; + start_grpc_server(signatory, socket_addr, certs).await?; Ok(()) } diff --git a/crates/cdk-signatory/src/lib.rs b/crates/cdk-signatory/src/lib.rs index 6485b62d..348e3445 100644 --- a/crates/cdk-signatory/src/lib.rs +++ b/crates/cdk-signatory/src/lib.rs @@ -12,7 +12,9 @@ mod proto; #[cfg(feature = "grpc")] -pub use proto::{client::SignatoryRpcClient, server::grpc_server}; +pub use proto::{ + client::SignatoryRpcClient, server::start_grpc_server, server::start_grpc_server_with_incoming, +}; mod common; diff --git a/crates/cdk-signatory/src/proto/server.rs b/crates/cdk-signatory/src/proto/server.rs index 4fa6f95a..90f1f8da 100644 --- a/crates/cdk-signatory/src/proto/server.rs +++ b/crates/cdk-signatory/src/proto/server.rs @@ -1,12 +1,17 @@ +//! This module contains the generated gRPC server code for the Signatory service. use std::net::SocketAddr; use std::path::Path; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_stream::Stream; +use tonic::transport::server::Connected; use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig}; use tonic::{Request, Response, Status}; use crate::proto::{self, signatory_server}; use crate::signatory::Signatory; +/// The server implementation for the Signatory service. pub struct CdkSignatoryServer where T: Signatory + Send + Sync + 'static, @@ -130,6 +135,7 @@ where } } +/// Error type for the gRPC server #[derive(thiserror::Error, Debug)] pub enum Error { /// Transport error @@ -141,7 +147,7 @@ pub enum Error { } /// Runs the signatory server -pub async fn grpc_server>( +pub async fn start_grpc_server>( signatory: T, addr: SocketAddr, tls_dir: Option, @@ -220,3 +226,23 @@ where .await?; Ok(()) } + +/// Starts the gRPC signatory server with an incoming stream of connections. +pub async fn start_grpc_server_with_incoming( + signatory: T, + incoming: I, +) -> Result<(), Error> +where + T: Signatory + Send + Sync + 'static, + I: Stream>, + IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, + IE: Into>, +{ + Server::builder() + .add_service(signatory_server::SignatoryServer::new(CdkSignatoryServer { + inner: signatory, + })) + .serve_with_incoming(incoming) + .await?; + Ok(()) +}