testing pipeline

This commit is contained in:
callebtc
2022-07-10 19:51:13 +02:00
parent 9781343298
commit 4bddb90b32
6 changed files with 377 additions and 130 deletions

View File

@@ -10,8 +10,8 @@ import (
log "github.com/sirupsen/logrus"
)
// dispatchChannelAcceptor is the channel acceptor event loop
func (app *app) dispatchChannelAcceptor(ctx context.Context) {
// DispatchChannelAcceptor is the channel acceptor event loop
func (app *App) DispatchChannelAcceptor(ctx context.Context) {
// the channel event logger
go func() {
err := app.logChannelEvents(ctx)
@@ -36,9 +36,9 @@ func (app *app) dispatchChannelAcceptor(ctx context.Context) {
}
func (app *app) interceptChannelEvents(ctx context.Context) error {
func (app *App) interceptChannelEvents(ctx context.Context) error {
// get the lnd grpc connection
acceptClient, err := app.client.ChannelAcceptor(ctx)
acceptClient, err := app.lnd.channelAcceptor(ctx)
if err != nil {
panic(err)
}
@@ -50,7 +50,7 @@ func (app *app) interceptChannelEvents(ctx context.Context) error {
}
// print the incoming channel request
alias, err := app.getNodeAlias(ctx, hex.EncodeToString(req.NodePubkey))
alias, err := app.lnd.getNodeAlias(ctx, hex.EncodeToString(req.NodePubkey))
if err != nil {
log.Errorf(err.Error())
}
@@ -62,7 +62,7 @@ func (app *app) interceptChannelEvents(ctx context.Context) error {
}
log.Debugf("[channel] New channel request from %s", node_info_string)
info, err := app.getNodeInfo(ctx, hex.EncodeToString(req.NodePubkey))
info, err := app.lnd.getNodeInfo(ctx, hex.EncodeToString(req.NodePubkey))
if err != nil {
log.Errorf(err.Error())
}
@@ -129,8 +129,8 @@ func (app *app) interceptChannelEvents(ctx context.Context) error {
}
func (app *app) logChannelEvents(ctx context.Context) error {
stream, err := app.client.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{})
func (app *App) logChannelEvents(ctx context.Context) error {
stream, err := app.lnd.subscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{})
if err != nil {
return err
}
@@ -141,7 +141,7 @@ func (app *app) logChannelEvents(ctx context.Context) error {
}
switch event.Type {
case lnrpc.ChannelEventUpdate_OPEN_CHANNEL:
alias, err := app.getNodeAlias(ctx, event.GetOpenChannel().RemotePubkey)
alias, err := app.lnd.getNodeAlias(ctx, event.GetOpenChannel().RemotePubkey)
if err != nil {
log.Errorf(err.Error())
alias = trimPubKey([]byte(event.GetOpenChannel().RemotePubkey))
@@ -152,6 +152,6 @@ func (app *app) logChannelEvents(ctx context.Context) error {
)
log.Infof("[channel] Opened channel %s %s", parse_channelID(event.GetOpenChannel().ChanId), channel_info_string)
}
// log.Debugf("[channel] Event: %s", event.String())
log.Tracef("[channel] Event: %s", event.String())
}
}

1
go.mod
View File

@@ -6,6 +6,7 @@ require (
github.com/jinzhu/configor v1.2.1
github.com/lightningnetwork/lnd v0.14.3-beta
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
google.golang.org/grpc v1.46.2
gopkg.in/macaroon.v2 v2.1.0
)

View File

@@ -1,17 +1,12 @@
package main
import (
"context"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"math/big"
"strconv"
"time"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/routing/route"
log "github.com/sirupsen/logrus"
)
@@ -58,76 +53,75 @@ func parse_channelID(e uint64) string {
return fmt.Sprintf("%dx%dx%d", int_block3, int_block2, int_block1)
}
// Heavily inspired by by Joost Jager's circuitbreaker
func (app *app) getNodeInfo(ctx context.Context, pubkey string) (nodeInfo *lnrpc.NodeInfo, err error) {
client := app.client
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// These functions are inspired by by Joost Jager's circuitbreaker
info, err := client.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{
PubKey: pubkey,
})
if err != nil {
return &lnrpc.NodeInfo{}, err
}
return info, nil
}
// // getNodeInfo returns the information of a node given a pubKey
// func (app *App) getNodeInfo(ctx context.Context, pubkey string) (nodeInfo *lnrpc.NodeInfo, err error) {
// ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// defer cancel()
// getNodeAlias returns the alias of a node pubkey
func (app *app) getNodeAlias(ctx context.Context, pubkey string) (string, error) {
info, err := app.getNodeInfo(ctx, pubkey)
if err != nil {
return "", err
}
// info, err := app.client.GetNodeInfo(ctx, &lnrpc.NodeInfoRequest{
// PubKey: pubkey,
// })
// if err != nil {
// return &lnrpc.NodeInfo{}, err
// }
// return info, nil
// }
if info.Node == nil {
return "", errors.New("node info not available")
}
return info.Node.Alias, nil
}
// // getNodeAlias returns the alias of a node pubkey
// func (app *App) getNodeAlias(ctx context.Context, pubkey string) (string, error) {
// info, err := app.getNodeInfo(ctx, pubkey)
// if err != nil {
// return "", err
// }
// getMyPubkey returns the pubkey of my own node
func (app *app) getMyInfo(ctx context.Context) (*lnrpc.GetInfoResponse, error) {
client := app.client
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// if info.Node == nil {
// return "", errors.New("node info not available")
// }
// return info.Node.Alias, nil
// }
info, err := client.GetInfo(ctx, &lnrpc.GetInfoRequest{})
if err != nil {
return &lnrpc.GetInfoResponse{}, err
}
return info, nil
}
// // getMyPubkey returns the pubkey of my own node
// func (app *App) getMyInfo(ctx context.Context) (*lnrpc.GetInfoResponse, error) {
// ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// defer cancel()
type channelEdge struct {
node1Pub, node2Pub route.Vertex
}
// info, err := app.client.GetInfo(ctx, &lnrpc.GetInfoRequest{})
// if err != nil {
// return &lnrpc.GetInfoResponse{}, err
// }
// return info, nil
// }
// getPubKeyFromChannel returns the pubkey of the remote node in a channel
func (app *app) getPubKeyFromChannel(ctx context.Context, chan_id uint64) (*channelEdge, error) {
client := app.client
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// type channelEdge struct {
// node1Pub, node2Pub route.Vertex
// }
info, err := client.GetChanInfo(ctx, &lnrpc.ChanInfoRequest{
ChanId: chan_id,
})
if err != nil {
return nil, err
}
// // getPubKeyFromChannel returns the pubkey of the remote node in a channel
// func (app *App) getPubKeyFromChannel(ctx context.Context, chan_id uint64) (*channelEdge, error) {
// ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// defer cancel()
node1Pub, err := route.NewVertexFromStr(info.Node1Pub)
if err != nil {
return nil, err
}
// info, err := app.client.GetChanInfo(ctx, &lnrpc.ChanInfoRequest{
// ChanId: chan_id,
// })
// if err != nil {
// return nil, err
// }
node2Pub, err := route.NewVertexFromStr(info.Node2Pub)
if err != nil {
return nil, err
}
// node1Pub, err := route.NewVertexFromStr(info.Node1Pub)
// if err != nil {
// return nil, err
// }
return &channelEdge{
node1Pub: node1Pub,
node2Pub: node2Pub,
}, nil
}
// node2Pub, err := route.NewVertexFromStr(info.Node2Pub)
// if err != nil {
// return nil, err
// }
// return &channelEdge{
// node1Pub: node1Pub,
// node2Pub: node2Pub,
// }, nil
// }

View File

@@ -10,10 +10,8 @@ import (
log "github.com/sirupsen/logrus"
)
// dispatchHTLCAcceptor is the HTLC acceptor event loop
func (app *app) dispatchHTLCAcceptor(ctx context.Context) {
app.router = routerrpc.NewRouterClient(app.conn)
// DispatchHTLCAcceptor is the HTLC acceptor event loop
func (app *App) DispatchHTLCAcceptor(ctx context.Context) {
go func() {
err := app.logHtlcEvents(ctx)
if err != nil {
@@ -36,9 +34,9 @@ func (app *app) dispatchHTLCAcceptor(ctx context.Context) {
}
// interceptHtlcEvents intercepts incoming htlc events
func (app *app) interceptHtlcEvents(ctx context.Context) error {
func (app *App) interceptHtlcEvents(ctx context.Context) error {
// interceptor, decide whether to accept or reject
interceptor, err := app.router.HtlcInterceptor(ctx)
interceptor, err := app.lnd.htlcInterceptor(ctx)
if err != nil {
return err
}
@@ -52,35 +50,35 @@ func (app *app) interceptHtlcEvents(ctx context.Context) error {
decision_chan := make(chan bool, 1)
go app.htlcInterceptDecision(ctx, event, decision_chan)
channelEdge, err := app.getPubKeyFromChannel(ctx, event.IncomingCircuitKey.ChanId)
channelEdge, err := app.lnd.getPubKeyFromChannel(ctx, event.IncomingCircuitKey.ChanId)
if err != nil {
log.Error("[forward] Error getting pubkey for channel %s", parse_channelID(event.IncomingCircuitKey.ChanId))
log.Errorf("[forward] Error getting pubkey for channel %s", parse_channelID(event.IncomingCircuitKey.ChanId))
}
var pubkeyFrom, aliasFrom, pubkeyTo, aliasTo string
if channelEdge.node1Pub.String() != app.myPubkey {
pubkeyFrom = channelEdge.node1Pub.String()
if channelEdge.Node1Pub != app.myInfo.IdentityPubkey {
pubkeyFrom = channelEdge.Node1Pub
} else {
pubkeyFrom = channelEdge.node2Pub.String()
pubkeyFrom = channelEdge.Node2Pub
}
aliasFrom, err = app.getNodeAlias(ctx, pubkeyFrom)
aliasFrom, err = app.lnd.getNodeAlias(ctx, pubkeyFrom)
if err != nil {
aliasFrom = trimPubKey([]byte(pubkeyFrom))
log.Error("[forward] Error getting alias for node %s", aliasFrom)
log.Errorf("[forward] Error getting alias for node %s", aliasFrom)
}
channelEdgeTo, err := app.getPubKeyFromChannel(ctx, event.OutgoingRequestedChanId)
channelEdgeTo, err := app.lnd.getPubKeyFromChannel(ctx, event.OutgoingRequestedChanId)
if err != nil {
log.Error("[forward] Error getting pubkey for channel %s", parse_channelID(event.OutgoingRequestedChanId))
log.Errorf("[forward] Error getting pubkey for channel %s", parse_channelID(event.OutgoingRequestedChanId))
}
if channelEdgeTo.node1Pub.String() != app.myPubkey {
pubkeyTo = channelEdgeTo.node1Pub.String()
if channelEdgeTo.Node1Pub != app.myInfo.IdentityPubkey {
pubkeyTo = channelEdgeTo.Node1Pub
} else {
pubkeyTo = channelEdgeTo.node2Pub.String()
pubkeyTo = channelEdgeTo.Node2Pub
}
aliasTo, err = app.getNodeAlias(ctx, pubkeyTo)
aliasTo, err = app.lnd.getNodeAlias(ctx, pubkeyTo)
if err != nil {
aliasTo = trimPubKey([]byte(pubkeyTo))
log.Error("[forward] Error getting alias for node %s", aliasTo)
log.Errorf("[forward] Error getting alias for node %s", aliasTo)
}
forward_info_string := fmt.Sprintf(
@@ -118,7 +116,7 @@ func (app *app) interceptHtlcEvents(ctx context.Context) error {
// 1. Either use a allowlist or a denylist.
// 2. If a single channel ID is used (12320768x65536x0), check the incoming ID of the HTLC against the list.
// 3. If two channel IDs are used (7929856x65537x0->7143424x65537x0), check the incoming ID and the outgoing ID of the HTLC against the list.
func (app *app) htlcInterceptDecision(ctx context.Context, event *routerrpc.ForwardHtlcInterceptRequest, decision_chan chan bool) {
func (app *App) htlcInterceptDecision(ctx context.Context, event *routerrpc.ForwardHtlcInterceptRequest, decision_chan chan bool) {
var accept bool
var listToParse []string
@@ -158,9 +156,9 @@ func (app *app) htlcInterceptDecision(ctx context.Context, event *routerrpc.Forw
}
// logHtlcEvents reports on incoming htlc events
func (app *app) logHtlcEvents(ctx context.Context) error {
func (app *App) logHtlcEvents(ctx context.Context) error {
// htlc event subscriber, reports on incoming htlc events
stream, err := app.router.SubscribeHtlcEvents(ctx, &routerrpc.SubscribeHtlcEventsRequest{})
stream, err := app.lnd.subscribeHtlcEvents(ctx, &routerrpc.SubscribeHtlcEventsRequest{})
if err != nil {
return err
}

66
main.go
View File

@@ -21,12 +21,20 @@ const (
ctxKeyWaitGroup key = iota
)
type app struct {
client lnrpc.LightningClient
conn *grpc.ClientConn
router routerrpc.RouterClient
myInfo *lnrpc.GetInfoResponse
myPubkey string
type App struct {
lnd lndclient
myInfo *lnrpc.GetInfoResponse
}
func NewApp(ctx context.Context, lnd lndclient) *App {
myInfo, err := lnd.getMyInfo(ctx)
if err != nil {
log.Errorf("Could not get my node info: %s", err)
}
return &App{
lnd: lnd,
myInfo: myInfo,
}
}
// gets the lnd grpc connection
@@ -57,41 +65,39 @@ func getClientConnection(ctx context.Context) (*grpc.ClientConn, error) {
if err != nil {
return nil, err
}
return conn, nil
}
func newLndClient(ctx context.Context) (*LndClient, error) {
conn, err := getClientConnection(ctx)
if err != nil {
log.Errorf("Connection failed: %s", err)
return &LndClient{}, err
}
client := lnrpc.NewLightningClient(conn)
router := routerrpc.NewRouterClient(conn)
return &LndClient{
client: client,
conn: conn,
router: router,
}, nil
}
func main() {
ctx := context.Background()
for {
conn, err := getClientConnection(ctx)
lnd, err := newLndClient(ctx)
if err != nil {
log.Errorf("Could not connect to lnd: %s", err)
log.Errorf("Failed to create lnd client: %s", err)
return
}
client := lnrpc.NewLightningClient(conn)
app := app{
client: client,
conn: conn,
}
app := NewApp(ctx, lnd)
myInfo, err := app.getMyInfo(ctx)
if err != nil {
log.Errorf("Could not get my node info: %s", err)
continue
if len(app.myInfo.Alias) > 0 {
log.Infof("Connected to %s (%s)", app.myInfo.Alias, trimPubKey([]byte(app.myInfo.IdentityPubkey)))
} else {
app.myInfo = myInfo
app.myPubkey = myInfo.IdentityPubkey
}
myAlias := app.myInfo.Alias
if len(myAlias) > 0 {
log.Infof("Connected to %s (%s)", myAlias, trimPubKey([]byte(app.myPubkey)))
} else {
log.Infof("Connected to %s", app.myPubkey)
log.Infof("Connected to %s", app.myInfo.IdentityPubkey)
}
var wg sync.WaitGroup
@@ -99,10 +105,10 @@ func main() {
wg.Add(2)
// channel acceptor
app.dispatchChannelAcceptor(ctx)
app.DispatchChannelAcceptor(ctx)
// htlc acceptor
app.dispatchHTLCAcceptor(ctx)
app.DispatchHTLCAcceptor(ctx)
wg.Wait()
log.Info("All routines stopped. Waiting for new connection.")

248
main_test.go Normal file
View File

@@ -0,0 +1,248 @@
package main
import (
"context"
"errors"
"testing"
"time"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/stretchr/testify/require"
)
type lndclientMock struct {
htlcEvents chan *routerrpc.HtlcEvent
htlcInterceptorRequests chan *routerrpc.ForwardHtlcInterceptRequest
htlcInterceptorResponses chan *routerrpc.ForwardHtlcInterceptResponse
channelEvents chan *lnrpc.ChannelEventUpdate
channelAcceptorRequests chan *lnrpc.ChannelAcceptRequest
channelAcceptorResponse chan *lnrpc.ChannelAcceptResponse
}
func newLndclientMock() *lndclientMock {
return &lndclientMock{
htlcEvents: make(chan *routerrpc.HtlcEvent),
htlcInterceptorRequests: make(chan *routerrpc.ForwardHtlcInterceptRequest),
htlcInterceptorResponses: make(chan *routerrpc.ForwardHtlcInterceptResponse),
channelAcceptorRequests: make(chan *lnrpc.ChannelAcceptRequest),
channelAcceptorResponse: make(chan *lnrpc.ChannelAcceptResponse),
}
}
// --------------- Channel events mocks ---------------
type channelAcceptorMock struct {
lnrpc.Lightning_ChannelAcceptorClient
channelAcceptorRequests chan *lnrpc.ChannelAcceptRequest
channelAcceptorResponse chan *lnrpc.ChannelAcceptResponse
}
func (lnd *lndclientMock) channelAcceptor(ctx context.Context) (
lnrpc.Lightning_ChannelAcceptorClient, error) {
return &channelAcceptorMock{
channelAcceptorRequests: lnd.channelAcceptorRequests,
channelAcceptorResponse: lnd.channelAcceptorResponse,
}, nil
}
func (c *channelAcceptorMock) RecvMsg(m interface{}) error {
req := m.(*lnrpc.ChannelAcceptRequest)
c.channelAcceptorRequests <- req
return nil
}
type channelEventsMock struct {
lnrpc.Lightning_SubscribeChannelEventsClient
channelEvents chan *lnrpc.ChannelEventUpdate
}
func (h *channelEventsMock) Recv() (*lnrpc.ChannelEventUpdate, error) {
event := <-h.channelEvents
return event, nil
}
func (l *lndclientMock) subscribeChannelEvents(ctx context.Context,
in *lnrpc.ChannelEventSubscription) (
lnrpc.Lightning_SubscribeChannelEventsClient, error) {
return &channelEventsMock{
channelEvents: l.channelEvents,
}, nil
}
// --------------- Node info mocks ---------------
// getNodeInfo returns the information of a node given a pubKey
func (lnd *lndclientMock) getNodeInfo(ctx context.Context, pubkey string) (
nodeInfo *lnrpc.NodeInfo, err error) {
info := &lnrpc.NodeInfo{
Node: &lnrpc.LightningNode{
Alias: "alias-" + trimPubKey([]byte(pubkey)),
},
NumChannels: 2,
TotalCapacity: 1234,
}
return info, nil
}
// getNodeAlias returns the alias of a node pubkey
func (lnd *lndclientMock) getNodeAlias(ctx context.Context, pubkey string) (
string, error) {
info, err := lnd.getNodeInfo(ctx, pubkey)
if err != nil {
return "", err
}
if info.Node == nil {
return "", errors.New("node info not available")
}
return info.Node.Alias, nil
}
func (lnd *lndclientMock) getMyInfo(ctx context.Context) (
*lnrpc.GetInfoResponse, error) {
info := &lnrpc.GetInfoResponse{
IdentityPubkey: "my-pubkey-is-very-long-for-trimming-pubkey",
Alias: "my-alias",
}
return info, nil
}
func (lnd *lndclientMock) getPubKeyFromChannel(ctx context.Context, chan_id uint64) (
*lnrpc.ChannelEdge, error) {
return &lnrpc.ChannelEdge{
Node1Pub: "my-pubkey-is-very-long-for-trimming-pubkey",
Node2Pub: "other-pubkey-is-very-long-for-trimming-pubkey",
}, nil
}
// --------------- HTLC events mock ---------------
type htlcEventsMock struct {
routerrpc.Router_SubscribeHtlcEventsClient
htlcEvents chan *routerrpc.HtlcEvent
}
func (h *htlcEventsMock) Recv() (*routerrpc.HtlcEvent, error) {
event := <-h.htlcEvents
return event, nil
}
type htlcInterceptorMock struct {
routerrpc.Router_HtlcInterceptorClient
htlcInterceptorRequests chan *routerrpc.ForwardHtlcInterceptRequest
htlcInterceptorResponses chan *routerrpc.ForwardHtlcInterceptResponse
}
func (h *htlcInterceptorMock) Send(resp *routerrpc.ForwardHtlcInterceptResponse) error {
h.htlcInterceptorResponses <- resp
return nil
}
func (h *htlcInterceptorMock) Recv() (*routerrpc.ForwardHtlcInterceptRequest, error) {
event := <-h.htlcInterceptorRequests
return event, nil
}
func (l *lndclientMock) subscribeHtlcEvents(ctx context.Context,
in *routerrpc.SubscribeHtlcEventsRequest) (
routerrpc.Router_SubscribeHtlcEventsClient, error) {
return &htlcEventsMock{
htlcEvents: l.htlcEvents,
}, nil
}
func (l *lndclientMock) htlcInterceptor(ctx context.Context) (
routerrpc.Router_HtlcInterceptorClient, error) {
return &htlcInterceptorMock{
htlcInterceptorRequests: l.htlcInterceptorRequests,
htlcInterceptorResponses: l.htlcInterceptorResponses,
}, nil
}
func TestApp(t *testing.T) {
client := newLndclientMock()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
app := NewApp(ctx, client)
app.DispatchChannelAcceptor(ctx)
app.DispatchHTLCAcceptor(ctx)
time.Sleep(1 * time.Second)
cancel()
}
func TestHTLCFail(t *testing.T) {
client := newLndclientMock()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
app := NewApp(ctx, client)
app.DispatchHTLCAcceptor(ctx)
time.Sleep(1 * time.Second)
key := &routerrpc.CircuitKey{
ChanId: 770495967390531585,
HtlcId: 1337,
}
client.htlcInterceptorRequests <- &routerrpc.ForwardHtlcInterceptRequest{
IncomingCircuitKey: key,
OutgoingRequestedChanId: 759495353533530113,
}
resp := <-client.htlcInterceptorResponses
require.Equal(t, routerrpc.ResolveHoldForwardAction_FAIL, resp.Action)
client.htlcEvents <- &routerrpc.HtlcEvent{
EventType: routerrpc.HtlcEvent_FORWARD,
IncomingChannelId: key.ChanId,
IncomingHtlcId: key.HtlcId,
Event: &routerrpc.HtlcEvent_SettleEvent{},
}
}
func TestHTLCSuccess(t *testing.T) {
client := newLndclientMock()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
app := NewApp(ctx, client)
app.DispatchHTLCAcceptor(ctx)
time.Sleep(1 * time.Second)
key := &routerrpc.CircuitKey{
ChanId: 770495967390531585,
HtlcId: 1337,
}
client.htlcInterceptorRequests <- &routerrpc.ForwardHtlcInterceptRequest{
IncomingCircuitKey: key,
OutgoingRequestedChanId: 759495353533530113,
}
Configuration.ForwardMode = "allowlist"
Configuration.ForwardAllowlist = []string{"700762x1327x1->690757x1005x1"}
resp := <-client.htlcInterceptorResponses
require.Equal(t, routerrpc.ResolveHoldForwardAction_RESUME, resp.Action)
client.htlcEvents <- &routerrpc.HtlcEvent{
EventType: routerrpc.HtlcEvent_FORWARD,
IncomingChannelId: key.ChanId,
IncomingHtlcId: key.HtlcId,
Event: &routerrpc.HtlcEvent_SettleEvent{},
}
}