Files
python-teos/test/unit/test_watcher.py
2019-11-22 14:48:38 +00:00

259 lines
8.8 KiB
Python

import pytest
from uuid import uuid4
from hashlib import sha256
from threading import Thread
from binascii import unhexlify
from queue import Queue, Empty
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.exceptions import InvalidSignature
from pisa import c_logger
from pisa.watcher import Watcher
from pisa.responder import Responder
from pisa.tools import check_txid_format, bitcoin_cli
from test.unit.conftest import generate_block, generate_blocks, generate_dummy_appointment, get_random_value_hex
from pisa.conf import EXPIRY_DELTA, PISA_SECRET_KEY, MAX_APPOINTMENTS
c_logger.disabled = True
APPOINTMENTS = 5
START_TIME_OFFSET = 1
END_TIME_OFFSET = 1
TEST_SET_SIZE = 200
with open(PISA_SECRET_KEY, "r") as key_file:
pubkey_pem = key_file.read().encode("utf-8")
# TODO: should use the public key file instead, but it is not currently exported in the configuration
signing_key = load_pem_private_key(pubkey_pem, password=None, backend=default_backend())
public_key = signing_key.public_key()
@pytest.fixture(scope="module")
def watcher(db_manager):
return Watcher(db_manager)
@pytest.fixture(scope="module")
def txids():
return [get_random_value_hex(32) for _ in range(100)]
@pytest.fixture(scope="module")
def locator_uuid_map(txids):
return {sha256(unhexlify(txid)).hexdigest(): uuid4().hex for txid in txids}
def create_appointments(n):
locator_uuid_map = dict()
appointments = dict()
dispute_txs = []
for i in range(n):
appointment, dispute_tx = generate_dummy_appointment(
start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET
)
uuid = uuid4().hex
appointments[uuid] = appointment
locator_uuid_map[appointment.locator] = [uuid]
dispute_txs.append(dispute_tx)
return appointments, locator_uuid_map, dispute_txs
def is_signature_valid(appointment, signature, pk):
# verify the signature
try:
data = appointment.serialize()
pk.verify(signature, data, ec.ECDSA(hashes.SHA256()))
except InvalidSignature:
return False
return True
def test_init(watcher):
assert type(watcher.appointments) is dict and len(watcher.appointments) == 0
assert type(watcher.locator_uuid_map) is dict and len(watcher.locator_uuid_map) == 0
assert watcher.block_queue.empty()
assert watcher.asleep is True
assert watcher.max_appointments == MAX_APPOINTMENTS
assert watcher.zmq_subscriber is None
assert type(watcher.responder) is Responder
def test_init_no_key(db_manager):
try:
Watcher(db_manager, pisa_sk_file=None)
assert False
except ValueError:
assert True
def test_add_appointment(run_bitcoind, watcher):
# The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state)
# Avoid this by setting the state to awake.
watcher.asleep = False
# We should be able to add appointments up to the limit
for _ in range(10):
appointment, dispute_tx = generate_dummy_appointment(
start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET
)
added_appointment, sig = watcher.add_appointment(appointment)
assert added_appointment is True
assert is_signature_valid(appointment, sig, public_key)
# Check that we can also add an already added appointment (same locator)
added_appointment, sig = watcher.add_appointment(appointment)
assert added_appointment is True
assert is_signature_valid(appointment, sig, public_key)
def test_sign_appointment(watcher):
appointment, _ = generate_dummy_appointment(start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET)
signature = watcher.sign_appointment(appointment)
assert is_signature_valid(appointment, signature, public_key)
def test_add_too_many_appointments(watcher):
# Any appointment on top of those should fail
watcher.appointments = dict()
for _ in range(MAX_APPOINTMENTS):
appointment, dispute_tx = generate_dummy_appointment(
start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET
)
added_appointment, sig = watcher.add_appointment(appointment)
assert added_appointment is True
assert is_signature_valid(appointment, sig, public_key)
appointment, dispute_tx = generate_dummy_appointment(
start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET
)
added_appointment, sig = watcher.add_appointment(appointment)
assert added_appointment is False
assert sig is None
def test_do_subscribe(watcher):
watcher.block_queue = Queue()
zmq_thread = Thread(target=watcher.do_subscribe)
zmq_thread.daemon = True
zmq_thread.start()
try:
generate_block()
block_hash = watcher.block_queue.get()
assert check_txid_format(block_hash)
except Empty:
assert False
def test_do_watch(watcher):
# We will wipe all the previous data and add 5 appointments
watcher.appointments, watcher.locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS)
watch_thread = Thread(target=watcher.do_watch)
watch_thread.daemon = True
watch_thread.start()
# Broadcast the first two
for dispute_tx in dispute_txs[:2]:
bitcoin_cli().sendrawtransaction(dispute_tx)
# After leaving some time for the block to be mined and processed, the number of appointments should have reduced
# by two
generate_blocks(START_TIME_OFFSET + END_TIME_OFFSET)
assert len(watcher.appointments) == APPOINTMENTS - 2
# The rest of appointments will timeout after the end (2) + EXPIRY_DELTA
# Wait for an additional block to be safe
generate_blocks(EXPIRY_DELTA + START_TIME_OFFSET + END_TIME_OFFSET)
assert len(watcher.appointments) == 0
assert watcher.asleep is True
def test_get_matches(watcher, txids, locator_uuid_map):
watcher.locator_uuid_map = locator_uuid_map
potential_matches = watcher.get_matches(txids)
# All the txids must match
assert locator_uuid_map.keys() == potential_matches.keys()
def test_get_matches_random_data(watcher, locator_uuid_map):
# The likelihood of finding a potential match with random data should be negligible
watcher.locator_uuid_map = locator_uuid_map
txids = [get_random_value_hex(32) for _ in range(TEST_SET_SIZE)]
potential_matches = watcher.get_matches(txids)
# None of the txids should match
assert len(potential_matches) == 0
def test_filter_valid_matches_random_data(watcher):
appointments = {}
locator_uuid_map = {}
matches = {}
for i in range(TEST_SET_SIZE):
dummy_appointment, _ = generate_dummy_appointment()
uuid = uuid4().hex
appointments[uuid] = dummy_appointment
locator_uuid_map[dummy_appointment.locator] = [uuid]
if i % 2:
dispute_txid = get_random_value_hex(32)
matches[dummy_appointment.locator] = dispute_txid
watcher.locator_uuid_map = locator_uuid_map
watcher.appointments = appointments
filtered_valid_matches = watcher.filter_valid_matches(matches)
assert not any([fil_match["valid_match"] for uuid, fil_match in filtered_valid_matches.items()])
def test_filter_valid_matches(watcher):
dispute_txid = "0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9"
encrypted_blob = (
"29f55518945408f567bb7feb4d7bb15ba88b7d8ca0223a44d5c67dfe32d038caee7613e35736025d95ad4ecd6538a50"
"74cbe8d7739705697a5dc4d19b8a6e4459ed2d1b0d0a9b18c49bc2187dcbfb4046b14d58a1add83235fc632efc398d5"
"0abcb7738f1a04b3783d025c1828b4e8a8dc8f13f2843e6bc3bf08eade02fc7e2c4dce7d2f83b055652e944ac114e0b"
"72a9abcd98fd1d785a5d976c05ed780e033e125fa083c6591b6029aa68dbc099f148a2bc2e0cb63733e68af717d48d5"
"a312b5f5b2fcca9561b2ff4191f9cdff936a43f6efef4ee45fbaf1f18d0a4b006f3fc8399dd8ecb21f709d4583bba14"
"4af6d49fa99d7be2ca21059a997475aa8642b66b921dc7fc0321b6a2f6927f6f9bab55c75e17a19dc3b2ae895b6d4a4"
"f64f8eb21b1e"
)
dummy_appointment, _ = generate_dummy_appointment()
dummy_appointment.encrypted_blob.data = encrypted_blob
dummy_appointment.locator = sha256(unhexlify(dispute_txid)).hexdigest()
uuid = uuid4().hex
appointments = {uuid: dummy_appointment}
locator_uuid_map = {dummy_appointment.locator: [uuid]}
matches = {dummy_appointment.locator: dispute_txid}
watcher.appointments = appointments
watcher.locator_uuid_map = locator_uuid_map
filtered_valid_matches = watcher.filter_valid_matches(matches)
assert all([fil_match["valid_match"] for uuid, fil_match in filtered_valid_matches.items()])