mirror of
https://github.com/aljazceru/lspd.git
synced 2025-12-19 23:04:22 +01:00
delete existing history sync
This commit is contained in:
@@ -1,12 +0,0 @@
|
|||||||
package lnd
|
|
||||||
|
|
||||||
type CopyFromSource interface {
|
|
||||||
Next() bool
|
|
||||||
Values() ([]interface{}, error)
|
|
||||||
Err() error
|
|
||||||
}
|
|
||||||
|
|
||||||
type ForwardingEventStore interface {
|
|
||||||
LastForwardingEvent() (int64, error)
|
|
||||||
InsertForwardingEvents(rowSrc CopyFromSource) error
|
|
||||||
}
|
|
||||||
@@ -1,183 +0,0 @@
|
|||||||
package lnd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/breez/lspd/interceptor"
|
|
||||||
"github.com/lightningnetwork/lnd/htlcswitch/hop"
|
|
||||||
"github.com/lightningnetwork/lnd/lnrpc"
|
|
||||||
"github.com/lightningnetwork/lnd/lnrpc/chainrpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
type copyFromEvents struct {
|
|
||||||
events []*lnrpc.ForwardingEvent
|
|
||||||
idx int
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cfe *copyFromEvents) Next() bool {
|
|
||||||
cfe.idx++
|
|
||||||
return cfe.idx < len(cfe.events)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cfe *copyFromEvents) Values() ([]interface{}, error) {
|
|
||||||
event := cfe.events[cfe.idx]
|
|
||||||
values := []interface{}{
|
|
||||||
event.TimestampNs,
|
|
||||||
int64(event.ChanIdIn), int64(event.ChanIdOut),
|
|
||||||
event.AmtInMsat, event.AmtOutMsat}
|
|
||||||
return values, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cfe *copyFromEvents) Err() error {
|
|
||||||
return cfe.err
|
|
||||||
}
|
|
||||||
|
|
||||||
type ForwardingHistorySync struct {
|
|
||||||
client *LndClient
|
|
||||||
interceptStore interceptor.InterceptStore
|
|
||||||
forwardingStore ForwardingEventStore
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewForwardingHistorySync(
|
|
||||||
client *LndClient,
|
|
||||||
interceptStore interceptor.InterceptStore,
|
|
||||||
forwardingStore ForwardingEventStore,
|
|
||||||
) *ForwardingHistorySync {
|
|
||||||
return &ForwardingHistorySync{
|
|
||||||
client: client,
|
|
||||||
interceptStore: interceptStore,
|
|
||||||
forwardingStore: forwardingStore,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ForwardingHistorySync) ChannelsSynchronize(ctx context.Context) {
|
|
||||||
lastSync := time.Now().Add(-6 * time.Minute)
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
stream, err := s.client.chainNotifierClient.RegisterBlockEpochNtfn(ctx, &chainrpc.BlockEpoch{})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("chainNotifierClient.RegisterBlockEpochNtfn(): %v", err)
|
|
||||||
<-time.After(time.Second)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := stream.Recv()
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("stream.Recv: %v", err)
|
|
||||||
<-time.After(time.Second)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if lastSync.Add(5 * time.Minute).Before(time.Now()) {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-time.After(1 * time.Minute):
|
|
||||||
}
|
|
||||||
err = s.ChannelsSynchronizeOnce()
|
|
||||||
lastSync = time.Now()
|
|
||||||
log.Printf("channelsSynchronizeOnce() err: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ForwardingHistorySync) ChannelsSynchronizeOnce() error {
|
|
||||||
log.Printf("channelsSynchronizeOnce - begin")
|
|
||||||
channels, err := s.client.client.ListChannels(context.Background(), &lnrpc.ListChannelsRequest{PrivateOnly: true})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("ListChannels error: %v", err)
|
|
||||||
return fmt.Errorf("client.ListChannels() error: %w", err)
|
|
||||||
}
|
|
||||||
log.Printf("channelsSynchronizeOnce - received channels")
|
|
||||||
lastUpdate := time.Now()
|
|
||||||
for _, c := range channels.Channels {
|
|
||||||
nodeID, err := hex.DecodeString(c.RemotePubkey)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("hex.DecodeString in channelsSynchronizeOnce error: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
confirmedChanId := c.ChanId
|
|
||||||
if c.ZeroConf {
|
|
||||||
confirmedChanId = c.ZeroConfConfirmedScid
|
|
||||||
if confirmedChanId == hop.Source.ToUint64() {
|
|
||||||
confirmedChanId = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err = s.interceptStore.InsertChannel(c.ChanId, confirmedChanId, c.ChannelPoint, nodeID, lastUpdate)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("insertChannel(%v, %v, %x) in channelsSynchronizeOnce error: %v", c.ChanId, c.ChannelPoint, nodeID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.Printf("channelsSynchronizeOnce - done")
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ForwardingHistorySync) ForwardingHistorySynchronize(ctx context.Context) {
|
|
||||||
for {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err := s.ForwardingHistorySynchronizeOnce()
|
|
||||||
log.Printf("forwardingHistorySynchronizeOnce() err: %v", err)
|
|
||||||
select {
|
|
||||||
case <-time.After(1 * time.Minute):
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ForwardingHistorySync) ForwardingHistorySynchronizeOnce() error {
|
|
||||||
last, err := s.forwardingStore.LastForwardingEvent()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("lastForwardingEvent() error: %w", err)
|
|
||||||
}
|
|
||||||
log.Printf("last1: %v", last)
|
|
||||||
last = last/1_000_000_000 - 1*3600
|
|
||||||
if last <= 0 {
|
|
||||||
last = 1
|
|
||||||
}
|
|
||||||
log.Printf("last2: %v", last)
|
|
||||||
now := time.Now()
|
|
||||||
endTime := uint64(now.Add(time.Hour * 24).Unix())
|
|
||||||
indexOffset := uint32(0)
|
|
||||||
for {
|
|
||||||
forwardHistory, err := s.client.client.ForwardingHistory(context.Background(), &lnrpc.ForwardingHistoryRequest{
|
|
||||||
StartTime: uint64(last),
|
|
||||||
EndTime: endTime,
|
|
||||||
NumMaxEvents: 10000,
|
|
||||||
IndexOffset: indexOffset,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("ForwardingHistory error: %v", err)
|
|
||||||
return fmt.Errorf("client.ForwardingHistory() error: %w", err)
|
|
||||||
}
|
|
||||||
log.Printf("Offset: %v, Events: %v", indexOffset, len(forwardHistory.ForwardingEvents))
|
|
||||||
if len(forwardHistory.ForwardingEvents) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
indexOffset = forwardHistory.LastOffsetIndex
|
|
||||||
cfe := copyFromEvents{events: forwardHistory.ForwardingEvents, idx: -1}
|
|
||||||
err = s.forwardingStore.InsertForwardingEvents(&cfe)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("insertForwardingEvents() error: %v", err)
|
|
||||||
return fmt.Errorf("insertForwardingEvents() error: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -23,7 +23,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type LndHtlcInterceptor struct {
|
type LndHtlcInterceptor struct {
|
||||||
fwsync *ForwardingHistorySync
|
|
||||||
interceptor *interceptor.Interceptor
|
interceptor *interceptor.Interceptor
|
||||||
config *config.NodeConfig
|
config *config.NodeConfig
|
||||||
client *LndClient
|
client *LndClient
|
||||||
@@ -37,13 +36,11 @@ type LndHtlcInterceptor struct {
|
|||||||
func NewLndHtlcInterceptor(
|
func NewLndHtlcInterceptor(
|
||||||
conf *config.NodeConfig,
|
conf *config.NodeConfig,
|
||||||
client *LndClient,
|
client *LndClient,
|
||||||
fwsync *ForwardingHistorySync,
|
|
||||||
interceptor *interceptor.Interceptor,
|
interceptor *interceptor.Interceptor,
|
||||||
) (*LndHtlcInterceptor, error) {
|
) (*LndHtlcInterceptor, error) {
|
||||||
i := &LndHtlcInterceptor{
|
i := &LndHtlcInterceptor{
|
||||||
config: conf,
|
config: conf,
|
||||||
client: client,
|
client: client,
|
||||||
fwsync: fwsync,
|
|
||||||
interceptor: interceptor,
|
interceptor: interceptor,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,8 +54,6 @@ func (i *LndHtlcInterceptor) Start() error {
|
|||||||
i.ctx = ctx
|
i.ctx = ctx
|
||||||
i.cancel = cancel
|
i.cancel = cancel
|
||||||
i.stopRequested = false
|
i.stopRequested = false
|
||||||
go i.fwsync.ForwardingHistorySynchronize(ctx)
|
|
||||||
go i.fwsync.ChannelsSynchronize(ctx)
|
|
||||||
|
|
||||||
return i.intercept()
|
return i.intercept()
|
||||||
}
|
}
|
||||||
|
|||||||
4
main.go
4
main.go
@@ -97,7 +97,6 @@ func main() {
|
|||||||
|
|
||||||
interceptStore := postgresql.NewPostgresInterceptStore(pool)
|
interceptStore := postgresql.NewPostgresInterceptStore(pool)
|
||||||
openingStore := postgresql.NewPostgresOpeningStore(pool)
|
openingStore := postgresql.NewPostgresOpeningStore(pool)
|
||||||
forwardingStore := postgresql.NewForwardingEventStore(pool)
|
|
||||||
notificationsStore := postgresql.NewNotificationsStore(pool)
|
notificationsStore := postgresql.NewNotificationsStore(pool)
|
||||||
lsps2Store := postgresql.NewLsps2Store(pool)
|
lsps2Store := postgresql.NewLsps2Store(pool)
|
||||||
|
|
||||||
@@ -127,9 +126,8 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
client.StartListeners()
|
client.StartListeners()
|
||||||
fwsync := lnd.NewForwardingHistorySync(client, interceptStore, forwardingStore)
|
|
||||||
interceptor := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, openingService, feeEstimator, feeStrategy, notificationService)
|
interceptor := interceptor.NewInterceptHandler(client, node.NodeConfig, interceptStore, openingService, feeEstimator, feeStrategy, notificationService)
|
||||||
htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, fwsync, interceptor)
|
htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node.NodeConfig, client, interceptor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to initialize LND interceptor: %v", err)
|
log.Fatalf("failed to initialize LND interceptor: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,68 +0,0 @@
|
|||||||
package postgresql
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
|
|
||||||
"github.com/breez/lspd/lnd"
|
|
||||||
"github.com/jackc/pgx/v5"
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ForwardingEventStore struct {
|
|
||||||
pool *pgxpool.Pool
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewForwardingEventStore(pool *pgxpool.Pool) *ForwardingEventStore {
|
|
||||||
return &ForwardingEventStore{pool: pool}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ForwardingEventStore) LastForwardingEvent() (int64, error) {
|
|
||||||
var last int64
|
|
||||||
err := s.pool.QueryRow(context.Background(),
|
|
||||||
`SELECT coalesce(MAX("timestamp"), 0) AS last FROM forwarding_history`).Scan(&last)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return last, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ForwardingEventStore) InsertForwardingEvents(rowSrc lnd.CopyFromSource) error {
|
|
||||||
|
|
||||||
tx, err := s.pool.Begin(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("pgxPool.Begin() error: %w", err)
|
|
||||||
}
|
|
||||||
defer tx.Rollback(context.Background())
|
|
||||||
|
|
||||||
_, err = tx.Exec(context.Background(), `
|
|
||||||
CREATE TEMP TABLE tmp_table ON COMMIT DROP AS
|
|
||||||
SELECT *
|
|
||||||
FROM forwarding_history
|
|
||||||
WITH NO DATA;
|
|
||||||
`)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("CREATE TEMP TABLE error: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
count, err := tx.CopyFrom(context.Background(),
|
|
||||||
pgx.Identifier{"tmp_table"},
|
|
||||||
[]string{"timestamp", "chanid_in", "chanid_out", "amt_msat_in", "amt_msat_out"}, rowSrc)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("CopyFrom() error: %w", err)
|
|
||||||
}
|
|
||||||
log.Printf("count1: %v", count)
|
|
||||||
|
|
||||||
cmdTag, err := tx.Exec(context.Background(), `
|
|
||||||
INSERT INTO forwarding_history
|
|
||||||
SELECT *
|
|
||||||
FROM tmp_table
|
|
||||||
ON CONFLICT DO NOTHING
|
|
||||||
`)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("INSERT INTO forwarding_history error: %w", err)
|
|
||||||
}
|
|
||||||
log.Printf("count2: %v", cmdTag.RowsAffected())
|
|
||||||
return tx.Commit(context.Background())
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user