Removes hash/cipher configuration and changes AESGCM128 for CHACHA20POLY1305

Updates tests accordingly
This commit is contained in:
Sergi Delgado Segura
2019-12-04 17:46:07 +01:00
parent c679d59451
commit f0150ce585
22 changed files with 78 additions and 351 deletions

View File

@@ -5,8 +5,7 @@ import requests
from time import sleep
from shutil import rmtree
from threading import Thread
from hashlib import sha256
from binascii import hexlify, unhexlify
from binascii import hexlify
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
@@ -15,6 +14,7 @@ from cryptography.hazmat.primitives import serialization
from apps.cli.blob import Blob
from pisa.responder import Job
from pisa.watcher import Watcher
from pisa.tools import bitcoin_cli
from pisa.db_manager import DBManager
from pisa.appointment import Appointment
@@ -99,9 +99,6 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
"dispute_delta": 20,
}
cipher = "AES-GCM-128"
hash_function = "SHA256"
# dummy keys for this test
client_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
client_pk = (
@@ -110,8 +107,8 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
.decode("utf-8")
)
locator = sha256(unhexlify(dispute_txid)).hexdigest()
blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function)
locator = Watcher.compute_locator(dispute_txid)
blob = Blob(dummy_appointment_data.get("tx"))
encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id")))
@@ -121,8 +118,6 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
"end_time": dummy_appointment_data.get("end_time"),
"dispute_delta": dummy_appointment_data.get("dispute_delta"),
"encrypted_blob": encrypted_blob,
"cipher": cipher,
"hash_function": hash_function,
}
signature = sign_appointment(client_sk, appointment_data)

View File

@@ -147,6 +147,7 @@ def test_request_appointment_watcher(new_appt_data):
appointment_status = [appointment.pop("status") for appointment in received_appointments]
# Check that the appointment is within the received appoints
print("AAA", new_appt_data["appointment"], received_appointments)
assert new_appt_data["appointment"] in received_appointments
# Check that all the appointments are being watched

View File

@@ -17,8 +17,6 @@ def appointment_data():
end_time = 120
dispute_delta = 20
encrypted_blob_data = get_random_value_hex(100)
cipher = "AES-GCM-128"
hash_function = "SHA256"
return {
"locator": locator,
@@ -26,8 +24,6 @@ def appointment_data():
"end_time": end_time,
"dispute_delta": dispute_delta,
"encrypted_blob": encrypted_blob_data,
"cipher": cipher,
"hash_function": hash_function,
}
@@ -42,8 +38,6 @@ def test_init_appointment(appointment_data):
appointment_data["end_time"],
appointment_data["dispute_delta"],
appointment_data["encrypted_blob"],
appointment_data["cipher"],
appointment_data["hash_function"],
)
assert (
@@ -52,8 +46,6 @@ def test_init_appointment(appointment_data):
and appointment_data["end_time"] == appointment.end_time
and appointment_data["dispute_delta"] == appointment.dispute_delta
and EncryptedBlob(appointment_data["encrypted_blob"]) == appointment.encrypted_blob
and appointment_data["cipher"] == appointment.cipher
and appointment_data["hash_function"] == appointment.hash_function
)
@@ -64,8 +56,6 @@ def test_to_dict(appointment_data):
appointment_data["end_time"],
appointment_data["dispute_delta"],
appointment_data["encrypted_blob"],
appointment_data["cipher"],
appointment_data["hash_function"],
)
dict_appointment = appointment.to_dict()
@@ -76,8 +66,6 @@ def test_to_dict(appointment_data):
and appointment_data["end_time"] == dict_appointment["end_time"]
and appointment_data["dispute_delta"] == dict_appointment["dispute_delta"]
and EncryptedBlob(appointment_data["encrypted_blob"]) == EncryptedBlob(dict_appointment["encrypted_blob"])
and appointment_data["cipher"] == dict_appointment["cipher"]
and appointment_data["hash_function"] == dict_appointment["hash_function"]
)
@@ -88,8 +76,6 @@ def test_to_json(appointment_data):
appointment_data["end_time"],
appointment_data["dispute_delta"],
appointment_data["encrypted_blob"],
appointment_data["cipher"],
appointment_data["hash_function"],
)
dict_appointment = json.loads(appointment.to_json())
@@ -100,8 +86,6 @@ def test_to_json(appointment_data):
and appointment_data["end_time"] == dict_appointment["end_time"]
and appointment_data["dispute_delta"] == dict_appointment["dispute_delta"]
and EncryptedBlob(appointment_data["encrypted_blob"]) == EncryptedBlob(dict_appointment["encrypted_blob"])
and appointment_data["cipher"] == dict_appointment["cipher"]
and appointment_data["hash_function"] == dict_appointment["hash_function"]
)

