mirror of
https://github.com/aljazceru/python-teos.git
synced 2026-01-25 09:14:23 +01:00
Updates get_appointment status messages and small changes
This commit is contained in:
51
pisa-btc/apps/cli/blob.py
Normal file
51
pisa-btc/apps/cli/blob.py
Normal file
@@ -0,0 +1,51 @@
|
||||
from binascii import hexlify, unhexlify
|
||||
from hashlib import sha256
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from conf import SUPPORTED_HASH_FUNCTIONS, SUPPORTED_CIPHERS
|
||||
|
||||
|
||||
class Blob:
|
||||
def __init__(self, data, cipher, hash_function):
|
||||
self.data = data
|
||||
self.cipher = cipher
|
||||
self.hash_function = hash_function
|
||||
|
||||
# FIXME: We only support SHA256 for now
|
||||
if self.hash_function.upper() not in SUPPORTED_HASH_FUNCTIONS:
|
||||
raise Exception("Hash function not supported ({}). Supported Hash functions: {}"
|
||||
.format(self.hash_function, SUPPORTED_HASH_FUNCTIONS))
|
||||
|
||||
# FIXME: We only support AES-GCM-128 for now
|
||||
if self.cipher.upper() not in SUPPORTED_CIPHERS:
|
||||
raise Exception("Cipher not supported ({}). Supported ciphers: {}".format(self.hash_function,
|
||||
SUPPORTED_CIPHERS))
|
||||
|
||||
def encrypt(self, tx_id, debug, logging):
|
||||
# Transaction to be encrypted
|
||||
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
|
||||
tx = unhexlify(self.data)
|
||||
|
||||
# FIXME: tx_id should not be necessary (can be derived from tx SegWit-like). Passing it for now
|
||||
# Extend the key using HKDF
|
||||
tx_id = unhexlify(tx_id)
|
||||
|
||||
# master_key = H(tx_id | tx_id)
|
||||
master_key = sha256(tx_id + tx_id).digest()
|
||||
|
||||
# The 16 MSB of the master key will serve as the AES GCM 128 secret key. The 16 LSB will serve as the IV.
|
||||
sk = master_key[:16]
|
||||
nonce = master_key[16:]
|
||||
|
||||
# Encrypt the data
|
||||
aesgcm = AESGCM(sk)
|
||||
encrypted_blob = aesgcm.encrypt(nonce=nonce, data=tx, associated_data=None)
|
||||
encrypted_blob = hexlify(encrypted_blob).decode()
|
||||
|
||||
if debug:
|
||||
logging.info("[Client] creating new blob")
|
||||
logging.info("[Client] master key: {}".format(hexlify(master_key).decode()))
|
||||
logging.info("[Client] sk: {}".format(hexlify(sk).decode()))
|
||||
logging.info("[Client] nonce: {}".format(hexlify(nonce).decode()))
|
||||
logging.info("[Client] encrypted_blob: {}".format(encrypted_blob))
|
||||
|
||||
return encrypted_blob
|
||||
188
pisa-btc/apps/cli/pisa-cli.py
Normal file
188
pisa-btc/apps/cli/pisa-cli.py
Normal file
@@ -0,0 +1,188 @@
|
||||
import requests
|
||||
import re
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
from getopt import getopt, GetoptError
|
||||
from sys import argv
|
||||
import logging
|
||||
from conf import CLIENT_LOG_FILE
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify
|
||||
from apps.blob import Blob
|
||||
from requests import ConnectTimeout, ConnectionError
|
||||
from apps import DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT
|
||||
|
||||
|
||||
def add_appointment(args):
|
||||
use_help = "Use 'help add_appointment' for help of how to use the command."
|
||||
|
||||
if args:
|
||||
arg_opt = args.pop(0)
|
||||
|
||||
try:
|
||||
if arg_opt in ['-h', '--help']:
|
||||
sys.exit(help_add_appointment())
|
||||
|
||||
if arg_opt in ['-f', '--file']:
|
||||
if args:
|
||||
fin = args.pop(0)
|
||||
if os.path.isfile(fin):
|
||||
appointment_data = json.load(open(fin))
|
||||
else:
|
||||
sys.exit("Can't find file " + fin)
|
||||
else:
|
||||
sys.exit("No file provided as appointment. " + use_help)
|
||||
else:
|
||||
appointment_data = json.loads(arg_opt)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
sys.exit("Non-JSON encoded data provided as appointment. " + use_help)
|
||||
|
||||
else:
|
||||
sys.exit("No appointment data provided. " + use_help)
|
||||
|
||||
valid_locator = check_txid_format(appointment_data.get('tx_id'))
|
||||
|
||||
if valid_locator:
|
||||
pisa_url = "http://{}:{}".format(pisa_api_server, pisa_api_port)
|
||||
appointment = build_appointment(appointment_data.get('tx'), appointment_data.get('tx_id'),
|
||||
appointment_data.get('start_time'), appointment_data.get('end_time'),
|
||||
appointment_data.get('dispute_delta'), debug, logging)
|
||||
|
||||
if debug:
|
||||
logging.info("[Client] sending appointment to PISA")
|
||||
|
||||
try:
|
||||
r = requests.post(url=pisa_url, json=json.dumps(appointment), timeout=5)
|
||||
|
||||
if debug:
|
||||
logging.info("[Client] {} (code: {})".format(r.text, r.status_code))
|
||||
else:
|
||||
print("{} (code: {}).".format(r.text, r.status_code))
|
||||
|
||||
except ConnectTimeout:
|
||||
if debug:
|
||||
logging.info("[Client] can't connect to pisa API. Connection timeout")
|
||||
else:
|
||||
sys.exit("Can't connect to pisa API. Connection timeout.")
|
||||
|
||||
except ConnectionError:
|
||||
if debug:
|
||||
logging.info("[Client] can't connect to pisa API. Server cannot be reached")
|
||||
else:
|
||||
sys.exit("Can't connect to pisa API. Server cannot be reached.")
|
||||
else:
|
||||
raise sys.exit("The provided locator is not valid.")
|
||||
|
||||
|
||||
def build_appointment(tx, tx_id, start_block, end_block, dispute_delta, debug, logging):
|
||||
locator = sha256(unhexlify(tx_id)).hexdigest()
|
||||
|
||||
cipher = "AES-GCM-128"
|
||||
hash_function = "SHA256"
|
||||
|
||||
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
|
||||
blob = Blob(tx, cipher, hash_function)
|
||||
|
||||
# FIXME: tx_id should not be necessary (can be derived from tx SegWit-like). Passing it for now
|
||||
encrypted_blob = blob.encrypt(tx_id, debug, logging)
|
||||
|
||||
appointment = {"locator": locator, "start_time": start_block, "end_time": end_block,
|
||||
"dispute_delta": dispute_delta, "encrypted_blob": encrypted_blob, "cipher": cipher, "hash_function":
|
||||
hash_function}
|
||||
|
||||
return appointment
|
||||
|
||||
|
||||
def check_txid_format(txid):
|
||||
if len(txid) != 64:
|
||||
raise Exception("txid does not matches the expected size (32-byte / 64 hex chars).")
|
||||
|
||||
return re.search(r'^[0-9A-Fa-f]+$', txid) is not None
|
||||
|
||||
|
||||
def show_usage(show_and_quit=False):
|
||||
print("USAGE: "
|
||||
"\n\tpython pisa-cli.py [global options] command [command options] [arguments]"
|
||||
"\n\nCOMMANDS:"
|
||||
"\n\tadd_appointment \tRegisters a json formatted appointment to the PISA server."
|
||||
"\n\tget_appointment \tGets json formatted data about an appointment from the PISA server."
|
||||
"\n\thelp \t\t\tShows a list of commands or help for a specific command."
|
||||
|
||||
"\n\nGLOBAL OPTIONS:"
|
||||
"\n\t-s, --server \tAPI server where to send the requests. Defaults to localhost (modifiable in __init__.py)"
|
||||
"\n\t-p, --port \tAPI port where to send the requests. Defaults to 9814 (modifiable in __init__.py)"
|
||||
"\n\t-d, --debug \tshows debug information and stores it in pisa.log"
|
||||
"\n\t-h --help \tshows this message.")
|
||||
|
||||
if show_and_quit:
|
||||
exit(-1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
debug = False
|
||||
command = None
|
||||
appointment_data = None
|
||||
pisa_api_server = DEFAULT_PISA_API_SERVER
|
||||
pisa_api_port = DEFAULT_PISA_API_PORT
|
||||
commands = ['add_appointment', 'get_appointment', 'help']
|
||||
|
||||
try:
|
||||
opts, args = getopt(argv[1:], 's:p:dh', ['server', 'port', 'debug', 'help'])
|
||||
|
||||
for opt, arg in opts:
|
||||
if opt in ['-s', 'server']:
|
||||
if arg:
|
||||
pisa_api_server = arg
|
||||
|
||||
if opt in ['-p', '--port']:
|
||||
if arg:
|
||||
pisa_api_port = int(arg)
|
||||
|
||||
if opt in ['-d', '--debug']:
|
||||
debug = True
|
||||
|
||||
if opt in ['-h', '--help']:
|
||||
show_usage()
|
||||
|
||||
if args:
|
||||
command = args.pop(0)
|
||||
|
||||
if command in commands:
|
||||
if command == 'add_appointment':
|
||||
appointment_data = add_appointment(args)
|
||||
|
||||
elif command == 'get_appointment':
|
||||
pass
|
||||
|
||||
elif command == 'help':
|
||||
if args:
|
||||
command = args.pop(0)
|
||||
|
||||
if command == 'add_appointment':
|
||||
pass
|
||||
|
||||
elif command == "get_appointment":
|
||||
pass
|
||||
|
||||
else:
|
||||
sys.exit("Unknown command. Use help to check the list of available commands.")
|
||||
else:
|
||||
show_usage()
|
||||
|
||||
else:
|
||||
sys.exit("Unknown command. Use help to check the list of available commands.")
|
||||
|
||||
except GetoptError as e:
|
||||
print(e)
|
||||
except json.JSONDecodeError as e:
|
||||
print('Non-JSON encoded appointment passed as parameter.')
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
|
||||
logging.FileHandler(CLIENT_LOG_FILE),
|
||||
logging.StreamHandler()
|
||||
])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user