mirror of
https://github.com/aljazceru/python-teos.git
synced 2026-02-12 10:04:41 +01:00
Refactors the tests to match the new class definitions and config formats
This commit is contained in:
@@ -3,17 +3,16 @@ import time
|
||||
from queue import Queue
|
||||
from threading import Thread, Event, Condition
|
||||
|
||||
from teos.block_processor import BlockProcessor
|
||||
from teos.chain_monitor import ChainMonitor
|
||||
|
||||
from test.teos.unit.conftest import get_random_value_hex, generate_block
|
||||
from test.teos.unit.conftest import get_random_value_hex, generate_block, bitcoind_connect_params, bitcoind_feed_params
|
||||
|
||||
|
||||
def test_init(run_bitcoind):
|
||||
def test_init(run_bitcoind, block_processor):
|
||||
# run_bitcoind is started here instead of later on to avoid race conditions while it initializes
|
||||
|
||||
# Not much to test here, just sanity checks to make sure nothing goes south in the future
|
||||
chain_monitor = ChainMonitor(Queue(), Queue())
|
||||
chain_monitor = ChainMonitor(Queue(), Queue(), block_processor, bitcoind_feed_params)
|
||||
|
||||
assert chain_monitor.best_tip is None
|
||||
assert isinstance(chain_monitor.last_tips, list) and len(chain_monitor.last_tips) == 0
|
||||
@@ -27,8 +26,8 @@ def test_init(run_bitcoind):
|
||||
assert isinstance(chain_monitor.responder_queue, Queue)
|
||||
|
||||
|
||||
def test_notify_subscribers():
|
||||
chain_monitor = ChainMonitor(Queue(), Queue())
|
||||
def test_notify_subscribers(block_processor):
|
||||
chain_monitor = ChainMonitor(Queue(), Queue(), block_processor, bitcoind_feed_params)
|
||||
# Subscribers are only notified as long as they are awake
|
||||
new_block = get_random_value_hex(32)
|
||||
|
||||
@@ -42,11 +41,11 @@ def test_notify_subscribers():
|
||||
assert chain_monitor.responder_queue.get() == new_block
|
||||
|
||||
|
||||
def test_update_state():
|
||||
def test_update_state(block_processor):
|
||||
# The state is updated after receiving a new block (and only if the block is not already known).
|
||||
# Let's start by setting a best_tip and a couple of old tips
|
||||
new_block_hash = get_random_value_hex(32)
|
||||
chain_monitor = ChainMonitor(Queue(), Queue())
|
||||
chain_monitor = ChainMonitor(Queue(), Queue(), block_processor, bitcoind_feed_params)
|
||||
chain_monitor.best_tip = new_block_hash
|
||||
chain_monitor.last_tips = [get_random_value_hex(32) for _ in range(5)]
|
||||
|
||||
@@ -63,14 +62,15 @@ def test_update_state():
|
||||
assert chain_monitor.best_tip == another_block_hash and new_block_hash == chain_monitor.last_tips[-1]
|
||||
|
||||
|
||||
def test_monitor_chain_polling(db_manager):
|
||||
def test_monitor_chain_polling(db_manager, block_processor):
|
||||
# Try polling with the Watcher
|
||||
wq = Queue()
|
||||
chain_monitor = ChainMonitor(wq, Queue())
|
||||
chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
|
||||
chain_monitor = ChainMonitor(Queue(), Queue(), block_processor, bitcoind_feed_params)
|
||||
chain_monitor.best_tip = block_processor.get_best_block_hash()
|
||||
chain_monitor.polling_delta = 0.1
|
||||
|
||||
# monitor_chain_polling runs until terminate if set
|
||||
polling_thread = Thread(target=chain_monitor.monitor_chain_polling, kwargs={"polling_delta": 0.1}, daemon=True)
|
||||
polling_thread = Thread(target=chain_monitor.monitor_chain_polling, daemon=True)
|
||||
polling_thread.start()
|
||||
|
||||
# Check that nothing changes as long as a block is not generated
|
||||
@@ -88,10 +88,10 @@ def test_monitor_chain_polling(db_manager):
|
||||
polling_thread.join()
|
||||
|
||||
|
||||
def test_monitor_chain_zmq(db_manager):
|
||||
rq = Queue()
|
||||
chain_monitor = ChainMonitor(Queue(), rq)
|
||||
chain_monitor.best_tip = BlockProcessor.get_best_block_hash()
|
||||
def test_monitor_chain_zmq(db_manager, block_processor):
|
||||
responder_queue = Queue()
|
||||
chain_monitor = ChainMonitor(Queue(), responder_queue, block_processor, bitcoind_feed_params)
|
||||
chain_monitor.best_tip = block_processor.get_best_block_hash()
|
||||
|
||||
zmq_thread = Thread(target=chain_monitor.monitor_chain_zmq, daemon=True)
|
||||
zmq_thread.start()
|
||||
@@ -106,9 +106,9 @@ def test_monitor_chain_zmq(db_manager):
|
||||
assert chain_monitor.responder_queue.empty()
|
||||
|
||||
|
||||
def test_monitor_chain(db_manager):
|
||||
def test_monitor_chain(db_manager, block_processor):
|
||||
# Not much to test here, this should launch two threads (one per monitor approach) and finish on terminate
|
||||
chain_monitor = ChainMonitor(Queue(), Queue())
|
||||
chain_monitor = ChainMonitor(Queue(), Queue(), block_processor, bitcoind_feed_params)
|
||||
|
||||
chain_monitor.best_tip = None
|
||||
chain_monitor.monitor_chain()
|
||||
@@ -131,15 +131,16 @@ def test_monitor_chain(db_manager):
|
||||
generate_block()
|
||||
|
||||
|
||||
def test_monitor_chain_single_update(db_manager):
|
||||
def test_monitor_chain_single_update(db_manager, block_processor):
|
||||
# This test tests that if both threads try to add the same block to the queue, only the first one will make it
|
||||
chain_monitor = ChainMonitor(Queue(), Queue())
|
||||
chain_monitor = ChainMonitor(Queue(), Queue(), block_processor, bitcoind_feed_params)
|
||||
|
||||
chain_monitor.best_tip = None
|
||||
chain_monitor.polling_delta = 2
|
||||
|
||||
# We will create a block and wait for the polling thread. Then check the queues to see that the block hash has only
|
||||
# been added once.
|
||||
chain_monitor.monitor_chain(polling_delta=2)
|
||||
chain_monitor.monitor_chain()
|
||||
generate_block()
|
||||
|
||||
watcher_block = chain_monitor.watcher_queue.get()
|
||||
|
||||
Reference in New Issue
Block a user