From 90a1dc70e8fdd227b425433194a48a40d86dc563 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Date: Fri, 26 Apr 2019 15:34:09 +0100 Subject: [PATCH] Separates back watcher and zmq subscriber Separates the logic of the watcher from the subscriber for two reasons:\n - First, we should abstract the subscriber as an entity that handles the underlaying bitcoin client and feeds parsed data to pipeline that the watcher can read from. That way we can have different types of subscriber (not necessarily based on zmq).\n - Secondly, the responder will need to also receive data from the subscriber (as a different instance) to keep track of the state of the response. --- pisa-btc/pisa/watcher.py | 28 ++-------------------------- pisa-btc/pisa/zmq_subscriber.py | 26 ++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 26 deletions(-) create mode 100644 pisa-btc/pisa/zmq_subscriber.py diff --git a/pisa-btc/pisa/watcher.py b/pisa-btc/pisa/watcher.py index 4bef098..be8b540 100644 --- a/pisa-btc/pisa/watcher.py +++ b/pisa-btc/pisa/watcher.py @@ -1,33 +1,9 @@ -import zmq -import binascii from queue import Queue from threading import Thread from pisa.tools import decrypt_tx +from pisa.zmq_subscriber import ZMQHandler from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException -from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, FEED_PROTOCOL, FEED_ADDR, FEED_PORT, \ - MAX_APPOINTMENTS - - -class ZMQHandler: - """ Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py""" - def __init__(self): - self.zmqContext = zmq.Context() - self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) - self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) - self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") - self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) - - def handle(self, block_queue, debug, logging): - msg = self.zmqSubSocket.recv_multipart() - topic = msg[0] - body = msg[1] - - if topic == b"hashblock": - block_hash = binascii.hexlify(body).decode('UTF-8') - block_queue.put(block_hash) - - if debug: - logging.info("[ZMQHandler] new block received via ZMQ".format(block_hash)) +from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS class Watcher: diff --git a/pisa-btc/pisa/zmq_subscriber.py b/pisa-btc/pisa/zmq_subscriber.py new file mode 100644 index 0000000..dde3b44 --- /dev/null +++ b/pisa-btc/pisa/zmq_subscriber.py @@ -0,0 +1,26 @@ +import zmq +import binascii +from conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT + + +class ZMQHandler: + """ Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py""" + def __init__(self): + self.zmqContext = zmq.Context() + self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) + self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") + self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) + + def handle(self, block_queue, debug, logging): + while True: + msg = self.zmqSubSocket.recv_multipart() + topic = msg[0] + body = msg[1] + + if topic == b"hashblock": + block_hash = binascii.hexlify(body).decode('UTF-8') + block_queue.put(block_hash) + + if debug: + logging.info("[ZMQHandler] new block received via ZMQ".format(block_hash))