zmq: fix flake8 nits

This commit is contained in:
Michael Schmoock
2020-12-13 13:17:29 +01:00
committed by Christian Decker
parent 844b45bbc4
commit e0b2b6e515
2 changed files with 19 additions and 6 deletions

View File

@@ -28,7 +28,6 @@
# NOTIFICATION_TYPE_NAMES below.
###############################################################################
import time
import json
import functools
@@ -50,10 +49,12 @@ NOTIFICATION_TYPE_NAMES = ['channel_opened',
'sendpay_success',
'sendpay_failure']
class NotificationType():
""" Wrapper for notification type string to generate the corresponding
plugin option strings. By convention of lightningd, the cli options
use dashes in place of rather than underscores or no spaces."""
def __init__(self, notification_type_name):
self.notification_type_name = notification_type_name
@@ -66,15 +67,18 @@ class NotificationType():
def hwm_option(self):
return "zmq-pub-{}-hwm".format(str(self).replace("_", "-"))
NOTIFICATION_TYPES = [NotificationType(n) for n in NOTIFICATION_TYPE_NAMES]
###############################################################################
class Publisher():
""" Holds the connection state and accepts incoming notifications that
come from the subscription. If there is an associated publishing
endpoint connected, it will encode and pass the contents of the
notification. """
def __init__(self):
self.factory = ZmqFactory()
self.connection_map = {}
@@ -95,12 +99,14 @@ class Publisher():
connection = self.connection_map[notification_type_name]
connection.publish(message, tag=tag)
publisher = Publisher()
###############################################################################
ZMQ_TRANSPORT_PREFIXES = ['tcp://', "ipc://", 'inproc://', "pgm://", "epgm://"]
class Setup():
""" Does some light validation of the plugin option input and generates a
dictionary to configure the Twisted and ZeroMQ setup """
@@ -125,7 +131,7 @@ class Setup():
level='warn')
for endpoint in Setup._iter_endpoints_not_ok(options):
plugin.log(("Endpoint option {} doesn't appear to be recognized"
).format(endpoint), level='warn')
).format(endpoint), level='warn')
###########################################################################
@@ -144,7 +150,7 @@ class Setup():
for e, nt, hwm in Setup._iter_endpoint_setup(options):
if e not in setup:
setup[e] = {'notification_type_names': [],
'high_water_mark': hwm}
'high_water_mark': hwm}
setup[e]['notification_type_names'].append(str(nt))
# use the lowest high water mark given for the endpoint
setup[e]['high_water_mark'] = min(
@@ -165,6 +171,7 @@ class Setup():
plugin = Plugin()
@plugin.init()
def init(options, configuration, plugin, **kwargs):
Setup.check_option_warnings(options, plugin)
@@ -172,18 +179,20 @@ def init(options, configuration, plugin, **kwargs):
Setup.log_setup_dict(setup_dict, plugin)
reactor.callFromThread(publisher.load_setup, setup_dict)
def on_notification(notification_type_name, plugin, *args, **kwargs):
if len(args) != 0:
plugin.log("got unexpected args: {}".format(args), level="warn")
reactor.callFromThread(publisher.publish_notification,
notification_type_name, *args, **kwargs)
DEFAULT_HIGH_WATER_MARK = 1000
for nt in NOTIFICATION_TYPES:
# subscribe to all notifications
on = functools.partial(on_notification, str(nt))
on.__annotations__ = {} # needed to please Plugin._coerce_arguments()
on.__annotations__ = {} # needed to please Plugin._coerce_arguments()
plugin.add_subscription(str(nt), on)
# zmq socket binding option
endpoint_opt = nt.endpoint_option()
@@ -198,9 +207,11 @@ for nt in NOTIFICATION_TYPES:
###############################################################################
def plugin_thread():
plugin.run()
reactor.callFromThread(reactor.stop)
reactor.callInThread(plugin_thread)
reactor.run()

View File

@@ -35,7 +35,6 @@ import json
import argparse
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from txzmq import ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqFactory
@@ -52,6 +51,7 @@ NOTIFICATION_TYPE_NAMES = ['channel_opened',
'sendpay_success',
'sendpay_failure']
class NotificationType():
def __init__(self, notification_type_name):
self.notification_type_name = notification_type_name
@@ -65,10 +65,12 @@ class NotificationType():
def argparse_namespace_attribute(self):
return "zmq_sub_{}".format((self))
NOTIFICATION_TYPES = [NotificationType(n) for n in NOTIFICATION_TYPE_NAMES]
###############################################################################
class Subscriber():
def __init__(self):
self.factory = ZmqFactory()
@@ -76,7 +78,7 @@ class Subscriber():
def _log_message(self, message, tag):
tag = tag.decode("utf8")
message = json.dumps(json.loads(message.decode("utf8")), indent=1,
sort_keys=True)
sort_keys=True)
current_time = time.strftime('%X %x %Z')
print("{} - {}\n{}".format(current_time, tag, message))