synchronize channels

This commit is contained in:
Jesse de Wit
2023-12-29 11:35:32 +01:00
parent 39d36318e2
commit 03a1708a1e
4 changed files with 239 additions and 0 deletions

82
history/channel_sync.go Normal file
View File

@@ -0,0 +1,82 @@
package history
import (
"context"
"log"
"sync"
"time"
"github.com/breez/lspd/common"
)
type ChannelSync struct {
nodes []*common.Node
store Store
}
func NewChannelSync(nodes []*common.Node, store Store) *ChannelSync {
return &ChannelSync{
nodes: nodes,
store: store,
}
}
var channelSyncInterval time.Duration = time.Minute * 5
func (s *ChannelSync) ChannelsSynchronize(ctx context.Context) {
s.channelsSynchronizeOnce(ctx)
for {
select {
case <-ctx.Done():
return
case <-time.After(channelSyncInterval):
}
s.channelsSynchronizeOnce(ctx)
}
}
func (s *ChannelSync) channelsSynchronizeOnce(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(len(s.nodes))
for _, n := range s.nodes {
go func(node *common.Node) {
s.channelsSynchronizeNodeOnce(ctx, node)
wg.Done()
}(n)
}
wg.Wait()
}
func (s *ChannelSync) channelsSynchronizeNodeOnce(ctx context.Context, node *common.Node) {
lastUpdate := time.Now()
log.Printf("ChannelsSynchronizeNodeOnce(%x) - Begin %v", node.NodeId, lastUpdate)
channels, err := node.Client.ListChannels()
if err != nil {
log.Printf("ChannelsSynchronizeNodeOnce(%x)- ListChannels error: %v", node.NodeId, err)
return
}
updates := make([]*ChannelUpdate, len(channels))
for i, c := range channels {
if c == nil {
continue
}
updates[i] = &ChannelUpdate{
NodeID: node.NodeId,
PeerId: c.PeerId,
AliasScid: c.AliasScid,
ConfirmedScid: c.ConfirmedScid,
ChannelPoint: c.ChannelPoint,
LastUpdate: lastUpdate,
}
}
err = s.store.UpdateChannels(ctx, updates)
if err != nil {
log.Printf("ChannelsSynchronizeNodeOnce(%x) - store.UpdateChannels error: %v", node.NodeId, err)
return
}
log.Printf("ChannelsSynchronizeNodeOnce(%x) - Done %v", node.NodeId, lastUpdate)
}

22
history/store.go Normal file
View File

@@ -0,0 +1,22 @@
package history
import (
"context"
"time"
"github.com/breez/lspd/lightning"
"github.com/btcsuite/btcd/wire"
)
type ChannelUpdate struct {
NodeID []byte
PeerId []byte
AliasScid *lightning.ShortChannelID
ConfirmedScid *lightning.ShortChannelID
ChannelPoint *wire.OutPoint
LastUpdate time.Time
}
type Store interface {
UpdateChannels(ctx context.Context, updates []*ChannelUpdate) error
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/breez/lspd/cln"
"github.com/breez/lspd/common"
"github.com/breez/lspd/config"
"github.com/breez/lspd/history"
"github.com/breez/lspd/interceptor"
"github.com/breez/lspd/lnd"
"github.com/breez/lspd/lsps0"
@@ -99,6 +100,7 @@ func main() {
openingStore := postgresql.NewPostgresOpeningStore(pool)
notificationsStore := postgresql.NewNotificationsStore(pool)
lsps2Store := postgresql.NewLsps2Store(pool)
historyStore := postgresql.NewHistoryStore(pool)
ctx, cancel := context.WithCancel(context.Background())
notificationService := notifications.NewNotificationService(notificationsStore)
@@ -108,6 +110,8 @@ func main() {
go lsps2CleanupService.Start(ctx)
notificationCleanupService := notifications.NewCleanupService(notificationsStore)
go notificationCleanupService.Start(ctx)
channelSync := history.NewChannelSync(nodes, historyStore)
go channelSync.ChannelsSynchronize(ctx)
var interceptors []interceptor.HtlcInterceptor
for _, node := range nodes {

131
postgresql/history_store.go Normal file
View File

@@ -0,0 +1,131 @@
package postgresql
import (
"context"
"fmt"
"log"
"github.com/GoWebProd/uuid7"
"github.com/breez/lspd/history"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
type copyFromChanUpdates struct {
channels []*history.ChannelUpdate
idx int
err error
}
func (cfe *copyFromChanUpdates) Next() bool {
if len(cfe.channels) == 0 {
return false
}
for {
cfe.idx++
if cfe.idx >= len(cfe.channels) {
return false
}
if cfe.channels[cfe.idx] == nil {
continue
}
return true
}
}
func (cfe *copyFromChanUpdates) Values() ([]interface{}, error) {
channel := cfe.channels[cfe.idx]
var aliasScid *int64
if channel.AliasScid != nil {
tmp := uint64(*channel.AliasScid)
tmp2 := int64(tmp)
aliasScid = &tmp2
}
var confirmedScid *int64
if channel.ConfirmedScid != nil {
tmp := uint64(*channel.ConfirmedScid)
tmp2 := int64(tmp)
confirmedScid = &tmp2
}
values := []interface{}{
channel.NodeID,
channel.PeerId,
aliasScid,
confirmedScid,
channel.ChannelPoint.Hash[:],
channel.ChannelPoint.Index,
channel.LastUpdate,
channel.LastUpdate,
}
return values, nil
}
func (cfe *copyFromChanUpdates) Err() error {
return cfe.err
}
type HistoryStore struct {
pool *pgxpool.Pool
generator *uuid7.Generator
}
func NewHistoryStore(pool *pgxpool.Pool) *HistoryStore {
return &HistoryStore{
pool: pool,
generator: uuid7.New(),
}
}
func (s *HistoryStore) UpdateChannels(
ctx context.Context,
updates []*history.ChannelUpdate,
) error {
if len(updates) == 0 {
return nil
}
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("pgxPool.Begin() error: %w", err)
}
defer tx.Rollback(ctx)
_, err = tx.Exec(ctx, `
CREATE TEMP TABLE tmp_table ON COMMIT DROP AS
SELECT *
FROM channels
WITH NO DATA;
`)
if err != nil {
return fmt.Errorf("CREATE TEMP TABLE error: %w", err)
}
rowSrc := &copyFromChanUpdates{channels: updates, idx: -1}
count, err := tx.CopyFrom(ctx,
pgx.Identifier{"tmp_table"},
[]string{"nodeid", "peerid", "alias_scid", "confirmed_scid", "funding_tx_id", "funding_tx_outnum", "first_seen", "last_update"},
rowSrc)
if err != nil {
return fmt.Errorf("CopyFrom() error: %w", err)
}
log.Printf("UpdateChannels - count1: %v", count)
cmdTag, err := tx.Exec(ctx, `
INSERT INTO channels
SELECT *
FROM tmp_table
ON CONFLICT (nodeid, funding_tx_id, funding_tx_outnum) DO UPDATE SET
alias_scid = EXCLUDED.alias_scid,
confirmed_scid = EXCLUDED.confirmed_scid,
last_update = EXCLUDED.last_update
`)
if err != nil {
return fmt.Errorf("INSERT INTO channels error: %w", err)
}
log.Printf("UpdateChannels - count2: %v", cmdTag.RowsAffected())
return tx.Commit(ctx)
}