diff --git a/README.md b/README.md index fb57990..e8979dd 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Community curated plugins for c-lightning. | [sendinvoiceless][sendinvoiceless] | Sends some money without an invoice from the receiving node. | | [sitzprobe][sitzprobe] | A Lightning Network payment rehearsal utility | | [summary][summary] | Print a nice summary of the node status | +| [zmq][zmq] | Publishes notifications via [ZeroMQ][zmq-home] to configured endpoints | ## Installation @@ -136,3 +137,5 @@ your environment. [js-api]: https://github.com/darosior/clightningjs [monitor]: https://github.com/renepickhardt/plugins/tree/master/monitor [reckless]: https://github.com/darosior/reckless +[zmq-home]: https://zeromq.org/ +[zmq]: https://github.com/lightningd/plugins/tree/master/zmq diff --git a/zmq/README.md b/zmq/README.md new file mode 100644 index 0000000..c6f029c --- /dev/null +++ b/zmq/README.md @@ -0,0 +1,43 @@ +# ZeroMQ Publisher Plugin + +This module forwards [notifications](https://github.com/ElementsProject/lightning/blob/master/doc/PLUGINS.md#notification-types) to ZeroMQ endpoints depending on configuration. + +The usage and setup mimics [similar functionality in `bitcoind`](https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md) for opting-in to notifications and selecting [high water mark (ZMQ\_HWM)](http://api.zeromq.org/2-1:zmq-setsockopt) preferences. + + +## Dependencies + +[Twisted](https://twistedmatrix.com) and [txZMQ](https://pypi.org/project/txZMQ/) are used by this plugin. + +``` +$ sudo pip3 install twisted txzmq +``` + +## Installation + +For general plugin installation instructions see the repos main +[README.md](https://github.com/lightningd/plugins/blob/master/README.md#Installation) + +## Usage + +The plugin registeres CLI options for opting in to notifications at given endpoints. +Eg running with: +``` + $ lightningd --zmq-pub-connect=ipc:///tmp/cl-zmq \ + --zmq-pub-disconnect=tcp://127.0.0.1:5555 +``` +will publish `connect` and `disconnect` notifications to the specified endpoints. The default high water mark is 1000, which is the same as `bitcoind`'s default. + +This plugin does not interpret the content of the data, merely passes it on. + +The ZMQ `tag` used for subscribing is the UTF-8 encoded string of the notification type name string. +Eg. for the `invoice_payment` notification, the tag for subscribers will be `b'invoice_payment'`. The data published will be UTF-8 encoded JSON which comes from `lightningd`. + +## Example + +[example-subscriber.py](example-subscriber.py) is provided as an example subscriber to mirror the publishing code. + +## Tips and Tricks + +- The plugin subscribes to all notifications under in the `NOTIFICATION_TYPE_NAMES` regardless of whether they are connected to ZMQ endpoints via CLI launch. For avoiding that performance overhead, entries can be dropped from the list to avoid subscribing to them. +- Unless there are changes to the plugin interfaces, notification types that are added in the future should be easy to include by adding the value to the `NOTIFICATION_TYPE_NAMES` list. diff --git a/zmq/cl-zmq.py b/zmq/cl-zmq.py new file mode 100755 index 0000000..59357c9 --- /dev/null +++ b/zmq/cl-zmq.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +# Copyright (c) 2019 lightningd +# Distributed under the BSD 3-Clause License, see the accompanying file LICENSE + +############################################################################### +# ZeroMQ publishing plugin for lightningd +# +# Using Twisted and txZMQ frameworks, this plugin binds to ZeroMQ endpoints and +# publishes notification of all possible subscriptions that have been opted-in +# for via lightningd launch parameter. +# +# This plugin doesn't interpret any of the content of the data which comes out +# of lightningd, it merely passes the received JSON through as encoded UTF-8, +# with the 'tag' being set to the Notification Type name (also encoded as +# UTF-8). It follows that adding future possible subscriptions *should* be as +# easy as appending it to NOTIFICATION_TYPE_NAMES below. +# +# The user-selectable configuration takes inspiration from the bitcoind ZeroMQ +# integration. The endpoint must be explicitly given as an argument to enable +# it. Also, the high water mark argument for the binding is set as an +# additional launch option. +# +# Due to how the plugins must register via getmanifest, this will opt-in to all +# subscriptions and ignore the messages from ones not bound to ZMQ endpoints. +# Hence, there might be a minor performance impact from subscription messages +# that result in no publish action. This can be mitigated by dropping +# notifications that are not of interest to your ZeroMQ subscribers from +# NOTIFICATION_TYPE_NAMES below. +############################################################################### + +import time +import json +import functools + +from twisted.internet import reactor +from txzmq import ZmqEndpoint, ZmqEndpointType +from txzmq import ZmqFactory +from txzmq import ZmqPubConnection + +from lightning import Plugin + +############################################################################### + +NOTIFICATION_TYPE_NAMES = ['channel_opened', + 'connect', + 'disconnect', + 'invoice_payment', + 'warning', + 'forward_event', + 'sendpay_success', + 'sendpay_failure'] + +class NotificationType(): + """ Wrapper for notification type string to generate the corresponding + plugin option strings. By convention of lightningd, the cli options + use dashes in place of rather than underscores or no spaces.""" + def __init__(self, notification_type_name): + self.notification_type_name = notification_type_name + + def __str__(self): + return self.notification_type_name + + def endpoint_option(self): + return "zmq-pub-{}".format(str(self).replace("_", "-")) + + def hwm_option(self): + return "zmq-pub-{}-hwm".format(str(self).replace("_", "-")) + +NOTIFICATION_TYPES = [NotificationType(n) for n in NOTIFICATION_TYPE_NAMES] + +############################################################################### + +class Publisher(): + """ Holds the connection state and accepts incoming notifications that + come from the subscription. If there is an associated publishing + endpoint connected, it will encode and pass the contents of the + notification. """ + def __init__(self): + self.factory = ZmqFactory() + self.connection_map = {} + + def load_setup(self, setup): + for e, s in setup.items(): + endpoint = ZmqEndpoint(ZmqEndpointType.bind, e) + ZmqPubConnection.highWaterMark = s['high_water_mark'] + connection = ZmqPubConnection(self.factory, endpoint) + for n in s['notification_type_names']: + self.connection_map[n] = connection + + def publish_notification(self, notification_type_name, *args, **kwargs): + if notification_type_name not in self.connection_map: + return + tag = notification_type_name.encode("utf8") + message = json.dumps(kwargs).encode("utf8") + connection = self.connection_map[notification_type_name] + connection.publish(message, tag=tag) + +publisher = Publisher() + +############################################################################### + +ZMQ_TRANSPORT_PREFIXES = ['tcp://', "ipc://", 'inproc://', "pgm://", "epgm://"] + +class Setup(): + """ Does some light validation of the plugin option input and generates a + dictionary to configure the Twisted and ZeroMQ setup """ + def _at_least_one_binding(options): + n_bindings = sum(1 for o, v in options.items() if + not o.endswith("-hwm") and v != "null") + return n_bindings > 0 + + def _iter_endpoints_not_ok(options): + for nt in NOTIFICATION_TYPES: + endpoint_opt = nt.endpoint_option() + endpoint = options[endpoint_opt] + if endpoint != "null": + if len([1 for prefix in ZMQ_TRANSPORT_PREFIXES if + endpoint.startswith(prefix)]) != 0: + continue + yield endpoint + + def check_option_warnings(options, plugin): + if not Setup._at_least_one_binding(options): + plugin.log("No zmq publish sockets are bound as per launch args", + level='warn') + for endpoint in Setup._iter_endpoints_not_ok(options): + plugin.log(("Endpoint option {} doesn't appear to be recognized" + ).format(endpoint), level='warn') + + ########################################################################### + + def _iter_endpoint_setup(options): + for nt in NOTIFICATION_TYPES: + endpoint_opt = nt.endpoint_option() + if options[endpoint_opt] == "null": + continue + endpoint = options[endpoint_opt] + hwm_opt = nt.hwm_option() + hwm = int(options[hwm_opt]) + yield endpoint, nt, hwm + + def get_setup_dict(options): + setup = {} + for e, nt, hwm in Setup._iter_endpoint_setup(options): + if e not in setup: + setup[e] = {'notification_type_names': [], + 'high_water_mark': hwm} + setup[e]['notification_type_names'].append(str(nt)) + # use the lowest high water mark given for the endpoint + setup[e]['high_water_mark'] = min( + setup[e]['high_water_mark'], hwm) + return setup + + ########################################################################### + + def log_setup_dict(setup, plugin): + for e, s in setup.items(): + m = ("Endpoint {} will get events from {} subscriptions " + "published with high water mark {}") + m = m.format(e, s['notification_type_names'], s['high_water_mark']) + plugin.log(m) + + +############################################################################### + +plugin = Plugin() + +@plugin.init() +def init(options, configuration, plugin, **kwargs): + Setup.check_option_warnings(options, plugin) + setup_dict = Setup.get_setup_dict(options) + Setup.log_setup_dict(setup_dict, plugin) + reactor.callFromThread(publisher.load_setup, setup_dict) + +def on_notification(notification_type_name, plugin, *args, **kwargs): + if len(args) != 0: + plugin.log("got unexpected args: {}".format(args), level="warn") + reactor.callFromThread(publisher.publish_notification, + notification_type_name, *args, **kwargs) + +DEFAULT_HIGH_WATER_MARK = 1000 + +for nt in NOTIFICATION_TYPES: + # subscribe to all notifications + on = functools.partial(on_notification, str(nt)) + on.__annotations__ = {} # needed to please Plugin._coerce_arguments() + plugin.add_subscription(str(nt), on) + # zmq socket binding option + endpoint_opt = nt.endpoint_option() + endpoint_desc = "Enable publish {} info to ZMQ socket endpoint".format(nt) + plugin.add_option(endpoint_opt, None, endpoint_desc, opt_type='string') + # high water mark option + hwm_opt = nt.hwm_option() + hwm_desc = ("Set publish {} info message high water mark " + "(default: {})".format(nt, DEFAULT_HIGH_WATER_MARK)) + plugin.add_option(hwm_opt, DEFAULT_HIGH_WATER_MARK, hwm_desc, + opt_type='int') + +############################################################################### + +def plugin_thread(): + plugin.run() + reactor.callFromThread(reactor.stop) + +reactor.callInThread(plugin_thread) +reactor.run() diff --git a/zmq/example-subscriber.py b/zmq/example-subscriber.py new file mode 100755 index 0000000..22608e4 --- /dev/null +++ b/zmq/example-subscriber.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +# Copyright (c) 2019 lightningd +# Distributed under the BSD 3-Clause License, see the accompanying file LICENSE + +############################################################################### +# An example ZeroMQ subscriber client for the plugin. +# +# This module connects and subscribes to ZeroMQ endpoints, decodes the messages +# and simply logs the JSON payload to stdout. +# +# It also uses Twisted and txZMQ frameworks, but on the subscriber side. +# +# To use, the plugin must be loaded for lightningd and the parameters must be +# given as launch arguments to publish notifications at given endpoints. +# Eg. for publishing 'connect' and 'disconnect' notifications:. +# $ lightningd --zmq-pub-connect=ipc:///tmp/cl-zmq \ +# --zmq-pub-disconnect=tcp://127.0.0.1:5555 +# +# The client must then be started with arguments for the same endpoints. +# Eg. to subscribe to those 'connect' and 'disconnect' notifications: +# +# $ python3 example-subscriber.py --zmq-sub-connect=ipc:///tmp/cl-zmq \ +# --zmq-sub-disconnect=tcp://127.0.0.1:5555 +# +# You can test receiving the subscription by connecting and disconnecting from +# nodes on the network: +# Eg. +# $ lightning-cli connect +# $ lightning-cli disconnect +# +############################################################################### + +import time +import json +import argparse + +from twisted.internet import reactor +from twisted.internet.task import LoopingCall + +from txzmq import ZmqEndpoint, ZmqEndpointType +from txzmq import ZmqFactory +from txzmq import ZmqSubConnection + +############################################################################### + +NOTIFICATION_TYPE_NAMES = ['channel_opened', + 'connect', + 'disconnect', + 'invoice_payment', + 'warning', + 'forward_event', + 'sendpay_success', + 'sendpay_failure'] + +class NotificationType(): + def __init__(self, notification_type_name): + self.notification_type_name = notification_type_name + + def __str__(self): + return self.notification_type_name + + def endpoint_option(self): + return "zmq-sub-{}".format(str(self).replace("_", "-")) + + def argparse_namespace_attribute(self): + return "zmq_sub_{}".format((self)) + +NOTIFICATION_TYPES = [NotificationType(n) for n in NOTIFICATION_TYPE_NAMES] + +############################################################################### + +class Subscriber(): + def __init__(self): + self.factory = ZmqFactory() + + def _log_message(self, message, tag): + tag = tag.decode("utf8") + message = json.dumps(json.loads(message.decode("utf8")), indent=1, + sort_keys=True) + current_time = time.strftime('%X %x %Z') + print("{} - {}\n{}".format(current_time, tag, message)) + + def _load_setup(self, setup): + for e, notification_type_names in setup.items(): + endpoint = ZmqEndpoint(ZmqEndpointType.connect, e) + connection = ZmqSubConnection(self.factory, endpoint) + for n in notification_type_names: + tag = n.encode("utf8") + connection.gotMessage = self._log_message + connection.subscribe(tag) + + def parse_and_load_settings(self, settings): + setup = {} + for nt in NOTIFICATION_TYPES: + attr = nt.argparse_namespace_attribute() + endpoint = getattr(settings, attr) + if endpoint is None: + continue + if endpoint not in setup: + setup[endpoint] = [] + setup[endpoint].append(str(nt)) + self._load_setup(setup) + + +############################################################################### + +parser = argparse.ArgumentParser(prog="example-subscriber.py") +for nt in NOTIFICATION_TYPES: + h = "subscribe to {} events published from this endpoint".format(nt) + parser.add_argument('--' + nt.endpoint_option(), type=str, help=h) +settings = parser.parse_args() + +subscriber = Subscriber() +subscriber.parse_and_load_settings(settings) +reactor.run()