Add forwarding history sync from lnd to db

This commit is contained in:
Yaacov Akiba Slama
2021-02-05 07:11:53 +02:00
parent 22b2b365ed
commit aca8c4f2c0
9 changed files with 173 additions and 1 deletions

62
db.go
View File

@@ -69,3 +69,65 @@ func registerPayment(destination, paymentHash, paymentSecret []byte, incomingAmo
}
return nil
}
func insertChannel(chanID uint64, channelPoint string, nodeID []byte) error {
_, err := pgxPool.Exec(context.Background(),
`INSERT INTO
channels (chanis, channel_point, nodeid)
VALUES ($1, $2, $3)`,
chanID, channelPoint, nodeID)
if err != nil {
return fmt.Errorf("insertChannel(%v, %s, %x) error: %w",
chanID, channelPoint, nodeID, err)
}
return nil
}
func lastForwardingEvent() (int64, error) {
var last int64
err := pgxPool.QueryRow(context.Background(),
`SELECT coalesce(MAX("timestamp"), 0) AS last FROM forwarding_history`).Scan(&last)
if err != nil {
return 0, err
}
return last, nil
}
func insertForwardingEvents(rowSrc pgx.CopyFromSource) error {
tx, err := pgxPool.Begin(context.Background())
if err != nil {
return fmt.Errorf("pgxPool.Begin() error: %w", err)
}
defer tx.Rollback(context.Background())
_, err = tx.Exec(context.Background(), `
CREATE TEMP TABLE tmp_table ON COMMIT DROP AS
SELECT *
FROM forwarding_history
WITH NO DATA;
`)
if err != nil {
return fmt.Errorf("CREATE TEMP TABLE error: %w", err)
}
count, err := tx.CopyFrom(context.Background(),
pgx.Identifier{"tmp_table"},
[]string{"timestamp", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, rowSrc)
if err != nil {
return fmt.Errorf("CopyFrom() error: %w", err)
}
log.Printf("count1: %v", count)
cmdTag, err := tx.Exec(context.Background(), `
INSERT INTO forwarding_history
SELECT *
FROM tmp_table
ON CONFLICT DO NOTHING
`)
if err != nil {
return fmt.Errorf("INSERT INTO forwarding_history error: %w", err)
}
log.Printf("count2: %v", cmdTag.RowsAffected())
return tx.Commit(context.Background())
}

85
forwarding_history.go Normal file
View File

@@ -0,0 +1,85 @@
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/lightningnetwork/lnd/lnrpc"
"google.golang.org/grpc/metadata"
)
type copyFromEvents struct {
events []*lnrpc.ForwardingEvent
idx int
err error
}
func (cfe *copyFromEvents) Next() bool {
cfe.idx++
return cfe.idx < len(cfe.events)
}
func (cfe *copyFromEvents) Values() ([]interface{}, error) {
event := cfe.events[cfe.idx]
values := []interface{}{
event.TimestampNs,
event.ChanIdIn, event.ChanIdOut,
event.AmtInMsat, event.AmtOutMsat}
return values, nil
}
func (cfe *copyFromEvents) Err() error {
return cfe.err
}
func forwardingHistorySynchronize() {
for {
err := forwardingHistorySynchronizeOnce()
log.Printf("forwardingHistorySynchronizeOnce() err: %v", err)
time.Sleep(1 * time.Minute)
}
}
func forwardingHistorySynchronizeOnce() error {
last, err := lastForwardingEvent()
if err != nil {
return fmt.Errorf("lastForwardingEvent() error: %w", err)
}
log.Printf("last1: %v", last)
last = last/1_000_000_000 - 1*3600
if last <= 0 {
last = 1
}
log.Printf("last2: %v", last)
now := time.Now()
endTime := uint64(now.Add(time.Hour * 24).Unix())
clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX"))
indexOffset := uint32(0)
for {
forwardHistory, err := client.ForwardingHistory(clientCtx, &lnrpc.ForwardingHistoryRequest{
StartTime: uint64(last),
EndTime: endTime,
NumMaxEvents: 10000,
IndexOffset: indexOffset,
})
if err != nil {
log.Printf("ForwardingHistory error: %v", err)
return fmt.Errorf("client.ForwardingHistory() error: %w", err)
}
log.Printf("Offset: %v, Events: %v", indexOffset, len(forwardHistory.ForwardingEvents))
if len(forwardHistory.ForwardingEvents) == 0 {
break
}
indexOffset = forwardHistory.LastOffsetIndex
cfe := copyFromEvents{events: forwardHistory.ForwardingEvents, idx: -1}
err = insertForwardingEvents(&cfe)
if err != nil {
log.Printf("insertForwardingEvents() error: %v", err)
return fmt.Errorf("insertForwardingEvents() error: %w", err)
}
}
return nil
}

2
go.mod
View File

@@ -16,4 +16,4 @@ require (
google.golang.org/grpc v1.31.0
)
replace github.com/lightningnetwork/lnd v0.11.0-beta => github.com/breez/lnd v0.11.0-beta.rc4.0.20201101122458-227226f00b18
replace github.com/lightningnetwork/lnd v0.11.0-beta => github.com/breez/lnd v0.11.0-beta.rc4.0.20210125150416-0c10146b223c

View File

@@ -240,6 +240,10 @@ func resumeOrCancel(
OutgoingRequestedChanId: chanID,
OnionBlob: onionBlob,
})
err := insertChannel(chanID, channelPoint, destination)
if err != nil {
log.Printf("insertChannel error: %v", err)
}
return
}
log.Printf("getChannel(%x, %v) returns 0", destination, channelPoint)

View File

@@ -0,0 +1 @@
DROP TABLE public.forwarding_history;

View File

@@ -0,0 +1,10 @@
CREATE TABLE public.forwarding_history (
"timestamp" bigint NOT NULL,
chanid_in bigint NOT NULL,
chanid_out bigint NOT NULL,
amt_msat_in bigint NOT NULL,
amt_msat_out bigint NOT NULL,
CONSTRAINT timestamp_pkey PRIMARY KEY ("timestamp")
);
CREATE INDEX forwarding_history_chanid_in_idx ON public.forwarding_history (chanid_in);
CREATE INDEX forwarding_history_chanid_out_idx ON public.forwarding_history (chanid_out);

View File

@@ -0,0 +1 @@
DROP TABLE public.channels;

View File

@@ -0,0 +1,7 @@
CREATE TABLE public.channels (
chanid bigint NOT NULL,
channel_point varchar NULL,
nodeid bytea NULL,
CONSTRAINT chanid_pkey PRIMARY KEY (chanid)
);
CREATE INDEX channels_nodeid_idx ON public.channels (nodeid);

View File

@@ -376,6 +376,8 @@ func main() {
go intercept()
go forwardingHistorySynchronize()
s := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {