From 09e8bd3cb6b5ebfe96babda2626f9f539f348014 Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Thu, 15 Jun 2023 15:55:09 +0200 Subject: [PATCH] notifications: notify htlc when peer offline --- config/config.go | 4 + interceptor/intercept.go | 230 +++++++++++++++++++++++++++----------- itest/bob_offline_test.go | 2 +- main.go | 5 +- 4 files changed, 173 insertions(+), 68 deletions(-) diff --git a/config/config.go b/config/config.go index 0f18c94..dd287e5 100644 --- a/config/config.go +++ b/config/config.go @@ -66,6 +66,10 @@ type NodeConfig struct { // The channel can be closed if not used this duration in seconds. MaxInactiveDuration uint64 `json:"maxInactiveDuration,string"` + // The maximum time to hold a htlc after sending a notification when the + // peer is offline. + NotificationTimeout string `json:"notificationTimeout,string"` + // Set this field to connect to an LND node. Lnd *LndConfig `json:"lnd,omitempty"` diff --git a/interceptor/intercept.go b/interceptor/intercept.go index 35b6324..cffa14c 100644 --- a/interceptor/intercept.go +++ b/interceptor/intercept.go @@ -13,6 +13,7 @@ import ( "github.com/breez/lspd/chain" "github.com/breez/lspd/config" "github.com/breez/lspd/lightning" + "github.com/breez/lspd/notifications" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/wire" sphinx "github.com/lightningnetwork/lightning-onion" @@ -49,12 +50,13 @@ type InterceptResult struct { } type Interceptor struct { - client lightning.Client - config *config.NodeConfig - store InterceptStore - feeEstimator chain.FeeEstimator - feeStrategy chain.FeeStrategy - payHashGroup singleflight.Group + client lightning.Client + config *config.NodeConfig + store InterceptStore + feeEstimator chain.FeeEstimator + feeStrategy chain.FeeStrategy + payHashGroup singleflight.Group + notificationService *notifications.NotificationService } func NewInterceptor( @@ -63,13 +65,15 @@ func NewInterceptor( store InterceptStore, feeEstimator chain.FeeEstimator, feeStrategy chain.FeeStrategy, + notificationService *notifications.NotificationService, ) *Interceptor { return &Interceptor{ - client: client, - config: config, - store: store, - feeEstimator: feeEstimator, - feeStrategy: feeStrategy, + client: client, + config: config, + store: store, + feeEstimator: feeEstimator, + feeStrategy: feeStrategy, + notificationService: notificationService, } } @@ -85,12 +89,13 @@ func (i *Interceptor) Intercept(scid *basetypes.ShortChannelID, reqPaymentHash [ }, nil } - nextHop, err := i.client.GetPeerId(scid) + isRegistered := paymentSecret != nil + isProbe := isRegistered && !bytes.Equal(paymentHash, reqPaymentHash) + nextHop, _ := i.client.GetPeerId(scid) if err != nil { log.Printf("GetPeerId(%s) error: %v", scid.ToString(), err) return InterceptResult{ - Action: INTERCEPT_FAIL_HTLC_WITH_CODE, - FailureCode: FAILURE_TEMPORARY_NODE_FAILURE, + Action: INTERCEPT_RESUME, }, nil } @@ -102,68 +107,163 @@ func (i *Interceptor) Intercept(scid *basetypes.ShortChannelID, reqPaymentHash [ }, nil } - if paymentSecret == nil { + // nextHop is set if the sender's scid corresponds to a known channel + // destination is set if the payment was registered for a channel open. + // The 'actual' next hop will be either of those. Or nil if the next hop + // is unknown. + if nextHop == nil { + nextHop = destination + } + + if nextHop != nil { + isConnected, err := i.client.IsConnected(nextHop) + if err != nil { + log.Printf("IsConnected(%x) error: %v", nextHop, err) + return InterceptResult{ + Action: INTERCEPT_FAIL_HTLC_WITH_CODE, + FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE, + }, nil + } + + if !isConnected { + // If not connected, send a notification to the registered + // notification service for this client if available. + notified, err := i.notificationService.Notify( + hex.EncodeToString(nextHop), + reqPaymentHashStr, + ) + + // If this errors or the client is not notified, the client + // is offline or unknown. We'll resume the HTLC (which will + // result in UNKOWN_NEXT_PEER) + if err != nil { + return InterceptResult{ + Action: INTERCEPT_RESUME, + }, nil + } + + if notified { + log.Printf("Notified %x of pending htlc", nextHop) + d, err := time.ParseDuration(i.config.NotificationTimeout) + if err != nil { + log.Printf("WARN: No NotificationTimeout set. Using default 1m") + d = time.Minute + } + timeout := time.Now().Add(d) + err = i.client.WaitOnline(nextHop, timeout) + + // If there's an error waiting, resume the htlc. It will + // probably fail with UNKNOWN_NEXT_PEER. + if err != nil { + log.Printf( + "waiting for peer %x to come online failed with %v", + nextHop, + err, + ) + return InterceptResult{ + Action: INTERCEPT_RESUME, + }, nil + } + + log.Printf("Peer %x is back online. Continue htlc.", nextHop) + // At this point we know a few things. + // - This is either a channel partner or a registered payment + // - they were offline + // - They got notified about the htlc + // - They came back online + // So if this payment was not registered, this is a channel + // partner and we have to wait for the channel to become active + // before we can forward. + if !isRegistered { + err = i.client.WaitChannelActive(nextHop, timeout) + if err != nil { + log.Printf( + "waiting for channnel with %x to become active failed with %v", + nextHop, + err, + ) + return InterceptResult{ + Action: INTERCEPT_RESUME, + }, nil + } + } + } else if isProbe { + return InterceptResult{ + Action: INTERCEPT_FAIL_HTLC_WITH_CODE, + FailureCode: FAILURE_INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS, + }, nil + } else { + // If we haven't notified, resume the htlc. It will probably + // fail with UNKNOWN_NEXT_PEER. + return InterceptResult{ + Action: INTERCEPT_RESUME, + }, nil + } + } + } + + if !isRegistered { return InterceptResult{ Action: INTERCEPT_RESUME, }, nil } if channelPoint == nil { - if bytes.Equal(paymentHash, reqPaymentHash) { - // TODO: When opening_fee_params is enforced, turn this check in a temporary channel failure. - if params == nil { - log.Printf("DEPRECATED: Intercepted htlc with deprecated fee mechanism. Using default fees. payment hash: %s", reqPaymentHashStr) - params = &OpeningFeeParams{ - MinMsat: uint64(i.config.ChannelMinimumFeeMsat), - Proportional: uint32(i.config.ChannelFeePermyriad * 100), - ValidUntil: time.Now().UTC().Add(time.Duration(time.Hour * 24)).Format(basetypes.TIME_FORMAT), - MaxIdleTime: uint32(i.config.MaxInactiveDuration / 600), - MaxClientToSelfDelay: uint32(10000), - } - } - - if int64(reqIncomingExpiry)-int64(reqOutgoingExpiry) < int64(i.config.TimeLockDelta) { - return InterceptResult{ - Action: INTERCEPT_FAIL_HTLC_WITH_CODE, - FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE, - }, nil - } - - validUntil, err := time.Parse(basetypes.TIME_FORMAT, params.ValidUntil) - if err != nil { - log.Printf("time.Parse(%s, %s) failed. Failing channel open: %v", basetypes.TIME_FORMAT, params.ValidUntil, err) - return InterceptResult{ - Action: INTERCEPT_FAIL_HTLC_WITH_CODE, - FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE, - }, nil - } - - if time.Now().UTC().After(validUntil) { - if !i.isCurrentChainFeeCheaper(token, params) { - log.Printf("Intercepted expired payment registration. Failing payment. payment hash: %x, valid until: %s", paymentHash, params.ValidUntil) - return InterceptResult{ - Action: INTERCEPT_FAIL_HTLC_WITH_CODE, - FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE, - }, nil - } - - log.Printf("Intercepted expired payment registration. Opening channel anyway, because it's cheaper at the current rate. paymenthash: %s, params: %+v", reqPaymentHashStr, params) - } - - channelPoint, err = i.openChannel(reqPaymentHash, destination, incomingAmountMsat, tag) - if err != nil { - log.Printf("openChannel(%x, %v) err: %v", destination, incomingAmountMsat, err) - return InterceptResult{ - Action: INTERCEPT_FAIL_HTLC_WITH_CODE, - FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE, - }, nil - } - } else { //probing + if isProbe { return InterceptResult{ Action: INTERCEPT_FAIL_HTLC_WITH_CODE, FailureCode: FAILURE_INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS, }, nil } + + // TODO: When opening_fee_params is enforced, turn this check in a temporary channel failure. + if params == nil { + log.Printf("DEPRECATED: Intercepted htlc with deprecated fee mechanism. Using default fees. payment hash: %s", reqPaymentHashStr) + params = &OpeningFeeParams{ + MinMsat: uint64(i.config.ChannelMinimumFeeMsat), + Proportional: uint32(i.config.ChannelFeePermyriad * 100), + ValidUntil: time.Now().UTC().Add(time.Duration(time.Hour * 24)).Format(basetypes.TIME_FORMAT), + MaxIdleTime: uint32(i.config.MaxInactiveDuration / 600), + MaxClientToSelfDelay: uint32(10000), + } + } + + if int64(reqIncomingExpiry)-int64(reqOutgoingExpiry) < int64(i.config.TimeLockDelta) { + return InterceptResult{ + Action: INTERCEPT_FAIL_HTLC_WITH_CODE, + FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE, + }, nil + } + + validUntil, err := time.Parse(basetypes.TIME_FORMAT, params.ValidUntil) + if err != nil { + log.Printf("time.Parse(%s, %s) failed. Failing channel open: %v", basetypes.TIME_FORMAT, params.ValidUntil, err) + return InterceptResult{ + Action: INTERCEPT_FAIL_HTLC_WITH_CODE, + FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE, + }, nil + } + + if time.Now().UTC().After(validUntil) { + if !i.isCurrentChainFeeCheaper(token, params) { + log.Printf("Intercepted expired payment registration. Failing payment. payment hash: %x, valid until: %s", paymentHash, params.ValidUntil) + return InterceptResult{ + Action: INTERCEPT_FAIL_HTLC_WITH_CODE, + FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE, + }, nil + } + + log.Printf("Intercepted expired payment registration. Opening channel anyway, because it's cheaper at the current rate. paymenthash: %s, params: %+v", reqPaymentHashStr, params) + } + + channelPoint, err = i.openChannel(reqPaymentHash, destination, incomingAmountMsat, tag) + if err != nil { + log.Printf("openChannel(%x, %v) err: %v", destination, incomingAmountMsat, err) + return InterceptResult{ + Action: INTERCEPT_FAIL_HTLC_WITH_CODE, + FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE, + }, nil + } } pubKey, err := btcec.ParsePubKey(destination) diff --git a/itest/bob_offline_test.go b/itest/bob_offline_test.go index 8b214b7..9e04fda 100644 --- a/itest/bob_offline_test.go +++ b/itest/bob_offline_test.go @@ -57,7 +57,7 @@ func testFailureBobOffline(p *testParams) { log.Printf("Alice paying") route := constructRoute(p.lsp.LightningNode(), p.BreezClient().Node(), channelId, lntest.NewShortChanIDFromString("1x0x0"), outerAmountMsat) _, err := alice.PayViaRoute(outerAmountMsat, outerInvoice.paymentHash, outerInvoice.paymentSecret, route) - assert.Contains(p.t, err.Error(), "WIRE_TEMPORARY_CHANNEL_FAILURE") + assert.Contains(p.t, err.Error(), "WIRE_UNKNOWN_NEXT_PEER") log.Printf("Starting breez client again") p.BreezClient().Start() diff --git a/main.go b/main.go index 08d3446..2965738 100644 --- a/main.go +++ b/main.go @@ -79,6 +79,7 @@ func main() { interceptStore := postgresql.NewPostgresInterceptStore(pool) forwardingStore := postgresql.NewForwardingEventStore(pool) notificationsStore := postgresql.NewNotificationsStore(pool) + notificationService := notifications.NewNotificationService(notificationsStore) var interceptors []interceptor.HtlcInterceptor for _, node := range nodes { @@ -91,7 +92,7 @@ func main() { client.StartListeners() fwsync := lnd.NewForwardingHistorySync(client, interceptStore, forwardingStore) - interceptor := interceptor.NewInterceptor(client, node, interceptStore, feeEstimator, feeStrategy) + interceptor := interceptor.NewInterceptor(client, node, interceptStore, feeEstimator, feeStrategy, notificationService) htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node, client, fwsync, interceptor) if err != nil { log.Fatalf("failed to initialize LND interceptor: %v", err) @@ -104,7 +105,7 @@ func main() { log.Fatalf("failed to initialize CLN client: %v", err) } - interceptor := interceptor.NewInterceptor(client, node, interceptStore, feeEstimator, feeStrategy) + interceptor := interceptor.NewInterceptor(client, node, interceptStore, feeEstimator, feeStrategy, notificationService) htlcInterceptor, err = cln.NewClnHtlcInterceptor(node, client, interceptor) if err != nil { log.Fatalf("failed to initialize CLN interceptor: %v", err)