Merge branch 'master' into 64-data-to-disk

This commit is contained in:
Sergi Delgado Segura
2020-01-15 12:44:29 +01:00
19 changed files with 1075 additions and 329 deletions

View File

@@ -1,6 +1,6 @@
# pisa-cli # pisa_cli
`pisa-cli` is a command line interface to interact with the PISA server, written in Python3. `pisa_cli` is a command line interface to interact with the PISA server, written in Python3.
## Dependencies ## Dependencies
Refer to [DEPENDENCIES.md](DEPENDENCIES.md) Refer to [DEPENDENCIES.md](DEPENDENCIES.md)
@@ -11,7 +11,7 @@ Refer to [INSTALL.md](INSTALL.md)
## Usage ## Usage
python pisa-cli.py [global options] command [command options] [arguments] python pisa_cli.py [global options] command [command options] [arguments]
#### Global options #### Global options
@@ -54,7 +54,7 @@ The API will return a `text/plain` HTTP response code `200/OK` if the appointmen
#### Usage #### Usage
python pisa-cli add_appointment [command options] <appointment>/<path_to_appointment_file> python pisa_cli add_appointment [command options] <appointment>/<path_to_appointment_file>
if `-f, --file` **is** specified, then the command expects a path to a json file instead of a json encoded if `-f, --file` **is** specified, then the command expects a path to a json file instead of a json encoded
string as parameter. string as parameter.
@@ -100,7 +100,7 @@ if `-f, --file` **is** specified, then the command expects a path to a json file
#### Usage #### Usage
python pisa-cli get_appointment <appointment_locator> python pisa_cli get_appointment <appointment_locator>
@@ -109,18 +109,18 @@ if `-f, --file` **is** specified, then the command expects a path to a json file
Shows the list of commands or help about how to run a specific command. Shows the list of commands or help about how to run a specific command.
#### Usage #### Usage
python pisa-cli help python pisa_cli help
or or
python pisa-cli help command python pisa_cli help command
## Example ## Example
1. Generate a new dummy appointment. **Note:** this appointment will never be fulfilled (it will eventually expire) since it does not correspond to a valid transaction. However it can be used to interact with the PISA API. 1. Generate a new dummy appointment. **Note:** this appointment will never be fulfilled (it will eventually expire) since it does not correspond to a valid transaction. However it can be used to interact with the PISA API.
``` ```
python pisa-cli.py generate_dummy_appointment python pisa_cli.py generate_dummy_appointment
``` ```
That will create a json file that follows the appointment data structure filled with dummy data and store it in `dummy_appointment_data.json`. That will create a json file that follows the appointment data structure filled with dummy data and store it in `dummy_appointment_data.json`.
@@ -128,7 +128,7 @@ or
2. Send the appointment to the PISA API. Which will then start monitoring for matching transactions. 2. Send the appointment to the PISA API. Which will then start monitoring for matching transactions.
``` ```
python pisa-cli.py add_appointment -f dummy_appointment_data.json python pisa_cli.py add_appointment -f dummy_appointment_data.json
``` ```
This returns an appointment locator that can be used to get updates about this appointment from PISA. This returns an appointment locator that can be used to get updates about this appointment from PISA.
@@ -136,9 +136,9 @@ or
3. Test that PISA is still watching the appointment by replacing the appointment locator received into the following command: 3. Test that PISA is still watching the appointment by replacing the appointment locator received into the following command:
``` ```
python pisa-cli.py get_appointment <appointment_locator> python pisa_cli.py get_appointment <appointment_locator>
``` ```
## PISA API ## PISA API
If you wish to read about the underlying API, and how to write your own tool to interact with it, refer to [PISA-API.md](PISA-API.md) If you wish to read about the underlying API, and how to write your own tool to interact with it, refer to [PISA-API.md](PISA-API.md)

View File

@@ -9,8 +9,8 @@ from getopt import getopt, GetoptError
from requests import ConnectTimeout, ConnectionError from requests import ConnectTimeout, ConnectionError
from uuid import uuid4 from uuid import uuid4
from apps.cli.blob import Blob
from apps.cli.help import help_add_appointment, help_get_appointment from apps.cli.help import help_add_appointment, help_get_appointment
from apps.cli.blob import Blob
from apps.cli import ( from apps.cli import (
DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_SERVER,
DEFAULT_PISA_API_PORT, DEFAULT_PISA_API_PORT,
@@ -22,9 +22,8 @@ from apps.cli import (
from common.logger import Logger from common.logger import Logger
from common.appointment import Appointment from common.appointment import Appointment
from common.constants import LOCATOR_LEN_HEX
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
from common.tools import check_sha256_hex_format from common.tools import check_sha256_hex_format, compute_locator
HTTP_OK = 200 HTTP_OK = 200
@@ -46,11 +45,13 @@ def generate_dummy_appointment():
"to_self_delay": 20, "to_self_delay": 20,
} }
print("Generating dummy appointment data:" "\n\n" + json.dumps(dummy_appointment_data, indent=4, sort_keys=True)) logger.info(
"Generating dummy appointment data:" "\n\n" + json.dumps(dummy_appointment_data, indent=4, sort_keys=True)
)
json.dump(dummy_appointment_data, open("dummy_appointment_data.json", "w")) json.dump(dummy_appointment_data, open("dummy_appointment_data.json", "w"))
print("\nData stored in dummy_appointment_data.json") logger.info("\nData stored in dummy_appointment_data.json")
# Loads and returns Pisa keys from disk # Loads and returns Pisa keys from disk
@@ -61,11 +62,12 @@ def load_key_file_data(file_name):
return key return key
except FileNotFoundError: except FileNotFoundError:
raise FileNotFoundError("File not found.") logger.error("Client's key file not found. Please check your settings.")
return False
except IOError as e:
def compute_locator(tx_id): logger.error("I/O error({}): {}".format(e.errno, e.strerror))
return tx_id[:LOCATOR_LEN_HEX] return False
# Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it. # Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it.
@@ -85,12 +87,81 @@ def save_signed_appointment(appointment, signature):
def add_appointment(args): def add_appointment(args):
appointment_data = None # Get appointment data from user.
appointment_data = parse_add_appointment_args(args)
if appointment_data is None:
logger.error("The provided appointment JSON is empty")
return False
valid_txid = check_sha256_hex_format(appointment_data.get("tx_id"))
if not valid_txid:
logger.error("The provided txid is not valid")
return False
tx_id = appointment_data.get("tx_id")
tx = appointment_data.get("tx")
if None not in [tx_id, tx]:
appointment_data["locator"] = compute_locator(tx_id)
appointment_data["encrypted_blob"] = Cryptographer.encrypt(Blob(tx), tx_id)
else:
logger.error("Appointment data is missing some fields.")
return False
appointment = Appointment.from_dict(appointment_data)
signature = get_appointment_signature(appointment)
hex_pk_der = get_pk()
if not (appointment and signature and hex_pk_der):
return False
data = {"appointment": appointment.to_dict(), "signature": signature, "public_key": hex_pk_der.decode("utf-8")}
appointment_json = json.dumps(data, sort_keys=True, separators=(",", ":"))
# Send appointment to the server.
add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port)
response_json = post_data_to_add_appointment_endpoint(add_appointment_endpoint, appointment_json)
if response_json is None:
return False
signature = response_json.get("signature")
# Check that the server signed the appointment as it should.
if signature is None:
logger.error("The response does not contain the signature of the appointment.")
return False
valid = check_signature(signature, appointment)
if not valid:
logger.error("The returned appointment's signature is invalid")
return False
logger.info("Appointment accepted and signed by Pisa")
# all good, store appointment and signature
try:
save_signed_appointment(appointment.to_dict(), signature)
except OSError as e:
logger.error("There was an error while saving the appointment", error=e)
return False
return True
# Parse arguments passed to add_appointment and handle them accordingly.
# Returns appointment data.
def parse_add_appointment_args(args):
use_help = "Use 'help add_appointment' for help of how to use the command" use_help = "Use 'help add_appointment' for help of how to use the command"
if not args: if not args:
logger.error("No appointment data provided. " + use_help) logger.error("No appointment data provided. " + use_help)
return False return None
arg_opt = args.pop(0) arg_opt = args.pop(0)
@@ -102,7 +173,7 @@ def add_appointment(args):
fin = args.pop(0) fin = args.pop(0)
if not os.path.isfile(fin): if not os.path.isfile(fin):
logger.error("Can't find file", filename=fin) logger.error("Can't find file", filename=fin)
return False return None
try: try:
with open(fin) as f: with open(fin) as f:
@@ -110,63 +181,19 @@ def add_appointment(args):
except IOError as e: except IOError as e:
logger.error("I/O error", errno=e.errno, error=e.strerror) logger.error("I/O error", errno=e.errno, error=e.strerror)
return False return None
else: else:
appointment_data = json.loads(arg_opt) appointment_data = json.loads(arg_opt)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error("Non-JSON encoded data provided as appointment. " + use_help) logger.error("Non-JSON encoded data provided as appointment. " + use_help)
return False return None
if not appointment_data: return appointment_data
logger.error("The provided JSON is empty")
return False
valid_locator = check_sha256_hex_format(appointment_data.get("tx_id"))
if not valid_locator:
logger.error("The provided locator is not valid")
return False
add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port)
appointment = Appointment.from_dict(appointment_data)
try:
sk_der = load_key_file_data(CLI_PRIVATE_KEY)
cli_sk = Cryptographer.load_private_key_der(sk_der)
except ValueError:
logger.error("Failed to deserialize the public key. It might be in an unsupported format")
return False
except FileNotFoundError:
logger.error("Client's private key file not found. Please check your settings")
return False
except IOError as e:
logger.error("I/O error", errno=e.errno, error=e.strerror)
return False
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
try:
cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY)
hex_pk_der = binascii.hexlify(cli_pk_der)
except FileNotFoundError:
logger.error("Client's public key file not found. Please check your settings")
return False
except IOError as e:
logger.error("I/O error", errno=e.errno, error=e.strerror)
return False
# FIXME: Exceptions for hexlify need to be covered
data = {"appointment": appointment, "signature": signature, "public_key": hex_pk_der.decode("utf-8")}
appointment_json = json.dumps(data, sort_keys=True, separators=(",", ":"))
# Sends appointment data to add_appointment endpoint to be processed by the server.
def post_data_to_add_appointment_endpoint(add_appointment_endpoint, appointment_json):
logger.info("Sending appointment to PISA") logger.info("Sending appointment to PISA")
try: try:
@@ -176,15 +203,15 @@ def add_appointment(args):
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error("The response was not valid JSON") logger.error("The response was not valid JSON")
return False return None
except ConnectTimeout: except ConnectTimeout:
logger.error("Can't connect to pisa API. Connection timeout") logger.error("Can't connect to pisa API. Connection timeout")
return False return None
except ConnectionError: except ConnectionError:
logger.error("Can't connect to pisa API. Server cannot be reached") logger.error("Can't connect to pisa API. Server cannot be reached")
return False return None
if r.status_code != HTTP_OK: if r.status_code != HTTP_OK:
if "error" not in response_json: if "error" not in response_json:
@@ -196,14 +223,17 @@ def add_appointment(args):
status_code=r.status_code, status_code=r.status_code,
description=error, description=error,
) )
return False return None
if "signature" not in response_json: if "signature" not in response_json:
logger.error("The response does not contain the signature of the appointment") logger.error("The response does not contain the signature of the appointment")
return False return None
signature = response_json["signature"] return response_json
# verify that the returned signature is valid
# Verify that the signature returned from the watchtower is valid.
def check_signature(signature, appointment):
try: try:
pisa_pk_der = load_key_file_data(PISA_PUBLIC_KEY) pisa_pk_der = load_key_file_data(PISA_PUBLIC_KEY)
pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der) pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der)
@@ -212,7 +242,7 @@ def add_appointment(args):
logger.error("Failed to deserialize the public key. It might be in an unsupported format") logger.error("Failed to deserialize the public key. It might be in an unsupported format")
return False return False
is_sig_valid = Cryptographer.verify(appointment.serialize(), signature, pisa_pk) return Cryptographer.verify(appointment.serialize(), signature, pisa_pk)
except FileNotFoundError: except FileNotFoundError:
logger.error("Pisa's public key file not found. Please check your settings") logger.error("Pisa's public key file not found. Please check your settings")
@@ -222,21 +252,6 @@ def add_appointment(args):
logger.error("I/O error", errno=e.errno, error=e.strerror) logger.error("I/O error", errno=e.errno, error=e.strerror)
return False return False
if not is_sig_valid:
logger.error("The returned appointment's signature is invalid")
return False
logger.info("Appointment accepted and signed by Pisa")
# all good, store appointment and signature
try:
save_signed_appointment(appointment, signature)
except OSError as e:
logger.error("There was an error while saving the appointment", error=e)
return False
return True
def get_appointment(args): def get_appointment(args):
if not args: if not args:
@@ -260,8 +275,9 @@ def get_appointment(args):
try: try:
r = requests.get(url=get_appointment_endpoint + parameters, timeout=5) r = requests.get(url=get_appointment_endpoint + parameters, timeout=5)
logger.info("Appointment response returned from server: " + str(r))
return True
print(json.dumps(r.json(), indent=4, sort_keys=True))
except ConnectTimeout: except ConnectTimeout:
logger.error("Can't connect to pisa API. Connection timeout") logger.error("Can't connect to pisa API. Connection timeout")
return False return False
@@ -270,7 +286,47 @@ def get_appointment(args):
logger.error("Can't connect to pisa API. Server cannot be reached") logger.error("Can't connect to pisa API. Server cannot be reached")
return False return False
return True
def get_appointment_signature(appointment):
    """
    Signs an appointment with the client's private key.

    Args:
        appointment (:obj:`Appointment <common.appointment.Appointment>`): the appointment to be signed.

    Returns:
        :obj:`str` or :obj:`bool`: The signature of the serialized appointment, or ``False`` if the key could not
        be loaded or deserialized.
    """

    try:
        sk_der = load_key_file_data(CLI_PRIVATE_KEY)

        # load_key_file_data logs and returns False on missing files / I/O errors instead of raising,
        # so bail out here rather than passing a bogus value to the deserializer.
        if not sk_der:
            return False

        cli_sk = Cryptographer.load_private_key_der(sk_der)
        signature = Cryptographer.sign(appointment.serialize(), cli_sk)
        return signature

    except ValueError:
        logger.error("Failed to deserialize the private key. It might be in an unsupported format")
        return False

    except FileNotFoundError:
        logger.error("Client's private key file not found. Please check your settings")
        return False

    except IOError as e:
        logger.error("I/O error", errno=e.errno, error=e.strerror)
        return False
