diff --git a/channeldb/db.go b/channeldb/db.go index 17275c29..173aada2 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -291,6 +291,7 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, chanDB.graph, err = NewChannelGraph( backend, opts.RejectCacheSize, opts.ChannelCacheSize, opts.BatchCommitInterval, opts.PreAllocCacheNumNodes, + opts.UseGraphCache, ) if err != nil { return nil, err diff --git a/channeldb/graph.go b/channeldb/graph.go index 3cda1d78..d1bb85b1 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -188,8 +188,8 @@ type ChannelGraph struct { // NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The // returned instance has its own unique reject cache and channel cache. func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int, - batchCommitInterval time.Duration, - preAllocCacheNumNodes int) (*ChannelGraph, error) { + batchCommitInterval time.Duration, preAllocCacheNumNodes int, + useGraphCache bool) (*ChannelGraph, error) { if err := initChannelGraph(db); err != nil { return nil, err @@ -199,7 +199,6 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int, db: db, rejectCache: newRejectCache(rejectCacheSize), chanCache: newChannelCache(chanCacheSize), - graphCache: NewGraphCache(preAllocCacheNumNodes), } g.chanScheduler = batch.NewTimeScheduler( db, &g.cacheMu, batchCommitInterval, @@ -208,18 +207,25 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int, db, nil, batchCommitInterval, ) - startTime := time.Now() - log.Debugf("Populating in-memory channel graph, this might take a " + - "while...") - err := g.ForEachNodeCacheable(func(tx kvdb.RTx, node GraphCacheNode) error { - return g.graphCache.AddNode(tx, node) - }) - if err != nil { - return nil, err - } + // The graph cache can be turned off (e.g. for mobile users) for a + // speed/memory usage tradeoff. + if useGraphCache { + g.graphCache = NewGraphCache(preAllocCacheNumNodes) + startTime := time.Now() + log.Debugf("Populating in-memory channel graph, this might " + + "take a while...") + err := g.ForEachNodeCacheable( + func(tx kvdb.RTx, node GraphCacheNode) error { + return g.graphCache.AddNode(tx, node) + }, + ) + if err != nil { + return nil, err + } - log.Debugf("Finished populating in-memory channel graph (took %v, %s)", - time.Since(startTime), g.graphCache.Stats()) + log.Debugf("Finished populating in-memory channel graph (took "+ + "%v, %s)", time.Since(startTime), g.graphCache.Stats()) + } return g, nil } @@ -302,6 +308,16 @@ func initChannelGraph(db kvdb.Backend) error { return nil } +// NewPathFindTx returns a new read transaction that can be used for a single +// path finding session. Will return nil if the graph cache is enabled. +func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) { + if c.graphCache != nil { + return nil, nil + } + + return c.db.BeginReadTx() +} + // ForEachChannel iterates through all the channel edges stored within the // graph and invokes the passed callback for each edge. The callback takes two // edges as since this is a directed graph, both the in/out edges are visited. @@ -370,10 +386,45 @@ func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo, // halted with the error propagated back up to the caller. // // Unknown policies are passed into the callback as nil values. -func (c *ChannelGraph) ForEachNodeChannel(node route.Vertex, +func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, node route.Vertex, cb func(channel *DirectedChannel) error) error { - return c.graphCache.ForEachChannel(node, cb) + if c.graphCache != nil { + return c.graphCache.ForEachChannel(node, cb) + } + + // Fallback that uses the database. + toNodeCallback := func() route.Vertex { + return node + } + toNodeFeatures, err := c.FetchNodeFeatures(node) + if err != nil { + return err + } + + dbCallback := func(tx kvdb.RTx, e *ChannelEdgeInfo, p1, + p2 *ChannelEdgePolicy) error { + + cachedInPolicy := NewCachedPolicy(p2) + cachedInPolicy.ToNodePubKey = toNodeCallback + cachedInPolicy.ToNodeFeatures = toNodeFeatures + + directedChannel := &DirectedChannel{ + ChannelID: e.ChannelID, + IsNode1: node == e.NodeKey1Bytes, + OtherNode: e.NodeKey2Bytes, + Capacity: e.Capacity, + OutPolicySet: p1 != nil, + InPolicy: cachedInPolicy, + } + + if node == e.NodeKey2Bytes { + directedChannel.OtherNode = e.NodeKey1Bytes + } + + return cb(directedChannel) + } + return nodeTraversal(tx, node[:], c.db, dbCallback) } // FetchNodeFeatures returns the features of a given node. If no features are @@ -381,7 +432,27 @@ func (c *ChannelGraph) ForEachNodeChannel(node route.Vertex, func (c *ChannelGraph) FetchNodeFeatures( node route.Vertex) (*lnwire.FeatureVector, error) { - return c.graphCache.GetFeatures(node), nil + if c.graphCache != nil { + return c.graphCache.GetFeatures(node), nil + } + + // Fallback that uses the database. + targetNode, err := c.FetchLightningNode(node) + switch err { + + // If the node exists and has features, return them directly. + case nil: + return targetNode.Features, nil + + // If we couldn't find a node announcement, populate a blank feature + // vector. + case ErrGraphNodeNotFound: + return lnwire.EmptyFeatureVector(), nil + + // Otherwise, bubble the error up. + default: + return nil, err + } } // DisabledChannelIDs returns the channel ids of disabled channels. @@ -601,11 +672,14 @@ func (c *ChannelGraph) AddLightningNode(node *LightningNode, r := &batch.Request{ Update: func(tx kvdb.RwTx) error { - cNode := newGraphCacheNode( - node.PubKeyBytes, node.Features, - ) - if err := c.graphCache.AddNode(tx, cNode); err != nil { - return err + if c.graphCache != nil { + cNode := newGraphCacheNode( + node.PubKeyBytes, node.Features, + ) + err := c.graphCache.AddNode(tx, cNode) + if err != nil { + return err + } } return addLightningNode(tx, node) @@ -686,7 +760,9 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error { return ErrGraphNodeNotFound } - c.graphCache.RemoveNode(nodePub) + if c.graphCache != nil { + c.graphCache.RemoveNode(nodePub) + } return c.deleteLightningNode(nodes, nodePub[:]) }, func() {}) @@ -814,7 +890,9 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx, edge *ChannelEdgeInfo) error return ErrEdgeAlreadyExist } - c.graphCache.AddChannel(edge, nil, nil) + if c.graphCache != nil { + c.graphCache.AddChannel(edge, nil, nil) + } // Before we insert the channel into the database, we'll ensure that // both nodes already exist in the channel graph. If either node @@ -1015,7 +1093,9 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error { return ErrEdgeNotFound } - c.graphCache.UpdateChannel(edge) + if c.graphCache != nil { + c.graphCache.UpdateChannel(edge) + } return putChanEdgeInfo(edgeIndex, edge, chanKey) }, func() {}) @@ -1153,7 +1233,10 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, c.chanCache.remove(channel.ChannelID) } - log.Debugf("Pruned graph, cache now has %s", c.graphCache.Stats()) + if c.graphCache != nil { + log.Debugf("Pruned graph, cache now has %s", + c.graphCache.Stats()) + } return chansClosed, nil } @@ -1255,7 +1338,9 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket, continue } - c.graphCache.RemoveNode(nodePubKey) + if c.graphCache != nil { + c.graphCache.RemoveNode(nodePubKey) + } // If we reach this point, then there are no longer any edges // that connect this node, so we can delete it. @@ -2100,10 +2185,12 @@ func (c *ChannelGraph) delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex, return err } - c.graphCache.RemoveChannel( - edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes, - edgeInfo.ChannelID, - ) + if c.graphCache != nil { + c.graphCache.RemoveChannel( + edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes, + edgeInfo.ChannelID, + ) + } // We'll also remove the entry in the edge update index bucket before // we delete the edges themselves so we can access their last update @@ -2360,7 +2447,12 @@ func updateEdgePolicy(tx kvdb.RwTx, edge *ChannelEdgePolicy, ) copy(fromNodePubKey[:], fromNode) copy(toNodePubKey[:], toNode) - graphCache.UpdatePolicy(edge, fromNodePubKey, toNodePubKey, isUpdate1) + + if graphCache != nil { + graphCache.UpdatePolicy( + edge, fromNodePubKey, toNodePubKey, isUpdate1, + ) + } return isUpdate1, nil } @@ -3629,7 +3721,9 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, "bucket: %w", err) } - c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID) + if c.graphCache != nil { + c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID) + } return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2) }) @@ -3691,10 +3785,13 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error { if err != nil { return err } - for _, edgeInfo := range edgeInfos { - c.graphCache.AddChannel( - edgeInfo.Info, edgeInfo.Policy1, edgeInfo.Policy2, - ) + if c.graphCache != nil { + for _, edgeInfo := range edgeInfos { + c.graphCache.AddChannel( + edgeInfo.Info, edgeInfo.Policy1, + edgeInfo.Policy2, + ) + } } return nil diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 27b97984..f66b5795 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -77,6 +77,7 @@ func MakeTestGraph(modifiers ...OptionModifier) (*ChannelGraph, func(), error) { graph, err := NewChannelGraph( backend, opts.RejectCacheSize, opts.ChannelCacheSize, opts.BatchCommitInterval, opts.PreAllocCacheNumNodes, + true, ) if err != nil { backendCleanup() diff --git a/channeldb/options.go b/channeldb/options.go index ad22fa8e..bc53bb47 100644 --- a/channeldb/options.go +++ b/channeldb/options.go @@ -45,6 +45,11 @@ type Options struct { // graph cache, so we can pre-allocate the map accordingly. PreAllocCacheNumNodes int + // UseGraphCache denotes whether the in-memory graph cache should be + // used or a fallback version that uses the underlying database for + // path finding. + UseGraphCache bool + // clock is the time source used by the database. clock clock.Clock @@ -65,6 +70,7 @@ func DefaultOptions() Options { RejectCacheSize: DefaultRejectCacheSize, ChannelCacheSize: DefaultChannelCacheSize, PreAllocCacheNumNodes: DefaultPreAllocCacheNumNodes, + UseGraphCache: true, clock: clock.NewDefaultClock(), } } @@ -93,6 +99,13 @@ func OptionSetPreAllocCacheNumNodes(n int) OptionModifier { } } +// OptionSetUseGraphCache sets the UseGraphCache option to the given value. +func OptionSetUseGraphCache(use bool) OptionModifier { + return func(o *Options) { + o.UseGraphCache = use + } +} + // OptionSetSyncFreelist allows the database to sync its freelist. func OptionSetSyncFreelist(b bool) OptionModifier { return func(o *Options) { diff --git a/config_builder.go b/config_builder.go index b4f8e2b4..a2ae613e 100644 --- a/config_builder.go +++ b/config_builder.go @@ -847,6 +847,7 @@ func (d *DefaultDatabaseBuilder) BuildDatabase( channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval), channeldb.OptionDryRunMigration(cfg.DryRunMigration), + channeldb.OptionSetUseGraphCache(!cfg.DB.NoGraphCache), } // We want to pre-allocate the channel graph cache according to what we diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md index 3051ea2d..35e34bde 100644 --- a/docs/release-notes/release-notes-0.14.0.md +++ b/docs/release-notes/release-notes-0.14.0.md @@ -92,6 +92,14 @@ usage. Users running `lnd` on low-memory systems are advised to run with the removes zombie channels from the graph, reducing the number of channels that need to be kept in memory. +There is a [fallback option](https://github.com/lightningnetwork/lnd/pull/5840) +`db.no-graph-cache=true` that can be used when running a Bolt (`bbolt`) based +database backend. Using the database for path finding is considerably slower +than using the in-memory graph cache but uses less RAM. The fallback option is +not available for `etcd` or Postgres database backends because of the way they +handle long-running database transactions that are required for the path finding +operations. + ## Protocol Extensions ### Explicit Channel Negotiation diff --git a/lncfg/db.go b/lncfg/db.go index 8ff929f5..7c000477 100644 --- a/lncfg/db.go +++ b/lncfg/db.go @@ -57,6 +57,8 @@ type DB struct { Bolt *kvdb.BoltConfig `group:"bolt" namespace:"bolt" description:"Bolt settings."` Postgres *postgres.Config `group:"postgres" namespace:"postgres" description:"Postgres settings."` + + NoGraphCache bool `long:"no-graph-cache" description:"Don't use the in-memory graph cache for path finding. Much slower but uses less RAM. Can only be used with a bolt database backend."` } // DefaultDB creates and returns a new default DB config. @@ -87,8 +89,21 @@ func (db *DB) Validate() error { } default: - return fmt.Errorf("unknown backend, must be either \"%v\" or \"%v\"", - BoltBackend, EtcdBackend) + return fmt.Errorf("unknown backend, must be either '%v' or "+ + "'%v'", BoltBackend, EtcdBackend) + } + + // The path finding uses a manual read transaction that's open for a + // potentially long time. That works fine with the locking model of + // bbolt but can lead to locks or rolled back transactions with etcd or + // postgres. And since we already have a smaller memory footprint for + // remote database setups (due to not needing to memory-map the bbolt DB + // files), we can keep the graph in memory instead. But for mobile + // devices the tradeoff between a smaller memory footprint and the + // longer time needed for path finding might be a desirable one. + if db.NoGraphCache && db.Backend != BoltBackend { + return fmt.Errorf("cannot use no-graph-cache with database "+ + "backend '%v'", db.Backend) } return nil diff --git a/lntest/itest/lnd_routing_test.go b/lntest/itest/lnd_routing_test.go index 54ea43c1..8a6d40e6 100644 --- a/lntest/itest/lnd_routing_test.go +++ b/lntest/itest/lnd_routing_test.go @@ -433,15 +433,46 @@ func testSingleHopSendToRouteCase(net *lntest.NetworkHarness, t *harnessTest, // We'll query the daemon for routes from Alice to Carol and then // send payments through the routes. func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) { + t.t.Run("with cache", func(tt *testing.T) { + ht := newHarnessTest(tt, t.lndHarness) + runMultiHopSendToRoute(net, ht, true) + }) + if *dbBackendFlag == "bbolt" { + t.t.Run("without cache", func(tt *testing.T) { + ht := newHarnessTest(tt, t.lndHarness) + runMultiHopSendToRoute(net, ht, false) + }) + } +} + +// runMultiHopSendToRoute tests that payments are properly processed +// through a provided route. We'll create the following network topology: +// Alice --100k--> Bob --100k--> Carol +// We'll query the daemon for routes from Alice to Carol and then +// send payments through the routes. +func runMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest, + useGraphCache bool) { + ctxb := context.Background() + var opts []string + if !useGraphCache { + opts = append(opts, "--db.no-graph-cache") + } + + alice := net.NewNode(t.t, "Alice", opts) + defer shutdownAndAssert(net, t, alice) + + net.ConnectNodes(t.t, alice, net.Bob) + net.SendCoins(t.t, btcutil.SatoshiPerBitcoin, alice) + const chanAmt = btcutil.Amount(100000) var networkChans []*lnrpc.ChannelPoint // Open a channel with 100k satoshis between Alice and Bob with Alice // being the sole funder of the channel. chanPointAlice := openChannelAndAssert( - t, net, net.Alice, net.Bob, + t, net, alice, net.Bob, lntest.OpenChannelParams{ Amt: chanAmt, }, @@ -483,7 +514,7 @@ func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) { } // Wait for all nodes to have seen all channels. - nodes := []*lntest.HarnessNode{net.Alice, net.Bob, carol} + nodes := []*lntest.HarnessNode{alice, net.Bob, carol} nodeNames := []string{"Alice", "Bob", "Carol"} for _, chanPoint := range networkChans { for i, node := range nodes { @@ -529,7 +560,7 @@ func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) { FinalCltvDelta: chainreg.DefaultBitcoinTimeLockDelta, } ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) - routes, err := net.Alice.QueryRoutes(ctxt, routesReq) + routes, err := alice.QueryRoutes(ctxt, routesReq) if err != nil { t.Fatalf("unable to get route: %v", err) } @@ -565,7 +596,7 @@ func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) { PaymentHash: rHash, Route: &route, } - resp, err := net.Alice.RouterClient.SendToRouteV2(ctxt, sendReq) + resp, err := alice.RouterClient.SendToRouteV2(ctxt, sendReq) if err != nil { t.Fatalf("unable to send payment: %v", err) } @@ -593,10 +624,10 @@ func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) { bobFundPoint, amountPaid, int64(0)) assertAmountPaid(t, "Alice(local) => Bob(remote)", net.Bob, aliceFundPoint, int64(0), amountPaid+(baseFee*numPayments)) - assertAmountPaid(t, "Alice(local) => Bob(remote)", net.Alice, + assertAmountPaid(t, "Alice(local) => Bob(remote)", alice, aliceFundPoint, amountPaid+(baseFee*numPayments), int64(0)) - closeChannelAndAssert(t, net, net.Alice, chanPointAlice, false) + closeChannelAndAssert(t, net, alice, chanPointAlice, false) closeChannelAndAssert(t, net, carol, chanPointBob, false) } diff --git a/routing/graph.go b/routing/graph.go index 7e0ba65b..54ddd46b 100644 --- a/routing/graph.go +++ b/routing/graph.go @@ -2,6 +2,7 @@ package routing import ( "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -25,6 +26,7 @@ type routingGraph interface { // database. type CachedGraph struct { graph *channeldb.ChannelGraph + tx kvdb.RTx source route.Vertex } @@ -32,27 +34,40 @@ type CachedGraph struct { // interface. var _ routingGraph = (*CachedGraph)(nil) -// NewCachedGraph instantiates a new db-connected routing graph. It implictly +// NewCachedGraph instantiates a new db-connected routing graph. It implicitly // instantiates a new read transaction. -func NewCachedGraph(graph *channeldb.ChannelGraph) (*CachedGraph, error) { - sourceNode, err := graph.SourceNode() +func NewCachedGraph(sourceNode *channeldb.LightningNode, + graph *channeldb.ChannelGraph) (*CachedGraph, error) { + + tx, err := graph.NewPathFindTx() if err != nil { return nil, err } return &CachedGraph{ graph: graph, + tx: tx, source: sourceNode.PubKeyBytes, }, nil } +// close attempts to close the underlying db transaction. This is a no-op in +// case the underlying graph uses an in-memory cache. +func (g *CachedGraph) close() error { + if g.tx == nil { + return nil + } + + return g.tx.Rollback() +} + // forEachNodeChannel calls the callback for every channel of the given node. // // NOTE: Part of the routingGraph interface. func (g *CachedGraph) forEachNodeChannel(nodePub route.Vertex, cb func(channel *channeldb.DirectedChannel) error) error { - return g.graph.ForEachNodeChannel(nodePub, cb) + return g.graph.ForEachNodeChannel(g.tx, nodePub, cb) } // sourceNode returns the source node of the graph. diff --git a/routing/integrated_routing_context_test.go b/routing/integrated_routing_context_test.go index 79fa3708..bbca1975 100644 --- a/routing/integrated_routing_context_test.go +++ b/routing/integrated_routing_context_test.go @@ -145,7 +145,7 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32, c.t.Fatal(err) } - getBandwidthHints := func() (bandwidthHints, error) { + getBandwidthHints := func(_ routingGraph) (bandwidthHints, error) { // Create bandwidth hints based on local channel balances. bandwidthHints := map[uint64]lnwire.MilliSatoshi{} for _, ch := range c.graph.nodes[c.source.pubkey].channels { @@ -179,7 +179,11 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32, } session, err := newPaymentSession( - &payment, getBandwidthHints, c.graph, mc, c.pathFindingCfg, + &payment, getBandwidthHints, + func() (routingGraph, func(), error) { + return c.graph, func() {}, nil + }, + mc, c.pathFindingCfg, ) if err != nil { c.t.Fatal(err) diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index dcc3b0a1..dbe38475 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -150,7 +150,9 @@ type testChan struct { // makeTestGraph creates a new instance of a channeldb.ChannelGraph for testing // purposes. A callback which cleans up the created temporary directories is // also returned and intended to be executed after the test completes. -func makeTestGraph() (*channeldb.ChannelGraph, kvdb.Backend, func(), error) { +func makeTestGraph(useCache bool) (*channeldb.ChannelGraph, kvdb.Backend, + func(), error) { + // First, create a temporary directory to be used for the duration of // this test. tempDirName, err := ioutil.TempDir("", "channeldb") @@ -173,6 +175,7 @@ func makeTestGraph() (*channeldb.ChannelGraph, kvdb.Backend, func(), error) { graph, err := channeldb.NewChannelGraph( backend, opts.RejectCacheSize, opts.ChannelCacheSize, opts.BatchCommitInterval, opts.PreAllocCacheNumNodes, + useCache, ) if err != nil { cleanUp() @@ -184,7 +187,7 @@ func makeTestGraph() (*channeldb.ChannelGraph, kvdb.Backend, func(), error) { // parseTestGraph returns a fully populated ChannelGraph given a path to a JSON // file which encodes a test graph. -func parseTestGraph(path string) (*testGraphInstance, error) { +func parseTestGraph(useCache bool, path string) (*testGraphInstance, error) { graphJSON, err := ioutil.ReadFile(path) if err != nil { return nil, err @@ -209,7 +212,7 @@ func parseTestGraph(path string) (*testGraphInstance, error) { testAddrs = append(testAddrs, testAddr) // Next, create a temporary graph database for usage within the test. - graph, graphBackend, cleanUp, err := makeTestGraph() + graph, graphBackend, cleanUp, err := makeTestGraph(useCache) if err != nil { return nil, err } @@ -528,8 +531,8 @@ func (g *testGraphInstance) getLink(chanID lnwire.ShortChannelID) ( // a deterministical way and added to the channel graph. A list of nodes is // not required and derived from the channel data. The goal is to keep // instantiating a test channel graph as light weight as possible. -func createTestGraphFromChannels(testChannels []*testChannel, source string) ( - *testGraphInstance, error) { +func createTestGraphFromChannels(useCache bool, testChannels []*testChannel, + source string) (*testGraphInstance, error) { // We'll use this fake address for the IP address of all the nodes in // our tests. This value isn't needed for path finding so it doesn't @@ -542,7 +545,7 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) ( testAddrs = append(testAddrs, testAddr) // Next, create a temporary graph database for usage within the test. - graph, graphBackend, cleanUp, err := makeTestGraph() + graph, graphBackend, cleanUp, err := makeTestGraph(useCache) if err != nil { return nil, err } @@ -768,13 +771,106 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) ( }, nil } -// TestFindLowestFeePath tests that out of two routes with identical total +// TestPathFinding tests all path finding related cases both with the in-memory +// graph cached turned on and off. +func TestPathFinding(t *testing.T) { + testCases := []struct { + name string + fn func(t *testing.T, useCache bool) + }{{ + name: "lowest fee path", + fn: runFindLowestFeePath, + }, { + name: "basic graph path finding", + fn: runBasicGraphPathFinding, + }, { + name: "path finding with additional edges", + fn: runPathFindingWithAdditionalEdges, + }, { + name: "new route path too long", + fn: runNewRoutePathTooLong, + }, { + name: "path not available", + fn: runPathNotAvailable, + }, { + name: "destination tlv graph fallback", + fn: runDestTLVGraphFallback, + }, { + name: "missing feature dependency", + fn: runMissingFeatureDep, + }, { + name: "unknown required features", + fn: runUnknownRequiredFeatures, + }, { + name: "destination payment address", + fn: runDestPaymentAddr, + }, { + name: "path insufficient capacity", + fn: runPathInsufficientCapacity, + }, { + name: "route fail min HTLC", + fn: runRouteFailMinHTLC, + }, { + name: "route fail max HTLC", + fn: runRouteFailMaxHTLC, + }, { + name: "route fail disabled edge", + fn: runRouteFailDisabledEdge, + }, { + name: "path source edges bandwidth", + fn: runPathSourceEdgesBandwidth, + }, { + name: "restrict outgoing channel", + fn: runRestrictOutgoingChannel, + }, { + name: "restrict last hop", + fn: runRestrictLastHop, + }, { + name: "CLTV limit", + fn: runCltvLimit, + }, { + name: "probability routing", + fn: runProbabilityRouting, + }, { + name: "equal cost route selection", + fn: runEqualCostRouteSelection, + }, { + name: "no cycle", + fn: runNoCycle, + }, { + name: "route to self", + fn: runRouteToSelf, + }} + + // Run with graph cache enabled. + for _, tc := range testCases { + tc := tc + + t.Run("cache=true/"+tc.name, func(tt *testing.T) { + tt.Parallel() + + tc.fn(tt, true) + }) + } + + // And with the DB fallback to make sure everything works the same + // still. + for _, tc := range testCases { + tc := tc + + t.Run("cache=false/"+tc.name, func(tt *testing.T) { + tt.Parallel() + + tc.fn(tt, false) + }) + } +} + +// runFindLowestFeePath tests that out of two routes with identical total // time lock values, the route with the lowest total fee should be returned. // The fee rates are chosen such that the test failed on the previous edge // weight function where one of the terms was fee squared. -func TestFindLowestFeePath(t *testing.T) { - t.Parallel() - +func runFindLowestFeePath(t *testing.T, useCache bool) { // Set up a test graph with two paths from roasbeef to target. Both // paths have equal total time locks, but the path through b has lower // fees (700 compared to 800 for the path through a). @@ -811,7 +907,7 @@ func TestFindLowestFeePath(t *testing.T) { }), } - ctx := newPathFindingTestContext(t, testChannels, "roasbeef") + ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") defer ctx.cleanup() const ( @@ -916,10 +1012,8 @@ var basicGraphPathFindingTests = []basicGraphPathFindingTestCase{ expectFailureNoPath: true, }} -func TestBasicGraphPathFinding(t *testing.T) { - t.Parallel() - - testGraphInstance, err := parseTestGraph(basicGraphFilePath) +func runBasicGraphPathFinding(t *testing.T, useCache bool) { + testGraphInstance, err := parseTestGraph(useCache, basicGraphFilePath) if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -1091,14 +1185,12 @@ func testBasicGraphPathFindingCase(t *testing.T, graphInstance *testGraphInstanc } } -// TestPathFindingWithAdditionalEdges asserts that we are able to find paths to +// runPathFindingWithAdditionalEdges asserts that we are able to find paths to // nodes that do not exist in the graph by way of hop hints. We also test that // the path can support custom TLV records for the receiver under the // appropriate circumstances. -func TestPathFindingWithAdditionalEdges(t *testing.T) { - t.Parallel() - - graph, err := parseTestGraph(basicGraphFilePath) +func runPathFindingWithAdditionalEdges(t *testing.T, useCache bool) { + graph, err := parseTestGraph(useCache, basicGraphFilePath) if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -1502,9 +1594,7 @@ func TestNewRoute(t *testing.T) { } } -func TestNewRoutePathTooLong(t *testing.T) { - t.Parallel() - +func runNewRoutePathTooLong(t *testing.T, useCache bool) { var testChannels []*testChannel // Setup a linear network of 21 hops. @@ -1522,7 +1612,7 @@ func TestNewRoutePathTooLong(t *testing.T) { fromNode = toNode } - ctx := newPathFindingTestContext(t, testChannels, "start") + ctx := newPathFindingTestContext(t, useCache, testChannels, "start") defer ctx.cleanup() // Assert that we can find 20 hop routes. @@ -1552,10 +1642,8 @@ func TestNewRoutePathTooLong(t *testing.T) { } } -func TestPathNotAvailable(t *testing.T) { - t.Parallel() - - graph, err := parseTestGraph(basicGraphFilePath) +func runPathNotAvailable(t *testing.T, useCache bool) { + graph, err := parseTestGraph(useCache, basicGraphFilePath) if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -1587,12 +1675,10 @@ func TestPathNotAvailable(t *testing.T) { } } -// TestDestTLVGraphFallback asserts that we properly detect when we can send TLV +// runDestTLVGraphFallback asserts that we properly detect when we can send TLV // records to a receiver, and also that we fallback to the receiver's node // announcement if we don't have an invoice features. -func TestDestTLVGraphFallback(t *testing.T) { - t.Parallel() - +func runDestTLVGraphFallback(t *testing.T, useCache bool) { testChannels := []*testChannel{ asymmetricTestChannel("roasbeef", "luoji", 100000, &testChannelPolicy{ @@ -1621,7 +1707,7 @@ func TestDestTLVGraphFallback(t *testing.T) { }, 0), } - ctx := newPathFindingTestContext(t, testChannels, "roasbeef") + ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") defer ctx.cleanup() sourceNode, err := ctx.graph.SourceNode() @@ -1689,12 +1775,10 @@ func TestDestTLVGraphFallback(t *testing.T) { assertExpectedPath(t, ctx.testGraphInstance.aliasMap, path, "luoji") } -// TestMissingFeatureDep asserts that we fail path finding when the +// runMissingFeatureDep asserts that we fail path finding when the // destination's features are broken, in that the feature vector doesn't signal // all transitive dependencies. -func TestMissingFeatureDep(t *testing.T) { - t.Parallel() - +func runMissingFeatureDep(t *testing.T, useCache bool) { testChannels := []*testChannel{ asymmetricTestChannel("roasbeef", "conner", 100000, &testChannelPolicy{ @@ -1728,7 +1812,7 @@ func TestMissingFeatureDep(t *testing.T) { ), } - ctx := newPathFindingTestContext(t, testChannels, "roasbeef") + ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") defer ctx.cleanup() // Conner's node in the graph has a broken feature vector, since it @@ -1766,12 +1850,10 @@ func TestMissingFeatureDep(t *testing.T) { } } -// TestUnknownRequiredFeatures asserts that we fail path finding when the +// runUnknownRequiredFeatures asserts that we fail path finding when the // destination requires an unknown required feature, and that we skip // intermediaries that signal unknown required features. -func TestUnknownRequiredFeatures(t *testing.T) { - t.Parallel() - +func runUnknownRequiredFeatures(t *testing.T, useCache bool) { testChannels := []*testChannel{ asymmetricTestChannel("roasbeef", "conner", 100000, &testChannelPolicy{ @@ -1805,7 +1887,7 @@ func TestUnknownRequiredFeatures(t *testing.T) { ), } - ctx := newPathFindingTestContext(t, testChannels, "roasbeef") + ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") defer ctx.cleanup() conner := ctx.keyFromAlias("conner") @@ -1832,12 +1914,10 @@ func TestUnknownRequiredFeatures(t *testing.T) { } } -// TestDestPaymentAddr asserts that we properly detect when we can send a +// runDestPaymentAddr asserts that we properly detect when we can send a // payment address to a receiver, and also that we fallback to the receiver's // node announcement if we don't have an invoice features. -func TestDestPaymentAddr(t *testing.T) { - t.Parallel() - +func runDestPaymentAddr(t *testing.T, useCache bool) { testChannels := []*testChannel{ symmetricTestChannel("roasbeef", "luoji", 100000, &testChannelPolicy{ @@ -1849,7 +1929,7 @@ func TestDestPaymentAddr(t *testing.T) { ), } - ctx := newPathFindingTestContext(t, testChannels, "roasbeef") + ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") defer ctx.cleanup() luoji := ctx.keyFromAlias("luoji") @@ -1877,10 +1957,8 @@ func TestDestPaymentAddr(t *testing.T) { assertExpectedPath(t, ctx.testGraphInstance.aliasMap, path, "luoji") } -func TestPathInsufficientCapacity(t *testing.T) { - t.Parallel() - - graph, err := parseTestGraph(basicGraphFilePath) +func runPathInsufficientCapacity(t *testing.T, useCache bool) { + graph, err := parseTestGraph(useCache, basicGraphFilePath) if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -1912,12 +1990,10 @@ func TestPathInsufficientCapacity(t *testing.T) { } } -// TestRouteFailMinHTLC tests that if we attempt to route an HTLC which is +// runRouteFailMinHTLC tests that if we attempt to route an HTLC which is // smaller than the advertised minHTLC of an edge, then path finding fails. -func TestRouteFailMinHTLC(t *testing.T) { - t.Parallel() - - graph, err := parseTestGraph(basicGraphFilePath) +func runRouteFailMinHTLC(t *testing.T, useCache bool) { + graph, err := parseTestGraph(useCache, basicGraphFilePath) if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -1943,11 +2019,9 @@ func TestRouteFailMinHTLC(t *testing.T) { } } -// TestRouteFailMaxHTLC tests that if we attempt to route an HTLC which is +// runRouteFailMaxHTLC tests that if we attempt to route an HTLC which is // larger than the advertised max HTLC of an edge, then path finding fails. -func TestRouteFailMaxHTLC(t *testing.T) { - t.Parallel() - +func runRouteFailMaxHTLC(t *testing.T, useCache bool) { // Set up a test graph: // roasbeef <--> firstHop <--> secondHop <--> target // We will be adjusting the max HTLC of the edge between the first and @@ -1974,7 +2048,7 @@ func TestRouteFailMaxHTLC(t *testing.T) { }), } - ctx := newPathFindingTestContext(t, testChannels, "roasbeef") + ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") defer ctx.cleanup() // First, attempt to send a payment greater than the max HTLC we are @@ -2007,15 +2081,13 @@ func TestRouteFailMaxHTLC(t *testing.T) { } } -// TestRouteFailDisabledEdge tests that if we attempt to route to an edge +// runRouteFailDisabledEdge tests that if we attempt to route to an edge // that's disabled, then that edge is disqualified, and the routing attempt // will fail. We also test that this is true only for non-local edges, as we'll // ignore the disable flags, with the assumption that the correct bandwidth is // found among the bandwidth hints. -func TestRouteFailDisabledEdge(t *testing.T) { - t.Parallel() - - graph, err := parseTestGraph(basicGraphFilePath) +func runRouteFailDisabledEdge(t *testing.T, useCache bool) { + graph, err := parseTestGraph(useCache, basicGraphFilePath) if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -2089,13 +2161,11 @@ func TestRouteFailDisabledEdge(t *testing.T) { } } -// TestPathSourceEdgesBandwidth tests that explicitly passing in a set of +// runPathSourceEdgesBandwidth tests that explicitly passing in a set of // bandwidth hints is used by the path finding algorithm to consider whether to // use a local channel. -func TestPathSourceEdgesBandwidth(t *testing.T) { - t.Parallel() - - graph, err := parseTestGraph(basicGraphFilePath) +func runPathSourceEdgesBandwidth(t *testing.T, useCache bool) { + graph, err := parseTestGraph(useCache, basicGraphFilePath) if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -2390,11 +2460,9 @@ func TestNewRouteFromEmptyHops(t *testing.T) { } } -// TestRestrictOutgoingChannel asserts that a outgoing channel restriction is +// runRestrictOutgoingChannel asserts that a outgoing channel restriction is // obeyed by the path finding algorithm. -func TestRestrictOutgoingChannel(t *testing.T) { - t.Parallel() - +func runRestrictOutgoingChannel(t *testing.T, useCache bool) { // Define channel id constants const ( chanSourceA = 1 @@ -2430,7 +2498,7 @@ func TestRestrictOutgoingChannel(t *testing.T) { }, chanSourceTarget), } - ctx := newPathFindingTestContext(t, testChannels, "roasbeef") + ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") defer ctx.cleanup() const ( @@ -2473,11 +2541,9 @@ func TestRestrictOutgoingChannel(t *testing.T) { } } -// TestRestrictLastHop asserts that a last hop restriction is obeyed by the path +// runRestrictLastHop asserts that a last hop restriction is obeyed by the path // finding algorithm. -func TestRestrictLastHop(t *testing.T) { - t.Parallel() - +func runRestrictLastHop(t *testing.T, useCache bool) { // Set up a test graph with three possible paths from roasbeef to // target. The path via channel 1 and 2 is the lowest cost path. testChannels := []*testChannel{ @@ -2497,7 +2563,7 @@ func TestRestrictLastHop(t *testing.T) { }, 4), } - ctx := newPathFindingTestContext(t, testChannels, "source") + ctx := newPathFindingTestContext(t, useCache, testChannels, "source") defer ctx.cleanup() paymentAmt := lnwire.NewMSatFromSatoshis(100) @@ -2518,15 +2584,23 @@ func TestRestrictLastHop(t *testing.T) { } } -// TestCltvLimit asserts that a cltv limit is obeyed by the path finding +// runCltvLimit asserts that a cltv limit is obeyed by the path finding // algorithm. -func TestCltvLimit(t *testing.T) { - t.Run("no limit", func(t *testing.T) { testCltvLimit(t, 2016, 1) }) - t.Run("no path", func(t *testing.T) { testCltvLimit(t, 50, 0) }) - t.Run("force high cost", func(t *testing.T) { testCltvLimit(t, 80, 3) }) +func runCltvLimit(t *testing.T, useCache bool) { + t.Run("no limit", func(t *testing.T) { + testCltvLimit(t, useCache, 2016, 1) + }) + t.Run("no path", func(t *testing.T) { + testCltvLimit(t, useCache, 50, 0) + }) + t.Run("force high cost", func(t *testing.T) { + testCltvLimit(t, useCache, 80, 3) + }) } -func testCltvLimit(t *testing.T, limit uint32, expectedChannel uint64) { +func testCltvLimit(t *testing.T, useCache bool, limit uint32, + expectedChannel uint64) { + t.Parallel() // Set up a test graph with three possible paths to the target. The path @@ -2560,7 +2634,7 @@ func testCltvLimit(t *testing.T, limit uint32, expectedChannel uint64) { }), } - ctx := newPathFindingTestContext(t, testChannels, "roasbeef") + ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") defer ctx.cleanup() paymentAmt := lnwire.NewMSatFromSatoshis(100) @@ -2603,11 +2677,9 @@ func testCltvLimit(t *testing.T, limit uint32, expectedChannel uint64) { } } -// TestProbabilityRouting asserts that path finding not only takes into account +// runProbabilityRouting asserts that path finding not only takes into account // fees but also success probability. -func TestProbabilityRouting(t *testing.T) { - t.Parallel() - +func runProbabilityRouting(t *testing.T, useCache bool) { testCases := []struct { name string p10, p11, p20 float64 @@ -2693,15 +2765,16 @@ func TestProbabilityRouting(t *testing.T) { t.Run(tc.name, func(t *testing.T) { testProbabilityRouting( - t, tc.amount, tc.p10, tc.p11, tc.p20, + t, useCache, tc.amount, tc.p10, tc.p11, tc.p20, tc.minProbability, tc.expectedChan, ) }) } } -func testProbabilityRouting(t *testing.T, paymentAmt btcutil.Amount, - p10, p11, p20, minProbability float64, expectedChan uint64) { +func testProbabilityRouting(t *testing.T, useCache bool, + paymentAmt btcutil.Amount, p10, p11, p20, minProbability float64, + expectedChan uint64) { t.Parallel() @@ -2728,7 +2801,7 @@ func testProbabilityRouting(t *testing.T, paymentAmt btcutil.Amount, }, 20), } - ctx := newPathFindingTestContext(t, testChannels, "roasbeef") + ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef") defer ctx.cleanup() alias := ctx.testGraphInstance.aliasMap @@ -2782,11 +2855,9 @@ func testProbabilityRouting(t *testing.T, paymentAmt btcutil.Amount, } } -// TestEqualCostRouteSelection asserts that route probability will be used as a +// runEqualCostRouteSelection asserts that route probability will be used as a // tie breaker in case the path finding probabilities are equal. -func TestEqualCostRouteSelection(t *testing.T) { - t.Parallel() - +func runEqualCostRouteSelection(t *testing.T, useCache bool) { // Set up a test graph with two possible paths to the target: via a and // via b. The routing fees and probabilities are chosen such that the // algorithm will first explore target->a->source (backwards search). @@ -2811,7 +2882,7 @@ func TestEqualCostRouteSelection(t *testing.T) { }, 2), } - ctx := newPathFindingTestContext(t, testChannels, "source") + ctx := newPathFindingTestContext(t, useCache, testChannels, "source") defer ctx.cleanup() alias := ctx.testGraphInstance.aliasMap @@ -2848,11 +2919,9 @@ func TestEqualCostRouteSelection(t *testing.T) { } } -// TestNoCycle tries to guide the path finding algorithm into reconstructing an +// runNoCycle tries to guide the path finding algorithm into reconstructing an // endless route. It asserts that the algorithm is able to handle this properly. -func TestNoCycle(t *testing.T) { - t.Parallel() - +func runNoCycle(t *testing.T, useCache bool) { // Set up a test graph with two paths: source->a->target and // source->b->c->target. The fees are setup such that, searching // backwards, the algorithm will evaluate the following end of the route @@ -2882,7 +2951,7 @@ func TestNoCycle(t *testing.T) { }, 5), } - ctx := newPathFindingTestContext(t, testChannels, "source") + ctx := newPathFindingTestContext(t, useCache, testChannels, "source") defer ctx.cleanup() const ( @@ -2922,10 +2991,8 @@ func TestNoCycle(t *testing.T) { } } -// TestRouteToSelf tests that it is possible to find a route to the self node. -func TestRouteToSelf(t *testing.T) { - t.Parallel() - +// runRouteToSelf tests that it is possible to find a route to the self node. +func runRouteToSelf(t *testing.T, useCache bool) { testChannels := []*testChannel{ symmetricTestChannel("source", "a", 100000, &testChannelPolicy{ Expiry: 144, @@ -2941,7 +3008,7 @@ func TestRouteToSelf(t *testing.T) { }, 3), } - ctx := newPathFindingTestContext(t, testChannels, "source") + ctx := newPathFindingTestContext(t, useCache, testChannels, "source") defer ctx.cleanup() paymentAmt := lnwire.NewMSatFromSatoshis(100) @@ -2979,11 +3046,11 @@ type pathFindingTestContext struct { source route.Vertex } -func newPathFindingTestContext(t *testing.T, testChannels []*testChannel, - source string) *pathFindingTestContext { +func newPathFindingTestContext(t *testing.T, useCache bool, + testChannels []*testChannel, source string) *pathFindingTestContext { testGraphInstance, err := createTestGraphFromChannels( - testChannels, source, + useCache, testChannels, source, ) if err != nil { t.Fatalf("unable to create graph: %v", err) @@ -3059,11 +3126,22 @@ func dbFindPath(graph *channeldb.ChannelGraph, source, target route.Vertex, amt lnwire.MilliSatoshi, finalHtlcExpiry int32) ([]*channeldb.CachedEdgePolicy, error) { - routingGraph, err := NewCachedGraph(graph) + sourceNode, err := graph.SourceNode() if err != nil { return nil, err } + routingGraph, err := NewCachedGraph(sourceNode, graph) + if err != nil { + return nil, err + } + + defer func() { + if err := routingGraph.close(); err != nil { + log.Errorf("Error closing db tx: %v", err) + } + }() + return findPath( &graphParams{ additionalEdges: additionalEdges, diff --git a/routing/payment_lifecycle_test.go b/routing/payment_lifecycle_test.go index 64add459..df8cbe4a 100644 --- a/routing/payment_lifecycle_test.go +++ b/routing/payment_lifecycle_test.go @@ -180,7 +180,7 @@ func TestRouterPaymentStateMachine(t *testing.T) { }, 2), } - testGraph, err := createTestGraphFromChannels(testChannels, "a") + testGraph, err := createTestGraphFromChannels(true, testChannels, "a") if err != nil { t.Fatalf("unable to create graph: %v", err) } diff --git a/routing/payment_session.go b/routing/payment_session.go index 8895d28f..4d593113 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -164,7 +164,7 @@ type PaymentSession interface { type paymentSession struct { additionalEdges map[route.Vertex][]*channeldb.CachedEdgePolicy - getBandwidthHints func() (bandwidthHints, error) + getBandwidthHints func(routingGraph) (bandwidthHints, error) payment *LightningPayment @@ -172,7 +172,7 @@ type paymentSession struct { pathFinder pathFinder - routingGraph routingGraph + getRoutingGraph func() (routingGraph, func(), error) // pathFindingConfig defines global parameters that control the // trade-off in path finding between fees and probabiity. @@ -192,8 +192,8 @@ type paymentSession struct { // newPaymentSession instantiates a new payment session. func newPaymentSession(p *LightningPayment, - getBandwidthHints func() (bandwidthHints, error), - routingGraph routingGraph, + getBandwidthHints func(routingGraph) (bandwidthHints, error), + getRoutingGraph func() (routingGraph, func(), error), missionControl MissionController, pathFindingConfig PathFindingConfig) ( *paymentSession, error) { @@ -209,7 +209,7 @@ func newPaymentSession(p *LightningPayment, getBandwidthHints: getBandwidthHints, payment: p, pathFinder: findPath, - routingGraph: routingGraph, + getRoutingGraph: getRoutingGraph, pathFindingConfig: pathFindingConfig, missionControl: missionControl, minShardAmt: DefaultShardMinAmt, @@ -274,33 +274,42 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi, } for { + // Get a routing graph. + routingGraph, cleanup, err := p.getRoutingGraph() + if err != nil { + return nil, err + } + // We'll also obtain a set of bandwidthHints from the lower // layer for each of our outbound channels. This will allow the // path finding to skip any links that aren't active or just // don't have enough bandwidth to carry the payment. New // bandwidth hints are queried for every new path finding // attempt, because concurrent payments may change balances. - bandwidthHints, err := p.getBandwidthHints() + bandwidthHints, err := p.getBandwidthHints(routingGraph) if err != nil { return nil, err } p.log.Debugf("pathfinding for amt=%v", maxAmt) - sourceVertex := p.routingGraph.sourceNode() + sourceVertex := routingGraph.sourceNode() // Find a route for the current amount. path, err := p.pathFinder( &graphParams{ additionalEdges: p.additionalEdges, bandwidthHints: bandwidthHints, - graph: p.routingGraph, + graph: routingGraph, }, restrictions, &p.pathFindingConfig, sourceVertex, p.payment.Target, maxAmt, finalHtlcExpiry, ) + // Close routing graph. + cleanup() + switch { case err == errNoPathFound: // Don't split if this is a legacy payment without mpp diff --git a/routing/payment_session_source.go b/routing/payment_session_source.go index 6889d0a1..930d69e0 100644 --- a/routing/payment_session_source.go +++ b/routing/payment_session_source.go @@ -17,7 +17,10 @@ var _ PaymentSessionSource = (*SessionSource)(nil) type SessionSource struct { // Graph is the channel graph that will be used to gather metrics from // and also to carry out path finding queries. - Graph routingGraph + Graph *channeldb.ChannelGraph + + // SourceNode is the graph's source node. + SourceNode *channeldb.LightningNode // GetLink is a method that allows querying the lower link layer // to determine the up to date available bandwidth at a prospective link @@ -40,6 +43,21 @@ type SessionSource struct { PathFindingConfig PathFindingConfig } +// getRoutingGraph returns a routing graph and a clean-up function for +// pathfinding. +func (m *SessionSource) getRoutingGraph() (routingGraph, func(), error) { + routingTx, err := NewCachedGraph(m.SourceNode, m.Graph) + if err != nil { + return nil, nil, err + } + return routingTx, func() { + err := routingTx.close() + if err != nil { + log.Errorf("Error closing db tx: %v", err) + } + }, nil +} + // NewPaymentSession creates a new payment session backed by the latest prune // view from Mission Control. An optional set of routing hints can be provided // in order to populate additional edges to explore when finding a path to the @@ -47,14 +65,14 @@ type SessionSource struct { func (m *SessionSource) NewPaymentSession(p *LightningPayment) ( PaymentSession, error) { - sourceNode := m.Graph.sourceNode() - - getBandwidthHints := func() (bandwidthHints, error) { - return newBandwidthManager(m.Graph, sourceNode, m.GetLink) + getBandwidthHints := func(graph routingGraph) (bandwidthHints, error) { + return newBandwidthManager( + graph, m.SourceNode.PubKeyBytes, m.GetLink, + ) } session, err := newPaymentSession( - p, getBandwidthHints, m.Graph, + p, getBandwidthHints, m.getRoutingGraph, m.MissionControl, m.PathFindingConfig, ) if err != nil { diff --git a/routing/payment_session_test.go b/routing/payment_session_test.go index f177da73..11823d4c 100644 --- a/routing/payment_session_test.go +++ b/routing/payment_session_test.go @@ -116,10 +116,12 @@ func TestUpdateAdditionalEdge(t *testing.T) { // Create the paymentsession. session, err := newPaymentSession( payment, - func() (bandwidthHints, error) { + func(routingGraph) (bandwidthHints, error) { return &mockBandwidthHints{}, nil }, - &sessionGraph{}, + func() (routingGraph, func(), error) { + return &sessionGraph{}, func() {}, nil + }, &MissionControl{}, PathFindingConfig{}, ) @@ -194,10 +196,12 @@ func TestRequestRoute(t *testing.T) { session, err := newPaymentSession( payment, - func() (bandwidthHints, error) { + func(routingGraph) (bandwidthHints, error) { return &mockBandwidthHints{}, nil }, - &sessionGraph{}, + func() (routingGraph, func(), error) { + return &sessionGraph{}, func() {}, nil + }, &MissionControl{}, PathFindingConfig{}, ) diff --git a/routing/router_test.go b/routing/router_test.go index 77681b2a..34bfcac0 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -129,11 +129,11 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, ) require.NoError(t, err, "failed to create missioncontrol") - cachedGraph, err := NewCachedGraph(graphInstance.graph) + sourceNode, err := graphInstance.graph.SourceNode() require.NoError(t, err) - sessionSource := &SessionSource{ - Graph: cachedGraph, + Graph: graphInstance.graph, + SourceNode: sourceNode, GetLink: graphInstance.getLink, PathFindingConfig: pathFindingConfig, MissionControl: mc, @@ -190,7 +190,7 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, func createTestCtxSingleNode(t *testing.T, startingHeight uint32) (*testCtx, func()) { - graph, graphBackend, cleanup, err := makeTestGraph() + graph, graphBackend, cleanup, err := makeTestGraph(true) require.NoError(t, err, "failed to make test graph") sourceNode, err := createTestNode() @@ -216,7 +216,7 @@ func createTestCtxFromFile(t *testing.T, // We'll attempt to locate and parse out the file // that encodes the graph that our tests should be run against. - graphInstance, err := parseTestGraph(testGraph) + graphInstance, err := parseTestGraph(true, testGraph) require.NoError(t, err, "unable to create test graph") return createTestCtxFromGraphInstance( @@ -387,7 +387,7 @@ func TestChannelUpdateValidation(t *testing.T) { }, 2), } - testGraph, err := createTestGraphFromChannels(testChannels, "a") + testGraph, err := createTestGraphFromChannels(true, testChannels, "a") require.NoError(t, err, "unable to create graph") defer testGraph.cleanUp() @@ -1246,7 +1246,7 @@ func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) { // Setup an initially empty network. testChannels := []*testChannel{} testGraph, err := createTestGraphFromChannels( - testChannels, "roasbeef", + true, testChannels, "roasbeef", ) if err != nil { t.Fatalf("unable to create graph: %v", err) @@ -2260,7 +2260,9 @@ func TestPruneChannelGraphStaleEdges(t *testing.T) { for _, strictPruning := range []bool{true, false} { // We'll create our test graph and router backed with these test // channels we've created. - testGraph, err := createTestGraphFromChannels(testChannels, "a") + testGraph, err := createTestGraphFromChannels( + true, testChannels, "a", + ) if err != nil { t.Fatalf("unable to create test graph: %v", err) } @@ -2390,7 +2392,9 @@ func testPruneChannelGraphDoubleDisabled(t *testing.T, assumeValid bool) { // We'll create our test graph and router backed with these test // channels we've created. - testGraph, err := createTestGraphFromChannels(testChannels, "self") + testGraph, err := createTestGraphFromChannels( + true, testChannels, "self", + ) if err != nil { t.Fatalf("unable to create test graph: %v", err) } @@ -2760,7 +2764,7 @@ func TestUnknownErrorSource(t *testing.T) { }, 4), } - testGraph, err := createTestGraphFromChannels(testChannels, "a") + testGraph, err := createTestGraphFromChannels(true, testChannels, "a") defer testGraph.cleanUp() if err != nil { t.Fatalf("unable to create graph: %v", err) @@ -2896,7 +2900,7 @@ func TestSendToRouteStructuredError(t *testing.T) { }, 2), } - testGraph, err := createTestGraphFromChannels(testChannels, "a") + testGraph, err := createTestGraphFromChannels(true, testChannels, "a") if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -3145,7 +3149,7 @@ func TestSendToRouteMaxHops(t *testing.T) { }, 1), } - testGraph, err := createTestGraphFromChannels(testChannels, "a") + testGraph, err := createTestGraphFromChannels(true, testChannels, "a") if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -3256,7 +3260,7 @@ func TestBuildRoute(t *testing.T) { }, 4), } - testGraph, err := createTestGraphFromChannels(testChannels, "a") + testGraph, err := createTestGraphFromChannels(true, testChannels, "a") if err != nil { t.Fatalf("unable to create graph: %v", err) } @@ -3496,7 +3500,7 @@ func createDummyTestGraph(t *testing.T) *testGraphInstance { }, 2), } - testGraph, err := createTestGraphFromChannels(testChannels, "a") + testGraph, err := createTestGraphFromChannels(true, testChannels, "a") require.NoError(t, err, "failed to create graph") return testGraph } diff --git a/sample-lnd.conf b/sample-lnd.conf index 2bcbe10c..69533d33 100644 --- a/sample-lnd.conf +++ b/sample-lnd.conf @@ -1118,6 +1118,9 @@ litecoin.node=ltcd ; a batch of modifications to disk. Defaults to 500 milliseconds. ; db.batch-commit-interval=500ms +; Don't use the in-memory graph cache for path finding. Much slower but uses +; less RAM. Can only be used with a bolt database backend. +; db.no-graph-cache=true [etcd] @@ -1170,6 +1173,27 @@ litecoin.node=ltcd ; disable. ; db.postgres.timeout= +[bolt] + +; If true, prevents the database from syncing its freelist to disk. +; db.bolt.nofreelistsync=1 + +; Whether the databases used within lnd should automatically be compacted on +; every startup (and if the database has the configured minimum age). This is +; disabled by default because it requires additional disk space to be available +; during the compaction that is freed afterwards. In general compaction leads to +; smaller database files. +; db.bolt.auto-compact=true + +; How long ago the last compaction of a database file must be for it to be +; considered for auto compaction again. Can be set to 0 to compact on every +; startup. (default: 168h) +; db.bolt.auto-compact-min-age=0 + +; Specify the timeout to be used when opening the database. +; db.bolt.dbtimeout=60s + + [cluster] ; Enables leader election if set. @@ -1217,26 +1241,6 @@ litecoin.node=ltcd ; The TLS certificate to use for establishing the remote signer's identity. ; remotesigner.tlscertpath=/path/to/remote/signer/tls.cert -[bolt] - -; If true, prevents the database from syncing its freelist to disk. -; db.bolt.nofreelistsync=1 - -; Whether the databases used within lnd should automatically be compacted on -; every startup (and if the database has the configured minimum age). This is -; disabled by default because it requires additional disk space to be available -; during the compaction that is freed afterwards. In general compaction leads to -; smaller database files. -; db.bolt.auto-compact=true - -; How long ago the last compaction of a database file must be for it to be -; considered for auto compaction again. Can be set to 0 to compact on every -; startup. (default: 168h) -; db.bolt.auto-compact-min-age=0 - -; Specify the timeout to be used when opening the database. -; db.bolt.dbtimeout=60s - [gossip] diff --git a/server.go b/server.go index e2eb3039..354b90db 100644 --- a/server.go +++ b/server.go @@ -860,12 +860,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr, MinProbability: routingConfig.MinRouteProbability, } - cachedGraph, err := routing.NewCachedGraph(chanGraph) + sourceNode, err := chanGraph.SourceNode() if err != nil { - return nil, err + return nil, fmt.Errorf("error getting source node: %v", err) } paymentSessionSource := &routing.SessionSource{ - Graph: cachedGraph, + Graph: chanGraph, + SourceNode: sourceNode, MissionControl: s.missionControl, GetLink: s.htlcSwitch.GetLinkByShortID, PathFindingConfig: pathFindingConfig,