notifications: notify htlc when peer offline

This commit is contained in:
Jesse de Wit
2023-06-15 15:55:09 +02:00
parent ddc0195b41
commit 09e8bd3cb6
4 changed files with 173 additions and 68 deletions

View File

@@ -66,6 +66,10 @@ type NodeConfig struct {
// The channel can be closed if not used this duration in seconds.
MaxInactiveDuration uint64 `json:"maxInactiveDuration,string"`
// The maximum time to hold a htlc after sending a notification when the
// peer is offline.
NotificationTimeout string `json:"notificationTimeout,string"`
// Set this field to connect to an LND node.
Lnd *LndConfig `json:"lnd,omitempty"`

View File

@@ -13,6 +13,7 @@ import (
"github.com/breez/lspd/chain"
"github.com/breez/lspd/config"
"github.com/breez/lspd/lightning"
"github.com/breez/lspd/notifications"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
sphinx "github.com/lightningnetwork/lightning-onion"
@@ -49,12 +50,13 @@ type InterceptResult struct {
}
type Interceptor struct {
client lightning.Client
config *config.NodeConfig
store InterceptStore
feeEstimator chain.FeeEstimator
feeStrategy chain.FeeStrategy
payHashGroup singleflight.Group
client lightning.Client
config *config.NodeConfig
store InterceptStore
feeEstimator chain.FeeEstimator
feeStrategy chain.FeeStrategy
payHashGroup singleflight.Group
notificationService *notifications.NotificationService
}
func NewInterceptor(
@@ -63,13 +65,15 @@ func NewInterceptor(
store InterceptStore,
feeEstimator chain.FeeEstimator,
feeStrategy chain.FeeStrategy,
notificationService *notifications.NotificationService,
) *Interceptor {
return &Interceptor{
client: client,
config: config,
store: store,
feeEstimator: feeEstimator,
feeStrategy: feeStrategy,
client: client,
config: config,
store: store,
feeEstimator: feeEstimator,
feeStrategy: feeStrategy,
notificationService: notificationService,
}
}
@@ -85,12 +89,13 @@ func (i *Interceptor) Intercept(scid *basetypes.ShortChannelID, reqPaymentHash [
}, nil
}
nextHop, err := i.client.GetPeerId(scid)
isRegistered := paymentSecret != nil
isProbe := isRegistered && !bytes.Equal(paymentHash, reqPaymentHash)
nextHop, _ := i.client.GetPeerId(scid)
if err != nil {
log.Printf("GetPeerId(%s) error: %v", scid.ToString(), err)
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_NODE_FAILURE,
Action: INTERCEPT_RESUME,
}, nil
}
@@ -102,68 +107,163 @@ func (i *Interceptor) Intercept(scid *basetypes.ShortChannelID, reqPaymentHash [
}, nil
}
if paymentSecret == nil {
// nextHop is set if the sender's scid corresponds to a known channel
// destination is set if the payment was registered for a channel open.
// The 'actual' next hop will be either of those. Or nil if the next hop
// is unknown.
if nextHop == nil {
nextHop = destination
}
if nextHop != nil {
isConnected, err := i.client.IsConnected(nextHop)
if err != nil {
log.Printf("IsConnected(%x) error: %v", nextHop, err)
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE,
}, nil
}
if !isConnected {
// If not connected, send a notification to the registered
// notification service for this client if available.
notified, err := i.notificationService.Notify(
hex.EncodeToString(nextHop),
reqPaymentHashStr,
)
// If this errors or the client is not notified, the client
// is offline or unknown. We'll resume the HTLC (which will
// result in UNKOWN_NEXT_PEER)
if err != nil {
return InterceptResult{
Action: INTERCEPT_RESUME,
}, nil
}
if notified {
log.Printf("Notified %x of pending htlc", nextHop)
d, err := time.ParseDuration(i.config.NotificationTimeout)
if err != nil {
log.Printf("WARN: No NotificationTimeout set. Using default 1m")
d = time.Minute
}
timeout := time.Now().Add(d)
err = i.client.WaitOnline(nextHop, timeout)
// If there's an error waiting, resume the htlc. It will
// probably fail with UNKNOWN_NEXT_PEER.
if err != nil {
log.Printf(
"waiting for peer %x to come online failed with %v",
nextHop,
err,
)
return InterceptResult{
Action: INTERCEPT_RESUME,
}, nil
}
log.Printf("Peer %x is back online. Continue htlc.", nextHop)
// At this point we know a few things.
// - This is either a channel partner or a registered payment
// - they were offline
// - They got notified about the htlc
// - They came back online
// So if this payment was not registered, this is a channel
// partner and we have to wait for the channel to become active
// before we can forward.
if !isRegistered {
err = i.client.WaitChannelActive(nextHop, timeout)
if err != nil {
log.Printf(
"waiting for channnel with %x to become active failed with %v",
nextHop,
err,
)
return InterceptResult{
Action: INTERCEPT_RESUME,
}, nil
}
}
} else if isProbe {
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
}, nil
} else {
// If we haven't notified, resume the htlc. It will probably
// fail with UNKNOWN_NEXT_PEER.
return InterceptResult{
Action: INTERCEPT_RESUME,
}, nil
}
}
}
if !isRegistered {
return InterceptResult{
Action: INTERCEPT_RESUME,
}, nil
}
if channelPoint == nil {
if bytes.Equal(paymentHash, reqPaymentHash) {
// TODO: When opening_fee_params is enforced, turn this check in a temporary channel failure.
if params == nil {
log.Printf("DEPRECATED: Intercepted htlc with deprecated fee mechanism. Using default fees. payment hash: %s", reqPaymentHashStr)
params = &OpeningFeeParams{
MinMsat: uint64(i.config.ChannelMinimumFeeMsat),
Proportional: uint32(i.config.ChannelFeePermyriad * 100),
ValidUntil: time.Now().UTC().Add(time.Duration(time.Hour * 24)).Format(basetypes.TIME_FORMAT),
MaxIdleTime: uint32(i.config.MaxInactiveDuration / 600),
MaxClientToSelfDelay: uint32(10000),
}
}
if int64(reqIncomingExpiry)-int64(reqOutgoingExpiry) < int64(i.config.TimeLockDelta) {
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE,
}, nil
}
validUntil, err := time.Parse(basetypes.TIME_FORMAT, params.ValidUntil)
if err != nil {
log.Printf("time.Parse(%s, %s) failed. Failing channel open: %v", basetypes.TIME_FORMAT, params.ValidUntil, err)
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE,
}, nil
}
if time.Now().UTC().After(validUntil) {
if !i.isCurrentChainFeeCheaper(token, params) {
log.Printf("Intercepted expired payment registration. Failing payment. payment hash: %x, valid until: %s", paymentHash, params.ValidUntil)
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE,
}, nil
}
log.Printf("Intercepted expired payment registration. Opening channel anyway, because it's cheaper at the current rate. paymenthash: %s, params: %+v", reqPaymentHashStr, params)
}
channelPoint, err = i.openChannel(reqPaymentHash, destination, incomingAmountMsat, tag)
if err != nil {
log.Printf("openChannel(%x, %v) err: %v", destination, incomingAmountMsat, err)
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE,
}, nil
}
} else { //probing
if isProbe {
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
}, nil
}
// TODO: When opening_fee_params is enforced, turn this check in a temporary channel failure.
if params == nil {
log.Printf("DEPRECATED: Intercepted htlc with deprecated fee mechanism. Using default fees. payment hash: %s", reqPaymentHashStr)
params = &OpeningFeeParams{
MinMsat: uint64(i.config.ChannelMinimumFeeMsat),
Proportional: uint32(i.config.ChannelFeePermyriad * 100),
ValidUntil: time.Now().UTC().Add(time.Duration(time.Hour * 24)).Format(basetypes.TIME_FORMAT),
MaxIdleTime: uint32(i.config.MaxInactiveDuration / 600),
MaxClientToSelfDelay: uint32(10000),
}
}
if int64(reqIncomingExpiry)-int64(reqOutgoingExpiry) < int64(i.config.TimeLockDelta) {
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE,
}, nil
}
validUntil, err := time.Parse(basetypes.TIME_FORMAT, params.ValidUntil)
if err != nil {
log.Printf("time.Parse(%s, %s) failed. Failing channel open: %v", basetypes.TIME_FORMAT, params.ValidUntil, err)
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE,
}, nil
}
if time.Now().UTC().After(validUntil) {
if !i.isCurrentChainFeeCheaper(token, params) {
log.Printf("Intercepted expired payment registration. Failing payment. payment hash: %x, valid until: %s", paymentHash, params.ValidUntil)
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE,
}, nil
}
log.Printf("Intercepted expired payment registration. Opening channel anyway, because it's cheaper at the current rate. paymenthash: %s, params: %+v", reqPaymentHashStr, params)
}
channelPoint, err = i.openChannel(reqPaymentHash, destination, incomingAmountMsat, tag)
if err != nil {
log.Printf("openChannel(%x, %v) err: %v", destination, incomingAmountMsat, err)
return InterceptResult{
Action: INTERCEPT_FAIL_HTLC_WITH_CODE,
FailureCode: FAILURE_TEMPORARY_CHANNEL_FAILURE,
}, nil
}
}
pubKey, err := btcec.ParsePubKey(destination)

