Use new zeroconf mechanism from lnd 15.1

This commit is contained in:
Yaacov Akiba Slama
2022-09-22 21:17:06 +03:00
5 changed files with 88 additions and 26 deletions

34
db.go
View File

@@ -11,7 +11,6 @@ import (
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
@@ -74,17 +73,23 @@ func registerPayment(destination, paymentHash, paymentSecret []byte, incomingAmo
return nil
}
func insertChannel(chanID uint64, channelPoint string, nodeID []byte, lastUpdate time.Time) error {
_, err := pgxPool.Exec(context.Background(),
`INSERT INTO
channels (chanid, channel_point, nodeid, last_update)
VALUES ($1, $2, $3, $4)
ON CONFLICT (chanid) DO UPDATE SET last_update=$4`,
chanID, channelPoint, nodeID, lastUpdate)
func insertChannel(initialChanID, confirmedChanId uint64, channelPoint string, nodeID []byte, lastUpdate time.Time) error {
query := `INSERT INTO
channels (initial_chanid, confirmed_chanid, channel_point, nodeid, last_update)
VALUES ($1, NULLIF($2, 0), $3, $4, $5)
ON CONFLICT (channel_point) DO UPDATE SET confirmed_chanid=NULLIF($2,0), last_update=$4`
c, err := pgxPool.Exec(context.Background(),
query, int64(initialChanID), int64(confirmedChanId), channelPoint, nodeID, lastUpdate)
if err != nil {
log.Printf("insertChannel(%v, %v, %s, %x) error: %v",
initialChanID, confirmedChanId, channelPoint, nodeID, err)
return fmt.Errorf("insertChannel(%v, %s, %x) error: %w",
chanID, channelPoint, nodeID, err)
initialChanID, confirmedChanId, nodeID, err)
}
log.Printf("insertChannel(%v, %s, %x) result: %v",
initialChanID, confirmedChanId, nodeID, c.String())
return nil
}
@@ -94,9 +99,9 @@ func confirmedChannels(sNodeID string) (map[string]uint64, error) {
return nil, fmt.Errorf("hex.DecodeString(%v) error: %w", sNodeID, err)
}
rows, err := pgxPool.Query(context.Background(),
`SELECT chanid, channel_point
`SELECT confirmed_chanid, channel_point
FROM channels
WHERE nodeid=$1`,
WHERE nodeid=$1 AND confirmed_chanid IS NOT NULL`,
nodeID)
if err != nil {
return nil, fmt.Errorf("channels(%x) error: %w", nodeID, err)
@@ -105,17 +110,14 @@ func confirmedChannels(sNodeID string) (map[string]uint64, error) {
chans := make(map[string]uint64)
for rows.Next() {
var (
chanID uint64
chanID int64
channelPoint string
)
err = rows.Scan(&chanID, &channelPoint)
if err != nil {
return nil, fmt.Errorf("channels(%x) rows.Scan error: %w", nodeID, err)
}
sid := lnwire.NewShortChanIDFromInt(chanID)
if !sid.IsFake() {
chans[channelPoint] = chanID
}
chans[channelPoint] = uint64(chanID)
}
return chans, rows.Err()
}

View File

@@ -8,6 +8,7 @@ import (
"os"
"time"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/chainrpc"
"google.golang.org/grpc/metadata"
@@ -28,7 +29,7 @@ func (cfe *copyFromEvents) Values() ([]interface{}, error) {
event := cfe.events[cfe.idx]
values := []interface{}{
event.TimestampNs,
event.ChanIdIn, event.ChanIdOut,
int64(event.ChanIdIn), int64(event.ChanIdOut),
event.AmtInMsat, event.AmtOutMsat}
return values, nil
}
@@ -83,7 +84,14 @@ func channelsSynchronizeOnce() error {
log.Printf("hex.DecodeString in channelsSynchronizeOnce error: %v", err)
continue
}
err = insertChannel(c.ChanId, c.ChannelPoint, nodeID, lastUpdate)
confirmedChanId := c.ChanId
if c.ZeroConf {
confirmedChanId = c.ZeroConfConfirmedScid
if confirmedChanId == hop.Source.ToUint64() {
confirmedChanId = 0
}
}
err = insertChannel(c.ChanId, confirmedChanId, c.ChannelPoint, nodeID, lastUpdate)
if err != nil {
log.Printf("insertChannel(%v, %v, %x) in channelsSynchronizeOnce error: %v", c.ChanId, c.ChannelPoint, nodeID, err)
continue

View File

@@ -13,6 +13,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lnwire"
@@ -63,6 +64,7 @@ func openChannel(ctx context.Context, client lnrpc.LightningClient, paymentHash,
LocalFundingAmount: capacity,
TargetConf: 6,
Private: true,
ZeroConf: true,
})
if err != nil {
log.Printf("client.OpenChannelSync(%x, %v) error: %v", destination, capacity, err)
@@ -80,20 +82,27 @@ func openChannel(ctx context.Context, client lnrpc.LightningClient, paymentHash,
return channelPoint.GetFundingTxidBytes(), channelPoint.OutputIndex, err
}
func getChannel(ctx context.Context, client lnrpc.LightningClient, node []byte, channelPoint string) uint64 {
func getChannel(ctx context.Context, client lnrpc.LightningClient, node []byte, channelPoint string) (uint64, uint64) {
r, err := client.ListChannels(ctx, &lnrpc.ListChannelsRequest{Peer: node})
if err != nil {
log.Printf("client.ListChannels(%x) error: %v", node, err)
return 0
return 0, 0
}
for _, c := range r.Channels {
log.Printf("getChannel(%x): %v", node, c.ChanId)
if c.ChannelPoint == channelPoint && c.Active {
return c.ChanId
confirmedChanId := c.ChanId
if c.ZeroConf {
confirmedChanId = c.ZeroConfConfirmedScid
if confirmedChanId == hop.Source.ToUint64() {
confirmedChanId = 0
}
}
return c.ChanId, confirmedChanId
}
}
log.Printf("No channel found: getChannel(%x)", node)
return 0
return 0, 0
}
func failForwardSend(interceptorClient routerrpc.Router_HtlcInterceptorClient, incomingCircuitKey *routerrpc.CircuitKey) {
@@ -271,16 +280,16 @@ func resumeOrCancel(
) {
deadline := time.Now().Add(10 * time.Second)
for {
chanID := getChannel(ctx, client, destination, channelPoint)
if chanID != 0 {
initialChanID, confirmedChanID := getChannel(ctx, client, destination, channelPoint)
if initialChanID != 0 {
interceptorClient.Send(&routerrpc.ForwardHtlcInterceptResponse{
IncomingCircuitKey: incomingCircuitKey,
Action: routerrpc.ResolveHoldForwardAction_RESUME,
OutgoingAmountMsat: outgoingAmountMsat,
OutgoingRequestedChanId: chanID,
OutgoingRequestedChanId: initialChanID,
OnionBlob: onionBlob,
})
err := insertChannel(chanID, channelPoint, destination, time.Now())
err := insertChannel(initialChanID, confirmedChanID, channelPoint, destination, time.Now())
if err != nil {
log.Printf("insertChannel error: %v", err)
}

View File

@@ -0,0 +1,21 @@
ALTER INDEX public.channels_nodeid_idx RENAME TO channels_new_nodeid_idx;
ALTER INDEX public.channels_channel_point_pkey RENAME TO channels_new_channel_point_pkey
ALTER TABLE public.channels RENAME TO channels_new;
CREATE TABLE public.channels (
chanid int8 NOT NULL,
channel_point varchar NULL,
nodeid bytea NULL,
last_update timestamp NULL,
CONSTRAINT chanid_pkey PRIMARY KEY (chanid)
);
CREATE INDEX channels_nodeid_idx ON public.channels USING btree (nodeid);
INSERT INTO public.channels
SELECT initial_chanid chanid, channel_point, nodeid, last_update FROM channels_new;
INSERT INTO public.channels
SELECT confirmed_chanid chanid, channel_point, nodeid, last_update FROM channels_new
WHERE confirmed_chanid IS NOT NULL AND confirmed_chanid <> initial_chanid;
DROP TABLE channels_new;

View File

@@ -0,0 +1,22 @@
ALTER INDEX public.channels_nodeid_idx RENAME TO channels_old_nodeid_idx;
ALTER INDEX public.chanid_pkey RENAME TO channels_old_chanid_pkey;
ALTER TABLE public.channels RENAME TO channels_old;
CREATE TABLE public.channels (
initial_chanid int8 NOT NULL,
confirmed_chanid int8 NULL,
channel_point varchar NOT NULL,
nodeid bytea NOT NULL,
last_update timestamp NULL,
CONSTRAINT channels_channel_point_pkey PRIMARY KEY (channel_point)
);
CREATE INDEX channels_nodeid_idx ON public.channels USING btree (nodeid);
INSERT INTO public.channels
SELECT
min(chanid) initial_chanid,
CASE WHEN (max(chanid) >> 40) < (3 << 17) THEN NULL ELSE max(chanid) END confirmed_chanid,
channel_point, nodeid, max(last_update) last_update
FROM channels_old GROUP BY channel_point, nodeid;
DROP TABLE public.channels_old;