Synchronize channels to DB after each block and check channels from DB

This commit is contained in:
Yaacov Akiba Slama
2021-05-24 12:24:54 +03:00
parent 133c87ec27
commit ad31aa8921
4 changed files with 74 additions and 12 deletions

34
db.go
View File

@@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"encoding/hex"
"fmt" "fmt"
"log" "log"
"os" "os"
@@ -10,6 +11,7 @@ import (
"github.com/jackc/pgtype" "github.com/jackc/pgtype"
"github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool" "github.com/jackc/pgx/v4/pgxpool"
"github.com/lightningnetwork/lnd/lnwire"
) )
var ( var (
@@ -86,6 +88,38 @@ func insertChannel(chanID uint64, channelPoint string, nodeID []byte, lastUpdate
return nil return nil
} }
func confirmedChannels(sNodeID string) (map[string]uint64, error) {
nodeID, err := hex.DecodeString(sNodeID)
if err != nil {
return nil, fmt.Errorf("hex.DecodeString(%v) error: %w", sNodeID, err)
}
rows, err := pgxPool.Query(context.Background(),
`SELECT chanid, channel_point
FROM channels
WHERE nodeid=$1`,
nodeID)
if err != nil {
return nil, fmt.Errorf("channels(%x) error: %w", nodeID, err)
}
defer rows.Close()
chans := make(map[string]uint64)
for rows.Next() {
var (
chanID uint64
channelPoint string
)
err = rows.Scan(&chanID, &channelPoint)
if err != nil {
return nil, fmt.Errorf("channels(%x) rows.Scan error: %w", nodeID, err)
}
sid := lnwire.NewShortChanIDFromInt(chanID)
if !sid.IsFake() {
chans[channelPoint] = chanID
}
}
return chans, rows.Err()
}
func lastForwardingEvent() (int64, error) { func lastForwardingEvent() (int64, error) {
var last int64 var last int64
err := pgxPool.QueryRow(context.Background(), err := pgxPool.QueryRow(context.Background(),

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/chainrpc"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
@@ -36,11 +37,31 @@ func (cfe *copyFromEvents) Err() error {
return cfe.err return cfe.err
} }
func channelsSynchronize() { func channelsSynchronize(client chainrpc.ChainNotifierClient) {
lastSync := time.Now().Add(-6 * time.Minute)
for { for {
err := channelsSynchronizeOnce() cancellableCtx, cancel := context.WithCancel(context.Background())
log.Printf("channelsSynchronizeOnce() err: %v", err) clientCtx := metadata.AppendToOutgoingContext(cancellableCtx, "macaroon", os.Getenv("LND_MACAROON_HEX"))
time.Sleep(1 * time.Hour) stream, err := client.RegisterBlockEpochNtfn(clientCtx, &chainrpc.BlockEpoch{})
if err != nil {
log.Printf("chainNotifierClient.RegisterBlockEpochNtfn(): %v", err)
cancel()
}
for {
_, err := stream.Recv()
if err != nil {
log.Printf("stream.Recv: %v", err)
break
}
if lastSync.Add(5 * time.Minute).Before(time.Now()) {
time.Sleep(30 * time.Second)
err = channelsSynchronizeOnce()
lastSync = time.Now()
log.Printf("channelsSynchronizeOnce() err: %v", err)
}
}
cancel()
} }
} }

1
go.mod
View File

@@ -17,6 +17,7 @@ require (
github.com/lightningnetwork/lnd v0.11.0-beta github.com/lightningnetwork/lnd v0.11.0-beta
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
google.golang.org/grpc v1.29.1 google.golang.org/grpc v1.29.1
google.golang.org/protobuf v1.23.0
) )
replace github.com/lightningnetwork/lnd v0.11.0-beta => github.com/breez/lnd v0.11.0-beta.rc4.0.20210125150416-0c10146b223c replace github.com/lightningnetwork/lnd v0.11.0-beta => github.com/breez/lnd v0.11.0-beta.rc4.0.20210125150416-0c10146b223c

View File

@@ -19,6 +19,7 @@ import (
"github.com/caddyserver/certmagic" "github.com/caddyserver/certmagic"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/chainrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/sync/singleflight" "golang.org/x/sync/singleflight"
@@ -46,6 +47,7 @@ type server struct{}
var ( var (
client lnrpc.LightningClient client lnrpc.LightningClient
routerClient routerrpc.RouterClient routerClient routerrpc.RouterClient
chainNotifierClient chainrpc.ChainNotifierClient
openChannelReqGroup singleflight.Group openChannelReqGroup singleflight.Group
privateKey *btcec.PrivateKey privateKey *btcec.PrivateKey
publicKey *btcec.PublicKey publicKey *btcec.PublicKey
@@ -229,16 +231,16 @@ func (s *server) CheckChannels(ctx context.Context, in *lspdrpc.Encrypted) (*lsp
func getNotFakeChannels(nodeID string, channelPoints map[string]uint64) (map[string]uint64, error) { func getNotFakeChannels(nodeID string, channelPoints map[string]uint64) (map[string]uint64, error) {
r := make(map[string]uint64) r := make(map[string]uint64)
channels, err := getNodeChannels(nodeID) if len(channelPoints) == 0 {
return r, nil
}
channels, err := confirmedChannels(nodeID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, c := range channels { for channelPoint, chanID := range channels {
if _, ok := channelPoints[c.ChannelPoint]; ok { if _, ok := channelPoints[channelPoint]; ok {
sid := lnwire.NewShortChanIDFromInt(c.ChanId) r[channelPoint] = chanID
if !sid.IsFake() {
r[c.ChannelPoint] = c.ChanId
}
} }
} }
return r, nil return r, nil
@@ -246,6 +248,9 @@ func getNotFakeChannels(nodeID string, channelPoints map[string]uint64) (map[str
func getClosedChannels(nodeID string, channelPoints map[string]uint64) (map[string]uint64, error) { func getClosedChannels(nodeID string, channelPoints map[string]uint64) (map[string]uint64, error) {
r := make(map[string]uint64) r := make(map[string]uint64)
if len(channelPoints) == 0 {
return r, nil
}
waitingCloseChannels, err := getWaitingCloseChannels(nodeID) waitingCloseChannels, err := getWaitingCloseChannels(nodeID)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -363,6 +368,7 @@ func main() {
defer conn.Close() defer conn.Close()
client = lnrpc.NewLightningClient(conn) client = lnrpc.NewLightningClient(conn)
routerClient = routerrpc.NewRouterClient(conn) routerClient = routerrpc.NewRouterClient(conn)
chainNotifierClient = chainrpc.NewChainNotifierClient(conn)
clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX"))
info, err := client.GetInfo(clientCtx, &lnrpc.GetInfoRequest{}) info, err := client.GetInfo(clientCtx, &lnrpc.GetInfoRequest{})
@@ -379,7 +385,7 @@ func main() {
go intercept() go intercept()
go forwardingHistorySynchronize() go forwardingHistorySynchronize()
go channelsSynchronize() go channelsSynchronize(chainNotifierClient)
s := grpc.NewServer( s := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { grpc_middleware.WithUnaryServerChain(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {