mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Update unit tests to use serialize
This commit is contained in:
@@ -1,4 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
|
import struct
|
||||||
|
import binascii
|
||||||
from pytest import fixture
|
from pytest import fixture
|
||||||
|
|
||||||
from pisa import c_logger
|
from pisa import c_logger
|
||||||
@@ -110,3 +112,27 @@ def test_from_dict(appointment_data):
|
|||||||
except ValueError:
|
except ValueError:
|
||||||
appointment_data[key] = prev_val
|
appointment_data[key] = prev_val
|
||||||
assert True
|
assert True
|
||||||
|
|
||||||
|
|
||||||
|
def test_serialize(appointment_data):
|
||||||
|
# From the tower end, appointments are only created if they pass the inspector tests, so not covering weird formats.
|
||||||
|
# Serialize may fail if, from the user end, the user tries to do it with an weird appointment. Not critical.
|
||||||
|
|
||||||
|
appointment = Appointment.from_dict(appointment_data)
|
||||||
|
serialized_appointment = appointment.serialize()
|
||||||
|
|
||||||
|
# Size must be 16 + 4 + 4 + 4 + len(encrypted_blob)
|
||||||
|
assert len(serialized_appointment) >= 28
|
||||||
|
assert isinstance(serialized_appointment, bytes)
|
||||||
|
|
||||||
|
locator = serialized_appointment[:16]
|
||||||
|
start_time = serialized_appointment[16:20]
|
||||||
|
end_time = serialized_appointment[20:24]
|
||||||
|
to_self_delay = serialized_appointment[24:28]
|
||||||
|
encrypted_blob = serialized_appointment[28:]
|
||||||
|
|
||||||
|
assert binascii.hexlify(locator).decode() == appointment.locator
|
||||||
|
assert struct.unpack(">I", start_time)[0] == appointment.start_time
|
||||||
|
assert struct.unpack(">I", end_time)[0] == appointment.end_time
|
||||||
|
assert struct.unpack(">I", to_self_delay)[0] == appointment.to_self_delay
|
||||||
|
assert binascii.hexlify(encrypted_blob).decode() == appointment.encrypted_blob.data
|
||||||
|
|||||||
@@ -211,7 +211,7 @@ def test_sign_wrong_rtype():
|
|||||||
# Calling sign with an rtype different than 'str' or 'bytes' should fail
|
# Calling sign with an rtype different than 'str' or 'bytes' should fail
|
||||||
for wtype in WRONG_TYPES:
|
for wtype in WRONG_TYPES:
|
||||||
try:
|
try:
|
||||||
Cryptographer.sign("", "", rtype=wtype)
|
Cryptographer.sign(b"", "", rtype=wtype)
|
||||||
assert False
|
assert False
|
||||||
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
@@ -221,20 +221,19 @@ def test_sign_wrong_rtype():
|
|||||||
def test_sign_wrong_sk():
|
def test_sign_wrong_sk():
|
||||||
# If a sk is not passed, sign will return None
|
# If a sk is not passed, sign will return None
|
||||||
for wtype in WRONG_TYPES:
|
for wtype in WRONG_TYPES:
|
||||||
assert Cryptographer.sign("", wtype) is None
|
assert Cryptographer.sign(b"", wtype) is None
|
||||||
|
|
||||||
|
|
||||||
# FIXME: signature_format is not covered, so we are not covering cases where the message is not in the proper format
|
|
||||||
# at the moment (related to #68, happens in multiple tests from here on)
|
|
||||||
def test_sign():
|
def test_sign():
|
||||||
# Otherwise we should get a signature
|
# Otherwise we should get a signature
|
||||||
sk, _ = generate_keypair()
|
sk, _ = generate_keypair()
|
||||||
|
message = b""
|
||||||
|
|
||||||
assert Cryptographer.sign(Cryptographer.signature_format(""), sk) is not None
|
assert Cryptographer.sign(message, sk) is not None
|
||||||
|
|
||||||
# Check that the returns work
|
# Check that the returns work
|
||||||
assert isinstance(Cryptographer.sign(Cryptographer.signature_format(""), sk, rtype="str"), str)
|
assert isinstance(Cryptographer.sign(message, sk, rtype="str"), str)
|
||||||
assert isinstance(Cryptographer.sign(Cryptographer.signature_format(""), sk, rtype="bytes"), bytes)
|
assert isinstance(Cryptographer.sign(message, sk, rtype="bytes"), bytes)
|
||||||
|
|
||||||
|
|
||||||
def test_verify_wrong_pk():
|
def test_verify_wrong_pk():
|
||||||
@@ -247,10 +246,10 @@ def test_verify_random_values():
|
|||||||
# Random values shouldn't verify
|
# Random values shouldn't verify
|
||||||
sk, pk = generate_keypair()
|
sk, pk = generate_keypair()
|
||||||
|
|
||||||
message = get_random_value_hex(32)
|
message = binascii.unhexlify(get_random_value_hex(32))
|
||||||
signature = get_random_value_hex(32)
|
signature = get_random_value_hex(32)
|
||||||
|
|
||||||
assert Cryptographer.verify(Cryptographer.signature_format(message), signature, pk) is False
|
assert Cryptographer.verify(message, signature, pk) is False
|
||||||
|
|
||||||
|
|
||||||
def test_verify_wrong_pair():
|
def test_verify_wrong_pair():
|
||||||
@@ -258,16 +257,28 @@ def test_verify_wrong_pair():
|
|||||||
sk, _ = generate_keypair()
|
sk, _ = generate_keypair()
|
||||||
_, pk = generate_keypair()
|
_, pk = generate_keypair()
|
||||||
|
|
||||||
message = get_random_value_hex(32)
|
message = binascii.unhexlify(get_random_value_hex(32))
|
||||||
signature = get_random_value_hex(32)
|
signature = get_random_value_hex(32)
|
||||||
|
|
||||||
assert Cryptographer.verify(Cryptographer.signature_format(message), signature, pk) is False
|
assert Cryptographer.verify(message, signature, pk) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_wrong_message():
|
||||||
|
# Verifying with a wrong keypair must fail
|
||||||
|
sk, pk = generate_keypair()
|
||||||
|
|
||||||
|
message = binascii.unhexlify(get_random_value_hex(32))
|
||||||
|
signature = Cryptographer.sign(message, sk)
|
||||||
|
|
||||||
|
wrong_message = binascii.unhexlify(get_random_value_hex(32))
|
||||||
|
|
||||||
|
assert Cryptographer.verify(wrong_message, signature, pk) is False
|
||||||
|
|
||||||
|
|
||||||
def test_verify():
|
def test_verify():
|
||||||
# A properly generated signature should verify
|
# A properly generated signature should verify
|
||||||
sk, pk = generate_keypair()
|
sk, pk = generate_keypair()
|
||||||
message = get_random_value_hex(32)
|
message = binascii.unhexlify(get_random_value_hex(32))
|
||||||
signature = Cryptographer.sign(Cryptographer.signature_format(message), sk)
|
signature = Cryptographer.sign(message, sk)
|
||||||
|
|
||||||
assert Cryptographer.verify(Cryptographer.signature_format(message), signature, pk) is True
|
assert Cryptographer.verify(message, signature, pk) is True
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ from pisa.responder import TransactionTracker
|
|||||||
from pisa.watcher import Watcher
|
from pisa.watcher import Watcher
|
||||||
from pisa.tools import bitcoin_cli
|
from pisa.tools import bitcoin_cli
|
||||||
from pisa.db_manager import DBManager
|
from pisa.db_manager import DBManager
|
||||||
from pisa.appointment import Appointment
|
from common.appointment import Appointment
|
||||||
|
|
||||||
from test.simulator.utils import sha256d
|
from test.simulator.utils import sha256d
|
||||||
from test.simulator.transaction import TX
|
from test.simulator.transaction import TX
|
||||||
@@ -110,7 +110,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
|
|||||||
"encrypted_blob": encrypted_blob,
|
"encrypted_blob": encrypted_blob,
|
||||||
}
|
}
|
||||||
|
|
||||||
signature = Cryptographer.sign(Cryptographer.signature_format(appointment_data), client_sk)
|
signature = Cryptographer.sign(Appointment.from_dict(appointment_data).serialize(), client_sk)
|
||||||
pk_hex = hexlify(client_pk_der).decode("utf-8")
|
pk_hex = hexlify(client_pk_der).decode("utf-8")
|
||||||
|
|
||||||
data = {"appointment": appointment_data, "signature": signature, "public_key": pk_hex}
|
data = {"appointment": appointment_data, "signature": signature, "public_key": pk_hex}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from uuid import uuid4
|
|||||||
from pisa import c_logger
|
from pisa import c_logger
|
||||||
from pisa.responder import TransactionTracker
|
from pisa.responder import TransactionTracker
|
||||||
from pisa.cleaner import Cleaner
|
from pisa.cleaner import Cleaner
|
||||||
from pisa.appointment import Appointment
|
from common.appointment import Appointment
|
||||||
from pisa.db_manager import WATCHER_PREFIX
|
from pisa.db_manager import WATCHER_PREFIX
|
||||||
|
|
||||||
from test.pisa.unit.conftest import get_random_value_hex
|
from test.pisa.unit.conftest import get_random_value_hex
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from cryptography.hazmat.primitives import serialization
|
|||||||
from pisa import c_logger
|
from pisa import c_logger
|
||||||
from pisa.errors import *
|
from pisa.errors import *
|
||||||
from pisa.inspector import Inspector
|
from pisa.inspector import Inspector
|
||||||
from pisa.appointment import Appointment
|
from common.appointment import Appointment
|
||||||
from pisa.block_processor import BlockProcessor
|
from pisa.block_processor import BlockProcessor
|
||||||
from pisa.conf import MIN_TO_SELF_DELAY
|
from pisa.conf import MIN_TO_SELF_DELAY
|
||||||
|
|
||||||
@@ -185,7 +185,9 @@ def test_check_appointment_signature():
|
|||||||
fake_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
|
fake_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
|
||||||
|
|
||||||
# Create a bad signature to make sure inspector rejects it
|
# Create a bad signature to make sure inspector rejects it
|
||||||
bad_signature = Cryptographer.sign(Cryptographer.signature_format(dummy_appointment_data["appointment"]), fake_sk)
|
bad_signature = Cryptographer.sign(
|
||||||
|
Appointment.from_dict(dummy_appointment_data["appointment"]).serialize(), fake_sk
|
||||||
|
)
|
||||||
assert (
|
assert (
|
||||||
Inspector.check_appointment_signature(dummy_appointment_data["appointment"], bad_signature, client_pk_hex)[0]
|
Inspector.check_appointment_signature(dummy_appointment_data["appointment"], bad_signature, client_pk_hex)[0]
|
||||||
== APPOINTMENT_INVALID_SIGNATURE
|
== APPOINTMENT_INVALID_SIGNATURE
|
||||||
@@ -202,12 +204,6 @@ def test_inspect(run_bitcoind):
|
|||||||
)
|
)
|
||||||
client_pk_hex = hexlify(client_pk_der).decode("utf-8")
|
client_pk_hex = hexlify(client_pk_der).decode("utf-8")
|
||||||
|
|
||||||
# Invalid appointment, every field is empty
|
|
||||||
appointment_data = dict()
|
|
||||||
signature = Cryptographer.sign(Cryptographer.signature_format(appointment_data), client_sk)
|
|
||||||
appointment = inspector.inspect(appointment_data, signature, client_pk)
|
|
||||||
assert type(appointment) == tuple and appointment[0] != 0
|
|
||||||
|
|
||||||
# Valid appointment
|
# Valid appointment
|
||||||
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
|
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
|
||||||
start_time = BlockProcessor.get_block_count() + 5
|
start_time = BlockProcessor.get_block_count() + 5
|
||||||
@@ -223,7 +219,7 @@ def test_inspect(run_bitcoind):
|
|||||||
"encrypted_blob": encrypted_blob,
|
"encrypted_blob": encrypted_blob,
|
||||||
}
|
}
|
||||||
|
|
||||||
signature = Cryptographer.sign(Cryptographer.signature_format(appointment_data), client_sk)
|
signature = Cryptographer.sign(Appointment.from_dict(appointment_data).serialize(), client_sk)
|
||||||
|
|
||||||
appointment = inspector.inspect(appointment_data, signature, client_pk_hex)
|
appointment = inspector.inspect(appointment_data, signature, client_pk_hex)
|
||||||
|
|
||||||
|
|||||||
@@ -92,13 +92,13 @@ def test_add_appointment(run_bitcoind, watcher):
|
|||||||
added_appointment, sig = watcher.add_appointment(appointment)
|
added_appointment, sig = watcher.add_appointment(appointment)
|
||||||
|
|
||||||
assert added_appointment is True
|
assert added_appointment is True
|
||||||
assert Cryptographer.verify(Cryptographer.signature_format(appointment.to_dict()), sig, public_key)
|
assert Cryptographer.verify(appointment.serialize(), sig, public_key)
|
||||||
|
|
||||||
# Check that we can also add an already added appointment (same locator)
|
# Check that we can also add an already added appointment (same locator)
|
||||||
added_appointment, sig = watcher.add_appointment(appointment)
|
added_appointment, sig = watcher.add_appointment(appointment)
|
||||||
|
|
||||||
assert added_appointment is True
|
assert added_appointment is True
|
||||||
assert Cryptographer.verify(Cryptographer.signature_format(appointment.to_dict()), sig, public_key)
|
assert Cryptographer.verify(appointment.serialize(), sig, public_key)
|
||||||
|
|
||||||
|
|
||||||
def test_add_too_many_appointments(watcher):
|
def test_add_too_many_appointments(watcher):
|
||||||
@@ -112,7 +112,7 @@ def test_add_too_many_appointments(watcher):
|
|||||||
added_appointment, sig = watcher.add_appointment(appointment)
|
added_appointment, sig = watcher.add_appointment(appointment)
|
||||||
|
|
||||||
assert added_appointment is True
|
assert added_appointment is True
|
||||||
assert Cryptographer.verify(Cryptographer.signature_format(appointment.to_dict()), sig, public_key)
|
assert Cryptographer.verify(appointment.serialize(), sig, public_key)
|
||||||
|
|
||||||
appointment, dispute_tx = generate_dummy_appointment(
|
appointment, dispute_tx = generate_dummy_appointment(
|
||||||
start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET
|
start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET
|
||||||
|
|||||||
Reference in New Issue
Block a user