Files
python-teos/tests/simulator/zmq_publisher.py
Sergi Delgado Segura 157456f164 Improves simulator and adds basic appointment test
The getblock rpc call was missing the block and previousblock hashes.
2019-08-19 18:18:10 +01:00

13 lines
385 B
Python

import zmq
class ZMQPublisher:
def __init__(self, topic, feed_protocol, feed_addr, feed_port):
self.topic = topic
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.bind("%s://%s:%s" % (feed_protocol, feed_addr, feed_port))
def publish_data(self, data):
self.socket.send_multipart([self.topic, data])