pisa -> teos

This commit is contained in:
Sergi Delgado Segura
2020-03-17 13:05:55 +01:00
parent 15126618d5
commit 7c7ff909d7
56 changed files with 373 additions and 372 deletions

View File

@@ -11,10 +11,10 @@ from common.appointment import Appointment
from common.cryptographer import Cryptographer
from common.blob import Blob
import apps.cli.wt_cli as wt_cli
import apps.cli.teos_cli as teos_cli
from test.apps.cli.unit.conftest import get_random_value_hex
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=wt_cli.LOG_PREFIX)
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=teos_cli.LOG_PREFIX)
# dummy keys for the tests
dummy_sk = PrivateKey()
@@ -23,11 +23,11 @@ another_sk = PrivateKey()
# Replace the key in the module with a key we control for the tests
wt_cli.pisa_public_key = dummy_pk
teos_cli.teos_public_key = dummy_pk
# Replace endpoint with dummy one
wt_cli.pisa_api_server = "https://dummy.com"
wt_cli.pisa_api_port = 12345
pisa_endpoint = "{}:{}/".format(wt_cli.pisa_api_server, wt_cli.pisa_api_port)
teos_cli.teos_api_server = "https://dummy.com"
teos_cli.teos_api_port = 12345
teos_endpoint = "{}:{}/".format(teos_cli.teos_api_server, teos_cli.teos_api_port)
dummy_appointment_request = {
"tx": get_random_value_hex(192),
@@ -74,17 +74,17 @@ def get_bad_signature(*args):
# f.write(dummy_pk_der)
#
# # Now we can test the function passing the using this files (we'll use the same pk for both)
# r = wt_cli.load_keys(public_key_file_path, private_key_file_path, public_key_file_path)
# r = teos_cli.load_keys(public_key_file_path, private_key_file_path, public_key_file_path)
# assert isinstance(r, tuple)
# assert len(r) == 3
#
# # If any param does not match we should get None as result
# assert wt_cli.load_keys(None, private_key_file_path, public_key_file_path) is None
# assert wt_cli.load_keys(public_key_file_path, None, public_key_file_path) is None
# assert wt_cli.load_keys(public_key_file_path, private_key_file_path, None) is None
# assert teos_cli.load_keys(None, private_key_file_path, public_key_file_path) is None
# assert teos_cli.load_keys(public_key_file_path, None, public_key_file_path) is None
# assert teos_cli.load_keys(public_key_file_path, private_key_file_path, None) is None
#
# # The same should happen if we pass a public key where a private should be, for instance
# assert wt_cli.load_keys(private_key_file_path, public_key_file_path, private_key_file_path) is None
# assert teos_cli.load_keys(private_key_file_path, public_key_file_path, private_key_file_path) is None
#
# os.remove(private_key_file_path)
# os.remove(public_key_file_path)
@@ -95,14 +95,14 @@ def get_bad_signature(*args):
def test_add_appointment(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested
# and the return value is True
monkeypatch.setattr(wt_cli, "load_keys", load_dummy_keys)
monkeypatch.setattr(teos_cli, "load_keys", load_dummy_keys)
response = {"locator": dummy_appointment.locator, "signature": get_dummy_signature()}
responses.add(responses.POST, pisa_endpoint, json=response, status=200)
result = wt_cli.add_appointment([json.dumps(dummy_appointment_request)])
responses.add(responses.POST, teos_endpoint, json=response, status=200)
result = teos_cli.add_appointment([json.dumps(dummy_appointment_request)])
assert len(responses.calls) == 1
assert responses.calls[0].request.url == pisa_endpoint
assert responses.calls[0].request.url == teos_endpoint
assert result
@@ -112,39 +112,39 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
# make sure that the right endpoint is requested, but the return value is False
# Make sure the test uses the bad dummy signature
monkeypatch.setattr(wt_cli, "load_keys", load_dummy_keys)
monkeypatch.setattr(teos_cli, "load_keys", load_dummy_keys)
response = {
"locator": dummy_appointment.to_dict()["locator"],
"signature": get_bad_signature(), # Sign with a bad key
}
responses.add(responses.POST, pisa_endpoint, json=response, status=200)
result = wt_cli.add_appointment([json.dumps(dummy_appointment_request)])
responses.add(responses.POST, teos_endpoint, json=response, status=200)
result = teos_cli.add_appointment([json.dumps(dummy_appointment_request)])
assert result is False
def test_parse_add_appointment_args():
# If no args are passed, function should fail.
appt_data = wt_cli.parse_add_appointment_args(None)
appt_data = teos_cli.parse_add_appointment_args(None)
assert not appt_data
# If file doesn't exist, function should fail.
appt_data = wt_cli.parse_add_appointment_args(["-f", "nonexistent_file"])
appt_data = teos_cli.parse_add_appointment_args(["-f", "nonexistent_file"])
assert not appt_data
# If file exists and has data in it, function should work.
with open("appt_test_file", "w") as f:
json.dump(dummy_appointment_request, f)
appt_data = wt_cli.parse_add_appointment_args(["-f", "appt_test_file"])
appt_data = teos_cli.parse_add_appointment_args(["-f", "appt_test_file"])
assert appt_data
os.remove("appt_test_file")
# If appointment json is passed in, function should work.
appt_data = wt_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)])
appt_data = teos_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)])
assert appt_data
@@ -155,11 +155,11 @@ def test_post_appointment():
"signature": Cryptographer.sign(dummy_appointment.serialize(), dummy_pk),
}
responses.add(responses.POST, pisa_endpoint, json=response, status=200)
response = wt_cli.post_appointment(json.dumps(dummy_appointment_request))
responses.add(responses.POST, teos_endpoint, json=response, status=200)
response = teos_cli.post_appointment(json.dumps(dummy_appointment_request))
assert len(responses.calls) == 1
assert responses.calls[0].request.url == pisa_endpoint
assert responses.calls[0].request.url == teos_endpoint
assert response
@@ -172,28 +172,28 @@ def test_process_post_appointment_response():
}
# A 200 OK with a correct json response should return the json of the response
responses.add(responses.POST, pisa_endpoint, json=response, status=200)
r = wt_cli.post_appointment(json.dumps(dummy_appointment_request))
assert wt_cli.process_post_appointment_response(r) == r.json()
responses.add(responses.POST, teos_endpoint, json=response, status=200)
r = teos_cli.post_appointment(json.dumps(dummy_appointment_request))
assert teos_cli.process_post_appointment_response(r) == r.json()
# If we modify the response code tor a rejection (lets say 404) we should get None
responses.replace(responses.POST, pisa_endpoint, json=response, status=404)
r = wt_cli.post_appointment(json.dumps(dummy_appointment_request))
assert wt_cli.process_post_appointment_response(r) is None
responses.replace(responses.POST, teos_endpoint, json=response, status=404)
r = teos_cli.post_appointment(json.dumps(dummy_appointment_request))
assert teos_cli.process_post_appointment_response(r) is None
# The same should happen if the response is not in json
responses.replace(responses.POST, pisa_endpoint, status=404)
r = wt_cli.post_appointment(json.dumps(dummy_appointment_request))
assert wt_cli.process_post_appointment_response(r) is None
responses.replace(responses.POST, teos_endpoint, status=404)
r = teos_cli.post_appointment(json.dumps(dummy_appointment_request))
assert teos_cli.process_post_appointment_response(r) is None
def test_save_appointment_receipt(monkeypatch):
appointments_folder = "test_appointments_receipts"
wt_cli.config["APPOINTMENTS_FOLDER_NAME"] = appointments_folder
teos_cli.config["APPOINTMENTS_FOLDER_NAME"] = appointments_folder
# The functions creates a new directory if it does not exist
assert not os.path.exists(appointments_folder)
wt_cli.save_appointment_receipt(dummy_appointment.to_dict(), get_dummy_signature())
teos_cli.save_appointment_receipt(dummy_appointment.to_dict(), get_dummy_signature())
assert os.path.exists(appointments_folder)
# Check that the receipt has been saved by checking the file names
@@ -209,9 +209,9 @@ def test_get_appointment():
dummy_appointment_full["status"] = "being_watched"
response = dummy_appointment_full
request_url = "{}get_appointment?locator={}".format(pisa_endpoint, response.get("locator"))
request_url = "{}get_appointment?locator={}".format(teos_endpoint, response.get("locator"))
responses.add(responses.GET, request_url, json=response, status=200)
result = wt_cli.get_appointment(response.get("locator"))
result = teos_cli.get_appointment(response.get("locator"))
assert len(responses.calls) == 1
assert responses.calls[0].request.url == request_url
@@ -223,7 +223,7 @@ def test_get_appointment_err():
locator = get_random_value_hex(16)
# Test that get_appointment handles a connection error appropriately.
request_url = "{}get_appointment?locator=".format(pisa_endpoint, locator)
request_url = "{}get_appointment?locator=".format(teos_endpoint, locator)
responses.add(responses.GET, request_url, body=ConnectionError())
assert not wt_cli.get_appointment(locator)
assert not teos_cli.get_appointment(locator)