From a8800ac375276f30d76adc695750bdfc19d2588d Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 6 Dec 2019 13:22:14 +0100 Subject: [PATCH] Integrates encryption/decryption within the Cryptographer. Close #63 Includes unittests. Also reformats test_inspector to avoid using cli functions --- apps/cli/blob.py | 26 ----- apps/cli/pisa_cli.py | 17 +-- common/cryptographer.py | 50 +++++++-- pisa/tools.py | 5 - pisa/watcher.py | 7 +- test/unit/conftest.py | 3 +- test/unit/test_cryptographer.py | 182 ++++++++++++++++++++++---------- test/unit/test_inspector.py | 24 ++--- test/unit/test_responder.py | 5 +- test/unit/test_tools.py | 42 +++++--- test/unit/test_watcher.py | 6 +- 11 files changed, 228 insertions(+), 139 deletions(-) diff --git a/apps/cli/blob.py b/apps/cli/blob.py index 4ec3e0f..0d26a48 100644 --- a/apps/cli/blob.py +++ b/apps/cli/blob.py @@ -12,29 +12,3 @@ class Blob: raise ValueError("Non-Hex character found in txid.") self.data = data - - def encrypt(self, tx_id): - if len(tx_id) != 64: - raise ValueError("txid does not matches the expected size (32-byte / 64 hex chars).") - - elif re.search(r"^[0-9A-Fa-f]+$", tx_id) is None: - raise ValueError("Non-Hex character found in txid.") - - # Transaction to be encrypted - # FIXME: The blob data should contain more things that just the transaction. Leaving like this for now. - tx = unhexlify(self.data) - - # sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte) - sk = sha256(unhexlify(tx_id)).digest() - nonce = bytearray(12) - - # Encrypt the data - cipher = ChaCha20Poly1305(sk) - encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None) - encrypted_blob = hexlify(encrypted_blob).decode() - - logger.info( - "Creating new blob", sk=hexlify(sk).decode(), nonce=hexlify(nonce).decode(), encrypted_blob=encrypted_blob - ) - - return encrypted_blob diff --git a/apps/cli/pisa_cli.py b/apps/cli/pisa_cli.py index 8692e22..251c53a 100644 --- a/apps/cli/pisa_cli.py +++ b/apps/cli/pisa_cli.py @@ -1,4 +1,3 @@ -import re import os import sys import json @@ -29,6 +28,8 @@ from apps.cli import ( ) from common.constants import LOCATOR_LEN_HEX +from common.cryptographer import Cryptographer +from common.tools import check_sha256_hex_format HTTP_OK = 200 @@ -162,7 +163,7 @@ def add_appointment(args): logger.error("The provided JSON is empty.") return False - valid_locator = check_txid_format(appointment_data.get("tx_id")) + valid_locator = check_sha256_hex_format(appointment_data.get("tx_id")) if not valid_locator: logger.error("The provided locator is not valid.") @@ -288,7 +289,7 @@ def get_appointment(args): sys.exit(help_get_appointment()) else: locator = arg_opt - valid_locator = check_txid_format(locator) + valid_locator = check_sha256_hex_format(locator) if not valid_locator: logger.error("The provided locator is not valid: {}".format(locator)) @@ -317,7 +318,7 @@ def build_appointment(tx, tx_id, start_time, end_time, dispute_delta): # FIXME: The blob data should contain more things that just the transaction. Leaving like this for now. blob = Blob(tx) - encrypted_blob = blob.encrypt(tx_id) + encrypted_blob = Cryptographer.encrypt(blob, tx_id) appointment = { "locator": locator, @@ -330,14 +331,6 @@ def build_appointment(tx, tx_id, start_time, end_time, dispute_delta): return appointment -def check_txid_format(txid): - if len(txid) != 64: - sys.exit("locator does not matches the expected size (32-byte / 64 hex chars).") - - # TODO: #12-check-txid-regexp - return re.search(r"^[0-9A-Fa-f]+$", txid) is not None - - def show_usage(): return ( "USAGE: " diff --git a/common/cryptographer.py b/common/cryptographer.py index 0256be2..5983e74 100644 --- a/common/cryptographer.py +++ b/common/cryptographer.py @@ -3,30 +3,68 @@ from binascii import unhexlify, hexlify from cryptography.exceptions import InvalidTag from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from common.tools import check_sha256_hex_format + from pisa.logger import Logger logger = Logger("Cryptographer") class Cryptographer: + @staticmethod + def check_data_key_format(data, key): + if len(data) % 2: + error = "Incorrect (Odd-length) value." + logger.error(error, data=data) + raise ValueError(error) + + if not check_sha256_hex_format(key): + error = "Key must be a 32-byte hex value (64 hex chars)." + logger.error(error, key=key) + raise ValueError(error) + + return True + + @staticmethod + def encrypt(blob, key, rtype="hex"): + if rtype not in ["hex", "bytes"]: + raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'") + + Cryptographer.check_data_key_format(blob.data, key) + + # Transaction to be encrypted + # FIXME: The blob data should contain more things that just the transaction. Leaving like this for now. + tx = unhexlify(blob.data) + + # sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte) + sk = sha256(unhexlify(key)).digest() + nonce = bytearray(12) + + logger.info("Encrypting blob.", sk=hexlify(sk).decode(), nonce=hexlify(nonce).decode(), blob=blob.data) + + # Encrypt the data + cipher = ChaCha20Poly1305(sk) + encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None) + + if rtype == "hex": + encrypted_blob = hexlify(encrypted_blob).decode("utf8") + + return encrypted_blob + @staticmethod # ToDo: #20-test-tx-decrypting-edge-cases def decrypt(encrypted_blob, key, rtype="hex"): if rtype not in ["hex", "bytes"]: raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'") - if len(encrypted_blob.data) % 2: - logger.info( - "Incorrect (Odd-length) value to be decrypted.", encrypted_blob=encrypted_blob.data, dispute_txid=key - ) - return None + Cryptographer.check_data_key_format(encrypted_blob.data, key) # sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte) sk = sha256(unhexlify(key)).digest() nonce = bytearray(12) logger.info( - "Creating new blob.", + "Decrypting Blob.", sk=hexlify(sk).decode(), nonce=hexlify(nonce).decode(), encrypted_blob=encrypted_blob.data, diff --git a/pisa/tools.py b/pisa/tools.py index 9222e0c..bb2174c 100644 --- a/pisa/tools.py +++ b/pisa/tools.py @@ -39,8 +39,3 @@ def in_correct_network(network): correct_network = True return correct_network - - -def check_txid_format(txid): - # TODO: #12-check-txid-regexp - return isinstance(txid, str) and re.search(r"^[0-9A-Fa-f]{64}$", txid) is not None diff --git a/pisa/watcher.py b/pisa/watcher.py index 4b2a4b7..7350a94 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -180,7 +180,12 @@ class Watcher: for locator, dispute_txid in matches.items(): for uuid in self.locator_uuid_map[locator]: - justice_rawtx = Cryptographer.decrypt(self.appointments[uuid].encrypted_blob, dispute_txid) + try: + justice_rawtx = Cryptographer.decrypt(self.appointments[uuid].encrypted_blob, dispute_txid) + + except ValueError: + justice_rawtx = None + justice_tx = BlockProcessor.decode_raw_transaction(justice_rawtx) if justice_tx is not None: diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 4c32a6d..9479557 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -24,6 +24,7 @@ from test.simulator.transaction import TX from test.simulator.bitcoind_sim import run_simulator, HOST, PORT from common.constants import LOCATOR_LEN_HEX +from common.cryptographer import Cryptographer @pytest.fixture(scope="session") @@ -113,7 +114,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t locator = Watcher.compute_locator(dispute_txid) blob = Blob(dummy_appointment_data.get("tx")) - encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id"))) + encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id")) appointment_data = { "locator": locator, diff --git a/test/unit/test_cryptographer.py b/test/unit/test_cryptographer.py index 9c7c198..9e4d160 100644 --- a/test/unit/test_cryptographer.py +++ b/test/unit/test_cryptographer.py @@ -1,5 +1,6 @@ import binascii +from apps.cli.blob import Blob from common.cryptographer import Cryptographer from pisa.encrypted_blob import EncryptedBlob from test.unit.conftest import get_random_value_hex @@ -7,68 +8,139 @@ from test.unit.conftest import get_random_value_hex data = "6097cdf52309b1b2124efeed36bd34f46dc1c25ad23ac86f28380f746254f777" key = "b2e984a570f6f49bc38ace178e09147b0aa296cbb7c92eb01412f7e2d07b5659" encrypted_data = "8f31028097a8bf12a92e088caab5cf3fcddf0d35ed2b72c24b12269373efcdea04f9d2a820adafe830c20ff132d89810" -encrypted_blob = EncryptedBlob(encrypted_data) -def test_decrypt_wrong_data(): - random_key = get_random_value_hex(32) - random_encrypted_data = get_random_value_hex(64) - random_encrypted_blob = EncryptedBlob(random_encrypted_data) +def test_check_data_key_format_wrong_data(): + data = get_random_value_hex(64)[:-1] + key = get_random_value_hex(32) - # Trying to decrypt random data (in AES_GCM-128) should result in an InvalidTag exception. Our decrypt function - # returns None - hex_tx = Cryptographer.decrypt(random_encrypted_blob, random_key) - assert hex_tx is None - - -def test_decrypt_odd_length(): - random_key = get_random_value_hex(32) - random_encrypted_data_odd = get_random_value_hex(64)[:-1] - random_encrypted_blob_odd = EncryptedBlob(random_encrypted_data_odd) - - assert Cryptographer.decrypt(random_encrypted_blob_odd, random_key) is None - - -def test_decrypt_hex(): - # Valid data should run with no InvalidTag and verify - assert Cryptographer.decrypt(encrypted_blob, key) == data - - -def test_decrypt_bytes(): - # We can also get the decryption in bytes - byte_blob = Cryptographer.decrypt(encrypted_blob, key, rtype="bytes") - assert isinstance(byte_blob, bytes) and byte_blob == binascii.unhexlify(data) - - -def test_decrypt_wrong_return(): - # Any other type but "hex" (default) or "bytes" should fail try: - Cryptographer.decrypt(encrypted_blob, key, rtype="random_value") + Cryptographer.check_data_key_format(data, key) + assert False + + except ValueError as e: + assert "Odd-length" in str(e) + + +def test_check_data_key_format_wrong_key(): + data = get_random_value_hex(64) + key = get_random_value_hex(33) + + try: + Cryptographer.check_data_key_format(data, key) + assert False + + except ValueError as e: + assert "32-byte hex" in str(e) + + +def test_check_data_key_format(): + data = get_random_value_hex(64) + key = get_random_value_hex(32) + + assert Cryptographer.check_data_key_format(data, key) is True + + +def test_encrypt_odd_length_data(): + blob = Blob(get_random_value_hex(64)[-1]) + key = get_random_value_hex(32) + + try: + Cryptographer.encrypt(blob, key) assert False except ValueError: assert True -# def test_encrypt(): -# # Valid data, valid key -# data = get_random_value_hex(64) -# blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0]) -# key = get_random_value_hex(32) -# -# encrypted_blob = blob.encrypt(key) -# -# # Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created) -# invalid_key = unhexlify(get_random_value_hex(32)) -# -# try: -# blob.encrypt(invalid_key) -# assert False, "Able to create encrypt with invalid key" -# -# except ValueError: -# assert True -# -# # Check that two encryptions of the same data have the same result -# encrypted_blob2 = blob.encrypt(key) -# -# assert encrypted_blob == encrypted_blob2 and id(encrypted_blob) != id(encrypted_blob2) +def test_encrypt_wrong_key_size(): + blob = Blob(get_random_value_hex(64)) + key = get_random_value_hex(31) + + try: + Cryptographer.encrypt(blob, key) + assert False + + except ValueError: + assert True + + +def test_encrypt_hex(): + blob = Blob(data) + + assert Cryptographer.encrypt(blob, key) == encrypted_data + + +def test_encrypt_bytes(): + blob = Blob(data) + + byte_blob = Cryptographer.encrypt(blob, key, rtype="bytes") + assert isinstance(byte_blob, bytes) and byte_blob == binascii.unhexlify(encrypted_data) + + +def test_encrypt_wrong_return(): + # Any other type but "hex" (default) or "bytes" should fail + try: + Cryptographer.encrypt(Blob(data), key, rtype="random_value") + assert False + + except ValueError: + assert True + + +def test_decrypt_invalid_tag(): + random_key = get_random_value_hex(32) + random_encrypted_data = get_random_value_hex(64) + random_encrypted_blob = EncryptedBlob(random_encrypted_data) + + # Trying to decrypt random data should result in an InvalidTag exception. Our decrypt function + # returns None + hex_tx = Cryptographer.decrypt(random_encrypted_blob, random_key) + assert hex_tx is None + + +def test_decrypt_odd_length_data(): + random_key = get_random_value_hex(32) + random_encrypted_data_odd = get_random_value_hex(64)[:-1] + random_encrypted_blob_odd = EncryptedBlob(random_encrypted_data_odd) + + try: + Cryptographer.decrypt(random_encrypted_blob_odd, random_key) + assert False + + except ValueError: + assert True + + +def test_decrypt_wrong_key_size(): + random_key = get_random_value_hex(31) + random_encrypted_data_odd = get_random_value_hex(64) + random_encrypted_blob_odd = EncryptedBlob(random_encrypted_data_odd) + + try: + Cryptographer.decrypt(random_encrypted_blob_odd, random_key) + assert False + + except ValueError: + assert True + + +def test_decrypt_hex(): + # Valid data should run with no InvalidTag and verify + assert Cryptographer.decrypt(EncryptedBlob(encrypted_data), key) == data + + +def test_decrypt_bytes(): + # We can also get the decryption in bytes + byte_blob = Cryptographer.decrypt(EncryptedBlob(encrypted_data), key, rtype="bytes") + assert isinstance(byte_blob, bytes) and byte_blob == binascii.unhexlify(data) + + +def test_decrypt_wrong_return(): + # Any other type but "hex" (default) or "bytes" should fail + try: + Cryptographer.decrypt(EncryptedBlob(encrypted_data), key, rtype="random_value") + assert False + + except ValueError: + assert True diff --git a/test/unit/test_inspector.py b/test/unit/test_inspector.py index b191ddb..2603e61 100644 --- a/test/unit/test_inspector.py +++ b/test/unit/test_inspector.py @@ -5,8 +5,6 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec -from apps.cli.pisa_cli import build_appointment - from pisa import c_logger from pisa.errors import * from pisa.inspector import Inspector @@ -14,7 +12,7 @@ from pisa.appointment import Appointment from pisa.block_processor import BlockProcessor from pisa.conf import MIN_DISPUTE_DELTA -from test.unit.conftest import get_random_value_hex +from test.unit.conftest import get_random_value_hex, generate_dummy_appointment_data from common.constants import LOCATOR_LEN_BYTES, LOCATOR_LEN_HEX @@ -179,25 +177,17 @@ def test_check_blob(): def test_check_appointment_signature(generate_keypair): client_sk, client_pk = generate_keypair - dummy_appointment_request = { - "tx": get_random_value_hex(192), - "tx_id": get_random_value_hex(32), - "start_time": 1500, - "end_time": 50000, - "dispute_delta": 200, - } - dummy_appointment = build_appointment(**dummy_appointment_request) - - # Verify that an appointment signed by the client is valid - signature = sign_appointment(client_sk, dummy_appointment) - assert Inspector.check_appointment_signature(dummy_appointment, signature, client_pk) + dummy_appointment_data, _ = generate_dummy_appointment_data(real_height=False) + assert Inspector.check_appointment_signature( + dummy_appointment_data["appointment"], dummy_appointment_data["signature"], dummy_appointment_data["public_key"] + ) fake_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) # Create a bad signature to make sure inspector rejects it - bad_signature = sign_appointment(fake_sk, dummy_appointment) + bad_signature = sign_appointment(fake_sk, dummy_appointment_data["appointment"]) assert ( - Inspector.check_appointment_signature(dummy_appointment, bad_signature, client_pk)[0] + Inspector.check_appointment_signature(dummy_appointment_data["appointment"], bad_signature, client_pk)[0] == APPOINTMENT_INVALID_SIGNATURE ) diff --git a/test/unit/test_responder.py b/test/unit/test_responder.py index b41e972..4b88094 100644 --- a/test/unit/test_responder.py +++ b/test/unit/test_responder.py @@ -11,9 +11,10 @@ from pisa import c_logger from pisa.db_manager import DBManager from pisa.responder import Responder, Job from pisa.block_processor import BlockProcessor -from pisa.tools import check_txid_format, bitcoin_cli +from pisa.tools import bitcoin_cli from common.constants import LOCATOR_LEN_HEX +from common.tools import check_sha256_hex_format from test.simulator.utils import sha256d from test.simulator.bitcoind_sim import TX @@ -288,7 +289,7 @@ def test_do_subscribe(responder): try: generate_block() block_hash = responder.block_queue.get() - assert check_txid_format(block_hash) + assert check_sha256_hex_format(block_hash) except Empty: assert False diff --git a/test/unit/test_tools.py b/test/unit/test_tools.py index 0709441..9b32b24 100644 --- a/test/unit/test_tools.py +++ b/test/unit/test_tools.py @@ -1,5 +1,7 @@ from pisa import c_logger -from pisa.tools import can_connect_to_bitcoind, in_correct_network, bitcoin_cli, check_txid_format +from pisa.tools import can_connect_to_bitcoind, in_correct_network, bitcoin_cli + +from common.tools import check_sha256_hex_format c_logger.disabled = True @@ -30,14 +32,30 @@ def test_bitcoin_cli(): assert False -def test_check_txid_format(): - assert check_txid_format(None) is False - assert check_txid_format("") is False - assert check_txid_format(0x0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF) is False # wrong type - assert check_txid_format("abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") is True # lowercase - assert check_txid_format("ABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD") is True # uppercase - assert check_txid_format("0123456789abcdef0123456789ABCDEF0123456789abcdef0123456789ABCDEF") is True # mixed case - assert check_txid_format("0123456789012345678901234567890123456789012345678901234567890123") is True # only nums - assert check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdf") is False # too short - assert check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0") is False # too long - assert check_txid_format("g123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") is False # non-hex +def test_check_sha256_hex_format(): + assert check_sha256_hex_format(None) is False + assert check_sha256_hex_format("") is False + assert ( + check_sha256_hex_format(0x0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF) is False + ) # wrong type + assert ( + check_sha256_hex_format("abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") is True + ) # lowercase + assert ( + check_sha256_hex_format("ABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD") is True + ) # uppercase + assert ( + check_sha256_hex_format("0123456789abcdef0123456789ABCDEF0123456789abcdef0123456789ABCDEF") is True + ) # mixed case + assert ( + check_sha256_hex_format("0123456789012345678901234567890123456789012345678901234567890123") is True + ) # only nums + assert ( + check_sha256_hex_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdf") is False + ) # too short + assert ( + check_sha256_hex_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0") is False + ) # too long + assert ( + check_sha256_hex_format("g123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") is False + ) # non-hex diff --git a/test/unit/test_watcher.py b/test/unit/test_watcher.py index bbcdbb8..960a57a 100644 --- a/test/unit/test_watcher.py +++ b/test/unit/test_watcher.py @@ -12,10 +12,12 @@ from cryptography.exceptions import InvalidSignature from pisa import c_logger from pisa.watcher import Watcher from pisa.responder import Responder -from pisa.tools import check_txid_format, bitcoin_cli +from pisa.tools import bitcoin_cli from test.unit.conftest import generate_block, generate_blocks, generate_dummy_appointment, get_random_value_hex from pisa.conf import EXPIRY_DELTA, PISA_SECRET_KEY, MAX_APPOINTMENTS +from common.tools import check_sha256_hex_format + c_logger.disabled = True APPOINTMENTS = 5 @@ -152,7 +154,7 @@ def test_do_subscribe(watcher): try: generate_block() block_hash = watcher.block_queue.get() - assert check_txid_format(block_hash) + assert check_sha256_hex_format(block_hash) except Empty: assert False