mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-18 06:34:19 +01:00
Add unit tests for remaining cli functions
This commit is contained in:
@@ -1,11 +1,11 @@
|
|||||||
import responses
|
import responses
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import pytest
|
import shutil
|
||||||
from binascii import hexlify
|
from binascii import hexlify
|
||||||
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
from cryptography.hazmat.primitives import hashes, serialization
|
from cryptography.hazmat.primitives import serialization
|
||||||
from cryptography.hazmat.primitives.asymmetric import ec
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
|
||||||
from common.appointment import Appointment
|
from common.appointment import Appointment
|
||||||
@@ -49,7 +49,6 @@ dummy_appointment_request = {
|
|||||||
"start_time": 1500,
|
"start_time": 1500,
|
||||||
"end_time": 50000,
|
"end_time": 50000,
|
||||||
"to_self_delay": 200,
|
"to_self_delay": 200,
|
||||||
"encrypted_blob": get_random_value_hex(120),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# This is the format appointment turns into once it hits "add_appointment"
|
# This is the format appointment turns into once it hits "add_appointment"
|
||||||
@@ -73,7 +72,7 @@ def get_dummy_pisa_pk_der(*args):
|
|||||||
|
|
||||||
|
|
||||||
def get_dummy_hex_pk_der(*args):
|
def get_dummy_hex_pk_der(*args):
|
||||||
return hexlify(get_dummy_pisa_pk_der(None))
|
return hexlify(get_dummy_pisa_pk_der())
|
||||||
|
|
||||||
|
|
||||||
def get_dummy_signature(*args):
|
def get_dummy_signature(*args):
|
||||||
@@ -104,7 +103,7 @@ def test_add_appointment(monkeypatch):
|
|||||||
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
|
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
|
||||||
monkeypatch.setattr(pisa_cli, "check_signature", valid_sig)
|
monkeypatch.setattr(pisa_cli, "check_signature", valid_sig)
|
||||||
|
|
||||||
response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature(None)}
|
response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature()}
|
||||||
|
|
||||||
request_url = "http://{}/".format(pisa_endpoint)
|
request_url = "http://{}/".format(pisa_endpoint)
|
||||||
responses.add(responses.POST, request_url, json=response, status=200)
|
responses.add(responses.POST, request_url, json=response, status=200)
|
||||||
@@ -129,7 +128,7 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
|
|||||||
|
|
||||||
response = {
|
response = {
|
||||||
"locator": dummy_appointment.to_dict()["locator"],
|
"locator": dummy_appointment.to_dict()["locator"],
|
||||||
"signature": get_bad_signature(None), # Sign with a bad key
|
"signature": get_bad_signature(), # Sign with a bad key
|
||||||
}
|
}
|
||||||
|
|
||||||
request_url = "http://{}/".format(pisa_endpoint)
|
request_url = "http://{}/".format(pisa_endpoint)
|
||||||
@@ -137,7 +136,44 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
|
|||||||
|
|
||||||
result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)])
|
result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)])
|
||||||
|
|
||||||
assert not result
|
assert result is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_key_file_data():
|
||||||
|
# If file exists and has data in it, function should work.
|
||||||
|
with open("key_test_file", "w+b") as f:
|
||||||
|
f.write(pisa_sk_der)
|
||||||
|
|
||||||
|
appt_data = pisa_cli.load_key_file_data("key_test_file")
|
||||||
|
assert appt_data
|
||||||
|
|
||||||
|
os.remove("key_test_file")
|
||||||
|
|
||||||
|
# If file doesn't exist, function should fail.
|
||||||
|
appt_data = pisa_cli.load_key_file_data("nonexistent_file")
|
||||||
|
assert not appt_data
|
||||||
|
|
||||||
|
|
||||||
|
def test_save_signed_appointment(monkeypatch):
|
||||||
|
monkeypatch.setattr(pisa_cli, "APPOINTMENTS_FOLDER_NAME", "test_appointments")
|
||||||
|
|
||||||
|
pisa_cli.save_signed_appointment(dummy_appointment.to_dict(), get_dummy_signature())
|
||||||
|
|
||||||
|
# In folder "Appointments," grab all files and print them.
|
||||||
|
files = os.listdir("test_appointments")
|
||||||
|
|
||||||
|
found = False
|
||||||
|
for f in files:
|
||||||
|
if dummy_appointment.to_dict().get("locator") in f:
|
||||||
|
found = True
|
||||||
|
|
||||||
|
assert found
|
||||||
|
|
||||||
|
# If "appointments" directory doesn't exist, function should create it.
|
||||||
|
assert os.path.exists("test_appointments")
|
||||||
|
|
||||||
|
# Delete test directory once we're done.
|
||||||
|
shutil.rmtree("test_appointments")
|
||||||
|
|
||||||
|
|
||||||
def test_parse_add_appointment_args():
|
def test_parse_add_appointment_args():
|
||||||
@@ -158,7 +194,7 @@ def test_parse_add_appointment_args():
|
|||||||
|
|
||||||
os.remove("appt_test_file")
|
os.remove("appt_test_file")
|
||||||
|
|
||||||
# If appointment json is passed in, funcion should work.
|
# If appointment json is passed in, function should work.
|
||||||
appt_data = pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)])
|
appt_data = pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)])
|
||||||
assert appt_data
|
assert appt_data
|
||||||
|
|
||||||
@@ -194,6 +230,34 @@ def test_check_signature(monkeypatch):
|
|||||||
assert not valid
|
assert not valid
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_get_appointment():
|
||||||
|
# Response of get_appointment endpoint is an appointment with status added to it.
|
||||||
|
dummy_appointment_full["status"] = "being_watched"
|
||||||
|
response = dummy_appointment_full
|
||||||
|
|
||||||
|
request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator={}".format(response.get("locator"))
|
||||||
|
responses.add(responses.GET, request_url, json=response, status=200)
|
||||||
|
|
||||||
|
result = pisa_cli.get_appointment([response.get("locator")])
|
||||||
|
|
||||||
|
assert len(responses.calls) == 1
|
||||||
|
assert responses.calls[0].request.url == request_url
|
||||||
|
|
||||||
|
assert result
|
||||||
|
|
||||||
|
|
||||||
|
@responses.activate
|
||||||
|
def test_get_appointment_err():
|
||||||
|
locator = get_random_value_hex(32)
|
||||||
|
|
||||||
|
# Test that get_appointment handles a connection error appropriately.
|
||||||
|
request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator=".format(locator)
|
||||||
|
responses.add(responses.GET, request_url, body=ConnectionError())
|
||||||
|
|
||||||
|
assert not pisa_cli.get_appointment([locator])
|
||||||
|
|
||||||
|
|
||||||
def test_get_appointment_signature(monkeypatch):
|
def test_get_appointment_signature(monkeypatch):
|
||||||
# Make sure the test uses the right dummy key instead of loading it from disk
|
# Make sure the test uses the right dummy key instead of loading it from disk
|
||||||
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der)
|
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der)
|
||||||
|
|||||||
Reference in New Issue
Block a user