Refactored pisa_cli to improve testability; added test with wrong signature returned

This commit is contained in:
Salvatore Ingala
2019-10-25 18:33:24 +08:00
committed by Sergi Delgado Segura
parent 956d7091eb
commit 21db172727
2 changed files with 46 additions and 15 deletions

View File

@@ -44,19 +44,22 @@ def generate_dummy_appointment():
print('\nData stored in dummy_appointment_data.json') print('\nData stored in dummy_appointment_data.json')
# Loads Pisa's public key from disk and verifies that the appointment signature is a valid signature from Pisa, # Loads and returns Pisa's public key from disk.
# returning True or False accordingly.
# Will raise NotFoundError or IOError if the attempts to open and read the public key file fail. # Will raise NotFoundError or IOError if the attempts to open and read the public key file fail.
# Will raise ValueError if it the public key file was present but it failed to be deserialized. # Will raise ValueError if it the public key file was present but it failed to be deserialized.
def is_appointment_signature_valid(appointment, signature) -> bool: def load_pisa_public_key():
# Load the key from disk
try: try:
with open(PISA_PUBLIC_KEY, "r") as key_file: with open(PISA_PUBLIC_KEY, "r") as key_file:
pubkey_pem = key_file.read().encode("utf-8") pubkey_pem = key_file.read().encode("utf-8")
pisa_public_key = load_pem_public_key(pubkey_pem, backend=default_backend()) pisa_public_key = load_pem_public_key(pubkey_pem, backend=default_backend())
return pisa_public_key
except UnsupportedAlgorithm: except UnsupportedAlgorithm:
raise ValueError("Could not deserialize the public key (unsupported algorithm).") raise ValueError("Could not deserialize the public key (unsupported algorithm).")
# Verifies that the appointment signature is a valid signature with public key `pk`,
# returning True or False accordingly.
def is_appointment_signature_valid(appointment, signature, pk):
try: try:
sig_bytes = unhexlify(signature.encode('utf-8')) sig_bytes = unhexlify(signature.encode('utf-8'))
data = json.dumps(appointment, sort_keys=True, separators=(',', ':')).encode("utf-8") data = json.dumps(appointment, sort_keys=True, separators=(',', ':')).encode("utf-8")
@@ -130,15 +133,15 @@ def add_appointment(args):
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error("The response was not valid JSON.") logger.error("The response was not valid JSON.")
return return False
except ConnectTimeout: except ConnectTimeout:
logger.error("Can't connect to pisa API. Connection timeout.") logger.error("Can't connect to pisa API. Connection timeout.")
return return False
except ConnectionError: except ConnectionError:
logger.error("Can't connect to pisa API. Server cannot be reached.") logger.error("Can't connect to pisa API. Server cannot be reached.")
return return False
if r.status_code == HTTP_OK: if r.status_code == HTTP_OK:
if 'signature' not in response_json: if 'signature' not in response_json:
@@ -147,22 +150,24 @@ def add_appointment(args):
signature = response_json['signature'] signature = response_json['signature']
# verify that the returned signature is valid # verify that the returned signature is valid
try: try:
is_sig_valid = is_appointment_signature_valid(appointment, signature) pk = load_pisa_public_key()
is_sig_valid = is_appointment_signature_valid(appointment, signature, pk)
except ValueError: except ValueError:
logger.error("Failed to deserialize the public key. It might be in an unsupported format.") logger.error("Failed to deserialize the public key. It might be in an unsupported format.")
return return False
except FileNotFoundError: except FileNotFoundError:
logger.error("Pisa's public key file not found. Please check your settings.") logger.error("Pisa's public key file not found. Please check your settings.")
return return False
except IOError as e: except IOError as e:
logger.error("I/O error({}): {}".format(e.errno, e.strerror)) logger.error("I/O error({}): {}".format(e.errno, e.strerror))
return return False
if is_sig_valid: if is_sig_valid:
logger.info("Appointment accepted and signed by Pisa.") logger.info("Appointment accepted and signed by Pisa.")
# all good, store appointment and signature # all good, store appointment and signature
try: try:
save_signed_appointment(appointment, signature) save_signed_appointment(appointment, signature)
return True
except OSError as e: except OSError as e:
logger.error("There was an error while saving the appointment: {}".format(e)) logger.error("There was an error while saving the appointment: {}".format(e))
else: else:
@@ -182,6 +187,8 @@ def add_appointment(args):
else: else:
logger.error("No appointment data provided. " + use_help) logger.error("No appointment data provided. " + use_help)
return False # return False for any path that returned an error message
def get_appointment(args): def get_appointment(args):
if args: if args:
@@ -200,7 +207,7 @@ def get_appointment(args):
r = requests.get(url=get_appointment_endpoint + parameters, timeout=5) r = requests.get(url=get_appointment_endpoint + parameters, timeout=5)
print(json.dumps(r.json(), indent=4, sort_keys=True)) print(json.dumps(r.json(), indent=4, sort_keys=True))
return True
except ConnectTimeout: except ConnectTimeout:
logger.error("Can't connect to pisa API. Connection timeout.") logger.error("Can't connect to pisa API. Connection timeout.")
@@ -213,6 +220,8 @@ def get_appointment(args):
else: else:
logger.error("The provided locator is not valid.") logger.error("The provided locator is not valid.")
return False # return False for any path that returned an error message
def build_appointment(tx, tx_id, start_time, end_time, dispute_delta): def build_appointment(tx, tx_id, start_time, end_time, dispute_delta):
locator = sha256(unhexlify(tx_id)).hexdigest() locator = sha256(unhexlify(tx_id)).hexdigest()