View File

@@ -3,88 +3,19 @@ from binascii import unhexlify
from pisa import c_logger
from apps.cli.blob import Blob
from test.unit.conftest import get_random_value_hex
from pisa.conf import SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
c_logger.disabled = True
def test_init_blob():
data = get_random_value_hex(64)
blob = Blob(data)
assert isinstance(blob, Blob)
# Fixed (valid) hash function, try different valid ciphers
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
for cipher in SUPPORTED_CIPHERS:
cipher_cases = [cipher, cipher.lower(), cipher.capitalize()]
for case in cipher_cases:
blob = Blob(data, case, hash_function)
assert blob.data == data and blob.cipher == case and blob.hash_function == hash_function
# Fixed (valid) cipher, try different valid hash functions
cipher = SUPPORTED_CIPHERS[0]
for hash_function in SUPPORTED_HASH_FUNCTIONS:
hash_function_cases = [hash_function, hash_function.lower(), hash_function.capitalize()]
for case in hash_function_cases:
blob = Blob(data, cipher, case)
assert blob.data == data and blob.cipher == cipher and blob.hash_function == case
# Invalid data
data = unhexlify(get_random_value_hex(64))
cipher = SUPPORTED_CIPHERS[0]
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
# Wrong data
try:
Blob(data, cipher, hash_function)
Blob(unhexlify(get_random_value_hex(64)))
assert False, "Able to create blob with wrong data"
except ValueError:
assert True
# Invalid cipher
data = get_random_value_hex(64)
cipher = "A" * 10
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
try:
Blob(data, cipher, hash_function)
assert False, "Able to create blob with wrong data"
except ValueError:
assert True
# Invalid hash function
data = get_random_value_hex(64)
cipher = SUPPORTED_CIPHERS[0]
hash_function = "A" * 10
try:
Blob(data, cipher, hash_function)
assert False, "Able to create blob with wrong data"
except ValueError:
assert True
def test_encrypt():
# Valid data, valid key
data = get_random_value_hex(64)
blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0])
key = get_random_value_hex(32)
encrypted_blob = blob.encrypt(key)
# Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created)
invalid_key = unhexlify(get_random_value_hex(32))
try:
blob.encrypt(invalid_key)
assert False, "Able to create encrypt with invalid key"
except ValueError:
assert True
# Check that two encryptions of the same data have the same result
encrypted_blob2 = blob.encrypt(key)
assert encrypted_blob == encrypted_blob2 and id(encrypted_blob) != id(encrypted_blob2)

View File

@@ -25,7 +25,7 @@ def set_up_appointments(db_manager, total_appointments):
uuid = uuid4().hex
locator = get_random_value_hex(32)
appointment = Appointment(locator, None, None, None, None, None, None)
appointment = Appointment(locator, None, None, None, None, None)
appointments[uuid] = appointment
locator_uuid_map[locator] = [uuid]

View File

@@ -6,7 +6,7 @@ from test.unit.conftest import get_random_value_hex
data = "6097cdf52309b1b2124efeed36bd34f46dc1c25ad23ac86f28380f746254f777"
key = "b2e984a570f6f49bc38ace178e09147b0aa296cbb7c92eb01412f7e2d07b5659"
encrypted_data = "092e93d4a34aac4367075506f2c050ddfa1a201ee6669b65058572904dcea642aeb01ea4b57293618e8c46809dfadadc"
encrypted_data = "8f31028097a8bf12a92e088caab5cf3fcddf0d35ed2b72c24b12269373efcdea04f9d2a820adafe830c20ff132d89810"
encrypted_blob = EncryptedBlob(encrypted_data)
@@ -49,3 +49,27 @@ def test_decrypt_wrong_return():
except ValueError:
assert True
# def test_encrypt():
# # Valid data, valid key
# data = get_random_value_hex(64)
# blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0])
# key = get_random_value_hex(32)
#
# encrypted_blob = blob.encrypt(key)
#
# # Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created)
# invalid_key = unhexlify(get_random_value_hex(32))
#
# try:
# blob.encrypt(invalid_key)
# assert False, "Able to create encrypt with invalid key"
#
# except ValueError:
# assert True
#
# # Check that two encryptions of the same data have the same result
# encrypted_blob2 = blob.encrypt(key)
#
# assert encrypted_blob == encrypted_blob2 and id(encrypted_blob) != id(encrypted_blob2)

