From ee4269d0471394954b01f85a8223e6fa59e67470 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Sat, 1 Feb 2020 12:28:44 +0100 Subject: [PATCH] Updates cli tests and adds some missing ones --- test/apps/cli/unit/test_pisa_cli.py | 205 ++++++++++++---------------- 1 file changed, 88 insertions(+), 117 deletions(-) diff --git a/test/apps/cli/unit/test_pisa_cli.py b/test/apps/cli/unit/test_pisa_cli.py index 0ce40c6..d972118 100644 --- a/test/apps/cli/unit/test_pisa_cli.py +++ b/test/apps/cli/unit/test_pisa_cli.py @@ -9,36 +9,31 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec +from common.tools import compute_locator from common.appointment import Appointment from common.cryptographer import Cryptographer +from apps.cli.blob import Blob import apps.cli.pisa_cli as pisa_cli from test.apps.cli.unit.conftest import get_random_value_hex # dummy keys for the tests -pisa_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) -pisa_pk = pisa_sk.public_key() +dummy_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) +dummy_pk = dummy_sk.public_key() +another_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) -other_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) - -pisa_sk_der = pisa_sk.private_bytes( +dummy_sk_der = dummy_sk.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) -pisa_pk_der = pisa_pk.public_bytes( +dummy_pk_der = dummy_pk.public_bytes( encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) -other_sk_der = other_sk.private_bytes( - encoding=serialization.Encoding.DER, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), -) - # Replace the key in the module with a key we control for the tests -pisa_cli.pisa_public_key = pisa_pk +pisa_cli.pisa_public_key = dummy_pk # Replace endpoint with dummy one pisa_cli.pisa_api_server = "dummy.com" pisa_cli.pisa_api_port = 12345 @@ -54,22 +49,24 @@ dummy_appointment_request = { # This is the format appointment turns into once it hits "add_appointment" dummy_appointment_full = { - "locator": get_random_value_hex(16), - "start_time": 1500, - "end_time": 50000, - "to_self_delay": 200, - "encrypted_blob": get_random_value_hex(120), + "locator": compute_locator(dummy_appointment_request.get("tx_id")), + "start_time": dummy_appointment_request.get("start_time"), + "end_time": dummy_appointment_request.get("end_time"), + "to_self_delay": dummy_appointment_request.get("to_self_delay"), + "encrypted_blob": Cryptographer.encrypt( + Blob(dummy_appointment_request.get("tx")), dummy_appointment_request.get("tx_id") + ), } dummy_appointment = Appointment.from_dict(dummy_appointment_full) -def get_dummy_pisa_sk_der(*args): - return pisa_sk_der +def load_dummy_keys(*args): + return dummy_pk, dummy_sk, dummy_pk_der def get_dummy_pisa_pk_der(*args): - return pisa_pk_der + return dummy_pk_der def get_dummy_hex_pk_der(*args): @@ -77,42 +74,52 @@ def get_dummy_hex_pk_der(*args): def get_dummy_signature(*args): - sk = Cryptographer.load_private_key_der(pisa_sk_der) - return Cryptographer.sign(dummy_appointment.serialize(), sk) + return Cryptographer.sign(dummy_appointment.serialize(), dummy_sk) def get_bad_signature(*args): - sk = Cryptographer.load_private_key_der(other_sk_der) - return Cryptographer.sign(dummy_appointment.serialize(), sk) + return Cryptographer.sign(dummy_appointment.serialize(), another_sk) -def valid_sig(*args): - return True - - -def invalid_sig(*args): - return False +def test_load_keys(): + # Let's first create a private key and public key files + private_key_file_path = "sk_test_file" + public_key_file_path = "pk_test_file" + with open(private_key_file_path, "wb") as f: + f.write(dummy_sk_der) + with open(public_key_file_path, "wb") as f: + f.write(dummy_pk_der) + + # Now we can test the function passing the using this files (we'll use the same pk for both) + r = pisa_cli.load_keys(public_key_file_path, private_key_file_path, public_key_file_path) + assert isinstance(r, tuple) + assert len(r) == 3 + + # If any param does not match we should get None as result + assert pisa_cli.load_keys(None, private_key_file_path, public_key_file_path) is None + assert pisa_cli.load_keys(public_key_file_path, None, public_key_file_path) is None + assert pisa_cli.load_keys(public_key_file_path, private_key_file_path, None) is None + + # The same should happen if we pass a public key where a private should be, for instance + assert pisa_cli.load_keys(private_key_file_path, public_key_file_path, private_key_file_path) is None + + os.remove(private_key_file_path) + os.remove(public_key_file_path) +# TODO: 90-add-more-add-appointment-tests @responses.activate def test_add_appointment(monkeypatch): # Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested # and the return value is True + monkeypatch.setattr(pisa_cli, "load_keys", load_dummy_keys) - # Make sure the test uses the dummy signature - monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_dummy_signature) - monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der) - monkeypatch.setattr(pisa_cli, "check_signature", valid_sig) - - response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature()} - + response = {"locator": dummy_appointment.locator, "signature": get_dummy_signature()} responses.add(responses.POST, pisa_endpoint, json=response, status=200) - result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) assert len(responses.calls) == 1 assert responses.calls[0].request.url == pisa_endpoint - assert result @@ -122,9 +129,7 @@ def test_add_appointment_with_invalid_signature(monkeypatch): # make sure that the right endpoint is requested, but the return value is False # Make sure the test uses the bad dummy signature - monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_bad_signature) - monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der) - monkeypatch.setattr(pisa_cli, "check_signature", invalid_sig) + monkeypatch.setattr(pisa_cli, "load_keys", load_dummy_keys) response = { "locator": dummy_appointment.to_dict()["locator"], @@ -132,50 +137,11 @@ def test_add_appointment_with_invalid_signature(monkeypatch): } responses.add(responses.POST, pisa_endpoint, json=response, status=200) - result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) assert result is False -def test_load_key_file_data(): - # If file exists and has data in it, function should work. - with open("key_test_file", "w+b") as f: - f.write(pisa_sk_der) - - appt_data = pisa_cli.load_key_file_data("key_test_file") - assert appt_data - - os.remove("key_test_file") - - # If file doesn't exist, function should fail. - with pytest.raises(FileNotFoundError): - assert pisa_cli.load_key_file_data("nonexistent_file") - - -def test_save_signed_appointment(monkeypatch): - appointments_folder = "test_appointments_receipts" - pisa_cli.config["APPOINTMENTS_FOLDER_NAME"] = appointments_folder - - pisa_cli.save_signed_appointment(dummy_appointment.to_dict(), get_dummy_signature()) - - # In folder "Appointments," grab all files and print them. - files = os.listdir(appointments_folder) - - found = False - for f in files: - if dummy_appointment.to_dict().get("locator") in f: - found = True - - assert found - - # If "appointments" directory doesn't exist, function should create it. - assert os.path.exists(appointments_folder) - - # Delete test directory once we're done. - shutil.rmtree(appointments_folder) - - def test_parse_add_appointment_args(): # If no args are passed, function should fail. appt_data = pisa_cli.parse_add_appointment_args(None) @@ -200,33 +166,58 @@ def test_parse_add_appointment_args(): @responses.activate -def test_post_data_to_add_appointment_endpoint(): +def test_post_appointment(): response = { "locator": dummy_appointment.to_dict()["locator"], - "signature": Cryptographer.sign(dummy_appointment.serialize(), pisa_sk), + "signature": Cryptographer.sign(dummy_appointment.serialize(), dummy_pk), } responses.add(responses.POST, pisa_endpoint, json=response, status=200) - - response = pisa_cli.post_data_to_add_appointment_endpoint(json.dumps(dummy_appointment_request)) + response = pisa_cli.post_appointment(json.dumps(dummy_appointment_request)) assert len(responses.calls) == 1 assert responses.calls[0].request.url == pisa_endpoint - assert response -def test_check_signature(monkeypatch): - # Make sure the test uses the right dummy key instead of loading it from disk - monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der) +@responses.activate +def test_process_post_appointment_response(): + # Let's first crete a response + response = { + "locator": dummy_appointment.to_dict()["locator"], + "signature": Cryptographer.sign(dummy_appointment.serialize(), dummy_pk), + } - valid = pisa_cli.check_signature(get_dummy_signature(), dummy_appointment) + # A 200 OK with a correct json response should return the json of the response + responses.add(responses.POST, pisa_endpoint, json=response, status=200) + r = pisa_cli.post_appointment(json.dumps(dummy_appointment_request)) + assert pisa_cli.process_post_appointment_response(r) == r.json() - assert valid + # If we modify the response code tor a rejection (lets say 404) we should get None + responses.replace(responses.POST, pisa_endpoint, json=response, status=404) + r = pisa_cli.post_appointment(json.dumps(dummy_appointment_request)) + assert pisa_cli.process_post_appointment_response(r) is None - valid = pisa_cli.check_signature(get_bad_signature(), dummy_appointment) + # The same should happen if the response is not in json + responses.replace(responses.POST, pisa_endpoint, status=404) + r = pisa_cli.post_appointment(json.dumps(dummy_appointment_request)) + assert pisa_cli.process_post_appointment_response(r) is None - assert not valid + +def test_save_appointment_receipt(monkeypatch): + appointments_folder = "test_appointments_receipts" + pisa_cli.config["APPOINTMENTS_FOLDER_NAME"] = appointments_folder + + # The functions creates a new directory if it does not exist + assert not os.path.exists(appointments_folder) + pisa_cli.save_appointment_receipt(dummy_appointment.to_dict(), get_dummy_signature()) + assert os.path.exists(appointments_folder) + + # Check that the receipt has been saved by checking the file names + files = os.listdir(appointments_folder) + assert any([dummy_appointment.locator in f for f in files]) + + shutil.rmtree(appointments_folder) @responses.activate @@ -237,39 +228,19 @@ def test_get_appointment(): request_url = "{}get_appointment?locator={}".format(pisa_endpoint, response.get("locator")) responses.add(responses.GET, request_url, json=response, status=200) - - result = pisa_cli.get_appointment([response.get("locator")]) + result = pisa_cli.get_appointment(response.get("locator")) assert len(responses.calls) == 1 assert responses.calls[0].request.url == request_url - assert result.get("locator") == response.get("locator") @responses.activate def test_get_appointment_err(): - locator = get_random_value_hex(32) + locator = get_random_value_hex(16) # Test that get_appointment handles a connection error appropriately. request_url = "{}get_appointment?locator=".format(pisa_endpoint, locator) responses.add(responses.GET, request_url, body=ConnectionError()) - assert not pisa_cli.get_appointment([locator]) - - -def test_get_appointment_signature(monkeypatch): - # Make sure the test uses the right dummy key instead of loading it from disk - monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der) - - signature = pisa_cli.get_appointment_signature(dummy_appointment) - - assert isinstance(signature, str) - - -def test_get_pk(monkeypatch): - # Make sure the test uses the right dummy key instead of loading it from disk - monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der) - - pk = pisa_cli.get_pk() - - assert isinstance(pk, bytes) + assert not pisa_cli.get_appointment(locator)