diff --git a/db.go b/db.go index f26f92c..acc3202 100644 --- a/db.go +++ b/db.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/hex" "fmt" "log" "os" @@ -10,6 +11,7 @@ import ( "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" + "github.com/lightningnetwork/lnd/lnwire" ) var ( @@ -86,6 +88,38 @@ func insertChannel(chanID uint64, channelPoint string, nodeID []byte, lastUpdate return nil } +func confirmedChannels(sNodeID string) (map[string]uint64, error) { + nodeID, err := hex.DecodeString(sNodeID) + if err != nil { + return nil, fmt.Errorf("hex.DecodeString(%v) error: %w", sNodeID, err) + } + rows, err := pgxPool.Query(context.Background(), + `SELECT chanid, channel_point + FROM channels + WHERE nodeid=$1`, + nodeID) + if err != nil { + return nil, fmt.Errorf("channels(%x) error: %w", nodeID, err) + } + defer rows.Close() + chans := make(map[string]uint64) + for rows.Next() { + var ( + chanID uint64 + channelPoint string + ) + err = rows.Scan(&chanID, &channelPoint) + if err != nil { + return nil, fmt.Errorf("channels(%x) rows.Scan error: %w", nodeID, err) + } + sid := lnwire.NewShortChanIDFromInt(chanID) + if !sid.IsFake() { + chans[channelPoint] = chanID + } + } + return chans, rows.Err() +} + func lastForwardingEvent() (int64, error) { var last int64 err := pgxPool.QueryRow(context.Background(), diff --git a/forwarding_history.go b/forwarding_history.go index 4310d8f..ac35725 100644 --- a/forwarding_history.go +++ b/forwarding_history.go @@ -9,6 +9,7 @@ import ( "time" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/chainrpc" "google.golang.org/grpc/metadata" ) @@ -36,11 +37,31 @@ func (cfe *copyFromEvents) Err() error { return cfe.err } -func channelsSynchronize() { +func channelsSynchronize(client chainrpc.ChainNotifierClient) { + lastSync := time.Now().Add(-6 * time.Minute) for { - err := channelsSynchronizeOnce() - log.Printf("channelsSynchronizeOnce() err: %v", err) - time.Sleep(1 * time.Hour) + cancellableCtx, cancel := context.WithCancel(context.Background()) + clientCtx := metadata.AppendToOutgoingContext(cancellableCtx, "macaroon", os.Getenv("LND_MACAROON_HEX")) + stream, err := client.RegisterBlockEpochNtfn(clientCtx, &chainrpc.BlockEpoch{}) + if err != nil { + log.Printf("chainNotifierClient.RegisterBlockEpochNtfn(): %v", err) + cancel() + } + + for { + _, err := stream.Recv() + if err != nil { + log.Printf("stream.Recv: %v", err) + break + } + if lastSync.Add(5 * time.Minute).Before(time.Now()) { + time.Sleep(30 * time.Second) + err = channelsSynchronizeOnce() + lastSync = time.Now() + log.Printf("channelsSynchronizeOnce() err: %v", err) + } + } + cancel() } } diff --git a/go.mod b/go.mod index 7573a3a..f3f10eb 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/lightningnetwork/lnd v0.11.0-beta golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e google.golang.org/grpc v1.29.1 + google.golang.org/protobuf v1.23.0 ) replace github.com/lightningnetwork/lnd v0.11.0-beta => github.com/breez/lnd v0.11.0-beta.rc4.0.20210125150416-0c10146b223c diff --git a/server.go b/server.go index ec6cba0..d3732b2 100644 --- a/server.go +++ b/server.go @@ -19,6 +19,7 @@ import ( "github.com/caddyserver/certmagic" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/chainrpc" "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lnwire" "golang.org/x/sync/singleflight" @@ -46,6 +47,7 @@ type server struct{} var ( client lnrpc.LightningClient routerClient routerrpc.RouterClient + chainNotifierClient chainrpc.ChainNotifierClient openChannelReqGroup singleflight.Group privateKey *btcec.PrivateKey publicKey *btcec.PublicKey @@ -229,16 +231,16 @@ func (s *server) CheckChannels(ctx context.Context, in *lspdrpc.Encrypted) (*lsp func getNotFakeChannels(nodeID string, channelPoints map[string]uint64) (map[string]uint64, error) { r := make(map[string]uint64) - channels, err := getNodeChannels(nodeID) + if len(channelPoints) == 0 { + return r, nil + } + channels, err := confirmedChannels(nodeID) if err != nil { return nil, err } - for _, c := range channels { - if _, ok := channelPoints[c.ChannelPoint]; ok { - sid := lnwire.NewShortChanIDFromInt(c.ChanId) - if !sid.IsFake() { - r[c.ChannelPoint] = c.ChanId - } + for channelPoint, chanID := range channels { + if _, ok := channelPoints[channelPoint]; ok { + r[channelPoint] = chanID } } return r, nil @@ -246,6 +248,9 @@ func getNotFakeChannels(nodeID string, channelPoints map[string]uint64) (map[str func getClosedChannels(nodeID string, channelPoints map[string]uint64) (map[string]uint64, error) { r := make(map[string]uint64) + if len(channelPoints) == 0 { + return r, nil + } waitingCloseChannels, err := getWaitingCloseChannels(nodeID) if err != nil { return nil, err @@ -363,6 +368,7 @@ func main() { defer conn.Close() client = lnrpc.NewLightningClient(conn) routerClient = routerrpc.NewRouterClient(conn) + chainNotifierClient = chainrpc.NewChainNotifierClient(conn) clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) info, err := client.GetInfo(clientCtx, &lnrpc.GetInfoRequest{}) @@ -379,7 +385,7 @@ func main() { go intercept() go forwardingHistorySynchronize() - go channelsSynchronize() + go channelsSynchronize(chainNotifierClient) s := grpc.NewServer( grpc_middleware.WithUnaryServerChain(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {