From 8fab59975fe6e8bf0bc6c13d1cb206daf51adc66 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Thu, 2 Jan 2020 16:15:27 +0100 Subject: [PATCH 01/22] Adds basic ChainMonitor to the system ChainMonitor is the actor in charge of checking new blocks. It improves from the previous zmq_subscriber by also doing polling and, therefore, making sure that no blocks are missed. Documentation and tests are still required. Partially covers #31 --- pisa/chain_monitor.py | 92 ++++++++++++++++++++++++++++++++++++ pisa/pisad.py | 12 ++++- pisa/responder.py | 23 ++------- pisa/utils/zmq_subscriber.py | 34 ------------- pisa/watcher.py | 23 ++------- 5 files changed, 112 insertions(+), 72 deletions(-) create mode 100644 pisa/chain_monitor.py delete mode 100644 pisa/utils/zmq_subscriber.py diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py new file mode 100644 index 0000000..74b21f1 --- /dev/null +++ b/pisa/chain_monitor.py @@ -0,0 +1,92 @@ +import zmq +import binascii +from queue import Queue +from threading import Thread, Event, Condition + +from common.logger import Logger +from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT +from pisa.block_processor import BlockProcessor + +logger = Logger("ChainMonitor") + + +class ChainMonitor: + def __init__(self): + self.best_tip = None + self.last_tips = [] + self.terminate = False + + self.check_tip = Event() + self.lock = Condition() + + self.zmqContext = zmq.Context() + self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) + self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") + self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) + + self.watcher_queue = None + self.watcher_asleep = True + self.responder_queue = None + self.responder_asleep = True + + def attach_watcher(self, queue, awake): + self.watcher_queue = queue + self.watcher_asleep = awake + + def attach_responder(self, queue, awake): + self.responder_queue = queue + 
self.responder_asleep = awake + + def notify_subscribers(self, block_hash): + if not self.watcher_asleep: + self.watcher_queue.put(block_hash) + + if not self.responder_asleep: + self.responder_queue.put(block_hash) + + def update_state(self, block_hash): + self.best_tip = block_hash + self.last_tips.append(block_hash) + + if len(self.last_tips) > 10: + self.last_tips.pop(0) + + def monitor_chain(self): + self.best_tip = BlockProcessor.get_best_block_hash() + Thread(target=self.monitor_chain_polling).start() + Thread(target=self.monitor_chain_zmq).start() + + def monitor_chain_polling(self): + while self.terminate: + self.check_tip.wait(timeout=60) + + # Terminate could have been set wile the thread was blocked in wait + if not self.terminate: + current_tip = BlockProcessor.get_best_block_hash() + + self.lock.acquire() + if current_tip != self.best_tip: + self.update_state(current_tip) + self.notify_subscribers(current_tip) + logger.info("New block received via polling", block_hash=current_tip) + self.lock.release() + + def monitor_chain_zmq(self): + while not self.terminate: + msg = self.zmqSubSocket.recv_multipart() + + # Terminate could have been set wile the thread was blocked in recv + if not self.terminate: + topic = msg[0] + body = msg[1] + + if topic == b"hashblock": + block_hash = binascii.hexlify(body).decode("utf-8") + + self.lock.acquire() + if block_hash != self.best_tip and block_hash not in self.last_tips: + self.update_state(block_hash) + self.notify_subscribers(block_hash) + logger.info("New block received via zmq", block_hash=block_hash) + self.lock.release() diff --git a/pisa/pisad.py b/pisa/pisad.py index 9c258ce..ed185c6 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -10,6 +10,7 @@ from pisa.builder import Builder from pisa.conf import BTC_NETWORK, PISA_SECRET_KEY from pisa.responder import Responder from pisa.db_manager import DBManager +from pisa.chain_monitor import ChainMonitor from pisa.block_processor import BlockProcessor from 
pisa.tools import can_connect_to_bitcoind, in_correct_network @@ -46,13 +47,18 @@ if __name__ == "__main__": try: db_manager = DBManager(DB_PATH) + # Create the chain monitor and start monitoring the chain + chain_monitor = ChainMonitor() + chain_monitor.monitor_chain() + watcher_appointments_data = db_manager.load_watcher_appointments() responder_trackers_data = db_manager.load_responder_trackers() with open(PISA_SECRET_KEY, "rb") as key_file: secret_key_der = key_file.read() - watcher = Watcher(db_manager, secret_key_der) + watcher = Watcher(db_manager, chain_monitor, secret_key_der) + chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: logger.info("Fresh bootstrap") @@ -65,7 +71,7 @@ if __name__ == "__main__": last_block_responder = db_manager.load_last_block_hash_responder() # FIXME: 32-reorgs-offline dropped txs are not used at this point. - responder = Responder(db_manager) + responder = Responder(db_manager, chain_monitor) last_common_ancestor_responder = None missed_blocks_responder = None @@ -82,6 +88,8 @@ if __name__ == "__main__": # Build Watcher with Responder and backed up data. If the blocks of both match we don't perform the # search twice. 
watcher.responder = responder + chain_monitor.attach_responder(responder.block_queue, responder.asleep) + if last_block_watcher is not None: if last_block_watcher == last_block_responder: missed_blocks_watcher = missed_blocks_responder diff --git a/pisa/responder.py b/pisa/responder.py index 0fce1cb..dbfac15 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -6,7 +6,6 @@ from common.logger import Logger from pisa.cleaner import Cleaner from pisa.carrier import Carrier from pisa.block_processor import BlockProcessor -from pisa.utils.zmq_subscriber import ZMQSubscriber CONFIRMATIONS_BEFORE_RETRY = 6 MIN_CONFIRMATIONS = 6 @@ -135,14 +134,14 @@ class Responder: """ - def __init__(self, db_manager): + def __init__(self, db_manager, chain_monitor): self.trackers = dict() self.tx_tracker_map = dict() self.unconfirmed_txs = [] self.missed_confirmations = dict() self.asleep = True self.block_queue = Queue() - self.zmq_subscriber = None + self.chain_monitor = chain_monitor self.db_manager = db_manager @staticmethod @@ -260,19 +259,8 @@ class Responder: if self.asleep: self.asleep = False - zmq_thread = Thread(target=self.do_subscribe) - responder = Thread(target=self.do_watch) - zmq_thread.start() - responder.start() - - def do_subscribe(self): - """ - Initializes a :obj:`ZMQSubscriber ` instance to listen to new blocks - from ``bitcoind``. Block ids are received trough the ``block_queue``. 
- """ - - self.zmq_subscriber = ZMQSubscriber(parent="Responder") - self.zmq_subscriber.handle(self.block_queue) + self.chain_monitor.responder_asleep = False + Thread(target=self.do_watch).start() def do_watch(self): """ @@ -327,8 +315,7 @@ class Responder: # Go back to sleep if there are no more pending trackers self.asleep = True - self.zmq_subscriber.terminate = True - self.block_queue = Queue() + self.chain_monitor.responder_asleep = True logger.info("No more pending trackers, going back to sleep") diff --git a/pisa/utils/zmq_subscriber.py b/pisa/utils/zmq_subscriber.py deleted file mode 100644 index ecec9af..0000000 --- a/pisa/utils/zmq_subscriber.py +++ /dev/null @@ -1,34 +0,0 @@ -import zmq -import binascii -from common.logger import Logger -from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT - - -# ToDo: #7-add-async-back-to-zmq -class ZMQSubscriber: - """ Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py""" - - def __init__(self, parent): - self.zmqContext = zmq.Context() - self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) - self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) - self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") - self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) - self.logger = Logger("ZMQSubscriber-{}".format(parent)) - - self.terminate = False - - def handle(self, block_queue): - while not self.terminate: - msg = self.zmqSubSocket.recv_multipart() - - # Terminate could have been set wile the thread was blocked in recv - if not self.terminate: - topic = msg[0] - body = msg[1] - - if topic == b"hashblock": - block_hash = binascii.hexlify(body).decode("utf-8") - block_queue.put(block_hash) - - self.logger.info("New block received via ZMQ", block_hash=block_hash) diff --git a/pisa/watcher.py b/pisa/watcher.py index 9d659db..d6e0729 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -9,7 +9,6 @@ from common.logger import Logger from pisa.cleaner import Cleaner 
from pisa.responder import Responder from pisa.block_processor import BlockProcessor -from pisa.utils.zmq_subscriber import ZMQSubscriber from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS logger = Logger("Watcher") @@ -58,13 +57,13 @@ class Watcher: """ - def __init__(self, db_manager, sk_der, responder=None, max_appointments=MAX_APPOINTMENTS): + def __init__(self, db_manager, chain_monitor, sk_der, responder=None, max_appointments=MAX_APPOINTMENTS): self.appointments = dict() self.locator_uuid_map = dict() self.asleep = True self.block_queue = Queue() self.max_appointments = max_appointments - self.zmq_subscriber = None + self.chain_monitor = chain_monitor self.db_manager = db_manager self.signing_key = Cryptographer.load_private_key_der(sk_der) @@ -127,10 +126,8 @@ class Watcher: if self.asleep: self.asleep = False - zmq_thread = Thread(target=self.do_subscribe) - watcher = Thread(target=self.do_watch) - zmq_thread.start() - watcher.start() + self.chain_monitor.watcher_asleep = False + Thread(target=self.do_watch).start() logger.info("Waking up") @@ -151,15 +148,6 @@ class Watcher: return appointment_added, signature - def do_subscribe(self): - """ - Initializes a ``ZMQSubscriber`` instance to listen to new blocks from ``bitcoind``. Block ids are received - trough the ``block_queue``. - """ - - self.zmq_subscriber = ZMQSubscriber(parent="Watcher") - self.zmq_subscriber.handle(self.block_queue) - def do_watch(self): """ Monitors the blockchain whilst there are pending appointments. 
@@ -221,8 +209,7 @@ class Watcher: # Go back to sleep if there are no more appointments self.asleep = True - self.zmq_subscriber.terminate = True - self.block_queue = Queue() + self.chain_monitor.watcher_asleep = True logger.info("No more pending appointments, going back to sleep") From c594018dce377968cb212b10dcdf2c157fd9f700 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 3 Jan 2020 11:37:26 +0100 Subject: [PATCH 02/22] Bug fix: Properly initializes Responder --- pisa/chain_monitor.py | 3 +-- pisa/pisad.py | 13 ++++++------- pisa/watcher.py | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py index 74b21f1..4442488 100644 --- a/pisa/chain_monitor.py +++ b/pisa/chain_monitor.py @@ -1,6 +1,5 @@ import zmq import binascii -from queue import Queue from threading import Thread, Event, Condition from common.logger import Logger @@ -26,8 +25,8 @@ class ChainMonitor: self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) self.watcher_queue = None - self.watcher_asleep = True self.responder_queue = None + self.watcher_asleep = True self.responder_asleep = True def attach_watcher(self, queue, awake): diff --git a/pisa/pisad.py b/pisa/pisad.py index ed185c6..4935d40 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -71,7 +71,6 @@ if __name__ == "__main__": last_block_responder = db_manager.load_last_block_hash_responder() # FIXME: 32-reorgs-offline dropped txs are not used at this point. 
- responder = Responder(db_manager, chain_monitor) last_common_ancestor_responder = None missed_blocks_responder = None @@ -82,13 +81,13 @@ if __name__ == "__main__": ) missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder) - responder.trackers, responder.tx_tracker_map = Builder.build_trackers(responder_trackers_data) - responder.block_queue = Builder.build_block_queue(missed_blocks_responder) + watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers( + responder_trackers_data + ) + watcher.responder.block_queue = Builder.build_block_queue(missed_blocks_responder) - # Build Watcher with Responder and backed up data. If the blocks of both match we don't perform the - # search twice. - watcher.responder = responder - chain_monitor.attach_responder(responder.block_queue, responder.asleep) + # Build Watcher. If the blocks of both match we don't perform the search twice. + chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) if last_block_watcher is not None: if last_block_watcher == last_block_responder: diff --git a/pisa/watcher.py b/pisa/watcher.py index d6e0729..1da5c75 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -68,7 +68,7 @@ class Watcher: self.signing_key = Cryptographer.load_private_key_der(sk_der) if not isinstance(responder, Responder): - self.responder = Responder(db_manager) + self.responder = Responder(db_manager, chain_monitor) @staticmethod def compute_locator(tx_id): From 3889d30e313c75dace4e321b25282e4fcf68830b Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 3 Jan 2020 13:35:52 +0100 Subject: [PATCH 03/22] Minor refactor Runs chain_monitor as daemon so it terminates along with the main thread. 
Reorders attach_responder so it is in the same stanza as attach_watcher --- pisa/chain_monitor.py | 6 +++--- pisa/pisad.py | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py index 4442488..c9e9f83 100644 --- a/pisa/chain_monitor.py +++ b/pisa/chain_monitor.py @@ -53,11 +53,11 @@ class ChainMonitor: def monitor_chain(self): self.best_tip = BlockProcessor.get_best_block_hash() - Thread(target=self.monitor_chain_polling).start() - Thread(target=self.monitor_chain_zmq).start() + Thread(target=self.monitor_chain_polling, daemon=True).start() + Thread(target=self.monitor_chain_zmq, daemon=True).start() def monitor_chain_polling(self): - while self.terminate: + while not self.terminate: self.check_tip.wait(timeout=60) # Terminate could have been set wile the thread was blocked in wait diff --git a/pisa/pisad.py b/pisa/pisad.py index 4935d40..b027662 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -59,6 +59,7 @@ if __name__ == "__main__": watcher = Watcher(db_manager, chain_monitor, secret_key_der) chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) + chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: logger.info("Fresh bootstrap") @@ -87,8 +88,6 @@ if __name__ == "__main__": watcher.responder.block_queue = Builder.build_block_queue(missed_blocks_responder) # Build Watcher. If the blocks of both match we don't perform the search twice. 
- chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) - if last_block_watcher is not None: if last_block_watcher == last_block_responder: missed_blocks_watcher = missed_blocks_responder From 069e15fdba0ab0166adaf23b3e4327fa50064feb Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 3 Jan 2020 13:36:52 +0100 Subject: [PATCH 04/22] Updates current tests to work with chain_monitor instead of zmq_sub --- test/pisa/unit/test_api.py | 9 ++- test/pisa/unit/test_responder.py | 106 ++++++++++++++----------------- test/pisa/unit/test_watcher.py | 62 ++++++++---------- 3 files changed, 81 insertions(+), 96 deletions(-) diff --git a/test/pisa/unit/test_api.py b/test/pisa/unit/test_api.py index 2dc830b..b6ca968 100644 --- a/test/pisa/unit/test_api.py +++ b/test/pisa/unit/test_api.py @@ -10,6 +10,7 @@ from pisa.watcher import Watcher from pisa.tools import bitcoin_cli from pisa import HOST, PORT from pisa.conf import MAX_APPOINTMENTS +from pisa.chain_monitor import ChainMonitor from test.pisa.unit.conftest import ( generate_block, @@ -37,7 +38,13 @@ def run_api(db_manager): format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) - watcher = Watcher(db_manager, sk_der) + + chain_monitor = ChainMonitor() + chain_monitor.monitor_chain() + + watcher = Watcher(db_manager, chain_monitor, sk_der) + chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) + chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) api_thread = Thread(target=API(watcher).start) api_thread.daemon = True diff --git a/test/pisa/unit/test_responder.py b/test/pisa/unit/test_responder.py index a9e99da..b3e339e 100644 --- a/test/pisa/unit/test_responder.py +++ b/test/pisa/unit/test_responder.py @@ -5,15 +5,14 @@ from uuid import uuid4 from shutil import rmtree from copy import deepcopy from threading import Thread -from queue import Queue, Empty from pisa.db_manager import 
DBManager from pisa.responder import Responder, TransactionTracker from pisa.block_processor import BlockProcessor +from pisa.chain_monitor import ChainMonitor from pisa.tools import bitcoin_cli from common.constants import LOCATOR_LEN_HEX -from common.tools import check_sha256_hex_format from bitcoind_mock.utils import sha256d from bitcoind_mock.transaction import TX @@ -21,8 +20,19 @@ from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_ @pytest.fixture(scope="module") -def responder(db_manager): - return Responder(db_manager) +def chain_monitor(): + chain_monitor = ChainMonitor() + chain_monitor.monitor_chain() + + return chain_monitor + + +@pytest.fixture(scope="module") +def responder(db_manager, chain_monitor): + responder = Responder(db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, responder.asleep) + + return responder @pytest.fixture() @@ -145,17 +155,19 @@ def test_tracker_from_dict_invalid_data(): def test_init_responder(responder): - assert type(responder.trackers) is dict and len(responder.trackers) == 0 - assert type(responder.tx_tracker_map) is dict and len(responder.tx_tracker_map) == 0 - assert type(responder.unconfirmed_txs) is list and len(responder.unconfirmed_txs) == 0 - assert type(responder.missed_confirmations) is dict and len(responder.missed_confirmations) == 0 + assert isinstance(responder.trackers, dict) and len(responder.trackers) == 0 + assert isinstance(responder.tx_tracker_map, dict) and len(responder.tx_tracker_map) == 0 + assert isinstance(responder.unconfirmed_txs, list) and len(responder.unconfirmed_txs) == 0 + assert isinstance(responder.missed_confirmations, dict) and len(responder.missed_confirmations) == 0 + assert isinstance(responder.chain_monitor, ChainMonitor) assert responder.block_queue.empty() assert responder.asleep is True - assert responder.zmq_subscriber is None -def test_handle_breach(db_manager): - responder = Responder(db_manager) +def 
test_handle_breach(db_manager, chain_monitor): + responder = Responder(db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, responder.asleep) + uuid = uuid4().hex tracker = create_dummy_tracker() @@ -172,11 +184,10 @@ def test_handle_breach(db_manager): assert receipt.delivered is True - # The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes now. - # To do so we delete all the trackers, stop the zmq and create a new fake block to unblock the queue.get method + # The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes + # now. To do so we delete all the trackers, and generate a new block. responder.trackers = dict() - responder.zmq_subscriber.terminate = True - responder.block_queue.put(get_random_value_hex(32)) + generate_block() def test_add_bad_response(responder): @@ -205,7 +216,7 @@ def test_add_bad_response(responder): def test_add_tracker(responder): - responder.asleep = False + # Responder is asleep for _ in range(20): uuid = uuid4().hex @@ -237,7 +248,8 @@ def test_add_tracker(responder): def test_add_tracker_same_penalty_txid(responder): - # Create the same tracker using two different uuids + # Responder is asleep + confirmations = 0 locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True) uuid_1 = uuid4().hex @@ -264,7 +276,7 @@ def test_add_tracker_same_penalty_txid(responder): def test_add_tracker_already_confirmed(responder): - responder.asleep = False + # Responder is asleep for i in range(20): uuid = uuid4().hex @@ -278,29 +290,10 @@ def test_add_tracker_already_confirmed(responder): assert penalty_txid not in responder.unconfirmed_txs -def test_do_subscribe(responder): - responder.block_queue = Queue() - - zmq_thread = Thread(target=responder.do_subscribe) - zmq_thread.daemon = True - zmq_thread.start() - - try: - generate_block() - block_hash = 
responder.block_queue.get() - assert check_sha256_hex_format(block_hash) - - except Empty: - assert False - - -def test_do_watch(temp_db_manager): - responder = Responder(temp_db_manager) - responder.block_queue = Queue() - - zmq_thread = Thread(target=responder.do_subscribe) - zmq_thread.daemon = True - zmq_thread.start() +def test_do_watch(temp_db_manager, chain_monitor): + # Create a fresh responder to simplify the test + responder = Responder(temp_db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, False) trackers = [create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(20)] @@ -314,9 +307,7 @@ def test_do_watch(temp_db_manager): responder.unconfirmed_txs.append(tracker.penalty_txid) # Let's start to watch - watch_thread = Thread(target=responder.do_watch) - watch_thread.daemon = True - watch_thread.start() + Thread(target=responder.do_watch, daemon=True).start() # And broadcast some of the transactions broadcast_txs = [] @@ -324,8 +315,10 @@ def test_do_watch(temp_db_manager): bitcoin_cli().sendrawtransaction(tracker.penalty_rawtx) broadcast_txs.append(tracker.penalty_txid) + print(responder.unconfirmed_txs) # Mine a block generate_block() + print(responder.unconfirmed_txs) # The transactions we sent shouldn't be in the unconfirmed transaction list anymore assert not set(broadcast_txs).issubset(responder.unconfirmed_txs) @@ -350,13 +343,9 @@ def test_do_watch(temp_db_manager): assert responder.asleep is True -def test_check_confirmations(temp_db_manager): - responder = Responder(temp_db_manager) - responder.block_queue = Queue() - - zmq_thread = Thread(target=responder.do_subscribe) - zmq_thread.daemon = True - zmq_thread.start() +def test_check_confirmations(temp_db_manager, chain_monitor): + responder = Responder(temp_db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, responder.asleep) # check_confirmations checks, given a list of transaction for a block, what of the 
known penalty transaction have # been confirmed. To test this we need to create a list of transactions and the state of the responder @@ -386,7 +375,7 @@ def test_check_confirmations(temp_db_manager): assert responder.missed_confirmations[tx] == 1 -# WIP: Check this properly, a bug pass unnoticed! +# TODO: Check this properly, a bug pass unnoticed! def test_get_txs_to_rebroadcast(responder): # Let's create a few fake txids and assign at least 6 missing confirmations to each txs_missing_too_many_conf = {get_random_value_hex(32): 6 + i for i in range(10)} @@ -410,13 +399,13 @@ def test_get_txs_to_rebroadcast(responder): assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys()) -def test_get_completed_trackers(db_manager): +def test_get_completed_trackers(db_manager, chain_monitor): initial_height = bitcoin_cli().getblockcount() - # Let's use a fresh responder for this to make it easier to compare the results - responder = Responder(db_manager) + responder = Responder(db_manager, chain_monitor) + chain_monitor.attach_responder(responder.block_queue, responder.asleep) - # A complete tracker is a tracker that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS) + # A complete tracker is a tracker that has reached the appointment end with enough confs (> MIN_CONFIRMATIONS) # We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached trackers_end_conf = { uuid4().hex: create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(10) @@ -461,9 +450,10 @@ def test_get_completed_trackers(db_manager): assert set(completed_trackers_ids) == set(ended_trackers_keys) -def test_rebroadcast(db_manager): - responder = Responder(db_manager) +def test_rebroadcast(db_manager, chain_monitor): + responder = Responder(db_manager, chain_monitor) responder.asleep = False + chain_monitor.attach_responder(responder.block_queue, responder.asleep) txs_to_rebroadcast = [] diff --git 
a/test/pisa/unit/test_watcher.py b/test/pisa/unit/test_watcher.py index 8ec331c..2b2c0cb 100644 --- a/test/pisa/unit/test_watcher.py +++ b/test/pisa/unit/test_watcher.py @@ -1,22 +1,15 @@ import pytest from uuid import uuid4 from threading import Thread -from queue import Queue, Empty from cryptography.hazmat.primitives import serialization from pisa.watcher import Watcher from pisa.responder import Responder from pisa.tools import bitcoin_cli -from test.pisa.unit.conftest import ( - generate_block, - generate_blocks, - generate_dummy_appointment, - get_random_value_hex, - generate_keypair, -) +from test.pisa.unit.conftest import generate_blocks, generate_dummy_appointment, get_random_value_hex, generate_keypair +from pisa.chain_monitor import ChainMonitor from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS -from common.tools import check_sha256_hex_format from common.cryptographer import Cryptographer @@ -35,8 +28,20 @@ sk_der = signing_key.private_bytes( @pytest.fixture(scope="module") -def watcher(db_manager): - return Watcher(db_manager, sk_der) +def chain_monitor(): + chain_monitor = ChainMonitor() + chain_monitor.monitor_chain() + + return chain_monitor + + +@pytest.fixture(scope="module") +def watcher(db_manager, chain_monitor): + watcher = Watcher(db_manager, chain_monitor, sk_der) + chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) + chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) + + return watcher @pytest.fixture(scope="module") @@ -67,17 +72,18 @@ def create_appointments(n): return appointments, locator_uuid_map, dispute_txs -def test_init(watcher): - assert type(watcher.appointments) is dict and len(watcher.appointments) == 0 - assert type(watcher.locator_uuid_map) is dict and len(watcher.locator_uuid_map) == 0 +def test_init(run_bitcoind, watcher): + assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0 + assert isinstance(watcher.locator_uuid_map, dict) and 
len(watcher.locator_uuid_map) == 0 + assert isinstance(watcher.chain_monitor, ChainMonitor) assert watcher.block_queue.empty() assert watcher.asleep is True + assert watcher.max_appointments == MAX_APPOINTMENTS - assert watcher.zmq_subscriber is None assert type(watcher.responder) is Responder -def test_add_appointment(run_bitcoind, watcher): +def test_add_appointment(watcher): # The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state) # Avoid this by setting the state to awake. watcher.asleep = False @@ -121,36 +127,18 @@ def test_add_too_many_appointments(watcher): assert sig is None -def test_do_subscribe(watcher): - watcher.block_queue = Queue() - - zmq_thread = Thread(target=watcher.do_subscribe) - zmq_thread.daemon = True - zmq_thread.start() - - try: - generate_block() - block_hash = watcher.block_queue.get() - assert check_sha256_hex_format(block_hash) - - except Empty: - assert False - - def test_do_watch(watcher): # We will wipe all the previous data and add 5 appointments watcher.appointments, watcher.locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS) + watcher.chain_monitor.watcher_asleep = False - watch_thread = Thread(target=watcher.do_watch) - watch_thread.daemon = True - watch_thread.start() + Thread(target=watcher.do_watch, daemon=True).start() # Broadcast the first two for dispute_tx in dispute_txs[:2]: bitcoin_cli().sendrawtransaction(dispute_tx) - # After leaving some time for the block to be mined and processed, the number of appointments should have reduced - # by two + # After generating enough blocks, the number of appointments should have reduced by two generate_blocks(START_TIME_OFFSET + END_TIME_OFFSET) assert len(watcher.appointments) == APPOINTMENTS - 2 From e5514cefcec439d3a84a45402562c0754f86d275 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 6 Jan 2020 12:27:02 +0100 Subject: [PATCH 05/22] Updates old docs/comments regarding zmq --- 
pisa/responder.py | 9 ++++----- pisa/watcher.py | 8 ++++---- test/pisa/unit/test_responder.py | 2 +- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pisa/responder.py b/pisa/responder.py index dbfac15..89b1096 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -126,9 +126,9 @@ class Responder: has missed. Used to trigger rebroadcast if needed. asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It - is populated by the :obj:`ZMQSubscriber `. - zmq_subscriber (:obj:`ZMQSubscriber `): a ``ZMQSubscriber`` instance - used to receive new block notifications from ``bitcoind``. + is populated by the :obj:`ChainMonitor `. + chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track + new blocks received by ``bitcoind``. db_manager (:obj:`DBManager `): A ``DBManager`` instance to interact with the database. @@ -223,8 +223,7 @@ class Responder: The :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, the data is also stored in the database. - ``add_tracker`` awakes the :obj:`Responder` and creates a connection with the - :obj:`ZMQSubscriber ` if he is asleep. + ``add_tracker`` awakes the :obj:`Responder` if it is asleep. Args: uuid (:obj:`str`): a unique identifier for the appointment. diff --git a/pisa/watcher.py b/pisa/watcher.py index 1da5c75..079953b 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -27,7 +27,7 @@ class Watcher: If an appointment reaches its end with no breach, the data is simply deleted. The :class:`Watcher` receives information about new received blocks via the ``block_queue`` that is populated by the - :obj:`ZMQSubscriber `. + :obj:`ChainMonitor `. Args: db_manager (:obj:`DBManager `): a ``DBManager`` instance to interact with the database. 
@@ -45,11 +45,11 @@ class Watcher: appointments with the same ``locator``. asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake. block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is - populated by the :obj:`ZMQSubscriber `. + populated by the :obj:`ChainMonitor `. max_appointments(:obj:`int`): the maximum amount of appointments that the :obj:`Watcher` will keep at any given time. - zmq_subscriber (:obj:`ZMQSubscriber `): a ZMQSubscriber instance used - to receive new block notifications from ``bitcoind``. + chain_monitor (:obj:`ChainMonitor `): a ``ChainMonitor`` instance used to track + new blocks received by ``bitcoind``. db_manager (:obj:`DBManager `): A db manager instance to interact with the database. Raises: diff --git a/test/pisa/unit/test_responder.py b/test/pisa/unit/test_responder.py index b3e339e..d515ff7 100644 --- a/test/pisa/unit/test_responder.py +++ b/test/pisa/unit/test_responder.py @@ -195,7 +195,7 @@ def test_add_bad_response(responder): tracker = create_dummy_tracker() # Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting - # to awake. That will prevent the zmq thread to be launched again. + # to awake. That will prevent the chain_monitor thread to be launched again. responder.asleep = False # A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though. 
From b2272aa4eaeec2c19f0cc2385b9593da60826636 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 6 Jan 2020 13:32:25 +0100 Subject: [PATCH 06/22] Adds ChainMonitor config parameters --- pisa/sample_conf.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pisa/sample_conf.py b/pisa/sample_conf.py index 71e64a0..8d08590 100644 --- a/pisa/sample_conf.py +++ b/pisa/sample_conf.py @@ -5,6 +5,10 @@ BTC_RPC_HOST = "localhost" BTC_RPC_PORT = 18443 BTC_NETWORK = "regtest" +# CHAIN MONITOR +POLLING_DELTA = 60 +BLOCK_WINDOW_SIZE = 10 + # ZMQ FEED_PROTOCOL = "tcp" FEED_ADDR = "127.0.0.1" From 9bb69d1f5a51753b5828040f4aa2e25d282dc525 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 6 Jan 2020 13:32:35 +0100 Subject: [PATCH 07/22] Uses conf parameters instead of hardcoded values for the ChainMonitor and includes docstrings --- pisa/chain_monitor.py | 106 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 92 insertions(+), 14 deletions(-) diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py index c9e9f83..fdbc2b4 100644 --- a/pisa/chain_monitor.py +++ b/pisa/chain_monitor.py @@ -3,13 +3,36 @@ import binascii from threading import Thread, Event, Condition from common.logger import Logger -from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT +from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT, POLLING_DELTA, BLOCK_WINDOW_SIZE from pisa.block_processor import BlockProcessor logger = Logger("ChainMonitor") class ChainMonitor: + """ + The :class:`ChainMonitor` is the class in charge of monitoring the blockchain (via ``bitcoind``) to detect new + blocks on top of the best chain. If a new best block is spotted, the chain monitor will notify the + :obj:`Watcher ` and the :obj:`Responder ` using ``Queues``. + + The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified + once per queue and the notification is triggered by the method that detects the block faster. 
+
+    Attributes:
+        best_tip (:obj:`str`): a block hash representing the current best tip.
+        last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips.
+        terminate (:obj:`bool`): a flag to signal the termination of the :class:`ChainMonitor` (shutdown the tower).
+        check_tip (:obj:`Event`): an event that is triggered at fixed time intervals and controls the polling thread.
+        lock (:obj:`Condition`): a lock used to protect concurrent access to the queues and ``best_tip`` by the zmq and
+            polling threads.
+        zmqSubSocket (:obj:`socket`): a socket to connect to ``bitcoind`` via ``zmq``.
+        watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher `.
+        responder_queue (:obj:`Queue`): a queue to send new best tips to the
+            :obj:`Responder `.
+        watcher_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Watcher`` or not.
+        responder_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Responder`` or not.
+    """
+
     def __init__(self):
         self.best_tip = None
         self.last_tips = []
@@ -29,36 +52,74 @@ class ChainMonitor:
         self.watcher_asleep = True
         self.responder_asleep = True
 
-    def attach_watcher(self, queue, awake):
-        self.watcher_queue = queue
-        self.watcher_asleep = awake
+    def attach_watcher(self, queue, asleep):
+        """
+        Attaches a :obj:`Watcher ` to the :class:`ChainMonitor`. The ``Watcher`` and the
+        ``ChainMonitor`` are connected via the ``watcher_queue`` and the ``watcher_asleep`` flag.
+
+        Args:
+            queue (:obj:`Queue`): the queue to be used to send block hashes to the ``Watcher``.
+            asleep (:obj:`bool`): whether the ``Watcher`` is initially awake or asleep. It is changed on the fly from
+                the ``Watcher`` when the state changes.
+        """
+
+        self.watcher_queue = queue
+        self.watcher_asleep = asleep
+
+    def attach_responder(self, queue, asleep):
+        """
+        Attaches a :obj:`Responder ` to the :class:`ChainMonitor`. 
The ``Responder`` and the + ``ChainMonitor`` are connected via the ``responder_queue`` and the ``responder_asleep`` flag. + + Args: + queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``. + asleep( :obj:`bool`): whether the ``Responder`` is initially awake or asleep. It is changed on the fly from + the ``Responder`` when the state changes. + """ - def attach_responder(self, queue, awake): self.responder_queue = queue - self.responder_asleep = awake + self.responder_asleep = asleep def notify_subscribers(self, block_hash): + """ + Notifies the subscribers (``Watcher`` and ``Responder``) about a new block provided they are awake. It does so + by putting the hash in the corresponding queue(s). + + Args: + block_hash (:obj:`str`): the new block hash to be sent to the awake subscribers. + """ + if not self.watcher_asleep: self.watcher_queue.put(block_hash) if not self.responder_asleep: self.responder_queue.put(block_hash) - def update_state(self, block_hash): + def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE): + """ + Updates the state of the ``ChainMonitor``. The state is represented as the ``best_tip`` and the list of + ``last_tips``. ``last_tips`` is bounded to ``max_block_window_size``. + + Args: + block_hash (:obj:`block_hash`): the new best tip. + max_block_window_size (:obj:`int`): the maximum length of the ``last_tips`` list. + """ + self.best_tip = block_hash self.last_tips.append(block_hash) - if len(self.last_tips) > 10: + if len(self.last_tips) > max_block_window_size: self.last_tips.pop(0) - def monitor_chain(self): - self.best_tip = BlockProcessor.get_best_block_hash() - Thread(target=self.monitor_chain_polling, daemon=True).start() - Thread(target=self.monitor_chain_zmq, daemon=True).start() + def monitor_chain_polling(self, polling_delta=POLLING_DELTA): + """ + Monitors ``bitcoind`` via polling. Once the method is fired, it keeps monitoring as long as ``terminate`` is not + set. 
Polling is performed once every ``polling_delta`` seconds. If a new best tip is found, the shared lock is
+        acquired, the state is updated and the subscribers are notified, and finally the lock is released.
+        """
 
-    def monitor_chain_polling(self):
         while not self.terminate:
-            self.check_tip.wait(timeout=60)
+            self.check_tip.wait(timeout=polling_delta)
 
             # Terminate could have been set wile the thread was blocked in wait
             if not self.terminate:
@@ -72,6 +133,12 @@ class ChainMonitor:
             self.lock.release()
 
     def monitor_chain_zmq(self):
+        """
+        Monitors ``bitcoind`` via zmq. Once the method is fired, it keeps monitoring as long as ``terminate`` is not
+        set. If a new best tip is found, the shared lock is acquired, the state is updated and the subscribers are
+        notified, and finally the lock is released.
+        """
+
         while not self.terminate:
             msg = self.zmqSubSocket.recv_multipart()
 
@@ -89,3 +156,14 @@ class ChainMonitor:
             self.notify_subscribers(block_hash)
             logger.info("New block received via zmq", block_hash=block_hash)
             self.lock.release()
+
+    def monitor_chain(self):
+        """
+        Main :class:`ChainMonitor` method. 
It initializes the ``best_tip`` to the current one (by querying the + :obj:`BlockProcessor `) and creates two threads, one per each monitoring + approach (``zmq`` and ``polling``) + """ + + self.best_tip = BlockProcessor.get_best_block_hash() + Thread(target=self.monitor_chain_polling, daemon=True).start() + Thread(target=self.monitor_chain_zmq, daemon=True).start() From f10c3c46eb6c96e8cc625db2f8d3e4ac36470c2d Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Tue, 7 Jan 2020 16:08:10 +0100 Subject: [PATCH 08/22] Updates tests to use ChainMonitor as global fixture --- test/pisa/unit/conftest.py | 12 ++++++++++++ test/pisa/unit/test_api.py | 5 +---- test/pisa/unit/test_responder.py | 8 -------- test/pisa/unit/test_watcher.py | 8 -------- 4 files changed, 13 insertions(+), 20 deletions(-) diff --git a/test/pisa/unit/conftest.py b/test/pisa/unit/conftest.py index 4ff9028..6616611 100644 --- a/test/pisa/unit/conftest.py +++ b/test/pisa/unit/conftest.py @@ -15,6 +15,7 @@ from pisa.responder import TransactionTracker from pisa.watcher import Watcher from pisa.tools import bitcoin_cli from pisa.db_manager import DBManager +from pisa.chain_monitor import ChainMonitor from common.appointment import Appointment from bitcoind_mock.utils import sha256d @@ -50,6 +51,17 @@ def db_manager(): rmtree("test_db") +@pytest.fixture(scope="module") +def chain_monitor(): + chain_monitor = ChainMonitor() + chain_monitor.monitor_chain() + + yield chain_monitor + + chain_monitor.terminate = True + generate_block() + + def generate_keypair(): client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) client_pk = client_sk.public_key() diff --git a/test/pisa/unit/test_api.py b/test/pisa/unit/test_api.py index b6ca968..1e1d4ef 100644 --- a/test/pisa/unit/test_api.py +++ b/test/pisa/unit/test_api.py @@ -31,7 +31,7 @@ locator_dispute_tx_map = {} @pytest.fixture(scope="module") -def run_api(db_manager): +def run_api(db_manager, chain_monitor): sk, pk = generate_keypair() sk_der 
= sk.private_bytes( encoding=serialization.Encoding.DER, @@ -39,9 +39,6 @@ def run_api(db_manager): encryption_algorithm=serialization.NoEncryption(), ) - chain_monitor = ChainMonitor() - chain_monitor.monitor_chain() - watcher = Watcher(db_manager, chain_monitor, sk_der) chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep) diff --git a/test/pisa/unit/test_responder.py b/test/pisa/unit/test_responder.py index d515ff7..b5f5949 100644 --- a/test/pisa/unit/test_responder.py +++ b/test/pisa/unit/test_responder.py @@ -19,14 +19,6 @@ from bitcoind_mock.transaction import TX from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_value_hex -@pytest.fixture(scope="module") -def chain_monitor(): - chain_monitor = ChainMonitor() - chain_monitor.monitor_chain() - - return chain_monitor - - @pytest.fixture(scope="module") def responder(db_manager, chain_monitor): responder = Responder(db_manager, chain_monitor) diff --git a/test/pisa/unit/test_watcher.py b/test/pisa/unit/test_watcher.py index 2b2c0cb..80dd396 100644 --- a/test/pisa/unit/test_watcher.py +++ b/test/pisa/unit/test_watcher.py @@ -27,14 +27,6 @@ sk_der = signing_key.private_bytes( ) -@pytest.fixture(scope="module") -def chain_monitor(): - chain_monitor = ChainMonitor() - chain_monitor.monitor_chain() - - return chain_monitor - - @pytest.fixture(scope="module") def watcher(db_manager, chain_monitor): watcher = Watcher(db_manager, chain_monitor, sk_der) From 31b4c2e993c8e33a7e281b3d748ea953cf06ca07 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Tue, 7 Jan 2020 16:08:43 +0100 Subject: [PATCH 09/22] Bug fix and moves update condition to update_state The tip update was performed in the wrong order, so tips never made it to last_tips --- pisa/chain_monitor.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pisa/chain_monitor.py 
b/pisa/chain_monitor.py index fdbc2b4..087096b 100644 --- a/pisa/chain_monitor.py +++ b/pisa/chain_monitor.py @@ -103,13 +103,22 @@ class ChainMonitor: Args: block_hash (:obj:`block_hash`): the new best tip. max_block_window_size (:obj:`int`): the maximum length of the ``last_tips`` list.
+
+        Returns:
+            (:obj:`bool`): ``True`` if the state was successfully updated, ``False`` otherwise.
         """
 
-        self.best_tip = block_hash
-        self.last_tips.append(block_hash)
+        if block_hash != self.best_tip and block_hash not in self.last_tips:
+            self.last_tips.append(self.best_tip)
+            self.best_tip = block_hash
 
-        if len(self.last_tips) > max_block_window_size:
-            self.last_tips.pop(0)
+            if len(self.last_tips) > max_block_window_size:
+                self.last_tips.pop(0)
+
+            return True
+
+        else:
+            return False
 
     def monitor_chain_polling(self, polling_delta=POLLING_DELTA):
         """
@@ -126,8 +135,7 @@ class ChainMonitor:
             current_tip = BlockProcessor.get_best_block_hash()
 
             self.lock.acquire()
-            if current_tip != self.best_tip:
-                self.update_state(current_tip)
+            if self.update_state(current_tip):
                 self.notify_subscribers(current_tip)
                 logger.info("New block received via polling", block_hash=current_tip)
             self.lock.release()
@@ -151,8 +159,7 @@ class ChainMonitor:
                 block_hash = binascii.hexlify(body).decode("utf-8")
 
                 self.lock.acquire()
-                if block_hash != self.best_tip and block_hash not in self.last_tips:
-                    self.update_state(block_hash)
+                if self.update_state(block_hash):
                     self.notify_subscribers(block_hash)
                     logger.info("New block received via zmq", block_hash=block_hash)
                 self.lock.release()

From 40e656dcd3571f8aea52a33a2c9aa69f1afe5041 Mon Sep 17 00:00:00 2001
From: Sergi Delgado Segura 
Date: Tue, 7 Jan 2020 16:09:49 +0100
Subject: [PATCH 10/22] ChainMonitor unit tests

---
 test/pisa/unit/test_chain_monitor.py | 194 +++++++++++++++++++++++++++
 1 file changed, 194 insertions(+)
 create mode 100644 test/pisa/unit/test_chain_monitor.py

diff --git a/test/pisa/unit/test_chain_monitor.py 
b/test/pisa/unit/test_chain_monitor.py new file mode 100644 index 0000000..d3037d8 --- /dev/null +++ b/test/pisa/unit/test_chain_monitor.py @@ -0,0 +1,194 @@ +import zmq +import time +from threading import Thread, Event, Condition + +from pisa.watcher import Watcher +from pisa.responder import Responder +from pisa.block_processor import BlockProcessor +from pisa.chain_monitor import ChainMonitor + +from test.pisa.unit.conftest import get_random_value_hex, generate_block + + +def test_init(run_bitcoind): + # run_bitcoind is started here instead of later on to avoid race conditions while it initializes + + # Not much to test here, just sanity checks to make sure nothings goes south in the future + chain_monitor = ChainMonitor() + + assert chain_monitor.best_tip is None + assert isinstance(chain_monitor.last_tips, list) and len(chain_monitor.last_tips) == 0 + assert chain_monitor.terminate is False + assert isinstance(chain_monitor.check_tip, Event) + assert isinstance(chain_monitor.lock, Condition) + assert isinstance(chain_monitor.zmqSubSocket, zmq.Socket) + + # The Queues and asleep flags are initialized when attaching the corresponding subscriber + assert chain_monitor.watcher_queue is None + assert chain_monitor.responder_queue is None + assert chain_monitor.watcher_asleep and chain_monitor.responder_asleep + + +def test_attach_watcher(chain_monitor): + watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None) + chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep) + + # booleans are not passed as reference in Python, so the flags need to be set separately + assert watcher.asleep == chain_monitor.watcher_asleep + watcher.asleep = False + assert chain_monitor.watcher_asleep != watcher.asleep + + # Test that the Queue work + r_hash = get_random_value_hex(32) + chain_monitor.watcher_queue.put(r_hash) + assert watcher.block_queue.get() == r_hash + + +def test_attach_responder(chain_monitor): + responder = Responder(db_manager=None, 
chain_monitor=chain_monitor)
+    chain_monitor.attach_responder(responder.block_queue, responder.asleep)
+
+    # Same kind of testing as with the attach watcher
+    assert responder.asleep == chain_monitor.watcher_asleep
+    responder.asleep = False
+    assert chain_monitor.watcher_asleep != responder.asleep
+
+    r_hash = get_random_value_hex(32)
+    chain_monitor.responder_queue.put(r_hash)
+    assert responder.block_queue.get() == r_hash
+
+
+def test_notify_subscribers(chain_monitor):
+    # Subscribers are only notified as long as they are awake
+    new_block = get_random_value_hex(32)
+
+    # Queues should be empty to start with
+    assert chain_monitor.watcher_queue.empty()
+    assert chain_monitor.responder_queue.empty()
+
+    chain_monitor.watcher_asleep = True
+    chain_monitor.responder_asleep = True
+    chain_monitor.notify_subscribers(new_block)
+
+    # And remain empty afterwards since both subscribers were asleep
+    assert chain_monitor.watcher_queue.empty()
+    assert chain_monitor.responder_queue.empty()
+
+    # Let's flag them as awake and try again
+    chain_monitor.watcher_asleep = False
+    chain_monitor.responder_asleep = False
+    chain_monitor.notify_subscribers(new_block)
+
+    assert chain_monitor.watcher_queue.get() == new_block
+    assert chain_monitor.responder_queue.get() == new_block
+
+
+def test_update_state(chain_monitor):
+    # The state is updated after receiving a new block (and only if the block is not already known). 
+    # Let's start by setting a best_tip and a couple of old tips
+    new_block_hash = get_random_value_hex(32)
+    chain_monitor.best_tip = new_block_hash
+    chain_monitor.last_tips = [get_random_value_hex(32) for _ in range(5)]
+
+    # Now we can try to update the state with an old best_tip and see how it doesn't work
+    assert chain_monitor.update_state(chain_monitor.last_tips[0]) is False
+
+    # Same should happen with the current tip
+    assert chain_monitor.update_state(chain_monitor.best_tip) is False
+
+    # The state should be correctly updated with a new block hash, the chain tip should change and the old tip should
+    # have been added to the last_tips
+    another_block_hash = get_random_value_hex(32)
+    assert chain_monitor.update_state(another_block_hash) is True
+    assert chain_monitor.best_tip == another_block_hash and new_block_hash == chain_monitor.last_tips[-1]
+
+
+def test_monitor_chain_polling():
+    # Try polling with the Watcher
+    chain_monitor = ChainMonitor()
+    chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
+
+    watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None)
+    chain_monitor.attach_watcher(watcher.block_queue, asleep=False)
+
+    # monitor_chain_polling runs until terminate is set
+    polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True)
+    polling_thread.start()
+
+    # Check that nothing changes as long as a block is not generated
+    for _ in range(5):
+        assert chain_monitor.watcher_queue.empty()
+        time.sleep(0.1)
+
+    # And that it does if we generate a block
+    generate_block()
+
+    chain_monitor.watcher_queue.get()
+    assert chain_monitor.watcher_queue.empty()
+
+    chain_monitor.terminate = True
+    polling_thread.join()
+
+
+def test_monitor_chain_zmq():
+    # Try zmq with the Responder
+    chain_monitor = ChainMonitor()
+    chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
+
+    responder = Responder(db_manager=None, chain_monitor=chain_monitor)
+    
chain_monitor.attach_responder(responder.block_queue, asleep=False) + + zmq_thread = Thread(target=chain_monitor.monitor_chain_zmq, daemon=True) + zmq_thread.start() + + # Queues should start empty + assert chain_monitor.responder_queue.empty() + + # And have a new block every time we generate one + for _ in range(3): + generate_block() + chain_monitor.responder_queue.get() + assert chain_monitor.responder_queue.empty() + + # If we flag it to sleep no notification is sent + chain_monitor.responder_asleep = True + + for _ in range(3): + generate_block() + assert chain_monitor.responder_queue.empty() + + chain_monitor.terminate = True + # The zmq thread needs a block generation to release from the recv method. + generate_block() + + zmq_thread.join() + + +def test_monitor_chain(): + # Not much to test here, this should launch two threads (one per monitor approach) and finish on terminate + chain_monitor = ChainMonitor() + + watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None) + responder = Responder(db_manager=None, chain_monitor=chain_monitor) + chain_monitor.attach_responder(responder.block_queue, asleep=False) + chain_monitor.attach_watcher(watcher.block_queue, asleep=False) + + chain_monitor.best_tip = None + chain_monitor.monitor_chain() + + # The tip is updated before starting the threads, so it should have changed. + assert chain_monitor.best_tip is not None + + # Blocks should be received + for _ in range(5): + generate_block() + watcher_block = chain_monitor.watcher_queue.get() + responder_block = chain_monitor.responder_queue.get() + assert watcher_block == responder_block + assert chain_monitor.watcher_queue.empty() + assert chain_monitor.responder_queue.empty() + + # And the thread be terminated on terminate + chain_monitor.terminate = True + # The zmq thread needs a block generation to release from the recv method. 
+ generate_block() From dfc90cd9306895605ba2de2b929752a2470e4fd8 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Tue, 7 Jan 2020 16:31:17 +0100 Subject: [PATCH 11/22] Sets ChainMonitor terminate on shutdown and removes unused imports --- pisa/pisad.py | 2 +- test/pisa/unit/test_api.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pisa/pisad.py b/pisa/pisad.py index b027662..ebf175a 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -8,7 +8,6 @@ from pisa.api import API from pisa.watcher import Watcher from pisa.builder import Builder from pisa.conf import BTC_NETWORK, PISA_SECRET_KEY -from pisa.responder import Responder from pisa.db_manager import DBManager from pisa.chain_monitor import ChainMonitor from pisa.block_processor import BlockProcessor @@ -20,6 +19,7 @@ logger = Logger("Daemon") def handle_signals(signal_received, frame): logger.info("Closing connection with appointments db") db_manager.db.close() + chain_monitor.terminate = True logger.info("Shutting down PISA") exit(0) diff --git a/test/pisa/unit/test_api.py b/test/pisa/unit/test_api.py index 1e1d4ef..fb236a0 100644 --- a/test/pisa/unit/test_api.py +++ b/test/pisa/unit/test_api.py @@ -10,7 +10,6 @@ from pisa.watcher import Watcher from pisa.tools import bitcoin_cli from pisa import HOST, PORT from pisa.conf import MAX_APPOINTMENTS -from pisa.chain_monitor import ChainMonitor from test.pisa.unit.conftest import ( generate_block, From 7aa74d59530f1c4d4087d2a0b976b03c1fdf7c99 Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 23 Dec 2019 21:48:45 -0500 Subject: [PATCH 12/22] Move config options used by watcher and responder to the constructor --- pisa/pisad.py | 17 ++++++++++++++--- pisa/responder.py | 5 +++-- pisa/utils/zmq_subscriber.py | 6 ++++-- pisa/watcher.py | 13 ++++++------- test/pisa/unit/conftest.py | 23 +++++++++++++++++++++++ test/pisa/unit/test_api.py | 5 +++-- test/pisa/unit/test_responder.py | 15 ++++++++------- test/pisa/unit/test_watcher.py | 5 +++-- 8 files 
changed, 64 insertions(+), 25 deletions(-) diff --git a/pisa/pisad.py b/pisa/pisad.py index 9c258ce..c5b8dff 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -7,7 +7,16 @@ from common.logger import Logger from pisa.api import API from pisa.watcher import Watcher from pisa.builder import Builder -from pisa.conf import BTC_NETWORK, PISA_SECRET_KEY +from pisa.conf import ( + BTC_NETWORK, + FEED_PROTOCOL, + FEED_ADDR, + FEED_PORT, + MAX_APPOINTMENTS, + EXPIRY_DELTA, + MIN_TO_SELF_DELAY, + PISA_SECRET_KEY, +) from pisa.responder import Responder from pisa.db_manager import DBManager from pisa.block_processor import BlockProcessor @@ -52,7 +61,9 @@ if __name__ == "__main__": with open(PISA_SECRET_KEY, "rb") as key_file: secret_key_der = key_file.read() - watcher = Watcher(db_manager, secret_key_der) + pisa_config = load_config(conf) + + watcher = Watcher(db_manager, secret_key_der, config=pisa_config) if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: logger.info("Fresh bootstrap") @@ -65,7 +76,7 @@ if __name__ == "__main__": last_block_responder = db_manager.load_last_block_hash_responder() # FIXME: 32-reorgs-offline dropped txs are not used at this point. - responder = Responder(db_manager) + responder = Responder(db_manager, pisa_config) last_common_ancestor_responder = None missed_blocks_responder = None diff --git a/pisa/responder.py b/pisa/responder.py index 0fce1cb..2d856e4 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -135,13 +135,14 @@ class Responder: """ - def __init__(self, db_manager): + def __init__(self, db_manager, config): self.trackers = dict() self.tx_tracker_map = dict() self.unconfirmed_txs = [] self.missed_confirmations = dict() self.asleep = True self.block_queue = Queue() + self.config = config self.zmq_subscriber = None self.db_manager = db_manager @@ -271,7 +272,7 @@ class Responder: from ``bitcoind``. Block ids are received trough the ``block_queue``. 
""" - self.zmq_subscriber = ZMQSubscriber(parent="Responder") + self.zmq_subscriber = ZMQSubscriber(self.config, parent="Responder") self.zmq_subscriber.handle(self.block_queue) def do_watch(self): diff --git a/pisa/utils/zmq_subscriber.py b/pisa/utils/zmq_subscriber.py index ecec9af..5bf29f5 100644 --- a/pisa/utils/zmq_subscriber.py +++ b/pisa/utils/zmq_subscriber.py @@ -8,12 +8,14 @@ from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT class ZMQSubscriber: """ Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py""" - def __init__(self, parent): + def __init__(self, config, parent): self.zmqContext = zmq.Context() self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") - self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) + self.zmqSubSocket.connect( + "%s://%s:%s" % (config.get("FEED_PROTOCOL"), config.get("FEED_ADDR"), config.get("FEED_PORT")) + ) self.logger = Logger("ZMQSubscriber-{}".format(parent)) self.terminate = False diff --git a/pisa/watcher.py b/pisa/watcher.py index 9d659db..356c707 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -10,7 +10,6 @@ from pisa.cleaner import Cleaner from pisa.responder import Responder from pisa.block_processor import BlockProcessor from pisa.utils.zmq_subscriber import ZMQSubscriber -from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS logger = Logger("Watcher") @@ -58,18 +57,18 @@ class Watcher: """ - def __init__(self, db_manager, sk_der, responder=None, max_appointments=MAX_APPOINTMENTS): + def __init__(self, db_manager, sk_der, config, responder=None): self.appointments = dict() self.locator_uuid_map = dict() self.asleep = True self.block_queue = Queue() - self.max_appointments = max_appointments + self.config = config self.zmq_subscriber = None self.db_manager = db_manager self.signing_key = Cryptographer.load_private_key_der(sk_der) if not 
isinstance(responder, Responder): - self.responder = Responder(db_manager) + self.responder = Responder(db_manager, self.config) @staticmethod def compute_locator(tx_id): @@ -115,7 +114,7 @@ class Watcher: """ - if len(self.appointments) < self.max_appointments: + if len(self.appointments) < self.config.get("MAX_APPOINTMENTS"): uuid = uuid4().hex self.appointments[uuid] = appointment @@ -157,7 +156,7 @@ class Watcher: trough the ``block_queue``. """ - self.zmq_subscriber = ZMQSubscriber(parent="Watcher") + self.zmq_subscriber = ZMQSubscriber(self.config, parent="Watcher") self.zmq_subscriber.handle(self.block_queue) def do_watch(self): @@ -182,7 +181,7 @@ class Watcher: expired_appointments = [ uuid for uuid, appointment in self.appointments.items() - if block["height"] > appointment.end_time + EXPIRY_DELTA + if block["height"] > appointment.end_time + self.config.get("EXPIRY_DELTA") ] Cleaner.delete_expired_appointment( diff --git a/test/pisa/unit/conftest.py b/test/pisa/unit/conftest.py index 4ff9028..41a9540 100644 --- a/test/pisa/unit/conftest.py +++ b/test/pisa/unit/conftest.py @@ -147,3 +147,26 @@ def generate_dummy_tracker(): ) return TransactionTracker.from_dict(tracker_data) + + +def get_config(): + config = { + "BTC_RPC_USER": "username", + "BTC_RPC_PASSWD": "password", + "BTC_RPC_HOST": "localhost", + "BTC_RPC_PORT": 8332, + "BTC_NETWORK": "regtest", + "FEED_PROTOCOL": "tcp", + "FEED_ADDR": "127.0.0.1", + "FEED_PORT": 28332, + "MAX_APPOINTMENTS": 100, + "EXPIRY_DELTA": 6, + "MIN_TO_SELF_DELAY": 20, + "SERVER_LOG_FILE": "pisa.log", + "PISA_SECRET_KEY": "pisa_sk.der", + "CLIENT_LOG_FILE": "pisa.log", + "TEST_LOG_FILE": "test.log", + "DB_PATH": "appointments", + } + + return config diff --git a/test/pisa/unit/test_api.py b/test/pisa/unit/test_api.py index 2dc830b..75c5f8f 100644 --- a/test/pisa/unit/test_api.py +++ b/test/pisa/unit/test_api.py @@ -9,7 +9,7 @@ from pisa.api import API from pisa.watcher import Watcher from pisa.tools import bitcoin_cli from 
pisa import HOST, PORT -from pisa.conf import MAX_APPOINTMENTS +from pisa.conf import MAX_APPOINTMENTS, EXPIRY_DELTA from test.pisa.unit.conftest import ( generate_block, @@ -17,6 +17,7 @@ from test.pisa.unit.conftest import ( get_random_value_hex, generate_dummy_appointment_data, generate_keypair, + get_config, ) from common.constants import LOCATOR_LEN_BYTES @@ -37,7 +38,7 @@ def run_api(db_manager): format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) - watcher = Watcher(db_manager, sk_der) + watcher = Watcher(db_manager, sk_der, get_config()) api_thread = Thread(target=API(watcher).start) api_thread.daemon = True diff --git a/test/pisa/unit/test_responder.py b/test/pisa/unit/test_responder.py index a9e99da..ddd8105 100644 --- a/test/pisa/unit/test_responder.py +++ b/test/pisa/unit/test_responder.py @@ -17,12 +17,12 @@ from common.tools import check_sha256_hex_format from bitcoind_mock.utils import sha256d from bitcoind_mock.transaction import TX -from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_value_hex +from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_value_hex, get_config @pytest.fixture(scope="module") def responder(db_manager): - return Responder(db_manager) + return Responder(db_manager, get_config()) @pytest.fixture() @@ -151,11 +151,12 @@ def test_init_responder(responder): assert type(responder.missed_confirmations) is dict and len(responder.missed_confirmations) == 0 assert responder.block_queue.empty() assert responder.asleep is True + assert type(responder.config) is dict assert responder.zmq_subscriber is None def test_handle_breach(db_manager): - responder = Responder(db_manager) + responder = Responder(db_manager, get_config()) uuid = uuid4().hex tracker = create_dummy_tracker() @@ -295,7 +296,7 @@ def test_do_subscribe(responder): def test_do_watch(temp_db_manager): - responder = Responder(temp_db_manager) + responder = 
Responder(temp_db_manager, get_config()) responder.block_queue = Queue() zmq_thread = Thread(target=responder.do_subscribe) @@ -351,7 +352,7 @@ def test_do_watch(temp_db_manager): def test_check_confirmations(temp_db_manager): - responder = Responder(temp_db_manager) + responder = Responder(temp_db_manager, get_config()) responder.block_queue = Queue() zmq_thread = Thread(target=responder.do_subscribe) @@ -414,7 +415,7 @@ def test_get_completed_trackers(db_manager): initial_height = bitcoin_cli().getblockcount() # Let's use a fresh responder for this to make it easier to compare the results - responder = Responder(db_manager) + responder = Responder(db_manager, get_config()) # A complete tracker is a tracker that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS) # We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached @@ -462,7 +463,7 @@ def test_get_completed_trackers(db_manager): def test_rebroadcast(db_manager): - responder = Responder(db_manager) + responder = Responder(db_manager, get_config()) responder.asleep = False txs_to_rebroadcast = [] diff --git a/test/pisa/unit/test_watcher.py b/test/pisa/unit/test_watcher.py index 8ec331c..bb94c60 100644 --- a/test/pisa/unit/test_watcher.py +++ b/test/pisa/unit/test_watcher.py @@ -13,6 +13,7 @@ from test.pisa.unit.conftest import ( generate_dummy_appointment, get_random_value_hex, generate_keypair, + get_config, ) from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS @@ -36,7 +37,7 @@ sk_der = signing_key.private_bytes( @pytest.fixture(scope="module") def watcher(db_manager): - return Watcher(db_manager, sk_der) + return Watcher(db_manager, sk_der, get_config()) @pytest.fixture(scope="module") @@ -72,7 +73,7 @@ def test_init(watcher): assert type(watcher.locator_uuid_map) is dict and len(watcher.locator_uuid_map) == 0 assert watcher.block_queue.empty() assert watcher.asleep is True - assert watcher.max_appointments == 
MAX_APPOINTMENTS + assert type(watcher.config) is dict assert watcher.zmq_subscriber is None assert type(watcher.responder) is Responder From c22bf220f0e7ff7a0b070c0851fae0a654932884 Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 23 Dec 2019 22:40:33 -0500 Subject: [PATCH 13/22] Move config options used by inspector to the constructor --- pisa/api.py | 5 +++-- pisa/inspector.py | 11 ++++++----- pisa/pisad.py | 13 ++----------- test/pisa/unit/test_api.py | 7 ++++--- test/pisa/unit/test_inspector.py | 12 ++++++------ 5 files changed, 21 insertions(+), 27 deletions(-) diff --git a/pisa/api.py b/pisa/api.py index 9468166..ec8dbda 100644 --- a/pisa/api.py +++ b/pisa/api.py @@ -17,8 +17,9 @@ logger = Logger("API") class API: - def __init__(self, watcher): + def __init__(self, watcher, config): self.watcher = watcher + self.config = config def add_appointment(self): """ @@ -41,7 +42,7 @@ class API: # Check content type once if properly defined request_data = json.loads(request.get_json()) - inspector = Inspector() + inspector = Inspector(self.config) appointment = inspector.inspect( request_data.get("appointment"), request_data.get("signature"), request_data.get("public_key") ) diff --git a/pisa/inspector.py b/pisa/inspector.py index 00a7fc2..fcc570e 100644 --- a/pisa/inspector.py +++ b/pisa/inspector.py @@ -5,7 +5,6 @@ from common.constants import LOCATOR_LEN_HEX from common.cryptographer import Cryptographer from pisa import errors -import pisa.conf as conf from common.logger import Logger from common.appointment import Appointment from pisa.block_processor import BlockProcessor @@ -23,6 +22,9 @@ class Inspector: The :class:`Inspector` class is in charge of verifying that the appointment data provided by the user is correct. """ + def __init__(self, config): + self.config = config + def inspect(self, appointment_data, signature, public_key): """ Inspects whether the data provided by the user is correct. 
@@ -221,8 +223,7 @@ class Inspector: return rcode, message - @staticmethod - def check_to_self_delay(to_self_delay): + def check_to_self_delay(self, to_self_delay): """ Checks if the provided ``to_self_delay`` is correct. @@ -255,10 +256,10 @@ class Inspector: rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE message = "wrong to_self_delay data type ({})".format(t) - elif to_self_delay < conf.MIN_TO_SELF_DELAY: + elif to_self_delay < self.config.get("MIN_TO_SELF_DELAY"): rcode = errors.APPOINTMENT_FIELD_TOO_SMALL message = "to_self_delay too small. The to_self_delay should be at least {} (current: {})".format( - conf.MIN_TO_SELF_DELAY, to_self_delay + self.config.get("MIN_TO_SELF_DELAY"), to_self_delay ) if message is not None: diff --git a/pisa/pisad.py b/pisa/pisad.py index c5b8dff..be9d571 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -7,16 +7,7 @@ from common.logger import Logger from pisa.api import API from pisa.watcher import Watcher from pisa.builder import Builder -from pisa.conf import ( - BTC_NETWORK, - FEED_PROTOCOL, - FEED_ADDR, - FEED_PORT, - MAX_APPOINTMENTS, - EXPIRY_DELTA, - MIN_TO_SELF_DELAY, - PISA_SECRET_KEY, -) +import pisa.conf as conf from pisa.responder import Responder from pisa.db_manager import DBManager from pisa.block_processor import BlockProcessor @@ -108,7 +99,7 @@ if __name__ == "__main__": watcher.block_queue = Builder.build_block_queue(missed_blocks_watcher) # Fire the API - API(watcher).start() + API(watcher, config=pisa_config).start() except Exception as e: logger.error("An error occurred: {}. 
Shutting down".format(e)) diff --git a/test/pisa/unit/test_api.py b/test/pisa/unit/test_api.py index 75c5f8f..820b11a 100644 --- a/test/pisa/unit/test_api.py +++ b/test/pisa/unit/test_api.py @@ -9,7 +9,6 @@ from pisa.api import API from pisa.watcher import Watcher from pisa.tools import bitcoin_cli from pisa import HOST, PORT -from pisa.conf import MAX_APPOINTMENTS, EXPIRY_DELTA from test.pisa.unit.conftest import ( generate_block, @@ -29,6 +28,8 @@ MULTIPLE_APPOINTMENTS = 10 appointments = [] locator_dispute_tx_map = {} +config = get_config() + @pytest.fixture(scope="module") def run_api(db_manager): @@ -40,7 +41,7 @@ def run_api(db_manager): ) watcher = Watcher(db_manager, sk_der, get_config()) - api_thread = Thread(target=API(watcher).start) + api_thread = Thread(target=API(watcher, config).start) api_thread.daemon = True api_thread.start() @@ -103,7 +104,7 @@ def test_request_multiple_appointments_same_locator(new_appt_data, n=MULTIPLE_AP def test_add_too_many_appointment(new_appt_data): - for _ in range(MAX_APPOINTMENTS - len(appointments)): + for _ in range(config.get("MAX_APPOINTMENTS") - len(appointments)): r = add_appointment(new_appt_data) assert r.status_code == 200 diff --git a/test/pisa/unit/test_inspector.py b/test/pisa/unit/test_inspector.py index 7b18460..4dbafce 100644 --- a/test/pisa/unit/test_inspector.py +++ b/test/pisa/unit/test_inspector.py @@ -10,13 +10,13 @@ from common.appointment import Appointment from pisa.block_processor import BlockProcessor from pisa.conf import MIN_TO_SELF_DELAY -from test.pisa.unit.conftest import get_random_value_hex, generate_dummy_appointment_data, generate_keypair +from test.pisa.unit.conftest import get_random_value_hex, generate_dummy_appointment_data, generate_keypair, get_config from common.constants import LOCATOR_LEN_BYTES, LOCATOR_LEN_HEX from common.cryptographer import Cryptographer -inspector = Inspector() +inspector = Inspector(get_config()) APPOINTMENT_OK = (0, None) NO_HEX_STRINGS = [ @@ -126,21 
+126,21 @@ def test_check_to_self_delay(): # Right value, right format to_self_delays = [MIN_TO_SELF_DELAY, MIN_TO_SELF_DELAY + 1, MIN_TO_SELF_DELAY + 1000] for to_self_delay in to_self_delays: - assert Inspector.check_to_self_delay(to_self_delay) == APPOINTMENT_OK + assert inspector.check_to_self_delay(to_self_delay) == APPOINTMENT_OK # to_self_delay too small to_self_delays = [MIN_TO_SELF_DELAY - 1, MIN_TO_SELF_DELAY - 2, 0, -1, -1000] for to_self_delay in to_self_delays: - assert Inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_FIELD_TOO_SMALL + assert inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_FIELD_TOO_SMALL # Empty field to_self_delay = None - assert Inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_EMPTY_FIELD + assert inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_EMPTY_FIELD # Wrong data type to_self_delays = WRONG_TYPES for to_self_delay in to_self_delays: - assert Inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_WRONG_FIELD_TYPE + assert inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_WRONG_FIELD_TYPE def test_check_blob(): From 1a26d7d6a3418f3d9d79c5e46beeee45fd6b1a2c Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 13 Jan 2020 15:31:54 +0100 Subject: [PATCH 14/22] Fixes typos based on @orbitalturtle comments --- pisa/chain_monitor.py | 8 ++++---- test/pisa/unit/test_chain_monitor.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py index 087096b..6327692 100644 --- a/pisa/chain_monitor.py +++ b/pisa/chain_monitor.py @@ -20,9 +20,9 @@ class ChainMonitor: Attributes: best_tip (:obj:`str`): a block hash representing the current best tip. - last_tips (:obj:`list`): a list of last chain tips. Used as an sliding window to avoid notifying about old tips. + last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips. 
terminate (:obj:`bool`): a flag to signal the termination of the :class:`ChainMonitor` (shutdown the tower). - check_tip (:obj:`Event`): an event that it's triggered at fixed time intervals and controls the polling thread. + check_tip (:obj:`Event`): an event that's triggered at fixed time intervals and controls the polling thread. lock (:obj:`Condition`): a lock used to protect concurrent access to the queues and ``best_tip`` by the zmq and polling threads. zmqSubSocket (:obj:`socket`): a socket to connect to ``bitcoind`` via ``zmq``. @@ -130,7 +130,7 @@ class ChainMonitor: while not self.terminate: self.check_tip.wait(timeout=polling_delta) - # Terminate could have been set wile the thread was blocked in wait + # Terminate could have been set while the thread was blocked in wait if not self.terminate: current_tip = BlockProcessor.get_best_block_hash() @@ -150,7 +150,7 @@ class ChainMonitor: while not self.terminate: msg = self.zmqSubSocket.recv_multipart() - # Terminate could have been set wile the thread was blocked in recv + # Terminate could have been set while the thread was blocked in recv if not self.terminate: topic = msg[0] body = msg[1] diff --git a/test/pisa/unit/test_chain_monitor.py b/test/pisa/unit/test_chain_monitor.py index d3037d8..4e8cdfb 100644 --- a/test/pisa/unit/test_chain_monitor.py +++ b/test/pisa/unit/test_chain_monitor.py @@ -13,7 +13,7 @@ from test.pisa.unit.conftest import get_random_value_hex, generate_block def test_init(run_bitcoind): # run_bitcoind is started here instead of later on to avoid race conditions while it initializes - # Not much to test here, just sanity checks to make sure nothings goes south in the future + # Not much to test here, just sanity checks to make sure nothing goes south in the future chain_monitor = ChainMonitor() assert chain_monitor.best_tip is None @@ -70,7 +70,7 @@ def test_notify_subscribers(chain_monitor): chain_monitor.responder_asleep = True chain_monitor.notify_subscribers(new_block) - # And 
remain empty afterwards since both subscribers where asleep + # And remain empty afterwards since both subscribers were asleep assert chain_monitor.watcher_queue.empty() assert chain_monitor.responder_queue.empty() @@ -115,7 +115,7 @@ def test_monitor_chain_polling(): polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True) polling_thread.start() - # Check that nothings changes as long as a block is not generated + # Check that nothing changes as long as a block is not generated for _ in range(5): assert chain_monitor.watcher_queue.empty() time.sleep(0.1) From eb71ab8fc429a1c581834621bd249a04daf61135 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 13 Jan 2020 15:42:27 +0100 Subject: [PATCH 15/22] Adds missing args on docs and adds polling_delta parm to monitor_chain --- pisa/chain_monitor.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pisa/chain_monitor.py b/pisa/chain_monitor.py index 6327692..689a223 100644 --- a/pisa/chain_monitor.py +++ b/pisa/chain_monitor.py @@ -125,6 +125,9 @@ class ChainMonitor: Monitors ``bitcoind`` via polling. Once the method is fired, it keeps monitoring as long as ``terminate`` is not set. Polling is performed once every ``polling_delta`` seconds. If a new best tip if found, the shared lock is acquired, the state is updated and the subscribers are notified, and finally the lock is released. + + Args: + polling_delta (:obj:`int`): the time delta between polls. """ while not self.terminate: @@ -164,13 +167,16 @@ class ChainMonitor: logger.info("New block received via zmq", block_hash=block_hash) self.lock.release() - def monitor_chain(self): + def monitor_chain(self, polling_delta=POLLING_DELTA): """ Main :class:`ChainMonitor` method. 
It initializes the ``best_tip`` to the current one (by querying the :obj:`BlockProcessor `) and creates two threads, one per each monitoring - approach (``zmq`` and ``polling``) + approach (``zmq`` and ``polling``). + + Args: + polling_delta (:obj:`int`): the time delta between polls by the ``monitor_chain_polling`` thread. """ self.best_tip = BlockProcessor.get_best_block_hash() - Thread(target=self.monitor_chain_polling, daemon=True).start() + Thread(target=self.monitor_chain_polling, daemon=True, kwargs={"polling_delta": polling_delta}).start() Thread(target=self.monitor_chain_zmq, daemon=True).start() From ae772bf91b05bf1173de88f54248d042bf343242 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 13 Jan 2020 15:48:52 +0100 Subject: [PATCH 16/22] Adds missing tests Tests that given a block hash and the two monitor threads running, the hash is only notified to the subscribers once (by the first thread that notices it) --- test/pisa/unit/test_chain_monitor.py | 33 +++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/test/pisa/unit/test_chain_monitor.py b/test/pisa/unit/test_chain_monitor.py index 4e8cdfb..4c8b38c 100644 --- a/test/pisa/unit/test_chain_monitor.py +++ b/test/pisa/unit/test_chain_monitor.py @@ -97,7 +97,7 @@ def test_update_state(chain_monitor): assert chain_monitor.update_state(chain_monitor.best_tip) is False # The state should be correctly updated with a new block hash, the chain tip should change and the old tip should - # has been added to the last_tips + # have been added to the last_tips another_block_hash = get_random_value_hex(32) assert chain_monitor.update_state(another_block_hash) is True assert chain_monitor.best_tip == another_block_hash and new_block_hash == chain_monitor.last_tips[-1] @@ -192,3 +192,34 @@ def test_monitor_chain(): chain_monitor.terminate = True # The zmq thread needs a block generation to release from the recv method. 
generate_block() + + +def test_monitor_chain_single_update(): + # This test tests that if both threads try to add the same block to the queue, only the first one will make it + chain_monitor = ChainMonitor() + + watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None) + responder = Responder(db_manager=None, chain_monitor=chain_monitor) + chain_monitor.attach_responder(responder.block_queue, asleep=False) + chain_monitor.attach_watcher(watcher.block_queue, asleep=False) + + chain_monitor.best_tip = None + + # We will create a block and wait for the polling thread. Then check the queues to see that the block hash has only + # been added once. + chain_monitor.monitor_chain(polling_delta=2) + generate_block() + + watcher_block = chain_monitor.watcher_queue.get() + responder_block = chain_monitor.responder_queue.get() + assert watcher_block == responder_block + assert chain_monitor.watcher_queue.empty() + assert chain_monitor.responder_queue.empty() + + # The delta for polling is 2 secs, so let's wait and see + time.sleep(2) + assert chain_monitor.watcher_queue.empty() + assert chain_monitor.responder_queue.empty() + + # We can also force an update and see that it won't go through + assert chain_monitor.update_state(watcher_block) is False From e22bd8953457ee446fdbbaf496aa89f61304ac3e Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 6 Jan 2020 01:07:22 -0500 Subject: [PATCH 17/22] Test load_config functions --- test/pisa/unit/test_pisad.py | 52 ++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 test/pisa/unit/test_pisad.py diff --git a/test/pisa/unit/test_pisad.py b/test/pisa/unit/test_pisad.py new file mode 100644 index 0000000..fae1d85 --- /dev/null +++ b/test/pisa/unit/test_pisad.py @@ -0,0 +1,52 @@ +import importlib +import os +import pytest +from pathlib import Path +from shutil import copyfile + +from pisa.pisad import load_config + +test_conf_file_path = os.getcwd() + "/test/pisa/unit/test_conf.py" + + 
+def test_load_config(): + # Copy the sample-conf.py file to use as a test config file. + copyfile(os.getcwd() + "/pisa/sample_conf.py", test_conf_file_path) + + import test.pisa.unit.test_conf as conf + + # If the file has all the correct fields and data, it should return a dict. + conf_dict = load_config(conf) + assert type(conf_dict) == dict + + # Delete the file. + os.remove(test_conf_file_path) + + +def test_bad_load_config(): + # Create a messed up version of the file that should throw an error. + with open(test_conf_file_path, "w") as f: + f.write('# bitcoind\nBTC_RPC_USER = 0000\nBTC_RPC_PASSWD = "password"\nBTC_RPC_HOST = 000') + + import test.pisa.unit.test_conf as conf + + importlib.reload(conf) + + with pytest.raises(Exception): + conf_dict = load_config(conf) + + os.remove(test_conf_file_path) + + +def test_empty_load_config(): + # Create an empty version of the file that should throw an error. + open(test_conf_file_path, "a") + + import test.pisa.unit.test_conf as conf + + importlib.reload(conf) + + with pytest.raises(Exception): + conf_dict = load_config(conf) + + os.remove(test_conf_file_path) From 1188ddda17d11b9261acc89c03bc99a0a7113902 Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 6 Jan 2020 01:08:23 -0500 Subject: [PATCH 18/22] Add load_config function --- pisa/pisad.py | 58 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/pisa/pisad.py b/pisa/pisad.py index be9d571..5e84a72 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -2,7 +2,6 @@ from getopt import getopt from sys import argv, exit from signal import signal, SIGINT, SIGQUIT, SIGTERM -from pisa.conf import DB_PATH from common.logger import Logger from pisa.api import API from pisa.watcher import Watcher @@ -24,6 +23,53 @@ def handle_signals(signal_received, frame): exit(0) +def load_config(config): + """ + Looks through all of the config options to make sure they contain the right type of data and builds a config + 
dictionary. + + Args: + config (:obj:`module`): It takes in a config module object. + + Returns: + :obj:`dict` A dictionary containing the config values. + """ + + conf_dict = {} + + conf_fields = { + "BTC_RPC_USER": {"value": config.BTC_RPC_USER, "type": str}, + "BTC_RPC_PASSWD": {"value": config.BTC_RPC_PASSWD, "type": str}, + "BTC_RPC_HOST": {"value": config.BTC_RPC_HOST, "type": str}, + "BTC_RPC_PORT": {"value": config.BTC_RPC_PORT, "type": int}, + "BTC_NETWORK": {"value": config.BTC_NETWORK, "type": str}, + "FEED_PROTOCOL": {"value": config.FEED_PROTOCOL, "type": str}, + "FEED_ADDR": {"value": config.FEED_ADDR, "type": str}, + "FEED_PORT": {"value": config.FEED_PORT, "type": int}, + "MAX_APPOINTMENTS": {"value": config.MAX_APPOINTMENTS, "type": int}, + "EXPIRY_DELTA": {"value": config.EXPIRY_DELTA, "type": int}, + "MIN_TO_SELF_DELAY": {"value": config.MIN_TO_SELF_DELAY, "type": int}, + "SERVER_LOG_FILE": {"value": config.SERVER_LOG_FILE, "type": str}, + "PISA_SECRET_KEY": {"value": config.PISA_SECRET_KEY, "type": str}, + "CLIENT_LOG_FILE": {"value": config.CLIENT_LOG_FILE, "type": str}, + "TEST_LOG_FILE": {"value": config.TEST_LOG_FILE, "type": str}, + "DB_PATH": {"value": config.DB_PATH, "type": str}, + } + + for field in conf_fields: + value = conf_fields[field]["value"] + correct_type = conf_fields[field]["type"] + + if (value is not None) and isinstance(value, correct_type): + conf_dict[field] = value + else: + err_msg = "{} variable in config is of the wrong type".format(field) + logger.error(err_msg) + raise ValueError(err_msg) + + return conf_dict + + if __name__ == "__main__": logger.info("Starting PISA") @@ -36,24 +82,24 @@ if __name__ == "__main__": # FIXME: Leaving this here for future option/arguments pass + pisa_config = load_config(conf) + if not can_connect_to_bitcoind(): logger.error("Can't connect to bitcoind. 
Shutting down") - elif not in_correct_network(BTC_NETWORK): + elif not in_correct_network(pisa_config.get("BTC_NETWORK")): logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down") else: try: - db_manager = DBManager(DB_PATH) + db_manager = DBManager(pisa_config.get("DB_PATH")) watcher_appointments_data = db_manager.load_watcher_appointments() responder_trackers_data = db_manager.load_responder_trackers() - with open(PISA_SECRET_KEY, "rb") as key_file: + with open(pisa_config.get("PISA_SECRET_KEY"), "rb") as key_file: secret_key_der = key_file.read() - pisa_config = load_config(conf) - watcher = Watcher(db_manager, secret_key_der, config=pisa_config) if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: From 40d7ca1912e06dc9eae12ccace43db538add3f4c Mon Sep 17 00:00:00 2001 From: Turtle Date: Sat, 30 Nov 2019 00:42:36 -0500 Subject: [PATCH 19/22] Refactor add_appointment cli code --- apps/cli/pisa_cli.py | 228 ++++++++++++++++++++------------- common/tools.py | 13 ++ pisa/watcher.py | 18 +-- test/pisa/unit/conftest.py | 3 +- test/pisa/unit/test_watcher.py | 6 +- 5 files changed, 162 insertions(+), 106 deletions(-) diff --git a/apps/cli/pisa_cli.py b/apps/cli/pisa_cli.py index 0f2eb92..440860d 100644 --- a/apps/cli/pisa_cli.py +++ b/apps/cli/pisa_cli.py @@ -9,8 +9,8 @@ from getopt import getopt, GetoptError from requests import ConnectTimeout, ConnectionError from uuid import uuid4 -from apps.cli.blob import Blob from apps.cli.help import help_add_appointment, help_get_appointment +from apps.cli.blob import Blob from apps.cli import ( DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT, @@ -22,9 +22,8 @@ from apps.cli import ( from common.logger import Logger from common.appointment import Appointment -from common.constants import LOCATOR_LEN_HEX from common.cryptographer import Cryptographer -from common.tools import check_sha256_hex_format +from common.tools import check_sha256_hex_format, 
compute_locator HTTP_OK = 200 @@ -46,11 +45,13 @@ def generate_dummy_appointment(): "to_self_delay": 20, } - print("Generating dummy appointment data:" "\n\n" + json.dumps(dummy_appointment_data, indent=4, sort_keys=True)) + logger.info( + "Generating dummy appointment data:" "\n\n" + json.dumps(dummy_appointment_data, indent=4, sort_keys=True) + ) json.dump(dummy_appointment_data, open("dummy_appointment_data.json", "w")) - print("\nData stored in dummy_appointment_data.json") + logger.info("\nData stored in dummy_appointment_data.json") # Loads and returns Pisa keys from disk @@ -61,11 +62,12 @@ def load_key_file_data(file_name): return key except FileNotFoundError: - raise FileNotFoundError("File not found.") + logger.error("Client's key file not found. Please check your settings.") + return False - -def compute_locator(tx_id): - return tx_id[:LOCATOR_LEN_HEX] + except IOError as e: + logger.error("I/O error({}): {}".format(e.errno, e.strerror)) + return False # Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it. @@ -85,12 +87,81 @@ def save_signed_appointment(appointment, signature): def add_appointment(args): - appointment_data = None + # Get appointment data from user. 
+ appointment_data = parse_add_appointment_args(args) + + if appointment_data is None: + logger.error("The provided appointment JSON is empty") + return False + + valid_txid = check_sha256_hex_format(appointment_data.get("tx_id")) + + if not valid_txid: + logger.error("The provided txid is not valid") + return False + + tx_id = appointment_data.get("tx_id") + tx = appointment_data.get("tx") + + if None not in [tx_id, tx]: + appointment_data["locator"] = compute_locator(tx_id) + appointment_data["encrypted_blob"] = Cryptographer.encrypt(Blob(tx), tx_id) + + else: + logger.error("Appointment data is missing some fields.") + return False + + appointment = Appointment.from_dict(appointment_data) + + signature = get_appointment_signature(appointment) + hex_pk_der = get_pk() + + if not (appointment and signature and hex_pk_der): + return False + + data = {"appointment": appointment.to_dict(), "signature": signature, "public_key": hex_pk_der.decode("utf-8")} + + appointment_json = json.dumps(data, sort_keys=True, separators=(",", ":")) + + # Send appointment to the server. + add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port) + response_json = post_data_to_add_appointment_endpoint(add_appointment_endpoint, appointment_json) + + if response_json is None: + return False + + signature = response_json.get("signature") + # Check that the server signed the appointment as it should. 
+ if signature is None: + logger.error("The response does not contain the signature of the appointment.") + return False + + valid = check_signature(signature, appointment) + + if not valid: + logger.error("The returned appointment's signature is invalid") + return False + + logger.info("Appointment accepted and signed by Pisa") + # all good, store appointment and signature + try: + save_signed_appointment(appointment.to_dict(), signature) + + except OSError as e: + logger.error("There was an error while saving the appointment", error=e) + return False + + return True + + +# Parse arguments passed to add_appointment and handle them accordingly. +# Returns appointment data. +def parse_add_appointment_args(args): use_help = "Use 'help add_appointment' for help of how to use the command" if not args: logger.error("No appointment data provided. " + use_help) - return False + return None arg_opt = args.pop(0) @@ -102,7 +173,7 @@ def add_appointment(args): fin = args.pop(0) if not os.path.isfile(fin): logger.error("Can't find file", filename=fin) - return False + return None try: with open(fin) as f: @@ -110,63 +181,19 @@ def add_appointment(args): except IOError as e: logger.error("I/O error", errno=e.errno, error=e.strerror) - return False + return None else: appointment_data = json.loads(arg_opt) except json.JSONDecodeError: logger.error("Non-JSON encoded data provided as appointment. 
" + use_help) - return False + return None - if not appointment_data: - logger.error("The provided JSON is empty") - return False + return appointment_data - valid_locator = check_sha256_hex_format(appointment_data.get("tx_id")) - - if not valid_locator: - logger.error("The provided locator is not valid") - return False - - add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port) - appointment = Appointment.from_dict(appointment_data) - - try: - sk_der = load_key_file_data(CLI_PRIVATE_KEY) - cli_sk = Cryptographer.load_private_key_der(sk_der) - - except ValueError: - logger.error("Failed to deserialize the public key. It might be in an unsupported format") - return False - - except FileNotFoundError: - logger.error("Client's private key file not found. Please check your settings") - return False - - except IOError as e: - logger.error("I/O error", errno=e.errno, error=e.strerror) - return False - - signature = Cryptographer.sign(appointment.serialize(), cli_sk) - - try: - cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY) - hex_pk_der = binascii.hexlify(cli_pk_der) - - except FileNotFoundError: - logger.error("Client's public key file not found. Please check your settings") - return False - - except IOError as e: - logger.error("I/O error", errno=e.errno, error=e.strerror) - return False - - # FIXME: Exceptions for hexlify need to be covered - - data = {"appointment": appointment, "signature": signature, "public_key": hex_pk_der.decode("utf-8")} - - appointment_json = json.dumps(data, sort_keys=True, separators=(",", ":")) +# Sends appointment data to add_appointment endpoint to be processed by the server. 
+def post_data_to_add_appointment_endpoint(add_appointment_endpoint, appointment_json): logger.info("Sending appointment to PISA") try: @@ -176,15 +203,15 @@ def add_appointment(args): except json.JSONDecodeError: logger.error("The response was not valid JSON") - return False + return None except ConnectTimeout: logger.error("Can't connect to pisa API. Connection timeout") - return False + return None except ConnectionError: logger.error("Can't connect to pisa API. Server cannot be reached") - return False + return None if r.status_code != HTTP_OK: if "error" not in response_json: @@ -196,14 +223,17 @@ def add_appointment(args): status_code=r.status_code, description=error, ) - return False + return None if "signature" not in response_json: logger.error("The response does not contain the signature of the appointment") - return False + return None - signature = response_json["signature"] - # verify that the returned signature is valid + return response_json + + +# Verify that the signature returned from the watchtower is valid. +def check_signature(signature, appointment): try: pisa_pk_der = load_key_file_data(PISA_PUBLIC_KEY) pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der) @@ -212,7 +242,7 @@ def add_appointment(args): logger.error("Failed to deserialize the public key. It might be in an unsupported format") return False - is_sig_valid = Cryptographer.verify(appointment.serialize(), signature, pisa_pk) + return Cryptographer.verify(appointment.serialize(), signature, pisa_pk) except FileNotFoundError: logger.error("Pisa's public key file not found. 
Please check your settings") @@ -222,21 +252,6 @@ def add_appointment(args): logger.error("I/O error", errno=e.errno, error=e.strerror) return False - if not is_sig_valid: - logger.error("The returned appointment's signature is invalid") - return False - - logger.info("Appointment accepted and signed by Pisa") - # all good, store appointment and signature - try: - save_signed_appointment(appointment, signature) - - except OSError as e: - logger.error("There was an error while saving the appointment", error=e) - return False - - return True - def get_appointment(args): if not args: @@ -260,8 +275,9 @@ def get_appointment(args): try: r = requests.get(url=get_appointment_endpoint + parameters, timeout=5) + logger.info("Appointment response returned from server: " + str(r)) + return True - print(json.dumps(r.json(), indent=4, sort_keys=True)) except ConnectTimeout: logger.error("Can't connect to pisa API. Connection timeout") return False @@ -270,7 +286,47 @@ def get_appointment(args): logger.error("Can't connect to pisa API. Server cannot be reached") return False - return True + +def get_appointment_signature(appointment): + try: + sk_der = load_key_file_data(CLI_PRIVATE_KEY) + cli_sk = Cryptographer.load_private_key_der(sk_der) + + signature = Cryptographer.sign(appointment.serialize(), cli_sk) + + return signature + + except ValueError: + logger.error("Failed to deserialize the public key. It might be in an unsupported format") + return False + + except FileNotFoundError: + logger.error("Client's private key file not found. Please check your settings") + return False + + except IOError as e: + logger.error("I/O error", errno=e.errno, error=e.strerror) + return False + + +def get_pk(): + try: + cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY) + hex_pk_der = binascii.hexlify(cli_pk_der) + + return hex_pk_der + + except FileNotFoundError: + logger.error("Client's public key file not found. 
Please check your settings") + return False + + except IOError as e: + logger.error("I/O error", errno=e.errno, error=e.strerror) + return False + + except binascii.Error as e: + logger.error("Could not successfully encode public key as hex: ", e) + return False def show_usage(): diff --git a/common/tools.py b/common/tools.py index e527e33..2ac6d1e 100644 --- a/common/tools.py +++ b/common/tools.py @@ -1,4 +1,5 @@ import re +from common.constants import LOCATOR_LEN_HEX def check_sha256_hex_format(value): @@ -12,3 +13,15 @@ def check_sha256_hex_format(value): :mod:`bool`: Whether or not the value matches the format. """ return isinstance(value, str) and re.match(r"^[0-9A-Fa-f]{64}$", value) is not None + + +def compute_locator(tx_id): + """ + Computes an appointment locator given a transaction id. + Args: + tx_id (:obj:`str`): the transaction id used to compute the locator. + Returns: + (:obj:`str`): The computed locator. + """ + + return tx_id[:LOCATOR_LEN_HEX] diff --git a/pisa/watcher.py b/pisa/watcher.py index 9d659db..3d46032 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -3,7 +3,7 @@ from queue import Queue from threading import Thread from common.cryptographer import Cryptographer -from common.constants import LOCATOR_LEN_HEX +from common.tools import compute_locator from common.logger import Logger from pisa.cleaner import Cleaner @@ -71,20 +71,6 @@ class Watcher: if not isinstance(responder, Responder): self.responder = Responder(db_manager) - @staticmethod - def compute_locator(tx_id): - """ - Computes an appointment locator given a transaction id. - - Args: - tx_id (:obj:`str`): the transaction id used to compute the locator. - - Returns: - (:obj:`str`): The computed locator. - """ - - return tx_id[:LOCATOR_LEN_HEX] - def add_appointment(self, appointment): """ Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. @@ -238,7 +224,7 @@ class Watcher: found. 
""" - potential_locators = {Watcher.compute_locator(txid): txid for txid in txids} + potential_locators = {compute_locator(txid): txid for txid in txids} # Check is any of the tx_ids in the received block is an actual match intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) diff --git a/test/pisa/unit/conftest.py b/test/pisa/unit/conftest.py index 4ff9028..cc215c5 100644 --- a/test/pisa/unit/conftest.py +++ b/test/pisa/unit/conftest.py @@ -16,6 +16,7 @@ from pisa.watcher import Watcher from pisa.tools import bitcoin_cli from pisa.db_manager import DBManager from common.appointment import Appointment +from common.tools import compute_locator from bitcoind_mock.utils import sha256d from bitcoind_mock.transaction import TX @@ -103,7 +104,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) - locator = Watcher.compute_locator(dispute_txid) + locator = compute_locator(dispute_txid) blob = Blob(dummy_appointment_data.get("tx")) encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id")) diff --git a/test/pisa/unit/test_watcher.py b/test/pisa/unit/test_watcher.py index 8ec331c..4fa2252 100644 --- a/test/pisa/unit/test_watcher.py +++ b/test/pisa/unit/test_watcher.py @@ -16,7 +16,7 @@ from test.pisa.unit.conftest import ( ) from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS -from common.tools import check_sha256_hex_format +from common.tools import check_sha256_hex_format, compute_locator from common.cryptographer import Cryptographer @@ -46,7 +46,7 @@ def txids(): @pytest.fixture(scope="module") def locator_uuid_map(txids): - return {Watcher.compute_locator(txid): uuid4().hex for txid in txids} + return {compute_locator(txid): uuid4().hex for txid in txids} def create_appointments(n): @@ -219,7 +219,7 @@ def test_filter_valid_breaches(watcher): dummy_appointment, _ = 
generate_dummy_appointment() dummy_appointment.encrypted_blob.data = encrypted_blob - dummy_appointment.locator = Watcher.compute_locator(dispute_txid) + dummy_appointment.locator = compute_locator(dispute_txid) uuid = uuid4().hex appointments = {uuid: dummy_appointment} From 39208e4b144ba70c519347da698afd619889a021 Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 23 Dec 2019 16:19:01 -0500 Subject: [PATCH 20/22] Test new functions split off in pisa_cli --- test/apps/cli/unit/test_pisa_cli.py | 160 ++++++++++++++++++++++++---- 1 file changed, 142 insertions(+), 18 deletions(-) diff --git a/test/apps/cli/unit/test_pisa_cli.py b/test/apps/cli/unit/test_pisa_cli.py index e14fd0b..4f47f98 100644 --- a/test/apps/cli/unit/test_pisa_cli.py +++ b/test/apps/cli/unit/test_pisa_cli.py @@ -1,23 +1,41 @@ import responses import json +import os +import pytest from binascii import hexlify from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec +from common.appointment import Appointment +from common.cryptographer import Cryptographer + import apps.cli.pisa_cli as pisa_cli from test.apps.cli.unit.conftest import get_random_value_hex -# TODO: should find a way of doing without this -from apps.cli.pisa_cli import build_appointment - # dummy keys for the tests pisa_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) pisa_pk = pisa_sk.public_key() other_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) +pisa_sk_der = pisa_sk.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), +) +pisa_pk_der = pisa_pk.public_bytes( + encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo +) + +other_sk_der = other_sk.private_bytes( + 
encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), +) + + # Replace the key in the module with a key we control for the tests pisa_cli.pisa_public_key = pisa_pk # Replace endpoint with dummy one @@ -31,19 +49,49 @@ dummy_appointment_request = { "start_time": 1500, "end_time": 50000, "to_self_delay": 200, + "encrypted_blob": get_random_value_hex(120), } -dummy_appointment = build_appointment(**dummy_appointment_request) -# FIXME: USE CRYPTOGRAPHER +# This is the format appointment turns into once it hits "add_appointment" +dummy_appointment_full = { + "locator": get_random_value_hex(32), + "start_time": 1500, + "end_time": 50000, + "to_self_delay": 200, + "encrypted_blob": get_random_value_hex(120), +} + +dummy_appointment = Appointment.from_dict(dummy_appointment_full) -def sign_appointment(sk, appointment): - data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") - return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8") +def get_dummy_pisa_sk_der(*args): + return pisa_sk_der -def get_dummy_pisa_pk(der_data): - return pisa_pk +def get_dummy_pisa_pk_der(*args): + return pisa_pk_der + + +def get_dummy_hex_pk_der(*args): + return hexlify(get_dummy_pisa_pk_der(None)) + + +def get_dummy_signature(*args): + sk = Cryptographer.load_private_key_der(pisa_sk_der) + return Cryptographer.sign(dummy_appointment.serialize(), sk) + + +def get_bad_signature(*args): + sk = Cryptographer.load_private_key_der(other_sk_der) + return Cryptographer.sign(dummy_appointment.serialize(), sk) + + +def valid_sig(*args): + return True + + +def invalid_sig(*args): + return False @responses.activate @@ -51,10 +99,12 @@ def test_add_appointment(monkeypatch): # Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested # and the return value is True - # make sure the test uses the right dummy key instead of 
loading it from disk - monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk) + # Make sure the test uses the dummy signature + monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_dummy_signature) + monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der) + monkeypatch.setattr(pisa_cli, "check_signature", valid_sig) - response = {"locator": dummy_appointment["locator"], "signature": sign_appointment(pisa_sk, dummy_appointment)} + response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature(None)} request_url = "http://{}/".format(pisa_endpoint) responses.add(responses.POST, request_url, json=response, status=200) @@ -72,12 +122,14 @@ def test_add_appointment_with_invalid_signature(monkeypatch): # Simulate a request to add_appointment for dummy_appointment, but sign with a different key, # make sure that the right endpoint is requested, but the return value is False - # make sure the test uses the right dummy key instead of loading it from disk - monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk) + # Make sure the test uses the bad dummy signature + monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_bad_signature) + monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der) + monkeypatch.setattr(pisa_cli, "check_signature", invalid_sig) response = { - "locator": dummy_appointment["locator"], - "signature": sign_appointment(other_sk, dummy_appointment), # signing with a different key + "locator": dummy_appointment.to_dict()["locator"], + "signature": get_bad_signature(None), # Sign with a bad key } request_url = "http://{}/".format(pisa_endpoint) @@ -86,3 +138,75 @@ def test_add_appointment_with_invalid_signature(monkeypatch): result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) assert not result + + +def test_parse_add_appointment_args(): + # If no args are passed, function should fail. 
+ appt_data = pisa_cli.parse_add_appointment_args(None) + assert not appt_data + + # If file doesn't exist, function should fail. + appt_data = pisa_cli.parse_add_appointment_args(["-f", "nonexistent_file"]) + assert not appt_data + + # If file exists and has data in it, function should work. + with open("appt_test_file", "w") as f: + json.dump(dummy_appointment_request, f) + + appt_data = pisa_cli.parse_add_appointment_args(["-f", "appt_test_file"]) + assert appt_data + + os.remove("appt_test_file") + + # If appointment json is passed in, funcion should work. + appt_data = pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)]) + assert appt_data + + +@responses.activate +def test_post_data_to_add_appointment_endpoint(): + response = { + "locator": dummy_appointment.to_dict()["locator"], + "signature": Cryptographer.sign(dummy_appointment.serialize(), pisa_sk), + } + + request_url = "http://{}/".format(pisa_endpoint) + responses.add(responses.POST, request_url, json=response, status=200) + + response = pisa_cli.post_data_to_add_appointment_endpoint(request_url, json.dumps(dummy_appointment_request)) + + assert len(responses.calls) == 1 + assert responses.calls[0].request.url == request_url + + assert response + + +def test_check_signature(monkeypatch): + # Make sure the test uses the right dummy key instead of loading it from disk + monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der) + + valid = pisa_cli.check_signature(get_dummy_signature(), dummy_appointment) + + assert valid + + valid = pisa_cli.check_signature(get_bad_signature(), dummy_appointment) + + assert not valid + + +def test_get_appointment_signature(monkeypatch): + # Make sure the test uses the right dummy key instead of loading it from disk + monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der) + + signature = pisa_cli.get_appointment_signature(dummy_appointment) + + assert isinstance(signature, str) + + +def test_get_pk(monkeypatch): 
+ # Make sure the test uses the right dummy key instead of loading it from disk + monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der) + + pk = pisa_cli.get_pk() + + assert isinstance(pk, bytes) From 3775b78500f2427fd2e356c8e4a862fc5d395cec Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 26 Dec 2019 21:54:55 -0500 Subject: [PATCH 21/22] Fix filename typo in cli README --- apps/cli/README.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/cli/README.md b/apps/cli/README.md index a7b9393..9002c93 100644 --- a/apps/cli/README.md +++ b/apps/cli/README.md @@ -1,6 +1,6 @@ -# pisa-cli +# pisa_cli -`pisa-cli` is a command line interface to interact with the PISA server, written in Python3. +`pisa_cli` is a command line interface to interact with the PISA server, written in Python3. ## Dependencies Refer to [DEPENDENCIES.md](DEPENDENCIES.md) @@ -11,7 +11,7 @@ Refer to [INSTALL.md](INSTALL.md) ## Usage - python pisa-cli.py [global options] command [command options] [arguments] + python pisa_cli.py [global options] command [command options] [arguments] #### Global options @@ -54,7 +54,7 @@ The API will return a `text/plain` HTTP response code `200/OK` if the appointmen #### Usage - python pisa-cli add_appointment [command options] / + python pisa_cli add_appointment [command options] / if `-f, --file` **is** specified, then the command expects a path to a json file instead of a json encoded string as parameter. @@ -100,7 +100,7 @@ if `-f, --file` **is** specified, then the command expects a path to a json file #### Usage - python pisa-cli get_appointment + python pisa_cli get_appointment @@ -109,18 +109,18 @@ if `-f, --file` **is** specified, then the command expects a path to a json file Shows the list of commands or help about how to run a specific command. #### Usage - python pisa-cli help + python pisa_cli help or - python pisa-cli help command + python pisa_cli help command ## Example 1. 
Generate a new dummy appointment. **Note:** this appointment will never be fulfilled (it will eventually expire) since it does not corresopond to a valid transaction. However it can be used to interact with the PISA API. ``` - python pisa-cli.py generate_dummy_appointment + python pisa_cli.py generate_dummy_appointment ``` That will create a json file that follows the appointment data structure filled with dummy data and store it in `dummy_appointment_data.json`. @@ -128,7 +128,7 @@ or 2. Send the appointment to the PISA API. Which will then start monitoring for matching transactions. ``` - python pisa-cli.py add_appointment -f dummy_appointment_data.json + python pisa_cli.py add_appointment -f dummy_appointment_data.json ``` This returns a appointment locator that can be used to get updates about this appointment from PISA. @@ -136,9 +136,9 @@ or 3. Test that PISA is still watching the appointment by replacing the appointment locator received into the following command: ``` - python pisa-cli.py get_appointment + python pisa_cli.py get_appointment ``` ## PISA API -If you wish to read about the underlying API, and how to write your own tool to interact with it, refer to [PISA-API.md](PISA-API.md) \ No newline at end of file +If you wish to read about the underlying API, and how to write your own tool to interact with it, refer to [PISA-API.md](PISA-API.md) From bd41c8b862e76a3020d7583ea7d8b854cc2d3d90 Mon Sep 17 00:00:00 2001 From: Turtle Date: Fri, 27 Dec 2019 01:29:34 -0500 Subject: [PATCH 22/22] Add unit tests for remaining cli functions --- test/apps/cli/unit/test_pisa_cli.py | 80 ++++++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/test/apps/cli/unit/test_pisa_cli.py b/test/apps/cli/unit/test_pisa_cli.py index 4f47f98..74c6a95 100644 --- a/test/apps/cli/unit/test_pisa_cli.py +++ b/test/apps/cli/unit/test_pisa_cli.py @@ -1,11 +1,11 @@ import responses import json import os -import pytest +import shutil from binascii import 
hexlify from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec from common.appointment import Appointment @@ -49,7 +49,6 @@ dummy_appointment_request = { "start_time": 1500, "end_time": 50000, "to_self_delay": 200, - "encrypted_blob": get_random_value_hex(120), } # This is the format appointment turns into once it hits "add_appointment" @@ -73,7 +72,7 @@ def get_dummy_pisa_pk_der(*args): def get_dummy_hex_pk_der(*args): - return hexlify(get_dummy_pisa_pk_der(None)) + return hexlify(get_dummy_pisa_pk_der()) def get_dummy_signature(*args): @@ -104,7 +103,7 @@ def test_add_appointment(monkeypatch): monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der) monkeypatch.setattr(pisa_cli, "check_signature", valid_sig) - response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature(None)} + response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature()} request_url = "http://{}/".format(pisa_endpoint) responses.add(responses.POST, request_url, json=response, status=200) @@ -129,7 +128,7 @@ def test_add_appointment_with_invalid_signature(monkeypatch): response = { "locator": dummy_appointment.to_dict()["locator"], - "signature": get_bad_signature(None), # Sign with a bad key + "signature": get_bad_signature(), # Sign with a bad key } request_url = "http://{}/".format(pisa_endpoint) @@ -137,7 +136,44 @@ def test_add_appointment_with_invalid_signature(monkeypatch): result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) - assert not result + assert result is False + + +def test_load_key_file_data(): + # If file exists and has data in it, function should work. 
+ with open("key_test_file", "w+b") as f: + f.write(pisa_sk_der) + + appt_data = pisa_cli.load_key_file_data("key_test_file") + assert appt_data + + os.remove("key_test_file") + + # If file doesn't exist, function should fail. + appt_data = pisa_cli.load_key_file_data("nonexistent_file") + assert not appt_data + + +def test_save_signed_appointment(monkeypatch): + monkeypatch.setattr(pisa_cli, "APPOINTMENTS_FOLDER_NAME", "test_appointments") + + pisa_cli.save_signed_appointment(dummy_appointment.to_dict(), get_dummy_signature()) + + # In folder "Appointments," grab all files and print them. + files = os.listdir("test_appointments") + + found = False + for f in files: + if dummy_appointment.to_dict().get("locator") in f: + found = True + + assert found + + # If "appointments" directory doesn't exist, function should create it. + assert os.path.exists("test_appointments") + + # Delete test directory once we're done. + shutil.rmtree("test_appointments") def test_parse_add_appointment_args(): @@ -158,7 +194,7 @@ def test_parse_add_appointment_args(): os.remove("appt_test_file") - # If appointment json is passed in, funcion should work. + # If appointment json is passed in, function should work. appt_data = pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)]) assert appt_data @@ -194,6 +230,34 @@ def test_check_signature(monkeypatch): assert not valid +@responses.activate +def test_get_appointment(): + # Response of get_appointment endpoint is an appointment with status added to it. 
+ dummy_appointment_full["status"] = "being_watched" + response = dummy_appointment_full + + request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator={}".format(response.get("locator")) + responses.add(responses.GET, request_url, json=response, status=200) + + result = pisa_cli.get_appointment([response.get("locator")]) + + assert len(responses.calls) == 1 + assert responses.calls[0].request.url == request_url + + assert result + + +@responses.activate +def test_get_appointment_err(): + locator = get_random_value_hex(32) + + # Test that get_appointment handles a connection error appropriately. + request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator=".format(locator) + responses.add(responses.GET, request_url, body=ConnectionError()) + + assert not pisa_cli.get_appointment([locator]) + + def test_get_appointment_signature(monkeypatch): # Make sure the test uses the right dummy key instead of loading it from disk monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der)