mirror of
https://github.com/aljazceru/breez-lnd.git
synced 2025-12-18 06:34:27 +01:00
Merge pull request #1687 from halseth/disable-inactive-channels
Send ChannelUpdate with Disabled bit if channel inactive for 20 minutes
This commit is contained in:
236
server.go
236
server.go
@@ -166,6 +166,11 @@ type server struct {
|
||||
// changed since last start.
|
||||
currentNodeAnn *lnwire.NodeAnnouncement
|
||||
|
||||
// sendDisabled is used to keep track of the disabled flag of the last
|
||||
// sent ChannelUpdate from announceChanStatus.
|
||||
sentDisabled map[wire.OutPoint]bool
|
||||
sentDisabledMtx sync.Mutex
|
||||
|
||||
quit chan struct{}
|
||||
|
||||
wg sync.WaitGroup
|
||||
@@ -275,6 +280,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
|
||||
inboundPeers: make(map[string]*peer),
|
||||
outboundPeers: make(map[string]*peer),
|
||||
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
|
||||
sentDisabled: make(map[wire.OutPoint]bool),
|
||||
|
||||
globalFeatures: lnwire.NewFeatureVector(globalFeatures,
|
||||
lnwire.GlobalFeatures),
|
||||
@@ -676,7 +682,9 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
|
||||
return ErrServerShuttingDown
|
||||
}
|
||||
},
|
||||
DisableChannel: s.disableChannel,
|
||||
DisableChannel: func(op wire.OutPoint) error {
|
||||
return s.announceChanStatus(op, true)
|
||||
},
|
||||
}, chanDB)
|
||||
|
||||
s.breachArbiter = newBreachArbiter(&BreachConfig{
|
||||
@@ -985,6 +993,11 @@ func (s *server) Start() error {
|
||||
srvrLog.Infof("Auto peer bootstrapping is disabled")
|
||||
}
|
||||
|
||||
// Start a goroutine that will periodically send out ChannelUpdates
|
||||
// based on a channel's status.
|
||||
s.wg.Add(1)
|
||||
go s.watchChannelStatus()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2855,10 +2868,23 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error)
|
||||
return node.Addresses[0], nil
|
||||
}
|
||||
|
||||
// disableChannel disables a channel, resulting in it not being able to forward
|
||||
// payments. This is done by sending a new channel update across the network
|
||||
// with the disabled flag set.
|
||||
func (s *server) disableChannel(op wire.OutPoint) error {
|
||||
// announceChanStatus disables a channel if disabled=true, otherwise activates
|
||||
// it. This is done by sending a new channel update across the network with the
|
||||
// disabled flag set accordingly. The result of disabling the channel is it not
|
||||
// being able to forward payments.
|
||||
func (s *server) announceChanStatus(op wire.OutPoint, disabled bool) error {
|
||||
s.sentDisabledMtx.Lock()
|
||||
defer s.sentDisabledMtx.Unlock()
|
||||
|
||||
// If we have already sent out an update reflecting the current status,
|
||||
// skip this channel.
|
||||
alreadyDisabled, ok := s.sentDisabled[op]
|
||||
if ok && alreadyDisabled == disabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
srvrLog.Debugf("Announcing channel(%v) disabled=%v", op, disabled)
|
||||
|
||||
// Retrieve the latest update for this channel. We'll use this
|
||||
// as our starting point to send the new update.
|
||||
chanUpdate, err := s.fetchLastChanUpdateByOutPoint(op)
|
||||
@@ -2866,12 +2892,22 @@ func (s *server) disableChannel(op wire.OutPoint) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the bit responsible for marking a channel as disabled.
|
||||
chanUpdate.Flags |= lnwire.ChanUpdateDisabled
|
||||
if disabled {
|
||||
// Set the bit responsible for marking a channel as disabled.
|
||||
chanUpdate.Flags |= lnwire.ChanUpdateDisabled
|
||||
} else {
|
||||
// Clear the bit responsible for marking a channel as disabled.
|
||||
chanUpdate.Flags &= ^lnwire.ChanUpdateDisabled
|
||||
}
|
||||
|
||||
// We must now update the message's timestamp and generate a new
|
||||
// signature.
|
||||
chanUpdate.Timestamp = uint32(time.Now().Unix())
|
||||
newTimestamp := uint32(time.Now().Unix())
|
||||
if newTimestamp <= chanUpdate.Timestamp {
|
||||
// Timestamp must increase for message to propagate.
|
||||
newTimestamp = chanUpdate.Timestamp + 1
|
||||
}
|
||||
chanUpdate.Timestamp = newTimestamp
|
||||
|
||||
chanUpdateMsg, err := chanUpdate.DataToSign()
|
||||
if err != nil {
|
||||
@@ -2889,35 +2925,59 @@ func (s *server) disableChannel(op wire.OutPoint) error {
|
||||
}
|
||||
|
||||
// Once signed, we'll send the new update to all of our peers.
|
||||
return s.applyChannelUpdate(chanUpdate)
|
||||
if err := s.applyChannelUpdate(chanUpdate); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We'll keep track of the status set in the last update we sent, to
|
||||
// avoid sending updates if nothing has changed.
|
||||
s.sentDisabled[op] = disabled
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// fetchLastChanUpdateByOutPoint fetches the latest update for a channel from
|
||||
// our point of view.
|
||||
func (s *server) fetchLastChanUpdateByOutPoint(op wire.OutPoint) (*lnwire.ChannelUpdate, error) {
|
||||
// fetchLastChanUpdateByOutPoint fetches the latest policy for our direction of
|
||||
// a channel, and crafts a new ChannelUpdate with this policy. Returns an error
|
||||
// in case our ChannelEdgePolicy is not found in the database.
|
||||
func (s *server) fetchLastChanUpdateByOutPoint(op wire.OutPoint) (
|
||||
*lnwire.ChannelUpdate, error) {
|
||||
|
||||
// Get the edge info and policies for this channel from the graph.
|
||||
graph := s.chanDB.ChannelGraph()
|
||||
info, edge1, edge2, err := graph.FetchChannelEdgesByOutpoint(&op)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if edge1 == nil || edge2 == nil {
|
||||
return nil, fmt.Errorf("unable to find channel(%v)", op)
|
||||
// Helper function to extract the owner of the given policy.
|
||||
owner := func(edge *channeldb.ChannelEdgePolicy) []byte {
|
||||
var pubKey *btcec.PublicKey
|
||||
switch {
|
||||
case edge.Flags&lnwire.ChanUpdateDirection == 0:
|
||||
pubKey, _ = info.NodeKey1()
|
||||
case edge.Flags&lnwire.ChanUpdateDirection == 1:
|
||||
pubKey, _ = info.NodeKey2()
|
||||
}
|
||||
|
||||
// If pubKey was not found, just return nil.
|
||||
if pubKey == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return pubKey.SerializeCompressed()
|
||||
}
|
||||
|
||||
// If we're the outgoing node on the first edge, then that
|
||||
// means the second edge is our policy. Otherwise, the first
|
||||
// edge is our policy.
|
||||
var local *channeldb.ChannelEdgePolicy
|
||||
|
||||
// Extract the channel update from the policy we own, if any.
|
||||
ourPubKey := s.identityPriv.PubKey().SerializeCompressed()
|
||||
if bytes.Equal(edge1.Node.PubKeyBytes[:], ourPubKey) {
|
||||
local = edge2
|
||||
} else {
|
||||
local = edge1
|
||||
if edge1 != nil && bytes.Equal(ourPubKey, owner(edge1)) {
|
||||
return extractChannelUpdate(info, edge1)
|
||||
}
|
||||
|
||||
return extractChannelUpdate(info, local)
|
||||
if edge2 != nil && bytes.Equal(ourPubKey, owner(edge2)) {
|
||||
return extractChannelUpdate(info, edge2)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("unable to find channel(%v)", op)
|
||||
}
|
||||
|
||||
// extractChannelUpdate retrieves a lnwire.ChannelUpdate message from an edge's
|
||||
@@ -2948,22 +3008,6 @@ func extractChannelUpdate(info *channeldb.ChannelEdgeInfo,
|
||||
// applyChannelUpdate applies the channel update to the different sub-systems of
|
||||
// the server.
|
||||
func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error {
|
||||
newChannelPolicy := &channeldb.ChannelEdgePolicy{
|
||||
SigBytes: update.Signature.ToSignatureBytes(),
|
||||
ChannelID: update.ShortChannelID.ToUint64(),
|
||||
LastUpdate: time.Unix(int64(update.Timestamp), 0),
|
||||
Flags: update.Flags,
|
||||
TimeLockDelta: update.TimeLockDelta,
|
||||
MinHTLC: update.HtlcMinimumMsat,
|
||||
FeeBaseMSat: lnwire.MilliSatoshi(update.BaseFee),
|
||||
FeeProportionalMillionths: lnwire.MilliSatoshi(update.FeeRate),
|
||||
}
|
||||
|
||||
err := s.chanRouter.UpdateEdge(newChannelPolicy)
|
||||
if err != nil && !routing.IsError(err, routing.ErrIgnored) {
|
||||
return err
|
||||
}
|
||||
|
||||
pubKey := s.identityPriv.PubKey()
|
||||
errChan := s.authGossiper.ProcessLocalAnnouncement(update, pubKey)
|
||||
select {
|
||||
@@ -2973,3 +3017,115 @@ func (s *server) applyChannelUpdate(update *lnwire.ChannelUpdate) error {
|
||||
return ErrServerShuttingDown
|
||||
}
|
||||
}
|
||||
|
||||
// watchChannelStatus periodically queries the Switch for the status of the
|
||||
// open channels, and sends out ChannelUpdates to the network indicating their
|
||||
// active status. Currently we'll send out either a Disabled or Active update
|
||||
// if the channel has been in the same status over a given amount of time.
|
||||
//
|
||||
// NOTE: This MUST be run as a goroutine.
|
||||
func (s *server) watchChannelStatus() {
|
||||
defer s.wg.Done()
|
||||
|
||||
// A map with values activeStatus is used to keep track of the first
|
||||
// time we saw a link changing to the current active status.
|
||||
type activeStatus struct {
|
||||
active bool
|
||||
time time.Time
|
||||
}
|
||||
status := make(map[wire.OutPoint]activeStatus)
|
||||
|
||||
// We'll check in on the channel statuses every 1/4 of the timeout.
|
||||
unchangedTimeout := cfg.InactiveChanTimeout
|
||||
tickerTimeout := unchangedTimeout / 4
|
||||
|
||||
if unchangedTimeout == 0 || tickerTimeout == 0 {
|
||||
srvrLog.Debugf("Won't watch channel statuses")
|
||||
return
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(tickerTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
channels, err := s.chanDB.FetchAllOpenChannels()
|
||||
if err != nil {
|
||||
srvrLog.Errorf("Unable to fetch open "+
|
||||
"channels: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// For each open channel, update the status. We'll copy
|
||||
// the updated statuses to a new map, to avoid keeping
|
||||
// the status of closed channels around.
|
||||
newStatus := make(map[wire.OutPoint]activeStatus)
|
||||
for _, c := range channels {
|
||||
chanID := lnwire.NewChanIDFromOutPoint(
|
||||
&c.FundingOutpoint)
|
||||
|
||||
// Get the current active stauts from the
|
||||
// Switch.
|
||||
active := s.htlcSwitch.HasActiveLink(chanID)
|
||||
|
||||
var currentStatus activeStatus
|
||||
|
||||
// If this link is not in the map, or the
|
||||
// status has changed, set an updated active
|
||||
// status.
|
||||
st, ok := status[c.FundingOutpoint]
|
||||
if !ok || st.active != active {
|
||||
currentStatus = activeStatus{
|
||||
active: active,
|
||||
time: time.Now(),
|
||||
}
|
||||
} else {
|
||||
// The status is unchanged, we'll keep
|
||||
// it as is.
|
||||
currentStatus = st
|
||||
}
|
||||
|
||||
newStatus[c.FundingOutpoint] = currentStatus
|
||||
}
|
||||
|
||||
// Set the status map to the map of new statuses.
|
||||
status = newStatus
|
||||
|
||||
// If no change in status has happened during the last
|
||||
// interval, we'll send out an update. Note that we add
|
||||
// the negative of the timeout to set our limit in the
|
||||
// past.
|
||||
limit := time.Now().Add(-unchangedTimeout)
|
||||
|
||||
// We'll send out an update for all channels that have
|
||||
// had their status unchanged for longer than the limit.
|
||||
// NOTE: We also make sure to activate any channel when
|
||||
// we connect to a peer, to make them available for
|
||||
// path finding immediately.
|
||||
for op, st := range status {
|
||||
disable := !st.active
|
||||
|
||||
if st.time.Before(limit) {
|
||||
// Before we attempt to announce the
|
||||
// status of the channel, we remove it
|
||||
// from the status map such that it
|
||||
// will need a full unchaged interval
|
||||
// before we attempt to announce its
|
||||
// status again.
|
||||
delete(status, op)
|
||||
|
||||
err = s.announceChanStatus(op, disable)
|
||||
if err != nil {
|
||||
srvrLog.Errorf("Unable to "+
|
||||
"disable channel: %v",
|
||||
err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case <-s.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user