From aca8c4f2c0cf709a1a0829fc48e4233d3674310e Mon Sep 17 00:00:00 2001 From: Yaacov Akiba Slama Date: Fri, 5 Feb 2021 07:11:53 +0200 Subject: [PATCH] Add forwarding history sync from lnd to db --- db.go | 62 ++++++++++++++ forwarding_history.go | 85 +++++++++++++++++++ go.mod | 2 +- intercept.go | 4 + .../000005_forwarding_history.down.sql | 1 + .../000005_forwarding_history.up.sql | 10 +++ .../migrations/000006_channels.down.sql | 1 + postgresql/migrations/000006_channels.up.sql | 7 ++ server.go | 2 + 9 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 forwarding_history.go create mode 100644 postgresql/migrations/000005_forwarding_history.down.sql create mode 100644 postgresql/migrations/000005_forwarding_history.up.sql create mode 100644 postgresql/migrations/000006_channels.down.sql create mode 100644 postgresql/migrations/000006_channels.up.sql diff --git a/db.go b/db.go index eb2883c..b082f41 100644 --- a/db.go +++ b/db.go @@ -69,3 +69,65 @@ func registerPayment(destination, paymentHash, paymentSecret []byte, incomingAmo } return nil } + +func insertChannel(chanID uint64, channelPoint string, nodeID []byte) error { + _, err := pgxPool.Exec(context.Background(), + `INSERT INTO + channels (chanis, channel_point, nodeid) + VALUES ($1, $2, $3)`, + chanID, channelPoint, nodeID) + if err != nil { + return fmt.Errorf("insertChannel(%v, %s, %x) error: %w", + chanID, channelPoint, nodeID, err) + } + return nil +} + +func lastForwardingEvent() (int64, error) { + var last int64 + err := pgxPool.QueryRow(context.Background(), + `SELECT coalesce(MAX("timestamp"), 0) AS last FROM forwarding_history`).Scan(&last) + if err != nil { + return 0, err + } + return last, nil +} + +func insertForwardingEvents(rowSrc pgx.CopyFromSource) error { + + tx, err := pgxPool.Begin(context.Background()) + if err != nil { + return fmt.Errorf("pgxPool.Begin() error: %w", err) + } + defer tx.Rollback(context.Background()) + + _, err = tx.Exec(context.Background(), ` + CREATE TEMP TABLE tmp_table ON COMMIT DROP AS + SELECT * + FROM forwarding_history + WITH NO DATA; + `) + if err != nil { + return fmt.Errorf("CREATE TEMP TABLE error: %w", err) + } + + count, err := tx.CopyFrom(context.Background(), + pgx.Identifier{"tmp_table"}, + []string{"timestamp", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, rowSrc) + if err != nil { + return fmt.Errorf("CopyFrom() error: %w", err) + } + log.Printf("count1: %v", count) + + cmdTag, err := tx.Exec(context.Background(), ` + INSERT INTO forwarding_history + SELECT * + FROM tmp_table + ON CONFLICT DO NOTHING + `) + if err != nil { + return fmt.Errorf("INSERT INTO forwarding_history error: %w", err) + } + log.Printf("count2: %v", cmdTag.RowsAffected()) + return tx.Commit(context.Background()) +} diff --git a/forwarding_history.go b/forwarding_history.go new file mode 100644 index 0000000..5ee298f --- /dev/null +++ b/forwarding_history.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "github.com/lightningnetwork/lnd/lnrpc" + "google.golang.org/grpc/metadata" +) + +type copyFromEvents struct { + events []*lnrpc.ForwardingEvent + idx int + err error +} + +func (cfe *copyFromEvents) Next() bool { + cfe.idx++ + return cfe.idx < len(cfe.events) +} + +func (cfe *copyFromEvents) Values() ([]interface{}, error) { + event := cfe.events[cfe.idx] + values := []interface{}{ + event.TimestampNs, + event.ChanIdIn, event.ChanIdOut, + event.AmtInMsat, event.AmtOutMsat} + return values, nil +} + +func (cfe *copyFromEvents) Err() error { + return cfe.err +} + +func forwardingHistorySynchronize() { + for { + err := forwardingHistorySynchronizeOnce() + log.Printf("forwardingHistorySynchronizeOnce() err: %v", err) + time.Sleep(1 * time.Minute) + } +} + +func forwardingHistorySynchronizeOnce() error { + last, err := lastForwardingEvent() + if err != nil { + return fmt.Errorf("lastForwardingEvent() error: %w", err) + } + log.Printf("last1: %v", last) + last = last/1_000_000_000 - 1*3600 + if last <= 0 { + last = 1 + } + log.Printf("last2: %v", last) + now := time.Now() + endTime := uint64(now.Add(time.Hour * 24).Unix()) + clientCtx := metadata.AppendToOutgoingContext(context.Background(), "macaroon", os.Getenv("LND_MACAROON_HEX")) + indexOffset := uint32(0) + for { + forwardHistory, err := client.ForwardingHistory(clientCtx, &lnrpc.ForwardingHistoryRequest{ + StartTime: uint64(last), + EndTime: endTime, + NumMaxEvents: 10000, + IndexOffset: indexOffset, + }) + if err != nil { + log.Printf("ForwardingHistory error: %v", err) + return fmt.Errorf("client.ForwardingHistory() error: %w", err) + } + log.Printf("Offset: %v, Events: %v", indexOffset, len(forwardHistory.ForwardingEvents)) + if len(forwardHistory.ForwardingEvents) == 0 { + break + } + indexOffset = forwardHistory.LastOffsetIndex + cfe := copyFromEvents{events: forwardHistory.ForwardingEvents, idx: -1} + err = insertForwardingEvents(&cfe) + if err != nil { + log.Printf("insertForwardingEvents() error: %v", err) + return fmt.Errorf("insertForwardingEvents() error: %w", err) + } + } + return nil +} diff --git a/go.mod b/go.mod index f791783..92414a7 100644 --- a/go.mod +++ b/go.mod @@ -16,4 +16,4 @@ require ( google.golang.org/grpc v1.31.0 ) -replace github.com/lightningnetwork/lnd v0.11.0-beta => github.com/breez/lnd v0.11.0-beta.rc4.0.20201101122458-227226f00b18 +replace github.com/lightningnetwork/lnd v0.11.0-beta => github.com/breez/lnd v0.11.0-beta.rc4.0.20210125150416-0c10146b223c diff --git a/intercept.go b/intercept.go index ab629d5..68bdb80 100644 --- a/intercept.go +++ b/intercept.go @@ -240,6 +240,10 @@ func resumeOrCancel( OutgoingRequestedChanId: chanID, OnionBlob: onionBlob, }) + err := insertChannel(chanID, channelPoint, destination) + if err != nil { + log.Printf("insertChannel error: %v", err) + } return } log.Printf("getChannel(%x, %v) returns 0", destination, channelPoint) diff --git a/postgresql/migrations/000005_forwarding_history.down.sql b/postgresql/migrations/000005_forwarding_history.down.sql new file mode 100644 index 0000000..4dcb607 --- /dev/null +++ b/postgresql/migrations/000005_forwarding_history.down.sql @@ -0,0 +1 @@ +DROP TABLE public.forwarding_history; \ No newline at end of file diff --git a/postgresql/migrations/000005_forwarding_history.up.sql b/postgresql/migrations/000005_forwarding_history.up.sql new file mode 100644 index 0000000..0762b1a --- /dev/null +++ b/postgresql/migrations/000005_forwarding_history.up.sql @@ -0,0 +1,10 @@ +CREATE TABLE public.forwarding_history ( + "timestamp" bigint NOT NULL, + chanid_in bigint NOT NULL, + chanid_out bigint NOT NULL, + amt_msat_in bigint NOT NULL, + amt_msat_out bigint NOT NULL, + CONSTRAINT timestamp_pkey PRIMARY KEY ("timestamp") +); +CREATE INDEX forwarding_history_chanid_in_idx ON public.forwarding_history (chanid_in); +CREATE INDEX forwarding_history_chanid_out_idx ON public.forwarding_history (chanid_out); \ No newline at end of file diff --git a/postgresql/migrations/000006_channels.down.sql b/postgresql/migrations/000006_channels.down.sql new file mode 100644 index 0000000..14a4bcc --- /dev/null +++ b/postgresql/migrations/000006_channels.down.sql @@ -0,0 +1 @@ +DROP TABLE public.channels; \ No newline at end of file diff --git a/postgresql/migrations/000006_channels.up.sql b/postgresql/migrations/000006_channels.up.sql new file mode 100644 index 0000000..659a565 --- /dev/null +++ b/postgresql/migrations/000006_channels.up.sql @@ -0,0 +1,7 @@ +CREATE TABLE public.channels ( + chanid bigint NOT NULL, + channel_point varchar NULL, + nodeid bytea NULL, + CONSTRAINT chanid_pkey PRIMARY KEY (chanid) +); +CREATE INDEX channels_nodeid_idx ON public.channels (nodeid); diff --git a/server.go b/server.go index 9c37c8d..74aaf48 100644 --- a/server.go +++ b/server.go @@ -376,6 +376,8 @@ func main() { go intercept() + go forwardingHistorySynchronize() + s := grpc.NewServer( grpc_middleware.WithUnaryServerChain(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { if md, ok := metadata.FromIncomingContext(ctx); ok {