mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Refactored validation logic; improved error handling; avoid loading the key unnecessarily
This commit is contained in:
@@ -26,9 +26,7 @@ HTTP_OK = 200
|
||||
|
||||
logger = Logger("Client")
|
||||
|
||||
with open(PISA_PUBLIC_KEY, "r") as key_file:
|
||||
pubkey_pem = key_file.read().encode("utf-8")
|
||||
pisa_public_key = load_pem_public_key(pubkey_pem, backend=default_backend())
|
||||
pisa_public_key = None
|
||||
|
||||
|
||||
# FIXME: TESTING ENDPOINT, WON'T BE THERE IN PRODUCTION
|
||||
@@ -48,6 +46,28 @@ def generate_dummy_appointment():
|
||||
print('\nData stored in dummy_appointment_data.json')
|
||||
|
||||
|
||||
# Verifies that the appointment signature is a valid signature from Pisa, returning True or False accordingly.
|
||||
# Will raise NotFoundError or IOError if the attempts to open and read the public key file fail.
|
||||
# Will raise ValueError if it the public key file was present but it failed to be unserialized.
|
||||
def is_appointment_signature_valid(appointment, signature):
|
||||
# Load the key the first time this is used
|
||||
if pisa_public_key is None:
|
||||
try:
|
||||
with open(PISA_PUBLIC_KEY, "r") as key_file:
|
||||
pubkey_pem = key_file.read().encode("utf-8")
|
||||
pisa_public_key = load_pem_public_key(pubkey_pem, backend=default_backend())
|
||||
except cryptography.exceptions.UnsupportedAlgorithm:
|
||||
raise ValueError("Could not unserialize the public key (unsupported algorithm).")
|
||||
try:
|
||||
sig_bytes = unhexlify(response_json['signature'].encode('utf-8'))
|
||||
data = appointment.to_json().encode("utf-8")
|
||||
pisa_public_key.verify(sig_bytes, data, ec.ECDSA(hashes.SHA256()))
|
||||
except InvalidSignature:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def add_appointment(args):
|
||||
appointment_data = None
|
||||
use_help = "Use 'help add_appointment' for help of how to use the command."
|
||||
@@ -97,9 +117,8 @@ def add_appointment(args):
|
||||
logger.error("The response does not contain the signature of the appointment.")
|
||||
else:
|
||||
# verify that the returned signature is valid
|
||||
sig_bytes = unhexlify(response_json['signature'].encode('utf-8'))
|
||||
data = appointment.to_json().encode("utf-8")
|
||||
pisa_public_key.verify(sig_bytes, data, ec.ECDSA(hashes.SHA256()))
|
||||
if is_appointment_signature_valid(appointment, response_json['signature']) == False:
|
||||
logger.error("The returned appointment's signature is invalid.")
|
||||
else:
|
||||
if 'error' not in response_json:
|
||||
logger.error("The server returned status code {}, but no error description."
|
||||
@@ -117,9 +136,10 @@ def add_appointment(args):
|
||||
|
||||
except ConnectionError:
|
||||
logger.error("Can't connect to pisa API. Server cannot be reached.")
|
||||
|
||||
except InvalidSignature:
|
||||
logger.error("The returned appointment's signature is invalid.")
|
||||
except FileNotFoundError:
|
||||
logger.error("Pisa's public key file not found. Please check your settings.")
|
||||
except IOError e:
|
||||
logger.error("I/O error({0}): {1}".format(e.errno, e.strerror))
|
||||
else:
|
||||
logger.error("The provided locator is not valid.")
|
||||
else:
|
||||
@@ -167,7 +187,15 @@ def build_appointment(tx, tx_id, start_block, end_block, dispute_delta):
|
||||
blob = Blob(tx, cipher, hash_function)
|
||||
encrypted_blob = blob.encrypt(tx_id)
|
||||
|
||||
return Appointment(locator, start_block, end_block, dispute_delta, encrypted_blob, cipher, hash_function)
|
||||
return {
|
||||
'locator': locator,
|
||||
'start_block': start_block,
|
||||
'end_block': end_block,
|
||||
'dispute_delta': dispute_delta,
|
||||
'encrypted_blob': encrypted_blob,
|
||||
'cipher': cipher,
|
||||
'hash_function': hash_function
|
||||
}
|
||||
|
||||
|
||||
def check_txid_format(txid):
|
||||
|
||||
Reference in New Issue
Block a user