mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Updates teos to work with the new conf file format and redefines how objects are built
Configuration parameters are now load from a .conf file (see template.conf as an example). The code has 3 config levels: default (teos/__init__.py), config file (<data_dir>/teos.conf) and command line. Most of the config parameters are only modificable trough the config file for now. The priority order is: command line, config file, default. Objects that need config parameters are now built in teosd instead of inside other classes. Therefore teosd acts like a factory and config parameters don't have to be passed around between objects. This also implies that a lot of the object creation logic has been changed. This should simplify unit testing.
This commit is contained in:
@@ -4,8 +4,6 @@ from threading import Thread, Event, Condition
|
||||
|
||||
from teos import LOG_PREFIX
|
||||
from common.logger import Logger
|
||||
from teos.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT, POLLING_DELTA, BLOCK_WINDOW_SIZE
|
||||
from teos.block_processor import BlockProcessor
|
||||
|
||||
logger = Logger(actor="ChainMonitor", log_name_prefix=LOG_PREFIX)
|
||||
|
||||
@@ -22,6 +20,8 @@ class ChainMonitor:
|
||||
Args:
|
||||
watcher_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``.
|
||||
responder_queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``.
|
||||
block_processor (:obj:`BlockProcessor <teos.block_processor.BlockProcessor>`): a blockProcessor instance.
|
||||
bitcoind_feed_params (:obj:`dict`): a dict with the feed (ZMQ) connection parameters.
|
||||
|
||||
Attributes:
|
||||
best_tip (:obj:`str`): a block hash representing the current best tip.
|
||||
@@ -34,9 +34,13 @@ class ChainMonitor:
|
||||
watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher <teos.watcher.Watcher>`.
|
||||
responder_queue (:obj:`Queue`): a queue to send new best tips to the
|
||||
:obj:`Responder <teos.responder.Responder>`.
|
||||
|
||||
polling_delta (:obj:`int`): time between polls (in seconds).
|
||||
max_block_window_size (:obj:`int`): max size of last_tips.
|
||||
block_processor (:obj:`BlockProcessor <teos.block_processor.BlockProcessor>`): a blockProcessor instance.
|
||||
"""
|
||||
|
||||
def __init__(self, watcher_queue, responder_queue):
|
||||
def __init__(self, watcher_queue, responder_queue, block_processor, bitcoind_feed_params):
|
||||
self.best_tip = None
|
||||
self.last_tips = []
|
||||
self.terminate = False
|
||||
@@ -48,11 +52,22 @@ class ChainMonitor:
|
||||
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
|
||||
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
|
||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
|
||||
self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
|
||||
self.zmqSubSocket.connect(
|
||||
"%s://%s:%s"
|
||||
% (
|
||||
bitcoind_feed_params.get("FEED_PROTOCOL"),
|
||||
bitcoind_feed_params.get("FEED_CONNECT"),
|
||||
bitcoind_feed_params.get("FEED_PORT"),
|
||||
)
|
||||
)
|
||||
|
||||
self.watcher_queue = watcher_queue
|
||||
self.responder_queue = responder_queue
|
||||
|
||||
self.polling_delta = 60
|
||||
self.max_block_window_size = 10
|
||||
self.block_processor = block_processor
|
||||
|
||||
def notify_subscribers(self, block_hash):
|
||||
"""
|
||||
Notifies the subscribers (``Watcher`` and ``Responder``) about a new block. It does so by putting the hash in
|
||||
@@ -66,14 +81,13 @@ class ChainMonitor:
|
||||
self.watcher_queue.put(block_hash)
|
||||
self.responder_queue.put(block_hash)
|
||||
|
||||
def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE):
|
||||
def update_state(self, block_hash):
|
||||
"""
|
||||
Updates the state of the ``ChainMonitor``. The state is represented as the ``best_tip`` and the list of
|
||||
``last_tips``. ``last_tips`` is bounded to ``max_block_window_size``.
|
||||
|
||||
Args:
|
||||
block_hash (:obj:`block_hash`): the new best tip.
|
||||
max_block_window_size (:obj:`int`): the maximum length of the ``last_tips`` list.
|
||||
|
||||
Returns:
|
||||
(:obj:`bool`): ``True`` is the state was successfully updated, ``False`` otherwise.
|
||||
@@ -83,7 +97,7 @@ class ChainMonitor:
|
||||
self.last_tips.append(self.best_tip)
|
||||
self.best_tip = block_hash
|
||||
|
||||
if len(self.last_tips) > max_block_window_size:
|
||||
if len(self.last_tips) > self.max_block_window_size:
|
||||
self.last_tips.pop(0)
|
||||
|
||||
return True
|
||||
@@ -91,22 +105,19 @@ class ChainMonitor:
|
||||
else:
|
||||
return False
|
||||
|
||||
def monitor_chain_polling(self, polling_delta=POLLING_DELTA):
|
||||
def monitor_chain_polling(self):
|
||||
"""
|
||||
Monitors ``bitcoind`` via polling. Once the method is fired, it keeps monitoring as long as ``terminate`` is not
|
||||
set. Polling is performed once every ``polling_delta`` seconds. If a new best tip if found, the shared lock is
|
||||
acquired, the state is updated and the subscribers are notified, and finally the lock is released.
|
||||
|
||||
Args:
|
||||
polling_delta (:obj:`int`): the time delta between polls.
|
||||
"""
|
||||
|
||||
while not self.terminate:
|
||||
self.check_tip.wait(timeout=polling_delta)
|
||||
self.check_tip.wait(timeout=self.polling_delta)
|
||||
|
||||
# Terminate could have been set while the thread was blocked in wait
|
||||
if not self.terminate:
|
||||
current_tip = BlockProcessor.get_best_block_hash()
|
||||
current_tip = self.block_processor.get_best_block_hash()
|
||||
|
||||
self.lock.acquire()
|
||||
if self.update_state(current_tip):
|
||||
@@ -138,16 +149,13 @@ class ChainMonitor:
|
||||
logger.info("New block received via zmq", block_hash=block_hash)
|
||||
self.lock.release()
|
||||
|
||||
def monitor_chain(self, polling_delta=POLLING_DELTA):
|
||||
def monitor_chain(self):
|
||||
"""
|
||||
Main :class:`ChainMonitor` method. It initializes the ``best_tip`` to the current one (by querying the
|
||||
:obj:`BlockProcessor <teos.block_processor.BlockProcessor>`) and creates two threads, one per each monitoring
|
||||
approach (``zmq`` and ``polling``).
|
||||
|
||||
Args:
|
||||
polling_delta (:obj:`int`): the time delta between polls by the ``monitor_chain_polling`` thread.
|
||||
"""
|
||||
|
||||
self.best_tip = BlockProcessor.get_best_block_hash()
|
||||
Thread(target=self.monitor_chain_polling, daemon=True, kwargs={"polling_delta": polling_delta}).start()
|
||||
self.best_tip = self.block_processor.get_best_block_hash()
|
||||
Thread(target=self.monitor_chain_polling, daemon=True).start()
|
||||
Thread(target=self.monitor_chain_zmq, daemon=True).start()
|
||||
|
||||
Reference in New Issue
Block a user