mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Fixes tests
This commit is contained in:
@@ -1,9 +1,11 @@
|
|||||||
import os
|
import os
|
||||||
|
import pytest
|
||||||
from coincurve import PrivateKey, PublicKey
|
from coincurve import PrivateKey, PublicKey
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
from cryptography.hazmat.primitives.asymmetric import ec
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
from cryptography.hazmat.primitives import serialization
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
|
||||||
|
from common.exceptions import InvalidKey, InvalidParameter, EncryptionError, SignatureError
|
||||||
from common.cryptographer import Cryptographer
|
from common.cryptographer import Cryptographer
|
||||||
from test.common.unit.conftest import get_random_value_hex
|
from test.common.unit.conftest import get_random_value_hex
|
||||||
|
|
||||||
@@ -26,55 +28,40 @@ def test_check_data_key_format_wrong_data():
|
|||||||
data = get_random_value_hex(64)[:-1]
|
data = get_random_value_hex(64)[:-1]
|
||||||
key = get_random_value_hex(32)
|
key = get_random_value_hex(32)
|
||||||
|
|
||||||
try:
|
with pytest.raises(InvalidParameter, match="Odd-length"):
|
||||||
Cryptographer.check_data_key_format(data, key)
|
Cryptographer.check_data_key_format(data, key)
|
||||||
assert False
|
|
||||||
|
|
||||||
except ValueError as e:
|
|
||||||
assert "Odd-length" in str(e)
|
|
||||||
|
|
||||||
|
|
||||||
def test_check_data_key_format_wrong_key():
|
def test_check_data_key_format_wrong_key():
|
||||||
data = get_random_value_hex(64)
|
data = get_random_value_hex(64)
|
||||||
key = get_random_value_hex(33)
|
key = get_random_value_hex(33)
|
||||||
|
|
||||||
try:
|
with pytest.raises(InvalidParameter, match="32-byte hex value"):
|
||||||
Cryptographer.check_data_key_format(data, key)
|
Cryptographer.check_data_key_format(data, key)
|
||||||
assert False
|
|
||||||
|
|
||||||
except ValueError as e:
|
|
||||||
assert "32-byte hex" in str(e)
|
|
||||||
|
|
||||||
|
|
||||||
def test_check_data_key_format():
|
def test_check_data_key_format():
|
||||||
data = get_random_value_hex(64)
|
data = get_random_value_hex(64)
|
||||||
key = get_random_value_hex(32)
|
key = get_random_value_hex(32)
|
||||||
|
|
||||||
assert Cryptographer.check_data_key_format(data, key) is True
|
# Correct format does not raise anything
|
||||||
|
Cryptographer.check_data_key_format(data, key)
|
||||||
|
|
||||||
|
|
||||||
def test_encrypt_odd_length_data():
|
def test_encrypt_odd_length_data():
|
||||||
blob = get_random_value_hex(64)[-1]
|
blob = get_random_value_hex(64)[-1]
|
||||||
key = get_random_value_hex(32)
|
key = get_random_value_hex(32)
|
||||||
|
|
||||||
try:
|
with pytest.raises(InvalidParameter, match="Odd-length"):
|
||||||
Cryptographer.encrypt(blob, key)
|
Cryptographer.encrypt(blob, key)
|
||||||
assert False
|
|
||||||
|
|
||||||
except ValueError:
|
|
||||||
assert True
|
|
||||||
|
|
||||||
|
|
||||||
def test_encrypt_wrong_key_size():
|
def test_encrypt_wrong_key_size():
|
||||||
blob = get_random_value_hex(64)
|
blob = get_random_value_hex(64)
|
||||||
key = get_random_value_hex(31)
|
key = get_random_value_hex(31)
|
||||||
|
|
||||||
try:
|
with pytest.raises(InvalidParameter, match="32-byte hex value"):
|
||||||
Cryptographer.encrypt(blob, key)
|
Cryptographer.encrypt(blob, key)
|
||||||
assert False
|
|
||||||
|
|
||||||
except ValueError:
|
|
||||||
assert True
|
|
||||||
|
|
||||||
|
|
||||||
def test_encrypt():
|
def test_encrypt():
|
||||||
@@ -86,10 +73,9 @@ def test_decrypt_invalid_tag():
|
|||||||
random_encrypted_data = get_random_value_hex(64)
|
random_encrypted_data = get_random_value_hex(64)
|
||||||
random_encrypted_blob = random_encrypted_data
|
random_encrypted_blob = random_encrypted_data
|
||||||
|
|
||||||
# Trying to decrypt random data should result in an InvalidTag exception. Our decrypt function
|
# Trying to decrypt random data should result in an EncryptionError
|
||||||
# returns None
|
with pytest.raises(EncryptionError, match="Cannot decrypt blob with the provided key"):
|
||||||
hex_tx = Cryptographer.decrypt(random_encrypted_blob, random_key)
|
Cryptographer.decrypt(random_encrypted_blob, random_key)
|
||||||
assert hex_tx is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_decrypt_odd_length_data():
|
def test_decrypt_odd_length_data():
|
||||||
@@ -97,12 +83,8 @@ def test_decrypt_odd_length_data():
|
|||||||
random_encrypted_data_odd = get_random_value_hex(64)[:-1]
|
random_encrypted_data_odd = get_random_value_hex(64)[:-1]
|
||||||
random_encrypted_blob_odd = random_encrypted_data_odd
|
random_encrypted_blob_odd = random_encrypted_data_odd
|
||||||
|
|
||||||
try:
|
with pytest.raises(InvalidParameter, match="Odd-length"):
|
||||||
Cryptographer.decrypt(random_encrypted_blob_odd, random_key)
|
Cryptographer.decrypt(random_encrypted_blob_odd, random_key)
|
||||||
assert False
|
|
||||||
|
|
||||||
except ValueError:
|
|
||||||
assert True
|
|
||||||
|
|
||||||
|
|
||||||
def test_decrypt_wrong_key_size():
|
def test_decrypt_wrong_key_size():
|
||||||
@@ -110,12 +92,8 @@ def test_decrypt_wrong_key_size():
|
|||||||
random_encrypted_data_odd = get_random_value_hex(64)
|
random_encrypted_data_odd = get_random_value_hex(64)
|
||||||
random_encrypted_blob_odd = random_encrypted_data_odd
|
random_encrypted_blob_odd = random_encrypted_data_odd
|
||||||
|
|
||||||
try:
|
with pytest.raises(InvalidParameter, match="32-byte hex value"):
|
||||||
Cryptographer.decrypt(random_encrypted_blob_odd, random_key)
|
Cryptographer.decrypt(random_encrypted_blob_odd, random_key)
|
||||||
assert False
|
|
||||||
|
|
||||||
except ValueError:
|
|
||||||
assert True
|
|
||||||
|
|
||||||
|
|
||||||
def test_decrypt():
|
def test_decrypt():
|
||||||
@@ -135,29 +113,33 @@ def test_load_key_file():
|
|||||||
with open("key_test_file", "wb") as f:
|
with open("key_test_file", "wb") as f:
|
||||||
f.write(dummy_sk_der)
|
f.write(dummy_sk_der)
|
||||||
|
|
||||||
appt_data = Cryptographer.load_key_file("key_test_file")
|
Cryptographer.load_key_file("key_test_file")
|
||||||
assert appt_data
|
|
||||||
|
|
||||||
os.remove("key_test_file")
|
os.remove("key_test_file")
|
||||||
|
|
||||||
# If file doesn't exist, function should return None
|
# If file doesn't exist, function should return None
|
||||||
assert Cryptographer.load_key_file("nonexistent_file") is None
|
with pytest.raises(InvalidParameter, match="file not found"):
|
||||||
|
Cryptographer.load_key_file("nonexistent_file")
|
||||||
|
|
||||||
# If something that's not a file_path is passed as parameter the method should also return None
|
with pytest.raises(InvalidParameter, match="file path was expected"):
|
||||||
assert Cryptographer.load_key_file(0) is None and Cryptographer.load_key_file(None) is None
|
Cryptographer.load_key_file(0)
|
||||||
|
|
||||||
|
with pytest.raises(InvalidParameter, match="file path was expected"):
|
||||||
|
Cryptographer.load_key_file(None)
|
||||||
|
|
||||||
|
|
||||||
def test_load_private_key_der():
|
def test_load_private_key_der():
|
||||||
# load_private_key_der expects a byte encoded data. Any other should fail and return None
|
# load_private_key_der expects a byte encoded data. Any other should fail and return None
|
||||||
for wtype in WRONG_TYPES:
|
for wtype in WRONG_TYPES:
|
||||||
assert Cryptographer.load_private_key_der(wtype) is None
|
with pytest.raises(InvalidKey, match="(wrong type)"):
|
||||||
|
Cryptographer.load_private_key_der(wtype)
|
||||||
|
|
||||||
# On the other hand, any random formatter byte array would also fail (zeros for example)
|
# On the other hand, any random formatter byte array would also fail (zeros for example)
|
||||||
assert Cryptographer.load_private_key_der(bytes(32)) is None
|
with pytest.raises(InvalidKey, match="(wrong size or format)"):
|
||||||
|
Cryptographer.load_private_key_der(bytes(32))
|
||||||
|
|
||||||
# A proper formatted key should load
|
# A proper formatted key should load
|
||||||
sk_der = generate_keypair()[0].to_der()
|
sk_der = generate_keypair()[0].to_der()
|
||||||
assert Cryptographer.load_private_key_der(sk_der) is not None
|
Cryptographer.load_private_key_der(sk_der)
|
||||||
|
|
||||||
|
|
||||||
def test_sign():
|
def test_sign():
|
||||||
@@ -186,7 +168,8 @@ def test_sign_ground_truth():
|
|||||||
def test_sign_wrong_sk():
|
def test_sign_wrong_sk():
|
||||||
# If a sk is not passed, sign will return None
|
# If a sk is not passed, sign will return None
|
||||||
for wtype in WRONG_TYPES:
|
for wtype in WRONG_TYPES:
|
||||||
assert Cryptographer.sign(b"", wtype) is None
|
with pytest.raises(InvalidParameter, match="Wrong value passed as sk"):
|
||||||
|
Cryptographer.sign(b"", wtype)
|
||||||
|
|
||||||
|
|
||||||
def test_recover_pk():
|
def test_recover_pk():
|
||||||
@@ -200,12 +183,13 @@ def test_recover_pk():
|
|||||||
|
|
||||||
|
|
||||||
def test_recover_pk_invalid_sigrec():
|
def test_recover_pk_invalid_sigrec():
|
||||||
message = "Hey, it's me"
|
message = "Hey, it's me".encode("utf-8")
|
||||||
signature = "ddbfb019e4d56155b4175066c2b615ab765d317ae7996d188b4a5fae4cc394adf98fef46034d0553149392219ca6d37dca9abdfa6366a8e54b28f19d3e5efa8a14b556205dc7f33a"
|
signature = "ddbfb019e4d56155b4175066c2b615ab765d317ae7996d188b4a5fae4cc394adf98fef46034d0553149392219ca6d37dca9abdfa6366a8e54b28f19d3e5efa8a14b556205dc7f33a"
|
||||||
|
|
||||||
# The given signature, when zbase32 decoded, has a fist byte with value lower than 31.
|
# The given signature, when zbase32 decoded, has a fist byte with value lower than 31.
|
||||||
# The first byte of the signature should be 31 + SigRec, so this should fail
|
# The first byte of the signature should be 31 + SigRec, so this should fail
|
||||||
assert Cryptographer.recover_pk(message, signature) is None
|
with pytest.raises(SignatureError, match="Wrong SigRec"):
|
||||||
|
Cryptographer.recover_pk(message, signature)
|
||||||
|
|
||||||
|
|
||||||
def test_recover_pk_ground_truth():
|
def test_recover_pk_ground_truth():
|
||||||
@@ -219,6 +203,7 @@ def test_recover_pk_ground_truth():
|
|||||||
assert org_pk == Cryptographer.get_compressed_pk(rpk)
|
assert org_pk == Cryptographer.get_compressed_pk(rpk)
|
||||||
|
|
||||||
|
|
||||||
|
# FIXME: needs further testing
|
||||||
def test_recover_pk_wrong_inputs():
|
def test_recover_pk_wrong_inputs():
|
||||||
str_message = "Test message"
|
str_message = "Test message"
|
||||||
message = bytes(20)
|
message = bytes(20)
|
||||||
@@ -226,14 +211,18 @@ def test_recover_pk_wrong_inputs():
|
|||||||
sig = bytes(20)
|
sig = bytes(20)
|
||||||
|
|
||||||
# Wrong input type
|
# Wrong input type
|
||||||
assert Cryptographer.recover_pk(message, str_sig) is None
|
with pytest.raises(InvalidParameter, match="Wrong value passed as zbase32_sig"):
|
||||||
assert Cryptographer.recover_pk(str_message, str_sig) is None
|
Cryptographer.recover_pk(message, sig)
|
||||||
assert Cryptographer.recover_pk(str_message, sig) is None
|
|
||||||
assert Cryptographer.recover_pk(message, str_sig) is None
|
|
||||||
|
|
||||||
# Wrong input size or format
|
with pytest.raises(InvalidParameter, match="Wrong value passed as message"):
|
||||||
assert Cryptographer.recover_pk(message, sig) is None
|
Cryptographer.recover_pk(str_message, str_sig)
|
||||||
assert Cryptographer.recover_pk(message, bytes(104)) is None
|
|
||||||
|
with pytest.raises(InvalidParameter, match="Wrong value passed as message"):
|
||||||
|
Cryptographer.recover_pk(str_message, sig)
|
||||||
|
|
||||||
|
# Wrong input size
|
||||||
|
with pytest.raises(SignatureError, match="Serialized signature must be 65 bytes long"):
|
||||||
|
Cryptographer.recover_pk(message, str_sig)
|
||||||
|
|
||||||
|
|
||||||
def test_get_compressed_pk():
|
def test_get_compressed_pk():
|
||||||
@@ -245,16 +234,16 @@ def test_get_compressed_pk():
|
|||||||
|
|
||||||
|
|
||||||
def test_get_compressed_pk_wrong_key():
|
def test_get_compressed_pk_wrong_key():
|
||||||
# pk should be properly initialized. Initializing from int will case it to not be recoverable
|
# pk should be properly initialized. Initializing from int will cause it to not be recoverable
|
||||||
pk = PublicKey(0)
|
pk = PublicKey(0)
|
||||||
compressed_pk = Cryptographer.get_compressed_pk(pk)
|
|
||||||
|
|
||||||
assert compressed_pk is None
|
with pytest.raises(InvalidKey, match="PublicKey has invalid initializer"):
|
||||||
|
Cryptographer.get_compressed_pk(pk)
|
||||||
|
|
||||||
|
|
||||||
def test_get_compressed_pk_wrong_type():
|
def test_get_compressed_pk_wrong_type():
|
||||||
# Passing a value that is not a PublicKey will make it to fail too
|
# Passing a value that is not a PublicKey will make it to fail too
|
||||||
pk = get_random_value_hex(33)
|
pk = get_random_value_hex(33)
|
||||||
compressed_pk = Cryptographer.get_compressed_pk(pk)
|
|
||||||
|
|
||||||
assert compressed_pk is None
|
with pytest.raises(InvalidParameter, match="Wrong value passed as pk"):
|
||||||
|
Cryptographer.get_compressed_pk(pk)
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ teos_get_all_appointments_endpoint = "{}/get_all_appointments".format(teos_base_
|
|||||||
teosd_process = run_teosd()
|
teosd_process = run_teosd()
|
||||||
|
|
||||||
teos_pk, cli_sk, compressed_cli_pk = teos_cli.load_keys(
|
teos_pk, cli_sk, compressed_cli_pk = teos_cli.load_keys(
|
||||||
cli_config.get("TEOS_PUBLIC_KEY"), cli_config.get("CLI_PRIVATE_KEY"), cli_config.get("CLI_PUBLIC_KEY")
|
cli_config.get("TEOS_PUBLIC_KEY"), cli_config.get("CLI_PRIVATE_KEY")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -238,7 +238,7 @@ def test_appointment_wrong_decryption_key(bitcoin_cli):
|
|||||||
# The appointment data is built using a random 32-byte value.
|
# The appointment data is built using a random 32-byte value.
|
||||||
appointment_data = build_appointment_data(bitcoin_cli, get_random_value_hex(32), penalty_tx)
|
appointment_data = build_appointment_data(bitcoin_cli, get_random_value_hex(32), penalty_tx)
|
||||||
|
|
||||||
# We can't use teos_cli.add_appointment here since it computes the locator internally, so let's do it manually.
|
# We cannot use teos_cli.add_appointment here since it computes the locator internally, so let's do it manually.
|
||||||
# We will encrypt the blob using the random value and derive the locator from the commitment tx.
|
# We will encrypt the blob using the random value and derive the locator from the commitment tx.
|
||||||
appointment_data["locator"] = compute_locator(bitcoin_cli.decoderawtransaction(commitment_tx).get("txid"))
|
appointment_data["locator"] = compute_locator(bitcoin_cli.decoderawtransaction(commitment_tx).get("txid"))
|
||||||
appointment_data["encrypted_blob"] = Cryptographer.encrypt(penalty_tx, get_random_value_hex(32))
|
appointment_data["encrypted_blob"] = Cryptographer.encrypt(penalty_tx, get_random_value_hex(32))
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ from test.teos.unit.conftest import (
|
|||||||
bitcoin_cli,
|
bitcoin_cli,
|
||||||
get_config,
|
get_config,
|
||||||
bitcoind_connect_params,
|
bitcoind_connect_params,
|
||||||
|
generate_keypair,
|
||||||
)
|
)
|
||||||
|
|
||||||
config = get_config()
|
config = get_config()
|
||||||
@@ -99,7 +100,7 @@ def test_update_states_empty_list(db_manager, carrier, block_processor):
|
|||||||
db_manager=db_manager,
|
db_manager=db_manager,
|
||||||
block_processor=block_processor,
|
block_processor=block_processor,
|
||||||
responder=Responder(db_manager, carrier, block_processor),
|
responder=Responder(db_manager, carrier, block_processor),
|
||||||
sk_der=None,
|
sk_der=generate_keypair()[0].to_der(),
|
||||||
max_appointments=config.get("MAX_APPOINTMENTS"),
|
max_appointments=config.get("MAX_APPOINTMENTS"),
|
||||||
expiry_delta=config.get("EXPIRY_DELTA"),
|
expiry_delta=config.get("EXPIRY_DELTA"),
|
||||||
)
|
)
|
||||||
@@ -120,7 +121,7 @@ def test_update_states_responder_misses_more(run_bitcoind, db_manager, carrier,
|
|||||||
db_manager=db_manager,
|
db_manager=db_manager,
|
||||||
block_processor=block_processor,
|
block_processor=block_processor,
|
||||||
responder=Responder(db_manager, carrier, block_processor),
|
responder=Responder(db_manager, carrier, block_processor),
|
||||||
sk_der=None,
|
sk_der=generate_keypair()[0].to_der(),
|
||||||
max_appointments=config.get("MAX_APPOINTMENTS"),
|
max_appointments=config.get("MAX_APPOINTMENTS"),
|
||||||
expiry_delta=config.get("EXPIRY_DELTA"),
|
expiry_delta=config.get("EXPIRY_DELTA"),
|
||||||
)
|
)
|
||||||
@@ -145,7 +146,7 @@ def test_update_states_watcher_misses_more(db_manager, carrier, block_processor)
|
|||||||
db_manager=db_manager,
|
db_manager=db_manager,
|
||||||
block_processor=block_processor,
|
block_processor=block_processor,
|
||||||
responder=Responder(db_manager, carrier, block_processor),
|
responder=Responder(db_manager, carrier, block_processor),
|
||||||
sk_der=None,
|
sk_der=generate_keypair()[0].to_der(),
|
||||||
max_appointments=config.get("MAX_APPOINTMENTS"),
|
max_appointments=config.get("MAX_APPOINTMENTS"),
|
||||||
expiry_delta=config.get("EXPIRY_DELTA"),
|
expiry_delta=config.get("EXPIRY_DELTA"),
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user