Updates cli tests and adds some missing ones

This commit is contained in:
Sergi Delgado Segura
2020-02-01 12:28:44 +01:00
parent 5a49a93710
commit ee4269d047

View File

@@ -9,36 +9,31 @@ from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from common.tools import compute_locator
from common.appointment import Appointment from common.appointment import Appointment
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
from apps.cli.blob import Blob
import apps.cli.pisa_cli as pisa_cli import apps.cli.pisa_cli as pisa_cli
from test.apps.cli.unit.conftest import get_random_value_hex from test.apps.cli.unit.conftest import get_random_value_hex
# dummy keys for the tests # dummy keys for the tests
pisa_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) dummy_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
pisa_pk = pisa_sk.public_key() dummy_pk = dummy_sk.public_key()
another_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
other_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) dummy_sk_der = dummy_sk.private_bytes(
pisa_sk_der = pisa_sk.private_bytes(
encoding=serialization.Encoding.DER, encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL, format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(), encryption_algorithm=serialization.NoEncryption(),
) )
pisa_pk_der = pisa_pk.public_bytes( dummy_pk_der = dummy_pk.public_bytes(
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
) )
other_sk_der = other_sk.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
# Replace the key in the module with a key we control for the tests # Replace the key in the module with a key we control for the tests
pisa_cli.pisa_public_key = pisa_pk pisa_cli.pisa_public_key = dummy_pk
# Replace endpoint with dummy one # Replace endpoint with dummy one
pisa_cli.pisa_api_server = "dummy.com" pisa_cli.pisa_api_server = "dummy.com"
pisa_cli.pisa_api_port = 12345 pisa_cli.pisa_api_port = 12345
@@ -54,22 +49,24 @@ dummy_appointment_request = {
# This is the format appointment turns into once it hits "add_appointment" # This is the format appointment turns into once it hits "add_appointment"
dummy_appointment_full = { dummy_appointment_full = {
"locator": get_random_value_hex(16), "locator": compute_locator(dummy_appointment_request.get("tx_id")),
"start_time": 1500, "start_time": dummy_appointment_request.get("start_time"),
"end_time": 50000, "end_time": dummy_appointment_request.get("end_time"),
"to_self_delay": 200, "to_self_delay": dummy_appointment_request.get("to_self_delay"),
"encrypted_blob": get_random_value_hex(120), "encrypted_blob": Cryptographer.encrypt(
Blob(dummy_appointment_request.get("tx")), dummy_appointment_request.get("tx_id")
),
} }
dummy_appointment = Appointment.from_dict(dummy_appointment_full) dummy_appointment = Appointment.from_dict(dummy_appointment_full)
def get_dummy_pisa_sk_der(*args): def load_dummy_keys(*args):
return pisa_sk_der return dummy_pk, dummy_sk, dummy_pk_der
def get_dummy_pisa_pk_der(*args): def get_dummy_pisa_pk_der(*args):
return pisa_pk_der return dummy_pk_der
def get_dummy_hex_pk_der(*args): def get_dummy_hex_pk_der(*args):
@@ -77,42 +74,52 @@ def get_dummy_hex_pk_der(*args):
def get_dummy_signature(*args): def get_dummy_signature(*args):
sk = Cryptographer.load_private_key_der(pisa_sk_der) return Cryptographer.sign(dummy_appointment.serialize(), dummy_sk)
return Cryptographer.sign(dummy_appointment.serialize(), sk)
def get_bad_signature(*args): def get_bad_signature(*args):
sk = Cryptographer.load_private_key_der(other_sk_der) return Cryptographer.sign(dummy_appointment.serialize(), another_sk)
return Cryptographer.sign(dummy_appointment.serialize(), sk)
def valid_sig(*args): def test_load_keys():
return True # Let's first create a private key and public key files
private_key_file_path = "sk_test_file"
public_key_file_path = "pk_test_file"
def invalid_sig(*args): with open(private_key_file_path, "wb") as f:
return False f.write(dummy_sk_der)
with open(public_key_file_path, "wb") as f:
f.write(dummy_pk_der)
# Now we can test the function passing the using this files (we'll use the same pk for both)
r = pisa_cli.load_keys(public_key_file_path, private_key_file_path, public_key_file_path)
assert isinstance(r, tuple)
assert len(r) == 3
# If any param does not match we should get None as result
assert pisa_cli.load_keys(None, private_key_file_path, public_key_file_path) is None
assert pisa_cli.load_keys(public_key_file_path, None, public_key_file_path) is None
assert pisa_cli.load_keys(public_key_file_path, private_key_file_path, None) is None
# The same should happen if we pass a public key where a private should be, for instance
assert pisa_cli.load_keys(private_key_file_path, public_key_file_path, private_key_file_path) is None
os.remove(private_key_file_path)
os.remove(public_key_file_path)
# TODO: 90-add-more-add-appointment-tests
@responses.activate @responses.activate
def test_add_appointment(monkeypatch): def test_add_appointment(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested # Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested
# and the return value is True # and the return value is True
monkeypatch.setattr(pisa_cli, "load_keys", load_dummy_keys)
# Make sure the test uses the dummy signature response = {"locator": dummy_appointment.locator, "signature": get_dummy_signature()}
monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_dummy_signature)
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
monkeypatch.setattr(pisa_cli, "check_signature", valid_sig)
response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature()}
responses.add(responses.POST, pisa_endpoint, json=response, status=200) responses.add(responses.POST, pisa_endpoint, json=response, status=200)
result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)])
assert len(responses.calls) == 1 assert len(responses.calls) == 1
assert responses.calls[0].request.url == pisa_endpoint assert responses.calls[0].request.url == pisa_endpoint
assert result assert result
@@ -122,9 +129,7 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
# make sure that the right endpoint is requested, but the return value is False # make sure that the right endpoint is requested, but the return value is False
# Make sure the test uses the bad dummy signature # Make sure the test uses the bad dummy signature
monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_bad_signature) monkeypatch.setattr(pisa_cli, "load_keys", load_dummy_keys)
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
monkeypatch.setattr(pisa_cli, "check_signature", invalid_sig)
response = { response = {
"locator": dummy_appointment.to_dict()["locator"], "locator": dummy_appointment.to_dict()["locator"],
@@ -132,50 +137,11 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
} }
responses.add(responses.POST, pisa_endpoint, json=response, status=200) responses.add(responses.POST, pisa_endpoint, json=response, status=200)
result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)])
assert result is False assert result is False
def test_load_key_file_data():
# If file exists and has data in it, function should work.
with open("key_test_file", "w+b") as f:
f.write(pisa_sk_der)
appt_data = pisa_cli.load_key_file_data("key_test_file")
assert appt_data
os.remove("key_test_file")
# If file doesn't exist, function should fail.
with pytest.raises(FileNotFoundError):
assert pisa_cli.load_key_file_data("nonexistent_file")
def test_save_signed_appointment(monkeypatch):
appointments_folder = "test_appointments_receipts"
pisa_cli.config["APPOINTMENTS_FOLDER_NAME"] = appointments_folder
pisa_cli.save_signed_appointment(dummy_appointment.to_dict(), get_dummy_signature())
# In folder "Appointments," grab all files and print them.
files = os.listdir(appointments_folder)
found = False
for f in files:
if dummy_appointment.to_dict().get("locator") in f:
found = True
assert found
# If "appointments" directory doesn't exist, function should create it.
assert os.path.exists(appointments_folder)
# Delete test directory once we're done.
shutil.rmtree(appointments_folder)
def test_parse_add_appointment_args(): def test_parse_add_appointment_args():
# If no args are passed, function should fail. # If no args are passed, function should fail.
appt_data = pisa_cli.parse_add_appointment_args(None) appt_data = pisa_cli.parse_add_appointment_args(None)
@@ -200,33 +166,58 @@ def test_parse_add_appointment_args():
@responses.activate @responses.activate
def test_post_data_to_add_appointment_endpoint(): def test_post_appointment():
response = { response = {
"locator": dummy_appointment.to_dict()["locator"], "locator": dummy_appointment.to_dict()["locator"],
"signature": Cryptographer.sign(dummy_appointment.serialize(), pisa_sk), "signature": Cryptographer.sign(dummy_appointment.serialize(), dummy_pk),
} }
responses.add(responses.POST, pisa_endpoint, json=response, status=200) responses.add(responses.POST, pisa_endpoint, json=response, status=200)
response = pisa_cli.post_appointment(json.dumps(dummy_appointment_request))
response = pisa_cli.post_data_to_add_appointment_endpoint(json.dumps(dummy_appointment_request))
assert len(responses.calls) == 1 assert len(responses.calls) == 1
assert responses.calls[0].request.url == pisa_endpoint assert responses.calls[0].request.url == pisa_endpoint
assert response assert response
def test_check_signature(monkeypatch): @responses.activate
# Make sure the test uses the right dummy key instead of loading it from disk def test_process_post_appointment_response():
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der) # Let's first crete a response
response = {
"locator": dummy_appointment.to_dict()["locator"],
"signature": Cryptographer.sign(dummy_appointment.serialize(), dummy_pk),
}
valid = pisa_cli.check_signature(get_dummy_signature(), dummy_appointment) # A 200 OK with a correct json response should return the json of the response
responses.add(responses.POST, pisa_endpoint, json=response, status=200)
r = pisa_cli.post_appointment(json.dumps(dummy_appointment_request))
assert pisa_cli.process_post_appointment_response(r) == r.json()
assert valid # If we modify the response code tor a rejection (lets say 404) we should get None
responses.replace(responses.POST, pisa_endpoint, json=response, status=404)
r = pisa_cli.post_appointment(json.dumps(dummy_appointment_request))
assert pisa_cli.process_post_appointment_response(r) is None
valid = pisa_cli.check_signature(get_bad_signature(), dummy_appointment) # The same should happen if the response is not in json
responses.replace(responses.POST, pisa_endpoint, status=404)
r = pisa_cli.post_appointment(json.dumps(dummy_appointment_request))
assert pisa_cli.process_post_appointment_response(r) is None
assert not valid
def test_save_appointment_receipt(monkeypatch):
appointments_folder = "test_appointments_receipts"
pisa_cli.config["APPOINTMENTS_FOLDER_NAME"] = appointments_folder
# The functions creates a new directory if it does not exist
assert not os.path.exists(appointments_folder)
pisa_cli.save_appointment_receipt(dummy_appointment.to_dict(), get_dummy_signature())
assert os.path.exists(appointments_folder)
# Check that the receipt has been saved by checking the file names
files = os.listdir(appointments_folder)
assert any([dummy_appointment.locator in f for f in files])
shutil.rmtree(appointments_folder)
@responses.activate @responses.activate
@@ -237,39 +228,19 @@ def test_get_appointment():
request_url = "{}get_appointment?locator={}".format(pisa_endpoint, response.get("locator")) request_url = "{}get_appointment?locator={}".format(pisa_endpoint, response.get("locator"))
responses.add(responses.GET, request_url, json=response, status=200) responses.add(responses.GET, request_url, json=response, status=200)
result = pisa_cli.get_appointment(response.get("locator"))
result = pisa_cli.get_appointment([response.get("locator")])
assert len(responses.calls) == 1 assert len(responses.calls) == 1
assert responses.calls[0].request.url == request_url assert responses.calls[0].request.url == request_url
assert result.get("locator") == response.get("locator") assert result.get("locator") == response.get("locator")
@responses.activate @responses.activate
def test_get_appointment_err(): def test_get_appointment_err():
locator = get_random_value_hex(32) locator = get_random_value_hex(16)
# Test that get_appointment handles a connection error appropriately. # Test that get_appointment handles a connection error appropriately.
request_url = "{}get_appointment?locator=".format(pisa_endpoint, locator) request_url = "{}get_appointment?locator=".format(pisa_endpoint, locator)
responses.add(responses.GET, request_url, body=ConnectionError()) responses.add(responses.GET, request_url, body=ConnectionError())
assert not pisa_cli.get_appointment([locator]) assert not pisa_cli.get_appointment(locator)
def test_get_appointment_signature(monkeypatch):
# Make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der)
signature = pisa_cli.get_appointment_signature(dummy_appointment)
assert isinstance(signature, str)
def test_get_pk(monkeypatch):
# Make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der)
pk = pisa_cli.get_pk()
assert isinstance(pk, bytes)