diff --git a/zmq/cl-zmq.py b/zmq/cl-zmq.py index 23b4a24..7a1ecb8 100755 --- a/zmq/cl-zmq.py +++ b/zmq/cl-zmq.py @@ -28,7 +28,6 @@ # NOTIFICATION_TYPE_NAMES below. ############################################################################### -import time import json import functools @@ -50,10 +49,12 @@ NOTIFICATION_TYPE_NAMES = ['channel_opened', 'sendpay_success', 'sendpay_failure'] + class NotificationType(): """ Wrapper for notification type string to generate the corresponding plugin option strings. By convention of lightningd, the cli options use dashes in place of rather than underscores or no spaces.""" + def __init__(self, notification_type_name): self.notification_type_name = notification_type_name @@ -66,15 +67,18 @@ class NotificationType(): def hwm_option(self): return "zmq-pub-{}-hwm".format(str(self).replace("_", "-")) + NOTIFICATION_TYPES = [NotificationType(n) for n in NOTIFICATION_TYPE_NAMES] ############################################################################### + class Publisher(): """ Holds the connection state and accepts incoming notifications that come from the subscription. If there is an associated publishing endpoint connected, it will encode and pass the contents of the notification. """ + def __init__(self): self.factory = ZmqFactory() self.connection_map = {} @@ -95,12 +99,14 @@ class Publisher(): connection = self.connection_map[notification_type_name] connection.publish(message, tag=tag) + publisher = Publisher() ############################################################################### ZMQ_TRANSPORT_PREFIXES = ['tcp://', "ipc://", 'inproc://', "pgm://", "epgm://"] + class Setup(): """ Does some light validation of the plugin option input and generates a dictionary to configure the Twisted and ZeroMQ setup """ @@ -125,7 +131,7 @@ class Setup(): level='warn') for endpoint in Setup._iter_endpoints_not_ok(options): plugin.log(("Endpoint option {} doesn't appear to be recognized" - ).format(endpoint), level='warn') + ).format(endpoint), level='warn') ########################################################################### @@ -144,7 +150,7 @@ class Setup(): for e, nt, hwm in Setup._iter_endpoint_setup(options): if e not in setup: setup[e] = {'notification_type_names': [], - 'high_water_mark': hwm} + 'high_water_mark': hwm} setup[e]['notification_type_names'].append(str(nt)) # use the lowest high water mark given for the endpoint setup[e]['high_water_mark'] = min( @@ -165,6 +171,7 @@ class Setup(): plugin = Plugin() + @plugin.init() def init(options, configuration, plugin, **kwargs): Setup.check_option_warnings(options, plugin) @@ -172,18 +179,20 @@ def init(options, configuration, plugin, **kwargs): Setup.log_setup_dict(setup_dict, plugin) reactor.callFromThread(publisher.load_setup, setup_dict) + def on_notification(notification_type_name, plugin, *args, **kwargs): if len(args) != 0: plugin.log("got unexpected args: {}".format(args), level="warn") reactor.callFromThread(publisher.publish_notification, notification_type_name, *args, **kwargs) + DEFAULT_HIGH_WATER_MARK = 1000 for nt in NOTIFICATION_TYPES: # subscribe to all notifications on = functools.partial(on_notification, str(nt)) - on.__annotations__ = {} # needed to please Plugin._coerce_arguments() + on.__annotations__ = {} # needed to please Plugin._coerce_arguments() plugin.add_subscription(str(nt), on) # zmq socket binding option endpoint_opt = nt.endpoint_option() @@ -198,9 +207,11 @@ for nt in NOTIFICATION_TYPES: ############################################################################### + def plugin_thread(): plugin.run() reactor.callFromThread(reactor.stop) + reactor.callInThread(plugin_thread) reactor.run() diff --git a/zmq/example-subscriber.py b/zmq/example-subscriber.py index 22608e4..ed86d1d 100755 --- a/zmq/example-subscriber.py +++ b/zmq/example-subscriber.py @@ -35,7 +35,6 @@ import json import argparse from twisted.internet import reactor -from twisted.internet.task import LoopingCall from txzmq import ZmqEndpoint, ZmqEndpointType from txzmq import ZmqFactory @@ -52,6 +51,7 @@ NOTIFICATION_TYPE_NAMES = ['channel_opened', 'sendpay_success', 'sendpay_failure'] + class NotificationType(): def __init__(self, notification_type_name): self.notification_type_name = notification_type_name @@ -65,10 +65,12 @@ class NotificationType(): def argparse_namespace_attribute(self): return "zmq_sub_{}".format((self)) + NOTIFICATION_TYPES = [NotificationType(n) for n in NOTIFICATION_TYPE_NAMES] ############################################################################### + class Subscriber(): def __init__(self): self.factory = ZmqFactory() @@ -76,7 +78,7 @@ class Subscriber(): def _log_message(self, message, tag): tag = tag.decode("utf8") message = json.dumps(json.loads(message.decode("utf8")), indent=1, - sort_keys=True) + sort_keys=True) current_time = time.strftime('%X %x %Z') print("{} - {}\n{}".format(current_time, tag, message))