From 40d7ca1912e06dc9eae12ccace43db538add3f4c Mon Sep 17 00:00:00 2001 From: Turtle Date: Sat, 30 Nov 2019 00:42:36 -0500 Subject: [PATCH 1/4] Refactor add_appointment cli code --- apps/cli/pisa_cli.py | 228 ++++++++++++++++++++------------- common/tools.py | 13 ++ pisa/watcher.py | 18 +-- test/pisa/unit/conftest.py | 3 +- test/pisa/unit/test_watcher.py | 6 +- 5 files changed, 162 insertions(+), 106 deletions(-) diff --git a/apps/cli/pisa_cli.py b/apps/cli/pisa_cli.py index 0f2eb92..440860d 100644 --- a/apps/cli/pisa_cli.py +++ b/apps/cli/pisa_cli.py @@ -9,8 +9,8 @@ from getopt import getopt, GetoptError from requests import ConnectTimeout, ConnectionError from uuid import uuid4 -from apps.cli.blob import Blob from apps.cli.help import help_add_appointment, help_get_appointment +from apps.cli.blob import Blob from apps.cli import ( DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT, @@ -22,9 +22,8 @@ from apps.cli import ( from common.logger import Logger from common.appointment import Appointment -from common.constants import LOCATOR_LEN_HEX from common.cryptographer import Cryptographer -from common.tools import check_sha256_hex_format +from common.tools import check_sha256_hex_format, compute_locator HTTP_OK = 200 @@ -46,11 +45,13 @@ def generate_dummy_appointment(): "to_self_delay": 20, } - print("Generating dummy appointment data:" "\n\n" + json.dumps(dummy_appointment_data, indent=4, sort_keys=True)) + logger.info( + "Generating dummy appointment data:" "\n\n" + json.dumps(dummy_appointment_data, indent=4, sort_keys=True) + ) json.dump(dummy_appointment_data, open("dummy_appointment_data.json", "w")) - print("\nData stored in dummy_appointment_data.json") + logger.info("\nData stored in dummy_appointment_data.json") # Loads and returns Pisa keys from disk @@ -61,11 +62,12 @@ def load_key_file_data(file_name): return key except FileNotFoundError: - raise FileNotFoundError("File not found.") + logger.error("Client's key file not found. Please check your settings.") + return False - -def compute_locator(tx_id): - return tx_id[:LOCATOR_LEN_HEX] + except IOError as e: + logger.error("I/O error({}): {}".format(e.errno, e.strerror)) + return False # Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it. @@ -85,12 +87,81 @@ def save_signed_appointment(appointment, signature): def add_appointment(args): - appointment_data = None + # Get appointment data from user. + appointment_data = parse_add_appointment_args(args) + + if appointment_data is None: + logger.error("The provided appointment JSON is empty") + return False + + valid_txid = check_sha256_hex_format(appointment_data.get("tx_id")) + + if not valid_txid: + logger.error("The provided txid is not valid") + return False + + tx_id = appointment_data.get("tx_id") + tx = appointment_data.get("tx") + + if None not in [tx_id, tx]: + appointment_data["locator"] = compute_locator(tx_id) + appointment_data["encrypted_blob"] = Cryptographer.encrypt(Blob(tx), tx_id) + + else: + logger.error("Appointment data is missing some fields.") + return False + + appointment = Appointment.from_dict(appointment_data) + + signature = get_appointment_signature(appointment) + hex_pk_der = get_pk() + + if not (appointment and signature and hex_pk_der): + return False + + data = {"appointment": appointment.to_dict(), "signature": signature, "public_key": hex_pk_der.decode("utf-8")} + + appointment_json = json.dumps(data, sort_keys=True, separators=(",", ":")) + + # Send appointment to the server. + add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port) + response_json = post_data_to_add_appointment_endpoint(add_appointment_endpoint, appointment_json) + + if response_json is None: + return False + + signature = response_json.get("signature") + # Check that the server signed the appointment as it should. + if signature is None: + logger.error("The response does not contain the signature of the appointment.") + return False + + valid = check_signature(signature, appointment) + + if not valid: + logger.error("The returned appointment's signature is invalid") + return False + + logger.info("Appointment accepted and signed by Pisa") + # all good, store appointment and signature + try: + save_signed_appointment(appointment.to_dict(), signature) + + except OSError as e: + logger.error("There was an error while saving the appointment", error=e) + return False + + return True + + +# Parse arguments passed to add_appointment and handle them accordingly. +# Returns appointment data. +def parse_add_appointment_args(args): use_help = "Use 'help add_appointment' for help of how to use the command" if not args: logger.error("No appointment data provided. " + use_help) - return False + return None arg_opt = args.pop(0) @@ -102,7 +173,7 @@ def add_appointment(args): fin = args.pop(0) if not os.path.isfile(fin): logger.error("Can't find file", filename=fin) - return False + return None try: with open(fin) as f: @@ -110,63 +181,19 @@ def add_appointment(args): except IOError as e: logger.error("I/O error", errno=e.errno, error=e.strerror) - return False + return None else: appointment_data = json.loads(arg_opt) except json.JSONDecodeError: logger.error("Non-JSON encoded data provided as appointment. " + use_help) - return False + return None - if not appointment_data: - logger.error("The provided JSON is empty") - return False + return appointment_data - valid_locator = check_sha256_hex_format(appointment_data.get("tx_id")) - - if not valid_locator: - logger.error("The provided locator is not valid") - return False - - add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port) - appointment = Appointment.from_dict(appointment_data) - - try: - sk_der = load_key_file_data(CLI_PRIVATE_KEY) - cli_sk = Cryptographer.load_private_key_der(sk_der) - - except ValueError: - logger.error("Failed to deserialize the public key. It might be in an unsupported format") - return False - - except FileNotFoundError: - logger.error("Client's private key file not found. Please check your settings") - return False - - except IOError as e: - logger.error("I/O error", errno=e.errno, error=e.strerror) - return False - - signature = Cryptographer.sign(appointment.serialize(), cli_sk) - - try: - cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY) - hex_pk_der = binascii.hexlify(cli_pk_der) - - except FileNotFoundError: - logger.error("Client's public key file not found. Please check your settings") - return False - - except IOError as e: - logger.error("I/O error", errno=e.errno, error=e.strerror) - return False - - # FIXME: Exceptions for hexlify need to be covered - - data = {"appointment": appointment, "signature": signature, "public_key": hex_pk_der.decode("utf-8")} - - appointment_json = json.dumps(data, sort_keys=True, separators=(",", ":")) +# Sends appointment data to add_appointment endpoint to be processed by the server. +def post_data_to_add_appointment_endpoint(add_appointment_endpoint, appointment_json): logger.info("Sending appointment to PISA") try: @@ -176,15 +203,15 @@ def add_appointment(args): except json.JSONDecodeError: logger.error("The response was not valid JSON") - return False + return None except ConnectTimeout: logger.error("Can't connect to pisa API. Connection timeout") - return False + return None except ConnectionError: logger.error("Can't connect to pisa API. Server cannot be reached") - return False + return None if r.status_code != HTTP_OK: if "error" not in response_json: @@ -196,14 +223,17 @@ def add_appointment(args): status_code=r.status_code, description=error, ) - return False + return None if "signature" not in response_json: logger.error("The response does not contain the signature of the appointment") - return False + return None - signature = response_json["signature"] - # verify that the returned signature is valid + return response_json + + +# Verify that the signature returned from the watchtower is valid. +def check_signature(signature, appointment): try: pisa_pk_der = load_key_file_data(PISA_PUBLIC_KEY) pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der) @@ -212,7 +242,7 @@ def add_appointment(args): logger.error("Failed to deserialize the public key. It might be in an unsupported format") return False - is_sig_valid = Cryptographer.verify(appointment.serialize(), signature, pisa_pk) + return Cryptographer.verify(appointment.serialize(), signature, pisa_pk) except FileNotFoundError: logger.error("Pisa's public key file not found. Please check your settings") @@ -222,21 +252,6 @@ def add_appointment(args): logger.error("I/O error", errno=e.errno, error=e.strerror) return False - if not is_sig_valid: - logger.error("The returned appointment's signature is invalid") - return False - - logger.info("Appointment accepted and signed by Pisa") - # all good, store appointment and signature - try: - save_signed_appointment(appointment, signature) - - except OSError as e: - logger.error("There was an error while saving the appointment", error=e) - return False - - return True - def get_appointment(args): if not args: @@ -260,8 +275,9 @@ def get_appointment(args): try: r = requests.get(url=get_appointment_endpoint + parameters, timeout=5) + logger.info("Appointment response returned from server: " + str(r)) + return True - print(json.dumps(r.json(), indent=4, sort_keys=True)) except ConnectTimeout: logger.error("Can't connect to pisa API. Connection timeout") return False @@ -270,7 +286,47 @@ def get_appointment(args): logger.error("Can't connect to pisa API. Server cannot be reached") return False - return True + +def get_appointment_signature(appointment): + try: + sk_der = load_key_file_data(CLI_PRIVATE_KEY) + cli_sk = Cryptographer.load_private_key_der(sk_der) + + signature = Cryptographer.sign(appointment.serialize(), cli_sk) + + return signature + + except ValueError: + logger.error("Failed to deserialize the public key. It might be in an unsupported format") + return False + + except FileNotFoundError: + logger.error("Client's private key file not found. Please check your settings") + return False + + except IOError as e: + logger.error("I/O error", errno=e.errno, error=e.strerror) + return False + + +def get_pk(): + try: + cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY) + hex_pk_der = binascii.hexlify(cli_pk_der) + + return hex_pk_der + + except FileNotFoundError: + logger.error("Client's public key file not found. Please check your settings") + return False + + except IOError as e: + logger.error("I/O error", errno=e.errno, error=e.strerror) + return False + + except binascii.Error as e: + logger.error("Could not successfully encode public key as hex: ", e) + return False def show_usage(): diff --git a/common/tools.py b/common/tools.py index e527e33..2ac6d1e 100644 --- a/common/tools.py +++ b/common/tools.py @@ -1,4 +1,5 @@ import re +from common.constants import LOCATOR_LEN_HEX def check_sha256_hex_format(value): @@ -12,3 +13,15 @@ def check_sha256_hex_format(value): :mod:`bool`: Whether or not the value matches the format. """ return isinstance(value, str) and re.match(r"^[0-9A-Fa-f]{64}$", value) is not None + + +def compute_locator(tx_id): + """ + Computes an appointment locator given a transaction id. + Args: + tx_id (:obj:`str`): the transaction id used to compute the locator. + Returns: + (:obj:`str`): The computed locator. + """ + + return tx_id[:LOCATOR_LEN_HEX] diff --git a/pisa/watcher.py b/pisa/watcher.py index 9d659db..3d46032 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -3,7 +3,7 @@ from queue import Queue from threading import Thread from common.cryptographer import Cryptographer -from common.constants import LOCATOR_LEN_HEX +from common.tools import compute_locator from common.logger import Logger from pisa.cleaner import Cleaner @@ -71,20 +71,6 @@ class Watcher: if not isinstance(responder, Responder): self.responder = Responder(db_manager) - @staticmethod - def compute_locator(tx_id): - """ - Computes an appointment locator given a transaction id. - - Args: - tx_id (:obj:`str`): the transaction id used to compute the locator. - - Returns: - (:obj:`str`): The computed locator. - """ - - return tx_id[:LOCATOR_LEN_HEX] - def add_appointment(self, appointment): """ Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. @@ -238,7 +224,7 @@ class Watcher: found. """ - potential_locators = {Watcher.compute_locator(txid): txid for txid in txids} + potential_locators = {compute_locator(txid): txid for txid in txids} # Check is any of the tx_ids in the received block is an actual match intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) diff --git a/test/pisa/unit/conftest.py b/test/pisa/unit/conftest.py index 4ff9028..cc215c5 100644 --- a/test/pisa/unit/conftest.py +++ b/test/pisa/unit/conftest.py @@ -16,6 +16,7 @@ from pisa.watcher import Watcher from pisa.tools import bitcoin_cli from pisa.db_manager import DBManager from common.appointment import Appointment +from common.tools import compute_locator from bitcoind_mock.utils import sha256d from bitcoind_mock.transaction import TX @@ -103,7 +104,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) - locator = Watcher.compute_locator(dispute_txid) + locator = compute_locator(dispute_txid) blob = Blob(dummy_appointment_data.get("tx")) encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id")) diff --git a/test/pisa/unit/test_watcher.py b/test/pisa/unit/test_watcher.py index 8ec331c..4fa2252 100644 --- a/test/pisa/unit/test_watcher.py +++ b/test/pisa/unit/test_watcher.py @@ -16,7 +16,7 @@ from test.pisa.unit.conftest import ( ) from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS -from common.tools import check_sha256_hex_format +from common.tools import check_sha256_hex_format, compute_locator from common.cryptographer import Cryptographer @@ -46,7 +46,7 @@ def txids(): @pytest.fixture(scope="module") def locator_uuid_map(txids): - return {Watcher.compute_locator(txid): uuid4().hex for txid in txids} + return {compute_locator(txid): uuid4().hex for txid in txids} def create_appointments(n): @@ -219,7 +219,7 @@ def test_filter_valid_breaches(watcher): dummy_appointment, _ = generate_dummy_appointment() dummy_appointment.encrypted_blob.data = encrypted_blob - dummy_appointment.locator = Watcher.compute_locator(dispute_txid) + dummy_appointment.locator = compute_locator(dispute_txid) uuid = uuid4().hex appointments = {uuid: dummy_appointment} From 39208e4b144ba70c519347da698afd619889a021 Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 23 Dec 2019 16:19:01 -0500 Subject: [PATCH 2/4] Test new functions split off in pisa_cli --- test/apps/cli/unit/test_pisa_cli.py | 160 ++++++++++++++++++++++++---- 1 file changed, 142 insertions(+), 18 deletions(-) diff --git a/test/apps/cli/unit/test_pisa_cli.py b/test/apps/cli/unit/test_pisa_cli.py index e14fd0b..4f47f98 100644 --- a/test/apps/cli/unit/test_pisa_cli.py +++ b/test/apps/cli/unit/test_pisa_cli.py @@ -1,23 +1,41 @@ import responses import json +import os +import pytest from binascii import hexlify from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec +from common.appointment import Appointment +from common.cryptographer import Cryptographer + import apps.cli.pisa_cli as pisa_cli from test.apps.cli.unit.conftest import get_random_value_hex -# TODO: should find a way of doing without this -from apps.cli.pisa_cli import build_appointment - # dummy keys for the tests pisa_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) pisa_pk = pisa_sk.public_key() other_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) +pisa_sk_der = pisa_sk.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), +) +pisa_pk_der = pisa_pk.public_bytes( + encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo +) + +other_sk_der = other_sk.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), +) + + # Replace the key in the module with a key we control for the tests pisa_cli.pisa_public_key = pisa_pk # Replace endpoint with dummy one @@ -31,19 +49,49 @@ dummy_appointment_request = { "start_time": 1500, "end_time": 50000, "to_self_delay": 200, + "encrypted_blob": get_random_value_hex(120), } -dummy_appointment = build_appointment(**dummy_appointment_request) -# FIXME: USE CRYPTOGRAPHER +# This is the format appointment turns into once it hits "add_appointment" +dummy_appointment_full = { + "locator": get_random_value_hex(32), + "start_time": 1500, + "end_time": 50000, + "to_self_delay": 200, + "encrypted_blob": get_random_value_hex(120), +} + +dummy_appointment = Appointment.from_dict(dummy_appointment_full) -def sign_appointment(sk, appointment): - data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") - return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8") +def get_dummy_pisa_sk_der(*args): + return pisa_sk_der -def get_dummy_pisa_pk(der_data): - return pisa_pk +def get_dummy_pisa_pk_der(*args): + return pisa_pk_der + + +def get_dummy_hex_pk_der(*args): + return hexlify(get_dummy_pisa_pk_der(None)) + + +def get_dummy_signature(*args): + sk = Cryptographer.load_private_key_der(pisa_sk_der) + return Cryptographer.sign(dummy_appointment.serialize(), sk) + + +def get_bad_signature(*args): + sk = Cryptographer.load_private_key_der(other_sk_der) + return Cryptographer.sign(dummy_appointment.serialize(), sk) + + +def valid_sig(*args): + return True + + +def invalid_sig(*args): + return False @responses.activate @@ -51,10 +99,12 @@ def test_add_appointment(monkeypatch): # Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested # and the return value is True - # make sure the test uses the right dummy key instead of loading it from disk - monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk) + # Make sure the test uses the dummy signature + monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_dummy_signature) + monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der) + monkeypatch.setattr(pisa_cli, "check_signature", valid_sig) - response = {"locator": dummy_appointment["locator"], "signature": sign_appointment(pisa_sk, dummy_appointment)} + response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature(None)} request_url = "http://{}/".format(pisa_endpoint) responses.add(responses.POST, request_url, json=response, status=200) @@ -72,12 +122,14 @@ def test_add_appointment_with_invalid_signature(monkeypatch): # Simulate a request to add_appointment for dummy_appointment, but sign with a different key, # make sure that the right endpoint is requested, but the return value is False - # make sure the test uses the right dummy key instead of loading it from disk - monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk) + # Make sure the test uses the bad dummy signature + monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_bad_signature) + monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der) + monkeypatch.setattr(pisa_cli, "check_signature", invalid_sig) response = { - "locator": dummy_appointment["locator"], - "signature": sign_appointment(other_sk, dummy_appointment), # signing with a different key + "locator": dummy_appointment.to_dict()["locator"], + "signature": get_bad_signature(None), # Sign with a bad key } request_url = "http://{}/".format(pisa_endpoint) @@ -86,3 +138,75 @@ def test_add_appointment_with_invalid_signature(monkeypatch): result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) assert not result + + +def test_parse_add_appointment_args(): + # If no args are passed, function should fail. + appt_data = pisa_cli.parse_add_appointment_args(None) + assert not appt_data + + # If file doesn't exist, function should fail. + appt_data = pisa_cli.parse_add_appointment_args(["-f", "nonexistent_file"]) + assert not appt_data + + # If file exists and has data in it, function should work. + with open("appt_test_file", "w") as f: + json.dump(dummy_appointment_request, f) + + appt_data = pisa_cli.parse_add_appointment_args(["-f", "appt_test_file"]) + assert appt_data + + os.remove("appt_test_file") + + # If appointment json is passed in, funcion should work. + appt_data = pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)]) + assert appt_data + + +@responses.activate +def test_post_data_to_add_appointment_endpoint(): + response = { + "locator": dummy_appointment.to_dict()["locator"], + "signature": Cryptographer.sign(dummy_appointment.serialize(), pisa_sk), + } + + request_url = "http://{}/".format(pisa_endpoint) + responses.add(responses.POST, request_url, json=response, status=200) + + response = pisa_cli.post_data_to_add_appointment_endpoint(request_url, json.dumps(dummy_appointment_request)) + + assert len(responses.calls) == 1 + assert responses.calls[0].request.url == request_url + + assert response + + +def test_check_signature(monkeypatch): + # Make sure the test uses the right dummy key instead of loading it from disk + monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der) + + valid = pisa_cli.check_signature(get_dummy_signature(), dummy_appointment) + + assert valid + + valid = pisa_cli.check_signature(get_bad_signature(), dummy_appointment) + + assert not valid + + +def test_get_appointment_signature(monkeypatch): + # Make sure the test uses the right dummy key instead of loading it from disk + monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der) + + signature = pisa_cli.get_appointment_signature(dummy_appointment) + + assert isinstance(signature, str) + + +def test_get_pk(monkeypatch): + # Make sure the test uses the right dummy key instead of loading it from disk + monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der) + + pk = pisa_cli.get_pk() + + assert isinstance(pk, bytes) From 3775b78500f2427fd2e356c8e4a862fc5d395cec Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 26 Dec 2019 21:54:55 -0500 Subject: [PATCH 3/4] Fix filename typo in cli README --- apps/cli/README.md | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/cli/README.md b/apps/cli/README.md index a7b9393..9002c93 100644 --- a/apps/cli/README.md +++ b/apps/cli/README.md @@ -1,6 +1,6 @@ -# pisa-cli +# pisa_cli -`pisa-cli` is a command line interface to interact with the PISA server, written in Python3. +`pisa_cli` is a command line interface to interact with the PISA server, written in Python3. ## Dependencies Refer to [DEPENDENCIES.md](DEPENDENCIES.md) @@ -11,7 +11,7 @@ Refer to [INSTALL.md](INSTALL.md) ## Usage - python pisa-cli.py [global options] command [command options] [arguments] + python pisa_cli.py [global options] command [command options] [arguments] #### Global options @@ -54,7 +54,7 @@ The API will return a `text/plain` HTTP response code `200/OK` if the appointmen #### Usage - python pisa-cli add_appointment [command options] / + python pisa_cli add_appointment [command options] / if `-f, --file` **is** specified, then the command expects a path to a json file instead of a json encoded string as parameter. @@ -100,7 +100,7 @@ if `-f, --file` **is** specified, then the command expects a path to a json file #### Usage - python pisa-cli get_appointment + python pisa_cli get_appointment @@ -109,18 +109,18 @@ if `-f, --file` **is** specified, then the command expects a path to a json file Shows the list of commands or help about how to run a specific command. #### Usage - python pisa-cli help + python pisa_cli help or - python pisa-cli help command + python pisa_cli help command ## Example 1. Generate a new dummy appointment. **Note:** this appointment will never be fulfilled (it will eventually expire) since it does not corresopond to a valid transaction. However it can be used to interact with the PISA API. ``` - python pisa-cli.py generate_dummy_appointment + python pisa_cli.py generate_dummy_appointment ``` That will create a json file that follows the appointment data structure filled with dummy data and store it in `dummy_appointment_data.json`. @@ -128,7 +128,7 @@ or 2. Send the appointment to the PISA API. Which will then start monitoring for matching transactions. ``` - python pisa-cli.py add_appointment -f dummy_appointment_data.json + python pisa_cli.py add_appointment -f dummy_appointment_data.json ``` This returns a appointment locator that can be used to get updates about this appointment from PISA. @@ -136,9 +136,9 @@ or 3. Test that PISA is still watching the appointment by replacing the appointment locator received into the following command: ``` - python pisa-cli.py get_appointment + python pisa_cli.py get_appointment ``` ## PISA API -If you wish to read about the underlying API, and how to write your own tool to interact with it, refer to [PISA-API.md](PISA-API.md) \ No newline at end of file +If you wish to read about the underlying API, and how to write your own tool to interact with it, refer to [PISA-API.md](PISA-API.md) From bd41c8b862e76a3020d7583ea7d8b854cc2d3d90 Mon Sep 17 00:00:00 2001 From: Turtle Date: Fri, 27 Dec 2019 01:29:34 -0500 Subject: [PATCH 4/4] Add unit tests for remaining cli functions --- test/apps/cli/unit/test_pisa_cli.py | 80 ++++++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/test/apps/cli/unit/test_pisa_cli.py b/test/apps/cli/unit/test_pisa_cli.py index 4f47f98..74c6a95 100644 --- a/test/apps/cli/unit/test_pisa_cli.py +++ b/test/apps/cli/unit/test_pisa_cli.py @@ -1,11 +1,11 @@ import responses import json import os -import pytest +import shutil from binascii import hexlify from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec from common.appointment import Appointment @@ -49,7 +49,6 @@ dummy_appointment_request = { "start_time": 1500, "end_time": 50000, "to_self_delay": 200, - "encrypted_blob": get_random_value_hex(120), } # This is the format appointment turns into once it hits "add_appointment" @@ -73,7 +72,7 @@ def get_dummy_pisa_pk_der(*args): def get_dummy_hex_pk_der(*args): - return hexlify(get_dummy_pisa_pk_der(None)) + return hexlify(get_dummy_pisa_pk_der()) def get_dummy_signature(*args): @@ -104,7 +103,7 @@ def test_add_appointment(monkeypatch): monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der) monkeypatch.setattr(pisa_cli, "check_signature", valid_sig) - response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature(None)} + response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature()} request_url = "http://{}/".format(pisa_endpoint) responses.add(responses.POST, request_url, json=response, status=200) @@ -129,7 +128,7 @@ def test_add_appointment_with_invalid_signature(monkeypatch): response = { "locator": dummy_appointment.to_dict()["locator"], - "signature": get_bad_signature(None), # Sign with a bad key + "signature": get_bad_signature(), # Sign with a bad key } request_url = "http://{}/".format(pisa_endpoint) @@ -137,7 +136,44 @@ def test_add_appointment_with_invalid_signature(monkeypatch): result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) - assert not result + assert result is False + + +def test_load_key_file_data(): + # If file exists and has data in it, function should work. + with open("key_test_file", "w+b") as f: + f.write(pisa_sk_der) + + appt_data = pisa_cli.load_key_file_data("key_test_file") + assert appt_data + + os.remove("key_test_file") + + # If file doesn't exist, function should fail. + appt_data = pisa_cli.load_key_file_data("nonexistent_file") + assert not appt_data + + +def test_save_signed_appointment(monkeypatch): + monkeypatch.setattr(pisa_cli, "APPOINTMENTS_FOLDER_NAME", "test_appointments") + + pisa_cli.save_signed_appointment(dummy_appointment.to_dict(), get_dummy_signature()) + + # In folder "Appointments," grab all files and print them. + files = os.listdir("test_appointments") + + found = False + for f in files: + if dummy_appointment.to_dict().get("locator") in f: + found = True + + assert found + + # If "appointments" directory doesn't exist, function should create it. + assert os.path.exists("test_appointments") + + # Delete test directory once we're done. + shutil.rmtree("test_appointments") def test_parse_add_appointment_args(): @@ -158,7 +194,7 @@ def test_parse_add_appointment_args(): os.remove("appt_test_file") - # If appointment json is passed in, funcion should work. + # If appointment json is passed in, function should work. appt_data = pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)]) assert appt_data @@ -194,6 +230,34 @@ def test_check_signature(monkeypatch): assert not valid +@responses.activate +def test_get_appointment(): + # Response of get_appointment endpoint is an appointment with status added to it. + dummy_appointment_full["status"] = "being_watched" + response = dummy_appointment_full + + request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator={}".format(response.get("locator")) + responses.add(responses.GET, request_url, json=response, status=200) + + result = pisa_cli.get_appointment([response.get("locator")]) + + assert len(responses.calls) == 1 + assert responses.calls[0].request.url == request_url + + assert result + + +@responses.activate +def test_get_appointment_err(): + locator = get_random_value_hex(32) + + # Test that get_appointment handles a connection error appropriately. + request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator=".format(locator) + responses.add(responses.GET, request_url, body=ConnectionError()) + + assert not pisa_cli.get_appointment([locator]) + + def test_get_appointment_signature(monkeypatch): # Make sure the test uses the right dummy key instead of loading it from disk monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der)