mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-18 06:34:19 +01:00
pisa-cli saving appointments and signatures to disk on success
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
import logging
|
import logging
|
||||||
from logger import Logger
|
from .logger import Logger
|
||||||
|
|
||||||
# PISA-SERVER
|
# PISA-SERVER
|
||||||
DEFAULT_PISA_API_SERVER = 'btc.pisa.watch'
|
DEFAULT_PISA_API_SERVER = 'btc.pisa.watch'
|
||||||
@@ -7,6 +7,7 @@ DEFAULT_PISA_API_PORT = 9814
|
|||||||
|
|
||||||
# PISA-CLI
|
# PISA-CLI
|
||||||
CLIENT_LOG_FILE = 'pisa-cli.log'
|
CLIENT_LOG_FILE = 'pisa-cli.log'
|
||||||
|
APPOINTMENTS_FOLDER_NAME = 'appointments'
|
||||||
|
|
||||||
# CRYPTO
|
# CRYPTO
|
||||||
SUPPORTED_HASH_FUNCTIONS = ["SHA256"]
|
SUPPORTED_HASH_FUNCTIONS = ["SHA256"]
|
||||||
|
|||||||
@@ -3,11 +3,13 @@ import os
|
|||||||
import sys
|
import sys
|
||||||
import json
|
import json
|
||||||
import requests
|
import requests
|
||||||
|
import time
|
||||||
from sys import argv
|
from sys import argv
|
||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
from binascii import unhexlify
|
from binascii import unhexlify
|
||||||
from getopt import getopt, GetoptError
|
from getopt import getopt, GetoptError
|
||||||
from requests import ConnectTimeout, ConnectionError
|
from requests import ConnectTimeout, ConnectionError
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
@@ -18,7 +20,7 @@ from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
|
|||||||
|
|
||||||
from apps.cli.blob import Blob
|
from apps.cli.blob import Blob
|
||||||
from apps.cli.help import help_add_appointment, help_get_appointment
|
from apps.cli.help import help_add_appointment, help_get_appointment
|
||||||
from apps.cli import DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT, PISA_PUBLIC_KEY
|
from apps.cli import DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT, PISA_PUBLIC_KEY, APPOINTMENTS_FOLDER_NAME
|
||||||
from apps.cli import logger
|
from apps.cli import logger
|
||||||
|
|
||||||
|
|
||||||
@@ -45,7 +47,7 @@ def generate_dummy_appointment():
|
|||||||
# Loads Pisa's public key from disk and verifies that the appointment signature is a valid signature from Pisa,
|
# Loads Pisa's public key from disk and verifies that the appointment signature is a valid signature from Pisa,
|
||||||
# returning True or False accordingly.
|
# returning True or False accordingly.
|
||||||
# Will raise NotFoundError or IOError if the attempts to open and read the public key file fail.
|
# Will raise NotFoundError or IOError if the attempts to open and read the public key file fail.
|
||||||
# Will raise ValueError if it the public key file was present but it failed to be unserialized.
|
# Will raise ValueError if it the public key file was present but it failed to be deserialized.
|
||||||
def is_appointment_signature_valid(appointment, signature):
|
def is_appointment_signature_valid(appointment, signature):
|
||||||
# Load the key from disk
|
# Load the key from disk
|
||||||
try:
|
try:
|
||||||
@@ -53,7 +55,7 @@ def is_appointment_signature_valid(appointment, signature):
|
|||||||
pubkey_pem = key_file.read().encode("utf-8")
|
pubkey_pem = key_file.read().encode("utf-8")
|
||||||
pisa_public_key = load_pem_public_key(pubkey_pem, backend=default_backend())
|
pisa_public_key = load_pem_public_key(pubkey_pem, backend=default_backend())
|
||||||
except UnsupportedAlgorithm:
|
except UnsupportedAlgorithm:
|
||||||
raise ValueError("Could not unserialize the public key (unsupported algorithm).")
|
raise ValueError("Could not deserialize the public key (unsupported algorithm).")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sig_bytes = unhexlify(signature.encode('utf-8'))
|
sig_bytes = unhexlify(signature.encode('utf-8'))
|
||||||
@@ -65,6 +67,24 @@ def is_appointment_signature_valid(appointment, signature):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def save_signed_appointment(appointment, signature):
|
||||||
|
# Create the appointments directory if it doesn't already exist
|
||||||
|
try:
|
||||||
|
os.makedirs(APPOINTMENTS_FOLDER_NAME)
|
||||||
|
except FileExistsError:
|
||||||
|
# directory already exists
|
||||||
|
pass
|
||||||
|
|
||||||
|
timestamp = int(time.time()*1000)
|
||||||
|
locator = appointment['locator']
|
||||||
|
uuid = uuid4() # prevent filename collisions
|
||||||
|
filename = "{}/appointment-{}-{}-{}.json".format(APPOINTMENTS_FOLDER_NAME, timestamp, locator, uuid)
|
||||||
|
data = {"appointment": appointment, "signature": signature}
|
||||||
|
|
||||||
|
with open(filename, "w") as f:
|
||||||
|
json.dump(data, f)
|
||||||
|
|
||||||
|
|
||||||
def add_appointment(args):
|
def add_appointment(args):
|
||||||
appointment_data = None
|
appointment_data = None
|
||||||
use_help = "Use 'help add_appointment' for help of how to use the command."
|
use_help = "Use 'help add_appointment' for help of how to use the command."
|
||||||
@@ -106,16 +126,20 @@ def add_appointment(args):
|
|||||||
try:
|
try:
|
||||||
r = requests.post(url=add_appointment_endpoint, json=appointment_json, timeout=5)
|
r = requests.post(url=add_appointment_endpoint, json=appointment_json, timeout=5)
|
||||||
|
|
||||||
logger.info("{} (code: {}).".format(r.json(), r.status_code))
|
|
||||||
|
|
||||||
response_json = r.json()
|
response_json = r.json()
|
||||||
|
print(response_json)
|
||||||
|
|
||||||
if r.status_code == HTTP_OK:
|
if r.status_code == HTTP_OK:
|
||||||
if 'signature' not in response_json:
|
if 'signature' not in response_json:
|
||||||
logger.error("The response does not contain the signature of the appointment.")
|
logger.error("The response does not contain the signature of the appointment.")
|
||||||
else:
|
else:
|
||||||
|
signature = response_json['signature']
|
||||||
# verify that the returned signature is valid
|
# verify that the returned signature is valid
|
||||||
if not is_appointment_signature_valid(appointment, response_json['signature']):
|
if is_appointment_signature_valid(appointment, signature):
|
||||||
|
logger.info("Appointment accepted and signed by Pisa.")
|
||||||
|
# TODO: store on disk
|
||||||
|
save_signed_appointment(appointment, signature)
|
||||||
|
else:
|
||||||
logger.error("The returned appointment's signature is invalid.")
|
logger.error("The returned appointment's signature is invalid.")
|
||||||
else:
|
else:
|
||||||
if 'error' not in response_json:
|
if 'error' not in response_json:
|
||||||
|
|||||||
Reference in New Issue
Block a user