View File

@@ -40,7 +40,6 @@ def test_init():
# Check that the db can be created if it does not exist
db_manager = open_create_db(db_path)
assert isinstance(db_manager, DBManager)
print(type(db_manager))
db_manager.db.close()
# Check that we can open an already create db
@@ -188,7 +187,6 @@ def test_delete_locator_map(db_manager):
assert len(locator_maps) != 0
for locator, uuids in locator_maps.items():
print(locator)
db_manager.delete_locator_map(locator)
locator_maps = db_manager.load_appointments_db(prefix=LOCATOR_MAP_PREFIX)

View File

@@ -11,24 +11,6 @@ def test_init_encrypted_blob():
assert EncryptedBlob(data).data == data
def test_init_encrypted_blob_wrong_cipher():
try:
EncryptedBlob(get_random_value_hex(64), cipher="")
assert False
except ValueError:
assert True
def test_init_encrypted_blob_wrong_hash_function():
try:
EncryptedBlob(get_random_value_hex(64), hash_function="")
assert False
except ValueError:
assert True
def test_equal():
data = get_random_value_hex(64)
e_blob1 = EncryptedBlob(data)

View File

@@ -13,16 +13,16 @@ from pisa.appointment import Appointment
from pisa.block_processor import BlockProcessor
from test.unit.conftest import get_random_value_hex
from pisa.conf import MIN_DISPUTE_DELTA, SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
from pisa.conf import MIN_DISPUTE_DELTA
c_logger.disabled = True
inspector = Inspector()
APPOINTMENT_OK = (0, None)
NO_HEX_STRINGS = ["R" * 64, get_random_value_hex(31) + "PP", "$" * 64, " " * 64]
WRONG_TYPES = [[], "", get_random_value_hex(32), 3.2, 2.0, (), object, {}, " " * 32, object()]
WRONG_TYPES_NO_STR = [[], unhexlify(get_random_value_hex(32)), 3.2, 2.0, (), object, {}, object()]
NO_HEX_STRINGS = ["R" * 32, get_random_value_hex(15) + "PP", "$" * 32, " " * 32]
WRONG_TYPES = [[], "", get_random_value_hex(16), 3.2, 2.0, (), object, {}, " " * 32, object()]
WRONG_TYPES_NO_STR = [[], unhexlify(get_random_value_hex(16)), 3.2, 2.0, (), object, {}, object()]
def sign_appointment(sk, appointment):
@@ -32,15 +32,15 @@ def sign_appointment(sk, appointment):
def test_check_locator():
# Right appointment type, size and format
locator = get_random_value_hex(32)
locator = get_random_value_hex(16)
assert Inspector.check_locator(locator) == APPOINTMENT_OK
# Wrong size (too big)
locator = get_random_value_hex(33)
locator = get_random_value_hex(17)
assert Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE
# Wrong size (too small)
locator = get_random_value_hex(31)
locator = get_random_value_hex(15)
assert Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE
# Empty
@@ -157,50 +157,6 @@ def test_check_blob():
assert Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_WRONG_FIELD_FORMAT
def test_check_cipher():
# Right format and content (any case combination should be accepted)
for cipher in SUPPORTED_CIPHERS:
cipher_cases = [cipher, cipher.lower(), cipher.capitalize()]
for case in cipher_cases:
assert Inspector.check_cipher(case) == APPOINTMENT_OK
# Wrong type
ciphers = WRONG_TYPES_NO_STR
for cipher in ciphers:
assert Inspector.check_cipher(cipher)[0] == APPOINTMENT_WRONG_FIELD_TYPE
# Wrong value
ciphers = NO_HEX_STRINGS
for cipher in ciphers:
assert Inspector.check_cipher(cipher)[0] == APPOINTMENT_CIPHER_NOT_SUPPORTED
# Empty field
cipher = None
assert Inspector.check_cipher(cipher)[0] == APPOINTMENT_EMPTY_FIELD
def test_check_hash_function():
# Right format and content (any case combination should be accepted)
for hash_function in SUPPORTED_HASH_FUNCTIONS:
hash_function_cases = [hash_function, hash_function.lower(), hash_function.capitalize()]
for case in hash_function_cases:
assert Inspector.check_hash_function(case) == APPOINTMENT_OK
# Wrong type
hash_functions = WRONG_TYPES_NO_STR
for hash_function in hash_functions:
assert Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_WRONG_FIELD_TYPE
# Wrong value
hash_functions = NO_HEX_STRINGS
for hash_function in hash_functions:
assert Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED
# Empty field
hash_function = None
assert Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_EMPTY_FIELD
def test_check_appointment_signature(generate_keypair):
client_sk, client_pk = generate_keypair
@@ -240,13 +196,11 @@ def test_inspect(run_bitcoind, generate_keypair):
assert type(appointment) == tuple and appointment[0] != 0
# Valid appointment
locator = get_random_value_hex(32)
locator = get_random_value_hex(16)
start_time = BlockProcessor.get_block_count() + 5
end_time = start_time + 20
dispute_delta = MIN_DISPUTE_DELTA
encrypted_blob = get_random_value_hex(64)
cipher = SUPPORTED_CIPHERS[0]
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
appointment_data = {
"locator": locator,
@@ -254,8 +208,6 @@ def test_inspect(run_bitcoind, generate_keypair):
"end_time": end_time,
"dispute_delta": dispute_delta,
"encrypted_blob": encrypted_blob,
"cipher": cipher,
"hash_function": hash_function,
}
signature = sign_appointment(client_sk, appointment_data)
@@ -269,6 +221,4 @@ def test_inspect(run_bitcoind, generate_keypair):
and appointment.end_time == end_time
and appointment.dispute_delta == dispute_delta
and appointment.encrypted_blob.data == encrypted_blob
and appointment.cipher == cipher
and appointment.hash_function == hash_function
)

