From 8ca6ca87fdbfee3d60628df4ca0ed08877ad8895 Mon Sep 17 00:00:00 2001 From: Yaacov Akiba Slama Date: Fri, 19 Mar 2021 13:47:36 +0200 Subject: [PATCH] Synchronize private channels from lnd to the channels table every hour --- db.go | 3 ++- forwarding_history.go | 35 +++++++++++++++++++++++++++++++++++ server.go | 1 + 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/db.go b/db.go index f28f883..a2aeec5 100644 --- a/db.go +++ b/db.go @@ -74,7 +74,8 @@ func insertChannel(chanID uint64, channelPoint string, nodeID []byte) error { _, err := pgxPool.Exec(context.Background(), `INSERT INTO channels (chanid, channel_point, nodeid) - VALUES ($1, $2, $3)`, + VALUES ($1, $2, $3) + ON CONFLICT (chanid) DO NOTHING`, chanID, channelPoint, nodeID) if err != nil { return fmt.Errorf("insertChannel(%v, %s, %x) error: %w", diff --git a/forwarding_history.go b/forwarding_history.go index 5ee298f..48bc35d 100644 --- a/forwarding_history.go +++ b/forwarding_history.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/hex" "fmt" "log" "os" @@ -35,6 +36,40 @@ func (cfe *copyFromEvents) Err() error { return cfe.err } +func channelsSynchronize() { + for { + err := channelsSynchronizeOnce() + log.Printf("channelsSynchronizeOnce() err: %v", err) + time.Sleep(1 * time.Hour) + } +} + +func channelsSynchronizeOnce() error { + log.Printf("channelsSynchronizeOnce - begin") + clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) + channels, err := client.ListChannels(clientCtx, &lnrpc.ListChannelsRequest{PrivateOnly: true}) + if err != nil { + log.Printf("ListChannels error: %v", err) + return fmt.Errorf("client.ListChannels() error: %w", err) + } + log.Printf("channelsSynchronizeOnce - received channels") + for _, c := range channels.Channels { + nodeID, err := hex.DecodeString(c.RemotePubkey) + if err != nil { + log.Printf("hex.DecodeString in channelsSynchronizeOnce error: %v", err) + continue + } + err = insertChannel(c.ChanId, c.ChannelPoint, nodeID) + if err != nil { + log.Printf("insertChannel(%v, %v, %x) in channelsSynchronizeOnce error: %v", c.ChanId, c.ChannelPoint, nodeID, err) + continue + } + } + log.Printf("channelsSynchronizeOnce - done") + + return nil +} + func forwardingHistorySynchronize() { for { err := forwardingHistorySynchronizeOnce() diff --git a/server.go b/server.go index 89b01ee..ec6cba0 100644 --- a/server.go +++ b/server.go @@ -379,6 +379,7 @@ func main() { go intercept() go forwardingHistorySynchronize() + go channelsSynchronize() s := grpc.NewServer( grpc_middleware.WithUnaryServerChain(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {