mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Updates cli to run with config file
This commit is contained in:
@@ -11,19 +11,18 @@ from uuid import uuid4
|
|||||||
|
|
||||||
from apps.cli.help import help_add_appointment, help_get_appointment
|
from apps.cli.help import help_add_appointment, help_get_appointment
|
||||||
from apps.cli.blob import Blob
|
from apps.cli.blob import Blob
|
||||||
from apps.cli import (
|
import apps.cli.conf as conf
|
||||||
DEFAULT_PISA_API_SERVER,
|
|
||||||
DEFAULT_PISA_API_PORT,
|
|
||||||
CLI_PUBLIC_KEY,
|
|
||||||
CLI_PRIVATE_KEY,
|
|
||||||
PISA_PUBLIC_KEY,
|
|
||||||
APPOINTMENTS_FOLDER_NAME,
|
|
||||||
)
|
|
||||||
|
|
||||||
from common.logger import Logger
|
from common.logger import Logger
|
||||||
from common.appointment import Appointment
|
from common.appointment import Appointment
|
||||||
from common.cryptographer import Cryptographer
|
from common.cryptographer import Cryptographer
|
||||||
from common.tools import check_sha256_hex_format, check_locator_format, compute_locator
|
from common.tools import (
|
||||||
|
check_sha256_hex_format,
|
||||||
|
check_locator_format,
|
||||||
|
compute_locator,
|
||||||
|
check_conf_fields,
|
||||||
|
setup_data_folder,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
HTTP_OK = 200
|
HTTP_OK = 200
|
||||||
@@ -54,6 +53,42 @@ def generate_dummy_appointment():
|
|||||||
logger.info("\nData stored in dummy_appointment_data.json")
|
logger.info("\nData stored in dummy_appointment_data.json")
|
||||||
|
|
||||||
|
|
||||||
|
def load_config(config):
|
||||||
|
"""
|
||||||
|
Looks through all of the config options to make sure they contain the right type of data and builds a config
|
||||||
|
dictionary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config (:obj:`module`): It takes in a config module object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`dict` A dictionary containing the config values.
|
||||||
|
"""
|
||||||
|
|
||||||
|
conf_dict = {}
|
||||||
|
|
||||||
|
data_folder = config.DATA_FOLDER
|
||||||
|
if isinstance(data_folder, str):
|
||||||
|
data_folder = os.path.expanduser(data_folder)
|
||||||
|
else:
|
||||||
|
raise ValueError("The provided user folder is invalid.")
|
||||||
|
|
||||||
|
conf_fields = {
|
||||||
|
"DEFAULT_PISA_API_SERVER": {"value": config.DEFAULT_PISA_API_SERVER, "type": str},
|
||||||
|
"DEFAULT_PISA_API_PORT": {"value": config.DEFAULT_PISA_API_PORT, "type": int},
|
||||||
|
"DATA_FOLDER": {"value": data_folder, "type": str},
|
||||||
|
"CLIENT_LOG_FILE": {"value": data_folder + config.CLIENT_LOG_FILE, "type": str},
|
||||||
|
"APPOINTMENTS_FOLDER_NAME": {"value": data_folder + config.APPOINTMENTS_FOLDER_NAME, "type": str},
|
||||||
|
"CLI_PUBLIC_KEY": {"value": data_folder + config.CLI_PUBLIC_KEY, "type": str},
|
||||||
|
"CLI_PRIVATE_KEY": {"value": data_folder + config.CLI_PRIVATE_KEY, "type": str},
|
||||||
|
"PISA_PUBLIC_KEY": {"value": data_folder + config.PISA_PUBLIC_KEY, "type": str},
|
||||||
|
}
|
||||||
|
|
||||||
|
check_conf_fields(conf_fields, logger)
|
||||||
|
|
||||||
|
return conf_dict
|
||||||
|
|
||||||
|
|
||||||
# Loads and returns Pisa keys from disk
|
# Loads and returns Pisa keys from disk
|
||||||
def load_key_file_data(file_name):
|
def load_key_file_data(file_name):
|
||||||
try:
|
try:
|
||||||
@@ -73,13 +108,13 @@ def load_key_file_data(file_name):
|
|||||||
# Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it.
|
# Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it.
|
||||||
def save_signed_appointment(appointment, signature):
|
def save_signed_appointment(appointment, signature):
|
||||||
# Create the appointments directory if it doesn't already exist
|
# Create the appointments directory if it doesn't already exist
|
||||||
os.makedirs(APPOINTMENTS_FOLDER_NAME, exist_ok=True)
|
os.makedirs(config.get("APPOINTMENTS_FOLDER_NAME"), exist_ok=True)
|
||||||
|
|
||||||
timestamp = int(time.time())
|
timestamp = int(time.time())
|
||||||
locator = appointment["locator"]
|
locator = appointment["locator"]
|
||||||
uuid = uuid4().hex # prevent filename collisions
|
uuid = uuid4().hex # prevent filename collisions
|
||||||
|
|
||||||
filename = "{}/appointment-{}-{}-{}.json".format(APPOINTMENTS_FOLDER_NAME, timestamp, locator, uuid)
|
filename = "{}/appointment-{}-{}-{}.json".format(config.get("APPOINTMENTS_FOLDER_NAME"), timestamp, locator, uuid)
|
||||||
data = {"appointment": appointment, "signature": signature}
|
data = {"appointment": appointment, "signature": signature}
|
||||||
|
|
||||||
with open(filename, "w") as f:
|
with open(filename, "w") as f:
|
||||||
@@ -233,7 +268,7 @@ def post_data_to_add_appointment_endpoint(data):
|
|||||||
# Verify that the signature returned from the watchtower is valid.
|
# Verify that the signature returned from the watchtower is valid.
|
||||||
def check_signature(signature, appointment):
|
def check_signature(signature, appointment):
|
||||||
try:
|
try:
|
||||||
pisa_pk_der = load_key_file_data(PISA_PUBLIC_KEY)
|
pisa_pk_der = load_key_file_data(config.get("PISA_PUBLIC_KEY"))
|
||||||
pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der)
|
pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der)
|
||||||
|
|
||||||
if pisa_pk is None:
|
if pisa_pk is None:
|
||||||
@@ -287,7 +322,7 @@ def get_appointment(args):
|
|||||||
|
|
||||||
def get_appointment_signature(appointment):
|
def get_appointment_signature(appointment):
|
||||||
try:
|
try:
|
||||||
sk_der = load_key_file_data(CLI_PRIVATE_KEY)
|
sk_der = load_key_file_data(config.get("CLI_PRIVATE_KEY"))
|
||||||
cli_sk = Cryptographer.load_private_key_der(sk_der)
|
cli_sk = Cryptographer.load_private_key_der(sk_der)
|
||||||
|
|
||||||
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||||
@@ -309,7 +344,7 @@ def get_appointment_signature(appointment):
|
|||||||
|
|
||||||
def get_pk():
|
def get_pk():
|
||||||
try:
|
try:
|
||||||
cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY)
|
cli_pk_der = load_key_file_data(config.get("CLI_PUBLIC_KEY"))
|
||||||
hex_pk_der = binascii.hexlify(cli_pk_der)
|
hex_pk_der = binascii.hexlify(cli_pk_der)
|
||||||
|
|
||||||
return hex_pk_der
|
return hex_pk_der
|
||||||
@@ -345,11 +380,16 @@ def show_usage():
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
pisa_api_server = DEFAULT_PISA_API_SERVER
|
config = load_config(conf)
|
||||||
pisa_api_port = DEFAULT_PISA_API_PORT
|
|
||||||
|
pisa_api_server = config.get("DEFAULT_PISA_API_SERVER")
|
||||||
|
pisa_api_port = config.get("DEFAULT_PISA_API_PORT")
|
||||||
commands = ["add_appointment", "get_appointment", "help"]
|
commands = ["add_appointment", "get_appointment", "help"]
|
||||||
testing_commands = ["generate_dummy_appointment"]
|
testing_commands = ["generate_dummy_appointment"]
|
||||||
|
|
||||||
|
# Create user folder if missing
|
||||||
|
setup_data_folder(config.get("DATA_FOLDER"), logger)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
opts, args = getopt(argv[1:], "s:p:h", ["server", "port", "help"])
|
opts, args = getopt(argv[1:], "s:p:h", ["server", "port", "help"])
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user