diff --git a/history/channel_sync.go b/history/channel_sync.go new file mode 100644 index 0000000..c4b58ff --- /dev/null +++ b/history/channel_sync.go @@ -0,0 +1,82 @@ +package history + +import ( + "context" + "log" + "sync" + "time" + + "github.com/breez/lspd/common" +) + +type ChannelSync struct { + nodes []*common.Node + store Store +} + +func NewChannelSync(nodes []*common.Node, store Store) *ChannelSync { + return &ChannelSync{ + nodes: nodes, + store: store, + } +} + +var channelSyncInterval time.Duration = time.Minute * 5 + +func (s *ChannelSync) ChannelsSynchronize(ctx context.Context) { + s.channelsSynchronizeOnce(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(channelSyncInterval): + } + + s.channelsSynchronizeOnce(ctx) + } +} + +func (s *ChannelSync) channelsSynchronizeOnce(ctx context.Context) { + var wg sync.WaitGroup + wg.Add(len(s.nodes)) + for _, n := range s.nodes { + go func(node *common.Node) { + s.channelsSynchronizeNodeOnce(ctx, node) + wg.Done() + }(n) + } + wg.Wait() +} + +func (s *ChannelSync) channelsSynchronizeNodeOnce(ctx context.Context, node *common.Node) { + lastUpdate := time.Now() + log.Printf("ChannelsSynchronizeNodeOnce(%x) - Begin %v", node.NodeId, lastUpdate) + channels, err := node.Client.ListChannels() + if err != nil { + log.Printf("ChannelsSynchronizeNodeOnce(%x)- ListChannels error: %v", node.NodeId, err) + return + } + + updates := make([]*ChannelUpdate, len(channels)) + for i, c := range channels { + if c == nil { + continue + } + updates[i] = &ChannelUpdate{ + NodeID: node.NodeId, + PeerId: c.PeerId, + AliasScid: c.AliasScid, + ConfirmedScid: c.ConfirmedScid, + ChannelPoint: c.ChannelPoint, + LastUpdate: lastUpdate, + } + } + err = s.store.UpdateChannels(ctx, updates) + if err != nil { + log.Printf("ChannelsSynchronizeNodeOnce(%x) - store.UpdateChannels error: %v", node.NodeId, err) + return + } + + log.Printf("ChannelsSynchronizeNodeOnce(%x) - Done %v", node.NodeId, lastUpdate) +} diff --git a/history/store.go b/history/store.go new file mode 100644 index 0000000..29107e0 --- /dev/null +++ b/history/store.go @@ -0,0 +1,22 @@ +package history + +import ( + "context" + "time" + + "github.com/breez/lspd/lightning" + "github.com/btcsuite/btcd/wire" +) + +type ChannelUpdate struct { + NodeID []byte + PeerId []byte + AliasScid *lightning.ShortChannelID + ConfirmedScid *lightning.ShortChannelID + ChannelPoint *wire.OutPoint + LastUpdate time.Time +} + +type Store interface { + UpdateChannels(ctx context.Context, updates []*ChannelUpdate) error +} diff --git a/main.go b/main.go index 8e86e5a..189dd4a 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "github.com/breez/lspd/cln" "github.com/breez/lspd/common" "github.com/breez/lspd/config" + "github.com/breez/lspd/history" "github.com/breez/lspd/interceptor" "github.com/breez/lspd/lnd" "github.com/breez/lspd/lsps0" @@ -99,6 +100,7 @@ func main() { openingStore := postgresql.NewPostgresOpeningStore(pool) notificationsStore := postgresql.NewNotificationsStore(pool) lsps2Store := postgresql.NewLsps2Store(pool) + historyStore := postgresql.NewHistoryStore(pool) ctx, cancel := context.WithCancel(context.Background()) notificationService := notifications.NewNotificationService(notificationsStore) @@ -108,6 +110,8 @@ func main() { go lsps2CleanupService.Start(ctx) notificationCleanupService := notifications.NewCleanupService(notificationsStore) go notificationCleanupService.Start(ctx) + channelSync := history.NewChannelSync(nodes, historyStore) + go channelSync.ChannelsSynchronize(ctx) var interceptors []interceptor.HtlcInterceptor for _, node := range nodes { diff --git a/postgresql/history_store.go b/postgresql/history_store.go new file mode 100644 index 0000000..acca5ca --- /dev/null +++ b/postgresql/history_store.go @@ -0,0 +1,131 @@ +package postgresql + +import ( + "context" + "fmt" + "log" + + "github.com/GoWebProd/uuid7" + "github.com/breez/lspd/history" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +type copyFromChanUpdates struct { + channels []*history.ChannelUpdate + idx int + err error +} + +func (cfe *copyFromChanUpdates) Next() bool { + if len(cfe.channels) == 0 { + return false + } + + for { + cfe.idx++ + if cfe.idx >= len(cfe.channels) { + return false + } + + if cfe.channels[cfe.idx] == nil { + continue + } + + return true + } +} + +func (cfe *copyFromChanUpdates) Values() ([]interface{}, error) { + channel := cfe.channels[cfe.idx] + var aliasScid *int64 + if channel.AliasScid != nil { + tmp := uint64(*channel.AliasScid) + tmp2 := int64(tmp) + aliasScid = &tmp2 + } + var confirmedScid *int64 + if channel.ConfirmedScid != nil { + tmp := uint64(*channel.ConfirmedScid) + tmp2 := int64(tmp) + confirmedScid = &tmp2 + } + values := []interface{}{ + channel.NodeID, + channel.PeerId, + aliasScid, + confirmedScid, + channel.ChannelPoint.Hash[:], + channel.ChannelPoint.Index, + channel.LastUpdate, + channel.LastUpdate, + } + return values, nil +} + +func (cfe *copyFromChanUpdates) Err() error { + return cfe.err +} + +type HistoryStore struct { + pool *pgxpool.Pool + generator *uuid7.Generator +} + +func NewHistoryStore(pool *pgxpool.Pool) *HistoryStore { + return &HistoryStore{ + pool: pool, + generator: uuid7.New(), + } +} + +func (s *HistoryStore) UpdateChannels( + ctx context.Context, + updates []*history.ChannelUpdate, +) error { + if len(updates) == 0 { + return nil + } + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("pgxPool.Begin() error: %w", err) + } + defer tx.Rollback(ctx) + + _, err = tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_table ON COMMIT DROP AS + SELECT * + FROM channels + WITH NO DATA; + `) + if err != nil { + return fmt.Errorf("CREATE TEMP TABLE error: %w", err) + } + + rowSrc := ©FromChanUpdates{channels: updates, idx: -1} + count, err := tx.CopyFrom(ctx, + pgx.Identifier{"tmp_table"}, + []string{"nodeid", "peerid", "alias_scid", "confirmed_scid", "funding_tx_id", "funding_tx_outnum", "first_seen", "last_update"}, + rowSrc) + if err != nil { + return fmt.Errorf("CopyFrom() error: %w", err) + } + log.Printf("UpdateChannels - count1: %v", count) + + cmdTag, err := tx.Exec(ctx, ` + INSERT INTO channels + SELECT * + FROM tmp_table + ON CONFLICT (nodeid, funding_tx_id, funding_tx_outnum) DO UPDATE SET + alias_scid = EXCLUDED.alias_scid, + confirmed_scid = EXCLUDED.confirmed_scid, + last_update = EXCLUDED.last_update + `) + if err != nil { + return fmt.Errorf("INSERT INTO channels error: %w", err) + } + log.Printf("UpdateChannels - count2: %v", cmdTag.RowsAffected()) + + return tx.Commit(ctx) +}