def get_pk():
    """
    Loads the client's public key from disk and hex-encodes it.

    Returns:
        :obj:`bytes` or :obj:`bool`: The hex-encoded DER public key, or ``False`` if the key could not be
        loaded or encoded.
    """

    try:
        cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY)

        # load_key_file_data logs and returns False on missing files / I/O errors instead of raising;
        # hexlify(False) would raise an uncaught TypeError, so guard here.
        if not cli_pk_der:
            return False

        hex_pk_der = binascii.hexlify(cli_pk_der)
        return hex_pk_der

    except FileNotFoundError:
        logger.error("Client's public key file not found. Please check your settings")
        return False

    except IOError as e:
        logger.error("I/O error", errno=e.errno, error=e.strerror)
        return False

    except binascii.Error as e:
        # Keep the structured-logging style used everywhere else (kwargs, not positional args).
        logger.error("Could not successfully encode public key as hex", error=e)
        return False
def show_usage(): def show_usage():

View File

@@ -1,4 +1,5 @@
import re import re
from common.constants import LOCATOR_LEN_HEX
def check_sha256_hex_format(value): def check_sha256_hex_format(value):
@@ -12,3 +13,15 @@ def check_sha256_hex_format(value):
:mod:`bool`: Whether or not the value matches the format. :mod:`bool`: Whether or not the value matches the format.
""" """
return isinstance(value, str) and re.match(r"^[0-9A-Fa-f]{64}$", value) is not None return isinstance(value, str) and re.match(r"^[0-9A-Fa-f]{64}$", value) is not None
def compute_locator(tx_id):
    """
    Derives an appointment locator from a transaction id.

    The locator is the ``LOCATOR_LEN_HEX``-character hex prefix of the transaction id.

    Args:
        tx_id (:obj:`str`): the transaction id used to compute the locator.

    Returns:
        (:obj:`str`): The computed locator.
    """

    locator = tx_id[:LOCATOR_LEN_HEX]
    return locator

View File

@@ -17,21 +17,22 @@ logger = Logger("API")
class API: class API:
def __init__(self, watcher): def __init__(self, watcher, config):
self.watcher = watcher self.watcher = watcher
self.config = config
def add_appointment(self): def add_appointment(self):
""" """
Main endpoint of the Watchtower. Main endpoint of the Watchtower.
The client sends requests (appointments) to this endpoint to request a job to the Watchtower. Requests must be json The client sends requests (appointments) to this endpoint to request a job to the Watchtower. Requests must be
encoded and contain an ``appointment`` field and optionally a ``signature`` and ``public_key`` fields. json encoded and contain an ``appointment`` field and optionally a ``signature`` and ``public_key`` fields.
Returns: Returns:
:obj:`tuple`: A tuple containing the response (``json``) and response code (``int``). For accepted appointments, :obj:`tuple`: A tuple containing the response (``json``) and response code (``int``). For accepted
the ``rcode`` is always 0 and the response contains the signed receipt. For rejected appointments, the ``rcode`` appointments, the ``rcode`` is always 0 and the response contains the signed receipt. For rejected
is a negative value and the response contains the error message. Error messages can be found at appointments, the ``rcode`` is a negative value and the response contains the error message. Error messages
:mod:`Errors <pisa.errors>`. can be found at :mod:`Errors <pisa.errors>`.
""" """
remote_addr = request.environ.get("REMOTE_ADDR") remote_addr = request.environ.get("REMOTE_ADDR")
@@ -41,7 +42,7 @@ class API:
# Check content type once if properly defined # Check content type once if properly defined
request_data = json.loads(request.get_json()) request_data = json.loads(request.get_json())
inspector = Inspector() inspector = Inspector(self.config)
appointment = inspector.inspect( appointment = inspector.inspect(
request_data.get("appointment"), request_data.get("signature"), request_data.get("public_key") request_data.get("appointment"), request_data.get("signature"), request_data.get("public_key")
) )
@@ -165,8 +166,8 @@ class API:
""" """
Provides the block height of the Watchtower. Provides the block height of the Watchtower.
This is a testing endpoint that (most likely) will be removed in production. Its purpose is to give information to This is a testing endpoint that (most likely) will be removed in production. Its purpose is to give information
testers about the current block so they can define a dummy appointment without having to run a bitcoin node. to testers about the current block so they can define a dummy appointment without having to run a bitcoin node.
Returns: Returns:
:obj:`dict`: A json encoded dictionary containing the block height. :obj:`dict`: A json encoded dictionary containing the block height.

182
pisa/chain_monitor.py Normal file
View File

