diff --git a/lntest/node.go b/lntest/node.go index 20166df9..186a5bea 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "io/ioutil" @@ -23,7 +24,6 @@ import ( "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" - "github.com/go-errors/errors" "github.com/jackc/pgx/v4/pgxpool" "github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/lnrpc" @@ -36,7 +36,9 @@ import ( "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/macaroons" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" "gopkg.in/macaroon.v2" ) @@ -316,8 +318,13 @@ type HarnessNode struct { // node and the outpoint. policyUpdates policyUpdateMap - quit chan struct{} - wg sync.WaitGroup + // runCtx is a context with cancel method. It's used to signal when the + // node needs to quit, and used as the parent context when spawning + // children contexts for RPC requests. + runCtx context.Context + cancel context.CancelFunc + + wg sync.WaitGroup lnrpc.LightningClient @@ -634,7 +641,10 @@ func renameFile(fromFileName, toFileName string) { func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, wait bool) error { - hn.quit = make(chan struct{}) + // Init the runCtx. + ctxt, cancel := context.WithCancel(context.Background()) + hn.runCtx = ctxt + hn.cancel = cancel args := hn.Cfg.genArgs() hn.cmd = exec.Command(lndBinary, args...) @@ -739,7 +749,7 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, err := hn.cmd.Wait() if err != nil { - lndError <- errors.Errorf("%v\n%v\n", err, errb.String()) + lndError <- fmt.Errorf("%v\n%v", err, errb.String()) } // Signal any onlookers that this process has exited. @@ -816,7 +826,7 @@ func (hn *HarnessNode) waitForState(conn grpc.ClientConnInterface, predicate func(state lnrpc.WalletState) bool) error { stateClient := lnrpc.NewStateClient(conn) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(hn.runCtx) defer cancel() stateStream, err := stateClient.SubscribeState( @@ -1016,8 +1026,7 @@ func (hn *HarnessNode) Unlock(ctx context.Context, // waitTillServerStarted makes a subscription to the server's state change and // blocks until the server is in state ServerActive. func (hn *HarnessNode) waitTillServerStarted() error { - ctxb := context.Background() - ctxt, cancel := context.WithTimeout(ctxb, NodeStartTimeout) + ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout) defer cancel() client, err := hn.StateClient.SubscribeState( @@ -1183,7 +1192,7 @@ func (hn *HarnessNode) ConnectRPCWithMacaroon(mac *macaroon.Macaroon) ( grpc.WithTransportCredentials(tlsCreds), } - ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) + ctx, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout) defer cancel() if mac == nil { @@ -1247,10 +1256,9 @@ func (hn *HarnessNode) stop() error { // Don't watch for error because sometimes the RPC connection gets // closed before a response is returned. req := lnrpc.StopRequest{} - ctx := context.Background() err := wait.NoError(func() error { - _, err := hn.LightningClient.StopDaemon(ctx, &req) + _, err := hn.LightningClient.StopDaemon(hn.runCtx, &req) switch { case err == nil: return nil @@ -1275,10 +1283,10 @@ func (hn *HarnessNode) stop() error { return fmt.Errorf("process did not exit") } - close(hn.quit) + // Stop the runCtx and wait for goroutines to finish. + hn.cancel() hn.wg.Wait() - hn.quit = nil hn.processExit = nil hn.LightningClient = nil hn.WalletUnlockerClient = nil @@ -1287,13 +1295,25 @@ func (hn *HarnessNode) stop() error { // Close any attempts at further grpc connections. if hn.conn != nil { - err := hn.conn.Close() - if err != nil && - !strings.Contains(err.Error(), "connection is closing") { + err := status.Code(hn.conn.Close()) + switch err { + case codes.OK: + return nil - return fmt.Errorf("error attempting to stop grpc "+ - "client: %v", err) + // When the context is canceled above, we might get the + // following error as the context is no longer active. + case codes.Canceled: + return nil + + case codes.Unknown: + return fmt.Errorf("unknown error attempting to stop "+ + "grpc client: %v", err) + + default: + return fmt.Errorf("error attempting to stop "+ + "grpc client: %v", err) } + } return nil @@ -1440,7 +1460,7 @@ func (hn *HarnessNode) lightningNetworkWatcher() { hn.handlePolicyUpdateWatchRequest(watchRequest) } - case <-hn.quit: + case <-hn.runCtx.Done(): return } } @@ -1577,7 +1597,7 @@ func (hn *HarnessNode) WaitForBlockchainSync(ctx context.Context) error { case <-ctx.Done(): return fmt.Errorf("timeout while waiting for " + "blockchain sync") - case <-hn.quit: + case <-hn.runCtx.Done(): return nil case <-ticker.C: } @@ -1586,13 +1606,14 @@ func (hn *HarnessNode) WaitForBlockchainSync(ctx context.Context) error { // WaitForBalance waits until the node sees the expected confirmed/unconfirmed // balance within their wallet. -func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount, confirmed bool) error { - ctx := context.Background() +func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount, + confirmed bool) error { + req := &lnrpc.WalletBalanceRequest{} var lastBalance btcutil.Amount doesBalanceMatch := func() bool { - balance, err := hn.WalletBalance(ctx, req) + balance, err := hn.WalletBalance(hn.runCtx, req) if err != nil { return false } @@ -1705,7 +1726,7 @@ func (hn *HarnessNode) handleOpenChannelWatchRequest(req *chanWatchRequest) { // node. This lets us handle the case where a node has already seen a // channel before a notification has been requested, causing us to miss // it. - chanFound := checkChanPointInGraph(context.Background(), hn, targetChan) + chanFound := checkChanPointInGraph(hn.runCtx, hn, targetChan) if chanFound { close(req.eventChan) return @@ -1795,16 +1816,14 @@ func (hn *HarnessNode) newTopologyClient( func (hn *HarnessNode) receiveTopologyClientStream( receiver chan *lnrpc.GraphTopologyUpdate) error { - ctxb := context.Background() - // Create a topology client to receive graph updates. - client, err := hn.newTopologyClient(ctxb) + client, err := hn.newTopologyClient(hn.runCtx) if err != nil { return fmt.Errorf("create topologyClient failed: %v", err) } // We use the context to time out when retrying graph subscription. - ctxt, cancel := context.WithTimeout(ctxb, DefaultTimeout) + ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout) defer cancel() for { @@ -1823,12 +1842,12 @@ func (hn *HarnessNode) receiveTopologyClientStream( return fmt.Errorf("graph subscription: " + "router not started before timeout") case <-time.After(wait.PollInterval): - case <-hn.quit: + case <-hn.runCtx.Done(): return nil } // Re-create the topology client. - client, err = hn.newTopologyClient(ctxb) + client, err = hn.newTopologyClient(hn.runCtx) if err != nil { return fmt.Errorf("create topologyClient "+ "failed: %v", err) @@ -1840,6 +1859,10 @@ func (hn *HarnessNode) receiveTopologyClientStream( // End of subscription stream. Do nothing and quit. return nil + case strings.Contains(err.Error(), context.Canceled.Error()): + // End of subscription stream. Do nothing and quit. + return nil + default: // An expected error is returned, return and leave it // to be handled by the caller. @@ -1849,7 +1872,7 @@ func (hn *HarnessNode) receiveTopologyClientStream( // Send the update or quit. select { case receiver <- update: - case <-hn.quit: + case <-hn.runCtx.Done(): return nil } } @@ -1920,9 +1943,7 @@ func (hn *HarnessNode) handlePolicyUpdateWatchRequest(req *chanWatchRequest) { // the format defined in type policyUpdateMap. func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap { - ctxt, cancel := context.WithTimeout( - context.Background(), DefaultTimeout, - ) + ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout) defer cancel() graph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{