From 3e92e51fc26d55a825081c28c885a333863dffaa Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Fri, 29 Dec 2023 11:04:46 +0100 Subject: [PATCH] delete existing history sync --- lnd/forwarding_event_store.go | 12 -- lnd/forwarding_history.go | 183 --------------------------- lnd/interceptor.go | 5 - main.go | 4 +- postgresql/forwarding_event_store.go | 68 ---------- 5 files changed, 1 insertion(+), 271 deletions(-) delete mode 100644 lnd/forwarding_event_store.go delete mode 100644 lnd/forwarding_history.go delete mode 100644 postgresql/forwarding_event_store.go diff --git a/lnd/forwarding_event_store.go b/lnd/forwarding_event_store.go deleted file mode 100644 index 22fc4a4..0000000 --- a/lnd/forwarding_event_store.go +++ /dev/null @@ -1,12 +0,0 @@ -package lnd - -type CopyFromSource interface { - Next() bool - Values() ([]interface{}, error) - Err() error -} - -type ForwardingEventStore interface { - LastForwardingEvent() (int64, error) - InsertForwardingEvents(rowSrc CopyFromSource) error -} diff --git a/lnd/forwarding_history.go b/lnd/forwarding_history.go deleted file mode 100644 index debd93f..0000000 --- a/lnd/forwarding_history.go +++ /dev/null @@ -1,183 +0,0 @@ -package lnd - -import ( - "context" - "encoding/hex" - "fmt" - "log" - "time" - - "github.com/breez/lspd/interceptor" - "github.com/lightningnetwork/lnd/htlcswitch/hop" - "github.com/lightningnetwork/lnd/lnrpc" - "github.com/lightningnetwork/lnd/lnrpc/chainrpc" -) - -type copyFromEvents struct { - events []*lnrpc.ForwardingEvent - idx int - err error -} - -func (cfe *copyFromEvents) Next() bool { - cfe.idx++ - return cfe.idx < len(cfe.events) -} - -func (cfe *copyFromEvents) Values() ([]interface{}, error) { - event := cfe.events[cfe.idx] - values := []interface{}{ - event.TimestampNs, - int64(event.ChanIdIn), int64(event.ChanIdOut), - event.AmtInMsat, event.AmtOutMsat} - return values, nil -} - -func (cfe *copyFromEvents) Err() error { - return cfe.err -} - -type ForwardingHistorySync struct { - client *LndClient - interceptStore interceptor.InterceptStore - forwardingStore ForwardingEventStore -} - -func NewForwardingHistorySync( - client *LndClient, - interceptStore interceptor.InterceptStore, - forwardingStore ForwardingEventStore, -) *ForwardingHistorySync { - return &ForwardingHistorySync{ - client: client, - interceptStore: interceptStore, - forwardingStore: forwardingStore, - } -} - -func (s *ForwardingHistorySync) ChannelsSynchronize(ctx context.Context) { - lastSync := time.Now().Add(-6 * time.Minute) - for { - if ctx.Err() != nil { - return - } - - stream, err := s.client.chainNotifierClient.RegisterBlockEpochNtfn(ctx, &chainrpc.BlockEpoch{}) - if err != nil { - log.Printf("chainNotifierClient.RegisterBlockEpochNtfn(): %v", err) - <-time.After(time.Second) - continue - } - - for { - if ctx.Err() != nil { - return - } - - _, err := stream.Recv() - if err != nil { - log.Printf("stream.Recv: %v", err) - <-time.After(time.Second) - break - } - if lastSync.Add(5 * time.Minute).Before(time.Now()) { - select { - case <-ctx.Done(): - return - case <-time.After(1 * time.Minute): - } - err = s.ChannelsSynchronizeOnce() - lastSync = time.Now() - log.Printf("channelsSynchronizeOnce() err: %v", err) - } - } - } -} - -func (s *ForwardingHistorySync) ChannelsSynchronizeOnce() error { - log.Printf("channelsSynchronizeOnce - begin") - channels, err := s.client.client.ListChannels(context.Background(), &lnrpc.ListChannelsRequest{PrivateOnly: true}) - if err != nil { - log.Printf("ListChannels error: %v", err) - return fmt.Errorf("client.ListChannels() error: %w", err) - } - log.Printf("channelsSynchronizeOnce - received channels") - lastUpdate := time.Now() - for _, c := range channels.Channels { - nodeID, err := hex.DecodeString(c.RemotePubkey) - if err != nil { - log.Printf("hex.DecodeString in channelsSynchronizeOnce error: %v", err) - continue - } - confirmedChanId := c.ChanId - if c.ZeroConf { - confirmedChanId = c.ZeroConfConfirmedScid - if confirmedChanId == hop.Source.ToUint64() { - confirmedChanId = 0 - } - } - err = s.interceptStore.InsertChannel(c.ChanId, confirmedChanId, c.ChannelPoint, nodeID, lastUpdate) - if err != nil { - log.Printf("insertChannel(%v, %v, %x) in channelsSynchronizeOnce error: %v", c.ChanId, c.ChannelPoint, nodeID, err) - continue - } - } - log.Printf("channelsSynchronizeOnce - done") - - return nil -} - -func (s *ForwardingHistorySync) ForwardingHistorySynchronize(ctx context.Context) { - for { - if ctx.Err() != nil { - return - } - - err := s.ForwardingHistorySynchronizeOnce() - log.Printf("forwardingHistorySynchronizeOnce() err: %v", err) - select { - case <-time.After(1 * time.Minute): - case <-ctx.Done(): - } - } -} - -func (s *ForwardingHistorySync) ForwardingHistorySynchronizeOnce() error { - last, err := s.forwardingStore.LastForwardingEvent() - if err != nil { - return fmt.Errorf("lastForwardingEvent() error: %w", err) - } - log.Printf("last1: %v", last) - last = last/1_000_000_000 - 1*3600 - if last <= 0 { - last = 1 - } - log.Printf("last2: %v", last) - now := time.Now() - endTime := uint64(now.Add(time.Hour * 24).Unix()) - indexOffset := uint32(0) - for { - forwardHistory, err := s.client.client.ForwardingHistory(context.Background(), &lnrpc.ForwardingHistoryRequest{ - StartTime: uint64(last), - EndTime: endTime, - NumMaxEvents: 10000, - IndexOffset: indexOffset, - }) - if err != nil { - log.Printf("ForwardingHistory error: %v", err) - return fmt.Errorf("client.ForwardingHistory() error: %w", err) - } - log.Printf("Offset: %v, Events: %v", indexOffset, len(forwardHistory.ForwardingEvents)) - if len(forwardHistory.ForwardingEvents) == 0 { - break - } - indexOffset = forwardHistory.LastOffsetIndex - cfe := copyFromEvents{events: forwardHistory.ForwardingEvents, idx: -1} - err = s.forwardingStore.InsertForwardingEvents(&cfe) - if err != nil { - log.Printf("insertForwardingEvents() error: %v", err) - return fmt.Errorf("insertForwardingEvents() error: %w", err) - } - } - return nil -} diff --git a/lnd/interceptor.go b/lnd/interceptor.go index c3eef64..0dee610 100644 --- a/lnd/interceptor.go +++ b/lnd/interceptor.go @@ -23,7 +23,6 @@ import ( ) type LndHtlcInterceptor struct { - fwsync *ForwardingHistorySync interceptor *interceptor.Interceptor config *config.NodeConfig client *LndClient @@ -37,13 +36,11 @@ type LndHtlcInterceptor struct { func NewLndHtlcInterceptor( conf *config.NodeConfig, client *LndClient, - fwsync *ForwardingHistorySync, interceptor *interceptor.Interceptor, ) (*LndHtlcInterceptor, error) { i := &LndHtlcInterceptor{ config: conf, client: client, - fwsync: fwsync, interceptor: interceptor, } @@ -57,8 +54,6 @@ func (i *LndHtlcInterceptor) Start() error { i.ctx = ctx i.cancel = cancel i.stopRequested = false - go i.fwsync.ForwardingHistorySynchronize(ctx) - go i.fwsync.ChannelsSynchronize(ctx) return i.intercept() } diff --git a/main.go b/main.go index 47effaf..91e85b7 100644 --- a/main.go +++ b/main.go @@ -97,7 +97,6 @@ func main() { interceptStore := postgresql.NewPostgresInterceptStore(pool) openingStore := postgresql.NewPostgresOpeningStore(pool) - forwardingStore := postgresql.NewForwardingEventStore(pool) notificationsStore := postgresql.NewNotificationsStore(pool) lsps2Store := postgresql.NewLsps2Store(pool) @@ -127,9 +126,8 @@ func main() { } client.StartListeners() - fwsync := lnd.NewForwardingHistorySync(client, interceptStore, forwardingStore) interceptor := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, openingService, feeEstimator, feeStrategy, notificationService) - htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, fwsync, interceptor) + htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, interceptor) if err != nil { log.Fatalf("failed to initialize LND interceptor: %v", err) } diff --git a/postgresql/forwarding_event_store.go b/postgresql/forwarding_event_store.go deleted file mode 100644 index ce0fbff..0000000 --- a/postgresql/forwarding_event_store.go +++ /dev/null @@ -1,68 +0,0 @@ -package postgresql - -import ( - "context" - "fmt" - "log" - - "github.com/breez/lspd/lnd" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" -) - -type ForwardingEventStore struct { - pool *pgxpool.Pool -} - -func NewForwardingEventStore(pool *pgxpool.Pool) *ForwardingEventStore { - return &ForwardingEventStore{pool: pool} -} - -func (s *ForwardingEventStore) LastForwardingEvent() (int64, error) { - var last int64 - err := s.pool.QueryRow(context.Background(), - `SELECT coalesce(MAX("timestamp"), 0) AS last FROM forwarding_history`).Scan(&last) - if err != nil { - return 0, err - } - return last, nil -} - -func (s *ForwardingEventStore) InsertForwardingEvents(rowSrc lnd.CopyFromSource) error { - - tx, err := s.pool.Begin(context.Background()) - if err != nil { - return fmt.Errorf("pgxPool.Begin() error: %w", err) - } - defer tx.Rollback(context.Background()) - - _, err = tx.Exec(context.Background(), ` - CREATE TEMP TABLE tmp_table ON COMMIT DROP AS - SELECT * - FROM forwarding_history - WITH NO DATA; - `) - if err != nil { - return fmt.Errorf("CREATE TEMP TABLE error: %w", err) - } - - count, err := tx.CopyFrom(context.Background(), - pgx.Identifier{"tmp_table"}, - []string{"timestamp", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, rowSrc) - if err != nil { - return fmt.Errorf("CopyFrom() error: %w", err) - } - log.Printf("count1: %v", count) - - cmdTag, err := tx.Exec(context.Background(), ` - INSERT INTO forwarding_history - SELECT * - FROM tmp_table - ON CONFLICT DO NOTHING - `) - if err != nil { - return fmt.Errorf("INSERT INTO forwarding_history error: %w", err) - } - log.Printf("count2: %v", cmdTag.RowsAffected()) - return tx.Commit(context.Background()) -}