@@ -0,0 +1,182 @@
import zmq
import binascii
from threading import Thread, Event, Condition
from common.logger import Logger
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT, POLLING_DELTA, BLOCK_WINDOW_SIZE
from pisa.block_processor import BlockProcessor
logger = Logger("ChainMonitor")
class ChainMonitor:
    """
    The :class:`ChainMonitor` is the class in charge of monitoring the blockchain (via ``bitcoind``) to detect new
    blocks on top of the best chain. If a new best block is spotted, the chain monitor will notify the
    :obj:`Watcher <pisa.watcher.Watcher>` and the :obj:`Responder <pisa.responder.Responder>` using ``Queues``.

    The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified
    once per queue and the notification is triggered by the method that detects the block faster.

    Attributes:
        best_tip (:obj:`str`): a block hash representing the current best tip.
        last_tips (:obj:`list`): a list of last chain tips. Used as a sliding window to avoid notifying about old tips.
        terminate (:obj:`bool`): a flag to signal the termination of the :class:`ChainMonitor` (shutdown the tower).
        check_tip (:obj:`Event`): an event that's triggered at fixed time intervals and controls the polling thread.
        lock (:obj:`Condition`): a lock used to protect concurrent access to the queues and ``best_tip`` by the zmq and
            polling threads.
        zmqSubSocket (:obj:`socket`): a socket to connect to ``bitcoind`` via ``zmq``.
        watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher <pisa.watcher.Watcher>`.
        responder_queue (:obj:`Queue`): a queue to send new best tips to the
            :obj:`Responder <pisa.responder.Responder>`.
        watcher_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Watcher`` or not.
        responder_asleep (:obj:`bool`): a flag that signals whether to send information to the ``Responder`` or not.
    """

    def __init__(self):
        self.best_tip = None
        self.last_tips = []
        self.terminate = False

        self.check_tip = Event()
        self.lock = Condition()

        # Subscribe to bitcoind's "hashblock" zmq notifications. RCVHWM=0 removes the receive
        # high-water mark so block notifications are never dropped by the socket.
        self.zmqContext = zmq.Context()
        self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
        self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
        self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
        self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))

        # Queues and asleep flags are set later via attach_watcher / attach_responder.
        self.watcher_queue = None
        self.responder_queue = None
        self.watcher_asleep = True
        self.responder_asleep = True

    def attach_watcher(self, queue, asleep):
        """
        Attaches a :obj:`Watcher <pisa.watcher.Watcher>` to the :class:`ChainMonitor`. The ``Watcher`` and the
        ``ChainMonitor`` are connected via the ``watcher_queue`` and the ``watcher_asleep`` flag.

        Args:
            queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Watcher``.
            asleep( :obj:`bool`): whether the ``Watcher`` is initially awake or asleep. It is changed on the fly from
                the ``Watcher`` when the state changes.
        """

        self.watcher_queue = queue
        self.watcher_asleep = asleep

    def attach_responder(self, queue, asleep):
        """
        Attaches a :obj:`Responder <pisa.responder.Responder>` to the :class:`ChainMonitor`. The ``Responder`` and the
        ``ChainMonitor`` are connected via the ``responder_queue`` and the ``responder_asleep`` flag.

        Args:
            queue (:obj:`Queue`): the queue to be used to send blocks hashes to the ``Responder``.
            asleep( :obj:`bool`): whether the ``Responder`` is initially awake or asleep. It is changed on the fly from
                the ``Responder`` when the state changes.
        """

        self.responder_queue = queue
        self.responder_asleep = asleep

    def notify_subscribers(self, block_hash):
        """
        Notifies the subscribers (``Watcher`` and ``Responder``) about a new block provided they are awake. It does so
        by putting the hash in the corresponding queue(s).

        Args:
            block_hash (:obj:`str`): the new block hash to be sent to the awake subscribers.
        """

        if not self.watcher_asleep:
            self.watcher_queue.put(block_hash)

        if not self.responder_asleep:
            self.responder_queue.put(block_hash)

    def update_state(self, block_hash, max_block_window_size=BLOCK_WINDOW_SIZE):
        """
        Updates the state of the ``ChainMonitor``. The state is represented as the ``best_tip`` and the list of
        ``last_tips``. ``last_tips`` is bounded to ``max_block_window_size``.

        Args:
            block_hash (:obj:`str`): the new best tip.
            max_block_window_size (:obj:`int`): the maximum length of the ``last_tips`` list.

        Returns:
            (:obj:`bool`): ``True`` if the state was successfully updated, ``False`` otherwise.
        """

        # Only accept hashes we have not seen yet, so the zmq and polling threads never
        # notify the same block twice.
        if block_hash != self.best_tip and block_hash not in self.last_tips:
            self.last_tips.append(self.best_tip)
            self.best_tip = block_hash

            if len(self.last_tips) > max_block_window_size:
                self.last_tips.pop(0)

            return True

        else:
            return False

    def monitor_chain_polling(self, polling_delta=POLLING_DELTA):
        """
        Monitors ``bitcoind`` via polling. Once the method is fired, it keeps monitoring as long as ``terminate`` is not
        set. Polling is performed once every ``polling_delta`` seconds. If a new best tip is found, the shared lock is
        acquired, the state is updated and the subscribers are notified, and finally the lock is released.

        Args:
            polling_delta (:obj:`int`): the time delta between polls.
        """

        while not self.terminate:
            self.check_tip.wait(timeout=polling_delta)

            # Terminate could have been set while the thread was blocked in wait
            if not self.terminate:
                current_tip = BlockProcessor.get_best_block_hash()

                self.lock.acquire()
                if self.update_state(current_tip):
                    self.notify_subscribers(current_tip)
                    logger.info("New block received via polling", block_hash=current_tip)
                self.lock.release()

    def monitor_chain_zmq(self):
        """
        Monitors ``bitcoind`` via zmq. Once the method is fired, it keeps monitoring as long as ``terminate`` is not
        set. If a new best tip is found, the shared lock is acquired, the state is updated and the subscribers are
        notified, and finally the lock is released.
        """

        while not self.terminate:
            msg = self.zmqSubSocket.recv_multipart()

            # Terminate could have been set while the thread was blocked in recv
            if not self.terminate:
                topic = msg[0]
                body = msg[1]

                if topic == b"hashblock":
                    block_hash = binascii.hexlify(body).decode("utf-8")

                    self.lock.acquire()
                    if self.update_state(block_hash):
                        self.notify_subscribers(block_hash)
                        logger.info("New block received via zmq", block_hash=block_hash)
                    self.lock.release()

    def monitor_chain(self, polling_delta=POLLING_DELTA):
        """
        Main :class:`ChainMonitor` method. It initializes the ``best_tip`` to the current one (by querying the
        :obj:`BlockProcessor <pisa.block_processor.BlockProcessor>`) and creates two threads, one per each monitoring
        approach (``zmq`` and ``polling``).

        Args:
            polling_delta (:obj:`int`): the time delta between polls by the ``monitor_chain_polling`` thread.
        """

        self.best_tip = BlockProcessor.get_best_block_hash()
        Thread(target=self.monitor_chain_polling, daemon=True, kwargs={"polling_delta": polling_delta}).start()
        Thread(target=self.monitor_chain_zmq, daemon=True).start()

View File

@@ -5,7 +5,6 @@ from common.constants import LOCATOR_LEN_HEX
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
from pisa import errors from pisa import errors
import pisa.conf as conf
from common.logger import Logger from common.logger import Logger
from common.appointment import Appointment from common.appointment import Appointment
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
@@ -23,6 +22,9 @@ class Inspector:
The :class:`Inspector` class is in charge of verifying that the appointment data provided by the user is correct. The :class:`Inspector` class is in charge of verifying that the appointment data provided by the user is correct.
""" """
def __init__(self, config):
self.config = config
def inspect(self, appointment_data, signature, public_key): def inspect(self, appointment_data, signature, public_key):
""" """
Inspects whether the data provided by the user is correct. Inspects whether the data provided by the user is correct.
@@ -221,8 +223,7 @@ class Inspector:
return rcode, message return rcode, message
@staticmethod def check_to_self_delay(self, to_self_delay):
def check_to_self_delay(to_self_delay):
""" """
Checks if the provided ``to_self_delay`` is correct. Checks if the provided ``to_self_delay`` is correct.
@@ -255,10 +256,10 @@ class Inspector:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong to_self_delay data type ({})".format(t) message = "wrong to_self_delay data type ({})".format(t)
elif to_self_delay < conf.MIN_TO_SELF_DELAY: elif to_self_delay < self.config.get("MIN_TO_SELF_DELAY"):
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
message = "to_self_delay too small. The to_self_delay should be at least {} (current: {})".format( message = "to_self_delay too small. The to_self_delay should be at least {} (current: {})".format(
conf.MIN_TO_SELF_DELAY, to_self_delay self.config.get("MIN_TO_SELF_DELAY"), to_self_delay
) )
if message is not None: if message is not None:

View File

@@ -2,14 +2,13 @@ from getopt import getopt
from sys import argv, exit from sys import argv, exit
from signal import signal, SIGINT, SIGQUIT, SIGTERM from signal import signal, SIGINT, SIGQUIT, SIGTERM
from pisa.conf import DB_PATH
from common.logger import Logger from common.logger import Logger
from pisa.api import API from pisa.api import API
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.builder import Builder from pisa.builder import Builder
from pisa.conf import BTC_NETWORK, PISA_SECRET_KEY import pisa.conf as conf
from pisa.responder import Responder
from pisa.db_manager import DBManager from pisa.db_manager import DBManager
from pisa.chain_monitor import ChainMonitor
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
from pisa.tools import can_connect_to_bitcoind, in_correct_network from pisa.tools import can_connect_to_bitcoind, in_correct_network
@@ -19,11 +18,59 @@ logger = Logger("Daemon")
def handle_signals(signal_received, frame): def handle_signals(signal_received, frame):
logger.info("Closing connection with appointments db") logger.info("Closing connection with appointments db")
db_manager.db.close() db_manager.db.close()
chain_monitor.terminate = True
logger.info("Shutting down PISA") logger.info("Shutting down PISA")
exit(0) exit(0)
def load_config(config):
    """
    Looks through all of the config options to make sure they contain the right type of data and builds a config
    dictionary.

    Args:
        config (:obj:`module`): It takes in a config module object.

    Returns:
        :obj:`dict`: A dictionary containing the config values.

    Raises:
        ValueError: If any config variable is missing (``None``) or of the wrong type.
    """

    # Expected type for every config variable. Anything missing or mistyped aborts startup.
    conf_fields = {
        "BTC_RPC_USER": {"value": config.BTC_RPC_USER, "type": str},
        "BTC_RPC_PASSWD": {"value": config.BTC_RPC_PASSWD, "type": str},
        "BTC_RPC_HOST": {"value": config.BTC_RPC_HOST, "type": str},
        "BTC_RPC_PORT": {"value": config.BTC_RPC_PORT, "type": int},
        "BTC_NETWORK": {"value": config.BTC_NETWORK, "type": str},
        "FEED_PROTOCOL": {"value": config.FEED_PROTOCOL, "type": str},
        "FEED_ADDR": {"value": config.FEED_ADDR, "type": str},
        "FEED_PORT": {"value": config.FEED_PORT, "type": int},
        "MAX_APPOINTMENTS": {"value": config.MAX_APPOINTMENTS, "type": int},
        "EXPIRY_DELTA": {"value": config.EXPIRY_DELTA, "type": int},
        "MIN_TO_SELF_DELAY": {"value": config.MIN_TO_SELF_DELAY, "type": int},
        "SERVER_LOG_FILE": {"value": config.SERVER_LOG_FILE, "type": str},
        "PISA_SECRET_KEY": {"value": config.PISA_SECRET_KEY, "type": str},
        "CLIENT_LOG_FILE": {"value": config.CLIENT_LOG_FILE, "type": str},
        "TEST_LOG_FILE": {"value": config.TEST_LOG_FILE, "type": str},
        "DB_PATH": {"value": config.DB_PATH, "type": str},
    }

    conf_dict = {}
    # Iterate items() so we read each field's properties once instead of re-indexing by key.
    for field, properties in conf_fields.items():
        value = properties["value"]
        correct_type = properties["type"]

        if (value is not None) and isinstance(value, correct_type):
            conf_dict[field] = value
        else:
            err_msg = "{} variable in config is of the wrong type".format(field)
            logger.error(err_msg)
            raise ValueError(err_msg)

    return conf_dict
if __name__ == "__main__": if __name__ == "__main__":
logger.info("Starting PISA") logger.info("Starting PISA")
@@ -36,23 +83,31 @@ if __name__ == "__main__":
# FIXME: Leaving this here for future option/arguments # FIXME: Leaving this here for future option/arguments
pass pass
pisa_config = load_config(conf)
if not can_connect_to_bitcoind(): if not can_connect_to_bitcoind():
logger.error("Can't connect to bitcoind. Shutting down") logger.error("Can't connect to bitcoind. Shutting down")
elif not in_correct_network(BTC_NETWORK): elif not in_correct_network(pisa_config.get("BTC_NETWORK")):
logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down") logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down")
else: else:
try: try:
db_manager = DBManager(DB_PATH) db_manager = DBManager(pisa_config.get("DB_PATH"))
# Create the chain monitor and start monitoring the chain
chain_monitor = ChainMonitor()
chain_monitor.monitor_chain()
watcher_appointments_data = db_manager.load_watcher_appointments() watcher_appointments_data = db_manager.load_watcher_appointments()
responder_trackers_data = db_manager.load_responder_trackers() responder_trackers_data = db_manager.load_responder_trackers()
with open(PISA_SECRET_KEY, "rb") as key_file: with open(pisa_config.get("PISA_SECRET_KEY"), "rb") as key_file:
secret_key_der = key_file.read() secret_key_der = key_file.read()
watcher = Watcher(db_manager, secret_key_der) watcher = Watcher(db_manager, chain_monitor, secret_key_der, pisa_config)
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep)
if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0: if len(watcher_appointments_data) == 0 and len(responder_trackers_data) == 0:
logger.info("Fresh bootstrap") logger.info("Fresh bootstrap")
@@ -65,7 +120,6 @@ if __name__ == "__main__":
last_block_responder = db_manager.load_last_block_hash_responder() last_block_responder = db_manager.load_last_block_hash_responder()
# FIXME: 32-reorgs-offline dropped txs are not used at this point. # FIXME: 32-reorgs-offline dropped txs are not used at this point.
responder = Responder(db_manager)
last_common_ancestor_responder = None last_common_ancestor_responder = None
missed_blocks_responder = None missed_blocks_responder = None
@@ -76,12 +130,12 @@ if __name__ == "__main__":
) )
missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder) missed_blocks_responder = block_processor.get_missed_blocks(last_common_ancestor_responder)
responder.trackers, responder.tx_tracker_map = Builder.build_trackers(responder_trackers_data) watcher.responder.trackers, watcher.responder.tx_tracker_map = Builder.build_trackers(
responder.block_queue = Builder.build_block_queue(missed_blocks_responder) responder_trackers_data
)
watcher.responder.block_queue = Builder.build_block_queue(missed_blocks_responder)
# Build Watcher with Responder and backed up data. If the blocks of both match we don't perform the # Build Watcher. If the blocks of both match we don't perform the search twice.
# search twice.
watcher.responder = responder
if last_block_watcher is not None: if last_block_watcher is not None:
if last_block_watcher == last_block_responder: if last_block_watcher == last_block_responder:
missed_blocks_watcher = missed_blocks_responder missed_blocks_watcher = missed_blocks_responder
@@ -97,7 +151,7 @@ if __name__ == "__main__":
watcher.block_queue = Builder.build_block_queue(missed_blocks_watcher) watcher.block_queue = Builder.build_block_queue(missed_blocks_watcher)
# Fire the API # Fire the API
API(watcher).start() API(watcher, config=pisa_config).start()
except Exception as e: except Exception as e:
logger.error("An error occurred: {}. Shutting down".format(e)) logger.error("An error occurred: {}. Shutting down".format(e))

