diff --git a/apps/cli/pisa_cli.py b/apps/cli/pisa_cli.py index 60ea067..18d68fd 100644 --- a/apps/cli/pisa_cli.py +++ b/apps/cli/pisa_cli.py @@ -44,19 +44,22 @@ def generate_dummy_appointment(): print('\nData stored in dummy_appointment_data.json') -# Loads Pisa's public key from disk and verifies that the appointment signature is a valid signature from Pisa, -# returning True or False accordingly. +# Loads and returns Pisa's public key from disk. # Will raise NotFoundError or IOError if the attempts to open and read the public key file fail. # Will raise ValueError if it the public key file was present but it failed to be deserialized. -def is_appointment_signature_valid(appointment, signature) -> bool: - # Load the key from disk +def load_pisa_public_key(): try: with open(PISA_PUBLIC_KEY, "r") as key_file: pubkey_pem = key_file.read().encode("utf-8") pisa_public_key = load_pem_public_key(pubkey_pem, backend=default_backend()) + return pisa_public_key except UnsupportedAlgorithm: raise ValueError("Could not deserialize the public key (unsupported algorithm).") + +# Verifies that the appointment signature is a valid signature with public key `pk`, +# returning True or False accordingly. +def is_appointment_signature_valid(appointment, signature, pk): try: sig_bytes = unhexlify(signature.encode('utf-8')) data = json.dumps(appointment, sort_keys=True, separators=(',', ':')).encode("utf-8") @@ -130,15 +133,15 @@ def add_appointment(args): except json.JSONDecodeError: logger.error("The response was not valid JSON.") - return + return False except ConnectTimeout: logger.error("Can't connect to pisa API. Connection timeout.") - return + return False except ConnectionError: logger.error("Can't connect to pisa API. Server cannot be reached.") - return + return False if r.status_code == HTTP_OK: if 'signature' not in response_json: @@ -147,22 +150,24 @@ def add_appointment(args): signature = response_json['signature'] # verify that the returned signature is valid try: - is_sig_valid = is_appointment_signature_valid(appointment, signature) + pk = load_pisa_public_key() + is_sig_valid = is_appointment_signature_valid(appointment, signature, pk) except ValueError: logger.error("Failed to deserialize the public key. It might be in an unsupported format.") - return + return False except FileNotFoundError: logger.error("Pisa's public key file not found. Please check your settings.") - return + return False except IOError as e: logger.error("I/O error({}): {}".format(e.errno, e.strerror)) - return + return False if is_sig_valid: logger.info("Appointment accepted and signed by Pisa.") # all good, store appointment and signature try: save_signed_appointment(appointment, signature) + return True except OSError as e: logger.error("There was an error while saving the appointment: {}".format(e)) else: @@ -182,6 +187,8 @@ def add_appointment(args): else: logger.error("No appointment data provided. " + use_help) + return False # return False for any path that returned an error message + def get_appointment(args): if args: @@ -200,7 +207,7 @@ def get_appointment(args): r = requests.get(url=get_appointment_endpoint + parameters, timeout=5) print(json.dumps(r.json(), indent=4, sort_keys=True)) - + return True except ConnectTimeout: logger.error("Can't connect to pisa API. Connection timeout.") @@ -213,6 +220,8 @@ def get_appointment(args): else: logger.error("The provided locator is not valid.") + return False # return False for any path that returned an error message + def build_appointment(tx, tx_id, start_time, end_time, dispute_delta): locator = sha256(unhexlify(tx_id)).hexdigest() diff --git a/test/apps/cli/test_pisa_cli.py b/test/apps/cli/test_pisa_cli.py index 63ef87f..dc545b2 100644 --- a/test/apps/cli/test_pisa_cli.py +++ b/test/apps/cli/test_pisa_cli.py @@ -44,15 +44,18 @@ def sign_appointment(sk, appointment): def test_is_appointment_signature_valid(): # Verify that an appointment signed by Pisa is valid signature = sign_appointment(pisa_sk, dummy_appointment) - assert pisa_cli.is_appointment_signature_valid(dummy_appointment, signature) + assert pisa_cli.is_appointment_signature_valid(dummy_appointment, signature, pisa_pk) # Test that a signature from a different key is indeed invalid other_signature = sign_appointment(other_sk, dummy_appointment) - assert not pisa_cli.is_appointment_signature_valid(dummy_appointment, other_signature) + assert not pisa_cli.is_appointment_signature_valid(dummy_appointment, other_signature, pisa_pk) @responses.activate def test_add_appointment(): + # Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested + # and the return value is True + response = { 'locator': dummy_appointment['locator'], 'signature': sign_appointment(pisa_sk, dummy_appointment) @@ -61,7 +64,26 @@ def test_add_appointment(): request_url = 'http://{}/'.format(pisa_endpoint) responses.add(responses.POST, request_url, json=response, status=200) - pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) + result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) assert len(responses.calls) == 1 assert responses.calls[0].request.url == request_url + + assert result + + +@responses.activate +def test_add_appointment_with_invalid_signature(): + # Simulate a request to add_appointment for dummy_appointment, but sign with a different key, + # make sure that the right endpoint is requested, but the return value is False + response = { + 'locator': dummy_appointment['locator'], + 'signature': sign_appointment(other_sk, dummy_appointment) # signing with a different key + } + + request_url = 'http://{}/'.format(pisa_endpoint) + responses.add(responses.POST, request_url, json=response, status=200) + + result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) + + assert not result