Files
python-teos/test/unit/test_watcher.py
Sergi Delgado Segura 409f8fb5fb Adds Watcher unit tests
2019-10-14 16:58:31 +01:00

133 lines
3.8 KiB
Python

import pytest
import logging
from os import urandom
from time import sleep
from uuid import uuid4
from hashlib import sha256
from threading import Thread
from queue import Queue, Empty
from pisa import bitcoin_cli
from pisa.watcher import Watcher
from pisa.conf import EXPIRY_DELTA
from pisa.responder import Responder
from pisa.conf import MAX_APPOINTMENTS
from pisa.appointment import Appointment
from pisa.tools import check_txid_format
from test.simulator.bitcoind_sim import TIME_BETWEEN_BLOCKS
logging.getLogger().disabled = True
APPOINTMENTS = 5
@pytest.fixture(scope="module")
def watcher():
return Watcher()
def create_appointment(locator=None):
if locator is None:
locator = urandom(32).hex()
start_time = bitcoin_cli.getblockcount() + 1
end_time = start_time + 1
dispute_delta = 20
encrypted_blob_data = urandom(100).hex()
cipher = "AES-GCM-128"
hash_function = "SHA256"
return Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob_data, cipher, hash_function)
def create_appointments(n):
locator_uuid_map = dict()
appointments = dict()
txids = []
for i in range(n):
txid = urandom(32)
uuid = uuid4().hex
locator = sha256(txid).hexdigest()
appointments[uuid] = create_appointment(locator)
locator_uuid_map[locator] = [uuid]
txids.append(txid.hex())
return appointments, locator_uuid_map, txids
def test_init(watcher):
assert type(watcher.appointments) is dict and len(watcher.appointments) == 0
assert type(watcher.locator_uuid_map) is dict and len(watcher.locator_uuid_map) == 0
assert watcher.block_queue is None
assert watcher.asleep is True
assert watcher.max_appointments == MAX_APPOINTMENTS
assert watcher.zmq_subscriber is None
assert type(watcher.responder) is Responder
def test_add_appointment(run_bitcoind, watcher):
# The watcher automatically fire do_watch and do_subscribe on adding an appointment if it is asleep (initial state).
# Avoid this by setting the state to awake.
watcher.asleep = False
# We should be able to add appointments up to the limit
for _ in range(10):
added_appointment = watcher.add_appointment(create_appointment())
assert added_appointment is True
def test_add_too_many_appointments(watcher):
# Any appointment on top of those should fail
watcher.appointments = dict()
for _ in range(MAX_APPOINTMENTS):
added_appointment = watcher.add_appointment(create_appointment())
assert added_appointment is True
added_appointment = watcher.add_appointment(create_appointment())
assert added_appointment is False
def test_do_subscribe(watcher):
watcher.block_queue = Queue()
zmq_thread = Thread(target=watcher.do_subscribe)
zmq_thread.daemon = True
zmq_thread.start()
try:
block_hash = watcher.block_queue.get(timeout=MAX_APPOINTMENTS)
assert check_txid_format(block_hash)
except Empty:
assert False
def test_do_watch(watcher):
# We will wipe all the previous data and add 5 appointments
watcher.appointments, watcher.locator_uuid_map, txids = create_appointments(APPOINTMENTS)
watch_thread = Thread(target=watcher.do_watch)
watch_thread.daemon = True
watch_thread.start()
# Broadcast the first two
for txid in txids[:2]:
bitcoin_cli.sendrawtransaction(txid)
# After leaving some time for the block to be mined and processed, the number of appointments should have reduced
# by two
sleep(TIME_BETWEEN_BLOCKS*2)
assert len(watcher.appointments) == APPOINTMENTS - 2
# The rest of appointments will timeout after the end (2) + EXPIRY_DELTA
# Wait for an additional block to be safe
sleep((EXPIRY_DELTA + 2 + 1) * TIME_BETWEEN_BLOCKS)
assert len(watcher.appointments) == 0
assert watcher.asleep is True