mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Reformats code to match code guidelines
This commit is contained in:
@@ -2,12 +2,12 @@ import logging
|
||||
from apps.cli.logger import Logger
|
||||
|
||||
# PISA-SERVER
|
||||
DEFAULT_PISA_API_SERVER = 'btc.pisa.watch'
|
||||
DEFAULT_PISA_API_SERVER = "btc.pisa.watch"
|
||||
DEFAULT_PISA_API_PORT = 9814
|
||||
|
||||
# PISA-CLI
|
||||
CLIENT_LOG_FILE = 'pisa-cli.log'
|
||||
APPOINTMENTS_FOLDER_NAME = 'appointments'
|
||||
CLIENT_LOG_FILE = "pisa-cli.log"
|
||||
APPOINTMENTS_FOLDER_NAME = "appointments"
|
||||
|
||||
# CRYPTO
|
||||
SUPPORTED_HASH_FUNCTIONS = ["SHA256"]
|
||||
@@ -16,9 +16,8 @@ SUPPORTED_CIPHERS = ["AES-GCM-128"]
|
||||
PISA_PUBLIC_KEY = "pisa_pk.pem"
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[
|
||||
logging.FileHandler(CLIENT_LOG_FILE),
|
||||
logging.StreamHandler()
|
||||
])
|
||||
logging.basicConfig(
|
||||
format="%(message)s", level=logging.INFO, handlers=[logging.FileHandler(CLIENT_LOG_FILE), logging.StreamHandler()]
|
||||
)
|
||||
|
||||
logger = Logger("Client")
|
||||
|
||||
@@ -9,7 +9,7 @@ from apps.cli import logger
|
||||
|
||||
class Blob:
|
||||
def __init__(self, data, cipher, hash_function):
|
||||
if type(data) is not str or re.search(r'^[0-9A-Fa-f]+$', data) is None:
|
||||
if type(data) is not str or re.search(r"^[0-9A-Fa-f]+$", data) is None:
|
||||
raise ValueError("Non-Hex character found in txid.")
|
||||
|
||||
self.data = data
|
||||
@@ -18,19 +18,23 @@ class Blob:
|
||||
|
||||
# FIXME: We only support SHA256 for now
|
||||
if self.hash_function.upper() not in SUPPORTED_HASH_FUNCTIONS:
|
||||
raise ValueError("Hash function not supported ({}). Supported Hash functions: {}"
|
||||
.format(self.hash_function, SUPPORTED_HASH_FUNCTIONS))
|
||||
raise ValueError(
|
||||
"Hash function not supported ({}). Supported Hash functions: {}".format(
|
||||
self.hash_function, SUPPORTED_HASH_FUNCTIONS
|
||||
)
|
||||
)
|
||||
|
||||
# FIXME: We only support AES-GCM-128 for now
|
||||
if self.cipher.upper() not in SUPPORTED_CIPHERS:
|
||||
raise ValueError("Cipher not supported ({}). Supported ciphers: {}".format(self.hash_function,
|
||||
SUPPORTED_CIPHERS))
|
||||
raise ValueError(
|
||||
"Cipher not supported ({}). Supported ciphers: {}".format(self.hash_function, SUPPORTED_CIPHERS)
|
||||
)
|
||||
|
||||
def encrypt(self, tx_id):
|
||||
if len(tx_id) != 64:
|
||||
raise ValueError("txid does not matches the expected size (32-byte / 64 hex chars).")
|
||||
|
||||
elif re.search(r'^[0-9A-Fa-f]+$', tx_id) is None:
|
||||
elif re.search(r"^[0-9A-Fa-f]+$", tx_id) is None:
|
||||
raise ValueError("Non-Hex character found in txid.")
|
||||
|
||||
# Transaction to be encrypted
|
||||
@@ -50,10 +54,12 @@ class Blob:
|
||||
encrypted_blob = aesgcm.encrypt(nonce=nonce, data=tx, associated_data=None)
|
||||
encrypted_blob = hexlify(encrypted_blob).decode()
|
||||
|
||||
logger.info("Creating new blob",
|
||||
master_key=hexlify(master_key).decode(),
|
||||
sk=hexlify(sk).decode(),
|
||||
nonce=hexlify(nonce).decode(),
|
||||
encrypted_blob=encrypted_blob)
|
||||
logger.info(
|
||||
"Creating new blob",
|
||||
master_key=hexlify(master_key).decode(),
|
||||
sk=hexlify(sk).decode(),
|
||||
nonce=hexlify(nonce).decode(),
|
||||
encrypted_blob=encrypted_blob,
|
||||
)
|
||||
|
||||
return encrypted_blob
|
||||
|
||||
@@ -1,21 +1,25 @@
|
||||
def help_add_appointment():
|
||||
return "NAME:" \
|
||||
"\tpython pisa-cli add_appointment - Registers a json formatted appointment to the PISA server." \
|
||||
"\n\nUSAGE:" \
|
||||
"\tpython pisa-cli add_appointment [command options] appointment/path_to_appointment_file" \
|
||||
"\n\nDESCRIPTION:" \
|
||||
"\n\n\tRegisters a json formatted appointment to the PISA server." \
|
||||
"\n\tif -f, --file *is* specified, then the command expects a path to a json file instead of a json encoded " \
|
||||
"\n\tstring as parameter." \
|
||||
"\n\nOPTIONS:" \
|
||||
"\n\t -f, --file path_to_json_file\t loads the appointment data from the specified json file instead of" \
|
||||
return (
|
||||
"NAME:"
|
||||
"\tpython pisa-cli add_appointment - Registers a json formatted appointment to the PISA server."
|
||||
"\n\nUSAGE:"
|
||||
"\tpython pisa-cli add_appointment [command options] appointment/path_to_appointment_file"
|
||||
"\n\nDESCRIPTION:"
|
||||
"\n\n\tRegisters a json formatted appointment to the PISA server."
|
||||
"\n\tif -f, --file *is* specified, then the command expects a path to a json file instead of a json encoded "
|
||||
"\n\tstring as parameter."
|
||||
"\n\nOPTIONS:"
|
||||
"\n\t -f, --file path_to_json_file\t loads the appointment data from the specified json file instead of"
|
||||
"\n\t\t\t\t\t command line"
|
||||
)
|
||||
|
||||
|
||||
def help_get_appointment():
|
||||
return "NAME:" \
|
||||
"\tpython pisa-cli get_appointment - Gets json formatted data about an appointment from the PISA server." \
|
||||
"\n\nUSAGE:" \
|
||||
"\tpython pisa-cli get_appointment appointment_locator" \
|
||||
"\n\nDESCRIPTION:" \
|
||||
"\n\n\tGets json formatted data about an appointment from the PISA server.\n"
|
||||
return (
|
||||
"NAME:"
|
||||
"\tpython pisa-cli get_appointment - Gets json formatted data about an appointment from the PISA server."
|
||||
"\n\nUSAGE:"
|
||||
"\tpython pisa-cli get_appointment appointment_locator"
|
||||
"\n\nDESCRIPTION:"
|
||||
"\n\n\tGets json formatted data about an appointment from the PISA server.\n"
|
||||
)
|
||||
|
||||
@@ -11,7 +11,6 @@ from getopt import getopt, GetoptError
|
||||
from requests import ConnectTimeout, ConnectionError
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
||||
@@ -34,14 +33,19 @@ def generate_dummy_appointment():
|
||||
|
||||
current_height = r.json().get("block_count")
|
||||
|
||||
dummy_appointment_data = {"tx": os.urandom(192).hex(), "tx_id": os.urandom(32).hex(),
|
||||
"start_time": current_height + 5, "end_time": current_height + 10, "dispute_delta": 20}
|
||||
dummy_appointment_data = {
|
||||
"tx": os.urandom(192).hex(),
|
||||
"tx_id": os.urandom(32).hex(),
|
||||
"start_time": current_height + 5,
|
||||
"end_time": current_height + 10,
|
||||
"dispute_delta": 20,
|
||||
}
|
||||
|
||||
print('Generating dummy appointment data:''\n\n' + json.dumps(dummy_appointment_data, indent=4, sort_keys=True))
|
||||
print("Generating dummy appointment data:" "\n\n" + json.dumps(dummy_appointment_data, indent=4, sort_keys=True))
|
||||
|
||||
json.dump(dummy_appointment_data, open('dummy_appointment_data.json', 'w'))
|
||||
json.dump(dummy_appointment_data, open("dummy_appointment_data.json", "w"))
|
||||
|
||||
print('\nData stored in dummy_appointment_data.json')
|
||||
print("\nData stored in dummy_appointment_data.json")
|
||||
|
||||
|
||||
# Loads and returns Pisa's public key from disk.
|
||||
@@ -53,6 +57,7 @@ def load_pisa_public_key():
|
||||
pubkey_pem = key_file.read().encode("utf-8")
|
||||
pisa_public_key = load_pem_public_key(pubkey_pem, backend=default_backend())
|
||||
return pisa_public_key
|
||||
|
||||
except UnsupportedAlgorithm:
|
||||
raise ValueError("Could not deserialize the public key (unsupported algorithm).")
|
||||
|
||||
@@ -61,10 +66,11 @@ def load_pisa_public_key():
|
||||
# returning True or False accordingly.
|
||||
def is_appointment_signature_valid(appointment, signature, pk):
|
||||
try:
|
||||
sig_bytes = unhexlify(signature.encode('utf-8'))
|
||||
data = json.dumps(appointment, sort_keys=True, separators=(',', ':')).encode("utf-8")
|
||||
sig_bytes = unhexlify(signature.encode("utf-8"))
|
||||
data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
||||
pk.verify(sig_bytes, data, ec.ECDSA(hashes.SHA256()))
|
||||
return True
|
||||
|
||||
except InvalidSignature:
|
||||
return False
|
||||
|
||||
@@ -75,8 +81,9 @@ def save_signed_appointment(appointment, signature):
|
||||
os.makedirs(APPOINTMENTS_FOLDER_NAME, exist_ok=True)
|
||||
|
||||
timestamp = int(time.time())
|
||||
locator = appointment['locator']
|
||||
locator = appointment["locator"]
|
||||
uuid = uuid4().hex # prevent filename collisions
|
||||
|
||||
filename = "{}/appointment-{}-{}-{}.json".format(APPOINTMENTS_FOLDER_NAME, timestamp, locator, uuid)
|
||||
data = {"appointment": appointment, "signature": signature}
|
||||
|
||||
@@ -95,10 +102,10 @@ def add_appointment(args):
|
||||
arg_opt = args.pop(0)
|
||||
|
||||
try:
|
||||
if arg_opt in ['-h', '--help']:
|
||||
if arg_opt in ["-h", "--help"]:
|
||||
sys.exit(help_add_appointment())
|
||||
|
||||
if arg_opt in ['-f', '--file']:
|
||||
if arg_opt in ["-f", "--file"]:
|
||||
fin = args.pop(0)
|
||||
if not os.path.isfile(fin):
|
||||
logger.error("Can't find file " + fin)
|
||||
@@ -107,6 +114,7 @@ def add_appointment(args):
|
||||
try:
|
||||
with open(fin) as f:
|
||||
appointment_data = json.load(f)
|
||||
|
||||
except IOError as e:
|
||||
logger.error("I/O error({}): {}".format(e.errno, e.strerror))
|
||||
return False
|
||||
@@ -121,17 +129,21 @@ def add_appointment(args):
|
||||
logger.error("The provided JSON is empty.")
|
||||
return False
|
||||
|
||||
valid_locator = check_txid_format(appointment_data.get('tx_id'))
|
||||
valid_locator = check_txid_format(appointment_data.get("tx_id"))
|
||||
|
||||
if not valid_locator:
|
||||
logger.error("The provided locator is not valid.")
|
||||
return False
|
||||
|
||||
add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port)
|
||||
appointment = build_appointment(appointment_data.get('tx'), appointment_data.get('tx_id'),
|
||||
appointment_data.get('start_time'), appointment_data.get('end_time'),
|
||||
appointment_data.get('dispute_delta'))
|
||||
appointment_json = json.dumps(appointment, sort_keys=True, separators=(',', ':'))
|
||||
appointment = build_appointment(
|
||||
appointment_data.get("tx"),
|
||||
appointment_data.get("tx_id"),
|
||||
appointment_data.get("start_time"),
|
||||
appointment_data.get("end_time"),
|
||||
appointment_data.get("dispute_delta"),
|
||||
)
|
||||
appointment_json = json.dumps(appointment, sort_keys=True, separators=(",", ":"))
|
||||
|
||||
logger.info("Sending appointment to PISA")
|
||||
|
||||
@@ -153,30 +165,33 @@ def add_appointment(args):
|
||||
return False
|
||||
|
||||
if r.status_code != HTTP_OK:
|
||||
if 'error' not in response_json:
|
||||
logger.error("The server returned status code {}, but no error description."
|
||||
.format(r.status_code))
|
||||
if "error" not in response_json:
|
||||
logger.error("The server returned status code {}, but no error description.".format(r.status_code))
|
||||
else:
|
||||
error = response_json['error']
|
||||
logger.error("The server returned status code {}, and the following error: {}."
|
||||
.format(r.status_code, error))
|
||||
error = response_json["error"]
|
||||
logger.error(
|
||||
"The server returned status code {}, and the following error: {}.".format(r.status_code, error)
|
||||
)
|
||||
return False
|
||||
|
||||
if 'signature' not in response_json:
|
||||
if "signature" not in response_json:
|
||||
logger.error("The response does not contain the signature of the appointment.")
|
||||
return False
|
||||
|
||||
signature = response_json['signature']
|
||||
signature = response_json["signature"]
|
||||
# verify that the returned signature is valid
|
||||
try:
|
||||
pk = load_pisa_public_key()
|
||||
is_sig_valid = is_appointment_signature_valid(appointment, signature, pk)
|
||||
|
||||
except ValueError:
|
||||
logger.error("Failed to deserialize the public key. It might be in an unsupported format.")
|
||||
return False
|
||||
|
||||
except FileNotFoundError:
|
||||
logger.error("Pisa's public key file not found. Please check your settings.")
|
||||
return False
|
||||
|
||||
except IOError as e:
|
||||
logger.error("I/O error({}): {}".format(e.errno, e.strerror))
|
||||
return False
|
||||
@@ -189,6 +204,7 @@ def add_appointment(args):
|
||||
# all good, store appointment and signature
|
||||
try:
|
||||
save_signed_appointment(appointment, signature)
|
||||
|
||||
except OSError as e:
|
||||
logger.error("There was an error while saving the appointment: {}".format(e))
|
||||
return False
|
||||
@@ -203,7 +219,7 @@ def get_appointment(args):
|
||||
|
||||
arg_opt = args.pop(0)
|
||||
|
||||
if arg_opt in ['-h', '--help']:
|
||||
if arg_opt in ["-h", "--help"]:
|
||||
sys.exit(help_get_appointment())
|
||||
else:
|
||||
locator = arg_opt
|
||||
@@ -215,6 +231,7 @@ def get_appointment(args):
|
||||
|
||||
get_appointment_endpoint = "http://{}:{}/get_appointment".format(pisa_api_server, pisa_api_port)
|
||||
parameters = "?locator={}".format(locator)
|
||||
|
||||
try:
|
||||
r = requests.get(url=get_appointment_endpoint + parameters, timeout=5)
|
||||
|
||||
@@ -241,8 +258,14 @@ def build_appointment(tx, tx_id, start_time, end_time, dispute_delta):
|
||||
encrypted_blob = blob.encrypt(tx_id)
|
||||
|
||||
appointment = {
|
||||
'locator': locator, 'start_time': start_time, 'end_time': end_time, 'dispute_delta': dispute_delta,
|
||||
'encrypted_blob': encrypted_blob, 'cipher': cipher, 'hash_function': hash_function}
|
||||
"locator": locator,
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"dispute_delta": dispute_delta,
|
||||
"encrypted_blob": encrypted_blob,
|
||||
"cipher": cipher,
|
||||
"hash_function": hash_function,
|
||||
}
|
||||
|
||||
return appointment
|
||||
|
||||
@@ -252,61 +275,62 @@ def check_txid_format(txid):
|
||||
sys.exit("locator does not matches the expected size (32-byte / 64 hex chars).")
|
||||
|
||||
# TODO: #12-check-txid-regexp
|
||||
return re.search(r'^[0-9A-Fa-f]+$', txid) is not None
|
||||
return re.search(r"^[0-9A-Fa-f]+$", txid) is not None
|
||||
|
||||
|
||||
def show_usage():
|
||||
return ('USAGE: '
|
||||
'\n\tpython pisa-cli.py [global options] command [command options] [arguments]'
|
||||
'\n\nCOMMANDS:'
|
||||
'\n\tadd_appointment \tRegisters a json formatted appointment to the PISA server.'
|
||||
'\n\tget_appointment \tGets json formatted data about an appointment from the PISA server.'
|
||||
'\n\thelp \t\t\tShows a list of commands or help for a specific command.'
|
||||
|
||||
'\n\nGLOBAL OPTIONS:'
|
||||
'\n\t-s, --server \tAPI server where to send the requests. Defaults to btc.pisa.watch (modifiable in '
|
||||
'__init__.py)'
|
||||
'\n\t-p, --port \tAPI port where to send the requests. Defaults to 9814 (modifiable in __init__.py)'
|
||||
'\n\t-d, --debug \tshows debug information and stores it in pisa.log'
|
||||
'\n\t-h --help \tshows this message.')
|
||||
return (
|
||||
"USAGE: "
|
||||
"\n\tpython pisa-cli.py [global options] command [command options] [arguments]"
|
||||
"\n\nCOMMANDS:"
|
||||
"\n\tadd_appointment \tRegisters a json formatted appointment to the PISA server."
|
||||
"\n\tget_appointment \tGets json formatted data about an appointment from the PISA server."
|
||||
"\n\thelp \t\t\tShows a list of commands or help for a specific command."
|
||||
"\n\nGLOBAL OPTIONS:"
|
||||
"\n\t-s, --server \tAPI server where to send the requests. Defaults to btc.pisa.watch (modifiable in "
|
||||
"__init__.py)"
|
||||
"\n\t-p, --port \tAPI port where to send the requests. Defaults to 9814 (modifiable in __init__.py)"
|
||||
"\n\t-d, --debug \tshows debug information and stores it in pisa.log"
|
||||
"\n\t-h --help \tshows this message."
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
pisa_api_server = DEFAULT_PISA_API_SERVER
|
||||
pisa_api_port = DEFAULT_PISA_API_PORT
|
||||
commands = ['add_appointment', 'get_appointment', 'help']
|
||||
testing_commands = ['generate_dummy_appointment']
|
||||
commands = ["add_appointment", "get_appointment", "help"]
|
||||
testing_commands = ["generate_dummy_appointment"]
|
||||
|
||||
try:
|
||||
opts, args = getopt(argv[1:], 's:p:h', ['server', 'port', 'help'])
|
||||
opts, args = getopt(argv[1:], "s:p:h", ["server", "port", "help"])
|
||||
|
||||
for opt, arg in opts:
|
||||
if opt in ['-s', 'server']:
|
||||
if opt in ["-s", "server"]:
|
||||
if arg:
|
||||
pisa_api_server = arg
|
||||
|
||||
if opt in ['-p', '--port']:
|
||||
if opt in ["-p", "--port"]:
|
||||
if arg:
|
||||
pisa_api_port = int(arg)
|
||||
|
||||
if opt in ['-h', '--help']:
|
||||
if opt in ["-h", "--help"]:
|
||||
sys.exit(show_usage())
|
||||
|
||||
if args:
|
||||
command = args.pop(0)
|
||||
|
||||
if command in commands:
|
||||
if command == 'add_appointment':
|
||||
if command == "add_appointment":
|
||||
add_appointment(args)
|
||||
|
||||
elif command == 'get_appointment':
|
||||
elif command == "get_appointment":
|
||||
get_appointment(args)
|
||||
|
||||
elif command == 'help':
|
||||
elif command == "help":
|
||||
if args:
|
||||
command = args.pop(0)
|
||||
|
||||
if command == 'add_appointment':
|
||||
if command == "add_appointment":
|
||||
sys.exit(help_add_appointment())
|
||||
|
||||
elif command == "get_appointment":
|
||||
@@ -320,7 +344,7 @@ if __name__ == '__main__':
|
||||
|
||||
# FIXME: testing command, not for production
|
||||
elif command in testing_commands:
|
||||
if command == 'generate_dummy_appointment':
|
||||
if command == "generate_dummy_appointment":
|
||||
generate_dummy_appointment()
|
||||
|
||||
else:
|
||||
|
||||
@@ -9,39 +9,35 @@ from cryptography.hazmat.primitives.asymmetric import ec
|
||||
# Simple tool to generate an ECDSA private key using the secp256k1 curve and save private and public keys
|
||||
# as 'pisa_sk.pem' 'and pisa_pk.pem', respectively.
|
||||
|
||||
SK_FILE_NAME = 'pisa_sk.pem'
|
||||
PK_FILE_NAME = 'pisa_pk.pem'
|
||||
SK_FILE_NAME = "pisa_sk.pem"
|
||||
PK_FILE_NAME = "pisa_pk.pem"
|
||||
|
||||
|
||||
def save_sk(sk, filename):
|
||||
pem = sk.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
with open(filename, 'wb') as pem_out:
|
||||
|
||||
with open(filename, "wb") as pem_out:
|
||||
pem_out.write(pem)
|
||||
|
||||
|
||||
def save_pk(pk, filename):
|
||||
pem = pk.public_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
with open(filename, 'wb') as pem_out:
|
||||
pem = pk.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo)
|
||||
with open(filename, "wb") as pem_out:
|
||||
pem_out.write(pem)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
if os.path.exists(SK_FILE_NAME):
|
||||
print("A key with name \"{}\" already exists. Aborting.".format(SK_FILE_NAME))
|
||||
print('A key with name "{}" already exists. Aborting.'.format(SK_FILE_NAME))
|
||||
exit(1)
|
||||
|
||||
sk = ec.generate_private_key(
|
||||
ec.SECP256K1, default_backend()
|
||||
)
|
||||
sk = ec.generate_private_key(ec.SECP256K1, default_backend())
|
||||
pk = sk.public_key()
|
||||
|
||||
save_sk(sk, SK_FILE_NAME)
|
||||
save_pk(pk, PK_FILE_NAME)
|
||||
print("Saved private key \"{}\" and public key \"{}\".".format(SK_FILE_NAME, PK_FILE_NAME))
|
||||
print('Saved private key "{}" and public key "{}".'.format(SK_FILE_NAME, PK_FILE_NAME))
|
||||
|
||||
Reference in New Issue
Block a user