diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py new file mode 100644 index 0000000..74b21f1 --- /dev/null +++ b/pisa/chain_monitor.py @@ -0,0 +1,92 @@ +import zmq +import binascii +from queue import Queue +from threading import Thread, Event, Condition + +from common.logger import Logger +from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT +from pisa.block_processor import BlockProcessor + +logger = Logger("ChainMonitor") + + +class ChainMonitor: + def __init__(self): + self.best_tip = None + self.last_tips = [] + self.terminate = False + + self.check_tip = Event() + self.lock = Condition() + + self.zmqContext = zmq.Context() + self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) + self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") + self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) + + self.watcher_queue = None + self.watcher_asleep = True + self.responder_queue = None + self.responder_asleep = True + + def attach_watcher(self, queue, awake): + self.watcher_queue = queue + self.watcher_asleep = awake + + def attach_responder(self, queue, awake): + self.responder_queue = queue + self.responder_asleep = awake + + def notify_subscribers(self, block_hash): + if not self.watcher_asleep: + self.watcher_queue.put(block_hash) + + if not self.responder_asleep: + self.responder_queue.put(block_hash) + + def update_state(self, block_hash): + self.best_tip = block_hash + self.last_tips.append(block_hash) + + if len(self.last_tips) > 10: + self.last_tips.pop(0) + + def monitor_chain(self): + self.best_tip = BlockProcessor.get_best_block_hash() + Thread(target=self.monitor_chain_polling).start() + Thread(target=self.monitor_chain_zmq).start() + + def monitor_chain_polling(self): + while self.terminate: + self.check_tip.wait(timeout=60) + + # Terminate could have been set wile the thread was blocked in wait + if not self.terminate: + current_tip = BlockProcessor.get_best_block_hash() + + self.lock.acquire() + if current_tip != self.best_tip: + self.update_state(current_tip) + self.notify_subscribers(current_tip) + logger.info("New block received via polling", block_hash=current_tip) + self.lock.release() + + def monitor_chain_zmq(self): + while not self.terminate: + msg = self.zmqSubSocket.recv_multipart() + + # Terminate could have been set wile the thread was blocked in recv + if not self.terminate: + topic = msg[0] + body = msg[1] + + if topic == b"hashblock": + block_hash = binascii.hexlify(body).decode("utf-8") + + self.lock.acquire() + if block_hash != self.best_tip and block_hash not in self.last_tips: + self.update_state(block_hash) + self.notify_subscribers(block_hash) + logger.info("New block received via zmq", block_hash=block_hash) + self.lock.release() diff --git a/pisa/pisad.py b/pisa/pisad.py index 9c258ce..ed185c6 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -10,6 +10,7 @@ from pisa.builder import Builder from pisa.conf import BTC_NETWORK, PISA_SECRET_KEY from pisa.responder import Responder from pisa.db_manager import DBManager +from pisa.chain_monitor import ChainMonitor from pisa.block_processor import BlockProcessor from pisa.tools import can_connect_to_bitcoind, in_correct_network @@ -46,13 +47,18 @@ if __name__ == "__main__": try: db_manager = DBManager(DB_PATH) + # Create the chain monitor and start monitoring the chain + chain_monitor = ChainMonitor() + chain_monitor.monitor_chain() + watcher_appointments_data = db_manager.load_watcher_appointments() responder_trackers_data = db_manager.load_responder_trackers() with open(PISA_SECRET_KEY, "rb") as key_file: secret_key_der = key_file.read() - watcher = Watcher(db_manager, secret_key_der) + watcher = Watcher(db_manager, chain_monitor, secret_key_der) + chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: logger.info("Fresh bootstrap") @@ -65,7 +71,7 @@ if __name__ == "__main__": last_block_responder = db_manager.load_last_block_hash_responder() # FIXME: 32-reorgs-offline dropped txs are not used at this point. - responder = Responder(db_manager) + responder = Responder(db_manager, chain_monitor) last_common_ancestor_responder = None missed_blocks_responder = None @@ -82,6 +88,8 @@ if __name__ == "__main__": # Build Watcher with Responder and backed up data. If the blocks of both match we don't perform the # search twice. watcher.responder = responder + chain_monitor.attach_responder(responder.block_queue, responder.asleep) + if last_block_watcher is not None: if last_block_watcher == last_block_responder: missed_blocks_watcher = missed_blocks_responder diff --git a/pisa/responder.py b/pisa/responder.py index 0fce1cb..dbfac15 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -6,7 +6,6 @@ from common.logger import Logger from pisa.cleaner import Cleaner from pisa.carrier import Carrier from pisa.block_processor import BlockProcessor -from pisa.utils.zmq_subscriber import ZMQSubscriber CONFIRMATIONS_BEFORE_RETRY = 6 MIN_CONFIRMATIONS = 6 @@ -135,14 +134,14 @@ class Responder: """ - def __init__(self, db_manager): + def __init__(self, db_manager, chain_monitor): self.trackers = dict() self.tx_tracker_map = dict() self.unconfirmed_txs = [] self.missed_confirmations = dict() self.asleep = True self.block_queue = Queue() - self.zmq_subscriber = None + self.chain_monitor = chain_monitor self.db_manager = db_manager @staticmethod @@ -260,19 +259,8 @@ class Responder: if self.asleep: self.asleep = False - zmq_thread = Thread(target=self.do_subscribe) - responder = Thread(target=self.do_watch) - zmq_thread.start() - responder.start() - - def do_subscribe(self): - """ - Initializes a :obj:`ZMQSubscriber ` instance to listen to new blocks - from ``bitcoind``. Block ids are received trough the ``block_queue``. - """ - - self.zmq_subscriber = ZMQSubscriber(parent="Responder") - self.zmq_subscriber.handle(self.block_queue) + self.chain_monitor.responder_asleep = False + Thread(target=self.do_watch).start() def do_watch(self): """ @@ -327,8 +315,7 @@ class Responder: # Go back to sleep if there are no more pending trackers self.asleep = True - self.zmq_subscriber.terminate = True - self.block_queue = Queue() + self.chain_monitor.responder_asleep = True logger.info("No more pending trackers, going back to sleep") diff --git a/pisa/utils/zmq_subscriber.py b/pisa/utils/zmq_subscriber.py deleted file mode 100644 index ecec9af..0000000 --- a/pisa/utils/zmq_subscriber.py +++ /dev/null @@ -1,34 +0,0 @@ -import zmq -import binascii -from common.logger import Logger -from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT - - -# ToDo: #7-add-async-back-to-zmq -class ZMQSubscriber: - """ Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py""" - - def __init__(self, parent): - self.zmqContext = zmq.Context() - self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) - self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) - self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") - self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) - self.logger = Logger("ZMQSubscriber-{}".format(parent)) - - self.terminate = False - - def handle(self, block_queue): - while not self.terminate: - msg = self.zmqSubSocket.recv_multipart() - - # Terminate could have been set wile the thread was blocked in recv - if not self.terminate: - topic = msg[0] - body = msg[1] - - if topic == b"hashblock": - block_hash = binascii.hexlify(body).decode("utf-8") - block_queue.put(block_hash) - - self.logger.info("New block received via ZMQ", block_hash=block_hash) diff --git a/pisa/watcher.py b/pisa/watcher.py index 9d659db..d6e0729 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -9,7 +9,6 @@ from common.logger import Logger from pisa.cleaner import Cleaner from pisa.responder import Responder from pisa.block_processor import BlockProcessor -from pisa.utils.zmq_subscriber import ZMQSubscriber from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS logger = Logger("Watcher") @@ -58,13 +57,13 @@ class Watcher: """ - def __init__(self, db_manager, sk_der, responder=None, max_appointments=MAX_APPOINTMENTS): + def __init__(self, db_manager, chain_monitor, sk_der, responder=None, max_appointments=MAX_APPOINTMENTS): self.appointments = dict() self.locator_uuid_map = dict() self.asleep = True self.block_queue = Queue() self.max_appointments = max_appointments - self.zmq_subscriber = None + self.chain_monitor = chain_monitor self.db_manager = db_manager self.signing_key = Cryptographer.load_private_key_der(sk_der) @@ -127,10 +126,8 @@ class Watcher: if self.asleep: self.asleep = False - zmq_thread = Thread(target=self.do_subscribe) - watcher = Thread(target=self.do_watch) - zmq_thread.start() - watcher.start() + self.chain_monitor.watcher_asleep = False + Thread(target=self.do_watch).start() logger.info("Waking up") @@ -151,15 +148,6 @@ class Watcher: return appointment_added, signature - def do_subscribe(self): - """ - Initializes a ``ZMQSubscriber`` instance to listen to new blocks from ``bitcoind``. Block ids are received - trough the ``block_queue``. - """ - - self.zmq_subscriber = ZMQSubscriber(parent="Watcher") - self.zmq_subscriber.handle(self.block_queue) - def do_watch(self): """ Monitors the blockchain whilst there are pending appointments. @@ -221,8 +209,7 @@ class Watcher: # Go back to sleep if there are no more appointments self.asleep = True - self.zmq_subscriber.terminate = True - self.block_queue = Queue() + self.chain_monitor.watcher_asleep = True logger.info("No more pending appointments, going back to sleep")