View File

@@ -6,7 +6,6 @@ from common.logger import Logger
from pisa.cleaner import Cleaner from pisa.cleaner import Cleaner
from pisa.carrier import Carrier from pisa.carrier import Carrier
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
from pisa.utils.zmq_subscriber import ZMQSubscriber
CONFIRMATIONS_BEFORE_RETRY = 6 CONFIRMATIONS_BEFORE_RETRY = 6
MIN_CONFIRMATIONS = 6 MIN_CONFIRMATIONS = 6
@@ -128,22 +127,22 @@ class Responder:
has missed. Used to trigger rebroadcast if needed. has missed. Used to trigger rebroadcast if needed.
asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake. asleep (:obj:`bool`): A flag that signals whether the :obj:`Responder` is asleep or awake.
block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It
is populated by the :obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`. is populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
zmq_subscriber (:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`): a ``ZMQSubscriber`` instance chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
used to receive new block notifications from ``bitcoind``. new blocks received by ``bitcoind``.
db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): A ``DBManager`` instance to interact with the db_manager (:obj:`DBManager <pisa.db_manager.DBManager>`): A ``DBManager`` instance to interact with the
database. database.
""" """
def __init__(self, db_manager): def __init__(self, db_manager, chain_monitor):
self.trackers = dict() self.trackers = dict()
self.tx_tracker_map = dict() self.tx_tracker_map = dict()
self.unconfirmed_txs = [] self.unconfirmed_txs = []
self.missed_confirmations = dict() self.missed_confirmations = dict()
self.asleep = True self.asleep = True
self.block_queue = Queue() self.block_queue = Queue()
self.zmq_subscriber = None self.chain_monitor = chain_monitor
self.db_manager = db_manager self.db_manager = db_manager
@staticmethod @staticmethod
@@ -226,8 +225,7 @@ class Responder:
``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the
database. database.
``add_tracker`` awakes the :obj:`Responder` and creates a connection with the ``add_tracker`` awakes the :obj:`Responder` if it is asleep.
:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>` if he is asleep.
Args: Args:
uuid (:obj:`str`): a unique identifier for the appointment. uuid (:obj:`str`): a unique identifier for the appointment.
@@ -268,19 +266,8 @@ class Responder:
if self.asleep: if self.asleep:
self.asleep = False self.asleep = False
zmq_thread = Thread(target=self.do_subscribe) self.chain_monitor.responder_asleep = False
responder = Thread(target=self.do_watch) Thread(target=self.do_watch).start()
zmq_thread.start()
responder.start()
def do_subscribe(self):
"""
Initializes a :obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>` instance to listen to new blocks
from ``bitcoind``. Block ids are received trough the ``block_queue``.
"""
self.zmq_subscriber = ZMQSubscriber(parent="Responder")
self.zmq_subscriber.handle(self.block_queue)
def do_watch(self): def do_watch(self):
""" """
@@ -335,8 +322,7 @@ class Responder:
# Go back to sleep if there are no more pending trackers # Go back to sleep if there are no more pending trackers
self.asleep = True self.asleep = True
self.zmq_subscriber.terminate = True self.chain_monitor.responder_asleep = True
self.block_queue = Queue()
logger.info("No more pending trackers, going back to sleep") logger.info("No more pending trackers, going back to sleep")
@@ -492,9 +478,6 @@ class Responder:
else: else:
# If the penalty transaction is missing, we need to reset the tracker. # If the penalty transaction is missing, we need to reset the tracker.
# DISCUSS: Adding tracker back, should we flag it as retried?
# FIXME: Whether we decide to increase the retried counter or not, the current counter should be
# maintained. There is no way of doing so with the current approach. Update if required
self.handle_breach( self.handle_breach(
tracker.locator, tracker.locator,
uuid, uuid,

View File

@@ -5,6 +5,10 @@ BTC_RPC_HOST = "localhost"
BTC_RPC_PORT = 18443 BTC_RPC_PORT = 18443
BTC_NETWORK = "regtest" BTC_NETWORK = "regtest"
# CHAIN MONITOR
# Delay between successive chain polls — presumably seconds; TODO confirm against ChainMonitor.
POLLING_DELTA = 60
# Number of recent tips kept in memory — NOTE(review): looks like a reorg-detection window; confirm.
BLOCK_WINDOW_SIZE = 10
# ZMQ # ZMQ
FEED_PROTOCOL = "tcp" FEED_PROTOCOL = "tcp"
FEED_ADDR = "127.0.0.1" FEED_ADDR = "127.0.0.1"

View File

@@ -8,12 +8,14 @@ from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
class ZMQSubscriber: class ZMQSubscriber:
""" Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py""" """ Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py"""
def __init__(self, parent): def __init__(self, config, parent):
self.zmqContext = zmq.Context() self.zmqContext = zmq.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT)) self.zmqSubSocket.connect(
"%s://%s:%s" % (config.get("FEED_PROTOCOL"), config.get("FEED_ADDR"), config.get("FEED_PORT"))
)
self.logger = Logger("ZMQSubscriber-{}".format(parent)) self.logger = Logger("ZMQSubscriber-{}".format(parent))
self.terminate = False self.terminate = False

View File

@@ -3,15 +3,14 @@ from queue import Queue
from threading import Thread from threading import Thread
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
from common.constants import LOCATOR_LEN_HEX
from common.appointment import Appointment from common.appointment import Appointment
from common.tools import compute_locator
from common.logger import Logger from common.logger import Logger
from pisa.cleaner import Cleaner from pisa.cleaner import Cleaner
from pisa.responder import Responder from pisa.responder import Responder
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
from pisa.utils.zmq_subscriber import ZMQSubscriber
from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS
logger = Logger("Watcher") logger = Logger("Watcher")
@@ -29,15 +28,17 @@ class Watcher:
If an appointment reaches its end with no breach, the data is simply deleted. If an appointment reaches its end with no breach, the data is simply deleted.
The :class:`Watcher` receives information about new received blocks via the ``block_queue`` that is populated by the The :class:`Watcher` receives information about new received blocks via the ``block_queue`` that is populated by the
:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber>`. :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
Args: Args:
db_manager (:obj:`DBManager <pisa.db_manager>`): a ``DBManager`` instance to interact with the database. db_manager (:obj:`DBManager <pisa.db_manager>`): a ``DBManager`` instance to interact with the database.
chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
new blocks received by ``bitcoind``.
sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance). sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance).
config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve
``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``.
responder (:obj:`Responder <pisa.responder.Responder>`): a ``Responder`` instance. If ``None`` is passed, a new responder (:obj:`Responder <pisa.responder.Responder>`): a ``Responder`` instance. If ``None`` is passed, a new
instance is created. Populated instances are useful when bootstrapping the system from backed-up data. instance is created. Populated instances are useful when bootstrapping the system from backed-up data.
max_appointments(:obj:`int`): the maximum amount of appointments that the :obj:`Watcher` will keep at any given
time. Defaults to ``MAX_APPOINTMENTS``.
Attributes: Attributes:
@@ -48,44 +49,31 @@ class Watcher:
appointments with the same ``locator``. appointments with the same ``locator``.
asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake. asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake.
block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is
populated by the :obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`. populated by the :obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`.
max_appointments(:obj:`int`): the maximum amount of appointments that the :obj:`Watcher` will keep at any given chain_monitor (:obj:`ChainMonitor <pisa.chain_monitor.ChainMonitor>`): a ``ChainMonitor`` instance used to track
time. new blocks received by ``bitcoind``.
zmq_subscriber (:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>`): a ZMQSubscriber instance used config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve
to receive new block notifications from ``bitcoind``. ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``.
db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database. db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database.
signing_key (:mod:`EllipticCurvePrivateKey`): a private key used to sign accepted appointments.
Raises: Raises:
ValueError: if `pisa_sk_file` is not found. ValueError: if `pisa_sk_file` is not found.
""" """
def __init__(self, db_manager, sk_der, responder=None, max_appointments=MAX_APPOINTMENTS): def __init__(self, db_manager, chain_monitor, sk_der, config, responder=None):
self.appointments = dict() self.appointments = dict()
self.locator_uuid_map = dict() self.locator_uuid_map = dict()
self.asleep = True self.asleep = True
self.block_queue = Queue() self.block_queue = Queue()
self.max_appointments = max_appointments self.chain_monitor = chain_monitor
self.zmq_subscriber = None self.config = config
self.db_manager = db_manager self.db_manager = db_manager
self.signing_key = Cryptographer.load_private_key_der(sk_der) self.signing_key = Cryptographer.load_private_key_der(sk_der)
if not isinstance(responder, Responder): if not isinstance(responder, Responder):
self.responder = Responder(db_manager) self.responder = Responder(db_manager, chain_monitor)
@staticmethod
def compute_locator(tx_id):
"""
Computes an appointment locator given a transaction id.
Args:
tx_id (:obj:`str`): the transaction id used to compute the locator.
Returns:
(:obj:`str`): The computed locator.
"""
return tx_id[:LOCATOR_LEN_HEX]
def add_appointment(self, appointment): def add_appointment(self, appointment):
""" """
@@ -117,8 +105,8 @@ class Watcher:
""" """
if len(self.appointments) < self.max_appointments: if len(self.appointments) < self.config.get("MAX_APPOINTMENTS"):
# Appointments are stored in disk, we only keep the end_time, locator and locator_uuid map in memory
uuid = uuid4().hex uuid = uuid4().hex
self.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time} self.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time}
@@ -130,10 +118,8 @@ class Watcher:
if self.asleep: if self.asleep:
self.asleep = False self.asleep = False
zmq_thread = Thread(target=self.do_subscribe) self.chain_monitor.watcher_asleep = False
watcher = Thread(target=self.do_watch) Thread(target=self.do_watch).start()
zmq_thread.start()
watcher.start()
logger.info("Waking up") logger.info("Waking up")
@@ -153,15 +139,6 @@ class Watcher:
return appointment_added, signature return appointment_added, signature
def do_subscribe(self):
"""
Initializes a ``ZMQSubscriber`` instance to listen to new blocks from ``bitcoind``. Block ids are received
trough the ``block_queue``.
"""
self.zmq_subscriber = ZMQSubscriber(parent="Watcher")
self.zmq_subscriber.handle(self.block_queue)
def do_watch(self): def do_watch(self):
""" """
Monitors the blockchain whilst there are pending appointments. Monitors the blockchain whilst there are pending appointments.
@@ -184,7 +161,7 @@ class Watcher:
expired_appointments = [ expired_appointments = [
uuid uuid
for uuid, appointment_data in self.appointments.items() for uuid, appointment_data in self.appointments.items()
if block["height"] > appointment_data.get("end_time") + EXPIRY_DELTA if block["height"] > appointment_data.get("end_time") + self.config.get("EXPIRY_DELTA")
] ]
Cleaner.delete_expired_appointment( Cleaner.delete_expired_appointment(
@@ -223,8 +200,7 @@ class Watcher:
# Go back to sleep if there are no more appointments # Go back to sleep if there are no more appointments
self.asleep = True self.asleep = True
self.zmq_subscriber.terminate = True self.chain_monitor.watcher_asleep = True
self.block_queue = Queue()
logger.info("No more pending appointments, going back to sleep") logger.info("No more pending appointments, going back to sleep")
@@ -240,7 +216,7 @@ class Watcher:
found. found.
""" """
potential_locators = {Watcher.compute_locator(txid): txid for txid in txids} potential_locators = {compute_locator(txid): txid for txid in txids}
# Check is any of the tx_ids in the received block is an actual match # Check is any of the tx_ids in the received block is an actual match
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())

View File

@@ -1,23 +1,41 @@
import responses import responses
import json import json
import os
import shutil
from binascii import hexlify from binascii import hexlify
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from common.appointment import Appointment
from common.cryptographer import Cryptographer
import apps.cli.pisa_cli as pisa_cli import apps.cli.pisa_cli as pisa_cli
from test.apps.cli.unit.conftest import get_random_value_hex from test.apps.cli.unit.conftest import get_random_value_hex
# TODO: should find a way of doing without this
from apps.cli.pisa_cli import build_appointment
# dummy keys for the tests # dummy keys for the tests
pisa_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) pisa_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
pisa_pk = pisa_sk.public_key() pisa_pk = pisa_sk.public_key()
other_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) other_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
# DER-serialized versions of the dummy keys. These are what the tests feed to
# (or monkeypatch over) pisa_cli's key-loading helpers instead of reading disk.
pisa_sk_der = pisa_sk.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.TraditionalOpenSSL,
    encryption_algorithm=serialization.NoEncryption(),
)

pisa_pk_der = pisa_pk.public_bytes(
    encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
)

# A second, unrelated secret key used to produce signatures that must NOT verify.
other_sk_der = other_sk.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.TraditionalOpenSSL,
    encryption_algorithm=serialization.NoEncryption(),
)
# Replace the key in the module with a key we control for the tests # Replace the key in the module with a key we control for the tests
pisa_cli.pisa_public_key = pisa_pk pisa_cli.pisa_public_key = pisa_pk
# Replace endpoint with dummy one # Replace endpoint with dummy one
@@ -32,18 +50,47 @@ dummy_appointment_request = {
"end_time": 50000, "end_time": 50000,
"to_self_delay": 200, "to_self_delay": 200,
} }
dummy_appointment = build_appointment(**dummy_appointment_request)
# FIXME: USE CRYPTOGRAPHER # This is the format appointment turns into once it hits "add_appointment"
# This is the format the appointment turns into once it hits "add_appointment".
# Random hex stands in for the locator and encrypted blob.
dummy_appointment_full = {
    "locator": get_random_value_hex(32),
    "start_time": 1500,
    "end_time": 50000,
    "to_self_delay": 200,
    "encrypted_blob": get_random_value_hex(120),
}

# Appointment object built from the dict above, shared by the tests below.
dummy_appointment = Appointment.from_dict(dummy_appointment_full)
def sign_appointment(sk, appointment): def get_dummy_pisa_sk_der(*args):
data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") return pisa_sk_der
return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8")
def get_dummy_pisa_pk(der_data): def get_dummy_pisa_pk_der(*args):
return pisa_pk return pisa_pk_der
def get_dummy_hex_pk_der(*args):
return hexlify(get_dummy_pisa_pk_der())
def get_dummy_signature(*args):
    # Signs the dummy appointment with the test pisa secret key. Accepts and
    # ignores any args so it can be monkeypatched over get_appointment_signature.
    sk = Cryptographer.load_private_key_der(pisa_sk_der)
    return Cryptographer.sign(dummy_appointment.serialize(), sk)
def get_bad_signature(*args):
    # Signs the dummy appointment with a key that is NOT the pisa secret key,
    # producing a signature that must fail verification.
    sk = Cryptographer.load_private_key_der(other_sk_der)
    return Cryptographer.sign(dummy_appointment.serialize(), sk)
def valid_sig(*args):
    # Stand-in for pisa_cli.check_signature that always reports the signature as valid.
    return True
def invalid_sig(*args):
    # Stand-in for pisa_cli.check_signature that always reports the signature as invalid.
    return False
@responses.activate @responses.activate
@@ -51,10 +98,12 @@ def test_add_appointment(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested # Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested
# and the return value is True # and the return value is True
# make sure the test uses the right dummy key instead of loading it from disk # Make sure the test uses the dummy signature
monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk) monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_dummy_signature)
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
monkeypatch.setattr(pisa_cli, "check_signature", valid_sig)
response = {"locator": dummy_appointment["locator"], "signature": sign_appointment(pisa_sk, dummy_appointment)} response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature()}
request_url = "http://{}/".format(pisa_endpoint) request_url = "http://{}/".format(pisa_endpoint)
responses.add(responses.POST, request_url, json=response, status=200) responses.add(responses.POST, request_url, json=response, status=200)
@@ -72,12 +121,14 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, but sign with a different key, # Simulate a request to add_appointment for dummy_appointment, but sign with a different key,
# make sure that the right endpoint is requested, but the return value is False # make sure that the right endpoint is requested, but the return value is False
# make sure the test uses the right dummy key instead of loading it from disk # Make sure the test uses the bad dummy signature
monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk) monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_bad_signature)
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
monkeypatch.setattr(pisa_cli, "check_signature", invalid_sig)
response = { response = {
"locator": dummy_appointment["locator"], "locator": dummy_appointment.to_dict()["locator"],
"signature": sign_appointment(other_sk, dummy_appointment), # signing with a different key "signature": get_bad_signature(), # Sign with a bad key
} }
request_url = "http://{}/".format(pisa_endpoint) request_url = "http://{}/".format(pisa_endpoint)
@@ -85,4 +136,141 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)])
assert not result assert result is False
def test_load_key_file_data():
    """Tests that key files load when present and fail gracefully when missing."""

    # If the file exists and has data in it, loading should succeed.
    with open("key_test_file", "wb") as f:
        f.write(pisa_sk_der)

    try:
        key_data = pisa_cli.load_key_file_data("key_test_file")
        assert key_data
    finally:
        # Remove the temp file even if the assertion fails, so reruns start clean.
        os.remove("key_test_file")

    # If the file does not exist, the function should return a falsy value.
    assert not pisa_cli.load_key_file_data("nonexistent_file")
def test_save_signed_appointment(monkeypatch):
    """Tests that a signed appointment is persisted into the appointments folder."""

    # Redirect the output folder so the real appointments directory is untouched.
    monkeypatch.setattr(pisa_cli, "APPOINTMENTS_FOLDER_NAME", "test_appointments")

    try:
        pisa_cli.save_signed_appointment(dummy_appointment.to_dict(), get_dummy_signature())

        # The directory must have been created (even if it did not exist before)
        # and must contain a file whose name includes the appointment locator.
        assert os.path.exists("test_appointments")
        files = os.listdir("test_appointments")
        assert any(dummy_appointment.to_dict().get("locator") in f for f in files)
    finally:
        # Delete the test directory even if an assertion fails.
        shutil.rmtree("test_appointments", ignore_errors=True)
def test_parse_add_appointment_args():
    """Tests argument parsing for the add_appointment command."""

    # If no args are passed, parsing should fail.
    assert not pisa_cli.parse_add_appointment_args(None)

    # If the given file doesn't exist, parsing should fail.
    assert not pisa_cli.parse_add_appointment_args(["-f", "nonexistent_file"])

    # If the file exists and has data in it, parsing should succeed.
    with open("appt_test_file", "w") as f:
        json.dump(dummy_appointment_request, f)

    try:
        assert pisa_cli.parse_add_appointment_args(["-f", "appt_test_file"])
    finally:
        # Remove the temp file even if the assertion fails, so reruns start clean.
        os.remove("appt_test_file")

    # If the appointment json is passed inline, parsing should also succeed.
    assert pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)])
@responses.activate
def test_post_data_to_add_appointment_endpoint():
    """Tests that posting an appointment hits the expected endpoint and succeeds."""

    # Distinct name for the mocked server reply, so it does not shadow the
    # client-side response checked at the bottom.
    mocked_response = {
        "locator": dummy_appointment.to_dict()["locator"],
        "signature": Cryptographer.sign(dummy_appointment.serialize(), pisa_sk),
    }
    request_url = "http://{}/".format(pisa_endpoint)
    responses.add(responses.POST, request_url, json=mocked_response, status=200)

    response = pisa_cli.post_data_to_add_appointment_endpoint(request_url, json.dumps(dummy_appointment_request))

    # Exactly one request must have been issued, to the mocked URL, and it must succeed.
    assert len(responses.calls) == 1
    assert responses.calls[0].request.url == request_url
    assert response
def test_check_signature(monkeypatch):
    # Make sure the test uses the right dummy key instead of loading it from disk.
    monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der)

    # A signature produced with the pisa secret key must verify.
    valid = pisa_cli.check_signature(get_dummy_signature(), dummy_appointment)
    assert valid

    # A signature produced with a different key must not verify.
    valid = pisa_cli.check_signature(get_bad_signature(), dummy_appointment)
    assert not valid
@responses.activate
def test_get_appointment():
    """Tests that get_appointment queries the right endpoint and returns data."""

    # The get_appointment endpoint replies with the appointment plus its status.
    # Build the reply as a copy so the shared dummy_appointment_full fixture is
    # not mutated for subsequent tests.
    response = dict(dummy_appointment_full, status="being_watched")

    request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator={}".format(response.get("locator"))
    responses.add(responses.GET, request_url, json=response, status=200)

    result = pisa_cli.get_appointment([response.get("locator")])

    # Exactly one request must have been issued, to the mocked URL, and data returned.
    assert len(responses.calls) == 1
    assert responses.calls[0].request.url == request_url
    assert result
@responses.activate
def test_get_appointment_err():
    """Tests that get_appointment handles a connection error gracefully."""

    locator = get_random_value_hex(32)

    # The format string needs the {} placeholder, otherwise .format(locator)
    # is a no-op and the locator never makes it into the mocked URL.
    request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator={}".format(locator)
    responses.add(responses.GET, request_url, body=ConnectionError())

    assert not pisa_cli.get_appointment([locator])
def test_get_appointment_signature(monkeypatch):
    # Make sure the test uses the right dummy key instead of loading it from disk.
    monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der)

    # The appointment signature is expected to be returned as a string.
    signature = pisa_cli.get_appointment_signature(dummy_appointment)
    assert isinstance(signature, str)
def test_get_pk(monkeypatch):
    # Make sure the test uses the right dummy key instead of loading it from disk.
    monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der)

    # The public key is expected to be returned as bytes.
    pk = pisa_cli.get_pk()
    assert isinstance(pk, bytes)

View File

@@ -12,10 +12,11 @@ from cryptography.hazmat.primitives import serialization
from apps.cli.blob import Blob from apps.cli.blob import Blob
from pisa.responder import TransactionTracker from pisa.responder import TransactionTracker
from pisa.watcher import Watcher
from pisa.tools import bitcoin_cli from pisa.tools import bitcoin_cli
from pisa.db_manager import DBManager from pisa.db_manager import DBManager
from pisa.chain_monitor import ChainMonitor
from common.appointment import Appointment from common.appointment import Appointment
from common.tools import compute_locator
from bitcoind_mock.utils import sha256d from bitcoind_mock.utils import sha256d
from bitcoind_mock.transaction import TX from bitcoind_mock.transaction import TX
@@ -50,6 +51,17 @@ def db_manager():
rmtree("test_db") rmtree("test_db")
@pytest.fixture(scope="module")
def chain_monitor():
    # Single ChainMonitor instance shared by every test in the module.
    chain_monitor = ChainMonitor()
    chain_monitor.monitor_chain()

    yield chain_monitor

    # Teardown: flag termination, then mine a block — presumably so the monitor
    # loop wakes up, sees the flag and exits (TODO confirm against ChainMonitor).
    chain_monitor.terminate = True
    generate_block()
def generate_keypair(): def generate_keypair():
client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) client_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
client_pk = client_sk.public_key() client_pk = client_sk.public_key()
@@ -103,7 +115,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
) )
locator = Watcher.compute_locator(dispute_txid) locator = compute_locator(dispute_txid)
blob = Blob(dummy_appointment_data.get("tx")) blob = Blob(dummy_appointment_data.get("tx"))
encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id")) encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id"))
@@ -147,3 +159,26 @@ def generate_dummy_tracker():
) )
return TransactionTracker.from_dict(tracker_data) return TransactionTracker.from_dict(tracker_data)
def get_config():
    """
    Builds a hardcoded test configuration dictionary mirroring the fields of
    the production config (bitcoind RPC, ZMQ feed, watcher limits and paths).

    Returns:
        :obj:`dict`: The test configuration values.
    """
    return {
        "BTC_RPC_USER": "username",
        "BTC_RPC_PASSWD": "password",
        "BTC_RPC_HOST": "localhost",
        "BTC_RPC_PORT": 8332,
        "BTC_NETWORK": "regtest",
        "FEED_PROTOCOL": "tcp",
        "FEED_ADDR": "127.0.0.1",
        "FEED_PORT": 28332,
        "MAX_APPOINTMENTS": 100,
        "EXPIRY_DELTA": 6,
        "MIN_TO_SELF_DELAY": 20,
        "SERVER_LOG_FILE": "pisa.log",
        "PISA_SECRET_KEY": "pisa_sk.der",
        "CLIENT_LOG_FILE": "pisa.log",
        "TEST_LOG_FILE": "test.log",
        "DB_PATH": "appointments",
    }

View File

@@ -9,7 +9,6 @@ from pisa.api import API
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.tools import bitcoin_cli from pisa.tools import bitcoin_cli
from pisa import HOST, PORT from pisa import HOST, PORT
from pisa.conf import MAX_APPOINTMENTS
from test.pisa.unit.conftest import ( from test.pisa.unit.conftest import (
generate_block, generate_block,
@@ -17,6 +16,7 @@ from test.pisa.unit.conftest import (
get_random_value_hex, get_random_value_hex,
generate_dummy_appointment_data, generate_dummy_appointment_data,
generate_keypair, generate_keypair,
get_config,
) )
from common.constants import LOCATOR_LEN_BYTES from common.constants import LOCATOR_LEN_BYTES
@@ -28,18 +28,23 @@ MULTIPLE_APPOINTMENTS = 10
appointments = [] appointments = []
locator_dispute_tx_map = {} locator_dispute_tx_map = {}
config = get_config()
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def run_api(db_manager): def run_api(db_manager, chain_monitor):
sk, pk = generate_keypair() sk, pk = generate_keypair()
sk_der = sk.private_bytes( sk_der = sk.private_bytes(
encoding=serialization.Encoding.DER, encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL, format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(), encryption_algorithm=serialization.NoEncryption(),
) )
watcher = Watcher(db_manager, sk_der)
api_thread = Thread(target=API(watcher).start) watcher = Watcher(db_manager, chain_monitor, sk_der, get_config())
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep)
api_thread = Thread(target=API(watcher, config).start)
api_thread.daemon = True api_thread.daemon = True
api_thread.start() api_thread.start()
@@ -102,7 +107,7 @@ def test_request_multiple_appointments_same_locator(new_appt_data, n=MULTIPLE_AP
def test_add_too_many_appointment(new_appt_data): def test_add_too_many_appointment(new_appt_data):
for _ in range(MAX_APPOINTMENTS - len(appointments)): for _ in range(config.get("MAX_APPOINTMENTS") - len(appointments)):
r = add_appointment(new_appt_data) r = add_appointment(new_appt_data)
assert r.status_code == 200 assert r.status_code == 200

View File

@@ -0,0 +1,225 @@
import zmq
import time
from threading import Thread, Event, Condition
from pisa.watcher import Watcher
from pisa.responder import Responder
from pisa.block_processor import BlockProcessor
from pisa.chain_monitor import ChainMonitor
from test.pisa.unit.conftest import get_random_value_hex, generate_block, get_config
def test_init(run_bitcoind):
    """Sanity-check the initial state of a freshly constructed ChainMonitor.

    run_bitcoind is requested here (first test of the module) instead of later
    on to avoid race conditions while it initializes.
    """
    # Not much to test here, just sanity checks to make sure nothing goes south in the future
    chain_monitor = ChainMonitor()

    # No chain state has been observed yet
    assert chain_monitor.best_tip is None
    assert isinstance(chain_monitor.last_tips, list) and len(chain_monitor.last_tips) == 0

    # Control primitives are created on construction
    assert chain_monitor.terminate is False
    assert isinstance(chain_monitor.check_tip, Event)
    assert isinstance(chain_monitor.lock, Condition)
    assert isinstance(chain_monitor.zmqSubSocket, zmq.Socket)

    # The Queues and asleep flags are initialized when attaching the corresponding subscriber
    assert chain_monitor.watcher_queue is None
    assert chain_monitor.responder_queue is None
    assert chain_monitor.watcher_asleep and chain_monitor.responder_asleep
def test_attach_watcher(chain_monitor):
    """Attaching a watcher must wire up its block queue and asleep flag."""
    watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config())
    chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)

    # booleans are not passed as reference in Python, so the flags need to be set separately
    assert watcher.asleep == chain_monitor.watcher_asleep
    watcher.asleep = False
    assert chain_monitor.watcher_asleep != watcher.asleep

    # Test that the queue works: a hash put by the monitor must pop out of the watcher's queue
    r_hash = get_random_value_hex(32)
    chain_monitor.watcher_queue.put(r_hash)
    assert watcher.block_queue.get() == r_hash
def test_attach_responder(chain_monitor):
    """Attaching a responder must wire up its block queue and asleep flag.

    Same kind of testing as with the attach watcher. Note: the original
    asserted against chain_monitor.watcher_asleep here (copy-paste slip) and
    only passed by coincidence; the responder must be checked against
    responder_asleep.
    """
    responder = Responder(db_manager=None, chain_monitor=chain_monitor)
    chain_monitor.attach_responder(responder.block_queue, responder.asleep)

    # booleans are not passed as reference in Python, so the flags need to be set separately
    assert responder.asleep == chain_monitor.responder_asleep
    responder.asleep = False
    assert chain_monitor.responder_asleep != responder.asleep

    # Test that the queue works: a hash put by the monitor must pop out of the responder's queue
    r_hash = get_random_value_hex(32)
    chain_monitor.responder_queue.put(r_hash)
    assert responder.block_queue.get() == r_hash
def test_notify_subscribers(chain_monitor):
    """notify_subscribers must deliver a block only to awake subscribers."""
    # Subscribers are only notified as long as they are awake
    new_block = get_random_value_hex(32)

    # Queues should be empty to start with
    assert chain_monitor.watcher_queue.empty()
    assert chain_monitor.responder_queue.empty()

    chain_monitor.watcher_asleep = True
    chain_monitor.responder_asleep = True
    chain_monitor.notify_subscribers(new_block)

    # And remain empty afterwards since both subscribers were asleep
    assert chain_monitor.watcher_queue.empty()
    assert chain_monitor.responder_queue.empty()

    # Let's flag them as awake and try again
    chain_monitor.watcher_asleep = False
    chain_monitor.responder_asleep = False
    chain_monitor.notify_subscribers(new_block)

    assert chain_monitor.watcher_queue.get() == new_block
    assert chain_monitor.responder_queue.get() == new_block
def test_update_state(chain_monitor):
    """update_state must reject known hashes and advance the tip on new ones.

    The state is only updated after receiving a block that is neither the
    current best tip nor one of the recently seen tips.
    """
    # Seed the monitor with a best tip and a handful of older tips
    current_tip = get_random_value_hex(32)
    chain_monitor.best_tip = current_tip
    chain_monitor.last_tips = [get_random_value_hex(32) for _ in range(5)]

    # An already-known old tip must be rejected
    assert chain_monitor.update_state(chain_monitor.last_tips[0]) is False

    # So must the current best tip itself
    assert chain_monitor.update_state(chain_monitor.best_tip) is False

    # A brand new hash must be accepted: the tip advances and the previous
    # best tip is appended to last_tips
    fresh_hash = get_random_value_hex(32)
    assert chain_monitor.update_state(fresh_hash) is True
    assert chain_monitor.best_tip == fresh_hash and chain_monitor.last_tips[-1] == current_tip
def test_monitor_chain_polling():
    """monitor_chain_polling must queue new best tips for an awake subscriber."""
    # Try polling with the Watcher
    chain_monitor = ChainMonitor()
    chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
    watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config())
    chain_monitor.attach_watcher(watcher.block_queue, asleep=False)

    # monitor_chain_polling runs until terminate is set
    polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True)
    polling_thread.start()

    # Check that nothing changes as long as a block is not generated
    for _ in range(5):
        assert chain_monitor.watcher_queue.empty()
        time.sleep(0.1)

    # And that it does if we generate a block
    generate_block()

    chain_monitor.watcher_queue.get()
    assert chain_monitor.watcher_queue.empty()

    chain_monitor.terminate = True
    polling_thread.join()
def test_monitor_chain_zmq():
    """monitor_chain_zmq must forward zmq block notifications while the subscriber is awake."""
    # Try zmq with the Responder
    chain_monitor = ChainMonitor()
    chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
    responder = Responder(db_manager=None, chain_monitor=chain_monitor)
    chain_monitor.attach_responder(responder.block_queue, asleep=False)

    zmq_thread = Thread(target=chain_monitor.monitor_chain_zmq, daemon=True)
    zmq_thread.start()

    # Queues should start empty
    assert chain_monitor.responder_queue.empty()

    # And have a new block every time we generate one
    for _ in range(3):
        generate_block()
        chain_monitor.responder_queue.get()
        assert chain_monitor.responder_queue.empty()

    # If we flag it to sleep no notification is sent
    chain_monitor.responder_asleep = True
    for _ in range(3):
        generate_block()
        assert chain_monitor.responder_queue.empty()

    chain_monitor.terminate = True

    # The zmq thread needs a block generation to release from the recv method.
    generate_block()
    zmq_thread.join()
def test_monitor_chain():
    """monitor_chain must launch both monitoring threads and serve both subscribers."""
    # Not much to test here, this should launch two threads (one per monitor approach) and finish on terminate
    chain_monitor = ChainMonitor()
    watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config())
    responder = Responder(db_manager=None, chain_monitor=chain_monitor)
    chain_monitor.attach_responder(responder.block_queue, asleep=False)
    chain_monitor.attach_watcher(watcher.block_queue, asleep=False)

    chain_monitor.best_tip = None
    chain_monitor.monitor_chain()

    # The tip is updated before starting the threads, so it should have changed.
    assert chain_monitor.best_tip is not None

    # Blocks should be received by both subscribers, exactly once each
    for _ in range(5):
        generate_block()
        watcher_block = chain_monitor.watcher_queue.get()
        responder_block = chain_monitor.responder_queue.get()
        assert watcher_block == responder_block
        assert chain_monitor.watcher_queue.empty()
        assert chain_monitor.responder_queue.empty()

    # And the threads be terminated on terminate
    chain_monitor.terminate = True

    # The zmq thread needs a block generation to release from the recv method.
    generate_block()
def test_monitor_chain_single_update():
    """A block seen by both monitoring threads must be queued only once per subscriber."""
    # This test tests that if both threads try to add the same block to the queue, only the first one will make it
    chain_monitor = ChainMonitor()
    watcher = Watcher(db_manager=None, chain_monitor=chain_monitor, sk_der=None, config=get_config())
    responder = Responder(db_manager=None, chain_monitor=chain_monitor)
    chain_monitor.attach_responder(responder.block_queue, asleep=False)
    chain_monitor.attach_watcher(watcher.block_queue, asleep=False)
    chain_monitor.best_tip = None

    # We will create a block and wait for the polling thread. Then check the queues to see that the block hash has
    # only been added once.
    chain_monitor.monitor_chain(polling_delta=2)
    generate_block()

    watcher_block = chain_monitor.watcher_queue.get()
    responder_block = chain_monitor.responder_queue.get()
    assert watcher_block == responder_block
    assert chain_monitor.watcher_queue.empty()
    assert chain_monitor.responder_queue.empty()

    # The delta for polling is 2 secs, so let's wait and see
    time.sleep(2)
    assert chain_monitor.watcher_queue.empty()
    assert chain_monitor.responder_queue.empty()

    # We can also force an update and see that it won't go through
    assert chain_monitor.update_state(watcher_block) is False

View File

@@ -10,13 +10,13 @@ from common.appointment import Appointment
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
from pisa.conf import MIN_TO_SELF_DELAY from pisa.conf import MIN_TO_SELF_DELAY
from test.pisa.unit.conftest import get_random_value_hex, generate_dummy_appointment_data, generate_keypair from test.pisa.unit.conftest import get_random_value_hex, generate_dummy_appointment_data, generate_keypair, get_config
from common.constants import LOCATOR_LEN_BYTES, LOCATOR_LEN_HEX from common.constants import LOCATOR_LEN_BYTES, LOCATOR_LEN_HEX
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
inspector = Inspector() inspector = Inspector(get_config())
APPOINTMENT_OK = (0, None) APPOINTMENT_OK = (0, None)
NO_HEX_STRINGS = [ NO_HEX_STRINGS = [
@@ -126,21 +126,21 @@ def test_check_to_self_delay():
# Right value, right format # Right value, right format
to_self_delays = [MIN_TO_SELF_DELAY, MIN_TO_SELF_DELAY + 1, MIN_TO_SELF_DELAY + 1000] to_self_delays = [MIN_TO_SELF_DELAY, MIN_TO_SELF_DELAY + 1, MIN_TO_SELF_DELAY + 1000]
for to_self_delay in to_self_delays: for to_self_delay in to_self_delays:
assert Inspector.check_to_self_delay(to_self_delay) == APPOINTMENT_OK assert inspector.check_to_self_delay(to_self_delay) == APPOINTMENT_OK
# to_self_delay too small # to_self_delay too small
to_self_delays = [MIN_TO_SELF_DELAY - 1, MIN_TO_SELF_DELAY - 2, 0, -1, -1000] to_self_delays = [MIN_TO_SELF_DELAY - 1, MIN_TO_SELF_DELAY - 2, 0, -1, -1000]
for to_self_delay in to_self_delays: for to_self_delay in to_self_delays:
assert Inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_FIELD_TOO_SMALL assert inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_FIELD_TOO_SMALL
# Empty field # Empty field
to_self_delay = None to_self_delay = None
assert Inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_EMPTY_FIELD assert inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_EMPTY_FIELD
# Wrong data type # Wrong data type
to_self_delays = WRONG_TYPES to_self_delays = WRONG_TYPES
for to_self_delay in to_self_delays: for to_self_delay in to_self_delays:
assert Inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_WRONG_FIELD_TYPE assert inspector.check_to_self_delay(to_self_delay)[0] == APPOINTMENT_WRONG_FIELD_TYPE
def test_check_blob(): def test_check_blob():

View File

@@ -0,0 +1,52 @@
import importlib
import os
import pytest
from pathlib import Path
from shutil import copyfile
from pisa.pisad import load_config
test_conf_file_path = os.getcwd() + "/test/pisa/unit/test_conf.py"
def test_load_config():
    """load_config must return a dict when fed a well-formed configuration module.

    The original version left the copied config file behind whenever
    load_config raised or the assertion failed, polluting later tests and
    reruns; the cleanup now runs unconditionally. A reload is added so the
    test does not depend on which sibling test imported the module first.
    """
    # Copy the sample-conf.py file to use as a test config file.
    copyfile(os.getcwd() + "/pisa/sample_conf.py", test_conf_file_path)
    import test.pisa.unit.test_conf as conf

    # Reload in case a sibling test already imported the module with different contents.
    importlib.reload(conf)

    try:
        # If the file has all the correct fields and data, it should return a dict.
        conf_dict = load_config(conf)
        assert isinstance(conf_dict, dict)
    finally:
        # Delete the file even if load_config raises, so state never leaks between tests.
        os.remove(test_conf_file_path)
def test_bad_load_config():
    """load_config must raise when config fields hold wrongly-typed values."""
    # Create a messed up version of the file that should throw an error.
    with open(test_conf_file_path, "w") as f:
        f.write('# bitcoind\nBTC_RPC_USER = 0000\nBTC_RPC_PASSWD = "password"\nBTC_RPC_HOST = 000')

    import test.pisa.unit.test_conf as conf

    importlib.reload(conf)

    try:
        with pytest.raises(Exception):
            load_config(conf)
    finally:
        # Remove the file even if load_config unexpectedly succeeds, so no state leaks.
        os.remove(test_conf_file_path)
def test_empty_load_config():
    """load_config must raise on an empty configuration module."""
    # Create an empty version of the file that should throw an error.
    # The original left the file handle from open(...) dangling; close it immediately.
    open(test_conf_file_path, "a").close()
    import test.pisa.unit.test_conf as conf

    importlib.reload(conf)

    try:
        with pytest.raises(Exception):
            load_config(conf)
    finally:
        # Remove the file even if load_config unexpectedly succeeds, so no state leaks.
        os.remove(test_conf_file_path)

View File

@@ -5,15 +5,14 @@ from uuid import uuid4
from shutil import rmtree from shutil import rmtree
from copy import deepcopy from copy import deepcopy
from threading import Thread from threading import Thread
from queue import Queue, Empty
from pisa.db_manager import DBManager from pisa.db_manager import DBManager
from pisa.responder import Responder, TransactionTracker from pisa.responder import Responder, TransactionTracker
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
from pisa.chain_monitor import ChainMonitor
from pisa.tools import bitcoin_cli from pisa.tools import bitcoin_cli
from common.constants import LOCATOR_LEN_HEX from common.constants import LOCATOR_LEN_HEX
from common.tools import check_sha256_hex_format
from bitcoind_mock.utils import sha256d from bitcoind_mock.utils import sha256d
from bitcoind_mock.transaction import TX from bitcoind_mock.transaction import TX
@@ -21,8 +20,11 @@ from test.pisa.unit.conftest import generate_block, generate_blocks, get_random_
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def responder(db_manager): def responder(db_manager, chain_monitor):
return Responder(db_manager) responder = Responder(db_manager, chain_monitor)
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
return responder
@pytest.fixture() @pytest.fixture()
@@ -145,17 +147,19 @@ def test_tracker_from_dict_invalid_data():
def test_init_responder(responder): def test_init_responder(responder):
assert type(responder.trackers) is dict and len(responder.trackers) == 0 assert isinstance(responder.trackers, dict) and len(responder.trackers) == 0
assert type(responder.tx_tracker_map) is dict and len(responder.tx_tracker_map) == 0 assert isinstance(responder.tx_tracker_map, dict) and len(responder.tx_tracker_map) == 0
assert type(responder.unconfirmed_txs) is list and len(responder.unconfirmed_txs) == 0 assert isinstance(responder.unconfirmed_txs, list) and len(responder.unconfirmed_txs) == 0
assert type(responder.missed_confirmations) is dict and len(responder.missed_confirmations) == 0 assert isinstance(responder.missed_confirmations, dict) and len(responder.missed_confirmations) == 0
assert isinstance(responder.chain_monitor, ChainMonitor)
assert responder.block_queue.empty() assert responder.block_queue.empty()
assert responder.asleep is True assert responder.asleep is True
assert responder.zmq_subscriber is None
def test_handle_breach(db_manager): def test_handle_breach(db_manager, chain_monitor):
responder = Responder(db_manager) responder = Responder(db_manager, chain_monitor)
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
uuid = uuid4().hex uuid = uuid4().hex
tracker = create_dummy_tracker() tracker = create_dummy_tracker()
@@ -172,11 +176,10 @@ def test_handle_breach(db_manager):
assert receipt.delivered is True assert receipt.delivered is True
# The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes now. # The responder automatically fires add_tracker on adding a tracker if it is asleep. We need to stop the processes
# To do so we delete all the trackers, stop the zmq and create a new fake block to unblock the queue.get method # now. To do so we delete all the trackers, and generate a new block.
responder.trackers = dict() responder.trackers = dict()
responder.zmq_subscriber.terminate = True generate_block()
responder.block_queue.put(get_random_value_hex(32))
def test_add_bad_response(responder): def test_add_bad_response(responder):
@@ -184,7 +187,7 @@ def test_add_bad_response(responder):
tracker = create_dummy_tracker() tracker = create_dummy_tracker()
# Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting # Now that the asleep / awake functionality has been tested we can avoid manually killing the responder by setting
# to awake. That will prevent the zmq thread to be launched again. # to awake. That will prevent the chain_monitor thread to be launched again.
responder.asleep = False responder.asleep = False
# A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though. # A txid instead of a rawtx should be enough for unit tests using the bitcoind mock, better tests are needed though.
@@ -205,7 +208,7 @@ def test_add_bad_response(responder):
def test_add_tracker(responder): def test_add_tracker(responder):
responder.asleep = False # Responder is asleep
for _ in range(20): for _ in range(20):
uuid = uuid4().hex uuid = uuid4().hex
@@ -235,7 +238,8 @@ def test_add_tracker(responder):
def test_add_tracker_same_penalty_txid(responder): def test_add_tracker_same_penalty_txid(responder):
# Create the same tracker using two different uuids # Responder is asleep
confirmations = 0 confirmations = 0
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True) locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end = create_dummy_tracker_data(random_txid=True)
uuid_1 = uuid4().hex uuid_1 = uuid4().hex
@@ -260,7 +264,7 @@ def test_add_tracker_same_penalty_txid(responder):
def test_add_tracker_already_confirmed(responder): def test_add_tracker_already_confirmed(responder):
responder.asleep = False # Responder is asleep
for i in range(20): for i in range(20):
uuid = uuid4().hex uuid = uuid4().hex
@@ -274,29 +278,10 @@ def test_add_tracker_already_confirmed(responder):
assert penalty_txid not in responder.unconfirmed_txs assert penalty_txid not in responder.unconfirmed_txs
def test_do_subscribe(responder): def test_do_watch(temp_db_manager, chain_monitor):
responder.block_queue = Queue() # Create a fresh responder to simplify the test
responder = Responder(temp_db_manager, chain_monitor)
zmq_thread = Thread(target=responder.do_subscribe) chain_monitor.attach_responder(responder.block_queue, False)
zmq_thread.daemon = True
zmq_thread.start()
try:
generate_block()
block_hash = responder.block_queue.get()
assert check_sha256_hex_format(block_hash)
except Empty:
assert False
def test_do_watch(temp_db_manager):
responder = Responder(temp_db_manager)
responder.block_queue = Queue()
zmq_thread = Thread(target=responder.do_subscribe)
zmq_thread.daemon = True
zmq_thread.start()
trackers = [create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(20)] trackers = [create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(20)]
@@ -318,9 +303,7 @@ def test_do_watch(temp_db_manager):
responder.db_manager.store_responder_tracker(uuid, tracker.to_json()) responder.db_manager.store_responder_tracker(uuid, tracker.to_json())
# Let's start to watch # Let's start to watch
watch_thread = Thread(target=responder.do_watch) Thread(target=responder.do_watch, daemon=True).start()
watch_thread.daemon = True
watch_thread.start()
# And broadcast some of the transactions # And broadcast some of the transactions
broadcast_txs = [] broadcast_txs = []
@@ -354,13 +337,9 @@ def test_do_watch(temp_db_manager):
assert responder.asleep is True assert responder.asleep is True
def test_check_confirmations(temp_db_manager): def test_check_confirmations(temp_db_manager, chain_monitor):
responder = Responder(temp_db_manager) responder = Responder(temp_db_manager, chain_monitor)
responder.block_queue = Queue() chain_monitor.attach_responder(responder.block_queue, responder.asleep)
zmq_thread = Thread(target=responder.do_subscribe)
zmq_thread.daemon = True
zmq_thread.start()
# check_confirmations checks, given a list of transaction for a block, what of the known penalty transaction have # check_confirmations checks, given a list of transaction for a block, what of the known penalty transaction have
# been confirmed. To test this we need to create a list of transactions and the state of the responder # been confirmed. To test this we need to create a list of transactions and the state of the responder
@@ -390,7 +369,7 @@ def test_check_confirmations(temp_db_manager):
assert responder.missed_confirmations[tx] == 1 assert responder.missed_confirmations[tx] == 1
# WIP: Check this properly, a bug pass unnoticed! # TODO: Check this properly, a bug pass unnoticed!
def test_get_txs_to_rebroadcast(responder): def test_get_txs_to_rebroadcast(responder):
# Let's create a few fake txids and assign at least 6 missing confirmations to each # Let's create a few fake txids and assign at least 6 missing confirmations to each
txs_missing_too_many_conf = {get_random_value_hex(32): 6 + i for i in range(10)} txs_missing_too_many_conf = {get_random_value_hex(32): 6 + i for i in range(10)}
@@ -414,13 +393,13 @@ def test_get_txs_to_rebroadcast(responder):
assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys()) assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys())
def test_get_completed_trackers(db_manager): def test_get_completed_trackers(db_manager, chain_monitor):
initial_height = bitcoin_cli().getblockcount() initial_height = bitcoin_cli().getblockcount()
# Let's use a fresh responder for this to make it easier to compare the results responder = Responder(db_manager, chain_monitor)
responder = Responder(db_manager) chain_monitor.attach_responder(responder.block_queue, responder.asleep)
# A complete tracker is a tracker that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS) # A complete tracker is a tracker that has reached the appointment end with enough confs (> MIN_CONFIRMATIONS)
# We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached # We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached
trackers_end_conf = { trackers_end_conf = {
uuid4().hex: create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(10) uuid4().hex: create_dummy_tracker(penalty_rawtx=TX.create_dummy_transaction()) for _ in range(10)
@@ -473,9 +452,10 @@ def test_get_completed_trackers(db_manager):
assert set(completed_trackers_ids) == set(ended_trackers_keys) assert set(completed_trackers_ids) == set(ended_trackers_keys)
def test_rebroadcast(db_manager): def test_rebroadcast(db_manager, chain_monitor):
responder = Responder(db_manager) responder = Responder(db_manager, chain_monitor)
responder.asleep = False responder.asleep = False
chain_monitor.attach_responder(responder.block_queue, responder.asleep)
txs_to_rebroadcast = [] txs_to_rebroadcast = []

View File

@@ -1,22 +1,24 @@
import pytest import pytest
from uuid import uuid4 from uuid import uuid4
from threading import Thread from threading import Thread
from queue import Queue, Empty from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.responder import Responder from pisa.responder import Responder
from pisa.tools import bitcoin_cli from pisa.tools import bitcoin_cli
from pisa.chain_monitor import ChainMonitor
from test.pisa.unit.conftest import ( from test.pisa.unit.conftest import (
generate_block,
generate_blocks, generate_blocks,
generate_dummy_appointment, generate_dummy_appointment,
get_random_value_hex, get_random_value_hex,
generate_keypair, generate_keypair,
get_config,
) )
from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS
from common.tools import check_sha256_hex_format from common.tools import compute_locator
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
@@ -35,8 +37,12 @@ sk_der = signing_key.private_bytes(
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def watcher(db_manager): def watcher(db_manager, chain_monitor):
return Watcher(db_manager, sk_der) watcher = Watcher(db_manager, chain_monitor, sk_der, get_config())
chain_monitor.attach_watcher(watcher.block_queue, watcher.asleep)
chain_monitor.attach_responder(watcher.responder.block_queue, watcher.responder.asleep)
return watcher
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@@ -46,7 +52,7 @@ def txids():
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def locator_uuid_map(txids): def locator_uuid_map(txids):
return {Watcher.compute_locator(txid): uuid4().hex for txid in txids} return {compute_locator(txid): uuid4().hex for txid in txids}
def create_appointments(n): def create_appointments(n):
@@ -67,17 +73,18 @@ def create_appointments(n):
return appointments, locator_uuid_map, dispute_txs return appointments, locator_uuid_map, dispute_txs
def test_init(watcher): def test_init(run_bitcoind, watcher):
assert type(watcher.appointments) is dict and len(watcher.appointments) == 0 assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0
assert type(watcher.locator_uuid_map) is dict and len(watcher.locator_uuid_map) == 0 assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0
assert watcher.block_queue.empty()
assert watcher.asleep is True assert watcher.asleep is True
assert watcher.max_appointments == MAX_APPOINTMENTS assert watcher.block_queue.empty()
assert watcher.zmq_subscriber is None assert isinstance(watcher.chain_monitor, ChainMonitor)
assert type(watcher.responder) is Responder assert isinstance(watcher.config, dict)
assert isinstance(watcher.signing_key, ec.EllipticCurvePrivateKey)
assert isinstance(watcher.responder, Responder)
def test_add_appointment(run_bitcoind, watcher): def test_add_appointment(watcher):
# The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state) # The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state)
# Avoid this by setting the state to awake. # Avoid this by setting the state to awake.
watcher.asleep = False watcher.asleep = False
@@ -121,25 +128,10 @@ def test_add_too_many_appointments(watcher):
assert sig is None assert sig is None
def test_do_subscribe(watcher):
watcher.block_queue = Queue()
zmq_thread = Thread(target=watcher.do_subscribe)
zmq_thread.daemon = True
zmq_thread.start()
try:
generate_block()
block_hash = watcher.block_queue.get()
assert check_sha256_hex_format(block_hash)
except Empty:
assert False
def test_do_watch(watcher): def test_do_watch(watcher):
# We will wipe all the previous data and add 5 appointments # We will wipe all the previous data and add 5 appointments
appointments, locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS) appointments, locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS)
watcher.chain_monitor.watcher_asleep = False
# Set the data into the Watcher and in the db # Set the data into the Watcher and in the db
watcher.locator_uuid_map = locator_uuid_map watcher.locator_uuid_map = locator_uuid_map
@@ -150,16 +142,13 @@ def test_do_watch(watcher):
watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json()) watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json())
watcher.db_manager.store_update_locator_map(appointment.locator, uuid) watcher.db_manager.store_update_locator_map(appointment.locator, uuid)
watch_thread = Thread(target=watcher.do_watch) Thread(target=watcher.do_watch, daemon=True).start()
watch_thread.daemon = True
watch_thread.start()
# Broadcast the first two # Broadcast the first two
for dispute_tx in dispute_txs[:2]: for dispute_tx in dispute_txs[:2]:
bitcoin_cli().sendrawtransaction(dispute_tx) bitcoin_cli().sendrawtransaction(dispute_tx)
# After leaving some time for the block to be mined and processed, the number of appointments should have reduced # After generating enough blocks, the number of appointments should have reduced by two
# by two
generate_blocks(START_TIME_OFFSET + END_TIME_OFFSET) generate_blocks(START_TIME_OFFSET + END_TIME_OFFSET)
assert len(watcher.appointments) == APPOINTMENTS - 2 assert len(watcher.appointments) == APPOINTMENTS - 2
@@ -230,7 +219,7 @@ def test_filter_valid_breaches(watcher):
dummy_appointment, _ = generate_dummy_appointment() dummy_appointment, _ = generate_dummy_appointment()
dummy_appointment.encrypted_blob.data = encrypted_blob dummy_appointment.encrypted_blob.data = encrypted_blob
dummy_appointment.locator = Watcher.compute_locator(dispute_txid) dummy_appointment.locator = compute_locator(dispute_txid)
uuid = uuid4().hex uuid = uuid4().hex
appointments = {uuid: dummy_appointment} appointments = {uuid: dummy_appointment}