diff --git a/autopilot/graph.go b/autopilot/graph.go index 2624aa79..e630f8d3 100644 --- a/autopilot/graph.go +++ b/autopilot/graph.go @@ -148,7 +148,7 @@ func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey, return nil, err } - dbNode, err := d.db.FetchLightningNode(nil, vertex) + dbNode, err := d.db.FetchLightningNode(vertex) switch { case err == channeldb.ErrGraphNodeNotFound: fallthrough diff --git a/channeldb/db.go b/channeldb/db.go index 3ceb93d9..63363989 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -23,6 +23,7 @@ import ( "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" ) const ( @@ -286,10 +287,14 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, // Set the parent pointer (only used in tests). chanDB.channelStateDB.parent = chanDB - chanDB.graph = newChannelGraph( + var err error + chanDB.graph, err = NewChannelGraph( backend, opts.RejectCacheSize, opts.ChannelCacheSize, opts.BatchCommitInterval, ) + if err != nil { + return nil, err + } // Synchronize the version of database and apply migrations if needed. if err := chanDB.syncVersions(dbVersions); err != nil { @@ -305,7 +310,7 @@ func (d *DB) Path() string { return d.dbPath } -var topLevelBuckets = [][]byte{ +var dbTopLevelBuckets = [][]byte{ openChannelBucket, closedChannelBucket, forwardingLogBucket, @@ -316,10 +321,6 @@ var topLevelBuckets = [][]byte{ paymentsIndexBucket, peersBucket, nodeInfoBucket, - nodeBucket, - edgeBucket, - edgeIndexBucket, - graphMetaBucket, metaBucket, closeSummaryBucket, outpointBucket, @@ -330,7 +331,7 @@ var topLevelBuckets = [][]byte{ // operation is fully atomic. func (d *DB) Wipe() error { err := kvdb.Update(d, func(tx kvdb.RwTx) error { - for _, tlb := range topLevelBuckets { + for _, tlb := range dbTopLevelBuckets { err := tx.DeleteTopLevelBucket(tlb) if err != nil && err != kvdb.ErrBucketNotFound { return err @@ -358,42 +359,12 @@ func initChannelDB(db kvdb.Backend) error { return nil } - for _, tlb := range topLevelBuckets { + for _, tlb := range dbTopLevelBuckets { if _, err := tx.CreateTopLevelBucket(tlb); err != nil { return err } } - nodes := tx.ReadWriteBucket(nodeBucket) - _, err = nodes.CreateBucket(aliasIndexBucket) - if err != nil { - return err - } - _, err = nodes.CreateBucket(nodeUpdateIndexBucket) - if err != nil { - return err - } - - edges := tx.ReadWriteBucket(edgeBucket) - if _, err := edges.CreateBucket(edgeIndexBucket); err != nil { - return err - } - if _, err := edges.CreateBucket(edgeUpdateIndexBucket); err != nil { - return err - } - if _, err := edges.CreateBucket(channelPointBucket); err != nil { - return err - } - if _, err := edges.CreateBucket(zombieBucket); err != nil { - return err - } - - graphMeta := tx.ReadWriteBucket(graphMetaBucket) - _, err = graphMeta.CreateBucket(pruneLogBucket) - if err != nil { - return err - } - meta.DbVersionNumber = getLatestDBVersion(dbVersions) return putMeta(meta, tx) }, func() {}) @@ -1157,30 +1128,21 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, return nil, err } - var graphNode LightningNode - err = kvdb.View(d, func(tx kvdb.RTx) error { - // We'll also query the graph for this peer to see if they have - // any addresses that we don't currently have stored within the - // link node database. - nodes := tx.ReadBucket(nodeBucket) - if nodes == nil { - return ErrGraphNotFound - } - compressedPubKey := nodePub.SerializeCompressed() - graphNode, err = fetchLightningNode(nodes, compressedPubKey) - if err != nil && err != ErrGraphNodeNotFound { - // If the node isn't found, then that's OK, as we still - // have the link node data. - return err - } - - return nil - }, func() { - linkNode = nil - }) + // We'll also query the graph for this peer to see if they have any + // addresses that we don't currently have stored within the link node + // database. + pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed()) if err != nil { return nil, err } + graphNode, err := d.graph.FetchLightningNode(pubKey) + if err != nil && err != ErrGraphNodeNotFound { + return nil, err + } else if err == ErrGraphNodeNotFound { + // If the node isn't found, then that's OK, as we still have the + // link node data. But any other error needs to be returned. + graphNode = &LightningNode{} + } // Now that we have both sources of addrs for this node, we'll use a // map to de-duplicate any addresses between the two sources, and diff --git a/channeldb/graph.go b/channeldb/graph.go index cb926830..92b04bd9 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -184,10 +184,14 @@ type ChannelGraph struct { nodeScheduler batch.Scheduler } -// newChannelGraph allocates a new ChannelGraph backed by a DB instance. The +// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The // returned instance has its own unique reject cache and channel cache. -func newChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int, - batchCommitInterval time.Duration) *ChannelGraph { +func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int, + batchCommitInterval time.Duration) (*ChannelGraph, error) { + + if err := initChannelGraph(db); err != nil { + return nil, err + } g := &ChannelGraph{ db: db, @@ -201,7 +205,85 @@ func newChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int, db, nil, batchCommitInterval, ) - return g + return g, nil +} + +var graphTopLevelBuckets = [][]byte{ + nodeBucket, + edgeBucket, + edgeIndexBucket, + graphMetaBucket, +} + +// Wipe completely deletes all saved state within all used buckets within the +// database. The deletion is done in a single transaction, therefore this +// operation is fully atomic. +func (c *ChannelGraph) Wipe() error { + err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { + for _, tlb := range graphTopLevelBuckets { + err := tx.DeleteTopLevelBucket(tlb) + if err != nil && err != kvdb.ErrBucketNotFound { + return err + } + } + return nil + }, func() {}) + if err != nil { + return err + } + + return initChannelGraph(c.db) +} + +// createChannelDB creates and initializes a fresh version of channeldb. In +// the case that the target path has not yet been created or doesn't yet exist, +// then the path is created. Additionally, all required top-level buckets used +// within the database are created. +func initChannelGraph(db kvdb.Backend) error { + err := kvdb.Update(db, func(tx kvdb.RwTx) error { + for _, tlb := range graphTopLevelBuckets { + if _, err := tx.CreateTopLevelBucket(tlb); err != nil { + return err + } + } + + nodes := tx.ReadWriteBucket(nodeBucket) + _, err := nodes.CreateBucketIfNotExists(aliasIndexBucket) + if err != nil { + return err + } + _, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket) + if err != nil { + return err + } + + edges := tx.ReadWriteBucket(edgeBucket) + _, err = edges.CreateBucketIfNotExists(edgeIndexBucket) + if err != nil { + return err + } + _, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket) + if err != nil { + return err + } + _, err = edges.CreateBucketIfNotExists(channelPointBucket) + if err != nil { + return err + } + _, err = edges.CreateBucketIfNotExists(zombieBucket) + if err != nil { + return err + } + + graphMeta := tx.ReadWriteBucket(graphMetaBucket) + _, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket) + return err + }, func() {}) + if err != nil { + return fmt.Errorf("unable to create new channel graph: %v", err) + } + + return nil } // Database returns a pointer to the underlying database. @@ -218,7 +300,9 @@ func (c *ChannelGraph) Database() kvdb.Backend { // NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer // for that particular channel edge routing policy will be passed into the // callback. -func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error { +func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo, + *ChannelEdgePolicy, *ChannelEdgePolicy) error) error { + // TODO(roasbeef): ptr map to reduce # of allocs? no duplicates return kvdb.View(c.db, func(tx kvdb.RTx) error { @@ -2356,17 +2440,11 @@ func (l *LightningNode) isPublic(tx kvdb.RTx, sourcePubKey []byte) (bool, error) // FetchLightningNode attempts to look up a target node by its identity public // key. If the node isn't found in the database, then ErrGraphNodeNotFound is // returned. -// -// If the caller wishes to re-use an existing boltdb transaction, then it -// should be passed as the first argument. Otherwise the first argument should -// be nil and a fresh transaction will be created to execute the graph -// traversal. -func (c *ChannelGraph) FetchLightningNode(tx kvdb.RTx, nodePub route.Vertex) ( +func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) ( *LightningNode, error) { var node *LightningNode - - fetchNode := func(tx kvdb.RTx) error { + err := kvdb.View(c.db, func(tx kvdb.RTx) error { // First grab the nodes bucket which stores the mapping from // pubKey to node information. nodes := tx.ReadBucket(nodeBucket) @@ -2393,14 +2471,9 @@ func (c *ChannelGraph) FetchLightningNode(tx kvdb.RTx, nodePub route.Vertex) ( node = &n return nil - } - - var err error - if tx == nil { - err = kvdb.View(c.db, fetchNode, func() {}) - } else { - err = fetchNode(tx) - } + }, func() { + node = nil + }) if err != nil { return nil, err } diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 6d044297..d2953a52 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -6,10 +6,12 @@ import ( "errors" "fmt" "image/color" + "io/ioutil" "math" "math/big" prand "math/rand" "net" + "os" "reflect" "runtime" "sync" @@ -45,6 +47,48 @@ var ( testPub = route.Vertex{2, 202, 4} ) +// MakeTestGraph creates a new instance of the ChannelGraph for testing +// purposes. A callback which cleans up the created temporary directories is +// also returned and intended to be executed after the test completes. +func MakeTestGraph(modifiers ...OptionModifier) (*ChannelGraph, func(), error) { + // First, create a temporary directory to be used for the duration of + // this test. + tempDirName, err := ioutil.TempDir("", "channelgraph") + if err != nil { + return nil, nil, err + } + + opts := DefaultOptions() + for _, modifier := range modifiers { + modifier(&opts) + } + + // Next, create channelgraph for the first time. + backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cgr") + if err != nil { + backendCleanup() + return nil, nil, err + } + + graph, err := NewChannelGraph( + backend, opts.RejectCacheSize, opts.ChannelCacheSize, + opts.BatchCommitInterval, + ) + if err != nil { + backendCleanup() + _ = os.RemoveAll(tempDirName) + return nil, nil, err + } + + cleanUp := func() { + _ = backend.Close() + backendCleanup() + _ = os.RemoveAll(tempDirName) + } + + return graph, cleanUp, nil +} + func createLightningNode(db kvdb.Backend, priv *btcec.PrivateKey) (*LightningNode, error) { updateTime := prand.Int63() @@ -76,14 +120,12 @@ func createTestVertex(db kvdb.Backend) (*LightningNode, error) { func TestNodeInsertionAndDeletion(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We'd like to test basic insertion/deletion for vertexes from the // graph, so we'll create a test vertex to start with. node := &LightningNode{ @@ -107,7 +149,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) { // Next, fetch the node from the database to ensure everything was // serialized properly. - dbNode, err := graph.FetchLightningNode(nil, testPub) + dbNode, err := graph.FetchLightningNode(testPub) if err != nil { t.Fatalf("unable to locate node: %v", err) } @@ -131,7 +173,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) { // Finally, attempt to fetch the node again. This should fail as the // node should have been deleted from the database. - _, err = graph.FetchLightningNode(nil, testPub) + _, err = graph.FetchLightningNode(testPub) if err != ErrGraphNodeNotFound { t.Fatalf("fetch after delete should fail!") } @@ -142,14 +184,12 @@ func TestNodeInsertionAndDeletion(t *testing.T) { func TestPartialNode(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We want to be able to insert nodes into the graph that only has the // PubKey set. node := &LightningNode{ @@ -163,7 +203,7 @@ func TestPartialNode(t *testing.T) { // Next, fetch the node from the database to ensure everything was // serialized properly. - dbNode, err := graph.FetchLightningNode(nil, testPub) + dbNode, err := graph.FetchLightningNode(testPub) if err != nil { t.Fatalf("unable to locate node: %v", err) } @@ -195,7 +235,7 @@ func TestPartialNode(t *testing.T) { // Finally, attempt to fetch the node again. This should fail as the // node should have been deleted from the database. - _, err = graph.FetchLightningNode(nil, testPub) + _, err = graph.FetchLightningNode(testPub) if err != ErrGraphNodeNotFound { t.Fatalf("fetch after delete should fail!") } @@ -204,14 +244,12 @@ func TestPartialNode(t *testing.T) { func TestAliasLookup(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We'd like to test the alias index within the database, so first // create a new test node. testNode, err := createTestVertex(graph.db) @@ -258,13 +296,11 @@ func TestAliasLookup(t *testing.T) { func TestSourceNode(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() - defer cleanUp() + graph, cleanUp, err := MakeTestGraph() if err != nil { t.Fatalf("unable to make test database: %v", err) } - - graph := db.ChannelGraph() + defer cleanUp() // We'd like to test the setting/getting of the source node, so we // first create a fake node to use within the test. @@ -299,14 +335,12 @@ func TestSourceNode(t *testing.T) { func TestEdgeInsertionDeletion(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We'd like to test the insertion/deletion of edges, so we create two // vertexes to connect. node1, err := createTestVertex(graph.db) @@ -434,13 +468,12 @@ func createEdge(height, txIndex uint32, txPosition uint16, outPointIndex uint32, func TestDisconnectBlockAtHeight(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() sourceNode, err := createTestVertex(graph.db) if err != nil { t.Fatalf("unable to create source node: %v", err) @@ -721,14 +754,12 @@ func createChannelEdge(db kvdb.Backend, node1, node2 *LightningNode) (*ChannelEd func TestEdgeInfoUpdates(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We'd like to test the update of edges inserted into the database, so // we create two vertexes to connect. node1, err := createTestVertex(graph.db) @@ -851,14 +882,12 @@ func newEdgePolicy(chanID uint64, db kvdb.Backend, func TestGraphTraversal(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We'd like to test some of the graph traversal capabilities within // the DB, so we'll create a series of fake nodes to insert into the // graph. @@ -1112,13 +1141,12 @@ func assertChanViewEqualChanPoints(t *testing.T, a []EdgePoint, b []*wire.OutPoi func TestGraphPruning(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() sourceNode, err := createTestVertex(graph.db) if err != nil { t.Fatalf("unable to create source node: %v", err) @@ -1320,14 +1348,12 @@ func TestGraphPruning(t *testing.T) { func TestHighestChanID(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // If we don't yet have any channels in the database, then we should // get a channel ID of zero if we ask for the highest channel ID. bestID, err := graph.HighestChanID() @@ -1397,14 +1423,12 @@ func TestHighestChanID(t *testing.T) { func TestChanUpdatesInHorizon(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // If we issue an arbitrary query before any channel updates are // inserted in the database, we should get zero results. chanUpdates, err := graph.ChanUpdatesInHorizon( @@ -1567,14 +1591,12 @@ func TestChanUpdatesInHorizon(t *testing.T) { func TestNodeUpdatesInHorizon(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - startTime := time.Unix(1234, 0) endTime := startTime @@ -1690,14 +1712,12 @@ func TestNodeUpdatesInHorizon(t *testing.T) { func TestFilterKnownChanIDs(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // If we try to filter out a set of channel ID's before we even know of // any channels, then we should get the entire set back. preChanIDs := []uint64{1, 2, 3, 4} @@ -1807,14 +1827,12 @@ func TestFilterKnownChanIDs(t *testing.T) { func TestFilterChannelRange(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We'll first populate our graph with two nodes. All channels created // below will be made between these two nodes. node1, err := createTestVertex(graph.db) @@ -1941,14 +1959,12 @@ func TestFilterChannelRange(t *testing.T) { func TestFetchChanInfos(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We'll first populate our graph with two nodes. All channels created // below will be made between these two nodes. node1, err := createTestVertex(graph.db) @@ -2063,14 +2079,12 @@ func TestFetchChanInfos(t *testing.T) { func TestIncompleteChannelPolicies(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // Create two nodes. node1, err := createTestVertex(graph.db) if err != nil { @@ -2171,13 +2185,12 @@ func TestIncompleteChannelPolicies(t *testing.T) { func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() sourceNode, err := createTestVertex(graph.db) if err != nil { t.Fatalf("unable to create source node: %v", err) @@ -2326,7 +2339,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { func TestPruneGraphNodes(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) @@ -2334,7 +2347,6 @@ func TestPruneGraphNodes(t *testing.T) { // We'll start off by inserting our source node, to ensure that it's // the only node left after we prune the graph. - graph := db.ChannelGraph() sourceNode, err := createTestVertex(graph.db) if err != nil { t.Fatalf("unable to create source node: %v", err) @@ -2398,7 +2410,7 @@ func TestPruneGraphNodes(t *testing.T) { // Finally, we'll ensure that node3, the only fully unconnected node as // properly deleted from the graph and not another node in its place. - _, err = graph.FetchLightningNode(nil, node3.PubKeyBytes) + _, err = graph.FetchLightningNode(node3.PubKeyBytes) if err == nil { t.Fatalf("node 3 should have been deleted!") } @@ -2410,14 +2422,12 @@ func TestPruneGraphNodes(t *testing.T) { func TestAddChannelEdgeShellNodes(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // To start, we'll create two nodes, and only add one of them to the // channel graph. node1, err := createTestVertex(graph.db) @@ -2441,7 +2451,7 @@ func TestAddChannelEdgeShellNodes(t *testing.T) { // Ensure that node1 was inserted as a full node, while node2 only has // a shell node present. - node1, err = graph.FetchLightningNode(nil, node1.PubKeyBytes) + node1, err = graph.FetchLightningNode(node1.PubKeyBytes) if err != nil { t.Fatalf("unable to fetch node1: %v", err) } @@ -2449,7 +2459,7 @@ func TestAddChannelEdgeShellNodes(t *testing.T) { t.Fatalf("have shell announcement for node1, shouldn't") } - node2, err = graph.FetchLightningNode(nil, node2.PubKeyBytes) + node2, err = graph.FetchLightningNode(node2.PubKeyBytes) if err != nil { t.Fatalf("unable to fetch node2: %v", err) } @@ -2464,14 +2474,12 @@ func TestAddChannelEdgeShellNodes(t *testing.T) { func TestNodePruningUpdateIndexDeletion(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We'll first populate our graph with a single node that will be // removed shortly. node1, err := createTestVertex(graph.db) @@ -2534,44 +2542,41 @@ func TestNodeIsPublic(t *testing.T) { // We'll need to create a separate database and channel graph for each // participant to replicate real-world scenarios (private edges being in // some graphs but not others, etc.). - aliceDB, cleanUp, err := MakeTestDB() + aliceGraph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - aliceNode, err := createTestVertex(aliceDB) + aliceNode, err := createTestVertex(aliceGraph.db) if err != nil { t.Fatalf("unable to create test node: %v", err) } - aliceGraph := aliceDB.ChannelGraph() if err := aliceGraph.SetSourceNode(aliceNode); err != nil { t.Fatalf("unable to set source node: %v", err) } - bobDB, cleanUp, err := MakeTestDB() + bobGraph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - bobNode, err := createTestVertex(bobDB) + bobNode, err := createTestVertex(bobGraph.db) if err != nil { t.Fatalf("unable to create test node: %v", err) } - bobGraph := bobDB.ChannelGraph() if err := bobGraph.SetSourceNode(bobNode); err != nil { t.Fatalf("unable to set source node: %v", err) } - carolDB, cleanUp, err := MakeTestDB() + carolGraph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - carolNode, err := createTestVertex(carolDB) + carolNode, err := createTestVertex(carolGraph.db) if err != nil { t.Fatalf("unable to create test node: %v", err) } - carolGraph := carolDB.ChannelGraph() if err := carolGraph.SetSourceNode(carolNode); err != nil { t.Fatalf("unable to set source node: %v", err) } @@ -2683,14 +2688,12 @@ func TestNodeIsPublic(t *testing.T) { func TestDisabledChannelIDs(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() if err != nil { t.Fatalf("unable to make test database: %v", err) } defer cleanUp() - graph := db.ChannelGraph() - // Create first node and add it to the graph. node1, err := createTestVertex(graph.db) if err != nil { @@ -2781,14 +2784,12 @@ func TestDisabledChannelIDs(t *testing.T) { func TestEdgePolicyMissingMaxHtcl(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to make test database: %v", err) } - graph := db.ChannelGraph() - // We'd like to test the update of edges inserted into the database, so // we create two vertexes to connect. node1, err := createTestVertex(graph.db) @@ -2961,12 +2962,11 @@ func TestGraphZombieIndex(t *testing.T) { t.Parallel() // We'll start by creating our test graph along with a test edge. - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() defer cleanUp() if err != nil { t.Fatalf("unable to create test database: %v", err) } - graph := db.ChannelGraph() node1, err := createTestVertex(graph.db) if err != nil { @@ -3136,7 +3136,7 @@ func compareEdgePolicies(a, b *ChannelEdgePolicy) error { return nil } -// TestLightningNodeSigVerifcation checks that we can use the LightningNode's +// TestLightningNodeSigVerification checks that we can use the LightningNode's // pubkey to verify signatures. func TestLightningNodeSigVerification(t *testing.T) { t.Parallel() @@ -3164,13 +3164,13 @@ func TestLightningNodeSigVerification(t *testing.T) { } // Create a LightningNode from the same private key. - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() if err != nil { t.Fatalf("unable to make test database: %v", err) } defer cleanUp() - node, err := createLightningNode(db, priv) + node, err := createLightningNode(graph.db, priv) if err != nil { t.Fatalf("unable to create node: %v", err) } @@ -3214,11 +3214,10 @@ func TestComputeFee(t *testing.T) { func TestBatchedAddChannelEdge(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() require.Nil(t, err) defer cleanUp() - graph := db.ChannelGraph() sourceNode, err := createTestVertex(graph.db) require.Nil(t, err) err = graph.SetSourceNode(sourceNode) @@ -3297,12 +3296,10 @@ func TestBatchedAddChannelEdge(t *testing.T) { func TestBatchedUpdateEdgePolicy(t *testing.T) { t.Parallel() - db, cleanUp, err := MakeTestDB() + graph, cleanUp, err := MakeTestGraph() require.Nil(t, err) defer cleanUp() - graph := db.ChannelGraph() - // We'd like to test the update of edges inserted into the database, so // we create two vertexes to connect. node1, err := createTestVertex(graph.db) diff --git a/routing/graph.go b/routing/graph.go index 83f06807..be58698f 100644 --- a/routing/graph.go +++ b/routing/graph.go @@ -85,7 +85,7 @@ func (g *dbRoutingTx) sourceNode() route.Vertex { func (g *dbRoutingTx) fetchNodeFeatures(nodePub route.Vertex) ( *lnwire.FeatureVector, error) { - targetNode, err := g.graph.FetchLightningNode(g.tx, nodePub) + targetNode, err := g.graph.FetchLightningNode(nodePub) switch err { // If the node exists and has features, return them directly. diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index 67fecb32..d098429c 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -23,6 +23,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/record" "github.com/lightningnetwork/lnd/routing/route" @@ -148,26 +149,36 @@ type testChan struct { // makeTestGraph creates a new instance of a channeldb.ChannelGraph for testing // purposes. A callback which cleans up the created temporary directories is // also returned and intended to be executed after the test completes. -func makeTestGraph() (*channeldb.ChannelGraph, func(), error) { +func makeTestGraph() (*channeldb.ChannelGraph, kvdb.Backend, func(), error) { // First, create a temporary directory to be used for the duration of // this test. tempDirName, err := ioutil.TempDir("", "channeldb") if err != nil { - return nil, nil, err + return nil, nil, nil, err } - // Next, create channeldb for the first time. - cdb, err := channeldb.Open(tempDirName) + // Next, create channelgraph for the first time. + backend, backendCleanup, err := kvdb.GetTestBackend(tempDirName, "cgr") if err != nil { - return nil, nil, err + return nil, nil, nil, err } cleanUp := func() { - cdb.Close() - os.RemoveAll(tempDirName) + backendCleanup() + _ = os.RemoveAll(tempDirName) } - return cdb.ChannelGraph(), cleanUp, nil + opts := channeldb.DefaultOptions() + graph, err := channeldb.NewChannelGraph( + backend, opts.RejectCacheSize, opts.ChannelCacheSize, + opts.BatchCommitInterval, + ) + if err != nil { + cleanUp() + return nil, nil, nil, err + } + + return graph, backend, cleanUp, nil } // parseTestGraph returns a fully populated ChannelGraph given a path to a JSON @@ -197,7 +208,7 @@ func parseTestGraph(path string) (*testGraphInstance, error) { testAddrs = append(testAddrs, testAddr) // Next, create a temporary graph database for usage within the test. - graph, cleanUp, err := makeTestGraph() + graph, graphBackend, cleanUp, err := makeTestGraph() if err != nil { return nil, err } @@ -381,11 +392,12 @@ func parseTestGraph(path string) (*testGraphInstance, error) { } return &testGraphInstance{ - graph: graph, - cleanUp: cleanUp, - aliasMap: aliasMap, - privKeyMap: privKeyMap, - channelIDs: channelIDs, + graph: graph, + graphBackend: graphBackend, + cleanUp: cleanUp, + aliasMap: aliasMap, + privKeyMap: privKeyMap, + channelIDs: channelIDs, }, nil } @@ -447,8 +459,9 @@ type testChannel struct { } type testGraphInstance struct { - graph *channeldb.ChannelGraph - cleanUp func() + graph *channeldb.ChannelGraph + graphBackend kvdb.Backend + cleanUp func() // aliasMap is a map from a node's alias to its public key. This type is // provided in order to allow easily look up from the human memorable alias @@ -482,7 +495,7 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) ( testAddrs = append(testAddrs, testAddr) // Next, create a temporary graph database for usage within the test. - graph, cleanUp, err := makeTestGraph() + graph, graphBackend, cleanUp, err := makeTestGraph() if err != nil { return nil, err } @@ -671,10 +684,11 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) ( } return &testGraphInstance{ - graph: graph, - cleanUp: cleanUp, - aliasMap: aliasMap, - privKeyMap: privKeyMap, + graph: graph, + graphBackend: graphBackend, + cleanUp: cleanUp, + aliasMap: aliasMap, + privKeyMap: privKeyMap, }, nil } @@ -2120,7 +2134,7 @@ func TestPathFindSpecExample(t *testing.T) { // Carol, so we set "B" as the source node so path finding starts from // Bob. bob := ctx.aliases["B"] - bobNode, err := ctx.graph.FetchLightningNode(nil, bob) + bobNode, err := ctx.graph.FetchLightningNode(bob) if err != nil { t.Fatalf("unable to find bob: %v", err) } @@ -2170,7 +2184,7 @@ func TestPathFindSpecExample(t *testing.T) { // Next, we'll set A as the source node so we can assert that we create // the proper route for any queries starting with Alice. alice := ctx.aliases["A"] - aliceNode, err := ctx.graph.FetchLightningNode(nil, alice) + aliceNode, err := ctx.graph.FetchLightningNode(alice) if err != nil { t.Fatalf("unable to find alice: %v", err) } diff --git a/routing/router.go b/routing/router.go index 6ebf86c1..00fa4d31 100644 --- a/routing/router.go +++ b/routing/router.go @@ -2505,8 +2505,10 @@ func (r *ChannelRouter) GetChannelByID(chanID lnwire.ShortChannelID) ( // within the graph. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) FetchLightningNode(node route.Vertex) (*channeldb.LightningNode, error) { - return r.cfg.Graph.FetchLightningNode(nil, node) +func (r *ChannelRouter) FetchLightningNode( + node route.Vertex) (*channeldb.LightningNode, error) { + + return r.cfg.Graph.FetchLightningNode(node) } // ForEachNode is used to iterate over every node in router topology. diff --git a/routing/router_test.go b/routing/router_test.go index 2633bd5a..510d18bf 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -125,8 +125,7 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, } mc, err := NewMissionControl( - graphInstance.graph.Database(), route.Vertex{}, - mcConfig, + graphInstance.graphBackend, route.Vertex{}, mcConfig, ) require.NoError(t, err, "failed to create missioncontrol") @@ -188,7 +187,6 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, cleanUp := func() { ctx.router.Stop() - graphInstance.cleanUp() } return ctx, cleanUp @@ -197,17 +195,10 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, func createTestCtxSingleNode(t *testing.T, startingHeight uint32) (*testCtx, func()) { - var ( - graph *channeldb.ChannelGraph - sourceNode *channeldb.LightningNode - cleanup func() - err error - ) - - graph, cleanup, err = makeTestGraph() + graph, graphBackend, cleanup, err := makeTestGraph() require.NoError(t, err, "failed to make test graph") - sourceNode, err = createTestNode() + sourceNode, err := createTestNode() require.NoError(t, err, "failed to create test node") require.NoError(t, @@ -215,8 +206,9 @@ func createTestCtxSingleNode(t *testing.T, ) graphInstance := &testGraphInstance{ - graph: graph, - cleanUp: cleanup, + graph: graph, + graphBackend: graphBackend, + cleanUp: cleanup, } return createTestCtxFromGraphInstance( @@ -1577,7 +1569,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { t.Fatalf("unable to find any routes: %v", err) } - copy1, err := ctx.graph.FetchLightningNode(nil, pub1) + copy1, err := ctx.graph.FetchLightningNode(pub1) if err != nil { t.Fatalf("unable to fetch node: %v", err) } @@ -1586,7 +1578,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { t.Fatalf("fetched node not equal to original") } - copy2, err := ctx.graph.FetchLightningNode(nil, pub2) + copy2, err := ctx.graph.FetchLightningNode(pub2) if err != nil { t.Fatalf("unable to fetch node: %v", err) } diff --git a/rpcserver.go b/rpcserver.go index 67934bac..3cc48132 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -5539,7 +5539,7 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context, // With the public key decoded, attempt to fetch the node corresponding // to this public key. If the node cannot be found, then an error will // be returned. - node, err := graph.FetchLightningNode(nil, pubKey) + node, err := graph.FetchLightningNode(pubKey) switch { case err == channeldb.ErrGraphNodeNotFound: return nil, status.Error(codes.NotFound, err.Error()) diff --git a/server.go b/server.go index f8f1f53e..0b1afe40 100644 --- a/server.go +++ b/server.go @@ -3921,7 +3921,7 @@ func (s *server) fetchNodeAdvertisedAddr(pub *btcec.PublicKey) (net.Addr, error) return nil, err } - node, err := s.graphDB.FetchLightningNode(nil, vertex) + node, err := s.graphDB.FetchLightningNode(vertex) if err != nil { return nil, err }