Adds unit tests for register, and adaps existing tests to match the changes in the commands

This commit is contained in:
Sergi Delgado Segura
2020-03-30 22:24:20 +02:00
parent f8cc099696
commit 31a25f3f60

View File

@@ -20,15 +20,17 @@ common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=teos
config = get_config() config = get_config()
# dummy keys for the tests # dummy keys for the tests
dummy_sk = PrivateKey() dummy_cli_sk = PrivateKey.from_int(1)
dummy_pk = dummy_sk.public_key dummy_cli_compressed_pk = dummy_cli_sk.public_key.format(compressed=True)
another_sk = PrivateKey() dummy_teos_sk = PrivateKey.from_int(2)
dummy_teos_pk = dummy_teos_sk.public_key
another_sk = PrivateKey.from_int(3)
teos_url = "http://{}:{}".format(config.get("TEOS_SERVER"), config.get("TEOS_PORT")) teos_url = "http://{}:{}".format(config.get("TEOS_SERVER"), config.get("TEOS_PORT"))
add_appointment_endpoint = "{}/add_appointment".format(teos_url) add_appointment_endpoint = "{}/add_appointment".format(teos_url)
get_appointment_endpoint = "{}/get_appointment".format(teos_url) get_appointment_endpoint = "{}/get_appointment".format(teos_url)
dummy_appointment_request = { dummy_appointment_data = {
"tx": get_random_value_hex(192), "tx": get_random_value_hex(192),
"tx_id": get_random_value_hex(32), "tx_id": get_random_value_hex(32),
"start_time": 1500, "start_time": 1500,
@@ -37,29 +39,93 @@ dummy_appointment_request = {
} }
# This is the format appointment turns into once it hits "add_appointment" # This is the format appointment turns into once it hits "add_appointment"
dummy_appointment_full = { dummy_appointment_dict = {
"locator": compute_locator(dummy_appointment_request.get("tx_id")), "locator": compute_locator(dummy_appointment_data.get("tx_id")),
"start_time": dummy_appointment_request.get("start_time"), "start_time": dummy_appointment_data.get("start_time"),
"end_time": dummy_appointment_request.get("end_time"), "end_time": dummy_appointment_data.get("end_time"),
"to_self_delay": dummy_appointment_request.get("to_self_delay"), "to_self_delay": dummy_appointment_data.get("to_self_delay"),
"encrypted_blob": Cryptographer.encrypt( "encrypted_blob": Cryptographer.encrypt(
Blob(dummy_appointment_request.get("tx")), dummy_appointment_request.get("tx_id") Blob(dummy_appointment_data.get("tx")), dummy_appointment_data.get("tx_id")
), ),
} }
dummy_appointment = Appointment.from_dict(dummy_appointment_full) dummy_appointment = Appointment.from_dict(dummy_appointment_dict)
def load_dummy_keys(*args): def get_signature(message, sk):
return dummy_pk, dummy_sk, dummy_pk.format(compressed=True) return Cryptographer.sign(message, sk)
def get_dummy_signature(*args): def test_register():
return Cryptographer.sign(dummy_appointment.serialize(), dummy_sk) pass
def get_bad_signature(*args): # TODO: 90-add-more-add-appointment-tests
return Cryptographer.sign(dummy_appointment.serialize(), another_sk) @responses.activate
def test_add_appointment():
# Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested
# and the return value is True
response = {
"locator": dummy_appointment.locator,
"signature": get_signature(dummy_appointment.serialize(), dummy_teos_sk),
"available_slots": 100,
}
responses.add(responses.POST, add_appointment_endpoint, json=response, status=200)
result = teos_cli.add_appointment(
dummy_appointment_data, dummy_cli_sk, dummy_teos_pk, teos_url, config.get("APPOINTMENTS_FOLDER_NAME")
)
assert len(responses.calls) == 1
assert responses.calls[0].request.url == add_appointment_endpoint
assert result
@responses.activate
def test_add_appointment_with_invalid_signature(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, but sign with a different key,
# make sure that the right endpoint is requested, but the return value is False
response = {
"locator": dummy_appointment.to_dict()["locator"],
"signature": get_signature(dummy_appointment.serialize(), another_sk), # Sign with a bad key
"available_slots": 100,
}
responses.add(responses.POST, add_appointment_endpoint, json=response, status=200)
result = teos_cli.add_appointment(
dummy_appointment_data, dummy_cli_sk, dummy_teos_pk, teos_url, config.get("APPOINTMENTS_FOLDER_NAME")
)
assert result is False
shutil.rmtree(config.get("APPOINTMENTS_FOLDER_NAME"))
@responses.activate
def test_get_appointment():
# Response of get_appointment endpoint is an appointment with status added to it.
response = {
"locator": dummy_appointment_dict.get("locator"),
"status": "being_watch",
"appointment": dummy_appointment_dict,
}
responses.add(responses.POST, get_appointment_endpoint, json=response, status=200)
result = teos_cli.get_appointment(dummy_appointment_dict.get("locator"), dummy_cli_sk, dummy_teos_pk, teos_url)
assert len(responses.calls) == 1
assert responses.calls[0].request.url == get_appointment_endpoint
assert result.get("locator") == response.get("locator")
@responses.activate
def test_get_appointment_err():
locator = get_random_value_hex(16)
# Test that get_appointment handles a connection error appropriately.
responses.add(responses.POST, get_appointment_endpoint, body=ConnectionError())
assert not teos_cli.get_appointment(locator, dummy_cli_sk, dummy_teos_pk, teos_url)
def test_load_keys(): def test_load_keys():
@@ -68,9 +134,9 @@ def test_load_keys():
public_key_file_path = "pk_test_file" public_key_file_path = "pk_test_file"
empty_file_path = "empty_file" empty_file_path = "empty_file"
with open(private_key_file_path, "wb") as f: with open(private_key_file_path, "wb") as f:
f.write(dummy_sk.to_der()) f.write(dummy_cli_sk.to_der())
with open(public_key_file_path, "wb") as f: with open(public_key_file_path, "wb") as f:
f.write(dummy_pk.format(compressed=True)) f.write(dummy_cli_compressed_pk)
with open(empty_file_path, "wb") as f: with open(empty_file_path, "wb") as f:
pass pass
@@ -97,42 +163,44 @@ def test_load_keys():
os.remove(empty_file_path) os.remove(empty_file_path)
# TODO: 90-add-more-add-appointment-tests # WIP: HERE
@responses.activate @responses.activate
def test_add_appointment(monkeypatch): def test_post_request():
# Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested
# and the return value is True
monkeypatch.setattr(teos_cli, "load_keys", load_dummy_keys)
response = {"locator": dummy_appointment.locator, "signature": get_dummy_signature()}
responses.add(responses.POST, add_appointment_endpoint, json=response, status=200)
result = teos_cli.add_appointment([json.dumps(dummy_appointment_request)], teos_url, config)
assert len(responses.calls) == 1
assert responses.calls[0].request.url == add_appointment_endpoint
assert result
@responses.activate
def test_add_appointment_with_invalid_signature(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, but sign with a different key,
# make sure that the right endpoint is requested, but the return value is False
# Make sure the test uses the bad dummy signature
monkeypatch.setattr(teos_cli, "load_keys", load_dummy_keys)
response = { response = {
"locator": dummy_appointment.to_dict()["locator"], "locator": dummy_appointment.to_dict()["locator"],
"signature": get_bad_signature(), # Sign with a bad key "signature": get_signature(dummy_appointment.serialize(), dummy_teos_sk),
} }
responses.add(responses.POST, add_appointment_endpoint, json=response, status=200) responses.add(responses.POST, add_appointment_endpoint, json=response, status=200)
result = teos_cli.add_appointment([json.dumps(dummy_appointment_request)], teos_url, config) response = teos_cli.post_request(json.dumps(dummy_appointment_data), add_appointment_endpoint)
shutil.rmtree(config.get("APPOINTMENTS_FOLDER_NAME")) assert len(responses.calls) == 1
assert responses.calls[0].request.url == add_appointment_endpoint
assert response
assert result is False
@responses.activate
def test_process_post_response():
# Let's first crete a response
response = {
"locator": dummy_appointment.to_dict()["locator"],
"signature": get_signature(dummy_appointment.serialize(), dummy_teos_sk),
}
# A 200 OK with a correct json response should return the json of the response
responses.add(responses.POST, add_appointment_endpoint, json=response, status=200)
r = teos_cli.post_request(json.dumps(dummy_appointment_data), add_appointment_endpoint)
assert teos_cli.process_post_response(r) == r.json()
# If we modify the response code for a rejection (lets say 404) we should get None
responses.replace(responses.POST, add_appointment_endpoint, json=response, status=404)
r = teos_cli.post_request(json.dumps(dummy_appointment_data), add_appointment_endpoint)
assert teos_cli.process_post_response(r) is None
# The same should happen if the response is not in json
responses.replace(responses.POST, add_appointment_endpoint, status=404)
r = teos_cli.post_request(json.dumps(dummy_appointment_data), add_appointment_endpoint)
assert teos_cli.process_post_response(r) is None
def test_parse_add_appointment_args(): def test_parse_add_appointment_args():
@@ -146,7 +214,7 @@ def test_parse_add_appointment_args():
# If file exists and has data in it, function should work. # If file exists and has data in it, function should work.
with open("appt_test_file", "w") as f: with open("appt_test_file", "w") as f:
json.dump(dummy_appointment_request, f) json.dump(dummy_appointment_data, f)
appt_data = teos_cli.parse_add_appointment_args(["-f", "appt_test_file"]) appt_data = teos_cli.parse_add_appointment_args(["-f", "appt_test_file"])
assert appt_data assert appt_data
@@ -154,56 +222,21 @@ def test_parse_add_appointment_args():
os.remove("appt_test_file") os.remove("appt_test_file")
# If appointment json is passed in, function should work. # If appointment json is passed in, function should work.
appt_data = teos_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)]) appt_data = teos_cli.parse_add_appointment_args([json.dumps(dummy_appointment_data)])
assert appt_data assert appt_data
@responses.activate
def test_post_appointment():
response = {
"locator": dummy_appointment.to_dict()["locator"],
"signature": Cryptographer.sign(dummy_appointment.serialize(), dummy_pk),
}
responses.add(responses.POST, add_appointment_endpoint, json=response, status=200)
response = teos_cli.post_appointment(json.dumps(dummy_appointment_request), teos_url)
assert len(responses.calls) == 1
assert responses.calls[0].request.url == add_appointment_endpoint
assert response
@responses.activate
def test_process_post_appointment_response():
# Let's first crete a response
response = {
"locator": dummy_appointment.to_dict()["locator"],
"signature": Cryptographer.sign(dummy_appointment.serialize(), dummy_pk),
}
# A 200 OK with a correct json response should return the json of the response
responses.add(responses.POST, add_appointment_endpoint, json=response, status=200)
r = teos_cli.post_appointment(json.dumps(dummy_appointment_request), teos_url)
assert teos_cli.process_post_appointment_response(r) == r.json()
# If we modify the response code tor a rejection (lets say 404) we should get None
responses.replace(responses.POST, add_appointment_endpoint, json=response, status=404)
r = teos_cli.post_appointment(json.dumps(dummy_appointment_request), teos_url)
assert teos_cli.process_post_appointment_response(r) is None
# The same should happen if the response is not in json
responses.replace(responses.POST, add_appointment_endpoint, status=404)
r = teos_cli.post_appointment(json.dumps(dummy_appointment_request), teos_url)
assert teos_cli.process_post_appointment_response(r) is None
def test_save_appointment_receipt(monkeypatch): def test_save_appointment_receipt(monkeypatch):
appointments_folder = "test_appointments_receipts" appointments_folder = "test_appointments_receipts"
config["APPOINTMENTS_FOLDER_NAME"] = appointments_folder config["APPOINTMENTS_FOLDER_NAME"] = appointments_folder
# The functions creates a new directory if it does not exist # The functions creates a new directory if it does not exist
assert not os.path.exists(appointments_folder) assert not os.path.exists(appointments_folder)
teos_cli.save_appointment_receipt(dummy_appointment.to_dict(), get_dummy_signature(), config) teos_cli.save_appointment_receipt(
dummy_appointment.to_dict(),
get_signature(dummy_appointment.serialize(), dummy_teos_sk),
config.get("APPOINTMENTS_FOLDER_NAME"),
)
assert os.path.exists(appointments_folder) assert os.path.exists(appointments_folder)
# Check that the receipt has been saved by checking the file names # Check that the receipt has been saved by checking the file names
@@ -211,29 +244,3 @@ def test_save_appointment_receipt(monkeypatch):
assert any([dummy_appointment.locator in f for f in files]) assert any([dummy_appointment.locator in f for f in files])
shutil.rmtree(appointments_folder) shutil.rmtree(appointments_folder)
@responses.activate
def test_get_appointment():
# Response of get_appointment endpoint is an appointment with status added to it.
dummy_appointment_full["status"] = "being_watched"
response = dummy_appointment_full
request_url = "{}?locator={}".format(get_appointment_endpoint, response.get("locator"))
responses.add(responses.GET, request_url, json=response, status=200)
result = teos_cli.get_appointment(response.get("locator"), teos_url)
assert len(responses.calls) == 1
assert responses.calls[0].request.url == request_url
assert result.get("locator") == response.get("locator")
@responses.activate
def test_get_appointment_err():
locator = get_random_value_hex(16)
# Test that get_appointment handles a connection error appropriately.
request_url = "{}?locator={}".format(get_appointment_endpoint, locator)
responses.add(responses.GET, request_url, body=ConnectionError())
assert not teos_cli.get_appointment(locator, teos_url)