mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-18 22:54:23 +01:00
Improves cli
- Improves modularity - Adds missing exceptions - Adds docstrings - Simplifies some method names
This commit is contained in:
@@ -13,13 +13,12 @@ from apps.cli import config, LOG_PREFIX
|
||||
from apps.cli.help import help_add_appointment, help_get_appointment
|
||||
from apps.cli.blob import Blob
|
||||
|
||||
from common import constants
|
||||
from common.logger import Logger
|
||||
from common.appointment import Appointment
|
||||
from common.cryptographer import Cryptographer
|
||||
from common.tools import check_sha256_hex_format, check_locator_format, compute_locator
|
||||
|
||||
|
||||
HTTP_OK = 200
|
||||
logger = Logger(actor="Client", log_name_prefix=LOG_PREFIX)
|
||||
|
||||
|
||||
@@ -47,39 +46,81 @@ def generate_dummy_appointment():
|
||||
logger.info("\nData stored in dummy_appointment_data.json")
|
||||
|
||||
|
||||
# Loads and returns Pisa keys from disk
|
||||
def load_key_file_data(file_name):
|
||||
try:
|
||||
with open(file_name, "rb") as key_file:
|
||||
key = key_file.read()
|
||||
return key
|
||||
def load_keys(pisa_pk_path, cli_sk_path, cli_pk_path):
|
||||
"""
|
||||
Loads all the keys required so sign, send, and verify the appointment.
|
||||
|
||||
except FileNotFoundError as e:
|
||||
logger.error("Client's key file not found. Please check your settings")
|
||||
raise e
|
||||
Args:
|
||||
pisa_pk_path (:obj:`str`): path to the PISA public key file.
|
||||
cli_sk_path (:obj:`str`): path to the client private key file.
|
||||
cli_pk_path (:obj:`str`): path to the client public key file.
|
||||
|
||||
except IOError as e:
|
||||
logger.error("I/O error({}): {}".format(e.errno, e.strerror))
|
||||
raise e
|
||||
Returns:
|
||||
:obj:`tuple` or ``None``: a three item tuple containing a pisa_pk object, cli_sk object and the cli_sk_der
|
||||
encoded key if all keys can be loaded. ``None`` otherwise.
|
||||
"""
|
||||
|
||||
pisa_pk_der = Cryptographer.load_key_file(pisa_pk_path)
|
||||
pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der)
|
||||
|
||||
# Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it.
|
||||
def save_signed_appointment(appointment, signature):
|
||||
# Create the appointments directory if it doesn't already exist
|
||||
os.makedirs(config.get("APPOINTMENTS_FOLDER_NAME"), exist_ok=True)
|
||||
if pisa_pk is None:
|
||||
logger.error("PISA's public key file not found. Please check your settings")
|
||||
return None
|
||||
|
||||
timestamp = int(time.time())
|
||||
locator = appointment["locator"]
|
||||
uuid = uuid4().hex # prevent filename collisions
|
||||
cli_sk_der = Cryptographer.load_key_file(cli_sk_path)
|
||||
cli_sk = Cryptographer.load_private_key_der(cli_sk_der)
|
||||
|
||||
filename = "{}/appointment-{}-{}-{}.json".format(config.get("APPOINTMENTS_FOLDER_NAME"), timestamp, locator, uuid)
|
||||
data = {"appointment": appointment, "signature": signature}
|
||||
if cli_sk is None:
|
||||
logger.error("Client's private key file not found. Please check your settings")
|
||||
return None
|
||||
|
||||
with open(filename, "w") as f:
|
||||
json.dump(data, f)
|
||||
cli_pk_der = Cryptographer.load_key_file(cli_pk_path)
|
||||
|
||||
if cli_pk_der is None:
|
||||
logger.error("Client's public key file not found. Please check your settings")
|
||||
return None
|
||||
|
||||
return pisa_pk, cli_sk, cli_pk_der
|
||||
|
||||
|
||||
def add_appointment(args):
|
||||
"""
|
||||
Manages the add_appointment command, from argument parsing, trough sending the appointment to the tower, until
|
||||
saving the appointment receipt.
|
||||
|
||||
The life cycle of the function is as follows:
|
||||
- Load the add_appointment arguments
|
||||
- Check that the given commitment_txid is correct (proper format and not missing)
|
||||
- Check that the transaction is correct (not missing)
|
||||
- Create the appointment locator and encrypted blob from the commitment_txid and the penalty_tx
|
||||
- Load the client private key and sign the appointment
|
||||
- Send the appointment to the tower
|
||||
- Wait for the response
|
||||
- Check the tower's response and signature
|
||||
- Store the receipt (appointment + signature) on disk
|
||||
|
||||
If any of the above-mentioned steps fails, the method returns false, otherwise it returns true.
|
||||
|
||||
Args:
|
||||
args (:obj:`list`): a list of arguments to pass to ``parse_add_appointment_args``. Must contain a json encoded
|
||||
appointment, or the file option and the path to a file containing a json encoded appointment.
|
||||
|
||||
Returns:
|
||||
:obj:`bool`: True if the appointment is accepted by the tower and the receipt is properly stored, false if any
|
||||
error occurs during the process.
|
||||
"""
|
||||
|
||||
pisa_pk, cli_sk, cli_pk_der = load_keys(
|
||||
config.get("PISA_PUBLIC_KEY"), config.get("CLI_PRIVATE_KEY"), config.get("CLI_PUBLIC_KEY")
|
||||
)
|
||||
|
||||
try:
|
||||
hex_pk_der = binascii.hexlify(cli_pk_der)
|
||||
|
||||
except binascii.Error as e:
|
||||
logger.error("Could not successfully encode public key as hex", error=str(e))
|
||||
return False
|
||||
|
||||
# Get appointment data from user.
|
||||
appointment_data = parse_add_appointment_args(args)
|
||||
|
||||
@@ -105,17 +146,16 @@ def add_appointment(args):
|
||||
return False
|
||||
|
||||
appointment = Appointment.from_dict(appointment_data)
|
||||
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||
|
||||
signature = get_appointment_signature(appointment)
|
||||
hex_pk_der = get_pk()
|
||||
|
||||
if not (appointment and signature and hex_pk_der):
|
||||
if not (appointment and signature):
|
||||
return False
|
||||
|
||||
data = {"appointment": appointment.to_dict(), "signature": signature, "public_key": hex_pk_der.decode("utf-8")}
|
||||
|
||||
# Send appointment to the server.
|
||||
response_json = post_data_to_add_appointment_endpoint(data)
|
||||
server_response = post_appointment(data)
|
||||
response_json = process_post_appointment_response(server_response)
|
||||
|
||||
if response_json is None:
|
||||
return False
|
||||
@@ -126,27 +166,29 @@ def add_appointment(args):
|
||||
logger.error("The response does not contain the signature of the appointment")
|
||||
return False
|
||||
|
||||
valid = check_signature(signature, appointment)
|
||||
|
||||
if not valid:
|
||||
if not Cryptographer.verify(appointment.serialize(), signature, pisa_pk):
|
||||
logger.error("The returned appointment's signature is invalid")
|
||||
return False
|
||||
|
||||
logger.info("Appointment accepted and signed by Pisa")
|
||||
# all good, store appointment and signature
|
||||
try:
|
||||
save_signed_appointment(appointment.to_dict(), signature)
|
||||
logger.info("Appointment accepted and signed by PISA")
|
||||
|
||||
except OSError as e:
|
||||
logger.error("There was an error while saving the appointment", error=e)
|
||||
return False
|
||||
|
||||
return True
|
||||
# All good, store appointment and signature
|
||||
return save_appointment_receipt(appointment.to_dict(), signature)
|
||||
|
||||
|
||||
# Parse arguments passed to add_appointment and handle them accordingly.
|
||||
# Returns appointment data.
|
||||
def parse_add_appointment_args(args):
|
||||
"""
|
||||
Parses the arguments of the add_appointment command.
|
||||
|
||||
Args:
|
||||
args (:obj:`list`): a list of arguments to pass to ``parse_add_appointment_args``. Must contain a json encoded
|
||||
appointment, or the file option and the path to a file containing a json encoded appointment.
|
||||
|
||||
Returns:
|
||||
:obj:`dict` or :obj:`None`: A dictionary containing the appointment data if it can be loaded. ``None``
|
||||
otherwise.
|
||||
"""
|
||||
|
||||
use_help = "Use 'help add_appointment' for help of how to use the command"
|
||||
|
||||
if not args:
|
||||
@@ -182,80 +224,118 @@ def parse_add_appointment_args(args):
|
||||
return appointment_data
|
||||
|
||||
|
||||
# Sends appointment data to add_appointment endpoint to be processed by the server.
|
||||
def post_data_to_add_appointment_endpoint(data):
|
||||
def post_appointment(data):
|
||||
"""
|
||||
Sends appointment data to add_appointment endpoint to be processed by the tower.
|
||||
|
||||
Args:
|
||||
data (:obj:`dict`): a dictionary containing three fields: an appointment, the client-side signature, and the
|
||||
der-encoded client public key.
|
||||
|
||||
Returns:
|
||||
:obj:`dict` or ``None``: a json-encoded dictionary with the server response if the data can be posted.
|
||||
None otherwise.
|
||||
"""
|
||||
|
||||
logger.info("Sending appointment to PISA")
|
||||
|
||||
try:
|
||||
add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port)
|
||||
r = requests.post(url=add_appointment_endpoint, json=json.dumps(data), timeout=5)
|
||||
return requests.post(url=add_appointment_endpoint, json=json.dumps(data), timeout=5)
|
||||
|
||||
response_json = r.json()
|
||||
except ConnectTimeout:
|
||||
logger.error("Can't connect to PISA API. Connection timeout")
|
||||
return None
|
||||
|
||||
except ConnectionError:
|
||||
logger.error("Can't connect to PISA API. Server cannot be reached")
|
||||
return None
|
||||
|
||||
|
||||
def process_post_appointment_response(response):
|
||||
"""
|
||||
Processes the server response to an add_appointment request.
|
||||
|
||||
Args:
|
||||
response (:obj:`requests.models.Response`): a ``Response` object obtained from the sent request.
|
||||
|
||||
Returns:
|
||||
:obj:`dict` or :obj:`None`: a dictionary containing the tower's response data if it can be properly parsed and
|
||||
the response type is ``HTTP_OK``. ``None`` otherwise.
|
||||
"""
|
||||
|
||||
try:
|
||||
response_json = response.json()
|
||||
|
||||
except json.JSONDecodeError:
|
||||
logger.error("The response was not valid JSON")
|
||||
return None
|
||||
|
||||
except ConnectTimeout:
|
||||
logger.error("Can't connect to pisa API. Connection timeout")
|
||||
return None
|
||||
|
||||
except ConnectionError:
|
||||
logger.error("Can't connect to pisa API. Server cannot be reached")
|
||||
return None
|
||||
|
||||
if r.status_code != HTTP_OK:
|
||||
if response.status_code != constants.HTTP_OK:
|
||||
if "error" not in response_json:
|
||||
logger.error("The server returned an error status code but no error description", status_code=r.status_code)
|
||||
logger.error(
|
||||
"The server returned an error status code but no error description", status_code=response.status_code
|
||||
)
|
||||
else:
|
||||
error = response_json["error"]
|
||||
logger.error(
|
||||
"The server returned an error status code with an error description",
|
||||
status_code=r.status_code,
|
||||
status_code=response.status_code,
|
||||
description=error,
|
||||
)
|
||||
return None
|
||||
|
||||
if "signature" not in response_json:
|
||||
logger.error("The response does not contain the signature of the appointment")
|
||||
return None
|
||||
|
||||
return response_json
|
||||
|
||||
|
||||
# Verify that the signature returned from the watchtower is valid.
|
||||
def check_signature(signature, appointment):
|
||||
def save_appointment_receipt(appointment, signature):
|
||||
"""
|
||||
Saves an appointment receipt to disk. A receipt consists in an appointment and a signature from the tower.
|
||||
|
||||
Args:
|
||||
appointment (:obj:`Appointment <common.appointment.Appointment>`): the appointment to be saved on disk.
|
||||
signature (:obj:`str`): the signature of the appointment performed by the tower.
|
||||
|
||||
Returns:
|
||||
:obj:`bool`: True if the appointment if properly saved, false otherwise.
|
||||
|
||||
Raises:
|
||||
IOError: if an error occurs whilst writing the file on disk.
|
||||
"""
|
||||
|
||||
# Create the appointments directory if it doesn't already exist
|
||||
os.makedirs(config.get("APPOINTMENTS_FOLDER_NAME"), exist_ok=True)
|
||||
|
||||
timestamp = int(time.time())
|
||||
locator = appointment["locator"]
|
||||
uuid = uuid4().hex # prevent filename collisions
|
||||
|
||||
filename = "{}/appointment-{}-{}-{}.json".format(config.get("APPOINTMENTS_FOLDER_NAME"), timestamp, locator, uuid)
|
||||
data = {"appointment": appointment, "signature": signature}
|
||||
|
||||
try:
|
||||
pisa_pk_der = load_key_file_data(config.get("PISA_PUBLIC_KEY"))
|
||||
pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der)
|
||||
|
||||
if pisa_pk is None:
|
||||
logger.error("Failed to deserialize the public key. It might be in an unsupported format")
|
||||
return False
|
||||
|
||||
return Cryptographer.verify(appointment.serialize(), signature, pisa_pk)
|
||||
|
||||
except FileNotFoundError:
|
||||
logger.error("Pisa's public key file not found. Please check your settings")
|
||||
return False
|
||||
with open(filename, "w") as f:
|
||||
json.dump(data, f)
|
||||
return True
|
||||
|
||||
except IOError as e:
|
||||
logger.error("I/O error", errno=e.errno, error=e.strerror)
|
||||
logger.error("There was an error while saving the appointment", error=e)
|
||||
return False
|
||||
|
||||
|
||||
def get_appointment(args):
|
||||
if not args:
|
||||
logger.error("No arguments were given")
|
||||
return None
|
||||
def get_appointment(locator):
|
||||
"""
|
||||
Gets information about an appointment from the tower.
|
||||
|
||||
arg_opt = args.pop(0)
|
||||
Args:
|
||||
locator (:obj:`str`): the appointment locator used to identify it.
|
||||
|
||||
if arg_opt in ["-h", "--help"]:
|
||||
sys.exit(help_get_appointment())
|
||||
else:
|
||||
locator = arg_opt
|
||||
valid_locator = check_locator_format(locator)
|
||||
Returns:
|
||||
:obj:`dict` or :obj:`None`: a dictionary containing thew appointment data if the locator is valid and the tower
|
||||
responds. ``None`` otherwise.
|
||||
"""
|
||||
|
||||
valid_locator = check_locator_format(locator)
|
||||
|
||||
if not valid_locator:
|
||||
logger.error("The provided locator is not valid", locator=locator)
|
||||
@@ -266,60 +346,17 @@ def get_appointment(args):
|
||||
|
||||
try:
|
||||
r = requests.get(url=get_appointment_endpoint + parameters, timeout=5)
|
||||
logger.info("Appointment response returned from server: {}".format(r.json()))
|
||||
return r.json()
|
||||
|
||||
except ConnectTimeout:
|
||||
logger.error("Can't connect to pisa API. Connection timeout")
|
||||
logger.error("Can't connect to PISA API. Connection timeout")
|
||||
return None
|
||||
|
||||
except ConnectionError:
|
||||
logger.error("Can't connect to pisa API. Server cannot be reached")
|
||||
logger.error("Can't connect to PISA API. Server cannot be reached")
|
||||
return None
|
||||
|
||||
|
||||
def get_appointment_signature(appointment):
|
||||
try:
|
||||
sk_der = load_key_file_data(config.get("CLI_PRIVATE_KEY"))
|
||||
cli_sk = Cryptographer.load_private_key_der(sk_der)
|
||||
|
||||
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||
|
||||
return signature
|
||||
|
||||
except ValueError:
|
||||
logger.error("Failed to deserialize the public key. It might be in an unsupported format")
|
||||
return False
|
||||
|
||||
except FileNotFoundError:
|
||||
logger.error("Client's private key file not found. Please check your settings")
|
||||
return False
|
||||
|
||||
except IOError as e:
|
||||
logger.error("I/O error", errno=e.errno, error=e.strerror)
|
||||
return False
|
||||
|
||||
|
||||
def get_pk():
|
||||
try:
|
||||
cli_pk_der = load_key_file_data(config.get("CLI_PUBLIC_KEY"))
|
||||
hex_pk_der = binascii.hexlify(cli_pk_der)
|
||||
|
||||
return hex_pk_der
|
||||
|
||||
except FileNotFoundError:
|
||||
logger.error("Client's public key file not found. Please check your settings")
|
||||
return False
|
||||
|
||||
except IOError as e:
|
||||
logger.error("I/O error", errno=e.errno, error=e.strerror)
|
||||
return False
|
||||
|
||||
except binascii.Error as e:
|
||||
logger.error("Could not successfully encode public key as hex: ", e)
|
||||
return False
|
||||
|
||||
|
||||
def show_usage():
|
||||
return (
|
||||
"USAGE: "
|
||||
@@ -332,7 +369,7 @@ def show_usage():
|
||||
"\n\t-s, --server \tAPI server where to send the requests. Defaults to btc.pisa.watch (modifiable in "
|
||||
"__init__.py)"
|
||||
"\n\t-p, --port \tAPI port where to send the requests. Defaults to 9814 (modifiable in __init__.py)"
|
||||
"\n\t-d, --debug \tshows debug information and stores it in pisa.log"
|
||||
"\n\t-d, --debug \tshows debug information and stores it in pisa_cli.log"
|
||||
"\n\t-h --help \tshows this message."
|
||||
)
|
||||
|
||||
@@ -366,7 +403,18 @@ if __name__ == "__main__":
|
||||
add_appointment(args)
|
||||
|
||||
elif command == "get_appointment":
|
||||
get_appointment(args)
|
||||
if not args:
|
||||
logger.error("No arguments were given")
|
||||
|
||||
else:
|
||||
arg_opt = args.pop(0)
|
||||
|
||||
if arg_opt in ["-h", "--help"]:
|
||||
sys.exit(help_get_appointment())
|
||||
|
||||
appointment_data = get_appointment(arg_opt)
|
||||
if appointment_data:
|
||||
print(appointment_data)
|
||||
|
||||
elif command == "help":
|
||||
if args:
|
||||
|
||||
Reference in New Issue
Block a user