View File

@@ -1,8 +1,6 @@
import pytest
from uuid import uuid4
from hashlib import sha256
from threading import Thread
from binascii import unhexlify
from queue import Queue, Empty
from cryptography.hazmat.backends import default_backend
@@ -44,7 +42,7 @@ def txids():
@pytest.fixture(scope="module")
def locator_uuid_map(txids):
return {sha256(unhexlify(txid)).hexdigest(): uuid4().hex for txid in txids}
return {Watcher.compute_locator(txid): uuid4().hex for txid in txids}
def create_appointments(n):
@@ -232,18 +230,17 @@ def test_filter_valid_matches_random_data(watcher):
def test_filter_valid_matches(watcher):
dispute_txid = "0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9"
encrypted_blob = (
"29f55518945408f567bb7feb4d7bb15ba88b7d8ca0223a44d5c67dfe32d038caee7613e35736025d95ad4ecd6538a50"
"74cbe8d7739705697a5dc4d19b8a6e4459ed2d1b0d0a9b18c49bc2187dcbfb4046b14d58a1add83235fc632efc398d5"
"0abcb7738f1a04b3783d025c1828b4e8a8dc8f13f2843e6bc3bf08eade02fc7e2c4dce7d2f83b055652e944ac114e0b"
"72a9abcd98fd1d785a5d976c05ed780e033e125fa083c6591b6029aa68dbc099f148a2bc2e0cb63733e68af717d48d5"
"a312b5f5b2fcca9561b2ff4191f9cdff936a43f6efef4ee45fbaf1f18d0a4b006f3fc8399dd8ecb21f709d4583bba14"
"4af6d49fa99d7be2ca21059a997475aa8642b66b921dc7fc0321b6a2f6927f6f9bab55c75e17a19dc3b2ae895b6d4a4"
"f64f8eb21b1e"
"a62aa9bb3c8591e4d5de10f1bd49db92432ce2341af55762cdc9242c08662f97f5f47da0a1aa88373508cd6e67e87eefddeca0cee98c1"
"967ec1c1ecbb4c5e8bf08aa26159214e6c0bc4b2c7c247f87e7601d15c746fc4e711be95ba0e363001280138ba9a65b06c4aa6f592b21"
"3635ee763984d522a4c225814510c8f7ab0801f36d4a68f5ee7dd3930710005074121a172c29beba79ed647ebaf7e7fab1bbd9a208251"
"ef5486feadf2c46e33a7d66adf9dbbc5f67b55a34b1b3c4909dd34a482d759b0bc25ecd2400f656db509466d7479b5b92a2fadabccc9e"
"c8918da8979a9feadea27531643210368fee494d3aaa4983e05d6cf082a49105e2f8a7c7821899239ba7dee12940acd7d8a629894b5d31"
"e94b439cfe8d2e9f21e974ae5342a70c91e8"
)
dummy_appointment, _ = generate_dummy_appointment()
dummy_appointment.encrypted_blob.data = encrypted_blob
dummy_appointment.locator = sha256(unhexlify(dispute_txid)).hexdigest()
dummy_appointment.locator = Watcher.compute_locator(dispute_txid)
uuid = uuid4().hex
appointments = {uuid: dummy_appointment}