diff --git a/db.go b/db.go index acc3202..9789528 100644 --- a/db.go +++ b/db.go @@ -11,7 +11,6 @@ import ( "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" - "github.com/lightningnetwork/lnd/lnwire" ) var ( @@ -74,17 +73,23 @@ func registerPayment(destination, paymentHash, paymentSecret []byte, incomingAmo return nil } -func insertChannel(chanID uint64, channelPoint string, nodeID []byte, lastUpdate time.Time) error { - _, err := pgxPool.Exec(context.Background(), - `INSERT INTO - channels (chanid, channel_point, nodeid, last_update) - VALUES ($1, $2, $3, $4) - ON CONFLICT (chanid) DO UPDATE SET last_update=$4`, - chanID, channelPoint, nodeID, lastUpdate) +func insertChannel(initialChanID, confirmedChanId uint64, channelPoint string, nodeID []byte, lastUpdate time.Time) error { + + query := `INSERT INTO + channels (initial_chanid, confirmed_chanid, channel_point, nodeid, last_update) + VALUES ($1, NULLIF($2, 0), $3, $4, $5) + ON CONFLICT (channel_point) DO UPDATE SET confirmed_chanid=NULLIF($2,0), last_update=$4` + + c, err := pgxPool.Exec(context.Background(), + query, int64(initialChanID), int64(confirmedChanId), channelPoint, nodeID, lastUpdate) if err != nil { + log.Printf("insertChannel(%v, %v, %s, %x) error: %v", + initialChanID, confirmedChanId, channelPoint, nodeID, err) return fmt.Errorf("insertChannel(%v, %s, %x) error: %w", - chanID, channelPoint, nodeID, err) + initialChanID, confirmedChanId, nodeID, err) } + log.Printf("insertChannel(%v, %s, %x) result: %v", + initialChanID, confirmedChanId, nodeID, c.String()) return nil } @@ -94,9 +99,9 @@ func confirmedChannels(sNodeID string) (map[string]uint64, error) { return nil, fmt.Errorf("hex.DecodeString(%v) error: %w", sNodeID, err) } rows, err := pgxPool.Query(context.Background(), - `SELECT chanid, channel_point + `SELECT confirmed_chanid, channel_point FROM channels - WHERE nodeid=$1`, + WHERE nodeid=$1 AND confirmed_chanid IS NOT NULL`, nodeID) if err != nil { return nil, fmt.Errorf("channels(%x) error: %w", nodeID, err) @@ -105,17 +110,14 @@ func confirmedChannels(sNodeID string) (map[string]uint64, error) { chans := make(map[string]uint64) for rows.Next() { var ( - chanID uint64 + chanID int64 channelPoint string ) err = rows.Scan(&chanID, &channelPoint) if err != nil { return nil, fmt.Errorf("channels(%x) rows.Scan error: %w", nodeID, err) } - sid := lnwire.NewShortChanIDFromInt(chanID) - if !sid.IsFake() { - chans[channelPoint] = chanID - } + chans[channelPoint] = uint64(chanID) } return chans, rows.Err() } diff --git a/forwarding_history.go b/forwarding_history.go index 4a98caa..7fd4f18 100644 --- a/forwarding_history.go +++ b/forwarding_history.go @@ -8,6 +8,7 @@ import ( "os" "time" + "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/chainrpc" "google.golang.org/grpc/metadata" @@ -28,7 +29,7 @@ func (cfe *copyFromEvents) Values() ([]interface{}, error) { event := cfe.events[cfe.idx] values := []interface{}{ event.TimestampNs, - event.ChanIdIn, event.ChanIdOut, + int64(event.ChanIdIn), int64(event.ChanIdOut), event.AmtInMsat, event.AmtOutMsat} return values, nil } @@ -83,7 +84,14 @@ func channelsSynchronizeOnce() error { log.Printf("hex.DecodeString in channelsSynchronizeOnce error: %v", err) continue } - err = insertChannel(c.ChanId, c.ChannelPoint, nodeID, lastUpdate) + confirmedChanId := c.ChanId + if c.ZeroConf { + confirmedChanId = c.ZeroConfConfirmedScid + if confirmedChanId == hop.Source.ToUint64() { + confirmedChanId = 0 + } + } + err = insertChannel(c.ChanId, confirmedChanId, c.ChannelPoint, nodeID, lastUpdate) if err != nil { log.Printf("insertChannel(%v, %v, %x) in channelsSynchronizeOnce error: %v", c.ChanId, c.ChannelPoint, nodeID, err) continue diff --git a/intercept.go b/intercept.go index 1f4ca2a..59d28e8 100644 --- a/intercept.go +++ b/intercept.go @@ -13,6 +13,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lnwire" @@ -63,6 +64,7 @@ func openChannel(ctx context.Context, client lnrpc.LightningClient, paymentHash, LocalFundingAmount: capacity, TargetConf: 6, Private: true, + ZeroConf: true, }) if err != nil { log.Printf("client.OpenChannelSync(%x, %v) error: %v", destination, capacity, err) @@ -80,20 +82,27 @@ func openChannel(ctx context.Context, client lnrpc.LightningClient, paymentHash, return channelPoint.GetFundingTxidBytes(), channelPoint.OutputIndex, err } -func getChannel(ctx context.Context, client lnrpc.LightningClient, node []byte, channelPoint string) uint64 { +func getChannel(ctx context.Context, client lnrpc.LightningClient, node []byte, channelPoint string) (uint64, uint64) { r, err := client.ListChannels(ctx, &lnrpc.ListChannelsRequest{Peer: node}) if err != nil { log.Printf("client.ListChannels(%x) error: %v", node, err) - return 0 + return 0, 0 } for _, c := range r.Channels { log.Printf("getChannel(%x): %v", node, c.ChanId) if c.ChannelPoint == channelPoint && c.Active { - return c.ChanId + confirmedChanId := c.ChanId + if c.ZeroConf { + confirmedChanId = c.ZeroConfConfirmedScid + if confirmedChanId == hop.Source.ToUint64() { + confirmedChanId = 0 + } + } + return c.ChanId, confirmedChanId } } log.Printf("No channel found: getChannel(%x)", node) - return 0 + return 0, 0 } func failForwardSend(interceptorClient routerrpc.Router_HtlcInterceptorClient, incomingCircuitKey *routerrpc.CircuitKey) { @@ -271,16 +280,16 @@ func resumeOrCancel( ) { deadline := time.Now().Add(10 * time.Second) for { - chanID := getChannel(ctx, client, destination, channelPoint) - if chanID != 0 { + initialChanID, confirmedChanID := getChannel(ctx, client, destination, channelPoint) + if initialChanID != 0 { interceptorClient.Send(&routerrpc.ForwardHtlcInterceptResponse{ IncomingCircuitKey: incomingCircuitKey, Action: routerrpc.ResolveHoldForwardAction_RESUME, OutgoingAmountMsat: outgoingAmountMsat, - OutgoingRequestedChanId: chanID, + OutgoingRequestedChanId: initialChanID, OnionBlob: onionBlob, }) - err := insertChannel(chanID, channelPoint, destination, time.Now()) + err := insertChannel(initialChanID, confirmedChanID, channelPoint, destination, time.Now()) if err != nil { log.Printf("insertChannel error: %v", err) } diff --git a/postgresql/migrations/000008_one_record_per_channel.down.sql b/postgresql/migrations/000008_one_record_per_channel.down.sql new file mode 100644 index 0000000..4ba53f6 --- /dev/null +++ b/postgresql/migrations/000008_one_record_per_channel.down.sql @@ -0,0 +1,21 @@ +ALTER INDEX public.channels_nodeid_idx RENAME TO channels_new_nodeid_idx; +ALTER INDEX public.channels_channel_point_pkey RENAME TO channels_new_channel_point_pkey +ALTER TABLE public.channels RENAME TO channels_new; + +CREATE TABLE public.channels ( + chanid int8 NOT NULL, + channel_point varchar NULL, + nodeid bytea NULL, + last_update timestamp NULL, + CONSTRAINT chanid_pkey PRIMARY KEY (chanid) +); +CREATE INDEX channels_nodeid_idx ON public.channels USING btree (nodeid); + +INSERT INTO public.channels +SELECT initial_chanid chanid, channel_point, nodeid, last_update FROM channels_new; + +INSERT INTO public.channels +SELECT confirmed_chanid chanid, channel_point, nodeid, last_update FROM channels_new + WHERE confirmed_chanid IS NOT NULL AND confirmed_chanid <> initial_chanid; + +DROP TABLE channels_new; diff --git a/postgresql/migrations/000008_one_record_per_channel.up.sql b/postgresql/migrations/000008_one_record_per_channel.up.sql new file mode 100644 index 0000000..2261a23 --- /dev/null +++ b/postgresql/migrations/000008_one_record_per_channel.up.sql @@ -0,0 +1,22 @@ +ALTER INDEX public.channels_nodeid_idx RENAME TO channels_old_nodeid_idx; +ALTER INDEX public.chanid_pkey RENAME TO channels_old_chanid_pkey; +ALTER TABLE public.channels RENAME TO channels_old; + +CREATE TABLE public.channels ( + initial_chanid int8 NOT NULL, + confirmed_chanid int8 NULL, + channel_point varchar NOT NULL, + nodeid bytea NOT NULL, + last_update timestamp NULL, + CONSTRAINT channels_channel_point_pkey PRIMARY KEY (channel_point) +); +CREATE INDEX channels_nodeid_idx ON public.channels USING btree (nodeid); + +INSERT INTO public.channels +SELECT + min(chanid) initial_chanid, + CASE WHEN (max(chanid) >> 40) < (3 << 17) THEN NULL ELSE max(chanid) END confirmed_chanid, + channel_point, nodeid, max(last_update) last_update +FROM channels_old GROUP BY channel_point, nodeid; + +DROP TABLE public.channels_old;