View File

@@ -44,15 +44,18 @@ def sign_appointment(sk, appointment):
def test_is_appointment_signature_valid(): def test_is_appointment_signature_valid():
# Verify that an appointment signed by Pisa is valid # Verify that an appointment signed by Pisa is valid
signature = sign_appointment(pisa_sk, dummy_appointment) signature = sign_appointment(pisa_sk, dummy_appointment)
assert pisa_cli.is_appointment_signature_valid(dummy_appointment, signature) assert pisa_cli.is_appointment_signature_valid(dummy_appointment, signature, pisa_pk)
# Test that a signature from a different key is indeed invalid # Test that a signature from a different key is indeed invalid
other_signature = sign_appointment(other_sk, dummy_appointment) other_signature = sign_appointment(other_sk, dummy_appointment)
assert not pisa_cli.is_appointment_signature_valid(dummy_appointment, other_signature) assert not pisa_cli.is_appointment_signature_valid(dummy_appointment, other_signature, pisa_pk)
@responses.activate @responses.activate
def test_add_appointment(): def test_add_appointment():
# Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested
# and the return value is True
response = { response = {
'locator': dummy_appointment['locator'], 'locator': dummy_appointment['locator'],
'signature': sign_appointment(pisa_sk, dummy_appointment) 'signature': sign_appointment(pisa_sk, dummy_appointment)
@@ -61,7 +64,26 @@ def test_add_appointment():
request_url = 'http://{}/'.format(pisa_endpoint) request_url = 'http://{}/'.format(pisa_endpoint)
responses.add(responses.POST, request_url, json=response, status=200) responses.add(responses.POST, request_url, json=response, status=200)
pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)])
assert len(responses.calls) == 1 assert len(responses.calls) == 1
assert responses.calls[0].request.url == request_url assert responses.calls[0].request.url == request_url
assert result
@responses.activate
def test_add_appointment_with_invalid_signature():
# Simulate a request to add_appointment for dummy_appointment, but sign with a different key,
# make sure that the right endpoint is requested, but the return value is False
response = {
'locator': dummy_appointment['locator'],
'signature': sign_appointment(other_sk, dummy_appointment) # signing with a different key
}
request_url = 'http://{}/'.format(pisa_endpoint)
responses.add(responses.POST, request_url, json=response, status=200)
result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)])
assert not result