From 21eed576b2561b6f1851a6a91a7bb77df2a9f33b Mon Sep 17 00:00:00 2001 From: Sergi Delgado Date: Thu, 4 Apr 2019 17:34:26 +0100 Subject: [PATCH] Refactors tx_watcher and zmq_subscriber Merges tx_watcher and zmq_subscriber into a single file: watcher. Watcher will handle all the watching process and container two inner classes, the watcher and the ZMQHandler --- .gitignore | 2 +- pisa-btc/pisa/tx_watcher.py | 23 ---------- pisa-btc/pisa/watcher.py | 79 +++++++++++++++++++++++++++++++++ pisa-btc/pisa/zmq_subscriber.py | 37 --------------- 4 files changed, 80 insertions(+), 61 deletions(-) delete mode 100644 pisa-btc/pisa/tx_watcher.py create mode 100644 pisa-btc/pisa/watcher.py delete mode 100644 pisa-btc/pisa/zmq_subscriber.py diff --git a/.gitignore b/.gitignore index 493f51b..7694bde 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,6 @@ logs/ .idea conf.py bitcoin.conf -__pycache__/* +*__pycache__ .pending* diff --git a/pisa-btc/pisa/tx_watcher.py b/pisa-btc/pisa/tx_watcher.py deleted file mode 100644 index ed55d6a..0000000 --- a/pisa-btc/pisa/tx_watcher.py +++ /dev/null @@ -1,23 +0,0 @@ -from pisa import shared -from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException -from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT - - -def watch_txs(debug): - bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) - - while True: - block_hash = shared.block_queue.get() - - try: - block = bitcoin_cli.getblock(block_hash) - - prev_tx_id = block.get('previousblockhash') - txs = block.get('tx') - - if debug: - # Log shit - print(prev_tx_id, txs) - - except JSONRPCException as e: - print(e) diff --git a/pisa-btc/pisa/watcher.py b/pisa-btc/pisa/watcher.py new file mode 100644 index 0000000..b584ee4 --- /dev/null +++ b/pisa-btc/pisa/watcher.py @@ -0,0 +1,79 @@ +import zmq +import binascii +from queue import Queue +from threading import Thread +from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException +from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, FEED_PROTOCOL, FEED_ADDR, FEED_PORT + + +class ZMQHandler: + """ Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py""" + def __init__(self): + self.zmqContext = zmq.Context() + self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) + self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") + self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) + + def handle(self, debug, block_queue): + msg = self.zmqSubSocket.recv_multipart() + topic = msg[0] + body = msg[1] + + if topic == b"hashblock": + block_hash = binascii.hexlify(body).decode('UTF-8') + block_queue.put(block_hash) + + if debug: + print("New block received from Core ", block_hash) + + +class Watcher: + def __init__(self, debug): + self.appointments = [] + self.sleep = True + self.debug = debug + self.block_queue = Queue() + + def add_appointment(self, appointment): + # ToDo: Discuss about validation of input data + self.appointments.append(appointment) + + if self.sleep: + self.sleep = False + zmq_subscriber = Thread(target=self.do_subscribe, args=[self.block_queue]) + zmq_subscriber.start() + self.do_watch() + + # Rationale: + # The Watcher will analyze every received block looking for appointment matches. If there is no work + # to do the watcher can sleep (appointments = [] and sleep = True) otherwise for every received block + # the watcher will get the list of transactions and compare it with the list of appointments + + def do_subscribe(self, block_queue): + daemon = ZMQHandler() + daemon.handle(self.debug, block_queue) + + def do_watch(self): + bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, + BTC_RPC_PORT)) + + while len(self.appointments) > 0: + block_hash = self.block_queue.get() + + try: + block = bitcoin_cli.getblock(block_hash) + + prev_block_id = block.get('previousblockhash') + txs = block.get('tx') + + # ToDo: Check for every tx in txs if there is an appointment that matches (MS 16-bytes) + + if self.debug: + print("New block received ", block_hash) + print("Prev. block hash ", prev_block_id) + print("List of transactions", txs) + + except JSONRPCException as e: + print(e) + diff --git a/pisa-btc/pisa/zmq_subscriber.py b/pisa-btc/pisa/zmq_subscriber.py deleted file mode 100644 index c572f3f..0000000 --- a/pisa-btc/pisa/zmq_subscriber.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py -""" - -import binascii -import zmq -import conf -from pisa import shared - - -class ZMQHandler: - def __init__(self): - self.zmqContext = zmq.Context() - - self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) - self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) - self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") - - self.zmqSubSocket.connect("%s://%s:%s" % (conf.FEED_PROTOCOL, conf.FEED_ADDR, conf.FEED_PORT)) - - def handle(self, debug): - msg = self.zmqSubSocket.recv_multipart() - topic = msg[0] - body = msg[1] - - if topic == b"hashblock": - block_hash = binascii.hexlify(body).decode('UTF-8') - shared.block_queue.put(block_hash) - - if debug: - # Log shit - pass - - -def run_subscribe(debug): - daemon = ZMQHandler() - daemon.handle(debug)