mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-18 06:34:19 +01:00
Merge pull request #82 from sr-gi/chainmonitor
Implements the ChainMonitor
This commit is contained in:
16
pisa/api.py
16
pisa/api.py
@@ -25,14 +25,14 @@ class API:
|
||||
"""
|
||||
Main endpoint of the Watchtower.
|
||||
|
||||
The client sends requests (appointments) to this endpoint to request a job to the Watchtower. Requests must be json
|
||||
encoded and contain an ``appointment`` field and optionally a ``signature`` and ``public_key`` fields.
|
||||
The client sends requests (appointments) to this endpoint to request a job to the Watchtower. Requests must be
|
||||
json encoded and contain an ``appointment`` field and optionally a ``signature`` and ``public_key`` fields.
|
||||
|
||||
Returns:
|
||||
:obj:`tuple`: A tuple containing the response (``json``) and response code (``int``). For accepted appointments,
|
||||
the ``rcode`` is always 0 and the response contains the signed receipt. For rejected appointments, the ``rcode``
|
||||
is a negative value and the response contains the error message. Error messages can be found at
|
||||
:mod:`Errors <pisa.errors>`.
|
||||
:obj:`tuple`: A tuple containing the response (``json``) and response code (``int``). For accepted
|
||||
appointments, the ``rcode`` is always 0 and the response contains the signed receipt. For rejected
|
||||
appointments, the ``rcode`` is a negative value and the response contains the error message. Error messages
|
||||
can be found at :mod:`Errors <pisa.errors>`.
|
||||
"""
|
||||
|
||||
remote_addr = request.environ.get("REMOTE_ADDR")
|
||||
@@ -167,8 +167,8 @@ class API:
|
||||
"""
|
||||
Provides the block height of the Watchtower.
|
||||
|
||||
This is a testing endpoint that (most likely) will be removed in production. Its purpose is to give information to
|
||||
testers about the current block so they can define a dummy appointment without having to run a bitcoin node.
|
||||
This is a testing endpoint that (most likely) will be removed in production. Its purpose is to give information
|
||||
to testers about the current block so they can define a dummy appointment without having to run a bitcoin node.
|
||||
|
||||
Returns:
|
||||
:obj:`dict`: A json encoded dictionary containing the block height.
|
||||
|
||||
182
pisa/chain_monitor.py
Normal file
182
pisa/chain_monitor.py
Normal file
@@ -0,0 +1,182 @@
|
||||
import zmq
|
||||
import binascii
|
||||
from threading import Thread, Event, Condition
|
||||
|
||||
from common.logger import Logger
|
||||
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT, POLLING_DELTA, BLOCK_WINDOW_SIZE
|
||||
from pisa.block_processor import BlockProcessor
|
||||
|
||||
logger = Logger("ChainMonitor")
|
||||
|
||||
|
||||
class ChainMonitor:
|
||||
"""
|
||||
The :class:`ChainMonitor` is the class in charge of monitoring the blockchain (via ``bitcoind``) to detect new
|
||||
blocks on top of the best chain. If a new best block is spotted, the chain monitor will notify the
|
||||
:obj:`Watcher <pisa.watcher.Watcher>` and the :obj:`Responder <pisa.responder.Responder>` using ``Queues``.
|
||||
|
||||
The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified
|
||||
once per queue and the notification is triggered by the method that detects the block faster.
|
||||
|
||||
Attributes:
|
||||
best_tip (:obj:`str`): a block hash representing the current best tip.
|
||||
last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips.
|
||||
terminate (:obj:`bool`): a flag to signal the termination of the :class:`ChainMonitor` (shutdown the tower).
|
||||
check_tip (:obj:`Event`): an event that's triggered at fixed time intervals and controls the polling thread.
|
||||
lock (:obj:`Condition`): a lock used to protect concurrent access to the queues and ``best_tip`` by the zmq and
|
||||
polling threads.
|
||||
zmqSubSocket (:obj:`socket`): a socket to connect to ``bitcoind`` via ``zmq``.
|
||||
watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher <pisa.watcher.Watcher>`.
|
||||
responder_queue (:obj:`Queue`): a queue to send new best tips to the
|
||||
:obj:`Responder <pisa.responder.Responder>`.
|
||||
watcher_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Watcher`` or not.
|
||||
responder_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Responder`` or not.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.best_tip = None
|
||||
self.last_tips = []
|
||||
self.terminate = False
|
||||
|
||||
self.check_tip = Event()
|
||||
self.lock = Condition()
|
||||
|
||||
self.zmqContext = zmq.Context()
|
||||
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
|
||||
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
|
||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
|
||||
self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
|
||||
|
||||
self.watcher_queue = None
|
||||
self.responder_queue = None
|
||||
self.watcher_asleep = True
|
||||
self.responder_asleep = True
|
||||
|
||||
def attach_watcher(self, queue, asleep):
|
||||
"""
|
||||
Attaches a :obj:`Watcher <pisa.watcher.Watcher>` to the :class:`ChainMonitor`. The ``Watcher`` and the
|
||||
``ChainMonitor`` are connected via the ``watcher_queue`` and the ``watcher_asleep`` flag.
|
||||
|
||||
Args:
|
||||
queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``.
|
||||
asleep( :obj:`bool`): whether the ``Watcher`` is initially awake or asleep. It is changed on the fly from
|
||||
the ``Watcher`` when the state changes.
|
||||
"""
|
||||
|
||||
self.watcher_queue = queue
|
||||
self.watcher_asleep = asleep
|
||||
|
||||
def attach_responder(self, queue, asleep):
|
||||
"""
|
||||
Attaches a :obj:`Responder <pisa.responder.Responder>` to the :class:`ChainMonitor`. The ``Responder`` and the
|
||||
``ChainMonitor`` are connected via the ``responder_queue`` and the ``responder_asleep`` flag.
|
||||
|
||||
Args:
|
||||
queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``.
|
||||
asleep( :obj:`bool`): whether the ``Responder`` is initially awake or asleep. It is changed on the fly from
|
||||
the ``Responder`` when the state changes.
|
||||
"""
|
||||
|
||||
self.responder_queue = queue
|
||||
self.responder_asleep = asleep
|
||||
|
||||
def notify_subscribers(self, block_hash):
|
||||
"""
|
||||
Notifies the subscribers (``Watcher`` and ``Responder``) about a new block provided they are awake. It does so
|
||||
by putting the hash in the corresponding queue(s).
|
||||
|
||||
Args:
|
||||
block_hash (:obj:`str`): the new block hash to be sent to the awake subscribers.
|
||||
"""
|
||||
|
||||
if not self.watcher_asleep:
|
||||
self.watcher_queue.put(block_hash)
|
||||
|
||||
if not self.responder_asleep:
|
||||
self.responder_queue.put(block_hash)
|
||||
|
||||
def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE):
|
||||
"""
|
||||
Updates the state of the ``ChainMonitor``. The state is represented as the ``best_tip`` and the list of
|
||||
``last_tips``. ``last_tips`` is bounded to ``max_block_window_size``.
|
||||
|
||||
Args:
|
||||
block_hash (:obj:`block_hash`): the new best tip.
|
||||
max_block_window_size (:obj:`int`): the maximum length of the ``last_tips`` list.
|
||||
|
||||
Returns:
|
||||
(:obj:`bool`): ``True`` is the state was successfully updated, ``False`` otherwise.
|
||||
"""
|
||||
|
||||
if block_hash != self.best_tip and block_hash not in self.last_tips:
|
||||
self.last_tips.append(self.best_tip)
|
||||
self.best_tip = block_hash
|
||||
|
||||
if len(self.last_tips) > max_block_window_size:
|
||||
self.last_tips.pop(0)
|
||||
|
||||
return True
|
||||
|
||||
else:
|
||||
return False
|
||||
|
||||
def monitor_chain_polling(self, polling_delta=POLLING_DELTA):
|
||||
"""
|
||||
Monitors ``bitcoind`` via polling. Once the method is fired, it keeps monitoring as long as ``terminate`` is not
|
||||
set. Polling is performed once every ``polling_delta`` seconds. If a new best tip if found, the shared lock is
|
||||
acquired, the state is updated and the subscribers are notified, and finally the lock is released.
|
||||
|
||||
Args:
|
||||
polling_delta (:obj:`int`): the time delta between polls.
|
||||
"""
|
||||
|
||||
while not self.terminate:
|
||||
self.check_tip.wait(timeout=polling_delta)
|
||||
|
||||
# Terminate could have been set while the thread was blocked in wait
|
||||
if not self.terminate:
|
||||
current_tip = BlockProcessor.get_best_block_hash()
|
||||
|
||||
self.lock.acquire()
|
||||
if self.update_state(current_tip):
|
||||
self.notify_subscribers(current_tip)
|
||||
logger.info("New block received via polling", block_hash=current_tip)
|
||||
self.lock.release()
|
||||
|
||||
def monitor_chain_zmq(self):
|
||||
"""
|
||||
Monitors ``bitcoind`` via zmq. Once the method is fired, it keeps monitoring as long as ``terminate`` is not
|
||||
set. If a new best tip if found, the shared lock is acquired, the state is updated and the subscribers are
|
||||
notified, and finally the lock is released.
|
||||
"""
|
||||
|
||||
while not self.terminate:
|
||||
msg = self.zmqSubSocket.recv_multipart()
|
||||
|
||||
# Terminate could have been set while the thread was blocked in recv
|
||||
if not self.terminate:
|
||||
topic = msg[0]
|
||||
body = msg[1]
|
||||
|
||||
if topic == b"hashblock":
|
||||
block_hash = binascii.hexlify(body).decode("utf-8")
|
||||
|
||||
self.lock.acquire()
|
||||
if self.update_state(block_hash):
|
||||
self.notify_subscribers(block_hash)
|
||||
logger.info("New block received via zmq", block_hash=block_hash)
|
||||
self.lock.release()
|
||||
|
||||
def monitor_chain(self, polling_delta=POLLING_DELTA):
|
||||
"""
|
||||
Main :class:`ChainMonitor` method. It initializes the ``best_tip`` to the current one (by querying the
|
||||
:obj:`BlockProcessor <pisa.block_processor.BlockProcessor>`) and creates two threads, one per each monitoring
|
||||
approach (``zmq`` and ``polling``).
|
||||
|
||||
Args:
|
||||
polling_delta (:obj:`int`): the time delta between polls by the ``monitor_chain_polling`` thread.
|
||||
"""
|
||||
|
||||
self.best_tip = BlockProcessor.get_best_block_hash()
|
||||
Thread(target=self.monitor_chain_polling, daemon=True, kwargs={"polling_delta": polling_delta}).start()
|
||||
Thread(target=self.monitor_chain_zmq, daemon=True).start()
|
||||
@@ -7,8 +7,8 @@ from pisa.api import API
|
||||
from pisa.watcher import Watcher
|
||||
from pisa.builder import Builder
|
||||
import pisa.conf as conf
|
||||
from pisa.responder import Responder
|
||||
from pisa.db_manager import DBManager
|
||||
from pisa.chain_monitor import ChainMonitor
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.tools import can_connect_to_bitcoind, in_correct_network
|
||||
|
||||
@@ -18,6 +18,7 @@ logger = Logger("Daemon")
|
||||
def handle_signals(signal_received, frame):
|
||||
logger.info("Closing connection with appointments db")
|
||||
db_manager.db.close()
|
||||
chain_monitor.terminate = True
|
||||
|
||||
logger.info("Shutting down PISA")
|
||||
exit(0)
|
||||
@@ -94,13 +95,19 @@ if __name__ == "__main__":
|
||||
try:
|
||||
db_manager = DBManager(pisa_config.get("DB_PATH"))
|
||||
|
||||
# Create the chain monitor and start monitoring the chain
|
||||
chain_monitor = ChainMonitor()
|
||||
chain_monitor.monitor_chain()
|
||||
|
||||
watcher_appointments_data = db_manager.load_watcher_appointments()
|
||||
responder_trackers_data = db_manager.load_responder_trackers()
|
||||
|
||||
with open(pisa_config.get("PISA_SECRET_KEY"), "rb") as key_file:
|
||||
secret_key_der = key_file.read()
|
||||
|
||||
watcher = Watcher(db_manager, secret_key_der, config=pisa_config)
|
||||
watcher = Watcher(db_manager, chain_monitor, secret_key_der, pisa_config)
|
||||
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
|
||||
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep)
|
||||
|
||||
if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0:
|
||||
logger.info("Fresh bootstrap")
|
||||
@@ -113,7 +120,6 @@ if __name__ == "__main__":
|
||||
last_block_responder = db_manager.load_last_block_hash_responder()
|
||||
|
||||
# FIXME: 32-reorgs-offline dropped txs are not used at this point.
|
||||
responder = Responder(db_manager, pisa_config)
|
||||
last_common_ancestor_responder = None
|
||||
missed_blocks_responder = None
|
||||
|
||||
@@ -124,12 +130,12 @@ if __name__ == "__main__":
|
||||
)
|
||||
missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder)
|
||||
|
||||
responder.trackers, responder.tx_tracker_map = Builder.build_trackers(responder_trackers_data)
|
||||
responder.block_queue = Builder.build_block_queue(missed_blocks_responder)
|
||||
watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers(
|
||||
responder_trackers_data
|
||||
)
|
||||
watcher.responder.block_queue = Builder.build_block_queue(missed_blocks_responder)
|
||||
|
||||
# Build Watcher with Responder and backed up data. If the blocks of both match we don't perform the
|
||||
# search twice.
|
||||
watcher.responder = responder
|
||||
# Build Watcher. If the blocks of both match we don't perform the search twice.
|
||||
if last_block_watcher is not None:
|
||||
if last_block_watcher == last_block_responder:
|
||||
missed_blocks_watcher = missed_blocks_responder
|
||||
|
||||
@@ -6,7 +6,6 @@ from common.logger import Logger
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.carrier import Carrier
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.utils.zmq_subscriber import ZMQSubscriber
|
||||
|
||||
CONFIRMATIONS_BEFORE_RETRY = 6
|
||||
MIN_CONFIRMATIONS = 6
|
||||
@@ -127,23 +126,22 @@ class Responder:
|
||||
has missed. Used to trigger rebroadcast if needed.
|
||||
asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake.
|
||||
block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It
|
||||
is populated by the :obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`.
|
||||
zmq_subscriber (:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`): a ``ZMQSubscriber`` instance
|
||||
used to receive new block notifications from ``bitcoind``.
|
||||
is populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
|
||||
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
|
||||
new blocks received by ``bitcoind``.
|
||||
db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): A ``DBManager`` instance to interact with the
|
||||
database.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, db_manager, config):
|
||||
def __init__(self, db_manager, chain_monitor):
|
||||
self.trackers = dict()
|
||||
self.tx_tracker_map = dict()
|
||||
self.unconfirmed_txs = []
|
||||
self.missed_confirmations = dict()
|
||||
self.asleep = True
|
||||
self.block_queue = Queue()
|
||||
self.config = config
|
||||
self.zmq_subscriber = None
|
||||
self.chain_monitor = chain_monitor
|
||||
self.db_manager = db_manager
|
||||
|
||||
@staticmethod
|
||||
@@ -225,8 +223,7 @@ class Responder:
|
||||
The :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the ``penalty_txid`` added to
|
||||
``unconfirmed_txs`` if ``confirmations=0``. Finally, the data is also stored in the database.
|
||||
|
||||
``add_tracker`` awakes the :obj:`Responder` and creates a connection with the
|
||||
:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>` if he is asleep.
|
||||
``add_tracker`` awakes the :obj:`Responder` if it is asleep.
|
||||
|
||||
Args:
|
||||
uuid (:obj:`str`): a unique identifier for the appointment.
|
||||
@@ -261,19 +258,8 @@ class Responder:
|
||||
|
||||
if self.asleep:
|
||||
self.asleep = False
|
||||
zmq_thread = Thread(target=self.do_subscribe)
|
||||
responder = Thread(target=self.do_watch)
|
||||
zmq_thread.start()
|
||||
responder.start()
|
||||
|
||||
def do_subscribe(self):
|
||||
"""
|
||||
Initializes a :obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>` instance to listen to new blocks
|
||||
from ``bitcoind``. Block ids are received trough the ``block_queue``.
|
||||
"""
|
||||
|
||||
self.zmq_subscriber = ZMQSubscriber(self.config, parent="Responder")
|
||||
self.zmq_subscriber.handle(self.block_queue)
|
||||
self.chain_monitor.responder_asleep = False
|
||||
Thread(target=self.do_watch).start()
|
||||
|
||||
def do_watch(self):
|
||||
"""
|
||||
@@ -328,8 +314,7 @@ class Responder:
|
||||
|
||||
# Go back to sleep if there are no more pending trackers
|
||||
self.asleep = True
|
||||
self.zmq_subscriber.terminate = True
|
||||
self.block_queue = Queue()
|
||||
self.chain_monitor.responder_asleep = True
|
||||
|
||||
logger.info("No more pending trackers, going back to sleep")
|
||||
|
||||
@@ -482,9 +467,6 @@ class Responder:
|
||||
|
||||
else:
|
||||
# If the penalty transaction is missing, we need to reset the tracker.
|
||||
# DISCUSS: Adding tracker back, should we flag it as retried?
|
||||
# FIXME: Whether we decide to increase the retried counter or not, the current counter should be
|
||||
# maintained. There is no way of doing so with the current approach. Update if required
|
||||
self.handle_breach(
|
||||
tracker.locator,
|
||||
uuid,
|
||||
|
||||
@@ -5,6 +5,10 @@ BTC_RPC_HOST = "localhost"
|
||||
BTC_RPC_PORT = 18443
|
||||
BTC_NETWORK = "regtest"
|
||||
|
||||
# CHAIN MONITOR
|
||||
POLLING_DELTA = 60
|
||||
BLOCK_WINDOW_SIZE = 10
|
||||
|
||||
# ZMQ
|
||||
FEED_PROTOCOL = "tcp"
|
||||
FEED_ADDR = "127.0.0.1"
|
||||
|
||||
@@ -9,7 +9,6 @@ from common.logger import Logger
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.responder import Responder
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.utils.zmq_subscriber import ZMQSubscriber
|
||||
|
||||
logger = Logger("Watcher")
|
||||
|
||||
@@ -27,15 +26,17 @@ class Watcher:
|
||||
If an appointment reaches its end with no breach, the data is simply deleted.
|
||||
|
||||
The :class:`Watcher` receives information about new received blocks via the ``block_queue`` that is populated by the
|
||||
:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber>`.
|
||||
:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
|
||||
|
||||
Args:
|
||||
db_manager (:obj:`DBManager <pisa.db_manager>`): a ``DBManager`` instance to interact with the database.
|
||||
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
|
||||
new blocks received by ``bitcoind``.
|
||||
sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance).
|
||||
config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve
|
||||
``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``.
|
||||
responder (:obj:`Responder <pisa.responder.Responder>`): a ``Responder`` instance. If ``None`` is passed, a new
|
||||
instance is created. Populated instances are useful when bootstrapping the system from backed-up data.
|
||||
max_appointments(:obj:`int`): the maximum amount of appointments that the :obj:`Watcher` will keep at any given
|
||||
time. Defaults to ``MAX_APPOINTMENTS``.
|
||||
|
||||
|
||||
Attributes:
|
||||
@@ -45,30 +46,31 @@ class Watcher:
|
||||
appointments with the same ``locator``.
|
||||
asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake.
|
||||
block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is
|
||||
populated by the :obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`.
|
||||
max_appointments(:obj:`int`): the maximum amount of appointments that the :obj:`Watcher` will keep at any given
|
||||
time.
|
||||
zmq_subscriber (:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`): a ZMQSubscriber instance used
|
||||
to receive new block notifications from ``bitcoind``.
|
||||
populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
|
||||
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
|
||||
new blocks received by ``bitcoind``.
|
||||
config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve
|
||||
``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``.
|
||||
db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database.
|
||||
signing_key (:mod:`EllipticCurvePrivateKey`): a private key used to sign accepted appointments.
|
||||
|
||||
Raises:
|
||||
ValueError: if `pisa_sk_file` is not found.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, db_manager, sk_der, config, responder=None):
|
||||
def __init__(self, db_manager, chain_monitor, sk_der, config, responder=None):
|
||||
self.appointments = dict()
|
||||
self.locator_uuid_map = dict()
|
||||
self.asleep = True
|
||||
self.block_queue = Queue()
|
||||
self.chain_monitor = chain_monitor
|
||||
self.config = config
|
||||
self.zmq_subscriber = None
|
||||
self.db_manager = db_manager
|
||||
self.signing_key = Cryptographer.load_private_key_der(sk_der)
|
||||
|
||||
if not isinstance(responder, Responder):
|
||||
self.responder = Responder(db_manager, self.config)
|
||||
self.responder = Responder(db_manager, chain_monitor)
|
||||
|
||||
def add_appointment(self, appointment):
|
||||
"""
|
||||
@@ -112,10 +114,8 @@ class Watcher:
|
||||
|
||||
if self.asleep:
|
||||
self.asleep = False
|
||||
zmq_thread = Thread(target=self.do_subscribe)
|
||||
watcher = Thread(target=self.do_watch)
|
||||
zmq_thread.start()
|
||||
watcher.start()
|
||||
self.chain_monitor.watcher_asleep = False
|
||||
Thread(target=self.do_watch).start()
|
||||
|
||||
logger.info("Waking up")
|
||||
|
||||
@@ -136,15 +136,6 @@ class Watcher:
|
||||
|
||||
return appointment_added, signature
|
||||
|
||||
def do_subscribe(self):
|
||||
"""
|
||||
Initializes a ``ZMQSubscriber`` instance to listen to new blocks from ``bitcoind``. Block ids are received
|
||||
trough the ``block_queue``.
|
||||
"""
|
||||
|
||||
self.zmq_subscriber = ZMQSubscriber(self.config, parent="Watcher")
|
||||
self.zmq_subscriber.handle(self.block_queue)
|
||||
|
||||
def do_watch(self):
|
||||
"""
|
||||
Monitors the blockchain whilst there are pending appointments.
|
||||
@@ -206,8 +197,7 @@ class Watcher:
|
||||
|
||||
# Go back to sleep if there are no more appointments
|
||||
self.asleep = True
|
||||
self.zmq_subscriber.terminate = True
|
||||
self.block_queue = Queue()
|
||||
self.chain_monitor.watcher_asleep = True
|
||||
|
||||
logger.info("No more pending appointments, going back to sleep")
|
||||
|
||||
|
||||
@@ -12,9 +12,9 @@ from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from apps.cli.blob import Blob
|
||||
from pisa.responder import TransactionTracker
|
||||
from pisa.watcher import Watcher
|
||||
from pisa.tools import bitcoin_cli
|
||||
from pisa.db_manager import DBManager
|
||||
from pisa.chain_monitor import ChainMonitor
|
||||
from common.appointment import Appointment
|
||||
from common.tools import compute_locator
|
||||
|
||||
@@ -51,6 +51,17 @@ def db_manager():
|
||||
rmtree("test_db")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def chain_monitor():
|
||||
chain_monitor = ChainMonitor()
|
||||
chain_monitor.monitor_chain()
|
||||
|
||||
yield chain_monitor
|
||||
|
||||
chain_monitor.terminate = True
|
||||
generate_block()
|
||||
|
||||
|
||||
def generate_keypair():
|
||||
client_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
|
||||
client_pk = client_sk.public_key()
|
||||
|
||||
@@ -32,14 +32,17 @@ config = get_config()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def run_api(db_manager):
|
||||
def run_api(db_manager, chain_monitor):
|
||||
sk, pk = generate_keypair()
|
||||
sk_der = sk.private_bytes(
|
||||
encoding=serialization.Encoding.DER,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
watcher = Watcher(db_manager, sk_der, get_config())
|
||||
|
||||
watcher = Watcher(db_manager, chain_monitor, sk_der, get_config())
|
||||
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
|
||||
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep)
|
||||
|
||||
api_thread = Thread(target=API(watcher, config).start)
|
||||
api_thread.daemon = True
|
||||
|
||||
225
test/pisa/unit/test_chain_monitor.py
Normal file
225
test/pisa/unit/test_chain_monitor.py
Normal file
@@ -0,0 +1,225 @@
|
||||
import zmq
|
||||
import time
|
||||
from threading import Thread, Event, Condition
|
||||
|
||||
from pisa.watcher import Watcher
|
||||
from pisa.responder import Responder
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.chain_monitor import ChainMonitor
|
||||
|
||||
from test.pisa.unit.conftest import get_random_value_hex, generate_block, get_config
|
||||
|
||||
|
||||
def test_init(run_bitcoind):
|
||||
# run_bitcoind is started here instead of later on to avoid race conditions while it initializes
|
||||
|
||||
# Not much to test here, just sanity checks to make sure nothing goes south in the future
|
||||
chain_monitor = ChainMonitor()
|
||||
|
||||
assert chain_monitor.best_tip is None
|
||||
assert isinstance(chain_monitor.last_tips, list) and len(chain_monitor.last_tips) == 0
|
||||
assert chain_monitor.terminate is False
|
||||
assert isinstance(chain_monitor.check_tip, Event)
|
||||
assert isinstance(chain_monitor.lock, Condition)
|
||||
assert isinstance(chain_monitor.zmqSubSocket, zmq.Socket)
|
||||
|
||||
# The Queues and asleep flags are initialized when attaching the corresponding subscriber
|
||||
assert chain_monitor.watcher_queue is None
|
||||
assert chain_monitor.responder_queue is None
|
||||
assert chain_monitor.watcher_asleep and chain_monitor.responder_asleep
|
||||
|
||||
|
||||
def test_attach_watcher(chain_monitor):
|
||||
watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config())
|
||||
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
|
||||
|
||||
# booleans are not passed as reference in Python, so the flags need to be set separately
|
||||
assert watcher.asleep == chain_monitor.watcher_asleep
|
||||
watcher.asleep = False
|
||||
assert chain_monitor.watcher_asleep != watcher.asleep
|
||||
|
||||
# Test that the Queue work
|
||||
r_hash = get_random_value_hex(32)
|
||||
chain_monitor.watcher_queue.put(r_hash)
|
||||
assert watcher.block_queue.get() == r_hash
|
||||
|
||||
|
||||
def test_attach_responder(chain_monitor):
|
||||
responder = Responder(db_manager=None, chain_monitor=chain_monitor)
|
||||
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
|
||||
|
||||
# Same kind of testing as with the attach watcher
|
||||
assert responder.asleep == chain_monitor.watcher_asleep
|
||||
responder.asleep = False
|
||||
assert chain_monitor.watcher_asleep != responder.asleep
|
||||
|
||||
r_hash = get_random_value_hex(32)
|
||||
chain_monitor.responder_queue.put(r_hash)
|
||||
assert responder.block_queue.get() == r_hash
|
||||
|
||||
|
||||
def test_notify_subscribers(chain_monitor):
|
||||
# Subscribers are only notified as long as they are awake
|
||||
new_block = get_random_value_hex(32)
|
||||
|
||||
# Queues should be empty to start with
|
||||
assert chain_monitor.watcher_queue.empty()
|
||||
assert chain_monitor.responder_queue.empty()
|
||||
|
||||
chain_monitor.watcher_asleep = True
|
||||
chain_monitor.responder_asleep = True
|
||||
chain_monitor.notify_subscribers(new_block)
|
||||
|
||||
# And remain empty afterwards since both subscribers were asleep
|
||||
assert chain_monitor.watcher_queue.empty()
|
||||
assert chain_monitor.responder_queue.empty()
|
||||
|
||||
# Let's flag them as awake and try again
|
||||
chain_monitor.watcher_asleep = False
|
||||
chain_monitor.responder_asleep = False
|
||||
chain_monitor.notify_subscribers(new_block)
|
||||
|
||||
assert chain_monitor.watcher_queue.get() == new_block
|
||||
assert chain_monitor.responder_queue.get() == new_block
|
||||
|
||||
|
||||
def test_update_state(chain_monitor):
|
||||
# The state is updated after receiving a new block (and only if the block is not already known).
|
||||
# Let's start by setting a best_tip and a couple of old tips
|
||||
new_block_hash = get_random_value_hex(32)
|
||||
chain_monitor.best_tip = new_block_hash
|
||||
chain_monitor.last_tips = [get_random_value_hex(32) for _ in range(5)]
|
||||
|
||||
# Now we can try to update the state with an old best_tip and see how it doesn't work
|
||||
assert chain_monitor.update_state(chain_monitor.last_tips[0]) is False
|
||||
|
||||
# Same should happen with the current tip
|
||||
assert chain_monitor.update_state(chain_monitor.best_tip) is False
|
||||
|
||||
# The state should be correctly updated with a new block hash, the chain tip should change and the old tip should
|
||||
# have been added to the last_tips
|
||||
another_block_hash = get_random_value_hex(32)
|
||||
assert chain_monitor.update_state(another_block_hash) is True
|
||||
assert chain_monitor.best_tip == another_block_hash and new_block_hash == chain_monitor.last_tips[-1]
|
||||
|
||||
|
||||
def test_monitor_chain_polling():
|
||||
# Try polling with the Watcher
|
||||
chain_monitor = ChainMonitor()
|
||||
chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
|
||||
|
||||
watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config())
|
||||
chain_monitor.attach_watcher(watcher.block_queue, asleep=False)
|
||||
|
||||
# monitor_chain_polling runs until terminate if set
|
||||
polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True)
|
||||
polling_thread.start()
|
||||
|
||||
# Check that nothing changes as long as a block is not generated
|
||||
for _ in range(5):
|
||||
assert chain_monitor.watcher_queue.empty()
|
||||
time.sleep(0.1)
|
||||
|
||||
# And that it does if we generate a block
|
||||
generate_block()
|
||||
|
||||
chain_monitor.watcher_queue.get()
|
||||
assert chain_monitor.watcher_queue.empty()
|
||||
|
||||
chain_monitor.terminate = True
|
||||
polling_thread.join()
|
||||
|
||||
|
||||
def test_monitor_chain_zmq():
|
||||
# Try zmq with the Responder
|
||||
chain_monitor = ChainMonitor()
|
||||
chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
|
||||
|
||||
responder = Responder(db_manager=None, chain_monitor=chain_monitor)
|
||||
chain_monitor.attach_responder(responder.block_queue, asleep=False)
|
||||
|
||||
zmq_thread = Thread(target=chain_monitor.monitor_chain_zmq, daemon=True)
|
||||
zmq_thread.start()
|
||||
|
||||
# Queues should start empty
|
||||
assert chain_monitor.responder_queue.empty()
|
||||
|
||||
# And have a new block every time we generate one
|
||||
for _ in range(3):
|
||||
generate_block()
|
||||
chain_monitor.responder_queue.get()
|
||||
assert chain_monitor.responder_queue.empty()
|
||||
|
||||
# If we flag it to sleep no notification is sent
|
||||
chain_monitor.responder_asleep = True
|
||||
|
||||
for _ in range(3):
|
||||
generate_block()
|
||||
assert chain_monitor.responder_queue.empty()
|
||||
|
||||
chain_monitor.terminate = True
|
||||
# The zmq thread needs a block generation to release from the recv method.
|
||||
generate_block()
|
||||
|
||||
zmq_thread.join()
|
||||
|
||||
|
||||
def test_monitor_chain():
|
||||
# Not much to test here, this should launch two threads (one per monitor approach) and finish on terminate
|
||||
chain_monitor = ChainMonitor()
|
||||
|
||||
watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config())
|
||||
responder = Responder(db_manager=None, chain_monitor=chain_monitor)
|
||||
chain_monitor.attach_responder(responder.block_queue, asleep=False)
|
||||
chain_monitor.attach_watcher(watcher.block_queue, asleep=False)
|
||||
|
||||
chain_monitor.best_tip = None
|
||||
chain_monitor.monitor_chain()
|
||||
|
||||
# The tip is updated before starting the threads, so it should have changed.
|
||||
assert chain_monitor.best_tip is not None
|
||||
|
||||
# Blocks should be received
|
||||
for _ in range(5):
|
||||
generate_block()
|
||||
watcher_block = chain_monitor.watcher_queue.get()
|
||||
responder_block = chain_monitor.responder_queue.get()
|
||||
assert watcher_block == responder_block
|
||||
assert chain_monitor.watcher_queue.empty()
|
||||
assert chain_monitor.responder_queue.empty()
|
||||
|
||||
# And the thread be terminated on terminate
|
||||
chain_monitor.terminate = True
|
||||
# The zmq thread needs a block generation to release from the recv method.
|
||||
generate_block()
|
||||
|
||||
|
||||
def test_monitor_chain_single_update():
|
||||
# This test tests that if both threads try to add the same block to the queue, only the first one will make it
|
||||
chain_monitor = ChainMonitor()
|
||||
|
||||
watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config())
|
||||
responder = Responder(db_manager=None, chain_monitor=chain_monitor)
|
||||
chain_monitor.attach_responder(responder.block_queue, asleep=False)
|
||||
chain_monitor.attach_watcher(watcher.block_queue, asleep=False)
|
||||
|
||||
chain_monitor.best_tip = None
|
||||
|
||||
# We will create a block and wait for the polling thread. Then check the queues to see that the block hash has only
|
||||
# been added once.
|
||||
chain_monitor.monitor_chain(polling_delta=2)
|
||||
generate_block()
|
||||
|
||||
watcher_block = chain_monitor.watcher_queue.get()
|
||||
responder_block = chain_monitor.responder_queue.get()
|
||||
assert watcher_block == responder_block
|
||||
assert chain_monitor.watcher_queue.empty()
|
||||
assert chain_monitor.responder_queue.empty()
|
||||
|
||||
# The delta for polling is 2 secs, so let's wait and see
|
||||
time.sleep(2)
|
||||
assert chain_monitor.watcher_queue.empty()
|
||||
assert chain_monitor.responder_queue.empty()
|
||||
|
||||
# We can also force an update and see that it won't go through
|
||||
assert chain_monitor.update_state(watcher_block) is False
|
||||
@@ -5,24 +5,26 @@ from uuid import uuid4
|
||||
from shutil import rmtree
|
||||
from copy import deepcopy
|
||||
from threading import Thread
|
||||
from queue import Queue, Empty
|
||||
|
||||
from pisa.db_manager import DBManager
|
||||
from pisa.responder import Responder, TransactionTracker
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.chain_monitor import ChainMonitor
|
||||
from pisa.tools import bitcoin_cli
|
||||
|
||||
from common.constants import LOCATOR_LEN_HEX
|
||||
from common.tools import check_sha256_hex_format
|
||||
|
||||
from bitcoind_mock.utils import sha256d
|
||||
from bitcoind_mock.transaction import TX
|
||||
from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_value_hex, get_config
|
||||
from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_value_hex
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def responder(db_manager):
|
||||
return Responder(db_manager, get_config())
|
||||
def responder(db_manager, chain_monitor):
|
||||
responder = Responder(db_manager, chain_monitor)
|
||||
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
|
||||
|
||||
return responder
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
@@ -145,18 +147,19 @@ def test_tracker_from_dict_invalid_data():
|
||||
|
||||
|
||||
def test_init_responder(responder):
|
||||
assert type(responder.trackers) is dict and len(responder.trackers) == 0
|
||||
assert type(responder.tx_tracker_map) is dict and len(responder.tx_tracker_map) == 0
|
||||
assert type(responder.unconfirmed_txs) is list and len(responder.unconfirmed_txs) == 0
|
||||
assert type(responder.missed_confirmations) is dict and len(responder.missed_confirmations) == 0
|
||||
assert isinstance(responder.trackers, dict) and len(responder.trackers) == 0
|
||||
assert isinstance(responder.tx_tracker_map, dict) and len(responder.tx_tracker_map) == 0
|
||||
assert isinstance(responder.unconfirmed_txs, list) and len(responder.unconfirmed_txs) == 0
|
||||
assert isinstance(responder.missed_confirmations, dict) and len(responder.missed_confirmations) == 0
|
||||
assert isinstance(responder.chain_monitor, ChainMonitor)
|
||||
assert responder.block_queue.empty()
|
||||
assert responder.asleep is True
|
||||
assert type(responder.config) is dict
|
||||
assert responder.zmq_subscriber is None
|
||||
|
||||
|
||||
def test_handle_breach(db_manager):
|
||||
responder = Responder(db_manager, get_config())
|
||||
def test_handle_breach(db_manager, chain_monitor):
|
||||
responder = Responder(db_manager, chain_monitor)
|
||||
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
|
||||
|
||||
uuid = uuid4().hex
|
||||
tracker = create_dummy_tracker()
|
||||
|
||||
@@ -173,11 +176,10 @@ def test_handle_breach(db_manager):
|
||||
|
||||
assert receipt.delivered is True
|
||||
|
||||
# The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes now.
|
||||
# To do so we delete all the trackers, stop the zmq and create a new fake block to unblock the queue.get method
|
||||
# The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes
|
||||
# now. To do so we delete all the trackers, and generate a new block.
|
||||
responder.trackers = dict()
|
||||
responder.zmq_subscriber.terminate = True
|
||||
responder.block_queue.put(get_random_value_hex(32))
|
||||
generate_block()
|
||||
|
||||
|
||||
def test_add_bad_response(responder):
|
||||
@@ -185,7 +187,7 @@ def test_add_bad_response(responder):
|
||||
tracker = create_dummy_tracker()
|
||||
|
||||
# Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting
|
||||
# to awake. That will prevent the zmq thread to be launched again.
|
||||
# to awake. That will prevent the chain_monitor thread to be launched again.
|
||||
responder.asleep = False
|
||||
|
||||
# A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though.
|
||||
@@ -206,7 +208,7 @@ def test_add_bad_response(responder):
|
||||
|
||||
|
||||
def test_add_tracker(responder):
|
||||
responder.asleep = False
|
||||
# Responder is asleep
|
||||
|
||||
for _ in range(20):
|
||||
uuid = uuid4().hex
|
||||
@@ -238,7 +240,8 @@ def test_add_tracker(responder):
|
||||
|
||||
|
||||
def test_add_tracker_same_penalty_txid(responder):
|
||||
# Create the same tracker using two different uuids
|
||||
# Responder is asleep
|
||||
|
||||
confirmations = 0
|
||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True)
|
||||
uuid_1 = uuid4().hex
|
||||
@@ -265,7 +268,7 @@ def test_add_tracker_same_penalty_txid(responder):
|
||||
|
||||
|
||||
def test_add_tracker_already_confirmed(responder):
|
||||
responder.asleep = False
|
||||
# Responder is asleep
|
||||
|
||||
for i in range(20):
|
||||
uuid = uuid4().hex
|
||||
@@ -279,29 +282,10 @@ def test_add_tracker_already_confirmed(responder):
|
||||
assert penalty_txid not in responder.unconfirmed_txs
|
||||
|
||||
|
||||
def test_do_subscribe(responder):
|
||||
responder.block_queue = Queue()
|
||||
|
||||
zmq_thread = Thread(target=responder.do_subscribe)
|
||||
zmq_thread.daemon = True
|
||||
zmq_thread.start()
|
||||
|
||||
try:
|
||||
generate_block()
|
||||
block_hash = responder.block_queue.get()
|
||||
assert check_sha256_hex_format(block_hash)
|
||||
|
||||
except Empty:
|
||||
assert False
|
||||
|
||||
|
||||
def test_do_watch(temp_db_manager):
|
||||
responder = Responder(temp_db_manager, get_config())
|
||||
responder.block_queue = Queue()
|
||||
|
||||
zmq_thread = Thread(target=responder.do_subscribe)
|
||||
zmq_thread.daemon = True
|
||||
zmq_thread.start()
|
||||
def test_do_watch(temp_db_manager, chain_monitor):
|
||||
# Create a fresh responder to simplify the test
|
||||
responder = Responder(temp_db_manager, chain_monitor)
|
||||
chain_monitor.attach_responder(responder.block_queue, False)
|
||||
|
||||
trackers = [create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(20)]
|
||||
|
||||
@@ -315,9 +299,7 @@ def test_do_watch(temp_db_manager):
|
||||
responder.unconfirmed_txs.append(tracker.penalty_txid)
|
||||
|
||||
# Let's start to watch
|
||||
watch_thread = Thread(target=responder.do_watch)
|
||||
watch_thread.daemon = True
|
||||
watch_thread.start()
|
||||
Thread(target=responder.do_watch, daemon=True).start()
|
||||
|
||||
# And broadcast some of the transactions
|
||||
broadcast_txs = []
|
||||
@@ -351,13 +333,9 @@ def test_do_watch(temp_db_manager):
|
||||
assert responder.asleep is True
|
||||
|
||||
|
||||
def test_check_confirmations(temp_db_manager):
|
||||
responder = Responder(temp_db_manager, get_config())
|
||||
responder.block_queue = Queue()
|
||||
|
||||
zmq_thread = Thread(target=responder.do_subscribe)
|
||||
zmq_thread.daemon = True
|
||||
zmq_thread.start()
|
||||
def test_check_confirmations(temp_db_manager, chain_monitor):
|
||||
responder = Responder(temp_db_manager, chain_monitor)
|
||||
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
|
||||
|
||||
# check_confirmations checks, given a list of transaction for a block, what of the known penalty transaction have
|
||||
# been confirmed. To test this we need to create a list of transactions and the state of the responder
|
||||
@@ -387,7 +365,7 @@ def test_check_confirmations(temp_db_manager):
|
||||
assert responder.missed_confirmations[tx] == 1
|
||||
|
||||
|
||||
# WIP: Check this properly, a bug pass unnoticed!
|
||||
# TODO: Check this properly, a bug pass unnoticed!
|
||||
def test_get_txs_to_rebroadcast(responder):
|
||||
# Let's create a few fake txids and assign at least 6 missing confirmations to each
|
||||
txs_missing_too_many_conf = {get_random_value_hex(32): 6 + i for i in range(10)}
|
||||
@@ -411,13 +389,13 @@ def test_get_txs_to_rebroadcast(responder):
|
||||
assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys())
|
||||
|
||||
|
||||
def test_get_completed_trackers(db_manager):
|
||||
def test_get_completed_trackers(db_manager, chain_monitor):
|
||||
initial_height = bitcoin_cli().getblockcount()
|
||||
|
||||
# Let's use a fresh responder for this to make it easier to compare the results
|
||||
responder = Responder(db_manager, get_config())
|
||||
responder = Responder(db_manager, chain_monitor)
|
||||
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
|
||||
|
||||
# A complete tracker is a tracker that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS)
|
||||
# A complete tracker is a tracker that has reached the appointment end with enough confs (> MIN_CONFIRMATIONS)
|
||||
# We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached
|
||||
trackers_end_conf = {
|
||||
uuid4().hex: create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(10)
|
||||
@@ -462,9 +440,10 @@ def test_get_completed_trackers(db_manager):
|
||||
assert set(completed_trackers_ids) == set(ended_trackers_keys)
|
||||
|
||||
|
||||
def test_rebroadcast(db_manager):
|
||||
responder = Responder(db_manager, get_config())
|
||||
def test_rebroadcast(db_manager, chain_monitor):
|
||||
responder = Responder(db_manager, chain_monitor)
|
||||
responder.asleep = False
|
||||
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
|
||||
|
||||
txs_to_rebroadcast = []
|
||||
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
import pytest
|
||||
from uuid import uuid4
|
||||
from threading import Thread
|
||||
from queue import Queue, Empty
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from pisa.watcher import Watcher
|
||||
from pisa.responder import Responder
|
||||
from pisa.tools import bitcoin_cli
|
||||
from pisa.chain_monitor import ChainMonitor
|
||||
|
||||
from test.pisa.unit.conftest import (
|
||||
generate_block,
|
||||
generate_blocks,
|
||||
generate_dummy_appointment,
|
||||
get_random_value_hex,
|
||||
@@ -17,7 +18,7 @@ from test.pisa.unit.conftest import (
|
||||
)
|
||||
from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS
|
||||
|
||||
from common.tools import check_sha256_hex_format, compute_locator
|
||||
from common.tools import compute_locator
|
||||
from common.cryptographer import Cryptographer
|
||||
|
||||
|
||||
@@ -36,8 +37,12 @@ sk_der = signing_key.private_bytes(
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def watcher(db_manager):
|
||||
return Watcher(db_manager, sk_der, get_config())
|
||||
def watcher(db_manager, chain_monitor):
|
||||
watcher = Watcher(db_manager, chain_monitor, sk_der, get_config())
|
||||
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
|
||||
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep)
|
||||
|
||||
return watcher
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@@ -68,17 +73,18 @@ def create_appointments(n):
|
||||
return appointments, locator_uuid_map, dispute_txs
|
||||
|
||||
|
||||
def test_init(watcher):
|
||||
assert type(watcher.appointments) is dict and len(watcher.appointments) == 0
|
||||
assert type(watcher.locator_uuid_map) is dict and len(watcher.locator_uuid_map) == 0
|
||||
assert watcher.block_queue.empty()
|
||||
def test_init(run_bitcoind, watcher):
|
||||
assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0
|
||||
assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0
|
||||
assert watcher.asleep is True
|
||||
assert type(watcher.config) is dict
|
||||
assert watcher.zmq_subscriber is None
|
||||
assert type(watcher.responder) is Responder
|
||||
assert watcher.block_queue.empty()
|
||||
assert isinstance(watcher.chain_monitor, ChainMonitor)
|
||||
assert isinstance(watcher.config, dict)
|
||||
assert isinstance(watcher.signing_key, ec.EllipticCurvePrivateKey)
|
||||
assert isinstance(watcher.responder, Responder)
|
||||
|
||||
|
||||
def test_add_appointment(run_bitcoind, watcher):
|
||||
def test_add_appointment(watcher):
|
||||
# The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state)
|
||||
# Avoid this by setting the state to awake.
|
||||
watcher.asleep = False
|
||||
@@ -122,36 +128,18 @@ def test_add_too_many_appointments(watcher):
|
||||
assert sig is None
|
||||
|
||||
|
||||
def test_do_subscribe(watcher):
|
||||
watcher.block_queue = Queue()
|
||||
|
||||
zmq_thread = Thread(target=watcher.do_subscribe)
|
||||
zmq_thread.daemon = True
|
||||
zmq_thread.start()
|
||||
|
||||
try:
|
||||
generate_block()
|
||||
block_hash = watcher.block_queue.get()
|
||||
assert check_sha256_hex_format(block_hash)
|
||||
|
||||
except Empty:
|
||||
assert False
|
||||
|
||||
|
||||
def test_do_watch(watcher):
|
||||
# We will wipe all the previous data and add 5 appointments
|
||||
watcher.appointments, watcher.locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS)
|
||||
watcher.chain_monitor.watcher_asleep = False
|
||||
|
||||
watch_thread = Thread(target=watcher.do_watch)
|
||||
watch_thread.daemon = True
|
||||
watch_thread.start()
|
||||
Thread(target=watcher.do_watch, daemon=True).start()
|
||||
|
||||
# Broadcast the first two
|
||||
for dispute_tx in dispute_txs[:2]:
|
||||
bitcoin_cli().sendrawtransaction(dispute_tx)
|
||||
|
||||
# After leaving some time for the block to be mined and processed, the number of appointments should have reduced
|
||||
# by two
|
||||
# After generating enough blocks, the number of appointments should have reduced by two
|
||||
generate_blocks(START_TIME_OFFSET + END_TIME_OFFSET)
|
||||
|
||||
assert len(watcher.appointments) == APPOINTMENTS - 2
|
||||
|
||||
Reference in New Issue
Block a user