ZeroMQ publisher plugin

This commit is contained in:
Jarret Dyrbye
2019-12-01 14:11:23 -07:00
committed by Christian Decker
parent eab10a051b
commit ec65c873b6
4 changed files with 367 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ Community curated plugins for c-lightning.
| [sendinvoiceless][sendinvoiceless] | Sends some money without an invoice from the receiving node. |
| [sitzprobe][sitzprobe] | A Lightning Network payment rehearsal utility |
| [summary][summary] | Print a nice summary of the node status |
| [zmq][zmq] | Publishes notifications via [ZeroMQ][zmq-home] to configured endpoints |
## Installation
@@ -136,3 +137,5 @@ your environment.
[js-api]: https://github.com/darosior/clightningjs
[monitor]: https://github.com/renepickhardt/plugins/tree/master/monitor
[reckless]: https://github.com/darosior/reckless
[zmq-home]: https://zeromq.org/
[zmq]: https://github.com/lightningd/plugins/tree/master/zmq

43
zmq/README.md Normal file
View File

@@ -0,0 +1,43 @@
# ZeroMQ Publisher Plugin
This module forwards [notifications](https://github.com/ElementsProject/lightning/blob/master/doc/PLUGINS.md#notification-types) to ZeroMQ endpoints depending on configuration.
The usage and setup mimics [similar functionality in `bitcoind`](https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md) for opting-in to notifications and selecting [high water mark (ZMQ\_HWM)](http://api.zeromq.org/2-1:zmq-setsockopt) preferences.
## Dependencies
[Twisted](https://twistedmatrix.com) and [txZMQ](https://pypi.org/project/txZMQ/) are used by this plugin.
```
$ sudo pip3 install twisted txzmq
```
## Installation
For general plugin installation instructions see the repos main
[README.md](https://github.com/lightningd/plugins/blob/master/README.md#Installation)
## Usage
The plugin registeres CLI options for opting in to notifications at given endpoints.
Eg running with:
```
$ lightningd --zmq-pub-connect=ipc:///tmp/cl-zmq \
--zmq-pub-disconnect=tcp://127.0.0.1:5555
```
will publish `connect` and `disconnect` notifications to the specified endpoints. The default high water mark is 1000, which is the same as `bitcoind`'s default.
This plugin does not interpret the content of the data, merely passes it on.
The ZMQ `tag` used for subscribing is the UTF-8 encoded string of the notification type name string.
Eg. for the `invoice_payment` notification, the tag for subscribers will be `b'invoice_payment'`. The data published will be UTF-8 encoded JSON which comes from `lightningd`.
## Example
[example-subscriber.py](example-subscriber.py) is provided as an example subscriber to mirror the publishing code.
## Tips and Tricks
- The plugin subscribes to all notifications under in the `NOTIFICATION_TYPE_NAMES` regardless of whether they are connected to ZMQ endpoints via CLI launch. For avoiding that performance overhead, entries can be dropped from the list to avoid subscribing to them.
- Unless there are changes to the plugin interfaces, notification types that are added in the future should be easy to include by adding the value to the `NOTIFICATION_TYPE_NAMES` list.

206
zmq/cl-zmq.py Executable file
View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
# Copyright (c) 2019 lightningd
# Distributed under the BSD 3-Clause License, see the accompanying file LICENSE
###############################################################################
# ZeroMQ publishing plugin for lightningd
#
# Using Twisted and txZMQ frameworks, this plugin binds to ZeroMQ endpoints and
# publishes notification of all possible subscriptions that have been opted-in
# for via lightningd launch parameter.
#
# This plugin doesn't interpret any of the content of the data which comes out
# of lightningd, it merely passes the received JSON through as encoded UTF-8,
# with the 'tag' being set to the Notification Type name (also encoded as
# UTF-8). It follows that adding future possible subscriptions *should* be as
# easy as appending it to NOTIFICATION_TYPE_NAMES below.
#
# The user-selectable configuration takes inspiration from the bitcoind ZeroMQ
# integration. The endpoint must be explicitly given as an argument to enable
# it. Also, the high water mark argument for the binding is set as an
# additional launch option.
#
# Due to how the plugins must register via getmanifest, this will opt-in to all
# subscriptions and ignore the messages from ones not bound to ZMQ endpoints.
# Hence, there might be a minor performance impact from subscription messages
# that result in no publish action. This can be mitigated by dropping
# notifications that are not of interest to your ZeroMQ subscribers from
# NOTIFICATION_TYPE_NAMES below.
###############################################################################
import time
import json
import functools
from twisted.internet import reactor
from txzmq import ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqFactory
from txzmq import ZmqPubConnection
from lightning import Plugin
###############################################################################
NOTIFICATION_TYPE_NAMES = ['channel_opened',
'connect',
'disconnect',
'invoice_payment',
'warning',
'forward_event',
'sendpay_success',
'sendpay_failure']
class NotificationType():
""" Wrapper for notification type string to generate the corresponding
plugin option strings. By convention of lightningd, the cli options
use dashes in place of rather than underscores or no spaces."""
def __init__(self, notification_type_name):
self.notification_type_name = notification_type_name
def __str__(self):
return self.notification_type_name
def endpoint_option(self):
return "zmq-pub-{}".format(str(self).replace("_", "-"))
def hwm_option(self):
return "zmq-pub-{}-hwm".format(str(self).replace("_", "-"))
NOTIFICATION_TYPES = [NotificationType(n) for n in NOTIFICATION_TYPE_NAMES]
###############################################################################
class Publisher():
""" Holds the connection state and accepts incoming notifications that
come from the subscription. If there is an associated publishing
endpoint connected, it will encode and pass the contents of the
notification. """
def __init__(self):
self.factory = ZmqFactory()
self.connection_map = {}
def load_setup(self, setup):
for e, s in setup.items():
endpoint = ZmqEndpoint(ZmqEndpointType.bind, e)
ZmqPubConnection.highWaterMark = s['high_water_mark']
connection = ZmqPubConnection(self.factory, endpoint)
for n in s['notification_type_names']:
self.connection_map[n] = connection
def publish_notification(self, notification_type_name, *args, **kwargs):
if notification_type_name not in self.connection_map:
return
tag = notification_type_name.encode("utf8")
message = json.dumps(kwargs).encode("utf8")
connection = self.connection_map[notification_type_name]
connection.publish(message, tag=tag)
publisher = Publisher()
###############################################################################
ZMQ_TRANSPORT_PREFIXES = ['tcp://', "ipc://", 'inproc://', "pgm://", "epgm://"]
class Setup():
""" Does some light validation of the plugin option input and generates a
dictionary to configure the Twisted and ZeroMQ setup """
def _at_least_one_binding(options):
n_bindings = sum(1 for o, v in options.items() if
not o.endswith("-hwm") and v != "null")
return n_bindings > 0
def _iter_endpoints_not_ok(options):
for nt in NOTIFICATION_TYPES:
endpoint_opt = nt.endpoint_option()
endpoint = options[endpoint_opt]
if endpoint != "null":
if len([1 for prefix in ZMQ_TRANSPORT_PREFIXES if
endpoint.startswith(prefix)]) != 0:
continue
yield endpoint
def check_option_warnings(options, plugin):
if not Setup._at_least_one_binding(options):
plugin.log("No zmq publish sockets are bound as per launch args",
level='warn')
for endpoint in Setup._iter_endpoints_not_ok(options):
plugin.log(("Endpoint option {} doesn't appear to be recognized"
).format(endpoint), level='warn')
###########################################################################
def _iter_endpoint_setup(options):
for nt in NOTIFICATION_TYPES:
endpoint_opt = nt.endpoint_option()
if options[endpoint_opt] == "null":
continue
endpoint = options[endpoint_opt]
hwm_opt = nt.hwm_option()
hwm = int(options[hwm_opt])
yield endpoint, nt, hwm
def get_setup_dict(options):
setup = {}
for e, nt, hwm in Setup._iter_endpoint_setup(options):
if e not in setup:
setup[e] = {'notification_type_names': [],
'high_water_mark': hwm}
setup[e]['notification_type_names'].append(str(nt))
# use the lowest high water mark given for the endpoint
setup[e]['high_water_mark'] = min(
setup[e]['high_water_mark'], hwm)
return setup
###########################################################################
def log_setup_dict(setup, plugin):
for e, s in setup.items():
m = ("Endpoint {} will get events from {} subscriptions "
"published with high water mark {}")
m = m.format(e, s['notification_type_names'], s['high_water_mark'])
plugin.log(m)
###############################################################################
plugin = Plugin()
@plugin.init()
def init(options, configuration, plugin, **kwargs):
Setup.check_option_warnings(options, plugin)
setup_dict = Setup.get_setup_dict(options)
Setup.log_setup_dict(setup_dict, plugin)
reactor.callFromThread(publisher.load_setup, setup_dict)
def on_notification(notification_type_name, plugin, *args, **kwargs):
if len(args) != 0:
plugin.log("got unexpected args: {}".format(args), level="warn")
reactor.callFromThread(publisher.publish_notification,
notification_type_name, *args, **kwargs)
DEFAULT_HIGH_WATER_MARK = 1000
for nt in NOTIFICATION_TYPES:
# subscribe to all notifications
on = functools.partial(on_notification, str(nt))
on.__annotations__ = {} # needed to please Plugin._coerce_arguments()
plugin.add_subscription(str(nt), on)
# zmq socket binding option
endpoint_opt = nt.endpoint_option()
endpoint_desc = "Enable publish {} info to ZMQ socket endpoint".format(nt)
plugin.add_option(endpoint_opt, None, endpoint_desc, opt_type='string')
# high water mark option
hwm_opt = nt.hwm_option()
hwm_desc = ("Set publish {} info message high water mark "
"(default: {})".format(nt, DEFAULT_HIGH_WATER_MARK))
plugin.add_option(hwm_opt, DEFAULT_HIGH_WATER_MARK, hwm_desc,
opt_type='int')
###############################################################################
def plugin_thread():
plugin.run()
reactor.callFromThread(reactor.stop)
reactor.callInThread(plugin_thread)
reactor.run()

115
zmq/example-subscriber.py Executable file
View File

@@ -0,0 +1,115 @@
#!/usr/bin/env python3
# Copyright (c) 2019 lightningd
# Distributed under the BSD 3-Clause License, see the accompanying file LICENSE
###############################################################################
# An example ZeroMQ subscriber client for the plugin.
#
# This module connects and subscribes to ZeroMQ endpoints, decodes the messages
# and simply logs the JSON payload to stdout.
#
# It also uses Twisted and txZMQ frameworks, but on the subscriber side.
#
# To use, the plugin must be loaded for lightningd and the parameters must be
# given as launch arguments to publish notifications at given endpoints.
# Eg. for publishing 'connect' and 'disconnect' notifications:.
# $ lightningd --zmq-pub-connect=ipc:///tmp/cl-zmq \
# --zmq-pub-disconnect=tcp://127.0.0.1:5555
#
# The client must then be started with arguments for the same endpoints.
# Eg. to subscribe to those 'connect' and 'disconnect' notifications:
#
# $ python3 example-subscriber.py --zmq-sub-connect=ipc:///tmp/cl-zmq \
# --zmq-sub-disconnect=tcp://127.0.0.1:5555
#
# You can test receiving the subscription by connecting and disconnecting from
# nodes on the network:
# Eg.
# $ lightning-cli connect <node-id>
# $ lightning-cli disconnect <node-id>
#
###############################################################################
import time
import json
import argparse
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from txzmq import ZmqEndpoint, ZmqEndpointType
from txzmq import ZmqFactory
from txzmq import ZmqSubConnection
###############################################################################
NOTIFICATION_TYPE_NAMES = ['channel_opened',
'connect',
'disconnect',
'invoice_payment',
'warning',
'forward_event',
'sendpay_success',
'sendpay_failure']
class NotificationType():
def __init__(self, notification_type_name):
self.notification_type_name = notification_type_name
def __str__(self):
return self.notification_type_name
def endpoint_option(self):
return "zmq-sub-{}".format(str(self).replace("_", "-"))
def argparse_namespace_attribute(self):
return "zmq_sub_{}".format((self))
NOTIFICATION_TYPES = [NotificationType(n) for n in NOTIFICATION_TYPE_NAMES]
###############################################################################
class Subscriber():
def __init__(self):
self.factory = ZmqFactory()
def _log_message(self, message, tag):
tag = tag.decode("utf8")
message = json.dumps(json.loads(message.decode("utf8")), indent=1,
sort_keys=True)
current_time = time.strftime('%X %x %Z')
print("{} - {}\n{}".format(current_time, tag, message))
def _load_setup(self, setup):
for e, notification_type_names in setup.items():
endpoint = ZmqEndpoint(ZmqEndpointType.connect, e)
connection = ZmqSubConnection(self.factory, endpoint)
for n in notification_type_names:
tag = n.encode("utf8")
connection.gotMessage = self._log_message
connection.subscribe(tag)
def parse_and_load_settings(self, settings):
setup = {}
for nt in NOTIFICATION_TYPES:
attr = nt.argparse_namespace_attribute()
endpoint = getattr(settings, attr)
if endpoint is None:
continue
if endpoint not in setup:
setup[endpoint] = []
setup[endpoint].append(str(nt))
self._load_setup(setup)
###############################################################################
parser = argparse.ArgumentParser(prog="example-subscriber.py")
for nt in NOTIFICATION_TYPES:
h = "subscribe to {} events published from this endpoint".format(nt)
parser.add_argument('--' + nt.endpoint_option(), type=str, help=h)
settings = parser.parse_args()
subscriber = Subscriber()
subscriber.parse_and_load_settings(settings)
reactor.run()