diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go
index 364a0e3a..627b1789 100644
--- a/routing/missioncontrol.go
+++ b/routing/missioncontrol.go
@@ -5,8 +5,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/coreos/bbolt"
-	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/routing/route"
 )
@@ -16,6 +14,30 @@ const (
 	// half-life duration defines after how much time a penalized node or
 	// channel is back at 50% probability.
 	DefaultPenaltyHalfLife = time.Hour
+
+	// minSecondChanceInterval is the minimum time required between
+	// second-chance failures.
+	//
+	// If nodes return a channel policy related failure, they may get a
+	// second chance to forward the payment. It could be that the channel
+	// policy that we are aware of is not up to date. This is especially
+	// important in case of mobile apps that are mostly offline.
+	//
+	// However, we don't want to give nodes the option to endlessly return
+	// new channel updates so that we are kept busy trying to route through
+	// that node until the payment loop times out.
+	//
+	// Therefore we only grant a second chance to a node if the previous
+	// second chance is sufficiently long ago. This is what
+	// minSecondChanceInterval defines. If a second policy failure comes in
+	// within that interval, we will apply a penalty.
+	//
+	// Second chances granted are tracked on the level of node pairs. This
+	// means that if a node has multiple channels to the same peer, they
+	// will only get a single second chance to route to that peer again.
+	// Nodes forward non-strict, so it isn't necessary to apply a less
+	// restrictive channel level tracking scheme here.
+	minSecondChanceInterval = time.Minute
 )
 
 // MissionControl contains state which summarizes the past attempts of HTLC
@@ -30,6 +52,10 @@ type MissionControl struct {
 	history map[route.Vertex]*nodeHistory
 
+	// lastSecondChance tracks the last time a second chance was granted for
+	// a directed node pair.
+	lastSecondChance map[DirectedNodePair]time.Time
+
 	// now is expected to return the current time. It is supplied as an
 	// external function to enable deterministic unit tests.
 	now func() time.Time
 
@@ -127,13 +153,13 @@ func NewMissionControl(cfg *MissionControlConfig) *MissionControl {
 		cfg.PenaltyHalfLife, cfg.AprioriHopProbability)
 
 	return &MissionControl{
-		history: make(map[route.Vertex]*nodeHistory),
-		now:     time.Now,
-		cfg:     cfg,
+		history:          make(map[route.Vertex]*nodeHistory),
+		lastSecondChance: make(map[DirectedNodePair]time.Time),
+		now:              time.Now,
+		cfg:              cfg,
 	}
 }
 
-
 // ResetHistory resets the history of MissionControl returning it to a state as
 // if no payment attempts have been made.
 func (m *MissionControl) ResetHistory() {
@@ -141,6 +167,7 @@ func (m *MissionControl) ResetHistory() {
 	defer m.Unlock()
 
 	m.history = make(map[route.Vertex]*nodeHistory)
+	m.lastSecondChance = make(map[DirectedNodePair]time.Time)
 
 	log.Debugf("Mission control history cleared")
 }
@@ -209,6 +236,37 @@ func (m *MissionControl) getEdgeProbabilityForNode(nodeHistory *nodeHistory,
 	return probability
 }
 
+// requestSecondChance checks whether the node fromNode can have a second chance
+// at providing a channel update for its channel with toNode.
+func (m *MissionControl) requestSecondChance(timestamp time.Time,
+	fromNode, toNode route.Vertex) bool {
+
+	// Look up previous second chance time.
+	pair := DirectedNodePair{
+		From: fromNode,
+		To:   toNode,
+	}
+	lastSecondChance, ok := m.lastSecondChance[pair]
+
+	// If the channel hasn't already been given a second chance or its last
+	// second chance was long ago, we give it another chance.
+	if !ok || timestamp.Sub(lastSecondChance) > minSecondChanceInterval {
+		m.lastSecondChance[pair] = timestamp
+
+		log.Debugf("Second chance granted for %v->%v", fromNode, toNode)
+
+		return true
+	}
+
+	// Otherwise penalize the channel, because we don't allow channel
+	// updates that are that frequent. This is to prevent nodes from keeping
+	// us busy by continuously sending new channel updates.
+	log.Debugf("Second chance denied for %v->%v, remaining interval: %v",
+		fromNode, toNode, timestamp.Sub(lastSecondChance))
+
+	return false
+}
+
 // createHistoryIfNotExists returns the history for the given node. If the node
 // is yet unknown, it will create an empty history structure.
 func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory {
@@ -237,6 +295,26 @@ func (m *MissionControl) ReportVertexFailure(v route.Vertex) {
 	history.lastFail = &now
 }
 
+// ReportEdgePolicyFailure reports a policy related failure.
+func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) {
+	now := m.now()
+
+	m.Lock()
+	defer m.Unlock()
+
+	// We may have an out of date graph. Therefore we don't always penalize
+	// immediately. If some time has passed since the last policy failure,
+	// we grant the node a second chance at forwarding the payment.
+	if m.requestSecondChance(
+		now, failedEdge.from, failedEdge.to,
+	) {
+		return
+	}
+
+	history := m.createHistoryIfNotExists(failedEdge.from)
+	history.lastFail = &now
+}
+
 // ReportEdgeFailure reports a channel level failure.
 //
 // TODO(roasbeef): also add value attempted to send and capacity of channel
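To make the second-chance policy above concrete: a minimal sketch, not part of the diff (the helper name grantSecondChance and the timeline are illustrative only), of the decision requestSecondChance makes for one directed node pair. Only a grant updates the stored timestamp, so a denial keeps measuring against the previous grant.

// grantSecondChance mirrors the decision in requestSecondChance, assuming
// the minSecondChanceInterval constant introduced above (one minute).
//
//	t=0s   policy failure -> no earlier grant             -> second chance granted
//	t=30s  policy failure -> 30s since grant at t=0       -> penalty applied
//	t=90s  policy failure -> 90s since grant at t=0 (>1m) -> second chance granted
func grantSecondChance(lastGrant time.Time, hasGrant bool, now time.Time) bool {
	return !hasGrant || now.Sub(lastGrant) > minSecondChanceInterval
}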
diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go
index f6f658b8..237f81df 100644
--- a/routing/missioncontrol_test.go
+++ b/routing/missioncontrol_test.go
@@ -106,3 +106,25 @@ func TestMissionControl(t *testing.T) {
 		t.Fatal("unexpected number of channels")
 	}
 }
+
+// TestMissionControlChannelUpdate tests that the first channel update is not
+// penalizing the channel yet.
+func TestMissionControlChannelUpdate(t *testing.T) {
+	ctx := createMcTestContext(t)
+
+	testEdge := edge{
+		channel: 123,
+	}
+
+	// Report a policy related failure. Because it is the first, we don't
+	// expect a penalty.
+	ctx.mc.ReportEdgePolicyFailure(testEdge)
+
+	ctx.expectP(0, 0.8)
+
+	// Report another failure for the same channel. We expect it to be
+	// pruned.
+	ctx.mc.ReportEdgePolicyFailure(testEdge)
+
+	ctx.expectP(0, 0)
+}
diff --git a/routing/mock_test.go b/routing/mock_test.go
index cf0a3f3b..4c4f97eb 100644
--- a/routing/mock_test.go
+++ b/routing/mock_test.go
@@ -102,6 +102,8 @@ func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge,
 	minPenalizeAmt lnwire.MilliSatoshi) {
 }
 
+func (m *mockMissionControl) ReportEdgePolicyFailure(failedEdge edge) {}
+
 func (m *mockMissionControl) ReportVertexFailure(v route.Vertex) {}
 
 func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator,
diff --git a/routing/nodepair.go b/routing/nodepair.go
new file mode 100644
index 00000000..edec8e02
--- /dev/null
+++ b/routing/nodepair.go
@@ -0,0 +1,10 @@
+package routing
+
+import (
+	"github.com/lightningnetwork/lnd/routing/route"
+)
+
+// DirectedNodePair stores a directed pair of nodes.
+type DirectedNodePair struct {
+	From, To route.Vertex
+}
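A note on the new type: route.Vertex is a fixed-size byte array, so DirectedNodePair is comparable and can be used directly as the key of the lastSecondChance map, with direction preserved. A small sketch, not part of the diff, with hypothetical vertices a and b:

a, b := route.Vertex{1}, route.Vertex{2}

lastSecondChance := make(map[DirectedNodePair]time.Time)
lastSecondChance[DirectedNodePair{From: a, To: b}] = time.Now()

// The reverse direction is a distinct key, so no grant is recorded for it.
_, ok := lastSecondChance[DirectedNodePair{From: b, To: a}]
// ok == false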
diff --git a/routing/payment_session.go b/routing/payment_session.go
index 4a2c0a5b..f138ae31 100644
--- a/routing/payment_session.go
+++ b/routing/payment_session.go
@@ -52,12 +52,6 @@ type paymentSession struct {
 	bandwidthHints map[uint64]lnwire.MilliSatoshi
 
-	// errFailedFeeChans is a map of the short channel IDs that were the
-	// source of policy related routing failures during this payment attempt.
-	// We'll use this map to prune out channels when the first error may not
-	// require pruning, but any subsequent ones do.
-	errFailedPolicyChans map[nodeChannel]struct{}
-
 	sessionSource *SessionSource
 
 	preBuiltRoute *route.Route
 
@@ -109,25 +103,7 @@ func (p *paymentSession) ReportEdgeFailure(failedEdge edge,
 //
 // TODO(joostjager): Move this logic into global mission control.
 func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) {
-	key := nodeChannel{
-		node:    failedEdge.from,
-		channel: failedEdge.channel,
-	}
-
-	// Check to see if we've already reported a policy related failure for
-	// this channel. If so, then we'll prune out the vertex.
-	_, ok := p.errFailedPolicyChans[key]
-	if ok {
-		// TODO(joostjager): is this aggressive pruning still necessary?
-		// Just pruning edges may also work unless there is a huge
-		// number of failing channels from that node?
-		p.ReportVertexFailure(key.node)
-
-		return
-	}
-
-	// Finally, we'll record a policy failure from this node and move on.
-	p.errFailedPolicyChans[key] = struct{}{}
+	p.sessionSource.MissionControl.ReportEdgePolicyFailure(failedEdge)
 }
 
 // RequestRoute returns a route which is likely to be capable for successfully
diff --git a/routing/payment_session_source.go b/routing/payment_session_source.go
index 69738026..77dad51e 100644
--- a/routing/payment_session_source.go
+++ b/routing/payment_session_source.go
@@ -119,11 +119,10 @@ func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint,
 	}
 
 	return &paymentSession{
-		additionalEdges:      edges,
-		bandwidthHints:       bandwidthHints,
-		errFailedPolicyChans: make(map[nodeChannel]struct{}),
-		sessionSource:        m,
-		pathFinder:           findPath,
+		additionalEdges: edges,
+		bandwidthHints:  bandwidthHints,
+		sessionSource:   m,
+		pathFinder:      findPath,
 	}, nil
 }
 
@@ -131,9 +130,8 @@ func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint,
 // used for failure reporting to missioncontrol.
 func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession {
 	return &paymentSession{
-		errFailedPolicyChans: make(map[nodeChannel]struct{}),
-		sessionSource:        m,
-		preBuiltRoute:        preBuiltRoute,
+		sessionSource: m,
+		preBuiltRoute: preBuiltRoute,
 	}
 }
 
@@ -142,9 +140,8 @@ func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) Pa
 // missioncontrol for resumed payment we don't want to make more attempts for.
 func (m *SessionSource) NewPaymentSessionEmpty() PaymentSession {
 	return &paymentSession{
-		errFailedPolicyChans: make(map[nodeChannel]struct{}),
-		sessionSource:        m,
-		preBuiltRoute:        &route.Route{},
-		preBuiltRouteTried:   true,
+		sessionSource:      m,
+		preBuiltRoute:      &route.Route{},
+		preBuiltRouteTried: true,
 	}
 }
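The payment session changes above mean that policy-failure state is no longer scoped to a single payment attempt but lives in the shared MissionControl. A rough sketch, not part of the diff and with hypothetical sessionSource and failedEdge values (NewPaymentSessionEmpty is used here only for brevity):

// Both sessions report into the same MissionControl, so the second policy
// failure for the same node pair is penalized even though it arrives
// through a different payment session.
s1 := sessionSource.NewPaymentSessionEmpty()
s1.ReportEdgePolicyFailure(failedEdge) // first failure: second chance, no penalty

s2 := sessionSource.NewPaymentSessionEmpty()
s2.ReportEdgePolicyFailure(failedEdge) // within a minute: penalty recorded globally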
diff --git a/routing/router.go b/routing/router.go
index d84d5875..0fd1caf9 100644
--- a/routing/router.go
+++ b/routing/router.go
@@ -178,6 +181,9 @@ type MissionController interface {
 	ReportEdgeFailure(failedEdge edge,
 		minPenalizeAmt lnwire.MilliSatoshi)
 
+	// ReportEdgePolicyFailure reports a policy related failure.
+	ReportEdgePolicyFailure(failedEdge edge)
+
 	// ReportVertexFailure reports a node level failure.
 	ReportVertexFailure(v route.Vertex)
 
@@ -1826,6 +1829,47 @@ func (r *ChannelRouter) sendPayment(
 }
 
+// tryApplyChannelUpdate tries to apply a channel update present in the failure
+// message if any.
+func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route,
+	errorSourceIdx int, failure lnwire.FailureMessage) error {
+
+	// It makes no sense to apply our own channel updates.
+	if errorSourceIdx == 0 {
+		log.Errorf("Channel update of ourselves received")
+
+		return nil
+	}
+
+	// Extract channel update if the error contains one.
+	update := r.extractChannelUpdate(failure)
+	if update == nil {
+		return nil
+	}
+
+	// Parse pubkey to allow validation of the channel update. This should
+	// always succeed, otherwise there is something wrong in our
+	// implementation. Therefore return an error.
+	errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes
+	errSource, err := btcec.ParsePubKey(
+		errVertex[:], btcec.S256(),
+	)
+	if err != nil {
+		log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v",
+			errorSourceIdx, errVertex)
+
+		return err
+	}
+
+	// Apply channel update.
+	if !r.applyChannelUpdate(update, errSource) {
+		log.Debugf("Invalid channel update received: node=%x",
+			errVertex)
+	}
+
+	return nil
+}
+
 // processSendError analyzes the error for the payment attempt received from the
 // switch and updates mission control and/or channel policies. Depending on the
 // error type, this error is either the final outcome of the payment or we need
@@ -1851,32 +1895,28 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 		return true, channeldb.FailureReasonError
 	}
 
-	var (
-		failureSourceIdx = fErr.FailureSourceIdx
+	failureMessage := fErr.FailureMessage
+	failureSourceIdx := fErr.FailureSourceIdx
 
-		failureVertex route.Vertex
-		failureSource *btcec.PublicKey
-		err           error
-	)
+	// Apply channel update if the error contains one. For unknown
+	// failures, failureMessage is nil.
+	if failureMessage != nil {
+		err := r.tryApplyChannelUpdate(
+			rt, failureSourceIdx, failureMessage,
+		)
+		if err != nil {
+			return true, channeldb.FailureReasonError
+		}
+	}
+
+	var failureVertex route.Vertex
 
 	// For any non-self failure, look up the source pub key in the hops
 	// slice. Otherwise return the self node pubkey.
 	if failureSourceIdx > 0 {
 		failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes
-		failureSource, err = btcec.ParsePubKey(failureVertex[:], btcec.S256())
-		if err != nil {
-			log.Errorf("Cannot parse pubkey %v: %v",
-				failureVertex, err)
-
-			return true, channeldb.FailureReasonError
-		}
 	} else {
 		failureVertex = r.selfNode.PubKeyBytes
-		failureSource, err = r.selfNode.PubKey()
-		if err != nil {
-			log.Errorf("Cannot parse self pubkey: %v", err)
-			return true, channeldb.FailureReasonError
-		}
 	}
 
 	log.Tracef("Node %x (index %v) reported failure when sending htlc", failureVertex, failureSourceIdx)
@@ -1885,41 +1925,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 	// update with id may not be available.
 	failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx)
 
-	// processChannelUpdateAndRetry is a closure that
-	// handles a failure message containing a channel
-	// update. This function always tries to apply the
-	// channel update and passes on the result to the
-	// payment session to adjust its view on the reliability
-	// of the network.
-	//
-	// As channel id, the locally determined channel id is
-	// used. It does not rely on the channel id that is part
-	// of the channel update message, because the remote
-	// node may lie to us or the update may be corrupt.
-	processChannelUpdateAndRetry := func(
-		update *lnwire.ChannelUpdate,
-		pubKey *btcec.PublicKey) {
-
-		// Try to apply the channel update.
-		updateOk := r.applyChannelUpdate(update, pubKey)
-
-		// If the update could not be applied, prune the
-		// edge. There is no reason to continue trying
-		// this channel.
-		//
-		// TODO: Could even prune the node completely?
-		// Or is there a valid reason for the channel
-		// update to fail?
-		if !updateOk {
-			paySession.ReportEdgeFailure(
-				failedEdge, 0,
-			)
-		}
-
-		paySession.ReportEdgePolicyFailure(failedEdge)
-	}
-
-	switch onionErr := fErr.FailureMessage.(type) {
+	switch fErr.FailureMessage.(type) {
 
 	// If the end destination didn't know the payment
 	// hash or we sent the wrong payment amount to the
@@ -1975,7 +1981,6 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 	// that sent us this error, as it doesn't now what the
 	// correct block height is.
 	case *lnwire.FailExpiryTooSoon:
-		r.applyChannelUpdate(&onionErr.Update, failureSource)
 		paySession.ReportVertexFailure(failureVertex)
 		return false, 0
 
@@ -1996,34 +2001,27 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 	// amount, we'll apply the new minimum amount and retry
 	// routing.
 	case *lnwire.FailAmountBelowMinimum:
-		processChannelUpdateAndRetry(
-			&onionErr.Update, failureSource,
-		)
+		paySession.ReportEdgePolicyFailure(failedEdge)
 		return false, 0
 
 	// If we get a failure due to a fee, we'll apply the
 	// new fee update, and retry our attempt using the
 	// newly updated fees.
 	case *lnwire.FailFeeInsufficient:
-		processChannelUpdateAndRetry(
-			&onionErr.Update, failureSource,
-		)
+		paySession.ReportEdgePolicyFailure(failedEdge)
 		return false, 0
 
 	// If we get the failure for an intermediate node that
 	// disagrees with our time lock values, then we'll
 	// apply the new delta value and try it once more.
 	case *lnwire.FailIncorrectCltvExpiry:
-		processChannelUpdateAndRetry(
-			&onionErr.Update, failureSource,
-		)
+		paySession.ReportEdgePolicyFailure(failedEdge)
 		return false, 0
 
 	// The outgoing channel that this node was meant to
 	// forward one is currently disabled, so we'll apply
 	// the update and continue.
 	case *lnwire.FailChannelDisabled:
-		r.applyChannelUpdate(&onionErr.Update, failureSource)
 		paySession.ReportEdgeFailure(failedEdge, 0)
 		return false, 0
 
@@ -2031,7 +2029,6 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 	// sufficient capacity, so we'll prune this edge for
 	// now, and continue onwards with our path finding.
 	case *lnwire.FailTemporaryChannelFailure:
-		r.applyChannelUpdate(onionErr.Update, failureSource)
 		paySession.ReportEdgeFailure(failedEdge, failedAmt)
 		return false, 0
 
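For reference, a summary of the cases above (not part of the diff): once tryApplyChannelUpdate has applied any embedded channel update centrally, the update-carrying failures are reported to mission control as follows.

// FailExpiryTooSoon           -> paySession.ReportVertexFailure(failureVertex)
// FailAmountBelowMinimum      -> paySession.ReportEdgePolicyFailure(failedEdge)
// FailFeeInsufficient         -> paySession.ReportEdgePolicyFailure(failedEdge)
// FailIncorrectCltvExpiry     -> paySession.ReportEdgePolicyFailure(failedEdge)
// FailChannelDisabled         -> paySession.ReportEdgeFailure(failedEdge, 0)
// FailTemporaryChannelFailure -> paySession.ReportEdgeFailure(failedEdge, failedAmt)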
@@ -2103,6 +2100,29 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 	}
 }
 
+// extractChannelUpdate examines the error and extracts the channel update.
+func (r *ChannelRouter) extractChannelUpdate(
+	failure lnwire.FailureMessage) *lnwire.ChannelUpdate {
+
+	var update *lnwire.ChannelUpdate
+	switch onionErr := failure.(type) {
+	case *lnwire.FailExpiryTooSoon:
+		update = &onionErr.Update
+	case *lnwire.FailAmountBelowMinimum:
+		update = &onionErr.Update
+	case *lnwire.FailFeeInsufficient:
+		update = &onionErr.Update
+	case *lnwire.FailIncorrectCltvExpiry:
+		update = &onionErr.Update
+	case *lnwire.FailChannelDisabled:
+		update = &onionErr.Update
+	case *lnwire.FailTemporaryChannelFailure:
+		update = onionErr.Update
+	}
+
+	return update
+}
+
 // getFailedEdge tries to locate the failing channel given a route and the
 // pubkey of the node that sent the failure. It will assume that the failure is
 // associated with the outgoing channel of the failing node. As a second result,
@@ -2147,11 +2167,6 @@ func getFailedEdge(route *route.Route, failureSource int) (edge,
 // database. It returns a bool indicating whether the updates was successful.
 func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate,
 	pubKey *btcec.PublicKey) bool {
-	// If we get passed a nil channel update (as it's optional with some
-	// onion errors), then we'll exit early with a success result.
-	if msg == nil {
-		return true
-	}
 
 	ch, _, _, err := r.GetChannelByID(msg.ShortChannelID)
 	if err != nil {
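Beyond TestMissionControlChannelUpdate above, the interval expiry itself could be exercised directly against MissionControl through its injectable clock. A sketch only; the config values, timestamps and assertions are assumptions and not part of the change:

func TestSecondChanceIntervalSketch(t *testing.T) {
	mc := NewMissionControl(&MissionControlConfig{
		PenaltyHalfLife:       time.Hour,
		AprioriHopProbability: 0.8,
	})

	// Freeze time via the clock hook that exists for deterministic tests.
	testTime := time.Date(2019, 7, 1, 0, 0, 0, 0, time.UTC)
	mc.now = func() time.Time { return testTime }

	testEdge := edge{channel: 123}

	// First policy failure: a second chance is granted, so no node
	// history entry is created.
	mc.ReportEdgePolicyFailure(testEdge)
	if len(mc.history) != 0 {
		t.Fatal("expected no penalty after first policy failure")
	}

	// A repeat within minSecondChanceInterval is penalized.
	mc.ReportEdgePolicyFailure(testEdge)
	if len(mc.history) != 1 {
		t.Fatal("expected penalty after repeated policy failure")
	}

	// Once the interval has passed, a fresh second chance is granted
	// without any additional penalty.
	testTime = testTime.Add(2 * minSecondChanceInterval)
	mc.ReportEdgePolicyFailure(testEdge)
}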