View File

@@ -57,7 +57,7 @@ func testFailureBobOffline(p *testParams) {
log.Printf("Alice paying")
route := constructRoute(p.lsp.LightningNode(), p.BreezClient().Node(), channelId, lntest.NewShortChanIDFromString("1x0x0"), outerAmountMsat)
_, err := alice.PayViaRoute(outerAmountMsat, outerInvoice.paymentHash, outerInvoice.paymentSecret, route)
assert.Contains(p.t, err.Error(), "WIRE_TEMPORARY_CHANNEL_FAILURE")
assert.Contains(p.t, err.Error(), "WIRE_UNKNOWN_NEXT_PEER")
log.Printf("Starting breez client again")
p.BreezClient().Start()

View File

@@ -79,6 +79,7 @@ func main() {
interceptStore := postgresql.NewPostgresInterceptStore(pool)
forwardingStore := postgresql.NewForwardingEventStore(pool)
notificationsStore := postgresql.NewNotificationsStore(pool)
notificationService := notifications.NewNotificationService(notificationsStore)
var interceptors []interceptor.HtlcInterceptor
for _, node := range nodes {
@@ -91,7 +92,7 @@ func main() {
client.StartListeners()
fwsync := lnd.NewForwardingHistorySync(client, interceptStore, forwardingStore)
interceptor := interceptor.NewInterceptor(client, node, interceptStore, feeEstimator, feeStrategy)
interceptor := interceptor.NewInterceptor(client, node, interceptStore, feeEstimator, feeStrategy, notificationService)
htlcInterceptor, err = lnd.NewLndHtlcInterceptor(node, client, fwsync, interceptor)
if err != nil {
log.Fatalf("failed to initialize LND interceptor: %v", err)
@@ -104,7 +105,7 @@ func main() {
log.Fatalf("failed to initialize CLN client: %v", err)
}
interceptor := interceptor.NewInterceptor(client, node, interceptStore, feeEstimator, feeStrategy)
interceptor := interceptor.NewInterceptor(client, node, interceptStore, feeEstimator, feeStrategy, notificationService)
htlcInterceptor, err = cln.NewClnHtlcInterceptor(node, client, interceptor)
if err != nil {
log.Fatalf("failed to initialize CLN interceptor: %v", err)