From aa0410c90acbcb2091de9de12e50b6acfcf7e2b9 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 24 Feb 2018 19:34:03 -0800 Subject: [PATCH 1/2] routing: add new methods to check the freshness of an edge/node In this commit, we add a set of new methods to check the freshness of an edge/node. This will allow callers to skip expensive validation in the case that the router already knows of an item, or knows of a fresher version of that time. A set of tests have been added to ensure basic correctness of these new methods. --- routing/router.go | 148 ++++++++++++++++++++----- routing/router_test.go | 239 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 359 insertions(+), 28 deletions(-) diff --git a/routing/router.go b/routing/router.go index 37df8c04..4eee0846 100644 --- a/routing/router.go +++ b/routing/router.go @@ -33,8 +33,8 @@ const ( DefaultFinalCLTVDelta = 9 ) -// ChannelGraphSource represents the source of information about the topology of -// the lightning network. It's responsible for the addition of nodes, edges, +// ChannelGraphSource represents the source of information about the topology +// of the lightning network. It's responsible for the addition of nodes, edges, // applying edge updates, and returning the current block height with which the // topology is synchronized. type ChannelGraphSource interface { @@ -56,6 +56,22 @@ type ChannelGraphSource interface { // edge considered as not fully constructed. UpdateEdge(policy *channeldb.ChannelEdgePolicy) error + // IsStaleNode returns true if the graph source has a node announcement + // for the target node with a more recent timestamp. This method will + // also return true if we don't have an active channel announcement for + // the target node. + IsStaleNode(node Vertex, timestamp time.Time) bool + + // IsKnownEdge returns true if the graph source already knows of the + // passed channel ID. + IsKnownEdge(chanID lnwire.ShortChannelID) bool + + // IsStaleEdgePolicy returns true if the graph source has a channel + // edge for the passed channel ID (and flags) that have a more recent + // timestamp. + IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time, + flags lnwire.ChanUpdateFlag) bool + // ForAllOutgoingChannels is used to iterate over all channels // emanating from the "source" node which is the center of the // star-graph. @@ -819,6 +835,42 @@ func (r *ChannelRouter) networkHandler() { } } +// assertNodeAnnFreshness returns a non-nil error if we have an announcement in +// the database for the passed node with a timestamp newer than the passed +// timestamp. ErrIgnored will be returned if we already have the node, and +// ErrOutdated will be returned if we have a timestamp that's after the new +// timestamp. +func (r *ChannelRouter) assertNodeAnnFreshness(node Vertex, + msgTimestamp time.Time) error { + + // If we are not already aware of this node, it means that we don't + // know about any channel using this node. To avoid a DoS attack by + // node announcements, we will ignore such nodes. If we do know about + // this node, check that this update brings info newer than what we + // already have. + lastUpdate, exists, err := r.cfg.Graph.HasLightningNode(node) + if err != nil { + return errors.Errorf("unable to query for the "+ + "existence of node: %v", err) + } + if !exists { + return newErrf(ErrIgnored, "Ignoring node announcement"+ + " for node not found in channel graph (%x)", + node[:]) + } + + // If we've reached this point then we're aware of the vertex being + // advertised. So we now check if the new message has a new time stamp, + // if not then we won't accept the new data as it would override newer + // data. + if !lastUpdate.Before(msgTimestamp) { + return newErrf(ErrOutdated, "Ignoring outdated "+ + "announcement for %x", node[:]) + } + + return nil +} + // processUpdate processes a new relate authenticated channel/edge, node or // channel/edge update network update. If the update didn't affect the internal // state of the draft due to either being out of date, invalid, or redundant, @@ -829,31 +881,12 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { switch msg := msg.(type) { case *channeldb.LightningNode: - // If we are not already aware of this node, it means that we - // don't know about any channel using this node. To avoid a DoS - // attack by node announcements, we will ignore such nodes. If - // we do know about this node, check that this update brings - // info newer than what we already have. - lastUpdate, exists, err := r.cfg.Graph.HasLightningNode(msg.PubKeyBytes) + // Before we add the node to the database, we'll check to see + // if the announcement is "fresh" or not. If it isn't, then + // we'll return an error. + err := r.assertNodeAnnFreshness(msg.PubKeyBytes, msg.LastUpdate) if err != nil { - return errors.Errorf("unable to query for the "+ - "existence of node: %v", err) - } - if !exists { - return newErrf(ErrIgnored, "Ignoring node announcement"+ - " for node not found in channel graph (%x)", - msg.PubKeyBytes) - } - - // If we've reached this point then we're aware of the vertex - // being advertised. So we now check if the new message has a - // new time stamp, if not then we won't accept the new data as - // it would override newer data. - if exists && lastUpdate.After(msg.LastUpdate) || - lastUpdate.Equal(msg.LastUpdate) { - - return newErrf(ErrOutdated, "Ignoring outdated "+ - "announcement for %x", msg.PubKeyBytes) + return err } if err := r.cfg.Graph.AddLightningNode(msg); err != nil { @@ -1070,8 +1103,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { } invalidateCache = true - log.Infof("New channel update applied: %v", - spew.Sdump(msg)) + log.Debugf("New channel update applied: %v", spew.Sdump(msg)) default: return errors.Errorf("wrong routing update message type") @@ -1907,3 +1939,63 @@ func (r *ChannelRouter) AddProof(chanID lnwire.ShortChannelID, info.AuthProof = proof return r.cfg.Graph.UpdateChannelEdge(info) } + +// IsStaleNode returns true if the graph source has a node announcement for the +// target node with a more recent timestamp. +// +// NOTE: This method is part of the ChannelGraphSource interface. +func (r *ChannelRouter) IsStaleNode(node Vertex, timestamp time.Time) bool { + // If our attempt to assert that the node announcement is fresh fails, + // then we know that this is actually a stale announcement. + return r.assertNodeAnnFreshness(node, timestamp) != nil +} + +// IsKnownEdge returns true if the graph source already knows of the passed +// channel ID. +// +// NOTE: This method is part of the ChannelGraphSource interface. +func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool { + _, _, exists, _ := r.cfg.Graph.HasChannelEdge(chanID.ToUint64()) + return exists +} + +// IsStaleEdgePolicy returns true if the graph soruce has a channel edge for +// the passed channel ID (and flags) that have a more recent timestamp. +// +// NOTE: This method is part of the ChannelGraphSource interface. +func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, + timestamp time.Time, flags lnwire.ChanUpdateFlag) bool { + + edge1Timestamp, edge2Timestamp, exists, err := r.cfg.Graph.HasChannelEdge( + chanID.ToUint64(), + ) + if err != nil { + return false + + } + + // If we don't know of the edge, then it means it's fresh (thus not + // stale). + if !exists { + return false + } + + // As edges are directional edge node has a unique policy for the + // direction of the edge they control. Therefore we first check if we + // already have the most up to date information for that edge. If so, + // then we can exit early. + switch { + + // A flag set of 0 indicates this is an announcement for the "first" + // node in the channel. + case flags&lnwire.ChanUpdateDirection == 0: + return !edge1Timestamp.Before(timestamp) + + // Similarly, a flag set of 1 indicates this is an announcement for the + // "second" node in the channel. + case flags&lnwire.ChanUpdateDirection == 1: + return !edge2Timestamp.Before(timestamp) + } + + return false +} diff --git a/routing/router_test.go b/routing/router_test.go index f4be1f88..67bc48f0 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -1391,3 +1391,242 @@ func TestFindPathFeeWeighting(t *testing.T) { t.Fatalf("wrong node: %v", path[0].Node.Alias) } } + +// TestIsStaleNode tests that the IsStaleNode method properly detects stale +// node announcements. +func TestIsStaleNode(t *testing.T) { + t.Parallel() + + const startingBlockHeight = 101 + ctx, cleanUp, err := createTestCtx(startingBlockHeight) + defer cleanUp() + if err != nil { + t.Fatalf("unable to create router: %v", err) + } + + // Before we can insert a node in to the database, we need to create a + // channel that it's linked to. + var ( + pub1 [33]byte + pub2 [33]byte + ) + copy(pub1[:], priv1.PubKey().SerializeCompressed()) + copy(pub2[:], priv2.PubKey().SerializeCompressed()) + + fundingTx, _, chanID, err := createChannelEdge(ctx, + bitcoinKey1.SerializeCompressed(), + bitcoinKey2.SerializeCompressed(), + 10000, 500) + if err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + fundingBlock := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{fundingTx}, + } + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) + + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID.ToUint64(), + NodeKey1Bytes: pub1, + NodeKey2Bytes: pub2, + BitcoinKey1Bytes: pub1, + BitcoinKey2Bytes: pub2, + AuthProof: nil, + } + if err := ctx.router.AddEdge(edge); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + + // Before we add the node, if we query for staleness, we should get + // false, as we haven't added the full node. + updateTimeStamp := time.Unix(123, 0) + if ctx.router.IsStaleNode(pub1, updateTimeStamp) { + t.Fatalf("incorrectly detected node as stale") + } + + // With the node stub in the database, we'll add the fully node + // announcement to the database. + n1 := &channeldb.LightningNode{ + HaveNodeAnnouncement: true, + LastUpdate: updateTimeStamp, + Addresses: testAddrs, + Color: color.RGBA{1, 2, 3, 0}, + Alias: "node11", + AuthSigBytes: testSig.Serialize(), + Features: testFeatures, + } + copy(n1.PubKeyBytes[:], priv1.PubKey().SerializeCompressed()) + if err := ctx.router.AddNode(n1); err != nil { + t.Fatalf("could not add node: %v", err) + } + + // If we use the same timestamp and query for staleness, we should get + // true. + if !ctx.router.IsStaleNode(pub1, updateTimeStamp) { + t.Fatalf("failure to detect stale node update") + } + + // If we update the timestamp and once again query for staleness, it + // should report false. + newTimeStamp := time.Unix(1234, 0) + if ctx.router.IsStaleNode(pub1, newTimeStamp) { + t.Fatalf("incorrectly detected node as stale") + } +} + +// TestIsKnownEdge tests that the IsKnownEdge method properly detects stale +// channel announcements. +func TestIsKnownEdge(t *testing.T) { + t.Parallel() + + const startingBlockHeight = 101 + ctx, cleanUp, err := createTestCtx(startingBlockHeight) + defer cleanUp() + if err != nil { + t.Fatalf("unable to create router: %v", err) + } + + // First, we'll create a new channel edge (just the info) and insert it + // into the database. + var ( + pub1 [33]byte + pub2 [33]byte + ) + copy(pub1[:], priv1.PubKey().SerializeCompressed()) + copy(pub2[:], priv2.PubKey().SerializeCompressed()) + + fundingTx, _, chanID, err := createChannelEdge(ctx, + bitcoinKey1.SerializeCompressed(), + bitcoinKey2.SerializeCompressed(), + 10000, 500) + if err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + fundingBlock := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{fundingTx}, + } + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) + + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID.ToUint64(), + NodeKey1Bytes: pub1, + NodeKey2Bytes: pub2, + BitcoinKey1Bytes: pub1, + BitcoinKey2Bytes: pub2, + AuthProof: nil, + } + if err := ctx.router.AddEdge(edge); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + + // Now that the edge has been inserted, query is the router already + // knows of the edge should return true. + if !ctx.router.IsKnownEdge(*chanID) { + t.Fatalf("router should detect edge as known") + } +} + +// TestIsStaleEdgePolicy tests that the IsStaleEdgePolicy properly detects +// stale channel edge update announcements. +func TestIsStaleEdgePolicy(t *testing.T) { + t.Parallel() + + const startingBlockHeight = 101 + ctx, cleanUp, err := createTestCtx(startingBlockHeight, + basicGraphFilePath) + defer cleanUp() + if err != nil { + t.Fatalf("unable to create router: %v", err) + } + + // First, we'll create a new channel edge (just the info) and insert it + // into the database. + var ( + pub1 [33]byte + pub2 [33]byte + ) + copy(pub1[:], priv1.PubKey().SerializeCompressed()) + copy(pub2[:], priv2.PubKey().SerializeCompressed()) + + fundingTx, _, chanID, err := createChannelEdge(ctx, + bitcoinKey1.SerializeCompressed(), + bitcoinKey2.SerializeCompressed(), + 10000, 500) + if err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + fundingBlock := &wire.MsgBlock{ + Transactions: []*wire.MsgTx{fundingTx}, + } + ctx.chain.addBlock(fundingBlock, chanID.BlockHeight, chanID.BlockHeight) + + // If we query for staleness before adding the edge, we should get + // false. + updateTimeStamp := time.Unix(123, 0) + if ctx.router.IsStaleEdgePolicy(*chanID, updateTimeStamp, 0) { + t.Fatalf("router failed to detect fresh edge policy") + } + if ctx.router.IsStaleEdgePolicy(*chanID, updateTimeStamp, 1) { + t.Fatalf("router failed to detect fresh edge policy") + } + + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID.ToUint64(), + NodeKey1Bytes: pub1, + NodeKey2Bytes: pub2, + BitcoinKey1Bytes: pub1, + BitcoinKey2Bytes: pub2, + AuthProof: nil, + } + if err := ctx.router.AddEdge(edge); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + + // We'll also add two edge policies, one for each direction. + edgePolicy := &channeldb.ChannelEdgePolicy{ + SigBytes: testSig.Serialize(), + ChannelID: edge.ChannelID, + LastUpdate: updateTimeStamp, + TimeLockDelta: 10, + MinHTLC: 1, + FeeBaseMSat: 10, + FeeProportionalMillionths: 10000, + } + edgePolicy.Flags = 0 + if err := ctx.router.UpdateEdge(edgePolicy); err != nil { + t.Fatalf("unable to update edge policy: %v", err) + } + + edgePolicy = &channeldb.ChannelEdgePolicy{ + SigBytes: testSig.Serialize(), + ChannelID: edge.ChannelID, + LastUpdate: updateTimeStamp, + TimeLockDelta: 10, + MinHTLC: 1, + FeeBaseMSat: 10, + FeeProportionalMillionths: 10000, + } + edgePolicy.Flags = 1 + if err := ctx.router.UpdateEdge(edgePolicy); err != nil { + t.Fatalf("unable to update edge policy: %v", err) + } + + // Now that the edges have been added, an identical (chanID, flag, + // timestamp) tuple for each edge should be detected as a stale edge. + if !ctx.router.IsStaleEdgePolicy(*chanID, updateTimeStamp, 0) { + t.Fatalf("router failed to detect stale edge policy") + } + if !ctx.router.IsStaleEdgePolicy(*chanID, updateTimeStamp, 1) { + t.Fatalf("router failed to detect stale edge policy") + } + + // If we now update the timestamp for both edges, the router should + // detect that this tuple represents a fresh edge. + updateTimeStamp = time.Unix(9999, 0) + if ctx.router.IsStaleEdgePolicy(*chanID, updateTimeStamp, 0) { + t.Fatalf("router failed to detect fresh edge policy") + } + if ctx.router.IsStaleEdgePolicy(*chanID, updateTimeStamp, 1) { + t.Fatalf("router failed to detect fresh edge policy") + } +} From e7e4cdcf49617178be8b68fd53151807779fd5f9 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 24 Feb 2018 19:35:58 -0800 Subject: [PATCH 2/2] discovery: avoid always validating ECDSA sigs by asking router if item is fresh MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we reduce the amount of unnecessary work that the gossiper can carry out. When CPU profiling some nodes, I noticed that we’d spend a lot of time validating the signatures for an announcement, only to realize that the router already had it. To remedy this, we’ll use the new methods added to the channel router in order to avoid unnecessarily validating an announcement that is actually stale. This should reduce memory usage (since it uses big int’s under the scenes), and also idle CPU usage. --- discovery/gossiper.go | 35 ++++++++++++++++++++++++++++--- discovery/gossiper_test.go | 43 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index cf629ad7..21f2a026 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1299,7 +1299,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // information about a node in one of the channels we know about, or a // updating previously advertised information. case *lnwire.NodeAnnouncement: + timestamp := time.Unix(int64(msg.Timestamp), 0) + if nMsg.isRemote { + // We'll quickly ask the router if it already has a + // newer update for this node so we can skip validating + // signatures if not required. + if d.cfg.Router.IsStaleNode(msg.NodeID, timestamp) { + nMsg.err <- nil + return nil + } + if err := ValidateNodeAnn(msg); err != nil { err := errors.Errorf("unable to validate "+ "node announcement: %v", err) @@ -1312,7 +1322,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n features := lnwire.NewFeatureVector(msg.Features, lnwire.GlobalFeatures) node := &channeldb.LightningNode{ HaveNodeAnnouncement: true, - LastUpdate: time.Unix(int64(msg.Timestamp), 0), + LastUpdate: timestamp, Addresses: msg.Addresses, PubKeyBytes: msg.NodeID, Alias: msg.Alias.String(), @@ -1382,6 +1392,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n return nil } + // At this point, we'll now ask the router if this is a stale + // update. If so we can skip all the processing below. + if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) { + nMsg.err <- nil + return nil + } + // If this is a remote channel announcement, then we'll validate // all the signatures within the proof as it should be well // formed. @@ -1445,7 +1462,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n if err := d.cfg.Router.AddEdge(edge); err != nil { // If the edge was rejected due to already being known, // then it may be that case that this new message has a - // fresh channel proof, so we'll cechk. + // fresh channel proof, so we'll check. if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { @@ -1585,6 +1602,18 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n return nil } + // Before we perform any of the expensive checks below, we'll + // make sure that the router doesn't already have a fresher + // announcement for this edge. + timestamp := time.Unix(int64(msg.Timestamp), 0) + if d.cfg.Router.IsStaleEdgePolicy( + msg.ShortChannelID, timestamp, msg.Flags, + ) { + + nMsg.err <- nil + return nil + } + // Get the node pub key as far as we don't have it in channel // update announcement message. We'll need this to properly // verify message signature. @@ -1668,7 +1697,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n update := &channeldb.ChannelEdgePolicy{ SigBytes: msg.Signature.ToSignatureBytes(), ChannelID: shortChanID, - LastUpdate: time.Unix(int64(msg.Timestamp), 0), + LastUpdate: timestamp, Flags: msg.Flags, TimeLockDelta: msg.TimeLockDelta, MinHTLC: msg.HtlcMinimumMsat, diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 4a8502d4..18c69da5 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -199,6 +199,49 @@ func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) ( return chanInfo, edges[0], edges[1], nil } +// IsStaleNode returns true if the graph source has a node announcement for the +// target node with a more recent timestamp. +func (r *mockGraphSource) IsStaleNode(nodePub routing.Vertex, timestamp time.Time) bool { + for _, node := range r.nodes { + if node.PubKeyBytes == nodePub { + return node.LastUpdate.After(timestamp) || + node.LastUpdate.Equal(timestamp) + } + } + + return false +} + +// IsKnownEdge returns true if the graph source already knows of the passed +// channel ID. +func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool { + _, ok := r.infos[chanID.ToUint64()] + return ok +} + +// IsStaleEdgePolicy returns true if the graph source has a channel edge for +// the passed channel ID (and flags) that have a more recent timestamp. +func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, + timestamp time.Time, flags lnwire.ChanUpdateFlag) bool { + + edges, ok := r.edges[chanID.ToUint64()] + if !ok { + return false + } + + switch { + + case len(edges) >= 1 && edges[0].Flags == flags: + return !edges[0].LastUpdate.Before(timestamp) + + case len(edges) >= 2 && edges[1].Flags == flags: + return !edges[1].LastUpdate.Before(timestamp) + + default: + return false + } +} + type mockNotifier struct { clientCounter uint32 epochClients map[uint32]chan *chainntnfs.BlockEpoch