diff --git a/test/apps/cli/unit/test_pisa_cli.py b/test/apps/cli/unit/test_pisa_cli.py index 4f47f98..74c6a95 100644 --- a/test/apps/cli/unit/test_pisa_cli.py +++ b/test/apps/cli/unit/test_pisa_cli.py @@ -1,11 +1,11 @@ import responses import json import os -import pytest +import shutil from binascii import hexlify from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec from common.appointment import Appointment @@ -49,7 +49,6 @@ dummy_appointment_request = { "start_time": 1500, "end_time": 50000, "to_self_delay": 200, - "encrypted_blob": get_random_value_hex(120), } # This is the format appointment turns into once it hits "add_appointment" @@ -73,7 +72,7 @@ def get_dummy_pisa_pk_der(*args): def get_dummy_hex_pk_der(*args): - return hexlify(get_dummy_pisa_pk_der(None)) + return hexlify(get_dummy_pisa_pk_der()) def get_dummy_signature(*args): @@ -104,7 +103,7 @@ def test_add_appointment(monkeypatch): monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der) monkeypatch.setattr(pisa_cli, "check_signature", valid_sig) - response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature(None)} + response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature()} request_url = "http://{}/".format(pisa_endpoint) responses.add(responses.POST, request_url, json=response, status=200) @@ -129,7 +128,7 @@ def test_add_appointment_with_invalid_signature(monkeypatch): response = { "locator": dummy_appointment.to_dict()["locator"], - "signature": get_bad_signature(None), # Sign with a bad key + "signature": get_bad_signature(), # Sign with a bad key } request_url = "http://{}/".format(pisa_endpoint) @@ -137,7 +136,44 @@ def test_add_appointment_with_invalid_signature(monkeypatch): result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) - assert not result + assert result is False + + +def test_load_key_file_data(): + # If file exists and has data in it, function should work. + with open("key_test_file", "w+b") as f: + f.write(pisa_sk_der) + + appt_data = pisa_cli.load_key_file_data("key_test_file") + assert appt_data + + os.remove("key_test_file") + + # If file doesn't exist, function should fail. + appt_data = pisa_cli.load_key_file_data("nonexistent_file") + assert not appt_data + + +def test_save_signed_appointment(monkeypatch): + monkeypatch.setattr(pisa_cli, "APPOINTMENTS_FOLDER_NAME", "test_appointments") + + pisa_cli.save_signed_appointment(dummy_appointment.to_dict(), get_dummy_signature()) + + # In folder "Appointments," grab all files and print them. + files = os.listdir("test_appointments") + + found = False + for f in files: + if dummy_appointment.to_dict().get("locator") in f: + found = True + + assert found + + # If "appointments" directory doesn't exist, function should create it. + assert os.path.exists("test_appointments") + + # Delete test directory once we're done. + shutil.rmtree("test_appointments") def test_parse_add_appointment_args(): @@ -158,7 +194,7 @@ def test_parse_add_appointment_args(): os.remove("appt_test_file") - # If appointment json is passed in, funcion should work. + # If appointment json is passed in, function should work. appt_data = pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)]) assert appt_data @@ -194,6 +230,34 @@ def test_check_signature(monkeypatch): assert not valid +@responses.activate +def test_get_appointment(): + # Response of get_appointment endpoint is an appointment with status added to it. + dummy_appointment_full["status"] = "being_watched" + response = dummy_appointment_full + + request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator={}".format(response.get("locator")) + responses.add(responses.GET, request_url, json=response, status=200) + + result = pisa_cli.get_appointment([response.get("locator")]) + + assert len(responses.calls) == 1 + assert responses.calls[0].request.url == request_url + + assert result + + +@responses.activate +def test_get_appointment_err(): + locator = get_random_value_hex(32) + + # Test that get_appointment handles a connection error appropriately. + request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator=".format(locator) + responses.add(responses.GET, request_url, body=ConnectionError()) + + assert not pisa_cli.get_appointment([locator]) + + def test_get_appointment_signature(monkeypatch): # Make sure the test uses the right dummy key instead of loading it from disk monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der)