mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Added signature verification to pisa_cli
This commit is contained in:
@@ -11,6 +11,8 @@ CLIENT_LOG_FILE = 'pisa.log'
|
||||
SUPPORTED_HASH_FUNCTIONS = ["SHA256"]
|
||||
SUPPORTED_CIPHERS = ["AES-GCM-128"]
|
||||
|
||||
PUBLIC_KEY_FILE = "signing_key_pub.pem"
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[
|
||||
logging.FileHandler(CLIENT_LOG_FILE),
|
||||
|
||||
@@ -9,14 +9,25 @@ from binascii import unhexlify
|
||||
from getopt import getopt, GetoptError
|
||||
from requests import ConnectTimeout, ConnectionError
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
|
||||
from pisa.logger import Logger
|
||||
from apps.cli.blob import Blob
|
||||
from apps.cli.help import help_add_appointment, help_get_appointment
|
||||
from apps.cli import DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT
|
||||
from apps.cli import DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT, PUBLIC_KEY_FILE
|
||||
|
||||
HTTP_OK = 200
|
||||
|
||||
logger = Logger("Client")
|
||||
|
||||
with open(PUBLIC_KEY_FILE, "r") as key_file:
|
||||
pubkey_pem = key_file.read().encode("utf-8")
|
||||
pisa_public_key = load_pem_public_key(pubkey_pem, backend=default_backend())
|
||||
|
||||
|
||||
# FIXME: TESTING ENDPOINT, WON'T BE THERE IN PRODUCTION
|
||||
def generate_dummy_appointment():
|
||||
@@ -75,13 +86,37 @@ def add_appointment(args):
|
||||
try:
|
||||
r = requests.post(url=add_appointment_endpoint, json=json.dumps(appointment), timeout=5)
|
||||
|
||||
logger.info("{} (code: {}).".format(r.text, r.status_code))
|
||||
logger.info("{} (code: {}).".format(r.json(), r.status_code))
|
||||
|
||||
response_json = r.json()
|
||||
|
||||
if r.status_code == HTTP_OK:
|
||||
if 'signature' not in response_json:
|
||||
logger.error("The response does not contain the signature of the appointment.")
|
||||
else:
|
||||
# verify that the returned signature is valid
|
||||
signature = response_json['signature']
|
||||
pisa_public_key.verify(signature.encode("utf-8"), data, ec.ECDSA(hashes.SHA256()))
|
||||
else:
|
||||
if 'error' not in response_json:
|
||||
logger.error("The server returned status code {}, but no error description."
|
||||
.format(r.status_code))
|
||||
else:
|
||||
error = r.json()['error']
|
||||
logger.error("The server returned status code {}, and the following error: {}."
|
||||
.format(r.status_code))
|
||||
|
||||
except json.JSONDecodeError:
|
||||
logger.error("The response was not valid JSON.")
|
||||
|
||||
except ConnectTimeout:
|
||||
logger.error("Can't connect to pisa API. Connection timeout.")
|
||||
|
||||
except ConnectionError:
|
||||
logger.error("Can't connect to pisa API. Server cannot be reached.")
|
||||
|
||||
except InvalidSignature:
|
||||
logger.error("The returned appointment's signature is invalid.")
|
||||
else:
|
||||
logger.error("The provided locator is not valid.")
|
||||
else:
|
||||
|
||||
@@ -31,8 +31,8 @@ class Watcher:
|
||||
raise ValueError("No signing key provided. Please fix your pisa.conf")
|
||||
else:
|
||||
with open(SIGNING_KEY_FILE, "r") as key_file:
|
||||
pubkey_pem = key_file.read().encode("utf-8")
|
||||
self.signing_key = load_pem_private_key(pubkey_pem, password=None, backend=default_backend())
|
||||
privkey_pem = key_file.read().encode("utf-8")
|
||||
self.signing_key = load_pem_private_key(privkey_pem, password=None, backend=default_backend())
|
||||
|
||||
def add_appointment(self, appointment):
|
||||
# Rationale:
|
||||
|
||||
Reference in New Issue
Block a user