diff --git a/forwarding_history.go b/forwarding_history.go index 7fd4f18..997c6b5 100644 --- a/forwarding_history.go +++ b/forwarding_history.go @@ -38,12 +38,12 @@ func (cfe *copyFromEvents) Err() error { return cfe.err } -func channelsSynchronize(client chainrpc.ChainNotifierClient) { +func channelsSynchronize(client *LndClient) { lastSync := time.Now().Add(-6 * time.Minute) for { cancellableCtx, cancel := context.WithCancel(context.Background()) clientCtx := metadata.AppendToOutgoingContext(cancellableCtx, "macaroon", os.Getenv("LND_MACAROON_HEX")) - stream, err := client.RegisterBlockEpochNtfn(clientCtx, &chainrpc.BlockEpoch{}) + stream, err := client.chainNotifierClient.RegisterBlockEpochNtfn(clientCtx, &chainrpc.BlockEpoch{}) if err != nil { log.Printf("chainNotifierClient.RegisterBlockEpochNtfn(): %v", err) cancel() @@ -59,7 +59,7 @@ func channelsSynchronize(client chainrpc.ChainNotifierClient) { } if lastSync.Add(5 * time.Minute).Before(time.Now()) { time.Sleep(30 * time.Second) - err = channelsSynchronizeOnce() + err = channelsSynchronizeOnce(client) lastSync = time.Now() log.Printf("channelsSynchronizeOnce() err: %v", err) } @@ -68,10 +68,10 @@ func channelsSynchronize(client chainrpc.ChainNotifierClient) { } } -func channelsSynchronizeOnce() error { +func channelsSynchronizeOnce(client *LndClient) error { log.Printf("channelsSynchronizeOnce - begin") clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) - channels, err := client.ListChannels(clientCtx, &lnrpc.ListChannelsRequest{PrivateOnly: true}) + channels, err := client.client.ListChannels(clientCtx, &lnrpc.ListChannelsRequest{PrivateOnly: true}) if err != nil { log.Printf("ListChannels error: %v", err) return fmt.Errorf("client.ListChannels() error: %w", err) @@ -102,15 +102,15 @@ func channelsSynchronizeOnce() error { return nil } -func forwardingHistorySynchronize() { +func forwardingHistorySynchronize(client *LndClient) { for { - err := forwardingHistorySynchronizeOnce() + err := forwardingHistorySynchronizeOnce(client) log.Printf("forwardingHistorySynchronizeOnce() err: %v", err) time.Sleep(1 * time.Minute) } } -func forwardingHistorySynchronizeOnce() error { +func forwardingHistorySynchronizeOnce(client *LndClient) error { last, err := lastForwardingEvent() if err != nil { return fmt.Errorf("lastForwardingEvent() error: %w", err) @@ -126,7 +126,7 @@ func forwardingHistorySynchronizeOnce() error { clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) indexOffset := uint32(0) for { - forwardHistory, err := client.ForwardingHistory(clientCtx, &lnrpc.ForwardingHistoryRequest{ + forwardHistory, err := client.client.ForwardingHistory(clientCtx, &lnrpc.ForwardingHistoryRequest{ StartTime: uint64(last), EndTime: endTime, NumMaxEvents: 10000, diff --git a/intercept.go b/intercept.go index 1c20b48..30b3640 100644 --- a/intercept.go +++ b/intercept.go @@ -113,11 +113,11 @@ func failForwardSend(interceptorClient routerrpc.Router_HtlcInterceptorClient, i }) } -func intercept() { +func intercept(client *LndClient) { for { cancellableCtx, cancel := context.WithCancel(context.Background()) clientCtx := metadata.AppendToOutgoingContext(cancellableCtx, "macaroon", os.Getenv("LND_MACAROON_HEX")) - interceptorClient, err := routerClient.HtlcInterceptor(clientCtx) + interceptorClient, err := client.routerClient.HtlcInterceptor(clientCtx) if err != nil { log.Printf("routerClient.HtlcInterceptor(): %v", err) cancel() @@ -162,7 +162,7 @@ func intercept() { if fundingTxID == nil { if bytes.Compare(paymentHash, request.PaymentHash) == 0 { - fundingTxID, fundingTxOutnum, err = openChannel(clientCtx, client, request.PaymentHash, destination, incomingAmountMsat) + fundingTxID, fundingTxOutnum, err = openChannel(clientCtx, client.client, request.PaymentHash, destination, incomingAmountMsat) log.Printf("openChannel(%x, %v) err: %v", destination, incomingAmountMsat, err) if err != nil { failForwardSend(interceptorClient, request.IncomingCircuitKey) @@ -170,7 +170,7 @@ func intercept() { } } else { //probing failureCode := lnrpc.Failure_TEMPORARY_CHANNEL_FAILURE - if err := isConnected(clientCtx, client, destination); err == nil { + if err := isConnected(clientCtx, client.client, destination); err == nil { failureCode = lnrpc.Failure_INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS } interceptorClient.Send(&routerrpc.ForwardHtlcInterceptResponse{ @@ -281,7 +281,7 @@ func resumeOrCancel( ) { deadline := time.Now().Add(10 * time.Second) for { - initialChanID, confirmedChanID := getChannel(ctx, client, destination, channelPoint) + initialChanID, confirmedChanID := getChannel(ctx, client.client, destination, channelPoint) if initialChanID != 0 { interceptorClient.Send(&routerrpc.ForwardHtlcInterceptResponse{ IncomingCircuitKey: incomingCircuitKey, diff --git a/server.go b/server.go index 88dca73..ff84cc6 100644 --- a/server.go +++ b/server.go @@ -3,14 +3,12 @@ package main import ( "context" "crypto/tls" - "crypto/x509" "encoding/hex" "fmt" "log" "net" "os" "strconv" - "strings" "github.com/breez/lspd/btceclegacy" lspdrpc "github.com/breez/lspd/rpc" @@ -18,16 +16,13 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/caddyserver/certmagic" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - "github.com/lightningnetwork/lnd/lnrpc" - "github.com/lightningnetwork/lnd/lnrpc/chainrpc" - "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lnwire" "golang.org/x/sync/singleflight" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -48,9 +43,7 @@ const ( type server struct{} var ( - client lnrpc.LightningClient - routerClient routerrpc.RouterClient - chainNotifierClient chainrpc.ChainNotifierClient + client *LndClient openChannelReqGroup singleflight.Group privateKey *btcec.PrivateKey publicKey *btcec.PublicKey @@ -105,12 +98,12 @@ func (s *server) RegisterPayment(ctx context.Context, in *lspdrpc.RegisterPaymen func (s *server) OpenChannel(ctx context.Context, in *lspdrpc.OpenChannelRequest) (*lspdrpc.OpenChannelReply, error) { r, err, _ := openChannelReqGroup.Do(in.Pubkey, func() (interface{}, error) { - clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) - nodeChannels, err := getNodeChannels(in.Pubkey) + pubkey, err := hex.DecodeString(in.Pubkey) if err != nil { return nil, err } - pendingChannels, err := getPendingNodeChannels(in.Pubkey) + + channelCount, err := client.GetNodeChannelCount(pubkey) if err != nil { return nil, err } @@ -126,33 +119,25 @@ func (s *server) OpenChannel(ctx context.Context, in *lspdrpc.OpenChannelRequest } log.Printf("os.Getenv(\"CHANNEL_PRIVATE\"): %v, isPrivate: %v, err: %v", os.Getenv("CHANNEL_PRIVATE"), isPrivate, err) - var txidStr string - var outputIndex uint32 - if len(nodeChannels) == 0 && len(pendingChannels) == 0 { - response, err := client.OpenChannelSync(clientCtx, &lnrpc.OpenChannelRequest{ - LocalFundingAmount: channelAmount, - NodePubkeyString: in.Pubkey, - PushSat: 0, - TargetConf: targetConf, - MinHtlcMsat: minHtlcMsat, - Private: isPrivate, + var outPoint *wire.OutPoint + if channelCount == 0 { + outPoint, err = client.OpenChannel(&OpenChannelRequest{ + CapacitySat: uint64(channelAmount), + Destination: pubkey, + TargetConf: targetConf, + MinHtlcMsat: minHtlcMsat, + IsPrivate: isPrivate, }) - log.Printf("Response from OpenChannel: %#v (TX: %v)", response, hex.EncodeToString(response.GetFundingTxidBytes())) if err != nil { log.Printf("Error in OpenChannel: %v", err) return nil, err } - txid, _ := chainhash.NewHash(response.GetFundingTxidBytes()) - outputIndex = response.GetOutputIndex() - // don't fail the request in case we can't format the channel id from - // some reason... - if txid != nil { - txidStr = txid.String() - } + log.Printf("Response from OpenChannel: (TX: %v)", outPoint.String()) } - return &lspdrpc.OpenChannelReply{TxHash: txidStr, OutputIndex: outputIndex}, nil + + return &lspdrpc.OpenChannelReply{TxHash: outPoint.Hash.String(), OutputIndex: outPoint.Index}, nil }) if err != nil { @@ -212,10 +197,10 @@ func (s *server) CheckChannels(ctx context.Context, in *lspdrpc.Encrypted) (*lsp log.Printf("getNotFakeChannels(%v) error: %v", checkChannelsRequest.FakeChannels, err) return nil, fmt.Errorf("getNotFakeChannels(%v) error: %w", checkChannelsRequest.FakeChannels, err) } - closedChannels, err := getClosedChannels(nodeID, checkChannelsRequest.WaitingCloseChannels) + closedChannels, err := client.GetClosedChannels(nodeID, checkChannelsRequest.WaitingCloseChannels) if err != nil { - log.Printf("getNotFakeChannels(%v) error: %v", checkChannelsRequest.FakeChannels, err) - return nil, fmt.Errorf("getNotFakeChannels(%v) error: %w", checkChannelsRequest.FakeChannels, err) + log.Printf("GetClosedChannels(%v) error: %v", checkChannelsRequest.FakeChannels, err) + return nil, fmt.Errorf("GetClosedChannels(%v) error: %w", checkChannelsRequest.FakeChannels, err) } checkChannelsReply := lspdrpc.CheckChannelsReply{ NotFakeChannels: notFakeChannels, @@ -256,72 +241,6 @@ func getNotFakeChannels(nodeID string, channelPoints map[string]uint64) (map[str return r, nil } -func getClosedChannels(nodeID string, channelPoints map[string]uint64) (map[string]uint64, error) { - r := make(map[string]uint64) - if len(channelPoints) == 0 { - return r, nil - } - waitingCloseChannels, err := getWaitingCloseChannels(nodeID) - if err != nil { - return nil, err - } - wcc := make(map[string]struct{}) - for _, c := range waitingCloseChannels { - wcc[c.Channel.ChannelPoint] = struct{}{} - } - for c, h := range channelPoints { - if _, ok := wcc[c]; !ok { - r[c] = h - } - } - return r, nil -} - -func getWaitingCloseChannels(nodeID string) ([]*lnrpc.PendingChannelsResponse_WaitingCloseChannel, error) { - clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) - pendingResponse, err := client.PendingChannels(clientCtx, &lnrpc.PendingChannelsRequest{}) - if err != nil { - return nil, err - } - var waitingCloseChannels []*lnrpc.PendingChannelsResponse_WaitingCloseChannel - for _, p := range pendingResponse.WaitingCloseChannels { - if p.Channel.RemoteNodePub == nodeID { - waitingCloseChannels = append(waitingCloseChannels, p) - } - } - return waitingCloseChannels, nil -} - -func getNodeChannels(nodeID string) ([]*lnrpc.Channel, error) { - clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) - listResponse, err := client.ListChannels(clientCtx, &lnrpc.ListChannelsRequest{}) - if err != nil { - return nil, err - } - var nodeChannels []*lnrpc.Channel - for _, channel := range listResponse.Channels { - if channel.RemotePubkey == nodeID { - nodeChannels = append(nodeChannels, channel) - } - } - return nodeChannels, nil -} - -func getPendingNodeChannels(nodeID string) ([]*lnrpc.PendingChannelsResponse_PendingOpenChannel, error) { - clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) - pendingResponse, err := client.PendingChannels(clientCtx, &lnrpc.PendingChannelsRequest{}) - if err != nil { - return nil, err - } - var pendingChannels []*lnrpc.PendingChannelsResponse_PendingOpenChannel - for _, p := range pendingResponse.PendingOpenChannels { - if p.Channel.RemoteNodePub == nodeID { - pendingChannels = append(pendingChannels, p) - } - } - return pendingChannels, nil -} - func main() { if len(os.Args) > 1 && os.Args[1] == "genkey" { p, err := btcec.NewPrivateKey() @@ -363,25 +282,9 @@ func main() { } } - // Creds file to connect to LND gRPC - cp := x509.NewCertPool() - if !cp.AppendCertsFromPEM([]byte(strings.Replace(os.Getenv("LND_CERT"), "\\n", "\n", -1))) { - log.Fatalf("credentials: failed to append certificates") - } - creds := credentials.NewClientTLSFromCert(cp, "") + client = NewLndClient() - // Address of an LND instance - conn, err := grpc.Dial(os.Getenv("LND_ADDRESS"), grpc.WithTransportCredentials(creds)) - if err != nil { - log.Fatalf("Failed to connect to LND gRPC: %v", err) - } - defer conn.Close() - client = lnrpc.NewLightningClient(conn) - routerClient = routerrpc.NewRouterClient(conn) - chainNotifierClient = chainrpc.NewChainNotifierClient(conn) - - clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) - info, err := client.GetInfo(clientCtx, &lnrpc.GetInfoRequest{}) + info, err := client.GetInfo() if err != nil { log.Fatalf("client.GetInfo() error: %v", err) } @@ -389,13 +292,13 @@ func main() { nodeName = info.Alias } if nodePubkey == "" { - nodePubkey = info.IdentityPubkey + nodePubkey = info.Pubkey } - go intercept() + go intercept(client) - go forwardingHistorySynchronize() - go channelsSynchronize(chainNotifierClient) + go forwardingHistorySynchronize(client) + go channelsSynchronize(client) s := grpc.NewServer( grpc_middleware.WithUnaryServerChain(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {