From f0150ce585c7f8a363ed12e46d2eb42206a6087e Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Wed, 4 Dec 2019 17:46:07 +0100 Subject: [PATCH] Removes hash/cipher configuration and changes AESGCM128 for CHACHA20POLY1305 Updates tests accordingly --- apps/cli/__init__.py | 4 -- apps/cli/blob.py | 41 ++++------------- apps/cli/pisa_cli.py | 14 +++--- pisa/api.py | 2 +- pisa/appointment.py | 28 ++---------- pisa/cryptographer.py | 15 +++---- pisa/encrypted_blob.py | 17 +------ pisa/errors.py | 2 - pisa/inspector.py | 54 +--------------------- pisa/responder.py | 4 +- pisa/sample_conf.py | 4 -- pisa/watcher.py | 4 +- test/unit/conftest.py | 13 ++---- test/unit/test_api.py | 1 + test/unit/test_appointment.py | 16 ------- test/unit/test_blob.py | 77 ++------------------------------ test/unit/test_cleaner.py | 2 +- test/unit/test_cryptographer.py | 26 ++++++++++- test/unit/test_db_manager.py | 2 - test/unit/test_encrypted_blob.py | 18 -------- test/unit/test_inspector.py | 66 ++++----------------------- test/unit/test_watcher.py | 19 ++++---- 22 files changed, 78 insertions(+), 351 deletions(-) diff --git a/apps/cli/__init__.py b/apps/cli/__init__.py index 58be430..0e044d5 100644 --- a/apps/cli/__init__.py +++ b/apps/cli/__init__.py @@ -9,10 +9,6 @@ DEFAULT_PISA_API_PORT = 9814 CLIENT_LOG_FILE = "pisa-cli.log" APPOINTMENTS_FOLDER_NAME = "appointments" -# CRYPTO -SUPPORTED_HASH_FUNCTIONS = ["SHA256"] -SUPPORTED_CIPHERS = ["AES-GCM-128"] - CLI_PUBLIC_KEY = "cli_pk.pem" CLI_PRIVATE_KEY = "cli_sk.pem" PISA_PUBLIC_KEY = "pisa_pk.pem" diff --git a/apps/cli/blob.py b/apps/cli/blob.py index 27e1d38..4ec3e0f 100644 --- a/apps/cli/blob.py +++ b/apps/cli/blob.py @@ -1,34 +1,17 @@ import re from hashlib import sha256 from binascii import hexlify, unhexlify -from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 -from apps.cli import SUPPORTED_HASH_FUNCTIONS, SUPPORTED_CIPHERS from apps.cli import logger class Blob: - def __init__(self, data, cipher, hash_function): + def __init__(self, data): if type(data) is not str or re.search(r"^[0-9A-Fa-f]+$", data) is None: raise ValueError("Non-Hex character found in txid.") self.data = data - self.cipher = cipher - self.hash_function = hash_function - - # FIXME: We only support SHA256 for now - if self.hash_function.upper() not in SUPPORTED_HASH_FUNCTIONS: - raise ValueError( - "Hash function not supported ({}). Supported Hash functions: {}".format( - self.hash_function, SUPPORTED_HASH_FUNCTIONS - ) - ) - - # FIXME: We only support AES-GCM-128 for now - if self.cipher.upper() not in SUPPORTED_CIPHERS: - raise ValueError( - "Cipher not supported ({}). Supported ciphers: {}".format(self.hash_function, SUPPORTED_CIPHERS) - ) def encrypt(self, tx_id): if len(tx_id) != 64: @@ -40,26 +23,18 @@ class Blob: # Transaction to be encrypted # FIXME: The blob data should contain more things that just the transaction. Leaving like this for now. tx = unhexlify(self.data) - tx_id = unhexlify(tx_id) - # master_key = H(tx_id | tx_id) - master_key = sha256(tx_id + tx_id).digest() - - # The 16 MSB of the master key will serve as the AES GCM 128 secret key. The 16 LSB will serve as the IV. - sk = master_key[:16] - nonce = master_key[16:] + # sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte) + sk = sha256(unhexlify(tx_id)).digest() + nonce = bytearray(12) # Encrypt the data - aesgcm = AESGCM(sk) - encrypted_blob = aesgcm.encrypt(nonce=nonce, data=tx, associated_data=None) + cipher = ChaCha20Poly1305(sk) + encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None) encrypted_blob = hexlify(encrypted_blob).decode() logger.info( - "Creating new blob", - master_key=hexlify(master_key).decode(), - sk=hexlify(sk).decode(), - nonce=hexlify(nonce).decode(), - encrypted_blob=encrypted_blob, + "Creating new blob", sk=hexlify(sk).decode(), nonce=hexlify(nonce).decode(), encrypted_blob=encrypted_blob ) return encrypted_blob diff --git a/apps/cli/pisa_cli.py b/apps/cli/pisa_cli.py index 12213dd..7e0fc88 100644 --- a/apps/cli/pisa_cli.py +++ b/apps/cli/pisa_cli.py @@ -5,7 +5,6 @@ import json import requests import time from sys import argv -from hashlib import sha256 from binascii import hexlify, unhexlify from getopt import getopt, GetoptError from requests import ConnectTimeout, ConnectionError @@ -91,6 +90,10 @@ def load_private_key(sk_pem): raise ValueError("Could not deserialize the private key (unsupported algorithm).") +def compute_locator(tx_id): + return tx_id[:32] + + # returning True or False accordingly. def is_appointment_signature_valid(appointment, signature, pk): try: @@ -308,13 +311,10 @@ def get_appointment(args): def build_appointment(tx, tx_id, start_time, end_time, dispute_delta): - locator = sha256(unhexlify(tx_id)).hexdigest() - - cipher = "AES-GCM-128" - hash_function = "SHA256" + locator = compute_locator(tx_id) # FIXME: The blob data should contain more things that just the transaction. Leaving like this for now. - blob = Blob(tx, cipher, hash_function) + blob = Blob(tx) encrypted_blob = blob.encrypt(tx_id) appointment = { @@ -323,8 +323,6 @@ def build_appointment(tx, tx_id, start_time, end_time, dispute_delta): "end_time": end_time, "dispute_delta": dispute_delta, "encrypted_blob": encrypted_blob, - "cipher": cipher, - "hash_function": hash_function, } return appointment diff --git a/pisa/api.py b/pisa/api.py index a7c79b2..8f1b1a1 100644 --- a/pisa/api.py +++ b/pisa/api.py @@ -79,7 +79,7 @@ def get_appointment(): response = [] # ToDo: #15-add-system-monitor - if not isinstance(locator, str) or len(locator) != 64: + if not isinstance(locator, str) or len(locator) != 32: response.append({"locator": locator, "status": "not_found"}) return jsonify(response) diff --git a/pisa/appointment.py b/pisa/appointment.py index 61bd2e3..aa56e3d 100644 --- a/pisa/appointment.py +++ b/pisa/appointment.py @@ -6,16 +6,12 @@ from pisa.encrypted_blob import EncryptedBlob # Basic appointment structure class Appointment: # DISCUSS: 35-appointment-checks - def __init__( - self, locator, start_time, end_time, dispute_delta, encrypted_blob, cipher, hash_function, triggered=False - ): + def __init__(self, locator, start_time, end_time, dispute_delta, encrypted_blob, triggered=False): self.locator = locator self.start_time = start_time # ToDo: #4-standardize-appointment-fields self.end_time = end_time # ToDo: #4-standardize-appointment-fields self.dispute_delta = dispute_delta self.encrypted_blob = EncryptedBlob(encrypted_blob) - self.cipher = cipher - self.hash_function = hash_function self.triggered = triggered @classmethod @@ -25,30 +21,14 @@ class Appointment: end_time = appointment_data.get("end_time") # ToDo: #4-standardize-appointment-fields dispute_delta = appointment_data.get("dispute_delta") encrypted_blob_data = appointment_data.get("encrypted_blob") - cipher = appointment_data.get("cipher") - hash_function = appointment_data.get("hash_function") triggered = True if appointment_data.get("triggered") is True else False - if any( - v is None - for v in [ - locator, - start_time, - end_time, - dispute_delta, - encrypted_blob_data, - cipher, - hash_function, - triggered, - ] - ): + if any(v is None for v in [locator, start_time, end_time, dispute_delta, encrypted_blob_data, triggered]): raise ValueError("Wrong appointment data, some fields are missing") else: - appointment = cls( - locator, start_time, end_time, dispute_delta, encrypted_blob_data, cipher, hash_function, triggered - ) + appointment = cls(locator, start_time, end_time, dispute_delta, encrypted_blob_data, triggered) return appointment @@ -60,8 +40,6 @@ class Appointment: "end_time": self.end_time, "dispute_delta": self.dispute_delta, "encrypted_blob": self.encrypted_blob.data, - "cipher": self.cipher, - "hash_function": self.hash_function, "triggered": self.triggered, } diff --git a/pisa/cryptographer.py b/pisa/cryptographer.py index 0dd507a..79da257 100644 --- a/pisa/cryptographer.py +++ b/pisa/cryptographer.py @@ -1,7 +1,7 @@ from hashlib import sha256 from binascii import unhexlify, hexlify from cryptography.exceptions import InvalidTag -from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 from pisa.logger import Logger @@ -23,24 +23,19 @@ class Cryptographer: ) return None - # master_key = H(tx_id | tx_id) - key = unhexlify(key) - master_key = sha256(key + key).digest() - - # The 16 MSB of the master key will serve as the AES GCM 128 secret key. The 16 LSB will serve as the IV. - sk = master_key[:16] - nonce = master_key[16:] + # sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte) + sk = sha256(unhexlify(key)).digest() + nonce = bytearray(12) logger.info( "Creating new blob.", - master_key=hexlify(master_key).decode(), sk=hexlify(sk).decode(), nonce=hexlify(nonce).decode(), encrypted_blob=encrypted_blob.data, ) # Decrypt - cipher = AESGCM(sk) + cipher = ChaCha20Poly1305(sk) data = unhexlify(encrypted_blob.data.encode()) try: diff --git a/pisa/encrypted_blob.py b/pisa/encrypted_blob.py index 36f51f7..945b49c 100644 --- a/pisa/encrypted_blob.py +++ b/pisa/encrypted_blob.py @@ -1,20 +1,5 @@ -from pisa.conf import SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS - - class EncryptedBlob: - def __init__(self, data, cipher="AES-GCM-128", hash_function="SHA256"): - if cipher in SUPPORTED_CIPHERS: - self.cipher = cipher - - else: - raise ValueError("Cipher not supported") - - if hash_function in SUPPORTED_HASH_FUNCTIONS: - self.hash_function = hash_function - - else: - raise ValueError("Hash function not supported") - + def __init__(self, data): self.data = data def __eq__(self, other): diff --git a/pisa/errors.py b/pisa/errors.py index 275e097..9cb5172 100644 --- a/pisa/errors.py +++ b/pisa/errors.py @@ -6,8 +6,6 @@ APPOINTMENT_WRONG_FIELD_FORMAT = -4 APPOINTMENT_FIELD_TOO_SMALL = -5 APPOINTMENT_FIELD_TOO_BIG = -6 APPOINTMENT_WRONG_FIELD = -7 -APPOINTMENT_CIPHER_NOT_SUPPORTED = -8 -APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED = -9 APPOINTMENT_INVALID_SIGNATURE = -10 # Custom RPC errors diff --git a/pisa/inspector.py b/pisa/inspector.py index 56b2ac6..81125a7 100644 --- a/pisa/inspector.py +++ b/pisa/inspector.py @@ -37,10 +37,6 @@ class Inspector: rcode, message = self.check_delta(appt.get("dispute_delta")) if rcode == 0: rcode, message = self.check_blob(appt.get("encrypted_blob")) - if rcode == 0: - rcode, message = self.check_cipher(appt.get("cipher")) - if rcode == 0: - rcode, message = self.check_hash_function(appt.get("hash_function")) if rcode == 0: rcode, message = self.check_appointment_signature(appt, signature, public_key) @@ -68,7 +64,7 @@ class Inspector: rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE message = "wrong locator data type ({})".format(type(locator)) - elif len(locator) != 64: + elif len(locator) != 32: rcode = errors.APPOINTMENT_WRONG_FIELD_SIZE message = "wrong locator size ({})".format(len(locator)) # TODO: #12-check-txid-regexp @@ -200,54 +196,6 @@ class Inspector: return rcode, message - @staticmethod - def check_cipher(cipher): - message = None - rcode = 0 - - t = type(cipher) - - if cipher is None: - rcode = errors.APPOINTMENT_EMPTY_FIELD - message = "empty cipher received" - - elif t != str: - rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE - message = "wrong cipher data type ({})".format(t) - - elif cipher.upper() not in conf.SUPPORTED_CIPHERS: - rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED - message = "cipher not supported: {}".format(cipher) - - if message is not None: - logger.error(message) - - return rcode, message - - @staticmethod - def check_hash_function(hash_function): - message = None - rcode = 0 - - t = type(hash_function) - - if hash_function is None: - rcode = errors.APPOINTMENT_EMPTY_FIELD - message = "empty hash_function received" - - elif t != str: - rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE - message = "wrong hash_function data type ({})".format(t) - - elif hash_function.upper() not in conf.SUPPORTED_HASH_FUNCTIONS: - rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED - message = "hash_function not supported {}".format(hash_function) - - if message is not None: - logger.error(message) - - return rcode, message - @staticmethod # Verifies that the appointment signature is a valid signature with public key def check_appointment_signature(appointment, signature, pk_pem): diff --git a/pisa/responder.py b/pisa/responder.py index 2cc00ff..fd7e6fc 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -1,8 +1,6 @@ import json from queue import Queue -from hashlib import sha256 from threading import Thread -from binascii import unhexlify from pisa.logger import Logger from pisa.cleaner import Cleaner @@ -25,7 +23,7 @@ class Job: # FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info # can be directly got from DB - self.locator = sha256(unhexlify(dispute_txid)).hexdigest() + self.locator = dispute_txid[:32] @classmethod def from_dict(cls, job_data): diff --git a/pisa/sample_conf.py b/pisa/sample_conf.py index e975490..fafe0ab 100644 --- a/pisa/sample_conf.py +++ b/pisa/sample_conf.py @@ -23,9 +23,5 @@ CLIENT_LOG_FILE = "pisa.log" # TEST TEST_LOG_FILE = "test.log" -# CRYPTO -SUPPORTED_HASH_FUNCTIONS = ["SHA256"] -SUPPORTED_CIPHERS = ["AES-GCM-128"] - # LEVELDB DB_PATH = "appointments" diff --git a/pisa/watcher.py b/pisa/watcher.py index d4c6cc2..a451abb 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -1,8 +1,6 @@ from uuid import uuid4 from queue import Queue -from hashlib import sha256 from threading import Thread -from binascii import unhexlify from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend @@ -42,7 +40,7 @@ class Watcher: @staticmethod def compute_locator(tx_id): - return sha256(unhexlify(tx_id)).hexdigest() + return tx_id[:32] def sign_appointment(self, appointment): data = appointment.serialize() diff --git a/test/unit/conftest.py b/test/unit/conftest.py index cb3b912..49f8d5f 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -5,8 +5,7 @@ import requests from time import sleep from shutil import rmtree from threading import Thread -from hashlib import sha256 -from binascii import hexlify, unhexlify +from binascii import hexlify from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes @@ -15,6 +14,7 @@ from cryptography.hazmat.primitives import serialization from apps.cli.blob import Blob from pisa.responder import Job +from pisa.watcher import Watcher from pisa.tools import bitcoin_cli from pisa.db_manager import DBManager from pisa.appointment import Appointment @@ -99,9 +99,6 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t "dispute_delta": 20, } - cipher = "AES-GCM-128" - hash_function = "SHA256" - # dummy keys for this test client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) client_pk = ( @@ -110,8 +107,8 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t .decode("utf-8") ) - locator = sha256(unhexlify(dispute_txid)).hexdigest() - blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function) + locator = Watcher.compute_locator(dispute_txid) + blob = Blob(dummy_appointment_data.get("tx")) encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id"))) @@ -121,8 +118,6 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t "end_time": dummy_appointment_data.get("end_time"), "dispute_delta": dummy_appointment_data.get("dispute_delta"), "encrypted_blob": encrypted_blob, - "cipher": cipher, - "hash_function": hash_function, } signature = sign_appointment(client_sk, appointment_data) diff --git a/test/unit/test_api.py b/test/unit/test_api.py index f973c06..73d390a 100644 --- a/test/unit/test_api.py +++ b/test/unit/test_api.py @@ -147,6 +147,7 @@ def test_request_appointment_watcher(new_appt_data): appointment_status = [appointment.pop("status") for appointment in received_appointments] # Check that the appointment is within the received appoints + print("AAA", new_appt_data["appointment"], received_appointments) assert new_appt_data["appointment"] in received_appointments # Check that all the appointments are being watched diff --git a/test/unit/test_appointment.py b/test/unit/test_appointment.py index 329ef10..c50badb 100644 --- a/test/unit/test_appointment.py +++ b/test/unit/test_appointment.py @@ -17,8 +17,6 @@ def appointment_data(): end_time = 120 dispute_delta = 20 encrypted_blob_data = get_random_value_hex(100) - cipher = "AES-GCM-128" - hash_function = "SHA256" return { "locator": locator, @@ -26,8 +24,6 @@ def appointment_data(): "end_time": end_time, "dispute_delta": dispute_delta, "encrypted_blob": encrypted_blob_data, - "cipher": cipher, - "hash_function": hash_function, } @@ -42,8 +38,6 @@ def test_init_appointment(appointment_data): appointment_data["end_time"], appointment_data["dispute_delta"], appointment_data["encrypted_blob"], - appointment_data["cipher"], - appointment_data["hash_function"], ) assert ( @@ -52,8 +46,6 @@ def test_init_appointment(appointment_data): and appointment_data["end_time"] == appointment.end_time and appointment_data["dispute_delta"] == appointment.dispute_delta and EncryptedBlob(appointment_data["encrypted_blob"]) == appointment.encrypted_blob - and appointment_data["cipher"] == appointment.cipher - and appointment_data["hash_function"] == appointment.hash_function ) @@ -64,8 +56,6 @@ def test_to_dict(appointment_data): appointment_data["end_time"], appointment_data["dispute_delta"], appointment_data["encrypted_blob"], - appointment_data["cipher"], - appointment_data["hash_function"], ) dict_appointment = appointment.to_dict() @@ -76,8 +66,6 @@ def test_to_dict(appointment_data): and appointment_data["end_time"] == dict_appointment["end_time"] and appointment_data["dispute_delta"] == dict_appointment["dispute_delta"] and EncryptedBlob(appointment_data["encrypted_blob"]) == EncryptedBlob(dict_appointment["encrypted_blob"]) - and appointment_data["cipher"] == dict_appointment["cipher"] - and appointment_data["hash_function"] == dict_appointment["hash_function"] ) @@ -88,8 +76,6 @@ def test_to_json(appointment_data): appointment_data["end_time"], appointment_data["dispute_delta"], appointment_data["encrypted_blob"], - appointment_data["cipher"], - appointment_data["hash_function"], ) dict_appointment = json.loads(appointment.to_json()) @@ -100,8 +86,6 @@ def test_to_json(appointment_data): and appointment_data["end_time"] == dict_appointment["end_time"] and appointment_data["dispute_delta"] == dict_appointment["dispute_delta"] and EncryptedBlob(appointment_data["encrypted_blob"]) == EncryptedBlob(dict_appointment["encrypted_blob"]) - and appointment_data["cipher"] == dict_appointment["cipher"] - and appointment_data["hash_function"] == dict_appointment["hash_function"] ) diff --git a/test/unit/test_blob.py b/test/unit/test_blob.py index fbb0add..21b7bf6 100644 --- a/test/unit/test_blob.py +++ b/test/unit/test_blob.py @@ -3,88 +3,19 @@ from binascii import unhexlify from pisa import c_logger from apps.cli.blob import Blob from test.unit.conftest import get_random_value_hex -from pisa.conf import SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS c_logger.disabled = True def test_init_blob(): data = get_random_value_hex(64) + blob = Blob(data) + assert isinstance(blob, Blob) - # Fixed (valid) hash function, try different valid ciphers - hash_function = SUPPORTED_HASH_FUNCTIONS[0] - for cipher in SUPPORTED_CIPHERS: - cipher_cases = [cipher, cipher.lower(), cipher.capitalize()] - - for case in cipher_cases: - blob = Blob(data, case, hash_function) - assert blob.data == data and blob.cipher == case and blob.hash_function == hash_function - - # Fixed (valid) cipher, try different valid hash functions - cipher = SUPPORTED_CIPHERS[0] - for hash_function in SUPPORTED_HASH_FUNCTIONS: - hash_function_cases = [hash_function, hash_function.lower(), hash_function.capitalize()] - - for case in hash_function_cases: - blob = Blob(data, cipher, case) - assert blob.data == data and blob.cipher == cipher and blob.hash_function == case - - # Invalid data - data = unhexlify(get_random_value_hex(64)) - cipher = SUPPORTED_CIPHERS[0] - hash_function = SUPPORTED_HASH_FUNCTIONS[0] - + # Wrong data try: - Blob(data, cipher, hash_function) + Blob(unhexlify(get_random_value_hex(64))) assert False, "Able to create blob with wrong data" except ValueError: assert True - - # Invalid cipher - data = get_random_value_hex(64) - cipher = "A" * 10 - hash_function = SUPPORTED_HASH_FUNCTIONS[0] - - try: - Blob(data, cipher, hash_function) - assert False, "Able to create blob with wrong data" - - except ValueError: - assert True - - # Invalid hash function - data = get_random_value_hex(64) - cipher = SUPPORTED_CIPHERS[0] - hash_function = "A" * 10 - - try: - Blob(data, cipher, hash_function) - assert False, "Able to create blob with wrong data" - - except ValueError: - assert True - - -def test_encrypt(): - # Valid data, valid key - data = get_random_value_hex(64) - blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0]) - key = get_random_value_hex(32) - - encrypted_blob = blob.encrypt(key) - - # Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created) - invalid_key = unhexlify(get_random_value_hex(32)) - - try: - blob.encrypt(invalid_key) - assert False, "Able to create encrypt with invalid key" - - except ValueError: - assert True - - # Check that two encryptions of the same data have the same result - encrypted_blob2 = blob.encrypt(key) - - assert encrypted_blob == encrypted_blob2 and id(encrypted_blob) != id(encrypted_blob2) diff --git a/test/unit/test_cleaner.py b/test/unit/test_cleaner.py index 8b08df1..58cbeb9 100644 --- a/test/unit/test_cleaner.py +++ b/test/unit/test_cleaner.py @@ -25,7 +25,7 @@ def set_up_appointments(db_manager, total_appointments): uuid = uuid4().hex locator = get_random_value_hex(32) - appointment = Appointment(locator, None, None, None, None, None, None) + appointment = Appointment(locator, None, None, None, None, None) appointments[uuid] = appointment locator_uuid_map[locator] = [uuid] diff --git a/test/unit/test_cryptographer.py b/test/unit/test_cryptographer.py index 09b862a..331fcca 100644 --- a/test/unit/test_cryptographer.py +++ b/test/unit/test_cryptographer.py @@ -6,7 +6,7 @@ from test.unit.conftest import get_random_value_hex data = "6097cdf52309b1b2124efeed36bd34f46dc1c25ad23ac86f28380f746254f777" key = "b2e984a570f6f49bc38ace178e09147b0aa296cbb7c92eb01412f7e2d07b5659" -encrypted_data = "092e93d4a34aac4367075506f2c050ddfa1a201ee6669b65058572904dcea642aeb01ea4b57293618e8c46809dfadadc" +encrypted_data = "8f31028097a8bf12a92e088caab5cf3fcddf0d35ed2b72c24b12269373efcdea04f9d2a820adafe830c20ff132d89810" encrypted_blob = EncryptedBlob(encrypted_data) @@ -49,3 +49,27 @@ def test_decrypt_wrong_return(): except ValueError: assert True + + +# def test_encrypt(): +# # Valid data, valid key +# data = get_random_value_hex(64) +# blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0]) +# key = get_random_value_hex(32) +# +# encrypted_blob = blob.encrypt(key) +# +# # Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created) +# invalid_key = unhexlify(get_random_value_hex(32)) +# +# try: +# blob.encrypt(invalid_key) +# assert False, "Able to create encrypt with invalid key" +# +# except ValueError: +# assert True +# +# # Check that two encryptions of the same data have the same result +# encrypted_blob2 = blob.encrypt(key) +# +# assert encrypted_blob == encrypted_blob2 and id(encrypted_blob) != id(encrypted_blob2) diff --git a/test/unit/test_db_manager.py b/test/unit/test_db_manager.py index 3507f5d..b1ae2bb 100644 --- a/test/unit/test_db_manager.py +++ b/test/unit/test_db_manager.py @@ -40,7 +40,6 @@ def test_init(): # Check that the db can be created if it does not exist db_manager = open_create_db(db_path) assert isinstance(db_manager, DBManager) - print(type(db_manager)) db_manager.db.close() # Check that we can open an already create db @@ -188,7 +187,6 @@ def test_delete_locator_map(db_manager): assert len(locator_maps) != 0 for locator, uuids in locator_maps.items(): - print(locator) db_manager.delete_locator_map(locator) locator_maps = db_manager.load_appointments_db(prefix=LOCATOR_MAP_PREFIX) diff --git a/test/unit/test_encrypted_blob.py b/test/unit/test_encrypted_blob.py index 98119c4..33b4ece 100644 --- a/test/unit/test_encrypted_blob.py +++ b/test/unit/test_encrypted_blob.py @@ -11,24 +11,6 @@ def test_init_encrypted_blob(): assert EncryptedBlob(data).data == data -def test_init_encrypted_blob_wrong_cipher(): - try: - EncryptedBlob(get_random_value_hex(64), cipher="") - assert False - - except ValueError: - assert True - - -def test_init_encrypted_blob_wrong_hash_function(): - try: - EncryptedBlob(get_random_value_hex(64), hash_function="") - assert False - - except ValueError: - assert True - - def test_equal(): data = get_random_value_hex(64) e_blob1 = EncryptedBlob(data) diff --git a/test/unit/test_inspector.py b/test/unit/test_inspector.py index 297642c..500c146 100644 --- a/test/unit/test_inspector.py +++ b/test/unit/test_inspector.py @@ -13,16 +13,16 @@ from pisa.appointment import Appointment from pisa.block_processor import BlockProcessor from test.unit.conftest import get_random_value_hex -from pisa.conf import MIN_DISPUTE_DELTA, SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS +from pisa.conf import MIN_DISPUTE_DELTA c_logger.disabled = True inspector = Inspector() APPOINTMENT_OK = (0, None) -NO_HEX_STRINGS = ["R" * 64, get_random_value_hex(31) + "PP", "$" * 64, " " * 64] -WRONG_TYPES = [[], "", get_random_value_hex(32), 3.2, 2.0, (), object, {}, " " * 32, object()] -WRONG_TYPES_NO_STR = [[], unhexlify(get_random_value_hex(32)), 3.2, 2.0, (), object, {}, object()] +NO_HEX_STRINGS = ["R" * 32, get_random_value_hex(15) + "PP", "$" * 32, " " * 32] +WRONG_TYPES = [[], "", get_random_value_hex(16), 3.2, 2.0, (), object, {}, " " * 32, object()] +WRONG_TYPES_NO_STR = [[], unhexlify(get_random_value_hex(16)), 3.2, 2.0, (), object, {}, object()] def sign_appointment(sk, appointment): @@ -32,15 +32,15 @@ def sign_appointment(sk, appointment): def test_check_locator(): # Right appointment type, size and format - locator = get_random_value_hex(32) + locator = get_random_value_hex(16) assert Inspector.check_locator(locator) == APPOINTMENT_OK # Wrong size (too big) - locator = get_random_value_hex(33) + locator = get_random_value_hex(17) assert Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE # Wrong size (too small) - locator = get_random_value_hex(31) + locator = get_random_value_hex(15) assert Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE # Empty @@ -157,50 +157,6 @@ def test_check_blob(): assert Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_WRONG_FIELD_FORMAT -def test_check_cipher(): - # Right format and content (any case combination should be accepted) - for cipher in SUPPORTED_CIPHERS: - cipher_cases = [cipher, cipher.lower(), cipher.capitalize()] - for case in cipher_cases: - assert Inspector.check_cipher(case) == APPOINTMENT_OK - - # Wrong type - ciphers = WRONG_TYPES_NO_STR - for cipher in ciphers: - assert Inspector.check_cipher(cipher)[0] == APPOINTMENT_WRONG_FIELD_TYPE - - # Wrong value - ciphers = NO_HEX_STRINGS - for cipher in ciphers: - assert Inspector.check_cipher(cipher)[0] == APPOINTMENT_CIPHER_NOT_SUPPORTED - - # Empty field - cipher = None - assert Inspector.check_cipher(cipher)[0] == APPOINTMENT_EMPTY_FIELD - - -def test_check_hash_function(): - # Right format and content (any case combination should be accepted) - for hash_function in SUPPORTED_HASH_FUNCTIONS: - hash_function_cases = [hash_function, hash_function.lower(), hash_function.capitalize()] - for case in hash_function_cases: - assert Inspector.check_hash_function(case) == APPOINTMENT_OK - - # Wrong type - hash_functions = WRONG_TYPES_NO_STR - for hash_function in hash_functions: - assert Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_WRONG_FIELD_TYPE - - # Wrong value - hash_functions = NO_HEX_STRINGS - for hash_function in hash_functions: - assert Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED - - # Empty field - hash_function = None - assert Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_EMPTY_FIELD - - def test_check_appointment_signature(generate_keypair): client_sk, client_pk = generate_keypair @@ -240,13 +196,11 @@ def test_inspect(run_bitcoind, generate_keypair): assert type(appointment) == tuple and appointment[0] != 0 # Valid appointment - locator = get_random_value_hex(32) + locator = get_random_value_hex(16) start_time = BlockProcessor.get_block_count() + 5 end_time = start_time + 20 dispute_delta = MIN_DISPUTE_DELTA encrypted_blob = get_random_value_hex(64) - cipher = SUPPORTED_CIPHERS[0] - hash_function = SUPPORTED_HASH_FUNCTIONS[0] appointment_data = { "locator": locator, @@ -254,8 +208,6 @@ def test_inspect(run_bitcoind, generate_keypair): "end_time": end_time, "dispute_delta": dispute_delta, "encrypted_blob": encrypted_blob, - "cipher": cipher, - "hash_function": hash_function, } signature = sign_appointment(client_sk, appointment_data) @@ -269,6 +221,4 @@ def test_inspect(run_bitcoind, generate_keypair): and appointment.end_time == end_time and appointment.dispute_delta == dispute_delta and appointment.encrypted_blob.data == encrypted_blob - and appointment.cipher == cipher - and appointment.hash_function == hash_function ) diff --git a/test/unit/test_watcher.py b/test/unit/test_watcher.py index 7ab1ed1..bbcdbb8 100644 --- a/test/unit/test_watcher.py +++ b/test/unit/test_watcher.py @@ -1,8 +1,6 @@ import pytest from uuid import uuid4 -from hashlib import sha256 from threading import Thread -from binascii import unhexlify from queue import Queue, Empty from cryptography.hazmat.backends import default_backend @@ -44,7 +42,7 @@ def txids(): @pytest.fixture(scope="module") def locator_uuid_map(txids): - return {sha256(unhexlify(txid)).hexdigest(): uuid4().hex for txid in txids} + return {Watcher.compute_locator(txid): uuid4().hex for txid in txids} def create_appointments(n): @@ -232,18 +230,17 @@ def test_filter_valid_matches_random_data(watcher): def test_filter_valid_matches(watcher): dispute_txid = "0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9" encrypted_blob = ( - "29f55518945408f567bb7feb4d7bb15ba88b7d8ca0223a44d5c67dfe32d038caee7613e35736025d95ad4ecd6538a50" - "74cbe8d7739705697a5dc4d19b8a6e4459ed2d1b0d0a9b18c49bc2187dcbfb4046b14d58a1add83235fc632efc398d5" - "0abcb7738f1a04b3783d025c1828b4e8a8dc8f13f2843e6bc3bf08eade02fc7e2c4dce7d2f83b055652e944ac114e0b" - "72a9abcd98fd1d785a5d976c05ed780e033e125fa083c6591b6029aa68dbc099f148a2bc2e0cb63733e68af717d48d5" - "a312b5f5b2fcca9561b2ff4191f9cdff936a43f6efef4ee45fbaf1f18d0a4b006f3fc8399dd8ecb21f709d4583bba14" - "4af6d49fa99d7be2ca21059a997475aa8642b66b921dc7fc0321b6a2f6927f6f9bab55c75e17a19dc3b2ae895b6d4a4" - "f64f8eb21b1e" + "a62aa9bb3c8591e4d5de10f1bd49db92432ce2341af55762cdc9242c08662f97f5f47da0a1aa88373508cd6e67e87eefddeca0cee98c1" + "967ec1c1ecbb4c5e8bf08aa26159214e6c0bc4b2c7c247f87e7601d15c746fc4e711be95ba0e363001280138ba9a65b06c4aa6f592b21" + "3635ee763984d522a4c225814510c8f7ab0801f36d4a68f5ee7dd3930710005074121a172c29beba79ed647ebaf7e7fab1bbd9a208251" + "ef5486feadf2c46e33a7d66adf9dbbc5f67b55a34b1b3c4909dd34a482d759b0bc25ecd2400f656db509466d7479b5b92a2fadabccc9e" + "c8918da8979a9feadea27531643210368fee494d3aaa4983e05d6cf082a49105e2f8a7c7821899239ba7dee12940acd7d8a629894b5d31" + "e94b439cfe8d2e9f21e974ae5342a70c91e8" ) dummy_appointment, _ = generate_dummy_appointment() dummy_appointment.encrypted_blob.data = encrypted_blob - dummy_appointment.locator = sha256(unhexlify(dispute_txid)).hexdigest() + dummy_appointment.locator = Watcher.compute_locator(dispute_txid) uuid = uuid4().hex appointments = {uuid: dummy_appointment}