mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Adds basic ChainMonitor to the system
ChainMonitor is the actor in charge of checking new blocks. It improves from the previous zmq_subscriber by also doing polling and, therefore, making sure that no blocks are missed. Documentation and tests are still required. Partially covers #31
This commit is contained in:
92
pisa/chain_monitor.py
Normal file
92
pisa/chain_monitor.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import zmq
|
||||
import binascii
|
||||
from queue import Queue
|
||||
from threading import Thread, Event, Condition
|
||||
|
||||
from common.logger import Logger
|
||||
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
|
||||
from pisa.block_processor import BlockProcessor
|
||||
|
||||
logger = Logger("ChainMonitor")
|
||||
|
||||
|
||||
class ChainMonitor:
|
||||
def __init__(self):
|
||||
self.best_tip = None
|
||||
self.last_tips = []
|
||||
self.terminate = False
|
||||
|
||||
self.check_tip = Event()
|
||||
self.lock = Condition()
|
||||
|
||||
self.zmqContext = zmq.Context()
|
||||
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
|
||||
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
|
||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
|
||||
self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
|
||||
|
||||
self.watcher_queue = None
|
||||
self.watcher_asleep = True
|
||||
self.responder_queue = None
|
||||
self.responder_asleep = True
|
||||
|
||||
def attach_watcher(self, queue, awake):
|
||||
self.watcher_queue = queue
|
||||
self.watcher_asleep = awake
|
||||
|
||||
def attach_responder(self, queue, awake):
|
||||
self.responder_queue = queue
|
||||
self.responder_asleep = awake
|
||||
|
||||
def notify_subscribers(self, block_hash):
|
||||
if not self.watcher_asleep:
|
||||
self.watcher_queue.put(block_hash)
|
||||
|
||||
if not self.responder_asleep:
|
||||
self.responder_queue.put(block_hash)
|
||||
|
||||
def update_state(self, block_hash):
|
||||
self.best_tip = block_hash
|
||||
self.last_tips.append(block_hash)
|
||||
|
||||
if len(self.last_tips) > 10:
|
||||
self.last_tips.pop(0)
|
||||
|
||||
def monitor_chain(self):
|
||||
self.best_tip = BlockProcessor.get_best_block_hash()
|
||||
Thread(target=self.monitor_chain_polling).start()
|
||||
Thread(target=self.monitor_chain_zmq).start()
|
||||
|
||||
def monitor_chain_polling(self):
|
||||
while self.terminate:
|
||||
self.check_tip.wait(timeout=60)
|
||||
|
||||
# Terminate could have been set wile the thread was blocked in wait
|
||||
if not self.terminate:
|
||||
current_tip = BlockProcessor.get_best_block_hash()
|
||||
|
||||
self.lock.acquire()
|
||||
if current_tip != self.best_tip:
|
||||
self.update_state(current_tip)
|
||||
self.notify_subscribers(current_tip)
|
||||
logger.info("New block received via polling", block_hash=current_tip)
|
||||
self.lock.release()
|
||||
|
||||
def monitor_chain_zmq(self):
|
||||
while not self.terminate:
|
||||
msg = self.zmqSubSocket.recv_multipart()
|
||||
|
||||
# Terminate could have been set wile the thread was blocked in recv
|
||||
if not self.terminate:
|
||||
topic = msg[0]
|
||||
body = msg[1]
|
||||
|
||||
if topic == b"hashblock":
|
||||
block_hash = binascii.hexlify(body).decode("utf-8")
|
||||
|
||||
self.lock.acquire()
|
||||
if block_hash != self.best_tip and block_hash not in self.last_tips:
|
||||
self.update_state(block_hash)
|
||||
self.notify_subscribers(block_hash)
|
||||
logger.info("New block received via zmq", block_hash=block_hash)
|
||||
self.lock.release()
|
||||
@@ -10,6 +10,7 @@ from pisa.builder import Builder
|
||||
from pisa.conf import BTC_NETWORK, PISA_SECRET_KEY
|
||||
from pisa.responder import Responder
|
||||
from pisa.db_manager import DBManager
|
||||
from pisa.chain_monitor import ChainMonitor
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.tools import can_connect_to_bitcoind, in_correct_network
|
||||
|
||||
@@ -46,13 +47,18 @@ if __name__ == "__main__":
|
||||
try:
|
||||
db_manager = DBManager(DB_PATH)
|
||||
|
||||
# Create the chain monitor and start monitoring the chain
|
||||
chain_monitor = ChainMonitor()
|
||||
chain_monitor.monitor_chain()
|
||||
|
||||
watcher_appointments_data = db_manager.load_watcher_appointments()
|
||||
responder_trackers_data = db_manager.load_responder_trackers()
|
||||
|
||||
with open(PISA_SECRET_KEY, "rb") as key_file:
|
||||
secret_key_der = key_file.read()
|
||||
|
||||
watcher = Watcher(db_manager, secret_key_der)
|
||||
watcher = Watcher(db_manager, chain_monitor, secret_key_der)
|
||||
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
|
||||
|
||||
if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0:
|
||||
logger.info("Fresh bootstrap")
|
||||
@@ -65,7 +71,7 @@ if __name__ == "__main__":
|
||||
last_block_responder = db_manager.load_last_block_hash_responder()
|
||||
|
||||
# FIXME: 32-reorgs-offline dropped txs are not used at this point.
|
||||
responder = Responder(db_manager)
|
||||
responder = Responder(db_manager, chain_monitor)
|
||||
last_common_ancestor_responder = None
|
||||
missed_blocks_responder = None
|
||||
|
||||
@@ -82,6 +88,8 @@ if __name__ == "__main__":
|
||||
# Build Watcher with Responder and backed up data. If the blocks of both match we don't perform the
|
||||
# search twice.
|
||||
watcher.responder = responder
|
||||
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
|
||||
|
||||
if last_block_watcher is not None:
|
||||
if last_block_watcher == last_block_responder:
|
||||
missed_blocks_watcher = missed_blocks_responder
|
||||
|
||||
@@ -6,7 +6,6 @@ from common.logger import Logger
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.carrier import Carrier
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.utils.zmq_subscriber import ZMQSubscriber
|
||||
|
||||
CONFIRMATIONS_BEFORE_RETRY = 6
|
||||
MIN_CONFIRMATIONS = 6
|
||||
@@ -135,14 +134,14 @@ class Responder:
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, db_manager):
|
||||
def __init__(self, db_manager, chain_monitor):
|
||||
self.trackers = dict()
|
||||
self.tx_tracker_map = dict()
|
||||
self.unconfirmed_txs = []
|
||||
self.missed_confirmations = dict()
|
||||
self.asleep = True
|
||||
self.block_queue = Queue()
|
||||
self.zmq_subscriber = None
|
||||
self.chain_monitor = chain_monitor
|
||||
self.db_manager = db_manager
|
||||
|
||||
@staticmethod
|
||||
@@ -260,19 +259,8 @@ class Responder:
|
||||
|
||||
if self.asleep:
|
||||
self.asleep = False
|
||||
zmq_thread = Thread(target=self.do_subscribe)
|
||||
responder = Thread(target=self.do_watch)
|
||||
zmq_thread.start()
|
||||
responder.start()
|
||||
|
||||
def do_subscribe(self):
|
||||
"""
|
||||
Initializes a :obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>` instance to listen to new blocks
|
||||
from ``bitcoind``. Block ids are received trough the ``block_queue``.
|
||||
"""
|
||||
|
||||
self.zmq_subscriber = ZMQSubscriber(parent="Responder")
|
||||
self.zmq_subscriber.handle(self.block_queue)
|
||||
self.chain_monitor.responder_asleep = False
|
||||
Thread(target=self.do_watch).start()
|
||||
|
||||
def do_watch(self):
|
||||
"""
|
||||
@@ -327,8 +315,7 @@ class Responder:
|
||||
|
||||
# Go back to sleep if there are no more pending trackers
|
||||
self.asleep = True
|
||||
self.zmq_subscriber.terminate = True
|
||||
self.block_queue = Queue()
|
||||
self.chain_monitor.responder_asleep = True
|
||||
|
||||
logger.info("No more pending trackers, going back to sleep")
|
||||
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
import zmq
|
||||
import binascii
|
||||
from common.logger import Logger
|
||||
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
|
||||
|
||||
|
||||
# ToDo: #7-add-async-back-to-zmq
|
||||
class ZMQSubscriber:
|
||||
""" Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py"""
|
||||
|
||||
def __init__(self, parent):
|
||||
self.zmqContext = zmq.Context()
|
||||
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
|
||||
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
|
||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
|
||||
self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
|
||||
self.logger = Logger("ZMQSubscriber-{}".format(parent))
|
||||
|
||||
self.terminate = False
|
||||
|
||||
def handle(self, block_queue):
|
||||
while not self.terminate:
|
||||
msg = self.zmqSubSocket.recv_multipart()
|
||||
|
||||
# Terminate could have been set wile the thread was blocked in recv
|
||||
if not self.terminate:
|
||||
topic = msg[0]
|
||||
body = msg[1]
|
||||
|
||||
if topic == b"hashblock":
|
||||
block_hash = binascii.hexlify(body).decode("utf-8")
|
||||
block_queue.put(block_hash)
|
||||
|
||||
self.logger.info("New block received via ZMQ", block_hash=block_hash)
|
||||
@@ -9,7 +9,6 @@ from common.logger import Logger
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.responder import Responder
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.utils.zmq_subscriber import ZMQSubscriber
|
||||
from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS
|
||||
|
||||
logger = Logger("Watcher")
|
||||
@@ -58,13 +57,13 @@ class Watcher:
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, db_manager, sk_der, responder=None, max_appointments=MAX_APPOINTMENTS):
|
||||
def __init__(self, db_manager, chain_monitor, sk_der, responder=None, max_appointments=MAX_APPOINTMENTS):
|
||||
self.appointments = dict()
|
||||
self.locator_uuid_map = dict()
|
||||
self.asleep = True
|
||||
self.block_queue = Queue()
|
||||
self.max_appointments = max_appointments
|
||||
self.zmq_subscriber = None
|
||||
self.chain_monitor = chain_monitor
|
||||
self.db_manager = db_manager
|
||||
self.signing_key = Cryptographer.load_private_key_der(sk_der)
|
||||
|
||||
@@ -127,10 +126,8 @@ class Watcher:
|
||||
|
||||
if self.asleep:
|
||||
self.asleep = False
|
||||
zmq_thread = Thread(target=self.do_subscribe)
|
||||
watcher = Thread(target=self.do_watch)
|
||||
zmq_thread.start()
|
||||
watcher.start()
|
||||
self.chain_monitor.watcher_asleep = False
|
||||
Thread(target=self.do_watch).start()
|
||||
|
||||
logger.info("Waking up")
|
||||
|
||||
@@ -151,15 +148,6 @@ class Watcher:
|
||||
|
||||
return appointment_added, signature
|
||||
|
||||
def do_subscribe(self):
|
||||
"""
|
||||
Initializes a ``ZMQSubscriber`` instance to listen to new blocks from ``bitcoind``. Block ids are received
|
||||
trough the ``block_queue``.
|
||||
"""
|
||||
|
||||
self.zmq_subscriber = ZMQSubscriber(parent="Watcher")
|
||||
self.zmq_subscriber.handle(self.block_queue)
|
||||
|
||||
def do_watch(self):
|
||||
"""
|
||||
Monitors the blockchain whilst there are pending appointments.
|
||||
@@ -221,8 +209,7 @@ class Watcher:
|
||||
|
||||
# Go back to sleep if there are no more appointments
|
||||
self.asleep = True
|
||||
self.zmq_subscriber.terminate = True
|
||||
self.block_queue = Queue()
|
||||
self.chain_monitor.watcher_asleep = True
|
||||
|
||||
logger.info("No more pending appointments, going back to sleep")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user